Skip to main content

yeti_types/backend/
log.rs

1//! Transaction log primitives — the unified durable record of every
2//! committed write in a yeti deployment.
3//!
4//! ## Why one log
5//!
6//! Before this surface, audit and replication would have lived as
7//! separate sinks: audit writing to its own tables, replication
8//! writing to a separate per-CF append-only log. The Harper precedent
9//! ([`harperfast/harper`]'s `RocksTransactionLogStore`) demonstrates
10//! the unified-store pattern works in production: one log records
11//! every commit, and both audit (range-by-time) and replication
12//! (tail-by-HLC) read from it. Yeti adopts the same shape because:
13//!
14//! - One write per commit (vs. two) — half the I/O on the write path.
15//! - Audit and replication can never disagree about what happened.
16//! - Single retention policy fits both (a node retains the log only
17//!   as long as its peers might need it for catch-up; audit needs
18//!   the same window).
19//! - New observers (custom audit consumers, real-time dashboards)
20//!   plug into the existing log instead of bolting on parallel
21//!   sinks.
22//!
23//! ## Position in the stack
24//!
//! - **Trait + entry shape**: this module, `yeti_types::backend::log`.
26//!   Sits alongside the sibling `observer` module
27//!   ([`TransactionObserver`](crate::backend::TransactionObserver) /
28//!   [`TransactionEvent`](crate::backend::TransactionEvent)) — the
29//!   trio that defines the post-commit observation surface.
//! - **Default implementation**: `yeti_store::transaction_log::KvTransactionLog`.
31//!   RocksDB-backed (or any `KvBackend`). Opens in its own
32//!   per-deployment directory so the log's atomicity is local
33//!   (data committed but log missing is a single-machine failure
34//!   mode, not a cross-system one).
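//! - **Commit-path integration**: the log was built here as a
//!   standalone primitive so its contract could be reviewed and tested
//!   before deciding how to intercept writes (`LoggingBackend` wrapper
//!   vs. explicit hook surface on
//!   [`BackendManager`](crate::backend::BackendManager)).
//!   NOTE(review): [`LogEntry::prev_value`] and
//!   [`TransactionLog::append_batch`] already refer to a `LoggingBackend`
//!   in `yeti_store`; update this bullet if that wrapper is the chosen
//!   integration.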
40//!
41//! ## Wire stability
42//!
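//! [`LogEntry`] is also the wire frame: the replication transport
//! streams `LogEntry`s between peers. The shape must therefore stay
//! stable across the cluster — adding a field requires a coordinated
//! deployment. New optional fields should be `Option<...>`, but the
//! encoding is rmp-serde *positional* (fields are written in declaration
//! order, not by name), so `Option` alone does not make a field
//! skippable. Append new fields at the end only, never reorder existing
//! ones, and verify that readers on both versions accept the other
//! version's entries before relying on a mixed-version cluster.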
48//!
49//! [`harperfast/harper`]: https://github.com/harperdb
50
51use std::sync::Arc;
52
53use async_trait::async_trait;
54use serde::{Deserialize, Serialize};
55
56use crate::error::Result;
57use crate::hlc::HlcTimestamp;
58use crate::types::TableId;
59
60use super::WriteOp;
61
62// ============================================================================
63// Originator
64// ============================================================================
65
66/// Identity of the party that initiated a write.
67///
68/// Captured on every [`LogEntry`] so audit queries can answer
69/// "who/what wrote this record" without a sidecar table, and
70/// replication can distinguish between locally-originated writes
71/// (which must be streamed to peers) and remotely-applied writes
72/// (which were received over the wire and must NOT be re-streamed).
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74#[serde(tag = "kind", rename_all = "snake_case")]
75pub enum Originator {
76    /// Locally-initiated write through the request pipeline. `user`
77    /// is the authenticated principal when available; `None` for
78    /// anonymous writes (e.g., public tables with
79    /// `@export(public: [create])`).
80    Local {
81        /// Authenticated username, if the write went through the auth
82        /// pipeline.
83        user: Option<String>,
84    },
85    /// Write applied locally in response to a peer's replication
86    /// stream. Carries the source node id so replication's outgoing
87    /// log writer can filter out these entries — we don't re-stream
88    /// what we received.
89    Replicated {
90        /// Node id of the source peer that originated the write.
91        from_node: String,
92    },
93    /// Internal write performed by the platform itself — startup
94    /// bootstrap, system-table maintenance, scheduled compaction.
95    /// `component` is a short label for diagnostics
96    /// (`"auth-bootstrap"`, `"ttl-sweep"`, etc.).
97    Internal {
98        /// Name of the platform component that issued the write.
99        component: String,
100    },
101}
102
103impl Originator {
104    /// Convenience constructor for anonymous local writes.
105    #[must_use]
106    pub const fn anonymous() -> Self {
107        Self::Local { user: None }
108    }
109
110    /// Convenience constructor for authenticated local writes.
111    #[must_use]
112    pub fn local(user: impl Into<String>) -> Self {
113        Self::Local {
114            user: Some(user.into()),
115        }
116    }
117
118    /// True if this write was received over the replication wire
119    /// (i.e., should not be re-streamed). Used by the outgoing-log
120    /// writer to filter out apply-from-peer writes.
121    #[must_use]
122    pub const fn is_replicated(&self) -> bool {
123        matches!(self, Self::Replicated { .. })
124    }
125}
126
127// ============================================================================
128// LogEntry
129// ============================================================================
130
/// A single durable record of a committed write.
///
/// Persisted to the per-deployment [`TransactionLog`] at commit time.
/// Both audit and replication observers consume from this log:
///
/// - Replication tail-reads by HLC: streams new entries to peers,
///   filtering out [`Originator::Replicated`] writes so applied-
///   from-peer entries don't echo back.
/// - Audit range-reads by HLC + filters by table / originator / user
///   for query responses.
///
/// Wire-stable (serde-derived) because [`LogEntry`] frames also flow
/// across the replication network protocol between nodes.
///
/// **Field order is part of the format.** [`LogEntry::encode`] uses
/// `rmp_serde::to_vec`, which (see the note on `prev_value`) writes the
/// fields positionally in declaration order. Reordering or removing
/// fields, or inserting one before existing fields, shifts positions and
/// breaks decoding of previously written entries.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LogEntry {
    /// HLC timestamp assigned at commit time. Strictly greater than
    /// any previously-appended entry's HLC on the same node.
    pub hlc: HlcTimestamp,

    /// Node id of the node that PERSISTED this entry (the node that
    /// observed the commit). Both locally-originated and replication-
    /// applied writes log here, so `node_id` is always the local
    /// node. Distinguished from [`Originator::Replicated::from_node`]
    /// which records the conceptual writer.
    pub node_id: String,

    /// In-deployment table identity.
    pub table: TableId,

    /// The write operation that was committed.
    pub op: WriteOp,

    /// HLC of the existing record at this key prior to the write, if
    /// any. `None` for inserts. Used by anti-entropy to
    /// record causal predecessor relationships for the receive-side
    /// HLC-LWW comparison.
    pub prev_hlc: Option<HlcTimestamp>,

    /// Raw bytes of the existing record at this key prior to the
    /// write, if the table opted into `@audit(capture_state: true)`
    /// and the `LoggingBackend` was configured with
    /// `capture_before: true`. `None` when capture was disabled OR
    /// the key didn't exist (insert path). Decoded as JSON by audit
    /// readers to surface as `AuditEntry.before`.
    ///
    /// Storage cost is significant — each captured write doubles
    /// the log entry size plus pays one extra `get` at write time
    /// — so this is opt-in per table via the `@audit` directive.
    ///
    /// Note: encoded inline (not `skip_serializing_if`) because the
    /// wire format is rmp-serde tuple-positional; skipping fields
    /// breaks the decoder's positional expectations.
    pub prev_value: Option<Vec<u8>>,

    /// Who / what initiated the write.
    pub originator: Originator,

    /// Optional correlation id for traceability — typically the
    /// request id from the HTTP / MQTT / gRPC layer.
    pub request_id: Option<String>,

    /// Which protocol / interface the write came in through — `"rest"`,
    /// `"graphql"`, `"mqtt"`, `"mcp"`, `"internal"` for platform-
    /// originated writes, or `"replication"` for writes applied from a
    /// peer (as set by `WriteContext::for_replication`). `None` when the
    /// originator couldn't be determined (background tasks, startup
    /// bootstrap that didn't set the task-local). Audit queries surface
    /// this for filtering.
    pub interface: Option<String>,
}
199
200impl LogEntry {
201    /// Serialize to `MessagePack` bytes for storage / wire.
202    ///
203    /// # Errors
204    ///
205    /// Returns [`crate::error::YetiError::Internal`] if encoding fails
206    /// (in practice, impossible for fully-owned `LogEntry` values; the
207    /// `Result` is preserved for forward-compatibility with future
208    /// fields that may have encoding constraints).
209    pub fn encode(&self) -> Result<Vec<u8>> {
210        rmp_serde::to_vec(self)
211            .map_err(|e| crate::error::YetiError::Internal(format!("LogEntry encode failed: {e}")))
212    }
213
214    /// Deserialize from `MessagePack` bytes.
215    ///
216    /// # Errors
217    ///
218    /// Returns [`crate::error::YetiError::Internal`] if the input is
219    /// not a valid `LogEntry` encoding (truncation, version skew, or
220    /// corruption on disk).
221    pub fn decode(bytes: &[u8]) -> Result<Self> {
222        rmp_serde::from_slice(bytes)
223            .map_err(|e| crate::error::YetiError::Internal(format!("LogEntry decode failed: {e}")))
224    }
225}
226
227// ============================================================================
228// RetentionPolicy
229// ============================================================================
230
/// Bounds that govern how much of the transaction log is kept.
///
/// [`TransactionLog::sweep`] only has work to do when at least one bound
/// is present. When both are present, an entry is removed as soon as it
/// violates either one (a logical OR of the two conditions, so the tighter
/// bound effectively decides).
///
/// Age-based trimming is the cheap path: it reduces to a single HLC
/// cut-off, which maps well onto `delete_range`. Size-based trimming costs
/// more because it needs a total of on-disk bytes, so implementations are
/// expected to approximate it (e.g. entry count times an assumed average
/// size) rather than account for bytes exactly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct RetentionPolicy {
    /// Entries whose physical-time component is older than
    /// `now - max_age_secs` are removed. `None` turns age-based trimming
    /// off.
    pub max_age_secs: Option<u64>,

    /// Upper bound on the log's on-disk size in bytes; the oldest entries
    /// are removed until the log fits. `None` turns size-based trimming
    /// off. Best-effort: an implementation may overshoot by one entry
    /// instead of truncating an entry partially.
    pub max_size_bytes: Option<u64>,
}
254
255impl RetentionPolicy {
256    /// Time-only retention policy.
257    #[must_use]
258    pub const fn time_only(max_age_secs: u64) -> Self {
259        Self {
260            max_age_secs: Some(max_age_secs),
261            max_size_bytes: None,
262        }
263    }
264
265    /// Size-only retention policy.
266    #[must_use]
267    pub const fn size_only(max_size_bytes: u64) -> Self {
268        Self {
269            max_age_secs: None,
270            max_size_bytes: Some(max_size_bytes),
271        }
272    }
273
274    /// Combined retention — sweep removes entries that exceed
275    /// **either** bound.
276    #[must_use]
277    pub const fn dual(max_age_secs: u64, max_size_bytes: u64) -> Self {
278        Self {
279            max_age_secs: Some(max_age_secs),
280            max_size_bytes: Some(max_size_bytes),
281        }
282    }
283
284    /// True if no bound is set; [`TransactionLog::sweep`] is a no-op
285    /// in this case.
286    #[must_use]
287    pub const fn is_disabled(&self) -> bool {
288        self.max_age_secs.is_none() && self.max_size_bytes.is_none()
289    }
290}
291
292// ============================================================================
293// TransactionLog trait
294// ============================================================================
295
296/// Per-deployment append-only log of every committed write.
297///
298/// One instance per deployment process; lives for the deployment's
299/// lifetime. Entries are HLC-ordered and survive process restart
300/// (durability is the foundational contract — replication's catch-
301/// up window relies on it).
302///
303/// ## Contract
304///
305/// - [`append`](Self::append) is the durability sink. Returns only
306///   after the entry has been fsynced (or the implementation's
307///   chosen durability point); subsequent [`read_after`](Self::read_after)
308///   calls on the same instance must see the new entry.
309/// - [`read_after`](Self::read_after) returns entries strictly
310///   greater than the given HLC, in ascending HLC order, up to
311///   `limit`. Used by replication for streaming tail-reads and
312///   catch-up on peer reconnection.
313/// - [`sweep`](Self::sweep) removes entries that exceed the
314///   retention bounds. Returns the count of removed entries. Sweep
315///   is the only operation that DELETES from the log — append is
316///   the only operation that ADDS.
317///
318/// ## Implementations
319///
320/// The canonical implementation is
321/// `yeti_store::transaction_log::KvTransactionLog`, backed by a
322/// dedicated [`KvBackend`](crate::backend::KvBackend) opened at
323/// `{root_directory}/logs/transactions/{deployment_hash}/`. Tests use
324/// the in-memory [`MemoryBackend`](crate::backend::MemoryBackend)
325/// under the same wrapper.
326#[async_trait]
327pub trait TransactionLog: Send + Sync + std::fmt::Debug {
328    /// Append a new entry. The entry's `hlc` is assigned by the
329    /// caller (typically from a [`HybridLogicalClock::now`](crate::hlc::HybridLogicalClock::now)
330    /// call at commit time). The log persists the entry verbatim.
331    ///
332    /// # Errors
333    ///
334    /// - [`crate::error::YetiError::Internal`] if encoding fails.
335    /// - Whatever the underlying storage backend returns from a write.
336    async fn append(&self, entry: LogEntry) -> Result<()>;
337
338    /// Append many entries in one call. Implementations should fuse
339    /// the appends into a single batched backend write where possible
340    /// — the default implementation falls back to a per-entry loop and
341    /// is correct but pays one awaited backend call per entry.
342    ///
343    /// Used by [`LoggingBackend::write_batch`](../../yeti_store/struct.LoggingBackend.html)
344    /// so a 100-record table batch produces one storage write per
345    /// table + one batched write on the log, instead of 100 serial
346    /// awaited log puts.
347    ///
348    /// # Errors
349    ///
350    /// - [`crate::error::YetiError::Internal`] if encoding any entry fails.
351    /// - Whatever the underlying storage backend returns from a write.
352    async fn append_batch(&self, entries: Vec<LogEntry>) -> Result<()> {
353        for entry in entries {
354            self.append(entry).await?;
355        }
356        Ok(())
357    }
358
359    /// Read up to `limit` entries with `hlc > after`, in ascending
360    /// HLC order. `after = HlcTimestamp::ZERO` reads from the
361    /// beginning of the log.
362    ///
363    /// # Errors
364    ///
365    /// - [`crate::error::YetiError::Internal`] if decoding any entry
366    ///   fails (indicates on-disk corruption or version skew).
367    /// - Whatever the underlying storage backend returns from a scan.
368    async fn read_after(&self, after: HlcTimestamp, limit: usize) -> Result<Vec<LogEntry>>;
369
370    /// Remove entries that exceed the retention policy. Returns the
371    /// number of entries removed.
372    ///
373    /// # Errors
374    ///
375    /// Whatever the underlying storage backend returns from scan +
376    /// batch-delete operations.
377    async fn sweep(&self, policy: RetentionPolicy) -> Result<usize>;
378}
379
380/// Shareable handle for a [`TransactionLog`] implementation.
381pub type TransactionLogHandle = Arc<dyn TransactionLog>;
382
383// ============================================================================
384// WriteContext — task-local plumbing for originator / interface / request id
385// ============================================================================
386
387/// Per-request write context — set at the request entry point
388/// (HTTP/MQTT/gRPC dispatcher) and read by the backend logging layer
389/// to populate the corresponding fields on the [`LogEntry`].
390///
391/// Threaded as a tokio task-local so that intermediate layers don't
392/// have to pass `Originator` / interface / request id through every
393/// function signature. The HTTP dispatcher wraps each request future
394/// in `WRITE_CTX::scope`; any backend write that happens inside the
395/// scoped future reads the context and stamps the resulting log entry
396/// accordingly.
397///
398/// When no scope is active (background tasks, startup bootstrap,
399/// internal sweeps), [`current_write_context`] returns
400/// [`WriteContext::internal`] with `component = "yeti-host"` so the
401/// log still records a meaningful originator.
402#[derive(Debug, Clone)]
403pub struct WriteContext {
404    /// Who initiated the write.
405    pub originator: Originator,
406    /// Which protocol the write came in through — `"rest"`, `"graphql"`,
407    /// `"mqtt"`, `"mcp"`, `"internal"`. `None` when not derivable.
408    pub interface: Option<String>,
409    /// Optional request correlation id (e.g., the HTTP `x-request-id`
410    /// header value or a generated UUID).
411    pub request_id: Option<String>,
412}
413
414impl WriteContext {
415    /// The default fallback used when no scope is active. Marks the
416    /// write as originating from the yeti-host platform itself.
417    #[must_use]
418    pub fn internal() -> Self {
419        Self {
420            originator: Originator::Internal {
421                component: "yeti-host".to_owned(),
422            },
423            interface: Some("internal".to_owned()),
424            request_id: None,
425        }
426    }
427
428    /// Construct a [`WriteContext`] for a request that came in through
429    /// the named interface, identifying the authenticated user when
430    /// known.
431    #[must_use]
432    pub fn for_request(
433        interface: impl Into<String>,
434        user: Option<String>,
435        request_id: Option<String>,
436    ) -> Self {
437        let originator = user.map_or_else(Originator::anonymous, |u| Originator::Local {
438            user: Some(u),
439        });
440        Self {
441            originator,
442            interface: Some(interface.into()),
443            request_id,
444        }
445    }
446
447    /// Construct a [`WriteContext`] for a write applied locally in
448    /// response to a peer's replication stream.
449    #[must_use]
450    pub fn for_replication(from_node: impl Into<String>) -> Self {
451        Self {
452            originator: Originator::Replicated {
453                from_node: from_node.into(),
454            },
455            interface: Some("replication".to_owned()),
456            request_id: None,
457        }
458    }
459}
460
tokio::task_local! {
    /// Per-task [`WriteContext`] for the request currently being handled.
    ///
    /// Installed at the request entry point with
    /// [`LocalKey::scope`](tokio::task::LocalKey::scope), i.e.
    /// `WRITE_CTX.scope(ctx, future)`, and read back via
    /// [`current_write_context`]. Tasks started with `tokio::spawn` do not
    /// inherit it.
    pub static WRITE_CTX: WriteContext;
}
466
467/// Read the active [`WriteContext`], or return [`WriteContext::internal`]
468/// if no scope is currently active.
469///
470/// Backend logging layers call this to populate the originator /
471/// interface / `request_id` fields on each [`LogEntry`]; it never
472/// fails — a missing scope means "platform-originated" rather than
473/// an error.
474#[must_use]
475pub fn current_write_context() -> WriteContext {
476    WRITE_CTX
477        .try_with(Clone::clone)
478        .unwrap_or_else(|_| WriteContext::internal())
479}
480
481/// Build a [`WriteContext`] that overlays the authenticated user
482/// onto whatever interface / `request_id` an outer scope (typically
483/// the HTTP/MQTT/etc. dispatcher) has already set.
484///
485/// If no outer scope is active, falls back to
486/// [`WriteContext::internal`] for interface / `request_id` so the
487/// returned context is still well-formed — but in production every
488/// real write path should have an outer dispatcher scope. The
489/// fallback exists for tests and background tasks that call into the
490/// yeti-table handlers without going through HTTP.
491#[must_use]
492pub fn current_write_context_with_user(user: Option<String>) -> WriteContext {
493    let outer = current_write_context();
494    let originator = user.map_or_else(Originator::anonymous, |u| Originator::Local {
495        user: Some(u),
496    });
497    WriteContext {
498        originator,
499        interface: outer.interface,
500        request_id: outer.request_id,
501    }
502}
503
504// ============================================================================
505// Tests
506// ============================================================================
507
// Unit tests for the wire types (serde / MessagePack round trips), the
// retention-policy helpers, and the `WRITE_CTX` task-local plumbing.
#[cfg(test)]
// Test code is expected to panic on unexpected errors, so unwrap/expect are acceptable here.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Fully populated `Put` entry used as a baseline; individual tests
    /// override fields with struct-update syntax.
    fn sample_entry() -> LogEntry {
        LogEntry {
            hlc: HlcTimestamp::new(1_700_000_000_000, 5),
            node_id: "node-a".to_owned(),
            table: TableId::new("demo-app", "data", "users"),
            op: WriteOp::Put {
                key: b"alice".to_vec(),
                value: b"{\"role\":\"admin\"}".to_vec(),
            },
            prev_hlc: Some(HlcTimestamp::new(1_699_000_000_000, 0)),
            prev_value: None,
            originator: Originator::local("admin"),
            request_id: Some("req-7f3".to_owned()),
            interface: Some("rest".to_owned()),
        }
    }

    // Every `Originator` variant must survive an rmp-serde round trip with
    // its internally tagged (`kind`) representation.
    #[test]
    fn originator_serde_round_trip() {
        let cases = [
            Originator::anonymous(),
            Originator::local("alice"),
            Originator::Replicated {
                from_node: "node-b".to_owned(),
            },
            Originator::Internal {
                component: "auth-bootstrap".to_owned(),
            },
        ];
        for o in cases {
            let bytes = rmp_serde::to_vec(&o).unwrap();
            let back: Originator = rmp_serde::from_slice(&bytes).unwrap();
            assert_eq!(o, back);
        }
    }

    // Only `Replicated` should be filtered out of the outgoing stream.
    #[test]
    fn originator_is_replicated_only_for_replicated_variant() {
        assert!(!Originator::anonymous().is_replicated());
        assert!(!Originator::local("alice").is_replicated());
        assert!(
            Originator::Replicated {
                from_node: "node-b".to_owned()
            }
            .is_replicated()
        );
        assert!(
            !Originator::Internal {
                component: "x".to_owned()
            }
            .is_replicated()
        );
    }

    // `encode` followed by `decode` must reproduce the entry exactly.
    #[test]
    fn log_entry_encode_decode_round_trip() {
        let entry = sample_entry();
        let bytes = entry.encode().unwrap();
        let back = LogEntry::decode(&bytes).unwrap();
        assert_eq!(entry, back);
    }

    // Non-LogEntry bytes must fail, and the error must carry the
    // "LogEntry decode failed" prefix added by `LogEntry::decode`.
    #[test]
    fn log_entry_decode_rejects_garbage() {
        let result = LogEntry::decode(b"not-a-real-msgpack-payload");
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(msg.contains("LogEntry decode failed"), "got: {msg}");
    }

    // The `Delete` variant of `WriteOp` must round-trip as well.
    #[test]
    fn log_entry_for_delete_op() {
        let entry = LogEntry {
            op: WriteOp::Delete {
                key: b"alice".to_vec(),
            },
            ..sample_entry()
        };
        let bytes = entry.encode().unwrap();
        let back = LogEntry::decode(&bytes).unwrap();
        assert_eq!(entry, back);
        assert!(back.op.is_delete());
    }

    // Each constructor sets exactly the bounds its name promises.
    #[test]
    fn retention_policy_constructors() {
        let t = RetentionPolicy::time_only(3600);
        assert_eq!(t.max_age_secs, Some(3600));
        assert_eq!(t.max_size_bytes, None);

        let s = RetentionPolicy::size_only(1_000_000);
        assert_eq!(s.max_age_secs, None);
        assert_eq!(s.max_size_bytes, Some(1_000_000));

        let d = RetentionPolicy::dual(3600, 1_000_000);
        assert_eq!(d.max_age_secs, Some(3600));
        assert_eq!(d.max_size_bytes, Some(1_000_000));
    }

    // `Default` means "no bounds", i.e. sweep is a no-op.
    #[test]
    fn retention_policy_default_is_disabled() {
        assert!(RetentionPolicy::default().is_disabled());
        assert!(!RetentionPolicy::time_only(60).is_disabled());
        assert!(!RetentionPolicy::size_only(1).is_disabled());
    }

    // Exercises `WriteOp::key` / `WriteOp::is_delete`, which are defined
    // outside this module (imported via `super::WriteOp`).
    #[test]
    fn write_op_key_accessor_works_for_both_variants() {
        let put = WriteOp::Put {
            key: b"k".to_vec(),
            value: b"v".to_vec(),
        };
        let del = WriteOp::Delete { key: b"k".to_vec() };
        assert_eq!(put.key(), b"k");
        assert_eq!(del.key(), b"k");
        assert!(!put.is_delete());
        assert!(del.is_delete());
    }

    // Without a `WRITE_CTX` scope the fallback is `WriteContext::internal`.
    #[tokio::test]
    async fn current_write_context_returns_internal_when_no_scope() {
        let ctx = current_write_context();
        match ctx.originator {
            Originator::Internal { component } => assert_eq!(component, "yeti-host"),
            _ => panic!("expected Internal originator outside scope"),
        }
        assert_eq!(ctx.interface.as_deref(), Some("internal"));
    }

    // Inside a scope, `current_write_context` returns the scoped value.
    #[tokio::test]
    async fn current_write_context_reads_active_scope() {
        let scoped =
            WriteContext::for_request("rest", Some("alice".to_owned()), Some("req-abc".to_owned()));
        let observed = WRITE_CTX
            .scope(scoped.clone(), async { current_write_context() })
            .await;
        assert_eq!(observed.interface.as_deref(), Some("rest"));
        assert_eq!(observed.request_id.as_deref(), Some("req-abc"));
        match observed.originator {
            Originator::Local { user } => assert_eq!(user.as_deref(), Some("alice")),
            _ => panic!("expected Local originator"),
        }
    }

    // A `None` user maps to an anonymous `Local` originator.
    #[tokio::test]
    async fn write_context_for_request_anonymous_when_user_is_none() {
        let ctx = WriteContext::for_request("graphql", None, None);
        assert!(matches!(ctx.originator, Originator::Local { user: None }));
        assert_eq!(ctx.interface.as_deref(), Some("graphql"));
    }

    // Replication contexts must be recognisable by `is_replicated`.
    #[tokio::test]
    async fn write_context_for_replication_marks_originator() {
        let ctx = WriteContext::for_replication("node-east");
        assert!(ctx.originator.is_replicated());
        assert_eq!(ctx.interface.as_deref(), Some("replication"));
    }

    // The user overlay replaces the originator but keeps the outer
    // interface and request id.
    #[tokio::test]
    async fn current_write_context_with_user_inherits_outer_interface_and_request_id() {
        let outer = WriteContext {
            originator: Originator::anonymous(),
            interface: Some("rest".to_owned()),
            request_id: Some("req-42".to_owned()),
        };
        WRITE_CTX
            .scope(outer, async {
                let inner = current_write_context_with_user(Some("alice".to_owned()));
                assert_eq!(inner.interface.as_deref(), Some("rest"));
                assert_eq!(inner.request_id.as_deref(), Some("req-42"));
                assert!(matches!(
                    inner.originator,
                    Originator::Local { ref user } if user.as_deref() == Some("alice")
                ));
            })
            .await;
    }

    // Without an outer scope, the overlay still yields a well-formed context.
    #[tokio::test]
    async fn current_write_context_with_user_falls_back_to_internal_outside_scope() {
        let ctx = current_write_context_with_user(Some("alice".to_owned()));
        // Outer scope absent, so interface inherits the internal default.
        assert_eq!(ctx.interface.as_deref(), Some("internal"));
        assert!(matches!(
            ctx.originator,
            Originator::Local { ref user } if user.as_deref() == Some("alice")
        ));
    }

    // Nested scopes: the innermost scope is what `current_write_context` sees.
    #[tokio::test]
    async fn nested_write_ctx_scope_inner_overrides_outer() {
        let outer = WriteContext {
            originator: Originator::anonymous(),
            interface: Some("rest".to_owned()),
            request_id: Some("req-1".to_owned()),
        };
        WRITE_CTX
            .scope(outer, async {
                let inner = current_write_context_with_user(Some("bob".to_owned()));
                WRITE_CTX
                    .scope(inner, async {
                        let observed = current_write_context();
                        // Inner originator wins.
                        assert!(matches!(
                            observed.originator,
                            Originator::Local { ref user } if user.as_deref() == Some("bob")
                        ));
                        // Outer interface + request_id still visible.
                        assert_eq!(observed.interface.as_deref(), Some("rest"));
                        assert_eq!(observed.request_id.as_deref(), Some("req-1"));
                    })
                    .await;
            })
            .await;
    }

    // Task-locals are per task: a `tokio::spawn`ed task sees no scope.
    #[tokio::test]
    async fn write_ctx_scope_does_not_leak_across_tasks() {
        let outer = WriteContext::for_request("rest", Some("alice".to_owned()), None);
        WRITE_CTX
            .scope(outer, async {
                // Inside the scope: see alice.
                let inner_ctx = current_write_context();
                assert!(matches!(
                    inner_ctx.originator,
                    Originator::Local { user: Some(ref u) } if u == "alice"
                ));

                // A spawned task does NOT inherit the task-local; it
                // should fall back to Internal.
                let handle = tokio::spawn(async { current_write_context() });
                let spawned_ctx = handle.await.unwrap();
                assert!(matches!(
                    spawned_ctx.originator,
                    Originator::Internal { .. }
                ));
            })
            .await;
    }
}