Skip to main content

obs_core/observer/
in_memory.rs

1//! `InMemoryObserver` — captures every envelope into a bounded ring
2//! buffer; for tests and live debug capture (spec 11 § 3.1).
3
4use std::sync::Arc;
5
6use obs_proto::obs::v1::ObsEnvelope;
7
8use super::Observer;
9pub use crate::sink::InMemoryHandle;
10use crate::{
11    registry::{SchemaRegistry, ScrubbedEnvelope},
12    sink::{InMemorySink, Sink},
13};
14
15/// Test-grade observer: every envelope is delivered to an
16/// [`InMemorySink`]. Spec 61 § 2.4 example.
17#[derive(Debug, Clone)]
18pub struct InMemoryObserver {
19    sink: InMemorySink,
20    registry: Arc<SchemaRegistry>,
21}
22
23impl InMemoryObserver {
24    /// Construct with an empty schema registry. The registry is only
25    /// consulted by `ScrubbedEnvelope::pass_through` to populate
26    /// `schema()` — for in-memory tests we don't need decoding so an
27    /// empty registry is fine.
28    #[must_use]
29    pub fn new() -> Self {
30        Self {
31            sink: InMemorySink::new(),
32            registry: Arc::new(SchemaRegistry::empty()),
33        }
34    }
35
36    /// Construct with an existing sink. Used when several observers
37    /// should aggregate into the same buffer.
38    #[must_use]
39    pub fn with_sink(sink: InMemorySink) -> Self {
40        Self {
41            sink,
42            registry: Arc::new(SchemaRegistry::empty()),
43        }
44    }
45
46    /// Stable handle to the buffer (`drain` / `count` / `wait_for`).
47    #[must_use]
48    pub fn handle(&self) -> InMemoryHandle {
49        self.sink.handle()
50    }
51}
52
53impl Default for InMemoryObserver {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59impl Observer for InMemoryObserver {
60    fn emit_envelope(&self, mut env: ObsEnvelope) {
61        // Skip the worker pool: in-memory observers are synchronous
62        // for testing determinism. The pass-through wrapper feeds the
63        // sink without running the scrubber (spec 14 § 5: scrubber
64        // belongs in the worker; here we are the worker).
65        //
66        // Spec 13 § 2.1 / spec 94 § 2.1: still apply scope auto-fill so
67        // tests that observe trace correlation through the bridge or
68        // tower scope frames see the same `(trace_id, span_id,
69        // parent_span_id)` flow as production.
70        crate::scope::auto_fill_envelope(&mut env);
71        let envref: &ObsEnvelope = &env;
72        let scrubbed = ScrubbedEnvelope::pass_through(envref, &self.registry);
73        self.sink.deliver(scrubbed);
74    }
75
76    fn enabled(&self, _callsite: &crate::ObsCallsite) -> bool {
77        true
78    }
79}