1use std::collections::VecDeque;
28use std::fmt;
29use std::io::Write;
30use std::net::UdpSocket;
31use std::path::PathBuf;
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::sync::Arc;
34use std::time::{Duration, SystemTime, UNIX_EPOCH};
35
36use parking_lot::{Mutex, RwLock};
37
38use crate::hlc::HlcTimestamp;
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
46#[repr(u8)]
47pub enum AuditLevel {
48 Info = 0,
50 Warning = 1,
52 Security = 2,
54 Critical = 3,
56 Compliance = 4,
58}
59
60impl AuditLevel {
61 pub fn as_str(&self) -> &'static str {
63 match self {
64 Self::Info => "INFO",
65 Self::Warning => "WARNING",
66 Self::Security => "SECURITY",
67 Self::Critical => "CRITICAL",
68 Self::Compliance => "COMPLIANCE",
69 }
70 }
71}
72
73impl fmt::Display for AuditLevel {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 write!(f, "{}", self.as_str())
76 }
77}
78
79#[derive(Debug, Clone, PartialEq, Eq, Hash)]
85pub enum AuditEventType {
86 KernelLaunched,
89 KernelTerminated,
91 KernelMigrated,
93 KernelCheckpointed,
95 KernelRestored,
97
98 MessageSent,
101 MessageReceived,
103 MessageFailed,
105
106 AuthenticationAttempt,
109 AuthorizationCheck,
111 ConfigurationChange,
113 SecurityViolation,
115
116 MemoryAllocated,
119 MemoryDeallocated,
121 ResourceLimitExceeded,
123
124 HealthCheck,
127 CircuitBreakerStateChange,
129 DegradationChange,
131
132 Custom(String),
134}
135
136impl AuditEventType {
137 pub fn as_str(&self) -> &str {
139 match self {
140 Self::KernelLaunched => "kernel_launched",
141 Self::KernelTerminated => "kernel_terminated",
142 Self::KernelMigrated => "kernel_migrated",
143 Self::KernelCheckpointed => "kernel_checkpointed",
144 Self::KernelRestored => "kernel_restored",
145 Self::MessageSent => "message_sent",
146 Self::MessageReceived => "message_received",
147 Self::MessageFailed => "message_failed",
148 Self::AuthenticationAttempt => "authentication_attempt",
149 Self::AuthorizationCheck => "authorization_check",
150 Self::ConfigurationChange => "configuration_change",
151 Self::SecurityViolation => "security_violation",
152 Self::MemoryAllocated => "memory_allocated",
153 Self::MemoryDeallocated => "memory_deallocated",
154 Self::ResourceLimitExceeded => "resource_limit_exceeded",
155 Self::HealthCheck => "health_check",
156 Self::CircuitBreakerStateChange => "circuit_breaker_state_change",
157 Self::DegradationChange => "degradation_change",
158 Self::Custom(s) => s.as_str(),
159 }
160 }
161}
162
163impl fmt::Display for AuditEventType {
164 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165 write!(f, "{}", self.as_str())
166 }
167}
168
169#[derive(Debug, Clone)]
175pub struct AuditEvent {
176 pub id: u64,
178 pub timestamp: SystemTime,
180 pub hlc: Option<HlcTimestamp>,
182 pub level: AuditLevel,
184 pub event_type: AuditEventType,
186 pub actor: String,
188 pub target: Option<String>,
190 pub description: String,
192 pub metadata: Vec<(String, String)>,
194 pub prev_checksum: Option<u64>,
196 pub checksum: u64,
198}
199
200impl AuditEvent {
201 pub fn new(
203 level: AuditLevel,
204 event_type: AuditEventType,
205 actor: impl Into<String>,
206 description: impl Into<String>,
207 ) -> Self {
208 let id = next_event_id();
209 let timestamp = SystemTime::now();
210 let actor = actor.into();
211 let description = description.into();
212
213 let mut event = Self {
214 id,
215 timestamp,
216 hlc: None,
217 level,
218 event_type,
219 actor,
220 target: None,
221 description,
222 metadata: Vec::new(),
223 prev_checksum: None,
224 checksum: 0,
225 };
226
227 event.checksum = event.compute_checksum();
228 event
229 }
230
231 pub fn with_hlc(mut self, hlc: HlcTimestamp) -> Self {
233 self.hlc = Some(hlc);
234 self.checksum = self.compute_checksum();
235 self
236 }
237
238 pub fn with_target(mut self, target: impl Into<String>) -> Self {
240 self.target = Some(target.into());
241 self.checksum = self.compute_checksum();
242 self
243 }
244
245 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
247 self.metadata.push((key.into(), value.into()));
248 self.checksum = self.compute_checksum();
249 self
250 }
251
252 pub fn with_prev_checksum(mut self, checksum: u64) -> Self {
254 self.prev_checksum = Some(checksum);
255 self.checksum = self.compute_checksum();
256 self
257 }
258
259 fn compute_checksum(&self) -> u64 {
261 use std::collections::hash_map::DefaultHasher;
262 use std::hash::{Hash, Hasher};
263
264 let mut hasher = DefaultHasher::new();
265 self.id.hash(&mut hasher);
266 self.timestamp
267 .duration_since(UNIX_EPOCH)
268 .unwrap_or_default()
269 .as_nanos()
270 .hash(&mut hasher);
271 self.level.as_str().hash(&mut hasher);
272 self.event_type.as_str().hash(&mut hasher);
273 self.actor.hash(&mut hasher);
274 self.target.hash(&mut hasher);
275 self.description.hash(&mut hasher);
276 for (k, v) in &self.metadata {
277 k.hash(&mut hasher);
278 v.hash(&mut hasher);
279 }
280 self.prev_checksum.hash(&mut hasher);
281 hasher.finish()
282 }
283
284 pub fn verify_checksum(&self) -> bool {
286 self.checksum == self.compute_checksum()
287 }
288
289 pub fn kernel_launched(kernel_id: impl Into<String>, backend: impl Into<String>) -> Self {
293 Self::new(
294 AuditLevel::Info,
295 AuditEventType::KernelLaunched,
296 "runtime",
297 format!("Kernel launched on {}", backend.into()),
298 )
299 .with_target(kernel_id)
300 }
301
302 pub fn kernel_terminated(kernel_id: impl Into<String>, reason: impl Into<String>) -> Self {
304 Self::new(
305 AuditLevel::Info,
306 AuditEventType::KernelTerminated,
307 "runtime",
308 format!("Kernel terminated: {}", reason.into()),
309 )
310 .with_target(kernel_id)
311 }
312
313 pub fn security_violation(actor: impl Into<String>, violation: impl Into<String>) -> Self {
315 Self::new(
316 AuditLevel::Security,
317 AuditEventType::SecurityViolation,
318 actor,
319 violation,
320 )
321 }
322
323 pub fn config_change(
325 actor: impl Into<String>,
326 config_key: impl Into<String>,
327 old_value: impl Into<String>,
328 new_value: impl Into<String>,
329 ) -> Self {
330 Self::new(
331 AuditLevel::Compliance,
332 AuditEventType::ConfigurationChange,
333 actor,
334 format!("Configuration changed: {}", config_key.into()),
335 )
336 .with_metadata("old_value", old_value)
337 .with_metadata("new_value", new_value)
338 }
339
340 pub fn health_check(kernel_id: impl Into<String>, status: impl Into<String>) -> Self {
342 Self::new(
343 AuditLevel::Info,
344 AuditEventType::HealthCheck,
345 "health_checker",
346 format!("Health check: {}", status.into()),
347 )
348 .with_target(kernel_id)
349 }
350
351 pub fn to_json(&self) -> String {
353 let timestamp = self
354 .timestamp
355 .duration_since(UNIX_EPOCH)
356 .unwrap_or_default()
357 .as_millis();
358
359 let hlc_str = self
360 .hlc
361 .map(|h| {
362 format!(
363 r#","hlc":{{"wall":{},"logical":{}}}"#,
364 h.physical, h.logical
365 )
366 })
367 .unwrap_or_default();
368
369 let target_str = self
370 .target
371 .as_ref()
372 .map(|t| format!(r#","target":"{}""#, escape_json(t)))
373 .unwrap_or_default();
374
375 let prev_checksum_str = self
376 .prev_checksum
377 .map(|c| format!(r#","prev_checksum":{}"#, c))
378 .unwrap_or_default();
379
380 let metadata_str = if self.metadata.is_empty() {
381 String::new()
382 } else {
383 let pairs: Vec<String> = self
384 .metadata
385 .iter()
386 .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
387 .collect();
388 format!(r#","metadata":{{{}}}"#, pairs.join(","))
389 };
390
391 format!(
392 r#"{{"id":{},"timestamp":{}{},"level":"{}","event_type":"{}","actor":"{}"{}"description":"{}"{}"checksum":{}{}}}"#,
393 self.id,
394 timestamp,
395 hlc_str,
396 self.level.as_str(),
397 self.event_type.as_str(),
398 escape_json(&self.actor),
399 target_str,
400 escape_json(&self.description),
401 metadata_str,
402 self.checksum,
403 prev_checksum_str,
404 )
405 }
406}
407
408fn escape_json(s: &str) -> String {
410 s.replace('\\', "\\\\")
411 .replace('"', "\\\"")
412 .replace('\n', "\\n")
413 .replace('\r', "\\r")
414 .replace('\t', "\\t")
415}
416
417static EVENT_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
419
420fn next_event_id() -> u64 {
421 EVENT_ID_COUNTER.fetch_add(1, Ordering::SeqCst)
422}
423
424pub trait AuditSink: Send + Sync {
430 fn write(&self, event: &AuditEvent) -> std::io::Result<()>;
432
433 fn flush(&self) -> std::io::Result<()>;
435
436 fn close(&self) -> std::io::Result<()>;
438}
439
440pub struct FileSink {
442 path: PathBuf,
443 writer: Mutex<Option<std::fs::File>>,
444 max_size: u64,
445 current_size: AtomicU64,
446}
447
448impl FileSink {
449 pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
451 let path = path.into();
452 let file = std::fs::OpenOptions::new()
453 .create(true)
454 .append(true)
455 .open(&path)?;
456
457 let metadata = file.metadata()?;
458
459 Ok(Self {
460 path,
461 writer: Mutex::new(Some(file)),
462 max_size: 100 * 1024 * 1024, current_size: AtomicU64::new(metadata.len()),
464 })
465 }
466
467 pub fn with_max_size(mut self, size: u64) -> Self {
469 self.max_size = size;
470 self
471 }
472
473 fn rotate_if_needed(&self) -> std::io::Result<()> {
475 if self.current_size.load(Ordering::Relaxed) >= self.max_size {
476 let mut writer = self.writer.lock();
477 if let Some(file) = writer.take() {
478 drop(file);
479
480 let timestamp = SystemTime::now()
482 .duration_since(UNIX_EPOCH)
483 .unwrap_or_default()
484 .as_secs();
485 let rotated_path = self.path.with_extension(format!("log.{}", timestamp));
486 std::fs::rename(&self.path, rotated_path)?;
487
488 let new_file = std::fs::OpenOptions::new()
490 .create(true)
491 .append(true)
492 .open(&self.path)?;
493 *writer = Some(new_file);
494 self.current_size.store(0, Ordering::Relaxed);
495 }
496 }
497 Ok(())
498 }
499}
500
501impl AuditSink for FileSink {
502 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
503 self.rotate_if_needed()?;
504
505 let json = event.to_json();
506 let line = format!("{}\n", json);
507 let len = line.len() as u64;
508
509 let mut writer = self.writer.lock();
510 if let Some(file) = writer.as_mut() {
511 file.write_all(line.as_bytes())?;
512 self.current_size.fetch_add(len, Ordering::Relaxed);
513 }
514 Ok(())
515 }
516
517 fn flush(&self) -> std::io::Result<()> {
518 let mut writer = self.writer.lock();
519 if let Some(file) = writer.as_mut() {
520 file.flush()?;
521 }
522 Ok(())
523 }
524
525 fn close(&self) -> std::io::Result<()> {
526 let mut writer = self.writer.lock();
527 if let Some(file) = writer.take() {
528 drop(file);
529 }
530 Ok(())
531 }
532}
533
534#[derive(Default)]
536pub struct MemorySink {
537 events: Mutex<VecDeque<AuditEvent>>,
538 max_events: usize,
539}
540
541impl MemorySink {
542 pub fn new(max_events: usize) -> Self {
544 Self {
545 events: Mutex::new(VecDeque::with_capacity(max_events)),
546 max_events,
547 }
548 }
549
550 pub fn events(&self) -> Vec<AuditEvent> {
552 self.events.lock().iter().cloned().collect()
553 }
554
555 pub fn len(&self) -> usize {
557 self.events.lock().len()
558 }
559
560 pub fn is_empty(&self) -> bool {
562 self.events.lock().is_empty()
563 }
564
565 pub fn clear(&self) {
567 self.events.lock().clear();
568 }
569}
570
571impl AuditSink for MemorySink {
572 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
573 let mut events = self.events.lock();
574 if events.len() >= self.max_events {
575 events.pop_front();
576 }
577 events.push_back(event.clone());
578 Ok(())
579 }
580
581 fn flush(&self) -> std::io::Result<()> {
582 Ok(())
583 }
584
585 fn close(&self) -> std::io::Result<()> {
586 Ok(())
587 }
588}
589
590#[derive(Debug, Clone, Copy, PartialEq, Eq)]
596#[repr(u8)]
597pub enum SyslogFacility {
598 Kern = 0,
600 User = 1,
602 Auth = 4,
604 AuthPriv = 10,
606 Local0 = 16,
608 Local1 = 17,
610 Local2 = 18,
612 Local3 = 19,
614 Local4 = 20,
616 Local5 = 21,
618 Local6 = 22,
620 Local7 = 23,
622}
623
624#[derive(Debug, Clone, Copy, PartialEq, Eq)]
626#[repr(u8)]
627pub enum SyslogSeverity {
628 Emergency = 0,
630 Alert = 1,
632 Critical = 2,
634 Error = 3,
636 Warning = 4,
638 Notice = 5,
640 Informational = 6,
642 Debug = 7,
644}
645
646impl From<AuditLevel> for SyslogSeverity {
647 fn from(level: AuditLevel) -> Self {
648 match level {
649 AuditLevel::Info => SyslogSeverity::Informational,
650 AuditLevel::Warning => SyslogSeverity::Warning,
651 AuditLevel::Security => SyslogSeverity::Notice,
652 AuditLevel::Critical => SyslogSeverity::Error,
653 AuditLevel::Compliance => SyslogSeverity::Notice,
654 }
655 }
656}
657
658#[derive(Debug, Clone)]
660pub struct SyslogConfig {
661 pub server_addr: String,
663 pub facility: SyslogFacility,
665 pub app_name: String,
667 pub procid: Option<String>,
669 pub msgid: Option<String>,
671 pub rfc5424: bool,
673}
674
675impl Default for SyslogConfig {
676 fn default() -> Self {
677 Self {
678 server_addr: "127.0.0.1:514".to_string(),
679 facility: SyslogFacility::Local0,
680 app_name: "ringkernel".to_string(),
681 procid: None,
682 msgid: None,
683 rfc5424: true,
684 }
685 }
686}
687
688pub struct SyslogSink {
690 config: SyslogConfig,
691 socket: Mutex<Option<UdpSocket>>,
692 hostname: String,
693}
694
695impl SyslogSink {
696 pub fn new(config: SyslogConfig) -> std::io::Result<Self> {
698 let socket = UdpSocket::bind("0.0.0.0:0")?;
699 socket.connect(&config.server_addr)?;
700
701 let hostname = std::env::var("HOSTNAME")
703 .or_else(|_| std::env::var("HOST"))
704 .unwrap_or_else(|_| "localhost".to_string());
705
706 Ok(Self {
707 config,
708 socket: Mutex::new(Some(socket)),
709 hostname,
710 })
711 }
712
713 pub fn with_server(server_addr: impl Into<String>) -> std::io::Result<Self> {
715 Self::new(SyslogConfig {
716 server_addr: server_addr.into(),
717 ..Default::default()
718 })
719 }
720
721 fn format_rfc5424(&self, event: &AuditEvent) -> String {
723 let severity: SyslogSeverity = event.level.into();
724 let priority = (self.config.facility as u8) * 8 + (severity as u8);
725
726 let timestamp = event
728 .timestamp
729 .duration_since(UNIX_EPOCH)
730 .unwrap_or_default();
731 let secs = timestamp.as_secs();
732 let millis = timestamp.subsec_millis();
733
734 let epoch_days = secs / 86400;
736 let day_secs = secs % 86400;
737 let hours = day_secs / 3600;
738 let minutes = (day_secs % 3600) / 60;
739 let seconds = day_secs % 60;
740
741 let year = 1970 + (epoch_days / 365);
743 let day_of_year = epoch_days % 365;
744 let month = (day_of_year / 30).min(11) + 1;
745 let day = (day_of_year % 30) + 1;
746
747 let timestamp_str = format!(
748 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
749 year, month, day, hours, minutes, seconds, millis
750 );
751
752 let procid = self.config.procid.as_deref().unwrap_or("-");
753 let msgid = self.config.msgid.as_deref().unwrap_or("-");
754
755 let sd = format!(
757 "[ringkernel@12345 level=\"{}\" event_type=\"{}\" actor=\"{}\" checksum=\"{}\"]",
758 event.level.as_str(),
759 event.event_type.as_str(),
760 event.actor,
761 event.checksum
762 );
763
764 format!(
765 "<{}>{} {} {} {} {} {} {} {}",
766 priority,
767 1, timestamp_str,
769 self.hostname,
770 self.config.app_name,
771 procid,
772 msgid,
773 sd,
774 event.description
775 )
776 }
777
778 fn format_bsd(&self, event: &AuditEvent) -> String {
780 let severity: SyslogSeverity = event.level.into();
781 let priority = (self.config.facility as u8) * 8 + (severity as u8);
782
783 let timestamp = event
784 .timestamp
785 .duration_since(UNIX_EPOCH)
786 .unwrap_or_default();
787 let secs = timestamp.as_secs();
788
789 let months = [
791 "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
792 ];
793 let epoch_days = secs / 86400;
794 let day_secs = secs % 86400;
795 let hours = day_secs / 3600;
796 let minutes = (day_secs % 3600) / 60;
797 let seconds = day_secs % 60;
798
799 let day_of_year = epoch_days % 365;
800 let month_idx = ((day_of_year / 30) as usize).min(11);
801 let day = (day_of_year % 30) + 1;
802
803 let timestamp_str = format!(
804 "{} {:2} {:02}:{:02}:{:02}",
805 months[month_idx], day, hours, minutes, seconds
806 );
807
808 format!(
809 "<{}>{} {} {}: [{}] {}",
810 priority,
811 timestamp_str,
812 self.hostname,
813 self.config.app_name,
814 event.event_type.as_str(),
815 event.description
816 )
817 }
818}
819
820impl AuditSink for SyslogSink {
821 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
822 let message = if self.config.rfc5424 {
823 self.format_rfc5424(event)
824 } else {
825 self.format_bsd(event)
826 };
827
828 let socket = self.socket.lock();
829 if let Some(ref sock) = *socket {
830 sock.send(message.as_bytes())?;
831 }
832 Ok(())
833 }
834
835 fn flush(&self) -> std::io::Result<()> {
836 Ok(())
837 }
838
839 fn close(&self) -> std::io::Result<()> {
840 let mut socket = self.socket.lock();
841 *socket = None;
842 Ok(())
843 }
844}
845
846#[cfg(feature = "alerting")]
852#[derive(Debug, Clone)]
853pub struct ElasticsearchConfig {
854 pub url: String,
856 pub index_pattern: String,
858 pub auth: Option<(String, String)>,
860 pub batch_size: usize,
862 pub timeout: Duration,
864}
865
866#[cfg(feature = "alerting")]
867impl Default for ElasticsearchConfig {
868 fn default() -> Self {
869 Self {
870 url: "http://localhost:9200".to_string(),
871 index_pattern: "ringkernel-audit".to_string(),
872 auth: None,
873 batch_size: 100,
874 timeout: Duration::from_secs(30),
875 }
876 }
877}
878
879#[cfg(feature = "alerting")]
881pub struct ElasticsearchSink {
882 config: ElasticsearchConfig,
883 client: reqwest::blocking::Client,
884 buffer: Mutex<Vec<String>>,
885}
886
887#[cfg(feature = "alerting")]
888impl ElasticsearchSink {
889 pub fn new(config: ElasticsearchConfig) -> Result<Self, reqwest::Error> {
891 let client = reqwest::blocking::Client::builder()
892 .timeout(config.timeout)
893 .build()?;
894
895 Ok(Self {
896 config,
897 client,
898 buffer: Mutex::new(Vec::new()),
899 })
900 }
901
902 fn get_index(&self, event: &AuditEvent) -> String {
904 let timestamp = event
905 .timestamp
906 .duration_since(UNIX_EPOCH)
907 .unwrap_or_default();
908 let secs = timestamp.as_secs();
909
910 let epoch_days = secs / 86400;
912 let year = 1970 + (epoch_days / 365);
913 let day_of_year = epoch_days % 365;
914 let month = (day_of_year / 30).min(11) + 1;
915 let day = (day_of_year % 30) + 1;
916
917 let date_str = format!("{:04}.{:02}.{:02}", year, month, day);
918
919 self.config
920 .index_pattern
921 .replace("{date}", &date_str)
922 .replace("{year}", &format!("{:04}", year))
923 .replace("{month}", &format!("{:02}", month))
924 .replace("{day}", &format!("{:02}", day))
925 }
926
927 fn to_es_document(&self, event: &AuditEvent) -> String {
929 let timestamp_millis = event
930 .timestamp
931 .duration_since(UNIX_EPOCH)
932 .unwrap_or_default()
933 .as_millis();
934
935 let metadata_json = if event.metadata.is_empty() {
937 "{}".to_string()
938 } else {
939 let pairs: Vec<String> = event
940 .metadata
941 .iter()
942 .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
943 .collect();
944 format!("{{{}}}", pairs.join(","))
945 };
946
947 let hlc_json = event
948 .hlc
949 .map(|h| {
950 format!(
951 r#","hlc":{{"physical":{},"logical":{}}}"#,
952 h.physical, h.logical
953 )
954 })
955 .unwrap_or_default();
956
957 let target_json = event
958 .target
959 .as_ref()
960 .map(|t| format!(r#","target":"{}""#, escape_json(t)))
961 .unwrap_or_default();
962
963 format!(
964 r#"{{"@timestamp":{},"id":{},"level":"{}","event_type":"{}","actor":"{}"{}{}"description":"{}","metadata":{},"checksum":{}}}"#,
965 timestamp_millis,
966 event.id,
967 event.level.as_str(),
968 event.event_type.as_str(),
969 escape_json(&event.actor),
970 target_json,
971 hlc_json,
972 escape_json(&event.description),
973 metadata_json,
974 event.checksum
975 )
976 }
977
978 fn flush_buffer(&self) -> std::io::Result<()> {
980 let documents: Vec<String> = {
981 let mut buffer = self.buffer.lock();
982 std::mem::take(&mut *buffer)
983 };
984
985 if documents.is_empty() {
986 return Ok(());
987 }
988
989 let mut bulk_body = String::new();
991 for doc in documents {
992 bulk_body.push_str(&format!(
994 r#"{{"index":{{"_index":"{}"}}}}"#,
995 self.config.index_pattern.replace("{date}", "current")
996 ));
997 bulk_body.push('\n');
998 bulk_body.push_str(&doc);
1000 bulk_body.push('\n');
1001 }
1002
1003 let url = format!("{}/_bulk", self.config.url);
1004 let mut request = self
1005 .client
1006 .post(&url)
1007 .body(bulk_body)
1008 .header(reqwest::header::CONTENT_TYPE, "application/x-ndjson");
1009
1010 if let Some((user, pass)) = &self.config.auth {
1011 request = request.basic_auth(user, Some(pass));
1012 }
1013
1014 request.send().map_err(|e| {
1015 std::io::Error::new(
1016 std::io::ErrorKind::Other,
1017 format!("ES request failed: {}", e),
1018 )
1019 })?;
1020
1021 Ok(())
1022 }
1023}
1024
1025#[cfg(feature = "alerting")]
1026impl AuditSink for ElasticsearchSink {
1027 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
1028 let doc = self.to_es_document(event);
1029
1030 let should_flush = {
1031 let mut buffer = self.buffer.lock();
1032 buffer.push(doc);
1033 buffer.len() >= self.config.batch_size
1034 };
1035
1036 if should_flush {
1037 self.flush_buffer()?;
1038 }
1039
1040 Ok(())
1041 }
1042
1043 fn flush(&self) -> std::io::Result<()> {
1044 self.flush_buffer()
1045 }
1046
1047 fn close(&self) -> std::io::Result<()> {
1048 self.flush_buffer()
1049 }
1050}
1051
1052#[derive(Debug, Clone)]
1058pub struct CloudWatchConfig {
1059 pub log_group: String,
1061 pub log_stream: String,
1063 pub region: String,
1065 pub batch_size: usize,
1067 #[cfg(feature = "cloudwatch")]
1069 pub auto_create: bool,
1070 #[cfg(feature = "cloudwatch")]
1072 pub max_retries: u32,
1073}
1074
1075impl Default for CloudWatchConfig {
1076 fn default() -> Self {
1077 Self {
1078 log_group: "/ringkernel/audit".to_string(),
1079 log_stream: "default".to_string(),
1080 region: "us-east-1".to_string(),
1081 batch_size: 100,
1082 #[cfg(feature = "cloudwatch")]
1083 auto_create: true,
1084 #[cfg(feature = "cloudwatch")]
1085 max_retries: 3,
1086 }
1087 }
1088}
1089
1090pub struct CloudWatchSink {
1099 config: CloudWatchConfig,
1100 buffer: Mutex<Vec<(u64, String)>>, #[cfg_attr(not(feature = "cloudwatch"), allow(dead_code))]
1102 sequence_token: Mutex<Option<String>>,
1103 #[cfg(feature = "cloudwatch")]
1104 client: aws_sdk_cloudwatchlogs::Client,
1105 #[cfg(feature = "cloudwatch")]
1106 initialized: Mutex<bool>,
1107}
1108
1109impl CloudWatchSink {
1110 #[cfg(feature = "cloudwatch")]
1118 pub fn new(config: CloudWatchConfig) -> Self {
1119 let client = tokio::task::block_in_place(|| {
1120 tokio::runtime::Handle::current().block_on(async {
1121 let region = aws_sdk_cloudwatchlogs::config::Region::new(config.region.clone());
1122 let sdk_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
1123 .region(region)
1124 .load()
1125 .await;
1126 aws_sdk_cloudwatchlogs::Client::new(&sdk_config)
1127 })
1128 });
1129
1130 Self {
1131 config,
1132 buffer: Mutex::new(Vec::new()),
1133 sequence_token: Mutex::new(None),
1134 client,
1135 initialized: Mutex::new(false),
1136 }
1137 }
1138
1139 #[cfg(not(feature = "cloudwatch"))]
1141 pub fn new(config: CloudWatchConfig) -> Self {
1142 Self {
1143 config,
1144 buffer: Mutex::new(Vec::new()),
1145 sequence_token: Mutex::new(None),
1146 }
1147 }
1148
1149 #[cfg(feature = "cloudwatch")]
1154 pub fn with_credentials(
1155 config: CloudWatchConfig,
1156 access_key: impl Into<String>,
1157 secret_key: impl Into<String>,
1158 ) -> Self {
1159 let access_key = access_key.into();
1160 let secret_key = secret_key.into();
1161 let client = tokio::task::block_in_place(|| {
1162 tokio::runtime::Handle::current().block_on(async {
1163 let region = aws_sdk_cloudwatchlogs::config::Region::new(config.region.clone());
1164 let creds = aws_sdk_cloudwatchlogs::config::Credentials::new(
1165 access_key,
1166 secret_key,
1167 None, None, "ringkernel",
1170 );
1171 let sdk_config = aws_config::defaults(aws_config::BehaviorVersion::latest())
1172 .region(region)
1173 .credentials_provider(creds)
1174 .load()
1175 .await;
1176 aws_sdk_cloudwatchlogs::Client::new(&sdk_config)
1177 })
1178 });
1179
1180 Self {
1181 config,
1182 buffer: Mutex::new(Vec::new()),
1183 sequence_token: Mutex::new(None),
1184 client,
1185 initialized: Mutex::new(false),
1186 }
1187 }
1188
1189 pub fn config(&self) -> &CloudWatchConfig {
1191 &self.config
1192 }
1193
1194 pub fn buffer_size(&self) -> usize {
1196 self.buffer.lock().len()
1197 }
1198
1199 #[cfg(feature = "cloudwatch")]
1202 fn ensure_log_group_and_stream(&self) -> std::io::Result<()> {
1203 {
1204 let initialized = self.initialized.lock();
1205 if *initialized {
1206 return Ok(());
1207 }
1208 }
1209
1210 if !self.config.auto_create {
1211 let mut initialized = self.initialized.lock();
1212 *initialized = true;
1213 return Ok(());
1214 }
1215
1216 tokio::task::block_in_place(|| {
1217 tokio::runtime::Handle::current().block_on(async {
1218 let create_group_result = self
1220 .client
1221 .create_log_group()
1222 .log_group_name(&self.config.log_group)
1223 .send()
1224 .await;
1225
1226 if let Err(e) = &create_group_result {
1227 let is_already_exists = e
1228 .as_service_error()
1229 .map(|se| se.is_resource_already_exists_exception())
1230 .unwrap_or(false);
1231 if !is_already_exists {
1232 return Err(std::io::Error::new(
1233 std::io::ErrorKind::Other,
1234 format!("Failed to create CloudWatch log group: {}", e),
1235 ));
1236 }
1237 }
1238
1239 let create_stream_result = self
1241 .client
1242 .create_log_stream()
1243 .log_group_name(&self.config.log_group)
1244 .log_stream_name(&self.config.log_stream)
1245 .send()
1246 .await;
1247
1248 if let Err(e) = &create_stream_result {
1249 let is_already_exists = e
1250 .as_service_error()
1251 .map(|se| se.is_resource_already_exists_exception())
1252 .unwrap_or(false);
1253 if !is_already_exists {
1254 return Err(std::io::Error::new(
1255 std::io::ErrorKind::Other,
1256 format!("Failed to create CloudWatch log stream: {}", e),
1257 ));
1258 }
1259 }
1260
1261 let mut initialized = self.initialized.lock();
1262 *initialized = true;
1263 Ok(())
1264 })
1265 })
1266 }
1267
1268 #[cfg(feature = "cloudwatch")]
1274 fn flush_to_cloudwatch(&self) -> std::io::Result<()> {
1275 let events: Vec<(u64, String)> = {
1276 let mut buffer = self.buffer.lock();
1277 std::mem::take(&mut *buffer)
1278 };
1279
1280 if events.is_empty() {
1281 return Ok(());
1282 }
1283
1284 self.ensure_log_group_and_stream()?;
1285
1286 let mut log_events: Vec<aws_sdk_cloudwatchlogs::types::InputLogEvent> = events
1289 .into_iter()
1290 .map(|(ts, msg)| {
1291 aws_sdk_cloudwatchlogs::types::InputLogEvent::builder()
1292 .timestamp(ts as i64)
1293 .message(msg)
1294 .build()
1295 .expect("InputLogEvent builder should not fail with timestamp and message set")
1296 })
1297 .collect();
1298
1299 log_events.sort_by_key(|e| e.timestamp());
1300
1301 tokio::task::block_in_place(|| {
1302 tokio::runtime::Handle::current().block_on(async {
1303 let mut retries = 0u32;
1304
1305 loop {
1306 let mut request = self
1307 .client
1308 .put_log_events()
1309 .log_group_name(&self.config.log_group)
1310 .log_stream_name(&self.config.log_stream);
1311
1312 {
1314 let token = self.sequence_token.lock();
1315 if let Some(ref tok) = *token {
1316 request = request.sequence_token(tok);
1317 }
1318 }
1319
1320 for event in &log_events {
1321 request = request.log_events(event.clone());
1322 }
1323
1324 match request.send().await {
1325 Ok(output) => {
1326 let mut token = self.sequence_token.lock();
1328 *token = output.next_sequence_token().map(|s| s.to_string());
1329
1330 tracing::debug!(
1331 event_count = log_events.len(),
1332 log_group = %self.config.log_group,
1333 log_stream = %self.config.log_stream,
1334 "Successfully uploaded {} audit events to CloudWatch Logs",
1335 log_events.len(),
1336 );
1337 return Ok(());
1338 }
1339 Err(e) => {
1340 if let Some(service_err) = e.as_service_error() {
1341 if service_err.is_invalid_sequence_token_exception() {
1346 if let aws_sdk_cloudwatchlogs::operation::put_log_events::PutLogEventsError::InvalidSequenceTokenException(ref inner) = service_err {
1347 let mut token = self.sequence_token.lock();
1348 *token = inner.expected_sequence_token().map(|s| s.to_string());
1349 }
1350
1351 if retries < self.config.max_retries {
1352 retries += 1;
1353 continue;
1354 }
1355 }
1356
1357 if service_err.is_data_already_accepted_exception() {
1361 if let aws_sdk_cloudwatchlogs::operation::put_log_events::PutLogEventsError::DataAlreadyAcceptedException(ref inner) = service_err {
1362 let mut token = self.sequence_token.lock();
1363 *token = inner.expected_sequence_token().map(|s| s.to_string());
1364 }
1365 tracing::debug!(
1366 "CloudWatch PutLogEvents: data already accepted, skipping"
1367 );
1368 return Ok(());
1369 }
1370
1371 if service_err.is_service_unavailable_exception()
1374 && retries < self.config.max_retries
1375 {
1376 retries += 1;
1377 let backoff = std::time::Duration::from_millis(
1378 100 * 2u64.pow(retries),
1379 );
1380 tracing::warn!(
1381 retry = retries,
1382 backoff_ms = backoff.as_millis() as u64,
1383 "CloudWatch PutLogEvents service unavailable, retrying"
1384 );
1385 tokio::time::sleep(backoff).await;
1386 continue;
1387 }
1388 }
1389
1390 {
1393 use aws_sdk_cloudwatchlogs::error::ProvideErrorMetadata;
1394 let is_throttled = e
1395 .as_service_error()
1396 .and_then(|se| se.code())
1397 .map(|code| {
1398 code == "Throttling"
1399 || code == "ThrottlingException"
1400 || code == "TooManyRequestsException"
1401 })
1402 .unwrap_or(false);
1403
1404 if is_throttled && retries < self.config.max_retries {
1405 retries += 1;
1406 let backoff = std::time::Duration::from_millis(
1407 100 * 2u64.pow(retries),
1408 );
1409 tracing::warn!(
1410 retry = retries,
1411 backoff_ms = backoff.as_millis() as u64,
1412 "CloudWatch PutLogEvents throttled, retrying"
1413 );
1414 tokio::time::sleep(backoff).await;
1415 continue;
1416 }
1417 }
1418
1419 return Err(std::io::Error::new(
1420 std::io::ErrorKind::Other,
1421 format!("CloudWatch PutLogEvents failed: {}", e),
1422 ));
1423 }
1424 }
1425 }
1426 })
1427 })
1428 }
1429
1430 #[cfg(not(feature = "cloudwatch"))]
1432 fn flush_stub(&self) -> std::io::Result<()> {
1433 let events: Vec<(u64, String)> = {
1434 let mut buffer = self.buffer.lock();
1435 std::mem::take(&mut *buffer)
1436 };
1437
1438 if events.is_empty() {
1439 return Ok(());
1440 }
1441
1442 tracing::warn!(
1446 event_count = events.len(),
1447 log_group = %self.config.log_group,
1448 log_stream = %self.config.log_stream,
1449 "CloudWatch sink is a stub: {} audit events dropped. \
1450 Enable the `cloudwatch` feature for real AWS integration.",
1451 events.len(),
1452 );
1453
1454 Ok(())
1455 }
1456}
1457
1458impl AuditSink for CloudWatchSink {
1459 fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
1460 let timestamp_ms = event
1461 .timestamp
1462 .duration_since(UNIX_EPOCH)
1463 .unwrap_or_default()
1464 .as_millis() as u64;
1465
1466 let message = event.to_json();
1467
1468 let should_flush = {
1469 let mut buffer = self.buffer.lock();
1470 buffer.push((timestamp_ms, message));
1471 buffer.len() >= self.config.batch_size
1472 };
1473
1474 if should_flush {
1475 self.flush()?;
1476 }
1477
1478 Ok(())
1479 }
1480
1481 fn flush(&self) -> std::io::Result<()> {
1482 #[cfg(feature = "cloudwatch")]
1483 {
1484 self.flush_to_cloudwatch()
1485 }
1486 #[cfg(not(feature = "cloudwatch"))]
1487 {
1488 self.flush_stub()
1489 }
1490 }
1491
1492 fn close(&self) -> std::io::Result<()> {
1493 self.flush()
1494 }
1495}
1496
1497#[derive(Debug, Clone)]
1503pub struct AuditConfig {
1504 pub min_level: AuditLevel,
1506 pub enable_checksums: bool,
1508 pub buffer_size: usize,
1510 pub flush_interval: Duration,
1512 pub retention: Duration,
1514}
1515
1516impl Default for AuditConfig {
1517 fn default() -> Self {
1518 Self {
1519 min_level: AuditLevel::Info,
1520 enable_checksums: true,
1521 buffer_size: 100,
1522 flush_interval: Duration::from_secs(5),
1523 retention: Duration::from_secs(90 * 24 * 60 * 60), }
1525 }
1526}
1527
1528pub struct AuditLoggerBuilder {
1530 config: AuditConfig,
1531 sinks: Vec<Arc<dyn AuditSink>>,
1532}
1533
1534impl AuditLoggerBuilder {
1535 pub fn new() -> Self {
1537 Self {
1538 config: AuditConfig::default(),
1539 sinks: Vec::new(),
1540 }
1541 }
1542
1543 pub fn with_min_level(mut self, level: AuditLevel) -> Self {
1545 self.config.min_level = level;
1546 self
1547 }
1548
1549 pub fn with_file_sink(mut self, path: impl Into<PathBuf>) -> std::io::Result<Self> {
1551 let sink = Arc::new(FileSink::new(path)?);
1552 self.sinks.push(sink);
1553 Ok(self)
1554 }
1555
1556 pub fn with_memory_sink(mut self, max_events: usize) -> Self {
1558 let sink = Arc::new(MemorySink::new(max_events));
1559 self.sinks.push(sink);
1560 self
1561 }
1562
1563 pub fn with_sink(mut self, sink: Arc<dyn AuditSink>) -> Self {
1565 self.sinks.push(sink);
1566 self
1567 }
1568
1569 pub fn with_syslog_sink(mut self, config: SyslogConfig) -> std::io::Result<Self> {
1571 let sink = Arc::new(SyslogSink::new(config)?);
1572 self.sinks.push(sink);
1573 Ok(self)
1574 }
1575
1576 pub fn with_syslog(mut self, server_addr: impl Into<String>) -> std::io::Result<Self> {
1578 let sink = Arc::new(SyslogSink::with_server(server_addr)?);
1579 self.sinks.push(sink);
1580 Ok(self)
1581 }
1582
1583 pub fn with_cloudwatch_sink(mut self, config: CloudWatchConfig) -> Self {
1585 let sink = Arc::new(CloudWatchSink::new(config));
1586 self.sinks.push(sink);
1587 self
1588 }
1589
1590 #[cfg(feature = "alerting")]
1592 pub fn with_elasticsearch_sink(
1593 mut self,
1594 config: ElasticsearchConfig,
1595 ) -> Result<Self, reqwest::Error> {
1596 let sink = Arc::new(ElasticsearchSink::new(config)?);
1597 self.sinks.push(sink);
1598 Ok(self)
1599 }
1600
1601 pub fn with_retention(mut self, retention: Duration) -> Self {
1603 self.config.retention = retention;
1604 self
1605 }
1606
1607 pub fn with_checksums(mut self, enable: bool) -> Self {
1609 self.config.enable_checksums = enable;
1610 self
1611 }
1612
1613 pub fn build(self) -> AuditLogger {
1615 AuditLogger {
1616 config: self.config,
1617 sinks: self.sinks,
1618 last_checksum: AtomicU64::new(0),
1619 event_count: AtomicU64::new(0),
1620 buffer: RwLock::new(Vec::new()),
1621 }
1622 }
1623}
1624
1625impl Default for AuditLoggerBuilder {
1626 fn default() -> Self {
1627 Self::new()
1628 }
1629}
1630
1631pub struct AuditLogger {
1633 config: AuditConfig,
1634 sinks: Vec<Arc<dyn AuditSink>>,
1635 last_checksum: AtomicU64,
1636 event_count: AtomicU64,
1637 buffer: RwLock<Vec<AuditEvent>>,
1638}
1639
1640impl AuditLogger {
1641 pub fn builder() -> AuditLoggerBuilder {
1643 AuditLoggerBuilder::new()
1644 }
1645
1646 pub fn in_memory(max_events: usize) -> Self {
1648 AuditLoggerBuilder::new()
1649 .with_memory_sink(max_events)
1650 .build()
1651 }
1652
1653 pub fn log(&self, mut event: AuditEvent) {
1655 if event.level < self.config.min_level {
1657 return;
1658 }
1659
1660 if self.config.enable_checksums {
1662 let prev = self.last_checksum.load(Ordering::Acquire);
1663 event = event.with_prev_checksum(prev);
1664 self.last_checksum.store(event.checksum, Ordering::Release);
1665 }
1666
1667 for sink in &self.sinks {
1669 if let Err(e) = sink.write(&event) {
1670 tracing::error!("Audit sink error: {}", e);
1671 }
1672 }
1673
1674 self.event_count.fetch_add(1, Ordering::Relaxed);
1675 }
1676
1677 pub fn log_kernel_launched(&self, kernel_id: &str, backend: &str) {
1679 self.log(AuditEvent::kernel_launched(kernel_id, backend));
1680 }
1681
1682 pub fn log_kernel_terminated(&self, kernel_id: &str, reason: &str) {
1684 self.log(AuditEvent::kernel_terminated(kernel_id, reason));
1685 }
1686
1687 pub fn log_security_violation(&self, actor: &str, violation: &str) {
1689 self.log(AuditEvent::security_violation(actor, violation));
1690 }
1691
1692 pub fn log_config_change(&self, actor: &str, key: &str, old_value: &str, new_value: &str) {
1694 self.log(AuditEvent::config_change(actor, key, old_value, new_value));
1695 }
1696
1697 pub fn event_count(&self) -> u64 {
1699 self.event_count.load(Ordering::Relaxed)
1700 }
1701
1702 pub fn buffer_event(&self, event: AuditEvent) {
1706 let mut buffer = self.buffer.write();
1707 buffer.push(event);
1708 }
1709
1710 pub fn flush_buffered(&self) -> std::io::Result<()> {
1712 let events: Vec<AuditEvent> = {
1713 let mut buffer = self.buffer.write();
1714 std::mem::take(&mut *buffer)
1715 };
1716
1717 for mut event in events {
1718 if self.config.enable_checksums {
1720 let prev = self.last_checksum.load(Ordering::Acquire);
1721 event = event.with_prev_checksum(prev);
1722 self.last_checksum.store(event.checksum, Ordering::Release);
1723 }
1724
1725 for sink in &self.sinks {
1727 sink.write(&event)?;
1728 }
1729
1730 self.event_count.fetch_add(1, Ordering::Relaxed);
1731 }
1732
1733 self.flush()
1734 }
1735
1736 pub fn buffered_count(&self) -> usize {
1738 self.buffer.read().len()
1739 }
1740
1741 pub fn flush(&self) -> std::io::Result<()> {
1743 for sink in &self.sinks {
1744 sink.flush()?;
1745 }
1746 Ok(())
1747 }
1748
1749 pub fn close(&self) -> std::io::Result<()> {
1751 for sink in &self.sinks {
1752 sink.close()?;
1753 }
1754 Ok(())
1755 }
1756}
1757
1758#[cfg(test)]
1763mod tests {
1764 use super::*;
1765
1766 #[test]
1767 fn test_audit_event_creation() {
1768 let event = AuditEvent::new(
1769 AuditLevel::Info,
1770 AuditEventType::KernelLaunched,
1771 "runtime",
1772 "Kernel launched",
1773 );
1774
1775 assert_eq!(event.level, AuditLevel::Info);
1776 assert_eq!(event.event_type, AuditEventType::KernelLaunched);
1777 assert_eq!(event.actor, "runtime");
1778 assert!(event.checksum != 0);
1779 }
1780
1781 #[test]
1782 fn test_audit_event_checksum() {
1783 let event = AuditEvent::kernel_launched("test_kernel", "cuda");
1784 assert!(event.verify_checksum());
1785
1786 let mut modified = event.clone();
1788 modified.description = "Modified".to_string();
1789 assert!(!modified.verify_checksum());
1790 }
1791
1792 #[test]
1793 fn test_audit_event_chain() {
1794 let event1 = AuditEvent::kernel_launched("k1", "cuda");
1795 let event2 = AuditEvent::kernel_launched("k2", "cuda").with_prev_checksum(event1.checksum);
1796
1797 assert_eq!(event2.prev_checksum, Some(event1.checksum));
1798 }
1799
1800 #[test]
1801 fn test_audit_event_json() {
1802 let event = AuditEvent::kernel_launched("test", "cuda")
1803 .with_metadata("gpu_id", "0")
1804 .with_metadata("memory_mb", "8192");
1805
1806 let json = event.to_json();
1807 assert!(json.contains("kernel_launched"));
1808 assert!(json.contains("test"));
1809 assert!(json.contains("cuda"));
1810 assert!(json.contains("gpu_id"));
1811 }
1812
1813 #[test]
1814 fn test_memory_sink() {
1815 let sink = MemorySink::new(10);
1816
1817 let event = AuditEvent::kernel_launched("test", "cuda");
1818 sink.write(&event).unwrap();
1819
1820 assert_eq!(sink.len(), 1);
1821 assert!(!sink.is_empty());
1822
1823 let events = sink.events();
1824 assert_eq!(events[0].event_type, AuditEventType::KernelLaunched);
1825 }
1826
1827 #[test]
1828 fn test_memory_sink_rotation() {
1829 let sink = MemorySink::new(3);
1830
1831 for i in 0..5 {
1832 let event = AuditEvent::new(
1833 AuditLevel::Info,
1834 AuditEventType::Custom(format!("event_{}", i)),
1835 "test",
1836 format!("Event {}", i),
1837 );
1838 sink.write(&event).unwrap();
1839 }
1840
1841 assert_eq!(sink.len(), 3);
1843 let events = sink.events();
1844 assert_eq!(
1845 events[0].event_type,
1846 AuditEventType::Custom("event_2".to_string())
1847 );
1848 }
1849
1850 #[test]
1851 fn test_audit_logger() {
1852 let logger = AuditLogger::in_memory(100);
1853
1854 logger.log_kernel_launched("k1", "cuda");
1855 logger.log_kernel_terminated("k1", "shutdown");
1856 logger.log_security_violation("user", "unauthorized access");
1857
1858 assert_eq!(logger.event_count(), 3);
1859 }
1860
1861 #[test]
1862 fn test_audit_level_ordering() {
1863 assert!(AuditLevel::Info < AuditLevel::Warning);
1864 assert!(AuditLevel::Warning < AuditLevel::Security);
1865 assert!(AuditLevel::Security < AuditLevel::Critical);
1866 assert!(AuditLevel::Critical < AuditLevel::Compliance);
1867 }
1868
1869 #[test]
1870 fn test_audit_event_helpers() {
1871 let event = AuditEvent::config_change("admin", "max_kernels", "10", "20");
1872 assert_eq!(event.level, AuditLevel::Compliance);
1873 assert_eq!(event.metadata.len(), 2);
1874
1875 let health = AuditEvent::health_check("kernel_1", "healthy");
1876 assert_eq!(health.event_type, AuditEventType::HealthCheck);
1877 }
1878
1879 #[test]
1880 fn test_syslog_severity_conversion() {
1881 assert_eq!(
1882 SyslogSeverity::from(AuditLevel::Info),
1883 SyslogSeverity::Informational
1884 );
1885 assert_eq!(
1886 SyslogSeverity::from(AuditLevel::Warning),
1887 SyslogSeverity::Warning
1888 );
1889 assert_eq!(
1890 SyslogSeverity::from(AuditLevel::Security),
1891 SyslogSeverity::Notice
1892 );
1893 assert_eq!(
1894 SyslogSeverity::from(AuditLevel::Critical),
1895 SyslogSeverity::Error
1896 );
1897 }
1898
1899 #[test]
1900 fn test_syslog_config_default() {
1901 let config = SyslogConfig::default();
1902 assert_eq!(config.server_addr, "127.0.0.1:514");
1903 assert_eq!(config.facility, SyslogFacility::Local0);
1904 assert_eq!(config.app_name, "ringkernel");
1905 assert!(config.rfc5424);
1906 }
1907
1908 #[test]
1909 fn test_cloudwatch_config_default() {
1910 let config = CloudWatchConfig::default();
1911 assert_eq!(config.log_group, "/ringkernel/audit");
1912 assert_eq!(config.log_stream, "default");
1913 assert_eq!(config.region, "us-east-1");
1914 assert_eq!(config.batch_size, 100);
1915 }
1916
1917 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1918 async fn test_cloudwatch_sink_buffering() {
1919 let config = CloudWatchConfig {
1920 batch_size: 5,
1921 ..Default::default()
1922 };
1923 let sink = CloudWatchSink::new(config);
1924
1925 for i in 0..3 {
1927 let event = AuditEvent::new(
1928 AuditLevel::Info,
1929 AuditEventType::Custom(format!("event_{}", i)),
1930 "test",
1931 format!("Event {}", i),
1932 );
1933 sink.write(&event).unwrap();
1934 }
1935
1936 assert_eq!(sink.buffer_size(), 3);
1937 }
1938
1939 #[test]
1940 fn test_syslog_facility_values() {
1941 assert_eq!(SyslogFacility::Kern as u8, 0);
1942 assert_eq!(SyslogFacility::User as u8, 1);
1943 assert_eq!(SyslogFacility::Auth as u8, 4);
1944 assert_eq!(SyslogFacility::Local0 as u8, 16);
1945 assert_eq!(SyslogFacility::Local7 as u8, 23);
1946 }
1947
1948 #[test]
1949 fn test_syslog_severity_values() {
1950 assert_eq!(SyslogSeverity::Emergency as u8, 0);
1951 assert_eq!(SyslogSeverity::Alert as u8, 1);
1952 assert_eq!(SyslogSeverity::Critical as u8, 2);
1953 assert_eq!(SyslogSeverity::Error as u8, 3);
1954 assert_eq!(SyslogSeverity::Warning as u8, 4);
1955 assert_eq!(SyslogSeverity::Notice as u8, 5);
1956 assert_eq!(SyslogSeverity::Informational as u8, 6);
1957 assert_eq!(SyslogSeverity::Debug as u8, 7);
1958 }
1959}