Skip to main content

ringkernel_core/
audit.rs

1//! Audit logging for enterprise security and compliance.
2//!
3//! This module provides comprehensive audit logging for GPU kernel operations,
4//! enabling security monitoring, compliance reporting, and forensic analysis.
5//!
6//! # Features
7//!
8//! - Structured audit events with timestamps
9//! - Multiple output sinks (file, syslog, custom)
10//! - Tamper-evident log chains with checksums
11//! - Async-safe audit trail generation
12//! - Retention policies and log rotation
13//!
14//! # Example
15//!
16//! ```ignore
17//! use ringkernel_core::audit::{AuditLogger, AuditEvent, AuditLevel};
18//!
19//! let logger = AuditLogger::new()
20//!     .with_file_sink("/var/log/ringkernel/audit.log")
21//!     .with_retention(Duration::from_days(90))
22//!     .build()?;
23//!
24//! logger.log(AuditEvent::kernel_launched("processor", "cuda"));
25//! ```
26
27use std::collections::VecDeque;
28use std::fmt;
29use std::io::Write;
30use std::net::UdpSocket;
31use std::path::PathBuf;
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::sync::Arc;
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35
36use parking_lot::{Mutex, RwLock};
37
38use crate::hlc::HlcTimestamp;
39
40// ============================================================================
41// AUDIT LEVELS
42// ============================================================================
43
/// Audit event severity levels.
///
/// Discriminants are fixed `u8` values; `Ord` follows declaration order,
/// so `Info < Warning < Security < Critical < Compliance`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum AuditLevel {
    /// Informational events (kernel start/stop, config changes).
    Info = 0,
    /// Warning events (degraded performance, retries).
    Warning = 1,
    /// Security-relevant events (authentication, authorization).
    Security = 2,
    /// Critical events (failures, violations).
    Critical = 3,
    /// Compliance-relevant events (data access, retention).
    Compliance = 4,
}
59
60impl AuditLevel {
61    /// Get the level name.
62    pub fn as_str(&self) -> &'static str {
63        match self {
64            Self::Info => "INFO",
65            Self::Warning => "WARNING",
66            Self::Security => "SECURITY",
67            Self::Critical => "CRITICAL",
68            Self::Compliance => "COMPLIANCE",
69        }
70    }
71}
72
73impl fmt::Display for AuditLevel {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        write!(f, "{}", self.as_str())
76    }
77}
78
79// ============================================================================
80// AUDIT EVENT TYPES
81// ============================================================================
82
/// Types of audit events.
///
/// Each variant has a stable `snake_case` wire name (see
/// [`AuditEventType::as_str`]); `Custom` carries an arbitrary
/// user-supplied name for events not covered by the built-in set.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AuditEventType {
    // Kernel lifecycle events
    /// Kernel was launched.
    KernelLaunched,
    /// Kernel was terminated.
    KernelTerminated,
    /// Kernel was migrated to another device.
    KernelMigrated,
    /// Kernel checkpoint was created.
    KernelCheckpointed,
    /// Kernel was restored from checkpoint.
    KernelRestored,

    // Message events
    /// Message was sent.
    MessageSent,
    /// Message was received.
    MessageReceived,
    /// Message delivery failed.
    MessageFailed,

    // Security events
    /// Authentication attempt.
    AuthenticationAttempt,
    /// Authorization check.
    AuthorizationCheck,
    /// Configuration change.
    ConfigurationChange,
    /// Security policy violation.
    SecurityViolation,

    // Resource events
    /// GPU memory allocated.
    MemoryAllocated,
    /// GPU memory deallocated.
    MemoryDeallocated,
    /// Resource limit exceeded.
    ResourceLimitExceeded,

    // Health events
    /// Health check performed.
    HealthCheck,
    /// Circuit breaker state changed.
    CircuitBreakerStateChange,
    /// Degradation level changed.
    DegradationChange,

    /// Custom event type for user-defined audit events.
    Custom(String),
}
135
136impl AuditEventType {
137    /// Get the event type name.
138    pub fn as_str(&self) -> &str {
139        match self {
140            Self::KernelLaunched => "kernel_launched",
141            Self::KernelTerminated => "kernel_terminated",
142            Self::KernelMigrated => "kernel_migrated",
143            Self::KernelCheckpointed => "kernel_checkpointed",
144            Self::KernelRestored => "kernel_restored",
145            Self::MessageSent => "message_sent",
146            Self::MessageReceived => "message_received",
147            Self::MessageFailed => "message_failed",
148            Self::AuthenticationAttempt => "authentication_attempt",
149            Self::AuthorizationCheck => "authorization_check",
150            Self::ConfigurationChange => "configuration_change",
151            Self::SecurityViolation => "security_violation",
152            Self::MemoryAllocated => "memory_allocated",
153            Self::MemoryDeallocated => "memory_deallocated",
154            Self::ResourceLimitExceeded => "resource_limit_exceeded",
155            Self::HealthCheck => "health_check",
156            Self::CircuitBreakerStateChange => "circuit_breaker_state_change",
157            Self::DegradationChange => "degradation_change",
158            Self::Custom(s) => s.as_str(),
159        }
160    }
161}
162
163impl fmt::Display for AuditEventType {
164    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165        write!(f, "{}", self.as_str())
166    }
167}
168
169// ============================================================================
170// AUDIT EVENT
171// ============================================================================
172
/// A structured audit event.
///
/// The `checksum` covers every other field (see `compute_checksum`), so
/// builder methods that mutate the event re-seal it. Chaining events via
/// `prev_checksum` makes the log tamper-evident (though not
/// cryptographically secure — the hash is std's SipHash `DefaultHasher`).
#[derive(Debug, Clone)]
pub struct AuditEvent {
    /// Unique event ID (process-wide monotonic counter, starting at 1).
    pub id: u64,
    /// Event timestamp (wall clock).
    pub timestamp: SystemTime,
    /// HLC timestamp for causal ordering.
    pub hlc: Option<HlcTimestamp>,
    /// Event level.
    pub level: AuditLevel,
    /// Event type.
    pub event_type: AuditEventType,
    /// Actor/component that generated the event.
    pub actor: String,
    /// Target resource or kernel.
    pub target: Option<String>,
    /// Event description.
    pub description: String,
    /// Additional metadata as key-value pairs (insertion-ordered).
    pub metadata: Vec<(String, String)>,
    /// Previous event checksum (for tamper detection).
    pub prev_checksum: Option<u64>,
    /// This event's checksum (over all other fields).
    pub checksum: u64,
}
199
200impl AuditEvent {
201    /// Create a new audit event.
202    pub fn new(
203        level: AuditLevel,
204        event_type: AuditEventType,
205        actor: impl Into<String>,
206        description: impl Into<String>,
207    ) -> Self {
208        let id = next_event_id();
209        let timestamp = SystemTime::now();
210        let actor = actor.into();
211        let description = description.into();
212
213        let mut event = Self {
214            id,
215            timestamp,
216            hlc: None,
217            level,
218            event_type,
219            actor,
220            target: None,
221            description,
222            metadata: Vec::new(),
223            prev_checksum: None,
224            checksum: 0,
225        };
226
227        event.checksum = event.compute_checksum();
228        event
229    }
230
231    /// Add an HLC timestamp.
232    pub fn with_hlc(mut self, hlc: HlcTimestamp) -> Self {
233        self.hlc = Some(hlc);
234        self.checksum = self.compute_checksum();
235        self
236    }
237
238    /// Add a target resource.
239    pub fn with_target(mut self, target: impl Into<String>) -> Self {
240        self.target = Some(target.into());
241        self.checksum = self.compute_checksum();
242        self
243    }
244
245    /// Add metadata.
246    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
247        self.metadata.push((key.into(), value.into()));
248        self.checksum = self.compute_checksum();
249        self
250    }
251
252    /// Set the previous checksum for chain integrity.
253    pub fn with_prev_checksum(mut self, checksum: u64) -> Self {
254        self.prev_checksum = Some(checksum);
255        self.checksum = self.compute_checksum();
256        self
257    }
258
259    /// Compute a checksum for this event.
260    fn compute_checksum(&self) -> u64 {
261        use std::collections::hash_map::DefaultHasher;
262        use std::hash::{Hash, Hasher};
263
264        let mut hasher = DefaultHasher::new();
265        self.id.hash(&mut hasher);
266        self.timestamp
267            .duration_since(UNIX_EPOCH)
268            .unwrap_or_default()
269            .as_nanos()
270            .hash(&mut hasher);
271        self.level.as_str().hash(&mut hasher);
272        self.event_type.as_str().hash(&mut hasher);
273        self.actor.hash(&mut hasher);
274        self.target.hash(&mut hasher);
275        self.description.hash(&mut hasher);
276        for (k, v) in &self.metadata {
277            k.hash(&mut hasher);
278            v.hash(&mut hasher);
279        }
280        self.prev_checksum.hash(&mut hasher);
281        hasher.finish()
282    }
283
284    /// Verify the event checksum.
285    pub fn verify_checksum(&self) -> bool {
286        self.checksum == self.compute_checksum()
287    }
288
289    // Helper constructors for common events
290
291    /// Create a kernel launched event.
292    pub fn kernel_launched(kernel_id: impl Into<String>, backend: impl Into<String>) -> Self {
293        Self::new(
294            AuditLevel::Info,
295            AuditEventType::KernelLaunched,
296            "runtime",
297            format!("Kernel launched on {}", backend.into()),
298        )
299        .with_target(kernel_id)
300    }
301
302    /// Create a kernel terminated event.
303    pub fn kernel_terminated(kernel_id: impl Into<String>, reason: impl Into<String>) -> Self {
304        Self::new(
305            AuditLevel::Info,
306            AuditEventType::KernelTerminated,
307            "runtime",
308            format!("Kernel terminated: {}", reason.into()),
309        )
310        .with_target(kernel_id)
311    }
312
313    /// Create a security violation event.
314    pub fn security_violation(actor: impl Into<String>, violation: impl Into<String>) -> Self {
315        Self::new(
316            AuditLevel::Security,
317            AuditEventType::SecurityViolation,
318            actor,
319            violation,
320        )
321    }
322
323    /// Create a configuration change event.
324    pub fn config_change(
325        actor: impl Into<String>,
326        config_key: impl Into<String>,
327        old_value: impl Into<String>,
328        new_value: impl Into<String>,
329    ) -> Self {
330        Self::new(
331            AuditLevel::Compliance,
332            AuditEventType::ConfigurationChange,
333            actor,
334            format!("Configuration changed: {}", config_key.into()),
335        )
336        .with_metadata("old_value", old_value)
337        .with_metadata("new_value", new_value)
338    }
339
340    /// Create a health check event.
341    pub fn health_check(kernel_id: impl Into<String>, status: impl Into<String>) -> Self {
342        Self::new(
343            AuditLevel::Info,
344            AuditEventType::HealthCheck,
345            "health_checker",
346            format!("Health check: {}", status.into()),
347        )
348        .with_target(kernel_id)
349    }
350
351    /// Format as JSON.
352    pub fn to_json(&self) -> String {
353        let timestamp = self
354            .timestamp
355            .duration_since(UNIX_EPOCH)
356            .unwrap_or_default()
357            .as_millis();
358
359        let hlc_str = self
360            .hlc
361            .map(|h| {
362                format!(
363                    r#","hlc":{{"wall":{},"logical":{}}}"#,
364                    h.physical, h.logical
365                )
366            })
367            .unwrap_or_default();
368
369        let target_str = self
370            .target
371            .as_ref()
372            .map(|t| format!(r#","target":"{}""#, escape_json(t)))
373            .unwrap_or_default();
374
375        let prev_checksum_str = self
376            .prev_checksum
377            .map(|c| format!(r#","prev_checksum":{}"#, c))
378            .unwrap_or_default();
379
380        let metadata_str = if self.metadata.is_empty() {
381            String::new()
382        } else {
383            let pairs: Vec<String> = self
384                .metadata
385                .iter()
386                .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
387                .collect();
388            format!(r#","metadata":{{{}}}"#, pairs.join(","))
389        };
390
391        format!(
392            r#"{{"id":{},"timestamp":{}{},"level":"{}","event_type":"{}","actor":"{}"{}"description":"{}"{}"checksum":{}{}}}"#,
393            self.id,
394            timestamp,
395            hlc_str,
396            self.level.as_str(),
397            self.event_type.as_str(),
398            escape_json(&self.actor),
399            target_str,
400            escape_json(&self.description),
401            metadata_str,
402            self.checksum,
403            prev_checksum_str,
404        )
405    }
406}
407
/// Escape a string for embedding in a JSON string literal.
///
/// Handles the mandatory escapes from RFC 8259: backslash, double quote,
/// and all control characters below U+0020 (common ones get their short
/// forms `\n`, `\r`, `\t`; the rest become `\u00XX`). Single pass, one
/// allocation — the previous implementation allocated a fresh `String`
/// per `replace` call and left control characters unescaped, producing
/// invalid JSON.
fn escape_json(s: &str) -> String {
    use std::fmt::Write;

    let mut out = String::with_capacity(s.len() + 8);
    for c in s.chars() {
        match c {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining C0 control characters must be \u-escaped.
            c if (c as u32) < 0x20 => {
                // write! to a String cannot fail.
                let _ = write!(out, "\\u{:04x}", c as u32);
            }
            c => out.push(c),
        }
    }
    out
}
416
// Global event ID counter. Starts at 1, so every issued ID is non-zero
// (presumably leaving 0 available as an "unset" sentinel — confirm).
static EVENT_ID_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Return the next process-wide unique, monotonically increasing event ID.
fn next_event_id() -> u64 {
    EVENT_ID_COUNTER.fetch_add(1, Ordering::SeqCst)
}
423
424// ============================================================================
425// AUDIT SINK TRAIT
426// ============================================================================
427
/// Trait for audit log output sinks.
///
/// Implementations must be `Send + Sync` so a single sink can be shared
/// across threads. The implementations in this module treat writes after
/// `close()` as silent no-ops.
pub trait AuditSink: Send + Sync {
    /// Write an audit event to the sink.
    fn write(&self, event: &AuditEvent) -> std::io::Result<()>;

    /// Flush any buffered events.
    fn flush(&self) -> std::io::Result<()>;

    /// Close the sink. Subsequent writes are expected to be no-ops.
    fn close(&self) -> std::io::Result<()>;
}
439
/// File-based audit sink.
///
/// Appends one JSON line per event and rotates the file once it grows
/// past `max_size` bytes (rotated files get a unix-timestamp suffix).
pub struct FileSink {
    // Path of the active log file.
    path: PathBuf,
    // Open handle to the active file; `None` after `close()`.
    writer: Mutex<Option<std::fs::File>>,
    // Rotation threshold in bytes (default 100 MB).
    max_size: u64,
    // Bytes written to the active file, seeded from metadata on open.
    current_size: AtomicU64,
}
447
448impl FileSink {
449    /// Create a new file sink.
450    pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
451        let path = path.into();
452        let file = std::fs::OpenOptions::new()
453            .create(true)
454            .append(true)
455            .open(&path)?;
456
457        let metadata = file.metadata()?;
458
459        Ok(Self {
460            path,
461            writer: Mutex::new(Some(file)),
462            max_size: 100 * 1024 * 1024, // 100 MB default
463            current_size: AtomicU64::new(metadata.len()),
464        })
465    }
466
467    /// Set the maximum file size before rotation.
468    pub fn with_max_size(mut self, size: u64) -> Self {
469        self.max_size = size;
470        self
471    }
472
473    /// Rotate the log file if needed.
474    fn rotate_if_needed(&self) -> std::io::Result<()> {
475        if self.current_size.load(Ordering::Relaxed) >= self.max_size {
476            let mut writer = self.writer.lock();
477            if let Some(file) = writer.take() {
478                drop(file);
479
480                // Rename current file with timestamp
481                let timestamp = SystemTime::now()
482                    .duration_since(UNIX_EPOCH)
483                    .unwrap_or_default()
484                    .as_secs();
485                let rotated_path = self.path.with_extension(format!("log.{}", timestamp));
486                std::fs::rename(&self.path, rotated_path)?;
487
488                // Create new file
489                let new_file = std::fs::OpenOptions::new()
490                    .create(true)
491                    .append(true)
492                    .open(&self.path)?;
493                *writer = Some(new_file);
494                self.current_size.store(0, Ordering::Relaxed);
495            }
496        }
497        Ok(())
498    }
499}
500
501impl AuditSink for FileSink {
502    fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
503        self.rotate_if_needed()?;
504
505        let json = event.to_json();
506        let line = format!("{}\n", json);
507        let len = line.len() as u64;
508
509        let mut writer = self.writer.lock();
510        if let Some(file) = writer.as_mut() {
511            file.write_all(line.as_bytes())?;
512            self.current_size.fetch_add(len, Ordering::Relaxed);
513        }
514        Ok(())
515    }
516
517    fn flush(&self) -> std::io::Result<()> {
518        let mut writer = self.writer.lock();
519        if let Some(file) = writer.as_mut() {
520            file.flush()?;
521        }
522        Ok(())
523    }
524
525    fn close(&self) -> std::io::Result<()> {
526        let mut writer = self.writer.lock();
527        if let Some(file) = writer.take() {
528            drop(file);
529        }
530        Ok(())
531    }
532}
533
/// In-memory audit sink for testing.
///
/// Retains at most `max_events` events, evicting the oldest on overflow.
/// NOTE(review): `#[derive(Default)]` yields `max_events == 0`, which the
/// write path effectively treats as "keep only the latest event" — confirm
/// this is the intended default.
#[derive(Default)]
pub struct MemorySink {
    // Ring of retained events, oldest at the front.
    events: Mutex<VecDeque<AuditEvent>>,
    // Maximum number of retained events before eviction.
    max_events: usize,
}
540
541impl MemorySink {
542    /// Create a new memory sink.
543    pub fn new(max_events: usize) -> Self {
544        Self {
545            events: Mutex::new(VecDeque::with_capacity(max_events)),
546            max_events,
547        }
548    }
549
550    /// Get all stored events.
551    pub fn events(&self) -> Vec<AuditEvent> {
552        self.events.lock().iter().cloned().collect()
553    }
554
555    /// Get the count of events.
556    pub fn len(&self) -> usize {
557        self.events.lock().len()
558    }
559
560    /// Check if empty.
561    pub fn is_empty(&self) -> bool {
562        self.events.lock().is_empty()
563    }
564
565    /// Clear all events.
566    pub fn clear(&self) {
567        self.events.lock().clear();
568    }
569}
570
571impl AuditSink for MemorySink {
572    fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
573        let mut events = self.events.lock();
574        if events.len() >= self.max_events {
575            events.pop_front();
576        }
577        events.push_back(event.clone());
578        Ok(())
579    }
580
581    fn flush(&self) -> std::io::Result<()> {
582        Ok(())
583    }
584
585    fn close(&self) -> std::io::Result<()> {
586        Ok(())
587    }
588}
589
590// ============================================================================
591// SYSLOG SINK (RFC 5424)
592// ============================================================================
593
/// Syslog facility codes (RFC 5424).
///
/// Discriminants are the numeric facility codes from RFC 5424 section
/// 6.2.1; the PRI value is `facility * 8 + severity`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SyslogFacility {
    /// Kernel messages.
    Kern = 0,
    /// User-level messages.
    User = 1,
    /// Security/authorization messages.
    Auth = 4,
    /// Security/authorization messages (private).
    AuthPriv = 10,
    /// Local use 0.
    Local0 = 16,
    /// Local use 1.
    Local1 = 17,
    /// Local use 2.
    Local2 = 18,
    /// Local use 3.
    Local3 = 19,
    /// Local use 4.
    Local4 = 20,
    /// Local use 5.
    Local5 = 21,
    /// Local use 6.
    Local6 = 22,
    /// Local use 7.
    Local7 = 23,
}
623
/// Syslog severity codes (RFC 5424).
///
/// Discriminants are the numeric severity codes from RFC 5424 section
/// 6.2.1 (0 = most severe).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SyslogSeverity {
    /// System is unusable.
    Emergency = 0,
    /// Action must be taken immediately.
    Alert = 1,
    /// Critical conditions.
    Critical = 2,
    /// Error conditions.
    Error = 3,
    /// Warning conditions.
    Warning = 4,
    /// Normal but significant condition.
    Notice = 5,
    /// Informational messages.
    Informational = 6,
    /// Debug-level messages.
    Debug = 7,
}
645
646impl From<AuditLevel> for SyslogSeverity {
647    fn from(level: AuditLevel) -> Self {
648        match level {
649            AuditLevel::Info => SyslogSeverity::Informational,
650            AuditLevel::Warning => SyslogSeverity::Warning,
651            AuditLevel::Security => SyslogSeverity::Notice,
652            AuditLevel::Critical => SyslogSeverity::Error,
653            AuditLevel::Compliance => SyslogSeverity::Notice,
654        }
655    }
656}
657
/// Configuration for syslog sink.
#[derive(Debug, Clone)]
pub struct SyslogConfig {
    /// Syslog server address (e.g., "127.0.0.1:514"), reached over UDP.
    pub server_addr: String,
    /// Facility code (contributes `facility * 8` to the PRI value).
    pub facility: SyslogFacility,
    /// Application name (APP-NAME in RFC 5424).
    pub app_name: String,
    /// Process ID (PROCID in RFC 5424); `None` renders as the NILVALUE "-".
    pub procid: Option<String>,
    /// Message ID (MSGID in RFC 5424); `None` renders as the NILVALUE "-".
    pub msgid: Option<String>,
    /// Use RFC 5424 format (true) or BSD format (false).
    pub rfc5424: bool,
}
674
675impl Default for SyslogConfig {
676    fn default() -> Self {
677        Self {
678            server_addr: "127.0.0.1:514".to_string(),
679            facility: SyslogFacility::Local0,
680            app_name: "ringkernel".to_string(),
681            procid: None,
682            msgid: None,
683            rfc5424: true,
684        }
685    }
686}
687
/// RFC 5424 syslog sink for remote audit log forwarding.
///
/// Sends each event as one UDP datagram, formatted either per RFC 5424 or
/// the legacy BSD layout depending on `SyslogConfig::rfc5424`.
pub struct SyslogSink {
    config: SyslogConfig,
    // Connected UDP socket; `None` after `close()`.
    socket: Mutex<Option<UdpSocket>>,
    // Hostname reported in the syslog HOSTNAME field.
    hostname: String,
}
694
695impl SyslogSink {
696    /// Create a new syslog sink with the given configuration.
697    pub fn new(config: SyslogConfig) -> std::io::Result<Self> {
698        let socket = UdpSocket::bind("0.0.0.0:0")?;
699        socket.connect(&config.server_addr)?;
700
701        // Get hostname
702        let hostname = std::env::var("HOSTNAME")
703            .or_else(|_| std::env::var("HOST"))
704            .unwrap_or_else(|_| "localhost".to_string());
705
706        Ok(Self {
707            config,
708            socket: Mutex::new(Some(socket)),
709            hostname,
710        })
711    }
712
713    /// Create a syslog sink with default configuration.
714    pub fn with_server(server_addr: impl Into<String>) -> std::io::Result<Self> {
715        Self::new(SyslogConfig {
716            server_addr: server_addr.into(),
717            ..Default::default()
718        })
719    }
720
721    /// Format an audit event as RFC 5424 syslog message.
722    fn format_rfc5424(&self, event: &AuditEvent) -> String {
723        let severity: SyslogSeverity = event.level.into();
724        let priority = (self.config.facility as u8) * 8 + (severity as u8);
725
726        // RFC 5424 timestamp format
727        let timestamp = event
728            .timestamp
729            .duration_since(UNIX_EPOCH)
730            .unwrap_or_default();
731        let secs = timestamp.as_secs();
732        let millis = timestamp.subsec_millis();
733
734        // Simple ISO 8601 format (we don't have chrono, so approximate)
735        let epoch_days = secs / 86400;
736        let day_secs = secs % 86400;
737        let hours = day_secs / 3600;
738        let minutes = (day_secs % 3600) / 60;
739        let seconds = day_secs % 60;
740
741        // Approximate date calculation (not accounting for leap years perfectly)
742        let year = 1970 + (epoch_days / 365);
743        let day_of_year = epoch_days % 365;
744        let month = (day_of_year / 30).min(11) + 1;
745        let day = (day_of_year % 30) + 1;
746
747        let timestamp_str = format!(
748            "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
749            year, month, day, hours, minutes, seconds, millis
750        );
751
752        let procid = self.config.procid.as_deref().unwrap_or("-");
753        let msgid = self.config.msgid.as_deref().unwrap_or("-");
754
755        // Structured data (SD-ELEMENT)
756        let sd = format!(
757            "[ringkernel@12345 level=\"{}\" event_type=\"{}\" actor=\"{}\" checksum=\"{}\"]",
758            event.level.as_str(),
759            event.event_type.as_str(),
760            event.actor,
761            event.checksum
762        );
763
764        format!(
765            "<{}>{} {} {} {} {} {} {} {}",
766            priority,
767            1, // version
768            timestamp_str,
769            self.hostname,
770            self.config.app_name,
771            procid,
772            msgid,
773            sd,
774            event.description
775        )
776    }
777
778    /// Format an audit event as BSD syslog message.
779    fn format_bsd(&self, event: &AuditEvent) -> String {
780        let severity: SyslogSeverity = event.level.into();
781        let priority = (self.config.facility as u8) * 8 + (severity as u8);
782
783        let timestamp = event
784            .timestamp
785            .duration_since(UNIX_EPOCH)
786            .unwrap_or_default();
787        let secs = timestamp.as_secs();
788
789        // BSD syslog timestamp format (Mmm dd hh:mm:ss)
790        let months = [
791            "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
792        ];
793        let epoch_days = secs / 86400;
794        let day_secs = secs % 86400;
795        let hours = day_secs / 3600;
796        let minutes = (day_secs % 3600) / 60;
797        let seconds = day_secs % 60;
798
799        let day_of_year = epoch_days % 365;
800        let month_idx = ((day_of_year / 30) as usize).min(11);
801        let day = (day_of_year % 30) + 1;
802
803        let timestamp_str = format!(
804            "{} {:2} {:02}:{:02}:{:02}",
805            months[month_idx], day, hours, minutes, seconds
806        );
807
808        format!(
809            "<{}>{} {} {}: [{}] {}",
810            priority,
811            timestamp_str,
812            self.hostname,
813            self.config.app_name,
814            event.event_type.as_str(),
815            event.description
816        )
817    }
818}
819
820impl AuditSink for SyslogSink {
821    fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
822        let message = if self.config.rfc5424 {
823            self.format_rfc5424(event)
824        } else {
825            self.format_bsd(event)
826        };
827
828        let socket = self.socket.lock();
829        if let Some(ref sock) = *socket {
830            sock.send(message.as_bytes())?;
831        }
832        Ok(())
833    }
834
835    fn flush(&self) -> std::io::Result<()> {
836        Ok(())
837    }
838
839    fn close(&self) -> std::io::Result<()> {
840        let mut socket = self.socket.lock();
841        *socket = None;
842        Ok(())
843    }
844}
845
846// ============================================================================
847// ELASTICSEARCH SINK (requires alerting feature for reqwest)
848// ============================================================================
849
/// Configuration for Elasticsearch audit sink.
#[cfg(feature = "alerting")]
#[derive(Debug, Clone)]
pub struct ElasticsearchConfig {
    /// Elasticsearch URL (e.g., "http://localhost:9200").
    pub url: String,
    /// Index name or pattern (e.g., "ringkernel-audit-{date}"); supports
    /// `{date}`, `{year}`, `{month}` and `{day}` placeholders.
    pub index_pattern: String,
    /// Optional authentication (Basic auth) as (username, password).
    pub auth: Option<(String, String)>,
    /// Number of buffered documents before an automatic bulk flush.
    pub batch_size: usize,
    /// Per-request HTTP timeout.
    pub timeout: Duration,
}
865
#[cfg(feature = "alerting")]
impl Default for ElasticsearchConfig {
    /// Local Elasticsearch on port 9200, no auth, 100-event batches,
    /// 30-second request timeout.
    fn default() -> Self {
        ElasticsearchConfig {
            url: "http://localhost:9200".to_string(),
            index_pattern: "ringkernel-audit".to_string(),
            auth: None,
            batch_size: 100,
            timeout: Duration::from_secs(30),
        }
    }
}
878
/// Elasticsearch sink for direct indexing of audit events.
///
/// Events are serialized to JSON at write time and buffered; the buffer
/// is shipped via the bulk API once it reaches `batch_size` (or on an
/// explicit flush).
#[cfg(feature = "alerting")]
pub struct ElasticsearchSink {
    config: ElasticsearchConfig,
    // Blocking HTTP client configured with the request timeout.
    client: reqwest::blocking::Client,
    // Pre-serialized documents awaiting a bulk flush.
    buffer: Mutex<Vec<String>>,
}
886
#[cfg(feature = "alerting")]
impl ElasticsearchSink {
    /// Create a new Elasticsearch sink with a blocking HTTP client.
    pub fn new(config: ElasticsearchConfig) -> Result<Self, reqwest::Error> {
        let client = reqwest::blocking::Client::builder()
            .timeout(config.timeout)
            .build()?;

        Ok(Self {
            config,
            client,
            buffer: Mutex::new(Vec::new()),
        })
    }

    /// Resolve the index name for an event by expanding `{date}`, `{year}`,
    /// `{month}` and `{day}` placeholders in the configured pattern.
    ///
    /// NOTE(review): the date math assumes 365-day years / 30-day months,
    /// and `flush_buffer` currently substitutes a literal "current" for
    /// `{date}` instead of calling this — confirm which is intended.
    fn get_index(&self, event: &AuditEvent) -> String {
        let timestamp = event
            .timestamp
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default();
        let secs = timestamp.as_secs();

        // Calculate approximate date components (no leap-year handling).
        let epoch_days = secs / 86400;
        let year = 1970 + (epoch_days / 365);
        let day_of_year = epoch_days % 365;
        let month = (day_of_year / 30).min(11) + 1;
        let day = (day_of_year % 30) + 1;

        let date_str = format!("{:04}.{:02}.{:02}", year, month, day);

        self.config
            .index_pattern
            .replace("{date}", &date_str)
            .replace("{year}", &format!("{:04}", year))
            .replace("{month}", &format!("{:02}", month))
            .replace("{day}", &format!("{:02}", day))
    }

    /// Convert an audit event to an Elasticsearch document (JSON).
    ///
    /// Optional fragments (`target`, `hlc`) carry their own leading comma;
    /// the comma before `"description"` is literal (the previous format
    /// string omitted it, producing invalid JSON).
    fn to_es_document(&self, event: &AuditEvent) -> String {
        let timestamp_millis = event
            .timestamp
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();

        // Build metadata as nested JSON
        let metadata_json = if event.metadata.is_empty() {
            "{}".to_string()
        } else {
            let pairs: Vec<String> = event
                .metadata
                .iter()
                .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
                .collect();
            format!("{{{}}}", pairs.join(","))
        };

        let hlc_json = event
            .hlc
            .map(|h| {
                format!(
                    r#","hlc":{{"physical":{},"logical":{}}}"#,
                    h.physical, h.logical
                )
            })
            .unwrap_or_default();

        let target_json = event
            .target
            .as_ref()
            .map(|t| format!(r#","target":"{}""#, escape_json(t)))
            .unwrap_or_default();

        format!(
            r#"{{"@timestamp":{},"id":{},"level":"{}","event_type":"{}","actor":"{}"{}{},"description":"{}","metadata":{},"checksum":{}}}"#,
            timestamp_millis,
            event.id,
            event.level.as_str(),
            event.event_type.as_str(),
            escape_json(&event.actor),
            target_json,
            hlc_json,
            escape_json(&event.description),
            metadata_json,
            event.checksum
        )
    }

    /// Flush the buffer to Elasticsearch using the bulk API.
    ///
    /// Buffered documents are taken out before the request is issued, so a
    /// failed request drops them (no retry/re-queue).
    fn flush_buffer(&self) -> std::io::Result<()> {
        let documents: Vec<String> = {
            let mut buffer = self.buffer.lock();
            std::mem::take(&mut *buffer)
        };

        if documents.is_empty() {
            return Ok(());
        }

        // Build the NDJSON bulk body: one action line plus one document
        // line per event.
        let mut bulk_body = String::new();
        for doc in documents {
            // Action line
            bulk_body.push_str(&format!(
                r#"{{"index":{{"_index":"{}"}}}}"#,
                self.config.index_pattern.replace("{date}", "current")
            ));
            bulk_body.push('\n');
            // Document line
            bulk_body.push_str(&doc);
            bulk_body.push('\n');
        }

        let url = format!("{}/_bulk", self.config.url);
        let mut request = self
            .client
            .post(&url)
            .body(bulk_body)
            .header(reqwest::header::CONTENT_TYPE, "application/x-ndjson");

        if let Some((user, pass)) = &self.config.auth {
            request = request.basic_auth(user, Some(pass));
        }

        let response = request.send().map_err(|e| {
            std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("ES request failed: {}", e),
            )
        })?;

        // Surface HTTP-level failures (4xx/5xx) as I/O errors too; the
        // previous code only caught transport errors and silently accepted
        // rejected bulk requests.
        response.error_for_status().map_err(|e| {
            std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("ES bulk request returned error status: {}", e),
            )
        })?;

        Ok(())
    }
}
1024
#[cfg(feature = "alerting")]
impl AuditSink for ElasticsearchSink {
    /// Serialize the event and buffer it; flush the batch once the
    /// configured batch size has been reached.
    fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
        let document = self.to_es_document(event);

        // Decide whether to flush while holding the lock, but perform the
        // (potentially slow) network flush only after releasing it.
        let reached_batch_size;
        {
            let mut pending = self.buffer.lock();
            pending.push(document);
            reached_batch_size = pending.len() >= self.config.batch_size;
        }

        if reached_batch_size {
            self.flush_buffer()
        } else {
            Ok(())
        }
    }

    /// Force any buffered documents out to Elasticsearch.
    fn flush(&self) -> std::io::Result<()> {
        self.flush_buffer()
    }

    /// Closing is a final flush; no connection teardown is required.
    fn close(&self) -> std::io::Result<()> {
        self.flush_buffer()
    }
}
1051
1052// ============================================================================
1053// CLOUDWATCH LOGS SINK
1054// ============================================================================
1055
/// Configuration for CloudWatch Logs sink.
///
/// Defaults (see the `Default` impl): group `/ringkernel/audit`, stream
/// `default`, region `us-east-1`, batch size 100.
#[derive(Debug, Clone)]
pub struct CloudWatchConfig {
    /// Log group name.
    pub log_group: String,
    /// Log stream name.
    pub log_stream: String,
    /// AWS region.
    pub region: String,
    /// Number of buffered events that triggers an automatic flush.
    pub batch_size: usize,
    /// Whether to auto-create the log group and stream if they don't exist.
    /// Only present when the `cloudwatch` feature is enabled.
    #[cfg(feature = "cloudwatch")]
    pub auto_create: bool,
    /// Maximum retries for throttled/unavailable `PutLogEvents` requests.
    /// Only present when the `cloudwatch` feature is enabled.
    #[cfg(feature = "cloudwatch")]
    pub max_retries: u32,
}
1074
1075impl Default for CloudWatchConfig {
1076    fn default() -> Self {
1077        Self {
1078            log_group: "/ringkernel/audit".to_string(),
1079            log_stream: "default".to_string(),
1080            region: "us-east-1".to_string(),
1081            batch_size: 100,
1082            #[cfg(feature = "cloudwatch")]
1083            auto_create: true,
1084            #[cfg(feature = "cloudwatch")]
1085            max_retries: 3,
1086        }
1087    }
1088}
1089
/// CloudWatch Logs sink for AWS-native audit logging.
///
/// When the `cloudwatch` feature is enabled, this sink uses the AWS SDK to
/// upload audit events to CloudWatch Logs via the `PutLogEvents` API with
/// automatic batching and retry on throttling.
///
/// When the `cloudwatch` feature is **not** enabled, this acts as a stub that
/// buffers events but drops them with a warning on flush.
pub struct CloudWatchSink {
    /// Sink configuration (group/stream names, region, batching).
    config: CloudWatchConfig,
    /// Pending events awaiting upload.
    buffer: Mutex<Vec<(u64, String)>>, // (timestamp_ms, message)
    /// Sequence token returned by the last `PutLogEvents` call; unused in the
    /// stub build, hence the `dead_code` allowance there.
    #[cfg_attr(not(feature = "cloudwatch"), allow(dead_code))]
    sequence_token: Mutex<Option<String>>,
    /// AWS SDK client (real build only).
    #[cfg(feature = "cloudwatch")]
    client: aws_sdk_cloudwatchlogs::Client,
    /// Whether the log group/stream existence check has already run.
    #[cfg(feature = "cloudwatch")]
    initialized: Mutex<bool>,
}
1108
impl CloudWatchSink {
    /// Create a new CloudWatch Logs sink.
    ///
    /// When the `cloudwatch` feature is enabled, this initializes the AWS SDK
    /// client using the default credential chain (environment variables, AWS
    /// config files, IAM roles, etc.). The client is created synchronously by
    /// blocking on the tokio runtime, following the same pattern used by
    /// [`S3Storage`](crate::cloud_storage::S3Storage).
    ///
    /// NOTE(review): `Handle::current()` panics outside a tokio runtime and
    /// `block_in_place` panics on a current-thread runtime — confirm callers
    /// always construct this sink from a multi-threaded tokio context.
    #[cfg(feature = "cloudwatch")]
    pub fn new(config: CloudWatchConfig) -> Self {
        let client = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                let region = aws_sdk_cloudwatchlogs::config::Region::new(config.region.clone());
                let sdk_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
                    .region(region)
                    .load()
                    .await;
                aws_sdk_cloudwatchlogs::Client::new(&sdk_config)
            })
        });

        Self {
            config,
            buffer: Mutex::new(Vec::new()),
            sequence_token: Mutex::new(None),
            client,
            initialized: Mutex::new(false),
        }
    }

    /// Create a new CloudWatch Logs sink (stub, without `cloudwatch` feature).
    ///
    /// The stub buffers events but drops them with a warning on flush.
    #[cfg(not(feature = "cloudwatch"))]
    pub fn new(config: CloudWatchConfig) -> Self {
        Self {
            config,
            buffer: Mutex::new(Vec::new()),
            sequence_token: Mutex::new(None),
        }
    }

    /// Create a CloudWatch Logs sink with explicit AWS credentials.
    ///
    /// This is useful for environments where the default credential chain
    /// is not configured (e.g., local development, testing).
    #[cfg(feature = "cloudwatch")]
    pub fn with_credentials(
        config: CloudWatchConfig,
        access_key: impl Into<String>,
        secret_key: impl Into<String>,
    ) -> Self {
        let access_key = access_key.into();
        let secret_key = secret_key.into();
        let client = tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                let region = aws_sdk_cloudwatchlogs::config::Region::new(config.region.clone());
                let creds = aws_sdk_cloudwatchlogs::config::Credentials::new(
                    access_key,
                    secret_key,
                    None, // session token
                    None, // expiry
                    "ringkernel",
                );
                let sdk_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
                    .region(region)
                    .credentials_provider(creds)
                    .load()
                    .await;
                aws_sdk_cloudwatchlogs::Client::new(&sdk_config)
            })
        });

        Self {
            config,
            buffer: Mutex::new(Vec::new()),
            sequence_token: Mutex::new(None),
            client,
            initialized: Mutex::new(false),
        }
    }

    /// Get the configuration.
    pub fn config(&self) -> &CloudWatchConfig {
        &self.config
    }

    /// Get the current buffer size (number of pending, unflushed events).
    pub fn buffer_size(&self) -> usize {
        self.buffer.lock().len()
    }

    /// Ensure the log group and log stream exist, creating them if
    /// `auto_create` is enabled.
    ///
    /// Runs at most once per sink: the `initialized` flag short-circuits
    /// subsequent calls. NOTE(review): the flag is checked and set under
    /// separate lock acquisitions, so two concurrent first flushes may both
    /// attempt creation — harmless here because "already exists" errors are
    /// tolerated, but confirm that is acceptable.
    #[cfg(feature = "cloudwatch")]
    fn ensure_log_group_and_stream(&self) -> std::io::Result<()> {
        {
            let initialized = self.initialized.lock();
            if *initialized {
                return Ok(());
            }
        }

        if !self.config.auto_create {
            // Creation disabled: mark initialized and trust the resources exist.
            let mut initialized = self.initialized.lock();
            *initialized = true;
            return Ok(());
        }

        tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                // Create log group (ignore if it already exists)
                let create_group_result = self
                    .client
                    .create_log_group()
                    .log_group_name(&self.config.log_group)
                    .send()
                    .await;

                if let Err(e) = &create_group_result {
                    let is_already_exists = e
                        .as_service_error()
                        .map(|se| se.is_resource_already_exists_exception())
                        .unwrap_or(false);
                    if !is_already_exists {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            format!("Failed to create CloudWatch log group: {}", e),
                        ));
                    }
                }

                // Create log stream (ignore if it already exists)
                let create_stream_result = self
                    .client
                    .create_log_stream()
                    .log_group_name(&self.config.log_group)
                    .log_stream_name(&self.config.log_stream)
                    .send()
                    .await;

                if let Err(e) = &create_stream_result {
                    let is_already_exists = e
                        .as_service_error()
                        .map(|se| se.is_resource_already_exists_exception())
                        .unwrap_or(false);
                    if !is_already_exists {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::Other,
                            format!("Failed to create CloudWatch log stream: {}", e),
                        ));
                    }
                }

                // Only mark initialized after both resources are confirmed.
                let mut initialized = self.initialized.lock();
                *initialized = true;
                Ok(())
            })
        })
    }

    /// Flush buffered events to CloudWatch Logs using `PutLogEvents`.
    ///
    /// Events are sorted by timestamp (required by the API) and sent in a
    /// single batch. Handles sequence token management and retries on
    /// throttling (`ThrottlingException`) with exponential backoff.
    ///
    /// NOTE(review): events drained from the buffer are dropped if all
    /// retries are exhausted — confirm whether re-queueing is desired.
    #[cfg(feature = "cloudwatch")]
    fn flush_to_cloudwatch(&self) -> std::io::Result<()> {
        // Drain the buffer under the lock, then release it before any I/O.
        let events: Vec<(u64, String)> = {
            let mut buffer = self.buffer.lock();
            std::mem::take(&mut *buffer)
        };

        if events.is_empty() {
            return Ok(());
        }

        self.ensure_log_group_and_stream()?;

        // Build CloudWatch InputLogEvent entries, sorted by timestamp
        // (CloudWatch requires chronological order within a batch).
        let mut log_events: Vec<aws_sdk_cloudwatchlogs::types::InputLogEvent> = events
            .into_iter()
            .map(|(ts, msg)| {
                aws_sdk_cloudwatchlogs::types::InputLogEvent::builder()
                    .timestamp(ts as i64)
                    .message(msg)
                    .build()
                    .expect("InputLogEvent builder should not fail with timestamp and message set")
            })
            .collect();

        log_events.sort_by_key(|e| e.timestamp());

        tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                let mut retries = 0u32;

                // Retry loop: each iteration rebuilds the request (the builder
                // is consumed by `send`) and re-attaches the current token.
                loop {
                    let mut request = self
                        .client
                        .put_log_events()
                        .log_group_name(&self.config.log_group)
                        .log_stream_name(&self.config.log_stream);

                    // Attach sequence token if we have one
                    {
                        let token = self.sequence_token.lock();
                        if let Some(ref tok) = *token {
                            request = request.sequence_token(tok);
                        }
                    }

                    for event in &log_events {
                        request = request.log_events(event.clone());
                    }

                    match request.send().await {
                        Ok(output) => {
                            // Store the next sequence token for subsequent calls
                            let mut token = self.sequence_token.lock();
                            *token = output.next_sequence_token().map(|s| s.to_string());

                            tracing::debug!(
                                event_count = log_events.len(),
                                log_group = %self.config.log_group,
                                log_stream = %self.config.log_stream,
                                "Successfully uploaded {} audit events to CloudWatch Logs",
                                log_events.len(),
                            );
                            return Ok(());
                        }
                        Err(e) => {
                            if let Some(service_err) = e.as_service_error() {
                                // Handle InvalidSequenceTokenException: extract
                                // the expected token from the structured error and
                                // retry. Note: modern CloudWatch APIs no longer
                                // return this, but we handle it for completeness.
                                if service_err.is_invalid_sequence_token_exception() {
                                    if let aws_sdk_cloudwatchlogs::operation::put_log_events::PutLogEventsError::InvalidSequenceTokenException(ref inner) = service_err {
                                        let mut token = self.sequence_token.lock();
                                        *token = inner.expected_sequence_token().map(|s| s.to_string());
                                    }

                                    if retries < self.config.max_retries {
                                        retries += 1;
                                        continue;
                                    }
                                }

                                // Handle DataAlreadyAcceptedException: the batch
                                // was already ingested. Update the sequence token
                                // and treat as success.
                                if service_err.is_data_already_accepted_exception() {
                                    if let aws_sdk_cloudwatchlogs::operation::put_log_events::PutLogEventsError::DataAlreadyAcceptedException(ref inner) = service_err {
                                        let mut token = self.sequence_token.lock();
                                        *token = inner.expected_sequence_token().map(|s| s.to_string());
                                    }
                                    tracing::debug!(
                                        "CloudWatch PutLogEvents: data already accepted, skipping"
                                    );
                                    return Ok(());
                                }

                                // Retry on ServiceUnavailableException with
                                // exponential backoff.
                                if service_err.is_service_unavailable_exception()
                                    && retries < self.config.max_retries
                                {
                                    retries += 1;
                                    // Backoff: 200ms, 400ms, 800ms, ...
                                    let backoff = std::time::Duration::from_millis(
                                        100 * 2u64.pow(retries),
                                    );
                                    tracing::warn!(
                                        retry = retries,
                                        backoff_ms = backoff.as_millis() as u64,
                                        "CloudWatch PutLogEvents service unavailable, retrying"
                                    );
                                    tokio::time::sleep(backoff).await;
                                    continue;
                                }
                            }

                            // Check for throttling at the SDK/HTTP level
                            // (error code "Throttling" or "ThrottlingException").
                            {
                                use aws_sdk_cloudwatchlogs::error::ProvideErrorMetadata;
                                let is_throttled = e
                                    .as_service_error()
                                    .and_then(|se| se.code())
                                    .map(|code| {
                                        code == "Throttling"
                                            || code == "ThrottlingException"
                                            || code == "TooManyRequestsException"
                                    })
                                    .unwrap_or(false);

                                if is_throttled && retries < self.config.max_retries {
                                    retries += 1;
                                    let backoff = std::time::Duration::from_millis(
                                        100 * 2u64.pow(retries),
                                    );
                                    tracing::warn!(
                                        retry = retries,
                                        backoff_ms = backoff.as_millis() as u64,
                                        "CloudWatch PutLogEvents throttled, retrying"
                                    );
                                    tokio::time::sleep(backoff).await;
                                    continue;
                                }
                            }

                            // Unrecoverable (or retries exhausted): surface as io::Error.
                            return Err(std::io::Error::new(
                                std::io::ErrorKind::Other,
                                format!("CloudWatch PutLogEvents failed: {}", e),
                            ));
                        }
                    }
                }
            })
        })
    }

    /// Stub flush: drops events with a warning.
    #[cfg(not(feature = "cloudwatch"))]
    fn flush_stub(&self) -> std::io::Result<()> {
        let events: Vec<(u64, String)> = {
            let mut buffer = self.buffer.lock();
            std::mem::take(&mut *buffer)
        };

        if events.is_empty() {
            return Ok(());
        }

        // Stub: CloudWatch Logs integration requires aws-sdk-cloudwatchlogs.
        // Events are dropped with a warning. To implement, add the `cloudwatch`
        // feature flag and the aws-sdk-cloudwatchlogs dependency.
        tracing::warn!(
            event_count = events.len(),
            log_group = %self.config.log_group,
            log_stream = %self.config.log_stream,
            "CloudWatch sink is a stub: {} audit events dropped. \
             Enable the `cloudwatch` feature for real AWS integration.",
            events.len(),
        );

        Ok(())
    }
}
1457
1458impl AuditSink for CloudWatchSink {
1459    fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
1460        let timestamp_ms = event
1461            .timestamp
1462            .duration_since(UNIX_EPOCH)
1463            .unwrap_or_default()
1464            .as_millis() as u64;
1465
1466        let message = event.to_json();
1467
1468        let should_flush = {
1469            let mut buffer = self.buffer.lock();
1470            buffer.push((timestamp_ms, message));
1471            buffer.len() >= self.config.batch_size
1472        };
1473
1474        if should_flush {
1475            self.flush()?;
1476        }
1477
1478        Ok(())
1479    }
1480
1481    fn flush(&self) -> std::io::Result<()> {
1482        #[cfg(feature = "cloudwatch")]
1483        {
1484            self.flush_to_cloudwatch()
1485        }
1486        #[cfg(not(feature = "cloudwatch"))]
1487        {
1488            self.flush_stub()
1489        }
1490    }
1491
1492    fn close(&self) -> std::io::Result<()> {
1493        self.flush()
1494    }
1495}
1496
1497// ============================================================================
1498// AUDIT LOGGER
1499// ============================================================================
1500
/// Configuration for the audit logger.
#[derive(Debug, Clone)]
pub struct AuditConfig {
    /// Minimum level to log; `AuditLogger::log` drops events below this.
    pub min_level: AuditLevel,
    /// Whether to include tamper-evident chain checksums on logged events.
    pub enable_checksums: bool,
    /// Buffer size before flushing.
    /// NOTE(review): not consulted by the `AuditLogger` code in this file —
    /// confirm what drives batch flushing.
    pub buffer_size: usize,
    /// Flush interval.
    /// NOTE(review): no timer in this file reads this — confirm the intended
    /// consumer.
    pub flush_interval: Duration,
    /// Retention period for stored audit records.
    pub retention: Duration,
}
1515
1516impl Default for AuditConfig {
1517    fn default() -> Self {
1518        Self {
1519            min_level: AuditLevel::Info,
1520            enable_checksums: true,
1521            buffer_size: 100,
1522            flush_interval: Duration::from_secs(5),
1523            retention: Duration::from_secs(90 * 24 * 60 * 60), // 90 days
1524        }
1525    }
1526}
1527
/// Builder for AuditLogger.
///
/// Accumulates configuration and output sinks, then assembles the logger
/// via `build`.
pub struct AuditLoggerBuilder {
    // Logger configuration (levels, checksums, retention).
    config: AuditConfig,
    // Sinks the built logger will write every accepted event to.
    sinks: Vec<Arc<dyn AuditSink>>,
}
1533
1534impl AuditLoggerBuilder {
1535    /// Create a new builder.
1536    pub fn new() -> Self {
1537        Self {
1538            config: AuditConfig::default(),
1539            sinks: Vec::new(),
1540        }
1541    }
1542
1543    /// Set the minimum log level.
1544    pub fn with_min_level(mut self, level: AuditLevel) -> Self {
1545        self.config.min_level = level;
1546        self
1547    }
1548
1549    /// Add a file sink.
1550    pub fn with_file_sink(mut self, path: impl Into<PathBuf>) -> std::io::Result<Self> {
1551        let sink = Arc::new(FileSink::new(path)?);
1552        self.sinks.push(sink);
1553        Ok(self)
1554    }
1555
1556    /// Add a memory sink.
1557    pub fn with_memory_sink(mut self, max_events: usize) -> Self {
1558        let sink = Arc::new(MemorySink::new(max_events));
1559        self.sinks.push(sink);
1560        self
1561    }
1562
1563    /// Add a custom sink.
1564    pub fn with_sink(mut self, sink: Arc<dyn AuditSink>) -> Self {
1565        self.sinks.push(sink);
1566        self
1567    }
1568
1569    /// Add a syslog sink.
1570    pub fn with_syslog_sink(mut self, config: SyslogConfig) -> std::io::Result<Self> {
1571        let sink = Arc::new(SyslogSink::new(config)?);
1572        self.sinks.push(sink);
1573        Ok(self)
1574    }
1575
1576    /// Add a syslog sink with just a server address.
1577    pub fn with_syslog(mut self, server_addr: impl Into<String>) -> std::io::Result<Self> {
1578        let sink = Arc::new(SyslogSink::with_server(server_addr)?);
1579        self.sinks.push(sink);
1580        Ok(self)
1581    }
1582
1583    /// Add a CloudWatch Logs sink.
1584    pub fn with_cloudwatch_sink(mut self, config: CloudWatchConfig) -> Self {
1585        let sink = Arc::new(CloudWatchSink::new(config));
1586        self.sinks.push(sink);
1587        self
1588    }
1589
1590    /// Add an Elasticsearch sink (requires `alerting` feature).
1591    #[cfg(feature = "alerting")]
1592    pub fn with_elasticsearch_sink(
1593        mut self,
1594        config: ElasticsearchConfig,
1595    ) -> Result<Self, reqwest::Error> {
1596        let sink = Arc::new(ElasticsearchSink::new(config)?);
1597        self.sinks.push(sink);
1598        Ok(self)
1599    }
1600
1601    /// Set the retention period.
1602    pub fn with_retention(mut self, retention: Duration) -> Self {
1603        self.config.retention = retention;
1604        self
1605    }
1606
1607    /// Enable or disable checksums.
1608    pub fn with_checksums(mut self, enable: bool) -> Self {
1609        self.config.enable_checksums = enable;
1610        self
1611    }
1612
1613    /// Build the logger.
1614    pub fn build(self) -> AuditLogger {
1615        AuditLogger {
1616            config: self.config,
1617            sinks: self.sinks,
1618            last_checksum: AtomicU64::new(0),
1619            event_count: AtomicU64::new(0),
1620            buffer: RwLock::new(Vec::new()),
1621        }
1622    }
1623}
1624
1625impl Default for AuditLoggerBuilder {
1626    fn default() -> Self {
1627        Self::new()
1628    }
1629}
1630
/// The main audit logger.
///
/// Filters events by level, optionally chains them with tamper-evident
/// checksums, and fans them out to all configured sinks.
pub struct AuditLogger {
    // Level filter, checksum toggle, and retention settings.
    config: AuditConfig,
    // Every accepted event is written to each of these sinks.
    sinks: Vec<Arc<dyn AuditSink>>,
    // Checksum of the most recently chained event (0 before any event).
    last_checksum: AtomicU64,
    // Total number of events written to sinks.
    event_count: AtomicU64,
    // Events staged via `buffer_event`, drained by `flush_buffered`.
    buffer: RwLock<Vec<AuditEvent>>,
}
1639
1640impl AuditLogger {
1641    /// Create a new logger builder.
1642    pub fn builder() -> AuditLoggerBuilder {
1643        AuditLoggerBuilder::new()
1644    }
1645
1646    /// Create a simple in-memory logger for testing.
1647    pub fn in_memory(max_events: usize) -> Self {
1648        AuditLoggerBuilder::new()
1649            .with_memory_sink(max_events)
1650            .build()
1651    }
1652
1653    /// Log an audit event.
1654    pub fn log(&self, mut event: AuditEvent) {
1655        // Check level
1656        if event.level < self.config.min_level {
1657            return;
1658        }
1659
1660        // Add chain checksum if enabled
1661        if self.config.enable_checksums {
1662            let prev = self.last_checksum.load(Ordering::Acquire);
1663            event = event.with_prev_checksum(prev);
1664            self.last_checksum.store(event.checksum, Ordering::Release);
1665        }
1666
1667        // Write to all sinks
1668        for sink in &self.sinks {
1669            if let Err(e) = sink.write(&event) {
1670                tracing::error!("Audit sink error: {}", e);
1671            }
1672        }
1673
1674        self.event_count.fetch_add(1, Ordering::Relaxed);
1675    }
1676
1677    /// Log a kernel launch event.
1678    pub fn log_kernel_launched(&self, kernel_id: &str, backend: &str) {
1679        self.log(AuditEvent::kernel_launched(kernel_id, backend));
1680    }
1681
1682    /// Log a kernel termination event.
1683    pub fn log_kernel_terminated(&self, kernel_id: &str, reason: &str) {
1684        self.log(AuditEvent::kernel_terminated(kernel_id, reason));
1685    }
1686
1687    /// Log a security violation.
1688    pub fn log_security_violation(&self, actor: &str, violation: &str) {
1689        self.log(AuditEvent::security_violation(actor, violation));
1690    }
1691
1692    /// Log a configuration change.
1693    pub fn log_config_change(&self, actor: &str, key: &str, old_value: &str, new_value: &str) {
1694        self.log(AuditEvent::config_change(actor, key, old_value, new_value));
1695    }
1696
1697    /// Get the total event count.
1698    pub fn event_count(&self) -> u64 {
1699        self.event_count.load(Ordering::Relaxed)
1700    }
1701
1702    /// Buffer an event for batch processing.
1703    ///
1704    /// Events buffered with this method can be flushed with `flush_buffered`.
1705    pub fn buffer_event(&self, event: AuditEvent) {
1706        let mut buffer = self.buffer.write();
1707        buffer.push(event);
1708    }
1709
1710    /// Flush all buffered events to sinks.
1711    pub fn flush_buffered(&self) -> std::io::Result<()> {
1712        let events: Vec<AuditEvent> = {
1713            let mut buffer = self.buffer.write();
1714            std::mem::take(&mut *buffer)
1715        };
1716
1717        for mut event in events {
1718            // Add chain checksum if enabled
1719            if self.config.enable_checksums {
1720                let prev = self.last_checksum.load(Ordering::Acquire);
1721                event = event.with_prev_checksum(prev);
1722                self.last_checksum.store(event.checksum, Ordering::Release);
1723            }
1724
1725            // Write to all sinks
1726            for sink in &self.sinks {
1727                sink.write(&event)?;
1728            }
1729
1730            self.event_count.fetch_add(1, Ordering::Relaxed);
1731        }
1732
1733        self.flush()
1734    }
1735
1736    /// Get the count of buffered events.
1737    pub fn buffered_count(&self) -> usize {
1738        self.buffer.read().len()
1739    }
1740
1741    /// Flush all sinks.
1742    pub fn flush(&self) -> std::io::Result<()> {
1743        for sink in &self.sinks {
1744            sink.flush()?;
1745        }
1746        Ok(())
1747    }
1748
1749    /// Close all sinks.
1750    pub fn close(&self) -> std::io::Result<()> {
1751        for sink in &self.sinks {
1752            sink.close()?;
1753        }
1754        Ok(())
1755    }
1756}
1757
1758// ============================================================================
1759// TESTS
1760// ============================================================================
1761
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly-built event carries the fields it was constructed with
    /// and a non-zero integrity checksum.
    #[test]
    fn test_audit_event_creation() {
        let event = AuditEvent::new(
            AuditLevel::Info,
            AuditEventType::KernelLaunched,
            "runtime",
            "Kernel launched",
        );

        assert_eq!(event.actor, "runtime");
        assert_eq!(event.event_type, AuditEventType::KernelLaunched);
        assert_eq!(event.level, AuditLevel::Info);
        assert_ne!(event.checksum, 0);
    }

    /// Tampering with an event field must break checksum verification.
    #[test]
    fn test_audit_event_checksum() {
        let original = AuditEvent::kernel_launched("test_kernel", "cuda");
        assert!(original.verify_checksum());

        let mut tampered = original.clone();
        tampered.description = "Modified".to_string();
        assert!(!tampered.verify_checksum());
    }

    /// A chained event records the checksum of its predecessor.
    #[test]
    fn test_audit_event_chain() {
        let first = AuditEvent::kernel_launched("k1", "cuda");
        let second =
            AuditEvent::kernel_launched("k2", "cuda").with_prev_checksum(first.checksum);

        assert_eq!(second.prev_checksum, Some(first.checksum));
    }

    /// JSON output contains the event type, actor details, and metadata keys.
    #[test]
    fn test_audit_event_json() {
        let event = AuditEvent::kernel_launched("test", "cuda")
            .with_metadata("gpu_id", "0")
            .with_metadata("memory_mb", "8192");

        let json = event.to_json();
        for needle in ["kernel_launched", "test", "cuda", "gpu_id"] {
            assert!(json.contains(needle));
        }
    }

    /// An event written to a memory sink can be read back.
    #[test]
    fn test_memory_sink() {
        let sink = MemorySink::new(10);

        sink.write(&AuditEvent::kernel_launched("test", "cuda"))
            .unwrap();

        assert!(!sink.is_empty());
        assert_eq!(sink.len(), 1);
        assert_eq!(sink.events()[0].event_type, AuditEventType::KernelLaunched);
    }

    /// A bounded memory sink evicts the oldest events once full.
    #[test]
    fn test_memory_sink_rotation() {
        let sink = MemorySink::new(3);

        for i in 0..5 {
            sink.write(&AuditEvent::new(
                AuditLevel::Info,
                AuditEventType::Custom(format!("event_{}", i)),
                "test",
                format!("Event {}", i),
            ))
            .unwrap();
        }

        // Capacity is 3, so events 0 and 1 were rotated out.
        assert_eq!(sink.len(), 3);
        assert_eq!(
            sink.events()[0].event_type,
            AuditEventType::Custom("event_2".to_string())
        );
    }

    /// The in-memory logger counts every event logged through its helpers.
    #[test]
    fn test_audit_logger() {
        let logger = AuditLogger::in_memory(100);

        logger.log_kernel_launched("k1", "cuda");
        logger.log_kernel_terminated("k1", "shutdown");
        logger.log_security_violation("user", "unauthorized access");

        assert_eq!(logger.event_count(), 3);
    }

    /// Severity levels are totally ordered from Info up to Compliance.
    #[test]
    fn test_audit_level_ordering() {
        let ascending = [
            AuditLevel::Info,
            AuditLevel::Warning,
            AuditLevel::Security,
            AuditLevel::Critical,
            AuditLevel::Compliance,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    /// Convenience constructors set the expected level, type, and metadata.
    #[test]
    fn test_audit_event_helpers() {
        let change = AuditEvent::config_change("admin", "max_kernels", "10", "20");
        assert_eq!(change.level, AuditLevel::Compliance);
        assert_eq!(change.metadata.len(), 2);

        let health = AuditEvent::health_check("kernel_1", "healthy");
        assert_eq!(health.event_type, AuditEventType::HealthCheck);
    }

    /// Audit levels map onto their corresponding syslog severities.
    #[test]
    fn test_syslog_severity_conversion() {
        let mapping = [
            (AuditLevel::Info, SyslogSeverity::Informational),
            (AuditLevel::Warning, SyslogSeverity::Warning),
            (AuditLevel::Security, SyslogSeverity::Notice),
            (AuditLevel::Critical, SyslogSeverity::Error),
        ];
        for (level, severity) in mapping {
            assert_eq!(SyslogSeverity::from(level), severity);
        }
    }

    /// Default syslog settings target a local RFC 5424 collector.
    #[test]
    fn test_syslog_config_default() {
        let config = SyslogConfig::default();
        assert!(config.rfc5424);
        assert_eq!(config.app_name, "ringkernel");
        assert_eq!(config.facility, SyslogFacility::Local0);
        assert_eq!(config.server_addr, "127.0.0.1:514");
    }

    /// Default CloudWatch settings point at the standard audit log group.
    #[test]
    fn test_cloudwatch_config_default() {
        let config = CloudWatchConfig::default();
        assert_eq!(config.log_group, "/ringkernel/audit");
        assert_eq!(config.log_stream, "default");
        assert_eq!(config.region, "us-east-1");
        assert_eq!(config.batch_size, 100);
    }

    /// Events below the batch threshold stay buffered rather than flushing.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_cloudwatch_sink_buffering() {
        let config = CloudWatchConfig {
            batch_size: 5,
            ..Default::default()
        };
        let sink = CloudWatchSink::new(config);

        // Three writes stay below the batch size of five.
        for i in 0..3 {
            let event = AuditEvent::new(
                AuditLevel::Info,
                AuditEventType::Custom(format!("event_{}", i)),
                "test",
                format!("Event {}", i),
            );
            sink.write(&event).unwrap();
        }

        assert_eq!(sink.buffer_size(), 3);
    }

    /// Facility discriminants follow the syslog numeric codes.
    #[test]
    fn test_syslog_facility_values() {
        let codes = [
            (SyslogFacility::Kern, 0u8),
            (SyslogFacility::User, 1),
            (SyslogFacility::Auth, 4),
            (SyslogFacility::Local0, 16),
            (SyslogFacility::Local7, 23),
        ];
        for (facility, code) in codes {
            assert_eq!(facility as u8, code);
        }
    }

    /// Severity discriminants follow the syslog numeric codes.
    #[test]
    fn test_syslog_severity_values() {
        let codes = [
            (SyslogSeverity::Emergency, 0u8),
            (SyslogSeverity::Alert, 1),
            (SyslogSeverity::Critical, 2),
            (SyslogSeverity::Error, 3),
            (SyslogSeverity::Warning, 4),
            (SyslogSeverity::Notice, 5),
            (SyslogSeverity::Informational, 6),
            (SyslogSeverity::Debug, 7),
        ];
        for (severity, code) in codes {
            assert_eq!(severity as u8, code);
        }
    }
}