lellm_graph/
node_context.rs1use tokio_util::sync::CancellationToken;
10
11use crate::branch_state::BranchState;
12use crate::event::FlowEvent;
13use crate::state::State;
14use crate::stream_chunk::StreamChunk;
15use crate::stream_emitter::StreamSink;
16use crate::workflow_state::WorkflowState;
17
18#[derive(Debug, Clone)]
22pub enum ExecutionSignal {
23 Pause {
25 barrier_id: crate::event::BarrierId,
26 timeout: Option<std::time::Duration>,
27 },
28}
29
30#[derive(Debug, Clone, PartialEq, Eq)]
34pub enum NextAction {
35 Next,
37 Goto(String),
39 End,
41}
42
43#[derive(Debug, Default)]
47pub struct ExecutionControl {
48 next: Option<NextAction>,
49 signal: Option<ExecutionSignal>,
50}
51
52impl ExecutionControl {
53 pub fn new() -> Self {
54 Self::default()
55 }
56
57 pub fn goto(&mut self, target: impl Into<String>) {
59 self.next = Some(NextAction::Goto(target.into()));
60 }
61
62 pub fn end(&mut self) {
64 self.next = Some(NextAction::End);
65 }
66
67 pub fn pause(
69 &mut self,
70 barrier_id: crate::event::BarrierId,
71 timeout: Option<std::time::Duration>,
72 ) {
73 self.signal = Some(ExecutionSignal::Pause {
74 barrier_id,
75 timeout,
76 });
77 }
78
79 pub fn take(&mut self) -> (NextAction, Option<ExecutionSignal>) {
81 let next = self.next.take().unwrap_or(NextAction::Next);
82 let signal = self.signal.take();
83 (next, signal)
84 }
85}
86
87#[derive(Debug, Clone, Default)]
91pub struct NodeMetadata {
92 pub token_cost: f64,
94 pub has_side_effects: bool,
96}
97
98pub struct NodeContext<'a, S: WorkflowState = State> {
117 state: &'a mut S,
119 branch: &'a mut BranchState,
121 stream: Option<&'a dyn StreamSink>,
123 cancel: CancellationToken,
125 control: ExecutionControl,
127 metadata: NodeMetadata,
129 effects: Vec<S::Effect>,
131 flow_events: Vec<FlowEvent>,
133}
134
135impl<'a, S: WorkflowState> NodeContext<'a, S> {
136 pub fn new(
138 state: &'a mut S,
139 branch: &'a mut BranchState,
140 stream: Option<&'a dyn StreamSink>,
141 cancel: CancellationToken,
142 ) -> Self {
143 Self {
144 state,
145 branch,
146 stream,
147 cancel,
148 control: ExecutionControl::new(),
149 metadata: NodeMetadata::default(),
150 effects: Vec::new(),
151 flow_events: Vec::new(),
152 }
153 }
154
155 pub fn state(&self) -> &S {
157 self.state
158 }
159
160 pub fn state_mut(&mut self) -> &mut S {
162 self.state
163 }
164
165 pub fn branch(&self) -> &BranchState {
167 self.branch
168 }
169
170 pub fn branch_mut(&mut self) -> &mut BranchState {
172 self.branch
173 }
174
175 pub fn emit(&self, chunk: StreamChunk) {
179 if let Some(stream) = &self.stream {
180 stream.emit(chunk);
181 }
182 }
183
184 pub fn emit_flow_event(&mut self, event: FlowEvent) {
186 self.flow_events.push(event);
187 }
188
189 pub fn is_cancelled(&self) -> bool {
193 self.cancel.is_cancelled()
194 }
195
196 pub fn cancel_token(&self) -> &CancellationToken {
198 &self.cancel
199 }
200
201 pub fn goto(&mut self, target: impl Into<String>) {
205 self.control.goto(target);
206 }
207
208 pub fn end(&mut self) {
210 self.control.end();
211 }
212
213 pub fn pause(
215 &mut self,
216 barrier_id: crate::event::BarrierId,
217 timeout: Option<std::time::Duration>,
218 ) {
219 self.control.pause(barrier_id, timeout);
220 }
221
222 pub fn set_token_cost(&mut self, cost: f64) {
226 self.metadata.token_cost = cost;
227 }
228
229 pub fn set_has_side_effects(&mut self) {
231 self.metadata.has_side_effects = true;
232 }
233
234 pub fn emit_effect(&mut self, effect: S::Effect) {
240 self.effects.push(effect);
241 }
242
243 pub fn consume_effects(&mut self) -> Vec<S::Effect> {
245 std::mem::take(&mut self.effects)
246 }
247
248 pub fn effects_len(&self) -> usize {
250 self.effects.len()
251 }
252
253 pub fn take_control(&mut self) -> (NextAction, Option<ExecutionSignal>) {
257 self.control.take()
258 }
259
260 pub fn take_metadata(&mut self) -> NodeMetadata {
262 std::mem::take(&mut self.metadata)
263 }
264
265 pub fn stream(&self) -> Option<&'a dyn StreamSink> {
267 self.stream
268 }
269
270 pub fn take_flow_events(&mut self) -> Vec<FlowEvent> {
272 std::mem::take(&mut self.flow_events)
273 }
274}