1use std::collections::VecDeque;
28use std::fmt;
29use std::io::Write;
30use std::net::UdpSocket;
31use std::path::PathBuf;
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::sync::Arc;
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35
36use parking_lot::{Mutex, RwLock};
37
38use crate::hlc::HlcTimestamp;
39
/// Severity / category attached to every audit event.
///
/// The derived ordering follows the explicit discriminants, so
/// `Info < Warning < Security < Critical < Compliance`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum AuditLevel {
    /// Lowest level (discriminant 0).
    Info = 0,
    /// Discriminant 1.
    Warning = 1,
    /// Discriminant 2.
    Security = 2,
    /// Discriminant 3.
    Critical = 3,
    /// Highest level (discriminant 4).
    Compliance = 4,
}

impl AuditLevel {
    /// Returns the upper-case label of this level, e.g. `"INFO"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            AuditLevel::Info => "INFO",
            AuditLevel::Warning => "WARNING",
            AuditLevel::Security => "SECURITY",
            AuditLevel::Critical => "CRITICAL",
            AuditLevel::Compliance => "COMPLIANCE",
        }
    }
}

impl fmt::Display for AuditLevel {
    /// Writes the same label that [`AuditLevel::as_str`] returns.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
78
/// Kind of occurrence an audit event records.
///
/// Every built-in variant maps to a fixed snake_case name (see
/// [`AuditEventType::as_str`]); [`AuditEventType::Custom`] carries a
/// caller-supplied name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AuditEventType {
    // Kernel lifecycle.
    KernelLaunched,
    KernelTerminated,
    KernelMigrated,
    KernelCheckpointed,
    KernelRestored,

    // Messaging.
    MessageSent,
    MessageReceived,
    MessageFailed,

    // Security and configuration.
    AuthenticationAttempt,
    AuthorizationCheck,
    ConfigurationChange,
    SecurityViolation,

    // Resources.
    MemoryAllocated,
    MemoryDeallocated,
    ResourceLimitExceeded,

    // Health and resilience.
    HealthCheck,
    CircuitBreakerStateChange,
    DegradationChange,

    /// Caller-defined event type; the string is used verbatim as its name.
    Custom(String),
}

impl AuditEventType {
    /// Returns the snake_case name of the event type.
    ///
    /// For [`AuditEventType::Custom`] the wrapped string is returned unchanged.
    pub fn as_str(&self) -> &str {
        let name: &str = match self {
            AuditEventType::Custom(label) => label.as_str(),
            AuditEventType::KernelLaunched => "kernel_launched",
            AuditEventType::KernelTerminated => "kernel_terminated",
            AuditEventType::KernelMigrated => "kernel_migrated",
            AuditEventType::KernelCheckpointed => "kernel_checkpointed",
            AuditEventType::KernelRestored => "kernel_restored",
            AuditEventType::MessageSent => "message_sent",
            AuditEventType::MessageReceived => "message_received",
            AuditEventType::MessageFailed => "message_failed",
            AuditEventType::AuthenticationAttempt => "authentication_attempt",
            AuditEventType::AuthorizationCheck => "authorization_check",
            AuditEventType::ConfigurationChange => "configuration_change",
            AuditEventType::SecurityViolation => "security_violation",
            AuditEventType::MemoryAllocated => "memory_allocated",
            AuditEventType::MemoryDeallocated => "memory_deallocated",
            AuditEventType::ResourceLimitExceeded => "resource_limit_exceeded",
            AuditEventType::HealthCheck => "health_check",
            AuditEventType::CircuitBreakerStateChange => "circuit_breaker_state_change",
            AuditEventType::DegradationChange => "degradation_change",
        };
        name
    }
}

impl fmt::Display for AuditEventType {
    /// Writes the same name that [`AuditEventType::as_str`] returns.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
168
/// A single audit record.
///
/// Events are built with [`AuditEvent::new`] and the `with_*` methods, each of
/// which recomputes `checksum`.
#[derive(Debug, Clone)]
pub struct AuditEvent {
    /// Identifier taken from the module-wide event counter.
    pub id: u64,
    /// System time captured when the event was created.
    pub timestamp: SystemTime,
    /// Optional hybrid-logical-clock timestamp.
    pub hlc: Option<HlcTimestamp>,
    /// Severity / category of the event.
    pub level: AuditLevel,
    /// What kind of occurrence this event records.
    pub event_type: AuditEventType,
    /// Who or what triggered the event.
    pub actor: String,
    /// Optional object the event refers to (the helpers store a kernel id here).
    pub target: Option<String>,
    /// Free-form description.
    pub description: String,
    /// Ordered key/value pairs with additional context.
    pub metadata: Vec<(String, String)>,
    /// Checksum of the preceding event when this event is part of a chain.
    pub prev_checksum: Option<u64>,
    /// Hash over the event's fields; see `compute_checksum` for what is covered.
    pub checksum: u64,
}
199
200impl AuditEvent {
201 pub fn new(
203 level: AuditLevel,
204 event_type: AuditEventType,
205 actor: impl Into<String>,
206 description: impl Into<String>,
207 ) -> Self {
208 let id = next_event_id();
209 let timestamp = SystemTime::now();
210 let actor = actor.into();
211 let description = description.into();
212
213 let mut event = Self {
214 id,
215 timestamp,
216 hlc: None,
217 level,
218 event_type,
219 actor,
220 target: None,
221 description,
222 metadata: Vec::new(),
223 prev_checksum: None,
224 checksum: 0,
225 };
226
227 event.checksum = event.compute_checksum();
228 event
229 }
230
231 pub fn with_hlc(mut self, hlc: HlcTimestamp) -> Self {
233 self.hlc = Some(hlc);
234 self.checksum = self.compute_checksum();
235 self
236 }
237
238 pub fn with_target(mut self, target: impl Into<String>) -> Self {
240 self.target = Some(target.into());
241 self.checksum = self.compute_checksum();
242 self
243 }
244
245 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
247 self.metadata.push((key.into(), value.into()));
248 self.checksum = self.compute_checksum();
249 self
250 }
251
252 pub fn with_prev_checksum(mut self, checksum: u64) -> Self {
254 self.prev_checksum = Some(checksum);
255 self.checksum = self.compute_checksum();
256 self
257 }
258
259 fn compute_checksum(&self) -> u64 {
261 use std::collections::hash_map::DefaultHasher;
262 use std::hash::{Hash, Hasher};
263
264 let mut hasher = DefaultHasher::new();
265 self.id.hash(&mut hasher);
266 self.timestamp
267 .duration_since(UNIX_EPOCH)
268 .unwrap_or_default()
269 .as_nanos()
270 .hash(&mut hasher);
271 self.level.as_str().hash(&mut hasher);
272 self.event_type.as_str().hash(&mut hasher);
273 self.actor.hash(&mut hasher);
274 self.target.hash(&mut hasher);
275 self.description.hash(&mut hasher);
276 for (k, v) in &self.metadata {
277 k.hash(&mut hasher);
278 v.hash(&mut hasher);
279 }
280 self.prev_checksum.hash(&mut hasher);
281 hasher.finish()
282 }
283
284 pub fn verify_checksum(&self) -> bool {
286 self.checksum == self.compute_checksum()
287 }
288
289 pub fn kernel_launched(kernel_id: impl Into<String>, backend: impl Into<String>) -> Self {
293 Self::new(
294 AuditLevel::Info,
295 AuditEventType::KernelLaunched,
296 "runtime",
297 format!("Kernel launched on {}", backend.into()),
298 )
299 .with_target(kernel_id)
300 }
301
302 pub fn kernel_terminated(kernel_id: impl Into<String>, reason: impl Into<String>) -> Self {
304 Self::new(
305 AuditLevel::Info,
306 AuditEventType::KernelTerminated,
307 "runtime",
308 format!("Kernel terminated: {}", reason.into()),
309 )
310 .with_target(kernel_id)
311 }
312
313 pub fn security_violation(actor: impl Into<String>, violation: impl Into<String>) -> Self {
315 Self::new(
316 AuditLevel::Security,
317 AuditEventType::SecurityViolation,
318 actor,
319 violation,
320 )
321 }
322
323 pub fn config_change(
325 actor: impl Into<String>,
326 config_key: impl Into<String>,
327 old_value: impl Into<String>,
328 new_value: impl Into<String>,
329 ) -> Self {
330 Self::new(
331 AuditLevel::Compliance,
332 AuditEventType::ConfigurationChange,
333 actor,
334 format!("Configuration changed: {}", config_key.into()),
335 )
336 .with_metadata("old_value", old_value)
337 .with_metadata("new_value", new_value)
338 }
339
340 pub fn health_check(kernel_id: impl Into<String>, status: impl Into<String>) -> Self {
342 Self::new(
343 AuditLevel::Info,
344 AuditEventType::HealthCheck,
345 "health_checker",
346 format!("Health check: {}", status.into()),
347 )
348 .with_target(kernel_id)
349 }
350
351 pub fn to_json(&self) -> String {
353 let timestamp = self
354 .timestamp
355 .duration_since(UNIX_EPOCH)
356 .unwrap_or_default()
357 .as_millis();
358
359 let hlc_str = self
360 .hlc
361 .map(|h| {
362 format!(
363 r#","hlc":{{"wall":{},"logical":{}}}"#,
364 h.physical, h.logical
365 )
366 })
367 .unwrap_or_default();
368
369 let target_str = self
370 .target
371 .as_ref()
372 .map(|t| format!(r#","target":"{}""#, escape_json(t)))
373 .unwrap_or_default();
374
375 let prev_checksum_str = self
376 .prev_checksum
377 .map(|c| format!(r#","prev_checksum":{}"#, c))
378 .unwrap_or_default();
379
380 let metadata_str = if self.metadata.is_empty() {
381 String::new()
382 } else {
383 let pairs: Vec<String> = self
384 .metadata
385 .iter()
386 .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
387 .collect();
388 format!(r#","metadata":{{{}}}"#, pairs.join(","))
389 };
390
391 format!(
392 r#"{{"id":{},"timestamp":{}{},"level":"{}","event_type":"{}","actor":"{}"{}"description":"{}"{}"checksum":{}{}}}"#,
393 self.id,
394 timestamp,
395 hlc_str,
396 self.level.as_str(),
397 self.event_type.as_str(),
398 escape_json(&self.actor),
399 target_str,
400 escape_json(&self.description),
401 metadata_str,
402 self.checksum,
403 prev_checksum_str,
404 )
405 }
406}
407
/// Escapes `s` so it can be placed between double quotes in a JSON document.
///
/// Backslash, double quote, newline, carriage return and tab use their short
/// escapes. Every other control character below U+0020 is written as
/// `\u00XX`, because JSON does not allow raw control characters in strings.
fn escape_json(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            c if (c as u32) < 0x20 => escaped.push_str(&format!("\\u{:04x}", c as u32)),
            c => escaped.push(c),
        }
    }
    escaped
}
416
/// Module-wide source of event ids; the first id handed out is 1.
static EVENT_ID_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Returns the next event id and advances the counter by one.
fn next_event_id() -> u64 {
    const STEP: u64 = 1;
    EVENT_ID_COUNTER.fetch_add(STEP, Ordering::SeqCst)
}
423
/// Destination for audit events.
///
/// Implementations must be `Send + Sync`; all methods take `&self`, so a sink
/// that mutates state does so through interior mutability.
pub trait AuditSink: Send + Sync {
    /// Records a single event.
    fn write(&self, event: &AuditEvent) -> std::io::Result<()>;

    /// Pushes out anything the sink has buffered.
    fn flush(&self) -> std::io::Result<()>;

    /// Releases the sink's resources.
    fn close(&self) -> std::io::Result<()>;
}
439
/// Sink that appends one JSON line per event to a file and rotates the file
/// once it reaches `max_size` bytes.
pub struct FileSink {
    /// Path of the active log file.
    path: PathBuf,
    /// Open file handle; `None` after the sink has been closed.
    writer: Mutex<Option<std::fs::File>>,
    /// Size in bytes at which the file is rotated.
    max_size: u64,
    /// Bytes in the active file, tracked from the initial length plus every write.
    current_size: AtomicU64,
}
447
448impl FileSink {
449 pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
451 let path = path.into();
452 let file = std::fs::OpenOptions::new()
453 .create(true)
454 .append(true)
455 .open(&path)?;
456
457 let metadata = file.metadata()?;
458
459 Ok(Self {
460 path,
461 writer: Mutex::new(Some(file)),
462 max_size: 100 * 1024 * 1024, current_size: AtomicU64::new(metadata.len()),
464 })
465 }
466
467 pub fn with_max_size(mut self, size: u64) -> Self {
469 self.max_size = size;
470 self
471 }
472
473 fn rotate_if_needed(&self) -> std::io::Result<()> {
475 if self.current_size.load(Ordering::Relaxed) >= self.max_size {
476 let mut writer = self.writer.lock();
477 if let Some(file) = writer.take() {
478 drop(file);
479
480 let timestamp = SystemTime::now()
482 .duration_since(UNIX_EPOCH)
483 .unwrap_or_default()
484 .as_secs();
485 let rotated_path = self.path.with_extension(format!("log.{}", timestamp));
486 std::fs::rename(&self.path, rotated_path)?;
487
488 let new_file = std::fs::OpenOptions::new()
490 .create(true)
491 .append(true)
492 .open(&self.path)?;
493 *writer = Some(new_file);
494 self.current_size.store(0, Ordering::Relaxed);
495 }
496 }
497 Ok(())
498 }
499}
500
501impl AuditSink for FileSink {
502 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
503 self.rotate_if_needed()?;
504
505 let json = event.to_json();
506 let line = format!("{}\n", json);
507 let len = line.len() as u64;
508
509 let mut writer = self.writer.lock();
510 if let Some(file) = writer.as_mut() {
511 file.write_all(line.as_bytes())?;
512 self.current_size.fetch_add(len, Ordering::Relaxed);
513 }
514 Ok(())
515 }
516
517 fn flush(&self) -> std::io::Result<()> {
518 let mut writer = self.writer.lock();
519 if let Some(file) = writer.as_mut() {
520 file.flush()?;
521 }
522 Ok(())
523 }
524
525 fn close(&self) -> std::io::Result<()> {
526 let mut writer = self.writer.lock();
527 if let Some(file) = writer.take() {
528 drop(file);
529 }
530 Ok(())
531 }
532}
533
/// Sink that keeps the most recent events in memory.
///
/// The derived `Default` yields an empty queue with `max_events == 0`.
#[derive(Default)]
pub struct MemorySink {
    /// Stored events, oldest first.
    events: Mutex<VecDeque<AuditEvent>>,
    /// Length at which the oldest event is evicted before a new one is added.
    max_events: usize,
}
540
541impl MemorySink {
542 pub fn new(max_events: usize) -> Self {
544 Self {
545 events: Mutex::new(VecDeque::with_capacity(max_events)),
546 max_events,
547 }
548 }
549
550 pub fn events(&self) -> Vec<AuditEvent> {
552 self.events.lock().iter().cloned().collect()
553 }
554
555 pub fn len(&self) -> usize {
557 self.events.lock().len()
558 }
559
560 pub fn is_empty(&self) -> bool {
562 self.events.lock().is_empty()
563 }
564
565 pub fn clear(&self) {
567 self.events.lock().clear();
568 }
569}
570
571impl AuditSink for MemorySink {
572 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
573 let mut events = self.events.lock();
574 if events.len() >= self.max_events {
575 events.pop_front();
576 }
577 events.push_back(event.clone());
578 Ok(())
579 }
580
581 fn flush(&self) -> std::io::Result<()> {
582 Ok(())
583 }
584
585 fn close(&self) -> std::io::Result<()> {
586 Ok(())
587 }
588}
589
/// Syslog facility code.
///
/// The numeric value is multiplied by 8 and added to the severity to form the
/// priority value at the start of each syslog message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SyslogFacility {
    /// Facility code 0.
    Kern = 0,
    /// Facility code 1.
    User = 1,
    /// Facility code 4.
    Auth = 4,
    /// Facility code 10.
    AuthPriv = 10,
    /// Facility code 16.
    Local0 = 16,
    /// Facility code 17.
    Local1 = 17,
    /// Facility code 18.
    Local2 = 18,
    /// Facility code 19.
    Local3 = 19,
    /// Facility code 20.
    Local4 = 20,
    /// Facility code 21.
    Local5 = 21,
    /// Facility code 22.
    Local6 = 22,
    /// Facility code 23.
    Local7 = 23,
}
623
/// Syslog severity code; lower numbers are more severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SyslogSeverity {
    /// Severity code 0.
    Emergency = 0,
    /// Severity code 1.
    Alert = 1,
    /// Severity code 2.
    Critical = 2,
    /// Severity code 3.
    Error = 3,
    /// Severity code 4.
    Warning = 4,
    /// Severity code 5.
    Notice = 5,
    /// Severity code 6.
    Informational = 6,
    /// Severity code 7.
    Debug = 7,
}
645
646impl From<AuditLevel> for SyslogSeverity {
647 fn from(level: AuditLevel) -> Self {
648 match level {
649 AuditLevel::Info => SyslogSeverity::Informational,
650 AuditLevel::Warning => SyslogSeverity::Warning,
651 AuditLevel::Security => SyslogSeverity::Notice,
652 AuditLevel::Critical => SyslogSeverity::Error,
653 AuditLevel::Compliance => SyslogSeverity::Notice,
654 }
655 }
656}
657
/// Settings for [`SyslogSink`].
#[derive(Debug, Clone)]
pub struct SyslogConfig {
    /// Address (`host:port`) of the syslog server the UDP socket connects to.
    pub server_addr: String,
    /// Facility used when computing the message priority.
    pub facility: SyslogFacility,
    /// Application name placed in every message.
    pub app_name: String,
    /// Process id field for RFC 5424 messages; `-` is sent when `None`.
    pub procid: Option<String>,
    /// Message id field for RFC 5424 messages; `-` is sent when `None`.
    pub msgid: Option<String>,
    /// `true` selects the RFC 5424 layout, `false` the BSD layout.
    pub rfc5424: bool,
}
674
675impl Default for SyslogConfig {
676 fn default() -> Self {
677 Self {
678 server_addr: "127.0.0.1:514".to_string(),
679 facility: SyslogFacility::Local0,
680 app_name: "ringkernel".to_string(),
681 procid: None,
682 msgid: None,
683 rfc5424: true,
684 }
685 }
686}
687
/// Sink that sends each event to a syslog server as one UDP datagram.
pub struct SyslogSink {
    /// Formatting and destination settings.
    config: SyslogConfig,
    /// Connected UDP socket; `None` after the sink has been closed.
    socket: Mutex<Option<UdpSocket>>,
    /// Host name placed in every message.
    hostname: String,
}
694
695impl SyslogSink {
696 pub fn new(config: SyslogConfig) -> std::io::Result<Self> {
698 let socket = UdpSocket::bind("0.0.0.0:0")?;
699 socket.connect(&config.server_addr)?;
700
701 let hostname = std::env::var("HOSTNAME")
703 .or_else(|_| std::env::var("HOST"))
704 .unwrap_or_else(|_| "localhost".to_string());
705
706 Ok(Self {
707 config,
708 socket: Mutex::new(Some(socket)),
709 hostname,
710 })
711 }
712
713 pub fn with_server(server_addr: impl Into<String>) -> std::io::Result<Self> {
715 Self::new(SyslogConfig {
716 server_addr: server_addr.into(),
717 ..Default::default()
718 })
719 }
720
721 fn format_rfc5424(&self, event: &AuditEvent) -> String {
723 let severity: SyslogSeverity = event.level.into();
724 let priority = (self.config.facility as u8) * 8 + (severity as u8);
725
726 let timestamp = event
728 .timestamp
729 .duration_since(UNIX_EPOCH)
730 .unwrap_or_default();
731 let secs = timestamp.as_secs();
732 let millis = timestamp.subsec_millis();
733
734 let epoch_days = secs / 86400;
736 let day_secs = secs % 86400;
737 let hours = day_secs / 3600;
738 let minutes = (day_secs % 3600) / 60;
739 let seconds = day_secs % 60;
740
741 let year = 1970 + (epoch_days / 365);
743 let day_of_year = epoch_days % 365;
744 let month = (day_of_year / 30).min(11) + 1;
745 let day = (day_of_year % 30) + 1;
746
747 let timestamp_str = format!(
748 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
749 year, month, day, hours, minutes, seconds, millis
750 );
751
752 let procid = self.config.procid.as_deref().unwrap_or("-");
753 let msgid = self.config.msgid.as_deref().unwrap_or("-");
754
755 let sd = format!(
757 "[ringkernel@12345 level=\"{}\" event_type=\"{}\" actor=\"{}\" checksum=\"{}\"]",
758 event.level.as_str(),
759 event.event_type.as_str(),
760 event.actor,
761 event.checksum
762 );
763
764 format!(
765 "<{}>{} {} {} {} {} {} {} {}",
766 priority,
767 1, timestamp_str,
769 self.hostname,
770 self.config.app_name,
771 procid,
772 msgid,
773 sd,
774 event.description
775 )
776 }
777
778 fn format_bsd(&self, event: &AuditEvent) -> String {
780 let severity: SyslogSeverity = event.level.into();
781 let priority = (self.config.facility as u8) * 8 + (severity as u8);
782
783 let timestamp = event
784 .timestamp
785 .duration_since(UNIX_EPOCH)
786 .unwrap_or_default();
787 let secs = timestamp.as_secs();
788
789 let months = [
791 "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
792 ];
793 let epoch_days = secs / 86400;
794 let day_secs = secs % 86400;
795 let hours = day_secs / 3600;
796 let minutes = (day_secs % 3600) / 60;
797 let seconds = day_secs % 60;
798
799 let day_of_year = epoch_days % 365;
800 let month_idx = ((day_of_year / 30) as usize).min(11);
801 let day = (day_of_year % 30) + 1;
802
803 let timestamp_str = format!(
804 "{} {:2} {:02}:{:02}:{:02}",
805 months[month_idx], day, hours, minutes, seconds
806 );
807
808 format!(
809 "<{}>{} {} {}: [{}] {}",
810 priority,
811 timestamp_str,
812 self.hostname,
813 self.config.app_name,
814 event.event_type.as_str(),
815 event.description
816 )
817 }
818}
819
820impl AuditSink for SyslogSink {
821 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
822 let message = if self.config.rfc5424 {
823 self.format_rfc5424(event)
824 } else {
825 self.format_bsd(event)
826 };
827
828 let socket = self.socket.lock();
829 if let Some(ref sock) = *socket {
830 sock.send(message.as_bytes())?;
831 }
832 Ok(())
833 }
834
835 fn flush(&self) -> std::io::Result<()> {
836 Ok(())
837 }
838
839 fn close(&self) -> std::io::Result<()> {
840 let mut socket = self.socket.lock();
841 *socket = None;
842 Ok(())
843 }
844}
845
/// Settings for [`ElasticsearchSink`] (only with the `alerting` feature).
#[cfg(feature = "alerting")]
#[derive(Debug, Clone)]
pub struct ElasticsearchConfig {
    /// Base URL of the cluster; `/_bulk` is appended when sending.
    pub url: String,
    /// Index name; may contain the placeholders `{date}`, `{year}`, `{month}`, `{day}`.
    pub index_pattern: String,
    /// Optional `(user, password)` pair for HTTP basic authentication.
    pub auth: Option<(String, String)>,
    /// Number of buffered documents that triggers an automatic flush.
    pub batch_size: usize,
    /// Timeout applied to the HTTP client.
    pub timeout: Duration,
}
865
#[cfg(feature = "alerting")]
impl Default for ElasticsearchConfig {
    /// Defaults: `http://localhost:9200`, index `ringkernel-audit`, no
    /// authentication, batches of 100 documents, 30 second timeout.
    fn default() -> Self {
        const DEFAULT_TIMEOUT_SECS: u64 = 30;
        const DEFAULT_BATCH_SIZE: usize = 100;

        ElasticsearchConfig {
            url: String::from("http://localhost:9200"),
            index_pattern: String::from("ringkernel-audit"),
            auth: None,
            batch_size: DEFAULT_BATCH_SIZE,
            timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECS),
        }
    }
}
878
/// Sink that batches events and posts them to the Elasticsearch bulk endpoint
/// (only with the `alerting` feature).
#[cfg(feature = "alerting")]
pub struct ElasticsearchSink {
    /// Connection and batching settings.
    config: ElasticsearchConfig,
    /// Blocking HTTP client built with the configured timeout.
    client: reqwest::blocking::Client,
    /// Serialized documents waiting for the next flush.
    buffer: Mutex<Vec<String>>,
}
886
#[cfg(feature = "alerting")]
impl ElasticsearchSink {
    /// Builds the HTTP client with the configured timeout.
    ///
    /// # Errors
    /// Returns the `reqwest` error if the client cannot be constructed.
    pub fn new(config: ElasticsearchConfig) -> Result<Self, reqwest::Error> {
        let client = reqwest::blocking::Client::builder()
            .timeout(config.timeout)
            .build()?;

        Ok(Self {
            config,
            client,
            buffer: Mutex::new(Vec::new()),
        })
    }

    /// Converts a day count since 1970-01-01 into `(year, month, day)` in the
    /// proleptic Gregorian calendar, with leap years handled exactly.
    fn civil_from_days(epoch_days: u64) -> (u64, u64, u64) {
        // Shift the origin to 0000-03-01 so the leap day ends the internal year.
        let shifted = epoch_days + 719_468;
        let era = shifted / 146_097;
        let day_of_era = shifted % 146_097;
        let year_of_era =
            (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
        let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
        let shifted_month = (5 * day_of_year + 2) / 153;
        let day = day_of_year - (153 * shifted_month + 2) / 5 + 1;
        let month = if shifted_month < 10 {
            shifted_month + 3
        } else {
            shifted_month - 9
        };
        let year = year_of_era + era * 400 + u64::from(month <= 2);
        (year, month, day)
    }

    /// Expands the `{date}`, `{year}`, `{month}` and `{day}` placeholders of
    /// the index pattern using the event's UTC date.
    ///
    /// NOTE(review): nothing in the visible code calls this; `flush_buffer`
    /// substitutes the literal `current` for `{date}` instead.
    fn get_index(&self, event: &AuditEvent) -> String {
        let secs = event
            .timestamp
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let (year, month, day) = Self::civil_from_days(secs / 86_400);

        let date_str = format!("{:04}.{:02}.{:02}", year, month, day);

        self.config
            .index_pattern
            .replace("{date}", &date_str)
            .replace("{year}", &format!("{:04}", year))
            .replace("{month}", &format!("{:02}", month))
            .replace("{day}", &format!("{:02}", day))
    }

    /// Serializes `event` as the JSON document stored in Elasticsearch.
    ///
    /// `@timestamp` is milliseconds since the Unix epoch; `target` and `hlc`
    /// are emitted only when present; `metadata` is always an object.
    fn to_es_document(&self, event: &AuditEvent) -> String {
        let timestamp_millis = event
            .timestamp
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();

        let metadata_json = if event.metadata.is_empty() {
            "{}".to_string()
        } else {
            let pairs: Vec<String> = event
                .metadata
                .iter()
                .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
                .collect();
            format!("{{{}}}", pairs.join(","))
        };

        let hlc_json = event
            .hlc
            .as_ref()
            .map(|h| {
                format!(
                    r#","hlc":{{"physical":{},"logical":{}}}"#,
                    h.physical, h.logical
                )
            })
            .unwrap_or_default();

        let target_json = event
            .target
            .as_ref()
            .map(|t| format!(r#","target":"{}""#, escape_json(t)))
            .unwrap_or_default();

        // The optional fragments bring their own leading comma, so the member
        // that follows them needs an explicit comma too.
        format!(
            r#"{{"@timestamp":{},"id":{},"level":"{}","event_type":"{}","actor":"{}"{}{},"description":"{}","metadata":{},"checksum":{}}}"#,
            timestamp_millis,
            event.id,
            event.level.as_str(),
            escape_json(event.event_type.as_str()),
            escape_json(&event.actor),
            target_json,
            hlc_json,
            escape_json(&event.description),
            metadata_json,
            event.checksum
        )
    }

    /// Sends all buffered documents in a single `_bulk` request.
    ///
    /// The buffer is emptied before the request is made, so documents are not
    /// retried if the request fails.
    ///
    /// # Errors
    /// Returns an `io::Error` when the request cannot be sent or the server
    /// answers with a 4xx/5xx status.
    ///
    /// NOTE(review): a 2xx bulk response can still report per-item failures in
    /// its body; that body is not inspected here.
    fn flush_buffer(&self) -> std::io::Result<()> {
        let documents: Vec<String> = std::mem::take(&mut *self.buffer.lock());

        if documents.is_empty() {
            return Ok(());
        }

        // The action line is identical for every document, so build it once.
        let action_line = format!(
            r#"{{"index":{{"_index":"{}"}}}}"#,
            self.config.index_pattern.replace("{date}", "current")
        );

        let mut bulk_body = String::new();
        for doc in &documents {
            bulk_body.push_str(&action_line);
            bulk_body.push('\n');
            bulk_body.push_str(doc);
            bulk_body.push('\n');
        }

        let url = format!("{}/_bulk", self.config.url);
        let mut request = self
            .client
            .post(&url)
            .body(bulk_body)
            .header(reqwest::header::CONTENT_TYPE, "application/x-ndjson");

        if let Some((user, pass)) = &self.config.auth {
            request = request.basic_auth(user, Some(pass));
        }

        request
            .send()
            .and_then(|response| response.error_for_status())
            .map_err(|e| {
                std::io::Error::new(
                    std::io::ErrorKind::Other,
                    format!("ES request failed: {}", e),
                )
            })?;

        Ok(())
    }
}
1024
#[cfg(feature = "alerting")]
impl AuditSink for ElasticsearchSink {
    /// Buffers the serialized event and flushes once the buffer holds
    /// `batch_size` documents or more.
    fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
        let document = self.to_es_document(event);

        // Release the lock before flushing; `flush_buffer` locks it again.
        let batch_full = {
            let mut pending = self.buffer.lock();
            pending.push(document);
            pending.len() >= self.config.batch_size
        };

        if batch_full {
            self.flush_buffer()
        } else {
            Ok(())
        }
    }

    /// Sends whatever is currently buffered.
    fn flush(&self) -> std::io::Result<()> {
        self.flush_buffer()
    }

    /// Sends whatever is currently buffered; the sink stays usable afterwards.
    fn close(&self) -> std::io::Result<()> {
        self.flush_buffer()
    }
}
1051
/// Settings for [`CloudWatchSink`].
#[derive(Debug, Clone)]
pub struct CloudWatchConfig {
    /// Log group name.
    pub log_group: String,
    /// Log stream name.
    pub log_stream: String,
    /// Region name.
    pub region: String,
    /// Number of buffered events that triggers an automatic flush.
    pub batch_size: usize,
}
1068
1069impl Default for CloudWatchConfig {
1070 fn default() -> Self {
1071 Self {
1072 log_group: "/ringkernel/audit".to_string(),
1073 log_stream: "default".to_string(),
1074 region: "us-east-1".to_string(),
1075 batch_size: 100,
1076 }
1077 }
1078}
1079
/// Stub CloudWatch sink: events are buffered and then dropped with a warning
/// when flushed (see its `AuditSink::flush` implementation).
pub struct CloudWatchSink {
    /// Destination and batching settings.
    config: CloudWatchConfig,
    /// Pending `(timestamp in ms since the Unix epoch, JSON message)` pairs.
    buffer: Mutex<Vec<(u64, String)>>,
    /// Initialized to `None` and not read by any code visible in this file.
    _sequence_token: Mutex<Option<String>>,
}
1089
1090impl CloudWatchSink {
1091 pub fn new(config: CloudWatchConfig) -> Self {
1093 Self {
1094 config,
1095 buffer: Mutex::new(Vec::new()),
1096 _sequence_token: Mutex::new(None),
1097 }
1098 }
1099
1100 pub fn config(&self) -> &CloudWatchConfig {
1102 &self.config
1103 }
1104
1105 pub fn buffer_size(&self) -> usize {
1107 self.buffer.lock().len()
1108 }
1109}
1110
1111impl AuditSink for CloudWatchSink {
1112 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
1113 let timestamp_ms = event
1114 .timestamp
1115 .duration_since(UNIX_EPOCH)
1116 .unwrap_or_default()
1117 .as_millis() as u64;
1118
1119 let message = event.to_json();
1120
1121 let should_flush = {
1122 let mut buffer = self.buffer.lock();
1123 buffer.push((timestamp_ms, message));
1124 buffer.len() >= self.config.batch_size
1125 };
1126
1127 if should_flush {
1128 self.flush()?;
1129 }
1130
1131 Ok(())
1132 }
1133
1134 fn flush(&self) -> std::io::Result<()> {
1135 let events: Vec<(u64, String)> = {
1136 let mut buffer = self.buffer.lock();
1137 std::mem::take(&mut *buffer)
1138 };
1139
1140 if events.is_empty() {
1141 return Ok(());
1142 }
1143
1144 tracing::warn!(
1148 event_count = events.len(),
1149 log_group = %self.config.log_group,
1150 log_stream = %self.config.log_stream,
1151 "CloudWatch sink is a stub: {} audit events dropped. \
1152 Enable the `cloudwatch` feature for real AWS integration.",
1153 events.len(),
1154 );
1155
1156 Ok(())
1157 }
1158
1159 fn close(&self) -> std::io::Result<()> {
1160 self.flush()
1161 }
1162}
1163
/// Settings for [`AuditLogger`].
#[derive(Debug, Clone)]
pub struct AuditConfig {
    /// Events below this level are ignored by `AuditLogger::log`.
    pub min_level: AuditLevel,
    /// When `true`, each logged event is linked to the previous event's checksum.
    pub enable_checksums: bool,
    /// Buffer size setting; not read by any code visible in this file.
    pub buffer_size: usize,
    /// Flush interval setting; not read by any code visible in this file.
    pub flush_interval: Duration,
    /// Retention period setting; not read by any code visible in this file.
    pub retention: Duration,
}
1182
1183impl Default for AuditConfig {
1184 fn default() -> Self {
1185 Self {
1186 min_level: AuditLevel::Info,
1187 enable_checksums: true,
1188 buffer_size: 100,
1189 flush_interval: Duration::from_secs(5),
1190 retention: Duration::from_secs(90 * 24 * 60 * 60), }
1192 }
1193}
1194
/// Builder that collects configuration and sinks for an [`AuditLogger`].
pub struct AuditLoggerBuilder {
    /// Configuration handed to the logger on `build`.
    config: AuditConfig,
    /// Sinks in the order they were added.
    sinks: Vec<Arc<dyn AuditSink>>,
}
1200
1201impl AuditLoggerBuilder {
1202 pub fn new() -> Self {
1204 Self {
1205 config: AuditConfig::default(),
1206 sinks: Vec::new(),
1207 }
1208 }
1209
1210 pub fn with_min_level(mut self, level: AuditLevel) -> Self {
1212 self.config.min_level = level;
1213 self
1214 }
1215
1216 pub fn with_file_sink(mut self, path: impl Into<PathBuf>) -> std::io::Result<Self> {
1218 let sink = Arc::new(FileSink::new(path)?);
1219 self.sinks.push(sink);
1220 Ok(self)
1221 }
1222
1223 pub fn with_memory_sink(mut self, max_events: usize) -> Self {
1225 let sink = Arc::new(MemorySink::new(max_events));
1226 self.sinks.push(sink);
1227 self
1228 }
1229
1230 pub fn with_sink(mut self, sink: Arc<dyn AuditSink>) -> Self {
1232 self.sinks.push(sink);
1233 self
1234 }
1235
1236 pub fn with_syslog_sink(mut self, config: SyslogConfig) -> std::io::Result<Self> {
1238 let sink = Arc::new(SyslogSink::new(config)?);
1239 self.sinks.push(sink);
1240 Ok(self)
1241 }
1242
1243 pub fn with_syslog(mut self, server_addr: impl Into<String>) -> std::io::Result<Self> {
1245 let sink = Arc::new(SyslogSink::with_server(server_addr)?);
1246 self.sinks.push(sink);
1247 Ok(self)
1248 }
1249
1250 pub fn with_cloudwatch_sink(mut self, config: CloudWatchConfig) -> Self {
1252 let sink = Arc::new(CloudWatchSink::new(config));
1253 self.sinks.push(sink);
1254 self
1255 }
1256
1257 #[cfg(feature = "alerting")]
1259 pub fn with_elasticsearch_sink(
1260 mut self,
1261 config: ElasticsearchConfig,
1262 ) -> Result<Self, reqwest::Error> {
1263 let sink = Arc::new(ElasticsearchSink::new(config)?);
1264 self.sinks.push(sink);
1265 Ok(self)
1266 }
1267
1268 pub fn with_retention(mut self, retention: Duration) -> Self {
1270 self.config.retention = retention;
1271 self
1272 }
1273
1274 pub fn with_checksums(mut self, enable: bool) -> Self {
1276 self.config.enable_checksums = enable;
1277 self
1278 }
1279
1280 pub fn build(self) -> AuditLogger {
1282 AuditLogger {
1283 config: self.config,
1284 sinks: self.sinks,
1285 last_checksum: AtomicU64::new(0),
1286 event_count: AtomicU64::new(0),
1287 buffer: RwLock::new(Vec::new()),
1288 }
1289 }
1290}
1291
1292impl Default for AuditLoggerBuilder {
1293 fn default() -> Self {
1294 Self::new()
1295 }
1296}
1297
/// Dispatches audit events to a set of sinks, optionally chaining checksums.
pub struct AuditLogger {
    /// Filtering and checksum settings.
    config: AuditConfig,
    /// Sinks every event is written to, in order.
    sinks: Vec<Arc<dyn AuditSink>>,
    /// Checksum of the most recently chained event (0 before the first one).
    last_checksum: AtomicU64,
    /// Number of events processed by `log` and `flush_buffered`.
    event_count: AtomicU64,
    /// Events queued by `buffer_event` until `flush_buffered` is called.
    buffer: RwLock<Vec<AuditEvent>>,
}
1306
1307impl AuditLogger {
1308 pub fn builder() -> AuditLoggerBuilder {
1310 AuditLoggerBuilder::new()
1311 }
1312
1313 pub fn in_memory(max_events: usize) -> Self {
1315 AuditLoggerBuilder::new()
1316 .with_memory_sink(max_events)
1317 .build()
1318 }
1319
1320 pub fn log(&self, mut event: AuditEvent) {
1322 if event.level < self.config.min_level {
1324 return;
1325 }
1326
1327 if self.config.enable_checksums {
1329 let prev = self.last_checksum.load(Ordering::Acquire);
1330 event = event.with_prev_checksum(prev);
1331 self.last_checksum.store(event.checksum, Ordering::Release);
1332 }
1333
1334 for sink in &self.sinks {
1336 if let Err(e) = sink.write(&event) {
1337 tracing::error!("Audit sink error: {}", e);
1338 }
1339 }
1340
1341 self.event_count.fetch_add(1, Ordering::Relaxed);
1342 }
1343
1344 pub fn log_kernel_launched(&self, kernel_id: &str, backend: &str) {
1346 self.log(AuditEvent::kernel_launched(kernel_id, backend));
1347 }
1348
1349 pub fn log_kernel_terminated(&self, kernel_id: &str, reason: &str) {
1351 self.log(AuditEvent::kernel_terminated(kernel_id, reason));
1352 }
1353
1354 pub fn log_security_violation(&self, actor: &str, violation: &str) {
1356 self.log(AuditEvent::security_violation(actor, violation));
1357 }
1358
1359 pub fn log_config_change(&self, actor: &str, key: &str, old_value: &str, new_value: &str) {
1361 self.log(AuditEvent::config_change(actor, key, old_value, new_value));
1362 }
1363
1364 pub fn event_count(&self) -> u64 {
1366 self.event_count.load(Ordering::Relaxed)
1367 }
1368
1369 pub fn buffer_event(&self, event: AuditEvent) {
1373 let mut buffer = self.buffer.write();
1374 buffer.push(event);
1375 }
1376
1377 pub fn flush_buffered(&self) -> std::io::Result<()> {
1379 let events: Vec<AuditEvent> = {
1380 let mut buffer = self.buffer.write();
1381 std::mem::take(&mut *buffer)
1382 };
1383
1384 for mut event in events {
1385 if self.config.enable_checksums {
1387 let prev = self.last_checksum.load(Ordering::Acquire);
1388 event = event.with_prev_checksum(prev);
1389 self.last_checksum.store(event.checksum, Ordering::Release);
1390 }
1391
1392 for sink in &self.sinks {
1394 sink.write(&event)?;
1395 }
1396
1397 self.event_count.fetch_add(1, Ordering::Relaxed);
1398 }
1399
1400 self.flush()
1401 }
1402
1403 pub fn buffered_count(&self) -> usize {
1405 self.buffer.read().len()
1406 }
1407
1408 pub fn flush(&self) -> std::io::Result<()> {
1410 for sink in &self.sinks {
1411 sink.flush()?;
1412 }
1413 Ok(())
1414 }
1415
1416 pub fn close(&self) -> std::io::Result<()> {
1418 for sink in &self.sinks {
1419 sink.close()?;
1420 }
1421 Ok(())
1422 }
1423}
1424
// Unit tests for events, sinks, the logger and the syslog/CloudWatch settings.
#[cfg(test)]
mod tests {
    use super::*;

    // A new event keeps the supplied fields and gets a non-zero checksum.
    #[test]
    fn test_audit_event_creation() {
        let event = AuditEvent::new(
            AuditLevel::Info,
            AuditEventType::KernelLaunched,
            "runtime",
            "Kernel launched",
        );

        assert_eq!(event.level, AuditLevel::Info);
        assert_eq!(event.event_type, AuditEventType::KernelLaunched);
        assert_eq!(event.actor, "runtime");
        assert!(event.checksum != 0);
    }

    // Changing a hashed field after construction invalidates the checksum.
    #[test]
    fn test_audit_event_checksum() {
        let event = AuditEvent::kernel_launched("test_kernel", "cuda");
        assert!(event.verify_checksum());

        let mut modified = event.clone();
        modified.description = "Modified".to_string();
        assert!(!modified.verify_checksum());
    }

    // `with_prev_checksum` records the predecessor's checksum.
    #[test]
    fn test_audit_event_chain() {
        let event1 = AuditEvent::kernel_launched("k1", "cuda");
        let event2 = AuditEvent::kernel_launched("k2", "cuda").with_prev_checksum(event1.checksum);

        assert_eq!(event2.prev_checksum, Some(event1.checksum));
    }

    // The JSON output mentions the event type, target, description and metadata.
    #[test]
    fn test_audit_event_json() {
        let event = AuditEvent::kernel_launched("test", "cuda")
            .with_metadata("gpu_id", "0")
            .with_metadata("memory_mb", "8192");

        let json = event.to_json();
        assert!(json.contains("kernel_launched"));
        assert!(json.contains("test"));
        assert!(json.contains("cuda"));
        assert!(json.contains("gpu_id"));
    }

    // A written event can be read back from the memory sink.
    #[test]
    fn test_memory_sink() {
        let sink = MemorySink::new(10);

        let event = AuditEvent::kernel_launched("test", "cuda");
        sink.write(&event).unwrap();

        assert_eq!(sink.len(), 1);
        assert!(!sink.is_empty());

        let events = sink.events();
        assert_eq!(events[0].event_type, AuditEventType::KernelLaunched);
    }

    // With capacity 3 and 5 writes, the two oldest events are evicted.
    #[test]
    fn test_memory_sink_rotation() {
        let sink = MemorySink::new(3);

        for i in 0..5 {
            let event = AuditEvent::new(
                AuditLevel::Info,
                AuditEventType::Custom(format!("event_{}", i)),
                "test",
                format!("Event {}", i),
            );
            sink.write(&event).unwrap();
        }

        assert_eq!(sink.len(), 3);
        let events = sink.events();
        assert_eq!(
            events[0].event_type,
            AuditEventType::Custom("event_2".to_string())
        );
    }

    // Each convenience method counts as one logged event.
    #[test]
    fn test_audit_logger() {
        let logger = AuditLogger::in_memory(100);

        logger.log_kernel_launched("k1", "cuda");
        logger.log_kernel_terminated("k1", "shutdown");
        logger.log_security_violation("user", "unauthorized access");

        assert_eq!(logger.event_count(), 3);
    }

    // Levels are ordered Info < Warning < Security < Critical < Compliance.
    #[test]
    fn test_audit_level_ordering() {
        assert!(AuditLevel::Info < AuditLevel::Warning);
        assert!(AuditLevel::Warning < AuditLevel::Security);
        assert!(AuditLevel::Security < AuditLevel::Critical);
        assert!(AuditLevel::Critical < AuditLevel::Compliance);
    }

    // Helper constructors set the expected level, metadata and event type.
    #[test]
    fn test_audit_event_helpers() {
        let event = AuditEvent::config_change("admin", "max_kernels", "10", "20");
        assert_eq!(event.level, AuditLevel::Compliance);
        assert_eq!(event.metadata.len(), 2);

        let health = AuditEvent::health_check("kernel_1", "healthy");
        assert_eq!(health.event_type, AuditEventType::HealthCheck);
    }

    // Pins the audit-level to syslog-severity mapping.
    #[test]
    fn test_syslog_severity_conversion() {
        assert_eq!(
            SyslogSeverity::from(AuditLevel::Info),
            SyslogSeverity::Informational
        );
        assert_eq!(
            SyslogSeverity::from(AuditLevel::Warning),
            SyslogSeverity::Warning
        );
        assert_eq!(
            SyslogSeverity::from(AuditLevel::Security),
            SyslogSeverity::Notice
        );
        assert_eq!(
            SyslogSeverity::from(AuditLevel::Critical),
            SyslogSeverity::Error
        );
    }

    // Pins the default syslog settings.
    #[test]
    fn test_syslog_config_default() {
        let config = SyslogConfig::default();
        assert_eq!(config.server_addr, "127.0.0.1:514");
        assert_eq!(config.facility, SyslogFacility::Local0);
        assert_eq!(config.app_name, "ringkernel");
        assert!(config.rfc5424);
    }

    // Pins the default CloudWatch settings.
    #[test]
    fn test_cloudwatch_config_default() {
        let config = CloudWatchConfig::default();
        assert_eq!(config.log_group, "/ringkernel/audit");
        assert_eq!(config.log_stream, "default");
        assert_eq!(config.region, "us-east-1");
        assert_eq!(config.batch_size, 100);
    }

    // Fewer writes than `batch_size` stay in the buffer.
    #[test]
    fn test_cloudwatch_sink_buffering() {
        let config = CloudWatchConfig {
            batch_size: 5,
            ..Default::default()
        };
        let sink = CloudWatchSink::new(config);

        for i in 0..3 {
            let event = AuditEvent::new(
                AuditLevel::Info,
                AuditEventType::Custom(format!("event_{}", i)),
                "test",
                format!("Event {}", i),
            );
            sink.write(&event).unwrap();
        }

        assert_eq!(sink.buffer_size(), 3);
    }

    // Pins the numeric facility codes.
    #[test]
    fn test_syslog_facility_values() {
        assert_eq!(SyslogFacility::Kern as u8, 0);
        assert_eq!(SyslogFacility::User as u8, 1);
        assert_eq!(SyslogFacility::Auth as u8, 4);
        assert_eq!(SyslogFacility::Local0 as u8, 16);
        assert_eq!(SyslogFacility::Local7 as u8, 23);
    }

    // Pins the numeric severity codes.
    #[test]
    fn test_syslog_severity_values() {
        assert_eq!(SyslogSeverity::Emergency as u8, 0);
        assert_eq!(SyslogSeverity::Alert as u8, 1);
        assert_eq!(SyslogSeverity::Critical as u8, 2);
        assert_eq!(SyslogSeverity::Error as u8, 3);
        assert_eq!(SyslogSeverity::Warning as u8, 4);
        assert_eq!(SyslogSeverity::Notice as u8, 5);
        assert_eq!(SyslogSeverity::Informational as u8, 6);
        assert_eq!(SyslogSeverity::Debug as u8, 7);
    }
}