Skip to main content

peat_protocol/command/
coordinator.rs

//! Command coordination and dissemination logic.
//!
//! Manages the command lifecycle: issuance, routing, acknowledgment, and status tracking.
4
5use crate::command::conflict_resolver::{ConflictResolver, ConflictResult};
6use crate::command::routing::{CommandRouter, TargetResolution};
7use crate::command::timeout_manager::TimeoutManager;
8use crate::command::CommandStorage;
9use crate::Result;
10use peat_schema::command::v1::{
11    AckStatus, CommandAcknowledgment, CommandStatus, ConflictPolicy, HierarchicalCommand,
12};
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::RwLock;
17
18/// Coordinates hierarchical command dissemination
19pub struct CommandCoordinator {
20    /// Node identifier
21    node_id: String,
22
23    /// Router for target resolution
24    router: CommandRouter,
25
26    /// Storage backend for command dissemination
27    storage: Arc<dyn CommandStorage>,
28
29    /// Active commands indexed by command_id (in-memory cache)
30    active_commands: Arc<RwLock<HashMap<String, HierarchicalCommand>>>,
31
32    /// Command acknowledgments indexed by (command_id, node_id) (in-memory cache)
33    acknowledgments: Arc<RwLock<HashMap<(String, String), CommandAcknowledgment>>>,
34
35    /// Command execution status (in-memory cache)
36    command_status: Arc<RwLock<HashMap<String, CommandStatus>>>,
37
38    /// Conflict resolution engine
39    conflict_resolver: Arc<ConflictResolver>,
40
41    /// Timeout management
42    timeout_manager: Arc<TimeoutManager>,
43}
44
45impl CommandCoordinator {
46    /// Create new command coordinator with storage backend
47    pub fn new(
48        squad_id: Option<String>,
49        node_id: String,
50        squad_members: Vec<String>,
51        storage: Arc<dyn CommandStorage>,
52    ) -> Self {
53        let router = CommandRouter::new(node_id.clone(), squad_id, squad_members, None);
54
55        Self {
56            node_id,
57            router,
58            storage,
59            active_commands: Arc::new(RwLock::new(HashMap::new())),
60            acknowledgments: Arc::new(RwLock::new(HashMap::new())),
61            command_status: Arc::new(RwLock::new(HashMap::new())),
62            conflict_resolver: Arc::new(ConflictResolver::new()),
63            timeout_manager: Arc::new(TimeoutManager::new()),
64        }
65    }
66
67    /// Issue a command (originating from this node)
68    pub async fn issue_command(&self, command: HierarchicalCommand) -> Result<()> {
69        tracing::info!(
70            "[{}] Issuing command: {} (priority: {})",
71            self.node_id,
72            command.command_id,
73            command.priority
74        );
75
76        // 1. Check for conflicts
77        let conflict_result = self.conflict_resolver.check_conflict(&command).await;
78        if let ConflictResult::Conflict(existing) = conflict_result {
79            let policy = ConflictPolicy::try_from(command.conflict_policy)
80                .unwrap_or(ConflictPolicy::HighestPriorityWins);
81
82            tracing::debug!(
83                "[{}] Conflict detected for command {}, resolving with policy {:?}",
84                self.node_id,
85                command.command_id,
86                policy
87            );
88
89            let mut all_commands = existing;
90            all_commands.push(command.clone());
91
92            let resolved = self.conflict_resolver.resolve(all_commands, policy)?;
93
94            if resolved.command_id != command.command_id {
95                tracing::warn!(
96                    "[{}] Command {} rejected due to conflict (winner: {})",
97                    self.node_id,
98                    command.command_id,
99                    resolved.command_id
100                );
101                return Err(crate::Error::Internal(
102                    "Command rejected by conflict resolution policy".to_string(),
103                ));
104            }
105        }
106
107        // 2. Register for expiration tracking
108        self.timeout_manager.register_expiration(&command).await?;
109
110        // 3. Register with conflict resolver
111        self.conflict_resolver.register_command(&command).await?;
112
113        // 4. Store in active commands
114        self.active_commands
115            .write()
116            .await
117            .insert(command.command_id.clone(), command.clone());
118
119        // 5. Create initial status
120        let status = CommandStatus {
121            command_id: command.command_id.clone(),
122            state: 1, // PENDING
123            acknowledgments: Vec::new(),
124            last_updated: Some(peat_schema::common::v1::Timestamp {
125                seconds: std::time::SystemTime::now()
126                    .duration_since(std::time::UNIX_EPOCH)
127                    .expect("system clock is before Unix epoch")
128                    .as_secs(),
129                nanos: 0,
130            }),
131        };
132
133        self.command_status
134            .write()
135            .await
136            .insert(command.command_id.clone(), status);
137
138        // 6. Setup acknowledgment timeout if required
139        if self.requires_acknowledgment(&command) {
140            let targets = {
141                let resolution = self.router.resolve_target(&command);
142                self.router.get_routing_targets(&resolution)
143            };
144
145            if !targets.is_empty() {
146                let ack_timeout = Duration::from_secs(30); // TODO: Make configurable
147                self.timeout_manager
148                    .register_ack_timeout(command.command_id.clone(), targets, ack_timeout)
149                    .await?;
150            }
151        }
152
153        // 7. Route command to targets
154        self.route_command(&command).await?;
155
156        Ok(())
157    }
158
159    /// Receive a command (from higher echelon)
160    pub async fn receive_command(&self, command: HierarchicalCommand) -> Result<()> {
161        tracing::info!(
162            "[{}] Received command: {} from {}",
163            self.node_id,
164            command.command_id,
165            command.originator_id
166        );
167
168        // Resolve target
169        let resolution = self.router.resolve_target(&command);
170
171        match resolution {
172            TargetResolution::Self_ => {
173                // Command targets this node - execute it
174                self.execute_command(&command).await?;
175
176                // Send acknowledgment if required
177                if self.requires_acknowledgment(&command) {
178                    self.send_acknowledgment(&command, AckStatus::AckReceived as i32)
179                        .await?;
180                }
181            }
182
183            TargetResolution::Subordinates(_) | TargetResolution::AllSquadMembers(_) => {
184                // Command targets subordinates - route it
185                self.route_command(&command).await?;
186            }
187
188            TargetResolution::NotApplicable => {
189                tracing::debug!(
190                    "[{}] Command {} not applicable to this node",
191                    self.node_id,
192                    command.command_id
193                );
194            }
195        }
196
197        Ok(())
198    }
199
200    /// Route command to subordinate nodes
201    async fn route_command(&self, command: &HierarchicalCommand) -> Result<()> {
202        let resolution = self.router.resolve_target(command);
203
204        if !self.router.should_route(&resolution) {
205            return Ok(());
206        }
207
208        let targets = self.router.get_routing_targets(&resolution);
209
210        tracing::info!(
211            "[{}] Routing command {} to {} nodes",
212            self.node_id,
213            command.command_id,
214            targets.len()
215        );
216
217        // Publish command to storage for dissemination
218        let doc_id = self.storage.publish_command(command).await?;
219
220        tracing::debug!(
221            "[{}] Published command {} to storage (doc_id: {})",
222            self.node_id,
223            command.command_id,
224            doc_id
225        );
226
227        for target_id in &targets {
228            tracing::debug!(
229                "[{}] → Routing command {} to {}",
230                self.node_id,
231                command.command_id,
232                target_id
233            );
234        }
235
236        Ok(())
237    }
238
239    /// Execute a command locally
240    async fn execute_command(&self, command: &HierarchicalCommand) -> Result<()> {
241        tracing::info!(
242            "[{}] Executing command: {}",
243            self.node_id,
244            command.command_id
245        );
246
247        // Update status to EXECUTING
248        let mut status_map = self.command_status.write().await;
249        if let Some(status) = status_map.get_mut(&command.command_id) {
250            status.state = 2; // EXECUTING
251        } else {
252            status_map.insert(
253                command.command_id.clone(),
254                CommandStatus {
255                    command_id: command.command_id.clone(),
256                    state: 2, // EXECUTING
257                    acknowledgments: Vec::new(),
258                    last_updated: Some(peat_schema::common::v1::Timestamp {
259                        seconds: std::time::SystemTime::now()
260                            .duration_since(std::time::UNIX_EPOCH)
261                            .expect("system clock is before Unix epoch")
262                            .as_secs(),
263                        nanos: 0,
264                    }),
265                },
266            );
267        }
268
269        // TODO: Actual command execution logic based on command_type
270        // For now, just mark as completed
271        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
272
273        // Update status to COMPLETED
274        if let Some(status) = status_map.get_mut(&command.command_id) {
275            status.state = 3; // COMPLETED
276        }
277
278        tracing::info!(
279            "[{}] ✓ Completed command: {}",
280            self.node_id,
281            command.command_id
282        );
283
284        Ok(())
285    }
286
287    /// Check if command requires acknowledgment
288    fn requires_acknowledgment(&self, command: &HierarchicalCommand) -> bool {
289        // Check acknowledgment_policy
290        // 0 = UNSPECIFIED, 1 = NONE, 2 = RECEIVED_ONLY, 3 = COMPLETED_ONLY, 4 = BOTH
291        command.acknowledgment_policy > 1
292    }
293
294    /// Send acknowledgment for a command
295    async fn send_acknowledgment(&self, command: &HierarchicalCommand, status: i32) -> Result<()> {
296        let ack = CommandAcknowledgment {
297            command_id: command.command_id.clone(),
298            node_id: self.node_id.clone(),
299            status,
300            reason: None,
301            timestamp: Some(peat_schema::common::v1::Timestamp {
302                seconds: std::time::SystemTime::now()
303                    .duration_since(std::time::UNIX_EPOCH)
304                    .expect("system clock is before Unix epoch")
305                    .as_secs(),
306                nanos: 0,
307            }),
308        };
309
310        tracing::debug!(
311            "[{}] Sending ACK for command {} with status {}",
312            self.node_id,
313            command.command_id,
314            status
315        );
316
317        // Publish acknowledgment to storage
318        let doc_id = self.storage.publish_acknowledgment(&ack).await?;
319
320        tracing::debug!(
321            "[{}] Published acknowledgment for command {} to storage (doc_id: {})",
322            self.node_id,
323            command.command_id,
324            doc_id
325        );
326
327        // Store acknowledgment in local cache
328        self.acknowledgments
329            .write()
330            .await
331            .insert((command.command_id.clone(), self.node_id.clone()), ack);
332
333        // Record acknowledgment in timeout manager
334        let all_received = self
335            .timeout_manager
336            .record_ack(&command.command_id, &self.node_id)
337            .await;
338
339        if all_received {
340            tracing::debug!(
341                "[{}] All acknowledgments received for command {}",
342                self.node_id,
343                command.command_id
344            );
345            // Clean up timeout tracking
346            self.timeout_manager
347                .unregister_ack_timeout(&command.command_id)
348                .await?;
349        }
350
351        Ok(())
352    }
353
354    /// Get command status
355    pub async fn get_command_status(&self, command_id: &str) -> Option<CommandStatus> {
356        self.command_status.read().await.get(command_id).cloned()
357    }
358
359    /// Get all acknowledgments for a command
360    pub async fn get_command_acknowledgments(
361        &self,
362        command_id: &str,
363    ) -> Vec<CommandAcknowledgment> {
364        self.acknowledgments
365            .read()
366            .await
367            .iter()
368            .filter(|((cmd_id, _), _)| cmd_id == command_id)
369            .map(|(_, ack)| ack.clone())
370            .collect()
371    }
372
373    /// Check if command has been acknowledged by all targets
374    pub async fn is_command_acknowledged(&self, command_id: &str) -> bool {
375        let command = match self.active_commands.read().await.get(command_id) {
376            Some(cmd) => cmd.clone(),
377            None => return false,
378        };
379
380        let resolution = self.router.resolve_target(&command);
381        let targets = self.router.get_routing_targets(&resolution);
382
383        if targets.is_empty() {
384            return true;
385        }
386
387        let acks = self.get_command_acknowledgments(command_id).await;
388        let acked_nodes: std::collections::HashSet<String> =
389            acks.iter().map(|a| a.node_id.clone()).collect();
390
391        targets.iter().all(|t| acked_nodes.contains(t))
392    }
393}
394
#[cfg(test)]
mod tests {
    use super::*;
    use crate::command::ObserverHandle;
    use peat_schema::command::v1::{command_target::Scope, CommandTarget};

    // Mock storage for unit tests: every publish succeeds and every query is empty.
    struct MockStorage;

    #[async_trait::async_trait]
    impl CommandStorage for MockStorage {
        async fn publish_command(&self, _command: &HierarchicalCommand) -> crate::Result<String> {
            Ok("mock-doc-id".to_string())
        }

        async fn get_command(
            &self,
            _command_id: &str,
        ) -> crate::Result<Option<HierarchicalCommand>> {
            Ok(None)
        }

        async fn query_commands_by_target(
            &self,
            _target_id: &str,
        ) -> crate::Result<Vec<HierarchicalCommand>> {
            Ok(Vec::new())
        }

        async fn delete_command(&self, _command_id: &str) -> crate::Result<()> {
            Ok(())
        }

        async fn publish_acknowledgment(
            &self,
            _ack: &CommandAcknowledgment,
        ) -> crate::Result<String> {
            Ok("mock-ack-id".to_string())
        }

        async fn get_acknowledgments(
            &self,
            _command_id: &str,
        ) -> crate::Result<Vec<CommandAcknowledgment>> {
            Ok(Vec::new())
        }

        async fn update_command_status(&self, _status: &CommandStatus) -> crate::Result<()> {
            Ok(())
        }

        async fn get_command_status(
            &self,
            _command_id: &str,
        ) -> crate::Result<Option<CommandStatus>> {
            Ok(None)
        }

        async fn observe_commands(
            &self,
            _node_id: &str,
            _callback: Box<
                dyn Fn(
                        HierarchicalCommand,
                    )
                        -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
                    + Send
                    + Sync,
            >,
        ) -> crate::Result<ObserverHandle> {
            Ok(ObserverHandle::new(()))
        }

        async fn observe_acknowledgments(
            &self,
            _issuer_id: &str,
            _callback: Box<
                dyn Fn(
                        CommandAcknowledgment,
                    )
                        -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
                    + Send
                    + Sync,
            >,
        ) -> crate::Result<ObserverHandle> {
            Ok(ObserverHandle::new(()))
        }
    }

    /// Coordinator for "node-1" in squad "squad-alpha" with members node-1 and node-2.
    fn squad_alpha_node_1() -> CommandCoordinator {
        CommandCoordinator::new(
            Some("squad-alpha".to_string()),
            "node-1".to_string(),
            vec!["node-1".to_string(), "node-2".to_string()],
            Arc::new(MockStorage),
        )
    }

    #[tokio::test]
    async fn test_issue_command() {
        let coordinator = squad_alpha_node_1();

        let command = HierarchicalCommand {
            command_id: "cmd-001".to_string(),
            originator_id: "node-1".to_string(),
            target: Some(CommandTarget {
                scope: Scope::Individual as i32,
                target_ids: vec!["node-2".to_string()],
            }),
            priority: 5,
            acknowledgment_policy: 2, // RECEIVED_ONLY
            ..Default::default()
        };

        coordinator.issue_command(command.clone()).await.unwrap();

        let status = coordinator.get_command_status("cmd-001").await;
        assert!(status.is_some());
        assert_eq!(status.unwrap().state, 1); // PENDING
    }

    #[tokio::test]
    async fn test_receive_and_execute_command() {
        let coordinator = squad_alpha_node_1();

        let command = HierarchicalCommand {
            command_id: "cmd-002".to_string(),
            originator_id: "node-leader".to_string(),
            target: Some(CommandTarget {
                scope: Scope::Individual as i32,
                target_ids: vec!["node-1".to_string()],
            }),
            priority: 5,
            acknowledgment_policy: 4, // BOTH
            ..Default::default()
        };

        // receive_command awaits local execution to completion, so no extra
        // wait is needed before inspecting the status.
        coordinator.receive_command(command).await.unwrap();

        let status = coordinator.get_command_status("cmd-002").await;
        assert!(status.is_some());
        let status = status.unwrap();
        assert_eq!(status.state, 3); // COMPLETED
        assert!(status.last_updated.is_some());
    }

    #[tokio::test]
    async fn test_acknowledgment_tracking() {
        let coordinator = squad_alpha_node_1();

        let command = HierarchicalCommand {
            command_id: "cmd-003".to_string(),
            originator_id: "node-1".to_string(),
            target: Some(CommandTarget {
                scope: Scope::Individual as i32,
                target_ids: vec!["node-1".to_string()],
            }),
            priority: 5,
            acknowledgment_policy: 2, // RECEIVED_ONLY
            ..Default::default()
        };

        coordinator.receive_command(command).await.unwrap();

        let acks = coordinator.get_command_acknowledgments("cmd-003").await;
        assert_eq!(acks.len(), 1);
        assert_eq!(acks[0].node_id, "node-1");
    }

    #[tokio::test]
    async fn test_unknown_command_queries() {
        let coordinator = squad_alpha_node_1();

        assert!(coordinator.get_command_status("missing").await.is_none());
        assert!(coordinator
            .get_command_acknowledgments("missing")
            .await
            .is_empty());
        assert!(!coordinator.is_command_acknowledged("missing").await);
    }
}