Skip to main content

yeti_types/backend/
observer.rs

1//! Transaction observer trait — the unified post-commit hook surface
2//! that audit, replication, and (future) other write-observation
3//! consumers share.
4//!
5//! ## Why one trait
6//!
7//! Before this surface, audit and replication would have lived as
8//! parallel hook paths — audit firing in `yeti-table::query_exec`
9//! above `KvBackend` (fire-and-forget, drops on full channel),
10//! replication firing at commit time inside the backend (durable,
11//! cannot drop). Two paths means:
12//!
13//! - Slight differences in what each path sees (audit gets JSON-
14//!   cooked records; replication gets binary write ops) creating
15//!   "audit and replication disagree about state" failure modes
16//! - Doubled instrumentation and observability cost
17//! - More places to keep in sync when the write path evolves
18//!
19//! Replication's data + reliability requirements are a strict
20//! superset of audit's, and replication's fire-point (commit time
21//! inside the backend) is also valid for audit. Unifying as a single
22//! trait means one observation point, multiple subscribers, each
23//! declaring its requirements and reacting to commits according to
24//! its own contract.
25//!
26//! ## Contract
27//!
28//! Observers are registered on the [`BackendManager`](crate::backend::BackendManager) at startup and
29//! are invoked **synchronously inside the backend's commit path**,
30//! after the data has landed in the storage engine but before
31//! `KvBackend::write_batch` returns to the caller.
32//!
33//! **Failure semantics**: returning `Err` from
34//! [`on_commit`](TransactionObserver::on_commit) does NOT roll back
35//! the data write — yeti's underlying storage doesn't support
36//! two-phase commit across the data write and arbitrary observer
37//! side-effects. Observer failures are logged, metered, and surfaced
38//! to operators; the write itself succeeds. Observers that need
39//! transactional durability with the data write must arrange it
40//! themselves (e.g., the replication observer writes its log entry
41//! into the same `RocksDB` instance via a separate column family, so
42//! "data committed but log missing" is a single-machine atomicity
43//! failure mode rather than a cross-system one).
44//!
45//! Observers that want fire-and-forget semantics (e.g., audit) must
46//! return `Ok(())` always and swallow their own internal failures —
47//! a returned `Err` should mean "operator should be paged," not
48//! "this write was rejected."
49//!
50//! ## Ordering
51//!
52//! Observers fire in **registration order**. The replication
53//! observer registers first (durability is the foundational
54//! contract); audit registers second (telemetry on top of durability).
55//! Order matters: if the replication log write fails and audit's
56//! observer would have logged the failure, audit needs to see it.
57
58use std::sync::Arc;
59
60use crate::error::Result;
61use crate::hlc::HlcTimestamp;
62use crate::types::TableId;
63
64use super::WriteOp;
65
/// Optional [`TransactionEvent`] fields an observer asks the backend
/// to populate.
///
/// Every observer declares one of these. The backend ORs the
/// declarations of all registered observers together and fills in
/// exactly the fields that at least one observer requested. A field
/// nobody requested is never computed — for example, the "previous
/// value" read before commit is expensive (an extra `RocksDB` get),
/// so when neither audit (with `capture_state: false`) nor
/// replication (today) asks for it, the backend skips that read
/// entirely.
///
/// Implements `PartialEq`/`Eq` so whole requirement sets can be
/// compared directly instead of field by field.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ObserverRequirements {
    /// Read the value currently stored at the key (if any) before
    /// the new write is committed. Costs one extra storage `get`.
    /// Audit with `capture_state: true` needs this; replication
    /// today does not (CRDT merge happens at apply time on the
    /// receiver, against the receiver's local state).
    pub needs_before_value: bool,

    /// Look up the HLC timestamp of the record currently stored at
    /// the key (if any) and hand it over as `prev_hlc` for causal
    /// predecessor tracking. Cheap because the HLC is stored
    /// alongside the record metadata in a known location.
    /// Replication needs this for the "this write observed prev HLC
    /// X" causal-chain invariant; audit does not.
    pub needs_prev_hlc: bool,
}

impl ObserverRequirements {
    /// Merge two requirement sets: a flag is set in the result when
    /// it is set in `self`, in `other`, or in both.
    ///
    /// The backend folds this over every registered observer to
    /// obtain the union of their needs. `Default::default()` (all
    /// flags `false`) is the identity element of that fold.
    #[must_use]
    pub const fn union(self, other: Self) -> Self {
        Self {
            needs_before_value: self.needs_before_value || other.needs_before_value,
            needs_prev_hlc: self.needs_prev_hlc || other.needs_prev_hlc,
        }
    }
}
105
106/// A single committed transaction event — what observers see.
107///
108/// Borrowed view: the backend produces this on the stack at commit
109/// time and passes it by reference. Observers that want to outlive
110/// the borrow must copy the fields they care about.
111#[derive(Debug)]
112pub struct TransactionEvent<'a> {
113    /// HLC timestamp assigned to this write at commit time.
114    pub hlc: HlcTimestamp,
115
116    /// Node identifier of the writer.
117    ///
118    /// For locally-originated writes, this is `self.node_id` — the
119    /// id of the node that called `write_batch`. For writes applied
120    /// by the replication observer in response to a peer's log
121    /// entry, this is the **originating peer's** id, not the local
122    /// node's. Observers that should only act on local writes
123    /// (e.g., the replication observer's outgoing-log writer) filter
124    /// on `node_id == self.node_id`; observers that act on all
125    /// observed state (e.g., audit) do not filter.
126    pub node_id: &'a str,
127
128    /// In-deployment table identity (app / database / table). Tenant
129    /// scoping lives at the deployment frame in the multiplexer, not
130    /// inside the table id — see [`crate::types::DeploymentHash`].
131    pub table: &'a TableId,
132
133    /// The write operation. Put or Delete; CRDT operations are
134    /// encoded inside the value via `__op__` markers (per the
135    /// Harper convention). The wire shape stays uniform; CRDT-ness
136    /// is in the data, not the protocol.
137    pub op: &'a WriteOp,
138
139    /// Existing value bytes at the key (if any) prior to this write,
140    /// populated only if some registered observer set
141    /// [`ObserverRequirements::needs_before_value`].
142    pub before: Option<&'a [u8]>,
143
144    /// HLC of the existing record at the key (if any), populated
145    /// only if some registered observer set
146    /// [`ObserverRequirements::needs_prev_hlc`]. Used by replication
147    /// to record causal-predecessor relationships in the log.
148    pub prev_hlc: Option<HlcTimestamp>,
149}
150
151/// Trait implemented by every consumer of the post-commit observation
152/// surface (audit, replication, etc.).
153///
154/// See module docs for the full contract.
155pub trait TransactionObserver: Send + Sync + std::fmt::Debug + 'static {
156    /// A short identifier for diagnostics and observability — appears
157    /// in error logs and metric labels. Conventionally
158    /// `crate-shortname` like `"audit"` or `"replication"`.
159    fn name(&self) -> &'static str;
160
161    /// Declare which optional [`TransactionEvent`] fields this
162    /// observer needs the backend to populate. Called once at
163    /// registration time; cached by the backend.
164    fn requirements(&self) -> ObserverRequirements;
165
166    /// Called inside the backend's commit path, after the storage
167    /// write succeeded. See module docs for failure semantics.
168    ///
169    /// # Errors
170    ///
171    /// Returns the observer-specific [`YetiError`](crate::error::YetiError)
172    /// when post-commit processing fails. The backend logs the error
173    /// and continues — observer failures do not roll back the write.
174    fn on_commit(&self, event: &TransactionEvent<'_>) -> Result<()>;
175}
176
177/// Shareable, registerable observer handle. The [`BackendManager`](crate::backend::BackendManager)
178/// stores `Vec<ObserverHandle>` and iterates it on every commit.
179pub type ObserverHandle = Arc<dyn TransactionObserver>;
180
181// ============================================================================
182// ObserverRegistry
183// ============================================================================
184
185/// Shared, lock-free registry of [`TransactionObserver`]s.
186///
187/// One registry instance lives per-deployment process, held by
188/// [`BackendManager`](crate::backend::BackendManager) and cloned into every `LoggingBackend` so the
189/// commit path can fan out without coordination. The internal
190/// snapshot is swapped via [`arc_swap::ArcSwap`]: registration takes
191/// a brief write, every commit-path read is wait-free.
192///
193/// Registration happens during deployment startup before traffic is
194/// accepted. The contract assumes near-zero registrations after
195/// startup completes; nothing in the type enforces that, but the
196/// trait doc's "registration order matters" + "registered first
197/// fires first" promises hold as long as registration is
198/// startup-serialized.
199#[derive(Debug, Default)]
200pub struct ObserverRegistry {
201    /// Snapshot of the registered observers, swapped atomically on
202    /// registration. The empty default is `Arc::new(Vec::new())`.
203    observers: arc_swap::ArcSwap<Vec<ObserverHandle>>,
204}
205
206impl ObserverRegistry {
207    /// Construct an empty registry wrapped in an [`Arc`] for sharing.
208    #[must_use]
209    pub fn new() -> Arc<Self> {
210        Arc::new(Self::default())
211    }
212
213    /// Register an observer. Appends to the end of the snapshot so
214    /// registration order is preserved (replication first, audit
215    /// second per the trait doc).
216    pub fn register(&self, observer: ObserverHandle) {
217        let cur = self.observers.load_full();
218        let mut next = Vec::with_capacity(cur.len() + 1);
219        next.extend(cur.iter().cloned());
220        next.push(observer);
221        self.observers.store(Arc::new(next));
222    }
223
224    /// Snapshot of the currently-registered observers. Cheap (`Arc`
225    /// clone) — call from the commit path on every write.
226    #[must_use]
227    pub fn snapshot(&self) -> Arc<Vec<ObserverHandle>> {
228        self.observers.load_full()
229    }
230
231    /// Computed union of all registered observers' requirements.
232    /// Recomputed on each call; cache at the caller if hot.
233    #[must_use]
234    pub fn requirements(&self) -> ObserverRequirements {
235        self.observers
236            .load()
237            .iter()
238            .map(|o| o.requirements())
239            .fold(ObserverRequirements::default(), ObserverRequirements::union)
240    }
241
242    /// `true` if no observers are registered. Commit paths can use
243    /// this to skip building the [`TransactionEvent`] entirely.
244    #[must_use]
245    pub fn is_empty(&self) -> bool {
246        self.observers.load().is_empty()
247    }
248}
249
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Observer that requests nothing and accepts every event.
    #[derive(Debug)]
    struct NullObserver;

    impl TransactionObserver for NullObserver {
        fn name(&self) -> &'static str {
            "null"
        }

        fn requirements(&self) -> ObserverRequirements {
            ObserverRequirements::default()
        }

        fn on_commit(&self, _event: &TransactionEvent<'_>) -> Result<()> {
            Ok(())
        }
    }

    /// Observer that reports exactly the requirements it was built
    /// with and accepts every event.
    #[derive(Debug)]
    struct WithReqs(ObserverRequirements);

    impl TransactionObserver for WithReqs {
        fn name(&self) -> &'static str {
            "with-reqs"
        }

        fn requirements(&self) -> ObserverRequirements {
            self.0
        }

        fn on_commit(&self, _event: &TransactionEvent<'_>) -> Result<()> {
            Ok(())
        }
    }

    /// Shorthand for spelling out a requirement set.
    fn reqs(needs_before_value: bool, needs_prev_hlc: bool) -> ObserverRequirements {
        ObserverRequirements {
            needs_before_value,
            needs_prev_hlc,
        }
    }

    #[test]
    fn requirements_union_or_combines() {
        // Each side contributes one flag; the union must carry both.
        let ObserverRequirements {
            needs_before_value,
            needs_prev_hlc,
        } = reqs(true, false).union(reqs(false, true));
        assert!(needs_before_value);
        assert!(needs_prev_hlc);
    }

    #[test]
    fn observer_handle_is_arc_send_sync() {
        // Compile-time check only: the type-erased handle must be
        // shareable across threads, since the manager is shared
        // across the runtime and observers are read from many tasks.
        fn require_send_sync<T: Send + Sync>() {}
        require_send_sync::<ObserverHandle>();
        let _handle: ObserverHandle = Arc::new(NullObserver);
    }

    #[test]
    fn registry_starts_empty() {
        let registry = ObserverRegistry::new();
        assert!(registry.is_empty());
        assert_eq!(registry.snapshot().len(), 0);

        let union = registry.requirements();
        assert!(!union.needs_before_value);
        assert!(!union.needs_prev_hlc);
    }

    #[test]
    fn registry_appends_in_registration_order() {
        let registry = ObserverRegistry::new();
        registry.register(Arc::new(WithReqs(ObserverRequirements::default())));
        registry.register(Arc::new(NullObserver));

        // Comparing the full name list checks both length and order.
        let names: Vec<&'static str> = registry
            .snapshot()
            .iter()
            .map(|observer| observer.name())
            .collect();
        assert_eq!(names, ["with-reqs", "null"]);
    }

    #[test]
    fn registry_requirements_is_union() {
        let registry = ObserverRegistry::new();
        for wanted in [reqs(true, false), reqs(false, true)] {
            registry.register(Arc::new(WithReqs(wanted)));
        }

        let union = registry.requirements();
        assert!(union.needs_before_value);
        assert!(union.needs_prev_hlc);
    }

    #[test]
    fn registry_snapshot_is_cheap_arc_clone() {
        // A snapshot taken before a registration keeps pointing at
        // the old list, while one taken afterwards sees the new
        // observer — the swap does not disturb existing readers.
        let registry = ObserverRegistry::new();
        let earlier = registry.snapshot();
        registry.register(Arc::new(NullObserver));
        let later = registry.snapshot();

        assert_eq!(earlier.len(), 0);
        assert_eq!(later.len(), 1);
    }
}