arcon/data/
mod.rs

1/// Arrow related types
2pub mod arrow;
3/// Serialisers and Deserialiser for in-flight data
4pub mod flight_serde;
5#[allow(dead_code)]
6pub mod partition;
7/// Known Serialisation IDs for Arcon Types
8mod ser_id;
9
10use crate::buffer::event::BufferReader;
11use kompact::prelude::*;
12use prost::{Message, Oneof};
13use std::{
14    fmt,
15    hash::{Hash, Hasher},
16    ops::Deref,
17};
18
/// Identifier type for state registered within Arcon
pub type StateID = String;

/// Version identifier of an [`ArconType`] (see [`ArconType::VERSION_ID`])
pub type VersionId = u32;
24
25pub trait ArconTypeBounds:
26    Clone + fmt::Debug + Sync + Send + prost::Message + Default + 'static
27{
28}
29impl<T> ArconTypeBounds for T where
30    T: Clone + fmt::Debug + Sync + Send + prost::Message + Default + 'static
31{
32}
33
34/// Type that can be passed through the Arcon runtime
35pub trait ArconType: ArconTypeBounds
36where
37    Self: std::marker::Sized,
38{
39    /// Serialisation ID for Arcon's Reliable In-flight serde
40    const RELIABLE_SER_ID: SerId;
41    /// Current version of this ArconType
42    const VERSION_ID: VersionId;
43}
44
45/// An Enum containing all possible stream events that may occur in an execution
46#[derive(Oneof, Clone)]
47pub enum ArconEvent<A: ArconType> {
48    /// A stream element containing some data of type [ArconType] and an optional timestamp [u64]
49    #[prost(message, tag = "1")]
50    Element(ArconElement<A>),
51    /// A [Watermark] message
52    #[prost(message, tag = "2")]
53    Watermark(Watermark),
54    /// An [Epoch] marker message
55    #[prost(message, tag = "3")]
56    Epoch(Epoch),
57    /// A death message
58    #[prost(message, tag = "4")]
59    Death(String),
60}
61
62// The struct below is required because of the peculiarity of prost/protobuf - you cannot have
63// repeated (like, a Vec<_>) oneof fields, so we wrap ArconEvent in a struct which implements
64// prost::Message. Unfortunately protobuf also doesn't allow for required oneof fields, so the inner
65// value has to be optional. In practice we expect it to always be Some.
66
67#[derive(Message, Clone)]
68pub struct ArconEventWrapper<A: ArconType> {
69    #[prost(oneof = "ArconEvent::<A>", tags = "1, 2, 3, 4")]
70    inner: Option<ArconEvent<A>>,
71}
72
73impl<A: ArconType> ArconEventWrapper<A> {
74    pub fn unwrap(self) -> ArconEvent<A> {
75        self.inner
76            .expect("ArconEventWrapper.inner is None. Prost deserialization error?")
77    }
78
79    pub fn unwrap_ref(&self) -> &ArconEvent<A> {
80        self.inner
81            .as_ref()
82            .expect("ArconEventWrapper.inner is None. Prost deserialization error?")
83    }
84}
85
86impl<A: ArconType> From<ArconEvent<A>> for ArconEventWrapper<A> {
87    fn from(inner: ArconEvent<A>) -> Self {
88        ArconEventWrapper { inner: Some(inner) }
89    }
90}
91
92/// A Stream element containing some data and timestamp
93#[derive(Message, Clone)]
94pub struct ArconElement<A: ArconType> {
95    #[prost(message, required, tag = "1")]
96    pub data: A,
97    #[prost(uint64, tag = "2")]
98    pub timestamp: u64,
99}
100
101impl<A: ArconType> ArconElement<A> {
102    /// Creates an ArconElement without a timestamp
103    pub fn new(data: A) -> Self {
104        ArconElement { data, timestamp: 0 }
105    }
106
107    /// Creates an ArconElement with a timestamp
108    pub fn with_timestamp(data: A, timestamp: u64) -> Self {
109        ArconElement { data, timestamp }
110    }
111}
112
113/// Watermark message containing a [u64] timestamp
114#[derive(Message, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
115pub struct Watermark {
116    #[prost(uint64, tag = "1")]
117    pub timestamp: u64,
118}
119
120impl Watermark {
121    pub fn new(timestamp: u64) -> Self {
122        Watermark { timestamp }
123    }
124}
125
126/// Epoch marker message
127#[derive(Message, Clone, Hash, Copy, Ord, PartialOrd, Eq, PartialEq)]
128pub struct Epoch {
129    #[prost(uint64, tag = "1")]
130    pub epoch: u64,
131}
132
133impl Epoch {
134    /// Creates a new Epoch
135    pub fn new(epoch: u64) -> Self {
136        Epoch { epoch }
137    }
138}
139
140/// Container that holds two possible variants of messages
141pub enum MessageContainer<A: ArconType> {
142    /// Batch of events backed Rust's system allocator
143    ///
144    /// Used when receiving events from the network or when restoring messages from a state backend
145    Raw(RawArconMessage<A>),
146    /// Batch of events backed by Arcon's allocator
147    Local(ArconMessage<A>),
148}
149
150impl<A: ArconType> MessageContainer<A> {
151    /// Return a reference to the sender ID of this message
152    #[inline]
153    pub fn sender(&self) -> &NodeID {
154        match self {
155            MessageContainer::Raw(r) => &r.sender,
156            MessageContainer::Local(l) => &l.sender,
157        }
158    }
159    /// Return number of events in the message
160
161    #[inline]
162    #[cfg(feature = "metrics")]
163    pub fn total_events(&self) -> u64 {
164        match self {
165            MessageContainer::Raw(r) => r.events.len() as u64,
166            MessageContainer::Local(l) => l.events.len() as u64,
167        }
168    }
169    /// Consumes the container and returns a RawArconMessage
170    pub fn raw(self) -> RawArconMessage<A> {
171        match self {
172            MessageContainer::Raw(r) => r,
173            MessageContainer::Local(l) => l.into(),
174        }
175    }
176}
177
178/// An ArconMessage backed by a reusable EventBuffer
179#[derive(Debug, Clone)]
180pub struct ArconMessage<A: ArconType> {
181    /// Batch of ArconEvents backed by an EventBuffer
182    pub events: BufferReader<ArconEventWrapper<A>>,
183    /// ID identifying where the message is sent from
184    pub sender: NodeID,
185}
186
187/// A raw ArconMessage for serialisation
188#[derive(Message, Clone)]
189pub struct RawArconMessage<A: ArconType> {
190    /// Batch of ArconEvents
191    #[prost(message, repeated, tag = "1")]
192    pub events: Vec<ArconEventWrapper<A>>,
193    /// ID identifying where the message is sent from
194    #[prost(message, required, tag = "2")]
195    pub sender: NodeID,
196}
197
198impl<A: ArconType> From<ArconMessage<A>> for RawArconMessage<A> {
199    fn from(msg: ArconMessage<A>) -> Self {
200        RawArconMessage {
201            events: msg.events.to_vec(),
202            sender: msg.sender,
203        }
204    }
205}
206
/// Convenience constructors, compiled for tests only
#[cfg(test)]
impl<A: ArconType> ArconMessage<A> {
    /// Creates an ArconMessage with a single [ArconEvent::Watermark] event
    ///
    /// This function should only be used for development and test purposes.
    pub fn watermark(timestamp: u64, sender: NodeID) -> ArconMessage<A> {
        let event = ArconEvent::<A>::Watermark(Watermark::new(timestamp));
        Self {
            events: vec![ArconEventWrapper::from(event)].into(),
            sender,
        }
    }

    /// Creates an ArconMessage with a single [ArconEvent::Epoch] event
    ///
    /// This function should only be used for development and test purposes.
    pub fn epoch(epoch: u64, sender: NodeID) -> ArconMessage<A> {
        let event = ArconEvent::<A>::Epoch(Epoch::new(epoch));
        Self {
            events: vec![ArconEventWrapper::from(event)].into(),
            sender,
        }
    }

    /// Creates an ArconMessage with a single [ArconEvent::Death] event
    ///
    /// This function should only be used for development and test purposes.
    pub fn death(msg: String, sender: NodeID) -> ArconMessage<A> {
        let event = ArconEvent::<A>::Death(msg);
        Self {
            events: vec![ArconEventWrapper::from(event)].into(),
            sender,
        }
    }

    /// Creates an ArconMessage with a single [ArconEvent::Element] event
    ///
    /// This function should only be used for development and test purposes.
    pub fn element(data: A, timestamp: u64, sender: NodeID) -> ArconMessage<A> {
        let event = ArconEvent::Element(ArconElement::with_timestamp(data, timestamp));
        Self {
            events: vec![ArconEventWrapper::from(event)].into(),
            sender,
        }
    }
}
247
248/// A NodeID is used to identify a message sender
249#[derive(Message, PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone)]
250pub struct NodeID {
251    #[prost(uint32, tag = "1")]
252    pub id: u32,
253}
254
255impl NodeID {
256    pub fn new(new_id: u32) -> NodeID {
257        NodeID { id: new_id }
258    }
259}
260
261impl From<u32> for NodeID {
262    fn from(id: u32) -> Self {
263        NodeID::new(id)
264    }
265}
266
267// Implement ArconType for primitives.
268// NOTE: This is mainly for testing and development. In practice,
269// an ArconType is always a struct or enum.
270
271impl ArconType for u32 {
272    const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_U32_ID;
273    const VERSION_ID: VersionId = 1;
274}
275impl ArconType for u64 {
276    const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_U64_ID;
277    const VERSION_ID: VersionId = 1;
278}
279impl ArconType for i32 {
280    const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_I32_ID;
281    const VERSION_ID: VersionId = 1;
282}
283impl ArconType for i64 {
284    const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_I64_ID;
285    const VERSION_ID: VersionId = 1;
286}
287impl ArconType for ArconF32 {
288    const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_F32_ID;
289    const VERSION_ID: VersionId = 1;
290}
291impl ArconType for ArconF64 {
292    const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_F64_ID;
293    const VERSION_ID: VersionId = 1;
294}
295impl ArconType for bool {
296    const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_BOOLEAN_ID;
297    const VERSION_ID: VersionId = 1;
298}
299impl ArconType for String {
300    const RELIABLE_SER_ID: SerId = ser_id::RELIABLE_STRING_ID;
301    const VERSION_ID: VersionId = 1;
302}
303
304/// Float wrapper for f32 in order to impl Hash [std::hash::Hash]
305///
306/// The `Hash` impl rounds the floats down to an integer and then hashes it.
307#[derive(Clone, Message)]
308#[repr(transparent)]
309pub struct ArconF32 {
310    #[prost(float, tag = "1")]
311    pub value: f32,
312}
313
314impl ArconF32 {
315    pub fn new(value: f32) -> ArconF32 {
316        ArconF32 { value }
317    }
318}
319
320impl Hash for ArconF32 {
321    fn hash<H: Hasher>(&self, state: &mut H) {
322        let s: u64 = self.value.trunc() as u64;
323        s.hash(state);
324    }
325}
326
327impl From<f32> for ArconF32 {
328    fn from(value: f32) -> Self {
329        ArconF32::new(value)
330    }
331}
332
333impl std::str::FromStr for ArconF32 {
334    type Err = ::std::num::ParseFloatError;
335    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
336        let f: f32 = s.parse::<f32>()?;
337        Ok(ArconF32::new(f))
338    }
339}
340impl Deref for ArconF32 {
341    type Target = f32;
342
343    fn deref(&self) -> &Self::Target {
344        &self.value
345    }
346}
347
348impl PartialEq for ArconF32 {
349    fn eq(&self, other: &Self) -> bool {
350        self.value == other.value
351    }
352}
353
354/// Float wrapper for f64 in order to impl Hash [std::hash::Hash]
355///
356/// The `Hash` impl rounds the floats down to an integer and then hashes it.
357#[derive(Clone, Message)]
358#[repr(transparent)]
359pub struct ArconF64 {
360    #[prost(double, tag = "1")]
361    pub value: f64,
362}
363
364impl ArconF64 {
365    pub fn new(value: f64) -> ArconF64 {
366        ArconF64 { value }
367    }
368}
369
370impl Hash for ArconF64 {
371    fn hash<H: Hasher>(&self, state: &mut H) {
372        let s: u64 = self.value.trunc() as u64;
373        s.hash(state);
374    }
375}
376
377impl From<f64> for ArconF64 {
378    fn from(value: f64) -> Self {
379        ArconF64::new(value)
380    }
381}
382
383impl std::str::FromStr for ArconF64 {
384    type Err = ::std::num::ParseFloatError;
385    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
386        let f: f64 = s.parse::<f64>()?;
387        Ok(ArconF64::new(f))
388    }
389}
390impl Deref for ArconF64 {
391    type Target = f64;
392
393    fn deref(&self) -> &Self::Target {
394        &self.value
395    }
396}
397
398impl PartialEq for ArconF64 {
399    fn eq(&self, other: &Self) -> bool {
400        self.value == other.value
401    }
402}
403
404/// Arcon variant of the `Never` (or `!`) type which fulfills `ArconType` requirements
405#[derive(Clone, Copy, PartialEq, Eq)]
406pub enum ArconNever {}
407impl ArconNever {
408    pub const IS_UNREACHABLE: &'static str = "The ArconNever type cannot be instantiated!";
409}
410impl ArconType for ArconNever {
411    const RELIABLE_SER_ID: SerId = ser_id::NEVER_ID;
412    const VERSION_ID: VersionId = 1;
413}
414impl fmt::Debug for ArconNever {
415    fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
416        unreachable!("{}", ArconNever::IS_UNREACHABLE);
417    }
418}
419impl prost::Message for ArconNever {
420    fn encoded_len(&self) -> usize {
421        unreachable!("{}", ArconNever::IS_UNREACHABLE);
422    }
423
424    fn clear(&mut self) {
425        unreachable!("{}", ArconNever::IS_UNREACHABLE);
426    }
427
428    fn encode_raw<B>(&self, _: &mut B)
429    where
430        B: bytes::buf::BufMut,
431    {
432        unreachable!("{}", ArconNever::IS_UNREACHABLE);
433    }
434    fn merge_field<B>(
435        &mut self,
436        _: u32,
437        _: prost::encoding::WireType,
438        _: &mut B,
439        _: prost::encoding::DecodeContext,
440    ) -> std::result::Result<(), prost::DecodeError>
441    where
442        B: bytes::buf::Buf,
443    {
444        unreachable!("{}", ArconNever::IS_UNREACHABLE);
445    }
446}
447
448impl Default for ArconNever {
449    fn default() -> Self {
450        unreachable!("{}", ArconNever::IS_UNREACHABLE);
451    }
452}