1use super::aggregator::{Aggregator, NoOpAggregator};
57use super::packet::DataPacket;
58use super::router::{DeduplicationConfig, RoutingDecision, SelectiveRouter};
59use crate::topology::TopologyState;
60use std::sync::Arc;
61use tracing::{debug, info};
62
63#[derive(Debug, Clone)]
65pub struct MeshRouterConfig {
66 pub node_id: String,
68 pub deduplication: DeduplicationConfig,
70 pub auto_aggregate: bool,
72 pub aggregation_threshold: usize,
74 pub verbose: bool,
76}
77
78impl Default for MeshRouterConfig {
79 fn default() -> Self {
80 Self {
81 node_id: String::new(),
82 deduplication: DeduplicationConfig::default(),
83 auto_aggregate: true,
84 aggregation_threshold: 3,
85 verbose: false,
86 }
87 }
88}
89
90impl MeshRouterConfig {
91 pub fn with_node_id(node_id: impl Into<String>) -> Self {
93 Self {
94 node_id: node_id.into(),
95 ..Default::default()
96 }
97 }
98}
99
100#[derive(Debug)]
102pub struct RouteResult {
103 pub decision: RoutingDecision,
105 pub should_aggregate: bool,
107 pub forward_to: Vec<String>,
109}
110
111pub struct MeshRouter<A: Aggregator = NoOpAggregator> {
119 config: MeshRouterConfig,
121 router: SelectiveRouter,
123 aggregator: A,
125 pending_aggregation: Arc<std::sync::RwLock<Vec<DataPacket>>>,
127}
128
129impl MeshRouter<NoOpAggregator> {
130 pub fn new(config: MeshRouterConfig) -> Self {
132 Self::with_aggregator(config, NoOpAggregator)
133 }
134
135 pub fn with_node_id(node_id: impl Into<String>) -> Self {
137 Self::new(MeshRouterConfig::with_node_id(node_id))
138 }
139}
140
141impl<A: Aggregator> MeshRouter<A> {
142 pub fn with_aggregator(config: MeshRouterConfig, aggregator: A) -> Self {
144 let router = if config.deduplication.enabled {
145 SelectiveRouter::new_with_deduplication(config.deduplication.clone())
146 } else {
147 SelectiveRouter::new()
148 };
149
150 Self {
151 config,
152 router,
153 aggregator,
154 pending_aggregation: Arc::new(std::sync::RwLock::new(Vec::new())),
155 }
156 }
157
158 pub fn route(&self, packet: &DataPacket, state: &TopologyState) -> RouteResult {
172 let decision = self.router.route(packet, state, &self.config.node_id);
173
174 let should_aggregate = self.router.should_aggregate(packet, &decision, state);
175
176 let forward_to = match &decision {
177 RoutingDecision::Forward { next_hop } => vec![next_hop.clone()],
178 RoutingDecision::ConsumeAndForward { next_hop } => vec![next_hop.clone()],
179 RoutingDecision::ForwardMulticast { next_hops } => next_hops.clone(),
180 RoutingDecision::ConsumeAndForwardMulticast { next_hops } => next_hops.clone(),
181 _ => vec![],
182 };
183
184 RouteResult {
185 decision,
186 should_aggregate,
187 forward_to,
188 }
189 }
190
191 pub fn add_for_aggregation(&self, packet: DataPacket, squad_id: &str) -> Option<DataPacket> {
196 if !self.config.auto_aggregate {
197 return None;
198 }
199
200 let mut pending = self
201 .pending_aggregation
202 .write()
203 .unwrap_or_else(|e| e.into_inner());
204 pending.push(packet);
205
206 if pending.len() >= self.config.aggregation_threshold {
207 let packets: Vec<DataPacket> = pending.drain(..).collect();
209 match self
210 .aggregator
211 .aggregate_telemetry(squad_id, &self.config.node_id, packets)
212 {
213 Ok(aggregated) => {
214 if self.config.verbose {
215 info!(
216 "Aggregated {} packets into squad summary for {}",
217 self.config.aggregation_threshold, squad_id
218 );
219 }
220 Some(aggregated)
221 }
222 Err(e) => {
223 debug!("Aggregation failed: {}", e);
224 None
225 }
226 }
227 } else {
228 None
229 }
230 }
231
232 pub fn pending_aggregation_count(&self) -> usize {
234 self.pending_aggregation
235 .read()
236 .unwrap_or_else(|e| e.into_inner())
237 .len()
238 }
239
240 pub fn flush_aggregation(&self, squad_id: &str) -> Option<DataPacket> {
242 let mut pending = self
243 .pending_aggregation
244 .write()
245 .unwrap_or_else(|e| e.into_inner());
246 if pending.is_empty() {
247 return None;
248 }
249
250 let packets: Vec<DataPacket> = pending.drain(..).collect();
251 match self
252 .aggregator
253 .aggregate_telemetry(squad_id, &self.config.node_id, packets)
254 {
255 Ok(aggregated) => Some(aggregated),
256 Err(e) => {
257 debug!("Flush aggregation failed: {}", e);
258 None
259 }
260 }
261 }
262
263 pub fn router(&self) -> &SelectiveRouter {
265 &self.router
266 }
267
268 pub fn aggregator(&self) -> &A {
270 &self.aggregator
271 }
272
273 pub fn node_id(&self) -> &str {
275 &self.config.node_id
276 }
277
278 pub fn dedup_cache_size(&self) -> usize {
280 self.router.dedup_cache_size()
281 }
282
283 pub fn clear_dedup_cache(&self) {
285 self.router.clear_dedup_cache();
286 }
287}
288
289#[cfg(test)]
290mod tests {
291 use super::*;
292 use crate::beacon::{GeoPosition, GeographicBeacon, HierarchyLevel};
293 use crate::hierarchy::NodeRole;
294 use crate::topology::SelectedPeer;
295 use std::collections::HashMap;
296 use std::time::Instant;
297
298 fn create_test_state() -> TopologyState {
299 TopologyState {
300 selected_peer: Some(SelectedPeer {
301 node_id: "parent-node".to_string(),
302 beacon: GeographicBeacon::new(
303 "parent-node".to_string(),
304 GeoPosition::new(37.7749, -122.4194),
305 HierarchyLevel::Platoon,
306 ),
307 selected_at: Instant::now(),
308 }),
309 linked_peers: HashMap::new(),
310 lateral_peers: HashMap::new(),
311 role: NodeRole::Member,
312 hierarchy_level: HierarchyLevel::Squad,
313 }
314 }
315
316 #[test]
317 fn test_mesh_router_creation() {
318 let router = MeshRouter::with_node_id("test-node");
319 assert_eq!(router.node_id(), "test-node");
320 assert_eq!(router.pending_aggregation_count(), 0);
321 }
322
323 #[test]
324 fn test_mesh_router_routing() {
325 let router = MeshRouter::with_node_id("test-node");
326 let state = create_test_state();
327 let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);
328
329 let result = router.route(&packet, &state);
330
331 assert!(!result.forward_to.is_empty());
333 assert_eq!(result.forward_to[0], "parent-node");
334 }
335
336 #[test]
337 fn test_mesh_router_deduplication() {
338 let config = MeshRouterConfig {
339 node_id: "test-node".to_string(),
340 deduplication: DeduplicationConfig {
341 enabled: true,
342 ..Default::default()
343 },
344 ..Default::default()
345 };
346 let router = MeshRouter::new(config);
347 let state = create_test_state();
348 let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);
349
350 let result1 = router.route(&packet, &state);
352 assert!(!matches!(result1.decision, RoutingDecision::Drop));
353 assert_eq!(router.dedup_cache_size(), 1);
354
355 let result2 = router.route(&packet, &state);
357 assert_eq!(result2.decision, RoutingDecision::Drop);
358 }
359
360 #[test]
361 fn test_mesh_router_aggregation_disabled() {
362 let config = MeshRouterConfig {
363 node_id: "test-node".to_string(),
364 auto_aggregate: false,
365 ..Default::default()
366 };
367 let router = MeshRouter::new(config);
368 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
369
370 let result = router.add_for_aggregation(packet, "squad-1");
371 assert!(result.is_none());
372 assert_eq!(router.pending_aggregation_count(), 0);
373 }
374
375 #[test]
376 fn test_mesh_router_aggregation_below_threshold() {
377 let config = MeshRouterConfig {
378 node_id: "test-node".to_string(),
379 auto_aggregate: true,
380 aggregation_threshold: 5,
381 ..Default::default()
382 };
383 let router = MeshRouter::new(config);
384
385 for i in 0..4 {
387 let packet = DataPacket::telemetry(format!("sensor-{}", i), vec![i as u8]);
388 let result = router.add_for_aggregation(packet, "squad-1");
389 assert!(result.is_none());
390 }
391 assert_eq!(router.pending_aggregation_count(), 4);
392 }
393
394 #[test]
395 fn test_mesh_router_aggregation_at_threshold() {
396 let config = MeshRouterConfig {
397 node_id: "test-node".to_string(),
398 auto_aggregate: true,
399 aggregation_threshold: 3,
400 ..Default::default()
401 };
402 let router = MeshRouter::new(config);
403
404 for i in 0..2 {
406 let packet = DataPacket::telemetry(format!("sensor-{}", i), vec![i as u8]);
407 let result = router.add_for_aggregation(packet, "squad-1");
408 assert!(result.is_none());
409 }
410
411 let packet = DataPacket::telemetry("sensor-2", vec![2]);
414 let result = router.add_for_aggregation(packet, "squad-1");
415 assert!(result.is_none());
417 assert_eq!(router.pending_aggregation_count(), 0);
419 }
420
421 #[test]
422 fn test_mesh_router_flush_aggregation_empty() {
423 let router = MeshRouter::with_node_id("test-node");
424
425 let result = router.flush_aggregation("squad-1");
427 assert!(result.is_none());
428 }
429
430 #[test]
431 fn test_mesh_router_flush_aggregation_with_pending() {
432 let config = MeshRouterConfig {
433 node_id: "test-node".to_string(),
434 auto_aggregate: true,
435 aggregation_threshold: 10, ..Default::default()
437 };
438 let router = MeshRouter::new(config);
439
440 let packet = DataPacket::telemetry("sensor-1", vec![1, 2, 3]);
442 router.add_for_aggregation(packet, "squad-1");
443 assert_eq!(router.pending_aggregation_count(), 1);
444
445 let result = router.flush_aggregation("squad-1");
447 assert!(result.is_none());
448 assert_eq!(router.pending_aggregation_count(), 0);
450 }
451
452 #[test]
453 fn test_mesh_router_clear_dedup_cache() {
454 let config = MeshRouterConfig {
455 node_id: "test-node".to_string(),
456 deduplication: DeduplicationConfig {
457 enabled: true,
458 ..Default::default()
459 },
460 ..Default::default()
461 };
462 let router = MeshRouter::new(config);
463 let state = create_test_state();
464 let packet = DataPacket::telemetry("other-node", vec![1, 2, 3]);
465
466 router.route(&packet, &state);
467 assert_eq!(router.dedup_cache_size(), 1);
468
469 router.clear_dedup_cache();
470 assert_eq!(router.dedup_cache_size(), 0);
471 }
472
473 #[test]
474 fn test_mesh_router_accessors() {
475 let config = MeshRouterConfig {
476 node_id: "my-node".to_string(),
477 ..Default::default()
478 };
479 let router = MeshRouter::new(config);
480
481 assert_eq!(router.node_id(), "my-node");
482 let _router_ref = router.router();
484 let _agg_ref = router.aggregator();
485 }
486
487 #[test]
488 fn test_mesh_router_config_with_node_id() {
489 let config = MeshRouterConfig::with_node_id("node-abc");
490 assert_eq!(config.node_id, "node-abc");
491 assert!(config.auto_aggregate);
492 assert_eq!(config.aggregation_threshold, 3);
493 }
494
495 #[test]
496 fn test_mesh_router_route_own_telemetry() {
497 let router = MeshRouter::with_node_id("test-node");
499 let state = create_test_state();
500 let packet = DataPacket::telemetry("test-node", vec![1, 2, 3]);
501
502 let result = router.route(&packet, &state);
503 let _ = format!("{:?}", result.decision);
506 }
507
508 #[test]
509 fn test_mesh_router_route_command_packet() {
510 let router = MeshRouter::with_node_id("test-node");
511 let state = create_test_state();
512 let packet = DataPacket::command("hq", "test-node", vec![1, 2, 3]);
513
514 let result = router.route(&packet, &state);
515 assert!(
517 matches!(result.decision, RoutingDecision::Consume)
518 || matches!(result.decision, RoutingDecision::ConsumeAndForward { .. })
519 || matches!(result.decision, RoutingDecision::Drop)
520 || !result.forward_to.is_empty()
521 || result.forward_to.is_empty()
522 );
523 }
524}