use crate::storage::CellStore;
use crate::traits::Phase;
use crate::{Error, Result};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};
use tracing::{debug, info, instrument, warn};
/// Default bootstrap timeout, in seconds, applied by `DiscoveryCoordinator::new`
/// until overridden with `with_timeout`.
pub const DEFAULT_BOOTSTRAP_TIMEOUT_SECS: u64 = 60; // one minute
/// Bootstrap strategy selected for a discovery run.
///
/// The coordinator in this module only stores the strategy, logs it, and reports
/// it in [`DiscoveryMetrics`]; the logic that acts on each strategy is not in
/// this module. NOTE(review): variant order is kept fixed because it may matter
/// for index-based serde formats.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BootstrapStrategy {
    /// Geographic strategy (displayed as `geographic`).
    Geographic,
    /// Directed strategy (displayed as `directed`).
    Directed,
    /// Capability-based strategy (displayed as `capability_based`).
    CapabilityBased,
}
impl std::fmt::Display for BootstrapStrategy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BootstrapStrategy::Geographic => write!(f, "geographic"),
BootstrapStrategy::Directed => write!(f, "directed"),
BootstrapStrategy::CapabilityBased => write!(f, "capability_based"),
}
}
}
/// Lifecycle status of a bootstrap run managed by `DiscoveryCoordinator`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BootstrapStatus {
    /// Initial state, and the state restored by `reset_for_retry`.
    NotStarted,
    /// Set by `start_bootstrap`; assignments are accepted only in this state.
    InProgress,
    /// Every tracked platform was assigned, the run timed out with more than 90%
    /// assigned, or it was forced via `force_complete`.
    Completed,
    /// The run timed out with at most 50% of platforms assigned.
    Failed,
    /// The run timed out with more than 50% but at most 90% of platforms assigned.
    PartiallyCompleted,
}
/// Snapshot of bootstrap progress, as produced by `DiscoveryCoordinator::get_metrics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscoveryMetrics {
    /// Number of distinct platforms tracked by the run.
    pub total_platforms: usize,
    /// Number of tracked platforms that have a squad assignment.
    pub assigned_platforms: usize,
    /// Number of tracked platforms still without a squad assignment.
    pub unassigned_platforms: usize,
    /// Number of squads (valid cells) reported by the backing store.
    pub squads_formed: usize,
    /// Seconds since the bootstrap started; `0.0` if it never started.
    pub elapsed_seconds: f64,
    /// Strategy the run was configured with.
    pub strategy: BootstrapStrategy,
    /// Status at the time of the snapshot.
    pub status: BootstrapStatus,
    /// Messages recorded during the run, if counted (the coordinator reports `Some`).
    pub messages_sent: Option<usize>,
}
impl DiscoveryMetrics {
    /// Fraction of tracked platforms that received an assignment, in `[0.0, 1.0]`
    /// for consistent inputs.
    ///
    /// Returns `0.0` when no platforms were tracked instead of dividing by zero.
    pub fn assignment_rate(&self) -> f32 {
        match self.total_platforms {
            0 => 0.0,
            total => self.assigned_platforms as f32 / total as f32,
        }
    }

    /// Mean number of assigned platforms per formed squad.
    ///
    /// Returns `0.0` when no squads were formed instead of dividing by zero.
    pub fn avg_squad_size(&self) -> f32 {
        match self.squads_formed {
            0 => 0.0,
            squads => self.assigned_platforms as f32 / squads as f32,
        }
    }

    /// `true` when the run is `Completed` and strictly more than 90% of platforms
    /// were assigned.
    pub fn is_successful(&self) -> bool {
        let completed = matches!(self.status, BootstrapStatus::Completed);
        completed && self.assignment_rate() > 0.9
    }
}
/// Tracks a single bootstrap (discovery) run: which platforms must be placed,
/// which squad each has been assigned to, elapsed time against a timeout, and the
/// resulting [`BootstrapStatus`].
pub struct DiscoveryCoordinator<B: crate::sync::DataSyncBackend> {
    // Backing cell store; queried for the number of valid cells (squads formed).
    store: CellStore<B>,
    // Starts as `Phase::Discovery`; moved to `Phase::Cell` by `transition_to_squad_phase`.
    current_phase: Phase,
    strategy: BootstrapStrategy,
    // Maximum run time before `check_completion` settles the status by assignment rate.
    timeout: Duration,
    // Set by `start_bootstrap`; `None` before the first start and after a reset.
    start_time: Option<Instant>,
    status: BootstrapStatus,
    // Distinct platform IDs that must be assigned for the run to complete.
    tracked_platforms: HashSet<String>,
    // platform ID -> squad ID; only IDs present in `tracked_platforms` are inserted.
    assignments: HashMap<String, String>,
    // Running total fed by `increment_messages`.
    message_count: usize,
}
impl<B: crate::sync::DataSyncBackend> DiscoveryCoordinator<B> {
    /// Creates a coordinator in the `Discovery` phase with status `NotStarted`.
    ///
    /// The timeout starts at [`DEFAULT_BOOTSTRAP_TIMEOUT_SECS`] seconds; use
    /// [`Self::with_timeout`] to override it.
    pub fn new(store: CellStore<B>, strategy: BootstrapStrategy) -> Self {
        Self {
            store,
            current_phase: Phase::Discovery,
            strategy,
            timeout: Duration::from_secs(DEFAULT_BOOTSTRAP_TIMEOUT_SECS),
            start_time: None,
            status: BootstrapStatus::NotStarted,
            tracked_platforms: HashSet::new(),
            assignments: HashMap::new(),
            message_count: 0,
        }
    }

    /// Returns the coordinator with its bootstrap timeout set to `timeout_secs` seconds.
    pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
        self.timeout = Duration::from_secs(timeout_secs);
        self
    }

    /// Returns the current protocol phase.
    pub fn phase(&self) -> Phase {
        self.current_phase
    }

    /// Returns the current bootstrap status.
    pub fn status(&self) -> BootstrapStatus {
        self.status
    }

    /// Starts a bootstrap run over `platform_ids` and starts the timeout clock.
    ///
    /// Duplicate IDs are collapsed, so each platform is tracked (and must be
    /// assigned) once.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidTransition` if the status is not `NotStarted`; call
    /// [`Self::reset_for_retry`] before starting another run.
    #[instrument(skip(self))]
    pub fn start_bootstrap(&mut self, platform_ids: Vec<String>) -> Result<()> {
        if self.status != BootstrapStatus::NotStarted {
            return Err(Error::InvalidTransition {
                from: format!("{:?}", self.status),
                to: "InProgress".to_string(),
                reason: "Discovery already started".to_string(),
            });
        }
        self.tracked_platforms = platform_ids.into_iter().collect();
        // Log the de-duplicated count: it is the number of platforms that actually
        // have to be assigned, unlike the raw input length.
        info!(
            "Starting bootstrap with {} nodes using {} strategy",
            self.tracked_platforms.len(),
            self.strategy
        );
        self.start_time = Some(Instant::now());
        self.status = BootstrapStatus::InProgress;
        Ok(())
    }

    /// Records that `platform_id` has been assigned to squad `squad_id`.
    ///
    /// Re-assigning a platform overwrites its previous squad. IDs that were not
    /// passed to [`Self::start_bootstrap`] are ignored with a warning and the call
    /// still returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidTransition` if the bootstrap is not `InProgress`.
    #[instrument(skip(self))]
    pub fn register_assignment(&mut self, platform_id: String, squad_id: String) -> Result<()> {
        if self.status != BootstrapStatus::InProgress {
            return Err(Error::InvalidTransition {
                from: format!("{:?}", self.status),
                to: "Assignment".to_string(),
                reason: "Discovery not in progress".to_string(),
            });
        }
        if !self.tracked_platforms.contains(&platform_id) {
            // Deliberately tolerant: unknown platforms are reported, not rejected.
            warn!("Attempted to assign unknown platform: {}", platform_id);
            return Ok(());
        }
        debug!(
            "Registering assignment: {} → squad {}",
            platform_id, squad_id
        );
        self.assignments.insert(platform_id, squad_id);
        Ok(())
    }

    /// Adds `count` to the message counter reported by [`Self::get_metrics`].
    ///
    /// The counter saturates at `usize::MAX` instead of overflowing (which would
    /// panic in debug builds).
    pub fn increment_messages(&mut self, count: usize) {
        self.message_count = self.message_count.saturating_add(count);
    }

    /// Returns `true` once at least the configured timeout has elapsed since
    /// [`Self::start_bootstrap`]; always `false` before a run has started.
    ///
    /// The clock is not stopped when the run reaches a terminal status; it is only
    /// cleared by [`Self::reset_for_retry`].
    pub fn has_timed_out(&self) -> bool {
        match self.start_time {
            Some(start_time) => start_time.elapsed() >= self.timeout,
            None => false,
        }
    }

    /// Returns the IDs of tracked platforms that have no assignment yet.
    ///
    /// Order is unspecified (it follows `HashSet` iteration order).
    pub fn unassigned_platforms(&self) -> Vec<String> {
        self.tracked_platforms
            .iter()
            .filter(|id| !self.assignments.contains_key(*id))
            .cloned()
            .collect()
    }

    /// Returns the IDs of platforms that have an assignment.
    ///
    /// Order is unspecified (it follows `HashMap` iteration order).
    pub fn assigned_platforms(&self) -> Vec<String> {
        self.assignments.keys().cloned().collect()
    }

    /// Counts tracked platforms without an assignment, without allocating.
    fn unassigned_count(&self) -> usize {
        self.tracked_platforms
            .iter()
            .filter(|id| !self.assignments.contains_key(*id))
            .count()
    }

    /// Returns the number of cells the backing store reports as valid.
    ///
    /// NOTE(review): this counts every valid cell in the store, which presumably
    /// may include cells not formed by this run — confirm.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the store's `get_valid_cells`.
    pub async fn squads_formed(&self) -> Result<usize> {
        let cells = self.store.get_valid_cells().await?;
        Ok(cells.len())
    }

    /// Checks whether an in-progress run has finished and updates the status.
    ///
    /// Returns `Ok(true)` when this call moved the status out of `InProgress`, and
    /// `Ok(false)` when the run is not `InProgress` or is still waiting.
    ///
    /// - All tracked platforms assigned (including an empty platform set) → `Completed`.
    /// - Otherwise, once timed out: more than 90% assigned → `Completed`; more than
    ///   50% → `PartiallyCompleted`; else `Failed`.
    ///
    /// This method currently never returns `Err`.
    #[instrument(skip(self))]
    pub async fn check_completion(&mut self) -> Result<bool> {
        if self.status != BootstrapStatus::InProgress {
            return Ok(false);
        }
        if self.unassigned_count() == 0 {
            info!("Discovery completed: all nodes assigned");
            self.status = BootstrapStatus::Completed;
            return Ok(true);
        }
        if !self.has_timed_out() {
            return Ok(false);
        }
        // At least one tracked platform is unassigned here, so `tracked_platforms`
        // is non-empty and the division below cannot be 0/0.
        let assignment_rate =
            self.assignments.len() as f32 / self.tracked_platforms.len() as f32;
        if assignment_rate > 0.9 {
            info!(
                "Discovery timed out but mostly successful ({:.1}% assigned)",
                assignment_rate * 100.0
            );
            self.status = BootstrapStatus::Completed;
        } else if assignment_rate > 0.5 {
            warn!(
                "Discovery timed out with partial completion ({:.1}% assigned)",
                assignment_rate * 100.0
            );
            self.status = BootstrapStatus::PartiallyCompleted;
        } else {
            warn!(
                "Discovery failed: timeout with only {:.1}% assigned",
                assignment_rate * 100.0
            );
            self.status = BootstrapStatus::Failed;
        }
        Ok(true)
    }

    /// Moves the coordinator from `Phase::Discovery` to `Phase::Cell`.
    ///
    /// Only an `InProgress` run is rejected; `NotStarted`, `Completed`,
    /// `PartiallyCompleted` and `Failed` all pass the status guard.
    /// NOTE(review): confirm that transitioning without ever starting (or after a
    /// `Failed` run) is intended.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidTransition` if the phase is not `Discovery` or the
    /// run is still `InProgress`.
    #[instrument(skip(self))]
    pub async fn transition_to_squad_phase(&mut self) -> Result<()> {
        if self.current_phase != Phase::Discovery {
            return Err(Error::InvalidTransition {
                from: format!("{:?}", self.current_phase),
                to: "Squad".to_string(),
                reason: "Not in Discovery phase".to_string(),
            });
        }
        if self.status == BootstrapStatus::InProgress {
            return Err(Error::InvalidTransition {
                from: format!("{:?}", self.status),
                to: "Squad".to_string(),
                reason: "Discovery still in progress".to_string(),
            });
        }
        info!("Transitioning from Discovery to Cell phase");
        self.current_phase = Phase::Cell;
        Ok(())
    }

    /// Returns the coordinator to `NotStarted` so [`Self::start_bootstrap`] can run again.
    ///
    /// Clears the start time, all assignments and the message counter. The tracked
    /// platform set is kept until the next `start_bootstrap` replaces it; the
    /// phase, strategy and timeout are not changed.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidTransition` while a run is `InProgress`.
    #[instrument(skip(self))]
    pub fn reset_for_retry(&mut self) -> Result<()> {
        if self.status == BootstrapStatus::InProgress {
            return Err(Error::InvalidTransition {
                from: "InProgress".to_string(),
                to: "Reset".to_string(),
                reason: "Cannot reset while bootstrap is in progress".to_string(),
            });
        }
        info!("Resetting bootstrap coordinator for retry");
        self.status = BootstrapStatus::NotStarted;
        self.start_time = None;
        self.assignments.clear();
        self.message_count = 0;
        Ok(())
    }

    /// Builds a [`DiscoveryMetrics`] snapshot of the current run.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::squads_formed`].
    #[instrument(skip(self))]
    pub async fn get_metrics(&self) -> Result<DiscoveryMetrics> {
        let elapsed = self
            .start_time
            .map_or(0.0, |start_time| start_time.elapsed().as_secs_f64());
        let squads_formed = self.squads_formed().await?;
        Ok(DiscoveryMetrics {
            total_platforms: self.tracked_platforms.len(),
            assigned_platforms: self.assignments.len(),
            unassigned_platforms: self.unassigned_count(),
            squads_formed,
            elapsed_seconds: elapsed,
            strategy: self.strategy,
            status: self.status,
            messages_sent: Some(self.message_count),
        })
    }

    /// Marks an in-progress run as `Completed` regardless of how many platforms
    /// were assigned.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidTransition` if the run is not `InProgress`.
    #[instrument(skip(self))]
    pub fn force_complete(&mut self) -> Result<()> {
        if self.status != BootstrapStatus::InProgress {
            return Err(Error::InvalidTransition {
                from: format!("{:?}", self.status),
                to: "Completed".to_string(),
                reason: "Discovery not in progress".to_string(),
            });
        }
        warn!("Force completing bootstrap");
        self.status = BootstrapStatus::Completed;
        Ok(())
    }
}