1use crate::security::events::SecurityEvent;
8use crate::Error;
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::path::PathBuf;
13use std::sync::Arc;
14use tokio::fs::{File, OpenOptions};
15use tokio::io::{AsyncWriteExt, BufWriter};
16use tokio::sync::RwLock;
17use tracing::{debug, error, warn};
18
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
22#[serde(rename_all = "lowercase")]
23pub enum SiemProtocol {
24 Syslog,
26 Http,
28 Https,
30 File,
32 Splunk,
34 Datadog,
36 Cloudwatch,
38 Gcp,
40 Azure,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
46#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
47#[serde(rename_all = "lowercase")]
48#[derive(Default)]
49pub enum SyslogFacility {
50 Kernel = 0,
52 User = 1,
54 Mail = 2,
56 Daemon = 3,
58 Security = 4,
60 Syslogd = 5,
62 LinePrinter = 6,
64 NetworkNews = 7,
66 Uucp = 8,
68 Clock = 9,
70 Security2 = 10,
72 Ftp = 11,
74 Ntp = 12,
76 LogAudit = 13,
78 LogAlert = 14,
80 #[default]
82 Local0 = 16,
83 Local1 = 17,
85 Local2 = 18,
87 Local3 = 19,
89 Local4 = 20,
91 Local5 = 21,
93 Local6 = 22,
95 Local7 = 23,
97}
98
/// Syslog severity levels; the discriminant is the numeric severity
/// (0 = most severe, 7 = least severe) used when computing a priority value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyslogSeverity {
    /// Severity 0.
    Emergency = 0,
    /// Severity 1.
    Alert = 1,
    /// Severity 2.
    Critical = 2,
    /// Severity 3.
    Error = 3,
    /// Severity 4.
    Warning = 4,
    /// Severity 5.
    Notice = 5,
    /// Severity 6.
    Informational = 6,
    /// Severity 7.
    Debug = 7,
}
119
120impl From<crate::security::events::SecurityEventSeverity> for SyslogSeverity {
121 fn from(severity: crate::security::events::SecurityEventSeverity) -> Self {
122 match severity {
123 crate::security::events::SecurityEventSeverity::Low => SyslogSeverity::Informational,
124 crate::security::events::SecurityEventSeverity::Medium => SyslogSeverity::Warning,
125 crate::security::events::SecurityEventSeverity::High => SyslogSeverity::Error,
126 crate::security::events::SecurityEventSeverity::Critical => SyslogSeverity::Critical,
127 }
128 }
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
133#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
134pub struct RetryConfig {
135 pub max_attempts: u32,
137 #[serde(default = "default_backoff")]
139 pub backoff: String,
140 #[serde(default = "default_initial_delay")]
142 pub initial_delay_secs: u64,
143}
144
/// Serde default for the retry backoff strategy: `"exponential"`.
fn default_backoff() -> String {
    String::from("exponential")
}
148
/// Serde default for the initial retry delay, in seconds.
fn default_initial_delay() -> u64 {
    const ONE_SECOND: u64 = 1;
    ONE_SECOND
}
152
153impl Default for RetryConfig {
154 fn default() -> Self {
155 Self {
156 max_attempts: 3,
157 backoff: "exponential".to_string(),
158 initial_delay_secs: 1,
159 }
160 }
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
165#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
166pub struct FileRotationConfig {
167 pub max_size: String,
169 pub max_files: u32,
171 #[serde(default)]
173 pub compress: bool,
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize)]
178#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
179pub struct EventFilter {
180 pub include: Option<Vec<String>>,
182 pub exclude: Option<Vec<String>>,
184 pub conditions: Option<Vec<String>>,
186}
187
188impl EventFilter {
189 pub fn should_include(&self, event: &SecurityEvent) -> bool {
191 if let Some(ref includes) = self.include {
193 let mut matched = false;
194 for pattern in includes {
195 if self.matches_pattern(&event.event_type, pattern) {
196 matched = true;
197 break;
198 }
199 }
200 if !matched {
201 return false;
202 }
203 }
204
205 if let Some(ref excludes) = self.exclude {
207 for pattern in excludes {
208 if pattern.starts_with("severity:") {
209 let severity_str = pattern.strip_prefix("severity:").unwrap_or("");
210 if severity_str == "low"
211 && event.severity == crate::security::events::SecurityEventSeverity::Low
212 {
213 return false;
214 }
215 if severity_str == "medium"
216 && event.severity == crate::security::events::SecurityEventSeverity::Medium
217 {
218 return false;
219 }
220 if severity_str == "high"
221 && event.severity == crate::security::events::SecurityEventSeverity::High
222 {
223 return false;
224 }
225 if severity_str == "critical"
226 && event.severity
227 == crate::security::events::SecurityEventSeverity::Critical
228 {
229 return false;
230 }
231 } else if self.matches_pattern(&event.event_type, pattern) {
232 return false;
233 }
234 }
235 }
236
237 true
238 }
239
240 fn matches_pattern(&self, event_type: &str, pattern: &str) -> bool {
241 if pattern.ends_with(".*") {
243 let prefix = pattern.strip_suffix(".*").unwrap_or("");
244 event_type.starts_with(prefix)
245 } else {
246 event_type == pattern
247 }
248 }
249}
250
251#[derive(Debug, Clone, Serialize, Deserialize)]
253#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
254#[serde(tag = "protocol")]
255pub enum SiemDestination {
256 #[serde(rename = "syslog")]
258 Syslog {
259 host: String,
261 port: u16,
263 #[serde(default = "default_syslog_protocol", rename = "transport")]
265 transport: String,
266 #[serde(default)]
268 facility: SyslogFacility,
269 #[serde(default = "default_tag")]
271 tag: String,
272 },
273 #[serde(rename = "http")]
275 Http {
276 url: String,
278 #[serde(default = "default_http_method")]
280 method: String,
281 #[serde(default)]
283 headers: HashMap<String, String>,
284 #[serde(default = "default_timeout")]
286 timeout: u64,
287 #[serde(default)]
289 retry: RetryConfig,
290 },
291 #[serde(rename = "https")]
293 Https {
294 url: String,
296 #[serde(default = "default_http_method")]
298 method: String,
299 #[serde(default)]
301 headers: HashMap<String, String>,
302 #[serde(default = "default_timeout")]
304 timeout: u64,
305 #[serde(default)]
307 retry: RetryConfig,
308 },
309 #[serde(rename = "file")]
311 File {
312 path: String,
314 #[serde(default = "default_file_format")]
316 format: String,
317 rotation: Option<FileRotationConfig>,
319 },
320 #[serde(rename = "splunk")]
322 Splunk {
323 url: String,
325 token: String,
327 index: Option<String>,
329 source_type: Option<String>,
331 },
332 #[serde(rename = "datadog")]
334 Datadog {
335 api_key: String,
337 app_key: Option<String>,
339 #[serde(default = "default_datadog_site")]
341 site: String,
342 #[serde(default)]
344 tags: Vec<String>,
345 },
346 #[serde(rename = "cloudwatch")]
348 Cloudwatch {
349 region: String,
351 log_group: String,
353 stream: String,
355 credentials: HashMap<String, String>,
357 },
358 #[serde(rename = "gcp")]
360 Gcp {
361 project_id: String,
363 log_name: String,
365 credentials_path: String,
367 },
368 #[serde(rename = "azure")]
370 Azure {
371 workspace_id: String,
373 shared_key: String,
375 log_type: String,
377 },
378}
379
/// Serde default for the syslog transport name: `"udp"`.
fn default_syslog_protocol() -> String {
    String::from("udp")
}
383
/// Serde default for the syslog application tag: `"mockforge"`.
fn default_tag() -> String {
    String::from("mockforge")
}
387
/// Serde default for the HTTP method: `"POST"`.
fn default_http_method() -> String {
    String::from("POST")
}
391
/// Serde default for the HTTP request timeout, in seconds.
fn default_timeout() -> u64 {
    const FIVE_SECONDS: u64 = 5;
    FIVE_SECONDS
}
395
/// Serde default for the file output format: `"jsonl"`.
fn default_file_format() -> String {
    String::from("jsonl")
}
399
/// Serde default for the Datadog site domain: `"datadoghq.com"`.
fn default_datadog_site() -> String {
    String::from("datadoghq.com")
}
403
404#[derive(Debug, Clone, Serialize, Deserialize)]
406#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
407#[derive(Default)]
408pub struct SiemConfig {
409 pub enabled: bool,
411 pub protocol: Option<SiemProtocol>,
413 pub destinations: Vec<SiemDestination>,
415 pub filters: Option<EventFilter>,
417}
418
419#[async_trait]
421pub trait SiemTransport: Send + Sync {
422 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error>;
424}
425
426pub struct SyslogTransport {
428 host: String,
429 port: u16,
430 use_tcp: bool,
431 facility: SyslogFacility,
432 tag: String,
433}
434
435impl SyslogTransport {
436 pub fn new(
445 host: String,
446 port: u16,
447 protocol: String,
448 facility: SyslogFacility,
449 tag: String,
450 ) -> Self {
451 Self {
452 host,
453 port,
454 use_tcp: protocol == "tcp",
455 facility,
456 tag,
457 }
458 }
459
460 fn format_syslog_message(&self, event: &SecurityEvent) -> String {
462 let severity: SyslogSeverity = event.severity.into();
463 let priority = (self.facility as u8) * 8 + severity as u8;
464 let timestamp = event.timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ");
465 let hostname = "mockforge"; let app_name = &self.tag;
467 let proc_id = "-";
468 let msg_id = "-";
469 let structured_data = "-"; let msg = event.to_json().unwrap_or_else(|_| "{}".to_string());
471
472 format!(
473 "<{}>1 {} {} {} {} {} {} {}",
474 priority, timestamp, hostname, app_name, proc_id, msg_id, structured_data, msg
475 )
476 }
477}
478
479#[async_trait]
480impl SiemTransport for SyslogTransport {
481 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
482 let message = self.format_syslog_message(event);
483
484 if self.use_tcp {
485 use tokio::net::TcpStream;
487 let addr = format!("{}:{}", self.host, self.port);
488 let mut stream = TcpStream::connect(&addr).await.map_err(|e| {
489 Error::Generic(format!("Failed to connect to syslog server: {}", e))
490 })?;
491 stream
492 .write_all(message.as_bytes())
493 .await
494 .map_err(|e| Error::Generic(format!("Failed to send syslog message: {}", e)))?;
495 } else {
496 use tokio::net::UdpSocket;
498 let socket = UdpSocket::bind("0.0.0.0:0")
499 .await
500 .map_err(|e| Error::Generic(format!("Failed to bind UDP socket: {}", e)))?;
501 let addr = format!("{}:{}", self.host, self.port);
502 socket
503 .send_to(message.as_bytes(), &addr)
504 .await
505 .map_err(|e| Error::Generic(format!("Failed to send UDP syslog message: {}", e)))?;
506 }
507
508 debug!("Sent syslog event: {}", event.event_type);
509 Ok(())
510 }
511}
512
513pub struct HttpTransport {
515 url: String,
516 method: String,
517 headers: HashMap<String, String>,
518 retry: RetryConfig,
519 client: reqwest::Client,
520}
521
522impl HttpTransport {
523 pub fn new(
532 url: String,
533 method: String,
534 headers: HashMap<String, String>,
535 timeout: u64,
536 retry: RetryConfig,
537 ) -> Self {
538 let client = reqwest::Client::builder()
539 .timeout(std::time::Duration::from_secs(timeout))
540 .build()
541 .expect("Failed to create HTTP client");
542
543 Self {
544 url,
545 method,
546 headers,
547 retry,
548 client,
549 }
550 }
551}
552
553#[async_trait]
554impl SiemTransport for HttpTransport {
555 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
556 let event_json = event.to_json()?;
557 let mut request = match self.method.as_str() {
558 "POST" => self.client.post(&self.url),
559 "PUT" => self.client.put(&self.url),
560 "PATCH" => self.client.patch(&self.url),
561 _ => return Err(Error::Generic(format!("Unsupported HTTP method: {}", self.method))),
562 };
563
564 for (key, value) in &self.headers {
566 request = request.header(key, value);
567 }
568
569 if !self.headers.contains_key("Content-Type") {
571 request = request.header("Content-Type", "application/json");
572 }
573
574 request = request.body(event_json);
575
576 let mut last_error = None;
578 for attempt in 0..=self.retry.max_attempts {
579 match request.try_clone() {
580 Some(req) => match req.send().await {
581 Ok(response) => {
582 if response.status().is_success() {
583 debug!("Sent HTTP event to {}: {}", self.url, event.event_type);
584 return Ok(());
585 } else {
586 let status = response.status();
587 last_error = Some(Error::Generic(format!("HTTP error: {}", status)));
588 }
589 }
590 Err(e) => {
591 last_error = Some(Error::Generic(format!("HTTP request failed: {}", e)));
592 }
593 },
594 None => {
595 let event_json = event.to_json()?;
597 let mut req = match self.method.as_str() {
598 "POST" => self.client.post(&self.url),
599 "PUT" => self.client.put(&self.url),
600 "PATCH" => self.client.patch(&self.url),
601 _ => break,
602 };
603 for (key, value) in &self.headers {
604 req = req.header(key, value);
605 }
606 if !self.headers.contains_key("Content-Type") {
607 req = req.header("Content-Type", "application/json");
608 }
609 req = req.body(event_json);
610 request = req;
611 continue;
612 }
613 }
614
615 if attempt < self.retry.max_attempts {
616 let delay = if self.retry.backoff == "exponential" {
618 self.retry.initial_delay_secs * (2_u64.pow(attempt))
619 } else {
620 self.retry.initial_delay_secs * (attempt as u64 + 1)
621 };
622 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
623 }
624 }
625
626 Err(last_error.unwrap_or_else(|| {
627 Error::Generic("Failed to send HTTP event after retries".to_string())
628 }))
629 }
630}
631
632pub struct FileTransport {
634 path: PathBuf,
635 format: String,
636 writer: Arc<RwLock<Option<BufWriter<File>>>>,
637}
638
639impl FileTransport {
640 pub async fn new(path: String, format: String) -> Result<Self, Error> {
649 let path = PathBuf::from(path);
650
651 if let Some(parent) = path.parent() {
653 tokio::fs::create_dir_all(parent)
654 .await
655 .map_err(|e| Error::Generic(format!("Failed to create directory: {}", e)))?;
656 }
657
658 let file = OpenOptions::new()
660 .create(true)
661 .append(true)
662 .open(&path)
663 .await
664 .map_err(|e| Error::Generic(format!("Failed to open file: {}", e)))?;
665
666 let writer = Arc::new(RwLock::new(Some(BufWriter::new(file))));
667
668 Ok(Self {
669 path,
670 format,
671 writer,
672 })
673 }
674}
675
676#[async_trait]
677impl SiemTransport for FileTransport {
678 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
679 let mut writer_guard = self.writer.write().await;
680
681 if let Some(ref mut writer) = *writer_guard {
682 let line = if self.format == "jsonl" {
683 format!("{}\n", event.to_json()?)
684 } else {
685 format!("{}\n", event.to_json()?)
687 };
688
689 writer
690 .write_all(line.as_bytes())
691 .await
692 .map_err(|e| Error::Generic(format!("Failed to write to file: {}", e)))?;
693
694 writer
695 .flush()
696 .await
697 .map_err(|e| Error::Generic(format!("Failed to flush file: {}", e)))?;
698
699 debug!("Wrote event to file {}: {}", self.path.display(), event.event_type);
700 Ok(())
701 } else {
702 Err(Error::Generic("File writer not initialized".to_string()))
703 }
704 }
705}
706
707pub struct SplunkTransport {
709 url: String,
710 token: String,
711 index: Option<String>,
712 source_type: Option<String>,
713 retry: RetryConfig,
714 client: reqwest::Client,
715}
716
717impl SplunkTransport {
718 pub fn new(
720 url: String,
721 token: String,
722 index: Option<String>,
723 source_type: Option<String>,
724 retry: RetryConfig,
725 ) -> Self {
726 let client = reqwest::Client::builder()
727 .timeout(std::time::Duration::from_secs(10))
728 .build()
729 .expect("Failed to create HTTP client");
730
731 Self {
732 url,
733 token,
734 index,
735 source_type,
736 retry,
737 client,
738 }
739 }
740
741 fn format_event(&self, event: &SecurityEvent) -> Result<serde_json::Value, Error> {
743 let mut splunk_event = serde_json::json!({
744 "event": event.to_json()?,
745 "time": event.timestamp.timestamp(),
746 });
747
748 if let Some(ref index) = self.index {
749 splunk_event["index"] = serde_json::Value::String(index.clone());
750 }
751
752 if let Some(ref st) = self.source_type {
753 splunk_event["sourcetype"] = serde_json::Value::String(st.clone());
754 } else {
755 splunk_event["sourcetype"] =
756 serde_json::Value::String("mockforge:security".to_string());
757 }
758
759 Ok(splunk_event)
760 }
761}
762
763#[async_trait]
764impl SiemTransport for SplunkTransport {
765 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
766 let splunk_event = self.format_event(event)?;
767 let url = format!("{}/services/collector/event", self.url.trim_end_matches('/'));
768
769 let mut last_error = None;
770 for attempt in 0..=self.retry.max_attempts {
771 match self
772 .client
773 .post(&url)
774 .header("Authorization", format!("Splunk {}", self.token))
775 .header("Content-Type", "application/json")
776 .json(&splunk_event)
777 .send()
778 .await
779 {
780 Ok(response) => {
781 if response.status().is_success() {
782 debug!("Sent Splunk event: {}", event.event_type);
783 return Ok(());
784 } else {
785 let status = response.status();
786 let body = response.text().await.unwrap_or_default();
787 last_error =
788 Some(Error::Generic(format!("Splunk HTTP error {}: {}", status, body)));
789 }
790 }
791 Err(e) => {
792 last_error = Some(Error::Generic(format!("Splunk request failed: {}", e)));
793 }
794 }
795
796 if attempt < self.retry.max_attempts {
797 let delay = if self.retry.backoff == "exponential" {
798 self.retry.initial_delay_secs * (2_u64.pow(attempt))
799 } else {
800 self.retry.initial_delay_secs * (attempt as u64 + 1)
801 };
802 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
803 }
804 }
805
806 Err(last_error.unwrap_or_else(|| {
807 Error::Generic("Failed to send Splunk event after retries".to_string())
808 }))
809 }
810}
811
812pub struct DatadogTransport {
814 api_key: String,
815 app_key: Option<String>,
816 site: String,
817 tags: Vec<String>,
818 retry: RetryConfig,
819 client: reqwest::Client,
820}
821
822impl DatadogTransport {
823 pub fn new(
825 api_key: String,
826 app_key: Option<String>,
827 site: String,
828 tags: Vec<String>,
829 retry: RetryConfig,
830 ) -> Self {
831 let client = reqwest::Client::builder()
832 .timeout(std::time::Duration::from_secs(10))
833 .build()
834 .expect("Failed to create HTTP client");
835
836 Self {
837 api_key,
838 app_key,
839 site,
840 tags,
841 retry,
842 client,
843 }
844 }
845
846 fn format_event(&self, event: &SecurityEvent) -> Result<serde_json::Value, Error> {
848 let mut tags = self.tags.clone();
849 tags.push(format!("event_type:{}", event.event_type));
850 tags.push(format!("severity:{}", format!("{:?}", event.severity).to_lowercase()));
851
852 let datadog_event = serde_json::json!({
853 "title": format!("MockForge Security Event: {}", event.event_type),
854 "text": event.to_json()?,
855 "alert_type": match event.severity {
856 crate::security::events::SecurityEventSeverity::Critical => "error",
857 crate::security::events::SecurityEventSeverity::High => "warning",
858 crate::security::events::SecurityEventSeverity::Medium => "info",
859 crate::security::events::SecurityEventSeverity::Low => "info",
860 },
861 "tags": tags,
862 "date_happened": event.timestamp.timestamp(),
863 });
864
865 Ok(datadog_event)
866 }
867}
868
869#[async_trait]
870impl SiemTransport for DatadogTransport {
871 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
872 let datadog_event = self.format_event(event)?;
873 let url = format!("https://api.{}/api/v1/events", self.site);
874
875 let mut last_error = None;
876 for attempt in 0..=self.retry.max_attempts {
877 let mut request =
878 self.client.post(&url).header("DD-API-KEY", &self.api_key).json(&datadog_event);
879
880 if let Some(ref app_key) = self.app_key {
881 request = request.header("DD-APPLICATION-KEY", app_key);
882 }
883
884 match request.send().await {
885 Ok(response) => {
886 if response.status().is_success() {
887 debug!("Sent Datadog event: {}", event.event_type);
888 return Ok(());
889 } else {
890 let status = response.status();
891 let body = response.text().await.unwrap_or_default();
892 last_error = Some(Error::Generic(format!(
893 "Datadog HTTP error {}: {}",
894 status, body
895 )));
896 }
897 }
898 Err(e) => {
899 last_error = Some(Error::Generic(format!("Datadog request failed: {}", e)));
900 }
901 }
902
903 if attempt < self.retry.max_attempts {
904 let delay = if self.retry.backoff == "exponential" {
905 self.retry.initial_delay_secs * (2_u64.pow(attempt))
906 } else {
907 self.retry.initial_delay_secs * (attempt as u64 + 1)
908 };
909 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
910 }
911 }
912
913 Err(last_error.unwrap_or_else(|| {
914 Error::Generic("Failed to send Datadog event after retries".to_string())
915 }))
916 }
917}
918
919pub struct CloudwatchTransport {
921 region: String,
922 log_group: String,
923 stream: String,
924 credentials: HashMap<String, String>,
925 retry: RetryConfig,
926 client: reqwest::Client,
927}
928
929impl CloudwatchTransport {
930 pub fn new(
932 region: String,
933 log_group: String,
934 stream: String,
935 credentials: HashMap<String, String>,
936 retry: RetryConfig,
937 ) -> Self {
938 let client = reqwest::Client::builder()
939 .timeout(std::time::Duration::from_secs(10))
940 .build()
941 .expect("Failed to create HTTP client");
942
943 Self {
944 region,
945 log_group,
946 stream,
947 credentials,
948 retry,
949 client,
950 }
951 }
952}
953
954#[async_trait]
955impl SiemTransport for CloudwatchTransport {
956 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
957 let event_json = event.to_json()?;
958 let log_events = serde_json::json!({
959 "logGroupName": self.log_group,
960 "logStreamName": self.stream,
961 "logEvents": [{
962 "timestamp": event.timestamp.timestamp_millis(),
963 "message": event_json
964 }]
965 });
966
967 let url = format!("https://logs.{}.amazonaws.com/", self.region);
968
969 let mut attempt = 0;
970 loop {
971 let mut req = self
972 .client
973 .post(&url)
974 .header("Content-Type", "application/x-amz-json-1.1")
975 .header("X-Amz-Target", "Logs_20140328.PutLogEvents");
976
977 if let Some(access_key) = self.credentials.get("access_key_id") {
979 req = req.header("X-Amz-Access-Key", access_key.as_str());
980 }
981 if let Some(token) = self.credentials.get("session_token") {
982 req = req.header("X-Amz-Security-Token", token.as_str());
983 }
984
985 let result = req.json(&log_events).send().await;
986
987 match result {
988 Ok(resp) if resp.status().is_success() => {
989 debug!(
990 "CloudWatch event sent to log_group={}, stream={}: {}",
991 self.log_group, self.stream, event.event_type
992 );
993 return Ok(());
994 }
995 Ok(resp) => {
996 let status = resp.status();
997 let body = resp.text().await.unwrap_or_default();
998 attempt += 1;
999 if attempt >= self.retry.max_attempts as usize {
1000 warn!(
1001 "CloudWatch transport failed after {} attempts (status={}): {}",
1002 attempt, status, body
1003 );
1004 return Err(Error::generic(format!(
1005 "CloudWatch PutLogEvents failed with status {}: {}",
1006 status, body
1007 )));
1008 }
1009 let delay = std::time::Duration::from_millis(
1010 self.retry.initial_delay_secs * 1000 * 2u64.pow(attempt as u32 - 1),
1011 );
1012 tokio::time::sleep(delay).await;
1013 }
1014 Err(e) => {
1015 attempt += 1;
1016 if attempt >= self.retry.max_attempts as usize {
1017 warn!("CloudWatch transport failed after {} attempts: {}", attempt, e);
1018 return Err(Error::generic(format!(
1019 "CloudWatch PutLogEvents request failed: {}",
1020 e
1021 )));
1022 }
1023 let delay = std::time::Duration::from_millis(
1024 self.retry.initial_delay_secs * 1000 * 2u64.pow(attempt as u32 - 1),
1025 );
1026 tokio::time::sleep(delay).await;
1027 }
1028 }
1029 }
1030 }
1031}
1032
1033pub struct GcpTransport {
1035 project_id: String,
1036 log_name: String,
1037 credentials_path: String,
1038 retry: RetryConfig,
1039 client: reqwest::Client,
1040}
1041
1042impl GcpTransport {
1043 pub fn new(
1045 project_id: String,
1046 log_name: String,
1047 credentials_path: String,
1048 retry: RetryConfig,
1049 ) -> Self {
1050 let client = reqwest::Client::builder()
1051 .timeout(std::time::Duration::from_secs(10))
1052 .build()
1053 .expect("Failed to create HTTP client");
1054
1055 Self {
1056 project_id,
1057 log_name,
1058 credentials_path,
1059 retry,
1060 client,
1061 }
1062 }
1063}
1064
1065#[async_trait]
1066impl SiemTransport for GcpTransport {
1067 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
1068 let event_json = event.to_json()?;
1069 let log_entry = serde_json::json!({
1070 "entries": [{
1071 "logName": format!("projects/{}/logs/{}", self.project_id, self.log_name),
1072 "resource": {
1073 "type": "global"
1074 },
1075 "timestamp": event.timestamp.to_rfc3339(),
1076 "jsonPayload": serde_json::from_str::<serde_json::Value>(&event_json)
1077 .unwrap_or_else(|_| serde_json::json!({"message": event_json}))
1078 }]
1079 });
1080
1081 let url = "https://logging.googleapis.com/v2/entries:write";
1082
1083 let bearer_token =
1085 std::fs::read_to_string(&self.credentials_path).ok().and_then(|contents| {
1086 serde_json::from_str::<serde_json::Value>(&contents)
1087 .ok()
1088 .and_then(|v| v.get("access_token").and_then(|t| t.as_str().map(String::from)))
1089 });
1090
1091 let mut attempt = 0;
1092 loop {
1093 let mut req = self.client.post(url).header("Content-Type", "application/json");
1094
1095 if let Some(ref token) = bearer_token {
1096 req = req.bearer_auth(token);
1097 }
1098
1099 let result = req.json(&log_entry).send().await;
1100
1101 match result {
1102 Ok(resp) if resp.status().is_success() => {
1103 debug!(
1104 "GCP event sent to project={}, log={}: {}",
1105 self.project_id, self.log_name, event.event_type
1106 );
1107 return Ok(());
1108 }
1109 Ok(resp) => {
1110 let status = resp.status();
1111 let body = resp.text().await.unwrap_or_default();
1112 attempt += 1;
1113 if attempt >= self.retry.max_attempts as usize {
1114 warn!(
1115 "GCP transport failed after {} attempts (status={}): {}",
1116 attempt, status, body
1117 );
1118 return Err(Error::generic(format!(
1119 "GCP entries:write failed with status {}: {}",
1120 status, body
1121 )));
1122 }
1123 let delay = std::time::Duration::from_millis(
1124 self.retry.initial_delay_secs * 1000 * 2u64.pow(attempt as u32 - 1),
1125 );
1126 tokio::time::sleep(delay).await;
1127 }
1128 Err(e) => {
1129 attempt += 1;
1130 if attempt >= self.retry.max_attempts as usize {
1131 warn!("GCP transport failed after {} attempts: {}", attempt, e);
1132 return Err(Error::generic(format!(
1133 "GCP entries:write request failed: {}",
1134 e
1135 )));
1136 }
1137 let delay = std::time::Duration::from_millis(
1138 self.retry.initial_delay_secs * 1000 * 2u64.pow(attempt as u32 - 1),
1139 );
1140 tokio::time::sleep(delay).await;
1141 }
1142 }
1143 }
1144 }
1145}
1146
1147pub struct AzureTransport {
1149 workspace_id: String,
1150 shared_key: String,
1151 log_type: String,
1152 retry: RetryConfig,
1153 client: reqwest::Client,
1154}
1155
1156impl AzureTransport {
1157 pub fn new(
1159 workspace_id: String,
1160 shared_key: String,
1161 log_type: String,
1162 retry: RetryConfig,
1163 ) -> Self {
1164 let client = reqwest::Client::builder()
1165 .timeout(std::time::Duration::from_secs(10))
1166 .build()
1167 .expect("Failed to create HTTP client");
1168
1169 Self {
1170 workspace_id,
1171 shared_key,
1172 log_type,
1173 retry,
1174 client,
1175 }
1176 }
1177
1178 fn generate_signature(
1180 &self,
1181 date: &str,
1182 content_length: usize,
1183 method: &str,
1184 content_type: &str,
1185 resource: &str,
1186 ) -> Result<String, Error> {
1187 use hmac::{Hmac, Mac};
1188 use sha2::Sha256;
1189
1190 type HmacSha256 = Hmac<Sha256>;
1191
1192 let string_to_sign =
1193 format!("{}\n{}\n{}\n{}\n{}", method, content_length, content_type, date, resource);
1194
1195 let key_bytes = base64::decode(&self.shared_key)
1196 .map_err(|e| Error::generic(format!("Azure shared_key is not valid base64: {}", e)))?;
1197
1198 let mut mac =
1199 HmacSha256::new_from_slice(&key_bytes).expect("HMAC can take key of any size");
1200
1201 mac.update(string_to_sign.as_bytes());
1202 let result = mac.finalize();
1203 Ok(base64::encode(result.into_bytes()))
1204 }
1205}
1206
1207#[async_trait]
1208impl SiemTransport for AzureTransport {
1209 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
1210 let event_json = event.to_json()?;
1211 let url = format!(
1212 "https://{}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01",
1213 self.workspace_id
1214 );
1215
1216 let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string();
1217 let content_type = "application/json";
1218 let content_length = event_json.len();
1219 let method = "POST";
1220 let resource = "/api/logs?api-version=2016-04-01".to_string();
1221
1222 let signature =
1223 self.generate_signature(&date, content_length, method, content_type, &resource)?;
1224
1225 let mut last_error = None;
1226 for attempt in 0..=self.retry.max_attempts {
1227 let log_entry = serde_json::json!({
1228 "log_type": self.log_type,
1229 "time_generated": event.timestamp.to_rfc3339(),
1230 "data": serde_json::from_str::<serde_json::Value>(&event_json)
1231 .unwrap_or_else(|_| serde_json::json!({"message": event_json}))
1232 });
1233
1234 match self
1235 .client
1236 .post(&url)
1237 .header("x-ms-date", &date)
1238 .header("Content-Type", content_type)
1239 .header("Authorization", format!("SharedKey {}:{}", self.workspace_id, signature))
1240 .header("Log-Type", &self.log_type)
1241 .header("time-generated-field", "time_generated")
1242 .body(serde_json::to_string(&log_entry)?)
1243 .send()
1244 .await
1245 {
1246 Ok(response) => {
1247 if response.status().is_success() {
1248 debug!("Sent Azure Monitor event: {}", event.event_type);
1249 return Ok(());
1250 } else {
1251 let status = response.status();
1252 let body = response.text().await.unwrap_or_default();
1253 last_error = Some(Error::Generic(format!(
1254 "Azure Monitor HTTP error {}: {}",
1255 status, body
1256 )));
1257 }
1258 }
1259 Err(e) => {
1260 last_error =
1261 Some(Error::Generic(format!("Azure Monitor request failed: {}", e)));
1262 }
1263 }
1264
1265 if attempt < self.retry.max_attempts {
1266 let delay = if self.retry.backoff == "exponential" {
1267 self.retry.initial_delay_secs * (2_u64.pow(attempt))
1268 } else {
1269 self.retry.initial_delay_secs * (attempt as u64 + 1)
1270 };
1271 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
1272 }
1273 }
1274
1275 Err(last_error.unwrap_or_else(|| {
1276 Error::Generic("Failed to send Azure Monitor event after retries".to_string())
1277 }))
1278 }
1279}
1280
1281#[derive(Debug, Clone, Serialize, Deserialize)]
1283pub struct TransportHealth {
1284 pub identifier: String,
1286 pub healthy: bool,
1288 pub last_success: Option<chrono::DateTime<chrono::Utc>>,
1290 pub last_error: Option<String>,
1292 pub success_count: u64,
1294 pub failure_count: u64,
1296}
1297
1298pub struct SiemEmitter {
1300 transports: Vec<Box<dyn SiemTransport>>,
1301 filters: Option<EventFilter>,
1302 health_status: Arc<RwLock<Vec<TransportHealth>>>,
1304}
1305
1306impl SiemEmitter {
1307 pub async fn from_config(config: SiemConfig) -> Result<Self, Error> {
1309 if !config.enabled {
1310 return Ok(Self {
1311 transports: Vec::new(),
1312 filters: config.filters,
1313 health_status: Arc::new(RwLock::new(Vec::new())),
1314 });
1315 }
1316
1317 let mut transports: Vec<Box<dyn SiemTransport>> = Vec::new();
1318
1319 for dest in config.destinations {
1320 let transport: Box<dyn SiemTransport> = match dest {
1321 SiemDestination::Syslog {
1322 host,
1323 port,
1324 transport,
1325 facility,
1326 tag,
1327 } => Box::new(SyslogTransport::new(host, port, transport, facility, tag)),
1328 SiemDestination::Http {
1329 url,
1330 method,
1331 headers,
1332 timeout,
1333 retry,
1334 } => Box::new(HttpTransport::new(url, method, headers, timeout, retry)),
1335 SiemDestination::Https {
1336 url,
1337 method,
1338 headers,
1339 timeout,
1340 retry,
1341 } => Box::new(HttpTransport::new(url, method, headers, timeout, retry)),
1342 SiemDestination::File { path, format, .. } => {
1343 Box::new(FileTransport::new(path, format).await?)
1344 }
1345 SiemDestination::Splunk {
1346 url,
1347 token,
1348 index,
1349 source_type,
1350 } => Box::new(SplunkTransport::new(
1351 url,
1352 token,
1353 index,
1354 source_type,
1355 RetryConfig::default(),
1356 )),
1357 SiemDestination::Datadog {
1358 api_key,
1359 app_key,
1360 site,
1361 tags,
1362 } => Box::new(DatadogTransport::new(
1363 api_key,
1364 app_key,
1365 site,
1366 tags,
1367 RetryConfig::default(),
1368 )),
1369 SiemDestination::Cloudwatch {
1370 region,
1371 log_group,
1372 stream,
1373 credentials,
1374 } => Box::new(CloudwatchTransport::new(
1375 region,
1376 log_group,
1377 stream,
1378 credentials,
1379 RetryConfig::default(),
1380 )),
1381 SiemDestination::Gcp {
1382 project_id,
1383 log_name,
1384 credentials_path,
1385 } => Box::new(GcpTransport::new(
1386 project_id,
1387 log_name,
1388 credentials_path,
1389 RetryConfig::default(),
1390 )),
1391 SiemDestination::Azure {
1392 workspace_id,
1393 shared_key,
1394 log_type,
1395 } => Box::new(AzureTransport::new(
1396 workspace_id,
1397 shared_key,
1398 log_type,
1399 RetryConfig::default(),
1400 )),
1401 };
1402 transports.push(transport);
1403 }
1404
1405 let health_status = Arc::new(RwLock::new(
1406 transports
1407 .iter()
1408 .enumerate()
1409 .map(|(i, _)| TransportHealth {
1410 identifier: format!("transport_{}", i),
1411 healthy: true,
1412 last_success: None,
1413 last_error: None,
1414 success_count: 0,
1415 failure_count: 0,
1416 })
1417 .collect(),
1418 ));
1419
1420 Ok(Self {
1421 transports,
1422 filters: config.filters,
1423 health_status,
1424 })
1425 }
1426
1427 pub async fn emit(&self, event: SecurityEvent) -> Result<(), Error> {
1429 if let Some(ref filter) = self.filters {
1431 if !filter.should_include(&event) {
1432 debug!("Event filtered out: {}", event.event_type);
1433 return Ok(());
1434 }
1435 }
1436
1437 let mut errors = Vec::new();
1439 let mut health_status = self.health_status.write().await;
1440
1441 for (idx, transport) in self.transports.iter().enumerate() {
1442 match transport.send_event(&event).await {
1443 Ok(()) => {
1444 if let Some(health) = health_status.get_mut(idx) {
1445 health.healthy = true;
1446 health.last_success = Some(chrono::Utc::now());
1447 health.success_count += 1;
1448 health.last_error = None;
1449 }
1450 }
1451 Err(e) => {
1452 let error_msg = format!("{}", e);
1453 error!("Failed to send event to SIEM: {}", error_msg);
1454 errors.push(Error::Generic(error_msg.clone()));
1455 if let Some(health) = health_status.get_mut(idx) {
1456 health.healthy = false;
1457 health.failure_count += 1;
1458 health.last_error = Some(error_msg);
1459 }
1460 }
1461 }
1462 }
1463
1464 drop(health_status);
1465
1466 if !errors.is_empty() && errors.len() == self.transports.len() {
1467 return Err(Error::Generic(format!(
1469 "All SIEM transports failed: {} errors",
1470 errors.len()
1471 )));
1472 }
1473
1474 Ok(())
1475 }
1476
1477 pub async fn health_status(&self) -> Vec<TransportHealth> {
1479 self.health_status.read().await.clone()
1480 }
1481
1482 pub async fn is_healthy(&self) -> bool {
1484 let health_status = self.health_status.read().await;
1485 health_status.iter().any(|h| h.healthy)
1486 }
1487
1488 pub async fn health_summary(&self) -> (usize, usize, usize) {
1490 let health_status = self.health_status.read().await;
1491 let total = health_status.len();
1492 let healthy = health_status.iter().filter(|h| h.healthy).count();
1493 let unhealthy = total - healthy;
1494 (total, healthy, unhealthy)
1495 }
1496}
1497
#[cfg(test)]
mod tests {
    use super::*;
    use crate::security::events::{SecurityEvent, SecurityEventType};

    // An include pattern lets matching event types through and rejects the rest.
    #[test]
    fn test_event_filter_include() {
        let auth_only = EventFilter {
            include: Some(vec![String::from("auth.*")]),
            exclude: None,
            conditions: None,
        };

        let auth_event = SecurityEvent::new(SecurityEventType::AuthSuccess, None, None);
        assert!(auth_only.should_include(&auth_event));

        let config_event = SecurityEvent::new(SecurityEventType::ConfigChanged, None, None);
        assert!(!auth_only.should_include(&config_event));
    }

    // An exclude rule on severity drops matching events and keeps the others.
    // NOTE(review): presumably AuthSuccess is low severity and AuthFailure is not — confirm
    // against the event definitions.
    #[test]
    fn test_event_filter_exclude() {
        let drop_low = EventFilter {
            include: None,
            exclude: Some(vec![String::from("severity:low")]),
            conditions: None,
        };

        let success = SecurityEvent::new(SecurityEventType::AuthSuccess, None, None);
        assert!(!drop_low.should_include(&success));

        let failure = SecurityEvent::new(SecurityEventType::AuthFailure, None, None);
        assert!(drop_low.should_include(&failure));
    }

    // The formatted syslog line starts with the `<` of the priority field and carries the tag.
    #[tokio::test]
    async fn test_syslog_transport_format() {
        let syslog = SyslogTransport::new(
            String::from("localhost"),
            514,
            String::from("udp"),
            SyslogFacility::Local0,
            String::from("mockforge"),
        );
        let event = SecurityEvent::new(SecurityEventType::AuthSuccess, None, None);

        let line = syslog.format_syslog_message(&event);

        assert!(line.starts_with("<"));
        assert!(line.contains("mockforge"));
    }

    // Every protocol variant survives a JSON round trip.
    #[test]
    fn test_siem_protocol_serialization() {
        for protocol in [
            SiemProtocol::Syslog,
            SiemProtocol::Http,
            SiemProtocol::Https,
            SiemProtocol::File,
            SiemProtocol::Splunk,
            SiemProtocol::Datadog,
            SiemProtocol::Cloudwatch,
            SiemProtocol::Gcp,
            SiemProtocol::Azure,
        ] {
            let encoded = serde_json::to_string(&protocol).unwrap();
            assert!(!encoded.is_empty());
            let decoded: SiemProtocol = serde_json::from_str(&encoded).unwrap();
            assert_eq!(protocol, decoded);
        }
    }

    // The default facility is Local0.
    #[test]
    fn test_syslog_facility_default() {
        assert_eq!(SyslogFacility::default(), SyslogFacility::Local0);
    }

    // A sample of facility variants survives a JSON round trip.
    #[test]
    fn test_syslog_facility_serialization() {
        for facility in [
            SyslogFacility::Kernel,
            SyslogFacility::User,
            SyslogFacility::Security,
            SyslogFacility::Local0,
            SyslogFacility::Local7,
        ] {
            let encoded = serde_json::to_string(&facility).unwrap();
            assert!(!encoded.is_empty());
            let decoded: SyslogFacility = serde_json::from_str(&encoded).unwrap();
            assert_eq!(facility, decoded);
        }
    }

    // Each security-event severity maps onto the expected syslog severity.
    #[test]
    fn test_syslog_severity_from_security_event_severity() {
        use crate::security::events::SecurityEventSeverity;

        let expectations = [
            (SecurityEventSeverity::Low, SyslogSeverity::Informational),
            (SecurityEventSeverity::Medium, SyslogSeverity::Warning),
            (SecurityEventSeverity::High, SyslogSeverity::Error),
            (SecurityEventSeverity::Critical, SyslogSeverity::Critical),
        ];

        for (input, expected) in expectations {
            assert_eq!(SyslogSeverity::from(input), expected);
        }
    }

    // Default retry policy: three attempts, exponential backoff, one second initial delay.
    #[test]
    fn test_retry_config_default() {
        let defaults = RetryConfig::default();

        assert_eq!(defaults.max_attempts, 3);
        assert_eq!(defaults.backoff, "exponential");
        assert_eq!(defaults.initial_delay_secs, 1);
    }

    // Serialized retry config exposes its field names and values.
    #[test]
    fn test_retry_config_serialization() {
        let encoded = serde_json::to_string(&RetryConfig {
            max_attempts: 5,
            backoff: String::from("linear"),
            initial_delay_secs: 2,
        })
        .unwrap();

        assert!(encoded.contains("max_attempts"));
        assert!(encoded.contains("linear"));
    }

    // Serialized rotation config exposes its field names and values.
    #[test]
    fn test_file_rotation_config_serialization() {
        let encoded = serde_json::to_string(&FileRotationConfig {
            max_size: String::from("100MB"),
            max_files: 10,
            compress: true,
        })
        .unwrap();

        assert!(encoded.contains("100MB"));
        assert!(encoded.contains("max_files"));
    }

    // A default SIEM config is disabled and empty.
    #[test]
    fn test_siem_config_default() {
        let defaults = SiemConfig::default();

        assert!(!defaults.enabled);
        assert!(defaults.protocol.is_none());
        assert!(defaults.destinations.is_empty());
        assert!(defaults.filters.is_none());
    }

    // Serialized SIEM config contains the `enabled` key and the lowercase protocol name.
    #[test]
    fn test_siem_config_serialization() {
        let encoded = serde_json::to_string(&SiemConfig {
            enabled: true,
            protocol: Some(SiemProtocol::Syslog),
            destinations: Vec::new(),
            filters: None,
        })
        .unwrap();

        assert!(encoded.contains("enabled"));
        assert!(encoded.contains("syslog"));
    }

    // A health record keeps the values it was built with.
    #[test]
    fn test_transport_health_creation() {
        let record = TransportHealth {
            identifier: String::from("test_transport"),
            healthy: true,
            last_success: Some(chrono::Utc::now()),
            last_error: None,
            success_count: 100,
            failure_count: 0,
        };

        assert_eq!(record.identifier, "test_transport");
        assert!(record.healthy);
        assert_eq!(record.success_count, 100);
        assert_eq!(record.failure_count, 0);
    }

    // A serialized health record contains its identifier and last error text.
    #[test]
    fn test_transport_health_serialization() {
        let encoded = serde_json::to_string(&TransportHealth {
            identifier: String::from("transport_1"),
            healthy: false,
            last_success: None,
            last_error: Some(String::from("Connection failed")),
            success_count: 50,
            failure_count: 5,
        })
        .unwrap();

        assert!(encoded.contains("transport_1"));
        assert!(encoded.contains("Connection failed"));
    }

    // The remaining tests are construction smoke tests: they only check that each
    // transport can be built from representative arguments.

    #[test]
    fn test_syslog_transport_new() {
        let _transport = SyslogTransport::new(
            String::from("example.com"),
            514,
            String::from("tcp"),
            SyslogFacility::Security,
            String::from("app"),
        );
    }

    #[test]
    fn test_http_transport_new() {
        let headers = HashMap::from([(String::from("X-Custom-Header"), String::from("value"))]);

        let _transport = HttpTransport::new(
            String::from("https://example.com/webhook"),
            String::from("POST"),
            headers,
            10,
            RetryConfig::default(),
        );
    }

    #[test]
    fn test_splunk_transport_new() {
        let _transport = SplunkTransport::new(
            String::from("https://splunk.example.com:8088"),
            String::from("token123"),
            Some(String::from("index1")),
            Some(String::from("json")),
            RetryConfig::default(),
        );
    }

    #[test]
    fn test_datadog_transport_new() {
        let _transport = DatadogTransport::new(
            String::from("api_key_123"),
            Some(String::from("app_key_456")),
            String::from("us"),
            vec![String::from("env:test")],
            RetryConfig::default(),
        );
    }

    #[test]
    fn test_cloudwatch_transport_new() {
        let credentials = HashMap::from([
            (String::from("access_key"), String::from("key123")),
            (String::from("secret_key"), String::from("secret123")),
        ]);

        let _transport = CloudwatchTransport::new(
            String::from("us-east-1"),
            String::from("log-group-name"),
            String::from("log-stream-name"),
            credentials,
            RetryConfig::default(),
        );
    }

    #[test]
    fn test_gcp_transport_new() {
        let _transport = GcpTransport::new(
            String::from("project-id"),
            String::from("log-name"),
            String::from("/path/to/credentials.json"),
            RetryConfig::default(),
        );
    }

    #[test]
    fn test_azure_transport_new() {
        let _transport = AzureTransport::new(
            String::from("workspace-id"),
            String::from("shared-key"),
            String::from("CustomLog"),
            RetryConfig::default(),
        );
    }
}