1pub mod arrow;
3pub mod flight_serde;
5#[allow(dead_code)]
6pub mod partition;
7mod ser_id;
9
10use crate::buffer::event::BufferReader;
11use kompact::prelude::*;
12use prost::{Message, Oneof};
13use std::{
14 fmt,
15 hash::{Hash, Hasher},
16 ops::Deref,
17};
18
/// Alias for `String`. The name suggests it identifies a piece of state; its use is not
/// visible in this part of the file.
pub type StateID = String;
21
/// Alias for `u32`, used as the type of `ArconType::VERSION_ID`.
pub type VersionId = u32;
24
25pub trait ArconTypeBounds:
26 Clone + fmt::Debug + Sync + Send + prost::Message + Default + 'static
27{
28}
29impl<T> ArconTypeBounds for T where
30 T: Clone + fmt::Debug + Sync + Send + prost::Message + Default + 'static
31{
32}
33
34pub trait ArconType: ArconTypeBounds
36where
37 Self: std::marker::Sized,
38{
39 const RELIABLE_SER_ID: SerId;
41 const VERSION_ID: VersionId;
43}
44
45#[derive(Oneof, Clone)]
47pub enum ArconEvent<A: ArconType> {
48 #[prost(message, tag = "1")]
50 Element(ArconElement<A>),
51 #[prost(message, tag = "2")]
53 Watermark(Watermark),
54 #[prost(message, tag = "3")]
56 Epoch(Epoch),
57 #[prost(message, tag = "4")]
59 Death(String),
60}
61
62#[derive(Message, Clone)]
68pub struct ArconEventWrapper<A: ArconType> {
69 #[prost(oneof = "ArconEvent::<A>", tags = "1, 2, 3, 4")]
70 inner: Option<ArconEvent<A>>,
71}
72
73impl<A: ArconType> ArconEventWrapper<A> {
74 pub fn unwrap(self) -> ArconEvent<A> {
75 self.inner
76 .expect("ArconEventWrapper.inner is None. Prost deserialization error?")
77 }
78
79 pub fn unwrap_ref(&self) -> &ArconEvent<A> {
80 self.inner
81 .as_ref()
82 .expect("ArconEventWrapper.inner is None. Prost deserialization error?")
83 }
84}
85
86impl<A: ArconType> From<ArconEvent<A>> for ArconEventWrapper<A> {
87 fn from(inner: ArconEvent<A>) -> Self {
88 ArconEventWrapper { inner: Some(inner) }
89 }
90}
91
92#[derive(Message, Clone)]
94pub struct ArconElement<A: ArconType> {
95 #[prost(message, required, tag = "1")]
96 pub data: A,
97 #[prost(uint64, tag = "2")]
98 pub timestamp: u64,
99}
100
101impl<A: ArconType> ArconElement<A> {
102 pub fn new(data: A) -> Self {
104 ArconElement { data, timestamp: 0 }
105 }
106
107 pub fn with_timestamp(data: A, timestamp: u64) -> Self {
109 ArconElement { data, timestamp }
110 }
111}
112
113#[derive(Message, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
115pub struct Watermark {
116 #[prost(uint64, tag = "1")]
117 pub timestamp: u64,
118}
119
120impl Watermark {
121 pub fn new(timestamp: u64) -> Self {
122 Watermark { timestamp }
123 }
124}
125
126#[derive(Message, Clone, Hash, Copy, Ord, PartialOrd, Eq, PartialEq)]
128pub struct Epoch {
129 #[prost(uint64, tag = "1")]
130 pub epoch: u64,
131}
132
133impl Epoch {
134 pub fn new(epoch: u64) -> Self {
136 Epoch { epoch }
137 }
138}
139
140pub enum MessageContainer<A: ArconType> {
142 Raw(RawArconMessage<A>),
146 Local(ArconMessage<A>),
148}
149
150impl<A: ArconType> MessageContainer<A> {
151 #[inline]
153 pub fn sender(&self) -> &NodeID {
154 match self {
155 MessageContainer::Raw(r) => &r.sender,
156 MessageContainer::Local(l) => &l.sender,
157 }
158 }
159 #[inline]
162 #[cfg(feature = "metrics")]
163 pub fn total_events(&self) -> u64 {
164 match self {
165 MessageContainer::Raw(r) => r.events.len() as u64,
166 MessageContainer::Local(l) => l.events.len() as u64,
167 }
168 }
169 pub fn raw(self) -> RawArconMessage<A> {
171 match self {
172 MessageContainer::Raw(r) => r,
173 MessageContainer::Local(l) => l.into(),
174 }
175 }
176}
177
178#[derive(Debug, Clone)]
180pub struct ArconMessage<A: ArconType> {
181 pub events: BufferReader<ArconEventWrapper<A>>,
183 pub sender: NodeID,
185}
186
187#[derive(Message, Clone)]
189pub struct RawArconMessage<A: ArconType> {
190 #[prost(message, repeated, tag = "1")]
192 pub events: Vec<ArconEventWrapper<A>>,
193 #[prost(message, required, tag = "2")]
195 pub sender: NodeID,
196}
197
198impl<A: ArconType> From<ArconMessage<A>> for RawArconMessage<A> {
199 fn from(msg: ArconMessage<A>) -> Self {
200 RawArconMessage {
201 events: msg.events.to_vec(),
202 sender: msg.sender,
203 }
204 }
205}
206
/// Test-only constructors that build a message containing exactly one event.
#[cfg(test)]
impl<A: ArconType> ArconMessage<A> {
    /// Wraps `event` and places it alone in a new message from `sender`.
    fn from_single_event(event: ArconEvent<A>, sender: NodeID) -> ArconMessage<A> {
        let wrapped = ArconEventWrapper::from(event);
        ArconMessage {
            events: vec![wrapped].into(),
            sender,
        }
    }

    /// Message holding one `Watermark` event with the given `timestamp`.
    pub fn watermark(timestamp: u64, sender: NodeID) -> ArconMessage<A> {
        Self::from_single_event(ArconEvent::Watermark(Watermark { timestamp }), sender)
    }

    /// Message holding one `Epoch` event with the given `epoch`.
    pub fn epoch(epoch: u64, sender: NodeID) -> ArconMessage<A> {
        Self::from_single_event(ArconEvent::Epoch(Epoch { epoch }), sender)
    }

    /// Message holding one `Death` event carrying `msg`.
    pub fn death(msg: String, sender: NodeID) -> ArconMessage<A> {
        Self::from_single_event(ArconEvent::Death(msg), sender)
    }

    /// Message holding one `Element` event built from `data` and `timestamp`.
    pub fn element(data: A, timestamp: u64, sender: NodeID) -> ArconMessage<A> {
        Self::from_single_event(
            ArconEvent::Element(ArconElement { data, timestamp }),
            sender,
        )
    }
}
247
248#[derive(Message, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone)]
250pub struct NodeID {
251 #[prost(uint32, tag = "1")]
252 pub id: u32,
253}
254
255impl NodeID {
256 pub fn new(new_id: u32) -> NodeID {
257 NodeID { id: new_id }
258 }
259}
260
261impl From<u32> for NodeID {
262 fn from(id: u32) -> Self {
263 NodeID::new(id)
264 }
265}
266
267impl ArconType for u32 {
272 const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_U32_ID;
273 const VERSION_ID: VersionId = 1;
274}
275impl ArconType for u64 {
276 const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_U64_ID;
277 const VERSION_ID: VersionId = 1;
278}
279impl ArconType for i32 {
280 const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_I32_ID;
281 const VERSION_ID: VersionId = 1;
282}
283impl ArconType for i64 {
284 const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_I64_ID;
285 const VERSION_ID: VersionId = 1;
286}
287impl ArconType for ArconF32 {
288 const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_F32_ID;
289 const VERSION_ID: VersionId = 1;
290}
291impl ArconType for ArconF64 {
292 const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_F64_ID;
293 const VERSION_ID: VersionId = 1;
294}
295impl ArconType for bool {
296 const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_BOOLEAN_ID;
297 const VERSION_ID: VersionId = 1;
298}
299impl ArconType for String {
300 const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_STRING_ID;
301 const VERSION_ID: VersionId = 1;
302}
303
304#[derive(Clone, Message)]
308#[repr(transparent)]
309pub struct ArconF32 {
310 #[prost(float, tag = "1")]
311 pub value: f32,
312}
313
314impl ArconF32 {
315 pub fn new(value: f32) -> ArconF32 {
316 ArconF32 { value }
317 }
318}
319
320impl Hash for ArconF32 {
321 fn hash<H: Hasher>(&self, state: &mut H) {
322 let s: u64 = self.value.trunc() as u64;
323 s.hash(state);
324 }
325}
326
327impl From<f32> for ArconF32 {
328 fn from(value: f32) -> Self {
329 ArconF32::new(value)
330 }
331}
332
333impl std::str::FromStr for ArconF32 {
334 type Err = ::std::num::ParseFloatError;
335 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
336 let f: f32 = s.parse::<f32>()?;
337 Ok(ArconF32::new(f))
338 }
339}
340impl Deref for ArconF32 {
341 type Target = f32;
342
343 fn deref(&self) -> &Self::Target {
344 &self.value
345 }
346}
347
348impl PartialEq for ArconF32 {
349 fn eq(&self, other: &Self) -> bool {
350 self.value == other.value
351 }
352}
353
354#[derive(Clone, Message)]
358#[repr(transparent)]
359pub struct ArconF64 {
360 #[prost(double, tag = "1")]
361 pub value: f64,
362}
363
364impl ArconF64 {
365 pub fn new(value: f64) -> ArconF64 {
366 ArconF64 { value }
367 }
368}
369
370impl Hash for ArconF64 {
371 fn hash<H: Hasher>(&self, state: &mut H) {
372 let s: u64 = self.value.trunc() as u64;
373 s.hash(state);
374 }
375}
376
377impl From<f64> for ArconF64 {
378 fn from(value: f64) -> Self {
379 ArconF64::new(value)
380 }
381}
382
383impl std::str::FromStr for ArconF64 {
384 type Err = ::std::num::ParseFloatError;
385 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
386 let f: f64 = s.parse::<f64>()?;
387 Ok(ArconF64::new(f))
388 }
389}
390impl Deref for ArconF64 {
391 type Target = f64;
392
393 fn deref(&self) -> &Self::Target {
394 &self.value
395 }
396}
397
398impl PartialEq for ArconF64 {
399 fn eq(&self, other: &Self) -> bool {
400 self.value == other.value
401 }
402}
403
/// An uninhabited type: the enum has no variants, so no value of it can ever exist.
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum ArconNever {}

impl ArconNever {
    /// Message explaining that a code path needing an `ArconNever` value cannot run.
    pub const IS_UNREACHABLE: &'static str = "The ArconNever type cannot be instantiated!";
}
410impl ArconType for ArconNever {
411 const RELIABLE_SER_ID: SerId = ser_id::NEVER_ID;
412 const VERSION_ID: VersionId = 1;
413}
414impl fmt::Debug for ArconNever {
415 fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
416 unreachable!("{}", ArconNever::IS_UNREACHABLE);
417 }
418}
419impl prost::Message for ArconNever {
420 fn encoded_len(&self) -> usize {
421 unreachable!("{}", ArconNever::IS_UNREACHABLE);
422 }
423
424 fn clear(&mut self) {
425 unreachable!("{}", ArconNever::IS_UNREACHABLE);
426 }
427
428 fn encode_raw<B>(&self, _: &mut B)
429 where
430 B: bytes::buf::BufMut,
431 {
432 unreachable!("{}", ArconNever::IS_UNREACHABLE);
433 }
434 fn merge_field<B>(
435 &mut self,
436 _: u32,
437 _: prost::encoding::WireType,
438 _: &mut B,
439 _: prost::encoding::DecodeContext,
440 ) -> std::result::Result<(), prost::DecodeError>
441 where
442 B: bytes::buf::Buf,
443 {
444 unreachable!("{}", ArconNever::IS_UNREACHABLE);
445 }
446}
447
448impl Default for ArconNever {
449 fn default() -> Self {
450 unreachable!("{}", ArconNever::IS_UNREACHABLE);
451 }
452}