#[macro_export]
macro_rules! wick_import {
() => {
include!(concat!(env!("OUT_DIR"), "/mod.rs"));
};
}
#[doc(hidden)]
#[macro_export]
macro_rules! handle_port {
($packet:ident, $tx:ident, $port:expr, $ty:ty) => {{
use $crate::wasmrs_rx::Observer;
use $crate::wick_packet::PacketExt;
if $packet.is_done() {
$tx.complete();
} else {
let _ = $tx.send($packet);
}
}};
}
#[doc(hidden)]
#[macro_export]
macro_rules! stream_senders {
($error:ty, $name:ident, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
$crate::paste::paste! {
struct $name {
$(
[<$port:snake>]: $crate::wasmrs_rx::FluxChannel<$crate::wick_packet::Packet,$error>
),*
}
impl $name {
#[allow(clippy::missing_const_for_fn,unreachable_pub,unused,unused_parens)]
pub fn receivers(&self) -> Option<($(
$crate::BoxStream<$crate::wick_packet::VPacket<$($ty)*>>
),*)> {
Some((
$(Box::pin(self.[<$port:snake>].take_rx().ok()?.map($crate::wick_packet::VPacket::from_result))),*
))
}
}
impl Default for $name {
fn default() -> Self {
$crate::paste::paste! {
Self {
$(
[<$port:snake>]: $crate::wasmrs_rx::FluxChannel::new()
),*
}
}
}
}
}
};
}
#[doc(hidden)]
#[macro_export]
macro_rules! stream_receivers {
($error:ty, $name:ident, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
$crate::paste::paste! {
struct $name {
$(
[<$port:snake>]: $crate::wasmrs_rx::FluxReceiver<$($ty)*,$error>
),*
}
impl $name {
fn new(
$(
[<$port:snake>]: $crate::wasmrs_rx::FluxReceiver<$($ty)*,$error>
),*
) -> Self {
$crate::paste::paste! {
Self {
$(
[<$port:snake>]
),*
}
}
}
}
}
};
}
#[doc(hidden)]
#[macro_export]
macro_rules! payload_fan_out {
(@handle_packet $payload: ident, $sender:ident, $config:ty) => {
{
let packet: $crate::wick_packet::Packet = $payload.into();
if let Some(config_tx) = $sender.take() {
if let Some(context) = packet.context() {
let config: Result<$crate::wick_packet::ContextTransport<$config>, _> = $crate::wasmrs_codec::messagepack::deserialize(&context).map_err(|e|format!("Cound not deserialize context: {}", e));
let _ = config_tx.send(config.map($crate::flow_component::Context::from));
} else {
}
}
packet
}
};
(@handle_packet $payload: ident) => {
{
let packet: $crate::wick_packet::Packet = $payload.into();
packet
}
};
(@route_packet $packet:ident, $channels:ident, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
match $crate::wick_packet::PacketExt::port(&$packet) {
$(
$port => {
let tx = &$crate::paste::paste! { $channels.[<$port:snake>] };
$crate::handle_port!($packet, tx, $port, $($ty)*)
}
),*
$crate::wick_packet::Packet::FATAL_ERROR =>
{
#[allow(unused)]
{
use $crate::wasmrs_rx::Observer;
let error = $packet.unwrap_err();
$crate::paste::paste! {
$(
$channels.[<$port:snake>].send_result(Err($crate::anyhow::anyhow!(error.clone()))).unwrap();
)*
}
}
}
_ => {
}
}
};
($stream:expr, $error:ty, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
{
$crate::stream_senders!($error, Channels, [ $(($port, $($ty)+)),* ]);
#[allow(unused)]
let channels = Channels::default();
let output_streams = channels.receivers().unwrap();
$crate::runtime::spawn("payload_fan_out", async move {
#[allow(unused)]
use $crate::StreamExt;
loop {
if let Some(Ok(payload)) = $stream.next().await {
let packet = $crate::payload_fan_out!(@handle_packet payload);
$crate::payload_fan_out!(@route_packet packet, channels, [ $(($port, $($ty)+)),* ]);
} else {
break;
}
}
});
output_streams
}
};
($stream:expr, $error:ty, $config:ty, [ $(($port:expr, $($ty:tt)+)),* $(,)? ]) => {
{
$crate::stream_senders!($error, Channels, [ $(($port, $($ty)+)),* ]);
#[allow(unused)]
let channels = Channels::default();
let (config_tx,config_rx) = $crate::runtime::oneshot();
let mut config_tx = Some(config_tx);
let config_mono = Box::pin(async move {config_rx.await.unwrap()});
let output_streams = (config_mono, channels.receivers().unwrap());
$crate::runtime::spawn("payload_fan_out", async move {
#[allow(unused)]
use $crate::StreamExt;
loop {
if let Some(Ok(payload)) = $stream.next().await {
let packet = $crate::payload_fan_out!(@handle_packet payload, config_tx, $config);
$crate::payload_fan_out!(@route_packet packet, channels, [ $(($port, $($ty)+)),* ]);
} else {
break;
}
}
});
output_streams
}
};
}
#[doc(hidden)]
#[macro_export]
macro_rules! propagate_if_error_then {
(($($id:ident),*), $outputs:ident, $bail:expr) => {
($(
match $id {
Ok(value) => value,
Err(err) => {
$outputs.broadcast_err(err.to_string());
$bail;
}
},
)*)
};
($result:expr, $outputs:ident, $bail:expr) => {
match $result {
Ok(value) => value,
Err(err) => {
$outputs.broadcast_err(err.to_string());
$bail;
}
}
};
}
#[macro_export]
macro_rules! propagate_if_error {
(($($id:ident),*), $outputs:ident, continue) => {
$crate::propagate_if_error_then!(($($id),*), $outputs, continue)
};
($result:expr, $outputs:ident, continue) => {
$crate::propagate_if_error_then!($result, $outputs, continue)
};
($result:expr,$outputs:ident, break) => {
$crate::propagate_if_error_then!($result, $outputs, break)
};
($result:expr,$outputs:ident, return $rv:expr) => {
$crate::propagate_if_error_then!($result, $outputs, $rv)
};
}
#[doc(hidden)]
#[macro_export]
macro_rules! if_done_close_then {
([$($id:ident),*], $do:expr) => {{
$(
if $id.is_done() || $id.is_close_bracket(){
$do;
}
)*
}};
}
#[allow(missing_docs)]
#[macro_export]
macro_rules! await_next_ok_or {
($stream:ident, $outputs:ident, continue) => {{
let Some(next) = ($stream.next().await) else { break };
let packet = propagate_if_error!(next, $outputs, continue);
packet
}};
($stream:ident, $outputs:ident, break) => {{
let Some(next) = ($stream.next().await) else { break };
let packet = propagate_if_error!(next, $outputs, break);
packet
}};
}
#[allow(missing_docs)]
#[macro_export]
macro_rules! make_substream_window {
($outputs:ident, $block:block) => {{
$outputs.broadcast_open();
$block;
$outputs.broadcast_close();
}};
}
#[cfg(test)]
mod test {
use anyhow::Result;
use tokio_stream::StreamExt;
use wick_packet::{packet_stream, InherentData};
#[derive(Debug, Default, Clone, serde::Deserialize, serde::Serialize)]
struct Config {}
#[tokio::test]
async fn test_fan_out() -> Result<()> {
let mut stream = packet_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6));
stream.set_context(Default::default(), InherentData::unsafe_default());
let (_config, (mut foo_rx, mut bar_rx)) =
payload_fan_out!(stream, anyhow::Error, Config, [("foo", i32), ("bar", i32)]);
assert_eq!(foo_rx.next().await.unwrap().decode().unwrap(), 1);
assert_eq!(bar_rx.next().await.unwrap().decode().unwrap(), 2);
assert_eq!(foo_rx.next().await.unwrap().decode().unwrap(), 3);
assert_eq!(bar_rx.next().await.unwrap().decode().unwrap(), 4);
assert_eq!(foo_rx.next().await.unwrap().decode().unwrap(), 5);
assert_eq!(bar_rx.next().await.unwrap().decode().unwrap(), 6);
Ok(())
}
#[tokio::test]
async fn test_fan_out_no_config() -> Result<()> {
let mut stream = packet_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6));
stream.set_context(Default::default(), InherentData::unsafe_default());
let (mut foo_rx, mut bar_rx) = payload_fan_out!(stream, anyhow::Error, [("foo", i32), ("bar", i32)]);
assert_eq!(foo_rx.next().await.unwrap().decode().unwrap(), 1);
assert_eq!(bar_rx.next().await.unwrap().decode().unwrap(), 2);
assert_eq!(foo_rx.next().await.unwrap().decode().unwrap(), 3);
assert_eq!(bar_rx.next().await.unwrap().decode().unwrap(), 4);
assert_eq!(foo_rx.next().await.unwrap().decode().unwrap(), 5);
assert_eq!(bar_rx.next().await.unwrap().decode().unwrap(), 6);
Ok(())
}
#[tokio::test]
async fn test_fan_out_no_fields() -> Result<()> {
let mut stream = packet_stream!(("foo", 1), ("bar", 2), ("foo", 3), ("bar", 4), ("foo", 5), ("bar", 6));
stream.set_context(Default::default(), InherentData::unsafe_default());
let _config = payload_fan_out!(stream, anyhow::Error, Config, []);
Ok(())
}
}