1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#[macro_export]
macro_rules! packet_stream {
($(($port:expr, $value:expr)),*) => {{
let packets = $crate::packets!($(($port, $value)),*);
wick_packet::PacketStream::new(Box::new(futures::stream::iter(packets.into_iter().map(Ok))))
}};
}
#[macro_export]
macro_rules! packets {
($(($port:expr, $value:expr)),*) => {
{
let mut msgs = std::vec::Vec::new();
let mut ports = std::collections::HashSet::new();
$(
ports.insert($port.to_owned());
msgs.push(wick_packet::Packet::encode($port, $value));
)*
for port in ports {
msgs.push(wick_packet::Packet::done(&port));
}
msgs
}
};
}
#[macro_export]
macro_rules! fan_out {
($stream:expr, $($port:expr),*) => {
{
let mut streams = wick_packet::StreamMap::default();
let mut senders = std::collections::HashMap::new();
$(
senders.insert($port, streams.init($port));
)*
tokio::spawn(async move {
while let Some(Ok(payload)) = $stream.next().await {
let sender = senders.get_mut(payload.port()).unwrap();
if payload.is_done() {
sender.complete();
continue;
}
sender.send(payload).unwrap();
}
});
($(streams.take($port).unwrap()),*)
}
};
}