1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#[macro_export]
macro_rules! wick_import {
() => {
include!(concat!(env!("OUT_DIR"), "/mod.rs"));
};
}
#[macro_export]
macro_rules! operation {
() => {
#[cfg_attr(target_family = "wasm",async_trait::async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait(Send))]
};
}
#[macro_export]
macro_rules! payloads {
($(($port:expr, $value:expr)),*) => {
{
let mut msgs = std::vec::Vec::new();
let mut ports = std::collections::HashSet::new();
$(
ports.insert($port.to_owned());
let md = wasmrs::Metadata::new_extra(0, $crate::WickMetadata::new($port));
msgs.push(wasmrs::Payload::new_data(Some(md), Some(serialize(&output).unwrap().into())));
)*
for port in ports {
let md = wasmrs::Metadata::new_extra(0, $crate::WickMetadata::new_done($port));
msgs.push(wasmrs::Payload::new_data(Some(md), None));
}
msgs
}
};
}
#[macro_export]
macro_rules! payload_stream {
($(($port:expr, $value:expr)),*) => {{
let packets = $crate::packets!($(($port, $value)),*);
let (tx,rx) = wasmrs::FluxChannel::new_partss();
for p in packets {
tx.send(p).unwrap();
}
rx
}};
}
#[macro_export]
macro_rules! payload_fan_out {
($stream:expr, [ $(($port:expr, $ty:ty)),* $(,)? ]) => {
{
$crate::paste::paste! {
$(
#[allow(unused_parens)]
let ([<$port:snake _tx>],[<$port:snake _rx>]) = FluxChannel::new_parts();
)*
}
$crate::runtime::spawn(async move {
while let Some(Ok(mut payload)) = $stream.next().await {
let packet: Packet = payload.into();
match packet.extra.stream() {
$(
$port=>if packet.extra.is_done() {
$crate::paste::paste! {[<$port:snake _tx>].complete();}
} else {
let packet: Result<$ty,_> = packet.deserialize().map_err(|e|e.into());
$crate::paste::paste! {let _ = [<$port:snake _tx>].send_result(packet);}
},
)*
_ => panic!("Unexpected port: {}", packet.extra.stream())
}
}
});
$crate::paste::paste! {($([<$port:snake _rx>]),*)}
}
};
}