use chrono::{Local, SecondsFormat, TimeZone};
use serde::{de::Error, Deserialize, Serialize};
use std::cmp::Ordering;
use std::convert::TryFrom;
use std::fmt::{self, Debug, Display, Formatter};
use std::str::FromStr;
mod offsets;
mod opaque;
pub(crate) mod scalars;
pub use offsets::{Offset, OffsetMap, OffsetOrMin};
pub use opaque::Opaque;
pub use scalars::{FishName, LamportTimestamp, ParseError, Semantics, SourceId, TimeStamp};
#[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "dataflow", derive(Abomonation))]
#[serde(rename_all = "camelCase")]
pub struct Event<T> {
pub lamport: LamportTimestamp,
pub stream: StreamInfo,
pub timestamp: TimeStamp,
pub offset: Offset,
pub payload: T,
}
impl Event<Payload> {
pub fn extract<'a, T>(&'a self) -> Result<Event<T>, serde_cbor::Error>
where
T: Deserialize<'a> + Clone,
{
let payload = self.payload.extract::<T>()?;
Ok(Event {
lamport: self.lamport,
stream: self.stream.clone(),
timestamp: self.timestamp,
offset: self.offset,
payload,
})
}
pub fn mk_test(semantics: &str, name: &str, payload: &str) -> Result<Event<Payload>, serde_json::Error> {
Ok(Event {
lamport: Default::default(),
timestamp: Default::default(),
offset: Offset::default(),
stream: StreamInfo {
semantics: Semantics::try_from(semantics).map_err(serde_json::Error::custom)?,
name: FishName::try_from(name).map_err(serde_json::Error::custom)?,
source: SourceId::from_str("dummy").unwrap(),
},
payload: serde_json::from_str(payload)?,
})
}
}
impl<T> Ord for Event<T> {
fn cmp(&self, other: &Self) -> Ordering {
self.lamport
.cmp(&other.lamport)
.then(self.stream.source.cmp(&other.stream.source))
}
}
impl<T> PartialOrd for Event<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<T> PartialEq for Event<T> {
fn eq(&self, other: &Self) -> bool {
self.lamport == other.lamport && self.stream.source == other.stream.source
}
}
impl<T> Eq for Event<T> {}
impl<T> Display for Event<T> {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
let time = Local.timestamp_millis(self.timestamp.as_i64() / 1000);
write!(
f,
"Event at {} ({}, source ID {})",
time.to_rfc3339_opts(SecondsFormat::Millis, false),
self.lamport,
self.stream.source,
)
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(feature = "dataflow", derive(Abomonation))]
#[serde(rename_all = "camelCase")]
pub struct StreamInfo {
pub semantics: Semantics,
pub name: FishName,
pub source: SourceId,
}
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
#[cfg_attr(feature = "dataflow", derive(Abomonation))]
pub struct Payload(Opaque);
impl Payload {
pub fn from_json_str(s: &str) -> Result<Payload, String> {
serde_json::from_str(s).map_err(|e| format!("{}", e))
}
pub fn compact<T: Serialize>(t: &T) -> Result<Payload, serde_cbor::Error> {
serde_cbor::to_vec(t).map(|bytes| Payload(bytes.into()))
}
pub fn extract<'a, T: Deserialize<'a>>(&'a self) -> Result<T, serde_cbor::Error> {
serde_cbor::from_slice(self.0.as_ref())
}
pub fn json_value(&self) -> serde_json::Value {
serde_json::to_value(self).unwrap()
}
pub fn json_string(&self) -> String {
serde_json::to_string(&self).unwrap()
}
pub fn empty() -> Payload {
Payload(serde_json::from_str("null").unwrap())
}
pub fn rough_size(&self) -> usize {
self.0.rough_size()
}
pub fn from_json_value(v: serde_json::Value) -> Result<Payload, String> {
let text = serde_json::to_string(&v).unwrap();
Payload::from_json_str(&text)
}
}
impl Default for Payload {
fn default() -> Self {
Payload::empty()
}
}
impl Debug for Payload {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "{}", self.json_string())
}
}