Skip to main content

oxihuman_core/
task_graph.rs

1// Copyright (C) 2026 COOLJAPAN OU (Team KitaSan)
2// SPDX-License-Identifier: Apache-2.0
3
4//! Async-style task dependency graph scheduler (synchronous execution, DAG-based ordering).
5
6#[allow(dead_code)]
7#[derive(Debug, Clone, PartialEq)]
8pub enum TaskStatus {
9    Pending,
10    Ready,
11    Running,
12    Completed,
13    Failed(String),
14}
15
16#[allow(dead_code)]
17#[derive(Debug, Clone)]
18pub struct Task {
19    pub id: usize,
20    pub name: String,
21    pub dependencies: Vec<usize>,
22    pub status: TaskStatus,
23    pub result: Option<String>,
24    pub priority: u32,
25}
26
27#[allow(dead_code)]
28#[derive(Debug, Clone)]
29pub struct TaskGraph {
30    pub tasks: Vec<Task>,
31    pub next_id: usize,
32}
33
34/// Create an empty task graph.
35#[allow(dead_code)]
36pub fn new_task_graph() -> TaskGraph {
37    TaskGraph {
38        tasks: Vec::new(),
39        next_id: 0,
40    }
41}
42
43/// Add a task to the graph and return its assigned ID.
44#[allow(dead_code)]
45pub fn add_task(graph: &mut TaskGraph, name: &str, deps: Vec<usize>, priority: u32) -> usize {
46    let id = graph.next_id;
47    graph.next_id += 1;
48    graph.tasks.push(Task {
49        id,
50        name: name.to_string(),
51        dependencies: deps,
52        status: TaskStatus::Pending,
53        result: None,
54        priority,
55    });
56    id
57}
58
59/// Return IDs of tasks whose dependencies are all Completed and whose own status is Pending.
60#[allow(dead_code)]
61pub fn get_ready_tasks(graph: &TaskGraph) -> Vec<usize> {
62    let completed_ids: std::collections::HashSet<usize> = graph
63        .tasks
64        .iter()
65        .filter(|t| t.status == TaskStatus::Completed)
66        .map(|t| t.id)
67        .collect();
68
69    graph
70        .tasks
71        .iter()
72        .filter(|t| {
73            t.status == TaskStatus::Pending
74                && t.dependencies.iter().all(|dep| completed_ids.contains(dep))
75        })
76        .map(|t| t.id)
77        .collect()
78}
79
80/// Mark a task as Completed with an optional result value.
81#[allow(dead_code)]
82pub fn mark_complete(graph: &mut TaskGraph, id: usize, result: Option<String>) {
83    if let Some(task) = graph.tasks.iter_mut().find(|t| t.id == id) {
84        task.status = TaskStatus::Completed;
85        task.result = result;
86    }
87}
88
89/// Mark a task as Failed with an error message.
90#[allow(dead_code)]
91pub fn mark_failed(graph: &mut TaskGraph, id: usize, error: String) {
92    if let Some(task) = graph.tasks.iter_mut().find(|t| t.id == id) {
93        task.status = TaskStatus::Failed(error);
94    }
95}
96
97/// Compute a topological order using Kahn's algorithm. Returns Err if a cycle is detected.
98#[allow(dead_code)]
99pub fn topological_order(graph: &TaskGraph) -> Result<Vec<usize>, String> {
100    let n = graph.tasks.len();
101    if n == 0 {
102        return Ok(vec![]);
103    }
104
105    // Build id → index map
106    let id_to_idx: std::collections::HashMap<usize, usize> = graph
107        .tasks
108        .iter()
109        .enumerate()
110        .map(|(i, t)| (t.id, i))
111        .collect();
112
113    let mut in_degree = vec![0usize; n];
114    let mut adj: Vec<Vec<usize>> = vec![vec![]; n]; // adj[i] = list of indices that depend on i
115
116    for (idx, task) in graph.tasks.iter().enumerate() {
117        for &dep_id in &task.dependencies {
118            if let Some(&dep_idx) = id_to_idx.get(&dep_id) {
119                adj[dep_idx].push(idx);
120                in_degree[idx] += 1;
121            }
122        }
123    }
124
125    let mut queue: std::collections::VecDeque<usize> = in_degree
126        .iter()
127        .enumerate()
128        .filter(|(_, &d)| d == 0)
129        .map(|(i, _)| i)
130        .collect();
131
132    let mut order = Vec::with_capacity(n);
133
134    while let Some(idx) = queue.pop_front() {
135        order.push(graph.tasks[idx].id);
136        for &neighbor in &adj[idx] {
137            in_degree[neighbor] -= 1;
138            if in_degree[neighbor] == 0 {
139                queue.push_back(neighbor);
140            }
141        }
142    }
143
144    if order.len() != n {
145        Err("Cycle detected in task graph".to_string())
146    } else {
147        Ok(order)
148    }
149}
150
151/// Execute tasks sequentially in topological order, calling `executor` for each.
152#[allow(dead_code)]
153pub fn execute_sequential(
154    graph: &mut TaskGraph,
155    executor: &mut dyn FnMut(usize, &str) -> Result<String, String>,
156) {
157    let order = match topological_order(graph) {
158        Ok(o) => o,
159        Err(e) => {
160            // Mark all as failed
161            for task in &mut graph.tasks {
162                task.status = TaskStatus::Failed(e.clone());
163            }
164            return;
165        }
166    };
167
168    for id in order {
169        let (name, deps_ok) = {
170            let task = match graph.tasks.iter().find(|t| t.id == id) {
171                Some(t) => t,
172                None => continue,
173            };
174            let deps_ok = task.dependencies.iter().all(|dep_id| {
175                graph
176                    .tasks
177                    .iter()
178                    .find(|t| t.id == *dep_id)
179                    .map(|t| t.status == TaskStatus::Completed)
180                    .unwrap_or(false)
181            });
182            (task.name.clone(), deps_ok)
183        };
184
185        if !deps_ok {
186            mark_failed(graph, id, "dependency failed".to_string());
187            continue;
188        }
189
190        if let Some(task) = graph.tasks.iter_mut().find(|t| t.id == id) {
191            task.status = TaskStatus::Running;
192        }
193
194        match executor(id, &name) {
195            Ok(result) => mark_complete(graph, id, Some(result)),
196            Err(err) => mark_failed(graph, id, err),
197        }
198    }
199}
200
201/// Total number of tasks in the graph.
202#[allow(dead_code)]
203pub fn task_count(graph: &TaskGraph) -> usize {
204    graph.tasks.len()
205}
206
207/// Number of completed tasks.
208#[allow(dead_code)]
209pub fn completed_count(graph: &TaskGraph) -> usize {
210    graph
211        .tasks
212        .iter()
213        .filter(|t| t.status == TaskStatus::Completed)
214        .count()
215}
216
217/// Number of failed tasks.
218#[allow(dead_code)]
219pub fn failed_count(graph: &TaskGraph) -> usize {
220    graph
221        .tasks
222        .iter()
223        .filter(|t| matches!(t.status, TaskStatus::Failed(_)))
224        .count()
225}
226
227/// Number of pending tasks.
228#[allow(dead_code)]
229pub fn pending_count(graph: &TaskGraph) -> usize {
230    graph
231        .tasks
232        .iter()
233        .filter(|t| t.status == TaskStatus::Pending)
234        .count()
235}
236
237/// Reset all tasks to Pending status.
238#[allow(dead_code)]
239pub fn reset_graph(graph: &mut TaskGraph) {
240    for task in &mut graph.tasks {
241        task.status = TaskStatus::Pending;
242        task.result = None;
243    }
244}
245
246/// Serialize the graph to a JSON string.
247#[allow(dead_code)]
248pub fn graph_to_json(graph: &TaskGraph) -> String {
249    let tasks_json: Vec<String> = graph
250        .tasks
251        .iter()
252        .map(|t| {
253            let status = match &t.status {
254                TaskStatus::Pending => "\"Pending\"".to_string(),
255                TaskStatus::Ready => "\"Ready\"".to_string(),
256                TaskStatus::Running => "\"Running\"".to_string(),
257                TaskStatus::Completed => "\"Completed\"".to_string(),
258                TaskStatus::Failed(e) => format!("{{\"Failed\":{e:?}}}"),
259            };
260            let deps: Vec<String> = t.dependencies.iter().map(|d| d.to_string()).collect();
261            let result = match &t.result {
262                Some(r) => format!("{r:?}"),
263                None => "null".to_string(),
264            };
265            format!(
266                r#"{{"id":{},"name":{:?},"dependencies":[{}],"status":{},"result":{},"priority":{}}}"#,
267                t.id,
268                t.name,
269                deps.join(","),
270                status,
271                result,
272                t.priority,
273            )
274        })
275        .collect();
276
277    format!(
278        r#"{{"tasks":[{}],"next_id":{}}}"#,
279        tasks_json.join(","),
280        graph.next_id
281    )
282}
283
284/// Return the length of the critical (longest dependency chain) path.
285#[allow(dead_code)]
286pub fn critical_path_length(graph: &TaskGraph) -> usize {
287    let id_to_idx: std::collections::HashMap<usize, usize> = graph
288        .tasks
289        .iter()
290        .enumerate()
291        .map(|(i, t)| (t.id, i))
292        .collect();
293
294    let order = match topological_order(graph) {
295        Ok(o) => o,
296        Err(_) => return 0,
297    };
298
299    let mut depth = vec![0usize; graph.tasks.len()];
300
301    for id in &order {
302        if let Some(&idx) = id_to_idx.get(id) {
303            let task = &graph.tasks[idx];
304            let max_dep_depth = task
305                .dependencies
306                .iter()
307                .filter_map(|dep_id| id_to_idx.get(dep_id))
308                .map(|&di| depth[di])
309                .max()
310                .unwrap_or(0);
311            depth[idx] = max_dep_depth + 1;
312        }
313    }
314
315    depth.into_iter().max().unwrap_or(0)
316}
317
318#[cfg(test)]
319mod tests {
320    use super::*;
321
322    #[test]
323    fn test_new_task_graph_empty() {
324        let g = new_task_graph();
325        assert_eq!(task_count(&g), 0);
326    }
327
328    #[test]
329    fn test_add_task_returns_incremental_ids() {
330        let mut g = new_task_graph();
331        let a = add_task(&mut g, "A", vec![], 0);
332        let b = add_task(&mut g, "B", vec![], 0);
333        assert_eq!(a, 0);
334        assert_eq!(b, 1);
335        assert_eq!(task_count(&g), 2);
336    }
337
338    #[test]
339    fn test_get_ready_tasks_no_deps() {
340        let mut g = new_task_graph();
341        add_task(&mut g, "A", vec![], 0);
342        add_task(&mut g, "B", vec![], 0);
343        let ready = get_ready_tasks(&g);
344        assert_eq!(ready.len(), 2);
345    }
346
347    #[test]
348    fn test_get_ready_tasks_with_deps() {
349        let mut g = new_task_graph();
350        let a = add_task(&mut g, "A", vec![], 0);
351        add_task(&mut g, "B", vec![a], 0);
352        let ready = get_ready_tasks(&g);
353        assert_eq!(ready.len(), 1);
354        assert_eq!(ready[0], a);
355    }
356
357    #[test]
358    fn test_mark_complete() {
359        let mut g = new_task_graph();
360        let id = add_task(&mut g, "A", vec![], 0);
361        mark_complete(&mut g, id, Some("done".to_string()));
362        assert_eq!(completed_count(&g), 1);
363    }
364
365    #[test]
366    fn test_mark_failed() {
367        let mut g = new_task_graph();
368        let id = add_task(&mut g, "A", vec![], 0);
369        mark_failed(&mut g, id, "oops".to_string());
370        assert_eq!(failed_count(&g), 1);
371    }
372
373    #[test]
374    fn test_topological_order_linear() {
375        let mut g = new_task_graph();
376        let a = add_task(&mut g, "A", vec![], 0);
377        let b = add_task(&mut g, "B", vec![a], 0);
378        let c = add_task(&mut g, "C", vec![b], 0);
379        let order = topological_order(&g).expect("should succeed");
380        assert_eq!(order, vec![a, b, c]);
381    }
382
383    #[test]
384    fn test_topological_order_cycle_detected() {
385        let mut g = new_task_graph();
386        let a = add_task(&mut g, "A", vec![], 0);
387        let b = add_task(&mut g, "B", vec![a], 0);
388        // Manually create a cycle: A depends on B
389        g.tasks[0].dependencies.push(b);
390        let result = topological_order(&g);
391        assert!(result.is_err());
392    }
393
394    #[test]
395    fn test_execute_sequential_all_complete() {
396        let mut g = new_task_graph();
397        let a = add_task(&mut g, "A", vec![], 0);
398        let _b = add_task(&mut g, "B", vec![a], 0);
399        execute_sequential(&mut g, &mut |_, name| Ok(format!("done:{name}")));
400        assert_eq!(completed_count(&g), 2);
401    }
402
403    #[test]
404    fn test_execute_sequential_failure_propagates() {
405        let mut g = new_task_graph();
406        let a = add_task(&mut g, "A", vec![], 0);
407        add_task(&mut g, "B", vec![a], 0);
408        execute_sequential(&mut g, &mut |id, _| {
409            if id == 0 {
410                Err("fail".to_string())
411            } else {
412                Ok("ok".to_string())
413            }
414        });
415        assert_eq!(failed_count(&g), 2); // B fails due to dep
416    }
417
418    #[test]
419    fn test_pending_count() {
420        let mut g = new_task_graph();
421        add_task(&mut g, "A", vec![], 0);
422        add_task(&mut g, "B", vec![], 0);
423        assert_eq!(pending_count(&g), 2);
424    }
425
426    #[test]
427    fn test_reset_graph() {
428        let mut g = new_task_graph();
429        let id = add_task(&mut g, "A", vec![], 0);
430        mark_complete(&mut g, id, Some("result".to_string()));
431        reset_graph(&mut g);
432        assert_eq!(pending_count(&g), 1);
433        assert_eq!(completed_count(&g), 0);
434    }
435
436    #[test]
437    fn test_graph_to_json_contains_task_name() {
438        let mut g = new_task_graph();
439        add_task(&mut g, "MyTask", vec![], 5);
440        let json = graph_to_json(&g);
441        assert!(json.contains("MyTask"));
442        assert!(json.contains("priority"));
443    }
444
445    #[test]
446    fn test_critical_path_length_linear() {
447        let mut g = new_task_graph();
448        let a = add_task(&mut g, "A", vec![], 0);
449        let b = add_task(&mut g, "B", vec![a], 0);
450        add_task(&mut g, "C", vec![b], 0);
451        assert_eq!(critical_path_length(&g), 3);
452    }
453
454    #[test]
455    fn test_critical_path_length_empty() {
456        let g = new_task_graph();
457        assert_eq!(critical_path_length(&g), 0);
458    }
459
460    #[test]
461    fn test_critical_path_branching() {
462        let mut g = new_task_graph();
463        let a = add_task(&mut g, "A", vec![], 0);
464        let b = add_task(&mut g, "B", vec![a], 0);
465        add_task(&mut g, "C", vec![a], 0); // shorter branch
466        add_task(&mut g, "D", vec![b], 0); // longer branch
467                                           // Longest: A -> B -> D = 3
468        assert_eq!(critical_path_length(&g), 3);
469    }
470}