1#![allow(unused)]
2
3mod error;
4mod handle;
5pub(crate) mod metrics;
6pub mod subscriber;
7mod traits;
8
9use std::collections::HashMap;
10use std::ops::Range;
11use std::ops::{Deref, DerefMut};
12
13pub use error::{WriteError, WriteResult};
14use futures::stream::{self, SelectAll, StreamExt};
15pub use handle::{View, WriteCoordinatorHandle, WriteHandle};
16pub use metrics::describe_coordinator_metrics;
17pub use subscriber::{SubscribeError, ViewMonitor, ViewSubscriber};
18pub use traits::{Delta, Durability, EpochStamped, Flusher};
19
20enum FlushEvent<D: Delta> {
22 FlushDelta { frozen: EpochStamped<D::Frozen> },
24 FlushStorage,
26}
27
28use crate::StorageRead;
30use crate::storage::StorageSnapshot;
31use async_trait::async_trait;
32pub use handle::EpochWatcher;
33use std::sync::{Arc, Mutex};
34use std::time::Duration;
35use tokio::sync::{broadcast, mpsc, oneshot, watch};
36use tokio::time::{Instant, Interval, interval_at};
37use tokio_util::sync::CancellationToken;
38
/// Tuning knobs for a [`WriteCoordinator`].
#[derive(Debug, Clone)]
pub struct WriteCoordinatorConfig {
    /// Capacity of each channel's bounded write queue.
    pub queue_capacity: usize,
    /// Period of the timer that flushes the current delta when it holds writes.
    /// The timer is restarted after every flush.
    pub flush_interval: Duration,
    /// The current delta is flushed as soon as its estimated size reaches this value.
    pub flush_size_threshold: usize,
}

impl Default for WriteCoordinatorConfig {
    /// 10 000 queued commands per channel, a 10 s flush interval, and a size
    /// threshold of 64 * 1024 * 1024 (64 MiB if the delta's size estimate is in bytes).
    fn default() -> Self {
        const QUEUE_CAPACITY: usize = 10_000;
        const FLUSH_INTERVAL: Duration = Duration::from_secs(10);
        const FLUSH_SIZE_THRESHOLD: usize = 64 * 1024 * 1024;

        WriteCoordinatorConfig {
            queue_capacity: QUEUE_CAPACITY,
            flush_interval: FLUSH_INTERVAL,
            flush_size_threshold: FLUSH_SIZE_THRESHOLD,
        }
    }
}
59
60pub(crate) enum WriteCommand<D: Delta> {
61 Write {
62 write: D::Write,
63 result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
64 },
65 Flush {
66 epoch_tx: oneshot::Sender<handle::EpochResult<()>>,
67 flush_storage: bool,
68 },
69}
70
71pub struct WriteCoordinator<D: Delta, F: Flusher<D>> {
76 handles: HashMap<String, WriteCoordinatorHandle<D>>,
77 pause_handles: HashMap<String, PauseHandle>,
78 stop_tok: CancellationToken,
79 tasks: Option<(WriteCoordinatorTask<D>, FlushTask<D, F>)>,
80 write_task_jh: Option<tokio::task::JoinHandle<Result<(), String>>>,
81 view: Arc<BroadcastedView<D>>,
82}
83
84impl<D: Delta, F: Flusher<D>> WriteCoordinator<D, F> {
85 pub fn new(
86 config: WriteCoordinatorConfig,
87 channels: Vec<impl ToString>,
88 initial_context: D::Context,
89 initial_snapshot: Arc<dyn StorageSnapshot>,
90 flusher: F,
91 ) -> WriteCoordinator<D, F> {
92 let (watermarks, watcher) = EpochWatermarks::new();
93 let watermarks = Arc::new(watermarks);
94
95 let mut write_rxs = Vec::with_capacity(channels.len());
97 let mut write_txs = HashMap::new();
98 let mut pause_handles = HashMap::new();
99 for name in &channels {
100 let (write_tx, write_rx, pause_hdl) = pausable_channel(config.queue_capacity);
101 write_rxs.push(write_rx);
102 write_txs.insert(name.to_string(), write_tx);
103 pause_handles.insert(name.to_string(), pause_hdl);
104 }
105
106 let (flush_tx, flush_rx) = mpsc::channel(2);
112
113 let flush_stop_tok = CancellationToken::new();
114 let stop_tok = CancellationToken::new();
115 let write_task = WriteCoordinatorTask::new(
116 config,
117 initial_context,
118 initial_snapshot,
119 write_rxs,
120 flush_tx,
121 watermarks.clone(),
122 stop_tok.clone(),
123 flush_stop_tok.clone(),
124 );
125
126 let view = write_task.view.clone();
127
128 let mut handles = HashMap::new();
129 for name in channels {
130 let name = name.to_string();
131 let write_tx = write_txs.remove(&name).expect("unreachable");
132 handles.insert(
133 name.clone(),
134 WriteCoordinatorHandle::new(name, write_tx, watcher.clone(), view.clone()),
135 );
136 }
137
138 let flush_task = FlushTask {
139 flusher,
140 stop_tok: flush_stop_tok,
141 flush_rx,
142 watermarks: watermarks.clone(),
143 view: view.clone(),
144 last_flushed_epoch: 0,
145 };
146
147 Self {
148 handles,
149 pause_handles,
150 tasks: Some((write_task, flush_task)),
151 write_task_jh: None,
152 stop_tok,
153 view,
154 }
155 }
156
157 pub fn handle(&self, name: &str) -> WriteCoordinatorHandle<D> {
158 self.handles
159 .get(name)
160 .expect("unknown channel name")
161 .clone()
162 }
163
164 pub fn pause_handle(&self, name: &str) -> PauseHandle {
165 self.pause_handles
166 .get(name)
167 .expect("unknown channel name")
168 .clone()
169 }
170
171 pub fn start(&mut self) {
172 let Some((write_task, flush_task)) = self.tasks.take() else {
173 return;
175 };
176 let flush_task_jh = flush_task.run();
177 let write_task_jh = write_task.run(flush_task_jh);
178 self.write_task_jh = Some(write_task_jh);
179 }
180
181 pub async fn stop(mut self) -> Result<(), String> {
182 let Some(write_task_jh) = self.write_task_jh.take() else {
183 return Ok(());
184 };
185 self.stop_tok.cancel();
186 write_task_jh.await.map_err(|e| e.to_string())?
187 }
188
189 pub fn view(&self) -> Arc<View<D>> {
190 self.view.current()
191 }
192
193 pub fn subscribe(&self) -> (ViewSubscriber<D>, ViewMonitor) {
194 let (view_rx, initial_view) = self.view.subscribe();
195 ViewSubscriber::new(view_rx, initial_view)
196 }
197}
198
199struct WriteCoordinatorTask<D: Delta> {
200 config: WriteCoordinatorConfig,
201 delta: CurrentDelta<D>,
202 flush_tx: mpsc::Sender<FlushEvent<D>>,
203 write_rxs: Vec<PausableReceiver<D>>,
204 watermarks: Arc<EpochWatermarks>,
205 view: Arc<BroadcastedView<D>>,
206 epoch: u64,
207 delta_start_epoch: u64,
208 flush_interval: Interval,
209 stop_tok: CancellationToken,
210 flush_stop_tok: CancellationToken,
211}
212
/// Event loop that applies writes in arrival order, assigns epochs, and decides
/// when the current delta is frozen and handed to the flush task.
impl<D: Delta> WriteCoordinatorTask<D> {
    /// Builds the task around a fresh delta created from `initial_context`.
    ///
    /// The initial published view has:
    /// - the new delta's reader as `current`,
    /// - no frozen deltas,
    /// - `initial_snapshot` as storage,
    /// - no last-written delta.
    ///
    /// Epochs start at 1; the watermarks start at 0. The flush timer's first
    /// tick is scheduled one `flush_interval` from now.
    ///
    /// NOTE(review): tokio's `interval_at` panics on a zero period, so
    /// `config.flush_interval` must be non-zero.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        config: WriteCoordinatorConfig,
        initial_context: D::Context,
        initial_snapshot: Arc<dyn StorageSnapshot>,
        write_rxs: Vec<PausableReceiver<D>>,
        flush_tx: mpsc::Sender<FlushEvent<D>>,
        watermarks: Arc<EpochWatermarks>,
        stop_tok: CancellationToken,
        flush_stop_tok: CancellationToken,
    ) -> Self {
        let delta = D::init(initial_context);

        let initial_view = View {
            current: delta.reader(),
            frozen: vec![],
            snapshot: initial_snapshot,
            last_written_delta: None,
        };
        let initial_view = Arc::new(BroadcastedView::new(initial_view));

        let flush_interval = interval_at(
            Instant::now() + config.flush_interval,
            config.flush_interval,
        );
        Self {
            config,
            delta: CurrentDelta::new(delta),
            write_rxs,
            flush_tx,
            watermarks,
            view: initial_view,
            epoch: 1,
            delta_start_epoch: 1,
            flush_interval,
            stop_tok,
            flush_stop_tok,
        }
    }

    /// Spawns the coordinator loop on the tokio runtime.
    ///
    /// `flush_task_jh` is awaited during shutdown, so a flush-task failure
    /// surfaces in the returned handle's result.
    pub fn run(
        mut self,
        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
    ) -> tokio::task::JoinHandle<Result<(), String>> {
        tokio::task::spawn(async move { self.run_coordinator(flush_task_jh).await })
    }

    /// Main loop. It exits when every write channel has closed or `stop_tok` is
    /// cancelled. It then flushes the current delta, cancels the flush task,
    /// and returns that task's outcome.
    async fn run_coordinator(
        mut self,
        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
    ) -> Result<(), String> {
        // Restart the timer so the first interval flush is measured from when
        // the loop starts rather than from construction.
        self.flush_interval.reset();

        // Merge every channel receiver into one stream. `SelectAll` drops a
        // stream once it yields `None`, and yields `None` itself once all of
        // its streams have ended.
        let mut write_stream: SelectAll<_> = SelectAll::new();
        for rx in self.write_rxs.drain(..) {
            write_stream.push(
                stream::unfold(
                    rx,
                    |mut rx| async move { rx.recv().await.map(|cmd| (cmd, rx)) },
                )
                .boxed(),
            );
        }

        loop {
            tokio::select! {
                cmd = write_stream.next() => {
                    match cmd {
                        Some(WriteCommand::Write { write, result_tx }) => {
                            self.handle_write(write, result_tx).await?;
                        }
                        Some(WriteCommand::Flush { epoch_tx, flush_storage }) => {
                            // Reply first with the epoch of the last write
                            // assigned so far (0 if there has been none).
                            let _ = epoch_tx.send(Ok(handle::WriteApplied {
                                epoch: self.epoch.saturating_sub(1),
                                result: (),
                            }));
                            self.handle_flush(FlushReason::Explicit, flush_storage).await;
                        }
                        None => {
                            // Every channel's sender side has been dropped.
                            break;
                        }
                    }
                }

                _ = self.flush_interval.tick() => {
                    self.handle_flush(FlushReason::Interval, false).await;
                }

                _ = self.stop_tok.cancelled() => {
                    break;
                }
            }
        }

        // Hand any remaining writes to the flush task. Then signal it to stop;
        // it still processes events already in its queue before exiting.
        self.handle_flush(FlushReason::Shutdown, false).await;

        self.flush_stop_tok.cancel();
        flush_task_jh
            .await
            .map_err(|e| format!("flush task panicked: {}", e))?
            .map_err(|e| format!("flush task error: {}", e))
    }

    /// Applies one write under the next epoch and reports the outcome on
    /// `result_tx`.
    ///
    /// The epoch is consumed and the applied watermark advanced even when
    /// `apply` fails. If the delta's estimated size then reaches
    /// `flush_size_threshold`, a size-threshold flush is triggered.
    /// Always returns `Ok(())`.
    async fn handle_write(
        &mut self,
        op: D::Write,
        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
    ) -> Result<(), String> {
        let write_epoch = self.epoch;
        self.epoch += 1;

        let apply_start = std::time::Instant::now();
        let result = self.delta.apply(op);
        ::metrics::histogram!(metrics::COORDINATOR_DELTA_APPLY_DURATION_SECONDS)
            .record(apply_start.elapsed().as_secs_f64());

        // The submitter may have dropped its receiver; that is not an error here.
        let _ = result_tx.send(
            result
                .map(|apply_result| handle::WriteApplied {
                    epoch: write_epoch,
                    result: apply_result,
                })
                .map_err(|e| handle::WriteFailed {
                    epoch: write_epoch,
                    error: e,
                }),
        );

        self.watermarks.update_applied(write_epoch);

        let estimated = self.delta.estimate_size();
        ::metrics::gauge!(metrics::COORDINATOR_DELTA_ESTIMATED_BYTES).set(estimated as f64);
        if estimated >= self.config.flush_size_threshold {
            self.handle_flush(FlushReason::SizeThreshold, false).await;
        }

        Ok(())
    }

    /// Freezes and queues the current delta if it holds any writes.
    ///
    /// If `flush_storage` is set, a storage flush is also queued, even when no
    /// delta was flushed.
    async fn handle_flush(&mut self, reason: FlushReason, flush_storage: bool) {
        self.flush_if_delta_has_writes(reason).await;
        if flush_storage {
            self.send_flush_event(FlushEvent::FlushStorage).await;
        }
    }

    /// Does nothing when no epoch has been assigned since the last freeze.
    ///
    /// Otherwise it:
    /// - freezes the current delta over `delta_start_epoch..epoch`,
    /// - installs a fresh delta built from the frozen one's context,
    /// - publishes the frozen reader in the view,
    /// - queues the frozen delta for the flush task.
    async fn flush_if_delta_has_writes(&mut self, reason: FlushReason) {
        if self.epoch == self.delta_start_epoch {
            return;
        }

        // Every actual flush restarts the interval timer.
        self.flush_interval.reset();

        ::metrics::counter!(metrics::COORDINATOR_FLUSH_TOTAL, "reason" => reason.as_str())
            .increment(1);

        let epoch_range = self.delta_start_epoch..self.epoch;
        self.delta_start_epoch = self.epoch;

        let freeze_start = std::time::Instant::now();
        let (frozen, frozen_reader) = self.delta.freeze_and_init();
        ::metrics::histogram!(metrics::COORDINATOR_DELTA_FREEZE_DURATION_SECONDS)
            .record(freeze_start.elapsed().as_secs_f64());
        ::metrics::gauge!(metrics::COORDINATOR_DELTA_ESTIMATED_BYTES).set(0.0);

        let stamped_frozen = EpochStamped::new(frozen, epoch_range.clone());
        let stamped_frozen_reader = EpochStamped::new(frozen_reader, epoch_range.clone());
        let reader = self.delta.reader();
        // Publish the frozen reader before handing the delta to the flusher.
        // The flush task's `update_flush_finished` expects to find it in the view.
        self.view.update_delta_frozen(stamped_frozen_reader, reader);
        self.send_flush_event(FlushEvent::FlushDelta {
            frozen: stamped_frozen,
        })
        .await;
    }

    /// Sends `event` to the flush task, waiting while its queue is full.
    ///
    /// Queue depth is recorded before the send, and the time spent sending is
    /// recorded after it. A send error (flush task gone) is ignored.
    async fn send_flush_event(&self, event: FlushEvent<D>) {
        let max = self.flush_tx.max_capacity();
        let free = self.flush_tx.capacity();
        ::metrics::gauge!(metrics::COORDINATOR_FLUSH_EVENT_QUEUE_DEPTH)
            .set(max.saturating_sub(free) as f64);

        let send_start = std::time::Instant::now();
        let _ = self.flush_tx.send(event).await;
        ::metrics::histogram!(metrics::COORDINATOR_FLUSH_EVENT_SEND_DURATION_SECONDS)
            .record(send_start.elapsed().as_secs_f64());
    }
}
424
/// Why a delta flush was triggered; reported as the `reason` label of the
/// flush counter metric.
#[derive(Clone, Copy, Debug)]
pub(crate) enum FlushReason {
    /// The current delta's estimated size reached the configured threshold.
    SizeThreshold,
    /// The periodic flush timer fired.
    Interval,
    /// A handle sent an explicit flush command.
    Explicit,
    /// The write task is shutting down.
    Shutdown,
}

impl FlushReason {
    /// Metric label for this reason.
    fn as_str(self) -> &'static str {
        let label = match self {
            Self::SizeThreshold => "size_threshold",
            Self::Interval => "interval",
            Self::Explicit => "explicit",
            Self::Shutdown => "shutdown",
        };
        label
    }
}
444
445struct FlushTask<D: Delta, F: Flusher<D>> {
446 flusher: F,
447 stop_tok: CancellationToken,
448 flush_rx: mpsc::Receiver<FlushEvent<D>>,
449 watermarks: Arc<EpochWatermarks>,
450 view: Arc<BroadcastedView<D>>,
451 last_flushed_epoch: u64,
452}
453
454impl<D: Delta, F: Flusher<D>> FlushTask<D, F> {
455 fn run(mut self) -> tokio::task::JoinHandle<WriteResult<()>> {
456 tokio::spawn(async move {
457 loop {
458 tokio::select! {
459 event = self.flush_rx.recv() => {
460 let Some(event) = event else {
461 break;
462 };
463 self.handle_event(event).await?;
464 }
465 _ = self.stop_tok.cancelled() => {
466 break;
467 }
468 }
469 }
470 while let Ok(event) = self.flush_rx.try_recv() {
472 self.handle_event(event).await;
473 }
474 Ok(())
475 })
476 }
477
478 async fn handle_event(&mut self, event: FlushEvent<D>) -> WriteResult<()> {
479 match event {
480 FlushEvent::FlushDelta { frozen } => self.handle_flush(frozen).await,
481 FlushEvent::FlushStorage => {
482 let start = std::time::Instant::now();
483 let result = self
484 .flusher
485 .flush_storage()
486 .await
487 .map_err(|e| WriteError::FlushError(e.to_string()));
488 ::metrics::histogram!(metrics::COORDINATOR_FLUSH_STORAGE_DURATION_SECONDS)
489 .record(start.elapsed().as_secs_f64());
490 result?;
491 self.watermarks.update_durable(self.last_flushed_epoch);
492 Ok(())
493 }
494 }
495 }
496
497 async fn handle_flush(&mut self, frozen: EpochStamped<D::Frozen>) -> WriteResult<()> {
498 let delta = frozen.val;
499 let epoch_range = frozen.epoch_range;
500 let start = std::time::Instant::now();
501 let result = self
502 .flusher
503 .flush_delta(delta, &epoch_range)
504 .await
505 .map_err(|e| WriteError::FlushError(e.to_string()));
506 ::metrics::histogram!(metrics::COORDINATOR_FLUSH_DELTA_DURATION_SECONDS)
507 .record(start.elapsed().as_secs_f64());
508 let snapshot = result?;
509 self.last_flushed_epoch = epoch_range.end - 1;
510 self.view.update_flush_finished(snapshot, epoch_range);
516 self.watermarks.update_written(self.last_flushed_epoch);
517 Ok(())
518 }
519}
520
521struct CurrentDelta<D: Delta> {
522 delta: Option<D>,
523}
524
525impl<D: Delta> Deref for CurrentDelta<D> {
526 type Target = D;
527
528 fn deref(&self) -> &Self::Target {
529 match &self.delta {
530 Some(d) => d,
531 None => panic!("current delta not initialized"),
532 }
533 }
534}
535
536impl<D: Delta> DerefMut for CurrentDelta<D> {
537 fn deref_mut(&mut self) -> &mut Self::Target {
538 match &mut self.delta {
539 Some(d) => d,
540 None => panic!("current delta not initialized"),
541 }
542 }
543}
544
545impl<D: Delta> CurrentDelta<D> {
546 fn new(delta: D) -> Self {
547 Self { delta: Some(delta) }
548 }
549
550 fn freeze_and_init(&mut self) -> (D::Frozen, D::FrozenView) {
551 let Some(delta) = self.delta.take() else {
552 panic!("delta not initialized");
553 };
554 let (frozen, frozen_reader, context) = delta.freeze();
555 let new_delta = D::init(context);
556 self.delta = Some(new_delta);
557 (frozen, frozen_reader)
558 }
559}
560
561pub struct EpochWatermarks {
562 applied_tx: tokio::sync::watch::Sender<u64>,
563 written_tx: tokio::sync::watch::Sender<u64>,
564 durable_tx: tokio::sync::watch::Sender<u64>,
565}
566
567impl EpochWatermarks {
568 pub fn new() -> (Self, EpochWatcher) {
569 let (applied_tx, applied_rx) = tokio::sync::watch::channel(0);
570 let (written_tx, written_rx) = tokio::sync::watch::channel(0);
571 let (durable_tx, durable_rx) = tokio::sync::watch::channel(0);
572 let watcher = EpochWatcher {
573 applied_rx,
574 written_rx,
575 durable_rx,
576 };
577 let watermarks = EpochWatermarks {
578 applied_tx,
579 written_tx,
580 durable_tx,
581 };
582 (watermarks, watcher)
583 }
584
585 pub fn update_applied(&self, epoch: u64) {
586 let _ = self.applied_tx.send(epoch);
587 }
588
589 pub fn update_written(&self, epoch: u64) {
590 let _ = self.written_tx.send(epoch);
591 }
592
593 pub fn update_durable(&self, epoch: u64) {
594 let _ = self.durable_tx.send(epoch);
595 }
596}
597
598pub(crate) struct BroadcastedView<D: Delta> {
599 inner: Mutex<BroadcastedViewInner<D>>,
600}
601
602impl<D: Delta> BroadcastedView<D> {
603 fn new(initial_view: View<D>) -> Self {
604 let (view_tx, _) = broadcast::channel(16);
605 Self {
606 inner: Mutex::new(BroadcastedViewInner {
607 view: Arc::new(initial_view),
608 view_tx,
609 }),
610 }
611 }
612
613 fn update_flush_finished(&self, snapshot: Arc<dyn StorageSnapshot>, epoch_range: Range<u64>) {
614 self.inner
615 .lock()
616 .expect("lock poisoned")
617 .update_flush_finished(snapshot, epoch_range);
618 }
619
620 fn update_delta_frozen(&self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
621 self.inner
622 .lock()
623 .expect("lock poisoned")
624 .update_delta_frozen(frozen, reader);
625 }
626
627 fn current(&self) -> Arc<View<D>> {
628 self.inner.lock().expect("lock poisoned").current()
629 }
630
631 fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
632 self.inner.lock().expect("lock poisoned").subscribe()
633 }
634}
635
636struct BroadcastedViewInner<D: Delta> {
637 view: Arc<View<D>>,
638 view_tx: tokio::sync::broadcast::Sender<Arc<View<D>>>,
639}
640
641impl<D: Delta> BroadcastedViewInner<D> {
642 fn update_flush_finished(
643 &mut self,
644 snapshot: Arc<dyn StorageSnapshot>,
645 epoch_range: Range<u64>,
646 ) {
647 let mut new_frozen = self.view.frozen.clone();
648 let last = new_frozen
649 .pop()
650 .expect("frozen should not be empty when flush completes");
651 assert_eq!(last.epoch_range, epoch_range);
652 self.view = Arc::new(View {
653 current: self.view.current.clone(),
654 frozen: new_frozen,
655 snapshot,
656 last_written_delta: Some(last),
657 });
658 self.view_tx.send(self.view.clone());
659 }
660
661 fn update_delta_frozen(&mut self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
662 let mut new_frozen = vec![frozen];
664 new_frozen.extend(self.view.frozen.iter().cloned());
665 self.view = Arc::new(View {
666 current: reader,
667 frozen: new_frozen,
668 snapshot: self.view.snapshot.clone(),
669 last_written_delta: self.view.last_written_delta.clone(),
670 });
671 self.view_tx.send(self.view.clone());
672 }
673
674 fn current(&self) -> Arc<View<D>> {
675 self.view.clone()
676 }
677
678 fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
679 (self.view_tx.subscribe(), self.view.clone())
680 }
681}
682
683struct PausableReceiver<D: Delta> {
684 pause_rx: Option<watch::Receiver<bool>>,
685 rx: mpsc::Receiver<WriteCommand<D>>,
686}
687
688impl<D: Delta> PausableReceiver<D> {
689 async fn recv(&mut self) -> Option<WriteCommand<D>> {
690 if let Some(pause_rx) = self.pause_rx.as_mut() {
691 pause_rx.wait_for(|v| !*v).await;
692 }
693 self.rx.recv().await
694 }
695}
696
697#[derive(Clone)]
699pub struct PauseHandle {
700 pause_tx: tokio::sync::watch::Sender<bool>,
701}
702
703impl PauseHandle {
704 pub fn pause(&self) {
705 self.pause_tx.send_replace(true);
706 }
707
708 pub fn unpause(&self) {
709 self.pause_tx.send_replace(false);
710 }
711}
712
713fn pausable_channel<D: Delta>(
714 capacity: usize,
715) -> (
716 mpsc::Sender<WriteCommand<D>>,
717 PausableReceiver<D>,
718 PauseHandle,
719) {
720 let (pause_tx, pause_rx) = watch::channel(false);
721 let (tx, rx) = mpsc::channel(capacity);
722 (
723 tx,
724 PausableReceiver {
725 pause_rx: Some(pause_rx),
726 rx,
727 },
728 PauseHandle { pause_tx },
729 )
730}
731
732#[cfg(test)]
733mod tests {
734 use super::*;
735 use crate::BytesRange;
736 use crate::coordinator::Durability;
737 use crate::storage::in_memory::{InMemoryStorage, InMemoryStorageSnapshot};
738 use crate::storage::{PutRecordOp, Record, StorageSnapshot};
739 use crate::{Storage, StorageRead};
740 use async_trait::async_trait;
741 use bytes::Bytes;
742 use std::collections::{HashMap, HashSet};
743 use std::ops::Range;
744 use std::sync::Mutex;
745 #[derive(Clone, Debug)]
750 struct TestWrite {
751 key: String,
752 value: u64,
753 size: usize,
754 }
755
756 #[derive(Clone, Debug, Default)]
758 struct TestContext {
759 next_seq: u64,
760 error: Option<String>,
761 }
762
763 #[derive(Clone, Debug, Default)]
765 struct TestDeltaReader {
766 data: Arc<Mutex<HashMap<String, u64>>>,
767 }
768
769 impl TestDeltaReader {
770 fn get(&self, key: &str) -> Option<u64> {
771 self.data.lock().unwrap().get(key).copied()
772 }
773 }
774
775 #[derive(Debug)]
778 struct TestDelta {
779 context: TestContext,
780 writes: HashMap<String, (u64, u64)>,
781 key_values: Arc<Mutex<HashMap<String, u64>>>,
782 total_size: usize,
783 }
784
785 #[derive(Clone, Debug)]
786 struct FrozenTestDelta {
787 writes: HashMap<String, (u64, u64)>,
788 }
789
790 impl std::fmt::Debug for View<TestDelta> {
791 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
792 f.write_str("View<TestDelta>")
793 }
794 }
795
796 impl Delta for TestDelta {
797 type Context = TestContext;
798 type Write = TestWrite;
799 type DeltaView = TestDeltaReader;
800 type Frozen = FrozenTestDelta;
801 type FrozenView = Arc<HashMap<String, u64>>;
802 type ApplyResult = ();
803
804 fn init(context: Self::Context) -> Self {
805 Self {
806 context,
807 writes: HashMap::default(),
808 key_values: Arc::new(Mutex::new(HashMap::default())),
809 total_size: 0,
810 }
811 }
812
813 fn apply(&mut self, write: Self::Write) -> Result<(), String> {
814 if let Some(error) = &self.context.error {
815 return Err(error.clone());
816 }
817
818 let seq = self.context.next_seq;
819 self.context.next_seq += 1;
820
821 self.writes.insert(write.key.clone(), (seq, write.value));
822 self.total_size += write.size;
823 self.key_values
824 .lock()
825 .unwrap()
826 .insert(write.key, write.value);
827 Ok(())
828 }
829
830 fn estimate_size(&self) -> usize {
831 self.total_size
832 }
833
834 fn freeze(self) -> (Self::Frozen, Self::FrozenView, Self::Context) {
835 let frozen = FrozenTestDelta {
836 writes: self.writes,
837 };
838 let frozen_view = Arc::new(self.key_values.lock().unwrap().clone());
839 (frozen, frozen_view, self.context)
840 }
841
842 fn reader(&self) -> Self::DeltaView {
843 TestDeltaReader {
844 data: self.key_values.clone(),
845 }
846 }
847 }
848
849 #[derive(Default)]
851 struct TestFlusherState {
852 flushed_events: Vec<Arc<EpochStamped<FrozenTestDelta>>>,
853 flush_started_tx: Option<oneshot::Sender<()>>,
855 unblock_rx: Option<mpsc::Receiver<()>>,
857 }
858
859 #[derive(Clone)]
860 struct TestFlusher {
861 state: Arc<Mutex<TestFlusherState>>,
862 storage: Arc<InMemoryStorage>,
863 }
864
865 impl Default for TestFlusher {
866 fn default() -> Self {
867 Self {
868 state: Arc::new(Mutex::new(TestFlusherState::default())),
869 storage: Arc::new(InMemoryStorage::new()),
870 }
871 }
872 }
873
874 impl TestFlusher {
875 fn with_flush_control() -> (Self, oneshot::Receiver<()>, mpsc::Sender<()>) {
878 let (started_tx, started_rx) = oneshot::channel();
879 let (unblock_tx, unblock_rx) = mpsc::channel(1);
880 let flusher = Self {
881 state: Arc::new(Mutex::new(TestFlusherState {
882 flushed_events: Vec::new(),
883 flush_started_tx: Some(started_tx),
884 unblock_rx: Some(unblock_rx),
885 })),
886 storage: Arc::new(InMemoryStorage::new()),
887 };
888 (flusher, started_rx, unblock_tx)
889 }
890
891 fn flushed_events(&self) -> Vec<Arc<EpochStamped<FrozenTestDelta>>> {
892 self.state.lock().unwrap().flushed_events.clone()
893 }
894
895 async fn initial_snapshot(&self) -> Arc<dyn StorageSnapshot> {
896 self.storage.snapshot().await.unwrap()
897 }
898 }
899
900 #[async_trait]
901 impl Flusher<TestDelta> for TestFlusher {
902 async fn flush_delta(
903 &mut self,
904 frozen: FrozenTestDelta,
905 epoch_range: &Range<u64>,
906 ) -> Result<Arc<dyn StorageSnapshot>, String> {
907 let flush_started_tx = {
909 let mut state = self.state.lock().unwrap();
910 state.flush_started_tx.take()
911 };
912 if let Some(tx) = flush_started_tx {
913 let _ = tx.send(());
914 }
915
916 let unblock_rx = {
918 let mut state = self.state.lock().unwrap();
919 state.unblock_rx.take()
920 };
921 if let Some(mut rx) = unblock_rx {
922 rx.recv().await;
923 }
924
925 let records: Vec<PutRecordOp> = frozen
927 .writes
928 .iter()
929 .map(|(key, (seq, value))| {
930 let mut buf = Vec::with_capacity(16);
931 buf.extend_from_slice(&seq.to_le_bytes());
932 buf.extend_from_slice(&value.to_le_bytes());
933 Record::new(Bytes::from(key.clone()), Bytes::from(buf)).into()
934 })
935 .collect();
936 self.storage
937 .put(records)
938 .await
939 .map_err(|e| format!("{}", e))?;
940
941 {
943 let mut state = self.state.lock().unwrap();
944 state
945 .flushed_events
946 .push(Arc::new(EpochStamped::new(frozen, epoch_range.clone())));
947 }
948
949 self.storage.snapshot().await.map_err(|e| format!("{}", e))
950 }
951
952 async fn flush_storage(&self) -> Result<(), String> {
953 let flush_started_tx = {
955 let mut state = self.state.lock().unwrap();
956 state.flush_started_tx.take()
957 };
958 if let Some(tx) = flush_started_tx {
959 let _ = tx.send(());
960 }
961
962 let unblock_rx = {
964 let mut state = self.state.lock().unwrap();
965 state.unblock_rx.take()
966 };
967 if let Some(mut rx) = unblock_rx {
968 rx.recv().await;
969 }
970
971 Ok(())
972 }
973 }
974
975 fn test_config() -> WriteCoordinatorConfig {
976 WriteCoordinatorConfig {
977 queue_capacity: 100,
978 flush_interval: Duration::from_secs(3600), flush_size_threshold: usize::MAX,
980 }
981 }
982
983 async fn assert_snapshot_has_rows(
984 snapshot: &Arc<dyn StorageSnapshot>,
985 expected: &[(&str, u64, u64)],
986 ) {
987 let records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
988 assert_eq!(
989 records.len(),
990 expected.len(),
991 "expected {} rows but snapshot has {}",
992 expected.len(),
993 records.len()
994 );
995 let mut actual: Vec<(String, u64, u64)> = records
996 .iter()
997 .map(|r| {
998 let key = String::from_utf8(r.key.to_vec()).unwrap();
999 let seq = u64::from_le_bytes(r.value[0..8].try_into().unwrap());
1000 let value = u64::from_le_bytes(r.value[8..16].try_into().unwrap());
1001 (key, seq, value)
1002 })
1003 .collect();
1004 actual.sort_by(|a, b| a.0.cmp(&b.0));
1005 let mut expected: Vec<(&str, u64, u64)> = expected.to_vec();
1006 expected.sort_by(|a, b| a.0.cmp(b.0));
1007 for (actual, expected) in actual.iter().zip(expected.iter()) {
1008 assert_eq!(
1009 actual.0, expected.0,
1010 "key mismatch: got {:?}, expected {:?}",
1011 actual.0, expected.0
1012 );
1013 assert_eq!(
1014 actual.1, expected.1,
1015 "seq mismatch for key {:?}: got {}, expected {}",
1016 actual.0, actual.1, expected.1
1017 );
1018 assert_eq!(
1019 actual.2, expected.2,
1020 "value mismatch for key {:?}: got {}, expected {}",
1021 actual.0, actual.2, expected.2
1022 );
1023 }
1024 }
1025
1026 #[tokio::test]
1031 async fn should_assign_monotonic_epochs() {
1032 let flusher = TestFlusher::default();
1034 let mut coordinator = WriteCoordinator::new(
1035 test_config(),
1036 vec!["default".to_string()],
1037 TestContext::default(),
1038 flusher.initial_snapshot().await,
1039 flusher,
1040 );
1041 let handle = coordinator.handle("default");
1042 coordinator.start();
1043
1044 let write1 = handle
1046 .try_write(TestWrite {
1047 key: "a".into(),
1048 value: 1,
1049 size: 10,
1050 })
1051 .await
1052 .unwrap();
1053 let write2 = handle
1054 .try_write(TestWrite {
1055 key: "b".into(),
1056 value: 2,
1057 size: 10,
1058 })
1059 .await
1060 .unwrap();
1061 let write3 = handle
1062 .try_write(TestWrite {
1063 key: "c".into(),
1064 value: 3,
1065 size: 10,
1066 })
1067 .await
1068 .unwrap();
1069
1070 let epoch1 = write1.epoch().await.unwrap();
1071 let epoch2 = write2.epoch().await.unwrap();
1072 let epoch3 = write3.epoch().await.unwrap();
1073
1074 assert!(epoch1 < epoch2);
1076 assert!(epoch2 < epoch3);
1077
1078 coordinator.stop().await;
1080 }
1081
1082 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1083 async fn should_apply_writes_in_order() {
1084 let flusher = TestFlusher::default();
1086 let mut coordinator = WriteCoordinator::new(
1087 test_config(),
1088 vec!["default".to_string()],
1089 TestContext::default(),
1090 flusher.initial_snapshot().await,
1091 flusher.clone(),
1092 );
1093 let handle = coordinator.handle("default");
1094 coordinator.start();
1095
1096 handle
1098 .try_write(TestWrite {
1099 key: "a".into(),
1100 value: 1,
1101 size: 10,
1102 })
1103 .await
1104 .unwrap();
1105 handle
1106 .try_write(TestWrite {
1107 key: "a".into(),
1108 value: 2,
1109 size: 10,
1110 })
1111 .await
1112 .unwrap();
1113 let mut last_write = handle
1114 .try_write(TestWrite {
1115 key: "a".into(),
1116 value: 3,
1117 size: 10,
1118 })
1119 .await
1120 .unwrap();
1121
1122 handle.flush(false).await.unwrap();
1123 last_write.wait(Durability::Written).await.unwrap();
1125
1126 let events = flusher.flushed_events();
1128 assert_eq!(events.len(), 1);
1129 let frozen_delta = &events[0];
1130 let delta = &frozen_delta.val;
1131 let (seq, value) = delta.writes.get("a").unwrap();
1133 assert_eq!(*value, 3);
1134 assert_eq!(*seq, 2);
1135
1136 coordinator.stop().await;
1138 }
1139
1140 #[tokio::test]
1141 async fn should_update_applied_watermark_after_each_write() {
1142 let flusher = TestFlusher::default();
1144 let mut coordinator = WriteCoordinator::new(
1145 test_config(),
1146 vec!["default".to_string()],
1147 TestContext::default(),
1148 flusher.initial_snapshot().await,
1149 flusher,
1150 );
1151 let handle = coordinator.handle("default");
1152 coordinator.start();
1153
1154 let mut write_handle = handle
1156 .try_write(TestWrite {
1157 key: "a".into(),
1158 value: 1,
1159 size: 10,
1160 })
1161 .await
1162 .unwrap();
1163
1164 let result = write_handle.wait(Durability::Applied).await;
1166 assert!(result.is_ok());
1167
1168 coordinator.stop().await;
1170 }
1171
1172 #[tokio::test]
1173 async fn should_propagate_apply_error_to_handle() {
1174 let flusher = TestFlusher::default();
1176 let context = TestContext {
1177 error: Some("apply error".to_string()),
1178 ..Default::default()
1179 };
1180 let mut coordinator = WriteCoordinator::new(
1181 test_config(),
1182 vec!["default".to_string()],
1183 context,
1184 flusher.initial_snapshot().await,
1185 flusher,
1186 );
1187 let handle = coordinator.handle("default");
1188 coordinator.start();
1189
1190 let write = handle
1192 .try_write(TestWrite {
1193 key: "a".into(),
1194 value: 1,
1195 size: 10,
1196 })
1197 .await
1198 .unwrap();
1199
1200 let result = write.epoch().await;
1201
1202 assert!(
1204 matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
1205 );
1206
1207 coordinator.stop().await;
1209 }
1210
1211 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1216 async fn should_flush_on_command() {
1217 let flusher = TestFlusher::default();
1219 let mut coordinator = WriteCoordinator::new(
1220 test_config(),
1221 vec!["default".to_string()],
1222 TestContext::default(),
1223 flusher.initial_snapshot().await,
1224 flusher.clone(),
1225 );
1226 let handle = coordinator.handle("default");
1227 coordinator.start();
1228
1229 let mut write = handle
1231 .try_write(TestWrite {
1232 key: "a".into(),
1233 value: 1,
1234 size: 10,
1235 })
1236 .await
1237 .unwrap();
1238 handle.flush(false).await.unwrap();
1239 write.wait(Durability::Written).await.unwrap();
1240
1241 assert_eq!(flusher.flushed_events().len(), 1);
1243
1244 coordinator.stop().await;
1246 }
1247
1248 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1249 async fn should_wait_on_flush_handle() {
1250 let flusher = TestFlusher::default();
1252 let mut coordinator = WriteCoordinator::new(
1253 test_config(),
1254 vec!["default".to_string()],
1255 TestContext::default(),
1256 flusher.initial_snapshot().await,
1257 flusher.clone(),
1258 );
1259 let handle = coordinator.handle("default");
1260 coordinator.start();
1261
1262 handle
1264 .try_write(TestWrite {
1265 key: "a".into(),
1266 value: 1,
1267 size: 10,
1268 })
1269 .await
1270 .unwrap();
1271 let mut flush_handle = handle.flush(false).await.unwrap();
1272
1273 flush_handle.wait(Durability::Written).await.unwrap();
1275 assert_eq!(flusher.flushed_events().len(), 1);
1276
1277 coordinator.stop().await;
1279 }
1280
1281 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1282 async fn should_return_correct_epoch_from_flush_handle() {
1283 let flusher = TestFlusher::default();
1285 let mut coordinator = WriteCoordinator::new(
1286 test_config(),
1287 vec!["default".to_string()],
1288 TestContext::default(),
1289 flusher.initial_snapshot().await,
1290 flusher,
1291 );
1292 let handle = coordinator.handle("default");
1293 coordinator.start();
1294
1295 let write1 = handle
1297 .try_write(TestWrite {
1298 key: "a".into(),
1299 value: 1,
1300 size: 10,
1301 })
1302 .await
1303 .unwrap();
1304 let write2 = handle
1305 .try_write(TestWrite {
1306 key: "b".into(),
1307 value: 2,
1308 size: 10,
1309 })
1310 .await
1311 .unwrap();
1312 let flush_handle = handle.flush(false).await.unwrap();
1313
1314 let flush_epoch = flush_handle.epoch().await.unwrap();
1316 let write2_epoch = write2.epoch().await.unwrap();
1317 assert_eq!(flush_epoch, write2_epoch);
1318
1319 coordinator.stop().await;
1321 }
1322
1323 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1324 async fn should_include_all_pending_writes_in_flush() {
1325 let flusher = TestFlusher::default();
1327 let mut coordinator = WriteCoordinator::new(
1328 test_config(),
1329 vec!["default".to_string()],
1330 TestContext::default(),
1331 flusher.initial_snapshot().await,
1332 flusher.clone(),
1333 );
1334 let handle = coordinator.handle("default");
1335 coordinator.start();
1336
1337 handle
1339 .try_write(TestWrite {
1340 key: "a".into(),
1341 value: 1,
1342 size: 10,
1343 })
1344 .await
1345 .unwrap();
1346 handle
1347 .try_write(TestWrite {
1348 key: "b".into(),
1349 value: 2,
1350 size: 10,
1351 })
1352 .await
1353 .unwrap();
1354 let mut last_write = handle
1355 .try_write(TestWrite {
1356 key: "c".into(),
1357 value: 3,
1358 size: 10,
1359 })
1360 .await
1361 .unwrap();
1362
1363 handle.flush(false).await.unwrap();
1364 last_write.wait(Durability::Written).await.unwrap();
1365
1366 let events = flusher.flushed_events();
1368 assert_eq!(events.len(), 1);
1369 let frozen_delta = &events[0];
1370 assert_eq!(frozen_delta.val.writes.len(), 3);
1371 let snapshot = flusher.storage.snapshot().await.unwrap();
1372 assert_snapshot_has_rows(&snapshot, &[("a", 0, 1), ("b", 1, 2), ("c", 2, 3)]).await;
1373
1374 coordinator.stop().await;
1376 }
1377
1378 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1379 async fn should_skip_flush_when_no_new_writes() {
1380 let flusher = TestFlusher::default();
1382 let mut coordinator = WriteCoordinator::new(
1383 test_config(),
1384 vec!["default".to_string()],
1385 TestContext::default(),
1386 flusher.initial_snapshot().await,
1387 flusher.clone(),
1388 );
1389 let handle = coordinator.handle("default");
1390 coordinator.start();
1391
1392 let mut write = handle
1394 .try_write(TestWrite {
1395 key: "a".into(),
1396 value: 1,
1397 size: 10,
1398 })
1399 .await
1400 .unwrap();
1401 handle.flush(false).await.unwrap();
1402 write.wait(Durability::Written).await.unwrap();
1403
1404 handle.flush(false).await.unwrap();
1406
1407 let sync_write = handle
1410 .try_write(TestWrite {
1411 key: "sync".into(),
1412 value: 0,
1413 size: 1,
1414 })
1415 .await
1416 .unwrap();
1417 sync_write.epoch().await.unwrap();
1418
1419 assert_eq!(flusher.flushed_events().len(), 1);
1421
1422 coordinator.stop().await;
1424 }
1425
1426 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1427 async fn should_update_written_watermark_after_flush() {
1428 let flusher = TestFlusher::default();
1430 let mut coordinator = WriteCoordinator::new(
1431 test_config(),
1432 vec!["default".to_string()],
1433 TestContext::default(),
1434 flusher.initial_snapshot().await,
1435 flusher,
1436 );
1437 let handle = coordinator.handle("default");
1438 coordinator.start();
1439
1440 let mut write_handle = handle
1442 .try_write(TestWrite {
1443 key: "a".into(),
1444 value: 1,
1445 size: 10,
1446 })
1447 .await
1448 .unwrap();
1449
1450 handle.flush(false).await.unwrap();
1451
1452 let result = write_handle.wait(Durability::Written).await;
1454 assert!(result.is_ok());
1455
1456 coordinator.stop().await;
1458 }
1459
1460 #[tokio::test(start_paused = true)]
1465 async fn should_flush_on_flush_interval() {
1466 let flusher = TestFlusher::default();
1468 let config = WriteCoordinatorConfig {
1469 queue_capacity: 100,
1470 flush_interval: Duration::from_millis(100),
1471 flush_size_threshold: usize::MAX,
1472 };
1473 let mut coordinator = WriteCoordinator::new(
1474 config,
1475 vec!["default".to_string()],
1476 TestContext::default(),
1477 flusher.initial_snapshot().await,
1478 flusher.clone(),
1479 );
1480 let handle = coordinator.handle("default");
1481 coordinator.start();
1482
1483 tokio::task::yield_now().await;
1485 let mut write = handle
1486 .try_write(TestWrite {
1487 key: "a".into(),
1488 value: 1,
1489 size: 10,
1490 })
1491 .await
1492 .unwrap();
1493 write.wait(Durability::Applied).await.unwrap();
1494
1495 assert_eq!(flusher.flushed_events().len(), 0);
1497
1498 tokio::time::advance(Duration::from_millis(150)).await;
1500 tokio::task::yield_now().await;
1501
1502 assert_eq!(flusher.flushed_events().len(), 1);
1504 let snapshot = flusher.storage.snapshot().await.unwrap();
1505 assert_snapshot_has_rows(&snapshot, &[("a", 0, 1)]).await;
1506
1507 coordinator.stop().await;
1509 }
1510
1511 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1516 async fn should_flush_when_size_threshold_exceeded() {
1517 let flusher = TestFlusher::default();
1519 let config = WriteCoordinatorConfig {
1520 queue_capacity: 100,
1521 flush_interval: Duration::from_secs(3600),
1522 flush_size_threshold: 100, };
1524 let mut coordinator = WriteCoordinator::new(
1525 config,
1526 vec!["default".to_string()],
1527 TestContext::default(),
1528 flusher.initial_snapshot().await,
1529 flusher.clone(),
1530 );
1531 let handle = coordinator.handle("default");
1532 coordinator.start();
1533
1534 let mut write = handle
1536 .try_write(TestWrite {
1537 key: "a".into(),
1538 value: 1,
1539 size: 150,
1540 })
1541 .await
1542 .unwrap();
1543 write.wait(Durability::Written).await.unwrap();
1544
1545 assert_eq!(flusher.flushed_events().len(), 1);
1547
1548 coordinator.stop().await;
1550 }
1551
1552 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1553 async fn should_accumulate_until_threshold() {
1554 let flusher = TestFlusher::default();
1556 let config = WriteCoordinatorConfig {
1557 queue_capacity: 100,
1558 flush_interval: Duration::from_secs(3600),
1559 flush_size_threshold: 100,
1560 };
1561 let mut coordinator = WriteCoordinator::new(
1562 config,
1563 vec!["default".to_string()],
1564 TestContext::default(),
1565 flusher.initial_snapshot().await,
1566 flusher.clone(),
1567 );
1568 let handle = coordinator.handle("default");
1569 coordinator.start();
1570
1571 for i in 0..5 {
1573 let mut w = handle
1574 .try_write(TestWrite {
1575 key: format!("key{}", i),
1576 value: i,
1577 size: 15,
1578 })
1579 .await
1580 .unwrap();
1581 w.wait(Durability::Applied).await.unwrap();
1582 }
1583
1584 assert_eq!(flusher.flushed_events().len(), 0);
1586
1587 let mut final_write = handle
1589 .try_write(TestWrite {
1590 key: "final".into(),
1591 value: 999,
1592 size: 30,
1593 })
1594 .await
1595 .unwrap();
1596 final_write.wait(Durability::Written).await.unwrap();
1597
1598 assert_eq!(flusher.flushed_events().len(), 1);
1600
1601 coordinator.stop().await;
1603 }
1604
1605 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1610 async fn should_accept_writes_during_flush() {
1611 let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1613 let mut coordinator = WriteCoordinator::new(
1614 test_config(),
1615 vec!["default".to_string()],
1616 TestContext::default(),
1617 flusher.initial_snapshot().await,
1618 flusher.clone(),
1619 );
1620 let handle = coordinator.handle("default");
1621 coordinator.start();
1622
1623 let write1 = handle
1625 .try_write(TestWrite {
1626 key: "a".into(),
1627 value: 1,
1628 size: 10,
1629 })
1630 .await
1631 .unwrap();
1632 handle.flush(false).await.unwrap();
1633 flush_started_rx.await.unwrap(); let write2 = handle
1637 .try_write(TestWrite {
1638 key: "b".into(),
1639 value: 2,
1640 size: 10,
1641 })
1642 .await
1643 .unwrap();
1644 assert!(write2.epoch().await.unwrap() > write1.epoch().await.unwrap());
1645
1646 unblock_tx.send(()).await.unwrap();
1648 coordinator.stop().await;
1649 }
1650
1651 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1652 async fn should_assign_new_epochs_during_flush() {
1653 let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1655 let mut coordinator = WriteCoordinator::new(
1656 test_config(),
1657 vec!["default".to_string()],
1658 TestContext::default(),
1659 flusher.initial_snapshot().await,
1660 flusher.clone(),
1661 );
1662 let handle = coordinator.handle("default");
1663 coordinator.start();
1664
1665 handle
1667 .try_write(TestWrite {
1668 key: "a".into(),
1669 value: 1,
1670 size: 10,
1671 })
1672 .await
1673 .unwrap();
1674 handle.flush(false).await.unwrap();
1675 flush_started_rx.await.unwrap(); let w1 = handle
1679 .try_write(TestWrite {
1680 key: "b".into(),
1681 value: 2,
1682 size: 10,
1683 })
1684 .await
1685 .unwrap();
1686 let w2 = handle
1687 .try_write(TestWrite {
1688 key: "c".into(),
1689 value: 3,
1690 size: 10,
1691 })
1692 .await
1693 .unwrap();
1694
1695 let e1 = w1.epoch().await.unwrap();
1697 let e2 = w2.epoch().await.unwrap();
1698 assert!(e1 < e2);
1699
1700 unblock_tx.send(()).await.unwrap();
1702 coordinator.stop().await;
1703 }
1704
1705 #[tokio::test]
1710 async fn should_return_backpressure_when_queue_full() {
1711 let flusher = TestFlusher::default();
1713 let config = WriteCoordinatorConfig {
1714 queue_capacity: 2,
1715 flush_interval: Duration::from_secs(3600),
1716 flush_size_threshold: usize::MAX,
1717 };
1718 let mut coordinator = WriteCoordinator::new(
1719 config,
1720 vec!["default".to_string()],
1721 TestContext::default(),
1722 flusher.initial_snapshot().await,
1723 flusher.clone(),
1724 );
1725 let handle = coordinator.handle("default");
1726 let _ = handle
1730 .try_write(TestWrite {
1731 key: "a".into(),
1732 value: 1,
1733 size: 10,
1734 })
1735 .await;
1736 let _ = handle
1737 .try_write(TestWrite {
1738 key: "b".into(),
1739 value: 2,
1740 size: 10,
1741 })
1742 .await;
1743
1744 let result = handle
1746 .try_write(TestWrite {
1747 key: "c".into(),
1748 value: 3,
1749 size: 10,
1750 })
1751 .await;
1752
1753 assert!(matches!(result, Err(WriteError::Backpressure(_))));
1755 }
1756
1757 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1758 async fn should_accept_writes_after_queue_drains() {
1759 let flusher = TestFlusher::default();
1761 let config = WriteCoordinatorConfig {
1762 queue_capacity: 2,
1763 flush_interval: Duration::from_secs(3600),
1764 flush_size_threshold: usize::MAX,
1765 };
1766 let mut coordinator = WriteCoordinator::new(
1767 config,
1768 vec!["default".to_string()],
1769 TestContext::default(),
1770 flusher.initial_snapshot().await,
1771 flusher.clone(),
1772 );
1773 let handle = coordinator.handle("default");
1774
1775 let _ = handle
1777 .try_write(TestWrite {
1778 key: "a".into(),
1779 value: 1,
1780 size: 10,
1781 })
1782 .await;
1783 let mut write_b = handle
1784 .try_write(TestWrite {
1785 key: "b".into(),
1786 value: 2,
1787 size: 10,
1788 })
1789 .await
1790 .unwrap();
1791
1792 coordinator.start();
1794 write_b.wait(Durability::Applied).await.unwrap();
1795
1796 let result = handle
1798 .try_write(TestWrite {
1799 key: "c".into(),
1800 value: 3,
1801 size: 10,
1802 })
1803 .await;
1804 assert!(result.is_ok());
1805
1806 coordinator.stop().await;
1808 }
1809
1810 #[tokio::test]
1815 async fn should_shutdown_cleanly_when_stop_called() {
1816 let flusher = TestFlusher::default();
1818 let mut coordinator = WriteCoordinator::new(
1819 test_config(),
1820 vec!["default".to_string()],
1821 TestContext::default(),
1822 flusher.initial_snapshot().await,
1823 flusher.clone(),
1824 );
1825 let handle = coordinator.handle("default");
1826 coordinator.start();
1827
1828 let result = coordinator.stop().await;
1830
1831 assert!(result.is_ok());
1833 }
1834
1835 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1836 async fn should_flush_pending_writes_on_shutdown() {
1837 let flusher = TestFlusher::default();
1839 let config = WriteCoordinatorConfig {
1840 queue_capacity: 100,
1841 flush_interval: Duration::from_secs(3600), flush_size_threshold: usize::MAX, };
1844 let mut coordinator = WriteCoordinator::new(
1845 config,
1846 vec!["default".to_string()],
1847 TestContext::default(),
1848 flusher.initial_snapshot().await,
1849 flusher.clone(),
1850 );
1851 let handle = coordinator.handle("default");
1852 coordinator.start();
1853
1854 let write = handle
1856 .try_write(TestWrite {
1857 key: "a".into(),
1858 value: 1,
1859 size: 10,
1860 })
1861 .await
1862 .unwrap();
1863 let epoch = write.epoch().await.unwrap();
1864
1865 coordinator.stop().await;
1867
1868 let events = flusher.flushed_events();
1870 assert_eq!(events.len(), 1);
1871 let epoch_range = &events[0].epoch_range;
1872 assert!(epoch_range.contains(&epoch));
1873 }
1874
1875 #[tokio::test]
1876 async fn should_return_shutdown_error_after_coordinator_stops() {
1877 let flusher = TestFlusher::default();
1879 let mut coordinator = WriteCoordinator::new(
1880 test_config(),
1881 vec!["default".to_string()],
1882 TestContext::default(),
1883 flusher.initial_snapshot().await,
1884 flusher.clone(),
1885 );
1886 let handle = coordinator.handle("default");
1887 coordinator.start();
1888
1889 coordinator.stop().await;
1891
1892 let result = handle
1894 .try_write(TestWrite {
1895 key: "a".into(),
1896 value: 1,
1897 size: 10,
1898 })
1899 .await;
1900
1901 assert!(matches!(result, Err(WriteError::Shutdown)));
1903 }
1904
1905 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1910 async fn should_track_epoch_range_in_flush_event() {
1911 let flusher = TestFlusher::default();
1913 let mut coordinator = WriteCoordinator::new(
1914 test_config(),
1915 vec!["default".to_string()],
1916 TestContext::default(),
1917 flusher.initial_snapshot().await,
1918 flusher.clone(),
1919 );
1920 let handle = coordinator.handle("default");
1921 coordinator.start();
1922
1923 handle
1925 .try_write(TestWrite {
1926 key: "a".into(),
1927 value: 1,
1928 size: 10,
1929 })
1930 .await
1931 .unwrap();
1932 handle
1933 .try_write(TestWrite {
1934 key: "b".into(),
1935 value: 2,
1936 size: 10,
1937 })
1938 .await
1939 .unwrap();
1940 let mut last_write = handle
1941 .try_write(TestWrite {
1942 key: "c".into(),
1943 value: 3,
1944 size: 10,
1945 })
1946 .await
1947 .unwrap();
1948
1949 handle.flush(false).await.unwrap();
1950 last_write.wait(Durability::Written).await.unwrap();
1951
1952 let events = flusher.flushed_events();
1954 assert_eq!(events.len(), 1);
1955 let epoch_range = &events[0].epoch_range;
1956 assert_eq!(epoch_range.start, 1);
1957 assert_eq!(epoch_range.end, 4); coordinator.stop().await;
1961 }
1962
1963 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1964 async fn should_have_contiguous_epoch_ranges() {
1965 let flusher = TestFlusher::default();
1967 let mut coordinator = WriteCoordinator::new(
1968 test_config(),
1969 vec!["default".to_string()],
1970 TestContext::default(),
1971 flusher.initial_snapshot().await,
1972 flusher.clone(),
1973 );
1974 let handle = coordinator.handle("default");
1975 coordinator.start();
1976
1977 handle
1979 .try_write(TestWrite {
1980 key: "a".into(),
1981 value: 1,
1982 size: 10,
1983 })
1984 .await
1985 .unwrap();
1986 let mut write2 = handle
1987 .try_write(TestWrite {
1988 key: "b".into(),
1989 value: 2,
1990 size: 10,
1991 })
1992 .await
1993 .unwrap();
1994 handle.flush(false).await.unwrap();
1995 write2.wait(Durability::Written).await.unwrap();
1996
1997 let mut write3 = handle
1999 .try_write(TestWrite {
2000 key: "c".into(),
2001 value: 3,
2002 size: 10,
2003 })
2004 .await
2005 .unwrap();
2006 handle.flush(false).await.unwrap();
2007 write3.wait(Durability::Written).await.unwrap();
2008
2009 let events = flusher.flushed_events();
2011 assert_eq!(events.len(), 2);
2012
2013 let range1 = &events[0].epoch_range;
2014 let range2 = &events[1].epoch_range;
2015
2016 assert_eq!(range1.end, range2.start);
2018 assert_eq!(range1, &(1..3));
2019 assert_eq!(range2, &(3..4));
2020
2021 coordinator.stop().await;
2023 }
2024
2025 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2026 async fn should_include_exact_epochs_in_range() {
2027 let flusher = TestFlusher::default();
2029 let mut coordinator = WriteCoordinator::new(
2030 test_config(),
2031 vec!["default".to_string()],
2032 TestContext::default(),
2033 flusher.initial_snapshot().await,
2034 flusher.clone(),
2035 );
2036 let handle = coordinator.handle("default");
2037 coordinator.start();
2038
2039 let write1 = handle
2041 .try_write(TestWrite {
2042 key: "a".into(),
2043 value: 1,
2044 size: 10,
2045 })
2046 .await
2047 .unwrap();
2048 let epoch1 = write1.epoch().await.unwrap();
2049
2050 let mut write2 = handle
2051 .try_write(TestWrite {
2052 key: "b".into(),
2053 value: 2,
2054 size: 10,
2055 })
2056 .await
2057 .unwrap();
2058 let epoch2 = write2.epoch().await.unwrap();
2059
2060 handle.flush(false).await.unwrap();
2061 write2.wait(Durability::Written).await.unwrap();
2062
2063 let events = flusher.flushed_events();
2065 assert_eq!(events.len(), 1);
2066 let epoch_range = &events[0].epoch_range;
2067
2068 assert_eq!(epoch_range.start, epoch1);
2070 assert_eq!(epoch_range.end, epoch2 + 1);
2072 assert!(epoch_range.contains(&epoch1));
2074 assert!(epoch_range.contains(&epoch2));
2075
2076 coordinator.stop().await;
2078 }
2079
2080 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2085 async fn should_preserve_context_across_flushes() {
2086 let flusher = TestFlusher::default();
2088 let mut coordinator = WriteCoordinator::new(
2089 test_config(),
2090 vec!["default".to_string()],
2091 TestContext::default(),
2092 flusher.initial_snapshot().await,
2093 flusher.clone(),
2094 );
2095 let handle = coordinator.handle("default");
2096 coordinator.start();
2097
2098 let mut write1 = handle
2100 .try_write(TestWrite {
2101 key: "a".into(),
2102 value: 1,
2103 size: 10,
2104 })
2105 .await
2106 .unwrap();
2107 handle.flush(false).await.unwrap();
2108 write1.wait(Durability::Written).await.unwrap();
2109
2110 let mut write2 = handle
2112 .try_write(TestWrite {
2113 key: "a".into(),
2114 value: 2,
2115 size: 10,
2116 })
2117 .await
2118 .unwrap();
2119 handle.flush(false).await.unwrap();
2120 write2.wait(Durability::Written).await.unwrap();
2121
2122 let events = flusher.flushed_events();
2124 assert_eq!(events.len(), 2);
2125
2126 let (seq1, _) = events[0].val.writes.get("a").unwrap();
2128 assert_eq!(*seq1, 0);
2129
2130 let (seq2, _) = events[1].val.writes.get("a").unwrap();
2132 assert_eq!(*seq2, 1);
2133
2134 coordinator.stop().await;
2136 }
2137
2138 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2143 async fn should_receive_view_on_subscribe() {
2144 let flusher = TestFlusher::default();
2146 let mut coordinator = WriteCoordinator::new(
2147 test_config(),
2148 vec!["default".to_string()],
2149 TestContext::default(),
2150 flusher.initial_snapshot().await,
2151 flusher.clone(),
2152 );
2153 let handle = coordinator.handle("default");
2154 let (mut subscriber, _) = coordinator.subscribe();
2155 subscriber.initialize();
2156 coordinator.start();
2157
2158 handle
2160 .try_write(TestWrite {
2161 key: "a".into(),
2162 value: 1,
2163 size: 10,
2164 })
2165 .await
2166 .unwrap();
2167 handle.flush(false).await.unwrap();
2168
2169 let result = subscriber.recv().await;
2171 assert!(result.is_ok());
2172
2173 coordinator.stop().await;
2175 }
2176
2177 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2178 async fn should_include_snapshot_in_view_after_flush() {
2179 let flusher = TestFlusher::default();
2181 let mut coordinator = WriteCoordinator::new(
2182 test_config(),
2183 vec!["default".to_string()],
2184 TestContext::default(),
2185 flusher.initial_snapshot().await,
2186 flusher.clone(),
2187 );
2188 let handle = coordinator.handle("default");
2189 let (mut subscriber, _) = coordinator.subscribe();
2190 subscriber.initialize();
2191 coordinator.start();
2192
2193 handle
2195 .try_write(TestWrite {
2196 key: "a".into(),
2197 value: 1,
2198 size: 10,
2199 })
2200 .await
2201 .unwrap();
2202 handle.flush(false).await.unwrap();
2203
2204 let _ = subscriber.recv().await.unwrap();
2206 let result = subscriber.recv().await.unwrap();
2208
2209 assert_snapshot_has_rows(&result.snapshot, &[("a", 0, 1)]).await;
2211
2212 coordinator.stop().await;
2214 }
2215
2216 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2217 async fn should_include_delta_in_view_after_flush() {
2218 let flusher = TestFlusher::default();
2220 let mut coordinator = WriteCoordinator::new(
2221 test_config(),
2222 vec!["default".to_string()],
2223 TestContext::default(),
2224 flusher.initial_snapshot().await,
2225 flusher.clone(),
2226 );
2227 let handle = coordinator.handle("default");
2228 let (mut subscriber, _) = coordinator.subscribe();
2229 subscriber.initialize();
2230 coordinator.start();
2231
2232 handle
2234 .try_write(TestWrite {
2235 key: "a".into(),
2236 value: 42,
2237 size: 10,
2238 })
2239 .await
2240 .unwrap();
2241 handle.flush(false).await.unwrap();
2242
2243 let _ = subscriber.recv().await.unwrap();
2245 let result = subscriber.recv().await.unwrap();
2247
2248 let flushed = result.last_written_delta.as_ref().unwrap();
2250 assert_eq!(flushed.val.get("a"), Some(&42));
2251
2252 coordinator.stop().await;
2254 }
2255
2256 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2257 async fn should_include_epoch_range_in_view_after_flush() {
2258 let flusher = TestFlusher::default();
2260 let mut coordinator = WriteCoordinator::new(
2261 test_config(),
2262 vec!["default".to_string()],
2263 TestContext::default(),
2264 flusher.initial_snapshot().await,
2265 flusher.clone(),
2266 );
2267 let handle = coordinator.handle("default");
2268 let (mut subscriber, _) = coordinator.subscribe();
2269 subscriber.initialize();
2270 coordinator.start();
2271
2272 let write1 = handle
2274 .try_write(TestWrite {
2275 key: "a".into(),
2276 value: 1,
2277 size: 10,
2278 })
2279 .await
2280 .unwrap();
2281 let write2 = handle
2282 .try_write(TestWrite {
2283 key: "b".into(),
2284 value: 2,
2285 size: 10,
2286 })
2287 .await
2288 .unwrap();
2289 handle.flush(false).await.unwrap();
2290
2291 let _ = subscriber.recv().await.unwrap();
2293 let result = subscriber.recv().await.unwrap();
2295
2296 let flushed = result.last_written_delta.as_ref().unwrap();
2298 let epoch1 = write1.epoch().await.unwrap();
2299 let epoch2 = write2.epoch().await.unwrap();
2300 assert!(flushed.epoch_range.contains(&epoch1));
2301 assert!(flushed.epoch_range.contains(&epoch2));
2302 assert_eq!(flushed.epoch_range.start, epoch1);
2303 assert_eq!(flushed.epoch_range.end, epoch2 + 1);
2304
2305 coordinator.stop().await;
2307 }
2308
2309 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2310 async fn should_broadcast_frozen_delta_on_freeze() {
2311 let flusher = TestFlusher::default();
2313 let mut coordinator = WriteCoordinator::new(
2314 test_config(),
2315 vec!["default".to_string()],
2316 TestContext::default(),
2317 flusher.initial_snapshot().await,
2318 flusher.clone(),
2319 );
2320 let handle = coordinator.handle("default");
2321 let (mut subscriber, _) = coordinator.subscribe();
2322 subscriber.initialize();
2323 coordinator.start();
2324
2325 handle
2327 .try_write(TestWrite {
2328 key: "a".into(),
2329 value: 1,
2330 size: 10,
2331 })
2332 .await
2333 .unwrap();
2334 handle.flush(false).await.unwrap();
2335
2336 let state = subscriber.recv().await.unwrap();
2338 assert_eq!(state.frozen.len(), 1);
2339 assert!(state.frozen[0].val.contains_key("a"));
2340
2341 coordinator.stop().await;
2343 }
2344
2345 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2346 async fn should_remove_frozen_delta_after_flush_complete() {
2347 let flusher = TestFlusher::default();
2349 let mut coordinator = WriteCoordinator::new(
2350 test_config(),
2351 vec!["default".to_string()],
2352 TestContext::default(),
2353 flusher.initial_snapshot().await,
2354 flusher.clone(),
2355 );
2356 let handle = coordinator.handle("default");
2357 let (mut subscriber, _) = coordinator.subscribe();
2358 subscriber.initialize();
2359 coordinator.start();
2360
2361 handle
2363 .try_write(TestWrite {
2364 key: "a".into(),
2365 value: 1,
2366 size: 10,
2367 })
2368 .await
2369 .unwrap();
2370 handle.flush(false).await.unwrap();
2371
2372 let state1 = subscriber.recv().await.unwrap();
2374 assert_eq!(state1.frozen.len(), 1);
2375
2376 let state2 = subscriber.recv().await.unwrap();
2378 assert_eq!(state2.frozen.len(), 0);
2379 assert!(state2.last_written_delta.is_some());
2380
2381 coordinator.stop().await;
2383 }
2384
2385 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2386 async fn should_recover_from_message_lost_subscriber() {
2387 let flusher = TestFlusher::default();
2389 let mut coordinator = WriteCoordinator::new(
2390 test_config(),
2391 vec!["default".to_string()],
2392 TestContext::default(),
2393 flusher.initial_snapshot().await,
2394 flusher.clone(),
2395 );
2396 let handle = coordinator.handle("default");
2397 let (mut subscriber, _) = coordinator.subscribe();
2398 subscriber.initialize();
2399 coordinator.start();
2400
2401 for i in 0..20 {
2405 let _write_handle = handle
2406 .try_write(TestWrite {
2407 key: format!("key_{}", i),
2408 value: i as u64,
2409 size: 10,
2410 })
2411 .await
2412 .unwrap();
2413 let _ = handle.flush(false).await.unwrap();
2414 }
2415
2416 let mut watermark = handle
2418 .flush(true)
2419 .await
2420 .expect("flush(true) should succeed");
2421
2422 watermark.wait(Durability::Durable).await;
2424
2425 let result = subscriber
2427 .recv()
2428 .await
2429 .expect_err("expected recv() to yield an error");
2430 assert!(matches!(result, SubscribeError::MessageLost));
2431
2432 let (rx, initial_view) = handle.subscribe();
2434 (subscriber, _) = ViewSubscriber::new(rx, initial_view);
2435 let view = subscriber.initialize();
2436
2437 let records = view.snapshot.scan(BytesRange::unbounded()).await.unwrap();
2440 assert!(
2441 records.len() >= 20,
2442 "expected at least 20 rows, got {}",
2443 records.len()
2444 );
2445
2446 let _write_handle = handle
2448 .try_write(TestWrite {
2449 key: "post_recovery".into(),
2450 value: 100,
2451 size: 10,
2452 })
2453 .await
2454 .unwrap();
2455 let _ = handle.flush(false).await.unwrap();
2456
2457 let result = subscriber.recv().await;
2458 assert!(result.is_ok());
2459
2460 coordinator.stop().await;
2462 }
2463
2464 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2469 async fn should_flush_even_when_no_writes_if_flush_storage() {
2470 let flusher = TestFlusher::default();
2472 let storage = Arc::new(InMemoryStorage::new());
2473 let snapshot = storage.snapshot().await.unwrap();
2474 let mut coordinator = WriteCoordinator::new(
2475 test_config(),
2476 vec!["default".to_string()],
2477 TestContext::default(),
2478 snapshot,
2479 flusher.clone(),
2480 );
2481 let handle = coordinator.handle("default");
2482 coordinator.start();
2483
2484 let mut flush_handle = handle.flush(true).await.unwrap();
2486 flush_handle.wait(Durability::Durable).await.unwrap();
2487
2488 assert_eq!(flusher.flushed_events().len(), 0);
2491
2492 coordinator.stop().await;
2494 }
2495
2496 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2497 async fn should_advance_durable_watermark() {
2498 let flusher = TestFlusher::default();
2500 let storage = Arc::new(InMemoryStorage::new());
2501 let snapshot = storage.snapshot().await.unwrap();
2502 let mut coordinator = WriteCoordinator::new(
2503 test_config(),
2504 vec!["default".to_string()],
2505 TestContext::default(),
2506 snapshot,
2507 flusher.clone(),
2508 );
2509 let handle = coordinator.handle("default");
2510 coordinator.start();
2511
2512 let mut write = handle
2514 .try_write(TestWrite {
2515 key: "a".into(),
2516 value: 1,
2517 size: 10,
2518 })
2519 .await
2520 .unwrap();
2521 let mut flush_handle = handle.flush(true).await.unwrap();
2522
2523 flush_handle.wait(Durability::Durable).await.unwrap();
2525 write.wait(Durability::Durable).await.unwrap();
2526 assert_eq!(flusher.flushed_events().len(), 1);
2527
2528 coordinator.stop().await;
2530 }
2531
2532 #[tokio::test]
2533 async fn should_see_applied_write_via_view() {
2534 let flusher = TestFlusher::default();
2536 let mut coordinator = WriteCoordinator::new(
2537 test_config(),
2538 vec!["default".to_string()],
2539 TestContext::default(),
2540 flusher.initial_snapshot().await,
2541 flusher,
2542 );
2543 let handle = coordinator.handle("default");
2544 coordinator.start();
2545
2546 let mut write = handle
2548 .try_write(TestWrite {
2549 key: "a".into(),
2550 value: 42,
2551 size: 10,
2552 })
2553 .await
2554 .unwrap();
2555 write.wait(Durability::Applied).await.unwrap();
2556
2557 let view = coordinator.view();
2559 assert_eq!(view.current.get("a"), Some(42));
2560
2561 coordinator.stop().await;
2563 }
2564
2565 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2570 async fn should_flush_writes_from_multiple_channels() {
2571 let flusher = TestFlusher::default();
2573 let mut coordinator = WriteCoordinator::new(
2574 test_config(),
2575 vec!["ch1".to_string(), "ch2".to_string()],
2576 TestContext::default(),
2577 flusher.initial_snapshot().await,
2578 flusher.clone(),
2579 );
2580 let ch1 = coordinator.handle("ch1");
2581 let ch2 = coordinator.handle("ch2");
2582 coordinator.start();
2583
2584 let mut w1 = ch1
2587 .try_write(TestWrite {
2588 key: "a".into(),
2589 value: 10,
2590 size: 10,
2591 })
2592 .await
2593 .unwrap();
2594 w1.wait(Durability::Applied).await.unwrap();
2595
2596 let mut w2 = ch2
2597 .try_write(TestWrite {
2598 key: "b".into(),
2599 value: 20,
2600 size: 10,
2601 })
2602 .await
2603 .unwrap();
2604 w2.wait(Durability::Applied).await.unwrap();
2605
2606 let mut w3 = ch1
2607 .try_write(TestWrite {
2608 key: "c".into(),
2609 value: 30,
2610 size: 10,
2611 })
2612 .await
2613 .unwrap();
2614 w3.wait(Durability::Applied).await.unwrap();
2615
2616 ch1.flush(false).await.unwrap();
2617 w3.wait(Durability::Written).await.unwrap();
2618
2619 let snapshot = flusher.storage.snapshot().await.unwrap();
2621 assert_snapshot_has_rows(&snapshot, &[("a", 0, 10), ("b", 1, 20), ("c", 2, 30)]).await;
2622
2623 coordinator.stop().await;
2625 }
2626
2627 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2628 async fn should_succeed_with_write_timeout_when_queue_has_space() {
2629 let flusher = TestFlusher::default();
2631 let mut coordinator = WriteCoordinator::new(
2632 test_config(),
2633 vec!["default".to_string()],
2634 TestContext::default(),
2635 flusher.initial_snapshot().await,
2636 flusher.clone(),
2637 );
2638 let handle = coordinator.handle("default");
2639 coordinator.start();
2640
2641 let mut wh = handle
2643 .write_timeout(
2644 TestWrite {
2645 key: "a".into(),
2646 value: 1,
2647 size: 10,
2648 },
2649 Duration::from_secs(1),
2650 )
2651 .await
2652 .unwrap();
2653
2654 let result = wh.wait(Durability::Applied).await;
2656 assert!(result.is_ok());
2657
2658 coordinator.stop().await;
2660 }
2661
2662 #[tokio::test]
2663 async fn should_timeout_when_queue_full() {
2664 let flusher = TestFlusher::default();
2666 let config = WriteCoordinatorConfig {
2667 queue_capacity: 2,
2668 flush_interval: Duration::from_secs(3600),
2669 flush_size_threshold: usize::MAX,
2670 };
2671 let mut coordinator = WriteCoordinator::new(
2672 config,
2673 vec!["default".to_string()],
2674 TestContext::default(),
2675 flusher.initial_snapshot().await,
2676 flusher.clone(),
2677 );
2678 let handle = coordinator.handle("default");
2679
2680 let _ = handle
2682 .try_write(TestWrite {
2683 key: "a".into(),
2684 value: 1,
2685 size: 10,
2686 })
2687 .await;
2688 let _ = handle
2689 .try_write(TestWrite {
2690 key: "b".into(),
2691 value: 2,
2692 size: 10,
2693 })
2694 .await;
2695
2696 let result = handle
2698 .write_timeout(
2699 TestWrite {
2700 key: "c".into(),
2701 value: 3,
2702 size: 10,
2703 },
2704 Duration::from_millis(10),
2705 )
2706 .await;
2707
2708 assert!(matches!(result, Err(WriteError::TimeoutError(_))));
2710 }
2711
2712 #[tokio::test]
2713 async fn should_return_write_in_timeout_error() {
2714 let flusher = TestFlusher::default();
2716 let config = WriteCoordinatorConfig {
2717 queue_capacity: 1,
2718 flush_interval: Duration::from_secs(3600),
2719 flush_size_threshold: usize::MAX,
2720 };
2721 let mut coordinator = WriteCoordinator::new(
2722 config,
2723 vec!["default".to_string()],
2724 TestContext::default(),
2725 flusher.initial_snapshot().await,
2726 flusher.clone(),
2727 );
2728 let handle = coordinator.handle("default");
2729
2730 let _ = handle
2732 .try_write(TestWrite {
2733 key: "a".into(),
2734 value: 1,
2735 size: 10,
2736 })
2737 .await;
2738
2739 let result = handle
2741 .write_timeout(
2742 TestWrite {
2743 key: "retry_me".into(),
2744 value: 42,
2745 size: 10,
2746 },
2747 Duration::from_millis(10),
2748 )
2749 .await;
2750 let Err(err) = result else {
2751 panic!("expected TimeoutError");
2752 };
2753
2754 let write = err.into_inner().expect("should contain the write");
2756 assert_eq!(write.key, "retry_me");
2757 assert_eq!(write.value, 42);
2758 }
2759
2760 #[tokio::test]
2761 async fn should_return_write_in_backpressure_error() {
2762 let flusher = TestFlusher::default();
2764 let config = WriteCoordinatorConfig {
2765 queue_capacity: 1,
2766 flush_interval: Duration::from_secs(3600),
2767 flush_size_threshold: usize::MAX,
2768 };
2769 let mut coordinator = WriteCoordinator::new(
2770 config,
2771 vec!["default".to_string()],
2772 TestContext::default(),
2773 flusher.initial_snapshot().await,
2774 flusher.clone(),
2775 );
2776 let handle = coordinator.handle("default");
2777
2778 let _ = handle
2780 .try_write(TestWrite {
2781 key: "a".into(),
2782 value: 1,
2783 size: 10,
2784 })
2785 .await;
2786
2787 let result = handle
2789 .try_write(TestWrite {
2790 key: "retry_me".into(),
2791 value: 42,
2792 size: 10,
2793 })
2794 .await;
2795 let Err(err) = result else {
2796 panic!("expected Backpressure");
2797 };
2798
2799 let write = err.into_inner().expect("should contain the write");
2801 assert_eq!(write.key, "retry_me");
2802 assert_eq!(write.value, 42);
2803 }
2804
2805 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2806 async fn should_succeed_when_queue_drains_within_timeout() {
2807 let flusher = TestFlusher::default();
2809 let config = WriteCoordinatorConfig {
2810 queue_capacity: 2,
2811 flush_interval: Duration::from_secs(3600),
2812 flush_size_threshold: usize::MAX,
2813 };
2814 let mut coordinator = WriteCoordinator::new(
2815 config,
2816 vec!["default".to_string()],
2817 TestContext::default(),
2818 flusher.initial_snapshot().await,
2819 flusher.clone(),
2820 );
2821 let handle = coordinator.handle("default");
2822
2823 let _ = handle
2825 .try_write(TestWrite {
2826 key: "a".into(),
2827 value: 1,
2828 size: 10,
2829 })
2830 .await;
2831 let _ = handle
2832 .try_write(TestWrite {
2833 key: "b".into(),
2834 value: 2,
2835 size: 10,
2836 })
2837 .await;
2838
2839 coordinator.start();
2841 let result = handle
2842 .write_timeout(
2843 TestWrite {
2844 key: "c".into(),
2845 value: 3,
2846 size: 10,
2847 },
2848 Duration::from_secs(5),
2849 )
2850 .await;
2851
2852 assert!(result.is_ok());
2854
2855 coordinator.stop().await;
2857 }
2858
2859 #[tokio::test]
2860 async fn should_return_shutdown_on_write_timeout_after_coordinator_stops() {
2861 let flusher = TestFlusher::default();
2863 let mut coordinator = WriteCoordinator::new(
2864 test_config(),
2865 vec!["default".to_string()],
2866 TestContext::default(),
2867 flusher.initial_snapshot().await,
2868 flusher.clone(),
2869 );
2870 let handle = coordinator.handle("default");
2871 coordinator.start();
2872
2873 coordinator.stop().await;
2875 let result = handle
2876 .write_timeout(
2877 TestWrite {
2878 key: "a".into(),
2879 value: 1,
2880 size: 10,
2881 },
2882 Duration::from_secs(1),
2883 )
2884 .await;
2885
2886 assert!(matches!(result, Err(WriteError::Shutdown)));
2888 }
2889
2890 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2891 async fn should_pause_and_resume_write_channel() {
2892 let flusher = TestFlusher::default();
2894 let mut config = test_config();
2895 config.flush_size_threshold = usize::MAX;
2896 config.flush_interval = Duration::from_hours(24);
2897 let mut coordinator = WriteCoordinator::new(
2898 config,
2899 vec!["a", "b"],
2900 TestContext::default(),
2901 flusher.initial_snapshot().await,
2902 flusher.clone(),
2903 );
2904 let handle_a = coordinator.handle("a");
2905 let handle_b = coordinator.handle("b");
2906 let pause_handle = coordinator.pause_handle("a");
2907
2908 pause_handle.pause();
2917 coordinator.start();
2918
2919 let mut result_a = handle_a
2921 .try_write(TestWrite {
2922 key: "a".into(),
2923 value: 1,
2924 size: 1,
2925 })
2926 .await
2927 .unwrap();
2928 for i in 0..1000 {
2929 handle_b
2930 .try_write(TestWrite {
2931 key: format!("b{}", i),
2932 value: i,
2933 size: 1,
2934 })
2935 .await
2936 .unwrap()
2937 .wait(Durability::Applied)
2938 .await
2939 .unwrap();
2940 }
2941
2942 let data = coordinator.view().current.data.lock().unwrap().clone();
2945 let mut expected = (0..1000).map(|i| format!("b{}", i)).collect::<HashSet<_>>();
2946 assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2947 pause_handle.unpause();
2948 result_a.wait(Durability::Applied).await.unwrap();
2950 let data = coordinator.view().current.data.lock().unwrap().clone();
2951 expected.insert("a".into());
2952 assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2953 }
2954}