/// Selects the serialisation scheme used for messages sent over a remote
/// channel (in this file it is handed to `Channel::Remote` together with the
/// destination `ActorPath`).
///
/// `Debug`, `PartialEq` and `Eq` are derived so the value can be logged and
/// compared in assertions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FlightSerde {
    /// The only scheme currently available and the [`Default`].
    // NOTE(review): presumably maps to `reliable_remote::ReliableSerde`
    // (prost encoding) at the send site — confirm against the channel code.
    Reliable,
}
impl Default for FlightSerde {
fn default() -> Self {
FlightSerde::Reliable
}
}
pub mod reliable_remote {
use crate::data::{ArconType, RawArconMessage};
use kompact::prelude::*;
use prost::*;
/// Newtype around a [`RawArconMessage`] that carries the Kompact
/// `Serialisable` / `Deserialiser` implementations defined in this module.
///
/// The wrapped message is encoded and decoded with prost, and the
/// serialisation id is taken from `A::RELIABLE_SER_ID`.
#[derive(Clone, Debug)]
pub struct ReliableSerde<A: ArconType>(pub RawArconMessage<A>);
impl<A: ArconType> Deserialiser<RawArconMessage<A>> for ReliableSerde<A> {
    /// Serialisation id announced for this deserialiser; taken from the
    /// element type so that it matches `Serialisable::ser_id` below.
    const SER_ID: SerId = A::RELIABLE_SER_ID;

    /// Decodes a prost-encoded [`RawArconMessage`] from `buf`.
    ///
    /// # Errors
    ///
    /// Returns [`SerError::InvalidData`] carrying the prost decode error text
    /// when the bytes do not form a valid message for `A`.
    fn deserialise(buf: &mut dyn Buf) -> Result<RawArconMessage<A>, SerError> {
        // Hand the whole `Buf` to prost instead of `buf.chunk()`: `chunk()` is
        // only guaranteed to expose the first contiguous slice, so a buffer
        // that spans several chunks would be decoded from truncated input.
        RawArconMessage::decode(buf).map_err(|e| SerError::InvalidData(e.to_string()))
    }
}
impl<A: ArconType> Serialisable for ReliableSerde<A> {
    /// Reports the serialisation id defined by the element type `A`.
    fn ser_id(&self) -> u64 {
        A::RELIABLE_SER_ID
    }

    /// Encoded length of the wrapped message, as reported by prost.
    fn size_hint(&self) -> Option<usize> {
        let encoded_len = self.0.encoded_len();
        Some(encoded_len)
    }

    /// Writes the prost encoding of the wrapped message into `buf`.
    ///
    /// An encoding failure is reported as [`SerError::InvalidData`] carrying
    /// the prost error text.
    fn serialise(&self, mut buf: &mut dyn BufMut) -> Result<(), SerError> {
        match self.0.encode(&mut buf) {
            Ok(()) => Ok(()),
            Err(err) => Err(SerError::InvalidData(err.to_string())),
        }
    }

    /// Local delivery needs no encoding: the boxed value is handed over as-is.
    fn local(self: Box<Self>) -> Result<Box<dyn Any + Send>, Box<dyn Serialisable>> {
        let boxed: Box<dyn Any + Send> = self;
        Ok(boxed)
    }
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::stream::channel::strategy::send;
use crate::{
application::Application,
data::{ArconElement, ArconEvent, ArconType, Watermark},
stream::{
channel::{
strategy::{forward::Forward, ChannelStrategy},
Channel,
},
node::debug::DebugNode,
},
};
use kompact::prelude::*;
use once_cell::sync::Lazy;
use std::time::Duration;
// Fixture values used to build the `ArconDataTest` payload that `flight_test`
// sends; the individual tests assert that the received fields equal them.
// `items` payload.
static ITEMS: Lazy<Vec<u32>> = Lazy::new(|| vec![1, 2, 3, 4, 5, 6, 7]);
// `price` payload.
const PRICE: u32 = 10;
// `id` payload.
const ID: u32 = 1;
// Baseline message type: this is the type `flight_test` always *sends*.
// The other structs in this module are variations of this schema that are
// used on the receiving side.
#[derive(Arcon, prost::Message, Clone)]
#[arcon(reliable_ser_id = 105, version = 1)]
pub struct ArconDataTest {
#[prost(uint32, tag = "1")]
pub id: u32,
#[prost(uint32, repeated, tag = "2")]
pub items: Vec<u32>,
#[prost(uint32, tag = "3")]
pub price: u32,
}
// Same fields and `reliable_ser_id` as `ArconDataTest`, but declared with
// `version = 2`.
// NOTE(review): no test in the visible part of this module uses this type.
#[derive(Arcon, prost::Message, Clone)]
#[arcon(reliable_ser_id = 105, version = 2)]
pub struct UpdatedVer {
#[prost(uint32, tag = "1")]
pub id: u32,
#[prost(uint32, repeated, tag = "2")]
pub items: Vec<u32>,
#[prost(uint32, tag = "3")]
pub price: u32,
}
// Same fields as `ArconDataTest`, but with a different `reliable_ser_id`
// (205 instead of 105) and `version = 2`.
// NOTE(review): no test in the visible part of this module uses this type.
#[derive(Arcon, prost::Message, Clone)]
#[arcon(reliable_ser_id = 205, version = 2)]
pub struct UpdatedSerId {
#[prost(uint32, tag = "1")]
pub id: u32,
#[prost(uint32, repeated, tag = "2")]
pub items: Vec<u32>,
#[prost(uint32, tag = "3")]
pub price: u32,
}
// `ArconDataTest` without the `price` field (tag 3); receiving type of
// `reliable_removed_field_test`.
#[derive(Arcon, prost::Message, Clone)]
#[arcon(reliable_ser_id = 105, version = 1)]
pub struct RemovedField {
#[prost(uint32, tag = "1")]
pub id: u32,
#[prost(uint32, repeated, tag = "2")]
pub items: Vec<u32>,
}
// `ArconDataTest` plus an extra `bytes` field under the new tag 4; receiving
// type of `reliable_added_field_test`, which expects `bytes` to come out empty
// because the sender never sets it.
#[derive(Arcon, prost::Message, Clone)]
#[arcon(reliable_ser_id = 105, version = 1)]
pub struct AddedField {
#[prost(uint32, tag = "1")]
pub id: u32,
#[prost(uint32, repeated, tag = "2")]
pub items: Vec<u32>,
#[prost(uint32, tag = "3")]
pub price: u32,
#[prost(bytes, tag = "4")]
pub bytes: Vec<u8>,
}
// Variation in which a `bytes` field takes tag 1 and the original fields are
// shifted to tags 2-4, so the tags no longer line up with what
// `ArconDataTest` puts on the wire. Receiving type of
// `reliable_rearranged_field_test`, which is expected to panic.
#[derive(Arcon, prost::Message, Clone)]
#[arcon(reliable_ser_id = 105, version = 1)]
pub struct RearrangedFieldOrder {
#[prost(bytes, tag = "1")]
pub bytes: Vec<u8>,
#[prost(uint32, tag = "2")]
pub id: u32,
#[prost(uint32, repeated, tag = "3")]
pub items: Vec<u32>,
#[prost(uint32, tag = "4")]
pub price: u32,
}
// Builds two independent Kompact systems, each configured with the default
// network settings and a `DeadletterBox`, and returns them as a pair.
fn get_systems() -> (KompactSystem, KompactSystem) {
    // Builds a single networked system; panics with "KompactSystem" on failure.
    fn build_system() -> KompactSystem {
        let mut config = KompactConfig::default();
        config.system_components(DeadletterBox::new, NetworkConfig::default().build());
        config.build().expect("KompactSystem")
    }

    let first = build_system();
    let second = build_system();
    (first, second)
}
// Round trip with identical sending and receiving types: every field must
// arrive unchanged.
#[test]
fn reliable_serde_test() {
    let received = flight_test::<ArconDataTest>(FlightSerde::Reliable);
    for element in &received {
        let payload = &element.data;
        assert_eq!(payload.items, *ITEMS);
        assert_eq!(payload.price, PRICE);
        assert_eq!(payload.id, ID);
    }
}
// The receiver knows one more field than the sender: the shared fields must
// match and the extra `bytes` field must be empty.
#[test]
fn reliable_added_field_test() {
    let received = flight_test::<AddedField>(FlightSerde::Reliable);
    for element in &received {
        let payload = &element.data;
        assert_eq!(payload.items, *ITEMS);
        assert_eq!(payload.price, PRICE);
        assert_eq!(payload.id, ID);
        assert_eq!(payload.bytes.len(), 0);
    }
}
// The receiver's tags do not line up with the sender's schema; the flight is
// expected to panic.
#[test]
#[should_panic]
fn reliable_rearranged_field_test() {
    drop(flight_test::<RearrangedFieldOrder>(FlightSerde::Reliable));
}
// The receiver lacks the sender's `price` field: the remaining fields must
// still arrive unchanged.
#[test]
fn reliable_removed_field_test() {
    let received = flight_test::<RemovedField>(FlightSerde::Reliable);
    for element in &received {
        let payload = &element.data;
        assert_eq!(payload.id, ID);
        assert_eq!(payload.items, *ITEMS);
    }
}
// Sends four `ArconDataTest` elements followed by a watermark over a remote
// channel to a `DebugNode<ReceivingType>` registered under the alias "comp"
// on the second Kompact system, waits 150 ms, asserts that exactly four
// elements were collected by the node and returns them.
//
// The sending side always uses `ArconDataTest`; `ReceivingType` only decides
// how the receiving node decodes the bytes.
fn flight_test<ReceivingType>(serde: FlightSerde) -> Vec<ArconElement<ReceivingType>>
where
    ReceivingType: ArconType,
{
    let application = Application::default();
    let pool_info = application.get_pool_info();
    let (local, remote) = get_systems();
    let timeout = Duration::from_millis(150);

    // Start the receiving node and make it reachable through a named path.
    let receiver = remote.create(DebugNode::<ReceivingType>::new);
    remote
        .start_notify(&receiver)
        .wait_timeout(timeout)
        .expect("comp never started");

    let alias = String::from("comp");
    remote.register_by_alias(&receiver, alias.clone()).wait_expect(
        Duration::from_millis(1000),
        "Failed to register alias for DebugNode",
    );

    let named_path = NamedPath::with_system(remote.system_path(), vec![alias]);
    let remote_channel = Channel::Remote(ActorPath::Named(named_path), serde);
    let forward = Forward::new(remote_channel, 1.into(), pool_info);
    let mut channel_strategy: ChannelStrategy<ArconDataTest> = ChannelStrategy::Forward(forward);

    let element = ArconElement::new(ArconDataTest {
        id: ID,
        items: ITEMS.clone(),
        price: PRICE,
    });

    // The messages are dispatched through the receiving node's own definition.
    receiver.on_definition(|cd| {
        // Three clones plus the original make four buffered elements.
        for _ in 0..3 {
            channel_strategy.push(ArconEvent::Element(element.clone()));
        }
        channel_strategy.push(ArconEvent::Element(element));

        // Only the output of the watermark push is actually sent.
        let outgoing = channel_strategy.push(ArconEvent::Watermark(Watermark::new(0)));
        for (channel, msg) in outgoing {
            let _ = send(&channel, msg, cd);
        }
    });

    // Give the network layer time to deliver before inspecting the node.
    std::thread::sleep(timeout);

    let received = receiver.on_definition(|cd| {
        assert_eq!(cd.data.len(), 4);
        cd.data.clone()
    });

    let _ = local.shutdown();
    let _ = remote.shutdown();
    received
}
}