use std::collections::VecDeque;
use std::fmt;
use std::io::Write;
use std::net::UdpSocket;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use parking_lot::{Mutex, RwLock};
use crate::hlc::HlcTimestamp;
/// Severity / category attached to every audit event.
///
/// The derived ordering follows the numeric discriminants, so
/// `Info < Warning < Security < Critical < Compliance`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum AuditLevel {
    /// Discriminant 0; rendered as `"INFO"`.
    Info = 0,
    /// Discriminant 1; rendered as `"WARNING"`.
    Warning = 1,
    /// Discriminant 2; rendered as `"SECURITY"`.
    Security = 2,
    /// Discriminant 3; rendered as `"CRITICAL"`.
    Critical = 3,
    /// Discriminant 4; rendered as `"COMPLIANCE"`.
    Compliance = 4,
}

impl AuditLevel {
    /// Returns the fixed upper-case label for this level.
    pub fn as_str(&self) -> &'static str {
        match *self {
            AuditLevel::Info => "INFO",
            AuditLevel::Warning => "WARNING",
            AuditLevel::Security => "SECURITY",
            AuditLevel::Critical => "CRITICAL",
            AuditLevel::Compliance => "COMPLIANCE",
        }
    }
}

impl fmt::Display for AuditLevel {
    /// Writes the same label as [`AuditLevel::as_str`]; formatter width and
    /// fill flags are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
/// Kind of occurrence an audit event records.
///
/// Every built-in variant maps to a fixed snake_case name; `Custom` carries a
/// caller-supplied name that is used verbatim.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AuditEventType {
    /// Rendered as `kernel_launched`.
    KernelLaunched,
    /// Rendered as `kernel_terminated`.
    KernelTerminated,
    /// Rendered as `kernel_migrated`.
    KernelMigrated,
    /// Rendered as `kernel_checkpointed`.
    KernelCheckpointed,
    /// Rendered as `kernel_restored`.
    KernelRestored,
    /// Rendered as `message_sent`.
    MessageSent,
    /// Rendered as `message_received`.
    MessageReceived,
    /// Rendered as `message_failed`.
    MessageFailed,
    /// Rendered as `authentication_attempt`.
    AuthenticationAttempt,
    /// Rendered as `authorization_check`.
    AuthorizationCheck,
    /// Rendered as `configuration_change`.
    ConfigurationChange,
    /// Rendered as `security_violation`.
    SecurityViolation,
    /// Rendered as `memory_allocated`.
    MemoryAllocated,
    /// Rendered as `memory_deallocated`.
    MemoryDeallocated,
    /// Rendered as `resource_limit_exceeded`.
    ResourceLimitExceeded,
    /// Rendered as `health_check`.
    HealthCheck,
    /// Rendered as `circuit_breaker_state_change`.
    CircuitBreakerStateChange,
    /// Rendered as `degradation_change`.
    DegradationChange,
    /// Caller-defined event type; the contained string is its name.
    Custom(String),
}

impl AuditEventType {
    /// Returns the event-type name: a fixed snake_case literal for built-in
    /// variants, or the stored string for `Custom`.
    pub fn as_str(&self) -> &str {
        let label: &str = match self {
            AuditEventType::KernelLaunched => "kernel_launched",
            AuditEventType::KernelTerminated => "kernel_terminated",
            AuditEventType::KernelMigrated => "kernel_migrated",
            AuditEventType::KernelCheckpointed => "kernel_checkpointed",
            AuditEventType::KernelRestored => "kernel_restored",
            AuditEventType::MessageSent => "message_sent",
            AuditEventType::MessageReceived => "message_received",
            AuditEventType::MessageFailed => "message_failed",
            AuditEventType::AuthenticationAttempt => "authentication_attempt",
            AuditEventType::AuthorizationCheck => "authorization_check",
            AuditEventType::ConfigurationChange => "configuration_change",
            AuditEventType::SecurityViolation => "security_violation",
            AuditEventType::MemoryAllocated => "memory_allocated",
            AuditEventType::MemoryDeallocated => "memory_deallocated",
            AuditEventType::ResourceLimitExceeded => "resource_limit_exceeded",
            AuditEventType::HealthCheck => "health_check",
            AuditEventType::CircuitBreakerStateChange => "circuit_breaker_state_change",
            AuditEventType::DegradationChange => "degradation_change",
            AuditEventType::Custom(name) => name.as_str(),
        };
        label
    }
}

impl fmt::Display for AuditEventType {
    /// Writes the same text as [`AuditEventType::as_str`]; formatter width and
    /// fill flags are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
/// A single audit record together with its integrity checksum.
#[derive(Debug, Clone)]
pub struct AuditEvent {
/// Identifier taken from the process-wide event counter when the event is created.
pub id: u64,
/// Wall-clock time captured with `SystemTime::now()` when the event is created.
pub timestamp: SystemTime,
/// Optional hybrid-logical-clock timestamp, set through `with_hlc`.
pub hlc: Option<HlcTimestamp>,
/// Severity / category of the event.
pub level: AuditLevel,
/// What kind of occurrence this event records.
pub event_type: AuditEventType,
/// Who caused the event (the built-in helpers use e.g. `"runtime"` or `"health_checker"`).
pub actor: String,
/// Optional subject of the event (the kernel helpers store the kernel id here).
pub target: Option<String>,
/// Free-form human-readable description.
pub description: String,
/// Additional key/value pairs, kept in insertion order.
pub metadata: Vec<(String, String)>,
/// Checksum of the preceding event in the chain, when one has been attached.
pub prev_checksum: Option<u64>,
/// Checksum over this event's fields; re-derived by every `with_*` builder call.
pub checksum: u64,
}
impl AuditEvent {
    /// Creates an event with a fresh id, the current wall-clock time and a
    /// checksum computed over the initial fields.
    ///
    /// `hlc`, `target` and `prev_checksum` start as `None` and `metadata`
    /// starts empty; use the `with_*` builders to fill them in.
    pub fn new(
        level: AuditLevel,
        event_type: AuditEventType,
        actor: impl Into<String>,
        description: impl Into<String>,
    ) -> Self {
        let mut event = Self {
            id: next_event_id(),
            timestamp: SystemTime::now(),
            hlc: None,
            level,
            event_type,
            actor: actor.into(),
            target: None,
            description: description.into(),
            metadata: Vec::new(),
            prev_checksum: None,
            checksum: 0,
        };
        event.checksum = event.compute_checksum();
        event
    }

    /// Recomputes the checksum after a builder step changed a field.
    fn resealed(mut self) -> Self {
        self.checksum = self.compute_checksum();
        self
    }

    /// Attaches a hybrid-logical-clock timestamp and recomputes the checksum.
    pub fn with_hlc(mut self, hlc: HlcTimestamp) -> Self {
        self.hlc = Some(hlc);
        self.resealed()
    }

    /// Sets the event's target and recomputes the checksum.
    pub fn with_target(mut self, target: impl Into<String>) -> Self {
        self.target = Some(target.into());
        self.resealed()
    }

    /// Appends one metadata key/value pair and recomputes the checksum.
    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.metadata.push((key.into(), value.into()));
        self.resealed()
    }

    /// Links this event to its predecessor's checksum and recomputes the
    /// checksum so the link itself is covered.
    pub fn with_prev_checksum(mut self, checksum: u64) -> Self {
        self.prev_checksum = Some(checksum);
        self.resealed()
    }

    /// Hashes id, timestamp (nanoseconds since the Unix epoch), level, event
    /// type, actor, target, description, metadata and previous checksum.
    ///
    /// NOTE(review): `hlc` is not part of the hash, so changing it is not
    /// detected by `verify_checksum`. Left as-is because adding it would change
    /// every checksum value; confirm whether that is intended.
    /// NOTE(review): the standard library documents that `DefaultHasher`'s
    /// algorithm may change between releases, so these values should not be
    /// compared across binaries built with different toolchains.
    fn compute_checksum(&self) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        self.id.hash(&mut hasher);
        self.timestamp
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos()
            .hash(&mut hasher);
        self.level.as_str().hash(&mut hasher);
        self.event_type.as_str().hash(&mut hasher);
        self.actor.hash(&mut hasher);
        self.target.hash(&mut hasher);
        self.description.hash(&mut hasher);
        for (key, value) in &self.metadata {
            key.hash(&mut hasher);
            value.hash(&mut hasher);
        }
        self.prev_checksum.hash(&mut hasher);
        hasher.finish()
    }

    /// Returns `true` when the stored checksum matches a fresh computation
    /// over the current field values.
    pub fn verify_checksum(&self) -> bool {
        self.compute_checksum() == self.checksum
    }

    /// Info-level `KernelLaunched` event by actor `"runtime"`, targeting `kernel_id`.
    pub fn kernel_launched(kernel_id: impl Into<String>, backend: impl Into<String>) -> Self {
        let backend: String = backend.into();
        Self::new(
            AuditLevel::Info,
            AuditEventType::KernelLaunched,
            "runtime",
            format!("Kernel launched on {}", backend),
        )
        .with_target(kernel_id)
    }

    /// Info-level `KernelTerminated` event by actor `"runtime"`, targeting `kernel_id`.
    pub fn kernel_terminated(kernel_id: impl Into<String>, reason: impl Into<String>) -> Self {
        let reason: String = reason.into();
        Self::new(
            AuditLevel::Info,
            AuditEventType::KernelTerminated,
            "runtime",
            format!("Kernel terminated: {}", reason),
        )
        .with_target(kernel_id)
    }

    /// Security-level `SecurityViolation` event; `violation` becomes the description.
    pub fn security_violation(actor: impl Into<String>, violation: impl Into<String>) -> Self {
        Self::new(
            AuditLevel::Security,
            AuditEventType::SecurityViolation,
            actor,
            violation,
        )
    }

    /// Compliance-level `ConfigurationChange` event; the old and new values are
    /// stored as `old_value` / `new_value` metadata entries.
    pub fn config_change(
        actor: impl Into<String>,
        config_key: impl Into<String>,
        old_value: impl Into<String>,
        new_value: impl Into<String>,
    ) -> Self {
        let config_key: String = config_key.into();
        Self::new(
            AuditLevel::Compliance,
            AuditEventType::ConfigurationChange,
            actor,
            format!("Configuration changed: {}", config_key),
        )
        .with_metadata("old_value", old_value)
        .with_metadata("new_value", new_value)
    }

    /// Info-level `HealthCheck` event by actor `"health_checker"`, targeting `kernel_id`.
    pub fn health_check(kernel_id: impl Into<String>, status: impl Into<String>) -> Self {
        let status: String = status.into();
        Self::new(
            AuditLevel::Info,
            AuditEventType::HealthCheck,
            "health_checker",
            format!("Health check: {}", status),
        )
        .with_target(kernel_id)
    }

    /// Serializes the event as a single-line JSON object.
    ///
    /// `timestamp` is milliseconds since the Unix epoch. The optional members
    /// (`hlc`, `target`, `metadata`, `prev_checksum`) are omitted when absent
    /// or empty. Every member is separated by a comma, and all free-form text
    /// (including a `Custom` event-type name) is JSON-escaped.
    pub fn to_json(&self) -> String {
        let timestamp_ms = self
            .timestamp
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();

        // Each optional fragment carries its own leading comma so that it can
        // be dropped in (or left out) without disturbing its neighbours.
        let hlc_part = self
            .hlc
            .map(|h| {
                format!(
                    r#","hlc":{{"wall":{},"logical":{}}}"#,
                    h.physical, h.logical
                )
            })
            .unwrap_or_default();
        let target_part = self
            .target
            .as_ref()
            .map(|t| format!(r#","target":"{}""#, escape_json(t)))
            .unwrap_or_default();
        let prev_checksum_part = self
            .prev_checksum
            .map(|c| format!(r#","prev_checksum":{}"#, c))
            .unwrap_or_default();
        let metadata_part = if self.metadata.is_empty() {
            String::new()
        } else {
            let pairs: Vec<String> = self
                .metadata
                .iter()
                .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
                .collect();
            format!(r#","metadata":{{{}}}"#, pairs.join(","))
        };

        // The fixed members `description` and `checksum` need their own
        // separating commas; without them the output is not valid JSON.
        format!(
            r#"{{"id":{},"timestamp":{}{},"level":"{}","event_type":"{}","actor":"{}"{},"description":"{}"{},"checksum":{}{}}}"#,
            self.id,
            timestamp_ms,
            hlc_part,
            self.level.as_str(),
            escape_json(self.event_type.as_str()),
            escape_json(&self.actor),
            target_part,
            escape_json(&self.description),
            metadata_part,
            self.checksum,
            prev_checksum_part,
        )
    }
}
/// Escapes `s` so it can be placed between double quotes in a JSON document.
///
/// Backslash, double quote and every control character below U+0020 are
/// escaped: the short forms `\n`, `\r`, `\t`, `\b`, `\f` are used where they
/// exist and `\u00XX` otherwise. All other characters are copied unchanged.
fn escape_json(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            // Remaining C0 control characters have no short form and must not
            // appear raw inside a JSON string.
            c if u32::from(c) < 0x20 => {
                escaped.push_str(&format!("\\u{:04x}", u32::from(c)));
            }
            c => escaped.push(c),
        }
    }
    escaped
}
/// Process-wide source of event identifiers; the first id handed out is 1.
static EVENT_ID_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Returns the current counter value and advances the counter by one.
fn next_event_id() -> u64 {
    const STEP: u64 = 1;
    EVENT_ID_COUNTER.fetch_add(STEP, Ordering::SeqCst)
}
/// Destination that audit events are written to.
///
/// Implementations must be `Send + Sync`; every method takes `&self`.
pub trait AuditSink: Send + Sync {
/// Records one event in the sink.
fn write(&self, event: &AuditEvent) -> std::io::Result<()>;
/// Pushes out anything the sink is holding back (a no-op for unbuffered sinks).
fn flush(&self) -> std::io::Result<()>;
/// Shuts the sink down; what that means is defined by each implementation.
fn close(&self) -> std::io::Result<()>;
}
/// Sink that appends one JSON line per event to a file and rotates the file
/// once it reaches `max_size` bytes.
pub struct FileSink {
/// Path of the active log file.
path: PathBuf,
/// Open handle to the active file; `None` after `close`.
writer: Mutex<Option<std::fs::File>>,
/// Size threshold in bytes at which the file is rotated.
max_size: u64,
/// Running byte count of the active file (initial length plus bytes written).
current_size: AtomicU64,
}
impl FileSink {
    /// Opens (creating if necessary) `path` in append mode.
    ///
    /// The rotation threshold defaults to 100 MiB and the size counter starts
    /// at the file's existing length.
    ///
    /// # Errors
    /// Returns the I/O error if the file cannot be opened or its metadata read.
    pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
        let path = path.into();
        let file = Self::open_for_append(&path)?;
        let existing_len = file.metadata()?.len();
        Ok(Self {
            path,
            writer: Mutex::new(Some(file)),
            max_size: 100 * 1024 * 1024,
            current_size: AtomicU64::new(existing_len),
        })
    }

    /// Overrides the rotation threshold (in bytes).
    pub fn with_max_size(mut self, size: u64) -> Self {
        self.max_size = size;
        self
    }

    /// Opens `path` for appending, creating it when missing.
    fn open_for_append(path: &std::path::Path) -> std::io::Result<std::fs::File> {
        std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)
    }

    /// Picks the name for a rotated file: the active path with its extension
    /// replaced by `log.<stamp>`, or `log.<stamp>.<n>` when that name is
    /// already taken, so an earlier rotation is never overwritten.
    fn rotated_path(&self, stamp: u64) -> PathBuf {
        let mut candidate = self.path.with_extension(format!("log.{}", stamp));
        let mut attempt = 1u32;
        while candidate.exists() {
            candidate = self
                .path
                .with_extension(format!("log.{}.{}", stamp, attempt));
            attempt += 1;
        }
        candidate
    }

    /// Rotates the active file once the size counter has reached `max_size`.
    ///
    /// Does nothing when the threshold is not reached or the sink is closed.
    ///
    /// # Errors
    /// Returns the rename error if rotation fails, or the open error if the
    /// active file cannot be reopened. After a failed rename the sink keeps
    /// writing to the original file instead of losing its writer.
    fn rotate_if_needed(&self) -> std::io::Result<()> {
        // Cheap check without the lock for the common "nothing to do" case.
        if self.current_size.load(Ordering::Relaxed) < self.max_size {
            return Ok(());
        }

        let mut writer = self.writer.lock();
        // Re-check under the lock: another thread may have rotated while this
        // one was waiting, in which case the counter is already reset.
        if writer.is_none() || self.current_size.load(Ordering::Relaxed) < self.max_size {
            return Ok(());
        }

        // Close the active handle before renaming, as the original code did.
        *writer = None;

        let stamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let rotated = self.rotated_path(stamp);
        let renamed = std::fs::rename(&self.path, &rotated);

        // Reopen even when the rename failed, so later writes are not
        // silently discarded because the writer was left empty.
        let reopened = Self::open_for_append(&self.path)?;
        let resumed_len = if renamed.is_ok() {
            0
        } else {
            reopened.metadata().map(|m| m.len()).unwrap_or(0)
        };
        *writer = Some(reopened);
        self.current_size.store(resumed_len, Ordering::Relaxed);

        renamed
    }
}
impl AuditSink for FileSink {
    /// Rotates the file if the size threshold was reached, then appends the
    /// event as one JSON line. When the sink has been closed, nothing is
    /// written and `Ok(())` is returned.
    fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
        self.rotate_if_needed()?;

        let mut line = event.to_json();
        line.push('\n');
        let written = line.len() as u64;

        let mut guard = self.writer.lock();
        match guard.as_mut() {
            Some(file) => {
                file.write_all(line.as_bytes())?;
                self.current_size.fetch_add(written, Ordering::Relaxed);
            }
            None => {}
        }
        Ok(())
    }

    /// Calls `flush` on the open file handle, if there is one.
    fn flush(&self) -> std::io::Result<()> {
        match self.writer.lock().as_mut() {
            Some(file) => file.flush(),
            None => Ok(()),
        }
    }

    /// Drops the file handle; later writes become no-ops.
    fn close(&self) -> std::io::Result<()> {
        let released = self.writer.lock().take();
        drop(released);
        Ok(())
    }
}
/// Sink that keeps the most recent events in memory.
///
/// The derived `Default` yields an empty queue with `max_events` of 0.
#[derive(Default)]
pub struct MemorySink {
/// Stored events, oldest at the front.
events: Mutex<VecDeque<AuditEvent>>,
/// Queue length at which the oldest event is evicted before a new one is stored.
max_events: usize,
}
impl MemorySink {
    /// Creates a sink whose queue is pre-allocated for `max_events` entries.
    pub fn new(max_events: usize) -> Self {
        let queue = VecDeque::with_capacity(max_events);
        Self {
            events: Mutex::new(queue),
            max_events,
        }
    }

    /// Returns a cloned snapshot of the stored events, oldest first.
    pub fn events(&self) -> Vec<AuditEvent> {
        let guard = self.events.lock();
        let mut snapshot = Vec::with_capacity(guard.len());
        snapshot.extend(guard.iter().cloned());
        snapshot
    }

    /// Number of events currently stored.
    pub fn len(&self) -> usize {
        let guard = self.events.lock();
        guard.len()
    }

    /// `true` when no events are stored.
    pub fn is_empty(&self) -> bool {
        let guard = self.events.lock();
        guard.is_empty()
    }

    /// Removes every stored event.
    pub fn clear(&self) {
        let mut guard = self.events.lock();
        guard.clear();
    }
}
impl AuditSink for MemorySink {
    /// Stores a clone of `event`, first evicting the oldest entry when the
    /// queue already holds `max_events` (or more) entries.
    fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
        let mut queue = self.events.lock();
        let at_capacity = queue.len() >= self.max_events;
        if at_capacity {
            let _evicted = queue.pop_front();
        }
        queue.push_back(event.clone());
        Ok(())
    }

    /// Nothing is buffered, so this always succeeds.
    fn flush(&self) -> std::io::Result<()> {
        Ok(())
    }

    /// Stored events are kept; this always succeeds.
    fn close(&self) -> std::io::Result<()> {
        Ok(())
    }
}
/// Syslog facility code; multiplied by 8 and added to the severity to form
/// the message priority.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SyslogFacility {
/// Facility code 0.
Kern = 0,
/// Facility code 1.
User = 1,
/// Facility code 4.
Auth = 4,
/// Facility code 10.
AuthPriv = 10,
/// Facility code 16.
Local0 = 16,
/// Facility code 17.
Local1 = 17,
/// Facility code 18.
Local2 = 18,
/// Facility code 19.
Local3 = 19,
/// Facility code 20.
Local4 = 20,
/// Facility code 21.
Local5 = 21,
/// Facility code 22.
Local6 = 22,
/// Facility code 23.
Local7 = 23,
}
/// Syslog severity code (0 is the most severe); added to `facility * 8` to
/// form the message priority.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SyslogSeverity {
/// Severity code 0.
Emergency = 0,
/// Severity code 1.
Alert = 1,
/// Severity code 2.
Critical = 2,
/// Severity code 3.
Error = 3,
/// Severity code 4.
Warning = 4,
/// Severity code 5.
Notice = 5,
/// Severity code 6.
Informational = 6,
/// Severity code 7.
Debug = 7,
}
impl From<AuditLevel> for SyslogSeverity {
fn from(level: AuditLevel) -> Self {
match level {
AuditLevel::Info => SyslogSeverity::Informational,
AuditLevel::Warning => SyslogSeverity::Warning,
AuditLevel::Security => SyslogSeverity::Notice,
AuditLevel::Critical => SyslogSeverity::Error,
AuditLevel::Compliance => SyslogSeverity::Notice,
}
}
}
/// Settings for the syslog sink.
#[derive(Debug, Clone)]
pub struct SyslogConfig {
/// Address of the syslog server that the UDP socket is connected to.
pub server_addr: String,
/// Facility used when computing the message priority.
pub facility: SyslogFacility,
/// Application name placed in every message.
pub app_name: String,
/// Process id field for RFC 5424 messages; `-` is sent when `None`.
pub procid: Option<String>,
/// Message id field for RFC 5424 messages; `-` is sent when `None`.
pub msgid: Option<String>,
/// `true` selects the RFC 5424 layout, `false` the BSD-style layout.
pub rfc5424: bool,
}
impl Default for SyslogConfig {
    /// Defaults: server `127.0.0.1:514`, facility `Local0`, application name
    /// `ringkernel`, no process or message id, RFC 5424 layout.
    fn default() -> Self {
        let server_addr = String::from("127.0.0.1:514");
        let app_name = String::from("ringkernel");
        Self {
            server_addr,
            facility: SyslogFacility::Local0,
            app_name,
            procid: None,
            msgid: None,
            rfc5424: true,
        }
    }
}
/// Sink that sends each event as one UDP datagram to a syslog server.
pub struct SyslogSink {
/// Facility, application name and layout settings.
config: SyslogConfig,
/// Connected UDP socket; `None` after `close`.
socket: Mutex<Option<UdpSocket>>,
/// Host name placed in every message, resolved once at construction.
hostname: String,
}
impl SyslogSink {
    /// Binds an ephemeral UDP socket on `0.0.0.0` and connects it to
    /// `config.server_addr`.
    ///
    /// The host name is read from the `HOSTNAME` environment variable, then
    /// `HOST`, falling back to `"localhost"`.
    ///
    /// # Errors
    /// Returns the I/O error if the socket cannot be bound or connected.
    pub fn new(config: SyslogConfig) -> std::io::Result<Self> {
        let socket = UdpSocket::bind("0.0.0.0:0")?;
        socket.connect(&config.server_addr)?;
        let hostname = std::env::var("HOSTNAME")
            .or_else(|_| std::env::var("HOST"))
            .unwrap_or_else(|_| "localhost".to_string());
        Ok(Self {
            config,
            socket: Mutex::new(Some(socket)),
            hostname,
        })
    }

    /// Creates a sink with default settings apart from the server address.
    pub fn with_server(server_addr: impl Into<String>) -> std::io::Result<Self> {
        Self::new(SyslogConfig {
            server_addr: server_addr.into(),
            ..Default::default()
        })
    }

    /// Converts a count of days since 1970-01-01 into a proleptic Gregorian
    /// `(year, month, day)` triple (month and day are 1-based), accounting for
    /// leap years and real month lengths.
    fn civil_from_days(epoch_days: u64) -> (u64, u64, u64) {
        // Shift the origin to 0000-03-01 so that the leap day is the last day
        // of the (shifted) year and 400-year eras line up.
        let shifted = epoch_days + 719_468;
        let era = shifted / 146_097;
        let day_of_era = shifted % 146_097;
        let year_of_era =
            (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
        let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
        let shifted_month = (5 * day_of_year + 2) / 153;
        let day = day_of_year - (153 * shifted_month + 2) / 5 + 1;
        let month = if shifted_month < 10 {
            shifted_month + 3
        } else {
            shifted_month - 9
        };
        // January and February belong to the following civil year.
        let year = year_of_era + era * 400 + u64::from(month <= 2);
        (year, month, day)
    }

    /// Escapes `"`, `\` and `]` with a backslash, as required inside RFC 5424
    /// structured-data parameter values.
    fn escape_sd_value(value: &str) -> String {
        let mut escaped = String::with_capacity(value.len());
        for ch in value.chars() {
            if matches!(ch, '"' | '\\' | ']') {
                escaped.push('\\');
            }
            escaped.push(ch);
        }
        escaped
    }

    /// Computes the syslog priority value: `facility * 8 + severity`.
    fn priority(&self, event: &AuditEvent) -> u8 {
        let severity: SyslogSeverity = event.level.into();
        (self.config.facility as u8) * 8 + (severity as u8)
    }

    /// Renders `event` in the RFC 5424 layout with a UTC timestamp
    /// (millisecond precision) and one structured-data element carrying the
    /// level, event type, actor and checksum.
    fn format_rfc5424(&self, event: &AuditEvent) -> String {
        let priority = self.priority(event);
        let since_epoch = event
            .timestamp
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default();
        let secs = since_epoch.as_secs();
        let millis = since_epoch.subsec_millis();

        let (year, month, day) = Self::civil_from_days(secs / 86_400);
        let day_secs = secs % 86_400;
        let hours = day_secs / 3_600;
        let minutes = (day_secs % 3_600) / 60;
        let seconds = day_secs % 60;
        let timestamp_str = format!(
            "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
            year, month, day, hours, minutes, seconds, millis
        );

        let procid = self.config.procid.as_deref().unwrap_or("-");
        let msgid = self.config.msgid.as_deref().unwrap_or("-");
        // Event type and actor can hold arbitrary text, so they are escaped to
        // keep the structured-data element well formed.
        let sd = format!(
            "[ringkernel@12345 level=\"{}\" event_type=\"{}\" actor=\"{}\" checksum=\"{}\"]",
            event.level.as_str(),
            Self::escape_sd_value(event.event_type.as_str()),
            Self::escape_sd_value(&event.actor),
            event.checksum
        );
        format!(
            "<{}>{} {} {} {} {} {} {} {}",
            priority,
            1,
            timestamp_str,
            self.hostname,
            self.config.app_name,
            procid,
            msgid,
            sd,
            event.description
        )
    }

    /// Renders `event` in the BSD-style layout:
    /// `<PRI>Mmm dd hh:mm:ss host app: [event_type] description`, using UTC.
    fn format_bsd(&self, event: &AuditEvent) -> String {
        const MONTHS: [&str; 12] = [
            "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
        ];

        let priority = self.priority(event);
        let secs = event
            .timestamp
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        let (_year, month, day) = Self::civil_from_days(secs / 86_400);
        let day_secs = secs % 86_400;
        let hours = day_secs / 3_600;
        let minutes = (day_secs % 3_600) / 60;
        let seconds = day_secs % 60;
        // `month` is always in 1..=12; the clamp only guards the index.
        let month_idx = ((month - 1) as usize).min(11);
        let timestamp_str = format!(
            "{} {:2} {:02}:{:02}:{:02}",
            MONTHS[month_idx], day, hours, minutes, seconds
        );
        format!(
            "<{}>{} {} {}: [{}] {}",
            priority,
            timestamp_str,
            self.hostname,
            self.config.app_name,
            event.event_type.as_str(),
            event.description
        )
    }
}
impl AuditSink for SyslogSink {
    /// Formats the event in the configured layout and sends it as a single
    /// datagram. When the sink has been closed, nothing is sent and `Ok(())`
    /// is returned.
    fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
        let payload = match self.config.rfc5424 {
            true => self.format_rfc5424(event),
            false => self.format_bsd(event),
        };
        let guard = self.socket.lock();
        if let Some(sock) = guard.as_ref() {
            sock.send(payload.as_bytes())?;
        }
        Ok(())
    }

    /// Messages are sent immediately, so there is nothing to flush.
    fn flush(&self) -> std::io::Result<()> {
        Ok(())
    }

    /// Drops the socket; later writes become no-ops.
    fn close(&self) -> std::io::Result<()> {
        let released = self.socket.lock().take();
        drop(released);
        Ok(())
    }
}
/// Settings for the Elasticsearch sink (only built with the `alerting` feature).
#[cfg(feature = "alerting")]
#[derive(Debug, Clone)]
pub struct ElasticsearchConfig {
/// Base URL of the cluster; `/_bulk` is appended for requests.
pub url: String,
/// Index name, which may contain `{date}`, `{year}`, `{month}` and `{day}` placeholders.
pub index_pattern: String,
/// Optional `(user, password)` pair sent as HTTP basic authentication.
pub auth: Option<(String, String)>,
/// Number of buffered entries that triggers an automatic flush.
pub batch_size: usize,
/// Timeout applied to the HTTP client.
pub timeout: Duration,
}
#[cfg(feature = "alerting")]
impl Default for ElasticsearchConfig {
    /// Defaults: `http://localhost:9200`, index `ringkernel-audit`, no
    /// authentication, batches of 100 and a 30-second timeout.
    fn default() -> Self {
        let url = String::from("http://localhost:9200");
        let index_pattern = String::from("ringkernel-audit");
        Self {
            url,
            index_pattern,
            auth: None,
            batch_size: 100,
            timeout: Duration::from_secs(30),
        }
    }
}
/// Sink that batches events and posts them to the Elasticsearch bulk endpoint
/// (only built with the `alerting` feature).
#[cfg(feature = "alerting")]
pub struct ElasticsearchSink {
/// Connection and batching settings.
config: ElasticsearchConfig,
/// Blocking HTTP client built with the configured timeout.
client: reqwest::blocking::Client,
/// Serialized entries queued for the next bulk request.
buffer: Mutex<Vec<String>>,
}
#[cfg(feature = "alerting")]
impl ElasticsearchSink {
    /// Builds the sink and its blocking HTTP client with `config.timeout`.
    ///
    /// # Errors
    /// Returns the `reqwest` error if the client cannot be constructed.
    pub fn new(config: ElasticsearchConfig) -> Result<Self, reqwest::Error> {
        let client = reqwest::blocking::Client::builder()
            .timeout(config.timeout)
            .build()?;
        Ok(Self {
            config,
            client,
            buffer: Mutex::new(Vec::new()),
        })
    }

    /// Converts a count of days since 1970-01-01 into a proleptic Gregorian
    /// `(year, month, day)` triple (month and day are 1-based), accounting for
    /// leap years and real month lengths.
    fn civil_from_days(epoch_days: u64) -> (u64, u64, u64) {
        // Shift the origin to 0000-03-01 so that the leap day is the last day
        // of the (shifted) year and 400-year eras line up.
        let shifted = epoch_days + 719_468;
        let era = shifted / 146_097;
        let day_of_era = shifted % 146_097;
        let year_of_era =
            (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
        let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
        let shifted_month = (5 * day_of_year + 2) / 153;
        let day = day_of_year - (153 * shifted_month + 2) / 5 + 1;
        let month = if shifted_month < 10 {
            shifted_month + 3
        } else {
            shifted_month - 9
        };
        // January and February belong to the following civil year.
        let year = year_of_era + era * 400 + u64::from(month <= 2);
        (year, month, day)
    }

    /// Resolves the index name for `event` by substituting `{date}`
    /// (`YYYY.MM.DD`), `{year}`, `{month}` and `{day}` in the configured
    /// pattern with the event's UTC date.
    fn get_index(&self, event: &AuditEvent) -> String {
        let secs = event
            .timestamp
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let (year, month, day) = Self::civil_from_days(secs / 86_400);
        let date_str = format!("{:04}.{:02}.{:02}", year, month, day);
        self.config
            .index_pattern
            .replace("{date}", &date_str)
            .replace("{year}", &format!("{:04}", year))
            .replace("{month}", &format!("{:02}", month))
            .replace("{day}", &format!("{:02}", day))
    }

    /// Serializes `event` as the JSON document that is indexed.
    ///
    /// `@timestamp` is milliseconds since the Unix epoch; `target` and `hlc`
    /// are omitted when absent; `metadata` is always present (possibly `{}`).
    fn to_es_document(&self, event: &AuditEvent) -> String {
        let timestamp_millis = event
            .timestamp
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();
        let metadata_json = if event.metadata.is_empty() {
            "{}".to_string()
        } else {
            let pairs: Vec<String> = event
                .metadata
                .iter()
                .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
                .collect();
            format!("{{{}}}", pairs.join(","))
        };
        // Optional fragments carry their own leading comma.
        let hlc_json = event
            .hlc
            .map(|h| {
                format!(
                    r#","hlc":{{"physical":{},"logical":{}}}"#,
                    h.physical, h.logical
                )
            })
            .unwrap_or_default();
        let target_json = event
            .target
            .as_ref()
            .map(|t| format!(r#","target":"{}""#, escape_json(t)))
            .unwrap_or_default();
        // `description` is a fixed member and needs its own separating comma;
        // without it the document is not valid JSON.
        format!(
            r#"{{"@timestamp":{},"id":{},"level":"{}","event_type":"{}","actor":"{}"{}{},"description":"{}","metadata":{},"checksum":{}}}"#,
            timestamp_millis,
            event.id,
            event.level.as_str(),
            escape_json(event.event_type.as_str()),
            escape_json(&event.actor),
            target_json,
            hlc_json,
            escape_json(&event.description),
            metadata_json,
            event.checksum
        )
    }

    /// Renders the two NDJSON lines the bulk API expects for one event: the
    /// `index` action naming the event's index, then the document itself.
    fn bulk_entry(&self, event: &AuditEvent) -> String {
        let mut entry = format!(
            r#"{{"index":{{"_index":"{}"}}}}"#,
            escape_json(&self.get_index(event))
        );
        entry.push('\n');
        entry.push_str(&self.to_es_document(event));
        entry.push('\n');
        entry
    }

    /// Sends every buffered entry in one `_bulk` request.
    ///
    /// The buffer is emptied before the request is made, so entries are not
    /// retried if the request fails.
    ///
    /// # Errors
    /// Returns an `io::Error` of kind `Other` when the request cannot be sent
    /// or the server answers with a non-success HTTP status.
    fn flush_buffer(&self) -> std::io::Result<()> {
        let entries: Vec<String> = {
            let mut buffer = self.buffer.lock();
            std::mem::take(&mut *buffer)
        };
        if entries.is_empty() {
            return Ok(());
        }

        let bulk_body = entries.concat();
        let url = format!("{}/_bulk", self.config.url);
        let mut request = self
            .client
            .post(&url)
            .body(bulk_body)
            .header(reqwest::header::CONTENT_TYPE, "application/x-ndjson");
        if let Some((user, pass)) = &self.config.auth {
            request = request.basic_auth(user, Some(pass));
        }
        request
            .send()
            .and_then(|response| response.error_for_status())
            .map_err(|e| {
                std::io::Error::new(
                    std::io::ErrorKind::Other,
                    format!("ES request failed: {}", e),
                )
            })?;
        Ok(())
    }
}
#[cfg(feature = "alerting")]
impl AuditSink for ElasticsearchSink {
    /// Queues the event for the next bulk request and flushes once the queue
    /// holds `batch_size` entries.
    fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
        let entry = self.bulk_entry(event);
        let should_flush = {
            let mut buffer = self.buffer.lock();
            buffer.push(entry);
            buffer.len() >= self.config.batch_size
        };
        if should_flush {
            self.flush_buffer()?;
        }
        Ok(())
    }

    /// Sends whatever is currently queued.
    fn flush(&self) -> std::io::Result<()> {
        self.flush_buffer()
    }

    /// Sends whatever is currently queued; the sink stays usable afterwards.
    fn close(&self) -> std::io::Result<()> {
        self.flush_buffer()
    }
}
/// Settings for the CloudWatch sink.
#[derive(Debug, Clone)]
pub struct CloudWatchConfig {
    /// Name of the log group.
    pub log_group: String,
    /// Name of the log stream.
    pub log_stream: String,
    /// Region name.
    pub region: String,
    /// Number of buffered events that triggers an automatic flush.
    pub batch_size: usize,
}

impl Default for CloudWatchConfig {
    /// Defaults: group `/ringkernel/audit`, stream `default`, region
    /// `us-east-1`, batches of 100.
    fn default() -> Self {
        let log_group = String::from("/ringkernel/audit");
        let log_stream = String::from("default");
        let region = String::from("us-east-1");
        Self {
            log_group,
            log_stream,
            region,
            batch_size: 100,
        }
    }
}
/// Sink that buffers events destined for CloudWatch; in this file its flush
/// only logs a warning and discards the batch.
pub struct CloudWatchSink {
/// Log group, stream, region and batch size.
config: CloudWatchConfig,
// `buffer` holds pending `(timestamp in ms, JSON message)` pairs; `_sequence_token`
// is initialised to `None` and not read by any code in this file.
buffer: Mutex<Vec<(u64, String)>>, _sequence_token: Mutex<Option<String>>,
}
impl CloudWatchSink {
    /// Creates a sink with an empty buffer and no sequence token.
    pub fn new(config: CloudWatchConfig) -> Self {
        let buffer = Mutex::new(Vec::new());
        let _sequence_token = Mutex::new(None);
        Self {
            config,
            buffer,
            _sequence_token,
        }
    }

    /// Returns the configuration this sink was created with.
    pub fn config(&self) -> &CloudWatchConfig {
        &self.config
    }

    /// Number of events currently waiting in the buffer.
    pub fn buffer_size(&self) -> usize {
        let pending = self.buffer.lock();
        pending.len()
    }
}
impl AuditSink for CloudWatchSink {
    /// Buffers the event as `(milliseconds since the Unix epoch, JSON)` and
    /// flushes once the buffer holds `batch_size` entries.
    fn write(&self, event: &AuditEvent) -> std::io::Result<()> {
        let since_epoch = event
            .timestamp
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default();
        let timestamp_ms = since_epoch.as_millis() as u64;
        let entry = (timestamp_ms, event.to_json());

        let batch_full = {
            let mut pending = self.buffer.lock();
            pending.push(entry);
            pending.len() >= self.config.batch_size
        };
        if !batch_full {
            return Ok(());
        }
        self.flush()
    }

    /// Empties the buffer. The drained events are not sent anywhere: a
    /// warning reporting how many were dropped is emitted instead.
    fn flush(&self) -> std::io::Result<()> {
        let drained: Vec<(u64, String)> = std::mem::take(&mut *self.buffer.lock());
        let dropped = drained.len();
        if dropped == 0 {
            return Ok(());
        }
        tracing::warn!(
            event_count = dropped,
            log_group = %self.config.log_group,
            log_stream = %self.config.log_stream,
            "CloudWatch sink is a stub: {} audit events dropped. \
             Enable the `cloudwatch` feature for real AWS integration.",
            dropped,
        );
        Ok(())
    }

    /// Same as `flush`.
    fn close(&self) -> std::io::Result<()> {
        self.flush()
    }
}
/// Settings for the audit logger.
#[derive(Debug, Clone)]
pub struct AuditConfig {
/// Events below this level are discarded by `AuditLogger::log`.
pub min_level: AuditLevel,
/// When `true`, each logged event is linked to the previous event's checksum.
pub enable_checksums: bool,
/// NOTE(review): not read by the logger code in this file — presumably a buffer capacity; confirm.
pub buffer_size: usize,
/// NOTE(review): not read by the logger code in this file — presumably a periodic flush interval; confirm.
pub flush_interval: Duration,
/// NOTE(review): stored via `with_retention` but not read by the logger code in this file.
pub retention: Duration,
}
impl Default for AuditConfig {
    /// Defaults: minimum level `Info`, checksums enabled, buffer size 100,
    /// a 5-second flush interval and 90 days of retention.
    fn default() -> Self {
        const SECS_PER_DAY: u64 = 24 * 60 * 60;
        let retention = Duration::from_secs(90 * SECS_PER_DAY);
        let flush_interval = Duration::from_secs(5);
        Self {
            min_level: AuditLevel::Info,
            enable_checksums: true,
            buffer_size: 100,
            flush_interval,
            retention,
        }
    }
}
/// Builder that collects settings and sinks for an `AuditLogger`.
pub struct AuditLoggerBuilder {
/// Settings handed to the logger on `build`.
config: AuditConfig,
/// Sinks in the order they were added.
sinks: Vec<Arc<dyn AuditSink>>,
}
impl AuditLoggerBuilder {
pub fn new() -> Self {
Self {
config: AuditConfig::default(),
sinks: Vec::new(),
}
}
pub fn with_min_level(mut self, level: AuditLevel) -> Self {
self.config.min_level = level;
self
}
pub fn with_file_sink(mut self, path: impl Into<PathBuf>) -> std::io::Result<Self> {
let sink = Arc::new(FileSink::new(path)?);
self.sinks.push(sink);
Ok(self)
}
pub fn with_memory_sink(mut self, max_events: usize) -> Self {
let sink = Arc::new(MemorySink::new(max_events));
self.sinks.push(sink);
self
}
pub fn with_sink(mut self, sink: Arc<dyn AuditSink>) -> Self {
self.sinks.push(sink);
self
}
pub fn with_syslog_sink(mut self, config: SyslogConfig) -> std::io::Result<Self> {
let sink = Arc::new(SyslogSink::new(config)?);
self.sinks.push(sink);
Ok(self)
}
pub fn with_syslog(mut self, server_addr: impl Into<String>) -> std::io::Result<Self> {
let sink = Arc::new(SyslogSink::with_server(server_addr)?);
self.sinks.push(sink);
Ok(self)
}
pub fn with_cloudwatch_sink(mut self, config: CloudWatchConfig) -> Self {
let sink = Arc::new(CloudWatchSink::new(config));
self.sinks.push(sink);
self
}
#[cfg(feature = "alerting")]
pub fn with_elasticsearch_sink(
mut self,
config: ElasticsearchConfig,
) -> Result<Self, reqwest::Error> {
let sink = Arc::new(ElasticsearchSink::new(config)?);
self.sinks.push(sink);
Ok(self)
}
pub fn with_retention(mut self, retention: Duration) -> Self {
self.config.retention = retention;
self
}
pub fn with_checksums(mut self, enable: bool) -> Self {
self.config.enable_checksums = enable;
self
}
pub fn build(self) -> AuditLogger {
AuditLogger {
config: self.config,
sinks: self.sinks,
last_checksum: AtomicU64::new(0),
event_count: AtomicU64::new(0),
buffer: RwLock::new(Vec::new()),
}
}
}
impl Default for AuditLoggerBuilder {
fn default() -> Self {
Self::new()
}
}
/// Front end that filters events, chains their checksums and fans them out to
/// every registered sink.
pub struct AuditLogger {
/// Minimum level and checksum settings.
config: AuditConfig,
/// Sinks that receive every accepted event, in registration order.
sinks: Vec<Arc<dyn AuditSink>>,
/// Checksum of the most recently chained event (0 before the first one).
last_checksum: AtomicU64,
/// Number of events dispatched to the sinks so far.
event_count: AtomicU64,
/// Events queued by `buffer_event` until `flush_buffered` is called.
buffer: RwLock<Vec<AuditEvent>>,
}
impl AuditLogger {
pub fn builder() -> AuditLoggerBuilder {
AuditLoggerBuilder::new()
}
pub fn in_memory(max_events: usize) -> Self {
AuditLoggerBuilder::new()
.with_memory_sink(max_events)
.build()
}
pub fn log(&self, mut event: AuditEvent) {
if event.level < self.config.min_level {
return;
}
if self.config.enable_checksums {
let prev = self.last_checksum.load(Ordering::Acquire);
event = event.with_prev_checksum(prev);
self.last_checksum.store(event.checksum, Ordering::Release);
}
for sink in &self.sinks {
if let Err(e) = sink.write(&event) {
tracing::error!("Audit sink error: {}", e);
}
}
self.event_count.fetch_add(1, Ordering::Relaxed);
}
pub fn log_kernel_launched(&self, kernel_id: &str, backend: &str) {
self.log(AuditEvent::kernel_launched(kernel_id, backend));
}
pub fn log_kernel_terminated(&self, kernel_id: &str, reason: &str) {
self.log(AuditEvent::kernel_terminated(kernel_id, reason));
}
pub fn log_security_violation(&self, actor: &str, violation: &str) {
self.log(AuditEvent::security_violation(actor, violation));
}
pub fn log_config_change(&self, actor: &str, key: &str, old_value: &str, new_value: &str) {
self.log(AuditEvent::config_change(actor, key, old_value, new_value));
}
pub fn event_count(&self) -> u64 {
self.event_count.load(Ordering::Relaxed)
}
pub fn buffer_event(&self, event: AuditEvent) {
let mut buffer = self.buffer.write();
buffer.push(event);
}
pub fn flush_buffered(&self) -> std::io::Result<()> {
let events: Vec<AuditEvent> = {
let mut buffer = self.buffer.write();
std::mem::take(&mut *buffer)
};
for mut event in events {
if self.config.enable_checksums {
let prev = self.last_checksum.load(Ordering::Acquire);
event = event.with_prev_checksum(prev);
self.last_checksum.store(event.checksum, Ordering::Release);
}
for sink in &self.sinks {
sink.write(&event)?;
}
self.event_count.fetch_add(1, Ordering::Relaxed);
}
self.flush()
}
pub fn buffered_count(&self) -> usize {
self.buffer.read().len()
}
pub fn flush(&self) -> std::io::Result<()> {
for sink in &self.sinks {
sink.flush()?;
}
Ok(())
}
pub fn close(&self) -> std::io::Result<()> {
for sink in &self.sinks {
sink.close()?;
}
Ok(())
}
}
// Unit tests for the event model, the in-memory and CloudWatch sinks, the
// logger, and the syslog enum values.
#[cfg(test)]
mod tests {
use super::*;
// A freshly created event keeps the supplied fields and has a non-zero checksum.
#[test]
fn test_audit_event_creation() {
let event = AuditEvent::new(
AuditLevel::Info,
AuditEventType::KernelLaunched,
"runtime",
"Kernel launched",
);
assert_eq!(event.level, AuditLevel::Info);
assert_eq!(event.event_type, AuditEventType::KernelLaunched);
assert_eq!(event.actor, "runtime");
assert!(event.checksum != 0);
}
// Changing a hashed field after construction makes checksum verification fail.
#[test]
fn test_audit_event_checksum() {
let event = AuditEvent::kernel_launched("test_kernel", "cuda");
assert!(event.verify_checksum());
let mut modified = event.clone();
modified.description = "Modified".to_string();
assert!(!modified.verify_checksum());
}
// `with_prev_checksum` stores the predecessor's checksum on the event.
#[test]
fn test_audit_event_chain() {
let event1 = AuditEvent::kernel_launched("k1", "cuda");
let event2 = AuditEvent::kernel_launched("k2", "cuda").with_prev_checksum(event1.checksum);
assert_eq!(event2.prev_checksum, Some(event1.checksum));
}
// The JSON output mentions the event type, target, description text and metadata keys.
#[test]
fn test_audit_event_json() {
let event = AuditEvent::kernel_launched("test", "cuda")
.with_metadata("gpu_id", "0")
.with_metadata("memory_mb", "8192");
let json = event.to_json();
assert!(json.contains("kernel_launched"));
assert!(json.contains("test"));
assert!(json.contains("cuda"));
assert!(json.contains("gpu_id"));
}
// A written event can be read back from the memory sink.
#[test]
fn test_memory_sink() {
let sink = MemorySink::new(10);
let event = AuditEvent::kernel_launched("test", "cuda");
sink.write(&event).unwrap();
assert_eq!(sink.len(), 1);
assert!(!sink.is_empty());
let events = sink.events();
assert_eq!(events[0].event_type, AuditEventType::KernelLaunched);
}
// With capacity 3 and 5 writes, the two oldest events are evicted.
#[test]
fn test_memory_sink_rotation() {
let sink = MemorySink::new(3);
for i in 0..5 {
let event = AuditEvent::new(
AuditLevel::Info,
AuditEventType::Custom(format!("event_{}", i)),
"test",
format!("Event {}", i),
);
sink.write(&event).unwrap();
}
assert_eq!(sink.len(), 3);
let events = sink.events();
assert_eq!(
events[0].event_type,
AuditEventType::Custom("event_2".to_string())
);
}
// Each convenience logging call increments the logger's event count.
#[test]
fn test_audit_logger() {
let logger = AuditLogger::in_memory(100);
logger.log_kernel_launched("k1", "cuda");
logger.log_kernel_terminated("k1", "shutdown");
logger.log_security_violation("user", "unauthorized access");
assert_eq!(logger.event_count(), 3);
}
// Levels compare in discriminant order.
#[test]
fn test_audit_level_ordering() {
assert!(AuditLevel::Info < AuditLevel::Warning);
assert!(AuditLevel::Warning < AuditLevel::Security);
assert!(AuditLevel::Security < AuditLevel::Critical);
assert!(AuditLevel::Critical < AuditLevel::Compliance);
}
// Helper constructors set the expected level, metadata and event type.
#[test]
fn test_audit_event_helpers() {
let event = AuditEvent::config_change("admin", "max_kernels", "10", "20");
assert_eq!(event.level, AuditLevel::Compliance);
assert_eq!(event.metadata.len(), 2);
let health = AuditEvent::health_check("kernel_1", "healthy");
assert_eq!(health.event_type, AuditEventType::HealthCheck);
}
// Audit levels map to the expected syslog severities.
#[test]
fn test_syslog_severity_conversion() {
assert_eq!(
SyslogSeverity::from(AuditLevel::Info),
SyslogSeverity::Informational
);
assert_eq!(
SyslogSeverity::from(AuditLevel::Warning),
SyslogSeverity::Warning
);
assert_eq!(
SyslogSeverity::from(AuditLevel::Security),
SyslogSeverity::Notice
);
assert_eq!(
SyslogSeverity::from(AuditLevel::Critical),
SyslogSeverity::Error
);
}
// Default syslog settings.
#[test]
fn test_syslog_config_default() {
let config = SyslogConfig::default();
assert_eq!(config.server_addr, "127.0.0.1:514");
assert_eq!(config.facility, SyslogFacility::Local0);
assert_eq!(config.app_name, "ringkernel");
assert!(config.rfc5424);
}
// Default CloudWatch settings.
#[test]
fn test_cloudwatch_config_default() {
let config = CloudWatchConfig::default();
assert_eq!(config.log_group, "/ringkernel/audit");
assert_eq!(config.log_stream, "default");
assert_eq!(config.region, "us-east-1");
assert_eq!(config.batch_size, 100);
}
// Writes below the batch size stay in the CloudWatch buffer.
#[test]
fn test_cloudwatch_sink_buffering() {
let config = CloudWatchConfig {
batch_size: 5,
..Default::default()
};
let sink = CloudWatchSink::new(config);
for i in 0..3 {
let event = AuditEvent::new(
AuditLevel::Info,
AuditEventType::Custom(format!("event_{}", i)),
"test",
format!("Event {}", i),
);
sink.write(&event).unwrap();
}
assert_eq!(sink.buffer_size(), 3);
}
// Facility discriminants match their numeric codes.
#[test]
fn test_syslog_facility_values() {
assert_eq!(SyslogFacility::Kern as u8, 0);
assert_eq!(SyslogFacility::User as u8, 1);
assert_eq!(SyslogFacility::Auth as u8, 4);
assert_eq!(SyslogFacility::Local0 as u8, 16);
assert_eq!(SyslogFacility::Local7 as u8, 23);
}
// Severity discriminants match their numeric codes.
#[test]
fn test_syslog_severity_values() {
assert_eq!(SyslogSeverity::Emergency as u8, 0);
assert_eq!(SyslogSeverity::Alert as u8, 1);
assert_eq!(SyslogSeverity::Critical as u8, 2);
assert_eq!(SyslogSeverity::Error as u8, 3);
assert_eq!(SyslogSeverity::Warning as u8, 4);
assert_eq!(SyslogSeverity::Notice as u8, 5);
assert_eq!(SyslogSeverity::Informational as u8, 6);
assert_eq!(SyslogSeverity::Debug as u8, 7);
}
}