Skip to main content

peat_protocol/discovery/
coordinator.rs

//! Discovery Coordinator for Phase 1
//!
//! Orchestrates the bootstrap (discovery) phase in which nodes discover each other and
//! form their initial squads (cells).
//!
//! # Architecture
//!
//! The DiscoveryCoordinator manages:
//! - Phase state transitions (Discovery → Cell, a.k.a. the squad phase)
//! - Discovery timeout management (default 60s)
//! - Tracking unassigned platforms
//! - Discovery metrics collection
//! - Re-bootstrap on failure
//!
//! ## Discovery Strategies
//!
//! Three strategies are supported:
//! 1. **Geographic Self-Organization** (E3.1) - Platforms form cells based on proximity
//! 2. **C2-Directed Assignment** (E3.2) - C2 explicitly assigns nodes to squads
//! 3. **Capability-Based Queries** (E3.3) - Platforms query and form cells by capabilities
//!
//! ## State Machine
//!
//! ```text
//! NotStarted
//!   │ start_bootstrap
//!   ▼
//! InProgress
//!   ├─ all platforms assigned      → Completed
//!   ├─ timeout, > 90% assigned     → Completed
//!   ├─ timeout, > 50% assigned     → PartiallyCompleted
//!   ├─ timeout, ≤ 50% assigned     → Failed (reset_for_retry, then start again)
//!   └─ force_complete              → Completed
//!
//! Whenever the status is not InProgress, transition_to_squad_phase moves the
//! phase from Discovery to Cell.
//! ```
30
31use crate::storage::CellStore;
32use crate::traits::Phase;
33use crate::{Error, Result};
34use serde::{Deserialize, Serialize};
35use std::collections::{HashMap, HashSet};
36use std::time::{Duration, Instant};
37use tracing::{debug, info, instrument, warn};
38
39/// Default bootstrap timeout (60 seconds)
40pub const DEFAULT_BOOTSTRAP_TIMEOUT_SECS: u64 = 60;
41
42/// Discovery strategy selection
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
44pub enum BootstrapStrategy {
45    /// Geographic proximity-based cell formation
46    Geographic,
47    /// C2-directed squad assignment
48    Directed,
49    /// Capability-based query and formation
50    CapabilityBased,
51}
52
53impl std::fmt::Display for BootstrapStrategy {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        match self {
56            BootstrapStrategy::Geographic => write!(f, "geographic"),
57            BootstrapStrategy::Directed => write!(f, "directed"),
58            BootstrapStrategy::CapabilityBased => write!(f, "capability_based"),
59        }
60    }
61}
62
63/// Discovery phase status
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
65pub enum BootstrapStatus {
66    /// Discovery phase not started
67    NotStarted,
68    /// Discovery phase in progress
69    InProgress,
70    /// Discovery completed successfully
71    Completed,
72    /// Discovery failed (timeout with no assignment)
73    Failed,
74    /// Discovery timed out but partially completed
75    PartiallyCompleted,
76}
77
78/// Discovery metrics for analysis
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct DiscoveryMetrics {
81    /// Total nodes participating
82    pub total_platforms: usize,
83    /// Platforms successfully assigned to squads
84    pub assigned_platforms: usize,
85    /// Platforms still unassigned
86    pub unassigned_platforms: usize,
87    /// Number of cells formed
88    pub squads_formed: usize,
89    /// Time elapsed since bootstrap start (seconds)
90    pub elapsed_seconds: f64,
91    /// Discovery strategy used
92    pub strategy: BootstrapStrategy,
93    /// Final status
94    pub status: BootstrapStatus,
95    /// Total messages sent during bootstrap (if tracked)
96    pub messages_sent: Option<usize>,
97}
98
99impl DiscoveryMetrics {
100    /// Calculate assignment rate (0.0 - 1.0)
101    pub fn assignment_rate(&self) -> f32 {
102        if self.total_platforms == 0 {
103            return 0.0;
104        }
105        self.assigned_platforms as f32 / self.total_platforms as f32
106    }
107
108    /// Calculate average squad size
109    pub fn avg_squad_size(&self) -> f32 {
110        if self.squads_formed == 0 {
111            return 0.0;
112        }
113        self.assigned_platforms as f32 / self.squads_formed as f32
114    }
115
116    /// Check if bootstrap was successful (>90% assigned)
117    pub fn is_successful(&self) -> bool {
118        self.assignment_rate() > 0.9 && self.status == BootstrapStatus::Completed
119    }
120}
121
122/// Discovery Coordinator
123///
124/// Manages the bootstrap phase lifecycle for a platform or simulation.
125pub struct DiscoveryCoordinator<B: crate::sync::DataSyncBackend> {
126    /// Cell storage
127    store: CellStore<B>,
128    /// Current phase
129    current_phase: Phase,
130    /// Discovery strategy
131    strategy: BootstrapStrategy,
132    /// Discovery timeout duration
133    timeout: Duration,
134    /// Discovery start time
135    start_time: Option<Instant>,
136    /// Discovery status
137    status: BootstrapStatus,
138    /// Tracked platform IDs
139    tracked_platforms: HashSet<String>,
140    /// Node to squad assignments
141    assignments: HashMap<String, String>,
142    /// Message count (optional tracking)
143    message_count: usize,
144}
145
146impl<B: crate::sync::DataSyncBackend> DiscoveryCoordinator<B> {
147    /// Create a new bootstrap coordinator
148    pub fn new(store: CellStore<B>, strategy: BootstrapStrategy) -> Self {
149        Self {
150            store,
151            current_phase: Phase::Discovery,
152            strategy,
153            timeout: Duration::from_secs(DEFAULT_BOOTSTRAP_TIMEOUT_SECS),
154            start_time: None,
155            status: BootstrapStatus::NotStarted,
156            tracked_platforms: HashSet::new(),
157            assignments: HashMap::new(),
158            message_count: 0,
159        }
160    }
161
162    /// Set custom bootstrap timeout
163    pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
164        self.timeout = Duration::from_secs(timeout_secs);
165        self
166    }
167
168    /// Get current phase
169    pub fn phase(&self) -> Phase {
170        self.current_phase
171    }
172
173    /// Get bootstrap status
174    pub fn status(&self) -> BootstrapStatus {
175        self.status
176    }
177
178    /// Start the bootstrap phase
179    #[instrument(skip(self))]
180    pub fn start_bootstrap(&mut self, platform_ids: Vec<String>) -> Result<()> {
181        if self.status != BootstrapStatus::NotStarted {
182            return Err(Error::InvalidTransition {
183                from: format!("{:?}", self.status),
184                to: "InProgress".to_string(),
185                reason: "Discovery already started".to_string(),
186            });
187        }
188
189        info!(
190            "Starting bootstrap with {} nodes using {} strategy",
191            platform_ids.len(),
192            self.strategy
193        );
194
195        self.tracked_platforms = platform_ids.into_iter().collect();
196        self.start_time = Some(Instant::now());
197        self.status = BootstrapStatus::InProgress;
198
199        Ok(())
200    }
201
202    /// Register a platform assignment to a squad
203    #[instrument(skip(self))]
204    pub fn register_assignment(&mut self, platform_id: String, squad_id: String) -> Result<()> {
205        if self.status != BootstrapStatus::InProgress {
206            return Err(Error::InvalidTransition {
207                from: format!("{:?}", self.status),
208                to: "Assignment".to_string(),
209                reason: "Discovery not in progress".to_string(),
210            });
211        }
212
213        if !self.tracked_platforms.contains(&platform_id) {
214            warn!("Attempted to assign unknown platform: {}", platform_id);
215            return Ok(());
216        }
217
218        debug!(
219            "Registering assignment: {} → squad {}",
220            platform_id, squad_id
221        );
222
223        self.assignments.insert(platform_id, squad_id);
224        Ok(())
225    }
226
227    /// Increment message count (for metrics)
228    pub fn increment_messages(&mut self, count: usize) {
229        self.message_count += count;
230    }
231
232    /// Check if bootstrap has timed out
233    pub fn has_timed_out(&self) -> bool {
234        if let Some(start_time) = self.start_time {
235            start_time.elapsed() >= self.timeout
236        } else {
237            false
238        }
239    }
240
241    /// Get unassigned platform IDs
242    pub fn unassigned_platforms(&self) -> Vec<String> {
243        self.tracked_platforms
244            .iter()
245            .filter(|id| !self.assignments.contains_key(*id))
246            .cloned()
247            .collect()
248    }
249
250    /// Get assigned platform IDs
251    pub fn assigned_platforms(&self) -> Vec<String> {
252        self.assignments.keys().cloned().collect()
253    }
254
255    /// Get number of unique cells formed
256    pub async fn squads_formed(&self) -> Result<usize> {
257        let cells = self.store.get_valid_cells().await?;
258        Ok(cells.len())
259    }
260
261    /// Check if bootstrap is complete
262    ///
263    /// Discovery is complete when:
264    /// - Timeout has been reached, OR
265    /// - All nodes have been assigned (100% completion)
266    #[instrument(skip(self))]
267    pub async fn check_completion(&mut self) -> Result<bool> {
268        if self.status != BootstrapStatus::InProgress {
269            return Ok(false);
270        }
271
272        let all_assigned = self.unassigned_platforms().is_empty();
273        let timed_out = self.has_timed_out();
274
275        if all_assigned {
276            info!("Discovery completed: all nodes assigned");
277            self.status = BootstrapStatus::Completed;
278            return Ok(true);
279        }
280
281        if timed_out {
282            let assignment_rate =
283                self.assignments.len() as f32 / self.tracked_platforms.len() as f32;
284
285            if assignment_rate > 0.9 {
286                info!(
287                    "Discovery timed out but mostly successful ({:.1}% assigned)",
288                    assignment_rate * 100.0
289                );
290                self.status = BootstrapStatus::Completed;
291            } else if assignment_rate > 0.5 {
292                warn!(
293                    "Discovery timed out with partial completion ({:.1}% assigned)",
294                    assignment_rate * 100.0
295                );
296                self.status = BootstrapStatus::PartiallyCompleted;
297            } else {
298                warn!(
299                    "Discovery failed: timeout with only {:.1}% assigned",
300                    assignment_rate * 100.0
301                );
302                self.status = BootstrapStatus::Failed;
303            }
304            return Ok(true);
305        }
306
307        Ok(false)
308    }
309
310    /// Transition to Cell phase
311    ///
312    /// Should be called after bootstrap completes successfully or times out.
313    #[instrument(skip(self))]
314    pub async fn transition_to_squad_phase(&mut self) -> Result<()> {
315        if self.current_phase != Phase::Discovery {
316            return Err(Error::InvalidTransition {
317                from: format!("{:?}", self.current_phase),
318                to: "Squad".to_string(),
319                reason: "Not in Discovery phase".to_string(),
320            });
321        }
322
323        if self.status == BootstrapStatus::InProgress {
324            return Err(Error::InvalidTransition {
325                from: format!("{:?}", self.status),
326                to: "Squad".to_string(),
327                reason: "Discovery still in progress".to_string(),
328            });
329        }
330
331        info!("Transitioning from Discovery to Cell phase");
332        self.current_phase = Phase::Cell;
333
334        Ok(())
335    }
336
337    /// Reset bootstrap state for retry
338    #[instrument(skip(self))]
339    pub fn reset_for_retry(&mut self) -> Result<()> {
340        if self.status == BootstrapStatus::InProgress {
341            return Err(Error::InvalidTransition {
342                from: "InProgress".to_string(),
343                to: "Reset".to_string(),
344                reason: "Cannot reset while bootstrap is in progress".to_string(),
345            });
346        }
347
348        info!("Resetting bootstrap coordinator for retry");
349
350        self.status = BootstrapStatus::NotStarted;
351        self.start_time = None;
352        self.assignments.clear();
353        self.message_count = 0;
354        // Keep tracked_platforms for retry
355
356        Ok(())
357    }
358
359    /// Get current bootstrap metrics
360    #[instrument(skip(self))]
361    pub async fn get_metrics(&self) -> Result<DiscoveryMetrics> {
362        let elapsed = if let Some(start_time) = self.start_time {
363            start_time.elapsed().as_secs_f64()
364        } else {
365            0.0
366        };
367
368        let squads_formed = self.squads_formed().await?;
369
370        Ok(DiscoveryMetrics {
371            total_platforms: self.tracked_platforms.len(),
372            assigned_platforms: self.assignments.len(),
373            unassigned_platforms: self.unassigned_platforms().len(),
374            squads_formed,
375            elapsed_seconds: elapsed,
376            strategy: self.strategy,
377            status: self.status,
378            messages_sent: Some(self.message_count),
379        })
380    }
381
382    /// Force complete bootstrap (for testing or emergency)
383    #[instrument(skip(self))]
384    pub fn force_complete(&mut self) -> Result<()> {
385        if self.status != BootstrapStatus::InProgress {
386            return Err(Error::InvalidTransition {
387                from: format!("{:?}", self.status),
388                to: "Completed".to_string(),
389                reason: "Discovery not in progress".to_string(),
390            });
391        }
392
393        warn!("Force completing bootstrap");
394        self.status = BootstrapStatus::Completed;
395        Ok(())
396    }
397}