Skip to main content

engine/
observer.rs

//! Pipeline observability — a backend-neutral [`Observer`] the engine reports
//! lifecycle and progress events to.
//!
//! The engine emits events at the transitions and boundaries it already has
//! (indexes ensured, backfill phases, live start, each change captured, each
//! batch committed, errors). It depends only on this trait, never on a concrete
//! metrics or status backend — exactly the trait-object discipline the rest of
//! the pipeline uses for sources and sinks. A consumer (the `daemon` crate)
//! implements [`Observer`] to fan these events out to metrics, a live status
//! surface, or anything else.
//!
//! Methods are **synchronous and cheap** (`&self`, no `await`): they run inline
//! on the pipeline's hot path, so an implementation must do only fast,
//! non-blocking work (bump a counter, update an atomic). Anything slower belongs
//! on a separate task fed by what the observer records. Every method has a
//! no-op default so an implementation overrides only what it cares about, and
//! [`NoopObserver`] is the default when none is set.
18
19use std::sync::Arc;
20use std::time::Duration;
21
22use schema_core::IndexName;
23
24/// What one committed batch did — reported to [`Observer::on_batch_committed`].
25#[derive(Debug, Clone)]
26pub struct BatchStats {
27    /// Changes buffered into this batch (what [`BatchPolicy`](crate::BatchPolicy)
28    /// caps).
29    pub changes: usize,
30    /// Distinct documents the batch built and wrote — `<= changes` after the
31    /// per-batch dedup. Equals the sum of [`documents_by_index`](Self::documents_by_index).
32    pub documents: usize,
33    /// Documents built per target index, for per-index metrics. One entry per
34    /// index the batch touched.
35    pub documents_by_index: Vec<(IndexName, usize)>,
36    /// How long the [`flush`](sinks_core::Sink::flush) that made the batch
37    /// durable took.
38    pub flush: Duration,
39}
40
41/// A sink for the engine's lifecycle and progress events.
42///
43/// See this module's docs for the hot-path contract. All methods default
44/// to no-ops.
45pub trait Observer: std::fmt::Debug + Send + Sync {
46    /// The target indexes have been ensured at the sink (`count` of them),
47    /// before any documents flow.
48    fn on_indexes_ensured(&self, count: usize) {
49        let _ = count;
50    }
51
52    /// Backfill is starting for `indexes` (those the sink reported unseeded).
53    fn on_backfill_started(&self, indexes: &[IndexName]) {
54        let _ = indexes;
55    }
56
57    /// `index`'s backfill is complete and it has been marked seeded.
58    fn on_index_seeded(&self, index: &IndexName) {
59        let _ = index;
60    }
61
62    /// The backfill phase finished (all unseeded indexes seeded), or was skipped.
63    fn on_backfill_completed(&self) {}
64
65    /// Live capture has started; the pipeline is now following ongoing changes.
66    fn on_live_started(&self) {}
67
68    /// One change was pulled from the source into the queue.
69    fn on_change_captured(&self) {}
70
71    /// A batch was built, flushed, and acked. See [`BatchStats`].
72    fn on_batch_committed(&self, stats: BatchStats) {
73        let _ = stats;
74    }
75
76    /// The source's capture lag, in bytes behind the latest position — e.g. a
77    /// replication slot's distance from the server's current WAL. Reported by
78    /// whoever polls [`ChangeCapture::lag`](sources_core::cdc::ChangeCapture::lag),
79    /// not by the engine loop itself.
80    fn on_slot_lag(&self, bytes: u64) {
81        let _ = bytes;
82    }
83
84    /// A document was **quarantined**: the sink rejected it at the item level
85    /// and the engine's failure policy is to skip and continue (see
86    /// [`FailurePolicy::Skip`](crate::FailurePolicy)). The document is not
87    /// applied and the batch proceeds, so it is *not* redelivered — this is the
88    /// signal to surface it (a metric, a log, a dead-letter record). `index` and
89    /// `id` are the destination's names for it; `reason` is why it was rejected.
90    fn on_document_quarantined(&self, index: &str, id: &str, reason: &str) {
91        let _ = (index, id, reason);
92    }
93
94    /// The pipeline stopped on an error (rendered to a string, since the engine's
95    /// error type is not part of this neutral surface).
96    fn on_error(&self, error: &str) {
97        let _ = error;
98    }
99}
100
101/// The default [`Observer`]: every event is dropped. Used when an engine is run
102/// without [`with_observer`](crate::Engine::with_observer).
103#[derive(Debug, Default, Clone, Copy)]
104pub struct NoopObserver;
105
106impl Observer for NoopObserver {}
107
108/// An [`Observer`] that forwards every event to several observers in turn.
109///
110/// The engine drives a single observer, so this composes many — e.g. one that
111/// updates a status surface and one that records metrics — without the engine
112/// knowing how many there are. Mirrors [`FanOutSink`](sinks_core::FanOutSink).
113#[derive(Debug, Default)]
114pub struct FanOut {
115    observers: Vec<Arc<dyn Observer>>,
116}
117
118impl FanOut {
119    /// Compose the given observers; each receives every event, in order.
120    pub fn new(observers: Vec<Arc<dyn Observer>>) -> Self {
121        Self { observers }
122    }
123}
124
125impl Observer for FanOut {
126    fn on_indexes_ensured(&self, count: usize) {
127        for observer in &self.observers {
128            observer.on_indexes_ensured(count);
129        }
130    }
131
132    fn on_backfill_started(&self, indexes: &[IndexName]) {
133        for observer in &self.observers {
134            observer.on_backfill_started(indexes);
135        }
136    }
137
138    fn on_index_seeded(&self, index: &IndexName) {
139        for observer in &self.observers {
140            observer.on_index_seeded(index);
141        }
142    }
143
144    fn on_backfill_completed(&self) {
145        for observer in &self.observers {
146            observer.on_backfill_completed();
147        }
148    }
149
150    fn on_live_started(&self) {
151        for observer in &self.observers {
152            observer.on_live_started();
153        }
154    }
155
156    fn on_change_captured(&self) {
157        for observer in &self.observers {
158            observer.on_change_captured();
159        }
160    }
161
162    fn on_batch_committed(&self, stats: BatchStats) {
163        for observer in &self.observers {
164            observer.on_batch_committed(stats.clone());
165        }
166    }
167
168    fn on_slot_lag(&self, bytes: u64) {
169        for observer in &self.observers {
170            observer.on_slot_lag(bytes);
171        }
172    }
173
174    fn on_document_quarantined(&self, index: &str, id: &str, reason: &str) {
175        for observer in &self.observers {
176            observer.on_document_quarantined(index, id, reason);
177        }
178    }
179
180    fn on_error(&self, error: &str) {
181        for observer in &self.observers {
182            observer.on_error(error);
183        }
184    }
185}