wick_component/
macros.rs

/// Includes the code generated by a `wick-component-codegen` build step.
///
/// Expands to an `include!` of the file `mod.rs` inside the directory named by
/// the `OUT_DIR` environment variable, which is read at compile time.
///
/// # Example
///
/// ```
/// # use wick_component::prelude::*;
///
/// // Useful way of importing code and keeping it separate from your own code.
/// mod wick {
///   wick_import!();
/// }
/// ```
#[macro_export]
macro_rules! wick_import {
  () => {
    include!(concat!(env!("OUT_DIR"), "/mod.rs"));
  };
}
19
/// Forwards one packet to the sender that belongs to its port.
///
/// A packet for which `is_done()` is true completes the sender; every other
/// packet is handed to `send`, whose return value is discarded. The `$port`
/// and `$ty` arguments are accepted but do not appear in the expansion.
#[doc(hidden)]
#[macro_export]
macro_rules! handle_port {
  ($packet:ident, $tx:ident, $port:expr, $ty:ty) => {{
    use $crate::wasmrs_rx::Observer;
    use $crate::wick_packet::PacketExt;
    let stream_finished = $packet.is_done();
    if stream_finished {
      $tx.complete();
    } else {
      // The outcome of the send is intentionally not inspected.
      let _ = $tx.send($packet);
    }
  }};
}
33
/// Generates a struct `$name` that owns one `FluxChannel` of `Packet`s per port.
///
/// Each field is named after the snake-cased port name. The generated type has:
///
/// * a `Default` impl that creates a new channel for every port, and
/// * a `receivers()` method that takes the receiving side of every channel,
///   maps each item through `VPacket::from_result`, and returns the boxed
///   streams as a tuple in the order the ports were listed. It returns `None`
///   as soon as one `take_rx()` call fails.
#[doc(hidden)]
#[macro_export]
macro_rules! stream_senders {
  ($error:ty, $name:ident, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
    $crate::paste::paste! {
      struct $name {
        $(
          [<$port:snake>]: $crate::wasmrs_rx::FluxChannel<$crate::wick_packet::Packet,$error>
        ),*
      }

      // The surrounding `paste!` already rewrites every `[<...>]` in this body,
      // so no nested invocation is needed here.
      impl Default for $name {
        fn default() -> Self {
          Self {
            $(
              [<$port:snake>]: $crate::wasmrs_rx::FluxChannel::new()
            ),*
          }
        }
      }

      impl $name {
        #[allow(clippy::missing_const_for_fn,unreachable_pub,unused,unused_parens)]
        pub fn receivers(&self) -> Option<($(
          $crate::BoxStream<$crate::wick_packet::VPacket<$($ty)*>>
        ),*)> {
          Some((
            $(Box::pin(self.[<$port:snake>].take_rx().ok()?.map($crate::wick_packet::VPacket::from_result))),*
          ))
        }
      }
    }
  };
}
68
/// Generates a struct `$name` that bundles one `FluxReceiver` per port.
///
/// Each field is named after the snake-cased port name and has the type
/// `FluxReceiver<$ty, $error>`. The generated `new` takes the receivers as
/// positional arguments, in the order the ports were listed, and stores them
/// in the matching fields.
#[doc(hidden)]
#[macro_export]
macro_rules! stream_receivers {
  ($error:ty, $name:ident, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
    $crate::paste::paste! {
      struct $name {
        $(
          [<$port:snake>]: $crate::wasmrs_rx::FluxReceiver<$($ty)*,$error>
        ),*
      }

      impl $name {
        // The surrounding `paste!` already rewrites every `[<...>]` in this body,
        // so the field-init shorthand below needs no nested invocation.
        fn new(
          $(
            [<$port:snake>]: $crate::wasmrs_rx::FluxReceiver<$($ty)*,$error>
          ),*
        ) -> Self {
          Self {
            $(
              [<$port:snake>]
            ),*
          }
        }
      }
    }
  };
}
97
/// Splits one incoming payload stream into a separate packet stream per port.
///
/// Public forms:
///
/// * `payload_fan_out!(stream, ErrorType, [("port", Type), ...])` evaluates to
///   the tuple of per-port streams produced by the generated `receivers()`.
/// * `payload_fan_out!(stream, ErrorType, ConfigType, [("port", Type), ...])`
///   evaluates to `(config_future, per_port_streams)`. The future resolves to a
///   `Result` holding either the `Context` decoded from the first packet's
///   context or a `String` describing why it could not be produced.
///
/// Both forms spawn a task named `"payload_fan_out"` that reads `stream` until
/// it yields anything other than `Some(Ok(_))` and routes every packet by port
/// name. Packets on the fatal-error port are broadcast as errors to every port;
/// packets for unknown ports are dropped.
///
/// The `@handle_packet` and `@route_packet` arms are internal helpers.
#[doc(hidden)]
#[macro_export]
macro_rules! payload_fan_out {
    // Converts a payload into a `Packet`. While `$sender` still holds the one-shot
    // config sender, the sender is taken and used to deliver the decoded context
    // (or an error), so this happens for the first packet only.
    (@handle_packet $payload: ident, $sender:ident, $config:ty) => {
      {
        let packet: $crate::wick_packet::Packet = $payload.into();

        if let Some(config_tx) = $sender.take() {
          if let Some(context) = packet.context() {
            let config: Result<$crate::wick_packet::ContextTransport<$config>, _> = $crate::wasmrs_codec::messagepack::deserialize(&context).map_err(|e|format!("Could not deserialize context: {}", e));
            let _ = config_tx.send(config.map($crate::flow_component::Context::from));
          } else {
            // Report the missing context instead of silently dropping the sender,
            // which would leave the config future with nothing to resolve to.
            let _ = config_tx.send(Err("No context attached to first invocation packet".to_owned()));
          }
        }
        packet
      }
    };
    // Converts a payload into a `Packet` when no configuration is expected.
    (@handle_packet $payload: ident) => {
      {
        let packet: $crate::wick_packet::Packet = $payload.into();
        packet
      }
    };

    // Dispatches one packet to the channel whose port name matches.
    (@route_packet $packet:ident, $channels:ident, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
      match $crate::wick_packet::PacketExt::port(&$packet) {
        $(
          $port => {
            let tx = &$crate::paste::paste! { $channels.[<$port:snake>] };
            $crate::handle_port!($packet, tx, $port, $($ty)*)
          }
        ),*
        $crate::wick_packet::Packet::FATAL_ERROR =>
        {
          #[allow(unused)]
          {
            use $crate::wasmrs_rx::Observer;
            let error = $packet.unwrap_err();
            $crate::paste::paste! {
              $(
                // Ignore per-channel send failures (as `handle_port!` does) so a
                // single failing channel cannot stop the error from reaching the
                // remaining ports.
                let _ = $channels.[<$port:snake>].send_result(Err($crate::anyhow::anyhow!(error.clone())));
              )*
            }
          }
        }
        _ => {
          // TODO: add tracing to warn when we're sent packets we aren't expecting
        }
      }
    };
    ($stream:expr, $error:ty, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
      {
        $crate::stream_senders!($error, Channels, [ $(($port, $($ty)+)),* ]);
        #[allow(unused)]
        let channels = Channels::default();

        let output_streams = channels.receivers().unwrap();

        $crate::runtime::spawn("payload_fan_out", async move {
          #[allow(unused)]
          use $crate::StreamExt;
          // Stops at the end of the stream or at the first `Err` item.
          while let Some(Ok(payload)) = $stream.next().await {
            let packet = $crate::payload_fan_out!(@handle_packet payload);
            $crate::payload_fan_out!(@route_packet packet, channels, [ $(($port, $($ty)+)),* ]);
          }
        });
        output_streams
      }

    };
    ($stream:expr, $error:ty, $config:ty, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
      {
        $crate::stream_senders!($error, Channels, [ $(($port, $($ty)+)),* ]);
        #[allow(unused)]
        let channels = Channels::default();

        let (config_tx,config_rx) = $crate::runtime::oneshot();
        // Wrapped in an `Option` so `@handle_packet` can take it exactly once.
        let mut config_tx = Some(config_tx);
        let config_mono = Box::pin(async move {config_rx.await.unwrap()});
        let output_streams = (config_mono, channels.receivers().unwrap());

        $crate::runtime::spawn("payload_fan_out", async move {
          #[allow(unused)]
          use $crate::StreamExt;
          // Stops at the end of the stream or at the first `Err` item.
          while let Some(Ok(payload)) = $stream.next().await {
            let packet = $crate::payload_fan_out!(@handle_packet payload, config_tx, $config);
            $crate::payload_fan_out!(@route_packet packet, channels, [ $(($port, $($ty)+)),* ]);
          }
        });

        output_streams
      }
    };
}
203
/// Implementation detail of `propagate_if_error!`.
///
/// Unwraps `$result` (or every identifier of a parenthesised list, yielding a
/// tuple of the unwrapped values). On `Err`, the error's `to_string()` is passed
/// to `$outputs.broadcast_err(..)` and then `$bail` is evaluated. `$bail` must
/// be a diverging expression such as `continue`, `break`, or `return ..`;
/// otherwise the error arm does not produce a value of the expected type.
#[doc(hidden)]
#[macro_export]
macro_rules! propagate_if_error_then {
  (($($id:ident),*), $outputs:ident, $bail:expr) => {
    ($(
      match $id {
        Ok(value) => value,
        Err(err) => {
          $outputs.broadcast_err(err.to_string());
          $bail;
        }
      },
    )*)
  };
  ($result:expr, $outputs:ident, $bail:expr) => {
    match $result {
      Ok(value) => value,
      Err(err) => {
        $outputs.broadcast_err(err.to_string());
        $bail;
      }
    }
  };
}

/// Unwrap a [Result] value to its [Result::Ok] value or propagate the error to the downstream inputs and
/// short circuit the logic.
///
/// Takes a [Result] value, a generated component's Outputs, and an action to perform on error
/// (`continue`, `break`, `return`, or `return expr`).
///
/// If the [Result] is [Ok], the [Result::Ok] value is returned.
///
/// If the [Result] is [Err], the error is propagated to the passed outputs,
/// and the logic is short circuited with the passed action.
///
/// With the `continue` action, a parenthesised list of identifiers may be passed instead of a
/// single expression; the macro then evaluates to a tuple of the unwrapped values.
///
/// # Example
///
/// ```ignore
/// let results: Vec<Result<i32, String>> = vec![Ok(1), Err("oops".to_owned()), Ok(2)];
/// for result in results {
///   let value = propagate_if_error!(result, outputs, continue);
///   println!("{}", value);
/// }
/// ```
///
#[macro_export]
macro_rules! propagate_if_error {
  (($($id:ident),*), $outputs:ident, continue) => {
    $crate::propagate_if_error_then!(($($id),*), $outputs, continue)
  };
  ($result:expr, $outputs:ident, continue) => {
    $crate::propagate_if_error_then!($result, $outputs, continue)
  };
  ($result:expr,$outputs:ident, break) => {
    $crate::propagate_if_error_then!($result, $outputs, break)
  };
  ($result:expr,$outputs:ident, return) => {
    $crate::propagate_if_error_then!($result, $outputs, return)
  };
  ($result:expr,$outputs:ident, return $rv:expr) => {
    // Forward the whole `return $rv` expression; forwarding only `$rv` would
    // evaluate the value without ever returning it.
    $crate::propagate_if_error_then!($result, $outputs, return $rv)
  };
}
265
/// Evaluates `$do` once for each listed packet that is done or is a close bracket.
///
/// The packets are checked in the order given. `is_close_bracket()` is only
/// consulted when `is_done()` returned `false`. `$do` may be a control-flow
/// expression such as `continue` or `break`.
#[doc(hidden)]
#[macro_export]
macro_rules! if_done_close_then {
  ([$($id:ident),*], $do:expr) => {{
    $(
      let stream_ended = $id.is_done() || $id.is_close_bracket();
      if stream_ended {
        $do;
      }
    )*
  }};
}
277
/// Awaits the next item of `$stream` and unwraps it, for use inside a loop in an
/// `async` context.
///
/// * When the stream yields `None`, the enclosing loop is left with `break`.
/// * When the item is `Ok`, the macro evaluates to the contained value.
/// * When the item is `Err`, the error is passed to `$outputs` through
///   `propagate_if_error!` and the enclosing loop is then continued or broken
///   out of, according to the third argument (`continue` or `break`).
#[macro_export]
macro_rules! await_next_ok_or {
  ($stream:ident, $outputs:ident, continue) => {{
    let Some(next) = ($stream.next().await) else { break };
    // Resolved through `$crate` so callers do not need to have
    // `propagate_if_error!` in scope themselves.
    $crate::propagate_if_error!(next, $outputs, continue)
  }};
  ($stream:ident, $outputs:ident, break) => {{
    let Some(next) = ($stream.next().await) else { break };
    $crate::propagate_if_error!(next, $outputs, break)
  }};
}
292
/// Runs `$block` between a `broadcast_open()` and a `broadcast_close()` call on `$outputs`.
///
/// The three steps are plain sequential statements, so `broadcast_close()` is
/// not reached when `$block` leaves early (for example through `return`,
/// `break`, or `?`). The value of `$block` is discarded and the macro
/// evaluates to `()`.
#[macro_export]
macro_rules! make_substream_window {
  ($outputs:ident, $block:block) => {{
    // Opening call first.
    $outputs.broadcast_open();
    // Caller-supplied body; its value is thrown away.
    $block;
    // Closing call last.
    $outputs.broadcast_close();
  }};
}
302
#[cfg(test)]
mod test {
  use anyhow::Result;
  use tokio_stream::StreamExt;
  use wick_packet::{packet_stream, InherentData};

  /// Empty configuration type passed as the `$config` argument of `payload_fan_out!`.
  #[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
  struct Config {}

  /// Fan-out with a config type: packets alternate between the `foo` and `bar`
  /// ports and must come out of the matching receiver in order.
  #[tokio::test]
  async fn test_fan_out() -> Result<()> {
    let mut stream = packet_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6));
    stream.set_context(Default::default(), InherentData::unsafe_default());
    let (_config, (mut foo_rx, mut bar_rx)) =
      payload_fan_out!(stream, anyhow::Error, Config, [("foo", i32), ("bar", i32)]);

    for (expected_foo, expected_bar) in [(1, 2), (3, 4), (5, 6)] {
      assert_eq!(foo_rx.next().await.unwrap().decode().unwrap(), expected_foo);
      assert_eq!(bar_rx.next().await.unwrap().decode().unwrap(), expected_bar);
    }

    Ok(())
  }

  /// Same routing check using the form of the macro that takes no config type.
  #[tokio::test]
  async fn test_fan_out_no_config() -> Result<()> {
    let mut stream = packet_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6));
    stream.set_context(Default::default(), InherentData::unsafe_default());
    let (mut foo_rx, mut bar_rx) = payload_fan_out!(stream, anyhow::Error, [("foo", i32), ("bar", i32)]);

    for (expected_foo, expected_bar) in [(1, 2), (3, 4), (5, 6)] {
      assert_eq!(foo_rx.next().await.unwrap().decode().unwrap(), expected_foo);
      assert_eq!(bar_rx.next().await.unwrap().decode().unwrap(), expected_bar);
    }

    Ok(())
  }

  /// The macro must also expand and run when the port list is empty.
  #[tokio::test]
  async fn test_fan_out_no_fields() -> Result<()> {
    let mut stream = packet_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6));
    stream.set_context(Default::default(), InherentData::unsafe_default());
    let _config = payload_fan_out!(stream, anyhow::Error, Config, []);

    Ok(())
  }
}