flow_graph_interpreter/interpreter/
event_loop.rs

1pub(crate) mod state;
2
3use std::time::Duration;
4
5use parking_lot::Mutex;
6use tokio::task::JoinHandle;
7use tracing::Span;
8use tracing_futures::Instrument;
9
10use super::channel::{Event, EventKind, InterpreterChannel, InterpreterDispatchChannel};
11use super::error::Error;
12use super::InterpreterOptions;
13use crate::interpreter::event_loop::state::State;
14use crate::interpreter::executor::error::ExecutionError;
15
16#[derive(Debug)]
17pub(crate) struct EventLoop {
18  channel: Option<InterpreterChannel>,
19  dispatcher: InterpreterDispatchChannel,
20  task: Mutex<Option<JoinHandle<Result<(), ExecutionError>>>>,
21  span: Span,
22}
23
24impl EventLoop {
25  pub(crate) const WAKE_TIMEOUT: Duration = Duration::from_millis(500);
26  pub(crate) const STALLED_TX_TIMEOUT: Duration = Duration::from_secs(60 * 5);
27  pub(crate) const SLOW_TX_TIMEOUT: Duration = Duration::from_secs(15);
28
29  pub(super) fn new(channel: InterpreterChannel, span: &Span) -> Self {
30    let event_span = info_span!("event_loop");
31    event_span.follows_from(span);
32    let dispatcher = channel.dispatcher(Some(event_span.clone()));
33
34    Self {
35      channel: Some(channel),
36      dispatcher,
37      task: Mutex::new(None),
38      span: event_span,
39    }
40  }
41
42  pub(super) async fn start(&mut self, options: InterpreterOptions, observer: Option<Box<dyn Observer + Send + Sync>>) {
43    let channel = self.channel.take().unwrap();
44
45    let span = self.span.clone();
46    let handle = tokio::spawn(async move { event_loop(channel, options, observer, span).await });
47    let mut lock = self.task.lock();
48    lock.replace(handle);
49  }
50
51  fn steal_task(&self) -> Option<JoinHandle<Result<(), ExecutionError>>> {
52    let mut lock = self.task.lock();
53    lock.take()
54  }
55
56  pub(super) async fn shutdown(&self) -> Result<(), Error> {
57    self.span.in_scope(|| trace!("shutting down event loop"));
58    let task = self.steal_task();
59    match task {
60      Some(task) => {
61        self.dispatcher.dispatch_close(None);
62
63        let timeout = std::time::Duration::from_secs(2);
64        let result = tokio::time::timeout(timeout, task).await;
65        match result.map_err(|_| Error::ShutdownTimeout)? {
66          Ok(Err(e)) => {
67            return Err(Error::Shutdown(e.to_string()));
68          }
69          Err(e) => {
70            self.span.in_scope(|| warn!(%e, "event loop panicked"));
71            return Err(Error::EventLoopPanic(e.to_string()));
72          }
73          Ok(_) => {}
74        };
75        self.span.in_scope(|| debug!("event loop closed"));
76      }
77      None => {
78        self.span.in_scope(|| warn!("shutdown called but no task running"));
79      }
80    }
81
82    Ok(())
83  }
84}
85
86impl Drop for EventLoop {
87  fn drop(&mut self) {
88    self.span.in_scope(|| trace!("dropping event loop"));
89    let lock = self.task.lock();
90    if let Some(task) = &*lock {
91      task.abort();
92    }
93  }
94}
95
96pub trait Observer {
97  fn on_event(&self, index: usize, event: &Event);
98  fn on_after_event(&self, index: usize, state: &State);
99  fn on_close(&self);
100}
101
102async fn event_loop(
103  mut channel: InterpreterChannel,
104  options: InterpreterOptions,
105  observer: Option<Box<dyn Observer + Send + Sync>>,
106  span: Span,
107) -> Result<(), ExecutionError> {
108  debug!(?options, "started");
109  let mut state = State::new(channel.dispatcher(None));
110
111  let mut num: usize = 0;
112
113  let result = loop {
114    let task = tokio::time::timeout(EventLoop::WAKE_TIMEOUT, channel.accept());
115    match task.await {
116      Ok(Some(event)) => {
117        let ctx_id = event.ctx_id;
118
119        if let Some(observer) = &observer {
120          observer.on_event(num, &event);
121        }
122
123        let name = event.name().to_owned();
124        let tx_span = event.span.unwrap_or_else(Span::current);
125
126        tx_span.in_scope(|| debug!(event = ?event.kind, ctx_id = ?ctx_id));
127
128        let result = match event.kind {
129          EventKind::Invocation(_index, _invocation) => {
130            error!("invocation not supported");
131            panic!("invocation not supported")
132          }
133          EventKind::CallComplete(data) => state.handle_call_complete(ctx_id, data).instrument(tx_span).await,
134          EventKind::PortData(data) => state.handle_port_data(ctx_id, data, &tx_span).await,
135          EventKind::ExecutionDone => state.handle_exec_done(ctx_id).instrument(tx_span).await,
136          EventKind::ExecutionStart(context, stream) => {
137            state
138              .handle_exec_start(*context, stream, &options)
139              .instrument(tx_span)
140              .await
141          }
142          EventKind::Ping(ping) => {
143            trace!(ping);
144            Ok(())
145          }
146          EventKind::Close(error) => match error {
147            Some(error) => {
148              tx_span.in_scope(|| error!(%error,"stopped with error"));
149              break Err(error);
150            }
151            None => {
152              tx_span.in_scope(|| debug!("stopping"));
153              break Ok(());
154            }
155          },
156        };
157
158        span.in_scope(|| {
159          if let Err(e) = result {
160            warn!(event = %name, ctx_id = ?ctx_id, response_error = %e, "iteration:end");
161          } else {
162            trace!(event = %name, ctx_id = ?ctx_id, "iteration:end");
163          }
164        });
165
166        if let Some(observer) = &observer {
167          observer.on_after_event(num, &state);
168        }
169        num += 1;
170      }
171      Ok(None) => {
172        break Ok(());
173      }
174      Err(_) => {
175        span.in_scope(|| {
176          if let Err(error) = state.run_cleanup() {
177            error!(%error,"Error checking hung invocations");
178            channel.dispatcher(None).dispatch_close(Some(error));
179          };
180        });
181      }
182    }
183  };
184  trace!("stopped");
185  if let Some(observer) = &observer {
186    observer.on_close();
187  }
188  result
189}
190
191#[derive(thiserror::Error, Debug)]
192pub(crate) enum EventLoopError {
193  #[error(transparent)]
194  ExecutionError(#[from] ExecutionError),
195  #[error(transparent)]
196  ChannelError(#[from] crate::interpreter::channel::Error),
197}
198
#[cfg(test)]
mod test {
  use anyhow::Result;

  use super::*;

  /// Compiles only when `T` is both `Send` and `Sync`; does nothing at runtime.
  const fn assert_send_sync<T: Send + Sync>() {}

  /// Compile-time guard that `EventLoop` stays `Send + Sync`.
  #[test]
  const fn test_sync_send() -> Result<()> {
    assert_send_sync::<EventLoop>();
    Ok(())
  }
}
216}