1use crate::security::events::SecurityEvent;
8use crate::Error;
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::path::PathBuf;
13use std::sync::Arc;
14use tokio::fs::{File, OpenOptions};
15use tokio::io::{AsyncWriteExt, BufWriter};
16use tokio::sync::RwLock;
17use tracing::{debug, error, warn};
18
/// Protocol identifiers for SIEM destinations.
///
/// Serialized in lowercase (for example `"syslog"` or `"cloudwatch"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
pub enum SiemProtocol {
    /// Syslog destination.
    Syslog,
    /// Plain HTTP destination.
    Http,
    /// HTTPS destination.
    Https,
    /// Local file destination.
    File,
    /// Splunk destination.
    Splunk,
    /// Datadog destination.
    Datadog,
    /// AWS CloudWatch destination.
    Cloudwatch,
    /// Google Cloud destination.
    Gcp,
    /// Azure destination.
    Azure,
}
43
/// Syslog facility codes.
///
/// The numeric discriminant is what `SyslogTransport::format_syslog_message`
/// multiplies by 8 and adds to the severity to build the priority value.
/// Serialized in lowercase; the default is [`SyslogFacility::Local0`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "lowercase")]
#[derive(Default)]
pub enum SyslogFacility {
    /// Facility code 0.
    Kernel = 0,
    /// Facility code 1.
    User = 1,
    /// Facility code 2.
    Mail = 2,
    /// Facility code 3.
    Daemon = 3,
    /// Facility code 4.
    Security = 4,
    /// Facility code 5.
    Syslogd = 5,
    /// Facility code 6.
    LinePrinter = 6,
    /// Facility code 7.
    NetworkNews = 7,
    /// Facility code 8.
    Uucp = 8,
    /// Facility code 9.
    Clock = 9,
    /// Facility code 10.
    Security2 = 10,
    /// Facility code 11.
    Ftp = 11,
    /// Facility code 12.
    Ntp = 12,
    /// Facility code 13.
    LogAudit = 13,
    /// Facility code 14.
    LogAlert = 14,
    // NOTE(review): code 15 has no variant here — confirm that is intentional.
    /// Facility code 16 (the default facility).
    #[default]
    Local0 = 16,
    /// Facility code 17.
    Local1 = 17,
    /// Facility code 18.
    Local2 = 18,
    /// Facility code 19.
    Local3 = 19,
    /// Facility code 20.
    Local4 = 20,
    /// Facility code 21.
    Local5 = 21,
    /// Facility code 22.
    Local6 = 22,
    /// Facility code 23.
    Local7 = 23,
}
98
/// Syslog severity levels; a lower number means a more severe message.
///
/// The discriminant is added to `facility * 8` when the syslog priority is
/// computed in `SyslogTransport::format_syslog_message`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyslogSeverity {
    /// Severity 0.
    Emergency = 0,
    /// Severity 1.
    Alert = 1,
    /// Severity 2.
    Critical = 2,
    /// Severity 3.
    Error = 3,
    /// Severity 4.
    Warning = 4,
    /// Severity 5.
    Notice = 5,
    /// Severity 6.
    Informational = 6,
    /// Severity 7.
    Debug = 7,
}
119
120impl From<crate::security::events::SecurityEventSeverity> for SyslogSeverity {
121 fn from(severity: crate::security::events::SecurityEventSeverity) -> Self {
122 match severity {
123 crate::security::events::SecurityEventSeverity::Low => SyslogSeverity::Informational,
124 crate::security::events::SecurityEventSeverity::Medium => SyslogSeverity::Warning,
125 crate::security::events::SecurityEventSeverity::High => SyslogSeverity::Error,
126 crate::security::events::SecurityEventSeverity::Critical => SyslogSeverity::Critical,
127 }
128 }
129}
130
/// Retry policy used by the HTTP-based transports.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct RetryConfig {
    /// Upper bound of the attempt counter. The transports in this file loop
    /// over `0..=max_attempts`, so up to `max_attempts + 1` requests are sent.
    pub max_attempts: u32,
    /// Backoff strategy. `"exponential"` doubles the delay on every attempt;
    /// any other value makes the delay grow linearly with the attempt number.
    #[serde(default = "default_backoff")]
    pub backoff: String,
    /// Base delay, in seconds, between attempts.
    #[serde(default = "default_initial_delay")]
    pub initial_delay_secs: u64,
}
144
/// Serde default for `RetryConfig::backoff`.
fn default_backoff() -> String {
    String::from("exponential")
}
148
/// Serde default for `RetryConfig::initial_delay_secs` (one second).
fn default_initial_delay() -> u64 {
    1_u64
}
152
153impl Default for RetryConfig {
154 fn default() -> Self {
155 Self {
156 max_attempts: 3,
157 backoff: "exponential".to_string(),
158 initial_delay_secs: 1,
159 }
160 }
161}
162
/// File rotation settings for the file destination.
///
/// NOTE(review): `SiemEmitter::from_config` discards the `rotation` field of
/// the file destination, so nothing visible in this file applies these settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct FileRotationConfig {
    /// Maximum file size, as a string. The accepted format is not defined in
    /// this file — TODO confirm (presumably something like `"100MB"`).
    pub max_size: String,
    /// Maximum number of files to keep.
    pub max_files: u32,
    /// Whether rotated files should be compressed; defaults to `false`.
    #[serde(default)]
    pub compress: bool,
}
175
/// Filter that decides which security events are forwarded to SIEM transports.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct EventFilter {
    /// Event-type patterns to include. When set, an event must match at least
    /// one pattern. A pattern ending in `.*` is a prefix match; anything else
    /// must equal the event type exactly.
    pub include: Option<Vec<String>>,
    /// Patterns to exclude. Entries of the form `severity:<level>` (`low`,
    /// `medium`, `high`, `critical`) exclude by severity; other entries are
    /// event-type patterns with the same syntax as `include`.
    pub exclude: Option<Vec<String>>,
    /// Additional conditions. `should_include` does not read this field.
    pub conditions: Option<Vec<String>>,
}
187
188impl EventFilter {
189 pub fn should_include(&self, event: &SecurityEvent) -> bool {
191 if let Some(ref includes) = self.include {
193 let mut matched = false;
194 for pattern in includes {
195 if self.matches_pattern(&event.event_type, pattern) {
196 matched = true;
197 break;
198 }
199 }
200 if !matched {
201 return false;
202 }
203 }
204
205 if let Some(ref excludes) = self.exclude {
207 for pattern in excludes {
208 if pattern.starts_with("severity:") {
209 let severity_str = pattern.strip_prefix("severity:").unwrap_or("");
210 if severity_str == "low"
211 && event.severity == crate::security::events::SecurityEventSeverity::Low
212 {
213 return false;
214 }
215 if severity_str == "medium"
216 && event.severity == crate::security::events::SecurityEventSeverity::Medium
217 {
218 return false;
219 }
220 if severity_str == "high"
221 && event.severity == crate::security::events::SecurityEventSeverity::High
222 {
223 return false;
224 }
225 if severity_str == "critical"
226 && event.severity
227 == crate::security::events::SecurityEventSeverity::Critical
228 {
229 return false;
230 }
231 } else if self.matches_pattern(&event.event_type, pattern) {
232 return false;
233 }
234 }
235 }
236
237 true
238 }
239
240 fn matches_pattern(&self, event_type: &str, pattern: &str) -> bool {
241 if pattern.ends_with(".*") {
243 let prefix = pattern.strip_suffix(".*").unwrap_or("");
244 event_type.starts_with(prefix)
245 } else {
246 event_type == pattern
247 }
248 }
249}
250
/// A configured SIEM destination.
///
/// Serialized as an internally tagged enum: the `protocol` field selects the
/// variant (`"syslog"`, `"http"`, `"https"`, `"file"`, `"splunk"`, `"datadog"`,
/// `"cloudwatch"`, `"gcp"`, `"azure"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(tag = "protocol")]
pub enum SiemDestination {
    /// Syslog server destination.
    #[serde(rename = "syslog")]
    Syslog {
        /// Syslog server host.
        host: String,
        /// Syslog server port.
        port: u16,
        /// Transport name; `"tcp"` selects TCP in `SyslogTransport::new`,
        /// any other value results in UDP. Defaults to `"udp"`.
        #[serde(default = "default_syslog_protocol", rename = "transport")]
        transport: String,
        /// Syslog facility; defaults to `SyslogFacility::Local0`.
        #[serde(default)]
        facility: SyslogFacility,
        /// Tag used as the application name in messages; defaults to `"mockforge"`.
        #[serde(default = "default_tag")]
        tag: String,
    },
    /// HTTP webhook destination.
    #[serde(rename = "http")]
    Http {
        /// Endpoint URL.
        url: String,
        /// HTTP method; `HttpTransport` accepts `POST`, `PUT` and `PATCH`.
        /// Defaults to `"POST"`.
        #[serde(default = "default_http_method")]
        method: String,
        /// Extra request headers.
        #[serde(default)]
        headers: HashMap<String, String>,
        /// Request timeout in seconds; defaults to 5.
        #[serde(default = "default_timeout")]
        timeout: u64,
        /// Retry policy.
        #[serde(default)]
        retry: RetryConfig,
    },
    /// HTTPS webhook destination; handled by the same `HttpTransport` as `Http`.
    #[serde(rename = "https")]
    Https {
        /// Endpoint URL.
        url: String,
        /// HTTP method; `HttpTransport` accepts `POST`, `PUT` and `PATCH`.
        /// Defaults to `"POST"`.
        #[serde(default = "default_http_method")]
        method: String,
        /// Extra request headers.
        #[serde(default)]
        headers: HashMap<String, String>,
        /// Request timeout in seconds; defaults to 5.
        #[serde(default = "default_timeout")]
        timeout: u64,
        /// Retry policy.
        #[serde(default)]
        retry: RetryConfig,
    },
    /// Local file destination.
    #[serde(rename = "file")]
    File {
        /// Path of the output file.
        path: String,
        /// Output format; defaults to `"jsonl"`.
        #[serde(default = "default_file_format")]
        format: String,
        /// Rotation settings. `SiemEmitter::from_config` currently ignores them.
        rotation: Option<FileRotationConfig>,
    },
    /// Splunk HTTP Event Collector destination.
    #[serde(rename = "splunk")]
    Splunk {
        /// Base URL; `/services/collector/event` is appended when sending.
        url: String,
        /// Token sent as `Authorization: Splunk <token>`.
        token: String,
        /// Optional index name.
        index: Option<String>,
        /// Optional source type; `"mockforge:security"` is used when absent.
        source_type: Option<String>,
    },
    /// Datadog events API destination.
    #[serde(rename = "datadog")]
    Datadog {
        /// API key sent in the `DD-API-KEY` header.
        api_key: String,
        /// Optional application key sent in the `DD-APPLICATION-KEY` header.
        app_key: Option<String>,
        /// Datadog site; defaults to `"datadoghq.com"`.
        #[serde(default = "default_datadog_site")]
        site: String,
        /// Tags attached to every event.
        #[serde(default)]
        tags: Vec<String>,
    },
    /// AWS CloudWatch Logs destination.
    #[serde(rename = "cloudwatch")]
    Cloudwatch {
        /// AWS region.
        region: String,
        /// Log group name.
        log_group: String,
        /// Log stream name.
        stream: String,
        /// Credentials as key/value pairs; the expected keys are not defined
        /// in this file — TODO confirm.
        credentials: HashMap<String, String>,
    },
    /// Google Cloud Logging destination.
    #[serde(rename = "gcp")]
    Gcp {
        /// GCP project id.
        project_id: String,
        /// Log name.
        log_name: String,
        /// Path to a credentials file.
        credentials_path: String,
    },
    /// Azure Monitor destination.
    #[serde(rename = "azure")]
    Azure {
        /// Workspace id; used as the host prefix and in the `Authorization` header.
        workspace_id: String,
        /// Shared key; base64-decoded before it is used as the HMAC key.
        shared_key: String,
        /// Value of the `Log-Type` header.
        log_type: String,
    },
}
379
/// Serde default for the syslog `transport` field.
fn default_syslog_protocol() -> String {
    String::from("udp")
}
383
/// Serde default for the syslog `tag` field.
fn default_tag() -> String {
    String::from("mockforge")
}
387
/// Serde default for the HTTP/HTTPS `method` field.
fn default_http_method() -> String {
    String::from("POST")
}
391
/// Serde default for the HTTP/HTTPS `timeout` field, in seconds.
fn default_timeout() -> u64 {
    5_u64
}
395
/// Serde default for the file destination `format` field.
fn default_file_format() -> String {
    String::from("jsonl")
}
399
/// Serde default for the Datadog `site` field.
fn default_datadog_site() -> String {
    String::from("datadoghq.com")
}
403
/// Top-level SIEM integration configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[derive(Default)]
pub struct SiemConfig {
    /// Whether SIEM emission is enabled. When `false`,
    /// `SiemEmitter::from_config` creates an emitter with no transports.
    pub enabled: bool,
    /// Optional protocol. `SiemEmitter::from_config` does not read this field;
    /// each destination carries its own `protocol` tag.
    pub protocol: Option<SiemProtocol>,
    /// Destinations to send events to; one transport is built per entry.
    pub destinations: Vec<SiemDestination>,
    /// Optional filter applied to every event before it is sent.
    pub filters: Option<EventFilter>,
}
418
/// A destination that security events can be delivered to.
///
/// Implementations must be `Send + Sync` so they can be stored as
/// `Box<dyn SiemTransport>` inside `SiemEmitter`.
#[async_trait]
pub trait SiemTransport: Send + Sync {
    /// Delivers a single event to the destination.
    ///
    /// # Errors
    /// Returns an error when the event could not be delivered.
    async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error>;
}
425
/// Transport that sends events to a syslog server over UDP or TCP.
pub struct SyslogTransport {
    /// Syslog server host.
    host: String,
    /// Syslog server port.
    port: u16,
    /// `true` to send over TCP, `false` to send over UDP.
    use_tcp: bool,
    /// Facility used when computing the message priority.
    facility: SyslogFacility,
    /// Tag written into the application-name position of each message.
    tag: String,
}
434
435impl SyslogTransport {
436 pub fn new(
445 host: String,
446 port: u16,
447 protocol: String,
448 facility: SyslogFacility,
449 tag: String,
450 ) -> Self {
451 Self {
452 host,
453 port,
454 use_tcp: protocol == "tcp",
455 facility,
456 tag,
457 }
458 }
459
460 fn format_syslog_message(&self, event: &SecurityEvent) -> String {
462 let severity: SyslogSeverity = event.severity.into();
463 let priority = (self.facility as u8) * 8 + severity as u8;
464 let timestamp = event.timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ");
465 let hostname = "mockforge"; let app_name = &self.tag;
467 let proc_id = "-";
468 let msg_id = "-";
469 let structured_data = "-"; let msg = event.to_json().unwrap_or_else(|_| "{}".to_string());
471
472 format!(
473 "<{}>1 {} {} {} {} {} {} {}",
474 priority, timestamp, hostname, app_name, proc_id, msg_id, structured_data, msg
475 )
476 }
477}
478
479#[async_trait]
480impl SiemTransport for SyslogTransport {
481 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
482 let message = self.format_syslog_message(event);
483
484 if self.use_tcp {
485 use tokio::net::TcpStream;
487 let addr = format!("{}:{}", self.host, self.port);
488 let mut stream = TcpStream::connect(&addr).await.map_err(|e| {
489 Error::Generic(format!("Failed to connect to syslog server: {}", e))
490 })?;
491 stream
492 .write_all(message.as_bytes())
493 .await
494 .map_err(|e| Error::Generic(format!("Failed to send syslog message: {}", e)))?;
495 } else {
496 use tokio::net::UdpSocket;
498 let socket = UdpSocket::bind("0.0.0.0:0")
499 .await
500 .map_err(|e| Error::Generic(format!("Failed to bind UDP socket: {}", e)))?;
501 let addr = format!("{}:{}", self.host, self.port);
502 socket
503 .send_to(message.as_bytes(), &addr)
504 .await
505 .map_err(|e| Error::Generic(format!("Failed to send UDP syslog message: {}", e)))?;
506 }
507
508 debug!("Sent syslog event: {}", event.event_type);
509 Ok(())
510 }
511}
512
/// Transport that delivers events to an HTTP(S) endpoint as JSON.
pub struct HttpTransport {
    /// Endpoint URL.
    url: String,
    /// HTTP method; `POST`, `PUT` and `PATCH` are supported by `send_event`.
    method: String,
    /// Extra headers added to every request.
    headers: HashMap<String, String>,
    /// Request timeout in seconds. Only used to configure `client` in `new`.
    #[allow(dead_code)]
    timeout: u64,
    /// Retry policy applied by `send_event`.
    retry: RetryConfig,
    /// HTTP client built with the configured timeout.
    client: reqwest::Client,
}
523
524impl HttpTransport {
525 pub fn new(
534 url: String,
535 method: String,
536 headers: HashMap<String, String>,
537 timeout: u64,
538 retry: RetryConfig,
539 ) -> Self {
540 let client = reqwest::Client::builder()
541 .timeout(std::time::Duration::from_secs(timeout))
542 .build()
543 .expect("Failed to create HTTP client");
544
545 Self {
546 url,
547 method,
548 headers,
549 timeout,
550 retry,
551 client,
552 }
553 }
554}
555
556#[async_trait]
557impl SiemTransport for HttpTransport {
558 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
559 let event_json = event.to_json()?;
560 let mut request = match self.method.as_str() {
561 "POST" => self.client.post(&self.url),
562 "PUT" => self.client.put(&self.url),
563 "PATCH" => self.client.patch(&self.url),
564 _ => return Err(Error::Generic(format!("Unsupported HTTP method: {}", self.method))),
565 };
566
567 for (key, value) in &self.headers {
569 request = request.header(key, value);
570 }
571
572 if !self.headers.contains_key("Content-Type") {
574 request = request.header("Content-Type", "application/json");
575 }
576
577 request = request.body(event_json);
578
579 let mut last_error = None;
581 for attempt in 0..=self.retry.max_attempts {
582 match request.try_clone() {
583 Some(req) => match req.send().await {
584 Ok(response) => {
585 if response.status().is_success() {
586 debug!("Sent HTTP event to {}: {}", self.url, event.event_type);
587 return Ok(());
588 } else {
589 let status = response.status();
590 last_error = Some(Error::Generic(format!("HTTP error: {}", status)));
591 }
592 }
593 Err(e) => {
594 last_error = Some(Error::Generic(format!("HTTP request failed: {}", e)));
595 }
596 },
597 None => {
598 let event_json = event.to_json()?;
600 let mut req = match self.method.as_str() {
601 "POST" => self.client.post(&self.url),
602 "PUT" => self.client.put(&self.url),
603 "PATCH" => self.client.patch(&self.url),
604 _ => break,
605 };
606 for (key, value) in &self.headers {
607 req = req.header(key, value);
608 }
609 if !self.headers.contains_key("Content-Type") {
610 req = req.header("Content-Type", "application/json");
611 }
612 req = req.body(event_json);
613 request = req;
614 continue;
615 }
616 }
617
618 if attempt < self.retry.max_attempts {
619 let delay = if self.retry.backoff == "exponential" {
621 self.retry.initial_delay_secs * (2_u64.pow(attempt))
622 } else {
623 self.retry.initial_delay_secs * (attempt as u64 + 1)
624 };
625 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
626 }
627 }
628
629 Err(last_error.unwrap_or_else(|| {
630 Error::Generic("Failed to send HTTP event after retries".to_string())
631 }))
632 }
633}
634
/// Transport that appends events to a local file.
pub struct FileTransport {
    /// Path of the output file.
    path: PathBuf,
    /// Requested output format (for example `"jsonl"`).
    format: String,
    /// Buffered writer over the opened file, shared behind an async lock.
    /// `send_event` reports an error when the option is `None`.
    writer: Arc<RwLock<Option<BufWriter<File>>>>,
}
641
642impl FileTransport {
643 pub async fn new(path: String, format: String) -> Result<Self, Error> {
652 let path = PathBuf::from(path);
653
654 if let Some(parent) = path.parent() {
656 tokio::fs::create_dir_all(parent)
657 .await
658 .map_err(|e| Error::Generic(format!("Failed to create directory: {}", e)))?;
659 }
660
661 let file = OpenOptions::new()
663 .create(true)
664 .append(true)
665 .open(&path)
666 .await
667 .map_err(|e| Error::Generic(format!("Failed to open file: {}", e)))?;
668
669 let writer = Arc::new(RwLock::new(Some(BufWriter::new(file))));
670
671 Ok(Self {
672 path,
673 format,
674 writer,
675 })
676 }
677}
678
679#[async_trait]
680impl SiemTransport for FileTransport {
681 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
682 let mut writer_guard = self.writer.write().await;
683
684 if let Some(ref mut writer) = *writer_guard {
685 let line = if self.format == "jsonl" {
686 format!("{}\n", event.to_json()?)
687 } else {
688 format!("{}\n", event.to_json()?)
690 };
691
692 writer
693 .write_all(line.as_bytes())
694 .await
695 .map_err(|e| Error::Generic(format!("Failed to write to file: {}", e)))?;
696
697 writer
698 .flush()
699 .await
700 .map_err(|e| Error::Generic(format!("Failed to flush file: {}", e)))?;
701
702 debug!("Wrote event to file {}: {}", self.path.display(), event.event_type);
703 Ok(())
704 } else {
705 Err(Error::Generic("File writer not initialized".to_string()))
706 }
707 }
708}
709
/// Transport that posts events to a Splunk HTTP Event Collector endpoint.
pub struct SplunkTransport {
    /// Base URL; `/services/collector/event` is appended in `send_event`.
    url: String,
    /// Token sent as `Authorization: Splunk <token>`.
    token: String,
    /// Optional index written into each event payload.
    index: Option<String>,
    /// Optional source type; `"mockforge:security"` is used when `None`.
    source_type: Option<String>,
    /// Retry policy applied by `send_event`.
    retry: RetryConfig,
    /// HTTP client (built with a 10 second timeout in `new`).
    client: reqwest::Client,
}
719
720impl SplunkTransport {
721 pub fn new(
723 url: String,
724 token: String,
725 index: Option<String>,
726 source_type: Option<String>,
727 retry: RetryConfig,
728 ) -> Self {
729 let client = reqwest::Client::builder()
730 .timeout(std::time::Duration::from_secs(10))
731 .build()
732 .expect("Failed to create HTTP client");
733
734 Self {
735 url,
736 token,
737 index,
738 source_type,
739 retry,
740 client,
741 }
742 }
743
744 fn format_event(&self, event: &SecurityEvent) -> Result<serde_json::Value, Error> {
746 let mut splunk_event = serde_json::json!({
747 "event": event.to_json()?,
748 "time": event.timestamp.timestamp(),
749 });
750
751 if let Some(ref index) = self.index {
752 splunk_event["index"] = serde_json::Value::String(index.clone());
753 }
754
755 if let Some(ref st) = self.source_type {
756 splunk_event["sourcetype"] = serde_json::Value::String(st.clone());
757 } else {
758 splunk_event["sourcetype"] =
759 serde_json::Value::String("mockforge:security".to_string());
760 }
761
762 Ok(splunk_event)
763 }
764}
765
766#[async_trait]
767impl SiemTransport for SplunkTransport {
768 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
769 let splunk_event = self.format_event(event)?;
770 let url = format!("{}/services/collector/event", self.url.trim_end_matches('/'));
771
772 let mut last_error = None;
773 for attempt in 0..=self.retry.max_attempts {
774 match self
775 .client
776 .post(&url)
777 .header("Authorization", format!("Splunk {}", self.token))
778 .header("Content-Type", "application/json")
779 .json(&splunk_event)
780 .send()
781 .await
782 {
783 Ok(response) => {
784 if response.status().is_success() {
785 debug!("Sent Splunk event: {}", event.event_type);
786 return Ok(());
787 } else {
788 let status = response.status();
789 let body = response.text().await.unwrap_or_default();
790 last_error =
791 Some(Error::Generic(format!("Splunk HTTP error {}: {}", status, body)));
792 }
793 }
794 Err(e) => {
795 last_error = Some(Error::Generic(format!("Splunk request failed: {}", e)));
796 }
797 }
798
799 if attempt < self.retry.max_attempts {
800 let delay = if self.retry.backoff == "exponential" {
801 self.retry.initial_delay_secs * (2_u64.pow(attempt))
802 } else {
803 self.retry.initial_delay_secs * (attempt as u64 + 1)
804 };
805 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
806 }
807 }
808
809 Err(last_error.unwrap_or_else(|| {
810 Error::Generic("Failed to send Splunk event after retries".to_string())
811 }))
812 }
813}
814
/// Transport that posts events to the Datadog events API.
pub struct DatadogTransport {
    /// API key sent in the `DD-API-KEY` header.
    api_key: String,
    /// Optional application key sent in the `DD-APPLICATION-KEY` header.
    app_key: Option<String>,
    /// Datadog site; requests go to `https://api.<site>/api/v1/events`.
    site: String,
    /// Tags attached to every event, in addition to the generated ones.
    tags: Vec<String>,
    /// Retry policy applied by `send_event`.
    retry: RetryConfig,
    /// HTTP client (built with a 10 second timeout in `new`).
    client: reqwest::Client,
}
824
825impl DatadogTransport {
826 pub fn new(
828 api_key: String,
829 app_key: Option<String>,
830 site: String,
831 tags: Vec<String>,
832 retry: RetryConfig,
833 ) -> Self {
834 let client = reqwest::Client::builder()
835 .timeout(std::time::Duration::from_secs(10))
836 .build()
837 .expect("Failed to create HTTP client");
838
839 Self {
840 api_key,
841 app_key,
842 site,
843 tags,
844 retry,
845 client,
846 }
847 }
848
849 fn format_event(&self, event: &SecurityEvent) -> Result<serde_json::Value, Error> {
851 let mut tags = self.tags.clone();
852 tags.push(format!("event_type:{}", event.event_type));
853 tags.push(format!("severity:{}", format!("{:?}", event.severity).to_lowercase()));
854
855 let datadog_event = serde_json::json!({
856 "title": format!("MockForge Security Event: {}", event.event_type),
857 "text": event.to_json()?,
858 "alert_type": match event.severity {
859 crate::security::events::SecurityEventSeverity::Critical => "error",
860 crate::security::events::SecurityEventSeverity::High => "warning",
861 crate::security::events::SecurityEventSeverity::Medium => "info",
862 crate::security::events::SecurityEventSeverity::Low => "info",
863 },
864 "tags": tags,
865 "date_happened": event.timestamp.timestamp(),
866 });
867
868 Ok(datadog_event)
869 }
870}
871
872#[async_trait]
873impl SiemTransport for DatadogTransport {
874 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
875 let datadog_event = self.format_event(event)?;
876 let url = format!("https://api.{}/api/v1/events", self.site);
877
878 let mut last_error = None;
879 for attempt in 0..=self.retry.max_attempts {
880 let mut request =
881 self.client.post(&url).header("DD-API-KEY", &self.api_key).json(&datadog_event);
882
883 if let Some(ref app_key) = self.app_key {
884 request = request.header("DD-APPLICATION-KEY", app_key);
885 }
886
887 match request.send().await {
888 Ok(response) => {
889 if response.status().is_success() {
890 debug!("Sent Datadog event: {}", event.event_type);
891 return Ok(());
892 } else {
893 let status = response.status();
894 let body = response.text().await.unwrap_or_default();
895 last_error = Some(Error::Generic(format!(
896 "Datadog HTTP error {}: {}",
897 status, body
898 )));
899 }
900 }
901 Err(e) => {
902 last_error = Some(Error::Generic(format!("Datadog request failed: {}", e)));
903 }
904 }
905
906 if attempt < self.retry.max_attempts {
907 let delay = if self.retry.backoff == "exponential" {
908 self.retry.initial_delay_secs * (2_u64.pow(attempt))
909 } else {
910 self.retry.initial_delay_secs * (attempt as u64 + 1)
911 };
912 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
913 }
914 }
915
916 Err(last_error.unwrap_or_else(|| {
917 Error::Generic("Failed to send Datadog event after retries".to_string())
918 }))
919 }
920}
921
/// Transport for AWS CloudWatch Logs.
///
/// NOTE(review): `send_event` currently only prepares and logs the payload;
/// the fields marked `dead_code` are stored but never read.
pub struct CloudwatchTransport {
    /// AWS region (unused by the current implementation).
    #[allow(dead_code)]
    region: String,
    /// Log group name; appears in the debug log line.
    log_group: String,
    /// Log stream name; appears in the debug log line.
    stream: String,
    /// Credentials key/value pairs (unused by the current implementation).
    #[allow(dead_code)]
    credentials: HashMap<String, String>,
    /// Retry policy (unused by the current implementation).
    #[allow(dead_code)]
    retry: RetryConfig,
    /// HTTP client (unused by the current implementation).
    #[allow(dead_code)]
    client: reqwest::Client,
}
935
936impl CloudwatchTransport {
937 pub fn new(
939 region: String,
940 log_group: String,
941 stream: String,
942 credentials: HashMap<String, String>,
943 retry: RetryConfig,
944 ) -> Self {
945 let client = reqwest::Client::builder()
946 .timeout(std::time::Duration::from_secs(10))
947 .build()
948 .expect("Failed to create HTTP client");
949
950 Self {
951 region,
952 log_group,
953 stream,
954 credentials,
955 retry,
956 client,
957 }
958 }
959}
960
961#[async_trait]
962impl SiemTransport for CloudwatchTransport {
963 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
964 warn!(
968 "CloudWatch transport requires AWS SDK for proper implementation. \
969 Using HTTP API fallback (may require additional AWS configuration)"
970 );
971
972 let event_json = event.to_json()?;
973 let _log_events = serde_json::json!([{
974 "timestamp": event.timestamp.timestamp_millis(),
975 "message": event_json
976 }]);
977
978 debug!(
981 "CloudWatch event prepared for log_group={}, stream={}: {}",
982 self.log_group, self.stream, event.event_type
983 );
984
985 Ok(())
987 }
988}
989
/// Transport for Google Cloud Logging.
///
/// NOTE(review): `send_event` currently only prepares and logs the payload;
/// the fields marked `dead_code` are stored but never read.
pub struct GcpTransport {
    /// GCP project id; used to build the `logName` of the prepared entry.
    project_id: String,
    /// Log name; used to build the `logName` of the prepared entry.
    log_name: String,
    /// Path to a credentials file (unused by the current implementation).
    #[allow(dead_code)]
    credentials_path: String,
    /// Retry policy (unused by the current implementation).
    #[allow(dead_code)]
    retry: RetryConfig,
    /// HTTP client (unused by the current implementation).
    #[allow(dead_code)]
    client: reqwest::Client,
}
1001
1002impl GcpTransport {
1003 pub fn new(
1005 project_id: String,
1006 log_name: String,
1007 credentials_path: String,
1008 retry: RetryConfig,
1009 ) -> Self {
1010 let client = reqwest::Client::builder()
1011 .timeout(std::time::Duration::from_secs(10))
1012 .build()
1013 .expect("Failed to create HTTP client");
1014
1015 Self {
1016 project_id,
1017 log_name,
1018 credentials_path,
1019 retry,
1020 client,
1021 }
1022 }
1023}
1024
1025#[async_trait]
1026impl SiemTransport for GcpTransport {
1027 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
1028 warn!(
1032 "GCP transport requires google-cloud-logging for proper implementation. \
1033 Using HTTP API fallback (may require additional GCP configuration)"
1034 );
1035
1036 let event_json = event.to_json()?;
1037 let _log_entry = serde_json::json!({
1038 "logName": format!("projects/{}/logs/{}", self.project_id, self.log_name),
1039 "resource": {
1040 "type": "global"
1041 },
1042 "timestamp": event.timestamp.to_rfc3339(),
1043 "jsonPayload": serde_json::from_str::<serde_json::Value>(&event_json)
1044 .unwrap_or_else(|_| serde_json::json!({"message": event_json}))
1045 });
1046
1047 debug!(
1050 "GCP event prepared for project={}, log={}: {}",
1051 self.project_id, self.log_name, event.event_type
1052 );
1053
1054 Ok(())
1056 }
1057}
1058
/// Transport that posts events to an Azure Monitor (Log Analytics) workspace.
pub struct AzureTransport {
    /// Workspace id; used as the host prefix and in the `Authorization` header.
    workspace_id: String,
    /// Shared key; base64-decoded and used as the HMAC-SHA256 signing key.
    shared_key: String,
    /// Value of the `Log-Type` request header.
    log_type: String,
    /// Retry policy applied by `send_event`.
    retry: RetryConfig,
    /// HTTP client (built with a 10 second timeout in `new`).
    client: reqwest::Client,
}
1067
1068impl AzureTransport {
1069 pub fn new(
1071 workspace_id: String,
1072 shared_key: String,
1073 log_type: String,
1074 retry: RetryConfig,
1075 ) -> Self {
1076 let client = reqwest::Client::builder()
1077 .timeout(std::time::Duration::from_secs(10))
1078 .build()
1079 .expect("Failed to create HTTP client");
1080
1081 Self {
1082 workspace_id,
1083 shared_key,
1084 log_type,
1085 retry,
1086 client,
1087 }
1088 }
1089
1090 fn generate_signature(
1092 &self,
1093 date: &str,
1094 content_length: usize,
1095 method: &str,
1096 content_type: &str,
1097 resource: &str,
1098 ) -> String {
1099 use hmac::{Hmac, Mac};
1100 use sha2::Sha256;
1101
1102 type HmacSha256 = Hmac<Sha256>;
1103
1104 let string_to_sign =
1105 format!("{}\n{}\n{}\n{}\n{}", method, content_length, content_type, date, resource);
1106
1107 let mut mac = HmacSha256::new_from_slice(
1108 base64::decode(&self.shared_key).unwrap_or_default().as_slice(),
1109 )
1110 .expect("HMAC can take key of any size");
1111
1112 mac.update(string_to_sign.as_bytes());
1113 let result = mac.finalize();
1114 base64::encode(result.into_bytes())
1115 }
1116}
1117
1118#[async_trait]
1119impl SiemTransport for AzureTransport {
1120 async fn send_event(&self, event: &SecurityEvent) -> Result<(), Error> {
1121 let event_json = event.to_json()?;
1122 let url = format!(
1123 "https://{}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01",
1124 self.workspace_id
1125 );
1126
1127 let date = chrono::Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string();
1128 let content_type = "application/json";
1129 let content_length = event_json.len();
1130 let method = "POST";
1131 let resource = "/api/logs?api-version=2016-04-01".to_string();
1132
1133 let signature =
1134 self.generate_signature(&date, content_length, method, content_type, &resource);
1135
1136 let mut last_error = None;
1137 for attempt in 0..=self.retry.max_attempts {
1138 let log_entry = serde_json::json!({
1139 "log_type": self.log_type,
1140 "time_generated": event.timestamp.to_rfc3339(),
1141 "data": serde_json::from_str::<serde_json::Value>(&event_json)
1142 .unwrap_or_else(|_| serde_json::json!({"message": event_json}))
1143 });
1144
1145 match self
1146 .client
1147 .post(&url)
1148 .header("x-ms-date", &date)
1149 .header("Content-Type", content_type)
1150 .header("Authorization", format!("SharedKey {}:{}", self.workspace_id, signature))
1151 .header("Log-Type", &self.log_type)
1152 .header("time-generated-field", "time_generated")
1153 .body(serde_json::to_string(&log_entry)?)
1154 .send()
1155 .await
1156 {
1157 Ok(response) => {
1158 if response.status().is_success() {
1159 debug!("Sent Azure Monitor event: {}", event.event_type);
1160 return Ok(());
1161 } else {
1162 let status = response.status();
1163 let body = response.text().await.unwrap_or_default();
1164 last_error = Some(Error::Generic(format!(
1165 "Azure Monitor HTTP error {}: {}",
1166 status, body
1167 )));
1168 }
1169 }
1170 Err(e) => {
1171 last_error =
1172 Some(Error::Generic(format!("Azure Monitor request failed: {}", e)));
1173 }
1174 }
1175
1176 if attempt < self.retry.max_attempts {
1177 let delay = if self.retry.backoff == "exponential" {
1178 self.retry.initial_delay_secs * (2_u64.pow(attempt))
1179 } else {
1180 self.retry.initial_delay_secs * (attempt as u64 + 1)
1181 };
1182 tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
1183 }
1184 }
1185
1186 Err(last_error.unwrap_or_else(|| {
1187 Error::Generic("Failed to send Azure Monitor event after retries".to_string())
1188 }))
1189 }
1190}
1191
/// Health record for one transport, maintained by `SiemEmitter::emit`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportHealth {
    /// Identifier of the transport (`transport_<index>` as assigned in
    /// `SiemEmitter::from_config`).
    pub identifier: String,
    /// `true` after a successful send, `false` after a failed one; starts as `true`.
    pub healthy: bool,
    /// Time of the most recent successful send, if any.
    pub last_success: Option<chrono::DateTime<chrono::Utc>>,
    /// Message of the most recent failure; cleared on the next success.
    pub last_error: Option<String>,
    /// Number of successful sends.
    pub success_count: u64,
    /// Number of failed sends.
    pub failure_count: u64,
}
1208
/// Fans security events out to every configured SIEM transport and tracks
/// per-transport health.
pub struct SiemEmitter {
    /// Transports, in the order of the configured destinations.
    transports: Vec<Box<dyn SiemTransport>>,
    /// Optional filter applied before an event is sent anywhere.
    filters: Option<EventFilter>,
    /// Health records, index-aligned with `transports`.
    health_status: Arc<RwLock<Vec<TransportHealth>>>,
}
1216
1217impl SiemEmitter {
1218 pub async fn from_config(config: SiemConfig) -> Result<Self, Error> {
1220 if !config.enabled {
1221 return Ok(Self {
1222 transports: Vec::new(),
1223 filters: config.filters,
1224 health_status: Arc::new(RwLock::new(Vec::new())),
1225 });
1226 }
1227
1228 let mut transports: Vec<Box<dyn SiemTransport>> = Vec::new();
1229
1230 for dest in config.destinations {
1231 let transport: Box<dyn SiemTransport> = match dest {
1232 SiemDestination::Syslog {
1233 host,
1234 port,
1235 transport,
1236 facility,
1237 tag,
1238 } => Box::new(SyslogTransport::new(host, port, transport, facility, tag)),
1239 SiemDestination::Http {
1240 url,
1241 method,
1242 headers,
1243 timeout,
1244 retry,
1245 } => Box::new(HttpTransport::new(url, method, headers, timeout, retry)),
1246 SiemDestination::Https {
1247 url,
1248 method,
1249 headers,
1250 timeout,
1251 retry,
1252 } => Box::new(HttpTransport::new(url, method, headers, timeout, retry)),
1253 SiemDestination::File { path, format, .. } => {
1254 Box::new(FileTransport::new(path, format).await?)
1255 }
1256 SiemDestination::Splunk {
1257 url,
1258 token,
1259 index,
1260 source_type,
1261 } => Box::new(SplunkTransport::new(
1262 url,
1263 token,
1264 index,
1265 source_type,
1266 RetryConfig::default(),
1267 )),
1268 SiemDestination::Datadog {
1269 api_key,
1270 app_key,
1271 site,
1272 tags,
1273 } => Box::new(DatadogTransport::new(
1274 api_key,
1275 app_key,
1276 site,
1277 tags,
1278 RetryConfig::default(),
1279 )),
1280 SiemDestination::Cloudwatch {
1281 region,
1282 log_group,
1283 stream,
1284 credentials,
1285 } => Box::new(CloudwatchTransport::new(
1286 region,
1287 log_group,
1288 stream,
1289 credentials,
1290 RetryConfig::default(),
1291 )),
1292 SiemDestination::Gcp {
1293 project_id,
1294 log_name,
1295 credentials_path,
1296 } => Box::new(GcpTransport::new(
1297 project_id,
1298 log_name,
1299 credentials_path,
1300 RetryConfig::default(),
1301 )),
1302 SiemDestination::Azure {
1303 workspace_id,
1304 shared_key,
1305 log_type,
1306 } => Box::new(AzureTransport::new(
1307 workspace_id,
1308 shared_key,
1309 log_type,
1310 RetryConfig::default(),
1311 )),
1312 };
1313 transports.push(transport);
1314 }
1315
1316 let health_status = Arc::new(RwLock::new(
1317 transports
1318 .iter()
1319 .enumerate()
1320 .map(|(i, _)| TransportHealth {
1321 identifier: format!("transport_{}", i),
1322 healthy: true,
1323 last_success: None,
1324 last_error: None,
1325 success_count: 0,
1326 failure_count: 0,
1327 })
1328 .collect(),
1329 ));
1330
1331 Ok(Self {
1332 transports,
1333 filters: config.filters,
1334 health_status,
1335 })
1336 }
1337
1338 pub async fn emit(&self, event: SecurityEvent) -> Result<(), Error> {
1340 if let Some(ref filter) = self.filters {
1342 if !filter.should_include(&event) {
1343 debug!("Event filtered out: {}", event.event_type);
1344 return Ok(());
1345 }
1346 }
1347
1348 let mut errors = Vec::new();
1350 let mut health_status = self.health_status.write().await;
1351
1352 for (idx, transport) in self.transports.iter().enumerate() {
1353 match transport.send_event(&event).await {
1354 Ok(()) => {
1355 if let Some(health) = health_status.get_mut(idx) {
1356 health.healthy = true;
1357 health.last_success = Some(chrono::Utc::now());
1358 health.success_count += 1;
1359 health.last_error = None;
1360 }
1361 }
1362 Err(e) => {
1363 let error_msg = format!("{}", e);
1364 error!("Failed to send event to SIEM: {}", error_msg);
1365 errors.push(Error::Generic(error_msg.clone()));
1366 if let Some(health) = health_status.get_mut(idx) {
1367 health.healthy = false;
1368 health.failure_count += 1;
1369 health.last_error = Some(error_msg);
1370 }
1371 }
1372 }
1373 }
1374
1375 drop(health_status);
1376
1377 if !errors.is_empty() && errors.len() == self.transports.len() {
1378 return Err(Error::Generic(format!(
1380 "All SIEM transports failed: {} errors",
1381 errors.len()
1382 )));
1383 }
1384
1385 Ok(())
1386 }
1387
1388 pub async fn health_status(&self) -> Vec<TransportHealth> {
1390 self.health_status.read().await.clone()
1391 }
1392
1393 pub async fn is_healthy(&self) -> bool {
1395 let health_status = self.health_status.read().await;
1396 health_status.iter().any(|h| h.healthy)
1397 }
1398
1399 pub async fn health_summary(&self) -> (usize, usize, usize) {
1401 let health_status = self.health_status.read().await;
1402 let total = health_status.len();
1403 let healthy = health_status.iter().filter(|h| h.healthy).count();
1404 let unhealthy = total - healthy;
1405 (total, healthy, unhealthy)
1406 }
1407}
1408
1409#[cfg(test)]
1410mod tests {
1411 use super::*;
1412 use crate::security::events::{SecurityEvent, SecurityEventType};
1413
/// An include list containing `auth.*` must admit an auth event and reject a
/// configuration-change event.
#[test]
fn test_event_filter_include() {
    let auth_only = EventFilter {
        include: Some(vec![String::from("auth.*")]),
        exclude: None,
        conditions: None,
    };

    let auth_event = SecurityEvent::new(SecurityEventType::AuthSuccess, None, None);
    let config_event = SecurityEvent::new(SecurityEventType::ConfigChanged, None, None);

    assert!(auth_only.should_include(&auth_event));
    assert!(!auth_only.should_include(&config_event));
}
1430
/// An exclude list containing `severity:low` must drop `AuthSuccess` while
/// still admitting `AuthFailure`.
///
/// NOTE(review): this presumably relies on the two event types carrying
/// different default severities; that mapping is defined outside this test.
#[test]
fn test_event_filter_exclude() {
    let drop_low_severity = EventFilter {
        include: None,
        exclude: Some(vec![String::from("severity:low")]),
        conditions: None,
    };

    let excluded = SecurityEvent::new(SecurityEventType::AuthSuccess, None, None);
    let admitted = SecurityEvent::new(SecurityEventType::AuthFailure, None, None);

    assert!(!drop_low_severity.should_include(&excluded));
    assert!(drop_low_severity.should_include(&admitted));
}
1447
1448 #[tokio::test]
1449 async fn test_syslog_transport_format() {
1450 let transport = SyslogTransport::new(
1451 "localhost".to_string(),
1452 514,
1453 "udp".to_string(),
1454 SyslogFacility::Local0,
1455 "mockforge".to_string(),
1456 );
1457
1458 let event = SecurityEvent::new(SecurityEventType::AuthSuccess, None, None);
1459
1460 let message = transport.format_syslog_message(&event);
1461 assert!(message.starts_with("<"));
1462 assert!(message.contains("mockforge"));
1463 }
1464
/// Every `SiemProtocol` variant must serialize to its lowercase name and
/// deserialize back to the same variant.
///
/// The enum is declared with `#[serde(rename_all = "lowercase")]`, so the exact
/// JSON string is pinned here; merely checking that the output is non-empty
/// can never fail and would not catch an accidental wire-format change.
#[test]
fn test_siem_protocol_serialization() {
    let cases = [
        (SiemProtocol::Syslog, "\"syslog\""),
        (SiemProtocol::Http, "\"http\""),
        (SiemProtocol::Https, "\"https\""),
        (SiemProtocol::File, "\"file\""),
        (SiemProtocol::Splunk, "\"splunk\""),
        (SiemProtocol::Datadog, "\"datadog\""),
        (SiemProtocol::Cloudwatch, "\"cloudwatch\""),
        (SiemProtocol::Gcp, "\"gcp\""),
        (SiemProtocol::Azure, "\"azure\""),
    ];

    for (protocol, expected_json) in cases {
        let json = serde_json::to_string(&protocol).unwrap();
        assert_eq!(json, expected_json);

        // Round trip: the serialized form must parse back to the same variant.
        let deserialized: SiemProtocol = serde_json::from_str(&json).unwrap();
        assert_eq!(protocol, deserialized);
    }
}
1486
/// `Local0` is the variant tagged `#[default]`, so `Default` must produce it.
#[test]
fn test_syslog_facility_default() {
    assert_eq!(SyslogFacility::default(), SyslogFacility::Local0);
}
1492
/// A sample of `SyslogFacility` variants must serialize to their lowercase
/// names and deserialize back to the same variant.
///
/// The enum is declared with `#[serde(rename_all = "lowercase")]`, so the exact
/// JSON string is pinned here instead of only checking that the output is
/// non-empty, which can never fail.
#[test]
fn test_syslog_facility_serialization() {
    let cases = [
        (SyslogFacility::Kernel, "\"kernel\""),
        (SyslogFacility::User, "\"user\""),
        (SyslogFacility::Security, "\"security\""),
        (SyslogFacility::Local0, "\"local0\""),
        (SyslogFacility::Local7, "\"local7\""),
    ];

    for (facility, expected_json) in cases {
        let json = serde_json::to_string(&facility).unwrap();
        assert_eq!(json, expected_json);

        // Round trip: the serialized form must parse back to the same variant.
        let deserialized: SyslogFacility = serde_json::from_str(&json).unwrap();
        assert_eq!(facility, deserialized);
    }
}
1510
/// Checks the `From<SecurityEventSeverity>` mapping for every input level.
#[test]
fn test_syslog_severity_from_security_event_severity() {
    use crate::security::events::SecurityEventSeverity;

    // (input severity, syslog severity it must convert to)
    let expectations = [
        (SecurityEventSeverity::Low, SyslogSeverity::Informational),
        (SecurityEventSeverity::Medium, SyslogSeverity::Warning),
        (SecurityEventSeverity::High, SyslogSeverity::Error),
        (SecurityEventSeverity::Critical, SyslogSeverity::Critical),
    ];

    for (event_severity, syslog_severity) in expectations {
        assert_eq!(SyslogSeverity::from(event_severity), syslog_severity);
    }
}
1520
/// The default retry policy is three attempts, exponential backoff, and a
/// one-second initial delay.
#[test]
fn test_retry_config_default() {
    let RetryConfig {
        max_attempts,
        backoff,
        initial_delay_secs,
    } = RetryConfig::default();

    assert_eq!(max_attempts, 3);
    assert_eq!(backoff, "exponential");
    assert_eq!(initial_delay_secs, 1);
}
1528
/// Serialized retry configuration must expose the `max_attempts` key and the
/// configured backoff value.
#[test]
fn test_retry_config_serialization() {
    let linear_retry = RetryConfig {
        max_attempts: 5,
        backoff: String::from("linear"),
        initial_delay_secs: 2,
    };

    let serialized = serde_json::to_string(&linear_retry).unwrap();

    assert!(serialized.contains("max_attempts"));
    assert!(serialized.contains("linear"));
}
1541
/// Serialized rotation settings must contain the size string and the
/// `max_files` key.
#[test]
fn test_file_rotation_config_serialization() {
    let rotation = FileRotationConfig {
        max_size: String::from("100MB"),
        max_files: 10,
        compress: true,
    };

    let serialized = serde_json::to_string(&rotation).unwrap();

    assert!(serialized.contains("100MB"));
    assert!(serialized.contains("max_files"));
}
1554
/// A default `SiemConfig` is disabled and has no protocol, destinations, or
/// filters.
#[test]
fn test_siem_config_default() {
    let SiemConfig {
        enabled,
        protocol,
        destinations,
        filters,
    } = SiemConfig::default();

    assert!(!enabled);
    assert!(protocol.is_none());
    assert!(destinations.is_empty());
    assert!(filters.is_none());
}
1563
/// Serialized SIEM configuration must include the `enabled` key and the
/// lowercase protocol name.
#[test]
fn test_siem_config_serialization() {
    let syslog_config = SiemConfig {
        enabled: true,
        protocol: Some(SiemProtocol::Syslog),
        destinations: Vec::new(),
        filters: None,
    };

    let serialized = serde_json::to_string(&syslog_config).unwrap();

    assert!(serialized.contains("enabled"));
    assert!(serialized.contains("syslog"));
}
1577
/// A hand-built `TransportHealth` must expose the field values it was given.
#[test]
fn test_transport_health_creation() {
    let succeeded_at = chrono::Utc::now();
    let record = TransportHealth {
        identifier: String::from("test_transport"),
        healthy: true,
        last_success: Some(succeeded_at),
        last_error: None,
        success_count: 100,
        failure_count: 0,
    };

    assert_eq!(record.identifier, "test_transport");
    assert!(record.healthy);
    assert_eq!(record.success_count, 100);
    assert_eq!(record.failure_count, 0);
}
1594
/// Serialized health records must carry both the identifier and the last
/// error message.
#[test]
fn test_transport_health_serialization() {
    let failing = TransportHealth {
        identifier: String::from("transport_1"),
        healthy: false,
        last_success: None,
        last_error: Some(String::from("Connection failed")),
        success_count: 50,
        failure_count: 5,
    };

    let serialized = serde_json::to_string(&failing).unwrap();

    assert!(serialized.contains("transport_1"));
    assert!(serialized.contains("Connection failed"));
}
1610
/// Smoke test: building a TCP `SyslogTransport` must complete without panicking.
#[test]
fn test_syslog_transport_new() {
    // The value is never inspected; successful construction is the whole check.
    let _constructed = SyslogTransport::new(
        String::from("example.com"),
        514,
        String::from("tcp"),
        SyslogFacility::Security,
        String::from("app"),
    );
}
1624
/// Smoke test: building an `HttpTransport` with one custom header must complete
/// without panicking.
#[test]
fn test_http_transport_new() {
    let custom_headers =
        HashMap::from([(String::from("X-Custom-Header"), String::from("value"))]);

    // The value is never inspected; successful construction is the whole check.
    let _constructed = HttpTransport::new(
        String::from("https://example.com/webhook"),
        String::from("POST"),
        custom_headers,
        10,
        RetryConfig::default(),
    );
}
1640
/// Smoke test: building a `SplunkTransport` with both optional arguments set
/// must complete without panicking.
#[test]
fn test_splunk_transport_new() {
    // The value is never inspected; successful construction is the whole check.
    let _constructed = SplunkTransport::new(
        String::from("https://splunk.example.com:8088"),
        String::from("token123"),
        Some(String::from("index1")),
        Some(String::from("json")),
        RetryConfig::default(),
    );
}
1654
/// Smoke test: building a `DatadogTransport` with an app key and one tag must
/// complete without panicking.
#[test]
fn test_datadog_transport_new() {
    let tags = vec![String::from("env:test")];

    // The value is never inspected; successful construction is the whole check.
    let _constructed = DatadogTransport::new(
        String::from("api_key_123"),
        Some(String::from("app_key_456")),
        String::from("us"),
        tags,
        RetryConfig::default(),
    );
}
1668
/// Smoke test: building a `CloudwatchTransport` from a two-entry credential map
/// must complete without panicking.
#[test]
fn test_cloudwatch_transport_new() {
    let credentials = HashMap::from([
        (String::from("access_key"), String::from("key123")),
        (String::from("secret_key"), String::from("secret123")),
    ]);

    // The value is never inspected; successful construction is the whole check.
    let _constructed = CloudwatchTransport::new(
        String::from("us-east-1"),
        String::from("log-group-name"),
        String::from("log-stream-name"),
        credentials,
        RetryConfig::default(),
    );
}
1685
/// Smoke test: building a `GcpTransport` must complete without panicking.
#[test]
fn test_gcp_transport_new() {
    // The value is never inspected; successful construction is the whole check.
    let _constructed = GcpTransport::new(
        String::from("project-id"),
        String::from("log-name"),
        String::from("/path/to/credentials.json"),
        RetryConfig::default(),
    );
}
1698
/// Smoke test: building an `AzureTransport` must complete without panicking.
#[test]
fn test_azure_transport_new() {
    // The value is never inspected; successful construction is the whole check.
    let _constructed = AzureTransport::new(
        String::from("workspace-id"),
        String::from("shared-key"),
        String::from("CustomLog"),
        RetryConfig::default(),
    );
}
1711}