use crate::libp2p_streaming_response::Codec;
use ax_types::{
service::{
Diagnostic, EventResponse, OffsetsResponse, PublishRequest, PublishResponse, QueryRequest,
SubscribeMonotonicRequest, SubscribeRequest,
},
OffsetMap, Payload,
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone)]
pub struct EventsProtocol;
impl Codec for EventsProtocol {
type Request = EventsRequest;
type Response = EventsResponse;
fn info_v1() -> &'static str {
"/actyx/events/v2"
}
fn info_v2() -> &'static [&'static str] {
&["/actyx/events/v3"]
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum EventsRequest {
Offsets,
Query(QueryRequest),
Subscribe(SubscribeRequest),
SubscribeMonotonic(SubscribeMonotonicRequest),
Publish(PublishRequest),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum EventsResponse {
Error {
message: String,
},
Offsets(OffsetsResponse),
Event(EventResponse<Payload>),
AntiEvent(EventResponse<Payload>),
OffsetMap {
offsets: OffsetMap,
},
Publish(PublishResponse),
Diagnostic(Diagnostic),
#[serde(other)]
FutureCompat,
}
#[cfg(test)]
mod tests {
use super::*;
use ax_types::{
app_id, service::Severity, tags, Event, EventKey, LamportTimestamp, Metadata, NodeId, Offset, Timestamp,
};
use std::collections::BTreeMap;
fn req(req: EventsRequest) -> String {
serde_json::to_string(&req).unwrap()
}
fn res(res: EventsResponse) -> String {
serde_json::to_string(&res).unwrap()
}
#[test]
fn requests() {
assert_eq!(req(EventsRequest::Offsets), r#"{"type":"offsets"}"#);
assert_eq!(
req(EventsRequest::Query(QueryRequest {
lower_bound: None,
upper_bound: None,
query: "FROM allEvents".parse().unwrap(),
order: ax_types::service::Order::Asc
})),
r#"{"type":"query","query":"FROM allEvents","lowerBound":null,"upperBound":null,"order":"asc"}"#
);
assert_eq!(
req(EventsRequest::Subscribe(SubscribeRequest {
lower_bound: None,
query: "FROM allEvents".parse().unwrap(),
})),
r#"{"type":"subscribe","query":"FROM allEvents","lowerBound":null}"#
);
assert_eq!(
req(EventsRequest::SubscribeMonotonic(SubscribeMonotonicRequest {
session: "".into(),
lower_bound: OffsetMap::default(),
query: "FROM allEvents".parse().unwrap(),
})),
r#"{"type":"subscribeMonotonic","query":"FROM allEvents","session":"","lowerBound":{}}"#
);
}
fn ev(n: u32) -> EventResponse<Payload> {
let key = EventKey {
lamport: LamportTimestamp::default(),
stream: NodeId::from_bytes(&[0; 32]).unwrap().stream(0.into()),
offset: Offset::default(),
};
let meta = Metadata {
timestamp: Timestamp::new(12),
tags: tags!("a", "b"),
app_id: app_id!("app"),
};
let payload = Payload::from_json_str(&format!("{}", n)).unwrap();
Event { key, meta, payload }.into()
}
#[test]
fn responses() {
assert_eq!(
res(EventsResponse::Error { message: "haha".into() }),
r#"{"type":"error","message":"haha"}"#
);
assert_eq!(
res(EventsResponse::Event(ev(3))),
r#"{"type":"event","lamport":0,"stream":"...........................................-0","offset":0,"timestamp":12,"tags":["a","b"],"appId":"app","payload":3}"#
);
assert_eq!(
res(EventsResponse::Diagnostic(Diagnostic {
severity: Severity::Warning,
message: "buh".to_owned()
})),
r#"{"type":"diagnostic","severity":"warning","message":"buh"}"#
);
assert_eq!(
serde_json::from_str::<EventsResponse>(r#"{"type":"diagnostic","severity":"warning","message":"buh"}"#)
.unwrap(),
EventsResponse::Diagnostic(Diagnostic {
severity: Severity::Warning,
message: "buh".to_owned()
})
);
assert_eq!(
res(EventsResponse::OffsetMap {
offsets: OffsetMap::default()
}),
r#"{"type":"offsetMap","offsets":{}}"#
);
assert_eq!(
res(EventsResponse::Offsets(OffsetsResponse {
present: OffsetMap::default(),
to_replicate: BTreeMap::default()
})),
r#"{"type":"offsets","present":{},"toReplicate":{}}"#
);
assert_eq!(
res(EventsResponse::Publish(PublishResponse { data: vec![] })),
r#"{"type":"publish","data":[]}"#
);
}
#[test]
fn future_compat() {
assert_eq!(
serde_json::from_str::<EventsResponse>(r#"{"type":"fromTheFuture","a":null}"#).unwrap(),
EventsResponse::FutureCompat,
);
}
}