use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::num::ParseIntError;
use std::time::Duration;
use bytes::{Bytes, BytesMut};
use futures::stream::Stream as FuturesStream;
use proc_macro2::Span;
use quote::quote;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use slotmap::{Key, new_key_type};
use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
use stageleft::{QuotedWithContext, q, quote_type};
use syn::parse_quote;
use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
use crate::compile::ir::{
ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
};
use crate::forward_handle::ForwardRef;
#[cfg(stageleft_runtime)]
use crate::forward_handle::{CycleCollection, ForwardHandle};
use crate::live_collections::boundedness::{Bounded, Unbounded};
use crate::live_collections::keyed_stream::KeyedStream;
use crate::live_collections::singleton::Singleton;
use crate::live_collections::stream::{
ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
};
use crate::location::dynamic::LocationId;
use crate::location::external_process::{
ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
};
use crate::nondet::NonDet;
#[cfg(feature = "sim")]
use crate::sim::SimSender;
use crate::staging_util::get_this_crate;
pub mod dynamic;
pub mod external_process;
pub use external_process::External;
pub mod process;
pub use process::Process;
pub mod cluster;
pub use cluster::Cluster;
pub mod member_id;
pub use member_id::{MemberId, TaglessMemberId};
pub mod tick;
pub use tick::{Atomic, NoTick, Tick};
#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum MembershipEvent {
Joined,
Left,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
Auto,
TcpPort(Option<u16>),
}
pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
}
#[stageleft::export(LocationKey)]
new_key_type! {
pub struct LocationKey;
}
impl std::fmt::Display for LocationKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "loc{:?}", self.data()) }
}
impl std::str::FromStr for LocationKey {
type Err = Option<ParseIntError>;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let nvn = s.strip_prefix("loc").ok_or(None)?;
let (idx, ver) = nvn.split_once("v").ok_or(None)?;
let idx: u64 = idx.parse()?;
let ver: u64 = ver.parse()?;
Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
}
}
impl LocationKey {
pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001));
#[cfg(test)]
pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000001));
#[cfg(test)]
pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000002)); }
impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
type O = LocationKey;
fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
where
Self: Sized,
{
let root = get_this_crate();
let n = Key::data(&self).as_ffi();
(
QuoteTokens {
prelude: None,
expr: Some(quote! {
#root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
}),
},
(),
)
}
}
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
pub enum LocationType {
Process,
Cluster,
External,
}
#[expect(
private_bounds,
reason = "only internal Hydro code can define location types"
)]
pub trait Location<'a>: dynamic::DynLocation {
type Root: Location<'a>;
fn root(&self) -> Self::Root;
fn try_tick(&self) -> Option<Tick<Self>> {
if Self::is_top_level() {
let id = self.flow_state().borrow_mut().next_clock_id();
Some(Tick {
id,
l: self.clone(),
})
} else {
None
}
}
fn id(&self) -> LocationId {
dynamic::DynLocation::id(self)
}
fn tick(&self) -> Tick<Self>
where
Self: NoTick,
{
let id = self.flow_state().borrow_mut().next_clock_id();
Tick {
id,
l: self.clone(),
}
}
fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
where
Self: Sized + NoTick,
{
Stream::new(
self.clone(),
HydroNode::Source {
source: HydroSource::Spin(),
metadata: self.new_node_metadata(Stream::<
(),
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
>::collection_kind()),
},
)
}
fn source_stream<T, E>(
&self,
e: impl QuotedWithContext<'a, E, Self>,
) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
where
E: FuturesStream<Item = T> + Unpin,
Self: Sized + NoTick,
{
let e = e.splice_untyped_ctx(self);
Stream::new(
self.clone(),
HydroNode::Source {
source: HydroSource::Stream(e.into()),
metadata: self.new_node_metadata(Stream::<
T,
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
>::collection_kind()),
},
)
}
fn source_iter<T, E>(
&self,
e: impl QuotedWithContext<'a, E, Self>,
) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
where
E: IntoIterator<Item = T>,
Self: Sized,
{
let e = e.splice_typed_ctx(self);
Stream::new(
self.clone(),
HydroNode::Source {
source: HydroSource::Iter(e.into()),
metadata: self.new_node_metadata(
Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
),
},
)
}
fn source_cluster_members<C: 'a>(
&self,
cluster: &Cluster<'a, C>,
) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
where
Self: Sized + NoTick,
{
Stream::new(
self.clone(),
HydroNode::Source {
source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
metadata: self.new_node_metadata(Stream::<
(TaglessMemberId, MembershipEvent),
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
>::collection_kind()),
},
)
.map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
.into_keyed()
}
fn source_external_bytes<L>(
&self,
from: &External<L>,
) -> (
ExternalBytesPort,
Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
)
where
Self: Sized + NoTick,
{
let (port, stream, sink) =
self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
sink.complete(self.source_iter(q!([])));
(port, stream)
}
#[expect(clippy::type_complexity, reason = "stream markers")]
fn source_external_bincode<L, T, O: Ordering, R: Retries>(
&self,
from: &External<L>,
) -> (
ExternalBincodeSink<T, NotMany, O, R>,
Stream<T, Self, Unbounded, O, R>,
)
where
Self: Sized + NoTick,
T: Serialize + DeserializeOwned,
{
let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
sink.complete(self.source_iter(q!([])));
(
ExternalBincodeSink {
process_key: from.key,
port_id: port.port_id,
_phantom: PhantomData,
},
stream.weaken_ordering().weaken_retries(),
)
}
#[cfg(feature = "sim")]
#[expect(clippy::type_complexity, reason = "stream markers")]
fn sim_input<T, O: Ordering, R: Retries>(
&self,
) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
where
Self: Sized + NoTick,
T: Serialize + DeserializeOwned,
{
let external_location: External<'a, ()> = External {
key: LocationKey::FIRST,
flow_state: self.flow_state().clone(),
_phantom: PhantomData,
};
let (external, stream) = self.source_external_bincode(&external_location);
(SimSender(external.port_id, PhantomData), stream)
}
fn embedded_input<T>(
&self,
name: impl Into<String>,
) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
where
Self: Sized + NoTick,
{
let ident = syn::Ident::new(&name.into(), Span::call_site());
Stream::new(
self.clone(),
HydroNode::Source {
source: HydroSource::Embedded(ident),
metadata: self.new_node_metadata(Stream::<
T,
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
>::collection_kind()),
},
)
}
fn embedded_singleton_input<T>(&self, name: impl Into<String>) -> Singleton<T, Self, Bounded>
where
Self: Sized + NoTick,
{
let ident = syn::Ident::new(&name.into(), Span::call_site());
Singleton::new(
self.clone(),
HydroNode::Source {
source: HydroSource::EmbeddedSingleton(ident),
metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
},
)
}
#[expect(clippy::type_complexity, reason = "stream markers")]
fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
&self,
from: &External<L>,
port_hint: NetworkHint,
) -> (
ExternalBytesPort<NotMany>,
Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
)
where
Self: Sized + NoTick,
{
let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
let (fwd_ref, to_sink) =
self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
let mut flow_state_borrow = self.flow_state().borrow_mut();
flow_state_borrow.push_root(HydroRoot::SendExternal {
to_external_key: from.key,
to_port_id: next_external_port_id,
to_many: false,
unpaired: false,
serialize_fn: None,
instantiate_fn: DebugInstantiate::Building,
input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
op_metadata: HydroIrOpMetadata::new(),
});
let raw_stream: Stream<
Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
> = Stream::new(
self.clone(),
HydroNode::ExternalInput {
from_external_key: from.key,
from_port_id: next_external_port_id,
from_many: false,
codec_type: quote_type::<Codec>().into(),
port_hint,
instantiate_fn: DebugInstantiate::Building,
deserialize_fn: None,
metadata: self.new_node_metadata(Stream::<
Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
>::collection_kind()),
},
);
(
ExternalBytesPort {
process_key: from.key,
port_id: next_external_port_id,
_phantom: PhantomData,
},
raw_stream.flatten_ordered(),
fwd_ref,
)
}
#[expect(clippy::type_complexity, reason = "stream markers")]
fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
&self,
from: &External<L>,
) -> (
ExternalBincodeBidi<InT, OutT, NotMany>,
Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
)
where
Self: Sized + NoTick,
{
let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
let (fwd_ref, to_sink) =
self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
let mut flow_state_borrow = self.flow_state().borrow_mut();
let root = get_this_crate();
let out_t_type = quote_type::<OutT>();
let ser_fn: syn::Expr = syn::parse_quote! {
#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
|b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
)
};
flow_state_borrow.push_root(HydroRoot::SendExternal {
to_external_key: from.key,
to_port_id: next_external_port_id,
to_many: false,
unpaired: false,
serialize_fn: Some(ser_fn.into()),
instantiate_fn: DebugInstantiate::Building,
input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
op_metadata: HydroIrOpMetadata::new(),
});
let in_t_type = quote_type::<InT>();
let deser_fn: syn::Expr = syn::parse_quote! {
|res| {
let b = res.unwrap();
#root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
}
};
let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
self.clone(),
HydroNode::ExternalInput {
from_external_key: from.key,
from_port_id: next_external_port_id,
from_many: false,
codec_type: quote_type::<LengthDelimitedCodec>().into(),
port_hint: NetworkHint::Auto,
instantiate_fn: DebugInstantiate::Building,
deserialize_fn: Some(deser_fn.into()),
metadata: self.new_node_metadata(Stream::<
InT,
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
>::collection_kind()),
},
);
(
ExternalBincodeBidi {
process_key: from.key,
port_id: next_external_port_id,
_phantom: PhantomData,
},
raw_stream,
fwd_ref,
)
}
#[expect(clippy::type_complexity, reason = "stream markers")]
fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
&self,
from: &External<L>,
port_hint: NetworkHint,
) -> (
ExternalBytesPort<Many>,
KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
)
where
Self: Sized + NoTick,
{
let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
let (fwd_ref, to_sink) =
self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
let mut flow_state_borrow = self.flow_state().borrow_mut();
flow_state_borrow.push_root(HydroRoot::SendExternal {
to_external_key: from.key,
to_port_id: next_external_port_id,
to_many: true,
unpaired: false,
serialize_fn: None,
instantiate_fn: DebugInstantiate::Building,
input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
op_metadata: HydroIrOpMetadata::new(),
});
let raw_stream: Stream<
Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
> = Stream::new(
self.clone(),
HydroNode::ExternalInput {
from_external_key: from.key,
from_port_id: next_external_port_id,
from_many: true,
codec_type: quote_type::<Codec>().into(),
port_hint,
instantiate_fn: DebugInstantiate::Building,
deserialize_fn: None,
metadata: self.new_node_metadata(Stream::<
Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
>::collection_kind()),
},
);
let membership_stream_ident = syn::Ident::new(
&format!(
"__hydro_deploy_many_{}_{}_membership",
from.key, next_external_port_id
),
Span::call_site(),
);
let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
let raw_membership_stream: KeyedStream<
u64,
bool,
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
> = KeyedStream::new(
self.clone(),
HydroNode::Source {
source: HydroSource::Stream(membership_stream_expr.into()),
metadata: self.new_node_metadata(KeyedStream::<
u64,
bool,
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
>::collection_kind()),
},
);
(
ExternalBytesPort {
process_key: from.key,
port_id: next_external_port_id,
_phantom: PhantomData,
},
raw_stream
.flatten_ordered() .into_keyed(),
raw_membership_stream.map(q!(|join| {
if join {
MembershipEvent::Joined
} else {
MembershipEvent::Left
}
})),
fwd_ref,
)
}
#[expect(clippy::type_complexity, reason = "stream markers")]
fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
&self,
from: &External<L>,
) -> (
ExternalBincodeBidi<InT, OutT, Many>,
KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
)
where
Self: Sized + NoTick,
{
let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
let (fwd_ref, to_sink) =
self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
let mut flow_state_borrow = self.flow_state().borrow_mut();
let root = get_this_crate();
let out_t_type = quote_type::<OutT>();
let ser_fn: syn::Expr = syn::parse_quote! {
#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
|(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
)
};
flow_state_borrow.push_root(HydroRoot::SendExternal {
to_external_key: from.key,
to_port_id: next_external_port_id,
to_many: true,
unpaired: false,
serialize_fn: Some(ser_fn.into()),
instantiate_fn: DebugInstantiate::Building,
input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
op_metadata: HydroIrOpMetadata::new(),
});
let in_t_type = quote_type::<InT>();
let deser_fn: syn::Expr = syn::parse_quote! {
|res| {
let (id, b) = res.unwrap();
(id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
}
};
let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
KeyedStream::new(
self.clone(),
HydroNode::ExternalInput {
from_external_key: from.key,
from_port_id: next_external_port_id,
from_many: true,
codec_type: quote_type::<LengthDelimitedCodec>().into(),
port_hint: NetworkHint::Auto,
instantiate_fn: DebugInstantiate::Building,
deserialize_fn: Some(deser_fn.into()),
metadata: self.new_node_metadata(KeyedStream::<
u64,
InT,
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
>::collection_kind()),
},
);
let membership_stream_ident = syn::Ident::new(
&format!(
"__hydro_deploy_many_{}_{}_membership",
from.key, next_external_port_id
),
Span::call_site(),
);
let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
let raw_membership_stream: KeyedStream<
u64,
bool,
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
> = KeyedStream::new(
self.clone(),
HydroNode::Source {
source: HydroSource::Stream(membership_stream_expr.into()),
metadata: self.new_node_metadata(KeyedStream::<
u64,
bool,
Self,
Unbounded,
TotalOrder,
ExactlyOnce,
>::collection_kind()),
},
);
(
ExternalBincodeBidi {
process_key: from.key,
port_id: next_external_port_id,
_phantom: PhantomData,
},
raw_stream,
raw_membership_stream.map(q!(|join| {
if join {
MembershipEvent::Joined
} else {
MembershipEvent::Left
}
})),
fwd_ref,
)
}
fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
where
Self: Sized + NoTick,
{
let e = e.splice_untyped_ctx(self);
Singleton::new(
self.clone(),
HydroNode::SingletonSource {
value: e.into(),
first_tick_only: false,
metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
},
)
}
fn singleton_future<F>(
&self,
e: impl QuotedWithContext<'a, F, Self>,
) -> Singleton<F::Output, Self, Bounded>
where
F: Future,
Self: Sized + NoTick,
{
self.singleton(e).resolve_future_blocking()
}
fn source_interval(
&self,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
_nondet: NonDet,
) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
where
Self: Sized + NoTick,
{
self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(interval)
)))
}
fn source_interval_delayed(
&self,
delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
_nondet: NonDet,
) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
where
Self: Sized + NoTick,
{
self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
)))
}
fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
where
S: CycleCollection<'a, ForwardRef, Location = Self>,
{
let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
(
ForwardHandle::new(cycle_id, Location::id(self)),
S::create_source(cycle_id, self.clone()),
)
}
}
#[cfg(feature = "deploy")]
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use futures::{SinkExt, StreamExt};
use hydro_deploy::Deployment;
use stageleft::q;
use tokio_util::codec::LengthDelimitedCodec;
use crate::compile::builder::FlowBuilder;
use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
use crate::location::{Location, NetworkHint};
use crate::nondet::nondet;
#[tokio::test]
async fn top_level_singleton_replay_cardinality() {
let mut deployment = Deployment::new();
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let external = flow.external::<()>();
let (in_port, input) =
node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
let singleton = node.singleton(q!(123));
let tick = node.tick();
let out = input
.batch(&tick, nondet!())
.cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
.cross_singleton(
singleton
.snapshot(&tick, nondet!())
.into_stream()
.count(),
)
.all_ticks()
.send_bincode_external(&external);
let nodes = flow
.with_process(&node, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);
deployment.deploy().await.unwrap();
let mut external_in = nodes.connect(in_port).await;
let mut external_out = nodes.connect(out).await;
deployment.start().await.unwrap();
external_in.send(1).await.unwrap();
assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
external_in.send(2).await.unwrap();
assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
}
#[tokio::test]
async fn tick_singleton_replay_cardinality() {
let mut deployment = Deployment::new();
let mut flow = FlowBuilder::new();
let node = flow.process::<()>();
let external = flow.external::<()>();
let (in_port, input) =
node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
let tick = node.tick();
let singleton = tick.singleton(q!(123));
let out = input
.batch(&tick, nondet!())
.cross_singleton(singleton.clone())
.cross_singleton(singleton.into_stream().count())
.all_ticks()
.send_bincode_external(&external);
let nodes = flow
.with_process(&node, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);
deployment.deploy().await.unwrap();
let mut external_in = nodes.connect(in_port).await;
let mut external_out = nodes.connect(out).await;
deployment.start().await.unwrap();
external_in.send(1).await.unwrap();
assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
external_in.send(2).await.unwrap();
assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
}
#[tokio::test]
async fn external_bytes() {
let mut deployment = Deployment::new();
let mut flow = FlowBuilder::new();
let first_node = flow.process::<()>();
let external = flow.external::<()>();
let (in_port, input) = first_node.source_external_bytes(&external);
let out = input.send_bincode_external(&external);
let nodes = flow
.with_process(&first_node, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);
deployment.deploy().await.unwrap();
let mut external_in = nodes.connect(in_port).await.1;
let mut external_out = nodes.connect(out).await;
deployment.start().await.unwrap();
external_in.send(vec![1, 2, 3].into()).await.unwrap();
assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
}
#[tokio::test]
async fn multi_external_source() {
let mut deployment = Deployment::new();
let mut flow = FlowBuilder::new();
let first_node = flow.process::<()>();
let external = flow.external::<()>();
let (in_port, input, _membership, complete_sink) =
first_node.bidi_external_many_bincode(&external);
let out = input.entries().send_bincode_external(&external);
complete_sink.complete(
first_node
.source_iter::<(u64, ()), _>(q!([]))
.into_keyed()
.weaken_ordering(),
);
let nodes = flow
.with_process(&first_node, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);
deployment.deploy().await.unwrap();
let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
let external_out = nodes.connect(out).await;
deployment.start().await.unwrap();
external_in_1.send(123).await.unwrap();
external_in_2.send(456).await.unwrap();
assert_eq!(
external_out.take(2).collect::<HashSet<_>>().await,
vec![(0, 123), (1, 456)].into_iter().collect()
);
}
#[tokio::test]
async fn second_connection_only_multi_source() {
let mut deployment = Deployment::new();
let mut flow = FlowBuilder::new();
let first_node = flow.process::<()>();
let external = flow.external::<()>();
let (in_port, input, _membership, complete_sink) =
first_node.bidi_external_many_bincode(&external);
let out = input.entries().send_bincode_external(&external);
complete_sink.complete(
first_node
.source_iter::<(u64, ()), _>(q!([]))
.into_keyed()
.weaken_ordering(),
);
let nodes = flow
.with_process(&first_node, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);
deployment.deploy().await.unwrap();
let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
let mut external_out = nodes.connect(out).await;
deployment.start().await.unwrap();
external_in_2.send(456).await.unwrap();
assert_eq!(external_out.next().await.unwrap(), (1, 456));
}
#[tokio::test]
async fn multi_external_bytes() {
let mut deployment = Deployment::new();
let mut flow = FlowBuilder::new();
let first_node = flow.process::<()>();
let external = flow.external::<()>();
let (in_port, input, _membership, complete_sink) = first_node
.bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
let out = input.entries().send_bincode_external(&external);
complete_sink.complete(
first_node
.source_iter(q!([]))
.into_keyed()
.weaken_ordering(),
);
let nodes = flow
.with_process(&first_node, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);
deployment.deploy().await.unwrap();
let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
let mut external_in_2 = nodes.connect(in_port).await.1;
let external_out = nodes.connect(out).await;
deployment.start().await.unwrap();
external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
external_in_2.send(vec![4, 5].into()).await.unwrap();
assert_eq!(
external_out.take(2).collect::<HashSet<_>>().await,
vec![
(0, (&[1u8, 2, 3] as &[u8]).into()),
(1, (&[4u8, 5] as &[u8]).into())
]
.into_iter()
.collect()
);
}
#[tokio::test]
async fn single_client_external_bytes() {
let mut deployment = Deployment::new();
let mut flow = FlowBuilder::new();
let first_node = flow.process::<()>();
let external = flow.external::<()>();
let (port, input, complete_sink) = first_node
.bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
complete_sink.complete(input.map(q!(|data| {
let mut resp: Vec<u8> = data.into();
resp.push(42);
resp.into() })));
let nodes = flow
.with_process(&first_node, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);
deployment.deploy().await.unwrap();
deployment.start().await.unwrap();
let (mut external_out, mut external_in) = nodes.connect(port).await;
external_in.send(vec![1, 2, 3].into()).await.unwrap();
assert_eq!(
external_out.next().await.unwrap().unwrap(),
vec![1, 2, 3, 42]
);
}
#[tokio::test]
async fn echo_external_bytes() {
let mut deployment = Deployment::new();
let mut flow = FlowBuilder::new();
let first_node = flow.process::<()>();
let external = flow.external::<()>();
let (port, input, _membership, complete_sink) = first_node
.bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
complete_sink
.complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
let nodes = flow
.with_process(&first_node, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);
deployment.deploy().await.unwrap();
let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
deployment.start().await.unwrap();
external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
external_in_2.send(vec![4, 5].into()).await.unwrap();
assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
}
#[tokio::test]
async fn echo_external_bincode() {
let mut deployment = Deployment::new();
let mut flow = FlowBuilder::new();
let first_node = flow.process::<()>();
let external = flow.external::<()>();
let (port, input, _membership, complete_sink) =
first_node.bidi_external_many_bincode(&external);
complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
let nodes = flow
.with_process(&first_node, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);
deployment.deploy().await.unwrap();
let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
deployment.start().await.unwrap();
external_in_1.send("hi".to_owned()).await.unwrap();
external_in_2.send("hello".to_owned()).await.unwrap();
assert_eq!(external_out_1.next().await.unwrap(), "HI");
assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
}
#[tokio::test]
async fn closure_location_name() {
let mut deployment = Deployment::new();
let mut flow = FlowBuilder::new();
enum ClosureProcess {}
let node = flow.process::<ClosureProcess>();
let external = flow.external::<()>();
let (in_port, input) =
node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
let out = input.send_bincode_external(&external);
let nodes = flow
.with_process(&node, deployment.Localhost())
.with_external(&external, deployment.Localhost())
.deploy(&mut deployment);
deployment.deploy().await.unwrap();
let mut external_in = nodes.connect(in_port).await;
let mut external_out = nodes.connect(out).await;
deployment.start().await.unwrap();
external_in.send(42).await.unwrap();
assert_eq!(external_out.next().await.unwrap(), 42);
}
}