1use std::{
11 num::NonZeroU32,
12 sync::{
13 Arc,
14 atomic::{AtomicU64, Ordering},
15 },
16};
17
18use dashmap::DashMap;
19use obs_proto::obs::v1::Severity;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
24#[repr(u8)]
25#[non_exhaustive]
26pub enum CallsiteSource {
27 TracingEvent = 1,
29 TracingSpan = 2,
31 Forensic = 3,
33 Instrument = 4,
35}
36
37impl CallsiteSource {
38 #[must_use]
40 pub const fn as_str(&self) -> &'static str {
41 match self {
42 Self::TracingEvent => "TRACING_EVENT",
43 Self::TracingSpan => "TRACING_SPAN",
44 Self::Forensic => "FORENSIC",
45 Self::Instrument => "INSTRUMENT",
46 }
47 }
48}
49
50#[derive(Debug)]
56pub struct CallsiteRecord {
57 pub id: u64,
59 pub source: CallsiteSource,
61 pub target: String,
63 pub name: String,
65 pub module_path: String,
67 pub file: String,
69 pub line: Option<NonZeroU32>,
71 pub sev: Severity,
73 pub field_names: Vec<String>,
75 pub template: String,
77 pub registered_ns: u64,
79 pub events_since_refresh: AtomicU64,
82}
83
84impl CallsiteRecord {
85 pub fn reset_count(&self) {
87 self.events_since_refresh.store(0, Ordering::Relaxed);
88 }
89
90 pub fn observe(&self) -> u64 {
92 self.events_since_refresh.fetch_add(1, Ordering::Relaxed) + 1
93 }
94}
95
96#[derive(Default)]
102pub struct ObsCallsiteRegistry {
103 by_id: DashMap<u64, Arc<CallsiteRecord>>,
104}
105
106impl std::fmt::Debug for ObsCallsiteRegistry {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct("ObsCallsiteRegistry")
109 .field("len", &self.by_id.len())
110 .finish_non_exhaustive()
111 }
112}
113
114impl ObsCallsiteRegistry {
115 #[must_use]
117 pub fn new() -> Self {
118 Self::default()
119 }
120
121 #[must_use]
123 pub fn len(&self) -> usize {
124 self.by_id.len()
125 }
126
127 #[must_use]
129 pub fn is_empty(&self) -> bool {
130 self.by_id.is_empty()
131 }
132
133 pub fn insert_if_absent(&self, record: CallsiteRecord) -> (Arc<CallsiteRecord>, bool) {
137 let id = record.id;
138 if let Some(existing) = self.by_id.get(&id) {
139 return (Arc::clone(existing.value()), false);
140 }
141 let arc = Arc::new(record);
142 match self.by_id.entry(id) {
143 dashmap::Entry::Occupied(slot) => (Arc::clone(slot.get()), false),
144 dashmap::Entry::Vacant(slot) => {
145 slot.insert(Arc::clone(&arc));
146 (arc, true)
147 }
148 }
149 }
150
151 #[must_use]
153 pub fn get(&self, id: u64) -> Option<Arc<CallsiteRecord>> {
154 self.by_id.get(&id).map(|r| Arc::clone(r.value()))
155 }
156
157 #[must_use]
159 pub fn snapshot(&self) -> Vec<Arc<CallsiteRecord>> {
160 self.by_id.iter().map(|r| Arc::clone(r.value())).collect()
161 }
162}
163
164#[must_use]
167pub fn callsite_id(
168 source: CallsiteSource,
169 target: &str,
170 file: &str,
171 line: Option<u32>,
172 level: Severity,
173 field_names: &[&str],
174 template: &str,
175) -> u64 {
176 let mut h = blake3::Hasher::new();
177 h.update(&[source as u8]);
178 h.update(target.as_bytes());
179 h.update(file.as_bytes());
180 h.update(&line.unwrap_or(0).to_le_bytes());
181 h.update(&[severity_byte(level)]);
182 for name in field_names {
183 h.update(name.as_bytes());
184 h.update(b"\x00");
185 }
186 h.update(template.as_bytes());
187 let bytes = h.finalize();
188 let raw = bytes.as_bytes();
189 let head: [u8; 8] = raw.first_chunk::<8>().copied().unwrap_or([0; 8]);
190 let id = u64::from_le_bytes(head);
191 if id != 0 { id } else { perturb_to_nonzero(raw) }
192}
193
194const fn severity_byte(s: Severity) -> u8 {
195 match s {
196 Severity::Trace => 1,
197 Severity::Debug => 2,
198 Severity::Info => 3,
199 Severity::Warn => 4,
200 Severity::Error => 5,
201 Severity::Fatal => 6,
202 _ => 0,
203 }
204}
205
206#[must_use]
210pub fn perturb_to_nonzero(blake_bytes: &[u8]) -> u64 {
211 let head2: [u8; 8] = blake_bytes
212 .get(8..16)
213 .and_then(|s| <[u8; 8]>::try_from(s).ok())
214 .unwrap_or([0; 8]);
215 u64::from_le_bytes(head2) | 1
216}
217
218#[cfg(test)]
219mod tests {
220 use super::*;
221
222 #[test]
223 fn test_callsite_id_should_be_deterministic() {
224 let a = callsite_id(
225 CallsiteSource::TracingEvent,
226 "sqlx::query",
227 "src/q.rs",
228 Some(42),
229 Severity::Info,
230 &["rows", "elapsed"],
231 "executed query",
232 );
233 let b = callsite_id(
234 CallsiteSource::TracingEvent,
235 "sqlx::query",
236 "src/q.rs",
237 Some(42),
238 Severity::Info,
239 &["rows", "elapsed"],
240 "executed query",
241 );
242 assert_eq!(a, b);
243 }
244
245 #[test]
246 fn test_callsite_id_should_never_be_zero_for_real_input() {
247 let id = callsite_id(
248 CallsiteSource::Forensic,
249 "site",
250 "",
251 None,
252 Severity::Info,
253 &[],
254 "",
255 );
256 assert_ne!(id, 0);
257 }
258
259 #[test]
260 fn test_registry_should_dedup_inserts() {
261 let reg = ObsCallsiteRegistry::new();
262 let rec = CallsiteRecord {
263 id: 1,
264 source: CallsiteSource::Forensic,
265 target: "t".into(),
266 name: "n".into(),
267 module_path: String::new(),
268 file: String::new(),
269 line: None,
270 sev: Severity::Info,
271 field_names: Vec::new(),
272 template: String::new(),
273 registered_ns: 0,
274 events_since_refresh: AtomicU64::new(0),
275 };
276 let (_a, new1) = reg.insert_if_absent(rec);
277 assert!(new1);
278 let rec2 = CallsiteRecord {
279 id: 1,
280 source: CallsiteSource::Forensic,
281 target: "t".into(),
282 name: "n".into(),
283 module_path: String::new(),
284 file: String::new(),
285 line: None,
286 sev: Severity::Info,
287 field_names: Vec::new(),
288 template: String::new(),
289 registered_ns: 0,
290 events_since_refresh: AtomicU64::new(0),
291 };
292 let (_b, new2) = reg.insert_if_absent(rec2);
293 assert!(!new2);
294 assert_eq!(reg.len(), 1);
295 }
296}