cpm_rs/
scheduler.rs

1use std::collections::HashMap;
2use std::cmp::Ordering::Equal;
3
4use crate::customtask::CustomTask;
5use crate::path::Path;
6
/// Lifecycle marker telling whether the scheduler's results can be trusted.
#[derive(PartialEq, Debug)]
enum SchedulerState {
	/// Initial state of a freshly created scheduler; nothing was loaded yet.
	Unknown,
	/// The task set was modified, so the values must be recalculated.
	Edited,
	/// Every calculation has completed and the results may be read.
	Ready,
}
17
18/// The scheduler implements the basic functionality to
19/// calculate critical paths plus the number of
20/// maximum parallel jobs at a time.
21#[derive(Debug)]
22pub struct Scheduler<T>
23where T: From<i8>
24	+ std::clone::Clone
25	+ std::marker::Copy
26	+ std::ops::Sub::<Output = T>
27	+ std::ops::Add<Output = T>
28	+ std::fmt::Display
29	+ std::fmt::Debug
30	+ std::cmp::PartialOrd
31	+ std::ops::AddAssign
32{
33	tasks: HashMap<String, CustomTask<T>>,
34	state: SchedulerState,
35}
36
37impl <T> Scheduler<T>
38where T: From<i8>
39	+ std::clone::Clone
40	+ std::marker::Copy
41	+ std::ops::Sub::<Output = T>
42	+ std::ops::Add<Output = T>
43	+ std::fmt::Display
44	+ std::fmt::Debug
45	+ std::cmp::PartialOrd
46	+ std::ops::AddAssign
47{
48	pub fn new() -> Self {
49		Scheduler {
50			tasks: HashMap::new(),
51			state: SchedulerState::Unknown,
52		}
53	}
54
55	/// Ignites all the calculations.
56	pub fn schedule(&mut self) -> Result<(), String>{
57		self.calculate()?;
58		// self.print_output();
59		Ok(())
60	}
61
62	/// Recalculate all parameters without providing new tasks.
63	pub fn calculate(&mut self) -> Result<(), String> {
64		self.calculate_es_ef()?;
65		self.calculate_ls_lf()?;
66		self.state = SchedulerState::Ready;
67		Ok(())
68	}
69
70	pub fn add_task(&mut self, task: CustomTask<T>) -> Result<(), String> {
71		match self.check_task_duplication(&task) {
72			Ok(_) => { self.tasks.insert(task.get_id(), task); return Ok(()); },
73			Err(e) => { return Err(format!("Failed to add task: {}", e)); }
74		}
75	}
76
77	/// Sets up a list of tasks, overwriting the already listed ones.
78	pub fn fill_tasklist(&mut self, task_list: Vec<CustomTask<T>>) -> Result<(), String> {
79		self.state = SchedulerState::Edited;
80		let mut new_tasks: HashMap<String, CustomTask<T>> = HashMap::new();
81		for task in &task_list {
82			if new_tasks.contains_key(&task.get_id()) {
83				return Err(format!("task ID duplication: {}", task.get_id()));
84			} else {
85				new_tasks.insert(task.get_id(), task.clone());
86			}
87		}
88		self.tasks = new_tasks;
89		Ok(())
90	}
91
92	fn check_task_duplication(&self, ref_task: &CustomTask<T>) -> Result<(), String> {
93		if self.tasks.contains_key(&ref_task.get_id()) {
94			return Err(format!("task ID is already added: {}", ref_task.get_id()));
95		} else {
96			return Ok(());
97		}
98	}
99
100	/// Gets a task by it's name.
101	pub fn get_task_by_name(&self, task_name: &String) -> Option<&CustomTask<T>> {
102		self.tasks.get(task_name)
103	}
104
105	/// This one makes the scheduler get the Edited state if the task is found.
106	pub fn get_mut_task_by_name(&mut self, task_name: &String)
107	-> Option<&mut CustomTask<T>> {
108		self.tasks.get_mut(task_name)
109	}
110
111	/// Gets dependencies of a task.
112	pub fn get_task_dependencies(&self, task_ref: &CustomTask<T>) -> Vec<&CustomTask<T>> {
113		let mut dependencies: Vec<&CustomTask<T>> = vec!{};
114		for dep_name in &task_ref.get_dependencies() {
115			match self.get_task_by_name(dep_name) {
116				Some(dep_ref) => dependencies.push(dep_ref),
117				None => {},
118			}
119		}
120		dependencies
121	}
122
123	/// Gets successors of a task.
124	pub fn get_task_successors(&self, task_ref: &CustomTask<T>) -> Vec<&CustomTask<T>> {
125		let mut successors: Vec<&CustomTask<T>> = vec!{};
126		for (_, task) in &self.tasks {
127			if task.get_dependencies().contains(&task_ref.get_id()) {
128				successors.push(&task);
129			}
130		}
131		successors
132	}
133
134	// TODO: optimize
135	fn calculate_es_ef(&mut self) -> Result<(), String> {
136		debug!("Calculating ES-EF");
137		let mut sorting_list = self.tasks.clone();
138		loop {
139			for (id, task) in sorting_list.clone() {
140				debug!("Task taken: {}", id);
141				let deps = self.get_task_dependencies(&task);
142				let successor_count = self.get_task_successors(&task).len();
143
144				if deps.len() == 0 {
145					match self.get_mut_task_by_name(&id) {
146						None => {}, // TODO
147						Some(original_task) => {
148							original_task.set_early_start(0.into())
149								.expect("Could not set early start.");
150							original_task.set_early_finish(
151								original_task.get_duration()
152							).expect("Could not set early finish.");
153						}
154					}
155					debug!("ESEF SP calculated: \n{:?}", self.get_task_by_name(&id));
156				} else {
157					let mut max_dep_ef: T = 0.into();
158					let mut invalid_deps = 0;
159					for dep in deps {
160						if dep.get_early_finish() == None {
161							invalid_deps += 1;
162							break;
163						}
164						match dep.get_early_finish() {
165							None => {
166								return Err("Uncalculated early finish found.".to_string());
167							},
168							Some(ef) => {
169								//max_dep_ef = max_dep_ef.max(ef);
170								if ef > max_dep_ef {
171									max_dep_ef = ef;
172								}
173							},
174						}
175					}
176					debug!("Invalid deps: {}", invalid_deps);
177					if invalid_deps == 0 {
178						match self.get_mut_task_by_name(&id) {
179							None => {},
180							Some(original_task) => {
181								original_task.set_early_start(max_dep_ef)
182									.expect("Could not set early start.");
183								original_task.set_early_finish(
184									max_dep_ef + original_task.get_duration()
185								).expect("Could not set early finish.");
186							}
187						}
188						debug!("ESEF calculated: \n{:?}", self.get_task_by_name(&id));
189					} else {
190						continue;
191					}
192				}
193				if successor_count == 0 {
194					debug!("No successors found for {}", id);
195					match self.get_mut_task_by_name(&id) {
196						None => { },
197						Some(original_task) => {
198							original_task.set_late_finish(
199								original_task.get_early_finish().unwrap()
200							).expect("Could not set late finish.");
201							original_task.set_late_start(
202								original_task.get_early_start().unwrap()
203							).expect("Could not set late start.");
204						}
205					}
206					debug!("ESEF EP calculated: \n{:?}", self.get_task_by_name(&id));
207				}
208				sorting_list.remove(&id);
209				break;
210			}
211			if sorting_list.len() == 0 { break; }
212		}
213		Ok(())
214	}
215
216	// TODO: optimize
217	fn calculate_ls_lf(&mut self) -> Result<(), String> {
218		debug!("Calculating LS-LF");
219		let mut sorting_list = self.tasks.clone();
220		loop {
221			for (id, task) in sorting_list.clone() {
222				debug!("Task taken: {}", id);
223				let successors = self.get_task_successors(&task);
224				if successors.len() > 0 {
225					let mut min_successor_ls: Option<T> = None;
226					for successor in successors {
227						match successor.get_late_start() {
228							None => {
229								min_successor_ls = None;
230								break;
231							},
232							Some(ls) => {
233								if ls < 0.into() {
234									min_successor_ls = None;
235									break;
236								}
237								match min_successor_ls {
238									None => {
239										min_successor_ls = Some(ls);
240									},
241									Some(current_min) => {
242										if current_min < ls {
243											min_successor_ls = Some(current_min);
244										} else {
245											min_successor_ls = Some(ls);
246										}
247									}
248								}
249							},
250						}
251						debug!("min ls: {:?}", min_successor_ls);
252					}
253					match min_successor_ls {
254						Some(min) => {
255							match self.get_mut_task_by_name(&task.get_id()) {
256								None => {}, // TODO
257								Some(original_task) => {
258									original_task.set_late_finish(min)
259										.expect("Could not set late finish.");
260									original_task.set_late_start(
261										min - original_task.get_duration()
262									).expect("Could not set late start.");
263								}
264							}
265							debug!("LSLF calculated: \n{:?}", id);
266							sorting_list.remove(&id);
267							break;
268						},
269						None => {},
270					}
271				} else {
272					sorting_list.remove(&id);
273					break;
274				}
275			}
276			if sorting_list.len() == 0 { break; }
277		}
278		Ok(())
279	}
280
281	/// Get all the entry points of the graph.
282	pub fn get_startpoints(&self) -> Vec<&CustomTask<T>> {
283		let mut startpoints: Vec<&CustomTask<T>> = vec!{};
284		for (_, task) in &self.tasks {
285			if self.get_task_dependencies(&task).len() == 0 {
286				startpoints.push(&task);
287			}
288		}
289		startpoints
290	}
291
292	/// Get all the end points of the graph.
293	pub fn get_endpoints(&self) -> Vec<&CustomTask<T>> {
294		let mut endpoints: Vec<&CustomTask<T>> = vec!{};
295		for (_, task) in &self.tasks {
296			if self.get_task_successors(&task).len() == 0 {
297				endpoints.push(&task);
298			}
299		}
300		endpoints
301	}
302
303	/// Returns all paths that are able to trace from the given task.
304	pub fn get_paths_from_task(&self, start_point: &CustomTask<T>, level: u32)
305	-> Vec<Path<T>> {
306		let mut head = start_point;
307		let mut base_path = Path::new();
308		let mut found_paths: Vec<Path<T>> = vec!{};
309		base_path.add_task(start_point);
310		loop {
311			let deps = self.get_task_dependencies(&head);
312			if deps.len() > 1 {
313				for dep in &deps {
314					let sub_paths = self.get_paths_from_task(&dep, level + 1);
315					for path in sub_paths {
316						let mut concatenated_path = base_path.clone();
317						concatenated_path.join_path(&path);
318						found_paths.push(concatenated_path);
319					}
320				}
321				break;
322			}
323			if self.get_task_dependencies(&head).len() == 0 {
324				found_paths.push(base_path);
325				break;
326			}
327			if deps.len() == 1 {
328				base_path.add_task(deps[0]);
329				head = deps[0];
330			}
331		}
332		found_paths
333	}
334
335	/// Gets all the paths in the graph.
336	/// Attention! Does not check the possible cycles in dependencies!
337	/// TODO: make it parallel.
338	pub fn get_all_paths(&self) -> Vec<Path<T>> {
339		let mut paths: Vec<Path<T>> = vec!{};
340		let endpoints = self.get_endpoints();
341		for task in endpoints {
342			paths.append(&mut self.get_paths_from_task(&task, 0));
343		}
344		paths
345	}
346
347	pub fn get_critical_paths(&self) -> Vec<Path<T>> {
348		let mut paths = self.get_all_paths();
349		let mut candidates: Vec<Path<T>> = vec!{};
350		let mut critical_paths: Vec<Path<T>> = vec!{};
351		let mut max_length: T = 0.into();
352		for path in &mut paths {
353			// reverse paths
354			path.reverse_tasks();
355			if path.get_total_float() == 0.into() {
356				candidates.push(path.clone());
357				if path.get_dur() > max_length {
358					max_length = path.get_dur();
359				}
360			}
361		}
362		for path in candidates {
363			if path.get_dur() == max_length {
364				critical_paths.push(path);
365			}
366		}
367		debug!("Critical paths: {:?}", critical_paths.len());
368		critical_paths
369	}
370
371	/// Calculates the maximum number of parallel jobs at a time.
372	/// Scheduler has to be in ready state.
373	pub fn get_parallelism(&self) -> Result<u32, String> {
374		debug!("Getting parallel task count:");
375		if self.state == SchedulerState::Ready {
376			let mut ef_list: Vec<Option<T>> = vec!{Some(0.into())};
377			let mut max_parallel = 0;
378			for (_, task) in &self.tasks {
379				match task.get_early_finish() {
380					Some(_) => {
381						ef_list.push(task.get_early_finish());
382					},
383					None => {
384						return Err(
385							format!(
386								"Some of the early finish values have not been calculated! Task: {}"
387								, task.get_id()
388							)
389						);
390					},
391				}
392			}
393			ef_list.dedup();
394			ef_list.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Equal));
395			for ef_idx in 0..ef_list.len() - 1 {
396				let section_start = ef_list[ef_idx].unwrap();
397				let section_end = ef_list[ef_idx + 1].unwrap();
398				let section_parallel = self.tasks.iter().filter(
399					| (_, task) |
400					task.get_early_start().unwrap() <= section_start
401					&& section_end <= task.get_early_finish().unwrap()
402					).collect::<Vec<(&String, &CustomTask<T>)>>();
403				if section_parallel.len() > max_parallel {
404					max_parallel = section_parallel.len();
405					debug!("Section: {} .. {}", section_start, section_end);
406					debug!("section tasks: {:?}", section_parallel);
407				}
408			}
409			return Ok(max_parallel.try_into().unwrap());
410		} else {
411			return Err(
412				format!("Scheduler is in state {:?} instead of being ready.", self.state)
413			);
414		}
415	}
416
417	fn print_output(&self) {
418		let critical_paths = self.get_critical_paths();
419		println!("Critical paths: {}", critical_paths.len());
420		for path in &critical_paths {
421			println!("\tCritical path: {}", path.get_path_string());
422			println!("\tPath duration: {}", path.get_dur());
423		}
424		println!("Number of maximum parallel jobs: {}", self.get_parallelism().unwrap());
425	}
426
427}
428