obs_core/observer/in_memory.rs
1//! `InMemoryObserver` — captures every envelope into a bounded ring
2//! buffer; for tests and live debug capture (spec 11 § 3.1).
3
4use std::sync::Arc;
5
6use obs_proto::obs::v1::ObsEnvelope;
7
8use super::Observer;
9pub use crate::sink::InMemoryHandle;
10use crate::{
11 registry::{SchemaRegistry, ScrubbedEnvelope},
12 sink::{InMemorySink, Sink},
13};
14
15/// Test-grade observer: every envelope is delivered to an
16/// [`InMemorySink`]. Spec 61 § 2.4 example.
17#[derive(Debug, Clone)]
18pub struct InMemoryObserver {
19 sink: InMemorySink,
20 registry: Arc<SchemaRegistry>,
21}
22
23impl InMemoryObserver {
24 /// Construct with an empty schema registry. The registry is only
25 /// consulted by `ScrubbedEnvelope::pass_through` to populate
26 /// `schema()` — for in-memory tests we don't need decoding so an
27 /// empty registry is fine.
28 #[must_use]
29 pub fn new() -> Self {
30 Self {
31 sink: InMemorySink::new(),
32 registry: Arc::new(SchemaRegistry::empty()),
33 }
34 }
35
36 /// Construct with an existing sink. Used when several observers
37 /// should aggregate into the same buffer.
38 #[must_use]
39 pub fn with_sink(sink: InMemorySink) -> Self {
40 Self {
41 sink,
42 registry: Arc::new(SchemaRegistry::empty()),
43 }
44 }
45
46 /// Stable handle to the buffer (`drain` / `count` / `wait_for`).
47 #[must_use]
48 pub fn handle(&self) -> InMemoryHandle {
49 self.sink.handle()
50 }
51}
52
53impl Default for InMemoryObserver {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl Observer for InMemoryObserver {
60 fn emit_envelope(&self, mut env: ObsEnvelope) {
61 // Skip the worker pool: in-memory observers are synchronous
62 // for testing determinism. The pass-through wrapper feeds the
63 // sink without running the scrubber (spec 14 § 5: scrubber
64 // belongs in the worker; here we are the worker).
65 //
66 // Spec 13 § 2.1 / spec 94 § 2.1: still apply scope auto-fill so
67 // tests that observe trace correlation through the bridge or
68 // tower scope frames see the same `(trace_id, span_id,
69 // parent_span_id)` flow as production.
70 crate::scope::auto_fill_envelope(&mut env);
71 let envref: &ObsEnvelope = &env;
72 let scrubbed = ScrubbedEnvelope::pass_through(envref, &self.registry);
73 self.sink.deliver(scrubbed);
74 }
75
76 fn enabled(&self, _callsite: &crate::ObsCallsite) -> bool {
77 true
78 }
79}