1use crate::models::CellStateExt;
40use crate::storage::CellStore;
41use crate::{Error, Result};
42use serde::{Deserialize, Serialize};
43use std::collections::HashMap;
44use tracing::{debug, info, instrument, warn};
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
48pub enum AssignmentPriority {
49 Low,
51 #[default]
53 Normal,
54 High,
56 Critical,
58}
59
60#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub enum AssignmentStatus {
63 Pending,
65 InProgress,
67 Completed,
69 Failed { reason: String },
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct CellAssignment {
79 pub assignment_id: String,
81 pub squad_id: String,
83 pub platform_ids: Vec<String>,
85 pub issued_by: String,
87 pub timestamp: u64,
89 pub priority: AssignmentPriority,
91 pub status: AssignmentStatus,
93 pub context: Option<String>,
95}
96
97impl CellAssignment {
98 pub fn new(
100 assignment_id: String,
101 squad_id: String,
102 platform_ids: Vec<String>,
103 issued_by: String,
104 priority: AssignmentPriority,
105 ) -> Self {
106 let timestamp = std::time::SystemTime::now()
107 .duration_since(std::time::UNIX_EPOCH)
108 .unwrap()
109 .as_secs();
110
111 Self {
112 assignment_id,
113 squad_id,
114 platform_ids,
115 issued_by,
116 timestamp,
117 priority,
118 status: AssignmentStatus::Pending,
119 context: None,
120 }
121 }
122
123 pub fn with_context(mut self, context: String) -> Self {
125 self.context = Some(context);
126 self
127 }
128
129 pub fn includes_platform(&self, platform_id: &str) -> bool {
131 self.platform_ids.iter().any(|id| id == platform_id)
132 }
133
134 pub fn mark_in_progress(&mut self) {
136 self.status = AssignmentStatus::InProgress;
137 }
138
139 pub fn mark_completed(&mut self) {
141 self.status = AssignmentStatus::Completed;
142 }
143
144 pub fn mark_failed(&mut self, reason: String) {
146 self.status = AssignmentStatus::Failed { reason };
147 }
148
149 pub fn is_active(&self) -> bool {
151 matches!(
152 self.status,
153 AssignmentStatus::Pending | AssignmentStatus::InProgress
154 )
155 }
156}
157
158#[derive(Debug, Clone, PartialEq, Eq)]
160pub enum ValidationResult {
161 Valid,
163 SquadNotFound,
165 SquadFull,
167 PlatformAlreadyAssigned { current_squad: String },
169 Unauthorized,
171 Expired,
173 Invalid { reason: String },
175}
176
177pub struct DirectedAssignmentManager<B: crate::sync::DataSyncBackend> {
181 store: CellStore<B>,
183 assignments: HashMap<String, CellAssignment>,
185 my_platform_id: String,
187 assignment_timeout: u64,
189}
190
191impl<B: crate::sync::DataSyncBackend> DirectedAssignmentManager<B> {
192 pub fn new(store: CellStore<B>, my_platform_id: String) -> Self {
194 Self {
195 store,
196 assignments: HashMap::new(),
197 my_platform_id,
198 assignment_timeout: 300, }
200 }
201
202 pub fn with_timeout(mut self, timeout_secs: u64) -> Self {
204 self.assignment_timeout = timeout_secs;
205 self
206 }
207
208 #[instrument(skip(self, assignment))]
210 pub async fn process_assignment(&mut self, assignment: CellAssignment) -> Result<()> {
211 info!(
212 "Processing assignment {} for squad {}",
213 assignment.assignment_id, assignment.squad_id
214 );
215
216 if !assignment.includes_platform(&self.my_platform_id) {
218 debug!(
219 "Assignment {} does not include platform {}",
220 assignment.assignment_id, self.my_platform_id
221 );
222 return Ok(());
223 }
224
225 let validation = self.validate_assignment(&assignment).await?;
227 if validation != ValidationResult::Valid {
228 warn!(
229 "Assignment {} failed validation: {:?}",
230 assignment.assignment_id, validation
231 );
232 return Err(Error::InvalidTransition {
233 from: "Pending assignment".to_string(),
234 to: "Executed assignment".to_string(),
235 reason: format!("Assignment validation failed: {:?}", validation),
236 });
237 }
238
239 self.assignments
241 .insert(assignment.assignment_id.clone(), assignment.clone());
242
243 self.execute_assignment(assignment).await?;
245
246 Ok(())
247 }
248
249 #[instrument(skip(self, assignment))]
251 async fn validate_assignment(&self, assignment: &CellAssignment) -> Result<ValidationResult> {
252 debug!("Validating assignment {}", assignment.assignment_id);
253
254 let current_time = std::time::SystemTime::now()
256 .duration_since(std::time::UNIX_EPOCH)
257 .unwrap()
258 .as_secs();
259
260 if current_time.saturating_sub(assignment.timestamp) > self.assignment_timeout {
261 return Ok(ValidationResult::Expired);
262 }
263
264 let squad = self.store.get_cell(&assignment.squad_id).await?;
266 if squad.is_none() {
267 return Ok(ValidationResult::SquadNotFound);
268 }
269
270 let squad = squad.unwrap();
271
272 if squad.is_full() {
274 return Ok(ValidationResult::SquadFull);
275 }
276
277 if let Some(current_squad) = self.get_current_squad(&self.my_platform_id).await? {
279 if current_squad != assignment.squad_id {
280 return Ok(ValidationResult::PlatformAlreadyAssigned {
281 current_squad: current_squad.clone(),
282 });
283 }
284 }
285
286 Ok(ValidationResult::Valid)
287 }
288
289 #[instrument(skip(self, assignment))]
291 async fn execute_assignment(&mut self, mut assignment: CellAssignment) -> Result<()> {
292 info!(
293 "Executing assignment {} - joining squad {}",
294 assignment.assignment_id, assignment.squad_id
295 );
296
297 assignment.mark_in_progress();
298
299 self.store
301 .add_member(&assignment.squad_id, self.my_platform_id.clone())
302 .await?;
303
304 assignment.mark_completed();
305 self.assignments
306 .insert(assignment.assignment_id.clone(), assignment.clone());
307
308 info!(
309 "Assignment {} completed successfully",
310 assignment.assignment_id
311 );
312
313 Ok(())
314 }
315
316 async fn get_current_squad(&self, platform_id: &str) -> Result<Option<String>> {
318 let valid_squads = self.store.get_valid_cells().await?;
319
320 for squad in valid_squads {
321 if squad.is_member(platform_id) {
322 return Ok(squad.config.as_ref().map(|c| c.id.clone()));
323 }
324 }
325
326 Ok(None)
327 }
328
329 pub fn get_assignment(&self, assignment_id: &str) -> Option<&CellAssignment> {
331 self.assignments.get(assignment_id)
332 }
333
334 pub fn active_assignments(&self) -> Vec<&CellAssignment> {
336 self.assignments
337 .values()
338 .filter(|a| a.is_active())
339 .collect()
340 }
341
342 pub fn cleanup_expired(&mut self) {
344 let current_time = std::time::SystemTime::now()
345 .duration_since(std::time::UNIX_EPOCH)
346 .unwrap()
347 .as_secs();
348
349 self.assignments.retain(|_, assignment| {
350 current_time.saturating_sub(assignment.timestamp) <= self.assignment_timeout
351 });
352 }
353}