1use std::collections::VecDeque;
28use std::fmt;
29use std::io::Write;
30use std::net::UdpSocket;
31use std::path::PathBuf;
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::sync::Arc;
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35
36use parking_lot::{Mutex, RwLock};
37
38use crate::hlc::HlcTimestamp;
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
46#[repr(u8)]
47pub enum AuditLevel {
48 Info = 0,
50 Warning = 1,
52 Security = 2,
54 Critical = 3,
56 Compliance = 4,
58}
59
60impl AuditLevel {
61 pub fn as_str(&self) -> &'static str {
63 match self {
64 Self::Info => "INFO",
65 Self::Warning => "WARNING",
66 Self::Security => "SECURITY",
67 Self::Critical => "CRITICAL",
68 Self::Compliance => "COMPLIANCE",
69 }
70 }
71}
72
73impl fmt::Display for AuditLevel {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 write!(f, "{}", self.as_str())
76 }
77}
78
79#[derive(Debug, Clone, PartialEq, Eq, Hash)]
85pub enum AuditEventType {
86 KernelLaunched,
89 KernelTerminated,
91 KernelMigrated,
93 KernelCheckpointed,
95 KernelRestored,
97
98 MessageSent,
101 MessageReceived,
103 MessageFailed,
105
106 AuthenticationAttempt,
109 AuthorizationCheck,
111 ConfigurationChange,
113 SecurityViolation,
115
116 MemoryAllocated,
119 MemoryDeallocated,
121 ResourceLimitExceeded,
123
124 HealthCheck,
127 CircuitBreakerStateChange,
129 DegradationChange,
131
132 Custom(String),
134}
135
136impl AuditEventType {
137 pub fn as_str(&self) -> &str {
139 match self {
140 Self::KernelLaunched => "kernel_launched",
141 Self::KernelTerminated => "kernel_terminated",
142 Self::KernelMigrated => "kernel_migrated",
143 Self::KernelCheckpointed => "kernel_checkpointed",
144 Self::KernelRestored => "kernel_restored",
145 Self::MessageSent => "message_sent",
146 Self::MessageReceived => "message_received",
147 Self::MessageFailed => "message_failed",
148 Self::AuthenticationAttempt => "authentication_attempt",
149 Self::AuthorizationCheck => "authorization_check",
150 Self::ConfigurationChange => "configuration_change",
151 Self::SecurityViolation => "security_violation",
152 Self::MemoryAllocated => "memory_allocated",
153 Self::MemoryDeallocated => "memory_deallocated",
154 Self::ResourceLimitExceeded => "resource_limit_exceeded",
155 Self::HealthCheck => "health_check",
156 Self::CircuitBreakerStateChange => "circuit_breaker_state_change",
157 Self::DegradationChange => "degradation_change",
158 Self::Custom(s) => s.as_str(),
159 }
160 }
161}
162
163impl fmt::Display for AuditEventType {
164 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165 write!(f, "{}", self.as_str())
166 }
167}
168
169#[derive(Debug, Clone)]
175pub struct AuditEvent {
176 pub id: u64,
178 pub timestamp: SystemTime,
180 pub hlc: Option<HlcTimestamp>,
182 pub level: AuditLevel,
184 pub event_type: AuditEventType,
186 pub actor: String,
188 pub target: Option<String>,
190 pub description: String,
192 pub metadata: Vec<(String, String)>,
194 pub prev_checksum: Option<u64>,
196 pub checksum: u64,
198}
199
200impl AuditEvent {
201 pub fn new(
203 level: AuditLevel,
204 event_type: AuditEventType,
205 actor: impl Into<String>,
206 description: impl Into<String>,
207 ) -> Self {
208 let id = next_event_id();
209 let timestamp = SystemTime::now();
210 let actor = actor.into();
211 let description = description.into();
212
213 let mut event = Self {
214 id,
215 timestamp,
216 hlc: None,
217 level,
218 event_type,
219 actor,
220 target: None,
221 description,
222 metadata: Vec::new(),
223 prev_checksum: None,
224 checksum: 0,
225 };
226
227 event.checksum = event.compute_checksum();
228 event
229 }
230
231 pub fn with_hlc(mut self, hlc: HlcTimestamp) -> Self {
233 self.hlc = Some(hlc);
234 self.checksum = self.compute_checksum();
235 self
236 }
237
238 pub fn with_target(mut self, target: impl Into<String>) -> Self {
240 self.target = Some(target.into());
241 self.checksum = self.compute_checksum();
242 self
243 }
244
245 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
247 self.metadata.push((key.into(), value.into()));
248 self.checksum = self.compute_checksum();
249 self
250 }
251
252 pub fn with_prev_checksum(mut self, checksum: u64) -> Self {
254 self.prev_checksum = Some(checksum);
255 self.checksum = self.compute_checksum();
256 self
257 }
258
259 fn compute_checksum(&self) -> u64 {
261 use std::collections::hash_map::DefaultHasher;
262 use std::hash::{Hash, Hasher};
263
264 let mut hasher = DefaultHasher::new();
265 self.id.hash(&mut hasher);
266 self.timestamp
267 .duration_since(UNIX_EPOCH)
268 .unwrap_or_default()
269 .as_nanos()
270 .hash(&mut hasher);
271 self.level.as_str().hash(&mut hasher);
272 self.event_type.as_str().hash(&mut hasher);
273 self.actor.hash(&mut hasher);
274 self.target.hash(&mut hasher);
275 self.description.hash(&mut hasher);
276 for (k, v) in &self.metadata {
277 k.hash(&mut hasher);
278 v.hash(&mut hasher);
279 }
280 self.prev_checksum.hash(&mut hasher);
281 hasher.finish()
282 }
283
284 pub fn verify_checksum(&self) -> bool {
286 self.checksum == self.compute_checksum()
287 }
288
289 pub fn kernel_launched(kernel_id: impl Into<String>, backend: impl Into<String>) -> Self {
293 Self::new(
294 AuditLevel::Info,
295 AuditEventType::KernelLaunched,
296 "runtime",
297 format!("Kernel launched on {}", backend.into()),
298 )
299 .with_target(kernel_id)
300 }
301
302 pub fn kernel_terminated(kernel_id: impl Into<String>, reason: impl Into<String>) -> Self {
304 Self::new(
305 AuditLevel::Info,
306 AuditEventType::KernelTerminated,
307 "runtime",
308 format!("Kernel terminated: {}", reason.into()),
309 )
310 .with_target(kernel_id)
311 }
312
313 pub fn security_violation(actor: impl Into<String>, violation: impl Into<String>) -> Self {
315 Self::new(
316 AuditLevel::Security,
317 AuditEventType::SecurityViolation,
318 actor,
319 violation,
320 )
321 }
322
323 pub fn config_change(
325 actor: impl Into<String>,
326 config_key: impl Into<String>,
327 old_value: impl Into<String>,
328 new_value: impl Into<String>,
329 ) -> Self {
330 Self::new(
331 AuditLevel::Compliance,
332 AuditEventType::ConfigurationChange,
333 actor,
334 format!("Configuration changed: {}", config_key.into()),
335 )
336 .with_metadata("old_value", old_value)
337 .with_metadata("new_value", new_value)
338 }
339
340 pub fn health_check(kernel_id: impl Into<String>, status: impl Into<String>) -> Self {
342 Self::new(
343 AuditLevel::Info,
344 AuditEventType::HealthCheck,
345 "health_checker",
346 format!("Health check: {}", status.into()),
347 )
348 .with_target(kernel_id)
349 }
350
351 pub fn to_json(&self) -> String {
353 let timestamp = self
354 .timestamp
355 .duration_since(UNIX_EPOCH)
356 .unwrap_or_default()
357 .as_millis();
358
359 let hlc_str = self
360 .hlc
361 .map(|h| {
362 format!(
363 r#","hlc":{{"wall":{},"logical":{}}}"#,
364 h.physical, h.logical
365 )
366 })
367 .unwrap_or_default();
368
369 let target_str = self
370 .target
371 .as_ref()
372 .map(|t| format!(r#","target":"{}""#, escape_json(t)))
373 .unwrap_or_default();
374
375 let prev_checksum_str = self
376 .prev_checksum
377 .map(|c| format!(r#","prev_checksum":{}"#, c))
378 .unwrap_or_default();
379
380 let metadata_str = if self.metadata.is_empty() {
381 String::new()
382 } else {
383 let pairs: Vec<String> = self
384 .metadata
385 .iter()
386 .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
387 .collect();
388 format!(r#","metadata":{{{}}}"#, pairs.join(","))
389 };
390
391 format!(
392 r#"{{"id":{},"timestamp":{}{},"level":"{}","event_type":"{}","actor":"{}"{}"description":"{}"{}"checksum":{}{}}}"#,
393 self.id,
394 timestamp,
395 hlc_str,
396 self.level.as_str(),
397 self.event_type.as_str(),
398 escape_json(&self.actor),
399 target_str,
400 escape_json(&self.description),
401 metadata_str,
402 self.checksum,
403 prev_checksum_str,
404 )
405 }
406}
407
408fn escape_json(s: &str) -> String {
410 s.replace('\\', "\\\\")
411 .replace('"', "\\\"")
412 .replace('\n', "\\n")
413 .replace('\r', "\\r")
414 .replace('\t', "\\t")
415}
416
417static EVENT_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
419
420fn next_event_id() -> u64 {
421 EVENT_ID_COUNTER.fetch_add(1, Ordering::SeqCst)
422}
423
424pub trait AuditSink: Send + Sync {
430 fn write(&self, event: &AuditEvent) -> std::io::Result<()>;
432
433 fn flush(&self) -> std::io::Result<()>;
435
436 fn close(&self) -> std::io::Result<()>;
438}
439
440pub struct FileSink {
442 path: PathBuf,
443 writer: Mutex<Option<std::fs::File>>,
444 max_size: u64,
445 current_size: AtomicU64,
446}
447
448impl FileSink {
449 pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
451 let path = path.into();
452 let file = std::fs::OpenOptions::new()
453 .create(true)
454 .append(true)
455 .open(&path)?;
456
457 let metadata = file.metadata()?;
458
459 Ok(Self {
460 path,
461 writer: Mutex::new(Some(file)),
462 max_size: 100 * 1024 * 1024, current_size: AtomicU64::new(metadata.len()),
464 })
465 }
466
467 pub fn with_max_size(mut self, size: u64) -> Self {
469 self.max_size = size;
470 self
471 }
472
473 fn rotate_if_needed(&self) -> std::io::Result<()> {
475 if self.current_size.load(Ordering::Relaxed) >= self.max_size {
476 let mut writer = self.writer.lock();
477 if let Some(file) = writer.take() {
478 drop(file);
479
480 let timestamp = SystemTime::now()
482 .duration_since(UNIX_EPOCH)
483 .unwrap_or_default()
484 .as_secs();
485 let rotated_path = self.path.with_extension(format!("log.{}", timestamp));
486 std::fs::rename(&self.path, rotated_path)?;
487
488 let new_file = std::fs::OpenOptions::new()
490 .create(true)
491 .append(true)
492 .open(&self.path)?;
493 *writer = Some(new_file);
494 self.current_size.store(0, Ordering::Relaxed);
495 }
496 }
497 Ok(())
498 }
499}
500
501impl AuditSink for FileSink {
502 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
503 self.rotate_if_needed()?;
504
505 let json = event.to_json();
506 let line = format!("{}\n", json);
507 let len = line.len() as u64;
508
509 let mut writer = self.writer.lock();
510 if let Some(file) = writer.as_mut() {
511 file.write_all(line.as_bytes())?;
512 self.current_size.fetch_add(len, Ordering::Relaxed);
513 }
514 Ok(())
515 }
516
517 fn flush(&self) -> std::io::Result<()> {
518 let mut writer = self.writer.lock();
519 if let Some(file) = writer.as_mut() {
520 file.flush()?;
521 }
522 Ok(())
523 }
524
525 fn close(&self) -> std::io::Result<()> {
526 let mut writer = self.writer.lock();
527 if let Some(file) = writer.take() {
528 drop(file);
529 }
530 Ok(())
531 }
532}
533
534#[derive(Default)]
536pub struct MemorySink {
537 events: Mutex<VecDeque<AuditEvent>>,
538 max_events: usize,
539}
540
541impl MemorySink {
542 pub fn new(max_events: usize) -> Self {
544 Self {
545 events: Mutex::new(VecDeque::with_capacity(max_events)),
546 max_events,
547 }
548 }
549
550 pub fn events(&self) -> Vec<AuditEvent> {
552 self.events.lock().iter().cloned().collect()
553 }
554
555 pub fn len(&self) -> usize {
557 self.events.lock().len()
558 }
559
560 pub fn is_empty(&self) -> bool {
562 self.events.lock().is_empty()
563 }
564
565 pub fn clear(&self) {
567 self.events.lock().clear();
568 }
569}
570
571impl AuditSink for MemorySink {
572 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
573 let mut events = self.events.lock();
574 if events.len() >= self.max_events {
575 events.pop_front();
576 }
577 events.push_back(event.clone());
578 Ok(())
579 }
580
581 fn flush(&self) -> std::io::Result<()> {
582 Ok(())
583 }
584
585 fn close(&self) -> std::io::Result<()> {
586 Ok(())
587 }
588}
589
590#[derive(Debug, Clone, Copy, PartialEq, Eq)]
596#[repr(u8)]
597pub enum SyslogFacility {
598 Kern = 0,
600 User = 1,
602 Auth = 4,
604 AuthPriv = 10,
606 Local0 = 16,
608 Local1 = 17,
610 Local2 = 18,
612 Local3 = 19,
614 Local4 = 20,
616 Local5 = 21,
618 Local6 = 22,
620 Local7 = 23,
622}
623
624#[derive(Debug, Clone, Copy, PartialEq, Eq)]
626#[repr(u8)]
627pub enum SyslogSeverity {
628 Emergency = 0,
630 Alert = 1,
632 Critical = 2,
634 Error = 3,
636 Warning = 4,
638 Notice = 5,
640 Informational = 6,
642 Debug = 7,
644}
645
646impl From<AuditLevel> for SyslogSeverity {
647 fn from(level: AuditLevel) -> Self {
648 match level {
649 AuditLevel::Info => SyslogSeverity::Informational,
650 AuditLevel::Warning => SyslogSeverity::Warning,
651 AuditLevel::Security => SyslogSeverity::Notice,
652 AuditLevel::Critical => SyslogSeverity::Error,
653 AuditLevel::Compliance => SyslogSeverity::Notice,
654 }
655 }
656}
657
658#[derive(Debug, Clone)]
660pub struct SyslogConfig {
661 pub server_addr: String,
663 pub facility: SyslogFacility,
665 pub app_name: String,
667 pub procid: Option<String>,
669 pub msgid: Option<String>,
671 pub rfc5424: bool,
673}
674
675impl Default for SyslogConfig {
676 fn default() -> Self {
677 Self {
678 server_addr: "127.0.0.1:514".to_string(),
679 facility: SyslogFacility::Local0,
680 app_name: "ringkernel".to_string(),
681 procid: None,
682 msgid: None,
683 rfc5424: true,
684 }
685 }
686}
687
688pub struct SyslogSink {
690 config: SyslogConfig,
691 socket: Mutex<Option<UdpSocket>>,
692 hostname: String,
693}
694
695impl SyslogSink {
696 pub fn new(config: SyslogConfig) -> std::io::Result<Self> {
698 let socket = UdpSocket::bind("0.0.0.0:0")?;
699 socket.connect(&config.server_addr)?;
700
701 let hostname = std::env::var("HOSTNAME")
703 .or_else(|_| std::env::var("HOST"))
704 .unwrap_or_else(|_| "localhost".to_string());
705
706 Ok(Self {
707 config,
708 socket: Mutex::new(Some(socket)),
709 hostname,
710 })
711 }
712
713 pub fn with_server(server_addr: impl Into<String>) -> std::io::Result<Self> {
715 Self::new(SyslogConfig {
716 server_addr: server_addr.into(),
717 ..Default::default()
718 })
719 }
720
721 fn format_rfc5424(&self, event: &AuditEvent) -> String {
723 let severity: SyslogSeverity = event.level.into();
724 let priority = (self.config.facility as u8) * 8 + (severity as u8);
725
726 let timestamp = event
728 .timestamp
729 .duration_since(UNIX_EPOCH)
730 .unwrap_or_default();
731 let secs = timestamp.as_secs();
732 let millis = timestamp.subsec_millis();
733
734 let epoch_days = secs / 86400;
736 let day_secs = secs % 86400;
737 let hours = day_secs / 3600;
738 let minutes = (day_secs % 3600) / 60;
739 let seconds = day_secs % 60;
740
741 let year = 1970 + (epoch_days / 365);
743 let day_of_year = epoch_days % 365;
744 let month = (day_of_year / 30).min(11) + 1;
745 let day = (day_of_year % 30) + 1;
746
747 let timestamp_str = format!(
748 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
749 year, month, day, hours, minutes, seconds, millis
750 );
751
752 let procid = self.config.procid.as_deref().unwrap_or("-");
753 let msgid = self.config.msgid.as_deref().unwrap_or("-");
754
755 let sd = format!(
757 "[ringkernel@12345 level=\"{}\" event_type=\"{}\" actor=\"{}\" checksum=\"{}\"]",
758 event.level.as_str(),
759 event.event_type.as_str(),
760 event.actor,
761 event.checksum
762 );
763
764 format!(
765 "<{}>{} {} {} {} {} {} {} {}",
766 priority,
767 1, timestamp_str,
769 self.hostname,
770 self.config.app_name,
771 procid,
772 msgid,
773 sd,
774 event.description
775 )
776 }
777
778 fn format_bsd(&self, event: &AuditEvent) -> String {
780 let severity: SyslogSeverity = event.level.into();
781 let priority = (self.config.facility as u8) * 8 + (severity as u8);
782
783 let timestamp = event
784 .timestamp
785 .duration_since(UNIX_EPOCH)
786 .unwrap_or_default();
787 let secs = timestamp.as_secs();
788
789 let months = [
791 "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
792 ];
793 let epoch_days = secs / 86400;
794 let day_secs = secs % 86400;
795 let hours = day_secs / 3600;
796 let minutes = (day_secs % 3600) / 60;
797 let seconds = day_secs % 60;
798
799 let day_of_year = epoch_days % 365;
800 let month_idx = ((day_of_year / 30) as usize).min(11);
801 let day = (day_of_year % 30) + 1;
802
803 let timestamp_str = format!(
804 "{} {:2} {:02}:{:02}:{:02}",
805 months[month_idx], day, hours, minutes, seconds
806 );
807
808 format!(
809 "<{}>{} {} {}: [{}] {}",
810 priority,
811 timestamp_str,
812 self.hostname,
813 self.config.app_name,
814 event.event_type.as_str(),
815 event.description
816 )
817 }
818}
819
820impl AuditSink for SyslogSink {
821 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
822 let message = if self.config.rfc5424 {
823 self.format_rfc5424(event)
824 } else {
825 self.format_bsd(event)
826 };
827
828 let socket = self.socket.lock();
829 if let Some(ref sock) = *socket {
830 sock.send(message.as_bytes())?;
831 }
832 Ok(())
833 }
834
835 fn flush(&self) -> std::io::Result<()> {
836 Ok(())
837 }
838
839 fn close(&self) -> std::io::Result<()> {
840 let mut socket = self.socket.lock();
841 *socket = None;
842 Ok(())
843 }
844}
845
846#[cfg(feature = "alerting")]
852#[derive(Debug, Clone)]
853pub struct ElasticsearchConfig {
854 pub url: String,
856 pub index_pattern: String,
858 pub auth: Option<(String, String)>,
860 pub batch_size: usize,
862 pub timeout: Duration,
864}
865
866#[cfg(feature = "alerting")]
867impl Default for ElasticsearchConfig {
868 fn default() -> Self {
869 Self {
870 url: "http://localhost:9200".to_string(),
871 index_pattern: "ringkernel-audit".to_string(),
872 auth: None,
873 batch_size: 100,
874 timeout: Duration::from_secs(30),
875 }
876 }
877}
878
879#[cfg(feature = "alerting")]
881pub struct ElasticsearchSink {
882 config: ElasticsearchConfig,
883 client: reqwest::blocking::Client,
884 buffer: Mutex<Vec<String>>,
885}
886
887#[cfg(feature = "alerting")]
888impl ElasticsearchSink {
889 pub fn new(config: ElasticsearchConfig) -> Result<Self, reqwest::Error> {
891 let client = reqwest::blocking::Client::builder()
892 .timeout(config.timeout)
893 .build()?;
894
895 Ok(Self {
896 config,
897 client,
898 buffer: Mutex::new(Vec::new()),
899 })
900 }
901
902 fn get_index(&self, event: &AuditEvent) -> String {
904 let timestamp = event
905 .timestamp
906 .duration_since(UNIX_EPOCH)
907 .unwrap_or_default();
908 let secs = timestamp.as_secs();
909
910 let epoch_days = secs / 86400;
912 let year = 1970 + (epoch_days / 365);
913 let day_of_year = epoch_days % 365;
914 let month = (day_of_year / 30).min(11) + 1;
915 let day = (day_of_year % 30) + 1;
916
917 let date_str = format!("{:04}.{:02}.{:02}", year, month, day);
918
919 self.config
920 .index_pattern
921 .replace("{date}", &date_str)
922 .replace("{year}", &format!("{:04}", year))
923 .replace("{month}", &format!("{:02}", month))
924 .replace("{day}", &format!("{:02}", day))
925 }
926
927 fn to_es_document(&self, event: &AuditEvent) -> String {
929 let timestamp_millis = event
930 .timestamp
931 .duration_since(UNIX_EPOCH)
932 .unwrap_or_default()
933 .as_millis();
934
935 let metadata_json = if event.metadata.is_empty() {
937 "{}".to_string()
938 } else {
939 let pairs: Vec<String> = event
940 .metadata
941 .iter()
942 .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
943 .collect();
944 format!("{{{}}}", pairs.join(","))
945 };
946
947 let hlc_json = event
948 .hlc
949 .map(|h| {
950 format!(
951 r#","hlc":{{"physical":{},"logical":{}}}"#,
952 h.physical, h.logical
953 )
954 })
955 .unwrap_or_default();
956
957 let target_json = event
958 .target
959 .as_ref()
960 .map(|t| format!(r#","target":"{}""#, escape_json(t)))
961 .unwrap_or_default();
962
963 format!(
964 r#"{{"@timestamp":{},"id":{},"level":"{}","event_type":"{}","actor":"{}"{}{}"description":"{}","metadata":{},"checksum":{}}}"#,
965 timestamp_millis,
966 event.id,
967 event.level.as_str(),
968 event.event_type.as_str(),
969 escape_json(&event.actor),
970 target_json,
971 hlc_json,
972 escape_json(&event.description),
973 metadata_json,
974 event.checksum
975 )
976 }
977
978 fn flush_buffer(&self) -> std::io::Result<()> {
980 let documents: Vec<String> = {
981 let mut buffer = self.buffer.lock();
982 std::mem::take(&mut *buffer)
983 };
984
985 if documents.is_empty() {
986 return Ok(());
987 }
988
989 let mut bulk_body = String::new();
991 for doc in documents {
992 bulk_body.push_str(&format!(
994 r#"{{"index":{{"_index":"{}"}}}}"#,
995 self.config.index_pattern.replace("{date}", "current")
996 ));
997 bulk_body.push('\n');
998 bulk_body.push_str(&doc);
1000 bulk_body.push('\n');
1001 }
1002
1003 let url = format!("{}/_bulk", self.config.url);
1004 let mut request = self
1005 .client
1006 .post(&url)
1007 .body(bulk_body)
1008 .header(reqwest::header::CONTENT_TYPE, "application/x-ndjson");
1009
1010 if let Some((user, pass)) = &self.config.auth {
1011 request = request.basic_auth(user, Some(pass));
1012 }
1013
1014 request.send().map_err(|e| {
1015 std::io::Error::new(
1016 std::io::ErrorKind::Other,
1017 format!("ES request failed: {}", e),
1018 )
1019 })?;
1020
1021 Ok(())
1022 }
1023}
1024
1025#[cfg(feature = "alerting")]
1026impl AuditSink for ElasticsearchSink {
1027 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
1028 let doc = self.to_es_document(event);
1029
1030 let should_flush = {
1031 let mut buffer = self.buffer.lock();
1032 buffer.push(doc);
1033 buffer.len() >= self.config.batch_size
1034 };
1035
1036 if should_flush {
1037 self.flush_buffer()?;
1038 }
1039
1040 Ok(())
1041 }
1042
1043 fn flush(&self) -> std::io::Result<()> {
1044 self.flush_buffer()
1045 }
1046
1047 fn close(&self) -> std::io::Result<()> {
1048 self.flush_buffer()
1049 }
1050}
1051
1052#[derive(Debug, Clone)]
1058pub struct CloudWatchConfig {
1059 pub log_group: String,
1061 pub log_stream: String,
1063 pub region: String,
1065 pub batch_size: usize,
1067}
1068
1069impl Default for CloudWatchConfig {
1070 fn default() -> Self {
1071 Self {
1072 log_group: "/ringkernel/audit".to_string(),
1073 log_stream: "default".to_string(),
1074 region: "us-east-1".to_string(),
1075 batch_size: 100,
1076 }
1077 }
1078}
1079
1080pub struct CloudWatchSink {
1085 config: CloudWatchConfig,
1086 buffer: Mutex<Vec<(u64, String)>>, _sequence_token: Mutex<Option<String>>,
1088}
1089
1090impl CloudWatchSink {
1091 pub fn new(config: CloudWatchConfig) -> Self {
1093 Self {
1094 config,
1095 buffer: Mutex::new(Vec::new()),
1096 _sequence_token: Mutex::new(None),
1097 }
1098 }
1099
1100 pub fn config(&self) -> &CloudWatchConfig {
1102 &self.config
1103 }
1104
1105 pub fn buffer_size(&self) -> usize {
1107 self.buffer.lock().len()
1108 }
1109}
1110
1111impl AuditSink for CloudWatchSink {
1112 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
1113 let timestamp_ms = event
1114 .timestamp
1115 .duration_since(UNIX_EPOCH)
1116 .unwrap_or_default()
1117 .as_millis() as u64;
1118
1119 let message = event.to_json();
1120
1121 let should_flush = {
1122 let mut buffer = self.buffer.lock();
1123 buffer.push((timestamp_ms, message));
1124 buffer.len() >= self.config.batch_size
1125 };
1126
1127 if should_flush {
1128 self.flush()?;
1129 }
1130
1131 Ok(())
1132 }
1133
1134 fn flush(&self) -> std::io::Result<()> {
1135 let events: Vec<(u64, String)> = {
1136 let mut buffer = self.buffer.lock();
1137 std::mem::take(&mut *buffer)
1138 };
1139
1140 if events.is_empty() {
1141 return Ok(());
1142 }
1143
1144 eprintln!(
1147 "[CloudWatch stub] Would send {} events to {}/{}",
1148 events.len(),
1149 self.config.log_group,
1150 self.config.log_stream
1151 );
1152
1153 Ok(())
1154 }
1155
1156 fn close(&self) -> std::io::Result<()> {
1157 self.flush()
1158 }
1159}
1160
1161#[derive(Debug, Clone)]
1167pub struct AuditConfig {
1168 pub min_level: AuditLevel,
1170 pub enable_checksums: bool,
1172 pub buffer_size: usize,
1174 pub flush_interval: Duration,
1176 pub retention: Duration,
1178}
1179
1180impl Default for AuditConfig {
1181 fn default() -> Self {
1182 Self {
1183 min_level: AuditLevel::Info,
1184 enable_checksums: true,
1185 buffer_size: 100,
1186 flush_interval: Duration::from_secs(5),
1187 retention: Duration::from_secs(90 * 24 * 60 * 60), }
1189 }
1190}
1191
1192pub struct AuditLoggerBuilder {
1194 config: AuditConfig,
1195 sinks: Vec<Arc<dyn AuditSink>>,
1196}
1197
1198impl AuditLoggerBuilder {
1199 pub fn new() -> Self {
1201 Self {
1202 config: AuditConfig::default(),
1203 sinks: Vec::new(),
1204 }
1205 }
1206
1207 pub fn with_min_level(mut self, level: AuditLevel) -> Self {
1209 self.config.min_level = level;
1210 self
1211 }
1212
1213 pub fn with_file_sink(mut self, path: impl Into<PathBuf>) -> std::io::Result<Self> {
1215 let sink = Arc::new(FileSink::new(path)?);
1216 self.sinks.push(sink);
1217 Ok(self)
1218 }
1219
1220 pub fn with_memory_sink(mut self, max_events: usize) -> Self {
1222 let sink = Arc::new(MemorySink::new(max_events));
1223 self.sinks.push(sink);
1224 self
1225 }
1226
1227 pub fn with_sink(mut self, sink: Arc<dyn AuditSink>) -> Self {
1229 self.sinks.push(sink);
1230 self
1231 }
1232
1233 pub fn with_syslog_sink(mut self, config: SyslogConfig) -> std::io::Result<Self> {
1235 let sink = Arc::new(SyslogSink::new(config)?);
1236 self.sinks.push(sink);
1237 Ok(self)
1238 }
1239
1240 pub fn with_syslog(mut self, server_addr: impl Into<String>) -> std::io::Result<Self> {
1242 let sink = Arc::new(SyslogSink::with_server(server_addr)?);
1243 self.sinks.push(sink);
1244 Ok(self)
1245 }
1246
1247 pub fn with_cloudwatch_sink(mut self, config: CloudWatchConfig) -> Self {
1249 let sink = Arc::new(CloudWatchSink::new(config));
1250 self.sinks.push(sink);
1251 self
1252 }
1253
1254 #[cfg(feature = "alerting")]
1256 pub fn with_elasticsearch_sink(
1257 mut self,
1258 config: ElasticsearchConfig,
1259 ) -> Result<Self, reqwest::Error> {
1260 let sink = Arc::new(ElasticsearchSink::new(config)?);
1261 self.sinks.push(sink);
1262 Ok(self)
1263 }
1264
1265 pub fn with_retention(mut self, retention: Duration) -> Self {
1267 self.config.retention = retention;
1268 self
1269 }
1270
1271 pub fn with_checksums(mut self, enable: bool) -> Self {
1273 self.config.enable_checksums = enable;
1274 self
1275 }
1276
1277 pub fn build(self) -> AuditLogger {
1279 AuditLogger {
1280 config: self.config,
1281 sinks: self.sinks,
1282 last_checksum: AtomicU64::new(0),
1283 event_count: AtomicU64::new(0),
1284 buffer: RwLock::new(Vec::new()),
1285 }
1286 }
1287}
1288
1289impl Default for AuditLoggerBuilder {
1290 fn default() -> Self {
1291 Self::new()
1292 }
1293}
1294
1295pub struct AuditLogger {
1297 config: AuditConfig,
1298 sinks: Vec<Arc<dyn AuditSink>>,
1299 last_checksum: AtomicU64,
1300 event_count: AtomicU64,
1301 buffer: RwLock<Vec<AuditEvent>>,
1302}
1303
1304impl AuditLogger {
1305 pub fn builder() -> AuditLoggerBuilder {
1307 AuditLoggerBuilder::new()
1308 }
1309
1310 pub fn in_memory(max_events: usize) -> Self {
1312 AuditLoggerBuilder::new()
1313 .with_memory_sink(max_events)
1314 .build()
1315 }
1316
1317 pub fn log(&self, mut event: AuditEvent) {
1319 if event.level < self.config.min_level {
1321 return;
1322 }
1323
1324 if self.config.enable_checksums {
1326 let prev = self.last_checksum.load(Ordering::Acquire);
1327 event = event.with_prev_checksum(prev);
1328 self.last_checksum.store(event.checksum, Ordering::Release);
1329 }
1330
1331 for sink in &self.sinks {
1333 if let Err(e) = sink.write(&event) {
1334 eprintln!("Audit sink error: {}", e);
1335 }
1336 }
1337
1338 self.event_count.fetch_add(1, Ordering::Relaxed);
1339 }
1340
1341 pub fn log_kernel_launched(&self, kernel_id: &str, backend: &str) {
1343 self.log(AuditEvent::kernel_launched(kernel_id, backend));
1344 }
1345
1346 pub fn log_kernel_terminated(&self, kernel_id: &str, reason: &str) {
1348 self.log(AuditEvent::kernel_terminated(kernel_id, reason));
1349 }
1350
1351 pub fn log_security_violation(&self, actor: &str, violation: &str) {
1353 self.log(AuditEvent::security_violation(actor, violation));
1354 }
1355
1356 pub fn log_config_change(&self, actor: &str, key: &str, old_value: &str, new_value: &str) {
1358 self.log(AuditEvent::config_change(actor, key, old_value, new_value));
1359 }
1360
1361 pub fn event_count(&self) -> u64 {
1363 self.event_count.load(Ordering::Relaxed)
1364 }
1365
1366 pub fn buffer_event(&self, event: AuditEvent) {
1370 let mut buffer = self.buffer.write();
1371 buffer.push(event);
1372 }
1373
1374 pub fn flush_buffered(&self) -> std::io::Result<()> {
1376 let events: Vec<AuditEvent> = {
1377 let mut buffer = self.buffer.write();
1378 std::mem::take(&mut *buffer)
1379 };
1380
1381 for mut event in events {
1382 if self.config.enable_checksums {
1384 let prev = self.last_checksum.load(Ordering::Acquire);
1385 event = event.with_prev_checksum(prev);
1386 self.last_checksum.store(event.checksum, Ordering::Release);
1387 }
1388
1389 for sink in &self.sinks {
1391 sink.write(&event)?;
1392 }
1393
1394 self.event_count.fetch_add(1, Ordering::Relaxed);
1395 }
1396
1397 self.flush()
1398 }
1399
1400 pub fn buffered_count(&self) -> usize {
1402 self.buffer.read().len()
1403 }
1404
1405 pub fn flush(&self) -> std::io::Result<()> {
1407 for sink in &self.sinks {
1408 sink.flush()?;
1409 }
1410 Ok(())
1411 }
1412
1413 pub fn close(&self) -> std::io::Result<()> {
1415 for sink in &self.sinks {
1416 sink.close()?;
1417 }
1418 Ok(())
1419 }
1420}
1421
1422#[cfg(test)]
1427mod tests {
1428 use super::*;
1429
1430 #[test]
1431 fn test_audit_event_creation() {
1432 let event = AuditEvent::new(
1433 AuditLevel::Info,
1434 AuditEventType::KernelLaunched,
1435 "runtime",
1436 "Kernel launched",
1437 );
1438
1439 assert_eq!(event.level, AuditLevel::Info);
1440 assert_eq!(event.event_type, AuditEventType::KernelLaunched);
1441 assert_eq!(event.actor, "runtime");
1442 assert!(event.checksum != 0);
1443 }
1444
1445 #[test]
1446 fn test_audit_event_checksum() {
1447 let event = AuditEvent::kernel_launched("test_kernel", "cuda");
1448 assert!(event.verify_checksum());
1449
1450 let mut modified = event.clone();
1452 modified.description = "Modified".to_string();
1453 assert!(!modified.verify_checksum());
1454 }
1455
1456 #[test]
1457 fn test_audit_event_chain() {
1458 let event1 = AuditEvent::kernel_launched("k1", "cuda");
1459 let event2 = AuditEvent::kernel_launched("k2", "cuda").with_prev_checksum(event1.checksum);
1460
1461 assert_eq!(event2.prev_checksum, Some(event1.checksum));
1462 }
1463
1464 #[test]
1465 fn test_audit_event_json() {
1466 let event = AuditEvent::kernel_launched("test", "cuda")
1467 .with_metadata("gpu_id", "0")
1468 .with_metadata("memory_mb", "8192");
1469
1470 let json = event.to_json();
1471 assert!(json.contains("kernel_launched"));
1472 assert!(json.contains("test"));
1473 assert!(json.contains("cuda"));
1474 assert!(json.contains("gpu_id"));
1475 }
1476
1477 #[test]
1478 fn test_memory_sink() {
1479 let sink = MemorySink::new(10);
1480
1481 let event = AuditEvent::kernel_launched("test", "cuda");
1482 sink.write(&event).unwrap();
1483
1484 assert_eq!(sink.len(), 1);
1485 assert!(!sink.is_empty());
1486
1487 let events = sink.events();
1488 assert_eq!(events[0].event_type, AuditEventType::KernelLaunched);
1489 }
1490
1491 #[test]
1492 fn test_memory_sink_rotation() {
1493 let sink = MemorySink::new(3);
1494
1495 for i in 0..5 {
1496 let event = AuditEvent::new(
1497 AuditLevel::Info,
1498 AuditEventType::Custom(format!("event_{}", i)),
1499 "test",
1500 format!("Event {}", i),
1501 );
1502 sink.write(&event).unwrap();
1503 }
1504
1505 assert_eq!(sink.len(), 3);
1507 let events = sink.events();
1508 assert_eq!(
1509 events[0].event_type,
1510 AuditEventType::Custom("event_2".to_string())
1511 );
1512 }
1513
1514 #[test]
1515 fn test_audit_logger() {
1516 let logger = AuditLogger::in_memory(100);
1517
1518 logger.log_kernel_launched("k1", "cuda");
1519 logger.log_kernel_terminated("k1", "shutdown");
1520 logger.log_security_violation("user", "unauthorized access");
1521
1522 assert_eq!(logger.event_count(), 3);
1523 }
1524
1525 #[test]
1526 fn test_audit_level_ordering() {
1527 assert!(AuditLevel::Info < AuditLevel::Warning);
1528 assert!(AuditLevel::Warning < AuditLevel::Security);
1529 assert!(AuditLevel::Security < AuditLevel::Critical);
1530 assert!(AuditLevel::Critical < AuditLevel::Compliance);
1531 }
1532
1533 #[test]
1534 fn test_audit_event_helpers() {
1535 let event = AuditEvent::config_change("admin", "max_kernels", "10", "20");
1536 assert_eq!(event.level, AuditLevel::Compliance);
1537 assert_eq!(event.metadata.len(), 2);
1538
1539 let health = AuditEvent::health_check("kernel_1", "healthy");
1540 assert_eq!(health.event_type, AuditEventType::HealthCheck);
1541 }
1542
1543 #[test]
1544 fn test_syslog_severity_conversion() {
1545 assert_eq!(
1546 SyslogSeverity::from(AuditLevel::Info),
1547 SyslogSeverity::Informational
1548 );
1549 assert_eq!(
1550 SyslogSeverity::from(AuditLevel::Warning),
1551 SyslogSeverity::Warning
1552 );
1553 assert_eq!(
1554 SyslogSeverity::from(AuditLevel::Security),
1555 SyslogSeverity::Notice
1556 );
1557 assert_eq!(
1558 SyslogSeverity::from(AuditLevel::Critical),
1559 SyslogSeverity::Error
1560 );
1561 }
1562
1563 #[test]
1564 fn test_syslog_config_default() {
1565 let config = SyslogConfig::default();
1566 assert_eq!(config.server_addr, "127.0.0.1:514");
1567 assert_eq!(config.facility, SyslogFacility::Local0);
1568 assert_eq!(config.app_name, "ringkernel");
1569 assert!(config.rfc5424);
1570 }
1571
1572 #[test]
1573 fn test_cloudwatch_config_default() {
1574 let config = CloudWatchConfig::default();
1575 assert_eq!(config.log_group, "/ringkernel/audit");
1576 assert_eq!(config.log_stream, "default");
1577 assert_eq!(config.region, "us-east-1");
1578 assert_eq!(config.batch_size, 100);
1579 }
1580
1581 #[test]
1582 fn test_cloudwatch_sink_buffering() {
1583 let config = CloudWatchConfig {
1584 batch_size: 5,
1585 ..Default::default()
1586 };
1587 let sink = CloudWatchSink::new(config);
1588
1589 for i in 0..3 {
1591 let event = AuditEvent::new(
1592 AuditLevel::Info,
1593 AuditEventType::Custom(format!("event_{}", i)),
1594 "test",
1595 format!("Event {}", i),
1596 );
1597 sink.write(&event).unwrap();
1598 }
1599
1600 assert_eq!(sink.buffer_size(), 3);
1601 }
1602
1603 #[test]
1604 fn test_syslog_facility_values() {
1605 assert_eq!(SyslogFacility::Kern as u8, 0);
1606 assert_eq!(SyslogFacility::User as u8, 1);
1607 assert_eq!(SyslogFacility::Auth as u8, 4);
1608 assert_eq!(SyslogFacility::Local0 as u8, 16);
1609 assert_eq!(SyslogFacility::Local7 as u8, 23);
1610 }
1611
1612 #[test]
1613 fn test_syslog_severity_values() {
1614 assert_eq!(SyslogSeverity::Emergency as u8, 0);
1615 assert_eq!(SyslogSeverity::Alert as u8, 1);
1616 assert_eq!(SyslogSeverity::Critical as u8, 2);
1617 assert_eq!(SyslogSeverity::Error as u8, 3);
1618 assert_eq!(SyslogSeverity::Warning as u8, 4);
1619 assert_eq!(SyslogSeverity::Notice as u8, 5);
1620 assert_eq!(SyslogSeverity::Informational as u8, 6);
1621 assert_eq!(SyslogSeverity::Debug as u8, 7);
1622 }
1623}