use crate::{
buffer::event::{BufferPool, BufferWriter, PoolInfo},
data::{
partition::shard_lookup_with_key, ArconEvent, ArconEventWrapper, ArconMessage, ArconType,
NodeID,
},
dataflow::stream::KeyBuilder,
stream::channel::Channel,
};
use std::sync::Arc;
/// Channel strategy that routes each element to one outgoing channel chosen
/// from the element's key, batching events per channel in pooled buffers.
pub struct Keyed<A>
where
A: ArconType,
{
/// Pool that supplies the buffer writers used for batching outgoing events.
buffer_pool: BufferPool<ArconEventWrapper<A>>,
/// Number of outgoing channels; passed to `shard_lookup_with_key` as the
/// range a key is mapped into.
key_ranges: u64,
/// Identifier written into the `sender` field of every outgoing `ArconMessage`.
sender_id: NodeID,
/// Extracts the routing key from an element's data.
key_builder: KeyBuilder<A>,
/// One buffer writer per channel; `buffers[i]` batches events for `channels[i]`.
buffers: Vec<BufferWriter<ArconEventWrapper<A>>>,
/// Outgoing channels, reference-counted so a handle can be returned
/// alongside each produced message.
channels: Vec<Arc<Channel<A>>>,
/// The `PoolInfo` that `buffer_pool` was created from; stored but not read
/// by the code visible in this file.
_pool_info: PoolInfo,
}
impl<A> Keyed<A>
where
    A: ArconType,
{
    /// Builds a keyed strategy over `channels`.
    ///
    /// A `BufferPool` is created from `pool_info` and one initial buffer writer is
    /// fetched per channel, so that `buffers[i]` batches events for `channels[i]`.
    ///
    /// # Panics
    ///
    /// * if `pool_info.capacity` is not strictly greater than `channels.len()`;
    /// * if the buffer pool cannot be created;
    /// * if an initial buffer cannot be fetched from the pool.
    pub fn new(
        channels: Vec<Channel<A>>,
        sender_id: NodeID,
        pool_info: PoolInfo,
        key_builder: KeyBuilder<A>,
    ) -> Keyed<A> {
        let channel_count = channels.len();
        assert!(
            channel_count < pool_info.capacity,
            "Strategy must be initialised with a pool capacity larger than amount of channels"
        );

        let mut buffer_pool: BufferPool<ArconEventWrapper<A>> = BufferPool::new(
            pool_info.capacity,
            pool_info.buffer_size,
            pool_info.allocator.clone(),
        )
        .expect("failed to initialise BufferPool");

        // One writer per channel, in channel order.
        let buffers: Vec<_> = (0..channel_count)
            .map(|_| {
                buffer_pool
                    .try_get()
                    .expect("failed to fetch initial buffer")
            })
            .collect();

        let channels: Vec<_> = channels.into_iter().map(Arc::new).collect();

        Keyed {
            buffer_pool,
            key_ranges: channel_count as u64,
            sender_id,
            key_builder,
            buffers,
            channels,
            _pool_info: pool_info,
        }
    }

    /// Seals the buffer at `index` into an outgoing message and installs a
    /// fresh writer, obtained from the pool, in its place.
    #[inline]
    fn rotate_buffer(&mut self, index: usize) -> ArconMessage<A> {
        let slot = &mut self.buffers[index];
        let sealed = ArconMessage {
            events: slot.reader(),
            sender: self.sender_id,
        };
        *slot = self.buffer_pool.get();
        sealed
    }

    /// Appends `event` to the buffer at `index`.
    ///
    /// Returns `None` when the writer accepted the event. When the writer hands
    /// the event back instead, the current buffer is sealed into a message, the
    /// event is pushed into the replacement buffer, and the sealed message is
    /// returned.
    #[inline]
    fn push_event(&mut self, index: usize, event: ArconEvent<A>) -> Option<ArconMessage<A>> {
        // `push` returns the event again when it could not be stored.
        let rejected = self.buffers[index].push(event.into())?;
        let sealed = self.rotate_buffer(index);
        self.buffers[index].push(rejected);
        Some(sealed)
    }

    /// Adds `event` to the strategy and returns the messages that are now ready
    /// to be sent, each paired with its destination channel.
    ///
    /// * An element is routed to the single channel selected from its key; a
    ///   message is returned only if that channel's buffer had to be sealed.
    /// * Any other event is appended to every channel's buffer and every buffer
    ///   is then sealed, so each channel yields one message (two if its buffer
    ///   was sealed while appending the event).
    #[inline]
    pub fn add(&mut self, event: ArconEvent<A>) -> Vec<(Arc<Channel<A>>, ArconMessage<A>)> {
        if let ArconEvent::Element(element) = &event {
            let key = self.key_builder.get_key(&element.data);
            let target = shard_lookup_with_key(key, self.key_ranges) as usize;
            return match self.push_event(target, event) {
                Some(sealed) => vec![(Arc::clone(&self.channels[target]), sealed)],
                None => Vec::new(),
            };
        }

        let mut outputs = Vec::with_capacity(self.buffers.len());
        for index in 0..self.buffers.len() {
            // A batch sealed while appending must precede the batch that
            // carries the event itself.
            if let Some(sealed) = self.push_event(index, event.clone()) {
                outputs.push((Arc::clone(&self.channels[index]), sealed));
            }
            let flushed = self.rotate_buffer(index);
            outputs.push((Arc::clone(&self.channels[index]), flushed));
        }
        outputs
    }

    /// Number of outgoing channels managed by this strategy.
    #[inline]
    pub fn num_channels(&self) -> usize {
        self.channels.len()
    }
}
#[cfg(test)]
mod tests {
    use super::{Channel, *};
    use crate::{
        application::Application,
        data::{ArconElement, ArconEvent, NodeID, Watermark},
        stream::{
            channel::strategy::{send, tests::*, ChannelStrategy},
            node::debug::DebugNode,
        },
    };
    use kompact::prelude::*;
    use rand::Rng;
    use std::sync::Arc;

    /// Pushes randomly keyed elements through a `Keyed` strategy fanned out to
    /// `parallelism` debug nodes and checks that every node received data.
    #[test]
    fn keyby_test() {
        let app = Application::default();
        let pool_info = app.get_pool_info();
        let system = app.data_system();

        let parallelism: u32 = 8;
        let total_msgs = 1000;

        // One local channel per debug node.
        let mut channels: Vec<Channel<Input>> = Vec::new();
        let mut comps: Vec<Arc<crate::prelude::Component<DebugNode<Input>>>> = Vec::new();
        for _i in 0..parallelism {
            let comp = system.create(DebugNode::<Input>::new);
            system.start(&comp);
            let actor_ref: ActorRefStrong<ArconMessage<Input>> =
                comp.actor_ref().hold().expect("failed to fetch");
            channels.push(Channel::Local(actor_ref));
            comps.push(comp);
        }

        let key_builder = KeyBuilder::<Input> {
            extractor: Arc::new(|i: &Input| i.id as u64),
        };
        let mut channel_strategy =
            ChannelStrategy::Keyed(Keyed::new(channels, NodeID::new(1), pool_info, key_builder));

        let mut rng = rand::thread_rng();
        let mut inputs: Vec<ArconEvent<Input>> = Vec::new();
        for _i in 0..total_msgs {
            let input = Input {
                id: rng.gen_range(0, 100000),
            };
            let elem = ArconElement::new(input);
            inputs.push(ArconEvent::Element(elem));
        }

        let comp = &comps[0];
        comp.on_definition(|cd| {
            for input in inputs {
                // Pushing an element hands back a sealed batch whenever a buffer
                // fills up; it has to be sent, otherwise those events are lost.
                for (channel, msg) in channel_strategy.push(input) {
                    let _ = send(&channel, msg, cd);
                }
            }
            // A watermark is a non-element event, which makes the strategy emit
            // the pending batch of every channel.
            for (channel, msg) in channel_strategy.push(ArconEvent::Watermark(Watermark::new(0))) {
                let _ = send(&channel, msg, cd);
            }
        });

        // Give the nodes time to process the messages before inspecting them.
        std::thread::sleep(std::time::Duration::from_secs(1));
        for comp in comps {
            comp.on_definition(|cd| {
                assert!(!cd.data.is_empty());
            });
        }
        app.shutdown();
    }
}