Skip to main content

peat_protocol/mesh_integration/
aggregator.rs

1//! Peat-specific packet aggregation for hierarchical telemetry summarization
2//!
3//! This module provides the `PacketAggregator` which bridges the gap between:
4//! - **peat-mesh routing**: DataPacket flowing through hierarchy (packets in flight)
5//! - **peat-protocol aggregation**: StateAggregator for computing summaries (documents at rest)
6//!
7//! # Architecture
8//!
9//! The PacketAggregator implements `peat_mesh::routing::Aggregator` and:
10//! 1. Deserializes DataPacket payloads into domain objects (NodeState, NodeConfig)
11//! 2. Calls StateAggregator::aggregate_squad() from peat-protocol
12//! 3. Serializes SquadSummary back into DataPacket with AggregatedTelemetry type
13
14use crate::hierarchy::StateAggregator;
15use peat_mesh::routing::{AggregationError, Aggregator, DataDirection, DataPacket, DataType};
16use peat_schema::hierarchy::v1::SquadSummary;
17use peat_schema::node::v1::{NodeConfig, NodeState};
18use serde::{Deserialize, Serialize};
19
20/// Envelope for NodeState + NodeConfig in DataPacket payload
21///
22/// Since DataPacket payload is opaque `Vec<u8>`, we need to serialize
23/// both the node configuration and state together.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct TelemetryPayload {
26    /// Node configuration (capabilities, comm range, etc.)
27    pub config: NodeConfig,
28
29    /// Current node state (position, fuel, health, etc.)
30    pub state: NodeState,
31}
32
33impl TelemetryPayload {
34    /// Create a new telemetry payload
35    pub fn new(config: NodeConfig, state: NodeState) -> Self {
36        Self { config, state }
37    }
38
39    /// Serialize to JSON bytes for DataPacket payload
40    pub fn to_bytes(&self) -> Result<Vec<u8>, AggregationError> {
41        serde_json::to_vec(self).map_err(AggregationError::from)
42    }
43
44    /// Deserialize from DataPacket payload bytes
45    pub fn from_bytes(bytes: &[u8]) -> Result<Self, AggregationError> {
46        serde_json::from_slice(bytes).map_err(AggregationError::from)
47    }
48}
49
50/// Peat-specific packet aggregator for hierarchical telemetry summarization
51///
52/// Bridges peat-mesh routing layer with peat-protocol aggregation logic.
53pub struct PacketAggregator;
54
55impl PacketAggregator {
56    /// Create a new packet aggregator
57    pub fn new() -> Self {
58        Self
59    }
60
61    /// Deserialize a SquadSummary from an aggregated telemetry packet
62    pub fn extract_squad_summary(
63        &self,
64        packet: &DataPacket,
65    ) -> Result<SquadSummary, AggregationError> {
66        if packet.data_type != DataType::AggregatedTelemetry {
67            return Err(AggregationError::InvalidPacketType {
68                expected: "AggregatedTelemetry".to_string(),
69                actual: packet.data_type,
70            });
71        }
72
73        serde_json::from_slice(&packet.payload).map_err(AggregationError::from)
74    }
75}
76
77impl Default for PacketAggregator {
78    fn default() -> Self {
79        Self::new()
80    }
81}
82
83impl Aggregator for PacketAggregator {
84    fn aggregate_telemetry(
85        &self,
86        group_id: &str,
87        leader_id: &str,
88        telemetry_packets: Vec<DataPacket>,
89    ) -> Result<DataPacket, AggregationError> {
90        if telemetry_packets.is_empty() {
91            return Err(AggregationError::EmptyInput);
92        }
93
94        // Validate all packets are Telemetry type
95        for packet in &telemetry_packets {
96            if packet.data_type != DataType::Telemetry {
97                return Err(AggregationError::InvalidPacketType {
98                    expected: "Telemetry".to_string(),
99                    actual: packet.data_type,
100                });
101            }
102        }
103
104        // Deserialize all payloads into (NodeConfig, NodeState) pairs
105        let member_states: Result<Vec<(NodeConfig, NodeState)>, AggregationError> =
106            telemetry_packets
107                .iter()
108                .map(|packet| {
109                    let payload = TelemetryPayload::from_bytes(&packet.payload)?;
110                    Ok((payload.config, payload.state))
111                })
112                .collect();
113
114        let member_states = member_states?;
115
116        // Call StateAggregator from peat-protocol
117        let squad_summary = StateAggregator::aggregate_squad(group_id, leader_id, member_states)
118            .map_err(|e| AggregationError::AggregationFailed(e.to_string()))?;
119
120        // Serialize SquadSummary back into DataPacket payload
121        let aggregated_payload =
122            serde_json::to_vec(&squad_summary).map_err(AggregationError::from)?;
123
124        // Create new DataPacket with AggregatedTelemetry type
125        Ok(DataPacket {
126            packet_id: uuid::Uuid::new_v4().to_string(),
127            source_node_id: leader_id.to_string(),
128            destination_node_id: None,
129            data_type: DataType::AggregatedTelemetry,
130            direction: DataDirection::Upward,
131            hop_count: 0,
132            max_hops: 10,
133            payload: aggregated_payload,
134        })
135    }
136}
137
138#[cfg(test)]
139mod tests {
140    use super::*;
141
142    #[test]
143    fn test_aggregator_creation() {
144        let aggregator = PacketAggregator::new();
145        assert!(std::mem::size_of_val(&aggregator) == 0); // ZST
146    }
147
148    #[test]
149    fn test_aggregate_empty_packets() {
150        let aggregator = PacketAggregator::new();
151
152        let result = aggregator.aggregate_telemetry("squad-1", "node-1", vec![]);
153
154        assert!(matches!(result, Err(AggregationError::EmptyInput)));
155    }
156
157    #[test]
158    fn test_aggregate_wrong_packet_type() {
159        let aggregator = PacketAggregator::new();
160
161        // Create a command packet instead of telemetry
162        let command_packet = DataPacket::command("hq", "node-1", vec![1, 2, 3]);
163
164        let result = aggregator.aggregate_telemetry("squad-1", "node-1", vec![command_packet]);
165
166        assert!(matches!(
167            result,
168            Err(AggregationError::InvalidPacketType { .. })
169        ));
170    }
171
172    #[test]
173    fn test_extract_summary_wrong_type() {
174        let aggregator = PacketAggregator::new();
175
176        // Create a telemetry packet (not aggregated)
177        let telemetry_packet = DataPacket::telemetry("node-1", vec![1, 2, 3]);
178
179        let result = aggregator.extract_squad_summary(&telemetry_packet);
180
181        assert!(matches!(
182            result,
183            Err(AggregationError::InvalidPacketType { .. })
184        ));
185    }
186}