use crate::{
buffer::event::{BufferPool, BufferWriter, PoolInfo},
data::{ArconEvent, ArconEventWrapper, ArconMessage, ArconType, NodeID},
stream::channel::{strategy::send, Channel},
};
use kompact::prelude::{ComponentDefinition, SerError};
use std::sync::Arc;
/// State for a channel strategy that pairs every outgoing message with all
/// registered channels.
///
/// Events are accumulated in `curr_buffer`; when a message is produced the
/// buffer is handed off and replaced with a fresh one from `buffer_pool`.
// NOTE(review): `dead_code` is presumably allowed because `pool_info` is stored
// but never read by the methods visible in this file — confirm before removing.
#[allow(dead_code)]
pub struct Broadcast<A: ArconType> {
    /// Pool that supplies the event buffers used by this strategy.
    buffer_pool: BufferPool<ArconEventWrapper<A>>,
    /// Every channel a produced message is paired with.
    channels: Vec<Arc<Channel<A>>>,
    /// Buffer that incoming events are currently written into.
    curr_buffer: BufferWriter<ArconEventWrapper<A>>,
    /// Identifier placed in the `sender` field of each produced message.
    sender_id: NodeID,
    /// Pool settings that `buffer_pool` was created from.
    pool_info: PoolInfo,
}
impl<A: ArconType> Broadcast<A> {
    /// Builds a broadcast strategy over `channels`.
    ///
    /// A buffer pool is created from `pool_info` and one buffer is fetched
    /// from it up front to act as the current write buffer.
    ///
    /// # Panics
    ///
    /// * if fewer than two channels are supplied,
    /// * if the number of channels is not strictly smaller than
    ///   `pool_info.capacity`,
    /// * if the buffer pool cannot be created or the initial buffer cannot be
    ///   fetched.
    pub fn new(channels: Vec<Channel<A>>, sender_id: NodeID, pool_info: PoolInfo) -> Broadcast<A> {
        let channel_count = channels.len();
        assert!(
            channel_count > 1,
            "Number of Channels must exceed 1 for a Broadcast strategy"
        );
        assert!(
            channel_count < pool_info.capacity,
            "Strategy must be initialised with a pool capacity larger than amount of channels"
        );

        let mut buffer_pool: BufferPool<ArconEventWrapper<A>> = BufferPool::new(
            pool_info.capacity,
            pool_info.buffer_size,
            pool_info.allocator.clone(),
        )
        .expect("failed to initialise BufferPool");
        let curr_buffer = buffer_pool
            .try_get()
            .expect("failed to fetch initial buffer");
        let channels: Vec<Arc<Channel<A>>> = channels.into_iter().map(Arc::new).collect();

        Broadcast {
            buffer_pool,
            channels,
            curr_buffer,
            sender_id,
            pool_info,
        }
    }

    /// Writes `event` into the current buffer.
    ///
    /// Returns `None` when the buffer took the event. When `push` hands the
    /// event back instead (presumably because the buffer is full), the current
    /// buffer is turned into a message, the event is written into the fresh
    /// replacement buffer, and that message is returned.
    #[inline]
    fn push_event(&mut self, event: ArconEvent<A>) -> Option<ArconMessage<A>> {
        let rejected = self.curr_buffer.push(event.into())?;
        let outgoing = self.message();
        self.curr_buffer.push(rejected);
        Some(outgoing)
    }

    /// Turns the current buffer into an [`ArconMessage`] stamped with this
    /// strategy's sender id and replaces it with a new buffer from the pool.
    #[inline]
    fn message(&mut self) -> ArconMessage<A> {
        // The reader must be taken before the writer is replaced below.
        let events = self.curr_buffer.reader();
        let sender = self.sender_id;
        self.curr_buffer = self.buffer_pool.get();
        ArconMessage { events, sender }
    }

    /// Adds `event` to the strategy and returns the `(channel, message)` pairs
    /// that are now ready to be sent.
    ///
    /// * An `Element` only yields output when writing it forced the previous
    ///   buffer out; otherwise the returned vector is empty.
    /// * Any other event additionally turns the buffer holding that event into
    ///   a message, so at least one message per channel is always returned.
    ///
    /// Pairs are ordered message by message: all channels for the first
    /// message, then all channels for the second (if any).
    #[inline]
    pub fn add(&mut self, event: ArconEvent<A>) -> Vec<(Arc<Channel<A>>, ArconMessage<A>)> {
        let emit_current = !matches!(event, ArconEvent::Element(_));

        // At most two messages: one forced out by the push, one for the
        // buffer that now holds a non-element event.
        let mut pending: Vec<ArconMessage<A>> = self.push_event(event).into_iter().collect();
        if emit_current {
            pending.push(self.message());
        }

        let mut outgoing = Vec::with_capacity(pending.len() * self.channels.len());
        for msg in &pending {
            for channel in &self.channels {
                outgoing.push((Arc::clone(channel), msg.clone()));
            }
        }
        outgoing
    }

    /// Sends the contents of the current buffer to every channel via `send`,
    /// using `source` as the sending component, then swaps in a fresh buffer.
    ///
    /// Every channel except the last receives a copy written into a buffer
    /// taken from the pool; the last channel receives the reader of the
    /// current buffer itself.
    ///
    /// # Panics
    ///
    /// Panics if `send` returns `SerError::BufferError`. Other error variants
    /// returned by `send` are not acted upon here.
    #[inline]
    pub fn flush<CD>(&mut self, source: &CD)
    where
        CD: ComponentDefinition + Sized + 'static,
    {
        if let Some((last, rest)) = self.channels.split_last() {
            for channel in rest {
                let mut copy = self.buffer_pool.get();
                copy.copy_from_writer(&self.curr_buffer);
                let msg = ArconMessage {
                    events: copy.reader(),
                    sender: self.sender_id,
                };
                if let Err(SerError::BufferError(err)) = send(channel, msg, source) {
                    panic!("Buffer Error {}", err);
                }
            }

            let msg = ArconMessage {
                events: self.curr_buffer.reader(),
                sender: self.sender_id,
            };
            if let Err(SerError::BufferError(err)) = send(last, msg, source) {
                panic!("Buffer Error {}", err);
            }
        }
        self.curr_buffer = self.buffer_pool.get();
    }

    /// Number of channels this strategy broadcasts to.
    #[inline]
    pub fn num_channels(&self) -> usize {
        self.channels.len()
    }
}
#[cfg(test)]
mod tests {
    use super::{Channel, *};
    use crate::{
        application::Application,
        data::{ArconElement, Watermark},
        stream::{
            channel::strategy::{send, tests::*, ChannelStrategy},
            node::debug::DebugNode,
        },
    };
    use kompact::prelude::*;
    use std::sync::Arc;

    /// Pushes ten elements followed by a watermark through a broadcast
    /// strategy over eight local `DebugNode`s and checks that each node ends
    /// up with ten entries in `data`.
    #[test]
    fn broadcast_local_test() {
        let app = Application::default();
        let pool_info = app.get_pool_info();
        let system = app.data_system();

        let components: u32 = 8;
        let total_msgs: u64 = 10;

        // Create and start each receiver, keeping both the component handle
        // and a local channel that points at it.
        let (channels, comps): (
            Vec<Channel<Input>>,
            Vec<Arc<crate::prelude::Component<DebugNode<Input>>>>,
        ) = (0..components)
            .map(|_| {
                let comp = system.create(DebugNode::<Input>::new);
                system.start(&comp);
                let actor_ref: ActorRefStrong<ArconMessage<Input>> =
                    comp.actor_ref().hold().expect("failed to fetch");
                (Channel::Local(actor_ref), comp)
            })
            .unzip();

        let mut channel_strategy: ChannelStrategy<Input> =
            ChannelStrategy::Broadcast(Broadcast::new(channels, NodeID::new(1), pool_info));

        // The first component's definition is used as the source for `send`.
        comps[0].on_definition(|cd| {
            let elements =
                (0..total_msgs).map(|_| ArconEvent::Element(ArconElement::new(Input { id: 1 })));
            // The trailing watermark is the non-element event that makes the
            // strategy hand out the buffered elements.
            let watermark = std::iter::once(ArconEvent::Watermark(Watermark::new(0)));

            for event in elements.chain(watermark) {
                for (channel, msg) in channel_strategy.push(event) {
                    send(&channel, msg, cd).unwrap();
                }
            }
        });

        // Give the receivers time to process what was sent.
        std::thread::sleep(std::time::Duration::from_secs(1));

        comps.into_iter().for_each(|comp| {
            comp.on_definition(|cd| {
                assert_eq!(cd.data.len() as u64, total_msgs);
            });
        });

        app.shutdown();
    }
}