hitbox_backend/composition/policy/write/race.rs
1//! Race write policy implementation.
2//!
3//! This policy writes to both L1 and L2 simultaneously and returns as soon as
4//! the first write succeeds, handling the losing future based on the configured policy.
5
6use async_trait::async_trait;
7use futures::future::{Either, select};
8use hitbox_core::{CacheKey, Offload};
9use std::future::Future;
10
11use super::CompositionWritePolicy;
12use crate::BackendError;
13use crate::composition::CompositionError;
14
15/// Policy for handling the losing future in a race.
16#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
17pub enum RaceLoserPolicy {
18 /// Offload the losing future to background execution.
19 /// This ensures the operation completes without blocking the response.
20 #[default]
21 Offload,
22 /// Drop the losing future immediately.
23 /// The operation may be cancelled mid-flight.
24 Drop,
25}
26
27/// Race write policy: Write to both simultaneously, return on first success.
28///
29/// This strategy provides:
30/// - Minimal write latency (return as soon as one succeeds)
31/// - High availability (succeeds if either layer is available)
32///
33/// # Behavior
34/// 1. Start both `write_l1(key)` and `write_l2(key)` in parallel using `select`
35/// 2. When the first completes:
36/// - If success: Return Ok immediately, handle losing future based on policy
37/// - If failure: Wait for the second to complete
38/// 3. If both fail: Return error with both failures
39///
40/// # Loser Policy
41/// When one backend wins with a successful write, the losing future can be:
42/// - `RaceLoserPolicy::Offload` (default): Spawned to background, completes without blocking
43/// - `RaceLoserPolicy::Drop`: Dropped immediately, may cancel mid-operation
44///
45/// # Consistency Guarantee
46/// If this operation returns `Ok(())`, **at least one** of L1 or L2 has been updated.
47/// With `RaceLoserPolicy::Offload`, the other layer will eventually be updated (unless it fails).
48/// With `RaceLoserPolicy::Drop`, the other layer may or may not be updated.
49///
50/// # Tradeoffs
51/// - **Pros**: Lowest latency, high availability
52/// - **Cons**: Only one layer guaranteed to be written at return time
53///
54/// # Use Cases
55/// - Latency-critical write paths where one layer is sufficient
56/// - Caches with background reconciliation
57/// - Write-heavy workloads where L2 persistence is less critical
58///
59/// # Note
60/// If you need both layers to be written, use [`super::OptimisticParallelWritePolicy`] instead,
61/// which waits for both writes to complete.
62#[derive(Debug, Clone, Copy, Default)]
63pub struct RaceWritePolicy {
64 /// Policy for handling the losing future.
65 loser_policy: RaceLoserPolicy,
66}
67
68impl RaceWritePolicy {
69 /// Create a new race write policy with default settings (offload losers).
70 pub fn new() -> Self {
71 Self::default()
72 }
73
74 /// Set the policy for handling losing futures.
75 pub fn loser_policy(mut self, policy: RaceLoserPolicy) -> Self {
76 self.loser_policy = policy;
77 self
78 }
79}
80
81#[async_trait]
82impl CompositionWritePolicy for RaceWritePolicy {
83 #[tracing::instrument(skip(self, key, write_l1, write_l2, offload), level = "trace")]
84 async fn execute_with<F1, F2, Fut1, Fut2, O>(
85 &self,
86 key: CacheKey,
87 write_l1: F1,
88 write_l2: F2,
89 offload: &O,
90 ) -> Result<(), BackendError>
91 where
92 F1: FnOnce(CacheKey) -> Fut1 + Send,
93 F2: FnOnce(CacheKey) -> Fut2 + Send,
94 Fut1: Future<Output = Result<(), BackendError>> + Send + 'static,
95 Fut2: Future<Output = Result<(), BackendError>> + Send + 'static,
96 O: Offload<'static>,
97 {
98 // Box futures so we can move them to offload if needed
99 let l1_fut = Box::pin(write_l1(key.clone()));
100 let l2_fut = Box::pin(write_l2(key));
101
102 // Race both futures
103 match select(l1_fut, l2_fut).await {
104 Either::Left((l1_result, l2_fut)) => {
105 // L1 completed first
106 match l1_result {
107 Ok(()) => {
108 // L1 succeeded - handle losing L2 future based on policy
109 tracing::trace!("L1 write succeeded (won race)");
110 match self.loser_policy {
111 RaceLoserPolicy::Offload => {
112 offload.register(
113 hitbox_core::OffloadKey::auto("race_write_l2_loser"),
114 async move {
115 let _ = l2_fut.await;
116 },
117 );
118 }
119 RaceLoserPolicy::Drop => {
120 drop(l2_fut);
121 }
122 }
123 Ok(())
124 }
125 Err(e1) => {
126 // L1 failed - must wait for L2
127 tracing::trace!("L1 write failed, waiting for L2");
128 match l2_fut.await {
129 Ok(()) => {
130 tracing::trace!("L2 write succeeded after L1 failure");
131 Ok(())
132 }
133 Err(e2) => {
134 tracing::error!(
135 l1_error = ?e1,
136 l2_error = ?e2,
137 "Both L1 and L2 writes failed"
138 );
139 Err(BackendError::InternalError(Box::new(
140 CompositionError::BothLayersFailed { l1: e1, l2: e2 },
141 )))
142 }
143 }
144 }
145 }
146 }
147 Either::Right((l2_result, l1_fut)) => {
148 // L2 completed first
149 match l2_result {
150 Ok(()) => {
151 // L2 succeeded - handle losing L1 future based on policy
152 tracing::trace!("L2 write succeeded (won race)");
153 match self.loser_policy {
154 RaceLoserPolicy::Offload => {
155 offload.register(
156 hitbox_core::OffloadKey::auto("race_write_l1_loser"),
157 async move {
158 let _ = l1_fut.await;
159 },
160 );
161 }
162 RaceLoserPolicy::Drop => {
163 drop(l1_fut);
164 }
165 }
166 Ok(())
167 }
168 Err(e2) => {
169 // L2 failed - must wait for L1
170 tracing::trace!("L2 write failed, waiting for L1");
171 match l1_fut.await {
172 Ok(()) => {
173 tracing::trace!("L1 write succeeded after L2 failure");
174 Ok(())
175 }
176 Err(e1) => {
177 tracing::error!(
178 l1_error = ?e1,
179 l2_error = ?e2,
180 "Both L1 and L2 writes failed"
181 );
182 Err(BackendError::InternalError(Box::new(
183 CompositionError::BothLayersFailed { l1: e1, l2: e2 },
184 )))
185 }
186 }
187 }
188 }
189 }
190 }
191 }
192}