Skip to main content

peat_protocol/discovery/
directed.rs

1//! C2-Directed Assignment for bootstrap phase
2//!
3//! Implements Command & Control (C2) directed cell formation where C2 explicitly
4//! assigns nodes to cells based on operational requirements.
5//!
6//! # Architecture
7//!
8//! Unlike autonomous geographic self-organization (E3.1), C2-directed assignment
9//! provides top-down cell formation with explicit authority and validation:
10//!
11//! ## Assignment Flow
12//!
13//! 1. **C2 Issues Assignment**: C2 broadcasts `CellAssignment` messages
14//! 2. **Node Receives**: Platforms observe assignments via the sync backend
//! 3. **Validation**: Node validates assignment (not expired, cell exists, cell not full,
//!    node not already in a different cell; issuer authorization is not yet checked)
16//! 4. **Execution**: Node joins squad and updates state
17//! 5. **Confirmation**: Assignment status tracked in distributed state
18//!
19//! ## Message Format
20//!
//! ```json
//! {
//!   "assignment_id": "assign_123",
//!   "squad_id": "squad_alpha",
//!   "platform_ids": ["node_1", "node_2", "node_3"],
//!   "issued_by": "c2_controller_1",
//!   "timestamp": 1698765432,
//!   "priority": "High",
//!   "status": "Pending",
//!   "context": null
//! }
//! ```
31//!
32//! ## Use Cases
33//!
34//! - **Pre-planned missions**: Assign nodes based on pre-mission planning
35//! - **Capability requirements**: Form cells with specific capability mixes
36//! - **Command override**: Override autonomous formation when needed
37//! - **Emergency reconstitution**: Rebuild cells after casualties/failures
38
39use crate::models::CellStateExt;
40use crate::storage::CellStore;
41use crate::{Error, Result};
42use serde::{Deserialize, Serialize};
43use std::collections::HashMap;
44use tracing::{debug, info, instrument, warn};
45
46/// Assignment priority levels
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
48pub enum AssignmentPriority {
49    /// Low priority - can be deferred
50    Low,
51    /// Normal priority - standard assignment
52    #[default]
53    Normal,
54    /// High priority - process immediately
55    High,
56    /// Critical priority - override existing assignments
57    Critical,
58}
59
60/// Assignment status tracking
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub enum AssignmentStatus {
63    /// Assignment issued but not yet processed
64    Pending,
65    /// Assignment accepted and in progress
66    InProgress,
67    /// Assignment completed successfully
68    Completed,
69    /// Assignment failed validation or execution
70    Failed { reason: String },
71}
72
73/// Cell assignment message from C2
74///
75/// This message is broadcast via the sync backend and contains explicit platform-to-squad
76/// assignments. Platforms observe these messages and execute them if valid.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct CellAssignment {
79    /// Unique identifier for this assignment
80    pub assignment_id: String,
81    /// Target squad ID
82    pub squad_id: String,
83    /// List of platform IDs to assign to this squad
84    pub platform_ids: Vec<String>,
85    /// C2 authority issuing the assignment
86    pub issued_by: String,
87    /// Unix timestamp when assignment was issued
88    pub timestamp: u64,
89    /// Assignment priority
90    pub priority: AssignmentPriority,
91    /// Current status of the assignment
92    pub status: AssignmentStatus,
93    /// Optional operational context or reason
94    pub context: Option<String>,
95}
96
97impl CellAssignment {
98    /// Create a new squad assignment
99    pub fn new(
100        assignment_id: String,
101        squad_id: String,
102        platform_ids: Vec<String>,
103        issued_by: String,
104        priority: AssignmentPriority,
105    ) -> Self {
106        let timestamp = std::time::SystemTime::now()
107            .duration_since(std::time::UNIX_EPOCH)
108            .unwrap()
109            .as_secs();
110
111        Self {
112            assignment_id,
113            squad_id,
114            platform_ids,
115            issued_by,
116            timestamp,
117            priority,
118            status: AssignmentStatus::Pending,
119            context: None,
120        }
121    }
122
123    /// Add operational context to the assignment
124    pub fn with_context(mut self, context: String) -> Self {
125        self.context = Some(context);
126        self
127    }
128
129    /// Check if assignment includes a specific platform
130    pub fn includes_platform(&self, platform_id: &str) -> bool {
131        self.platform_ids.iter().any(|id| id == platform_id)
132    }
133
134    /// Mark assignment as in progress
135    pub fn mark_in_progress(&mut self) {
136        self.status = AssignmentStatus::InProgress;
137    }
138
139    /// Mark assignment as completed
140    pub fn mark_completed(&mut self) {
141        self.status = AssignmentStatus::Completed;
142    }
143
144    /// Mark assignment as failed
145    pub fn mark_failed(&mut self, reason: String) {
146        self.status = AssignmentStatus::Failed { reason };
147    }
148
149    /// Check if assignment is still active (pending or in progress)
150    pub fn is_active(&self) -> bool {
151        matches!(
152            self.status,
153            AssignmentStatus::Pending | AssignmentStatus::InProgress
154        )
155    }
156}
157
/// Outcome of checking whether an assignment can be carried out.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationResult {
    /// The assignment passed every check and may be executed.
    Valid,
    /// The target squad could not be found.
    SquadNotFound,
    /// The target squad has no room for additional members.
    SquadFull,
    /// The node already belongs to a different squad.
    PlatformAlreadyAssigned {
        /// ID of the squad the node currently belongs to.
        current_squad: String,
    },
    /// The assignment came from a source that is not authorized to issue it.
    Unauthorized,
    /// The assignment is too old to act on.
    Expired,
    /// Any other reason the assignment was rejected.
    Invalid {
        /// Human-readable explanation of the rejection.
        reason: String,
    },
}
176
177/// C2-Directed Assignment Manager
178///
179/// Processes C2-issued squad assignments and manages assignment lifecycle.
180pub struct DirectedAssignmentManager<B: crate::sync::DataSyncBackend> {
181    /// Cell storage
182    store: CellStore<B>,
183    /// Active assignments being tracked
184    assignments: HashMap<String, CellAssignment>,
185    /// Node ID of this node
186    my_platform_id: String,
187    /// Assignment timeout (seconds)
188    assignment_timeout: u64,
189}
190
191impl<B: crate::sync::DataSyncBackend> DirectedAssignmentManager<B> {
192    /// Create a new directed assignment manager
193    pub fn new(store: CellStore<B>, my_platform_id: String) -> Self {
194        Self {
195            store,
196            assignments: HashMap::new(),
197            my_platform_id,
198            assignment_timeout: 300, // 5 minutes default
199        }
200    }
201
202    /// Set assignment timeout
203    pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
204        self.assignment_timeout = timeout_secs;
205        self
206    }
207
208    /// Process a received squad assignment
209    #[instrument(skip(self, assignment))]
210    pub async fn process_assignment(&mut self, assignment: CellAssignment) -> Result<()> {
211        info!(
212            "Processing assignment {} for squad {}",
213            assignment.assignment_id, assignment.squad_id
214        );
215
216        // Check if this assignment applies to us
217        if !assignment.includes_platform(&self.my_platform_id) {
218            debug!(
219                "Assignment {} does not include platform {}",
220                assignment.assignment_id, self.my_platform_id
221            );
222            return Ok(());
223        }
224
225        // Validate the assignment
226        let validation = self.validate_assignment(&assignment).await?;
227        if validation != ValidationResult::Valid {
228            warn!(
229                "Assignment {} failed validation: {:?}",
230                assignment.assignment_id, validation
231            );
232            return Err(Error::InvalidTransition {
233                from: "Pending assignment".to_string(),
234                to: "Executed assignment".to_string(),
235                reason: format!("Assignment validation failed: {:?}", validation),
236            });
237        }
238
239        // Store assignment
240        self.assignments
241            .insert(assignment.assignment_id.clone(), assignment.clone());
242
243        // Execute the assignment
244        self.execute_assignment(assignment).await?;
245
246        Ok(())
247    }
248
249    /// Validate a squad assignment
250    #[instrument(skip(self, assignment))]
251    async fn validate_assignment(&self, assignment: &CellAssignment) -> Result<ValidationResult> {
252        debug!("Validating assignment {}", assignment.assignment_id);
253
254        // Check if assignment has expired
255        let current_time = std::time::SystemTime::now()
256            .duration_since(std::time::UNIX_EPOCH)
257            .unwrap()
258            .as_secs();
259
260        if current_time.saturating_sub(assignment.timestamp) > self.assignment_timeout {
261            return Ok(ValidationResult::Expired);
262        }
263
264        // Check if squad exists
265        let squad = self.store.get_cell(&assignment.squad_id).await?;
266        if squad.is_none() {
267            return Ok(ValidationResult::SquadNotFound);
268        }
269
270        let squad = squad.unwrap();
271
272        // Check if squad can accept new members
273        if squad.is_full() {
274            return Ok(ValidationResult::SquadFull);
275        }
276
277        // Check if platform is already in a squad
278        if let Some(current_squad) = self.get_current_squad(&self.my_platform_id).await? {
279            if current_squad != assignment.squad_id {
280                return Ok(ValidationResult::PlatformAlreadyAssigned {
281                    current_squad: current_squad.clone(),
282                });
283            }
284        }
285
286        Ok(ValidationResult::Valid)
287    }
288
289    /// Execute a validated assignment
290    #[instrument(skip(self, assignment))]
291    async fn execute_assignment(&mut self, mut assignment: CellAssignment) -> Result<()> {
292        info!(
293            "Executing assignment {} - joining squad {}",
294            assignment.assignment_id, assignment.squad_id
295        );
296
297        assignment.mark_in_progress();
298
299        // Add platform to squad
300        self.store
301            .add_member(&assignment.squad_id, self.my_platform_id.clone())
302            .await?;
303
304        assignment.mark_completed();
305        self.assignments
306            .insert(assignment.assignment_id.clone(), assignment.clone());
307
308        info!(
309            "Assignment {} completed successfully",
310            assignment.assignment_id
311        );
312
313        Ok(())
314    }
315
316    /// Get the current squad for a platform
317    async fn get_current_squad(&self, platform_id: &str) -> Result<Option<String>> {
318        let valid_squads = self.store.get_valid_cells().await?;
319
320        for squad in valid_squads {
321            if squad.is_member(platform_id) {
322                return Ok(squad.config.as_ref().map(|c| c.id.clone()));
323            }
324        }
325
326        Ok(None)
327    }
328
329    /// Get assignment by ID
330    pub fn get_assignment(&self, assignment_id: &str) -> Option<&CellAssignment> {
331        self.assignments.get(assignment_id)
332    }
333
334    /// Get all active assignments
335    pub fn active_assignments(&self) -> Vec<&CellAssignment> {
336        self.assignments
337            .values()
338            .filter(|a| a.is_active())
339            .collect()
340    }
341
342    /// Clean up expired assignments
343    pub fn cleanup_expired(&mut self) {
344        let current_time = std::time::SystemTime::now()
345            .duration_since(std::time::UNIX_EPOCH)
346            .unwrap()
347            .as_secs();
348
349        self.assignments.retain(|_, assignment| {
350            current_time.saturating_sub(assignment.timestamp) <= self.assignment_timeout
351        });
352    }
353}