
hitbox_backend/composition/mod.rs

//! Multi-tier caching by combining two backends.
//!
//! This backend implements a layered caching strategy where:
//! - **L1** (first layer): Fast local cache (e.g., Moka)
//! - **L2** (second layer): Distributed cache (e.g., Redis)
//!
//! # Policies
//!
//! Behavior is controlled by configurable policies. See the [`policy`] module for details.
//!
//! ## Read Policies
//! - [`policy::SequentialReadPolicy`] - Try L1, then L2 on miss **(default)**
//! - [`policy::RaceReadPolicy`] - Race both layers, return first hit
//! - [`policy::ParallelReadPolicy`] - Query both in parallel, prefer fresher
//!
//! ## Write Policies
//! - [`policy::SequentialWritePolicy`] - Write L1, then L2
//! - [`policy::OptimisticParallelWritePolicy`] - Write both in parallel **(default)**
//! - [`policy::RaceWritePolicy`] - Race both, background the slower
//!
//! ## Refill Policy
//! - [`policy::RefillPolicy::Always`] - Populate L1 after L2 hit
//! - [`policy::RefillPolicy::Never`] - Skip L1 population **(default)**
//!
//! # Example
//!
//! ```ignore
//! use hitbox_backend::composition::{Compose, CompositionPolicy};
//! use hitbox_backend::composition::policy::{RaceReadPolicy, RefillPolicy};
//!
//! // Default policies
//! let cache = moka.compose(redis, offload);
//!
//! // Custom policies
//! let policy = CompositionPolicy::new()
//!     .read(RaceReadPolicy::new())
//!     .refill(RefillPolicy::Always);
//!
//! let cache = moka.compose(redis, offload).with_policy(policy);
//! ```

pub mod compose;
pub mod policy;

mod context;
mod envelope;
mod format;

pub use compose::Compose;
pub use policy::CompositionPolicy;

// Re-exports for submodules (not part of public API)
pub(crate) use context::{CompositionContext, CompositionLayer};
pub(crate) use format::CompositionFormat;

use crate::format::Format;
use crate::metrics::Timer;
use crate::{
    Backend, BackendError, BackendResult, CacheBackend, CacheKeyFormat, Compressor, DeleteStatus,
    PassthroughCompressor,
};
use async_trait::async_trait;
use envelope::CompositionEnvelope;
use hitbox_core::{
    BackendLabel, BoxContext, CacheContext, CacheKey, CacheStatus, CacheValue, Cacheable,
    CacheableResponse, Offload, Raw, ResponseSource,
};
use policy::{
    CompositionReadPolicy, CompositionWritePolicy, OptimisticParallelWritePolicy, ReadResult,
    RefillPolicy, SequentialReadPolicy,
};
use smol_str::SmolStr;
use std::sync::Arc;
use thiserror::Error;

/// Error type for composition backend operations.
///
/// This error type preserves errors from both cache layers for debugging,
/// while keeping the implementation details encapsulated.
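///
/// # Example
///
/// A minimal handling sketch, assuming a `CompositionError` has already been
/// extracted (e.g. by downcasting the boxed error inside `BackendError::InternalError`):
///
/// ```ignore
/// match composition_error {
///     CompositionError::BothLayersFailed { l1, l2 } => {
///         tracing::error!(%l1, %l2, "both cache layers failed");
///     }
/// }
/// ```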
#[derive(Debug, Error)]
pub enum CompositionError {
    /// Both L1 and L2 cache layers failed.
    #[error("Both cache layers failed - L1: {l1}, L2: {l2}")]
    BothLayersFailed {
        /// Error from L1 layer
        l1: BackendError,
        /// Error from L2 layer
        l2: BackendError,
    },
}

/// A backend that composes two cache backends into a layered caching system.
///
/// The first backend (L1) is checked first on reads, and if not found,
/// the second backend (L2) is checked. On writes, both backends are updated.
///
/// Each layer can use its own serialization format and compression since
/// `CacheBackend` operates on typed data, not raw bytes.
///
/// Behavior can be customized via `CompositionReadPolicy`, `CompositionWritePolicy`, and `RefillPolicy` to control
/// how reads, writes, and L1 refills are executed across the layers.
pub struct CompositionBackend<
    L1,
    L2,
    O,
    R = SequentialReadPolicy,
    W = OptimisticParallelWritePolicy,
> where
    L1: Backend,
    L2: Backend,
    O: Offload<'static>,
    R: CompositionReadPolicy,
    W: CompositionWritePolicy,
{
    /// First-layer cache (typically fast, local)
    l1: L1,
    /// Second-layer cache (typically distributed, persistent)
    l2: L2,
    /// Composition format
    format: CompositionFormat,
    /// Offload for background tasks
    offload: O,
    /// Read policy
    read_policy: R,
    /// Write policy
    write_policy: W,
    /// Refill policy
    refill_policy: RefillPolicy,
    /// Label of this backend for source path composition
    label: BackendLabel,
    /// Pre-computed metrics label for L1: "{label}.{l1.label()}"
    l1_label: SmolStr,
    /// Pre-computed metrics label for L2: "{label}.{l2.label()}"
    l2_label: SmolStr,
}

/// Helper to compose a metrics label: "{prefix}.{suffix}"
#[inline]
fn compose_label(prefix: &str, suffix: &str) -> SmolStr {
    SmolStr::from(format!("{}.{}", prefix, suffix))
}

impl<L1, L2, O> CompositionBackend<L1, L2, O, SequentialReadPolicy, OptimisticParallelWritePolicy>
where
    L1: Backend,
    L2: Backend,
    O: Offload<'static>,
{
    /// Creates a new composition backend with two layers using default policies.
    ///
    /// Default policies:
    /// - Read: `SequentialReadPolicy` (try L1 first, then L2)
    /// - Write: `OptimisticParallelWritePolicy` (write to both, succeed if ≥1 succeeds)
    /// - Refill: `RefillPolicy::Never` (do not populate L1 after L2 hit)
    ///
    /// # Arguments
    /// * `l1` - First-layer backend (checked first on reads)
    /// * `l2` - Second-layer backend (checked if L1 misses)
    /// * `offload` - Offload manager for background tasks (e.g., race policy losers)
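    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `moka` and `redis` backends and an `offload`
    /// manager are already constructed:
    ///
    /// ```ignore
    /// let cache = CompositionBackend::new(moka, redis, offload);
    /// ```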
    pub fn new(l1: L1, l2: L2, offload: O) -> Self {
        let label = BackendLabel::new_static("composition");
        let l1_label = compose_label(label.as_str(), l1.label().as_str());
        let l2_label = compose_label(label.as_str(), l2.label().as_str());
        let format = CompositionFormat::new(
            Arc::new(l1.value_format().clone_box()),
            Arc::new(l2.value_format().clone_box()),
            Arc::new(l1.compressor().clone_box()),
            Arc::new(l2.compressor().clone_box()),
            l1_label.clone(),
            l2_label.clone(),
        );
        Self {
            l1,
            l2,
            format,
            offload,
            read_policy: SequentialReadPolicy::new(),
            write_policy: OptimisticParallelWritePolicy::new(),
            refill_policy: RefillPolicy::default(),
            label,
            l1_label,
            l2_label,
        }
    }
}

impl<L1, L2, O, R, W> CompositionBackend<L1, L2, O, R, W>
where
    L1: Backend,
    L2: Backend,
    O: Offload<'static>,
    R: CompositionReadPolicy,
    W: CompositionWritePolicy,
{
    /// Returns a reference to the read policy.
    pub fn read_policy(&self) -> &R {
        &self.read_policy
    }

    /// Returns a reference to the write policy.
    pub fn write_policy(&self) -> &W {
        &self.write_policy
    }

    /// Returns a reference to the refill policy.
    pub fn refill_policy(&self) -> &RefillPolicy {
        &self.refill_policy
    }

    /// Returns a reference to the offload manager.
    pub fn offload(&self) -> &O {
        &self.offload
    }

    /// Set a custom label for this backend.
    ///
    /// The label is used for source path composition in multi-layer caches.
    /// For example, with the label "cache" and an L1 backend labeled "moka",
    /// the resulting source path might be "cache.moka".
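    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `moka`, `redis`, and `offload` are already constructed:
    ///
    /// ```ignore
    /// let backend = CompositionBackend::new(moka, redis, offload)
    ///     .label("cache");
    /// ```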
    pub fn label(mut self, label: impl Into<BackendLabel>) -> Self {
        self.label = label.into();
        // Recalculate labels with new label
        self.l1_label = compose_label(self.label.as_str(), self.l1.label().as_str());
        self.l2_label = compose_label(self.label.as_str(), self.l2.label().as_str());
        // Update format labels too
        self.format
            .set_labels(self.l1_label.clone(), self.l2_label.clone());
        self
    }

    /// Set all policies at once using the [`CompositionPolicy`] builder.
    ///
    /// This is the preferred way to configure multiple policies.
    ///
    /// # Example
    /// ```ignore
    /// use hitbox_backend::{CompositionBackend, composition::CompositionPolicy};
    /// use hitbox_backend::composition::policy::{RaceReadPolicy, SequentialWritePolicy, RefillPolicy};
    ///
    /// let policy = CompositionPolicy::new()
    ///     .read(RaceReadPolicy::new())
    ///     .write(SequentialWritePolicy::new())
    ///     .refill(RefillPolicy::Always);
    ///
    /// let backend = CompositionBackend::new(l1, l2, offload)
    ///     .with_policy(policy);
    /// ```
    pub fn with_policy<NewR, NewW>(
        self,
        policy: CompositionPolicy<NewR, NewW>,
    ) -> CompositionBackend<L1, L2, O, NewR, NewW>
    where
        NewR: CompositionReadPolicy,
        NewW: CompositionWritePolicy,
    {
        CompositionBackend {
            l1: self.l1,
            l2: self.l2,
            format: self.format,
            offload: self.offload,
            read_policy: policy.read,
            write_policy: policy.write,
            refill_policy: policy.refill,
            label: self.label,
            l1_label: self.l1_label,
            l2_label: self.l2_label,
        }
    }

    /// Set the read policy (builder pattern).
    ///
    /// This consumes the backend and returns a new one with the updated read policy.
    ///
    /// # Example
    /// ```ignore
    /// use hitbox_backend::CompositionBackend;
    /// use hitbox_backend::composition::policy::RaceReadPolicy;
    ///
    /// let backend = CompositionBackend::new(l1, l2, offload)
    ///     .read(RaceReadPolicy::new());
    /// ```
    pub fn read<NewR: CompositionReadPolicy>(
        self,
        read_policy: NewR,
    ) -> CompositionBackend<L1, L2, O, NewR, W> {
        CompositionBackend {
            l1: self.l1,
            l2: self.l2,
            format: self.format,
            offload: self.offload,
            read_policy,
            write_policy: self.write_policy,
            refill_policy: self.refill_policy,
            label: self.label,
            l1_label: self.l1_label,
            l2_label: self.l2_label,
        }
    }

    /// Set the write policy (builder pattern).
    ///
    /// This consumes the backend and returns a new one with the updated write policy.
    ///
    /// # Example
    /// ```ignore
    /// use hitbox_backend::CompositionBackend;
    /// use hitbox_backend::composition::policy::SequentialWritePolicy;
    ///
    /// let backend = CompositionBackend::new(l1, l2, offload)
    ///     .write(SequentialWritePolicy::new());
    /// ```
    pub fn write<NewW: CompositionWritePolicy>(
        self,
        write_policy: NewW,
    ) -> CompositionBackend<L1, L2, O, R, NewW> {
        CompositionBackend {
            l1: self.l1,
            l2: self.l2,
            format: self.format,
            offload: self.offload,
            read_policy: self.read_policy,
            write_policy,
            refill_policy: self.refill_policy,
            label: self.label,
            l1_label: self.l1_label,
            l2_label: self.l2_label,
        }
    }

    /// Set the refill policy (builder pattern).
    ///
    /// This consumes the backend and returns a new one with the updated refill policy.
    ///
    /// # Example
    /// ```ignore
    /// use hitbox_backend::CompositionBackend;
    /// use hitbox_backend::composition::policy::RefillPolicy;
    ///
    /// let backend = CompositionBackend::new(l1, l2, offload)
    ///     .refill(RefillPolicy::Always);
    /// ```
    pub fn refill(mut self, refill_policy: RefillPolicy) -> Self {
        self.refill_policy = refill_policy;
        self
    }
}

impl<L1, L2, O, R, W> Clone for CompositionBackend<L1, L2, O, R, W>
where
    L1: Clone + Backend,
    L2: Clone + Backend,
    O: Offload<'static>,
    R: Clone + CompositionReadPolicy,
    W: Clone + CompositionWritePolicy,
{
    fn clone(&self) -> Self {
        Self {
            l1: self.l1.clone(),
            l2: self.l2.clone(),
            format: self.format.clone(),
            offload: self.offload.clone(),
            read_policy: self.read_policy.clone(),
            write_policy: self.write_policy.clone(),
            refill_policy: self.refill_policy,
            label: self.label.clone(),
            l1_label: self.l1_label.clone(),
            l2_label: self.l2_label.clone(),
        }
    }
}

impl<L1, L2, O, R, W> std::fmt::Debug for CompositionBackend<L1, L2, O, R, W>
where
    L1: std::fmt::Debug + Backend,
    L2: std::fmt::Debug + Backend,
    O: std::fmt::Debug + Offload<'static>,
    R: std::fmt::Debug + CompositionReadPolicy,
    W: std::fmt::Debug + CompositionWritePolicy,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CompositionBackend")
            .field("label", &self.label)
            .field("l1", &self.l1)
            .field("l2", &self.l2)
            .field("format", &self.format)
            .field("offload", &self.offload)
            .field("read_policy", &self.read_policy)
            .field("write_policy", &self.write_policy)
            .field("refill_policy", &self.refill_policy)
            .finish()
    }
}

// Backend implementation for CompositionBackend
// This implementation packs/unpacks CompositionEnvelope to enable
// use as Box<dyn Backend> trait object
//
// PERFORMANCE NOTE: Negligible overhead - only metadata (expire/stale timestamps + envelope
// discriminant) is serialized using bitcode. The already-serialized cached data (Bytes) is
// copied into the buffer as-is without re-serialization. When using CompositionBackend
// directly via CacheBackend::get/set, even this minimal envelope overhead is avoided.
#[async_trait]
impl<L1, L2, O, R, W> Backend for CompositionBackend<L1, L2, O, R, W>
where
    L1: Backend + Clone + Send + Sync + 'static,
    L2: Backend + Clone + Send + Sync + 'static,
    O: Offload<'static>,
    R: CompositionReadPolicy,
    W: CompositionWritePolicy,
{
    #[tracing::instrument(skip(self), level = "trace")]
    async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
        // Clone backends for 'static closures
        let l1 = self.l1.clone();
        let l2 = self.l2.clone();
        // Use pre-computed labels (no allocation)
        let l1_label = self.l1_label.clone();
        let l2_label = self.l2_label.clone();

        let read_l1_with_envelope = |k: CacheKey| async move {
            let ctx: BoxContext = CacheContext::default().boxed();
            let timer = Timer::new();
            let read_result = l1.read(&k).await;
            crate::metrics::record_read(&l1_label, timer.elapsed());

            let result = match read_result {
                Ok(Some(l1_value)) => {
                    crate::metrics::record_read_bytes(&l1_label, l1_value.data().len());
                    let (expire, stale) = (l1_value.expire(), l1_value.stale());
                    let envelope = CompositionEnvelope::L1(l1_value);
                    match envelope.serialize() {
                        Ok(packed) => Ok(Some(CacheValue::new(packed, expire, stale))),
                        Err(e) => Err(e),
                    }
                }
                Ok(None) => Ok(None),
                Err(e) => {
                    crate::metrics::record_read_error(&l1_label);
                    Err(e)
                }
            };
            (result, ctx)
        };

        let read_l2_with_envelope = |k: CacheKey| async move {
            let ctx: BoxContext = CacheContext::default().boxed();
            let timer = Timer::new();
            let read_result = l2.read(&k).await;
            crate::metrics::record_read(&l2_label, timer.elapsed());

            let result = match read_result {
                Ok(Some(l2_value)) => {
                    crate::metrics::record_read_bytes(&l2_label, l2_value.data().len());
                    let (expire, stale) = (l2_value.expire(), l2_value.stale());
                    let envelope = CompositionEnvelope::L2(l2_value);
                    match envelope.serialize() {
                        Ok(packed) => Ok(Some(CacheValue::new(packed, expire, stale))),
                        Err(e) => Err(e),
                    }
                }
                Ok(None) => Ok(None),
                Err(e) => {
                    crate::metrics::record_read_error(&l2_label);
                    Err(e)
                }
            };
            (result, ctx)
        };

        let ReadResult { value, .. } = self
            .read_policy
            .execute_with(
                key.clone(),
                read_l1_with_envelope,
                read_l2_with_envelope,
                &self.offload,
            )
            .await?;

        // No context creation - Format will extract context from envelope during deserialization
        Ok(value)
    }

    #[tracing::instrument(skip(self, value), level = "trace")]
    async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
        // Unpack CompositionEnvelope using zero-copy format
        let composition = CompositionEnvelope::deserialize(value.data())?;

        // Write to the appropriate layers.
        // In normal usage via CacheBackend::set, this is always the `Both` variant;
        // the L1/L2 branches are defensive code for edge cases.
        match composition {
            CompositionEnvelope::Both { l1, l2 } => {
                // Clone backends for 'static closures
                let l1_backend = self.l1.clone();
                let l2_backend = self.l2.clone();
                // Use pre-computed labels (no allocation)
                let l1_label = self.l1_label.clone();
                let l2_label = self.l2_label.clone();
                let l1_len = l1.data().len();
                let l2_len = l2.data().len();

                let write_l1 = |k: CacheKey| async move {
                    let timer = Timer::new();
                    let result = l1_backend.write(&k, l1).await;
                    crate::metrics::record_write(&l1_label, timer.elapsed());
                    match &result {
                        Ok(()) => crate::metrics::record_write_bytes(&l1_label, l1_len),
                        Err(_) => crate::metrics::record_write_error(&l1_label),
                    }
                    result
                };
                let write_l2 = |k: CacheKey| async move {
                    let timer = Timer::new();
                    let result = l2_backend.write(&k, l2).await;
                    crate::metrics::record_write(&l2_label, timer.elapsed());
                    match &result {
                        Ok(()) => crate::metrics::record_write_bytes(&l2_label, l2_len),
                        Err(_) => crate::metrics::record_write_error(&l2_label),
                    }
                    result
                };

                self.write_policy
                    .execute_with(key.clone(), write_l1, write_l2, &self.offload)
                    .await
            }
            CompositionEnvelope::L1(l1) => {
                let l1_len = l1.data().len();
                let timer = Timer::new();
                let result = self.l1.write(key, l1).await;
                crate::metrics::record_write(&self.l1_label, timer.elapsed());
                match &result {
                    Ok(()) => crate::metrics::record_write_bytes(&self.l1_label, l1_len),
                    Err(_) => crate::metrics::record_write_error(&self.l1_label),
                }
                result
            }
            CompositionEnvelope::L2(l2) => {
                let l2_len = l2.data().len();
                let timer = Timer::new();
                let result = self.l2.write(key, l2).await;
                crate::metrics::record_write(&self.l2_label, timer.elapsed());
                match &result {
                    Ok(()) => crate::metrics::record_write_bytes(&self.l2_label, l2_len),
                    Err(_) => crate::metrics::record_write_error(&self.l2_label),
                }
                result
            }
        }
    }

    #[tracing::instrument(skip(self), level = "trace")]
    async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
        // Delete from both layers in parallel for better performance
        let (l1_result, l2_result) = futures::join!(self.l1.remove(key), self.l2.remove(key));

        match (l1_result, l2_result) {
            (Err(e1), Err(e2)) => {
                tracing::error!(l1_error = ?e1, l2_error = ?e2, "Both L1 and L2 delete failed");
                Err(BackendError::InternalError(Box::new(
                    CompositionError::BothLayersFailed { l1: e1, l2: e2 },
                )))
            }
            (Err(e), Ok(status)) => {
                tracing::warn!(error = ?e, "L1 delete failed");
                Ok(status)
            }
            (Ok(status), Err(e)) => {
                tracing::warn!(error = ?e, "L2 delete failed");
                Ok(status)
            }
            (Ok(DeleteStatus::Deleted(n1)), Ok(DeleteStatus::Deleted(n2))) => {
                Ok(DeleteStatus::Deleted(n1 + n2))
            }
            (Ok(DeleteStatus::Deleted(n)), Ok(DeleteStatus::Missing))
            | (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Deleted(n))) => {
                Ok(DeleteStatus::Deleted(n))
            }
            (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Missing)) => Ok(DeleteStatus::Missing),
        }
    }

    fn label(&self) -> BackendLabel {
        self.label.clone()
    }

    fn value_format(&self) -> &dyn Format {
        &self.format
    }

    fn key_format(&self) -> &CacheKeyFormat {
        &CacheKeyFormat::Bitcode
    }

    fn compressor(&self) -> &dyn Compressor {
        &PassthroughCompressor
    }
}

impl<L1, L2, O, R, W> CacheBackend for CompositionBackend<L1, L2, O, R, W>
where
    L1: CacheBackend + Clone + Send + Sync + 'static,
    L2: CacheBackend + Clone + Send + Sync + 'static,
    O: Offload<'static>,
    R: CompositionReadPolicy,
    W: CompositionWritePolicy,
{
    #[tracing::instrument(skip(self, ctx), level = "trace")]
    async fn get<T>(
        &self,
        key: &CacheKey,
        ctx: &mut BoxContext,
    ) -> BackendResult<Option<CacheValue<T::Cached>>>
    where
        T: CacheableResponse,
        T::Cached: Cacheable,
    {
        // Clone backends for 'static closures
        let l1 = self.l1.clone();
        let l2 = self.l2.clone();

        // Use pre-computed composed labels for metrics
        let l1_label = self.l1_label.clone();
        let l2_label = self.l2_label.clone();

        // Use inner backend labels for source path (merge_from adds composition prefix)
        let l1_name = l1.label();
        let l2_name = l2.label();

        // Clone format for each closure
        let format_for_l1 = self.format.clone();
        let format_for_l2 = self.format.clone();

        // Clone context for internal L1/L2 operations
        let l1_ctx = ctx.clone_box();
        let l2_ctx = ctx.clone_box();

        let read_l1 = |k: CacheKey| async move {
            let mut internal_ctx = l1_ctx;

            // Read raw bytes from L1 with metrics
            let read_timer = Timer::new();
            let read_result = l1.read(&k).await;
            crate::metrics::record_read(&l1_label, read_timer.elapsed());

            let result = match read_result {
                Ok(Some(raw_value)) => {
                    let (meta, raw_data) = raw_value.into_parts();
                    crate::metrics::record_read_bytes(&l1_label, raw_data.len());

                    // Deserialize using CompositionFormat (records decompress/deserialize metrics)
                    let mut deserialized_opt: Option<T::Cached> = None;
                    match format_for_l1.deserialize_layer(
                        &raw_data,
                        CompositionLayer::L1,
                        &mut |deserializer| {
                            let value: T::Cached = deserializer.deserialize()?;
                            deserialized_opt = Some(value);
                            Ok(())
                        },
                        &mut internal_ctx,
                    ) {
                        Ok(()) => match deserialized_opt {
                            Some(deserialized) => {
                                // Set cache status
                                internal_ctx.set_status(CacheStatus::Hit);

                                // Get source from context (handles nested compositions)
                                // If context was upgraded to CompositionContext, extract source from it
                                let source = if let Some(comp_ctx) =
                                    internal_ctx.as_any().downcast_ref::<CompositionContext>()
                                {
                                    // Nested composition: get label from inner format
                                    BackendLabel::from(
                                        comp_ctx.format.label_for_layer(comp_ctx.layer).clone(),
                                    )
                                } else {
                                    // Simple backend: use backend name
                                    l1_name.clone()
                                };
                                internal_ctx.set_source(ResponseSource::Backend(source));

                                Ok(Some(CacheValue::new(deserialized, meta.expire, meta.stale)))
                            }
                            None => Err(BackendError::InternalError(Box::new(
                                std::io::Error::other("deserialization produced no result"),
                            ))),
                        },
                        Err(e) => Err(BackendError::InternalError(Box::new(e))),
                    }
                }
                Ok(None) => Ok(None),
                Err(e) => {
                    crate::metrics::record_read_error(&l1_label);
                    Err(e)
                }
            };

            (result, internal_ctx)
        };

        let read_l2 = |k: CacheKey| async move {
            let mut internal_ctx = l2_ctx;

            // Read raw bytes from L2 with metrics
            let read_timer = Timer::new();
            let read_result = l2.read(&k).await;
            crate::metrics::record_read(&l2_label, read_timer.elapsed());

            let result = match read_result {
                Ok(Some(raw_value)) => {
                    let (meta, raw_data) = raw_value.into_parts();
                    crate::metrics::record_read_bytes(&l2_label, raw_data.len());

                    // Deserialize using CompositionFormat (records decompress/deserialize metrics)
                    // Note: deserialize_layer upgrades context to CompositionContext with L2 layer,
                    // which sets ReadMode::Refill - CacheFuture will handle the actual refill
                    let mut deserialized_opt: Option<T::Cached> = None;
                    match format_for_l2.deserialize_layer(
                        &raw_data,
                        CompositionLayer::L2,
                        &mut |deserializer| {
                            let value: T::Cached = deserializer.deserialize()?;
                            deserialized_opt = Some(value);
                            Ok(())
                        },
                        &mut internal_ctx,
                    ) {
                        Ok(()) => match deserialized_opt {
                            Some(deserialized) => {
                                let cache_value =
                                    CacheValue::new(deserialized, meta.expire, meta.stale);

                                // Set cache status and source for L2 hit
                                internal_ctx.set_status(CacheStatus::Hit);

                                // Get source from context (handles nested compositions)
                                // If context was upgraded to CompositionContext, extract source from it
                                let source = if let Some(comp_ctx) =
                                    internal_ctx.as_any().downcast_ref::<CompositionContext>()
                                {
                                    // Nested composition: get label from inner format
                                    BackendLabel::from(
                                        comp_ctx.format.label_for_layer(comp_ctx.layer).clone(),
                                    )
                                } else {
                                    // Simple backend: use backend name
                                    l2_name.clone()
                                };
                                internal_ctx.set_source(ResponseSource::Backend(source));

                                Ok(Some(cache_value))
                            }
                            None => Err(BackendError::InternalError(Box::new(
                                std::io::Error::other("deserialization produced no result"),
                            ))),
                        },
                        Err(e) => Err(BackendError::InternalError(Box::new(e))),
                    }
                }
                Ok(None) => Ok(None),
                Err(e) => {
                    crate::metrics::record_read_error(&l2_label);
                    Err(e)
                }
            };

            (result, internal_ctx)
        };

        let ReadResult {
            value,
            source,
            context: inner_ctx,
        } = self
            .read_policy
            .execute_with(key.clone(), read_l1, read_l2, &self.offload)
            .await?;

        // Merge inner context into outer context, composing source paths
        if let Some(ref _cache_value) = value {
            ctx.merge_from(&*inner_ctx, &self.label);

            // If L2 hit and refill policy is Always, set ReadMode::Refill
            // CacheFuture will handle the actual refill via set()
            if source == CompositionLayer::L2 && self.refill_policy == RefillPolicy::Always {
                ctx.set_read_mode(hitbox_core::ReadMode::Refill);
            }
        }

        Ok(value)
    }

    #[tracing::instrument(skip(self, value, ctx), level = "trace")]
    async fn set<T>(
        &self,
        key: &CacheKey,
        value: &CacheValue<T::Cached>,
        ctx: &mut BoxContext,
    ) -> BackendResult<()>
    where
        T: CacheableResponse,
        T::Cached: Cacheable,
    {
        use hitbox_core::ReadMode;

        // Check if this is a refill operation (triggered by CacheFuture after L2 hit)
        // This happens when CacheBackend::get() sets ReadMode::Refill
        if ctx.read_mode() == ReadMode::Refill {
            match self.refill_policy {
                RefillPolicy::Always => {
                    // Refill L1 only - write serialized data to L1
                    let l1_bytes = self
                        .format
                        .serialize_layer(
                            CompositionLayer::L1,
                            &mut |serializer| {
                                serializer.serialize(value.data())?;
                                Ok(())
                            },
                            &**ctx,
                        )
                        .map_err(|e| BackendError::InternalError(Box::new(e)))?;

                    let l1_len = l1_bytes.len();
                    let l1_value = CacheValue::new(l1_bytes, value.expire(), value.stale());

                    // Write to L1 with metrics
                    let timer = Timer::new();
                    let result = self.l1.write(key, l1_value).await;
                    crate::metrics::record_write(&self.l1_label, timer.elapsed());
                    match &result {
                        Ok(()) => crate::metrics::record_write_bytes(&self.l1_label, l1_len),
                        Err(_) => crate::metrics::record_write_error(&self.l1_label),
                    }
                    result?;

                    // Recursively call L2.set() for nested refill
                    // L2 (if it's a CompositionBackend) will handle its own refill logic
                    return self.l2.set::<T>(key, value, ctx).await;
                }
                RefillPolicy::Never => {
                    // With Never policy, don't refill at all
                    // L2 already has the data (it's the source), so skip write
                    return Ok(());
                }
            }
        }

        // Check if this is a nested refill operation via CompositionContext
        // Each CompositionContext wraps an inner context and tracks which layer provided data
        if let Some(comp_ctx) = ctx.as_any().downcast_ref::<CompositionContext>()
            && comp_ctx.layer == CompositionLayer::L2
        {
            match self.refill_policy {
                RefillPolicy::Always => {
                    // This level needs refill: write to L1 only
                    let l1_bytes = self
                        .format
                        .serialize_layer(
                            CompositionLayer::L1,
                            &mut |serializer| {
                                serializer.serialize(value.data())?;
                                Ok(())
                            },
                            &**ctx,
                        )
                        .map_err(|e| BackendError::InternalError(Box::new(e)))?;

                    let l1_len = l1_bytes.len();
                    let l1_value = CacheValue::new(l1_bytes, value.expire(), value.stale());

                    // Write to L1 with metrics
                    let timer = Timer::new();
                    let result = self.l1.write(key, l1_value).await;
                    crate::metrics::record_write(&self.l1_label, timer.elapsed());
                    match &result {
                        Ok(()) => crate::metrics::record_write_bytes(&self.l1_label, l1_len),
                        Err(_) => crate::metrics::record_write_error(&self.l1_label),
                    }
                    result?;

                    // Recursively call L2.set() with inner context for nested refill
                    // Inner context may be another CompositionContext (nested) or CacheContext (leaf)
                    let mut inner_ctx = comp_ctx.inner().clone_box();
                    return self.l2.set::<T>(key, value, &mut inner_ctx).await;
                }
                RefillPolicy::Never => {
                    // Skip L1 write (no refill), but recurse to L2 for nested handling
                    let mut inner_ctx = comp_ctx.inner().clone_box();
                    return self.l2.set::<T>(key, value, &mut inner_ctx).await;
                }
            }
        }

        // Normal mode: write to both layers
        // Serialize for both layers using CompositionFormat
        // This handles same-format optimization and records metrics with composed labels
        let (l1_bytes, l2_bytes) = self
            .format
            .serialize_parts(
                &mut |serializer| {
                    serializer.serialize(value.data())?;
                    Ok(())
                },
                &**ctx,
            )
            .map_err(|e| BackendError::InternalError(Box::new(e)))?;

        let l1_len = l1_bytes.len();
        let l2_len = l2_bytes.len();

        // Create raw values for Backend::write
        let l1_value = CacheValue::new(l1_bytes, value.expire(), value.stale());
        let l2_value = CacheValue::new(l2_bytes, value.expire(), value.stale());

        // Clone backends for 'static closures
        let l1 = self.l1.clone();
        let l2 = self.l2.clone();

        // Use pre-computed composed labels
        let l1_label = self.l1_label.clone();
        let l2_label = self.l2_label.clone();

        // Write closures using Backend::write directly with composed labels
        let write_l1 = |k: CacheKey| async move {
            let timer = Timer::new();
            let result = l1.write(&k, l1_value).await;
            crate::metrics::record_write(&l1_label, timer.elapsed());
            match &result {
                Ok(()) => crate::metrics::record_write_bytes(&l1_label, l1_len),
                Err(_) => crate::metrics::record_write_error(&l1_label),
            }
            result
        };

        let write_l2 = |k: CacheKey| async move {
            let timer = Timer::new();
            let result = l2.write(&k, l2_value).await;
            crate::metrics::record_write(&l2_label, timer.elapsed());
            match &result {
                Ok(()) => crate::metrics::record_write_bytes(&l2_label, l2_len),
                Err(_) => crate::metrics::record_write_error(&l2_label),
            }
            result
        };

        self.write_policy
            .execute_with(key.clone(), write_l1, write_l2, &self.offload)
            .await
    }

    #[tracing::instrument(skip(self, ctx), level = "trace")]
    async fn delete(&self, key: &CacheKey, ctx: &mut BoxContext) -> BackendResult<DeleteStatus> {
        // Delete from both layers in parallel for better performance
        let mut l1_ctx = ctx.clone_box();
        let mut l2_ctx = ctx.clone_box();
        let (l1_result, l2_result) = futures::join!(
            self.l1.delete(key, &mut l1_ctx),
            self.l2.delete(key, &mut l2_ctx)
        );

        // Aggregate results
        match (l1_result, l2_result) {
            (Err(e1), Err(e2)) => {
                tracing::error!(l1_error = ?e1, l2_error = ?e2, "Both L1 and L2 delete failed");
                Err(BackendError::InternalError(Box::new(
                    CompositionError::BothLayersFailed { l1: e1, l2: e2 },
                )))
            }
            (Err(e), Ok(status)) => {
                tracing::warn!(error = ?e, "L1 delete failed");
                Ok(status)
            }
            (Ok(status), Err(e)) => {
                tracing::warn!(error = ?e, "L2 delete failed");
                Ok(status)
            }
            (Ok(DeleteStatus::Deleted(n1)), Ok(DeleteStatus::Deleted(n2))) => {
                tracing::trace!("Deleted from both L1 and L2");
                Ok(DeleteStatus::Deleted(n1 + n2))
            }
            (Ok(DeleteStatus::Deleted(n)), Ok(DeleteStatus::Missing))
            | (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Deleted(n))) => {
                tracing::trace!("Deleted from one layer");
                Ok(DeleteStatus::Deleted(n))
            }
            (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Missing)) => {
                tracing::trace!("Key missing from both layers");
                Ok(DeleteStatus::Missing)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::{Format, JsonFormat};
    use crate::{Backend, CacheKeyFormat, Compressor, PassthroughCompressor};
    use async_trait::async_trait;
    use chrono::Utc;
    use hitbox_core::{
        BoxContext, CacheContext, CachePolicy, CacheStatus, CacheValue, CacheableResponse,
        EntityPolicyConfig, Predicate, Raw, ResponseSource,
    };
    use serde::{Deserialize, Serialize};
    use smol_str::SmolStr;
    use std::collections::HashMap;
    use std::future::Future;
    use std::sync::{Arc, Mutex};

    #[cfg(feature = "rkyv_format")]
    use rkyv::{Archive, Serialize as RkyvSerialize};

    /// Test offload that spawns tasks with tokio::spawn
    #[derive(Clone, Debug)]
    struct TestOffload;

    impl Offload<'static> for TestOffload {
        #[allow(deprecated)]
        fn spawn<F>(&self, _kind: impl Into<SmolStr>, future: F)
        where
            F: Future<Output = ()> + Send + 'static,
        {
            tokio::spawn(future);
        }
    }

    // Simple in-memory backend for testing
    #[derive(Clone, Debug)]
    struct TestBackend {
        store: Arc<Mutex<HashMap<CacheKey, CacheValue<Raw>>>>,
        backend_label: &'static str,
    }

    impl TestBackend {
        fn new() -> Self {
            Self {
                store: Arc::new(Mutex::new(HashMap::new())),
                backend_label: "test",
            }
        }

        fn with_label(label: &'static str) -> Self {
            Self {
                store: Arc::new(Mutex::new(HashMap::new())),
                backend_label: label,
            }
        }
    }

    #[async_trait]
    impl Backend for TestBackend {
        async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
            Ok(self.store.lock().unwrap().get(key).cloned())
        }

        async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
            self.store.lock().unwrap().insert(key.clone(), value);
            Ok(())
        }

        async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
            match self.store.lock().unwrap().remove(key) {
                Some(_) => Ok(DeleteStatus::Deleted(1)),
                None => Ok(DeleteStatus::Missing),
            }
        }

        fn label(&self) -> BackendLabel {
            BackendLabel::new(self.backend_label)
        }

        fn value_format(&self) -> &dyn Format {
            &JsonFormat
        }

        fn key_format(&self) -> &CacheKeyFormat {
            &CacheKeyFormat::Bitcode
        }

        fn compressor(&self) -> &dyn Compressor {
            &PassthroughCompressor
        }
    }

    impl CacheBackend for TestBackend {}

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    #[cfg_attr(
        feature = "rkyv_format",
        derive(Archive, RkyvSerialize, rkyv::Deserialize)
    )]
    struct CachedData {
        value: String,
    }

    // Mock CacheableResponse for testing.
    // We only need the associated type; the actual methods are not used in these tests.
    struct MockResponse;

    // Note: This is a minimal implementation just for testing CacheBackend.
    // The methods are not actually called in these tests.
    impl CacheableResponse for MockResponse {
        type Cached = CachedData;
        type Subject = MockResponse;
        type IntoCachedFuture = std::future::Ready<CachePolicy<Self::Cached, Self>>;
        type FromCachedFuture = std::future::Ready<Self>;

        async fn cache_policy<P: Predicate<Subject = Self::Subject> + Send + Sync>(
            self,
            _predicate: P,
            _config: &EntityPolicyConfig,
        ) -> CachePolicy<CacheValue<Self::Cached>, Self> {
            unimplemented!("Not used in these tests")
        }

        fn into_cached(self) -> Self::IntoCachedFuture {
            unimplemented!("Not used in these tests")
        }

        fn from_cached(_cached: Self::Cached) -> Self::FromCachedFuture {
            unimplemented!("Not used in these tests")
        }
    }

1128    #[tokio::test]
1129    async fn test_l1_hit() {
1130        let l1 = TestBackend::with_label("moka");
1131        let l2 = TestBackend::with_label("redis");
1132        let backend = CompositionBackend::new(l1.clone(), l2, TestOffload).label("cache");
1133
1134        let key = CacheKey::from_str("test", "key1");
1135        let value = CacheValue::new(
1136            CachedData {
1137                value: "value1".to_string(),
1138            },
1139            Some(Utc::now() + chrono::Duration::seconds(60)),
1140            None,
1141        );
1142
1143        // Write to populate both layers
1144        let mut ctx: BoxContext = CacheContext::default().boxed();
1145        backend
1146            .set::<MockResponse>(&key, &value, &mut ctx)
1147            .await
1148            .unwrap();
1149
1150        // Read should hit L1
1151        let mut ctx: BoxContext = CacheContext::default().boxed();
1152        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1153        assert_eq!(result.unwrap().data().value, "value1");
1154
1155        // Verify source path is composed correctly: "cache.moka"
1156        assert_eq!(ctx.status(), CacheStatus::Hit);
1157        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.moka".into()));
1158    }
1159
1160    #[tokio::test]
1161    async fn test_l2_hit_sets_refill_mode() {
1162        use hitbox_core::ReadMode;
1163
1164        let l1 = TestBackend::with_label("moka");
1165        let l2 = TestBackend::with_label("redis");
1166
1167        let key = CacheKey::from_str("test", "key1");
1168        let value = CacheValue::new(
1169            CachedData {
1170                value: "value1".to_string(),
1171            },
1172            Some(Utc::now() + chrono::Duration::seconds(60)),
1173            None,
1174        );
1175
1176        // Backend with RefillPolicy::Always
1177        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload)
1178            .label("cache")
1179            .refill(RefillPolicy::Always);
1180
1181        // Write through CompositionBackend (populates both L1 and L2)
1182        let mut ctx: BoxContext = CacheContext::default().boxed();
1183        backend
1184            .set::<MockResponse>(&key, &value, &mut ctx)
1185            .await
1186            .unwrap();
1187
1188        // Clear L1 to simulate L1 miss scenario
1189        l1.store.lock().unwrap().clear();
1190
1191        // Read should hit L2 and set ReadMode::Refill
1192        let mut ctx: BoxContext = CacheContext::default().boxed();
1193        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1194        assert_eq!(result.unwrap().data().value, "value1");
1195
1196        // Verify source path is composed correctly: "cache.redis" (hit L2)
1197        assert_eq!(ctx.status(), CacheStatus::Hit);
1198        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.redis".into()));
1199
1200        // Verify ReadMode::Refill is set (CacheFuture will use this to call set())
1201        assert_eq!(ctx.read_mode(), ReadMode::Refill);
1202
1203        // L1 should NOT be populated yet (refill happens via CacheFuture.set())
1204        let mut ctx: BoxContext = CacheContext::default().boxed();
1205        let l1_result = l1.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1206        assert!(
1207            l1_result.is_none(),
1208            "L1 should not be populated directly by get()"
1209        );
1210    }
1211
1212    #[tokio::test]
1213    async fn test_miss_both_layers() {
1214        let l1 = TestBackend::new();
1215        let l2 = TestBackend::new();
1216        let backend = CompositionBackend::new(l1, l2, TestOffload);
1217
1218        let key = CacheKey::from_str("test", "nonexistent");
1219
1220        let mut ctx: BoxContext = CacheContext::default().boxed();
1221        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1222        assert!(result.is_none());
1223    }
1224
1225    #[tokio::test]
1226    async fn test_write_to_both_layers() {
1227        let l1 = TestBackend::new();
1228        let l2 = TestBackend::new();
1229
1230        let key = CacheKey::from_str("test", "key1");
1231        let value = CacheValue::new(
1232            CachedData {
1233                value: "value1".to_string(),
1234            },
1235            Some(Utc::now() + chrono::Duration::seconds(60)),
1236            None,
1237        );
1238
1239        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload);
1240
1241        let mut ctx: BoxContext = CacheContext::default().boxed();
1242        backend
1243            .set::<MockResponse>(&key, &value, &mut ctx)
1244            .await
1245            .unwrap();
1246
1247        // Verify both layers have the value
1248        let mut ctx: BoxContext = CacheContext::default().boxed();
1249        let l1_result = l1.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1250        assert_eq!(l1_result.unwrap().data().value, "value1");
1251
1252        let mut ctx: BoxContext = CacheContext::default().boxed();
1253        let l2_result = l2.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1254        assert_eq!(l2_result.unwrap().data().value, "value1");
1255    }
1256
1257    #[tokio::test]
1258    async fn test_delete_from_both_layers() {
1259        let l1 = TestBackend::new();
1260        let l2 = TestBackend::new();
1261
1262        let key = CacheKey::from_str("test", "key1");
1263        let value = CacheValue::new(
1264            CachedData {
1265                value: "value1".to_string(),
1266            },
1267            Some(Utc::now() + chrono::Duration::seconds(60)),
1268            None,
1269        );
1270
1271        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload);
1272
1273        // Write to both
1274        let mut ctx: BoxContext = CacheContext::default().boxed();
1275        backend
1276            .set::<MockResponse>(&key, &value, &mut ctx)
1277            .await
1278            .unwrap();
1279
1280        // Delete from both
1281        let mut ctx: BoxContext = CacheContext::default().boxed();
1282        let status = backend.delete(&key, &mut ctx).await.unwrap();
1283        assert_eq!(status, DeleteStatus::Deleted(2));
1284
1285        // Verify both layers no longer have the value
1286        let mut ctx: BoxContext = CacheContext::default().boxed();
1287        let l1_result = l1.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1288        assert!(l1_result.is_none());
1289
1290        let mut ctx: BoxContext = CacheContext::default().boxed();
1291        let l2_result = l2.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1292        assert!(l2_result.is_none());
1293    }
1294
1295    #[tokio::test]
1296    async fn test_clone() {
1297        let l1 = TestBackend::new();
1298        let l2 = TestBackend::new();
1299        let backend = CompositionBackend::new(l1, l2, TestOffload);
1300
1301        let cloned = backend.clone();
1302
1303        let key = CacheKey::from_str("test", "key1");
1304        let value = CacheValue::new(
1305            CachedData {
1306                value: "value1".to_string(),
1307            },
1308            Some(Utc::now() + chrono::Duration::seconds(60)),
1309            None,
1310        );
1311
1312        // Write via original
1313        let mut ctx: BoxContext = CacheContext::default().boxed();
1314        backend
1315            .set::<MockResponse>(&key, &value, &mut ctx)
1316            .await
1317            .unwrap();
1318
1319        // Read via clone should work (shared backends)
1320        let mut ctx: BoxContext = CacheContext::default().boxed();
1321        let result = cloned.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1322        assert_eq!(result.unwrap().data().value, "value1");
1323    }

    #[tokio::test]
    async fn test_nested_composition_source_path() {
        // Create a nested composition: outer(inner(l1, l2), l3)
        // to test hierarchical source paths like "outer.inner.moka"

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        // Inner composition: L1=moka, L2=redis
        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");

        // Outer composition: L1=inner, L2=disk
        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "nested");
        let value = CacheValue::new(
            CachedData {
                value: "nested_value".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Write only to innermost L1 (moka)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        l1.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Read through outer composition - should hit inner.L1 (moka)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "nested_value");

        // Verify nested source path: "outer.inner.moka"
        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(
            ctx.source(),
            &ResponseSource::Backend("outer.inner.moka".into())
        );
    }

    #[tokio::test]
    async fn test_nested_composition_l2_source_path() {
        // Test nested composition where hit comes from inner L2

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        // Inner composition: L1=moka, L2=redis
        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");

        // Outer composition: L1=inner, L2=disk
        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "nested_l2");
        let value = CacheValue::new(
            CachedData {
                value: "from_redis".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Write only to inner L2 (redis) - not to moka
        let mut ctx: BoxContext = CacheContext::default().boxed();
        l2.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Read through outer composition - should hit inner.L2 (redis)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "from_redis");

        // Verify nested source path: "outer.inner.redis"
        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(
            ctx.source(),
            &ResponseSource::Backend("outer.inner.redis".into())
        );
    }

    #[tokio::test]
    async fn test_nested_composition_outer_l2_source_path() {
        // Test nested composition where hit comes from outer L2 (disk)

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        // Inner composition: L1=moka, L2=redis
        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");

        // Outer composition: L1=inner, L2=disk
        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "outer_l2");
        let value = CacheValue::new(
            CachedData {
                value: "from_disk".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Write only to outer L2 (disk) - not to inner composition
        let mut ctx: BoxContext = CacheContext::default().boxed();
        l3.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Read through outer composition - should hit outer L2 (disk)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "from_disk");

        // Verify source path: "outer.disk"
        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("outer.disk".into()));
    }
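
    // Taken together, the three tests above document how source labels compose:
    // each CompositionBackend prefixes its own label onto the label reported by
    // the layer that produced the hit, so the innermost backend surfaces as
    // "outer.inner.moka", the inner L2 as "outer.inner.redis", and the outer L2
    // as "outer.disk".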

    #[tokio::test]
    async fn test_l1_hit_status() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let backend = CompositionBackend::new(l1.clone(), l2, TestOffload).label("cache");

        let key = CacheKey::from_str("test", "metrics1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Write directly to L1 backend to set up the test
        let mut ctx: BoxContext = CacheContext::default().boxed();
        l1.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Read through composition should hit L1
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "value1");

        // Verify status and source
        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.moka".into()));
    }
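
    // Illustrative sketch (not part of the original suite): the complementary
    // miss path. Reading a key that was never written should come back as `None`
    // through the composition; the exact `CacheStatus` reported on a miss is not
    // asserted here.
    #[tokio::test]
    async fn test_composition_miss_returns_none_sketch() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let backend = CompositionBackend::new(l1, l2, TestOffload).label("cache");

        let key = CacheKey::from_str("test", "never_written");

        // Neither layer holds the key, so the composition reports a miss
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert!(result.is_none());
    }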

    #[tokio::test]
    async fn test_l2_hit_with_refill_via_set() {
        use hitbox_core::ReadMode;

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");

        let key = CacheKey::from_str("test", "metrics2");
        let value = CacheValue::new(
            CachedData {
                value: "from_l2".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload)
            .label("cache")
            .refill(RefillPolicy::Always);

        // Write through CompositionBackend (populates both L1 and L2)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Clear L1 to simulate L1 miss scenario
        l1.store.lock().unwrap().clear();

        // Read should hit L2 and set ReadMode::Refill
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        let cached_value = result.unwrap();
        assert_eq!(cached_value.data().value, "from_l2");

        // Verify status and source - L2 hit
        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.redis".into()));
        assert_eq!(ctx.read_mode(), ReadMode::Refill);

        // Simulate CacheFuture calling set() with refill context (only writes to L1)
        backend
            .set::<MockResponse>(&key, &cached_value, &mut ctx)
            .await
            .unwrap();

        // Verify L1 was refilled - read again should hit L1
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "from_l2");
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.moka".into()));
    }
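
    // The test above documents the refill flow under RefillPolicy::Always: an L2
    // hit marks the context with ReadMode::Refill, the caller (e.g. CacheFuture)
    // then calls set() again with that context, and the composition writes the
    // value back to L1 only, so subsequent reads are served from L1.
    //
    // Illustrative sketch (not part of the original suite) of the contrasting
    // case: with the refill policy left at RefillPolicy::Never, an L2 hit is
    // assumed to leave the read mode untouched, so no write-back to L1 is
    // requested.
    #[tokio::test]
    async fn test_l2_hit_without_refill_sketch() {
        use hitbox_core::ReadMode;

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let backend = CompositionBackend::new(l1, l2.clone(), TestOffload)
            .label("cache")
            .refill(RefillPolicy::Never);

        let key = CacheKey::from_str("test", "no_refill");
        let value = CacheValue::new(
            CachedData {
                value: "from_l2".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Populate only L2, so the read below has to fall through L1
        let mut ctx: BoxContext = CacheContext::default().boxed();
        l2.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // L2 hit, but no refill is requested (assumption: read mode stays non-Refill)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "from_l2");
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.redis".into()));
        assert_ne!(ctx.read_mode(), ReadMode::Refill);
    }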

    #[tokio::test]
    async fn test_nested_composition_status() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");
        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "nested_metrics");
        let value = CacheValue::new(
            CachedData {
                value: "nested".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Write to innermost L1 (moka)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        l1.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Read through outer composition
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "nested");

        // Verify nested source path
        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(
            ctx.source(),
            &ResponseSource::Backend("outer.inner.moka".into())
        );
    }
}