Skip to main content

common/coordinator/
subscriber.rs

1//! View subscription and progress monitoring.
2//!
3//! When a caller subscribes to a [`WriteCoordinator`](super::WriteCoordinator),
4//! it receives a paired [`ViewSubscriber`] and [`ViewMonitor`]:
5//!
6//! - The **subscriber** receives [`View`] broadcasts from the coordinator
7//!   (on freeze and flush events) and advances epoch watermarks to signal
8//!   how far it has processed.
9//! - The **monitor** lets other tasks wait until the subscriber has reached
10//!   a given epoch at a given durability level.
11//!
12//! Together they form a progress-tracking channel: the subscriber drives
13//! forward, and the monitor synchronizes on that progress.
14//!
15//! # Usage
16//!
17//! ```ignore
18//! // Subscribe to the coordinator.
19//! let (mut subscriber, monitor) = coordinator.subscribe();
20//!
21//! // Spawn a task that processes views as they arrive.
22//! tokio::spawn(async move {
23//!     // Initialize returns the initial view, captured atomically with
24//!     // the subscription. Must be called before recv().
25//!     let initial_view = subscriber.initialize();
26//!     // Bootstrap read state from initial_view ...
27//!
28//!     while let Ok(view) = subscriber.recv().await {
29//!         // Process the view (update read state)...
30//!         subscriber.update_durable(epoch);
31//!     }
32//! });
33//!
34//! // Elsewhere, wait for the subscriber to catch up.
35//! monitor.clone().wait(epoch, Durability::Durable).await?;
36//! ```
37
38use std::sync::Arc;
39
40use tokio::sync::broadcast;
41
42use super::traits::EpochStamped;
43use super::{Delta, Durability, EpochWatcher, EpochWatermarks, View};
44
45/// Error type for subscriber and monitor operations.
46#[derive(Debug)]
47pub enum SubscribeError {
48    /// The coordinator has shut down.
49    Shutdown,
50    /// [`ViewSubscriber::recv()`] was called before [`ViewSubscriber::initialize()`].
51    NotInitialized,
52}
53
54impl std::fmt::Display for SubscribeError {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        match self {
57            SubscribeError::Shutdown => write!(f, "coordinator shut down"),
58            SubscribeError::NotInitialized => {
59                write!(f, "initialize() must be called before recv()")
60            }
61        }
62    }
63}
64
65impl std::error::Error for SubscribeError {}
66
67/// Receives [`View`] broadcasts from the coordinator and advances epoch
68/// watermarks to signal progress. Paired with a [`ViewMonitor`] that can
69/// wait on the watermarks this subscriber advances.
70pub struct ViewSubscriber<D: Delta> {
71    view_rx: broadcast::Receiver<Arc<View<D>>>,
72    initial_view: Option<Arc<View<D>>>,
73    watermarks: Arc<EpochWatermarks>,
74}
75
76impl<D: Delta> ViewSubscriber<D> {
77    /// Creates a new `ViewSubscriber` and paired `ViewMonitor`.
78    pub fn new(
79        view_rx: broadcast::Receiver<Arc<View<D>>>,
80        initial_view: Arc<View<D>>,
81    ) -> (Self, ViewMonitor) {
82        let (watermarks, watcher) = EpochWatermarks::new();
83        let watermarks = Arc::new(watermarks);
84        let subscriber = Self {
85            view_rx,
86            initial_view: Some(initial_view),
87            watermarks: watermarks.clone(),
88        };
89        let monitor = ViewMonitor {
90            watcher,
91            watermarks,
92        };
93        (subscriber, monitor)
94    }
95
96    /// Takes the initial view captured at subscription time, marking the
97    /// subscriber as ready to receive broadcasts.
98    ///
99    /// Must be called exactly once before [`recv()`](Self::recv). The initial
100    /// view is captured atomically with the broadcast subscription, so it is
101    /// safe to use even when subscribing to an active writer.
102    pub fn initialize(&mut self) -> Arc<View<D>> {
103        self.initial_view
104            .take()
105            .expect("initialize() must be called exactly once")
106    }
107
108    /// Receives the next view broadcast from the coordinator.
109    ///
110    /// Returns [`SubscribeError::NotInitialized`] if [`initialize()`](Self::initialize)
111    /// has not been called.
112    pub async fn recv(&mut self) -> Result<Arc<View<D>>, SubscribeError> {
113        if self.initial_view.is_some() {
114            return Err(SubscribeError::NotInitialized);
115        }
116        loop {
117            match self.view_rx.recv().await {
118                Ok(view) => {
119                    return Ok(view);
120                }
121                Err(broadcast::error::RecvError::Lagged(_)) => {
122                    // TODO: Skipping missed views is not safe in general.
123                    // Consumers may depend on processing every view (e.g. to
124                    // apply new segments from each flush). Recovery likely
125                    // requires killing this subscriber and resubscribing to
126                    // get a fresh initial view to reset state from.
127                    continue;
128                }
129                Err(broadcast::error::RecvError::Closed) => {
130                    return Err(SubscribeError::Shutdown);
131                }
132            }
133        }
134    }
135
136    /// Advances the applied watermark, signaling that the reader has processed
137    /// up through the given epoch.
138    pub fn update_applied(&self, epoch: u64) {
139        self.watermarks.update_applied(epoch);
140    }
141
142    /// Advances the flushed watermark, signaling that the reader has processed
143    /// up through the given epoch.
144    pub fn update_written(&self, epoch: u64) {
145        self.watermarks.update_written(epoch);
146    }
147
148    /// Advances the durable watermark, signaling that the reader has processed
149    /// up through the given epoch.
150    pub fn update_durable(&self, epoch: u64) {
151        self.watermarks.update_durable(epoch);
152    }
153}
154
155/// Monitors the progress of a paired [`ViewSubscriber`] through epoch
156/// watermarks.
157///
158/// Cloneable — multiple tasks can wait on the same subscriber's progress.
159#[derive(Clone)]
160pub struct ViewMonitor {
161    watcher: EpochWatcher,
162    #[allow(dead_code)]
163    watermarks: Arc<EpochWatermarks>,
164}
165
166impl ViewMonitor {
167    /// Waits until the subscriber has processed at least `epoch` at the
168    /// given [`Durability`] level.
169    pub async fn wait(&mut self, epoch: u64, durability: Durability) -> Result<(), SubscribeError> {
170        self.watcher
171            .wait(epoch, durability)
172            .await
173            .map_err(|_| SubscribeError::Shutdown)?;
174        Ok(())
175    }
176}
177
178#[cfg(test)]
179mod tests {
180    use super::*;
181
182    /// Creates a paired watermarks and `ViewMonitor` without needing a
183    /// full coordinator. This is sufficient for testing the update/wait
184    /// contract since those only touch the watermark channels.
185    fn create_pair() -> (Arc<EpochWatermarks>, ViewMonitor) {
186        let (watermarks, watcher) = EpochWatermarks::new();
187        let watermarks = Arc::new(watermarks);
188        let monitor = ViewMonitor {
189            watcher,
190            watermarks: watermarks.clone(),
191        };
192        (watermarks, monitor)
193    }
194
195    #[tokio::test]
196    async fn should_update_and_wait_applied() {
197        // given
198        let (watermarks, mut monitor) = create_pair();
199
200        // when
201        watermarks.update_applied(5);
202
203        // then
204        monitor.wait(5, Durability::Applied).await.unwrap();
205    }
206
207    #[tokio::test]
208    async fn should_update_and_wait_flushed() {
209        // given
210        let (watermarks, mut monitor) = create_pair();
211
212        // when
213        watermarks.update_written(3);
214
215        // then
216        monitor.wait(3, Durability::Written).await.unwrap();
217    }
218
219    #[tokio::test]
220    async fn should_update_and_wait_durable() {
221        // given
222        let (watermarks, mut monitor) = create_pair();
223
224        // when
225        watermarks.update_durable(7);
226
227        // then
228        monitor.wait(7, Durability::Durable).await.unwrap();
229    }
230
231    #[tokio::test]
232    async fn should_wait_for_epoch_already_reached() {
233        // given
234        let (watermarks, mut monitor) = create_pair();
235
236        // when - advance past the epoch we'll wait for
237        watermarks.update_durable(10);
238
239        // then - waiting for a lower epoch returns immediately
240        monitor.wait(5, Durability::Durable).await.unwrap();
241    }
242
243    #[tokio::test]
244    async fn should_track_levels_independently() {
245        // given
246        let (watermarks, mut monitor) = create_pair();
247
248        // when - advance each level to a different epoch
249        watermarks.update_applied(3);
250        watermarks.update_written(2);
251        watermarks.update_durable(1);
252
253        // then - each level tracks independently
254        monitor.wait(3, Durability::Applied).await.unwrap();
255        monitor.wait(2, Durability::Written).await.unwrap();
256        monitor.wait(1, Durability::Durable).await.unwrap();
257    }
258}