// common/coordinator/handle.rs
1use super::{BroadcastedView, WriteCommand};
2use super::{Delta, Durability, WriteError, WriteResult};
3use crate::StorageRead;
4use crate::coordinator::traits::EpochStamped;
5use crate::storage::StorageSnapshot;
6use futures::FutureExt;
7use futures::future::Shared;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::{broadcast, mpsc, oneshot, watch};
11
/// A point-in-time view of all applied writes, broadcast by the coordinator
/// on freeze and flush events.
///
/// Each field corresponds to a stage in the write pipeline, ordered by
/// increasing durability:
///
/// - `current` — the active delta, still accepting writes (Applied).
/// - `frozen` — deltas that have been sealed but not yet flushed to storage
///   (Applied, ordered newest-first).
/// - `snapshot` — the storage snapshot, updated after each flush. Contains
///   all data up through the most recently written delta.
/// - `last_written_delta` — the most recently written delta (Written). Data
///   has been written to storage but not necessarily synced to disk. A
///   separate `FlushStorage` step advances the durable watermark.
pub struct View<D: Delta> {
    /// Active delta, still accepting writes (Applied).
    pub current: D::DeltaView,
    /// Sealed but not yet flushed deltas, newest-first (Applied).
    pub frozen: Vec<EpochStamped<D::FrozenView>>,
    /// Storage snapshot covering data up through the last written delta.
    pub snapshot: Arc<dyn StorageSnapshot>,
    /// Most recently written delta, if any write has completed (Written).
    pub last_written_delta: Option<EpochStamped<D::FrozenView>>,
}
32
// Manual impl rather than `#[derive(Clone)]`: the derive would add a
// `D: Clone` bound, which is not needed — only the field handles
// (views, `Arc` snapshot) must be cloneable.
impl<D: Delta> Clone for View<D> {
    fn clone(&self) -> Self {
        Self {
            current: self.current.clone(),
            frozen: self.frozen.clone(),
            snapshot: self.snapshot.clone(),
            last_written_delta: self.last_written_delta.clone(),
        }
    }
}
43
/// Receivers for durability watermark updates.
///
/// Each receiver tracks the highest epoch that has reached the corresponding
/// [`Durability`] level. See [`Durability`] for details on each level.
#[derive(Clone)]
pub struct EpochWatcher {
    /// Highest epoch applied to the in-memory delta (Applied).
    pub applied_rx: watch::Receiver<u64>,
    /// Highest epoch written to storage, not necessarily synced (Written).
    pub written_rx: watch::Receiver<u64>,
    /// Highest epoch synced to disk (Durable).
    pub durable_rx: watch::Receiver<u64>,
}
54
55impl EpochWatcher {
56    /// Waits until the given epoch has reached the specified durability level.
57    ///
58    /// Returns `Err` if the corresponding [`EpochWatermarks`](super::EpochWatermarks)
59    /// was dropped (i.e. the writer shut down).
60    pub async fn wait(
61        &mut self,
62        epoch: u64,
63        durability: Durability,
64    ) -> Result<(), watch::error::RecvError> {
65        let rx = match durability {
66            Durability::Applied => &mut self.applied_rx,
67            Durability::Written => &mut self.written_rx,
68            Durability::Durable => &mut self.durable_rx,
69        };
70        rx.wait_for(|curr| *curr >= epoch).await.map(|_| ())
71    }
72}
73
/// Successful write application with its assigned epoch.
#[derive(Clone, Debug)]
pub(crate) struct WriteApplied<M> {
    /// Epoch assigned when the coordinator dequeued the write.
    pub epoch: u64,
    /// Apply result value returned to the caller.
    pub result: M,
}

/// Failed write application with its assigned epoch.
#[derive(Clone, Debug)]
pub(crate) struct WriteFailed {
    /// Epoch assigned when the coordinator dequeued the write.
    pub epoch: u64,
    /// Human-readable description of the apply failure.
    pub error: String,
}

/// Result payload sent through the oneshot channel for a write or flush.
pub(crate) type EpochResult<M> = Result<WriteApplied<M>, WriteFailed>;
90
/// Handle returned from a write or flush operation.
///
/// Provides the epoch assigned to the operation, the apply result (for writes),
/// and allows waiting for the operation to reach a desired durability level.
pub struct WriteHandle<M: Clone + Send + 'static = ()> {
    /// Shared receiver for the coordinator's reply, so the result can be
    /// awaited more than once (e.g. `epoch()` followed by `wait()`).
    inner: Shared<oneshot::Receiver<EpochResult<M>>>,
    /// Watermark receivers used by `wait` to observe durability progress.
    watchers: EpochWatcher,
}
99
100impl<M: Clone + Send + 'static> WriteHandle<M> {
101    pub(crate) fn new(rx: oneshot::Receiver<EpochResult<M>>, watchers: EpochWatcher) -> Self {
102        Self {
103            inner: rx.shared(),
104            watchers,
105        }
106    }
107
108    async fn recv(&self) -> WriteResult<WriteApplied<M>> {
109        self.inner
110            .clone()
111            .await
112            .map_err(|_| WriteError::Shutdown)?
113            .map_err(|e| WriteError::ApplyError(e.epoch, e.error))
114    }
115
116    /// Returns the epoch assigned to this write.
117    ///
118    /// Epochs are assigned when the coordinator dequeues the write, so this
119    /// method blocks until sequencing completes. Epochs are monotonically
120    /// increasing and reflect the actual write order.
121    pub async fn epoch(&self) -> WriteResult<u64> {
122        Ok(self.recv().await?.epoch)
123    }
124
125    /// Wait for the write to reach the specified durability level.
126    ///
127    /// Returns the apply result produced by [`Delta::apply`] once the
128    /// requested durability level has been reached.
129    pub async fn wait(&mut self, durability: Durability) -> WriteResult<M> {
130        let WriteApplied { epoch, result } = self.recv().await?;
131
132        self.watchers
133            .wait(epoch, durability)
134            .await
135            .map_err(|_| WriteError::Shutdown)?;
136        Ok(result)
137    }
138}
139
/// Handle for submitting writes to the coordinator.
///
/// This is the main interface for interacting with the write coordinator.
/// It can be cloned and shared across tasks.
pub struct WriteCoordinatorHandle<D: Delta> {
    /// Command channel into the coordinator task.
    write_tx: mpsc::Sender<WriteCommand<D>>,
    /// Watermark receivers handed out with every [`WriteHandle`].
    watchers: EpochWatcher,
    /// Broadcast source for point-in-time [`View`]s.
    view: Arc<BroadcastedView<D>>,
}
149
impl<D: Delta> WriteCoordinatorHandle<D> {
    /// Builds a handle around the coordinator's command channel, watermark
    /// receivers, and broadcast view.
    pub(crate) fn new(
        write_tx: mpsc::Sender<WriteCommand<D>>,
        watchers: EpochWatcher,
        view: Arc<BroadcastedView<D>>,
    ) -> Self {
        Self {
            write_tx,
            watchers,
            view,
        }
    }

    /// Returns the highest epoch that has been flushed to storage.
    ///
    /// This is a non-blocking snapshot of the current flushed watermark
    /// (the `Written` level: data has been written to storage but not
    /// necessarily synced to disk).
    /// Returns 0 if no data has been flushed yet.
    pub fn flushed_epoch(&self) -> u64 {
        *self.watchers.written_rx.borrow()
    }
}
171
172impl<D: Delta> WriteCoordinatorHandle<D> {
173    /// Submit a write to the coordinator with a timeout.
174    ///
175    /// Unlike [`write`](Self::try_write), which fails immediately when the queue
176    /// is full, this method waits up to `timeout` for space.
177    ///
178    /// # Errors
179    ///
180    /// - [`WriteError::TimeoutError`] — the queue remained full for the
181    ///   entire duration. Contains the original write so it can be retried
182    ///   without cloning.
183    /// - [`WriteError::Shutdown`] — the coordinator has stopped.
184    pub async fn write_timeout(
185        &self,
186        write: D::Write,
187        timeout: Duration,
188    ) -> Result<WriteHandle<D::ApplyResult>, WriteError<D::Write>> {
189        let (tx, rx) = oneshot::channel();
190        self.write_tx
191            .send_timeout(
192                WriteCommand::Write {
193                    write,
194                    result_tx: tx,
195                },
196                timeout,
197            )
198            .await
199            .map_err(|e| match e {
200                mpsc::error::SendTimeoutError::Timeout(WriteCommand::Write { write, .. }) => {
201                    WriteError::TimeoutError(write)
202                }
203                mpsc::error::SendTimeoutError::Closed(WriteCommand::Write { write, .. }) => {
204                    WriteError::Shutdown
205                }
206                _ => unreachable!("sent a Write command"),
207            })?;
208
209        Ok(WriteHandle::new(rx, self.watchers.clone()))
210    }
211
212    /// Submit a write to the coordinator, blocking indefinitely until there is
213    /// room in the channel.
214    ///
215    /// # Errors
216    ///
217    /// - [`WriteError::TimeoutError`] — the queue remained full for the
218    ///   entire duration. Contains the original write so it can be retried
219    ///   without cloning.
220    /// - [`WriteError::Shutdown`] — the coordinator has stopped.
221    pub async fn write(
222        &self,
223        write: D::Write,
224    ) -> Result<WriteHandle<D::ApplyResult>, WriteError<D::Write>> {
225        let (tx, rx) = oneshot::channel();
226        self.write_tx
227            .send(WriteCommand::Write {
228                write,
229                result_tx: tx,
230            })
231            .await
232            .map_err(|e| match e {
233                mpsc::error::SendError(WriteCommand::Write { write, .. }) => WriteError::Shutdown,
234                _ => unreachable!("sent a Write command"),
235            })?;
236
237        Ok(WriteHandle::new(rx, self.watchers.clone()))
238    }
239
240    /// Submit a write to the coordinator.
241    ///
242    /// Returns a handle that can be used to retrieve the apply result
243    /// and wait for the write to reach a desired durability level. On
244    /// failure the original write is returned inside the error so it
245    /// can be retried without cloning.
246    pub async fn try_write(
247        &self,
248        write: D::Write,
249    ) -> Result<WriteHandle<D::ApplyResult>, WriteError<D::Write>> {
250        let (tx, rx) = oneshot::channel();
251        self.write_tx
252            .try_send(WriteCommand::Write {
253                write,
254                result_tx: tx,
255            })
256            .map_err(|e| match e {
257                mpsc::error::TrySendError::Full(WriteCommand::Write { write, .. }) => {
258                    WriteError::Backpressure(write)
259                }
260                mpsc::error::TrySendError::Closed(WriteCommand::Write { write, .. }) => {
261                    WriteError::Shutdown
262                }
263                _ => unreachable!("sent a Write command"),
264            })?;
265
266        Ok(WriteHandle::new(rx, self.watchers.clone()))
267    }
268
269    /// Request a flush of the current delta.
270    ///
271    /// This will trigger a flush even if the flush threshold has not been reached.
272    /// When `flush_storage` is true, the flush will also call `storage.flush()`
273    /// to guarantee durability, and the durable watermark will be advanced.
274    /// Returns a handle that can be used to wait for the flush to complete.
275    pub async fn flush(&self, flush_storage: bool) -> WriteResult<WriteHandle> {
276        let (tx, rx) = oneshot::channel();
277        self.write_tx
278            .try_send(WriteCommand::Flush {
279                epoch_tx: tx,
280                flush_storage,
281            })
282            .map_err(|e| match e {
283                mpsc::error::TrySendError::Full(_) => WriteError::Backpressure(()),
284                mpsc::error::TrySendError::Closed(_) => WriteError::Shutdown,
285            })?;
286
287        Ok(WriteHandle::new(rx, self.watchers.clone()))
288    }
289
290    pub fn view(&self) -> Arc<View<D>> {
291        self.view.current()
292    }
293
294    pub fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
295        self.view.subscribe()
296    }
297}
298
// Manual impl rather than `#[derive(Clone)]`: the derive would add a
// `D: Clone` bound, which is not needed — every field is a cheaply
// cloneable handle (channel sender, watch receivers, `Arc`).
impl<D: Delta> Clone for WriteCoordinatorHandle<D> {
    fn clone(&self) -> Self {
        Self {
            write_tx: self.write_tx.clone(),
            watchers: self.watchers.clone(),
            view: self.view.clone(),
        }
    }
}
308
#[cfg(test)]
mod tests {
    // Unit tests for `WriteHandle` and `EpochWatcher`, driven through
    // hand-built oneshot/watch channels (no coordinator task is spawned).
    use super::*;
    use tokio::sync::watch;

    /// Convenience constructor pairing the three watermark receivers.
    fn create_watchers(
        applied: watch::Receiver<u64>,
        flushed: watch::Receiver<u64>,
        durable: watch::Receiver<u64>,
    ) -> EpochWatcher {
        EpochWatcher {
            applied_rx: applied,
            written_rx: flushed,
            durable_rx: durable,
        }
    }

    #[tokio::test]
    async fn should_return_epoch_when_assigned() {
        // given
        let (tx, rx) = oneshot::channel();
        let (_applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let handle: WriteHandle<()> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));

        // when
        tx.send(Ok(WriteApplied {
            epoch: 42,
            result: (),
        }))
        .unwrap();
        let result = handle.epoch().await;

        // then
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn should_allow_multiple_epoch_calls() {
        // given
        let (tx, rx) = oneshot::channel();
        let (_applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let handle: WriteHandle<()> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
        tx.send(Ok(WriteApplied {
            epoch: 42,
            result: (),
        }))
        .unwrap();

        // when - the shared future makes repeated awaits possible
        let result1 = handle.epoch().await;
        let result2 = handle.epoch().await;
        let result3 = handle.epoch().await;

        // then
        assert_eq!(result1.unwrap(), 42);
        assert_eq!(result2.unwrap(), 42);
        assert_eq!(result3.unwrap(), 42);
    }

    #[tokio::test]
    async fn should_return_apply_result_from_wait() {
        // given
        let (tx, rx) = oneshot::channel();
        let (_applied_tx, applied_rx) = watch::channel(100u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut handle: WriteHandle<String> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));

        // when
        tx.send(Ok(WriteApplied {
            epoch: 1,
            result: "hello".to_string(),
        }))
        .unwrap();

        // then
        assert_eq!(handle.wait(Durability::Applied).await.unwrap(), "hello");
    }

    #[tokio::test]
    async fn should_return_immediately_when_watermark_already_reached() {
        // given
        let (tx, rx) = oneshot::channel();
        let (_applied_tx, applied_rx) = watch::channel(100u64); // watermark already at 100
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut handle: WriteHandle<()> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
        tx.send(Ok(WriteApplied {
            epoch: 50,
            result: (),
        }))
        .unwrap(); // epoch is 50, watermark is 100

        // when
        let result = handle.wait(Durability::Applied).await;

        // then
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn should_wait_until_watermark_reaches_epoch() {
        // given
        let (tx, rx) = oneshot::channel();
        let (applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut handle: WriteHandle<()> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
        tx.send(Ok(WriteApplied {
            epoch: 10,
            result: (),
        }))
        .unwrap();

        // when - spawn a task to update the watermark after a delay
        let wait_task = tokio::spawn(async move { handle.wait(Durability::Applied).await });

        tokio::task::yield_now().await;
        applied_tx.send(5).unwrap(); // still below epoch
        tokio::task::yield_now().await;
        applied_tx.send(10).unwrap(); // reaches epoch

        let result = wait_task.await.unwrap();

        // then
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn should_wait_for_correct_durability_level() {
        // given - set up watchers with different values
        let (tx, rx) = oneshot::channel();
        let (_applied_tx, applied_rx) = watch::channel(100u64);
        let (_flushed_tx, flushed_rx) = watch::channel(50u64);
        let (durable_tx, durable_rx) = watch::channel(10u64);
        let mut handle: WriteHandle<()> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
        tx.send(Ok(WriteApplied {
            epoch: 25,
            result: (),
        }))
        .unwrap();

        // when - wait for Durable (watermark is 10, epoch is 25)
        let wait_task = tokio::spawn(async move { handle.wait(Durability::Durable).await });

        tokio::task::yield_now().await;
        durable_tx.send(25).unwrap(); // update durable watermark

        let result = wait_task.await.unwrap();

        // then
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn should_propagate_epoch_error_in_wait() {
        // given
        let (tx, rx) = oneshot::channel::<EpochResult<()>>();
        let (_applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut handle = WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));

        // when - drop the sender without sending
        drop(tx);
        let result = handle.wait(Durability::Applied).await;

        // then
        assert!(matches!(result, Err(WriteError::Shutdown)));
    }

    #[tokio::test]
    async fn should_propagate_apply_error_in_wait() {
        // given
        let (tx, rx) = oneshot::channel();
        let (_applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut handle: WriteHandle<()> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));

        // when - send an error
        tx.send(Err(WriteFailed {
            epoch: 1,
            error: "apply error".into(),
        }))
        .unwrap();
        let result = handle.wait(Durability::Applied).await;

        // then
        assert!(
            matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
        );
    }

    #[tokio::test]
    async fn epoch_watcher_should_resolve_when_watermark_reached() {
        // given
        let (applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut watcher = create_watchers(applied_rx, flushed_rx, durable_rx);

        // when
        let wait_task = tokio::spawn(async move { watcher.wait(5, Durability::Applied).await });
        tokio::task::yield_now().await;
        applied_tx.send(5).unwrap();

        // then
        assert!(wait_task.await.unwrap().is_ok());
    }

    #[tokio::test]
    async fn epoch_watcher_should_resolve_immediately_when_already_reached() {
        // given
        let (_applied_tx, applied_rx) = watch::channel(10u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut watcher = create_watchers(applied_rx, flushed_rx, durable_rx);

        // when/then
        assert!(watcher.wait(5, Durability::Applied).await.is_ok());
    }

    #[tokio::test]
    async fn epoch_watcher_should_select_correct_durability_receiver() {
        // given
        let (_applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (durable_tx, durable_rx) = watch::channel(0u64);
        let mut watcher = create_watchers(applied_rx, flushed_rx, durable_rx);

        // when
        let wait_task = tokio::spawn(async move { watcher.wait(3, Durability::Durable).await });
        tokio::task::yield_now().await;
        durable_tx.send(3).unwrap();

        // then
        assert!(wait_task.await.unwrap().is_ok());
    }

    #[tokio::test]
    async fn epoch_watcher_should_return_error_on_sender_drop() {
        // given
        let (applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut watcher = create_watchers(applied_rx, flushed_rx, durable_rx);

        // when
        drop(applied_tx);
        let result = watcher.wait(1, Durability::Applied).await;

        // then
        assert!(result.is_err());
    }
}