1#![allow(unused)]
2
3mod error;
4mod handle;
5pub mod subscriber;
6mod traits;
7
8use std::collections::HashMap;
9use std::ops::Range;
10use std::ops::{Deref, DerefMut};
11
12pub use error::{WriteError, WriteResult};
13use futures::stream::{self, SelectAll, StreamExt};
14pub use handle::{View, WriteCoordinatorHandle, WriteHandle};
15pub use subscriber::{SubscribeError, ViewMonitor, ViewSubscriber};
16pub use traits::{Delta, Durability, EpochStamped, Flusher};
17
/// Work item sent from the write coordinator task to the flush task over the
/// internal flush channel.
enum FlushEvent<D: Delta> {
    /// Hand a frozen delta, stamped with the range of epochs it covers, to the
    /// flusher (`Flusher::flush_delta`).
    FlushDelta { frozen: EpochStamped<D::Frozen> },
    /// Ask the flusher to persist previously written data
    /// (`Flusher::flush_storage`); on success the durable watermark is moved
    /// up to the last flushed epoch.
    FlushStorage,
}
25
26use crate::StorageRead;
28use crate::storage::StorageSnapshot;
29use async_trait::async_trait;
30pub use handle::EpochWatcher;
31use std::sync::{Arc, Mutex};
32use std::time::Duration;
33use tokio::sync::{broadcast, mpsc, oneshot, watch};
34use tokio::time::{Instant, Interval, interval_at};
35use tokio_util::sync::CancellationToken;
36
/// Tuning knobs for the write coordinator.
#[derive(Debug, Clone)]
pub struct WriteCoordinatorConfig {
    /// Capacity of each per-channel write queue (a bounded `mpsc` channel).
    pub queue_capacity: usize,
    /// Period of the timer that freezes and flushes the current delta.
    pub flush_interval: Duration,
    /// When `Delta::estimate_size` of the current delta reaches this value,
    /// the delta is frozen and flushed right away.
    pub flush_size_threshold: usize,
}

impl Default for WriteCoordinatorConfig {
    /// 10 000 queued writes per channel, a 10 second flush interval and a
    /// size threshold of 64 * 1024 * 1024.
    fn default() -> Self {
        let flush_interval = Duration::from_secs(10);
        let flush_size_threshold = 64 << 20;
        WriteCoordinatorConfig {
            flush_interval,
            flush_size_threshold,
            queue_capacity: 10_000,
        }
    }
}
57
/// A request delivered to the coordinator task through one of its per-channel
/// write queues.
pub(crate) enum WriteCommand<D: Delta> {
    /// Apply `write` to the current delta. The coordinator assigns it the next
    /// epoch and reports that epoch together with the apply result (or error)
    /// on `result_tx`.
    Write {
        write: D::Write,
        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
    },
    /// Freeze and flush the current delta if it has writes. `epoch_tx`
    /// receives the last epoch assigned so far (0 if none); when
    /// `flush_storage` is set, a storage flush is also queued after the delta.
    Flush {
        epoch_tx: oneshot::Sender<handle::EpochResult<()>>,
        flush_storage: bool,
    },
}
68
/// Owns the write pipeline: per-channel write queues feeding a single
/// coordinator task that applies writes to an in-memory delta, plus a flush
/// task that hands frozen deltas to a [`Flusher`].
///
/// Build with `new`, grab handles, then call `start`; shut down with `stop`.
pub struct WriteCoordinator<D: Delta, F: Flusher<D>> {
    /// Write handles, keyed by channel name.
    handles: HashMap<String, WriteCoordinatorHandle<D>>,
    /// Pause controls for each channel's queue, keyed by channel name.
    pause_handles: HashMap<String, PauseHandle>,
    /// Cancelled by `stop` to end the coordinator task's main loop.
    stop_tok: CancellationToken,
    /// Tasks built by `new` but not yet spawned; taken by `start`.
    tasks: Option<(WriteCoordinatorTask<D>, FlushTask<D, F>)>,
    /// Join handle of the spawned coordinator task (which itself awaits the
    /// flush task); `None` until `start` has run.
    write_task_jh: Option<tokio::task::JoinHandle<Result<(), String>>>,
    /// Shared view of current/frozen/flushed state, also broadcast to subscribers.
    view: Arc<BroadcastedView<D>>,
}
81
82impl<D: Delta, F: Flusher<D>> WriteCoordinator<D, F> {
83 pub fn new(
84 config: WriteCoordinatorConfig,
85 channels: Vec<impl ToString>,
86 initial_context: D::Context,
87 initial_snapshot: Arc<dyn StorageSnapshot>,
88 flusher: F,
89 ) -> WriteCoordinator<D, F> {
90 let (watermarks, watcher) = EpochWatermarks::new();
91 let watermarks = Arc::new(watermarks);
92
93 let mut write_rxs = Vec::with_capacity(channels.len());
95 let mut write_txs = HashMap::new();
96 let mut pause_handles = HashMap::new();
97 for name in &channels {
98 let (write_tx, write_rx, pause_hdl) = pausable_channel(config.queue_capacity);
99 write_rxs.push(write_rx);
100 write_txs.insert(name.to_string(), write_tx);
101 pause_handles.insert(name.to_string(), pause_hdl);
102 }
103
104 let (flush_tx, flush_rx) = mpsc::channel(2);
110
111 let flush_stop_tok = CancellationToken::new();
112 let stop_tok = CancellationToken::new();
113 let write_task = WriteCoordinatorTask::new(
114 config,
115 initial_context,
116 initial_snapshot,
117 write_rxs,
118 flush_tx,
119 watermarks.clone(),
120 stop_tok.clone(),
121 flush_stop_tok.clone(),
122 );
123
124 let view = write_task.view.clone();
125
126 let mut handles = HashMap::new();
127 for name in channels {
128 let write_tx = write_txs.remove(&name.to_string()).expect("unreachable");
129 handles.insert(
130 name.to_string(),
131 WriteCoordinatorHandle::new(write_tx, watcher.clone(), view.clone()),
132 );
133 }
134
135 let flush_task = FlushTask {
136 flusher,
137 stop_tok: flush_stop_tok,
138 flush_rx,
139 watermarks: watermarks.clone(),
140 view: view.clone(),
141 last_flushed_epoch: 0,
142 };
143
144 Self {
145 handles,
146 pause_handles,
147 tasks: Some((write_task, flush_task)),
148 write_task_jh: None,
149 stop_tok,
150 view,
151 }
152 }
153
154 pub fn handle(&self, name: &str) -> WriteCoordinatorHandle<D> {
155 self.handles
156 .get(name)
157 .expect("unknown channel name")
158 .clone()
159 }
160
161 pub fn pause_handle(&self, name: &str) -> PauseHandle {
162 self.pause_handles
163 .get(name)
164 .expect("unknown channel name")
165 .clone()
166 }
167
168 pub fn start(&mut self) {
169 let Some((write_task, flush_task)) = self.tasks.take() else {
170 return;
172 };
173 let flush_task_jh = flush_task.run();
174 let write_task_jh = write_task.run(flush_task_jh);
175 self.write_task_jh = Some(write_task_jh);
176 }
177
178 pub async fn stop(mut self) -> Result<(), String> {
179 let Some(write_task_jh) = self.write_task_jh.take() else {
180 return Ok(());
181 };
182 self.stop_tok.cancel();
183 write_task_jh.await.map_err(|e| e.to_string())?
184 }
185
186 pub fn view(&self) -> Arc<View<D>> {
187 self.view.current()
188 }
189
190 pub fn subscribe(&self) -> (ViewSubscriber<D>, ViewMonitor) {
191 let (view_rx, initial_view) = self.view.subscribe();
192 ViewSubscriber::new(view_rx, initial_view)
193 }
194}
195
/// State owned by the spawned coordinator task. It holds the current delta,
/// the write queues the task drains, and the channel to the flush task.
struct WriteCoordinatorTask<D: Delta> {
    config: WriteCoordinatorConfig,
    /// Delta currently receiving writes.
    delta: CurrentDelta<D>,
    /// Sends frozen deltas and storage-flush requests to the flush task.
    flush_tx: mpsc::Sender<FlushEvent<D>>,
    /// Per-channel receivers; moved into one merged stream when the task runs.
    write_rxs: Vec<PausableReceiver<D>>,
    watermarks: Arc<EpochWatermarks>,
    view: Arc<BroadcastedView<D>>,
    /// Next epoch to assign. Epochs start at 1, so `epoch - 1` is the last
    /// assigned epoch.
    epoch: u64,
    /// First epoch covered by the current delta; equal to `epoch` while the
    /// current delta has no writes.
    delta_start_epoch: u64,
    /// Periodic flush timer; reset whenever a delta is frozen.
    flush_interval: Interval,
    /// Cancelled by `WriteCoordinator::stop` to end the main loop.
    stop_tok: CancellationToken,
    /// Cancelled by this task after its final flush, to stop the flush task.
    flush_stop_tok: CancellationToken,
}
209
impl<D: Delta> WriteCoordinatorTask<D> {
    /// Builds the task state.
    ///
    /// This initialises the first delta from `initial_context` and publishes
    /// an initial view with the delta's reader, no frozen deltas and
    /// `initial_snapshot`. It also arms the flush timer to first fire one
    /// `flush_interval` from now. Epoch numbering starts at 1.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        config: WriteCoordinatorConfig,
        initial_context: D::Context,
        initial_snapshot: Arc<dyn StorageSnapshot>,
        write_rxs: Vec<PausableReceiver<D>>,
        flush_tx: mpsc::Sender<FlushEvent<D>>,
        watermarks: Arc<EpochWatermarks>,
        stop_tok: CancellationToken,
        flush_stop_tok: CancellationToken,
    ) -> Self {
        let delta = D::init(initial_context);

        let initial_view = View {
            current: delta.reader(),
            frozen: vec![],
            snapshot: initial_snapshot,
            last_written_delta: None,
        };
        let initial_view = Arc::new(BroadcastedView::new(initial_view));

        let flush_interval = interval_at(
            Instant::now() + config.flush_interval,
            config.flush_interval,
        );
        Self {
            config,
            delta: CurrentDelta::new(delta),
            write_rxs,
            flush_tx,
            watermarks,
            view: initial_view,
            epoch: 1,
            delta_start_epoch: 1,
            flush_interval,
            stop_tok,
            flush_stop_tok,
        }
    }

    /// Spawns the coordinator loop. The returned handle resolves after the
    /// loop has exited and the flush task (`flush_task_jh`) has finished.
    pub fn run(
        mut self,
        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
    ) -> tokio::task::JoinHandle<Result<(), String>> {
        tokio::task::spawn(async move { self.run_coordinator(flush_task_jh).await })
    }

    /// Main loop. It merges every write channel into one stream and handles
    /// one command at a time, plus timer-driven flushes.
    ///
    /// The loop ends when `stop_tok` is cancelled or every channel has closed.
    /// It then flushes whatever remains in the current delta, stops the flush
    /// task and returns the flush task's outcome.
    async fn run_coordinator(
        mut self,
        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
    ) -> Result<(), String> {
        // Measure the first periodic flush from when the loop starts, not from
        // when the task was constructed.
        self.flush_interval.reset();

        // Merge all channels into a single stream; it yields `None` only after
        // every channel's receiver has returned `None`.
        let mut write_stream: SelectAll<_> = SelectAll::new();
        for rx in self.write_rxs.drain(..) {
            write_stream.push(
                stream::unfold(
                    rx,
                    |mut rx| async move { rx.recv().await.map(|cmd| (cmd, rx)) },
                )
                .boxed(),
            );
        }

        loop {
            tokio::select! {
                cmd = write_stream.next() => {
                    match cmd {
                        Some(WriteCommand::Write { write, result_tx }) => {
                            self.handle_write(write, result_tx).await?;
                        }
                        Some(WriteCommand::Flush { epoch_tx, flush_storage }) => {
                            // Reply with the last assigned epoch (0 if none).
                            // Every write up to it is in the delta frozen
                            // below or in an earlier one.
                            let _ = epoch_tx.send(Ok(handle::WriteApplied {
                                epoch: self.epoch.saturating_sub(1),
                                result: (),
                            }));
                            self.handle_flush(flush_storage).await;
                        }
                        None => {
                            break;
                        }
                    }
                }

                _ = self.flush_interval.tick() => {
                    self.handle_flush(false).await;
                }

                // Commands still sitting in the queues are not processed after
                // cancellation.
                _ = self.stop_tok.cancelled() => {
                    break;
                }
            }
        }

        // Hand any remaining writes in the current delta to the flush task.
        self.handle_flush(false).await;

        // Only stop the flush task after the final event has been sent, so its
        // shutdown drain still sees it.
        self.flush_stop_tok.cancel();
        flush_task_jh
            .await
            .map_err(|e| format!("flush task panicked: {}", e))?
            .map_err(|e| format!("flush task error: {}", e))
    }

    /// Assigns the next epoch to `write`, applies it to the current delta and
    /// reports the outcome on `result_tx`.
    ///
    /// A failed apply still consumes its epoch and still advances the applied
    /// watermark. If the delta's estimated size reaches the configured
    /// threshold, a flush is triggered. Currently always returns `Ok(())`.
    async fn handle_write(
        &mut self,
        write: D::Write,
        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
    ) -> Result<(), String> {
        let write_epoch = self.epoch;
        self.epoch += 1;

        let result = self.delta.apply(write);
        // The caller may have dropped its receiver; that is not an error here.
        let _ = result_tx.send(
            result
                .map(|apply_result| handle::WriteApplied {
                    epoch: write_epoch,
                    result: apply_result,
                })
                .map_err(|e| handle::WriteFailed {
                    epoch: write_epoch,
                    error: e,
                }),
        );

        self.watermarks.update_applied(write_epoch);

        if self.delta.estimate_size() >= self.config.flush_size_threshold {
            self.handle_flush(false).await;
        }

        Ok(())
    }

    /// Flushes the current delta if it has writes. When `flush_storage` is
    /// set, it then queues a storage flush behind it.
    ///
    /// NOTE(review): send errors are ignored. If the flush task has already
    /// exited with an error, these events are dropped silently and the error
    /// only surfaces when the flush task is joined at shutdown.
    async fn handle_flush(&mut self, flush_storage: bool) {
        self.flush_if_delta_has_writes().await;
        if flush_storage {
            let _ = self.flush_tx.send(FlushEvent::FlushStorage).await;
        }
    }

    /// Freezes the current delta if any epoch was assigned since the last
    /// freeze, and swaps in a fresh delta.
    ///
    /// It then publishes a view that includes the frozen delta and queues that
    /// delta for the flush task.
    async fn flush_if_delta_has_writes(&mut self) {
        if self.epoch == self.delta_start_epoch {
            return;
        }

        self.flush_interval.reset();

        let epoch_range = self.delta_start_epoch..self.epoch;
        self.delta_start_epoch = self.epoch;
        let (frozen, frozen_reader) = self.delta.freeze_and_init();
        let stamped_frozen = EpochStamped::new(frozen, epoch_range.clone());
        let stamped_frozen_reader = EpochStamped::new(frozen_reader, epoch_range.clone());
        let reader = self.delta.reader();
        // The view must contain the frozen delta before it is sent, because
        // the flush task removes it from the view once its flush completes.
        self.view.update_delta_frozen(stamped_frozen_reader, reader);
        // Awaiting `send` waits for room when the flush channel is full.
        let _ = self
            .flush_tx
            .send(FlushEvent::FlushDelta {
                frozen: stamped_frozen,
            })
            .await;
    }
}
393
394struct FlushTask<D: Delta, F: Flusher<D>> {
395 flusher: F,
396 stop_tok: CancellationToken,
397 flush_rx: mpsc::Receiver<FlushEvent<D>>,
398 watermarks: Arc<EpochWatermarks>,
399 view: Arc<BroadcastedView<D>>,
400 last_flushed_epoch: u64,
401}
402
403impl<D: Delta, F: Flusher<D>> FlushTask<D, F> {
404 fn run(mut self) -> tokio::task::JoinHandle<WriteResult<()>> {
405 tokio::spawn(async move {
406 loop {
407 tokio::select! {
408 event = self.flush_rx.recv() => {
409 let Some(event) = event else {
410 break;
411 };
412 self.handle_event(event).await?;
413 }
414 _ = self.stop_tok.cancelled() => {
415 break;
416 }
417 }
418 }
419 while let Ok(event) = self.flush_rx.try_recv() {
421 self.handle_event(event).await;
422 }
423 Ok(())
424 })
425 }
426
427 async fn handle_event(&mut self, event: FlushEvent<D>) -> WriteResult<()> {
428 match event {
429 FlushEvent::FlushDelta { frozen } => self.handle_flush(frozen).await,
430 FlushEvent::FlushStorage => {
431 self.flusher
432 .flush_storage()
433 .await
434 .map_err(|e| WriteError::FlushError(e.to_string()))?;
435 self.watermarks.update_durable(self.last_flushed_epoch);
436 Ok(())
437 }
438 }
439 }
440
441 async fn handle_flush(&mut self, frozen: EpochStamped<D::Frozen>) -> WriteResult<()> {
442 let delta = frozen.val;
443 let epoch_range = frozen.epoch_range;
444 let snapshot = self
445 .flusher
446 .flush_delta(delta, &epoch_range)
447 .await
448 .map_err(|e| WriteError::FlushError(e.to_string()))?;
449 self.last_flushed_epoch = epoch_range.end - 1;
450 self.view.update_flush_finished(snapshot, epoch_range);
456 self.watermarks.update_written(self.last_flushed_epoch);
457 Ok(())
458 }
459}
460
461struct CurrentDelta<D: Delta> {
462 delta: Option<D>,
463}
464
465impl<D: Delta> Deref for CurrentDelta<D> {
466 type Target = D;
467
468 fn deref(&self) -> &Self::Target {
469 match &self.delta {
470 Some(d) => d,
471 None => panic!("current delta not initialized"),
472 }
473 }
474}
475
476impl<D: Delta> DerefMut for CurrentDelta<D> {
477 fn deref_mut(&mut self) -> &mut Self::Target {
478 match &mut self.delta {
479 Some(d) => d,
480 None => panic!("current delta not initialized"),
481 }
482 }
483}
484
485impl<D: Delta> CurrentDelta<D> {
486 fn new(delta: D) -> Self {
487 Self { delta: Some(delta) }
488 }
489
490 fn freeze_and_init(&mut self) -> (D::Frozen, D::FrozenView) {
491 let Some(delta) = self.delta.take() else {
492 panic!("delta not initialized");
493 };
494 let (frozen, frozen_reader, context) = delta.freeze();
495 let new_delta = D::init(context);
496 self.delta = Some(new_delta);
497 (frozen, frozen_reader)
498 }
499}
500
501pub struct EpochWatermarks {
502 applied_tx: tokio::sync::watch::Sender<u64>,
503 written_tx: tokio::sync::watch::Sender<u64>,
504 durable_tx: tokio::sync::watch::Sender<u64>,
505}
506
507impl EpochWatermarks {
508 pub fn new() -> (Self, EpochWatcher) {
509 let (applied_tx, applied_rx) = tokio::sync::watch::channel(0);
510 let (written_tx, written_rx) = tokio::sync::watch::channel(0);
511 let (durable_tx, durable_rx) = tokio::sync::watch::channel(0);
512 let watcher = EpochWatcher {
513 applied_rx,
514 written_rx,
515 durable_rx,
516 };
517 let watermarks = EpochWatermarks {
518 applied_tx,
519 written_tx,
520 durable_tx,
521 };
522 (watermarks, watcher)
523 }
524
525 pub fn update_applied(&self, epoch: u64) {
526 let _ = self.applied_tx.send(epoch);
527 }
528
529 pub fn update_written(&self, epoch: u64) {
530 let _ = self.written_tx.send(epoch);
531 }
532
533 pub fn update_durable(&self, epoch: u64) {
534 let _ = self.durable_tx.send(epoch);
535 }
536}
537
538pub(crate) struct BroadcastedView<D: Delta> {
539 inner: Mutex<BroadcastedViewInner<D>>,
540}
541
542impl<D: Delta> BroadcastedView<D> {
543 fn new(initial_view: View<D>) -> Self {
544 let (view_tx, _) = broadcast::channel(16);
545 Self {
546 inner: Mutex::new(BroadcastedViewInner {
547 view: Arc::new(initial_view),
548 view_tx,
549 }),
550 }
551 }
552
553 fn update_flush_finished(&self, snapshot: Arc<dyn StorageSnapshot>, epoch_range: Range<u64>) {
554 self.inner
555 .lock()
556 .expect("lock poisoned")
557 .update_flush_finished(snapshot, epoch_range);
558 }
559
560 fn update_delta_frozen(&self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
561 self.inner
562 .lock()
563 .expect("lock poisoned")
564 .update_delta_frozen(frozen, reader);
565 }
566
567 fn current(&self) -> Arc<View<D>> {
568 self.inner.lock().expect("lock poisoned").current()
569 }
570
571 fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
572 self.inner.lock().expect("lock poisoned").subscribe()
573 }
574}
575
576struct BroadcastedViewInner<D: Delta> {
577 view: Arc<View<D>>,
578 view_tx: tokio::sync::broadcast::Sender<Arc<View<D>>>,
579}
580
581impl<D: Delta> BroadcastedViewInner<D> {
582 fn update_flush_finished(
583 &mut self,
584 snapshot: Arc<dyn StorageSnapshot>,
585 epoch_range: Range<u64>,
586 ) {
587 let mut new_frozen = self.view.frozen.clone();
588 let last = new_frozen
589 .pop()
590 .expect("frozen should not be empty when flush completes");
591 assert_eq!(last.epoch_range, epoch_range);
592 self.view = Arc::new(View {
593 current: self.view.current.clone(),
594 frozen: new_frozen,
595 snapshot,
596 last_written_delta: Some(last),
597 });
598 self.view_tx.send(self.view.clone());
599 }
600
601 fn update_delta_frozen(&mut self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
602 let mut new_frozen = vec![frozen];
604 new_frozen.extend(self.view.frozen.iter().cloned());
605 self.view = Arc::new(View {
606 current: reader,
607 frozen: new_frozen,
608 snapshot: self.view.snapshot.clone(),
609 last_written_delta: self.view.last_written_delta.clone(),
610 });
611 self.view_tx.send(self.view.clone());
612 }
613
614 fn current(&self) -> Arc<View<D>> {
615 self.view.clone()
616 }
617
618 fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
619 (self.view_tx.subscribe(), self.view.clone())
620 }
621}
622
623struct PausableReceiver<D: Delta> {
624 pause_rx: Option<watch::Receiver<bool>>,
625 rx: mpsc::Receiver<WriteCommand<D>>,
626}
627
628impl<D: Delta> PausableReceiver<D> {
629 async fn recv(&mut self) -> Option<WriteCommand<D>> {
630 if let Some(pause_rx) = self.pause_rx.as_mut() {
631 pause_rx.wait_for(|v| !*v).await;
632 }
633 self.rx.recv().await
634 }
635}
636
637#[derive(Clone)]
639pub struct PauseHandle {
640 pause_tx: tokio::sync::watch::Sender<bool>,
641}
642
643impl PauseHandle {
644 pub fn pause(&self) {
645 self.pause_tx.send_replace(true);
646 }
647
648 pub fn unpause(&self) {
649 self.pause_tx.send_replace(false);
650 }
651}
652
653fn pausable_channel<D: Delta>(
654 capacity: usize,
655) -> (
656 mpsc::Sender<WriteCommand<D>>,
657 PausableReceiver<D>,
658 PauseHandle,
659) {
660 let (pause_tx, pause_rx) = watch::channel(false);
661 let (tx, rx) = mpsc::channel(capacity);
662 (
663 tx,
664 PausableReceiver {
665 pause_rx: Some(pause_rx),
666 rx,
667 },
668 PauseHandle { pause_tx },
669 )
670}
671
672#[cfg(test)]
673mod tests {
674 use super::*;
675 use crate::BytesRange;
676 use crate::coordinator::Durability;
677 use crate::storage::in_memory::{InMemoryStorage, InMemoryStorageSnapshot};
678 use crate::storage::{PutRecordOp, Record, StorageSnapshot};
679 use crate::{Storage, StorageRead};
680 use async_trait::async_trait;
681 use bytes::Bytes;
682 use std::collections::{HashMap, HashSet};
683 use std::ops::Range;
684 use std::sync::Mutex;
685 #[derive(Clone, Debug)]
690 struct TestWrite {
691 key: String,
692 value: u64,
693 size: usize,
694 }
695
696 #[derive(Clone, Debug, Default)]
698 struct TestContext {
699 next_seq: u64,
700 error: Option<String>,
701 }
702
703 #[derive(Clone, Debug, Default)]
705 struct TestDeltaReader {
706 data: Arc<Mutex<HashMap<String, u64>>>,
707 }
708
709 impl TestDeltaReader {
710 fn get(&self, key: &str) -> Option<u64> {
711 self.data.lock().unwrap().get(key).copied()
712 }
713 }
714
715 #[derive(Debug)]
718 struct TestDelta {
719 context: TestContext,
720 writes: HashMap<String, (u64, u64)>,
721 key_values: Arc<Mutex<HashMap<String, u64>>>,
722 total_size: usize,
723 }
724
725 #[derive(Clone, Debug)]
726 struct FrozenTestDelta {
727 writes: HashMap<String, (u64, u64)>,
728 }
729
730 impl std::fmt::Debug for View<TestDelta> {
731 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
732 f.write_str("View<TestDelta>")
733 }
734 }
735
736 impl Delta for TestDelta {
737 type Context = TestContext;
738 type Write = TestWrite;
739 type DeltaView = TestDeltaReader;
740 type Frozen = FrozenTestDelta;
741 type FrozenView = Arc<HashMap<String, u64>>;
742 type ApplyResult = ();
743
744 fn init(context: Self::Context) -> Self {
745 Self {
746 context,
747 writes: HashMap::default(),
748 key_values: Arc::new(Mutex::new(HashMap::default())),
749 total_size: 0,
750 }
751 }
752
753 fn apply(&mut self, write: Self::Write) -> Result<(), String> {
754 if let Some(error) = &self.context.error {
755 return Err(error.clone());
756 }
757
758 let seq = self.context.next_seq;
759 self.context.next_seq += 1;
760
761 self.writes.insert(write.key.clone(), (seq, write.value));
762 self.total_size += write.size;
763 self.key_values
764 .lock()
765 .unwrap()
766 .insert(write.key, write.value);
767 Ok(())
768 }
769
770 fn estimate_size(&self) -> usize {
771 self.total_size
772 }
773
774 fn freeze(self) -> (Self::Frozen, Self::FrozenView, Self::Context) {
775 let frozen = FrozenTestDelta {
776 writes: self.writes,
777 };
778 let frozen_view = Arc::new(self.key_values.lock().unwrap().clone());
779 (frozen, frozen_view, self.context)
780 }
781
782 fn reader(&self) -> Self::DeltaView {
783 TestDeltaReader {
784 data: self.key_values.clone(),
785 }
786 }
787 }
788
789 #[derive(Default)]
791 struct TestFlusherState {
792 flushed_events: Vec<Arc<EpochStamped<FrozenTestDelta>>>,
793 flush_started_tx: Option<oneshot::Sender<()>>,
795 unblock_rx: Option<mpsc::Receiver<()>>,
797 }
798
799 #[derive(Clone)]
800 struct TestFlusher {
801 state: Arc<Mutex<TestFlusherState>>,
802 storage: Arc<InMemoryStorage>,
803 }
804
805 impl Default for TestFlusher {
806 fn default() -> Self {
807 Self {
808 state: Arc::new(Mutex::new(TestFlusherState::default())),
809 storage: Arc::new(InMemoryStorage::new()),
810 }
811 }
812 }
813
814 impl TestFlusher {
815 fn with_flush_control() -> (Self, oneshot::Receiver<()>, mpsc::Sender<()>) {
818 let (started_tx, started_rx) = oneshot::channel();
819 let (unblock_tx, unblock_rx) = mpsc::channel(1);
820 let flusher = Self {
821 state: Arc::new(Mutex::new(TestFlusherState {
822 flushed_events: Vec::new(),
823 flush_started_tx: Some(started_tx),
824 unblock_rx: Some(unblock_rx),
825 })),
826 storage: Arc::new(InMemoryStorage::new()),
827 };
828 (flusher, started_rx, unblock_tx)
829 }
830
831 fn flushed_events(&self) -> Vec<Arc<EpochStamped<FrozenTestDelta>>> {
832 self.state.lock().unwrap().flushed_events.clone()
833 }
834
835 async fn initial_snapshot(&self) -> Arc<dyn StorageSnapshot> {
836 self.storage.snapshot().await.unwrap()
837 }
838 }
839
840 #[async_trait]
841 impl Flusher<TestDelta> for TestFlusher {
842 async fn flush_delta(
843 &mut self,
844 frozen: FrozenTestDelta,
845 epoch_range: &Range<u64>,
846 ) -> Result<Arc<dyn StorageSnapshot>, String> {
847 let flush_started_tx = {
849 let mut state = self.state.lock().unwrap();
850 state.flush_started_tx.take()
851 };
852 if let Some(tx) = flush_started_tx {
853 let _ = tx.send(());
854 }
855
856 let unblock_rx = {
858 let mut state = self.state.lock().unwrap();
859 state.unblock_rx.take()
860 };
861 if let Some(mut rx) = unblock_rx {
862 rx.recv().await;
863 }
864
865 let records: Vec<PutRecordOp> = frozen
867 .writes
868 .iter()
869 .map(|(key, (seq, value))| {
870 let mut buf = Vec::with_capacity(16);
871 buf.extend_from_slice(&seq.to_le_bytes());
872 buf.extend_from_slice(&value.to_le_bytes());
873 Record::new(Bytes::from(key.clone()), Bytes::from(buf)).into()
874 })
875 .collect();
876 self.storage
877 .put(records)
878 .await
879 .map_err(|e| format!("{}", e))?;
880
881 {
883 let mut state = self.state.lock().unwrap();
884 state
885 .flushed_events
886 .push(Arc::new(EpochStamped::new(frozen, epoch_range.clone())));
887 }
888
889 self.storage.snapshot().await.map_err(|e| format!("{}", e))
890 }
891
892 async fn flush_storage(&self) -> Result<(), String> {
893 let flush_started_tx = {
895 let mut state = self.state.lock().unwrap();
896 state.flush_started_tx.take()
897 };
898 if let Some(tx) = flush_started_tx {
899 let _ = tx.send(());
900 }
901
902 let unblock_rx = {
904 let mut state = self.state.lock().unwrap();
905 state.unblock_rx.take()
906 };
907 if let Some(mut rx) = unblock_rx {
908 rx.recv().await;
909 }
910
911 Ok(())
912 }
913 }
914
915 fn test_config() -> WriteCoordinatorConfig {
916 WriteCoordinatorConfig {
917 queue_capacity: 100,
918 flush_interval: Duration::from_secs(3600), flush_size_threshold: usize::MAX,
920 }
921 }
922
923 async fn assert_snapshot_has_rows(
924 snapshot: &Arc<dyn StorageSnapshot>,
925 expected: &[(&str, u64, u64)],
926 ) {
927 let records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
928 assert_eq!(
929 records.len(),
930 expected.len(),
931 "expected {} rows but snapshot has {}",
932 expected.len(),
933 records.len()
934 );
935 let mut actual: Vec<(String, u64, u64)> = records
936 .iter()
937 .map(|r| {
938 let key = String::from_utf8(r.key.to_vec()).unwrap();
939 let seq = u64::from_le_bytes(r.value[0..8].try_into().unwrap());
940 let value = u64::from_le_bytes(r.value[8..16].try_into().unwrap());
941 (key, seq, value)
942 })
943 .collect();
944 actual.sort_by(|a, b| a.0.cmp(&b.0));
945 let mut expected: Vec<(&str, u64, u64)> = expected.to_vec();
946 expected.sort_by(|a, b| a.0.cmp(b.0));
947 for (actual, expected) in actual.iter().zip(expected.iter()) {
948 assert_eq!(
949 actual.0, expected.0,
950 "key mismatch: got {:?}, expected {:?}",
951 actual.0, expected.0
952 );
953 assert_eq!(
954 actual.1, expected.1,
955 "seq mismatch for key {:?}: got {}, expected {}",
956 actual.0, actual.1, expected.1
957 );
958 assert_eq!(
959 actual.2, expected.2,
960 "value mismatch for key {:?}: got {}, expected {}",
961 actual.0, actual.2, expected.2
962 );
963 }
964 }
965
966 #[tokio::test]
971 async fn should_assign_monotonic_epochs() {
972 let flusher = TestFlusher::default();
974 let mut coordinator = WriteCoordinator::new(
975 test_config(),
976 vec!["default".to_string()],
977 TestContext::default(),
978 flusher.initial_snapshot().await,
979 flusher,
980 );
981 let handle = coordinator.handle("default");
982 coordinator.start();
983
984 let write1 = handle
986 .try_write(TestWrite {
987 key: "a".into(),
988 value: 1,
989 size: 10,
990 })
991 .await
992 .unwrap();
993 let write2 = handle
994 .try_write(TestWrite {
995 key: "b".into(),
996 value: 2,
997 size: 10,
998 })
999 .await
1000 .unwrap();
1001 let write3 = handle
1002 .try_write(TestWrite {
1003 key: "c".into(),
1004 value: 3,
1005 size: 10,
1006 })
1007 .await
1008 .unwrap();
1009
1010 let epoch1 = write1.epoch().await.unwrap();
1011 let epoch2 = write2.epoch().await.unwrap();
1012 let epoch3 = write3.epoch().await.unwrap();
1013
1014 assert!(epoch1 < epoch2);
1016 assert!(epoch2 < epoch3);
1017
1018 coordinator.stop().await;
1020 }
1021
1022 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1023 async fn should_apply_writes_in_order() {
1024 let flusher = TestFlusher::default();
1026 let mut coordinator = WriteCoordinator::new(
1027 test_config(),
1028 vec!["default".to_string()],
1029 TestContext::default(),
1030 flusher.initial_snapshot().await,
1031 flusher.clone(),
1032 );
1033 let handle = coordinator.handle("default");
1034 coordinator.start();
1035
1036 handle
1038 .try_write(TestWrite {
1039 key: "a".into(),
1040 value: 1,
1041 size: 10,
1042 })
1043 .await
1044 .unwrap();
1045 handle
1046 .try_write(TestWrite {
1047 key: "a".into(),
1048 value: 2,
1049 size: 10,
1050 })
1051 .await
1052 .unwrap();
1053 let mut last_write = handle
1054 .try_write(TestWrite {
1055 key: "a".into(),
1056 value: 3,
1057 size: 10,
1058 })
1059 .await
1060 .unwrap();
1061
1062 handle.flush(false).await.unwrap();
1063 last_write.wait(Durability::Written).await.unwrap();
1065
1066 let events = flusher.flushed_events();
1068 assert_eq!(events.len(), 1);
1069 let frozen_delta = &events[0];
1070 let delta = &frozen_delta.val;
1071 let (seq, value) = delta.writes.get("a").unwrap();
1073 assert_eq!(*value, 3);
1074 assert_eq!(*seq, 2);
1075
1076 coordinator.stop().await;
1078 }
1079
1080 #[tokio::test]
1081 async fn should_update_applied_watermark_after_each_write() {
1082 let flusher = TestFlusher::default();
1084 let mut coordinator = WriteCoordinator::new(
1085 test_config(),
1086 vec!["default".to_string()],
1087 TestContext::default(),
1088 flusher.initial_snapshot().await,
1089 flusher,
1090 );
1091 let handle = coordinator.handle("default");
1092 coordinator.start();
1093
1094 let mut write_handle = handle
1096 .try_write(TestWrite {
1097 key: "a".into(),
1098 value: 1,
1099 size: 10,
1100 })
1101 .await
1102 .unwrap();
1103
1104 let result = write_handle.wait(Durability::Applied).await;
1106 assert!(result.is_ok());
1107
1108 coordinator.stop().await;
1110 }
1111
1112 #[tokio::test]
1113 async fn should_propagate_apply_error_to_handle() {
1114 let flusher = TestFlusher::default();
1116 let context = TestContext {
1117 error: Some("apply error".to_string()),
1118 ..Default::default()
1119 };
1120 let mut coordinator = WriteCoordinator::new(
1121 test_config(),
1122 vec!["default".to_string()],
1123 context,
1124 flusher.initial_snapshot().await,
1125 flusher,
1126 );
1127 let handle = coordinator.handle("default");
1128 coordinator.start();
1129
1130 let write = handle
1132 .try_write(TestWrite {
1133 key: "a".into(),
1134 value: 1,
1135 size: 10,
1136 })
1137 .await
1138 .unwrap();
1139
1140 let result = write.epoch().await;
1141
1142 assert!(
1144 matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
1145 );
1146
1147 coordinator.stop().await;
1149 }
1150
1151 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1156 async fn should_flush_on_command() {
1157 let flusher = TestFlusher::default();
1159 let mut coordinator = WriteCoordinator::new(
1160 test_config(),
1161 vec!["default".to_string()],
1162 TestContext::default(),
1163 flusher.initial_snapshot().await,
1164 flusher.clone(),
1165 );
1166 let handle = coordinator.handle("default");
1167 coordinator.start();
1168
1169 let mut write = handle
1171 .try_write(TestWrite {
1172 key: "a".into(),
1173 value: 1,
1174 size: 10,
1175 })
1176 .await
1177 .unwrap();
1178 handle.flush(false).await.unwrap();
1179 write.wait(Durability::Written).await.unwrap();
1180
1181 assert_eq!(flusher.flushed_events().len(), 1);
1183
1184 coordinator.stop().await;
1186 }
1187
1188 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1189 async fn should_wait_on_flush_handle() {
1190 let flusher = TestFlusher::default();
1192 let mut coordinator = WriteCoordinator::new(
1193 test_config(),
1194 vec!["default".to_string()],
1195 TestContext::default(),
1196 flusher.initial_snapshot().await,
1197 flusher.clone(),
1198 );
1199 let handle = coordinator.handle("default");
1200 coordinator.start();
1201
1202 handle
1204 .try_write(TestWrite {
1205 key: "a".into(),
1206 value: 1,
1207 size: 10,
1208 })
1209 .await
1210 .unwrap();
1211 let mut flush_handle = handle.flush(false).await.unwrap();
1212
1213 flush_handle.wait(Durability::Written).await.unwrap();
1215 assert_eq!(flusher.flushed_events().len(), 1);
1216
1217 coordinator.stop().await;
1219 }
1220
1221 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1222 async fn should_return_correct_epoch_from_flush_handle() {
1223 let flusher = TestFlusher::default();
1225 let mut coordinator = WriteCoordinator::new(
1226 test_config(),
1227 vec!["default".to_string()],
1228 TestContext::default(),
1229 flusher.initial_snapshot().await,
1230 flusher,
1231 );
1232 let handle = coordinator.handle("default");
1233 coordinator.start();
1234
1235 let write1 = handle
1237 .try_write(TestWrite {
1238 key: "a".into(),
1239 value: 1,
1240 size: 10,
1241 })
1242 .await
1243 .unwrap();
1244 let write2 = handle
1245 .try_write(TestWrite {
1246 key: "b".into(),
1247 value: 2,
1248 size: 10,
1249 })
1250 .await
1251 .unwrap();
1252 let flush_handle = handle.flush(false).await.unwrap();
1253
1254 let flush_epoch = flush_handle.epoch().await.unwrap();
1256 let write2_epoch = write2.epoch().await.unwrap();
1257 assert_eq!(flush_epoch, write2_epoch);
1258
1259 coordinator.stop().await;
1261 }
1262
1263 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1264 async fn should_include_all_pending_writes_in_flush() {
1265 let flusher = TestFlusher::default();
1267 let mut coordinator = WriteCoordinator::new(
1268 test_config(),
1269 vec!["default".to_string()],
1270 TestContext::default(),
1271 flusher.initial_snapshot().await,
1272 flusher.clone(),
1273 );
1274 let handle = coordinator.handle("default");
1275 coordinator.start();
1276
1277 handle
1279 .try_write(TestWrite {
1280 key: "a".into(),
1281 value: 1,
1282 size: 10,
1283 })
1284 .await
1285 .unwrap();
1286 handle
1287 .try_write(TestWrite {
1288 key: "b".into(),
1289 value: 2,
1290 size: 10,
1291 })
1292 .await
1293 .unwrap();
1294 let mut last_write = handle
1295 .try_write(TestWrite {
1296 key: "c".into(),
1297 value: 3,
1298 size: 10,
1299 })
1300 .await
1301 .unwrap();
1302
1303 handle.flush(false).await.unwrap();
1304 last_write.wait(Durability::Written).await.unwrap();
1305
1306 let events = flusher.flushed_events();
1308 assert_eq!(events.len(), 1);
1309 let frozen_delta = &events[0];
1310 assert_eq!(frozen_delta.val.writes.len(), 3);
1311 let snapshot = flusher.storage.snapshot().await.unwrap();
1312 assert_snapshot_has_rows(&snapshot, &[("a", 0, 1), ("b", 1, 2), ("c", 2, 3)]).await;
1313
1314 coordinator.stop().await;
1316 }
1317
1318 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1319 async fn should_skip_flush_when_no_new_writes() {
1320 let flusher = TestFlusher::default();
1322 let mut coordinator = WriteCoordinator::new(
1323 test_config(),
1324 vec!["default".to_string()],
1325 TestContext::default(),
1326 flusher.initial_snapshot().await,
1327 flusher.clone(),
1328 );
1329 let handle = coordinator.handle("default");
1330 coordinator.start();
1331
1332 let mut write = handle
1334 .try_write(TestWrite {
1335 key: "a".into(),
1336 value: 1,
1337 size: 10,
1338 })
1339 .await
1340 .unwrap();
1341 handle.flush(false).await.unwrap();
1342 write.wait(Durability::Written).await.unwrap();
1343
1344 handle.flush(false).await.unwrap();
1346
1347 let sync_write = handle
1350 .try_write(TestWrite {
1351 key: "sync".into(),
1352 value: 0,
1353 size: 1,
1354 })
1355 .await
1356 .unwrap();
1357 sync_write.epoch().await.unwrap();
1358
1359 assert_eq!(flusher.flushed_events().len(), 1);
1361
1362 coordinator.stop().await;
1364 }
1365
1366 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1367 async fn should_update_written_watermark_after_flush() {
1368 let flusher = TestFlusher::default();
1370 let mut coordinator = WriteCoordinator::new(
1371 test_config(),
1372 vec!["default".to_string()],
1373 TestContext::default(),
1374 flusher.initial_snapshot().await,
1375 flusher,
1376 );
1377 let handle = coordinator.handle("default");
1378 coordinator.start();
1379
1380 let mut write_handle = handle
1382 .try_write(TestWrite {
1383 key: "a".into(),
1384 value: 1,
1385 size: 10,
1386 })
1387 .await
1388 .unwrap();
1389
1390 handle.flush(false).await.unwrap();
1391
1392 let result = write_handle.wait(Durability::Written).await;
1394 assert!(result.is_ok());
1395
1396 coordinator.stop().await;
1398 }
1399
1400 #[tokio::test(start_paused = true)]
1405 async fn should_flush_on_flush_interval() {
1406 let flusher = TestFlusher::default();
1408 let config = WriteCoordinatorConfig {
1409 queue_capacity: 100,
1410 flush_interval: Duration::from_millis(100),
1411 flush_size_threshold: usize::MAX,
1412 };
1413 let mut coordinator = WriteCoordinator::new(
1414 config,
1415 vec!["default".to_string()],
1416 TestContext::default(),
1417 flusher.initial_snapshot().await,
1418 flusher.clone(),
1419 );
1420 let handle = coordinator.handle("default");
1421 coordinator.start();
1422
1423 tokio::task::yield_now().await;
1425 let mut write = handle
1426 .try_write(TestWrite {
1427 key: "a".into(),
1428 value: 1,
1429 size: 10,
1430 })
1431 .await
1432 .unwrap();
1433 write.wait(Durability::Applied).await.unwrap();
1434
1435 assert_eq!(flusher.flushed_events().len(), 0);
1437
1438 tokio::time::advance(Duration::from_millis(150)).await;
1440 tokio::task::yield_now().await;
1441
1442 assert_eq!(flusher.flushed_events().len(), 1);
1444 let snapshot = flusher.storage.snapshot().await.unwrap();
1445 assert_snapshot_has_rows(&snapshot, &[("a", 0, 1)]).await;
1446
1447 coordinator.stop().await;
1449 }
1450
1451 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1456 async fn should_flush_when_size_threshold_exceeded() {
1457 let flusher = TestFlusher::default();
1459 let config = WriteCoordinatorConfig {
1460 queue_capacity: 100,
1461 flush_interval: Duration::from_secs(3600),
1462 flush_size_threshold: 100, };
1464 let mut coordinator = WriteCoordinator::new(
1465 config,
1466 vec!["default".to_string()],
1467 TestContext::default(),
1468 flusher.initial_snapshot().await,
1469 flusher.clone(),
1470 );
1471 let handle = coordinator.handle("default");
1472 coordinator.start();
1473
1474 let mut write = handle
1476 .try_write(TestWrite {
1477 key: "a".into(),
1478 value: 1,
1479 size: 150,
1480 })
1481 .await
1482 .unwrap();
1483 write.wait(Durability::Written).await.unwrap();
1484
1485 assert_eq!(flusher.flushed_events().len(), 1);
1487
1488 coordinator.stop().await;
1490 }
1491
1492 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1493 async fn should_accumulate_until_threshold() {
1494 let flusher = TestFlusher::default();
1496 let config = WriteCoordinatorConfig {
1497 queue_capacity: 100,
1498 flush_interval: Duration::from_secs(3600),
1499 flush_size_threshold: 100,
1500 };
1501 let mut coordinator = WriteCoordinator::new(
1502 config,
1503 vec!["default".to_string()],
1504 TestContext::default(),
1505 flusher.initial_snapshot().await,
1506 flusher.clone(),
1507 );
1508 let handle = coordinator.handle("default");
1509 coordinator.start();
1510
1511 for i in 0..5 {
1513 let mut w = handle
1514 .try_write(TestWrite {
1515 key: format!("key{}", i),
1516 value: i,
1517 size: 15,
1518 })
1519 .await
1520 .unwrap();
1521 w.wait(Durability::Applied).await.unwrap();
1522 }
1523
1524 assert_eq!(flusher.flushed_events().len(), 0);
1526
1527 let mut final_write = handle
1529 .try_write(TestWrite {
1530 key: "final".into(),
1531 value: 999,
1532 size: 30,
1533 })
1534 .await
1535 .unwrap();
1536 final_write.wait(Durability::Written).await.unwrap();
1537
1538 assert_eq!(flusher.flushed_events().len(), 1);
1540
1541 coordinator.stop().await;
1543 }
1544
1545 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1550 async fn should_accept_writes_during_flush() {
1551 let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1553 let mut coordinator = WriteCoordinator::new(
1554 test_config(),
1555 vec!["default".to_string()],
1556 TestContext::default(),
1557 flusher.initial_snapshot().await,
1558 flusher.clone(),
1559 );
1560 let handle = coordinator.handle("default");
1561 coordinator.start();
1562
1563 let write1 = handle
1565 .try_write(TestWrite {
1566 key: "a".into(),
1567 value: 1,
1568 size: 10,
1569 })
1570 .await
1571 .unwrap();
1572 handle.flush(false).await.unwrap();
1573 flush_started_rx.await.unwrap(); let write2 = handle
1577 .try_write(TestWrite {
1578 key: "b".into(),
1579 value: 2,
1580 size: 10,
1581 })
1582 .await
1583 .unwrap();
1584 assert!(write2.epoch().await.unwrap() > write1.epoch().await.unwrap());
1585
1586 unblock_tx.send(()).await.unwrap();
1588 coordinator.stop().await;
1589 }
1590
1591 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1592 async fn should_assign_new_epochs_during_flush() {
1593 let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1595 let mut coordinator = WriteCoordinator::new(
1596 test_config(),
1597 vec!["default".to_string()],
1598 TestContext::default(),
1599 flusher.initial_snapshot().await,
1600 flusher.clone(),
1601 );
1602 let handle = coordinator.handle("default");
1603 coordinator.start();
1604
1605 handle
1607 .try_write(TestWrite {
1608 key: "a".into(),
1609 value: 1,
1610 size: 10,
1611 })
1612 .await
1613 .unwrap();
1614 handle.flush(false).await.unwrap();
1615 flush_started_rx.await.unwrap(); let w1 = handle
1619 .try_write(TestWrite {
1620 key: "b".into(),
1621 value: 2,
1622 size: 10,
1623 })
1624 .await
1625 .unwrap();
1626 let w2 = handle
1627 .try_write(TestWrite {
1628 key: "c".into(),
1629 value: 3,
1630 size: 10,
1631 })
1632 .await
1633 .unwrap();
1634
1635 let e1 = w1.epoch().await.unwrap();
1637 let e2 = w2.epoch().await.unwrap();
1638 assert!(e1 < e2);
1639
1640 unblock_tx.send(()).await.unwrap();
1642 coordinator.stop().await;
1643 }
1644
1645 #[tokio::test]
1650 async fn should_return_backpressure_when_queue_full() {
1651 let flusher = TestFlusher::default();
1653 let config = WriteCoordinatorConfig {
1654 queue_capacity: 2,
1655 flush_interval: Duration::from_secs(3600),
1656 flush_size_threshold: usize::MAX,
1657 };
1658 let mut coordinator = WriteCoordinator::new(
1659 config,
1660 vec!["default".to_string()],
1661 TestContext::default(),
1662 flusher.initial_snapshot().await,
1663 flusher.clone(),
1664 );
1665 let handle = coordinator.handle("default");
1666 let _ = handle
1670 .try_write(TestWrite {
1671 key: "a".into(),
1672 value: 1,
1673 size: 10,
1674 })
1675 .await;
1676 let _ = handle
1677 .try_write(TestWrite {
1678 key: "b".into(),
1679 value: 2,
1680 size: 10,
1681 })
1682 .await;
1683
1684 let result = handle
1686 .try_write(TestWrite {
1687 key: "c".into(),
1688 value: 3,
1689 size: 10,
1690 })
1691 .await;
1692
1693 assert!(matches!(result, Err(WriteError::Backpressure(_))));
1695 }
1696
1697 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1698 async fn should_accept_writes_after_queue_drains() {
1699 let flusher = TestFlusher::default();
1701 let config = WriteCoordinatorConfig {
1702 queue_capacity: 2,
1703 flush_interval: Duration::from_secs(3600),
1704 flush_size_threshold: usize::MAX,
1705 };
1706 let mut coordinator = WriteCoordinator::new(
1707 config,
1708 vec!["default".to_string()],
1709 TestContext::default(),
1710 flusher.initial_snapshot().await,
1711 flusher.clone(),
1712 );
1713 let handle = coordinator.handle("default");
1714
1715 let _ = handle
1717 .try_write(TestWrite {
1718 key: "a".into(),
1719 value: 1,
1720 size: 10,
1721 })
1722 .await;
1723 let mut write_b = handle
1724 .try_write(TestWrite {
1725 key: "b".into(),
1726 value: 2,
1727 size: 10,
1728 })
1729 .await
1730 .unwrap();
1731
1732 coordinator.start();
1734 write_b.wait(Durability::Applied).await.unwrap();
1735
1736 let result = handle
1738 .try_write(TestWrite {
1739 key: "c".into(),
1740 value: 3,
1741 size: 10,
1742 })
1743 .await;
1744 assert!(result.is_ok());
1745
1746 coordinator.stop().await;
1748 }
1749
1750 #[tokio::test]
1755 async fn should_shutdown_cleanly_when_stop_called() {
1756 let flusher = TestFlusher::default();
1758 let mut coordinator = WriteCoordinator::new(
1759 test_config(),
1760 vec!["default".to_string()],
1761 TestContext::default(),
1762 flusher.initial_snapshot().await,
1763 flusher.clone(),
1764 );
1765 let handle = coordinator.handle("default");
1766 coordinator.start();
1767
1768 let result = coordinator.stop().await;
1770
1771 assert!(result.is_ok());
1773 }
1774
1775 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1776 async fn should_flush_pending_writes_on_shutdown() {
1777 let flusher = TestFlusher::default();
1779 let config = WriteCoordinatorConfig {
1780 queue_capacity: 100,
1781 flush_interval: Duration::from_secs(3600), flush_size_threshold: usize::MAX, };
1784 let mut coordinator = WriteCoordinator::new(
1785 config,
1786 vec!["default".to_string()],
1787 TestContext::default(),
1788 flusher.initial_snapshot().await,
1789 flusher.clone(),
1790 );
1791 let handle = coordinator.handle("default");
1792 coordinator.start();
1793
1794 let write = handle
1796 .try_write(TestWrite {
1797 key: "a".into(),
1798 value: 1,
1799 size: 10,
1800 })
1801 .await
1802 .unwrap();
1803 let epoch = write.epoch().await.unwrap();
1804
1805 coordinator.stop().await;
1807
1808 let events = flusher.flushed_events();
1810 assert_eq!(events.len(), 1);
1811 let epoch_range = &events[0].epoch_range;
1812 assert!(epoch_range.contains(&epoch));
1813 }
1814
1815 #[tokio::test]
1816 async fn should_return_shutdown_error_after_coordinator_stops() {
1817 let flusher = TestFlusher::default();
1819 let mut coordinator = WriteCoordinator::new(
1820 test_config(),
1821 vec!["default".to_string()],
1822 TestContext::default(),
1823 flusher.initial_snapshot().await,
1824 flusher.clone(),
1825 );
1826 let handle = coordinator.handle("default");
1827 coordinator.start();
1828
1829 coordinator.stop().await;
1831
1832 let result = handle
1834 .try_write(TestWrite {
1835 key: "a".into(),
1836 value: 1,
1837 size: 10,
1838 })
1839 .await;
1840
1841 assert!(matches!(result, Err(WriteError::Shutdown)));
1843 }
1844
1845 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1850 async fn should_track_epoch_range_in_flush_event() {
1851 let flusher = TestFlusher::default();
1853 let mut coordinator = WriteCoordinator::new(
1854 test_config(),
1855 vec!["default".to_string()],
1856 TestContext::default(),
1857 flusher.initial_snapshot().await,
1858 flusher.clone(),
1859 );
1860 let handle = coordinator.handle("default");
1861 coordinator.start();
1862
1863 handle
1865 .try_write(TestWrite {
1866 key: "a".into(),
1867 value: 1,
1868 size: 10,
1869 })
1870 .await
1871 .unwrap();
1872 handle
1873 .try_write(TestWrite {
1874 key: "b".into(),
1875 value: 2,
1876 size: 10,
1877 })
1878 .await
1879 .unwrap();
1880 let mut last_write = handle
1881 .try_write(TestWrite {
1882 key: "c".into(),
1883 value: 3,
1884 size: 10,
1885 })
1886 .await
1887 .unwrap();
1888
1889 handle.flush(false).await.unwrap();
1890 last_write.wait(Durability::Written).await.unwrap();
1891
1892 let events = flusher.flushed_events();
1894 assert_eq!(events.len(), 1);
1895 let epoch_range = &events[0].epoch_range;
1896 assert_eq!(epoch_range.start, 1);
1897 assert_eq!(epoch_range.end, 4); coordinator.stop().await;
1901 }
1902
1903 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1904 async fn should_have_contiguous_epoch_ranges() {
1905 let flusher = TestFlusher::default();
1907 let mut coordinator = WriteCoordinator::new(
1908 test_config(),
1909 vec!["default".to_string()],
1910 TestContext::default(),
1911 flusher.initial_snapshot().await,
1912 flusher.clone(),
1913 );
1914 let handle = coordinator.handle("default");
1915 coordinator.start();
1916
1917 handle
1919 .try_write(TestWrite {
1920 key: "a".into(),
1921 value: 1,
1922 size: 10,
1923 })
1924 .await
1925 .unwrap();
1926 let mut write2 = handle
1927 .try_write(TestWrite {
1928 key: "b".into(),
1929 value: 2,
1930 size: 10,
1931 })
1932 .await
1933 .unwrap();
1934 handle.flush(false).await.unwrap();
1935 write2.wait(Durability::Written).await.unwrap();
1936
1937 let mut write3 = handle
1939 .try_write(TestWrite {
1940 key: "c".into(),
1941 value: 3,
1942 size: 10,
1943 })
1944 .await
1945 .unwrap();
1946 handle.flush(false).await.unwrap();
1947 write3.wait(Durability::Written).await.unwrap();
1948
1949 let events = flusher.flushed_events();
1951 assert_eq!(events.len(), 2);
1952
1953 let range1 = &events[0].epoch_range;
1954 let range2 = &events[1].epoch_range;
1955
1956 assert_eq!(range1.end, range2.start);
1958 assert_eq!(range1, &(1..3));
1959 assert_eq!(range2, &(3..4));
1960
1961 coordinator.stop().await;
1963 }
1964
1965 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1966 async fn should_include_exact_epochs_in_range() {
1967 let flusher = TestFlusher::default();
1969 let mut coordinator = WriteCoordinator::new(
1970 test_config(),
1971 vec!["default".to_string()],
1972 TestContext::default(),
1973 flusher.initial_snapshot().await,
1974 flusher.clone(),
1975 );
1976 let handle = coordinator.handle("default");
1977 coordinator.start();
1978
1979 let write1 = handle
1981 .try_write(TestWrite {
1982 key: "a".into(),
1983 value: 1,
1984 size: 10,
1985 })
1986 .await
1987 .unwrap();
1988 let epoch1 = write1.epoch().await.unwrap();
1989
1990 let mut write2 = handle
1991 .try_write(TestWrite {
1992 key: "b".into(),
1993 value: 2,
1994 size: 10,
1995 })
1996 .await
1997 .unwrap();
1998 let epoch2 = write2.epoch().await.unwrap();
1999
2000 handle.flush(false).await.unwrap();
2001 write2.wait(Durability::Written).await.unwrap();
2002
2003 let events = flusher.flushed_events();
2005 assert_eq!(events.len(), 1);
2006 let epoch_range = &events[0].epoch_range;
2007
2008 assert_eq!(epoch_range.start, epoch1);
2010 assert_eq!(epoch_range.end, epoch2 + 1);
2012 assert!(epoch_range.contains(&epoch1));
2014 assert!(epoch_range.contains(&epoch2));
2015
2016 coordinator.stop().await;
2018 }
2019
2020 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2025 async fn should_preserve_context_across_flushes() {
2026 let flusher = TestFlusher::default();
2028 let mut coordinator = WriteCoordinator::new(
2029 test_config(),
2030 vec!["default".to_string()],
2031 TestContext::default(),
2032 flusher.initial_snapshot().await,
2033 flusher.clone(),
2034 );
2035 let handle = coordinator.handle("default");
2036 coordinator.start();
2037
2038 let mut write1 = handle
2040 .try_write(TestWrite {
2041 key: "a".into(),
2042 value: 1,
2043 size: 10,
2044 })
2045 .await
2046 .unwrap();
2047 handle.flush(false).await.unwrap();
2048 write1.wait(Durability::Written).await.unwrap();
2049
2050 let mut write2 = handle
2052 .try_write(TestWrite {
2053 key: "a".into(),
2054 value: 2,
2055 size: 10,
2056 })
2057 .await
2058 .unwrap();
2059 handle.flush(false).await.unwrap();
2060 write2.wait(Durability::Written).await.unwrap();
2061
2062 let events = flusher.flushed_events();
2064 assert_eq!(events.len(), 2);
2065
2066 let (seq1, _) = events[0].val.writes.get("a").unwrap();
2068 assert_eq!(*seq1, 0);
2069
2070 let (seq2, _) = events[1].val.writes.get("a").unwrap();
2072 assert_eq!(*seq2, 1);
2073
2074 coordinator.stop().await;
2076 }
2077
2078 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2083 async fn should_receive_view_on_subscribe() {
2084 let flusher = TestFlusher::default();
2086 let mut coordinator = WriteCoordinator::new(
2087 test_config(),
2088 vec!["default".to_string()],
2089 TestContext::default(),
2090 flusher.initial_snapshot().await,
2091 flusher.clone(),
2092 );
2093 let handle = coordinator.handle("default");
2094 let (mut subscriber, _) = coordinator.subscribe();
2095 subscriber.initialize();
2096 coordinator.start();
2097
2098 handle
2100 .try_write(TestWrite {
2101 key: "a".into(),
2102 value: 1,
2103 size: 10,
2104 })
2105 .await
2106 .unwrap();
2107 handle.flush(false).await.unwrap();
2108
2109 let result = subscriber.recv().await;
2111 assert!(result.is_ok());
2112
2113 coordinator.stop().await;
2115 }
2116
2117 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2118 async fn should_include_snapshot_in_view_after_flush() {
2119 let flusher = TestFlusher::default();
2121 let mut coordinator = WriteCoordinator::new(
2122 test_config(),
2123 vec!["default".to_string()],
2124 TestContext::default(),
2125 flusher.initial_snapshot().await,
2126 flusher.clone(),
2127 );
2128 let handle = coordinator.handle("default");
2129 let (mut subscriber, _) = coordinator.subscribe();
2130 subscriber.initialize();
2131 coordinator.start();
2132
2133 handle
2135 .try_write(TestWrite {
2136 key: "a".into(),
2137 value: 1,
2138 size: 10,
2139 })
2140 .await
2141 .unwrap();
2142 handle.flush(false).await.unwrap();
2143
2144 let _ = subscriber.recv().await.unwrap();
2146 let result = subscriber.recv().await.unwrap();
2148
2149 assert_snapshot_has_rows(&result.snapshot, &[("a", 0, 1)]).await;
2151
2152 coordinator.stop().await;
2154 }
2155
2156 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2157 async fn should_include_delta_in_view_after_flush() {
2158 let flusher = TestFlusher::default();
2160 let mut coordinator = WriteCoordinator::new(
2161 test_config(),
2162 vec!["default".to_string()],
2163 TestContext::default(),
2164 flusher.initial_snapshot().await,
2165 flusher.clone(),
2166 );
2167 let handle = coordinator.handle("default");
2168 let (mut subscriber, _) = coordinator.subscribe();
2169 subscriber.initialize();
2170 coordinator.start();
2171
2172 handle
2174 .try_write(TestWrite {
2175 key: "a".into(),
2176 value: 42,
2177 size: 10,
2178 })
2179 .await
2180 .unwrap();
2181 handle.flush(false).await.unwrap();
2182
2183 let _ = subscriber.recv().await.unwrap();
2185 let result = subscriber.recv().await.unwrap();
2187
2188 let flushed = result.last_written_delta.as_ref().unwrap();
2190 assert_eq!(flushed.val.get("a"), Some(&42));
2191
2192 coordinator.stop().await;
2194 }
2195
2196 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2197 async fn should_include_epoch_range_in_view_after_flush() {
2198 let flusher = TestFlusher::default();
2200 let mut coordinator = WriteCoordinator::new(
2201 test_config(),
2202 vec!["default".to_string()],
2203 TestContext::default(),
2204 flusher.initial_snapshot().await,
2205 flusher.clone(),
2206 );
2207 let handle = coordinator.handle("default");
2208 let (mut subscriber, _) = coordinator.subscribe();
2209 subscriber.initialize();
2210 coordinator.start();
2211
2212 let write1 = handle
2214 .try_write(TestWrite {
2215 key: "a".into(),
2216 value: 1,
2217 size: 10,
2218 })
2219 .await
2220 .unwrap();
2221 let write2 = handle
2222 .try_write(TestWrite {
2223 key: "b".into(),
2224 value: 2,
2225 size: 10,
2226 })
2227 .await
2228 .unwrap();
2229 handle.flush(false).await.unwrap();
2230
2231 let _ = subscriber.recv().await.unwrap();
2233 let result = subscriber.recv().await.unwrap();
2235
2236 let flushed = result.last_written_delta.as_ref().unwrap();
2238 let epoch1 = write1.epoch().await.unwrap();
2239 let epoch2 = write2.epoch().await.unwrap();
2240 assert!(flushed.epoch_range.contains(&epoch1));
2241 assert!(flushed.epoch_range.contains(&epoch2));
2242 assert_eq!(flushed.epoch_range.start, epoch1);
2243 assert_eq!(flushed.epoch_range.end, epoch2 + 1);
2244
2245 coordinator.stop().await;
2247 }
2248
2249 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2250 async fn should_broadcast_frozen_delta_on_freeze() {
2251 let flusher = TestFlusher::default();
2253 let mut coordinator = WriteCoordinator::new(
2254 test_config(),
2255 vec!["default".to_string()],
2256 TestContext::default(),
2257 flusher.initial_snapshot().await,
2258 flusher.clone(),
2259 );
2260 let handle = coordinator.handle("default");
2261 let (mut subscriber, _) = coordinator.subscribe();
2262 subscriber.initialize();
2263 coordinator.start();
2264
2265 handle
2267 .try_write(TestWrite {
2268 key: "a".into(),
2269 value: 1,
2270 size: 10,
2271 })
2272 .await
2273 .unwrap();
2274 handle.flush(false).await.unwrap();
2275
2276 let state = subscriber.recv().await.unwrap();
2278 assert_eq!(state.frozen.len(), 1);
2279 assert!(state.frozen[0].val.contains_key("a"));
2280
2281 coordinator.stop().await;
2283 }
2284
2285 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2286 async fn should_remove_frozen_delta_after_flush_complete() {
2287 let flusher = TestFlusher::default();
2289 let mut coordinator = WriteCoordinator::new(
2290 test_config(),
2291 vec!["default".to_string()],
2292 TestContext::default(),
2293 flusher.initial_snapshot().await,
2294 flusher.clone(),
2295 );
2296 let handle = coordinator.handle("default");
2297 let (mut subscriber, _) = coordinator.subscribe();
2298 subscriber.initialize();
2299 coordinator.start();
2300
2301 handle
2303 .try_write(TestWrite {
2304 key: "a".into(),
2305 value: 1,
2306 size: 10,
2307 })
2308 .await
2309 .unwrap();
2310 handle.flush(false).await.unwrap();
2311
2312 let state1 = subscriber.recv().await.unwrap();
2314 assert_eq!(state1.frozen.len(), 1);
2315
2316 let state2 = subscriber.recv().await.unwrap();
2318 assert_eq!(state2.frozen.len(), 0);
2319 assert!(state2.last_written_delta.is_some());
2320
2321 coordinator.stop().await;
2323 }
2324
2325 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2326 async fn should_recover_from_message_lost_subscriber() {
2327 let flusher = TestFlusher::default();
2329 let mut coordinator = WriteCoordinator::new(
2330 test_config(),
2331 vec!["default".to_string()],
2332 TestContext::default(),
2333 flusher.initial_snapshot().await,
2334 flusher.clone(),
2335 );
2336 let handle = coordinator.handle("default");
2337 let (mut subscriber, _) = coordinator.subscribe();
2338 subscriber.initialize();
2339 coordinator.start();
2340
2341 for i in 0..20 {
2345 let _write_handle = handle
2346 .try_write(TestWrite {
2347 key: format!("key_{}", i),
2348 value: i as u64,
2349 size: 10,
2350 })
2351 .await
2352 .unwrap();
2353 let _ = handle.flush(false).await.unwrap();
2354 }
2355
2356 let mut watermark = handle
2358 .flush(true)
2359 .await
2360 .expect("flush(true) should succeed");
2361
2362 watermark.wait(Durability::Durable).await;
2364
2365 let result = subscriber
2367 .recv()
2368 .await
2369 .expect_err("expected recv() to yield an error");
2370 assert!(matches!(result, SubscribeError::MessageLost));
2371
2372 let (rx, initial_view) = handle.subscribe();
2374 (subscriber, _) = ViewSubscriber::new(rx, initial_view);
2375 let view = subscriber.initialize();
2376
2377 let records = view.snapshot.scan(BytesRange::unbounded()).await.unwrap();
2380 assert!(
2381 records.len() >= 20,
2382 "expected at least 20 rows, got {}",
2383 records.len()
2384 );
2385
2386 let _write_handle = handle
2388 .try_write(TestWrite {
2389 key: "post_recovery".into(),
2390 value: 100,
2391 size: 10,
2392 })
2393 .await
2394 .unwrap();
2395 let _ = handle.flush(false).await.unwrap();
2396
2397 let result = subscriber.recv().await;
2398 assert!(result.is_ok());
2399
2400 coordinator.stop().await;
2402 }
2403
2404 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2409 async fn should_flush_even_when_no_writes_if_flush_storage() {
2410 let flusher = TestFlusher::default();
2412 let storage = Arc::new(InMemoryStorage::new());
2413 let snapshot = storage.snapshot().await.unwrap();
2414 let mut coordinator = WriteCoordinator::new(
2415 test_config(),
2416 vec!["default".to_string()],
2417 TestContext::default(),
2418 snapshot,
2419 flusher.clone(),
2420 );
2421 let handle = coordinator.handle("default");
2422 coordinator.start();
2423
2424 let mut flush_handle = handle.flush(true).await.unwrap();
2426 flush_handle.wait(Durability::Durable).await.unwrap();
2427
2428 assert_eq!(flusher.flushed_events().len(), 0);
2431
2432 coordinator.stop().await;
2434 }
2435
2436 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2437 async fn should_advance_durable_watermark() {
2438 let flusher = TestFlusher::default();
2440 let storage = Arc::new(InMemoryStorage::new());
2441 let snapshot = storage.snapshot().await.unwrap();
2442 let mut coordinator = WriteCoordinator::new(
2443 test_config(),
2444 vec!["default".to_string()],
2445 TestContext::default(),
2446 snapshot,
2447 flusher.clone(),
2448 );
2449 let handle = coordinator.handle("default");
2450 coordinator.start();
2451
2452 let mut write = handle
2454 .try_write(TestWrite {
2455 key: "a".into(),
2456 value: 1,
2457 size: 10,
2458 })
2459 .await
2460 .unwrap();
2461 let mut flush_handle = handle.flush(true).await.unwrap();
2462
2463 flush_handle.wait(Durability::Durable).await.unwrap();
2465 write.wait(Durability::Durable).await.unwrap();
2466 assert_eq!(flusher.flushed_events().len(), 1);
2467
2468 coordinator.stop().await;
2470 }
2471
2472 #[tokio::test]
2473 async fn should_see_applied_write_via_view() {
2474 let flusher = TestFlusher::default();
2476 let mut coordinator = WriteCoordinator::new(
2477 test_config(),
2478 vec!["default".to_string()],
2479 TestContext::default(),
2480 flusher.initial_snapshot().await,
2481 flusher,
2482 );
2483 let handle = coordinator.handle("default");
2484 coordinator.start();
2485
2486 let mut write = handle
2488 .try_write(TestWrite {
2489 key: "a".into(),
2490 value: 42,
2491 size: 10,
2492 })
2493 .await
2494 .unwrap();
2495 write.wait(Durability::Applied).await.unwrap();
2496
2497 let view = coordinator.view();
2499 assert_eq!(view.current.get("a"), Some(42));
2500
2501 coordinator.stop().await;
2503 }
2504
2505 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2510 async fn should_flush_writes_from_multiple_channels() {
2511 let flusher = TestFlusher::default();
2513 let mut coordinator = WriteCoordinator::new(
2514 test_config(),
2515 vec!["ch1".to_string(), "ch2".to_string()],
2516 TestContext::default(),
2517 flusher.initial_snapshot().await,
2518 flusher.clone(),
2519 );
2520 let ch1 = coordinator.handle("ch1");
2521 let ch2 = coordinator.handle("ch2");
2522 coordinator.start();
2523
2524 let mut w1 = ch1
2527 .try_write(TestWrite {
2528 key: "a".into(),
2529 value: 10,
2530 size: 10,
2531 })
2532 .await
2533 .unwrap();
2534 w1.wait(Durability::Applied).await.unwrap();
2535
2536 let mut w2 = ch2
2537 .try_write(TestWrite {
2538 key: "b".into(),
2539 value: 20,
2540 size: 10,
2541 })
2542 .await
2543 .unwrap();
2544 w2.wait(Durability::Applied).await.unwrap();
2545
2546 let mut w3 = ch1
2547 .try_write(TestWrite {
2548 key: "c".into(),
2549 value: 30,
2550 size: 10,
2551 })
2552 .await
2553 .unwrap();
2554 w3.wait(Durability::Applied).await.unwrap();
2555
2556 ch1.flush(false).await.unwrap();
2557 w3.wait(Durability::Written).await.unwrap();
2558
2559 let snapshot = flusher.storage.snapshot().await.unwrap();
2561 assert_snapshot_has_rows(&snapshot, &[("a", 0, 10), ("b", 1, 20), ("c", 2, 30)]).await;
2562
2563 coordinator.stop().await;
2565 }
2566
2567 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2568 async fn should_succeed_with_write_timeout_when_queue_has_space() {
2569 let flusher = TestFlusher::default();
2571 let mut coordinator = WriteCoordinator::new(
2572 test_config(),
2573 vec!["default".to_string()],
2574 TestContext::default(),
2575 flusher.initial_snapshot().await,
2576 flusher.clone(),
2577 );
2578 let handle = coordinator.handle("default");
2579 coordinator.start();
2580
2581 let mut wh = handle
2583 .write_timeout(
2584 TestWrite {
2585 key: "a".into(),
2586 value: 1,
2587 size: 10,
2588 },
2589 Duration::from_secs(1),
2590 )
2591 .await
2592 .unwrap();
2593
2594 let result = wh.wait(Durability::Applied).await;
2596 assert!(result.is_ok());
2597
2598 coordinator.stop().await;
2600 }
2601
2602 #[tokio::test]
2603 async fn should_timeout_when_queue_full() {
2604 let flusher = TestFlusher::default();
2606 let config = WriteCoordinatorConfig {
2607 queue_capacity: 2,
2608 flush_interval: Duration::from_secs(3600),
2609 flush_size_threshold: usize::MAX,
2610 };
2611 let mut coordinator = WriteCoordinator::new(
2612 config,
2613 vec!["default".to_string()],
2614 TestContext::default(),
2615 flusher.initial_snapshot().await,
2616 flusher.clone(),
2617 );
2618 let handle = coordinator.handle("default");
2619
2620 let _ = handle
2622 .try_write(TestWrite {
2623 key: "a".into(),
2624 value: 1,
2625 size: 10,
2626 })
2627 .await;
2628 let _ = handle
2629 .try_write(TestWrite {
2630 key: "b".into(),
2631 value: 2,
2632 size: 10,
2633 })
2634 .await;
2635
2636 let result = handle
2638 .write_timeout(
2639 TestWrite {
2640 key: "c".into(),
2641 value: 3,
2642 size: 10,
2643 },
2644 Duration::from_millis(10),
2645 )
2646 .await;
2647
2648 assert!(matches!(result, Err(WriteError::TimeoutError(_))));
2650 }
2651
2652 #[tokio::test]
2653 async fn should_return_write_in_timeout_error() {
2654 let flusher = TestFlusher::default();
2656 let config = WriteCoordinatorConfig {
2657 queue_capacity: 1,
2658 flush_interval: Duration::from_secs(3600),
2659 flush_size_threshold: usize::MAX,
2660 };
2661 let mut coordinator = WriteCoordinator::new(
2662 config,
2663 vec!["default".to_string()],
2664 TestContext::default(),
2665 flusher.initial_snapshot().await,
2666 flusher.clone(),
2667 );
2668 let handle = coordinator.handle("default");
2669
2670 let _ = handle
2672 .try_write(TestWrite {
2673 key: "a".into(),
2674 value: 1,
2675 size: 10,
2676 })
2677 .await;
2678
2679 let result = handle
2681 .write_timeout(
2682 TestWrite {
2683 key: "retry_me".into(),
2684 value: 42,
2685 size: 10,
2686 },
2687 Duration::from_millis(10),
2688 )
2689 .await;
2690 let Err(err) = result else {
2691 panic!("expected TimeoutError");
2692 };
2693
2694 let write = err.into_inner().expect("should contain the write");
2696 assert_eq!(write.key, "retry_me");
2697 assert_eq!(write.value, 42);
2698 }
2699
2700 #[tokio::test]
2701 async fn should_return_write_in_backpressure_error() {
2702 let flusher = TestFlusher::default();
2704 let config = WriteCoordinatorConfig {
2705 queue_capacity: 1,
2706 flush_interval: Duration::from_secs(3600),
2707 flush_size_threshold: usize::MAX,
2708 };
2709 let mut coordinator = WriteCoordinator::new(
2710 config,
2711 vec!["default".to_string()],
2712 TestContext::default(),
2713 flusher.initial_snapshot().await,
2714 flusher.clone(),
2715 );
2716 let handle = coordinator.handle("default");
2717
2718 let _ = handle
2720 .try_write(TestWrite {
2721 key: "a".into(),
2722 value: 1,
2723 size: 10,
2724 })
2725 .await;
2726
2727 let result = handle
2729 .try_write(TestWrite {
2730 key: "retry_me".into(),
2731 value: 42,
2732 size: 10,
2733 })
2734 .await;
2735 let Err(err) = result else {
2736 panic!("expected Backpressure");
2737 };
2738
2739 let write = err.into_inner().expect("should contain the write");
2741 assert_eq!(write.key, "retry_me");
2742 assert_eq!(write.value, 42);
2743 }
2744
2745 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2746 async fn should_succeed_when_queue_drains_within_timeout() {
2747 let flusher = TestFlusher::default();
2749 let config = WriteCoordinatorConfig {
2750 queue_capacity: 2,
2751 flush_interval: Duration::from_secs(3600),
2752 flush_size_threshold: usize::MAX,
2753 };
2754 let mut coordinator = WriteCoordinator::new(
2755 config,
2756 vec!["default".to_string()],
2757 TestContext::default(),
2758 flusher.initial_snapshot().await,
2759 flusher.clone(),
2760 );
2761 let handle = coordinator.handle("default");
2762
2763 let _ = handle
2765 .try_write(TestWrite {
2766 key: "a".into(),
2767 value: 1,
2768 size: 10,
2769 })
2770 .await;
2771 let _ = handle
2772 .try_write(TestWrite {
2773 key: "b".into(),
2774 value: 2,
2775 size: 10,
2776 })
2777 .await;
2778
2779 coordinator.start();
2781 let result = handle
2782 .write_timeout(
2783 TestWrite {
2784 key: "c".into(),
2785 value: 3,
2786 size: 10,
2787 },
2788 Duration::from_secs(5),
2789 )
2790 .await;
2791
2792 assert!(result.is_ok());
2794
2795 coordinator.stop().await;
2797 }
2798
2799 #[tokio::test]
2800 async fn should_return_shutdown_on_write_timeout_after_coordinator_stops() {
2801 let flusher = TestFlusher::default();
2803 let mut coordinator = WriteCoordinator::new(
2804 test_config(),
2805 vec!["default".to_string()],
2806 TestContext::default(),
2807 flusher.initial_snapshot().await,
2808 flusher.clone(),
2809 );
2810 let handle = coordinator.handle("default");
2811 coordinator.start();
2812
2813 coordinator.stop().await;
2815 let result = handle
2816 .write_timeout(
2817 TestWrite {
2818 key: "a".into(),
2819 value: 1,
2820 size: 10,
2821 },
2822 Duration::from_secs(1),
2823 )
2824 .await;
2825
2826 assert!(matches!(result, Err(WriteError::Shutdown)));
2828 }
2829
2830 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2831 async fn should_pause_and_resume_write_channel() {
2832 let flusher = TestFlusher::default();
2834 let mut config = test_config();
2835 config.flush_size_threshold = usize::MAX;
2836 config.flush_interval = Duration::from_hours(24);
2837 let mut coordinator = WriteCoordinator::new(
2838 config,
2839 vec!["a", "b"],
2840 TestContext::default(),
2841 flusher.initial_snapshot().await,
2842 flusher.clone(),
2843 );
2844 let handle_a = coordinator.handle("a");
2845 let handle_b = coordinator.handle("b");
2846 let pause_handle = coordinator.pause_handle("a");
2847
2848 pause_handle.pause();
2857 coordinator.start();
2858
2859 let mut result_a = handle_a
2861 .try_write(TestWrite {
2862 key: "a".into(),
2863 value: 1,
2864 size: 1,
2865 })
2866 .await
2867 .unwrap();
2868 for i in 0..1000 {
2869 handle_b
2870 .try_write(TestWrite {
2871 key: format!("b{}", i),
2872 value: i,
2873 size: 1,
2874 })
2875 .await
2876 .unwrap()
2877 .wait(Durability::Applied)
2878 .await
2879 .unwrap();
2880 }
2881
2882 let data = coordinator.view().current.data.lock().unwrap().clone();
2885 let mut expected = (0..1000).map(|i| format!("b{}", i)).collect::<HashSet<_>>();
2886 assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2887 pause_handle.unpause();
2888 result_a.wait(Durability::Applied).await.unwrap();
2890 let data = coordinator.view().current.data.lock().unwrap().clone();
2891 expected.insert("a".into());
2892 assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2893 }
2894}