flow_graph_interpreter/interpreter/event_loop/state.rs

use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};

use flow_graph::{PortDirection, PortReference};
use tracing::{debug, error, trace, warn, Span};
use uuid::Uuid;
use wick_packet::{PacketPayload, PacketStream};

use super::EventLoop;
use crate::interpreter::channel::{CallComplete, InterpreterDispatchChannel};
use crate::interpreter::executor::context::{ExecutionContext, TxState};
use crate::interpreter::executor::error::ExecutionError;
use crate::InterpreterOptions;

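/// Tracks every live execution context by invocation id, plus the channel used to
/// dispatch follow-up events back onto the interpreter's event loop.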
#[derive(Debug)]
pub struct State {
  context_map: ContextMap,
  channel: InterpreterDispatchChannel,
}

impl State {
  pub(super) fn new(channel: InterpreterDispatchChannel) -> Self {
    Self {
      context_map: ContextMap::default(),
      channel,
    }
  }

  fn get_ctx(&self, uuid: &Uuid) -> Option<&(ExecutionContext, Metadata)> {
    self.context_map.get(uuid)
  }

  pub const fn invocations(&self) -> &ContextMap {
    &self.context_map
  }

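  /// Sweep all live contexts: warn once about slow invocations, and reap contexts
  /// that have finished or stalled past their timeouts.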
  pub(super) fn run_cleanup(&mut self) -> Result<(), ExecutionError> {
    let mut cleanup = Vec::new();
    for (id, (ctx, meta)) in self.context_map.iter() {
      let last_update = ctx.last_access().elapsed().unwrap();

      let active_instances = ctx.active_instances().iter().map(|i| i.id()).collect::<Vec<_>>();
      if last_update > EventLoop::SLOW_TX_TIMEOUT {
        if active_instances.is_empty() && ctx.done() {
          cleanup.push(*id);
          continue;
        }

        ctx.in_scope(|| {
          if !meta.have_warned() {
            warn!(%id, ?active_instances, "slow invocation: no packet received in a long time");
            meta.set_have_warned();
          }
        });
      }
      ctx.in_scope(|| {
        if last_update > EventLoop::STALLED_TX_TIMEOUT {
          match ctx.check_stalled() {
            Ok(TxState::Finished) => {
              cleanup.push(*id);
            }
            Ok(TxState::OutputPending) => {
              error!(%id, "invocation reached timeout while still waiting for output data");
              cleanup.push(*id);
            }
            Ok(TxState::CompleteWithTasksPending) => {
              error!(%id, "invocation reached timeout while still waiting for tasks to complete");
              cleanup.push(*id);
            }
            Err(error) => {
              error!(%error, %id, "stalled invocation generated an error while determining hung state");
            }
          }
        }
      });
    }
    for ctx_id in cleanup {
      self.context_map.remove(&ctx_id);
    }
    Ok(())
  }

  fn get_mut(&mut self, ctx_id: &Uuid) -> Option<&mut ExecutionContext> {
    self.context_map.get_mut(ctx_id)
  }

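  /// Start a new execution context with the given input stream and, on success,
  /// register it so subsequent port and completion events can find it.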
  pub(super) async fn handle_exec_start(
    &mut self,
    mut ctx: ExecutionContext,
    stream: PacketStream,
    options: &InterpreterOptions,
  ) -> Result<(), ExecutionError> {
    match ctx.start(options, stream).await {
      Ok(_) => {
        self.context_map.init_tx(ctx.id(), ctx);
        Ok(())
      }
      Err(e) => Err(e),
    }
  }

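  /// Mark an execution as finished, removing the context once no instances remain active.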
  #[allow(clippy::unused_async)]
  pub(super) async fn handle_exec_done(&mut self, ctx_id: Uuid) -> Result<(), ExecutionError> {
    let is_done = if let Some(ctx) = self.get_mut(&ctx_id) {
      let _ = ctx.finish()?;

      ctx.in_scope(|| {
        if ctx.active_instances().is_empty() {
          debug!(%ctx_id, "execution:done");
          true
        } else {
          false
        }
      })
    } else {
      false
    };
    if is_done {
      self.context_map.remove(&ctx_id);
    }
    Ok(())
  }

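  /// Handle a packet arriving at an input port: either finalize schematic output
  /// or push the packet into the operation instance that owns the port.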
  #[allow(clippy::unused_async)]
  async fn handle_input_data(&mut self, ctx_id: Uuid, port: PortReference, span: &Span) -> Result<(), ExecutionError> {
    let Some((ctx, _)) = self.get_ctx(&ctx_id) else {
      span.in_scope(|| {
        debug!(
          port = %port, %ctx_id, "still receiving upstream data for an invocation that has already completed, possibly due to a component panic or premature close"
        );
      });
      return Ok(());
    };

    let graph = ctx.schematic();
    let port_name = graph.get_port_name(&port);
    let instance = ctx.instance(port.node_index());

    ctx
      .stats
      .mark(format!("input:{}:{}:ready", port.node_index(), port.port_index()));

    let is_schematic_output = port.node_index() == graph.output().index();

    if is_schematic_output {
      span.in_scope(|| {
        debug!(
          operation = %instance,
          port = port_name,
          "handling schematic output"
        );
      });

      ctx.handle_schematic_output()?;
    } else if let Some(packet) = ctx.take_instance_input(&port) {
      span.in_scope(|| {
        if packet.is_error() {
          warn!(
            operation = %instance,
            port = port_name,
            payload = ?packet,
            "handling port input"
          );
        } else {
          debug!(
            operation = %instance,
            port = port_name,
            payload = ?packet,
            "handling port input"
          );
        }
      });
      let fut = ctx.push_packets(port.node_index(), vec![packet]);
      fut.await?;
    }
    Ok(())
  }

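  /// Handle a packet leaving an output port: take it from the instance's output
  /// buffer and fan it out along the graph's connections to downstream input ports.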
  #[allow(clippy::unused_async)]
  async fn handle_output_data(&mut self, ctx_id: Uuid, port: PortReference, span: &Span) -> Result<(), ExecutionError> {
    let Some((ctx, _)) = self.get_ctx(&ctx_id) else {
      span.in_scope(|| {
        debug!(
          port = %port, %ctx_id, "still receiving downstream data for an invocation that has already completed, possibly due to a component panic or premature close"
        );
      });
      return Ok(());
    };

    let graph = ctx.schematic();
    let port_name = graph.get_port_name(&port);

    let instance = ctx.instance(port.node_index());

    ctx
      .stats
      .mark(format!("output:{}:{}:ready", port.node_index(), port.port_index()));

    let Some(packet) = ctx.take_instance_output(&port) else {
      panic!("got port_data message with no payload to act on, port: {:?}", port);
    };

    let connections = span.in_scope(|| {
      if packet.is_error() {
        warn!(
          operation = %instance,
          port = port_name,
          payload = ?packet,
          "handling port output"
        );
      } else {
        debug!(
          operation = %instance,
          port = port_name,
          payload = ?packet,
          "handling port output"
        );
      }
      graph.get_port(&port).connections()
    });
    for index in connections {
      span.in_scope(|| {
        let connection = &graph.connections()[*index];
        let downport = *connection.to();
        let name = graph.get_port_name(&downport);

        let channel = self.channel.clone();
        let downstream_instance = ctx.instance(downport.node_index()).clone();
        let message = packet.clone().to_port(name);
        trace!(%connection, "delivering packet to downstream");
        downstream_instance.buffer_in(&downport, message);
        channel.dispatch_data(ctx_id, downport);
      });
    }

    Ok(())
  }

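  /// Route a port event to the input or output handler based on the port's direction.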
  pub(super) async fn handle_port_data(
    &mut self,
    ctx_id: Uuid,
    port: PortReference,
    span: &Span,
  ) -> Result<(), ExecutionError> {
    match port.direction() {
      PortDirection::Out => self.handle_output_data(ctx_id, port, span).await,
      PortDirection::In => self.handle_input_data(ctx_id, port, span).await,
    }
  }

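  /// Handle a component call completing, surfacing any error payload to the context.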
  #[allow(clippy::unused_async)]
  pub(super) async fn handle_call_complete(&self, ctx_id: Uuid, data: CallComplete) -> Result<(), ExecutionError> {
    let Some((ctx, _)) = self.get_ctx(&ctx_id) else {
      debug!(
        ?data,
        %ctx_id, "tried to clean up a missing invocation, possibly due to a component panic or premature close"
      );
      return Ok(());
    };
    let instance = ctx.instance(data.index);
    debug!(operation = instance.id(), entity = %instance.entity(), "call complete");

    if let Some(PacketPayload::Err(err)) = data.err {
      warn!(?err, "op:error");
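      // The call ended with an error payload; propagate the failure to the
      // execution context so it can handle the failed operation.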
      ctx.handle_op_err(data.index, &err)?;
    }

    Ok(())
  }
}

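/// Per-context bookkeeping kept alongside each execution context; currently it only
/// tracks whether we have already warned about a slow invocation.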
#[derive(Debug, Default)]
struct Metadata {
  have_warned_long_tx: AtomicBool,
}

impl Metadata {
  fn have_warned(&self) -> bool {
    self.have_warned_long_tx.load(Ordering::Relaxed)
  }

  fn set_have_warned(&self) {
    self.have_warned_long_tx.store(true, Ordering::Relaxed);
  }
}

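/// Map of invocation ids to their execution context and metadata. Lookups refresh the
/// context's last-access time, which feeds the slow- and stalled-invocation checks above.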
#[derive(Debug, Default)]
#[must_use]
pub struct ContextMap(HashMap<Uuid, (ExecutionContext, Metadata)>);

impl ContextMap {
  pub(crate) fn init_tx(&mut self, uuid: Uuid, ctx: ExecutionContext) {
    self.0.insert(uuid, (ctx, Metadata::default()));
  }

  fn get(&self, uuid: &Uuid) -> Option<&(ExecutionContext, Metadata)> {
    self.0.get(uuid).map(|ctx| {
      ctx.0.update_last_access();
      ctx
    })
  }

  fn get_mut(&mut self, uuid: &Uuid) -> Option<&mut ExecutionContext> {
    self.0.get_mut(uuid).map(|ctx| {
      ctx.0.update_last_access();
      &mut ctx.0
    })
  }

  fn remove(&mut self, uuid: &Uuid) -> Option<(ExecutionContext, Metadata)> {
    self.0.remove(uuid)
  }

  fn iter(&self) -> impl Iterator<Item = (&Uuid, &(ExecutionContext, Metadata))> {
    self.0.iter()
  }
}