1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/// Includes the `mod.rs` file found in the build's `OUT_DIR` at the call site.
///
/// Expands to `include!(concat!(env!("OUT_DIR"), "/mod.rs"));`, so the
/// `OUT_DIR` environment variable must be set while the invoking crate is
/// compiled (presumably by a build script that writes `mod.rs` — confirm
/// against the crate's `build.rs`).
#[macro_export]
macro_rules! wick_import {
  () => {
    // `env!` is resolved at compile time; the path is `$OUT_DIR/mod.rs`.
    include!(concat!(env!("OUT_DIR"), "/mod.rs"));
  };
}

/// Emits the `async_trait` attribute appropriate for the compilation target.
///
/// On `target_family = "wasm"` the `?Send` form is selected; on every other
/// target the `Send` form is selected.
///
/// NOTE(review): the expansion consists solely of two attributes with no item
/// following them — confirm how (or whether) this macro is meant to be
/// invoked, since attributes normally need an item to attach to.
/// NOTE(review): confirm that `async_trait::async_trait` accepts a bare `Send`
/// argument; its documented forms appear to be `#[async_trait]` and
/// `#[async_trait(?Send)]`.
#[macro_export]
macro_rules! operation {
    () => {
      #[cfg_attr(target_family = "wasm",async_trait::async_trait(?Send))]
      #[cfg_attr(not(target_family = "wasm"), async_trait::async_trait(Send))]
    };
}

/// Builds a `Vec` of `wasmrs::Payload`s from a list of `(port, value)` pairs.
///
/// For every pair, one data payload is pushed whose metadata is
/// `WickMetadata::new(port)` and whose data is the serialized `value`.
/// After all data payloads, one "done" payload (metadata from
/// `WickMetadata::new_done`, no data) is appended for each distinct port.
/// Distinct ports are tracked in a `HashSet`, so the relative order of the
/// done payloads is unspecified.
///
/// `serialize` is referenced unqualified and therefore must be in scope at
/// the call site. Each `$port` expression is evaluated twice per pair.
///
/// # Panics
///
/// Panics if `serialize` returns an error for any value (the result is
/// unwrapped).
#[macro_export]
macro_rules! payloads {
    ($(($port:expr, $value:expr)),*) => {
      {
        let mut msgs = std::vec::Vec::new();
        let mut ports = std::collections::HashSet::new();
        $(
          ports.insert($port.to_owned());
          let md = wasmrs::Metadata::new_extra(0, $crate::WickMetadata::new($port));
          // Serialize the value supplied for this pair (previously referenced an
          // undefined `output` identifier and ignored `$value`).
          msgs.push(wasmrs::Payload::new_data(Some(md), Some(serialize(&$value).unwrap().into())));
        )*
        // `$port` cannot be used outside the repetition; use the loop variable
        // that walks the de-duplicated set of port names instead.
        for port in ports {
          let md = wasmrs::Metadata::new_extra(0, $crate::WickMetadata::new_done(&port));
          msgs.push(wasmrs::Payload::new_data(Some(md), None));
        }
        msgs
      }
    };
}

/// Builds a receiver that yields the packets described by `(port, value)` pairs.
///
/// The pairs are forwarded unchanged to `$crate::packets!`; every packet it
/// produces is sent, in order, into a freshly created `wasmrs::FluxChannel`,
/// and the receiving half of that channel is the value of the expansion.
///
/// NOTE(review): `packets!` is not defined in the visible part of this file —
/// confirm it is exported from the crate root.
///
/// # Panics
///
/// Panics if sending any packet into the channel fails (the result is
/// unwrapped).
#[macro_export]
macro_rules! payload_stream {
  ($(($port:expr, $value:expr)),*) => {{
    let packets = $crate::packets!($(($port, $value)),*);
    // `new_parts` (not `new_partss`) is the constructor used elsewhere in this
    // file to obtain the sender/receiver halves.
    let (tx, rx) = wasmrs::FluxChannel::new_parts();
    for p in packets {
      tx.send(p).unwrap();
    }
    rx
  }};
}

/// Splits one incoming payload stream into a typed receiver per listed port.
///
/// Takes a stream expression and a bracketed list of `(port, Type)` pairs
/// (a trailing comma is accepted). For each pair a `FluxChannel` is created
/// whose halves are named after the snake-cased port (`<port>_tx` /
/// `<port>_rx`). A task is spawned through `$crate::runtime::spawn` that
/// reads the stream and routes each packet by `packet.extra.stream()`:
///
/// * a packet flagged done completes that port's sender;
/// * any other packet is deserialized into the port's `Type` and the
///   resulting `Result` is forwarded with `send_result` (send failures are
///   ignored).
///
/// The loop stops at the first stream item that is not `Some(Ok(_))`.
/// The expansion evaluates to the receivers, in the order the ports were
/// listed.
///
/// `FluxChannel`, `Packet`, and whatever trait provides `.next()` on the
/// stream are referenced unqualified and must be in scope at the call site.
///
/// # Panics
///
/// The spawned task panics with `Unexpected port: …` when a packet names a
/// port that was not listed.
#[macro_export]
macro_rules! payload_fan_out {
    ($stream:expr, [ $(($port:expr, $ty:ty)),* $(,)? ]) => {
      {
        // One sender/receiver pair per declared port.
        $crate::paste::paste! {
          $(
            #[allow(unused_parens)]
            let ([<$port:snake _tx>], [<$port:snake _rx>]) = FluxChannel::new_parts();
          )*
        }

        // Routing task: owns the stream and every sender.
        $crate::runtime::spawn(async move {
          while let Some(Ok(payload)) = $stream.next().await {
            let packet: Packet = payload.into();
            match packet.extra.stream() {
              $(
                $port => {
                  if packet.extra.is_done() {
                    $crate::paste::paste! { [<$port:snake _tx>].complete(); }
                  } else {
                    let decoded: Result<$ty, _> = packet.deserialize().map_err(|err| err.into());
                    $crate::paste::paste! { let _ = [<$port:snake _tx>].send_result(decoded); }
                  }
                }
              )*
              _ => panic!("Unexpected port: {}", packet.extra.stream())
            }
          }
        });

        // Hand the receiving halves back to the caller.
        $crate::paste::paste! { ($([<$port:snake _rx>]),*) }
      }
    };
}