// common/coordinator/mod.rs

1#![allow(unused)]
2
3mod error;
4mod handle;
5pub mod subscriber;
6mod traits;
7
8use std::collections::HashMap;
9use std::ops::Range;
10use std::ops::{Deref, DerefMut};
11
12pub use error::{WriteError, WriteResult};
13use futures::stream::{self, SelectAll, StreamExt};
14pub use handle::{View, WriteCoordinatorHandle, WriteHandle};
15pub use subscriber::{SubscribeError, ViewMonitor, ViewSubscriber};
16pub use traits::{Delta, Durability, EpochStamped, Flusher};
17
/// Event sent from the write coordinator task to the flush task.
///
/// Events are delivered over a bounded channel and the flush task handles
/// them one at a time.
enum FlushEvent<D: Delta> {
    /// Flush a frozen delta to storage.
    ///
    /// `frozen` carries the frozen delta together with the range of write
    /// epochs it covers.
    FlushDelta { frozen: EpochStamped<D::Frozen> },
    /// Ensure storage durability (e.g. call storage.flush()).
    ///
    /// Once handled, the durable watermark is advanced to the last epoch whose
    /// delta has been flushed.
    FlushStorage,
}
25
26// Internal use only
27use crate::StorageRead;
28use crate::storage::StorageSnapshot;
29use async_trait::async_trait;
30pub use handle::EpochWatcher;
31use std::sync::{Arc, Mutex};
32use std::time::Duration;
33use tokio::sync::{broadcast, mpsc, oneshot, watch};
34use tokio::time::{Instant, Interval, interval_at};
35use tokio_util::sync::CancellationToken;
36
/// Tunables for the write coordinator.
#[derive(Debug, Clone)]
pub struct WriteCoordinatorConfig {
    /// Capacity of each named write queue, i.e. how many commands may be
    /// pending on a channel before senders have to wait.
    pub queue_capacity: usize,
    /// Period of the timer that triggers automatic flushes.
    pub flush_interval: Duration,
    /// Estimated delta size at or above which a flush is triggered right
    /// after a write is applied.
    pub flush_size_threshold: usize,
}

impl Default for WriteCoordinatorConfig {
    /// Defaults: 10 000 queued commands per channel, a flush every 10 seconds,
    /// and a size-triggered flush at 64 MiB.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;

        WriteCoordinatorConfig {
            queue_capacity: 10_000,
            flush_interval: Duration::from_secs(10),
            flush_size_threshold: 64 * MIB,
        }
    }
}
57
/// Command received by the write coordinator task over a write channel.
pub(crate) enum WriteCommand<D: Delta> {
    /// Apply `write` to the current delta.
    Write {
        /// The write to apply.
        write: D::Write,
        /// Receives the outcome of the write, tagged with the epoch that was
        /// assigned to it. The coordinator ignores a failed send, so the
        /// receiving end may be dropped for fire-and-forget writes.
        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
    },
    /// Flush the current delta if it contains any writes.
    Flush {
        /// Receives the epoch of the last write processed before this command
        /// was handled; it is sent before the flush itself is started.
        epoch_tx: oneshot::Sender<handle::EpochResult<()>>,
        /// When `true`, a storage-durability flush is also requested after
        /// the delta flush.
        flush_storage: bool,
    },
}
68
69/// The write coordinator manages write ordering, batching, and durability.
70///
71/// It accepts writes through `WriteCoordinatorHandle`, applies them to a `Delta`,
72/// and coordinates flushing through a `Flusher`.
73pub struct WriteCoordinator<D: Delta, F: Flusher<D>> {
74    handles: HashMap<String, WriteCoordinatorHandle<D>>,
75    pause_handles: HashMap<String, PauseHandle>,
76    stop_tok: CancellationToken,
77    tasks: Option<(WriteCoordinatorTask<D>, FlushTask<D, F>)>,
78    write_task_jh: Option<tokio::task::JoinHandle<Result<(), String>>>,
79    view: Arc<BroadcastedView<D>>,
80}
81
82impl<D: Delta, F: Flusher<D>> WriteCoordinator<D, F> {
83    pub fn new(
84        config: WriteCoordinatorConfig,
85        channels: Vec<impl ToString>,
86        initial_context: D::Context,
87        initial_snapshot: Arc<dyn StorageSnapshot>,
88        flusher: F,
89    ) -> WriteCoordinator<D, F> {
90        let (watermarks, watcher) = EpochWatermarks::new();
91        let watermarks = Arc::new(watermarks);
92
93        // Create a write channel per named input
94        let mut write_rxs = Vec::with_capacity(channels.len());
95        let mut write_txs = HashMap::new();
96        let mut pause_handles = HashMap::new();
97        for name in &channels {
98            let (write_tx, write_rx, pause_hdl) = pausable_channel(config.queue_capacity);
99            write_rxs.push(write_rx);
100            write_txs.insert(name.to_string(), write_tx);
101            pause_handles.insert(name.to_string(), pause_hdl);
102        }
103
104        // this is the channel that sends FlushEvents to be flushed
105        // by a background task so that the process of converting deltas
106        // to storage operations is non-blocking. for now, we apply no
107        // backpressure on this channel, so writes will block if more than
108        // one flush is pending
109        let (flush_tx, flush_rx) = mpsc::channel(2);
110
111        let flush_stop_tok = CancellationToken::new();
112        let stop_tok = CancellationToken::new();
113        let write_task = WriteCoordinatorTask::new(
114            config,
115            initial_context,
116            initial_snapshot,
117            write_rxs,
118            flush_tx,
119            watermarks.clone(),
120            stop_tok.clone(),
121            flush_stop_tok.clone(),
122        );
123
124        let view = write_task.view.clone();
125
126        let mut handles = HashMap::new();
127        for name in channels {
128            let write_tx = write_txs.remove(&name.to_string()).expect("unreachable");
129            handles.insert(
130                name.to_string(),
131                WriteCoordinatorHandle::new(write_tx, watcher.clone(), view.clone()),
132            );
133        }
134
135        let flush_task = FlushTask {
136            flusher,
137            stop_tok: flush_stop_tok,
138            flush_rx,
139            watermarks: watermarks.clone(),
140            view: view.clone(),
141            last_flushed_epoch: 0,
142        };
143
144        Self {
145            handles,
146            pause_handles,
147            tasks: Some((write_task, flush_task)),
148            write_task_jh: None,
149            stop_tok,
150            view,
151        }
152    }
153
154    pub fn handle(&self, name: &str) -> WriteCoordinatorHandle<D> {
155        self.handles
156            .get(name)
157            .expect("unknown channel name")
158            .clone()
159    }
160
161    pub fn pause_handle(&self, name: &str) -> PauseHandle {
162        self.pause_handles
163            .get(name)
164            .expect("unknown channel name")
165            .clone()
166    }
167
168    pub fn start(&mut self) {
169        let Some((write_task, flush_task)) = self.tasks.take() else {
170            // already started
171            return;
172        };
173        let flush_task_jh = flush_task.run();
174        let write_task_jh = write_task.run(flush_task_jh);
175        self.write_task_jh = Some(write_task_jh);
176    }
177
178    pub async fn stop(mut self) -> Result<(), String> {
179        let Some(write_task_jh) = self.write_task_jh.take() else {
180            return Ok(());
181        };
182        self.stop_tok.cancel();
183        write_task_jh.await.map_err(|e| e.to_string())?
184    }
185
186    pub fn view(&self) -> Arc<View<D>> {
187        self.view.current()
188    }
189
190    pub fn subscribe(&self) -> (ViewSubscriber<D>, ViewMonitor) {
191        let (view_rx, initial_view) = self.view.subscribe();
192        ViewSubscriber::new(view_rx, initial_view)
193    }
194}
195
196struct WriteCoordinatorTask<D: Delta> {
197    config: WriteCoordinatorConfig,
198    delta: CurrentDelta<D>,
199    flush_tx: mpsc::Sender<FlushEvent<D>>,
200    write_rxs: Vec<PausableReceiver<D>>,
201    watermarks: Arc<EpochWatermarks>,
202    view: Arc<BroadcastedView<D>>,
203    epoch: u64,
204    delta_start_epoch: u64,
205    flush_interval: Interval,
206    stop_tok: CancellationToken,
207    flush_stop_tok: CancellationToken,
208}
209
210impl<D: Delta> WriteCoordinatorTask<D> {
211    /// Create a new write coordinator with the given flusher.
212    ///
213    /// This is useful for testing with mock flushers.
214    #[allow(clippy::too_many_arguments)]
215    pub fn new(
216        config: WriteCoordinatorConfig,
217        initial_context: D::Context,
218        initial_snapshot: Arc<dyn StorageSnapshot>,
219        write_rxs: Vec<PausableReceiver<D>>,
220        flush_tx: mpsc::Sender<FlushEvent<D>>,
221        watermarks: Arc<EpochWatermarks>,
222        stop_tok: CancellationToken,
223        flush_stop_tok: CancellationToken,
224    ) -> Self {
225        let delta = D::init(initial_context);
226
227        let initial_view = View {
228            current: delta.reader(),
229            frozen: vec![],
230            snapshot: initial_snapshot,
231            last_written_delta: None,
232        };
233        let initial_view = Arc::new(BroadcastedView::new(initial_view));
234
235        let flush_interval = interval_at(
236            Instant::now() + config.flush_interval,
237            config.flush_interval,
238        );
239        Self {
240            config,
241            delta: CurrentDelta::new(delta),
242            write_rxs,
243            flush_tx,
244            watermarks,
245            view: initial_view,
246            // Epochs start at 1 because watch channels initialize to 0 (meaning "nothing
247            // processed yet"). If the first write had epoch 0, wait() would return
248            // immediately since the condition `watermark < epoch` would be `0 < 0` = false.
249            epoch: 1,
250            delta_start_epoch: 1,
251            flush_interval,
252            stop_tok,
253            flush_stop_tok,
254        }
255    }
256
257    /// Run the coordinator event loop.
258    pub fn run(
259        mut self,
260        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
261    ) -> tokio::task::JoinHandle<Result<(), String>> {
262        tokio::task::spawn(async move { self.run_coordinator(flush_task_jh).await })
263    }
264
265    async fn run_coordinator(
266        mut self,
267        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
268    ) -> Result<(), String> {
269        // Reset the interval to start fresh from when run() is called
270        self.flush_interval.reset();
271
272        // Merge all write receivers into a single stream
273        let mut write_stream: SelectAll<_> = SelectAll::new();
274        for rx in self.write_rxs.drain(..) {
275            write_stream.push(
276                stream::unfold(
277                    rx,
278                    |mut rx| async move { rx.recv().await.map(|cmd| (cmd, rx)) },
279                )
280                .boxed(),
281            );
282        }
283
284        loop {
285            tokio::select! {
286                cmd = write_stream.next() => {
287                    match cmd {
288                        Some(WriteCommand::Write { write, result_tx }) => {
289                            self.handle_write(write, result_tx).await?;
290                        }
291                        Some(WriteCommand::Flush { epoch_tx, flush_storage }) => {
292                            // Send back the epoch of the last processed write
293                            let _ = epoch_tx.send(Ok(handle::WriteApplied {
294                                epoch: self.epoch.saturating_sub(1),
295                                result: (),
296                            }));
297                            self.handle_flush(flush_storage).await;
298                        }
299                        None => {
300                            // All write channels closed
301                            break;
302                        }
303                    }
304                }
305
306                _ = self.flush_interval.tick() => {
307                    self.handle_flush(false).await;
308                }
309
310                _ = self.stop_tok.cancelled() => {
311                    break;
312                }
313            }
314        }
315
316        // Flush any remaining pending writes before shutdown
317        self.handle_flush(false).await;
318
319        // Signal the flush task to stop
320        self.flush_stop_tok.cancel();
321        // Wait for the flush task to complete and propagate any errors
322        flush_task_jh
323            .await
324            .map_err(|e| format!("flush task panicked: {}", e))?
325            .map_err(|e| format!("flush task error: {}", e))
326    }
327
328    async fn handle_write(
329        &mut self,
330        write: D::Write,
331        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
332    ) -> Result<(), String> {
333        let write_epoch = self.epoch;
334        self.epoch += 1;
335
336        let result = self.delta.apply(write);
337        // Ignore error if receiver was dropped (fire-and-forget write)
338        let _ = result_tx.send(
339            result
340                .map(|apply_result| handle::WriteApplied {
341                    epoch: write_epoch,
342                    result: apply_result,
343                })
344                .map_err(|e| handle::WriteFailed {
345                    epoch: write_epoch,
346                    error: e,
347                }),
348        );
349
350        // Ignore error if no watchers are listening - this is non-fatal
351        self.watermarks.update_applied(write_epoch);
352
353        if self.delta.estimate_size() >= self.config.flush_size_threshold {
354            self.handle_flush(false).await;
355        }
356
357        Ok(())
358    }
359
360    async fn handle_flush(&mut self, flush_storage: bool) {
361        self.flush_if_delta_has_writes().await;
362        if flush_storage {
363            let _ = self.flush_tx.send(FlushEvent::FlushStorage).await;
364        }
365    }
366
367    async fn flush_if_delta_has_writes(&mut self) {
368        if self.epoch == self.delta_start_epoch {
369            return;
370        }
371
372        self.flush_interval.reset();
373
374        let epoch_range = self.delta_start_epoch..self.epoch;
375        self.delta_start_epoch = self.epoch;
376        let (frozen, frozen_reader) = self.delta.freeze_and_init();
377        let stamped_frozen = EpochStamped::new(frozen, epoch_range.clone());
378        let stamped_frozen_reader = EpochStamped::new(frozen_reader, epoch_range.clone());
379        let reader = self.delta.reader();
380        // update the view before sending the flush msg to ensure the flusher sees
381        // the frozen reader when updating the view post-flush
382        self.view.update_delta_frozen(stamped_frozen_reader, reader);
383        // this is the blocking section of the flush, new writes will not be accepted
384        // until the event is sent to the FlushTask
385        let _ = self
386            .flush_tx
387            .send(FlushEvent::FlushDelta {
388                frozen: stamped_frozen,
389            })
390            .await;
391    }
392}
393
394struct FlushTask<D: Delta, F: Flusher<D>> {
395    flusher: F,
396    stop_tok: CancellationToken,
397    flush_rx: mpsc::Receiver<FlushEvent<D>>,
398    watermarks: Arc<EpochWatermarks>,
399    view: Arc<BroadcastedView<D>>,
400    last_flushed_epoch: u64,
401}
402
403impl<D: Delta, F: Flusher<D>> FlushTask<D, F> {
404    fn run(mut self) -> tokio::task::JoinHandle<WriteResult<()>> {
405        tokio::spawn(async move {
406            loop {
407                tokio::select! {
408                    event = self.flush_rx.recv() => {
409                        let Some(event) = event else {
410                            break;
411                        };
412                        self.handle_event(event).await?;
413                    }
414                    _ = self.stop_tok.cancelled() => {
415                        break;
416                    }
417                }
418            }
419            // drain all remaining flush events
420            while let Ok(event) = self.flush_rx.try_recv() {
421                self.handle_event(event).await;
422            }
423            Ok(())
424        })
425    }
426
427    async fn handle_event(&mut self, event: FlushEvent<D>) -> WriteResult<()> {
428        match event {
429            FlushEvent::FlushDelta { frozen } => self.handle_flush(frozen).await,
430            FlushEvent::FlushStorage => {
431                self.flusher
432                    .flush_storage()
433                    .await
434                    .map_err(|e| WriteError::FlushError(e.to_string()))?;
435                self.watermarks.update_durable(self.last_flushed_epoch);
436                Ok(())
437            }
438        }
439    }
440
441    async fn handle_flush(&mut self, frozen: EpochStamped<D::Frozen>) -> WriteResult<()> {
442        let delta = frozen.val;
443        let epoch_range = frozen.epoch_range;
444        let snapshot = self
445            .flusher
446            .flush_delta(delta, &epoch_range)
447            .await
448            .map_err(|e| WriteError::FlushError(e.to_string()))?;
449        self.last_flushed_epoch = epoch_range.end - 1;
450        self.watermarks.update_written(self.last_flushed_epoch);
451        self.view.update_flush_finished(snapshot, epoch_range);
452        Ok(())
453    }
454}
455
456struct CurrentDelta<D: Delta> {
457    delta: Option<D>,
458}
459
460impl<D: Delta> Deref for CurrentDelta<D> {
461    type Target = D;
462
463    fn deref(&self) -> &Self::Target {
464        match &self.delta {
465            Some(d) => d,
466            None => panic!("current delta not initialized"),
467        }
468    }
469}
470
471impl<D: Delta> DerefMut for CurrentDelta<D> {
472    fn deref_mut(&mut self) -> &mut Self::Target {
473        match &mut self.delta {
474            Some(d) => d,
475            None => panic!("current delta not initialized"),
476        }
477    }
478}
479
480impl<D: Delta> CurrentDelta<D> {
481    fn new(delta: D) -> Self {
482        Self { delta: Some(delta) }
483    }
484
485    fn freeze_and_init(&mut self) -> (D::Frozen, D::FrozenView) {
486        let Some(delta) = self.delta.take() else {
487            panic!("delta not initialized");
488        };
489        let (frozen, frozen_reader, context) = delta.freeze();
490        let new_delta = D::init(context);
491        self.delta = Some(new_delta);
492        (frozen, frozen_reader)
493    }
494}
495
496pub struct EpochWatermarks {
497    applied_tx: tokio::sync::watch::Sender<u64>,
498    written_tx: tokio::sync::watch::Sender<u64>,
499    durable_tx: tokio::sync::watch::Sender<u64>,
500}
501
502impl EpochWatermarks {
503    pub fn new() -> (Self, EpochWatcher) {
504        let (applied_tx, applied_rx) = tokio::sync::watch::channel(0);
505        let (written_tx, written_rx) = tokio::sync::watch::channel(0);
506        let (durable_tx, durable_rx) = tokio::sync::watch::channel(0);
507        let watcher = EpochWatcher {
508            applied_rx,
509            written_rx,
510            durable_rx,
511        };
512        let watermarks = EpochWatermarks {
513            applied_tx,
514            written_tx,
515            durable_tx,
516        };
517        (watermarks, watcher)
518    }
519
520    pub fn update_applied(&self, epoch: u64) {
521        let _ = self.applied_tx.send(epoch);
522    }
523
524    pub fn update_written(&self, epoch: u64) {
525        let _ = self.written_tx.send(epoch);
526    }
527
528    pub fn update_durable(&self, epoch: u64) {
529        let _ = self.durable_tx.send(epoch);
530    }
531}
532
533pub(crate) struct BroadcastedView<D: Delta> {
534    inner: Mutex<BroadcastedViewInner<D>>,
535}
536
537impl<D: Delta> BroadcastedView<D> {
538    fn new(initial_view: View<D>) -> Self {
539        let (view_tx, _) = broadcast::channel(16);
540        Self {
541            inner: Mutex::new(BroadcastedViewInner {
542                view: Arc::new(initial_view),
543                view_tx,
544            }),
545        }
546    }
547
548    fn update_flush_finished(&self, snapshot: Arc<dyn StorageSnapshot>, epoch_range: Range<u64>) {
549        self.inner
550            .lock()
551            .expect("lock poisoned")
552            .update_flush_finished(snapshot, epoch_range);
553    }
554
555    fn update_delta_frozen(&self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
556        self.inner
557            .lock()
558            .expect("lock poisoned")
559            .update_delta_frozen(frozen, reader);
560    }
561
562    fn current(&self) -> Arc<View<D>> {
563        self.inner.lock().expect("lock poisoned").current()
564    }
565
566    fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
567        self.inner.lock().expect("lock poisoned").subscribe()
568    }
569}
570
571struct BroadcastedViewInner<D: Delta> {
572    view: Arc<View<D>>,
573    view_tx: tokio::sync::broadcast::Sender<Arc<View<D>>>,
574}
575
576impl<D: Delta> BroadcastedViewInner<D> {
577    fn update_flush_finished(
578        &mut self,
579        snapshot: Arc<dyn StorageSnapshot>,
580        epoch_range: Range<u64>,
581    ) {
582        let mut new_frozen = self.view.frozen.clone();
583        let last = new_frozen
584            .pop()
585            .expect("frozen should not be empty when flush completes");
586        assert_eq!(last.epoch_range, epoch_range);
587        self.view = Arc::new(View {
588            current: self.view.current.clone(),
589            frozen: new_frozen,
590            snapshot,
591            last_written_delta: Some(last),
592        });
593        self.view_tx.send(self.view.clone());
594    }
595
596    fn update_delta_frozen(&mut self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
597        // Update read state: add frozen delta to front, update current reader
598        let mut new_frozen = vec![frozen];
599        new_frozen.extend(self.view.frozen.iter().cloned());
600        self.view = Arc::new(View {
601            current: reader,
602            frozen: new_frozen,
603            snapshot: self.view.snapshot.clone(),
604            last_written_delta: self.view.last_written_delta.clone(),
605        });
606        self.view_tx.send(self.view.clone());
607    }
608
609    fn current(&self) -> Arc<View<D>> {
610        self.view.clone()
611    }
612
613    fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
614        (self.view_tx.subscribe(), self.view.clone())
615    }
616}
617
618struct PausableReceiver<D: Delta> {
619    pause_rx: Option<watch::Receiver<bool>>,
620    rx: mpsc::Receiver<WriteCommand<D>>,
621}
622
623impl<D: Delta> PausableReceiver<D> {
624    async fn recv(&mut self) -> Option<WriteCommand<D>> {
625        if let Some(pause_rx) = self.pause_rx.as_mut() {
626            pause_rx.wait_for(|v| !*v).await;
627        }
628        self.rx.recv().await
629    }
630}
631
632/// A handle that can be used to pause/unpause the coordinator's consumption from a write queue
633#[derive(Clone)]
634pub struct PauseHandle {
635    pause_tx: tokio::sync::watch::Sender<bool>,
636}
637
638impl PauseHandle {
639    pub fn pause(&self) {
640        self.pause_tx.send_replace(true);
641    }
642
643    pub fn unpause(&self) {
644        self.pause_tx.send_replace(false);
645    }
646}
647
648fn pausable_channel<D: Delta>(
649    capacity: usize,
650) -> (
651    mpsc::Sender<WriteCommand<D>>,
652    PausableReceiver<D>,
653    PauseHandle,
654) {
655    let (pause_tx, pause_rx) = watch::channel(false);
656    let (tx, rx) = mpsc::channel(capacity);
657    (
658        tx,
659        PausableReceiver {
660            pause_rx: Some(pause_rx),
661            rx,
662        },
663        PauseHandle { pause_tx },
664    )
665}
666
667#[cfg(test)]
668mod tests {
669    use super::*;
670    use crate::BytesRange;
671    use crate::coordinator::Durability;
672    use crate::storage::in_memory::{InMemoryStorage, InMemoryStorageSnapshot};
673    use crate::storage::{PutRecordOp, Record, StorageSnapshot};
674    use crate::{Storage, StorageRead};
675    use async_trait::async_trait;
676    use bytes::Bytes;
677    use std::collections::{HashMap, HashSet};
678    use std::ops::Range;
679    use std::sync::Mutex;
680    // ============================================================================
681    // Test Infrastructure
682    // ============================================================================
683
684    #[derive(Clone, Debug)]
685    struct TestWrite {
686        key: String,
687        value: u64,
688        size: usize,
689    }
690
691    /// Context carries state that must persist across deltas (like sequence allocation)
692    #[derive(Clone, Debug, Default)]
693    struct TestContext {
694        next_seq: u64,
695        error: Option<String>,
696    }
697
698    /// A shared reader that sees writes as they are applied to the delta.
699    #[derive(Clone, Debug, Default)]
700    struct TestDeltaReader {
701        data: Arc<Mutex<HashMap<String, u64>>>,
702    }
703
704    impl TestDeltaReader {
705        fn get(&self, key: &str) -> Option<u64> {
706            self.data.lock().unwrap().get(key).copied()
707        }
708    }
709
710    /// Delta accumulates writes with sequence numbers.
711    /// Stores the context directly and updates it in place.
712    #[derive(Debug)]
713    struct TestDelta {
714        context: TestContext,
715        writes: HashMap<String, (u64, u64)>,
716        key_values: Arc<Mutex<HashMap<String, u64>>>,
717        total_size: usize,
718    }
719
720    #[derive(Clone, Debug)]
721    struct FrozenTestDelta {
722        writes: HashMap<String, (u64, u64)>,
723    }
724
725    impl Delta for TestDelta {
726        type Context = TestContext;
727        type Write = TestWrite;
728        type DeltaView = TestDeltaReader;
729        type Frozen = FrozenTestDelta;
730        type FrozenView = Arc<HashMap<String, u64>>;
731        type ApplyResult = ();
732
733        fn init(context: Self::Context) -> Self {
734            Self {
735                context,
736                writes: HashMap::default(),
737                key_values: Arc::new(Mutex::new(HashMap::default())),
738                total_size: 0,
739            }
740        }
741
742        fn apply(&mut self, write: Self::Write) -> Result<(), String> {
743            if let Some(error) = &self.context.error {
744                return Err(error.clone());
745            }
746
747            let seq = self.context.next_seq;
748            self.context.next_seq += 1;
749
750            self.writes.insert(write.key.clone(), (seq, write.value));
751            self.total_size += write.size;
752            self.key_values
753                .lock()
754                .unwrap()
755                .insert(write.key, write.value);
756            Ok(())
757        }
758
759        fn estimate_size(&self) -> usize {
760            self.total_size
761        }
762
763        fn freeze(self) -> (Self::Frozen, Self::FrozenView, Self::Context) {
764            let frozen = FrozenTestDelta {
765                writes: self.writes,
766            };
767            let frozen_view = Arc::new(self.key_values.lock().unwrap().clone());
768            (frozen, frozen_view, self.context)
769        }
770
771        fn reader(&self) -> Self::DeltaView {
772            TestDeltaReader {
773                data: self.key_values.clone(),
774            }
775        }
776    }
777
778    /// Shared state for TestFlusher - allows test to inspect and control behavior
779    #[derive(Default)]
780    struct TestFlusherState {
781        flushed_events: Vec<Arc<EpochStamped<FrozenTestDelta>>>,
782        /// Signals when a flush starts (before blocking)
783        flush_started_tx: Option<oneshot::Sender<()>>,
784        /// Blocks flush until signaled
785        unblock_rx: Option<mpsc::Receiver<()>>,
786    }
787
788    #[derive(Clone)]
789    struct TestFlusher {
790        state: Arc<Mutex<TestFlusherState>>,
791        storage: Arc<InMemoryStorage>,
792    }
793
794    impl Default for TestFlusher {
795        fn default() -> Self {
796            Self {
797                state: Arc::new(Mutex::new(TestFlusherState::default())),
798                storage: Arc::new(InMemoryStorage::new()),
799            }
800        }
801    }
802
803    impl TestFlusher {
804        /// Create a flusher that blocks until signaled, with a notification when flush starts.
805        /// Returns (flusher, flush_started_rx, unblock_tx).
806        fn with_flush_control() -> (Self, oneshot::Receiver<()>, mpsc::Sender<()>) {
807            let (started_tx, started_rx) = oneshot::channel();
808            let (unblock_tx, unblock_rx) = mpsc::channel(1);
809            let flusher = Self {
810                state: Arc::new(Mutex::new(TestFlusherState {
811                    flushed_events: Vec::new(),
812                    flush_started_tx: Some(started_tx),
813                    unblock_rx: Some(unblock_rx),
814                })),
815                storage: Arc::new(InMemoryStorage::new()),
816            };
817            (flusher, started_rx, unblock_tx)
818        }
819
820        fn flushed_events(&self) -> Vec<Arc<EpochStamped<FrozenTestDelta>>> {
821            self.state.lock().unwrap().flushed_events.clone()
822        }
823
824        async fn initial_snapshot(&self) -> Arc<dyn StorageSnapshot> {
825            self.storage.snapshot().await.unwrap()
826        }
827    }
828
829    #[async_trait]
830    impl Flusher<TestDelta> for TestFlusher {
831        async fn flush_delta(
832            &self,
833            frozen: FrozenTestDelta,
834            epoch_range: &Range<u64>,
835        ) -> Result<Arc<dyn StorageSnapshot>, String> {
836            // Signal that flush has started
837            let flush_started_tx = {
838                let mut state = self.state.lock().unwrap();
839                state.flush_started_tx.take()
840            };
841            if let Some(tx) = flush_started_tx {
842                let _ = tx.send(());
843            }
844
845            // Block if test wants to control timing
846            let unblock_rx = {
847                let mut state = self.state.lock().unwrap();
848                state.unblock_rx.take()
849            };
850            if let Some(mut rx) = unblock_rx {
851                rx.recv().await;
852            }
853
854            // Write records to storage
855            let records: Vec<PutRecordOp> = frozen
856                .writes
857                .iter()
858                .map(|(key, (seq, value))| {
859                    let mut buf = Vec::with_capacity(16);
860                    buf.extend_from_slice(&seq.to_le_bytes());
861                    buf.extend_from_slice(&value.to_le_bytes());
862                    Record::new(Bytes::from(key.clone()), Bytes::from(buf)).into()
863                })
864                .collect();
865            self.storage
866                .put(records)
867                .await
868                .map_err(|e| format!("{}", e))?;
869
870            // Record the flush
871            {
872                let mut state = self.state.lock().unwrap();
873                state
874                    .flushed_events
875                    .push(Arc::new(EpochStamped::new(frozen, epoch_range.clone())));
876            }
877
878            self.storage.snapshot().await.map_err(|e| format!("{}", e))
879        }
880
881        async fn flush_storage(&self) -> Result<(), String> {
882            // Signal that flush has started
883            let flush_started_tx = {
884                let mut state = self.state.lock().unwrap();
885                state.flush_started_tx.take()
886            };
887            if let Some(tx) = flush_started_tx {
888                let _ = tx.send(());
889            }
890
891            // Block if test wants to control timing
892            let unblock_rx = {
893                let mut state = self.state.lock().unwrap();
894                state.unblock_rx.take()
895            };
896            if let Some(mut rx) = unblock_rx {
897                rx.recv().await;
898            }
899
900            Ok(())
901        }
902    }
903
904    fn test_config() -> WriteCoordinatorConfig {
905        WriteCoordinatorConfig {
906            queue_capacity: 100,
907            flush_interval: Duration::from_secs(3600), // Long interval to avoid timer flushes
908            flush_size_threshold: usize::MAX,
909        }
910    }
911
912    async fn assert_snapshot_has_rows(
913        snapshot: &Arc<dyn StorageSnapshot>,
914        expected: &[(&str, u64, u64)],
915    ) {
916        let records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
917        assert_eq!(
918            records.len(),
919            expected.len(),
920            "expected {} rows but snapshot has {}",
921            expected.len(),
922            records.len()
923        );
924        let mut actual: Vec<(String, u64, u64)> = records
925            .iter()
926            .map(|r| {
927                let key = String::from_utf8(r.key.to_vec()).unwrap();
928                let seq = u64::from_le_bytes(r.value[0..8].try_into().unwrap());
929                let value = u64::from_le_bytes(r.value[8..16].try_into().unwrap());
930                (key, seq, value)
931            })
932            .collect();
933        actual.sort_by(|a, b| a.0.cmp(&b.0));
934        let mut expected: Vec<(&str, u64, u64)> = expected.to_vec();
935        expected.sort_by(|a, b| a.0.cmp(b.0));
936        for (actual, expected) in actual.iter().zip(expected.iter()) {
937            assert_eq!(
938                actual.0, expected.0,
939                "key mismatch: got {:?}, expected {:?}",
940                actual.0, expected.0
941            );
942            assert_eq!(
943                actual.1, expected.1,
944                "seq mismatch for key {:?}: got {}, expected {}",
945                actual.0, actual.1, expected.1
946            );
947            assert_eq!(
948                actual.2, expected.2,
949                "value mismatch for key {:?}: got {}, expected {}",
950                actual.0, actual.2, expected.2
951            );
952        }
953    }
954
955    // ============================================================================
956    // Basic Write Flow Tests
957    // ============================================================================
958
959    #[tokio::test]
960    async fn should_assign_monotonic_epochs() {
961        // given
962        let flusher = TestFlusher::default();
963        let mut coordinator = WriteCoordinator::new(
964            test_config(),
965            vec!["default".to_string()],
966            TestContext::default(),
967            flusher.initial_snapshot().await,
968            flusher,
969        );
970        let handle = coordinator.handle("default");
971        coordinator.start();
972
973        // when
974        let write1 = handle
975            .try_write(TestWrite {
976                key: "a".into(),
977                value: 1,
978                size: 10,
979            })
980            .await
981            .unwrap();
982        let write2 = handle
983            .try_write(TestWrite {
984                key: "b".into(),
985                value: 2,
986                size: 10,
987            })
988            .await
989            .unwrap();
990        let write3 = handle
991            .try_write(TestWrite {
992                key: "c".into(),
993                value: 3,
994                size: 10,
995            })
996            .await
997            .unwrap();
998
999        let epoch1 = write1.epoch().await.unwrap();
1000        let epoch2 = write2.epoch().await.unwrap();
1001        let epoch3 = write3.epoch().await.unwrap();
1002
1003        // then
1004        assert!(epoch1 < epoch2);
1005        assert!(epoch2 < epoch3);
1006
1007        // cleanup
1008        coordinator.stop().await;
1009    }
1010
1011    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1012    async fn should_apply_writes_in_order() {
1013        // given
1014        let flusher = TestFlusher::default();
1015        let mut coordinator = WriteCoordinator::new(
1016            test_config(),
1017            vec!["default".to_string()],
1018            TestContext::default(),
1019            flusher.initial_snapshot().await,
1020            flusher.clone(),
1021        );
1022        let handle = coordinator.handle("default");
1023        coordinator.start();
1024
1025        // when
1026        handle
1027            .try_write(TestWrite {
1028                key: "a".into(),
1029                value: 1,
1030                size: 10,
1031            })
1032            .await
1033            .unwrap();
1034        handle
1035            .try_write(TestWrite {
1036                key: "a".into(),
1037                value: 2,
1038                size: 10,
1039            })
1040            .await
1041            .unwrap();
1042        let mut last_write = handle
1043            .try_write(TestWrite {
1044                key: "a".into(),
1045                value: 3,
1046                size: 10,
1047            })
1048            .await
1049            .unwrap();
1050
1051        handle.flush(false).await.unwrap();
1052        // Wait for flush to complete via watermark
1053        last_write.wait(Durability::Written).await.unwrap();
1054
1055        // then
1056        let events = flusher.flushed_events();
1057        assert_eq!(events.len(), 1);
1058        let frozen_delta = &events[0];
1059        let delta = &frozen_delta.val;
1060        // Writing key "a" 3x overwrites; last write wins with seq=2 (0-indexed)
1061        let (seq, value) = delta.writes.get("a").unwrap();
1062        assert_eq!(*value, 3);
1063        assert_eq!(*seq, 2);
1064
1065        // cleanup
1066        coordinator.stop().await;
1067    }
1068
1069    #[tokio::test]
1070    async fn should_update_applied_watermark_after_each_write() {
1071        // given
1072        let flusher = TestFlusher::default();
1073        let mut coordinator = WriteCoordinator::new(
1074            test_config(),
1075            vec!["default".to_string()],
1076            TestContext::default(),
1077            flusher.initial_snapshot().await,
1078            flusher,
1079        );
1080        let handle = coordinator.handle("default");
1081        coordinator.start();
1082
1083        // when
1084        let mut write_handle = handle
1085            .try_write(TestWrite {
1086                key: "a".into(),
1087                value: 1,
1088                size: 10,
1089            })
1090            .await
1091            .unwrap();
1092
1093        // then - wait should succeed immediately after write is applied
1094        let result = write_handle.wait(Durability::Applied).await;
1095        assert!(result.is_ok());
1096
1097        // cleanup
1098        coordinator.stop().await;
1099    }
1100
1101    #[tokio::test]
1102    async fn should_propagate_apply_error_to_handle() {
1103        // given
1104        let flusher = TestFlusher::default();
1105        let context = TestContext {
1106            error: Some("apply error".to_string()),
1107            ..Default::default()
1108        };
1109        let mut coordinator = WriteCoordinator::new(
1110            test_config(),
1111            vec!["default".to_string()],
1112            context,
1113            flusher.initial_snapshot().await,
1114            flusher,
1115        );
1116        let handle = coordinator.handle("default");
1117        coordinator.start();
1118
1119        // when
1120        let write = handle
1121            .try_write(TestWrite {
1122                key: "a".into(),
1123                value: 1,
1124                size: 10,
1125            })
1126            .await
1127            .unwrap();
1128
1129        let result = write.epoch().await;
1130
1131        // then
1132        assert!(
1133            matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
1134        );
1135
1136        // cleanup
1137        coordinator.stop().await;
1138    }
1139
1140    // ============================================================================
1141    // Manual Flush Tests
1142    // ============================================================================
1143
1144    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1145    async fn should_flush_on_command() {
1146        // given
1147        let flusher = TestFlusher::default();
1148        let mut coordinator = WriteCoordinator::new(
1149            test_config(),
1150            vec!["default".to_string()],
1151            TestContext::default(),
1152            flusher.initial_snapshot().await,
1153            flusher.clone(),
1154        );
1155        let handle = coordinator.handle("default");
1156        coordinator.start();
1157
1158        // when
1159        let mut write = handle
1160            .try_write(TestWrite {
1161                key: "a".into(),
1162                value: 1,
1163                size: 10,
1164            })
1165            .await
1166            .unwrap();
1167        handle.flush(false).await.unwrap();
1168        write.wait(Durability::Written).await.unwrap();
1169
1170        // then
1171        assert_eq!(flusher.flushed_events().len(), 1);
1172
1173        // cleanup
1174        coordinator.stop().await;
1175    }
1176
1177    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1178    async fn should_wait_on_flush_handle() {
1179        // given
1180        let flusher = TestFlusher::default();
1181        let mut coordinator = WriteCoordinator::new(
1182            test_config(),
1183            vec!["default".to_string()],
1184            TestContext::default(),
1185            flusher.initial_snapshot().await,
1186            flusher.clone(),
1187        );
1188        let handle = coordinator.handle("default");
1189        coordinator.start();
1190
1191        // when
1192        handle
1193            .try_write(TestWrite {
1194                key: "a".into(),
1195                value: 1,
1196                size: 10,
1197            })
1198            .await
1199            .unwrap();
1200        let mut flush_handle = handle.flush(false).await.unwrap();
1201
1202        // then - can wait directly on the flush handle
1203        flush_handle.wait(Durability::Written).await.unwrap();
1204        assert_eq!(flusher.flushed_events().len(), 1);
1205
1206        // cleanup
1207        coordinator.stop().await;
1208    }
1209
1210    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1211    async fn should_return_correct_epoch_from_flush_handle() {
1212        // given
1213        let flusher = TestFlusher::default();
1214        let mut coordinator = WriteCoordinator::new(
1215            test_config(),
1216            vec!["default".to_string()],
1217            TestContext::default(),
1218            flusher.initial_snapshot().await,
1219            flusher,
1220        );
1221        let handle = coordinator.handle("default");
1222        coordinator.start();
1223
1224        // when
1225        let write1 = handle
1226            .try_write(TestWrite {
1227                key: "a".into(),
1228                value: 1,
1229                size: 10,
1230            })
1231            .await
1232            .unwrap();
1233        let write2 = handle
1234            .try_write(TestWrite {
1235                key: "b".into(),
1236                value: 2,
1237                size: 10,
1238            })
1239            .await
1240            .unwrap();
1241        let flush_handle = handle.flush(false).await.unwrap();
1242
1243        // then - flush handle epoch should be the last write's epoch
1244        let flush_epoch = flush_handle.epoch().await.unwrap();
1245        let write2_epoch = write2.epoch().await.unwrap();
1246        assert_eq!(flush_epoch, write2_epoch);
1247
1248        // cleanup
1249        coordinator.stop().await;
1250    }
1251
1252    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1253    async fn should_include_all_pending_writes_in_flush() {
1254        // given
1255        let flusher = TestFlusher::default();
1256        let mut coordinator = WriteCoordinator::new(
1257            test_config(),
1258            vec!["default".to_string()],
1259            TestContext::default(),
1260            flusher.initial_snapshot().await,
1261            flusher.clone(),
1262        );
1263        let handle = coordinator.handle("default");
1264        coordinator.start();
1265
1266        // when
1267        handle
1268            .try_write(TestWrite {
1269                key: "a".into(),
1270                value: 1,
1271                size: 10,
1272            })
1273            .await
1274            .unwrap();
1275        handle
1276            .try_write(TestWrite {
1277                key: "b".into(),
1278                value: 2,
1279                size: 10,
1280            })
1281            .await
1282            .unwrap();
1283        let mut last_write = handle
1284            .try_write(TestWrite {
1285                key: "c".into(),
1286                value: 3,
1287                size: 10,
1288            })
1289            .await
1290            .unwrap();
1291
1292        handle.flush(false).await.unwrap();
1293        last_write.wait(Durability::Written).await.unwrap();
1294
1295        // then
1296        let events = flusher.flushed_events();
1297        assert_eq!(events.len(), 1);
1298        let frozen_delta = &events[0];
1299        assert_eq!(frozen_delta.val.writes.len(), 3);
1300        let snapshot = flusher.storage.snapshot().await.unwrap();
1301        assert_snapshot_has_rows(&snapshot, &[("a", 0, 1), ("b", 1, 2), ("c", 2, 3)]).await;
1302
1303        // cleanup
1304        coordinator.stop().await;
1305    }
1306
1307    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1308    async fn should_skip_flush_when_no_new_writes() {
1309        // given
1310        let flusher = TestFlusher::default();
1311        let mut coordinator = WriteCoordinator::new(
1312            test_config(),
1313            vec!["default".to_string()],
1314            TestContext::default(),
1315            flusher.initial_snapshot().await,
1316            flusher.clone(),
1317        );
1318        let handle = coordinator.handle("default");
1319        coordinator.start();
1320
1321        // when
1322        let mut write = handle
1323            .try_write(TestWrite {
1324                key: "a".into(),
1325                value: 1,
1326                size: 10,
1327            })
1328            .await
1329            .unwrap();
1330        handle.flush(false).await.unwrap();
1331        write.wait(Durability::Written).await.unwrap();
1332
1333        // Second flush with no new writes
1334        handle.flush(false).await.unwrap();
1335
1336        // Synchronization: write and wait for applied to ensure the flush command
1337        // has been processed (commands are processed in order)
1338        let sync_write = handle
1339            .try_write(TestWrite {
1340                key: "sync".into(),
1341                value: 0,
1342                size: 1,
1343            })
1344            .await
1345            .unwrap();
1346        sync_write.epoch().await.unwrap();
1347
1348        // then - only one flush should have occurred (the second flush was a no-op)
1349        assert_eq!(flusher.flushed_events().len(), 1);
1350
1351        // cleanup
1352        coordinator.stop().await;
1353    }
1354
1355    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1356    async fn should_update_written_watermark_after_flush() {
1357        // given
1358        let flusher = TestFlusher::default();
1359        let mut coordinator = WriteCoordinator::new(
1360            test_config(),
1361            vec!["default".to_string()],
1362            TestContext::default(),
1363            flusher.initial_snapshot().await,
1364            flusher,
1365        );
1366        let handle = coordinator.handle("default");
1367        coordinator.start();
1368
1369        // when
1370        let mut write_handle = handle
1371            .try_write(TestWrite {
1372                key: "a".into(),
1373                value: 1,
1374                size: 10,
1375            })
1376            .await
1377            .unwrap();
1378
1379        handle.flush(false).await.unwrap();
1380
1381        // then - wait for Written should succeed after flush completes
1382        let result = write_handle.wait(Durability::Written).await;
1383        assert!(result.is_ok());
1384
1385        // cleanup
1386        coordinator.stop().await;
1387    }
1388
1389    // ============================================================================
1390    // Timer-Based Flush Tests
1391    // ============================================================================
1392
1393    #[tokio::test(start_paused = true)]
1394    async fn should_flush_on_flush_interval() {
1395        // given - create coordinator with short flush interval
1396        let flusher = TestFlusher::default();
1397        let config = WriteCoordinatorConfig {
1398            queue_capacity: 100,
1399            flush_interval: Duration::from_millis(100),
1400            flush_size_threshold: usize::MAX,
1401        };
1402        let mut coordinator = WriteCoordinator::new(
1403            config,
1404            vec!["default".to_string()],
1405            TestContext::default(),
1406            flusher.initial_snapshot().await,
1407            flusher.clone(),
1408        );
1409        let handle = coordinator.handle("default");
1410        coordinator.start();
1411
1412        // when - ensure coordinator task runs and then write something
1413        tokio::task::yield_now().await;
1414        let mut write = handle
1415            .try_write(TestWrite {
1416                key: "a".into(),
1417                value: 1,
1418                size: 10,
1419            })
1420            .await
1421            .unwrap();
1422        write.wait(Durability::Applied).await.unwrap();
1423
1424        // then - no flush should have happened yet (interval was reset in run())
1425        assert_eq!(flusher.flushed_events().len(), 0);
1426
1427        // when - advance time past the flush interval from when run() was called
1428        tokio::time::advance(Duration::from_millis(150)).await;
1429        tokio::task::yield_now().await;
1430
1431        // then - flush should have happened
1432        assert_eq!(flusher.flushed_events().len(), 1);
1433        let snapshot = flusher.storage.snapshot().await.unwrap();
1434        assert_snapshot_has_rows(&snapshot, &[("a", 0, 1)]).await;
1435
1436        // cleanup
1437        coordinator.stop().await;
1438    }
1439
1440    // ============================================================================
1441    // Size-Threshold Flush Tests
1442    // ============================================================================
1443
1444    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1445    async fn should_flush_when_size_threshold_exceeded() {
1446        // given
1447        let flusher = TestFlusher::default();
1448        let config = WriteCoordinatorConfig {
1449            queue_capacity: 100,
1450            flush_interval: Duration::from_secs(3600),
1451            flush_size_threshold: 100, // Low threshold for testing
1452        };
1453        let mut coordinator = WriteCoordinator::new(
1454            config,
1455            vec!["default".to_string()],
1456            TestContext::default(),
1457            flusher.initial_snapshot().await,
1458            flusher.clone(),
1459        );
1460        let handle = coordinator.handle("default");
1461        coordinator.start();
1462
1463        // when - write that exceeds threshold
1464        let mut write = handle
1465            .try_write(TestWrite {
1466                key: "a".into(),
1467                value: 1,
1468                size: 150,
1469            })
1470            .await
1471            .unwrap();
1472        write.wait(Durability::Written).await.unwrap();
1473
1474        // then
1475        assert_eq!(flusher.flushed_events().len(), 1);
1476
1477        // cleanup
1478        coordinator.stop().await;
1479    }
1480
1481    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1482    async fn should_accumulate_until_threshold() {
1483        // given
1484        let flusher = TestFlusher::default();
1485        let config = WriteCoordinatorConfig {
1486            queue_capacity: 100,
1487            flush_interval: Duration::from_secs(3600),
1488            flush_size_threshold: 100,
1489        };
1490        let mut coordinator = WriteCoordinator::new(
1491            config,
1492            vec!["default".to_string()],
1493            TestContext::default(),
1494            flusher.initial_snapshot().await,
1495            flusher.clone(),
1496        );
1497        let handle = coordinator.handle("default");
1498        coordinator.start();
1499
1500        // when - small writes that accumulate
1501        for i in 0..5 {
1502            let mut w = handle
1503                .try_write(TestWrite {
1504                    key: format!("key{}", i),
1505                    value: i,
1506                    size: 15,
1507                })
1508                .await
1509                .unwrap();
1510            w.wait(Durability::Applied).await.unwrap();
1511        }
1512
1513        // then - no flush yet (75 bytes < 100 threshold)
1514        assert_eq!(flusher.flushed_events().len(), 0);
1515
1516        // when - write that pushes over threshold
1517        let mut final_write = handle
1518            .try_write(TestWrite {
1519                key: "final".into(),
1520                value: 999,
1521                size: 30,
1522            })
1523            .await
1524            .unwrap();
1525        final_write.wait(Durability::Written).await.unwrap();
1526
1527        // then - should have flushed (105 bytes > 100 threshold)
1528        assert_eq!(flusher.flushed_events().len(), 1);
1529
1530        // cleanup
1531        coordinator.stop().await;
1532    }
1533
1534    // ============================================================================
1535    // Non-Blocking Flush (Concurrency) Tests
1536    // ============================================================================
1537
1538    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1539    async fn should_accept_writes_during_flush() {
1540        // given
1541        let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1542        let mut coordinator = WriteCoordinator::new(
1543            test_config(),
1544            vec!["default".to_string()],
1545            TestContext::default(),
1546            flusher.initial_snapshot().await,
1547            flusher.clone(),
1548        );
1549        let handle = coordinator.handle("default");
1550        coordinator.start();
1551
1552        // when: trigger a flush and wait for it to start (proving it's in progress)
1553        let write1 = handle
1554            .try_write(TestWrite {
1555                key: "a".into(),
1556                value: 1,
1557                size: 10,
1558            })
1559            .await
1560            .unwrap();
1561        handle.flush(false).await.unwrap();
1562        flush_started_rx.await.unwrap(); // wait until flush is actually in progress
1563
1564        // then: writes during blocked flush still succeed
1565        let write2 = handle
1566            .try_write(TestWrite {
1567                key: "b".into(),
1568                value: 2,
1569                size: 10,
1570            })
1571            .await
1572            .unwrap();
1573        assert!(write2.epoch().await.unwrap() > write1.epoch().await.unwrap());
1574
1575        // cleanup
1576        unblock_tx.send(()).await.unwrap();
1577        coordinator.stop().await;
1578    }
1579
1580    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1581    async fn should_assign_new_epochs_during_flush() {
1582        // given
1583        let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1584        let mut coordinator = WriteCoordinator::new(
1585            test_config(),
1586            vec!["default".to_string()],
1587            TestContext::default(),
1588            flusher.initial_snapshot().await,
1589            flusher.clone(),
1590        );
1591        let handle = coordinator.handle("default");
1592        coordinator.start();
1593
1594        // when: write, flush, then write more during blocked flush
1595        handle
1596            .try_write(TestWrite {
1597                key: "a".into(),
1598                value: 1,
1599                size: 10,
1600            })
1601            .await
1602            .unwrap();
1603        handle.flush(false).await.unwrap();
1604        flush_started_rx.await.unwrap(); // wait until flush is actually in progress
1605
1606        // Writes during blocked flush get new epochs
1607        let w1 = handle
1608            .try_write(TestWrite {
1609                key: "b".into(),
1610                value: 2,
1611                size: 10,
1612            })
1613            .await
1614            .unwrap();
1615        let w2 = handle
1616            .try_write(TestWrite {
1617                key: "c".into(),
1618                value: 3,
1619                size: 10,
1620            })
1621            .await
1622            .unwrap();
1623
1624        // then: epochs continue incrementing
1625        let e1 = w1.epoch().await.unwrap();
1626        let e2 = w2.epoch().await.unwrap();
1627        assert!(e1 < e2);
1628
1629        // cleanup
1630        unblock_tx.send(()).await.unwrap();
1631        coordinator.stop().await;
1632    }
1633
1634    // ============================================================================
1635    // Backpressure Tests
1636    // ============================================================================
1637
1638    #[tokio::test]
1639    async fn should_return_backpressure_when_queue_full() {
1640        // given
1641        let flusher = TestFlusher::default();
1642        let config = WriteCoordinatorConfig {
1643            queue_capacity: 2,
1644            flush_interval: Duration::from_secs(3600),
1645            flush_size_threshold: usize::MAX,
1646        };
1647        let mut coordinator = WriteCoordinator::new(
1648            config,
1649            vec!["default".to_string()],
1650            TestContext::default(),
1651            flusher.initial_snapshot().await,
1652            flusher.clone(),
1653        );
1654        let handle = coordinator.handle("default");
1655        // Don't start coordinator - queue will fill
1656
1657        // when - fill the queue
1658        let _ = handle
1659            .try_write(TestWrite {
1660                key: "a".into(),
1661                value: 1,
1662                size: 10,
1663            })
1664            .await;
1665        let _ = handle
1666            .try_write(TestWrite {
1667                key: "b".into(),
1668                value: 2,
1669                size: 10,
1670            })
1671            .await;
1672
1673        // Third write should fail with backpressure
1674        let result = handle
1675            .try_write(TestWrite {
1676                key: "c".into(),
1677                value: 3,
1678                size: 10,
1679            })
1680            .await;
1681
1682        // then
1683        assert!(matches!(result, Err(WriteError::Backpressure(_))));
1684    }
1685
1686    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1687    async fn should_accept_writes_after_queue_drains() {
1688        // given
1689        let flusher = TestFlusher::default();
1690        let config = WriteCoordinatorConfig {
1691            queue_capacity: 2,
1692            flush_interval: Duration::from_secs(3600),
1693            flush_size_threshold: usize::MAX,
1694        };
1695        let mut coordinator = WriteCoordinator::new(
1696            config,
1697            vec!["default".to_string()],
1698            TestContext::default(),
1699            flusher.initial_snapshot().await,
1700            flusher.clone(),
1701        );
1702        let handle = coordinator.handle("default");
1703
1704        // Fill queue without processing
1705        let _ = handle
1706            .try_write(TestWrite {
1707                key: "a".into(),
1708                value: 1,
1709                size: 10,
1710            })
1711            .await;
1712        let mut write_b = handle
1713            .try_write(TestWrite {
1714                key: "b".into(),
1715                value: 2,
1716                size: 10,
1717            })
1718            .await
1719            .unwrap();
1720
1721        // when - start coordinator to drain queue and wait for it to process writes
1722        coordinator.start();
1723        write_b.wait(Durability::Applied).await.unwrap();
1724
1725        // then - writes should succeed now
1726        let result = handle
1727            .try_write(TestWrite {
1728                key: "c".into(),
1729                value: 3,
1730                size: 10,
1731            })
1732            .await;
1733        assert!(result.is_ok());
1734
1735        // cleanup
1736        coordinator.stop().await;
1737    }
1738
1739    // ============================================================================
1740    // Shutdown Tests
1741    // ============================================================================
1742
1743    #[tokio::test]
1744    async fn should_shutdown_cleanly_when_stop_called() {
1745        // given
1746        let flusher = TestFlusher::default();
1747        let mut coordinator = WriteCoordinator::new(
1748            test_config(),
1749            vec!["default".to_string()],
1750            TestContext::default(),
1751            flusher.initial_snapshot().await,
1752            flusher.clone(),
1753        );
1754        let handle = coordinator.handle("default");
1755        coordinator.start();
1756
1757        // when
1758        let result = coordinator.stop().await;
1759
1760        // then - coordinator should return Ok
1761        assert!(result.is_ok());
1762    }
1763
1764    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1765    async fn should_flush_pending_writes_on_shutdown() {
1766        // given
1767        let flusher = TestFlusher::default();
1768        let config = WriteCoordinatorConfig {
1769            queue_capacity: 100,
1770            flush_interval: Duration::from_secs(3600), // Long interval - won't trigger
1771            flush_size_threshold: usize::MAX,          // High threshold - won't trigger
1772        };
1773        let mut coordinator = WriteCoordinator::new(
1774            config,
1775            vec!["default".to_string()],
1776            TestContext::default(),
1777            flusher.initial_snapshot().await,
1778            flusher.clone(),
1779        );
1780        let handle = coordinator.handle("default");
1781        coordinator.start();
1782
1783        // when - write without explicit flush, then shutdown
1784        let write = handle
1785            .try_write(TestWrite {
1786                key: "a".into(),
1787                value: 1,
1788                size: 10,
1789            })
1790            .await
1791            .unwrap();
1792        let epoch = write.epoch().await.unwrap();
1793
1794        // Drop handle to trigger shutdown
1795        coordinator.stop().await;
1796
1797        // then - pending writes should have been flushed
1798        let events = flusher.flushed_events();
1799        assert_eq!(events.len(), 1);
1800        let epoch_range = &events[0].epoch_range;
1801        assert!(epoch_range.contains(&epoch));
1802    }
1803
1804    #[tokio::test]
1805    async fn should_return_shutdown_error_after_coordinator_stops() {
1806        // given
1807        let flusher = TestFlusher::default();
1808        let mut coordinator = WriteCoordinator::new(
1809            test_config(),
1810            vec!["default".to_string()],
1811            TestContext::default(),
1812            flusher.initial_snapshot().await,
1813            flusher.clone(),
1814        );
1815        let handle = coordinator.handle("default");
1816        coordinator.start();
1817
1818        // Stop coordinator
1819        coordinator.stop().await;
1820
1821        // when
1822        let result = handle
1823            .try_write(TestWrite {
1824                key: "a".into(),
1825                value: 1,
1826                size: 10,
1827            })
1828            .await;
1829
1830        // then
1831        assert!(matches!(result, Err(WriteError::Shutdown)));
1832    }
1833
1834    // ============================================================================
1835    // Epoch Range Tracking Tests
1836    // ============================================================================
1837
1838    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1839    async fn should_track_epoch_range_in_flush_event() {
1840        // given
1841        let flusher = TestFlusher::default();
1842        let mut coordinator = WriteCoordinator::new(
1843            test_config(),
1844            vec!["default".to_string()],
1845            TestContext::default(),
1846            flusher.initial_snapshot().await,
1847            flusher.clone(),
1848        );
1849        let handle = coordinator.handle("default");
1850        coordinator.start();
1851
1852        // when
1853        handle
1854            .try_write(TestWrite {
1855                key: "a".into(),
1856                value: 1,
1857                size: 10,
1858            })
1859            .await
1860            .unwrap();
1861        handle
1862            .try_write(TestWrite {
1863                key: "b".into(),
1864                value: 2,
1865                size: 10,
1866            })
1867            .await
1868            .unwrap();
1869        let mut last_write = handle
1870            .try_write(TestWrite {
1871                key: "c".into(),
1872                value: 3,
1873                size: 10,
1874            })
1875            .await
1876            .unwrap();
1877
1878        handle.flush(false).await.unwrap();
1879        last_write.wait(Durability::Written).await.unwrap();
1880
1881        // then
1882        let events = flusher.flushed_events();
1883        assert_eq!(events.len(), 1);
1884        let epoch_range = &events[0].epoch_range;
1885        assert_eq!(epoch_range.start, 1);
1886        assert_eq!(epoch_range.end, 4); // exclusive: one past the last epoch (3)
1887
1888        // cleanup
1889        coordinator.stop().await;
1890    }
1891
1892    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1893    async fn should_have_contiguous_epoch_ranges() {
1894        // given
1895        let flusher = TestFlusher::default();
1896        let mut coordinator = WriteCoordinator::new(
1897            test_config(),
1898            vec!["default".to_string()],
1899            TestContext::default(),
1900            flusher.initial_snapshot().await,
1901            flusher.clone(),
1902        );
1903        let handle = coordinator.handle("default");
1904        coordinator.start();
1905
1906        // when - first batch
1907        handle
1908            .try_write(TestWrite {
1909                key: "a".into(),
1910                value: 1,
1911                size: 10,
1912            })
1913            .await
1914            .unwrap();
1915        let mut write2 = handle
1916            .try_write(TestWrite {
1917                key: "b".into(),
1918                value: 2,
1919                size: 10,
1920            })
1921            .await
1922            .unwrap();
1923        handle.flush(false).await.unwrap();
1924        write2.wait(Durability::Written).await.unwrap();
1925
1926        // when - second batch
1927        let mut write3 = handle
1928            .try_write(TestWrite {
1929                key: "c".into(),
1930                value: 3,
1931                size: 10,
1932            })
1933            .await
1934            .unwrap();
1935        handle.flush(false).await.unwrap();
1936        write3.wait(Durability::Written).await.unwrap();
1937
1938        // then
1939        let events = flusher.flushed_events();
1940        assert_eq!(events.len(), 2);
1941
1942        let range1 = &events[0].epoch_range;
1943        let range2 = &events[1].epoch_range;
1944
1945        // Ranges should be contiguous (end of first == start of second)
1946        assert_eq!(range1.end, range2.start);
1947        assert_eq!(range1, &(1..3));
1948        assert_eq!(range2, &(3..4));
1949
1950        // cleanup
1951        coordinator.stop().await;
1952    }
1953
1954    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1955    async fn should_include_exact_epochs_in_range() {
1956        // given
1957        let flusher = TestFlusher::default();
1958        let mut coordinator = WriteCoordinator::new(
1959            test_config(),
1960            vec!["default".to_string()],
1961            TestContext::default(),
1962            flusher.initial_snapshot().await,
1963            flusher.clone(),
1964        );
1965        let handle = coordinator.handle("default");
1966        coordinator.start();
1967
1968        // when - write and capture the assigned epochs
1969        let write1 = handle
1970            .try_write(TestWrite {
1971                key: "a".into(),
1972                value: 1,
1973                size: 10,
1974            })
1975            .await
1976            .unwrap();
1977        let epoch1 = write1.epoch().await.unwrap();
1978
1979        let mut write2 = handle
1980            .try_write(TestWrite {
1981                key: "b".into(),
1982                value: 2,
1983                size: 10,
1984            })
1985            .await
1986            .unwrap();
1987        let epoch2 = write2.epoch().await.unwrap();
1988
1989        handle.flush(false).await.unwrap();
1990        write2.wait(Durability::Written).await.unwrap();
1991
1992        // then - the epoch_range should contain exactly the epochs assigned to writes
1993        let events = flusher.flushed_events();
1994        assert_eq!(events.len(), 1);
1995        let epoch_range = &events[0].epoch_range;
1996
1997        // The range should start at the first write's epoch
1998        assert_eq!(epoch_range.start, epoch1);
1999        // The range end should be one past the last write's epoch (exclusive)
2000        assert_eq!(epoch_range.end, epoch2 + 1);
2001        // Both epochs should be contained in the range
2002        assert!(epoch_range.contains(&epoch1));
2003        assert!(epoch_range.contains(&epoch2));
2004
2005        // cleanup
2006        coordinator.stop().await;
2007    }
2008
2009    // ============================================================================
2010    // State Carryover (ID Allocation) Tests
2011    // ============================================================================
2012
2013    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2014    async fn should_preserve_context_across_flushes() {
2015        // given
2016        let flusher = TestFlusher::default();
2017        let mut coordinator = WriteCoordinator::new(
2018            test_config(),
2019            vec!["default".to_string()],
2020            TestContext::default(),
2021            flusher.initial_snapshot().await,
2022            flusher.clone(),
2023        );
2024        let handle = coordinator.handle("default");
2025        coordinator.start();
2026
2027        // when - write key "a" in first batch (seq 0)
2028        let mut write1 = handle
2029            .try_write(TestWrite {
2030                key: "a".into(),
2031                value: 1,
2032                size: 10,
2033            })
2034            .await
2035            .unwrap();
2036        handle.flush(false).await.unwrap();
2037        write1.wait(Durability::Written).await.unwrap();
2038
2039        // Write to key "a" again in second batch (seq 1)
2040        let mut write2 = handle
2041            .try_write(TestWrite {
2042                key: "a".into(),
2043                value: 2,
2044                size: 10,
2045            })
2046            .await
2047            .unwrap();
2048        handle.flush(false).await.unwrap();
2049        write2.wait(Durability::Written).await.unwrap();
2050
2051        // then
2052        let events = flusher.flushed_events();
2053        assert_eq!(events.len(), 2);
2054
2055        // Batch 1: "a" with seq 0
2056        let (seq1, _) = events[0].val.writes.get("a").unwrap();
2057        assert_eq!(*seq1, 0);
2058
2059        // Batch 2: "a" with seq 1 (sequence continues)
2060        let (seq2, _) = events[1].val.writes.get("a").unwrap();
2061        assert_eq!(*seq2, 1);
2062
2063        // cleanup
2064        coordinator.stop().await;
2065    }
2066
2067    // ============================================================================
2068    // Subscribe Tests
2069    // ============================================================================
2070
2071    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2072    async fn should_receive_view_on_subscribe() {
2073        // given
2074        let flusher = TestFlusher::default();
2075        let mut coordinator = WriteCoordinator::new(
2076            test_config(),
2077            vec!["default".to_string()],
2078            TestContext::default(),
2079            flusher.initial_snapshot().await,
2080            flusher.clone(),
2081        );
2082        let handle = coordinator.handle("default");
2083        let (mut subscriber, _) = coordinator.subscribe();
2084        subscriber.initialize();
2085        coordinator.start();
2086
2087        // when
2088        handle
2089            .try_write(TestWrite {
2090                key: "a".into(),
2091                value: 1,
2092                size: 10,
2093            })
2094            .await
2095            .unwrap();
2096        handle.flush(false).await.unwrap();
2097
2098        // then - first broadcast is on freeze (delta added to frozen)
2099        let result = subscriber.recv().await;
2100        assert!(result.is_ok());
2101
2102        // cleanup
2103        coordinator.stop().await;
2104    }
2105
2106    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2107    async fn should_include_snapshot_in_view_after_flush() {
2108        // given
2109        let flusher = TestFlusher::default();
2110        let mut coordinator = WriteCoordinator::new(
2111            test_config(),
2112            vec!["default".to_string()],
2113            TestContext::default(),
2114            flusher.initial_snapshot().await,
2115            flusher.clone(),
2116        );
2117        let handle = coordinator.handle("default");
2118        let (mut subscriber, _) = coordinator.subscribe();
2119        subscriber.initialize();
2120        coordinator.start();
2121
2122        // when
2123        handle
2124            .try_write(TestWrite {
2125                key: "a".into(),
2126                value: 1,
2127                size: 10,
2128            })
2129            .await
2130            .unwrap();
2131        handle.flush(false).await.unwrap();
2132
2133        // First broadcast: freeze (frozen delta added)
2134        let _ = subscriber.recv().await.unwrap();
2135        // Second broadcast: flush complete (snapshot updated)
2136        let result = subscriber.recv().await.unwrap();
2137
2138        // then - snapshot should contain the flushed data
2139        assert_snapshot_has_rows(&result.snapshot, &[("a", 0, 1)]).await;
2140
2141        // cleanup
2142        coordinator.stop().await;
2143    }
2144
2145    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2146    async fn should_include_delta_in_view_after_flush() {
2147        // given
2148        let flusher = TestFlusher::default();
2149        let mut coordinator = WriteCoordinator::new(
2150            test_config(),
2151            vec!["default".to_string()],
2152            TestContext::default(),
2153            flusher.initial_snapshot().await,
2154            flusher.clone(),
2155        );
2156        let handle = coordinator.handle("default");
2157        let (mut subscriber, _) = coordinator.subscribe();
2158        subscriber.initialize();
2159        coordinator.start();
2160
2161        // when
2162        handle
2163            .try_write(TestWrite {
2164                key: "a".into(),
2165                value: 42,
2166                size: 10,
2167            })
2168            .await
2169            .unwrap();
2170        handle.flush(false).await.unwrap();
2171
2172        // First broadcast: freeze
2173        let _ = subscriber.recv().await.unwrap();
2174        // Second broadcast: flush complete
2175        let result = subscriber.recv().await.unwrap();
2176
2177        // then - last_written_delta should contain the write we made
2178        let flushed = result.last_written_delta.as_ref().unwrap();
2179        assert_eq!(flushed.val.get("a"), Some(&42));
2180
2181        // cleanup
2182        coordinator.stop().await;
2183    }
2184
2185    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2186    async fn should_include_epoch_range_in_view_after_flush() {
2187        // given
2188        let flusher = TestFlusher::default();
2189        let mut coordinator = WriteCoordinator::new(
2190            test_config(),
2191            vec!["default".to_string()],
2192            TestContext::default(),
2193            flusher.initial_snapshot().await,
2194            flusher.clone(),
2195        );
2196        let handle = coordinator.handle("default");
2197        let (mut subscriber, _) = coordinator.subscribe();
2198        subscriber.initialize();
2199        coordinator.start();
2200
2201        // when
2202        let write1 = handle
2203            .try_write(TestWrite {
2204                key: "a".into(),
2205                value: 1,
2206                size: 10,
2207            })
2208            .await
2209            .unwrap();
2210        let write2 = handle
2211            .try_write(TestWrite {
2212                key: "b".into(),
2213                value: 2,
2214                size: 10,
2215            })
2216            .await
2217            .unwrap();
2218        handle.flush(false).await.unwrap();
2219
2220        // First broadcast: freeze
2221        let _ = subscriber.recv().await.unwrap();
2222        // Second broadcast: flush complete
2223        let result = subscriber.recv().await.unwrap();
2224
2225        // then - epoch range from last_written_delta should contain the epochs
2226        let flushed = result.last_written_delta.as_ref().unwrap();
2227        let epoch1 = write1.epoch().await.unwrap();
2228        let epoch2 = write2.epoch().await.unwrap();
2229        assert!(flushed.epoch_range.contains(&epoch1));
2230        assert!(flushed.epoch_range.contains(&epoch2));
2231        assert_eq!(flushed.epoch_range.start, epoch1);
2232        assert_eq!(flushed.epoch_range.end, epoch2 + 1);
2233
2234        // cleanup
2235        coordinator.stop().await;
2236    }
2237
2238    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2239    async fn should_broadcast_frozen_delta_on_freeze() {
2240        // given
2241        let flusher = TestFlusher::default();
2242        let mut coordinator = WriteCoordinator::new(
2243            test_config(),
2244            vec!["default".to_string()],
2245            TestContext::default(),
2246            flusher.initial_snapshot().await,
2247            flusher.clone(),
2248        );
2249        let handle = coordinator.handle("default");
2250        let (mut subscriber, _) = coordinator.subscribe();
2251        subscriber.initialize();
2252        coordinator.start();
2253
2254        // when
2255        handle
2256            .try_write(TestWrite {
2257                key: "a".into(),
2258                value: 1,
2259                size: 10,
2260            })
2261            .await
2262            .unwrap();
2263        handle.flush(false).await.unwrap();
2264
2265        // then - first broadcast should have the frozen delta in the frozen vec
2266        let state = subscriber.recv().await.unwrap();
2267        assert_eq!(state.frozen.len(), 1);
2268        assert!(state.frozen[0].val.contains_key("a"));
2269
2270        // cleanup
2271        coordinator.stop().await;
2272    }
2273
2274    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2275    async fn should_remove_frozen_delta_after_flush_complete() {
2276        // given
2277        let flusher = TestFlusher::default();
2278        let mut coordinator = WriteCoordinator::new(
2279            test_config(),
2280            vec!["default".to_string()],
2281            TestContext::default(),
2282            flusher.initial_snapshot().await,
2283            flusher.clone(),
2284        );
2285        let handle = coordinator.handle("default");
2286        let (mut subscriber, _) = coordinator.subscribe();
2287        subscriber.initialize();
2288        coordinator.start();
2289
2290        // when
2291        handle
2292            .try_write(TestWrite {
2293                key: "a".into(),
2294                value: 1,
2295                size: 10,
2296            })
2297            .await
2298            .unwrap();
2299        handle.flush(false).await.unwrap();
2300
2301        // First broadcast: freeze (frozen has 1 entry)
2302        let state1 = subscriber.recv().await.unwrap();
2303        assert_eq!(state1.frozen.len(), 1);
2304
2305        // Second broadcast: flush complete (frozen is empty, last_written_delta set)
2306        let state2 = subscriber.recv().await.unwrap();
2307        assert_eq!(state2.frozen.len(), 0);
2308        assert!(state2.last_written_delta.is_some());
2309
2310        // cleanup
2311        coordinator.stop().await;
2312    }
2313
2314    // ============================================================================
2315    // Durable Flush Tests
2316    // ============================================================================
2317
2318    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2319    async fn should_flush_even_when_no_writes_if_flush_storage() {
2320        // given
2321        let flusher = TestFlusher::default();
2322        let storage = Arc::new(InMemoryStorage::new());
2323        let snapshot = storage.snapshot().await.unwrap();
2324        let mut coordinator = WriteCoordinator::new(
2325            test_config(),
2326            vec!["default".to_string()],
2327            TestContext::default(),
2328            snapshot,
2329            flusher.clone(),
2330        );
2331        let handle = coordinator.handle("default");
2332        coordinator.start();
2333
2334        // when - flush with flush_storage but no pending writes
2335        let mut flush_handle = handle.flush(true).await.unwrap();
2336        flush_handle.wait(Durability::Durable).await.unwrap();
2337
2338        // then - flusher was called (durable event sent) but no delta was recorded
2339        // (TestFlusher only records events with deltas)
2340        assert_eq!(flusher.flushed_events().len(), 0);
2341
2342        // cleanup
2343        coordinator.stop().await;
2344    }
2345
2346    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2347    async fn should_advance_durable_watermark() {
2348        // given
2349        let flusher = TestFlusher::default();
2350        let storage = Arc::new(InMemoryStorage::new());
2351        let snapshot = storage.snapshot().await.unwrap();
2352        let mut coordinator = WriteCoordinator::new(
2353            test_config(),
2354            vec!["default".to_string()],
2355            TestContext::default(),
2356            snapshot,
2357            flusher.clone(),
2358        );
2359        let handle = coordinator.handle("default");
2360        coordinator.start();
2361
2362        // when - write and flush with durable
2363        let mut write = handle
2364            .try_write(TestWrite {
2365                key: "a".into(),
2366                value: 1,
2367                size: 10,
2368            })
2369            .await
2370            .unwrap();
2371        let mut flush_handle = handle.flush(true).await.unwrap();
2372
2373        // then - can wait for Durable durability level
2374        flush_handle.wait(Durability::Durable).await.unwrap();
2375        write.wait(Durability::Durable).await.unwrap();
2376        assert_eq!(flusher.flushed_events().len(), 1);
2377
2378        // cleanup
2379        coordinator.stop().await;
2380    }
2381
2382    #[tokio::test]
2383    async fn should_see_applied_write_via_view() {
2384        // given
2385        let flusher = TestFlusher::default();
2386        let mut coordinator = WriteCoordinator::new(
2387            test_config(),
2388            vec!["default".to_string()],
2389            TestContext::default(),
2390            flusher.initial_snapshot().await,
2391            flusher,
2392        );
2393        let handle = coordinator.handle("default");
2394        coordinator.start();
2395
2396        // when
2397        let mut write = handle
2398            .try_write(TestWrite {
2399                key: "a".into(),
2400                value: 42,
2401                size: 10,
2402            })
2403            .await
2404            .unwrap();
2405        write.wait(Durability::Applied).await.unwrap();
2406
2407        // then
2408        let view = coordinator.view();
2409        assert_eq!(view.current.get("a"), Some(42));
2410
2411        // cleanup
2412        coordinator.stop().await;
2413    }
2414
2415    // ============================================================================
2416    // Multi-Channel Tests
2417    // ============================================================================
2418
2419    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2420    async fn should_flush_writes_from_multiple_channels() {
2421        // given
2422        let flusher = TestFlusher::default();
2423        let mut coordinator = WriteCoordinator::new(
2424            test_config(),
2425            vec!["ch1".to_string(), "ch2".to_string()],
2426            TestContext::default(),
2427            flusher.initial_snapshot().await,
2428            flusher.clone(),
2429        );
2430        let ch1 = coordinator.handle("ch1");
2431        let ch2 = coordinator.handle("ch2");
2432        coordinator.start();
2433
2434        // when - write to both channels, waiting for each to be applied
2435        // to ensure deterministic ordering
2436        let mut w1 = ch1
2437            .try_write(TestWrite {
2438                key: "a".into(),
2439                value: 10,
2440                size: 10,
2441            })
2442            .await
2443            .unwrap();
2444        w1.wait(Durability::Applied).await.unwrap();
2445
2446        let mut w2 = ch2
2447            .try_write(TestWrite {
2448                key: "b".into(),
2449                value: 20,
2450                size: 10,
2451            })
2452            .await
2453            .unwrap();
2454        w2.wait(Durability::Applied).await.unwrap();
2455
2456        let mut w3 = ch1
2457            .try_write(TestWrite {
2458                key: "c".into(),
2459                value: 30,
2460                size: 10,
2461            })
2462            .await
2463            .unwrap();
2464        w3.wait(Durability::Applied).await.unwrap();
2465
2466        ch1.flush(false).await.unwrap();
2467        w3.wait(Durability::Written).await.unwrap();
2468
2469        // then - snapshot should contain writes from both channels
2470        let snapshot = flusher.storage.snapshot().await.unwrap();
2471        assert_snapshot_has_rows(&snapshot, &[("a", 0, 10), ("b", 1, 20), ("c", 2, 30)]).await;
2472
2473        // cleanup
2474        coordinator.stop().await;
2475    }
2476
2477    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2478    async fn should_succeed_with_write_timeout_when_queue_has_space() {
2479        // given
2480        let flusher = TestFlusher::default();
2481        let mut coordinator = WriteCoordinator::new(
2482            test_config(),
2483            vec!["default".to_string()],
2484            TestContext::default(),
2485            flusher.initial_snapshot().await,
2486            flusher.clone(),
2487        );
2488        let handle = coordinator.handle("default");
2489        coordinator.start();
2490
2491        // when
2492        let mut wh = handle
2493            .write_timeout(
2494                TestWrite {
2495                    key: "a".into(),
2496                    value: 1,
2497                    size: 10,
2498                },
2499                Duration::from_secs(1),
2500            )
2501            .await
2502            .unwrap();
2503
2504        // then
2505        let result = wh.wait(Durability::Applied).await;
2506        assert!(result.is_ok());
2507
2508        // cleanup
2509        coordinator.stop().await;
2510    }
2511
2512    #[tokio::test]
2513    async fn should_timeout_when_queue_full() {
2514        // given - queue_capacity=2, coordinator NOT started so nothing drains
2515        let flusher = TestFlusher::default();
2516        let config = WriteCoordinatorConfig {
2517            queue_capacity: 2,
2518            flush_interval: Duration::from_secs(3600),
2519            flush_size_threshold: usize::MAX,
2520        };
2521        let mut coordinator = WriteCoordinator::new(
2522            config,
2523            vec!["default".to_string()],
2524            TestContext::default(),
2525            flusher.initial_snapshot().await,
2526            flusher.clone(),
2527        );
2528        let handle = coordinator.handle("default");
2529
2530        // fill the queue
2531        let _ = handle
2532            .try_write(TestWrite {
2533                key: "a".into(),
2534                value: 1,
2535                size: 10,
2536            })
2537            .await;
2538        let _ = handle
2539            .try_write(TestWrite {
2540                key: "b".into(),
2541                value: 2,
2542                size: 10,
2543            })
2544            .await;
2545
2546        // when - third write should time out
2547        let result = handle
2548            .write_timeout(
2549                TestWrite {
2550                    key: "c".into(),
2551                    value: 3,
2552                    size: 10,
2553                },
2554                Duration::from_millis(10),
2555            )
2556            .await;
2557
2558        // then
2559        assert!(matches!(result, Err(WriteError::TimeoutError(_))));
2560    }
2561
2562    #[tokio::test]
2563    async fn should_return_write_in_timeout_error() {
2564        // given - queue_capacity=1, coordinator NOT started
2565        let flusher = TestFlusher::default();
2566        let config = WriteCoordinatorConfig {
2567            queue_capacity: 1,
2568            flush_interval: Duration::from_secs(3600),
2569            flush_size_threshold: usize::MAX,
2570        };
2571        let mut coordinator = WriteCoordinator::new(
2572            config,
2573            vec!["default".to_string()],
2574            TestContext::default(),
2575            flusher.initial_snapshot().await,
2576            flusher.clone(),
2577        );
2578        let handle = coordinator.handle("default");
2579
2580        // fill the queue
2581        let _ = handle
2582            .try_write(TestWrite {
2583                key: "a".into(),
2584                value: 1,
2585                size: 10,
2586            })
2587            .await;
2588
2589        // when
2590        let result = handle
2591            .write_timeout(
2592                TestWrite {
2593                    key: "retry_me".into(),
2594                    value: 42,
2595                    size: 10,
2596                },
2597                Duration::from_millis(10),
2598            )
2599            .await;
2600        let Err(err) = result else {
2601            panic!("expected TimeoutError");
2602        };
2603
2604        // then - original write is returned inside the error
2605        let write = err.into_inner().expect("should contain the write");
2606        assert_eq!(write.key, "retry_me");
2607        assert_eq!(write.value, 42);
2608    }
2609
2610    #[tokio::test]
2611    async fn should_return_write_in_backpressure_error() {
2612        // given - queue_capacity=1, coordinator NOT started
2613        let flusher = TestFlusher::default();
2614        let config = WriteCoordinatorConfig {
2615            queue_capacity: 1,
2616            flush_interval: Duration::from_secs(3600),
2617            flush_size_threshold: usize::MAX,
2618        };
2619        let mut coordinator = WriteCoordinator::new(
2620            config,
2621            vec!["default".to_string()],
2622            TestContext::default(),
2623            flusher.initial_snapshot().await,
2624            flusher.clone(),
2625        );
2626        let handle = coordinator.handle("default");
2627
2628        // fill the queue
2629        let _ = handle
2630            .try_write(TestWrite {
2631                key: "a".into(),
2632                value: 1,
2633                size: 10,
2634            })
2635            .await;
2636
2637        // when
2638        let result = handle
2639            .try_write(TestWrite {
2640                key: "retry_me".into(),
2641                value: 42,
2642                size: 10,
2643            })
2644            .await;
2645        let Err(err) = result else {
2646            panic!("expected Backpressure");
2647        };
2648
2649        // then - original write is returned inside the error
2650        let write = err.into_inner().expect("should contain the write");
2651        assert_eq!(write.key, "retry_me");
2652        assert_eq!(write.value, 42);
2653    }
2654
2655    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2656    async fn should_succeed_when_queue_drains_within_timeout() {
2657        // given - queue_capacity=2, coordinator started so it drains
2658        let flusher = TestFlusher::default();
2659        let config = WriteCoordinatorConfig {
2660            queue_capacity: 2,
2661            flush_interval: Duration::from_secs(3600),
2662            flush_size_threshold: usize::MAX,
2663        };
2664        let mut coordinator = WriteCoordinator::new(
2665            config,
2666            vec!["default".to_string()],
2667            TestContext::default(),
2668            flusher.initial_snapshot().await,
2669            flusher.clone(),
2670        );
2671        let handle = coordinator.handle("default");
2672
2673        // fill the queue before starting
2674        let _ = handle
2675            .try_write(TestWrite {
2676                key: "a".into(),
2677                value: 1,
2678                size: 10,
2679            })
2680            .await;
2681        let _ = handle
2682            .try_write(TestWrite {
2683                key: "b".into(),
2684                value: 2,
2685                size: 10,
2686            })
2687            .await;
2688
2689        // when - start coordinator (begins draining) and write with generous timeout
2690        coordinator.start();
2691        let result = handle
2692            .write_timeout(
2693                TestWrite {
2694                    key: "c".into(),
2695                    value: 3,
2696                    size: 10,
2697                },
2698                Duration::from_secs(5),
2699            )
2700            .await;
2701
2702        // then
2703        assert!(result.is_ok());
2704
2705        // cleanup
2706        coordinator.stop().await;
2707    }
2708
2709    #[tokio::test]
2710    async fn should_return_shutdown_on_write_timeout_after_coordinator_stops() {
2711        // given
2712        let flusher = TestFlusher::default();
2713        let mut coordinator = WriteCoordinator::new(
2714            test_config(),
2715            vec!["default".to_string()],
2716            TestContext::default(),
2717            flusher.initial_snapshot().await,
2718            flusher.clone(),
2719        );
2720        let handle = coordinator.handle("default");
2721        coordinator.start();
2722
2723        // when
2724        coordinator.stop().await;
2725        let result = handle
2726            .write_timeout(
2727                TestWrite {
2728                    key: "a".into(),
2729                    value: 1,
2730                    size: 10,
2731                },
2732                Duration::from_secs(1),
2733            )
2734            .await;
2735
2736        // then
2737        assert!(matches!(result, Err(WriteError::Shutdown)));
2738    }
2739
2740    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2741    async fn should_pause_and_resume_write_channel() {
2742        // given
2743        let flusher = TestFlusher::default();
2744        let mut config = test_config();
2745        config.flush_size_threshold = usize::MAX;
2746        config.flush_interval = Duration::from_hours(24);
2747        let mut coordinator = WriteCoordinator::new(
2748            config,
2749            vec!["a", "b"],
2750            TestContext::default(),
2751            flusher.initial_snapshot().await,
2752            flusher.clone(),
2753        );
2754        let handle_a = coordinator.handle("a");
2755        let handle_b = coordinator.handle("b");
2756        let pause_handle = coordinator.pause_handle("a");
2757
2758        // pause before starting the coordinator. otherwise
2759        // there's a race condition where the PausableReceiver
2760        // makes it past pause_rx.wait_for(|v| !*v).await; and
2761        // waits for recv before we pause the handle - we could
2762        // change the behavior of the PausableReceiver to first
2763        // wait on recv and then check pause before returning
2764        // but that feels weird and is not necessary for the use
2765        // cases we have in mind
2766        pause_handle.pause();
2767        coordinator.start();
2768
2769        // when
2770        let mut result_a = handle_a
2771            .try_write(TestWrite {
2772                key: "a".into(),
2773                value: 1,
2774                size: 1,
2775            })
2776            .await
2777            .unwrap();
2778        for i in 0..1000 {
2779            handle_b
2780                .try_write(TestWrite {
2781                    key: format!("b{}", i),
2782                    value: i,
2783                    size: 1,
2784                })
2785                .await
2786                .unwrap()
2787                .wait(Durability::Applied)
2788                .await
2789                .unwrap();
2790        }
2791
2792        // then
2793        // make sure the data only contains keys from b (so a was never processed)
2794        let data = coordinator.view().current.data.lock().unwrap().clone();
2795        let mut expected = (0..1000).map(|i| format!("b{}", i)).collect::<HashSet<_>>();
2796        assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2797        pause_handle.unpause();
2798        // after resuming, wait for the data to include keys from a
2799        result_a.wait(Durability::Applied).await.unwrap();
2800        let data = coordinator.view().current.data.lock().unwrap().clone();
2801        expected.insert("a".into());
2802        assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2803    }
2804}