1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#[macro_export]
macro_rules! packet_stream {
  ($(($port:expr, $value:expr)),*) => {{
    let packets = $crate::packets!($(($port, $value)),*);
    wick_packet::PacketStream::new(Box::new(futures::stream::iter(packets.into_iter().map(Ok))))
  }};
}

#[macro_export]
macro_rules! packets {
    ($(($port:expr, $value:expr)),*) => {
      {
        let mut msgs = std::vec::Vec::new();
        let mut ports = std::collections::HashSet::new();
        $(
          ports.insert($port.to_owned());
          msgs.push(wick_packet::Packet::encode($port, $value));
        )*
        for port in ports {
          msgs.push(wick_packet::Packet::done(&port));
        }
        msgs
      }
    };
}

#[macro_export]
macro_rules! fan_out {
    ($stream:expr, $($port:expr),*) => {
      {
        let mut streams = wick_packet::StreamMap::default();
        let mut senders = std::collections::HashMap::new();
        $(
          senders.insert($port, streams.init($port));
        )*
        tokio::spawn(async move {
            while let Some(Ok(payload)) = $stream.next().await {
            let sender = senders.get_mut(payload.port()).unwrap();
            if payload.is_done() {
              sender.complete();
              continue;
            }
            sender.send(payload).unwrap();
          }
        });
        ($(streams.take($port).unwrap()),*)
        }
    };
}