use std::collections::VecDeque;
use std::sync::RwLock;
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
use super::audit::AuditEntry;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SiemConfig {
#[serde(default)]
pub destinations: Vec<String>,
#[serde(default)]
pub webhook_url: String,
#[serde(default)]
pub webhook_hmac_secret: String,
#[serde(default = "default_buffer_size")]
pub buffer_size: usize,
}
fn default_buffer_size() -> usize {
10_000
}
impl Default for SiemConfig {
fn default() -> Self {
Self {
destinations: Vec::new(),
webhook_url: String::new(),
webhook_hmac_secret: String::new(),
buffer_size: default_buffer_size(),
}
}
}
pub struct SiemExporter {
config: SiemConfig,
audit_buffer: RwLock<VecDeque<AuditEntry>>,
auth_buffer: RwLock<VecDeque<AuditEntry>>,
}
impl SiemExporter {
pub fn new(config: SiemConfig) -> Self {
let cap = config.buffer_size;
Self {
config,
audit_buffer: RwLock::new(VecDeque::with_capacity(cap.min(10_000))),
auth_buffer: RwLock::new(VecDeque::with_capacity(cap.min(10_000))),
}
}
pub fn push_audit(&self, entry: AuditEntry) {
let mut buf = self.audit_buffer.write().unwrap_or_else(|p| p.into_inner());
if buf.len() >= self.config.buffer_size {
buf.pop_front();
}
buf.push_back(entry);
}
pub fn push_auth(&self, entry: AuditEntry) {
let mut buf = self.auth_buffer.write().unwrap_or_else(|p| p.into_inner());
if buf.len() >= self.config.buffer_size {
buf.pop_front();
}
buf.push_back(entry);
}
pub fn drain_audit(&self) -> Vec<AuditEntry> {
let mut buf = self.audit_buffer.write().unwrap_or_else(|p| p.into_inner());
buf.drain(..).collect()
}
pub fn drain_auth(&self) -> Vec<AuditEntry> {
let mut buf = self.auth_buffer.write().unwrap_or_else(|p| p.into_inner());
buf.drain(..).collect()
}
pub fn build_webhook_payload(&self, events: &[AuditEntry]) -> (String, String) {
let body = serde_json::json!({
"source": "nodedb",
"event_count": events.len(),
"events": events,
})
.to_string();
let signature = if !self.config.webhook_hmac_secret.is_empty() {
compute_hmac(&self.config.webhook_hmac_secret, &body)
} else {
String::new()
};
(body, signature)
}
pub async fn flush_webhook(&self) {
if self.config.webhook_url.is_empty() {
return;
}
let audit_events = self.drain_audit();
let auth_events = self.drain_auth();
let all: Vec<AuditEntry> = audit_events.into_iter().chain(auth_events).collect();
if all.is_empty() {
return;
}
let (body, signature) = self.build_webhook_payload(&all);
match reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
{
Ok(client) => {
let mut req = client
.post(&self.config.webhook_url)
.header("Content-Type", "application/json")
.body(body);
if !signature.is_empty() {
req = req.header("X-NodeDB-Signature", &signature);
}
match req.send().await {
Ok(resp) if resp.status().is_success() => {
info!(events = all.len(), "SIEM webhook delivered");
}
Ok(resp) => {
warn!(status = %resp.status(), "SIEM webhook delivery failed");
}
Err(e) => {
warn!(error = %e, "SIEM webhook request failed");
}
}
}
Err(e) => {
warn!(error = %e, "SIEM webhook client construction failed");
}
}
}
pub fn is_configured(&self) -> bool {
!self.config.destinations.is_empty() || !self.config.webhook_url.is_empty()
}
pub fn buffered_count(&self) -> usize {
let a = self
.audit_buffer
.read()
.unwrap_or_else(|p| p.into_inner())
.len();
let b = self
.auth_buffer
.read()
.unwrap_or_else(|p| p.into_inner())
.len();
a + b
}
}
impl Default for SiemExporter {
fn default() -> Self {
Self::new(SiemConfig::default())
}
}
fn compute_hmac(secret: &str, message: &str) -> String {
use hmac::{Hmac, Mac};
use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>;
let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) else {
return String::new();
};
mac.update(message.as_bytes());
let result = mac.finalize();
result
.into_bytes()
.iter()
.map(|b| format!("{b:02x}"))
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
fn test_entry() -> AuditEntry {
AuditEntry {
seq: 1,
timestamp_us: 0,
event: super::super::audit::AuditEvent::AuthSuccess,
tenant_id: None,
auth_user_id: "u1".into(),
auth_user_name: "alice".into(),
session_id: "s1".into(),
source: "10.0.0.1".into(),
detail: "test".into(),
prev_hash: String::new(),
}
}
#[test]
fn buffer_and_drain() {
let exporter = SiemExporter::default();
exporter.push_audit(test_entry());
exporter.push_audit(test_entry());
exporter.push_auth(test_entry());
assert_eq!(exporter.buffered_count(), 3);
let audit = exporter.drain_audit();
assert_eq!(audit.len(), 2);
assert_eq!(exporter.buffered_count(), 1); }
#[test]
fn webhook_payload_with_hmac() {
let config = SiemConfig {
webhook_hmac_secret: "test_secret".into(),
..Default::default()
};
let exporter = SiemExporter::new(config);
let (body, signature) = exporter.build_webhook_payload(&[test_entry()]);
assert!(body.contains("nodedb"));
assert!(!signature.is_empty());
assert_eq!(signature.len(), 64); }
#[test]
fn hmac_consistency() {
let sig1 = compute_hmac("secret", "hello");
let sig2 = compute_hmac("secret", "hello");
assert_eq!(sig1, sig2);
let sig3 = compute_hmac("secret", "world");
assert_ne!(sig1, sig3);
}
}