// peat_protocol/mesh_integration/aggregator.rs
use crate::hierarchy::StateAggregator;
15use peat_mesh::routing::{AggregationError, Aggregator, DataDirection, DataPacket, DataType};
16use peat_schema::hierarchy::v1::SquadSummary;
17use peat_schema::node::v1::{NodeConfig, NodeState};
18use serde::{Deserialize, Serialize};
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct TelemetryPayload {
26 pub config: NodeConfig,
28
29 pub state: NodeState,
31}
32
33impl TelemetryPayload {
34 pub fn new(config: NodeConfig, state: NodeState) -> Self {
36 Self { config, state }
37 }
38
39 pub fn to_bytes(&self) -> Result<Vec<u8>, AggregationError> {
41 serde_json::to_vec(self).map_err(AggregationError::from)
42 }
43
44 pub fn from_bytes(bytes: &[u8]) -> Result<Self, AggregationError> {
46 serde_json::from_slice(bytes).map_err(AggregationError::from)
47 }
48}
49
/// Stateless aggregator that turns member telemetry packets into a single
/// aggregated packet.
///
/// The type has no fields, so values are zero-sized; it is `Copy` and `Debug`
/// so it can be passed around freely and shows up in diagnostics of types that
/// embed it. `Default` is implemented by hand elsewhere in this file and is
/// therefore intentionally not derived here.
#[derive(Debug, Clone, Copy)]
pub struct PacketAggregator;
54
55impl PacketAggregator {
56 pub fn new() -> Self {
58 Self
59 }
60
61 pub fn extract_squad_summary(
63 &self,
64 packet: &DataPacket,
65 ) -> Result<SquadSummary, AggregationError> {
66 if packet.data_type != DataType::AggregatedTelemetry {
67 return Err(AggregationError::InvalidPacketType {
68 expected: "AggregatedTelemetry".to_string(),
69 actual: packet.data_type,
70 });
71 }
72
73 serde_json::from_slice(&packet.payload).map_err(AggregationError::from)
74 }
75}
76
77impl Default for PacketAggregator {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
83impl Aggregator for PacketAggregator {
84 fn aggregate_telemetry(
85 &self,
86 group_id: &str,
87 leader_id: &str,
88 telemetry_packets: Vec<DataPacket>,
89 ) -> Result<DataPacket, AggregationError> {
90 if telemetry_packets.is_empty() {
91 return Err(AggregationError::EmptyInput);
92 }
93
94 for packet in &telemetry_packets {
96 if packet.data_type != DataType::Telemetry {
97 return Err(AggregationError::InvalidPacketType {
98 expected: "Telemetry".to_string(),
99 actual: packet.data_type,
100 });
101 }
102 }
103
104 let member_states: Result<Vec<(NodeConfig, NodeState)>, AggregationError> =
106 telemetry_packets
107 .iter()
108 .map(|packet| {
109 let payload = TelemetryPayload::from_bytes(&packet.payload)?;
110 Ok((payload.config, payload.state))
111 })
112 .collect();
113
114 let member_states = member_states?;
115
116 let squad_summary = StateAggregator::aggregate_squad(group_id, leader_id, member_states)
118 .map_err(|e| AggregationError::AggregationFailed(e.to_string()))?;
119
120 let aggregated_payload =
122 serde_json::to_vec(&squad_summary).map_err(AggregationError::from)?;
123
124 Ok(DataPacket {
126 packet_id: uuid::Uuid::new_v4().to_string(),
127 source_node_id: leader_id.to_string(),
128 destination_node_id: None,
129 data_type: DataType::AggregatedTelemetry,
130 direction: DataDirection::Upward,
131 hop_count: 0,
132 max_hops: 10,
133 payload: aggregated_payload,
134 })
135 }
136}
137
#[cfg(test)]
mod tests {
    use super::*;

    /// The aggregator holds no data, so a value occupies zero bytes.
    #[test]
    fn test_aggregator_creation() {
        let subject = PacketAggregator::new();
        assert_eq!(std::mem::size_of_val(&subject), 0);
    }

    /// An empty batch is rejected with `EmptyInput`.
    #[test]
    fn test_aggregate_empty_packets() {
        let outcome = PacketAggregator::new().aggregate_telemetry("squad-1", "node-1", Vec::new());

        assert!(matches!(outcome, Err(AggregationError::EmptyInput)));
    }

    /// A non-telemetry packet in the batch is rejected with
    /// `InvalidPacketType`.
    #[test]
    fn test_aggregate_wrong_packet_type() {
        let subject = PacketAggregator::new();
        let batch = vec![DataPacket::command("hq", "node-1", vec![1, 2, 3])];

        let outcome = subject.aggregate_telemetry("squad-1", "node-1", batch);

        assert!(matches!(
            outcome,
            Err(AggregationError::InvalidPacketType { .. })
        ));
    }

    /// Extracting a summary from a plain telemetry packet is rejected with
    /// `InvalidPacketType`.
    #[test]
    fn test_extract_summary_wrong_type() {
        let subject = PacketAggregator::new();
        let plain_telemetry = DataPacket::telemetry("node-1", vec![1, 2, 3]);

        let outcome = subject.extract_squad_summary(&plain_telemetry);

        assert!(matches!(
            outcome,
            Err(AggregationError::InvalidPacketType { .. })
        ));
    }
}