use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, fmt::Display, num::NonZeroU64, ops::AddAssign};
use crate::{
app_id,
event::{Event, EventKey, Metadata},
scalars::StreamId,
tags::TagSet,
LamportTimestamp, Offset, OffsetMap, Payload, Timestamp,
};
use lazy_static::lazy_static;
/// Ordering selector carried in the `order` field of [`QueryRequest`].
///
/// Serialized in kebab-case, i.e. as `"asc"`, `"desc"` and `"stream-asc"`.
/// The derived `Ord` follows the declaration order of the variants.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Eq, PartialEq, PartialOrd, Ord)]
#[serde(rename_all = "kebab-case")]
pub enum Order {
/// Presumably ascending event order; the exact semantics are defined by the query engine.
Asc,
/// Presumably descending event order; the exact semantics are defined by the query engine.
Desc,
/// Presumably ascending order within each stream; confirm against the query engine.
StreamAsc,
}
#[cfg(any(test, feature = "arb"))]
impl quickcheck::Arbitrary for Order {
    /// Picks one of the three [`Order`] variants through the generator's `choose`.
    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
        const VARIANTS: [Order; 3] = [Order::Asc, Order::Desc, Order::StreamAsc];
        match g.choose(&VARIANTS) {
            Some(order) => *order,
            // `choose` only returns `None` for an empty slice, which `VARIANTS` is not.
            None => panic!("called `Option::unwrap()` on a `None` value"),
        }
    }
}
/// Parameters of a query request.
///
/// Field names are serialized in camelCase (`lowerBound`, `upperBound`).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct QueryRequest {
/// Query text; this type carries it as an uninterpreted string.
pub query: String,
/// Optional offset map bounding the query from below (inclusivity is not visible from this file).
pub lower_bound: Option<OffsetMap>,
/// Optional offset map bounding the query from above (inclusivity is not visible from this file).
pub upper_bound: Option<OffsetMap>,
/// Requested ordering of the results.
pub order: Order,
}
/// Parameters of a subscribe request.
///
/// Field names are serialized in camelCase (`lowerBound`).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeRequest {
/// Query text; this type carries it as an uninterpreted string.
pub query: String,
/// Optional offset map bounding the subscription from below (inclusivity is not visible from this file).
pub lower_bound: Option<OffsetMap>,
}
/// Metadata attached to an [`EventResponse`].
///
/// Serialization and deserialization both go through [`EventMetaIo`] (`serde(from, into)`), so the
/// wire format is defined by that type. The derived `Ord` follows the declaration order of the
/// variants (`Range`, `Synthetic`, `Event`), so reordering them changes comparison results.
#[derive(Debug, Serialize, Deserialize, Clone, Ord, PartialOrd, Eq, PartialEq)]
#[serde(from = "EventMetaIo", into = "EventMetaIo")]
pub enum EventMeta {
/// Bounds covering several inputs: the key and timestamp extremes maintained by the
/// `AddAssign` implementation when metadata values are merged.
Range {
/// Lower key bound.
from_key: EventKey,
/// Upper key bound.
to_key: EventKey,
/// Lower timestamp bound.
from_time: Timestamp,
/// Upper timestamp bound.
to_time: Timestamp,
},
/// No underlying event. Written to the wire with `EventKey::ZERO` and placeholder metadata,
/// and recognised on input by a timestamp of zero.
Synthetic,
/// Metadata of exactly one event.
Event {
/// Key of the event.
key: EventKey,
/// Timestamp, tags and app ID of the event.
meta: Metadata,
},
}
#[cfg(any(test, feature = "arb"))]
impl quickcheck::Arbitrary for EventMeta {
    /// Generates a random [`EventMeta`]: first the variant is chosen, then its fields are
    /// generated in declaration order.
    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
        use quickcheck::Arbitrary;

        // Field-less stand-in for the three variants so one can be chosen before any data exists.
        #[derive(Clone, Copy)]
        enum Shape {
            Synth,
            Single,
            Span,
        }

        let shape = *g.choose(&[Shape::Synth, Shape::Single, Shape::Span]).unwrap();
        match shape {
            Shape::Synth => EventMeta::Synthetic,
            Shape::Single => {
                let key = Arbitrary::arbitrary(g);
                let meta = Arbitrary::arbitrary(g);
                EventMeta::Event { key, meta }
            }
            Shape::Span => {
                let from_key = Arbitrary::arbitrary(g);
                let to_key = Arbitrary::arbitrary(g);
                let from_time = Arbitrary::arbitrary(g);
                let to_time = Arbitrary::arbitrary(g);
                EventMeta::Range {
                    from_key,
                    to_key,
                    from_time,
                    to_time,
                }
            }
        }
    }
}
/// Wire representation of [`EventMeta`].
///
/// The enum is `untagged`, so serde tries the variants in declaration order when deserializing:
/// input that carries the range fields becomes `Range`, everything else falls through to `Event`.
/// There is no dedicated synthetic variant; the conversions to and from [`EventMeta`] encode it
/// as an `Event` whose timestamp is zero.
#[derive(Debug, Serialize, Deserialize, Clone, Ord, PartialOrd, Eq, PartialEq)]
#[serde(untagged)]
pub enum EventMetaIo {
/// Range bounds (serialized as `fromKey`, `toKey`, `fromTime`, `toTime`) plus the flattened
/// fields of a single event. The conversion from [`EventMeta`] fills `key` and `meta` with
/// placeholders, and the conversion back ignores them.
#[serde(rename_all = "camelCase")]
Range {
from_key: EventKey,
to_key: EventKey,
from_time: Timestamp,
to_time: Timestamp,
#[serde(flatten)]
key: EventKey,
#[serde(flatten)]
meta: Metadata,
},
/// A single event, with the fields of `key` and `meta` flattened into the enclosing object.
Event {
#[serde(flatten)]
key: EventKey,
#[serde(flatten)]
meta: Metadata,
},
}
// Placeholder metadata used when converting `EventMeta::Synthetic` and `EventMeta::Range` to
// `EventMetaIo`, since neither carries per-event metadata: timestamp 0, empty tag set and
// app ID "none".
lazy_static! {
static ref METADATA: Metadata = Metadata {
timestamp: Timestamp::new(0),
tags: TagSet::empty(),
app_id: app_id!("none"),
};
}
impl From<EventMeta> for EventMetaIo {
fn from(em: EventMeta) -> Self {
match em {
EventMeta::Range {
from_key,
to_key,
from_time,
to_time,
} => Self::Range {
from_key,
to_key,
from_time,
to_time,
key: EventKey::ZERO,
meta: METADATA.clone(),
},
EventMeta::Synthetic => Self::Event {
key: EventKey::ZERO,
meta: METADATA.clone(),
},
EventMeta::Event { key, meta } => Self::Event { key, meta },
}
}
}
impl From<EventMetaIo> for EventMeta {
fn from(em: EventMetaIo) -> Self {
match em {
EventMetaIo::Range {
from_key,
to_key,
from_time,
to_time,
..
} => Self::Range {
from_key,
to_key,
from_time,
to_time,
},
EventMetaIo::Event { key, meta } => {
if meta.timestamp.as_i64() == 0 {
Self::Synthetic
} else {
Self::Event { key, meta }
}
}
}
}
}
impl EventMeta {
    /// Lower bound of this metadata as `(key, timestamp)`.
    ///
    /// `Range` yields its `from_*` fields, `Event` its own key and timestamp, and `Synthetic`
    /// yields `EventKey::ZERO` with a zero timestamp.
    fn left(&self) -> (EventKey, Timestamp) {
        match *self {
            EventMeta::Synthetic => (EventKey::ZERO, 0.into()),
            EventMeta::Event { key, ref meta } => (key, meta.timestamp),
            EventMeta::Range {
                from_key, from_time, ..
            } => (from_key, from_time),
        }
    }

    /// Upper bound of this metadata as `(key, timestamp)`.
    ///
    /// `Range` yields its `to_*` fields, `Event` its own key and timestamp, and `Synthetic`
    /// yields `EventKey::ZERO` with a zero timestamp.
    fn right(&self) -> (EventKey, Timestamp) {
        match *self {
            EventMeta::Synthetic => (EventKey::ZERO, 0.into()),
            EventMeta::Event { key, ref meta } => (key, meta.timestamp),
            EventMeta::Range { to_key, to_time, .. } => (to_key, to_time),
        }
    }
}
impl AddAssign<&Self> for EventMeta {
    /// Merges `rhs` into `self`.
    ///
    /// * A `Synthetic` right-hand side never changes `self`.
    /// * A `Synthetic` left-hand side is replaced by a clone of `rhs`.
    /// * A `Range` is widened in place so that it also covers `rhs`.
    /// * An `Event` stays as it is when `rhs` is an event with the same key and timestamp;
    ///   otherwise it turns into the `Range` that covers both operands.
    fn add_assign(&mut self, rhs: &Self) {
        if matches!(rhs, EventMeta::Synthetic) {
            return;
        }

        match self {
            EventMeta::Synthetic => *self = rhs.clone(),
            EventMeta::Range {
                from_key,
                to_key,
                from_time,
                to_time,
            } => {
                let (rhs_low_key, rhs_low_time) = rhs.left();
                let (rhs_high_key, rhs_high_time) = rhs.right();
                // Each bound only moves outwards.
                if rhs_low_key < *from_key {
                    *from_key = rhs_low_key;
                }
                if rhs_low_time < *from_time {
                    *from_time = rhs_low_time;
                }
                if rhs_high_key > *to_key {
                    *to_key = rhs_high_key;
                }
                if rhs_high_time > *to_time {
                    *to_time = rhs_high_time;
                }
            }
            EventMeta::Event { key, meta } => {
                // Copy the two values out so `self` can be overwritten below.
                let own_key = *key;
                let own_time = meta.timestamp;
                let widened = match rhs {
                    // Cannot happen after the early return above; kept for exhaustiveness.
                    EventMeta::Synthetic => return,
                    EventMeta::Range {
                        from_key: low_key,
                        to_key: high_key,
                        from_time: low_time,
                        to_time: high_time,
                    } => EventMeta::Range {
                        from_key: own_key.min(*low_key),
                        to_key: own_key.max(*high_key),
                        from_time: own_time.min(*low_time),
                        to_time: own_time.max(*high_time),
                    },
                    EventMeta::Event {
                        key: other_key,
                        meta: other_meta,
                    } => {
                        let other_key = *other_key;
                        let other_time = other_meta.timestamp;
                        // Same key and timestamp: nothing to widen, `self` stays an `Event`.
                        if other_key == own_key && other_time == own_time {
                            return;
                        }
                        EventMeta::Range {
                            from_key: other_key.min(own_key),
                            to_key: other_key.max(own_key),
                            from_time: other_time.min(own_time),
                            to_time: other_time.max(own_time),
                        }
                    }
                };
                *self = widened;
            }
        }
    }
}
/// A payload together with its [`EventMeta`].
///
/// The metadata is flattened, so its fields appear next to `payload` in the serialized object
/// rather than under a nested `meta` key.
#[derive(Debug, Serialize, Deserialize, Clone, Ord, PartialOrd, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct EventResponse<T> {
/// Metadata describing where the payload comes from.
#[serde(flatten)]
pub meta: EventMeta,
/// The payload itself.
pub payload: T,
}
impl<T> From<Event<T>> for EventResponse<T> {
    /// Wraps a single event: its key and metadata become `EventMeta::Event`, and the payload is
    /// moved over unchanged.
    fn from(env: Event<T>) -> Self {
        let meta = EventMeta::Event {
            key: env.key,
            meta: env.meta,
        };
        Self {
            meta,
            payload: env.payload,
        }
    }
}
impl EventResponse<Payload> {
    /// Decodes the payload into `T` and returns it together with a clone of the metadata.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `Payload::extract` when decoding fails.
    pub fn extract<'a, T>(&'a self) -> Result<EventResponse<T>, serde_cbor::Error>
    where
        T: Deserialize<'a> + Clone,
    {
        let meta = self.meta.clone();
        let payload = self.payload.extract::<T>()?;
        Ok(EventResponse { meta, payload })
    }
}
impl<T> std::fmt::Display for EventResponse<T> {
    /// Writes a short description of the metadata; the payload is not shown.
    ///
    /// * `Range` is written as `composite event`.
    /// * `Synthetic` is written as `synthetic event`.
    /// * `Event` is written as `event at <time> (<lamport>, stream ID <stream>)`, where `<time>`
    ///   is the timestamp (read as microseconds) in the local time zone, formatted as RFC 3339
    ///   with millisecond precision.
    ///
    /// This never panics: if the timestamp cannot be represented by chrono, the raw timestamp
    /// value is written instead of a formatted time.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        use chrono::TimeZone;
        match &self.meta {
            EventMeta::Range { .. } => f.write_str("composite event"),
            EventMeta::Synthetic => f.write_str("synthetic event"),
            EventMeta::Event { key, meta } => {
                let micros = meta.timestamp.as_i64();
                match chrono::Local.timestamp_micros(micros).single() {
                    Some(time) => write!(
                        f,
                        "event at {} ({}, stream ID {})",
                        time.to_rfc3339_opts(chrono::SecondsFormat::Millis, false),
                        key.lamport,
                        key.stream,
                    ),
                    // This type is `Deserialize` and its fields are public, so the timestamp is
                    // not guaranteed to be in chrono's range; formatting must not abort on it.
                    None => write!(
                        f,
                        "event at invalid timestamp {} ({}, stream ID {})",
                        micros, key.lamport, key.stream,
                    ),
                }
            }
        }
    }
}
/// Response body that carries a single offset map under the key `offsets`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct OffsetMapResponse {
/// The offset map being reported.
pub offsets: OffsetMap,
}
/// One event to be published: a tag set plus a payload.
///
/// Can also be built from a `(TagSet, Payload)` tuple via `From`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct PublishEvent {
/// Tags to attach to the event.
pub tags: TagSet,
/// Payload of the event.
pub payload: Payload,
}
impl From<(TagSet, Payload)> for PublishEvent {
fn from(value: (TagSet, Payload)) -> Self {
Self {
tags: value.0,
payload: value.1,
}
}
}
/// Request body for publishing: a list of events under the key `data`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct PublishRequest {
/// The events to publish.
pub data: Vec<PublishEvent>,
}
/// Identifying data reported for one entry of a [`PublishResponse`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct PublishResponseKey {
/// Lamport timestamp of the entry.
pub lamport: LamportTimestamp,
/// Stream of the entry.
pub stream: StreamId,
/// Offset of the entry (presumably within `stream`; confirm against the publish endpoint).
pub offset: Offset,
/// Timestamp of the entry.
pub timestamp: Timestamp,
}
/// Response body for publishing: a list of [`PublishResponseKey`] under the key `data`.
///
/// Presumably one entry per published event, in request order; confirm against the publish endpoint.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct PublishResponse {
/// The reported keys.
pub data: Vec<PublishResponseKey>,
}
/// Session identifier: a newtype around an immutable boxed string.
///
/// Construct it with `From<&str>` or `From<String>`, and read it back with `as_str` or `Display`.
#[derive(Debug, Clone, Serialize, Deserialize, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct SessionId(Box<str>);
impl Display for SessionId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
impl From<&str> for SessionId {
fn from(s: &str) -> Self {
Self(s.into())
}
}
impl From<String> for SessionId {
fn from(s: String) -> Self {
Self(s.into())
}
}
impl SessionId {
pub fn as_str(&self) -> &str {
&self.0
}
}
/// Parameters of a monotonic subscription request.
///
/// Field names are serialized in camelCase (`lowerBound`). Unlike [`SubscribeRequest`], the
/// lower bound is mandatory here.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeMonotonicRequest {
/// Query text; this type carries it as an uninterpreted string.
pub query: String,
/// Identifier of the session this subscription belongs to.
pub session: SessionId,
/// Offset map bounding the subscription from below (inclusivity is not visible from this file).
pub lower_bound: OffsetMap,
}
/// One message of a monotonic subscription's response stream.
///
/// Internally tagged: the variant name is written in camelCase under the key `type`
/// (`event`, `offsets`, `timeTravel`, `diagnostic`).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum SubscribeMonotonicResponse {
/// An event; its fields are flattened next to `caughtUp`.
#[serde(rename_all = "camelCase")]
Event {
#[serde(flatten)]
event: EventResponse<Payload>,
/// Presumably signals that the subscriber has reached the live end of the stream; confirm
/// against the producer of this message.
caught_up: bool,
},
/// An offset map update.
#[serde(rename_all = "camelCase")]
Offsets(OffsetMapResponse),
/// Carries a new starting event key (serialized as `newStart`). Presumably asks the consumer
/// to restart from that key; confirm against the producer of this message.
#[serde(rename_all = "camelCase")]
TimeTravel { new_start: EventKey },
/// A diagnostic message.
#[serde(rename_all = "camelCase")]
Diagnostic(Diagnostic),
/// Catch-all that any unrecognised `type` tag deserializes to (`serde(other)`).
#[serde(other)]
FutureCompat,
}
/// One message of a query's response stream.
///
/// Internally tagged: the variant name is written in camelCase under the key `type`
/// (`event`, `offsets`, `diagnostic`).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum QueryResponse {
/// An event.
#[serde(rename_all = "camelCase")]
Event(EventResponse<Payload>),
/// An offset map update.
#[serde(rename_all = "camelCase")]
Offsets(OffsetMapResponse),
/// A diagnostic message.
#[serde(rename_all = "camelCase")]
Diagnostic(Diagnostic),
/// Catch-all that any unrecognised `type` tag deserializes to (`serde(other)`).
#[serde(other)]
FutureCompat,
}
/// One message of a subscription's response stream.
///
/// Internally tagged: the variant name is written in camelCase under the key `type`
/// (`event`, `antiEvent`, `offsets`, `diagnostic`).
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum SubscribeResponse {
/// An event.
#[serde(rename_all = "camelCase")]
Event(EventResponse<Payload>),
/// Same shape as `Event`. Presumably retracts a previously delivered event; confirm against
/// the producer of this message.
#[serde(rename_all = "camelCase")]
AntiEvent(EventResponse<Payload>),
/// An offset map update.
#[serde(rename_all = "camelCase")]
Offsets(OffsetMapResponse),
/// A diagnostic message.
#[serde(rename_all = "camelCase")]
Diagnostic(Diagnostic),
/// Catch-all that any unrecognised `type` tag deserializes to (`serde(other)`).
#[serde(other)]
FutureCompat,
}
/// A diagnostic message with a severity.
///
/// `Display` renders it as `<severity:?> - <message>`, using the `Debug` form of the severity.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, derive_more::Display)]
#[serde(rename_all = "camelCase")]
#[display(fmt = "{:?} - {}", severity, message)]
pub struct Diagnostic {
/// How serious the diagnostic is.
pub severity: Severity,
/// Human-readable text of the diagnostic.
pub message: String,
}
impl Diagnostic {
pub fn warn(message: String) -> Self {
Self {
severity: Severity::Warning,
message,
}
}
pub fn error(message: String) -> Self {
Self {
severity: Severity::Error,
message,
}
}
}
/// Severity of a [`Diagnostic`], serialized in camelCase (`warning`, `error`).
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum Severity {
/// A warning.
Warning,
/// An error.
Error,
/// Catch-all that any unrecognised severity string deserializes to (`serde(other)`).
#[serde(other)]
FutureCompat,
}
/// Response body describing offsets, serialized in camelCase (`present`, `toReplicate`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct OffsetsResponse {
/// Offset map of what is present (presumably on the responding node; confirm against the endpoint).
pub present: OffsetMap,
/// Non-zero count per stream, presumably the number of events still to be replicated;
/// confirm against the endpoint.
pub to_replicate: BTreeMap<StreamId, NonZeroU64>,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{app_id, tags, AppId, NodeId};
use quickcheck::quickcheck;
// Flat event layout with every key and metadata field at the top level next to `payload`.
// `event_response_compat` uses it as the older wire format that `EventResponse` must stay
// interoperable with.
#[derive(Debug, Serialize, Deserialize, Clone, Ord, PartialOrd, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
struct EventResponseV1<T> {
lamport: LamportTimestamp,
stream: StreamId,
offset: Offset,
timestamp: Timestamp,
tags: TagSet,
app_id: AppId,
payload: T,
}
#[test]
fn future_compat() {
    // An object whose `type` tag is unknown to every response enum must land in `FutureCompat`
    // instead of failing to deserialize.
    let unknown = r#"{"type":"fromTheFuture","x":42}"#;

    let query: QueryResponse = serde_json::from_str(unknown).unwrap();
    assert_eq!(query, QueryResponse::FutureCompat);

    let subscribe: SubscribeResponse = serde_json::from_str(unknown).unwrap();
    assert_eq!(subscribe, SubscribeResponse::FutureCompat);

    let monotonic: SubscribeMonotonicResponse = serde_json::from_str(unknown).unwrap();
    assert_eq!(monotonic, SubscribeMonotonicResponse::FutureCompat);
}
// Checks that `EventResponse` and the flat `EventResponseV1` layout can read each other's JSON.
#[test]
fn event_response_compat() {
// Sample values shared by all cases below.
let stream = NodeId::from_bytes(b"abcdefghijklmnopqrstuvwxyz123456")
.unwrap()
.stream(12.into());
let lamport = LamportTimestamp::from(42);
let offset = Offset::from(43);
let timestamp = Timestamp::from(44);
let tags = tags!("a1", "b2");
let app_id = app_id!("tester");
let payload = Payload::from_json_str("100").unwrap();
// Case 1: V1 JSON with a non-zero timestamp is read as `EventMeta::Event`.
let old = serde_json::to_string(&EventResponseV1 {
lamport,
stream,
offset,
timestamp,
tags: tags.clone(),
app_id: app_id.clone(),
payload: payload.clone(),
})
.unwrap();
assert_eq!(
serde_json::from_str::<EventResponse<Payload>>(&old).unwrap(),
EventResponse {
meta: EventMeta::Event {
key: EventKey {
lamport,
stream,
offset,
},
meta: Metadata {
timestamp,
tags: tags.clone(),
app_id: app_id.clone(),
}
},
payload: payload.clone(),
}
);
// Case 2: V1 JSON with timestamp 0 is read as `EventMeta::Synthetic`, whatever the other fields hold.
let old_synthetic = serde_json::to_string(&EventResponseV1 {
lamport,
stream,
offset,
timestamp: 0.into(),
tags: tags.clone(),
app_id: app_id.clone(),
payload: payload.clone(),
})
.unwrap();
assert_eq!(
serde_json::from_str::<EventResponse<Payload>>(&old_synthetic).unwrap(),
EventResponse {
meta: EventMeta::Synthetic,
payload: payload.clone(),
}
);
// Case 3: a serialized `Synthetic` is readable as V1 and shows the zero key and placeholder metadata.
let new_synthetic = serde_json::to_string(&EventResponse {
meta: EventMeta::Synthetic,
payload: payload.clone(),
})
.unwrap();
assert_eq!(
serde_json::from_str::<EventResponseV1<Payload>>(&new_synthetic).unwrap(),
EventResponseV1 {
lamport: 0.into(),
stream: NodeId::default().stream(0.into()),
offset: 0.into(),
timestamp: 0.into(),
tags: tags!(),
app_id: app_id!("none"),
payload: payload.clone(),
}
);
// Case 4: a serialized `Event` is readable as V1 with all fields preserved.
let new_event = serde_json::to_string(&EventResponse {
meta: EventMeta::Event {
key: EventKey {
lamport,
stream,
offset,
},
meta: Metadata {
timestamp,
tags: tags.clone(),
app_id: app_id.clone(),
},
},
payload: payload.clone(),
})
.unwrap();
assert_eq!(
serde_json::from_str::<EventResponseV1<Payload>>(&new_event).unwrap(),
EventResponseV1 {
lamport,
stream,
offset,
timestamp,
tags,
app_id,
payload: payload.clone(),
}
);
// Case 5: a serialized `Range` is still readable as V1; the V1 reader sees only the
// placeholder key and metadata that are written alongside the range bounds.
let new_range = serde_json::to_string(&EventResponse {
meta: EventMeta::Range {
from_key: EventKey {
lamport,
stream,
offset,
},
to_key: EventKey {
lamport,
stream,
offset,
},
from_time: timestamp,
to_time: timestamp,
},
payload: payload.clone(),
})
.unwrap();
assert_eq!(
serde_json::from_str::<EventResponseV1<Payload>>(&new_range).unwrap(),
EventResponseV1 {
lamport: 0.into(),
stream: NodeId::default().stream(0.into()),
offset: 0.into(),
timestamp: 0.into(),
tags: tags!(),
app_id: app_id!("none"),
payload,
}
);
}
quickcheck! {
    // Property: folding any sequence of `EventMeta` values with `+=`, starting from
    // `Synthetic`, yields metadata that matches bounds tracked independently by this test.
    //
    // * Only synthetic inputs (or none): the result stays `Synthetic`.
    // * Otherwise the result is either the `Range` spanning the tracked minima and maxima, or
    //   - when every input was an event with one and the same key and timestamp - an `Event`
    //   with exactly that key and timestamp.
    //
    // The expected range is built from the tracked bounds, not from the result's own
    // `left()` / `right()`; comparing a range with itself would accept any `Range`.
    fn event_meta_merge(m: Vec<EventMeta>) -> bool {
        let mut em = EventMeta::Synthetic;
        let mut min_key = None;
        let mut max_key = None;
        let mut min_time = None;
        let mut max_time = None;
        for m in m {
            // Synthetic inputs contribute nothing to the expected bounds.
            if m != EventMeta::Synthetic {
                let min = m.left();
                let max = m.right();
                min_key = min_key.map(|k: EventKey| k.min(min.0)).or(Some(min.0));
                max_key = max_key.map(|k: EventKey| k.max(max.0)).or(Some(max.0));
                min_time = min_time.map(|k: Timestamp| k.min(min.1)).or(Some(min.1));
                max_time = max_time.map(|k: Timestamp| k.max(max.1)).or(Some(max.1));
            }
            em += &m;
        }
        match (min_key, max_key, min_time, max_time) {
            (Some(min_key), Some(max_key), Some(min_time), Some(max_time)) => {
                let still_single_event = min_key == max_key
                    && min_time == max_time
                    && matches!(
                        em,
                        EventMeta::Event { key, meta: Metadata { timestamp, .. } }
                            if key == min_key && timestamp == min_time
                    );
                still_single_event
                    || em
                        == EventMeta::Range {
                            from_key: min_key,
                            to_key: max_key,
                            from_time: min_time,
                            to_time: max_time,
                        }
            }
            // No non-synthetic input was seen, so nothing may have changed.
            _ => em == EventMeta::Synthetic,
        }
    }
}
}