Skip to main content

common/coordinator/
subscriber.rs

1//! View subscription and progress monitoring.
2//!
3//! When a caller subscribes to a [`WriteCoordinator`](super::WriteCoordinator),
4//! it receives a paired [`ViewSubscriber`] and [`ViewMonitor`]:
5//!
6//! - The **subscriber** receives [`View`] broadcasts from the coordinator
7//!   (on freeze and flush events) and advances epoch watermarks to signal
8//!   how far it has processed.
9//! - The **monitor** lets other tasks wait until the subscriber has reached
10//!   a given epoch at a given durability level.
11//!
12//! Together they form a progress-tracking channel: the subscriber drives
13//! forward, and the monitor synchronizes on that progress.
14//!
15//! # Usage
16//!
17//! ```ignore
18//! // Subscribe to the coordinator.
19//! let (mut subscriber, monitor) = coordinator.subscribe();
20//!
21//! // Spawn a task that processes views as they arrive.
22//! tokio::spawn(async move {
23//!     // Initialize returns the initial view, captured atomically with
24//!     // the subscription. Must be called before recv().
25//!     let initial_view = subscriber.initialize();
26//!     // Bootstrap read state from initial_view ...
27//!
28//!     loop {
29//!         match subscriber.recv().await {
30//!             Ok(view) => {
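//!                 // Process the view (update read state), then report
//!                 // progress for the epoch this view covers (`epoch` is a
//!                 // placeholder for however the caller derives it).
//!                 subscriber.update_durable(epoch);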
33//!             }
34//!             Err(SubscribeError::MessageLost) => {
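//!                 // Re-subscribe to get a consistent initial view, then
//!                 // recover db-specific state from the snapshot. The new
//!                 // subscriber owns fresh watermarks, so monitors paired with
//!                 // the old subscriber will not observe its progress.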
37//!                 let (rx, view) = handle.subscribe();
38//!                 (subscriber, _) = ViewSubscriber::new(rx, view);
39//!                 let view = subscriber.initialize();
40//!                 // db.recover_from_view(&view).await?;
41//!             }
42//!             Err(SubscribeError::Shutdown) => break,
43//!             Err(SubscribeError::NotInitialized) => unreachable!(),
44//!         }
45//!     }
46//! });
47//!
48//! // Elsewhere, wait for the subscriber to catch up.
49//! monitor.clone().wait(epoch, Durability::Durable).await?;
50//! ```
51
52use std::sync::Arc;
53
54use tokio::sync::broadcast;
55
56use super::traits::EpochStamped;
57use super::{Delta, Durability, EpochWatcher, EpochWatermarks, View};
58
59/// Error type for subscriber and monitor operations.
60#[derive(Debug)]
61pub enum SubscribeError {
62    /// The coordinator has shut down.
63    Shutdown,
64    /// [`ViewSubscriber::recv()`] was called before [`ViewSubscriber::initialize()`].
65    NotInitialized,
66    /// A message was lost indicating that the subscriber is lagging behind the coordinator's progress.
67    MessageLost,
68}
69
70impl std::fmt::Display for SubscribeError {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        match self {
73            SubscribeError::Shutdown => write!(f, "coordinator shut down"),
74            SubscribeError::NotInitialized => {
75                write!(f, "initialize() must be called before recv()")
76            }
77            SubscribeError::MessageLost => {
78                write!(f, "message was lost, subscriber is lagging behind")
79            }
80        }
81    }
82}
83
84impl std::error::Error for SubscribeError {}
85
86/// Receives [`View`] broadcasts from the coordinator and advances epoch
87/// watermarks to signal progress. Paired with a [`ViewMonitor`] that can
88/// wait on the watermarks this subscriber advances.
89pub struct ViewSubscriber<D: Delta> {
90    view_rx: broadcast::Receiver<Arc<View<D>>>,
91    initial_view: Option<Arc<View<D>>>,
92    watermarks: Arc<EpochWatermarks>,
93}
94
95impl<D: Delta> ViewSubscriber<D> {
96    /// Creates a new `ViewSubscriber` and paired `ViewMonitor`.
97    pub fn new(
98        view_rx: broadcast::Receiver<Arc<View<D>>>,
99        initial_view: Arc<View<D>>,
100    ) -> (Self, ViewMonitor) {
101        let (watermarks, watcher) = EpochWatermarks::new();
102        let watermarks = Arc::new(watermarks);
103        let subscriber = Self {
104            view_rx,
105            initial_view: Some(initial_view),
106            watermarks: watermarks.clone(),
107        };
108        let monitor = ViewMonitor {
109            watcher,
110            watermarks,
111        };
112        (subscriber, monitor)
113    }
114
115    /// Takes the initial view captured at subscription time, marking the
116    /// subscriber as ready to receive broadcasts.
117    ///
118    /// Must be called exactly once before [`recv()`](Self::recv). The initial
119    /// view is captured atomically with the broadcast subscription, so it is
120    /// safe to use even when subscribing to an active writer.
121    pub fn initialize(&mut self) -> Arc<View<D>> {
122        self.initial_view
123            .take()
124            .expect("initialize() must be called exactly once")
125    }
126
127    /// Receives the next view broadcast from the coordinator.
128    ///
129    /// Returns [`SubscribeError::NotInitialized`] if [`initialize()`](Self::initialize)
130    /// has not been called.
131    pub async fn recv(&mut self) -> Result<Arc<View<D>>, SubscribeError> {
132        if self.initial_view.is_some() {
133            return Err(SubscribeError::NotInitialized);
134        }
135        match self.view_rx.recv().await {
136            Ok(view) => Ok(view),
137            Err(broadcast::error::RecvError::Lagged(_)) => Err(SubscribeError::MessageLost), // the subscriber is lagging behind the coordinator's progress.
138            Err(broadcast::error::RecvError::Closed) => Err(SubscribeError::Shutdown),
139        }
140    }
141
142    /// Advances the applied watermark, signaling that the reader has processed
143    /// up through the given epoch.
144    pub fn update_applied(&self, epoch: u64) {
145        self.watermarks.update_applied(epoch);
146    }
147
148    /// Advances the flushed watermark, signaling that the reader has processed
149    /// up through the given epoch.
150    pub fn update_written(&self, epoch: u64) {
151        self.watermarks.update_written(epoch);
152    }
153
154    /// Advances the durable watermark, signaling that the reader has processed
155    /// up through the given epoch.
156    pub fn update_durable(&self, epoch: u64) {
157        self.watermarks.update_durable(epoch);
158    }
159}
160
161/// Monitors the progress of a paired [`ViewSubscriber`] through epoch
162/// watermarks.
163///
164/// Cloneable — multiple tasks can wait on the same subscriber's progress.
165#[derive(Clone)]
166pub struct ViewMonitor {
167    watcher: EpochWatcher,
168    #[allow(dead_code)]
169    watermarks: Arc<EpochWatermarks>,
170}
171
172impl ViewMonitor {
173    /// Waits until the subscriber has processed at least `epoch` at the
174    /// given [`Durability`] level.
175    pub async fn wait(&mut self, epoch: u64, durability: Durability) -> Result<(), SubscribeError> {
176        self.watcher
177            .wait(epoch, durability)
178            .await
179            .map_err(|_| SubscribeError::Shutdown)?;
180        Ok(())
181    }
182}
183
184#[cfg(test)]
185mod tests {
186    use super::*;
187
188    /// Creates a paired watermarks and `ViewMonitor` without needing a
189    /// full coordinator. This is sufficient for testing the update/wait
190    /// contract since those only touch the watermark channels.
191    fn create_pair() -> (Arc<EpochWatermarks>, ViewMonitor) {
192        let (watermarks, watcher) = EpochWatermarks::new();
193        let watermarks = Arc::new(watermarks);
194        let monitor = ViewMonitor {
195            watcher,
196            watermarks: watermarks.clone(),
197        };
198        (watermarks, monitor)
199    }
200
201    #[tokio::test]
202    async fn should_update_and_wait_applied() {
203        // given
204        let (watermarks, mut monitor) = create_pair();
205
206        // when
207        watermarks.update_applied(5);
208
209        // then
210        monitor.wait(5, Durability::Applied).await.unwrap();
211    }
212
213    #[tokio::test]
214    async fn should_update_and_wait_flushed() {
215        // given
216        let (watermarks, mut monitor) = create_pair();
217
218        // when
219        watermarks.update_written(3);
220
221        // then
222        monitor.wait(3, Durability::Written).await.unwrap();
223    }
224
225    #[tokio::test]
226    async fn should_update_and_wait_durable() {
227        // given
228        let (watermarks, mut monitor) = create_pair();
229
230        // when
231        watermarks.update_durable(7);
232
233        // then
234        monitor.wait(7, Durability::Durable).await.unwrap();
235    }
236
237    #[tokio::test]
238    async fn should_wait_for_epoch_already_reached() {
239        // given
240        let (watermarks, mut monitor) = create_pair();
241
242        // when - advance past the epoch we'll wait for
243        watermarks.update_durable(10);
244
245        // then - waiting for a lower epoch returns immediately
246        monitor.wait(5, Durability::Durable).await.unwrap();
247    }
248
249    #[tokio::test]
250    async fn should_track_levels_independently() {
251        // given
252        let (watermarks, mut monitor) = create_pair();
253
254        // when - advance each level to a different epoch
255        watermarks.update_applied(3);
256        watermarks.update_written(2);
257        watermarks.update_durable(1);
258
259        // then - each level tracks independently
260        monitor.wait(3, Durability::Applied).await.unwrap();
261        monitor.wait(2, Durability::Written).await.unwrap();
262        monitor.wait(1, Durability::Durable).await.unwrap();
263    }
264}