// oxigdal_workflow/dag/parallelism.rs
use crate::dag::graph::{ResourceRequirements, WorkflowDag};
use crate::dag::topological_sort::create_execution_plan;
use crate::error::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
8
/// Fixed total capacity available to the scheduler.
///
/// Each execution wave's mutable budget (`AvailableResources`) is derived
/// from a copy of these totals.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourcePool {
    /// Total CPU cores (fractional values allowed).
    pub total_cpu_cores: f64,
    /// Total memory, in megabytes.
    pub total_memory_mb: u64,
    /// Total number of GPUs.
    pub total_gpus: u32,
    /// Total disk space, in megabytes.
    pub total_disk_mb: u64,
    /// Capacities of arbitrary named resources, keyed by name.
    pub custom_resources: HashMap<String, f64>,
}
23
24impl Default for ResourcePool {
25 fn default() -> Self {
26 Self {
27 total_cpu_cores: num_cpus::get() as f64,
28 total_memory_mb: 8192,
29 total_gpus: 0,
30 total_disk_mb: 102400,
31 custom_resources: HashMap::new(),
32 }
33 }
34}
35
/// Mutable resource budget tracked while packing tasks into a wave.
///
/// Starts as a copy of the pool totals (see `From<ResourcePool>`) and is
/// decremented by `allocate` and incremented by `release`.
#[derive(Debug, Clone)]
pub struct AvailableResources {
    /// CPU cores still unallocated.
    pub cpu_cores: f64,
    /// Memory (MB) still unallocated.
    pub memory_mb: u64,
    /// GPUs still unallocated.
    pub gpus: u32,
    /// Disk (MB) still unallocated.
    pub disk_mb: u64,
    /// Remaining quantities of named custom resources.
    pub custom_resources: HashMap<String, f64>,
}
50
51impl From<ResourcePool> for AvailableResources {
52 fn from(pool: ResourcePool) -> Self {
53 Self {
54 cpu_cores: pool.total_cpu_cores,
55 memory_mb: pool.total_memory_mb,
56 gpus: pool.total_gpus,
57 disk_mb: pool.total_disk_mb,
58 custom_resources: pool.custom_resources,
59 }
60 }
61}
62
63impl AvailableResources {
64 pub fn can_allocate(&self, requirements: &ResourceRequirements) -> bool {
66 if self.cpu_cores < requirements.cpu_cores {
67 return false;
68 }
69 if self.memory_mb < requirements.memory_mb {
70 return false;
71 }
72 if requirements.gpu && self.gpus == 0 {
73 return false;
74 }
75 if self.disk_mb < requirements.disk_mb {
76 return false;
77 }
78
79 for (key, &required_value) in &requirements.custom {
81 if let Some(&available_value) = self.custom_resources.get(key) {
82 if available_value < required_value {
83 return false;
84 }
85 } else {
86 return false;
87 }
88 }
89
90 true
91 }
92
93 pub fn allocate(&mut self, requirements: &ResourceRequirements) -> bool {
95 if !self.can_allocate(requirements) {
96 return false;
97 }
98
99 self.cpu_cores -= requirements.cpu_cores;
100 self.memory_mb -= requirements.memory_mb;
101 if requirements.gpu {
102 self.gpus -= 1;
103 }
104 self.disk_mb -= requirements.disk_mb;
105
106 for (key, &value) in &requirements.custom {
107 if let Some(available) = self.custom_resources.get_mut(key) {
108 *available -= value;
109 }
110 }
111
112 true
113 }
114
115 pub fn release(&mut self, requirements: &ResourceRequirements) {
117 self.cpu_cores += requirements.cpu_cores;
118 self.memory_mb += requirements.memory_mb;
119 if requirements.gpu {
120 self.gpus += 1;
121 }
122 self.disk_mb += requirements.disk_mb;
123
124 for (key, &value) in &requirements.custom {
125 *self.custom_resources.entry(key.clone()).or_insert(0.0) += value;
126 }
127 }
128}
129
/// The complete wave-ordered schedule produced for a workflow DAG.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParallelSchedule {
    /// Waves in execution order; tasks within a wave run concurrently.
    pub waves: Vec<ExecutionWave>,
    /// Sum of all wave estimates — the end-to-end time if waves run
    /// back-to-back.
    pub estimated_time_secs: u64,
    /// Largest number of tasks in any single wave.
    pub max_parallelism: usize,
}
140
/// One batch of tasks scheduled to run concurrently.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionWave {
    /// Ids of the tasks in this wave.
    pub task_ids: Vec<String>,
    /// Estimated wave duration: the longest task timeout in the wave.
    pub estimated_time_secs: u64,
}
149
150pub fn create_parallel_schedule(
152 dag: &WorkflowDag,
153 resource_pool: &ResourcePool,
154) -> Result<ParallelSchedule> {
155 let execution_plan = create_execution_plan(dag)?;
156 let mut waves = Vec::new();
157 let mut total_time = 0u64;
158 let mut max_parallelism = 0usize;
159
160 for level in execution_plan {
161 let mut available_resources = AvailableResources::from(resource_pool.clone());
162 let mut current_wave = Vec::new();
163 let mut waiting_tasks = level.clone();
164 let mut wave_time = 0u64;
165
166 let mut i = 0;
168 while i < waiting_tasks.len() {
169 let task_id = &waiting_tasks[i];
170 if let Some(task) = dag.get_task(task_id) {
171 if available_resources.can_allocate(&task.resources) {
172 available_resources.allocate(&task.resources);
173 current_wave.push(task_id.clone());
174 wave_time = wave_time.max(task.timeout_secs.unwrap_or(60));
175 waiting_tasks.remove(i);
176 } else {
177 i += 1;
178 }
179 } else {
180 i += 1;
181 }
182 }
183
184 if !current_wave.is_empty() {
185 max_parallelism = max_parallelism.max(current_wave.len());
186 waves.push(ExecutionWave {
187 task_ids: current_wave,
188 estimated_time_secs: wave_time,
189 });
190 total_time += wave_time;
191 }
192
193 while !waiting_tasks.is_empty() {
195 let mut available_resources = AvailableResources::from(resource_pool.clone());
196 let mut current_wave = Vec::new();
197 let mut wave_time = 0u64;
198 let mut i = 0;
199
200 while i < waiting_tasks.len() {
201 let task_id = &waiting_tasks[i];
202 if let Some(task) = dag.get_task(task_id) {
203 if available_resources.can_allocate(&task.resources) {
204 available_resources.allocate(&task.resources);
205 current_wave.push(task_id.clone());
206 wave_time = wave_time.max(task.timeout_secs.unwrap_or(60));
207 waiting_tasks.remove(i);
208 } else {
209 i += 1;
210 }
211 } else {
212 i += 1;
213 }
214 }
215
216 if !current_wave.is_empty() {
217 max_parallelism = max_parallelism.max(current_wave.len());
218 waves.push(ExecutionWave {
219 task_ids: current_wave,
220 estimated_time_secs: wave_time,
221 });
222 total_time += wave_time;
223 } else {
224 break;
226 }
227 }
228 }
229
230 Ok(ParallelSchedule {
231 waves,
232 estimated_time_secs: total_time,
233 max_parallelism,
234 })
235}
236
237pub fn calculate_resource_utilization(
239 dag: &WorkflowDag,
240 schedule: &ParallelSchedule,
241) -> Vec<ResourceUtilization> {
242 let mut utilization = Vec::new();
243 let mut current_time = 0u64;
244
245 for wave in &schedule.waves {
246 let mut cpu_used = 0.0;
247 let mut memory_used = 0u64;
248 let mut gpus_used = 0u32;
249
250 for task_id in &wave.task_ids {
251 if let Some(task) = dag.get_task(task_id) {
252 cpu_used += task.resources.cpu_cores;
253 memory_used += task.resources.memory_mb;
254 if task.resources.gpu {
255 gpus_used += 1;
256 }
257 }
258 }
259
260 utilization.push(ResourceUtilization {
261 time_secs: current_time,
262 cpu_cores_used: cpu_used,
263 memory_mb_used: memory_used,
264 gpus_used,
265 task_count: wave.task_ids.len(),
266 });
267
268 current_time += wave.estimated_time_secs;
269 }
270
271 utilization
272}
273
/// Aggregate resource demand of one wave, sampled at its start time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUtilization {
    /// Wave start time, in seconds from schedule start.
    pub time_secs: u64,
    /// Summed CPU-core demand of the wave's tasks.
    pub cpu_cores_used: f64,
    /// Summed memory demand, in megabytes.
    pub memory_mb_used: u64,
    /// Number of tasks in the wave requesting a GPU.
    pub gpus_used: u32,
    /// Total task count in the wave.
    pub task_count: usize,
}
288
289pub fn optimize_schedule(
291 dag: &WorkflowDag,
292 resource_pool: &ResourcePool,
293) -> Result<ParallelSchedule> {
294 let schedule = create_parallel_schedule(dag, resource_pool)?;
296
297 let mut optimized_waves = Vec::new();
299 let mut i = 0;
300
301 while i < schedule.waves.len() {
302 let mut current_wave = schedule.waves[i].clone();
303 let mut current_resources = AvailableResources::from(resource_pool.clone());
304
305 for task_id in ¤t_wave.task_ids {
307 if let Some(task) = dag.get_task(task_id) {
308 current_resources.allocate(&task.resources);
309 }
310 }
311
312 if i + 1 < schedule.waves.len() {
314 let next_wave = &schedule.waves[i + 1];
315 let mut merged_tasks = Vec::new();
316
317 for task_id in &next_wave.task_ids {
318 if let Some(task) = dag.get_task(task_id) {
319 if current_resources.can_allocate(&task.resources) {
322 current_resources.allocate(&task.resources);
323 merged_tasks.push(task_id.clone());
324 current_wave.estimated_time_secs = current_wave
325 .estimated_time_secs
326 .max(task.timeout_secs.unwrap_or(60));
327 }
328 }
329 }
330
331 current_wave.task_ids.extend(merged_tasks);
332 }
333
334 optimized_waves.push(current_wave);
335 i += 1;
336 }
337
338 let total_time = optimized_waves.iter().map(|w| w.estimated_time_secs).sum();
340 let max_parallelism = optimized_waves
341 .iter()
342 .map(|w| w.task_ids.len())
343 .max()
344 .unwrap_or(0);
345
346 Ok(ParallelSchedule {
347 waves: optimized_waves,
348 estimated_time_secs: total_time,
349 max_parallelism,
350 })
351}
352
353