1use std::collections::VecDeque;
28use std::fmt;
29use std::io::Write;
30use std::path::PathBuf;
31use std::sync::atomic::{AtomicU64, Ordering};
32use std::sync::Arc;
33use std::time::{Duration, SystemTime, UNIX_EPOCH};
34
35use parking_lot::{Mutex, RwLock};
36
37use crate::hlc::HlcTimestamp;
38
/// Severity / category of an audit event.
///
/// Variants are ordered by their explicit discriminant (`Info` lowest, `Compliance`
/// highest). `AuditLogger` relies on this ordering to discard events below its
/// configured minimum level.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum AuditLevel {
    /// Routine informational events (e.g. kernel launch/termination, health checks).
    Info = 0,
    /// Warning-level events.
    Warning = 1,
    /// Security-relevant events (used by `AuditEvent::security_violation`).
    Security = 2,
    /// Critical-level events.
    Critical = 3,
    /// Compliance-relevant events (used by `AuditEvent::config_change`).
    Compliance = 4,
}

impl AuditLevel {
    /// Upper-case label used in JSON output and by `Display`.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Info => "INFO",
            Self::Warning => "WARNING",
            Self::Security => "SECURITY",
            Self::Critical => "CRITICAL",
            Self::Compliance => "COMPLIANCE",
        }
    }
}

impl fmt::Display for AuditLevel {
    /// Writes the upper-case label.
    ///
    /// `Formatter::pad` is used (rather than a nested `write!`) so width, fill,
    /// alignment and precision flags such as `{:<10}` are honoured.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad(self.as_str())
    }
}
77
/// Kind of occurrence recorded by an audit event.
///
/// Built-in variants have fixed snake_case names; `Custom` carries an arbitrary,
/// caller-supplied name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AuditEventType {
    // Kernel lifecycle.
    KernelLaunched,
    KernelTerminated,
    KernelMigrated,
    KernelCheckpointed,
    KernelRestored,

    // Messaging.
    MessageSent,
    MessageReceived,
    MessageFailed,

    // Security and configuration.
    AuthenticationAttempt,
    AuthorizationCheck,
    ConfigurationChange,
    SecurityViolation,

    // Resources.
    MemoryAllocated,
    MemoryDeallocated,
    ResourceLimitExceeded,

    // Health and resilience.
    HealthCheck,
    CircuitBreakerStateChange,
    DegradationChange,

    /// Application-defined event type; the string is used verbatim as its name.
    Custom(String),
}

impl AuditEventType {
    /// Snake_case name used in JSON output and by `Display`.
    pub fn as_str(&self) -> &str {
        match self {
            Self::KernelLaunched => "kernel_launched",
            Self::KernelTerminated => "kernel_terminated",
            Self::KernelMigrated => "kernel_migrated",
            Self::KernelCheckpointed => "kernel_checkpointed",
            Self::KernelRestored => "kernel_restored",
            Self::MessageSent => "message_sent",
            Self::MessageReceived => "message_received",
            Self::MessageFailed => "message_failed",
            Self::AuthenticationAttempt => "authentication_attempt",
            Self::AuthorizationCheck => "authorization_check",
            Self::ConfigurationChange => "configuration_change",
            Self::SecurityViolation => "security_violation",
            Self::MemoryAllocated => "memory_allocated",
            Self::MemoryDeallocated => "memory_deallocated",
            Self::ResourceLimitExceeded => "resource_limit_exceeded",
            Self::HealthCheck => "health_check",
            Self::CircuitBreakerStateChange => "circuit_breaker_state_change",
            Self::DegradationChange => "degradation_change",
            Self::Custom(s) => s.as_str(),
        }
    }
}

impl fmt::Display for AuditEventType {
    /// Writes the snake_case name.
    ///
    /// `Formatter::pad` is used so width, fill, alignment and precision flags are honoured.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad(self.as_str())
    }
}
167
168#[derive(Debug, Clone)]
174pub struct AuditEvent {
175 pub id: u64,
177 pub timestamp: SystemTime,
179 pub hlc: Option<HlcTimestamp>,
181 pub level: AuditLevel,
183 pub event_type: AuditEventType,
185 pub actor: String,
187 pub target: Option<String>,
189 pub description: String,
191 pub metadata: Vec<(String, String)>,
193 pub prev_checksum: Option<u64>,
195 pub checksum: u64,
197}
198
199impl AuditEvent {
200 pub fn new(
202 level: AuditLevel,
203 event_type: AuditEventType,
204 actor: impl Into<String>,
205 description: impl Into<String>,
206 ) -> Self {
207 let id = next_event_id();
208 let timestamp = SystemTime::now();
209 let actor = actor.into();
210 let description = description.into();
211
212 let mut event = Self {
213 id,
214 timestamp,
215 hlc: None,
216 level,
217 event_type,
218 actor,
219 target: None,
220 description,
221 metadata: Vec::new(),
222 prev_checksum: None,
223 checksum: 0,
224 };
225
226 event.checksum = event.compute_checksum();
227 event
228 }
229
230 pub fn with_hlc(mut self, hlc: HlcTimestamp) -> Self {
232 self.hlc = Some(hlc);
233 self.checksum = self.compute_checksum();
234 self
235 }
236
237 pub fn with_target(mut self, target: impl Into<String>) -> Self {
239 self.target = Some(target.into());
240 self.checksum = self.compute_checksum();
241 self
242 }
243
244 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
246 self.metadata.push((key.into(), value.into()));
247 self.checksum = self.compute_checksum();
248 self
249 }
250
251 pub fn with_prev_checksum(mut self, checksum: u64) -> Self {
253 self.prev_checksum = Some(checksum);
254 self.checksum = self.compute_checksum();
255 self
256 }
257
258 fn compute_checksum(&self) -> u64 {
260 use std::collections::hash_map::DefaultHasher;
261 use std::hash::{Hash, Hasher};
262
263 let mut hasher = DefaultHasher::new();
264 self.id.hash(&mut hasher);
265 self.timestamp
266 .duration_since(UNIX_EPOCH)
267 .unwrap_or_default()
268 .as_nanos()
269 .hash(&mut hasher);
270 self.level.as_str().hash(&mut hasher);
271 self.event_type.as_str().hash(&mut hasher);
272 self.actor.hash(&mut hasher);
273 self.target.hash(&mut hasher);
274 self.description.hash(&mut hasher);
275 for (k, v) in &self.metadata {
276 k.hash(&mut hasher);
277 v.hash(&mut hasher);
278 }
279 self.prev_checksum.hash(&mut hasher);
280 hasher.finish()
281 }
282
283 pub fn verify_checksum(&self) -> bool {
285 self.checksum == self.compute_checksum()
286 }
287
288 pub fn kernel_launched(kernel_id: impl Into<String>, backend: impl Into<String>) -> Self {
292 Self::new(
293 AuditLevel::Info,
294 AuditEventType::KernelLaunched,
295 "runtime",
296 format!("Kernel launched on {}", backend.into()),
297 )
298 .with_target(kernel_id)
299 }
300
301 pub fn kernel_terminated(kernel_id: impl Into<String>, reason: impl Into<String>) -> Self {
303 Self::new(
304 AuditLevel::Info,
305 AuditEventType::KernelTerminated,
306 "runtime",
307 format!("Kernel terminated: {}", reason.into()),
308 )
309 .with_target(kernel_id)
310 }
311
312 pub fn security_violation(actor: impl Into<String>, violation: impl Into<String>) -> Self {
314 Self::new(
315 AuditLevel::Security,
316 AuditEventType::SecurityViolation,
317 actor,
318 violation,
319 )
320 }
321
322 pub fn config_change(
324 actor: impl Into<String>,
325 config_key: impl Into<String>,
326 old_value: impl Into<String>,
327 new_value: impl Into<String>,
328 ) -> Self {
329 Self::new(
330 AuditLevel::Compliance,
331 AuditEventType::ConfigurationChange,
332 actor,
333 format!("Configuration changed: {}", config_key.into()),
334 )
335 .with_metadata("old_value", old_value)
336 .with_metadata("new_value", new_value)
337 }
338
339 pub fn health_check(kernel_id: impl Into<String>, status: impl Into<String>) -> Self {
341 Self::new(
342 AuditLevel::Info,
343 AuditEventType::HealthCheck,
344 "health_checker",
345 format!("Health check: {}", status.into()),
346 )
347 .with_target(kernel_id)
348 }
349
350 pub fn to_json(&self) -> String {
352 let timestamp = self
353 .timestamp
354 .duration_since(UNIX_EPOCH)
355 .unwrap_or_default()
356 .as_millis();
357
358 let hlc_str = self
359 .hlc
360 .map(|h| {
361 format!(
362 r#","hlc":{{"wall":{},"logical":{}}}"#,
363 h.physical, h.logical
364 )
365 })
366 .unwrap_or_default();
367
368 let target_str = self
369 .target
370 .as_ref()
371 .map(|t| format!(r#","target":"{}""#, escape_json(t)))
372 .unwrap_or_default();
373
374 let prev_checksum_str = self
375 .prev_checksum
376 .map(|c| format!(r#","prev_checksum":{}"#, c))
377 .unwrap_or_default();
378
379 let metadata_str = if self.metadata.is_empty() {
380 String::new()
381 } else {
382 let pairs: Vec<String> = self
383 .metadata
384 .iter()
385 .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
386 .collect();
387 format!(r#","metadata":{{{}}}"#, pairs.join(","))
388 };
389
390 format!(
391 r#"{{"id":{},"timestamp":{}{},"level":"{}","event_type":"{}","actor":"{}"{}"description":"{}"{}"checksum":{}{}}}"#,
392 self.id,
393 timestamp,
394 hlc_str,
395 self.level.as_str(),
396 self.event_type.as_str(),
397 escape_json(&self.actor),
398 target_str,
399 escape_json(&self.description),
400 metadata_str,
401 self.checksum,
402 prev_checksum_str,
403 )
404 }
405}
406
/// Escapes `s` for embedding inside a JSON string literal (without the surrounding quotes).
///
/// Quotes, backslashes and every control character U+0000–U+001F are escaped, as
/// RFC 8259 requires. The short forms `\n`, `\r`, `\t`, `\b`, `\f` are used where JSON
/// defines them and `\u00XX` otherwise. All other characters pass through unchanged.
fn escape_json(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0c}' => escaped.push_str("\\f"),
            // Any other raw control character would make the surrounding JSON invalid.
            c if u32::from(c) < 0x20 => {
                escaped.push_str(&format!("\\u{:04x}", u32::from(c)));
            }
            c => escaped.push(c),
        }
    }
    escaped
}
415
/// Process-wide source of audit event ids; the first id handed out is 1.
static EVENT_ID_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Returns the next event id; each call yields a larger value than the previous one.
fn next_event_id() -> u64 {
    let issued = EVENT_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
    issued
}
422
423pub trait AuditSink: Send + Sync {
429 fn write(&self, event: &AuditEvent) -> std::io::Result<()>;
431
432 fn flush(&self) -> std::io::Result<()>;
434
435 fn close(&self) -> std::io::Result<()>;
437}
438
439pub struct FileSink {
441 path: PathBuf,
442 writer: Mutex<Option<std::fs::File>>,
443 max_size: u64,
444 current_size: AtomicU64,
445}
446
447impl FileSink {
448 pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
450 let path = path.into();
451 let file = std::fs::OpenOptions::new()
452 .create(true)
453 .append(true)
454 .open(&path)?;
455
456 let metadata = file.metadata()?;
457
458 Ok(Self {
459 path,
460 writer: Mutex::new(Some(file)),
461 max_size: 100 * 1024 * 1024, current_size: AtomicU64::new(metadata.len()),
463 })
464 }
465
466 pub fn with_max_size(mut self, size: u64) -> Self {
468 self.max_size = size;
469 self
470 }
471
472 fn rotate_if_needed(&self) -> std::io::Result<()> {
474 if self.current_size.load(Ordering::Relaxed) >= self.max_size {
475 let mut writer = self.writer.lock();
476 if let Some(file) = writer.take() {
477 drop(file);
478
479 let timestamp = SystemTime::now()
481 .duration_since(UNIX_EPOCH)
482 .unwrap_or_default()
483 .as_secs();
484 let rotated_path = self.path.with_extension(format!("log.{}", timestamp));
485 std::fs::rename(&self.path, rotated_path)?;
486
487 let new_file = std::fs::OpenOptions::new()
489 .create(true)
490 .append(true)
491 .open(&self.path)?;
492 *writer = Some(new_file);
493 self.current_size.store(0, Ordering::Relaxed);
494 }
495 }
496 Ok(())
497 }
498}
499
500impl AuditSink for FileSink {
501 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
502 self.rotate_if_needed()?;
503
504 let json = event.to_json();
505 let line = format!("{}\n", json);
506 let len = line.len() as u64;
507
508 let mut writer = self.writer.lock();
509 if let Some(file) = writer.as_mut() {
510 file.write_all(line.as_bytes())?;
511 self.current_size.fetch_add(len, Ordering::Relaxed);
512 }
513 Ok(())
514 }
515
516 fn flush(&self) -> std::io::Result<()> {
517 let mut writer = self.writer.lock();
518 if let Some(file) = writer.as_mut() {
519 file.flush()?;
520 }
521 Ok(())
522 }
523
524 fn close(&self) -> std::io::Result<()> {
525 let mut writer = self.writer.lock();
526 if let Some(file) = writer.take() {
527 drop(file);
528 }
529 Ok(())
530 }
531}
532
533#[derive(Default)]
535pub struct MemorySink {
536 events: Mutex<VecDeque<AuditEvent>>,
537 max_events: usize,
538}
539
540impl MemorySink {
541 pub fn new(max_events: usize) -> Self {
543 Self {
544 events: Mutex::new(VecDeque::with_capacity(max_events)),
545 max_events,
546 }
547 }
548
549 pub fn events(&self) -> Vec<AuditEvent> {
551 self.events.lock().iter().cloned().collect()
552 }
553
554 pub fn len(&self) -> usize {
556 self.events.lock().len()
557 }
558
559 pub fn is_empty(&self) -> bool {
561 self.events.lock().is_empty()
562 }
563
564 pub fn clear(&self) {
566 self.events.lock().clear();
567 }
568}
569
570impl AuditSink for MemorySink {
571 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
572 let mut events = self.events.lock();
573 if events.len() >= self.max_events {
574 events.pop_front();
575 }
576 events.push_back(event.clone());
577 Ok(())
578 }
579
580 fn flush(&self) -> std::io::Result<()> {
581 Ok(())
582 }
583
584 fn close(&self) -> std::io::Result<()> {
585 Ok(())
586 }
587}
588
589#[derive(Debug, Clone)]
595pub struct AuditConfig {
596 pub min_level: AuditLevel,
598 pub enable_checksums: bool,
600 pub buffer_size: usize,
602 pub flush_interval: Duration,
604 pub retention: Duration,
606}
607
608impl Default for AuditConfig {
609 fn default() -> Self {
610 Self {
611 min_level: AuditLevel::Info,
612 enable_checksums: true,
613 buffer_size: 100,
614 flush_interval: Duration::from_secs(5),
615 retention: Duration::from_secs(90 * 24 * 60 * 60), }
617 }
618}
619
620pub struct AuditLoggerBuilder {
622 config: AuditConfig,
623 sinks: Vec<Arc<dyn AuditSink>>,
624}
625
626impl AuditLoggerBuilder {
627 pub fn new() -> Self {
629 Self {
630 config: AuditConfig::default(),
631 sinks: Vec::new(),
632 }
633 }
634
635 pub fn with_min_level(mut self, level: AuditLevel) -> Self {
637 self.config.min_level = level;
638 self
639 }
640
641 pub fn with_file_sink(mut self, path: impl Into<PathBuf>) -> std::io::Result<Self> {
643 let sink = Arc::new(FileSink::new(path)?);
644 self.sinks.push(sink);
645 Ok(self)
646 }
647
648 pub fn with_memory_sink(mut self, max_events: usize) -> Self {
650 let sink = Arc::new(MemorySink::new(max_events));
651 self.sinks.push(sink);
652 self
653 }
654
655 pub fn with_sink(mut self, sink: Arc<dyn AuditSink>) -> Self {
657 self.sinks.push(sink);
658 self
659 }
660
661 pub fn with_retention(mut self, retention: Duration) -> Self {
663 self.config.retention = retention;
664 self
665 }
666
667 pub fn with_checksums(mut self, enable: bool) -> Self {
669 self.config.enable_checksums = enable;
670 self
671 }
672
673 pub fn build(self) -> AuditLogger {
675 AuditLogger {
676 config: self.config,
677 sinks: self.sinks,
678 last_checksum: AtomicU64::new(0),
679 event_count: AtomicU64::new(0),
680 buffer: RwLock::new(Vec::new()),
681 }
682 }
683}
684
685impl Default for AuditLoggerBuilder {
686 fn default() -> Self {
687 Self::new()
688 }
689}
690
691pub struct AuditLogger {
693 config: AuditConfig,
694 sinks: Vec<Arc<dyn AuditSink>>,
695 last_checksum: AtomicU64,
696 event_count: AtomicU64,
697 buffer: RwLock<Vec<AuditEvent>>,
698}
699
700impl AuditLogger {
701 pub fn builder() -> AuditLoggerBuilder {
703 AuditLoggerBuilder::new()
704 }
705
706 pub fn in_memory(max_events: usize) -> Self {
708 AuditLoggerBuilder::new()
709 .with_memory_sink(max_events)
710 .build()
711 }
712
713 pub fn log(&self, mut event: AuditEvent) {
715 if event.level < self.config.min_level {
717 return;
718 }
719
720 if self.config.enable_checksums {
722 let prev = self.last_checksum.load(Ordering::Acquire);
723 event = event.with_prev_checksum(prev);
724 self.last_checksum.store(event.checksum, Ordering::Release);
725 }
726
727 for sink in &self.sinks {
729 if let Err(e) = sink.write(&event) {
730 eprintln!("Audit sink error: {}", e);
731 }
732 }
733
734 self.event_count.fetch_add(1, Ordering::Relaxed);
735 }
736
737 pub fn log_kernel_launched(&self, kernel_id: &str, backend: &str) {
739 self.log(AuditEvent::kernel_launched(kernel_id, backend));
740 }
741
742 pub fn log_kernel_terminated(&self, kernel_id: &str, reason: &str) {
744 self.log(AuditEvent::kernel_terminated(kernel_id, reason));
745 }
746
747 pub fn log_security_violation(&self, actor: &str, violation: &str) {
749 self.log(AuditEvent::security_violation(actor, violation));
750 }
751
752 pub fn log_config_change(&self, actor: &str, key: &str, old_value: &str, new_value: &str) {
754 self.log(AuditEvent::config_change(actor, key, old_value, new_value));
755 }
756
757 pub fn event_count(&self) -> u64 {
759 self.event_count.load(Ordering::Relaxed)
760 }
761
762 pub fn buffer_event(&self, event: AuditEvent) {
766 let mut buffer = self.buffer.write();
767 buffer.push(event);
768 }
769
770 pub fn flush_buffered(&self) -> std::io::Result<()> {
772 let events: Vec<AuditEvent> = {
773 let mut buffer = self.buffer.write();
774 std::mem::take(&mut *buffer)
775 };
776
777 for mut event in events {
778 if self.config.enable_checksums {
780 let prev = self.last_checksum.load(Ordering::Acquire);
781 event = event.with_prev_checksum(prev);
782 self.last_checksum.store(event.checksum, Ordering::Release);
783 }
784
785 for sink in &self.sinks {
787 sink.write(&event)?;
788 }
789
790 self.event_count.fetch_add(1, Ordering::Relaxed);
791 }
792
793 self.flush()
794 }
795
796 pub fn buffered_count(&self) -> usize {
798 self.buffer.read().len()
799 }
800
801 pub fn flush(&self) -> std::io::Result<()> {
803 for sink in &self.sinks {
804 sink.flush()?;
805 }
806 Ok(())
807 }
808
809 pub fn close(&self) -> std::io::Result<()> {
811 for sink in &self.sinks {
812 sink.close()?;
813 }
814 Ok(())
815 }
816}
817
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_audit_event_creation() {
        let ev = AuditEvent::new(
            AuditLevel::Info,
            AuditEventType::KernelLaunched,
            "runtime",
            "Kernel launched",
        );

        assert_eq!(ev.level, AuditLevel::Info);
        assert_eq!(ev.event_type, AuditEventType::KernelLaunched);
        assert_eq!(ev.actor, "runtime");
        assert_ne!(ev.checksum, 0);
    }

    #[test]
    fn test_audit_event_checksum() {
        let original = AuditEvent::kernel_launched("test_kernel", "cuda");
        assert!(original.verify_checksum());

        // Editing a field directly (bypassing the builders) must be detected.
        let tampered = AuditEvent {
            description: "Modified".to_string(),
            ..original.clone()
        };
        assert!(!tampered.verify_checksum());
    }

    #[test]
    fn test_audit_event_chain() {
        let head = AuditEvent::kernel_launched("k1", "cuda");
        let next = AuditEvent::kernel_launched("k2", "cuda").with_prev_checksum(head.checksum);

        assert_eq!(next.prev_checksum, Some(head.checksum));
    }

    #[test]
    fn test_audit_event_json() {
        let json = AuditEvent::kernel_launched("test", "cuda")
            .with_metadata("gpu_id", "0")
            .with_metadata("memory_mb", "8192")
            .to_json();

        for needle in ["kernel_launched", "test", "cuda", "gpu_id"].iter() {
            assert!(json.contains(*needle), "missing {:?} in {}", needle, json);
        }
    }

    #[test]
    fn test_memory_sink() {
        let sink = MemorySink::new(10);

        sink.write(&AuditEvent::kernel_launched("test", "cuda"))
            .unwrap();

        assert_eq!(sink.len(), 1);
        assert!(!sink.is_empty());

        let stored = sink.events();
        assert_eq!(stored[0].event_type, AuditEventType::KernelLaunched);
    }

    #[test]
    fn test_memory_sink_rotation() {
        let sink = MemorySink::new(3);

        (0..5)
            .map(|n| {
                AuditEvent::new(
                    AuditLevel::Info,
                    AuditEventType::Custom(format!("event_{}", n)),
                    "test",
                    format!("Event {}", n),
                )
            })
            .for_each(|ev| sink.write(&ev).unwrap());

        // Only the newest three survive, so the oldest retained is event_2.
        assert_eq!(sink.len(), 3);
        let stored = sink.events();
        assert_eq!(
            stored[0].event_type,
            AuditEventType::Custom("event_2".to_string())
        );
    }

    #[test]
    fn test_audit_logger() {
        let logger = AuditLogger::in_memory(100);

        logger.log_kernel_launched("k1", "cuda");
        logger.log_kernel_terminated("k1", "shutdown");
        logger.log_security_violation("user", "unauthorized access");

        assert_eq!(logger.event_count(), 3);
    }

    #[test]
    fn test_audit_level_ordering() {
        let ascending = [
            AuditLevel::Info,
            AuditLevel::Warning,
            AuditLevel::Security,
            AuditLevel::Critical,
            AuditLevel::Compliance,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    #[test]
    fn test_audit_event_helpers() {
        let change = AuditEvent::config_change("admin", "max_kernels", "10", "20");
        assert_eq!(change.level, AuditLevel::Compliance);
        assert_eq!(change.metadata.len(), 2);

        let health = AuditEvent::health_check("kernel_1", "healthy");
        assert_eq!(health.event_type, AuditEventType::HealthCheck);
    }
}