1use rand::{rngs::OsRng, RngCore};
2
3use crate::intent::IntentHash;
4
/// Result category recorded on an [`AuditEvent`].
///
/// With the `serde` feature enabled the variants (de)serialize under their
/// `SCREAMING_SNAKE_CASE` names, which are the same strings produced by the
/// [`Display`](std::fmt::Display) implementation below.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "SCREAMING_SNAKE_CASE"))]
pub enum AuditOutcome {
    /// Rendered as `AUTHORIZED`.
    Authorized,
    /// Rendered as `DENIED`.
    Denied,
    /// Rendered as `POLICY_VIOLATION`.
    PolicyViolation,
    /// Rendered as `STORAGE_ERROR`.
    StorageError,
}

impl std::fmt::Display for AuditOutcome {
    /// Writes the upper-case wire name of the outcome.
    ///
    /// Uses [`Formatter::pad`](std::fmt::Formatter::pad) so that width,
    /// fill, alignment and precision flags (e.g. `{:<16}`) are honoured;
    /// plain `{}` output is unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Authorized => "AUTHORIZED",
            Self::Denied => "DENIED",
            Self::PolicyViolation => "POLICY_VIOLATION",
            Self::StorageError => "STORAGE_ERROR",
        };
        f.pad(label)
    }
}
27
28#[derive(Debug, Clone)]
46#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
47pub struct AuditEvent {
48 pub event_id: String,
49 pub timestamp_unix: u64,
50 pub outcome: AuditOutcome,
51 pub principal_pk_hex: String,
52 pub executor_pk_hex: String,
53 pub chain_depth: usize,
54 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
55 pub chain_fingerprint: Option<String>,
56 pub intent_hex: String,
57 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
58 pub error_message: Option<String>,
59 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
60 pub policy_name: Option<String>,
61 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
62 pub request_id: Option<String>,
63 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
64 pub trace_id: Option<String>,
65 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
66 pub span_id: Option<String>,
67 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
68 pub batch_size: Option<usize>,
69 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
70 pub batch_outcomes: Option<Vec<AuditOutcome>>,
71}
72
73impl AuditEvent {
74 pub fn new(
75 outcome: AuditOutcome,
76 principal_pk_hex: String,
77 executor_pk_hex: String,
78 chain_depth: usize,
79 intent: &IntentHash,
80 timestamp_unix: u64,
81 ) -> Self {
82 let mut id_bytes = [0u8; 16];
85 OsRng.fill_bytes(&mut id_bytes);
86
87 id_bytes[10] = 0x64;
89 id_bytes[11] = 0x79;
90 id_bytes[12] = 0x6f;
91 id_bytes[13] = 0x6c;
92 id_bytes[14] = 0x6f;
93
94 let ts_millis = (timestamp_unix as u128 * 1000).min(0x0000_FFFF_FFFF_FFFF) as u64;
97 id_bytes[0..6].copy_from_slice(&ts_millis.to_be_bytes()[2..8]);
98 id_bytes[6] = (id_bytes[6] & 0x0F) | 0x70; id_bytes[8] = (id_bytes[8] & 0x3F) | 0x80; let mut event_id = String::with_capacity(36);
102 let hex = hex::encode(id_bytes);
103 event_id.push_str(&hex[0..8]);
104 event_id.push('-');
105 event_id.push_str(&hex[8..12]);
106 event_id.push('-');
107 event_id.push_str(&hex[12..16]);
108 event_id.push('-');
109 event_id.push_str(&hex[16..20]);
110 event_id.push('-');
111 event_id.push_str(&hex[20..32]);
112
113 #[cfg_attr(not(feature = "otel"), allow(unused_mut))]
114 let mut trace_id = None;
115 #[cfg_attr(not(feature = "otel"), allow(unused_mut))]
116 let mut span_id = None;
117
118 #[cfg(feature = "otel")]
119 {
120 use opentelemetry::trace::TraceContextExt;
121 let cx = opentelemetry::Context::current();
122 let span = cx.span();
123 let sc = span.span_context();
124 if sc.is_valid() {
125 trace_id = Some(sc.trace_id().to_string());
126 span_id = Some(sc.span_id().to_string());
127 }
128 }
129
130 Self {
131 event_id,
132 timestamp_unix,
133 outcome,
134 principal_pk_hex,
135 executor_pk_hex,
136 chain_depth,
137 chain_fingerprint: None,
138 intent_hex: hex::encode(intent),
139 error_message: None,
140 policy_name: None,
141 request_id: None,
142 trace_id,
143 span_id,
144 batch_size: None,
145 batch_outcomes: None,
146 }
147 }
148
149 pub fn with_fingerprint(mut self, fp: [u8; 32]) -> Self {
150 self.chain_fingerprint = Some(hex::encode(fp));
151 self
152 }
153
154 pub fn with_error(mut self, msg: impl Into<String>) -> Self {
155 self.error_message = Some(msg.into());
156 self
157 }
158
159 pub fn with_policy(mut self, name: impl Into<String>) -> Self {
160 self.policy_name = Some(name.into());
161 self
162 }
163
164 pub fn with_request_id(mut self, id: impl Into<String>) -> Self {
165 self.request_id = Some(id.into());
166 self
167 }
168
169 pub fn with_trace(mut self, trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
170 self.trace_id = Some(trace_id.into());
171 self.span_id = Some(span_id.into());
172 self
173 }
174
175 pub fn with_batch_info(mut self, size: usize, outcomes: Vec<AuditOutcome>) -> Self {
176 self.batch_size = Some(size);
177 self.batch_outcomes = Some(outcomes);
178 self
179 }
180}
181
182pub trait AuditSink: Send + Sync {
202 fn emit(&self, event: AuditEvent);
203}
204
205#[derive(Debug, Default, Clone, Copy)]
209pub struct NoopAuditSink;
210
211impl AuditSink for NoopAuditSink {
212 #[inline(always)]
213 fn emit(&self, _event: AuditEvent) {}
214}
215
/// An [`AuditSink`] that attaches each event to the current OpenTelemetry span.
#[cfg(feature = "otel")]
#[cfg_attr(docsrs, doc(cfg(feature = "otel")))]
#[derive(Debug, Default, Clone, Copy)]
pub struct OtelAuditSink;

#[cfg(feature = "otel")]
impl AuditSink for OtelAuditSink {
    /// Adds an `a1_audit` span event carrying the audit fields as `a1.*`
    /// attributes.
    ///
    /// Does nothing when the current span's context is not valid.
    fn emit(&self, event: AuditEvent) {
        use opentelemetry::trace::TraceContextExt;
        use opentelemetry::KeyValue;

        let cx = opentelemetry::Context::current();
        let span = cx.span();
        if !span.span_context().is_valid() {
            return;
        }

        // NOTE(review): `timestamp_unix`, `request_id`, `trace_id`, `span_id`
        // and `batch_outcomes` are not exported as attributes — confirm that
        // is intended.
        let AuditEvent {
            event_id,
            outcome,
            principal_pk_hex,
            executor_pk_hex,
            intent_hex,
            chain_depth,
            chain_fingerprint,
            error_message,
            policy_name,
            batch_size,
            ..
        } = event;

        // Mandatory attributes first, then whichever optional ones are set.
        let mut attributes = vec![
            KeyValue::new("a1.event_id", event_id),
            KeyValue::new("a1.outcome", outcome.to_string()),
            KeyValue::new("a1.principal", principal_pk_hex),
            KeyValue::new("a1.executor", executor_pk_hex),
            KeyValue::new("a1.intent", intent_hex),
            KeyValue::new("a1.depth", chain_depth as i64),
        ];
        attributes.extend(chain_fingerprint.map(|fp| KeyValue::new("a1.chain_fingerprint", fp)));
        attributes.extend(error_message.map(|err| KeyValue::new("a1.error", err)));
        attributes.extend(policy_name.map(|policy| KeyValue::new("a1.policy", policy)));
        attributes.extend(batch_size.map(|size| KeyValue::new("a1.batch_size", size as i64)));

        span.add_event("a1_audit", attributes);
    }
}
258
/// Standard stream that a [`LogAuditSink`] writes to.
#[derive(Debug, Clone, Copy)]
pub enum LogTarget {
    /// Write to standard output.
    Stdout,
    /// Write to standard error.
    Stderr,
}
271
272#[derive(Debug, Clone)]
273pub struct LogAuditSink {
274 target: LogTarget,
275}
276
277impl LogAuditSink {
278 pub fn new(target: LogTarget) -> Self {
279 Self { target }
280 }
281}
282
283impl Default for LogAuditSink {
284 fn default() -> Self {
285 Self::new(LogTarget::Stderr)
286 }
287}
288
289impl AuditSink for LogAuditSink {
290 fn emit(&self, event: AuditEvent) {
291 #[cfg(feature = "serde")]
292 {
293 if let Ok(json) = serde_json::to_string(&event) {
294 match self.target {
295 LogTarget::Stdout => println!("{json}"),
296 LogTarget::Stderr => eprintln!("{json}"),
297 }
298 }
299 }
300 #[cfg(not(feature = "serde"))]
301 {
302 let text = format!(
303 "a1 audit: outcome={} principal={} executor={} depth={}",
304 event.outcome, event.principal_pk_hex, event.executor_pk_hex, event.chain_depth,
305 );
306 match self.target {
307 LogTarget::Stdout => println!("{text}"),
308 LogTarget::Stderr => eprintln!("{text}"),
309 }
310 }
311 }
312}
313
314pub struct CompositeAuditSink {
331 sinks: Vec<Box<dyn AuditSink>>,
332}
333
334impl CompositeAuditSink {
335 pub fn new() -> Self {
336 Self { sinks: Vec::new() }
337 }
338
339 #[allow(clippy::should_implement_trait)]
340 pub fn add(mut self, sink: impl AuditSink + 'static) -> Self {
341 self.sinks.push(Box::new(sink));
342 self
343 }
344}
345
346impl Default for CompositeAuditSink {
347 fn default() -> Self {
348 Self::new()
349 }
350}
351
352impl AuditSink for CompositeAuditSink {
353 fn emit(&self, event: AuditEvent) {
354 for sink in &self.sinks {
355 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
356 sink.emit(event.clone());
357 }));
358 if let Err(e) = result {
359 if let Some(msg) = e.downcast_ref::<&str>() {
360 eprintln!("a1 audit: panic in CompositeAuditSink downstream: {}", msg);
361 } else if let Some(msg) = e.downcast_ref::<String>() {
362 eprintln!("a1 audit: panic in CompositeAuditSink downstream: {}", msg);
363 } else {
364 eprintln!(
365 "a1 audit: panic in CompositeAuditSink downstream with unknown payload"
366 );
367 }
368 }
369 }
370 }
371}
372
/// An [`AuditSink`] that batches events and POSTs them as a JSON array to an
/// HTTP endpoint from a background Tokio task.
///
/// Delivery is at-least-once and best-effort: a batch is only discarded after
/// the endpoint answers with a 2xx status, failed batches are retried on the
/// next flush, and at most [`Self::MAX_BUFFERED`] events are retained while
/// the endpoint is unavailable.
#[cfg(feature = "async")]
pub struct SiemHttpAuditSink {
    sender: tokio::sync::mpsc::UnboundedSender<AuditEvent>,
}

#[cfg(feature = "async")]
impl SiemHttpAuditSink {
    /// A flush is attempted each time the buffer reaches a multiple of this size.
    const BATCH_SIZE: usize = 100;
    /// Events retained across failed flushes; the oldest are dropped beyond this.
    const MAX_BUFFERED: usize = 10_000;
    /// Period of the time-based flush.
    const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);
    /// Upper bound for one connect + write + status-read exchange.
    const IO_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

    /// Creates the sink and spawns its background flush task.
    ///
    /// `endpoint` must be an `http://` URL; `auth_token` is sent as a bearer
    /// token. Neither is validated here — problems surface as flush errors
    /// reported on stderr.
    ///
    /// # Panics
    ///
    /// Panics if called outside a Tokio runtime, because it uses
    /// `tokio::spawn`.
    pub fn new(endpoint: String, auth_token: String) -> Self {
        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<AuditEvent>();

        tokio::spawn(async move {
            let mut batch = Vec::with_capacity(Self::BATCH_SIZE);
            let mut interval = tokio::time::interval(Self::FLUSH_INTERVAL);
            // A slow flush must not be followed by a burst of catch-up ticks.
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        Self::flush_or_retain(&endpoint, &auth_token, &mut batch).await;
                    }
                    event = receiver.recv() => {
                        match event {
                            Some(ev) => {
                                batch.push(ev);
                                // Multiples of BATCH_SIZE rather than `>=`, so a
                                // failing endpoint is not retried on every
                                // single event once the buffer is large.
                                if batch.len() % Self::BATCH_SIZE == 0 {
                                    Self::flush_or_retain(&endpoint, &auth_token, &mut batch).await;
                                }
                            }
                            None => {
                                // All senders are gone: deliver what is still
                                // buffered before the task ends.
                                Self::flush_or_retain(&endpoint, &auth_token, &mut batch).await;
                                break;
                            }
                        }
                    }
                }
            }
        });

        Self { sender }
    }

    /// Attempts a flush; on failure reports it and bounds the retained buffer.
    async fn flush_or_retain(endpoint: &str, auth_token: &str, batch: &mut Vec<AuditEvent>) {
        if batch.is_empty() {
            return;
        }
        if let Err(err) = Self::flush_batch(endpoint, auth_token, batch).await {
            eprintln!(
                "a1 audit: SIEM flush failed, {} event(s) pending: {}",
                batch.len(),
                err
            );
            if batch.len() > Self::MAX_BUFFERED {
                let excess = batch.len() - Self::MAX_BUFFERED;
                batch.drain(..excess);
                eprintln!(
                    "a1 audit: SIEM buffer full, dropped {} oldest event(s)",
                    excess
                );
            }
        }
    }

    /// Sends `batch` as one JSON array and clears it on success.
    ///
    /// # Errors
    ///
    /// Returns an error, leaving `batch` untouched, when:
    /// - the endpoint cannot be parsed, has no host, or is not `http://`;
    /// - the token contains a line break;
    /// - serialization fails;
    /// - the exchange exceeds [`Self::IO_TIMEOUT`] or fails at the socket level;
    /// - the endpoint answers with a non-2xx status.
    async fn flush_batch(
        endpoint: &str,
        auth_token: &str,
        batch: &mut Vec<AuditEvent>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let url: url::Url = endpoint.parse()?;

        // Only a plain TCP transport is implemented. Writing the bearer token
        // unencrypted to an `https` port would both fail and leak the token,
        // so refuse anything but `http`.
        if url.scheme() != "http" {
            return Err(format!(
                "unsupported SIEM endpoint scheme `{}`: only http is implemented",
                url.scheme()
            )
            .into());
        }
        // A line break in the token would inject extra request headers.
        if auth_token.contains(&['\r', '\n'][..]) {
            return Err("SIEM auth token contains a line break".into());
        }

        let host = url.host_str().ok_or("SIEM endpoint has no host")?;
        let port = url.port_or_known_default().unwrap_or(80);
        let addr = format!("{}:{}", host, port);
        // `Url::port` is `None` for the scheme's default port; otherwise the
        // Host header has to carry the port.
        let host_header = match url.port() {
            Some(explicit) => format!("{}:{}", host, explicit),
            None => host.to_string(),
        };
        let path = match url.query() {
            Some(q) => format!("{}?{}", url.path(), q),
            None => url.path().to_string(),
        };

        // Propagate serialization failures instead of posting an empty body
        // and then discarding the events.
        let body = serde_json::to_string(&*batch)?;
        let request = format!(
            "POST {} HTTP/1.1\r\nHost: {}\r\nAuthorization: Bearer {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-A1-Provenance: 64796f6c6f\r\nConnection: close\r\n\r\n{}",
            path, host_header, auth_token, body.len(), body
        );

        let status =
            tokio::time::timeout(Self::IO_TIMEOUT, Self::post(&addr, request.as_bytes())).await??;
        if !(200..300).contains(&status) {
            return Err(format!("SIEM endpoint responded with HTTP status {}", status).into());
        }

        batch.clear();
        Ok(())
    }

    /// Writes `request` to `addr` and returns the HTTP status code of the reply.
    async fn post(addr: &str, request: &[u8]) -> std::io::Result<u16> {
        use tokio::io::{AsyncReadExt, AsyncWriteExt};

        let mut stream = tokio::net::TcpStream::connect(addr).await?;
        stream.write_all(request).await?;
        stream.flush().await?;

        // `HTTP/1.x NNN` — the first 12 bytes are enough to learn the status.
        let mut head = [0u8; 12];
        let mut filled = 0;
        while filled < head.len() {
            let read = stream.read(&mut head[filled..]).await?;
            if read == 0 {
                break;
            }
            filled += read;
        }

        std::str::from_utf8(&head[..filled])
            .ok()
            .and_then(|text| text.split(' ').nth(1))
            .and_then(|code| code.parse::<u16>().ok())
            .ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "malformed HTTP status line from SIEM endpoint",
                )
            })
    }
}
448
#[cfg(feature = "async")]
impl AuditSink for SiemHttpAuditSink {
    /// Hands the event to the channel feeding the background task.
    ///
    /// A send error (the receiving side no longer exists) is ignored, so the
    /// event is dropped in that case.
    #[inline(always)]
    fn emit(&self, event: AuditEvent) {
        drop(self.sender.send(event));
    }
}
458
/// Async counterpart of the audit sink interface.
#[cfg(feature = "async")]
pub mod r#async {
    use super::{AuditEvent, AuditSink};
    use async_trait::async_trait;

    /// Destination for [`AuditEvent`]s that is driven from async code.
    #[async_trait]
    pub trait AsyncAuditSink: Send + Sync {
        /// Consumes and records one event.
        async fn emit_async(&self, event: AuditEvent);
    }

    /// Exposes a shared synchronous [`AuditSink`] as an [`AsyncAuditSink`].
    pub struct SyncAuditAdapter<S>(pub std::sync::Arc<S>);

    #[async_trait]
    impl<S> AsyncAuditSink for SyncAuditAdapter<S>
    where
        S: AuditSink + 'static,
    {
        /// Calls the wrapped sink's `emit` directly on the calling task, so a
        /// sink that blocks will block that task.
        async fn emit_async(&self, event: AuditEvent) {
            AuditSink::emit(&*self.0, event);
        }
    }
}