/// Generates a `pub(crate) async fn $name` that drives a synchronous handler
/// once per set of input packets and streams the results back on one output port.
///
/// # Syntax
///
/// ```ignore
/// request_response!(fn_name, handler_fn => {
///     inputs: { first => FirstType, second => SecondType },
///     output: "port_name",
/// });
/// ```
///
/// * `$name` - identifier of the generated function.
/// * `$handler` - callable invoked as `$handler(first, second, ...)` with the decoded
///   inputs in declaration order; its result is matched against `Ok`/`Err`.
/// * `inputs` - `ident => type` pairs. Each identifier is stringified and passed to
///   `fan_out!` as a port name, and each packet is decoded with `decode::<type>()`.
/// * `output` - expression naming the output port. It is substituted at every use,
///   so it is evaluated each time a packet is produced.
///
/// # Generated behavior
///
/// The function splits the invocation, fans the packet stream out into one stream per
/// input, spawns a `tokio` task, and immediately returns the receiving half of a new
/// `PacketStream` channel pair. The spawned task loops:
///
/// 1. Pulls the next item from every input stream. If any item is not `Some(Ok(_))`
///    the loop ends without recording an error.
/// 2. If any packet reports `is_done()`, the loop ends without recording an error.
/// 3. Otherwise, the first packet (in declaration order) that reports `is_bracket()`
///    is re-targeted to the output port with `set_port` and forwarded; the rest of
///    that round is skipped.
/// 4. Otherwise every packet is decoded, the handler is called, and its `Ok` value is
///    encoded and sent on the output port. A decode or handler error ends the loop and
///    is recorded.
///
/// After the loop, a recorded error is sent as an error packet (using its `to_string()`),
/// and a done packet is always sent last. A failed `send` ends the task early via `?`;
/// the task's result is not awaited by the generated function.
///
/// # Requirements at the expansion site
///
/// `PacketStream`, `Packet`, the `fan_out!` macro, the `tokio`, `wick_packet` and
/// `flow_component` paths, and a `next()` method on the fanned-out streams must all be
/// resolvable where the macro is invoked.
#[macro_export]
macro_rules! request_response {
    ($name:ident, $handler:ident => {
        inputs: {$($ikey:ident => $ity:ty),* $(,)?},
        output: $okey:expr,
    }) => {
        pub(crate) async fn $name(
            invocation: wick_packet::Invocation,
        ) -> Result<PacketStream, flow_component::ComponentError> {
            let (_inv, mut stream) = invocation.split();

            // With exactly one input the tuple pattern collapses to a parenthesized
            // binding, which would otherwise trip the `unused_parens` lint.
            #[allow(unused_parens)]
            let ($(mut $ikey),*) = fan_out!(stream, $(stringify!($ikey)),*);

            let (outbound, inbound) = PacketStream::new_channels();

            tokio::spawn(async move {
                // The loop evaluates to `Some(err)` when decoding or the handler failed,
                // and to `None` on every other exit.
                let failure = loop {
                    // Same single-input parenthesization concern as above, for both the
                    // scrutinee and the pattern.
                    #[allow(unused_parens)]
                    match ($($ikey.next().await),*) {
                        // Inside this arm each `$ikey` names the received packet,
                        // shadowing the stream of the same name.
                        ($(Some(Ok($ikey))),*) => {
                            $(
                                if $ikey.is_done() {
                                    break None;
                                }
                            )*

                            // Only the first bracket packet of the round is forwarded.
                            $(
                                if $ikey.is_bracket() {
                                    outbound.send($ikey.set_port($okey))?;
                                    continue;
                                }
                            )*

                            $(
                                let $ikey = match $ikey.decode::<$ity>() {
                                    Ok(value) => value,
                                    Err(err) => break Some(err),
                                };
                            )*

                            match $handler($($ikey,)*) {
                                Ok(result) => {
                                    outbound.send(Packet::encode($okey, result))?;
                                }
                                Err(err) => break Some(err),
                            }
                        }
                        // An exhausted stream or a non-`Ok` item on any input stops
                        // processing without emitting an error packet.
                        _ => break None,
                    }
                };

                if let Some(failure) = failure {
                    outbound.send(Packet::err($okey, failure.to_string()))?;
                }
                outbound.send(Packet::done($okey))?;

                // Pins the error type that `?` converts into inside this async block.
                Ok::<_, wick_packet::Error>(())
            });

            Ok(inbound.into())
        }
    };
}