Skip to main content

common/coordinator/
handle.rs

1use super::WriteCommand;
2use super::{Delta, Durability, WriteError, WriteResult};
3use crate::StorageRead;
4use crate::coordinator::traits::EpochStamped;
5use crate::storage::StorageSnapshot;
6use futures::FutureExt;
7use futures::future::Shared;
8use std::sync::Arc;
9use tokio::sync::{broadcast, mpsc, oneshot, watch};
10
11/// The current view of all applied writes. The view is made up a reader for
12/// the current delta, all frozen deltas awaiting flush, and the current
13/// storage snapshot. Readers use the view to query the written data.
14pub struct View<D: Delta> {
15    pub current: D::DeltaView,
16    pub frozen: Vec<EpochStamped<D::FrozenView>>,
17    pub snapshot: Arc<dyn StorageSnapshot>,
18    pub last_flushed_delta: Option<EpochStamped<D::FrozenView>>,
19}
20
21impl<D: Delta> Clone for View<D> {
22    fn clone(&self) -> Self {
23        Self {
24            current: self.current.clone(),
25            frozen: self.frozen.clone(),
26            snapshot: self.snapshot.clone(),
27            last_flushed_delta: self.last_flushed_delta.clone(),
28        }
29    }
30}
31
32/// Receivers for durability watermark updates.
33///
34/// Each receiver tracks the highest epoch that has reached the corresponding
35/// [`Durability`] level. See [`Durability`] for details on each level.
36#[derive(Clone)]
37pub(crate) struct EpochWatcher {
38    pub applied_rx: watch::Receiver<u64>,
39    pub flushed_rx: watch::Receiver<u64>,
40    pub durable_rx: watch::Receiver<u64>,
41}
42
/// Outcome of a write that was applied successfully.
#[derive(Debug, Clone)]
pub(crate) struct WriteApplied<M> {
    /// Epoch assigned to the write.
    pub epoch: u64,
    /// Value produced by applying the write.
    pub result: M,
}
49
/// Outcome of a write whose application failed.
#[derive(Debug, Clone)]
pub(crate) struct WriteFailed {
    /// Epoch assigned to the write.
    pub epoch: u64,
    /// The failure, rendered as a message string.
    pub error: String,
}
56
57/// Result payload sent through the oneshot channel for a write or flush.
58pub(crate) type EpochResult<M> = Result<WriteApplied<M>, WriteFailed>;
59
60/// Handle returned from a write or flush operation.
61///
62/// Provides the epoch assigned to the operation, the apply result (for writes),
63/// and allows waiting for the operation to reach a desired durability level.
64pub struct WriteHandle<M: Clone + Send + 'static = ()> {
65    inner: Shared<oneshot::Receiver<EpochResult<M>>>,
66    watchers: EpochWatcher,
67}
68
69impl<M: Clone + Send + 'static> WriteHandle<M> {
70    pub(crate) fn new(rx: oneshot::Receiver<EpochResult<M>>, watchers: EpochWatcher) -> Self {
71        Self {
72            inner: rx.shared(),
73            watchers,
74        }
75    }
76
77    async fn recv(&self) -> WriteResult<WriteApplied<M>> {
78        self.inner
79            .clone()
80            .await
81            .map_err(|_| WriteError::Shutdown)?
82            .map_err(|e| WriteError::ApplyError(e.epoch, e.error))
83    }
84
85    /// Returns the epoch assigned to this write.
86    ///
87    /// Epochs are assigned when the coordinator dequeues the write, so this
88    /// method blocks until sequencing completes. Epochs are monotonically
89    /// increasing and reflect the actual write order.
90    #[cfg(test)]
91    pub async fn epoch(&self) -> WriteResult<u64> {
92        Ok(self.recv().await?.epoch)
93    }
94
95    /// Wait for the write to reach the specified durability level.
96    ///
97    /// Returns the apply result produced by [`Delta::apply`] once the
98    /// requested durability level has been reached.
99    pub async fn wait(&mut self, durability: Durability) -> WriteResult<M> {
100        let WriteApplied { epoch, result } = self.recv().await?;
101
102        let recv = match durability {
103            Durability::Applied => &mut self.watchers.applied_rx,
104            Durability::Flushed => &mut self.watchers.flushed_rx,
105            Durability::Durable => &mut self.watchers.durable_rx,
106        };
107
108        recv.wait_for(|curr| *curr >= epoch)
109            .await
110            .map_err(|_| WriteError::Shutdown)?;
111        Ok(result)
112    }
113}
114
115/// Handle for submitting writes to the coordinator.
116///
117/// This is the main interface for interacting with the write coordinator.
118/// It can be cloned and shared across tasks.
119pub struct WriteCoordinatorHandle<D: Delta> {
120    write_tx: mpsc::Sender<WriteCommand<D>>,
121    watchers: EpochWatcher,
122}
123
124impl<D: Delta> WriteCoordinatorHandle<D> {
125    pub(crate) fn new(write_tx: mpsc::Sender<WriteCommand<D>>, watchers: EpochWatcher) -> Self {
126        Self { write_tx, watchers }
127    }
128}
129
130impl<D: Delta> WriteCoordinatorHandle<D> {
131    /// Submit a write to the coordinator.
132    ///
133    /// Returns a handle that can be used to retrieve the apply result
134    /// and wait for the write to reach a desired durability level.
135    pub async fn write(&self, write: D::Write) -> WriteResult<WriteHandle<D::ApplyResult>> {
136        let (tx, rx) = oneshot::channel();
137        self.write_tx
138            .try_send(WriteCommand::Write {
139                write,
140                result_tx: tx,
141            })
142            .map_err(|e| match e {
143                mpsc::error::TrySendError::Full(_) => WriteError::Backpressure,
144                mpsc::error::TrySendError::Closed(_) => WriteError::Shutdown,
145            })?;
146
147        Ok(WriteHandle::new(rx, self.watchers.clone()))
148    }
149
150    /// Request a flush of the current delta.
151    ///
152    /// This will trigger a flush even if the flush threshold has not been reached.
153    /// When `flush_storage` is true, the flush will also call `storage.flush()`
154    /// to guarantee durability, and the durable watermark will be advanced.
155    /// Returns a handle that can be used to wait for the flush to complete.
156    pub async fn flush(&self, flush_storage: bool) -> WriteResult<WriteHandle> {
157        let (tx, rx) = oneshot::channel();
158        self.write_tx
159            .try_send(WriteCommand::Flush {
160                epoch_tx: tx,
161                flush_storage,
162            })
163            .map_err(|e| match e {
164                mpsc::error::TrySendError::Full(_) => WriteError::Backpressure,
165                mpsc::error::TrySendError::Closed(_) => WriteError::Shutdown,
166            })?;
167
168        Ok(WriteHandle::new(rx, self.watchers.clone()))
169    }
170}
171
172impl<D: Delta> Clone for WriteCoordinatorHandle<D> {
173    fn clone(&self) -> Self {
174        Self {
175            write_tx: self.write_tx.clone(),
176            watchers: self.watchers.clone(),
177        }
178    }
179}
180
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::watch;

    /// Builds a [`WriteHandle`] around `rx` whose applied / flushed / durable
    /// watermarks start at the given values.
    ///
    /// The watermark senders are returned (in that same order) so the caller
    /// can keep the channels open for the whole test and advance them.
    fn handle_with_watermarks<M: Clone + Send + 'static>(
        rx: oneshot::Receiver<EpochResult<M>>,
        applied: u64,
        flushed: u64,
        durable: u64,
    ) -> (
        WriteHandle<M>,
        watch::Sender<u64>,
        watch::Sender<u64>,
        watch::Sender<u64>,
    ) {
        let (applied_tx, applied_rx) = watch::channel(applied);
        let (flushed_tx, flushed_rx) = watch::channel(flushed);
        let (durable_tx, durable_rx) = watch::channel(durable);
        let watchers = EpochWatcher {
            applied_rx,
            flushed_rx,
            durable_rx,
        };
        (
            WriteHandle::new(rx, watchers),
            applied_tx,
            flushed_tx,
            durable_tx,
        )
    }

    #[tokio::test]
    async fn should_return_epoch_when_assigned() {
        // given
        let (tx, rx) = oneshot::channel();
        let (handle, _applied_tx, _flushed_tx, _durable_tx) =
            handle_with_watermarks::<()>(rx, 0, 0, 0);

        // when
        tx.send(Ok(WriteApplied {
            epoch: 42,
            result: (),
        }))
        .unwrap();
        let epoch = handle.epoch().await;

        // then
        assert!(epoch.is_ok());
        assert_eq!(epoch.unwrap(), 42);
    }

    #[tokio::test]
    async fn should_allow_multiple_epoch_calls() {
        // given
        let (tx, rx) = oneshot::channel();
        let (handle, _applied_tx, _flushed_tx, _durable_tx) =
            handle_with_watermarks::<()>(rx, 0, 0, 0);
        tx.send(Ok(WriteApplied {
            epoch: 42,
            result: (),
        }))
        .unwrap();

        // when
        let first = handle.epoch().await;
        let second = handle.epoch().await;
        let third = handle.epoch().await;

        // then - every call observes the same epoch
        assert_eq!(first.unwrap(), 42);
        assert_eq!(second.unwrap(), 42);
        assert_eq!(third.unwrap(), 42);
    }

    #[tokio::test]
    async fn should_return_apply_result_from_wait() {
        // given - applied watermark is already past the epoch sent below
        let (tx, rx) = oneshot::channel();
        let (mut handle, _applied_tx, _flushed_tx, _durable_tx) =
            handle_with_watermarks::<String>(rx, 100, 0, 0);

        // when
        tx.send(Ok(WriteApplied {
            epoch: 1,
            result: "hello".to_string(),
        }))
        .unwrap();

        // then
        assert_eq!(handle.wait(Durability::Applied).await.unwrap(), "hello");
    }

    #[tokio::test]
    async fn should_return_immediately_when_watermark_already_reached() {
        // given - epoch is 50, applied watermark is already at 100
        let (tx, rx) = oneshot::channel();
        let (mut handle, _applied_tx, _flushed_tx, _durable_tx) =
            handle_with_watermarks::<()>(rx, 100, 0, 0);
        tx.send(Ok(WriteApplied {
            epoch: 50,
            result: (),
        }))
        .unwrap();

        // when
        let outcome = handle.wait(Durability::Applied).await;

        // then
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn should_wait_until_watermark_reaches_epoch() {
        // given
        let (tx, rx) = oneshot::channel();
        let (mut handle, applied_tx, _flushed_tx, _durable_tx) =
            handle_with_watermarks::<()>(rx, 0, 0, 0);
        tx.send(Ok(WriteApplied {
            epoch: 10,
            result: (),
        }))
        .unwrap();

        // when - wait in a spawned task while the watermark is advanced here
        let wait_task = tokio::spawn(async move { handle.wait(Durability::Applied).await });

        tokio::task::yield_now().await;
        applied_tx.send(5).unwrap(); // still below epoch
        tokio::task::yield_now().await;
        applied_tx.send(10).unwrap(); // reaches epoch

        let outcome = wait_task.await.unwrap();

        // then
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn should_wait_for_correct_durability_level() {
        // given - each watermark starts at a different value
        let (tx, rx) = oneshot::channel();
        let (mut handle, _applied_tx, _flushed_tx, durable_tx) =
            handle_with_watermarks::<()>(rx, 100, 50, 10);
        tx.send(Ok(WriteApplied {
            epoch: 25,
            result: (),
        }))
        .unwrap();

        // when - wait for Durable (watermark is 10, epoch is 25)
        let wait_task = tokio::spawn(async move { handle.wait(Durability::Durable).await });

        tokio::task::yield_now().await;
        durable_tx.send(25).unwrap(); // update durable watermark

        let outcome = wait_task.await.unwrap();

        // then
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn should_propagate_epoch_error_in_wait() {
        // given
        let (tx, rx) = oneshot::channel::<EpochResult<()>>();
        let (mut handle, _applied_tx, _flushed_tx, _durable_tx) =
            handle_with_watermarks(rx, 0, 0, 0);

        // when - drop the sender without sending
        drop(tx);
        let outcome = handle.wait(Durability::Applied).await;

        // then
        assert!(matches!(outcome, Err(WriteError::Shutdown)));
    }

    #[tokio::test]
    async fn should_propagate_apply_error_in_wait() {
        // given
        let (tx, rx) = oneshot::channel();
        let (mut handle, _applied_tx, _flushed_tx, _durable_tx) =
            handle_with_watermarks::<()>(rx, 0, 0, 0);

        // when - send an error
        tx.send(Err(WriteFailed {
            epoch: 1,
            error: "apply error".into(),
        }))
        .unwrap();
        let outcome = handle.wait(Durability::Applied).await;

        // then
        assert!(
            matches!(outcome, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
        );
    }
}