1use super::metrics;
2use super::{BroadcastedView, WriteCommand};
3use super::{Delta, Durability, WriteError, WriteResult};
4use crate::StorageRead;
5use crate::coordinator::traits::EpochStamped;
6use crate::storage::StorageSnapshot;
7use futures::FutureExt;
8use futures::future::Shared;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::sync::{broadcast, mpsc, oneshot, watch};
12
13pub struct View<D: Delta> {
28 pub current: D::DeltaView,
29 pub frozen: Vec<EpochStamped<D::FrozenView>>,
30 pub snapshot: Arc<dyn StorageSnapshot>,
31 pub last_written_delta: Option<EpochStamped<D::FrozenView>>,
32}
33
34impl<D: Delta> Clone for View<D> {
35 fn clone(&self) -> Self {
36 Self {
37 current: self.current.clone(),
38 frozen: self.frozen.clone(),
39 snapshot: self.snapshot.clone(),
40 last_written_delta: self.last_written_delta.clone(),
41 }
42 }
43}
44
45#[derive(Clone)]
50pub struct EpochWatcher {
51 pub applied_rx: watch::Receiver<u64>,
52 pub written_rx: watch::Receiver<u64>,
53 pub durable_rx: watch::Receiver<u64>,
54}
55
56impl EpochWatcher {
57 pub async fn wait(
62 &mut self,
63 epoch: u64,
64 durability: Durability,
65 ) -> Result<(), watch::error::RecvError> {
66 let rx = match durability {
67 Durability::Applied => &mut self.applied_rx,
68 Durability::Written => &mut self.written_rx,
69 Durability::Durable => &mut self.durable_rx,
70 };
71 rx.wait_for(|curr| *curr >= epoch).await.map(|_| ())
72 }
73}
74
/// Success payload delivered to a [`WriteHandle`]: the epoch reported for the
/// command together with its result value.
#[derive(Clone, Debug)]
pub(crate) struct WriteApplied<M> {
    /// Epoch reported for the command.
    pub epoch: u64,
    /// Result value handed back to the caller by [`WriteHandle::wait`].
    pub result: M,
}
81
/// Failure payload delivered to a [`WriteHandle`].
///
/// Surfaced to callers as `WriteError::ApplyError(epoch, error)`.
#[derive(Clone, Debug)]
pub(crate) struct WriteFailed {
    /// Epoch reported alongside the failure.
    pub epoch: u64,
    /// Error message describing the failure.
    pub error: String,
}
88
89pub(crate) type EpochResult<M> = Result<WriteApplied<M>, WriteFailed>;
91
92pub struct WriteHandle<M: Clone + Send + 'static = ()> {
97 inner: Shared<oneshot::Receiver<EpochResult<M>>>,
98 watchers: EpochWatcher,
99}
100
101impl<M: Clone + Send + 'static> WriteHandle<M> {
102 pub(crate) fn new(rx: oneshot::Receiver<EpochResult<M>>, watchers: EpochWatcher) -> Self {
103 Self {
104 inner: rx.shared(),
105 watchers,
106 }
107 }
108
109 async fn recv(&self) -> WriteResult<WriteApplied<M>> {
110 self.inner
111 .clone()
112 .await
113 .map_err(|_| WriteError::Shutdown)?
114 .map_err(|e| WriteError::ApplyError(e.epoch, e.error))
115 }
116
117 pub async fn epoch(&self) -> WriteResult<u64> {
123 Ok(self.recv().await?.epoch)
124 }
125
126 pub async fn wait(&mut self, durability: Durability) -> WriteResult<M> {
131 let WriteApplied { epoch, result } = self.recv().await?;
132
133 self.watchers
134 .wait(epoch, durability)
135 .await
136 .map_err(|_| WriteError::Shutdown)?;
137 Ok(result)
138 }
139}
140
141pub struct WriteCoordinatorHandle<D: Delta> {
146 name: Arc<str>,
147 write_tx: mpsc::Sender<WriteCommand<D>>,
148 watchers: EpochWatcher,
149 view: Arc<BroadcastedView<D>>,
150}
151
152impl<D: Delta> WriteCoordinatorHandle<D> {
153 pub(crate) fn new(
154 name: String,
155 write_tx: mpsc::Sender<WriteCommand<D>>,
156 watchers: EpochWatcher,
157 view: Arc<BroadcastedView<D>>,
158 ) -> Self {
159 Self {
160 name: Arc::from(name),
161 write_tx,
162 watchers,
163 view,
164 }
165 }
166
167 pub fn flushed_epoch(&self) -> u64 {
172 *self.watchers.written_rx.borrow()
173 }
174
175 fn record_queue_depth(&self) {
177 let max = self.write_tx.max_capacity();
178 let free = self.write_tx.capacity();
179 ::metrics::gauge!(
180 metrics::COORDINATOR_QUEUE_DEPTH,
181 "channel" => self.name.to_string(),
182 )
183 .set(max.saturating_sub(free) as f64);
184 }
185
186 fn record_send(&self, command: &'static str, status: &'static str, started: Instant) {
187 ::metrics::histogram!(
188 metrics::COORDINATOR_SEND_DURATION_SECONDS,
189 "command" => command,
190 "status" => status,
191 )
192 .record(started.elapsed().as_secs_f64());
193 }
194
195 fn record_backpressure(&self, command: &'static str, reason: &'static str) {
196 ::metrics::counter!(
197 metrics::COORDINATOR_QUEUE_BACKPRESSURE_TOTAL,
198 "command" => command,
199 "reason" => reason,
200 )
201 .increment(1);
202 }
203}
204
205impl<D: Delta> WriteCoordinatorHandle<D> {
206 pub async fn write_timeout(
218 &self,
219 write: D::Write,
220 timeout: Duration,
221 ) -> Result<WriteHandle<D::ApplyResult>, WriteError<D::Write>> {
222 const COMMAND: &str = "write_timeout";
223 self.record_queue_depth();
224 let started = Instant::now();
225 let (tx, rx) = oneshot::channel();
226 let send_result = self
227 .write_tx
228 .send_timeout(
229 WriteCommand::Write {
230 write,
231 result_tx: tx,
232 },
233 timeout,
234 )
235 .await;
236 match send_result {
237 Ok(()) => {
238 self.record_send(COMMAND, "ok", started);
239 Ok(WriteHandle::new(rx, self.watchers.clone()))
240 }
241 Err(mpsc::error::SendTimeoutError::Timeout(WriteCommand::Write { write, .. })) => {
242 self.record_send(COMMAND, "timeout", started);
243 self.record_backpressure(COMMAND, "timeout");
244 Err(WriteError::TimeoutError(write))
245 }
246 Err(mpsc::error::SendTimeoutError::Closed(WriteCommand::Write { .. })) => {
247 self.record_send(COMMAND, "shutdown", started);
248 self.record_backpressure(COMMAND, "closed");
249 Err(WriteError::Shutdown)
250 }
251 Err(_) => unreachable!("sent a Write command"),
252 }
253 }
254
255 pub async fn write(
265 &self,
266 write: D::Write,
267 ) -> Result<WriteHandle<D::ApplyResult>, WriteError<D::Write>> {
268 const COMMAND: &str = "write";
269 self.record_queue_depth();
270 let started = Instant::now();
271 let (tx, rx) = oneshot::channel();
272 let send_result = self
273 .write_tx
274 .send(WriteCommand::Write {
275 write,
276 result_tx: tx,
277 })
278 .await;
279 match send_result {
280 Ok(()) => {
281 self.record_send(COMMAND, "ok", started);
282 Ok(WriteHandle::new(rx, self.watchers.clone()))
283 }
284 Err(mpsc::error::SendError(WriteCommand::Write { .. })) => {
285 self.record_send(COMMAND, "shutdown", started);
286 self.record_backpressure(COMMAND, "closed");
287 Err(WriteError::Shutdown)
288 }
289 Err(_) => unreachable!("sent a Write command"),
290 }
291 }
292
293 pub async fn try_write(
300 &self,
301 write: D::Write,
302 ) -> Result<WriteHandle<D::ApplyResult>, WriteError<D::Write>> {
303 const COMMAND: &str = "try_write";
304 self.record_queue_depth();
305 let started = Instant::now();
306 let (tx, rx) = oneshot::channel();
307 let send_result = self.write_tx.try_send(WriteCommand::Write {
308 write,
309 result_tx: tx,
310 });
311 match send_result {
312 Ok(()) => {
313 self.record_send(COMMAND, "ok", started);
314 Ok(WriteHandle::new(rx, self.watchers.clone()))
315 }
316 Err(mpsc::error::TrySendError::Full(WriteCommand::Write { write, .. })) => {
317 self.record_send(COMMAND, "backpressure", started);
318 self.record_backpressure(COMMAND, "full");
319 Err(WriteError::Backpressure(write))
320 }
321 Err(mpsc::error::TrySendError::Closed(WriteCommand::Write { .. })) => {
322 self.record_send(COMMAND, "shutdown", started);
323 self.record_backpressure(COMMAND, "closed");
324 Err(WriteError::Shutdown)
325 }
326 Err(_) => unreachable!("sent a Write command"),
327 }
328 }
329
330 pub async fn flush(&self, flush_storage: bool) -> WriteResult<WriteHandle> {
337 const COMMAND: &str = "flush";
338 self.record_queue_depth();
339 let started = Instant::now();
340 let (tx, rx) = oneshot::channel();
341 let send_result = self.write_tx.try_send(WriteCommand::Flush {
342 epoch_tx: tx,
343 flush_storage,
344 });
345 match send_result {
346 Ok(()) => {
347 self.record_send(COMMAND, "ok", started);
348 Ok(WriteHandle::new(rx, self.watchers.clone()))
349 }
350 Err(mpsc::error::TrySendError::Full(_)) => {
351 self.record_send(COMMAND, "backpressure", started);
352 self.record_backpressure(COMMAND, "full");
353 Err(WriteError::Backpressure(()))
354 }
355 Err(mpsc::error::TrySendError::Closed(_)) => {
356 self.record_send(COMMAND, "shutdown", started);
357 self.record_backpressure(COMMAND, "closed");
358 Err(WriteError::Shutdown)
359 }
360 }
361 }
362
363 pub fn view(&self) -> Arc<View<D>> {
364 self.view.current()
365 }
366
367 pub fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
368 self.view.subscribe()
369 }
370}
371
372impl<D: Delta> Clone for WriteCoordinatorHandle<D> {
373 fn clone(&self) -> Self {
374 Self {
375 name: self.name.clone(),
376 write_tx: self.write_tx.clone(),
377 watchers: self.watchers.clone(),
378 view: self.view.clone(),
379 }
380 }
381}
382
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::watch;

    /// Sending halves of the watermark channels, in the order
    /// `(applied, written, durable)`. Keeping them alive keeps the matching
    /// receivers open.
    type Senders = (
        watch::Sender<u64>,
        watch::Sender<u64>,
        watch::Sender<u64>,
    );

    /// Creates the three watermark channels with the given initial values.
    fn watermarks(applied: u64, written: u64, durable: u64) -> (Senders, EpochWatcher) {
        let (applied_tx, applied_rx) = watch::channel(applied);
        let (written_tx, written_rx) = watch::channel(written);
        let (durable_tx, durable_rx) = watch::channel(durable);
        let watcher = EpochWatcher {
            applied_rx,
            written_rx,
            durable_rx,
        };
        ((applied_tx, written_tx, durable_tx), watcher)
    }

    /// Builds a handle whose outcome has already been delivered as a
    /// successful [`WriteApplied`] with the given `epoch` and `result`.
    fn applied_handle<M>(epoch: u64, result: M, watchers: EpochWatcher) -> WriteHandle<M>
    where
        M: Clone + Send + std::fmt::Debug + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let handle = WriteHandle::new(rx, watchers);
        tx.send(Ok(WriteApplied { epoch, result })).unwrap();
        handle
    }

    #[tokio::test]
    async fn should_return_epoch_when_assigned() {
        // given
        let (_senders, watchers) = watermarks(0, 0, 0);
        let handle = applied_handle(42, (), watchers);

        // when
        let epoch = handle.epoch().await;

        // then
        assert!(epoch.is_ok());
        assert_eq!(epoch.unwrap(), 42);
    }

    #[tokio::test]
    async fn should_allow_multiple_epoch_calls() {
        // given
        let (_senders, watchers) = watermarks(0, 0, 0);
        let handle = applied_handle(42, (), watchers);

        // when
        let first = handle.epoch().await;
        let second = handle.epoch().await;
        let third = handle.epoch().await;

        // then
        for observed in [first, second, third] {
            assert_eq!(observed.unwrap(), 42);
        }
    }

    #[tokio::test]
    async fn should_return_apply_result_from_wait() {
        // given: applied watermark (100) is already past the epoch (1)
        let (_senders, watchers) = watermarks(100, 0, 0);
        let mut handle = applied_handle(1, "hello".to_string(), watchers);

        // when
        let value = handle.wait(Durability::Applied).await;

        // then
        assert_eq!(value.unwrap(), "hello");
    }

    #[tokio::test]
    async fn should_return_immediately_when_watermark_already_reached() {
        // given: applied watermark (100) is already past the epoch (50)
        let (_senders, watchers) = watermarks(100, 0, 0);
        let mut handle = applied_handle(50, (), watchers);

        // when
        let outcome = handle.wait(Durability::Applied).await;

        // then
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn should_wait_until_watermark_reaches_epoch() {
        // given
        let ((applied_tx, _written_tx, _durable_tx), watchers) = watermarks(0, 0, 0);
        let mut handle = applied_handle(10, (), watchers);

        // when
        let waiter = tokio::spawn(async move { handle.wait(Durability::Applied).await });

        tokio::task::yield_now().await;
        // Below the target epoch: the waiter must keep waiting.
        applied_tx.send(5).unwrap();
        tokio::task::yield_now().await;
        // Reaches the target epoch.
        applied_tx.send(10).unwrap();
        let outcome = waiter.await.unwrap();

        // then
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn should_wait_for_correct_durability_level() {
        // given: applied and written are past epoch 25, durable is not
        let ((_applied_tx, _written_tx, durable_tx), watchers) = watermarks(100, 50, 10);
        let mut handle = applied_handle(25, (), watchers);

        // when
        let waiter = tokio::spawn(async move { handle.wait(Durability::Durable).await });

        tokio::task::yield_now().await;
        durable_tx.send(25).unwrap();
        let outcome = waiter.await.unwrap();

        // then
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn should_propagate_epoch_error_in_wait() {
        // given
        let (_senders, watchers) = watermarks(0, 0, 0);
        let (tx, rx) = oneshot::channel::<EpochResult<()>>();
        let mut handle = WriteHandle::new(rx, watchers);

        // when: the sender goes away without ever sending an outcome
        drop(tx);
        let outcome = handle.wait(Durability::Applied).await;

        // then
        assert!(matches!(outcome, Err(WriteError::Shutdown)));
    }

    #[tokio::test]
    async fn should_propagate_apply_error_in_wait() {
        // given
        let (_senders, watchers) = watermarks(0, 0, 0);
        let (tx, rx) = oneshot::channel();
        let mut handle: WriteHandle<()> = WriteHandle::new(rx, watchers);

        // when
        tx.send(Err(WriteFailed {
            epoch: 1,
            error: "apply error".into(),
        }))
        .unwrap();
        let outcome = handle.wait(Durability::Applied).await;

        // then
        assert!(
            matches!(outcome, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
        );
    }

    #[tokio::test]
    async fn epoch_watcher_should_resolve_when_watermark_reached() {
        // given
        let ((applied_tx, _written_tx, _durable_tx), mut watcher) = watermarks(0, 0, 0);

        // when
        let waiter = tokio::spawn(async move { watcher.wait(5, Durability::Applied).await });
        tokio::task::yield_now().await;
        applied_tx.send(5).unwrap();

        // then
        assert!(waiter.await.unwrap().is_ok());
    }

    #[tokio::test]
    async fn epoch_watcher_should_resolve_immediately_when_already_reached() {
        // given
        let (_senders, mut watcher) = watermarks(10, 0, 0);

        // when
        let outcome = watcher.wait(5, Durability::Applied).await;

        // then
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn epoch_watcher_should_select_correct_durability_receiver() {
        // given
        let ((_applied_tx, _written_tx, durable_tx), mut watcher) = watermarks(0, 0, 0);

        // when
        let waiter = tokio::spawn(async move { watcher.wait(3, Durability::Durable).await });
        tokio::task::yield_now().await;
        durable_tx.send(3).unwrap();

        // then
        assert!(waiter.await.unwrap().is_ok());
    }

    #[tokio::test]
    async fn epoch_watcher_should_return_error_on_sender_drop() {
        // given
        let ((applied_tx, _written_tx, _durable_tx), mut watcher) = watermarks(0, 0, 0);

        // when: only the applied sender is dropped
        drop(applied_tx);
        let outcome = watcher.wait(1, Durability::Applied).await;

        // then
        assert!(outcome.is_err());
    }
}