use crate::error::{CoreError, CoreResult, ErrorContext};
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Lifecycle status of a task as it moves through the orchestrator.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskStatus {
    /// Waiting to be scheduled.
    Pending,
    /// Assigned to a node but not yet started.
    Assigned { nodeid: String },
    /// Currently executing on a node.
    Running { nodeid: String, started_at: Instant },
    /// Finished successfully on a node.
    Completed {
        nodeid: String,
        completed_at: Instant,
    },
    /// Failed on a node with an error message.
    Failed { nodeid: String, error: String },
    /// Cancelled before completion.
    Cancelled,
    /// Exceeded its configured timeout.
    TimedOut,
}

/// Relative scheduling priority; higher values are assigned first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskPriority {
    Low = 1,
    Normal = 2,
    High = 3,
    Critical = 4,
}

/// A unit of work tracked by the orchestrator, including its payload,
/// scheduling metadata, and retry bookkeeping.
#[derive(Debug, Clone)]
pub struct Task {
    pub id: String,
    pub name: String,
    pub payload: Vec<u8>,
    pub priority: TaskPriority,
    pub timeout: Option<Duration>,
    pub dependencies: Vec<String>,
    pub retry_count: usize,
    pub maxretries: usize,
    pub created_at: Instant,
    pub status: TaskStatus,
}

impl Task {
    /// Creates a pending task with normal priority, a 5-minute default
    /// timeout, and up to three retries.
    pub fn new(id: String, name: String, payload: Vec<u8>) -> Self {
        Self {
            id,
            name,
            payload,
            priority: TaskPriority::Normal,
            timeout: Some(Duration::from_secs(300)),
            dependencies: Vec::new(),
            retry_count: 0,
            maxretries: 3,
            created_at: Instant::now(),
            status: TaskStatus::Pending,
        }
    }

    /// Sets the task priority (builder style).
    pub fn with_priority(mut self, priority: TaskPriority) -> Self {
        self.priority = priority;
        self
    }

    /// Sets the execution timeout (builder style).
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Sets the task IDs this task depends on (builder style).
    pub fn with_dependencies(mut self, dependencies: Vec<String>) -> Self {
        self.dependencies = dependencies;
        self
    }

    /// Sets the maximum number of retries (builder style).
    pub fn retries(mut self, maxretries: usize) -> Self {
        self.maxretries = maxretries;
        self
    }

    /// Returns `true` if the task has retry attempts remaining.
    pub fn can_retry(&self) -> bool {
        self.retry_count < self.maxretries
    }

    /// Records one more retry attempt.
    pub fn increment_retry(&mut self) {
        self.retry_count += 1;
    }

    /// Returns `true` if the task is running and has exceeded its timeout.
    pub fn has_timed_out(&self) -> bool {
        if let Some(timeout) = self.timeout {
            match &self.status {
                TaskStatus::Running { started_at, .. } => {
                    Instant::now().duration_since(*started_at) > timeout
                }
                _ => false,
            }
        } else {
            false
        }
    }
}

/// A named collection of tasks executed according to their dependencies.
#[derive(Debug)]
pub struct Workflow {
    pub id: String,
    pub name: String,
    pub tasks: HashMap<String, Task>,
    pub execution_order: Vec<String>,
    pub status: WorkflowStatus,
    pub created_at: Instant,
}

/// Overall status of a workflow.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkflowStatus {
    Pending,
    Running,
    Completed,
    Failed { error: String },
    Cancelled,
}

impl Workflow {
    /// Creates an empty workflow with the given id and name.
    pub fn workflow_id(id: String, name: String) -> Self {
        Self {
            id,
            name,
            tasks: HashMap::new(),
            execution_order: Vec::new(),
            status: WorkflowStatus::Pending,
            created_at: Instant::now(),
        }
    }

    /// Convenience constructor equivalent to [`Self::workflow_id`].
    pub fn new(id: String, name: String) -> Self {
        Self::workflow_id(id, name)
    }

    /// Adds a task to the workflow and appends it to the execution order.
    pub fn add_task(&mut self, task: Task) {
        let taskid = task.id.clone();
        self.tasks.insert(taskid.clone(), task);
        self.execution_order.push(taskid);
    }

    /// Returns the pending tasks whose dependencies have all completed.
    pub fn get_ready_tasks(&self) -> Vec<&Task> {
        self.tasks
            .values()
            .filter(|task| {
                matches!(task.status, TaskStatus::Pending)
                    && self.are_dependencies_satisfied(&task.id)
            })
            .collect()
    }

    fn are_dependencies_satisfied(&self, taskid: &str) -> bool {
        if let Some(task) = self.tasks.get(taskid) {
            task.dependencies.iter().all(|dep_id| {
                if let Some(dep_task) = self.tasks.get(dep_id) {
                    matches!(dep_task.status, TaskStatus::Completed { .. })
                } else {
                    false
                }
            })
        } else {
            false
        }
    }

    /// Returns `true` once every task has reached a terminal state.
    pub fn is_complete(&self) -> bool {
        self.tasks.values().all(|task| {
            matches!(
                task.status,
                TaskStatus::Completed { .. } | TaskStatus::Failed { .. } | TaskStatus::Cancelled
            )
        })
    }

    /// Returns `true` if any task in the workflow has failed.
    pub fn has_failed(&self) -> bool {
        self.tasks
            .values()
            .any(|task| matches!(task.status, TaskStatus::Failed { .. }))
    }
}

/// A worker node that can run tasks, with a fixed capacity and heartbeat state.
#[derive(Debug, Clone)]
pub struct OrchestratorNode {
    pub nodeid: String,
    pub address: SocketAddr,
    pub capacity: usize,
    pub current_load: usize,
    pub capabilities: Vec<String>,
    pub last_heartbeat: Instant,
}

impl OrchestratorNode {
    /// Creates a node with the given id, address, and task capacity.
    pub fn id(nodeid: String, address: SocketAddr, capacity: usize) -> Self {
        Self {
            nodeid,
            address,
            capacity,
            current_load: 0,
            capabilities: Vec::new(),
            last_heartbeat: Instant::now(),
        }
    }

    /// Convenience constructor equivalent to [`Self::id`].
    pub fn new(nodeid: String, address: SocketAddr, capacity: usize) -> Self {
        Self::id(nodeid, address, capacity)
    }

    /// Returns `true` if the node has spare capacity for another task.
    pub fn can_accept_task(&self) -> bool {
        self.current_load < self.capacity
    }

    /// Records a heartbeat at the current instant.
    pub fn update_heartbeat(&mut self) {
        self.last_heartbeat = Instant::now();
    }

    /// Returns `true` if a heartbeat was received within `timeout`.
    pub fn is_responsive(&self, timeout: Duration) -> bool {
        Instant::now().duration_since(self.last_heartbeat) <= timeout
    }
}

/// Coordinates workflows, worker nodes, and the shared task queue. All shared
/// state is wrapped in `Arc<Mutex<...>>` for thread-safe access.
#[derive(Debug)]
pub struct OrchestrationEngine {
    workflows: Arc<Mutex<HashMap<String, Workflow>>>,
    nodes: Arc<Mutex<HashMap<String, OrchestratorNode>>>,
    task_queue: Arc<Mutex<VecDeque<String>>>,
    running_tasks: Arc<Mutex<HashMap<String, (String, Instant)>>>,
    node_timeout: Duration,
}

impl OrchestrationEngine {
    /// Creates an empty engine with a 60-second node heartbeat timeout.
    pub fn new() -> Self {
        Self {
            workflows: Arc::new(Mutex::new(HashMap::new())),
            nodes: Arc::new(Mutex::new(HashMap::new())),
            task_queue: Arc::new(Mutex::new(VecDeque::new())),
            running_tasks: Arc::new(Mutex::new(HashMap::new())),
            node_timeout: Duration::from_secs(60),
        }
    }

    /// Registers (or replaces) a worker node.
    pub fn register_node(&self, node: OrchestratorNode) -> CoreResult<()> {
        let mut nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;
        nodes.insert(node.nodeid.clone(), node);
        Ok(())
    }

    /// Stores a workflow and queues any of its tasks that are immediately ready.
    pub fn submit_workflow(&self, workflow: Workflow) -> CoreResult<()> {
        let workflow_id = workflow.id.clone();

        let mut workflows = self.workflows.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire workflows lock".to_string(),
            ))
        })?;

        let mut task_queue = self.task_queue.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire task queue lock".to_string(),
            ))
        })?;

        for task in workflow.get_ready_tasks() {
            task_queue.push_back(task.id.clone());
        }

        workflows.insert(workflow_id, workflow);
        Ok(())
    }

    /// Submits a single task by wrapping it in a one-task workflow.
    pub fn submit_task(&self, task: Task) -> CoreResult<()> {
        let taskid = task.id.clone();

        let mut workflow = Workflow::new(format!("workflow_{taskid}"), task.name.clone());
        workflow.add_task(task);

        self.submit_workflow(workflow)
    }

    /// Drains the task queue and assigns tasks, highest priority first, to the
    /// least-loaded responsive node. Tasks that cannot be assigned are re-queued.
    pub fn process_task_queue(&self) -> CoreResult<()> {
        let mut task_queue = self.task_queue.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire task queue lock".to_string(),
            ))
        })?;

        let mut nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let mut workflows = self.workflows.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire workflows lock".to_string(),
            ))
        })?;

        let mut running_tasks = self.running_tasks.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire running tasks lock".to_string(),
            ))
        })?;

        let mut tasks_to_assign = Vec::new();
        while let Some(taskid) = task_queue.pop_front() {
            tasks_to_assign.push(taskid);
        }

        // Sort descending by priority so higher-priority tasks are assigned first.
        tasks_to_assign.sort_by(|a, b| {
            let priority_a = self
                .find_task_priority(a, &workflows)
                .unwrap_or(TaskPriority::Low);
            let priority_b = self
                .find_task_priority(b, &workflows)
                .unwrap_or(TaskPriority::Low);
            priority_b.cmp(&priority_a)
        });

        for taskid in tasks_to_assign {
            if let Some(available_node) = nodes
                .values_mut()
                .filter(|node| node.can_accept_task() && node.is_responsive(self.node_timeout))
                .min_by_key(|node| node.current_load)
            {
                if let Some(task) = self.find_task_mut(&taskid, &mut workflows) {
                    task.status = TaskStatus::Running {
                        nodeid: available_node.nodeid.clone(),
                        started_at: Instant::now(),
                    };

                    available_node.current_load += 1;
                    running_tasks.insert(taskid, (available_node.nodeid.clone(), Instant::now()));
                } else {
                    // Task id not found in any workflow: re-queue rather than drop it.
                    task_queue.push_back(taskid);
                }
            } else {
                // No responsive node has spare capacity: re-queue for a later pass.
                task_queue.push_back(taskid);
            }
        }

        Ok(())
    }

    fn find_task_priority(
        &self,
        taskid: &str,
        workflows: &HashMap<String, Workflow>,
    ) -> Option<TaskPriority> {
        for workflow in workflows.values() {
            if let Some(task) = workflow.tasks.get(taskid) {
                return Some(task.priority);
            }
        }
        None
    }

    fn find_task_mut<'a>(
        &self,
        taskid: &str,
        workflows: &'a mut HashMap<String, Workflow>,
    ) -> Option<&'a mut Task> {
        for workflow in workflows.values_mut() {
            if let Some(task) = workflow.tasks.get_mut(taskid) {
                return Some(task);
            }
        }
        None
    }

    /// Marks a task as completed, releases capacity on the node that ran it,
    /// and queues any tasks whose dependencies are now satisfied.
    pub fn complete_task(&mut self, taskid: &str) -> CoreResult<()> {
        let mut workflows = self.workflows.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire workflows lock".to_string(),
            ))
        })?;

        let mut nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let mut running_tasks = self.running_tasks.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire running tasks lock".to_string(),
            ))
        })?;

        let nodeid = running_tasks
            .get(taskid)
            .map(|(nodeid, _)| nodeid.clone())
            .unwrap_or_else(|| "unknown".to_string());

        if let Some(task) = self.find_task_mut(taskid, &mut workflows) {
            task.status = TaskStatus::Completed {
                nodeid: nodeid.clone(),
                completed_at: Instant::now(),
            };
        }

        if let Some(node) = nodes.get_mut(&nodeid) {
            node.current_load = node.current_load.saturating_sub(1);
        }

        running_tasks.remove(taskid);

        self.queue_ready_tasks(&workflows)?;

        Ok(())
    }

    fn queue_ready_tasks(&self, workflows: &HashMap<String, Workflow>) -> CoreResult<()> {
        let mut task_queue = self.task_queue.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire task queue lock".to_string(),
            ))
        })?;

        for workflow in workflows.values() {
            for task in workflow.get_ready_tasks() {
                if !task_queue.iter().any(|id| id == &task.id) {
                    task_queue.push_back(task.id.clone());
                }
            }
        }

        Ok(())
    }

    /// Takes a consistent snapshot of workflow, task, and node counters.
    pub fn get_statistics(&self) -> CoreResult<OrchestrationStats> {
        let workflows = self.workflows.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire workflows lock".to_string(),
            ))
        })?;

        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let task_queue = self.task_queue.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire task queue lock".to_string(),
            ))
        })?;

        let running_tasks = self.running_tasks.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire running tasks lock".to_string(),
            ))
        })?;

        let total_workflows = workflows.len();
        let pending_workflows = workflows
            .values()
            .filter(|w| matches!(w.status, WorkflowStatus::Pending))
            .count();
        let running_workflows = workflows
            .values()
            .filter(|w| matches!(w.status, WorkflowStatus::Running))
            .count();
        let completed_workflows = workflows
            .values()
            .filter(|w| matches!(w.status, WorkflowStatus::Completed))
            .count();

        let total_tasks: usize = workflows.values().map(|w| w.tasks.len()).sum();
        let pending_tasks = task_queue.len();
        let running_tasks_count = running_tasks.len();

        let total_nodes = nodes.len();
        let active_nodes = nodes
            .values()
            .filter(|n| n.is_responsive(self.node_timeout))
            .count();
        let total_capacity: usize = nodes.values().map(|n| n.capacity).sum();
        let current_load: usize = nodes.values().map(|n| n.current_load).sum();

        Ok(OrchestrationStats {
            total_workflows,
            pending_workflows,
            running_workflows,
            completed_workflows,
            total_tasks,
            pending_tasks,
            running_tasks: running_tasks_count,
            total_nodes,
            active_nodes,
            total_capacity,
            current_load,
        })
    }
}

impl Default for OrchestrationEngine {
    fn default() -> Self {
        Self::new()
    }
}

/// Point-in-time counters describing the orchestrator's workload and capacity.
#[derive(Debug)]
pub struct OrchestrationStats {
    pub total_workflows: usize,
    pub pending_workflows: usize,
    pub running_workflows: usize,
    pub completed_workflows: usize,
    pub total_tasks: usize,
    pub pending_tasks: usize,
    pub running_tasks: usize,
    pub total_nodes: usize,
    pub active_nodes: usize,
    pub total_capacity: usize,
    pub current_load: usize,
}

impl OrchestrationStats {
    /// Current load as a percentage of total capacity (0.0 when there is no capacity).
    pub fn capacity_utilization(&self) -> f64 {
        if self.total_capacity == 0 {
            0.0
        } else {
            (self.current_load as f64 / self.total_capacity as f64) * 100.0
        }
    }

    /// Responsive nodes as a percentage of all registered nodes (0.0 when there are none).
    pub fn node_availability(&self) -> f64 {
        if self.total_nodes == 0 {
            0.0
        } else {
            (self.active_nodes as f64 / self.total_nodes as f64) * 100.0
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    #[test]
    fn test_task_creation() {
        let task = Task::new("task1".to_string(), "Test Task".to_string(), vec![1, 2, 3])
            .with_priority(TaskPriority::High)
            .with_timeout(Duration::from_secs(60));

        assert_eq!(task.id, "task1");
        assert_eq!(task.priority, TaskPriority::High);
        assert_eq!(task.timeout, Some(Duration::from_secs(60)));
        assert!(task.can_retry());
    }
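
    // Retry bookkeeping: with the default limit of three retries, a task stops
    // being retryable after three recorded attempts.
    #[test]
    fn test_task_retry_counting() {
        let mut task = Task::new("retry".to_string(), "Retry Task".to_string(), vec![]);
        assert!(task.can_retry());
        for _ in 0..3 {
            task.increment_retry();
        }
        assert_eq!(task.retry_count, 3);
        assert!(!task.can_retry());
    }

    // Timeout detection: a running task whose recorded start lies beyond its
    // timeout window reports has_timed_out(). checked_sub keeps the backdated
    // Instant portable; if the clock origin is too recent the check is skipped.
    #[test]
    fn test_task_timeout_detection() {
        let mut task = Task::new("slow".to_string(), "Slow Task".to_string(), vec![])
            .with_timeout(Duration::from_millis(10));
        if let Some(started_at) = Instant::now().checked_sub(Duration::from_secs(1)) {
            task.status = TaskStatus::Running {
                nodeid: "node1".to_string(),
                started_at,
            };
            assert!(task.has_timed_out());
        }
    }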

    #[test]
    fn test_workflow_creation() {
        let mut workflow = Workflow::new("wf1".to_string(), "Test Workflow".to_string());
        let task = Task::new("task1".to_string(), "Test Task".to_string(), vec![1, 2, 3]);

        workflow.add_task(task);
        assert_eq!(workflow.tasks.len(), 1);
        assert_eq!(workflow.execution_order.len(), 1);

        let ready_tasks = workflow.get_ready_tasks();
        assert_eq!(ready_tasks.len(), 1);
    }
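
    // Dependency gating: a task with an unmet dependency is not reported by
    // get_ready_tasks until its dependency reaches Completed.
    #[test]
    fn test_dependency_gating() {
        let mut workflow = Workflow::new("wf_dep".to_string(), "Dep Workflow".to_string());
        let parent = Task::new("parent".to_string(), "Parent".to_string(), vec![]);
        let child = Task::new("child".to_string(), "Child".to_string(), vec![])
            .with_dependencies(vec!["parent".to_string()]);
        workflow.add_task(parent);
        workflow.add_task(child);

        // Only the parent is ready while the child's dependency is pending.
        let ready = workflow.get_ready_tasks();
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].id, "parent");

        // Once the parent completes, the child becomes ready.
        workflow.tasks.get_mut("parent").unwrap().status = TaskStatus::Completed {
            nodeid: "node1".to_string(),
            completed_at: Instant::now(),
        };
        let ready = workflow.get_ready_tasks();
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].id, "child");
    }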

    #[test]
    fn test_orchestrator_node() {
        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let mut node = OrchestratorNode::new("node1".to_string(), address, 10);

        assert!(node.can_accept_task());
        assert!(node.is_responsive(Duration::from_secs(30)));

        node.current_load = 10;
        assert!(!node.can_accept_task());
    }

    #[test]
    fn test_orchestration_engine() {
        let engine = OrchestrationEngine::new();

        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let node = OrchestratorNode::new("node1".to_string(), address, 5);

        assert!(engine.register_node(node).is_ok());

        let task = Task::new("task1".to_string(), "Test Task".to_string(), vec![1, 2, 3]);
        assert!(engine.submit_task(task).is_ok());

        let stats = engine.get_statistics().unwrap();
        assert_eq!(stats.total_nodes, 1);
        assert_eq!(stats.total_workflows, 1);
    }
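
    // Scheduling pass: after a node is registered and a task submitted,
    // process_task_queue moves the task into the running set and occupies one
    // unit of node capacity. Port 9090 is an arbitrary test fixture.
    #[test]
    fn test_process_task_queue_assigns_task() {
        let engine = OrchestrationEngine::new();
        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9090);
        engine
            .register_node(OrchestratorNode::new("node1".to_string(), address, 2))
            .unwrap();
        engine
            .submit_task(Task::new("task1".to_string(), "Test Task".to_string(), vec![]))
            .unwrap();

        engine.process_task_queue().unwrap();

        let stats = engine.get_statistics().unwrap();
        assert_eq!(stats.running_tasks, 1);
        assert_eq!(stats.pending_tasks, 0);
        assert_eq!(stats.current_load, 1);
    }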

    #[test]
    fn test_orchestration_stats() {
        let stats = OrchestrationStats {
            total_workflows: 10,
            pending_workflows: 2,
            running_workflows: 3,
            completed_workflows: 5,
            total_tasks: 50,
            pending_tasks: 10,
            running_tasks: 15,
            total_nodes: 5,
            active_nodes: 4,
            total_capacity: 100,
            current_load: 75,
        };

        assert_eq!(stats.capacity_utilization(), 75.0);
        assert_eq!(stats.node_availability(), 80.0);
    }
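
    // Completion path: finishing a running task frees the node's capacity and
    // removes it from the running set, leaving nothing queued.
    #[test]
    fn test_complete_task_releases_capacity() {
        let mut engine = OrchestrationEngine::new();
        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9091);
        engine
            .register_node(OrchestratorNode::new("node1".to_string(), address, 1))
            .unwrap();
        engine
            .submit_task(Task::new("task1".to_string(), "Test Task".to_string(), vec![]))
            .unwrap();
        engine.process_task_queue().unwrap();

        engine.complete_task("task1").unwrap();

        let stats = engine.get_statistics().unwrap();
        assert_eq!(stats.running_tasks, 0);
        assert_eq!(stats.current_load, 0);
        assert_eq!(stats.pending_tasks, 0);
    }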
}