Skip to main content

hitbox_backend/composition/policy/read/
parallel.rs

//! Parallel read policy implementation.
//!
//! This policy queries both L1 and L2 simultaneously and waits for both to complete,
//! preferring the response with the longest remaining TTL (freshest data).
5
6use async_trait::async_trait;
7use hitbox_core::{BoxContext, CacheContext, CacheKey, CacheValue, Offload};
8use std::future::Future;
9
10use super::{CompositionReadPolicy, ReadResult};
11use crate::composition::CompositionLayer;
12
/// Parallel read policy: query both L1 and L2 concurrently, prefer the freshest data (by TTL).
///
/// This strategy provides:
/// - Freshness preference (returns the hit with the longest remaining TTL)
/// - Both layers are exercised on every read
/// - Visibility into the outcome of both layers for each read
/// - Natural cache coherency (prefers recently updated data)
///
/// # Behavior
/// 1. Start both `read_l1(key)` and `read_l2(key)` concurrently
/// 2. Wait for **both** to complete
/// 3. If both hit, compare TTLs and prefer the response with the **longest remaining TTL**
/// 4. Fall back to whichever value is available if one layer misses or errors
///
/// # TTL Comparison Rules
/// When both L1 and L2 have data:
/// - Compare remaining TTLs using `CacheValue::ttl()`
/// - Prefer the response with the longer TTL (fresher data)
/// - If exactly one has no expiry (`None` TTL), prefer it (infinite freshness)
/// - If TTLs are equal, prefer L1 (tie-breaker)
/// - If both have no expiry, prefer L1 (tie-breaker)
///
/// # Misses and Errors
/// - Both layers miss: the result is a miss
/// - One layer misses and the other fails: the result is a miss (the failure is logged)
/// - Both layers fail: the L2 error is returned
///
/// # Tradeoffs
/// - **Pros**: Prefers the freshest data, handles L1/L2 divergence naturally
/// - **Cons**: 2x backend load, latency limited by the slower backend
///
/// # Use Cases
/// - Systems where data freshness is critical
/// - Multi-region setups where L2 may get updated first
/// - Validating L1/L2 consistency
/// - Monitoring both layers' health
///
/// # Note
/// Unlike `RaceReadPolicy`, this policy always waits for both backends to complete,
/// making it slower but allowing it to compare the two results.
#[derive(Debug, Clone, Copy, Default)]
pub struct ParallelReadPolicy;
51
52impl ParallelReadPolicy {
53    /// Create a new parallel read policy.
54    pub fn new() -> Self {
55        Self
56    }
57}
58
59#[async_trait]
60impl CompositionReadPolicy for ParallelReadPolicy {
61    #[tracing::instrument(skip(self, key, read_l1, read_l2, _offload), level = "trace")]
62    async fn execute_with<T, E, F1, F2, Fut1, Fut2, O>(
63        &self,
64        key: CacheKey,
65        read_l1: F1,
66        read_l2: F2,
67        _offload: &O,
68    ) -> Result<ReadResult<T>, E>
69    where
70        T: Send + 'static,
71        E: Send + std::fmt::Debug + 'static,
72        F1: FnOnce(CacheKey) -> Fut1 + Send,
73        F2: FnOnce(CacheKey) -> Fut2 + Send,
74        Fut1: Future<Output = (Result<Option<CacheValue<T>>, E>, BoxContext)> + Send + 'static,
75        Fut2: Future<Output = (Result<Option<CacheValue<T>>, E>, BoxContext)> + Send + 'static,
76        O: Offload<'static>,
77    {
78        // Query both in parallel and wait for both to complete
79        let ((l1_result, l1_ctx), (l2_result, l2_ctx)) =
80            futures::join!(read_l1(key.clone()), read_l2(key));
81
82        // Aggregate results, preferring freshest data (by TTL)
83        match (l1_result, l2_result) {
84            // Both hit - compare TTLs to select freshest
85            (Ok(Some(l1_value)), Ok(Some(l2_value))) => {
86                // Compare TTLs: prefer longer remaining TTL, or L1 on tie
87                match (l1_value.ttl(), l2_value.ttl()) {
88                    (Some(l1_ttl), Some(l2_ttl)) if l2_ttl > l1_ttl => {
89                        tracing::trace!("Both hit, preferring L2 (fresher TTL)");
90                        Ok(ReadResult {
91                            value: Some(l2_value),
92                            source: CompositionLayer::L2,
93                            context: l2_ctx,
94                        })
95                    }
96                    (Some(_), None) => {
97                        tracing::trace!("Both hit, preferring L2 (no expiry)");
98                        Ok(ReadResult {
99                            value: Some(l2_value),
100                            source: CompositionLayer::L2,
101                            context: l2_ctx,
102                        })
103                    }
104                    _ => {
105                        // L1 >= L2, or L1 has no expiry, or both no expiry - prefer L1
106                        tracing::trace!("Both hit, preferring L1 (fresher or equal TTL)");
107                        Ok(ReadResult {
108                            value: Some(l1_value),
109                            source: CompositionLayer::L1,
110                            context: l1_ctx,
111                        })
112                    }
113                }
114            }
115            // L1 hit, L2 miss/error
116            (Ok(Some(value)), _) => {
117                tracing::trace!("L1 hit, L2 miss/error");
118                Ok(ReadResult {
119                    value: Some(value),
120                    source: CompositionLayer::L1,
121                    context: l1_ctx,
122                })
123            }
124            // L2 hit, L1 miss/error
125            (_, Ok(Some(value))) => {
126                tracing::trace!("L2 hit, L1 miss/error");
127                Ok(ReadResult {
128                    value: Some(value),
129                    source: CompositionLayer::L2,
130                    context: l2_ctx,
131                })
132            }
133            // Both miss
134            (Ok(None), Ok(None)) => {
135                tracing::trace!("Both layers miss");
136                Ok(ReadResult {
137                    value: None,
138                    source: CompositionLayer::L2,
139                    context: CacheContext::default().boxed(),
140                })
141            }
142            // Both error
143            (Err(e1), Err(e2)) => {
144                tracing::error!(l1_error = ?e1, l2_error = ?e2, "Both layers failed");
145                Err(e2)
146            }
147            // One error, one miss
148            (Ok(None), Err(e)) | (Err(e), Ok(None)) => {
149                tracing::warn!(error = ?e, "One layer failed, one missed");
150                Ok(ReadResult {
151                    value: None,
152                    source: CompositionLayer::L2,
153                    context: CacheContext::default().boxed(),
154                })
155            }
156        }
157    }
158}