1use crate::entropy_gate::{
29 entropy_alert_severity, entropy_observation_millibits, entropy_threshold_millibits,
30 is_high_entropy_millibits, EntropyAlertLevel,
31};
32use serde::{Deserialize, Serialize};
33use std::collections::{HashMap, VecDeque};
34use std::sync::RwLock;
35
36const MAX_TRACKED_AGENTS: usize = 10_000;
42
43const MAX_EVENTS_PER_RESOURCE: usize = 1_000;
45
46const MAX_TRACKED_RESOURCES: usize = 50_000;
48
49const MAX_ENTROPY_SAMPLES_PER_AGENT: usize = 500;
51
52const MAX_AGENT_ID_LEN: usize = 512;
54
55const MAX_RESOURCE_KEY_LEN: usize = 2048;
57
58const MAX_TOOL_NAME_LEN: usize = 256;
60
61const MAX_ALERT_HISTORY: usize = 10_000;
63
64const MAX_PARAM_DATA_LEN: usize = 65_536;
66
67const MAX_DENIAL_EVENTS_PER_AGENT: usize = 500;
69
70const MAX_RECON_TRACKED_AGENTS: usize = 10_000;
72
73const MAX_DRIFT_TRACKED_AGENTS: usize = 10_000;
75
76const MAX_DRIFT_TOOL_ENTRIES: usize = 500;
78
79#[allow(dead_code)]
82const MAX_DRIFT_SNAPSHOTS: usize = 100;
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
90#[serde(deny_unknown_fields)]
91pub struct CollusionConfig {
92 #[serde(default = "default_enabled")]
95 pub enabled: bool,
96
97 #[serde(default = "default_coordination_window_secs")]
101 pub coordination_window_secs: u64,
102
103 #[serde(default = "default_entropy_threshold")]
107 pub entropy_threshold: f64,
108
109 #[serde(default = "default_min_entropy_observations")]
113 pub min_entropy_observations: u32,
114
115 #[serde(default = "default_min_coordinated_agents")]
119 pub min_coordinated_agents: u32,
120
121 #[serde(default = "default_sync_threshold")]
125 pub sync_threshold: f64,
126
127 #[serde(default = "default_recon_denial_threshold")]
132 pub recon_denial_threshold: u32,
133
134 #[serde(default = "default_recon_window_secs")]
137 pub recon_window_secs: u64,
138
139 #[serde(default = "default_drift_threshold")]
144 pub drift_threshold: f64,
145
146 #[serde(default = "default_drift_window_secs")]
149 pub drift_window_secs: u64,
150
151 #[serde(default = "default_drift_min_actions")]
155 pub drift_min_actions: u32,
156}
157
158fn default_enabled() -> bool {
159 true
160}
161fn default_coordination_window_secs() -> u64 {
162 60
163}
164fn default_entropy_threshold() -> f64 {
165 6.5
166}
167fn default_min_entropy_observations() -> u32 {
168 5
169}
170fn default_min_coordinated_agents() -> u32 {
171 3
172}
173fn default_sync_threshold() -> f64 {
174 0.7
175}
176fn default_recon_denial_threshold() -> u32 {
177 10
178}
179fn default_recon_window_secs() -> u64 {
180 60
181}
182fn default_drift_threshold() -> f64 {
183 0.20
184}
185fn default_drift_window_secs() -> u64 {
186 3600
187}
188fn default_drift_min_actions() -> u32 {
189 20
190}
191
192impl Default for CollusionConfig {
193 fn default() -> Self {
194 Self {
195 enabled: default_enabled(),
196 coordination_window_secs: default_coordination_window_secs(),
197 entropy_threshold: default_entropy_threshold(),
198 min_entropy_observations: default_min_entropy_observations(),
199 min_coordinated_agents: default_min_coordinated_agents(),
200 sync_threshold: default_sync_threshold(),
201 recon_denial_threshold: default_recon_denial_threshold(),
202 recon_window_secs: default_recon_window_secs(),
203 drift_threshold: default_drift_threshold(),
204 drift_window_secs: default_drift_window_secs(),
205 drift_min_actions: default_drift_min_actions(),
206 }
207 }
208}
209
210#[derive(Debug, Clone, PartialEq)]
216pub enum CollusionError {
217 InvalidConfig(String),
219 LockPoisoned(String),
221 InvalidInput(String),
223}
224
225impl std::fmt::Display for CollusionError {
226 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227 match self {
228 CollusionError::InvalidConfig(msg) => write!(f, "invalid collusion config: {msg}"),
229 CollusionError::LockPoisoned(msg) => {
230 write!(f, "collusion detector lock poisoned (fail-closed): {msg}")
231 }
232 CollusionError::InvalidInput(msg) => {
233 write!(f, "collusion detector input validation failed: {msg}")
234 }
235 }
236 }
237}
238
239impl std::error::Error for CollusionError {}
240
241impl CollusionConfig {
242 pub fn validate(&self) -> Result<(), CollusionError> {
244 const MAX_COORDINATION_WINDOW_SECS: u64 = 86_400; if self.coordination_window_secs == 0
247 || self.coordination_window_secs > MAX_COORDINATION_WINDOW_SECS
248 {
249 return Err(CollusionError::InvalidConfig(format!(
250 "coordination_window_secs must be in [1, {MAX_COORDINATION_WINDOW_SECS}]"
251 )));
252 }
253 if !self.entropy_threshold.is_finite()
257 || self.entropy_threshold < 0.0
258 || self.entropy_threshold > 8.0
259 {
260 return Err(CollusionError::InvalidConfig(format!(
261 "entropy_threshold must be in [0.0, 8.0], got {}",
262 self.entropy_threshold
263 )));
264 }
265 if !self.sync_threshold.is_finite()
266 || self.sync_threshold < 0.0
267 || self.sync_threshold > 1.0
268 {
269 return Err(CollusionError::InvalidConfig(format!(
270 "sync_threshold must be in [0.0, 1.0], got {}",
271 self.sync_threshold
272 )));
273 }
274 if self.min_coordinated_agents < 2 {
275 return Err(CollusionError::InvalidConfig(
276 "min_coordinated_agents must be >= 2".to_string(),
277 ));
278 }
279 if self.recon_denial_threshold == 0 {
281 return Err(CollusionError::InvalidConfig(
282 "recon_denial_threshold must be > 0".to_string(),
283 ));
284 }
285 const MAX_RECON_WINDOW_SECS: u64 = 3_600; if self.recon_window_secs == 0 || self.recon_window_secs > MAX_RECON_WINDOW_SECS {
287 return Err(CollusionError::InvalidConfig(format!(
288 "recon_window_secs must be in [1, {MAX_RECON_WINDOW_SECS}]"
289 )));
290 }
291 if !self.drift_threshold.is_finite()
293 || self.drift_threshold < 0.0
294 || self.drift_threshold > 1.0
295 {
296 return Err(CollusionError::InvalidConfig(format!(
297 "drift_threshold must be in [0.0, 1.0], got {}",
298 self.drift_threshold
299 )));
300 }
301 const MAX_DRIFT_WINDOW_SECS: u64 = 604_800; if self.drift_window_secs == 0 || self.drift_window_secs > MAX_DRIFT_WINDOW_SECS {
303 return Err(CollusionError::InvalidConfig(format!(
304 "drift_window_secs must be in [1, {MAX_DRIFT_WINDOW_SECS}]"
305 )));
306 }
307 if self.drift_min_actions == 0 {
308 return Err(CollusionError::InvalidConfig(
309 "drift_min_actions must be > 0".to_string(),
310 ));
311 }
312 if self.min_entropy_observations == 0 {
316 return Err(CollusionError::InvalidConfig(
317 "min_entropy_observations must be > 0".to_string(),
318 ));
319 }
320 Ok(())
321 }
322}
323
324#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
330pub enum CollusionType {
331 SteganographicChannel,
333 CoordinatedAccess,
336 SynchronizedBehavior,
338 ReconnaissanceProbe,
342 ConstraintDrift,
346 CapacityExhaustion,
349}
350
351#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
353pub enum CollusionSeverity {
354 Low,
356 Medium,
358 High,
360 Critical,
362}
363
364#[derive(Debug, Clone, Serialize, Deserialize)]
366#[serde(deny_unknown_fields)]
367pub struct CollusionAlert {
368 pub collusion_type: CollusionType,
370 pub severity: CollusionSeverity,
372 pub agent_ids: Vec<String>,
374 pub target: String,
376 pub description: String,
378 pub detected_at: u64,
380 pub evidence: CollusionEvidence,
382}
383
384#[derive(Debug, Clone, Serialize, Deserialize)]
386#[serde(deny_unknown_fields)]
387pub struct CollusionEvidence {
388 #[serde(default, skip_serializing_if = "Option::is_none")]
390 pub entropy_values: Option<Vec<f64>>,
391 #[serde(default, skip_serializing_if = "Option::is_none")]
393 pub access_timestamps: Option<Vec<u64>>,
394 #[serde(default, skip_serializing_if = "Option::is_none")]
396 pub sync_score: Option<f64>,
397 pub observation_count: u32,
399}
400
401impl std::fmt::Display for CollusionAlert {
402 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
403 write!(
404 f,
405 "[{:?}/{:?}] agents={:?} target='{}': {}",
406 self.severity, self.collusion_type, self.agent_ids, self.target, self.description,
407 )
408 }
409}
410
411#[derive(Debug, Clone)]
417struct ResourceAccessEvent {
418 agent_id: String,
419 #[allow(dead_code)]
421 tool: String,
422 timestamp: u64,
423}
424
425#[derive(Debug, Clone)]
427struct EntropyProfile {
428 samples: VecDeque<EntropyObservation>,
430 high_entropy_count: u32,
432 total_samples: u32,
434}
435
436impl EntropyProfile {
437 fn new() -> Self {
438 Self {
439 samples: VecDeque::new(),
440 high_entropy_count: 0,
441 total_samples: 0,
442 }
443 }
444}
445
446#[derive(Debug, Clone, Copy)]
452struct EntropyObservation {
453 bits_per_byte: f64,
454 decision_millibits: u16,
459}
460
461impl EntropyObservation {
462 fn new(bits_per_byte: f64) -> Self {
463 Self {
464 bits_per_byte,
465 decision_millibits: entropy_observation_millibits(bits_per_byte),
466 }
467 }
468
469 fn is_high(self, threshold_millibits: u16) -> bool {
470 is_high_entropy_millibits(self.decision_millibits, threshold_millibits)
471 }
472}
473
474#[derive(Debug, Clone)]
476struct DenialEvent {
477 tool: String,
479 timestamp: u64,
481}
482
483#[derive(Debug, Clone)]
489struct DriftProfile {
490 actions: VecDeque<(u64, bool)>,
492}
493
494impl DriftProfile {
495 fn new() -> Self {
496 Self {
497 actions: VecDeque::new(),
498 }
499 }
500
501 fn record(&mut self, timestamp: u64, denied: bool) {
503 if self.actions.len() >= MAX_DRIFT_TOOL_ENTRIES {
504 self.actions.pop_front();
505 }
506 self.actions.push_back((timestamp, denied));
507 }
508
509 fn denial_rate_in_window(&self, window_start: u64, window_end: u64) -> Option<f64> {
511 let mut total = 0u64;
512 let mut denied = 0u64;
513 for &(ts, was_denied) in &self.actions {
514 if ts >= window_start && ts < window_end {
516 total = total.saturating_add(1);
517 if was_denied {
518 denied = denied.saturating_add(1);
519 }
520 }
521 }
522 if total == 0 {
523 return None;
524 }
525 Some(denied as f64 / total as f64)
526 }
527}
528
529#[derive(Debug, Clone)]
531struct TimingProfile {
532 timestamps: VecDeque<u64>,
534 max_entries: usize,
536}
537
538impl TimingProfile {
539 fn new(max_entries: usize) -> Self {
540 Self {
541 timestamps: VecDeque::new(),
542 max_entries,
543 }
544 }
545
546 fn record(&mut self, timestamp: u64) {
547 if self.timestamps.len() >= self.max_entries {
548 self.timestamps.pop_front();
549 }
550 self.timestamps.push_back(timestamp);
551 }
552}
553
554pub struct CollusionDetector {
562 config: CollusionConfig,
563 resource_events: RwLock<HashMap<String, VecDeque<ResourceAccessEvent>>>,
565 entropy_profiles: RwLock<HashMap<String, EntropyProfile>>,
567 timing_profiles: RwLock<HashMap<String, TimingProfile>>,
569 alerts: RwLock<VecDeque<CollusionAlert>>,
571 denial_events: RwLock<HashMap<String, VecDeque<DenialEvent>>>,
573 drift_profiles: RwLock<HashMap<String, DriftProfile>>,
575}
576
577impl std::fmt::Debug for CollusionDetector {
578 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
579 f.debug_struct("CollusionDetector")
580 .field("config", &self.config)
581 .field("resource_events", &"<locked>")
582 .field("entropy_profiles", &"<locked>")
583 .field("timing_profiles", &"<locked>")
584 .field("alerts", &"<locked>")
585 .field("denial_events", &"<locked>")
586 .field("drift_profiles", &"<locked>")
587 .finish()
588 }
589}
590
591impl CollusionDetector {
592 pub fn new(config: CollusionConfig) -> Result<Self, CollusionError> {
594 config.validate()?;
595 Ok(Self {
596 config,
597 resource_events: RwLock::new(HashMap::new()),
598 entropy_profiles: RwLock::new(HashMap::new()),
599 timing_profiles: RwLock::new(HashMap::new()),
600 alerts: RwLock::new(VecDeque::new()),
601 denial_events: RwLock::new(HashMap::new()),
602 drift_profiles: RwLock::new(HashMap::new()),
603 })
604 }
605
606 pub fn is_enabled(&self) -> bool {
608 self.config.enabled
609 }
610
611 pub fn config(&self) -> &CollusionConfig {
613 &self.config
614 }
615
616 fn validate_agent_id(agent_id: &str) -> Result<(), CollusionError> {
622 if agent_id.is_empty() || agent_id.len() > MAX_AGENT_ID_LEN {
623 return Err(CollusionError::InvalidInput(format!(
624 "agent_id length {} out of range [1, {}]",
625 agent_id.len(),
626 MAX_AGENT_ID_LEN
627 )));
628 }
629 if vellaveto_types::has_dangerous_chars(agent_id) {
630 return Err(CollusionError::InvalidInput(
631 "agent_id contains control or Unicode format characters".to_string(),
632 ));
633 }
634 Ok(())
635 }
636
637 fn validate_resource_key(resource: &str) -> Result<(), CollusionError> {
639 if resource.is_empty() || resource.len() > MAX_RESOURCE_KEY_LEN {
640 return Err(CollusionError::InvalidInput(format!(
641 "resource key length {} out of range [1, {}]",
642 resource.len(),
643 MAX_RESOURCE_KEY_LEN
644 )));
645 }
646 if vellaveto_types::has_dangerous_chars(resource) {
647 return Err(CollusionError::InvalidInput(
648 "resource key contains control or Unicode format characters".to_string(),
649 ));
650 }
651 Ok(())
652 }
653
654 fn validate_tool_name(tool: &str) -> Result<(), CollusionError> {
656 if tool.is_empty() || tool.len() > MAX_TOOL_NAME_LEN {
657 return Err(CollusionError::InvalidInput(format!(
658 "tool name length {} out of range [1, {}]",
659 tool.len(),
660 MAX_TOOL_NAME_LEN
661 )));
662 }
663 if vellaveto_types::has_dangerous_chars(tool) {
664 return Err(CollusionError::InvalidInput(
665 "tool name contains control or Unicode format characters".to_string(),
666 ));
667 }
668 Ok(())
669 }
670
671 pub fn compute_entropy(data: &[u8]) -> f64 {
680 if data.is_empty() {
681 return 0.0;
682 }
683
684 let mut freq = [0u64; 256];
685 for &byte in data {
686 freq[byte as usize] = freq[byte as usize].saturating_add(1);
687 }
688
689 let len = data.len() as f64;
690 let mut entropy = 0.0_f64;
691
692 for &count in &freq {
693 if count == 0 {
694 continue;
695 }
696 let p = count as f64 / len;
697 entropy -= p * p.log2();
698 }
699
700 if !entropy.is_finite() {
702 return 0.0;
703 }
704
705 entropy
706 }
707
708 fn capacity_alert(tracker_name: &str, max: usize) -> CollusionAlert {
710 CollusionAlert {
711 collusion_type: CollusionType::CapacityExhaustion,
712 severity: CollusionSeverity::High,
713 agent_ids: Vec::new(),
714 target: tracker_name.to_string(),
715 description: format!(
716 "{tracker_name} tracker at capacity ({max}) — possible evasion attack"
717 ),
718 detected_at: std::time::SystemTime::now()
721 .duration_since(std::time::UNIX_EPOCH)
722 .map(|d| d.as_secs())
723 .unwrap_or(1),
724 evidence: CollusionEvidence {
725 entropy_values: None,
726 access_timestamps: None,
727 sync_score: None,
728 observation_count: u32::try_from(max).unwrap_or(u32::MAX),
729 },
730 }
731 }
732
733 pub fn analyze_parameters(
738 &self,
739 agent_id: &str,
740 param_data: &[u8],
741 ) -> Result<Option<CollusionAlert>, CollusionError> {
742 if !self.config.enabled {
743 return Ok(None);
744 }
745 Self::validate_agent_id(agent_id)?;
746
747 let data = if param_data.len() > MAX_PARAM_DATA_LEN {
749 ¶m_data[..MAX_PARAM_DATA_LEN]
750 } else {
751 param_data
752 };
753
754 if data.len() < 16 {
756 return Ok(None);
757 }
758
759 let observation = EntropyObservation::new(Self::compute_entropy(data));
760 let threshold_millibits = entropy_threshold_millibits(self.config.entropy_threshold);
761 let is_high = observation.is_high(threshold_millibits);
762
763 let mut profiles = self
764 .entropy_profiles
765 .write()
766 .map_err(|_| CollusionError::LockPoisoned("entropy_profiles write lock".to_string()))?;
767
768 if !profiles.contains_key(agent_id) && profiles.len() >= MAX_TRACKED_AGENTS {
771 tracing::warn!(
772 max = MAX_TRACKED_AGENTS,
773 "Collusion entropy profiles at capacity — returning alert"
774 );
775 return Ok(Some(Self::capacity_alert(
776 "entropy_profiles",
777 MAX_TRACKED_AGENTS,
778 )));
779 }
780
781 let profile = profiles
782 .entry(agent_id.to_string())
783 .or_insert_with(EntropyProfile::new);
784
785 if profile.samples.len() >= MAX_ENTROPY_SAMPLES_PER_AGENT {
787 if let Some(old) = profile.samples.pop_front() {
789 if old.is_high(threshold_millibits) {
790 profile.high_entropy_count = profile.high_entropy_count.saturating_sub(1);
791 }
792 }
793 }
794 profile.samples.push_back(observation);
795 profile.total_samples = profile.total_samples.saturating_add(1);
796 if is_high {
797 profile.high_entropy_count = profile.high_entropy_count.saturating_add(1);
798 }
799
800 if let Some(alert_level) = entropy_alert_severity(
802 profile.high_entropy_count,
803 self.config.min_entropy_observations,
804 ) {
805 let entropy_values: Vec<f64> = profile
806 .samples
807 .iter()
808 .copied()
809 .filter(|sample| sample.is_high(threshold_millibits))
810 .map(|sample| sample.bits_per_byte)
811 .take(10) .collect();
813
814 let now = Self::now_secs();
815 let severity = match alert_level {
816 EntropyAlertLevel::High => CollusionSeverity::High,
817 EntropyAlertLevel::Medium => CollusionSeverity::Medium,
818 };
819
820 let alert = CollusionAlert {
821 collusion_type: CollusionType::SteganographicChannel,
822 severity,
823 agent_ids: vec![agent_id.to_string()],
824 target: format!("agent:{agent_id}"),
825 description: format!(
826 "Agent '{}' has {} high-entropy parameter observations (threshold: {:.1} bits/byte)",
827 agent_id,
828 profile.high_entropy_count,
829 self.config.entropy_threshold,
830 ),
831 detected_at: now,
832 evidence: CollusionEvidence {
833 entropy_values: Some(entropy_values),
834 access_timestamps: None,
835 sync_score: None,
836 observation_count: profile.high_entropy_count,
837 },
838 };
839
840 metrics::counter!(
841 "vellaveto_collusion_alerts_total",
842 "type" => "steganographic_channel"
843 )
844 .increment(1);
845
846 tracing::warn!(
847 agent_id = %agent_id,
848 high_entropy_count = %profile.high_entropy_count,
849 latest_entropy = %observation.bits_per_byte,
850 "Potential steganographic channel detected in agent parameters"
851 );
852
853 self.record_alert(alert.clone())?;
854 return Ok(Some(alert));
855 }
856
857 Ok(None)
858 }
859
860 pub fn record_resource_access(
869 &self,
870 agent_id: &str,
871 resource: &str,
872 tool: &str,
873 timestamp: u64,
874 ) -> Result<Option<CollusionAlert>, CollusionError> {
875 if !self.config.enabled {
876 return Ok(None);
877 }
878 Self::validate_agent_id(agent_id)?;
879 Self::validate_resource_key(resource)?;
880 Self::validate_tool_name(tool)?;
881
882 let agent_id = crate::normalize::normalize_full(agent_id);
887 let resource = crate::normalize::normalize_full(resource);
888 let tool = crate::normalize::normalize_full(tool);
889
890 let mut events = self
891 .resource_events
892 .write()
893 .map_err(|_| CollusionError::LockPoisoned("resource_events write lock".to_string()))?;
894
895 if !events.contains_key(resource.as_str()) && events.len() >= MAX_TRACKED_RESOURCES {
897 tracing::warn!(
898 max = MAX_TRACKED_RESOURCES,
899 "Collusion resource tracking at capacity — returning alert"
900 );
901 return Ok(Some(Self::capacity_alert(
902 "resource_tracking",
903 MAX_TRACKED_RESOURCES,
904 )));
905 }
906
907 let event_queue = events
908 .entry(resource.to_string())
909 .or_insert_with(VecDeque::new);
910
911 let cutoff = timestamp.saturating_sub(self.config.coordination_window_secs);
913 while let Some(front) = event_queue.front() {
914 if front.timestamp < cutoff {
915 event_queue.pop_front();
916 } else {
917 break;
918 }
919 }
920
921 if event_queue.len() >= MAX_EVENTS_PER_RESOURCE {
923 event_queue.pop_front();
924 }
925
926 event_queue.push_back(ResourceAccessEvent {
927 agent_id: agent_id.to_string(),
928 tool: tool.to_string(),
929 timestamp,
930 });
931
932 let mut distinct_agents: Vec<&str> = Vec::new();
934 let mut timestamps_in_window: Vec<u64> = Vec::new();
935 for event in event_queue.iter() {
936 if event.timestamp >= cutoff {
937 if !distinct_agents.contains(&event.agent_id.as_str()) {
938 distinct_agents.push(&event.agent_id);
939 }
940 timestamps_in_window.push(event.timestamp);
941 }
942 }
943
944 if distinct_agents.len() >= self.config.min_coordinated_agents as usize {
945 let now = Self::now_secs();
946
947 let severity = if distinct_agents.len()
948 >= (self.config.min_coordinated_agents as usize).saturating_mul(2)
949 {
950 CollusionSeverity::High
951 } else {
952 CollusionSeverity::Medium
953 };
954
955 let alert = CollusionAlert {
956 collusion_type: CollusionType::CoordinatedAccess,
957 severity,
958 agent_ids: distinct_agents.iter().map(|s| s.to_string()).collect(),
959 target: resource.to_string(),
960 description: format!(
961 "{} agents accessed resource '{}' within {}s window",
962 distinct_agents.len(),
963 resource,
964 self.config.coordination_window_secs,
965 ),
966 detected_at: now,
967 evidence: CollusionEvidence {
968 entropy_values: None,
969 access_timestamps: Some(timestamps_in_window),
970 sync_score: None,
971 observation_count: u32::try_from(distinct_agents.len()).unwrap_or(u32::MAX),
973 },
974 };
975
976 metrics::counter!(
977 "vellaveto_collusion_alerts_total",
978 "type" => "coordinated_access"
979 )
980 .increment(1);
981
982 tracing::warn!(
983 resource = %resource,
984 agent_count = %distinct_agents.len(),
985 window_secs = %self.config.coordination_window_secs,
986 "Coordinated resource access detected"
987 );
988
989 self.record_alert(alert.clone())?;
990 return Ok(Some(alert));
991 }
992
993 Ok(None)
994 }
995
996 pub fn record_tool_timing(
1005 &self,
1006 agent_id: &str,
1007 timestamp: u64,
1008 ) -> Result<Option<CollusionAlert>, CollusionError> {
1009 if !self.config.enabled {
1010 return Ok(None);
1011 }
1012 Self::validate_agent_id(agent_id)?;
1013 let agent_id = crate::normalize::normalize_full(agent_id);
1015
1016 let mut profiles = self
1017 .timing_profiles
1018 .write()
1019 .map_err(|_| CollusionError::LockPoisoned("timing_profiles write lock".to_string()))?;
1020
1021 if !profiles.contains_key(agent_id.as_str()) && profiles.len() >= MAX_TRACKED_AGENTS {
1023 tracing::warn!(
1024 max = MAX_TRACKED_AGENTS,
1025 "Collusion timing profiles at capacity — returning alert"
1026 );
1027 return Ok(Some(Self::capacity_alert(
1028 "timing_profiles",
1029 MAX_TRACKED_AGENTS,
1030 )));
1031 }
1032
1033 let profile = profiles
1034 .entry(agent_id.to_string())
1035 .or_insert_with(|| TimingProfile::new(MAX_ENTROPY_SAMPLES_PER_AGENT));
1036
1037 profile.record(timestamp);
1038
1039 if profile.timestamps.len() < 10 {
1041 return Ok(None);
1042 }
1043
1044 let agent_timestamps = profile.timestamps.clone();
1047
1048 let mut correlated_agents = Vec::new();
1050 let mut max_sync_score = 0.0_f64;
1051
1052 for (other_id, other_profile) in profiles.iter() {
1053 if *other_id == agent_id || other_profile.timestamps.len() < 10 {
1054 continue;
1055 }
1056 let score = Self::compute_sync_score(&agent_timestamps, &other_profile.timestamps);
1057 if score >= self.config.sync_threshold {
1058 correlated_agents.push(other_id.clone());
1059 if score > max_sync_score {
1060 max_sync_score = score;
1061 }
1062 }
1063 }
1064
1065 if !correlated_agents.is_empty() {
1066 let mut all_agents = vec![agent_id.to_string()];
1067 all_agents.extend(correlated_agents);
1068
1069 let now = Self::now_secs();
1070
1071 let severity = if max_sync_score >= 0.9 {
1072 CollusionSeverity::High
1073 } else {
1074 CollusionSeverity::Medium
1075 };
1076
1077 let alert = CollusionAlert {
1078 collusion_type: CollusionType::SynchronizedBehavior,
1079 severity,
1080 agent_ids: all_agents.clone(),
1081 target: "tool_call_timing".to_string(),
1082 description: format!(
1083 "{} agents show synchronized tool call timing (max sync score: {:.2})",
1084 all_agents.len(),
1085 max_sync_score,
1086 ),
1087 detected_at: now,
1088 evidence: CollusionEvidence {
1089 entropy_values: None,
1090 access_timestamps: None,
1091 sync_score: Some(max_sync_score),
1092 observation_count: u32::try_from(all_agents.len()).unwrap_or(u32::MAX),
1093 },
1094 };
1095
1096 metrics::counter!(
1097 "vellaveto_collusion_alerts_total",
1098 "type" => "synchronized_behavior"
1099 )
1100 .increment(1);
1101
1102 tracing::warn!(
1103 agent_ids = ?all_agents,
1104 sync_score = %max_sync_score,
1105 "Synchronized agent behavior detected"
1106 );
1107
1108 self.record_alert(alert.clone())?;
1109 return Ok(Some(alert));
1110 }
1111
1112 Ok(None)
1113 }
1114
1115 fn compute_sync_score(a_timestamps: &VecDeque<u64>, b_timestamps: &VecDeque<u64>) -> f64 {
1122 if a_timestamps.len() < 2 || b_timestamps.len() < 2 {
1123 return 0.0;
1124 }
1125
1126 let a_intervals: Vec<f64> = a_timestamps
1128 .iter()
1129 .zip(a_timestamps.iter().skip(1))
1130 .map(|(&t1, &t2)| t2.saturating_sub(t1) as f64)
1131 .collect();
1132
1133 let b_intervals: Vec<f64> = b_timestamps
1134 .iter()
1135 .zip(b_timestamps.iter().skip(1))
1136 .map(|(&t1, &t2)| t2.saturating_sub(t1) as f64)
1137 .collect();
1138
1139 if a_intervals.is_empty() || b_intervals.is_empty() {
1140 return 0.0;
1141 }
1142
1143 let min_len = a_intervals.len().min(b_intervals.len());
1146 if min_len < 2 {
1147 return 0.0;
1148 }
1149
1150 let a_slice = &a_intervals[..min_len];
1151 let b_slice = &b_intervals[..min_len];
1152
1153 let a_mean: f64 = a_slice.iter().sum::<f64>() / min_len as f64;
1154 let b_mean: f64 = b_slice.iter().sum::<f64>() / min_len as f64;
1155
1156 let mut cov = 0.0_f64;
1157 let mut a_var = 0.0_f64;
1158 let mut b_var = 0.0_f64;
1159
1160 for i in 0..min_len {
1161 let a_diff = a_slice[i] - a_mean;
1162 let b_diff = b_slice[i] - b_mean;
1163 cov += a_diff * b_diff;
1164 a_var += a_diff * a_diff;
1165 b_var += b_diff * b_diff;
1166 }
1167
1168 let denom = (a_var * b_var).sqrt();
1169 if denom < f64::EPSILON {
1170 if a_var < f64::EPSILON && b_var < f64::EPSILON {
1173 let a_rep = a_slice.first().copied().unwrap_or(0.0);
1175 let b_rep = b_slice.first().copied().unwrap_or(0.0);
1176 let max_rep = a_rep.max(b_rep);
1177 if max_rep < f64::EPSILON {
1178 return 1.0; }
1180 let diff = (a_rep - b_rep).abs() / max_rep;
1181 return (1.0 - diff).max(0.0);
1182 }
1183 return 0.0;
1184 }
1185
1186 let correlation = cov / denom;
1187 if !correlation.is_finite() {
1190 return 0.0;
1191 }
1192 correlation.clamp(0.0, 1.0)
1193 }
1194
1195 pub fn record_denial(
1210 &self,
1211 agent_id: &str,
1212 tool: &str,
1213 timestamp: u64,
1214 ) -> Result<Option<CollusionAlert>, CollusionError> {
1215 if !self.config.enabled {
1216 return Ok(None);
1217 }
1218 Self::validate_agent_id(agent_id)?;
1219 Self::validate_tool_name(tool)?;
1220 let agent_id = crate::normalize::normalize_full(agent_id);
1222 let tool = crate::normalize::normalize_full(tool);
1223
1224 let mut denials = self
1225 .denial_events
1226 .write()
1227 .map_err(|_| CollusionError::LockPoisoned("denial_events write lock".to_string()))?;
1228
1229 if !denials.contains_key(agent_id.as_str()) && denials.len() >= MAX_RECON_TRACKED_AGENTS {
1231 tracing::warn!(
1232 max = MAX_RECON_TRACKED_AGENTS,
1233 "Reconnaissance denial tracking at capacity — returning alert"
1234 );
1235 return Ok(Some(Self::capacity_alert(
1236 "recon_tracking",
1237 MAX_RECON_TRACKED_AGENTS,
1238 )));
1239 }
1240
1241 let events = denials
1242 .entry(agent_id.to_string())
1243 .or_insert_with(VecDeque::new);
1244
1245 let cutoff = timestamp.saturating_sub(self.config.recon_window_secs);
1247 while let Some(front) = events.front() {
1248 if front.timestamp < cutoff {
1249 events.pop_front();
1250 } else {
1251 break;
1252 }
1253 }
1254
1255 if events.len() >= MAX_DENIAL_EVENTS_PER_AGENT {
1257 events.pop_front();
1258 }
1259 events.push_back(DenialEvent {
1260 tool: tool.to_string(),
1261 timestamp,
1262 });
1263
1264 let mut distinct_tools: Vec<&str> = Vec::new();
1266 let mut denial_timestamps: Vec<u64> = Vec::new();
1267 for event in events.iter() {
1268 if event.timestamp >= cutoff {
1269 if !distinct_tools.contains(&event.tool.as_str()) {
1270 distinct_tools.push(&event.tool);
1271 }
1272 denial_timestamps.push(event.timestamp);
1273 }
1274 }
1275
1276 let distinct_count = u32::try_from(distinct_tools.len()).unwrap_or(u32::MAX);
1278 if distinct_count >= self.config.recon_denial_threshold {
1279 let now = Self::now_secs();
1280
1281 let severity = if distinct_count >= self.config.recon_denial_threshold.saturating_mul(2)
1282 {
1283 CollusionSeverity::Critical
1284 } else {
1285 CollusionSeverity::High
1286 };
1287
1288 let alert = CollusionAlert {
1289 collusion_type: CollusionType::ReconnaissanceProbe,
1290 severity,
1291 agent_ids: vec![agent_id.to_string()],
1292 target: format!("agent:{agent_id}"),
1293 description: format!(
1294 "Agent '{}' triggered {} distinct policy denials within {}s window \
1295 (threshold: {}), indicating reconnaissance probing",
1296 agent_id,
1297 distinct_count,
1298 self.config.recon_window_secs,
1299 self.config.recon_denial_threshold,
1300 ),
1301 detected_at: now,
1302 evidence: CollusionEvidence {
1303 entropy_values: None,
1304 access_timestamps: Some(denial_timestamps),
1305 sync_score: None,
1306 observation_count: distinct_count,
1307 },
1308 };
1309
1310 metrics::counter!(
1311 "vellaveto_collusion_alerts_total",
1312 "type" => "reconnaissance_probe"
1313 )
1314 .increment(1);
1315
1316 tracing::warn!(
1317 agent_id = %agent_id,
1318 distinct_denials = %distinct_count,
1319 window_secs = %self.config.recon_window_secs,
1320 "Reconnaissance probe detected: agent systematically probing permission boundaries"
1321 );
1322
1323 self.record_alert(alert.clone())?;
1324 return Ok(Some(alert));
1325 }
1326
1327 Ok(None)
1328 }
1329
1330 pub fn record_agent_action(
1343 &self,
1344 agent_id: &str,
1345 denied: bool,
1346 timestamp: u64,
1347 ) -> Result<Option<CollusionAlert>, CollusionError> {
1348 if !self.config.enabled {
1349 return Ok(None);
1350 }
1351 Self::validate_agent_id(agent_id)?;
1352 let agent_id = crate::normalize::normalize_full(agent_id);
1354
1355 let mut profiles = self
1356 .drift_profiles
1357 .write()
1358 .map_err(|_| CollusionError::LockPoisoned("drift_profiles write lock".to_string()))?;
1359
1360 if !profiles.contains_key(agent_id.as_str()) && profiles.len() >= MAX_DRIFT_TRACKED_AGENTS {
1362 tracing::warn!(
1363 max = MAX_DRIFT_TRACKED_AGENTS,
1364 "Drift tracking at capacity — returning alert"
1365 );
1366 return Ok(Some(Self::capacity_alert(
1367 "drift_tracking",
1368 MAX_DRIFT_TRACKED_AGENTS,
1369 )));
1370 }
1371
1372 let profile = profiles
1373 .entry(agent_id.to_string())
1374 .or_insert_with(DriftProfile::new);
1375
1376 profile.record(timestamp, denied);
1377
1378 if profile.actions.len() < self.config.drift_min_actions as usize {
1380 return Ok(None);
1381 }
1382
1383 let window_start = timestamp.saturating_sub(self.config.drift_window_secs);
1385 let midpoint = window_start.saturating_add(self.config.drift_window_secs / 2);
1386
1387 let baseline_rate = profile.denial_rate_in_window(window_start, midpoint);
1388 let current_rate = profile.denial_rate_in_window(midpoint, timestamp);
1389
1390 if let (Some(baseline), Some(current)) = (baseline_rate, current_rate) {
1391 let drift = current - baseline;
1392 if drift >= self.config.drift_threshold {
1393 let now = Self::now_secs();
1394
1395 let severity = if drift >= self.config.drift_threshold * 2.0 {
1396 CollusionSeverity::High
1397 } else {
1398 CollusionSeverity::Medium
1399 };
1400
1401 let alert = CollusionAlert {
1402 collusion_type: CollusionType::ConstraintDrift,
1403 severity,
1404 agent_ids: vec![agent_id.to_string()],
1405 target: format!("agent:{agent_id}"),
1406 description: format!(
1407 "Agent '{}' denial rate shifted from {:.1}% to {:.1}% \
1408 (drift: {:.1}%, threshold: {:.1}%) within {}s window",
1409 agent_id,
1410 baseline * 100.0,
1411 current * 100.0,
1412 drift * 100.0,
1413 self.config.drift_threshold * 100.0,
1414 self.config.drift_window_secs,
1415 ),
1416 detected_at: now,
1417 evidence: CollusionEvidence {
1418 entropy_values: None,
1419 access_timestamps: None,
1420 sync_score: Some(drift),
1421 observation_count: u32::try_from(profile.actions.len()).unwrap_or(u32::MAX),
1422 },
1423 };
1424
1425 metrics::counter!(
1426 "vellaveto_collusion_alerts_total",
1427 "type" => "constraint_drift"
1428 )
1429 .increment(1);
1430
1431 tracing::warn!(
1432 agent_id = %agent_id,
1433 baseline_rate = %format!("{:.2}", baseline),
1434 current_rate = %format!("{:.2}", current),
1435 drift = %format!("{:.2}", drift),
1436 "Gradual constraint drift detected: agent behavior shifting"
1437 );
1438
1439 self.record_alert(alert.clone())?;
1440 return Ok(Some(alert));
1441 }
1442 }
1443
1444 Ok(None)
1445 }
1446
1447 fn record_alert(&self, alert: CollusionAlert) -> Result<(), CollusionError> {
1453 let mut alerts = self
1454 .alerts
1455 .write()
1456 .map_err(|_| CollusionError::LockPoisoned("alerts write lock".to_string()))?;
1457
1458 if alerts.len() >= MAX_ALERT_HISTORY {
1459 alerts.pop_front();
1460 }
1461 alerts.push_back(alert);
1462 Ok(())
1463 }
1464
1465 pub fn recent_alerts(&self, limit: usize) -> Result<Vec<CollusionAlert>, CollusionError> {
1467 let alerts = self
1468 .alerts
1469 .read()
1470 .map_err(|_| CollusionError::LockPoisoned("alerts read lock".to_string()))?;
1471
1472 Ok(alerts.iter().rev().take(limit).cloned().collect())
1473 }
1474
1475 pub fn alert_count(&self) -> Result<usize, CollusionError> {
1477 let alerts = self
1478 .alerts
1479 .read()
1480 .map_err(|_| CollusionError::LockPoisoned("alerts read lock".to_string()))?;
1481 Ok(alerts.len())
1482 }
1483
1484 fn now_secs() -> u64 {
1494 std::time::SystemTime::now()
1495 .duration_since(std::time::UNIX_EPOCH)
1496 .map(|d| d.as_secs())
1497 .unwrap_or_else(|_| {
1498 tracing::error!(
1499 "SECURITY: System clock before UNIX epoch — using fallback timestamp 1"
1500 );
1501 1
1502 })
1503 }
1504}
1505
1506#[cfg(test)]
1511mod tests {
1512 use super::*;
1513
1514 fn default_config() -> CollusionConfig {
1515 CollusionConfig::default()
1516 }
1517
1518 fn make_detector() -> CollusionDetector {
1519 CollusionDetector::new(default_config()).unwrap()
1520 }
1521
1522 #[test]
1527 fn test_config_validate_default_ok() {
1528 assert!(CollusionConfig::default().validate().is_ok());
1529 }
1530
1531 #[test]
1532 fn test_config_validate_zero_window_rejected() {
1533 let mut cfg = default_config();
1534 cfg.coordination_window_secs = 0;
1535 assert!(cfg.validate().is_err());
1536 }
1537
1538 #[test]
1539 fn test_config_validate_nan_entropy_rejected() {
1540 let mut cfg = default_config();
1541 cfg.entropy_threshold = f64::NAN;
1542 assert!(cfg.validate().is_err());
1543 }
1544
1545 #[test]
1546 fn test_config_validate_negative_entropy_rejected() {
1547 let mut cfg = default_config();
1548 cfg.entropy_threshold = -1.0;
1549 assert!(cfg.validate().is_err());
1550 }
1551
1552 #[test]
1553 fn test_config_validate_sync_threshold_out_of_range() {
1554 let mut cfg = default_config();
1555 cfg.sync_threshold = 1.5;
1556 assert!(cfg.validate().is_err());
1557 }
1558
1559 #[test]
1560 fn test_config_validate_min_agents_below_two() {
1561 let mut cfg = default_config();
1562 cfg.min_coordinated_agents = 1;
1563 assert!(cfg.validate().is_err());
1564 }
1565
1566 #[test]
1568 fn test_config_validate_zero_min_entropy_observations_rejected() {
1569 let mut cfg = default_config();
1570 cfg.min_entropy_observations = 0;
1571 let err = cfg.validate().unwrap_err();
1572 assert!(
1573 err.to_string()
1574 .contains("min_entropy_observations must be > 0"),
1575 "Expected min_entropy_observations error, got: {err}",
1576 );
1577 }
1578
1579 #[test]
1581 fn test_config_validate_nonzero_min_entropy_observations_ok() {
1582 let mut cfg = default_config();
1583 cfg.min_entropy_observations = 1;
1584 assert!(cfg.validate().is_ok());
1585 }
1586
1587 #[test]
1592 fn test_compute_entropy_empty_data_returns_zero() {
1593 assert_eq!(CollusionDetector::compute_entropy(&[]), 0.0);
1594 }
1595
1596 #[test]
1597 fn test_compute_entropy_uniform_data_returns_zero() {
1598 let data = vec![0x41u8; 100]; let entropy = CollusionDetector::compute_entropy(&data);
1600 assert!(
1601 entropy < 0.01,
1602 "Uniform data should have ~0 entropy, got {entropy}"
1603 );
1604 }
1605
1606 #[test]
1607 fn test_compute_entropy_random_data_returns_high() {
1608 let mut data = Vec::with_capacity(2560);
1610 for _ in 0..10 {
1611 for b in 0..=255u8 {
1612 data.push(b);
1613 }
1614 }
1615 let entropy = CollusionDetector::compute_entropy(&data);
1616 assert!(
1617 entropy > 7.9,
1618 "Uniformly distributed data should have ~8.0 bits/byte entropy, got {entropy}"
1619 );
1620 }
1621
1622 #[test]
1623 fn test_compute_entropy_english_text_moderate() {
1624 let data = b"The quick brown fox jumps over the lazy dog. This is a normal sentence.";
1625 let entropy = CollusionDetector::compute_entropy(data);
1626 assert!(
1627 entropy > 3.0 && entropy < 5.5,
1628 "English text should have ~3.5-4.5 bits/byte entropy, got {entropy}"
1629 );
1630 }
1631
1632 #[test]
1633 fn test_entropy_decision_helper_uses_fixed_point_threshold() {
1634 let threshold = entropy_threshold_millibits(6.5);
1635 assert_eq!(threshold, 6500);
1636
1637 let exact = EntropyObservation::new(6.5);
1638 let clearly_low = EntropyObservation::new(6.498);
1639 let clearly_high = EntropyObservation::new(6.8);
1640
1641 assert!(exact.is_high(threshold));
1642 assert!(!clearly_low.is_high(threshold));
1643 assert!(clearly_high.is_high(threshold));
1644 }
1645
1646 #[test]
1647 fn test_entropy_decision_helper_biases_borderline_values_toward_alerting() {
1648 let threshold = entropy_threshold_millibits(6.5);
1649 let borderline = EntropyObservation::new(6.4991);
1650
1651 assert!(
1652 borderline.is_high(threshold),
1653 "Borderline entropy should classify high under the conservative fixed-point gate"
1654 );
1655 }
1656
1657 #[test]
1662 fn test_analyze_parameters_disabled_returns_none() {
1663 let mut cfg = default_config();
1664 cfg.enabled = false;
1665 let detector = CollusionDetector::new(cfg).unwrap();
1666 let result = detector.analyze_parameters("agent-1", &[0u8; 100]);
1667 assert!(result.unwrap().is_none());
1668 }
1669
1670 #[test]
1671 fn test_analyze_parameters_short_data_returns_none() {
1672 let detector = make_detector();
1673 let result = detector.analyze_parameters("agent-1", &[0u8; 10]);
1675 assert!(result.unwrap().is_none());
1676 }
1677
1678 #[test]
1679 fn test_analyze_parameters_normal_text_no_alert() {
1680 let detector = make_detector();
1681 let text = b"This is normal text parameter data that should not trigger alerts.";
1682 for _ in 0..20 {
1683 let result = detector.analyze_parameters("agent-1", text);
1684 assert!(
1685 result.unwrap().is_none(),
1686 "Normal text should not trigger steganographic alert"
1687 );
1688 }
1689 }
1690
1691 #[test]
1692 fn test_analyze_parameters_high_entropy_triggers_alert() {
1693 let mut cfg = default_config();
1694 cfg.min_entropy_observations = 3;
1695 cfg.entropy_threshold = 6.0;
1696 let detector = CollusionDetector::new(cfg).unwrap();
1697
1698 let mut data = Vec::with_capacity(2560);
1700 for _ in 0..10 {
1701 for b in 0..=255u8 {
1702 data.push(b);
1703 }
1704 }
1705
1706 for i in 0..2 {
1708 let result = detector
1709 .analyze_parameters(&format!("agent-{i}"), &data)
1710 .unwrap();
1711 assert!(result.is_none());
1713 }
1714
1715 let mut triggered = false;
1717 for _ in 0..5 {
1718 if let Some(alert) = detector.analyze_parameters("agent-crypto", &data).unwrap() {
1719 assert_eq!(alert.collusion_type, CollusionType::SteganographicChannel);
1720 assert!(alert.evidence.entropy_values.is_some());
1721 triggered = true;
1722 break;
1723 }
1724 }
1725 assert!(
1726 triggered,
1727 "Should have triggered steganographic alert after repeated high-entropy observations"
1728 );
1729 }
1730
1731 #[test]
1736 fn test_record_resource_access_disabled_returns_none() {
1737 let mut cfg = default_config();
1738 cfg.enabled = false;
1739 let detector = CollusionDetector::new(cfg).unwrap();
1740 let result = detector.record_resource_access("agent-1", "/secret", "read", 1000);
1741 assert!(result.unwrap().is_none());
1742 }
1743
1744 #[test]
1745 fn test_record_resource_access_single_agent_no_alert() {
1746 let detector = make_detector();
1747 let result = detector.record_resource_access("agent-1", "/secret", "read", 1000);
1748 assert!(result.unwrap().is_none());
1749 }
1750
1751 #[test]
1752 fn test_record_resource_access_coordinated_triggers_alert() {
1753 let mut cfg = default_config();
1754 cfg.min_coordinated_agents = 3;
1755 cfg.coordination_window_secs = 60;
1756 let detector = CollusionDetector::new(cfg).unwrap();
1757
1758 let base_time = 1_000_000u64;
1759 assert!(detector
1761 .record_resource_access("agent-1", "/credentials/db", "read_file", base_time)
1762 .unwrap()
1763 .is_none());
1764 assert!(detector
1765 .record_resource_access("agent-2", "/credentials/db", "read_file", base_time + 10)
1766 .unwrap()
1767 .is_none());
1768
1769 let result = detector
1770 .record_resource_access("agent-3", "/credentials/db", "read_file", base_time + 20)
1771 .unwrap();
1772 assert!(
1773 result.is_some(),
1774 "Three agents in window should trigger alert"
1775 );
1776
1777 let alert = result.unwrap();
1778 assert_eq!(alert.collusion_type, CollusionType::CoordinatedAccess);
1779 assert_eq!(alert.agent_ids.len(), 3);
1780 assert!(alert.description.contains("3 agents"));
1781 }
1782
1783 #[test]
1784 fn test_record_resource_access_outside_window_no_alert() {
1785 let mut cfg = default_config();
1786 cfg.min_coordinated_agents = 3;
1787 cfg.coordination_window_secs = 60;
1788 let detector = CollusionDetector::new(cfg).unwrap();
1789
1790 let base_time = 1_000_000u64;
1791 assert!(detector
1793 .record_resource_access("agent-1", "/secret", "read", base_time)
1794 .unwrap()
1795 .is_none());
1796 assert!(detector
1797 .record_resource_access("agent-2", "/secret", "read", base_time + 10)
1798 .unwrap()
1799 .is_none());
1800 let result = detector
1802 .record_resource_access("agent-3", "/secret", "read", base_time + 100)
1803 .unwrap();
1804 assert!(
1806 result.is_none(),
1807 "Agent outside window should not trigger alert"
1808 );
1809 }
1810
1811 #[test]
1816 fn test_record_tool_timing_disabled_returns_none() {
1817 let mut cfg = default_config();
1818 cfg.enabled = false;
1819 let detector = CollusionDetector::new(cfg).unwrap();
1820 let result = detector.record_tool_timing("agent-1", 1000);
1821 assert!(result.unwrap().is_none());
1822 }
1823
1824 #[test]
1825 fn test_record_tool_timing_single_agent_no_alert() {
1826 let detector = make_detector();
1827 for t in 0..20 {
1828 let result = detector.record_tool_timing("agent-1", t * 10);
1829 assert!(result.unwrap().is_none());
1830 }
1831 }
1832
1833 #[test]
1834 fn test_record_tool_timing_synchronized_triggers_alert() {
1835 let mut cfg = default_config();
1836 cfg.sync_threshold = 0.8;
1837 let detector = CollusionDetector::new(cfg).unwrap();
1838
1839 for t in 0..15 {
1841 let _ = detector.record_tool_timing("agent-a", t * 10);
1842 let _ = detector.record_tool_timing("agent-b", t * 10 + 1); }
1844
1845 let result = detector.record_tool_timing("agent-a", 150);
1847 assert!(result.is_ok());
1851 }
1852
1853 #[test]
1854 fn test_compute_sync_score_identical_intervals_returns_high() {
1855 let a: VecDeque<u64> = (0..10).map(|i| i * 5).collect();
1856 let b: VecDeque<u64> = (0..10).map(|i| i * 5 + 1).collect();
1857
1858 let score = CollusionDetector::compute_sync_score(&a, &b);
1859 assert!(
1860 score > 0.95,
1861 "Identical intervals should have high sync score, got {score}"
1862 );
1863 }
1864
1865 #[test]
1866 fn test_compute_sync_score_different_intervals_returns_low() {
1867 let a: VecDeque<u64> = (0..10).map(|i| i * 5).collect();
1869 let b: VecDeque<u64> = (0..10).map(|i| i * i * 3).collect();
1871
1872 let score = CollusionDetector::compute_sync_score(&a, &b);
1873 assert!(
1874 score < 0.8,
1875 "Different interval patterns should have low sync score, got {score}"
1876 );
1877 }
1878
1879 #[test]
1880 fn test_compute_sync_score_empty_timestamps_returns_zero() {
1881 let a: VecDeque<u64> = VecDeque::new();
1882 let b: VecDeque<u64> = (0..10).collect();
1883 assert_eq!(CollusionDetector::compute_sync_score(&a, &b), 0.0);
1884 }
1885
1886 #[test]
1887 fn test_compute_sync_score_single_timestamp_returns_zero() {
1888 let a: VecDeque<u64> = vec![100].into_iter().collect();
1889 let b: VecDeque<u64> = vec![100].into_iter().collect();
1890 assert_eq!(CollusionDetector::compute_sync_score(&a, &b), 0.0);
1891 }
1892
1893 #[test]
1898 fn test_validate_agent_id_empty_rejected() {
1899 assert!(CollusionDetector::validate_agent_id("").is_err());
1900 }
1901
1902 #[test]
1903 fn test_validate_agent_id_too_long_rejected() {
1904 let long_id = "a".repeat(MAX_AGENT_ID_LEN + 1);
1905 assert!(CollusionDetector::validate_agent_id(&long_id).is_err());
1906 }
1907
1908 #[test]
1909 fn test_validate_agent_id_control_chars_rejected() {
1910 assert!(CollusionDetector::validate_agent_id("agent\0id").is_err());
1911 }
1912
1913 #[test]
1914 fn test_validate_resource_key_empty_rejected() {
1915 assert!(CollusionDetector::validate_resource_key("").is_err());
1916 }
1917
1918 #[test]
1919 fn test_validate_tool_name_too_long_rejected() {
1920 let long_name = "t".repeat(MAX_TOOL_NAME_LEN + 1);
1921 assert!(CollusionDetector::validate_tool_name(&long_name).is_err());
1922 }
1923
1924 #[test]
1929 fn test_recent_alerts_returns_empty_initially() {
1930 let detector = make_detector();
1931 let alerts = detector.recent_alerts(10).unwrap();
1932 assert!(alerts.is_empty());
1933 }
1934
1935 #[test]
1936 fn test_alert_count_returns_zero_initially() {
1937 let detector = make_detector();
1938 assert_eq!(detector.alert_count().unwrap(), 0);
1939 }
1940
1941 #[test]
1942 fn test_recent_alerts_returns_most_recent_first() {
1943 let mut cfg = default_config();
1945 cfg.min_coordinated_agents = 2;
1946 let detector = CollusionDetector::new(cfg).unwrap();
1947
1948 let base = 1_000_000u64;
1949 let _ = detector.record_resource_access("a1", "/res1", "read", base);
1950 let _ = detector.record_resource_access("a2", "/res1", "read", base + 1);
1951 let _ = detector.record_resource_access("a1", "/res2", "read", base + 2);
1952 let _ = detector.record_resource_access("a3", "/res2", "read", base + 3);
1953
1954 let alerts = detector.recent_alerts(10).unwrap();
1955 assert!(!alerts.is_empty(), "Should have at least one alert");
1956 if alerts.len() >= 2 {
1958 assert!(alerts[0].detected_at >= alerts[1].detected_at);
1959 }
1960 }
1961
1962 #[test]
1967 fn test_config_serialization_roundtrip() {
1968 let cfg = CollusionConfig::default();
1969 let json = serde_json::to_string(&cfg).unwrap();
1970 let parsed: CollusionConfig = serde_json::from_str(&json).unwrap();
1971 assert_eq!(
1972 parsed.coordination_window_secs,
1973 cfg.coordination_window_secs
1974 );
1975 assert_eq!(parsed.entropy_threshold, cfg.entropy_threshold);
1976 }
1977
1978 #[test]
1979 fn test_config_deny_unknown_fields() {
1980 let json = r#"{"enabled": true, "unknown_field": 42}"#;
1981 let result: Result<CollusionConfig, _> = serde_json::from_str(json);
1982 assert!(
1983 result.is_err(),
1984 "deny_unknown_fields should reject unknown fields"
1985 );
1986 }
1987
1988 #[test]
1989 fn test_alert_serialization_roundtrip() {
1990 let alert = CollusionAlert {
1991 collusion_type: CollusionType::SteganographicChannel,
1992 severity: CollusionSeverity::High,
1993 agent_ids: vec!["a1".to_string()],
1994 target: "test".to_string(),
1995 description: "test alert".to_string(),
1996 detected_at: 12345,
1997 evidence: CollusionEvidence {
1998 entropy_values: Some(vec![7.5, 7.8]),
1999 access_timestamps: None,
2000 sync_score: None,
2001 observation_count: 2,
2002 },
2003 };
2004 let json = serde_json::to_string(&alert).unwrap();
2005 let parsed: CollusionAlert = serde_json::from_str(&json).unwrap();
2006 assert_eq!(parsed.collusion_type, CollusionType::SteganographicChannel);
2007 assert_eq!(parsed.severity, CollusionSeverity::High);
2008 }
2009
2010 #[test]
2015 fn test_record_denial_disabled_returns_none() {
2016 let mut cfg = default_config();
2017 cfg.enabled = false;
2018 let detector = CollusionDetector::new(cfg).unwrap();
2019 let result = detector.record_denial("agent-1", "read_file", 1000);
2020 assert!(result.unwrap().is_none());
2021 }
2022
2023 #[test]
2024 fn test_record_denial_below_threshold_no_alert() {
2025 let mut cfg = default_config();
2026 cfg.recon_denial_threshold = 5;
2027 cfg.recon_window_secs = 60;
2028 let detector = CollusionDetector::new(cfg).unwrap();
2029
2030 let base = 1_000_000u64;
2031 for (i, tool) in ["read_file", "write_file", "exec_cmd", "list_dir"]
2033 .iter()
2034 .enumerate()
2035 {
2036 let result = detector
2037 .record_denial("probe-agent", tool, base + i as u64)
2038 .unwrap();
2039 assert!(
2040 result.is_none(),
2041 "Below threshold should not trigger alert (tool #{i})"
2042 );
2043 }
2044 }
2045
2046 #[test]
2047 fn test_record_denial_at_threshold_triggers_alert() {
2048 let mut cfg = default_config();
2049 cfg.recon_denial_threshold = 5;
2050 cfg.recon_window_secs = 60;
2051 let detector = CollusionDetector::new(cfg).unwrap();
2052
2053 let base = 1_000_000u64;
2054 let tools = [
2055 "read_file",
2056 "write_file",
2057 "exec_cmd",
2058 "list_dir",
2059 "delete_file",
2060 ];
2061
2062 let mut triggered = false;
2063 for (i, tool) in tools.iter().enumerate() {
2064 if let Some(alert) = detector
2065 .record_denial("probe-agent", tool, base + i as u64)
2066 .unwrap()
2067 {
2068 assert_eq!(alert.collusion_type, CollusionType::ReconnaissanceProbe);
2069 assert_eq!(alert.severity, CollusionSeverity::High);
2070 assert!(alert.description.contains("5 distinct policy denials"));
2071 assert!(alert.description.contains("probe-agent"));
2072 assert_eq!(alert.evidence.observation_count, 5);
2073 triggered = true;
2074 break;
2075 }
2076 }
2077 assert!(
2078 triggered,
2079 "Should have triggered reconnaissance probe alert at threshold"
2080 );
2081 }
2082
2083 #[test]
2084 fn test_record_denial_same_tool_repeated_no_alert() {
2085 let mut cfg = default_config();
2086 cfg.recon_denial_threshold = 5;
2087 cfg.recon_window_secs = 60;
2088 let detector = CollusionDetector::new(cfg).unwrap();
2089
2090 let base = 1_000_000u64;
2091 for i in 0..20 {
2093 let result = detector
2094 .record_denial("probe-agent", "read_file", base + i)
2095 .unwrap();
2096 assert!(
2097 result.is_none(),
2098 "Repeated same-tool denial should not trigger recon alert"
2099 );
2100 }
2101 }
2102
2103 #[test]
2104 fn test_record_denial_outside_window_no_alert() {
2105 let mut cfg = default_config();
2106 cfg.recon_denial_threshold = 3;
2107 cfg.recon_window_secs = 10;
2108 let detector = CollusionDetector::new(cfg).unwrap();
2109
2110 let base = 1_000_000u64;
2111 detector
2113 .record_denial("probe-agent", "tool_a", base)
2114 .unwrap();
2115 detector
2116 .record_denial("probe-agent", "tool_b", base + 1)
2117 .unwrap();
2118
2119 let result = detector
2121 .record_denial("probe-agent", "tool_c", base + 100)
2122 .unwrap();
2123 assert!(
2125 result.is_none(),
2126 "Denials outside window should not trigger alert"
2127 );
2128 }
2129
2130 #[test]
2131 fn test_record_denial_critical_severity_at_double_threshold() {
2132 let mut cfg = default_config();
2133 cfg.recon_denial_threshold = 3;
2134 cfg.recon_window_secs = 120;
2135 let detector = CollusionDetector::new(cfg).unwrap();
2136
2137 let base = 1_000_000u64;
2138 let tools = ["t1", "t2", "t3", "t4", "t5", "t6"];
2139
2140 let mut last_alert = None;
2141 for (i, tool) in tools.iter().enumerate() {
2142 if let Some(alert) = detector
2143 .record_denial("heavy-probe", tool, base + i as u64)
2144 .unwrap()
2145 {
2146 last_alert = Some(alert);
2147 }
2148 }
2149 let alert = last_alert.expect("Should have triggered alert with 6 tools");
2150 assert_eq!(alert.severity, CollusionSeverity::Critical);
2152 }
2153
2154 #[test]
2155 fn test_record_denial_invalid_agent_id_rejected() {
2156 let detector = make_detector();
2157 assert!(detector.record_denial("", "tool", 1000).is_err());
2158 }
2159
2160 #[test]
2161 fn test_record_denial_invalid_tool_name_rejected() {
2162 let detector = make_detector();
2163 assert!(detector.record_denial("agent", "", 1000).is_err());
2164 }
2165
2166 #[test]
2167 fn test_recon_config_validate_zero_threshold_rejected() {
2168 let mut cfg = default_config();
2169 cfg.recon_denial_threshold = 0;
2170 assert!(cfg.validate().is_err());
2171 }
2172
2173 #[test]
2174 fn test_recon_config_validate_zero_window_rejected() {
2175 let mut cfg = default_config();
2176 cfg.recon_window_secs = 0;
2177 assert!(cfg.validate().is_err());
2178 }
2179
2180 #[test]
2185 fn test_record_agent_action_disabled_returns_none() {
2186 let mut cfg = default_config();
2187 cfg.enabled = false;
2188 let detector = CollusionDetector::new(cfg).unwrap();
2189 let result = detector.record_agent_action("agent-1", false, 1000);
2190 assert!(result.unwrap().is_none());
2191 }
2192
2193 #[test]
2194 fn test_record_agent_action_below_min_actions_no_alert() {
2195 let mut cfg = default_config();
2196 cfg.drift_min_actions = 20;
2197 cfg.drift_window_secs = 100;
2198 cfg.drift_threshold = 0.20;
2199 let detector = CollusionDetector::new(cfg).unwrap();
2200
2201 for i in 0..10 {
2203 let result = detector
2204 .record_agent_action("agent-1", i % 3 == 0, 1000 + i)
2205 .unwrap();
2206 assert!(result.is_none(), "Below min_actions should not trigger");
2207 }
2208 }
2209
2210 #[test]
2211 fn test_record_agent_action_drift_triggers_alert() {
2212 let mut cfg = default_config();
2213 cfg.drift_min_actions = 10;
2214 cfg.drift_window_secs = 100;
2215 cfg.drift_threshold = 0.20;
2216 let detector = CollusionDetector::new(cfg).unwrap();
2217
2218 let base = 1_000_000u64;
2219 for i in 0..10 {
2221 detector
2222 .record_agent_action("drift-agent", false, base + i * 5)
2223 .unwrap();
2224 }
2225 let mut triggered = false;
2227 for i in 0..10 {
2228 if let Some(alert) = detector
2229 .record_agent_action("drift-agent", true, base + 51 + i * 5)
2230 .unwrap()
2231 {
2232 assert_eq!(alert.collusion_type, CollusionType::ConstraintDrift);
2233 assert!(alert.description.contains("drift-agent"));
2234 triggered = true;
2235 break;
2236 }
2237 }
2238 assert!(triggered, "Significant drift should trigger alert");
2239 }
2240
2241 #[test]
2242 fn test_record_agent_action_no_drift_no_alert() {
2243 let mut cfg = default_config();
2244 cfg.drift_min_actions = 10;
2245 cfg.drift_window_secs = 100;
2246 cfg.drift_threshold = 0.20;
2247 let detector = CollusionDetector::new(cfg).unwrap();
2248
2249 let base = 1_000_000u64;
2250 for i in 0..30 {
2252 let denied = i % 10 == 0; let result = detector
2254 .record_agent_action("stable-agent", denied, base + i * 3)
2255 .unwrap();
2256 assert!(
2257 result.is_none(),
2258 "Stable denial rate should not trigger drift alert (action {i})"
2259 );
2260 }
2261 }
2262
2263 #[test]
2264 fn test_drift_config_validate_invalid_threshold() {
2265 let mut cfg = default_config();
2266 cfg.drift_threshold = 1.5;
2267 assert!(cfg.validate().is_err());
2268 }
2269
2270 #[test]
2271 fn test_drift_config_validate_nan_threshold() {
2272 let mut cfg = default_config();
2273 cfg.drift_threshold = f64::NAN;
2274 assert!(cfg.validate().is_err());
2275 }
2276
2277 #[test]
2278 fn test_drift_config_validate_zero_window() {
2279 let mut cfg = default_config();
2280 cfg.drift_window_secs = 0;
2281 assert!(cfg.validate().is_err());
2282 }
2283
2284 #[test]
2285 fn test_drift_config_validate_zero_min_actions() {
2286 let mut cfg = default_config();
2287 cfg.drift_min_actions = 0;
2288 assert!(cfg.validate().is_err());
2289 }
2290
2291 #[test]
2294 fn test_r237_eng7_coordination_window_upper_bound() {
2295 let mut cfg = default_config();
2296 cfg.coordination_window_secs = 86_401; assert!(cfg.validate().is_err());
2298 }
2299
2300 #[test]
2301 fn test_r237_eng7_recon_window_upper_bound() {
2302 let mut cfg = default_config();
2303 cfg.recon_window_secs = 3_601; assert!(cfg.validate().is_err());
2305 }
2306
2307 #[test]
2308 fn test_r237_eng7_drift_window_upper_bound() {
2309 let mut cfg = default_config();
2310 cfg.drift_window_secs = 604_801; assert!(cfg.validate().is_err());
2312 }
2313
2314 #[test]
2315 fn test_r237_eng7_coordination_window_at_max_ok() {
2316 let mut cfg = default_config();
2317 cfg.coordination_window_secs = 86_400; assert!(cfg.validate().is_ok());
2319 }
2320
2321 #[test]
2324 fn test_r229_capacity_alert_type_is_capacity_exhaustion() {
2325 let alert = CollusionDetector::capacity_alert("test_tracker", 100);
2326 assert_eq!(alert.collusion_type, CollusionType::CapacityExhaustion);
2327 assert_eq!(alert.severity, CollusionSeverity::High);
2328 assert!(alert.description.contains("capacity"));
2329 assert!(alert.description.contains("100"));
2330 assert_eq!(alert.target, "test_tracker");
2331 }
2332
2333 #[test]
2334 fn test_r229_capacity_exhaustion_serialization_roundtrip() {
2335 let alert = CollusionDetector::capacity_alert("entropy_profiles", 10_000);
2336 let json = serde_json::to_string(&alert).expect("serialize");
2337 let parsed: CollusionAlert = serde_json::from_str(&json).expect("deserialize");
2338 assert_eq!(parsed.collusion_type, CollusionType::CapacityExhaustion);
2339 assert_eq!(parsed.severity, CollusionSeverity::High);
2340 }
2341
2342 #[test]
2343 fn test_r229_safe_u32_cast_on_observation_count() {
2344 let huge: usize = u32::MAX as usize + 100;
2346 let safe = u32::try_from(huge).unwrap_or(u32::MAX);
2347 assert_eq!(safe, u32::MAX);
2348 }
2349}