Skip to main content

peat_protocol/composition/
redundant.rs

1//! Redundant composition rules
2//!
3//! This module implements composition rules for redundant capabilities - capabilities
4//! that benefit from redundancy to improve reliability, coverage, or availability.
5//!
6//! Examples:
7//! - Detection Reliability: Multiple sensors increase detection confidence
8//! - Continuous Coverage: Overlapping sensor fields ensure no gaps
9//! - Fault Tolerance: Redundant systems provide backup capability
10
11use crate::composition::rules::{CompositionContext, CompositionResult, CompositionRule};
12use crate::models::capability::{Capability, CapabilityType};
13use crate::models::CapabilityExt;
14use crate::Result;
15use async_trait::async_trait;
16use serde_json::{json, Value};
17
18/// Rule for improving detection reliability through redundant sensors
19///
20/// Multiple sensors of the same type increase detection confidence through redundancy.
21/// Uses probabilistic model: P(detection) = 1 - ∏(1 - P(detection_i))
22pub struct DetectionReliabilityRule {
23    /// Minimum number of redundant sensors required
24    min_sensors: usize,
25    /// Confidence threshold for individual sensors
26    min_confidence: f32,
27}
28
29impl DetectionReliabilityRule {
30    /// Create a new detection reliability rule
31    pub fn new(min_sensors: usize, min_confidence: f32) -> Self {
32        Self {
33            min_sensors,
34            min_confidence,
35        }
36    }
37
38    /// Calculate combined detection probability from redundant sensors
39    ///
40    /// Uses probability formula: P(any detects) = 1 - ∏(1 - P(sensor_i detects))
41    fn combined_confidence(&self, confidences: &[f32]) -> f32 {
42        let failure_prob: f32 = confidences.iter().map(|c| 1.0 - c).product();
43        1.0 - failure_prob
44    }
45}
46
47impl Default for DetectionReliabilityRule {
48    fn default() -> Self {
49        Self::new(2, 0.6)
50    }
51}
52
53#[async_trait]
54impl CompositionRule for DetectionReliabilityRule {
55    fn name(&self) -> &str {
56        "detection_reliability"
57    }
58
59    fn description(&self) -> &str {
60        "Improves detection confidence through redundant sensor coverage"
61    }
62
63    fn applies_to(&self, capabilities: &[Capability]) -> bool {
64        let qualified_sensors = capabilities
65            .iter()
66            .filter(|c| {
67                c.get_capability_type() == CapabilityType::Sensor
68                    && c.confidence >= self.min_confidence
69            })
70            .count();
71
72        qualified_sensors >= self.min_sensors
73    }
74
75    async fn compose(
76        &self,
77        capabilities: &[Capability],
78        _context: &CompositionContext,
79    ) -> Result<CompositionResult> {
80        let sensors: Vec<&Capability> = capabilities
81            .iter()
82            .filter(|c| {
83                c.get_capability_type() == CapabilityType::Sensor
84                    && c.confidence >= self.min_confidence
85            })
86            .collect();
87
88        if sensors.len() < self.min_sensors {
89            return Ok(CompositionResult::new(vec![], 0.0));
90        }
91
92        // Calculate combined detection probability
93        let confidences: Vec<f32> = sensors.iter().map(|s| s.confidence).collect();
94        let combined_confidence = self.combined_confidence(&confidences);
95
96        // Calculate coverage overlap (if metadata available)
97        let total_coverage: f64 = sensors
98            .iter()
99            .filter_map(|cap| {
100                serde_json::from_str::<Value>(&cap.metadata_json)
101                    .ok()
102                    .and_then(|v| v.get("coverage_area").and_then(|c| c.as_f64()))
103            })
104            .sum();
105
106        let mut composed = Capability::new(
107            format!("redundant_detection_{}", uuid::Uuid::new_v4()),
108            "Redundant Detection".to_string(),
109            CapabilityType::Emergent,
110            combined_confidence,
111        );
112        composed.metadata_json = serde_json::to_string(&json!({
113            "composition_type": "redundant",
114            "pattern": "detection_reliability",
115            "sensor_count": sensors.len(),
116            "coverage_area": total_coverage,
117            "individual_confidences": confidences,
118            "reliability_improvement": combined_confidence - confidences.iter().cloned().fold(0.0, f32::max),
119            "description": "Improved detection through sensor redundancy"
120        }))
121        .unwrap_or_default();
122
123        let contributor_ids: Vec<String> = sensors.iter().map(|c| c.id.clone()).collect();
124
125        Ok(CompositionResult::new(vec![composed], combined_confidence)
126            .with_contributors(contributor_ids))
127    }
128}
129
130/// Rule for ensuring continuous coverage through temporal overlap
131///
132/// Multiple platforms with overlapping coverage windows ensure continuous monitoring
133/// of an area without gaps.
134pub struct ContinuousCoverageRule {
135    /// Minimum number of platforms for continuous coverage
136    min_platforms: usize,
137    /// Minimum overlap percentage required (0.0 - 1.0)
138    #[allow(dead_code)]
139    min_overlap: f32,
140}
141
142impl ContinuousCoverageRule {
143    /// Create a new continuous coverage rule
144    pub fn new(min_platforms: usize, min_overlap: f32) -> Self {
145        Self {
146            min_platforms,
147            min_overlap: min_overlap.clamp(0.0, 1.0),
148        }
149    }
150}
151
152impl Default for ContinuousCoverageRule {
153    fn default() -> Self {
154        Self::new(2, 0.3) // 30% overlap recommended
155    }
156}
157
158#[async_trait]
159impl CompositionRule for ContinuousCoverageRule {
160    fn name(&self) -> &str {
161        "continuous_coverage"
162    }
163
164    fn description(&self) -> &str {
165        "Ensures continuous area coverage through temporal overlap of multiple platforms"
166    }
167
168    fn applies_to(&self, capabilities: &[Capability]) -> bool {
169        let qualified_sensors = capabilities
170            .iter()
171            .filter(|c| {
172                c.get_capability_type() == CapabilityType::Sensor
173                    && serde_json::from_str::<Value>(&c.metadata_json)
174                        .ok()
175                        .and_then(|v| v.get("coverage_area").cloned())
176                        .is_some()
177            })
178            .count();
179
180        qualified_sensors >= self.min_platforms
181    }
182
183    async fn compose(
184        &self,
185        capabilities: &[Capability],
186        context: &CompositionContext,
187    ) -> Result<CompositionResult> {
188        let sensors: Vec<&Capability> = capabilities
189            .iter()
190            .filter(|c| {
191                c.get_capability_type() == CapabilityType::Sensor
192                    && serde_json::from_str::<Value>(&c.metadata_json)
193                        .ok()
194                        .and_then(|v| v.get("coverage_area").cloned())
195                        .is_some()
196            })
197            .collect();
198
199        if sensors.len() < self.min_platforms {
200            return Ok(CompositionResult::new(vec![], 0.0));
201        }
202
203        // Calculate total coverage area
204        let total_coverage: f64 = sensors
205            .iter()
206            .filter_map(|cap| {
207                serde_json::from_str::<Value>(&cap.metadata_json)
208                    .ok()
209                    .and_then(|v| v.get("coverage_area").and_then(|c| c.as_f64()))
210            })
211            .sum();
212
213        // Estimate overlap (simplified - assumes some redundancy)
214        let overlap_factor = if sensors.len() > 1 {
215            // Rough estimate: more sensors = more overlap
216            0.2 + (sensors.len() as f32 - 1.0) * 0.1
217        } else {
218            0.0
219        };
220
221        // Confidence based on number of platforms and individual confidences
222        let avg_confidence: f32 =
223            sensors.iter().map(|s| s.confidence).sum::<f32>() / sensors.len() as f32;
224
225        // Boost confidence for continuous coverage
226        let continuous_confidence = (avg_confidence + overlap_factor * 0.2).min(1.0);
227
228        let mut composed = Capability::new(
229            format!("continuous_coverage_{}", uuid::Uuid::new_v4()),
230            "Continuous Coverage".to_string(),
231            CapabilityType::Emergent,
232            continuous_confidence,
233        );
234        composed.metadata_json = serde_json::to_string(&json!({
235            "composition_type": "redundant",
236            "pattern": "continuous_coverage",
237            "platform_count": sensors.len(),
238            "total_coverage_area": total_coverage,
239            "estimated_overlap": overlap_factor,
240            "coverage_redundancy": (sensors.len() as f32 * overlap_factor),
241            "cell_id": context.cell_id,
242            "description": "Continuous monitoring through overlapping coverage"
243        }))
244        .unwrap_or_default();
245
246        let contributor_ids: Vec<String> = sensors.iter().map(|c| c.id.clone()).collect();
247
248        Ok(
249            CompositionResult::new(vec![composed], continuous_confidence)
250                .with_contributors(contributor_ids),
251        )
252    }
253}
254
255/// Rule for fault-tolerant capability through redundant systems
256///
257/// Multiple identical capabilities provide backup and fault tolerance.
258/// System remains operational even if some components fail.
259pub struct FaultToleranceRule {
260    /// Minimum number of redundant capabilities required
261    min_redundancy: usize,
262    /// Capability type to provide fault tolerance for
263    capability_type: CapabilityType,
264}
265
266impl FaultToleranceRule {
267    /// Create a new fault tolerance rule
268    pub fn new(min_redundancy: usize, capability_type: CapabilityType) -> Self {
269        Self {
270            min_redundancy,
271            capability_type,
272        }
273    }
274
275    /// Calculate system reliability with N redundant components
276    ///
277    /// Assumes independent failures: R(system) = 1 - ∏(1 - R(component_i))
278    fn system_reliability(&self, component_confidences: &[f32]) -> f32 {
279        let failure_prob: f32 = component_confidences.iter().map(|c| 1.0 - c).product();
280        1.0 - failure_prob
281    }
282}
283
284impl Default for FaultToleranceRule {
285    fn default() -> Self {
286        Self::new(3, CapabilityType::Communication) // Triple redundant comms
287    }
288}
289
290#[async_trait]
291impl CompositionRule for FaultToleranceRule {
292    fn name(&self) -> &str {
293        "fault_tolerance"
294    }
295
296    fn description(&self) -> &str {
297        "Provides fault-tolerant capability through redundant systems"
298    }
299
300    fn applies_to(&self, capabilities: &[Capability]) -> bool {
301        let redundant_count = capabilities
302            .iter()
303            .filter(|c| c.get_capability_type() == self.capability_type)
304            .count();
305
306        redundant_count >= self.min_redundancy
307    }
308
309    async fn compose(
310        &self,
311        capabilities: &[Capability],
312        _context: &CompositionContext,
313    ) -> Result<CompositionResult> {
314        let redundant_caps: Vec<&Capability> = capabilities
315            .iter()
316            .filter(|c| c.get_capability_type() == self.capability_type)
317            .collect();
318
319        if redundant_caps.len() < self.min_redundancy {
320            return Ok(CompositionResult::new(vec![], 0.0));
321        }
322
323        // Calculate system reliability
324        let confidences: Vec<f32> = redundant_caps.iter().map(|c| c.confidence).collect();
325        let system_reliability = self.system_reliability(&confidences);
326
327        let mut composed = Capability::new(
328            format!(
329                "fault_tolerant_{:?}_{}",
330                self.capability_type,
331                uuid::Uuid::new_v4()
332            ),
333            format!("Fault-Tolerant {:?}", self.capability_type),
334            CapabilityType::Emergent,
335            system_reliability,
336        );
337        composed.metadata_json = serde_json::to_string(&json!({
338            "composition_type": "redundant",
339            "pattern": "fault_tolerance",
340            "base_capability_type": format!("{:?}", self.capability_type),
341            "redundancy_level": redundant_caps.len(),
342            "system_reliability": system_reliability,
343            "individual_reliabilities": confidences,
344            "description": format!("Fault-tolerant {:?} with {}-way redundancy",
345                self.capability_type, redundant_caps.len())
346        }))
347        .unwrap_or_default();
348
349        let contributor_ids: Vec<String> = redundant_caps.iter().map(|c| c.id.clone()).collect();
350
351        Ok(CompositionResult::new(vec![composed], system_reliability)
352            .with_contributors(contributor_ids))
353    }
354}
355
356#[cfg(test)]
357mod tests {
358    use super::*;
359    use serde_json::json;
360
361    #[tokio::test]
362    async fn test_detection_reliability_two_sensors() {
363        let rule = DetectionReliabilityRule::default();
364
365        let mut sensor1 = Capability::new(
366            "sensor1".to_string(),
367            "Camera 1".to_string(),
368            CapabilityType::Sensor,
369            0.7,
370        );
371        sensor1.metadata_json =
372            serde_json::to_string(&json!({"coverage_area": 100.0})).unwrap_or_default();
373
374        let mut sensor2 = Capability::new(
375            "sensor2".to_string(),
376            "Camera 2".to_string(),
377            CapabilityType::Sensor,
378            0.7,
379        );
380        sensor2.metadata_json =
381            serde_json::to_string(&json!({"coverage_area": 100.0})).unwrap_or_default();
382
383        let caps = vec![sensor1, sensor2];
384        let context = CompositionContext::new(vec!["node1".to_string()]);
385
386        let result = rule.compose(&caps, &context).await.unwrap();
387        assert!(result.has_compositions());
388
389        let composed = &result.composed_capabilities[0];
390        // P(detect) = 1 - (1-0.7)(1-0.7) = 1 - 0.09 = 0.91
391        assert!((composed.confidence - 0.91).abs() < 0.01);
392        let metadata: Value = serde_json::from_str(&composed.metadata_json).unwrap();
393        assert_eq!(metadata["sensor_count"].as_u64().unwrap(), 2);
394    }
395
396    #[tokio::test]
397    async fn test_detection_reliability_three_sensors() {
398        let rule = DetectionReliabilityRule::new(3, 0.6);
399
400        let sensors: Vec<Capability> = (0..3)
401            .map(|i| {
402                Capability::new(
403                    format!("sensor{}", i),
404                    format!("Sensor {}", i),
405                    CapabilityType::Sensor,
406                    0.7,
407                )
408            })
409            .collect();
410
411        let context = CompositionContext::new(vec!["node1".to_string()]);
412
413        let result = rule.compose(&sensors, &context).await.unwrap();
414        assert!(result.has_compositions());
415
416        let composed = &result.composed_capabilities[0];
417        // P(detect) = 1 - (1-0.7)^3 = 1 - 0.027 = 0.973
418        assert!((composed.confidence - 0.973).abs() < 0.01);
419    }
420
421    #[tokio::test]
422    async fn test_detection_reliability_insufficient_sensors() {
423        let rule = DetectionReliabilityRule::default();
424
425        let sensor1 = Capability::new(
426            "sensor1".to_string(),
427            "Single Sensor".to_string(),
428            CapabilityType::Sensor,
429            0.8,
430        );
431
432        let caps = vec![sensor1];
433        let context = CompositionContext::new(vec!["node1".to_string()]);
434
435        let result = rule.compose(&caps, &context).await.unwrap();
436        assert!(!result.has_compositions());
437    }
438
439    #[tokio::test]
440    async fn test_continuous_coverage() {
441        let rule = ContinuousCoverageRule::default();
442
443        let mut sensor1 = Capability::new(
444            "sensor1".to_string(),
445            "Platform 1".to_string(),
446            CapabilityType::Sensor,
447            0.85,
448        );
449        sensor1.metadata_json =
450            serde_json::to_string(&json!({"coverage_area": 200.0})).unwrap_or_default();
451
452        let mut sensor2 = Capability::new(
453            "sensor2".to_string(),
454            "Platform 2".to_string(),
455            CapabilityType::Sensor,
456            0.8,
457        );
458        sensor2.metadata_json =
459            serde_json::to_string(&json!({"coverage_area": 200.0})).unwrap_or_default();
460
461        let caps = vec![sensor1, sensor2];
462        let context = CompositionContext::new(vec!["node1".to_string(), "node2".to_string()])
463            .with_cell_id("cell_alpha".to_string());
464
465        assert!(rule.applies_to(&caps));
466
467        let result = rule.compose(&caps, &context).await.unwrap();
468        assert!(result.has_compositions());
469
470        let composed = &result.composed_capabilities[0];
471        assert_eq!(composed.name, "Continuous Coverage");
472        let metadata: Value = serde_json::from_str(&composed.metadata_json).unwrap();
473        assert_eq!(metadata["total_coverage_area"].as_f64().unwrap(), 400.0);
474        assert!(composed.confidence > 0.8); // Should be boosted by redundancy
475        assert_eq!(result.contributing_capabilities.len(), 2);
476    }
477
478    #[tokio::test]
479    async fn test_fault_tolerance_communication() {
480        let rule = FaultToleranceRule::default(); // 3-way redundant comms
481
482        let comms: Vec<Capability> = (0..3)
483            .map(|i| {
484                let mut cap = Capability::new(
485                    format!("comm{}", i),
486                    format!("Radio {}", i),
487                    CapabilityType::Communication,
488                    0.8,
489                );
490                cap.metadata_json =
491                    serde_json::to_string(&json!({"bandwidth": 10.0})).unwrap_or_default();
492                cap
493            })
494            .collect();
495
496        let context = CompositionContext::new(vec!["node1".to_string()]);
497
498        assert!(rule.applies_to(&comms));
499
500        let result = rule.compose(&comms, &context).await.unwrap();
501        assert!(result.has_compositions());
502
503        let composed = &result.composed_capabilities[0];
504        // System reliability = 1 - (1-0.8)^3 = 1 - 0.008 = 0.992
505        assert!((composed.confidence - 0.992).abs() < 0.01);
506        let metadata: Value = serde_json::from_str(&composed.metadata_json).unwrap();
507        assert_eq!(metadata["redundancy_level"].as_u64().unwrap(), 3);
508    }
509
510    #[tokio::test]
511    async fn test_fault_tolerance_insufficient_redundancy() {
512        let rule = FaultToleranceRule::new(4, CapabilityType::Compute);
513
514        let compute_caps: Vec<Capability> = (0..3)
515            .map(|i| {
516                Capability::new(
517                    format!("compute{}", i),
518                    format!("Compute {}", i),
519                    CapabilityType::Compute,
520                    0.9,
521                )
522            })
523            .collect();
524
525        let context = CompositionContext::new(vec!["node1".to_string()]);
526
527        // Only 3 but needs 4
528        assert!(!rule.applies_to(&compute_caps));
529
530        let result = rule.compose(&compute_caps, &context).await.unwrap();
531        assert!(!result.has_compositions());
532    }
533
534    #[tokio::test]
535    async fn test_redundancy_improves_low_confidence_sensors() {
536        let rule = DetectionReliabilityRule::new(2, 0.5);
537
538        // Two sensors with low individual confidence
539        let sensor1 = Capability::new(
540            "sensor1".to_string(),
541            "Weak Sensor 1".to_string(),
542            CapabilityType::Sensor,
543            0.6,
544        );
545
546        let sensor2 = Capability::new(
547            "sensor2".to_string(),
548            "Weak Sensor 2".to_string(),
549            CapabilityType::Sensor,
550            0.6,
551        );
552
553        let caps = vec![sensor1, sensor2];
554        let context = CompositionContext::new(vec!["node1".to_string()]);
555
556        let result = rule.compose(&caps, &context).await.unwrap();
557        assert!(result.has_compositions());
558
559        let composed = &result.composed_capabilities[0];
560        // P(detect) = 1 - (1-0.6)^2 = 1 - 0.16 = 0.84
561        // Redundancy significantly improves reliability!
562        assert!((composed.confidence - 0.84).abs() < 0.01);
563        assert!(composed.confidence > 0.6); // Better than individual sensors
564    }
565
566    #[tokio::test]
567    async fn test_combined_confidence_calculation() {
568        let rule = DetectionReliabilityRule::default();
569
570        // Test with different confidence levels
571        let confidences = vec![0.7, 0.8, 0.9];
572        let combined = rule.combined_confidence(&confidences);
573
574        // P(any detects) = 1 - (1-0.7)(1-0.8)(1-0.9) = 1 - 0.006 = 0.994
575        assert!((combined - 0.994).abs() < 0.01);
576    }
577}