engine/observer.rs
1//! Pipeline observability — a backend-neutral [`Observer`] the engine reports
2//! lifecycle and progress events to.
3//!
4//! The engine emits events at the transitions and boundaries it already has
5//! (indexes ensured, backfill phases, live start, each change captured, each
6//! batch committed, errors). It depends only on this trait, never on a concrete
7//! metrics or status backend — exactly the trait-object discipline the rest of
8//! the pipeline uses for sources and sinks. A consumer (the `daemon` crate)
9//! implements [`Observer`] to fan these events out to metrics, a live status
10//! surface, or anything else.
11//!
12//! Methods are **synchronous and cheap** (`&self`, no `await`): they run inline
13//! on the pipeline's hot path, so an implementation must do only fast,
14//! non-blocking work (bump a counter, update an atomic). Anything slower belongs
15//! on a separate task fed by what the observer records. Every method has a
16//! no-op default so an implementation overrides only what it cares about, and
17//! [`NoopObserver`] is the default when none is set.
18
19use std::sync::Arc;
20use std::time::Duration;
21
22use schema_core::IndexName;
23
24/// What one committed batch did — reported to [`Observer::on_batch_committed`].
25#[derive(Debug, Clone)]
26pub struct BatchStats {
27 /// Changes buffered into this batch (what [`BatchPolicy`](crate::BatchPolicy)
28 /// caps).
29 pub changes: usize,
30 /// Distinct documents the batch built and wrote — `<= changes` after the
31 /// per-batch dedup. Equals the sum of [`documents_by_index`](Self::documents_by_index).
32 pub documents: usize,
33 /// Documents built per target index, for per-index metrics. One entry per
34 /// index the batch touched.
35 pub documents_by_index: Vec<(IndexName, usize)>,
36 /// How long the [`flush`](sinks_core::Sink::flush) that made the batch
37 /// durable took.
38 pub flush: Duration,
39}
40
41/// A sink for the engine's lifecycle and progress events.
42///
43/// See this module's docs for the hot-path contract. All methods default
44/// to no-ops.
45pub trait Observer: std::fmt::Debug + Send + Sync {
46 /// The target indexes have been ensured at the sink (`count` of them),
47 /// before any documents flow.
48 fn on_indexes_ensured(&self, count: usize) {
49 let _ = count;
50 }
51
52 /// Backfill is starting for `indexes` (those the sink reported unseeded).
53 fn on_backfill_started(&self, indexes: &[IndexName]) {
54 let _ = indexes;
55 }
56
57 /// `index`'s backfill is complete and it has been marked seeded.
58 fn on_index_seeded(&self, index: &IndexName) {
59 let _ = index;
60 }
61
62 /// The backfill phase finished (all unseeded indexes seeded), or was skipped.
63 fn on_backfill_completed(&self) {}
64
65 /// Live capture has started; the pipeline is now following ongoing changes.
66 fn on_live_started(&self) {}
67
68 /// One change was pulled from the source into the queue.
69 fn on_change_captured(&self) {}
70
71 /// A batch was built, flushed, and acked. See [`BatchStats`].
72 fn on_batch_committed(&self, stats: BatchStats) {
73 let _ = stats;
74 }
75
76 /// The source's capture lag, in bytes behind the latest position — e.g. a
77 /// replication slot's distance from the server's current WAL. Reported by
78 /// whoever polls [`ChangeCapture::lag`](sources_core::cdc::ChangeCapture::lag),
79 /// not by the engine loop itself.
80 fn on_slot_lag(&self, bytes: u64) {
81 let _ = bytes;
82 }
83
84 /// A document was **quarantined**: the sink rejected it at the item level
85 /// and the engine's failure policy is to skip and continue (see
86 /// [`FailurePolicy::Skip`](crate::FailurePolicy)). The document is not
87 /// applied and the batch proceeds, so it is *not* redelivered — this is the
88 /// signal to surface it (a metric, a log, a dead-letter record). `index` and
89 /// `id` are the destination's names for it; `reason` is why it was rejected.
90 fn on_document_quarantined(&self, index: &str, id: &str, reason: &str) {
91 let _ = (index, id, reason);
92 }
93
94 /// The pipeline stopped on an error (rendered to a string, since the engine's
95 /// error type is not part of this neutral surface).
96 fn on_error(&self, error: &str) {
97 let _ = error;
98 }
99}
100
101/// The default [`Observer`]: every event is dropped. Used when an engine is run
102/// without [`with_observer`](crate::Engine::with_observer).
103#[derive(Debug, Default, Clone, Copy)]
104pub struct NoopObserver;
105
106impl Observer for NoopObserver {}
107
108/// An [`Observer`] that forwards every event to several observers in turn.
109///
110/// The engine drives a single observer, so this composes many — e.g. one that
111/// updates a status surface and one that records metrics — without the engine
112/// knowing how many there are. Mirrors [`FanOutSink`](sinks_core::FanOutSink).
113#[derive(Debug, Default)]
114pub struct FanOut {
115 observers: Vec<Arc<dyn Observer>>,
116}
117
118impl FanOut {
119 /// Compose the given observers; each receives every event, in order.
120 pub fn new(observers: Vec<Arc<dyn Observer>>) -> Self {
121 Self { observers }
122 }
123}
124
125impl Observer for FanOut {
126 fn on_indexes_ensured(&self, count: usize) {
127 for observer in &self.observers {
128 observer.on_indexes_ensured(count);
129 }
130 }
131
132 fn on_backfill_started(&self, indexes: &[IndexName]) {
133 for observer in &self.observers {
134 observer.on_backfill_started(indexes);
135 }
136 }
137
138 fn on_index_seeded(&self, index: &IndexName) {
139 for observer in &self.observers {
140 observer.on_index_seeded(index);
141 }
142 }
143
144 fn on_backfill_completed(&self) {
145 for observer in &self.observers {
146 observer.on_backfill_completed();
147 }
148 }
149
150 fn on_live_started(&self) {
151 for observer in &self.observers {
152 observer.on_live_started();
153 }
154 }
155
156 fn on_change_captured(&self) {
157 for observer in &self.observers {
158 observer.on_change_captured();
159 }
160 }
161
162 fn on_batch_committed(&self, stats: BatchStats) {
163 for observer in &self.observers {
164 observer.on_batch_committed(stats.clone());
165 }
166 }
167
168 fn on_slot_lag(&self, bytes: u64) {
169 for observer in &self.observers {
170 observer.on_slot_lag(bytes);
171 }
172 }
173
174 fn on_document_quarantined(&self, index: &str, id: &str, reason: &str) {
175 for observer in &self.observers {
176 observer.on_document_quarantined(index, id, reason);
177 }
178 }
179
180 fn on_error(&self, error: &str) {
181 for observer in &self.observers {
182 observer.on_error(error);
183 }
184 }
185}