hive_mesh/routing/
aggregator.rs1use super::packet::{DataPacket, DataType};
15use thiserror::Error;
16
17#[derive(Debug, Error)]
19pub enum AggregationError {
20 #[error("Failed to deserialize payload: {0}")]
22 DeserializationError(#[from] serde_json::Error),
23
24 #[error("Aggregation operation failed: {0}")]
26 AggregationFailed(String),
27
28 #[error("Expected {expected} packet type, got {actual:?}")]
30 InvalidPacketType { expected: String, actual: DataType },
31
32 #[error("Cannot aggregate empty packet list")]
34 EmptyInput,
35}
36
37pub trait Aggregator: Send + Sync {
42 fn aggregate_telemetry(
54 &self,
55 group_id: &str,
56 leader_id: &str,
57 telemetry_packets: Vec<DataPacket>,
58 ) -> Result<DataPacket, AggregationError>;
59
60 fn extract_summary_bytes(&self, packet: &DataPacket) -> Result<Vec<u8>, AggregationError> {
70 if packet.data_type != DataType::AggregatedTelemetry {
71 return Err(AggregationError::InvalidPacketType {
72 expected: "AggregatedTelemetry".to_string(),
73 actual: packet.data_type,
74 });
75 }
76 Ok(packet.payload.clone())
77 }
78}
79
80pub struct NoOpAggregator;
85
86impl Aggregator for NoOpAggregator {
87 fn aggregate_telemetry(
88 &self,
89 _group_id: &str,
90 _leader_id: &str,
91 _telemetry_packets: Vec<DataPacket>,
92 ) -> Result<DataPacket, AggregationError> {
93 Err(AggregationError::AggregationFailed(
94 "No aggregator configured".to_string(),
95 ))
96 }
97}
98
99#[cfg(test)]
100mod tests {
101 use super::*;
102
103 #[test]
104 fn test_aggregation_error_display() {
105 let err = AggregationError::EmptyInput;
106 assert_eq!(err.to_string(), "Cannot aggregate empty packet list");
107
108 let err = AggregationError::AggregationFailed("test".to_string());
109 assert_eq!(err.to_string(), "Aggregation operation failed: test");
110 }
111
112 #[test]
113 fn test_aggregation_error_invalid_type() {
114 let err = AggregationError::InvalidPacketType {
115 expected: "Telemetry".to_string(),
116 actual: DataType::Command,
117 };
118 assert!(err.to_string().contains("Expected Telemetry"));
119 }
120
121 #[test]
122 fn test_aggregation_error_from_serde() {
123 let serde_err = serde_json::from_str::<serde_json::Value>("not json").unwrap_err();
124 let err: AggregationError = serde_err.into();
125 assert!(err.to_string().contains("deserialize"));
126 }
127
128 #[test]
129 fn test_noop_aggregator_returns_error() {
130 let agg = NoOpAggregator;
131 let result = agg.aggregate_telemetry("group", "leader", vec![]);
132 assert!(result.is_err());
133 if let Err(AggregationError::AggregationFailed(msg)) = result {
134 assert!(msg.contains("No aggregator configured"));
135 }
136 }
137
138 #[test]
139 fn test_extract_summary_bytes_valid() {
140 let agg = NoOpAggregator;
141 let mut packet = DataPacket::telemetry("src", vec![1, 2, 3]);
142 packet.data_type = DataType::AggregatedTelemetry;
143 let result = agg.extract_summary_bytes(&packet);
144 assert_eq!(result.unwrap(), vec![1, 2, 3]);
145 }
146
147 #[test]
148 fn test_extract_summary_bytes_wrong_type() {
149 let agg = NoOpAggregator;
150 let packet = DataPacket::command("src", "dst", vec![]);
151 let result = agg.extract_summary_bytes(&packet);
152 assert!(result.is_err());
153 if let Err(AggregationError::InvalidPacketType { expected, actual }) = result {
154 assert_eq!(expected, "AggregatedTelemetry");
155 assert_eq!(actual, DataType::Command);
156 }
157 }
158}