flow_graph_interpreter/interpreter/event_loop/state.rs

use std::collections::HashMap;
use std::sync::atomic::AtomicBool;

use flow_graph::{PortDirection, PortReference};
use tracing::Span;
use uuid::Uuid;
use wick_packet::{PacketPayload, PacketStream};

use super::EventLoop;
use crate::interpreter::channel::{CallComplete, InterpreterDispatchChannel};
use crate::interpreter::executor::context::{ExecutionContext, TxState};
use crate::interpreter::executor::error::ExecutionError;
use crate::InterpreterOptions;

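/// Mutable state owned by the event loop: the set of live execution contexts
/// keyed by invocation id, plus the channel used to dispatch further work back
/// into the interpreter.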
#[derive(Debug)]
pub struct State {
  context_map: ContextMap,
  channel: InterpreterDispatchChannel,
}

impl State {
  pub(super) fn new(channel: InterpreterDispatchChannel) -> Self {
    Self {
      context_map: ContextMap::default(),
      channel,
    }
  }

  fn get_ctx(&self, uuid: &Uuid) -> Option<&(ExecutionContext, Metadata)> {
    self.context_map.get(uuid)
  }

  pub const fn invocations(&self) -> &ContextMap {
    &self.context_map
  }

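  /// Sweep live invocations for slow or stalled executions. Contexts idle
  /// longer than [`EventLoop::SLOW_TX_TIMEOUT`] get a one-time warning (or are
  /// removed immediately if already done); contexts idle longer than
  /// [`EventLoop::STALLED_TX_TIMEOUT`] are checked for a hung state and
  /// removed if finished or irrecoverably stalled.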
  pub(super) fn run_cleanup(&mut self) -> Result<(), ExecutionError> {
    let mut cleanup = Vec::new();
    for (id, (ctx, meta)) in self.context_map.iter() {
      // `elapsed()` fails if the system clock moved backwards; treat that as
      // "accessed just now" rather than panicking.
      let last_update = ctx.last_access().elapsed().unwrap_or_default();

      let active_instances = ctx.active_instances().iter().map(|i| i.id()).collect::<Vec<_>>();
      if last_update > EventLoop::SLOW_TX_TIMEOUT {
        if active_instances.is_empty() && ctx.done() {
          cleanup.push(*id);
          continue;
        }

        ctx.in_scope(|| {
          if !meta.have_warned() {
            warn!(%id, ?active_instances, "slow invocation: no packet received in a long time");
            meta.set_have_warned();
          }
        });
      }
      ctx.in_scope(|| {
        if last_update > EventLoop::STALLED_TX_TIMEOUT {
          match ctx.check_stalled() {
            Ok(TxState::Finished) => {
              // The execution has delivered all of its output and isn't generating more data; clean it up.
              cleanup.push(*id);
            }
            Ok(TxState::OutputPending) => {
              error!(%id, "invocation reached timeout while still waiting for output data");
              cleanup.push(*id);
            }
            Ok(TxState::CompleteWithTasksPending) => {
              error!(%id, "invocation reached timeout while still waiting for tasks to complete");
              cleanup.push(*id);
            }
            Err(error) => {
              error!(%error, %id, "stalled invocation generated an error while determining hung state");
            }
          }
        }
      });
    }
    for ctx_id in cleanup {
      self.context_map.remove(&ctx_id);
    }
    Ok(())
  }

  fn get_mut(&mut self, ctx_id: &Uuid) -> Option<&mut ExecutionContext> {
    self.context_map.get_mut(ctx_id)
  }

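  /// Start a new execution with the given input stream and, on success,
  /// register its context so subsequent port events can find it.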
  pub(super) async fn handle_exec_start(
    &mut self,
    mut ctx: ExecutionContext,
    stream: PacketStream,
    options: &InterpreterOptions,
  ) -> Result<(), ExecutionError> {
    ctx.start(options, stream).await?;
    self.context_map.init_tx(ctx.id(), ctx);
    Ok(())
  }

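  /// Finish an execution and drop its context once no instances remain active.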
  #[allow(clippy::unused_async)]
  pub(super) async fn handle_exec_done(&mut self, ctx_id: Uuid) -> Result<(), ExecutionError> {
    let is_done = if let Some(ctx) = self.get_mut(&ctx_id) {
      let _ = ctx.finish()?;

      ctx.in_scope(|| {
        if ctx.active_instances().is_empty() {
          debug!(%ctx_id, "execution:done");
          true
        } else {
          false
        }
      })
    } else {
      false
    };
    if is_done {
      self.context_map.remove(&ctx_id);
    }
    Ok(())
  }

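  /// Handle a packet arriving on an operation's input port. Input to the
  /// schematic's output node completes the schematic's own output; any other
  /// port has its buffered packet pushed to the owning instance.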
  #[allow(clippy::unused_async)]
  async fn handle_input_data(&mut self, ctx_id: Uuid, port: PortReference, span: &Span) -> Result<(), ExecutionError> {
    let Some((ctx, _)) = self.get_ctx(&ctx_id) else {
      span.in_scope(|| {
        debug!(
          port = %port, %ctx_id,
          "still receiving upstream data for an invocation that has already completed; this may be due to a component panic or premature close"
        );
      });
      return Ok(());
    };

    let graph = ctx.schematic();
    let port_name = graph.get_port_name(&port);
    let instance = ctx.instance(port.node_index());

    ctx
      .stats
      .mark(format!("input:{}:{}:ready", port.node_index(), port.port_index()));

    let is_schematic_output = port.node_index() == graph.output().index();

    if is_schematic_output {
      span.in_scope(|| {
        debug!(
          operation = %instance,
          port = port_name,
          "handling schematic output"
        );
      });

      ctx.handle_schematic_output()?;
    } else if let Some(packet) = ctx.take_instance_input(&port) {
      span.in_scope(|| {
        if packet.is_error() {
          warn!(
            operation = %instance,
            port = port_name,
            payload = ?packet,
            "handling port input"
          );
        } else {
          debug!(
            operation = %instance,
            port = port_name,
            payload = ?packet,
            "handling port input"
          );
        }
      });
      let fut = ctx.push_packets(port.node_index(), vec![packet]);
      fut.await?;
    }
    Ok(())
  }

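  /// Handle a packet buffered on an operation's output port: deliver a copy to
  /// every connected downstream input port and dispatch a data event for each
  /// so the event loop processes them.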
  #[allow(clippy::unused_async)]
  async fn handle_output_data(&mut self, ctx_id: Uuid, port: PortReference, span: &Span) -> Result<(), ExecutionError> {
    let Some((ctx, _)) = self.get_ctx(&ctx_id) else {
      span.in_scope(|| {
        debug!(
          port = %port, %ctx_id,
          "still receiving downstream data for an invocation that has already completed; this may be due to a component panic or premature close"
        );
      });
      return Ok(());
    };

    let graph = ctx.schematic();
    let port_name = graph.get_port_name(&port);
    let instance = ctx.instance(port.node_index());

    ctx
      .stats
      .mark(format!("output:{}:{}:ready", port.node_index(), port.port_index()));

    let Some(packet) = ctx.take_instance_output(&port) else {
      panic!("got port_data message with no payload to act on, port: {:?}", port);
    };

    let connections = span.in_scope(|| {
      if packet.is_error() {
        warn!(
          operation = %instance,
          port = port_name,
          payload = ?packet,
          "handling port output"
        );
      } else {
        debug!(
          operation = %instance,
          port = port_name,
          payload = ?packet,
          "handling port output"
        );
      }
      graph.get_port(&port).connections()
    });
    for index in connections {
      span.in_scope(|| {
        let connection = &graph.connections()[*index];
        let downport = *connection.to();
        let name = graph.get_port_name(&downport);

        let channel = self.channel.clone();
        let downstream_instance = ctx.instance(downport.node_index()).clone();
        let message = packet.clone().to_port(name);
        trace!(%connection, "delivering packet to downstream");
        downstream_instance.buffer_in(&downport, message);
        channel.dispatch_data(ctx_id, downport);
      });
    }

    Ok(())
  }

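  /// Route a port event to the input- or output-side handler based on the
  /// port's direction.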
  pub(super) async fn handle_port_data(
    &mut self,
    ctx_id: Uuid,
    port: PortReference,
    span: &Span,
  ) -> Result<(), ExecutionError> {
    match port.direction() {
      PortDirection::Out => self.handle_output_data(ctx_id, port, span).await,
      PortDirection::In => self.handle_input_data(ctx_id, port, span).await,
    }
  }

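  /// Handle notification that a component call finished. If the call ended
  /// with an error payload, the error is propagated to downstream ports.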
  #[allow(clippy::unused_async)]
  pub(super) async fn handle_call_complete(&self, ctx_id: Uuid, data: CallComplete) -> Result<(), ExecutionError> {
    let Some((ctx, _)) = self.get_ctx(&ctx_id) else {
      // Logged at debug level rather than as an error because the transaction may still
      // complete OK; it's just that a component is misbehaving.
      debug!(
        ?data,
        %ctx_id, "tried to clean up missing invocation, this may be due to a component panic or premature close"
      );
      return Ok(());
    };
    let instance = ctx.instance(data.index);
    debug!(operation = instance.id(), entity = %instance.entity(), "call complete");

    if let Some(PacketPayload::Err(err)) = data.err {
      warn!(?err, "op:error");
      // If the call contains an error, then the component panicked.
      // We need to propagate the error downward...
      ctx.handle_op_err(data.index, &err)?;
      // ...and clean up the call.
      // instance.handle_stream_complete(CompletionStatus::Error)?;
    }

    Ok(())
  }
}

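/// Per-context bookkeeping used by the cleanup sweep; currently just a latch
/// recording whether the slow-transaction warning has already been logged.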
#[derive(Debug)]
struct Metadata {
  have_warned_long_tx: AtomicBool,
}

impl Default for Metadata {
  fn default() -> Self {
    Self {
      have_warned_long_tx: AtomicBool::new(false),
    }
  }
}

impl Metadata {
  fn have_warned(&self) -> bool {
    self.have_warned_long_tx.load(std::sync::atomic::Ordering::Relaxed)
  }

  fn set_have_warned(&self) {
    self
      .have_warned_long_tx
      .store(true, std::sync::atomic::Ordering::Relaxed);
  }
}
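
// A minimal test sketch (not part of the original file; the module and test
// names are illustrative) showing the warn-once latch `Metadata` provides, so
// the slow-invocation warning in `run_cleanup` fires at most once per context.
#[cfg(test)]
mod metadata_latch_tests {
  use super::Metadata;

  #[test]
  fn warn_latch_sets_once() {
    let meta = Metadata::default();
    assert!(!meta.have_warned());
    meta.set_have_warned();
    // The flag is sticky: once warned, `have_warned` stays true.
    assert!(meta.have_warned());
  }
}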
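/// The set of live execution contexts, keyed by invocation [`Uuid`]. Lookups
/// refresh the context's last-access time, which the cleanup sweep in
/// [`State::run_cleanup`] uses to detect slow or stalled invocations.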
#[derive(Debug, Default)]
#[must_use]
pub struct ContextMap(HashMap<Uuid, (ExecutionContext, Metadata)>);

impl ContextMap {
  pub(crate) fn init_tx(&mut self, uuid: Uuid, ctx: ExecutionContext) {
    self.0.insert(uuid, (ctx, Metadata::default()));
  }

  fn get(&self, uuid: &Uuid) -> Option<&(ExecutionContext, Metadata)> {
    self.0.get(uuid).map(|ctx| {
      // Refresh the last-access time so the cleanup sweep doesn't reap a live context.
      ctx.0.update_last_access();
      ctx
    })
  }

  fn get_mut(&mut self, uuid: &Uuid) -> Option<&mut ExecutionContext> {
    self.0.get_mut(uuid).map(|ctx| {
      // Mutable lookups also count as activity for timeout purposes.
      ctx.0.update_last_access();
      &mut ctx.0
    })
  }

  fn remove(&mut self, uuid: &Uuid) -> Option<(ExecutionContext, Metadata)> {
    self.0.remove(uuid)
  }

  fn iter(&self) -> impl Iterator<Item = (&Uuid, &(ExecutionContext, Metadata))> {
    self.0.iter()
  }
}