// common/coordinator/mod.rs — write coordinator module
1#![allow(unused)]
2
3mod error;
4mod handle;
5pub mod subscriber;
6mod traits;
7
8use std::collections::HashMap;
9use std::ops::Range;
10use std::ops::{Deref, DerefMut};
11
12pub use error::{WriteError, WriteResult};
13use futures::stream::{self, SelectAll, StreamExt};
14pub use handle::{View, WriteCoordinatorHandle, WriteHandle};
15pub use subscriber::{SubscribeError, ViewMonitor, ViewSubscriber};
16pub use traits::{Delta, Durability, EpochStamped, Flusher};
17
/// Event sent from the write coordinator task to the flush task.
enum FlushEvent<D: Delta> {
    /// Flush a frozen delta to storage. The `EpochStamped` wrapper records
    /// which half-open epoch range of writes the frozen delta covers.
    FlushDelta { frozen: EpochStamped<D::Frozen> },
    /// Ensure storage durability (e.g. call storage.flush()).
    FlushStorage,
}
25
26// Internal use only
27use crate::StorageRead;
28use crate::storage::StorageSnapshot;
29use async_trait::async_trait;
30pub use handle::EpochWatcher;
31use std::sync::{Arc, Mutex};
32use std::time::Duration;
33use tokio::sync::{broadcast, mpsc, oneshot, watch};
34use tokio::time::{Instant, Interval, interval_at};
35use tokio_util::sync::CancellationToken;
36
/// Configuration for the write coordinator.
#[derive(Debug, Clone)]
pub struct WriteCoordinatorConfig {
    /// Maximum number of pending writes in the queue (per named channel).
    pub queue_capacity: usize,
    /// Interval at which to trigger automatic flushes.
    pub flush_interval: Duration,
    /// Delta size threshold (as reported by `Delta::estimate_size`) at which
    /// to trigger a flush.
    pub flush_size_threshold: usize,
}
47
48impl Default for WriteCoordinatorConfig {
49    fn default() -> Self {
50        Self {
51            queue_capacity: 10_000,
52            flush_interval: Duration::from_secs(10),
53            flush_size_threshold: 64 * 1024 * 1024, // 64 MB
54        }
55    }
56}
57
/// Command accepted by the coordinator task from a `WriteCoordinatorHandle`.
pub(crate) enum WriteCommand<D: Delta> {
    /// Apply one write to the current delta.
    Write {
        write: D::Write,
        // Receives the epoch the write was applied at, or the failure.
        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
    },
    /// Force a flush of the current delta; optionally also flush storage.
    Flush {
        // Receives the epoch of the last write processed before the flush.
        epoch_tx: oneshot::Sender<handle::EpochResult<()>>,
        flush_storage: bool,
    },
}
68
/// The write coordinator manages write ordering, batching, and durability.
///
/// It accepts writes through `WriteCoordinatorHandle`, applies them to a `Delta`,
/// and coordinates flushing through a `Flusher`.
pub struct WriteCoordinator<D: Delta, F: Flusher<D>> {
    /// One write handle per named input channel.
    handles: HashMap<String, WriteCoordinatorHandle<D>>,
    /// Per-channel pause controls for the coordinator's queue consumption.
    pause_handles: HashMap<String, PauseHandle>,
    /// Cancelled by `stop()` to shut down the coordinator task.
    stop_tok: CancellationToken,
    /// Tasks not yet spawned; taken by `start()` (`None` once started).
    tasks: Option<(WriteCoordinatorTask<D>, FlushTask<D, F>)>,
    /// Join handle of the spawned coordinator task (`Some` after `start()`).
    write_task_jh: Option<tokio::task::JoinHandle<Result<(), String>>>,
    /// Shared, broadcast-updated read view over delta + storage state.
    view: Arc<BroadcastedView<D>>,
}
81
impl<D: Delta, F: Flusher<D>> WriteCoordinator<D, F> {
    /// Build a coordinator with one write channel per name in `channels`.
    ///
    /// Tasks are constructed but not spawned; call `start()` to run them.
    /// `initial_context` seeds the first delta and `initial_snapshot` seeds
    /// the storage part of the read view.
    pub fn new(
        config: WriteCoordinatorConfig,
        channels: Vec<impl ToString>,
        initial_context: D::Context,
        initial_snapshot: Arc<dyn StorageSnapshot>,
        flusher: F,
    ) -> WriteCoordinator<D, F> {
        let (watermarks, watcher) = EpochWatermarks::new();
        let watermarks = Arc::new(watermarks);

        // Create a write channel per named input
        let mut write_rxs = Vec::with_capacity(channels.len());
        let mut write_txs = HashMap::new();
        let mut pause_handles = HashMap::new();
        for name in &channels {
            let (write_tx, write_rx, pause_hdl) = pausable_channel(config.queue_capacity);
            write_rxs.push(write_rx);
            write_txs.insert(name.to_string(), write_tx);
            pause_handles.insert(name.to_string(), pause_hdl);
        }

        // this is the channel that sends FlushEvents to be flushed
        // by a background task so that the process of converting deltas
        // to storage operations is non-blocking. for now, we apply no
        // backpressure on this channel, so writes will block if more than
        // one flush is pending
        let (flush_tx, flush_rx) = mpsc::channel(2);

        let flush_stop_tok = CancellationToken::new();
        let stop_tok = CancellationToken::new();
        let write_task = WriteCoordinatorTask::new(
            config,
            initial_context,
            initial_snapshot,
            write_rxs,
            flush_tx,
            watermarks.clone(),
            stop_tok.clone(),
            flush_stop_tok.clone(),
        );

        // The task owns the canonical view; keep a shared reference for the
        // handles and the flush task.
        let view = write_task.view.clone();

        let mut handles = HashMap::new();
        for name in channels {
            // Every name was inserted into write_txs in the loop above.
            let write_tx = write_txs.remove(&name.to_string()).expect("unreachable");
            handles.insert(
                name.to_string(),
                WriteCoordinatorHandle::new(write_tx, watcher.clone(), view.clone()),
            );
        }

        let flush_task = FlushTask {
            flusher,
            stop_tok: flush_stop_tok,
            flush_rx,
            watermarks: watermarks.clone(),
            view: view.clone(),
            last_flushed_epoch: 0,
        };

        Self {
            handles,
            pause_handles,
            tasks: Some((write_task, flush_task)),
            write_task_jh: None,
            stop_tok,
            view,
        }
    }

    /// Write handle for the named channel.
    ///
    /// # Panics
    /// Panics if `name` was not one of the `channels` passed to `new`.
    pub fn handle(&self, name: &str) -> WriteCoordinatorHandle<D> {
        self.handles
            .get(name)
            .expect("unknown channel name")
            .clone()
    }

    /// Pause handle for the named channel.
    ///
    /// # Panics
    /// Panics if `name` was not one of the `channels` passed to `new`.
    pub fn pause_handle(&self, name: &str) -> PauseHandle {
        self.pause_handles
            .get(name)
            .expect("unknown channel name")
            .clone()
    }

    /// Spawn the coordinator and flush tasks. Calling `start` again is a no-op.
    pub fn start(&mut self) {
        let Some((write_task, flush_task)) = self.tasks.take() else {
            // already started
            return;
        };
        // Start the flush task first: the coordinator task holds its join
        // handle so flush errors surface on shutdown.
        let flush_task_jh = flush_task.run();
        let write_task_jh = write_task.run(flush_task_jh);
        self.write_task_jh = Some(write_task_jh);
    }

    /// Stop the coordinator and wait for both tasks to finish.
    /// Returns `Ok(())` immediately if `start` was never called.
    pub async fn stop(mut self) -> Result<(), String> {
        let Some(write_task_jh) = self.write_task_jh.take() else {
            return Ok(());
        };
        self.stop_tok.cancel();
        write_task_jh.await.map_err(|e| e.to_string())?
    }

    /// The most recently published read view.
    pub fn view(&self) -> Arc<View<D>> {
        self.view.current()
    }

    /// Subscribe to view updates, starting from the current view.
    pub fn subscribe(&self) -> (ViewSubscriber<D>, ViewMonitor) {
        let (view_rx, initial_view) = self.view.subscribe();
        ViewSubscriber::new(view_rx, initial_view)
    }
}
195
/// Background task that owns the current delta and processes write commands.
struct WriteCoordinatorTask<D: Delta> {
    config: WriteCoordinatorConfig,
    /// The mutable delta currently accepting writes.
    delta: CurrentDelta<D>,
    /// Sends frozen deltas / storage-flush requests to the `FlushTask`.
    flush_tx: mpsc::Sender<FlushEvent<D>>,
    /// One pausable receiver per named write channel (drained in `run`).
    write_rxs: Vec<PausableReceiver<D>>,
    /// Applied/written/durable epoch watermarks observed by write handles.
    watermarks: Arc<EpochWatermarks>,
    /// Shared read view, updated when deltas are frozen.
    view: Arc<BroadcastedView<D>>,
    /// Epoch to assign to the next write.
    epoch: u64,
    /// Epoch of the first write in the current (unfrozen) delta.
    delta_start_epoch: u64,
    /// Timer driving periodic flushes.
    flush_interval: Interval,
    /// Cancelled to stop this task.
    stop_tok: CancellationToken,
    /// Cancelled by this task on shutdown to stop the flush task.
    flush_stop_tok: CancellationToken,
}
209
impl<D: Delta> WriteCoordinatorTask<D> {
    /// Create a new write coordinator with the given flusher.
    ///
    /// This is useful for testing with mock flushers.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        config: WriteCoordinatorConfig,
        initial_context: D::Context,
        initial_snapshot: Arc<dyn StorageSnapshot>,
        write_rxs: Vec<PausableReceiver<D>>,
        flush_tx: mpsc::Sender<FlushEvent<D>>,
        watermarks: Arc<EpochWatermarks>,
        stop_tok: CancellationToken,
        flush_stop_tok: CancellationToken,
    ) -> Self {
        let delta = D::init(initial_context);

        // The initial view has no frozen deltas and nothing written yet.
        let initial_view = View {
            current: delta.reader(),
            frozen: vec![],
            snapshot: initial_snapshot,
            last_written_delta: None,
        };
        let initial_view = Arc::new(BroadcastedView::new(initial_view));

        let flush_interval = interval_at(
            Instant::now() + config.flush_interval,
            config.flush_interval,
        );
        Self {
            config,
            delta: CurrentDelta::new(delta),
            write_rxs,
            flush_tx,
            watermarks,
            view: initial_view,
            // Epochs start at 1 because watch channels initialize to 0 (meaning "nothing
            // processed yet"). If the first write had epoch 0, wait() would return
            // immediately since the condition `watermark < epoch` would be `0 < 0` = false.
            epoch: 1,
            delta_start_epoch: 1,
            flush_interval,
            stop_tok,
            flush_stop_tok,
        }
    }

    /// Run the coordinator event loop.
    pub fn run(
        mut self,
        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
    ) -> tokio::task::JoinHandle<Result<(), String>> {
        tokio::task::spawn(async move { self.run_coordinator(flush_task_jh).await })
    }

    /// Main loop: multiplex write commands, the flush timer, and shutdown.
    /// On exit, performs a final flush and joins the flush task so its
    /// errors propagate to the caller of `stop()`.
    async fn run_coordinator(
        mut self,
        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
    ) -> Result<(), String> {
        // Reset the interval to start fresh from when run() is called
        self.flush_interval.reset();

        // Merge all write receivers into a single stream
        let mut write_stream: SelectAll<_> = SelectAll::new();
        for rx in self.write_rxs.drain(..) {
            write_stream.push(
                stream::unfold(
                    rx,
                    |mut rx| async move { rx.recv().await.map(|cmd| (cmd, rx)) },
                )
                .boxed(),
            );
        }

        loop {
            tokio::select! {
                cmd = write_stream.next() => {
                    match cmd {
                        Some(WriteCommand::Write { write, result_tx }) => {
                            self.handle_write(write, result_tx).await?;
                        }
                        Some(WriteCommand::Flush { epoch_tx, flush_storage }) => {
                            // Send back the epoch of the last processed write
                            let _ = epoch_tx.send(Ok(handle::WriteApplied {
                                epoch: self.epoch.saturating_sub(1),
                                result: (),
                            }));
                            self.handle_flush(flush_storage).await;
                        }
                        None => {
                            // All write channels closed
                            break;
                        }
                    }
                }

                _ = self.flush_interval.tick() => {
                    self.handle_flush(false).await;
                }

                _ = self.stop_tok.cancelled() => {
                    break;
                }
            }
        }

        // Flush any remaining pending writes before shutdown
        self.handle_flush(false).await;

        // Signal the flush task to stop
        self.flush_stop_tok.cancel();
        // Wait for the flush task to complete and propagate any errors
        flush_task_jh
            .await
            .map_err(|e| format!("flush task panicked: {}", e))?
            .map_err(|e| format!("flush task error: {}", e))
    }

    /// Apply one write at the next epoch, report the outcome to the caller,
    /// and trigger a size-based flush if the delta grew past the threshold.
    async fn handle_write(
        &mut self,
        write: D::Write,
        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
    ) -> Result<(), String> {
        let write_epoch = self.epoch;
        self.epoch += 1;

        let result = self.delta.apply(write);
        // Ignore error if receiver was dropped (fire-and-forget write)
        let _ = result_tx.send(
            result
                .map(|apply_result| handle::WriteApplied {
                    epoch: write_epoch,
                    result: apply_result,
                })
                .map_err(|e| handle::WriteFailed {
                    epoch: write_epoch,
                    error: e,
                }),
        );

        // Ignore error if no watchers are listening - this is non-fatal
        self.watermarks.update_applied(write_epoch);

        if self.delta.estimate_size() >= self.config.flush_size_threshold {
            self.handle_flush(false).await;
        }

        Ok(())
    }

    /// Freeze-and-flush the current delta (if it has writes), optionally
    /// following up with a storage durability flush.
    async fn handle_flush(&mut self, flush_storage: bool) {
        self.flush_if_delta_has_writes().await;
        if flush_storage {
            // Send error only occurs if the flush task is gone (shutdown).
            let _ = self.flush_tx.send(FlushEvent::FlushStorage).await;
        }
    }

    /// If any writes were applied since the last freeze, freeze the delta,
    /// publish the new view, and hand the frozen delta to the flush task.
    async fn flush_if_delta_has_writes(&mut self) {
        if self.epoch == self.delta_start_epoch {
            // No writes since the last freeze; nothing to flush.
            return;
        }

        // Restart the timer so a manual or size-triggered flush delays the
        // next periodic one.
        self.flush_interval.reset();

        // Half-open range of epochs covered by the delta being frozen.
        let epoch_range = self.delta_start_epoch..self.epoch;
        self.delta_start_epoch = self.epoch;
        let (frozen, frozen_reader) = self.delta.freeze_and_init();
        let stamped_frozen = EpochStamped::new(frozen, epoch_range.clone());
        let stamped_frozen_reader = EpochStamped::new(frozen_reader, epoch_range.clone());
        let reader = self.delta.reader();
        // update the view before sending the flush msg to ensure the flusher sees
        // the frozen reader when updating the view post-flush
        self.view.update_delta_frozen(stamped_frozen_reader, reader);
        // this is the blocking section of the flush, new writes will not be accepted
        // until the event is sent to the FlushTask
        let _ = self
            .flush_tx
            .send(FlushEvent::FlushDelta {
                frozen: stamped_frozen,
            })
            .await;
    }
}
393
/// Background task that converts frozen deltas into storage writes.
struct FlushTask<D: Delta, F: Flusher<D>> {
    flusher: F,
    /// Cancelled by the coordinator task on shutdown.
    stop_tok: CancellationToken,
    /// Receives flush events from the coordinator task.
    flush_rx: mpsc::Receiver<FlushEvent<D>>,
    /// Advanced (written/durable) as flushes complete.
    watermarks: Arc<EpochWatermarks>,
    /// Updated with the post-flush snapshot.
    view: Arc<BroadcastedView<D>>,
    /// Highest epoch known to be written to storage (0 = none yet).
    last_flushed_epoch: u64,
}
402
403impl<D: Delta, F: Flusher<D>> FlushTask<D, F> {
404    fn run(mut self) -> tokio::task::JoinHandle<WriteResult<()>> {
405        tokio::spawn(async move {
406            loop {
407                tokio::select! {
408                    event = self.flush_rx.recv() => {
409                        let Some(event) = event else {
410                            break;
411                        };
412                        self.handle_event(event).await?;
413                    }
414                    _ = self.stop_tok.cancelled() => {
415                        break;
416                    }
417                }
418            }
419            // drain all remaining flush events
420            while let Ok(event) = self.flush_rx.try_recv() {
421                self.handle_event(event).await;
422            }
423            Ok(())
424        })
425    }
426
427    async fn handle_event(&mut self, event: FlushEvent<D>) -> WriteResult<()> {
428        match event {
429            FlushEvent::FlushDelta { frozen } => self.handle_flush(frozen).await,
430            FlushEvent::FlushStorage => {
431                self.flusher
432                    .flush_storage()
433                    .await
434                    .map_err(|e| WriteError::FlushError(e.to_string()))?;
435                self.watermarks.update_durable(self.last_flushed_epoch);
436                Ok(())
437            }
438        }
439    }
440
441    async fn handle_flush(&mut self, frozen: EpochStamped<D::Frozen>) -> WriteResult<()> {
442        let delta = frozen.val;
443        let epoch_range = frozen.epoch_range;
444        let snapshot = self
445            .flusher
446            .flush_delta(delta, &epoch_range)
447            .await
448            .map_err(|e| WriteError::FlushError(e.to_string()))?;
449        self.last_flushed_epoch = epoch_range.end - 1;
450        self.watermarks.update_written(self.last_flushed_epoch);
451        self.view.update_flush_finished(snapshot, epoch_range);
452        Ok(())
453    }
454}
455
/// Holder for the active delta. The `Option` exists so `freeze_and_init` can
/// move the delta out by value (`Delta::freeze` consumes it) and replace it.
struct CurrentDelta<D: Delta> {
    delta: Option<D>,
}
459
460impl<D: Delta> Deref for CurrentDelta<D> {
461    type Target = D;
462
463    fn deref(&self) -> &Self::Target {
464        match &self.delta {
465            Some(d) => d,
466            None => panic!("current delta not initialized"),
467        }
468    }
469}
470
471impl<D: Delta> DerefMut for CurrentDelta<D> {
472    fn deref_mut(&mut self) -> &mut Self::Target {
473        match &mut self.delta {
474            Some(d) => d,
475            None => panic!("current delta not initialized"),
476        }
477    }
478}
479
480impl<D: Delta> CurrentDelta<D> {
481    fn new(delta: D) -> Self {
482        Self { delta: Some(delta) }
483    }
484
485    fn freeze_and_init(&mut self) -> (D::Frozen, D::FrozenView) {
486        let Some(delta) = self.delta.take() else {
487            panic!("delta not initialized");
488        };
489        let (frozen, frozen_reader, context) = delta.freeze();
490        let new_delta = D::init(context);
491        self.delta = Some(new_delta);
492        (frozen, frozen_reader)
493    }
494}
495
/// Sender side of the three epoch watermarks (applied, written, durable).
/// Watchers observe them through the paired `EpochWatcher`.
pub struct EpochWatermarks {
    applied_tx: tokio::sync::watch::Sender<u64>,
    written_tx: tokio::sync::watch::Sender<u64>,
    durable_tx: tokio::sync::watch::Sender<u64>,
}
501
502impl EpochWatermarks {
503    pub fn new() -> (Self, EpochWatcher) {
504        let (applied_tx, applied_rx) = tokio::sync::watch::channel(0);
505        let (written_tx, written_rx) = tokio::sync::watch::channel(0);
506        let (durable_tx, durable_rx) = tokio::sync::watch::channel(0);
507        let watcher = EpochWatcher {
508            applied_rx,
509            written_rx,
510            durable_rx,
511        };
512        let watermarks = EpochWatermarks {
513            applied_tx,
514            written_tx,
515            durable_tx,
516        };
517        (watermarks, watcher)
518    }
519
520    pub fn update_applied(&self, epoch: u64) {
521        let _ = self.applied_tx.send(epoch);
522    }
523
524    pub fn update_written(&self, epoch: u64) {
525        let _ = self.written_tx.send(epoch);
526    }
527
528    pub fn update_durable(&self, epoch: u64) {
529        let _ = self.durable_tx.send(epoch);
530    }
531}
532
/// Mutex-guarded holder of the latest `View` that broadcasts each replacement
/// to subscribers.
pub(crate) struct BroadcastedView<D: Delta> {
    inner: Mutex<BroadcastedViewInner<D>>,
}
536
537impl<D: Delta> BroadcastedView<D> {
538    fn new(initial_view: View<D>) -> Self {
539        let (view_tx, _) = broadcast::channel(16);
540        Self {
541            inner: Mutex::new(BroadcastedViewInner {
542                view: Arc::new(initial_view),
543                view_tx,
544            }),
545        }
546    }
547
548    fn update_flush_finished(&self, snapshot: Arc<dyn StorageSnapshot>, epoch_range: Range<u64>) {
549        self.inner
550            .lock()
551            .expect("lock poisoned")
552            .update_flush_finished(snapshot, epoch_range);
553    }
554
555    fn update_delta_frozen(&self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
556        self.inner
557            .lock()
558            .expect("lock poisoned")
559            .update_delta_frozen(frozen, reader);
560    }
561
562    fn current(&self) -> Arc<View<D>> {
563        self.inner.lock().expect("lock poisoned").current()
564    }
565
566    fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
567        self.inner.lock().expect("lock poisoned").subscribe()
568    }
569}
570
/// Lock-protected state behind `BroadcastedView`: the latest view plus the
/// broadcast sender that publishes replacements.
struct BroadcastedViewInner<D: Delta> {
    view: Arc<View<D>>,
    view_tx: tokio::sync::broadcast::Sender<Arc<View<D>>>,
}
575
576impl<D: Delta> BroadcastedViewInner<D> {
577    fn update_flush_finished(
578        &mut self,
579        snapshot: Arc<dyn StorageSnapshot>,
580        epoch_range: Range<u64>,
581    ) {
582        let mut new_frozen = self.view.frozen.clone();
583        let last = new_frozen
584            .pop()
585            .expect("frozen should not be empty when flush completes");
586        assert_eq!(last.epoch_range, epoch_range);
587        self.view = Arc::new(View {
588            current: self.view.current.clone(),
589            frozen: new_frozen,
590            snapshot,
591            last_written_delta: Some(last),
592        });
593        self.view_tx.send(self.view.clone());
594    }
595
596    fn update_delta_frozen(&mut self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
597        // Update read state: add frozen delta to front, update current reader
598        let mut new_frozen = vec![frozen];
599        new_frozen.extend(self.view.frozen.iter().cloned());
600        self.view = Arc::new(View {
601            current: reader,
602            frozen: new_frozen,
603            snapshot: self.view.snapshot.clone(),
604            last_written_delta: self.view.last_written_delta.clone(),
605        });
606        self.view_tx.send(self.view.clone());
607    }
608
609    fn current(&self) -> Arc<View<D>> {
610        self.view.clone()
611    }
612
613    fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
614        (self.view_tx.subscribe(), self.view.clone())
615    }
616}
617
/// mpsc receiver gated by a watch flag: while the flag is `true`, `recv`
/// does not pull from the queue.
struct PausableReceiver<D: Delta> {
    /// Pause flag, driven by the paired `PauseHandle`.
    pause_rx: Option<watch::Receiver<bool>>,
    rx: mpsc::Receiver<WriteCommand<D>>,
}
622
623impl<D: Delta> PausableReceiver<D> {
624    async fn recv(&mut self) -> Option<WriteCommand<D>> {
625        if let Some(pause_rx) = self.pause_rx.as_mut() {
626            pause_rx.wait_for(|v| !*v).await;
627        }
628        self.rx.recv().await
629    }
630}
631
/// A handle that can be used to pause/unpause the coordinator's consumption from a write queue.
#[derive(Clone)]
pub struct PauseHandle {
    /// Watch flag observed by the paired `PausableReceiver`; `true` = paused.
    pause_tx: tokio::sync::watch::Sender<bool>,
}
637
638impl PauseHandle {
639    pub fn pause(&self) {
640        self.pause_tx.send_replace(true);
641    }
642
643    pub fn unpause(&self) {
644        self.pause_tx.send_replace(false);
645    }
646}
647
648fn pausable_channel<D: Delta>(
649    capacity: usize,
650) -> (
651    mpsc::Sender<WriteCommand<D>>,
652    PausableReceiver<D>,
653    PauseHandle,
654) {
655    let (pause_tx, pause_rx) = watch::channel(false);
656    let (tx, rx) = mpsc::channel(capacity);
657    (
658        tx,
659        PausableReceiver {
660            pause_rx: Some(pause_rx),
661            rx,
662        },
663        PauseHandle { pause_tx },
664    )
665}
666
667#[cfg(test)]
668mod tests {
669    use super::*;
670    use crate::BytesRange;
671    use crate::coordinator::Durability;
672    use crate::storage::in_memory::{InMemoryStorage, InMemoryStorageSnapshot};
673    use crate::storage::{PutRecordOp, Record, StorageSnapshot};
674    use crate::{Storage, StorageRead};
675    use async_trait::async_trait;
676    use bytes::Bytes;
677    use std::collections::{HashMap, HashSet};
678    use std::ops::Range;
679    use std::sync::Mutex;
680    // ============================================================================
681    // Test Infrastructure
682    // ============================================================================
683
    /// A single key/value write with an explicit `size` used to exercise the
    /// flush-size threshold.
    #[derive(Clone, Debug)]
    struct TestWrite {
        key: String,
        value: u64,
        size: usize,
    }
690
    /// Context carries state that must persist across deltas (like sequence allocation)
    #[derive(Clone, Debug, Default)]
    struct TestContext {
        // Next sequence number to assign to an applied write.
        next_seq: u64,
        // When set, `apply` fails with this message (error-path tests).
        error: Option<String>,
    }
697
    /// A shared reader that sees writes as they are applied to the delta.
    #[derive(Clone, Debug, Default)]
    struct TestDeltaReader {
        // Shared with the owning TestDelta's `key_values`.
        data: Arc<Mutex<HashMap<String, u64>>>,
    }
703
704    impl TestDeltaReader {
705        fn get(&self, key: &str) -> Option<u64> {
706            self.data.lock().unwrap().get(key).copied()
707        }
708    }
709
    /// Delta accumulates writes with sequence numbers.
    /// Stores the context directly and updates it in place.
    #[derive(Debug)]
    struct TestDelta {
        context: TestContext,
        // key -> (sequence number, value); becomes the frozen form.
        writes: HashMap<String, (u64, u64)>,
        // Shared with TestDeltaReaders so applied writes are visible live.
        key_values: Arc<Mutex<HashMap<String, u64>>>,
        // Running sum of write sizes, reported by `estimate_size`.
        total_size: usize,
    }
719
    /// Immutable form of `TestDelta` produced by `freeze`.
    #[derive(Clone, Debug)]
    struct FrozenTestDelta {
        // key -> (sequence number, value)
        writes: HashMap<String, (u64, u64)>,
    }
724
725    impl std::fmt::Debug for View<TestDelta> {
726        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
727            f.write_str("View<TestDelta>")
728        }
729    }
730
    impl Delta for TestDelta {
        type Context = TestContext;
        type Write = TestWrite;
        type DeltaView = TestDeltaReader;
        type Frozen = FrozenTestDelta;
        type FrozenView = Arc<HashMap<String, u64>>;
        type ApplyResult = ();

        /// Start an empty delta carrying over the given context.
        fn init(context: Self::Context) -> Self {
            Self {
                context,
                writes: HashMap::default(),
                key_values: Arc::new(Mutex::new(HashMap::default())),
                total_size: 0,
            }
        }

        /// Record a write under the next sequence number, or fail if the
        /// context has an injected error.
        fn apply(&mut self, write: Self::Write) -> Result<(), String> {
            if let Some(error) = &self.context.error {
                return Err(error.clone());
            }

            let seq = self.context.next_seq;
            self.context.next_seq += 1;

            self.writes.insert(write.key.clone(), (seq, write.value));
            self.total_size += write.size;
            // Make the write visible to existing readers immediately.
            self.key_values
                .lock()
                .unwrap()
                .insert(write.key, write.value);
            Ok(())
        }

        /// Sum of the `size` fields of the applied writes.
        fn estimate_size(&self) -> usize {
            self.total_size
        }

        /// Convert to the frozen form, handing the context (with its running
        /// sequence counter) to the next delta.
        fn freeze(self) -> (Self::Frozen, Self::FrozenView, Self::Context) {
            let frozen = FrozenTestDelta {
                writes: self.writes,
            };
            let frozen_view = Arc::new(self.key_values.lock().unwrap().clone());
            (frozen, frozen_view, self.context)
        }

        /// Reader sharing this delta's live key/value map.
        fn reader(&self) -> Self::DeltaView {
            TestDeltaReader {
                data: self.key_values.clone(),
            }
        }
    }
783
    /// Shared state for TestFlusher - allows test to inspect and control behavior
    #[derive(Default)]
    struct TestFlusherState {
        // Every completed flush, in order.
        flushed_events: Vec<Arc<EpochStamped<FrozenTestDelta>>>,
        /// Signals when a flush starts (before blocking)
        flush_started_tx: Option<oneshot::Sender<()>>,
        /// Blocks flush until signaled
        unblock_rx: Option<mpsc::Receiver<()>>,
    }
793
    /// Flusher that writes frozen deltas into an in-memory storage and
    /// records every flush for later inspection.
    #[derive(Clone)]
    struct TestFlusher {
        state: Arc<Mutex<TestFlusherState>>,
        storage: Arc<InMemoryStorage>,
    }
799
800    impl Default for TestFlusher {
801        fn default() -> Self {
802            Self {
803                state: Arc::new(Mutex::new(TestFlusherState::default())),
804                storage: Arc::new(InMemoryStorage::new()),
805            }
806        }
807    }
808
    impl TestFlusher {
        /// Create a flusher that blocks until signaled, with a notification when flush starts.
        /// Returns (flusher, flush_started_rx, unblock_tx).
        fn with_flush_control() -> (Self, oneshot::Receiver<()>, mpsc::Sender<()>) {
            let (started_tx, started_rx) = oneshot::channel();
            let (unblock_tx, unblock_rx) = mpsc::channel(1);
            let flusher = Self {
                state: Arc::new(Mutex::new(TestFlusherState {
                    flushed_events: Vec::new(),
                    flush_started_tx: Some(started_tx),
                    unblock_rx: Some(unblock_rx),
                })),
                storage: Arc::new(InMemoryStorage::new()),
            };
            (flusher, started_rx, unblock_tx)
        }

        /// Copy of every flush recorded so far, in order.
        fn flushed_events(&self) -> Vec<Arc<EpochStamped<FrozenTestDelta>>> {
            self.state.lock().unwrap().flushed_events.clone()
        }

        /// Current snapshot of the backing storage, used to seed a coordinator.
        async fn initial_snapshot(&self) -> Arc<dyn StorageSnapshot> {
            self.storage.snapshot().await.unwrap()
        }
    }
834
835    #[async_trait]
836    impl Flusher<TestDelta> for TestFlusher {
837        async fn flush_delta(
838            &mut self,
839            frozen: FrozenTestDelta,
840            epoch_range: &Range<u64>,
841        ) -> Result<Arc<dyn StorageSnapshot>, String> {
842            // Signal that flush has started
843            let flush_started_tx = {
844                let mut state = self.state.lock().unwrap();
845                state.flush_started_tx.take()
846            };
847            if let Some(tx) = flush_started_tx {
848                let _ = tx.send(());
849            }
850
851            // Block if test wants to control timing
852            let unblock_rx = {
853                let mut state = self.state.lock().unwrap();
854                state.unblock_rx.take()
855            };
856            if let Some(mut rx) = unblock_rx {
857                rx.recv().await;
858            }
859
860            // Write records to storage
861            let records: Vec<PutRecordOp> = frozen
862                .writes
863                .iter()
864                .map(|(key, (seq, value))| {
865                    let mut buf = Vec::with_capacity(16);
866                    buf.extend_from_slice(&seq.to_le_bytes());
867                    buf.extend_from_slice(&value.to_le_bytes());
868                    Record::new(Bytes::from(key.clone()), Bytes::from(buf)).into()
869                })
870                .collect();
871            self.storage
872                .put(records)
873                .await
874                .map_err(|e| format!("{}", e))?;
875
876            // Record the flush
877            {
878                let mut state = self.state.lock().unwrap();
879                state
880                    .flushed_events
881                    .push(Arc::new(EpochStamped::new(frozen, epoch_range.clone())));
882            }
883
884            self.storage.snapshot().await.map_err(|e| format!("{}", e))
885        }
886
887        async fn flush_storage(&self) -> Result<(), String> {
888            // Signal that flush has started
889            let flush_started_tx = {
890                let mut state = self.state.lock().unwrap();
891                state.flush_started_tx.take()
892            };
893            if let Some(tx) = flush_started_tx {
894                let _ = tx.send(());
895            }
896
897            // Block if test wants to control timing
898            let unblock_rx = {
899                let mut state = self.state.lock().unwrap();
900                state.unblock_rx.take()
901            };
902            if let Some(mut rx) = unblock_rx {
903                rx.recv().await;
904            }
905
906            Ok(())
907        }
908    }
909
910    fn test_config() -> WriteCoordinatorConfig {
911        WriteCoordinatorConfig {
912            queue_capacity: 100,
913            flush_interval: Duration::from_secs(3600), // Long interval to avoid timer flushes
914            flush_size_threshold: usize::MAX,
915        }
916    }
917
918    async fn assert_snapshot_has_rows(
919        snapshot: &Arc<dyn StorageSnapshot>,
920        expected: &[(&str, u64, u64)],
921    ) {
922        let records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
923        assert_eq!(
924            records.len(),
925            expected.len(),
926            "expected {} rows but snapshot has {}",
927            expected.len(),
928            records.len()
929        );
930        let mut actual: Vec<(String, u64, u64)> = records
931            .iter()
932            .map(|r| {
933                let key = String::from_utf8(r.key.to_vec()).unwrap();
934                let seq = u64::from_le_bytes(r.value[0..8].try_into().unwrap());
935                let value = u64::from_le_bytes(r.value[8..16].try_into().unwrap());
936                (key, seq, value)
937            })
938            .collect();
939        actual.sort_by(|a, b| a.0.cmp(&b.0));
940        let mut expected: Vec<(&str, u64, u64)> = expected.to_vec();
941        expected.sort_by(|a, b| a.0.cmp(b.0));
942        for (actual, expected) in actual.iter().zip(expected.iter()) {
943            assert_eq!(
944                actual.0, expected.0,
945                "key mismatch: got {:?}, expected {:?}",
946                actual.0, expected.0
947            );
948            assert_eq!(
949                actual.1, expected.1,
950                "seq mismatch for key {:?}: got {}, expected {}",
951                actual.0, actual.1, expected.1
952            );
953            assert_eq!(
954                actual.2, expected.2,
955                "value mismatch for key {:?}: got {}, expected {}",
956                actual.0, actual.2, expected.2
957            );
958        }
959    }
960
961    // ============================================================================
962    // Basic Write Flow Tests
963    // ============================================================================
964
965    #[tokio::test]
966    async fn should_assign_monotonic_epochs() {
967        // given
968        let flusher = TestFlusher::default();
969        let mut coordinator = WriteCoordinator::new(
970            test_config(),
971            vec!["default".to_string()],
972            TestContext::default(),
973            flusher.initial_snapshot().await,
974            flusher,
975        );
976        let handle = coordinator.handle("default");
977        coordinator.start();
978
979        // when
980        let write1 = handle
981            .try_write(TestWrite {
982                key: "a".into(),
983                value: 1,
984                size: 10,
985            })
986            .await
987            .unwrap();
988        let write2 = handle
989            .try_write(TestWrite {
990                key: "b".into(),
991                value: 2,
992                size: 10,
993            })
994            .await
995            .unwrap();
996        let write3 = handle
997            .try_write(TestWrite {
998                key: "c".into(),
999                value: 3,
1000                size: 10,
1001            })
1002            .await
1003            .unwrap();
1004
1005        let epoch1 = write1.epoch().await.unwrap();
1006        let epoch2 = write2.epoch().await.unwrap();
1007        let epoch3 = write3.epoch().await.unwrap();
1008
1009        // then
1010        assert!(epoch1 < epoch2);
1011        assert!(epoch2 < epoch3);
1012
1013        // cleanup
1014        coordinator.stop().await;
1015    }
1016
1017    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1018    async fn should_apply_writes_in_order() {
1019        // given
1020        let flusher = TestFlusher::default();
1021        let mut coordinator = WriteCoordinator::new(
1022            test_config(),
1023            vec!["default".to_string()],
1024            TestContext::default(),
1025            flusher.initial_snapshot().await,
1026            flusher.clone(),
1027        );
1028        let handle = coordinator.handle("default");
1029        coordinator.start();
1030
1031        // when
1032        handle
1033            .try_write(TestWrite {
1034                key: "a".into(),
1035                value: 1,
1036                size: 10,
1037            })
1038            .await
1039            .unwrap();
1040        handle
1041            .try_write(TestWrite {
1042                key: "a".into(),
1043                value: 2,
1044                size: 10,
1045            })
1046            .await
1047            .unwrap();
1048        let mut last_write = handle
1049            .try_write(TestWrite {
1050                key: "a".into(),
1051                value: 3,
1052                size: 10,
1053            })
1054            .await
1055            .unwrap();
1056
1057        handle.flush(false).await.unwrap();
1058        // Wait for flush to complete via watermark
1059        last_write.wait(Durability::Written).await.unwrap();
1060
1061        // then
1062        let events = flusher.flushed_events();
1063        assert_eq!(events.len(), 1);
1064        let frozen_delta = &events[0];
1065        let delta = &frozen_delta.val;
1066        // Writing key "a" 3x overwrites; last write wins with seq=2 (0-indexed)
1067        let (seq, value) = delta.writes.get("a").unwrap();
1068        assert_eq!(*value, 3);
1069        assert_eq!(*seq, 2);
1070
1071        // cleanup
1072        coordinator.stop().await;
1073    }
1074
1075    #[tokio::test]
1076    async fn should_update_applied_watermark_after_each_write() {
1077        // given
1078        let flusher = TestFlusher::default();
1079        let mut coordinator = WriteCoordinator::new(
1080            test_config(),
1081            vec!["default".to_string()],
1082            TestContext::default(),
1083            flusher.initial_snapshot().await,
1084            flusher,
1085        );
1086        let handle = coordinator.handle("default");
1087        coordinator.start();
1088
1089        // when
1090        let mut write_handle = handle
1091            .try_write(TestWrite {
1092                key: "a".into(),
1093                value: 1,
1094                size: 10,
1095            })
1096            .await
1097            .unwrap();
1098
1099        // then - wait should succeed immediately after write is applied
1100        let result = write_handle.wait(Durability::Applied).await;
1101        assert!(result.is_ok());
1102
1103        // cleanup
1104        coordinator.stop().await;
1105    }
1106
1107    #[tokio::test]
1108    async fn should_propagate_apply_error_to_handle() {
1109        // given
1110        let flusher = TestFlusher::default();
1111        let context = TestContext {
1112            error: Some("apply error".to_string()),
1113            ..Default::default()
1114        };
1115        let mut coordinator = WriteCoordinator::new(
1116            test_config(),
1117            vec!["default".to_string()],
1118            context,
1119            flusher.initial_snapshot().await,
1120            flusher,
1121        );
1122        let handle = coordinator.handle("default");
1123        coordinator.start();
1124
1125        // when
1126        let write = handle
1127            .try_write(TestWrite {
1128                key: "a".into(),
1129                value: 1,
1130                size: 10,
1131            })
1132            .await
1133            .unwrap();
1134
1135        let result = write.epoch().await;
1136
1137        // then
1138        assert!(
1139            matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
1140        );
1141
1142        // cleanup
1143        coordinator.stop().await;
1144    }
1145
1146    // ============================================================================
1147    // Manual Flush Tests
1148    // ============================================================================
1149
1150    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1151    async fn should_flush_on_command() {
1152        // given
1153        let flusher = TestFlusher::default();
1154        let mut coordinator = WriteCoordinator::new(
1155            test_config(),
1156            vec!["default".to_string()],
1157            TestContext::default(),
1158            flusher.initial_snapshot().await,
1159            flusher.clone(),
1160        );
1161        let handle = coordinator.handle("default");
1162        coordinator.start();
1163
1164        // when
1165        let mut write = handle
1166            .try_write(TestWrite {
1167                key: "a".into(),
1168                value: 1,
1169                size: 10,
1170            })
1171            .await
1172            .unwrap();
1173        handle.flush(false).await.unwrap();
1174        write.wait(Durability::Written).await.unwrap();
1175
1176        // then
1177        assert_eq!(flusher.flushed_events().len(), 1);
1178
1179        // cleanup
1180        coordinator.stop().await;
1181    }
1182
1183    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1184    async fn should_wait_on_flush_handle() {
1185        // given
1186        let flusher = TestFlusher::default();
1187        let mut coordinator = WriteCoordinator::new(
1188            test_config(),
1189            vec!["default".to_string()],
1190            TestContext::default(),
1191            flusher.initial_snapshot().await,
1192            flusher.clone(),
1193        );
1194        let handle = coordinator.handle("default");
1195        coordinator.start();
1196
1197        // when
1198        handle
1199            .try_write(TestWrite {
1200                key: "a".into(),
1201                value: 1,
1202                size: 10,
1203            })
1204            .await
1205            .unwrap();
1206        let mut flush_handle = handle.flush(false).await.unwrap();
1207
1208        // then - can wait directly on the flush handle
1209        flush_handle.wait(Durability::Written).await.unwrap();
1210        assert_eq!(flusher.flushed_events().len(), 1);
1211
1212        // cleanup
1213        coordinator.stop().await;
1214    }
1215
1216    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1217    async fn should_return_correct_epoch_from_flush_handle() {
1218        // given
1219        let flusher = TestFlusher::default();
1220        let mut coordinator = WriteCoordinator::new(
1221            test_config(),
1222            vec!["default".to_string()],
1223            TestContext::default(),
1224            flusher.initial_snapshot().await,
1225            flusher,
1226        );
1227        let handle = coordinator.handle("default");
1228        coordinator.start();
1229
1230        // when
1231        let write1 = handle
1232            .try_write(TestWrite {
1233                key: "a".into(),
1234                value: 1,
1235                size: 10,
1236            })
1237            .await
1238            .unwrap();
1239        let write2 = handle
1240            .try_write(TestWrite {
1241                key: "b".into(),
1242                value: 2,
1243                size: 10,
1244            })
1245            .await
1246            .unwrap();
1247        let flush_handle = handle.flush(false).await.unwrap();
1248
1249        // then - flush handle epoch should be the last write's epoch
1250        let flush_epoch = flush_handle.epoch().await.unwrap();
1251        let write2_epoch = write2.epoch().await.unwrap();
1252        assert_eq!(flush_epoch, write2_epoch);
1253
1254        // cleanup
1255        coordinator.stop().await;
1256    }
1257
1258    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1259    async fn should_include_all_pending_writes_in_flush() {
1260        // given
1261        let flusher = TestFlusher::default();
1262        let mut coordinator = WriteCoordinator::new(
1263            test_config(),
1264            vec!["default".to_string()],
1265            TestContext::default(),
1266            flusher.initial_snapshot().await,
1267            flusher.clone(),
1268        );
1269        let handle = coordinator.handle("default");
1270        coordinator.start();
1271
1272        // when
1273        handle
1274            .try_write(TestWrite {
1275                key: "a".into(),
1276                value: 1,
1277                size: 10,
1278            })
1279            .await
1280            .unwrap();
1281        handle
1282            .try_write(TestWrite {
1283                key: "b".into(),
1284                value: 2,
1285                size: 10,
1286            })
1287            .await
1288            .unwrap();
1289        let mut last_write = handle
1290            .try_write(TestWrite {
1291                key: "c".into(),
1292                value: 3,
1293                size: 10,
1294            })
1295            .await
1296            .unwrap();
1297
1298        handle.flush(false).await.unwrap();
1299        last_write.wait(Durability::Written).await.unwrap();
1300
1301        // then
1302        let events = flusher.flushed_events();
1303        assert_eq!(events.len(), 1);
1304        let frozen_delta = &events[0];
1305        assert_eq!(frozen_delta.val.writes.len(), 3);
1306        let snapshot = flusher.storage.snapshot().await.unwrap();
1307        assert_snapshot_has_rows(&snapshot, &[("a", 0, 1), ("b", 1, 2), ("c", 2, 3)]).await;
1308
1309        // cleanup
1310        coordinator.stop().await;
1311    }
1312
1313    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1314    async fn should_skip_flush_when_no_new_writes() {
1315        // given
1316        let flusher = TestFlusher::default();
1317        let mut coordinator = WriteCoordinator::new(
1318            test_config(),
1319            vec!["default".to_string()],
1320            TestContext::default(),
1321            flusher.initial_snapshot().await,
1322            flusher.clone(),
1323        );
1324        let handle = coordinator.handle("default");
1325        coordinator.start();
1326
1327        // when
1328        let mut write = handle
1329            .try_write(TestWrite {
1330                key: "a".into(),
1331                value: 1,
1332                size: 10,
1333            })
1334            .await
1335            .unwrap();
1336        handle.flush(false).await.unwrap();
1337        write.wait(Durability::Written).await.unwrap();
1338
1339        // Second flush with no new writes
1340        handle.flush(false).await.unwrap();
1341
1342        // Synchronization: write and wait for applied to ensure the flush command
1343        // has been processed (commands are processed in order)
1344        let sync_write = handle
1345            .try_write(TestWrite {
1346                key: "sync".into(),
1347                value: 0,
1348                size: 1,
1349            })
1350            .await
1351            .unwrap();
1352        sync_write.epoch().await.unwrap();
1353
1354        // then - only one flush should have occurred (the second flush was a no-op)
1355        assert_eq!(flusher.flushed_events().len(), 1);
1356
1357        // cleanup
1358        coordinator.stop().await;
1359    }
1360
1361    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1362    async fn should_update_written_watermark_after_flush() {
1363        // given
1364        let flusher = TestFlusher::default();
1365        let mut coordinator = WriteCoordinator::new(
1366            test_config(),
1367            vec!["default".to_string()],
1368            TestContext::default(),
1369            flusher.initial_snapshot().await,
1370            flusher,
1371        );
1372        let handle = coordinator.handle("default");
1373        coordinator.start();
1374
1375        // when
1376        let mut write_handle = handle
1377            .try_write(TestWrite {
1378                key: "a".into(),
1379                value: 1,
1380                size: 10,
1381            })
1382            .await
1383            .unwrap();
1384
1385        handle.flush(false).await.unwrap();
1386
1387        // then - wait for Written should succeed after flush completes
1388        let result = write_handle.wait(Durability::Written).await;
1389        assert!(result.is_ok());
1390
1391        // cleanup
1392        coordinator.stop().await;
1393    }
1394
1395    // ============================================================================
1396    // Timer-Based Flush Tests
1397    // ============================================================================
1398
    #[tokio::test(start_paused = true)]
    async fn should_flush_on_flush_interval() {
        // given - create coordinator with short flush interval
        // (start_paused = true freezes tokio's clock; time only moves via
        // tokio::time::advance, which makes the timer path deterministic)
        let flusher = TestFlusher::default();
        let config = WriteCoordinatorConfig {
            queue_capacity: 100,
            flush_interval: Duration::from_millis(100),
            flush_size_threshold: usize::MAX,
        };
        let mut coordinator = WriteCoordinator::new(
            config,
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when - ensure coordinator task runs and then write something
        // (yield_now lets the spawned coordinator task get polled so its
        // interval timer is armed before we touch the clock)
        tokio::task::yield_now().await;
        let mut write = handle
            .try_write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        write.wait(Durability::Applied).await.unwrap();

        // then - no flush should have happened yet (interval was reset in run())
        assert_eq!(flusher.flushed_events().len(), 0);

        // when - advance time past the flush interval from when run() was called
        // (150ms > the 100ms interval; yield lets the timer-driven flush run)
        tokio::time::advance(Duration::from_millis(150)).await;
        tokio::task::yield_now().await;

        // then - flush should have happened
        assert_eq!(flusher.flushed_events().len(), 1);
        let snapshot = flusher.storage.snapshot().await.unwrap();
        assert_snapshot_has_rows(&snapshot, &[("a", 0, 1)]).await;

        // cleanup
        coordinator.stop().await;
    }
1445
1446    // ============================================================================
1447    // Size-Threshold Flush Tests
1448    // ============================================================================
1449
1450    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1451    async fn should_flush_when_size_threshold_exceeded() {
1452        // given
1453        let flusher = TestFlusher::default();
1454        let config = WriteCoordinatorConfig {
1455            queue_capacity: 100,
1456            flush_interval: Duration::from_secs(3600),
1457            flush_size_threshold: 100, // Low threshold for testing
1458        };
1459        let mut coordinator = WriteCoordinator::new(
1460            config,
1461            vec!["default".to_string()],
1462            TestContext::default(),
1463            flusher.initial_snapshot().await,
1464            flusher.clone(),
1465        );
1466        let handle = coordinator.handle("default");
1467        coordinator.start();
1468
1469        // when - write that exceeds threshold
1470        let mut write = handle
1471            .try_write(TestWrite {
1472                key: "a".into(),
1473                value: 1,
1474                size: 150,
1475            })
1476            .await
1477            .unwrap();
1478        write.wait(Durability::Written).await.unwrap();
1479
1480        // then
1481        assert_eq!(flusher.flushed_events().len(), 1);
1482
1483        // cleanup
1484        coordinator.stop().await;
1485    }
1486
1487    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1488    async fn should_accumulate_until_threshold() {
1489        // given
1490        let flusher = TestFlusher::default();
1491        let config = WriteCoordinatorConfig {
1492            queue_capacity: 100,
1493            flush_interval: Duration::from_secs(3600),
1494            flush_size_threshold: 100,
1495        };
1496        let mut coordinator = WriteCoordinator::new(
1497            config,
1498            vec!["default".to_string()],
1499            TestContext::default(),
1500            flusher.initial_snapshot().await,
1501            flusher.clone(),
1502        );
1503        let handle = coordinator.handle("default");
1504        coordinator.start();
1505
1506        // when - small writes that accumulate
1507        for i in 0..5 {
1508            let mut w = handle
1509                .try_write(TestWrite {
1510                    key: format!("key{}", i),
1511                    value: i,
1512                    size: 15,
1513                })
1514                .await
1515                .unwrap();
1516            w.wait(Durability::Applied).await.unwrap();
1517        }
1518
1519        // then - no flush yet (75 bytes < 100 threshold)
1520        assert_eq!(flusher.flushed_events().len(), 0);
1521
1522        // when - write that pushes over threshold
1523        let mut final_write = handle
1524            .try_write(TestWrite {
1525                key: "final".into(),
1526                value: 999,
1527                size: 30,
1528            })
1529            .await
1530            .unwrap();
1531        final_write.wait(Durability::Written).await.unwrap();
1532
1533        // then - should have flushed (105 bytes > 100 threshold)
1534        assert_eq!(flusher.flushed_events().len(), 1);
1535
1536        // cleanup
1537        coordinator.stop().await;
1538    }
1539
1540    // ============================================================================
1541    // Non-Blocking Flush (Concurrency) Tests
1542    // ============================================================================
1543
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_accept_writes_during_flush() {
        // given - a flusher whose flush_storage() signals `flush_started_rx`
        // on entry and then blocks until the test sends on `unblock_tx`
        let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
        let mut coordinator = WriteCoordinator::new(
            test_config(),
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when: trigger a flush and wait for it to start (proving it's in progress)
        let write1 = handle
            .try_write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        handle.flush(false).await.unwrap();
        flush_started_rx.await.unwrap(); // wait until flush is actually in progress

        // then: writes during blocked flush still succeed
        // (the flush runs on its own task, so the write path stays responsive)
        let write2 = handle
            .try_write(TestWrite {
                key: "b".into(),
                value: 2,
                size: 10,
            })
            .await
            .unwrap();
        // The new write also gets a strictly larger epoch than the first.
        assert!(write2.epoch().await.unwrap() > write1.epoch().await.unwrap());

        // cleanup - release the blocked flush so shutdown can complete
        unblock_tx.send(()).await.unwrap();
        coordinator.stop().await;
    }
1585
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_assign_new_epochs_during_flush() {
        // given - a flusher whose flush_storage() signals `flush_started_rx`
        // on entry and then blocks until the test sends on `unblock_tx`
        let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
        let mut coordinator = WriteCoordinator::new(
            test_config(),
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when: write, flush, then write more during blocked flush
        handle
            .try_write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        handle.flush(false).await.unwrap();
        flush_started_rx.await.unwrap(); // wait until flush is actually in progress

        // Writes during blocked flush get new epochs
        let w1 = handle
            .try_write(TestWrite {
                key: "b".into(),
                value: 2,
                size: 10,
            })
            .await
            .unwrap();
        let w2 = handle
            .try_write(TestWrite {
                key: "c".into(),
                value: 3,
                size: 10,
            })
            .await
            .unwrap();

        // then: epochs continue incrementing even while the flush is stalled
        let e1 = w1.epoch().await.unwrap();
        let e2 = w2.epoch().await.unwrap();
        assert!(e1 < e2);

        // cleanup - release the blocked flush so shutdown can complete
        unblock_tx.send(()).await.unwrap();
        coordinator.stop().await;
    }
1639
1640    // ============================================================================
1641    // Backpressure Tests
1642    // ============================================================================
1643
1644    #[tokio::test]
1645    async fn should_return_backpressure_when_queue_full() {
1646        // given
1647        let flusher = TestFlusher::default();
1648        let config = WriteCoordinatorConfig {
1649            queue_capacity: 2,
1650            flush_interval: Duration::from_secs(3600),
1651            flush_size_threshold: usize::MAX,
1652        };
1653        let mut coordinator = WriteCoordinator::new(
1654            config,
1655            vec!["default".to_string()],
1656            TestContext::default(),
1657            flusher.initial_snapshot().await,
1658            flusher.clone(),
1659        );
1660        let handle = coordinator.handle("default");
1661        // Don't start coordinator - queue will fill
1662
1663        // when - fill the queue
1664        let _ = handle
1665            .try_write(TestWrite {
1666                key: "a".into(),
1667                value: 1,
1668                size: 10,
1669            })
1670            .await;
1671        let _ = handle
1672            .try_write(TestWrite {
1673                key: "b".into(),
1674                value: 2,
1675                size: 10,
1676            })
1677            .await;
1678
1679        // Third write should fail with backpressure
1680        let result = handle
1681            .try_write(TestWrite {
1682                key: "c".into(),
1683                value: 3,
1684                size: 10,
1685            })
1686            .await;
1687
1688        // then
1689        assert!(matches!(result, Err(WriteError::Backpressure(_))));
1690    }
1691
1692    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1693    async fn should_accept_writes_after_queue_drains() {
1694        // given
1695        let flusher = TestFlusher::default();
1696        let config = WriteCoordinatorConfig {
1697            queue_capacity: 2,
1698            flush_interval: Duration::from_secs(3600),
1699            flush_size_threshold: usize::MAX,
1700        };
1701        let mut coordinator = WriteCoordinator::new(
1702            config,
1703            vec!["default".to_string()],
1704            TestContext::default(),
1705            flusher.initial_snapshot().await,
1706            flusher.clone(),
1707        );
1708        let handle = coordinator.handle("default");
1709
1710        // Fill queue without processing
1711        let _ = handle
1712            .try_write(TestWrite {
1713                key: "a".into(),
1714                value: 1,
1715                size: 10,
1716            })
1717            .await;
1718        let mut write_b = handle
1719            .try_write(TestWrite {
1720                key: "b".into(),
1721                value: 2,
1722                size: 10,
1723            })
1724            .await
1725            .unwrap();
1726
1727        // when - start coordinator to drain queue and wait for it to process writes
1728        coordinator.start();
1729        write_b.wait(Durability::Applied).await.unwrap();
1730
1731        // then - writes should succeed now
1732        let result = handle
1733            .try_write(TestWrite {
1734                key: "c".into(),
1735                value: 3,
1736                size: 10,
1737            })
1738            .await;
1739        assert!(result.is_ok());
1740
1741        // cleanup
1742        coordinator.stop().await;
1743    }
1744
1745    // ============================================================================
1746    // Shutdown Tests
1747    // ============================================================================
1748
1749    #[tokio::test]
1750    async fn should_shutdown_cleanly_when_stop_called() {
1751        // given
1752        let flusher = TestFlusher::default();
1753        let mut coordinator = WriteCoordinator::new(
1754            test_config(),
1755            vec!["default".to_string()],
1756            TestContext::default(),
1757            flusher.initial_snapshot().await,
1758            flusher.clone(),
1759        );
1760        let handle = coordinator.handle("default");
1761        coordinator.start();
1762
1763        // when
1764        let result = coordinator.stop().await;
1765
1766        // then - coordinator should return Ok
1767        assert!(result.is_ok());
1768    }
1769
1770    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1771    async fn should_flush_pending_writes_on_shutdown() {
1772        // given
1773        let flusher = TestFlusher::default();
1774        let config = WriteCoordinatorConfig {
1775            queue_capacity: 100,
1776            flush_interval: Duration::from_secs(3600), // Long interval - won't trigger
1777            flush_size_threshold: usize::MAX,          // High threshold - won't trigger
1778        };
1779        let mut coordinator = WriteCoordinator::new(
1780            config,
1781            vec!["default".to_string()],
1782            TestContext::default(),
1783            flusher.initial_snapshot().await,
1784            flusher.clone(),
1785        );
1786        let handle = coordinator.handle("default");
1787        coordinator.start();
1788
1789        // when - write without explicit flush, then shutdown
1790        let write = handle
1791            .try_write(TestWrite {
1792                key: "a".into(),
1793                value: 1,
1794                size: 10,
1795            })
1796            .await
1797            .unwrap();
1798        let epoch = write.epoch().await.unwrap();
1799
1800        // Drop handle to trigger shutdown
1801        coordinator.stop().await;
1802
1803        // then - pending writes should have been flushed
1804        let events = flusher.flushed_events();
1805        assert_eq!(events.len(), 1);
1806        let epoch_range = &events[0].epoch_range;
1807        assert!(epoch_range.contains(&epoch));
1808    }
1809
1810    #[tokio::test]
1811    async fn should_return_shutdown_error_after_coordinator_stops() {
1812        // given
1813        let flusher = TestFlusher::default();
1814        let mut coordinator = WriteCoordinator::new(
1815            test_config(),
1816            vec!["default".to_string()],
1817            TestContext::default(),
1818            flusher.initial_snapshot().await,
1819            flusher.clone(),
1820        );
1821        let handle = coordinator.handle("default");
1822        coordinator.start();
1823
1824        // Stop coordinator
1825        coordinator.stop().await;
1826
1827        // when
1828        let result = handle
1829            .try_write(TestWrite {
1830                key: "a".into(),
1831                value: 1,
1832                size: 10,
1833            })
1834            .await;
1835
1836        // then
1837        assert!(matches!(result, Err(WriteError::Shutdown)));
1838    }
1839
1840    // ============================================================================
1841    // Epoch Range Tracking Tests
1842    // ============================================================================
1843
1844    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1845    async fn should_track_epoch_range_in_flush_event() {
1846        // given
1847        let flusher = TestFlusher::default();
1848        let mut coordinator = WriteCoordinator::new(
1849            test_config(),
1850            vec!["default".to_string()],
1851            TestContext::default(),
1852            flusher.initial_snapshot().await,
1853            flusher.clone(),
1854        );
1855        let handle = coordinator.handle("default");
1856        coordinator.start();
1857
1858        // when
1859        handle
1860            .try_write(TestWrite {
1861                key: "a".into(),
1862                value: 1,
1863                size: 10,
1864            })
1865            .await
1866            .unwrap();
1867        handle
1868            .try_write(TestWrite {
1869                key: "b".into(),
1870                value: 2,
1871                size: 10,
1872            })
1873            .await
1874            .unwrap();
1875        let mut last_write = handle
1876            .try_write(TestWrite {
1877                key: "c".into(),
1878                value: 3,
1879                size: 10,
1880            })
1881            .await
1882            .unwrap();
1883
1884        handle.flush(false).await.unwrap();
1885        last_write.wait(Durability::Written).await.unwrap();
1886
1887        // then
1888        let events = flusher.flushed_events();
1889        assert_eq!(events.len(), 1);
1890        let epoch_range = &events[0].epoch_range;
1891        assert_eq!(epoch_range.start, 1);
1892        assert_eq!(epoch_range.end, 4); // exclusive: one past the last epoch (3)
1893
1894        // cleanup
1895        coordinator.stop().await;
1896    }
1897
1898    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1899    async fn should_have_contiguous_epoch_ranges() {
1900        // given
1901        let flusher = TestFlusher::default();
1902        let mut coordinator = WriteCoordinator::new(
1903            test_config(),
1904            vec!["default".to_string()],
1905            TestContext::default(),
1906            flusher.initial_snapshot().await,
1907            flusher.clone(),
1908        );
1909        let handle = coordinator.handle("default");
1910        coordinator.start();
1911
1912        // when - first batch
1913        handle
1914            .try_write(TestWrite {
1915                key: "a".into(),
1916                value: 1,
1917                size: 10,
1918            })
1919            .await
1920            .unwrap();
1921        let mut write2 = handle
1922            .try_write(TestWrite {
1923                key: "b".into(),
1924                value: 2,
1925                size: 10,
1926            })
1927            .await
1928            .unwrap();
1929        handle.flush(false).await.unwrap();
1930        write2.wait(Durability::Written).await.unwrap();
1931
1932        // when - second batch
1933        let mut write3 = handle
1934            .try_write(TestWrite {
1935                key: "c".into(),
1936                value: 3,
1937                size: 10,
1938            })
1939            .await
1940            .unwrap();
1941        handle.flush(false).await.unwrap();
1942        write3.wait(Durability::Written).await.unwrap();
1943
1944        // then
1945        let events = flusher.flushed_events();
1946        assert_eq!(events.len(), 2);
1947
1948        let range1 = &events[0].epoch_range;
1949        let range2 = &events[1].epoch_range;
1950
1951        // Ranges should be contiguous (end of first == start of second)
1952        assert_eq!(range1.end, range2.start);
1953        assert_eq!(range1, &(1..3));
1954        assert_eq!(range2, &(3..4));
1955
1956        // cleanup
1957        coordinator.stop().await;
1958    }
1959
1960    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1961    async fn should_include_exact_epochs_in_range() {
1962        // given
1963        let flusher = TestFlusher::default();
1964        let mut coordinator = WriteCoordinator::new(
1965            test_config(),
1966            vec!["default".to_string()],
1967            TestContext::default(),
1968            flusher.initial_snapshot().await,
1969            flusher.clone(),
1970        );
1971        let handle = coordinator.handle("default");
1972        coordinator.start();
1973
1974        // when - write and capture the assigned epochs
1975        let write1 = handle
1976            .try_write(TestWrite {
1977                key: "a".into(),
1978                value: 1,
1979                size: 10,
1980            })
1981            .await
1982            .unwrap();
1983        let epoch1 = write1.epoch().await.unwrap();
1984
1985        let mut write2 = handle
1986            .try_write(TestWrite {
1987                key: "b".into(),
1988                value: 2,
1989                size: 10,
1990            })
1991            .await
1992            .unwrap();
1993        let epoch2 = write2.epoch().await.unwrap();
1994
1995        handle.flush(false).await.unwrap();
1996        write2.wait(Durability::Written).await.unwrap();
1997
1998        // then - the epoch_range should contain exactly the epochs assigned to writes
1999        let events = flusher.flushed_events();
2000        assert_eq!(events.len(), 1);
2001        let epoch_range = &events[0].epoch_range;
2002
2003        // The range should start at the first write's epoch
2004        assert_eq!(epoch_range.start, epoch1);
2005        // The range end should be one past the last write's epoch (exclusive)
2006        assert_eq!(epoch_range.end, epoch2 + 1);
2007        // Both epochs should be contained in the range
2008        assert!(epoch_range.contains(&epoch1));
2009        assert!(epoch_range.contains(&epoch2));
2010
2011        // cleanup
2012        coordinator.stop().await;
2013    }
2014
2015    // ============================================================================
2016    // State Carryover (ID Allocation) Tests
2017    // ============================================================================
2018
2019    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2020    async fn should_preserve_context_across_flushes() {
2021        // given
2022        let flusher = TestFlusher::default();
2023        let mut coordinator = WriteCoordinator::new(
2024            test_config(),
2025            vec!["default".to_string()],
2026            TestContext::default(),
2027            flusher.initial_snapshot().await,
2028            flusher.clone(),
2029        );
2030        let handle = coordinator.handle("default");
2031        coordinator.start();
2032
2033        // when - write key "a" in first batch (seq 0)
2034        let mut write1 = handle
2035            .try_write(TestWrite {
2036                key: "a".into(),
2037                value: 1,
2038                size: 10,
2039            })
2040            .await
2041            .unwrap();
2042        handle.flush(false).await.unwrap();
2043        write1.wait(Durability::Written).await.unwrap();
2044
2045        // Write to key "a" again in second batch (seq 1)
2046        let mut write2 = handle
2047            .try_write(TestWrite {
2048                key: "a".into(),
2049                value: 2,
2050                size: 10,
2051            })
2052            .await
2053            .unwrap();
2054        handle.flush(false).await.unwrap();
2055        write2.wait(Durability::Written).await.unwrap();
2056
2057        // then
2058        let events = flusher.flushed_events();
2059        assert_eq!(events.len(), 2);
2060
2061        // Batch 1: "a" with seq 0
2062        let (seq1, _) = events[0].val.writes.get("a").unwrap();
2063        assert_eq!(*seq1, 0);
2064
2065        // Batch 2: "a" with seq 1 (sequence continues)
2066        let (seq2, _) = events[1].val.writes.get("a").unwrap();
2067        assert_eq!(*seq2, 1);
2068
2069        // cleanup
2070        coordinator.stop().await;
2071    }
2072
2073    // ============================================================================
2074    // Subscribe Tests
2075    // ============================================================================
2076
2077    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2078    async fn should_receive_view_on_subscribe() {
2079        // given
2080        let flusher = TestFlusher::default();
2081        let mut coordinator = WriteCoordinator::new(
2082            test_config(),
2083            vec!["default".to_string()],
2084            TestContext::default(),
2085            flusher.initial_snapshot().await,
2086            flusher.clone(),
2087        );
2088        let handle = coordinator.handle("default");
2089        let (mut subscriber, _) = coordinator.subscribe();
2090        subscriber.initialize();
2091        coordinator.start();
2092
2093        // when
2094        handle
2095            .try_write(TestWrite {
2096                key: "a".into(),
2097                value: 1,
2098                size: 10,
2099            })
2100            .await
2101            .unwrap();
2102        handle.flush(false).await.unwrap();
2103
2104        // then - first broadcast is on freeze (delta added to frozen)
2105        let result = subscriber.recv().await;
2106        assert!(result.is_ok());
2107
2108        // cleanup
2109        coordinator.stop().await;
2110    }
2111
2112    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2113    async fn should_include_snapshot_in_view_after_flush() {
2114        // given
2115        let flusher = TestFlusher::default();
2116        let mut coordinator = WriteCoordinator::new(
2117            test_config(),
2118            vec!["default".to_string()],
2119            TestContext::default(),
2120            flusher.initial_snapshot().await,
2121            flusher.clone(),
2122        );
2123        let handle = coordinator.handle("default");
2124        let (mut subscriber, _) = coordinator.subscribe();
2125        subscriber.initialize();
2126        coordinator.start();
2127
2128        // when
2129        handle
2130            .try_write(TestWrite {
2131                key: "a".into(),
2132                value: 1,
2133                size: 10,
2134            })
2135            .await
2136            .unwrap();
2137        handle.flush(false).await.unwrap();
2138
2139        // First broadcast: freeze (frozen delta added)
2140        let _ = subscriber.recv().await.unwrap();
2141        // Second broadcast: flush complete (snapshot updated)
2142        let result = subscriber.recv().await.unwrap();
2143
2144        // then - snapshot should contain the flushed data
2145        assert_snapshot_has_rows(&result.snapshot, &[("a", 0, 1)]).await;
2146
2147        // cleanup
2148        coordinator.stop().await;
2149    }
2150
2151    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2152    async fn should_include_delta_in_view_after_flush() {
2153        // given
2154        let flusher = TestFlusher::default();
2155        let mut coordinator = WriteCoordinator::new(
2156            test_config(),
2157            vec!["default".to_string()],
2158            TestContext::default(),
2159            flusher.initial_snapshot().await,
2160            flusher.clone(),
2161        );
2162        let handle = coordinator.handle("default");
2163        let (mut subscriber, _) = coordinator.subscribe();
2164        subscriber.initialize();
2165        coordinator.start();
2166
2167        // when
2168        handle
2169            .try_write(TestWrite {
2170                key: "a".into(),
2171                value: 42,
2172                size: 10,
2173            })
2174            .await
2175            .unwrap();
2176        handle.flush(false).await.unwrap();
2177
2178        // First broadcast: freeze
2179        let _ = subscriber.recv().await.unwrap();
2180        // Second broadcast: flush complete
2181        let result = subscriber.recv().await.unwrap();
2182
2183        // then - last_written_delta should contain the write we made
2184        let flushed = result.last_written_delta.as_ref().unwrap();
2185        assert_eq!(flushed.val.get("a"), Some(&42));
2186
2187        // cleanup
2188        coordinator.stop().await;
2189    }
2190
2191    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2192    async fn should_include_epoch_range_in_view_after_flush() {
2193        // given
2194        let flusher = TestFlusher::default();
2195        let mut coordinator = WriteCoordinator::new(
2196            test_config(),
2197            vec!["default".to_string()],
2198            TestContext::default(),
2199            flusher.initial_snapshot().await,
2200            flusher.clone(),
2201        );
2202        let handle = coordinator.handle("default");
2203        let (mut subscriber, _) = coordinator.subscribe();
2204        subscriber.initialize();
2205        coordinator.start();
2206
2207        // when
2208        let write1 = handle
2209            .try_write(TestWrite {
2210                key: "a".into(),
2211                value: 1,
2212                size: 10,
2213            })
2214            .await
2215            .unwrap();
2216        let write2 = handle
2217            .try_write(TestWrite {
2218                key: "b".into(),
2219                value: 2,
2220                size: 10,
2221            })
2222            .await
2223            .unwrap();
2224        handle.flush(false).await.unwrap();
2225
2226        // First broadcast: freeze
2227        let _ = subscriber.recv().await.unwrap();
2228        // Second broadcast: flush complete
2229        let result = subscriber.recv().await.unwrap();
2230
2231        // then - epoch range from last_written_delta should contain the epochs
2232        let flushed = result.last_written_delta.as_ref().unwrap();
2233        let epoch1 = write1.epoch().await.unwrap();
2234        let epoch2 = write2.epoch().await.unwrap();
2235        assert!(flushed.epoch_range.contains(&epoch1));
2236        assert!(flushed.epoch_range.contains(&epoch2));
2237        assert_eq!(flushed.epoch_range.start, epoch1);
2238        assert_eq!(flushed.epoch_range.end, epoch2 + 1);
2239
2240        // cleanup
2241        coordinator.stop().await;
2242    }
2243
2244    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2245    async fn should_broadcast_frozen_delta_on_freeze() {
2246        // given
2247        let flusher = TestFlusher::default();
2248        let mut coordinator = WriteCoordinator::new(
2249            test_config(),
2250            vec!["default".to_string()],
2251            TestContext::default(),
2252            flusher.initial_snapshot().await,
2253            flusher.clone(),
2254        );
2255        let handle = coordinator.handle("default");
2256        let (mut subscriber, _) = coordinator.subscribe();
2257        subscriber.initialize();
2258        coordinator.start();
2259
2260        // when
2261        handle
2262            .try_write(TestWrite {
2263                key: "a".into(),
2264                value: 1,
2265                size: 10,
2266            })
2267            .await
2268            .unwrap();
2269        handle.flush(false).await.unwrap();
2270
2271        // then - first broadcast should have the frozen delta in the frozen vec
2272        let state = subscriber.recv().await.unwrap();
2273        assert_eq!(state.frozen.len(), 1);
2274        assert!(state.frozen[0].val.contains_key("a"));
2275
2276        // cleanup
2277        coordinator.stop().await;
2278    }
2279
2280    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2281    async fn should_remove_frozen_delta_after_flush_complete() {
2282        // given
2283        let flusher = TestFlusher::default();
2284        let mut coordinator = WriteCoordinator::new(
2285            test_config(),
2286            vec!["default".to_string()],
2287            TestContext::default(),
2288            flusher.initial_snapshot().await,
2289            flusher.clone(),
2290        );
2291        let handle = coordinator.handle("default");
2292        let (mut subscriber, _) = coordinator.subscribe();
2293        subscriber.initialize();
2294        coordinator.start();
2295
2296        // when
2297        handle
2298            .try_write(TestWrite {
2299                key: "a".into(),
2300                value: 1,
2301                size: 10,
2302            })
2303            .await
2304            .unwrap();
2305        handle.flush(false).await.unwrap();
2306
2307        // First broadcast: freeze (frozen has 1 entry)
2308        let state1 = subscriber.recv().await.unwrap();
2309        assert_eq!(state1.frozen.len(), 1);
2310
2311        // Second broadcast: flush complete (frozen is empty, last_written_delta set)
2312        let state2 = subscriber.recv().await.unwrap();
2313        assert_eq!(state2.frozen.len(), 0);
2314        assert!(state2.last_written_delta.is_some());
2315
2316        // cleanup
2317        coordinator.stop().await;
2318    }
2319
2320    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2321    async fn should_recover_from_message_lost_subscriber() {
2322        // given - a coordinator with a small broadcast buffer (16)
2323        let flusher = TestFlusher::default();
2324        let mut coordinator = WriteCoordinator::new(
2325            test_config(),
2326            vec!["default".to_string()],
2327            TestContext::default(),
2328            flusher.initial_snapshot().await,
2329            flusher.clone(),
2330        );
2331        let handle = coordinator.handle("default");
2332        let (mut subscriber, _) = coordinator.subscribe();
2333        subscriber.initialize();
2334        coordinator.start();
2335
2336        // when - write and flush enough times to overflow the broadcast buffer
2337        // (capacity is 16, so ~20 flushes should cause lag)
2338        // The subscriber never called recv(), so all broadcasts accumulate and overflow the buffer
2339        for i in 0..20 {
2340            let _write_handle = handle
2341                .try_write(TestWrite {
2342                    key: format!("key_{}", i),
2343                    value: i as u64,
2344                    size: 10,
2345                })
2346                .await
2347                .unwrap();
2348            let _ = handle.flush(false).await.unwrap();
2349        }
2350
2351        // make changes durable
2352        let mut watermark = handle
2353            .flush(true)
2354            .await
2355            .expect("flush(true) should succeed");
2356
2357        // wait for durability watermark
2358        watermark.wait(Durability::Durable).await;
2359
2360        // expect SubscribeError to be MessageLost
2361        let result = subscriber
2362            .recv()
2363            .await
2364            .expect_err("expected recv() to yield an error");
2365        assert!(matches!(result, SubscribeError::MessageLost));
2366
2367        // when - resubscribe to recover
2368        let (rx, initial_view) = handle.subscribe();
2369        (subscriber, _) = ViewSubscriber::new(rx, initial_view);
2370        let view = subscriber.initialize();
2371
2372        // then - the fresh view should reflect the current state
2373        // (all 20 writes should be in the snapshot after all the flushes)
2374        let records = view.snapshot.scan(BytesRange::unbounded()).await.unwrap();
2375        assert!(
2376            records.len() >= 20,
2377            "expected at least 20 rows, got {}",
2378            records.len()
2379        );
2380
2381        // and - we should be able to receive future broadcasts
2382        let _write_handle = handle
2383            .try_write(TestWrite {
2384                key: "post_recovery".into(),
2385                value: 100,
2386                size: 10,
2387            })
2388            .await
2389            .unwrap();
2390        let _ = handle.flush(false).await.unwrap();
2391
2392        let result = subscriber.recv().await;
2393        assert!(result.is_ok());
2394
2395        // cleanup
2396        coordinator.stop().await;
2397    }
2398
2399    // ============================================================================
2400    // Durable Flush Tests
2401    // ============================================================================
2402
2403    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2404    async fn should_flush_even_when_no_writes_if_flush_storage() {
2405        // given
2406        let flusher = TestFlusher::default();
2407        let storage = Arc::new(InMemoryStorage::new());
2408        let snapshot = storage.snapshot().await.unwrap();
2409        let mut coordinator = WriteCoordinator::new(
2410            test_config(),
2411            vec!["default".to_string()],
2412            TestContext::default(),
2413            snapshot,
2414            flusher.clone(),
2415        );
2416        let handle = coordinator.handle("default");
2417        coordinator.start();
2418
2419        // when - flush with flush_storage but no pending writes
2420        let mut flush_handle = handle.flush(true).await.unwrap();
2421        flush_handle.wait(Durability::Durable).await.unwrap();
2422
2423        // then - flusher was called (durable event sent) but no delta was recorded
2424        // (TestFlusher only records events with deltas)
2425        assert_eq!(flusher.flushed_events().len(), 0);
2426
2427        // cleanup
2428        coordinator.stop().await;
2429    }
2430
2431    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2432    async fn should_advance_durable_watermark() {
2433        // given
2434        let flusher = TestFlusher::default();
2435        let storage = Arc::new(InMemoryStorage::new());
2436        let snapshot = storage.snapshot().await.unwrap();
2437        let mut coordinator = WriteCoordinator::new(
2438            test_config(),
2439            vec!["default".to_string()],
2440            TestContext::default(),
2441            snapshot,
2442            flusher.clone(),
2443        );
2444        let handle = coordinator.handle("default");
2445        coordinator.start();
2446
2447        // when - write and flush with durable
2448        let mut write = handle
2449            .try_write(TestWrite {
2450                key: "a".into(),
2451                value: 1,
2452                size: 10,
2453            })
2454            .await
2455            .unwrap();
2456        let mut flush_handle = handle.flush(true).await.unwrap();
2457
2458        // then - can wait for Durable durability level
2459        flush_handle.wait(Durability::Durable).await.unwrap();
2460        write.wait(Durability::Durable).await.unwrap();
2461        assert_eq!(flusher.flushed_events().len(), 1);
2462
2463        // cleanup
2464        coordinator.stop().await;
2465    }
2466
2467    #[tokio::test]
2468    async fn should_see_applied_write_via_view() {
2469        // given
2470        let flusher = TestFlusher::default();
2471        let mut coordinator = WriteCoordinator::new(
2472            test_config(),
2473            vec!["default".to_string()],
2474            TestContext::default(),
2475            flusher.initial_snapshot().await,
2476            flusher,
2477        );
2478        let handle = coordinator.handle("default");
2479        coordinator.start();
2480
2481        // when
2482        let mut write = handle
2483            .try_write(TestWrite {
2484                key: "a".into(),
2485                value: 42,
2486                size: 10,
2487            })
2488            .await
2489            .unwrap();
2490        write.wait(Durability::Applied).await.unwrap();
2491
2492        // then
2493        let view = coordinator.view();
2494        assert_eq!(view.current.get("a"), Some(42));
2495
2496        // cleanup
2497        coordinator.stop().await;
2498    }
2499
2500    // ============================================================================
2501    // Multi-Channel Tests
2502    // ============================================================================
2503
2504    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2505    async fn should_flush_writes_from_multiple_channels() {
2506        // given
2507        let flusher = TestFlusher::default();
2508        let mut coordinator = WriteCoordinator::new(
2509            test_config(),
2510            vec!["ch1".to_string(), "ch2".to_string()],
2511            TestContext::default(),
2512            flusher.initial_snapshot().await,
2513            flusher.clone(),
2514        );
2515        let ch1 = coordinator.handle("ch1");
2516        let ch2 = coordinator.handle("ch2");
2517        coordinator.start();
2518
2519        // when - write to both channels, waiting for each to be applied
2520        // to ensure deterministic ordering
2521        let mut w1 = ch1
2522            .try_write(TestWrite {
2523                key: "a".into(),
2524                value: 10,
2525                size: 10,
2526            })
2527            .await
2528            .unwrap();
2529        w1.wait(Durability::Applied).await.unwrap();
2530
2531        let mut w2 = ch2
2532            .try_write(TestWrite {
2533                key: "b".into(),
2534                value: 20,
2535                size: 10,
2536            })
2537            .await
2538            .unwrap();
2539        w2.wait(Durability::Applied).await.unwrap();
2540
2541        let mut w3 = ch1
2542            .try_write(TestWrite {
2543                key: "c".into(),
2544                value: 30,
2545                size: 10,
2546            })
2547            .await
2548            .unwrap();
2549        w3.wait(Durability::Applied).await.unwrap();
2550
2551        ch1.flush(false).await.unwrap();
2552        w3.wait(Durability::Written).await.unwrap();
2553
2554        // then - snapshot should contain writes from both channels
2555        let snapshot = flusher.storage.snapshot().await.unwrap();
2556        assert_snapshot_has_rows(&snapshot, &[("a", 0, 10), ("b", 1, 20), ("c", 2, 30)]).await;
2557
2558        // cleanup
2559        coordinator.stop().await;
2560    }
2561
2562    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2563    async fn should_succeed_with_write_timeout_when_queue_has_space() {
2564        // given
2565        let flusher = TestFlusher::default();
2566        let mut coordinator = WriteCoordinator::new(
2567            test_config(),
2568            vec!["default".to_string()],
2569            TestContext::default(),
2570            flusher.initial_snapshot().await,
2571            flusher.clone(),
2572        );
2573        let handle = coordinator.handle("default");
2574        coordinator.start();
2575
2576        // when
2577        let mut wh = handle
2578            .write_timeout(
2579                TestWrite {
2580                    key: "a".into(),
2581                    value: 1,
2582                    size: 10,
2583                },
2584                Duration::from_secs(1),
2585            )
2586            .await
2587            .unwrap();
2588
2589        // then
2590        let result = wh.wait(Durability::Applied).await;
2591        assert!(result.is_ok());
2592
2593        // cleanup
2594        coordinator.stop().await;
2595    }
2596
2597    #[tokio::test]
2598    async fn should_timeout_when_queue_full() {
2599        // given - queue_capacity=2, coordinator NOT started so nothing drains
2600        let flusher = TestFlusher::default();
2601        let config = WriteCoordinatorConfig {
2602            queue_capacity: 2,
2603            flush_interval: Duration::from_secs(3600),
2604            flush_size_threshold: usize::MAX,
2605        };
2606        let mut coordinator = WriteCoordinator::new(
2607            config,
2608            vec!["default".to_string()],
2609            TestContext::default(),
2610            flusher.initial_snapshot().await,
2611            flusher.clone(),
2612        );
2613        let handle = coordinator.handle("default");
2614
2615        // fill the queue
2616        let _ = handle
2617            .try_write(TestWrite {
2618                key: "a".into(),
2619                value: 1,
2620                size: 10,
2621            })
2622            .await;
2623        let _ = handle
2624            .try_write(TestWrite {
2625                key: "b".into(),
2626                value: 2,
2627                size: 10,
2628            })
2629            .await;
2630
2631        // when - third write should time out
2632        let result = handle
2633            .write_timeout(
2634                TestWrite {
2635                    key: "c".into(),
2636                    value: 3,
2637                    size: 10,
2638                },
2639                Duration::from_millis(10),
2640            )
2641            .await;
2642
2643        // then
2644        assert!(matches!(result, Err(WriteError::TimeoutError(_))));
2645    }
2646
2647    #[tokio::test]
2648    async fn should_return_write_in_timeout_error() {
2649        // given - queue_capacity=1, coordinator NOT started
2650        let flusher = TestFlusher::default();
2651        let config = WriteCoordinatorConfig {
2652            queue_capacity: 1,
2653            flush_interval: Duration::from_secs(3600),
2654            flush_size_threshold: usize::MAX,
2655        };
2656        let mut coordinator = WriteCoordinator::new(
2657            config,
2658            vec!["default".to_string()],
2659            TestContext::default(),
2660            flusher.initial_snapshot().await,
2661            flusher.clone(),
2662        );
2663        let handle = coordinator.handle("default");
2664
2665        // fill the queue
2666        let _ = handle
2667            .try_write(TestWrite {
2668                key: "a".into(),
2669                value: 1,
2670                size: 10,
2671            })
2672            .await;
2673
2674        // when
2675        let result = handle
2676            .write_timeout(
2677                TestWrite {
2678                    key: "retry_me".into(),
2679                    value: 42,
2680                    size: 10,
2681                },
2682                Duration::from_millis(10),
2683            )
2684            .await;
2685        let Err(err) = result else {
2686            panic!("expected TimeoutError");
2687        };
2688
2689        // then - original write is returned inside the error
2690        let write = err.into_inner().expect("should contain the write");
2691        assert_eq!(write.key, "retry_me");
2692        assert_eq!(write.value, 42);
2693    }
2694
2695    #[tokio::test]
2696    async fn should_return_write_in_backpressure_error() {
2697        // given - queue_capacity=1, coordinator NOT started
2698        let flusher = TestFlusher::default();
2699        let config = WriteCoordinatorConfig {
2700            queue_capacity: 1,
2701            flush_interval: Duration::from_secs(3600),
2702            flush_size_threshold: usize::MAX,
2703        };
2704        let mut coordinator = WriteCoordinator::new(
2705            config,
2706            vec!["default".to_string()],
2707            TestContext::default(),
2708            flusher.initial_snapshot().await,
2709            flusher.clone(),
2710        );
2711        let handle = coordinator.handle("default");
2712
2713        // fill the queue
2714        let _ = handle
2715            .try_write(TestWrite {
2716                key: "a".into(),
2717                value: 1,
2718                size: 10,
2719            })
2720            .await;
2721
2722        // when
2723        let result = handle
2724            .try_write(TestWrite {
2725                key: "retry_me".into(),
2726                value: 42,
2727                size: 10,
2728            })
2729            .await;
2730        let Err(err) = result else {
2731            panic!("expected Backpressure");
2732        };
2733
2734        // then - original write is returned inside the error
2735        let write = err.into_inner().expect("should contain the write");
2736        assert_eq!(write.key, "retry_me");
2737        assert_eq!(write.value, 42);
2738    }
2739
2740    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2741    async fn should_succeed_when_queue_drains_within_timeout() {
2742        // given - queue_capacity=2, coordinator started so it drains
2743        let flusher = TestFlusher::default();
2744        let config = WriteCoordinatorConfig {
2745            queue_capacity: 2,
2746            flush_interval: Duration::from_secs(3600),
2747            flush_size_threshold: usize::MAX,
2748        };
2749        let mut coordinator = WriteCoordinator::new(
2750            config,
2751            vec!["default".to_string()],
2752            TestContext::default(),
2753            flusher.initial_snapshot().await,
2754            flusher.clone(),
2755        );
2756        let handle = coordinator.handle("default");
2757
2758        // fill the queue before starting
2759        let _ = handle
2760            .try_write(TestWrite {
2761                key: "a".into(),
2762                value: 1,
2763                size: 10,
2764            })
2765            .await;
2766        let _ = handle
2767            .try_write(TestWrite {
2768                key: "b".into(),
2769                value: 2,
2770                size: 10,
2771            })
2772            .await;
2773
2774        // when - start coordinator (begins draining) and write with generous timeout
2775        coordinator.start();
2776        let result = handle
2777            .write_timeout(
2778                TestWrite {
2779                    key: "c".into(),
2780                    value: 3,
2781                    size: 10,
2782                },
2783                Duration::from_secs(5),
2784            )
2785            .await;
2786
2787        // then
2788        assert!(result.is_ok());
2789
2790        // cleanup
2791        coordinator.stop().await;
2792    }
2793
2794    #[tokio::test]
2795    async fn should_return_shutdown_on_write_timeout_after_coordinator_stops() {
2796        // given
2797        let flusher = TestFlusher::default();
2798        let mut coordinator = WriteCoordinator::new(
2799            test_config(),
2800            vec!["default".to_string()],
2801            TestContext::default(),
2802            flusher.initial_snapshot().await,
2803            flusher.clone(),
2804        );
2805        let handle = coordinator.handle("default");
2806        coordinator.start();
2807
2808        // when
2809        coordinator.stop().await;
2810        let result = handle
2811            .write_timeout(
2812                TestWrite {
2813                    key: "a".into(),
2814                    value: 1,
2815                    size: 10,
2816                },
2817                Duration::from_secs(1),
2818            )
2819            .await;
2820
2821        // then
2822        assert!(matches!(result, Err(WriteError::Shutdown)));
2823    }
2824
2825    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2826    async fn should_pause_and_resume_write_channel() {
2827        // given
2828        let flusher = TestFlusher::default();
2829        let mut config = test_config();
2830        config.flush_size_threshold = usize::MAX;
2831        config.flush_interval = Duration::from_hours(24);
2832        let mut coordinator = WriteCoordinator::new(
2833            config,
2834            vec!["a", "b"],
2835            TestContext::default(),
2836            flusher.initial_snapshot().await,
2837            flusher.clone(),
2838        );
2839        let handle_a = coordinator.handle("a");
2840        let handle_b = coordinator.handle("b");
2841        let pause_handle = coordinator.pause_handle("a");
2842
2843        // pause before starting the coordinator. otherwise
2844        // there's a race condition where the PausableReceiver
2845        // makes it past pause_rx.wait_for(|v| !*v).await; and
2846        // waits for recv before we pause the handle - we could
2847        // change the behavior of the PausableReceiver to first
2848        // wait on recv and then check pause before returning
2849        // but that feels weird and is not necessary for the use
2850        // cases we have in mind
2851        pause_handle.pause();
2852        coordinator.start();
2853
2854        // when
2855        let mut result_a = handle_a
2856            .try_write(TestWrite {
2857                key: "a".into(),
2858                value: 1,
2859                size: 1,
2860            })
2861            .await
2862            .unwrap();
2863        for i in 0..1000 {
2864            handle_b
2865                .try_write(TestWrite {
2866                    key: format!("b{}", i),
2867                    value: i,
2868                    size: 1,
2869                })
2870                .await
2871                .unwrap()
2872                .wait(Durability::Applied)
2873                .await
2874                .unwrap();
2875        }
2876
2877        // then
2878        // make sure the data only contains keys from b (so a was never processed)
2879        let data = coordinator.view().current.data.lock().unwrap().clone();
2880        let mut expected = (0..1000).map(|i| format!("b{}", i)).collect::<HashSet<_>>();
2881        assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2882        pause_handle.unpause();
2883        // after resuming, wait for the data to include keys from a
2884        result_a.wait(Durability::Applied).await.unwrap();
2885        let data = coordinator.view().current.data.lock().unwrap().clone();
2886        expected.insert("a".into());
2887        assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2888    }
2889}