Skip to main content

common/coordinator/
mod.rs

1#![allow(unused)]
2
3mod error;
4mod handle;
5pub(crate) mod metrics;
6pub mod subscriber;
7mod traits;
8
9use std::collections::HashMap;
10use std::ops::Range;
11use std::ops::{Deref, DerefMut};
12
13pub use error::{WriteError, WriteResult};
14use futures::stream::{self, SelectAll, StreamExt};
15pub use handle::{View, WriteCoordinatorHandle, WriteHandle};
16pub use metrics::describe_coordinator_metrics;
17pub use subscriber::{SubscribeError, ViewMonitor, ViewSubscriber};
18pub use traits::{Delta, Durability, EpochStamped, Flusher};
19
20/// Event sent from the write coordinator task to the flush task.
21enum FlushEvent<D: Delta> {
22    /// Flush a frozen delta to storage.
23    FlushDelta { frozen: EpochStamped<D::Frozen> },
24    /// Ensure storage durability (e.g. call storage.flush()).
25    FlushStorage,
26}
27
28// Internal use only
29use crate::StorageRead;
30use crate::storage::StorageSnapshot;
31use async_trait::async_trait;
32pub use handle::EpochWatcher;
33use std::sync::{Arc, Mutex};
34use std::time::Duration;
35use tokio::sync::{broadcast, mpsc, oneshot, watch};
36use tokio::time::{Instant, Interval, interval_at};
37use tokio_util::sync::CancellationToken;
38
/// Configuration for the write coordinator.
#[derive(Debug, Clone)]
pub struct WriteCoordinatorConfig {
    /// Capacity of each named channel's write queue. Must be non-zero.
    pub queue_capacity: usize,
    /// Period of the automatic flush timer. Must be non-zero.
    pub flush_interval: Duration,
    /// Flush once the current delta's `Delta::estimate_size` reaches this value.
    pub flush_size_threshold: usize,
}

impl Default for WriteCoordinatorConfig {
    /// 10 000 queued writes per channel, a 10 s flush interval and a 64 MiB size threshold.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        WriteCoordinatorConfig {
            queue_capacity: 10_000,
            flush_interval: Duration::from_secs(10),
            flush_size_threshold: 64 * MIB,
        }
    }
}
59
60pub(crate) enum WriteCommand<D: Delta> {
61    Write {
62        write: D::Write,
63        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
64    },
65    Flush {
66        epoch_tx: oneshot::Sender<handle::EpochResult<()>>,
67        flush_storage: bool,
68    },
69}
70
71/// The write coordinator manages write ordering, batching, and durability.
72///
73/// It accepts writes through `WriteCoordinatorHandle`, applies them to a `Delta`,
74/// and coordinates flushing through a `Flusher`.
75pub struct WriteCoordinator<D: Delta, F: Flusher<D>> {
76    handles: HashMap<String, WriteCoordinatorHandle<D>>,
77    pause_handles: HashMap<String, PauseHandle>,
78    stop_tok: CancellationToken,
79    tasks: Option<(WriteCoordinatorTask<D>, FlushTask<D, F>)>,
80    write_task_jh: Option<tokio::task::JoinHandle<Result<(), String>>>,
81    view: Arc<BroadcastedView<D>>,
82}
83
84impl<D: Delta, F: Flusher<D>> WriteCoordinator<D, F> {
85    pub fn new(
86        config: WriteCoordinatorConfig,
87        channels: Vec<impl ToString>,
88        initial_context: D::Context,
89        initial_snapshot: Arc<dyn StorageSnapshot>,
90        flusher: F,
91    ) -> WriteCoordinator<D, F> {
92        let (watermarks, watcher) = EpochWatermarks::new();
93        let watermarks = Arc::new(watermarks);
94
95        // Create a write channel per named input
96        let mut write_rxs = Vec::with_capacity(channels.len());
97        let mut write_txs = HashMap::new();
98        let mut pause_handles = HashMap::new();
99        for name in &channels {
100            let (write_tx, write_rx, pause_hdl) = pausable_channel(config.queue_capacity);
101            write_rxs.push(write_rx);
102            write_txs.insert(name.to_string(), write_tx);
103            pause_handles.insert(name.to_string(), pause_hdl);
104        }
105
106        // this is the channel that sends FlushEvents to be flushed
107        // by a background task so that the process of converting deltas
108        // to storage operations is non-blocking. for now, we apply no
109        // backpressure on this channel, so writes will block if more than
110        // one flush is pending
111        let (flush_tx, flush_rx) = mpsc::channel(2);
112
113        let flush_stop_tok = CancellationToken::new();
114        let stop_tok = CancellationToken::new();
115        let write_task = WriteCoordinatorTask::new(
116            config,
117            initial_context,
118            initial_snapshot,
119            write_rxs,
120            flush_tx,
121            watermarks.clone(),
122            stop_tok.clone(),
123            flush_stop_tok.clone(),
124        );
125
126        let view = write_task.view.clone();
127
128        let mut handles = HashMap::new();
129        for name in channels {
130            let name = name.to_string();
131            let write_tx = write_txs.remove(&name).expect("unreachable");
132            handles.insert(
133                name.clone(),
134                WriteCoordinatorHandle::new(name, write_tx, watcher.clone(), view.clone()),
135            );
136        }
137
138        let flush_task = FlushTask {
139            flusher,
140            stop_tok: flush_stop_tok,
141            flush_rx,
142            watermarks: watermarks.clone(),
143            view: view.clone(),
144            last_flushed_epoch: 0,
145        };
146
147        Self {
148            handles,
149            pause_handles,
150            tasks: Some((write_task, flush_task)),
151            write_task_jh: None,
152            stop_tok,
153            view,
154        }
155    }
156
157    pub fn handle(&self, name: &str) -> WriteCoordinatorHandle<D> {
158        self.handles
159            .get(name)
160            .expect("unknown channel name")
161            .clone()
162    }
163
164    pub fn pause_handle(&self, name: &str) -> PauseHandle {
165        self.pause_handles
166            .get(name)
167            .expect("unknown channel name")
168            .clone()
169    }
170
171    pub fn start(&mut self) {
172        let Some((write_task, flush_task)) = self.tasks.take() else {
173            // already started
174            return;
175        };
176        let flush_task_jh = flush_task.run();
177        let write_task_jh = write_task.run(flush_task_jh);
178        self.write_task_jh = Some(write_task_jh);
179    }
180
181    pub async fn stop(mut self) -> Result<(), String> {
182        let Some(write_task_jh) = self.write_task_jh.take() else {
183            return Ok(());
184        };
185        self.stop_tok.cancel();
186        write_task_jh.await.map_err(|e| e.to_string())?
187    }
188
189    pub fn view(&self) -> Arc<View<D>> {
190        self.view.current()
191    }
192
193    pub fn subscribe(&self) -> (ViewSubscriber<D>, ViewMonitor) {
194        let (view_rx, initial_view) = self.view.subscribe();
195        ViewSubscriber::new(view_rx, initial_view)
196    }
197}
198
199struct WriteCoordinatorTask<D: Delta> {
200    config: WriteCoordinatorConfig,
201    delta: CurrentDelta<D>,
202    flush_tx: mpsc::Sender<FlushEvent<D>>,
203    write_rxs: Vec<PausableReceiver<D>>,
204    watermarks: Arc<EpochWatermarks>,
205    view: Arc<BroadcastedView<D>>,
206    epoch: u64,
207    delta_start_epoch: u64,
208    flush_interval: Interval,
209    stop_tok: CancellationToken,
210    flush_stop_tok: CancellationToken,
211}
212
213impl<D: Delta> WriteCoordinatorTask<D> {
214    /// Create a new write coordinator with the given flusher.
215    ///
216    /// This is useful for testing with mock flushers.
217    #[allow(clippy::too_many_arguments)]
218    pub fn new(
219        config: WriteCoordinatorConfig,
220        initial_context: D::Context,
221        initial_snapshot: Arc<dyn StorageSnapshot>,
222        write_rxs: Vec<PausableReceiver<D>>,
223        flush_tx: mpsc::Sender<FlushEvent<D>>,
224        watermarks: Arc<EpochWatermarks>,
225        stop_tok: CancellationToken,
226        flush_stop_tok: CancellationToken,
227    ) -> Self {
228        let delta = D::init(initial_context);
229
230        let initial_view = View {
231            current: delta.reader(),
232            frozen: vec![],
233            snapshot: initial_snapshot,
234            last_written_delta: None,
235        };
236        let initial_view = Arc::new(BroadcastedView::new(initial_view));
237
238        let flush_interval = interval_at(
239            Instant::now() + config.flush_interval,
240            config.flush_interval,
241        );
242        Self {
243            config,
244            delta: CurrentDelta::new(delta),
245            write_rxs,
246            flush_tx,
247            watermarks,
248            view: initial_view,
249            // Epochs start at 1 because watch channels initialize to 0 (meaning "nothing
250            // processed yet"). If the first write had epoch 0, wait() would return
251            // immediately since the condition `watermark < epoch` would be `0 < 0` = false.
252            epoch: 1,
253            delta_start_epoch: 1,
254            flush_interval,
255            stop_tok,
256            flush_stop_tok,
257        }
258    }
259
260    /// Run the coordinator event loop.
261    pub fn run(
262        mut self,
263        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
264    ) -> tokio::task::JoinHandle<Result<(), String>> {
265        tokio::task::spawn(async move { self.run_coordinator(flush_task_jh).await })
266    }
267
268    async fn run_coordinator(
269        mut self,
270        flush_task_jh: tokio::task::JoinHandle<WriteResult<()>>,
271    ) -> Result<(), String> {
272        // Reset the interval to start fresh from when run() is called
273        self.flush_interval.reset();
274
275        // Merge all write receivers into a single stream
276        let mut write_stream: SelectAll<_> = SelectAll::new();
277        for rx in self.write_rxs.drain(..) {
278            write_stream.push(
279                stream::unfold(
280                    rx,
281                    |mut rx| async move { rx.recv().await.map(|cmd| (cmd, rx)) },
282                )
283                .boxed(),
284            );
285        }
286
287        loop {
288            tokio::select! {
289                cmd = write_stream.next() => {
290                    match cmd {
291                        Some(WriteCommand::Write { write, result_tx }) => {
292                            self.handle_write(write, result_tx).await?;
293                        }
294                        Some(WriteCommand::Flush { epoch_tx, flush_storage }) => {
295                            // Send back the epoch of the last processed write
296                            let _ = epoch_tx.send(Ok(handle::WriteApplied {
297                                epoch: self.epoch.saturating_sub(1),
298                                result: (),
299                            }));
300                            self.handle_flush(FlushReason::Explicit, flush_storage).await;
301                        }
302                        None => {
303                            // All write channels closed
304                            break;
305                        }
306                    }
307                }
308
309                _ = self.flush_interval.tick() => {
310                    self.handle_flush(FlushReason::Interval, false).await;
311                }
312
313                _ = self.stop_tok.cancelled() => {
314                    break;
315                }
316            }
317        }
318
319        // Flush any remaining pending writes before shutdown
320        self.handle_flush(FlushReason::Shutdown, false).await;
321
322        // Signal the flush task to stop
323        self.flush_stop_tok.cancel();
324        // Wait for the flush task to complete and propagate any errors
325        flush_task_jh
326            .await
327            .map_err(|e| format!("flush task panicked: {}", e))?
328            .map_err(|e| format!("flush task error: {}", e))
329    }
330
331    async fn handle_write(
332        &mut self,
333        op: D::Write,
334        result_tx: oneshot::Sender<handle::EpochResult<D::ApplyResult>>,
335    ) -> Result<(), String> {
336        let write_epoch = self.epoch;
337        self.epoch += 1;
338
339        let apply_start = std::time::Instant::now();
340        let result = self.delta.apply(op);
341        ::metrics::histogram!(metrics::COORDINATOR_DELTA_APPLY_DURATION_SECONDS)
342            .record(apply_start.elapsed().as_secs_f64());
343
344        // Ignore error if receiver was dropped (fire-and-forget write)
345        let _ = result_tx.send(
346            result
347                .map(|apply_result| handle::WriteApplied {
348                    epoch: write_epoch,
349                    result: apply_result,
350                })
351                .map_err(|e| handle::WriteFailed {
352                    epoch: write_epoch,
353                    error: e,
354                }),
355        );
356
357        // Ignore error if no watchers are listening - this is non-fatal
358        self.watermarks.update_applied(write_epoch);
359
360        let estimated = self.delta.estimate_size();
361        ::metrics::gauge!(metrics::COORDINATOR_DELTA_ESTIMATED_BYTES).set(estimated as f64);
362        if estimated >= self.config.flush_size_threshold {
363            self.handle_flush(FlushReason::SizeThreshold, false).await;
364        }
365
366        Ok(())
367    }
368
369    async fn handle_flush(&mut self, reason: FlushReason, flush_storage: bool) {
370        self.flush_if_delta_has_writes(reason).await;
371        if flush_storage {
372            self.send_flush_event(FlushEvent::FlushStorage).await;
373        }
374    }
375
376    async fn flush_if_delta_has_writes(&mut self, reason: FlushReason) {
377        if self.epoch == self.delta_start_epoch {
378            return;
379        }
380
381        self.flush_interval.reset();
382
383        ::metrics::counter!(metrics::COORDINATOR_FLUSH_TOTAL, "reason" => reason.as_str())
384            .increment(1);
385
386        let epoch_range = self.delta_start_epoch..self.epoch;
387        self.delta_start_epoch = self.epoch;
388
389        let freeze_start = std::time::Instant::now();
390        let (frozen, frozen_reader) = self.delta.freeze_and_init();
391        ::metrics::histogram!(metrics::COORDINATOR_DELTA_FREEZE_DURATION_SECONDS)
392            .record(freeze_start.elapsed().as_secs_f64());
393        // Reset estimated bytes gauge: the new delta starts empty.
394        ::metrics::gauge!(metrics::COORDINATOR_DELTA_ESTIMATED_BYTES).set(0.0);
395
396        let stamped_frozen = EpochStamped::new(frozen, epoch_range.clone());
397        let stamped_frozen_reader = EpochStamped::new(frozen_reader, epoch_range.clone());
398        let reader = self.delta.reader();
399        // update the view before sending the flush msg to ensure the flusher sees
400        // the frozen reader when updating the view post-flush
401        self.view.update_delta_frozen(stamped_frozen_reader, reader);
402        // this is the blocking section of the flush, new writes will not be accepted
403        // until the event is sent to the FlushTask
404        self.send_flush_event(FlushEvent::FlushDelta {
405            frozen: stamped_frozen,
406        })
407        .await;
408    }
409
410    async fn send_flush_event(&self, event: FlushEvent<D>) {
411        // Sample queue depth (in-flight events) just before sending. `max_capacity`
412        // and `capacity` are cheap atomic reads.
413        let max = self.flush_tx.max_capacity();
414        let free = self.flush_tx.capacity();
415        ::metrics::gauge!(metrics::COORDINATOR_FLUSH_EVENT_QUEUE_DEPTH)
416            .set(max.saturating_sub(free) as f64);
417
418        let send_start = std::time::Instant::now();
419        let _ = self.flush_tx.send(event).await;
420        ::metrics::histogram!(metrics::COORDINATOR_FLUSH_EVENT_SEND_DURATION_SECONDS)
421            .record(send_start.elapsed().as_secs_f64());
422    }
423}
424
/// Why a flush was triggered; exported as the low-cardinality `reason` metric label.
#[derive(Clone, Copy, Debug)]
pub(crate) enum FlushReason {
    /// The delta's estimated size reached the configured threshold.
    SizeThreshold,
    /// The periodic flush timer fired.
    Interval,
    /// A caller explicitly requested a flush.
    Explicit,
    /// The write loop is shutting down.
    Shutdown,
}

impl FlushReason {
    /// Stable label value for this reason.
    fn as_str(self) -> &'static str {
        match self {
            Self::SizeThreshold => "size_threshold",
            Self::Interval => "interval",
            Self::Explicit => "explicit",
            Self::Shutdown => "shutdown",
        }
    }
}
444
445struct FlushTask<D: Delta, F: Flusher<D>> {
446    flusher: F,
447    stop_tok: CancellationToken,
448    flush_rx: mpsc::Receiver<FlushEvent<D>>,
449    watermarks: Arc<EpochWatermarks>,
450    view: Arc<BroadcastedView<D>>,
451    last_flushed_epoch: u64,
452}
453
454impl<D: Delta, F: Flusher<D>> FlushTask<D, F> {
455    fn run(mut self) -> tokio::task::JoinHandle<WriteResult<()>> {
456        tokio::spawn(async move {
457            loop {
458                tokio::select! {
459                    event = self.flush_rx.recv() => {
460                        let Some(event) = event else {
461                            break;
462                        };
463                        self.handle_event(event).await?;
464                    }
465                    _ = self.stop_tok.cancelled() => {
466                        break;
467                    }
468                }
469            }
470            // drain all remaining flush events
471            while let Ok(event) = self.flush_rx.try_recv() {
472                self.handle_event(event).await;
473            }
474            Ok(())
475        })
476    }
477
478    async fn handle_event(&mut self, event: FlushEvent<D>) -> WriteResult<()> {
479        match event {
480            FlushEvent::FlushDelta { frozen } => self.handle_flush(frozen).await,
481            FlushEvent::FlushStorage => {
482                let start = std::time::Instant::now();
483                let result = self
484                    .flusher
485                    .flush_storage()
486                    .await
487                    .map_err(|e| WriteError::FlushError(e.to_string()));
488                ::metrics::histogram!(metrics::COORDINATOR_FLUSH_STORAGE_DURATION_SECONDS)
489                    .record(start.elapsed().as_secs_f64());
490                result?;
491                self.watermarks.update_durable(self.last_flushed_epoch);
492                Ok(())
493            }
494        }
495    }
496
497    async fn handle_flush(&mut self, frozen: EpochStamped<D::Frozen>) -> WriteResult<()> {
498        let delta = frozen.val;
499        let epoch_range = frozen.epoch_range;
500        let start = std::time::Instant::now();
501        let result = self
502            .flusher
503            .flush_delta(delta, &epoch_range)
504            .await
505            .map_err(|e| WriteError::FlushError(e.to_string()));
506        ::metrics::histogram!(metrics::COORDINATOR_FLUSH_DELTA_DURATION_SECONDS)
507            .record(start.elapsed().as_secs_f64());
508        let snapshot = result?;
509        self.last_flushed_epoch = epoch_range.end - 1;
510        // Publish the refreshed view first so any waiter awoken by the
511        // watermark notification below observes a view that already includes
512        // this flush. Reversing the order races on multi-thread runtimes:
513        // a waiter on `Written` can schedule on another worker and read
514        // `view.snapshot` before this task completes the view update.
515        self.view.update_flush_finished(snapshot, epoch_range);
516        self.watermarks.update_written(self.last_flushed_epoch);
517        Ok(())
518    }
519}
520
521struct CurrentDelta<D: Delta> {
522    delta: Option<D>,
523}
524
525impl<D: Delta> Deref for CurrentDelta<D> {
526    type Target = D;
527
528    fn deref(&self) -> &Self::Target {
529        match &self.delta {
530            Some(d) => d,
531            None => panic!("current delta not initialized"),
532        }
533    }
534}
535
536impl<D: Delta> DerefMut for CurrentDelta<D> {
537    fn deref_mut(&mut self) -> &mut Self::Target {
538        match &mut self.delta {
539            Some(d) => d,
540            None => panic!("current delta not initialized"),
541        }
542    }
543}
544
545impl<D: Delta> CurrentDelta<D> {
546    fn new(delta: D) -> Self {
547        Self { delta: Some(delta) }
548    }
549
550    fn freeze_and_init(&mut self) -> (D::Frozen, D::FrozenView) {
551        let Some(delta) = self.delta.take() else {
552            panic!("delta not initialized");
553        };
554        let (frozen, frozen_reader, context) = delta.freeze();
555        let new_delta = D::init(context);
556        self.delta = Some(new_delta);
557        (frozen, frozen_reader)
558    }
559}
560
561pub struct EpochWatermarks {
562    applied_tx: tokio::sync::watch::Sender<u64>,
563    written_tx: tokio::sync::watch::Sender<u64>,
564    durable_tx: tokio::sync::watch::Sender<u64>,
565}
566
567impl EpochWatermarks {
568    pub fn new() -> (Self, EpochWatcher) {
569        let (applied_tx, applied_rx) = tokio::sync::watch::channel(0);
570        let (written_tx, written_rx) = tokio::sync::watch::channel(0);
571        let (durable_tx, durable_rx) = tokio::sync::watch::channel(0);
572        let watcher = EpochWatcher {
573            applied_rx,
574            written_rx,
575            durable_rx,
576        };
577        let watermarks = EpochWatermarks {
578            applied_tx,
579            written_tx,
580            durable_tx,
581        };
582        (watermarks, watcher)
583    }
584
585    pub fn update_applied(&self, epoch: u64) {
586        let _ = self.applied_tx.send(epoch);
587    }
588
589    pub fn update_written(&self, epoch: u64) {
590        let _ = self.written_tx.send(epoch);
591    }
592
593    pub fn update_durable(&self, epoch: u64) {
594        let _ = self.durable_tx.send(epoch);
595    }
596}
597
598pub(crate) struct BroadcastedView<D: Delta> {
599    inner: Mutex<BroadcastedViewInner<D>>,
600}
601
602impl<D: Delta> BroadcastedView<D> {
603    fn new(initial_view: View<D>) -> Self {
604        let (view_tx, _) = broadcast::channel(16);
605        Self {
606            inner: Mutex::new(BroadcastedViewInner {
607                view: Arc::new(initial_view),
608                view_tx,
609            }),
610        }
611    }
612
613    fn update_flush_finished(&self, snapshot: Arc<dyn StorageSnapshot>, epoch_range: Range<u64>) {
614        self.inner
615            .lock()
616            .expect("lock poisoned")
617            .update_flush_finished(snapshot, epoch_range);
618    }
619
620    fn update_delta_frozen(&self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
621        self.inner
622            .lock()
623            .expect("lock poisoned")
624            .update_delta_frozen(frozen, reader);
625    }
626
627    fn current(&self) -> Arc<View<D>> {
628        self.inner.lock().expect("lock poisoned").current()
629    }
630
631    fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
632        self.inner.lock().expect("lock poisoned").subscribe()
633    }
634}
635
636struct BroadcastedViewInner<D: Delta> {
637    view: Arc<View<D>>,
638    view_tx: tokio::sync::broadcast::Sender<Arc<View<D>>>,
639}
640
641impl<D: Delta> BroadcastedViewInner<D> {
642    fn update_flush_finished(
643        &mut self,
644        snapshot: Arc<dyn StorageSnapshot>,
645        epoch_range: Range<u64>,
646    ) {
647        let mut new_frozen = self.view.frozen.clone();
648        let last = new_frozen
649            .pop()
650            .expect("frozen should not be empty when flush completes");
651        assert_eq!(last.epoch_range, epoch_range);
652        self.view = Arc::new(View {
653            current: self.view.current.clone(),
654            frozen: new_frozen,
655            snapshot,
656            last_written_delta: Some(last),
657        });
658        self.view_tx.send(self.view.clone());
659    }
660
661    fn update_delta_frozen(&mut self, frozen: EpochStamped<D::FrozenView>, reader: D::DeltaView) {
662        // Update read state: add frozen delta to front, update current reader
663        let mut new_frozen = vec![frozen];
664        new_frozen.extend(self.view.frozen.iter().cloned());
665        self.view = Arc::new(View {
666            current: reader,
667            frozen: new_frozen,
668            snapshot: self.view.snapshot.clone(),
669            last_written_delta: self.view.last_written_delta.clone(),
670        });
671        self.view_tx.send(self.view.clone());
672    }
673
674    fn current(&self) -> Arc<View<D>> {
675        self.view.clone()
676    }
677
678    fn subscribe(&self) -> (broadcast::Receiver<Arc<View<D>>>, Arc<View<D>>) {
679        (self.view_tx.subscribe(), self.view.clone())
680    }
681}
682
683struct PausableReceiver<D: Delta> {
684    pause_rx: Option<watch::Receiver<bool>>,
685    rx: mpsc::Receiver<WriteCommand<D>>,
686}
687
688impl<D: Delta> PausableReceiver<D> {
689    async fn recv(&mut self) -> Option<WriteCommand<D>> {
690        if let Some(pause_rx) = self.pause_rx.as_mut() {
691            pause_rx.wait_for(|v| !*v).await;
692        }
693        self.rx.recv().await
694    }
695}
696
697/// A handle that can be used to pause/unpause the coordinator's consumption from a write queue
698#[derive(Clone)]
699pub struct PauseHandle {
700    pause_tx: tokio::sync::watch::Sender<bool>,
701}
702
703impl PauseHandle {
704    pub fn pause(&self) {
705        self.pause_tx.send_replace(true);
706    }
707
708    pub fn unpause(&self) {
709        self.pause_tx.send_replace(false);
710    }
711}
712
713fn pausable_channel<D: Delta>(
714    capacity: usize,
715) -> (
716    mpsc::Sender<WriteCommand<D>>,
717    PausableReceiver<D>,
718    PauseHandle,
719) {
720    let (pause_tx, pause_rx) = watch::channel(false);
721    let (tx, rx) = mpsc::channel(capacity);
722    (
723        tx,
724        PausableReceiver {
725            pause_rx: Some(pause_rx),
726            rx,
727        },
728        PauseHandle { pause_tx },
729    )
730}
731
732#[cfg(test)]
733mod tests {
734    use super::*;
735    use crate::BytesRange;
736    use crate::coordinator::Durability;
737    use crate::storage::in_memory::{InMemoryStorage, InMemoryStorageSnapshot};
738    use crate::storage::{PutRecordOp, Record, StorageSnapshot};
739    use crate::{Storage, StorageRead};
740    use async_trait::async_trait;
741    use bytes::Bytes;
742    use std::collections::{HashMap, HashSet};
743    use std::ops::Range;
744    use std::sync::Mutex;
745    // ============================================================================
746    // Test Infrastructure
747    // ============================================================================
748
749    #[derive(Clone, Debug)]
750    struct TestWrite {
751        key: String,
752        value: u64,
753        size: usize,
754    }
755
756    /// Context carries state that must persist across deltas (like sequence allocation)
757    #[derive(Clone, Debug, Default)]
758    struct TestContext {
759        next_seq: u64,
760        error: Option<String>,
761    }
762
763    /// A shared reader that sees writes as they are applied to the delta.
764    #[derive(Clone, Debug, Default)]
765    struct TestDeltaReader {
766        data: Arc<Mutex<HashMap<String, u64>>>,
767    }
768
769    impl TestDeltaReader {
770        fn get(&self, key: &str) -> Option<u64> {
771            self.data.lock().unwrap().get(key).copied()
772        }
773    }
774
775    /// Delta accumulates writes with sequence numbers.
776    /// Stores the context directly and updates it in place.
777    #[derive(Debug)]
778    struct TestDelta {
779        context: TestContext,
780        writes: HashMap<String, (u64, u64)>,
781        key_values: Arc<Mutex<HashMap<String, u64>>>,
782        total_size: usize,
783    }
784
785    #[derive(Clone, Debug)]
786    struct FrozenTestDelta {
787        writes: HashMap<String, (u64, u64)>,
788    }
789
790    impl std::fmt::Debug for View<TestDelta> {
791        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
792            f.write_str("View<TestDelta>")
793        }
794    }
795
796    impl Delta for TestDelta {
797        type Context = TestContext;
798        type Write = TestWrite;
799        type DeltaView = TestDeltaReader;
800        type Frozen = FrozenTestDelta;
801        type FrozenView = Arc<HashMap<String, u64>>;
802        type ApplyResult = ();
803
804        fn init(context: Self::Context) -> Self {
805            Self {
806                context,
807                writes: HashMap::default(),
808                key_values: Arc::new(Mutex::new(HashMap::default())),
809                total_size: 0,
810            }
811        }
812
813        fn apply(&mut self, write: Self::Write) -> Result<(), String> {
814            if let Some(error) = &self.context.error {
815                return Err(error.clone());
816            }
817
818            let seq = self.context.next_seq;
819            self.context.next_seq += 1;
820
821            self.writes.insert(write.key.clone(), (seq, write.value));
822            self.total_size += write.size;
823            self.key_values
824                .lock()
825                .unwrap()
826                .insert(write.key, write.value);
827            Ok(())
828        }
829
830        fn estimate_size(&self) -> usize {
831            self.total_size
832        }
833
834        fn freeze(self) -> (Self::Frozen, Self::FrozenView, Self::Context) {
835            let frozen = FrozenTestDelta {
836                writes: self.writes,
837            };
838            let frozen_view = Arc::new(self.key_values.lock().unwrap().clone());
839            (frozen, frozen_view, self.context)
840        }
841
842        fn reader(&self) -> Self::DeltaView {
843            TestDeltaReader {
844                data: self.key_values.clone(),
845            }
846        }
847    }
848
849    /// Shared state for TestFlusher - allows test to inspect and control behavior
850    #[derive(Default)]
851    struct TestFlusherState {
852        flushed_events: Vec<Arc<EpochStamped<FrozenTestDelta>>>,
853        /// Signals when a flush starts (before blocking)
854        flush_started_tx: Option<oneshot::Sender<()>>,
855        /// Blocks flush until signaled
856        unblock_rx: Option<mpsc::Receiver<()>>,
857    }
858
859    #[derive(Clone)]
860    struct TestFlusher {
861        state: Arc<Mutex<TestFlusherState>>,
862        storage: Arc<InMemoryStorage>,
863    }
864
865    impl Default for TestFlusher {
866        fn default() -> Self {
867            Self {
868                state: Arc::new(Mutex::new(TestFlusherState::default())),
869                storage: Arc::new(InMemoryStorage::new()),
870            }
871        }
872    }
873
874    impl TestFlusher {
875        /// Create a flusher that blocks until signaled, with a notification when flush starts.
876        /// Returns (flusher, flush_started_rx, unblock_tx).
877        fn with_flush_control() -> (Self, oneshot::Receiver<()>, mpsc::Sender<()>) {
878            let (started_tx, started_rx) = oneshot::channel();
879            let (unblock_tx, unblock_rx) = mpsc::channel(1);
880            let flusher = Self {
881                state: Arc::new(Mutex::new(TestFlusherState {
882                    flushed_events: Vec::new(),
883                    flush_started_tx: Some(started_tx),
884                    unblock_rx: Some(unblock_rx),
885                })),
886                storage: Arc::new(InMemoryStorage::new()),
887            };
888            (flusher, started_rx, unblock_tx)
889        }
890
891        fn flushed_events(&self) -> Vec<Arc<EpochStamped<FrozenTestDelta>>> {
892            self.state.lock().unwrap().flushed_events.clone()
893        }
894
895        async fn initial_snapshot(&self) -> Arc<dyn StorageSnapshot> {
896            self.storage.snapshot().await.unwrap()
897        }
898    }
899
900    #[async_trait]
901    impl Flusher<TestDelta> for TestFlusher {
902        async fn flush_delta(
903            &mut self,
904            frozen: FrozenTestDelta,
905            epoch_range: &Range<u64>,
906        ) -> Result<Arc<dyn StorageSnapshot>, String> {
907            // Signal that flush has started
908            let flush_started_tx = {
909                let mut state = self.state.lock().unwrap();
910                state.flush_started_tx.take()
911            };
912            if let Some(tx) = flush_started_tx {
913                let _ = tx.send(());
914            }
915
916            // Block if test wants to control timing
917            let unblock_rx = {
918                let mut state = self.state.lock().unwrap();
919                state.unblock_rx.take()
920            };
921            if let Some(mut rx) = unblock_rx {
922                rx.recv().await;
923            }
924
925            // Write records to storage
926            let records: Vec<PutRecordOp> = frozen
927                .writes
928                .iter()
929                .map(|(key, (seq, value))| {
930                    let mut buf = Vec::with_capacity(16);
931                    buf.extend_from_slice(&seq.to_le_bytes());
932                    buf.extend_from_slice(&value.to_le_bytes());
933                    Record::new(Bytes::from(key.clone()), Bytes::from(buf)).into()
934                })
935                .collect();
936            self.storage
937                .put(records)
938                .await
939                .map_err(|e| format!("{}", e))?;
940
941            // Record the flush
942            {
943                let mut state = self.state.lock().unwrap();
944                state
945                    .flushed_events
946                    .push(Arc::new(EpochStamped::new(frozen, epoch_range.clone())));
947            }
948
949            self.storage.snapshot().await.map_err(|e| format!("{}", e))
950        }
951
952        async fn flush_storage(&self) -> Result<(), String> {
953            // Signal that flush has started
954            let flush_started_tx = {
955                let mut state = self.state.lock().unwrap();
956                state.flush_started_tx.take()
957            };
958            if let Some(tx) = flush_started_tx {
959                let _ = tx.send(());
960            }
961
962            // Block if test wants to control timing
963            let unblock_rx = {
964                let mut state = self.state.lock().unwrap();
965                state.unblock_rx.take()
966            };
967            if let Some(mut rx) = unblock_rx {
968                rx.recv().await;
969            }
970
971            Ok(())
972        }
973    }
974
975    fn test_config() -> WriteCoordinatorConfig {
976        WriteCoordinatorConfig {
977            queue_capacity: 100,
978            flush_interval: Duration::from_secs(3600), // Long interval to avoid timer flushes
979            flush_size_threshold: usize::MAX,
980        }
981    }
982
983    async fn assert_snapshot_has_rows(
984        snapshot: &Arc<dyn StorageSnapshot>,
985        expected: &[(&str, u64, u64)],
986    ) {
987        let records = snapshot.scan(BytesRange::unbounded()).await.unwrap();
988        assert_eq!(
989            records.len(),
990            expected.len(),
991            "expected {} rows but snapshot has {}",
992            expected.len(),
993            records.len()
994        );
995        let mut actual: Vec<(String, u64, u64)> = records
996            .iter()
997            .map(|r| {
998                let key = String::from_utf8(r.key.to_vec()).unwrap();
999                let seq = u64::from_le_bytes(r.value[0..8].try_into().unwrap());
1000                let value = u64::from_le_bytes(r.value[8..16].try_into().unwrap());
1001                (key, seq, value)
1002            })
1003            .collect();
1004        actual.sort_by(|a, b| a.0.cmp(&b.0));
1005        let mut expected: Vec<(&str, u64, u64)> = expected.to_vec();
1006        expected.sort_by(|a, b| a.0.cmp(b.0));
1007        for (actual, expected) in actual.iter().zip(expected.iter()) {
1008            assert_eq!(
1009                actual.0, expected.0,
1010                "key mismatch: got {:?}, expected {:?}",
1011                actual.0, expected.0
1012            );
1013            assert_eq!(
1014                actual.1, expected.1,
1015                "seq mismatch for key {:?}: got {}, expected {}",
1016                actual.0, actual.1, expected.1
1017            );
1018            assert_eq!(
1019                actual.2, expected.2,
1020                "value mismatch for key {:?}: got {}, expected {}",
1021                actual.0, actual.2, expected.2
1022            );
1023        }
1024    }
1025
1026    // ============================================================================
1027    // Basic Write Flow Tests
1028    // ============================================================================
1029
1030    #[tokio::test]
1031    async fn should_assign_monotonic_epochs() {
1032        // given
1033        let flusher = TestFlusher::default();
1034        let mut coordinator = WriteCoordinator::new(
1035            test_config(),
1036            vec!["default".to_string()],
1037            TestContext::default(),
1038            flusher.initial_snapshot().await,
1039            flusher,
1040        );
1041        let handle = coordinator.handle("default");
1042        coordinator.start();
1043
1044        // when
1045        let write1 = handle
1046            .try_write(TestWrite {
1047                key: "a".into(),
1048                value: 1,
1049                size: 10,
1050            })
1051            .await
1052            .unwrap();
1053        let write2 = handle
1054            .try_write(TestWrite {
1055                key: "b".into(),
1056                value: 2,
1057                size: 10,
1058            })
1059            .await
1060            .unwrap();
1061        let write3 = handle
1062            .try_write(TestWrite {
1063                key: "c".into(),
1064                value: 3,
1065                size: 10,
1066            })
1067            .await
1068            .unwrap();
1069
1070        let epoch1 = write1.epoch().await.unwrap();
1071        let epoch2 = write2.epoch().await.unwrap();
1072        let epoch3 = write3.epoch().await.unwrap();
1073
1074        // then
1075        assert!(epoch1 < epoch2);
1076        assert!(epoch2 < epoch3);
1077
1078        // cleanup
1079        coordinator.stop().await;
1080    }
1081
1082    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1083    async fn should_apply_writes_in_order() {
1084        // given
1085        let flusher = TestFlusher::default();
1086        let mut coordinator = WriteCoordinator::new(
1087            test_config(),
1088            vec!["default".to_string()],
1089            TestContext::default(),
1090            flusher.initial_snapshot().await,
1091            flusher.clone(),
1092        );
1093        let handle = coordinator.handle("default");
1094        coordinator.start();
1095
1096        // when
1097        handle
1098            .try_write(TestWrite {
1099                key: "a".into(),
1100                value: 1,
1101                size: 10,
1102            })
1103            .await
1104            .unwrap();
1105        handle
1106            .try_write(TestWrite {
1107                key: "a".into(),
1108                value: 2,
1109                size: 10,
1110            })
1111            .await
1112            .unwrap();
1113        let mut last_write = handle
1114            .try_write(TestWrite {
1115                key: "a".into(),
1116                value: 3,
1117                size: 10,
1118            })
1119            .await
1120            .unwrap();
1121
1122        handle.flush(false).await.unwrap();
1123        // Wait for flush to complete via watermark
1124        last_write.wait(Durability::Written).await.unwrap();
1125
1126        // then
1127        let events = flusher.flushed_events();
1128        assert_eq!(events.len(), 1);
1129        let frozen_delta = &events[0];
1130        let delta = &frozen_delta.val;
1131        // Writing key "a" 3x overwrites; last write wins with seq=2 (0-indexed)
1132        let (seq, value) = delta.writes.get("a").unwrap();
1133        assert_eq!(*value, 3);
1134        assert_eq!(*seq, 2);
1135
1136        // cleanup
1137        coordinator.stop().await;
1138    }
1139
1140    #[tokio::test]
1141    async fn should_update_applied_watermark_after_each_write() {
1142        // given
1143        let flusher = TestFlusher::default();
1144        let mut coordinator = WriteCoordinator::new(
1145            test_config(),
1146            vec!["default".to_string()],
1147            TestContext::default(),
1148            flusher.initial_snapshot().await,
1149            flusher,
1150        );
1151        let handle = coordinator.handle("default");
1152        coordinator.start();
1153
1154        // when
1155        let mut write_handle = handle
1156            .try_write(TestWrite {
1157                key: "a".into(),
1158                value: 1,
1159                size: 10,
1160            })
1161            .await
1162            .unwrap();
1163
1164        // then - wait should succeed immediately after write is applied
1165        let result = write_handle.wait(Durability::Applied).await;
1166        assert!(result.is_ok());
1167
1168        // cleanup
1169        coordinator.stop().await;
1170    }
1171
1172    #[tokio::test]
1173    async fn should_propagate_apply_error_to_handle() {
1174        // given
1175        let flusher = TestFlusher::default();
1176        let context = TestContext {
1177            error: Some("apply error".to_string()),
1178            ..Default::default()
1179        };
1180        let mut coordinator = WriteCoordinator::new(
1181            test_config(),
1182            vec!["default".to_string()],
1183            context,
1184            flusher.initial_snapshot().await,
1185            flusher,
1186        );
1187        let handle = coordinator.handle("default");
1188        coordinator.start();
1189
1190        // when
1191        let write = handle
1192            .try_write(TestWrite {
1193                key: "a".into(),
1194                value: 1,
1195                size: 10,
1196            })
1197            .await
1198            .unwrap();
1199
1200        let result = write.epoch().await;
1201
1202        // then
1203        assert!(
1204            matches!(result, Err(WriteError::ApplyError(epoch, msg)) if epoch == 1 && msg == "apply error")
1205        );
1206
1207        // cleanup
1208        coordinator.stop().await;
1209    }
1210
1211    // ============================================================================
1212    // Manual Flush Tests
1213    // ============================================================================
1214
1215    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1216    async fn should_flush_on_command() {
1217        // given
1218        let flusher = TestFlusher::default();
1219        let mut coordinator = WriteCoordinator::new(
1220            test_config(),
1221            vec!["default".to_string()],
1222            TestContext::default(),
1223            flusher.initial_snapshot().await,
1224            flusher.clone(),
1225        );
1226        let handle = coordinator.handle("default");
1227        coordinator.start();
1228
1229        // when
1230        let mut write = handle
1231            .try_write(TestWrite {
1232                key: "a".into(),
1233                value: 1,
1234                size: 10,
1235            })
1236            .await
1237            .unwrap();
1238        handle.flush(false).await.unwrap();
1239        write.wait(Durability::Written).await.unwrap();
1240
1241        // then
1242        assert_eq!(flusher.flushed_events().len(), 1);
1243
1244        // cleanup
1245        coordinator.stop().await;
1246    }
1247
1248    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1249    async fn should_wait_on_flush_handle() {
1250        // given
1251        let flusher = TestFlusher::default();
1252        let mut coordinator = WriteCoordinator::new(
1253            test_config(),
1254            vec!["default".to_string()],
1255            TestContext::default(),
1256            flusher.initial_snapshot().await,
1257            flusher.clone(),
1258        );
1259        let handle = coordinator.handle("default");
1260        coordinator.start();
1261
1262        // when
1263        handle
1264            .try_write(TestWrite {
1265                key: "a".into(),
1266                value: 1,
1267                size: 10,
1268            })
1269            .await
1270            .unwrap();
1271        let mut flush_handle = handle.flush(false).await.unwrap();
1272
1273        // then - can wait directly on the flush handle
1274        flush_handle.wait(Durability::Written).await.unwrap();
1275        assert_eq!(flusher.flushed_events().len(), 1);
1276
1277        // cleanup
1278        coordinator.stop().await;
1279    }
1280
1281    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1282    async fn should_return_correct_epoch_from_flush_handle() {
1283        // given
1284        let flusher = TestFlusher::default();
1285        let mut coordinator = WriteCoordinator::new(
1286            test_config(),
1287            vec!["default".to_string()],
1288            TestContext::default(),
1289            flusher.initial_snapshot().await,
1290            flusher,
1291        );
1292        let handle = coordinator.handle("default");
1293        coordinator.start();
1294
1295        // when
1296        let write1 = handle
1297            .try_write(TestWrite {
1298                key: "a".into(),
1299                value: 1,
1300                size: 10,
1301            })
1302            .await
1303            .unwrap();
1304        let write2 = handle
1305            .try_write(TestWrite {
1306                key: "b".into(),
1307                value: 2,
1308                size: 10,
1309            })
1310            .await
1311            .unwrap();
1312        let flush_handle = handle.flush(false).await.unwrap();
1313
1314        // then - flush handle epoch should be the last write's epoch
1315        let flush_epoch = flush_handle.epoch().await.unwrap();
1316        let write2_epoch = write2.epoch().await.unwrap();
1317        assert_eq!(flush_epoch, write2_epoch);
1318
1319        // cleanup
1320        coordinator.stop().await;
1321    }
1322
1323    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1324    async fn should_include_all_pending_writes_in_flush() {
1325        // given
1326        let flusher = TestFlusher::default();
1327        let mut coordinator = WriteCoordinator::new(
1328            test_config(),
1329            vec!["default".to_string()],
1330            TestContext::default(),
1331            flusher.initial_snapshot().await,
1332            flusher.clone(),
1333        );
1334        let handle = coordinator.handle("default");
1335        coordinator.start();
1336
1337        // when
1338        handle
1339            .try_write(TestWrite {
1340                key: "a".into(),
1341                value: 1,
1342                size: 10,
1343            })
1344            .await
1345            .unwrap();
1346        handle
1347            .try_write(TestWrite {
1348                key: "b".into(),
1349                value: 2,
1350                size: 10,
1351            })
1352            .await
1353            .unwrap();
1354        let mut last_write = handle
1355            .try_write(TestWrite {
1356                key: "c".into(),
1357                value: 3,
1358                size: 10,
1359            })
1360            .await
1361            .unwrap();
1362
1363        handle.flush(false).await.unwrap();
1364        last_write.wait(Durability::Written).await.unwrap();
1365
1366        // then
1367        let events = flusher.flushed_events();
1368        assert_eq!(events.len(), 1);
1369        let frozen_delta = &events[0];
1370        assert_eq!(frozen_delta.val.writes.len(), 3);
1371        let snapshot = flusher.storage.snapshot().await.unwrap();
1372        assert_snapshot_has_rows(&snapshot, &[("a", 0, 1), ("b", 1, 2), ("c", 2, 3)]).await;
1373
1374        // cleanup
1375        coordinator.stop().await;
1376    }
1377
1378    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1379    async fn should_skip_flush_when_no_new_writes() {
1380        // given
1381        let flusher = TestFlusher::default();
1382        let mut coordinator = WriteCoordinator::new(
1383            test_config(),
1384            vec!["default".to_string()],
1385            TestContext::default(),
1386            flusher.initial_snapshot().await,
1387            flusher.clone(),
1388        );
1389        let handle = coordinator.handle("default");
1390        coordinator.start();
1391
1392        // when
1393        let mut write = handle
1394            .try_write(TestWrite {
1395                key: "a".into(),
1396                value: 1,
1397                size: 10,
1398            })
1399            .await
1400            .unwrap();
1401        handle.flush(false).await.unwrap();
1402        write.wait(Durability::Written).await.unwrap();
1403
1404        // Second flush with no new writes
1405        handle.flush(false).await.unwrap();
1406
1407        // Synchronization: write and wait for applied to ensure the flush command
1408        // has been processed (commands are processed in order)
1409        let sync_write = handle
1410            .try_write(TestWrite {
1411                key: "sync".into(),
1412                value: 0,
1413                size: 1,
1414            })
1415            .await
1416            .unwrap();
1417        sync_write.epoch().await.unwrap();
1418
1419        // then - only one flush should have occurred (the second flush was a no-op)
1420        assert_eq!(flusher.flushed_events().len(), 1);
1421
1422        // cleanup
1423        coordinator.stop().await;
1424    }
1425
1426    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1427    async fn should_update_written_watermark_after_flush() {
1428        // given
1429        let flusher = TestFlusher::default();
1430        let mut coordinator = WriteCoordinator::new(
1431            test_config(),
1432            vec!["default".to_string()],
1433            TestContext::default(),
1434            flusher.initial_snapshot().await,
1435            flusher,
1436        );
1437        let handle = coordinator.handle("default");
1438        coordinator.start();
1439
1440        // when
1441        let mut write_handle = handle
1442            .try_write(TestWrite {
1443                key: "a".into(),
1444                value: 1,
1445                size: 10,
1446            })
1447            .await
1448            .unwrap();
1449
1450        handle.flush(false).await.unwrap();
1451
1452        // then - wait for Written should succeed after flush completes
1453        let result = write_handle.wait(Durability::Written).await;
1454        assert!(result.is_ok());
1455
1456        // cleanup
1457        coordinator.stop().await;
1458    }
1459
1460    // ============================================================================
1461    // Timer-Based Flush Tests
1462    // ============================================================================
1463
1464    #[tokio::test(start_paused = true)]
1465    async fn should_flush_on_flush_interval() {
1466        // given - create coordinator with short flush interval
1467        let flusher = TestFlusher::default();
1468        let config = WriteCoordinatorConfig {
1469            queue_capacity: 100,
1470            flush_interval: Duration::from_millis(100),
1471            flush_size_threshold: usize::MAX,
1472        };
1473        let mut coordinator = WriteCoordinator::new(
1474            config,
1475            vec!["default".to_string()],
1476            TestContext::default(),
1477            flusher.initial_snapshot().await,
1478            flusher.clone(),
1479        );
1480        let handle = coordinator.handle("default");
1481        coordinator.start();
1482
1483        // when - ensure coordinator task runs and then write something
1484        tokio::task::yield_now().await;
1485        let mut write = handle
1486            .try_write(TestWrite {
1487                key: "a".into(),
1488                value: 1,
1489                size: 10,
1490            })
1491            .await
1492            .unwrap();
1493        write.wait(Durability::Applied).await.unwrap();
1494
1495        // then - no flush should have happened yet (interval was reset in run())
1496        assert_eq!(flusher.flushed_events().len(), 0);
1497
1498        // when - advance time past the flush interval from when run() was called
1499        tokio::time::advance(Duration::from_millis(150)).await;
1500        tokio::task::yield_now().await;
1501
1502        // then - flush should have happened
1503        assert_eq!(flusher.flushed_events().len(), 1);
1504        let snapshot = flusher.storage.snapshot().await.unwrap();
1505        assert_snapshot_has_rows(&snapshot, &[("a", 0, 1)]).await;
1506
1507        // cleanup
1508        coordinator.stop().await;
1509    }
1510
1511    // ============================================================================
1512    // Size-Threshold Flush Tests
1513    // ============================================================================
1514
1515    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1516    async fn should_flush_when_size_threshold_exceeded() {
1517        // given
1518        let flusher = TestFlusher::default();
1519        let config = WriteCoordinatorConfig {
1520            queue_capacity: 100,
1521            flush_interval: Duration::from_secs(3600),
1522            flush_size_threshold: 100, // Low threshold for testing
1523        };
1524        let mut coordinator = WriteCoordinator::new(
1525            config,
1526            vec!["default".to_string()],
1527            TestContext::default(),
1528            flusher.initial_snapshot().await,
1529            flusher.clone(),
1530        );
1531        let handle = coordinator.handle("default");
1532        coordinator.start();
1533
1534        // when - write that exceeds threshold
1535        let mut write = handle
1536            .try_write(TestWrite {
1537                key: "a".into(),
1538                value: 1,
1539                size: 150,
1540            })
1541            .await
1542            .unwrap();
1543        write.wait(Durability::Written).await.unwrap();
1544
1545        // then
1546        assert_eq!(flusher.flushed_events().len(), 1);
1547
1548        // cleanup
1549        coordinator.stop().await;
1550    }
1551
1552    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1553    async fn should_accumulate_until_threshold() {
1554        // given
1555        let flusher = TestFlusher::default();
1556        let config = WriteCoordinatorConfig {
1557            queue_capacity: 100,
1558            flush_interval: Duration::from_secs(3600),
1559            flush_size_threshold: 100,
1560        };
1561        let mut coordinator = WriteCoordinator::new(
1562            config,
1563            vec!["default".to_string()],
1564            TestContext::default(),
1565            flusher.initial_snapshot().await,
1566            flusher.clone(),
1567        );
1568        let handle = coordinator.handle("default");
1569        coordinator.start();
1570
1571        // when - small writes that accumulate
1572        for i in 0..5 {
1573            let mut w = handle
1574                .try_write(TestWrite {
1575                    key: format!("key{}", i),
1576                    value: i,
1577                    size: 15,
1578                })
1579                .await
1580                .unwrap();
1581            w.wait(Durability::Applied).await.unwrap();
1582        }
1583
1584        // then - no flush yet (75 bytes < 100 threshold)
1585        assert_eq!(flusher.flushed_events().len(), 0);
1586
1587        // when - write that pushes over threshold
1588        let mut final_write = handle
1589            .try_write(TestWrite {
1590                key: "final".into(),
1591                value: 999,
1592                size: 30,
1593            })
1594            .await
1595            .unwrap();
1596        final_write.wait(Durability::Written).await.unwrap();
1597
1598        // then - should have flushed (105 bytes > 100 threshold)
1599        assert_eq!(flusher.flushed_events().len(), 1);
1600
1601        // cleanup
1602        coordinator.stop().await;
1603    }
1604
1605    // ============================================================================
1606    // Non-Blocking Flush (Concurrency) Tests
1607    // ============================================================================
1608
1609    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1610    async fn should_accept_writes_during_flush() {
1611        // given
1612        let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1613        let mut coordinator = WriteCoordinator::new(
1614            test_config(),
1615            vec!["default".to_string()],
1616            TestContext::default(),
1617            flusher.initial_snapshot().await,
1618            flusher.clone(),
1619        );
1620        let handle = coordinator.handle("default");
1621        coordinator.start();
1622
1623        // when: trigger a flush and wait for it to start (proving it's in progress)
1624        let write1 = handle
1625            .try_write(TestWrite {
1626                key: "a".into(),
1627                value: 1,
1628                size: 10,
1629            })
1630            .await
1631            .unwrap();
1632        handle.flush(false).await.unwrap();
1633        flush_started_rx.await.unwrap(); // wait until flush is actually in progress
1634
1635        // then: writes during blocked flush still succeed
1636        let write2 = handle
1637            .try_write(TestWrite {
1638                key: "b".into(),
1639                value: 2,
1640                size: 10,
1641            })
1642            .await
1643            .unwrap();
1644        assert!(write2.epoch().await.unwrap() > write1.epoch().await.unwrap());
1645
1646        // cleanup
1647        unblock_tx.send(()).await.unwrap();
1648        coordinator.stop().await;
1649    }
1650
1651    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1652    async fn should_assign_new_epochs_during_flush() {
1653        // given
1654        let (flusher, flush_started_rx, unblock_tx) = TestFlusher::with_flush_control();
1655        let mut coordinator = WriteCoordinator::new(
1656            test_config(),
1657            vec!["default".to_string()],
1658            TestContext::default(),
1659            flusher.initial_snapshot().await,
1660            flusher.clone(),
1661        );
1662        let handle = coordinator.handle("default");
1663        coordinator.start();
1664
1665        // when: write, flush, then write more during blocked flush
1666        handle
1667            .try_write(TestWrite {
1668                key: "a".into(),
1669                value: 1,
1670                size: 10,
1671            })
1672            .await
1673            .unwrap();
1674        handle.flush(false).await.unwrap();
1675        flush_started_rx.await.unwrap(); // wait until flush is actually in progress
1676
1677        // Writes during blocked flush get new epochs
1678        let w1 = handle
1679            .try_write(TestWrite {
1680                key: "b".into(),
1681                value: 2,
1682                size: 10,
1683            })
1684            .await
1685            .unwrap();
1686        let w2 = handle
1687            .try_write(TestWrite {
1688                key: "c".into(),
1689                value: 3,
1690                size: 10,
1691            })
1692            .await
1693            .unwrap();
1694
1695        // then: epochs continue incrementing
1696        let e1 = w1.epoch().await.unwrap();
1697        let e2 = w2.epoch().await.unwrap();
1698        assert!(e1 < e2);
1699
1700        // cleanup
1701        unblock_tx.send(()).await.unwrap();
1702        coordinator.stop().await;
1703    }
1704
1705    // ============================================================================
1706    // Backpressure Tests
1707    // ============================================================================
1708
1709    #[tokio::test]
1710    async fn should_return_backpressure_when_queue_full() {
1711        // given
1712        let flusher = TestFlusher::default();
1713        let config = WriteCoordinatorConfig {
1714            queue_capacity: 2,
1715            flush_interval: Duration::from_secs(3600),
1716            flush_size_threshold: usize::MAX,
1717        };
1718        let mut coordinator = WriteCoordinator::new(
1719            config,
1720            vec!["default".to_string()],
1721            TestContext::default(),
1722            flusher.initial_snapshot().await,
1723            flusher.clone(),
1724        );
1725        let handle = coordinator.handle("default");
1726        // Don't start coordinator - queue will fill
1727
1728        // when - fill the queue
1729        let _ = handle
1730            .try_write(TestWrite {
1731                key: "a".into(),
1732                value: 1,
1733                size: 10,
1734            })
1735            .await;
1736        let _ = handle
1737            .try_write(TestWrite {
1738                key: "b".into(),
1739                value: 2,
1740                size: 10,
1741            })
1742            .await;
1743
1744        // Third write should fail with backpressure
1745        let result = handle
1746            .try_write(TestWrite {
1747                key: "c".into(),
1748                value: 3,
1749                size: 10,
1750            })
1751            .await;
1752
1753        // then
1754        assert!(matches!(result, Err(WriteError::Backpressure(_))));
1755    }
1756
1757    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1758    async fn should_accept_writes_after_queue_drains() {
1759        // given
1760        let flusher = TestFlusher::default();
1761        let config = WriteCoordinatorConfig {
1762            queue_capacity: 2,
1763            flush_interval: Duration::from_secs(3600),
1764            flush_size_threshold: usize::MAX,
1765        };
1766        let mut coordinator = WriteCoordinator::new(
1767            config,
1768            vec!["default".to_string()],
1769            TestContext::default(),
1770            flusher.initial_snapshot().await,
1771            flusher.clone(),
1772        );
1773        let handle = coordinator.handle("default");
1774
1775        // Fill queue without processing
1776        let _ = handle
1777            .try_write(TestWrite {
1778                key: "a".into(),
1779                value: 1,
1780                size: 10,
1781            })
1782            .await;
1783        let mut write_b = handle
1784            .try_write(TestWrite {
1785                key: "b".into(),
1786                value: 2,
1787                size: 10,
1788            })
1789            .await
1790            .unwrap();
1791
1792        // when - start coordinator to drain queue and wait for it to process writes
1793        coordinator.start();
1794        write_b.wait(Durability::Applied).await.unwrap();
1795
1796        // then - writes should succeed now
1797        let result = handle
1798            .try_write(TestWrite {
1799                key: "c".into(),
1800                value: 3,
1801                size: 10,
1802            })
1803            .await;
1804        assert!(result.is_ok());
1805
1806        // cleanup
1807        coordinator.stop().await;
1808    }
1809
1810    // ============================================================================
1811    // Shutdown Tests
1812    // ============================================================================
1813
1814    #[tokio::test]
1815    async fn should_shutdown_cleanly_when_stop_called() {
1816        // given
1817        let flusher = TestFlusher::default();
1818        let mut coordinator = WriteCoordinator::new(
1819            test_config(),
1820            vec!["default".to_string()],
1821            TestContext::default(),
1822            flusher.initial_snapshot().await,
1823            flusher.clone(),
1824        );
1825        let handle = coordinator.handle("default");
1826        coordinator.start();
1827
1828        // when
1829        let result = coordinator.stop().await;
1830
1831        // then - coordinator should return Ok
1832        assert!(result.is_ok());
1833    }
1834
1835    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1836    async fn should_flush_pending_writes_on_shutdown() {
1837        // given
1838        let flusher = TestFlusher::default();
1839        let config = WriteCoordinatorConfig {
1840            queue_capacity: 100,
1841            flush_interval: Duration::from_secs(3600), // Long interval - won't trigger
1842            flush_size_threshold: usize::MAX,          // High threshold - won't trigger
1843        };
1844        let mut coordinator = WriteCoordinator::new(
1845            config,
1846            vec!["default".to_string()],
1847            TestContext::default(),
1848            flusher.initial_snapshot().await,
1849            flusher.clone(),
1850        );
1851        let handle = coordinator.handle("default");
1852        coordinator.start();
1853
1854        // when - write without explicit flush, then shutdown
1855        let write = handle
1856            .try_write(TestWrite {
1857                key: "a".into(),
1858                value: 1,
1859                size: 10,
1860            })
1861            .await
1862            .unwrap();
1863        let epoch = write.epoch().await.unwrap();
1864
1865        // Drop handle to trigger shutdown
1866        coordinator.stop().await;
1867
1868        // then - pending writes should have been flushed
1869        let events = flusher.flushed_events();
1870        assert_eq!(events.len(), 1);
1871        let epoch_range = &events[0].epoch_range;
1872        assert!(epoch_range.contains(&epoch));
1873    }
1874
1875    #[tokio::test]
1876    async fn should_return_shutdown_error_after_coordinator_stops() {
1877        // given
1878        let flusher = TestFlusher::default();
1879        let mut coordinator = WriteCoordinator::new(
1880            test_config(),
1881            vec!["default".to_string()],
1882            TestContext::default(),
1883            flusher.initial_snapshot().await,
1884            flusher.clone(),
1885        );
1886        let handle = coordinator.handle("default");
1887        coordinator.start();
1888
1889        // Stop coordinator
1890        coordinator.stop().await;
1891
1892        // when
1893        let result = handle
1894            .try_write(TestWrite {
1895                key: "a".into(),
1896                value: 1,
1897                size: 10,
1898            })
1899            .await;
1900
1901        // then
1902        assert!(matches!(result, Err(WriteError::Shutdown)));
1903    }
1904
1905    // ============================================================================
1906    // Epoch Range Tracking Tests
1907    // ============================================================================
1908
1909    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1910    async fn should_track_epoch_range_in_flush_event() {
1911        // given
1912        let flusher = TestFlusher::default();
1913        let mut coordinator = WriteCoordinator::new(
1914            test_config(),
1915            vec!["default".to_string()],
1916            TestContext::default(),
1917            flusher.initial_snapshot().await,
1918            flusher.clone(),
1919        );
1920        let handle = coordinator.handle("default");
1921        coordinator.start();
1922
1923        // when
1924        handle
1925            .try_write(TestWrite {
1926                key: "a".into(),
1927                value: 1,
1928                size: 10,
1929            })
1930            .await
1931            .unwrap();
1932        handle
1933            .try_write(TestWrite {
1934                key: "b".into(),
1935                value: 2,
1936                size: 10,
1937            })
1938            .await
1939            .unwrap();
1940        let mut last_write = handle
1941            .try_write(TestWrite {
1942                key: "c".into(),
1943                value: 3,
1944                size: 10,
1945            })
1946            .await
1947            .unwrap();
1948
1949        handle.flush(false).await.unwrap();
1950        last_write.wait(Durability::Written).await.unwrap();
1951
1952        // then
1953        let events = flusher.flushed_events();
1954        assert_eq!(events.len(), 1);
1955        let epoch_range = &events[0].epoch_range;
1956        assert_eq!(epoch_range.start, 1);
1957        assert_eq!(epoch_range.end, 4); // exclusive: one past the last epoch (3)
1958
1959        // cleanup
1960        coordinator.stop().await;
1961    }
1962
1963    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1964    async fn should_have_contiguous_epoch_ranges() {
1965        // given
1966        let flusher = TestFlusher::default();
1967        let mut coordinator = WriteCoordinator::new(
1968            test_config(),
1969            vec!["default".to_string()],
1970            TestContext::default(),
1971            flusher.initial_snapshot().await,
1972            flusher.clone(),
1973        );
1974        let handle = coordinator.handle("default");
1975        coordinator.start();
1976
1977        // when - first batch
1978        handle
1979            .try_write(TestWrite {
1980                key: "a".into(),
1981                value: 1,
1982                size: 10,
1983            })
1984            .await
1985            .unwrap();
1986        let mut write2 = handle
1987            .try_write(TestWrite {
1988                key: "b".into(),
1989                value: 2,
1990                size: 10,
1991            })
1992            .await
1993            .unwrap();
1994        handle.flush(false).await.unwrap();
1995        write2.wait(Durability::Written).await.unwrap();
1996
1997        // when - second batch
1998        let mut write3 = handle
1999            .try_write(TestWrite {
2000                key: "c".into(),
2001                value: 3,
2002                size: 10,
2003            })
2004            .await
2005            .unwrap();
2006        handle.flush(false).await.unwrap();
2007        write3.wait(Durability::Written).await.unwrap();
2008
2009        // then
2010        let events = flusher.flushed_events();
2011        assert_eq!(events.len(), 2);
2012
2013        let range1 = &events[0].epoch_range;
2014        let range2 = &events[1].epoch_range;
2015
2016        // Ranges should be contiguous (end of first == start of second)
2017        assert_eq!(range1.end, range2.start);
2018        assert_eq!(range1, &(1..3));
2019        assert_eq!(range2, &(3..4));
2020
2021        // cleanup
2022        coordinator.stop().await;
2023    }
2024
2025    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2026    async fn should_include_exact_epochs_in_range() {
2027        // given
2028        let flusher = TestFlusher::default();
2029        let mut coordinator = WriteCoordinator::new(
2030            test_config(),
2031            vec!["default".to_string()],
2032            TestContext::default(),
2033            flusher.initial_snapshot().await,
2034            flusher.clone(),
2035        );
2036        let handle = coordinator.handle("default");
2037        coordinator.start();
2038
2039        // when - write and capture the assigned epochs
2040        let write1 = handle
2041            .try_write(TestWrite {
2042                key: "a".into(),
2043                value: 1,
2044                size: 10,
2045            })
2046            .await
2047            .unwrap();
2048        let epoch1 = write1.epoch().await.unwrap();
2049
2050        let mut write2 = handle
2051            .try_write(TestWrite {
2052                key: "b".into(),
2053                value: 2,
2054                size: 10,
2055            })
2056            .await
2057            .unwrap();
2058        let epoch2 = write2.epoch().await.unwrap();
2059
2060        handle.flush(false).await.unwrap();
2061        write2.wait(Durability::Written).await.unwrap();
2062
2063        // then - the epoch_range should contain exactly the epochs assigned to writes
2064        let events = flusher.flushed_events();
2065        assert_eq!(events.len(), 1);
2066        let epoch_range = &events[0].epoch_range;
2067
2068        // The range should start at the first write's epoch
2069        assert_eq!(epoch_range.start, epoch1);
2070        // The range end should be one past the last write's epoch (exclusive)
2071        assert_eq!(epoch_range.end, epoch2 + 1);
2072        // Both epochs should be contained in the range
2073        assert!(epoch_range.contains(&epoch1));
2074        assert!(epoch_range.contains(&epoch2));
2075
2076        // cleanup
2077        coordinator.stop().await;
2078    }
2079
2080    // ============================================================================
2081    // State Carryover (ID Allocation) Tests
2082    // ============================================================================
2083
2084    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2085    async fn should_preserve_context_across_flushes() {
2086        // given
2087        let flusher = TestFlusher::default();
2088        let mut coordinator = WriteCoordinator::new(
2089            test_config(),
2090            vec!["default".to_string()],
2091            TestContext::default(),
2092            flusher.initial_snapshot().await,
2093            flusher.clone(),
2094        );
2095        let handle = coordinator.handle("default");
2096        coordinator.start();
2097
2098        // when - write key "a" in first batch (seq 0)
2099        let mut write1 = handle
2100            .try_write(TestWrite {
2101                key: "a".into(),
2102                value: 1,
2103                size: 10,
2104            })
2105            .await
2106            .unwrap();
2107        handle.flush(false).await.unwrap();
2108        write1.wait(Durability::Written).await.unwrap();
2109
2110        // Write to key "a" again in second batch (seq 1)
2111        let mut write2 = handle
2112            .try_write(TestWrite {
2113                key: "a".into(),
2114                value: 2,
2115                size: 10,
2116            })
2117            .await
2118            .unwrap();
2119        handle.flush(false).await.unwrap();
2120        write2.wait(Durability::Written).await.unwrap();
2121
2122        // then
2123        let events = flusher.flushed_events();
2124        assert_eq!(events.len(), 2);
2125
2126        // Batch 1: "a" with seq 0
2127        let (seq1, _) = events[0].val.writes.get("a").unwrap();
2128        assert_eq!(*seq1, 0);
2129
2130        // Batch 2: "a" with seq 1 (sequence continues)
2131        let (seq2, _) = events[1].val.writes.get("a").unwrap();
2132        assert_eq!(*seq2, 1);
2133
2134        // cleanup
2135        coordinator.stop().await;
2136    }
2137
2138    // ============================================================================
2139    // Subscribe Tests
2140    // ============================================================================
2141
2142    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2143    async fn should_receive_view_on_subscribe() {
2144        // given
2145        let flusher = TestFlusher::default();
2146        let mut coordinator = WriteCoordinator::new(
2147            test_config(),
2148            vec!["default".to_string()],
2149            TestContext::default(),
2150            flusher.initial_snapshot().await,
2151            flusher.clone(),
2152        );
2153        let handle = coordinator.handle("default");
2154        let (mut subscriber, _) = coordinator.subscribe();
2155        subscriber.initialize();
2156        coordinator.start();
2157
2158        // when
2159        handle
2160            .try_write(TestWrite {
2161                key: "a".into(),
2162                value: 1,
2163                size: 10,
2164            })
2165            .await
2166            .unwrap();
2167        handle.flush(false).await.unwrap();
2168
2169        // then - first broadcast is on freeze (delta added to frozen)
2170        let result = subscriber.recv().await;
2171        assert!(result.is_ok());
2172
2173        // cleanup
2174        coordinator.stop().await;
2175    }
2176
2177    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2178    async fn should_include_snapshot_in_view_after_flush() {
2179        // given
2180        let flusher = TestFlusher::default();
2181        let mut coordinator = WriteCoordinator::new(
2182            test_config(),
2183            vec!["default".to_string()],
2184            TestContext::default(),
2185            flusher.initial_snapshot().await,
2186            flusher.clone(),
2187        );
2188        let handle = coordinator.handle("default");
2189        let (mut subscriber, _) = coordinator.subscribe();
2190        subscriber.initialize();
2191        coordinator.start();
2192
2193        // when
2194        handle
2195            .try_write(TestWrite {
2196                key: "a".into(),
2197                value: 1,
2198                size: 10,
2199            })
2200            .await
2201            .unwrap();
2202        handle.flush(false).await.unwrap();
2203
2204        // First broadcast: freeze (frozen delta added)
2205        let _ = subscriber.recv().await.unwrap();
2206        // Second broadcast: flush complete (snapshot updated)
2207        let result = subscriber.recv().await.unwrap();
2208
2209        // then - snapshot should contain the flushed data
2210        assert_snapshot_has_rows(&result.snapshot, &[("a", 0, 1)]).await;
2211
2212        // cleanup
2213        coordinator.stop().await;
2214    }
2215
2216    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2217    async fn should_include_delta_in_view_after_flush() {
2218        // given
2219        let flusher = TestFlusher::default();
2220        let mut coordinator = WriteCoordinator::new(
2221            test_config(),
2222            vec!["default".to_string()],
2223            TestContext::default(),
2224            flusher.initial_snapshot().await,
2225            flusher.clone(),
2226        );
2227        let handle = coordinator.handle("default");
2228        let (mut subscriber, _) = coordinator.subscribe();
2229        subscriber.initialize();
2230        coordinator.start();
2231
2232        // when
2233        handle
2234            .try_write(TestWrite {
2235                key: "a".into(),
2236                value: 42,
2237                size: 10,
2238            })
2239            .await
2240            .unwrap();
2241        handle.flush(false).await.unwrap();
2242
2243        // First broadcast: freeze
2244        let _ = subscriber.recv().await.unwrap();
2245        // Second broadcast: flush complete
2246        let result = subscriber.recv().await.unwrap();
2247
2248        // then - last_written_delta should contain the write we made
2249        let flushed = result.last_written_delta.as_ref().unwrap();
2250        assert_eq!(flushed.val.get("a"), Some(&42));
2251
2252        // cleanup
2253        coordinator.stop().await;
2254    }
2255
2256    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2257    async fn should_include_epoch_range_in_view_after_flush() {
2258        // given
2259        let flusher = TestFlusher::default();
2260        let mut coordinator = WriteCoordinator::new(
2261            test_config(),
2262            vec!["default".to_string()],
2263            TestContext::default(),
2264            flusher.initial_snapshot().await,
2265            flusher.clone(),
2266        );
2267        let handle = coordinator.handle("default");
2268        let (mut subscriber, _) = coordinator.subscribe();
2269        subscriber.initialize();
2270        coordinator.start();
2271
2272        // when
2273        let write1 = handle
2274            .try_write(TestWrite {
2275                key: "a".into(),
2276                value: 1,
2277                size: 10,
2278            })
2279            .await
2280            .unwrap();
2281        let write2 = handle
2282            .try_write(TestWrite {
2283                key: "b".into(),
2284                value: 2,
2285                size: 10,
2286            })
2287            .await
2288            .unwrap();
2289        handle.flush(false).await.unwrap();
2290
2291        // First broadcast: freeze
2292        let _ = subscriber.recv().await.unwrap();
2293        // Second broadcast: flush complete
2294        let result = subscriber.recv().await.unwrap();
2295
2296        // then - epoch range from last_written_delta should contain the epochs
2297        let flushed = result.last_written_delta.as_ref().unwrap();
2298        let epoch1 = write1.epoch().await.unwrap();
2299        let epoch2 = write2.epoch().await.unwrap();
2300        assert!(flushed.epoch_range.contains(&epoch1));
2301        assert!(flushed.epoch_range.contains(&epoch2));
2302        assert_eq!(flushed.epoch_range.start, epoch1);
2303        assert_eq!(flushed.epoch_range.end, epoch2 + 1);
2304
2305        // cleanup
2306        coordinator.stop().await;
2307    }
2308
2309    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2310    async fn should_broadcast_frozen_delta_on_freeze() {
2311        // given
2312        let flusher = TestFlusher::default();
2313        let mut coordinator = WriteCoordinator::new(
2314            test_config(),
2315            vec!["default".to_string()],
2316            TestContext::default(),
2317            flusher.initial_snapshot().await,
2318            flusher.clone(),
2319        );
2320        let handle = coordinator.handle("default");
2321        let (mut subscriber, _) = coordinator.subscribe();
2322        subscriber.initialize();
2323        coordinator.start();
2324
2325        // when
2326        handle
2327            .try_write(TestWrite {
2328                key: "a".into(),
2329                value: 1,
2330                size: 10,
2331            })
2332            .await
2333            .unwrap();
2334        handle.flush(false).await.unwrap();
2335
2336        // then - first broadcast should have the frozen delta in the frozen vec
2337        let state = subscriber.recv().await.unwrap();
2338        assert_eq!(state.frozen.len(), 1);
2339        assert!(state.frozen[0].val.contains_key("a"));
2340
2341        // cleanup
2342        coordinator.stop().await;
2343    }
2344
2345    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2346    async fn should_remove_frozen_delta_after_flush_complete() {
2347        // given
2348        let flusher = TestFlusher::default();
2349        let mut coordinator = WriteCoordinator::new(
2350            test_config(),
2351            vec!["default".to_string()],
2352            TestContext::default(),
2353            flusher.initial_snapshot().await,
2354            flusher.clone(),
2355        );
2356        let handle = coordinator.handle("default");
2357        let (mut subscriber, _) = coordinator.subscribe();
2358        subscriber.initialize();
2359        coordinator.start();
2360
2361        // when
2362        handle
2363            .try_write(TestWrite {
2364                key: "a".into(),
2365                value: 1,
2366                size: 10,
2367            })
2368            .await
2369            .unwrap();
2370        handle.flush(false).await.unwrap();
2371
2372        // First broadcast: freeze (frozen has 1 entry)
2373        let state1 = subscriber.recv().await.unwrap();
2374        assert_eq!(state1.frozen.len(), 1);
2375
2376        // Second broadcast: flush complete (frozen is empty, last_written_delta set)
2377        let state2 = subscriber.recv().await.unwrap();
2378        assert_eq!(state2.frozen.len(), 0);
2379        assert!(state2.last_written_delta.is_some());
2380
2381        // cleanup
2382        coordinator.stop().await;
2383    }
2384
2385    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2386    async fn should_recover_from_message_lost_subscriber() {
2387        // given - a coordinator with a small broadcast buffer (16)
2388        let flusher = TestFlusher::default();
2389        let mut coordinator = WriteCoordinator::new(
2390            test_config(),
2391            vec!["default".to_string()],
2392            TestContext::default(),
2393            flusher.initial_snapshot().await,
2394            flusher.clone(),
2395        );
2396        let handle = coordinator.handle("default");
2397        let (mut subscriber, _) = coordinator.subscribe();
2398        subscriber.initialize();
2399        coordinator.start();
2400
2401        // when - write and flush enough times to overflow the broadcast buffer
2402        // (capacity is 16, so ~20 flushes should cause lag)
2403        // The subscriber never called recv(), so all broadcasts accumulate and overflow the buffer
2404        for i in 0..20 {
2405            let _write_handle = handle
2406                .try_write(TestWrite {
2407                    key: format!("key_{}", i),
2408                    value: i as u64,
2409                    size: 10,
2410                })
2411                .await
2412                .unwrap();
2413            let _ = handle.flush(false).await.unwrap();
2414        }
2415
2416        // make changes durable
2417        let mut watermark = handle
2418            .flush(true)
2419            .await
2420            .expect("flush(true) should succeed");
2421
2422        // wait for durability watermark
2423        watermark.wait(Durability::Durable).await;
2424
2425        // expect SubscribeError to be MessageLost
2426        let result = subscriber
2427            .recv()
2428            .await
2429            .expect_err("expected recv() to yield an error");
2430        assert!(matches!(result, SubscribeError::MessageLost));
2431
2432        // when - resubscribe to recover
2433        let (rx, initial_view) = handle.subscribe();
2434        (subscriber, _) = ViewSubscriber::new(rx, initial_view);
2435        let view = subscriber.initialize();
2436
2437        // then - the fresh view should reflect the current state
2438        // (all 20 writes should be in the snapshot after all the flushes)
2439        let records = view.snapshot.scan(BytesRange::unbounded()).await.unwrap();
2440        assert!(
2441            records.len() >= 20,
2442            "expected at least 20 rows, got {}",
2443            records.len()
2444        );
2445
2446        // and - we should be able to receive future broadcasts
2447        let _write_handle = handle
2448            .try_write(TestWrite {
2449                key: "post_recovery".into(),
2450                value: 100,
2451                size: 10,
2452            })
2453            .await
2454            .unwrap();
2455        let _ = handle.flush(false).await.unwrap();
2456
2457        let result = subscriber.recv().await;
2458        assert!(result.is_ok());
2459
2460        // cleanup
2461        coordinator.stop().await;
2462    }
2463
2464    // ============================================================================
2465    // Durable Flush Tests
2466    // ============================================================================
2467
2468    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2469    async fn should_flush_even_when_no_writes_if_flush_storage() {
2470        // given
2471        let flusher = TestFlusher::default();
2472        let storage = Arc::new(InMemoryStorage::new());
2473        let snapshot = storage.snapshot().await.unwrap();
2474        let mut coordinator = WriteCoordinator::new(
2475            test_config(),
2476            vec!["default".to_string()],
2477            TestContext::default(),
2478            snapshot,
2479            flusher.clone(),
2480        );
2481        let handle = coordinator.handle("default");
2482        coordinator.start();
2483
2484        // when - flush with flush_storage but no pending writes
2485        let mut flush_handle = handle.flush(true).await.unwrap();
2486        flush_handle.wait(Durability::Durable).await.unwrap();
2487
2488        // then - flusher was called (durable event sent) but no delta was recorded
2489        // (TestFlusher only records events with deltas)
2490        assert_eq!(flusher.flushed_events().len(), 0);
2491
2492        // cleanup
2493        coordinator.stop().await;
2494    }
2495
2496    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2497    async fn should_advance_durable_watermark() {
2498        // given
2499        let flusher = TestFlusher::default();
2500        let storage = Arc::new(InMemoryStorage::new());
2501        let snapshot = storage.snapshot().await.unwrap();
2502        let mut coordinator = WriteCoordinator::new(
2503            test_config(),
2504            vec!["default".to_string()],
2505            TestContext::default(),
2506            snapshot,
2507            flusher.clone(),
2508        );
2509        let handle = coordinator.handle("default");
2510        coordinator.start();
2511
2512        // when - write and flush with durable
2513        let mut write = handle
2514            .try_write(TestWrite {
2515                key: "a".into(),
2516                value: 1,
2517                size: 10,
2518            })
2519            .await
2520            .unwrap();
2521        let mut flush_handle = handle.flush(true).await.unwrap();
2522
2523        // then - can wait for Durable durability level
2524        flush_handle.wait(Durability::Durable).await.unwrap();
2525        write.wait(Durability::Durable).await.unwrap();
2526        assert_eq!(flusher.flushed_events().len(), 1);
2527
2528        // cleanup
2529        coordinator.stop().await;
2530    }
2531
2532    #[tokio::test]
2533    async fn should_see_applied_write_via_view() {
2534        // given
2535        let flusher = TestFlusher::default();
2536        let mut coordinator = WriteCoordinator::new(
2537            test_config(),
2538            vec!["default".to_string()],
2539            TestContext::default(),
2540            flusher.initial_snapshot().await,
2541            flusher,
2542        );
2543        let handle = coordinator.handle("default");
2544        coordinator.start();
2545
2546        // when
2547        let mut write = handle
2548            .try_write(TestWrite {
2549                key: "a".into(),
2550                value: 42,
2551                size: 10,
2552            })
2553            .await
2554            .unwrap();
2555        write.wait(Durability::Applied).await.unwrap();
2556
2557        // then
2558        let view = coordinator.view();
2559        assert_eq!(view.current.get("a"), Some(42));
2560
2561        // cleanup
2562        coordinator.stop().await;
2563    }
2564
2565    // ============================================================================
2566    // Multi-Channel Tests
2567    // ============================================================================
2568
2569    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2570    async fn should_flush_writes_from_multiple_channels() {
2571        // given
2572        let flusher = TestFlusher::default();
2573        let mut coordinator = WriteCoordinator::new(
2574            test_config(),
2575            vec!["ch1".to_string(), "ch2".to_string()],
2576            TestContext::default(),
2577            flusher.initial_snapshot().await,
2578            flusher.clone(),
2579        );
2580        let ch1 = coordinator.handle("ch1");
2581        let ch2 = coordinator.handle("ch2");
2582        coordinator.start();
2583
2584        // when - write to both channels, waiting for each to be applied
2585        // to ensure deterministic ordering
2586        let mut w1 = ch1
2587            .try_write(TestWrite {
2588                key: "a".into(),
2589                value: 10,
2590                size: 10,
2591            })
2592            .await
2593            .unwrap();
2594        w1.wait(Durability::Applied).await.unwrap();
2595
2596        let mut w2 = ch2
2597            .try_write(TestWrite {
2598                key: "b".into(),
2599                value: 20,
2600                size: 10,
2601            })
2602            .await
2603            .unwrap();
2604        w2.wait(Durability::Applied).await.unwrap();
2605
2606        let mut w3 = ch1
2607            .try_write(TestWrite {
2608                key: "c".into(),
2609                value: 30,
2610                size: 10,
2611            })
2612            .await
2613            .unwrap();
2614        w3.wait(Durability::Applied).await.unwrap();
2615
2616        ch1.flush(false).await.unwrap();
2617        w3.wait(Durability::Written).await.unwrap();
2618
2619        // then - snapshot should contain writes from both channels
2620        let snapshot = flusher.storage.snapshot().await.unwrap();
2621        assert_snapshot_has_rows(&snapshot, &[("a", 0, 10), ("b", 1, 20), ("c", 2, 30)]).await;
2622
2623        // cleanup
2624        coordinator.stop().await;
2625    }
2626
2627    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2628    async fn should_succeed_with_write_timeout_when_queue_has_space() {
2629        // given
2630        let flusher = TestFlusher::default();
2631        let mut coordinator = WriteCoordinator::new(
2632            test_config(),
2633            vec!["default".to_string()],
2634            TestContext::default(),
2635            flusher.initial_snapshot().await,
2636            flusher.clone(),
2637        );
2638        let handle = coordinator.handle("default");
2639        coordinator.start();
2640
2641        // when
2642        let mut wh = handle
2643            .write_timeout(
2644                TestWrite {
2645                    key: "a".into(),
2646                    value: 1,
2647                    size: 10,
2648                },
2649                Duration::from_secs(1),
2650            )
2651            .await
2652            .unwrap();
2653
2654        // then
2655        let result = wh.wait(Durability::Applied).await;
2656        assert!(result.is_ok());
2657
2658        // cleanup
2659        coordinator.stop().await;
2660    }
2661
2662    #[tokio::test]
2663    async fn should_timeout_when_queue_full() {
2664        // given - queue_capacity=2, coordinator NOT started so nothing drains
2665        let flusher = TestFlusher::default();
2666        let config = WriteCoordinatorConfig {
2667            queue_capacity: 2,
2668            flush_interval: Duration::from_secs(3600),
2669            flush_size_threshold: usize::MAX,
2670        };
2671        let mut coordinator = WriteCoordinator::new(
2672            config,
2673            vec!["default".to_string()],
2674            TestContext::default(),
2675            flusher.initial_snapshot().await,
2676            flusher.clone(),
2677        );
2678        let handle = coordinator.handle("default");
2679
2680        // fill the queue
2681        let _ = handle
2682            .try_write(TestWrite {
2683                key: "a".into(),
2684                value: 1,
2685                size: 10,
2686            })
2687            .await;
2688        let _ = handle
2689            .try_write(TestWrite {
2690                key: "b".into(),
2691                value: 2,
2692                size: 10,
2693            })
2694            .await;
2695
2696        // when - third write should time out
2697        let result = handle
2698            .write_timeout(
2699                TestWrite {
2700                    key: "c".into(),
2701                    value: 3,
2702                    size: 10,
2703                },
2704                Duration::from_millis(10),
2705            )
2706            .await;
2707
2708        // then
2709        assert!(matches!(result, Err(WriteError::TimeoutError(_))));
2710    }
2711
2712    #[tokio::test]
2713    async fn should_return_write_in_timeout_error() {
2714        // given - queue_capacity=1, coordinator NOT started
2715        let flusher = TestFlusher::default();
2716        let config = WriteCoordinatorConfig {
2717            queue_capacity: 1,
2718            flush_interval: Duration::from_secs(3600),
2719            flush_size_threshold: usize::MAX,
2720        };
2721        let mut coordinator = WriteCoordinator::new(
2722            config,
2723            vec!["default".to_string()],
2724            TestContext::default(),
2725            flusher.initial_snapshot().await,
2726            flusher.clone(),
2727        );
2728        let handle = coordinator.handle("default");
2729
2730        // fill the queue
2731        let _ = handle
2732            .try_write(TestWrite {
2733                key: "a".into(),
2734                value: 1,
2735                size: 10,
2736            })
2737            .await;
2738
2739        // when
2740        let result = handle
2741            .write_timeout(
2742                TestWrite {
2743                    key: "retry_me".into(),
2744                    value: 42,
2745                    size: 10,
2746                },
2747                Duration::from_millis(10),
2748            )
2749            .await;
2750        let Err(err) = result else {
2751            panic!("expected TimeoutError");
2752        };
2753
2754        // then - original write is returned inside the error
2755        let write = err.into_inner().expect("should contain the write");
2756        assert_eq!(write.key, "retry_me");
2757        assert_eq!(write.value, 42);
2758    }
2759
2760    #[tokio::test]
2761    async fn should_return_write_in_backpressure_error() {
2762        // given - queue_capacity=1, coordinator NOT started
2763        let flusher = TestFlusher::default();
2764        let config = WriteCoordinatorConfig {
2765            queue_capacity: 1,
2766            flush_interval: Duration::from_secs(3600),
2767            flush_size_threshold: usize::MAX,
2768        };
2769        let mut coordinator = WriteCoordinator::new(
2770            config,
2771            vec!["default".to_string()],
2772            TestContext::default(),
2773            flusher.initial_snapshot().await,
2774            flusher.clone(),
2775        );
2776        let handle = coordinator.handle("default");
2777
2778        // fill the queue
2779        let _ = handle
2780            .try_write(TestWrite {
2781                key: "a".into(),
2782                value: 1,
2783                size: 10,
2784            })
2785            .await;
2786
2787        // when
2788        let result = handle
2789            .try_write(TestWrite {
2790                key: "retry_me".into(),
2791                value: 42,
2792                size: 10,
2793            })
2794            .await;
2795        let Err(err) = result else {
2796            panic!("expected Backpressure");
2797        };
2798
2799        // then - original write is returned inside the error
2800        let write = err.into_inner().expect("should contain the write");
2801        assert_eq!(write.key, "retry_me");
2802        assert_eq!(write.value, 42);
2803    }
2804
2805    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2806    async fn should_succeed_when_queue_drains_within_timeout() {
2807        // given - queue_capacity=2, coordinator started so it drains
2808        let flusher = TestFlusher::default();
2809        let config = WriteCoordinatorConfig {
2810            queue_capacity: 2,
2811            flush_interval: Duration::from_secs(3600),
2812            flush_size_threshold: usize::MAX,
2813        };
2814        let mut coordinator = WriteCoordinator::new(
2815            config,
2816            vec!["default".to_string()],
2817            TestContext::default(),
2818            flusher.initial_snapshot().await,
2819            flusher.clone(),
2820        );
2821        let handle = coordinator.handle("default");
2822
2823        // fill the queue before starting
2824        let _ = handle
2825            .try_write(TestWrite {
2826                key: "a".into(),
2827                value: 1,
2828                size: 10,
2829            })
2830            .await;
2831        let _ = handle
2832            .try_write(TestWrite {
2833                key: "b".into(),
2834                value: 2,
2835                size: 10,
2836            })
2837            .await;
2838
2839        // when - start coordinator (begins draining) and write with generous timeout
2840        coordinator.start();
2841        let result = handle
2842            .write_timeout(
2843                TestWrite {
2844                    key: "c".into(),
2845                    value: 3,
2846                    size: 10,
2847                },
2848                Duration::from_secs(5),
2849            )
2850            .await;
2851
2852        // then
2853        assert!(result.is_ok());
2854
2855        // cleanup
2856        coordinator.stop().await;
2857    }
2858
2859    #[tokio::test]
2860    async fn should_return_shutdown_on_write_timeout_after_coordinator_stops() {
2861        // given
2862        let flusher = TestFlusher::default();
2863        let mut coordinator = WriteCoordinator::new(
2864            test_config(),
2865            vec!["default".to_string()],
2866            TestContext::default(),
2867            flusher.initial_snapshot().await,
2868            flusher.clone(),
2869        );
2870        let handle = coordinator.handle("default");
2871        coordinator.start();
2872
2873        // when
2874        coordinator.stop().await;
2875        let result = handle
2876            .write_timeout(
2877                TestWrite {
2878                    key: "a".into(),
2879                    value: 1,
2880                    size: 10,
2881                },
2882                Duration::from_secs(1),
2883            )
2884            .await;
2885
2886        // then
2887        assert!(matches!(result, Err(WriteError::Shutdown)));
2888    }
2889
2890    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2891    async fn should_pause_and_resume_write_channel() {
2892        // given
2893        let flusher = TestFlusher::default();
2894        let mut config = test_config();
2895        config.flush_size_threshold = usize::MAX;
2896        config.flush_interval = Duration::from_hours(24);
2897        let mut coordinator = WriteCoordinator::new(
2898            config,
2899            vec!["a", "b"],
2900            TestContext::default(),
2901            flusher.initial_snapshot().await,
2902            flusher.clone(),
2903        );
2904        let handle_a = coordinator.handle("a");
2905        let handle_b = coordinator.handle("b");
2906        let pause_handle = coordinator.pause_handle("a");
2907
2908        // pause before starting the coordinator. otherwise
2909        // there's a race condition where the PausableReceiver
2910        // makes it past pause_rx.wait_for(|v| !*v).await; and
2911        // waits for recv before we pause the handle - we could
2912        // change the behavior of the PausableReceiver to first
2913        // wait on recv and then check pause before returning
2914        // but that feels weird and is not necessary for the use
2915        // cases we have in mind
2916        pause_handle.pause();
2917        coordinator.start();
2918
2919        // when
2920        let mut result_a = handle_a
2921            .try_write(TestWrite {
2922                key: "a".into(),
2923                value: 1,
2924                size: 1,
2925            })
2926            .await
2927            .unwrap();
2928        for i in 0..1000 {
2929            handle_b
2930                .try_write(TestWrite {
2931                    key: format!("b{}", i),
2932                    value: i,
2933                    size: 1,
2934                })
2935                .await
2936                .unwrap()
2937                .wait(Durability::Applied)
2938                .await
2939                .unwrap();
2940        }
2941
2942        // then
2943        // make sure the data only contains keys from b (so a was never processed)
2944        let data = coordinator.view().current.data.lock().unwrap().clone();
2945        let mut expected = (0..1000).map(|i| format!("b{}", i)).collect::<HashSet<_>>();
2946        assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2947        pause_handle.unpause();
2948        // after resuming, wait for the data to include keys from a
2949        result_a.wait(Durability::Applied).await.unwrap();
2950        let data = coordinator.view().current.data.lock().unwrap().clone();
2951        expected.insert("a".into());
2952        assert_eq!(data.keys().cloned().collect::<HashSet<_>>(), expected);
2953    }
2954}