1use super::WriteCommand;
2use super::{Delta, Durability, WriteError, WriteResult};
3use crate::StorageRead;
4use crate::coordinator::traits::EpochStamped;
5use crate::storage::StorageSnapshot;
6use futures::FutureExt;
7use futures::future::Shared;
8use std::sync::Arc;
9use tokio::sync::{broadcast, mpsc, oneshot, watch};
10
11pub struct View<D: Delta> {
15 pub current: D::DeltaView,
16 pub frozen: Vec<EpochStamped<D::FrozenView>>,
17 pub snapshot: Arc<dyn StorageSnapshot>,
18 pub last_flushed_delta: Option<EpochStamped<D::FrozenView>>,
19}
20
21impl<D: Delta> Clone for View<D> {
22 fn clone(&self) -> Self {
23 Self {
24 current: self.current.clone(),
25 frozen: self.frozen.clone(),
26 snapshot: self.snapshot.clone(),
27 last_flushed_delta: self.last_flushed_delta.clone(),
28 }
29 }
30}
31
32#[derive(Clone)]
37pub(crate) struct EpochWatcher {
38 pub applied_rx: watch::Receiver<u64>,
39 pub flushed_rx: watch::Receiver<u64>,
40 pub durable_rx: watch::Receiver<u64>,
41}
42
43#[derive(Clone, Debug)]
45pub(crate) struct WriteApplied<M> {
46 pub epoch: u64,
47 pub result: M,
48}
49
50#[derive(Clone, Debug)]
52pub(crate) struct WriteFailed {
53 pub epoch: u64,
54 pub error: String,
55}
56
57pub(crate) type EpochResult<M> = Result<WriteApplied<M>, WriteFailed>;
59
60pub struct WriteHandle<M: Clone + Send + 'static = ()> {
65 inner: Shared<oneshot::Receiver<EpochResult<M>>>,
66 watchers: EpochWatcher,
67}
68
69impl<M: Clone + Send + 'static> WriteHandle<M> {
70 pub(crate) fn new(rx: oneshot::Receiver<EpochResult<M>>, watchers: EpochWatcher) -> Self {
71 Self {
72 inner: rx.shared(),
73 watchers,
74 }
75 }
76
77 async fn recv(&self) -> WriteResult<WriteApplied<M>> {
78 self.inner
79 .clone()
80 .await
81 .map_err(|_| WriteError::Shutdown)?
82 .map_err(|e| WriteError::ApplyError(e.epoch, e.error))
83 }
84
85 #[cfg(test)]
91 pub async fn epoch(&self) -> WriteResult<u64> {
92 Ok(self.recv().await?.epoch)
93 }
94
95 pub async fn wait(&mut self, durability: Durability) -> WriteResult<M> {
100 let WriteApplied { epoch, result } = self.recv().await?;
101
102 let recv = match durability {
103 Durability::Applied => &mut self.watchers.applied_rx,
104 Durability::Flushed => &mut self.watchers.flushed_rx,
105 Durability::Durable => &mut self.watchers.durable_rx,
106 };
107
108 recv.wait_for(|curr| *curr >= epoch)
109 .await
110 .map_err(|_| WriteError::Shutdown)?;
111 Ok(result)
112 }
113}
114
115pub struct WriteCoordinatorHandle<D: Delta> {
120 write_tx: mpsc::Sender<WriteCommand<D>>,
121 watchers: EpochWatcher,
122}
123
124impl<D: Delta> WriteCoordinatorHandle<D> {
125 pub(crate) fn new(write_tx: mpsc::Sender<WriteCommand<D>>, watchers: EpochWatcher) -> Self {
126 Self { write_tx, watchers }
127 }
128}
129
130impl<D: Delta> WriteCoordinatorHandle<D> {
131 pub async fn write(&self, write: D::Write) -> WriteResult<WriteHandle<D::ApplyResult>> {
136 let (tx, rx) = oneshot::channel();
137 self.write_tx
138 .try_send(WriteCommand::Write {
139 write,
140 result_tx: tx,
141 })
142 .map_err(|e| match e {
143 mpsc::error::TrySendError::Full(_) => WriteError::Backpressure,
144 mpsc::error::TrySendError::Closed(_) => WriteError::Shutdown,
145 })?;
146
147 Ok(WriteHandle::new(rx, self.watchers.clone()))
148 }
149
150 pub async fn flush(&self, flush_storage: bool) -> WriteResult<WriteHandle> {
157 let (tx, rx) = oneshot::channel();
158 self.write_tx
159 .try_send(WriteCommand::Flush {
160 epoch_tx: tx,
161 flush_storage,
162 })
163 .map_err(|e| match e {
164 mpsc::error::TrySendError::Full(_) => WriteError::Backpressure,
165 mpsc::error::TrySendError::Closed(_) => WriteError::Shutdown,
166 })?;
167
168 Ok(WriteHandle::new(rx, self.watchers.clone()))
169 }
170}
171
172impl<D: Delta> Clone for WriteCoordinatorHandle<D> {
173 fn clone(&self) -> Self {
174 Self {
175 write_tx: self.write_tx.clone(),
176 watchers: self.watchers.clone(),
177 }
178 }
179}
180
181#[cfg(test)]
182mod tests {
183 use super::*;
184 use tokio::sync::watch;
185
186 fn create_watchers(
187 applied: watch::Receiver<u64>,
188 flushed: watch::Receiver<u64>,
189 durable: watch::Receiver<u64>,
190 ) -> EpochWatcher {
191 EpochWatcher {
192 applied_rx: applied,
193 flushed_rx: flushed,
194 durable_rx: durable,
195 }
196 }
197
198 #[tokio::test]
199 async fn should_return_epoch_when_assigned() {
200 let (tx, rx) = oneshot::channel();
202 let (_applied_tx, applied_rx) = watch::channel(0u64);
203 let (_flushed_tx, flushed_rx) = watch::channel(0u64);
204 let (_durable_tx, durable_rx) = watch::channel(0u64);
205 let handle: WriteHandle<()> =
206 WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
207
208 tx.send(Ok(WriteApplied {
210 epoch: 42,
211 result: (),
212 }))
213 .unwrap();
214 let result = handle.epoch().await;
215
216 assert!(result.is_ok());
218 assert_eq!(result.unwrap(), 42);
219 }
220
221 #[tokio::test]
222 async fn should_allow_multiple_epoch_calls() {
223 let (tx, rx) = oneshot::channel();
225 let (_applied_tx, applied_rx) = watch::channel(0u64);
226 let (_flushed_tx, flushed_rx) = watch::channel(0u64);
227 let (_durable_tx, durable_rx) = watch::channel(0u64);
228 let handle: WriteHandle<()> =
229 WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
230 tx.send(Ok(WriteApplied {
231 epoch: 42,
232 result: (),
233 }))
234 .unwrap();
235
236 let result1 = handle.epoch().await;
238 let result2 = handle.epoch().await;
239 let result3 = handle.epoch().await;
240
241 assert_eq!(result1.unwrap(), 42);
243 assert_eq!(result2.unwrap(), 42);
244 assert_eq!(result3.unwrap(), 42);
245 }
246
247 #[tokio::test]
248 async fn should_return_apply_result_from_wait() {
249 let (tx, rx) = oneshot::channel();
251 let (_applied_tx, applied_rx) = watch::channel(100u64);
252 let (_flushed_tx, flushed_rx) = watch::channel(0u64);
253 let (_durable_tx, durable_rx) = watch::channel(0u64);
254 let mut handle: WriteHandle<String> =
255 WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
256
257 tx.send(Ok(WriteApplied {
259 epoch: 1,
260 result: "hello".to_string(),
261 }))
262 .unwrap();
263
264 assert_eq!(handle.wait(Durability::Applied).await.unwrap(), "hello");
266 }
267
268 #[tokio::test]
269 async fn should_return_immediately_when_watermark_already_reached() {
270 let (tx, rx) = oneshot::channel();
272 let (_applied_tx, applied_rx) = watch::channel(100u64); let (_flushed_tx, flushed_rx) = watch::channel(0u64);
274 let (_durable_tx, durable_rx) = watch::channel(0u64);
275 let mut handle: WriteHandle<()> =
276 WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
277 tx.send(Ok(WriteApplied {
278 epoch: 50,
279 result: (),
280 }))
281 .unwrap(); let result = handle.wait(Durability::Applied).await;
285
286 assert!(result.is_ok());
288 }
289
290 #[tokio::test]
291 async fn should_wait_until_watermark_reaches_epoch() {
292 let (tx, rx) = oneshot::channel();
294 let (applied_tx, applied_rx) = watch::channel(0u64);
295 let (_flushed_tx, flushed_rx) = watch::channel(0u64);
296 let (_durable_tx, durable_rx) = watch::channel(0u64);
297 let mut handle: WriteHandle<()> =
298 WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
299 tx.send(Ok(WriteApplied {
300 epoch: 10,
301 result: (),
302 }))
303 .unwrap();
304
305 let wait_task = tokio::spawn(async move { handle.wait(Durability::Applied).await });
307
308 tokio::task::yield_now().await;
309 applied_tx.send(5).unwrap(); tokio::task::yield_now().await;
311 applied_tx.send(10).unwrap(); let result = wait_task.await.unwrap();
314
315 assert!(result.is_ok());
317 }
318
319 #[tokio::test]
320 async fn should_wait_for_correct_durability_level() {
321 let (tx, rx) = oneshot::channel();
323 let (_applied_tx, applied_rx) = watch::channel(100u64);
324 let (_flushed_tx, flushed_rx) = watch::channel(50u64);
325 let (durable_tx, durable_rx) = watch::channel(10u64);
326 let mut handle: WriteHandle<()> =
327 WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
328 tx.send(Ok(WriteApplied {
329 epoch: 25,
330 result: (),
331 }))
332 .unwrap();
333
334 let wait_task = tokio::spawn(async move { handle.wait(Durability::Durable).await });
336
337 tokio::task::yield_now().await;
338 durable_tx.send(25).unwrap(); let result = wait_task.await.unwrap();
341
342 assert!(result.is_ok());
344 }
345
346 #[tokio::test]
347 async fn should_propagate_epoch_error_in_wait() {
348 let (tx, rx) = oneshot::channel::<EpochResult<()>>();
350 let (_applied_tx, applied_rx) = watch::channel(0u64);
351 let (_flushed_tx, flushed_rx) = watch::channel(0u64);
352 let (_durable_tx, durable_rx) = watch::channel(0u64);
353 let mut handle = WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
354
355 drop(tx);
357 let result = handle.wait(Durability::Applied).await;
358
359 assert!(matches!(result, Err(WriteError::Shutdown)));
361 }
362
363 #[tokio::test]
364 async fn should_propagate_apply_error_in_wait() {
365 let (tx, rx) = oneshot::channel();
367 let (_applied_tx, applied_rx) = watch::channel(0u64);
368 let (_flushed_tx, flushed_rx) = watch::channel(0u64);
369 let (_durable_tx, durable_rx) = watch::channel(0u64);
370 let mut handle: WriteHandle<()> =
371 WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
372
373 tx.send(Err(WriteFailed {
375 epoch: 1,
376 error: "apply error".into(),
377 }))
378 .unwrap();
379 let result = handle.wait(Durability::Applied).await;
380
381 assert!(
383 matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
384 );
385 }
386}