1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use async_trait::async_trait;
use dagrs::node::action::Action;
use dagrs::node::default_node::DefaultNode;
use dagrs::node::loop_node::{CountLoopCondition, LoopNode};
use dagrs::{EnvVar, Graph, InChannels, Node, NodeTable, OutChannels, Output};
use std::sync::{Arc, Mutex};
/// Test action whose only job is to count how many times it has been executed.
///
/// The struct derives `Clone`; because the counter lives behind an `Arc`,
/// every clone refers to the same underlying counter, so the test body can
/// keep one handle and read the total after the graph has run.
#[derive(Clone)]
struct IncAction {
    /// Shared execution counter, guarded by a mutex.
    counter: Arc<Mutex<usize>>,
}
#[async_trait]
impl Action for IncAction {
    /// Adds one to the shared counter and returns an empty output.
    ///
    /// The channel and environment arguments are not used. The mutex guard is
    /// a temporary that is released as soon as the increment statement ends.
    /// Panics if the counter mutex is poisoned.
    async fn run(
        &self,
        _in_channels: &mut InChannels,
        _out_channels: &mut OutChannels,
        _env: Arc<EnvVar>,
    ) -> Output {
        *self.counter.lock().unwrap() += 1;
        Output::empty()
    }
}
/// Checks that a graph containing a `LoopNode` yields the same result when it
/// is reset and executed a second time.
///
/// Node "A" increments a shared counter; node "Loop" targets A with
/// `CountLoopCondition::new(3)`. After each `async_start` the test expects the
/// counter to read 4, both before and after `graph.reset()`.
#[tokio::test]
async fn test_loop_reset() {
    let mut table = NodeTable::new();
    let counter = Arc::new(Mutex::new(0));
    // Reads the current counter value; the lock is released before returning.
    let executions = || *counter.lock().unwrap();

    // Node A: bumps the shared counter on every execution.
    let increment_node = DefaultNode::with_action(
        "A".to_string(),
        IncAction {
            counter: Arc::clone(&counter),
        },
        &mut table,
    );
    let id_a = increment_node.id();

    // Loop node: targets A with a count condition of 3.
    let repeat_node = LoopNode::new(
        "Loop".to_string(),
        id_a,
        CountLoopCondition::new(3),
        &mut table,
    );
    let id_loop = repeat_node.id();

    let mut graph = Graph::new();
    graph.add_node(increment_node).unwrap();
    graph.add_node(repeat_node).unwrap();

    // The only static edge is A -> Loop.
    graph.add_edge(id_a, vec![id_loop]).unwrap();

    // # Why there is no static Loop -> A edge
    //
    // Deliberately NOT done here: `graph.add_edge(id_loop, vec![id_a])`.
    //
    // 1. Cycle avoidance: such an edge would close the cycle A -> Loop -> A in
    //    the dependency graph, and the topological sort would fail with a
    //    cycle detection error.
    // 2. Runtime control flow: the backward jump is performed dynamically.
    //    - The `LoopNode` returns a `FlowControl::Loop` instruction that
    //      carries the target node ID.
    //    - The graph executor interprets that instruction, looks up the
    //      target's block index in `node_block_map`, and moves its program
    //      counter (pc) back to that block.
    //
    // The static graph therefore stays acyclic (keeping topological ordering)
    // while execution can still loop.

    println!("First Run");
    graph.async_start().await.unwrap();

    // Expected count: A runs once on the initial pass and is re-run three
    // times by the loop (`CountLoopCondition::new(3)`), i.e. 4 increments.
    // NOTE(review): the exact number depends on how the executor schedules
    // the loop node; the assertion below pins the currently expected value.
    let first_run_count = executions();
    println!("First run count: {}", first_run_count);
    assert_eq!(
        first_run_count, 4,
        "Counter should be 4 (1 initial + 3 loops)"
    );

    // Reset the graph and zero the counter so the second run starts clean.
    graph.reset().await.unwrap();
    *counter.lock().unwrap() = 0;

    println!("Second Run");
    graph.async_start().await.unwrap();
    let second_run_count = executions();
    println!("Second run count: {}", second_run_count);

    // Correctness: the second run must also reach the expected absolute
    // count, which catches a loop that is broken in both runs.
    assert_eq!(
        second_run_count, 4,
        "Counter should be 4 after reset (1 initial + 3 loops)"
    );
    // Consistency: both runs must agree with each other.
    assert_eq!(
        first_run_count, second_run_count,
        "Loop execution count should be consistent after reset"
    );
}