yeti_types/backend/log.rs
1//! Transaction log primitives — the unified durable record of every
2//! committed write in a yeti deployment.
3//!
4//! ## Why one log
5//!
6//! Before this surface, audit and replication would have lived as
7//! separate sinks: audit writing to its own tables, replication
8//! writing to a separate per-CF append-only log. The Harper precedent
9//! ([`harperfast/harper`]'s `RocksTransactionLogStore`) demonstrates
10//! the unified-store pattern works in production: one log records
11//! every commit, and both audit (range-by-time) and replication
12//! (tail-by-HLC) read from it. Yeti adopts the same shape because:
13//!
14//! - One write per commit (vs. two) — half the I/O on the write path.
15//! - Audit and replication can never disagree about what happened.
16//! - Single retention policy fits both (a node retains the log only
17//! as long as its peers might need it for catch-up; audit needs
18//! the same window).
19//! - New observers (custom audit consumers, real-time dashboards)
20//! plug into the existing log instead of bolting on parallel
21//! sinks.
22//!
23//! ## Position in the stack
24//!
25//! - **Trait + entry shape**: this module, `yeti-types::backend::log`.
26//! Sits alongside the sibling `observer` module
27//! ([`TransactionObserver`](crate::backend::TransactionObserver) /
28//! [`TransactionEvent`](crate::backend::TransactionEvent)) — the
29//! trio that defines the post-commit observation surface.
30//! - **Default implementation**: `yeti-store::transaction_log::KvTransactionLog`.
31//! RocksDB-backed (or any `KvBackend`). Opens in its own
32//! per-deployment directory so the log's atomicity is local
33//! (data committed but log missing is a single-machine failure
34//! mode, not a cross-system one).
35//! - **Commit-path integration**: future slice. The log is built
36//! here as a standalone primitive so its contract can be reviewed
37//! and tested before deciding how to intercept writes
38//! (`LoggingBackend` wrapper vs. explicit hook surface on
39//! [`BackendManager`](crate::backend::BackendManager)).
40//!
41//! ## Wire stability
42//!
43//! [`LogEntry`] is also the wire frame: the replication transport
44//! streams `LogEntry`s between peers. The shape is therefore stable
45//! across the cluster — adding a field requires a coordinated
46//! deployment. Use `Option<...>` for new optional fields so that
47//! older readers continue to parse newer entries.
48//!
49//! [`harperfast/harper`]: https://github.com/harperdb
50
51use std::sync::Arc;
52
53use async_trait::async_trait;
54use serde::{Deserialize, Serialize};
55
56use crate::error::Result;
57use crate::hlc::HlcTimestamp;
58use crate::types::TableId;
59
60use super::WriteOp;
61
62// ============================================================================
63// Originator
64// ============================================================================
65
66/// Identity of the party that initiated a write.
67///
68/// Captured on every [`LogEntry`] so audit queries can answer
69/// "who/what wrote this record" without a sidecar table, and
70/// replication can distinguish between locally-originated writes
71/// (which must be streamed to peers) and remotely-applied writes
72/// (which were received over the wire and must NOT be re-streamed).
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74#[serde(tag = "kind", rename_all = "snake_case")]
75pub enum Originator {
76 /// Locally-initiated write through the request pipeline. `user`
77 /// is the authenticated principal when available; `None` for
78 /// anonymous writes (e.g., public tables with
79 /// `@export(public: [create])`).
80 Local {
81 /// Authenticated username, if the write went through the auth
82 /// pipeline.
83 user: Option<String>,
84 },
85 /// Write applied locally in response to a peer's replication
86 /// stream. Carries the source node id so replication's outgoing
87 /// log writer can filter out these entries — we don't re-stream
88 /// what we received.
89 Replicated {
90 /// Node id of the source peer that originated the write.
91 from_node: String,
92 },
93 /// Internal write performed by the platform itself — startup
94 /// bootstrap, system-table maintenance, scheduled compaction.
95 /// `component` is a short label for diagnostics
96 /// (`"auth-bootstrap"`, `"ttl-sweep"`, etc.).
97 Internal {
98 /// Name of the platform component that issued the write.
99 component: String,
100 },
101}
102
103impl Originator {
104 /// Convenience constructor for anonymous local writes.
105 #[must_use]
106 pub const fn anonymous() -> Self {
107 Self::Local { user: None }
108 }
109
110 /// Convenience constructor for authenticated local writes.
111 #[must_use]
112 pub fn local(user: impl Into<String>) -> Self {
113 Self::Local {
114 user: Some(user.into()),
115 }
116 }
117
118 /// True if this write was received over the replication wire
119 /// (i.e., should not be re-streamed). Used by the outgoing-log
120 /// writer to filter out apply-from-peer writes.
121 #[must_use]
122 pub const fn is_replicated(&self) -> bool {
123 matches!(self, Self::Replicated { .. })
124 }
125}
126
127// ============================================================================
128// LogEntry
129// ============================================================================
130
131/// A single durable record of a committed write.
132///
133/// Persisted to the per-deployment [`TransactionLog`] at commit time.
134/// Both audit and replication observers consume from this log:
135///
136/// - Replication tail-reads by HLC: streams new entries to peers,
137/// filtering out [`Originator::Replicated`] writes so applied-
138/// from-peer entries don't echo back.
139/// - Audit range-reads by HLC + filters by table / originator / user
140/// for query responses.
141///
142/// Wire-stable (serde-derived) because [`LogEntry`] frames also flow
143/// across the replication network protocol between nodes.
144#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
145pub struct LogEntry {
146 /// HLC timestamp assigned at commit time. Strictly greater than
147 /// any previously-appended entry's HLC on the same node.
148 pub hlc: HlcTimestamp,
149
150 /// Node id of the node that PERSISTED this entry (the node that
151 /// observed the commit). Both locally-originated and replication-
152 /// applied writes log here, so `node_id` is always the local
153 /// node. Distinguished from [`Originator::Replicated::from_node`]
154 /// which records the conceptual writer.
155 pub node_id: String,
156
157 /// In-deployment table identity.
158 pub table: TableId,
159
160 /// The write operation that was committed.
161 pub op: WriteOp,
162
163 /// HLC of the existing record at this key prior to the write, if
164 /// any. `None` for inserts. Used by anti-entropy to
165 /// record causal predecessor relationships for the receive-side
166 /// HLC-LWW comparison.
167 pub prev_hlc: Option<HlcTimestamp>,
168
169 /// Raw bytes of the existing record at this key prior to the
170 /// write, if the table opted into `@audit(capture_state: true)`
171 /// and the `LoggingBackend` was configured with
172 /// `capture_before: true`. `None` when capture was disabled OR
173 /// the key didn't exist (insert path). Decoded as JSON by audit
174 /// readers to surface as `AuditEntry.before`.
175 ///
176 /// Storage cost is significant — each captured write doubles
177 /// the log entry size plus pays one extra `get` at write time
178 /// — so this is opt-in per table via the `@audit` directive.
179 ///
180 /// Note: encoded inline (not `skip_serializing_if`) because the
181 /// wire format is rmp-serde tuple-positional; skipping fields
182 /// breaks the decoder's positional expectations.
183 pub prev_value: Option<Vec<u8>>,
184
185 /// Who / what initiated the write.
186 pub originator: Originator,
187
188 /// Optional correlation id for traceability — typically the
189 /// request id from the HTTP / MQTT / gRPC layer.
190 pub request_id: Option<String>,
191
192 /// Which protocol / interface the write came in through — `"rest"`,
193 /// `"graphql"`, `"mqtt"`, `"mcp"`, or `"internal"` for platform-
194 /// originated writes. `None` when the originator couldn't be
195 /// determined (background tasks, startup bootstrap that didn't
196 /// set the task-local). Audit queries surface this for filtering.
197 pub interface: Option<String>,
198}
199
200impl LogEntry {
201 /// Serialize to `MessagePack` bytes for storage / wire.
202 ///
203 /// # Errors
204 ///
205 /// Returns [`crate::error::YetiError::Internal`] if encoding fails
206 /// (in practice, impossible for fully-owned `LogEntry` values; the
207 /// `Result` is preserved for forward-compatibility with future
208 /// fields that may have encoding constraints).
209 pub fn encode(&self) -> Result<Vec<u8>> {
210 rmp_serde::to_vec(self)
211 .map_err(|e| crate::error::YetiError::Internal(format!("LogEntry encode failed: {e}")))
212 }
213
214 /// Deserialize from `MessagePack` bytes.
215 ///
216 /// # Errors
217 ///
218 /// Returns [`crate::error::YetiError::Internal`] if the input is
219 /// not a valid `LogEntry` encoding (truncation, version skew, or
220 /// corruption on disk).
221 pub fn decode(bytes: &[u8]) -> Result<Self> {
222 rmp_serde::from_slice(bytes)
223 .map_err(|e| crate::error::YetiError::Internal(format!("LogEntry decode failed: {e}")))
224 }
225}
226
227// ============================================================================
228// RetentionPolicy
229// ============================================================================
230
231/// Retention policy for the transaction log.
232///
233/// At least one bound must be set for [`TransactionLog::sweep`] to do
234/// useful work; with both bounds set, sweep removes entries that
235/// exceed *either* bound (logical OR — whichever is tighter wins).
236///
237/// Time-based sweep is the cheaper path: a single HLC threshold,
238/// `delete_range`-friendly. Size-based sweep is more expensive because
239/// it requires totaling on-disk bytes; expect implementations to
240/// approximate (e.g., count entries and assume average size) rather
241/// than exact byte accounting.
242#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
243pub struct RetentionPolicy {
244 /// Remove entries with physical-time component older than
245 /// `now - max_age_secs`. `None` disables age-based sweep.
246 pub max_age_secs: Option<u64>,
247
248 /// Trim the log to at most this many on-disk bytes by removing
249 /// the oldest entries. `None` disables size-based sweep. Best-
250 /// effort: implementations may overshoot by one entry rather
251 /// than partial-truncate.
252 pub max_size_bytes: Option<u64>,
253}
254
255impl RetentionPolicy {
256 /// Time-only retention policy.
257 #[must_use]
258 pub const fn time_only(max_age_secs: u64) -> Self {
259 Self {
260 max_age_secs: Some(max_age_secs),
261 max_size_bytes: None,
262 }
263 }
264
265 /// Size-only retention policy.
266 #[must_use]
267 pub const fn size_only(max_size_bytes: u64) -> Self {
268 Self {
269 max_age_secs: None,
270 max_size_bytes: Some(max_size_bytes),
271 }
272 }
273
274 /// Combined retention — sweep removes entries that exceed
275 /// **either** bound.
276 #[must_use]
277 pub const fn dual(max_age_secs: u64, max_size_bytes: u64) -> Self {
278 Self {
279 max_age_secs: Some(max_age_secs),
280 max_size_bytes: Some(max_size_bytes),
281 }
282 }
283
284 /// True if no bound is set; [`TransactionLog::sweep`] is a no-op
285 /// in this case.
286 #[must_use]
287 pub const fn is_disabled(&self) -> bool {
288 self.max_age_secs.is_none() && self.max_size_bytes.is_none()
289 }
290}
291
292// ============================================================================
293// TransactionLog trait
294// ============================================================================
295
296/// Per-deployment append-only log of every committed write.
297///
298/// One instance per deployment process; lives for the deployment's
299/// lifetime. Entries are HLC-ordered and survive process restart
300/// (durability is the foundational contract — replication's catch-
301/// up window relies on it).
302///
303/// ## Contract
304///
305/// - [`append`](Self::append) is the durability sink. Returns only
306/// after the entry has been fsynced (or the implementation's
307/// chosen durability point); subsequent [`read_after`](Self::read_after)
308/// calls on the same instance must see the new entry.
309/// - [`read_after`](Self::read_after) returns entries strictly
310/// greater than the given HLC, in ascending HLC order, up to
311/// `limit`. Used by replication for streaming tail-reads and
312/// catch-up on peer reconnection.
313/// - [`sweep`](Self::sweep) removes entries that exceed the
314/// retention bounds. Returns the count of removed entries. Sweep
315/// is the only operation that DELETES from the log — append is
316/// the only operation that ADDS.
317///
318/// ## Implementations
319///
320/// The canonical implementation is
321/// `yeti_store::transaction_log::KvTransactionLog`, backed by a
322/// dedicated [`KvBackend`](crate::backend::KvBackend) opened at
323/// `{root_directory}/logs/transactions/{deployment_hash}/`. Tests use
324/// the in-memory [`MemoryBackend`](crate::backend::MemoryBackend)
325/// under the same wrapper.
326#[async_trait]
327pub trait TransactionLog: Send + Sync + std::fmt::Debug {
328 /// Append a new entry. The entry's `hlc` is assigned by the
329 /// caller (typically from a [`HybridLogicalClock::now`](crate::hlc::HybridLogicalClock::now)
330 /// call at commit time). The log persists the entry verbatim.
331 ///
332 /// # Errors
333 ///
334 /// - [`crate::error::YetiError::Internal`] if encoding fails.
335 /// - Whatever the underlying storage backend returns from a write.
336 async fn append(&self, entry: LogEntry) -> Result<()>;
337
338 /// Append many entries in one call. Implementations should fuse
339 /// the appends into a single batched backend write where possible
340 /// — the default implementation falls back to a per-entry loop and
341 /// is correct but pays one awaited backend call per entry.
342 ///
343 /// Used by [`LoggingBackend::write_batch`](../../yeti_store/struct.LoggingBackend.html)
344 /// so a 100-record table batch produces one storage write per
345 /// table + one batched write on the log, instead of 100 serial
346 /// awaited log puts.
347 ///
348 /// # Errors
349 ///
350 /// - [`crate::error::YetiError::Internal`] if encoding any entry fails.
351 /// - Whatever the underlying storage backend returns from a write.
352 async fn append_batch(&self, entries: Vec<LogEntry>) -> Result<()> {
353 for entry in entries {
354 self.append(entry).await?;
355 }
356 Ok(())
357 }
358
359 /// Read up to `limit` entries with `hlc > after`, in ascending
360 /// HLC order. `after = HlcTimestamp::ZERO` reads from the
361 /// beginning of the log.
362 ///
363 /// # Errors
364 ///
365 /// - [`crate::error::YetiError::Internal`] if decoding any entry
366 /// fails (indicates on-disk corruption or version skew).
367 /// - Whatever the underlying storage backend returns from a scan.
368 async fn read_after(&self, after: HlcTimestamp, limit: usize) -> Result<Vec<LogEntry>>;
369
370 /// Remove entries that exceed the retention policy. Returns the
371 /// number of entries removed.
372 ///
373 /// # Errors
374 ///
375 /// Whatever the underlying storage backend returns from scan +
376 /// batch-delete operations.
377 async fn sweep(&self, policy: RetentionPolicy) -> Result<usize>;
378}
379
380/// Shareable handle for a [`TransactionLog`] implementation.
381pub type TransactionLogHandle = Arc<dyn TransactionLog>;
382
383// ============================================================================
384// WriteContext — task-local plumbing for originator / interface / request id
385// ============================================================================
386
387/// Per-request write context — set at the request entry point
388/// (HTTP/MQTT/gRPC dispatcher) and read by the backend logging layer
389/// to populate the corresponding fields on the [`LogEntry`].
390///
391/// Threaded as a tokio task-local so that intermediate layers don't
392/// have to pass `Originator` / interface / request id through every
393/// function signature. The HTTP dispatcher wraps each request future
394/// in `WRITE_CTX::scope`; any backend write that happens inside the
395/// scoped future reads the context and stamps the resulting log entry
396/// accordingly.
397///
398/// When no scope is active (background tasks, startup bootstrap,
399/// internal sweeps), [`current_write_context`] returns
400/// [`WriteContext::internal`] with `component = "yeti-host"` so the
401/// log still records a meaningful originator.
402#[derive(Debug, Clone)]
403pub struct WriteContext {
404 /// Who initiated the write.
405 pub originator: Originator,
406 /// Which protocol the write came in through — `"rest"`, `"graphql"`,
407 /// `"mqtt"`, `"mcp"`, `"internal"`. `None` when not derivable.
408 pub interface: Option<String>,
409 /// Optional request correlation id (e.g., the HTTP `x-request-id`
410 /// header value or a generated UUID).
411 pub request_id: Option<String>,
412}
413
414impl WriteContext {
415 /// The default fallback used when no scope is active. Marks the
416 /// write as originating from the yeti-host platform itself.
417 #[must_use]
418 pub fn internal() -> Self {
419 Self {
420 originator: Originator::Internal {
421 component: "yeti-host".to_owned(),
422 },
423 interface: Some("internal".to_owned()),
424 request_id: None,
425 }
426 }
427
428 /// Construct a [`WriteContext`] for a request that came in through
429 /// the named interface, identifying the authenticated user when
430 /// known.
431 #[must_use]
432 pub fn for_request(
433 interface: impl Into<String>,
434 user: Option<String>,
435 request_id: Option<String>,
436 ) -> Self {
437 let originator = user.map_or_else(Originator::anonymous, |u| Originator::Local {
438 user: Some(u),
439 });
440 Self {
441 originator,
442 interface: Some(interface.into()),
443 request_id,
444 }
445 }
446
447 /// Construct a [`WriteContext`] for a write applied locally in
448 /// response to a peer's replication stream.
449 #[must_use]
450 pub fn for_replication(from_node: impl Into<String>) -> Self {
451 Self {
452 originator: Originator::Replicated {
453 from_node: from_node.into(),
454 },
455 interface: Some("replication".to_owned()),
456 request_id: None,
457 }
458 }
459}
460
461tokio::task_local! {
462 /// Per-task [`WriteContext`]. Set via [`WRITE_CTX::scope`] at the
463 /// request entry point.
464 pub static WRITE_CTX: WriteContext;
465}
466
467/// Read the active [`WriteContext`], or return [`WriteContext::internal`]
468/// if no scope is currently active.
469///
470/// Backend logging layers call this to populate the originator /
471/// interface / `request_id` fields on each [`LogEntry`]; it never
472/// fails — a missing scope means "platform-originated" rather than
473/// an error.
474#[must_use]
475pub fn current_write_context() -> WriteContext {
476 WRITE_CTX
477 .try_with(Clone::clone)
478 .unwrap_or_else(|_| WriteContext::internal())
479}
480
481/// Build a [`WriteContext`] that overlays the authenticated user
482/// onto whatever interface / `request_id` an outer scope (typically
483/// the HTTP/MQTT/etc. dispatcher) has already set.
484///
485/// If no outer scope is active, falls back to
486/// [`WriteContext::internal`] for interface / `request_id` so the
487/// returned context is still well-formed — but in production every
488/// real write path should have an outer dispatcher scope. The
489/// fallback exists for tests and background tasks that call into the
490/// yeti-table handlers without going through HTTP.
491#[must_use]
492pub fn current_write_context_with_user(user: Option<String>) -> WriteContext {
493 let outer = current_write_context();
494 let originator = user.map_or_else(Originator::anonymous, |u| Originator::Local {
495 user: Some(u),
496 });
497 WriteContext {
498 originator,
499 interface: outer.interface,
500 request_id: outer.request_id,
501 }
502}
503
504// ============================================================================
505// Tests
506// ============================================================================
507
508#[cfg(test)]
509#[allow(clippy::unwrap_used, clippy::expect_used)]
510mod tests {
511 use super::*;
512
513 fn sample_entry() -> LogEntry {
514 LogEntry {
515 hlc: HlcTimestamp::new(1_700_000_000_000, 5),
516 node_id: "node-a".to_owned(),
517 table: TableId::new("demo-app", "data", "users"),
518 op: WriteOp::Put {
519 key: b"alice".to_vec(),
520 value: b"{\"role\":\"admin\"}".to_vec(),
521 },
522 prev_hlc: Some(HlcTimestamp::new(1_699_000_000_000, 0)),
523 prev_value: None,
524 originator: Originator::local("admin"),
525 request_id: Some("req-7f3".to_owned()),
526 interface: Some("rest".to_owned()),
527 }
528 }
529
530 #[test]
531 fn originator_serde_round_trip() {
532 let cases = [
533 Originator::anonymous(),
534 Originator::local("alice"),
535 Originator::Replicated {
536 from_node: "node-b".to_owned(),
537 },
538 Originator::Internal {
539 component: "auth-bootstrap".to_owned(),
540 },
541 ];
542 for o in cases {
543 let bytes = rmp_serde::to_vec(&o).unwrap();
544 let back: Originator = rmp_serde::from_slice(&bytes).unwrap();
545 assert_eq!(o, back);
546 }
547 }
548
549 #[test]
550 fn originator_is_replicated_only_for_replicated_variant() {
551 assert!(!Originator::anonymous().is_replicated());
552 assert!(!Originator::local("alice").is_replicated());
553 assert!(
554 Originator::Replicated {
555 from_node: "node-b".to_owned()
556 }
557 .is_replicated()
558 );
559 assert!(
560 !Originator::Internal {
561 component: "x".to_owned()
562 }
563 .is_replicated()
564 );
565 }
566
567 #[test]
568 fn log_entry_encode_decode_round_trip() {
569 let entry = sample_entry();
570 let bytes = entry.encode().unwrap();
571 let back = LogEntry::decode(&bytes).unwrap();
572 assert_eq!(entry, back);
573 }
574
575 #[test]
576 fn log_entry_decode_rejects_garbage() {
577 let result = LogEntry::decode(b"not-a-real-msgpack-payload");
578 assert!(result.is_err());
579 let msg = result.unwrap_err().to_string();
580 assert!(msg.contains("LogEntry decode failed"), "got: {msg}");
581 }
582
583 #[test]
584 fn log_entry_for_delete_op() {
585 let entry = LogEntry {
586 op: WriteOp::Delete {
587 key: b"alice".to_vec(),
588 },
589 ..sample_entry()
590 };
591 let bytes = entry.encode().unwrap();
592 let back = LogEntry::decode(&bytes).unwrap();
593 assert_eq!(entry, back);
594 assert!(back.op.is_delete());
595 }
596
597 #[test]
598 fn retention_policy_constructors() {
599 let t = RetentionPolicy::time_only(3600);
600 assert_eq!(t.max_age_secs, Some(3600));
601 assert_eq!(t.max_size_bytes, None);
602
603 let s = RetentionPolicy::size_only(1_000_000);
604 assert_eq!(s.max_age_secs, None);
605 assert_eq!(s.max_size_bytes, Some(1_000_000));
606
607 let d = RetentionPolicy::dual(3600, 1_000_000);
608 assert_eq!(d.max_age_secs, Some(3600));
609 assert_eq!(d.max_size_bytes, Some(1_000_000));
610 }
611
612 #[test]
613 fn retention_policy_default_is_disabled() {
614 assert!(RetentionPolicy::default().is_disabled());
615 assert!(!RetentionPolicy::time_only(60).is_disabled());
616 assert!(!RetentionPolicy::size_only(1).is_disabled());
617 }
618
619 #[test]
620 fn write_op_key_accessor_works_for_both_variants() {
621 let put = WriteOp::Put {
622 key: b"k".to_vec(),
623 value: b"v".to_vec(),
624 };
625 let del = WriteOp::Delete { key: b"k".to_vec() };
626 assert_eq!(put.key(), b"k");
627 assert_eq!(del.key(), b"k");
628 assert!(!put.is_delete());
629 assert!(del.is_delete());
630 }
631
632 #[tokio::test]
633 async fn current_write_context_returns_internal_when_no_scope() {
634 let ctx = current_write_context();
635 match ctx.originator {
636 Originator::Internal { component } => assert_eq!(component, "yeti-host"),
637 _ => panic!("expected Internal originator outside scope"),
638 }
639 assert_eq!(ctx.interface.as_deref(), Some("internal"));
640 }
641
642 #[tokio::test]
643 async fn current_write_context_reads_active_scope() {
644 let scoped =
645 WriteContext::for_request("rest", Some("alice".to_owned()), Some("req-abc".to_owned()));
646 let observed = WRITE_CTX
647 .scope(scoped.clone(), async { current_write_context() })
648 .await;
649 assert_eq!(observed.interface.as_deref(), Some("rest"));
650 assert_eq!(observed.request_id.as_deref(), Some("req-abc"));
651 match observed.originator {
652 Originator::Local { user } => assert_eq!(user.as_deref(), Some("alice")),
653 _ => panic!("expected Local originator"),
654 }
655 }
656
657 #[tokio::test]
658 async fn write_context_for_request_anonymous_when_user_is_none() {
659 let ctx = WriteContext::for_request("graphql", None, None);
660 assert!(matches!(ctx.originator, Originator::Local { user: None }));
661 assert_eq!(ctx.interface.as_deref(), Some("graphql"));
662 }
663
664 #[tokio::test]
665 async fn write_context_for_replication_marks_originator() {
666 let ctx = WriteContext::for_replication("node-east");
667 assert!(ctx.originator.is_replicated());
668 assert_eq!(ctx.interface.as_deref(), Some("replication"));
669 }
670
671 #[tokio::test]
672 async fn current_write_context_with_user_inherits_outer_interface_and_request_id() {
673 let outer = WriteContext {
674 originator: Originator::anonymous(),
675 interface: Some("rest".to_owned()),
676 request_id: Some("req-42".to_owned()),
677 };
678 WRITE_CTX
679 .scope(outer, async {
680 let inner = current_write_context_with_user(Some("alice".to_owned()));
681 assert_eq!(inner.interface.as_deref(), Some("rest"));
682 assert_eq!(inner.request_id.as_deref(), Some("req-42"));
683 assert!(matches!(
684 inner.originator,
685 Originator::Local { ref user } if user.as_deref() == Some("alice")
686 ));
687 })
688 .await;
689 }
690
691 #[tokio::test]
692 async fn current_write_context_with_user_falls_back_to_internal_outside_scope() {
693 let ctx = current_write_context_with_user(Some("alice".to_owned()));
694 // Outer scope absent, so interface inherits the internal default.
695 assert_eq!(ctx.interface.as_deref(), Some("internal"));
696 assert!(matches!(
697 ctx.originator,
698 Originator::Local { ref user } if user.as_deref() == Some("alice")
699 ));
700 }
701
702 #[tokio::test]
703 async fn nested_write_ctx_scope_inner_overrides_outer() {
704 let outer = WriteContext {
705 originator: Originator::anonymous(),
706 interface: Some("rest".to_owned()),
707 request_id: Some("req-1".to_owned()),
708 };
709 WRITE_CTX
710 .scope(outer, async {
711 let inner = current_write_context_with_user(Some("bob".to_owned()));
712 WRITE_CTX
713 .scope(inner, async {
714 let observed = current_write_context();
715 // Inner originator wins.
716 assert!(matches!(
717 observed.originator,
718 Originator::Local { ref user } if user.as_deref() == Some("bob")
719 ));
720 // Outer interface + request_id still visible.
721 assert_eq!(observed.interface.as_deref(), Some("rest"));
722 assert_eq!(observed.request_id.as_deref(), Some("req-1"));
723 })
724 .await;
725 })
726 .await;
727 }
728
729 #[tokio::test]
730 async fn write_ctx_scope_does_not_leak_across_tasks() {
731 let outer = WriteContext::for_request("rest", Some("alice".to_owned()), None);
732 WRITE_CTX
733 .scope(outer, async {
734 // Inside the scope: see alice.
735 let inner_ctx = current_write_context();
736 assert!(matches!(
737 inner_ctx.originator,
738 Originator::Local { user: Some(ref u) } if u == "alice"
739 ));
740
741 // A spawned task does NOT inherit the task-local; it
742 // should fall back to Internal.
743 let handle = tokio::spawn(async { current_write_context() });
744 let spawned_ctx = handle.await.unwrap();
745 assert!(matches!(
746 spawned_ctx.originator,
747 Originator::Internal { .. }
748 ));
749 })
750 .await;
751 }
752}