1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/// Textually includes the file `mod.rs` found in the directory named by the
/// `OUT_DIR` environment variable.
///
/// The path is assembled with `concat!`/`env!`, so `OUT_DIR` must be set when the
/// invoking crate is compiled; otherwise `env!` fails the build.
///
/// NOTE(review): presumably `mod.rs` is generated by the invoking crate's build
/// script — confirm against the code generator.
#[macro_export]
macro_rules! wick_import {
() => {
// `include!` pastes the file's contents at the call site.
include!(concat!(env!("OUT_DIR"), "/mod.rs"));
};
}
/// Builds a `Vec` of `wasmrs::Payload`s from a list of `(port, value)` pairs.
///
/// For every pair, one data payload is pushed whose metadata is
/// `WickMetadata::new(port)` and whose body is `serialize(&value)`. After all
/// data payloads, one body-less payload with `WickMetadata::new_done(port)`
/// metadata is pushed for each *distinct* port.
///
/// Requirements at the call site (none of these are imported by the macro):
/// * the `wasmrs` crate must be resolvable by that name;
/// * a function named `serialize` must be in scope.
///
/// The distinct ports are collected in a `HashSet`, so the relative order of
/// the trailing "done" payloads is unspecified.
///
/// # Panics
///
/// Panics if `serialize` returns an error for any value.
#[macro_export]
macro_rules! payloads {
  ($(($port:expr, $value:expr)),*) => {
    {
      let mut msgs = std::vec::Vec::new();
      // Remember every port that received data so each gets exactly one "done" payload.
      let mut ports = std::collections::HashSet::new();
      $(
        ports.insert($port.to_owned());
        let md = wasmrs::Metadata::new_extra(0, $crate::WickMetadata::new($port));
        // Serialize the value supplied for this pair. (The previous `&output` referred to a
        // name that is not a macro parameter and cannot see caller locals due to hygiene.)
        msgs.push(wasmrs::Payload::new_data(Some(md), Some(serialize(&$value).unwrap().into())));
      )*
      for port in ports {
        // Use the loop variable here: `$port` is a repeating metavariable and cannot be
        // transcribed outside of its `$(...)*` repetition.
        // NOTE(review): assumes `new_done` accepts a borrowed port name — confirm its signature.
        let md = wasmrs::Metadata::new_extra(0, $crate::WickMetadata::new_done(&port));
        msgs.push(wasmrs::Payload::new_data(Some(md), None));
      }
      msgs
    }
  };
}
/// Produces the receiving half of a fresh `FluxChannel` that has already been
/// fed every packet generated by `packets!` for the given `(port, value)` pairs.
///
/// # Panics
///
/// Panics if sending any packet into the channel fails.
#[macro_export]
macro_rules! payload_stream {
  ($(($port:expr, $value:expr)),*) => {{
    use $crate::wasmrs_rx::Observer;
    // Build the packets first, then the channel they are pushed through.
    let queued = $crate::packet::packets!($(($port, $value)),*);
    let (sender, receiver) = $crate::wasmrs_rx::FluxChannel::new_parts();
    // Fully-qualified call mirrors what a `for` loop does (by-value iteration).
    std::iter::IntoIterator::into_iter(queued).for_each(|packet| {
      sender.send(packet).unwrap();
    });
    receiver
  }};
}
/// Forwards one packet to the sender associated with its port.
///
/// Arguments: `raw: true|false`, the packet identifier, the sender identifier,
/// the port expression (accepted but not used by the expansion) and the target
/// type.
///
/// * A packet for which `is_done()` is true completes the sender.
/// * Otherwise, with `raw: true` the packet itself is sent; with `raw: false`
///   it is deserialized into the target type and the resulting `Result` is
///   passed to `send_result`.
///
/// The outcome of `send` / `send_result` is deliberately discarded.
#[macro_export]
macro_rules! handle_port {
  // Raw mode: pass the packet through untouched.
  (raw: true, $packet:ident, $tx:ident, $port:expr, $ty:ty) => {{
    use $crate::wasmrs_rx::Observer;
    let finished = $packet.is_done();
    if !finished {
      let _ = $tx.send($packet);
    } else {
      $tx.complete();
    }
  }};
  // Typed mode: decode the packet before handing it on.
  (raw: false, $packet:ident, $tx:ident, $port:expr, $ty:ty) => {{
    use $crate::wasmrs_rx::Observer;
    let finished = $packet.is_done();
    if !finished {
      // The annotation pins the deserialization target; the error type is inferred from the sender.
      let decoded: Result<$ty, _> = $packet.deserialize().map_err(|err| err.into());
      let _ = $tx.send_result(decoded);
    } else {
      $tx.complete();
    }
  }};
}
/// Splits one stream of payloads into one receiver per named port.
///
/// Invocation shape: `payload_fan_out!(stream, raw: <true|false>, [(port, Type), ...])`.
///
/// For each listed port a `FluxChannel` is created. A task is then spawned via
/// `$crate::runtime::spawn` that pulls items from `$stream`, converts each into
/// a `Packet`, and hands it to `handle_port!` for the sender whose port matches
/// `packet.port()`. The macro evaluates to the receivers, in the order the
/// ports were listed (a tuple when more than one port is given; with a single
/// port the parenthesized expression is just that receiver).
///
/// The forwarding loop stops at the end of the stream or at the first item that
/// is not `Ok`; such an error item is not forwarded to any receiver.
///
/// # Panics
///
/// The spawned task panics when a packet arrives for a port that was not listed.
#[macro_export]
macro_rules! payload_fan_out {
($stream:expr, raw:$raw:tt, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
{
// Declare a `<port>_tx` / `<port>_rx` pair per port; `paste!` builds the
// identifiers from the snake-cased port expression.
$crate::paste::paste! {
$(
#[allow(unused_parens)]
let ([<$port:snake _tx>],[<$port:snake _rx>]) = $crate::wasmrs_rx::FluxChannel::new_parts();
)*
}
// `async move` transfers the stream and every sender into the spawned task.
$crate::runtime::spawn(async move {
use $crate::StreamExt;
// Ends on `None` (stream finished) or on the first non-`Ok` item.
while let Some(Ok( payload)) = $stream.next().await {
let packet: $crate::packet::Packet = payload.into();
match packet.port() {
$(
// One arm per listed port, dispatching to that port's sender.
$port=> {
let tx = &$crate::paste::paste! {[<$port:snake _tx>]};
$crate::handle_port!(raw: $raw, packet, tx, $port, $($ty)*)
},
)*
// Any port not named in the invocation aborts the task.
_ => panic!("Unexpected port: {}", packet.port())
}
}
});
// Result of the macro: the receivers, in declaration order.
$crate::paste::paste! {($([<$port:snake _rx>]),*)}
}
};
}
#[cfg(test)]
mod test {
  use anyhow::Result;
  use futures::StreamExt;
  use wasmrs::PayloadError;
  use wasmrs_rx::FluxReceiver;
  use wick_packet::Packet;

  /// Feeds six packets alternating between ports "foo" and "bar" through
  /// `payload_fan_out!` and checks that each typed receiver yields its own
  /// values in the order they were sent.
  #[tokio::test]
  async fn test_basic() -> Result<()> {
    let mut stream: FluxReceiver<Packet, PayloadError> =
      payload_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6));
    let (mut foo_rx, mut bar_rx): (FluxReceiver<i32, anyhow::Error>, FluxReceiver<i32, anyhow::Error>) =
      payload_fan_out!(stream, raw: false, [("foo", i32), ("bar", i32)]);

    // "foo" carries the odd values 1, 3, 5; "bar" carries the value that follows each.
    for foo_expected in (1..=5).step_by(2) {
      let foo_actual = foo_rx.next().await.unwrap().unwrap();
      assert_eq!(foo_actual, foo_expected);

      let bar_actual = bar_rx.next().await.unwrap().unwrap();
      assert_eq!(bar_actual, foo_expected + 1);
    }

    Ok(())
  }
}