Skip to main content

daemon/
lib.rs

1//! The `flusso` daemon — the supervisor around the [`engine`].
2//!
3//! It builds the pluggable parts from a validated [`Config`], wires a
4//! [`StatusObserver`] that updates a shared [`Status`], runs the engine, and
5//! polls source lag out of band.
6//!
7//! It owns the **domain**: the pipeline and its observable state, and it is
8//! telemetry-agnostic — it depends only on the engine's [`Observer`] trait, not
9//! on any metrics backend. It does *not* own **transport**: the HTTP surface,
10//! process signals, the telemetry exporter, *and the metrics recording itself*
11//! live in the binary (the CLI), which installs a meter provider, attaches its
12//! own metrics observer via [`Daemon::with_observer`], reads the [`Status`]
13//! handle this exposes, serves it, and drives shutdown:
14//!
15//! ```text
16//!   CLI ── install meter provider ─▶ Daemon::start ──▶ RunningDaemon
17//!    │                                                   │  .status() ─▶ Arc<Status>  (CLI serves it)
18//!    └── shutdown future (signals) ─▶ RunningDaemon::run(shutdown)
19//! ```
20
21mod backends;
22mod lag;
23mod observer;
24pub mod status;
25
26pub use backends::{Backends, SourceParts};
27pub use observer::StatusObserver;
28pub use status::{IndexState, Phase, Status, StatusSnapshot};
29
30// Re-exported so a binary can attach its own observer (e.g. a metrics recorder)
31// without depending on `engine`/`schema-core` directly — these are part of the
32// daemon's observe-the-pipeline surface.
33pub use engine::{BatchStats, Observer};
34pub use schema_core::IndexName;
35
36use std::future::Future;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39
40use anyhow::Context;
41use engine::{Engine, FailurePolicies, FanOut};
42use schema::Config;
43use sources_core::cdc::ChangeCapture;
44
45/// How a [`Daemon`] run is parameterized — the pipeline knobs the CLI exposes as
46/// flags. Transport settings (HTTP address, …) are the binary's concern, not the
47/// daemon's, so they are not here.
48#[derive(Debug, Clone)]
49pub struct DaemonOptions {
50    /// Logical replication slot to consume. Must already exist or be creatable.
51    pub slot: String,
52    /// Publication to subscribe to.
53    pub publication: String,
54    /// Skip the initial backfill and resume live capture only.
55    pub skip_backfill: bool,
56    /// Changes buffered between capture and processing.
57    pub queue_capacity: usize,
58    /// Pretty-print documents on the stdout fallback sink (no sink configured).
59    pub pretty: bool,
60    /// How often to sample source capture lag.
61    pub lag_poll_interval: Duration,
62}
63
64impl Default for DaemonOptions {
65    fn default() -> Self {
66        Self {
67            slot: "flusso".to_owned(),
68            publication: "flusso".to_owned(),
69            skip_backfill: false,
70            queue_capacity: 1024,
71            pretty: false,
72            lag_poll_interval: Duration::from_secs(15),
73        }
74    }
75}
76
77/// A configured-but-not-yet-running sync daemon over one [`Config`].
78#[derive(Debug)]
79pub struct Daemon {
80    config: Config,
81    options: DaemonOptions,
82    backends: Arc<dyn Backends>,
83    extra_observers: Vec<Arc<dyn Observer>>,
84    status: Option<Arc<Status>>,
85}
86
87impl Daemon {
88    /// Create a daemon for `config` with default [`DaemonOptions`].
89    ///
90    /// `backends` builds the concrete source/sink the engine drives; the daemon
91    /// itself never names a backend (see [`Backends`]). The composition root
92    /// supplies it.
93    pub fn new(config: Config, backends: Arc<dyn Backends>) -> Self {
94        Self {
95            config,
96            options: DaemonOptions::default(),
97            backends,
98            extra_observers: Vec::new(),
99            status: None,
100        }
101    }
102
103    /// Override the run options.
104    pub fn with_options(mut self, options: DaemonOptions) -> Self {
105        self.options = options;
106        self
107    }
108
109    /// Attach an additional [`Observer`] alongside the daemon's own status
110    /// observer — e.g. a metrics recorder the binary owns. All attached
111    /// observers receive every event (the engine drives a [`FanOut`]).
112    pub fn with_observer(mut self, observer: Arc<dyn Observer>) -> Self {
113        self.extra_observers.push(observer);
114        self
115    }
116
117    /// Provide the [`Status`] handle to update instead of minting a fresh one.
118    ///
119    /// The binary uses this to keep **one** process-lifetime status across
120    /// pipeline restarts (e.g. an on-demand reindex): the long-lived HTTP surface
121    /// and metrics keep reading the same handle, and its counters and uptime
122    /// survive the restart rather than resetting. Without it, [`start`](Self::start)
123    /// creates a new status each time.
124    pub fn with_status(mut self, status: Arc<Status>) -> Self {
125        self.status = Some(status);
126        self
127    }
128
129    /// Build the pipeline and its observable state, returning a [`RunningDaemon`]
130    /// whose [`status`](RunningDaemon::status) can be read (e.g. served over HTTP)
131    /// while it runs.
132    ///
133    /// If an attached observer (via [`with_observer`](Self::with_observer)) records
134    /// to the global OpenTelemetry meter, install a meter provider *before* calling
135    /// this; otherwise its instruments are no-ops.
136    #[tracing::instrument(name = "daemon.start", skip_all)]
137    pub async fn start(self) -> anyhow::Result<RunningDaemon> {
138        let Daemon {
139            config,
140            options,
141            backends,
142            extra_observers,
143            status,
144        } = self;
145
146        tracing::info!(
147            slot = %options.slot,
148            publication = %options.publication,
149            indexes = config.indexes.len(),
150            "starting sync",
151        );
152
153        // Reuse a caller-provided status (so it survives restarts) or mint a
154        // fresh one. Either way reset the phase to `Starting`: a reused status may
155        // have been left `Stopped` by a previous run.
156        let status = status.unwrap_or_else(|| {
157            Arc::new(Status::new(config.indexes.keys().cloned(), Instant::now()))
158        });
159        status.set_phase(Phase::Starting);
160        // The daemon's own observer updates status; any binary-supplied observers
161        // (metrics, …) ride alongside it through one fan-out.
162        let mut observers: Vec<Arc<dyn Observer>> =
163            vec![Arc::new(StatusObserver::new(Arc::clone(&status)))];
164        observers.extend(extra_observers);
165        let observer: Arc<dyn Observer> = Arc::new(FanOut::new(observers));
166
167        // The concrete source/sink are the composition root's choice — built
168        // here through the `Backends` seam, resolving connection/credentials in
169        // this (the running) environment.
170        let config = Arc::new(config);
171        let SourceParts { capture, documents } =
172            backends.source(Arc::clone(&config), &options).await?;
173        let sink = backends.sink(&config, &options).await?;
174
175        // The item-level failure policy is config-driven: a global default plus
176        // optional per-index overrides, keyed by logical index name.
177        let mut failure_policies = FailurePolicies::new(config.on_error);
178        for (name, index) in &config.indexes {
179            if let Some(policy) = index.on_error {
180                failure_policies = failure_policies.with_override(name.as_ref(), policy);
181            }
182        }
183
184        let engine = Engine::new(Arc::clone(&capture), documents, sink)
185            .with_observer(Arc::clone(&observer))
186            .with_queue_capacity(options.queue_capacity)
187            .skip_backfill(options.skip_backfill)
188            .with_failure_policies(failure_policies);
189
190        Ok(RunningDaemon {
191            status,
192            engine,
193            source: capture,
194            observer,
195            lag_poll_interval: options.lag_poll_interval,
196        })
197    }
198}
199
200/// A built sync daemon, ready to run. Exposes its live [`Status`] so a transport
201/// the binary owns can serve it concurrently with the run.
202#[derive(Debug)]
203pub struct RunningDaemon {
204    status: Arc<Status>,
205    engine: Engine,
206    source: Arc<dyn ChangeCapture>,
207    observer: Arc<dyn Observer>,
208    lag_poll_interval: Duration,
209}
210
211impl RunningDaemon {
212    /// A handle to the live operational status, for a transport (HTTP, a TUI, …)
213    /// to read while the daemon runs. Cheap to clone.
214    pub fn status(&self) -> Arc<Status> {
215        Arc::clone(&self.status)
216    }
217
218    /// Run until the live stream ends, an error stops the pipeline, or `shutdown`
219    /// resolves — typically a signal future the binary owns. A pending batch on
220    /// shutdown is simply redelivered on the next run (at-least-once), so
221    /// dropping the run mid-flight is safe.
222    #[tracing::instrument(name = "daemon.run", skip_all)]
223    pub async fn run(self, shutdown: impl Future<Output = ()> + Send) -> anyhow::Result<()> {
224        let RunningDaemon {
225            status,
226            engine,
227            source,
228            observer,
229            lag_poll_interval,
230        } = self;
231
232        // Poll capture lag alongside the run. Held in a guard so it's aborted
233        // however this returns — a normal stop *or* the future being cancelled
234        // (e.g. the binary dropping the run for a reindex restart) — rather than
235        // detaching onto the shared status.
236        let _lag = LagGuard(tokio::spawn(lag::poll(source, observer, lag_poll_interval)));
237
238        let result = tokio::select! {
239            res = engine.run() => res.context("sync engine stopped"),
240            () = shutdown => {
241                tracing::info!("shutdown requested; stopping pipeline");
242                Ok(())
243            }
244        };
245
246        status.set_phase(Phase::Stopped);
247        result
248    }
249}
250
251/// Aborts the lag poller when dropped — on a normal stop or on cancellation
252/// (the run future being dropped for a restart) alike. Its result is discarded,
253/// so there's nothing to join.
254#[derive(Debug)]
255struct LagGuard(tokio::task::JoinHandle<()>);
256
257impl Drop for LagGuard {
258    fn drop(&mut self) {
259        self.0.abort();
260    }
261}
262
263#[cfg(test)]
264#[allow(clippy::unwrap_used)]
265mod tests {
266    use super::*;
267
268    use std::collections::BTreeMap;
269    use std::sync::Mutex;
270    use std::sync::atomic::{AtomicU64, Ordering};
271    use std::time::Duration;
272
273    use async_trait::async_trait;
274    use engine::BatchStats;
275    use futures::stream::{self, BoxStream};
276    use schema::{Source, SourceType};
277    use schema_core::{ColumnName, DatabaseSchema, GenericValue, IndexName, TableName};
278    use sinks_core::{FlushReport, Sink};
279    use sources_core::cdc::{Ack, AckSink, Change, ChangeEvent};
280    use sources_core::document::{Document, DocumentBuilder, DocumentId, IndexScope};
281    use sources_core::{RowKey, SnapshotTable};
282    use tokio::sync::Notify;
283
284    use crate::observer::StatusObserver;
285    use crate::status::{IndexState, Phase};
286
287    fn users() -> IndexName {
288        IndexName::try_new("users").unwrap()
289    }
290
291    /// The observer drives the status surface through a full lifecycle, and the
292    /// snapshot serializes to the expected JSON shape.
293    #[test]
294    fn observer_drives_status_through_its_lifecycle() {
295        let status = Arc::new(Status::new([users()], Instant::now()));
296        let observer = StatusObserver::new(Arc::clone(&status));
297
298        // Starts pending, before any events.
299        let snap = status.snapshot();
300        assert_eq!(snap.phase, Phase::Starting);
301        assert_eq!(snap.indexes.get("users"), Some(&IndexState::Pending));
302
303        observer.on_indexes_ensured(1);
304        observer.on_backfill_started(&[users()]);
305        let snap = status.snapshot();
306        assert_eq!(snap.phase, Phase::Backfilling);
307        assert_eq!(snap.indexes.get("users"), Some(&IndexState::Backfilling));
308
309        observer.on_index_seeded(&users());
310        observer.on_backfill_completed();
311        observer.on_live_started();
312
313        // Three changes captured, two distinct documents built in one batch.
314        observer.on_change_captured();
315        observer.on_change_captured();
316        observer.on_change_captured();
317        observer.on_batch_committed(BatchStats {
318            changes: 3,
319            documents: 2,
320            documents_by_index: vec![(users(), 2)],
321            flush: Duration::from_millis(5),
322        });
323        observer.on_slot_lag(4096);
324
325        let snap = status.snapshot();
326        assert_eq!(snap.phase, Phase::Live);
327        assert_eq!(snap.indexes.get("users"), Some(&IndexState::Seeded));
328        assert_eq!(snap.changes_captured, 3);
329        assert_eq!(snap.changes_committed, 3);
330        assert_eq!(snap.changes_in_flight, 0);
331        assert_eq!(snap.documents_built, 2);
332        assert_eq!(snap.batches, 1);
333        assert_eq!(snap.slot_lag_bytes, Some(4096));
334        assert_eq!(snap.errors, 0);
335
336        // The JSON the `/status` endpoint returns.
337        let json = serde_json::to_value(&snap).unwrap();
338        assert_eq!(json["phase"], "live");
339        assert_eq!(json["indexes"]["users"], "seeded");
340        assert_eq!(json["changes_in_flight"], 0);
341        assert_eq!(json["slot_lag_bytes"], 4096);
342    }
343
344    /// Reaching live with a never-backfilled index (already seeded on start)
345    /// still reports it seeded, and an error moves the phase to `Stopped`.
346    #[test]
347    fn already_seeded_index_and_error_phase() {
348        let status = Arc::new(Status::new([users()], Instant::now()));
349        let observer = StatusObserver::new(Arc::clone(&status));
350
351        // No backfill_started for `users` — it was already seeded.
352        observer.on_live_started();
353        assert_eq!(
354            status.snapshot().indexes.get("users"),
355            Some(&IndexState::Seeded),
356            "an index live without a backfill this run is reported seeded",
357        );
358
359        observer.on_error("boom");
360        let snap = status.snapshot();
361        assert_eq!(snap.phase, Phase::Stopped);
362        assert_eq!(snap.errors, 1);
363        assert_eq!(snap.last_error.as_deref(), Some("boom"));
364    }
365
366    /// A source that reports a fixed lag and an empty live stream.
367    #[derive(Debug)]
368    struct LaggySource(Option<u64>);
369
370    #[async_trait]
371    impl ChangeCapture for LaggySource {
372        async fn live(
373            &self,
374        ) -> sources_core::Result<BoxStream<'static, sources_core::Result<Change>>> {
375            Ok(Box::pin(stream::empty()))
376        }
377
378        async fn lag(&self) -> sources_core::Result<Option<u64>> {
379            Ok(self.0)
380        }
381    }
382
383    /// Records the slot lag it's told and signals each report, so the poller
384    /// test can await a real report instead of sleeping a fixed duration.
385    #[derive(Debug, Default)]
386    struct LagObserver {
387        last: Mutex<Option<u64>>,
388        reported: Notify,
389    }
390
391    impl Observer for LagObserver {
392        fn on_slot_lag(&self, bytes: u64) {
393            *self.last.lock().unwrap() = Some(bytes);
394            self.reported.notify_one();
395        }
396    }
397
398    /// The lag poller samples the source and reports each known value to the
399    /// observer. Deterministic: it awaits an actual report (the poller's first
400    /// interval tick fires immediately), with a generous timeout as a backstop.
401    #[tokio::test]
402    async fn lag_poller_reports_each_sampled_value() {
403        let observer = Arc::new(LagObserver::default());
404        let source: Arc<dyn ChangeCapture> = Arc::new(LaggySource(Some(8192)));
405
406        let handle = tokio::spawn(lag::poll(
407            source,
408            Arc::clone(&observer) as Arc<dyn Observer>,
409            Duration::from_millis(5),
410        ));
411        tokio::time::timeout(Duration::from_secs(5), observer.reported.notified())
412            .await
413            .expect("the poller should report a lag sample");
414        handle.abort();
415
416        assert_eq!(*observer.last.lock().unwrap(), Some(8192));
417    }
418
419    // --- The daemon driven end-to-end over injected backends -----------------
420    //
421    // These exercise `Daemon::start`/`run` with no Postgres/OpenSearch by
422    // supplying a `Backends` that hands back test doubles — the seam the
423    // pluggable-backends refactor exists to enable.
424
425    /// A `Backends` that returns pre-built test doubles, ignoring the `Config`.
426    /// Counts how often each edge was asked for, to prove the daemon builds its
427    /// backends *through* the seam rather than naming any concrete one.
428    #[derive(Debug)]
429    struct MockBackends {
430        capture: Arc<dyn ChangeCapture>,
431        documents: Arc<dyn DocumentBuilder>,
432        sink: Arc<dyn Sink>,
433        source_built: Arc<AtomicU64>,
434        sink_built: Arc<AtomicU64>,
435    }
436
437    #[async_trait]
438    impl Backends for MockBackends {
439        async fn source(
440            &self,
441            _config: Arc<Config>,
442            _options: &DaemonOptions,
443        ) -> anyhow::Result<SourceParts> {
444            self.source_built.fetch_add(1, Ordering::SeqCst);
445            Ok(SourceParts {
446                capture: Arc::clone(&self.capture),
447                documents: Arc::clone(&self.documents),
448            })
449        }
450
451        async fn sink(
452            &self,
453            _config: &Config,
454            _options: &DaemonOptions,
455        ) -> anyhow::Result<Arc<dyn Sink>> {
456            self.sink_built.fetch_add(1, Ordering::SeqCst);
457            Ok(Arc::clone(&self.sink))
458        }
459    }
460
461    /// Replays a fixed list of changes on the live stream, once, then ends — so
462    /// `engine.run()` completes on its own without a shutdown signal.
463    #[derive(Debug)]
464    struct ScriptedSource {
465        changes: Mutex<Option<Vec<Change>>>,
466    }
467
468    #[async_trait]
469    impl ChangeCapture for ScriptedSource {
470        async fn live(
471            &self,
472        ) -> sources_core::Result<BoxStream<'static, sources_core::Result<Change>>> {
473            let changes = self.changes.lock().unwrap().take().unwrap_or_default();
474            Ok(Box::pin(stream::iter(
475                changes
476                    .into_iter()
477                    .map(Ok::<Change, sources_core::SourceError>),
478            )))
479        }
480
481        async fn lag(&self) -> sources_core::Result<Option<u64>> {
482            Ok(None)
483        }
484    }
485
486    /// Resolves each change to one `users` document; key value `2` is a delete.
487    #[derive(Debug)]
488    struct ScriptedDocuments;
489
490    #[async_trait]
491    impl DocumentBuilder for ScriptedDocuments {
492        async fn resolve(
493            &self,
494            _table: &TableName,
495            key: &RowKey,
496        ) -> sources_core::Result<Vec<DocumentId>> {
497            Ok(vec![DocumentId {
498                index: users(),
499                key: key.clone(),
500            }])
501        }
502
503        async fn build(&self, id: &DocumentId) -> sources_core::Result<Document> {
504            let deleted = matches!(id.key.0.first(), Some((_, GenericValue::Int(2))));
505            Ok(if deleted {
506                Document::Delete { id: id.clone() }
507            } else {
508                Document::Upsert {
509                    id: id.clone(),
510                    body: GenericValue::Map(Default::default()),
511                }
512            })
513        }
514
515        fn backfill_scopes(&self) -> Vec<IndexScope> {
516            vec![IndexScope {
517                index: users(),
518                root: SnapshotTable {
519                    db_schema: DatabaseSchema::try_new("public").unwrap(),
520                    table: TableName::try_new("users").unwrap(),
521                },
522            }]
523        }
524    }
525
526    /// Records the sink ops it receives, in order.
527    #[derive(Debug, Default)]
528    struct RecordingSink {
529        ops: Arc<Mutex<Vec<String>>>,
530    }
531
532    #[async_trait]
533    impl Sink for RecordingSink {
534        async fn upsert(
535            &self,
536            index: &IndexName,
537            id: &str,
538            _document: &GenericValue,
539        ) -> sinks_core::Result<()> {
540            self.ops
541                .lock()
542                .unwrap()
543                .push(format!("upsert {} {id}", index.as_ref()));
544            Ok(())
545        }
546
547        async fn delete(&self, index: &IndexName, id: &str) -> sinks_core::Result<()> {
548            self.ops
549                .lock()
550                .unwrap()
551                .push(format!("delete {} {id}", index.as_ref()));
552            Ok(())
553        }
554
555        async fn flush(&self, _caught_up: bool) -> sinks_core::Result<FlushReport> {
556            Ok(FlushReport::clean())
557        }
558    }
559
560    /// Counts the changes confirmed back to the source.
561    #[derive(Debug)]
562    struct CountingAck(Arc<AtomicU64>);
563
564    impl AckSink for CountingAck {
565        fn confirm(&self, _seq: u64) {
566            self.0.fetch_add(1, Ordering::SeqCst);
567        }
568    }
569
570    fn row_change(delete: bool, id: i64, seq: u64, acks: &Arc<AtomicU64>) -> Change {
571        let table = TableName::try_new("users").unwrap();
572        let key = RowKey(vec![(
573            ColumnName::try_new("id").unwrap(),
574            GenericValue::Int(id),
575        )]);
576        let event = if delete {
577            ChangeEvent::Delete { table, key }
578        } else {
579            ChangeEvent::Upsert { table, key }
580        };
581        Change {
582            event,
583            ack: Ack::new(seq, Arc::new(CountingAck(Arc::clone(acks)))),
584        }
585    }
586
587    /// A config the `MockBackends` ignores — only `indexes` is read by the
588    /// daemon (for the status surface), and it's intentionally empty.
589    fn backendless_config() -> Config {
590        Config {
591            source: Source {
592                source_type: SourceType::Postgres,
593                connection: None,
594            },
595            sinks: BTreeMap::new(),
596            indexes: BTreeMap::new(),
597            on_error: Default::default(),
598            server: Default::default(),
599        }
600    }
601
602    fn daemon_over(backends: Arc<MockBackends>) -> Daemon {
603        Daemon::new(backendless_config(), backends).with_options(DaemonOptions {
604            // No backfill: the test drives the live path directly.
605            skip_backfill: true,
606            ..DaemonOptions::default()
607        })
608    }
609
610    /// `Daemon::start` builds both edges **through** the injected `Backends`
611    /// (never naming a concrete backend itself), and a run over an empty live
612    /// stream returns cleanly with the status surface left at `Stopped`.
613    #[tokio::test]
614    async fn start_builds_backends_through_the_seam() {
615        let source_built = Arc::new(AtomicU64::new(0));
616        let sink_built = Arc::new(AtomicU64::new(0));
617
618        let backends = Arc::new(MockBackends {
619            capture: Arc::new(ScriptedSource {
620                changes: Mutex::new(Some(Vec::new())),
621            }),
622            documents: Arc::new(ScriptedDocuments),
623            sink: Arc::new(RecordingSink::default()),
624            source_built: Arc::clone(&source_built),
625            sink_built: Arc::clone(&sink_built),
626        });
627
628        let running = daemon_over(backends).start().await.unwrap();
629        let status = running.status();
630
631        // The daemon asked the seam — not a hardcoded backend — for each edge.
632        assert_eq!(source_built.load(Ordering::SeqCst), 1);
633        assert_eq!(sink_built.load(Ordering::SeqCst), 1);
634
635        // An empty live stream completes on its own; the shutdown never fires.
636        running.run(std::future::pending::<()>()).await.unwrap();
637
638        let snap = status.snapshot();
639        assert_eq!(snap.phase, Phase::Stopped);
640        assert_eq!(snap.changes_committed, 0);
641    }
642
643    /// A run over a non-empty live stream drives changes all the way through the
644    /// injected document builder and sink — capture, build, write, flush, ack —
645    /// with no real source or sink, and the status counters reflect it.
646    #[tokio::test]
647    async fn drives_changes_through_injected_backends() {
648        let acks = Arc::new(AtomicU64::new(0));
649        let ops = Arc::new(Mutex::new(Vec::new()));
650
651        let changes = vec![
652            row_change(false, 1, 0, &acks),
653            row_change(true, 2, 1, &acks),
654        ];
655
656        let backends = Arc::new(MockBackends {
657            capture: Arc::new(ScriptedSource {
658                changes: Mutex::new(Some(changes)),
659            }),
660            documents: Arc::new(ScriptedDocuments),
661            sink: Arc::new(RecordingSink {
662                ops: Arc::clone(&ops),
663            }),
664            source_built: Arc::new(AtomicU64::new(0)),
665            sink_built: Arc::new(AtomicU64::new(0)),
666        });
667
668        let running = daemon_over(backends).start().await.unwrap();
669        let status = running.status();
670        running.run(std::future::pending::<()>()).await.unwrap();
671
672        assert_eq!(
673            *ops.lock().unwrap(),
674            vec!["upsert users 1".to_owned(), "delete users 2".to_owned()],
675        );
676        assert_eq!(acks.load(Ordering::SeqCst), 2, "both changes acked");
677
678        let snap = status.snapshot();
679        assert_eq!(snap.changes_captured, 2);
680        assert_eq!(snap.changes_committed, 2);
681        assert_eq!(snap.changes_in_flight, 0);
682        assert_eq!(snap.phase, Phase::Stopped);
683    }
684}