use crate::cell::messaging::MessagePriority;
use crate::hierarchy::flow_control::{FlowController, RoutingLevel};
use crate::hierarchy::routing_table::RoutingTable;
use crate::Result;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, instrument, warn};
pub struct HierarchicalRouter {
node_id: String,
routing_table: Arc<RwLock<RoutingTable>>,
flow_controller: Option<Arc<FlowController>>,
}
impl HierarchicalRouter {
pub fn new(node_id: String, routing_table: Arc<RwLock<RoutingTable>>) -> Self {
Self {
node_id,
routing_table,
flow_controller: None,
}
}
pub fn with_flow_control(
node_id: String,
routing_table: Arc<RwLock<RoutingTable>>,
flow_controller: Arc<FlowController>,
) -> Self {
Self {
node_id,
routing_table,
flow_controller: Some(flow_controller),
}
}
#[instrument(skip(self))]
pub async fn is_route_valid(&self, from: &str, to: &str) -> bool {
let table = self.routing_table.read().await;
let from_cell = match table.get_node_cell(from) {
Some(cell) => cell,
None => {
warn!("Source node {} not assigned to any cell", from);
return false;
}
};
let to_cell = match table.get_node_cell(to) {
Some(cell) => cell,
None => {
return self.is_upward_route_valid(&table, from, from_cell, to);
}
};
if from_cell == to_cell {
debug!(
"Allowing same-cell routing: {} → {} (cell: {})",
from, to, from_cell
);
return true;
}
warn!(
"Rejecting cross-cell routing: {} (cell: {}) → {} (cell: {})",
from, from_cell, to, to_cell
);
false
}
fn is_upward_route_valid(
&self,
table: &RoutingTable,
from: &str,
from_cell: &str,
to: &str,
) -> bool {
if !table.is_cell_leader(from, from_cell) {
warn!(
"Rejecting upward routing from non-leader: {} → {}",
from, to
);
return false;
}
debug!(
"Allowing upward routing from cell leader: {} → {}",
from, to
);
true
}
#[instrument(skip(self))]
pub async fn valid_targets(&self) -> Vec<String> {
let table = self.routing_table.read().await;
let my_cell = match table.get_node_cell(&self.node_id) {
Some(cell) => cell,
None => {
warn!("Node {} not assigned to any cell", self.node_id);
return Vec::new();
}
};
let mut targets: Vec<String> = table
.get_cell_nodes(my_cell)
.into_iter()
.filter(|&node| node != self.node_id) .map(|s| s.to_string())
.collect();
if table.is_cell_leader(&self.node_id, my_cell) {
if let Some(zone_id) = table.get_cell_zone(my_cell) {
targets.push(format!("zone:{}", zone_id));
}
}
targets
}
#[instrument(skip(self, new_table))]
pub async fn update_routing_table(&mut self, new_table: RoutingTable) -> Result<()> {
let mut table = self.routing_table.write().await;
debug!("Updating routing table for node {}", self.node_id);
table.merge(&new_table);
Ok(())
}
pub async fn stats(&self) -> RouterStats {
let table = self.routing_table.read().await;
let my_cell = table.get_node_cell(&self.node_id).map(|s| s.to_string());
let my_zone = table.get_node_zone(&self.node_id).map(|s| s.to_string());
let is_leader = my_cell
.as_ref()
.map(|cell| table.is_cell_leader(&self.node_id, cell))
.unwrap_or(false);
let cell_peer_count = my_cell
.as_ref()
.map(|cell| table.get_cell_nodes(cell).len().saturating_sub(1)) .unwrap_or(0);
RouterStats {
node_id: self.node_id.clone(),
cell_id: my_cell,
zone_id: my_zone,
is_cell_leader: is_leader,
cell_peer_count,
}
}
pub async fn get_my_cell(&self) -> Option<String> {
let table = self.routing_table.read().await;
table.get_node_cell(&self.node_id).map(|s| s.to_string())
}
pub async fn get_my_zone(&self) -> Option<String> {
let table = self.routing_table.read().await;
table.get_node_zone(&self.node_id).map(|s| s.to_string())
}
pub async fn is_leader(&self) -> bool {
let table = self.routing_table.read().await;
if let Some(cell) = table.get_node_cell(&self.node_id) {
table.is_cell_leader(&self.node_id, cell)
} else {
false
}
}
#[instrument(skip(self))]
pub async fn route_message(
&self,
from: &str,
to: &str,
message_size: usize,
priority: MessagePriority,
) -> Result<Option<crate::hierarchy::flow_control::Permit>> {
if !self.is_route_valid(from, to).await {
debug!("Route from {} to {} rejected by hierarchy rules", from, to);
return Ok(None);
}
if let Some(fc) = &self.flow_controller {
let table = self.routing_table.read().await;
let from_cell = table.get_node_cell(from);
let to_cell = table.get_node_cell(to);
let level = if from_cell == to_cell && from_cell.is_some() {
RoutingLevel::Cell
} else {
RoutingLevel::Zone
};
let permit = fc.acquire_permit(level, message_size, priority).await?;
Ok(Some(permit))
} else {
Ok(None) }
}
pub async fn has_backpressure(&self) -> bool {
if let Some(fc) = &self.flow_controller {
fc.has_backpressure().await
} else {
false
}
}
pub fn flow_controller(&self) -> Option<Arc<FlowController>> {
self.flow_controller.clone()
}
}
#[cfg(feature = "automerge-backend")]
#[async_trait::async_trait]
impl peat_mesh::storage::sync_transport::SyncRouter for HierarchicalRouter {
async fn get_targets(
&self,
direction: peat_mesh::storage::automerge_sync::SyncDirection,
connected: &[iroh::EndpointId],
) -> Vec<iroh::EndpointId> {
use peat_mesh::storage::automerge_sync::SyncDirection;
match direction {
SyncDirection::Broadcast => connected.to_vec(),
SyncDirection::Lateral => {
connected.to_vec()
}
SyncDirection::Upward => {
if self.is_leader().await {
let valid = self.valid_targets().await;
let has_zone_targets = valid.iter().any(|t| t.starts_with("zone:"));
if has_zone_targets {
tracing::debug!("Upward sync from leader - zone sync not yet implemented");
Vec::new()
} else {
connected.to_vec()
}
} else {
connected.to_vec()
}
}
SyncDirection::Downward => {
if self.is_leader().await {
connected.to_vec()
} else {
tracing::debug!(
"Non-leader ignoring downward sync (commands flow from leader)"
);
Vec::new()
}
}
}
}
async fn is_leader(&self) -> bool {
self.is_leader().await
}
}
#[derive(Debug, Clone)]
pub struct RouterStats {
pub node_id: String,
pub cell_id: Option<String>,
pub zone_id: Option<String>,
pub is_cell_leader: bool,
pub cell_peer_count: usize,
}
#[cfg(test)]
mod tests {
use super::*;
async fn setup_routing_table() -> RoutingTable {
let mut table = RoutingTable::new();
table.assign_node("node1", "cell_alpha", 100);
table.assign_node("node2", "cell_alpha", 101);
table.assign_node("node3", "cell_alpha", 102);
table.set_cell_leader("cell_alpha", "node1", 103);
table.assign_node("node4", "cell_beta", 104);
table.assign_node("node5", "cell_beta", 105);
table.set_cell_leader("cell_beta", "node4", 106);
table.assign_cell("cell_alpha", "zone_north", 107);
table.assign_cell("cell_beta", "zone_south", 108);
table
}
#[tokio::test]
async fn test_router_creation() {
let table = setup_routing_table().await;
let router = HierarchicalRouter::new("node1".to_string(), Arc::new(RwLock::new(table)));
let stats = router.stats().await;
assert_eq!(stats.node_id, "node1");
assert_eq!(stats.cell_id, Some("cell_alpha".to_string()));
assert_eq!(stats.zone_id, Some("zone_north".to_string()));
assert!(stats.is_cell_leader);
assert_eq!(stats.cell_peer_count, 2); }
#[tokio::test]
async fn test_same_cell_routing() {
let table = setup_routing_table().await;
let router = HierarchicalRouter::new("node1".to_string(), Arc::new(RwLock::new(table)));
assert!(router.is_route_valid("node1", "node2").await);
assert!(router.is_route_valid("node2", "node3").await);
assert!(router.is_route_valid("node3", "node1").await);
}
#[tokio::test]
async fn test_cross_cell_routing_rejected() {
let table = setup_routing_table().await;
let router = HierarchicalRouter::new("node1".to_string(), Arc::new(RwLock::new(table)));
assert!(!router.is_route_valid("node1", "node4").await);
assert!(!router.is_route_valid("node2", "node5").await);
assert!(!router.is_route_valid("node4", "node1").await);
}
#[tokio::test]
async fn test_leader_upward_routing() {
let table = setup_routing_table().await;
let router = HierarchicalRouter::new("node1".to_string(), Arc::new(RwLock::new(table)));
assert!(router.is_route_valid("node1", "zone_coordinator").await);
assert!(!router.is_route_valid("node2", "zone_coordinator").await);
}
#[tokio::test]
async fn test_valid_targets() {
let table = setup_routing_table().await;
let router2 =
HierarchicalRouter::new("node2".to_string(), Arc::new(RwLock::new(table.clone())));
let mut targets = router2.valid_targets().await;
targets.sort();
assert_eq!(targets, vec!["node1", "node3"]);
let router1 = HierarchicalRouter::new("node1".to_string(), Arc::new(RwLock::new(table)));
let mut targets = router1.valid_targets().await;
targets.sort();
assert!(targets.contains(&"node2".to_string()));
assert!(targets.contains(&"node3".to_string()));
assert!(targets.contains(&"zone:zone_north".to_string()));
assert_eq!(targets.len(), 3);
}
#[tokio::test]
async fn test_unassigned_node() {
let table = setup_routing_table().await;
let router =
HierarchicalRouter::new("node_unassigned".to_string(), Arc::new(RwLock::new(table)));
assert!(!router.is_route_valid("node_unassigned", "node1").await);
let targets = router.valid_targets().await;
assert_eq!(targets.len(), 0);
let stats = router.stats().await;
assert_eq!(stats.cell_id, None);
assert!(!stats.is_cell_leader);
}
#[tokio::test]
async fn test_routing_table_update() {
let table = setup_routing_table().await;
let mut router = HierarchicalRouter::new("node1".to_string(), Arc::new(RwLock::new(table)));
assert_eq!(router.get_my_cell().await, Some("cell_alpha".to_string()));
let mut update = RoutingTable::new();
update.assign_node("node1", "cell_beta", 200);
router.update_routing_table(update).await.unwrap();
assert_eq!(router.get_my_cell().await, Some("cell_beta".to_string()));
}
#[tokio::test]
async fn test_leader_check() {
let table = setup_routing_table().await;
let router1 =
HierarchicalRouter::new("node1".to_string(), Arc::new(RwLock::new(table.clone())));
assert!(router1.is_leader().await);
let router2 = HierarchicalRouter::new("node2".to_string(), Arc::new(RwLock::new(table)));
assert!(!router2.is_leader().await); }
#[tokio::test]
async fn test_router_with_flow_control() {
use crate::hierarchy::flow_control::{BandwidthLimit, MessageDropPolicy};
let table = setup_routing_table().await;
let fc = Arc::new(FlowController::new(
BandwidthLimit::new(10, 1000),
BandwidthLimit::new(5, 500),
MessageDropPolicy::DropLowPriority,
));
let router = HierarchicalRouter::with_flow_control(
"node1".to_string(),
Arc::new(RwLock::new(table)),
fc.clone(),
);
assert!(router.flow_controller().is_some());
assert!(!router.has_backpressure().await);
}
#[tokio::test]
async fn test_route_message_with_flow_control() {
use crate::hierarchy::flow_control::{BandwidthLimit, MessageDropPolicy};
let table = setup_routing_table().await;
let fc = Arc::new(FlowController::new(
BandwidthLimit::new(100, 10000),
BandwidthLimit::new(50, 5000),
MessageDropPolicy::DropLowPriority,
));
let router = HierarchicalRouter::with_flow_control(
"node1".to_string(),
Arc::new(RwLock::new(table)),
fc.clone(),
);
let result = router
.route_message("node1", "node2", 100, MessagePriority::Normal)
.await
.unwrap();
assert!(result.is_some());
let metrics = fc.get_metrics();
assert_eq!(metrics.cell_messages_sent, 1);
assert_eq!(metrics.cell_bytes_sent, 100);
}
#[tokio::test]
async fn test_route_message_cross_cell_rejected() {
use crate::hierarchy::flow_control::{BandwidthLimit, MessageDropPolicy};
let table = setup_routing_table().await;
let fc = Arc::new(FlowController::new(
BandwidthLimit::new(100, 10000),
BandwidthLimit::new(50, 5000),
MessageDropPolicy::DropLowPriority,
));
let router = HierarchicalRouter::with_flow_control(
"node1".to_string(),
Arc::new(RwLock::new(table)),
fc.clone(),
);
let result = router
.route_message("node1", "node4", 100, MessagePriority::Normal)
.await
.unwrap();
assert!(result.is_none());
let metrics = fc.get_metrics();
assert_eq!(metrics.cell_messages_sent, 0);
}
#[tokio::test]
async fn test_route_message_leader_to_zone() {
use crate::hierarchy::flow_control::{BandwidthLimit, MessageDropPolicy};
let table = setup_routing_table().await;
let fc = Arc::new(FlowController::new(
BandwidthLimit::new(100, 10000),
BandwidthLimit::new(50, 5000),
MessageDropPolicy::DropLowPriority,
));
let router = HierarchicalRouter::with_flow_control(
"node1".to_string(),
Arc::new(RwLock::new(table)),
fc.clone(),
);
let result = router
.route_message("node1", "zone_coordinator", 200, MessagePriority::High)
.await
.unwrap();
assert!(result.is_some());
let metrics = fc.get_metrics();
assert_eq!(metrics.zone_messages_sent, 1);
assert_eq!(metrics.zone_bytes_sent, 200);
}
#[tokio::test]
async fn test_priority_affects_flow_control() {
use crate::hierarchy::flow_control::{BandwidthLimit, MessageDropPolicy};
let table = setup_routing_table().await;
let fc = Arc::new(FlowController::new(
BandwidthLimit::new(10, 1000),
BandwidthLimit::new(5, 500),
MessageDropPolicy::DropLowPriority,
));
let router = HierarchicalRouter::with_flow_control(
"node1".to_string(),
Arc::new(RwLock::new(table)),
fc.clone(),
);
let _r1 = router
.route_message("node1", "node2", 100, MessagePriority::Critical)
.await
.unwrap();
let _r2 = router
.route_message("node2", "node3", 100, MessagePriority::Low)
.await
.unwrap();
let metrics = fc.get_metrics();
assert_eq!(metrics.cell_messages_sent, 2);
}
#[tokio::test]
async fn test_router_without_flow_control() {
let table = setup_routing_table().await;
let router = HierarchicalRouter::new("node1".to_string(), Arc::new(RwLock::new(table)));
let result = router
.route_message("node1", "node2", 100, MessagePriority::Normal)
.await
.unwrap();
assert!(result.is_none());
assert!(!router.has_backpressure().await);
}
}