// common/coordinator/mod.rs

1#![allow(unused)]
2
3mod error;
4mod handle;
5mod traits;
6
7use std::collections::HashMap;
8use std::ops::Range;
9use std::ops::{Deref, DerefMut};
10
11pub use error::{WriteError, WriteResult};
12use futures::stream::{self, SelectAll, StreamExt};
13pub use handle::{View, WriteCoordinatorHandle, WriteHandle};
14pub use traits::{Delta, Durability, Flusher};
15
/// Event sent from the write coordinator task to the flush task.
enum FlushEvent<D: Delta> {
    /// Flush a frozen delta to storage. The epoch stamp records which range
    /// of write epochs the frozen delta covers.
    FlushDelta { frozen: EpochStamped<D::Frozen> },
    /// Ensure storage durability (e.g. call storage.flush()).
    FlushStorage,
}
23
24// Internal use only
25use crate::StorageRead;
26use crate::coordinator::traits::EpochStamped;
27use crate::storage::StorageSnapshot;
28pub(crate) use handle::EpochWatcher;
29use std::sync::{Arc, Mutex};
30use std::time::Duration;
31use tokio::sync::{broadcast, mpsc, oneshot, watch};
32use tokio::time::{Instant, Interval, interval_at};
33use tokio_util::sync::CancellationToken;
34
/// Configuration for the write coordinator.
#[derive(Debug, Clone)]
pub struct WriteCoordinatorConfig {
    /// Maximum number of pending writes in each per-channel queue.
    pub queue_capacity: usize,
    /// Interval at which to trigger automatic flushes.
    pub flush_interval: Duration,
    /// Delta size threshold at which to trigger a flush.
    pub flush_size_threshold: usize,
}

impl Default for WriteCoordinatorConfig {
    /// Defaults: 10k queued writes per channel, a 10-second flush interval,
    /// and a 64 MiB delta size threshold.
    fn default() -> Self {
        WriteCoordinatorConfig {
            queue_capacity: 10_000,
            flush_interval: Duration::from_secs(10),
            flush_size_threshold: 64 << 20, // 64 MiB
        }
    }
}
55
/// Command sent from a `WriteCoordinatorHandle` to the coordinator task.
pub(crate) enum WriteCommand<D: Delta> {
    /// Apply a single write to the current delta; the assigned epoch (or the
    /// failure) is reported back through `result_tx`.
    Write {
        write: D::Write,
        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
    },
    /// Freeze and flush the current delta; when `flush_storage` is true, also
    /// request storage durability. `epoch_tx` receives the epoch of the last
    /// processed write.
    Flush {
        epoch_tx: oneshot::Sender<handle::EpochResult<()>>,
        flush_storage: bool,
    },
}
66
/// The write coordinator manages write ordering, batching, and durability.
///
/// It accepts writes through `WriteCoordinatorHandle`, applies them to a `Delta`,
/// and coordinates flushing through a `Flusher`.
pub struct WriteCoordinator<D: Delta, F: Flusher<D>> {
    /// Per-channel write handles, keyed by channel name.
    handles: HashMap<String, WriteCoordinatorHandle<D>>,
    /// Cancelled by `stop()` to shut down the write task.
    stop_tok: CancellationToken,
    /// The two tasks, present until `start()` spawns them.
    tasks: Option<(WriteCoordinatorTask<D>, FlushTask<D, F>)>,
    /// Join handle of the spawned write task; `None` until started.
    write_task_jh: Option<tokio::task::JoinHandle<Result<(), String>>>,
    /// Shared, broadcast-updated read view.
    view: Arc<BroadcastedView<D>>,
}
78
impl<D: Delta, F: Flusher<D>> WriteCoordinator<D, F> {
    /// Build a coordinator with one write channel per name in `channels`.
    ///
    /// The write and flush tasks are constructed but not spawned; call
    /// [`Self::start`] to run them.
    pub fn new(
        config: WriteCoordinatorConfig,
        channels: Vec<String>,
        initial_context: D::Context,
        initial_snapshot: Arc<dyn StorageSnapshot>,
        flusher: F,
    ) -> WriteCoordinator<D, F> {
        let (watermarks, watcher) = EpochWatermarks::new();
        let watermarks = Arc::new(watermarks);

        // Create a write channel per named input
        let mut write_rxs = Vec::with_capacity(channels.len());
        let mut handles = HashMap::new();
        for name in channels {
            let (write_tx, write_rx) = mpsc::channel(config.queue_capacity);
            write_rxs.push(write_rx);
            handles.insert(name, WriteCoordinatorHandle::new(write_tx, watcher.clone()));
        }

        // this is the channel that sends FlushEvents to be flushed
        // by a background task so that the process of converting deltas
        // to storage operations is non-blocking. for now, we apply no
        // backpressure on this channel, so writes will block if more than
        // one flush is pending
        let (flush_tx, flush_rx) = mpsc::channel(2);

        // Separate tokens: the write task cancels `flush_stop_tok` only after
        // it has emitted its final flush events on shutdown.
        let flush_stop_tok = CancellationToken::new();
        let stop_tok = CancellationToken::new();
        let write_task = WriteCoordinatorTask::new(
            config,
            initial_context,
            initial_snapshot,
            write_rxs,
            flush_tx,
            watermarks.clone(),
            stop_tok.clone(),
            flush_stop_tok.clone(),
        );

        let view = write_task.view.clone();

        let flush_task = FlushTask {
            flusher,
            stop_tok: flush_stop_tok,
            flush_rx,
            watermarks: watermarks.clone(),
            view: view.clone(),
            last_flushed_epoch: 0,
        };

        Self {
            handles,
            tasks: Some((write_task, flush_task)),
            write_task_jh: None,
            stop_tok,
            view,
        }
    }

    /// Get the write handle for a named channel.
    ///
    /// # Panics
    ///
    /// Panics if `name` was not one of the `channels` passed to [`Self::new`].
    pub fn handle(&self, name: &str) -> WriteCoordinatorHandle<D> {
        self.handles
            .get(name)
            .expect("unknown channel name")
            .clone()
    }

    /// Spawn the write and flush tasks. Calling this more than once is a no-op.
    pub fn start(&mut self) {
        let Some((write_task, flush_task)) = self.tasks.take() else {
            // already started
            return;
        };
        let flush_task_jh = flush_task.run();
        let write_task_jh = write_task.run(flush_task_jh);
        self.write_task_jh = Some(write_task_jh);
    }

    /// Cancel the write task and wait for shutdown, propagating task errors.
    ///
    /// Returns `Ok(())` immediately if the coordinator was never started.
    pub async fn stop(mut self) -> Result<(), String> {
        let Some(write_task_jh) = self.write_task_jh.take() else {
            return Ok(());
        };
        self.stop_tok.cancel();
        write_task_jh.await.map_err(|e| e.to_string())?
    }

    /// The latest read view (current delta, frozen deltas, storage snapshot).
    pub fn view(&self) -> Arc<View<D>> {
        self.view.current()
    }

    /// Subscribe to view updates; also returns the view as of subscription time.
    pub fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
        self.view.subscribe()
    }
}
172
/// Event-loop state for the coordinator's write task.
struct WriteCoordinatorTask<D: Delta> {
    config: WriteCoordinatorConfig,
    /// The mutable delta currently accepting writes.
    delta: CurrentDelta<D>,
    /// Sends frozen deltas / durability requests to the `FlushTask`.
    flush_tx: mpsc::Sender<FlushEvent<D>>,
    /// One receiver per named write channel; merged into one stream in `run_coordinator`.
    write_rxs: Vec<mpsc::Receiver<WriteCommand<D>>>,
    watermarks: Arc<EpochWatermarks>,
    view: Arc<BroadcastedView<D>>,
    /// Epoch to assign to the next write.
    epoch: u64,
    /// First epoch contained in the current (unfrozen) delta.
    delta_start_epoch: u64,
    /// Timer driving periodic flushes; reset whenever a flush happens.
    flush_interval: Interval,
    stop_tok: CancellationToken,
    /// Cancelled once the write task exits, to stop the flush task.
    flush_stop_tok: CancellationToken,
}
186
impl<D: Delta> WriteCoordinatorTask<D> {
    /// Create a new write coordinator with the given flusher.
    ///
    /// This is useful for testing with mock flushers.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        config: WriteCoordinatorConfig,
        initial_context: D::Context,
        initial_snapshot: Arc<dyn StorageSnapshot>,
        write_rxs: Vec<mpsc::Receiver<WriteCommand<D>>>,
        flush_tx: mpsc::Sender<FlushEvent<D>>,
        watermarks: Arc<EpochWatermarks>,
        stop_tok: CancellationToken,
        flush_stop_tok: CancellationToken,
    ) -> Self {
        let delta = D::init(initial_context);

        // The initial view has no frozen deltas and nothing flushed yet.
        let initial_view = View {
            current: delta.reader(),
            frozen: vec![],
            snapshot: initial_snapshot,
            last_flushed_delta: None,
        };
        let initial_view = Arc::new(BroadcastedView::new(initial_view));

        // First tick fires one full interval after creation; reset again in
        // run_coordinator so the clock starts from when the task actually runs.
        let flush_interval = interval_at(
            Instant::now() + config.flush_interval,
            config.flush_interval,
        );
        Self {
            config,
            delta: CurrentDelta::new(delta),
            write_rxs,
            flush_tx,
            watermarks,
            view: initial_view,
            // Epochs start at 1 because watch channels initialize to 0 (meaning "nothing
            // processed yet"). If the first write had epoch 0, wait() would return
            // immediately since the condition `watermark < epoch` would be `0 < 0` = false.
            epoch: 1,
            delta_start_epoch: 1,
            flush_interval,
            stop_tok,
            flush_stop_tok,
        }
    }

    /// Run the coordinator event loop.
    pub fn run(
        mut self,
        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
    ) -> tokio::task::JoinHandle<Result<(), String>> {
        tokio::task::spawn(async move { self.run_coordinator(flush_task_jh).await })
    }

    /// The event loop: process write/flush commands until all write channels
    /// close or `stop_tok` is cancelled, then do a final flush and shut the
    /// flush task down, propagating its result.
    async fn run_coordinator(
        mut self,
        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
    ) -> Result<(), String> {
        // Reset the interval to start fresh from when run() is called
        self.flush_interval.reset();

        // Merge all write receivers into a single stream
        let mut write_stream: SelectAll<_> = SelectAll::new();
        for rx in self.write_rxs.drain(..) {
            write_stream.push(
                stream::unfold(
                    rx,
                    |mut rx| async move { rx.recv().await.map(|cmd| (cmd, rx)) },
                )
                .boxed(),
            );
        }

        loop {
            tokio::select! {
                cmd = write_stream.next() => {
                    match cmd {
                        Some(WriteCommand::Write { write, result_tx }) => {
                            self.handle_write(write, result_tx).await?;
                        }
                        Some(WriteCommand::Flush { epoch_tx, flush_storage }) => {
                            // Send back the epoch of the last processed write
                            let _ = epoch_tx.send(Ok(handle::WriteApplied {
                                epoch: self.epoch.saturating_sub(1),
                                result: (),
                            }));
                            self.handle_flush(flush_storage).await;
                        }
                        None => {
                            // All write channels closed
                            break;
                        }
                    }
                }

                _ = self.flush_interval.tick() => {
                    self.handle_flush(false).await;
                }

                _ = self.stop_tok.cancelled() => {
                    break;
                }
            }
        }

        // Flush any remaining pending writes before shutdown
        self.handle_flush(false).await;

        // Signal the flush task to stop
        self.flush_stop_tok.cancel();
        // Wait for the flush task to complete and propagate any errors
        flush_task_jh
            .await
            .map_err(|e| format!("flush task panicked: {}", e))?
            .map_err(|e| format!("flush task error: {}", e))
    }

    /// Apply one write to the current delta, assign it an epoch, report the
    /// result to the caller, and flush if the delta grew past the size threshold.
    async fn handle_write(
        &mut self,
        write: D::Write,
        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
    ) -> Result<(), String> {
        // The epoch is consumed even if apply() fails below.
        let write_epoch = self.epoch;
        self.epoch += 1;

        let result = self.delta.apply(write);
        // Ignore error if receiver was dropped (fire-and-forget write)
        let _ = result_tx.send(
            result
                .map(|apply_result| handle::WriteApplied {
                    epoch: write_epoch,
                    result: apply_result,
                })
                .map_err(|e| handle::WriteFailed {
                    epoch: write_epoch,
                    error: e,
                }),
        );

        // Ignore error if no watchers are listening - this is non-fatal
        self.watermarks.update_applied(write_epoch);

        if self.delta.estimate_size() >= self.config.flush_size_threshold {
            self.handle_flush(false).await;
        }

        Ok(())
    }

    /// Freeze and hand off the current delta (if it has writes), optionally
    /// followed by a storage-durability request.
    async fn handle_flush(&mut self, flush_storage: bool) {
        self.flush_if_delta_has_writes().await;
        if flush_storage {
            let _ = self.flush_tx.send(FlushEvent::FlushStorage).await;
        }
    }

    /// If the current delta contains any writes, freeze it, publish the new
    /// view, and send the frozen delta to the flush task.
    async fn flush_if_delta_has_writes(&mut self) {
        // No epochs consumed since the delta was (re)initialized: nothing to flush.
        if self.epoch == self.delta_start_epoch {
            return;
        }

        self.flush_interval.reset();

        // Half-open range of epochs captured by this frozen delta.
        let epoch_range = self.delta_start_epoch..self.epoch;
        self.delta_start_epoch = self.epoch;
        let (frozen, frozen_reader) = self.delta.freeze_and_init();
        let stamped_frozen = EpochStamped::new(frozen, epoch_range.clone());
        let stamped_frozen_reader = EpochStamped::new(frozen_reader, epoch_range.clone());
        let reader = self.delta.reader();
        // update the view before sending the flush msg to ensure the flusher sees
        // the frozen reader when updating the view post-flush
        self.view.update_delta_frozen(stamped_frozen_reader, reader);
        // this is the blocking section of the flush, new writes will not be accepted
        // until the event is sent to the FlushTask
        let _ = self
            .flush_tx
            .send(FlushEvent::FlushDelta {
                frozen: stamped_frozen,
            })
            .await;
    }
}
370
/// Background task that converts frozen deltas into storage writes.
struct FlushTask<D: Delta, F: Flusher<D>> {
    flusher: F,
    /// Cancelled by the write task after it has sent its final flush events.
    stop_tok: CancellationToken,
    flush_rx: mpsc::Receiver<FlushEvent<D>>,
    watermarks: Arc<EpochWatermarks>,
    view: Arc<BroadcastedView<D>>,
    /// Highest epoch known to have been flushed (0 = none yet).
    last_flushed_epoch: u64,
}
379
380impl<D: Delta, F: Flusher<D>> FlushTask<D, F> {
381    fn run(mut self) -> tokio::task::JoinHandle<WriteResult<()>> {
382        tokio::spawn(async move {
383            loop {
384                tokio::select! {
385                    event = self.flush_rx.recv() => {
386                        let Some(event) = event else {
387                            break;
388                        };
389                        self.handle_event(event).await?;
390                    }
391                    _ = self.stop_tok.cancelled() => {
392                        break;
393                    }
394                }
395            }
396            // drain all remaining flush events
397            while let Ok(event) = self.flush_rx.try_recv() {
398                self.handle_event(event).await;
399            }
400            Ok(())
401        })
402    }
403
404    async fn handle_event(&mut self, event: FlushEvent<D>) -> WriteResult<()> {
405        match event {
406            FlushEvent::FlushDelta { frozen } => self.handle_flush(frozen).await,
407            FlushEvent::FlushStorage => {
408                self.flusher
409                    .flush_storage()
410                    .await
411                    .map_err(|e| WriteError::FlushError(e.to_string()))?;
412                self.watermarks.update_durable(self.last_flushed_epoch);
413                Ok(())
414            }
415        }
416    }
417
418    async fn handle_flush(&mut self, frozen: EpochStamped<D::Frozen>) -> WriteResult<()> {
419        let delta = frozen.val;
420        let epoch_range = frozen.epoch_range;
421        let snapshot = self
422            .flusher
423            .flush_delta(delta, &epoch_range)
424            .await
425            .map_err(|e| WriteError::FlushError(e.to_string()))?;
426        self.last_flushed_epoch = epoch_range.end - 1;
427        self.watermarks.update_flushed(self.last_flushed_epoch);
428        self.view.update_flush_finished(snapshot, epoch_range);
429        Ok(())
430    }
431}
432
/// Holder for the active delta. The `Option` exists only so `freeze_and_init`
/// can move the delta out by value when freezing; outside that method the slot
/// is always `Some`.
struct CurrentDelta<D: Delta> {
    delta: Option<D>,
}
436
437impl<D: Delta> Deref for CurrentDelta<D> {
438    type Target = D;
439
440    fn deref(&self) -> &Self::Target {
441        match &self.delta {
442            Some(d) => d,
443            None => panic!("current delta not initialized"),
444        }
445    }
446}
447
448impl<D: Delta> DerefMut for CurrentDelta<D> {
449    fn deref_mut(&mut self) -> &mut Self::Target {
450        match &mut self.delta {
451            Some(d) => d,
452            None => panic!("current delta not initialized"),
453        }
454    }
455}
456
457impl<D: Delta> CurrentDelta<D> {
458    fn new(delta: D) -> Self {
459        Self { delta: Some(delta) }
460    }
461
462    fn freeze_and_init(&mut self) -> (D::Frozen, D::FrozenView) {
463        let Some(delta) = self.delta.take() else {
464            panic!("delta not initialized");
465        };
466        let (frozen, frozen_reader, context) = delta.freeze();
467        let new_delta = D::init(context);
468        self.delta = Some(new_delta);
469        (frozen, frozen_reader)
470    }
471}
472
/// Sender side of the three epoch watermarks: applied (write applied to the
/// delta), flushed (delta flushed to storage), and durable (storage flushed).
struct EpochWatermarks {
    applied_tx: tokio::sync::watch::Sender<u64>,
    flushed_tx: tokio::sync::watch::Sender<u64>,
    durable_tx: tokio::sync::watch::Sender<u64>,
}
478
479impl EpochWatermarks {
480    fn new() -> (Self, EpochWatcher) {
481        let (applied_tx, applied_rx) = tokio::sync::watch::channel(0);
482        let (flushed_tx, flushed_rx) = tokio::sync::watch::channel(0);
483        let (durable_tx, durable_rx) = tokio::sync::watch::channel(0);
484        let watcher = EpochWatcher {
485            applied_rx,
486            flushed_rx,
487            durable_rx,
488        };
489        let watermarks = EpochWatermarks {
490            applied_tx,
491            flushed_tx,
492            durable_tx,
493        };
494        (watermarks, watcher)
495    }
496
497    fn update_applied(&self, epoch: u64) {
498        let _ = self.applied_tx.send(epoch);
499    }
500
501    fn update_flushed(&self, epoch: u64) {
502        let _ = self.flushed_tx.send(epoch);
503    }
504
505    fn update_durable(&self, epoch: u64) {
506        let _ = self.durable_tx.send(epoch);
507    }
508}
509
/// Thread-safe holder of the latest `View` that broadcasts every update to
/// subscribers.
struct BroadcastedView<D: Delta> {
    inner: Mutex<BroadcastedViewInner<D>>,
}
513
impl<D: Delta> BroadcastedView<D> {
    fn new(initial_view: View<D>) -> Self {
        // Capacity 16: subscribers that fall more than 16 updates behind will
        // observe a lag error from the broadcast receiver.
        let (view_tx, _) = broadcast::channel(16);
        Self {
            inner: Mutex::new(BroadcastedViewInner {
                view: Arc::new(initial_view),
                view_tx,
            }),
        }
    }

    /// Record that the oldest pending flush completed with `snapshot`.
    fn update_flush_finished(&self, snapshot: Arc<dyn StorageSnapshot>, epoch_range: Range<u64>) {
        self.inner
            .lock()
            .expect("lock poisoned")
            .update_flush_finished(snapshot, epoch_range);
    }

    /// Record that the current delta was frozen and replaced by a new one.
    fn update_delta_frozen(&self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
        self.inner
            .lock()
            .expect("lock poisoned")
            .update_delta_frozen(frozen, reader);
    }

    /// The latest published view.
    fn current(&self) -> Arc<View<D>> {
        self.inner.lock().expect("lock poisoned").current()
    }

    /// Subscribe to future view updates plus the view as of subscription time.
    fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
        self.inner.lock().expect("lock poisoned").subscribe()
    }
}
547
/// Mutex-guarded state behind `BroadcastedView`.
struct BroadcastedViewInner<D: Delta> {
    /// The current immutable view; replaced wholesale on every update.
    view: Arc<View<D>>,
    /// Broadcasts each new view to subscribers.
    view_tx: tokio::sync::broadcast::Sender<Arc<View<D>>>,
}
552
553impl<D: Delta> BroadcastedViewInner<D> {
554    fn update_flush_finished(
555        &mut self,
556        snapshot: Arc<dyn StorageSnapshot>,
557        epoch_range: Range<u64>,
558    ) {
559        let mut new_frozen = self.view.frozen.clone();
560        let last = new_frozen
561            .pop()
562            .expect("frozen should not be empty when flush completes");
563        assert_eq!(last.epoch_range, epoch_range);
564        self.view = Arc::new(View {
565            current: self.view.current.clone(),
566            frozen: new_frozen,
567            snapshot,
568            last_flushed_delta: Some(last),
569        });
570        self.view_tx.send(self.view.clone());
571    }
572
573    fn update_delta_frozen(&mut self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
574        // Update read state: add frozen delta to front, update current reader
575        let mut new_frozen = vec![frozen];
576        new_frozen.extend(self.view.frozen.iter().cloned());
577        self.view = Arc::new(View {
578            current: reader,
579            frozen: new_frozen,
580            snapshot: self.view.snapshot.clone(),
581            last_flushed_delta: self.view.last_flushed_delta.clone(),
582        });
583        self.view_tx.send(self.view.clone());
584    }
585
586    fn current(&self) -> Arc<View<D>> {
587        self.view.clone()
588    }
589
590    fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
591        (self.view_tx.subscribe(), self.view.clone())
592    }
593}
594
595#[cfg(test)]
596mod tests {
597    use super::*;
598    use crate::BytesRange;
599    use crate::coordinator::Durability;
600    use crate::storage::in_memory::{InMemoryStorage, InMemoryStorageSnapshot};
601    use crate::storage::{Record, StorageSnapshot};
602    use crate::{Storage, StorageRead};
603    use async_trait::async_trait;
604    use bytes::Bytes;
605    use std::collections::HashMap;
606    use std::ops::Range;
607    use std::sync::Mutex;
608    // ============================================================================
609    // Test Infrastructure
610    // ============================================================================
611
    /// A single test write: upsert `value` under `key`.
    #[derive(Clone, Debug)]
    struct TestWrite {
        key: String,
        value: u64,
        /// Contribution to the delta's estimated size (drives threshold flushes).
        size: usize,
    }
618
    /// Context carries state that must persist across deltas (like sequence allocation)
    #[derive(Clone, Debug, Default)]
    struct TestContext {
        /// Next sequence number to assign to an applied write.
        next_seq: u64,
        /// When `Some`, every `apply` fails with this message.
        error: Option<String>,
    }
625
626    /// A shared reader that sees writes as they are applied to the delta.
627    #[derive(Clone, Debug, Default)]
628    struct TestDeltaReader {
629        data: Arc<Mutex<HashMap<String, u64>>>,
630    }
631
632    impl TestDeltaReader {
633        fn get(&self, key: &str) -> Option<u64> {
634            self.data.lock().unwrap().get(key).copied()
635        }
636    }
637
    /// Delta accumulates writes with sequence numbers.
    /// Stores the context directly and updates it in place.
    #[derive(Debug)]
    struct TestDelta {
        context: TestContext,
        /// key -> (sequence number, value); carried into the frozen delta.
        writes: HashMap<String, (u64, u64)>,
        /// Shared with `TestDeltaReader`s so readers see writes immediately.
        key_values: Arc<Mutex<HashMap<String, u64>>>,
        /// Sum of the `size` fields of applied writes.
        total_size: usize,
    }
647
    /// Immutable result of freezing a `TestDelta`.
    #[derive(Clone, Debug)]
    struct FrozenTestDelta {
        writes: HashMap<String, (u64, u64)>,
    }
652
    impl Delta for TestDelta {
        type Context = TestContext;
        type Write = TestWrite;
        type DeltaView = TestDeltaReader;
        type Frozen = FrozenTestDelta;
        type FrozenView = Arc<HashMap<String, u64>>;
        type ApplyResult = ();

        fn init(context: Self::Context) -> Self {
            Self {
                context,
                writes: HashMap::default(),
                key_values: Arc::new(Mutex::new(HashMap::default())),
                total_size: 0,
            }
        }

        fn apply(&mut self, write: Self::Write) -> Result<(), String> {
            // Injected failure mode for error-path tests.
            if let Some(error) = &self.context.error {
                return Err(error.clone());
            }

            let seq = self.context.next_seq;
            self.context.next_seq += 1;

            self.writes.insert(write.key.clone(), (seq, write.value));
            self.total_size += write.size;
            // Make the write visible to shared readers immediately.
            self.key_values
                .lock()
                .unwrap()
                .insert(write.key, write.value);
            Ok(())
        }

        fn estimate_size(&self) -> usize {
            self.total_size
        }

        fn freeze(self) -> (Self::Frozen, Self::FrozenView, Self::Context) {
            let frozen = FrozenTestDelta {
                writes: self.writes,
            };
            // Snapshot the shared map so the frozen view is immutable.
            let frozen_view = Arc::new(self.key_values.lock().unwrap().clone());
            (frozen, frozen_view, self.context)
        }

        fn reader(&self) -> Self::DeltaView {
            TestDeltaReader {
                data: self.key_values.clone(),
            }
        }
    }
705
    /// Shared state for TestFlusher - allows test to inspect and control behavior
    #[derive(Default)]
    struct TestFlusherState {
        /// Every delta flushed so far, in flush order.
        flushed_events: Vec<Arc<EpochStamped<FrozenTestDelta>>>,
        /// Signals when a flush starts (before blocking)
        flush_started_tx: Option<oneshot::Sender<()>>,
        /// Blocks flush until signaled
        unblock_rx: Option<mpsc::Receiver<()>>,
    }
715
    /// Flusher that writes frozen deltas into an `InMemoryStorage` and records
    /// each flush for later inspection.
    #[derive(Clone)]
    struct TestFlusher {
        state: Arc<Mutex<TestFlusherState>>,
        storage: Arc<InMemoryStorage>,
    }
721
722    impl Default for TestFlusher {
723        fn default() -> Self {
724            Self {
725                state: Arc::new(Mutex::new(TestFlusherState::default())),
726                storage: Arc::new(InMemoryStorage::new()),
727            }
728        }
729    }
730
    impl TestFlusher {
        /// Create a flusher that blocks until signaled, with a notification when flush starts.
        /// Returns (flusher, flush_started_rx, unblock_tx).
        fn with_flush_control() -> (Self, oneshot::Receiver<()>, mpsc::Sender<()>) {
            let (started_tx, started_rx) = oneshot::channel();
            let (unblock_tx, unblock_rx) = mpsc::channel(1);
            let flusher = Self {
                state: Arc::new(Mutex::new(TestFlusherState {
                    flushed_events: Vec::new(),
                    flush_started_tx: Some(started_tx),
                    unblock_rx: Some(unblock_rx),
                })),
                storage: Arc::new(InMemoryStorage::new()),
            };
            (flusher, started_rx, unblock_tx)
        }

        /// All deltas flushed so far, in flush order.
        fn flushed_events(&self) -> Vec<Arc<EpochStamped<FrozenTestDelta>>> {
            self.state.lock().unwrap().flushed_events.clone()
        }

        /// Snapshot of the backing storage (taken before any writes in tests).
        async fn initial_snapshot(&self) -> Arc<dyn StorageSnapshot> {
            self.storage.snapshot().await.unwrap()
        }
    }
756
    #[async_trait]
    impl Flusher<TestDelta> for TestFlusher {
        /// Write the frozen delta's records into storage and return a fresh
        /// snapshot. Optionally signals/blocks via the flush-control hooks.
        async fn flush_delta(
            &self,
            frozen: FrozenTestDelta,
            epoch_range: &Range<u64>,
        ) -> Result<Arc<dyn StorageSnapshot>, String> {
            // Signal that flush has started
            let flush_started_tx = {
                let mut state = self.state.lock().unwrap();
                state.flush_started_tx.take()
            };
            if let Some(tx) = flush_started_tx {
                let _ = tx.send(());
            }

            // Block if test wants to control timing
            let unblock_rx = {
                let mut state = self.state.lock().unwrap();
                state.unblock_rx.take()
            };
            if let Some(mut rx) = unblock_rx {
                rx.recv().await;
            }

            // Write records to storage
            // Value encoding: 16 bytes = little-endian seq (8) then value (8).
            let records: Vec<Record> = frozen
                .writes
                .iter()
                .map(|(key, (seq, value))| {
                    let mut buf = Vec::with_capacity(16);
                    buf.extend_from_slice(&seq.to_le_bytes());
                    buf.extend_from_slice(&value.to_le_bytes());
                    Record::new(Bytes::from(key.clone()), Bytes::from(buf))
                })
                .collect();
            self.storage
                .put(records)
                .await
                .map_err(|e| format!("{}", e))?;

            // Record the flush
            {
                let mut state = self.state.lock().unwrap();
                state
                    .flushed_events
                    .push(Arc::new(EpochStamped::new(frozen, epoch_range.clone())));
            }

            self.storage.snapshot().await.map_err(|e| format!("{}", e))
        }

        /// No-op durability flush, with the same signal/block hooks as
        /// `flush_delta`.
        async fn flush_storage(&self) -> Result<(), String> {
            // Signal that flush has started
            let flush_started_tx = {
                let mut state = self.state.lock().unwrap();
                state.flush_started_tx.take()
            };
            if let Some(tx) = flush_started_tx {
                let _ = tx.send(());
            }

            // Block if test wants to control timing
            let unblock_rx = {
                let mut state = self.state.lock().unwrap();
                state.unblock_rx.take()
            };
            if let Some(mut rx) = unblock_rx {
                rx.recv().await;
            }

            Ok(())
        }
    }
831
    /// Config for tests: timer and size flush triggers are effectively
    /// disabled so flushes only happen when explicitly requested.
    fn test_config() -> WriteCoordinatorConfig {
        WriteCoordinatorConfig {
            queue_capacity: 100,
            flush_interval: Duration::from_secs(3600), // Long interval to avoid timer flushes
            flush_size_threshold: usize::MAX,
        }
    }
839
    /// Assert that `snapshot` contains exactly the `(key, seq, value)` rows in
    /// `expected` (order-insensitive). Record values are decoded as two
    /// little-endian u64s: sequence number then value.
    async fn assert_snapshot_has_rows(
        snapshot: &Arc<dyn StorageSnapshot>,
        expected: &[(&str, u64, u64)],
    ) {
        let records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
        assert_eq!(
            records.len(),
            expected.len(),
            "expected {} rows but snapshot has {}",
            expected.len(),
            records.len()
        );
        let mut actual: Vec<(String, u64, u64)> = records
            .iter()
            .map(|r| {
                let key = String::from_utf8(r.key.to_vec()).unwrap();
                let seq = u64::from_le_bytes(r.value[0..8].try_into().unwrap());
                let value = u64::from_le_bytes(r.value[8..16].try_into().unwrap());
                (key, seq, value)
            })
            .collect();
        // Sort both sides by key so the comparison is order-insensitive.
        actual.sort_by(|a, b| a.0.cmp(&b.0));
        let mut expected: Vec<(&str, u64, u64)> = expected.to_vec();
        expected.sort_by(|a, b| a.0.cmp(b.0));
        for (actual, expected) in actual.iter().zip(expected.iter()) {
            assert_eq!(
                actual.0, expected.0,
                "key mismatch: got {:?}, expected {:?}",
                actual.0, expected.0
            );
            assert_eq!(
                actual.1, expected.1,
                "seq mismatch for key {:?}: got {}, expected {}",
                actual.0, actual.1, expected.1
            );
            assert_eq!(
                actual.2, expected.2,
                "value mismatch for key {:?}: got {}, expected {}",
                actual.0, actual.2, expected.2
            );
        }
    }
882
883    // ============================================================================
884    // Basic Write Flow Tests
885    // ============================================================================
886
    /// Three sequential writes should receive strictly increasing epochs.
    #[tokio::test]
    async fn should_assign_monotonic_epochs() {
        // given
        let flusher = TestFlusher::default();
        let mut coordinator = WriteCoordinator::new(
            test_config(),
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher,
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when
        let write1 = handle
            .write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        let write2 = handle
            .write(TestWrite {
                key: "b".into(),
                value: 2,
                size: 10,
            })
            .await
            .unwrap();
        let write3 = handle
            .write(TestWrite {
                key: "c".into(),
                value: 3,
                size: 10,
            })
            .await
            .unwrap();

        let epoch1 = write1.epoch().await.unwrap();
        let epoch2 = write2.epoch().await.unwrap();
        let epoch3 = write3.epoch().await.unwrap();

        // then
        assert!(epoch1 < epoch2);
        assert!(epoch2 < epoch3);

        // cleanup (shutdown result intentionally not asserted here)
        coordinator.stop().await;
    }
938
939    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
940    async fn should_apply_writes_in_order() {
941        // given
942        let flusher = TestFlusher::default();
943        let mut coordinator = WriteCoordinator::new(
944            test_config(),
945            vec!["default".to_string()],
946            TestContext::default(),
947            flusher.initial_snapshot().await,
948            flusher.clone(),
949        );
950        let handle = coordinator.handle("default");
951        coordinator.start();
952
953        // when
954        handle
955            .write(TestWrite {
956                key: "a".into(),
957                value: 1,
958                size: 10,
959            })
960            .await
961            .unwrap();
962        handle
963            .write(TestWrite {
964                key: "a".into(),
965                value: 2,
966                size: 10,
967            })
968            .await
969            .unwrap();
970        let mut last_write = handle
971            .write(TestWrite {
972                key: "a".into(),
973                value: 3,
974                size: 10,
975            })
976            .await
977            .unwrap();
978
979        handle.flush(false).await.unwrap();
980        // Wait for flush to complete via watermark
981        last_write.wait(Durability::Flushed).await.unwrap();
982
983        // then
984        let events = flusher.flushed_events();
985        assert_eq!(events.len(), 1);
986        let frozen_delta = &events[0];
987        let delta = &frozen_delta.val;
988        // Writing key "a" 3x overwrites; last write wins with seq=2 (0-indexed)
989        let (seq, value) = delta.writes.get("a").unwrap();
990        assert_eq!(*value, 3);
991        assert_eq!(*seq, 2);
992
993        // cleanup
994        coordinator.stop().await;
995    }
996
997    #[tokio::test]
998    async fn should_update_applied_watermark_after_each_write() {
999        // given
1000        let flusher = TestFlusher::default();
1001        let mut coordinator = WriteCoordinator::new(
1002            test_config(),
1003            vec!["default".to_string()],
1004            TestContext::default(),
1005            flusher.initial_snapshot().await,
1006            flusher,
1007        );
1008        let handle = coordinator.handle("default");
1009        coordinator.start();
1010
1011        // when
1012        let mut write_handle = handle
1013            .write(TestWrite {
1014                key: "a".into(),
1015                value: 1,
1016                size: 10,
1017            })
1018            .await
1019            .unwrap();
1020
1021        // then - wait should succeed immediately after write is applied
1022        let result = write_handle.wait(Durability::Applied).await;
1023        assert!(result.is_ok());
1024
1025        // cleanup
1026        coordinator.stop().await;
1027    }
1028
1029    #[tokio::test]
1030    async fn should_propagate_apply_error_to_handle() {
1031        // given
1032        let flusher = TestFlusher::default();
1033        let context = TestContext {
1034            error: Some("apply error".to_string()),
1035            ..Default::default()
1036        };
1037        let mut coordinator = WriteCoordinator::new(
1038            test_config(),
1039            vec!["default".to_string()],
1040            context,
1041            flusher.initial_snapshot().await,
1042            flusher,
1043        );
1044        let handle = coordinator.handle("default");
1045        coordinator.start();
1046
1047        // when
1048        let write = handle
1049            .write(TestWrite {
1050                key: "a".into(),
1051                value: 1,
1052                size: 10,
1053            })
1054            .await
1055            .unwrap();
1056
1057        let result = write.epoch().await;
1058
1059        // then
1060        assert!(
1061            matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
1062        );
1063
1064        // cleanup
1065        coordinator.stop().await;
1066    }
1067
1068    // ============================================================================
1069    // Manual Flush Tests
1070    // ============================================================================
1071
1072    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1073    async fn should_flush_on_command() {
1074        // given
1075        let flusher = TestFlusher::default();
1076        let mut coordinator = WriteCoordinator::new(
1077            test_config(),
1078            vec!["default".to_string()],
1079            TestContext::default(),
1080            flusher.initial_snapshot().await,
1081            flusher.clone(),
1082        );
1083        let handle = coordinator.handle("default");
1084        coordinator.start();
1085
1086        // when
1087        let mut write = handle
1088            .write(TestWrite {
1089                key: "a".into(),
1090                value: 1,
1091                size: 10,
1092            })
1093            .await
1094            .unwrap();
1095        handle.flush(false).await.unwrap();
1096        write.wait(Durability::Flushed).await.unwrap();
1097
1098        // then
1099        assert_eq!(flusher.flushed_events().len(), 1);
1100
1101        // cleanup
1102        coordinator.stop().await;
1103    }
1104
1105    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1106    async fn should_wait_on_flush_handle() {
1107        // given
1108        let flusher = TestFlusher::default();
1109        let mut coordinator = WriteCoordinator::new(
1110            test_config(),
1111            vec!["default".to_string()],
1112            TestContext::default(),
1113            flusher.initial_snapshot().await,
1114            flusher.clone(),
1115        );
1116        let handle = coordinator.handle("default");
1117        coordinator.start();
1118
1119        // when
1120        handle
1121            .write(TestWrite {
1122                key: "a".into(),
1123                value: 1,
1124                size: 10,
1125            })
1126            .await
1127            .unwrap();
1128        let mut flush_handle = handle.flush(false).await.unwrap();
1129
1130        // then - can wait directly on the flush handle
1131        flush_handle.wait(Durability::Flushed).await.unwrap();
1132        assert_eq!(flusher.flushed_events().len(), 1);
1133
1134        // cleanup
1135        coordinator.stop().await;
1136    }
1137
1138    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1139    async fn should_return_correct_epoch_from_flush_handle() {
1140        // given
1141        let flusher = TestFlusher::default();
1142        let mut coordinator = WriteCoordinator::new(
1143            test_config(),
1144            vec!["default".to_string()],
1145            TestContext::default(),
1146            flusher.initial_snapshot().await,
1147            flusher,
1148        );
1149        let handle = coordinator.handle("default");
1150        coordinator.start();
1151
1152        // when
1153        let write1 = handle
1154            .write(TestWrite {
1155                key: "a".into(),
1156                value: 1,
1157                size: 10,
1158            })
1159            .await
1160            .unwrap();
1161        let write2 = handle
1162            .write(TestWrite {
1163                key: "b".into(),
1164                value: 2,
1165                size: 10,
1166            })
1167            .await
1168            .unwrap();
1169        let flush_handle = handle.flush(false).await.unwrap();
1170
1171        // then - flush handle epoch should be the last write's epoch
1172        let flush_epoch = flush_handle.epoch().await.unwrap();
1173        let write2_epoch = write2.epoch().await.unwrap();
1174        assert_eq!(flush_epoch, write2_epoch);
1175
1176        // cleanup
1177        coordinator.stop().await;
1178    }
1179
1180    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1181    async fn should_include_all_pending_writes_in_flush() {
1182        // given
1183        let flusher = TestFlusher::default();
1184        let mut coordinator = WriteCoordinator::new(
1185            test_config(),
1186            vec!["default".to_string()],
1187            TestContext::default(),
1188            flusher.initial_snapshot().await,
1189            flusher.clone(),
1190        );
1191        let handle = coordinator.handle("default");
1192        coordinator.start();
1193
1194        // when
1195        handle
1196            .write(TestWrite {
1197                key: "a".into(),
1198                value: 1,
1199                size: 10,
1200            })
1201            .await
1202            .unwrap();
1203        handle
1204            .write(TestWrite {
1205                key: "b".into(),
1206                value: 2,
1207                size: 10,
1208            })
1209            .await
1210            .unwrap();
1211        let mut last_write = handle
1212            .write(TestWrite {
1213                key: "c".into(),
1214                value: 3,
1215                size: 10,
1216            })
1217            .await
1218            .unwrap();
1219
1220        handle.flush(false).await.unwrap();
1221        last_write.wait(Durability::Flushed).await.unwrap();
1222
1223        // then
1224        let events = flusher.flushed_events();
1225        assert_eq!(events.len(), 1);
1226        let frozen_delta = &events[0];
1227        assert_eq!(frozen_delta.val.writes.len(), 3);
1228        let snapshot = flusher.storage.snapshot().await.unwrap();
1229        assert_snapshot_has_rows(&snapshot, &[("a", 0, 1), ("b", 1, 2), ("c", 2, 3)]).await;
1230
1231        // cleanup
1232        coordinator.stop().await;
1233    }
1234
    /// A flush request that arrives when no new writes are pending must be a
    /// no-op rather than producing an empty flush event.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_skip_flush_when_no_new_writes() {
        // given
        let flusher = TestFlusher::default();
        let mut coordinator = WriteCoordinator::new(
            test_config(),
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when - one write followed by a flush (this first flush is real)
        let mut write = handle
            .write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        handle.flush(false).await.unwrap();
        write.wait(Durability::Flushed).await.unwrap();

        // Second flush with no new writes - expected to be skipped
        handle.flush(false).await.unwrap();

        // Synchronization: write and wait for applied to ensure the flush command
        // has been processed (commands are processed in order), since a skipped
        // flush leaves nothing else to await
        let sync_write = handle
            .write(TestWrite {
                key: "sync".into(),
                value: 0,
                size: 1,
            })
            .await
            .unwrap();
        sync_write.epoch().await.unwrap();

        // then - only one flush should have occurred (the second flush was a no-op)
        assert_eq!(flusher.flushed_events().len(), 1);

        // cleanup
        coordinator.stop().await;
    }
1282
1283    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1284    async fn should_update_flushed_watermark_after_flush() {
1285        // given
1286        let flusher = TestFlusher::default();
1287        let mut coordinator = WriteCoordinator::new(
1288            test_config(),
1289            vec!["default".to_string()],
1290            TestContext::default(),
1291            flusher.initial_snapshot().await,
1292            flusher,
1293        );
1294        let handle = coordinator.handle("default");
1295        coordinator.start();
1296
1297        // when
1298        let mut write_handle = handle
1299            .write(TestWrite {
1300                key: "a".into(),
1301                value: 1,
1302                size: 10,
1303            })
1304            .await
1305            .unwrap();
1306
1307        handle.flush(false).await.unwrap();
1308
1309        // then - wait for Flushed should succeed after flush completes
1310        let result = write_handle.wait(Durability::Flushed).await;
1311        assert!(result.is_ok());
1312
1313        // cleanup
1314        coordinator.stop().await;
1315    }
1316
1317    // ============================================================================
1318    // Timer-Based Flush Tests
1319    // ============================================================================
1320
    /// With the tokio clock paused (`start_paused = true`), advancing time
    /// past the configured `flush_interval` must trigger an automatic flush.
    #[tokio::test(start_paused = true)]
    async fn should_flush_on_flush_interval() {
        // given - create coordinator with short flush interval
        // (size threshold is effectively disabled so only the timer can fire)
        let flusher = TestFlusher::default();
        let config = WriteCoordinatorConfig {
            queue_capacity: 100,
            flush_interval: Duration::from_millis(100),
            flush_size_threshold: usize::MAX,
        };
        let mut coordinator = WriteCoordinator::new(
            config,
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when - ensure coordinator task runs and then write something
        tokio::task::yield_now().await;
        let mut write = handle
            .write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        write.wait(Durability::Applied).await.unwrap();

        // then - no flush should have happened yet (interval was reset in run())
        assert_eq!(flusher.flushed_events().len(), 0);

        // when - advance time past the flush interval from when run() was called
        // (with the clock paused, advance() deterministically fires the timer)
        tokio::time::advance(Duration::from_millis(150)).await;
        tokio::task::yield_now().await;

        // then - flush should have happened
        assert_eq!(flusher.flushed_events().len(), 1);
        let snapshot = flusher.storage.snapshot().await.unwrap();
        assert_snapshot_has_rows(&snapshot, &[("a", 0, 1)]).await;

        // cleanup
        coordinator.stop().await;
    }
1367
1368    // ============================================================================
1369    // Size-Threshold Flush Tests
1370    // ============================================================================
1371
1372    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1373    async fn should_flush_when_size_threshold_exceeded() {
1374        // given
1375        let flusher = TestFlusher::default();
1376        let config = WriteCoordinatorConfig {
1377            queue_capacity: 100,
1378            flush_interval: Duration::from_secs(3600),
1379            flush_size_threshold: 100, // Low threshold for testing
1380        };
1381        let mut coordinator = WriteCoordinator::new(
1382            config,
1383            vec!["default".to_string()],
1384            TestContext::default(),
1385            flusher.initial_snapshot().await,
1386            flusher.clone(),
1387        );
1388        let handle = coordinator.handle("default");
1389        coordinator.start();
1390
1391        // when - write that exceeds threshold
1392        let mut write = handle
1393            .write(TestWrite {
1394                key: "a".into(),
1395                value: 1,
1396                size: 150,
1397            })
1398            .await
1399            .unwrap();
1400        write.wait(Durability::Flushed).await.unwrap();
1401
1402        // then
1403        assert_eq!(flusher.flushed_events().len(), 1);
1404
1405        // cleanup
1406        coordinator.stop().await;
1407    }
1408
1409    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1410    async fn should_accumulate_until_threshold() {
1411        // given
1412        let flusher = TestFlusher::default();
1413        let config = WriteCoordinatorConfig {
1414            queue_capacity: 100,
1415            flush_interval: Duration::from_secs(3600),
1416            flush_size_threshold: 100,
1417        };
1418        let mut coordinator = WriteCoordinator::new(
1419            config,
1420            vec!["default".to_string()],
1421            TestContext::default(),
1422            flusher.initial_snapshot().await,
1423            flusher.clone(),
1424        );
1425        let handle = coordinator.handle("default");
1426        coordinator.start();
1427
1428        // when - small writes that accumulate
1429        for i in 0..5 {
1430            let mut w = handle
1431                .write(TestWrite {
1432                    key: format!("key{}", i),
1433                    value: i,
1434                    size: 15,
1435                })
1436                .await
1437                .unwrap();
1438            w.wait(Durability::Applied).await.unwrap();
1439        }
1440
1441        // then - no flush yet (75 bytes < 100 threshold)
1442        assert_eq!(flusher.flushed_events().len(), 0);
1443
1444        // when - write that pushes over threshold
1445        let mut final_write = handle
1446            .write(TestWrite {
1447                key: "final".into(),
1448                value: 999,
1449                size: 30,
1450            })
1451            .await
1452            .unwrap();
1453        final_write.wait(Durability::Flushed).await.unwrap();
1454
1455        // then - should have flushed (105 bytes > 100 threshold)
1456        assert_eq!(flusher.flushed_events().len(), 1);
1457
1458        // cleanup
1459        coordinator.stop().await;
1460    }
1461
1462    // ============================================================================
1463    // Non-Blocking Flush (Concurrency) Tests
1464    // ============================================================================
1465
1466    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1467    async fn should_accept_writes_during_flush() {
1468        // given
1469        let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1470        let mut coordinator = WriteCoordinator::new(
1471            test_config(),
1472            vec!["default".to_string()],
1473            TestContext::default(),
1474            flusher.initial_snapshot().await,
1475            flusher.clone(),
1476        );
1477        let handle = coordinator.handle("default");
1478        coordinator.start();
1479
1480        // when: trigger a flush and wait for it to start (proving it's in progress)
1481        let write1 = handle
1482            .write(TestWrite {
1483                key: "a".into(),
1484                value: 1,
1485                size: 10,
1486            })
1487            .await
1488            .unwrap();
1489        handle.flush(false).await.unwrap();
1490        flush_started_rx.await.unwrap(); // wait until flush is actually in progress
1491
1492        // then: writes during blocked flush still succeed
1493        let write2 = handle
1494            .write(TestWrite {
1495                key: "b".into(),
1496                value: 2,
1497                size: 10,
1498            })
1499            .await
1500            .unwrap();
1501        assert!(write2.epoch().await.unwrap() > write1.epoch().await.unwrap());
1502
1503        // cleanup
1504        unblock_tx.send(()).await.unwrap();
1505        coordinator.stop().await;
1506    }
1507
1508    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1509    async fn should_assign_new_epochs_during_flush() {
1510        // given
1511        let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1512        let mut coordinator = WriteCoordinator::new(
1513            test_config(),
1514            vec!["default".to_string()],
1515            TestContext::default(),
1516            flusher.initial_snapshot().await,
1517            flusher.clone(),
1518        );
1519        let handle = coordinator.handle("default");
1520        coordinator.start();
1521
1522        // when: write, flush, then write more during blocked flush
1523        handle
1524            .write(TestWrite {
1525                key: "a".into(),
1526                value: 1,
1527                size: 10,
1528            })
1529            .await
1530            .unwrap();
1531        handle.flush(false).await.unwrap();
1532        flush_started_rx.await.unwrap(); // wait until flush is actually in progress
1533
1534        // Writes during blocked flush get new epochs
1535        let w1 = handle
1536            .write(TestWrite {
1537                key: "b".into(),
1538                value: 2,
1539                size: 10,
1540            })
1541            .await
1542            .unwrap();
1543        let w2 = handle
1544            .write(TestWrite {
1545                key: "c".into(),
1546                value: 3,
1547                size: 10,
1548            })
1549            .await
1550            .unwrap();
1551
1552        // then: epochs continue incrementing
1553        let e1 = w1.epoch().await.unwrap();
1554        let e2 = w2.epoch().await.unwrap();
1555        assert!(e1 < e2);
1556
1557        // cleanup
1558        unblock_tx.send(()).await.unwrap();
1559        coordinator.stop().await;
1560    }
1561
1562    // ============================================================================
1563    // Backpressure Tests
1564    // ============================================================================
1565
1566    #[tokio::test]
1567    async fn should_return_backpressure_when_queue_full() {
1568        // given
1569        let flusher = TestFlusher::default();
1570        let config = WriteCoordinatorConfig {
1571            queue_capacity: 2,
1572            flush_interval: Duration::from_secs(3600),
1573            flush_size_threshold: usize::MAX,
1574        };
1575        let mut coordinator = WriteCoordinator::new(
1576            config,
1577            vec!["default".to_string()],
1578            TestContext::default(),
1579            flusher.initial_snapshot().await,
1580            flusher.clone(),
1581        );
1582        let handle = coordinator.handle("default");
1583        // Don't start coordinator - queue will fill
1584
1585        // when - fill the queue
1586        let _ = handle
1587            .write(TestWrite {
1588                key: "a".into(),
1589                value: 1,
1590                size: 10,
1591            })
1592            .await;
1593        let _ = handle
1594            .write(TestWrite {
1595                key: "b".into(),
1596                value: 2,
1597                size: 10,
1598            })
1599            .await;
1600
1601        // Third write should fail with backpressure
1602        let result = handle
1603            .write(TestWrite {
1604                key: "c".into(),
1605                value: 3,
1606                size: 10,
1607            })
1608            .await;
1609
1610        // then
1611        assert!(matches!(result, Err(WriteError::Backpressure)));
1612    }
1613
1614    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1615    async fn should_accept_writes_after_queue_drains() {
1616        // given
1617        let flusher = TestFlusher::default();
1618        let config = WriteCoordinatorConfig {
1619            queue_capacity: 2,
1620            flush_interval: Duration::from_secs(3600),
1621            flush_size_threshold: usize::MAX,
1622        };
1623        let mut coordinator = WriteCoordinator::new(
1624            config,
1625            vec!["default".to_string()],
1626            TestContext::default(),
1627            flusher.initial_snapshot().await,
1628            flusher.clone(),
1629        );
1630        let handle = coordinator.handle("default");
1631
1632        // Fill queue without processing
1633        let _ = handle
1634            .write(TestWrite {
1635                key: "a".into(),
1636                value: 1,
1637                size: 10,
1638            })
1639            .await;
1640        let mut write_b = handle
1641            .write(TestWrite {
1642                key: "b".into(),
1643                value: 2,
1644                size: 10,
1645            })
1646            .await
1647            .unwrap();
1648
1649        // when - start coordinator to drain queue and wait for it to process writes
1650        coordinator.start();
1651        write_b.wait(Durability::Applied).await.unwrap();
1652
1653        // then - writes should succeed now
1654        let result = handle
1655            .write(TestWrite {
1656                key: "c".into(),
1657                value: 3,
1658                size: 10,
1659            })
1660            .await;
1661        assert!(result.is_ok());
1662
1663        // cleanup
1664        coordinator.stop().await;
1665    }
1666
1667    // ============================================================================
1668    // Shutdown Tests
1669    // ============================================================================
1670
1671    #[tokio::test]
1672    async fn should_shutdown_cleanly_when_stop_called() {
1673        // given
1674        let flusher = TestFlusher::default();
1675        let mut coordinator = WriteCoordinator::new(
1676            test_config(),
1677            vec!["default".to_string()],
1678            TestContext::default(),
1679            flusher.initial_snapshot().await,
1680            flusher.clone(),
1681        );
1682        let handle = coordinator.handle("default");
1683        coordinator.start();
1684
1685        // when
1686        let result = coordinator.stop().await;
1687
1688        // then - coordinator should return Ok
1689        assert!(result.is_ok());
1690    }
1691
    /// Un-flushed writes must be flushed as part of coordinator shutdown,
    /// even when neither the timer nor the size threshold ever fired.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_flush_pending_writes_on_shutdown() {
        // given
        let flusher = TestFlusher::default();
        let config = WriteCoordinatorConfig {
            queue_capacity: 100,
            flush_interval: Duration::from_secs(3600), // Long interval - won't trigger
            flush_size_threshold: usize::MAX,          // High threshold - won't trigger
        };
        let mut coordinator = WriteCoordinator::new(
            config,
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when - write without explicit flush, then shutdown
        let write = handle
            .write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        let epoch = write.epoch().await.unwrap();

        // Stop the coordinator - shutdown should flush the pending delta
        coordinator.stop().await;

        // then - pending writes should have been flushed
        let events = flusher.flushed_events();
        assert_eq!(events.len(), 1);
        let epoch_range = &events[0].epoch_range;
        assert!(epoch_range.contains(&epoch));
    }
1731
1732    #[tokio::test]
1733    async fn should_return_shutdown_error_after_coordinator_stops() {
1734        // given
1735        let flusher = TestFlusher::default();
1736        let mut coordinator = WriteCoordinator::new(
1737            test_config(),
1738            vec!["default".to_string()],
1739            TestContext::default(),
1740            flusher.initial_snapshot().await,
1741            flusher.clone(),
1742        );
1743        let handle = coordinator.handle("default");
1744        coordinator.start();
1745
1746        // Stop coordinator
1747        coordinator.stop().await;
1748
1749        // when
1750        let result = handle
1751            .write(TestWrite {
1752                key: "a".into(),
1753                value: 1,
1754                size: 10,
1755            })
1756            .await;
1757
1758        // then
1759        assert!(matches!(result, Err(WriteError::Shutdown)));
1760    }
1761
1762    // ============================================================================
1763    // Epoch Range Tracking Tests
1764    // ============================================================================
1765
1766    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1767    async fn should_track_epoch_range_in_flush_event() {
1768        // given
1769        let flusher = TestFlusher::default();
1770        let mut coordinator = WriteCoordinator::new(
1771            test_config(),
1772            vec!["default".to_string()],
1773            TestContext::default(),
1774            flusher.initial_snapshot().await,
1775            flusher.clone(),
1776        );
1777        let handle = coordinator.handle("default");
1778        coordinator.start();
1779
1780        // when
1781        handle
1782            .write(TestWrite {
1783                key: "a".into(),
1784                value: 1,
1785                size: 10,
1786            })
1787            .await
1788            .unwrap();
1789        handle
1790            .write(TestWrite {
1791                key: "b".into(),
1792                value: 2,
1793                size: 10,
1794            })
1795            .await
1796            .unwrap();
1797        let mut last_write = handle
1798            .write(TestWrite {
1799                key: "c".into(),
1800                value: 3,
1801                size: 10,
1802            })
1803            .await
1804            .unwrap();
1805
1806        handle.flush(false).await.unwrap();
1807        last_write.wait(Durability::Flushed).await.unwrap();
1808
1809        // then
1810        let events = flusher.flushed_events();
1811        assert_eq!(events.len(), 1);
1812        let epoch_range = &events[0].epoch_range;
1813        assert_eq!(epoch_range.start, 1);
1814        assert_eq!(epoch_range.end, 4); // exclusive: one past the last epoch (3)
1815
1816        // cleanup
1817        coordinator.stop().await;
1818    }
1819
1820    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1821    async fn should_have_contiguous_epoch_ranges() {
1822        // given
1823        let flusher = TestFlusher::default();
1824        let mut coordinator = WriteCoordinator::new(
1825            test_config(),
1826            vec!["default".to_string()],
1827            TestContext::default(),
1828            flusher.initial_snapshot().await,
1829            flusher.clone(),
1830        );
1831        let handle = coordinator.handle("default");
1832        coordinator.start();
1833
1834        // when - first batch
1835        handle
1836            .write(TestWrite {
1837                key: "a".into(),
1838                value: 1,
1839                size: 10,
1840            })
1841            .await
1842            .unwrap();
1843        let mut write2 = handle
1844            .write(TestWrite {
1845                key: "b".into(),
1846                value: 2,
1847                size: 10,
1848            })
1849            .await
1850            .unwrap();
1851        handle.flush(false).await.unwrap();
1852        write2.wait(Durability::Flushed).await.unwrap();
1853
1854        // when - second batch
1855        let mut write3 = handle
1856            .write(TestWrite {
1857                key: "c".into(),
1858                value: 3,
1859                size: 10,
1860            })
1861            .await
1862            .unwrap();
1863        handle.flush(false).await.unwrap();
1864        write3.wait(Durability::Flushed).await.unwrap();
1865
1866        // then
1867        let events = flusher.flushed_events();
1868        assert_eq!(events.len(), 2);
1869
1870        let range1 = &events[0].epoch_range;
1871        let range2 = &events[1].epoch_range;
1872
1873        // Ranges should be contiguous (end of first == start of second)
1874        assert_eq!(range1.end, range2.start);
1875        assert_eq!(range1, &(1..3));
1876        assert_eq!(range2, &(3..4));
1877
1878        // cleanup
1879        coordinator.stop().await;
1880    }
1881
1882    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1883    async fn should_include_exact_epochs_in_range() {
1884        // given
1885        let flusher = TestFlusher::default();
1886        let mut coordinator = WriteCoordinator::new(
1887            test_config(),
1888            vec!["default".to_string()],
1889            TestContext::default(),
1890            flusher.initial_snapshot().await,
1891            flusher.clone(),
1892        );
1893        let handle = coordinator.handle("default");
1894        coordinator.start();
1895
1896        // when - write and capture the assigned epochs
1897        let write1 = handle
1898            .write(TestWrite {
1899                key: "a".into(),
1900                value: 1,
1901                size: 10,
1902            })
1903            .await
1904            .unwrap();
1905        let epoch1 = write1.epoch().await.unwrap();
1906
1907        let mut write2 = handle
1908            .write(TestWrite {
1909                key: "b".into(),
1910                value: 2,
1911                size: 10,
1912            })
1913            .await
1914            .unwrap();
1915        let epoch2 = write2.epoch().await.unwrap();
1916
1917        handle.flush(false).await.unwrap();
1918        write2.wait(Durability::Flushed).await.unwrap();
1919
1920        // then - the epoch_range should contain exactly the epochs assigned to writes
1921        let events = flusher.flushed_events();
1922        assert_eq!(events.len(), 1);
1923        let epoch_range = &events[0].epoch_range;
1924
1925        // The range should start at the first write's epoch
1926        assert_eq!(epoch_range.start, epoch1);
1927        // The range end should be one past the last write's epoch (exclusive)
1928        assert_eq!(epoch_range.end, epoch2 + 1);
1929        // Both epochs should be contained in the range
1930        assert!(epoch_range.contains(&epoch1));
1931        assert!(epoch_range.contains(&epoch2));
1932
1933        // cleanup
1934        coordinator.stop().await;
1935    }
1936
1937    // ============================================================================
1938    // State Carryover (ID Allocation) Tests
1939    // ============================================================================
1940
1941    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1942    async fn should_preserve_context_across_flushes() {
1943        // given
1944        let flusher = TestFlusher::default();
1945        let mut coordinator = WriteCoordinator::new(
1946            test_config(),
1947            vec!["default".to_string()],
1948            TestContext::default(),
1949            flusher.initial_snapshot().await,
1950            flusher.clone(),
1951        );
1952        let handle = coordinator.handle("default");
1953        coordinator.start();
1954
1955        // when - write key "a" in first batch (seq 0)
1956        let mut write1 = handle
1957            .write(TestWrite {
1958                key: "a".into(),
1959                value: 1,
1960                size: 10,
1961            })
1962            .await
1963            .unwrap();
1964        handle.flush(false).await.unwrap();
1965        write1.wait(Durability::Flushed).await.unwrap();
1966
1967        // Write to key "a" again in second batch (seq 1)
1968        let mut write2 = handle
1969            .write(TestWrite {
1970                key: "a".into(),
1971                value: 2,
1972                size: 10,
1973            })
1974            .await
1975            .unwrap();
1976        handle.flush(false).await.unwrap();
1977        write2.wait(Durability::Flushed).await.unwrap();
1978
1979        // then
1980        let events = flusher.flushed_events();
1981        assert_eq!(events.len(), 2);
1982
1983        // Batch 1: "a" with seq 0
1984        let (seq1, _) = events[0].val.writes.get("a").unwrap();
1985        assert_eq!(*seq1, 0);
1986
1987        // Batch 2: "a" with seq 1 (sequence continues)
1988        let (seq2, _) = events[1].val.writes.get("a").unwrap();
1989        assert_eq!(*seq2, 1);
1990
1991        // cleanup
1992        coordinator.stop().await;
1993    }
1994
1995    // ============================================================================
1996    // Subscribe Tests
1997    // ============================================================================
1998
1999    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2000    async fn should_receive_view_on_subscribe() {
2001        // given
2002        let flusher = TestFlusher::default();
2003        let mut coordinator = WriteCoordinator::new(
2004            test_config(),
2005            vec!["default".to_string()],
2006            TestContext::default(),
2007            flusher.initial_snapshot().await,
2008            flusher.clone(),
2009        );
2010        let handle = coordinator.handle("default");
2011        let (mut subscriber, _) = coordinator.subscribe();
2012        coordinator.start();
2013
2014        // when
2015        handle
2016            .write(TestWrite {
2017                key: "a".into(),
2018                value: 1,
2019                size: 10,
2020            })
2021            .await
2022            .unwrap();
2023        handle.flush(false).await.unwrap();
2024
2025        // then - first broadcast is on freeze (delta added to frozen)
2026        let result = subscriber.recv().await;
2027        assert!(result.is_ok());
2028
2029        // cleanup
2030        coordinator.stop().await;
2031    }
2032
2033    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2034    async fn should_include_snapshot_in_view_after_flush() {
2035        // given
2036        let flusher = TestFlusher::default();
2037        let mut coordinator = WriteCoordinator::new(
2038            test_config(),
2039            vec!["default".to_string()],
2040            TestContext::default(),
2041            flusher.initial_snapshot().await,
2042            flusher.clone(),
2043        );
2044        let handle = coordinator.handle("default");
2045        let (mut subscriber, _) = coordinator.subscribe();
2046        coordinator.start();
2047
2048        // when
2049        handle
2050            .write(TestWrite {
2051                key: "a".into(),
2052                value: 1,
2053                size: 10,
2054            })
2055            .await
2056            .unwrap();
2057        handle.flush(false).await.unwrap();
2058
2059        // First broadcast: freeze (frozen delta added)
2060        let _ = subscriber.recv().await.unwrap();
2061        // Second broadcast: flush complete (snapshot updated)
2062        let result = subscriber.recv().await.unwrap();
2063
2064        // then - snapshot should contain the flushed data
2065        assert_snapshot_has_rows(&result.snapshot, &[("a", 0, 1)]).await;
2066
2067        // cleanup
2068        coordinator.stop().await;
2069    }
2070
2071    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2072    async fn should_include_delta_in_view_after_flush() {
2073        // given
2074        let flusher = TestFlusher::default();
2075        let mut coordinator = WriteCoordinator::new(
2076            test_config(),
2077            vec!["default".to_string()],
2078            TestContext::default(),
2079            flusher.initial_snapshot().await,
2080            flusher.clone(),
2081        );
2082        let handle = coordinator.handle("default");
2083        let (mut subscriber, _) = coordinator.subscribe();
2084        coordinator.start();
2085
2086        // when
2087        handle
2088            .write(TestWrite {
2089                key: "a".into(),
2090                value: 42,
2091                size: 10,
2092            })
2093            .await
2094            .unwrap();
2095        handle.flush(false).await.unwrap();
2096
2097        // First broadcast: freeze
2098        let _ = subscriber.recv().await.unwrap();
2099        // Second broadcast: flush complete
2100        let result = subscriber.recv().await.unwrap();
2101
2102        // then - last_flushed_delta should contain the write we made
2103        let flushed = result.last_flushed_delta.as_ref().unwrap();
2104        assert_eq!(flushed.val.get("a"), Some(&42));
2105
2106        // cleanup
2107        coordinator.stop().await;
2108    }
2109
2110    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2111    async fn should_include_epoch_range_in_view_after_flush() {
2112        // given
2113        let flusher = TestFlusher::default();
2114        let mut coordinator = WriteCoordinator::new(
2115            test_config(),
2116            vec!["default".to_string()],
2117            TestContext::default(),
2118            flusher.initial_snapshot().await,
2119            flusher.clone(),
2120        );
2121        let handle = coordinator.handle("default");
2122        let (mut subscriber, _) = coordinator.subscribe();
2123        coordinator.start();
2124
2125        // when
2126        let write1 = handle
2127            .write(TestWrite {
2128                key: "a".into(),
2129                value: 1,
2130                size: 10,
2131            })
2132            .await
2133            .unwrap();
2134        let write2 = handle
2135            .write(TestWrite {
2136                key: "b".into(),
2137                value: 2,
2138                size: 10,
2139            })
2140            .await
2141            .unwrap();
2142        handle.flush(false).await.unwrap();
2143
2144        // First broadcast: freeze
2145        let _ = subscriber.recv().await.unwrap();
2146        // Second broadcast: flush complete
2147        let result = subscriber.recv().await.unwrap();
2148
2149        // then - epoch range from last_flushed_delta should contain the epochs
2150        let flushed = result.last_flushed_delta.as_ref().unwrap();
2151        let epoch1 = write1.epoch().await.unwrap();
2152        let epoch2 = write2.epoch().await.unwrap();
2153        assert!(flushed.epoch_range.contains(&epoch1));
2154        assert!(flushed.epoch_range.contains(&epoch2));
2155        assert_eq!(flushed.epoch_range.start, epoch1);
2156        assert_eq!(flushed.epoch_range.end, epoch2 + 1);
2157
2158        // cleanup
2159        coordinator.stop().await;
2160    }
2161
2162    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2163    async fn should_broadcast_frozen_delta_on_freeze() {
2164        // given
2165        let flusher = TestFlusher::default();
2166        let mut coordinator = WriteCoordinator::new(
2167            test_config(),
2168            vec!["default".to_string()],
2169            TestContext::default(),
2170            flusher.initial_snapshot().await,
2171            flusher.clone(),
2172        );
2173        let handle = coordinator.handle("default");
2174        let (mut subscriber, _) = coordinator.subscribe();
2175        coordinator.start();
2176
2177        // when
2178        handle
2179            .write(TestWrite {
2180                key: "a".into(),
2181                value: 1,
2182                size: 10,
2183            })
2184            .await
2185            .unwrap();
2186        handle.flush(false).await.unwrap();
2187
2188        // then - first broadcast should have the frozen delta in the frozen vec
2189        let state = subscriber.recv().await.unwrap();
2190        assert_eq!(state.frozen.len(), 1);
2191        assert!(state.frozen[0].val.contains_key("a"));
2192
2193        // cleanup
2194        coordinator.stop().await;
2195    }
2196
2197    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2198    async fn should_remove_frozen_delta_after_flush_complete() {
2199        // given
2200        let flusher = TestFlusher::default();
2201        let mut coordinator = WriteCoordinator::new(
2202            test_config(),
2203            vec!["default".to_string()],
2204            TestContext::default(),
2205            flusher.initial_snapshot().await,
2206            flusher.clone(),
2207        );
2208        let handle = coordinator.handle("default");
2209        let (mut subscriber, _) = coordinator.subscribe();
2210        coordinator.start();
2211
2212        // when
2213        handle
2214            .write(TestWrite {
2215                key: "a".into(),
2216                value: 1,
2217                size: 10,
2218            })
2219            .await
2220            .unwrap();
2221        handle.flush(false).await.unwrap();
2222
2223        // First broadcast: freeze (frozen has 1 entry)
2224        let state1 = subscriber.recv().await.unwrap();
2225        assert_eq!(state1.frozen.len(), 1);
2226
2227        // Second broadcast: flush complete (frozen is empty, last_flushed_delta set)
2228        let state2 = subscriber.recv().await.unwrap();
2229        assert_eq!(state2.frozen.len(), 0);
2230        assert!(state2.last_flushed_delta.is_some());
2231
2232        // cleanup
2233        coordinator.stop().await;
2234    }
2235
2236    // ============================================================================
2237    // Durable Flush Tests
2238    // ============================================================================
2239
2240    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2241    async fn should_flush_even_when_no_writes_if_flush_storage() {
2242        // given
2243        let flusher = TestFlusher::default();
2244        let storage = Arc::new(InMemoryStorage::new());
2245        let snapshot = storage.snapshot().await.unwrap();
2246        let mut coordinator = WriteCoordinator::new(
2247            test_config(),
2248            vec!["default".to_string()],
2249            TestContext::default(),
2250            snapshot,
2251            flusher.clone(),
2252        );
2253        let handle = coordinator.handle("default");
2254        coordinator.start();
2255
2256        // when - flush with flush_storage but no pending writes
2257        let mut flush_handle = handle.flush(true).await.unwrap();
2258        flush_handle.wait(Durability::Durable).await.unwrap();
2259
2260        // then - flusher was called (durable event sent) but no delta was recorded
2261        // (TestFlusher only records events with deltas)
2262        assert_eq!(flusher.flushed_events().len(), 0);
2263
2264        // cleanup
2265        coordinator.stop().await;
2266    }
2267
2268    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2269    async fn should_advance_durable_watermark() {
2270        // given
2271        let flusher = TestFlusher::default();
2272        let storage = Arc::new(InMemoryStorage::new());
2273        let snapshot = storage.snapshot().await.unwrap();
2274        let mut coordinator = WriteCoordinator::new(
2275            test_config(),
2276            vec!["default".to_string()],
2277            TestContext::default(),
2278            snapshot,
2279            flusher.clone(),
2280        );
2281        let handle = coordinator.handle("default");
2282        coordinator.start();
2283
2284        // when - write and flush with durable
2285        let mut write = handle
2286            .write(TestWrite {
2287                key: "a".into(),
2288                value: 1,
2289                size: 10,
2290            })
2291            .await
2292            .unwrap();
2293        let mut flush_handle = handle.flush(true).await.unwrap();
2294
2295        // then - can wait for Durable durability level
2296        flush_handle.wait(Durability::Durable).await.unwrap();
2297        write.wait(Durability::Durable).await.unwrap();
2298        assert_eq!(flusher.flushed_events().len(), 1);
2299
2300        // cleanup
2301        coordinator.stop().await;
2302    }
2303
2304    #[tokio::test]
2305    async fn should_see_applied_write_via_view() {
2306        // given
2307        let flusher = TestFlusher::default();
2308        let mut coordinator = WriteCoordinator::new(
2309            test_config(),
2310            vec!["default".to_string()],
2311            TestContext::default(),
2312            flusher.initial_snapshot().await,
2313            flusher,
2314        );
2315        let handle = coordinator.handle("default");
2316        coordinator.start();
2317
2318        // when
2319        let mut write = handle
2320            .write(TestWrite {
2321                key: "a".into(),
2322                value: 42,
2323                size: 10,
2324            })
2325            .await
2326            .unwrap();
2327        write.wait(Durability::Applied).await.unwrap();
2328
2329        // then
2330        let view = coordinator.view();
2331        assert_eq!(view.current.get("a"), Some(42));
2332
2333        // cleanup
2334        coordinator.stop().await;
2335    }
2336
2337    // ============================================================================
2338    // Multi-Channel Tests
2339    // ============================================================================
2340
2341    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2342    async fn should_flush_writes_from_multiple_channels() {
2343        // given
2344        let flusher = TestFlusher::default();
2345        let mut coordinator = WriteCoordinator::new(
2346            test_config(),
2347            vec!["ch1".to_string(), "ch2".to_string()],
2348            TestContext::default(),
2349            flusher.initial_snapshot().await,
2350            flusher.clone(),
2351        );
2352        let ch1 = coordinator.handle("ch1");
2353        let ch2 = coordinator.handle("ch2");
2354        coordinator.start();
2355
2356        // when - write to both channels, waiting for each to be applied
2357        // to ensure deterministic ordering
2358        let mut w1 = ch1
2359            .write(TestWrite {
2360                key: "a".into(),
2361                value: 10,
2362                size: 10,
2363            })
2364            .await
2365            .unwrap();
2366        w1.wait(Durability::Applied).await.unwrap();
2367
2368        let mut w2 = ch2
2369            .write(TestWrite {
2370                key: "b".into(),
2371                value: 20,
2372                size: 10,
2373            })
2374            .await
2375            .unwrap();
2376        w2.wait(Durability::Applied).await.unwrap();
2377
2378        let mut w3 = ch1
2379            .write(TestWrite {
2380                key: "c".into(),
2381                value: 30,
2382                size: 10,
2383            })
2384            .await
2385            .unwrap();
2386        w3.wait(Durability::Applied).await.unwrap();
2387
2388        ch1.flush(false).await.unwrap();
2389        w3.wait(Durability::Flushed).await.unwrap();
2390
2391        // then - snapshot should contain writes from both channels
2392        let snapshot = flusher.storage.snapshot().await.unwrap();
2393        assert_snapshot_has_rows(&snapshot, &[("a", 0, 10), ("b", 1, 20), ("c", 2, 30)]).await;
2394
2395        // cleanup
2396        coordinator.stop().await;
2397    }
2398}