use std::cmp::Ordering;
use serde::{Deserialize, Serialize};
use crate::{
offset::Offset,
scalars::StreamId,
tags::TagSet,
timestamp::{LamportTimestamp, Timestamp},
AppId, NodeId, StreamNr,
};
mod opaque;
mod payload;
#[cfg(test)]
mod test_util;
#[cfg(test)]
pub use test_util::*;
pub use opaque::Opaque;
pub use payload::Payload;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Event<T> {
pub key: EventKey,
pub meta: Metadata,
pub payload: T,
}
impl<T> PartialOrd for Event<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.key.cmp(&other.key))
}
}
impl<T> Ord for Event<T> {
fn cmp(&self, other: &Self) -> Ordering {
self.key.cmp(&other.key)
}
}
impl<T> PartialEq for Event<T> {
fn eq(&self, other: &Self) -> bool {
self.key == other.key
}
}
impl<T> Eq for Event<T> {}
#[derive(Debug, Serialize, Deserialize, Clone, Ord, PartialOrd, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Metadata {
pub timestamp: Timestamp,
pub tags: TagSet,
pub app_id: AppId,
}
#[cfg(any(test, feature = "arb"))]
impl quickcheck::Arbitrary for Metadata {
fn arbitrary(g: &mut quickcheck::Gen) -> Self {
Self {
timestamp: quickcheck::Arbitrary::arbitrary(g),
tags: quickcheck::Arbitrary::arbitrary(g),
app_id: quickcheck::Arbitrary::arbitrary(g),
}
}
}
impl Event<Payload> {
pub fn extract<'a, T>(&'a self) -> Result<Event<T>, serde_cbor::Error>
where
T: Deserialize<'a> + Clone,
{
let payload = self.payload.extract::<T>()?;
Ok(Event {
key: self.key,
meta: self.meta.clone(),
payload,
})
}
}
#[derive(Copy, Debug, Serialize, Deserialize, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
#[serde(rename_all = "camelCase")]
pub struct EventKey {
pub lamport: LamportTimestamp,
pub stream: StreamId,
pub offset: Offset,
}
impl EventKey {
pub const ZERO: Self = Self {
lamport: LamportTimestamp::new(0),
stream: NodeId::new([0; 32]).stream(StreamNr::new(0)),
offset: Offset::new(0),
};
}
#[cfg(any(test, feature = "arb"))]
impl quickcheck::Arbitrary for EventKey {
fn arbitrary(g: &mut quickcheck::Gen) -> Self {
Self {
lamport: quickcheck::Arbitrary::arbitrary(g),
stream: quickcheck::Arbitrary::arbitrary(g),
offset: quickcheck::Arbitrary::arbitrary(g),
}
}
}