auths_telemetry/emitter.rs
1use std::sync::Arc;
2use std::sync::OnceLock;
3use std::sync::atomic::AtomicU64;
4
5use crate::event::AuditEvent;
6use crate::ports::EventSink;
7
/// Running total of audit events that were silently discarded because the
/// telemetry channel was full.
///
/// Consumers of `WriterSink` do not drop events. The counter is kept for
/// compatibility with code paths that may plug in a custom buffering sink,
/// which is expected to bump this value whenever it sheds events under
/// back-pressure.
///
/// Wire this counter into alerting rules for SOC2 / FedRAMP compliance.
pub static DROPPED_AUDIT_EVENTS: AtomicU64 = AtomicU64::new(0);
15
16/// Global sink set once at `init_telemetry_with_sink` time.
17static TELEMETRY_SINK: OnceLock<Arc<dyn EventSink>> = OnceLock::new();
18
19/// Handle returned by `init_telemetry_with_sink`.
20///
21/// Call `shutdown()` before the process exits to flush any buffered events.
22pub struct TelemetryShutdown {
23 sink: Arc<dyn EventSink>,
24}
25
26impl TelemetryShutdown {
27 /// Flush all buffered events.
28 ///
29 /// Usage:
30 /// ```ignore
31 /// let telemetry = auths_telemetry::init_telemetry_with_sink(sink);
32 /// run_server(state).await?;
33 /// telemetry.shutdown();
34 /// ```
35 pub fn shutdown(self) {
36 self.sink.flush();
37 }
38}
39
40/// Initialises the telemetry pipeline with an injectable sink.
41///
42/// Allows servers, CLIs, and tests to provide their own sink implementation.
43/// A second call is a silent no-op; the first initialisation wins.
44///
45/// Args:
46/// * `sink` - The sink implementation to install globally.
47///
48/// Usage:
49/// ```ignore
50/// use auths_telemetry::{init_telemetry_with_sink, sinks::stdout::new_stdout_sink};
51/// use std::sync::Arc;
52/// let _handle = init_telemetry_with_sink(Arc::new(new_stdout_sink()));
53/// ```
54pub fn init_telemetry_with_sink(sink: Arc<dyn EventSink>) -> TelemetryShutdown {
55 let _ = TELEMETRY_SINK.set(Arc::clone(&sink));
56 TelemetryShutdown { sink }
57}
58
59/// Emits a structured telemetry event to the active sink.
60///
61/// Serialises `event` to JSON and forwards to the active sink. Returns
62/// immediately. If `init_telemetry_with_sink` was never called, this is a no-op.
63///
64/// Args:
65/// * `event` - The structured audit event to emit.
66///
67/// Usage:
68/// ```rust
69/// use auths_telemetry::{build_audit_event, emit_telemetry};
70/// let event = build_audit_event("did:keri:abc...", "session_verification", "Success", 0);
71/// emit_telemetry(&event);
72/// ```
73pub fn emit_telemetry(event: &AuditEvent<'_>) {
74 let Some(sink) = TELEMETRY_SINK.get() else {
75 return;
76 };
77 let payload = serde_json::to_string(event).unwrap_or_default();
78 sink.emit(&payload);
79}