obs_core/registry/scrubbed.rs
1//! `ScrubbedEnvelope<'a>` — the worker→sink handoff.
2//!
3//! The per-tier worker runs the payload scrubber and then hands a
4//! `ScrubbedEnvelope` to each `Sink::deliver`. The `'a` lifetime ties
5//! the scrubbed payload to the worker's scratch buffer, so a sink
6//! cannot escape a reference past the per-event call boundary.
7//!
8//! Spec 14 § 5.
9
10use bytes::BytesMut;
11use obs_proto::obs::v1::ObsEnvelope;
12
13use super::{
14 SchemaRegistry,
15 erased::{EventSchemaErased, ScrubError},
16};
17
18/// Read-only view of an envelope whose payload has already been run
19/// through the per-tier scrubber. Constructed by the worker; consumed
20/// by sinks.
21///
22/// `Clone` + `Copy` are derived because every field is a borrow
23/// (`&ObsEnvelope`, `&[u8]`, or a `'static` trait-object pointer),
24/// so copying the value is a pointer-wide memcpy. Fan-out sinks rely
25/// on this to multiplex one worker envelope across multiple child
26/// sinks without re-running the scrubber. See `obs/tok` design § 4.5.
27#[derive(Clone, Copy)]
28pub struct ScrubbedEnvelope<'a> {
29 inner: &'a ObsEnvelope,
30 payload: &'a [u8],
31 schema: Option<&'static dyn EventSchemaErased>,
32}
33
34impl std::fmt::Debug for ScrubbedEnvelope<'_> {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 f.debug_struct("ScrubbedEnvelope")
37 .field("full_name", &self.inner.full_name)
38 .field("payload_len", &self.payload.len())
39 .field("schema", &self.schema.map(|s| s.full_name()))
40 .finish()
41 }
42}
43
44impl<'a> ScrubbedEnvelope<'a> {
45 /// Worker-side: run the scrubber, build the wrapper.
46 ///
47 /// **`pub(crate)`** by design — only the per-tier worker may
48 /// construct a `ScrubbedEnvelope`. Sinks receive it through
49 /// `Sink::deliver`. Spec 14 § 5.
50 ///
51 /// # Errors
52 ///
53 /// Returns `ScrubError` when the schema's scrubber fails to
54 /// re-encode the payload. The unscrubbed envelope is **never**
55 /// passed to a sink (spec 14 § 8 last row).
56 #[allow(dead_code)] // wired by Phase-3 task 3.1 worker pool
57 pub(crate) fn scrub(
58 env: &'a ObsEnvelope,
59 registry: &SchemaRegistry,
60 scratch: &'a mut BytesMut,
61 ) -> Result<Self, ScrubError> {
62 let schema = registry.lookup(env);
63 let payload = match schema {
64 Some(s) => s.scrub_for_log(&env.payload, scratch)?,
65 None => env.payload.as_slice(),
66 };
67 Ok(Self {
68 inner: env,
69 payload,
70 schema,
71 })
72 }
73
74 /// Build a wrapper that hands a sink the *raw* payload bytes
75 /// without running the scrubber. Used by paths that have already
76 /// scrubbed (the test `InMemorySink`) or for which scrubbing is
77 /// not applicable (Phase-1 stdout pretty-printer).
78 ///
79 /// `pub(crate)` since only the runtime constructs this; the per-tier
80 /// worker switches between `scrub` and `pass_through` based on the
81 /// schema's classification annotations. Spec 14 § 5.
82 #[must_use]
83 pub(crate) fn pass_through(env: &'a ObsEnvelope, registry: &SchemaRegistry) -> Self {
84 Self {
85 inner: env,
86 payload: &env.payload,
87 schema: registry.lookup(env),
88 }
89 }
90
91 /// Test-only constructor that mirrors `Self::pass_through` (the
92 /// internal worker-thread fast-path that wraps an already-scrubbed
93 /// envelope without re-running the scrubber).
94 ///
95 /// Gated behind the `test` feature so production sinks cannot
96 /// fabricate envelopes — only test code that opts into the `test`
97 /// feature gets this constructor. Spec 14 § 5 / KD3.
98 ///
99 /// # Panics
100 ///
101 /// Never panics. Returns a `ScrubbedEnvelope` whose payload borrows
102 /// directly from `env.payload`.
103 #[cfg(feature = "test")]
104 #[must_use]
105 pub fn for_test(env: &'a ObsEnvelope, registry: &SchemaRegistry) -> Self {
106 Self::pass_through(env, registry)
107 }
108
109 /// Borrow the underlying envelope (without payload mutation).
110 #[must_use]
111 pub fn envelope(&self) -> &ObsEnvelope {
112 self.inner
113 }
114
115 /// The (possibly scrubbed) payload bytes.
116 #[must_use]
117 pub fn payload(&self) -> &[u8] {
118 self.payload
119 }
120
121 /// Resolved schema, when this envelope's `full_name`/`schema_hash`
122 /// is registered. `None` for foreign-producer envelopes.
123 #[must_use]
124 pub fn schema(&self) -> Option<&'static dyn EventSchemaErased> {
125 self.schema
126 }
127}