// oxigdal_workflow/dag/parallelism.rs
1//! Parallel execution planning and resource allocation.
2
use crate::dag::graph::{ResourceRequirements, WorkflowDag};
use crate::dag::topological_sort::create_execution_plan;
use crate::error::Result;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
8
/// Resource pool for parallel execution.
///
/// The total capacity available to a schedule. Converted into
/// [`AvailableResources`] at the start of each wave to track what is left.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourcePool {
    /// Total CPU cores available. `f64` so fractional cores can be expressed.
    pub total_cpu_cores: f64,
    /// Total memory in MB.
    pub total_memory_mb: u64,
    /// Number of GPUs available.
    pub total_gpus: u32,
    /// Total disk space in MB.
    pub total_disk_mb: u64,
    /// Custom resources, keyed by name (semantics are caller-defined,
    /// e.g. licenses or other named capacities).
    pub custom_resources: HashMap<String, f64>,
}
23
24impl Default for ResourcePool {
25    fn default() -> Self {
26        Self {
27            total_cpu_cores: num_cpus::get() as f64,
28            total_memory_mb: 8192,
29            total_gpus: 0,
30            total_disk_mb: 102400,
31            custom_resources: HashMap::new(),
32        }
33    }
34}
35
/// Available resources at a point in time.
///
/// A mutable view of a [`ResourcePool`]: `allocate` subtracts a task's
/// requirements from these fields and `release` adds them back.
#[derive(Debug, Clone)]
pub struct AvailableResources {
    /// CPU cores available.
    pub cpu_cores: f64,
    /// Memory available in MB.
    pub memory_mb: u64,
    /// GPUs available.
    pub gpus: u32,
    /// Disk space available in MB.
    pub disk_mb: u64,
    /// Custom resources available, keyed by name.
    pub custom_resources: HashMap<String, f64>,
}
50
51impl From<ResourcePool> for AvailableResources {
52    fn from(pool: ResourcePool) -> Self {
53        Self {
54            cpu_cores: pool.total_cpu_cores,
55            memory_mb: pool.total_memory_mb,
56            gpus: pool.total_gpus,
57            disk_mb: pool.total_disk_mb,
58            custom_resources: pool.custom_resources,
59        }
60    }
61}
62
63impl AvailableResources {
64    /// Check if the required resources can be allocated.
65    pub fn can_allocate(&self, requirements: &ResourceRequirements) -> bool {
66        if self.cpu_cores < requirements.cpu_cores {
67            return false;
68        }
69        if self.memory_mb < requirements.memory_mb {
70            return false;
71        }
72        if requirements.gpu && self.gpus == 0 {
73            return false;
74        }
75        if self.disk_mb < requirements.disk_mb {
76            return false;
77        }
78
79        // Check custom resources
80        for (key, &required_value) in &requirements.custom {
81            if let Some(&available_value) = self.custom_resources.get(key) {
82                if available_value < required_value {
83                    return false;
84                }
85            } else {
86                return false;
87            }
88        }
89
90        true
91    }
92
93    /// Allocate resources.
94    pub fn allocate(&mut self, requirements: &ResourceRequirements) -> bool {
95        if !self.can_allocate(requirements) {
96            return false;
97        }
98
99        self.cpu_cores -= requirements.cpu_cores;
100        self.memory_mb -= requirements.memory_mb;
101        if requirements.gpu {
102            self.gpus -= 1;
103        }
104        self.disk_mb -= requirements.disk_mb;
105
106        for (key, &value) in &requirements.custom {
107            if let Some(available) = self.custom_resources.get_mut(key) {
108                *available -= value;
109            }
110        }
111
112        true
113    }
114
115    /// Release resources.
116    pub fn release(&mut self, requirements: &ResourceRequirements) {
117        self.cpu_cores += requirements.cpu_cores;
118        self.memory_mb += requirements.memory_mb;
119        if requirements.gpu {
120            self.gpus += 1;
121        }
122        self.disk_mb += requirements.disk_mb;
123
124        for (key, &value) in &requirements.custom {
125            *self.custom_resources.entry(key.clone()).or_insert(0.0) += value;
126        }
127    }
128}
129
/// Parallel execution schedule.
///
/// Waves execute sequentially; tasks within a wave run in parallel, so the
/// total time estimate is the sum of the per-wave estimates.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParallelSchedule {
    /// Execution waves - each wave contains tasks that can run in parallel.
    pub waves: Vec<ExecutionWave>,
    /// Total estimated execution time in seconds (sum over all waves).
    pub estimated_time_secs: u64,
    /// Maximum parallelism (max tasks running at once).
    pub max_parallelism: usize,
}
140
/// A wave of parallel task execution.
///
/// The wave's time estimate is the longest member task timeout (60 s is
/// assumed for tasks without an explicit timeout).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionWave {
    /// Task IDs in this wave.
    pub task_ids: Vec<String>,
    /// Estimated execution time for this wave, in seconds.
    pub estimated_time_secs: u64,
}
149
150/// Create a parallel execution schedule considering resource constraints.
151pub fn create_parallel_schedule(
152    dag: &WorkflowDag,
153    resource_pool: &ResourcePool,
154) -> Result<ParallelSchedule> {
155    let execution_plan = create_execution_plan(dag)?;
156    let mut waves = Vec::new();
157    let mut total_time = 0u64;
158    let mut max_parallelism = 0usize;
159
160    for level in execution_plan {
161        let mut available_resources = AvailableResources::from(resource_pool.clone());
162        let mut current_wave = Vec::new();
163        let mut waiting_tasks = level.clone();
164        let mut wave_time = 0u64;
165
166        // First pass: allocate resources to as many tasks as possible
167        let mut i = 0;
168        while i < waiting_tasks.len() {
169            let task_id = &waiting_tasks[i];
170            if let Some(task) = dag.get_task(task_id) {
171                if available_resources.can_allocate(&task.resources) {
172                    available_resources.allocate(&task.resources);
173                    current_wave.push(task_id.clone());
174                    wave_time = wave_time.max(task.timeout_secs.unwrap_or(60));
175                    waiting_tasks.remove(i);
176                } else {
177                    i += 1;
178                }
179            } else {
180                i += 1;
181            }
182        }
183
184        if !current_wave.is_empty() {
185            max_parallelism = max_parallelism.max(current_wave.len());
186            waves.push(ExecutionWave {
187                task_ids: current_wave,
188                estimated_time_secs: wave_time,
189            });
190            total_time += wave_time;
191        }
192
193        // Process remaining tasks that couldn't fit in the first wave
194        while !waiting_tasks.is_empty() {
195            let mut available_resources = AvailableResources::from(resource_pool.clone());
196            let mut current_wave = Vec::new();
197            let mut wave_time = 0u64;
198            let mut i = 0;
199
200            while i < waiting_tasks.len() {
201                let task_id = &waiting_tasks[i];
202                if let Some(task) = dag.get_task(task_id) {
203                    if available_resources.can_allocate(&task.resources) {
204                        available_resources.allocate(&task.resources);
205                        current_wave.push(task_id.clone());
206                        wave_time = wave_time.max(task.timeout_secs.unwrap_or(60));
207                        waiting_tasks.remove(i);
208                    } else {
209                        i += 1;
210                    }
211                } else {
212                    i += 1;
213                }
214            }
215
216            if !current_wave.is_empty() {
217                max_parallelism = max_parallelism.max(current_wave.len());
218                waves.push(ExecutionWave {
219                    task_ids: current_wave,
220                    estimated_time_secs: wave_time,
221                });
222                total_time += wave_time;
223            } else {
224                // Can't make progress, break to avoid infinite loop
225                break;
226            }
227        }
228    }
229
230    Ok(ParallelSchedule {
231        waves,
232        estimated_time_secs: total_time,
233        max_parallelism,
234    })
235}
236
237/// Calculate resource utilization over time.
238pub fn calculate_resource_utilization(
239    dag: &WorkflowDag,
240    schedule: &ParallelSchedule,
241) -> Vec<ResourceUtilization> {
242    let mut utilization = Vec::new();
243    let mut current_time = 0u64;
244
245    for wave in &schedule.waves {
246        let mut cpu_used = 0.0;
247        let mut memory_used = 0u64;
248        let mut gpus_used = 0u32;
249
250        for task_id in &wave.task_ids {
251            if let Some(task) = dag.get_task(task_id) {
252                cpu_used += task.resources.cpu_cores;
253                memory_used += task.resources.memory_mb;
254                if task.resources.gpu {
255                    gpus_used += 1;
256                }
257            }
258        }
259
260        utilization.push(ResourceUtilization {
261            time_secs: current_time,
262            cpu_cores_used: cpu_used,
263            memory_mb_used: memory_used,
264            gpus_used,
265            task_count: wave.task_ids.len(),
266        });
267
268        current_time += wave.estimated_time_secs;
269    }
270
271    utilization
272}
273
/// Resource utilization at a point in time.
///
/// One sample is produced per wave, timestamped at the wave's start offset
/// relative to schedule start.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUtilization {
    /// Time in seconds from start.
    pub time_secs: u64,
    /// CPU cores in use.
    pub cpu_cores_used: f64,
    /// Memory in use (MB).
    pub memory_mb_used: u64,
    /// GPUs in use.
    pub gpus_used: u32,
    /// Number of tasks running.
    pub task_count: usize,
}
288
289/// Optimize the schedule for better resource utilization.
290pub fn optimize_schedule(
291    dag: &WorkflowDag,
292    resource_pool: &ResourcePool,
293) -> Result<ParallelSchedule> {
294    // Start with the basic schedule
295    let schedule = create_parallel_schedule(dag, resource_pool)?;
296
297    // Try to merge waves with low resource utilization
298    let mut optimized_waves = Vec::new();
299    let mut i = 0;
300
301    while i < schedule.waves.len() {
302        let mut current_wave = schedule.waves[i].clone();
303        let mut current_resources = AvailableResources::from(resource_pool.clone());
304
305        // Allocate current wave resources
306        for task_id in &current_wave.task_ids {
307            if let Some(task) = dag.get_task(task_id) {
308                current_resources.allocate(&task.resources);
309            }
310        }
311
312        // Try to merge with next wave if possible
313        if i + 1 < schedule.waves.len() {
314            let next_wave = &schedule.waves[i + 1];
315            let mut merged_tasks = Vec::new();
316
317            for task_id in &next_wave.task_ids {
318                if let Some(task) = dag.get_task(task_id) {
319                    // Check if we can pull this task into the current wave
320                    // Only if it doesn't have dependencies on tasks in the next wave
321                    if current_resources.can_allocate(&task.resources) {
322                        current_resources.allocate(&task.resources);
323                        merged_tasks.push(task_id.clone());
324                        current_wave.estimated_time_secs = current_wave
325                            .estimated_time_secs
326                            .max(task.timeout_secs.unwrap_or(60));
327                    }
328                }
329            }
330
331            current_wave.task_ids.extend(merged_tasks);
332        }
333
334        optimized_waves.push(current_wave);
335        i += 1;
336    }
337
338    // Recalculate statistics
339    let total_time = optimized_waves.iter().map(|w| w.estimated_time_secs).sum();
340    let max_parallelism = optimized_waves
341        .iter()
342        .map(|w| w.task_ids.len())
343        .max()
344        .unwrap_or(0);
345
346    Ok(ParallelSchedule {
347        waves: optimized_waves,
348        estimated_time_secs: total_time,
349        max_parallelism,
350    })
351}
352
// NOTE(review): prefer `std::thread::available_parallelism()` (stable since
// Rust 1.59) over the external `num_cpus` crate; if `num_cpus` is kept, it
// must be declared in Cargo.toml.