use peat_schema::command::v1::{command_target::Scope, HierarchicalCommand};
use std::collections::HashSet;
use tracing::warn;
pub struct CommandRouter {
node_id: String,
cell_id: Option<String>,
cell_members: Vec<String>,
cohort_id: Option<String>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum TargetResolution {
Self_,
Subordinates(Vec<String>),
AllCellMembers(Vec<String>),
NotApplicable,
}
impl CommandRouter {
pub fn new(
node_id: String,
cell_id: Option<String>,
cell_members: Vec<String>,
cohort_id: Option<String>,
) -> Self {
Self {
node_id,
cell_id,
cell_members,
cohort_id,
}
}
pub fn resolve_target(&self, command: &HierarchicalCommand) -> TargetResolution {
let target = match &command.target {
Some(t) => t,
None => return TargetResolution::NotApplicable,
};
let scope = Scope::try_from(target.scope).unwrap_or(Scope::Unspecified);
match scope {
Scope::Individual => {
let target_ids: HashSet<String> = target.target_ids.iter().cloned().collect();
if target_ids.contains(&self.node_id) {
TargetResolution::Self_
} else {
let subordinate_targets: Vec<String> = self
.cell_members
.iter()
.filter(|m| target_ids.contains(*m))
.cloned()
.collect();
if !subordinate_targets.is_empty() {
TargetResolution::Subordinates(subordinate_targets)
} else {
TargetResolution::NotApplicable
}
}
}
Scope::Cell => {
if let Some(ref my_cell) = self.cell_id {
if target.target_ids.contains(my_cell) {
if !self.cell_members.is_empty() {
TargetResolution::AllCellMembers(self.cell_members.clone())
} else {
TargetResolution::Self_
}
} else {
TargetResolution::NotApplicable
}
} else {
TargetResolution::NotApplicable
}
}
Scope::Cohort => {
if let Some(ref my_cohort) = self.cohort_id {
if target.target_ids.contains(my_cohort) {
if !self.cell_members.is_empty() {
TargetResolution::AllCellMembers(self.cell_members.clone())
} else {
TargetResolution::Self_
}
} else {
TargetResolution::NotApplicable
}
} else {
TargetResolution::NotApplicable
}
}
Scope::Federation | Scope::Coalition => {
warn!(
node_id = %self.node_id,
command_id = %command.command_id,
scope = ?scope,
"Federation/Coalition-scope command reached the cell-tier \
router; should have been resolved at the coordinator layer. \
Treating as NotApplicable (see peat-protocol::command::routing \
module doc)."
);
TargetResolution::NotApplicable
}
Scope::Broadcast => {
if !self.cell_members.is_empty() {
TargetResolution::AllCellMembers(self.cell_members.clone())
} else {
TargetResolution::Self_
}
}
Scope::Unspecified => TargetResolution::NotApplicable,
}
}
pub fn should_route(&self, resolution: &TargetResolution) -> bool {
matches!(
resolution,
TargetResolution::Subordinates(_) | TargetResolution::AllCellMembers(_)
)
}
pub fn get_routing_targets(&self, resolution: &TargetResolution) -> Vec<String> {
match resolution {
TargetResolution::Subordinates(nodes) => nodes.clone(),
TargetResolution::AllCellMembers(nodes) => nodes.clone(),
_ => Vec::new(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use peat_schema::command::v1::CommandTarget;
#[test]
fn test_resolve_individual_self() {
let router = CommandRouter::new(
"node-1".to_string(),
Some("cell-alpha".to_string()),
vec!["node-1".to_string(), "node-2".to_string()],
None,
);
let command = HierarchicalCommand {
command_id: "cmd-1".to_string(),
target: Some(CommandTarget {
scope: Scope::Individual as i32,
target_ids: vec!["node-1".to_string()],
}),
..Default::default()
};
let resolution = router.resolve_target(&command);
assert_eq!(resolution, TargetResolution::Self_);
}
#[test]
fn test_resolve_individual_subordinate() {
let router = CommandRouter::new(
"node-1".to_string(),
Some("cell-alpha".to_string()),
vec!["node-1".to_string(), "node-2".to_string()],
None,
);
let command = HierarchicalCommand {
command_id: "cmd-1".to_string(),
target: Some(CommandTarget {
scope: Scope::Individual as i32,
target_ids: vec!["node-2".to_string()],
}),
..Default::default()
};
let resolution = router.resolve_target(&command);
assert_eq!(
resolution,
TargetResolution::Subordinates(vec!["node-2".to_string()])
);
}
#[test]
fn test_resolve_cell() {
let router = CommandRouter::new(
"node-1".to_string(),
Some("cell-alpha".to_string()),
vec![
"node-1".to_string(),
"node-2".to_string(),
"node-3".to_string(),
],
None,
);
let command = HierarchicalCommand {
command_id: "cmd-1".to_string(),
target: Some(CommandTarget {
scope: Scope::Cell as i32,
target_ids: vec!["cell-alpha".to_string()],
}),
..Default::default()
};
let resolution = router.resolve_target(&command);
if let TargetResolution::AllCellMembers(members) = resolution {
assert_eq!(members.len(), 3);
} else {
panic!("Expected AllCellMembers resolution");
}
}
#[test]
fn test_resolve_federation_at_cell_router_is_not_applicable_even_as_leader() {
let router = CommandRouter::new(
"node-1".to_string(),
Some("cell-alpha".to_string()),
vec![
"node-1".to_string(),
"node-2".to_string(),
"node-3".to_string(),
],
None,
);
let command = HierarchicalCommand {
command_id: "cmd-fed-1".to_string(),
target: Some(CommandTarget {
scope: Scope::Federation as i32,
target_ids: vec!["fed-1".to_string()],
}),
..Default::default()
};
let resolution = router.resolve_target(&command);
assert_eq!(
resolution,
TargetResolution::NotApplicable,
"Federation-scope command at a cell-tier router must not fan out, \
even for a cell leader — the federation coordinator is responsible \
for resolving targets and re-emitting as Cell/Individual scope."
);
}
#[test]
fn test_resolve_federation_at_cell_router_is_not_applicable_for_non_leader() {
let router = CommandRouter::new(
"node-1".to_string(),
Some("cell-alpha".to_string()),
vec![],
None,
);
let command = HierarchicalCommand {
command_id: "cmd-fed-2".to_string(),
target: Some(CommandTarget {
scope: Scope::Federation as i32,
target_ids: vec!["fed-1".to_string()],
}),
..Default::default()
};
let resolution = router.resolve_target(&command);
assert_eq!(
resolution,
TargetResolution::NotApplicable,
"Federation-scope command at a cell-tier non-leader router is \
NotApplicable; coordinator-mediated dissemination model."
);
}
#[test]
fn test_resolve_coalition_at_cell_router_is_not_applicable_even_as_leader() {
let router = CommandRouter::new(
"node-1".to_string(),
Some("cell-alpha".to_string()),
vec!["node-1".to_string(), "node-2".to_string()],
None,
);
let command = HierarchicalCommand {
command_id: "cmd-coa-1".to_string(),
target: Some(CommandTarget {
scope: Scope::Coalition as i32,
target_ids: vec!["coa-1".to_string()],
}),
..Default::default()
};
let resolution = router.resolve_target(&command);
assert_eq!(
resolution,
TargetResolution::NotApplicable,
"Coalition-scope command at a cell-tier router must not fan out, \
even for a cell leader — the coalition coordinator is responsible \
for resolving targets and re-emitting as Cell/Individual scope."
);
}
#[test]
fn test_resolve_coalition_at_cell_router_is_not_applicable_for_non_leader() {
let router = CommandRouter::new(
"node-1".to_string(),
Some("cell-alpha".to_string()),
vec![],
None,
);
let command = HierarchicalCommand {
command_id: "cmd-coa-2".to_string(),
target: Some(CommandTarget {
scope: Scope::Coalition as i32,
target_ids: vec!["coa-1".to_string()],
}),
..Default::default()
};
let resolution = router.resolve_target(&command);
assert_eq!(
resolution,
TargetResolution::NotApplicable,
"Coalition-scope command at a cell-tier non-leader router is \
NotApplicable; coordinator-mediated dissemination model."
);
}
#[test]
fn test_should_route() {
let router = CommandRouter::new(
"node-1".to_string(),
Some("cell-alpha".to_string()),
vec!["node-1".to_string(), "node-2".to_string()],
None,
);
assert!(router.should_route(&TargetResolution::Subordinates(vec!["node-2".to_string()])));
assert!(!router.should_route(&TargetResolution::Self_));
assert!(!router.should_route(&TargetResolution::NotApplicable));
}
}