wick_packet/
macros.rs

1#[macro_export]
2macro_rules! raw_packet_stream {
3  ($(($port:expr, $value:expr)),*) => {{
4    let packets = $crate::packets!($(($port, $value)),*);
5    let packets :$crate::PacketStream = packets.into();
6    $crate::packetstream_to_wasmrs(0,packets)
7  }};
8}
9
10#[macro_export]
11macro_rules! packet_stream {
12  ($(($port:expr, $value:expr)),*) => {{
13    let packets = $crate::packets!($(($port, $value)),*);
14    let packets :$crate::PacketStream = packets.into();
15    packets
16  }};
17}
18
19#[macro_export]
20macro_rules! packets {
21    ($(($port:expr, $value:expr)),*) => {
22      {
23        let mut msgs = std::vec::Vec::new();
24        let mut ports = std::collections::HashSet::new();
25        $(
26          ports.insert($port.to_owned());
27          msgs.push($crate::Packet::encode($port, $value));
28        )*
29        for port in ports {
30          msgs.push($crate::Packet::done(&port));
31        }
32        msgs
33      }
34    };
35}
36
37#[macro_export]
38macro_rules! fan_out {
39  ($stream:expr, $($port:expr),*) => {
40  {
41        use $crate::wasmrs_rx::Observer;
42        let mut streams = wick_packet::StreamMap::default();
43        let mut senders = std::collections::HashMap::new();
44        $(
45          senders.insert($port, streams.init($port));
46        )*
47        tokio::spawn(async move {
48            use $crate::PacketExt;
49            while let Some(Ok(payload)) = $stream.next().await {
50            let sender = senders.get_mut(payload.port()).unwrap();
51            if payload.is_done() {
52              sender.send(payload).unwrap();
53              sender.complete();
54            } else {
55              sender.send(payload).unwrap();
56            }
57          }
58        });
59        ($(streams.take($port).unwrap()),*)
60        }
61    };
62}