Skip to main content

synapse_pingora/intelligence/
signal_manager.rs

1//! SignalManager - aggregates security signals into time buckets.
2//!
3//! Signals are categorized into high-level buckets:
4//! - Attack
5//! - Anomaly
6//! - Behavior
7//! - Intelligence
8//!
9//! This manager provides lightweight, in-memory storage optimized for
10//! last-24-hour visibility and dashboard queries.
11
12use std::collections::{HashMap, VecDeque};
13
14use parking_lot::RwLock;
15use serde::{Deserialize, Serialize};
16use tracing::warn;
17use uuid::Uuid;
18
19// ============================================================================
20// Types
21// ============================================================================
22
23/// High-level signal categories.
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
25#[serde(rename_all = "snake_case")]
26pub enum SignalCategory {
27    Attack,
28    Anomaly,
29    Behavior,
30    Intelligence,
31}
32
33/// Security signal recorded by the sensor.
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct Signal {
36    /// Unique signal ID.
37    pub id: String,
38    /// Unix timestamp in milliseconds.
39    pub timestamp_ms: u64,
40    /// Signal category.
41    pub category: SignalCategory,
42    /// Signal type identifier (string for extensibility).
43    pub signal_type: String,
44    /// Optional entity identifier (IP, actor ID, fingerprint).
45    pub entity_id: Option<String>,
46    /// Human-readable description.
47    pub description: Option<String>,
48    /// Arbitrary structured metadata.
49    pub metadata: serde_json::Value,
50}
51
52impl Signal {
53    pub fn new(
54        category: SignalCategory,
55        signal_type: impl Into<String>,
56        entity_id: Option<String>,
57        description: Option<String>,
58        metadata: serde_json::Value,
59    ) -> Self {
60        Self {
61            id: Uuid::new_v4().to_string(),
62            timestamp_ms: now_ms(),
63            category,
64            signal_type: signal_type.into(),
65            entity_id,
66            description,
67            metadata,
68        }
69    }
70}
71
72/// Query options for listing signals.
73#[derive(Debug, Clone, Default)]
74pub struct SignalQueryOptions {
75    pub category: Option<SignalCategory>,
76    pub limit: Option<usize>,
77    pub since_ms: Option<u64>,
78}
79
80/// Summary of signals for dashboards.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct SignalSummary {
83    pub total_signals: usize,
84    pub by_category: HashMap<SignalCategory, usize>,
85    pub top_signal_types: Vec<TopSignalType>,
86}
87
88/// Top signal type counts.
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct TopSignalType {
91    pub signal_type: String,
92    pub count: usize,
93}
94
/// Tuning knobs for a `SignalManager`.
///
/// See the `Default` implementation for the stock values.
#[derive(Debug, Clone)]
pub struct SignalManagerConfig {
    /// Width of one time bucket, in milliseconds (default: 5 minutes).
    pub bucket_size_ms: u64,
    /// How far back data is kept, in milliseconds (default: 24 hours).
    /// Together with `bucket_size_ms` this fixes the number of retained buckets.
    pub retention_ms: u64,
    /// Upper bound on the signals stored in full per bucket (default: 1000).
    pub max_signals_per_bucket: usize,
    /// Upper bound on the signals a single query may return (default: 500).
    pub max_query_results: usize,
}
107
108impl Default for SignalManagerConfig {
109    fn default() -> Self {
110        Self {
111            bucket_size_ms: 5 * 60 * 1000,
112            retention_ms: 24 * 60 * 60 * 1000,
113            max_signals_per_bucket: 1000,
114            max_query_results: 500,
115        }
116    }
117}
118
119// ============================================================================
120// Internal Structures
121// ============================================================================
122
123#[derive(Debug, Clone)]
124struct SignalBucket {
125    timestamp_ms: u64,
126    end_timestamp_ms: u64,
127    signals: Vec<Signal>,
128    by_category: HashMap<SignalCategory, usize>,
129    by_type: HashMap<String, usize>,
130}
131
132impl SignalBucket {
133    fn new(timestamp_ms: u64, bucket_size_ms: u64) -> Self {
134        Self {
135            timestamp_ms,
136            end_timestamp_ms: timestamp_ms + bucket_size_ms,
137            signals: Vec::new(),
138            by_category: HashMap::new(),
139            by_type: HashMap::new(),
140        }
141    }
142
143    fn add_signal(&mut self, signal: Signal, max_signals: usize) {
144        *self.by_category.entry(signal.category).or_insert(0) += 1;
145        *self.by_type.entry(signal.signal_type.clone()).or_insert(0) += 1;
146
147        if self.signals.len() < max_signals {
148            self.signals.push(signal);
149        }
150    }
151}
152
153#[derive(Debug, Default)]
154struct SignalStore {
155    buckets: VecDeque<SignalBucket>,
156}
157
158// ============================================================================
159// Signal Manager
160// ============================================================================
161
162/// In-memory signal aggregation manager.
163pub struct SignalManager {
164    config: SignalManagerConfig,
165    store: RwLock<SignalStore>,
166}
167
168impl SignalManager {
169    pub fn new(config: SignalManagerConfig) -> Self {
170        Self {
171            config,
172            store: RwLock::new(SignalStore::default()),
173        }
174    }
175
176    /// Record a signal into the time store.
177    pub fn record(&self, signal: Signal) {
178        let mut store = self.store.write();
179        let bucket_ts = bucket_timestamp(signal.timestamp_ms, self.config.bucket_size_ms);
180        let mut needs_bucket = true;
181        if let Some(last) = store.buckets.back() {
182            if last.timestamp_ms == bucket_ts {
183                needs_bucket = false;
184            } else if bucket_ts > last.timestamp_ms {
185                let mut ts = last.timestamp_ms + self.config.bucket_size_ms;
186                while ts <= bucket_ts {
187                    store
188                        .buckets
189                        .push_back(SignalBucket::new(ts, self.config.bucket_size_ms));
190                    ts += self.config.bucket_size_ms;
191                }
192                needs_bucket = false;
193            }
194        }
195
196        if needs_bucket {
197            store
198                .buckets
199                .push_back(SignalBucket::new(bucket_ts, self.config.bucket_size_ms));
200        }
201
202        let Some(bucket) = store.buckets.back_mut() else {
203            warn!("Signal bucket allocation failed; dropping signal");
204            return;
205        };
206
207        bucket.add_signal(signal, self.config.max_signals_per_bucket);
208        self.evict_old_buckets(&mut store);
209    }
210
211    /// Convenience method to build and record a signal.
212    pub fn record_event(
213        &self,
214        category: SignalCategory,
215        signal_type: impl Into<String>,
216        entity_id: Option<String>,
217        description: Option<String>,
218        metadata: serde_json::Value,
219    ) {
220        self.record(Signal::new(
221            category,
222            signal_type,
223            entity_id,
224            description,
225            metadata,
226        ));
227    }
228
229    /// List recent signals with optional filtering.
230    pub fn list_signals(&self, options: SignalQueryOptions) -> Vec<Signal> {
231        let store = self.store.read();
232        let limit = options
233            .limit
234            .unwrap_or(self.config.max_query_results)
235            .min(self.config.max_query_results);
236
237        let mut results = Vec::with_capacity(limit);
238        for bucket in store.buckets.iter().rev() {
239            for signal in bucket.signals.iter().rev() {
240                if let Some(category) = options.category {
241                    if signal.category != category {
242                        continue;
243                    }
244                }
245                if let Some(since_ms) = options.since_ms {
246                    if signal.timestamp_ms < since_ms {
247                        continue;
248                    }
249                }
250                results.push(signal.clone());
251                if results.len() >= limit {
252                    return results;
253                }
254            }
255        }
256        results
257    }
258
259    /// Build a summary of signals for dashboards.
260    pub fn summary(&self) -> SignalSummary {
261        let store = self.store.read();
262        let mut by_category: HashMap<SignalCategory, usize> = HashMap::new();
263        let mut by_type: HashMap<String, usize> = HashMap::new();
264        let mut total = 0usize;
265
266        for bucket in store.buckets.iter() {
267            total += bucket.signals.len();
268            for (category, count) in &bucket.by_category {
269                *by_category.entry(*category).or_insert(0) += count;
270            }
271            for (signal_type, count) in &bucket.by_type {
272                *by_type.entry(signal_type.clone()).or_insert(0) += count;
273            }
274        }
275
276        let mut top_signal_types: Vec<TopSignalType> = by_type
277            .into_iter()
278            .map(|(signal_type, count)| TopSignalType { signal_type, count })
279            .collect();
280        top_signal_types.sort_by_key(|s| std::cmp::Reverse(s.count));
281        top_signal_types.truncate(10);
282
283        SignalSummary {
284            total_signals: total,
285            by_category,
286            top_signal_types,
287        }
288    }
289
290    fn evict_old_buckets(&self, store: &mut SignalStore) {
291        let max_buckets = (self.config.retention_ms / self.config.bucket_size_ms).max(1) as usize;
292        while store.buckets.len() > max_buckets {
293            store.buckets.pop_front();
294        }
295    }
296}
297
/// Rounds `timestamp_ms` down to the start of the bucket that contains it,
/// i.e. to the nearest lower multiple of `bucket_size_ms`.
///
/// # Panics
///
/// Panics if `bucket_size_ms` is zero.
#[inline]
fn bucket_timestamp(timestamp_ms: u64, bucket_size_ms: u64) -> u64 {
    let whole_buckets = timestamp_ms / bucket_size_ms;
    whole_buckets * bucket_size_ms
}
302
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
#[inline]
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
311
312// ============================================================================
313// Tests
314// ============================================================================
315
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads an optional numeric override for a test parameter from the
    /// environment variable `env_key`.
    ///
    /// A parsable value is raised to at least `min` and then capped at
    /// `default`; a missing or unparsable value yields `default`.
    fn test_limit<T>(env_key: &str, default: T, min: T) -> T
    where
        T: Copy + Ord + std::str::FromStr,
    {
        std::env::var(env_key)
            .ok()
            .and_then(|value| value.parse::<T>().ok())
            .map_or(default, |value| value.max(min).min(default))
    }

    /// Small buckets and a short retention window keep the tests fast.
    fn test_config() -> SignalManagerConfig {
        SignalManagerConfig {
            bucket_size_ms: 1000, // 1 second buckets for testing
            retention_ms: 10_000, // 10 seconds
            max_signals_per_bucket: 100,
            max_query_results: 50,
        }
    }

    /// Records a signal that carries no entity, description or metadata.
    fn record_simple(manager: &SignalManager, category: SignalCategory, signal_type: &str) {
        manager.record_event(category, signal_type, None, None, serde_json::json!({}));
    }

    // ========================================================================
    // Signal Creation Tests
    // ========================================================================

    #[test]
    fn test_signal_new_creates_unique_id() {
        let make = || {
            Signal::new(
                SignalCategory::Attack,
                "sql_injection",
                None,
                None,
                serde_json::json!({}),
            )
        };
        let s1 = make();
        let s2 = make();

        assert_ne!(s1.id, s2.id, "Each signal should have unique ID");
    }

    #[test]
    fn test_signal_new_sets_timestamp() {
        let before = now_ms();
        let signal = Signal::new(
            SignalCategory::Anomaly,
            "rate_spike",
            Some("192.168.1.1".to_string()),
            Some("Unusual request rate".to_string()),
            serde_json::json!({"rate": 1000}),
        );
        let after = now_ms();

        assert!(signal.timestamp_ms >= before);
        assert!(signal.timestamp_ms <= after);
    }

    #[test]
    fn test_signal_fields_populated() {
        let signal = Signal::new(
            SignalCategory::Behavior,
            "crawler_detected",
            Some("10.0.0.1".to_string()),
            Some("Bot behavior".to_string()),
            serde_json::json!({"bot_name": "test_bot"}),
        );

        assert_eq!(signal.category, SignalCategory::Behavior);
        assert_eq!(signal.signal_type, "crawler_detected");
        assert_eq!(signal.entity_id, Some("10.0.0.1".to_string()));
        assert_eq!(signal.description, Some("Bot behavior".to_string()));
        assert_eq!(signal.metadata["bot_name"], "test_bot");
    }

    // ========================================================================
    // Signal Manager Recording Tests
    // ========================================================================

    #[test]
    fn test_record_signal() {
        let manager = SignalManager::new(test_config());

        manager.record_event(
            SignalCategory::Attack,
            "xss",
            Some("1.2.3.4".to_string()),
            Some("XSS attempt".to_string()),
            serde_json::json!({}),
        );

        let signals = manager.list_signals(SignalQueryOptions::default());
        assert_eq!(signals.len(), 1);
        assert_eq!(signals[0].category, SignalCategory::Attack);
        assert_eq!(signals[0].signal_type, "xss");
    }

    #[test]
    fn test_record_multiple_signals() {
        let manager = SignalManager::new(test_config());

        for i in 0..5 {
            manager.record_event(
                SignalCategory::Attack,
                format!("attack_{i}"),
                None,
                None,
                serde_json::json!({"index": i}),
            );
        }

        let signals = manager.list_signals(SignalQueryOptions::default());
        assert_eq!(signals.len(), 5);
    }

    #[test]
    fn test_record_different_categories() {
        let manager = SignalManager::new(test_config());

        record_simple(&manager, SignalCategory::Attack, "sqli");
        record_simple(&manager, SignalCategory::Anomaly, "rate_spike");
        record_simple(&manager, SignalCategory::Behavior, "crawler");
        record_simple(&manager, SignalCategory::Intelligence, "blocklist_hit");

        let summary = manager.summary();
        assert_eq!(summary.total_signals, 4);
        assert_eq!(summary.by_category.get(&SignalCategory::Attack), Some(&1));
        assert_eq!(summary.by_category.get(&SignalCategory::Anomaly), Some(&1));
        assert_eq!(summary.by_category.get(&SignalCategory::Behavior), Some(&1));
        assert_eq!(
            summary.by_category.get(&SignalCategory::Intelligence),
            Some(&1)
        );
    }

    // ========================================================================
    // Query Filtering Tests
    // ========================================================================

    #[test]
    fn test_list_signals_filter_by_category() {
        let manager = SignalManager::new(test_config());

        record_simple(&manager, SignalCategory::Attack, "sqli");
        record_simple(&manager, SignalCategory::Attack, "xss");
        record_simple(&manager, SignalCategory::Anomaly, "rate_spike");

        let attacks = manager.list_signals(SignalQueryOptions {
            category: Some(SignalCategory::Attack),
            ..Default::default()
        });
        assert_eq!(attacks.len(), 2);

        let anomalies = manager.list_signals(SignalQueryOptions {
            category: Some(SignalCategory::Anomaly),
            ..Default::default()
        });
        assert_eq!(anomalies.len(), 1);
    }

    #[test]
    fn test_list_signals_filter_by_since_ms() {
        let manager = SignalManager::new(test_config());

        // Explicit timestamps, recorded in chronological order.
        for (id, timestamp_ms) in [("old", 1_000u64), ("new", 3_000u64)] {
            manager.record(Signal {
                id: id.to_string(),
                timestamp_ms,
                category: SignalCategory::Attack,
                signal_type: "probe".to_string(),
                entity_id: None,
                description: None,
                metadata: serde_json::json!({}),
            });
        }

        let recent = manager.list_signals(SignalQueryOptions {
            since_ms: Some(2_000),
            ..Default::default()
        });
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].id, "new");
    }

    #[test]
    fn test_list_signals_limit() {
        let manager = SignalManager::new(test_config());

        for i in 0..20 {
            record_simple(&manager, SignalCategory::Attack, &format!("attack_{i}"));
        }

        let limited = manager.list_signals(SignalQueryOptions {
            limit: Some(5),
            ..Default::default()
        });
        assert_eq!(limited.len(), 5);
    }

    #[test]
    fn test_list_signals_respects_max_query_results() {
        let config = SignalManagerConfig {
            max_query_results: 10,
            ..test_config()
        };
        let manager = SignalManager::new(config);

        for i in 0..20 {
            record_simple(&manager, SignalCategory::Attack, &format!("attack_{i}"));
        }

        // Request more than max_query_results
        let signals = manager.list_signals(SignalQueryOptions {
            limit: Some(100),
            ..Default::default()
        });
        assert_eq!(signals.len(), 10);
    }

    #[test]
    fn test_list_signals_returns_most_recent_first() {
        let manager = SignalManager::new(test_config());

        for signal_type in ["first", "second", "third"] {
            record_simple(&manager, SignalCategory::Attack, signal_type);
        }

        let signals = manager.list_signals(SignalQueryOptions::default());
        assert_eq!(signals[0].signal_type, "third");
        assert_eq!(signals[1].signal_type, "second");
        assert_eq!(signals[2].signal_type, "first");
    }

    // ========================================================================
    // Summary Tests
    // ========================================================================

    #[test]
    fn test_summary_empty() {
        let manager = SignalManager::new(test_config());
        let summary = manager.summary();

        assert_eq!(summary.total_signals, 0);
        assert!(summary.by_category.is_empty());
        assert!(summary.top_signal_types.is_empty());
    }

    #[test]
    fn test_summary_counts_by_category() {
        let manager = SignalManager::new(test_config());

        for _ in 0..3 {
            record_simple(&manager, SignalCategory::Attack, "sqli");
        }
        for _ in 0..2 {
            record_simple(&manager, SignalCategory::Anomaly, "rate");
        }

        let summary = manager.summary();
        assert_eq!(summary.total_signals, 5);
        assert_eq!(summary.by_category.get(&SignalCategory::Attack), Some(&3));
        assert_eq!(summary.by_category.get(&SignalCategory::Anomaly), Some(&2));
    }

    #[test]
    fn test_summary_top_signal_types() {
        let manager = SignalManager::new(test_config());

        for (signal_type, occurrences) in [("sqli", 5), ("xss", 3), ("rce", 1)] {
            for _ in 0..occurrences {
                record_simple(&manager, SignalCategory::Attack, signal_type);
            }
        }

        let summary = manager.summary();
        assert_eq!(summary.top_signal_types.len(), 3);
        assert_eq!(summary.top_signal_types[0].signal_type, "sqli");
        assert_eq!(summary.top_signal_types[0].count, 5);
        assert_eq!(summary.top_signal_types[1].signal_type, "xss");
        assert_eq!(summary.top_signal_types[1].count, 3);
        assert_eq!(summary.top_signal_types[2].signal_type, "rce");
        assert_eq!(summary.top_signal_types[2].count, 1);
    }

    #[test]
    fn test_summary_top_signal_types_limited_to_10() {
        let manager = SignalManager::new(test_config());

        for i in 0..15 {
            record_simple(
                &manager,
                SignalCategory::Attack,
                &format!("attack_type_{i}"),
            );
        }

        let summary = manager.summary();
        assert_eq!(summary.top_signal_types.len(), 10);
    }

    // ========================================================================
    // Time Bucket Tests
    // ========================================================================

    #[test]
    fn test_bucket_timestamp_calculation() {
        // With 1000ms buckets:
        // 1500ms should go to bucket 1000
        // 2500ms should go to bucket 2000
        assert_eq!(bucket_timestamp(1500, 1000), 1000);
        assert_eq!(bucket_timestamp(2500, 1000), 2000);
        assert_eq!(bucket_timestamp(3000, 1000), 3000);
    }

    #[test]
    fn test_max_signals_per_bucket() {
        let config = SignalManagerConfig {
            max_signals_per_bucket: 3,
            ..test_config()
        };
        let manager = SignalManager::new(config);

        // Record more than max_signals_per_bucket
        for i in 0..10 {
            record_simple(&manager, SignalCategory::Attack, &format!("attack_{i}"));
        }

        // Summary should count all signals (via counters)
        let summary = manager.summary();
        assert_eq!(summary.by_category.get(&SignalCategory::Attack), Some(&10));

        // But list should only return stored signals
        let signals = manager.list_signals(SignalQueryOptions::default());
        assert_eq!(signals.len(), 3);
    }

    // ========================================================================
    // Bucket Eviction Tests
    // ========================================================================

    #[test]
    #[cfg_attr(not(feature = "heavy-tests"), ignore)]
    fn test_bucket_eviction_respects_retention() {
        let bucket_size_ms: u64 = 1000;
        let retention_ms: u64 = test_limit("SYNAPSE_TEST_RETENTION_MS", 3000, bucket_size_ms);
        let bucket_count: usize = test_limit("SYNAPSE_TEST_BUCKET_COUNT", 5, 1);
        let max_buckets = (retention_ms / bucket_size_ms).max(1) as usize;
        eprintln!(
            "test_bucket_eviction_respects_retention: bucket_count={}, retention_ms={}, max_buckets={}",
            bucket_count, retention_ms, max_buckets
        );

        let config = SignalManagerConfig {
            bucket_size_ms,
            retention_ms,
            max_signals_per_bucket: 100,
            max_query_results: 500,
        };
        let manager = SignalManager::new(config);

        // Create signals with manually set timestamps to simulate time passing
        // Note: This tests internal eviction logic indirectly
        let store = &manager.store;

        {
            let mut store_lock = store.write();
            // Add `bucket_count` consecutive buckets manually, one signal each
            for i in 0..bucket_count {
                let start_ms = (i as u64) * bucket_size_ms;
                let mut bucket = SignalBucket::new(start_ms, bucket_size_ms);
                bucket.add_signal(
                    Signal {
                        id: format!("sig_{}", i),
                        timestamp_ms: start_ms,
                        category: SignalCategory::Attack,
                        signal_type: format!("attack_{}", i),
                        entity_id: None,
                        description: None,
                        metadata: serde_json::json!({}),
                    },
                    100,
                );
                store_lock.buckets.push_back(bucket);
            }
            manager.evict_old_buckets(&mut store_lock);
            assert!(store_lock.buckets.len() <= max_buckets);
        }

        let summary = manager.summary();
        assert!(summary.total_signals <= max_buckets);
    }

    // ========================================================================
    // SignalCategory Tests
    // ========================================================================

    #[test]
    fn test_signal_category_equality() {
        assert_eq!(SignalCategory::Attack, SignalCategory::Attack);
        assert_ne!(SignalCategory::Attack, SignalCategory::Anomaly);
    }

    #[test]
    fn test_signal_category_serialization() {
        let category = SignalCategory::Intelligence;
        let serialized = serde_json::to_string(&category).unwrap();
        assert_eq!(serialized, "\"intelligence\"");

        let deserialized: SignalCategory = serde_json::from_str(&serialized).unwrap();
        assert_eq!(deserialized, SignalCategory::Intelligence);
    }

    // ========================================================================
    // Edge Cases
    // ========================================================================

    #[test]
    fn test_empty_signal_query() {
        let manager = SignalManager::new(test_config());
        let signals = manager.list_signals(SignalQueryOptions::default());
        assert!(signals.is_empty());
    }

    #[test]
    fn test_filter_nonexistent_category() {
        let manager = SignalManager::new(test_config());

        record_simple(&manager, SignalCategory::Attack, "test");

        let signals = manager.list_signals(SignalQueryOptions {
            category: Some(SignalCategory::Anomaly),
            ..Default::default()
        });
        assert!(signals.is_empty());
    }

    #[test]
    fn test_signal_with_complex_metadata() {
        let manager = SignalManager::new(test_config());

        manager.record_event(
            SignalCategory::Attack,
            "complex_attack",
            Some("attacker-ip".to_string()),
            Some("Complex attack detected".to_string()),
            serde_json::json!({
                "rules": [1001, 1002, 1003],
                "risk_score": 85,
                "headers": {
                    "user-agent": "malicious-bot",
                    "x-forwarded-for": "1.2.3.4"
                },
                "nested": {
                    "deep": {
                        "value": true
                    }
                }
            }),
        );

        let signals = manager.list_signals(SignalQueryOptions::default());
        assert_eq!(signals.len(), 1);
        assert_eq!(signals[0].metadata["rules"].as_array().unwrap().len(), 3);
        assert_eq!(signals[0].metadata["risk_score"], 85);
        assert_eq!(signals[0].metadata["nested"]["deep"]["value"], true);
    }
}