lellm_graph/
test_executor.rs1use std::sync::Arc;
8use std::time::Instant;
9
10use tokio_util::sync::CancellationToken;
11
12use crate::error::GraphError;
13use crate::event::{GraphExecution, GraphHandle};
14use crate::execution_engine::{ExecutionEngine, ExecutorState, NextAction};
15use crate::graph::Graph;
16use crate::ids::TraceId;
17use crate::node::{BarrierNode, ConditionNode, ExecutorOperation, FlowNode, LeafNode, NodeKind};
18use crate::state::{ExecutionEntry, GraphResult, State};
19
/// Graph executor whose runs are bounded by a maximum number of steps.
///
/// Build one with [`SimpleExecutor::new`] to choose the limit, or with
/// [`Default::default`] to use the built-in limit.
#[derive(Debug, Clone)]
pub struct SimpleExecutor {
    /// Maximum number of steps a single run may take before it is aborted.
    max_steps: usize,
}
28
29impl Default for SimpleExecutor {
30 fn default() -> Self {
31 Self { max_steps: 100 }
32 }
33}
34
35impl SimpleExecutor {
36 pub fn new(max_steps: usize) -> Self {
37 Self { max_steps }
38 }
39
40 pub async fn execute(
41 &self,
42 graph: Arc<Graph>,
43 state: State,
44 ) -> Result<GraphResult, GraphError> {
45 let trace_id = TraceId::new();
46 let start_time = Instant::now();
47 let mut execution_log: Vec<ExecutionEntry> = Vec::new();
48
49 let cancel = CancellationToken::new();
50 let mut engine = ExecutionEngine::new(state, None, cancel);
51
52 let mut current = graph.start_node().to_string();
54 let mut step: usize = 0;
55
56 loop {
57 step += 1;
58 if step > self.max_steps {
59 return Err(GraphError::Terminal(
60 crate::error::TerminalError::StepsExceeded {
61 limit: self.max_steps,
62 },
63 ));
64 }
65
66 let node = match graph.nodes.get(¤t) {
67 Some(n) => n,
68 None => {
69 return Err(GraphError::Terminal(
70 crate::error::TerminalError::NodeNotFound(current.clone()),
71 ));
72 }
73 };
74
75 let node_name = current.clone();
76 let node_start = Instant::now();
77
78 match node {
80 NodeKind::Task(n) => {
81 let mut ctx = engine.build_node_context();
82 n.execute(&mut ctx).await?;
83 }
84 NodeKind::Condition(n) => {
85 let mut ctx = engine.build_leaf_context();
86 <ConditionNode as LeafNode>::execute(n, &mut ctx).await?;
87 }
88 NodeKind::Barrier(n) => {
89 let mut ctx = engine.build_leaf_context();
90 <BarrierNode as LeafNode>::execute(n, &mut ctx).await?;
91 }
92 NodeKind::External(n) => {
93 let mut ctx = engine.build_node_context();
94 n.execute(&mut ctx).await?;
95 }
96 NodeKind::ExternalLeaf(n) => {
97 let mut ctx = engine.build_leaf_context();
98 n.execute(&mut ctx).await?;
99 }
100 NodeKind::Parallel(p) => {
101 p.execute(&mut engine).await?;
103 }
104 }
105
106 let node_duration = node_start.elapsed();
107
108 execution_log.push(ExecutionEntry {
109 step,
110 node_name,
111 start_time: node_start,
112 end_time: start_time.checked_add(node_duration).unwrap_or(start_time),
113 success: true,
114 error: None,
115 });
116
117 engine.commit();
120
121 let (next_action, _signal) = engine.take_control();
123
124 match next_action {
126 NextAction::End => break,
127 NextAction::Goto(target) => {
128 current = target;
129 }
130 NextAction::Next => {
131 if current == graph.end_node() {
132 break;
133 }
134 current = graph.resolve_next_inline(¤t, engine.state())?;
135 }
136 }
137 }
138
139 let duration = start_time.elapsed();
140 let final_state = engine.into_state();
141
142 Ok(GraphResult {
143 trace_id,
144 state: final_state,
145 execution_log,
146 duration,
147 trace: None,
148 })
149 }
150
151 pub fn execute_stream(&self, graph: Arc<Graph>, state: State) -> GraphExecution<State> {
152 self.execute_stream_with_restore(graph, state, None)
153 }
154
155 pub fn execute_stream_with_restore(
156 &self,
157 graph: Arc<Graph>,
158 state: State,
159 restore_from: Option<crate::checkpoint::Checkpoint<State>>,
160 ) -> GraphExecution<State> {
161 let (event_tx, event_rx) = tokio::sync::mpsc::channel(256);
162 let (decision_tx, decision_rx) = tokio::sync::mpsc::channel(256);
163 let (cancel_tx, cancel_rx) = tokio::sync::mpsc::channel(1);
164
165 let trace_id = TraceId::new();
166 let cancel = CancellationToken::new();
167
168 let handle = GraphHandle::new(decision_tx, cancel_tx);
169
170 tokio::spawn(crate::execution_loop::run_execution_loop(
171 graph,
172 state,
173 self.max_steps,
174 trace_id,
175 event_tx,
176 decision_rx,
177 cancel_rx,
178 cancel,
179 None, None, restore_from,
182 ));
183
184 GraphExecution {
185 stream: event_rx,
186 handle,
187 }
188 }
189}