use crate::prelude::*;
use rand::Rng;
use std::thread::sleep;
use std::time::Duration;
// Intermediate record produced by the first map operator in
// `enriched_event_stream`: a source value (`data`) tagged with a
// partitioning key (`key`) that the stream is later keyed by.
// NOTE(review): no explicit prost `tag =` is given, so tags presumably follow
// declaration order — avoid reordering fields.
#[derive(Arcon, Arrow, prost::Message, Copy, Clone)]
#[arcon(reliable_ser_id = 13, version = 1)]
pub struct Event {
// The source element (`0..EVENT_COUNT`); the source also uses it as the event timestamp.
#[prost(uint64)]
pub data: u64,
// Key in `0..NUM_KEYS`, drawn at random per element.
#[prost(uint32)]
pub key: u32,
}
// Output record of the keyed stateful map: the original `Event` fields plus
// `first_val`, the `data` of the first event observed for the same key.
#[derive(Arcon, Arrow, prost::Message, Copy, Clone)]
// The reliable serialization ID must be distinct from `Event`'s (13): sharing
// one ID between two different types makes them indistinguishable to the
// deserializer that dispatches on it.
#[arcon(reliable_ser_id = 14, version = 1)]
pub struct EnrichedEvent {
    // Source value carried over from `Event::data`.
    #[prost(uint64)]
    pub data: u64,
    // Partitioning key carried over from `Event::key`.
    #[prost(uint32)]
    pub key: u32,
    // `data` of the first event seen for `key`.
    #[prost(uint64)]
    pub first_val: u64,
}
// Operator state for the enrichment map in `enriched_event_stream`: holds the
// first `data` value recorded, or nothing until the first event arrives.
// NOTE(review): the operator runs after `key_by`, and the test expects one
// first value per key, so the backend presumably scopes this value by the
// current key — confirm.
#[derive(ArconState)]
pub struct FirstVal<B: Backend> {
// NOTE(review): table name "Count" does not describe the contents (a first
// value, not a count); likely a copy-paste leftover. Renaming may affect
// persisted state naming, so it is left unchanged here.
#[table = "Count"]
events: EagerValue<u64, B>,
}
/// Degree of parallelism applied to every operator in the test pipeline.
const PARALLELISM: usize = 2;
/// Size of the key space: events are tagged with keys in `0..NUM_KEYS`.
const NUM_KEYS: usize = 256;
/// Number of source elements (`0..EVENT_COUNT`) fed into the pipeline.
const EVENT_COUNT: u64 = 99_946;
fn operator_conf() -> OperatorConf {
OperatorConf {
parallelism_strategy: ParallelismStrategy::Static(PARALLELISM),
..Default::default()
}
}
/// Builds the pipeline under test, in three stages:
///
/// 1. a bounded source over `0..EVENT_COUNT`, using each value as its own
///    event timestamp;
/// 2. a map that wraps each value in an [`Event`] with a pseudo-random key in
///    `0..NUM_KEYS`, after which the stream is keyed by that key;
/// 3. a stateful map that records the first `data` it sees in [`FirstVal`]
///    and attaches that value to every outgoing [`EnrichedEvent`].
fn enriched_event_stream() -> Stream<EnrichedEvent> {
    let source = (0u64..EVENT_COUNT).to_stream(|conf| {
        conf.set_arcon_time(ArconTime::Event);
        conf.set_timestamp_extractor(|x: &u64| *x);
    });

    let keyed = source
        .operator(OperatorBuilder {
            operator: Arc::new(|| {
                Map::new(|n| {
                    // 2^32 is a multiple of NUM_KEYS, so the modulo keeps keys uniform.
                    let raw: u32 = rand::thread_rng().gen();
                    Event {
                        data: n,
                        key: raw % (NUM_KEYS as u32),
                    }
                })
            }),
            state: Arc::new(|_| EmptyState),
            conf: operator_conf(),
        })
        .key_by(|event| &event.key);

    keyed.operator(OperatorBuilder {
        operator: Arc::new(|| {
            Map::stateful(|event: Event, state: &mut FirstVal<_>| {
                let first_val: u64 = match state.events().get()? {
                    Some(existing) => *existing,
                    None => {
                        // Nothing stored yet: this event's payload becomes the first value.
                        state.events().put(event.data)?;
                        event.data
                    }
                };
                Ok(EnrichedEvent {
                    data: event.data,
                    key: event.key,
                    first_val,
                })
            })
        }),
        state: Arc::new(|backend| FirstVal {
            events: EagerValue::new("_events", backend),
        }),
        conf: operator_conf(),
    })
}
/// Runs the keyed pipeline into a debug sink and checks two things. First, the
/// number of distinct `first_val`s equals `NUM_KEYS`; since every `data` value
/// is unique, this means exactly one first value was recorded per key. Second,
/// the sink has `PARALLELISM` senders.
///
/// NOTE(review): the fixed 4 s wait is timing-dependent; it presumably suffices
/// to drain the bounded source — confirm, or replace with a completion signal.
#[test]
fn key_by_integration() {
    let mut app = enriched_event_stream().debug().builder().build();
    app.run();
    sleep(Duration::from_secs(4));

    let debug_node = match app.get_debug_node::<EnrichedEvent>() {
        Some(node) => node,
        None => panic!("Failed to get DebugNode!"),
    };
    debug_node.on_definition(|c| {
        let mut distinct: Vec<u64> = c.data.iter().map(|el| el.data.first_val).collect();
        distinct.sort_unstable();
        distinct.dedup();
        assert_eq!(distinct.len(), NUM_KEYS);
        assert_eq!(c.senders.len(), PARALLELISM);
    });
}
/// Same checks as the plain keyed test, with an extra pass-through (identity)
/// map appended after the keyed stateful operator. Judging by the test name,
/// this presumably exercises output flowing from the keyed stage into a
/// non-keyed (forward) stage.
///
/// NOTE(review): relies on a fixed 4 s wait for the source to drain — confirm.
#[test]
fn key_by_to_forward_integration() {
    let stream = enriched_event_stream().operator(OperatorBuilder {
        operator: Arc::new(|| Map::new(|event: EnrichedEvent| event)),
        state: Arc::new(|_| EmptyState),
        conf: operator_conf(),
    });
    let mut app = stream.debug().builder().build();
    app.run();
    sleep(Duration::from_secs(4));

    let debug_node = app
        .get_debug_node::<EnrichedEvent>()
        .expect("Failed to get DebugNode!");
    debug_node.on_definition(|c| {
        let mut first_vals = Vec::new();
        first_vals.extend(c.data.iter().map(|element| element.data.first_val));
        first_vals.sort_unstable();
        first_vals.dedup();
        assert_eq!(first_vals.len(), NUM_KEYS);
        assert_eq!(c.senders.len(), PARALLELISM);
    });
}