1use crate::storage::CellStore;
32use crate::traits::Phase;
33use crate::{Error, Result};
34use serde::{Deserialize, Serialize};
35use std::collections::{HashMap, HashSet};
36use std::time::{Duration, Instant};
37use tracing::{debug, info, instrument, warn};
38
/// Default bootstrap timeout, in seconds.
///
/// `DiscoveryCoordinator::new` uses this value unless it is overridden with
/// `DiscoveryCoordinator::with_timeout`.
pub const DEFAULT_BOOTSTRAP_TIMEOUT_SECS: u64 = 60;
41
/// Strategy selected for a bootstrap run.
///
/// Within this file the coordinator only stores the value, logs it when
/// bootstrap starts, and reports it in `DiscoveryMetrics`.
// NOTE(review): the behavior behind each strategy is implemented elsewhere;
// the variant descriptions below only state what this file shows.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BootstrapStrategy {
    /// Displayed as `"geographic"`.
    Geographic,
    /// Displayed as `"directed"`.
    Directed,
    /// Displayed as `"capability_based"`.
    CapabilityBased,
}
52
53impl std::fmt::Display for BootstrapStrategy {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 match self {
56 BootstrapStrategy::Geographic => write!(f, "geographic"),
57 BootstrapStrategy::Directed => write!(f, "directed"),
58 BootstrapStrategy::CapabilityBased => write!(f, "capability_based"),
59 }
60 }
61}
62
/// Lifecycle state of a bootstrap run tracked by `DiscoveryCoordinator`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BootstrapStatus {
    /// Initial state; also restored by `reset_for_retry`.
    NotStarted,
    /// Set by `start_bootstrap`; assignments are only accepted in this state.
    InProgress,
    /// Set when every tracked platform is assigned, when the timeout elapses
    /// with more than 90% assigned, or by `force_complete`.
    Completed,
    /// Set when the timeout elapses with at most 50% of platforms assigned.
    Failed,
    /// Set when the timeout elapses with more than 50% but at most 90% of
    /// platforms assigned.
    PartiallyCompleted,
}
77
/// Snapshot of bootstrap progress, as produced by
/// `DiscoveryCoordinator::get_metrics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscoveryMetrics {
    /// Number of platforms tracked for this bootstrap run.
    pub total_platforms: usize,
    /// Number of platforms with a registered squad assignment.
    pub assigned_platforms: usize,
    /// Number of tracked platforms without an assignment.
    pub unassigned_platforms: usize,
    /// Number of valid cells reported by the cell store.
    pub squads_formed: usize,
    /// Seconds elapsed since bootstrap started (`0.0` if it never started).
    pub elapsed_seconds: f64,
    /// Strategy the coordinator was configured with.
    pub strategy: BootstrapStrategy,
    /// Bootstrap status at the time the snapshot was taken.
    pub status: BootstrapStatus,
    /// Message count recorded by the coordinator, when available.
    pub messages_sent: Option<usize>,
}
98
99impl DiscoveryMetrics {
100 pub fn assignment_rate(&self) -> f32 {
102 if self.total_platforms == 0 {
103 return 0.0;
104 }
105 self.assigned_platforms as f32 / self.total_platforms as f32
106 }
107
108 pub fn avg_squad_size(&self) -> f32 {
110 if self.squads_formed == 0 {
111 return 0.0;
112 }
113 self.assigned_platforms as f32 / self.squads_formed as f32
114 }
115
116 pub fn is_successful(&self) -> bool {
118 self.assignment_rate() > 0.9 && self.status == BootstrapStatus::Completed
119 }
120}
121
/// Tracks one bootstrap run: which platforms take part, which squad each was
/// assigned to, and whether the run has completed, failed, or timed out.
pub struct DiscoveryCoordinator<B: crate::sync::DataSyncBackend> {
    /// Cell store queried for the number of valid cells (squads formed).
    store: CellStore<B>,
    /// Current phase; starts at `Phase::Discovery`.
    current_phase: Phase,
    /// Strategy this coordinator was created with.
    strategy: BootstrapStrategy,
    /// Maximum time allowed for bootstrap before it is considered timed out.
    timeout: Duration,
    /// When bootstrap started; `None` until `start_bootstrap` is called.
    start_time: Option<Instant>,
    /// Current bootstrap lifecycle state.
    status: BootstrapStatus,
    /// IDs of the platforms taking part in the current bootstrap run.
    tracked_platforms: HashSet<String>,
    /// Registered assignments, keyed by platform ID with the squad ID as value.
    assignments: HashMap<String, String>,
    /// Running message total, reported as `messages_sent` in the metrics.
    message_count: usize,
}
145
146impl<B: crate::sync::DataSyncBackend> DiscoveryCoordinator<B> {
147 pub fn new(store: CellStore<B>, strategy: BootstrapStrategy) -> Self {
149 Self {
150 store,
151 current_phase: Phase::Discovery,
152 strategy,
153 timeout: Duration::from_secs(DEFAULT_BOOTSTRAP_TIMEOUT_SECS),
154 start_time: None,
155 status: BootstrapStatus::NotStarted,
156 tracked_platforms: HashSet::new(),
157 assignments: HashMap::new(),
158 message_count: 0,
159 }
160 }
161
162 pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
164 self.timeout = Duration::from_secs(timeout_secs);
165 self
166 }
167
    /// Returns the coordinator's current phase.
    pub fn phase(&self) -> Phase {
        self.current_phase
    }
172
    /// Returns the current bootstrap status.
    pub fn status(&self) -> BootstrapStatus {
        self.status
    }
177
178 #[instrument(skip(self))]
180 pub fn start_bootstrap(&mut self, platform_ids: Vec<String>) -> Result<()> {
181 if self.status != BootstrapStatus::NotStarted {
182 return Err(Error::InvalidTransition {
183 from: format!("{:?}", self.status),
184 to: "InProgress".to_string(),
185 reason: "Discovery already started".to_string(),
186 });
187 }
188
189 info!(
190 "Starting bootstrap with {} nodes using {} strategy",
191 platform_ids.len(),
192 self.strategy
193 );
194
195 self.tracked_platforms = platform_ids.into_iter().collect();
196 self.start_time = Some(Instant::now());
197 self.status = BootstrapStatus::InProgress;
198
199 Ok(())
200 }
201
202 #[instrument(skip(self))]
204 pub fn register_assignment(&mut self, platform_id: String, squad_id: String) -> Result<()> {
205 if self.status != BootstrapStatus::InProgress {
206 return Err(Error::InvalidTransition {
207 from: format!("{:?}", self.status),
208 to: "Assignment".to_string(),
209 reason: "Discovery not in progress".to_string(),
210 });
211 }
212
213 if !self.tracked_platforms.contains(&platform_id) {
214 warn!("Attempted to assign unknown platform: {}", platform_id);
215 return Ok(());
216 }
217
218 debug!(
219 "Registering assignment: {} → squad {}",
220 platform_id, squad_id
221 );
222
223 self.assignments.insert(platform_id, squad_id);
224 Ok(())
225 }
226
227 pub fn increment_messages(&mut self, count: usize) {
229 self.message_count += count;
230 }
231
232 pub fn has_timed_out(&self) -> bool {
234 if let Some(start_time) = self.start_time {
235 start_time.elapsed() >= self.timeout
236 } else {
237 false
238 }
239 }
240
241 pub fn unassigned_platforms(&self) -> Vec<String> {
243 self.tracked_platforms
244 .iter()
245 .filter(|id| !self.assignments.contains_key(*id))
246 .cloned()
247 .collect()
248 }
249
250 pub fn assigned_platforms(&self) -> Vec<String> {
252 self.assignments.keys().cloned().collect()
253 }
254
255 pub async fn squads_formed(&self) -> Result<usize> {
257 let cells = self.store.get_valid_cells().await?;
258 Ok(cells.len())
259 }
260
261 #[instrument(skip(self))]
267 pub async fn check_completion(&mut self) -> Result<bool> {
268 if self.status != BootstrapStatus::InProgress {
269 return Ok(false);
270 }
271
272 let all_assigned = self.unassigned_platforms().is_empty();
273 let timed_out = self.has_timed_out();
274
275 if all_assigned {
276 info!("Discovery completed: all nodes assigned");
277 self.status = BootstrapStatus::Completed;
278 return Ok(true);
279 }
280
281 if timed_out {
282 let assignment_rate =
283 self.assignments.len() as f32 / self.tracked_platforms.len() as f32;
284
285 if assignment_rate > 0.9 {
286 info!(
287 "Discovery timed out but mostly successful ({:.1}% assigned)",
288 assignment_rate * 100.0
289 );
290 self.status = BootstrapStatus::Completed;
291 } else if assignment_rate > 0.5 {
292 warn!(
293 "Discovery timed out with partial completion ({:.1}% assigned)",
294 assignment_rate * 100.0
295 );
296 self.status = BootstrapStatus::PartiallyCompleted;
297 } else {
298 warn!(
299 "Discovery failed: timeout with only {:.1}% assigned",
300 assignment_rate * 100.0
301 );
302 self.status = BootstrapStatus::Failed;
303 }
304 return Ok(true);
305 }
306
307 Ok(false)
308 }
309
310 #[instrument(skip(self))]
314 pub async fn transition_to_squad_phase(&mut self) -> Result<()> {
315 if self.current_phase != Phase::Discovery {
316 return Err(Error::InvalidTransition {
317 from: format!("{:?}", self.current_phase),
318 to: "Squad".to_string(),
319 reason: "Not in Discovery phase".to_string(),
320 });
321 }
322
323 if self.status == BootstrapStatus::InProgress {
324 return Err(Error::InvalidTransition {
325 from: format!("{:?}", self.status),
326 to: "Squad".to_string(),
327 reason: "Discovery still in progress".to_string(),
328 });
329 }
330
331 info!("Transitioning from Discovery to Cell phase");
332 self.current_phase = Phase::Cell;
333
334 Ok(())
335 }
336
337 #[instrument(skip(self))]
339 pub fn reset_for_retry(&mut self) -> Result<()> {
340 if self.status == BootstrapStatus::InProgress {
341 return Err(Error::InvalidTransition {
342 from: "InProgress".to_string(),
343 to: "Reset".to_string(),
344 reason: "Cannot reset while bootstrap is in progress".to_string(),
345 });
346 }
347
348 info!("Resetting bootstrap coordinator for retry");
349
350 self.status = BootstrapStatus::NotStarted;
351 self.start_time = None;
352 self.assignments.clear();
353 self.message_count = 0;
354 Ok(())
357 }
358
359 #[instrument(skip(self))]
361 pub async fn get_metrics(&self) -> Result<DiscoveryMetrics> {
362 let elapsed = if let Some(start_time) = self.start_time {
363 start_time.elapsed().as_secs_f64()
364 } else {
365 0.0
366 };
367
368 let squads_formed = self.squads_formed().await?;
369
370 Ok(DiscoveryMetrics {
371 total_platforms: self.tracked_platforms.len(),
372 assigned_platforms: self.assignments.len(),
373 unassigned_platforms: self.unassigned_platforms().len(),
374 squads_formed,
375 elapsed_seconds: elapsed,
376 strategy: self.strategy,
377 status: self.status,
378 messages_sent: Some(self.message_count),
379 })
380 }
381
382 #[instrument(skip(self))]
384 pub fn force_complete(&mut self) -> Result<()> {
385 if self.status != BootstrapStatus::InProgress {
386 return Err(Error::InvalidTransition {
387 from: format!("{:?}", self.status),
388 to: "Completed".to_string(),
389 reason: "Discovery not in progress".to_string(),
390 });
391 }
392
393 warn!("Force completing bootstrap");
394 self.status = BootstrapStatus::Completed;
395 Ok(())
396 }
397}