yeti_types/backend/observer.rs
1//! Transaction observer trait — the unified post-commit hook surface
2//! that audit, replication, and (future) other write-observation
3//! consumers share.
4//!
5//! ## Why one trait
6//!
7//! Before this surface, audit and replication would have lived as
8//! parallel hook paths — audit firing in `yeti-table::query_exec`
9//! above `KvBackend` (fire-and-forget, drops on full channel),
10//! replication firing at commit time inside the backend (durable,
11//! cannot drop). Two paths means:
12//!
13//! - Slight differences in what each path sees (audit gets JSON-
14//! cooked records; replication gets binary write ops) creating
15//! "audit and replication disagree about state" failure modes
16//! - Doubled instrumentation and observability cost
17//! - More places to keep in sync when the write path evolves
18//!
19//! Replication's data + reliability requirements are a strict
20//! superset of audit's, and replication's fire-point (commit time
21//! inside the backend) is also valid for audit. Unifying as a single
22//! trait means one observation point, multiple subscribers, each
23//! declaring its requirements and reacting to commits according to
24//! its own contract.
25//!
26//! ## Contract
27//!
28//! Observers are registered on the [`BackendManager`](crate::backend::BackendManager) at startup and
29//! are invoked **synchronously inside the backend's commit path**,
30//! after the data has landed in the storage engine but before
31//! `KvBackend::write_batch` returns to the caller.
32//!
33//! **Failure semantics**: returning `Err` from
34//! [`on_commit`](TransactionObserver::on_commit) does NOT roll back
35//! the data write — yeti's underlying storage doesn't support
36//! two-phase commit across the data write and arbitrary observer
37//! side-effects. Observer failures are logged, metered, and surfaced
38//! to operators; the write itself succeeds. Observers that need
39//! transactional durability with the data write must arrange it
40//! themselves (e.g., the replication observer writes its log entry
41//! into the same `RocksDB` instance via a separate column family, so
42//! "data committed but log missing" is a single-machine atomicity
43//! failure mode rather than a cross-system one).
44//!
45//! Observers that want fire-and-forget semantics (e.g., audit) must
46//! return `Ok(())` always and swallow their own internal failures —
47//! a returned `Err` should mean "operator should be paged," not
48//! "this write was rejected."
49//!
50//! ## Ordering
51//!
52//! Observers fire in **registration order**. The replication
53//! observer registers first (durability is the foundational
54//! contract); audit registers second (telemetry on top of durability).
55//! Order matters: if the replication log write fails and audit's
56//! observer would have logged the failure, audit needs to see it.
57
58use std::sync::Arc;
59
60use crate::error::Result;
61use crate::hlc::HlcTimestamp;
62use crate::types::TableId;
63
64use super::WriteOp;
65
/// The optional [`TransactionEvent`] fields an observer asks the
/// backend to fill in.
///
/// Each observer declares one of these. The backend OR's together
/// the declarations of every registered observer and populates
/// exactly the fields at least one of them asked for; anything
/// nobody needs is skipped. The motivating case is the "previous
/// value" read before commit, which costs an extra `RocksDB` get:
/// when neither audit (with `capture_state: false`) nor replication
/// (today) asks for it, the backend does not perform that read.
///
/// The default value requests nothing (both flags `false`). The type
/// is `Copy` and comparable with `==`, so requirement sets can be
/// passed by value and checked directly in assertions.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ObserverRequirements {
    /// Ask the backend to read the value currently stored at the key
    /// (if any) before the new write is committed. This costs one
    /// extra storage `get`. Audit with `capture_state: true` needs
    /// it; replication today does not, because CRDT merge happens at
    /// apply time on the receiver using its local state.
    pub needs_before_value: bool,

    /// Ask the backend to look up the HLC timestamp of the record
    /// currently stored at the key (if any) and hand it over as
    /// `prev_hlc` for causal-predecessor tracking. Cheap, because
    /// the HLC is stored alongside the record metadata in a known
    /// location. Replication needs it for the "this write observed
    /// prev HLC X" causal-chain invariant; audit does not.
    pub needs_prev_hlc: bool,
}
92
93impl ObserverRequirements {
94 /// Combine two requirement sets — the OR of all flags. Used by
95 /// the backend to compute the union of all registered observers'
96 /// needs.
97 #[must_use]
98 pub const fn union(self, other: Self) -> Self {
99 Self {
100 needs_before_value: self.needs_before_value || other.needs_before_value,
101 needs_prev_hlc: self.needs_prev_hlc || other.needs_prev_hlc,
102 }
103 }
104}
105
106/// A single committed transaction event — what observers see.
107///
108/// Borrowed view: the backend produces this on the stack at commit
109/// time and passes it by reference. Observers that want to outlive
110/// the borrow must copy the fields they care about.
111#[derive(Debug)]
112pub struct TransactionEvent<'a> {
113 /// HLC timestamp assigned to this write at commit time.
114 pub hlc: HlcTimestamp,
115
116 /// Node identifier of the writer.
117 ///
118 /// For locally-originated writes, this is `self.node_id` — the
119 /// id of the node that called `write_batch`. For writes applied
120 /// by the replication observer in response to a peer's log
121 /// entry, this is the **originating peer's** id, not the local
122 /// node's. Observers that should only act on local writes
123 /// (e.g., the replication observer's outgoing-log writer) filter
124 /// on `node_id == self.node_id`; observers that act on all
125 /// observed state (e.g., audit) do not filter.
126 pub node_id: &'a str,
127
128 /// In-deployment table identity (app / database / table). Tenant
129 /// scoping lives at the deployment frame in the multiplexer, not
130 /// inside the table id — see [`crate::types::DeploymentHash`].
131 pub table: &'a TableId,
132
133 /// The write operation. Put or Delete; CRDT operations are
134 /// encoded inside the value via `__op__` markers (per the
135 /// Harper convention). The wire shape stays uniform; CRDT-ness
136 /// is in the data, not the protocol.
137 pub op: &'a WriteOp,
138
139 /// Existing value bytes at the key (if any) prior to this write,
140 /// populated only if some registered observer set
141 /// [`ObserverRequirements::needs_before_value`].
142 pub before: Option<&'a [u8]>,
143
144 /// HLC of the existing record at the key (if any), populated
145 /// only if some registered observer set
146 /// [`ObserverRequirements::needs_prev_hlc`]. Used by replication
147 /// to record causal-predecessor relationships in the log.
148 pub prev_hlc: Option<HlcTimestamp>,
149}
150
151/// Trait implemented by every consumer of the post-commit observation
152/// surface (audit, replication, etc.).
153///
154/// See module docs for the full contract.
155pub trait TransactionObserver: Send + Sync + std::fmt::Debug + 'static {
156 /// A short identifier for diagnostics and observability — appears
157 /// in error logs and metric labels. Conventionally
158 /// `crate-shortname` like `"audit"` or `"replication"`.
159 fn name(&self) -> &'static str;
160
161 /// Declare which optional [`TransactionEvent`] fields this
162 /// observer needs the backend to populate. Called once at
163 /// registration time; cached by the backend.
164 fn requirements(&self) -> ObserverRequirements;
165
166 /// Called inside the backend's commit path, after the storage
167 /// write succeeded. See module docs for failure semantics.
168 ///
169 /// # Errors
170 ///
171 /// Returns the observer-specific [`YetiError`](crate::error::YetiError)
172 /// when post-commit processing fails. The backend logs the error
173 /// and continues — observer failures do not roll back the write.
174 fn on_commit(&self, event: &TransactionEvent<'_>) -> Result<()>;
175}
176
177/// Shareable, registerable observer handle. The [`BackendManager`](crate::backend::BackendManager)
178/// stores `Vec<ObserverHandle>` and iterates it on every commit.
179pub type ObserverHandle = Arc<dyn TransactionObserver>;
180
181// ============================================================================
182// ObserverRegistry
183// ============================================================================
184
185/// Shared, lock-free registry of [`TransactionObserver`]s.
186///
187/// One registry instance lives per-deployment process, held by
188/// [`BackendManager`](crate::backend::BackendManager) and cloned into every `LoggingBackend` so the
189/// commit path can fan out without coordination. The internal
190/// snapshot is swapped via [`arc_swap::ArcSwap`]: registration takes
191/// a brief write, every commit-path read is wait-free.
192///
193/// Registration happens during deployment startup before traffic is
194/// accepted. The contract assumes near-zero registrations after
195/// startup completes; nothing in the type enforces that, but the
196/// trait doc's "registration order matters" + "registered first
197/// fires first" promises hold as long as registration is
198/// startup-serialized.
199#[derive(Debug, Default)]
200pub struct ObserverRegistry {
201 /// Snapshot of the registered observers, swapped atomically on
202 /// registration. The empty default is `Arc::new(Vec::new())`.
203 observers: arc_swap::ArcSwap<Vec<ObserverHandle>>,
204}
205
206impl ObserverRegistry {
207 /// Construct an empty registry wrapped in an [`Arc`] for sharing.
208 #[must_use]
209 pub fn new() -> Arc<Self> {
210 Arc::new(Self::default())
211 }
212
213 /// Register an observer. Appends to the end of the snapshot so
214 /// registration order is preserved (replication first, audit
215 /// second per the trait doc).
216 pub fn register(&self, observer: ObserverHandle) {
217 let cur = self.observers.load_full();
218 let mut next = Vec::with_capacity(cur.len() + 1);
219 next.extend(cur.iter().cloned());
220 next.push(observer);
221 self.observers.store(Arc::new(next));
222 }
223
224 /// Snapshot of the currently-registered observers. Cheap (`Arc`
225 /// clone) — call from the commit path on every write.
226 #[must_use]
227 pub fn snapshot(&self) -> Arc<Vec<ObserverHandle>> {
228 self.observers.load_full()
229 }
230
231 /// Computed union of all registered observers' requirements.
232 /// Recomputed on each call; cache at the caller if hot.
233 #[must_use]
234 pub fn requirements(&self) -> ObserverRequirements {
235 self.observers
236 .load()
237 .iter()
238 .map(|o| o.requirements())
239 .fold(ObserverRequirements::default(), ObserverRequirements::union)
240 }
241
242 /// `true` if no observers are registered. Commit paths can use
243 /// this to skip building the [`TransactionEvent`] entirely.
244 #[must_use]
245 pub fn is_empty(&self) -> bool {
246 self.observers.load().is_empty()
247 }
248}
249
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Observer that requests nothing and accepts every commit.
    #[derive(Debug)]
    struct NullObserver;

    impl TransactionObserver for NullObserver {
        fn name(&self) -> &'static str {
            "null"
        }

        fn requirements(&self) -> ObserverRequirements {
            ObserverRequirements::default()
        }

        fn on_commit(&self, _event: &TransactionEvent<'_>) -> Result<()> {
            Ok(())
        }
    }

    /// Observer that reports whatever requirements it was built with.
    #[derive(Debug)]
    struct WithReqs(ObserverRequirements);

    impl TransactionObserver for WithReqs {
        fn name(&self) -> &'static str {
            "with-reqs"
        }

        fn requirements(&self) -> ObserverRequirements {
            self.0
        }

        fn on_commit(&self, _event: &TransactionEvent<'_>) -> Result<()> {
            Ok(())
        }
    }

    /// Shorthand for spelling out a requirement set in the tests below.
    fn reqs(needs_before_value: bool, needs_prev_hlc: bool) -> ObserverRequirements {
        ObserverRequirements {
            needs_before_value,
            needs_prev_hlc,
        }
    }

    #[test]
    fn requirements_union_or_combines() {
        let combined = reqs(true, false).union(reqs(false, true));
        assert!(combined.needs_before_value);
        assert!(combined.needs_prev_hlc);
    }

    #[test]
    fn observer_handle_is_arc_send_sync() {
        // Compile-time check only: the handle must be usable from
        // many threads, because the registry is shared across the
        // runtime and observers are read from many tasks.
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<ObserverHandle>();
        let _: ObserverHandle = Arc::new(NullObserver);
    }

    #[test]
    fn registry_starts_empty() {
        let registry = ObserverRegistry::new();
        assert!(registry.is_empty());
        assert!(registry.snapshot().is_empty());
        assert!(!registry.requirements().needs_before_value);
        assert!(!registry.requirements().needs_prev_hlc);
    }

    #[test]
    fn registry_appends_in_registration_order() {
        let registry = ObserverRegistry::new();
        registry.register(Arc::new(WithReqs(ObserverRequirements::default())));
        registry.register(Arc::new(NullObserver));

        let names: Vec<&str> = registry
            .snapshot()
            .iter()
            .map(|observer| observer.name())
            .collect();
        assert_eq!(names, ["with-reqs", "null"]);
    }

    #[test]
    fn registry_requirements_is_union() {
        let registry = ObserverRegistry::new();
        registry.register(Arc::new(WithReqs(reqs(true, false))));
        registry.register(Arc::new(WithReqs(reqs(false, true))));

        let merged = registry.requirements();
        assert!(merged.needs_before_value);
        assert!(merged.needs_prev_hlc);
    }

    #[test]
    fn registry_snapshot_is_cheap_arc_clone() {
        // A snapshot taken before `register` keeps pointing at the
        // old list, while one taken afterwards sees the new observer:
        // swapping in a new list does not disturb existing readers.
        let registry = ObserverRegistry::new();
        let earlier = registry.snapshot();
        registry.register(Arc::new(NullObserver));
        let later = registry.snapshot();
        assert_eq!(earlier.len(), 0);
        assert_eq!(later.len(), 1);
    }
}
357}