hitbox_backend/composition/mod.rs

1//! Multi-tier caching by combining two backends.
2//!
3//! This backend implements a layered caching strategy where:
4//! - **L1** (first layer): Fast local cache (e.g., Moka)
5//! - **L2** (second layer): Distributed cache (e.g., Redis)
6//!
7//! # Policies
8//!
9//! Behavior is controlled by configurable policies. See the [`policy`] module for details.
10//!
11//! ## Read Policies
12//! - [`policy::SequentialReadPolicy`] - Try L1, then L2 on miss **(default)**
13//! - [`policy::RaceReadPolicy`] - Race both layers, return first hit
14//! - [`policy::ParallelReadPolicy`] - Query both in parallel, prefer fresher
15//!
16//! ## Write Policies
17//! - [`policy::SequentialWritePolicy`] - Write L1, then L2
18//! - [`policy::OptimisticParallelWritePolicy`] - Write both in parallel **(default)**
19//! - [`policy::RaceWritePolicy`] - Race both, background the slower
20//!
21//! ## Refill Policy
22//! - [`policy::RefillPolicy::Always`] - Populate L1 after L2 hit
23//! - [`policy::RefillPolicy::Never`] - Skip L1 population **(default)**
24//!
25//! # Example
26//!
27//! ```ignore
28//! use hitbox_backend::composition::{Compose, CompositionPolicy};
29//! use hitbox_backend::composition::policy::{RaceReadPolicy, RefillPolicy};
30//!
31//! // Default policies
32//! let cache = moka.compose(redis, offload);
33//!
34//! // Custom policies
35//! let policy = CompositionPolicy::new()
36//!     .read(RaceReadPolicy::new())
37//!     .refill(RefillPolicy::Always);
38//!
39//! let cache = moka.compose(redis, offload).with_policy(policy);
40//! ```
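//!
//! Compositions can also be nested, and each layer's label is prepended to the
//! reported source path (a hit in the innermost layer shows up as
//! "outer.inner.moka"). A minimal sketch, assuming `moka`, `redis`, and `disk`
//! backends plus a cloneable `offload` handle are already constructed:
//!
//! ```ignore
//! use hitbox_backend::CompositionBackend;
//!
//! // Inner tier: in-process cache in front of a distributed cache.
//! let inner = CompositionBackend::new(moka, redis, offload.clone()).label("inner");
//! // Outer tier: the inner composition in front of a disk cache.
//! let outer = CompositionBackend::new(inner, disk, offload).label("outer");
//! ```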
41
42pub mod compose;
43pub mod policy;
44
45mod context;
46mod envelope;
47mod format;
48
49pub use compose::Compose;
50pub use policy::CompositionPolicy;
51
52// Crate-internal re-exports from submodules (not part of the public API)
53pub(crate) use context::{CompositionContext, CompositionLayer};
54pub(crate) use format::CompositionFormat;
55
56use crate::format::Format;
57use crate::metrics::Timer;
58use crate::{
59    Backend, BackendError, BackendResult, CacheBackend, CacheKeyFormat, Compressor, DeleteStatus,
60    PassthroughCompressor,
61};
62use async_trait::async_trait;
63use envelope::CompositionEnvelope;
64use hitbox_core::{
65    BackendLabel, BoxContext, CacheContext, CacheKey, CacheStatus, CacheValue, Cacheable,
66    CacheableResponse, Offload, Raw, ResponseSource,
67};
68use policy::{
69    CompositionReadPolicy, CompositionWritePolicy, OptimisticParallelWritePolicy, ReadResult,
70    RefillPolicy, SequentialReadPolicy,
71};
72use smol_str::SmolStr;
73use std::sync::Arc;
74use thiserror::Error;
75
76/// Error type for composition backend operations.
77///
78/// This error type preserves errors from both cache layers for debugging,
79/// while keeping the implementation details encapsulated.
80#[derive(Debug, Error)]
81pub enum CompositionError {
82    /// Both L1 and L2 cache layers failed.
83    #[error("Both cache layers failed - L1: {l1}, L2: {l2}")]
84    BothLayersFailed {
85        /// Error from L1 layer
86        l1: BackendError,
87        /// Error from L2 layer
88        l2: BackendError,
89    },
90}
91
92/// A backend that composes two cache backends into a layered caching system.
93///
94/// On reads, the first backend (L1) is checked and, on a miss, the second
95/// backend (L2) is consulted. On writes, both backends are updated.
96///
97/// Each layer can use its own serialization format and compression since
98/// `CacheBackend` operates on typed data, not raw bytes.
99///
100/// Behavior can be customized via `CompositionReadPolicy`, `CompositionWritePolicy`, and `RefillPolicy` to control
101/// how reads, writes, and L1 refills are executed across the layers.
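///
/// # Example
///
/// A minimal sketch, assuming `moka` and `redis` backends and an `offload`
/// handle are already in scope:
///
/// ```ignore
/// use hitbox_backend::CompositionBackend;
///
/// // L1 = moka (fast, local), L2 = redis (distributed).
/// let cache = CompositionBackend::new(moka, redis, offload).label("cache");
/// ```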
102pub struct CompositionBackend<
103    L1,
104    L2,
105    O,
106    R = SequentialReadPolicy,
107    W = OptimisticParallelWritePolicy,
108> where
109    L1: Backend,
110    L2: Backend,
111    O: Offload<'static>,
112    R: CompositionReadPolicy,
113    W: CompositionWritePolicy,
114{
115    /// First-layer cache (typically fast, local)
116    l1: L1,
117    /// Second-layer cache (typically distributed, persistent)
118    l2: L2,
119    /// Composition format
120    format: CompositionFormat,
121    /// Offload for background tasks
122    offload: O,
123    /// Read policy
124    read_policy: R,
125    /// Write policy
126    write_policy: W,
127    /// Refill policy
128    refill_policy: RefillPolicy,
129    /// Label of this backend for source path composition
130    label: BackendLabel,
131    /// Pre-computed metrics label for L1: "{label}.{l1.label()}"
132    l1_label: SmolStr,
133    /// Pre-computed metrics label for L2: "{label}.{l2.label()}"
134    l2_label: SmolStr,
135}
136
137/// Helper to compose a metrics label: "{prefix}.{suffix}"
138#[inline]
139fn compose_label(prefix: &str, suffix: &str) -> SmolStr {
140    SmolStr::from(format!("{}.{}", prefix, suffix))
141}
142
143impl<L1, L2, O> CompositionBackend<L1, L2, O, SequentialReadPolicy, OptimisticParallelWritePolicy>
144where
145    L1: Backend,
146    L2: Backend,
147    O: Offload<'static>,
148{
149    /// Creates a new composition backend with two layers using default policies.
150    ///
151    /// Default policies:
152    /// - Read: `SequentialReadPolicy` (try L1 first, then L2)
153    /// - Write: `OptimisticParallelWritePolicy` (write to both in parallel; succeed if at least one layer succeeds)
154    /// - Refill: `RefillPolicy::Never` (do not populate L1 after L2 hit)
155    ///
156    /// # Arguments
157    /// * `l1` - First-layer backend (checked first on reads)
158    /// * `l2` - Second-layer backend (checked if L1 misses)
159    /// * `offload` - Offload manager for background tasks (e.g., race policy losers)
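    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `moka` and `redis` backends and an `offload`
    /// handle are in scope; the defaults can then be adjusted with the builder
    /// methods:
    ///
    /// ```ignore
    /// use hitbox_backend::CompositionBackend;
    /// use hitbox_backend::composition::policy::RefillPolicy;
    ///
    /// let cache = CompositionBackend::new(moka, redis, offload)
    ///     .refill(RefillPolicy::Always);
    /// ```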
160    pub fn new(l1: L1, l2: L2, offload: O) -> Self {
161        let label = BackendLabel::new_static("composition");
162        let l1_label = compose_label(label.as_str(), l1.label().as_str());
163        let l2_label = compose_label(label.as_str(), l2.label().as_str());
164        let format = CompositionFormat::new(
165            Arc::new(l1.value_format().clone_box()),
166            Arc::new(l2.value_format().clone_box()),
167            Arc::new(l1.compressor().clone_box()),
168            Arc::new(l2.compressor().clone_box()),
169            l1_label.clone(),
170            l2_label.clone(),
171        );
172        Self {
173            l1,
174            l2,
175            format,
176            offload,
177            read_policy: SequentialReadPolicy::new(),
178            write_policy: OptimisticParallelWritePolicy::new(),
179            refill_policy: RefillPolicy::default(),
180            label,
181            l1_label,
182            l2_label,
183        }
184    }
185}
186
187impl<L1, L2, O, R, W> CompositionBackend<L1, L2, O, R, W>
188where
189    L1: Backend,
190    L2: Backend,
191    O: Offload<'static>,
192    R: CompositionReadPolicy,
193    W: CompositionWritePolicy,
194{
195    /// Returns a reference to the read policy.
196    pub fn read_policy(&self) -> &R {
197        &self.read_policy
198    }
199
200    /// Returns a reference to the write policy.
201    pub fn write_policy(&self) -> &W {
202        &self.write_policy
203    }
204
205    /// Returns a reference to the refill policy.
206    pub fn refill_policy(&self) -> &RefillPolicy {
207        &self.refill_policy
208    }
209
210    /// Returns a reference to the offload manager.
211    pub fn offload(&self) -> &O {
212        &self.offload
213    }
214
215    /// Set a custom label for this backend.
216    ///
217    /// The label is used for source path composition in multi-layer caches.
218    /// For example, with label "cache" and an L1 backend labeled "moka", the source path becomes "cache.moka".
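    ///
    /// # Example
    ///
    /// A sketch assuming the L1 backend reports the label "moka":
    ///
    /// ```ignore
    /// use hitbox_backend::CompositionBackend;
    ///
    /// let cache = CompositionBackend::new(moka, redis, offload).label("cache");
    /// // An L1 hit is then reported with source "cache.moka".
    /// ```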
219    pub fn label(mut self, label: impl Into<BackendLabel>) -> Self {
220        self.label = label.into();
221        // Recalculate labels with new label
222        self.l1_label = compose_label(self.label.as_str(), self.l1.label().as_str());
223        self.l2_label = compose_label(self.label.as_str(), self.l2.label().as_str());
224        // Update format labels too
225        self.format
226            .set_labels(self.l1_label.clone(), self.l2_label.clone());
227        self
228    }
229
230    /// Set all policies at once using the `CompositionPolicy` builder.
231    ///
232    /// This is the preferred way to configure multiple policies.
233    ///
234    /// # Example
235    /// ```ignore
236    /// use hitbox_backend::{CompositionBackend, composition::CompositionPolicy};
237    /// use hitbox_backend::composition::policy::{RaceReadPolicy, SequentialWritePolicy, RefillPolicy};
238    ///
239    /// let policy = CompositionPolicy::new()
240    ///     .read(RaceReadPolicy::new())
241    ///     .write(SequentialWritePolicy::new())
242    ///     .refill(RefillPolicy::Always);
243    ///
244    /// let backend = CompositionBackend::new(l1, l2, offload)
245    ///     .with_policy(policy);
246    /// ```
247    pub fn with_policy<NewR, NewW>(
248        self,
249        policy: CompositionPolicy<NewR, NewW>,
250    ) -> CompositionBackend<L1, L2, O, NewR, NewW>
251    where
252        NewR: CompositionReadPolicy,
253        NewW: CompositionWritePolicy,
254    {
255        CompositionBackend {
256            l1: self.l1,
257            l2: self.l2,
258            format: self.format,
259            offload: self.offload,
260            read_policy: policy.read,
261            write_policy: policy.write,
262            refill_policy: policy.refill,
263            label: self.label,
264            l1_label: self.l1_label,
265            l2_label: self.l2_label,
266        }
267    }
268
269    /// Set the read policy (builder pattern).
270    ///
271    /// This consumes the backend and returns a new one with the updated read policy.
272    ///
273    /// # Example
274    /// ```ignore
275    /// use hitbox_backend::CompositionBackend;
276    /// use hitbox_backend::composition::policy::RaceReadPolicy;
277    ///
278    /// let backend = CompositionBackend::new(l1, l2, offload)
279    ///     .read(RaceReadPolicy::new());
280    /// ```
281    pub fn read<NewR: CompositionReadPolicy>(
282        self,
283        read_policy: NewR,
284    ) -> CompositionBackend<L1, L2, O, NewR, W> {
285        CompositionBackend {
286            l1: self.l1,
287            l2: self.l2,
288            format: self.format,
289            offload: self.offload,
290            read_policy,
291            write_policy: self.write_policy,
292            refill_policy: self.refill_policy,
293            label: self.label,
294            l1_label: self.l1_label,
295            l2_label: self.l2_label,
296        }
297    }
298
299    /// Set the write policy (builder pattern).
300    ///
301    /// This consumes the backend and returns a new one with the updated write policy.
302    ///
303    /// # Example
304    /// ```ignore
305    /// use hitbox_backend::CompositionBackend;
306    /// use hitbox_backend::composition::policy::SequentialWritePolicy;
307    ///
308    /// let backend = CompositionBackend::new(l1, l2, offload)
309    ///     .write(SequentialWritePolicy::new());
310    /// ```
311    pub fn write<NewW: CompositionWritePolicy>(
312        self,
313        write_policy: NewW,
314    ) -> CompositionBackend<L1, L2, O, R, NewW> {
315        CompositionBackend {
316            l1: self.l1,
317            l2: self.l2,
318            format: self.format,
319            offload: self.offload,
320            read_policy: self.read_policy,
321            write_policy,
322            refill_policy: self.refill_policy,
323            label: self.label,
324            l1_label: self.l1_label,
325            l2_label: self.l2_label,
326        }
327    }
328
329    /// Set the refill policy (builder pattern).
330    ///
331    /// This consumes the backend and returns a new one with the updated refill policy.
332    ///
333    /// # Example
334    /// ```ignore
335    /// use hitbox_backend::CompositionBackend;
336    /// use hitbox_backend::composition::policy::RefillPolicy;
337    ///
338    /// let backend = CompositionBackend::new(l1, l2, offload)
339    ///     .refill(RefillPolicy::Always);
340    /// ```
341    pub fn refill(mut self, refill_policy: RefillPolicy) -> Self {
342        self.refill_policy = refill_policy;
343        self
344    }
345}
346
347impl<L1, L2, O, R, W> Clone for CompositionBackend<L1, L2, O, R, W>
348where
349    L1: Clone + Backend,
350    L2: Clone + Backend,
351    O: Offload<'static>,
352    R: Clone + CompositionReadPolicy,
353    W: Clone + CompositionWritePolicy,
354{
355    fn clone(&self) -> Self {
356        Self {
357            l1: self.l1.clone(),
358            l2: self.l2.clone(),
359            format: self.format.clone(),
360            offload: self.offload.clone(),
361            read_policy: self.read_policy.clone(),
362            write_policy: self.write_policy.clone(),
363            refill_policy: self.refill_policy,
364            label: self.label.clone(),
365            l1_label: self.l1_label.clone(),
366            l2_label: self.l2_label.clone(),
367        }
368    }
369}
370
371impl<L1, L2, O, R, W> std::fmt::Debug for CompositionBackend<L1, L2, O, R, W>
372where
373    L1: std::fmt::Debug + Backend,
374    L2: std::fmt::Debug + Backend,
375    O: std::fmt::Debug + Offload<'static>,
376    R: std::fmt::Debug + CompositionReadPolicy,
377    W: std::fmt::Debug + CompositionWritePolicy,
378{
379    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
380        f.debug_struct("CompositionBackend")
381            .field("label", &self.label)
382            .field("l1", &self.l1)
383            .field("l2", &self.l2)
384            .field("format", &self.format)
385            .field("offload", &self.offload)
386            .field("read_policy", &self.read_policy)
387            .field("write_policy", &self.write_policy)
388            .field("refill_policy", &self.refill_policy)
389            .finish()
390    }
391}
392
393// Backend implementation for CompositionBackend
394// This implementation packs/unpacks CompositionEnvelope so the composition can be
395// used as a Box<dyn Backend> trait object.
396//
397// PERFORMANCE NOTE: Negligible overhead - only metadata (expire/stale timestamps + envelope
398// discriminant) is serialized using bitcode. The already-serialized cached data (Bytes) is
399// copied into the buffer as-is without re-serialization. When using CompositionBackend
400// directly via CacheBackend::get/set, even this minimal envelope overhead is avoided.
401#[async_trait]
402impl<L1, L2, O, R, W> Backend for CompositionBackend<L1, L2, O, R, W>
403where
404    L1: Backend + Clone + Send + Sync + 'static,
405    L2: Backend + Clone + Send + Sync + 'static,
406    O: Offload<'static>,
407    R: CompositionReadPolicy,
408    W: CompositionWritePolicy,
409{
410    #[tracing::instrument(skip(self), level = "trace")]
411    async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
412        // Clone backends for 'static closures
413        let l1 = self.l1.clone();
414        let l2 = self.l2.clone();
415        // Use pre-computed labels (no allocation)
416        let l1_label = self.l1_label.clone();
417        let l2_label = self.l2_label.clone();
418
419        let read_l1_with_envelope = |k: CacheKey| async move {
420            let ctx: BoxContext = CacheContext::default().boxed();
421            let timer = Timer::new();
422            let read_result = l1.read(&k).await;
423            crate::metrics::record_read(&l1_label, timer.elapsed());
424
425            let result = match read_result {
426                Ok(Some(l1_value)) => {
427                    crate::metrics::record_read_bytes(&l1_label, l1_value.data().len());
428                    let (expire, stale) = (l1_value.expire(), l1_value.stale());
429                    let envelope = CompositionEnvelope::L1(l1_value);
430                    match envelope.serialize() {
431                        Ok(packed) => Ok(Some(CacheValue::new(packed, expire, stale))),
432                        Err(e) => Err(e),
433                    }
434                }
435                Ok(None) => Ok(None),
436                Err(e) => {
437                    crate::metrics::record_read_error(&l1_label);
438                    Err(e)
439                }
440            };
441            (result, ctx)
442        };
443
444        let read_l2_with_envelope = |k: CacheKey| async move {
445            let ctx: BoxContext = CacheContext::default().boxed();
446            let timer = Timer::new();
447            let read_result = l2.read(&k).await;
448            crate::metrics::record_read(&l2_label, timer.elapsed());
449
450            let result = match read_result {
451                Ok(Some(l2_value)) => {
452                    crate::metrics::record_read_bytes(&l2_label, l2_value.data().len());
453                    let (expire, stale) = (l2_value.expire(), l2_value.stale());
454                    let envelope = CompositionEnvelope::L2(l2_value);
455                    match envelope.serialize() {
456                        Ok(packed) => Ok(Some(CacheValue::new(packed, expire, stale))),
457                        Err(e) => Err(e),
458                    }
459                }
460                Ok(None) => Ok(None),
461                Err(e) => {
462                    crate::metrics::record_read_error(&l2_label);
463                    Err(e)
464                }
465            };
466            (result, ctx)
467        };
468
469        let ReadResult { value, .. } = self
470            .read_policy
471            .execute_with(
472                key.clone(),
473                read_l1_with_envelope,
474                read_l2_with_envelope,
475                &self.offload,
476            )
477            .await?;
478
479        // No context creation - Format will extract context from envelope during deserialization
480        Ok(value)
481    }
482
483    #[tracing::instrument(skip(self, value), level = "trace")]
484    async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
485        // Unpack CompositionEnvelope using zero-copy format
486        let composition = CompositionEnvelope::deserialize(value.data())?;
487
488        // Write to appropriate layers
489        // In normal usage via CacheBackend::set, this is always the Both variant.
490        // The L1/L2 branches are defensive code for edge cases
491        match composition {
492            CompositionEnvelope::Both { l1, l2 } => {
493                // Clone backends for 'static closures
494                let l1_backend = self.l1.clone();
495                let l2_backend = self.l2.clone();
496                // Use pre-computed labels (no allocation)
497                let l1_label = self.l1_label.clone();
498                let l2_label = self.l2_label.clone();
499                let l1_len = l1.data().len();
500                let l2_len = l2.data().len();
501
502                let write_l1 = |k: CacheKey| async move {
503                    let timer = Timer::new();
504                    let result = l1_backend.write(&k, l1).await;
505                    crate::metrics::record_write(&l1_label, timer.elapsed());
506                    match &result {
507                        Ok(()) => crate::metrics::record_write_bytes(&l1_label, l1_len),
508                        Err(_) => crate::metrics::record_write_error(&l1_label),
509                    }
510                    result
511                };
512                let write_l2 = |k: CacheKey| async move {
513                    let timer = Timer::new();
514                    let result = l2_backend.write(&k, l2).await;
515                    crate::metrics::record_write(&l2_label, timer.elapsed());
516                    match &result {
517                        Ok(()) => crate::metrics::record_write_bytes(&l2_label, l2_len),
518                        Err(_) => crate::metrics::record_write_error(&l2_label),
519                    }
520                    result
521                };
522
523                self.write_policy
524                    .execute_with(key.clone(), write_l1, write_l2, &self.offload)
525                    .await
526            }
527            CompositionEnvelope::L1(l1) => {
528                let l1_len = l1.data().len();
529                let timer = Timer::new();
530                let result = self.l1.write(key, l1).await;
531                crate::metrics::record_write(&self.l1_label, timer.elapsed());
532                match &result {
533                    Ok(()) => crate::metrics::record_write_bytes(&self.l1_label, l1_len),
534                    Err(_) => crate::metrics::record_write_error(&self.l1_label),
535                }
536                result
537            }
538            CompositionEnvelope::L2(l2) => {
539                let l2_len = l2.data().len();
540                let timer = Timer::new();
541                let result = self.l2.write(key, l2).await;
542                crate::metrics::record_write(&self.l2_label, timer.elapsed());
543                match &result {
544                    Ok(()) => crate::metrics::record_write_bytes(&self.l2_label, l2_len),
545                    Err(_) => crate::metrics::record_write_error(&self.l2_label),
546                }
547                result
548            }
549        }
550    }
551
552    #[tracing::instrument(skip(self), level = "trace")]
553    async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
554        // Delete from both layers in parallel for better performance
555        let (l1_result, l2_result) = futures::join!(self.l1.remove(key), self.l2.remove(key));
556
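        // Aggregation rules (mirroring delete below): both layers failing is an
        // error; a single-layer failure is logged and the other layer's status is
        // returned; deleted counts from both layers are summed.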
557        match (l1_result, l2_result) {
558            (Err(e1), Err(e2)) => {
559                tracing::error!(l1_error = ?e1, l2_error = ?e2, "Both L1 and L2 delete failed");
560                Err(BackendError::InternalError(Box::new(
561                    CompositionError::BothLayersFailed { l1: e1, l2: e2 },
562                )))
563            }
564            (Err(e), Ok(status)) => {
565                tracing::warn!(error = ?e, "L1 delete failed");
566                Ok(status)
567            }
568            (Ok(status), Err(e)) => {
569                tracing::warn!(error = ?e, "L2 delete failed");
570                Ok(status)
571            }
572            (Ok(DeleteStatus::Deleted(n1)), Ok(DeleteStatus::Deleted(n2))) => {
573                Ok(DeleteStatus::Deleted(n1 + n2))
574            }
575            (Ok(DeleteStatus::Deleted(n)), Ok(DeleteStatus::Missing))
576            | (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Deleted(n))) => {
577                Ok(DeleteStatus::Deleted(n))
578            }
579            (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Missing)) => Ok(DeleteStatus::Missing),
580        }
581    }
582
583    fn label(&self) -> BackendLabel {
584        self.label.clone()
585    }
586
587    fn value_format(&self) -> &dyn Format {
588        &self.format
589    }
590
591    fn key_format(&self) -> &CacheKeyFormat {
592        &CacheKeyFormat::Bitcode
593    }
594
595    fn compressor(&self) -> &dyn Compressor {
596        &PassthroughCompressor
597    }
598}
599
600impl<L1, L2, O, R, W> CacheBackend for CompositionBackend<L1, L2, O, R, W>
601where
602    L1: CacheBackend + Clone + Send + Sync + 'static,
603    L2: CacheBackend + Clone + Send + Sync + 'static,
604    O: Offload<'static>,
605    R: CompositionReadPolicy,
606    W: CompositionWritePolicy,
607{
608    #[tracing::instrument(skip(self, ctx), level = "trace")]
609    async fn get<T>(
610        &self,
611        key: &CacheKey,
612        ctx: &mut BoxContext,
613    ) -> BackendResult<Option<CacheValue<T::Cached>>>
614    where
615        T: CacheableResponse,
616        T::Cached: Cacheable,
617    {
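        // Overall flow: run the configured read policy over two closures that read
        // and deserialize from L1/L2 with per-layer metrics, then merge the winning
        // layer's context into ctx (composing the source path) and, on an L2 hit
        // with RefillPolicy::Always, flag ReadMode::Refill so CacheFuture refills L1.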
618        // Clone backends for 'static closures
619        let l1 = self.l1.clone();
620        let l2 = self.l2.clone();
621
622        // Use pre-computed composed labels for metrics
623        let l1_label = self.l1_label.clone();
624        let l2_label = self.l2_label.clone();
625
626        // Use inner backend labels for source path (merge_from adds composition prefix)
627        let l1_name = l1.label();
628        let l2_name = l2.label();
629
630        // Clone format for each closure
631        let format_for_l1 = self.format.clone();
632        let format_for_l2 = self.format.clone();
633
634        // Clone context for internal L1/L2 operations
635        let l1_ctx = ctx.clone_box();
636        let l2_ctx = ctx.clone_box();
637
638        let read_l1 = |k: CacheKey| async move {
639            let mut internal_ctx = l1_ctx;
640
641            // Read raw bytes from L1 with metrics
642            let read_timer = Timer::new();
643            let read_result = l1.read(&k).await;
644            crate::metrics::record_read(&l1_label, read_timer.elapsed());
645
646            let result = match read_result {
647                Ok(Some(raw_value)) => {
648                    let (meta, raw_data) = raw_value.into_parts();
649                    crate::metrics::record_read_bytes(&l1_label, raw_data.len());
650
651                    // Deserialize using CompositionFormat (records decompress/deserialize metrics)
652                    let mut deserialized_opt: Option<T::Cached> = None;
653                    match format_for_l1.deserialize_layer(
654                        &raw_data,
655                        CompositionLayer::L1,
656                        &mut |deserializer| {
657                            let value: T::Cached = deserializer.deserialize()?;
658                            deserialized_opt = Some(value);
659                            Ok(())
660                        },
661                        &mut internal_ctx,
662                    ) {
663                        Ok(()) => match deserialized_opt {
664                            Some(deserialized) => {
665                                // Set cache status
666                                internal_ctx.set_status(CacheStatus::Hit);
667
668                                // Get source from context (handles nested compositions)
669                                // If context was upgraded to CompositionContext, extract source from it
670                                let source = if let Some(comp_ctx) =
671                                    internal_ctx.as_any().downcast_ref::<CompositionContext>()
672                                {
673                                    // Nested composition: get label from inner format
674                                    BackendLabel::from(
675                                        comp_ctx.format.label_for_layer(comp_ctx.layer).clone(),
676                                    )
677                                } else {
678                                    // Simple backend: use backend name
679                                    l1_name.clone()
680                                };
681                                internal_ctx.set_source(ResponseSource::Backend(source));
682
683                                Ok(Some(CacheValue::new(deserialized, meta.expire, meta.stale)))
684                            }
685                            None => Err(BackendError::InternalError(Box::new(
686                                std::io::Error::other("deserialization produced no result"),
687                            ))),
688                        },
689                        Err(e) => Err(BackendError::InternalError(Box::new(e))),
690                    }
691                }
692                Ok(None) => Ok(None),
693                Err(e) => {
694                    crate::metrics::record_read_error(&l1_label);
695                    Err(e)
696                }
697            };
698
699            (result, internal_ctx)
700        };
701
702        let read_l2 = |k: CacheKey| async move {
703            let mut internal_ctx = l2_ctx;
704
705            // Read raw bytes from L2 with metrics
706            let read_timer = Timer::new();
707            let read_result = l2.read(&k).await;
708            crate::metrics::record_read(&l2_label, read_timer.elapsed());
709
710            let result = match read_result {
711                Ok(Some(raw_value)) => {
712                    let (meta, raw_data) = raw_value.into_parts();
713                    crate::metrics::record_read_bytes(&l2_label, raw_data.len());
714
715                    // Deserialize using CompositionFormat (records decompress/deserialize metrics)
716                    // Note: deserialize_layer upgrades context to CompositionContext with L2 layer,
717                    // which sets ReadMode::Refill - CacheFuture will handle the actual refill
718                    let mut deserialized_opt: Option<T::Cached> = None;
719                    match format_for_l2.deserialize_layer(
720                        &raw_data,
721                        CompositionLayer::L2,
722                        &mut |deserializer| {
723                            let value: T::Cached = deserializer.deserialize()?;
724                            deserialized_opt = Some(value);
725                            Ok(())
726                        },
727                        &mut internal_ctx,
728                    ) {
729                        Ok(()) => match deserialized_opt {
730                            Some(deserialized) => {
731                                let cache_value =
732                                    CacheValue::new(deserialized, meta.expire, meta.stale);
733
734                                // Set cache status and source for L2 hit
735                                internal_ctx.set_status(CacheStatus::Hit);
736
737                                // Get source from context (handles nested compositions)
738                                // If context was upgraded to CompositionContext, extract source from it
739                                let source = if let Some(comp_ctx) =
740                                    internal_ctx.as_any().downcast_ref::<CompositionContext>()
741                                {
742                                    // Nested composition: get label from inner format
743                                    BackendLabel::from(
744                                        comp_ctx.format.label_for_layer(comp_ctx.layer).clone(),
745                                    )
746                                } else {
747                                    // Simple backend: use backend name
748                                    l2_name.clone()
749                                };
750                                internal_ctx.set_source(ResponseSource::Backend(source));
751
752                                Ok(Some(cache_value))
753                            }
754                            None => Err(BackendError::InternalError(Box::new(
755                                std::io::Error::other("deserialization produced no result"),
756                            ))),
757                        },
758                        Err(e) => Err(BackendError::InternalError(Box::new(e))),
759                    }
760                }
761                Ok(None) => Ok(None),
762                Err(e) => {
763                    crate::metrics::record_read_error(&l2_label);
764                    Err(e)
765                }
766            };
767
768            (result, internal_ctx)
769        };
770
771        let ReadResult {
772            value,
773            source,
774            context: inner_ctx,
775        } = self
776            .read_policy
777            .execute_with(key.clone(), read_l1, read_l2, &self.offload)
778            .await?;
779
780        // Merge inner context into outer context, composing source paths
781        if let Some(ref _cache_value) = value {
782            ctx.merge_from(&*inner_ctx, &self.label);
783
784            // If L2 hit and refill policy is Always, set ReadMode::Refill
785            // CacheFuture will handle the actual refill via set()
786            if source == CompositionLayer::L2 && self.refill_policy == RefillPolicy::Always {
787                ctx.set_read_mode(hitbox_core::ReadMode::Refill);
788            }
789        }
790
791        Ok(value)
792    }
793
794    #[tracing::instrument(skip(self, value, ctx), level = "trace")]
795    async fn set<T>(
796        &self,
797        key: &CacheKey,
798        value: &CacheValue<T::Cached>,
799        ctx: &mut BoxContext,
800    ) -> BackendResult<()>
801    where
802        T: CacheableResponse,
803        T::Cached: Cacheable,
804    {
805        use hitbox_core::ReadMode;
806
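        // Overall flow: (1) ReadMode::Refill means CacheFuture is refilling after an
        // L2 hit - depending on the refill policy, write L1 and recurse into L2, or
        // skip entirely; (2) a CompositionContext marked L2 means a nested composition
        // is refilling - optionally write L1, then recurse into L2 with the unwrapped
        // inner context; (3) otherwise this is a normal write - serialize once per
        // layer and run the configured write policy.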
807        // Check if this is a refill operation (triggered by CacheFuture after L2 hit)
808        // This happens when CacheBackend::get() sets ReadMode::Refill
809        if ctx.read_mode() == ReadMode::Refill {
810            match self.refill_policy {
811                RefillPolicy::Always => {
812                    // Refill L1 only - write serialized data to L1
813                    let l1_bytes = self
814                        .format
815                        .serialize_layer(
816                            CompositionLayer::L1,
817                            &mut |serializer| {
818                                serializer.serialize(value.data())?;
819                                Ok(())
820                            },
821                            &**ctx,
822                        )
823                        .map_err(|e| BackendError::InternalError(Box::new(e)))?;
824
825                    let l1_len = l1_bytes.len();
826                    let l1_value = CacheValue::new(l1_bytes, value.expire(), value.stale());
827
828                    // Write to L1 with metrics
829                    let timer = Timer::new();
830                    let result = self.l1.write(key, l1_value).await;
831                    crate::metrics::record_write(&self.l1_label, timer.elapsed());
832                    match &result {
833                        Ok(()) => crate::metrics::record_write_bytes(&self.l1_label, l1_len),
834                        Err(_) => crate::metrics::record_write_error(&self.l1_label),
835                    }
836                    result?;
837
838                    // Recursively call L2.set() for nested refill
839                    // L2 (if it's a CompositionBackend) will handle its own refill logic
840                    return self.l2.set::<T>(key, value, ctx).await;
841                }
842                RefillPolicy::Never => {
843                    // With Never policy, don't refill at all
844                    // L2 already has the data (it's the source), so skip write
845                    return Ok(());
846                }
847            }
848        }
849
850        // Check if this is a nested refill operation via CompositionContext
851        // Each CompositionContext wraps an inner context and tracks which layer provided data
852        if let Some(comp_ctx) = ctx.as_any().downcast_ref::<CompositionContext>()
853            && comp_ctx.layer == CompositionLayer::L2
854        {
855            match self.refill_policy {
856                RefillPolicy::Always => {
857                    // This level needs refill: write to L1 only
858                    let l1_bytes = self
859                        .format
860                        .serialize_layer(
861                            CompositionLayer::L1,
862                            &mut |serializer| {
863                                serializer.serialize(value.data())?;
864                                Ok(())
865                            },
866                            &**ctx,
867                        )
868                        .map_err(|e| BackendError::InternalError(Box::new(e)))?;
869
870                    let l1_len = l1_bytes.len();
871                    let l1_value = CacheValue::new(l1_bytes, value.expire(), value.stale());
872
873                    // Write to L1 with metrics
874                    let timer = Timer::new();
875                    let result = self.l1.write(key, l1_value).await;
876                    crate::metrics::record_write(&self.l1_label, timer.elapsed());
877                    match &result {
878                        Ok(()) => crate::metrics::record_write_bytes(&self.l1_label, l1_len),
879                        Err(_) => crate::metrics::record_write_error(&self.l1_label),
880                    }
881                    result?;
882
883                    // Recursively call L2.set() with inner context for nested refill
884                    // Inner context may be another CompositionContext (nested) or CacheContext (leaf)
885                    let mut inner_ctx = comp_ctx.inner().clone_box();
886                    return self.l2.set::<T>(key, value, &mut inner_ctx).await;
887                }
888                RefillPolicy::Never => {
889                    // Skip L1 write (no refill), but recurse to L2 for nested handling
890                    let mut inner_ctx = comp_ctx.inner().clone_box();
891                    return self.l2.set::<T>(key, value, &mut inner_ctx).await;
892                }
893            }
894        }
895
896        // Normal mode: write to both layers
897        // Serialize for both layers using CompositionFormat
898        // This handles same-format optimization and records metrics with composed labels
899        let (l1_bytes, l2_bytes) = self
900            .format
901            .serialize_parts(
902                &mut |serializer| {
903                    serializer.serialize(value.data())?;
904                    Ok(())
905                },
906                &**ctx,
907            )
908            .map_err(|e| BackendError::InternalError(Box::new(e)))?;
909
910        let l1_len = l1_bytes.len();
911        let l2_len = l2_bytes.len();
912
913        // Create raw values for Backend::write
914        let l1_value = CacheValue::new(l1_bytes, value.expire(), value.stale());
915        let l2_value = CacheValue::new(l2_bytes, value.expire(), value.stale());
916
917        // Clone backends for 'static closures
918        let l1 = self.l1.clone();
919        let l2 = self.l2.clone();
920
921        // Use pre-computed composed labels
922        let l1_label = self.l1_label.clone();
923        let l2_label = self.l2_label.clone();
924
925        // Write closures using Backend::write directly with composed labels
926        let write_l1 = |k: CacheKey| async move {
927            let timer = Timer::new();
928            let result = l1.write(&k, l1_value).await;
929            crate::metrics::record_write(&l1_label, timer.elapsed());
930            match &result {
931                Ok(()) => crate::metrics::record_write_bytes(&l1_label, l1_len),
932                Err(_) => crate::metrics::record_write_error(&l1_label),
933            }
934            result
935        };
936
937        let write_l2 = |k: CacheKey| async move {
938            let timer = Timer::new();
939            let result = l2.write(&k, l2_value).await;
940            crate::metrics::record_write(&l2_label, timer.elapsed());
941            match &result {
942                Ok(()) => crate::metrics::record_write_bytes(&l2_label, l2_len),
943                Err(_) => crate::metrics::record_write_error(&l2_label),
944            }
945            result
946        };
947
948        self.write_policy
949            .execute_with(key.clone(), write_l1, write_l2, &self.offload)
950            .await
951    }
952
953    #[tracing::instrument(skip(self, ctx), level = "trace")]
954    async fn delete(&self, key: &CacheKey, ctx: &mut BoxContext) -> BackendResult<DeleteStatus> {
955        // Delete from both layers in parallel for better performance
956        let mut l1_ctx = ctx.clone_box();
957        let mut l2_ctx = ctx.clone_box();
958        let (l1_result, l2_result) = futures::join!(
959            self.l1.delete(key, &mut l1_ctx),
960            self.l2.delete(key, &mut l2_ctx)
961        );
962
963        // Aggregate results
964        match (l1_result, l2_result) {
965            (Err(e1), Err(e2)) => {
966                tracing::error!(l1_error = ?e1, l2_error = ?e2, "Both L1 and L2 delete failed");
967                Err(BackendError::InternalError(Box::new(
968                    CompositionError::BothLayersFailed { l1: e1, l2: e2 },
969                )))
970            }
971            (Err(e), Ok(status)) => {
972                tracing::warn!(error = ?e, "L1 delete failed");
973                Ok(status)
974            }
975            (Ok(status), Err(e)) => {
976                tracing::warn!(error = ?e, "L2 delete failed");
977                Ok(status)
978            }
979            (Ok(DeleteStatus::Deleted(n1)), Ok(DeleteStatus::Deleted(n2))) => {
980                tracing::trace!("Deleted from both L1 and L2");
981                Ok(DeleteStatus::Deleted(n1 + n2))
982            }
983            (Ok(DeleteStatus::Deleted(n)), Ok(DeleteStatus::Missing))
984            | (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Deleted(n))) => {
985                tracing::trace!("Deleted from one layer");
986                Ok(DeleteStatus::Deleted(n))
987            }
988            (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Missing)) => {
989                tracing::trace!("Key missing from both layers");
990                Ok(DeleteStatus::Missing)
991            }
992        }
993    }
994}
995
996#[cfg(test)]
997mod tests {
998    use super::*;
999    use crate::format::{Format, JsonFormat};
1000    use crate::{Backend, CacheKeyFormat, Compressor, PassthroughCompressor};
1001    use async_trait::async_trait;
1002    use chrono::Utc;
1003    use hitbox_core::{
1004        BoxContext, CacheContext, CachePolicy, CacheStatus, CacheValue, CacheableResponse,
1005        EntityPolicyConfig, Predicate, Raw, ResponseSource,
1006    };
1007    use serde::{Deserialize, Serialize};
1008    use smol_str::SmolStr;
1009    use std::collections::HashMap;
1010    use std::future::Future;
1011    use std::sync::{Arc, Mutex};
1012
1013    #[cfg(feature = "rkyv_format")]
1014    use rkyv::{Archive, Serialize as RkyvSerialize};
1015
1016    /// Test offload that spawns tasks with tokio::spawn
1017    #[derive(Clone, Debug)]
1018    struct TestOffload;
1019
1020    impl Offload<'static> for TestOffload {
1021        fn spawn<F>(&self, _kind: impl Into<SmolStr>, future: F)
1022        where
1023            F: Future<Output = ()> + Send + 'static,
1024        {
1025            tokio::spawn(future);
1026        }
1027    }
1028
1029    // Simple in-memory backend for testing
1030    #[derive(Clone, Debug)]
1031    struct TestBackend {
1032        store: Arc<Mutex<HashMap<CacheKey, CacheValue<Raw>>>>,
1033        backend_label: &'static str,
1034    }
1035
1036    impl TestBackend {
1037        fn new() -> Self {
1038            Self {
1039                store: Arc::new(Mutex::new(HashMap::new())),
1040                backend_label: "test",
1041            }
1042        }
1043
1044        fn with_label(label: &'static str) -> Self {
1045            Self {
1046                store: Arc::new(Mutex::new(HashMap::new())),
1047                backend_label: label,
1048            }
1049        }
1050    }
1051
1052    #[async_trait]
1053    impl Backend for TestBackend {
1054        async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
1055            Ok(self.store.lock().unwrap().get(key).cloned())
1056        }
1057
1058        async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
1059            self.store.lock().unwrap().insert(key.clone(), value);
1060            Ok(())
1061        }
1062
1063        async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
1064            match self.store.lock().unwrap().remove(key) {
1065                Some(_) => Ok(DeleteStatus::Deleted(1)),
1066                None => Ok(DeleteStatus::Missing),
1067            }
1068        }
1069
1070        fn label(&self) -> BackendLabel {
1071            BackendLabel::new(self.backend_label)
1072        }
1073
1074        fn value_format(&self) -> &dyn Format {
1075            &JsonFormat
1076        }
1077
1078        fn key_format(&self) -> &CacheKeyFormat {
1079            &CacheKeyFormat::Bitcode
1080        }
1081
1082        fn compressor(&self) -> &dyn Compressor {
1083            &PassthroughCompressor
1084        }
1085    }
1086
1087    impl CacheBackend for TestBackend {}
1088
1089    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1090    #[cfg_attr(
1091        feature = "rkyv_format",
1092        derive(Archive, RkyvSerialize, rkyv::Deserialize)
1093    )]
1094    struct CachedData {
1095        value: String,
1096    }
1097
1098    // Mock CacheableResponse for testing
1099    // We only need the associated type; the actual methods are not used in these tests.
1100    struct MockResponse;
1101
1102    // Note: This is a minimal implementation just for testing CacheBackend.
1103    // The methods are not actually called in these tests.
1104    impl CacheableResponse for MockResponse {
1105        type Cached = CachedData;
1106        type Subject = MockResponse;
1107        type IntoCachedFuture = std::future::Ready<CachePolicy<Self::Cached, Self>>;
1108        type FromCachedFuture = std::future::Ready<Self>;
1109
1110        async fn cache_policy<P: Predicate<Subject = Self::Subject> + Send + Sync>(
1111            self,
1112            _predicate: P,
1113            _config: &EntityPolicyConfig,
1114        ) -> CachePolicy<CacheValue<Self::Cached>, Self> {
1115            unimplemented!("Not used in these tests")
1116        }
1117
1118        fn into_cached(self) -> Self::IntoCachedFuture {
1119            unimplemented!("Not used in these tests")
1120        }
1121
1122        fn from_cached(_cached: Self::Cached) -> Self::FromCachedFuture {
1123            unimplemented!("Not used in these tests")
1124        }
1125    }
1126
1127    #[tokio::test]
1128    async fn test_l1_hit() {
1129        let l1 = TestBackend::with_label("moka");
1130        let l2 = TestBackend::with_label("redis");
1131        let backend = CompositionBackend::new(l1.clone(), l2, TestOffload).label("cache");
1132
1133        let key = CacheKey::from_str("test", "key1");
1134        let value = CacheValue::new(
1135            CachedData {
1136                value: "value1".to_string(),
1137            },
1138            Some(Utc::now() + chrono::Duration::seconds(60)),
1139            None,
1140        );
1141
1142        // Write to populate both layers
1143        let mut ctx: BoxContext = CacheContext::default().boxed();
1144        backend
1145            .set::<MockResponse>(&key, &value, &mut ctx)
1146            .await
1147            .unwrap();
1148
1149        // Read should hit L1
1150        let mut ctx: BoxContext = CacheContext::default().boxed();
1151        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1152        assert_eq!(result.unwrap().data().value, "value1");
1153
1154        // Verify source path is composed correctly: "cache.moka"
1155        assert_eq!(ctx.status(), CacheStatus::Hit);
1156        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.moka".into()));
1157    }
1158
1159    #[tokio::test]
1160    async fn test_l2_hit_sets_refill_mode() {
1161        use hitbox_core::ReadMode;
1162
1163        let l1 = TestBackend::with_label("moka");
1164        let l2 = TestBackend::with_label("redis");
1165
1166        let key = CacheKey::from_str("test", "key1");
1167        let value = CacheValue::new(
1168            CachedData {
1169                value: "value1".to_string(),
1170            },
1171            Some(Utc::now() + chrono::Duration::seconds(60)),
1172            None,
1173        );
1174
1175        // Backend with RefillPolicy::Always
1176        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload)
1177            .label("cache")
1178            .refill(RefillPolicy::Always);
1179
1180        // Write through CompositionBackend (populates both L1 and L2)
1181        let mut ctx: BoxContext = CacheContext::default().boxed();
1182        backend
1183            .set::<MockResponse>(&key, &value, &mut ctx)
1184            .await
1185            .unwrap();
1186
1187        // Clear L1 to simulate L1 miss scenario
1188        l1.store.lock().unwrap().clear();
1189
1190        // Read should hit L2 and set ReadMode::Refill
1191        let mut ctx: BoxContext = CacheContext::default().boxed();
1192        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1193        assert_eq!(result.unwrap().data().value, "value1");
1194
1195        // Verify source path is composed correctly: "cache.redis" (hit L2)
1196        assert_eq!(ctx.status(), CacheStatus::Hit);
1197        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.redis".into()));
1198
1199        // Verify ReadMode::Refill is set (CacheFuture will use this to call set())
1200        assert_eq!(ctx.read_mode(), ReadMode::Refill);
1201
1202        // L1 should NOT be populated yet (refill happens via CacheFuture.set())
1203        let mut ctx: BoxContext = CacheContext::default().boxed();
1204        let l1_result = l1.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1205        assert!(
1206            l1_result.is_none(),
1207            "L1 should not be populated directly by get()"
1208        );
1209    }
1210
1211    #[tokio::test]
1212    async fn test_miss_both_layers() {
1213        let l1 = TestBackend::new();
1214        let l2 = TestBackend::new();
1215        let backend = CompositionBackend::new(l1, l2, TestOffload);
1216
1217        let key = CacheKey::from_str("test", "nonexistent");
1218
1219        let mut ctx: BoxContext = CacheContext::default().boxed();
1220        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1221        assert!(result.is_none());
1222    }
1223
1224    #[tokio::test]
1225    async fn test_write_to_both_layers() {
1226        let l1 = TestBackend::new();
1227        let l2 = TestBackend::new();
1228
1229        let key = CacheKey::from_str("test", "key1");
1230        let value = CacheValue::new(
1231            CachedData {
1232                value: "value1".to_string(),
1233            },
1234            Some(Utc::now() + chrono::Duration::seconds(60)),
1235            None,
1236        );
1237
1238        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload);
1239
1240        let mut ctx: BoxContext = CacheContext::default().boxed();
1241        backend
1242            .set::<MockResponse>(&key, &value, &mut ctx)
1243            .await
1244            .unwrap();
1245
1246        // Verify both layers have the value
1247        let mut ctx: BoxContext = CacheContext::default().boxed();
1248        let l1_result = l1.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1249        assert_eq!(l1_result.unwrap().data().value, "value1");
1250
1251        let mut ctx: BoxContext = CacheContext::default().boxed();
1252        let l2_result = l2.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1253        assert_eq!(l2_result.unwrap().data().value, "value1");
1254    }
1255
1256    #[tokio::test]
1257    async fn test_delete_from_both_layers() {
1258        let l1 = TestBackend::new();
1259        let l2 = TestBackend::new();
1260
1261        let key = CacheKey::from_str("test", "key1");
1262        let value = CacheValue::new(
1263            CachedData {
1264                value: "value1".to_string(),
1265            },
1266            Some(Utc::now() + chrono::Duration::seconds(60)),
1267            None,
1268        );
1269
1270        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload);
1271
1272        // Write to both
1273        let mut ctx: BoxContext = CacheContext::default().boxed();
1274        backend
1275            .set::<MockResponse>(&key, &value, &mut ctx)
1276            .await
1277            .unwrap();
1278
1279        // Delete from both
1280        let mut ctx: BoxContext = CacheContext::default().boxed();
1281        let status = backend.delete(&key, &mut ctx).await.unwrap();
1282        assert_eq!(status, DeleteStatus::Deleted(2));
1283
1284        // Verify both layers no longer have the value
1285        let mut ctx: BoxContext = CacheContext::default().boxed();
1286        let l1_result = l1.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1287        assert!(l1_result.is_none());
1288
1289        let mut ctx: BoxContext = CacheContext::default().boxed();
1290        let l2_result = l2.get::<MockResponse>(&key, &mut ctx).await.unwrap();
1291        assert!(l2_result.is_none());
1292    }
1293
1294    #[tokio::test]
1295    async fn test_clone() {
1296        let l1 = TestBackend::new();
1297        let l2 = TestBackend::new();
1298        let backend = CompositionBackend::new(l1, l2, TestOffload);
1299
1300        let cloned = backend.clone();
1301
1302        let key = CacheKey::from_str("test", "key1");
1303        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Write via original
        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Read via clone should work (shared backends)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = cloned.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "value1");
    }

    #[tokio::test]
    async fn test_nested_composition_source_path() {
        // Create a nested composition: outer(inner(l1, l2), l3)
        // to test hierarchical source paths like "outer.inner.moka"

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        // Inner composition: L1=moka, L2=redis
        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");

        // Outer composition: L1=inner, L2=disk
        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "nested");
        let value = CacheValue::new(
            CachedData {
                value: "nested_value".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Write only to innermost L1 (moka)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        l1.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Read through outer composition - should hit inner.L1 (moka)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "nested_value");

        // Verify nested source path: "outer.inner.moka"
        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(
            ctx.source(),
            &ResponseSource::Backend("outer.inner.moka".into())
        );
    }

    #[tokio::test]
    async fn test_nested_composition_l2_source_path() {
        // Test nested composition where hit comes from inner L2

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        // Inner composition: L1=moka, L2=redis
        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");

        // Outer composition: L1=inner, L2=disk
        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "nested_l2");
        let value = CacheValue::new(
            CachedData {
                value: "from_redis".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Write only to inner L2 (redis) - not to moka
        let mut ctx: BoxContext = CacheContext::default().boxed();
        l2.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Read through outer composition - should hit inner.L2 (redis)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "from_redis");

        // Verify nested source path: "outer.inner.redis"
        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(
            ctx.source(),
            &ResponseSource::Backend("outer.inner.redis".into())
        );
    }

    #[tokio::test]
    async fn test_nested_composition_outer_l2_source_path() {
        // Test nested composition where hit comes from outer L2 (disk)

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        // Inner composition: L1=moka, L2=redis
        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");

        // Outer composition: L1=inner, L2=disk
        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "outer_l2");
        let value = CacheValue::new(
            CachedData {
                value: "from_disk".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Write only to outer L2 (disk) - not to inner composition
        let mut ctx: BoxContext = CacheContext::default().boxed();
        l3.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Read through outer composition - should hit outer L2 (disk)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "from_disk");

        // Verify source path: "outer.disk"
        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("outer.disk".into()));
    }
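
    // Illustrative sketch, not part of the original suite: the write path of a
    // nested composition. Assuming a composed `set` has propagated to every
    // layer by the time it returns (the refill test below relies on this for a
    // single level), clearing the innermost L1 should surface the copy written
    // to the inner L2. The test name and key are hypothetical.
    #[tokio::test]
    async fn test_nested_composition_write_propagates_sketch() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        let inner = CompositionBackend::new(l1.clone(), l2, TestOffload).label("inner");
        let outer = CompositionBackend::new(inner, l3, TestOffload).label("outer");

        let key = CacheKey::from_str("test", "nested_write");
        let value = CacheValue::new(
            CachedData {
                value: "written_everywhere".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Write once through the outer composition.
        let mut ctx: BoxContext = CacheContext::default().boxed();
        outer
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Drop the innermost L1 copy so the read has to fall through to redis.
        l1.store.lock().unwrap().clear();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "written_everywhere");
        assert_eq!(
            ctx.source(),
            &ResponseSource::Backend("outer.inner.redis".into())
        );
    }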

    #[tokio::test]
    async fn test_l1_hit_status() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let backend = CompositionBackend::new(l1.clone(), l2, TestOffload).label("cache");

        let key = CacheKey::from_str("test", "metrics1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Write directly to L1 backend to set up the test
        let mut ctx: BoxContext = CacheContext::default().boxed();
        l1.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Read through composition should hit L1
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "value1");

        // Verify status and source
        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.moka".into()));
    }
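
    // Illustrative sketch, not part of the original suite: the complementary
    // miss path. With neither layer populated, a read through the composition
    // is expected to come back as `Ok(None)`. The test name and key are
    // hypothetical.
    #[tokio::test]
    async fn test_both_layers_miss_sketch() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let backend = CompositionBackend::new(l1, l2, TestOffload).label("cache");

        let key = CacheKey::from_str("test", "missing");

        // Nothing was written to either layer, so the composed read is a miss.
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert!(result.is_none());
    }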

    #[tokio::test]
    async fn test_l2_hit_with_refill_via_set() {
        use hitbox_core::ReadMode;

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");

        let key = CacheKey::from_str("test", "metrics2");
        let value = CacheValue::new(
            CachedData {
                value: "from_l2".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload)
            .label("cache")
            .refill(RefillPolicy::Always);

        // Write through CompositionBackend (populates both L1 and L2)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Clear L1 to simulate L1 miss scenario
        l1.store.lock().unwrap().clear();

        // Read should hit L2 and set ReadMode::Refill
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        let cached_value = result.unwrap();
        assert_eq!(cached_value.data().value, "from_l2");

        // Verify status and source - L2 hit
        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.redis".into()));
        assert_eq!(ctx.read_mode(), ReadMode::Refill);

        // Simulate CacheFuture calling set() with refill context (only writes to L1)
        backend
            .set::<MockResponse>(&key, &cached_value, &mut ctx)
            .await
            .unwrap();

        // Verify L1 was refilled - read again should hit L1
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "from_l2");
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.moka".into()));
    }
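
    // Illustrative sketch, not part of the original suite: the counterpart of
    // the refill test above under the default `RefillPolicy::Never`. The key
    // assumption (flagged here, not confirmed by the tests above) is that with
    // the default policy an L2 hit does not mark the context for refill, so the
    // caller never issues the L1-only `set`. The test name and key are
    // hypothetical.
    #[tokio::test]
    async fn test_l2_hit_without_refill_sketch() {
        use hitbox_core::ReadMode;

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let backend = CompositionBackend::new(l1.clone(), l2, TestOffload).label("cache");

        let key = CacheKey::from_str("test", "no_refill");
        let value = CacheValue::new(
            CachedData {
                value: "stay_in_l2".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Populate both layers, then clear L1 to force an L2 hit.
        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();
        l1.store.lock().unwrap().clear();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "stay_in_l2");
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.redis".into()));

        // Assumed behavior of the default policy: no refill is requested.
        assert_ne!(ctx.read_mode(), ReadMode::Refill);
    }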

    #[tokio::test]
    async fn test_nested_composition_status() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");
        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "nested_metrics");
        let value = CacheValue::new(
            CachedData {
                value: "nested".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        // Write to innermost L1 (moka)
        let mut ctx: BoxContext = CacheContext::default().boxed();
        l1.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Read through outer composition
        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "nested");

        // Verify nested source path
        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(
            ctx.source(),
            &ResponseSource::Backend("outer.inner.moka".into())
        );
    }
}