1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#[macro_export]
macro_rules! wick_import {
  () => {
    include!(concat!(env!("OUT_DIR"), "/mod.rs"));
  };
}

#[macro_export]
macro_rules! payloads {
    ($(($port:expr, $value:expr)),*) => {
      {
        let mut msgs = std::vec::Vec::new();
        let mut ports = std::collections::HashSet::new();
        $(
          ports.insert($port.to_owned());
          let md = wasmrs::Metadata::new_extra(0, $crate::WickMetadata::new($port));
          msgs.push(wasmrs::Payload::new_data(Some(md), Some(serialize(&output).unwrap().into())));
        )*
        for port in ports {
          let md = wasmrs::Metadata::new_extra(0, $crate::WickMetadata::new_done($port));
          msgs.push(wasmrs::Payload::new_data(Some(md), None));
        }
        msgs
      }
    };
}

#[macro_export]
macro_rules! payload_stream {
  ($(($port:expr, $value:expr)),*) => {{
    use $crate::wasmrs_rx::Observer;

    let packets = $crate::packet::packets!($(($port, $value)),*);
    let (tx,rx) = $crate::wasmrs_rx::FluxChannel::new_parts();
    for p in packets {
      tx.send(p).unwrap();
    }
    rx
  }};
}

#[macro_export]
macro_rules! handle_port {
  // ($packet:ident, $tx:ident, $subtx:ident, $port:expr, WickStream<$ty:ty> ) => {{
  //   use $crate::wasmrs_rx::Observer;
  //   if $packet.extra.is_done() {
  //     $tx.complete();
  //   } else {
  //     let packet: Result<$ty, _> = $packet.deserialize().map_err(|e| e.into());
  //     let _ = $tx.send_result(packet);
  //   }
  // }};
  (raw: true, $packet:ident, $tx:ident, $port:expr, $ty:ty) => {{
    use $crate::wasmrs_rx::Observer;
    if $packet.is_done() {
      $tx.complete();
    } else {
      // let packet: Result<$ty, _> = $packet.deserialize().map_err(|e| e.into());
      let _ = $tx.send($packet);
    }
  }};
  (raw: false, $packet:ident, $tx:ident, $port:expr, $ty:ty) => {{
    use $crate::wasmrs_rx::Observer;
    if $packet.is_done() {
      $tx.complete();
    } else {
      let packet: Result<$ty, _> = $packet.deserialize().map_err(|e| e.into());
      let _ = $tx.send_result(packet);
    }
  }};
}

#[macro_export]
macro_rules! payload_fan_out {
    ($stream:expr, raw:$raw:tt, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
      {
          $crate::paste::paste! {
            $(
              #[allow(unused_parens)]
              let ([<$port:snake _tx>],[<$port:snake _rx>]) = $crate::wasmrs_rx::FluxChannel::new_parts();
            )*
          }
        $crate::runtime::spawn(async move {

          use $crate::StreamExt;
          while let Some(Ok( payload)) = $stream.next().await {
            let packet: $crate::packet::Packet = payload.into();
            match packet.port() {
              $(
                $port=> {
                  let tx = &$crate::paste::paste! {[<$port:snake _tx>]};
                  $crate::handle_port!(raw: $raw, packet, tx, $port, $($ty)*)
                },
              )*
              _ => panic!("Unexpected port: {}", packet.port())
            }
          }
        });
        $crate::paste::paste! {($([<$port:snake _rx>]),*)}
        }
    };
}

#[cfg(test)]
mod test {
  use anyhow::Result;
  use futures::StreamExt;
  use wasmrs::PayloadError;
  use wasmrs_rx::FluxReceiver;
  use wick_packet::Packet;

  #[tokio::test]
  async fn test_basic() -> Result<()> {
    let mut stream: FluxReceiver<Packet, PayloadError> =
      payload_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6));
    let (mut foo_rx, mut bar_rx): (FluxReceiver<i32, anyhow::Error>, FluxReceiver<i32, anyhow::Error>) =
      payload_fan_out!(stream, raw: false, [("foo", i32), ("bar", i32)]);
    assert_eq!(foo_rx.next().await.unwrap().unwrap(), 1);
    assert_eq!(bar_rx.next().await.unwrap().unwrap(), 2);
    assert_eq!(foo_rx.next().await.unwrap().unwrap(), 3);
    assert_eq!(bar_rx.next().await.unwrap().unwrap(), 4);
    assert_eq!(foo_rx.next().await.unwrap().unwrap(), 5);
    assert_eq!(bar_rx.next().await.unwrap().unwrap(), 6);

    Ok(())
  }
}