/// Builds a packet stream from `(port, value)` pairs and hands it to
/// `packetstream_to_wasmrs`.
///
/// The pairs are expanded through `packets!`, converted into a
/// `PacketStream` via `Into`, and passed as the second argument of
/// `packetstream_to_wasmrs`. The first argument is always the literal `0`.
///
/// The macro evaluates to whatever `packetstream_to_wasmrs` returns.
#[macro_export]
macro_rules! raw_packet_stream {
    ($(($port:expr, $value:expr)),*) => {{
        let stream = ::std::convert::Into::<$crate::PacketStream>::into(
            $crate::packets!($(($port, $value)),*),
        );
        $crate::packetstream_to_wasmrs(0, stream)
    }};
}
9
/// Builds a `PacketStream` from `(port, value)` pairs.
///
/// The pairs are forwarded unchanged to `packets!`, and the resulting
/// collection is converted into a `PacketStream` via `Into`. The macro
/// evaluates to that `PacketStream`.
#[macro_export]
macro_rules! packet_stream {
    ($(($port:expr, $value:expr)),*) => {{
        ::std::convert::Into::<$crate::PacketStream>::into(
            $crate::packets!($(($port, $value)),*),
        )
    }};
}
18
/// Builds a `Vec` of packets from `(port, value)` pairs.
///
/// Each pair is encoded with `Packet::encode(port, value)` in the order
/// given. After the data packets, one `Packet::done` packet is appended for
/// every distinct port, in the order in which each port first appeared, so
/// the output is deterministic from run to run.
///
/// Every `$port` expression is expanded twice (once to record the port and
/// once to encode the packet), so it should be free of side effects.
#[macro_export]
macro_rules! packets {
    ($(($port:expr, $value:expr)),*) => {{
        let mut msgs = std::vec::Vec::new();
        // Distinct ports in first-seen order. A `Vec` is used rather than a
        // `HashSet` because hash-set iteration order is randomized, which made
        // the order of the trailing `done` packets differ between runs.
        let mut ports = std::vec::Vec::new();
        $(
            let port = $port.to_owned();
            if !ports.contains(&port) {
                ports.push(port);
            }
            msgs.push($crate::Packet::encode($port, $value));
        )*
        for port in ports {
            msgs.push($crate::Packet::done(&port));
        }
        msgs
    }};
}
36
/// Splits one incoming packet stream into one output stream per named port.
///
/// A `StreamMap` is created and `init` is called on it once for every
/// `$port`; the returned senders are kept in a map keyed by port. A task is
/// then spawned with `tokio::spawn` that pulls items from `$stream` and
/// forwards each packet to the sender registered for `payload.port()`. When a
/// packet reports `is_done()`, it is forwarded and the sender is then
/// completed.
///
/// The macro evaluates to a parenthesized, comma-separated list of the
/// per-port streams taken out of the `StreamMap`, in the order the ports were
/// listed (with a single port this is just that one stream).
///
/// # Panics
///
/// * At the call site, if `streams.take($port)` yields nothing for a port.
/// * Inside the spawned task, if a packet arrives for a port that was not
///   listed, or if `send` returns an error. The task's `JoinHandle` is
///   dropped, so such a panic is not surfaced to the caller.
///
/// # Notes
///
/// * Forwarding stops silently at the first item that is not `Some(Ok(_))`,
///   i.e. at the end of the stream or at the first error item.
/// * `$stream` is expanded inside the loop condition, so it is evaluated on
///   every iteration; pass a variable rather than an expression that builds
///   a new stream.
/// * `tokio` must be resolvable at the call site, and `next()` must be
///   callable on `$stream` there (presumably via a `StreamExt` import;
///   confirm against callers).
#[macro_export]
macro_rules! fan_out {
    ($stream:expr, $($port:expr),*) => {{
        use $crate::wasmrs_rx::Observer;
        // `$crate::` rather than a hard-coded crate name, so the macro keeps
        // working if the crate is renamed by a dependent or used internally.
        let mut streams = $crate::StreamMap::default();
        let mut senders = std::collections::HashMap::new();
        $(
            senders.insert($port, streams.init($port));
        )*
        tokio::spawn(async move {
            use $crate::PacketExt;
            while let Some(Ok(payload)) = $stream.next().await {
                let sender = senders.get_mut(payload.port()).unwrap();
                // Read the flag before `send` takes ownership of the packet.
                let is_done = payload.is_done();
                sender.send(payload).unwrap();
                if is_done {
                    sender.complete();
                }
            }
        });
        ($(streams.take($port).unwrap()),*)
    }};
}