Skip to main content

hive_mesh/routing/
aggregator.rs

1//! Generic packet aggregation for hierarchical telemetry summarization
2//!
3//! This module provides a generic `Aggregator` trait that bridges the gap between:
4//! - **hive-mesh routing**: DataPacket flowing through hierarchy (packets in flight)
5//! - **Application-specific aggregation**: Computing summaries from raw data
6//!
7//! # Architecture
8//!
9//! The `Aggregator` trait defines a generic interface for aggregating
10//! telemetry packets into summary packets. Concrete implementations
11//! (e.g., HIVE's `PacketAggregator`) provide domain-specific aggregation
12//! logic by implementing this trait.
13
14use super::packet::{DataPacket, DataType};
15use thiserror::Error;
16
17/// Errors that can occur during packet aggregation
18#[derive(Debug, Error)]
19pub enum AggregationError {
20    /// Payload deserialization failed
21    #[error("Failed to deserialize payload: {0}")]
22    DeserializationError(#[from] serde_json::Error),
23
24    /// Hierarchical aggregation operation failed
25    #[error("Aggregation operation failed: {0}")]
26    AggregationFailed(String),
27
28    /// Invalid packet type for aggregation
29    #[error("Expected {expected} packet type, got {actual:?}")]
30    InvalidPacketType { expected: String, actual: DataType },
31
32    /// Empty input when non-empty required
33    #[error("Cannot aggregate empty packet list")]
34    EmptyInput,
35}
36
37/// Generic trait for aggregating telemetry packets into summaries
38///
39/// Implementations provide domain-specific aggregation logic that transforms
40/// a collection of telemetry `DataPacket`s into a single aggregated packet.
41pub trait Aggregator: Send + Sync {
42    /// Aggregate telemetry packets into a summary packet
43    ///
44    /// # Arguments
45    ///
46    /// * `group_id` - Identifier for the group being aggregated (e.g., squad ID)
47    /// * `leader_id` - Node ID of the group leader (source of aggregated packet)
48    /// * `telemetry_packets` - Vector of telemetry DataPackets from group members
49    ///
50    /// # Returns
51    ///
52    /// A new DataPacket with aggregated telemetry data
53    fn aggregate_telemetry(
54        &self,
55        group_id: &str,
56        leader_id: &str,
57        telemetry_packets: Vec<DataPacket>,
58    ) -> Result<DataPacket, AggregationError>;
59
60    /// Extract a summary from an aggregated telemetry packet
61    ///
62    /// # Arguments
63    ///
64    /// * `packet` - DataPacket with DataType::AggregatedTelemetry
65    ///
66    /// # Returns
67    ///
68    /// The raw summary bytes from the aggregated packet
69    fn extract_summary_bytes(&self, packet: &DataPacket) -> Result<Vec<u8>, AggregationError> {
70        if packet.data_type != DataType::AggregatedTelemetry {
71            return Err(AggregationError::InvalidPacketType {
72                expected: "AggregatedTelemetry".to_string(),
73                actual: packet.data_type,
74            });
75        }
76        Ok(packet.payload.clone())
77    }
78}
79
80/// A no-op aggregator for use when aggregation is not needed
81///
82/// Always returns an error from `aggregate_telemetry`. Useful as a default
83/// type parameter when creating a `MeshRouter` without a real aggregator.
84pub struct NoOpAggregator;
85
86impl Aggregator for NoOpAggregator {
87    fn aggregate_telemetry(
88        &self,
89        _group_id: &str,
90        _leader_id: &str,
91        _telemetry_packets: Vec<DataPacket>,
92    ) -> Result<DataPacket, AggregationError> {
93        Err(AggregationError::AggregationFailed(
94            "No aggregator configured".to_string(),
95        ))
96    }
97}
98
99#[cfg(test)]
100mod tests {
101    use super::*;
102
103    #[test]
104    fn test_aggregation_error_display() {
105        let err = AggregationError::EmptyInput;
106        assert_eq!(err.to_string(), "Cannot aggregate empty packet list");
107
108        let err = AggregationError::AggregationFailed("test".to_string());
109        assert_eq!(err.to_string(), "Aggregation operation failed: test");
110    }
111
112    #[test]
113    fn test_aggregation_error_invalid_type() {
114        let err = AggregationError::InvalidPacketType {
115            expected: "Telemetry".to_string(),
116            actual: DataType::Command,
117        };
118        assert!(err.to_string().contains("Expected Telemetry"));
119    }
120
121    #[test]
122    fn test_aggregation_error_from_serde() {
123        let serde_err = serde_json::from_str::<serde_json::Value>("not json").unwrap_err();
124        let err: AggregationError = serde_err.into();
125        assert!(err.to_string().contains("deserialize"));
126    }
127
128    #[test]
129    fn test_noop_aggregator_returns_error() {
130        let agg = NoOpAggregator;
131        let result = agg.aggregate_telemetry("group", "leader", vec![]);
132        assert!(result.is_err());
133        if let Err(AggregationError::AggregationFailed(msg)) = result {
134            assert!(msg.contains("No aggregator configured"));
135        }
136    }
137
138    #[test]
139    fn test_extract_summary_bytes_valid() {
140        let agg = NoOpAggregator;
141        let mut packet = DataPacket::telemetry("src", vec![1, 2, 3]);
142        packet.data_type = DataType::AggregatedTelemetry;
143        let result = agg.extract_summary_bytes(&packet);
144        assert_eq!(result.unwrap(), vec![1, 2, 3]);
145    }
146
147    #[test]
148    fn test_extract_summary_bytes_wrong_type() {
149        let agg = NoOpAggregator;
150        let packet = DataPacket::command("src", "dst", vec![]);
151        let result = agg.extract_summary_bytes(&packet);
152        assert!(result.is_err());
153        if let Err(AggregationError::InvalidPacketType { expected, actual }) = result {
154            assert_eq!(expected, "AggregatedTelemetry");
155            assert_eq!(actual, DataType::Command);
156        }
157    }
158}