use crate::models::CellStateExt;
use crate::storage::CellStore;
use crate::{Error, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::{debug, info, instrument, warn};
/// Priority label carried by a [`CellAssignment`].
///
/// The type derives equality but no ordering, so variants can be compared for
/// sameness only. `Normal` is the `Default` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum AssignmentPriority {
/// Low priority.
Low,
/// Normal priority; returned by `AssignmentPriority::default()`.
#[default]
Normal,
/// High priority.
High,
/// Critical priority.
Critical,
}
/// Lifecycle state of a [`CellAssignment`].
///
/// `Pending` and `InProgress` are the states `CellAssignment::is_active`
/// reports as active; `Completed` and `Failed` are not.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AssignmentStatus {
/// Initial state assigned by `CellAssignment::new`.
Pending,
/// Set by `CellAssignment::mark_in_progress`.
InProgress,
/// Set by `CellAssignment::mark_completed`.
Completed,
/// Set by `CellAssignment::mark_failed`; `reason` is the caller-supplied
/// explanation.
Failed { reason: String },
}
/// A directive assigning one or more platforms to a squad.
///
/// Serializable with serde; all fields are public.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CellAssignment {
/// Identifier of this assignment; used as the key when a manager stores it.
pub assignment_id: String,
/// Identifier of the squad the listed platforms should join.
pub squad_id: String,
/// Identifiers of the platforms this assignment applies to.
pub platform_ids: Vec<String>,
/// Identifier of whoever issued the assignment.
/// NOTE(review): nothing in the visible code checks this value — confirm
/// whether authorization is enforced elsewhere.
pub issued_by: String,
/// Creation time in whole seconds since the UNIX epoch, as captured by
/// `CellAssignment::new`; compared against the manager's timeout.
pub timestamp: u64,
/// Priority label supplied by the issuer.
pub priority: AssignmentPriority,
/// Current lifecycle state; starts as `AssignmentStatus::Pending`.
pub status: AssignmentStatus,
/// Optional free-form context; `None` unless set via `with_context`.
pub context: Option<String>,
}
impl CellAssignment {
    /// Creates a new assignment in the [`AssignmentStatus::Pending`] state with
    /// no context.
    ///
    /// The `timestamp` is the current system time in whole seconds since the
    /// UNIX epoch. If the system clock reports a time before the epoch the
    /// timestamp falls back to `0` instead of panicking.
    pub fn new(
        assignment_id: String,
        squad_id: String,
        platform_ids: Vec<String>,
        issued_by: String,
        priority: AssignmentPriority,
    ) -> Self {
        // `duration_since` fails when the clock is earlier than the epoch
        // (e.g. an unsynchronised host); a constructor should not panic on
        // that, so treat it as time zero.
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs());
        Self {
            assignment_id,
            squad_id,
            platform_ids,
            issued_by,
            timestamp,
            priority,
            status: AssignmentStatus::Pending,
            context: None,
        }
    }

    /// Builder-style setter that attaches `context` and returns the assignment.
    pub fn with_context(mut self, context: String) -> Self {
        self.context = Some(context);
        self
    }

    /// Returns `true` when `platform_id` exactly matches one of the entries in
    /// `platform_ids`.
    pub fn includes_platform(&self, platform_id: &str) -> bool {
        self.platform_ids.iter().any(|id| id == platform_id)
    }

    /// Sets the status to [`AssignmentStatus::InProgress`].
    ///
    /// The previous status is overwritten unconditionally.
    pub fn mark_in_progress(&mut self) {
        self.status = AssignmentStatus::InProgress;
    }

    /// Sets the status to [`AssignmentStatus::Completed`].
    ///
    /// The previous status is overwritten unconditionally.
    pub fn mark_completed(&mut self) {
        self.status = AssignmentStatus::Completed;
    }

    /// Sets the status to [`AssignmentStatus::Failed`] carrying `reason`.
    ///
    /// The previous status is overwritten unconditionally.
    pub fn mark_failed(&mut self, reason: String) {
        self.status = AssignmentStatus::Failed { reason };
    }

    /// Returns `true` while the assignment is `Pending` or `InProgress`.
    pub fn is_active(&self) -> bool {
        matches!(
            self.status,
            AssignmentStatus::Pending | AssignmentStatus::InProgress
        )
    }
}
/// Outcome of validating a [`CellAssignment`] before it is executed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationResult {
/// Every check passed; the assignment may be executed.
Valid,
/// The store returned no cell for the assignment's `squad_id`.
SquadNotFound,
/// The target squad reported itself as full.
SquadFull,
/// The platform is already a member of a different squad;
/// `current_squad` is that squad's identifier.
PlatformAlreadyAssigned { current_squad: String },
/// NOTE(review): not produced by the validation code visible in this file;
/// presumably reserved for issuer authorization checks — confirm.
Unauthorized,
/// The assignment's age exceeds the manager's timeout.
Expired,
/// Generic rejection with a free-form `reason`.
/// NOTE(review): not produced by the validation code visible in this file.
Invalid { reason: String },
}
/// Processes [`CellAssignment`]s on behalf of a single platform, joining that
/// platform to squads through a [`CellStore`].
pub struct DirectedAssignmentManager<B: crate::sync::DataSyncBackend> {
/// Store used to look up squads and to add this platform as a member.
store: CellStore<B>,
/// Assignments accepted by this manager, keyed by `assignment_id`.
assignments: HashMap<String, CellAssignment>,
/// Identifier of the platform this manager acts for.
my_platform_id: String,
/// Maximum accepted assignment age, in seconds.
assignment_timeout: u64,
}
impl<B: crate::sync::DataSyncBackend> DirectedAssignmentManager<B> {
    /// Timeout applied by [`Self::new`], in seconds (five minutes).
    const DEFAULT_TIMEOUT_SECS: u64 = 300;

    /// Creates a manager acting for `my_platform_id`, with no recorded
    /// assignments and the default timeout of 300 seconds.
    pub fn new(store: CellStore<B>, my_platform_id: String) -> Self {
        Self {
            store,
            assignments: HashMap::new(),
            my_platform_id,
            assignment_timeout: Self::DEFAULT_TIMEOUT_SECS,
        }
    }

    /// Builder-style setter overriding the assignment timeout (in seconds).
    pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
        self.assignment_timeout = timeout_secs;
        self
    }

    /// Current time in whole seconds since the UNIX epoch.
    ///
    /// Returns `0` instead of panicking when the system clock is set before
    /// the epoch.
    fn unix_now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs())
    }

    /// Validates and executes `assignment` if it targets this platform.
    ///
    /// Assignments that do not list this manager's platform are ignored and
    /// yield `Ok(())`. Otherwise the assignment is validated, recorded, and
    /// executed by adding this platform to the target squad.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidTransition` when validation produces anything
    /// other than [`ValidationResult::Valid`], and propagates any error from
    /// the store. When execution fails the recorded assignment is left in the
    /// `Failed` state.
    #[instrument(skip(self, assignment))]
    pub async fn process_assignment(&mut self, assignment: CellAssignment) -> Result<()> {
        info!(
            "Processing assignment {} for squad {}",
            assignment.assignment_id, assignment.squad_id
        );
        if !assignment.includes_platform(&self.my_platform_id) {
            debug!(
                "Assignment {} does not include platform {}",
                assignment.assignment_id, self.my_platform_id
            );
            return Ok(());
        }
        let validation = self.validate_assignment(&assignment).await?;
        if validation != ValidationResult::Valid {
            warn!(
                "Assignment {} failed validation: {:?}",
                assignment.assignment_id, validation
            );
            return Err(Error::InvalidTransition {
                from: "Pending assignment".to_string(),
                to: "Executed assignment".to_string(),
                reason: format!("Assignment validation failed: {:?}", validation),
            });
        }
        // Record the accepted assignment before executing it; the record is
        // replaced with the final state once execution finishes.
        self.assignments
            .insert(assignment.assignment_id.clone(), assignment.clone());
        self.execute_assignment(assignment).await
    }

    /// Runs the pre-execution checks for `assignment`, in this order:
    /// expiry, squad existence, squad capacity, and whether this platform is
    /// already a member of a different squad.
    ///
    /// # Errors
    ///
    /// Propagates errors from the store lookups.
    #[instrument(skip(self, assignment))]
    async fn validate_assignment(&self, assignment: &CellAssignment) -> Result<ValidationResult> {
        debug!("Validating assignment {}", assignment.assignment_id);
        // `saturating_sub` means a timestamp in the future counts as age zero
        // and is therefore treated as fresh.
        let age = Self::unix_now_secs().saturating_sub(assignment.timestamp);
        if age > self.assignment_timeout {
            return Ok(ValidationResult::Expired);
        }
        let squad = match self.store.get_cell(&assignment.squad_id).await? {
            Some(squad) => squad,
            None => return Ok(ValidationResult::SquadNotFound),
        };
        if squad.is_full() {
            return Ok(ValidationResult::SquadFull);
        }
        if let Some(current_squad) = self.get_current_squad(&self.my_platform_id).await? {
            if current_squad != assignment.squad_id {
                return Ok(ValidationResult::PlatformAlreadyAssigned { current_squad });
            }
        }
        Ok(ValidationResult::Valid)
    }

    /// Adds this platform to the assignment's squad and records the outcome.
    ///
    /// The stored record ends up `Completed` on success and `Failed` (with the
    /// store error as the reason) on failure, so a failed join is never left
    /// looking active.
    ///
    /// # Errors
    ///
    /// Returns the store error when adding the member fails.
    #[instrument(skip(self, assignment))]
    async fn execute_assignment(&mut self, mut assignment: CellAssignment) -> Result<()> {
        info!(
            "Executing assignment {} - joining squad {}",
            assignment.assignment_id, assignment.squad_id
        );
        assignment.mark_in_progress();
        let outcome = self.join_squad(&assignment).await;
        match outcome {
            Ok(()) => {
                assignment.mark_completed();
                info!(
                    "Assignment {} completed successfully",
                    assignment.assignment_id
                );
                self.assignments
                    .insert(assignment.assignment_id.clone(), assignment);
                Ok(())
            }
            Err(err) => {
                warn!(
                    "Assignment {} failed to join squad {}: {:?}",
                    assignment.assignment_id, assignment.squad_id, err
                );
                assignment.mark_failed(format!("{:?}", err));
                self.assignments
                    .insert(assignment.assignment_id.clone(), assignment);
                Err(err)
            }
        }
    }

    /// Asks the store to add this platform to the assignment's squad.
    async fn join_squad(&self, assignment: &CellAssignment) -> Result<()> {
        self.store
            .add_member(&assignment.squad_id, self.my_platform_id.clone())
            .await?;
        Ok(())
    }

    /// Returns the id of the first valid squad that lists `platform_id` as a
    /// member, or `None` when no squad does (or that squad has no config).
    async fn get_current_squad(&self, platform_id: &str) -> Result<Option<String>> {
        let valid_squads = self.store.get_valid_cells().await?;
        for squad in valid_squads {
            if squad.is_member(platform_id) {
                return Ok(squad.config.as_ref().map(|c| c.id.clone()));
            }
        }
        Ok(None)
    }

    /// Looks up a recorded assignment by its id.
    pub fn get_assignment(&self, assignment_id: &str) -> Option<&CellAssignment> {
        self.assignments.get(assignment_id)
    }

    /// Returns every recorded assignment that is still `Pending` or
    /// `InProgress`, in unspecified order.
    pub fn active_assignments(&self) -> Vec<&CellAssignment> {
        self.assignments
            .values()
            .filter(|a| a.is_active())
            .collect()
    }

    /// Drops every recorded assignment older than the configured timeout,
    /// regardless of its status.
    pub fn cleanup_expired(&mut self) {
        let now = Self::unix_now_secs();
        // Copy the timeout out so the closure does not need to borrow `self`
        // while `self.assignments` is mutably borrowed.
        let timeout = self.assignment_timeout;
        self.assignments
            .retain(|_, assignment| now.saturating_sub(assignment.timestamp) <= timeout);
    }
}