wick_component/adapters/binary/
paired_right_stream.rs

1use tokio_stream::StreamExt;
2use wasmrs_runtime::BoxFuture;
3use wick_packet::{BoxStream, PacketExt, VPacket};
4
5use crate::adapters::encode;
6use crate::{make_substream_window, propagate_if_error, SingleOutput, WickStream};
7
#[macro_export]
/// Generates the `Operation` implementation for a binary operation that pairs
/// single packets (left input) with a potential stream of packets (right input).
///
/// A common example would be file system write() operations, where a filename
/// is paired with a stream of contents.
///
/// # Usage
///
/// `binary_paired_right_stream!(op_name => handler);`
///
/// * `$name` - the operation's module; the expansion uses `$name::Operation`,
///   `$name::Inputs`, `$name::Outputs` and `$name::Config`, and also names the
///   generated async method `$name`.
/// * `$handler` - identifier of the handler; it is passed by reference to
///   `wick_component::binary::paired_right_stream`.
///
/// # Requirements at the call site
///
/// The expansion refers to `Component`, `Context`, `wick_component`,
/// `wick_packet` and `async_trait` by unqualified path, so all of them must be
/// resolvable where the macro is invoked.
macro_rules! binary_paired_right_stream {
  ($name:ident => $handler:ident) => {
    // `async_trait` futures are `Send` on native targets and `?Send` on wasm.
    #[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
    #[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
    impl $name::Operation for Component {
      type Error = wick_component::AnyError;
      type Inputs = $name::Inputs;
      type Outputs = $name::Outputs;
      type Config = $name::Config;

      async fn $name(
        inputs: Self::Inputs,
        mut outputs: Self::Outputs,
        ctx: Context<Self::Config>,
      ) -> Result<(), Self::Error> {
        // Brings the `both()` method into scope for the generated inputs type.
        use wick_packet::BinaryInputs;
        let (left, right) = inputs.both();

        // Delegate to the shared adapter; `?` converts the adapter's error type
        // into `Self::Error`.
        wick_component::binary::paired_right_stream(left, right, &mut outputs, &ctx, &$handler).await?;

        Ok(())
      }
    }
  };
}
37
38/// Operation helper for common for binary operations that pair single packets with potential streams of packets.
39pub async fn paired_right_stream<'c, LEFT, RIGHT, OUTPUT, CONTEXT, OUTPORT, F, E>(
40  left: BoxStream<VPacket<LEFT>>,
41  right: BoxStream<VPacket<RIGHT>>,
42  outputs: &mut OUTPORT,
43  ctx: &'c CONTEXT,
44  func: &'static F,
45) -> Result<(), E>
46where
47  CONTEXT: Clone + wasmrs_runtime::ConditionallySendSync,
48  F: Fn(LEFT, WickStream<RIGHT>, CONTEXT) -> BoxFuture<Result<OUTPUT, E>> + wasmrs_runtime::ConditionallySendSync,
49  OUTPORT: SingleOutput + wasmrs_runtime::ConditionallySendSync,
50  LEFT: serde::de::DeserializeOwned + Clone + wasmrs_runtime::ConditionallySendSync,
51  RIGHT: serde::de::DeserializeOwned + Clone + wasmrs_runtime::ConditionallySendSync,
52  OUTPUT: serde::Serialize + wasmrs_runtime::ConditionallySendSync,
53  E: std::fmt::Display + wasmrs_runtime::ConditionallySendSync,
54{
55  let (_, _) = inner::<LEFT, RIGHT, OUTPUT, CONTEXT, OUTPORT, F, E>(left, right, outputs, ctx, func).await;
56  outputs.single_output().done();
57
58  Ok(())
59}
60
61#[cfg_attr(not(target_family = "wasm"), async_recursion::async_recursion)]
62#[cfg_attr(target_family = "wasm", async_recursion::async_recursion(?Send))]
63async fn inner<'out, 'c, LEFT, RIGHT, OUTPUT, CONTEXT, OUTPORT, F, E>(
64  mut l_stream: BoxStream<VPacket<LEFT>>,
65  mut r_stream: BoxStream<VPacket<RIGHT>>,
66  outputs: &'out mut OUTPORT,
67  ctx: &'c CONTEXT,
68  func: &'static F,
69) -> (BoxStream<VPacket<LEFT>>, BoxStream<VPacket<RIGHT>>)
70where
71  CONTEXT: Clone + wasmrs_runtime::ConditionallySendSync,
72  F: Fn(LEFT, WickStream<RIGHT>, CONTEXT) -> BoxFuture<Result<OUTPUT, E>> + wasmrs_runtime::ConditionallySendSync,
73  OUTPORT: SingleOutput + wasmrs_runtime::ConditionallySendSync,
74  LEFT: serde::de::DeserializeOwned + Clone + wasmrs_runtime::ConditionallySendSync,
75  RIGHT: serde::de::DeserializeOwned + Clone + wasmrs_runtime::ConditionallySendSync,
76  OUTPUT: serde::Serialize + wasmrs_runtime::ConditionallySendSync,
77  E: std::fmt::Display + wasmrs_runtime::ConditionallySendSync,
78{
79  loop {
80    let Some(left) = l_stream.next().await else { break };
81    if left.is_open_bracket() {
82      make_substream_window!(outputs, {
83        (l_stream, r_stream) = inner(l_stream, r_stream, outputs, ctx, func).await;
84      });
85      continue;
86    }
87
88    let left: LEFT = propagate_if_error!(left.decode(), outputs, continue);
89
90    let (tx, rx) = wasmrs_runtime::unbounded_channel();
91    let mut started = false;
92    let mut depth = 0;
93    let (rv_tx, rv_rx) = wasmrs_runtime::oneshot();
94    let ctx = ctx.clone();
95    wasmrs_runtime::spawn("paired_right_stream", async move {
96      let _ = rv_tx.send(encode(func(left, Box::pin(rx), ctx).await));
97    });
98    while let Some(packet) = r_stream.next().await {
99      if packet.is_open_bracket() {
100        if !started {
101          depth += 1;
102          outputs.broadcast_open();
103          continue;
104        }
105        tracing::debug!("received open bracket while already started");
106        let _ = tx.send(Err(
107          wick_packet::Error::Component("Received open bracket while already started".to_owned()).into(),
108        ));
109        continue;
110      }
111
112      if packet.is_close_bracket() {
113        depth -= 1;
114        if depth == 0 {
115          break;
116        }
117        continue;
118      }
119
120      if packet.is_done() {
121        break;
122      }
123
124      if !started {
125        started = true;
126      }
127
128      let _ = tx.send(packet.decode().map_err(Into::into));
129    }
130    drop(tx);
131
132    match rv_rx.await {
133      Ok(v) => outputs.single_output().send_raw_payload(v),
134      Err(e) => outputs.broadcast_err(e.to_string()),
135    }
136  }
137
138  (l_stream, r_stream)
139}