flow_graph_interpreter/interpreter/
channel.rs1use flow_graph::{NodeIndex, PortReference};
2use tracing::Span;
3use uuid::Uuid;
4use wick_packet::{Invocation, PacketPayload, PacketStream};
5
6pub(crate) use self::error::Error;
7use super::executor::error::ExecutionError;
8use crate::interpreter::executor::context::ExecutionContext;
9
/// Capacity handed to `tokio::sync::mpsc::channel` when an `InterpreterChannel` is created,
/// i.e. the number of events that can be buffered before senders have to wait.
static CHANNEL_SIZE: usize = 50;
11
12const CHANNEL_UUID: Uuid = Uuid::from_bytes([
13 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF,
14]);
15
16#[derive(Debug)]
17pub struct Event {
18 pub(crate) ctx_id: Uuid,
19 pub(crate) kind: EventKind,
20 pub(crate) span: Option<Span>,
21}
22
23impl Event {
24 pub(crate) const fn new(ctx_id: Uuid, kind: EventKind, span: Option<Span>) -> Self {
25 Self { ctx_id, kind, span }
26 }
27
28 #[must_use]
29 pub const fn ctx_id(&self) -> &Uuid {
30 &self.ctx_id
31 }
32
33 #[must_use]
34 pub const fn name(&self) -> &str {
35 self.kind.name()
36 }
37
38 pub const fn kind(&self) -> &EventKind {
39 &self.kind
40 }
41}
42
43#[derive(Debug)]
44#[must_use]
45#[allow(clippy::exhaustive_enums)]
46pub enum EventKind {
47 Ping(usize),
48 ExecutionStart(Box<ExecutionContext>, PacketStream),
49 ExecutionDone,
50 PortData(PortReference),
51 Invocation(NodeIndex, Box<Invocation>),
52 CallComplete(CallComplete),
53 Close(Option<ExecutionError>),
54}
55
56impl EventKind {
57 pub(crate) const fn name(&self) -> &str {
58 match self {
59 EventKind::Ping(_) => "ping",
60 EventKind::ExecutionStart(_, _) => "exec_start",
61 EventKind::ExecutionDone => "exec_done",
62 EventKind::PortData(_) => "port_data",
63 EventKind::Invocation(_, _) => "invocation",
64 EventKind::CallComplete(_) => "call_complete",
65 EventKind::Close(_) => "close",
66 }
67 }
68}
69
70#[derive(Debug, Clone)]
71pub struct CallComplete {
72 pub(crate) index: NodeIndex,
73 pub(crate) err: Option<PacketPayload>,
74}
75
76impl CallComplete {
77 const fn new(component_index: NodeIndex) -> Self {
78 Self {
79 index: component_index,
80 err: None,
81 }
82 }
83 pub const fn index(&self) -> NodeIndex {
84 self.index
85 }
86 pub const fn err(&self) -> &Option<PacketPayload> {
87 &self.err
88 }
89}
90
91pub(crate) struct InterpreterChannel {
92 sender: tokio::sync::mpsc::Sender<Event>,
93 receiver: tokio::sync::mpsc::Receiver<Event>,
94}
95
96impl Default for InterpreterChannel {
97 fn default() -> Self {
98 Self::new()
99 }
100}
101
102impl std::fmt::Debug for InterpreterChannel {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 f.debug_struct("InterpreterChannel()").finish()
105 }
106}
107
108impl InterpreterChannel {
109 pub(crate) fn new() -> Self {
110 let (sender, receiver) = tokio::sync::mpsc::channel(CHANNEL_SIZE);
111 Self { sender, receiver }
112 }
113
114 pub(crate) fn dispatcher(&self, span: Option<Span>) -> InterpreterDispatchChannel {
115 InterpreterDispatchChannel::new(self.sender.clone(), span)
116 }
117
118 pub(crate) async fn accept(&mut self) -> Option<Event> {
119 self.receiver.recv().await
120 }
121}
122
123#[derive(Clone)]
124pub(crate) struct InterpreterDispatchChannel {
125 span: Option<Span>,
126 sender: tokio::sync::mpsc::Sender<Event>,
127}
128
129impl std::fmt::Debug for InterpreterDispatchChannel {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 f.debug_struct("InterpreterRequestChannel()").finish()
132 }
133}
134
135impl InterpreterDispatchChannel {
136 const fn new(sender: tokio::sync::mpsc::Sender<Event>, span: Option<Span>) -> Self {
137 Self { sender, span }
138 }
139
140 pub(crate) fn with_span(self, span: Span) -> Self {
141 Self {
142 sender: self.sender,
143 span: Some(span),
144 }
145 }
146
147 pub(crate) fn dispatch(&self, event: Event) {
148 let tx = self.sender.clone();
149
150 tokio::task::spawn(async move {
151 if tx.send(event).await.is_err() {
152 warn!("interpreter channel closed unexpectedly. This is likely due to an intentional shutdown while there are still events processing.");
153 }
154 });
155 }
156
157 pub(crate) fn dispatch_done(&self, ctx_id: Uuid) {
158 self.dispatch(Event::new(ctx_id, EventKind::ExecutionDone, self.span.clone()));
159 }
160
161 pub(crate) fn dispatch_data(&self, ctx_id: Uuid, port: PortReference) {
162 self.dispatch(Event::new(ctx_id, EventKind::PortData(port), self.span.clone()));
163 }
164
165 pub(crate) fn dispatch_close(&self, error: Option<ExecutionError>) {
166 self.dispatch(Event::new(CHANNEL_UUID, EventKind::Close(error), self.span.clone()));
167 }
168
169 pub(crate) fn dispatch_start(&self, ctx: Box<ExecutionContext>, stream: PacketStream) {
170 self.dispatch(Event::new(
171 ctx.id(),
172 EventKind::ExecutionStart(ctx, stream),
173 self.span.clone(),
174 ));
175 }
176
177 pub(crate) fn dispatch_call_complete(&self, ctx_id: Uuid, op_index: usize) {
178 self.dispatch(Event::new(
179 ctx_id,
180 EventKind::CallComplete(CallComplete::new(op_index)),
181 self.span.clone(),
182 ));
183 }
184
185 pub(crate) fn dispatch_op_err(&self, ctx_id: Uuid, op_index: usize, signal: PacketPayload) {
186 self.dispatch(Event::new(
187 ctx_id,
188 EventKind::CallComplete(CallComplete {
189 index: op_index,
190 err: Some(signal),
191 }),
192 self.span.clone(),
193 ));
194 }
195}
196
197pub(crate) mod error {
198 #[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
199 pub enum Error {
200 #[error("Receive failed")]
201 Receive,
202 #[error("Receive timed out")]
203 ReceiveTimeout,
204 #[error("Response failed")]
205 Response,
206 #[error("Send failed")]
207 Send,
208 #[error("Request timed out")]
209 SendTimeout,
210 #[error("Request failed")]
211 Request(RequestError),
212 }
213 #[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
214 pub enum RequestError {
215 #[error("Bad request")]
216 BadRequest,
217 }
218}
219
#[cfg(test)]
mod test {

    use super::*;

    /// Sends two pings and a close through three separate dispatchers and checks that
    /// the listener counted exactly three events before stopping on `Close`.
    #[tokio::test]
    async fn test_channel() -> anyhow::Result<()> {
        let mut channel = InterpreterChannel::new();

        let first = channel.dispatcher(None);
        let second = channel.dispatcher(None);
        let closer = channel.dispatcher(None);

        // Listener: counts every event received, stopping at the first `Close`.
        let listener = tokio::task::spawn(async move {
            println!("Handling requests");
            let mut handled = 0;
            while let Some(event) = channel.accept().await {
                handled += 1;
                match event.kind {
                    EventKind::Close(_) => {
                        break;
                    }
                    EventKind::Ping(num) => {
                        trace!("ping:{}", num);
                    }
                    _ => panic!(),
                }
            }
            println!("Done handling requests");
            handled
        });

        let ping_one = tokio::spawn(async move {
            let num = 1;
            println!("Child 1 PING({})", num);
            first.dispatch(Event::new(Uuid::new_v4(), EventKind::Ping(num), None));
        });
        ping_one.await?;

        let ping_two = tokio::spawn(async move {
            let num = 2;
            println!("Child 2 PING({})", num);
            second.dispatch(Event::new(Uuid::new_v4(), EventKind::Ping(num), None));
        });
        ping_two.await?;

        closer.dispatch_close(None);
        let num_handled = listener.await?;

        println!("{:?}", num_handled);
        assert_eq!(num_handled, 3);

        Ok(())
    }
}