Skip to main content

auths_telemetry/
emitter.rs

1use std::sync::Arc;
2use std::sync::OnceLock;
3use std::sync::atomic::AtomicU64;
4
5use crate::event::AuditEvent;
6use crate::ports::EventSink;
7
/// Running total of audit events that were dropped under back-pressure
/// (telemetry channel full) instead of being delivered.
///
/// `WriterSink` consumers never drop events. The counter is kept for
/// compatibility with custom buffering sinks, which may bump it when they
/// shed load.
/// Alert on this value where SOC2 / FedRAMP compliance applies.
pub static DROPPED_AUDIT_EVENTS: AtomicU64 = AtomicU64::new(0);
15
16/// Global sink set once at `init_telemetry_with_sink` time.
17static TELEMETRY_SINK: OnceLock<Arc<dyn EventSink>> = OnceLock::new();
18
19/// Handle returned by `init_telemetry_with_sink`.
20///
21/// Call `shutdown()` before the process exits to flush any buffered events.
22pub struct TelemetryShutdown {
23    sink: Arc<dyn EventSink>,
24}
25
26impl TelemetryShutdown {
27    /// Flush all buffered events.
28    ///
29    /// Usage:
30    /// ```ignore
31    /// let telemetry = auths_telemetry::init_telemetry_with_sink(sink);
32    /// run_server(state).await?;
33    /// telemetry.shutdown();
34    /// ```
35    pub fn shutdown(self) {
36        self.sink.flush();
37    }
38}
39
40/// Initialises the telemetry pipeline with an injectable sink.
41///
42/// Allows servers, CLIs, and tests to provide their own sink implementation.
43/// A second call is a silent no-op; the first initialisation wins.
44///
45/// Args:
46/// * `sink` - The sink implementation to install globally.
47///
48/// Usage:
49/// ```ignore
50/// use auths_telemetry::{init_telemetry_with_sink, sinks::stdout::new_stdout_sink};
51/// use std::sync::Arc;
52/// let _handle = init_telemetry_with_sink(Arc::new(new_stdout_sink()));
53/// ```
54pub fn init_telemetry_with_sink(sink: Arc<dyn EventSink>) -> TelemetryShutdown {
55    let _ = TELEMETRY_SINK.set(Arc::clone(&sink));
56    TelemetryShutdown { sink }
57}
58
59/// Emits a structured telemetry event to the active sink.
60///
61/// Serialises `event` to JSON and forwards to the active sink. Returns
62/// immediately. If `init_telemetry_with_sink` was never called, this is a no-op.
63///
64/// Args:
65/// * `event` - The structured audit event to emit.
66///
67/// Usage:
68/// ```rust
69/// use auths_telemetry::{build_audit_event, emit_telemetry};
70/// let event = build_audit_event("did:keri:abc...", "session_verification", "Success", 0);
71/// emit_telemetry(&event);
72/// ```
73pub fn emit_telemetry(event: &AuditEvent<'_>) {
74    let Some(sink) = TELEMETRY_SINK.get() else {
75        return;
76    };
77    let payload = serde_json::to_string(event).unwrap_or_default();
78    sink.emit(&payload);
79}