1#![allow(unused)]
2
3mod error;
4mod handle;
5pub mod subscriber;
6mod traits;
7
8use std::collections::HashMap;
9use std::ops::Range;
10use std::ops::{Deref, DerefMut};
11
12pub use error::{WriteError, WriteResult};
13use futures::stream::{self, SelectAll, StreamExt};
14pub use handle::{View, WriteCoordinatorHandle, WriteHandle};
15pub use subscriber::{SubscribeError, ViewMonitor, ViewSubscriber};
16pub use traits::{Delta, Durability, EpochStamped, Flusher};
17
18enum FlushEvent<D: Delta> {
20 FlushDelta { frozen: EpochStamped<D::Frozen> },
22 FlushStorage,
24}
25
26use crate::StorageRead;
28use crate::storage::StorageSnapshot;
29use async_trait::async_trait;
30pub use handle::EpochWatcher;
31use std::sync::{Arc, Mutex};
32use std::time::Duration;
33use tokio::sync::{broadcast, mpsc, oneshot, watch};
34use tokio::time::{Instant, Interval, interval_at};
35use tokio_util::sync::CancellationToken;
36
37#[derive(Debug, Clone)]
39pub struct WriteCoordinatorConfig {
40 pub queue_capacity: usize,
42 pub flush_interval: Duration,
44 pub flush_size_threshold: usize,
46}
47
48impl Default for WriteCoordinatorConfig {
49 fn default() -> Self {
50 Self {
51 queue_capacity: 10_000,
52 flush_interval: Duration::from_secs(10),
53 flush_size_threshold: 64 * 1024 * 1024, }
55 }
56}
57
58pub(crate) enum WriteCommand<D: Delta> {
59 Write {
60 write: D::Write,
61 result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
62 },
63 Flush {
64 epoch_tx: oneshot::Sender<handle::EpochResult<()>>,
65 flush_storage: bool,
66 },
67}
68
69pub struct WriteCoordinator<D: Delta, F: Flusher<D>> {
74 handles: HashMap<String, WriteCoordinatorHandle<D>>,
75 pause_handles: HashMap<String, PauseHandle>,
76 stop_tok: CancellationToken,
77 tasks: Option<(WriteCoordinatorTask<D>, FlushTask<D, F>)>,
78 write_task_jh: Option<tokio::task::JoinHandle<Result<(), String>>>,
79 view: Arc<BroadcastedView<D>>,
80}
81
82impl<D: Delta, F: Flusher<D>> WriteCoordinator<D, F> {
83 pub fn new(
84 config: WriteCoordinatorConfig,
85 channels: Vec<impl ToString>,
86 initial_context: D::Context,
87 initial_snapshot: Arc<dyn StorageSnapshot>,
88 flusher: F,
89 ) -> WriteCoordinator<D, F> {
90 let (watermarks, watcher) = EpochWatermarks::new();
91 let watermarks = Arc::new(watermarks);
92
93 let mut write_rxs = Vec::with_capacity(channels.len());
95 let mut write_txs = HashMap::new();
96 let mut pause_handles = HashMap::new();
97 for name in &channels {
98 let (write_tx, write_rx, pause_hdl) = pausable_channel(config.queue_capacity);
99 write_rxs.push(write_rx);
100 write_txs.insert(name.to_string(), write_tx);
101 pause_handles.insert(name.to_string(), pause_hdl);
102 }
103
104 let (flush_tx, flush_rx) = mpsc::channel(2);
110
111 let flush_stop_tok = CancellationToken::new();
112 let stop_tok = CancellationToken::new();
113 let write_task = WriteCoordinatorTask::new(
114 config,
115 initial_context,
116 initial_snapshot,
117 write_rxs,
118 flush_tx,
119 watermarks.clone(),
120 stop_tok.clone(),
121 flush_stop_tok.clone(),
122 );
123
124 let view = write_task.view.clone();
125
126 let mut handles = HashMap::new();
127 for name in channels {
128 let write_tx = write_txs.remove(&name.to_string()).expect("unreachable");
129 handles.insert(
130 name.to_string(),
131 WriteCoordinatorHandle::new(write_tx, watcher.clone(), view.clone()),
132 );
133 }
134
135 let flush_task = FlushTask {
136 flusher,
137 stop_tok: flush_stop_tok,
138 flush_rx,
139 watermarks: watermarks.clone(),
140 view: view.clone(),
141 last_flushed_epoch: 0,
142 };
143
144 Self {
145 handles,
146 pause_handles,
147 tasks: Some((write_task, flush_task)),
148 write_task_jh: None,
149 stop_tok,
150 view,
151 }
152 }
153
154 pub fn handle(&self, name: &str) -> WriteCoordinatorHandle<D> {
155 self.handles
156 .get(name)
157 .expect("unknown channel name")
158 .clone()
159 }
160
161 pub fn pause_handle(&self, name: &str) -> PauseHandle {
162 self.pause_handles
163 .get(name)
164 .expect("unknown channel name")
165 .clone()
166 }
167
168 pub fn start(&mut self) {
169 let Some((write_task, flush_task)) = self.tasks.take() else {
170 return;
172 };
173 let flush_task_jh = flush_task.run();
174 let write_task_jh = write_task.run(flush_task_jh);
175 self.write_task_jh = Some(write_task_jh);
176 }
177
178 pub async fn stop(mut self) -> Result<(), String> {
179 let Some(write_task_jh) = self.write_task_jh.take() else {
180 return Ok(());
181 };
182 self.stop_tok.cancel();
183 write_task_jh.await.map_err(|e| e.to_string())?
184 }
185
186 pub fn view(&self) -> Arc<View<D>> {
187 self.view.current()
188 }
189
190 pub fn subscribe(&self) -> (ViewSubscriber<D>, ViewMonitor) {
191 let (view_rx, initial_view) = self.view.subscribe();
192 ViewSubscriber::new(view_rx, initial_view)
193 }
194}
195
196struct WriteCoordinatorTask<D: Delta> {
197 config: WriteCoordinatorConfig,
198 delta: CurrentDelta<D>,
199 flush_tx: mpsc::Sender<FlushEvent<D>>,
200 write_rxs: Vec<PausableReceiver<D>>,
201 watermarks: Arc<EpochWatermarks>,
202 view: Arc<BroadcastedView<D>>,
203 epoch: u64,
204 delta_start_epoch: u64,
205 flush_interval: Interval,
206 stop_tok: CancellationToken,
207 flush_stop_tok: CancellationToken,
208}
209
210impl<D: Delta> WriteCoordinatorTask<D> {
211 #[allow(clippy::too_many_arguments)]
215 pub fn new(
216 config: WriteCoordinatorConfig,
217 initial_context: D::Context,
218 initial_snapshot: Arc<dyn StorageSnapshot>,
219 write_rxs: Vec<PausableReceiver<D>>,
220 flush_tx: mpsc::Sender<FlushEvent<D>>,
221 watermarks: Arc<EpochWatermarks>,
222 stop_tok: CancellationToken,
223 flush_stop_tok: CancellationToken,
224 ) -> Self {
225 let delta = D::init(initial_context);
226
227 let initial_view = View {
228 current: delta.reader(),
229 frozen: vec![],
230 snapshot: initial_snapshot,
231 last_written_delta: None,
232 };
233 let initial_view = Arc::new(BroadcastedView::new(initial_view));
234
235 let flush_interval = interval_at(
236 Instant::now() + config.flush_interval,
237 config.flush_interval,
238 );
239 Self {
240 config,
241 delta: CurrentDelta::new(delta),
242 write_rxs,
243 flush_tx,
244 watermarks,
245 view: initial_view,
246 epoch: 1,
250 delta_start_epoch: 1,
251 flush_interval,
252 stop_tok,
253 flush_stop_tok,
254 }
255 }
256
257 pub fn run(
259 mut self,
260 flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
261 ) -> tokio::task::JoinHandle<Result<(), String>> {
262 tokio::task::spawn(async move { self.run_coordinator(flush_task_jh).await })
263 }
264
265 async fn run_coordinator(
266 mut self,
267 flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
268 ) -> Result<(), String> {
269 self.flush_interval.reset();
271
272 let mut write_stream: SelectAll<_> = SelectAll::new();
274 for rx in self.write_rxs.drain(..) {
275 write_stream.push(
276 stream::unfold(
277 rx,
278 |mut rx| async move { rx.recv().await.map(|cmd| (cmd, rx)) },
279 )
280 .boxed(),
281 );
282 }
283
284 loop {
285 tokio::select! {
286 cmd = write_stream.next() => {
287 match cmd {
288 Some(WriteCommand::Write { write, result_tx }) => {
289 self.handle_write(write, result_tx).await?;
290 }
291 Some(WriteCommand::Flush { epoch_tx, flush_storage }) => {
292 let _ = epoch_tx.send(Ok(handle::WriteApplied {
294 epoch: self.epoch.saturating_sub(1),
295 result: (),
296 }));
297 self.handle_flush(flush_storage).await;
298 }
299 None => {
300 break;
302 }
303 }
304 }
305
306 _ = self.flush_interval.tick() => {
307 self.handle_flush(false).await;
308 }
309
310 _ = self.stop_tok.cancelled() => {
311 break;
312 }
313 }
314 }
315
316 self.handle_flush(false).await;
318
319 self.flush_stop_tok.cancel();
321 flush_task_jh
323 .await
324 .map_err(|e| format!("flush task panicked: {}", e))?
325 .map_err(|e| format!("flush task error: {}", e))
326 }
327
328 async fn handle_write(
329 &mut self,
330 write: D::Write,
331 result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
332 ) -> Result<(), String> {
333 let write_epoch = self.epoch;
334 self.epoch += 1;
335
336 let result = self.delta.apply(write);
337 let _ = result_tx.send(
339 result
340 .map(|apply_result| handle::WriteApplied {
341 epoch: write_epoch,
342 result: apply_result,
343 })
344 .map_err(|e| handle::WriteFailed {
345 epoch: write_epoch,
346 error: e,
347 }),
348 );
349
350 self.watermarks.update_applied(write_epoch);
352
353 if self.delta.estimate_size() >= self.config.flush_size_threshold {
354 self.handle_flush(false).await;
355 }
356
357 Ok(())
358 }
359
360 async fn handle_flush(&mut self, flush_storage: bool) {
361 self.flush_if_delta_has_writes().await;
362 if flush_storage {
363 let _ = self.flush_tx.send(FlushEvent::FlushStorage).await;
364 }
365 }
366
367 async fn flush_if_delta_has_writes(&mut self) {
368 if self.epoch == self.delta_start_epoch {
369 return;
370 }
371
372 self.flush_interval.reset();
373
374 let epoch_range = self.delta_start_epoch..self.epoch;
375 self.delta_start_epoch = self.epoch;
376 let (frozen, frozen_reader) = self.delta.freeze_and_init();
377 let stamped_frozen = EpochStamped::new(frozen, epoch_range.clone());
378 let stamped_frozen_reader = EpochStamped::new(frozen_reader, epoch_range.clone());
379 let reader = self.delta.reader();
380 self.view.update_delta_frozen(stamped_frozen_reader, reader);
383 let _ = self
386 .flush_tx
387 .send(FlushEvent::FlushDelta {
388 frozen: stamped_frozen,
389 })
390 .await;
391 }
392}
393
394struct FlushTask<D: Delta, F: Flusher<D>> {
395 flusher: F,
396 stop_tok: CancellationToken,
397 flush_rx: mpsc::Receiver<FlushEvent<D>>,
398 watermarks: Arc<EpochWatermarks>,
399 view: Arc<BroadcastedView<D>>,
400 last_flushed_epoch: u64,
401}
402
403impl<D: Delta, F: Flusher<D>> FlushTask<D, F> {
404 fn run(mut self) -> tokio::task::JoinHandle<WriteResult<()>> {
405 tokio::spawn(async move {
406 loop {
407 tokio::select! {
408 event = self.flush_rx.recv() => {
409 let Some(event) = event else {
410 break;
411 };
412 self.handle_event(event).await?;
413 }
414 _ = self.stop_tok.cancelled() => {
415 break;
416 }
417 }
418 }
419 while let Ok(event) = self.flush_rx.try_recv() {
421 self.handle_event(event).await;
422 }
423 Ok(())
424 })
425 }
426
427 async fn handle_event(&mut self, event: FlushEvent<D>) -> WriteResult<()> {
428 match event {
429 FlushEvent::FlushDelta { frozen } => self.handle_flush(frozen).await,
430 FlushEvent::FlushStorage => {
431 self.flusher
432 .flush_storage()
433 .await
434 .map_err(|e| WriteError::FlushError(e.to_string()))?;
435 self.watermarks.update_durable(self.last_flushed_epoch);
436 Ok(())
437 }
438 }
439 }
440
441 async fn handle_flush(&mut self, frozen: EpochStamped<D::Frozen>) -> WriteResult<()> {
442 let delta = frozen.val;
443 let epoch_range = frozen.epoch_range;
444 let snapshot = self
445 .flusher
446 .flush_delta(delta, &epoch_range)
447 .await
448 .map_err(|e| WriteError::FlushError(e.to_string()))?;
449 self.last_flushed_epoch = epoch_range.end - 1;
450 self.watermarks.update_written(self.last_flushed_epoch);
451 self.view.update_flush_finished(snapshot, epoch_range);
452 Ok(())
453 }
454}
455
456struct CurrentDelta<D: Delta> {
457 delta: Option<D>,
458}
459
460impl<D: Delta> Deref for CurrentDelta<D> {
461 type Target = D;
462
463 fn deref(&self) -> &Self::Target {
464 match &self.delta {
465 Some(d) => d,
466 None => panic!("current delta not initialized"),
467 }
468 }
469}
470
471impl<D: Delta> DerefMut for CurrentDelta<D> {
472 fn deref_mut(&mut self) -> &mut Self::Target {
473 match &mut self.delta {
474 Some(d) => d,
475 None => panic!("current delta not initialized"),
476 }
477 }
478}
479
480impl<D: Delta> CurrentDelta<D> {
481 fn new(delta: D) -> Self {
482 Self { delta: Some(delta) }
483 }
484
485 fn freeze_and_init(&mut self) -> (D::Frozen, D::FrozenView) {
486 let Some(delta) = self.delta.take() else {
487 panic!("delta not initialized");
488 };
489 let (frozen, frozen_reader, context) = delta.freeze();
490 let new_delta = D::init(context);
491 self.delta = Some(new_delta);
492 (frozen, frozen_reader)
493 }
494}
495
496pub struct EpochWatermarks {
497 applied_tx: tokio::sync::watch::Sender<u64>,
498 written_tx: tokio::sync::watch::Sender<u64>,
499 durable_tx: tokio::sync::watch::Sender<u64>,
500}
501
502impl EpochWatermarks {
503 pub fn new() -> (Self, EpochWatcher) {
504 let (applied_tx, applied_rx) = tokio::sync::watch::channel(0);
505 let (written_tx, written_rx) = tokio::sync::watch::channel(0);
506 let (durable_tx, durable_rx) = tokio::sync::watch::channel(0);
507 let watcher = EpochWatcher {
508 applied_rx,
509 written_rx,
510 durable_rx,
511 };
512 let watermarks = EpochWatermarks {
513 applied_tx,
514 written_tx,
515 durable_tx,
516 };
517 (watermarks, watcher)
518 }
519
520 pub fn update_applied(&self, epoch: u64) {
521 let _ = self.applied_tx.send(epoch);
522 }
523
524 pub fn update_written(&self, epoch: u64) {
525 let _ = self.written_tx.send(epoch);
526 }
527
528 pub fn update_durable(&self, epoch: u64) {
529 let _ = self.durable_tx.send(epoch);
530 }
531}
532
533pub(crate) struct BroadcastedView<D: Delta> {
534 inner: Mutex<BroadcastedViewInner<D>>,
535}
536
537impl<D: Delta> BroadcastedView<D> {
538 fn new(initial_view: View<D>) -> Self {
539 let (view_tx, _) = broadcast::channel(16);
540 Self {
541 inner: Mutex::new(BroadcastedViewInner {
542 view: Arc::new(initial_view),
543 view_tx,
544 }),
545 }
546 }
547
548 fn update_flush_finished(&self, snapshot: Arc<dyn StorageSnapshot>, epoch_range: Range<u64>) {
549 self.inner
550 .lock()
551 .expect("lock poisoned")
552 .update_flush_finished(snapshot, epoch_range);
553 }
554
555 fn update_delta_frozen(&self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
556 self.inner
557 .lock()
558 .expect("lock poisoned")
559 .update_delta_frozen(frozen, reader);
560 }
561
562 fn current(&self) -> Arc<View<D>> {
563 self.inner.lock().expect("lock poisoned").current()
564 }
565
566 fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
567 self.inner.lock().expect("lock poisoned").subscribe()
568 }
569}
570
571struct BroadcastedViewInner<D: Delta> {
572 view: Arc<View<D>>,
573 view_tx: tokio::sync::broadcast::Sender<Arc<View<D>>>,
574}
575
576impl<D: Delta> BroadcastedViewInner<D> {
577 fn update_flush_finished(
578 &mut self,
579 snapshot: Arc<dyn StorageSnapshot>,
580 epoch_range: Range<u64>,
581 ) {
582 let mut new_frozen = self.view.frozen.clone();
583 let last = new_frozen
584 .pop()
585 .expect("frozen should not be empty when flush completes");
586 assert_eq!(last.epoch_range, epoch_range);
587 self.view = Arc::new(View {
588 current: self.view.current.clone(),
589 frozen: new_frozen,
590 snapshot,
591 last_written_delta: Some(last),
592 });
593 self.view_tx.send(self.view.clone());
594 }
595
596 fn update_delta_frozen(&mut self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
597 let mut new_frozen = vec![frozen];
599 new_frozen.extend(self.view.frozen.iter().cloned());
600 self.view = Arc::new(View {
601 current: reader,
602 frozen: new_frozen,
603 snapshot: self.view.snapshot.clone(),
604 last_written_delta: self.view.last_written_delta.clone(),
605 });
606 self.view_tx.send(self.view.clone());
607 }
608
609 fn current(&self) -> Arc<View<D>> {
610 self.view.clone()
611 }
612
613 fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
614 (self.view_tx.subscribe(), self.view.clone())
615 }
616}
617
618struct PausableReceiver<D: Delta> {
619 pause_rx: Option<watch::Receiver<bool>>,
620 rx: mpsc::Receiver<WriteCommand<D>>,
621}
622
623impl<D: Delta> PausableReceiver<D> {
624 async fn recv(&mut self) -> Option<WriteCommand<D>> {
625 if let Some(pause_rx) = self.pause_rx.as_mut() {
626 pause_rx.wait_for(|v| !*v).await;
627 }
628 self.rx.recv().await
629 }
630}
631
632#[derive(Clone)]
634pub struct PauseHandle {
635 pause_tx: tokio::sync::watch::Sender<bool>,
636}
637
638impl PauseHandle {
639 pub fn pause(&self) {
640 self.pause_tx.send_replace(true);
641 }
642
643 pub fn unpause(&self) {
644 self.pause_tx.send_replace(false);
645 }
646}
647
648fn pausable_channel<D: Delta>(
649 capacity: usize,
650) -> (
651 mpsc::Sender<WriteCommand<D>>,
652 PausableReceiver<D>,
653 PauseHandle,
654) {
655 let (pause_tx, pause_rx) = watch::channel(false);
656 let (tx, rx) = mpsc::channel(capacity);
657 (
658 tx,
659 PausableReceiver {
660 pause_rx: Some(pause_rx),
661 rx,
662 },
663 PauseHandle { pause_tx },
664 )
665}
666
667#[cfg(test)]
668mod tests {
669 use super::*;
670 use crate::BytesRange;
671 use crate::coordinator::Durability;
672 use crate::storage::in_memory::{InMemoryStorage, InMemoryStorageSnapshot};
673 use crate::storage::{PutRecordOp, Record, StorageSnapshot};
674 use crate::{Storage, StorageRead};
675 use async_trait::async_trait;
676 use bytes::Bytes;
677 use std::collections::{HashMap, HashSet};
678 use std::ops::Range;
679 use std::sync::Mutex;
680 #[derive(Clone, Debug)]
685 struct TestWrite {
686 key: String,
687 value: u64,
688 size: usize,
689 }
690
691 #[derive(Clone, Debug, Default)]
693 struct TestContext {
694 next_seq: u64,
695 error: Option<String>,
696 }
697
698 #[derive(Clone, Debug, Default)]
700 struct TestDeltaReader {
701 data: Arc<Mutex<HashMap<String, u64>>>,
702 }
703
704 impl TestDeltaReader {
705 fn get(&self, key: &str) -> Option<u64> {
706 self.data.lock().unwrap().get(key).copied()
707 }
708 }
709
710 #[derive(Debug)]
713 struct TestDelta {
714 context: TestContext,
715 writes: HashMap<String, (u64, u64)>,
716 key_values: Arc<Mutex<HashMap<String, u64>>>,
717 total_size: usize,
718 }
719
720 #[derive(Clone, Debug)]
721 struct FrozenTestDelta {
722 writes: HashMap<String, (u64, u64)>,
723 }
724
725 impl Delta for TestDelta {
726 type Context = TestContext;
727 type Write = TestWrite;
728 type DeltaView = TestDeltaReader;
729 type Frozen = FrozenTestDelta;
730 type FrozenView = Arc<HashMap<String, u64>>;
731 type ApplyResult = ();
732
733 fn init(context: Self::Context) -> Self {
734 Self {
735 context,
736 writes: HashMap::default(),
737 key_values: Arc::new(Mutex::new(HashMap::default())),
738 total_size: 0,
739 }
740 }
741
742 fn apply(&mut self, write: Self::Write) -> Result<(), String> {
743 if let Some(error) = &self.context.error {
744 return Err(error.clone());
745 }
746
747 let seq = self.context.next_seq;
748 self.context.next_seq += 1;
749
750 self.writes.insert(write.key.clone(), (seq, write.value));
751 self.total_size += write.size;
752 self.key_values
753 .lock()
754 .unwrap()
755 .insert(write.key, write.value);
756 Ok(())
757 }
758
759 fn estimate_size(&self) -> usize {
760 self.total_size
761 }
762
763 fn freeze(self) -> (Self::Frozen, Self::FrozenView, Self::Context) {
764 let frozen = FrozenTestDelta {
765 writes: self.writes,
766 };
767 let frozen_view = Arc::new(self.key_values.lock().unwrap().clone());
768 (frozen, frozen_view, self.context)
769 }
770
771 fn reader(&self) -> Self::DeltaView {
772 TestDeltaReader {
773 data: self.key_values.clone(),
774 }
775 }
776 }
777
778 #[derive(Default)]
780 struct TestFlusherState {
781 flushed_events: Vec<Arc<EpochStamped<FrozenTestDelta>>>,
782 flush_started_tx: Option<oneshot::Sender<()>>,
784 unblock_rx: Option<mpsc::Receiver<()>>,
786 }
787
788 #[derive(Clone)]
789 struct TestFlusher {
790 state: Arc<Mutex<TestFlusherState>>,
791 storage: Arc<InMemoryStorage>,
792 }
793
794 impl Default for TestFlusher {
795 fn default() -> Self {
796 Self {
797 state: Arc::new(Mutex::new(TestFlusherState::default())),
798 storage: Arc::new(InMemoryStorage::new()),
799 }
800 }
801 }
802
803 impl TestFlusher {
804 fn with_flush_control() -> (Self, oneshot::Receiver<()>, mpsc::Sender<()>) {
807 let (started_tx, started_rx) = oneshot::channel();
808 let (unblock_tx, unblock_rx) = mpsc::channel(1);
809 let flusher = Self {
810 state: Arc::new(Mutex::new(TestFlusherState {
811 flushed_events: Vec::new(),
812 flush_started_tx: Some(started_tx),
813 unblock_rx: Some(unblock_rx),
814 })),
815 storage: Arc::new(InMemoryStorage::new()),
816 };
817 (flusher, started_rx, unblock_tx)
818 }
819
820 fn flushed_events(&self) -> Vec<Arc<EpochStamped<FrozenTestDelta>>> {
821 self.state.lock().unwrap().flushed_events.clone()
822 }
823
824 async fn initial_snapshot(&self) -> Arc<dyn StorageSnapshot> {
825 self.storage.snapshot().await.unwrap()
826 }
827 }
828
829 #[async_trait]
830 impl Flusher<TestDelta> for TestFlusher {
831 async fn flush_delta(
832 &self,
833 frozen: FrozenTestDelta,
834 epoch_range: &Range<u64>,
835 ) -> Result<Arc<dyn StorageSnapshot>, String> {
836 let flush_started_tx = {
838 let mut state = self.state.lock().unwrap();
839 state.flush_started_tx.take()
840 };
841 if let Some(tx) = flush_started_tx {
842 let _ = tx.send(());
843 }
844
845 let unblock_rx = {
847 let mut state = self.state.lock().unwrap();
848 state.unblock_rx.take()
849 };
850 if let Some(mut rx) = unblock_rx {
851 rx.recv().await;
852 }
853
854 let records: Vec<PutRecordOp> = frozen
856 .writes
857 .iter()
858 .map(|(key, (seq, value))| {
859 let mut buf = Vec::with_capacity(16);
860 buf.extend_from_slice(&seq.to_le_bytes());
861 buf.extend_from_slice(&value.to_le_bytes());
862 Record::new(Bytes::from(key.clone()), Bytes::from(buf)).into()
863 })
864 .collect();
865 self.storage
866 .put(records)
867 .await
868 .map_err(|e| format!("{}", e))?;
869
870 {
872 let mut state = self.state.lock().unwrap();
873 state
874 .flushed_events
875 .push(Arc::new(EpochStamped::new(frozen, epoch_range.clone())));
876 }
877
878 self.storage.snapshot().await.map_err(|e| format!("{}", e))
879 }
880
881 async fn flush_storage(&self) -> Result<(), String> {
882 let flush_started_tx = {
884 let mut state = self.state.lock().unwrap();
885 state.flush_started_tx.take()
886 };
887 if let Some(tx) = flush_started_tx {
888 let _ = tx.send(());
889 }
890
891 let unblock_rx = {
893 let mut state = self.state.lock().unwrap();
894 state.unblock_rx.take()
895 };
896 if let Some(mut rx) = unblock_rx {
897 rx.recv().await;
898 }
899
900 Ok(())
901 }
902 }
903
904 fn test_config() -> WriteCoordinatorConfig {
905 WriteCoordinatorConfig {
906 queue_capacity: 100,
907 flush_interval: Duration::from_secs(3600), flush_size_threshold: usize::MAX,
909 }
910 }
911
912 async fn assert_snapshot_has_rows(
913 snapshot: &Arc<dyn StorageSnapshot>,
914 expected: &[(&str, u64, u64)],
915 ) {
916 let records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
917 assert_eq!(
918 records.len(),
919 expected.len(),
920 "expected {} rows but snapshot has {}",
921 expected.len(),
922 records.len()
923 );
924 let mut actual: Vec<(String, u64, u64)> = records
925 .iter()
926 .map(|r| {
927 let key = String::from_utf8(r.key.to_vec()).unwrap();
928 let seq = u64::from_le_bytes(r.value[0..8].try_into().unwrap());
929 let value = u64::from_le_bytes(r.value[8..16].try_into().unwrap());
930 (key, seq, value)
931 })
932 .collect();
933 actual.sort_by(|a, b| a.0.cmp(&b.0));
934 let mut expected: Vec<(&str, u64, u64)> = expected.to_vec();
935 expected.sort_by(|a, b| a.0.cmp(b.0));
936 for (actual, expected) in actual.iter().zip(expected.iter()) {
937 assert_eq!(
938 actual.0, expected.0,
939 "key mismatch: got {:?}, expected {:?}",
940 actual.0, expected.0
941 );
942 assert_eq!(
943 actual.1, expected.1,
944 "seq mismatch for key {:?}: got {}, expected {}",
945 actual.0, actual.1, expected.1
946 );
947 assert_eq!(
948 actual.2, expected.2,
949 "value mismatch for key {:?}: got {}, expected {}",
950 actual.0, actual.2, expected.2
951 );
952 }
953 }
954
955 #[tokio::test]
960 async fn should_assign_monotonic_epochs() {
961 let flusher = TestFlusher::default();
963 let mut coordinator = WriteCoordinator::new(
964 test_config(),
965 vec!["default".to_string()],
966 TestContext::default(),
967 flusher.initial_snapshot().await,
968 flusher,
969 );
970 let handle = coordinator.handle("default");
971 coordinator.start();
972
973 let write1 = handle
975 .try_write(TestWrite {
976 key: "a".into(),
977 value: 1,
978 size: 10,
979 })
980 .await
981 .unwrap();
982 let write2 = handle
983 .try_write(TestWrite {
984 key: "b".into(),
985 value: 2,
986 size: 10,
987 })
988 .await
989 .unwrap();
990 let write3 = handle
991 .try_write(TestWrite {
992 key: "c".into(),
993 value: 3,
994 size: 10,
995 })
996 .await
997 .unwrap();
998
999 let epoch1 = write1.epoch().await.unwrap();
1000 let epoch2 = write2.epoch().await.unwrap();
1001 let epoch3 = write3.epoch().await.unwrap();
1002
1003 assert!(epoch1 < epoch2);
1005 assert!(epoch2 < epoch3);
1006
1007 coordinator.stop().await;
1009 }
1010
1011 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1012 async fn should_apply_writes_in_order() {
1013 let flusher = TestFlusher::default();
1015 let mut coordinator = WriteCoordinator::new(
1016 test_config(),
1017 vec!["default".to_string()],
1018 TestContext::default(),
1019 flusher.initial_snapshot().await,
1020 flusher.clone(),
1021 );
1022 let handle = coordinator.handle("default");
1023 coordinator.start();
1024
1025 handle
1027 .try_write(TestWrite {
1028 key: "a".into(),
1029 value: 1,
1030 size: 10,
1031 })
1032 .await
1033 .unwrap();
1034 handle
1035 .try_write(TestWrite {
1036 key: "a".into(),
1037 value: 2,
1038 size: 10,
1039 })
1040 .await
1041 .unwrap();
1042 let mut last_write = handle
1043 .try_write(TestWrite {
1044 key: "a".into(),
1045 value: 3,
1046 size: 10,
1047 })
1048 .await
1049 .unwrap();
1050
1051 handle.flush(false).await.unwrap();
1052 last_write.wait(Durability::Written).await.unwrap();
1054
1055 let events = flusher.flushed_events();
1057 assert_eq!(events.len(), 1);
1058 let frozen_delta = &events[0];
1059 let delta = &frozen_delta.val;
1060 let (seq, value) = delta.writes.get("a").unwrap();
1062 assert_eq!(*value, 3);
1063 assert_eq!(*seq, 2);
1064
1065 coordinator.stop().await;
1067 }
1068
1069 #[tokio::test]
1070 async fn should_update_applied_watermark_after_each_write() {
1071 let flusher = TestFlusher::default();
1073 let mut coordinator = WriteCoordinator::new(
1074 test_config(),
1075 vec!["default".to_string()],
1076 TestContext::default(),
1077 flusher.initial_snapshot().await,
1078 flusher,
1079 );
1080 let handle = coordinator.handle("default");
1081 coordinator.start();
1082
1083 let mut write_handle = handle
1085 .try_write(TestWrite {
1086 key: "a".into(),
1087 value: 1,
1088 size: 10,
1089 })
1090 .await
1091 .unwrap();
1092
1093 let result = write_handle.wait(Durability::Applied).await;
1095 assert!(result.is_ok());
1096
1097 coordinator.stop().await;
1099 }
1100
1101 #[tokio::test]
1102 async fn should_propagate_apply_error_to_handle() {
1103 let flusher = TestFlusher::default();
1105 let context = TestContext {
1106 error: Some("apply error".to_string()),
1107 ..Default::default()
1108 };
1109 let mut coordinator = WriteCoordinator::new(
1110 test_config(),
1111 vec!["default".to_string()],
1112 context,
1113 flusher.initial_snapshot().await,
1114 flusher,
1115 );
1116 let handle = coordinator.handle("default");
1117 coordinator.start();
1118
1119 let write = handle
1121 .try_write(TestWrite {
1122 key: "a".into(),
1123 value: 1,
1124 size: 10,
1125 })
1126 .await
1127 .unwrap();
1128
1129 let result = write.epoch().await;
1130
1131 assert!(
1133 matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
1134 );
1135
1136 coordinator.stop().await;
1138 }
1139
1140 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1145 async fn should_flush_on_command() {
1146 let flusher = TestFlusher::default();
1148 let mut coordinator = WriteCoordinator::new(
1149 test_config(),
1150 vec!["default".to_string()],
1151 TestContext::default(),
1152 flusher.initial_snapshot().await,
1153 flusher.clone(),
1154 );
1155 let handle = coordinator.handle("default");
1156 coordinator.start();
1157
1158 let mut write = handle
1160 .try_write(TestWrite {
1161 key: "a".into(),
1162 value: 1,
1163 size: 10,
1164 })
1165 .await
1166 .unwrap();
1167 handle.flush(false).await.unwrap();
1168 write.wait(Durability::Written).await.unwrap();
1169
1170 assert_eq!(flusher.flushed_events().len(), 1);
1172
1173 coordinator.stop().await;
1175 }
1176
1177 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1178 async fn should_wait_on_flush_handle() {
1179 let flusher = TestFlusher::default();
1181 let mut coordinator = WriteCoordinator::new(
1182 test_config(),
1183 vec!["default".to_string()],
1184 TestContext::default(),
1185 flusher.initial_snapshot().await,
1186 flusher.clone(),
1187 );
1188 let handle = coordinator.handle("default");
1189 coordinator.start();
1190
1191 handle
1193 .try_write(TestWrite {
1194 key: "a".into(),
1195 value: 1,
1196 size: 10,
1197 })
1198 .await
1199 .unwrap();
1200 let mut flush_handle = handle.flush(false).await.unwrap();
1201
1202 flush_handle.wait(Durability::Written).await.unwrap();
1204 assert_eq!(flusher.flushed_events().len(), 1);
1205
1206 coordinator.stop().await;
1208 }
1209
1210 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1211 async fn should_return_correct_epoch_from_flush_handle() {
1212 let flusher = TestFlusher::default();
1214 let mut coordinator = WriteCoordinator::new(
1215 test_config(),
1216 vec!["default".to_string()],
1217 TestContext::default(),
1218 flusher.initial_snapshot().await,
1219 flusher,
1220 );
1221 let handle = coordinator.handle("default");
1222 coordinator.start();
1223
1224 let write1 = handle
1226 .try_write(TestWrite {
1227 key: "a".into(),
1228 value: 1,
1229 size: 10,
1230 })
1231 .await
1232 .unwrap();
1233 let write2 = handle
1234 .try_write(TestWrite {
1235 key: "b".into(),
1236 value: 2,
1237 size: 10,
1238 })
1239 .await
1240 .unwrap();
1241 let flush_handle = handle.flush(false).await.unwrap();
1242
1243 let flush_epoch = flush_handle.epoch().await.unwrap();
1245 let write2_epoch = write2.epoch().await.unwrap();
1246 assert_eq!(flush_epoch, write2_epoch);
1247
1248 coordinator.stop().await;
1250 }
1251
1252 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1253 async fn should_include_all_pending_writes_in_flush() {
1254 let flusher = TestFlusher::default();
1256 let mut coordinator = WriteCoordinator::new(
1257 test_config(),
1258 vec!["default".to_string()],
1259 TestContext::default(),
1260 flusher.initial_snapshot().await,
1261 flusher.clone(),
1262 );
1263 let handle = coordinator.handle("default");
1264 coordinator.start();
1265
1266 handle
1268 .try_write(TestWrite {
1269 key: "a".into(),
1270 value: 1,
1271 size: 10,
1272 })
1273 .await
1274 .unwrap();
1275 handle
1276 .try_write(TestWrite {
1277 key: "b".into(),
1278 value: 2,
1279 size: 10,
1280 })
1281 .await
1282 .unwrap();
1283 let mut last_write = handle
1284 .try_write(TestWrite {
1285 key: "c".into(),
1286 value: 3,
1287 size: 10,
1288 })
1289 .await
1290 .unwrap();
1291
1292 handle.flush(false).await.unwrap();
1293 last_write.wait(Durability::Written).await.unwrap();
1294
1295 let events = flusher.flushed_events();
1297 assert_eq!(events.len(), 1);
1298 let frozen_delta = &events[0];
1299 assert_eq!(frozen_delta.val.writes.len(), 3);
1300 let snapshot = flusher.storage.snapshot().await.unwrap();
1301 assert_snapshot_has_rows(&snapshot, &[("a", 0, 1), ("b", 1, 2), ("c", 2, 3)]).await;
1302
1303 coordinator.stop().await;
1305 }
1306
1307 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1308 async fn should_skip_flush_when_no_new_writes() {
1309 let flusher = TestFlusher::default();
1311 let mut coordinator = WriteCoordinator::new(
1312 test_config(),
1313 vec!["default".to_string()],
1314 TestContext::default(),
1315 flusher.initial_snapshot().await,
1316 flusher.clone(),
1317 );
1318 let handle = coordinator.handle("default");
1319 coordinator.start();
1320
1321 let mut write = handle
1323 .try_write(TestWrite {
1324 key: "a".into(),
1325 value: 1,
1326 size: 10,
1327 })
1328 .await
1329 .unwrap();
1330 handle.flush(false).await.unwrap();
1331 write.wait(Durability::Written).await.unwrap();
1332
1333 handle.flush(false).await.unwrap();
1335
1336 let sync_write = handle
1339 .try_write(TestWrite {
1340 key: "sync".into(),
1341 value: 0,
1342 size: 1,
1343 })
1344 .await
1345 .unwrap();
1346 sync_write.epoch().await.unwrap();
1347
1348 assert_eq!(flusher.flushed_events().len(), 1);
1350
1351 coordinator.stop().await;
1353 }
1354
1355 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1356 async fn should_update_written_watermark_after_flush() {
1357 let flusher = TestFlusher::default();
1359 let mut coordinator = WriteCoordinator::new(
1360 test_config(),
1361 vec!["default".to_string()],
1362 TestContext::default(),
1363 flusher.initial_snapshot().await,
1364 flusher,
1365 );
1366 let handle = coordinator.handle("default");
1367 coordinator.start();
1368
1369 let mut write_handle = handle
1371 .try_write(TestWrite {
1372 key: "a".into(),
1373 value: 1,
1374 size: 10,
1375 })
1376 .await
1377 .unwrap();
1378
1379 handle.flush(false).await.unwrap();
1380
1381 let result = write_handle.wait(Durability::Written).await;
1383 assert!(result.is_ok());
1384
1385 coordinator.stop().await;
1387 }
1388
1389 #[tokio::test(start_paused = true)]
1394 async fn should_flush_on_flush_interval() {
1395 let flusher = TestFlusher::default();
1397 let config = WriteCoordinatorConfig {
1398 queue_capacity: 100,
1399 flush_interval: Duration::from_millis(100),
1400 flush_size_threshold: usize::MAX,
1401 };
1402 let mut coordinator = WriteCoordinator::new(
1403 config,
1404 vec!["default".to_string()],
1405 TestContext::default(),
1406 flusher.initial_snapshot().await,
1407 flusher.clone(),
1408 );
1409 let handle = coordinator.handle("default");
1410 coordinator.start();
1411
1412 tokio::task::yield_now().await;
1414 let mut write = handle
1415 .try_write(TestWrite {
1416 key: "a".into(),
1417 value: 1,
1418 size: 10,
1419 })
1420 .await
1421 .unwrap();
1422 write.wait(Durability::Applied).await.unwrap();
1423
1424 assert_eq!(flusher.flushed_events().len(), 0);
1426
1427 tokio::time::advance(Duration::from_millis(150)).await;
1429 tokio::task::yield_now().await;
1430
1431 assert_eq!(flusher.flushed_events().len(), 1);
1433 let snapshot = flusher.storage.snapshot().await.unwrap();
1434 assert_snapshot_has_rows(&snapshot, &[("a", 0, 1)]).await;
1435
1436 coordinator.stop().await;
1438 }
1439
1440 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1445 async fn should_flush_when_size_threshold_exceeded() {
1446 let flusher = TestFlusher::default();
1448 let config = WriteCoordinatorConfig {
1449 queue_capacity: 100,
1450 flush_interval: Duration::from_secs(3600),
1451 flush_size_threshold: 100, };
1453 let mut coordinator = WriteCoordinator::new(
1454 config,
1455 vec!["default".to_string()],
1456 TestContext::default(),
1457 flusher.initial_snapshot().await,
1458 flusher.clone(),
1459 );
1460 let handle = coordinator.handle("default");
1461 coordinator.start();
1462
1463 let mut write = handle
1465 .try_write(TestWrite {
1466 key: "a".into(),
1467 value: 1,
1468 size: 150,
1469 })
1470 .await
1471 .unwrap();
1472 write.wait(Durability::Written).await.unwrap();
1473
1474 assert_eq!(flusher.flushed_events().len(), 1);
1476
1477 coordinator.stop().await;
1479 }
1480
1481 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1482 async fn should_accumulate_until_threshold() {
1483 let flusher = TestFlusher::default();
1485 let config = WriteCoordinatorConfig {
1486 queue_capacity: 100,
1487 flush_interval: Duration::from_secs(3600),
1488 flush_size_threshold: 100,
1489 };
1490 let mut coordinator = WriteCoordinator::new(
1491 config,
1492 vec!["default".to_string()],
1493 TestContext::default(),
1494 flusher.initial_snapshot().await,
1495 flusher.clone(),
1496 );
1497 let handle = coordinator.handle("default");
1498 coordinator.start();
1499
1500 for i in 0..5 {
1502 let mut w = handle
1503 .try_write(TestWrite {
1504 key: format!("key{}", i),
1505 value: i,
1506 size: 15,
1507 })
1508 .await
1509 .unwrap();
1510 w.wait(Durability::Applied).await.unwrap();
1511 }
1512
1513 assert_eq!(flusher.flushed_events().len(), 0);
1515
1516 let mut final_write = handle
1518 .try_write(TestWrite {
1519 key: "final".into(),
1520 value: 999,
1521 size: 30,
1522 })
1523 .await
1524 .unwrap();
1525 final_write.wait(Durability::Written).await.unwrap();
1526
1527 assert_eq!(flusher.flushed_events().len(), 1);
1529
1530 coordinator.stop().await;
1532 }
1533
1534 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1539 async fn should_accept_writes_during_flush() {
1540 let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1542 let mut coordinator = WriteCoordinator::new(
1543 test_config(),
1544 vec!["default".to_string()],
1545 TestContext::default(),
1546 flusher.initial_snapshot().await,
1547 flusher.clone(),
1548 );
1549 let handle = coordinator.handle("default");
1550 coordinator.start();
1551
1552 let write1 = handle
1554 .try_write(TestWrite {
1555 key: "a".into(),
1556 value: 1,
1557 size: 10,
1558 })
1559 .await
1560 .unwrap();
1561 handle.flush(false).await.unwrap();
1562 flush_started_rx.await.unwrap(); let write2 = handle
1566 .try_write(TestWrite {
1567 key: "b".into(),
1568 value: 2,
1569 size: 10,
1570 })
1571 .await
1572 .unwrap();
1573 assert!(write2.epoch().await.unwrap() > write1.epoch().await.unwrap());
1574
1575 unblock_tx.send(()).await.unwrap();
1577 coordinator.stop().await;
1578 }
1579
1580 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1581 async fn should_assign_new_epochs_during_flush() {
1582 let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1584 let mut coordinator = WriteCoordinator::new(
1585 test_config(),
1586 vec!["default".to_string()],
1587 TestContext::default(),
1588 flusher.initial_snapshot().await,
1589 flusher.clone(),
1590 );
1591 let handle = coordinator.handle("default");
1592 coordinator.start();
1593
1594 handle
1596 .try_write(TestWrite {
1597 key: "a".into(),
1598 value: 1,
1599 size: 10,
1600 })
1601 .await
1602 .unwrap();
1603 handle.flush(false).await.unwrap();
1604 flush_started_rx.await.unwrap(); let w1 = handle
1608 .try_write(TestWrite {
1609 key: "b".into(),
1610 value: 2,
1611 size: 10,
1612 })
1613 .await
1614 .unwrap();
1615 let w2 = handle
1616 .try_write(TestWrite {
1617 key: "c".into(),
1618 value: 3,
1619 size: 10,
1620 })
1621 .await
1622 .unwrap();
1623
1624 let e1 = w1.epoch().await.unwrap();
1626 let e2 = w2.epoch().await.unwrap();
1627 assert!(e1 < e2);
1628
1629 unblock_tx.send(()).await.unwrap();
1631 coordinator.stop().await;
1632 }
1633
1634 #[tokio::test]
1639 async fn should_return_backpressure_when_queue_full() {
1640 let flusher = TestFlusher::default();
1642 let config = WriteCoordinatorConfig {
1643 queue_capacity: 2,
1644 flush_interval: Duration::from_secs(3600),
1645 flush_size_threshold: usize::MAX,
1646 };
1647 let mut coordinator = WriteCoordinator::new(
1648 config,
1649 vec!["default".to_string()],
1650 TestContext::default(),
1651 flusher.initial_snapshot().await,
1652 flusher.clone(),
1653 );
1654 let handle = coordinator.handle("default");
1655 let _ = handle
1659 .try_write(TestWrite {
1660 key: "a".into(),
1661 value: 1,
1662 size: 10,
1663 })
1664 .await;
1665 let _ = handle
1666 .try_write(TestWrite {
1667 key: "b".into(),
1668 value: 2,
1669 size: 10,
1670 })
1671 .await;
1672
1673 let result = handle
1675 .try_write(TestWrite {
1676 key: "c".into(),
1677 value: 3,
1678 size: 10,
1679 })
1680 .await;
1681
1682 assert!(matches!(result, Err(WriteError::Backpressure(_))));
1684 }
1685
1686 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1687 async fn should_accept_writes_after_queue_drains() {
1688 let flusher = TestFlusher::default();
1690 let config = WriteCoordinatorConfig {
1691 queue_capacity: 2,
1692 flush_interval: Duration::from_secs(3600),
1693 flush_size_threshold: usize::MAX,
1694 };
1695 let mut coordinator = WriteCoordinator::new(
1696 config,
1697 vec!["default".to_string()],
1698 TestContext::default(),
1699 flusher.initial_snapshot().await,
1700 flusher.clone(),
1701 );
1702 let handle = coordinator.handle("default");
1703
1704 let _ = handle
1706 .try_write(TestWrite {
1707 key: "a".into(),
1708 value: 1,
1709 size: 10,
1710 })
1711 .await;
1712 let mut write_b = handle
1713 .try_write(TestWrite {
1714 key: "b".into(),
1715 value: 2,
1716 size: 10,
1717 })
1718 .await
1719 .unwrap();
1720
1721 coordinator.start();
1723 write_b.wait(Durability::Applied).await.unwrap();
1724
1725 let result = handle
1727 .try_write(TestWrite {
1728 key: "c".into(),
1729 value: 3,
1730 size: 10,
1731 })
1732 .await;
1733 assert!(result.is_ok());
1734
1735 coordinator.stop().await;
1737 }
1738
1739 #[tokio::test]
1744 async fn should_shutdown_cleanly_when_stop_called() {
1745 let flusher = TestFlusher::default();
1747 let mut coordinator = WriteCoordinator::new(
1748 test_config(),
1749 vec!["default".to_string()],
1750 TestContext::default(),
1751 flusher.initial_snapshot().await,
1752 flusher.clone(),
1753 );
1754 let handle = coordinator.handle("default");
1755 coordinator.start();
1756
1757 let result = coordinator.stop().await;
1759
1760 assert!(result.is_ok());
1762 }
1763
1764 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1765 async fn should_flush_pending_writes_on_shutdown() {
1766 let flusher = TestFlusher::default();
1768 let config = WriteCoordinatorConfig {
1769 queue_capacity: 100,
1770 flush_interval: Duration::from_secs(3600), flush_size_threshold: usize::MAX, };
1773 let mut coordinator = WriteCoordinator::new(
1774 config,
1775 vec!["default".to_string()],
1776 TestContext::default(),
1777 flusher.initial_snapshot().await,
1778 flusher.clone(),
1779 );
1780 let handle = coordinator.handle("default");
1781 coordinator.start();
1782
1783 let write = handle
1785 .try_write(TestWrite {
1786 key: "a".into(),
1787 value: 1,
1788 size: 10,
1789 })
1790 .await
1791 .unwrap();
1792 let epoch = write.epoch().await.unwrap();
1793
1794 coordinator.stop().await;
1796
1797 let events = flusher.flushed_events();
1799 assert_eq!(events.len(), 1);
1800 let epoch_range = &events[0].epoch_range;
1801 assert!(epoch_range.contains(&epoch));
1802 }
1803
1804 #[tokio::test]
1805 async fn should_return_shutdown_error_after_coordinator_stops() {
1806 let flusher = TestFlusher::default();
1808 let mut coordinator = WriteCoordinator::new(
1809 test_config(),
1810 vec!["default".to_string()],
1811 TestContext::default(),
1812 flusher.initial_snapshot().await,
1813 flusher.clone(),
1814 );
1815 let handle = coordinator.handle("default");
1816 coordinator.start();
1817
1818 coordinator.stop().await;
1820
1821 let result = handle
1823 .try_write(TestWrite {
1824 key: "a".into(),
1825 value: 1,
1826 size: 10,
1827 })
1828 .await;
1829
1830 assert!(matches!(result, Err(WriteError::Shutdown)));
1832 }
1833
1834 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1839 async fn should_track_epoch_range_in_flush_event() {
1840 let flusher = TestFlusher::default();
1842 let mut coordinator = WriteCoordinator::new(
1843 test_config(),
1844 vec!["default".to_string()],
1845 TestContext::default(),
1846 flusher.initial_snapshot().await,
1847 flusher.clone(),
1848 );
1849 let handle = coordinator.handle("default");
1850 coordinator.start();
1851
1852 handle
1854 .try_write(TestWrite {
1855 key: "a".into(),
1856 value: 1,
1857 size: 10,
1858 })
1859 .await
1860 .unwrap();
1861 handle
1862 .try_write(TestWrite {
1863 key: "b".into(),
1864 value: 2,
1865 size: 10,
1866 })
1867 .await
1868 .unwrap();
1869 let mut last_write = handle
1870 .try_write(TestWrite {
1871 key: "c".into(),
1872 value: 3,
1873 size: 10,
1874 })
1875 .await
1876 .unwrap();
1877
1878 handle.flush(false).await.unwrap();
1879 last_write.wait(Durability::Written).await.unwrap();
1880
1881 let events = flusher.flushed_events();
1883 assert_eq!(events.len(), 1);
1884 let epoch_range = &events[0].epoch_range;
1885 assert_eq!(epoch_range.start, 1);
1886 assert_eq!(epoch_range.end, 4); coordinator.stop().await;
1890 }
1891
1892 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1893 async fn should_have_contiguous_epoch_ranges() {
1894 let flusher = TestFlusher::default();
1896 let mut coordinator = WriteCoordinator::new(
1897 test_config(),
1898 vec!["default".to_string()],
1899 TestContext::default(),
1900 flusher.initial_snapshot().await,
1901 flusher.clone(),
1902 );
1903 let handle = coordinator.handle("default");
1904 coordinator.start();
1905
1906 handle
1908 .try_write(TestWrite {
1909 key: "a".into(),
1910 value: 1,
1911 size: 10,
1912 })
1913 .await
1914 .unwrap();
1915 let mut write2 = handle
1916 .try_write(TestWrite {
1917 key: "b".into(),
1918 value: 2,
1919 size: 10,
1920 })
1921 .await
1922 .unwrap();
1923 handle.flush(false).await.unwrap();
1924 write2.wait(Durability::Written).await.unwrap();
1925
1926 let mut write3 = handle
1928 .try_write(TestWrite {
1929 key: "c".into(),
1930 value: 3,
1931 size: 10,
1932 })
1933 .await
1934 .unwrap();
1935 handle.flush(false).await.unwrap();
1936 write3.wait(Durability::Written).await.unwrap();
1937
1938 let events = flusher.flushed_events();
1940 assert_eq!(events.len(), 2);
1941
1942 let range1 = &events[0].epoch_range;
1943 let range2 = &events[1].epoch_range;
1944
1945 assert_eq!(range1.end, range2.start);
1947 assert_eq!(range1, &(1..3));
1948 assert_eq!(range2, &(3..4));
1949
1950 coordinator.stop().await;
1952 }
1953
1954 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1955 async fn should_include_exact_epochs_in_range() {
1956 let flusher = TestFlusher::default();
1958 let mut coordinator = WriteCoordinator::new(
1959 test_config(),
1960 vec!["default".to_string()],
1961 TestContext::default(),
1962 flusher.initial_snapshot().await,
1963 flusher.clone(),
1964 );
1965 let handle = coordinator.handle("default");
1966 coordinator.start();
1967
1968 let write1 = handle
1970 .try_write(TestWrite {
1971 key: "a".into(),
1972 value: 1,
1973 size: 10,
1974 })
1975 .await
1976 .unwrap();
1977 let epoch1 = write1.epoch().await.unwrap();
1978
1979 let mut write2 = handle
1980 .try_write(TestWrite {
1981 key: "b".into(),
1982 value: 2,
1983 size: 10,
1984 })
1985 .await
1986 .unwrap();
1987 let epoch2 = write2.epoch().await.unwrap();
1988
1989 handle.flush(false).await.unwrap();
1990 write2.wait(Durability::Written).await.unwrap();
1991
1992 let events = flusher.flushed_events();
1994 assert_eq!(events.len(), 1);
1995 let epoch_range = &events[0].epoch_range;
1996
1997 assert_eq!(epoch_range.start, epoch1);
1999 assert_eq!(epoch_range.end, epoch2 + 1);
2001 assert!(epoch_range.contains(&epoch1));
2003 assert!(epoch_range.contains(&epoch2));
2004
2005 coordinator.stop().await;
2007 }
2008
2009 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2014 async fn should_preserve_context_across_flushes() {
2015 let flusher = TestFlusher::default();
2017 let mut coordinator = WriteCoordinator::new(
2018 test_config(),
2019 vec!["default".to_string()],
2020 TestContext::default(),
2021 flusher.initial_snapshot().await,
2022 flusher.clone(),
2023 );
2024 let handle = coordinator.handle("default");
2025 coordinator.start();
2026
2027 let mut write1 = handle
2029 .try_write(TestWrite {
2030 key: "a".into(),
2031 value: 1,
2032 size: 10,
2033 })
2034 .await
2035 .unwrap();
2036 handle.flush(false).await.unwrap();
2037 write1.wait(Durability::Written).await.unwrap();
2038
2039 let mut write2 = handle
2041 .try_write(TestWrite {
2042 key: "a".into(),
2043 value: 2,
2044 size: 10,
2045 })
2046 .await
2047 .unwrap();
2048 handle.flush(false).await.unwrap();
2049 write2.wait(Durability::Written).await.unwrap();
2050
2051 let events = flusher.flushed_events();
2053 assert_eq!(events.len(), 2);
2054
2055 let (seq1, _) = events[0].val.writes.get("a").unwrap();
2057 assert_eq!(*seq1, 0);
2058
2059 let (seq2, _) = events[1].val.writes.get("a").unwrap();
2061 assert_eq!(*seq2, 1);
2062
2063 coordinator.stop().await;
2065 }
2066
2067 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2072 async fn should_receive_view_on_subscribe() {
2073 let flusher = TestFlusher::default();
2075 let mut coordinator = WriteCoordinator::new(
2076 test_config(),
2077 vec!["default".to_string()],
2078 TestContext::default(),
2079 flusher.initial_snapshot().await,
2080 flusher.clone(),
2081 );
2082 let handle = coordinator.handle("default");
2083 let (mut subscriber, _) = coordinator.subscribe();
2084 subscriber.initialize();
2085 coordinator.start();
2086
2087 handle
2089 .try_write(TestWrite {
2090 key: "a".into(),
2091 value: 1,
2092 size: 10,
2093 })
2094 .await
2095 .unwrap();
2096 handle.flush(false).await.unwrap();
2097
2098 let result = subscriber.recv().await;
2100 assert!(result.is_ok());
2101
2102 coordinator.stop().await;
2104 }
2105
2106 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2107 async fn should_include_snapshot_in_view_after_flush() {
2108 let flusher = TestFlusher::default();
2110 let mut coordinator = WriteCoordinator::new(
2111 test_config(),
2112 vec!["default".to_string()],
2113 TestContext::default(),
2114 flusher.initial_snapshot().await,
2115 flusher.clone(),
2116 );
2117 let handle = coordinator.handle("default");
2118 let (mut subscriber, _) = coordinator.subscribe();
2119 subscriber.initialize();
2120 coordinator.start();
2121
2122 handle
2124 .try_write(TestWrite {
2125 key: "a".into(),
2126 value: 1,
2127 size: 10,
2128 })
2129 .await
2130 .unwrap();
2131 handle.flush(false).await.unwrap();
2132
2133 let _ = subscriber.recv().await.unwrap();
2135 let result = subscriber.recv().await.unwrap();
2137
2138 assert_snapshot_has_rows(&result.snapshot, &[("a", 0, 1)]).await;
2140
2141 coordinator.stop().await;
2143 }
2144
2145 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2146 async fn should_include_delta_in_view_after_flush() {
2147 let flusher = TestFlusher::default();
2149 let mut coordinator = WriteCoordinator::new(
2150 test_config(),
2151 vec!["default".to_string()],
2152 TestContext::default(),
2153 flusher.initial_snapshot().await,
2154 flusher.clone(),
2155 );
2156 let handle = coordinator.handle("default");
2157 let (mut subscriber, _) = coordinator.subscribe();
2158 subscriber.initialize();
2159 coordinator.start();
2160
2161 handle
2163 .try_write(TestWrite {
2164 key: "a".into(),
2165 value: 42,
2166 size: 10,
2167 })
2168 .await
2169 .unwrap();
2170 handle.flush(false).await.unwrap();
2171
2172 let _ = subscriber.recv().await.unwrap();
2174 let result = subscriber.recv().await.unwrap();
2176
2177 let flushed = result.last_written_delta.as_ref().unwrap();
2179 assert_eq!(flushed.val.get("a"), Some(&42));
2180
2181 coordinator.stop().await;
2183 }
2184
2185 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2186 async fn should_include_epoch_range_in_view_after_flush() {
2187 let flusher = TestFlusher::default();
2189 let mut coordinator = WriteCoordinator::new(
2190 test_config(),
2191 vec!["default".to_string()],
2192 TestContext::default(),
2193 flusher.initial_snapshot().await,
2194 flusher.clone(),
2195 );
2196 let handle = coordinator.handle("default");
2197 let (mut subscriber, _) = coordinator.subscribe();
2198 subscriber.initialize();
2199 coordinator.start();
2200
2201 let write1 = handle
2203 .try_write(TestWrite {
2204 key: "a".into(),
2205 value: 1,
2206 size: 10,
2207 })
2208 .await
2209 .unwrap();
2210 let write2 = handle
2211 .try_write(TestWrite {
2212 key: "b".into(),
2213 value: 2,
2214 size: 10,
2215 })
2216 .await
2217 .unwrap();
2218 handle.flush(false).await.unwrap();
2219
2220 let _ = subscriber.recv().await.unwrap();
2222 let result = subscriber.recv().await.unwrap();
2224
2225 let flushed = result.last_written_delta.as_ref().unwrap();
2227 let epoch1 = write1.epoch().await.unwrap();
2228 let epoch2 = write2.epoch().await.unwrap();
2229 assert!(flushed.epoch_range.contains(&epoch1));
2230 assert!(flushed.epoch_range.contains(&epoch2));
2231 assert_eq!(flushed.epoch_range.start, epoch1);
2232 assert_eq!(flushed.epoch_range.end, epoch2 + 1);
2233
2234 coordinator.stop().await;
2236 }
2237
2238 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2239 async fn should_broadcast_frozen_delta_on_freeze() {
2240 let flusher = TestFlusher::default();
2242 let mut coordinator = WriteCoordinator::new(
2243 test_config(),
2244 vec!["default".to_string()],
2245 TestContext::default(),
2246 flusher.initial_snapshot().await,
2247 flusher.clone(),
2248 );
2249 let handle = coordinator.handle("default");
2250 let (mut subscriber, _) = coordinator.subscribe();
2251 subscriber.initialize();
2252 coordinator.start();
2253
2254 handle
2256 .try_write(TestWrite {
2257 key: "a".into(),
2258 value: 1,
2259 size: 10,
2260 })
2261 .await
2262 .unwrap();
2263 handle.flush(false).await.unwrap();
2264
2265 let state = subscriber.recv().await.unwrap();
2267 assert_eq!(state.frozen.len(), 1);
2268 assert!(state.frozen[0].val.contains_key("a"));
2269
2270 coordinator.stop().await;
2272 }
2273
2274 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2275 async fn should_remove_frozen_delta_after_flush_complete() {
2276 let flusher = TestFlusher::default();
2278 let mut coordinator = WriteCoordinator::new(
2279 test_config(),
2280 vec!["default".to_string()],
2281 TestContext::default(),
2282 flusher.initial_snapshot().await,
2283 flusher.clone(),
2284 );
2285 let handle = coordinator.handle("default");
2286 let (mut subscriber, _) = coordinator.subscribe();
2287 subscriber.initialize();
2288 coordinator.start();
2289
2290 handle
2292 .try_write(TestWrite {
2293 key: "a".into(),
2294 value: 1,
2295 size: 10,
2296 })
2297 .await
2298 .unwrap();
2299 handle.flush(false).await.unwrap();
2300
2301 let state1 = subscriber.recv().await.unwrap();
2303 assert_eq!(state1.frozen.len(), 1);
2304
2305 let state2 = subscriber.recv().await.unwrap();
2307 assert_eq!(state2.frozen.len(), 0);
2308 assert!(state2.last_written_delta.is_some());
2309
2310 coordinator.stop().await;
2312 }
2313
2314 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2319 async fn should_flush_even_when_no_writes_if_flush_storage() {
2320 let flusher = TestFlusher::default();
2322 let storage = Arc::new(InMemoryStorage::new());
2323 let snapshot = storage.snapshot().await.unwrap();
2324 let mut coordinator = WriteCoordinator::new(
2325 test_config(),
2326 vec!["default".to_string()],
2327 TestContext::default(),
2328 snapshot,
2329 flusher.clone(),
2330 );
2331 let handle = coordinator.handle("default");
2332 coordinator.start();
2333
2334 let mut flush_handle = handle.flush(true).await.unwrap();
2336 flush_handle.wait(Durability::Durable).await.unwrap();
2337
2338 assert_eq!(flusher.flushed_events().len(), 0);
2341
2342 coordinator.stop().await;
2344 }
2345
2346 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2347 async fn should_advance_durable_watermark() {
2348 let flusher = TestFlusher::default();
2350 let storage = Arc::new(InMemoryStorage::new());
2351 let snapshot = storage.snapshot().await.unwrap();
2352 let mut coordinator = WriteCoordinator::new(
2353 test_config(),
2354 vec!["default".to_string()],
2355 TestContext::default(),
2356 snapshot,
2357 flusher.clone(),
2358 );
2359 let handle = coordinator.handle("default");
2360 coordinator.start();
2361
2362 let mut write = handle
2364 .try_write(TestWrite {
2365 key: "a".into(),
2366 value: 1,
2367 size: 10,
2368 })
2369 .await
2370 .unwrap();
2371 let mut flush_handle = handle.flush(true).await.unwrap();
2372
2373 flush_handle.wait(Durability::Durable).await.unwrap();
2375 write.wait(Durability::Durable).await.unwrap();
2376 assert_eq!(flusher.flushed_events().len(), 1);
2377
2378 coordinator.stop().await;
2380 }
2381
2382 #[tokio::test]
2383 async fn should_see_applied_write_via_view() {
2384 let flusher = TestFlusher::default();
2386 let mut coordinator = WriteCoordinator::new(
2387 test_config(),
2388 vec!["default".to_string()],
2389 TestContext::default(),
2390 flusher.initial_snapshot().await,
2391 flusher,
2392 );
2393 let handle = coordinator.handle("default");
2394 coordinator.start();
2395
2396 let mut write = handle
2398 .try_write(TestWrite {
2399 key: "a".into(),
2400 value: 42,
2401 size: 10,
2402 })
2403 .await
2404 .unwrap();
2405 write.wait(Durability::Applied).await.unwrap();
2406
2407 let view = coordinator.view();
2409 assert_eq!(view.current.get("a"), Some(42));
2410
2411 coordinator.stop().await;
2413 }
2414
2415 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2420 async fn should_flush_writes_from_multiple_channels() {
2421 let flusher = TestFlusher::default();
2423 let mut coordinator = WriteCoordinator::new(
2424 test_config(),
2425 vec!["ch1".to_string(), "ch2".to_string()],
2426 TestContext::default(),
2427 flusher.initial_snapshot().await,
2428 flusher.clone(),
2429 );
2430 let ch1 = coordinator.handle("ch1");
2431 let ch2 = coordinator.handle("ch2");
2432 coordinator.start();
2433
2434 let mut w1 = ch1
2437 .try_write(TestWrite {
2438 key: "a".into(),
2439 value: 10,
2440 size: 10,
2441 })
2442 .await
2443 .unwrap();
2444 w1.wait(Durability::Applied).await.unwrap();
2445
2446 let mut w2 = ch2
2447 .try_write(TestWrite {
2448 key: "b".into(),
2449 value: 20,
2450 size: 10,
2451 })
2452 .await
2453 .unwrap();
2454 w2.wait(Durability::Applied).await.unwrap();
2455
2456 let mut w3 = ch1
2457 .try_write(TestWrite {
2458 key: "c".into(),
2459 value: 30,
2460 size: 10,
2461 })
2462 .await
2463 .unwrap();
2464 w3.wait(Durability::Applied).await.unwrap();
2465
2466 ch1.flush(false).await.unwrap();
2467 w3.wait(Durability::Written).await.unwrap();
2468
2469 let snapshot = flusher.storage.snapshot().await.unwrap();
2471 assert_snapshot_has_rows(&snapshot, &[("a", 0, 10), ("b", 1, 20), ("c", 2, 30)]).await;
2472
2473 coordinator.stop().await;
2475 }
2476
2477 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2478 async fn should_succeed_with_write_timeout_when_queue_has_space() {
2479 let flusher = TestFlusher::default();
2481 let mut coordinator = WriteCoordinator::new(
2482 test_config(),
2483 vec!["default".to_string()],
2484 TestContext::default(),
2485 flusher.initial_snapshot().await,
2486 flusher.clone(),
2487 );
2488 let handle = coordinator.handle("default");
2489 coordinator.start();
2490
2491 let mut wh = handle
2493 .write_timeout(
2494 TestWrite {
2495 key: "a".into(),
2496 value: 1,
2497 size: 10,
2498 },
2499 Duration::from_secs(1),
2500 )
2501 .await
2502 .unwrap();
2503
2504 let result = wh.wait(Durability::Applied).await;
2506 assert!(result.is_ok());
2507
2508 coordinator.stop().await;
2510 }
2511
2512 #[tokio::test]
2513 async fn should_timeout_when_queue_full() {
2514 let flusher = TestFlusher::default();
2516 let config = WriteCoordinatorConfig {
2517 queue_capacity: 2,
2518 flush_interval: Duration::from_secs(3600),
2519 flush_size_threshold: usize::MAX,
2520 };
2521 let mut coordinator = WriteCoordinator::new(
2522 config,
2523 vec!["default".to_string()],
2524 TestContext::default(),
2525 flusher.initial_snapshot().await,
2526 flusher.clone(),
2527 );
2528 let handle = coordinator.handle("default");
2529
2530 let _ = handle
2532 .try_write(TestWrite {
2533 key: "a".into(),
2534 value: 1,
2535 size: 10,
2536 })
2537 .await;
2538 let _ = handle
2539 .try_write(TestWrite {
2540 key: "b".into(),
2541 value: 2,
2542 size: 10,
2543 })
2544 .await;
2545
2546 let result = handle
2548 .write_timeout(
2549 TestWrite {
2550 key: "c".into(),
2551 value: 3,
2552 size: 10,
2553 },
2554 Duration::from_millis(10),
2555 )
2556 .await;
2557
2558 assert!(matches!(result, Err(WriteError::TimeoutError(_))));
2560 }
2561
2562 #[tokio::test]
2563 async fn should_return_write_in_timeout_error() {
2564 let flusher = TestFlusher::default();
2566 let config = WriteCoordinatorConfig {
2567 queue_capacity: 1,
2568 flush_interval: Duration::from_secs(3600),
2569 flush_size_threshold: usize::MAX,
2570 };
2571 let mut coordinator = WriteCoordinator::new(
2572 config,
2573 vec!["default".to_string()],
2574 TestContext::default(),
2575 flusher.initial_snapshot().await,
2576 flusher.clone(),
2577 );
2578 let handle = coordinator.handle("default");
2579
2580 let _ = handle
2582 .try_write(TestWrite {
2583 key: "a".into(),
2584 value: 1,
2585 size: 10,
2586 })
2587 .await;
2588
2589 let result = handle
2591 .write_timeout(
2592 TestWrite {
2593 key: "retry_me".into(),
2594 value: 42,
2595 size: 10,
2596 },
2597 Duration::from_millis(10),
2598 )
2599 .await;
2600 let Err(err) = result else {
2601 panic!("expected TimeoutError");
2602 };
2603
2604 let write = err.into_inner().expect("should contain the write");
2606 assert_eq!(write.key, "retry_me");
2607 assert_eq!(write.value, 42);
2608 }
2609
2610 #[tokio::test]
2611 async fn should_return_write_in_backpressure_error() {
2612 let flusher = TestFlusher::default();
2614 let config = WriteCoordinatorConfig {
2615 queue_capacity: 1,
2616 flush_interval: Duration::from_secs(3600),
2617 flush_size_threshold: usize::MAX,
2618 };
2619 let mut coordinator = WriteCoordinator::new(
2620 config,
2621 vec!["default".to_string()],
2622 TestContext::default(),
2623 flusher.initial_snapshot().await,
2624 flusher.clone(),
2625 );
2626 let handle = coordinator.handle("default");
2627
2628 let _ = handle
2630 .try_write(TestWrite {
2631 key: "a".into(),
2632 value: 1,
2633 size: 10,
2634 })
2635 .await;
2636
2637 let result = handle
2639 .try_write(TestWrite {
2640 key: "retry_me".into(),
2641 value: 42,
2642 size: 10,
2643 })
2644 .await;
2645 let Err(err) = result else {
2646 panic!("expected Backpressure");
2647 };
2648
2649 let write = err.into_inner().expect("should contain the write");
2651 assert_eq!(write.key, "retry_me");
2652 assert_eq!(write.value, 42);
2653 }
2654
2655 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2656 async fn should_succeed_when_queue_drains_within_timeout() {
2657 let flusher = TestFlusher::default();
2659 let config = WriteCoordinatorConfig {
2660 queue_capacity: 2,
2661 flush_interval: Duration::from_secs(3600),
2662 flush_size_threshold: usize::MAX,
2663 };
2664 let mut coordinator = WriteCoordinator::new(
2665 config,
2666 vec!["default".to_string()],
2667 TestContext::default(),
2668 flusher.initial_snapshot().await,
2669 flusher.clone(),
2670 );
2671 let handle = coordinator.handle("default");
2672
2673 let _ = handle
2675 .try_write(TestWrite {
2676 key: "a".into(),
2677 value: 1,
2678 size: 10,
2679 })
2680 .await;
2681 let _ = handle
2682 .try_write(TestWrite {
2683 key: "b".into(),
2684 value: 2,
2685 size: 10,
2686 })
2687 .await;
2688
2689 coordinator.start();
2691 let result = handle
2692 .write_timeout(
2693 TestWrite {
2694 key: "c".into(),
2695 value: 3,
2696 size: 10,
2697 },
2698 Duration::from_secs(5),
2699 )
2700 .await;
2701
2702 assert!(result.is_ok());
2704
2705 coordinator.stop().await;
2707 }
2708
2709 #[tokio::test]
2710 async fn should_return_shutdown_on_write_timeout_after_coordinator_stops() {
2711 let flusher = TestFlusher::default();
2713 let mut coordinator = WriteCoordinator::new(
2714 test_config(),
2715 vec!["default".to_string()],
2716 TestContext::default(),
2717 flusher.initial_snapshot().await,
2718 flusher.clone(),
2719 );
2720 let handle = coordinator.handle("default");
2721 coordinator.start();
2722
2723 coordinator.stop().await;
2725 let result = handle
2726 .write_timeout(
2727 TestWrite {
2728 key: "a".into(),
2729 value: 1,
2730 size: 10,
2731 },
2732 Duration::from_secs(1),
2733 )
2734 .await;
2735
2736 assert!(matches!(result, Err(WriteError::Shutdown)));
2738 }
2739
2740 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2741 async fn should_pause_and_resume_write_channel() {
2742 let flusher = TestFlusher::default();
2744 let mut config = test_config();
2745 config.flush_size_threshold = usize::MAX;
2746 config.flush_interval = Duration::from_hours(24);
2747 let mut coordinator = WriteCoordinator::new(
2748 config,
2749 vec!["a", "b"],
2750 TestContext::default(),
2751 flusher.initial_snapshot().await,
2752 flusher.clone(),
2753 );
2754 let handle_a = coordinator.handle("a");
2755 let handle_b = coordinator.handle("b");
2756 let pause_handle = coordinator.pause_handle("a");
2757
2758 pause_handle.pause();
2767 coordinator.start();
2768
2769 let mut result_a = handle_a
2771 .try_write(TestWrite {
2772 key: "a".into(),
2773 value: 1,
2774 size: 1,
2775 })
2776 .await
2777 .unwrap();
2778 for i in 0..1000 {
2779 handle_b
2780 .try_write(TestWrite {
2781 key: format!("b{}", i),
2782 value: i,
2783 size: 1,
2784 })
2785 .await
2786 .unwrap()
2787 .wait(Durability::Applied)
2788 .await
2789 .unwrap();
2790 }
2791
2792 let data = coordinator.view().current.data.lock().unwrap().clone();
2795 let mut expected = (0..1000).map(|i| format!("b{}", i)).collect::<HashSet<_>>();
2796 assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2797 pause_handle.unpause();
2798 result_a.wait(Durability::Applied).await.unwrap();
2800 let data = coordinator.view().current.data.lock().unwrap().clone();
2801 expected.insert("a".into());
2802 assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2803 }
2804}