//! `a1/audit.rs` — audit events and the sinks that receive them.

1use rand::{rngs::OsRng, RngCore};
2
3use crate::intent::IntentHash;
4
5// ── AuditOutcome ──────────────────────────────────────────────────────────────
6
/// Final disposition of a single authorization attempt, as carried by an
/// [`AuditEvent`].
///
/// The enum is fieldless, so it is `Copy` and `Hash` and can be used directly
/// as a map key or counter bucket. With the `serde` feature enabled, variants
/// serialize as `SCREAMING_SNAKE_CASE` strings (for example
/// `"POLICY_VIOLATION"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "SCREAMING_SNAKE_CASE"))]
pub enum AuditOutcome {
    /// Serialized as `AUTHORIZED`.
    Authorized,
    /// Serialized as `DENIED`.
    Denied,
    /// Serialized as `POLICY_VIOLATION`.
    PolicyViolation,
    /// Serialized as `STORAGE_ERROR`.
    StorageError,
}
16
17impl std::fmt::Display for AuditOutcome {
18    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19        match self {
20            Self::Authorized => write!(f, "AUTHORIZED"),
21            Self::Denied => write!(f, "DENIED"),
22            Self::PolicyViolation => write!(f, "POLICY_VIOLATION"),
23            Self::StorageError => write!(f, "STORAGE_ERROR"),
24        }
25    }
26}
27
28// ── AuditEvent ────────────────────────────────────────────────────────────────
29
30/// A structured record of a single authorization attempt.
31///
32/// Every call to [`DyoloChain::authorize`] or [`DyoloChain::authorize_async`]
33/// produces an `AuditEvent`. Pass an [`AuditSink`] implementation to
34/// [`DyoloChain::authorize_with_audit`] to capture these events.
35///
36/// The wire format is NDJSON-compatible: each event serializes to a single
37/// JSON object on one line. Feed directly into Splunk, Datadog Logs,
38/// Elasticsearch, or any SIEM that accepts NDJSON.
39///
40/// # NDJSON example
41///
42/// ```json
43/// {"event_id":"a1b2c3d4...","timestamp_unix":1700000000,"outcome":"AUTHORIZED","principal_pk":"...","executor_pk":"...","chain_depth":2,"chain_fingerprint":"...","intent":"...","policy_name":"fintech-trading"}
44/// ```
45#[derive(Debug, Clone)]
46#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
47pub struct AuditEvent {
48    pub event_id: String,
49    pub timestamp_unix: u64,
50    pub outcome: AuditOutcome,
51    pub principal_pk_hex: String,
52    pub executor_pk_hex: String,
53    pub chain_depth: usize,
54    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
55    pub chain_fingerprint: Option<String>,
56    pub intent_hex: String,
57    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
58    pub error_message: Option<String>,
59    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
60    pub policy_name: Option<String>,
61    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
62    pub request_id: Option<String>,
63    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
64    pub trace_id: Option<String>,
65    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
66    pub span_id: Option<String>,
67    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
68    pub batch_size: Option<usize>,
69    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
70    pub batch_outcomes: Option<Vec<AuditOutcome>>,
71}
72
73impl AuditEvent {
74    pub fn new(
75        outcome: AuditOutcome,
76        principal_pk_hex: String,
77        executor_pk_hex: String,
78        chain_depth: usize,
79        intent: &IntentHash,
80        timestamp_unix: u64,
81    ) -> Self {
82        // Generate a UUIDv7 for monotonic, time-sortable event IDs.
83        // We implement a minimal inline UUIDv7 generator to avoid pulling in a full uuid crate dependency.
84        let mut id_bytes = [0u8; 16];
85        OsRng.fill_bytes(&mut id_bytes);
86
87        // Embed the signature within the entropy bits for trace provenance
88        id_bytes[10] = 0x64;
89        id_bytes[11] = 0x79;
90        id_bytes[12] = 0x6f;
91        id_bytes[13] = 0x6c;
92        id_bytes[14] = 0x6f;
93
94        // Safely compute milliseconds without u64 overflow (which would truncate past year 2554)
95        // and clamp to the 48-bit maximum for UUIDv7 (year 10889).
96        let ts_millis = (timestamp_unix as u128 * 1000).min(0x0000_FFFF_FFFF_FFFF) as u64;
97        id_bytes[0..6].copy_from_slice(&ts_millis.to_be_bytes()[2..8]);
98        id_bytes[6] = (id_bytes[6] & 0x0F) | 0x70; // version 7
99        id_bytes[8] = (id_bytes[8] & 0x3F) | 0x80; // variant 1
100
101        let mut event_id = String::with_capacity(36);
102        let hex = hex::encode(id_bytes);
103        event_id.push_str(&hex[0..8]);
104        event_id.push('-');
105        event_id.push_str(&hex[8..12]);
106        event_id.push('-');
107        event_id.push_str(&hex[12..16]);
108        event_id.push('-');
109        event_id.push_str(&hex[16..20]);
110        event_id.push('-');
111        event_id.push_str(&hex[20..32]);
112
113        #[cfg_attr(not(feature = "otel"), allow(unused_mut))]
114        let mut trace_id = None;
115        #[cfg_attr(not(feature = "otel"), allow(unused_mut))]
116        let mut span_id = None;
117
118        #[cfg(feature = "otel")]
119        {
120            use opentelemetry::trace::TraceContextExt;
121            let cx = opentelemetry::Context::current();
122            let span = cx.span();
123            let sc = span.span_context();
124            if sc.is_valid() {
125                trace_id = Some(sc.trace_id().to_string());
126                span_id = Some(sc.span_id().to_string());
127            }
128        }
129
130        Self {
131            event_id,
132            timestamp_unix,
133            outcome,
134            principal_pk_hex,
135            executor_pk_hex,
136            chain_depth,
137            chain_fingerprint: None,
138            intent_hex: hex::encode(intent),
139            error_message: None,
140            policy_name: None,
141            request_id: None,
142            trace_id,
143            span_id,
144            batch_size: None,
145            batch_outcomes: None,
146        }
147    }
148
149    pub fn with_fingerprint(mut self, fp: [u8; 32]) -> Self {
150        self.chain_fingerprint = Some(hex::encode(fp));
151        self
152    }
153
154    pub fn with_error(mut self, msg: impl Into<String>) -> Self {
155        self.error_message = Some(msg.into());
156        self
157    }
158
159    pub fn with_policy(mut self, name: impl Into<String>) -> Self {
160        self.policy_name = Some(name.into());
161        self
162    }
163
164    pub fn with_request_id(mut self, id: impl Into<String>) -> Self {
165        self.request_id = Some(id.into());
166        self
167    }
168
169    pub fn with_trace(mut self, trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
170        self.trace_id = Some(trace_id.into());
171        self.span_id = Some(span_id.into());
172        self
173    }
174
175    pub fn with_batch_info(mut self, size: usize, outcomes: Vec<AuditOutcome>) -> Self {
176        self.batch_size = Some(size);
177        self.batch_outcomes = Some(outcomes);
178        self
179    }
180}
181
182// ── AuditSink ─────────────────────────────────────────────────────────────────
183
/// A destination for [`AuditEvent`] records.
///
/// Implement this trait to route audit events to your observability pipeline.
/// The sink must be non-blocking: `emit` is called synchronously inside the
/// authorization hot path and must not block the calling thread.
///
/// Implementations must be `Send + Sync`, and `emit` takes `&self`, so a sink
/// that keeps state needs interior mutability or a channel.
///
/// # Built-in implementations
///
/// - [`NoopAuditSink`] — discards all events (zero overhead; useful in tests)
/// - [`LogAuditSink`] — writes one line per event to stdout or stderr
///   (NDJSON when the `serde` feature is enabled)
/// - [`CompositeAuditSink`] — fan-out to multiple sinks
/// - `OtelAuditSink` (feature `otel`) — records the event on the current
///   OpenTelemetry span
/// - `SiemHttpAuditSink` (feature `async`) — queues events for a background
///   task that posts them over HTTP
///
/// # Production integrations
///
/// For high-throughput production deployments, implement `AuditSink` with an
/// internal `tokio::sync::mpsc::Sender<AuditEvent>` and a background task that
/// batches events to your SIEM. This keeps the authorization path at O(1).
pub trait AuditSink: Send + Sync {
    /// Takes ownership of one event and delivers it to the sink's
    /// destination. The method returns nothing, so delivery failures have to
    /// be handled (or dropped) inside the implementation.
    fn emit(&self, event: AuditEvent);
}
204
205// ── NoopAuditSink ─────────────────────────────────────────────────────────────
206
207/// An [`AuditSink`] that discards all events. Zero overhead.
208#[derive(Debug, Default, Clone, Copy)]
209pub struct NoopAuditSink;
210
211impl AuditSink for NoopAuditSink {
212    #[inline(always)]
213    fn emit(&self, _event: AuditEvent) {}
214}
215
216// ── OtelAuditSink ─────────────────────────────────────────────────────────────
217
/// An [`AuditSink`] that records each audit event as an `a1_audit` span event
/// on the span of the current OpenTelemetry context.
///
/// When the current context has no valid span, the audit event is discarded.
#[cfg(feature = "otel")]
#[cfg_attr(docsrs, doc(cfg(feature = "otel")))]
#[derive(Debug, Default, Clone, Copy)]
pub struct OtelAuditSink;

#[cfg(feature = "otel")]
impl AuditSink for OtelAuditSink {
    /// Attaches the event to the current span as attributes prefixed `a1.`.
    fn emit(&self, event: AuditEvent) {
        use opentelemetry::trace::TraceContextExt;
        use opentelemetry::KeyValue;

        let cx = opentelemetry::Context::current();
        let span = cx.span();
        if !span.span_context().is_valid() {
            // Nothing to attach the event to.
            return;
        }

        // Take the fields that are exported; the remaining ones are not
        // turned into attributes.
        let AuditEvent {
            event_id,
            outcome,
            principal_pk_hex,
            executor_pk_hex,
            chain_depth,
            chain_fingerprint,
            intent_hex,
            error_message,
            policy_name,
            batch_size,
            ..
        } = event;

        // Always-present attributes first, in a fixed order.
        let mut attributes = vec![
            KeyValue::new("a1.event_id", event_id),
            KeyValue::new("a1.outcome", outcome.to_string()),
            KeyValue::new("a1.principal", principal_pk_hex),
            KeyValue::new("a1.executor", executor_pk_hex),
            KeyValue::new("a1.intent", intent_hex),
            KeyValue::new("a1.depth", chain_depth as i64),
        ];

        // Optional attributes are appended only when set (`Option` yields
        // zero or one item).
        attributes.extend(chain_fingerprint.map(|fp| KeyValue::new("a1.chain_fingerprint", fp)));
        attributes.extend(error_message.map(|err| KeyValue::new("a1.error", err)));
        attributes.extend(policy_name.map(|policy| KeyValue::new("a1.policy", policy)));
        attributes.extend(batch_size.map(|size| KeyValue::new("a1.batch_size", size as i64)));

        span.add_event("a1_audit", attributes);
    }
}
258
259// ── LogAuditSink ──────────────────────────────────────────────────────────────
260
/// Standard stream a [`LogAuditSink`] writes to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogTarget {
    /// The process's standard output.
    Stdout,
    /// The process's standard error.
    Stderr,
}

/// An [`AuditSink`] that writes one line per event to a configurable
/// [`LogTarget`] (stderr by default).
///
/// With the `serde` feature enabled each line is the event's JSON encoding
/// (NDJSON); without it, a short plain-text summary is written instead.
///
/// Suitable for local development and structured log pipelines that collect
/// from stdout/stderr (e.g., Fluentd, Vector, AWS CloudWatch agent).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LogAuditSink {
    target: LogTarget,
}
276
277impl LogAuditSink {
278    pub fn new(target: LogTarget) -> Self {
279        Self { target }
280    }
281}
282
283impl Default for LogAuditSink {
284    fn default() -> Self {
285        Self::new(LogTarget::Stderr)
286    }
287}
288
289impl AuditSink for LogAuditSink {
290    fn emit(&self, event: AuditEvent) {
291        #[cfg(feature = "serde")]
292        {
293            if let Ok(json) = serde_json::to_string(&event) {
294                match self.target {
295                    LogTarget::Stdout => println!("{json}"),
296                    LogTarget::Stderr => eprintln!("{json}"),
297                }
298            }
299        }
300        #[cfg(not(feature = "serde"))]
301        {
302            let text = format!(
303                "a1 audit: outcome={} principal={} executor={} depth={}",
304                event.outcome, event.principal_pk_hex, event.executor_pk_hex, event.chain_depth,
305            );
306            match self.target {
307                LogTarget::Stdout => println!("{text}"),
308                LogTarget::Stderr => eprintln!("{text}"),
309            }
310        }
311    }
312}
313
314// ── CompositeAuditSink ────────────────────────────────────────────────────────
315
316/// An [`AuditSink`] that fans events out to multiple downstream sinks.
317///
318/// All sinks receive every event; a panic in one sink does not prevent
319/// delivery to the remaining sinks.
320///
321/// # Example
322///
323/// ```rust,ignore
324/// use a1::audit::{CompositeAuditSink, LogAuditSink};
325///
326/// let sink = CompositeAuditSink::new()
327///     .add(LogAuditSink)
328///     .add(MyDatadogSink::new(api_key));
329/// ```
330pub struct CompositeAuditSink {
331    sinks: Vec<Box<dyn AuditSink>>,
332}
333
334impl CompositeAuditSink {
335    pub fn new() -> Self {
336        Self { sinks: Vec::new() }
337    }
338
339    #[allow(clippy::should_implement_trait)]
340    pub fn add(mut self, sink: impl AuditSink + 'static) -> Self {
341        self.sinks.push(Box::new(sink));
342        self
343    }
344}
345
346impl Default for CompositeAuditSink {
347    fn default() -> Self {
348        Self::new()
349    }
350}
351
352impl AuditSink for CompositeAuditSink {
353    fn emit(&self, event: AuditEvent) {
354        for sink in &self.sinks {
355            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
356                sink.emit(event.clone());
357            }));
358            if let Err(e) = result {
359                if let Some(msg) = e.downcast_ref::<&str>() {
360                    eprintln!("a1 audit: panic in CompositeAuditSink downstream: {}", msg);
361                } else if let Some(msg) = e.downcast_ref::<String>() {
362                    eprintln!("a1 audit: panic in CompositeAuditSink downstream: {}", msg);
363                } else {
364                    eprintln!(
365                        "a1 audit: panic in CompositeAuditSink downstream with unknown payload"
366                    );
367                }
368            }
369        }
370    }
371}
372
373// ── SiemHttpAuditSink ─────────────────────────────────────────────────────────
374
/// An [`AuditSink`] that batches events and posts them as a JSON array to an
/// external SIEM (Splunk HEC, Datadog, Elasticsearch) via HTTP.
///
/// `emit` only pushes onto an unbounded in-memory channel, so the
/// authorization hot path never waits on network I/O; a background Tokio
/// task drains the channel and performs the requests.
///
/// Only plain `http://` endpoints are supported: the exporter speaks HTTP/1.1
/// directly over TCP and has no TLS support, so any other scheme is rejected
/// rather than sending the bearer token unencrypted.
#[cfg(feature = "async")]
pub struct SiemHttpAuditSink {
    sender: tokio::sync::mpsc::UnboundedSender<AuditEvent>,
}

#[cfg(feature = "async")]
impl SiemHttpAuditSink {
    /// Number of buffered events that triggers an immediate flush.
    const FLUSH_THRESHOLD: usize = 100;
    /// Longest time events wait in the buffer before a flush is attempted.
    const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);
    /// Upper bound on undelivered events kept for retry while the endpoint is
    /// failing; beyond it the oldest events are dropped.
    const MAX_PENDING: usize = 10_000;
    /// Deadline for one complete HTTP exchange (connect, send, status line).
    const REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

    /// Initializes the SIEM exporter and spawns a background Tokio task that
    /// flushes batches to `endpoint`, authenticating with `auth_token` as a
    /// bearer token.
    ///
    /// A flush is attempted every [`Self::FLUSH_INTERVAL`] and whenever the
    /// buffer reaches a multiple of [`Self::FLUSH_THRESHOLD`] events. When
    /// the sink is dropped the task flushes what is still buffered and exits.
    ///
    /// # Panics
    ///
    /// Panics if called outside a Tokio runtime, because the background task
    /// is started with `tokio::spawn`.
    pub fn new(endpoint: String, auth_token: String) -> Self {
        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<AuditEvent>();

        tokio::spawn(async move {
            let mut batch = Vec::with_capacity(Self::FLUSH_THRESHOLD);
            let mut interval = tokio::time::interval(Self::FLUSH_INTERVAL);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        Self::flush_pending(&endpoint, &auth_token, &mut batch).await;
                    }
                    event = receiver.recv() => {
                        match event {
                            Some(ev) => {
                                batch.push(ev);
                                // Multiples of the threshold (not `>=`) so a
                                // failing endpoint is retried once per
                                // threshold's worth of new events instead of
                                // once per event.
                                if batch.len() % Self::FLUSH_THRESHOLD == 0 {
                                    Self::flush_pending(&endpoint, &auth_token, &mut batch).await;
                                }
                            }
                            None => {
                                // Every sender is gone: deliver what is still
                                // buffered before the task ends.
                                Self::flush_pending(&endpoint, &auth_token, &mut batch).await;
                                break;
                            }
                        }
                    }
                }
            }
        });

        Self { sender }
    }

    /// Flushes `batch` if it is non-empty and reports failures on stderr
    /// instead of discarding them. Undelivered events stay in `batch` for the
    /// next attempt, capped at [`Self::MAX_PENDING`].
    async fn flush_pending(endpoint: &str, auth_token: &str, batch: &mut Vec<AuditEvent>) {
        if batch.is_empty() {
            return;
        }
        if let Err(err) = Self::flush_batch(endpoint, auth_token, batch).await {
            eprintln!(
                "a1 audit: SIEM flush failed, {} event(s) pending: {}",
                batch.len(),
                err
            );
            if batch.len() > Self::MAX_PENDING {
                let excess = batch.len() - Self::MAX_PENDING;
                batch.drain(..excess);
                eprintln!("a1 audit: dropped {} oldest undelivered event(s)", excess);
            }
        }
    }

    /// Posts `batch` as one JSON array and clears it once the endpoint has
    /// answered with a 2xx status.
    ///
    /// # Errors
    ///
    /// Returns an error, leaving `batch` untouched for a retry, when the
    /// endpoint URL is invalid, is not `http://`, or has no host; when
    /// connecting, sending or reading fails or exceeds
    /// [`Self::REQUEST_TIMEOUT`]; or when the response status is not 2xx. If
    /// the batch cannot be serialized, it is cleared (a retry could never
    /// succeed) and the serialization error is returned.
    async fn flush_batch(
        endpoint: &str,
        auth_token: &str,
        batch: &mut Vec<AuditEvent>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let body = match serde_json::to_string(&*batch) {
            Ok(body) => body,
            Err(err) => {
                batch.clear();
                return Err(err.into());
            }
        };

        let url: url::Url = endpoint.parse()?;
        if url.scheme() != "http" {
            return Err(format!(
                "unsupported endpoint scheme `{}`: only plain http is implemented",
                url.scheme()
            )
            .into());
        }
        let host = url.host_str().ok_or("endpoint URL has no host")?;
        let port = url.port_or_known_default().unwrap_or(80);
        // `Url::port` is `Some` only for a non-default port, which HTTP/1.1
        // requires to be repeated in the `Host` header.
        let host_header = match url.port() {
            Some(explicit) => format!("{}:{}", host, explicit),
            None => host.to_string(),
        };
        let path = match url.query() {
            Some(query) => format!("{}?{}", url.path(), query),
            None => url.path().to_string(),
        };
        // `body.len()` is the byte length, which is what Content-Length needs.
        let request = format!(
            "POST {} HTTP/1.1\r\nHost: {}\r\nAuthorization: Bearer {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-A1-Provenance: 64796f6c6f\r\nConnection: close\r\n\r\n{}",
            path, host_header, auth_token, body.len(), body
        );
        let addr = format!("{}:{}", host, port);

        // Outer `?` handles the timeout, inner `?` the exchange itself.
        tokio::time::timeout(Self::REQUEST_TIMEOUT, Self::post(&addr, &request)).await??;
        batch.clear();
        Ok(())
    }

    /// Sends the pre-built `request` to `addr` and checks that the response
    /// status line reports a 2xx code.
    async fn post(
        addr: &str,
        request: &str,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};

        let mut stream = tokio::net::TcpStream::connect(addr).await?;
        stream.write_all(request.as_bytes()).await?;
        stream.flush().await?;

        // Only the status line is needed; cap the read so an oversized or
        // never-terminated line cannot grow the buffer without bound.
        let mut status_line = String::new();
        let mut reader = tokio::io::BufReader::new((&mut stream).take(1024));
        reader.read_line(&mut status_line).await?;

        let status = status_line
            .split_whitespace()
            .nth(1)
            .and_then(|code| code.parse::<u16>().ok());
        match status {
            Some(200..=299) => Ok(()),
            Some(code) => Err(format!("SIEM endpoint answered HTTP {}", code).into()),
            None => Err(format!(
                "malformed HTTP response status line: {:?}",
                status_line.trim_end()
            )
            .into()),
        }
    }
}
448
#[cfg(feature = "async")]
impl AuditSink for SiemHttpAuditSink {
    /// Queues `event` for the background exporter task.
    #[inline(always)]
    fn emit(&self, event: AuditEvent) {
        // Sending on the unbounded channel never waits. It fails only when
        // the receiving background task no longer exists; the event is then
        // discarded and the failure is deliberately ignored.
        drop(self.sender.send(event));
    }
}
458
459// ── AsyncAuditSink ────────────────────────────────────────────────────────────
460
/// Async counterpart of [`AuditSink`] for Tokio-based services.
///
/// Requires `features = ["async"]`.
///
/// The canonical implementation wraps a `tokio::sync::mpsc::UnboundedSender`
/// so `emit_async` is instantaneous (it only enqueues) and a background task
/// drains the channel to your SIEM endpoint.
#[cfg(feature = "async")]
pub mod r#async {
    use super::{AuditEvent, AuditSink};
    use async_trait::async_trait;

    /// A destination for [`AuditEvent`] records with an `async` emit method.
    #[async_trait]
    pub trait AsyncAuditSink: Send + Sync {
        /// Takes ownership of one event and delivers it.
        async fn emit_async(&self, event: AuditEvent);
    }

    /// Exposes a shared synchronous [`AuditSink`] through the
    /// [`AsyncAuditSink`] interface. `emit_async` calls the wrapped sink's
    /// `emit` inline, without spawning a task, so it is appropriate only
    /// when that sink is non-blocking (e.g., an mpsc channel send).
    pub struct SyncAuditAdapter<S>(pub std::sync::Arc<S>);

    #[async_trait]
    impl<S> AsyncAuditSink for SyncAuditAdapter<S>
    where
        S: AuditSink + 'static,
    {
        async fn emit_async(&self, event: AuditEvent) {
            // Borrow the sink out of the `Arc` and forward the event.
            let sink: &S = &self.0;
            sink.emit(event);
        }
    }
}