flow_graph_interpreter/interpreter/
event_loop.rs1pub(crate) mod state;
2
3use std::time::Duration;
4
5use parking_lot::Mutex;
6use tokio::task::JoinHandle;
7use tracing::Span;
8use tracing_futures::Instrument;
9
10use super::channel::{Event, EventKind, InterpreterChannel, InterpreterDispatchChannel};
11use super::error::Error;
12use super::InterpreterOptions;
13use crate::interpreter::event_loop::state::State;
14use crate::interpreter::executor::error::ExecutionError;
15
16#[derive(Debug)]
17pub(crate) struct EventLoop {
18 channel: Option<InterpreterChannel>,
19 dispatcher: InterpreterDispatchChannel,
20 task: Mutex<Option<JoinHandle<Result<(), ExecutionError>>>>,
21 span: Span,
22}
23
24impl EventLoop {
25 pub(crate) const WAKE_TIMEOUT: Duration = Duration::from_millis(500);
26 pub(crate) const STALLED_TX_TIMEOUT: Duration = Duration::from_secs(60 * 5);
27 pub(crate) const SLOW_TX_TIMEOUT: Duration = Duration::from_secs(15);
28
29 pub(super) fn new(channel: InterpreterChannel, span: &Span) -> Self {
30 let event_span = info_span!("event_loop");
31 event_span.follows_from(span);
32 let dispatcher = channel.dispatcher(Some(event_span.clone()));
33
34 Self {
35 channel: Some(channel),
36 dispatcher,
37 task: Mutex::new(None),
38 span: event_span,
39 }
40 }
41
42 pub(super) async fn start(&mut self, options: InterpreterOptions, observer: Option<Box<dyn Observer + Send + Sync>>) {
43 let channel = self.channel.take().unwrap();
44
45 let span = self.span.clone();
46 let handle = tokio::spawn(async move { event_loop(channel, options, observer, span).await });
47 let mut lock = self.task.lock();
48 lock.replace(handle);
49 }
50
51 fn steal_task(&self) -> Option<JoinHandle<Result<(), ExecutionError>>> {
52 let mut lock = self.task.lock();
53 lock.take()
54 }
55
56 pub(super) async fn shutdown(&self) -> Result<(), Error> {
57 self.span.in_scope(|| trace!("shutting down event loop"));
58 let task = self.steal_task();
59 match task {
60 Some(task) => {
61 self.dispatcher.dispatch_close(None);
62
63 let timeout = std::time::Duration::from_secs(2);
64 let result = tokio::time::timeout(timeout, task).await;
65 match result.map_err(|_| Error::ShutdownTimeout)? {
66 Ok(Err(e)) => {
67 return Err(Error::Shutdown(e.to_string()));
68 }
69 Err(e) => {
70 self.span.in_scope(|| warn!(%e, "event loop panicked"));
71 return Err(Error::EventLoopPanic(e.to_string()));
72 }
73 Ok(_) => {}
74 };
75 self.span.in_scope(|| debug!("event loop closed"));
76 }
77 None => {
78 self.span.in_scope(|| warn!("shutdown called but no task running"));
79 }
80 }
81
82 Ok(())
83 }
84}
85
86impl Drop for EventLoop {
87 fn drop(&mut self) {
88 self.span.in_scope(|| trace!("dropping event loop"));
89 let lock = self.task.lock();
90 if let Some(task) = &*lock {
91 task.abort();
92 }
93 }
94}
95
96pub trait Observer {
97 fn on_event(&self, index: usize, event: &Event);
98 fn on_after_event(&self, index: usize, state: &State);
99 fn on_close(&self);
100}
101
102async fn event_loop(
103 mut channel: InterpreterChannel,
104 options: InterpreterOptions,
105 observer: Option<Box<dyn Observer + Send + Sync>>,
106 span: Span,
107) -> Result<(), ExecutionError> {
108 debug!(?options, "started");
109 let mut state = State::new(channel.dispatcher(None));
110
111 let mut num: usize = 0;
112
113 let result = loop {
114 let task = tokio::time::timeout(EventLoop::WAKE_TIMEOUT, channel.accept());
115 match task.await {
116 Ok(Some(event)) => {
117 let ctx_id = event.ctx_id;
118
119 if let Some(observer) = &observer {
120 observer.on_event(num, &event);
121 }
122
123 let name = event.name().to_owned();
124 let tx_span = event.span.unwrap_or_else(Span::current);
125
126 tx_span.in_scope(|| debug!(event = ?event.kind, ctx_id = ?ctx_id));
127
128 let result = match event.kind {
129 EventKind::Invocation(_index, _invocation) => {
130 error!("invocation not supported");
131 panic!("invocation not supported")
132 }
133 EventKind::CallComplete(data) => state.handle_call_complete(ctx_id, data).instrument(tx_span).await,
134 EventKind::PortData(data) => state.handle_port_data(ctx_id, data, &tx_span).await,
135 EventKind::ExecutionDone => state.handle_exec_done(ctx_id).instrument(tx_span).await,
136 EventKind::ExecutionStart(context, stream) => {
137 state
138 .handle_exec_start(*context, stream, &options)
139 .instrument(tx_span)
140 .await
141 }
142 EventKind::Ping(ping) => {
143 trace!(ping);
144 Ok(())
145 }
146 EventKind::Close(error) => match error {
147 Some(error) => {
148 tx_span.in_scope(|| error!(%error,"stopped with error"));
149 break Err(error);
150 }
151 None => {
152 tx_span.in_scope(|| debug!("stopping"));
153 break Ok(());
154 }
155 },
156 };
157
158 span.in_scope(|| {
159 if let Err(e) = result {
160 warn!(event = %name, ctx_id = ?ctx_id, response_error = %e, "iteration:end");
161 } else {
162 trace!(event = %name, ctx_id = ?ctx_id, "iteration:end");
163 }
164 });
165
166 if let Some(observer) = &observer {
167 observer.on_after_event(num, &state);
168 }
169 num += 1;
170 }
171 Ok(None) => {
172 break Ok(());
173 }
174 Err(_) => {
175 span.in_scope(|| {
176 if let Err(error) = state.run_cleanup() {
177 error!(%error,"Error checking hung invocations");
178 channel.dispatcher(None).dispatch_close(Some(error));
179 };
180 });
181 }
182 }
183 };
184 trace!("stopped");
185 if let Some(observer) = &observer {
186 observer.on_close();
187 }
188 result
189}
190
191#[derive(thiserror::Error, Debug)]
192pub(crate) enum EventLoopError {
193 #[error(transparent)]
194 ExecutionError(#[from] ExecutionError),
195 #[error(transparent)]
196 ChannelError(#[from] crate::interpreter::channel::Error),
197}
198
#[cfg(test)]
mod test {
    use anyhow::Result;

    use super::*;

    /// Compiles only when `T` is both `Sync` and `Send`; does nothing at runtime.
    const fn assert_sync_send<T: Sync + Send>() {}

    /// Compile-time guard that `EventLoop` can be shared and moved across threads.
    #[test]
    const fn test_sync_send() -> Result<()> {
        assert_sync_send::<EventLoop>();
        Ok(())
    }
}