1use crate::command::conflict_resolver::{ConflictResolver, ConflictResult};
6use crate::command::routing::{CommandRouter, TargetResolution};
7use crate::command::timeout_manager::TimeoutManager;
8use crate::command::CommandStorage;
9use crate::Result;
10use peat_schema::command::v1::{
11 AckStatus, CommandAcknowledgment, CommandStatus, ConflictPolicy, HierarchicalCommand,
12};
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::RwLock;
17
18pub struct CommandCoordinator {
20 node_id: String,
22
23 router: CommandRouter,
25
26 storage: Arc<dyn CommandStorage>,
28
29 active_commands: Arc<RwLock<HashMap<String, HierarchicalCommand>>>,
31
32 acknowledgments: Arc<RwLock<HashMap<(String, String), CommandAcknowledgment>>>,
34
35 command_status: Arc<RwLock<HashMap<String, CommandStatus>>>,
37
38 conflict_resolver: Arc<ConflictResolver>,
40
41 timeout_manager: Arc<TimeoutManager>,
43}
44
45impl CommandCoordinator {
46 pub fn new(
48 squad_id: Option<String>,
49 node_id: String,
50 squad_members: Vec<String>,
51 storage: Arc<dyn CommandStorage>,
52 ) -> Self {
53 let router = CommandRouter::new(node_id.clone(), squad_id, squad_members, None);
54
55 Self {
56 node_id,
57 router,
58 storage,
59 active_commands: Arc::new(RwLock::new(HashMap::new())),
60 acknowledgments: Arc::new(RwLock::new(HashMap::new())),
61 command_status: Arc::new(RwLock::new(HashMap::new())),
62 conflict_resolver: Arc::new(ConflictResolver::new()),
63 timeout_manager: Arc::new(TimeoutManager::new()),
64 }
65 }
66
67 pub async fn issue_command(&self, command: HierarchicalCommand) -> Result<()> {
69 tracing::info!(
70 "[{}] Issuing command: {} (priority: {})",
71 self.node_id,
72 command.command_id,
73 command.priority
74 );
75
76 let conflict_result = self.conflict_resolver.check_conflict(&command).await;
78 if let ConflictResult::Conflict(existing) = conflict_result {
79 let policy = ConflictPolicy::try_from(command.conflict_policy)
80 .unwrap_or(ConflictPolicy::HighestPriorityWins);
81
82 tracing::debug!(
83 "[{}] Conflict detected for command {}, resolving with policy {:?}",
84 self.node_id,
85 command.command_id,
86 policy
87 );
88
89 let mut all_commands = existing;
90 all_commands.push(command.clone());
91
92 let resolved = self.conflict_resolver.resolve(all_commands, policy)?;
93
94 if resolved.command_id != command.command_id {
95 tracing::warn!(
96 "[{}] Command {} rejected due to conflict (winner: {})",
97 self.node_id,
98 command.command_id,
99 resolved.command_id
100 );
101 return Err(crate::Error::Internal(
102 "Command rejected by conflict resolution policy".to_string(),
103 ));
104 }
105 }
106
107 self.timeout_manager.register_expiration(&command).await?;
109
110 self.conflict_resolver.register_command(&command).await?;
112
113 self.active_commands
115 .write()
116 .await
117 .insert(command.command_id.clone(), command.clone());
118
119 let status = CommandStatus {
121 command_id: command.command_id.clone(),
122 state: 1, acknowledgments: Vec::new(),
124 last_updated: Some(peat_schema::common::v1::Timestamp {
125 seconds: std::time::SystemTime::now()
126 .duration_since(std::time::UNIX_EPOCH)
127 .expect("system clock is before Unix epoch")
128 .as_secs(),
129 nanos: 0,
130 }),
131 };
132
133 self.command_status
134 .write()
135 .await
136 .insert(command.command_id.clone(), status);
137
138 if self.requires_acknowledgment(&command) {
140 let targets = {
141 let resolution = self.router.resolve_target(&command);
142 self.router.get_routing_targets(&resolution)
143 };
144
145 if !targets.is_empty() {
146 let ack_timeout = Duration::from_secs(30); self.timeout_manager
148 .register_ack_timeout(command.command_id.clone(), targets, ack_timeout)
149 .await?;
150 }
151 }
152
153 self.route_command(&command).await?;
155
156 Ok(())
157 }
158
159 pub async fn receive_command(&self, command: HierarchicalCommand) -> Result<()> {
161 tracing::info!(
162 "[{}] Received command: {} from {}",
163 self.node_id,
164 command.command_id,
165 command.originator_id
166 );
167
168 let resolution = self.router.resolve_target(&command);
170
171 match resolution {
172 TargetResolution::Self_ => {
173 self.execute_command(&command).await?;
175
176 if self.requires_acknowledgment(&command) {
178 self.send_acknowledgment(&command, AckStatus::AckReceived as i32)
179 .await?;
180 }
181 }
182
183 TargetResolution::Subordinates(_) | TargetResolution::AllSquadMembers(_) => {
184 self.route_command(&command).await?;
186 }
187
188 TargetResolution::NotApplicable => {
189 tracing::debug!(
190 "[{}] Command {} not applicable to this node",
191 self.node_id,
192 command.command_id
193 );
194 }
195 }
196
197 Ok(())
198 }
199
200 async fn route_command(&self, command: &HierarchicalCommand) -> Result<()> {
202 let resolution = self.router.resolve_target(command);
203
204 if !self.router.should_route(&resolution) {
205 return Ok(());
206 }
207
208 let targets = self.router.get_routing_targets(&resolution);
209
210 tracing::info!(
211 "[{}] Routing command {} to {} nodes",
212 self.node_id,
213 command.command_id,
214 targets.len()
215 );
216
217 let doc_id = self.storage.publish_command(command).await?;
219
220 tracing::debug!(
221 "[{}] Published command {} to storage (doc_id: {})",
222 self.node_id,
223 command.command_id,
224 doc_id
225 );
226
227 for target_id in &targets {
228 tracing::debug!(
229 "[{}] → Routing command {} to {}",
230 self.node_id,
231 command.command_id,
232 target_id
233 );
234 }
235
236 Ok(())
237 }
238
239 async fn execute_command(&self, command: &HierarchicalCommand) -> Result<()> {
241 tracing::info!(
242 "[{}] Executing command: {}",
243 self.node_id,
244 command.command_id
245 );
246
247 let mut status_map = self.command_status.write().await;
249 if let Some(status) = status_map.get_mut(&command.command_id) {
250 status.state = 2; } else {
252 status_map.insert(
253 command.command_id.clone(),
254 CommandStatus {
255 command_id: command.command_id.clone(),
256 state: 2, acknowledgments: Vec::new(),
258 last_updated: Some(peat_schema::common::v1::Timestamp {
259 seconds: std::time::SystemTime::now()
260 .duration_since(std::time::UNIX_EPOCH)
261 .expect("system clock is before Unix epoch")
262 .as_secs(),
263 nanos: 0,
264 }),
265 },
266 );
267 }
268
269 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
272
273 if let Some(status) = status_map.get_mut(&command.command_id) {
275 status.state = 3; }
277
278 tracing::info!(
279 "[{}] ✓ Completed command: {}",
280 self.node_id,
281 command.command_id
282 );
283
284 Ok(())
285 }
286
287 fn requires_acknowledgment(&self, command: &HierarchicalCommand) -> bool {
289 command.acknowledgment_policy > 1
292 }
293
294 async fn send_acknowledgment(&self, command: &HierarchicalCommand, status: i32) -> Result<()> {
296 let ack = CommandAcknowledgment {
297 command_id: command.command_id.clone(),
298 node_id: self.node_id.clone(),
299 status,
300 reason: None,
301 timestamp: Some(peat_schema::common::v1::Timestamp {
302 seconds: std::time::SystemTime::now()
303 .duration_since(std::time::UNIX_EPOCH)
304 .expect("system clock is before Unix epoch")
305 .as_secs(),
306 nanos: 0,
307 }),
308 };
309
310 tracing::debug!(
311 "[{}] Sending ACK for command {} with status {}",
312 self.node_id,
313 command.command_id,
314 status
315 );
316
317 let doc_id = self.storage.publish_acknowledgment(&ack).await?;
319
320 tracing::debug!(
321 "[{}] Published acknowledgment for command {} to storage (doc_id: {})",
322 self.node_id,
323 command.command_id,
324 doc_id
325 );
326
327 self.acknowledgments
329 .write()
330 .await
331 .insert((command.command_id.clone(), self.node_id.clone()), ack);
332
333 let all_received = self
335 .timeout_manager
336 .record_ack(&command.command_id, &self.node_id)
337 .await;
338
339 if all_received {
340 tracing::debug!(
341 "[{}] All acknowledgments received for command {}",
342 self.node_id,
343 command.command_id
344 );
345 self.timeout_manager
347 .unregister_ack_timeout(&command.command_id)
348 .await?;
349 }
350
351 Ok(())
352 }
353
354 pub async fn get_command_status(&self, command_id: &str) -> Option<CommandStatus> {
356 self.command_status.read().await.get(command_id).cloned()
357 }
358
359 pub async fn get_command_acknowledgments(
361 &self,
362 command_id: &str,
363 ) -> Vec<CommandAcknowledgment> {
364 self.acknowledgments
365 .read()
366 .await
367 .iter()
368 .filter(|((cmd_id, _), _)| cmd_id == command_id)
369 .map(|(_, ack)| ack.clone())
370 .collect()
371 }
372
373 pub async fn is_command_acknowledged(&self, command_id: &str) -> bool {
375 let command = match self.active_commands.read().await.get(command_id) {
376 Some(cmd) => cmd.clone(),
377 None => return false,
378 };
379
380 let resolution = self.router.resolve_target(&command);
381 let targets = self.router.get_routing_targets(&resolution);
382
383 if targets.is_empty() {
384 return true;
385 }
386
387 let acks = self.get_command_acknowledgments(command_id).await;
388 let acked_nodes: std::collections::HashSet<String> =
389 acks.iter().map(|a| a.node_id.clone()).collect();
390
391 targets.iter().all(|t| acked_nodes.contains(t))
392 }
393}
394
#[cfg(test)]
mod tests {
    use super::*;
    use crate::command::ObserverHandle;
    use peat_schema::command::v1::{command_target::Scope, CommandTarget};
    use std::future::Future;
    use std::pin::Pin;

    /// Boxed async callback shape accepted by the `observe_*` storage methods.
    type ObserverCallback<T> =
        Box<dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;

    /// Storage double: publishing succeeds with a fixed document id, queries come back
    /// empty, and observers are never invoked.
    struct MockStorage;

    #[async_trait::async_trait]
    impl CommandStorage for MockStorage {
        async fn publish_command(&self, _command: &HierarchicalCommand) -> crate::Result<String> {
            Ok("mock-doc-id".to_string())
        }

        async fn get_command(
            &self,
            _command_id: &str,
        ) -> crate::Result<Option<HierarchicalCommand>> {
            Ok(None)
        }

        async fn query_commands_by_target(
            &self,
            _target_id: &str,
        ) -> crate::Result<Vec<HierarchicalCommand>> {
            Ok(Vec::new())
        }

        async fn delete_command(&self, _command_id: &str) -> crate::Result<()> {
            Ok(())
        }

        async fn publish_acknowledgment(
            &self,
            _ack: &CommandAcknowledgment,
        ) -> crate::Result<String> {
            Ok("mock-ack-id".to_string())
        }

        async fn get_acknowledgments(
            &self,
            _command_id: &str,
        ) -> crate::Result<Vec<CommandAcknowledgment>> {
            Ok(Vec::new())
        }

        async fn update_command_status(&self, _status: &CommandStatus) -> crate::Result<()> {
            Ok(())
        }

        async fn get_command_status(
            &self,
            _command_id: &str,
        ) -> crate::Result<Option<CommandStatus>> {
            Ok(None)
        }

        async fn observe_commands(
            &self,
            _node_id: &str,
            _callback: ObserverCallback<HierarchicalCommand>,
        ) -> crate::Result<ObserverHandle> {
            Ok(ObserverHandle::new(()))
        }

        async fn observe_acknowledgments(
            &self,
            _issuer_id: &str,
            _callback: ObserverCallback<CommandAcknowledgment>,
        ) -> crate::Result<ObserverHandle> {
            Ok(ObserverHandle::new(()))
        }
    }

    /// Coordinator for `node-1` in `squad-alpha` (members `node-1` and `node-2`),
    /// backed by [`MockStorage`].
    fn squad_alpha_coordinator() -> CommandCoordinator {
        CommandCoordinator::new(
            Some("squad-alpha".to_string()),
            "node-1".to_string(),
            vec!["node-1".to_string(), "node-2".to_string()],
            Arc::new(MockStorage),
        )
    }

    /// Priority-5 command aimed at the single node `target_id`; every other field,
    /// including the acknowledgment policy, keeps its default value.
    fn individual_command(
        command_id: &str,
        originator_id: &str,
        target_id: &str,
    ) -> HierarchicalCommand {
        HierarchicalCommand {
            command_id: command_id.to_string(),
            originator_id: originator_id.to_string(),
            target: Some(CommandTarget {
                scope: Scope::Individual as i32,
                target_ids: vec![target_id.to_string()],
            }),
            priority: 5,
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_issue_command() {
        let coordinator = squad_alpha_coordinator();

        let mut command = individual_command("cmd-001", "node-1", "node-2");
        command.acknowledgment_policy = 2;

        coordinator.issue_command(command).await.unwrap();

        // A freshly issued command is stored with state 1.
        let status = coordinator.get_command_status("cmd-001").await;
        assert!(status.is_some());
        assert_eq!(status.unwrap().state, 1);
    }

    #[tokio::test]
    async fn test_receive_and_execute_command() {
        let coordinator = squad_alpha_coordinator();

        let mut command = individual_command("cmd-002", "node-leader", "node-1");
        command.acknowledgment_policy = 4;

        coordinator.receive_command(command).await.unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        // After execution the status has reached state 3.
        let status = coordinator.get_command_status("cmd-002").await;
        assert!(status.is_some());
        assert_eq!(status.unwrap().state, 3);
    }

    #[tokio::test]
    async fn test_acknowledgment_tracking() {
        let coordinator = squad_alpha_coordinator();

        let mut command = individual_command("cmd-003", "node-1", "node-1");
        command.acknowledgment_policy = 2;

        coordinator.receive_command(command).await.unwrap();

        // The node acknowledges a command addressed to itself.
        let acks = coordinator.get_command_acknowledgments("cmd-003").await;
        assert!(!acks.is_empty());
        assert_eq!(acks[0].node_id, "node-1");
    }
}