use crate::command::conflict_resolver::{ConflictResolver, ConflictResult};
use crate::command::routing::{CommandRouter, TargetResolution};
use crate::command::timeout_manager::TimeoutManager;
use crate::command::CommandStorage;
use crate::Result;
use peat_schema::command::v1::{
AckStatus, CommandAcknowledgment, CommandStatus, ConflictPolicy, HierarchicalCommand,
};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
pub struct CommandCoordinator {
node_id: String,
router: CommandRouter,
storage: Arc<dyn CommandStorage>,
active_commands: Arc<RwLock<HashMap<String, HierarchicalCommand>>>,
acknowledgments: Arc<RwLock<HashMap<(String, String), CommandAcknowledgment>>>,
command_status: Arc<RwLock<HashMap<String, CommandStatus>>>,
conflict_resolver: Arc<ConflictResolver>,
timeout_manager: Arc<TimeoutManager>,
}
impl CommandCoordinator {
pub fn new(
squad_id: Option<String>,
node_id: String,
squad_members: Vec<String>,
storage: Arc<dyn CommandStorage>,
) -> Self {
let router = CommandRouter::new(node_id.clone(), squad_id, squad_members, None);
Self {
node_id,
router,
storage,
active_commands: Arc::new(RwLock::new(HashMap::new())),
acknowledgments: Arc::new(RwLock::new(HashMap::new())),
command_status: Arc::new(RwLock::new(HashMap::new())),
conflict_resolver: Arc::new(ConflictResolver::new()),
timeout_manager: Arc::new(TimeoutManager::new()),
}
}
pub async fn issue_command(&self, command: HierarchicalCommand) -> Result<()> {
tracing::info!(
"[{}] Issuing command: {} (priority: {})",
self.node_id,
command.command_id,
command.priority
);
let conflict_result = self.conflict_resolver.check_conflict(&command).await;
if let ConflictResult::Conflict(existing) = conflict_result {
let policy = ConflictPolicy::try_from(command.conflict_policy)
.unwrap_or(ConflictPolicy::HighestPriorityWins);
tracing::debug!(
"[{}] Conflict detected for command {}, resolving with policy {:?}",
self.node_id,
command.command_id,
policy
);
let mut all_commands = existing;
all_commands.push(command.clone());
let resolved = self.conflict_resolver.resolve(all_commands, policy)?;
if resolved.command_id != command.command_id {
tracing::warn!(
"[{}] Command {} rejected due to conflict (winner: {})",
self.node_id,
command.command_id,
resolved.command_id
);
return Err(crate::Error::Internal(
"Command rejected by conflict resolution policy".to_string(),
));
}
}
self.timeout_manager.register_expiration(&command).await?;
self.conflict_resolver.register_command(&command).await?;
self.active_commands
.write()
.await
.insert(command.command_id.clone(), command.clone());
let status = CommandStatus {
command_id: command.command_id.clone(),
state: 1, acknowledgments: Vec::new(),
last_updated: Some(peat_schema::common::v1::Timestamp {
seconds: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system clock is before Unix epoch")
.as_secs(),
nanos: 0,
}),
};
self.command_status
.write()
.await
.insert(command.command_id.clone(), status);
if self.requires_acknowledgment(&command) {
let targets = {
let resolution = self.router.resolve_target(&command);
self.router.get_routing_targets(&resolution)
};
if !targets.is_empty() {
let ack_timeout = Duration::from_secs(30); self.timeout_manager
.register_ack_timeout(command.command_id.clone(), targets, ack_timeout)
.await?;
}
}
self.route_command(&command).await?;
Ok(())
}
pub async fn receive_command(&self, command: HierarchicalCommand) -> Result<()> {
tracing::info!(
"[{}] Received command: {} from {}",
self.node_id,
command.command_id,
command.originator_id
);
let resolution = self.router.resolve_target(&command);
match resolution {
TargetResolution::Self_ => {
self.execute_command(&command).await?;
if self.requires_acknowledgment(&command) {
self.send_acknowledgment(&command, AckStatus::AckReceived as i32)
.await?;
}
}
TargetResolution::Subordinates(_) | TargetResolution::AllSquadMembers(_) => {
self.route_command(&command).await?;
}
TargetResolution::NotApplicable => {
tracing::debug!(
"[{}] Command {} not applicable to this node",
self.node_id,
command.command_id
);
}
}
Ok(())
}
async fn route_command(&self, command: &HierarchicalCommand) -> Result<()> {
let resolution = self.router.resolve_target(command);
if !self.router.should_route(&resolution) {
return Ok(());
}
let targets = self.router.get_routing_targets(&resolution);
tracing::info!(
"[{}] Routing command {} to {} nodes",
self.node_id,
command.command_id,
targets.len()
);
let doc_id = self.storage.publish_command(command).await?;
tracing::debug!(
"[{}] Published command {} to storage (doc_id: {})",
self.node_id,
command.command_id,
doc_id
);
for target_id in &targets {
tracing::debug!(
"[{}] → Routing command {} to {}",
self.node_id,
command.command_id,
target_id
);
}
Ok(())
}
async fn execute_command(&self, command: &HierarchicalCommand) -> Result<()> {
tracing::info!(
"[{}] Executing command: {}",
self.node_id,
command.command_id
);
let mut status_map = self.command_status.write().await;
if let Some(status) = status_map.get_mut(&command.command_id) {
status.state = 2; } else {
status_map.insert(
command.command_id.clone(),
CommandStatus {
command_id: command.command_id.clone(),
state: 2, acknowledgments: Vec::new(),
last_updated: Some(peat_schema::common::v1::Timestamp {
seconds: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system clock is before Unix epoch")
.as_secs(),
nanos: 0,
}),
},
);
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
if let Some(status) = status_map.get_mut(&command.command_id) {
status.state = 3; }
tracing::info!(
"[{}] ✓ Completed command: {}",
self.node_id,
command.command_id
);
Ok(())
}
fn requires_acknowledgment(&self, command: &HierarchicalCommand) -> bool {
command.acknowledgment_policy > 1
}
async fn send_acknowledgment(&self, command: &HierarchicalCommand, status: i32) -> Result<()> {
let ack = CommandAcknowledgment {
command_id: command.command_id.clone(),
node_id: self.node_id.clone(),
status,
reason: None,
timestamp: Some(peat_schema::common::v1::Timestamp {
seconds: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system clock is before Unix epoch")
.as_secs(),
nanos: 0,
}),
};
tracing::debug!(
"[{}] Sending ACK for command {} with status {}",
self.node_id,
command.command_id,
status
);
let doc_id = self.storage.publish_acknowledgment(&ack).await?;
tracing::debug!(
"[{}] Published acknowledgment for command {} to storage (doc_id: {})",
self.node_id,
command.command_id,
doc_id
);
self.acknowledgments
.write()
.await
.insert((command.command_id.clone(), self.node_id.clone()), ack);
let all_received = self
.timeout_manager
.record_ack(&command.command_id, &self.node_id)
.await;
if all_received {
tracing::debug!(
"[{}] All acknowledgments received for command {}",
self.node_id,
command.command_id
);
self.timeout_manager
.unregister_ack_timeout(&command.command_id)
.await?;
}
Ok(())
}
pub async fn get_command_status(&self, command_id: &str) -> Option<CommandStatus> {
self.command_status.read().await.get(command_id).cloned()
}
pub async fn get_command_acknowledgments(
&self,
command_id: &str,
) -> Vec<CommandAcknowledgment> {
self.acknowledgments
.read()
.await
.iter()
.filter(|((cmd_id, _), _)| cmd_id == command_id)
.map(|(_, ack)| ack.clone())
.collect()
}
pub async fn is_command_acknowledged(&self, command_id: &str) -> bool {
let command = match self.active_commands.read().await.get(command_id) {
Some(cmd) => cmd.clone(),
None => return false,
};
let resolution = self.router.resolve_target(&command);
let targets = self.router.get_routing_targets(&resolution);
if targets.is_empty() {
return true;
}
let acks = self.get_command_acknowledgments(command_id).await;
let acked_nodes: std::collections::HashSet<String> =
acks.iter().map(|a| a.node_id.clone()).collect();
targets.iter().all(|t| acked_nodes.contains(t))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::command::ObserverHandle;
use peat_schema::command::v1::{command_target::Scope, CommandTarget};
struct MockStorage;
#[async_trait::async_trait]
impl CommandStorage for MockStorage {
async fn publish_command(&self, _command: &HierarchicalCommand) -> crate::Result<String> {
Ok("mock-doc-id".to_string())
}
async fn get_command(
&self,
_command_id: &str,
) -> crate::Result<Option<HierarchicalCommand>> {
Ok(None)
}
async fn query_commands_by_target(
&self,
_target_id: &str,
) -> crate::Result<Vec<HierarchicalCommand>> {
Ok(Vec::new())
}
async fn delete_command(&self, _command_id: &str) -> crate::Result<()> {
Ok(())
}
async fn publish_acknowledgment(
&self,
_ack: &CommandAcknowledgment,
) -> crate::Result<String> {
Ok("mock-ack-id".to_string())
}
async fn get_acknowledgments(
&self,
_command_id: &str,
) -> crate::Result<Vec<CommandAcknowledgment>> {
Ok(Vec::new())
}
async fn update_command_status(&self, _status: &CommandStatus) -> crate::Result<()> {
Ok(())
}
async fn get_command_status(
&self,
_command_id: &str,
) -> crate::Result<Option<CommandStatus>> {
Ok(None)
}
async fn observe_commands(
&self,
_node_id: &str,
_callback: Box<
dyn Fn(
HierarchicalCommand,
)
-> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
+ Send
+ Sync,
>,
) -> crate::Result<ObserverHandle> {
Ok(ObserverHandle::new(()))
}
async fn observe_acknowledgments(
&self,
_issuer_id: &str,
_callback: Box<
dyn Fn(
CommandAcknowledgment,
)
-> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
+ Send
+ Sync,
>,
) -> crate::Result<ObserverHandle> {
Ok(ObserverHandle::new(()))
}
}
#[tokio::test]
async fn test_issue_command() {
let storage = Arc::new(MockStorage);
let coordinator = CommandCoordinator::new(
Some("squad-alpha".to_string()),
"node-1".to_string(),
vec!["node-1".to_string(), "node-2".to_string()],
storage,
);
let command = HierarchicalCommand {
command_id: "cmd-001".to_string(),
originator_id: "node-1".to_string(),
target: Some(CommandTarget {
scope: Scope::Individual as i32,
target_ids: vec!["node-2".to_string()],
}),
priority: 5,
acknowledgment_policy: 2, ..Default::default()
};
coordinator.issue_command(command.clone()).await.unwrap();
let status = coordinator.get_command_status("cmd-001").await;
assert!(status.is_some());
assert_eq!(status.unwrap().state, 1); }
#[tokio::test]
async fn test_receive_and_execute_command() {
let storage = Arc::new(MockStorage);
let coordinator = CommandCoordinator::new(
Some("squad-alpha".to_string()),
"node-1".to_string(),
vec!["node-1".to_string(), "node-2".to_string()],
storage,
);
let command = HierarchicalCommand {
command_id: "cmd-002".to_string(),
originator_id: "node-leader".to_string(),
target: Some(CommandTarget {
scope: Scope::Individual as i32,
target_ids: vec!["node-1".to_string()],
}),
priority: 5,
acknowledgment_policy: 4, ..Default::default()
};
coordinator.receive_command(command).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
let status = coordinator.get_command_status("cmd-002").await;
assert!(status.is_some());
assert_eq!(status.unwrap().state, 3); }
#[tokio::test]
async fn test_acknowledgment_tracking() {
let storage = Arc::new(MockStorage);
let coordinator = CommandCoordinator::new(
Some("squad-alpha".to_string()),
"node-1".to_string(),
vec!["node-1".to_string(), "node-2".to_string()],
storage,
);
let command = HierarchicalCommand {
command_id: "cmd-003".to_string(),
originator_id: "node-1".to_string(),
target: Some(CommandTarget {
scope: Scope::Individual as i32,
target_ids: vec!["node-1".to_string()],
}),
priority: 5,
acknowledgment_policy: 2, ..Default::default()
};
coordinator.receive_command(command).await.unwrap();
let acks = coordinator.get_command_acknowledgments("cmd-003").await;
assert!(!acks.is_empty());
assert_eq!(acks[0].node_id, "node-1");
}
}