Skip to main content

obs_core/registry/
scrubbed.rs

//! `ScrubbedEnvelope<'a>` — the worker→sink handoff.
//!
//! The per-tier worker runs the payload scrubber and then hands a
//! `ScrubbedEnvelope` to each `Sink::deliver`. The `'a` lifetime ties
//! the scrubbed payload to the worker's scratch buffer, so a sink
//! cannot retain a reference past the per-event call boundary.
//!
//! Spec 14 § 5.
9
10use bytes::BytesMut;
11use obs_proto::obs::v1::ObsEnvelope;
12
13use super::{
14    SchemaRegistry,
15    erased::{EventSchemaErased, ScrubError},
16};
17
18/// Read-only view of an envelope whose payload has already been run
19/// through the per-tier scrubber. Constructed by the worker; consumed
20/// by sinks.
21///
22/// `Clone` + `Copy` are derived because every field is a borrow
23/// (`&ObsEnvelope`, `&[u8]`, or a `'static` trait-object pointer),
24/// so copying the value is a pointer-wide memcpy. Fan-out sinks rely
25/// on this to multiplex one worker envelope across multiple child
26/// sinks without re-running the scrubber. See `obs/tok` design § 4.5.
27#[derive(Clone, Copy)]
28pub struct ScrubbedEnvelope<'a> {
29    inner: &'a ObsEnvelope,
30    payload: &'a [u8],
31    schema: Option<&'static dyn EventSchemaErased>,
32}
33
34impl std::fmt::Debug for ScrubbedEnvelope<'_> {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        f.debug_struct("ScrubbedEnvelope")
37            .field("full_name", &self.inner.full_name)
38            .field("payload_len", &self.payload.len())
39            .field("schema", &self.schema.map(|s| s.full_name()))
40            .finish()
41    }
42}
43
44impl<'a> ScrubbedEnvelope<'a> {
45    /// Worker-side: run the scrubber, build the wrapper.
46    ///
47    /// **`pub(crate)`** by design — only the per-tier worker may
48    /// construct a `ScrubbedEnvelope`. Sinks receive it through
49    /// `Sink::deliver`. Spec 14 § 5.
50    ///
51    /// # Errors
52    ///
53    /// Returns `ScrubError` when the schema's scrubber fails to
54    /// re-encode the payload. The unscrubbed envelope is **never**
55    /// passed to a sink (spec 14 § 8 last row).
56    #[allow(dead_code)] // wired by Phase-3 task 3.1 worker pool
57    pub(crate) fn scrub(
58        env: &'a ObsEnvelope,
59        registry: &SchemaRegistry,
60        scratch: &'a mut BytesMut,
61    ) -> Result<Self, ScrubError> {
62        let schema = registry.lookup(env);
63        let payload = match schema {
64            Some(s) => s.scrub_for_log(&env.payload, scratch)?,
65            None => env.payload.as_slice(),
66        };
67        Ok(Self {
68            inner: env,
69            payload,
70            schema,
71        })
72    }
73
74    /// Build a wrapper that hands a sink the *raw* payload bytes
75    /// without running the scrubber. Used by paths that have already
76    /// scrubbed (the test `InMemorySink`) or for which scrubbing is
77    /// not applicable (Phase-1 stdout pretty-printer).
78    ///
79    /// `pub(crate)` since only the runtime constructs this; the per-tier
80    /// worker switches between `scrub` and `pass_through` based on the
81    /// schema's classification annotations. Spec 14 § 5.
82    #[must_use]
83    pub(crate) fn pass_through(env: &'a ObsEnvelope, registry: &SchemaRegistry) -> Self {
84        Self {
85            inner: env,
86            payload: &env.payload,
87            schema: registry.lookup(env),
88        }
89    }
90
91    /// Test-only constructor that mirrors `Self::pass_through` (the
92    /// internal worker-thread fast-path that wraps an already-scrubbed
93    /// envelope without re-running the scrubber).
94    ///
95    /// Gated behind the `test` feature so production sinks cannot
96    /// fabricate envelopes — only test code that opts into the `test`
97    /// feature gets this constructor. Spec 14 § 5 / KD3.
98    ///
99    /// # Panics
100    ///
101    /// Never panics. Returns a `ScrubbedEnvelope` whose payload borrows
102    /// directly from `env.payload`.
103    #[cfg(feature = "test")]
104    #[must_use]
105    pub fn for_test(env: &'a ObsEnvelope, registry: &SchemaRegistry) -> Self {
106        Self::pass_through(env, registry)
107    }
108
109    /// Borrow the underlying envelope (without payload mutation).
110    #[must_use]
111    pub fn envelope(&self) -> &ObsEnvelope {
112        self.inner
113    }
114
115    /// The (possibly scrubbed) payload bytes.
116    #[must_use]
117    pub fn payload(&self) -> &[u8] {
118        self.payload
119    }
120
121    /// Resolved schema, when this envelope's `full_name`/`schema_hash`
122    /// is registered. `None` for foreign-producer envelopes.
123    #[must_use]
124    pub fn schema(&self) -> Option<&'static dyn EventSchemaErased> {
125        self.schema
126    }
127}