Skip to main content

common/coordinator/
handle.rs

1use super::metrics;
2use super::{BroadcastedView, WriteCommand};
3use super::{Delta, Durability, WriteError, WriteResult};
4use crate::StorageRead;
5use crate::coordinator::traits::EpochStamped;
6use crate::storage::StorageSnapshot;
7use futures::FutureExt;
8use futures::future::Shared;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::sync::{broadcast, mpsc, oneshot, watch};
12
13/// A point-in-time view of all applied writes, broadcast by the coordinator
14/// on freeze and flush events.
15///
16/// Each field corresponds to a stage in the write pipeline, ordered by
17/// increasing durability:
18///
19/// - `current` — the active delta, still accepting writes (Applied).
20/// - `frozen` — deltas that have been sealed but not yet flushed to storage
21///   (Applied, ordered newest-first).
22/// - `snapshot` — the storage snapshot, updated after each flush. Contains
23///   all data up through the most recently written delta.
24/// - `last_written_delta` — the most recently written delta (Written). Data
25///   has been written to storage but not necessarily synced to disk. A
26///   separate `FlushStorage` step advances the durable watermark.
27pub struct View<D: Delta> {
28    pub current: D::DeltaView,
29    pub frozen: Vec<EpochStamped<D::FrozenView>>,
30    pub snapshot: Arc<dyn StorageSnapshot>,
31    pub last_written_delta: Option<EpochStamped<D::FrozenView>>,
32}
33
34impl<D: Delta> Clone for View<D> {
35    fn clone(&self) -> Self {
36        Self {
37            current: self.current.clone(),
38            frozen: self.frozen.clone(),
39            snapshot: self.snapshot.clone(),
40            last_written_delta: self.last_written_delta.clone(),
41        }
42    }
43}
44
45/// Receivers for durability watermark updates.
46///
47/// Each receiver tracks the highest epoch that has reached the corresponding
48/// [`Durability`] level. See [`Durability`] for details on each level.
49#[derive(Clone)]
50pub struct EpochWatcher {
51    pub applied_rx: watch::Receiver<u64>,
52    pub written_rx: watch::Receiver<u64>,
53    pub durable_rx: watch::Receiver<u64>,
54}
55
56impl EpochWatcher {
57    /// Waits until the given epoch has reached the specified durability level.
58    ///
59    /// Returns `Err` if the corresponding [`EpochWatermarks`](super::EpochWatermarks)
60    /// was dropped (i.e. the writer shut down).
61    pub async fn wait(
62        &mut self,
63        epoch: u64,
64        durability: Durability,
65    ) -> Result<(), watch::error::RecvError> {
66        let rx = match durability {
67            Durability::Applied => &mut self.applied_rx,
68            Durability::Written => &mut self.written_rx,
69            Durability::Durable => &mut self.durable_rx,
70        };
71        rx.wait_for(|curr| *curr >= epoch).await.map(|_| ())
72    }
73}
74
/// Outcome of a write that was applied successfully.
#[derive(Clone, Debug)]
pub(crate) struct WriteApplied<M> {
    /// Epoch assigned to the write.
    pub epoch: u64,
    /// Value produced by applying the write.
    pub result: M,
}
81
/// Outcome of a write whose application failed.
#[derive(Clone, Debug)]
pub(crate) struct WriteFailed {
    /// Epoch assigned to the failed write.
    pub epoch: u64,
    /// Failure description, carried as plain text.
    pub error: String,
}
88
89/// Result payload sent through the oneshot channel for a write or flush.
90pub(crate) type EpochResult<M> = Result<WriteApplied<M>, WriteFailed>;
91
92/// Handle returned from a write or flush operation.
93///
94/// Provides the epoch assigned to the operation, the apply result (for writes),
95/// and allows waiting for the operation to reach a desired durability level.
96pub struct WriteHandle<M: Clone + Send + 'static = ()> {
97    inner: Shared<oneshot::Receiver<EpochResult<M>>>,
98    watchers: EpochWatcher,
99}
100
101impl<M: Clone + Send + 'static> WriteHandle<M> {
102    pub(crate) fn new(rx: oneshot::Receiver<EpochResult<M>>, watchers: EpochWatcher) -> Self {
103        Self {
104            inner: rx.shared(),
105            watchers,
106        }
107    }
108
109    async fn recv(&self) -> WriteResult<WriteApplied<M>> {
110        self.inner
111            .clone()
112            .await
113            .map_err(|_| WriteError::Shutdown)?
114            .map_err(|e| WriteError::ApplyError(e.epoch, e.error))
115    }
116
117    /// Returns the epoch assigned to this write.
118    ///
119    /// Epochs are assigned when the coordinator dequeues the write, so this
120    /// method blocks until sequencing completes. Epochs are monotonically
121    /// increasing and reflect the actual write order.
122    pub async fn epoch(&self) -> WriteResult<u64> {
123        Ok(self.recv().await?.epoch)
124    }
125
126    /// Wait for the write to reach the specified durability level.
127    ///
128    /// Returns the apply result produced by [`Delta::apply`] once the
129    /// requested durability level has been reached.
130    pub async fn wait(&mut self, durability: Durability) -> WriteResult<M> {
131        let WriteApplied { epoch, result } = self.recv().await?;
132
133        self.watchers
134            .wait(epoch, durability)
135            .await
136            .map_err(|_| WriteError::Shutdown)?;
137        Ok(result)
138    }
139}
140
141/// Handle for submitting writes to the coordinator.
142///
143/// This is the main interface for interacting with the write coordinator.
144/// It can be cloned and shared across tasks.
145pub struct WriteCoordinatorHandle<D: Delta> {
146    name: Arc<str>,
147    write_tx: mpsc::Sender<WriteCommand<D>>,
148    watchers: EpochWatcher,
149    view: Arc<BroadcastedView<D>>,
150}
151
152impl<D: Delta> WriteCoordinatorHandle<D> {
153    pub(crate) fn new(
154        name: String,
155        write_tx: mpsc::Sender<WriteCommand<D>>,
156        watchers: EpochWatcher,
157        view: Arc<BroadcastedView<D>>,
158    ) -> Self {
159        Self {
160            name: Arc::from(name),
161            write_tx,
162            watchers,
163            view,
164        }
165    }
166
167    /// Returns the highest epoch that has been flushed to storage.
168    ///
169    /// This is a non-blocking snapshot of the current flushed watermark.
170    /// Returns 0 if no data has been flushed yet.
171    pub fn flushed_epoch(&self) -> u64 {
172        *self.watchers.written_rx.borrow()
173    }
174
175    /// Sample queue depth on each send. Cheap atomic reads on `mpsc::Sender`.
176    fn record_queue_depth(&self) {
177        let max = self.write_tx.max_capacity();
178        let free = self.write_tx.capacity();
179        ::metrics::gauge!(
180            metrics::COORDINATOR_QUEUE_DEPTH,
181            "channel" => self.name.to_string(),
182        )
183        .set(max.saturating_sub(free) as f64);
184    }
185
186    fn record_send(&self, command: &'static str, status: &'static str, started: Instant) {
187        ::metrics::histogram!(
188            metrics::COORDINATOR_SEND_DURATION_SECONDS,
189            "command" => command,
190            "status" => status,
191        )
192        .record(started.elapsed().as_secs_f64());
193    }
194
195    fn record_backpressure(&self, command: &'static str, reason: &'static str) {
196        ::metrics::counter!(
197            metrics::COORDINATOR_QUEUE_BACKPRESSURE_TOTAL,
198            "command" => command,
199            "reason" => reason,
200        )
201        .increment(1);
202    }
203}
204
205impl<D: Delta> WriteCoordinatorHandle<D> {
206    /// Submit a write to the coordinator with a timeout.
207    ///
208    /// Unlike [`write`](Self::try_write), which fails immediately when the queue
209    /// is full, this method waits up to `timeout` for space.
210    ///
211    /// # Errors
212    ///
213    /// - [`WriteError::TimeoutError`] — the queue remained full for the
214    ///   entire duration. Contains the original write so it can be retried
215    ///   without cloning.
216    /// - [`WriteError::Shutdown`] — the coordinator has stopped.
217    pub async fn write_timeout(
218        &self,
219        write: D::Write,
220        timeout: Duration,
221    ) -> Result<WriteHandle<D::ApplyResult>, WriteError<D::Write>> {
222        const COMMAND: &str = "write_timeout";
223        self.record_queue_depth();
224        let started = Instant::now();
225        let (tx, rx) = oneshot::channel();
226        let send_result = self
227            .write_tx
228            .send_timeout(
229                WriteCommand::Write {
230                    write,
231                    result_tx: tx,
232                },
233                timeout,
234            )
235            .await;
236        match send_result {
237            Ok(()) => {
238                self.record_send(COMMAND, "ok", started);
239                Ok(WriteHandle::new(rx, self.watchers.clone()))
240            }
241            Err(mpsc::error::SendTimeoutError::Timeout(WriteCommand::Write { write, .. })) => {
242                self.record_send(COMMAND, "timeout", started);
243                self.record_backpressure(COMMAND, "timeout");
244                Err(WriteError::TimeoutError(write))
245            }
246            Err(mpsc::error::SendTimeoutError::Closed(WriteCommand::Write { .. })) => {
247                self.record_send(COMMAND, "shutdown", started);
248                self.record_backpressure(COMMAND, "closed");
249                Err(WriteError::Shutdown)
250            }
251            Err(_) => unreachable!("sent a Write command"),
252        }
253    }
254
255    /// Submit a write to the coordinator, blocking indefinitely until there is
256    /// room in the channel.
257    ///
258    /// # Errors
259    ///
260    /// - [`WriteError::TimeoutError`] — the queue remained full for the
261    ///   entire duration. Contains the original write so it can be retried
262    ///   without cloning.
263    /// - [`WriteError::Shutdown`] — the coordinator has stopped.
264    pub async fn write(
265        &self,
266        write: D::Write,
267    ) -> Result<WriteHandle<D::ApplyResult>, WriteError<D::Write>> {
268        const COMMAND: &str = "write";
269        self.record_queue_depth();
270        let started = Instant::now();
271        let (tx, rx) = oneshot::channel();
272        let send_result = self
273            .write_tx
274            .send(WriteCommand::Write {
275                write,
276                result_tx: tx,
277            })
278            .await;
279        match send_result {
280            Ok(()) => {
281                self.record_send(COMMAND, "ok", started);
282                Ok(WriteHandle::new(rx, self.watchers.clone()))
283            }
284            Err(mpsc::error::SendError(WriteCommand::Write { .. })) => {
285                self.record_send(COMMAND, "shutdown", started);
286                self.record_backpressure(COMMAND, "closed");
287                Err(WriteError::Shutdown)
288            }
289            Err(_) => unreachable!("sent a Write command"),
290        }
291    }
292
293    /// Submit a write to the coordinator.
294    ///
295    /// Returns a handle that can be used to retrieve the apply result
296    /// and wait for the write to reach a desired durability level. On
297    /// failure the original write is returned inside the error so it
298    /// can be retried without cloning.
299    pub async fn try_write(
300        &self,
301        write: D::Write,
302    ) -> Result<WriteHandle<D::ApplyResult>, WriteError<D::Write>> {
303        const COMMAND: &str = "try_write";
304        self.record_queue_depth();
305        let started = Instant::now();
306        let (tx, rx) = oneshot::channel();
307        let send_result = self.write_tx.try_send(WriteCommand::Write {
308            write,
309            result_tx: tx,
310        });
311        match send_result {
312            Ok(()) => {
313                self.record_send(COMMAND, "ok", started);
314                Ok(WriteHandle::new(rx, self.watchers.clone()))
315            }
316            Err(mpsc::error::TrySendError::Full(WriteCommand::Write { write, .. })) => {
317                self.record_send(COMMAND, "backpressure", started);
318                self.record_backpressure(COMMAND, "full");
319                Err(WriteError::Backpressure(write))
320            }
321            Err(mpsc::error::TrySendError::Closed(WriteCommand::Write { .. })) => {
322                self.record_send(COMMAND, "shutdown", started);
323                self.record_backpressure(COMMAND, "closed");
324                Err(WriteError::Shutdown)
325            }
326            Err(_) => unreachable!("sent a Write command"),
327        }
328    }
329
330    /// Request a flush of the current delta.
331    ///
332    /// This will trigger a flush even if the flush threshold has not been reached.
333    /// When `flush_storage` is true, the flush will also call `storage.flush()`
334    /// to guarantee durability, and the durable watermark will be advanced.
335    /// Returns a handle that can be used to wait for the flush to complete.
336    pub async fn flush(&self, flush_storage: bool) -> WriteResult<WriteHandle> {
337        const COMMAND: &str = "flush";
338        self.record_queue_depth();
339        let started = Instant::now();
340        let (tx, rx) = oneshot::channel();
341        let send_result = self.write_tx.try_send(WriteCommand::Flush {
342            epoch_tx: tx,
343            flush_storage,
344        });
345        match send_result {
346            Ok(()) => {
347                self.record_send(COMMAND, "ok", started);
348                Ok(WriteHandle::new(rx, self.watchers.clone()))
349            }
350            Err(mpsc::error::TrySendError::Full(_)) => {
351                self.record_send(COMMAND, "backpressure", started);
352                self.record_backpressure(COMMAND, "full");
353                Err(WriteError::Backpressure(()))
354            }
355            Err(mpsc::error::TrySendError::Closed(_)) => {
356                self.record_send(COMMAND, "shutdown", started);
357                self.record_backpressure(COMMAND, "closed");
358                Err(WriteError::Shutdown)
359            }
360        }
361    }
362
363    pub fn view(&self) -> Arc<View<D>> {
364        self.view.current()
365    }
366
367    pub fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
368        self.view.subscribe()
369    }
370}
371
372impl<D: Delta> Clone for WriteCoordinatorHandle<D> {
373    fn clone(&self) -> Self {
374        Self {
375            name: self.name.clone(),
376            write_tx: self.write_tx.clone(),
377            watchers: self.watchers.clone(),
378            view: self.view.clone(),
379        }
380    }
381}
382
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::watch;

    /// Sending half of a single watermark channel.
    type WatermarkTx = watch::Sender<u64>;

    /// Builds an [`EpochWatcher`] whose applied / written / durable
    /// watermarks start at the given values.
    ///
    /// The senders are returned in the same order so that each test decides
    /// how long every channel stays open.
    fn watermarks(
        applied: u64,
        written: u64,
        durable: u64,
    ) -> (WatermarkTx, WatermarkTx, WatermarkTx, EpochWatcher) {
        let (applied_tx, applied_rx) = watch::channel(applied);
        let (written_tx, written_rx) = watch::channel(written);
        let (durable_tx, durable_rx) = watch::channel(durable);
        let watcher = EpochWatcher {
            applied_rx,
            written_rx,
            durable_rx,
        };
        (applied_tx, written_tx, durable_tx, watcher)
    }

    /// Shorthand for a successful outcome.
    fn applied<M>(epoch: u64, result: M) -> EpochResult<M> {
        Ok(WriteApplied { epoch, result })
    }

    /// Creates a handle and immediately delivers `outcome` to it.
    fn resolved_handle<M>(outcome: EpochResult<M>, watchers: EpochWatcher) -> WriteHandle<M>
    where
        M: Clone + Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let handle = WriteHandle::new(rx, watchers);
        assert!(tx.send(outcome).is_ok(), "receiver is still alive");
        handle
    }

    #[tokio::test]
    async fn should_return_epoch_when_assigned() {
        // given
        let (_applied_tx, _written_tx, _durable_tx, watchers) = watermarks(0, 0, 0);
        let handle: WriteHandle<()> = resolved_handle(applied(42, ()), watchers);

        // when
        let result = handle.epoch().await;

        // then
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn should_allow_multiple_epoch_calls() {
        // given
        let (_applied_tx, _written_tx, _durable_tx, watchers) = watermarks(0, 0, 0);
        let handle: WriteHandle<()> = resolved_handle(applied(42, ()), watchers);

        // when / then - every call observes the same epoch
        for _ in 0..3 {
            assert_eq!(handle.epoch().await.unwrap(), 42);
        }
    }

    #[tokio::test]
    async fn should_return_apply_result_from_wait() {
        // given
        let (_applied_tx, _written_tx, _durable_tx, watchers) = watermarks(100, 0, 0);
        let mut handle: WriteHandle<String> =
            resolved_handle(applied(1, "hello".to_string()), watchers);

        // when
        let result = handle.wait(Durability::Applied).await;

        // then
        assert_eq!(result.unwrap(), "hello");
    }

    #[tokio::test]
    async fn should_return_immediately_when_watermark_already_reached() {
        // given - epoch is 50, applied watermark is already at 100
        let (_applied_tx, _written_tx, _durable_tx, watchers) = watermarks(100, 0, 0);
        let mut handle: WriteHandle<()> = resolved_handle(applied(50, ()), watchers);

        // when
        let result = handle.wait(Durability::Applied).await;

        // then
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn should_wait_until_watermark_reaches_epoch() {
        // given
        let (applied_tx, _written_tx, _durable_tx, watchers) = watermarks(0, 0, 0);
        let mut handle: WriteHandle<()> = resolved_handle(applied(10, ()), watchers);

        // when - wait in a separate task while the watermark advances
        let wait_task = tokio::spawn(async move { handle.wait(Durability::Applied).await });

        tokio::task::yield_now().await;
        applied_tx.send(5).unwrap(); // still below epoch
        tokio::task::yield_now().await;
        applied_tx.send(10).unwrap(); // reaches epoch

        let result = wait_task.await.unwrap();

        // then
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn should_wait_for_correct_durability_level() {
        // given - each watermark starts at a different value
        let (_applied_tx, _written_tx, durable_tx, watchers) = watermarks(100, 50, 10);
        let mut handle: WriteHandle<()> = resolved_handle(applied(25, ()), watchers);

        // when - wait for Durable (watermark is 10, epoch is 25)
        let wait_task = tokio::spawn(async move { handle.wait(Durability::Durable).await });

        tokio::task::yield_now().await;
        durable_tx.send(25).unwrap(); // update durable watermark

        let result = wait_task.await.unwrap();

        // then
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn should_propagate_epoch_error_in_wait() {
        // given
        let (tx, rx) = oneshot::channel::<EpochResult<()>>();
        let (_applied_tx, _written_tx, _durable_tx, watchers) = watermarks(0, 0, 0);
        let mut handle = WriteHandle::new(rx, watchers);

        // when - drop the sender without sending
        drop(tx);
        let result = handle.wait(Durability::Applied).await;

        // then
        assert!(matches!(result, Err(WriteError::Shutdown)));
    }

    #[tokio::test]
    async fn should_propagate_apply_error_in_wait() {
        // given
        let (_applied_tx, _written_tx, _durable_tx, watchers) = watermarks(0, 0, 0);
        let failure = Err(WriteFailed {
            epoch: 1,
            error: "apply error".into(),
        });
        let mut handle: WriteHandle<()> = resolved_handle(failure, watchers);

        // when
        let result = handle.wait(Durability::Applied).await;

        // then
        assert!(
            matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
        );
    }

    #[tokio::test]
    async fn epoch_watcher_should_resolve_when_watermark_reached() {
        // given
        let (applied_tx, _written_tx, _durable_tx, mut watcher) = watermarks(0, 0, 0);

        // when
        let wait_task = tokio::spawn(async move { watcher.wait(5, Durability::Applied).await });
        tokio::task::yield_now().await;
        applied_tx.send(5).unwrap();

        // then
        assert!(wait_task.await.unwrap().is_ok());
    }

    #[tokio::test]
    async fn epoch_watcher_should_resolve_immediately_when_already_reached() {
        // given
        let (_applied_tx, _written_tx, _durable_tx, mut watcher) = watermarks(10, 0, 0);

        // when/then
        assert!(watcher.wait(5, Durability::Applied).await.is_ok());
    }

    #[tokio::test]
    async fn epoch_watcher_should_select_correct_durability_receiver() {
        // given
        let (_applied_tx, _written_tx, durable_tx, mut watcher) = watermarks(0, 0, 0);

        // when
        let wait_task = tokio::spawn(async move { watcher.wait(3, Durability::Durable).await });
        tokio::task::yield_now().await;
        durable_tx.send(3).unwrap();

        // then
        assert!(wait_task.await.unwrap().is_ok());
    }

    #[tokio::test]
    async fn epoch_watcher_should_return_error_on_sender_drop() {
        // given
        let (applied_tx, _written_tx, _durable_tx, mut watcher) = watermarks(0, 0, 0);

        // when
        drop(applied_tx);
        let result = watcher.wait(1, Durability::Applied).await;

        // then
        assert!(result.is_err());
    }
}