use crate::security::events::SecurityEvent;
use crate::Error;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::RwLock;
use tracing::{debug, error, warn};
/// Transport protocol used to ship security events to a SIEM backend.
///
/// Serialized in lowercase (e.g. `"syslog"`, `"cloudwatch"`) so it can be
/// written directly in configuration files.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum SiemProtocol {
    /// RFC 5424 syslog (UDP or TCP, see [`SiemDestination::Syslog`]).
    Syslog,
    /// Plain HTTP endpoint.
    Http,
    /// HTTPS endpoint.
    Https,
    /// Local file (JSON lines).
    File,
    /// Splunk HTTP Event Collector.
    Splunk,
    /// Datadog Events API.
    Datadog,
    /// AWS CloudWatch Logs.
    Cloudwatch,
    /// Google Cloud Logging.
    Gcp,
    /// Azure Monitor (Log Analytics).
    Azure,
}
/// Syslog facility code, used to build the RFC 5424 PRI value
/// (`facility * 8 + severity`). Discriminants are the standard facility
/// numbers; defaults to `Local0` (16), the conventional choice for
/// application logging.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
pub enum SyslogFacility {
    Kernel = 0,
    User = 1,
    Mail = 2,
    Daemon = 3,
    Security = 4,
    Syslogd = 5,
    LinePrinter = 6,
    NetworkNews = 7,
    Uucp = 8,
    Clock = 9,
    // NOTE(review): 10 is conventionally "authpriv"; confirm the intended name.
    Security2 = 10,
    Ftp = 11,
    Ntp = 12,
    LogAudit = 13,
    LogAlert = 14,
    // Facility 15 (clock daemon 2 / cron in some taxonomies) is intentionally
    // absent; discriminants jump to the local-use range 16..=23.
    #[default]
    Local0 = 16,
    Local1 = 17,
    Local2 = 18,
    Local3 = 19,
    Local4 = 20,
    Local5 = 21,
    Local6 = 22,
    Local7 = 23,
}
/// RFC 5424 syslog severity level (0 = most severe). Combined with the
/// facility to form the PRI field of a syslog message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyslogSeverity {
    Emergency = 0,
    Alert = 1,
    Critical = 2,
    Error = 3,
    Warning = 4,
    Notice = 5,
    Informational = 6,
    Debug = 7,
}
impl From<crate::security::events::SecurityEventSeverity> for SyslogSeverity {
    /// Maps MockForge security-event severities onto RFC 5424 syslog levels:
    /// Low→Informational, Medium→Warning, High→Error, Critical→Critical.
    fn from(severity: crate::security::events::SecurityEventSeverity) -> Self {
        use crate::security::events::SecurityEventSeverity as EventSeverity;
        match severity {
            EventSeverity::Low => Self::Informational,
            EventSeverity::Medium => Self::Warning,
            EventSeverity::High => Self::Error,
            EventSeverity::Critical => Self::Critical,
        }
    }
}
/// Retry policy shared by the HTTP-based transports.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct RetryConfig {
    /// Number of retries after the initial attempt (transports loop
    /// `0..=max_attempts`, i.e. `max_attempts + 1` total tries).
    pub max_attempts: u32,
    /// Backoff strategy: `"exponential"` (delay doubles each retry) or
    /// anything else for linear backoff.
    #[serde(default = "default_backoff")]
    pub backoff: String,
    /// Delay before the first retry, in seconds.
    #[serde(default = "default_initial_delay")]
    pub initial_delay_secs: u64,
}
/// Serde default for [`RetryConfig::backoff`]: exponential backoff.
fn default_backoff() -> String {
    String::from("exponential")
}
/// Serde default for [`RetryConfig::initial_delay_secs`]: one second.
fn default_initial_delay() -> u64 {
    1
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_attempts: 3,
backoff: "exponential".to_string(),
initial_delay_secs: 1,
}
}
}
/// Rotation policy for the file transport.
///
/// NOTE(review): this config is currently accepted but not acted upon —
/// [`FileTransport`] never rotates; confirm whether rotation is implemented
/// elsewhere.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct FileRotationConfig {
    /// Maximum file size before rotation, as a human-readable string
    /// (format not parsed here).
    pub max_size: String,
    /// Number of rotated files to keep.
    pub max_files: u32,
    /// Whether rotated files should be compressed.
    #[serde(default)]
    pub compress: bool,
}
/// Declarative filter applied to events before they are forwarded.
///
/// An event passes when it matches at least one `include` pattern (if any
/// are set), matches no `exclude` pattern, and satisfies every `conditions`
/// entry. See [`EventFilter::should_include`] for the exact semantics.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct EventFilter {
    /// Event-type patterns to allow; `None` means allow all types.
    pub include: Option<Vec<String>>,
    /// Event-type patterns (or `severity:<level>` selectors) to drop.
    pub exclude: Option<Vec<String>>,
    /// Metadata conditions (`key`, `key=value`, `key!=value`) that must all hold.
    pub conditions: Option<Vec<String>>,
}
impl EventFilter {
pub fn should_include(&self, event: &SecurityEvent) -> bool {
if let Some(ref includes) = self.include {
let mut matched = false;
for pattern in includes {
if self.matches_pattern(&event.event_type, pattern) {
matched = true;
break;
}
}
if !matched {
return false;
}
}
if let Some(ref excludes) = self.exclude {
for pattern in excludes {
if pattern.starts_with("severity:") {
let severity_str = pattern.strip_prefix("severity:").unwrap_or("");
if severity_str == "low"
&& event.severity == crate::security::events::SecurityEventSeverity::Low
{
return false;
}
if severity_str == "medium"
&& event.severity == crate::security::events::SecurityEventSeverity::Medium
{
return false;
}
if severity_str == "high"
&& event.severity == crate::security::events::SecurityEventSeverity::High
{
return false;
}
if severity_str == "critical"
&& event.severity
== crate::security::events::SecurityEventSeverity::Critical
{
return false;
}
} else if self.matches_pattern(&event.event_type, pattern) {
return false;
}
}
}
if let Some(ref conditions) = self.conditions {
for condition in conditions {
if !self.evaluate_condition(condition, event) {
return false;
}
}
}
true
}
fn evaluate_condition(&self, condition: &str, event: &SecurityEvent) -> bool {
if let Some((key, value)) = condition.split_once("!=") {
let key = key.trim();
let value = value.trim();
match event.metadata.get(key) {
Some(v) => !metadata_value_matches(v, value),
None => true, }
} else if let Some((key, value)) = condition.split_once('=') {
let key = key.trim();
let value = value.trim();
match event.metadata.get(key) {
Some(v) => metadata_value_matches(v, value),
None => false,
}
} else {
let key = condition.trim();
event.metadata.contains_key(key)
}
}
fn matches_pattern(&self, event_type: &str, pattern: &str) -> bool {
if pattern.ends_with(".*") {
let prefix = pattern.strip_suffix(".*").unwrap_or("");
event_type.starts_with(prefix)
} else {
event_type == pattern
}
}
}
fn metadata_value_matches(value: &serde_json::Value, expected: &str) -> bool {
match value {
serde_json::Value::String(s) => s == expected,
serde_json::Value::Number(n) => n.to_string() == expected,
serde_json::Value::Bool(b) => (expected == "true" && *b) || (expected == "false" && !b),
serde_json::Value::Null => expected == "null",
serde_json::Value::Array(_) | serde_json::Value::Object(_) => false,
}
}
/// A single SIEM destination, tagged in config by its `protocol` field.
///
/// Each variant carries the connection and formatting settings for one
/// backend; transports are constructed from these in
/// [`SiemEmitter::from_config`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(tag = "protocol")]
pub enum SiemDestination {
    /// RFC 5424 syslog over UDP (default) or TCP.
    #[serde(rename = "syslog")]
    Syslog {
        host: String,
        port: u16,
        /// `"udp"` (default) or `"tcp"`.
        #[serde(default = "default_syslog_protocol", rename = "transport")]
        transport: String,
        #[serde(default)]
        facility: SyslogFacility,
        /// APP-NAME field of the syslog header.
        #[serde(default = "default_tag")]
        tag: String,
    },
    /// Generic HTTP endpoint receiving the event as a JSON body.
    #[serde(rename = "http")]
    Http {
        url: String,
        /// `POST` (default), `PUT`, or `PATCH`.
        #[serde(default = "default_http_method")]
        method: String,
        #[serde(default)]
        headers: HashMap<String, String>,
        /// Request timeout in seconds.
        #[serde(default = "default_timeout")]
        timeout: u64,
        #[serde(default)]
        retry: RetryConfig,
    },
    /// Same as [`Self::Http`]; the scheme is taken from `url`.
    #[serde(rename = "https")]
    Https {
        url: String,
        #[serde(default = "default_http_method")]
        method: String,
        #[serde(default)]
        headers: HashMap<String, String>,
        #[serde(default = "default_timeout")]
        timeout: u64,
        #[serde(default)]
        retry: RetryConfig,
    },
    /// Append events to a local file.
    #[serde(rename = "file")]
    File {
        path: String,
        /// Output format; currently only `"jsonl"` behavior is implemented.
        #[serde(default = "default_file_format")]
        format: String,
        /// NOTE(review): accepted but currently unused by [`FileTransport`].
        rotation: Option<FileRotationConfig>,
    },
    /// Splunk HTTP Event Collector.
    #[serde(rename = "splunk")]
    Splunk {
        url: String,
        /// HEC token, sent as `Authorization: Splunk <token>`.
        token: String,
        index: Option<String>,
        source_type: Option<String>,
    },
    /// Datadog Events API.
    #[serde(rename = "datadog")]
    Datadog {
        api_key: String,
        app_key: Option<String>,
        /// Datadog site domain, e.g. `datadoghq.com` or `datadoghq.eu`.
        #[serde(default = "default_datadog_site")]
        site: String,
        /// Extra tags attached to every event.
        #[serde(default)]
        tags: Vec<String>,
    },
    /// AWS CloudWatch Logs.
    #[serde(rename = "cloudwatch")]
    Cloudwatch {
        region: String,
        log_group: String,
        stream: String,
        /// Keys read by the transport: `access_key_id`, `session_token`.
        credentials: HashMap<String, String>,
    },
    /// Google Cloud Logging.
    #[serde(rename = "gcp")]
    Gcp {
        project_id: String,
        log_name: String,
        /// Path to a JSON file containing an `access_token` field.
        credentials_path: String,
    },
    /// Azure Monitor (Log Analytics HTTP Data Collector).
    #[serde(rename = "azure")]
    Azure {
        workspace_id: String,
        /// Base64-encoded workspace shared key used for HMAC signing.
        shared_key: String,
        log_type: String,
    },
}
/// Serde default syslog transport: UDP.
fn default_syslog_protocol() -> String {
    String::from("udp")
}
/// Serde default syslog APP-NAME tag.
fn default_tag() -> String {
    String::from("mockforge")
}
/// Serde default HTTP method for HTTP/HTTPS destinations.
fn default_http_method() -> String {
    String::from("POST")
}
/// Serde default HTTP request timeout, in seconds.
fn default_timeout() -> u64 {
    5
}
/// Serde default file output format: JSON lines.
fn default_file_format() -> String {
    String::from("jsonl")
}
/// Serde default Datadog site (US1).
fn default_datadog_site() -> String {
    String::from("datadoghq.com")
}
/// Top-level SIEM forwarding configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Default)]
pub struct SiemConfig {
    /// Master switch; when false no transports are constructed.
    pub enabled: bool,
    /// NOTE(review): informational only — destinations each carry their own
    /// protocol tag and this field is not read by `SiemEmitter::from_config`.
    pub protocol: Option<SiemProtocol>,
    /// Backends to fan events out to.
    pub destinations: Vec<SiemDestination>,
    /// Optional filter applied before any transport is invoked.
    pub filters: Option<EventFilter>,
}
/// A single SIEM backend capable of delivering one security event.
///
/// Implementations are expected to handle their own retries and return an
/// error only after delivery has definitively failed.
#[async_trait]
pub trait SiemTransport: Send + Sync {
    /// Delivers `event` to the backend, or returns a transport error.
    async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error>;
}
/// Sends RFC 5424-formatted syslog messages over UDP or TCP.
pub struct SyslogTransport {
    host: String,
    port: u16,
    /// True for TCP, false for UDP (the default).
    use_tcp: bool,
    facility: SyslogFacility,
    /// APP-NAME field of the syslog header.
    tag: String,
}
impl SyslogTransport {
    /// Creates a syslog transport.
    ///
    /// `protocol` selects TCP when it equals `"tcp"` (compared
    /// case-insensitively, so `"TCP"` in a config file also works); any other
    /// value falls back to UDP.
    pub fn new(
        host: String,
        port: u16,
        protocol: String,
        facility: SyslogFacility,
        tag: String,
    ) -> Self {
        Self {
            host,
            port,
            use_tcp: protocol.eq_ignore_ascii_case("tcp"),
            facility,
            tag,
        }
    }
    /// Formats `event` as an RFC 5424 syslog message:
    /// `<PRI>1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID SD MSG`,
    /// with the event's JSON serialization as the MSG part.
    fn format_syslog_message(&self, event: &SecurityEvent) -> String {
        let severity: SyslogSeverity = event.severity.into();
        // PRI = facility * 8 + severity, per RFC 5424 section 6.2.1.
        let priority = (self.facility as u8) * 8 + severity as u8;
        let timestamp = event.timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ");
        let hostname = "mockforge";
        let app_name = &self.tag;
        // PROCID, MSGID and structured data are unused; "-" is the NILVALUE.
        let proc_id = "-";
        let msg_id = "-";
        let structured_data = "-";
        // Fall back to an empty JSON object if the event fails to serialize,
        // so a malformed event never aborts delivery.
        let msg = event.to_json().unwrap_or_else(|_| "{}".to_string());
        format!(
            "<{}>1 {} {} {} {} {} {} {}",
            priority, timestamp, hostname, app_name, proc_id, msg_id, structured_data, msg
        )
    }
}
#[async_trait]
impl SiemTransport for SyslogTransport {
    /// Sends one syslog message. UDP datagrams are sent as-is (one message
    /// per datagram); TCP messages are newline-terminated so receivers using
    /// RFC 6587 non-transparent framing can delimit records — without the
    /// terminator, consecutive messages on the same byte stream run together.
    async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
        let message = self.format_syslog_message(event);
        if self.use_tcp {
            use tokio::net::TcpStream;
            let addr = format!("{}:{}", self.host, self.port);
            let mut stream = TcpStream::connect(&addr).await.map_err(|e| {
                Error::siem_transport(format!("Failed to connect to syslog server: {}", e))
            })?;
            // LF terminator = RFC 6587 non-transparent framing.
            let framed = format!("{}\n", message);
            stream.write_all(framed.as_bytes()).await.map_err(|e| {
                Error::siem_transport(format!("Failed to send syslog message: {}", e))
            })?;
        } else {
            use tokio::net::UdpSocket;
            // Ephemeral local port; the OS picks the source address.
            let socket = UdpSocket::bind("0.0.0.0:0")
                .await
                .map_err(|e| Error::siem_transport(format!("Failed to bind UDP socket: {}", e)))?;
            let addr = format!("{}:{}", self.host, self.port);
            socket.send_to(message.as_bytes(), &addr).await.map_err(|e| {
                Error::siem_transport(format!("Failed to send UDP syslog message: {}", e))
            })?;
        }
        debug!("Sent syslog event: {}", event.event_type);
        Ok(())
    }
}
/// Sends events as JSON bodies to an HTTP(S) endpoint with retry support.
/// Used for both the `http` and `https` destination variants.
pub struct HttpTransport {
    url: String,
    /// `POST`, `PUT`, or `PATCH`.
    method: String,
    /// Extra headers; `Content-Type: application/json` is added unless present.
    headers: HashMap<String, String>,
    retry: RetryConfig,
    client: reqwest::Client,
}
impl HttpTransport {
    /// Creates an HTTP transport with the given per-request `timeout`
    /// (seconds) baked into the client.
    ///
    /// # Panics
    /// Panics if the reqwest client cannot be constructed (e.g. TLS backend
    /// initialization failure) — treated as an unrecoverable startup error.
    pub fn new(
        url: String,
        method: String,
        headers: HashMap<String, String>,
        timeout: u64,
        retry: RetryConfig,
    ) -> Self {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(timeout))
            .build()
            .expect("Failed to create HTTP client");
        Self {
            url,
            method,
            headers,
            retry,
            client,
        }
    }
}
#[async_trait]
impl SiemTransport for HttpTransport {
async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
let event_json = event.to_json()?;
let mut request = match self.method.as_str() {
"POST" => self.client.post(&self.url),
"PUT" => self.client.put(&self.url),
"PATCH" => self.client.patch(&self.url),
_ => {
return Err(Error::siem_transport(format!(
"Unsupported HTTP method: {}",
self.method
)))
}
};
for (key, value) in &self.headers {
request = request.header(key, value);
}
if !self.headers.contains_key("Content-Type") {
request = request.header("Content-Type", "application/json");
}
request = request.body(event_json);
let mut last_error = None;
for attempt in 0..=self.retry.max_attempts {
match request.try_clone() {
Some(req) => match req.send().await {
Ok(response) => {
if response.status().is_success() {
debug!("Sent HTTP event to {}: {}", self.url, event.event_type);
return Ok(());
} else {
let status = response.status();
last_error =
Some(Error::siem_transport(format!("HTTP error: {}", status)));
}
}
Err(e) => {
last_error =
Some(Error::siem_transport(format!("HTTP request failed: {}", e)));
}
},
None => {
let event_json = event.to_json()?;
let mut req = match self.method.as_str() {
"POST" => self.client.post(&self.url),
"PUT" => self.client.put(&self.url),
"PATCH" => self.client.patch(&self.url),
_ => break,
};
for (key, value) in &self.headers {
req = req.header(key, value);
}
if !self.headers.contains_key("Content-Type") {
req = req.header("Content-Type", "application/json");
}
req = req.body(event_json);
request = req;
continue;
}
}
if attempt < self.retry.max_attempts {
let delay = if self.retry.backoff == "exponential" {
self.retry.initial_delay_secs * (2_u64.pow(attempt))
} else {
self.retry.initial_delay_secs * (attempt as u64 + 1)
};
tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
}
}
Err(last_error
.unwrap_or_else(|| Error::siem_transport("Failed to send HTTP event after retries")))
}
}
/// Appends events to a local file, one JSON document per line.
pub struct FileTransport {
    path: PathBuf,
    /// Configured output format (currently only JSON-lines output is produced).
    format: String,
    /// Buffered writer guarded for concurrent `send_event` calls; `None`
    /// would indicate an uninitialized writer.
    writer: Arc<RwLock<Option<BufWriter<File>>>>,
}
impl FileTransport {
    /// Opens (or creates) `path` in append mode, creating parent directories
    /// as needed.
    ///
    /// # Errors
    /// Returns a transport error when the directory cannot be created or the
    /// file cannot be opened.
    pub async fn new(path: String, format: String) -> Result<Self, Error> {
        let path = PathBuf::from(path);
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .map_err(|e| Error::siem_transport(format!("Failed to create directory: {}", e)))?;
        }
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .await
            .map_err(|e| Error::siem_transport(format!("Failed to open file: {}", e)))?;
        let writer = Arc::new(RwLock::new(Some(BufWriter::new(file))));
        Ok(Self {
            path,
            format,
            writer,
        })
    }
}
#[async_trait]
impl SiemTransport for FileTransport {
    /// Appends the event as one JSON line and flushes immediately so events
    /// are durable even if the process exits.
    ///
    /// The previous code branched on `self.format == "jsonl"` but both
    /// branches were byte-identical; the dead conditional is collapsed.
    /// Every configured format currently produces JSON lines — `format` is
    /// retained for future formats.
    async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
        let mut writer_guard = self.writer.write().await;
        if let Some(ref mut writer) = *writer_guard {
            let line = format!("{}\n", event.to_json()?);
            writer
                .write_all(line.as_bytes())
                .await
                .map_err(|e| Error::siem_transport(format!("Failed to write to file: {}", e)))?;
            writer
                .flush()
                .await
                .map_err(|e| Error::siem_transport(format!("Failed to flush file: {}", e)))?;
            debug!("Wrote event to file {}: {}", self.path.display(), event.event_type);
            Ok(())
        } else {
            Err(Error::siem_transport("File writer not initialized"))
        }
    }
}
/// Sends events to a Splunk HTTP Event Collector (HEC) endpoint.
pub struct SplunkTransport {
    /// Base URL of the Splunk instance; `/services/collector/event` is appended.
    url: String,
    /// HEC token, sent as `Authorization: Splunk <token>`.
    token: String,
    /// Optional target index.
    index: Option<String>,
    /// Optional sourcetype; defaults to `mockforge:security` when unset.
    source_type: Option<String>,
    retry: RetryConfig,
    client: reqwest::Client,
}
impl SplunkTransport {
pub fn new(
url: String,
token: String,
index: Option<String>,
source_type: Option<String>,
retry: RetryConfig,
) -> Self {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.expect("Failed to create HTTP client");
Self {
url,
token,
index,
source_type,
retry,
client,
}
}
fn format_event(&self, event: &SecurityEvent) -> Result<serde_json::Value, Error> {
let mut splunk_event = serde_json::json!({
"event": event.to_json()?,
"time": event.timestamp.timestamp(),
});
if let Some(ref index) = self.index {
splunk_event["index"] = serde_json::Value::String(index.clone());
}
if let Some(ref st) = self.source_type {
splunk_event["sourcetype"] = serde_json::Value::String(st.clone());
} else {
splunk_event["sourcetype"] =
serde_json::Value::String("mockforge:security".to_string());
}
Ok(splunk_event)
}
}
#[async_trait]
impl SiemTransport for SplunkTransport {
    /// Posts the HEC envelope to `<url>/services/collector/event`, retrying
    /// with exponential or linear backoff per `self.retry`.
    ///
    /// # Errors
    /// Returns the last error when all `max_attempts + 1` tries fail.
    async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
        let splunk_event = self.format_event(event)?;
        // trim_end_matches avoids a double slash when the base URL has one.
        let url = format!("{}/services/collector/event", self.url.trim_end_matches('/'));
        let mut last_error = None;
        for attempt in 0..=self.retry.max_attempts {
            match self
                .client
                .post(&url)
                .header("Authorization", format!("Splunk {}", self.token))
                .header("Content-Type", "application/json")
                .json(&splunk_event)
                .send()
                .await
            {
                Ok(response) => {
                    if response.status().is_success() {
                        debug!("Sent Splunk event: {}", event.event_type);
                        return Ok(());
                    } else {
                        let status = response.status();
                        // Include the response body for diagnostics.
                        let body = response.text().await.unwrap_or_default();
                        last_error = Some(Error::siem_transport(format!(
                            "Splunk HTTP error {}: {}",
                            status, body
                        )));
                    }
                }
                Err(e) => {
                    last_error =
                        Some(Error::siem_transport(format!("Splunk request failed: {}", e)));
                }
            }
            // Back off before the next attempt (skip after the final one).
            if attempt < self.retry.max_attempts {
                let delay = if self.retry.backoff == "exponential" {
                    self.retry.initial_delay_secs * (2_u64.pow(attempt))
                } else {
                    self.retry.initial_delay_secs * (attempt as u64 + 1)
                };
                tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
            }
        }
        Err(last_error
            .unwrap_or_else(|| Error::siem_transport("Failed to send Splunk event after retries")))
    }
}
/// Sends events to the Datadog Events API (`/api/v1/events`).
pub struct DatadogTransport {
    /// Sent as the `DD-API-KEY` header.
    api_key: String,
    /// Optional application key, sent as `DD-APPLICATION-KEY` when present.
    app_key: Option<String>,
    /// Datadog site domain (e.g. `datadoghq.com`), used to build the API host.
    site: String,
    /// Static tags attached to every event, in addition to generated ones.
    tags: Vec<String>,
    retry: RetryConfig,
    client: reqwest::Client,
}
impl DatadogTransport {
    /// Creates a Datadog transport with a fixed 10-second request timeout.
    ///
    /// # Panics
    /// Panics if the reqwest client cannot be built (unrecoverable startup
    /// error).
    pub fn new(
        api_key: String,
        app_key: Option<String>,
        site: String,
        tags: Vec<String>,
        retry: RetryConfig,
    ) -> Self {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .expect("Failed to create HTTP client");
        Self {
            api_key,
            app_key,
            site,
            tags,
            retry,
            client,
        }
    }
    /// Builds the Datadog event payload: configured tags plus generated
    /// `event_type:` and `severity:` tags, a severity-derived `alert_type`,
    /// and the event JSON as the body text.
    fn format_event(&self, event: &SecurityEvent) -> Result<serde_json::Value, Error> {
        let mut tags = self.tags.clone();
        tags.push(format!("event_type:{}", event.event_type));
        // Single format! call (was a redundant nested format!); the Debug
        // name is lowercased to match the selector syntax used elsewhere.
        let severity_name = format!("{:?}", event.severity).to_lowercase();
        tags.push(format!("severity:{}", severity_name));
        let datadog_event = serde_json::json!({
            "title": format!("MockForge Security Event: {}", event.event_type),
            "text": event.to_json()?,
            // Datadog only distinguishes error/warning/info here, so High
            // maps to "warning" and Medium/Low both map to "info".
            "alert_type": match event.severity {
                crate::security::events::SecurityEventSeverity::Critical => "error",
                crate::security::events::SecurityEventSeverity::High => "warning",
                crate::security::events::SecurityEventSeverity::Medium => "info",
                crate::security::events::SecurityEventSeverity::Low => "info",
            },
            "tags": tags,
            "date_happened": event.timestamp.timestamp(),
        });
        Ok(datadog_event)
    }
}
#[async_trait]
impl SiemTransport for DatadogTransport {
    /// Posts the event to `https://api.<site>/api/v1/events`, retrying with
    /// exponential or linear backoff per `self.retry`.
    ///
    /// # Errors
    /// Returns the last error when all `max_attempts + 1` tries fail.
    async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
        let datadog_event = self.format_event(event)?;
        let url = format!("https://api.{}/api/v1/events", self.site);
        let mut last_error = None;
        for attempt in 0..=self.retry.max_attempts {
            let mut request =
                self.client.post(&url).header("DD-API-KEY", &self.api_key).json(&datadog_event);
            // The application key is optional for the Events API.
            if let Some(ref app_key) = self.app_key {
                request = request.header("DD-APPLICATION-KEY", app_key);
            }
            match request.send().await {
                Ok(response) => {
                    if response.status().is_success() {
                        debug!("Sent Datadog event: {}", event.event_type);
                        return Ok(());
                    } else {
                        let status = response.status();
                        let body = response.text().await.unwrap_or_default();
                        last_error = Some(Error::siem_transport(format!(
                            "Datadog HTTP error {}: {}",
                            status, body
                        )));
                    }
                }
                Err(e) => {
                    last_error =
                        Some(Error::siem_transport(format!("Datadog request failed: {}", e)));
                }
            }
            // Back off before the next attempt (skip after the final one).
            if attempt < self.retry.max_attempts {
                let delay = if self.retry.backoff == "exponential" {
                    self.retry.initial_delay_secs * (2_u64.pow(attempt))
                } else {
                    self.retry.initial_delay_secs * (attempt as u64 + 1)
                };
                tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
            }
        }
        Err(last_error
            .unwrap_or_else(|| Error::siem_transport("Failed to send Datadog event after retries")))
    }
}
/// Sends events to AWS CloudWatch Logs via the `PutLogEvents` HTTP API.
pub struct CloudwatchTransport {
    region: String,
    log_group: String,
    stream: String,
    /// Credential material; keys read at send time: `access_key_id`,
    /// `session_token`.
    credentials: HashMap<String, String>,
    retry: RetryConfig,
    client: reqwest::Client,
}
impl CloudwatchTransport {
    /// Creates a CloudWatch transport with a fixed 10-second request timeout.
    ///
    /// # Panics
    /// Panics if the reqwest client cannot be built (unrecoverable startup
    /// error).
    pub fn new(
        region: String,
        log_group: String,
        stream: String,
        credentials: HashMap<String, String>,
        retry: RetryConfig,
    ) -> Self {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .expect("Failed to create HTTP client");
        Self {
            region,
            log_group,
            stream,
            credentials,
            retry,
            client,
        }
    }
}
#[async_trait]
impl SiemTransport for CloudwatchTransport {
    /// Sends one log event to CloudWatch Logs `PutLogEvents`, retrying with
    /// exponential backoff.
    ///
    /// NOTE(review): the credentials are attached as raw `X-Amz-Access-Key` /
    /// `X-Amz-Security-Token` headers — this is not AWS Signature V4, and the
    /// real CloudWatch Logs endpoint rejects unsigned requests; verify
    /// whether a signing proxy is assumed or a SigV4 signer should be used.
    ///
    /// NOTE(review): unlike the other transports (which try
    /// `max_attempts + 1` times), this loop stops after `max_attempts` total
    /// attempts — confirm whether the off-by-one difference is intentional.
    async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
        let event_json = event.to_json()?;
        // PutLogEvents request body; the event JSON is embedded as a string
        // message. NOTE(review): no sequence token is supplied, which older
        // PutLogEvents API versions required for non-empty streams.
        let log_events = serde_json::json!({
            "logGroupName": self.log_group,
            "logStreamName": self.stream,
            "logEvents": [{
                "timestamp": event.timestamp.timestamp_millis(),
                "message": event_json
            }]
        });
        let url = format!("https://logs.{}.amazonaws.com/", self.region);
        let mut attempt = 0;
        loop {
            let mut req = self
                .client
                .post(&url)
                .header("Content-Type", "application/x-amz-json-1.1")
                .header("X-Amz-Target", "Logs_20140328.PutLogEvents");
            if let Some(access_key) = self.credentials.get("access_key_id") {
                req = req.header("X-Amz-Access-Key", access_key.as_str());
            }
            if let Some(token) = self.credentials.get("session_token") {
                req = req.header("X-Amz-Security-Token", token.as_str());
            }
            let result = req.json(&log_events).send().await;
            match result {
                Ok(resp) if resp.status().is_success() => {
                    debug!(
                        "CloudWatch event sent to log_group={}, stream={}: {}",
                        self.log_group, self.stream, event.event_type
                    );
                    return Ok(());
                }
                // HTTP-level failure: retry with exponential backoff.
                Ok(resp) => {
                    let status = resp.status();
                    let body = resp.text().await.unwrap_or_default();
                    attempt += 1;
                    if attempt >= self.retry.max_attempts as usize {
                        warn!(
                            "CloudWatch transport failed after {} attempts (status={}): {}",
                            attempt, status, body
                        );
                        return Err(Error::siem_transport(format!(
                            "CloudWatch PutLogEvents failed with status {}: {}",
                            status, body
                        )));
                    }
                    // initial_delay * 2^(attempt-1), in milliseconds.
                    let delay = std::time::Duration::from_millis(
                        self.retry.initial_delay_secs * 1000 * 2u64.pow(attempt as u32 - 1),
                    );
                    tokio::time::sleep(delay).await;
                }
                // Connection-level failure: same retry policy.
                Err(e) => {
                    attempt += 1;
                    if attempt >= self.retry.max_attempts as usize {
                        warn!("CloudWatch transport failed after {} attempts: {}", attempt, e);
                        return Err(Error::siem_transport(format!(
                            "CloudWatch PutLogEvents request failed: {}",
                            e
                        )));
                    }
                    let delay = std::time::Duration::from_millis(
                        self.retry.initial_delay_secs * 1000 * 2u64.pow(attempt as u32 - 1),
                    );
                    tokio::time::sleep(delay).await;
                }
            }
        }
    }
}
/// Sends events to Google Cloud Logging via `entries:write`.
pub struct GcpTransport {
    project_id: String,
    log_name: String,
    /// Path to a JSON file; the transport reads its `access_token` field for
    /// bearer auth on each send.
    credentials_path: String,
    retry: RetryConfig,
    client: reqwest::Client,
}
impl GcpTransport {
    /// Creates a GCP Cloud Logging transport with a fixed 10-second request
    /// timeout.
    ///
    /// # Panics
    /// Panics if the reqwest client cannot be built (unrecoverable startup
    /// error).
    pub fn new(
        project_id: String,
        log_name: String,
        credentials_path: String,
        retry: RetryConfig,
    ) -> Self {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .expect("Failed to create HTTP client");
        Self {
            project_id,
            log_name,
            credentials_path,
            retry,
            client,
        }
    }
}
#[async_trait]
impl SiemTransport for GcpTransport {
    /// Writes one entry to Cloud Logging `entries:write`, retrying with
    /// exponential backoff.
    ///
    /// The credentials file is now read with `tokio::fs::read_to_string`
    /// instead of `std::fs::read_to_string`: blocking file I/O inside an
    /// async fn can stall the executor thread. If the file is missing or has
    /// no `access_token` field, the request is sent without bearer auth
    /// (same best-effort behavior as before).
    async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
        let event_json = event.to_json()?;
        let log_entry = serde_json::json!({
            "entries": [{
                "logName": format!("projects/{}/logs/{}", self.project_id, self.log_name),
                "resource": {
                    "type": "global"
                },
                "timestamp": event.timestamp.to_rfc3339(),
                // Prefer a structured payload; fall back to wrapping the raw
                // string if the event JSON is somehow unparseable.
                "jsonPayload": serde_json::from_str::<serde_json::Value>(&event_json)
                    .unwrap_or_else(|_| serde_json::json!({"message": event_json}))
            }]
        });
        let url = "https://logging.googleapis.com/v2/entries:write";
        // Async read; NOTE(review): this expects a pre-minted access token in
        // the credentials file, not a service-account key — confirm the
        // provisioning flow.
        let bearer_token = tokio::fs::read_to_string(&self.credentials_path)
            .await
            .ok()
            .and_then(|contents| {
                serde_json::from_str::<serde_json::Value>(&contents)
                    .ok()
                    .and_then(|v| v.get("access_token").and_then(|t| t.as_str().map(String::from)))
            });
        let mut attempt = 0;
        loop {
            let mut req = self.client.post(url).header("Content-Type", "application/json");
            if let Some(ref token) = bearer_token {
                req = req.bearer_auth(token);
            }
            let result = req.json(&log_entry).send().await;
            match result {
                Ok(resp) if resp.status().is_success() => {
                    debug!(
                        "GCP event sent to project={}, log={}: {}",
                        self.project_id, self.log_name, event.event_type
                    );
                    return Ok(());
                }
                // HTTP-level failure: retry with exponential backoff.
                Ok(resp) => {
                    let status = resp.status();
                    let body = resp.text().await.unwrap_or_default();
                    attempt += 1;
                    if attempt >= self.retry.max_attempts as usize {
                        warn!(
                            "GCP transport failed after {} attempts (status={}): {}",
                            attempt, status, body
                        );
                        return Err(Error::siem_transport(format!(
                            "GCP entries:write failed with status {}: {}",
                            status, body
                        )));
                    }
                    let delay = std::time::Duration::from_millis(
                        self.retry.initial_delay_secs * 1000 * 2u64.pow(attempt as u32 - 1),
                    );
                    tokio::time::sleep(delay).await;
                }
                // Connection-level failure: same retry policy.
                Err(e) => {
                    attempt += 1;
                    if attempt >= self.retry.max_attempts as usize {
                        warn!("GCP transport failed after {} attempts: {}", attempt, e);
                        return Err(Error::siem_transport(format!(
                            "GCP entries:write request failed: {}",
                            e
                        )));
                    }
                    let delay = std::time::Duration::from_millis(
                        self.retry.initial_delay_secs * 1000 * 2u64.pow(attempt as u32 - 1),
                    );
                    tokio::time::sleep(delay).await;
                }
            }
        }
    }
}
/// Sends events to Azure Monitor via the Log Analytics HTTP Data Collector
/// API, authenticating with an HMAC-SHA256 shared-key signature.
pub struct AzureTransport {
    workspace_id: String,
    /// Base64-encoded workspace shared key used as the HMAC key.
    shared_key: String,
    /// Custom log type name, sent in the `Log-Type` header.
    log_type: String,
    retry: RetryConfig,
    client: reqwest::Client,
}
impl AzureTransport {
    /// Creates an Azure Monitor transport with a fixed 10-second request
    /// timeout.
    ///
    /// # Panics
    /// Panics if the reqwest client cannot be built (unrecoverable startup
    /// error).
    pub fn new(
        workspace_id: String,
        shared_key: String,
        log_type: String,
        retry: RetryConfig,
    ) -> Self {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .expect("Failed to create HTTP client");
        Self {
            workspace_id,
            shared_key,
            log_type,
            retry,
            client,
        }
    }
    /// Computes the SharedKey authorization signature for the Log Analytics
    /// HTTP Data Collector API.
    ///
    /// Per the Azure specification the string-to-sign is:
    /// `{method}\n{content_length}\n{content_type}\nx-ms-date:{date}\n{resource}`
    /// where `resource` is the canonical path (`/api/logs`) WITHOUT the query
    /// string. The previous version omitted the literal `x-ms-date:` prefix
    /// and signed the full path including `?api-version=...`, producing a
    /// signature the service rejects.
    ///
    /// # Errors
    /// Returns a transport error when `shared_key` is not valid base64.
    fn generate_signature(
        &self,
        date: &str,
        content_length: usize,
        method: &str,
        content_type: &str,
        resource: &str,
    ) -> Result<String, Error> {
        use hmac::{Hmac, Mac};
        use sha2::Sha256;
        type HmacSha256 = Hmac<Sha256>;
        // Canonicalize: strip any query string from the resource path.
        let canonical_resource = resource.split('?').next().unwrap_or(resource);
        let string_to_sign = format!(
            "{}\n{}\n{}\nx-ms-date:{}\n{}",
            method, content_length, content_type, date, canonical_resource
        );
        let key_bytes = base64::decode(&self.shared_key).map_err(|e| {
            Error::siem_transport(format!("Azure shared_key is not valid base64: {}", e))
        })?;
        let mut mac =
            HmacSha256::new_from_slice(&key_bytes).expect("HMAC can take key of any size");
        mac.update(string_to_sign.as_bytes());
        let result = mac.finalize();
        Ok(base64::encode(result.into_bytes()))
    }
}
#[async_trait]
impl SiemTransport for AzureTransport {
    /// Posts the event to the workspace's Data Collector endpoint, retrying
    /// per `self.retry`.
    ///
    /// Fixes over the previous version:
    /// - The signature was computed over `event_json.len()`, but the request
    ///   body actually sent was the serialized `log_entry` wrapper — the
    ///   Content-Length in the string-to-sign never matched the wire body, so
    ///   the service would reject the signature. The body is now serialized
    ///   first and its real length is signed.
    /// - The loop-invariant `log_entry` payload is built once instead of on
    ///   every retry iteration.
    async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
        let event_json = event.to_json()?;
        let url = format!(
            "https://{}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01",
            self.workspace_id
        );
        let log_entry = serde_json::json!({
            "log_type": self.log_type,
            "time_generated": event.timestamp.to_rfc3339(),
            // Prefer a structured payload; fall back to wrapping the raw
            // string if the event JSON is somehow unparseable.
            "data": serde_json::from_str::<serde_json::Value>(&event_json)
                .unwrap_or_else(|_| serde_json::json!({"message": event_json}))
        });
        // Serialize once: these exact bytes are both signed and sent.
        let body = serde_json::to_string(&log_entry)?;
        // RFC 1123 date required by the x-ms-date header and the signature.
        let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string();
        let content_type = "application/json";
        let method = "POST";
        let resource = "/api/logs?api-version=2016-04-01".to_string();
        let signature =
            self.generate_signature(&date, body.len(), method, content_type, &resource)?;
        let mut last_error = None;
        for attempt in 0..=self.retry.max_attempts {
            match self
                .client
                .post(&url)
                .header("x-ms-date", &date)
                .header("Content-Type", content_type)
                .header("Authorization", format!("SharedKey {}:{}", self.workspace_id, signature))
                .header("Log-Type", &self.log_type)
                .header("time-generated-field", "time_generated")
                .body(body.clone())
                .send()
                .await
            {
                Ok(response) => {
                    if response.status().is_success() {
                        debug!("Sent Azure Monitor event: {}", event.event_type);
                        return Ok(());
                    } else {
                        let status = response.status();
                        let body = response.text().await.unwrap_or_default();
                        last_error = Some(Error::siem_transport(format!(
                            "Azure Monitor HTTP error {}: {}",
                            status, body
                        )));
                    }
                }
                Err(e) => {
                    last_error =
                        Some(Error::siem_transport(format!("Azure Monitor request failed: {}", e)));
                }
            }
            // Back off before the next attempt (skip after the final one).
            if attempt < self.retry.max_attempts {
                let delay = if self.retry.backoff == "exponential" {
                    self.retry.initial_delay_secs * (2_u64.pow(attempt))
                } else {
                    self.retry.initial_delay_secs * (attempt as u64 + 1)
                };
                tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
            }
        }
        Err(last_error.unwrap_or_else(|| {
            Error::siem_transport("Failed to send Azure Monitor event after retries")
        }))
    }
}
/// Health bookkeeping for a single transport, updated on every send.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportHealth {
    /// Positional identifier assigned at construction (`transport_<index>`).
    pub identifier: String,
    /// Whether the most recent send succeeded.
    pub healthy: bool,
    /// Timestamp of the most recent successful send, if any.
    pub last_success: Option<chrono::DateTime<chrono::Utc>>,
    /// Message of the most recent failure; cleared on success.
    pub last_error: Option<String>,
    pub success_count: u64,
    pub failure_count: u64,
}
/// Fans security events out to all configured SIEM transports, applying an
/// optional filter first and tracking per-transport health.
pub struct SiemEmitter {
    transports: Vec<Box<dyn SiemTransport>>,
    filters: Option<EventFilter>,
    /// One health record per transport, indexed positionally.
    health_status: Arc<RwLock<Vec<TransportHealth>>>,
}
impl SiemEmitter {
    /// Builds an emitter from configuration.
    ///
    /// When `config.enabled` is false, an emitter with no transports is
    /// returned and `emit` becomes a filtered no-op. File destinations are
    /// opened eagerly, so construction can fail with an I/O error. The
    /// Splunk/Datadog/CloudWatch/GCP/Azure destinations currently use
    /// `RetryConfig::default()` because their config variants carry no retry
    /// settings.
    pub async fn from_config(config: SiemConfig) -> Result<Self, Error> {
        if !config.enabled {
            return Ok(Self {
                transports: Vec::new(),
                filters: config.filters,
                health_status: Arc::new(RwLock::new(Vec::new())),
            });
        }
        let mut transports: Vec<Box<dyn SiemTransport>> = Vec::new();
        for dest in config.destinations {
            let transport: Box<dyn SiemTransport> = match dest {
                SiemDestination::Syslog {
                    host,
                    port,
                    transport,
                    facility,
                    tag,
                } => Box::new(SyslogTransport::new(host, port, transport, facility, tag)),
                SiemDestination::Http {
                    url,
                    method,
                    headers,
                    timeout,
                    retry,
                } => Box::new(HttpTransport::new(url, method, headers, timeout, retry)),
                SiemDestination::Https {
                    url,
                    method,
                    headers,
                    timeout,
                    retry,
                } => Box::new(HttpTransport::new(url, method, headers, timeout, retry)),
                // `rotation` is currently unused by FileTransport.
                SiemDestination::File { path, format, .. } => {
                    Box::new(FileTransport::new(path, format).await?)
                }
                SiemDestination::Splunk {
                    url,
                    token,
                    index,
                    source_type,
                } => Box::new(SplunkTransport::new(
                    url,
                    token,
                    index,
                    source_type,
                    RetryConfig::default(),
                )),
                SiemDestination::Datadog {
                    api_key,
                    app_key,
                    site,
                    tags,
                } => Box::new(DatadogTransport::new(
                    api_key,
                    app_key,
                    site,
                    tags,
                    RetryConfig::default(),
                )),
                SiemDestination::Cloudwatch {
                    region,
                    log_group,
                    stream,
                    credentials,
                } => Box::new(CloudwatchTransport::new(
                    region,
                    log_group,
                    stream,
                    credentials,
                    RetryConfig::default(),
                )),
                SiemDestination::Gcp {
                    project_id,
                    log_name,
                    credentials_path,
                } => Box::new(GcpTransport::new(
                    project_id,
                    log_name,
                    credentials_path,
                    RetryConfig::default(),
                )),
                SiemDestination::Azure {
                    workspace_id,
                    shared_key,
                    log_type,
                } => Box::new(AzureTransport::new(
                    workspace_id,
                    shared_key,
                    log_type,
                    RetryConfig::default(),
                )),
            };
            transports.push(transport);
        }
        // One health record per transport, identified positionally.
        let health_status = Arc::new(RwLock::new(
            transports
                .iter()
                .enumerate()
                .map(|(i, _)| TransportHealth {
                    identifier: format!("transport_{}", i),
                    healthy: true,
                    last_success: None,
                    last_error: None,
                    success_count: 0,
                    failure_count: 0,
                })
                .collect(),
        ));
        Ok(Self {
            transports,
            filters: config.filters,
            health_status,
        })
    }
    /// Sends `event` to every transport, updating per-transport health.
    ///
    /// Returns an error only when *all* transports fail; partial failure is
    /// logged and treated as success.
    pub async fn emit(&self, event: SecurityEvent) -> Result<(), Error> {
        if let Some(ref filter) = self.filters {
            if !filter.should_include(&event) {
                debug!("Event filtered out: {}", event.event_type);
                return Ok(());
            }
        }
        let mut errors = Vec::new();
        for (idx, transport) in self.transports.iter().enumerate() {
            // Send first, then take the health lock only for the brief
            // bookkeeping update. The previous implementation held the write
            // guard across every transport's network await, blocking
            // health_status()/is_healthy() readers for the whole fan-out.
            let result = transport.send_event(&event).await;
            let mut health_status = self.health_status.write().await;
            match result {
                Ok(()) => {
                    if let Some(health) = health_status.get_mut(idx) {
                        health.healthy = true;
                        health.last_success = Some(chrono::Utc::now());
                        health.success_count += 1;
                        health.last_error = None;
                    }
                }
                Err(e) => {
                    let error_msg = format!("{}", e);
                    error!("Failed to send event to SIEM: {}", error_msg);
                    errors.push(Error::siem_transport(error_msg.clone()));
                    if let Some(health) = health_status.get_mut(idx) {
                        health.healthy = false;
                        health.failure_count += 1;
                        health.last_error = Some(error_msg);
                    }
                }
            }
        }
        if !errors.is_empty() && errors.len() == self.transports.len() {
            return Err(Error::siem_transport(format!(
                "All SIEM transports failed: {} errors",
                errors.len()
            )));
        }
        Ok(())
    }
    /// Snapshot of the per-transport health records.
    pub async fn health_status(&self) -> Vec<TransportHealth> {
        self.health_status.read().await.clone()
    }
    /// True when at least one transport is currently healthy.
    pub async fn is_healthy(&self) -> bool {
        let health_status = self.health_status.read().await;
        health_status.iter().any(|h| h.healthy)
    }
    /// Returns `(total, healthy, unhealthy)` transport counts.
    pub async fn health_summary(&self) -> (usize, usize, usize) {
        let health_status = self.health_status.read().await;
        let total = health_status.len();
        let healthy = health_status.iter().filter(|h| h.healthy).count();
        let unhealthy = total - healthy;
        (total, healthy, unhealthy)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::security::events::{SecurityEvent, SecurityEventType};
// Include patterns: `auth.*` matches auth events and rejects others.
#[test]
fn test_event_filter_include() {
    let filter = EventFilter {
        include: Some(vec!["auth.*".to_string()]),
        exclude: None,
        conditions: None,
    };
    let event = SecurityEvent::new(SecurityEventType::AuthSuccess, None, None);
    assert!(filter.should_include(&event));
    let event = SecurityEvent::new(SecurityEventType::ConfigChanged, None, None);
    assert!(!filter.should_include(&event));
}
// `severity:low` exclusion drops low-severity events but keeps others
// (assumes AuthSuccess defaults to Low and AuthFailure to a higher severity).
#[test]
fn test_event_filter_exclude() {
    let filter = EventFilter {
        include: None,
        exclude: Some(vec!["severity:low".to_string()]),
        conditions: None,
    };
    let event = SecurityEvent::new(SecurityEventType::AuthSuccess, None, None);
    assert!(!filter.should_include(&event));
    let event = SecurityEvent::new(SecurityEventType::AuthFailure, None, None);
    assert!(filter.should_include(&event));
}
// The formatted syslog line starts with a PRI field and carries the tag.
#[tokio::test]
async fn test_syslog_transport_format() {
    let transport = SyslogTransport::new(
        "localhost".to_string(),
        514,
        "udp".to_string(),
        SyslogFacility::Local0,
        "mockforge".to_string(),
    );
    let event = SecurityEvent::new(SecurityEventType::AuthSuccess, None, None);
    let message = transport.format_syslog_message(&event);
    assert!(message.starts_with("<"));
    assert!(message.contains("mockforge"));
}
// Every SiemProtocol variant round-trips through serde JSON.
#[test]
fn test_siem_protocol_serialization() {
    let protocols = vec![
        SiemProtocol::Syslog,
        SiemProtocol::Http,
        SiemProtocol::Https,
        SiemProtocol::File,
        SiemProtocol::Splunk,
        SiemProtocol::Datadog,
        SiemProtocol::Cloudwatch,
        SiemProtocol::Gcp,
        SiemProtocol::Azure,
    ];
    for protocol in protocols {
        let json = serde_json::to_string(&protocol).unwrap();
        assert!(!json.is_empty());
        let deserialized: SiemProtocol = serde_json::from_str(&json).unwrap();
        assert_eq!(protocol, deserialized);
    }
}
#[test]
fn test_syslog_facility_default() {
let facility = SyslogFacility::default();
assert_eq!(facility, SyslogFacility::Local0);
}
#[test]
fn test_syslog_facility_serialization() {
let facilities = vec![
SyslogFacility::Kernel,
SyslogFacility::User,
SyslogFacility::Security,
SyslogFacility::Local0,
SyslogFacility::Local7,
];
for facility in facilities {
let json = serde_json::to_string(&facility).unwrap();
assert!(!json.is_empty());
let deserialized: SyslogFacility = serde_json::from_str(&json).unwrap();
assert_eq!(facility, deserialized);
}
}
#[test]
fn test_syslog_severity_from_security_event_severity() {
use crate::security::events::SecurityEventSeverity;
assert_eq!(SyslogSeverity::from(SecurityEventSeverity::Low), SyslogSeverity::Informational);
assert_eq!(SyslogSeverity::from(SecurityEventSeverity::Medium), SyslogSeverity::Warning);
assert_eq!(SyslogSeverity::from(SecurityEventSeverity::High), SyslogSeverity::Error);
assert_eq!(SyslogSeverity::from(SecurityEventSeverity::Critical), SyslogSeverity::Critical);
}
#[test]
fn test_retry_config_default() {
let config = RetryConfig::default();
assert_eq!(config.max_attempts, 3);
assert_eq!(config.backoff, "exponential");
assert_eq!(config.initial_delay_secs, 1);
}
#[test]
fn test_retry_config_serialization() {
let config = RetryConfig {
max_attempts: 5,
backoff: "linear".to_string(),
initial_delay_secs: 2,
};
let json = serde_json::to_string(&config).unwrap();
assert!(json.contains("max_attempts"));
assert!(json.contains("linear"));
}
#[test]
fn test_file_rotation_config_serialization() {
let config = FileRotationConfig {
max_size: "100MB".to_string(),
max_files: 10,
compress: true,
};
let json = serde_json::to_string(&config).unwrap();
assert!(json.contains("100MB"));
assert!(json.contains("max_files"));
}
#[test]
fn test_siem_config_default() {
let config = SiemConfig::default();
assert!(!config.enabled);
assert!(config.protocol.is_none());
assert!(config.destinations.is_empty());
assert!(config.filters.is_none());
}
#[test]
fn test_siem_config_serialization() {
let config = SiemConfig {
enabled: true,
protocol: Some(SiemProtocol::Syslog),
destinations: vec![],
filters: None,
};
let json = serde_json::to_string(&config).unwrap();
assert!(json.contains("enabled"));
assert!(json.contains("syslog"));
}
#[test]
fn test_transport_health_creation() {
let health = TransportHealth {
identifier: "test_transport".to_string(),
healthy: true,
last_success: Some(chrono::Utc::now()),
last_error: None,
success_count: 100,
failure_count: 0,
};
assert_eq!(health.identifier, "test_transport");
assert!(health.healthy);
assert_eq!(health.success_count, 100);
assert_eq!(health.failure_count, 0);
}
#[test]
fn test_transport_health_serialization() {
let health = TransportHealth {
identifier: "transport_1".to_string(),
healthy: false,
last_success: None,
last_error: Some("Connection failed".to_string()),
success_count: 50,
failure_count: 5,
};
let json = serde_json::to_string(&health).unwrap();
assert!(json.contains("transport_1"));
assert!(json.contains("Connection failed"));
}
#[test]
fn test_syslog_transport_new() {
let transport = SyslogTransport::new(
"example.com".to_string(),
514,
"tcp".to_string(),
SyslogFacility::Security,
"app".to_string(),
);
let _ = transport;
}
#[test]
fn test_http_transport_new() {
let mut headers = HashMap::new();
headers.insert("X-Custom-Header".to_string(), "value".to_string());
let transport = HttpTransport::new(
"https://example.com/webhook".to_string(),
"POST".to_string(),
headers,
10,
RetryConfig::default(),
);
let _ = transport;
}
#[test]
fn test_splunk_transport_new() {
let transport = SplunkTransport::new(
"https://splunk.example.com:8088".to_string(),
"token123".to_string(),
Some("index1".to_string()),
Some("json".to_string()),
RetryConfig::default(),
);
let _ = transport;
}
#[test]
fn test_datadog_transport_new() {
let transport = DatadogTransport::new(
"api_key_123".to_string(),
Some("app_key_456".to_string()),
"us".to_string(),
vec!["env:test".to_string()],
RetryConfig::default(),
);
let _ = transport;
}
#[test]
fn test_cloudwatch_transport_new() {
let mut credentials = HashMap::new();
credentials.insert("access_key".to_string(), "key123".to_string());
credentials.insert("secret_key".to_string(), "secret123".to_string());
let transport = CloudwatchTransport::new(
"us-east-1".to_string(),
"log-group-name".to_string(),
"log-stream-name".to_string(),
credentials,
RetryConfig::default(),
);
let _ = transport;
}
#[test]
fn test_gcp_transport_new() {
let transport = GcpTransport::new(
"project-id".to_string(),
"log-name".to_string(),
"/path/to/credentials.json".to_string(),
RetryConfig::default(),
);
let _ = transport;
}
#[test]
fn test_azure_transport_new() {
let transport = AzureTransport::new(
"workspace-id".to_string(),
"shared-key".to_string(),
"CustomLog".to_string(),
RetryConfig::default(),
);
let _ = transport;
}
}