1#![allow(unused)]
2
3mod error;
4mod handle;
5pub mod subscriber;
6mod traits;
7
8use std::collections::HashMap;
9use std::ops::Range;
10use std::ops::{Deref, DerefMut};
11
12pub use error::{WriteError, WriteResult};
13use futures::stream::{self, SelectAll, StreamExt};
14pub use handle::{View, WriteCoordinatorHandle, WriteHandle};
15pub use subscriber::{SubscribeError, ViewMonitor, ViewSubscriber};
16pub use traits::{Delta, Durability, EpochStamped, Flusher};
17
/// Events sent from the write coordinator task to the flush task.
enum FlushEvent<D: Delta> {
    /// A frozen delta, stamped with the epoch range it covers, to be handed to
    /// `Flusher::flush_delta`.
    FlushDelta { frozen: EpochStamped<D::Frozen> },
    /// Request to call `Flusher::flush_storage`.
    FlushStorage,
}
25
26use crate::StorageRead;
28use crate::storage::StorageSnapshot;
29use async_trait::async_trait;
30pub use handle::EpochWatcher;
31use std::sync::{Arc, Mutex};
32use std::time::Duration;
33use tokio::sync::{broadcast, mpsc, oneshot, watch};
34use tokio::time::{Instant, Interval, interval_at};
35use tokio_util::sync::CancellationToken;
36
/// Tunables for a [`WriteCoordinator`].
#[derive(Debug, Clone)]
pub struct WriteCoordinatorConfig {
    /// Capacity of each per-channel write command queue.
    pub queue_capacity: usize,
    /// Period of the timer that triggers a flush of the current delta.
    pub flush_interval: Duration,
    /// Once `Delta::estimate_size` reaches this value after a write, the
    /// current delta is flushed.
    pub flush_size_threshold: usize,
}
47
48impl Default for WriteCoordinatorConfig {
49 fn default() -> Self {
50 Self {
51 queue_capacity: 10_000,
52 flush_interval: Duration::from_secs(10),
53 flush_size_threshold: 64 * 1024 * 1024, }
55 }
56}
57
/// Commands sent from write handles to the coordinator task.
pub(crate) enum WriteCommand<D: Delta> {
    /// Apply `write` to the current delta and report the outcome on `result_tx`.
    Write {
        write: D::Write,
        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
    },
    /// Flush the current delta. The coordinator replies on `epoch_tx` with the
    /// last assigned epoch; when `flush_storage` is set it additionally
    /// requests a storage flush.
    Flush {
        epoch_tx: oneshot::Sender<handle::EpochResult<()>>,
        flush_storage: bool,
    },
}
68
/// Owns the write and flush tasks and hands out per-channel handles.
pub struct WriteCoordinator<D: Delta, F: Flusher<D>> {
    /// Write handles keyed by channel name.
    handles: HashMap<String, WriteCoordinatorHandle<D>>,
    /// Pause handles keyed by channel name.
    pause_handles: HashMap<String, PauseHandle>,
    /// Cancelled by `stop` to make the write task exit its loop.
    stop_tok: CancellationToken,
    /// The not-yet-started tasks; taken (left as `None`) by `start`.
    tasks: Option<(WriteCoordinatorTask<D>, FlushTask<D, F>)>,
    /// Join handle of the running write task; `None` until `start` is called.
    write_task_jh: Option<tokio::task::JoinHandle<Result<(), String>>>,
    /// Shared view that both tasks update.
    view: Arc<BroadcastedView<D>>,
}
81
82impl<D: Delta, F: Flusher<D>> WriteCoordinator<D, F> {
83 pub fn new(
84 config: WriteCoordinatorConfig,
85 channels: Vec<impl ToString>,
86 initial_context: D::Context,
87 initial_snapshot: Arc<dyn StorageSnapshot>,
88 flusher: F,
89 ) -> WriteCoordinator<D, F> {
90 let (watermarks, watcher) = EpochWatermarks::new();
91 let watermarks = Arc::new(watermarks);
92
93 let mut write_rxs = Vec::with_capacity(channels.len());
95 let mut write_txs = HashMap::new();
96 let mut pause_handles = HashMap::new();
97 for name in &channels {
98 let (write_tx, write_rx, pause_hdl) = pausable_channel(config.queue_capacity);
99 write_rxs.push(write_rx);
100 write_txs.insert(name.to_string(), write_tx);
101 pause_handles.insert(name.to_string(), pause_hdl);
102 }
103
104 let (flush_tx, flush_rx) = mpsc::channel(2);
110
111 let flush_stop_tok = CancellationToken::new();
112 let stop_tok = CancellationToken::new();
113 let write_task = WriteCoordinatorTask::new(
114 config,
115 initial_context,
116 initial_snapshot,
117 write_rxs,
118 flush_tx,
119 watermarks.clone(),
120 stop_tok.clone(),
121 flush_stop_tok.clone(),
122 );
123
124 let view = write_task.view.clone();
125
126 let mut handles = HashMap::new();
127 for name in channels {
128 let write_tx = write_txs.remove(&name.to_string()).expect("unreachable");
129 handles.insert(
130 name.to_string(),
131 WriteCoordinatorHandle::new(write_tx, watcher.clone(), view.clone()),
132 );
133 }
134
135 let flush_task = FlushTask {
136 flusher,
137 stop_tok: flush_stop_tok,
138 flush_rx,
139 watermarks: watermarks.clone(),
140 view: view.clone(),
141 last_flushed_epoch: 0,
142 };
143
144 Self {
145 handles,
146 pause_handles,
147 tasks: Some((write_task, flush_task)),
148 write_task_jh: None,
149 stop_tok,
150 view,
151 }
152 }
153
154 pub fn handle(&self, name: &str) -> WriteCoordinatorHandle<D> {
155 self.handles
156 .get(name)
157 .expect("unknown channel name")
158 .clone()
159 }
160
161 pub fn pause_handle(&self, name: &str) -> PauseHandle {
162 self.pause_handles
163 .get(name)
164 .expect("unknown channel name")
165 .clone()
166 }
167
168 pub fn start(&mut self) {
169 let Some((write_task, flush_task)) = self.tasks.take() else {
170 return;
172 };
173 let flush_task_jh = flush_task.run();
174 let write_task_jh = write_task.run(flush_task_jh);
175 self.write_task_jh = Some(write_task_jh);
176 }
177
178 pub async fn stop(mut self) -> Result<(), String> {
179 let Some(write_task_jh) = self.write_task_jh.take() else {
180 return Ok(());
181 };
182 self.stop_tok.cancel();
183 write_task_jh.await.map_err(|e| e.to_string())?
184 }
185
186 pub fn view(&self) -> Arc<View<D>> {
187 self.view.current()
188 }
189
190 pub fn subscribe(&self) -> (ViewSubscriber<D>, ViewMonitor) {
191 let (view_rx, initial_view) = self.view.subscribe();
192 ViewSubscriber::new(view_rx, initial_view)
193 }
194}
195
/// State of the task that applies writes and decides when to flush.
struct WriteCoordinatorTask<D: Delta> {
    config: WriteCoordinatorConfig,
    /// The delta currently accepting writes.
    delta: CurrentDelta<D>,
    /// Sends frozen deltas and storage-flush requests to the flush task.
    flush_tx: mpsc::Sender<FlushEvent<D>>,
    /// Per-channel command receivers; drained into one merged stream when the
    /// task starts running.
    write_rxs: Vec<PausableReceiver<D>>,
    watermarks: Arc<EpochWatermarks>,
    view: Arc<BroadcastedView<D>>,
    /// Epoch that will be assigned to the next write (starts at 1).
    epoch: u64,
    /// Epoch value at the time the current delta was started.
    delta_start_epoch: u64,
    /// Timer driving periodic flushes.
    flush_interval: Interval,
    /// Cancelled to stop this task.
    stop_tok: CancellationToken,
    /// Cancelled by this task, after its final flush, to stop the flush task.
    flush_stop_tok: CancellationToken,
}
209
210impl<D: Delta> WriteCoordinatorTask<D> {
211 #[allow(clippy::too_many_arguments)]
215 pub fn new(
216 config: WriteCoordinatorConfig,
217 initial_context: D::Context,
218 initial_snapshot: Arc<dyn StorageSnapshot>,
219 write_rxs: Vec<PausableReceiver<D>>,
220 flush_tx: mpsc::Sender<FlushEvent<D>>,
221 watermarks: Arc<EpochWatermarks>,
222 stop_tok: CancellationToken,
223 flush_stop_tok: CancellationToken,
224 ) -> Self {
225 let delta = D::init(initial_context);
226
227 let initial_view = View {
228 current: delta.reader(),
229 frozen: vec![],
230 snapshot: initial_snapshot,
231 last_written_delta: None,
232 };
233 let initial_view = Arc::new(BroadcastedView::new(initial_view));
234
235 let flush_interval = interval_at(
236 Instant::now() + config.flush_interval,
237 config.flush_interval,
238 );
239 Self {
240 config,
241 delta: CurrentDelta::new(delta),
242 write_rxs,
243 flush_tx,
244 watermarks,
245 view: initial_view,
246 epoch: 1,
250 delta_start_epoch: 1,
251 flush_interval,
252 stop_tok,
253 flush_stop_tok,
254 }
255 }
256
257 pub fn run(
259 mut self,
260 flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
261 ) -> tokio::task::JoinHandle<Result<(), String>> {
262 tokio::task::spawn(async move { self.run_coordinator(flush_task_jh).await })
263 }
264
265 async fn run_coordinator(
266 mut self,
267 flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
268 ) -> Result<(), String> {
269 self.flush_interval.reset();
271
272 let mut write_stream: SelectAll<_> = SelectAll::new();
274 for rx in self.write_rxs.drain(..) {
275 write_stream.push(
276 stream::unfold(
277 rx,
278 |mut rx| async move { rx.recv().await.map(|cmd| (cmd, rx)) },
279 )
280 .boxed(),
281 );
282 }
283
284 loop {
285 tokio::select! {
286 cmd = write_stream.next() => {
287 match cmd {
288 Some(WriteCommand::Write { write, result_tx }) => {
289 self.handle_write(write, result_tx).await?;
290 }
291 Some(WriteCommand::Flush { epoch_tx, flush_storage }) => {
292 let _ = epoch_tx.send(Ok(handle::WriteApplied {
294 epoch: self.epoch.saturating_sub(1),
295 result: (),
296 }));
297 self.handle_flush(flush_storage).await;
298 }
299 None => {
300 break;
302 }
303 }
304 }
305
306 _ = self.flush_interval.tick() => {
307 self.handle_flush(false).await;
308 }
309
310 _ = self.stop_tok.cancelled() => {
311 break;
312 }
313 }
314 }
315
316 self.handle_flush(false).await;
318
319 self.flush_stop_tok.cancel();
321 flush_task_jh
323 .await
324 .map_err(|e| format!("flush task panicked: {}", e))?
325 .map_err(|e| format!("flush task error: {}", e))
326 }
327
328 async fn handle_write(
329 &mut self,
330 write: D::Write,
331 result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
332 ) -> Result<(), String> {
333 let write_epoch = self.epoch;
334 self.epoch += 1;
335
336 let result = self.delta.apply(write);
337 let _ = result_tx.send(
339 result
340 .map(|apply_result| handle::WriteApplied {
341 epoch: write_epoch,
342 result: apply_result,
343 })
344 .map_err(|e| handle::WriteFailed {
345 epoch: write_epoch,
346 error: e,
347 }),
348 );
349
350 self.watermarks.update_applied(write_epoch);
352
353 if self.delta.estimate_size() >= self.config.flush_size_threshold {
354 self.handle_flush(false).await;
355 }
356
357 Ok(())
358 }
359
360 async fn handle_flush(&mut self, flush_storage: bool) {
361 self.flush_if_delta_has_writes().await;
362 if flush_storage {
363 let _ = self.flush_tx.send(FlushEvent::FlushStorage).await;
364 }
365 }
366
367 async fn flush_if_delta_has_writes(&mut self) {
368 if self.epoch == self.delta_start_epoch {
369 return;
370 }
371
372 self.flush_interval.reset();
373
374 let epoch_range = self.delta_start_epoch..self.epoch;
375 self.delta_start_epoch = self.epoch;
376 let (frozen, frozen_reader) = self.delta.freeze_and_init();
377 let stamped_frozen = EpochStamped::new(frozen, epoch_range.clone());
378 let stamped_frozen_reader = EpochStamped::new(frozen_reader, epoch_range.clone());
379 let reader = self.delta.reader();
380 self.view.update_delta_frozen(stamped_frozen_reader, reader);
383 let _ = self
386 .flush_tx
387 .send(FlushEvent::FlushDelta {
388 frozen: stamped_frozen,
389 })
390 .await;
391 }
392}
393
/// State of the task that performs flushes requested by the write task.
struct FlushTask<D: Delta, F: Flusher<D>> {
    flusher: F,
    /// Cancelled to make the task leave its main loop.
    stop_tok: CancellationToken,
    /// Receives flush events from the write task.
    flush_rx: mpsc::Receiver<FlushEvent<D>>,
    watermarks: Arc<EpochWatermarks>,
    view: Arc<BroadcastedView<D>>,
    /// Last epoch covered by a completed delta flush (0 before any flush).
    last_flushed_epoch: u64,
}
402
403impl<D: Delta, F: Flusher<D>> FlushTask<D, F> {
404 fn run(mut self) -> tokio::task::JoinHandle<WriteResult<()>> {
405 tokio::spawn(async move {
406 loop {
407 tokio::select! {
408 event = self.flush_rx.recv() => {
409 let Some(event) = event else {
410 break;
411 };
412 self.handle_event(event).await?;
413 }
414 _ = self.stop_tok.cancelled() => {
415 break;
416 }
417 }
418 }
419 while let Ok(event) = self.flush_rx.try_recv() {
421 self.handle_event(event).await;
422 }
423 Ok(())
424 })
425 }
426
427 async fn handle_event(&mut self, event: FlushEvent<D>) -> WriteResult<()> {
428 match event {
429 FlushEvent::FlushDelta { frozen } => self.handle_flush(frozen).await,
430 FlushEvent::FlushStorage => {
431 self.flusher
432 .flush_storage()
433 .await
434 .map_err(|e| WriteError::FlushError(e.to_string()))?;
435 self.watermarks.update_durable(self.last_flushed_epoch);
436 Ok(())
437 }
438 }
439 }
440
441 async fn handle_flush(&mut self, frozen: EpochStamped<D::Frozen>) -> WriteResult<()> {
442 let delta = frozen.val;
443 let epoch_range = frozen.epoch_range;
444 let snapshot = self
445 .flusher
446 .flush_delta(delta, &epoch_range)
447 .await
448 .map_err(|e| WriteError::FlushError(e.to_string()))?;
449 self.last_flushed_epoch = epoch_range.end - 1;
450 self.watermarks.update_written(self.last_flushed_epoch);
451 self.view.update_flush_finished(snapshot, epoch_range);
452 Ok(())
453 }
454}
455
/// Holder for the delta currently accepting writes.
///
/// The `Option` lets `freeze_and_init` move the delta out by value and put a
/// fresh one back.
struct CurrentDelta<D: Delta> {
    delta: Option<D>,
}
459
460impl<D: Delta> Deref for CurrentDelta<D> {
461 type Target = D;
462
463 fn deref(&self) -> &Self::Target {
464 match &self.delta {
465 Some(d) => d,
466 None => panic!("current delta not initialized"),
467 }
468 }
469}
470
471impl<D: Delta> DerefMut for CurrentDelta<D> {
472 fn deref_mut(&mut self) -> &mut Self::Target {
473 match &mut self.delta {
474 Some(d) => d,
475 None => panic!("current delta not initialized"),
476 }
477 }
478}
479
480impl<D: Delta> CurrentDelta<D> {
481 fn new(delta: D) -> Self {
482 Self { delta: Some(delta) }
483 }
484
485 fn freeze_and_init(&mut self) -> (D::Frozen, D::FrozenView) {
486 let Some(delta) = self.delta.take() else {
487 panic!("delta not initialized");
488 };
489 let (frozen, frozen_reader, context) = delta.freeze();
490 let new_delta = D::init(context);
491 self.delta = Some(new_delta);
492 (frozen, frozen_reader)
493 }
494}
495
496pub struct EpochWatermarks {
497 applied_tx: tokio::sync::watch::Sender<u64>,
498 written_tx: tokio::sync::watch::Sender<u64>,
499 durable_tx: tokio::sync::watch::Sender<u64>,
500}
501
502impl EpochWatermarks {
503 pub fn new() -> (Self, EpochWatcher) {
504 let (applied_tx, applied_rx) = tokio::sync::watch::channel(0);
505 let (written_tx, written_rx) = tokio::sync::watch::channel(0);
506 let (durable_tx, durable_rx) = tokio::sync::watch::channel(0);
507 let watcher = EpochWatcher {
508 applied_rx,
509 written_rx,
510 durable_rx,
511 };
512 let watermarks = EpochWatermarks {
513 applied_tx,
514 written_tx,
515 durable_tx,
516 };
517 (watermarks, watcher)
518 }
519
520 pub fn update_applied(&self, epoch: u64) {
521 let _ = self.applied_tx.send(epoch);
522 }
523
524 pub fn update_written(&self, epoch: u64) {
525 let _ = self.written_tx.send(epoch);
526 }
527
528 pub fn update_durable(&self, epoch: u64) {
529 let _ = self.durable_tx.send(epoch);
530 }
531}
532
/// Mutex-guarded current view plus the broadcast channel used to announce
/// every new view.
pub(crate) struct BroadcastedView<D: Delta> {
    inner: Mutex<BroadcastedViewInner<D>>,
}
536
537impl<D: Delta> BroadcastedView<D> {
538 fn new(initial_view: View<D>) -> Self {
539 let (view_tx, _) = broadcast::channel(16);
540 Self {
541 inner: Mutex::new(BroadcastedViewInner {
542 view: Arc::new(initial_view),
543 view_tx,
544 }),
545 }
546 }
547
548 fn update_flush_finished(&self, snapshot: Arc<dyn StorageSnapshot>, epoch_range: Range<u64>) {
549 self.inner
550 .lock()
551 .expect("lock poisoned")
552 .update_flush_finished(snapshot, epoch_range);
553 }
554
555 fn update_delta_frozen(&self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
556 self.inner
557 .lock()
558 .expect("lock poisoned")
559 .update_delta_frozen(frozen, reader);
560 }
561
562 fn current(&self) -> Arc<View<D>> {
563 self.inner.lock().expect("lock poisoned").current()
564 }
565
566 fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
567 self.inner.lock().expect("lock poisoned").subscribe()
568 }
569}
570
571struct BroadcastedViewInner<D: Delta> {
572 view: Arc<View<D>>,
573 view_tx: tokio::sync::broadcast::Sender<Arc<View<D>>>,
574}
575
576impl<D: Delta> BroadcastedViewInner<D> {
577 fn update_flush_finished(
578 &mut self,
579 snapshot: Arc<dyn StorageSnapshot>,
580 epoch_range: Range<u64>,
581 ) {
582 let mut new_frozen = self.view.frozen.clone();
583 let last = new_frozen
584 .pop()
585 .expect("frozen should not be empty when flush completes");
586 assert_eq!(last.epoch_range, epoch_range);
587 self.view = Arc::new(View {
588 current: self.view.current.clone(),
589 frozen: new_frozen,
590 snapshot,
591 last_written_delta: Some(last),
592 });
593 self.view_tx.send(self.view.clone());
594 }
595
596 fn update_delta_frozen(&mut self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
597 let mut new_frozen = vec![frozen];
599 new_frozen.extend(self.view.frozen.iter().cloned());
600 self.view = Arc::new(View {
601 current: reader,
602 frozen: new_frozen,
603 snapshot: self.view.snapshot.clone(),
604 last_written_delta: self.view.last_written_delta.clone(),
605 });
606 self.view_tx.send(self.view.clone());
607 }
608
609 fn current(&self) -> Arc<View<D>> {
610 self.view.clone()
611 }
612
613 fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
614 (self.view_tx.subscribe(), self.view.clone())
615 }
616}
617
/// Command receiver that can be paused through a watch flag.
struct PausableReceiver<D: Delta> {
    /// Pause flag; `true` means paused. `None` disables pausing.
    pause_rx: Option<watch::Receiver<bool>>,
    /// The underlying command queue.
    rx: mpsc::Receiver<WriteCommand<D>>,
}
622
623impl<D: Delta> PausableReceiver<D> {
624 async fn recv(&mut self) -> Option<WriteCommand<D>> {
625 if let Some(pause_rx) = self.pause_rx.as_mut() {
626 pause_rx.wait_for(|v| !*v).await;
627 }
628 self.rx.recv().await
629 }
630}
631
632#[derive(Clone)]
634pub struct PauseHandle {
635 pause_tx: tokio::sync::watch::Sender<bool>,
636}
637
638impl PauseHandle {
639 pub fn pause(&self) {
640 self.pause_tx.send_replace(true);
641 }
642
643 pub fn unpause(&self) {
644 self.pause_tx.send_replace(false);
645 }
646}
647
648fn pausable_channel<D: Delta>(
649 capacity: usize,
650) -> (
651 mpsc::Sender<WriteCommand<D>>,
652 PausableReceiver<D>,
653 PauseHandle,
654) {
655 let (pause_tx, pause_rx) = watch::channel(false);
656 let (tx, rx) = mpsc::channel(capacity);
657 (
658 tx,
659 PausableReceiver {
660 pause_rx: Some(pause_rx),
661 rx,
662 },
663 PauseHandle { pause_tx },
664 )
665}
666
667#[cfg(test)]
668mod tests {
669 use super::*;
670 use crate::BytesRange;
671 use crate::coordinator::Durability;
672 use crate::storage::in_memory::{InMemoryStorage, InMemoryStorageSnapshot};
673 use crate::storage::{PutRecordOp, Record, StorageSnapshot};
674 use crate::{Storage, StorageRead};
675 use async_trait::async_trait;
676 use bytes::Bytes;
677 use std::collections::{HashMap, HashSet};
678 use std::ops::Range;
679 use std::sync::Mutex;
    /// A single key/value write used by the tests.
    #[derive(Clone, Debug)]
    struct TestWrite {
        key: String,
        value: u64,
        /// Amount added to the delta's size estimate when the write is applied.
        size: usize,
    }
690
    /// Context carried from one `TestDelta` to the next.
    #[derive(Clone, Debug, Default)]
    struct TestContext {
        /// Sequence number given to the next applied write.
        next_seq: u64,
        /// When set, every `apply` fails with this message.
        error: Option<String>,
    }
697
    /// Read handle over the key/value map shared with a `TestDelta`.
    #[derive(Clone, Debug, Default)]
    struct TestDeltaReader {
        data: Arc<Mutex<HashMap<String, u64>>>,
    }
703
704 impl TestDeltaReader {
705 fn get(&self, key: &str) -> Option<u64> {
706 self.data.lock().unwrap().get(key).copied()
707 }
708 }
709
    /// Test delta that records writes in memory.
    #[derive(Debug)]
    struct TestDelta {
        context: TestContext,
        /// key -> (sequence number, value) of the latest write to that key.
        writes: HashMap<String, (u64, u64)>,
        /// key -> value map shared with readers.
        key_values: Arc<Mutex<HashMap<String, u64>>>,
        /// Sum of the `size` of all applied writes.
        total_size: usize,
    }
719
    /// Frozen form of `TestDelta`: the writes map moved out of the delta.
    #[derive(Clone, Debug)]
    struct FrozenTestDelta {
        writes: HashMap<String, (u64, u64)>,
    }
724
725 impl std::fmt::Debug for View<TestDelta> {
726 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
727 f.write_str("View<TestDelta>")
728 }
729 }
730
731 impl Delta for TestDelta {
732 type Context = TestContext;
733 type Write = TestWrite;
734 type DeltaView = TestDeltaReader;
735 type Frozen = FrozenTestDelta;
736 type FrozenView = Arc<HashMap<String, u64>>;
737 type ApplyResult = ();
738
739 fn init(context: Self::Context) -> Self {
740 Self {
741 context,
742 writes: HashMap::default(),
743 key_values: Arc::new(Mutex::new(HashMap::default())),
744 total_size: 0,
745 }
746 }
747
748 fn apply(&mut self, write: Self::Write) -> Result<(), String> {
749 if let Some(error) = &self.context.error {
750 return Err(error.clone());
751 }
752
753 let seq = self.context.next_seq;
754 self.context.next_seq += 1;
755
756 self.writes.insert(write.key.clone(), (seq, write.value));
757 self.total_size += write.size;
758 self.key_values
759 .lock()
760 .unwrap()
761 .insert(write.key, write.value);
762 Ok(())
763 }
764
765 fn estimate_size(&self) -> usize {
766 self.total_size
767 }
768
769 fn freeze(self) -> (Self::Frozen, Self::FrozenView, Self::Context) {
770 let frozen = FrozenTestDelta {
771 writes: self.writes,
772 };
773 let frozen_view = Arc::new(self.key_values.lock().unwrap().clone());
774 (frozen, frozen_view, self.context)
775 }
776
777 fn reader(&self) -> Self::DeltaView {
778 TestDeltaReader {
779 data: self.key_values.clone(),
780 }
781 }
782 }
783
    /// Mutable state shared between clones of `TestFlusher`.
    #[derive(Default)]
    struct TestFlusherState {
        /// Every delta passed to `flush_delta`, with its epoch range.
        flushed_events: Vec<Arc<EpochStamped<FrozenTestDelta>>>,
        /// If set, signalled (once) when a flush call starts.
        flush_started_tx: Option<oneshot::Sender<()>>,
        /// If set, the first flush call takes it and waits on it before
        /// proceeding.
        unblock_rx: Option<mpsc::Receiver<()>>,
    }
793
    /// Flusher that writes frozen deltas to in-memory storage and records them.
    #[derive(Clone)]
    struct TestFlusher {
        state: Arc<Mutex<TestFlusherState>>,
        storage: Arc<InMemoryStorage>,
    }
799
800 impl Default for TestFlusher {
801 fn default() -> Self {
802 Self {
803 state: Arc::new(Mutex::new(TestFlusherState::default())),
804 storage: Arc::new(InMemoryStorage::new()),
805 }
806 }
807 }
808
809 impl TestFlusher {
810 fn with_flush_control() -> (Self, oneshot::Receiver<()>, mpsc::Sender<()>) {
813 let (started_tx, started_rx) = oneshot::channel();
814 let (unblock_tx, unblock_rx) = mpsc::channel(1);
815 let flusher = Self {
816 state: Arc::new(Mutex::new(TestFlusherState {
817 flushed_events: Vec::new(),
818 flush_started_tx: Some(started_tx),
819 unblock_rx: Some(unblock_rx),
820 })),
821 storage: Arc::new(InMemoryStorage::new()),
822 };
823 (flusher, started_rx, unblock_tx)
824 }
825
826 fn flushed_events(&self) -> Vec<Arc<EpochStamped<FrozenTestDelta>>> {
827 self.state.lock().unwrap().flushed_events.clone()
828 }
829
830 async fn initial_snapshot(&self) -> Arc<dyn StorageSnapshot> {
831 self.storage.snapshot().await.unwrap()
832 }
833 }
834
835 #[async_trait]
836 impl Flusher<TestDelta> for TestFlusher {
837 async fn flush_delta(
838 &mut self,
839 frozen: FrozenTestDelta,
840 epoch_range: &Range<u64>,
841 ) -> Result<Arc<dyn StorageSnapshot>, String> {
842 let flush_started_tx = {
844 let mut state = self.state.lock().unwrap();
845 state.flush_started_tx.take()
846 };
847 if let Some(tx) = flush_started_tx {
848 let _ = tx.send(());
849 }
850
851 let unblock_rx = {
853 let mut state = self.state.lock().unwrap();
854 state.unblock_rx.take()
855 };
856 if let Some(mut rx) = unblock_rx {
857 rx.recv().await;
858 }
859
860 let records: Vec<PutRecordOp> = frozen
862 .writes
863 .iter()
864 .map(|(key, (seq, value))| {
865 let mut buf = Vec::with_capacity(16);
866 buf.extend_from_slice(&seq.to_le_bytes());
867 buf.extend_from_slice(&value.to_le_bytes());
868 Record::new(Bytes::from(key.clone()), Bytes::from(buf)).into()
869 })
870 .collect();
871 self.storage
872 .put(records)
873 .await
874 .map_err(|e| format!("{}", e))?;
875
876 {
878 let mut state = self.state.lock().unwrap();
879 state
880 .flushed_events
881 .push(Arc::new(EpochStamped::new(frozen, epoch_range.clone())));
882 }
883
884 self.storage.snapshot().await.map_err(|e| format!("{}", e))
885 }
886
887 async fn flush_storage(&self) -> Result<(), String> {
888 let flush_started_tx = {
890 let mut state = self.state.lock().unwrap();
891 state.flush_started_tx.take()
892 };
893 if let Some(tx) = flush_started_tx {
894 let _ = tx.send(());
895 }
896
897 let unblock_rx = {
899 let mut state = self.state.lock().unwrap();
900 state.unblock_rx.take()
901 };
902 if let Some(mut rx) = unblock_rx {
903 rx.recv().await;
904 }
905
906 Ok(())
907 }
908 }
909
910 fn test_config() -> WriteCoordinatorConfig {
911 WriteCoordinatorConfig {
912 queue_capacity: 100,
913 flush_interval: Duration::from_secs(3600), flush_size_threshold: usize::MAX,
915 }
916 }
917
918 async fn assert_snapshot_has_rows(
919 snapshot: &Arc<dyn StorageSnapshot>,
920 expected: &[(&str, u64, u64)],
921 ) {
922 let records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
923 assert_eq!(
924 records.len(),
925 expected.len(),
926 "expected {} rows but snapshot has {}",
927 expected.len(),
928 records.len()
929 );
930 let mut actual: Vec<(String, u64, u64)> = records
931 .iter()
932 .map(|r| {
933 let key = String::from_utf8(r.key.to_vec()).unwrap();
934 let seq = u64::from_le_bytes(r.value[0..8].try_into().unwrap());
935 let value = u64::from_le_bytes(r.value[8..16].try_into().unwrap());
936 (key, seq, value)
937 })
938 .collect();
939 actual.sort_by(|a, b| a.0.cmp(&b.0));
940 let mut expected: Vec<(&str, u64, u64)> = expected.to_vec();
941 expected.sort_by(|a, b| a.0.cmp(b.0));
942 for (actual, expected) in actual.iter().zip(expected.iter()) {
943 assert_eq!(
944 actual.0, expected.0,
945 "key mismatch: got {:?}, expected {:?}",
946 actual.0, expected.0
947 );
948 assert_eq!(
949 actual.1, expected.1,
950 "seq mismatch for key {:?}: got {}, expected {}",
951 actual.0, actual.1, expected.1
952 );
953 assert_eq!(
954 actual.2, expected.2,
955 "value mismatch for key {:?}: got {}, expected {}",
956 actual.0, actual.2, expected.2
957 );
958 }
959 }
960
961 #[tokio::test]
966 async fn should_assign_monotonic_epochs() {
967 let flusher = TestFlusher::default();
969 let mut coordinator = WriteCoordinator::new(
970 test_config(),
971 vec!["default".to_string()],
972 TestContext::default(),
973 flusher.initial_snapshot().await,
974 flusher,
975 );
976 let handle = coordinator.handle("default");
977 coordinator.start();
978
979 let write1 = handle
981 .try_write(TestWrite {
982 key: "a".into(),
983 value: 1,
984 size: 10,
985 })
986 .await
987 .unwrap();
988 let write2 = handle
989 .try_write(TestWrite {
990 key: "b".into(),
991 value: 2,
992 size: 10,
993 })
994 .await
995 .unwrap();
996 let write3 = handle
997 .try_write(TestWrite {
998 key: "c".into(),
999 value: 3,
1000 size: 10,
1001 })
1002 .await
1003 .unwrap();
1004
1005 let epoch1 = write1.epoch().await.unwrap();
1006 let epoch2 = write2.epoch().await.unwrap();
1007 let epoch3 = write3.epoch().await.unwrap();
1008
1009 assert!(epoch1 < epoch2);
1011 assert!(epoch2 < epoch3);
1012
1013 coordinator.stop().await;
1015 }
1016
1017 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1018 async fn should_apply_writes_in_order() {
1019 let flusher = TestFlusher::default();
1021 let mut coordinator = WriteCoordinator::new(
1022 test_config(),
1023 vec!["default".to_string()],
1024 TestContext::default(),
1025 flusher.initial_snapshot().await,
1026 flusher.clone(),
1027 );
1028 let handle = coordinator.handle("default");
1029 coordinator.start();
1030
1031 handle
1033 .try_write(TestWrite {
1034 key: "a".into(),
1035 value: 1,
1036 size: 10,
1037 })
1038 .await
1039 .unwrap();
1040 handle
1041 .try_write(TestWrite {
1042 key: "a".into(),
1043 value: 2,
1044 size: 10,
1045 })
1046 .await
1047 .unwrap();
1048 let mut last_write = handle
1049 .try_write(TestWrite {
1050 key: "a".into(),
1051 value: 3,
1052 size: 10,
1053 })
1054 .await
1055 .unwrap();
1056
1057 handle.flush(false).await.unwrap();
1058 last_write.wait(Durability::Written).await.unwrap();
1060
1061 let events = flusher.flushed_events();
1063 assert_eq!(events.len(), 1);
1064 let frozen_delta = &events[0];
1065 let delta = &frozen_delta.val;
1066 let (seq, value) = delta.writes.get("a").unwrap();
1068 assert_eq!(*value, 3);
1069 assert_eq!(*seq, 2);
1070
1071 coordinator.stop().await;
1073 }
1074
1075 #[tokio::test]
1076 async fn should_update_applied_watermark_after_each_write() {
1077 let flusher = TestFlusher::default();
1079 let mut coordinator = WriteCoordinator::new(
1080 test_config(),
1081 vec!["default".to_string()],
1082 TestContext::default(),
1083 flusher.initial_snapshot().await,
1084 flusher,
1085 );
1086 let handle = coordinator.handle("default");
1087 coordinator.start();
1088
1089 let mut write_handle = handle
1091 .try_write(TestWrite {
1092 key: "a".into(),
1093 value: 1,
1094 size: 10,
1095 })
1096 .await
1097 .unwrap();
1098
1099 let result = write_handle.wait(Durability::Applied).await;
1101 assert!(result.is_ok());
1102
1103 coordinator.stop().await;
1105 }
1106
1107 #[tokio::test]
1108 async fn should_propagate_apply_error_to_handle() {
1109 let flusher = TestFlusher::default();
1111 let context = TestContext {
1112 error: Some("apply error".to_string()),
1113 ..Default::default()
1114 };
1115 let mut coordinator = WriteCoordinator::new(
1116 test_config(),
1117 vec!["default".to_string()],
1118 context,
1119 flusher.initial_snapshot().await,
1120 flusher,
1121 );
1122 let handle = coordinator.handle("default");
1123 coordinator.start();
1124
1125 let write = handle
1127 .try_write(TestWrite {
1128 key: "a".into(),
1129 value: 1,
1130 size: 10,
1131 })
1132 .await
1133 .unwrap();
1134
1135 let result = write.epoch().await;
1136
1137 assert!(
1139 matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
1140 );
1141
1142 coordinator.stop().await;
1144 }
1145
1146 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1151 async fn should_flush_on_command() {
1152 let flusher = TestFlusher::default();
1154 let mut coordinator = WriteCoordinator::new(
1155 test_config(),
1156 vec!["default".to_string()],
1157 TestContext::default(),
1158 flusher.initial_snapshot().await,
1159 flusher.clone(),
1160 );
1161 let handle = coordinator.handle("default");
1162 coordinator.start();
1163
1164 let mut write = handle
1166 .try_write(TestWrite {
1167 key: "a".into(),
1168 value: 1,
1169 size: 10,
1170 })
1171 .await
1172 .unwrap();
1173 handle.flush(false).await.unwrap();
1174 write.wait(Durability::Written).await.unwrap();
1175
1176 assert_eq!(flusher.flushed_events().len(), 1);
1178
1179 coordinator.stop().await;
1181 }
1182
1183 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1184 async fn should_wait_on_flush_handle() {
1185 let flusher = TestFlusher::default();
1187 let mut coordinator = WriteCoordinator::new(
1188 test_config(),
1189 vec!["default".to_string()],
1190 TestContext::default(),
1191 flusher.initial_snapshot().await,
1192 flusher.clone(),
1193 );
1194 let handle = coordinator.handle("default");
1195 coordinator.start();
1196
1197 handle
1199 .try_write(TestWrite {
1200 key: "a".into(),
1201 value: 1,
1202 size: 10,
1203 })
1204 .await
1205 .unwrap();
1206 let mut flush_handle = handle.flush(false).await.unwrap();
1207
1208 flush_handle.wait(Durability::Written).await.unwrap();
1210 assert_eq!(flusher.flushed_events().len(), 1);
1211
1212 coordinator.stop().await;
1214 }
1215
1216 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1217 async fn should_return_correct_epoch_from_flush_handle() {
1218 let flusher = TestFlusher::default();
1220 let mut coordinator = WriteCoordinator::new(
1221 test_config(),
1222 vec!["default".to_string()],
1223 TestContext::default(),
1224 flusher.initial_snapshot().await,
1225 flusher,
1226 );
1227 let handle = coordinator.handle("default");
1228 coordinator.start();
1229
1230 let write1 = handle
1232 .try_write(TestWrite {
1233 key: "a".into(),
1234 value: 1,
1235 size: 10,
1236 })
1237 .await
1238 .unwrap();
1239 let write2 = handle
1240 .try_write(TestWrite {
1241 key: "b".into(),
1242 value: 2,
1243 size: 10,
1244 })
1245 .await
1246 .unwrap();
1247 let flush_handle = handle.flush(false).await.unwrap();
1248
1249 let flush_epoch = flush_handle.epoch().await.unwrap();
1251 let write2_epoch = write2.epoch().await.unwrap();
1252 assert_eq!(flush_epoch, write2_epoch);
1253
1254 coordinator.stop().await;
1256 }
1257
1258 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1259 async fn should_include_all_pending_writes_in_flush() {
1260 let flusher = TestFlusher::default();
1262 let mut coordinator = WriteCoordinator::new(
1263 test_config(),
1264 vec!["default".to_string()],
1265 TestContext::default(),
1266 flusher.initial_snapshot().await,
1267 flusher.clone(),
1268 );
1269 let handle = coordinator.handle("default");
1270 coordinator.start();
1271
1272 handle
1274 .try_write(TestWrite {
1275 key: "a".into(),
1276 value: 1,
1277 size: 10,
1278 })
1279 .await
1280 .unwrap();
1281 handle
1282 .try_write(TestWrite {
1283 key: "b".into(),
1284 value: 2,
1285 size: 10,
1286 })
1287 .await
1288 .unwrap();
1289 let mut last_write = handle
1290 .try_write(TestWrite {
1291 key: "c".into(),
1292 value: 3,
1293 size: 10,
1294 })
1295 .await
1296 .unwrap();
1297
1298 handle.flush(false).await.unwrap();
1299 last_write.wait(Durability::Written).await.unwrap();
1300
1301 let events = flusher.flushed_events();
1303 assert_eq!(events.len(), 1);
1304 let frozen_delta = &events[0];
1305 assert_eq!(frozen_delta.val.writes.len(), 3);
1306 let snapshot = flusher.storage.snapshot().await.unwrap();
1307 assert_snapshot_has_rows(&snapshot, &[("a", 0, 1), ("b", 1, 2), ("c", 2, 3)]).await;
1308
1309 coordinator.stop().await;
1311 }
1312
1313 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1314 async fn should_skip_flush_when_no_new_writes() {
1315 let flusher = TestFlusher::default();
1317 let mut coordinator = WriteCoordinator::new(
1318 test_config(),
1319 vec!["default".to_string()],
1320 TestContext::default(),
1321 flusher.initial_snapshot().await,
1322 flusher.clone(),
1323 );
1324 let handle = coordinator.handle("default");
1325 coordinator.start();
1326
1327 let mut write = handle
1329 .try_write(TestWrite {
1330 key: "a".into(),
1331 value: 1,
1332 size: 10,
1333 })
1334 .await
1335 .unwrap();
1336 handle.flush(false).await.unwrap();
1337 write.wait(Durability::Written).await.unwrap();
1338
1339 handle.flush(false).await.unwrap();
1341
1342 let sync_write = handle
1345 .try_write(TestWrite {
1346 key: "sync".into(),
1347 value: 0,
1348 size: 1,
1349 })
1350 .await
1351 .unwrap();
1352 sync_write.epoch().await.unwrap();
1353
1354 assert_eq!(flusher.flushed_events().len(), 1);
1356
1357 coordinator.stop().await;
1359 }
1360
1361 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1362 async fn should_update_written_watermark_after_flush() {
1363 let flusher = TestFlusher::default();
1365 let mut coordinator = WriteCoordinator::new(
1366 test_config(),
1367 vec!["default".to_string()],
1368 TestContext::default(),
1369 flusher.initial_snapshot().await,
1370 flusher,
1371 );
1372 let handle = coordinator.handle("default");
1373 coordinator.start();
1374
1375 let mut write_handle = handle
1377 .try_write(TestWrite {
1378 key: "a".into(),
1379 value: 1,
1380 size: 10,
1381 })
1382 .await
1383 .unwrap();
1384
1385 handle.flush(false).await.unwrap();
1386
1387 let result = write_handle.wait(Durability::Written).await;
1389 assert!(result.is_ok());
1390
1391 coordinator.stop().await;
1393 }
1394
1395 #[tokio::test(start_paused = true)]
1400 async fn should_flush_on_flush_interval() {
1401 let flusher = TestFlusher::default();
1403 let config = WriteCoordinatorConfig {
1404 queue_capacity: 100,
1405 flush_interval: Duration::from_millis(100),
1406 flush_size_threshold: usize::MAX,
1407 };
1408 let mut coordinator = WriteCoordinator::new(
1409 config,
1410 vec!["default".to_string()],
1411 TestContext::default(),
1412 flusher.initial_snapshot().await,
1413 flusher.clone(),
1414 );
1415 let handle = coordinator.handle("default");
1416 coordinator.start();
1417
1418 tokio::task::yield_now().await;
1420 let mut write = handle
1421 .try_write(TestWrite {
1422 key: "a".into(),
1423 value: 1,
1424 size: 10,
1425 })
1426 .await
1427 .unwrap();
1428 write.wait(Durability::Applied).await.unwrap();
1429
1430 assert_eq!(flusher.flushed_events().len(), 0);
1432
1433 tokio::time::advance(Duration::from_millis(150)).await;
1435 tokio::task::yield_now().await;
1436
1437 assert_eq!(flusher.flushed_events().len(), 1);
1439 let snapshot = flusher.storage.snapshot().await.unwrap();
1440 assert_snapshot_has_rows(&snapshot, &[("a", 0, 1)]).await;
1441
1442 coordinator.stop().await;
1444 }
1445
1446 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1451 async fn should_flush_when_size_threshold_exceeded() {
1452 let flusher = TestFlusher::default();
1454 let config = WriteCoordinatorConfig {
1455 queue_capacity: 100,
1456 flush_interval: Duration::from_secs(3600),
1457 flush_size_threshold: 100, };
1459 let mut coordinator = WriteCoordinator::new(
1460 config,
1461 vec!["default".to_string()],
1462 TestContext::default(),
1463 flusher.initial_snapshot().await,
1464 flusher.clone(),
1465 );
1466 let handle = coordinator.handle("default");
1467 coordinator.start();
1468
1469 let mut write = handle
1471 .try_write(TestWrite {
1472 key: "a".into(),
1473 value: 1,
1474 size: 150,
1475 })
1476 .await
1477 .unwrap();
1478 write.wait(Durability::Written).await.unwrap();
1479
1480 assert_eq!(flusher.flushed_events().len(), 1);
1482
1483 coordinator.stop().await;
1485 }
1486
1487 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1488 async fn should_accumulate_until_threshold() {
1489 let flusher = TestFlusher::default();
1491 let config = WriteCoordinatorConfig {
1492 queue_capacity: 100,
1493 flush_interval: Duration::from_secs(3600),
1494 flush_size_threshold: 100,
1495 };
1496 let mut coordinator = WriteCoordinator::new(
1497 config,
1498 vec!["default".to_string()],
1499 TestContext::default(),
1500 flusher.initial_snapshot().await,
1501 flusher.clone(),
1502 );
1503 let handle = coordinator.handle("default");
1504 coordinator.start();
1505
1506 for i in 0..5 {
1508 let mut w = handle
1509 .try_write(TestWrite {
1510 key: format!("key{}", i),
1511 value: i,
1512 size: 15,
1513 })
1514 .await
1515 .unwrap();
1516 w.wait(Durability::Applied).await.unwrap();
1517 }
1518
1519 assert_eq!(flusher.flushed_events().len(), 0);
1521
1522 let mut final_write = handle
1524 .try_write(TestWrite {
1525 key: "final".into(),
1526 value: 999,
1527 size: 30,
1528 })
1529 .await
1530 .unwrap();
1531 final_write.wait(Durability::Written).await.unwrap();
1532
1533 assert_eq!(flusher.flushed_events().len(), 1);
1535
1536 coordinator.stop().await;
1538 }
1539
1540 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1545 async fn should_accept_writes_during_flush() {
1546 let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1548 let mut coordinator = WriteCoordinator::new(
1549 test_config(),
1550 vec!["default".to_string()],
1551 TestContext::default(),
1552 flusher.initial_snapshot().await,
1553 flusher.clone(),
1554 );
1555 let handle = coordinator.handle("default");
1556 coordinator.start();
1557
1558 let write1 = handle
1560 .try_write(TestWrite {
1561 key: "a".into(),
1562 value: 1,
1563 size: 10,
1564 })
1565 .await
1566 .unwrap();
1567 handle.flush(false).await.unwrap();
1568 flush_started_rx.await.unwrap(); let write2 = handle
1572 .try_write(TestWrite {
1573 key: "b".into(),
1574 value: 2,
1575 size: 10,
1576 })
1577 .await
1578 .unwrap();
1579 assert!(write2.epoch().await.unwrap() > write1.epoch().await.unwrap());
1580
1581 unblock_tx.send(()).await.unwrap();
1583 coordinator.stop().await;
1584 }
1585
1586 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1587 async fn should_assign_new_epochs_during_flush() {
1588 let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1590 let mut coordinator = WriteCoordinator::new(
1591 test_config(),
1592 vec!["default".to_string()],
1593 TestContext::default(),
1594 flusher.initial_snapshot().await,
1595 flusher.clone(),
1596 );
1597 let handle = coordinator.handle("default");
1598 coordinator.start();
1599
1600 handle
1602 .try_write(TestWrite {
1603 key: "a".into(),
1604 value: 1,
1605 size: 10,
1606 })
1607 .await
1608 .unwrap();
1609 handle.flush(false).await.unwrap();
1610 flush_started_rx.await.unwrap(); let w1 = handle
1614 .try_write(TestWrite {
1615 key: "b".into(),
1616 value: 2,
1617 size: 10,
1618 })
1619 .await
1620 .unwrap();
1621 let w2 = handle
1622 .try_write(TestWrite {
1623 key: "c".into(),
1624 value: 3,
1625 size: 10,
1626 })
1627 .await
1628 .unwrap();
1629
1630 let e1 = w1.epoch().await.unwrap();
1632 let e2 = w2.epoch().await.unwrap();
1633 assert!(e1 < e2);
1634
1635 unblock_tx.send(()).await.unwrap();
1637 coordinator.stop().await;
1638 }
1639
1640 #[tokio::test]
1645 async fn should_return_backpressure_when_queue_full() {
1646 let flusher = TestFlusher::default();
1648 let config = WriteCoordinatorConfig {
1649 queue_capacity: 2,
1650 flush_interval: Duration::from_secs(3600),
1651 flush_size_threshold: usize::MAX,
1652 };
1653 let mut coordinator = WriteCoordinator::new(
1654 config,
1655 vec!["default".to_string()],
1656 TestContext::default(),
1657 flusher.initial_snapshot().await,
1658 flusher.clone(),
1659 );
1660 let handle = coordinator.handle("default");
1661 let _ = handle
1665 .try_write(TestWrite {
1666 key: "a".into(),
1667 value: 1,
1668 size: 10,
1669 })
1670 .await;
1671 let _ = handle
1672 .try_write(TestWrite {
1673 key: "b".into(),
1674 value: 2,
1675 size: 10,
1676 })
1677 .await;
1678
1679 let result = handle
1681 .try_write(TestWrite {
1682 key: "c".into(),
1683 value: 3,
1684 size: 10,
1685 })
1686 .await;
1687
1688 assert!(matches!(result, Err(WriteError::Backpressure(_))));
1690 }
1691
1692 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1693 async fn should_accept_writes_after_queue_drains() {
1694 let flusher = TestFlusher::default();
1696 let config = WriteCoordinatorConfig {
1697 queue_capacity: 2,
1698 flush_interval: Duration::from_secs(3600),
1699 flush_size_threshold: usize::MAX,
1700 };
1701 let mut coordinator = WriteCoordinator::new(
1702 config,
1703 vec!["default".to_string()],
1704 TestContext::default(),
1705 flusher.initial_snapshot().await,
1706 flusher.clone(),
1707 );
1708 let handle = coordinator.handle("default");
1709
1710 let _ = handle
1712 .try_write(TestWrite {
1713 key: "a".into(),
1714 value: 1,
1715 size: 10,
1716 })
1717 .await;
1718 let mut write_b = handle
1719 .try_write(TestWrite {
1720 key: "b".into(),
1721 value: 2,
1722 size: 10,
1723 })
1724 .await
1725 .unwrap();
1726
1727 coordinator.start();
1729 write_b.wait(Durability::Applied).await.unwrap();
1730
1731 let result = handle
1733 .try_write(TestWrite {
1734 key: "c".into(),
1735 value: 3,
1736 size: 10,
1737 })
1738 .await;
1739 assert!(result.is_ok());
1740
1741 coordinator.stop().await;
1743 }
1744
1745 #[tokio::test]
1750 async fn should_shutdown_cleanly_when_stop_called() {
1751 let flusher = TestFlusher::default();
1753 let mut coordinator = WriteCoordinator::new(
1754 test_config(),
1755 vec!["default".to_string()],
1756 TestContext::default(),
1757 flusher.initial_snapshot().await,
1758 flusher.clone(),
1759 );
1760 let handle = coordinator.handle("default");
1761 coordinator.start();
1762
1763 let result = coordinator.stop().await;
1765
1766 assert!(result.is_ok());
1768 }
1769
1770 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1771 async fn should_flush_pending_writes_on_shutdown() {
1772 let flusher = TestFlusher::default();
1774 let config = WriteCoordinatorConfig {
1775 queue_capacity: 100,
1776 flush_interval: Duration::from_secs(3600), flush_size_threshold: usize::MAX, };
1779 let mut coordinator = WriteCoordinator::new(
1780 config,
1781 vec!["default".to_string()],
1782 TestContext::default(),
1783 flusher.initial_snapshot().await,
1784 flusher.clone(),
1785 );
1786 let handle = coordinator.handle("default");
1787 coordinator.start();
1788
1789 let write = handle
1791 .try_write(TestWrite {
1792 key: "a".into(),
1793 value: 1,
1794 size: 10,
1795 })
1796 .await
1797 .unwrap();
1798 let epoch = write.epoch().await.unwrap();
1799
1800 coordinator.stop().await;
1802
1803 let events = flusher.flushed_events();
1805 assert_eq!(events.len(), 1);
1806 let epoch_range = &events[0].epoch_range;
1807 assert!(epoch_range.contains(&epoch));
1808 }
1809
1810 #[tokio::test]
1811 async fn should_return_shutdown_error_after_coordinator_stops() {
1812 let flusher = TestFlusher::default();
1814 let mut coordinator = WriteCoordinator::new(
1815 test_config(),
1816 vec!["default".to_string()],
1817 TestContext::default(),
1818 flusher.initial_snapshot().await,
1819 flusher.clone(),
1820 );
1821 let handle = coordinator.handle("default");
1822 coordinator.start();
1823
1824 coordinator.stop().await;
1826
1827 let result = handle
1829 .try_write(TestWrite {
1830 key: "a".into(),
1831 value: 1,
1832 size: 10,
1833 })
1834 .await;
1835
1836 assert!(matches!(result, Err(WriteError::Shutdown)));
1838 }
1839
1840 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1845 async fn should_track_epoch_range_in_flush_event() {
1846 let flusher = TestFlusher::default();
1848 let mut coordinator = WriteCoordinator::new(
1849 test_config(),
1850 vec!["default".to_string()],
1851 TestContext::default(),
1852 flusher.initial_snapshot().await,
1853 flusher.clone(),
1854 );
1855 let handle = coordinator.handle("default");
1856 coordinator.start();
1857
1858 handle
1860 .try_write(TestWrite {
1861 key: "a".into(),
1862 value: 1,
1863 size: 10,
1864 })
1865 .await
1866 .unwrap();
1867 handle
1868 .try_write(TestWrite {
1869 key: "b".into(),
1870 value: 2,
1871 size: 10,
1872 })
1873 .await
1874 .unwrap();
1875 let mut last_write = handle
1876 .try_write(TestWrite {
1877 key: "c".into(),
1878 value: 3,
1879 size: 10,
1880 })
1881 .await
1882 .unwrap();
1883
1884 handle.flush(false).await.unwrap();
1885 last_write.wait(Durability::Written).await.unwrap();
1886
1887 let events = flusher.flushed_events();
1889 assert_eq!(events.len(), 1);
1890 let epoch_range = &events[0].epoch_range;
1891 assert_eq!(epoch_range.start, 1);
1892 assert_eq!(epoch_range.end, 4); coordinator.stop().await;
1896 }
1897
1898 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1899 async fn should_have_contiguous_epoch_ranges() {
1900 let flusher = TestFlusher::default();
1902 let mut coordinator = WriteCoordinator::new(
1903 test_config(),
1904 vec!["default".to_string()],
1905 TestContext::default(),
1906 flusher.initial_snapshot().await,
1907 flusher.clone(),
1908 );
1909 let handle = coordinator.handle("default");
1910 coordinator.start();
1911
1912 handle
1914 .try_write(TestWrite {
1915 key: "a".into(),
1916 value: 1,
1917 size: 10,
1918 })
1919 .await
1920 .unwrap();
1921 let mut write2 = handle
1922 .try_write(TestWrite {
1923 key: "b".into(),
1924 value: 2,
1925 size: 10,
1926 })
1927 .await
1928 .unwrap();
1929 handle.flush(false).await.unwrap();
1930 write2.wait(Durability::Written).await.unwrap();
1931
1932 let mut write3 = handle
1934 .try_write(TestWrite {
1935 key: "c".into(),
1936 value: 3,
1937 size: 10,
1938 })
1939 .await
1940 .unwrap();
1941 handle.flush(false).await.unwrap();
1942 write3.wait(Durability::Written).await.unwrap();
1943
1944 let events = flusher.flushed_events();
1946 assert_eq!(events.len(), 2);
1947
1948 let range1 = &events[0].epoch_range;
1949 let range2 = &events[1].epoch_range;
1950
1951 assert_eq!(range1.end, range2.start);
1953 assert_eq!(range1, &(1..3));
1954 assert_eq!(range2, &(3..4));
1955
1956 coordinator.stop().await;
1958 }
1959
1960 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1961 async fn should_include_exact_epochs_in_range() {
1962 let flusher = TestFlusher::default();
1964 let mut coordinator = WriteCoordinator::new(
1965 test_config(),
1966 vec!["default".to_string()],
1967 TestContext::default(),
1968 flusher.initial_snapshot().await,
1969 flusher.clone(),
1970 );
1971 let handle = coordinator.handle("default");
1972 coordinator.start();
1973
1974 let write1 = handle
1976 .try_write(TestWrite {
1977 key: "a".into(),
1978 value: 1,
1979 size: 10,
1980 })
1981 .await
1982 .unwrap();
1983 let epoch1 = write1.epoch().await.unwrap();
1984
1985 let mut write2 = handle
1986 .try_write(TestWrite {
1987 key: "b".into(),
1988 value: 2,
1989 size: 10,
1990 })
1991 .await
1992 .unwrap();
1993 let epoch2 = write2.epoch().await.unwrap();
1994
1995 handle.flush(false).await.unwrap();
1996 write2.wait(Durability::Written).await.unwrap();
1997
1998 let events = flusher.flushed_events();
2000 assert_eq!(events.len(), 1);
2001 let epoch_range = &events[0].epoch_range;
2002
2003 assert_eq!(epoch_range.start, epoch1);
2005 assert_eq!(epoch_range.end, epoch2 + 1);
2007 assert!(epoch_range.contains(&epoch1));
2009 assert!(epoch_range.contains(&epoch2));
2010
2011 coordinator.stop().await;
2013 }
2014
2015 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2020 async fn should_preserve_context_across_flushes() {
2021 let flusher = TestFlusher::default();
2023 let mut coordinator = WriteCoordinator::new(
2024 test_config(),
2025 vec!["default".to_string()],
2026 TestContext::default(),
2027 flusher.initial_snapshot().await,
2028 flusher.clone(),
2029 );
2030 let handle = coordinator.handle("default");
2031 coordinator.start();
2032
2033 let mut write1 = handle
2035 .try_write(TestWrite {
2036 key: "a".into(),
2037 value: 1,
2038 size: 10,
2039 })
2040 .await
2041 .unwrap();
2042 handle.flush(false).await.unwrap();
2043 write1.wait(Durability::Written).await.unwrap();
2044
2045 let mut write2 = handle
2047 .try_write(TestWrite {
2048 key: "a".into(),
2049 value: 2,
2050 size: 10,
2051 })
2052 .await
2053 .unwrap();
2054 handle.flush(false).await.unwrap();
2055 write2.wait(Durability::Written).await.unwrap();
2056
2057 let events = flusher.flushed_events();
2059 assert_eq!(events.len(), 2);
2060
2061 let (seq1, _) = events[0].val.writes.get("a").unwrap();
2063 assert_eq!(*seq1, 0);
2064
2065 let (seq2, _) = events[1].val.writes.get("a").unwrap();
2067 assert_eq!(*seq2, 1);
2068
2069 coordinator.stop().await;
2071 }
2072
2073 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2078 async fn should_receive_view_on_subscribe() {
2079 let flusher = TestFlusher::default();
2081 let mut coordinator = WriteCoordinator::new(
2082 test_config(),
2083 vec!["default".to_string()],
2084 TestContext::default(),
2085 flusher.initial_snapshot().await,
2086 flusher.clone(),
2087 );
2088 let handle = coordinator.handle("default");
2089 let (mut subscriber, _) = coordinator.subscribe();
2090 subscriber.initialize();
2091 coordinator.start();
2092
2093 handle
2095 .try_write(TestWrite {
2096 key: "a".into(),
2097 value: 1,
2098 size: 10,
2099 })
2100 .await
2101 .unwrap();
2102 handle.flush(false).await.unwrap();
2103
2104 let result = subscriber.recv().await;
2106 assert!(result.is_ok());
2107
2108 coordinator.stop().await;
2110 }
2111
2112 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2113 async fn should_include_snapshot_in_view_after_flush() {
2114 let flusher = TestFlusher::default();
2116 let mut coordinator = WriteCoordinator::new(
2117 test_config(),
2118 vec!["default".to_string()],
2119 TestContext::default(),
2120 flusher.initial_snapshot().await,
2121 flusher.clone(),
2122 );
2123 let handle = coordinator.handle("default");
2124 let (mut subscriber, _) = coordinator.subscribe();
2125 subscriber.initialize();
2126 coordinator.start();
2127
2128 handle
2130 .try_write(TestWrite {
2131 key: "a".into(),
2132 value: 1,
2133 size: 10,
2134 })
2135 .await
2136 .unwrap();
2137 handle.flush(false).await.unwrap();
2138
2139 let _ = subscriber.recv().await.unwrap();
2141 let result = subscriber.recv().await.unwrap();
2143
2144 assert_snapshot_has_rows(&result.snapshot, &[("a", 0, 1)]).await;
2146
2147 coordinator.stop().await;
2149 }
2150
2151 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2152 async fn should_include_delta_in_view_after_flush() {
2153 let flusher = TestFlusher::default();
2155 let mut coordinator = WriteCoordinator::new(
2156 test_config(),
2157 vec!["default".to_string()],
2158 TestContext::default(),
2159 flusher.initial_snapshot().await,
2160 flusher.clone(),
2161 );
2162 let handle = coordinator.handle("default");
2163 let (mut subscriber, _) = coordinator.subscribe();
2164 subscriber.initialize();
2165 coordinator.start();
2166
2167 handle
2169 .try_write(TestWrite {
2170 key: "a".into(),
2171 value: 42,
2172 size: 10,
2173 })
2174 .await
2175 .unwrap();
2176 handle.flush(false).await.unwrap();
2177
2178 let _ = subscriber.recv().await.unwrap();
2180 let result = subscriber.recv().await.unwrap();
2182
2183 let flushed = result.last_written_delta.as_ref().unwrap();
2185 assert_eq!(flushed.val.get("a"), Some(&42));
2186
2187 coordinator.stop().await;
2189 }
2190
2191 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2192 async fn should_include_epoch_range_in_view_after_flush() {
2193 let flusher = TestFlusher::default();
2195 let mut coordinator = WriteCoordinator::new(
2196 test_config(),
2197 vec!["default".to_string()],
2198 TestContext::default(),
2199 flusher.initial_snapshot().await,
2200 flusher.clone(),
2201 );
2202 let handle = coordinator.handle("default");
2203 let (mut subscriber, _) = coordinator.subscribe();
2204 subscriber.initialize();
2205 coordinator.start();
2206
2207 let write1 = handle
2209 .try_write(TestWrite {
2210 key: "a".into(),
2211 value: 1,
2212 size: 10,
2213 })
2214 .await
2215 .unwrap();
2216 let write2 = handle
2217 .try_write(TestWrite {
2218 key: "b".into(),
2219 value: 2,
2220 size: 10,
2221 })
2222 .await
2223 .unwrap();
2224 handle.flush(false).await.unwrap();
2225
2226 let _ = subscriber.recv().await.unwrap();
2228 let result = subscriber.recv().await.unwrap();
2230
2231 let flushed = result.last_written_delta.as_ref().unwrap();
2233 let epoch1 = write1.epoch().await.unwrap();
2234 let epoch2 = write2.epoch().await.unwrap();
2235 assert!(flushed.epoch_range.contains(&epoch1));
2236 assert!(flushed.epoch_range.contains(&epoch2));
2237 assert_eq!(flushed.epoch_range.start, epoch1);
2238 assert_eq!(flushed.epoch_range.end, epoch2 + 1);
2239
2240 coordinator.stop().await;
2242 }
2243
2244 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2245 async fn should_broadcast_frozen_delta_on_freeze() {
2246 let flusher = TestFlusher::default();
2248 let mut coordinator = WriteCoordinator::new(
2249 test_config(),
2250 vec!["default".to_string()],
2251 TestContext::default(),
2252 flusher.initial_snapshot().await,
2253 flusher.clone(),
2254 );
2255 let handle = coordinator.handle("default");
2256 let (mut subscriber, _) = coordinator.subscribe();
2257 subscriber.initialize();
2258 coordinator.start();
2259
2260 handle
2262 .try_write(TestWrite {
2263 key: "a".into(),
2264 value: 1,
2265 size: 10,
2266 })
2267 .await
2268 .unwrap();
2269 handle.flush(false).await.unwrap();
2270
2271 let state = subscriber.recv().await.unwrap();
2273 assert_eq!(state.frozen.len(), 1);
2274 assert!(state.frozen[0].val.contains_key("a"));
2275
2276 coordinator.stop().await;
2278 }
2279
2280 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2281 async fn should_remove_frozen_delta_after_flush_complete() {
2282 let flusher = TestFlusher::default();
2284 let mut coordinator = WriteCoordinator::new(
2285 test_config(),
2286 vec!["default".to_string()],
2287 TestContext::default(),
2288 flusher.initial_snapshot().await,
2289 flusher.clone(),
2290 );
2291 let handle = coordinator.handle("default");
2292 let (mut subscriber, _) = coordinator.subscribe();
2293 subscriber.initialize();
2294 coordinator.start();
2295
2296 handle
2298 .try_write(TestWrite {
2299 key: "a".into(),
2300 value: 1,
2301 size: 10,
2302 })
2303 .await
2304 .unwrap();
2305 handle.flush(false).await.unwrap();
2306
2307 let state1 = subscriber.recv().await.unwrap();
2309 assert_eq!(state1.frozen.len(), 1);
2310
2311 let state2 = subscriber.recv().await.unwrap();
2313 assert_eq!(state2.frozen.len(), 0);
2314 assert!(state2.last_written_delta.is_some());
2315
2316 coordinator.stop().await;
2318 }
2319
2320 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2321 async fn should_recover_from_message_lost_subscriber() {
2322 let flusher = TestFlusher::default();
2324 let mut coordinator = WriteCoordinator::new(
2325 test_config(),
2326 vec!["default".to_string()],
2327 TestContext::default(),
2328 flusher.initial_snapshot().await,
2329 flusher.clone(),
2330 );
2331 let handle = coordinator.handle("default");
2332 let (mut subscriber, _) = coordinator.subscribe();
2333 subscriber.initialize();
2334 coordinator.start();
2335
2336 for i in 0..20 {
2340 let _write_handle = handle
2341 .try_write(TestWrite {
2342 key: format!("key_{}", i),
2343 value: i as u64,
2344 size: 10,
2345 })
2346 .await
2347 .unwrap();
2348 let _ = handle.flush(false).await.unwrap();
2349 }
2350
2351 let mut watermark = handle
2353 .flush(true)
2354 .await
2355 .expect("flush(true) should succeed");
2356
2357 watermark.wait(Durability::Durable).await;
2359
2360 let result = subscriber
2362 .recv()
2363 .await
2364 .expect_err("expected recv() to yield an error");
2365 assert!(matches!(result, SubscribeError::MessageLost));
2366
2367 let (rx, initial_view) = handle.subscribe();
2369 (subscriber, _) = ViewSubscriber::new(rx, initial_view);
2370 let view = subscriber.initialize();
2371
2372 let records = view.snapshot.scan(BytesRange::unbounded()).await.unwrap();
2375 assert!(
2376 records.len() >= 20,
2377 "expected at least 20 rows, got {}",
2378 records.len()
2379 );
2380
2381 let _write_handle = handle
2383 .try_write(TestWrite {
2384 key: "post_recovery".into(),
2385 value: 100,
2386 size: 10,
2387 })
2388 .await
2389 .unwrap();
2390 let _ = handle.flush(false).await.unwrap();
2391
2392 let result = subscriber.recv().await;
2393 assert!(result.is_ok());
2394
2395 coordinator.stop().await;
2397 }
2398
2399 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2404 async fn should_flush_even_when_no_writes_if_flush_storage() {
2405 let flusher = TestFlusher::default();
2407 let storage = Arc::new(InMemoryStorage::new());
2408 let snapshot = storage.snapshot().await.unwrap();
2409 let mut coordinator = WriteCoordinator::new(
2410 test_config(),
2411 vec!["default".to_string()],
2412 TestContext::default(),
2413 snapshot,
2414 flusher.clone(),
2415 );
2416 let handle = coordinator.handle("default");
2417 coordinator.start();
2418
2419 let mut flush_handle = handle.flush(true).await.unwrap();
2421 flush_handle.wait(Durability::Durable).await.unwrap();
2422
2423 assert_eq!(flusher.flushed_events().len(), 0);
2426
2427 coordinator.stop().await;
2429 }
2430
2431 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2432 async fn should_advance_durable_watermark() {
2433 let flusher = TestFlusher::default();
2435 let storage = Arc::new(InMemoryStorage::new());
2436 let snapshot = storage.snapshot().await.unwrap();
2437 let mut coordinator = WriteCoordinator::new(
2438 test_config(),
2439 vec!["default".to_string()],
2440 TestContext::default(),
2441 snapshot,
2442 flusher.clone(),
2443 );
2444 let handle = coordinator.handle("default");
2445 coordinator.start();
2446
2447 let mut write = handle
2449 .try_write(TestWrite {
2450 key: "a".into(),
2451 value: 1,
2452 size: 10,
2453 })
2454 .await
2455 .unwrap();
2456 let mut flush_handle = handle.flush(true).await.unwrap();
2457
2458 flush_handle.wait(Durability::Durable).await.unwrap();
2460 write.wait(Durability::Durable).await.unwrap();
2461 assert_eq!(flusher.flushed_events().len(), 1);
2462
2463 coordinator.stop().await;
2465 }
2466
2467 #[tokio::test]
2468 async fn should_see_applied_write_via_view() {
2469 let flusher = TestFlusher::default();
2471 let mut coordinator = WriteCoordinator::new(
2472 test_config(),
2473 vec!["default".to_string()],
2474 TestContext::default(),
2475 flusher.initial_snapshot().await,
2476 flusher,
2477 );
2478 let handle = coordinator.handle("default");
2479 coordinator.start();
2480
2481 let mut write = handle
2483 .try_write(TestWrite {
2484 key: "a".into(),
2485 value: 42,
2486 size: 10,
2487 })
2488 .await
2489 .unwrap();
2490 write.wait(Durability::Applied).await.unwrap();
2491
2492 let view = coordinator.view();
2494 assert_eq!(view.current.get("a"), Some(42));
2495
2496 coordinator.stop().await;
2498 }
2499
2500 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2505 async fn should_flush_writes_from_multiple_channels() {
2506 let flusher = TestFlusher::default();
2508 let mut coordinator = WriteCoordinator::new(
2509 test_config(),
2510 vec!["ch1".to_string(), "ch2".to_string()],
2511 TestContext::default(),
2512 flusher.initial_snapshot().await,
2513 flusher.clone(),
2514 );
2515 let ch1 = coordinator.handle("ch1");
2516 let ch2 = coordinator.handle("ch2");
2517 coordinator.start();
2518
2519 let mut w1 = ch1
2522 .try_write(TestWrite {
2523 key: "a".into(),
2524 value: 10,
2525 size: 10,
2526 })
2527 .await
2528 .unwrap();
2529 w1.wait(Durability::Applied).await.unwrap();
2530
2531 let mut w2 = ch2
2532 .try_write(TestWrite {
2533 key: "b".into(),
2534 value: 20,
2535 size: 10,
2536 })
2537 .await
2538 .unwrap();
2539 w2.wait(Durability::Applied).await.unwrap();
2540
2541 let mut w3 = ch1
2542 .try_write(TestWrite {
2543 key: "c".into(),
2544 value: 30,
2545 size: 10,
2546 })
2547 .await
2548 .unwrap();
2549 w3.wait(Durability::Applied).await.unwrap();
2550
2551 ch1.flush(false).await.unwrap();
2552 w3.wait(Durability::Written).await.unwrap();
2553
2554 let snapshot = flusher.storage.snapshot().await.unwrap();
2556 assert_snapshot_has_rows(&snapshot, &[("a", 0, 10), ("b", 1, 20), ("c", 2, 30)]).await;
2557
2558 coordinator.stop().await;
2560 }
2561
2562 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2563 async fn should_succeed_with_write_timeout_when_queue_has_space() {
2564 let flusher = TestFlusher::default();
2566 let mut coordinator = WriteCoordinator::new(
2567 test_config(),
2568 vec!["default".to_string()],
2569 TestContext::default(),
2570 flusher.initial_snapshot().await,
2571 flusher.clone(),
2572 );
2573 let handle = coordinator.handle("default");
2574 coordinator.start();
2575
2576 let mut wh = handle
2578 .write_timeout(
2579 TestWrite {
2580 key: "a".into(),
2581 value: 1,
2582 size: 10,
2583 },
2584 Duration::from_secs(1),
2585 )
2586 .await
2587 .unwrap();
2588
2589 let result = wh.wait(Durability::Applied).await;
2591 assert!(result.is_ok());
2592
2593 coordinator.stop().await;
2595 }
2596
2597 #[tokio::test]
2598 async fn should_timeout_when_queue_full() {
2599 let flusher = TestFlusher::default();
2601 let config = WriteCoordinatorConfig {
2602 queue_capacity: 2,
2603 flush_interval: Duration::from_secs(3600),
2604 flush_size_threshold: usize::MAX,
2605 };
2606 let mut coordinator = WriteCoordinator::new(
2607 config,
2608 vec!["default".to_string()],
2609 TestContext::default(),
2610 flusher.initial_snapshot().await,
2611 flusher.clone(),
2612 );
2613 let handle = coordinator.handle("default");
2614
2615 let _ = handle
2617 .try_write(TestWrite {
2618 key: "a".into(),
2619 value: 1,
2620 size: 10,
2621 })
2622 .await;
2623 let _ = handle
2624 .try_write(TestWrite {
2625 key: "b".into(),
2626 value: 2,
2627 size: 10,
2628 })
2629 .await;
2630
2631 let result = handle
2633 .write_timeout(
2634 TestWrite {
2635 key: "c".into(),
2636 value: 3,
2637 size: 10,
2638 },
2639 Duration::from_millis(10),
2640 )
2641 .await;
2642
2643 assert!(matches!(result, Err(WriteError::TimeoutError(_))));
2645 }
2646
2647 #[tokio::test]
2648 async fn should_return_write_in_timeout_error() {
2649 let flusher = TestFlusher::default();
2651 let config = WriteCoordinatorConfig {
2652 queue_capacity: 1,
2653 flush_interval: Duration::from_secs(3600),
2654 flush_size_threshold: usize::MAX,
2655 };
2656 let mut coordinator = WriteCoordinator::new(
2657 config,
2658 vec!["default".to_string()],
2659 TestContext::default(),
2660 flusher.initial_snapshot().await,
2661 flusher.clone(),
2662 );
2663 let handle = coordinator.handle("default");
2664
2665 let _ = handle
2667 .try_write(TestWrite {
2668 key: "a".into(),
2669 value: 1,
2670 size: 10,
2671 })
2672 .await;
2673
2674 let result = handle
2676 .write_timeout(
2677 TestWrite {
2678 key: "retry_me".into(),
2679 value: 42,
2680 size: 10,
2681 },
2682 Duration::from_millis(10),
2683 )
2684 .await;
2685 let Err(err) = result else {
2686 panic!("expected TimeoutError");
2687 };
2688
2689 let write = err.into_inner().expect("should contain the write");
2691 assert_eq!(write.key, "retry_me");
2692 assert_eq!(write.value, 42);
2693 }
2694
2695 #[tokio::test]
2696 async fn should_return_write_in_backpressure_error() {
2697 let flusher = TestFlusher::default();
2699 let config = WriteCoordinatorConfig {
2700 queue_capacity: 1,
2701 flush_interval: Duration::from_secs(3600),
2702 flush_size_threshold: usize::MAX,
2703 };
2704 let mut coordinator = WriteCoordinator::new(
2705 config,
2706 vec!["default".to_string()],
2707 TestContext::default(),
2708 flusher.initial_snapshot().await,
2709 flusher.clone(),
2710 );
2711 let handle = coordinator.handle("default");
2712
2713 let _ = handle
2715 .try_write(TestWrite {
2716 key: "a".into(),
2717 value: 1,
2718 size: 10,
2719 })
2720 .await;
2721
2722 let result = handle
2724 .try_write(TestWrite {
2725 key: "retry_me".into(),
2726 value: 42,
2727 size: 10,
2728 })
2729 .await;
2730 let Err(err) = result else {
2731 panic!("expected Backpressure");
2732 };
2733
2734 let write = err.into_inner().expect("should contain the write");
2736 assert_eq!(write.key, "retry_me");
2737 assert_eq!(write.value, 42);
2738 }
2739
2740 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2741 async fn should_succeed_when_queue_drains_within_timeout() {
2742 let flusher = TestFlusher::default();
2744 let config = WriteCoordinatorConfig {
2745 queue_capacity: 2,
2746 flush_interval: Duration::from_secs(3600),
2747 flush_size_threshold: usize::MAX,
2748 };
2749 let mut coordinator = WriteCoordinator::new(
2750 config,
2751 vec!["default".to_string()],
2752 TestContext::default(),
2753 flusher.initial_snapshot().await,
2754 flusher.clone(),
2755 );
2756 let handle = coordinator.handle("default");
2757
2758 let _ = handle
2760 .try_write(TestWrite {
2761 key: "a".into(),
2762 value: 1,
2763 size: 10,
2764 })
2765 .await;
2766 let _ = handle
2767 .try_write(TestWrite {
2768 key: "b".into(),
2769 value: 2,
2770 size: 10,
2771 })
2772 .await;
2773
2774 coordinator.start();
2776 let result = handle
2777 .write_timeout(
2778 TestWrite {
2779 key: "c".into(),
2780 value: 3,
2781 size: 10,
2782 },
2783 Duration::from_secs(5),
2784 )
2785 .await;
2786
2787 assert!(result.is_ok());
2789
2790 coordinator.stop().await;
2792 }
2793
2794 #[tokio::test]
2795 async fn should_return_shutdown_on_write_timeout_after_coordinator_stops() {
2796 let flusher = TestFlusher::default();
2798 let mut coordinator = WriteCoordinator::new(
2799 test_config(),
2800 vec!["default".to_string()],
2801 TestContext::default(),
2802 flusher.initial_snapshot().await,
2803 flusher.clone(),
2804 );
2805 let handle = coordinator.handle("default");
2806 coordinator.start();
2807
2808 coordinator.stop().await;
2810 let result = handle
2811 .write_timeout(
2812 TestWrite {
2813 key: "a".into(),
2814 value: 1,
2815 size: 10,
2816 },
2817 Duration::from_secs(1),
2818 )
2819 .await;
2820
2821 assert!(matches!(result, Err(WriteError::Shutdown)));
2823 }
2824
2825 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2826 async fn should_pause_and_resume_write_channel() {
2827 let flusher = TestFlusher::default();
2829 let mut config = test_config();
2830 config.flush_size_threshold = usize::MAX;
2831 config.flush_interval = Duration::from_hours(24);
2832 let mut coordinator = WriteCoordinator::new(
2833 config,
2834 vec!["a", "b"],
2835 TestContext::default(),
2836 flusher.initial_snapshot().await,
2837 flusher.clone(),
2838 );
2839 let handle_a = coordinator.handle("a");
2840 let handle_b = coordinator.handle("b");
2841 let pause_handle = coordinator.pause_handle("a");
2842
2843 pause_handle.pause();
2852 coordinator.start();
2853
2854 let mut result_a = handle_a
2856 .try_write(TestWrite {
2857 key: "a".into(),
2858 value: 1,
2859 size: 1,
2860 })
2861 .await
2862 .unwrap();
2863 for i in 0..1000 {
2864 handle_b
2865 .try_write(TestWrite {
2866 key: format!("b{}", i),
2867 value: i,
2868 size: 1,
2869 })
2870 .await
2871 .unwrap()
2872 .wait(Durability::Applied)
2873 .await
2874 .unwrap();
2875 }
2876
2877 let data = coordinator.view().current.data.lock().unwrap().clone();
2880 let mut expected = (0..1000).map(|i| format!("b{}", i)).collect::<HashSet<_>>();
2881 assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2882 pause_handle.unpause();
2883 result_a.wait(Durability::Applied).await.unwrap();
2885 let data = coordinator.view().current.data.lock().unwrap().clone();
2886 expected.insert("a".into());
2887 assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2888 }
2889}