1use super::{BroadcastedView, WriteCommand};
2use super::{Delta, Durability, WriteError, WriteResult};
3use crate::StorageRead;
4use crate::coordinator::traits::EpochStamped;
5use crate::storage::StorageSnapshot;
6use futures::FutureExt;
7use futures::future::Shared;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::{broadcast, mpsc, oneshot, watch};
11
12pub struct View<D: Delta> {
27 pub current: D::DeltaView,
28 pub frozen: Vec<EpochStamped<D::FrozenView>>,
29 pub snapshot: Arc<dyn StorageSnapshot>,
30 pub last_written_delta: Option<EpochStamped<D::FrozenView>>,
31}
32
33impl<D: Delta> Clone for View<D> {
34 fn clone(&self) -> Self {
35 Self {
36 current: self.current.clone(),
37 frozen: self.frozen.clone(),
38 snapshot: self.snapshot.clone(),
39 last_written_delta: self.last_written_delta.clone(),
40 }
41 }
42}
43
44#[derive(Clone)]
49pub struct EpochWatcher {
50 pub applied_rx: watch::Receiver<u64>,
51 pub written_rx: watch::Receiver<u64>,
52 pub durable_rx: watch::Receiver<u64>,
53}
54
55impl EpochWatcher {
56 pub async fn wait(
61 &mut self,
62 epoch: u64,
63 durability: Durability,
64 ) -> Result<(), watch::error::RecvError> {
65 let rx = match durability {
66 Durability::Applied => &mut self.applied_rx,
67 Durability::Written => &mut self.written_rx,
68 Durability::Durable => &mut self.durable_rx,
69 };
70 rx.wait_for(|curr| *curr >= epoch).await.map(|_| ())
71 }
72}
73
/// Success payload delivered to a [`WriteHandle`]: the epoch assigned to the
/// command together with the value produced by applying it.
#[derive(Debug, Clone)]
pub(crate) struct WriteApplied<M> {
    /// Epoch assigned to the command.
    pub epoch: u64,
    /// Value handed back to the caller by [`WriteHandle::wait`].
    pub result: M,
}
80
/// Failure payload delivered to a [`WriteHandle`]; surfaced to callers as
/// `WriteError::ApplyError(epoch, error)`.
#[derive(Debug, Clone)]
pub(crate) struct WriteFailed {
    /// Epoch associated with the failed command.
    pub epoch: u64,
    /// Human-readable description of the failure.
    pub error: String,
}
87
88pub(crate) type EpochResult<M> = Result<WriteApplied<M>, WriteFailed>;
90
91pub struct WriteHandle<M: Clone + Send + 'static = ()> {
96 inner: Shared<oneshot::Receiver<EpochResult<M>>>,
97 watchers: EpochWatcher,
98}
99
100impl<M: Clone + Send + 'static> WriteHandle<M> {
101 pub(crate) fn new(rx: oneshot::Receiver<EpochResult<M>>, watchers: EpochWatcher) -> Self {
102 Self {
103 inner: rx.shared(),
104 watchers,
105 }
106 }
107
108 async fn recv(&self) -> WriteResult<WriteApplied<M>> {
109 self.inner
110 .clone()
111 .await
112 .map_err(|_| WriteError::Shutdown)?
113 .map_err(|e| WriteError::ApplyError(e.epoch, e.error))
114 }
115
116 pub async fn epoch(&self) -> WriteResult<u64> {
122 Ok(self.recv().await?.epoch)
123 }
124
125 pub async fn wait(&mut self, durability: Durability) -> WriteResult<M> {
130 let WriteApplied { epoch, result } = self.recv().await?;
131
132 self.watchers
133 .wait(epoch, durability)
134 .await
135 .map_err(|_| WriteError::Shutdown)?;
136 Ok(result)
137 }
138}
139
140pub struct WriteCoordinatorHandle<D: Delta> {
145 write_tx: mpsc::Sender<WriteCommand<D>>,
146 watchers: EpochWatcher,
147 view: Arc<BroadcastedView<D>>,
148}
149
150impl<D: Delta> WriteCoordinatorHandle<D> {
151 pub(crate) fn new(
152 write_tx: mpsc::Sender<WriteCommand<D>>,
153 watchers: EpochWatcher,
154 view: Arc<BroadcastedView<D>>,
155 ) -> Self {
156 Self {
157 write_tx,
158 watchers,
159 view,
160 }
161 }
162
163 pub fn flushed_epoch(&self) -> u64 {
168 *self.watchers.written_rx.borrow()
169 }
170}
171
172impl<D: Delta> WriteCoordinatorHandle<D> {
173 pub async fn write_timeout(
185 &self,
186 write: D::Write,
187 timeout: Duration,
188 ) -> Result<WriteHandle<D::ApplyResult>, WriteError<D::Write>> {
189 let (tx, rx) = oneshot::channel();
190 self.write_tx
191 .send_timeout(
192 WriteCommand::Write {
193 write,
194 result_tx: tx,
195 },
196 timeout,
197 )
198 .await
199 .map_err(|e| match e {
200 mpsc::error::SendTimeoutError::Timeout(WriteCommand::Write { write, .. }) => {
201 WriteError::TimeoutError(write)
202 }
203 mpsc::error::SendTimeoutError::Closed(WriteCommand::Write { write, .. }) => {
204 WriteError::Shutdown
205 }
206 _ => unreachable!("sent a Write command"),
207 })?;
208
209 Ok(WriteHandle::new(rx, self.watchers.clone()))
210 }
211
212 pub async fn write(
222 &self,
223 write: D::Write,
224 ) -> Result<WriteHandle<D::ApplyResult>, WriteError<D::Write>> {
225 let (tx, rx) = oneshot::channel();
226 self.write_tx
227 .send(WriteCommand::Write {
228 write,
229 result_tx: tx,
230 })
231 .await
232 .map_err(|e| match e {
233 mpsc::error::SendError(WriteCommand::Write { write, .. }) => WriteError::Shutdown,
234 _ => unreachable!("sent a Write command"),
235 })?;
236
237 Ok(WriteHandle::new(rx, self.watchers.clone()))
238 }
239
240 pub async fn try_write(
247 &self,
248 write: D::Write,
249 ) -> Result<WriteHandle<D::ApplyResult>, WriteError<D::Write>> {
250 let (tx, rx) = oneshot::channel();
251 self.write_tx
252 .try_send(WriteCommand::Write {
253 write,
254 result_tx: tx,
255 })
256 .map_err(|e| match e {
257 mpsc::error::TrySendError::Full(WriteCommand::Write { write, .. }) => {
258 WriteError::Backpressure(write)
259 }
260 mpsc::error::TrySendError::Closed(WriteCommand::Write { write, .. }) => {
261 WriteError::Shutdown
262 }
263 _ => unreachable!("sent a Write command"),
264 })?;
265
266 Ok(WriteHandle::new(rx, self.watchers.clone()))
267 }
268
269 pub async fn flush(&self, flush_storage: bool) -> WriteResult<WriteHandle> {
276 let (tx, rx) = oneshot::channel();
277 self.write_tx
278 .try_send(WriteCommand::Flush {
279 epoch_tx: tx,
280 flush_storage,
281 })
282 .map_err(|e| match e {
283 mpsc::error::TrySendError::Full(_) => WriteError::Backpressure(()),
284 mpsc::error::TrySendError::Closed(_) => WriteError::Shutdown,
285 })?;
286
287 Ok(WriteHandle::new(rx, self.watchers.clone()))
288 }
289
290 pub fn view(&self) -> Arc<View<D>> {
291 self.view.current()
292 }
293
294 pub fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
295 self.view.subscribe()
296 }
297}
298
299impl<D: Delta> Clone for WriteCoordinatorHandle<D> {
300 fn clone(&self) -> Self {
301 Self {
302 write_tx: self.write_tx.clone(),
303 watchers: self.watchers.clone(),
304 view: self.view.clone(),
305 }
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::watch;

    /// Builds an [`EpochWatcher`] from the three receivers; `flushed` feeds
    /// the `written_rx` slot.
    fn create_watchers(
        applied: watch::Receiver<u64>,
        flushed: watch::Receiver<u64>,
        durable: watch::Receiver<u64>,
    ) -> EpochWatcher {
        EpochWatcher {
            applied_rx: applied,
            written_rx: flushed,
            durable_rx: durable,
        }
    }

    #[tokio::test]
    async fn should_return_epoch_when_assigned() {
        // given
        let (tx, rx) = oneshot::channel();
        let (_applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let handle: WriteHandle<()> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));

        // when
        tx.send(Ok(WriteApplied {
            epoch: 42,
            result: (),
        }))
        .unwrap();
        let result = handle.epoch().await;

        // then
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn should_allow_multiple_epoch_calls() {
        // given
        let (tx, rx) = oneshot::channel();
        let (_applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let handle: WriteHandle<()> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
        tx.send(Ok(WriteApplied {
            epoch: 42,
            result: (),
        }))
        .unwrap();

        // when: the same handle is queried repeatedly
        let result1 = handle.epoch().await;
        let result2 = handle.epoch().await;
        let result3 = handle.epoch().await;

        // then: every call observes the same epoch
        assert_eq!(result1.unwrap(), 42);
        assert_eq!(result2.unwrap(), 42);
        assert_eq!(result3.unwrap(), 42);
    }

    #[tokio::test]
    async fn should_return_apply_result_from_wait() {
        // given: the applied watermark (100) is already past the epoch (1)
        let (tx, rx) = oneshot::channel();
        let (_applied_tx, applied_rx) = watch::channel(100u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut handle: WriteHandle<String> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));

        // when
        tx.send(Ok(WriteApplied {
            epoch: 1,
            result: "hello".to_string(),
        }))
        .unwrap();

        // then
        assert_eq!(handle.wait(Durability::Applied).await.unwrap(), "hello");
    }

    #[tokio::test]
    async fn should_return_immediately_when_watermark_already_reached() {
        // given: the applied watermark (100) is already past the epoch (50)
        let (tx, rx) = oneshot::channel();
        let (_applied_tx, applied_rx) = watch::channel(100u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut handle: WriteHandle<()> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
        tx.send(Ok(WriteApplied {
            epoch: 50,
            result: (),
        }))
        .unwrap();

        // when
        let result = handle.wait(Durability::Applied).await;

        // then
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn should_wait_until_watermark_reaches_epoch() {
        // given: the applied watermark starts below the epoch (10)
        let (tx, rx) = oneshot::channel();
        let (applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut handle: WriteHandle<()> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
        tx.send(Ok(WriteApplied {
            epoch: 10,
            result: (),
        }))
        .unwrap();

        // when: the wait runs in a separate task while the watermark advances
        let wait_task = tokio::spawn(async move { handle.wait(Durability::Applied).await });

        tokio::task::yield_now().await;
        // Still below the epoch.
        applied_tx.send(5).unwrap();
        tokio::task::yield_now().await;
        // Reaches the epoch.
        applied_tx.send(10).unwrap();
        let result = wait_task.await.unwrap();

        // then
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn should_wait_for_correct_durability_level() {
        // given: applied (100) and flushed (50) are past the epoch (25), but
        // durable (10) is not
        let (tx, rx) = oneshot::channel();
        let (_applied_tx, applied_rx) = watch::channel(100u64);
        let (_flushed_tx, flushed_rx) = watch::channel(50u64);
        let (durable_tx, durable_rx) = watch::channel(10u64);
        let mut handle: WriteHandle<()> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));
        tx.send(Ok(WriteApplied {
            epoch: 25,
            result: (),
        }))
        .unwrap();

        // when: waiting on the durable level, which is then advanced
        let wait_task = tokio::spawn(async move { handle.wait(Durability::Durable).await });

        tokio::task::yield_now().await;
        durable_tx.send(25).unwrap();
        let result = wait_task.await.unwrap();

        // then
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn should_propagate_epoch_error_in_wait() {
        // given
        let (tx, rx) = oneshot::channel::<EpochResult<()>>();
        let (_applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut handle = WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));

        // when: the result sender is dropped without sending anything
        drop(tx);
        let result = handle.wait(Durability::Applied).await;

        // then
        assert!(matches!(result, Err(WriteError::Shutdown)));
    }

    #[tokio::test]
    async fn should_propagate_apply_error_in_wait() {
        // given
        let (tx, rx) = oneshot::channel();
        let (_applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut handle: WriteHandle<()> =
            WriteHandle::new(rx, create_watchers(applied_rx, flushed_rx, durable_rx));

        // when: a failure is reported instead of an applied write
        tx.send(Err(WriteFailed {
            epoch: 1,
            error: "apply error".into(),
        }))
        .unwrap();
        let result = handle.wait(Durability::Applied).await;

        // then: both the epoch and the message are carried through
        assert!(
            matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
        );
    }

    #[tokio::test]
    async fn epoch_watcher_should_resolve_when_watermark_reached() {
        // given
        let (applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut watcher = create_watchers(applied_rx, flushed_rx, durable_rx);

        // when
        let wait_task = tokio::spawn(async move { watcher.wait(5, Durability::Applied).await });
        tokio::task::yield_now().await;
        applied_tx.send(5).unwrap();

        // then
        assert!(wait_task.await.unwrap().is_ok());
    }

    #[tokio::test]
    async fn epoch_watcher_should_resolve_immediately_when_already_reached() {
        // given: the applied watermark (10) is already past the epoch (5)
        let (_applied_tx, applied_rx) = watch::channel(10u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut watcher = create_watchers(applied_rx, flushed_rx, durable_rx);

        // when / then
        assert!(watcher.wait(5, Durability::Applied).await.is_ok());
    }

    #[tokio::test]
    async fn epoch_watcher_should_select_correct_durability_receiver() {
        // given
        let (_applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (durable_tx, durable_rx) = watch::channel(0u64);
        let mut watcher = create_watchers(applied_rx, flushed_rx, durable_rx);

        // when: only the durable watermark is advanced
        let wait_task = tokio::spawn(async move { watcher.wait(3, Durability::Durable).await });
        tokio::task::yield_now().await;
        durable_tx.send(3).unwrap();

        // then
        assert!(wait_task.await.unwrap().is_ok());
    }

    #[tokio::test]
    async fn epoch_watcher_should_return_error_on_sender_drop() {
        // given
        let (applied_tx, applied_rx) = watch::channel(0u64);
        let (_flushed_tx, flushed_rx) = watch::channel(0u64);
        let (_durable_tx, durable_rx) = watch::channel(0u64);
        let mut watcher = create_watchers(applied_rx, flushed_rx, durable_rx);

        // when: the sender goes away before the watermark reaches the epoch
        drop(applied_tx);
        let result = watcher.wait(1, Durability::Applied).await;

        // then
        assert!(result.is_err());
    }
}