1use crate::security::events::SecurityEvent;
8use crate::Error;
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::path::PathBuf;
13use std::sync::Arc;
14use tokio::fs::{File, OpenOptions};
15use tokio::io::{AsyncWriteExt, BufWriter};
16use tokio::sync::RwLock;
17use tracing::{debug, error, warn};
18
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
22#[serde(rename_all = "lowercase")]
23pub enum SiemProtocol {
24 Syslog,
26 Http,
28 Https,
30 File,
32 Splunk,
34 Datadog,
36 Cloudwatch,
38 Gcp,
40 Azure,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
46#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
47#[serde(rename_all = "lowercase")]
48#[derive(Default)]
49pub enum SyslogFacility {
50 Kernel = 0,
52 User = 1,
54 Mail = 2,
56 Daemon = 3,
58 Security = 4,
60 Syslogd = 5,
62 LinePrinter = 6,
64 NetworkNews = 7,
66 Uucp = 8,
68 Clock = 9,
70 Security2 = 10,
72 Ftp = 11,
74 Ntp = 12,
76 LogAudit = 13,
78 LogAlert = 14,
80 #[default]
82 Local0 = 16,
83 Local1 = 17,
85 Local2 = 18,
87 Local3 = 19,
89 Local4 = 20,
91 Local5 = 21,
93 Local6 = 22,
95 Local7 = 23,
97}
98
/// Syslog severity level; the discriminant is the numeric severity (0-7)
/// that is added to `facility * 8` to form the message priority.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyslogSeverity {
    /// Severity 0.
    Emergency = 0,
    /// Severity 1.
    Alert = 1,
    /// Severity 2.
    Critical = 2,
    /// Severity 3.
    Error = 3,
    /// Severity 4.
    Warning = 4,
    /// Severity 5.
    Notice = 5,
    /// Severity 6.
    Informational = 6,
    /// Severity 7.
    Debug = 7,
}
119
120impl From<crate::security::events::SecurityEventSeverity> for SyslogSeverity {
121 fn from(severity: crate::security::events::SecurityEventSeverity) -> Self {
122 match severity {
123 crate::security::events::SecurityEventSeverity::Low => SyslogSeverity::Informational,
124 crate::security::events::SecurityEventSeverity::Medium => SyslogSeverity::Warning,
125 crate::security::events::SecurityEventSeverity::High => SyslogSeverity::Error,
126 crate::security::events::SecurityEventSeverity::Critical => SyslogSeverity::Critical,
127 }
128 }
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
133#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
134pub struct RetryConfig {
135 pub max_attempts: u32,
137 #[serde(default = "default_backoff")]
139 pub backoff: String,
140 #[serde(default = "default_initial_delay")]
142 pub initial_delay_secs: u64,
143}
144
/// Serde default for [`RetryConfig::backoff`]: `"exponential"`.
fn default_backoff() -> String {
    String::from("exponential")
}
148
/// Serde default for [`RetryConfig::initial_delay_secs`]: one second.
fn default_initial_delay() -> u64 {
    1_u64
}
152
153impl Default for RetryConfig {
154 fn default() -> Self {
155 Self {
156 max_attempts: 3,
157 backoff: "exponential".to_string(),
158 initial_delay_secs: 1,
159 }
160 }
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
165#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
166pub struct FileRotationConfig {
167 pub max_size: String,
169 pub max_files: u32,
171 #[serde(default)]
173 pub compress: bool,
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize)]
178#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
179pub struct EventFilter {
180 pub include: Option<Vec<String>>,
182 pub exclude: Option<Vec<String>>,
184 pub conditions: Option<Vec<String>>,
186}
187
188impl EventFilter {
189 pub fn should_include(&self, event: &SecurityEvent) -> bool {
191 if let Some(ref includes) = self.include {
193 let mut matched = false;
194 for pattern in includes {
195 if self.matches_pattern(&event.event_type, pattern) {
196 matched = true;
197 break;
198 }
199 }
200 if !matched {
201 return false;
202 }
203 }
204
205 if let Some(ref excludes) = self.exclude {
207 for pattern in excludes {
208 if pattern.starts_with("severity:") {
209 let severity_str = pattern.strip_prefix("severity:").unwrap_or("");
210 if severity_str == "low"
211 && event.severity == crate::security::events::SecurityEventSeverity::Low
212 {
213 return false;
214 }
215 if severity_str == "medium"
216 && event.severity == crate::security::events::SecurityEventSeverity::Medium
217 {
218 return false;
219 }
220 if severity_str == "high"
221 && event.severity == crate::security::events::SecurityEventSeverity::High
222 {
223 return false;
224 }
225 if severity_str == "critical"
226 && event.severity
227 == crate::security::events::SecurityEventSeverity::Critical
228 {
229 return false;
230 }
231 } else if self.matches_pattern(&event.event_type, pattern) {
232 return false;
233 }
234 }
235 }
236
237 true
238 }
239
240 fn matches_pattern(&self, event_type: &str, pattern: &str) -> bool {
241 if pattern.ends_with(".*") {
243 let prefix = pattern.strip_suffix(".*").unwrap_or("");
244 event_type.starts_with(prefix)
245 } else {
246 event_type == pattern
247 }
248 }
249}
250
251#[derive(Debug, Clone, Serialize, Deserialize)]
253#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
254#[serde(tag = "protocol")]
255pub enum SiemDestination {
256 #[serde(rename = "syslog")]
258 Syslog {
259 host: String,
261 port: u16,
263 #[serde(default = "default_syslog_protocol", rename = "transport")]
265 transport: String,
266 #[serde(default)]
268 facility: SyslogFacility,
269 #[serde(default = "default_tag")]
271 tag: String,
272 },
273 #[serde(rename = "http")]
275 Http {
276 url: String,
278 #[serde(default = "default_http_method")]
280 method: String,
281 #[serde(default)]
283 headers: HashMap<String, String>,
284 #[serde(default = "default_timeout")]
286 timeout: u64,
287 #[serde(default)]
289 retry: RetryConfig,
290 },
291 #[serde(rename = "https")]
293 Https {
294 url: String,
296 #[serde(default = "default_http_method")]
298 method: String,
299 #[serde(default)]
301 headers: HashMap<String, String>,
302 #[serde(default = "default_timeout")]
304 timeout: u64,
305 #[serde(default)]
307 retry: RetryConfig,
308 },
309 #[serde(rename = "file")]
311 File {
312 path: String,
314 #[serde(default = "default_file_format")]
316 format: String,
317 rotation: Option<FileRotationConfig>,
319 },
320 #[serde(rename = "splunk")]
322 Splunk {
323 url: String,
325 token: String,
327 index: Option<String>,
329 source_type: Option<String>,
331 },
332 #[serde(rename = "datadog")]
334 Datadog {
335 api_key: String,
337 app_key: Option<String>,
339 #[serde(default = "default_datadog_site")]
341 site: String,
342 #[serde(default)]
344 tags: Vec<String>,
345 },
346 #[serde(rename = "cloudwatch")]
348 Cloudwatch {
349 region: String,
351 log_group: String,
353 stream: String,
355 credentials: HashMap<String, String>,
357 },
358 #[serde(rename = "gcp")]
360 Gcp {
361 project_id: String,
363 log_name: String,
365 credentials_path: String,
367 },
368 #[serde(rename = "azure")]
370 Azure {
371 workspace_id: String,
373 shared_key: String,
375 log_type: String,
377 },
378}
379
/// Serde default for the syslog `transport` field: `"udp"`.
fn default_syslog_protocol() -> String {
    String::from("udp")
}
383
/// Serde default for the syslog `tag` field: `"mockforge"`.
fn default_tag() -> String {
    String::from("mockforge")
}
387
/// Serde default for the HTTP `method` field: `"POST"`.
fn default_http_method() -> String {
    String::from("POST")
}
391
/// Serde default for the HTTP `timeout` field: five seconds.
fn default_timeout() -> u64 {
    5_u64
}
395
/// Serde default for the file `format` field: `"jsonl"`.
fn default_file_format() -> String {
    String::from("jsonl")
}
399
/// Serde default for the Datadog `site` field: `"datadoghq.com"`.
fn default_datadog_site() -> String {
    String::from("datadoghq.com")
}
403
404#[derive(Debug, Clone, Serialize, Deserialize)]
406#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
407#[derive(Default)]
408pub struct SiemConfig {
409 pub enabled: bool,
411 pub protocol: Option<SiemProtocol>,
413 pub destinations: Vec<SiemDestination>,
415 pub filters: Option<EventFilter>,
417}
418
419#[async_trait]
421pub trait SiemTransport: Send + Sync {
422 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error>;
424}
425
426pub struct SyslogTransport {
428 host: String,
429 port: u16,
430 use_tcp: bool,
431 facility: SyslogFacility,
432 tag: String,
433}
434
435impl SyslogTransport {
436 pub fn new(
445 host: String,
446 port: u16,
447 protocol: String,
448 facility: SyslogFacility,
449 tag: String,
450 ) -> Self {
451 Self {
452 host,
453 port,
454 use_tcp: protocol == "tcp",
455 facility,
456 tag,
457 }
458 }
459
460 fn format_syslog_message(&self, event: &SecurityEvent) -> String {
462 let severity: SyslogSeverity = event.severity.into();
463 let priority = (self.facility as u8) * 8 + severity as u8;
464 let timestamp = event.timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ");
465 let hostname = "mockforge"; let app_name = &self.tag;
467 let proc_id = "-";
468 let msg_id = "-";
469 let structured_data = "-"; let msg = event.to_json().unwrap_or_else(|_| "{}".to_string());
471
472 format!(
473 "<{}>1 {} {} {} {} {} {} {}",
474 priority, timestamp, hostname, app_name, proc_id, msg_id, structured_data, msg
475 )
476 }
477}
478
479#[async_trait]
480impl SiemTransport for SyslogTransport {
481 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
482 let message = self.format_syslog_message(event);
483
484 if self.use_tcp {
485 use tokio::net::TcpStream;
487 let addr = format!("{}:{}", self.host, self.port);
488 let mut stream = TcpStream::connect(&addr).await.map_err(|e| {
489 Error::Generic(format!("Failed to connect to syslog server: {}", e))
490 })?;
491 stream
492 .write_all(message.as_bytes())
493 .await
494 .map_err(|e| Error::Generic(format!("Failed to send syslog message: {}", e)))?;
495 } else {
496 use tokio::net::UdpSocket;
498 let socket = UdpSocket::bind("0.0.0.0:0")
499 .await
500 .map_err(|e| Error::Generic(format!("Failed to bind UDP socket: {}", e)))?;
501 let addr = format!("{}:{}", self.host, self.port);
502 socket
503 .send_to(message.as_bytes(), &addr)
504 .await
505 .map_err(|e| Error::Generic(format!("Failed to send UDP syslog message: {}", e)))?;
506 }
507
508 debug!("Sent syslog event: {}", event.event_type);
509 Ok(())
510 }
511}
512
513pub struct HttpTransport {
515 url: String,
516 method: String,
517 headers: HashMap<String, String>,
518 timeout: u64,
519 retry: RetryConfig,
520 client: reqwest::Client,
521}
522
523impl HttpTransport {
524 pub fn new(
533 url: String,
534 method: String,
535 headers: HashMap<String, String>,
536 timeout: u64,
537 retry: RetryConfig,
538 ) -> Self {
539 let client = reqwest::Client::builder()
540 .timeout(std::time::Duration::from_secs(timeout))
541 .build()
542 .expect("Failed to create HTTP client");
543
544 Self {
545 url,
546 method,
547 headers,
548 timeout,
549 retry,
550 client,
551 }
552 }
553}
554
555#[async_trait]
556impl SiemTransport for HttpTransport {
557 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
558 let event_json = event.to_json()?;
559 let mut request = match self.method.as_str() {
560 "POST" => self.client.post(&self.url),
561 "PUT" => self.client.put(&self.url),
562 "PATCH" => self.client.patch(&self.url),
563 _ => return Err(Error::Generic(format!("Unsupported HTTP method: {}", self.method))),
564 };
565
566 for (key, value) in &self.headers {
568 request = request.header(key, value);
569 }
570
571 if !self.headers.contains_key("Content-Type") {
573 request = request.header("Content-Type", "application/json");
574 }
575
576 request = request.body(event_json);
577
578 let mut last_error = None;
580 for attempt in 0..=self.retry.max_attempts {
581 match request.try_clone() {
582 Some(req) => match req.send().await {
583 Ok(response) => {
584 if response.status().is_success() {
585 debug!("Sent HTTP event to {}: {}", self.url, event.event_type);
586 return Ok(());
587 } else {
588 let status = response.status();
589 last_error = Some(Error::Generic(format!("HTTP error: {}", status)));
590 }
591 }
592 Err(e) => {
593 last_error = Some(Error::Generic(format!("HTTP request failed: {}", e)));
594 }
595 },
596 None => {
597 let event_json = event.to_json()?;
599 let mut req = match self.method.as_str() {
600 "POST" => self.client.post(&self.url),
601 "PUT" => self.client.put(&self.url),
602 "PATCH" => self.client.patch(&self.url),
603 _ => break,
604 };
605 for (key, value) in &self.headers {
606 req = req.header(key, value);
607 }
608 if !self.headers.contains_key("Content-Type") {
609 req = req.header("Content-Type", "application/json");
610 }
611 req = req.body(event_json);
612 request = req;
613 continue;
614 }
615 }
616
617 if attempt < self.retry.max_attempts {
618 let delay = if self.retry.backoff == "exponential" {
620 self.retry.initial_delay_secs * (2_u64.pow(attempt))
621 } else {
622 self.retry.initial_delay_secs * (attempt as u64 + 1)
623 };
624 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
625 }
626 }
627
628 Err(last_error.unwrap_or_else(|| {
629 Error::Generic("Failed to send HTTP event after retries".to_string())
630 }))
631 }
632}
633
634pub struct FileTransport {
636 path: PathBuf,
637 format: String,
638 writer: Arc<RwLock<Option<BufWriter<File>>>>,
639}
640
641impl FileTransport {
642 pub async fn new(path: String, format: String) -> Result<Self, Error> {
651 let path = PathBuf::from(path);
652
653 if let Some(parent) = path.parent() {
655 tokio::fs::create_dir_all(parent)
656 .await
657 .map_err(|e| Error::Generic(format!("Failed to create directory: {}", e)))?;
658 }
659
660 let file = OpenOptions::new()
662 .create(true)
663 .append(true)
664 .open(&path)
665 .await
666 .map_err(|e| Error::Generic(format!("Failed to open file: {}", e)))?;
667
668 let writer = Arc::new(RwLock::new(Some(BufWriter::new(file))));
669
670 Ok(Self {
671 path,
672 format,
673 writer,
674 })
675 }
676}
677
678#[async_trait]
679impl SiemTransport for FileTransport {
680 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
681 let mut writer_guard = self.writer.write().await;
682
683 if let Some(ref mut writer) = *writer_guard {
684 let line = if self.format == "jsonl" {
685 format!("{}\n", event.to_json()?)
686 } else {
687 format!("{}\n", event.to_json()?)
689 };
690
691 writer
692 .write_all(line.as_bytes())
693 .await
694 .map_err(|e| Error::Generic(format!("Failed to write to file: {}", e)))?;
695
696 writer
697 .flush()
698 .await
699 .map_err(|e| Error::Generic(format!("Failed to flush file: {}", e)))?;
700
701 debug!("Wrote event to file {}: {}", self.path.display(), event.event_type);
702 Ok(())
703 } else {
704 Err(Error::Generic("File writer not initialized".to_string()))
705 }
706 }
707}
708
709pub struct SplunkTransport {
711 url: String,
712 token: String,
713 index: Option<String>,
714 source_type: Option<String>,
715 retry: RetryConfig,
716 client: reqwest::Client,
717}
718
719impl SplunkTransport {
720 pub fn new(
722 url: String,
723 token: String,
724 index: Option<String>,
725 source_type: Option<String>,
726 retry: RetryConfig,
727 ) -> Self {
728 let client = reqwest::Client::builder()
729 .timeout(std::time::Duration::from_secs(10))
730 .build()
731 .expect("Failed to create HTTP client");
732
733 Self {
734 url,
735 token,
736 index,
737 source_type,
738 retry,
739 client,
740 }
741 }
742
743 fn format_event(&self, event: &SecurityEvent) -> Result<serde_json::Value, Error> {
745 let mut splunk_event = serde_json::json!({
746 "event": event.to_json()?,
747 "time": event.timestamp.timestamp(),
748 });
749
750 if let Some(ref index) = self.index {
751 splunk_event["index"] = serde_json::Value::String(index.clone());
752 }
753
754 if let Some(ref st) = self.source_type {
755 splunk_event["sourcetype"] = serde_json::Value::String(st.clone());
756 } else {
757 splunk_event["sourcetype"] = serde_json::Value::String("mockforge:security".to_string());
758 }
759
760 Ok(splunk_event)
761 }
762}
763
764#[async_trait]
765impl SiemTransport for SplunkTransport {
766 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
767 let splunk_event = self.format_event(event)?;
768 let url = format!("{}/services/collector/event", self.url.trim_end_matches('/'));
769
770 let mut last_error = None;
771 for attempt in 0..=self.retry.max_attempts {
772 match self
773 .client
774 .post(&url)
775 .header("Authorization", format!("Splunk {}", self.token))
776 .header("Content-Type", "application/json")
777 .json(&splunk_event)
778 .send()
779 .await
780 {
781 Ok(response) => {
782 if response.status().is_success() {
783 debug!("Sent Splunk event: {}", event.event_type);
784 return Ok(());
785 } else {
786 let status = response.status();
787 let body = response.text().await.unwrap_or_default();
788 last_error = Some(Error::Generic(format!(
789 "Splunk HTTP error {}: {}",
790 status, body
791 )));
792 }
793 }
794 Err(e) => {
795 last_error = Some(Error::Generic(format!("Splunk request failed: {}", e)));
796 }
797 }
798
799 if attempt < self.retry.max_attempts {
800 let delay = if self.retry.backoff == "exponential" {
801 self.retry.initial_delay_secs * (2_u64.pow(attempt))
802 } else {
803 self.retry.initial_delay_secs * (attempt as u64 + 1)
804 };
805 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
806 }
807 }
808
809 Err(last_error.unwrap_or_else(|| {
810 Error::Generic("Failed to send Splunk event after retries".to_string())
811 }))
812 }
813}
814
815pub struct DatadogTransport {
817 api_key: String,
818 app_key: Option<String>,
819 site: String,
820 tags: Vec<String>,
821 retry: RetryConfig,
822 client: reqwest::Client,
823}
824
825impl DatadogTransport {
826 pub fn new(
828 api_key: String,
829 app_key: Option<String>,
830 site: String,
831 tags: Vec<String>,
832 retry: RetryConfig,
833 ) -> Self {
834 let client = reqwest::Client::builder()
835 .timeout(std::time::Duration::from_secs(10))
836 .build()
837 .expect("Failed to create HTTP client");
838
839 Self {
840 api_key,
841 app_key,
842 site,
843 tags,
844 retry,
845 client,
846 }
847 }
848
849 fn format_event(&self, event: &SecurityEvent) -> Result<serde_json::Value, Error> {
851 let mut tags = self.tags.clone();
852 tags.push(format!("event_type:{}", event.event_type));
853 tags.push(format!("severity:{}", format!("{:?}", event.severity).to_lowercase()));
854
855 let datadog_event = serde_json::json!({
856 "title": format!("MockForge Security Event: {}", event.event_type),
857 "text": event.to_json()?,
858 "alert_type": match event.severity {
859 crate::security::events::SecurityEventSeverity::Critical => "error",
860 crate::security::events::SecurityEventSeverity::High => "warning",
861 crate::security::events::SecurityEventSeverity::Medium => "info",
862 crate::security::events::SecurityEventSeverity::Low => "info",
863 },
864 "tags": tags,
865 "date_happened": event.timestamp.timestamp(),
866 });
867
868 Ok(datadog_event)
869 }
870}
871
872#[async_trait]
873impl SiemTransport for DatadogTransport {
874 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
875 let datadog_event = self.format_event(event)?;
876 let url = format!("https://api.{}/api/v1/events", self.site);
877
878 let mut last_error = None;
879 for attempt in 0..=self.retry.max_attempts {
880 let mut request = self
881 .client
882 .post(&url)
883 .header("DD-API-KEY", &self.api_key)
884 .json(&datadog_event);
885
886 if let Some(ref app_key) = self.app_key {
887 request = request.header("DD-APPLICATION-KEY", app_key);
888 }
889
890 match request.send().await {
891 Ok(response) => {
892 if response.status().is_success() {
893 debug!("Sent Datadog event: {}", event.event_type);
894 return Ok(());
895 } else {
896 let status = response.status();
897 let body = response.text().await.unwrap_or_default();
898 last_error = Some(Error::Generic(format!(
899 "Datadog HTTP error {}: {}",
900 status, body
901 )));
902 }
903 }
904 Err(e) => {
905 last_error = Some(Error::Generic(format!("Datadog request failed: {}", e)));
906 }
907 }
908
909 if attempt < self.retry.max_attempts {
910 let delay = if self.retry.backoff == "exponential" {
911 self.retry.initial_delay_secs * (2_u64.pow(attempt))
912 } else {
913 self.retry.initial_delay_secs * (attempt as u64 + 1)
914 };
915 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
916 }
917 }
918
919 Err(last_error.unwrap_or_else(|| {
920 Error::Generic("Failed to send Datadog event after retries".to_string())
921 }))
922 }
923}
924
925pub struct CloudwatchTransport {
927 region: String,
928 log_group: String,
929 stream: String,
930 credentials: HashMap<String, String>,
931 retry: RetryConfig,
932 client: reqwest::Client,
933}
934
935impl CloudwatchTransport {
936 pub fn new(
938 region: String,
939 log_group: String,
940 stream: String,
941 credentials: HashMap<String, String>,
942 retry: RetryConfig,
943 ) -> Self {
944 let client = reqwest::Client::builder()
945 .timeout(std::time::Duration::from_secs(10))
946 .build()
947 .expect("Failed to create HTTP client");
948
949 Self {
950 region,
951 log_group,
952 stream,
953 credentials,
954 retry,
955 client,
956 }
957 }
958}
959
960#[async_trait]
961impl SiemTransport for CloudwatchTransport {
962 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
963 warn!(
967 "CloudWatch transport requires AWS SDK for proper implementation. \
968 Using HTTP API fallback (may require additional AWS configuration)"
969 );
970
971 let event_json = event.to_json()?;
972 let _log_events = serde_json::json!([{
973 "timestamp": event.timestamp.timestamp_millis(),
974 "message": event_json
975 }]);
976
977 debug!(
980 "CloudWatch event prepared for log_group={}, stream={}: {}",
981 self.log_group, self.stream, event.event_type
982 );
983
984 Ok(())
986 }
987}
988
989pub struct GcpTransport {
991 project_id: String,
992 log_name: String,
993 credentials_path: String,
994 retry: RetryConfig,
995 client: reqwest::Client,
996}
997
998impl GcpTransport {
999 pub fn new(
1001 project_id: String,
1002 log_name: String,
1003 credentials_path: String,
1004 retry: RetryConfig,
1005 ) -> Self {
1006 let client = reqwest::Client::builder()
1007 .timeout(std::time::Duration::from_secs(10))
1008 .build()
1009 .expect("Failed to create HTTP client");
1010
1011 Self {
1012 project_id,
1013 log_name,
1014 credentials_path,
1015 retry,
1016 client,
1017 }
1018 }
1019}
1020
1021#[async_trait]
1022impl SiemTransport for GcpTransport {
1023 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
1024 warn!(
1028 "GCP transport requires google-cloud-logging for proper implementation. \
1029 Using HTTP API fallback (may require additional GCP configuration)"
1030 );
1031
1032 let event_json = event.to_json()?;
1033 let _log_entry = serde_json::json!({
1034 "logName": format!("projects/{}/logs/{}", self.project_id, self.log_name),
1035 "resource": {
1036 "type": "global"
1037 },
1038 "timestamp": event.timestamp.to_rfc3339(),
1039 "jsonPayload": serde_json::from_str::<serde_json::Value>(&event_json)
1040 .unwrap_or_else(|_| serde_json::json!({"message": event_json}))
1041 });
1042
1043 debug!(
1046 "GCP event prepared for project={}, log={}: {}",
1047 self.project_id, self.log_name, event.event_type
1048 );
1049
1050 Ok(())
1052 }
1053}
1054
1055pub struct AzureTransport {
1057 workspace_id: String,
1058 shared_key: String,
1059 log_type: String,
1060 retry: RetryConfig,
1061 client: reqwest::Client,
1062}
1063
1064impl AzureTransport {
1065 pub fn new(
1067 workspace_id: String,
1068 shared_key: String,
1069 log_type: String,
1070 retry: RetryConfig,
1071 ) -> Self {
1072 let client = reqwest::Client::builder()
1073 .timeout(std::time::Duration::from_secs(10))
1074 .build()
1075 .expect("Failed to create HTTP client");
1076
1077 Self {
1078 workspace_id,
1079 shared_key,
1080 log_type,
1081 retry,
1082 client,
1083 }
1084 }
1085
1086 fn generate_signature(
1088 &self,
1089 date: &str,
1090 content_length: usize,
1091 method: &str,
1092 content_type: &str,
1093 resource: &str,
1094 ) -> String {
1095 use hmac::{Hmac, Mac};
1096 use sha2::Sha256;
1097
1098 type HmacSha256 = Hmac<Sha256>;
1099
1100 let string_to_sign = format!(
1101 "{}\n{}\n{}\n{}\n{}",
1102 method, content_length, content_type, date, resource
1103 );
1104
1105 let mut mac = HmacSha256::new_from_slice(
1106 base64::decode(&self.shared_key)
1107 .unwrap_or_default()
1108 .as_slice(),
1109 )
1110 .expect("HMAC can take key of any size");
1111
1112 mac.update(string_to_sign.as_bytes());
1113 let result = mac.finalize();
1114 base64::encode(result.into_bytes())
1115 }
1116}
1117
1118#[async_trait]
1119impl SiemTransport for AzureTransport {
1120 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
1121 let event_json = event.to_json()?;
1122 let url = format!(
1123 "https://{}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01",
1124 self.workspace_id
1125 );
1126
1127 let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string();
1128 let content_type = "application/json";
1129 let content_length = event_json.len();
1130 let method = "POST";
1131 let resource = format!("/api/logs?api-version=2016-04-01");
1132
1133 let signature = self.generate_signature(
1134 &date,
1135 content_length,
1136 method,
1137 content_type,
1138 &resource,
1139 );
1140
1141 let mut last_error = None;
1142 for attempt in 0..=self.retry.max_attempts {
1143 let log_entry = serde_json::json!({
1144 "log_type": self.log_type,
1145 "time_generated": event.timestamp.to_rfc3339(),
1146 "data": serde_json::from_str::<serde_json::Value>(&event_json)
1147 .unwrap_or_else(|_| serde_json::json!({"message": event_json}))
1148 });
1149
1150 match self
1151 .client
1152 .post(&url)
1153 .header("x-ms-date", &date)
1154 .header("Content-Type", content_type)
1155 .header("Authorization", format!("SharedKey {}:{}", self.workspace_id, signature))
1156 .header("Log-Type", &self.log_type)
1157 .header("time-generated-field", "time_generated")
1158 .body(serde_json::to_string(&log_entry)?)
1159 .send()
1160 .await
1161 {
1162 Ok(response) => {
1163 if response.status().is_success() {
1164 debug!("Sent Azure Monitor event: {}", event.event_type);
1165 return Ok(());
1166 } else {
1167 let status = response.status();
1168 let body = response.text().await.unwrap_or_default();
1169 last_error = Some(Error::Generic(format!(
1170 "Azure Monitor HTTP error {}: {}",
1171 status, body
1172 )));
1173 }
1174 }
1175 Err(e) => {
1176 last_error = Some(Error::Generic(format!("Azure Monitor request failed: {}", e)));
1177 }
1178 }
1179
1180 if attempt < self.retry.max_attempts {
1181 let delay = if self.retry.backoff == "exponential" {
1182 self.retry.initial_delay_secs * (2_u64.pow(attempt))
1183 } else {
1184 self.retry.initial_delay_secs * (attempt as u64 + 1)
1185 };
1186 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
1187 }
1188 }
1189
1190 Err(last_error.unwrap_or_else(|| {
1191 Error::Generic("Failed to send Azure Monitor event after retries".to_string())
1192 }))
1193 }
1194}
1195
1196#[derive(Debug, Clone, Serialize, Deserialize)]
1198pub struct TransportHealth {
1199 pub identifier: String,
1201 pub healthy: bool,
1203 pub last_success: Option<chrono::DateTime<chrono::Utc>>,
1205 pub last_error: Option<String>,
1207 pub success_count: u64,
1209 pub failure_count: u64,
1211}
1212
1213pub struct SiemEmitter {
1215 transports: Vec<Box<dyn SiemTransport>>,
1216 filters: Option<EventFilter>,
1217 health_status: Arc<RwLock<Vec<TransportHealth>>>,
1219}
1220
1221impl SiemEmitter {
1222 pub async fn from_config(config: SiemConfig) -> Result<Self, Error> {
1224 if !config.enabled {
1225 return Ok(Self {
1226 transports: Vec::new(),
1227 filters: config.filters,
1228 health_status: Arc::new(RwLock::new(Vec::new())),
1229 });
1230 }
1231
1232 let mut transports: Vec<Box<dyn SiemTransport>> = Vec::new();
1233
1234 for dest in config.destinations {
1235 let transport: Box<dyn SiemTransport> = match dest {
1236 SiemDestination::Syslog {
1237 host,
1238 port,
1239 transport,
1240 facility,
1241 tag,
1242 } => Box::new(SyslogTransport::new(host, port, transport, facility, tag)),
1243 SiemDestination::Http {
1244 url,
1245 method,
1246 headers,
1247 timeout,
1248 retry,
1249 } => Box::new(HttpTransport::new(url, method, headers, timeout, retry)),
1250 SiemDestination::Https {
1251 url,
1252 method,
1253 headers,
1254 timeout,
1255 retry,
1256 } => Box::new(HttpTransport::new(url, method, headers, timeout, retry)),
1257 SiemDestination::File { path, format, .. } => {
1258 Box::new(FileTransport::new(path, format).await?)
1259 }
1260 SiemDestination::Splunk {
1261 url,
1262 token,
1263 index,
1264 source_type,
1265 } => Box::new(SplunkTransport::new(
1266 url,
1267 token,
1268 index,
1269 source_type,
1270 RetryConfig::default(),
1271 )),
1272 SiemDestination::Datadog {
1273 api_key,
1274 app_key,
1275 site,
1276 tags,
1277 } => Box::new(DatadogTransport::new(
1278 api_key,
1279 app_key,
1280 site,
1281 tags,
1282 RetryConfig::default(),
1283 )),
1284 SiemDestination::Cloudwatch {
1285 region,
1286 log_group,
1287 stream,
1288 credentials,
1289 } => Box::new(CloudwatchTransport::new(
1290 region,
1291 log_group,
1292 stream,
1293 credentials,
1294 RetryConfig::default(),
1295 )),
1296 SiemDestination::Gcp {
1297 project_id,
1298 log_name,
1299 credentials_path,
1300 } => Box::new(GcpTransport::new(
1301 project_id,
1302 log_name,
1303 credentials_path,
1304 RetryConfig::default(),
1305 )),
1306 SiemDestination::Azure {
1307 workspace_id,
1308 shared_key,
1309 log_type,
1310 } => Box::new(AzureTransport::new(
1311 workspace_id,
1312 shared_key,
1313 log_type,
1314 RetryConfig::default(),
1315 )),
1316 };
1317 transports.push(transport);
1318 }
1319
1320 let health_status = Arc::new(RwLock::new(
1321 transports
1322 .iter()
1323 .enumerate()
1324 .map(|(i, _)| TransportHealth {
1325 identifier: format!("transport_{}", i),
1326 healthy: true,
1327 last_success: None,
1328 last_error: None,
1329 success_count: 0,
1330 failure_count: 0,
1331 })
1332 .collect(),
1333 ));
1334
1335 Ok(Self {
1336 transports,
1337 filters: config.filters,
1338 health_status,
1339 })
1340 }
1341
1342 pub async fn emit(&self, event: SecurityEvent) -> Result<(), Error> {
1344 if let Some(ref filter) = self.filters {
1346 if !filter.should_include(&event) {
1347 debug!("Event filtered out: {}", event.event_type);
1348 return Ok(());
1349 }
1350 }
1351
1352 let mut errors = Vec::new();
1354 let mut health_status = self.health_status.write().await;
1355
1356 for (idx, transport) in self.transports.iter().enumerate() {
1357 match transport.send_event(&event).await {
1358 Ok(()) => {
1359 if let Some(health) = health_status.get_mut(idx) {
1360 health.healthy = true;
1361 health.last_success = Some(chrono::Utc::now());
1362 health.success_count += 1;
1363 health.last_error = None;
1364 }
1365 }
1366 Err(e) => {
1367 let error_msg = format!("{}", e);
1368 error!("Failed to send event to SIEM: {}", error_msg);
1369 errors.push(Error::Generic(error_msg.clone()));
1370 if let Some(health) = health_status.get_mut(idx) {
1371 health.healthy = false;
1372 health.failure_count += 1;
1373 health.last_error = Some(error_msg);
1374 }
1375 }
1376 }
1377 }
1378
1379 drop(health_status);
1380
1381 if !errors.is_empty() && errors.len() == self.transports.len() {
1382 return Err(Error::Generic(format!(
1384 "All SIEM transports failed: {} errors",
1385 errors.len()
1386 )));
1387 }
1388
1389 Ok(())
1390 }
1391
1392 pub async fn health_status(&self) -> Vec<TransportHealth> {
1394 self.health_status.read().await.clone()
1395 }
1396
1397 pub async fn is_healthy(&self) -> bool {
1399 let health_status = self.health_status.read().await;
1400 health_status.iter().any(|h| h.healthy)
1401 }
1402
1403 pub async fn health_summary(&self) -> (usize, usize, usize) {
1405 let health_status = self.health_status.read().await;
1406 let total = health_status.len();
1407 let healthy = health_status.iter().filter(|h| h.healthy).count();
1408 let unhealthy = total - healthy;
1409 (total, healthy, unhealthy)
1410 }
1411}
1412
#[cfg(test)]
mod tests {
    use super::*;
    use crate::security::events::{EventActor, EventOutcome, EventTarget, SecurityEventType};

    /// Builds an event of the given type with no further details.
    fn event_of(kind: SecurityEventType) -> crate::security::events::SecurityEvent {
        crate::security::events::SecurityEvent::new(kind, None, None)
    }

    /// An include pattern lets matching event types through and rejects others.
    #[test]
    fn test_event_filter_include() {
        let filter = EventFilter {
            include: Some(vec![String::from("auth.*")]),
            exclude: None,
            conditions: None,
        };

        let auth_event = event_of(SecurityEventType::AuthSuccess);
        assert!(filter.should_include(&auth_event));

        let config_event = event_of(SecurityEventType::ConfigChanged);
        assert!(!filter.should_include(&config_event));
    }

    /// A `severity:low` exclusion drops the first event but keeps the second.
    #[test]
    fn test_event_filter_exclude() {
        let filter = EventFilter {
            include: None,
            exclude: Some(vec![String::from("severity:low")]),
            conditions: None,
        };

        let success_event = event_of(SecurityEventType::AuthSuccess);
        assert!(!filter.should_include(&success_event));

        let failure_event = event_of(SecurityEventType::AuthFailure);
        assert!(filter.should_include(&failure_event));
    }

    /// The formatted syslog line starts with the priority marker and
    /// contains the tag.
    #[tokio::test]
    async fn test_syslog_transport_format() {
        let transport = SyslogTransport::new(
            String::from("localhost"),
            514,
            String::from("udp"),
            SyslogFacility::Local0,
            String::from("mockforge"),
        );

        let event = event_of(SecurityEventType::AuthSuccess);
        let message = transport.format_syslog_message(&event);

        assert!(message.starts_with("<"));
        assert!(message.contains("mockforge"));
    }
}