wick_component/adapters/unary/
simple.rs

1use tokio_stream::StreamExt;
2use wasmrs_runtime::BoxFuture;
3use wick_packet::{BoxStream, PacketExt, VPacket};
4
5use crate::adapters::encode;
6use crate::{propagate_if_error, SingleOutput};
7
8#[macro_export]
9/// This macro will generate the implementations for simple binary operations, operations that take two inputs, produce one output, and are largely want to remain ignorant of stream state.
10macro_rules! unary_simple {
11  ($name:ident => $handler:ident) => {
12    #[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
13    #[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
14    impl $name::Operation for Component {
15      type Error = wick_component::AnyError;
16      type Inputs = $name::Inputs;
17      type Outputs = $name::Outputs;
18      type Config = $name::Config;
19
20      async fn $name(
21        mut inputs: Self::Inputs,
22        mut outputs: Self::Outputs,
23        ctx: Context<Self::Config>,
24      ) -> Result<(), Self::Error> {
25        use wick_packet::UnaryInputs;
26        let input = inputs.take_input();
27
28        wick_component::unary::simple(input, &mut outputs, &ctx, &$handler).await?;
29
30        Ok(())
31      }
32    }
33  };
34}
35
36/// Operation helper for common binary operations that have one output.
37pub async fn simple<'c, INPUT, OUTPUT, CONTEXT, OUTPORT, F, E>(
38  input: BoxStream<VPacket<INPUT>>,
39  outputs: &mut OUTPORT,
40  ctx: &'c CONTEXT,
41  func: &'static F,
42) -> Result<(), E>
43where
44  CONTEXT: Clone + wasmrs_runtime::ConditionallySendSync,
45  F: Fn(INPUT, CONTEXT) -> BoxFuture<Result<OUTPUT, E>> + wasmrs_runtime::ConditionallySendSync,
46  OUTPORT: SingleOutput + wasmrs_runtime::ConditionallySendSync,
47  INPUT: serde::de::DeserializeOwned + Clone + wasmrs_runtime::ConditionallySendSync,
48  OUTPUT: serde::Serialize + wasmrs_runtime::ConditionallySendSync,
49  E: std::fmt::Display + wasmrs_runtime::ConditionallySendSync,
50{
51  let _ = inner::<INPUT, OUTPUT, CONTEXT, OUTPORT, F, E>(input, outputs, ctx, func).await;
52  outputs.single_output().done();
53
54  Ok(())
55}
56
57#[cfg_attr(not(target_family = "wasm"), async_recursion::async_recursion)]
58#[cfg_attr(target_family = "wasm", async_recursion::async_recursion(?Send))]
59async fn inner<'out, 'c, INPUT, OUTPUT, CONTEXT, OUTPORT, F, E>(
60  mut input_stream: BoxStream<VPacket<INPUT>>,
61  outputs: &'out mut OUTPORT,
62  ctx: &'c CONTEXT,
63  func: &'static F,
64) -> BoxStream<VPacket<INPUT>>
65where
66  CONTEXT: Clone + wasmrs_runtime::ConditionallySendSync,
67  F: Fn(INPUT, CONTEXT) -> BoxFuture<Result<OUTPUT, E>> + wasmrs_runtime::ConditionallySendSync,
68  OUTPORT: SingleOutput + wasmrs_runtime::ConditionallySendSync,
69  INPUT: serde::de::DeserializeOwned + Clone + wasmrs_runtime::ConditionallySendSync,
70  OUTPUT: serde::Serialize + wasmrs_runtime::ConditionallySendSync,
71  E: std::fmt::Display + wasmrs_runtime::ConditionallySendSync,
72{
73  loop {
74    let Some(input) = input_stream.next().await else { break };
75
76    if input.is_open_bracket() {
77      outputs.broadcast_open();
78      input_stream = inner(input_stream, outputs, ctx, func).await;
79      outputs.broadcast_close();
80    } else if input.is_close_bracket() || input.is_done() {
81      break;
82    } else {
83      let input: INPUT = propagate_if_error!(input.decode(), outputs, continue);
84      outputs
85        .single_output()
86        .send_raw_payload(encode(func(input.clone(), ctx.clone()).await));
87    }
88  }
89
90  input_stream
91}