scirs2_core/distributed/orchestration.rs

//! Orchestration and coordination for distributed systems
//!
//! This module provides orchestration capabilities for managing distributed
//! workflows, task coordination, and resource allocation across cluster nodes.
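//!
//! A minimal usage sketch (illustrative only; the import path below is assumed
//! from this file's location in the crate):
//!
//! ```ignore
//! use scirs2_core::distributed::orchestration::{OrchestrationEngine, OrchestratorNode, Task};
//!
//! let engine = OrchestrationEngine::new();
//! let address: std::net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
//!
//! // Register a worker node, submit a task, and let the engine assign it.
//! engine.register_node(OrchestratorNode::new("node1".to_string(), address, 4)).expect("register");
//! engine.submit_task(Task::new("task1".to_string(), "Example".to_string(), vec![])).expect("submit");
//! engine.process_task_queue().expect("schedule");
//! ```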

use crate::error::{CoreError, CoreResult, ErrorContext};
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Task status in the orchestration system
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskStatus {
    /// Task is pending execution
    Pending,
    /// Task has been assigned to a node
    Assigned { nodeid: String },
    /// Task is currently running
    Running { nodeid: String, started_at: Instant },
    /// Task completed successfully
    Completed {
        nodeid: String,
        completed_at: Instant,
    },
    /// Task failed with an error
    Failed { nodeid: String, error: String },
    /// Task was cancelled
    Cancelled,
    /// Task timed out
    TimedOut,
}

/// Priority level for tasks
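///
/// Priorities derive `Ord`, so higher-priority variants compare as greater
/// (small illustrative check):
///
/// ```ignore
/// assert!(TaskPriority::Critical > TaskPriority::Normal);
/// assert!(TaskPriority::Normal > TaskPriority::Low);
/// ```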
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TaskPriority {
    /// Low priority task
    Low = 1,
    /// Normal priority task
    Normal = 2,
    /// High priority task
    High = 3,
    /// Critical priority task
    Critical = 4,
}

/// Task definition for orchestration
#[derive(Debug, Clone)]
pub struct Task {
    pub id: String,
    pub name: String,
    pub payload: Vec<u8>,
    pub priority: TaskPriority,
    pub timeout: Option<Duration>,
    pub dependencies: Vec<String>,
    pub retry_count: usize,
    pub maxretries: usize,
    pub created_at: Instant,
    pub status: TaskStatus,
}

impl Task {
    /// Create a new task
    pub fn new(id: String, name: String, payload: Vec<u8>) -> Self {
        Self {
            id,
            name,
            payload,
            priority: TaskPriority::Normal,
            timeout: Some(Duration::from_secs(300)), // 5 minutes default
            dependencies: Vec::new(),
            retry_count: 0,
            maxretries: 3,
            created_at: Instant::now(),
            status: TaskStatus::Pending,
        }
    }

    /// Set task priority
    pub fn with_priority(mut self, priority: TaskPriority) -> Self {
        self.priority = priority;
        self
    }

    /// Set task timeout
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Add task dependencies
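    ///
    /// A small illustrative sketch (task ids are placeholders); the task becomes
    /// ready only once every listed dependency has completed:
    ///
    /// ```ignore
    /// let task = Task::new("task2".to_string(), "Follow-up".to_string(), vec![])
    ///     .with_dependencies(vec!["task1".to_string()]);
    /// ```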
    pub fn with_dependencies(mut self, dependencies: Vec<String>) -> Self {
        self.dependencies = dependencies;
        self
    }

    /// Set maximum retry count
    pub fn retries(mut self, maxretries: usize) -> Self {
        self.maxretries = maxretries;
        self
    }

    /// Check if task can be retried
    pub fn can_retry(&self) -> bool {
        self.retry_count < self.maxretries
    }

    /// Increment retry count
    pub fn increment_retry(&mut self) {
        self.retry_count += 1;
    }

    /// Check if task has timed out
    pub fn has_timed_out(&self) -> bool {
        if let Some(timeout) = self.timeout {
            match &self.status {
                TaskStatus::Running { started_at, .. } => {
                    Instant::now().duration_since(*started_at) > timeout
                }
                _ => false,
            }
        } else {
            false
        }
    }
}

/// Workflow definition containing multiple tasks
#[derive(Debug)]
pub struct Workflow {
    pub id: String,
    pub name: String,
    pub tasks: HashMap<String, Task>,
    pub execution_order: Vec<String>,
    pub status: WorkflowStatus,
    pub created_at: Instant,
}

/// Workflow execution status
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkflowStatus {
    /// Workflow is pending execution
    Pending,
    /// Workflow is currently running
    Running,
    /// Workflow completed successfully
    Completed,
    /// Workflow failed
    Failed { error: String },
    /// Workflow was cancelled
    Cancelled,
}

impl Workflow {
    /// Create a new workflow
    pub fn workflow_id(id: String, name: String) -> Self {
        Self {
            id,
            name,
            tasks: HashMap::new(),
            execution_order: Vec::new(),
            status: WorkflowStatus::Pending,
            created_at: Instant::now(),
        }
    }

    /// Create a new workflow (alias for workflow_id)
    pub fn new(id: String, name: String) -> Self {
        Self::workflow_id(id, name)
    }

    /// Add a task to the workflow
    pub fn add_task(&mut self, task: Task) {
        let taskid = task.id.clone();
        self.tasks.insert(taskid.clone(), task);
        self.execution_order.push(taskid);
    }

    /// Get tasks that are ready to execute (dependencies satisfied)
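    ///
    /// Only tasks whose dependencies have all reached `Completed` are returned.
    /// A small sketch (task ids are illustrative):
    ///
    /// ```ignore
    /// let mut workflow = Workflow::new("wf".to_string(), "Example".to_string());
    /// workflow.add_task(Task::new("a".to_string(), "First".to_string(), vec![]));
    /// workflow.add_task(
    ///     Task::new("b".to_string(), "Second".to_string(), vec![])
    ///         .with_dependencies(vec!["a".to_string()]),
    /// );
    /// // Only "a" is ready; "b" waits until "a" completes.
    /// assert_eq!(workflow.get_ready_tasks().len(), 1);
    /// ```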
    pub fn get_ready_tasks(&self) -> Vec<&Task> {
        self.tasks
            .values()
            .filter(|task| {
                matches!(task.status, TaskStatus::Pending)
                    && self.are_dependencies_satisfied(&task.id)
            })
            .collect()
    }

    fn are_dependencies_satisfied(&self, taskid: &str) -> bool {
        if let Some(task) = self.tasks.get(taskid) {
            task.dependencies.iter().all(|dep_id| {
                if let Some(dep_task) = self.tasks.get(dep_id) {
                    matches!(dep_task.status, TaskStatus::Completed { .. })
                } else {
                    false
                }
            })
        } else {
            false
        }
    }

    /// Check if workflow is complete
    pub fn is_complete(&self) -> bool {
        self.tasks.values().all(|task| {
            matches!(
                task.status,
                TaskStatus::Completed { .. } | TaskStatus::Failed { .. } | TaskStatus::Cancelled
            )
        })
    }

    /// Check if workflow has failed
    pub fn has_failed(&self) -> bool {
        self.tasks
            .values()
            .any(|task| matches!(task.status, TaskStatus::Failed { .. }))
    }
}

/// Node information for orchestration
#[derive(Debug, Clone)]
pub struct OrchestratorNode {
    pub nodeid: String,
    pub address: SocketAddr,
    pub capacity: usize,
    pub current_load: usize,
    pub capabilities: Vec<String>,
    pub last_heartbeat: Instant,
}

impl OrchestratorNode {
    /// Create a new orchestrator node
    pub fn id(nodeid: String, address: SocketAddr, capacity: usize) -> Self {
        Self {
            nodeid,
            address,
            capacity,
            current_load: 0,
            capabilities: Vec::new(),
            last_heartbeat: Instant::now(),
        }
    }

    /// Create a new orchestrator node (alias for id)
    pub fn new(nodeid: String, address: SocketAddr, capacity: usize) -> Self {
        Self::id(nodeid, address, capacity)
    }

    /// Check if node can accept more tasks
    pub fn can_accept_task(&self) -> bool {
        self.current_load < self.capacity
    }

    /// Update node heartbeat
    pub fn update_heartbeat(&mut self) {
        self.last_heartbeat = Instant::now();
    }

    /// Check if node is responsive
    pub fn is_responsive(&self, timeout: Duration) -> bool {
        Instant::now().duration_since(self.last_heartbeat) <= timeout
    }
}

/// Orchestration engine for managing distributed workflows
#[derive(Debug)]
pub struct OrchestrationEngine {
    workflows: Arc<Mutex<HashMap<String, Workflow>>>,
    nodes: Arc<Mutex<HashMap<String, OrchestratorNode>>>,
    task_queue: Arc<Mutex<VecDeque<String>>>, // Task IDs
    running_tasks: Arc<Mutex<HashMap<String, (String, Instant)>>>, // Task ID -> (Node ID, Start time)
    node_timeout: Duration,
}

impl OrchestrationEngine {
    /// Create a new orchestration engine
    pub fn new() -> Self {
        Self {
            workflows: Arc::new(Mutex::new(HashMap::new())),
            nodes: Arc::new(Mutex::new(HashMap::new())),
            task_queue: Arc::new(Mutex::new(VecDeque::new())),
            running_tasks: Arc::new(Mutex::new(HashMap::new())),
            node_timeout: Duration::from_secs(60),
        }
    }

    /// Register a node with the orchestrator
    pub fn register_node(&self, node: OrchestratorNode) -> CoreResult<()> {
        let mut nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;
        nodes.insert(node.nodeid.clone(), node);
        Ok(())
    }

    /// Submit a workflow for execution
    pub fn submit_workflow(&self, workflow: Workflow) -> CoreResult<()> {
        let workflow_id = workflow.id.clone();

        let mut workflows = self.workflows.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire workflows lock".to_string(),
            ))
        })?;

        let mut task_queue = self.task_queue.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire task queue lock".to_string(),
            ))
        })?;

        // Add ready tasks to the queue
        for task in workflow.get_ready_tasks() {
            task_queue.push_back(task.id.clone());
        }

        workflows.insert(workflow_id, workflow);
        Ok(())
    }

    /// Submit a single task for execution
    pub fn submit_task(&self, task: Task) -> CoreResult<()> {
        let taskid = task.id.clone();

        // Create a single-task workflow
        let mut workflow = Workflow::new(format!("workflow_{taskid}"), task.name.clone());
        workflow.add_task(task);

        self.submit_workflow(workflow)
    }

    /// Process the task queue and assign tasks to available nodes
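    ///
    /// Tasks are drained from the queue, sorted by priority, and assigned to the
    /// least-loaded responsive node; tasks that cannot be placed are re-queued.
    /// A hedged sketch of the call sequence (address and ids are illustrative):
    ///
    /// ```ignore
    /// let engine = OrchestrationEngine::new();
    /// let addr: std::net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
    /// engine.register_node(OrchestratorNode::new("node1".to_string(), addr, 4)).expect("register");
    /// engine.submit_task(Task::new("task1".to_string(), "Example".to_string(), vec![])).expect("submit");
    /// engine.process_task_queue().expect("schedule"); // "task1" is now Running on "node1"
    /// ```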
    pub fn process_task_queue(&self) -> CoreResult<()> {
        let mut task_queue = self.task_queue.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire task queue lock".to_string(),
            ))
        })?;

        let mut nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let mut workflows = self.workflows.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire workflows lock".to_string(),
            ))
        })?;

        let mut running_tasks = self.running_tasks.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire running tasks lock".to_string(),
            ))
        })?;

        // Process tasks in priority order
        let mut tasks_to_assign = Vec::new();
        while let Some(taskid) = task_queue.pop_front() {
            tasks_to_assign.push(taskid);
        }

        // Sort by priority
        tasks_to_assign.sort_by(|a, b| {
            let priority_a = self
                .find_task_priority(a, &workflows)
                .unwrap_or(TaskPriority::Low);
            let priority_b = self
                .find_task_priority(b, &workflows)
                .unwrap_or(TaskPriority::Low);
            priority_b.cmp(&priority_a) // Higher priority first
        });

        for taskid in tasks_to_assign {
            // Find an available node
            if let Some(available_node) = nodes
                .values_mut()
                .filter(|node| node.can_accept_task() && node.is_responsive(self.node_timeout))
                .min_by_key(|node| node.current_load)
            {
                // Assign task to node
                if let Some(task) = self.find_task_mut(&taskid, &mut workflows) {
                    task.status = TaskStatus::Running {
                        nodeid: available_node.nodeid.clone(),
                        started_at: Instant::now(),
                    };

                    available_node.current_load += 1;
                    running_tasks.insert(taskid, (available_node.nodeid.clone(), Instant::now()));
                } else {
                    // Task not found, put it back in queue
                    task_queue.push_back(taskid);
                }
            } else {
                // No available nodes, put task back in queue
                task_queue.push_back(taskid);
            }
        }

        Ok(())
    }

    fn find_task_priority(
        &self,
        taskid: &str,
        workflows: &HashMap<String, Workflow>,
    ) -> Option<TaskPriority> {
        for workflow in workflows.values() {
            if let Some(task) = workflow.tasks.get(taskid) {
                return Some(task.priority);
            }
        }
        None
    }

    fn find_task_mut<'a>(
        &self,
        taskid: &str,
        workflows: &'a mut HashMap<String, Workflow>,
    ) -> Option<&'a mut Task> {
        for workflow in workflows.values_mut() {
            if let Some(task) = workflow.tasks.get_mut(taskid) {
                return Some(task);
            }
        }
        None
    }

    /// Mark a task as completed
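    ///
    /// Completing a task frees capacity on the node that ran it and enqueues any
    /// dependent tasks that became ready. Illustrative sketch (continues the
    /// `process_task_queue` example above):
    ///
    /// ```ignore
    /// let mut engine = OrchestrationEngine::new();
    /// // ... register nodes, submit tasks, call process_task_queue() ...
    /// engine.complete_task("task1").expect("complete task");
    /// ```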
    pub fn complete_task(&mut self, taskid: &str) -> CoreResult<()> {
        let mut workflows = self.workflows.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire workflows lock".to_string(),
            ))
        })?;

        let mut nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let mut running_tasks = self.running_tasks.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire running tasks lock".to_string(),
            ))
        })?;

        // Get the nodeid from running tasks
        let nodeid = running_tasks
            .get(taskid)
            .map(|(nodeid, _)| nodeid.clone())
            .unwrap_or_else(|| "unknown".to_string());

        // Update task status
        if let Some(task) = self.find_task_mut(taskid, &mut workflows) {
            task.status = TaskStatus::Completed {
                nodeid: nodeid.clone(),
                completed_at: Instant::now(),
            };
        }

        // Update node load
        if let Some(node) = nodes.get_mut(&nodeid) {
            node.current_load = node.current_load.saturating_sub(1);
        }

        // Remove from running tasks
        running_tasks.remove(taskid);

        // Add newly ready tasks to queue
        self.queue_ready_tasks(&workflows)?;

        Ok(())
    }

    fn queue_ready_tasks(&self, workflows: &HashMap<String, Workflow>) -> CoreResult<()> {
        let mut task_queue = self.task_queue.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire task queue lock".to_string(),
            ))
        })?;

        for workflow in workflows.values() {
            for task in workflow.get_ready_tasks() {
                if !task_queue.iter().any(|id| id == &task.id) {
                    task_queue.push_back(task.id.clone());
                }
            }
        }

        Ok(())
    }

    /// Get orchestration statistics
    pub fn get_statistics(&self) -> CoreResult<OrchestrationStats> {
        let workflows = self.workflows.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire workflows lock".to_string(),
            ))
        })?;

        let nodes = self.nodes.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire nodes lock".to_string(),
            ))
        })?;

        let task_queue = self.task_queue.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire task queue lock".to_string(),
            ))
        })?;

        let running_tasks = self.running_tasks.lock().map_err(|_| {
            CoreError::InvalidState(ErrorContext::new(
                "Failed to acquire running tasks lock".to_string(),
            ))
        })?;

        let total_workflows = workflows.len();
        let pending_workflows = workflows
            .values()
            .filter(|w| matches!(w.status, WorkflowStatus::Pending))
            .count();
        let running_workflows = workflows
            .values()
            .filter(|w| matches!(w.status, WorkflowStatus::Running))
            .count();
        let completed_workflows = workflows
            .values()
            .filter(|w| matches!(w.status, WorkflowStatus::Completed))
            .count();

        let total_tasks: usize = workflows.values().map(|w| w.tasks.len()).sum();
        let pending_tasks = task_queue.len();
        let running_tasks_count = running_tasks.len();

        let total_nodes = nodes.len();
        let active_nodes = nodes
            .values()
            .filter(|n| n.is_responsive(self.node_timeout))
            .count();
        let total_capacity: usize = nodes.values().map(|n| n.capacity).sum();
        let current_load: usize = nodes.values().map(|n| n.current_load).sum();

        Ok(OrchestrationStats {
            total_workflows,
            pending_workflows,
            running_workflows,
            completed_workflows,
            total_tasks,
            pending_tasks,
            running_tasks: running_tasks_count,
            total_nodes,
            active_nodes,
            total_capacity,
            current_load,
        })
    }
}

impl Default for OrchestrationEngine {
    fn default() -> Self {
        Self::new()
    }
}

/// Orchestration statistics
#[derive(Debug)]
pub struct OrchestrationStats {
    pub total_workflows: usize,
    pub pending_workflows: usize,
    pub running_workflows: usize,
    pub completed_workflows: usize,
    pub total_tasks: usize,
    pub pending_tasks: usize,
    pub running_tasks: usize,
    pub total_nodes: usize,
    pub active_nodes: usize,
    pub total_capacity: usize,
    pub current_load: usize,
}

impl OrchestrationStats {
    /// Calculate capacity utilization percentage
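    ///
    /// Computed as `current_load / total_capacity * 100.0`; for example, a load of
    /// 75 against a total capacity of 100 yields 75.0. Returns 0.0 when no
    /// capacity is registered.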
    pub fn capacity_utilization(&self) -> f64 {
        if self.total_capacity == 0 {
            0.0
        } else {
            (self.current_load as f64 / self.total_capacity as f64) * 100.0
        }
    }

    /// Calculate node availability percentage
    pub fn node_availability(&self) -> f64 {
        if self.total_nodes == 0 {
            0.0
        } else {
            (self.active_nodes as f64 / self.total_nodes as f64) * 100.0
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    #[test]
    fn test_task_creation() {
        let task = Task::new("task1".to_string(), "Test Task".to_string(), vec![1, 2, 3])
            .with_priority(TaskPriority::High)
            .with_timeout(Duration::from_secs(60));

        assert_eq!(task.id, "task1");
        assert_eq!(task.priority, TaskPriority::High);
        assert_eq!(task.timeout, Some(Duration::from_secs(60)));
        assert!(task.can_retry());
    }

    #[test]
    fn test_workflow_creation() {
        let mut workflow = Workflow::new("wf1".to_string(), "Test Workflow".to_string());
        let task = Task::new("task1".to_string(), "Test Task".to_string(), vec![1, 2, 3]);

        workflow.add_task(task);
        assert_eq!(workflow.tasks.len(), 1);
        assert_eq!(workflow.execution_order.len(), 1);

        let ready_tasks = workflow.get_ready_tasks();
        assert_eq!(ready_tasks.len(), 1);
    }

    #[test]
    fn test_orchestrator_node() {
        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let mut node = OrchestratorNode::new("node1".to_string(), address, 10);

        assert!(node.can_accept_task());
        assert!(node.is_responsive(Duration::from_secs(30)));

        node.current_load = 10;
        assert!(!node.can_accept_task());
    }

    #[test]
    fn test_orchestration_engine() {
        let engine = OrchestrationEngine::new();

        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let node = OrchestratorNode::new("node1".to_string(), address, 5);

        assert!(engine.register_node(node).is_ok());

        let task = Task::new("task1".to_string(), "Test Task".to_string(), vec![1, 2, 3]);
        assert!(engine.submit_task(task).is_ok());

        let stats = engine.get_statistics().unwrap();
        assert_eq!(stats.total_nodes, 1);
        assert_eq!(stats.total_workflows, 1);
    }

    #[test]
    fn test_orchestration_stats() {
        let stats = OrchestrationStats {
            total_workflows: 10,
            pending_workflows: 2,
            running_workflows: 3,
            completed_workflows: 5,
            total_tasks: 50,
            pending_tasks: 10,
            running_tasks: 15,
            total_nodes: 5,
            active_nodes: 4,
            total_capacity: 100,
            current_load: 75,
        };

        assert_eq!(stats.capacity_utilization(), 75.0);
        assert_eq!(stats.node_availability(), 80.0);
    }
}