Skip to main content

ranvier_audit/
lib.rs

1pub mod file_sink;
2#[cfg(feature = "postgres")]
3pub mod postgres;
4
5use async_trait::async_trait;
6use chrono::{DateTime, Duration, Utc};
7use ring::digest;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use thiserror::Error;
12use tokio::sync::Mutex;
13
14/// Core error type for audit operations
15#[derive(Debug, Error)]
16pub enum AuditError {
17    #[error("Failed to append event: {0}")]
18    AppendFailed(String),
19    #[error("Serialization error: {0}")]
20    SerializationError(#[from] serde_json::Error),
21    #[error("Internal audit error: {0}")]
22    Internal(String),
23    #[error("Integrity violation at event index {index}: {reason}")]
24    IntegrityViolation { index: usize, reason: String },
25}
26
27/// The main Audit Event payload reflecting the 5 W's.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct AuditEvent {
30    /// Unique event identifier for correlation
31    pub id: String,
32    /// When did this occur?
33    pub timestamp: DateTime<Utc>,
34    /// Who performed the action? (User, Service, System)
35    pub actor: String,
36    /// What was the action? (Create, Read, Update, Delete, Transition, etc.)
37    pub action: String,
38    /// Where did it happen? (Target resource identifier, node id)
39    pub target: String,
40    /// What was the intent or outcome?
41    pub intent: Option<String>,
42    /// Optional structured metadata related to the event payload.
43    /// Warning: Do not embed PII here unless properly redacted.
44    pub metadata: HashMap<String, serde_json::Value>,
45    /// Hash of the previous event in the chain (None for the first event).
46    #[serde(skip_serializing_if = "Option::is_none")]
47    pub prev_hash: Option<String>,
48}
49
50impl AuditEvent {
51    /// Creates a new basic audit event
52    pub fn new(id: String, actor: String, action: String, target: String) -> Self {
53        Self {
54            id,
55            timestamp: Utc::now(),
56            actor,
57            action,
58            target,
59            intent: None,
60            metadata: HashMap::new(),
61            prev_hash: None,
62        }
63    }
64
65    pub fn with_intent(mut self, intent: impl Into<String>) -> Self {
66        self.intent = Some(intent.into());
67        self
68    }
69
70    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
71        if let Ok(json_val) = serde_json::to_value(value) {
72            self.metadata.insert(key.into(), json_val);
73        }
74        self
75    }
76
77    /// Compute the SHA-256 hash of this event's canonical JSON representation.
78    pub fn compute_hash(&self) -> String {
79        let payload = serde_json::to_string(self).unwrap_or_default();
80        let digest = digest::digest(&digest::SHA256, payload.as_bytes());
81        hex::encode(digest.as_ref())
82    }
83}
84
85// ---------------------------------------------------------------------------
86// AuditChain — tamper-proof hash chain
87// ---------------------------------------------------------------------------
88
89/// A tamper-proof hash chain of audit events.
90///
91/// Each event's `prev_hash` links to the SHA-256 hash of the preceding event,
92/// forming a chain. If any event is deleted or modified, the chain breaks.
93#[derive(Clone)]
94pub struct AuditChain {
95    events: Arc<Mutex<Vec<AuditEvent>>>,
96    last_hash: Arc<Mutex<Option<String>>>,
97}
98
99impl AuditChain {
100    pub fn new() -> Self {
101        Self {
102            events: Arc::new(Mutex::new(Vec::new())),
103            last_hash: Arc::new(Mutex::new(None)),
104        }
105    }
106
107    /// Append an event to the chain, linking it to the previous event's hash.
108    pub async fn append(&self, mut event: AuditEvent) -> AuditEvent {
109        let mut events = self.events.lock().await;
110        let mut last_hash = self.last_hash.lock().await;
111
112        event.prev_hash = last_hash.clone();
113        let hash = event.compute_hash();
114        *last_hash = Some(hash);
115
116        events.push(event.clone());
117        event
118    }
119
120    /// Verify the integrity of the entire chain.
121    ///
122    /// Returns `Ok(())` if the chain is intact, or an error describing the
123    /// first integrity violation found.
124    pub async fn verify(&self) -> Result<(), AuditError> {
125        let events = self.events.lock().await;
126        let mut prev_hash: Option<String> = None;
127
128        for (i, event) in events.iter().enumerate() {
129            if event.prev_hash != prev_hash {
130                return Err(AuditError::IntegrityViolation {
131                    index: i,
132                    reason: format!(
133                        "expected prev_hash {:?}, found {:?}",
134                        prev_hash, event.prev_hash
135                    ),
136                });
137            }
138            prev_hash = Some(event.compute_hash());
139        }
140
141        Ok(())
142    }
143
144    /// Get a snapshot of all events in the chain.
145    pub async fn events(&self) -> Vec<AuditEvent> {
146        self.events.lock().await.clone()
147    }
148
149    /// Get the number of events in the chain.
150    pub async fn len(&self) -> usize {
151        self.events.lock().await.len()
152    }
153
154    /// Check if the chain is empty.
155    pub async fn is_empty(&self) -> bool {
156        self.events.lock().await.is_empty()
157    }
158}
159
160impl Default for AuditChain {
161    fn default() -> Self {
162        Self::new()
163    }
164}
165
166// ---------------------------------------------------------------------------
167// AuditQuery — structured event filtering
168// ---------------------------------------------------------------------------
169
170/// Builder for filtering audit events by time range, event type, actor, or resource.
171#[derive(Debug, Clone, Default)]
172pub struct AuditQuery {
173    pub(crate) time_start: Option<DateTime<Utc>>,
174    pub(crate) time_end: Option<DateTime<Utc>>,
175    pub(crate) action: Option<String>,
176    pub(crate) actor: Option<String>,
177    pub(crate) target: Option<String>,
178}
179
180impl AuditQuery {
181    pub fn new() -> Self {
182        Self::default()
183    }
184
185    /// Filter events within a time range (inclusive).
186    pub fn time_range(mut self, start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
187        self.time_start = Some(start);
188        self.time_end = Some(end);
189        self
190    }
191
192    /// Filter events by action type.
193    pub fn action(mut self, action: impl Into<String>) -> Self {
194        self.action = Some(action.into());
195        self
196    }
197
198    /// Filter events by actor.
199    pub fn actor(mut self, actor: impl Into<String>) -> Self {
200        self.actor = Some(actor.into());
201        self
202    }
203
204    /// Filter events by target resource.
205    pub fn target(mut self, target: impl Into<String>) -> Self {
206        self.target = Some(target.into());
207        self
208    }
209
210    /// Test whether an event matches this query.
211    pub fn matches(&self, event: &AuditEvent) -> bool {
212        if let Some(start) = &self.time_start {
213            if event.timestamp < *start {
214                return false;
215            }
216        }
217        if let Some(end) = &self.time_end {
218            if event.timestamp > *end {
219                return false;
220            }
221        }
222        if let Some(action) = &self.action {
223            if event.action != *action {
224                return false;
225            }
226        }
227        if let Some(actor) = &self.actor {
228            if event.actor != *actor {
229                return false;
230            }
231        }
232        if let Some(target) = &self.target {
233            if event.target != *target {
234                return false;
235            }
236        }
237        true
238    }
239
240    /// Apply this query to a slice of events, returning matching events.
241    pub fn filter<'a>(&self, events: &'a [AuditEvent]) -> Vec<&'a AuditEvent> {
242        events.iter().filter(|e| self.matches(e)).collect()
243    }
244}
245
246// ---------------------------------------------------------------------------
247// RetentionPolicy — automatic event lifecycle management
248// ---------------------------------------------------------------------------
249
250/// Strategy for handling events that exceed retention limits.
251#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
252pub enum ArchiveStrategy {
253    /// Delete events permanently.
254    Delete,
255    /// Archive events (caller handles archival destination).
256    Archive,
257}
258
259/// Policy for automatic audit event lifecycle management.
260#[derive(Debug, Clone)]
261pub struct RetentionPolicy {
262    /// Maximum age of events to retain.
263    pub max_age: Option<Duration>,
264    /// Maximum number of events to retain.
265    pub max_count: Option<usize>,
266    /// What to do with events that exceed retention limits.
267    pub strategy: ArchiveStrategy,
268}
269
270impl RetentionPolicy {
271    /// Create a retention policy with a maximum event age.
272    pub fn max_age(age: Duration) -> Self {
273        Self {
274            max_age: Some(age),
275            max_count: None,
276            strategy: ArchiveStrategy::Delete,
277        }
278    }
279
280    /// Create a retention policy with a maximum event count.
281    pub fn max_count(count: usize) -> Self {
282        Self {
283            max_age: None,
284            max_count: Some(count),
285            strategy: ArchiveStrategy::Delete,
286        }
287    }
288
289    /// Set the archive strategy.
290    pub fn with_strategy(mut self, strategy: ArchiveStrategy) -> Self {
291        self.strategy = strategy;
292        self
293    }
294
295    /// Apply this retention policy to a list of events, returning
296    /// `(retained, expired)` event lists.
297    pub fn apply(&self, events: &[AuditEvent]) -> (Vec<AuditEvent>, Vec<AuditEvent>) {
298        let now = Utc::now();
299        let mut retained: Vec<AuditEvent> = events.to_vec();
300        let mut expired: Vec<AuditEvent> = Vec::new();
301
302        // Apply max_age filter
303        if let Some(max_age) = &self.max_age {
304            let cutoff = now - *max_age;
305            let (keep, remove): (Vec<_>, Vec<_>) =
306                retained.into_iter().partition(|e| e.timestamp >= cutoff);
307            retained = keep;
308            expired.extend(remove);
309        }
310
311        // Apply max_count (keep the newest events)
312        if let Some(max_count) = self.max_count {
313            if retained.len() > max_count {
314                let excess = retained.len() - max_count;
315                let removed: Vec<_> = retained.drain(..excess).collect();
316                expired.extend(removed);
317            }
318        }
319
320        (retained, expired)
321    }
322}
323
324// ---------------------------------------------------------------------------
325// AuditSink trait
326// ---------------------------------------------------------------------------
327
328/// A sink responsible for durably and securely writing audit events.
329#[async_trait]
330pub trait AuditSink: Send + Sync {
331    /// Append a new event to the secure audit log.
332    async fn append(&self, event: &AuditEvent) -> Result<(), AuditError>;
333
334    /// Query events matching the given filter. Not all sinks support querying.
335    async fn query(&self, _query: &AuditQuery) -> Result<Vec<AuditEvent>, AuditError> {
336        Ok(Vec::new())
337    }
338
339    /// Apply a retention policy. Not all sinks support retention.
340    async fn apply_retention(
341        &self,
342        _policy: &RetentionPolicy,
343    ) -> Result<Vec<AuditEvent>, AuditError> {
344        Ok(Vec::new())
345    }
346}
347
348// ---------------------------------------------------------------------------
349// InMemoryAuditSink — in-memory sink with query and retention support
350// ---------------------------------------------------------------------------
351
352/// In-memory audit sink for testing and development.
353///
354/// Supports querying and retention policy application.
355#[derive(Clone, Default)]
356pub struct InMemoryAuditSink {
357    events: Arc<Mutex<Vec<AuditEvent>>>,
358}
359
360impl InMemoryAuditSink {
361    pub fn new() -> Self {
362        Self::default()
363    }
364
365    /// Get a snapshot of all stored events.
366    pub async fn get_events(&self) -> Vec<AuditEvent> {
367        self.events.lock().await.clone()
368    }
369
370    /// Get the number of stored events.
371    pub async fn len(&self) -> usize {
372        self.events.lock().await.len()
373    }
374
375    /// Check if the sink is empty.
376    pub async fn is_empty(&self) -> bool {
377        self.events.lock().await.is_empty()
378    }
379}
380
381#[async_trait]
382impl AuditSink for InMemoryAuditSink {
383    async fn append(&self, event: &AuditEvent) -> Result<(), AuditError> {
384        self.events.lock().await.push(event.clone());
385        Ok(())
386    }
387
388    async fn query(&self, query: &AuditQuery) -> Result<Vec<AuditEvent>, AuditError> {
389        let events = self.events.lock().await;
390        Ok(query.filter(&events).into_iter().cloned().collect())
391    }
392
393    async fn apply_retention(
394        &self,
395        policy: &RetentionPolicy,
396    ) -> Result<Vec<AuditEvent>, AuditError> {
397        let mut events = self.events.lock().await;
398        let (retained, expired) = policy.apply(&events);
399        *events = retained;
400        Ok(expired)
401    }
402}
403
404// ---------------------------------------------------------------------------
405// AuditLogger
406// ---------------------------------------------------------------------------
407
408/// Core interface for invoking audit logging from within Ranvier.
409#[derive(Clone)]
410pub struct AuditLogger<S: AuditSink> {
411    sink: Arc<S>,
412}
413
414impl<S: AuditSink> AuditLogger<S> {
415    pub fn new(sink: S) -> Self {
416        Self {
417            sink: Arc::new(sink),
418        }
419    }
420
421    /// Logs an event
422    pub async fn log(&self, event: AuditEvent) -> Result<(), AuditError> {
423        self.sink.append(&event).await
424    }
425
426    /// Query events matching the given filter.
427    pub async fn query(&self, query: &AuditQuery) -> Result<Vec<AuditEvent>, AuditError> {
428        self.sink.query(query).await
429    }
430
431    /// Apply a retention policy.
432    pub async fn apply_retention(
433        &self,
434        policy: &RetentionPolicy,
435    ) -> Result<Vec<AuditEvent>, AuditError> {
436        self.sink.apply_retention(policy).await
437    }
438}
439
440#[cfg(test)]
441mod tests;