flow_graph_interpreter/interpreter/
channel.rs

1use flow_graph::{NodeIndex, PortReference};
2use tracing::Span;
3use uuid::Uuid;
4use wick_packet::{Invocation, PacketPayload, PacketStream};
5
6pub(crate) use self::error::Error;
7use super::executor::error::ExecutionError;
8use crate::interpreter::executor::context::ExecutionContext;
9
10static CHANNEL_SIZE: usize = 50;
11
12const CHANNEL_UUID: Uuid = Uuid::from_bytes([
13  0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF,
14]);
15
16#[derive(Debug)]
17pub struct Event {
18  pub(crate) ctx_id: Uuid,
19  pub(crate) kind: EventKind,
20  pub(crate) span: Option<Span>,
21}
22
23impl Event {
24  pub(crate) const fn new(ctx_id: Uuid, kind: EventKind, span: Option<Span>) -> Self {
25    Self { ctx_id, kind, span }
26  }
27
28  #[must_use]
29  pub const fn ctx_id(&self) -> &Uuid {
30    &self.ctx_id
31  }
32
33  #[must_use]
34  pub const fn name(&self) -> &str {
35    self.kind.name()
36  }
37
38  pub const fn kind(&self) -> &EventKind {
39    &self.kind
40  }
41}
42
43#[derive(Debug)]
44#[must_use]
45#[allow(clippy::exhaustive_enums)]
46pub enum EventKind {
47  Ping(usize),
48  ExecutionStart(Box<ExecutionContext>, PacketStream),
49  ExecutionDone,
50  PortData(PortReference),
51  Invocation(NodeIndex, Box<Invocation>),
52  CallComplete(CallComplete),
53  Close(Option<ExecutionError>),
54}
55
56impl EventKind {
57  pub(crate) const fn name(&self) -> &str {
58    match self {
59      EventKind::Ping(_) => "ping",
60      EventKind::ExecutionStart(_, _) => "exec_start",
61      EventKind::ExecutionDone => "exec_done",
62      EventKind::PortData(_) => "port_data",
63      EventKind::Invocation(_, _) => "invocation",
64      EventKind::CallComplete(_) => "call_complete",
65      EventKind::Close(_) => "close",
66    }
67  }
68}
69
70#[derive(Debug, Clone)]
71pub struct CallComplete {
72  pub(crate) index: NodeIndex,
73  pub(crate) err: Option<PacketPayload>,
74}
75
76impl CallComplete {
77  const fn new(component_index: NodeIndex) -> Self {
78    Self {
79      index: component_index,
80      err: None,
81    }
82  }
83  pub const fn index(&self) -> NodeIndex {
84    self.index
85  }
86  pub const fn err(&self) -> &Option<PacketPayload> {
87    &self.err
88  }
89}
90
91pub(crate) struct InterpreterChannel {
92  sender: tokio::sync::mpsc::Sender<Event>,
93  receiver: tokio::sync::mpsc::Receiver<Event>,
94}
95
96impl Default for InterpreterChannel {
97  fn default() -> Self {
98    Self::new()
99  }
100}
101
102impl std::fmt::Debug for InterpreterChannel {
103  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104    f.debug_struct("InterpreterChannel()").finish()
105  }
106}
107
108impl InterpreterChannel {
109  pub(crate) fn new() -> Self {
110    let (sender, receiver) = tokio::sync::mpsc::channel(CHANNEL_SIZE);
111    Self { sender, receiver }
112  }
113
114  pub(crate) fn dispatcher(&self, span: Option<Span>) -> InterpreterDispatchChannel {
115    InterpreterDispatchChannel::new(self.sender.clone(), span)
116  }
117
118  pub(crate) async fn accept(&mut self) -> Option<Event> {
119    self.receiver.recv().await
120  }
121}
122
123#[derive(Clone)]
124pub(crate) struct InterpreterDispatchChannel {
125  span: Option<Span>,
126  sender: tokio::sync::mpsc::Sender<Event>,
127}
128
129impl std::fmt::Debug for InterpreterDispatchChannel {
130  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131    f.debug_struct("InterpreterRequestChannel()").finish()
132  }
133}
134
135impl InterpreterDispatchChannel {
136  const fn new(sender: tokio::sync::mpsc::Sender<Event>, span: Option<Span>) -> Self {
137    Self { sender, span }
138  }
139
140  pub(crate) fn with_span(self, span: Span) -> Self {
141    Self {
142      sender: self.sender,
143      span: Some(span),
144    }
145  }
146
147  pub(crate) fn dispatch(&self, event: Event) {
148    let tx = self.sender.clone();
149
150    tokio::task::spawn(async move {
151      if tx.send(event).await.is_err() {
152        warn!("interpreter channel closed unexpectedly. This is likely due to an intentional shutdown while there are still events processing.");
153      }
154    });
155  }
156
157  pub(crate) fn dispatch_done(&self, ctx_id: Uuid) {
158    self.dispatch(Event::new(ctx_id, EventKind::ExecutionDone, self.span.clone()));
159  }
160
161  pub(crate) fn dispatch_data(&self, ctx_id: Uuid, port: PortReference) {
162    self.dispatch(Event::new(ctx_id, EventKind::PortData(port), self.span.clone()));
163  }
164
165  pub(crate) fn dispatch_close(&self, error: Option<ExecutionError>) {
166    self.dispatch(Event::new(CHANNEL_UUID, EventKind::Close(error), self.span.clone()));
167  }
168
169  pub(crate) fn dispatch_start(&self, ctx: Box<ExecutionContext>, stream: PacketStream) {
170    self.dispatch(Event::new(
171      ctx.id(),
172      EventKind::ExecutionStart(ctx, stream),
173      self.span.clone(),
174    ));
175  }
176
177  pub(crate) fn dispatch_call_complete(&self, ctx_id: Uuid, op_index: usize) {
178    self.dispatch(Event::new(
179      ctx_id,
180      EventKind::CallComplete(CallComplete::new(op_index)),
181      self.span.clone(),
182    ));
183  }
184
185  pub(crate) fn dispatch_op_err(&self, ctx_id: Uuid, op_index: usize, signal: PacketPayload) {
186    self.dispatch(Event::new(
187      ctx_id,
188      EventKind::CallComplete(CallComplete {
189        index: op_index,
190        err: Some(signal),
191      }),
192      self.span.clone(),
193    ));
194  }
195}
196
197pub(crate) mod error {
198  #[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
199  pub enum Error {
200    #[error("Receive failed")]
201    Receive,
202    #[error("Receive timed out")]
203    ReceiveTimeout,
204    #[error("Response failed")]
205    Response,
206    #[error("Send failed")]
207    Send,
208    #[error("Request timed out")]
209    SendTimeout,
210    #[error("Request failed")]
211    Request(RequestError),
212  }
213  #[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
214  pub enum RequestError {
215    #[error("Bad request")]
216    BadRequest,
217  }
218}
219
220#[cfg(test)]
221mod test {
222
223  use super::*;
224
225  #[tokio::test]
226  async fn test_channel() -> anyhow::Result<()> {
227    let mut channel = InterpreterChannel::new();
228
229    let child1 = channel.dispatcher(None);
230    let child2 = channel.dispatcher(None);
231    let child3 = channel.dispatcher(None);
232
233    let join_handle = tokio::task::spawn(async move {
234      println!("Handling requests");
235      let mut num_handled = 0;
236      while let Some(event) = channel.accept().await {
237        num_handled += 1;
238        match event.kind {
239          EventKind::Ping(num) => {
240            trace!("ping:{}", num);
241          }
242          EventKind::Close(_) => {
243            break;
244          }
245          _ => panic!(),
246        }
247      }
248      println!("Done handling requests");
249      num_handled
250    });
251
252    tokio::spawn(async move {
253      let num = 1;
254      println!("Child 1 PING({})", num);
255      child1.dispatch(Event::new(Uuid::new_v4(), EventKind::Ping(num), None));
256    })
257    .await?;
258
259    tokio::spawn(async move {
260      let num = 2;
261      println!("Child 2 PING({})", num);
262      child2.dispatch(Event::new(Uuid::new_v4(), EventKind::Ping(num), None));
263    })
264    .await?;
265
266    child3.dispatch_close(None);
267    let num_handled = join_handle.await?;
268
269    println!("{:?}", num_handled);
270    assert_eq!(num_handled, 3);
271
272    Ok(())
273  }
274}