// common/coordinator/mod.rs
1#![allow(unused)]
2
3mod error;
4mod handle;
5pub mod subscriber;
6mod traits;
7
8use std::collections::HashMap;
9use std::ops::Range;
10use std::ops::{Deref, DerefMut};
11
12pub use error::{WriteError, WriteResult};
13use futures::stream::{self, SelectAll, StreamExt};
14pub use handle::{View, WriteCoordinatorHandle, WriteHandle};
15pub use subscriber::{SubscribeError, ViewMonitor, ViewSubscriber};
16pub use traits::{Delta, Durability, EpochStamped, Flusher};
17
/// Event sent from the write coordinator task to the flush task.
///
/// Produced by `WriteCoordinatorTask` and consumed by `FlushTask`.
enum FlushEvent<D: Delta> {
    /// Flush a frozen delta to storage.
    FlushDelta { frozen: EpochStamped<D::Frozen> },
    /// Ensure storage durability (e.g. call storage.flush()).
    FlushStorage,
}
25
26// Internal use only
27use crate::StorageRead;
28use crate::storage::StorageSnapshot;
29use async_trait::async_trait;
30pub use handle::EpochWatcher;
31use std::sync::{Arc, Mutex};
32use std::time::Duration;
33use tokio::sync::{broadcast, mpsc, oneshot, watch};
34use tokio::time::{Instant, Interval, interval_at};
35use tokio_util::sync::CancellationToken;
36
/// Configuration for the write coordinator.
#[derive(Debug, Clone)]
pub struct WriteCoordinatorConfig {
    /// Maximum number of pending writes in the queue (per write channel).
    pub queue_capacity: usize,
    /// Interval at which to trigger automatic flushes.
    pub flush_interval: Duration,
    /// Delta size threshold (in bytes, as reported by `Delta::estimate_size`)
    /// at which to trigger a flush.
    pub flush_size_threshold: usize,
}
47
48impl Default for WriteCoordinatorConfig {
49    fn default() -> Self {
50        Self {
51            queue_capacity: 10_000,
52            flush_interval: Duration::from_secs(10),
53            flush_size_threshold: 64 * 1024 * 1024, // 64 MB
54        }
55    }
56}
57
/// Commands accepted by the coordinator task over a write channel.
pub(crate) enum WriteCommand<D: Delta> {
    /// Apply a write to the current delta.
    Write {
        write: D::Write,
        /// Receives the epoch the write was applied at, or the failure.
        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
    },
    /// Freeze and flush the current delta.
    Flush {
        /// Receives the epoch of the last processed write.
        epoch_tx: oneshot::Sender<handle::EpochResult<()>>,
        /// If true, also request a storage durability flush afterwards.
        flush_storage: bool,
    },
}
68
/// The write coordinator manages write ordering, batching, and durability.
///
/// It accepts writes through `WriteCoordinatorHandle`, applies them to a `Delta`,
/// and coordinates flushing through a `Flusher`.
pub struct WriteCoordinator<D: Delta, F: Flusher<D>> {
    /// Per-channel write handles, keyed by channel name.
    handles: HashMap<String, WriteCoordinatorHandle<D>>,
    /// Per-channel pause handles, keyed by channel name.
    pause_handles: HashMap<String, PauseHandle>,
    /// Cancelled by `stop()` to shut down the coordinator task.
    stop_tok: CancellationToken,
    /// Tasks built in `new()`; taken (and spawned) by `start()`.
    tasks: Option<(WriteCoordinatorTask<D>, FlushTask<D, F>)>,
    /// Join handle of the spawned coordinator task; set by `start()`.
    write_task_jh: Option<tokio::task::JoinHandle<Result<(), String>>>,
    /// Shared read view, broadcast-updated on freezes and flushes.
    view: Arc<BroadcastedView<D>>,
}
81
impl<D: Delta, F: Flusher<D>> WriteCoordinator<D, F> {
    /// Build a coordinator. Nothing runs until `start()` is called.
    ///
    /// One write channel (with a matching `WriteCoordinatorHandle` and
    /// `PauseHandle`) is created per name in `channels`. `initial_context`
    /// seeds the first delta and `initial_snapshot` seeds the read view.
    pub fn new(
        config: WriteCoordinatorConfig,
        channels: Vec<impl ToString>,
        initial_context: D::Context,
        initial_snapshot: Arc<dyn StorageSnapshot>,
        flusher: F,
    ) -> WriteCoordinator<D, F> {
        let (watermarks, watcher) = EpochWatermarks::new();
        let watermarks = Arc::new(watermarks);

        // Create a write channel per named input
        let mut write_rxs = Vec::with_capacity(channels.len());
        let mut write_txs = HashMap::new();
        let mut pause_handles = HashMap::new();
        for name in &channels {
            let (write_tx, write_rx, pause_hdl) = pausable_channel(config.queue_capacity);
            write_rxs.push(write_rx);
            write_txs.insert(name.to_string(), write_tx);
            pause_handles.insert(name.to_string(), pause_hdl);
        }

        // this is the channel that sends FlushEvents to be flushed
        // by a background task so that the process of converting deltas
        // to storage operations is non-blocking. for now, we apply no
        // backpressure on this channel, so writes will block if more than
        // one flush is pending
        let (flush_tx, flush_rx) = mpsc::channel(2);

        let flush_stop_tok = CancellationToken::new();
        let stop_tok = CancellationToken::new();
        let write_task = WriteCoordinatorTask::new(
            config,
            initial_context,
            initial_snapshot,
            write_rxs,
            flush_tx,
            watermarks.clone(),
            stop_tok.clone(),
            flush_stop_tok.clone(),
        );

        let view = write_task.view.clone();

        let mut handles = HashMap::new();
        for name in channels {
            // Every name was inserted into `write_txs` in the loop above.
            let write_tx = write_txs.remove(&name.to_string()).expect("unreachable");
            handles.insert(
                name.to_string(),
                WriteCoordinatorHandle::new(write_tx, watcher.clone(), view.clone()),
            );
        }

        let flush_task = FlushTask {
            flusher,
            stop_tok: flush_stop_tok,
            flush_rx,
            watermarks: watermarks.clone(),
            view: view.clone(),
            last_flushed_epoch: 0,
        };

        Self {
            handles,
            pause_handles,
            tasks: Some((write_task, flush_task)),
            write_task_jh: None,
            stop_tok,
            view,
        }
    }

    /// Handle for submitting writes on the named channel.
    ///
    /// # Panics
    /// Panics if `name` was not one of the `channels` given to `new`.
    pub fn handle(&self, name: &str) -> WriteCoordinatorHandle<D> {
        self.handles
            .get(name)
            .expect("unknown channel name")
            .clone()
    }

    /// Handle for pausing/unpausing consumption from the named channel.
    ///
    /// # Panics
    /// Panics if `name` was not one of the `channels` given to `new`.
    pub fn pause_handle(&self, name: &str) -> PauseHandle {
        self.pause_handles
            .get(name)
            .expect("unknown channel name")
            .clone()
    }

    /// Spawn the coordinator and flush tasks. Idempotent: a second call
    /// is a no-op because the tasks have already been taken.
    pub fn start(&mut self) {
        let Some((write_task, flush_task)) = self.tasks.take() else {
            // already started
            return;
        };
        let flush_task_jh = flush_task.run();
        let write_task_jh = write_task.run(flush_task_jh);
        self.write_task_jh = Some(write_task_jh);
    }

    /// Cancel the coordinator task and wait for it (and, transitively, the
    /// flush task) to finish. Returns Ok immediately if never started.
    pub async fn stop(mut self) -> Result<(), String> {
        let Some(write_task_jh) = self.write_task_jh.take() else {
            return Ok(());
        };
        self.stop_tok.cancel();
        write_task_jh.await.map_err(|e| e.to_string())?
    }

    /// The most recently published read view.
    pub fn view(&self) -> Arc<View<D>> {
        self.view.current()
    }

    /// Subscribe to view updates, starting from the current view.
    pub fn subscribe(&self) -> (ViewSubscriber<D>, ViewMonitor) {
        let (view_rx, initial_view) = self.view.subscribe();
        ViewSubscriber::new(view_rx, initial_view)
    }
}
195
/// Event-loop task that owns the live delta and serializes all writes.
struct WriteCoordinatorTask<D: Delta> {
    config: WriteCoordinatorConfig,
    /// The live delta writes are applied to; swapped out on freeze.
    delta: CurrentDelta<D>,
    /// Sends frozen deltas / storage-flush requests to the `FlushTask`.
    flush_tx: mpsc::Sender<FlushEvent<D>>,
    /// One receiver per named write channel; merged in `run_coordinator`.
    write_rxs: Vec<PausableReceiver<D>>,
    watermarks: Arc<EpochWatermarks>,
    view: Arc<BroadcastedView<D>>,
    /// Epoch that will be assigned to the next write.
    epoch: u64,
    /// First epoch contained in the current (unfrozen) delta.
    delta_start_epoch: u64,
    /// Timer driving periodic flushes.
    flush_interval: Interval,
    /// Cancelled externally to stop this task.
    stop_tok: CancellationToken,
    /// Cancelled by this task (after its final flush) to stop the FlushTask.
    flush_stop_tok: CancellationToken,
}
209
impl<D: Delta> WriteCoordinatorTask<D> {
    /// Create a new write coordinator with the given flusher.
    ///
    /// This is useful for testing with mock flushers.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        config: WriteCoordinatorConfig,
        initial_context: D::Context,
        initial_snapshot: Arc<dyn StorageSnapshot>,
        write_rxs: Vec<PausableReceiver<D>>,
        flush_tx: mpsc::Sender<FlushEvent<D>>,
        watermarks: Arc<EpochWatermarks>,
        stop_tok: CancellationToken,
        flush_stop_tok: CancellationToken,
    ) -> Self {
        let delta = D::init(initial_context);

        // The initial view has no frozen deltas and reads from the fresh
        // delta plus the provided storage snapshot.
        let initial_view = View {
            current: delta.reader(),
            frozen: vec![],
            snapshot: initial_snapshot,
            last_written_delta: None,
        };
        let initial_view = Arc::new(BroadcastedView::new(initial_view));

        // First tick fires one full interval from now; reset again in
        // run_coordinator so the timer starts when the loop actually runs.
        let flush_interval = interval_at(
            Instant::now() + config.flush_interval,
            config.flush_interval,
        );
        Self {
            config,
            delta: CurrentDelta::new(delta),
            write_rxs,
            flush_tx,
            watermarks,
            view: initial_view,
            // Epochs start at 1 because watch channels initialize to 0 (meaning "nothing
            // processed yet"). If the first write had epoch 0, wait() would return
            // immediately since the condition `watermark < epoch` would be `0 < 0` = false.
            epoch: 1,
            delta_start_epoch: 1,
            flush_interval,
            stop_tok,
            flush_stop_tok,
        }
    }

    /// Run the coordinator event loop.
    pub fn run(
        mut self,
        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
    ) -> tokio::task::JoinHandle<Result<(), String>> {
        tokio::task::spawn(async move { self.run_coordinator(flush_task_jh).await })
    }

    /// Main loop: serialize commands from all write channels, flush on
    /// demand / timer / size threshold, and shut down cleanly on cancel
    /// or when every write channel closes.
    async fn run_coordinator(
        mut self,
        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
    ) -> Result<(), String> {
        // Reset the interval to start fresh from when run() is called
        self.flush_interval.reset();

        // Merge all write receivers into a single stream
        let mut write_stream: SelectAll<_> = SelectAll::new();
        for rx in self.write_rxs.drain(..) {
            write_stream.push(
                stream::unfold(
                    rx,
                    |mut rx| async move { rx.recv().await.map(|cmd| (cmd, rx)) },
                )
                .boxed(),
            );
        }

        loop {
            tokio::select! {
                cmd = write_stream.next() => {
                    match cmd {
                        Some(WriteCommand::Write { write, result_tx }) => {
                            self.handle_write(write, result_tx).await?;
                        }
                        Some(WriteCommand::Flush { epoch_tx, flush_storage }) => {
                            // Send back the epoch of the last processed write
                            let _ = epoch_tx.send(Ok(handle::WriteApplied {
                                epoch: self.epoch.saturating_sub(1),
                                result: (),
                            }));
                            self.handle_flush(flush_storage).await;
                        }
                        None => {
                            // All write channels closed
                            break;
                        }
                    }
                }

                _ = self.flush_interval.tick() => {
                    self.handle_flush(false).await;
                }

                _ = self.stop_tok.cancelled() => {
                    break;
                }
            }
        }

        // Flush any remaining pending writes before shutdown
        self.handle_flush(false).await;

        // Signal the flush task to stop
        self.flush_stop_tok.cancel();
        // Wait for the flush task to complete and propagate any errors
        flush_task_jh
            .await
            .map_err(|e| format!("flush task panicked: {}", e))?
            .map_err(|e| format!("flush task error: {}", e))
    }

    /// Apply one write to the live delta, report the result (with its
    /// epoch) to the caller, and flush if the delta exceeds the configured
    /// size threshold.
    async fn handle_write(
        &mut self,
        write: D::Write,
        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
    ) -> Result<(), String> {
        let write_epoch = self.epoch;
        self.epoch += 1;

        let result = self.delta.apply(write);
        // Ignore error if receiver was dropped (fire-and-forget write)
        let _ = result_tx.send(
            result
                .map(|apply_result| handle::WriteApplied {
                    epoch: write_epoch,
                    result: apply_result,
                })
                .map_err(|e| handle::WriteFailed {
                    epoch: write_epoch,
                    error: e,
                }),
        );

        // Ignore error if no watchers are listening - this is non-fatal
        // NOTE(review): the applied watermark advances even when apply()
        // returned an error — the epoch is consumed either way.
        self.watermarks.update_applied(write_epoch);

        if self.delta.estimate_size() >= self.config.flush_size_threshold {
            self.handle_flush(false).await;
        }

        Ok(())
    }

    /// Freeze-and-flush the delta if it has writes; optionally also
    /// request a storage durability flush.
    async fn handle_flush(&mut self, flush_storage: bool) {
        self.flush_if_delta_has_writes().await;
        if flush_storage {
            let _ = self.flush_tx.send(FlushEvent::FlushStorage).await;
        }
    }

    /// Freeze the live delta (if non-empty), publish the updated view, and
    /// hand the frozen delta to the flush task.
    async fn flush_if_delta_has_writes(&mut self) {
        // No writes since the current delta started: nothing to flush.
        if self.epoch == self.delta_start_epoch {
            return;
        }

        self.flush_interval.reset();

        // Half-open range of epochs captured by this frozen delta.
        let epoch_range = self.delta_start_epoch..self.epoch;
        self.delta_start_epoch = self.epoch;
        let (frozen, frozen_reader) = self.delta.freeze_and_init();
        let stamped_frozen = EpochStamped::new(frozen, epoch_range.clone());
        let stamped_frozen_reader = EpochStamped::new(frozen_reader, epoch_range.clone());
        let reader = self.delta.reader();
        // update the view before sending the flush msg to ensure the flusher sees
        // the frozen reader when updating the view post-flush
        self.view.update_delta_frozen(stamped_frozen_reader, reader);
        // this is the blocking section of the flush, new writes will not be accepted
        // until the event is sent to the FlushTask
        let _ = self
            .flush_tx
            .send(FlushEvent::FlushDelta {
                frozen: stamped_frozen,
            })
            .await;
    }
}
393
/// Background task that turns frozen deltas into storage writes.
struct FlushTask<D: Delta, F: Flusher<D>> {
    flusher: F,
    /// Cancelled by the coordinator task after its final flush is queued.
    stop_tok: CancellationToken,
    /// Receives `FlushEvent`s from the coordinator task.
    flush_rx: mpsc::Receiver<FlushEvent<D>>,
    watermarks: Arc<EpochWatermarks>,
    view: Arc<BroadcastedView<D>>,
    /// Last epoch written by a delta flush; reported as durable after a
    /// storage flush.
    last_flushed_epoch: u64,
}
402
403impl<D: Delta, F: Flusher<D>> FlushTask<D, F> {
404    fn run(mut self) -> tokio::task::JoinHandle<WriteResult<()>> {
405        tokio::spawn(async move {
406            loop {
407                tokio::select! {
408                    event = self.flush_rx.recv() => {
409                        let Some(event) = event else {
410                            break;
411                        };
412                        self.handle_event(event).await?;
413                    }
414                    _ = self.stop_tok.cancelled() => {
415                        break;
416                    }
417                }
418            }
419            // drain all remaining flush events
420            while let Ok(event) = self.flush_rx.try_recv() {
421                self.handle_event(event).await;
422            }
423            Ok(())
424        })
425    }
426
427    async fn handle_event(&mut self, event: FlushEvent<D>) -> WriteResult<()> {
428        match event {
429            FlushEvent::FlushDelta { frozen } => self.handle_flush(frozen).await,
430            FlushEvent::FlushStorage => {
431                self.flusher
432                    .flush_storage()
433                    .await
434                    .map_err(|e| WriteError::FlushError(e.to_string()))?;
435                self.watermarks.update_durable(self.last_flushed_epoch);
436                Ok(())
437            }
438        }
439    }
440
441    async fn handle_flush(&mut self, frozen: EpochStamped<D::Frozen>) -> WriteResult<()> {
442        let delta = frozen.val;
443        let epoch_range = frozen.epoch_range;
444        let snapshot = self
445            .flusher
446            .flush_delta(delta, &epoch_range)
447            .await
448            .map_err(|e| WriteError::FlushError(e.to_string()))?;
449        self.last_flushed_epoch = epoch_range.end - 1;
450        // Publish the refreshed view first so any waiter awoken by the
451        // watermark notification below observes a view that already includes
452        // this flush. Reversing the order races on multi-thread runtimes:
453        // a waiter on `Written` can schedule on another worker and read
454        // `view.snapshot` before this task completes the view update.
455        self.view.update_flush_finished(snapshot, epoch_range);
456        self.watermarks.update_written(self.last_flushed_epoch);
457        Ok(())
458    }
459}
460
/// Holder for the live delta. The `Option` exists so `freeze_and_init`
/// can take ownership of the delta and install a fresh one in its place.
struct CurrentDelta<D: Delta> {
    delta: Option<D>,
}
464
465impl<D: Delta> Deref for CurrentDelta<D> {
466    type Target = D;
467
468    fn deref(&self) -> &Self::Target {
469        match &self.delta {
470            Some(d) => d,
471            None => panic!("current delta not initialized"),
472        }
473    }
474}
475
476impl<D: Delta> DerefMut for CurrentDelta<D> {
477    fn deref_mut(&mut self) -> &mut Self::Target {
478        match &mut self.delta {
479            Some(d) => d,
480            None => panic!("current delta not initialized"),
481        }
482    }
483}
484
485impl<D: Delta> CurrentDelta<D> {
486    fn new(delta: D) -> Self {
487        Self { delta: Some(delta) }
488    }
489
490    fn freeze_and_init(&mut self) -> (D::Frozen, D::FrozenView) {
491        let Some(delta) = self.delta.take() else {
492            panic!("delta not initialized");
493        };
494        let (frozen, frozen_reader, context) = delta.freeze();
495        let new_delta = D::init(context);
496        self.delta = Some(new_delta);
497        (frozen, frozen_reader)
498    }
499}
500
/// Senders for the three epoch watermarks, observed via `EpochWatcher`.
pub struct EpochWatermarks {
    /// Highest epoch applied to the in-memory delta.
    applied_tx: tokio::sync::watch::Sender<u64>,
    /// Highest epoch written to storage by a delta flush.
    written_tx: tokio::sync::watch::Sender<u64>,
    /// Highest epoch made durable by a storage flush.
    durable_tx: tokio::sync::watch::Sender<u64>,
}
506
507impl EpochWatermarks {
508    pub fn new() -> (Self, EpochWatcher) {
509        let (applied_tx, applied_rx) = tokio::sync::watch::channel(0);
510        let (written_tx, written_rx) = tokio::sync::watch::channel(0);
511        let (durable_tx, durable_rx) = tokio::sync::watch::channel(0);
512        let watcher = EpochWatcher {
513            applied_rx,
514            written_rx,
515            durable_rx,
516        };
517        let watermarks = EpochWatermarks {
518            applied_tx,
519            written_tx,
520            durable_tx,
521        };
522        (watermarks, watcher)
523    }
524
525    pub fn update_applied(&self, epoch: u64) {
526        let _ = self.applied_tx.send(epoch);
527    }
528
529    pub fn update_written(&self, epoch: u64) {
530        let _ = self.written_tx.send(epoch);
531    }
532
533    pub fn update_durable(&self, epoch: u64) {
534        let _ = self.durable_tx.send(epoch);
535    }
536}
537
/// Mutex-guarded current `View` that broadcasts a fresh `Arc<View>` to
/// subscribers on every update.
pub(crate) struct BroadcastedView<D: Delta> {
    inner: Mutex<BroadcastedViewInner<D>>,
}
541
542impl<D: Delta> BroadcastedView<D> {
543    fn new(initial_view: View<D>) -> Self {
544        let (view_tx, _) = broadcast::channel(16);
545        Self {
546            inner: Mutex::new(BroadcastedViewInner {
547                view: Arc::new(initial_view),
548                view_tx,
549            }),
550        }
551    }
552
553    fn update_flush_finished(&self, snapshot: Arc<dyn StorageSnapshot>, epoch_range: Range<u64>) {
554        self.inner
555            .lock()
556            .expect("lock poisoned")
557            .update_flush_finished(snapshot, epoch_range);
558    }
559
560    fn update_delta_frozen(&self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
561        self.inner
562            .lock()
563            .expect("lock poisoned")
564            .update_delta_frozen(frozen, reader);
565    }
566
567    fn current(&self) -> Arc<View<D>> {
568        self.inner.lock().expect("lock poisoned").current()
569    }
570
571    fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
572        self.inner.lock().expect("lock poisoned").subscribe()
573    }
574}
575
/// State behind `BroadcastedView`'s mutex: the current view plus the
/// broadcast sender used to notify subscribers.
struct BroadcastedViewInner<D: Delta> {
    view: Arc<View<D>>,
    view_tx: tokio::sync::broadcast::Sender<Arc<View<D>>>,
}
580
581impl<D: Delta> BroadcastedViewInner<D> {
582    fn update_flush_finished(
583        &mut self,
584        snapshot: Arc<dyn StorageSnapshot>,
585        epoch_range: Range<u64>,
586    ) {
587        let mut new_frozen = self.view.frozen.clone();
588        let last = new_frozen
589            .pop()
590            .expect("frozen should not be empty when flush completes");
591        assert_eq!(last.epoch_range, epoch_range);
592        self.view = Arc::new(View {
593            current: self.view.current.clone(),
594            frozen: new_frozen,
595            snapshot,
596            last_written_delta: Some(last),
597        });
598        self.view_tx.send(self.view.clone());
599    }
600
601    fn update_delta_frozen(&mut self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
602        // Update read state: add frozen delta to front, update current reader
603        let mut new_frozen = vec![frozen];
604        new_frozen.extend(self.view.frozen.iter().cloned());
605        self.view = Arc::new(View {
606            current: reader,
607            frozen: new_frozen,
608            snapshot: self.view.snapshot.clone(),
609            last_written_delta: self.view.last_written_delta.clone(),
610        });
611        self.view_tx.send(self.view.clone());
612    }
613
614    fn current(&self) -> Arc<View<D>> {
615        self.view.clone()
616    }
617
618    fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
619        (self.view_tx.subscribe(), self.view.clone())
620    }
621}
622
/// An mpsc receiver gated by a pause flag driven from a `PauseHandle`.
struct PausableReceiver<D: Delta> {
    /// Watch receiver for the pause flag; `None` disables pause checks.
    pause_rx: Option<watch::Receiver<bool>>,
    rx: mpsc::Receiver<WriteCommand<D>>,
}
627
628impl<D: Delta> PausableReceiver<D> {
629    async fn recv(&mut self) -> Option<WriteCommand<D>> {
630        if let Some(pause_rx) = self.pause_rx.as_mut() {
631            pause_rx.wait_for(|v| !*v).await;
632        }
633        self.rx.recv().await
634    }
635}
636
/// A handle that can be used to pause/unpause the coordinator's consumption from a write queue
#[derive(Clone)]
pub struct PauseHandle {
    /// `true` means paused; observed by the matching `PausableReceiver`.
    pause_tx: tokio::sync::watch::Sender<bool>,
}
642
643impl PauseHandle {
644    pub fn pause(&self) {
645        self.pause_tx.send_replace(true);
646    }
647
648    pub fn unpause(&self) {
649        self.pause_tx.send_replace(false);
650    }
651}
652
653fn pausable_channel<D: Delta>(
654    capacity: usize,
655) -> (
656    mpsc::Sender<WriteCommand<D>>,
657    PausableReceiver<D>,
658    PauseHandle,
659) {
660    let (pause_tx, pause_rx) = watch::channel(false);
661    let (tx, rx) = mpsc::channel(capacity);
662    (
663        tx,
664        PausableReceiver {
665            pause_rx: Some(pause_rx),
666            rx,
667        },
668        PauseHandle { pause_tx },
669    )
670}
671
672#[cfg(test)]
673mod tests {
674    use super::*;
675    use crate::BytesRange;
676    use crate::coordinator::Durability;
677    use crate::storage::in_memory::{InMemoryStorage, InMemoryStorageSnapshot};
678    use crate::storage::{PutRecordOp, Record, StorageSnapshot};
679    use crate::{Storage, StorageRead};
680    use async_trait::async_trait;
681    use bytes::Bytes;
682    use std::collections::{HashMap, HashSet};
683    use std::ops::Range;
684    use std::sync::Mutex;
685    // ============================================================================
686    // Test Infrastructure
687    // ============================================================================
688
    /// A single test write: key/value plus a declared size used to drive
    /// the flush-size threshold.
    #[derive(Clone, Debug)]
    struct TestWrite {
        key: String,
        value: u64,
        // Added to the delta's total_size in apply().
        size: usize,
    }
695
    /// Context carries state that must persist across deltas (like sequence allocation)
    #[derive(Clone, Debug, Default)]
    struct TestContext {
        // Next sequence number assigned by apply().
        next_seq: u64,
        // When set, apply() fails with this error.
        error: Option<String>,
    }
702
    /// A shared reader that sees writes as they are applied to the delta.
    #[derive(Clone, Debug, Default)]
    struct TestDeltaReader {
        // Shared with TestDelta::key_values, so updates are visible live.
        data: Arc<Mutex<HashMap<String, u64>>>,
    }
708
709    impl TestDeltaReader {
710        fn get(&self, key: &str) -> Option<u64> {
711            self.data.lock().unwrap().get(key).copied()
712        }
713    }
714
    /// Delta accumulates writes with sequence numbers.
    /// Stores the context directly and updates it in place.
    #[derive(Debug)]
    struct TestDelta {
        context: TestContext,
        // key -> (sequence number, value); becomes the frozen delta.
        writes: HashMap<String, (u64, u64)>,
        // Shared with TestDeltaReader so readers observe writes immediately.
        key_values: Arc<Mutex<HashMap<String, u64>>>,
        // Running sum of TestWrite::size; returned by estimate_size().
        total_size: usize,
    }
724
    /// Immutable form of TestDelta handed to the flusher.
    #[derive(Clone, Debug)]
    struct FrozenTestDelta {
        // key -> (sequence number, value) captured at freeze time.
        writes: HashMap<String, (u64, u64)>,
    }
729
730    impl std::fmt::Debug for View<TestDelta> {
731        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
732            f.write_str("View<TestDelta>")
733        }
734    }
735
    impl Delta for TestDelta {
        type Context = TestContext;
        type Write = TestWrite;
        type DeltaView = TestDeltaReader;
        type Frozen = FrozenTestDelta;
        type FrozenView = Arc<HashMap<String, u64>>;
        type ApplyResult = ();

        /// Start an empty delta carrying over `context` (sequence counter
        /// and any injected error).
        fn init(context: Self::Context) -> Self {
            Self {
                context,
                writes: HashMap::default(),
                key_values: Arc::new(Mutex::new(HashMap::default())),
                total_size: 0,
            }
        }

        /// Record the write under the next sequence number; fails if the
        /// context has an injected error.
        fn apply(&mut self, write: Self::Write) -> Result<(), String> {
            if let Some(error) = &self.context.error {
                return Err(error.clone());
            }

            let seq = self.context.next_seq;
            self.context.next_seq += 1;

            self.writes.insert(write.key.clone(), (seq, write.value));
            self.total_size += write.size;
            // Make the write visible to existing TestDeltaReaders.
            self.key_values
                .lock()
                .unwrap()
                .insert(write.key, write.value);
            Ok(())
        }

        /// Sum of the applied writes' declared sizes.
        fn estimate_size(&self) -> usize {
            self.total_size
        }

        /// Convert to frozen form, returning the context so the next delta
        /// continues the sequence numbering.
        fn freeze(self) -> (Self::Frozen, Self::FrozenView, Self::Context) {
            let frozen = FrozenTestDelta {
                writes: self.writes,
            };
            let frozen_view = Arc::new(self.key_values.lock().unwrap().clone());
            (frozen, frozen_view, self.context)
        }

        /// A reader sharing this delta's live key/value map.
        fn reader(&self) -> Self::DeltaView {
            TestDeltaReader {
                data: self.key_values.clone(),
            }
        }
    }
788
    /// Shared state for TestFlusher - allows test to inspect and control behavior
    #[derive(Default)]
    struct TestFlusherState {
        // Every frozen delta that has been flushed, in flush order.
        flushed_events: Vec<Arc<EpochStamped<FrozenTestDelta>>>,
        /// Signals when a flush starts (before blocking)
        flush_started_tx: Option<oneshot::Sender<()>>,
        /// Blocks flush until signaled
        unblock_rx: Option<mpsc::Receiver<()>>,
    }
798
    /// Flusher that writes frozen deltas to in-memory storage, with hooks
    /// for tests to observe and delay flushes.
    #[derive(Clone)]
    struct TestFlusher {
        state: Arc<Mutex<TestFlusherState>>,
        storage: Arc<InMemoryStorage>,
    }
804
805    impl Default for TestFlusher {
806        fn default() -> Self {
807            Self {
808                state: Arc::new(Mutex::new(TestFlusherState::default())),
809                storage: Arc::new(InMemoryStorage::new()),
810            }
811        }
812    }
813
    impl TestFlusher {
        /// Create a flusher that blocks until signaled, with a notification when flush starts.
        /// Returns (flusher, flush_started_rx, unblock_tx).
        fn with_flush_control() -> (Self, oneshot::Receiver<()>, mpsc::Sender<()>) {
            let (started_tx, started_rx) = oneshot::channel();
            let (unblock_tx, unblock_rx) = mpsc::channel(1);
            let flusher = Self {
                state: Arc::new(Mutex::new(TestFlusherState {
                    flushed_events: Vec::new(),
                    flush_started_tx: Some(started_tx),
                    unblock_rx: Some(unblock_rx),
                })),
                storage: Arc::new(InMemoryStorage::new()),
            };
            (flusher, started_rx, unblock_tx)
        }

        /// Snapshot of all deltas flushed so far, in flush order.
        fn flushed_events(&self) -> Vec<Arc<EpochStamped<FrozenTestDelta>>> {
            self.state.lock().unwrap().flushed_events.clone()
        }

        /// Snapshot of the (still-empty) backing storage, for seeding the
        /// coordinator's initial view.
        async fn initial_snapshot(&self) -> Arc<dyn StorageSnapshot> {
            self.storage.snapshot().await.unwrap()
        }
    }
839
    #[async_trait]
    impl Flusher<TestDelta> for TestFlusher {
        /// Write the frozen delta's records into in-memory storage and
        /// return a fresh snapshot. Supports the test timing controls:
        /// signals `flush_started_tx` first, then blocks on `unblock_rx`
        /// if one was installed.
        async fn flush_delta(
            &mut self,
            frozen: FrozenTestDelta,
            epoch_range: &Range<u64>,
        ) -> Result<Arc<dyn StorageSnapshot>, String> {
            // Signal that flush has started
            let flush_started_tx = {
                let mut state = self.state.lock().unwrap();
                state.flush_started_tx.take()
            };
            if let Some(tx) = flush_started_tx {
                let _ = tx.send(());
            }

            // Block if test wants to control timing
            let unblock_rx = {
                let mut state = self.state.lock().unwrap();
                state.unblock_rx.take()
            };
            if let Some(mut rx) = unblock_rx {
                rx.recv().await;
            }

            // Write records to storage
            // Record value layout: 8 bytes LE seq followed by 8 bytes LE value.
            let records: Vec<PutRecordOp> = frozen
                .writes
                .iter()
                .map(|(key, (seq, value))| {
                    let mut buf = Vec::with_capacity(16);
                    buf.extend_from_slice(&seq.to_le_bytes());
                    buf.extend_from_slice(&value.to_le_bytes());
                    Record::new(Bytes::from(key.clone()), Bytes::from(buf)).into()
                })
                .collect();
            self.storage
                .put(records)
                .await
                .map_err(|e| format!("{}", e))?;

            // Record the flush
            {
                let mut state = self.state.lock().unwrap();
                state
                    .flushed_events
                    .push(Arc::new(EpochStamped::new(frozen, epoch_range.clone())));
            }

            self.storage.snapshot().await.map_err(|e| format!("{}", e))
        }

        /// Durability flush; a no-op for in-memory storage apart from the
        /// same start-signal/block test controls as `flush_delta`.
        async fn flush_storage(&self) -> Result<(), String> {
            // Signal that flush has started
            let flush_started_tx = {
                let mut state = self.state.lock().unwrap();
                state.flush_started_tx.take()
            };
            if let Some(tx) = flush_started_tx {
                let _ = tx.send(());
            }

            // Block if test wants to control timing
            let unblock_rx = {
                let mut state = self.state.lock().unwrap();
                state.unblock_rx.take()
            };
            if let Some(mut rx) = unblock_rx {
                rx.recv().await;
            }

            Ok(())
        }
    }
914
915    fn test_config() -> WriteCoordinatorConfig {
916        WriteCoordinatorConfig {
917            queue_capacity: 100,
918            flush_interval: Duration::from_secs(3600), // Long interval to avoid timer flushes
919            flush_size_threshold: usize::MAX,
920        }
921    }
922
923    async fn assert_snapshot_has_rows(
924        snapshot: &Arc<dyn StorageSnapshot>,
925        expected: &[(&str, u64, u64)],
926    ) {
927        let records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
928        assert_eq!(
929            records.len(),
930            expected.len(),
931            "expected {} rows but snapshot has {}",
932            expected.len(),
933            records.len()
934        );
935        let mut actual: Vec<(String, u64, u64)> = records
936            .iter()
937            .map(|r| {
938                let key = String::from_utf8(r.key.to_vec()).unwrap();
939                let seq = u64::from_le_bytes(r.value[0..8].try_into().unwrap());
940                let value = u64::from_le_bytes(r.value[8..16].try_into().unwrap());
941                (key, seq, value)
942            })
943            .collect();
944        actual.sort_by(|a, b| a.0.cmp(&b.0));
945        let mut expected: Vec<(&str, u64, u64)> = expected.to_vec();
946        expected.sort_by(|a, b| a.0.cmp(b.0));
947        for (actual, expected) in actual.iter().zip(expected.iter()) {
948            assert_eq!(
949                actual.0, expected.0,
950                "key mismatch: got {:?}, expected {:?}",
951                actual.0, expected.0
952            );
953            assert_eq!(
954                actual.1, expected.1,
955                "seq mismatch for key {:?}: got {}, expected {}",
956                actual.0, actual.1, expected.1
957            );
958            assert_eq!(
959                actual.2, expected.2,
960                "value mismatch for key {:?}: got {}, expected {}",
961                actual.0, actual.2, expected.2
962            );
963        }
964    }
965
966    // ============================================================================
967    // Basic Write Flow Tests
968    // ============================================================================
969
970    #[tokio::test]
971    async fn should_assign_monotonic_epochs() {
972        // given
973        let flusher = TestFlusher::default();
974        let mut coordinator = WriteCoordinator::new(
975            test_config(),
976            vec!["default".to_string()],
977            TestContext::default(),
978            flusher.initial_snapshot().await,
979            flusher,
980        );
981        let handle = coordinator.handle("default");
982        coordinator.start();
983
984        // when
985        let write1 = handle
986            .try_write(TestWrite {
987                key: "a".into(),
988                value: 1,
989                size: 10,
990            })
991            .await
992            .unwrap();
993        let write2 = handle
994            .try_write(TestWrite {
995                key: "b".into(),
996                value: 2,
997                size: 10,
998            })
999            .await
1000            .unwrap();
1001        let write3 = handle
1002            .try_write(TestWrite {
1003                key: "c".into(),
1004                value: 3,
1005                size: 10,
1006            })
1007            .await
1008            .unwrap();
1009
1010        let epoch1 = write1.epoch().await.unwrap();
1011        let epoch2 = write2.epoch().await.unwrap();
1012        let epoch3 = write3.epoch().await.unwrap();
1013
1014        // then
1015        assert!(epoch1 < epoch2);
1016        assert!(epoch2 < epoch3);
1017
1018        // cleanup
1019        coordinator.stop().await;
1020    }
1021
    /// Three writes to the same key must be applied in submission order:
    /// after a flush, the frozen delta holds only the final value, stamped
    /// with the last sequence number (last-write-wins within the delta).
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_apply_writes_in_order() {
        // given
        let flusher = TestFlusher::default();
        let mut coordinator = WriteCoordinator::new(
            test_config(),
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when - overwrite key "a" three times in a row
        handle
            .try_write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        handle
            .try_write(TestWrite {
                key: "a".into(),
                value: 2,
                size: 10,
            })
            .await
            .unwrap();
        let mut last_write = handle
            .try_write(TestWrite {
                key: "a".into(),
                value: 3,
                size: 10,
            })
            .await
            .unwrap();

        handle.flush(false).await.unwrap();
        // Wait for flush to complete via watermark
        last_write.wait(Durability::Written).await.unwrap();

        // then
        let events = flusher.flushed_events();
        assert_eq!(events.len(), 1);
        let frozen_delta = &events[0];
        let delta = &frozen_delta.val;
        // Writing key "a" 3x overwrites; last write wins with seq=2 (0-indexed)
        let (seq, value) = delta.writes.get("a").unwrap();
        assert_eq!(*value, 3);
        assert_eq!(*seq, 2);

        // cleanup
        coordinator.stop().await;
    }
1079
1080    #[tokio::test]
1081    async fn should_update_applied_watermark_after_each_write() {
1082        // given
1083        let flusher = TestFlusher::default();
1084        let mut coordinator = WriteCoordinator::new(
1085            test_config(),
1086            vec!["default".to_string()],
1087            TestContext::default(),
1088            flusher.initial_snapshot().await,
1089            flusher,
1090        );
1091        let handle = coordinator.handle("default");
1092        coordinator.start();
1093
1094        // when
1095        let mut write_handle = handle
1096            .try_write(TestWrite {
1097                key: "a".into(),
1098                value: 1,
1099                size: 10,
1100            })
1101            .await
1102            .unwrap();
1103
1104        // then - wait should succeed immediately after write is applied
1105        let result = write_handle.wait(Durability::Applied).await;
1106        assert!(result.is_ok());
1107
1108        // cleanup
1109        coordinator.stop().await;
1110    }
1111
1112    #[tokio::test]
1113    async fn should_propagate_apply_error_to_handle() {
1114        // given
1115        let flusher = TestFlusher::default();
1116        let context = TestContext {
1117            error: Some("apply error".to_string()),
1118            ..Default::default()
1119        };
1120        let mut coordinator = WriteCoordinator::new(
1121            test_config(),
1122            vec!["default".to_string()],
1123            context,
1124            flusher.initial_snapshot().await,
1125            flusher,
1126        );
1127        let handle = coordinator.handle("default");
1128        coordinator.start();
1129
1130        // when
1131        let write = handle
1132            .try_write(TestWrite {
1133                key: "a".into(),
1134                value: 1,
1135                size: 10,
1136            })
1137            .await
1138            .unwrap();
1139
1140        let result = write.epoch().await;
1141
1142        // then
1143        assert!(
1144            matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
1145        );
1146
1147        // cleanup
1148        coordinator.stop().await;
1149    }
1150
1151    // ============================================================================
1152    // Manual Flush Tests
1153    // ============================================================================
1154
1155    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1156    async fn should_flush_on_command() {
1157        // given
1158        let flusher = TestFlusher::default();
1159        let mut coordinator = WriteCoordinator::new(
1160            test_config(),
1161            vec!["default".to_string()],
1162            TestContext::default(),
1163            flusher.initial_snapshot().await,
1164            flusher.clone(),
1165        );
1166        let handle = coordinator.handle("default");
1167        coordinator.start();
1168
1169        // when
1170        let mut write = handle
1171            .try_write(TestWrite {
1172                key: "a".into(),
1173                value: 1,
1174                size: 10,
1175            })
1176            .await
1177            .unwrap();
1178        handle.flush(false).await.unwrap();
1179        write.wait(Durability::Written).await.unwrap();
1180
1181        // then
1182        assert_eq!(flusher.flushed_events().len(), 1);
1183
1184        // cleanup
1185        coordinator.stop().await;
1186    }
1187
1188    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1189    async fn should_wait_on_flush_handle() {
1190        // given
1191        let flusher = TestFlusher::default();
1192        let mut coordinator = WriteCoordinator::new(
1193            test_config(),
1194            vec!["default".to_string()],
1195            TestContext::default(),
1196            flusher.initial_snapshot().await,
1197            flusher.clone(),
1198        );
1199        let handle = coordinator.handle("default");
1200        coordinator.start();
1201
1202        // when
1203        handle
1204            .try_write(TestWrite {
1205                key: "a".into(),
1206                value: 1,
1207                size: 10,
1208            })
1209            .await
1210            .unwrap();
1211        let mut flush_handle = handle.flush(false).await.unwrap();
1212
1213        // then - can wait directly on the flush handle
1214        flush_handle.wait(Durability::Written).await.unwrap();
1215        assert_eq!(flusher.flushed_events().len(), 1);
1216
1217        // cleanup
1218        coordinator.stop().await;
1219    }
1220
1221    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1222    async fn should_return_correct_epoch_from_flush_handle() {
1223        // given
1224        let flusher = TestFlusher::default();
1225        let mut coordinator = WriteCoordinator::new(
1226            test_config(),
1227            vec!["default".to_string()],
1228            TestContext::default(),
1229            flusher.initial_snapshot().await,
1230            flusher,
1231        );
1232        let handle = coordinator.handle("default");
1233        coordinator.start();
1234
1235        // when
1236        let write1 = handle
1237            .try_write(TestWrite {
1238                key: "a".into(),
1239                value: 1,
1240                size: 10,
1241            })
1242            .await
1243            .unwrap();
1244        let write2 = handle
1245            .try_write(TestWrite {
1246                key: "b".into(),
1247                value: 2,
1248                size: 10,
1249            })
1250            .await
1251            .unwrap();
1252        let flush_handle = handle.flush(false).await.unwrap();
1253
1254        // then - flush handle epoch should be the last write's epoch
1255        let flush_epoch = flush_handle.epoch().await.unwrap();
1256        let write2_epoch = write2.epoch().await.unwrap();
1257        assert_eq!(flush_epoch, write2_epoch);
1258
1259        // cleanup
1260        coordinator.stop().await;
1261    }
1262
    /// A manual flush must capture every write applied since the previous
    /// flush: three distinct keys end up in one frozen delta and appear in
    /// the storage snapshot with their assigned sequence numbers.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_include_all_pending_writes_in_flush() {
        // given
        let flusher = TestFlusher::default();
        let mut coordinator = WriteCoordinator::new(
            test_config(),
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when
        handle
            .try_write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        handle
            .try_write(TestWrite {
                key: "b".into(),
                value: 2,
                size: 10,
            })
            .await
            .unwrap();
        let mut last_write = handle
            .try_write(TestWrite {
                key: "c".into(),
                value: 3,
                size: 10,
            })
            .await
            .unwrap();

        handle.flush(false).await.unwrap();
        // Written on the last write implies the whole delta was flushed.
        last_write.wait(Durability::Written).await.unwrap();

        // then
        let events = flusher.flushed_events();
        assert_eq!(events.len(), 1);
        let frozen_delta = &events[0];
        assert_eq!(frozen_delta.val.writes.len(), 3);
        let snapshot = flusher.storage.snapshot().await.unwrap();
        // Sequence numbers follow write order: a=0, b=1, c=2.
        assert_snapshot_has_rows(&snapshot, &[("a", 0, 1), ("b", 1, 2), ("c", 2, 3)]).await;

        // cleanup
        coordinator.stop().await;
    }
1317
    /// A flush issued when no writes arrived since the previous flush must
    /// be a no-op: the flusher sees only one flushed delta. Uses an extra
    /// write purely as a synchronization barrier, relying on the coordinator
    /// processing commands in order.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_skip_flush_when_no_new_writes() {
        // given
        let flusher = TestFlusher::default();
        let mut coordinator = WriteCoordinator::new(
            test_config(),
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when
        let mut write = handle
            .try_write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        handle.flush(false).await.unwrap();
        write.wait(Durability::Written).await.unwrap();

        // Second flush with no new writes
        handle.flush(false).await.unwrap();

        // Synchronization: write and wait for applied to ensure the flush command
        // has been processed (commands are processed in order)
        let sync_write = handle
            .try_write(TestWrite {
                key: "sync".into(),
                value: 0,
                size: 1,
            })
            .await
            .unwrap();
        sync_write.epoch().await.unwrap();

        // then - only one flush should have occurred (the second flush was a no-op)
        assert_eq!(flusher.flushed_events().len(), 1);

        // cleanup
        coordinator.stop().await;
    }
1365
1366    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1367    async fn should_update_written_watermark_after_flush() {
1368        // given
1369        let flusher = TestFlusher::default();
1370        let mut coordinator = WriteCoordinator::new(
1371            test_config(),
1372            vec!["default".to_string()],
1373            TestContext::default(),
1374            flusher.initial_snapshot().await,
1375            flusher,
1376        );
1377        let handle = coordinator.handle("default");
1378        coordinator.start();
1379
1380        // when
1381        let mut write_handle = handle
1382            .try_write(TestWrite {
1383                key: "a".into(),
1384                value: 1,
1385                size: 10,
1386            })
1387            .await
1388            .unwrap();
1389
1390        handle.flush(false).await.unwrap();
1391
1392        // then - wait for Written should succeed after flush completes
1393        let result = write_handle.wait(Durability::Written).await;
1394        assert!(result.is_ok());
1395
1396        // cleanup
1397        coordinator.stop().await;
1398    }
1399
1400    // ============================================================================
1401    // Timer-Based Flush Tests
1402    // ============================================================================
1403
    /// With a 100ms flush interval under paused tokio time, no flush happens
    /// until virtual time is advanced past the interval; afterwards the
    /// pending write is flushed automatically and lands in storage.
    #[tokio::test(start_paused = true)]
    async fn should_flush_on_flush_interval() {
        // given - create coordinator with short flush interval
        let flusher = TestFlusher::default();
        let config = WriteCoordinatorConfig {
            queue_capacity: 100,
            flush_interval: Duration::from_millis(100),
            flush_size_threshold: usize::MAX,
        };
        let mut coordinator = WriteCoordinator::new(
            config,
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when - ensure coordinator task runs and then write something
        tokio::task::yield_now().await;
        let mut write = handle
            .try_write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        write.wait(Durability::Applied).await.unwrap();

        // then - no flush should have happened yet (interval was reset in run())
        assert_eq!(flusher.flushed_events().len(), 0);

        // when - advance time past the flush interval from when run() was called
        tokio::time::advance(Duration::from_millis(150)).await;
        tokio::task::yield_now().await;

        // then - flush should have happened
        assert_eq!(flusher.flushed_events().len(), 1);
        let snapshot = flusher.storage.snapshot().await.unwrap();
        assert_snapshot_has_rows(&snapshot, &[("a", 0, 1)]).await;

        // cleanup
        coordinator.stop().await;
    }
1450
1451    // ============================================================================
1452    // Size-Threshold Flush Tests
1453    // ============================================================================
1454
1455    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1456    async fn should_flush_when_size_threshold_exceeded() {
1457        // given
1458        let flusher = TestFlusher::default();
1459        let config = WriteCoordinatorConfig {
1460            queue_capacity: 100,
1461            flush_interval: Duration::from_secs(3600),
1462            flush_size_threshold: 100, // Low threshold for testing
1463        };
1464        let mut coordinator = WriteCoordinator::new(
1465            config,
1466            vec!["default".to_string()],
1467            TestContext::default(),
1468            flusher.initial_snapshot().await,
1469            flusher.clone(),
1470        );
1471        let handle = coordinator.handle("default");
1472        coordinator.start();
1473
1474        // when - write that exceeds threshold
1475        let mut write = handle
1476            .try_write(TestWrite {
1477                key: "a".into(),
1478                value: 1,
1479                size: 150,
1480            })
1481            .await
1482            .unwrap();
1483        write.wait(Durability::Written).await.unwrap();
1484
1485        // then
1486        assert_eq!(flusher.flushed_events().len(), 1);
1487
1488        // cleanup
1489        coordinator.stop().await;
1490    }
1491
    /// Writes below the size threshold must accumulate without flushing;
    /// the write that pushes the accumulated size over the threshold
    /// triggers exactly one automatic flush.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_accumulate_until_threshold() {
        // given
        let flusher = TestFlusher::default();
        let config = WriteCoordinatorConfig {
            queue_capacity: 100,
            flush_interval: Duration::from_secs(3600),
            flush_size_threshold: 100,
        };
        let mut coordinator = WriteCoordinator::new(
            config,
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when - small writes that accumulate
        for i in 0..5 {
            let mut w = handle
                .try_write(TestWrite {
                    key: format!("key{}", i),
                    value: i,
                    size: 15,
                })
                .await
                .unwrap();
            w.wait(Durability::Applied).await.unwrap();
        }

        // then - no flush yet (75 bytes < 100 threshold)
        assert_eq!(flusher.flushed_events().len(), 0);

        // when - write that pushes over threshold
        let mut final_write = handle
            .try_write(TestWrite {
                key: "final".into(),
                value: 999,
                size: 30,
            })
            .await
            .unwrap();
        final_write.wait(Durability::Written).await.unwrap();

        // then - should have flushed (105 bytes > 100 threshold)
        assert_eq!(flusher.flushed_events().len(), 1);

        // cleanup
        coordinator.stop().await;
    }
1544
1545    // ============================================================================
1546    // Non-Blocking Flush (Concurrency) Tests
1547    // ============================================================================
1548
    /// The flush task must not block the write path: while a flush is held
    /// in progress (via the test flusher's block/unblock gates), new writes
    /// are still accepted and receive later epochs.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_accept_writes_during_flush() {
        // given
        let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
        let mut coordinator = WriteCoordinator::new(
            test_config(),
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when: trigger a flush and wait for it to start (proving it's in progress)
        let write1 = handle
            .try_write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        handle.flush(false).await.unwrap();
        flush_started_rx.await.unwrap(); // wait until flush is actually in progress

        // then: writes during blocked flush still succeed
        let write2 = handle
            .try_write(TestWrite {
                key: "b".into(),
                value: 2,
                size: 10,
            })
            .await
            .unwrap();
        assert!(write2.epoch().await.unwrap() > write1.epoch().await.unwrap());

        // cleanup - release the blocked flush before stopping
        unblock_tx.send(()).await.unwrap();
        coordinator.stop().await;
    }
1590
    /// Epoch assignment must keep advancing while a flush is blocked:
    /// writes submitted during an in-progress flush receive fresh,
    /// strictly increasing epochs.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_assign_new_epochs_during_flush() {
        // given
        let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
        let mut coordinator = WriteCoordinator::new(
            test_config(),
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when: write, flush, then write more during blocked flush
        handle
            .try_write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        handle.flush(false).await.unwrap();
        flush_started_rx.await.unwrap(); // wait until flush is actually in progress

        // Writes during blocked flush get new epochs
        let w1 = handle
            .try_write(TestWrite {
                key: "b".into(),
                value: 2,
                size: 10,
            })
            .await
            .unwrap();
        let w2 = handle
            .try_write(TestWrite {
                key: "c".into(),
                value: 3,
                size: 10,
            })
            .await
            .unwrap();

        // then: epochs continue incrementing
        let e1 = w1.epoch().await.unwrap();
        let e2 = w2.epoch().await.unwrap();
        assert!(e1 < e2);

        // cleanup - release the blocked flush before stopping
        unblock_tx.send(()).await.unwrap();
        coordinator.stop().await;
    }
1644
1645    // ============================================================================
1646    // Backpressure Tests
1647    // ============================================================================
1648
1649    #[tokio::test]
1650    async fn should_return_backpressure_when_queue_full() {
1651        // given
1652        let flusher = TestFlusher::default();
1653        let config = WriteCoordinatorConfig {
1654            queue_capacity: 2,
1655            flush_interval: Duration::from_secs(3600),
1656            flush_size_threshold: usize::MAX,
1657        };
1658        let mut coordinator = WriteCoordinator::new(
1659            config,
1660            vec!["default".to_string()],
1661            TestContext::default(),
1662            flusher.initial_snapshot().await,
1663            flusher.clone(),
1664        );
1665        let handle = coordinator.handle("default");
1666        // Don't start coordinator - queue will fill
1667
1668        // when - fill the queue
1669        let _ = handle
1670            .try_write(TestWrite {
1671                key: "a".into(),
1672                value: 1,
1673                size: 10,
1674            })
1675            .await;
1676        let _ = handle
1677            .try_write(TestWrite {
1678                key: "b".into(),
1679                value: 2,
1680                size: 10,
1681            })
1682            .await;
1683
1684        // Third write should fail with backpressure
1685        let result = handle
1686            .try_write(TestWrite {
1687                key: "c".into(),
1688                value: 3,
1689                size: 10,
1690            })
1691            .await;
1692
1693        // then
1694        assert!(matches!(result, Err(WriteError::Backpressure(_))));
1695    }
1696
    /// Backpressure must be transient: once the coordinator starts and
    /// drains a previously-full queue, new writes are accepted again.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_accept_writes_after_queue_drains() {
        // given
        let flusher = TestFlusher::default();
        let config = WriteCoordinatorConfig {
            queue_capacity: 2,
            flush_interval: Duration::from_secs(3600),
            flush_size_threshold: usize::MAX,
        };
        let mut coordinator = WriteCoordinator::new(
            config,
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");

        // Fill queue without processing (coordinator not yet started)
        let _ = handle
            .try_write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await;
        let mut write_b = handle
            .try_write(TestWrite {
                key: "b".into(),
                value: 2,
                size: 10,
            })
            .await
            .unwrap();

        // when - start coordinator to drain queue and wait for it to process writes
        coordinator.start();
        write_b.wait(Durability::Applied).await.unwrap();

        // then - writes should succeed now
        let result = handle
            .try_write(TestWrite {
                key: "c".into(),
                value: 3,
                size: 10,
            })
            .await;
        assert!(result.is_ok());

        // cleanup
        coordinator.stop().await;
    }
1749
1750    // ============================================================================
1751    // Shutdown Tests
1752    // ============================================================================
1753
1754    #[tokio::test]
1755    async fn should_shutdown_cleanly_when_stop_called() {
1756        // given
1757        let flusher = TestFlusher::default();
1758        let mut coordinator = WriteCoordinator::new(
1759            test_config(),
1760            vec!["default".to_string()],
1761            TestContext::default(),
1762            flusher.initial_snapshot().await,
1763            flusher.clone(),
1764        );
1765        let handle = coordinator.handle("default");
1766        coordinator.start();
1767
1768        // when
1769        let result = coordinator.stop().await;
1770
1771        // then - coordinator should return Ok
1772        assert!(result.is_ok());
1773    }
1774
    /// Shutdown must not drop unflushed data: with both automatic flush
    /// triggers disabled, `stop()` itself flushes the pending delta, and the
    /// flushed event's epoch range covers the write's epoch.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_flush_pending_writes_on_shutdown() {
        // given
        let flusher = TestFlusher::default();
        let config = WriteCoordinatorConfig {
            queue_capacity: 100,
            flush_interval: Duration::from_secs(3600), // Long interval - won't trigger
            flush_size_threshold: usize::MAX,          // High threshold - won't trigger
        };
        let mut coordinator = WriteCoordinator::new(
            config,
            vec!["default".to_string()],
            TestContext::default(),
            flusher.initial_snapshot().await,
            flusher.clone(),
        );
        let handle = coordinator.handle("default");
        coordinator.start();

        // when - write without explicit flush, then shutdown
        let write = handle
            .try_write(TestWrite {
                key: "a".into(),
                value: 1,
                size: 10,
            })
            .await
            .unwrap();
        let epoch = write.epoch().await.unwrap();

        // Stop the coordinator; shutdown must flush the pending delta
        coordinator.stop().await;

        // then - pending writes should have been flushed
        let events = flusher.flushed_events();
        assert_eq!(events.len(), 1);
        let epoch_range = &events[0].epoch_range;
        assert!(epoch_range.contains(&epoch));
    }
1814
1815    #[tokio::test]
1816    async fn should_return_shutdown_error_after_coordinator_stops() {
1817        // given
1818        let flusher = TestFlusher::default();
1819        let mut coordinator = WriteCoordinator::new(
1820            test_config(),
1821            vec!["default".to_string()],
1822            TestContext::default(),
1823            flusher.initial_snapshot().await,
1824            flusher.clone(),
1825        );
1826        let handle = coordinator.handle("default");
1827        coordinator.start();
1828
1829        // Stop coordinator
1830        coordinator.stop().await;
1831
1832        // when
1833        let result = handle
1834            .try_write(TestWrite {
1835                key: "a".into(),
1836                value: 1,
1837                size: 10,
1838            })
1839            .await;
1840
1841        // then
1842        assert!(matches!(result, Err(WriteError::Shutdown)));
1843    }
1844
1845    // ============================================================================
1846    // Epoch Range Tracking Tests
1847    // ============================================================================
1848
1849    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1850    async fn should_track_epoch_range_in_flush_event() {
1851        // given
1852        let flusher = TestFlusher::default();
1853        let mut coordinator = WriteCoordinator::new(
1854            test_config(),
1855            vec!["default".to_string()],
1856            TestContext::default(),
1857            flusher.initial_snapshot().await,
1858            flusher.clone(),
1859        );
1860        let handle = coordinator.handle("default");
1861        coordinator.start();
1862
1863        // when
1864        handle
1865            .try_write(TestWrite {
1866                key: "a".into(),
1867                value: 1,
1868                size: 10,
1869            })
1870            .await
1871            .unwrap();
1872        handle
1873            .try_write(TestWrite {
1874                key: "b".into(),
1875                value: 2,
1876                size: 10,
1877            })
1878            .await
1879            .unwrap();
1880        let mut last_write = handle
1881            .try_write(TestWrite {
1882                key: "c".into(),
1883                value: 3,
1884                size: 10,
1885            })
1886            .await
1887            .unwrap();
1888
1889        handle.flush(false).await.unwrap();
1890        last_write.wait(Durability::Written).await.unwrap();
1891
1892        // then
1893        let events = flusher.flushed_events();
1894        assert_eq!(events.len(), 1);
1895        let epoch_range = &events[0].epoch_range;
1896        assert_eq!(epoch_range.start, 1);
1897        assert_eq!(epoch_range.end, 4); // exclusive: one past the last epoch (3)
1898
1899        // cleanup
1900        coordinator.stop().await;
1901    }
1902
1903    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1904    async fn should_have_contiguous_epoch_ranges() {
1905        // given
1906        let flusher = TestFlusher::default();
1907        let mut coordinator = WriteCoordinator::new(
1908            test_config(),
1909            vec!["default".to_string()],
1910            TestContext::default(),
1911            flusher.initial_snapshot().await,
1912            flusher.clone(),
1913        );
1914        let handle = coordinator.handle("default");
1915        coordinator.start();
1916
1917        // when - first batch
1918        handle
1919            .try_write(TestWrite {
1920                key: "a".into(),
1921                value: 1,
1922                size: 10,
1923            })
1924            .await
1925            .unwrap();
1926        let mut write2 = handle
1927            .try_write(TestWrite {
1928                key: "b".into(),
1929                value: 2,
1930                size: 10,
1931            })
1932            .await
1933            .unwrap();
1934        handle.flush(false).await.unwrap();
1935        write2.wait(Durability::Written).await.unwrap();
1936
1937        // when - second batch
1938        let mut write3 = handle
1939            .try_write(TestWrite {
1940                key: "c".into(),
1941                value: 3,
1942                size: 10,
1943            })
1944            .await
1945            .unwrap();
1946        handle.flush(false).await.unwrap();
1947        write3.wait(Durability::Written).await.unwrap();
1948
1949        // then
1950        let events = flusher.flushed_events();
1951        assert_eq!(events.len(), 2);
1952
1953        let range1 = &events[0].epoch_range;
1954        let range2 = &events[1].epoch_range;
1955
1956        // Ranges should be contiguous (end of first == start of second)
1957        assert_eq!(range1.end, range2.start);
1958        assert_eq!(range1, &(1..3));
1959        assert_eq!(range2, &(3..4));
1960
1961        // cleanup
1962        coordinator.stop().await;
1963    }
1964
1965    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1966    async fn should_include_exact_epochs_in_range() {
1967        // given
1968        let flusher = TestFlusher::default();
1969        let mut coordinator = WriteCoordinator::new(
1970            test_config(),
1971            vec!["default".to_string()],
1972            TestContext::default(),
1973            flusher.initial_snapshot().await,
1974            flusher.clone(),
1975        );
1976        let handle = coordinator.handle("default");
1977        coordinator.start();
1978
1979        // when - write and capture the assigned epochs
1980        let write1 = handle
1981            .try_write(TestWrite {
1982                key: "a".into(),
1983                value: 1,
1984                size: 10,
1985            })
1986            .await
1987            .unwrap();
1988        let epoch1 = write1.epoch().await.unwrap();
1989
1990        let mut write2 = handle
1991            .try_write(TestWrite {
1992                key: "b".into(),
1993                value: 2,
1994                size: 10,
1995            })
1996            .await
1997            .unwrap();
1998        let epoch2 = write2.epoch().await.unwrap();
1999
2000        handle.flush(false).await.unwrap();
2001        write2.wait(Durability::Written).await.unwrap();
2002
2003        // then - the epoch_range should contain exactly the epochs assigned to writes
2004        let events = flusher.flushed_events();
2005        assert_eq!(events.len(), 1);
2006        let epoch_range = &events[0].epoch_range;
2007
2008        // The range should start at the first write's epoch
2009        assert_eq!(epoch_range.start, epoch1);
2010        // The range end should be one past the last write's epoch (exclusive)
2011        assert_eq!(epoch_range.end, epoch2 + 1);
2012        // Both epochs should be contained in the range
2013        assert!(epoch_range.contains(&epoch1));
2014        assert!(epoch_range.contains(&epoch2));
2015
2016        // cleanup
2017        coordinator.stop().await;
2018    }
2019
2020    // ============================================================================
2021    // State Carryover (ID Allocation) Tests
2022    // ============================================================================
2023
2024    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2025    async fn should_preserve_context_across_flushes() {
2026        // given
2027        let flusher = TestFlusher::default();
2028        let mut coordinator = WriteCoordinator::new(
2029            test_config(),
2030            vec!["default".to_string()],
2031            TestContext::default(),
2032            flusher.initial_snapshot().await,
2033            flusher.clone(),
2034        );
2035        let handle = coordinator.handle("default");
2036        coordinator.start();
2037
2038        // when - write key "a" in first batch (seq 0)
2039        let mut write1 = handle
2040            .try_write(TestWrite {
2041                key: "a".into(),
2042                value: 1,
2043                size: 10,
2044            })
2045            .await
2046            .unwrap();
2047        handle.flush(false).await.unwrap();
2048        write1.wait(Durability::Written).await.unwrap();
2049
2050        // Write to key "a" again in second batch (seq 1)
2051        let mut write2 = handle
2052            .try_write(TestWrite {
2053                key: "a".into(),
2054                value: 2,
2055                size: 10,
2056            })
2057            .await
2058            .unwrap();
2059        handle.flush(false).await.unwrap();
2060        write2.wait(Durability::Written).await.unwrap();
2061
2062        // then
2063        let events = flusher.flushed_events();
2064        assert_eq!(events.len(), 2);
2065
2066        // Batch 1: "a" with seq 0
2067        let (seq1, _) = events[0].val.writes.get("a").unwrap();
2068        assert_eq!(*seq1, 0);
2069
2070        // Batch 2: "a" with seq 1 (sequence continues)
2071        let (seq2, _) = events[1].val.writes.get("a").unwrap();
2072        assert_eq!(*seq2, 1);
2073
2074        // cleanup
2075        coordinator.stop().await;
2076    }
2077
2078    // ============================================================================
2079    // Subscribe Tests
2080    // ============================================================================
2081
2082    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2083    async fn should_receive_view_on_subscribe() {
2084        // given
2085        let flusher = TestFlusher::default();
2086        let mut coordinator = WriteCoordinator::new(
2087            test_config(),
2088            vec!["default".to_string()],
2089            TestContext::default(),
2090            flusher.initial_snapshot().await,
2091            flusher.clone(),
2092        );
2093        let handle = coordinator.handle("default");
2094        let (mut subscriber, _) = coordinator.subscribe();
2095        subscriber.initialize();
2096        coordinator.start();
2097
2098        // when
2099        handle
2100            .try_write(TestWrite {
2101                key: "a".into(),
2102                value: 1,
2103                size: 10,
2104            })
2105            .await
2106            .unwrap();
2107        handle.flush(false).await.unwrap();
2108
2109        // then - first broadcast is on freeze (delta added to frozen)
2110        let result = subscriber.recv().await;
2111        assert!(result.is_ok());
2112
2113        // cleanup
2114        coordinator.stop().await;
2115    }
2116
2117    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2118    async fn should_include_snapshot_in_view_after_flush() {
2119        // given
2120        let flusher = TestFlusher::default();
2121        let mut coordinator = WriteCoordinator::new(
2122            test_config(),
2123            vec!["default".to_string()],
2124            TestContext::default(),
2125            flusher.initial_snapshot().await,
2126            flusher.clone(),
2127        );
2128        let handle = coordinator.handle("default");
2129        let (mut subscriber, _) = coordinator.subscribe();
2130        subscriber.initialize();
2131        coordinator.start();
2132
2133        // when
2134        handle
2135            .try_write(TestWrite {
2136                key: "a".into(),
2137                value: 1,
2138                size: 10,
2139            })
2140            .await
2141            .unwrap();
2142        handle.flush(false).await.unwrap();
2143
2144        // First broadcast: freeze (frozen delta added)
2145        let _ = subscriber.recv().await.unwrap();
2146        // Second broadcast: flush complete (snapshot updated)
2147        let result = subscriber.recv().await.unwrap();
2148
2149        // then - snapshot should contain the flushed data
2150        assert_snapshot_has_rows(&result.snapshot, &[("a", 0, 1)]).await;
2151
2152        // cleanup
2153        coordinator.stop().await;
2154    }
2155
2156    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2157    async fn should_include_delta_in_view_after_flush() {
2158        // given
2159        let flusher = TestFlusher::default();
2160        let mut coordinator = WriteCoordinator::new(
2161            test_config(),
2162            vec!["default".to_string()],
2163            TestContext::default(),
2164            flusher.initial_snapshot().await,
2165            flusher.clone(),
2166        );
2167        let handle = coordinator.handle("default");
2168        let (mut subscriber, _) = coordinator.subscribe();
2169        subscriber.initialize();
2170        coordinator.start();
2171
2172        // when
2173        handle
2174            .try_write(TestWrite {
2175                key: "a".into(),
2176                value: 42,
2177                size: 10,
2178            })
2179            .await
2180            .unwrap();
2181        handle.flush(false).await.unwrap();
2182
2183        // First broadcast: freeze
2184        let _ = subscriber.recv().await.unwrap();
2185        // Second broadcast: flush complete
2186        let result = subscriber.recv().await.unwrap();
2187
2188        // then - last_written_delta should contain the write we made
2189        let flushed = result.last_written_delta.as_ref().unwrap();
2190        assert_eq!(flushed.val.get("a"), Some(&42));
2191
2192        // cleanup
2193        coordinator.stop().await;
2194    }
2195
2196    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2197    async fn should_include_epoch_range_in_view_after_flush() {
2198        // given
2199        let flusher = TestFlusher::default();
2200        let mut coordinator = WriteCoordinator::new(
2201            test_config(),
2202            vec!["default".to_string()],
2203            TestContext::default(),
2204            flusher.initial_snapshot().await,
2205            flusher.clone(),
2206        );
2207        let handle = coordinator.handle("default");
2208        let (mut subscriber, _) = coordinator.subscribe();
2209        subscriber.initialize();
2210        coordinator.start();
2211
2212        // when
2213        let write1 = handle
2214            .try_write(TestWrite {
2215                key: "a".into(),
2216                value: 1,
2217                size: 10,
2218            })
2219            .await
2220            .unwrap();
2221        let write2 = handle
2222            .try_write(TestWrite {
2223                key: "b".into(),
2224                value: 2,
2225                size: 10,
2226            })
2227            .await
2228            .unwrap();
2229        handle.flush(false).await.unwrap();
2230
2231        // First broadcast: freeze
2232        let _ = subscriber.recv().await.unwrap();
2233        // Second broadcast: flush complete
2234        let result = subscriber.recv().await.unwrap();
2235
2236        // then - epoch range from last_written_delta should contain the epochs
2237        let flushed = result.last_written_delta.as_ref().unwrap();
2238        let epoch1 = write1.epoch().await.unwrap();
2239        let epoch2 = write2.epoch().await.unwrap();
2240        assert!(flushed.epoch_range.contains(&epoch1));
2241        assert!(flushed.epoch_range.contains(&epoch2));
2242        assert_eq!(flushed.epoch_range.start, epoch1);
2243        assert_eq!(flushed.epoch_range.end, epoch2 + 1);
2244
2245        // cleanup
2246        coordinator.stop().await;
2247    }
2248
2249    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2250    async fn should_broadcast_frozen_delta_on_freeze() {
2251        // given
2252        let flusher = TestFlusher::default();
2253        let mut coordinator = WriteCoordinator::new(
2254            test_config(),
2255            vec!["default".to_string()],
2256            TestContext::default(),
2257            flusher.initial_snapshot().await,
2258            flusher.clone(),
2259        );
2260        let handle = coordinator.handle("default");
2261        let (mut subscriber, _) = coordinator.subscribe();
2262        subscriber.initialize();
2263        coordinator.start();
2264
2265        // when
2266        handle
2267            .try_write(TestWrite {
2268                key: "a".into(),
2269                value: 1,
2270                size: 10,
2271            })
2272            .await
2273            .unwrap();
2274        handle.flush(false).await.unwrap();
2275
2276        // then - first broadcast should have the frozen delta in the frozen vec
2277        let state = subscriber.recv().await.unwrap();
2278        assert_eq!(state.frozen.len(), 1);
2279        assert!(state.frozen[0].val.contains_key("a"));
2280
2281        // cleanup
2282        coordinator.stop().await;
2283    }
2284
2285    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2286    async fn should_remove_frozen_delta_after_flush_complete() {
2287        // given
2288        let flusher = TestFlusher::default();
2289        let mut coordinator = WriteCoordinator::new(
2290            test_config(),
2291            vec!["default".to_string()],
2292            TestContext::default(),
2293            flusher.initial_snapshot().await,
2294            flusher.clone(),
2295        );
2296        let handle = coordinator.handle("default");
2297        let (mut subscriber, _) = coordinator.subscribe();
2298        subscriber.initialize();
2299        coordinator.start();
2300
2301        // when
2302        handle
2303            .try_write(TestWrite {
2304                key: "a".into(),
2305                value: 1,
2306                size: 10,
2307            })
2308            .await
2309            .unwrap();
2310        handle.flush(false).await.unwrap();
2311
2312        // First broadcast: freeze (frozen has 1 entry)
2313        let state1 = subscriber.recv().await.unwrap();
2314        assert_eq!(state1.frozen.len(), 1);
2315
2316        // Second broadcast: flush complete (frozen is empty, last_written_delta set)
2317        let state2 = subscriber.recv().await.unwrap();
2318        assert_eq!(state2.frozen.len(), 0);
2319        assert!(state2.last_written_delta.is_some());
2320
2321        // cleanup
2322        coordinator.stop().await;
2323    }
2324
2325    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2326    async fn should_recover_from_message_lost_subscriber() {
2327        // given - a coordinator with a small broadcast buffer (16)
2328        let flusher = TestFlusher::default();
2329        let mut coordinator = WriteCoordinator::new(
2330            test_config(),
2331            vec!["default".to_string()],
2332            TestContext::default(),
2333            flusher.initial_snapshot().await,
2334            flusher.clone(),
2335        );
2336        let handle = coordinator.handle("default");
2337        let (mut subscriber, _) = coordinator.subscribe();
2338        subscriber.initialize();
2339        coordinator.start();
2340
2341        // when - write and flush enough times to overflow the broadcast buffer
2342        // (capacity is 16, so ~20 flushes should cause lag)
2343        // The subscriber never called recv(), so all broadcasts accumulate and overflow the buffer
2344        for i in 0..20 {
2345            let _write_handle = handle
2346                .try_write(TestWrite {
2347                    key: format!("key_{}", i),
2348                    value: i as u64,
2349                    size: 10,
2350                })
2351                .await
2352                .unwrap();
2353            let _ = handle.flush(false).await.unwrap();
2354        }
2355
2356        // make changes durable
2357        let mut watermark = handle
2358            .flush(true)
2359            .await
2360            .expect("flush(true) should succeed");
2361
2362        // wait for durability watermark
2363        watermark.wait(Durability::Durable).await;
2364
2365        // expect SubscribeError to be MessageLost
2366        let result = subscriber
2367            .recv()
2368            .await
2369            .expect_err("expected recv() to yield an error");
2370        assert!(matches!(result, SubscribeError::MessageLost));
2371
2372        // when - resubscribe to recover
2373        let (rx, initial_view) = handle.subscribe();
2374        (subscriber, _) = ViewSubscriber::new(rx, initial_view);
2375        let view = subscriber.initialize();
2376
2377        // then - the fresh view should reflect the current state
2378        // (all 20 writes should be in the snapshot after all the flushes)
2379        let records = view.snapshot.scan(BytesRange::unbounded()).await.unwrap();
2380        assert!(
2381            records.len() >= 20,
2382            "expected at least 20 rows, got {}",
2383            records.len()
2384        );
2385
2386        // and - we should be able to receive future broadcasts
2387        let _write_handle = handle
2388            .try_write(TestWrite {
2389                key: "post_recovery".into(),
2390                value: 100,
2391                size: 10,
2392            })
2393            .await
2394            .unwrap();
2395        let _ = handle.flush(false).await.unwrap();
2396
2397        let result = subscriber.recv().await;
2398        assert!(result.is_ok());
2399
2400        // cleanup
2401        coordinator.stop().await;
2402    }
2403
2404    // ============================================================================
2405    // Durable Flush Tests
2406    // ============================================================================
2407
2408    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2409    async fn should_flush_even_when_no_writes_if_flush_storage() {
2410        // given
2411        let flusher = TestFlusher::default();
2412        let storage = Arc::new(InMemoryStorage::new());
2413        let snapshot = storage.snapshot().await.unwrap();
2414        let mut coordinator = WriteCoordinator::new(
2415            test_config(),
2416            vec!["default".to_string()],
2417            TestContext::default(),
2418            snapshot,
2419            flusher.clone(),
2420        );
2421        let handle = coordinator.handle("default");
2422        coordinator.start();
2423
2424        // when - flush with flush_storage but no pending writes
2425        let mut flush_handle = handle.flush(true).await.unwrap();
2426        flush_handle.wait(Durability::Durable).await.unwrap();
2427
2428        // then - flusher was called (durable event sent) but no delta was recorded
2429        // (TestFlusher only records events with deltas)
2430        assert_eq!(flusher.flushed_events().len(), 0);
2431
2432        // cleanup
2433        coordinator.stop().await;
2434    }
2435
2436    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2437    async fn should_advance_durable_watermark() {
2438        // given
2439        let flusher = TestFlusher::default();
2440        let storage = Arc::new(InMemoryStorage::new());
2441        let snapshot = storage.snapshot().await.unwrap();
2442        let mut coordinator = WriteCoordinator::new(
2443            test_config(),
2444            vec!["default".to_string()],
2445            TestContext::default(),
2446            snapshot,
2447            flusher.clone(),
2448        );
2449        let handle = coordinator.handle("default");
2450        coordinator.start();
2451
2452        // when - write and flush with durable
2453        let mut write = handle
2454            .try_write(TestWrite {
2455                key: "a".into(),
2456                value: 1,
2457                size: 10,
2458            })
2459            .await
2460            .unwrap();
2461        let mut flush_handle = handle.flush(true).await.unwrap();
2462
2463        // then - can wait for Durable durability level
2464        flush_handle.wait(Durability::Durable).await.unwrap();
2465        write.wait(Durability::Durable).await.unwrap();
2466        assert_eq!(flusher.flushed_events().len(), 1);
2467
2468        // cleanup
2469        coordinator.stop().await;
2470    }
2471
2472    #[tokio::test]
2473    async fn should_see_applied_write_via_view() {
2474        // given
2475        let flusher = TestFlusher::default();
2476        let mut coordinator = WriteCoordinator::new(
2477            test_config(),
2478            vec!["default".to_string()],
2479            TestContext::default(),
2480            flusher.initial_snapshot().await,
2481            flusher,
2482        );
2483        let handle = coordinator.handle("default");
2484        coordinator.start();
2485
2486        // when
2487        let mut write = handle
2488            .try_write(TestWrite {
2489                key: "a".into(),
2490                value: 42,
2491                size: 10,
2492            })
2493            .await
2494            .unwrap();
2495        write.wait(Durability::Applied).await.unwrap();
2496
2497        // then
2498        let view = coordinator.view();
2499        assert_eq!(view.current.get("a"), Some(42));
2500
2501        // cleanup
2502        coordinator.stop().await;
2503    }
2504
2505    // ============================================================================
2506    // Multi-Channel Tests
2507    // ============================================================================
2508
2509    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2510    async fn should_flush_writes_from_multiple_channels() {
2511        // given
2512        let flusher = TestFlusher::default();
2513        let mut coordinator = WriteCoordinator::new(
2514            test_config(),
2515            vec!["ch1".to_string(), "ch2".to_string()],
2516            TestContext::default(),
2517            flusher.initial_snapshot().await,
2518            flusher.clone(),
2519        );
2520        let ch1 = coordinator.handle("ch1");
2521        let ch2 = coordinator.handle("ch2");
2522        coordinator.start();
2523
2524        // when - write to both channels, waiting for each to be applied
2525        // to ensure deterministic ordering
2526        let mut w1 = ch1
2527            .try_write(TestWrite {
2528                key: "a".into(),
2529                value: 10,
2530                size: 10,
2531            })
2532            .await
2533            .unwrap();
2534        w1.wait(Durability::Applied).await.unwrap();
2535
2536        let mut w2 = ch2
2537            .try_write(TestWrite {
2538                key: "b".into(),
2539                value: 20,
2540                size: 10,
2541            })
2542            .await
2543            .unwrap();
2544        w2.wait(Durability::Applied).await.unwrap();
2545
2546        let mut w3 = ch1
2547            .try_write(TestWrite {
2548                key: "c".into(),
2549                value: 30,
2550                size: 10,
2551            })
2552            .await
2553            .unwrap();
2554        w3.wait(Durability::Applied).await.unwrap();
2555
2556        ch1.flush(false).await.unwrap();
2557        w3.wait(Durability::Written).await.unwrap();
2558
2559        // then - snapshot should contain writes from both channels
2560        let snapshot = flusher.storage.snapshot().await.unwrap();
2561        assert_snapshot_has_rows(&snapshot, &[("a", 0, 10), ("b", 1, 20), ("c", 2, 30)]).await;
2562
2563        // cleanup
2564        coordinator.stop().await;
2565    }
2566
2567    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2568    async fn should_succeed_with_write_timeout_when_queue_has_space() {
2569        // given
2570        let flusher = TestFlusher::default();
2571        let mut coordinator = WriteCoordinator::new(
2572            test_config(),
2573            vec!["default".to_string()],
2574            TestContext::default(),
2575            flusher.initial_snapshot().await,
2576            flusher.clone(),
2577        );
2578        let handle = coordinator.handle("default");
2579        coordinator.start();
2580
2581        // when
2582        let mut wh = handle
2583            .write_timeout(
2584                TestWrite {
2585                    key: "a".into(),
2586                    value: 1,
2587                    size: 10,
2588                },
2589                Duration::from_secs(1),
2590            )
2591            .await
2592            .unwrap();
2593
2594        // then
2595        let result = wh.wait(Durability::Applied).await;
2596        assert!(result.is_ok());
2597
2598        // cleanup
2599        coordinator.stop().await;
2600    }
2601
2602    #[tokio::test]
2603    async fn should_timeout_when_queue_full() {
2604        // given - queue_capacity=2, coordinator NOT started so nothing drains
2605        let flusher = TestFlusher::default();
2606        let config = WriteCoordinatorConfig {
2607            queue_capacity: 2,
2608            flush_interval: Duration::from_secs(3600),
2609            flush_size_threshold: usize::MAX,
2610        };
2611        let mut coordinator = WriteCoordinator::new(
2612            config,
2613            vec!["default".to_string()],
2614            TestContext::default(),
2615            flusher.initial_snapshot().await,
2616            flusher.clone(),
2617        );
2618        let handle = coordinator.handle("default");
2619
2620        // fill the queue
2621        let _ = handle
2622            .try_write(TestWrite {
2623                key: "a".into(),
2624                value: 1,
2625                size: 10,
2626            })
2627            .await;
2628        let _ = handle
2629            .try_write(TestWrite {
2630                key: "b".into(),
2631                value: 2,
2632                size: 10,
2633            })
2634            .await;
2635
2636        // when - third write should time out
2637        let result = handle
2638            .write_timeout(
2639                TestWrite {
2640                    key: "c".into(),
2641                    value: 3,
2642                    size: 10,
2643                },
2644                Duration::from_millis(10),
2645            )
2646            .await;
2647
2648        // then
2649        assert!(matches!(result, Err(WriteError::TimeoutError(_))));
2650    }
2651
2652    #[tokio::test]
2653    async fn should_return_write_in_timeout_error() {
2654        // given - queue_capacity=1, coordinator NOT started
2655        let flusher = TestFlusher::default();
2656        let config = WriteCoordinatorConfig {
2657            queue_capacity: 1,
2658            flush_interval: Duration::from_secs(3600),
2659            flush_size_threshold: usize::MAX,
2660        };
2661        let mut coordinator = WriteCoordinator::new(
2662            config,
2663            vec!["default".to_string()],
2664            TestContext::default(),
2665            flusher.initial_snapshot().await,
2666            flusher.clone(),
2667        );
2668        let handle = coordinator.handle("default");
2669
2670        // fill the queue
2671        let _ = handle
2672            .try_write(TestWrite {
2673                key: "a".into(),
2674                value: 1,
2675                size: 10,
2676            })
2677            .await;
2678
2679        // when
2680        let result = handle
2681            .write_timeout(
2682                TestWrite {
2683                    key: "retry_me".into(),
2684                    value: 42,
2685                    size: 10,
2686                },
2687                Duration::from_millis(10),
2688            )
2689            .await;
2690        let Err(err) = result else {
2691            panic!("expected TimeoutError");
2692        };
2693
2694        // then - original write is returned inside the error
2695        let write = err.into_inner().expect("should contain the write");
2696        assert_eq!(write.key, "retry_me");
2697        assert_eq!(write.value, 42);
2698    }
2699
2700    #[tokio::test]
2701    async fn should_return_write_in_backpressure_error() {
2702        // given - queue_capacity=1, coordinator NOT started
2703        let flusher = TestFlusher::default();
2704        let config = WriteCoordinatorConfig {
2705            queue_capacity: 1,
2706            flush_interval: Duration::from_secs(3600),
2707            flush_size_threshold: usize::MAX,
2708        };
2709        let mut coordinator = WriteCoordinator::new(
2710            config,
2711            vec!["default".to_string()],
2712            TestContext::default(),
2713            flusher.initial_snapshot().await,
2714            flusher.clone(),
2715        );
2716        let handle = coordinator.handle("default");
2717
2718        // fill the queue
2719        let _ = handle
2720            .try_write(TestWrite {
2721                key: "a".into(),
2722                value: 1,
2723                size: 10,
2724            })
2725            .await;
2726
2727        // when
2728        let result = handle
2729            .try_write(TestWrite {
2730                key: "retry_me".into(),
2731                value: 42,
2732                size: 10,
2733            })
2734            .await;
2735        let Err(err) = result else {
2736            panic!("expected Backpressure");
2737        };
2738
2739        // then - original write is returned inside the error
2740        let write = err.into_inner().expect("should contain the write");
2741        assert_eq!(write.key, "retry_me");
2742        assert_eq!(write.value, 42);
2743    }
2744
2745    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2746    async fn should_succeed_when_queue_drains_within_timeout() {
2747        // given - queue_capacity=2, coordinator started so it drains
2748        let flusher = TestFlusher::default();
2749        let config = WriteCoordinatorConfig {
2750            queue_capacity: 2,
2751            flush_interval: Duration::from_secs(3600),
2752            flush_size_threshold: usize::MAX,
2753        };
2754        let mut coordinator = WriteCoordinator::new(
2755            config,
2756            vec!["default".to_string()],
2757            TestContext::default(),
2758            flusher.initial_snapshot().await,
2759            flusher.clone(),
2760        );
2761        let handle = coordinator.handle("default");
2762
2763        // fill the queue before starting
2764        let _ = handle
2765            .try_write(TestWrite {
2766                key: "a".into(),
2767                value: 1,
2768                size: 10,
2769            })
2770            .await;
2771        let _ = handle
2772            .try_write(TestWrite {
2773                key: "b".into(),
2774                value: 2,
2775                size: 10,
2776            })
2777            .await;
2778
2779        // when - start coordinator (begins draining) and write with generous timeout
2780        coordinator.start();
2781        let result = handle
2782            .write_timeout(
2783                TestWrite {
2784                    key: "c".into(),
2785                    value: 3,
2786                    size: 10,
2787                },
2788                Duration::from_secs(5),
2789            )
2790            .await;
2791
2792        // then
2793        assert!(result.is_ok());
2794
2795        // cleanup
2796        coordinator.stop().await;
2797    }
2798
2799    #[tokio::test]
2800    async fn should_return_shutdown_on_write_timeout_after_coordinator_stops() {
2801        // given
2802        let flusher = TestFlusher::default();
2803        let mut coordinator = WriteCoordinator::new(
2804            test_config(),
2805            vec!["default".to_string()],
2806            TestContext::default(),
2807            flusher.initial_snapshot().await,
2808            flusher.clone(),
2809        );
2810        let handle = coordinator.handle("default");
2811        coordinator.start();
2812
2813        // when
2814        coordinator.stop().await;
2815        let result = handle
2816            .write_timeout(
2817                TestWrite {
2818                    key: "a".into(),
2819                    value: 1,
2820                    size: 10,
2821                },
2822                Duration::from_secs(1),
2823            )
2824            .await;
2825
2826        // then
2827        assert!(matches!(result, Err(WriteError::Shutdown)));
2828    }
2829
2830    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2831    async fn should_pause_and_resume_write_channel() {
2832        // given
2833        let flusher = TestFlusher::default();
2834        let mut config = test_config();
2835        config.flush_size_threshold = usize::MAX;
2836        config.flush_interval = Duration::from_hours(24);
2837        let mut coordinator = WriteCoordinator::new(
2838            config,
2839            vec!["a", "b"],
2840            TestContext::default(),
2841            flusher.initial_snapshot().await,
2842            flusher.clone(),
2843        );
2844        let handle_a = coordinator.handle("a");
2845        let handle_b = coordinator.handle("b");
2846        let pause_handle = coordinator.pause_handle("a");
2847
2848        // pause before starting the coordinator. otherwise
2849        // there's a race condition where the PausableReceiver
2850        // makes it past pause_rx.wait_for(|v| !*v).await; and
2851        // waits for recv before we pause the handle - we could
2852        // change the behavior of the PausableReceiver to first
2853        // wait on recv and then check pause before returning
2854        // but that feels weird and is not necessary for the use
2855        // cases we have in mind
2856        pause_handle.pause();
2857        coordinator.start();
2858
2859        // when
2860        let mut result_a = handle_a
2861            .try_write(TestWrite {
2862                key: "a".into(),
2863                value: 1,
2864                size: 1,
2865            })
2866            .await
2867            .unwrap();
2868        for i in 0..1000 {
2869            handle_b
2870                .try_write(TestWrite {
2871                    key: format!("b{}", i),
2872                    value: i,
2873                    size: 1,
2874                })
2875                .await
2876                .unwrap()
2877                .wait(Durability::Applied)
2878                .await
2879                .unwrap();
2880        }
2881
2882        // then
2883        // make sure the data only contains keys from b (so a was never processed)
2884        let data = coordinator.view().current.data.lock().unwrap().clone();
2885        let mut expected = (0..1000).map(|i| format!("b{}", i)).collect::<HashSet<_>>();
2886        assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2887        pause_handle.unpause();
2888        // after resuming, wait for the data to include keys from a
2889        result_a.wait(Durability::Applied).await.unwrap();
2890        let data = coordinator.view().current.data.lock().unwrap().clone();
2891        expected.insert("a".into());
2892        assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2893    }
2894}