use crate::{
data::{
flight_serde::{reliable_remote::ReliableSerde, FlightSerde},
ArconEvent, ArconMessage, ArconType,
},
stream::channel::Channel,
};
use kompact::prelude::{ComponentDefinition, SerError};
use std::sync::Arc;
// Sub-modules providing the strategy types wrapped by `ChannelStrategy` below.
// NOTE(review): the `dead_code` allowance applies only to `broadcast`; presumably
// parts of that module are not used in every build — confirm it is still needed.
#[allow(dead_code)]
pub mod broadcast;
pub mod forward;
pub mod keyed;
/// Selects how events of type `A` are turned into outgoing channel messages.
///
/// Three variants wrap a strategy type from the sub-modules; `Console` and
/// `Mute` carry no state and never produce outgoing messages.
pub enum ChannelStrategy<A: ArconType> {
    /// Delegates to [`forward::Forward`]; reported as exactly one channel.
    Forward(forward::Forward<A>),
    /// Delegates to [`broadcast::Broadcast`].
    #[allow(dead_code)]
    Broadcast(broadcast::Broadcast<A>),
    /// Delegates to [`keyed::Keyed`].
    Keyed(keyed::Keyed<A>),
    /// Prints every pushed event to stdout using its `Debug` formatting.
    Console,
    /// Discards every pushed event.
    Mute,
}
impl<A: ArconType> ChannelStrategy<A> {
    /// Hands `event` to the active strategy and returns what that strategy emits.
    ///
    /// For `Forward`, `Broadcast` and `Keyed` the result is whatever the wrapped
    /// strategy's `add` returns: pairs of a destination channel and the message
    /// for it. `Console` prints the event with `Debug` formatting to stdout and
    /// yields nothing; `Mute` drops the event and yields nothing.
    #[inline]
    pub fn push(
        &mut self,
        event: ArconEvent<A>,
    ) -> impl IntoIterator<Item = (Arc<Channel<A>>, ArconMessage<A>)> {
        match self {
            Self::Forward(strategy) => strategy.add(event),
            Self::Broadcast(strategy) => strategy.add(event),
            Self::Keyed(strategy) => strategy.add(event),
            Self::Mute => Vec::new(),
            Self::Console => {
                println!("{:?}", event);
                Vec::new()
            }
        }
    }

    /// Returns the number of channels the active strategy reports.
    ///
    /// `Console` and `Mute` report `0`, `Forward` always reports `1`, and the
    /// remaining variants defer to the wrapped strategy's own `num_channels`.
    #[inline]
    #[allow(dead_code)]
    pub fn num_channels(&self) -> usize {
        match self {
            Self::Console | Self::Mute => 0,
            Self::Forward(_) => 1,
            Self::Keyed(strategy) => strategy.num_channels(),
            Self::Broadcast(strategy) => strategy.num_channels(),
        }
    }
}
/// Delivers `message` over `channel`.
///
/// * `Channel::Local` — the message is passed to the actor reference via
///   `tell`, and `Ok(())` is returned.
/// * `Channel::Remote` with `FlightSerde::Reliable` — the message is converted
///   with `into()`, wrapped in `ReliableSerde`, and sent through
///   `tell_serialised` with `source` as the sending component.
///
/// # Errors
///
/// Returns the `SerError` produced by `tell_serialised` on the remote path.
#[inline]
pub(crate) fn send<A: ArconType>(
    channel: &Channel<A>,
    message: ArconMessage<A>,
    source: &impl ComponentDefinition,
) -> Result<(), SerError> {
    match channel {
        Channel::Remote(remote_path, FlightSerde::Reliable) => {
            remote_path.tell_serialised(ReliableSerde(message.into()), source)
        }
        Channel::Local(local_ref) => {
            local_ref.tell(message);
            Ok(())
        }
    }
}
// Test-only module; compiled only under `cfg(test)`.
#[cfg(test)]
pub mod tests {
// Minimal single-field message type derived with `Arcon` and `prost::Message`.
// NOTE(review): presumably shared as the element type by tests in the strategy
// sub-modules, hence `pub` — confirm against their test code.
#[derive(Arcon, prost::Message, Clone)]
#[arcon(reliable_ser_id = 13, version = 1)]
pub struct Input {
// Sole payload field, declared to prost as `uint32` with tag 1.
#[prost(uint32, tag = "1")]
pub id: u32,
}
}