/// Pulls build-time generated code into the module where it is invoked.
///
/// Expands to an `include!` of the file `mod.rs` located in the directory
/// named by the `OUT_DIR` environment variable at compile time.
#[macro_export]
macro_rules! wick_import {
    () => {
        include!(concat!(env!("OUT_DIR"), "/mod.rs"));
    };
}
19
/// Forwards one packet to the sender for its port, or completes that sender.
///
/// * `$packet` - the packet to route; it is moved into `send` when forwarded.
/// * `$tx` - the sender for the port (anything providing `Observer` methods).
/// * `$port`, `$ty` - accepted for call-site symmetry; not used in the expansion.
///
/// A packet that reports `is_done()` completes the sender instead of being
/// forwarded. The result of `send` is deliberately discarded.
#[doc(hidden)]
#[macro_export]
macro_rules! handle_port {
    ($packet:ident, $tx:ident, $port:expr, $ty:ty) => {{
        use $crate::wick_packet::PacketExt;
        use $crate::wasmrs_rx::Observer;
        if !$packet.is_done() {
            // Best effort: a failed send is ignored on purpose.
            let _ = $tx.send($packet);
        } else {
            $tx.complete();
        }
    }};
}
33
/// Declares a struct `$name` with one `FluxChannel` field per listed port.
///
/// * `$error` - error type carried by every channel.
/// * `$name` - identifier of the generated struct.
/// * `[(port, type), ...]` - port names (converted to snake case for the field
///   names) paired with the value type exposed by that port's receiver.
///
/// The generated type implements `Default` (every field is a new channel) and
/// offers `receivers()`, which takes the receiving half of each channel and
/// returns them as boxed `VPacket` streams in declaration order. `receivers()`
/// yields `None` as soon as any `take_rx()` call fails.
#[doc(hidden)]
#[macro_export]
macro_rules! stream_senders {
    ($error:ty, $name:ident, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
        // A single `paste!` invocation rewrites every `[<...>]` in the items below.
        $crate::paste::paste! {
            struct $name {
                $(
                    [<$port:snake>]: $crate::wasmrs_rx::FluxChannel<$crate::wick_packet::Packet, $error>
                ),*
            }

            impl Default for $name {
                fn default() -> Self {
                    Self {
                        $(
                            [<$port:snake>]: $crate::wasmrs_rx::FluxChannel::new()
                        ),*
                    }
                }
            }

            impl $name {
                // With exactly one port the "tuple" below is a parenthesised
                // single type/value, hence `unused_parens`.
                #[allow(clippy::missing_const_for_fn, unreachable_pub, unused, unused_parens)]
                pub fn receivers(&self) -> Option<($(
                    $crate::BoxStream<$crate::wick_packet::VPacket<$($ty)*>>
                ),*)> {
                    Some((
                        $(
                            Box::pin(
                                self.[<$port:snake>]
                                    .take_rx()
                                    .ok()?
                                    .map($crate::wick_packet::VPacket::from_result)
                            )
                        ),*
                    ))
                }
            }
        }
    };
}
68
/// Declares a struct `$name` with one `FluxReceiver` field per listed port.
///
/// * `$error` - error type carried by every receiver.
/// * `$name` - identifier of the generated struct.
/// * `[(port, type), ...]` - port names (converted to snake case for the field
///   names) paired with the item type of that port's receiver.
///
/// The generated type gets a private `new(...)` constructor taking the
/// receivers in declaration order.
#[doc(hidden)]
#[macro_export]
macro_rules! stream_receivers {
    ($error:ty, $name:ident, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
        // A single `paste!` invocation rewrites every `[<...>]` in the items below.
        $crate::paste::paste! {
            struct $name {
                $(
                    [<$port:snake>]: $crate::wasmrs_rx::FluxReceiver<$($ty)*, $error>
                ),*
            }

            impl $name {
                fn new(
                    $(
                        [<$port:snake>]: $crate::wasmrs_rx::FluxReceiver<$($ty)*, $error>
                    ),*
                ) -> Self {
                    Self { $( [<$port:snake>] ),* }
                }
            }
        }
    };
}
97
/// Splits one incoming packet stream into a separate typed stream per port.
///
/// Public forms:
///
/// * `payload_fan_out!(stream, ErrorTy, [("port", Ty), ...])` evaluates to the
///   value returned by the generated `receivers()` (one stream per port, in
///   declaration order).
/// * `payload_fan_out!(stream, ErrorTy, ConfigTy, [("port", Ty), ...])`
///   evaluates to `(config_future, receivers)`, where `config_future` resolves
///   to the result of deserializing the context attached to the first packet.
///
/// Both forms spawn a task named `"payload_fan_out"` that pulls from `stream`
/// until it yields `None` or an `Err`, routing every packet by port name.
/// Packets for ports that are not listed are dropped.
///
/// The `@handle_packet` and `@route_packet` arms are implementation details.
#[doc(hidden)]
#[macro_export]
macro_rules! payload_fan_out {
    // Internal: convert a payload into a packet and, for the first packet only,
    // deserialize its context and hand the result to the config channel.
    (@handle_packet $payload:ident, $sender:ident, $config:ty) => {
        {
            let packet: $crate::wick_packet::Packet = $payload.into();

            // `take()` leaves `None` behind, so only the first packet is inspected.
            // NOTE(review): when that packet carries no context the sender is
            // dropped without sending anything — presumably the `unwrap()` on the
            // receiving side then panics; confirm this is intended.
            if let Some(config_tx) = $sender.take() {
                if let Some(context) = packet.context() {
                    let config: Result<$crate::wick_packet::ContextTransport<$config>, _> =
                        $crate::wasmrs_codec::messagepack::deserialize(&context)
                            .map_err(|e| format!("Could not deserialize context: {}", e));
                    // The send result is ignored on purpose.
                    let _ = config_tx.send(config.map($crate::flow_component::Context::from));
                }
            }
            packet
        }
    };

    // Internal: convert a payload into a packet when no config is expected.
    (@handle_packet $payload:ident) => {
        {
            let packet: $crate::wick_packet::Packet = $payload.into();
            packet
        }
    };

    // Internal: deliver a packet to the channel matching its port name.
    (@route_packet $packet:ident, $channels:ident, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
        match $crate::wick_packet::PacketExt::port(&$packet) {
            $(
                $port => {
                    let tx = &$crate::paste::paste! { $channels.[<$port:snake>] };
                    $crate::handle_port!($packet, tx, $port, $($ty)*)
                }
            ),*
            $crate::wick_packet::Packet::FATAL_ERROR => {
                // `unused` covers the zero-port case, where nothing below is used.
                #[allow(unused)]
                {
                    use $crate::wasmrs_rx::Observer;
                    let error = $packet.unwrap_err();
                    $crate::paste::paste! {
                        $(
                            // Best effort, like `handle_port!`: a failed send on one
                            // port must not stop the error from reaching the others.
                            let _ = $channels.[<$port:snake>]
                                .send_result(Err($crate::anyhow::anyhow!(error.clone())));
                        )*
                    }
                }
            }
            // Packets addressed to unknown ports are dropped.
            _ => {}
        }
    };

    // Public: fan out without configuration.
    ($stream:expr, $error:ty, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
        {
            $crate::stream_senders!($error, Channels, [ $(($port, $($ty)+)),* ]);
            #[allow(unused)]
            let channels = Channels::default();

            // The channels were created just above, so their receivers are
            // expected to still be available.
            let output_streams = channels.receivers().unwrap();

            $crate::runtime::spawn("payload_fan_out", async move {
                #[allow(unused)]
                use $crate::StreamExt;
                // Stops on end of stream as well as on the first `Err` item.
                while let Some(Ok(payload)) = $stream.next().await {
                    let packet = $crate::payload_fan_out!(@handle_packet payload);
                    $crate::payload_fan_out!(@route_packet packet, channels, [ $(($port, $($ty)+)),* ]);
                }
            });

            output_streams
        }
    };

    // Public: fan out and additionally expose the deserialized configuration.
    ($stream:expr, $error:ty, $config:ty, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
        {
            $crate::stream_senders!($error, Channels, [ $(($port, $($ty)+)),* ]);
            #[allow(unused)]
            let channels = Channels::default();

            let (config_tx, config_rx) = $crate::runtime::oneshot();
            // Wrapped in `Option` so the routing task can consume it exactly once.
            let mut config_tx = Some(config_tx);
            let config_mono = Box::pin(async move { config_rx.await.unwrap() });
            let output_streams = (config_mono, channels.receivers().unwrap());

            $crate::runtime::spawn("payload_fan_out", async move {
                #[allow(unused)]
                use $crate::StreamExt;
                // Stops on end of stream as well as on the first `Err` item.
                while let Some(Ok(payload)) = $stream.next().await {
                    let packet = $crate::payload_fan_out!(@handle_packet payload, config_tx, $config);
                    $crate::payload_fan_out!(@route_packet packet, channels, [ $(($port, $($ty)+)),* ]);
                }
            });

            output_streams
        }
    };
}
203
/// Unwraps a `Result` (or a tuple of them), broadcasting any error first.
///
/// * `$result` / `($id, ...)` - a `Result` expression, or a parenthesised list
///   of identifiers each bound to a `Result`.
/// * `$outputs` - identifier of a value providing `broadcast_err(String)`.
/// * `$bail` - expression evaluated after the broadcast (e.g. `continue`,
///   `break`, `return value`).
///
/// The single form evaluates to the `Ok` value; the list form evaluates to a
/// tuple of the `Ok` values, inspected left to right.
#[doc(hidden)]
#[macro_export]
macro_rules! propagate_if_error_then {
    // List form: apply the single-result form to each identifier in turn.
    (($($id:ident),*), $outputs:ident, $bail:expr) => {
        ($(
            $crate::propagate_if_error_then!($id, $outputs, $bail),
        )*)
    };
    // Single-result form.
    ($result:expr, $outputs:ident, $bail:expr) => {
        match $result {
            Err(failure) => {
                $outputs.broadcast_err(failure.to_string());
                $bail;
            }
            Ok(unwrapped) => unwrapped,
        }
    };
}
228
/// Unwraps a `Result`, or broadcasts its error to `$outputs` and bails out.
///
/// Forms (the first argument is either a `Result` expression or a
/// parenthesised list of identifiers each bound to a `Result`):
///
/// * `propagate_if_error!(result, outputs, continue)`
/// * `propagate_if_error!(result, outputs, break)`
/// * `propagate_if_error!(result, outputs, return value)`
///
/// On `Ok` the macro evaluates to the contained value (a tuple of values for
/// the list form). On `Err` it calls `outputs.broadcast_err(err.to_string())`
/// and then performs the requested control-flow action.
///
/// ```ignore
/// let value = propagate_if_error!(packet.decode(), outputs, continue);
/// ```
#[macro_export]
macro_rules! propagate_if_error {
    // List forms come first: `(a, b)` would otherwise also parse as a tuple expression.
    (($($id:ident),*), $outputs:ident, continue) => {
        $crate::propagate_if_error_then!(($($id),*), $outputs, continue)
    };
    (($($id:ident),*), $outputs:ident, break) => {
        $crate::propagate_if_error_then!(($($id),*), $outputs, break)
    };
    (($($id:ident),*), $outputs:ident, return $rv:expr) => {
        $crate::propagate_if_error_then!(($($id),*), $outputs, return $rv)
    };
    ($result:expr, $outputs:ident, continue) => {
        $crate::propagate_if_error_then!($result, $outputs, continue)
    };
    ($result:expr, $outputs:ident, break) => {
        $crate::propagate_if_error_then!($result, $outputs, break)
    };
    ($result:expr, $outputs:ident, return $rv:expr) => {
        // The `return` keyword must be forwarded too; forwarding only `$rv`
        // would merely evaluate the value without leaving the function.
        $crate::propagate_if_error_then!($result, $outputs, return $rv)
    };
}
265
/// Evaluates `$action` once for every listed packet that is finished.
///
/// * `[$packet, ...]` - identifiers of values providing `is_done()` and
///   `is_close_bracket()`.
/// * `$action` - expression evaluated for each packet where either check is
///   true (e.g. `continue` or `break`).
///
/// Packets are examined in the order given; `is_close_bracket()` is only
/// consulted when `is_done()` is false.
#[doc(hidden)]
#[macro_export]
macro_rules! if_done_close_then {
    ([$($packet:ident),*], $action:expr) => {{
        $(
            let finished = $packet.is_done() || $packet.is_close_bracket();
            if finished {
                $action;
            }
        )*
    }};
}
277
/// Awaits the next item of `$stream` and unwraps it, for use inside a loop.
///
/// * `$stream` - identifier of a value whose `next()` returns a future
///   resolving to `Option<Result<_, _>>`.
/// * `$outputs` - identifier of a value providing `broadcast_err(String)`.
/// * `continue` / `break` - what to do after an `Err` item has been broadcast.
///
/// In both forms the enclosing loop is left with `break` when the stream
/// yields `None`. On an `Ok` item the macro evaluates to the contained value.
///
/// ```ignore
/// loop {
///     let packet = await_next_ok_or!(input, outputs, continue);
///     // ... use `packet` ...
/// }
/// ```
#[macro_export]
macro_rules! await_next_ok_or {
    ($stream:ident, $outputs:ident, continue) => {{
        let Some(next) = ($stream.next().await) else { break };
        // Path-qualified so callers need not import `propagate_if_error!` themselves.
        $crate::propagate_if_error!(next, $outputs, continue)
    }};
    ($stream:ident, $outputs:ident, break) => {{
        let Some(next) = ($stream.next().await) else { break };
        // Path-qualified so callers need not import `propagate_if_error!` themselves.
        $crate::propagate_if_error!(next, $outputs, break)
    }};
}
292
/// Runs a block between an "open" and a "close" broadcast on `$outputs`.
///
/// * `$outputs` - identifier of a value providing `broadcast_open()` and
///   `broadcast_close()`.
/// * `$body` - the block executed after `broadcast_open()` and before
///   `broadcast_close()`.
///
/// `broadcast_close()` is an ordinary statement after the block, so it is not
/// reached if the block leaves early (e.g. via `return` or `break`).
#[allow(missing_docs)]
#[macro_export]
macro_rules! make_substream_window {
    ($outputs:ident, $body:block) => {{
        $outputs.broadcast_open();
        $body;
        $outputs.broadcast_close();
    }};
}
302
#[cfg(test)]
mod test {
    use anyhow::Result;
    use tokio_stream::StreamExt;
    use wick_packet::{packet_stream, InherentData};

    /// Empty configuration type used for the config-aware form of `payload_fan_out!`.
    #[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
    struct Config {}

    /// Config-aware fan-out: packets alternate between "foo" and "bar" and must
    /// come out of the matching receiver in order.
    #[tokio::test]
    async fn test_fan_out() -> Result<()> {
        let mut stream = packet_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6));
        stream.set_context(Default::default(), InherentData::unsafe_default());
        let (_config, (mut foo_rx, mut bar_rx)) =
            payload_fan_out!(stream, anyhow::Error, Config, [("foo", i32), ("bar", i32)]);

        // Same alternating foo/bar read order as the packets were queued in.
        for (foo_expected, bar_expected) in [(1, 2), (3, 4), (5, 6)] {
            assert_eq!(foo_rx.next().await.unwrap().decode().unwrap(), foo_expected);
            assert_eq!(bar_rx.next().await.unwrap().decode().unwrap(), bar_expected);
        }

        Ok(())
    }

    /// Fan-out without a config type: only the per-port receivers are returned.
    #[tokio::test]
    async fn test_fan_out_no_config() -> Result<()> {
        let mut stream = packet_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6));
        stream.set_context(Default::default(), InherentData::unsafe_default());
        let (mut foo_rx, mut bar_rx) = payload_fan_out!(stream, anyhow::Error, [("foo", i32), ("bar", i32)]);

        // Same alternating foo/bar read order as the packets were queued in.
        for (foo_expected, bar_expected) in [(1, 2), (3, 4), (5, 6)] {
            assert_eq!(foo_rx.next().await.unwrap().decode().unwrap(), foo_expected);
            assert_eq!(bar_rx.next().await.unwrap().decode().unwrap(), bar_expected);
        }

        Ok(())
    }

    /// The macro must also expand when the port list is empty.
    #[tokio::test]
    async fn test_fan_out_no_fields() -> Result<()> {
        let mut stream = packet_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6));
        stream.set_context(Default::default(), InherentData::unsafe_default());
        let _config = payload_fan_out!(stream, anyhow::Error, Config, []);

        Ok(())
    }
}