1use super::aggregator::{Aggregator, NoOpAggregator};
57use super::packet::DataPacket;
58use super::router::{DeduplicationConfig, RoutingDecision, SelectiveRouter};
59use crate::topology::TopologyState;
60use std::sync::Arc;
61use tracing::{debug, info};
62
/// Configuration for a `MeshRouter`.
#[derive(Debug, Clone)]
pub struct MeshRouterConfig {
    /// Identifier of the local node. It is handed to the underlying router on
    /// every `route` call and to the aggregator on every aggregation.
    pub node_id: String,
    /// Deduplication settings. When its `enabled` flag is set, the underlying
    /// `SelectiveRouter` is constructed with a copy of these settings;
    /// otherwise a plain `SelectiveRouter` is used.
    pub deduplication: DeduplicationConfig,
    /// When `false`, `MeshRouter::add_for_aggregation` returns `None`
    /// immediately and buffers nothing.
    pub auto_aggregate: bool,
    /// Number of buffered packets at which `MeshRouter::add_for_aggregation`
    /// drains the buffer and passes it to the aggregator.
    pub aggregation_threshold: usize,
    /// When `true`, a successful threshold-triggered aggregation is logged at
    /// `info` level.
    pub verbose: bool,
}
77
78impl Default for MeshRouterConfig {
79 fn default() -> Self {
80 Self {
81 node_id: String::new(),
82 deduplication: DeduplicationConfig::default(),
83 auto_aggregate: true,
84 aggregation_threshold: 3,
85 verbose: false,
86 }
87 }
88}
89
90impl MeshRouterConfig {
91 pub fn with_node_id(node_id: impl Into<String>) -> Self {
93 Self {
94 node_id: node_id.into(),
95 ..Default::default()
96 }
97 }
98}
99
/// Outcome of `MeshRouter::route` for a single packet.
#[derive(Debug)]
pub struct RouteResult {
    /// Decision returned by the underlying `SelectiveRouter`.
    pub decision: RoutingDecision,
    /// Value reported by `SelectiveRouter::should_aggregate` for the packet
    /// and the decision above.
    pub should_aggregate: bool,
    /// Next-hop node ids copied out of `decision`; empty when the decision is
    /// not one of the forwarding variants.
    pub forward_to: Vec<String>,
}
110
/// Facade that pairs a `SelectiveRouter` with an `Aggregator` and a buffer of
/// packets waiting to be aggregated.
///
/// The aggregator type parameter defaults to `NoOpAggregator`.
pub struct MeshRouter<A: Aggregator = NoOpAggregator> {
    /// Settings supplied at construction time.
    config: MeshRouterConfig,
    /// Routing engine consulted for every packet.
    router: SelectiveRouter,
    /// Strategy used to turn a batch of buffered packets into a single packet.
    aggregator: A,
    /// Packets buffered by `add_for_aggregation` until the threshold is
    /// reached or `flush_aggregation` is called.
    pending_aggregation: Arc<std::sync::RwLock<Vec<DataPacket>>>,
}
128
129impl MeshRouter<NoOpAggregator> {
130 pub fn new(config: MeshRouterConfig) -> Self {
132 Self::with_aggregator(config, NoOpAggregator)
133 }
134
135 pub fn with_node_id(node_id: impl Into<String>) -> Self {
137 Self::new(MeshRouterConfig::with_node_id(node_id))
138 }
139}
140
141impl<A: Aggregator> MeshRouter<A> {
142 pub fn with_aggregator(config: MeshRouterConfig, aggregator: A) -> Self {
144 let router = if config.deduplication.enabled {
145 SelectiveRouter::new_with_deduplication(config.deduplication.clone())
146 } else {
147 SelectiveRouter::new()
148 };
149
150 Self {
151 config,
152 router,
153 aggregator,
154 pending_aggregation: Arc::new(std::sync::RwLock::new(Vec::new())),
155 }
156 }
157
158 pub fn route(&self, packet: &DataPacket, state: &TopologyState) -> RouteResult {
172 let decision = self.router.route(packet, state, &self.config.node_id);
173
174 let should_aggregate = self.router.should_aggregate(packet, &decision, state);
175
176 let forward_to = match &decision {
177 RoutingDecision::Forward { next_hop } => vec![next_hop.clone()],
178 RoutingDecision::ConsumeAndForward { next_hop } => vec![next_hop.clone()],
179 RoutingDecision::ForwardMulticast { next_hops } => next_hops.clone(),
180 RoutingDecision::ConsumeAndForwardMulticast { next_hops } => next_hops.clone(),
181 _ => vec![],
182 };
183
184 RouteResult {
185 decision,
186 should_aggregate,
187 forward_to,
188 }
189 }
190
191 pub fn add_for_aggregation(&self, packet: DataPacket, squad_id: &str) -> Option<DataPacket> {
196 if !self.config.auto_aggregate {
197 return None;
198 }
199
200 let mut pending = self.pending_aggregation.write().unwrap();
201 pending.push(packet);
202
203 if pending.len() >= self.config.aggregation_threshold {
204 let packets: Vec<DataPacket> = pending.drain(..).collect();
206 match self
207 .aggregator
208 .aggregate_telemetry(squad_id, &self.config.node_id, packets)
209 {
210 Ok(aggregated) => {
211 if self.config.verbose {
212 info!(
213 "Aggregated {} packets into squad summary for {}",
214 self.config.aggregation_threshold, squad_id
215 );
216 }
217 Some(aggregated)
218 }
219 Err(e) => {
220 debug!("Aggregation failed: {}", e);
221 None
222 }
223 }
224 } else {
225 None
226 }
227 }
228
229 pub fn pending_aggregation_count(&self) -> usize {
231 self.pending_aggregation.read().unwrap().len()
232 }
233
234 pub fn flush_aggregation(&self, squad_id: &str) -> Option<DataPacket> {
236 let mut pending = self.pending_aggregation.write().unwrap();
237 if pending.is_empty() {
238 return None;
239 }
240
241 let packets: Vec<DataPacket> = pending.drain(..).collect();
242 match self
243 .aggregator
244 .aggregate_telemetry(squad_id, &self.config.node_id, packets)
245 {
246 Ok(aggregated) => Some(aggregated),
247 Err(e) => {
248 debug!("Flush aggregation failed: {}", e);
249 None
250 }
251 }
252 }
253
254 pub fn router(&self) -> &SelectiveRouter {
256 &self.router
257 }
258
259 pub fn aggregator(&self) -> &A {
261 &self.aggregator
262 }
263
264 pub fn node_id(&self) -> &str {
266 &self.config.node_id
267 }
268
269 pub fn dedup_cache_size(&self) -> usize {
271 self.router.dedup_cache_size()
272 }
273
274 pub fn clear_dedup_cache(&self) {
276 self.router.clear_dedup_cache();
277 }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::beacon::{GeoPosition, GeographicBeacon, HierarchyLevel};
    use crate::hierarchy::NodeRole;
    use crate::topology::SelectedPeer;
    use std::collections::HashMap;
    use std::time::Instant;

    /// Topology of a squad member whose selected peer is `parent-node`.
    fn create_test_state() -> TopologyState {
        TopologyState {
            selected_peer: Some(SelectedPeer {
                node_id: "parent-node".to_string(),
                beacon: GeographicBeacon::new(
                    "parent-node".to_string(),
                    GeoPosition::new(37.7749, -122.4194),
                    HierarchyLevel::Platoon,
                ),
                selected_at: Instant::now(),
            }),
            linked_peers: HashMap::new(),
            lateral_peers: HashMap::new(),
            role: NodeRole::Member,
            hierarchy_level: HierarchyLevel::Squad,
        }
    }

    /// Checks that `forward_to` carries exactly the hops named by `decision`,
    /// and nothing at all for decisions that do not forward.
    fn assert_forwarding_matches_decision(result: &RouteResult) {
        match &result.decision {
            RoutingDecision::Forward { next_hop }
            | RoutingDecision::ConsumeAndForward { next_hop } => {
                assert_eq!(result.forward_to, vec![next_hop.clone()]);
            }
            RoutingDecision::ForwardMulticast { next_hops }
            | RoutingDecision::ConsumeAndForwardMulticast { next_hops } => {
                assert_eq!(&result.forward_to, next_hops);
            }
            _ => assert!(result.forward_to.is_empty()),
        }
    }

    #[test]
    fn test_mesh_router_creation() {
        let router = MeshRouter::with_node_id("test-node");
        assert_eq!(router.node_id(), "test-node");
        assert_eq!(router.pending_aggregation_count(), 0);
    }

    #[test]
    fn test_mesh_router_routing() {
        let router = MeshRouter::with_node_id("test-node");
        let state = create_test_state();
        let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);

        let result = router.route(&packet, &state);

        // Telemetry from another node is expected to go to the selected peer.
        assert!(!result.forward_to.is_empty());
        assert_eq!(result.forward_to[0], "parent-node");
        assert_forwarding_matches_decision(&result);
    }

    #[test]
    fn test_mesh_router_deduplication() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            deduplication: DeduplicationConfig {
                enabled: true,
                ..Default::default()
            },
            ..Default::default()
        };
        let router = MeshRouter::new(config);
        let state = create_test_state();
        let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);

        // First sighting is routed normally and recorded in the cache.
        let result1 = router.route(&packet, &state);
        assert!(!matches!(result1.decision, RoutingDecision::Drop));
        assert_eq!(router.dedup_cache_size(), 1);

        // Second sighting of the same packet is dropped.
        let result2 = router.route(&packet, &state);
        assert_eq!(result2.decision, RoutingDecision::Drop);
    }

    #[test]
    fn test_mesh_router_aggregation_disabled() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            auto_aggregate: false,
            ..Default::default()
        };
        let router = MeshRouter::new(config);
        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);

        let result = router.add_for_aggregation(packet, "squad-1");
        assert!(result.is_none());
        assert_eq!(router.pending_aggregation_count(), 0);
    }

    #[test]
    fn test_mesh_router_aggregation_below_threshold() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            auto_aggregate: true,
            aggregation_threshold: 5,
            ..Default::default()
        };
        let router = MeshRouter::new(config);

        // Four packets stay below the threshold of five.
        for i in 0..4u8 {
            let packet = DataPacket::telemetry(format!("sensor-{}", i), vec![i]);
            let result = router.add_for_aggregation(packet, "squad-1");
            assert!(result.is_none());
        }
        assert_eq!(router.pending_aggregation_count(), 4);
    }

    #[test]
    fn test_mesh_router_aggregation_at_threshold() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            auto_aggregate: true,
            aggregation_threshold: 3,
            ..Default::default()
        };
        let router = MeshRouter::new(config);

        for i in 0..2u8 {
            let packet = DataPacket::telemetry(format!("sensor-{}", i), vec![i]);
            let result = router.add_for_aggregation(packet, "squad-1");
            assert!(result.is_none());
        }

        // The third packet reaches the threshold. With the default
        // `NoOpAggregator` this test expects no summary packet, but the
        // buffer must still have been drained.
        let packet = DataPacket::telemetry("sensor-2", vec![2]);
        let result = router.add_for_aggregation(packet, "squad-1");
        assert!(result.is_none());
        assert_eq!(router.pending_aggregation_count(), 0);
    }

    #[test]
    fn test_mesh_router_flush_aggregation_empty() {
        let router = MeshRouter::with_node_id("test-node");

        let result = router.flush_aggregation("squad-1");
        assert!(result.is_none());
    }

    #[test]
    fn test_mesh_router_flush_aggregation_with_pending() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            auto_aggregate: true,
            aggregation_threshold: 10,
            ..Default::default()
        };
        let router = MeshRouter::new(config);

        // One packet is far below the threshold, so it is only buffered.
        let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
        assert!(router.add_for_aggregation(packet, "squad-1").is_none());
        assert_eq!(router.pending_aggregation_count(), 1);

        // Flushing empties the buffer; with the default `NoOpAggregator` this
        // test expects no summary packet to come back.
        let result = router.flush_aggregation("squad-1");
        assert!(result.is_none());
        assert_eq!(router.pending_aggregation_count(), 0);
    }

    #[test]
    fn test_mesh_router_clear_dedup_cache() {
        let config = MeshRouterConfig {
            node_id: "test-node".to_string(),
            deduplication: DeduplicationConfig {
                enabled: true,
                ..Default::default()
            },
            ..Default::default()
        };
        let router = MeshRouter::new(config);
        let state = create_test_state();
        let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);

        router.route(&packet, &state);
        assert_eq!(router.dedup_cache_size(), 1);

        router.clear_dedup_cache();
        assert_eq!(router.dedup_cache_size(), 0);
    }

    #[test]
    fn test_mesh_router_accessors() {
        let config = MeshRouterConfig {
            node_id: "my-node".to_string(),
            ..Default::default()
        };
        let router = MeshRouter::new(config);

        assert_eq!(router.node_id(), "my-node");
        // The remaining accessors only need to be callable.
        let _router_ref = router.router();
        let _agg_ref = router.aggregator();
    }

    #[test]
    fn test_mesh_router_config_with_node_id() {
        let config = MeshRouterConfig::with_node_id("node-abc");
        assert_eq!(config.node_id, "node-abc");
        assert!(config.auto_aggregate);
        assert_eq!(config.aggregation_threshold, 3);
    }

    #[test]
    fn test_mesh_router_route_own_telemetry() {
        let router = MeshRouter::with_node_id("test-node");
        let state = create_test_state();
        let packet = DataPacket::telemetry("test-node", vec![1, 2, 3]);

        let result = router.route(&packet, &state);

        // The exact decision is left to the router; what must hold is that
        // the decision is printable and the reported hops agree with it.
        let _ = format!("{:?}", result.decision);
        assert_forwarding_matches_decision(&result);
    }

    #[test]
    fn test_mesh_router_route_command_packet() {
        let router = MeshRouter::with_node_id("test-node");
        let state = create_test_state();
        let packet = DataPacket::command("hq", "test-node", vec![1, 2, 3]);

        let result = router.route(&packet, &state);

        // Whatever the router decides for a command, the reported hops must
        // agree with the decision.
        assert_forwarding_matches_decision(&result);
    }
}