common/coordinator/subscriber.rs
1//! View subscription and progress monitoring.
2//!
3//! When a caller subscribes to a [`WriteCoordinator`](super::WriteCoordinator),
4//! it receives a paired [`ViewSubscriber`] and [`ViewMonitor`]:
5//!
6//! - The **subscriber** receives [`View`] broadcasts from the coordinator
7//! (on freeze and flush events) and advances epoch watermarks to signal
8//! how far it has processed.
9//! - The **monitor** lets other tasks wait until the subscriber has reached
10//! a given epoch at a given durability level.
11//!
12//! Together they form a progress-tracking channel: the subscriber drives
13//! forward, and the monitor synchronizes on that progress.
14//!
15//! # Usage
16//!
17//! ```ignore
18//! // Subscribe to the coordinator.
19//! let (mut subscriber, monitor) = coordinator.subscribe();
20//!
21//! // Spawn a task that processes views as they arrive.
22//! tokio::spawn(async move {
23//! // Initialize returns the initial view, captured atomically with
24//! // the subscription. Must be called before recv().
25//! let initial_view = subscriber.initialize();
26//! // Bootstrap read state from initial_view ...
27//!
28//! while let Ok(view) = subscriber.recv().await {
29//! // Process the view (update read state)...
30//! subscriber.update_durable(epoch);
31//! }
32//! });
33//!
34//! // Elsewhere, wait for the subscriber to catch up.
35//! monitor.clone().wait(epoch, Durability::Durable).await?;
36//! ```
37
38use std::sync::Arc;
39
40use tokio::sync::broadcast;
41
42use super::traits::EpochStamped;
43use super::{Delta, Durability, EpochWatcher, EpochWatermarks, View};
44
/// Error type for subscriber and monitor operations.
///
/// Fieldless, so it is cheap to copy and compare; derives `Clone`, `Copy`,
/// `PartialEq`, and `Eq` so callers and tests can match on exact errors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscribeError {
    /// The coordinator has shut down.
    Shutdown,
    /// [`ViewSubscriber::recv()`] was called before [`ViewSubscriber::initialize()`].
    NotInitialized,
}

impl std::fmt::Display for SubscribeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // User-facing description; `Debug` stays the derived variant name.
        match self {
            SubscribeError::Shutdown => write!(f, "coordinator shut down"),
            SubscribeError::NotInitialized => {
                write!(f, "initialize() must be called before recv()")
            }
        }
    }
}

impl std::error::Error for SubscribeError {}
66
/// Receives [`View`] broadcasts from the coordinator and advances epoch
/// watermarks to signal progress. Paired with a [`ViewMonitor`] that can
/// wait on the watermarks this subscriber advances.
pub struct ViewSubscriber<D: Delta> {
    // Receiving end of the coordinator's view broadcast; recv() pulls from here.
    view_rx: broadcast::Receiver<Arc<View<D>>>,
    // View captured at subscription time. `Some` until initialize() takes it;
    // recv() also uses `Some` as its "not yet initialized" guard.
    initial_view: Option<Arc<View<D>>>,
    // Shared with the paired ViewMonitor; the update_*() methods advance these.
    watermarks: Arc<EpochWatermarks>,
}
75
76impl<D: Delta> ViewSubscriber<D> {
77 /// Creates a new `ViewSubscriber` and paired `ViewMonitor`.
78 pub fn new(
79 view_rx: broadcast::Receiver<Arc<View<D>>>,
80 initial_view: Arc<View<D>>,
81 ) -> (Self, ViewMonitor) {
82 let (watermarks, watcher) = EpochWatermarks::new();
83 let watermarks = Arc::new(watermarks);
84 let subscriber = Self {
85 view_rx,
86 initial_view: Some(initial_view),
87 watermarks: watermarks.clone(),
88 };
89 let monitor = ViewMonitor {
90 watcher,
91 watermarks,
92 };
93 (subscriber, monitor)
94 }
95
96 /// Takes the initial view captured at subscription time, marking the
97 /// subscriber as ready to receive broadcasts.
98 ///
99 /// Must be called exactly once before [`recv()`](Self::recv). The initial
100 /// view is captured atomically with the broadcast subscription, so it is
101 /// safe to use even when subscribing to an active writer.
102 pub fn initialize(&mut self) -> Arc<View<D>> {
103 self.initial_view
104 .take()
105 .expect("initialize() must be called exactly once")
106 }
107
108 /// Receives the next view broadcast from the coordinator.
109 ///
110 /// Returns [`SubscribeError::NotInitialized`] if [`initialize()`](Self::initialize)
111 /// has not been called.
112 pub async fn recv(&mut self) -> Result<Arc<View<D>>, SubscribeError> {
113 if self.initial_view.is_some() {
114 return Err(SubscribeError::NotInitialized);
115 }
116 loop {
117 match self.view_rx.recv().await {
118 Ok(view) => {
119 return Ok(view);
120 }
121 Err(broadcast::error::RecvError::Lagged(_)) => {
122 // TODO: Skipping missed views is not safe in general.
123 // Consumers may depend on processing every view (e.g. to
124 // apply new segments from each flush). Recovery likely
125 // requires killing this subscriber and resubscribing to
126 // get a fresh initial view to reset state from.
127 continue;
128 }
129 Err(broadcast::error::RecvError::Closed) => {
130 return Err(SubscribeError::Shutdown);
131 }
132 }
133 }
134 }
135
136 /// Advances the applied watermark, signaling that the reader has processed
137 /// up through the given epoch.
138 pub fn update_applied(&self, epoch: u64) {
139 self.watermarks.update_applied(epoch);
140 }
141
142 /// Advances the flushed watermark, signaling that the reader has processed
143 /// up through the given epoch.
144 pub fn update_written(&self, epoch: u64) {
145 self.watermarks.update_written(epoch);
146 }
147
148 /// Advances the durable watermark, signaling that the reader has processed
149 /// up through the given epoch.
150 pub fn update_durable(&self, epoch: u64) {
151 self.watermarks.update_durable(epoch);
152 }
153}
154
/// Monitors the progress of a paired [`ViewSubscriber`] through epoch
/// watermarks.
///
/// Cloneable — multiple tasks can wait on the same subscriber's progress.
#[derive(Clone)]
pub struct ViewMonitor {
    // Receiving side used by wait() to block until an epoch is reached.
    watcher: EpochWatcher,
    // NOTE(review): never read at runtime; presumably retained so the monitor
    // keeps the shared watermarks alive — confirm before removing.
    #[allow(dead_code)]
    watermarks: Arc<EpochWatermarks>,
}
165
166impl ViewMonitor {
167 /// Waits until the subscriber has processed at least `epoch` at the
168 /// given [`Durability`] level.
169 pub async fn wait(&mut self, epoch: u64, durability: Durability) -> Result<(), SubscribeError> {
170 self.watcher
171 .wait(epoch, durability)
172 .await
173 .map_err(|_| SubscribeError::Shutdown)?;
174 Ok(())
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds watermarks plus a `ViewMonitor` wired to them, bypassing the
    /// full coordinator. This is sufficient for exercising the update/wait
    /// contract, which only touches the watermark channels.
    fn make_pair() -> (Arc<EpochWatermarks>, ViewMonitor) {
        let (marks, watcher) = EpochWatermarks::new();
        let marks = Arc::new(marks);
        (
            Arc::clone(&marks),
            ViewMonitor {
                watcher,
                watermarks: marks,
            },
        )
    }

    #[tokio::test]
    async fn should_update_and_wait_applied() {
        // given
        let (marks, mut mon) = make_pair();

        // when
        marks.update_applied(5);

        // then
        mon.wait(5, Durability::Applied).await.unwrap();
    }

    #[tokio::test]
    async fn should_update_and_wait_flushed() {
        // given
        let (marks, mut mon) = make_pair();

        // when
        marks.update_written(3);

        // then
        mon.wait(3, Durability::Written).await.unwrap();
    }

    #[tokio::test]
    async fn should_update_and_wait_durable() {
        // given
        let (marks, mut mon) = make_pair();

        // when
        marks.update_durable(7);

        // then
        mon.wait(7, Durability::Durable).await.unwrap();
    }

    #[tokio::test]
    async fn should_wait_for_epoch_already_reached() {
        // given
        let (marks, mut mon) = make_pair();

        // when - advance beyond the epoch we will wait for
        marks.update_durable(10);

        // then - waiting on a lower epoch completes immediately
        mon.wait(5, Durability::Durable).await.unwrap();
    }

    #[tokio::test]
    async fn should_track_levels_independently() {
        // given
        let (marks, mut mon) = make_pair();

        // when - each level advances to a different epoch
        marks.update_applied(3);
        marks.update_written(2);
        marks.update_durable(1);

        // then - no level leaks into another
        mon.wait(3, Durability::Applied).await.unwrap();
        mon.wait(2, Durability::Written).await.unwrap();
        mon.wait(1, Durability::Durable).await.unwrap();
    }
}
258}