1#![allow(unused)]
2
3mod error;
4mod handle;
5mod traits;
6
7use std::collections::HashMap;
8use std::ops::Range;
9use std::ops::{Deref, DerefMut};
10
11pub use error::{WriteError, WriteResult};
12use futures::stream::{self, SelectAll, StreamExt};
13pub use handle::{View, WriteCoordinatorHandle, WriteHandle};
14pub use traits::{Delta, Durability, Flusher};
15
16enum FlushEvent<D: Delta> {
18 FlushDelta { frozen: EpochStamped<D::Frozen> },
20 FlushStorage,
22}
23
24use crate::StorageRead;
26use crate::coordinator::traits::EpochStamped;
27use crate::storage::StorageSnapshot;
28pub(crate) use handle::EpochWatcher;
29use std::sync::{Arc, Mutex};
30use std::time::Duration;
31use tokio::sync::{broadcast, mpsc, oneshot, watch};
32use tokio::time::{Instant, Interval, interval_at};
33use tokio_util::sync::CancellationToken;
34
/// Tunables for a write coordinator.
#[derive(Clone, Debug)]
pub struct WriteCoordinatorConfig {
    /// Capacity of each per-channel write command queue.
    pub queue_capacity: usize,
    /// Period of the timer that triggers a flush of the current delta.
    pub flush_interval: Duration,
    /// Estimated delta size at or above which a flush is triggered after a write.
    pub flush_size_threshold: usize,
}

impl Default for WriteCoordinatorConfig {
    /// 10,000 queued commands, a 10 second flush interval and a 64 MiB size threshold.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        WriteCoordinatorConfig {
            queue_capacity: 10_000,
            flush_interval: Duration::from_secs(10),
            flush_size_threshold: 64 * MIB,
        }
    }
}
55
56pub(crate) enum WriteCommand<D: Delta> {
57 Write {
58 write: D::Write,
59 result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
60 },
61 Flush {
62 epoch_tx: oneshot::Sender<handle::EpochResult<()>>,
63 flush_storage: bool,
64 },
65}
66
67pub struct WriteCoordinator<D: Delta, F: Flusher<D>> {
72 handles: HashMap<String, WriteCoordinatorHandle<D>>,
73 stop_tok: CancellationToken,
74 tasks: Option<(WriteCoordinatorTask<D>, FlushTask<D, F>)>,
75 write_task_jh: Option<tokio::task::JoinHandle<Result<(), String>>>,
76 view: Arc<BroadcastedView<D>>,
77}
78
79impl<D: Delta, F: Flusher<D>> WriteCoordinator<D, F> {
80 pub fn new(
81 config: WriteCoordinatorConfig,
82 channels: Vec<String>,
83 initial_context: D::Context,
84 initial_snapshot: Arc<dyn StorageSnapshot>,
85 flusher: F,
86 ) -> WriteCoordinator<D, F> {
87 let (watermarks, watcher) = EpochWatermarks::new();
88 let watermarks = Arc::new(watermarks);
89
90 let mut write_rxs = Vec::with_capacity(channels.len());
92 let mut handles = HashMap::new();
93 for name in channels {
94 let (write_tx, write_rx) = mpsc::channel(config.queue_capacity);
95 write_rxs.push(write_rx);
96 handles.insert(name, WriteCoordinatorHandle::new(write_tx, watcher.clone()));
97 }
98
99 let (flush_tx, flush_rx) = mpsc::channel(2);
105
106 let flush_stop_tok = CancellationToken::new();
107 let stop_tok = CancellationToken::new();
108 let write_task = WriteCoordinatorTask::new(
109 config,
110 initial_context,
111 initial_snapshot,
112 write_rxs,
113 flush_tx,
114 watermarks.clone(),
115 stop_tok.clone(),
116 flush_stop_tok.clone(),
117 );
118
119 let view = write_task.view.clone();
120
121 let flush_task = FlushTask {
122 flusher,
123 stop_tok: flush_stop_tok,
124 flush_rx,
125 watermarks: watermarks.clone(),
126 view: view.clone(),
127 last_flushed_epoch: 0,
128 };
129
130 Self {
131 handles,
132 tasks: Some((write_task, flush_task)),
133 write_task_jh: None,
134 stop_tok,
135 view,
136 }
137 }
138
139 pub fn handle(&self, name: &str) -> WriteCoordinatorHandle<D> {
140 self.handles
141 .get(name)
142 .expect("unknown channel name")
143 .clone()
144 }
145
146 pub fn start(&mut self) {
147 let Some((write_task, flush_task)) = self.tasks.take() else {
148 return;
150 };
151 let flush_task_jh = flush_task.run();
152 let write_task_jh = write_task.run(flush_task_jh);
153 self.write_task_jh = Some(write_task_jh);
154 }
155
156 pub async fn stop(mut self) -> Result<(), String> {
157 let Some(write_task_jh) = self.write_task_jh.take() else {
158 return Ok(());
159 };
160 self.stop_tok.cancel();
161 write_task_jh.await.map_err(|e| e.to_string())?
162 }
163
164 pub fn view(&self) -> Arc<View<D>> {
165 self.view.current()
166 }
167
168 pub fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
169 self.view.subscribe()
170 }
171}
172
173struct WriteCoordinatorTask<D: Delta> {
174 config: WriteCoordinatorConfig,
175 delta: CurrentDelta<D>,
176 flush_tx: mpsc::Sender<FlushEvent<D>>,
177 write_rxs: Vec<mpsc::Receiver<WriteCommand<D>>>,
178 watermarks: Arc<EpochWatermarks>,
179 view: Arc<BroadcastedView<D>>,
180 epoch: u64,
181 delta_start_epoch: u64,
182 flush_interval: Interval,
183 stop_tok: CancellationToken,
184 flush_stop_tok: CancellationToken,
185}
186
187impl<D: Delta> WriteCoordinatorTask<D> {
188 #[allow(clippy::too_many_arguments)]
192 pub fn new(
193 config: WriteCoordinatorConfig,
194 initial_context: D::Context,
195 initial_snapshot: Arc<dyn StorageSnapshot>,
196 write_rxs: Vec<mpsc::Receiver<WriteCommand<D>>>,
197 flush_tx: mpsc::Sender<FlushEvent<D>>,
198 watermarks: Arc<EpochWatermarks>,
199 stop_tok: CancellationToken,
200 flush_stop_tok: CancellationToken,
201 ) -> Self {
202 let delta = D::init(initial_context);
203
204 let initial_view = View {
205 current: delta.reader(),
206 frozen: vec![],
207 snapshot: initial_snapshot,
208 last_flushed_delta: None,
209 };
210 let initial_view = Arc::new(BroadcastedView::new(initial_view));
211
212 let flush_interval = interval_at(
213 Instant::now() + config.flush_interval,
214 config.flush_interval,
215 );
216 Self {
217 config,
218 delta: CurrentDelta::new(delta),
219 write_rxs,
220 flush_tx,
221 watermarks,
222 view: initial_view,
223 epoch: 1,
227 delta_start_epoch: 1,
228 flush_interval,
229 stop_tok,
230 flush_stop_tok,
231 }
232 }
233
234 pub fn run(
236 mut self,
237 flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
238 ) -> tokio::task::JoinHandle<Result<(), String>> {
239 tokio::task::spawn(async move { self.run_coordinator(flush_task_jh).await })
240 }
241
242 async fn run_coordinator(
243 mut self,
244 flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
245 ) -> Result<(), String> {
246 self.flush_interval.reset();
248
249 let mut write_stream: SelectAll<_> = SelectAll::new();
251 for rx in self.write_rxs.drain(..) {
252 write_stream.push(
253 stream::unfold(
254 rx,
255 |mut rx| async move { rx.recv().await.map(|cmd| (cmd, rx)) },
256 )
257 .boxed(),
258 );
259 }
260
261 loop {
262 tokio::select! {
263 cmd = write_stream.next() => {
264 match cmd {
265 Some(WriteCommand::Write { write, result_tx }) => {
266 self.handle_write(write, result_tx).await?;
267 }
268 Some(WriteCommand::Flush { epoch_tx, flush_storage }) => {
269 let _ = epoch_tx.send(Ok(handle::WriteApplied {
271 epoch: self.epoch.saturating_sub(1),
272 result: (),
273 }));
274 self.handle_flush(flush_storage).await;
275 }
276 None => {
277 break;
279 }
280 }
281 }
282
283 _ = self.flush_interval.tick() => {
284 self.handle_flush(false).await;
285 }
286
287 _ = self.stop_tok.cancelled() => {
288 break;
289 }
290 }
291 }
292
293 self.handle_flush(false).await;
295
296 self.flush_stop_tok.cancel();
298 flush_task_jh
300 .await
301 .map_err(|e| format!("flush task panicked: {}", e))?
302 .map_err(|e| format!("flush task error: {}", e))
303 }
304
305 async fn handle_write(
306 &mut self,
307 write: D::Write,
308 result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
309 ) -> Result<(), String> {
310 let write_epoch = self.epoch;
311 self.epoch += 1;
312
313 let result = self.delta.apply(write);
314 let _ = result_tx.send(
316 result
317 .map(|apply_result| handle::WriteApplied {
318 epoch: write_epoch,
319 result: apply_result,
320 })
321 .map_err(|e| handle::WriteFailed {
322 epoch: write_epoch,
323 error: e,
324 }),
325 );
326
327 self.watermarks.update_applied(write_epoch);
329
330 if self.delta.estimate_size() >= self.config.flush_size_threshold {
331 self.handle_flush(false).await;
332 }
333
334 Ok(())
335 }
336
337 async fn handle_flush(&mut self, flush_storage: bool) {
338 self.flush_if_delta_has_writes().await;
339 if flush_storage {
340 let _ = self.flush_tx.send(FlushEvent::FlushStorage).await;
341 }
342 }
343
344 async fn flush_if_delta_has_writes(&mut self) {
345 if self.epoch == self.delta_start_epoch {
346 return;
347 }
348
349 self.flush_interval.reset();
350
351 let epoch_range = self.delta_start_epoch..self.epoch;
352 self.delta_start_epoch = self.epoch;
353 let (frozen, frozen_reader) = self.delta.freeze_and_init();
354 let stamped_frozen = EpochStamped::new(frozen, epoch_range.clone());
355 let stamped_frozen_reader = EpochStamped::new(frozen_reader, epoch_range.clone());
356 let reader = self.delta.reader();
357 self.view.update_delta_frozen(stamped_frozen_reader, reader);
360 let _ = self
363 .flush_tx
364 .send(FlushEvent::FlushDelta {
365 frozen: stamped_frozen,
366 })
367 .await;
368 }
369}
370
371struct FlushTask<D: Delta, F: Flusher<D>> {
372 flusher: F,
373 stop_tok: CancellationToken,
374 flush_rx: mpsc::Receiver<FlushEvent<D>>,
375 watermarks: Arc<EpochWatermarks>,
376 view: Arc<BroadcastedView<D>>,
377 last_flushed_epoch: u64,
378}
379
380impl<D: Delta, F: Flusher<D>> FlushTask<D, F> {
381 fn run(mut self) -> tokio::task::JoinHandle<WriteResult<()>> {
382 tokio::spawn(async move {
383 loop {
384 tokio::select! {
385 event = self.flush_rx.recv() => {
386 let Some(event) = event else {
387 break;
388 };
389 self.handle_event(event).await?;
390 }
391 _ = self.stop_tok.cancelled() => {
392 break;
393 }
394 }
395 }
396 while let Ok(event) = self.flush_rx.try_recv() {
398 self.handle_event(event).await;
399 }
400 Ok(())
401 })
402 }
403
404 async fn handle_event(&mut self, event: FlushEvent<D>) -> WriteResult<()> {
405 match event {
406 FlushEvent::FlushDelta { frozen } => self.handle_flush(frozen).await,
407 FlushEvent::FlushStorage => {
408 self.flusher
409 .flush_storage()
410 .await
411 .map_err(|e| WriteError::FlushError(e.to_string()))?;
412 self.watermarks.update_durable(self.last_flushed_epoch);
413 Ok(())
414 }
415 }
416 }
417
418 async fn handle_flush(&mut self, frozen: EpochStamped<D::Frozen>) -> WriteResult<()> {
419 let delta = frozen.val;
420 let epoch_range = frozen.epoch_range;
421 let snapshot = self
422 .flusher
423 .flush_delta(delta, &epoch_range)
424 .await
425 .map_err(|e| WriteError::FlushError(e.to_string()))?;
426 self.last_flushed_epoch = epoch_range.end - 1;
427 self.watermarks.update_flushed(self.last_flushed_epoch);
428 self.view.update_flush_finished(snapshot, epoch_range);
429 Ok(())
430 }
431}
432
433struct CurrentDelta<D: Delta> {
434 delta: Option<D>,
435}
436
437impl<D: Delta> Deref for CurrentDelta<D> {
438 type Target = D;
439
440 fn deref(&self) -> &Self::Target {
441 match &self.delta {
442 Some(d) => d,
443 None => panic!("current delta not initialized"),
444 }
445 }
446}
447
448impl<D: Delta> DerefMut for CurrentDelta<D> {
449 fn deref_mut(&mut self) -> &mut Self::Target {
450 match &mut self.delta {
451 Some(d) => d,
452 None => panic!("current delta not initialized"),
453 }
454 }
455}
456
457impl<D: Delta> CurrentDelta<D> {
458 fn new(delta: D) -> Self {
459 Self { delta: Some(delta) }
460 }
461
462 fn freeze_and_init(&mut self) -> (D::Frozen, D::FrozenView) {
463 let Some(delta) = self.delta.take() else {
464 panic!("delta not initialized");
465 };
466 let (frozen, frozen_reader, context) = delta.freeze();
467 let new_delta = D::init(context);
468 self.delta = Some(new_delta);
469 (frozen, frozen_reader)
470 }
471}
472
473struct EpochWatermarks {
474 applied_tx: tokio::sync::watch::Sender<u64>,
475 flushed_tx: tokio::sync::watch::Sender<u64>,
476 durable_tx: tokio::sync::watch::Sender<u64>,
477}
478
479impl EpochWatermarks {
480 fn new() -> (Self, EpochWatcher) {
481 let (applied_tx, applied_rx) = tokio::sync::watch::channel(0);
482 let (flushed_tx, flushed_rx) = tokio::sync::watch::channel(0);
483 let (durable_tx, durable_rx) = tokio::sync::watch::channel(0);
484 let watcher = EpochWatcher {
485 applied_rx,
486 flushed_rx,
487 durable_rx,
488 };
489 let watermarks = EpochWatermarks {
490 applied_tx,
491 flushed_tx,
492 durable_tx,
493 };
494 (watermarks, watcher)
495 }
496
497 fn update_applied(&self, epoch: u64) {
498 let _ = self.applied_tx.send(epoch);
499 }
500
501 fn update_flushed(&self, epoch: u64) {
502 let _ = self.flushed_tx.send(epoch);
503 }
504
505 fn update_durable(&self, epoch: u64) {
506 let _ = self.durable_tx.send(epoch);
507 }
508}
509
510struct BroadcastedView<D: Delta> {
511 inner: Mutex<BroadcastedViewInner<D>>,
512}
513
514impl<D: Delta> BroadcastedView<D> {
515 fn new(initial_view: View<D>) -> Self {
516 let (view_tx, _) = broadcast::channel(16);
517 Self {
518 inner: Mutex::new(BroadcastedViewInner {
519 view: Arc::new(initial_view),
520 view_tx,
521 }),
522 }
523 }
524
525 fn update_flush_finished(&self, snapshot: Arc<dyn StorageSnapshot>, epoch_range: Range<u64>) {
526 self.inner
527 .lock()
528 .expect("lock poisoned")
529 .update_flush_finished(snapshot, epoch_range);
530 }
531
532 fn update_delta_frozen(&self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
533 self.inner
534 .lock()
535 .expect("lock poisoned")
536 .update_delta_frozen(frozen, reader);
537 }
538
539 fn current(&self) -> Arc<View<D>> {
540 self.inner.lock().expect("lock poisoned").current()
541 }
542
543 fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
544 self.inner.lock().expect("lock poisoned").subscribe()
545 }
546}
547
548struct BroadcastedViewInner<D: Delta> {
549 view: Arc<View<D>>,
550 view_tx: tokio::sync::broadcast::Sender<Arc<View<D>>>,
551}
552
553impl<D: Delta> BroadcastedViewInner<D> {
554 fn update_flush_finished(
555 &mut self,
556 snapshot: Arc<dyn StorageSnapshot>,
557 epoch_range: Range<u64>,
558 ) {
559 let mut new_frozen = self.view.frozen.clone();
560 let last = new_frozen
561 .pop()
562 .expect("frozen should not be empty when flush completes");
563 assert_eq!(last.epoch_range, epoch_range);
564 self.view = Arc::new(View {
565 current: self.view.current.clone(),
566 frozen: new_frozen,
567 snapshot,
568 last_flushed_delta: Some(last),
569 });
570 self.view_tx.send(self.view.clone());
571 }
572
573 fn update_delta_frozen(&mut self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
574 let mut new_frozen = vec![frozen];
576 new_frozen.extend(self.view.frozen.iter().cloned());
577 self.view = Arc::new(View {
578 current: reader,
579 frozen: new_frozen,
580 snapshot: self.view.snapshot.clone(),
581 last_flushed_delta: self.view.last_flushed_delta.clone(),
582 });
583 self.view_tx.send(self.view.clone());
584 }
585
586 fn current(&self) -> Arc<View<D>> {
587 self.view.clone()
588 }
589
590 fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
591 (self.view_tx.subscribe(), self.view.clone())
592 }
593}
594
595#[cfg(test)]
596mod tests {
597 use super::*;
598 use crate::BytesRange;
599 use crate::coordinator::Durability;
600 use crate::storage::in_memory::{InMemoryStorage, InMemoryStorageSnapshot};
601 use crate::storage::{Record, StorageSnapshot};
602 use crate::{Storage, StorageRead};
603 use async_trait::async_trait;
604 use bytes::Bytes;
605 use std::collections::HashMap;
606 use std::ops::Range;
607 use std::sync::Mutex;
/// A test write: stores `value` under `key` and adds `size` to the delta's size estimate.
#[derive(Debug, Clone)]
struct TestWrite {
    key: String,
    value: u64,
    size: usize,
}
618
/// Context carried from one test delta to the next.
#[derive(Debug, Clone, Default)]
struct TestContext {
    /// Sequence number given to the next applied write.
    next_seq: u64,
    /// When set, every `apply` fails with this message.
    error: Option<String>,
}
625
/// Read handle over a shared key/value map.
#[derive(Debug, Clone, Default)]
struct TestDeltaReader {
    data: Arc<Mutex<HashMap<String, u64>>>,
}

impl TestDeltaReader {
    /// Returns the value stored under `key`, if any.
    fn get(&self, key: &str) -> Option<u64> {
        let map = self.data.lock().unwrap();
        map.get(key).copied()
    }
}
637
638 #[derive(Debug)]
641 struct TestDelta {
642 context: TestContext,
643 writes: HashMap<String, (u64, u64)>,
644 key_values: Arc<Mutex<HashMap<String, u64>>>,
645 total_size: usize,
646 }
647
/// Frozen form of a `TestDelta`: the `(sequence number, value)` recorded per key.
#[derive(Debug, Clone)]
struct FrozenTestDelta {
    writes: HashMap<String, (u64, u64)>,
}
652
653 impl Delta for TestDelta {
654 type Context = TestContext;
655 type Write = TestWrite;
656 type DeltaView = TestDeltaReader;
657 type Frozen = FrozenTestDelta;
658 type FrozenView = Arc<HashMap<String, u64>>;
659 type ApplyResult = ();
660
661 fn init(context: Self::Context) -> Self {
662 Self {
663 context,
664 writes: HashMap::default(),
665 key_values: Arc::new(Mutex::new(HashMap::default())),
666 total_size: 0,
667 }
668 }
669
670 fn apply(&mut self, write: Self::Write) -> Result<(), String> {
671 if let Some(error) = &self.context.error {
672 return Err(error.clone());
673 }
674
675 let seq = self.context.next_seq;
676 self.context.next_seq += 1;
677
678 self.writes.insert(write.key.clone(), (seq, write.value));
679 self.total_size += write.size;
680 self.key_values
681 .lock()
682 .unwrap()
683 .insert(write.key, write.value);
684 Ok(())
685 }
686
687 fn estimate_size(&self) -> usize {
688 self.total_size
689 }
690
691 fn freeze(self) -> (Self::Frozen, Self::FrozenView, Self::Context) {
692 let frozen = FrozenTestDelta {
693 writes: self.writes,
694 };
695 let frozen_view = Arc::new(self.key_values.lock().unwrap().clone());
696 (frozen, frozen_view, self.context)
697 }
698
699 fn reader(&self) -> Self::DeltaView {
700 TestDeltaReader {
701 data: self.key_values.clone(),
702 }
703 }
704 }
705
706 #[derive(Default)]
708 struct TestFlusherState {
709 flushed_events: Vec<Arc<EpochStamped<FrozenTestDelta>>>,
710 flush_started_tx: Option<oneshot::Sender<()>>,
712 unblock_rx: Option<mpsc::Receiver<()>>,
714 }
715
716 #[derive(Clone)]
717 struct TestFlusher {
718 state: Arc<Mutex<TestFlusherState>>,
719 storage: Arc<InMemoryStorage>,
720 }
721
722 impl Default for TestFlusher {
723 fn default() -> Self {
724 Self {
725 state: Arc::new(Mutex::new(TestFlusherState::default())),
726 storage: Arc::new(InMemoryStorage::new()),
727 }
728 }
729 }
730
731 impl TestFlusher {
732 fn with_flush_control() -> (Self, oneshot::Receiver<()>, mpsc::Sender<()>) {
735 let (started_tx, started_rx) = oneshot::channel();
736 let (unblock_tx, unblock_rx) = mpsc::channel(1);
737 let flusher = Self {
738 state: Arc::new(Mutex::new(TestFlusherState {
739 flushed_events: Vec::new(),
740 flush_started_tx: Some(started_tx),
741 unblock_rx: Some(unblock_rx),
742 })),
743 storage: Arc::new(InMemoryStorage::new()),
744 };
745 (flusher, started_rx, unblock_tx)
746 }
747
748 fn flushed_events(&self) -> Vec<Arc<EpochStamped<FrozenTestDelta>>> {
749 self.state.lock().unwrap().flushed_events.clone()
750 }
751
752 async fn initial_snapshot(&self) -> Arc<dyn StorageSnapshot> {
753 self.storage.snapshot().await.unwrap()
754 }
755 }
756
757 #[async_trait]
758 impl Flusher<TestDelta> for TestFlusher {
759 async fn flush_delta(
760 &self,
761 frozen: FrozenTestDelta,
762 epoch_range: &Range<u64>,
763 ) -> Result<Arc<dyn StorageSnapshot>, String> {
764 let flush_started_tx = {
766 let mut state = self.state.lock().unwrap();
767 state.flush_started_tx.take()
768 };
769 if let Some(tx) = flush_started_tx {
770 let _ = tx.send(());
771 }
772
773 let unblock_rx = {
775 let mut state = self.state.lock().unwrap();
776 state.unblock_rx.take()
777 };
778 if let Some(mut rx) = unblock_rx {
779 rx.recv().await;
780 }
781
782 let records: Vec<Record> = frozen
784 .writes
785 .iter()
786 .map(|(key, (seq, value))| {
787 let mut buf = Vec::with_capacity(16);
788 buf.extend_from_slice(&seq.to_le_bytes());
789 buf.extend_from_slice(&value.to_le_bytes());
790 Record::new(Bytes::from(key.clone()), Bytes::from(buf))
791 })
792 .collect();
793 self.storage
794 .put(records)
795 .await
796 .map_err(|e| format!("{}", e))?;
797
798 {
800 let mut state = self.state.lock().unwrap();
801 state
802 .flushed_events
803 .push(Arc::new(EpochStamped::new(frozen, epoch_range.clone())));
804 }
805
806 self.storage.snapshot().await.map_err(|e| format!("{}", e))
807 }
808
809 async fn flush_storage(&self) -> Result<(), String> {
810 let flush_started_tx = {
812 let mut state = self.state.lock().unwrap();
813 state.flush_started_tx.take()
814 };
815 if let Some(tx) = flush_started_tx {
816 let _ = tx.send(());
817 }
818
819 let unblock_rx = {
821 let mut state = self.state.lock().unwrap();
822 state.unblock_rx.take()
823 };
824 if let Some(mut rx) = unblock_rx {
825 rx.recv().await;
826 }
827
828 Ok(())
829 }
830 }
831
832 fn test_config() -> WriteCoordinatorConfig {
833 WriteCoordinatorConfig {
834 queue_capacity: 100,
835 flush_interval: Duration::from_secs(3600), flush_size_threshold: usize::MAX,
837 }
838 }
839
840 async fn assert_snapshot_has_rows(
841 snapshot: &Arc<dyn StorageSnapshot>,
842 expected: &[(&str, u64, u64)],
843 ) {
844 let records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
845 assert_eq!(
846 records.len(),
847 expected.len(),
848 "expected {} rows but snapshot has {}",
849 expected.len(),
850 records.len()
851 );
852 let mut actual: Vec<(String, u64, u64)> = records
853 .iter()
854 .map(|r| {
855 let key = String::from_utf8(r.key.to_vec()).unwrap();
856 let seq = u64::from_le_bytes(r.value[0..8].try_into().unwrap());
857 let value = u64::from_le_bytes(r.value[8..16].try_into().unwrap());
858 (key, seq, value)
859 })
860 .collect();
861 actual.sort_by(|a, b| a.0.cmp(&b.0));
862 let mut expected: Vec<(&str, u64, u64)> = expected.to_vec();
863 expected.sort_by(|a, b| a.0.cmp(b.0));
864 for (actual, expected) in actual.iter().zip(expected.iter()) {
865 assert_eq!(
866 actual.0, expected.0,
867 "key mismatch: got {:?}, expected {:?}",
868 actual.0, expected.0
869 );
870 assert_eq!(
871 actual.1, expected.1,
872 "seq mismatch for key {:?}: got {}, expected {}",
873 actual.0, actual.1, expected.1
874 );
875 assert_eq!(
876 actual.2, expected.2,
877 "value mismatch for key {:?}: got {}, expected {}",
878 actual.0, actual.2, expected.2
879 );
880 }
881 }
882
883 #[tokio::test]
888 async fn should_assign_monotonic_epochs() {
889 let flusher = TestFlusher::default();
891 let mut coordinator = WriteCoordinator::new(
892 test_config(),
893 vec!["default".to_string()],
894 TestContext::default(),
895 flusher.initial_snapshot().await,
896 flusher,
897 );
898 let handle = coordinator.handle("default");
899 coordinator.start();
900
901 let write1 = handle
903 .write(TestWrite {
904 key: "a".into(),
905 value: 1,
906 size: 10,
907 })
908 .await
909 .unwrap();
910 let write2 = handle
911 .write(TestWrite {
912 key: "b".into(),
913 value: 2,
914 size: 10,
915 })
916 .await
917 .unwrap();
918 let write3 = handle
919 .write(TestWrite {
920 key: "c".into(),
921 value: 3,
922 size: 10,
923 })
924 .await
925 .unwrap();
926
927 let epoch1 = write1.epoch().await.unwrap();
928 let epoch2 = write2.epoch().await.unwrap();
929 let epoch3 = write3.epoch().await.unwrap();
930
931 assert!(epoch1 < epoch2);
933 assert!(epoch2 < epoch3);
934
935 coordinator.stop().await;
937 }
938
939 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
940 async fn should_apply_writes_in_order() {
941 let flusher = TestFlusher::default();
943 let mut coordinator = WriteCoordinator::new(
944 test_config(),
945 vec!["default".to_string()],
946 TestContext::default(),
947 flusher.initial_snapshot().await,
948 flusher.clone(),
949 );
950 let handle = coordinator.handle("default");
951 coordinator.start();
952
953 handle
955 .write(TestWrite {
956 key: "a".into(),
957 value: 1,
958 size: 10,
959 })
960 .await
961 .unwrap();
962 handle
963 .write(TestWrite {
964 key: "a".into(),
965 value: 2,
966 size: 10,
967 })
968 .await
969 .unwrap();
970 let mut last_write = handle
971 .write(TestWrite {
972 key: "a".into(),
973 value: 3,
974 size: 10,
975 })
976 .await
977 .unwrap();
978
979 handle.flush(false).await.unwrap();
980 last_write.wait(Durability::Flushed).await.unwrap();
982
983 let events = flusher.flushed_events();
985 assert_eq!(events.len(), 1);
986 let frozen_delta = &events[0];
987 let delta = &frozen_delta.val;
988 let (seq, value) = delta.writes.get("a").unwrap();
990 assert_eq!(*value, 3);
991 assert_eq!(*seq, 2);
992
993 coordinator.stop().await;
995 }
996
997 #[tokio::test]
998 async fn should_update_applied_watermark_after_each_write() {
999 let flusher = TestFlusher::default();
1001 let mut coordinator = WriteCoordinator::new(
1002 test_config(),
1003 vec!["default".to_string()],
1004 TestContext::default(),
1005 flusher.initial_snapshot().await,
1006 flusher,
1007 );
1008 let handle = coordinator.handle("default");
1009 coordinator.start();
1010
1011 let mut write_handle = handle
1013 .write(TestWrite {
1014 key: "a".into(),
1015 value: 1,
1016 size: 10,
1017 })
1018 .await
1019 .unwrap();
1020
1021 let result = write_handle.wait(Durability::Applied).await;
1023 assert!(result.is_ok());
1024
1025 coordinator.stop().await;
1027 }
1028
1029 #[tokio::test]
1030 async fn should_propagate_apply_error_to_handle() {
1031 let flusher = TestFlusher::default();
1033 let context = TestContext {
1034 error: Some("apply error".to_string()),
1035 ..Default::default()
1036 };
1037 let mut coordinator = WriteCoordinator::new(
1038 test_config(),
1039 vec!["default".to_string()],
1040 context,
1041 flusher.initial_snapshot().await,
1042 flusher,
1043 );
1044 let handle = coordinator.handle("default");
1045 coordinator.start();
1046
1047 let write = handle
1049 .write(TestWrite {
1050 key: "a".into(),
1051 value: 1,
1052 size: 10,
1053 })
1054 .await
1055 .unwrap();
1056
1057 let result = write.epoch().await;
1058
1059 assert!(
1061 matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
1062 );
1063
1064 coordinator.stop().await;
1066 }
1067
1068 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1073 async fn should_flush_on_command() {
1074 let flusher = TestFlusher::default();
1076 let mut coordinator = WriteCoordinator::new(
1077 test_config(),
1078 vec!["default".to_string()],
1079 TestContext::default(),
1080 flusher.initial_snapshot().await,
1081 flusher.clone(),
1082 );
1083 let handle = coordinator.handle("default");
1084 coordinator.start();
1085
1086 let mut write = handle
1088 .write(TestWrite {
1089 key: "a".into(),
1090 value: 1,
1091 size: 10,
1092 })
1093 .await
1094 .unwrap();
1095 handle.flush(false).await.unwrap();
1096 write.wait(Durability::Flushed).await.unwrap();
1097
1098 assert_eq!(flusher.flushed_events().len(), 1);
1100
1101 coordinator.stop().await;
1103 }
1104
1105 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1106 async fn should_wait_on_flush_handle() {
1107 let flusher = TestFlusher::default();
1109 let mut coordinator = WriteCoordinator::new(
1110 test_config(),
1111 vec!["default".to_string()],
1112 TestContext::default(),
1113 flusher.initial_snapshot().await,
1114 flusher.clone(),
1115 );
1116 let handle = coordinator.handle("default");
1117 coordinator.start();
1118
1119 handle
1121 .write(TestWrite {
1122 key: "a".into(),
1123 value: 1,
1124 size: 10,
1125 })
1126 .await
1127 .unwrap();
1128 let mut flush_handle = handle.flush(false).await.unwrap();
1129
1130 flush_handle.wait(Durability::Flushed).await.unwrap();
1132 assert_eq!(flusher.flushed_events().len(), 1);
1133
1134 coordinator.stop().await;
1136 }
1137
1138 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1139 async fn should_return_correct_epoch_from_flush_handle() {
1140 let flusher = TestFlusher::default();
1142 let mut coordinator = WriteCoordinator::new(
1143 test_config(),
1144 vec!["default".to_string()],
1145 TestContext::default(),
1146 flusher.initial_snapshot().await,
1147 flusher,
1148 );
1149 let handle = coordinator.handle("default");
1150 coordinator.start();
1151
1152 let write1 = handle
1154 .write(TestWrite {
1155 key: "a".into(),
1156 value: 1,
1157 size: 10,
1158 })
1159 .await
1160 .unwrap();
1161 let write2 = handle
1162 .write(TestWrite {
1163 key: "b".into(),
1164 value: 2,
1165 size: 10,
1166 })
1167 .await
1168 .unwrap();
1169 let flush_handle = handle.flush(false).await.unwrap();
1170
1171 let flush_epoch = flush_handle.epoch().await.unwrap();
1173 let write2_epoch = write2.epoch().await.unwrap();
1174 assert_eq!(flush_epoch, write2_epoch);
1175
1176 coordinator.stop().await;
1178 }
1179
1180 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1181 async fn should_include_all_pending_writes_in_flush() {
1182 let flusher = TestFlusher::default();
1184 let mut coordinator = WriteCoordinator::new(
1185 test_config(),
1186 vec!["default".to_string()],
1187 TestContext::default(),
1188 flusher.initial_snapshot().await,
1189 flusher.clone(),
1190 );
1191 let handle = coordinator.handle("default");
1192 coordinator.start();
1193
1194 handle
1196 .write(TestWrite {
1197 key: "a".into(),
1198 value: 1,
1199 size: 10,
1200 })
1201 .await
1202 .unwrap();
1203 handle
1204 .write(TestWrite {
1205 key: "b".into(),
1206 value: 2,
1207 size: 10,
1208 })
1209 .await
1210 .unwrap();
1211 let mut last_write = handle
1212 .write(TestWrite {
1213 key: "c".into(),
1214 value: 3,
1215 size: 10,
1216 })
1217 .await
1218 .unwrap();
1219
1220 handle.flush(false).await.unwrap();
1221 last_write.wait(Durability::Flushed).await.unwrap();
1222
1223 let events = flusher.flushed_events();
1225 assert_eq!(events.len(), 1);
1226 let frozen_delta = &events[0];
1227 assert_eq!(frozen_delta.val.writes.len(), 3);
1228 let snapshot = flusher.storage.snapshot().await.unwrap();
1229 assert_snapshot_has_rows(&snapshot, &[("a", 0, 1), ("b", 1, 2), ("c", 2, 3)]).await;
1230
1231 coordinator.stop().await;
1233 }
1234
1235 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1236 async fn should_skip_flush_when_no_new_writes() {
1237 let flusher = TestFlusher::default();
1239 let mut coordinator = WriteCoordinator::new(
1240 test_config(),
1241 vec!["default".to_string()],
1242 TestContext::default(),
1243 flusher.initial_snapshot().await,
1244 flusher.clone(),
1245 );
1246 let handle = coordinator.handle("default");
1247 coordinator.start();
1248
1249 let mut write = handle
1251 .write(TestWrite {
1252 key: "a".into(),
1253 value: 1,
1254 size: 10,
1255 })
1256 .await
1257 .unwrap();
1258 handle.flush(false).await.unwrap();
1259 write.wait(Durability::Flushed).await.unwrap();
1260
1261 handle.flush(false).await.unwrap();
1263
1264 let sync_write = handle
1267 .write(TestWrite {
1268 key: "sync".into(),
1269 value: 0,
1270 size: 1,
1271 })
1272 .await
1273 .unwrap();
1274 sync_write.epoch().await.unwrap();
1275
1276 assert_eq!(flusher.flushed_events().len(), 1);
1278
1279 coordinator.stop().await;
1281 }
1282
1283 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1284 async fn should_update_flushed_watermark_after_flush() {
1285 let flusher = TestFlusher::default();
1287 let mut coordinator = WriteCoordinator::new(
1288 test_config(),
1289 vec!["default".to_string()],
1290 TestContext::default(),
1291 flusher.initial_snapshot().await,
1292 flusher,
1293 );
1294 let handle = coordinator.handle("default");
1295 coordinator.start();
1296
1297 let mut write_handle = handle
1299 .write(TestWrite {
1300 key: "a".into(),
1301 value: 1,
1302 size: 10,
1303 })
1304 .await
1305 .unwrap();
1306
1307 handle.flush(false).await.unwrap();
1308
1309 let result = write_handle.wait(Durability::Flushed).await;
1311 assert!(result.is_ok());
1312
1313 coordinator.stop().await;
1315 }
1316
1317 #[tokio::test(start_paused = true)]
1322 async fn should_flush_on_flush_interval() {
1323 let flusher = TestFlusher::default();
1325 let config = WriteCoordinatorConfig {
1326 queue_capacity: 100,
1327 flush_interval: Duration::from_millis(100),
1328 flush_size_threshold: usize::MAX,
1329 };
1330 let mut coordinator = WriteCoordinator::new(
1331 config,
1332 vec!["default".to_string()],
1333 TestContext::default(),
1334 flusher.initial_snapshot().await,
1335 flusher.clone(),
1336 );
1337 let handle = coordinator.handle("default");
1338 coordinator.start();
1339
1340 tokio::task::yield_now().await;
1342 let mut write = handle
1343 .write(TestWrite {
1344 key: "a".into(),
1345 value: 1,
1346 size: 10,
1347 })
1348 .await
1349 .unwrap();
1350 write.wait(Durability::Applied).await.unwrap();
1351
1352 assert_eq!(flusher.flushed_events().len(), 0);
1354
1355 tokio::time::advance(Duration::from_millis(150)).await;
1357 tokio::task::yield_now().await;
1358
1359 assert_eq!(flusher.flushed_events().len(), 1);
1361 let snapshot = flusher.storage.snapshot().await.unwrap();
1362 assert_snapshot_has_rows(&snapshot, &[("a", 0, 1)]).await;
1363
1364 coordinator.stop().await;
1366 }
1367
1368 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1373 async fn should_flush_when_size_threshold_exceeded() {
1374 let flusher = TestFlusher::default();
1376 let config = WriteCoordinatorConfig {
1377 queue_capacity: 100,
1378 flush_interval: Duration::from_secs(3600),
1379 flush_size_threshold: 100, };
1381 let mut coordinator = WriteCoordinator::new(
1382 config,
1383 vec!["default".to_string()],
1384 TestContext::default(),
1385 flusher.initial_snapshot().await,
1386 flusher.clone(),
1387 );
1388 let handle = coordinator.handle("default");
1389 coordinator.start();
1390
1391 let mut write = handle
1393 .write(TestWrite {
1394 key: "a".into(),
1395 value: 1,
1396 size: 150,
1397 })
1398 .await
1399 .unwrap();
1400 write.wait(Durability::Flushed).await.unwrap();
1401
1402 assert_eq!(flusher.flushed_events().len(), 1);
1404
1405 coordinator.stop().await;
1407 }
1408
1409 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1410 async fn should_accumulate_until_threshold() {
1411 let flusher = TestFlusher::default();
1413 let config = WriteCoordinatorConfig {
1414 queue_capacity: 100,
1415 flush_interval: Duration::from_secs(3600),
1416 flush_size_threshold: 100,
1417 };
1418 let mut coordinator = WriteCoordinator::new(
1419 config,
1420 vec!["default".to_string()],
1421 TestContext::default(),
1422 flusher.initial_snapshot().await,
1423 flusher.clone(),
1424 );
1425 let handle = coordinator.handle("default");
1426 coordinator.start();
1427
1428 for i in 0..5 {
1430 let mut w = handle
1431 .write(TestWrite {
1432 key: format!("key{}", i),
1433 value: i,
1434 size: 15,
1435 })
1436 .await
1437 .unwrap();
1438 w.wait(Durability::Applied).await.unwrap();
1439 }
1440
1441 assert_eq!(flusher.flushed_events().len(), 0);
1443
1444 let mut final_write = handle
1446 .write(TestWrite {
1447 key: "final".into(),
1448 value: 999,
1449 size: 30,
1450 })
1451 .await
1452 .unwrap();
1453 final_write.wait(Durability::Flushed).await.unwrap();
1454
1455 assert_eq!(flusher.flushed_events().len(), 1);
1457
1458 coordinator.stop().await;
1460 }
1461
1462 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1467 async fn should_accept_writes_during_flush() {
1468 let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1470 let mut coordinator = WriteCoordinator::new(
1471 test_config(),
1472 vec!["default".to_string()],
1473 TestContext::default(),
1474 flusher.initial_snapshot().await,
1475 flusher.clone(),
1476 );
1477 let handle = coordinator.handle("default");
1478 coordinator.start();
1479
1480 let write1 = handle
1482 .write(TestWrite {
1483 key: "a".into(),
1484 value: 1,
1485 size: 10,
1486 })
1487 .await
1488 .unwrap();
1489 handle.flush(false).await.unwrap();
1490 flush_started_rx.await.unwrap(); let write2 = handle
1494 .write(TestWrite {
1495 key: "b".into(),
1496 value: 2,
1497 size: 10,
1498 })
1499 .await
1500 .unwrap();
1501 assert!(write2.epoch().await.unwrap() > write1.epoch().await.unwrap());
1502
1503 unblock_tx.send(()).await.unwrap();
1505 coordinator.stop().await;
1506 }
1507
1508 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1509 async fn should_assign_new_epochs_during_flush() {
1510 let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1512 let mut coordinator = WriteCoordinator::new(
1513 test_config(),
1514 vec!["default".to_string()],
1515 TestContext::default(),
1516 flusher.initial_snapshot().await,
1517 flusher.clone(),
1518 );
1519 let handle = coordinator.handle("default");
1520 coordinator.start();
1521
1522 handle
1524 .write(TestWrite {
1525 key: "a".into(),
1526 value: 1,
1527 size: 10,
1528 })
1529 .await
1530 .unwrap();
1531 handle.flush(false).await.unwrap();
1532 flush_started_rx.await.unwrap(); let w1 = handle
1536 .write(TestWrite {
1537 key: "b".into(),
1538 value: 2,
1539 size: 10,
1540 })
1541 .await
1542 .unwrap();
1543 let w2 = handle
1544 .write(TestWrite {
1545 key: "c".into(),
1546 value: 3,
1547 size: 10,
1548 })
1549 .await
1550 .unwrap();
1551
1552 let e1 = w1.epoch().await.unwrap();
1554 let e2 = w2.epoch().await.unwrap();
1555 assert!(e1 < e2);
1556
1557 unblock_tx.send(()).await.unwrap();
1559 coordinator.stop().await;
1560 }
1561
1562 #[tokio::test]
1567 async fn should_return_backpressure_when_queue_full() {
1568 let flusher = TestFlusher::default();
1570 let config = WriteCoordinatorConfig {
1571 queue_capacity: 2,
1572 flush_interval: Duration::from_secs(3600),
1573 flush_size_threshold: usize::MAX,
1574 };
1575 let mut coordinator = WriteCoordinator::new(
1576 config,
1577 vec!["default".to_string()],
1578 TestContext::default(),
1579 flusher.initial_snapshot().await,
1580 flusher.clone(),
1581 );
1582 let handle = coordinator.handle("default");
1583 let _ = handle
1587 .write(TestWrite {
1588 key: "a".into(),
1589 value: 1,
1590 size: 10,
1591 })
1592 .await;
1593 let _ = handle
1594 .write(TestWrite {
1595 key: "b".into(),
1596 value: 2,
1597 size: 10,
1598 })
1599 .await;
1600
1601 let result = handle
1603 .write(TestWrite {
1604 key: "c".into(),
1605 value: 3,
1606 size: 10,
1607 })
1608 .await;
1609
1610 assert!(matches!(result, Err(WriteError::Backpressure)));
1612 }
1613
1614 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1615 async fn should_accept_writes_after_queue_drains() {
1616 let flusher = TestFlusher::default();
1618 let config = WriteCoordinatorConfig {
1619 queue_capacity: 2,
1620 flush_interval: Duration::from_secs(3600),
1621 flush_size_threshold: usize::MAX,
1622 };
1623 let mut coordinator = WriteCoordinator::new(
1624 config,
1625 vec!["default".to_string()],
1626 TestContext::default(),
1627 flusher.initial_snapshot().await,
1628 flusher.clone(),
1629 );
1630 let handle = coordinator.handle("default");
1631
1632 let _ = handle
1634 .write(TestWrite {
1635 key: "a".into(),
1636 value: 1,
1637 size: 10,
1638 })
1639 .await;
1640 let mut write_b = handle
1641 .write(TestWrite {
1642 key: "b".into(),
1643 value: 2,
1644 size: 10,
1645 })
1646 .await
1647 .unwrap();
1648
1649 coordinator.start();
1651 write_b.wait(Durability::Applied).await.unwrap();
1652
1653 let result = handle
1655 .write(TestWrite {
1656 key: "c".into(),
1657 value: 3,
1658 size: 10,
1659 })
1660 .await;
1661 assert!(result.is_ok());
1662
1663 coordinator.stop().await;
1665 }
1666
1667 #[tokio::test]
1672 async fn should_shutdown_cleanly_when_stop_called() {
1673 let flusher = TestFlusher::default();
1675 let mut coordinator = WriteCoordinator::new(
1676 test_config(),
1677 vec!["default".to_string()],
1678 TestContext::default(),
1679 flusher.initial_snapshot().await,
1680 flusher.clone(),
1681 );
1682 let handle = coordinator.handle("default");
1683 coordinator.start();
1684
1685 let result = coordinator.stop().await;
1687
1688 assert!(result.is_ok());
1690 }
1691
1692 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1693 async fn should_flush_pending_writes_on_shutdown() {
1694 let flusher = TestFlusher::default();
1696 let config = WriteCoordinatorConfig {
1697 queue_capacity: 100,
1698 flush_interval: Duration::from_secs(3600), flush_size_threshold: usize::MAX, };
1701 let mut coordinator = WriteCoordinator::new(
1702 config,
1703 vec!["default".to_string()],
1704 TestContext::default(),
1705 flusher.initial_snapshot().await,
1706 flusher.clone(),
1707 );
1708 let handle = coordinator.handle("default");
1709 coordinator.start();
1710
1711 let write = handle
1713 .write(TestWrite {
1714 key: "a".into(),
1715 value: 1,
1716 size: 10,
1717 })
1718 .await
1719 .unwrap();
1720 let epoch = write.epoch().await.unwrap();
1721
1722 coordinator.stop().await;
1724
1725 let events = flusher.flushed_events();
1727 assert_eq!(events.len(), 1);
1728 let epoch_range = &events[0].epoch_range;
1729 assert!(epoch_range.contains(&epoch));
1730 }
1731
1732 #[tokio::test]
1733 async fn should_return_shutdown_error_after_coordinator_stops() {
1734 let flusher = TestFlusher::default();
1736 let mut coordinator = WriteCoordinator::new(
1737 test_config(),
1738 vec!["default".to_string()],
1739 TestContext::default(),
1740 flusher.initial_snapshot().await,
1741 flusher.clone(),
1742 );
1743 let handle = coordinator.handle("default");
1744 coordinator.start();
1745
1746 coordinator.stop().await;
1748
1749 let result = handle
1751 .write(TestWrite {
1752 key: "a".into(),
1753 value: 1,
1754 size: 10,
1755 })
1756 .await;
1757
1758 assert!(matches!(result, Err(WriteError::Shutdown)));
1760 }
1761
1762 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1767 async fn should_track_epoch_range_in_flush_event() {
1768 let flusher = TestFlusher::default();
1770 let mut coordinator = WriteCoordinator::new(
1771 test_config(),
1772 vec!["default".to_string()],
1773 TestContext::default(),
1774 flusher.initial_snapshot().await,
1775 flusher.clone(),
1776 );
1777 let handle = coordinator.handle("default");
1778 coordinator.start();
1779
1780 handle
1782 .write(TestWrite {
1783 key: "a".into(),
1784 value: 1,
1785 size: 10,
1786 })
1787 .await
1788 .unwrap();
1789 handle
1790 .write(TestWrite {
1791 key: "b".into(),
1792 value: 2,
1793 size: 10,
1794 })
1795 .await
1796 .unwrap();
1797 let mut last_write = handle
1798 .write(TestWrite {
1799 key: "c".into(),
1800 value: 3,
1801 size: 10,
1802 })
1803 .await
1804 .unwrap();
1805
1806 handle.flush(false).await.unwrap();
1807 last_write.wait(Durability::Flushed).await.unwrap();
1808
1809 let events = flusher.flushed_events();
1811 assert_eq!(events.len(), 1);
1812 let epoch_range = &events[0].epoch_range;
1813 assert_eq!(epoch_range.start, 1);
1814 assert_eq!(epoch_range.end, 4); coordinator.stop().await;
1818 }
1819
1820 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1821 async fn should_have_contiguous_epoch_ranges() {
1822 let flusher = TestFlusher::default();
1824 let mut coordinator = WriteCoordinator::new(
1825 test_config(),
1826 vec!["default".to_string()],
1827 TestContext::default(),
1828 flusher.initial_snapshot().await,
1829 flusher.clone(),
1830 );
1831 let handle = coordinator.handle("default");
1832 coordinator.start();
1833
1834 handle
1836 .write(TestWrite {
1837 key: "a".into(),
1838 value: 1,
1839 size: 10,
1840 })
1841 .await
1842 .unwrap();
1843 let mut write2 = handle
1844 .write(TestWrite {
1845 key: "b".into(),
1846 value: 2,
1847 size: 10,
1848 })
1849 .await
1850 .unwrap();
1851 handle.flush(false).await.unwrap();
1852 write2.wait(Durability::Flushed).await.unwrap();
1853
1854 let mut write3 = handle
1856 .write(TestWrite {
1857 key: "c".into(),
1858 value: 3,
1859 size: 10,
1860 })
1861 .await
1862 .unwrap();
1863 handle.flush(false).await.unwrap();
1864 write3.wait(Durability::Flushed).await.unwrap();
1865
1866 let events = flusher.flushed_events();
1868 assert_eq!(events.len(), 2);
1869
1870 let range1 = &events[0].epoch_range;
1871 let range2 = &events[1].epoch_range;
1872
1873 assert_eq!(range1.end, range2.start);
1875 assert_eq!(range1, &(1..3));
1876 assert_eq!(range2, &(3..4));
1877
1878 coordinator.stop().await;
1880 }
1881
1882 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1883 async fn should_include_exact_epochs_in_range() {
1884 let flusher = TestFlusher::default();
1886 let mut coordinator = WriteCoordinator::new(
1887 test_config(),
1888 vec!["default".to_string()],
1889 TestContext::default(),
1890 flusher.initial_snapshot().await,
1891 flusher.clone(),
1892 );
1893 let handle = coordinator.handle("default");
1894 coordinator.start();
1895
1896 let write1 = handle
1898 .write(TestWrite {
1899 key: "a".into(),
1900 value: 1,
1901 size: 10,
1902 })
1903 .await
1904 .unwrap();
1905 let epoch1 = write1.epoch().await.unwrap();
1906
1907 let mut write2 = handle
1908 .write(TestWrite {
1909 key: "b".into(),
1910 value: 2,
1911 size: 10,
1912 })
1913 .await
1914 .unwrap();
1915 let epoch2 = write2.epoch().await.unwrap();
1916
1917 handle.flush(false).await.unwrap();
1918 write2.wait(Durability::Flushed).await.unwrap();
1919
1920 let events = flusher.flushed_events();
1922 assert_eq!(events.len(), 1);
1923 let epoch_range = &events[0].epoch_range;
1924
1925 assert_eq!(epoch_range.start, epoch1);
1927 assert_eq!(epoch_range.end, epoch2 + 1);
1929 assert!(epoch_range.contains(&epoch1));
1931 assert!(epoch_range.contains(&epoch2));
1932
1933 coordinator.stop().await;
1935 }
1936
1937 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1942 async fn should_preserve_context_across_flushes() {
1943 let flusher = TestFlusher::default();
1945 let mut coordinator = WriteCoordinator::new(
1946 test_config(),
1947 vec!["default".to_string()],
1948 TestContext::default(),
1949 flusher.initial_snapshot().await,
1950 flusher.clone(),
1951 );
1952 let handle = coordinator.handle("default");
1953 coordinator.start();
1954
1955 let mut write1 = handle
1957 .write(TestWrite {
1958 key: "a".into(),
1959 value: 1,
1960 size: 10,
1961 })
1962 .await
1963 .unwrap();
1964 handle.flush(false).await.unwrap();
1965 write1.wait(Durability::Flushed).await.unwrap();
1966
1967 let mut write2 = handle
1969 .write(TestWrite {
1970 key: "a".into(),
1971 value: 2,
1972 size: 10,
1973 })
1974 .await
1975 .unwrap();
1976 handle.flush(false).await.unwrap();
1977 write2.wait(Durability::Flushed).await.unwrap();
1978
1979 let events = flusher.flushed_events();
1981 assert_eq!(events.len(), 2);
1982
1983 let (seq1, _) = events[0].val.writes.get("a").unwrap();
1985 assert_eq!(*seq1, 0);
1986
1987 let (seq2, _) = events[1].val.writes.get("a").unwrap();
1989 assert_eq!(*seq2, 1);
1990
1991 coordinator.stop().await;
1993 }
1994
1995 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2000 async fn should_receive_view_on_subscribe() {
2001 let flusher = TestFlusher::default();
2003 let mut coordinator = WriteCoordinator::new(
2004 test_config(),
2005 vec!["default".to_string()],
2006 TestContext::default(),
2007 flusher.initial_snapshot().await,
2008 flusher.clone(),
2009 );
2010 let handle = coordinator.handle("default");
2011 let (mut subscriber, _) = coordinator.subscribe();
2012 coordinator.start();
2013
2014 handle
2016 .write(TestWrite {
2017 key: "a".into(),
2018 value: 1,
2019 size: 10,
2020 })
2021 .await
2022 .unwrap();
2023 handle.flush(false).await.unwrap();
2024
2025 let result = subscriber.recv().await;
2027 assert!(result.is_ok());
2028
2029 coordinator.stop().await;
2031 }
2032
2033 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2034 async fn should_include_snapshot_in_view_after_flush() {
2035 let flusher = TestFlusher::default();
2037 let mut coordinator = WriteCoordinator::new(
2038 test_config(),
2039 vec!["default".to_string()],
2040 TestContext::default(),
2041 flusher.initial_snapshot().await,
2042 flusher.clone(),
2043 );
2044 let handle = coordinator.handle("default");
2045 let (mut subscriber, _) = coordinator.subscribe();
2046 coordinator.start();
2047
2048 handle
2050 .write(TestWrite {
2051 key: "a".into(),
2052 value: 1,
2053 size: 10,
2054 })
2055 .await
2056 .unwrap();
2057 handle.flush(false).await.unwrap();
2058
2059 let _ = subscriber.recv().await.unwrap();
2061 let result = subscriber.recv().await.unwrap();
2063
2064 assert_snapshot_has_rows(&result.snapshot, &[("a", 0, 1)]).await;
2066
2067 coordinator.stop().await;
2069 }
2070
2071 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2072 async fn should_include_delta_in_view_after_flush() {
2073 let flusher = TestFlusher::default();
2075 let mut coordinator = WriteCoordinator::new(
2076 test_config(),
2077 vec!["default".to_string()],
2078 TestContext::default(),
2079 flusher.initial_snapshot().await,
2080 flusher.clone(),
2081 );
2082 let handle = coordinator.handle("default");
2083 let (mut subscriber, _) = coordinator.subscribe();
2084 coordinator.start();
2085
2086 handle
2088 .write(TestWrite {
2089 key: "a".into(),
2090 value: 42,
2091 size: 10,
2092 })
2093 .await
2094 .unwrap();
2095 handle.flush(false).await.unwrap();
2096
2097 let _ = subscriber.recv().await.unwrap();
2099 let result = subscriber.recv().await.unwrap();
2101
2102 let flushed = result.last_flushed_delta.as_ref().unwrap();
2104 assert_eq!(flushed.val.get("a"), Some(&42));
2105
2106 coordinator.stop().await;
2108 }
2109
2110 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2111 async fn should_include_epoch_range_in_view_after_flush() {
2112 let flusher = TestFlusher::default();
2114 let mut coordinator = WriteCoordinator::new(
2115 test_config(),
2116 vec!["default".to_string()],
2117 TestContext::default(),
2118 flusher.initial_snapshot().await,
2119 flusher.clone(),
2120 );
2121 let handle = coordinator.handle("default");
2122 let (mut subscriber, _) = coordinator.subscribe();
2123 coordinator.start();
2124
2125 let write1 = handle
2127 .write(TestWrite {
2128 key: "a".into(),
2129 value: 1,
2130 size: 10,
2131 })
2132 .await
2133 .unwrap();
2134 let write2 = handle
2135 .write(TestWrite {
2136 key: "b".into(),
2137 value: 2,
2138 size: 10,
2139 })
2140 .await
2141 .unwrap();
2142 handle.flush(false).await.unwrap();
2143
2144 let _ = subscriber.recv().await.unwrap();
2146 let result = subscriber.recv().await.unwrap();
2148
2149 let flushed = result.last_flushed_delta.as_ref().unwrap();
2151 let epoch1 = write1.epoch().await.unwrap();
2152 let epoch2 = write2.epoch().await.unwrap();
2153 assert!(flushed.epoch_range.contains(&epoch1));
2154 assert!(flushed.epoch_range.contains(&epoch2));
2155 assert_eq!(flushed.epoch_range.start, epoch1);
2156 assert_eq!(flushed.epoch_range.end, epoch2 + 1);
2157
2158 coordinator.stop().await;
2160 }
2161
2162 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2163 async fn should_broadcast_frozen_delta_on_freeze() {
2164 let flusher = TestFlusher::default();
2166 let mut coordinator = WriteCoordinator::new(
2167 test_config(),
2168 vec!["default".to_string()],
2169 TestContext::default(),
2170 flusher.initial_snapshot().await,
2171 flusher.clone(),
2172 );
2173 let handle = coordinator.handle("default");
2174 let (mut subscriber, _) = coordinator.subscribe();
2175 coordinator.start();
2176
2177 handle
2179 .write(TestWrite {
2180 key: "a".into(),
2181 value: 1,
2182 size: 10,
2183 })
2184 .await
2185 .unwrap();
2186 handle.flush(false).await.unwrap();
2187
2188 let state = subscriber.recv().await.unwrap();
2190 assert_eq!(state.frozen.len(), 1);
2191 assert!(state.frozen[0].val.contains_key("a"));
2192
2193 coordinator.stop().await;
2195 }
2196
2197 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2198 async fn should_remove_frozen_delta_after_flush_complete() {
2199 let flusher = TestFlusher::default();
2201 let mut coordinator = WriteCoordinator::new(
2202 test_config(),
2203 vec!["default".to_string()],
2204 TestContext::default(),
2205 flusher.initial_snapshot().await,
2206 flusher.clone(),
2207 );
2208 let handle = coordinator.handle("default");
2209 let (mut subscriber, _) = coordinator.subscribe();
2210 coordinator.start();
2211
2212 handle
2214 .write(TestWrite {
2215 key: "a".into(),
2216 value: 1,
2217 size: 10,
2218 })
2219 .await
2220 .unwrap();
2221 handle.flush(false).await.unwrap();
2222
2223 let state1 = subscriber.recv().await.unwrap();
2225 assert_eq!(state1.frozen.len(), 1);
2226
2227 let state2 = subscriber.recv().await.unwrap();
2229 assert_eq!(state2.frozen.len(), 0);
2230 assert!(state2.last_flushed_delta.is_some());
2231
2232 coordinator.stop().await;
2234 }
2235
2236 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2241 async fn should_flush_even_when_no_writes_if_flush_storage() {
2242 let flusher = TestFlusher::default();
2244 let storage = Arc::new(InMemoryStorage::new());
2245 let snapshot = storage.snapshot().await.unwrap();
2246 let mut coordinator = WriteCoordinator::new(
2247 test_config(),
2248 vec!["default".to_string()],
2249 TestContext::default(),
2250 snapshot,
2251 flusher.clone(),
2252 );
2253 let handle = coordinator.handle("default");
2254 coordinator.start();
2255
2256 let mut flush_handle = handle.flush(true).await.unwrap();
2258 flush_handle.wait(Durability::Durable).await.unwrap();
2259
2260 assert_eq!(flusher.flushed_events().len(), 0);
2263
2264 coordinator.stop().await;
2266 }
2267
2268 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2269 async fn should_advance_durable_watermark() {
2270 let flusher = TestFlusher::default();
2272 let storage = Arc::new(InMemoryStorage::new());
2273 let snapshot = storage.snapshot().await.unwrap();
2274 let mut coordinator = WriteCoordinator::new(
2275 test_config(),
2276 vec!["default".to_string()],
2277 TestContext::default(),
2278 snapshot,
2279 flusher.clone(),
2280 );
2281 let handle = coordinator.handle("default");
2282 coordinator.start();
2283
2284 let mut write = handle
2286 .write(TestWrite {
2287 key: "a".into(),
2288 value: 1,
2289 size: 10,
2290 })
2291 .await
2292 .unwrap();
2293 let mut flush_handle = handle.flush(true).await.unwrap();
2294
2295 flush_handle.wait(Durability::Durable).await.unwrap();
2297 write.wait(Durability::Durable).await.unwrap();
2298 assert_eq!(flusher.flushed_events().len(), 1);
2299
2300 coordinator.stop().await;
2302 }
2303
2304 #[tokio::test]
2305 async fn should_see_applied_write_via_view() {
2306 let flusher = TestFlusher::default();
2308 let mut coordinator = WriteCoordinator::new(
2309 test_config(),
2310 vec!["default".to_string()],
2311 TestContext::default(),
2312 flusher.initial_snapshot().await,
2313 flusher,
2314 );
2315 let handle = coordinator.handle("default");
2316 coordinator.start();
2317
2318 let mut write = handle
2320 .write(TestWrite {
2321 key: "a".into(),
2322 value: 42,
2323 size: 10,
2324 })
2325 .await
2326 .unwrap();
2327 write.wait(Durability::Applied).await.unwrap();
2328
2329 let view = coordinator.view();
2331 assert_eq!(view.current.get("a"), Some(42));
2332
2333 coordinator.stop().await;
2335 }
2336
2337 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2342 async fn should_flush_writes_from_multiple_channels() {
2343 let flusher = TestFlusher::default();
2345 let mut coordinator = WriteCoordinator::new(
2346 test_config(),
2347 vec!["ch1".to_string(), "ch2".to_string()],
2348 TestContext::default(),
2349 flusher.initial_snapshot().await,
2350 flusher.clone(),
2351 );
2352 let ch1 = coordinator.handle("ch1");
2353 let ch2 = coordinator.handle("ch2");
2354 coordinator.start();
2355
2356 let mut w1 = ch1
2359 .write(TestWrite {
2360 key: "a".into(),
2361 value: 10,
2362 size: 10,
2363 })
2364 .await
2365 .unwrap();
2366 w1.wait(Durability::Applied).await.unwrap();
2367
2368 let mut w2 = ch2
2369 .write(TestWrite {
2370 key: "b".into(),
2371 value: 20,
2372 size: 10,
2373 })
2374 .await
2375 .unwrap();
2376 w2.wait(Durability::Applied).await.unwrap();
2377
2378 let mut w3 = ch1
2379 .write(TestWrite {
2380 key: "c".into(),
2381 value: 30,
2382 size: 10,
2383 })
2384 .await
2385 .unwrap();
2386 w3.wait(Durability::Applied).await.unwrap();
2387
2388 ch1.flush(false).await.unwrap();
2389 w3.wait(Durability::Flushed).await.unwrap();
2390
2391 let snapshot = flusher.storage.snapshot().await.unwrap();
2393 assert_snapshot_has_rows(&snapshot, &[("a", 0, 10), ("b", 1, 20), ("c", 2, 30)]).await;
2394
2395 coordinator.stop().await;
2397 }
2398}