Skip to main content

obs_core/registry/
callsite_registry.rs

1//! `ObsCallsiteRegistry` — process-local map from `callsite_id` to the
2//! human metadata (target, file, line, level, field names, template).
3//!
4//! Spec 31. The registry is owned by `StandardObserver`; the bridge
5//! (Direction A) inserts on first sight and emits one
6//! `obs.runtime.v1.ObsCallsiteRegistered` envelope, and the bridge
7//! (Direction B / `ObsToTracingSink`) reads it to reconstitute
8//! `tracing::Metadata` for envelopes whose `env.callsite_id != 0`.
9
10use std::{
11    num::NonZeroU32,
12    sync::{
13        Arc,
14        atomic::{AtomicU64, Ordering},
15    },
16};
17
18use dashmap::DashMap;
19use obs_proto::obs::v1::Severity;
20
21/// Callsite source — drives both human display and the BLAKE3 input.
22/// Spec 31 § 3.4 enum.
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24#[repr(u8)]
25#[non_exhaustive]
26pub enum CallsiteSource {
27    /// Tracing event (Direction A bridge).
28    TracingEvent = 1,
29    /// Tracing span (Direction A bridge, `ObsSpanCompleted`).
30    TracingSpan = 2,
31    /// `obs::forensic!` macro.
32    Forensic = 3,
33    /// `#[obs::instrument]`-emitted `ObsFnEntered`/`ObsFnExited`.
34    Instrument = 4,
35}
36
37impl CallsiteSource {
38    /// Human-readable identifier; matches the CLI render in spec 31 § 5.4.
39    #[must_use]
40    pub const fn as_str(&self) -> &'static str {
41        match self {
42            Self::TracingEvent => "TRACING_EVENT",
43            Self::TracingSpan => "TRACING_SPAN",
44            Self::Forensic => "FORENSIC",
45            Self::Instrument => "INSTRUMENT",
46        }
47    }
48}
49
50/// Stable record kept per `callsite_id`. Spec 31 § 3.2.
51///
52/// Records live behind `Arc<CallsiteRecord>` in the registry; we never
53/// `Clone` one whole, so omitting `Clone` lets the per-record
54/// `AtomicU64` event counter live in-line without indirection.
55#[derive(Debug)]
56pub struct CallsiteRecord {
57    /// 64-bit BLAKE3-derived stable id.
58    pub id: u64,
59    /// Source vocabulary.
60    pub source: CallsiteSource,
61    /// Tracing/forensic target (`sqlx::query`, `myapp::auth`, …).
62    pub target: String,
63    /// Display name (`event src/foo.rs:42`, span name, function name).
64    pub name: String,
65    /// Module path or empty.
66    pub module_path: String,
67    /// Source file path or empty.
68    pub file: String,
69    /// Source line; `None` when unavailable.
70    pub line: Option<NonZeroU32>,
71    /// Severity / level.
72    pub sev: Severity,
73    /// Field names in stable order.
74    pub field_names: Vec<String>,
75    /// Optional rendered template; empty for non-templated paths.
76    pub template: String,
77    /// Wall-clock ns at registration time (used for re-emit cadence).
78    pub registered_ns: u64,
79    /// Approximate count of envelopes that referenced this callsite
80    /// since the last refresh. Reset by re-emit cadence.
81    pub events_since_refresh: AtomicU64,
82}
83
84impl CallsiteRecord {
85    /// Reset the per-cadence event counter.
86    pub fn reset_count(&self) {
87        self.events_since_refresh.store(0, Ordering::Relaxed);
88    }
89
90    /// Increment the event count and return the new value.
91    pub fn observe(&self) -> u64 {
92        self.events_since_refresh.fetch_add(1, Ordering::Relaxed) + 1
93    }
94}
95
96/// Process-local callsite registry. Spec 31 § 3.2.
97///
98/// Concurrent access is allowed: the bridge writes once per first-sight
99/// callsite and reads once per envelope. `DashMap` matches CLAUDE.md
100/// guidance on concurrent maps.
101#[derive(Default)]
102pub struct ObsCallsiteRegistry {
103    by_id: DashMap<u64, Arc<CallsiteRecord>>,
104}
105
106impl std::fmt::Debug for ObsCallsiteRegistry {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        f.debug_struct("ObsCallsiteRegistry")
109            .field("len", &self.by_id.len())
110            .finish_non_exhaustive()
111    }
112}
113
114impl ObsCallsiteRegistry {
115    /// Empty registry.
116    #[must_use]
117    pub fn new() -> Self {
118        Self::default()
119    }
120
121    /// Number of registered callsites.
122    #[must_use]
123    pub fn len(&self) -> usize {
124        self.by_id.len()
125    }
126
127    /// True if no callsites are registered.
128    #[must_use]
129    pub fn is_empty(&self) -> bool {
130        self.by_id.is_empty()
131    }
132
133    /// Insert `record` if absent. Returns `(record, was_new)` where
134    /// `was_new == true` only when this call inserted the record.
135    /// Spec 31 § 3.3.
136    pub fn insert_if_absent(&self, record: CallsiteRecord) -> (Arc<CallsiteRecord>, bool) {
137        let id = record.id;
138        if let Some(existing) = self.by_id.get(&id) {
139            return (Arc::clone(existing.value()), false);
140        }
141        let arc = Arc::new(record);
142        match self.by_id.entry(id) {
143            dashmap::Entry::Occupied(slot) => (Arc::clone(slot.get()), false),
144            dashmap::Entry::Vacant(slot) => {
145                slot.insert(Arc::clone(&arc));
146                (arc, true)
147            }
148        }
149    }
150
151    /// Look up a record by id.
152    #[must_use]
153    pub fn get(&self, id: u64) -> Option<Arc<CallsiteRecord>> {
154        self.by_id.get(&id).map(|r| Arc::clone(r.value()))
155    }
156
157    /// Iterate records (snapshot — drops `Ref`s before returning).
158    #[must_use]
159    pub fn snapshot(&self) -> Vec<Arc<CallsiteRecord>> {
160        self.by_id.iter().map(|r| Arc::clone(r.value())).collect()
161    }
162}
163
164/// Compute a stable 64-bit callsite id from the canonical inputs.
165/// Spec 31 § 3.1 — the perturb-to-non-zero path is preserved.
166#[must_use]
167pub fn callsite_id(
168    source: CallsiteSource,
169    target: &str,
170    file: &str,
171    line: Option<u32>,
172    level: Severity,
173    field_names: &[&str],
174    template: &str,
175) -> u64 {
176    let mut h = blake3::Hasher::new();
177    h.update(&[source as u8]);
178    h.update(target.as_bytes());
179    h.update(file.as_bytes());
180    h.update(&line.unwrap_or(0).to_le_bytes());
181    h.update(&[severity_byte(level)]);
182    for name in field_names {
183        h.update(name.as_bytes());
184        h.update(b"\x00");
185    }
186    h.update(template.as_bytes());
187    let bytes = h.finalize();
188    let raw = bytes.as_bytes();
189    let head: [u8; 8] = raw.first_chunk::<8>().copied().unwrap_or([0; 8]);
190    let id = u64::from_le_bytes(head);
191    if id != 0 { id } else { perturb_to_nonzero(raw) }
192}
193
194const fn severity_byte(s: Severity) -> u8 {
195    match s {
196        Severity::Trace => 1,
197        Severity::Debug => 2,
198        Severity::Info => 3,
199        Severity::Warn => 4,
200        Severity::Error => 5,
201        Severity::Fatal => 6,
202        _ => 0,
203    }
204}
205
206/// Force a non-zero 64-bit id from a 32-byte BLAKE3 output. Reserved
207/// for the `head[0..8] == 0` corner case (probability 2⁻⁶⁴). Spec
208/// 31 § 3.1.
209#[must_use]
210pub fn perturb_to_nonzero(blake_bytes: &[u8]) -> u64 {
211    let head2: [u8; 8] = blake_bytes
212        .get(8..16)
213        .and_then(|s| <[u8; 8]>::try_from(s).ok())
214        .unwrap_or([0; 8]);
215    u64::from_le_bytes(head2) | 1
216}
217
218#[cfg(test)]
219mod tests {
220    use super::*;
221
222    #[test]
223    fn test_callsite_id_should_be_deterministic() {
224        let a = callsite_id(
225            CallsiteSource::TracingEvent,
226            "sqlx::query",
227            "src/q.rs",
228            Some(42),
229            Severity::Info,
230            &["rows", "elapsed"],
231            "executed query",
232        );
233        let b = callsite_id(
234            CallsiteSource::TracingEvent,
235            "sqlx::query",
236            "src/q.rs",
237            Some(42),
238            Severity::Info,
239            &["rows", "elapsed"],
240            "executed query",
241        );
242        assert_eq!(a, b);
243    }
244
245    #[test]
246    fn test_callsite_id_should_never_be_zero_for_real_input() {
247        let id = callsite_id(
248            CallsiteSource::Forensic,
249            "site",
250            "",
251            None,
252            Severity::Info,
253            &[],
254            "",
255        );
256        assert_ne!(id, 0);
257    }
258
259    #[test]
260    fn test_registry_should_dedup_inserts() {
261        let reg = ObsCallsiteRegistry::new();
262        let rec = CallsiteRecord {
263            id: 1,
264            source: CallsiteSource::Forensic,
265            target: "t".into(),
266            name: "n".into(),
267            module_path: String::new(),
268            file: String::new(),
269            line: None,
270            sev: Severity::Info,
271            field_names: Vec::new(),
272            template: String::new(),
273            registered_ns: 0,
274            events_since_refresh: AtomicU64::new(0),
275        };
276        let (_a, new1) = reg.insert_if_absent(rec);
277        assert!(new1);
278        let rec2 = CallsiteRecord {
279            id: 1,
280            source: CallsiteSource::Forensic,
281            target: "t".into(),
282            name: "n".into(),
283            module_path: String::new(),
284            file: String::new(),
285            line: None,
286            sev: Severity::Info,
287            field_names: Vec::new(),
288            template: String::new(),
289            registered_ns: 0,
290            events_since_refresh: AtomicU64::new(0),
291        };
292        let (_b, new2) = reg.insert_if_absent(rec2);
293        assert!(!new2);
294        assert_eq!(reg.len(), 1);
295    }
296}