1use std::collections::{HashMap, VecDeque};
13
14use parking_lot::RwLock;
15use serde::{Deserialize, Serialize};
16use tracing::warn;
17use uuid::Uuid;
18
/// Broad classification of a [`Signal`].
///
/// Serialized in `snake_case` (e.g. `SignalCategory::Intelligence` <-> `"intelligence"`).
/// Derives `Hash`/`Eq` because it is used as a key in per-category counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SignalCategory {
    Attack,
    Anomaly,
    Behavior,
    Intelligence,
}
32
/// A single recorded signal.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Signal {
    /// Identifier; [`Signal::new`] assigns a random UUID v4 string.
    pub id: String,
    /// Milliseconds since the Unix epoch; decides which time bucket the signal lands in.
    pub timestamp_ms: u64,
    /// Broad classification, used for filtering and per-category counts.
    pub category: SignalCategory,
    /// Free-form type label (e.g. `"sql_injection"`), used for per-type counts.
    pub signal_type: String,
    /// Optional identifier of the entity involved (tests use IP addresses); not interpreted here.
    pub entity_id: Option<String>,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Arbitrary JSON payload, stored as-is.
    pub metadata: serde_json::Value,
}
51
52impl Signal {
53 pub fn new(
54 category: SignalCategory,
55 signal_type: impl Into<String>,
56 entity_id: Option<String>,
57 description: Option<String>,
58 metadata: serde_json::Value,
59 ) -> Self {
60 Self {
61 id: Uuid::new_v4().to_string(),
62 timestamp_ms: now_ms(),
63 category,
64 signal_type: signal_type.into(),
65 entity_id,
66 description,
67 metadata,
68 }
69 }
70}
71
/// Filters for [`SignalManager::list_signals`]; every `None` means "no constraint".
#[derive(Debug, Clone, Default)]
pub struct SignalQueryOptions {
    /// Only return signals of this category.
    pub category: Option<SignalCategory>,
    /// Maximum number of results; always clamped to `SignalManagerConfig::max_query_results`.
    pub limit: Option<usize>,
    /// Only return signals with `timestamp_ms >= since_ms` (inclusive).
    pub since_ms: Option<u64>,
}
79
/// Aggregate view over all retained buckets, produced by [`SignalManager::summary`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SignalSummary {
    /// Number of signals actually stored. Because of the per-bucket storage cap this can be
    /// lower than the sum of `by_category`, which counts every recorded signal.
    pub total_signals: usize,
    /// Recorded-signal count per category (includes signals beyond the storage cap).
    pub by_category: HashMap<SignalCategory, usize>,
    /// Most frequent signal types, highest count first (at most 10 entries).
    pub top_signal_types: Vec<TopSignalType>,
}
87
/// One entry of [`SignalSummary::top_signal_types`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopSignalType {
    /// The signal type label.
    pub signal_type: String,
    /// Number of recorded signals with this type across retained buckets.
    pub count: usize,
}
94
/// Tuning knobs for [`SignalManager`].
#[derive(Debug, Clone)]
pub struct SignalManagerConfig {
    /// Width of each time bucket in milliseconds.
    pub bucket_size_ms: u64,
    /// History to keep in milliseconds, effectively `retention_ms / bucket_size_ms`
    /// buckets (at least one), measured back from the newest bucket.
    pub retention_ms: u64,
    /// Maximum signals stored per bucket; extra signals are still counted in the
    /// category/type tallies but not stored.
    pub max_signals_per_bucket: usize,
    /// Upper bound on the number of signals a single query returns.
    pub max_query_results: usize,
}
107
108impl Default for SignalManagerConfig {
109 fn default() -> Self {
110 Self {
111 bucket_size_ms: 5 * 60 * 1000,
112 retention_ms: 24 * 60 * 60 * 1000,
113 max_signals_per_bucket: 1000,
114 max_query_results: 500,
115 }
116 }
117}
118
/// Signals whose timestamps fall within one fixed-width time interval.
#[derive(Debug, Clone)]
struct SignalBucket {
    /// Inclusive start of the interval (a multiple of the bucket size).
    timestamp_ms: u64,
    /// Exclusive end of the interval (`timestamp_ms + bucket_size_ms`).
    end_timestamp_ms: u64,
    /// Stored signals in insertion order, capped at `max_signals_per_bucket`.
    signals: Vec<Signal>,
    /// Count of every signal recorded in this bucket per category, including ones not stored.
    by_category: HashMap<SignalCategory, usize>,
    /// Count of every signal recorded in this bucket per type, including ones not stored.
    by_type: HashMap<String, usize>,
}
131
132impl SignalBucket {
133 fn new(timestamp_ms: u64, bucket_size_ms: u64) -> Self {
134 Self {
135 timestamp_ms,
136 end_timestamp_ms: timestamp_ms + bucket_size_ms,
137 signals: Vec::new(),
138 by_category: HashMap::new(),
139 by_type: HashMap::new(),
140 }
141 }
142
143 fn add_signal(&mut self, signal: Signal, max_signals: usize) {
144 *self.by_category.entry(signal.category).or_insert(0) += 1;
145 *self.by_type.entry(signal.signal_type.clone()).or_insert(0) += 1;
146
147 if self.signals.len() < max_signals {
148 self.signals.push(signal);
149 }
150 }
151}
152
/// Lock-protected state of a [`SignalManager`].
#[derive(Debug, Default)]
struct SignalStore {
    /// Time buckets, intended to be ordered oldest (front) to newest (back); eviction pops
    /// from the front.
    buckets: VecDeque<SignalBucket>,
}
157
/// In-memory store of recent signals grouped into fixed-width time buckets.
///
/// All state sits behind a single `RwLock`: writers (`record`) take the write lock and
/// queries (`list_signals`, `summary`) take the read lock.
pub struct SignalManager {
    /// Bucket sizing, retention and query limits.
    config: SignalManagerConfig,
    /// Bucketed signals plus their aggregate counters.
    store: RwLock<SignalStore>,
}
167
168impl SignalManager {
169 pub fn new(config: SignalManagerConfig) -> Self {
170 Self {
171 config,
172 store: RwLock::new(SignalStore::default()),
173 }
174 }
175
176 pub fn record(&self, signal: Signal) {
178 let mut store = self.store.write();
179 let bucket_ts = bucket_timestamp(signal.timestamp_ms, self.config.bucket_size_ms);
180 let mut needs_bucket = true;
181 if let Some(last) = store.buckets.back() {
182 if last.timestamp_ms == bucket_ts {
183 needs_bucket = false;
184 } else if bucket_ts > last.timestamp_ms {
185 let mut ts = last.timestamp_ms + self.config.bucket_size_ms;
186 while ts <= bucket_ts {
187 store
188 .buckets
189 .push_back(SignalBucket::new(ts, self.config.bucket_size_ms));
190 ts += self.config.bucket_size_ms;
191 }
192 needs_bucket = false;
193 }
194 }
195
196 if needs_bucket {
197 store
198 .buckets
199 .push_back(SignalBucket::new(bucket_ts, self.config.bucket_size_ms));
200 }
201
202 let Some(bucket) = store.buckets.back_mut() else {
203 warn!("Signal bucket allocation failed; dropping signal");
204 return;
205 };
206
207 bucket.add_signal(signal, self.config.max_signals_per_bucket);
208 self.evict_old_buckets(&mut store);
209 }
210
211 pub fn record_event(
213 &self,
214 category: SignalCategory,
215 signal_type: impl Into<String>,
216 entity_id: Option<String>,
217 description: Option<String>,
218 metadata: serde_json::Value,
219 ) {
220 self.record(Signal::new(
221 category,
222 signal_type,
223 entity_id,
224 description,
225 metadata,
226 ));
227 }
228
229 pub fn list_signals(&self, options: SignalQueryOptions) -> Vec<Signal> {
231 let store = self.store.read();
232 let limit = options
233 .limit
234 .unwrap_or(self.config.max_query_results)
235 .min(self.config.max_query_results);
236
237 let mut results = Vec::with_capacity(limit);
238 for bucket in store.buckets.iter().rev() {
239 for signal in bucket.signals.iter().rev() {
240 if let Some(category) = options.category {
241 if signal.category != category {
242 continue;
243 }
244 }
245 if let Some(since_ms) = options.since_ms {
246 if signal.timestamp_ms < since_ms {
247 continue;
248 }
249 }
250 results.push(signal.clone());
251 if results.len() >= limit {
252 return results;
253 }
254 }
255 }
256 results
257 }
258
259 pub fn summary(&self) -> SignalSummary {
261 let store = self.store.read();
262 let mut by_category: HashMap<SignalCategory, usize> = HashMap::new();
263 let mut by_type: HashMap<String, usize> = HashMap::new();
264 let mut total = 0usize;
265
266 for bucket in store.buckets.iter() {
267 total += bucket.signals.len();
268 for (category, count) in &bucket.by_category {
269 *by_category.entry(*category).or_insert(0) += count;
270 }
271 for (signal_type, count) in &bucket.by_type {
272 *by_type.entry(signal_type.clone()).or_insert(0) += count;
273 }
274 }
275
276 let mut top_signal_types: Vec<TopSignalType> = by_type
277 .into_iter()
278 .map(|(signal_type, count)| TopSignalType { signal_type, count })
279 .collect();
280 top_signal_types.sort_by_key(|s| std::cmp::Reverse(s.count));
281 top_signal_types.truncate(10);
282
283 SignalSummary {
284 total_signals: total,
285 by_category,
286 top_signal_types,
287 }
288 }
289
290 fn evict_old_buckets(&self, store: &mut SignalStore) {
291 let max_buckets = (self.config.retention_ms / self.config.bucket_size_ms).max(1) as usize;
292 while store.buckets.len() > max_buckets {
293 store.buckets.pop_front();
294 }
295 }
296}
297
/// Rounds `timestamp_ms` down to the start of the bucket that contains it.
///
/// A `bucket_size_ms` of zero is treated as one millisecond (every timestamp is its own
/// bucket) instead of panicking on a remainder by zero.
#[inline]
fn bucket_timestamp(timestamp_ms: u64, bucket_size_ms: u64) -> u64 {
    timestamp_ms - timestamp_ms % bucket_size_ms.max(1)
}
302
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
#[inline]
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads an optional `usize` override from the environment variable `env_key`.
    ///
    /// A parsed value is raised to `min` and then capped at `default`; a missing or
    /// unparsable variable yields `default`.
    fn test_limit_usize(env_key: &str, default: usize, min: usize) -> usize {
        let parsed = std::env::var(env_key)
            .ok()
            .and_then(|raw| raw.parse::<usize>().ok());
        match parsed {
            Some(value) => value.max(min).min(default),
            None => default,
        }
    }

    /// `u64` counterpart of [`test_limit_usize`].
    fn test_limit_u64(env_key: &str, default: u64, min: u64) -> u64 {
        let parsed = std::env::var(env_key)
            .ok()
            .and_then(|raw| raw.parse::<u64>().ok());
        match parsed {
            Some(value) => value.max(min).min(default),
            None => default,
        }
    }

    /// One-second buckets and a ten-second retention window, suitable for fast tests.
    fn test_config() -> SignalManagerConfig {
        SignalManagerConfig {
            bucket_size_ms: 1000,
            retention_ms: 10_000,
            max_signals_per_bucket: 100,
            max_query_results: 50,
        }
    }

    /// Records a signal with no entity, no description and empty JSON metadata.
    fn emit(manager: &SignalManager, category: SignalCategory, signal_type: impl Into<String>) {
        manager.record_event(category, signal_type, None, None, serde_json::json!({}));
    }

    // ---- Signal construction ----

    #[test]
    fn test_signal_new_creates_unique_id() {
        let make = || {
            Signal::new(
                SignalCategory::Attack,
                "sql_injection",
                None,
                None,
                serde_json::json!({}),
            )
        };
        let (s1, s2) = (make(), make());

        assert_ne!(s1.id, s2.id, "Each signal should have unique ID");
    }

    #[test]
    fn test_signal_new_sets_timestamp() {
        let before = now_ms();
        let signal = Signal::new(
            SignalCategory::Anomaly,
            "rate_spike",
            Some("192.168.1.1".to_string()),
            Some("Unusual request rate".to_string()),
            serde_json::json!({"rate": 1000}),
        );
        let after = now_ms();

        assert!((before..=after).contains(&signal.timestamp_ms));
    }

    #[test]
    fn test_signal_fields_populated() {
        let signal = Signal::new(
            SignalCategory::Behavior,
            "crawler_detected",
            Some("10.0.0.1".to_string()),
            Some("Bot behavior".to_string()),
            serde_json::json!({"bot_name": "test_bot"}),
        );

        assert_eq!(signal.category, SignalCategory::Behavior);
        assert_eq!(signal.signal_type, "crawler_detected");
        assert_eq!(signal.entity_id, Some("10.0.0.1".to_string()));
        assert_eq!(signal.description, Some("Bot behavior".to_string()));
        assert_eq!(signal.metadata["bot_name"], "test_bot");
    }

    // ---- Recording ----

    #[test]
    fn test_record_signal() {
        let manager = SignalManager::new(test_config());
        manager.record_event(
            SignalCategory::Attack,
            "xss",
            Some("1.2.3.4".to_string()),
            Some("XSS attempt".to_string()),
            serde_json::json!({}),
        );

        let signals = manager.list_signals(SignalQueryOptions::default());
        assert_eq!(signals.len(), 1);
        let only = &signals[0];
        assert_eq!(only.category, SignalCategory::Attack);
        assert_eq!(only.signal_type, "xss");
    }

    #[test]
    fn test_record_multiple_signals() {
        let manager = SignalManager::new(test_config());
        (0..5).for_each(|i| {
            manager.record_event(
                SignalCategory::Attack,
                format!("attack_{}", i),
                None,
                None,
                serde_json::json!({"index": i}),
            );
        });

        assert_eq!(manager.list_signals(SignalQueryOptions::default()).len(), 5);
    }

    #[test]
    fn test_record_different_categories() {
        let manager = SignalManager::new(test_config());
        let cases = [
            (SignalCategory::Attack, "sqli"),
            (SignalCategory::Anomaly, "rate_spike"),
            (SignalCategory::Behavior, "crawler"),
            (SignalCategory::Intelligence, "blocklist_hit"),
        ];
        for (category, signal_type) in cases {
            emit(&manager, category, signal_type);
        }

        let summary = manager.summary();
        assert_eq!(summary.total_signals, 4);
        for (category, _) in cases {
            assert_eq!(summary.by_category.get(&category), Some(&1));
        }
    }

    // ---- Listing ----

    #[test]
    fn test_list_signals_filter_by_category() {
        let manager = SignalManager::new(test_config());
        emit(&manager, SignalCategory::Attack, "sqli");
        emit(&manager, SignalCategory::Attack, "xss");
        emit(&manager, SignalCategory::Anomaly, "rate_spike");

        let count_in = |category: SignalCategory| {
            manager
                .list_signals(SignalQueryOptions {
                    category: Some(category),
                    ..Default::default()
                })
                .len()
        };
        assert_eq!(count_in(SignalCategory::Attack), 2);
        assert_eq!(count_in(SignalCategory::Anomaly), 1);
    }

    #[test]
    fn test_list_signals_limit() {
        let manager = SignalManager::new(test_config());
        for i in 0..20 {
            emit(&manager, SignalCategory::Attack, format!("attack_{}", i));
        }

        let limited = manager.list_signals(SignalQueryOptions {
            limit: Some(5),
            ..Default::default()
        });
        assert_eq!(limited.len(), 5);
    }

    #[test]
    fn test_list_signals_respects_max_query_results() {
        let manager = SignalManager::new(SignalManagerConfig {
            max_query_results: 10,
            ..test_config()
        });
        for i in 0..20 {
            emit(&manager, SignalCategory::Attack, format!("attack_{}", i));
        }

        // Requested limit exceeds the configured cap; the cap wins.
        let signals = manager.list_signals(SignalQueryOptions {
            limit: Some(100),
            ..Default::default()
        });
        assert_eq!(signals.len(), 10);
    }

    #[test]
    fn test_list_signals_returns_most_recent_first() {
        let manager = SignalManager::new(test_config());
        for signal_type in ["first", "second", "third"] {
            emit(&manager, SignalCategory::Attack, signal_type);
        }

        let signals = manager.list_signals(SignalQueryOptions::default());
        let order: Vec<&str> = signals.iter().map(|s| s.signal_type.as_str()).collect();
        assert_eq!(order[..3], ["third", "second", "first"]);
    }

    // ---- Summary ----

    #[test]
    fn test_summary_empty() {
        let summary = SignalManager::new(test_config()).summary();

        assert_eq!(summary.total_signals, 0);
        assert!(summary.by_category.is_empty());
        assert!(summary.top_signal_types.is_empty());
    }

    #[test]
    fn test_summary_counts_by_category() {
        let manager = SignalManager::new(test_config());
        for _ in 0..3 {
            emit(&manager, SignalCategory::Attack, "sqli");
        }
        for _ in 0..2 {
            emit(&manager, SignalCategory::Anomaly, "rate");
        }

        let summary = manager.summary();
        assert_eq!(summary.total_signals, 5);
        assert_eq!(summary.by_category.get(&SignalCategory::Attack), Some(&3));
        assert_eq!(summary.by_category.get(&SignalCategory::Anomaly), Some(&2));
    }

    #[test]
    fn test_summary_top_signal_types() {
        let manager = SignalManager::new(test_config());
        for (signal_type, repeats) in [("sqli", 5), ("xss", 3), ("rce", 1)] {
            for _ in 0..repeats {
                emit(&manager, SignalCategory::Attack, signal_type);
            }
        }

        let summary = manager.summary();
        assert_eq!(summary.top_signal_types.len(), 3);
        let ranked: Vec<(&str, usize)> = summary
            .top_signal_types
            .iter()
            .map(|top| (top.signal_type.as_str(), top.count))
            .collect();
        assert_eq!(ranked, vec![("sqli", 5), ("xss", 3), ("rce", 1)]);
    }

    #[test]
    fn test_summary_top_signal_types_limited_to_10() {
        let manager = SignalManager::new(test_config());
        for i in 0..15 {
            emit(&manager, SignalCategory::Attack, format!("attack_type_{}", i));
        }

        assert_eq!(manager.summary().top_signal_types.len(), 10);
    }

    // ---- Bucketing and retention ----

    #[test]
    fn test_bucket_timestamp_calculation() {
        for (input, expected) in [(1500, 1000), (2500, 2000), (3000, 3000)] {
            assert_eq!(bucket_timestamp(input, 1000), expected);
        }
    }

    #[test]
    fn test_max_signals_per_bucket() {
        let manager = SignalManager::new(SignalManagerConfig {
            max_signals_per_bucket: 3,
            ..test_config()
        });
        for i in 0..10 {
            emit(&manager, SignalCategory::Attack, format!("attack_{}", i));
        }

        // Counters see every recorded signal...
        let summary = manager.summary();
        assert_eq!(summary.by_category.get(&SignalCategory::Attack), Some(&10));

        // ...but only `max_signals_per_bucket` are stored.
        let signals = manager.list_signals(SignalQueryOptions::default());
        assert_eq!(signals.len(), 3);
    }

    #[test]
    #[cfg_attr(not(feature = "heavy-tests"), ignore)]
    fn test_bucket_eviction_respects_retention() {
        let bucket_size_ms = 1000;
        let retention_ms = test_limit_u64("SYNAPSE_TEST_RETENTION_MS", 3000, bucket_size_ms);
        let bucket_count = test_limit_usize("SYNAPSE_TEST_BUCKET_COUNT", 5, 1);
        let max_buckets = (retention_ms / bucket_size_ms).max(1) as usize;
        eprintln!(
            "test_bucket_eviction_respects_retention: bucket_count={}, retention_ms={}, max_buckets={}",
            bucket_count, retention_ms, max_buckets
        );

        let manager = SignalManager::new(SignalManagerConfig {
            bucket_size_ms,
            retention_ms,
            max_signals_per_bucket: 100,
            max_query_results: 500,
        });

        {
            // Fill the store directly: one single-signal bucket per consecutive interval.
            let mut store_lock = manager.store.write();
            for i in 0..bucket_count {
                let start_ms = (i as u64) * bucket_size_ms;
                let mut bucket = SignalBucket::new(start_ms, bucket_size_ms);
                bucket.add_signal(
                    Signal {
                        id: format!("sig_{}", i),
                        timestamp_ms: start_ms,
                        category: SignalCategory::Attack,
                        signal_type: format!("attack_{}", i),
                        entity_id: None,
                        description: None,
                        metadata: serde_json::json!({}),
                    },
                    100,
                );
                store_lock.buckets.push_back(bucket);
            }
            manager.evict_old_buckets(&mut store_lock);
            assert!(store_lock.buckets.len() <= max_buckets);
        }

        assert!(manager.summary().total_signals <= max_buckets);
    }

    // ---- Category type ----

    #[test]
    fn test_signal_category_equality() {
        assert_eq!(SignalCategory::Attack, SignalCategory::Attack);
        assert_ne!(SignalCategory::Attack, SignalCategory::Anomaly);
    }

    #[test]
    fn test_signal_category_serialization() {
        let serialized = serde_json::to_string(&SignalCategory::Intelligence).unwrap();
        assert_eq!(serialized, "\"intelligence\"");

        let deserialized: SignalCategory = serde_json::from_str(&serialized).unwrap();
        assert_eq!(deserialized, SignalCategory::Intelligence);
    }

    // ---- Edge cases ----

    #[test]
    fn test_empty_signal_query() {
        let manager = SignalManager::new(test_config());
        assert!(manager
            .list_signals(SignalQueryOptions::default())
            .is_empty());
    }

    #[test]
    fn test_filter_nonexistent_category() {
        let manager = SignalManager::new(test_config());
        emit(&manager, SignalCategory::Attack, "test");

        let signals = manager.list_signals(SignalQueryOptions {
            category: Some(SignalCategory::Anomaly),
            ..Default::default()
        });
        assert!(signals.is_empty());
    }

    #[test]
    fn test_signal_with_complex_metadata() {
        let manager = SignalManager::new(test_config());
        manager.record_event(
            SignalCategory::Attack,
            "complex_attack",
            Some("attacker-ip".to_string()),
            Some("Complex attack detected".to_string()),
            serde_json::json!({
                "rules": [1001, 1002, 1003],
                "risk_score": 85,
                "headers": {
                    "user-agent": "malicious-bot",
                    "x-forwarded-for": "1.2.3.4"
                },
                "nested": {
                    "deep": {
                        "value": true
                    }
                }
            }),
        );

        let signals = manager.list_signals(SignalQueryOptions::default());
        assert_eq!(signals.len(), 1);
        let metadata = &signals[0].metadata;
        assert_eq!(metadata["rules"].as_array().unwrap().len(), 3);
        assert_eq!(metadata["risk_score"], 85);
        assert_eq!(metadata["nested"]["deep"]["value"], true);
    }
}