hitbox_backend/composition/policy/read/parallel.rs
//! Parallel read policy implementation.
//!
//! This policy queries both L1 and L2 simultaneously and waits for both to complete,
//! preferring the response with the longest remaining TTL (freshest data).

use async_trait::async_trait;
use hitbox_core::{BoxContext, CacheContext, CacheKey, CacheValue, Offload};
use std::future::Future;

use super::{CompositionReadPolicy, ReadResult};
use crate::composition::CompositionLayer;

/// Parallel read policy: query both L1 and L2 in parallel and prefer the freshest data (by TTL).
///
/// This strategy provides:
/// - Freshness guarantee (returns the value with the longest remaining TTL)
/// - Cache warming (keeps both layers hot)
/// - Observability into the performance of both layers
/// - Natural cache coherency (prefers recently updated data)
///
/// # Behavior
/// 1. Start both `read_l1(key)` and `read_l2(key)` in parallel
/// 2. Wait for **both** to complete
/// 3. Compare TTLs and prefer the response with the **longest remaining TTL**
/// 4. Fall back to whichever value is available if one layer misses or errors
///
/// # TTL Comparison Rules
/// When both L1 and L2 have data:
/// - Compare remaining TTLs using `CacheValue::ttl()`
/// - Prefer the response with the longer TTL (fresher data)
/// - If one has no expiry (`None` TTL), prefer it (infinite freshness)
/// - If TTLs are equal, prefer L1 (tie-breaker)
/// - If both have no expiry, prefer L1 (tie-breaker)
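///
/// For example, if L1 reports 30 seconds remaining and L2 reports 120 seconds,
/// the L2 value is returned; if both report 60 seconds, the tie goes to L1.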
///
/// # Tradeoffs
/// - **Pros**: Freshness guarantee, handles L1/L2 consistency naturally, production-viable
/// - **Cons**: 2x backend load; latency is bounded by the slower backend
///
/// # Use Cases
/// - Production systems where data freshness is critical
/// - Multi-region setups where L2 may be updated first
/// - Cache warming while still serving the freshest data
/// - Validating L1/L2 consistency
/// - Monitoring the health of both layers
///
/// # Note
/// Unlike `RaceReadPolicy`, this policy always waits for both backends to complete,
/// making it slower but providing freshness guarantees and better observability.
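///
/// # Example
/// A minimal construction sketch (the import path and the composition builder
/// that consumes the policy are assumed, not shown in this module):
/// ```ignore
/// use hitbox_backend::composition::policy::read::ParallelReadPolicy;
///
/// // `ParallelReadPolicy` is a stateless unit struct; `new()` and
/// // `Default::default()` are interchangeable.
/// let policy = ParallelReadPolicy::new();
/// ```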
#[derive(Debug, Clone, Copy, Default)]
pub struct ParallelReadPolicy;

impl ParallelReadPolicy {
    /// Create a new parallel read policy.
    pub fn new() -> Self {
        Self
    }
}

#[async_trait]
impl CompositionReadPolicy for ParallelReadPolicy {
    #[tracing::instrument(skip(self, key, read_l1, read_l2, _offload), level = "trace")]
    async fn execute_with<T, E, F1, F2, Fut1, Fut2, O>(
        &self,
        key: CacheKey,
        read_l1: F1,
        read_l2: F2,
        _offload: &O,
    ) -> Result<ReadResult<T>, E>
    where
        T: Send + 'static,
        E: Send + std::fmt::Debug + 'static,
        F1: FnOnce(CacheKey) -> Fut1 + Send,
        F2: FnOnce(CacheKey) -> Fut2 + Send,
        Fut1: Future<Output = (Result<Option<CacheValue<T>>, E>, BoxContext)> + Send + 'static,
        Fut2: Future<Output = (Result<Option<CacheValue<T>>, E>, BoxContext)> + Send + 'static,
        O: Offload<'static>,
    {
        // Query both layers in parallel and wait for both to complete.
        let ((l1_result, l1_ctx), (l2_result, l2_ctx)) =
            futures::join!(read_l1(key.clone()), read_l2(key));

        // Aggregate the results, preferring the freshest data (by remaining TTL).
        match (l1_result, l2_result) {
            // Both hit - compare TTLs to select the freshest value
            (Ok(Some(l1_value)), Ok(Some(l2_value))) => {
                // Compare TTLs: prefer the longer remaining TTL, or L1 on a tie
                match (l1_value.ttl(), l2_value.ttl()) {
                    (Some(l1_ttl), Some(l2_ttl)) if l2_ttl > l1_ttl => {
                        tracing::trace!("Both hit, preferring L2 (fresher TTL)");
                        Ok(ReadResult {
                            value: Some(l2_value),
                            source: CompositionLayer::L2,
                            context: l2_ctx,
                        })
                    }
                    (Some(_), None) => {
                        tracing::trace!("Both hit, preferring L2 (no expiry)");
                        Ok(ReadResult {
                            value: Some(l2_value),
                            source: CompositionLayer::L2,
                            context: l2_ctx,
                        })
                    }
                    _ => {
                        // L1 TTL >= L2 TTL, L1 has no expiry, or both have no expiry - prefer L1
                        tracing::trace!("Both hit, preferring L1 (fresher or equal TTL)");
                        Ok(ReadResult {
                            value: Some(l1_value),
                            source: CompositionLayer::L1,
                            context: l1_ctx,
                        })
                    }
                }
            }
            // L1 hit, L2 miss/error
            (Ok(Some(value)), _) => {
                tracing::trace!("L1 hit, L2 miss/error");
                Ok(ReadResult {
                    value: Some(value),
                    source: CompositionLayer::L1,
                    context: l1_ctx,
                })
            }
            // L2 hit, L1 miss/error
            (_, Ok(Some(value))) => {
                tracing::trace!("L2 hit, L1 miss/error");
                Ok(ReadResult {
                    value: Some(value),
                    source: CompositionLayer::L2,
                    context: l2_ctx,
                })
            }
            // Both miss
            (Ok(None), Ok(None)) => {
                tracing::trace!("Both layers miss");
                Ok(ReadResult {
                    value: None,
                    source: CompositionLayer::L2,
                    context: CacheContext::default().boxed(),
                })
            }
            // Both error - log both, propagate the L2 error
            (Err(e1), Err(e2)) => {
                tracing::error!(l1_error = ?e1, l2_error = ?e2, "Both layers failed");
                Err(e2)
            }
            // One error, one miss - treat as a cache miss
            (Ok(None), Err(e)) | (Err(e), Ok(None)) => {
                tracing::warn!(error = ?e, "One layer failed, one missed");
                Ok(ReadResult {
                    value: None,
                    source: CompositionLayer::L2,
                    context: CacheContext::default().boxed(),
                })
            }
        }
    }
}
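
// Illustrative sketch: the test below restates the TTL preference rule from
// `execute_with` over plain `Option<Duration>` values, assuming that
// `CacheValue::ttl()` reports the remaining TTL as an `Option` of a comparable
// duration type. The `prefer_l2` helper is hypothetical and exists only to make
// the tie-breaking rules checkable in isolation.
#[cfg(test)]
mod ttl_preference_sketch {
    use std::time::Duration;

    /// Mirrors the inner match in `ParallelReadPolicy::execute_with`: L2 wins
    /// only with a strictly longer remaining TTL, or when L2 has no expiry while
    /// L1 does; every other case, including ties, falls back to L1.
    fn prefer_l2(l1_ttl: Option<Duration>, l2_ttl: Option<Duration>) -> bool {
        match (l1_ttl, l2_ttl) {
            (Some(l1), Some(l2)) => l2 > l1,
            (Some(_), None) => true,
            _ => false,
        }
    }

    #[test]
    fn freshest_wins_and_ties_prefer_l1() {
        // L2 strictly fresher: prefer L2.
        assert!(prefer_l2(
            Some(Duration::from_secs(30)),
            Some(Duration::from_secs(120))
        ));
        // Equal TTLs: tie-breaker prefers L1.
        assert!(!prefer_l2(
            Some(Duration::from_secs(60)),
            Some(Duration::from_secs(60))
        ));
        // L2 has no expiry while L1 does: prefer L2.
        assert!(prefer_l2(Some(Duration::from_secs(60)), None));
        // L1 has no expiry: prefer L1.
        assert!(!prefer_l2(None, Some(Duration::from_secs(60))));
        // Neither expires: tie-breaker prefers L1.
        assert!(!prefer_l2(None, None));
    }
}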