use crate::ack::{AckHandle, AckType};
use crate::binary::AttachmentsBuilder;
use crate::client::Emit;
use crate::error::{EventError, PayloadError};
use crate::marker::{AckMarker, BinaryMarker, HasAck, HasBinary, NoAck, NoBinary};
use crate::packet::{Directive, DynEvent};
use crate::payload::{DeserializePayload, SerializePayload, event_from_json, event_to_json};
use bytes::Bytes;
use tokio::sync::oneshot;
/// Compile-time description of one kind of event.
///
/// The implementing type is the event's payload. Its associated items tell
/// the rest of this module what the event is called and which optional parts
/// (an acknowledgement id, binary attachments) travel with it.
pub trait EventType: Sized {
/// Name of the event. In this file's tests it shows up as the leading
/// element of the JSON payload array (e.g. `["ping"]`), and a payload
/// carrying a different name fails to decode.
const NAME: &'static str;
/// Acknowledgement marker. Selects the type of [`Event::id`] through
/// [`AckMarker::Id`] and is used to parse the raw `Option<u64>` id.
type Ack: AckMarker;
/// Binary marker. Selects the type of [`Event::attachments`] through
/// [`BinaryMarker::Attachments`] and is used to parse the raw attachments.
type Binary: BinaryMarker;
}
/// A typed event: the decoded payload together with the ack id and
/// attachments in the shapes dictated by the payload's [`EventType`] markers.
///
/// The `E: EventType` bound lives on the struct itself because the field
/// types are projections of `E`'s associated types.
pub struct Event<E>
where
E: EventType,
{
/// The decoded event payload.
pub payload: E,
/// Acknowledgement id, typed by `E::Ack`. For `HasAck` events the tests in
/// this file read the numeric value with `.get()`.
pub id: <E::Ack as AckMarker>::Id,
/// Binary attachments, typed by `E::Binary`. For `HasBinary` events the
/// tests in this file query the count with `.len()`.
pub attachments: <E::Binary as BinaryMarker>::Attachments,
}
/// Debug output is rendered as a map. The `"payload"` entry is written first;
/// the ack and binary markers are then each handed the map together with the
/// `id` / `attachments` value so they can describe it.
impl<E> std::fmt::Debug for Event<E>
where
    E: EventType + std::fmt::Debug,
{
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            payload,
            id,
            attachments,
        } = self;

        let mut entries = formatter.debug_map();
        entries.entry(&"payload", payload);
        E::Ack::format(id, &mut entries);
        E::Binary::format(attachments, &mut entries);
        entries.finish()
    }
}
impl<E> TryFrom<DynEvent> for Event<E>
where
E: EventType + DeserializePayload,
{
type Error = EventError;
fn try_from(value: DynEvent) -> Result<Self, EventError> {
let payload = event_from_json(&value.payload)?;
let id = E::Ack::parse(value.id)?;
let attachments = E::Binary::parse(value.attachments)?;
Ok(Self {
payload,
id,
attachments,
})
}
}
/// Types that can be assembled from an already-decoded payload plus the raw
/// ack id and attachments that arrived with it.
pub trait EventHandler: Sized {
/// The payload type this handler is built from.
type Payload: EventType;
/// Builds `Self` from `payload`, the optional raw ack `id`, and the optional
/// raw `attachments`.
///
/// # Errors
///
/// Returns an [`EventError`] when the implementor rejects the inputs. The
/// implementation for [`Event`] in this file fails when the payload's ack or
/// binary marker refuses the given `id` or `attachments`.
fn handle(
payload: Self::Payload,
id: Option<u64>,
attachments: Option<Vec<Bytes>>,
) -> Result<Self, EventError>;
}
/// A type that can be produced from any [`DynEvent`] (failing with
/// [`EventError`]) and can report a static name for the value it holds.
///
/// NOTE(review): presumably implemented by enums that dispatch over several
/// event kinds — confirm against the implementors.
pub trait EventRouter: TryFrom<DynEvent, Error = EventError> {
/// Static name associated with this value.
fn name(&self) -> &'static str;
}
/// Assembles an [`Event`] from a payload that has already been decoded.
///
/// The raw `id` is parsed by `E::Ack` first, then the raw `attachments` by
/// `E::Binary`; the first failure is returned as an [`EventError`].
impl<E> EventHandler for Event<E>
where
    E: DeserializePayload + EventType,
{
    type Payload = E;

    fn handle(
        payload: Self::Payload,
        id: Option<u64>,
        attachments: Option<Vec<Bytes>>,
    ) -> Result<Self, EventError> {
        let ack_id = E::Ack::parse(id)?;
        let parsed_attachments = E::Binary::parse(attachments)?;

        Ok(Self {
            payload,
            id: ack_id,
            attachments: parsed_attachments,
        })
    }
}
/// Events with neither an acknowledgement nor binary attachments are emitted
/// by value: the event is serialized to JSON and wrapped in a
/// [`Directive::Event`] with `tx` and `attachments` both left as `None`.
impl<E> Emit<NoAck, NoBinary> for E
where
    E: SerializePayload + EventType<Ack = NoAck, Binary = NoBinary>,
{
    type Output = ();

    fn prepare(self) -> Result<(Directive, ()), PayloadError> {
        let json = event_to_json(&self)?;
        let directive = Directive::Event {
            payload: json.into(),
            tx: None,
            attachments: None,
        };
        Ok((directive, ()))
    }
}
/// Events that expect an acknowledgement but carry no binary attachments.
///
/// A oneshot channel is created for the reply: the sending half goes into the
/// directive's `tx`, and the receiving half is returned to the caller wrapped
/// in an [`AckHandle`].
impl<E, A> Emit<HasAck<A>, NoBinary> for E
where
    A: AckType,
    E: SerializePayload + EventType<Ack = HasAck<A>, Binary = NoBinary>,
{
    type Output = AckHandle<A>;

    fn prepare(self) -> Result<(Directive, AckHandle<A>), PayloadError> {
        let (ack_tx, ack_rx) = oneshot::channel();
        let json = event_to_json(&self)?;
        let directive = Directive::Event {
            payload: json.into(),
            tx: Some(ack_tx),
            attachments: None,
        };
        Ok((directive, AckHandle::new(ack_rx)))
    }
}
/// Events with binary attachments and no acknowledgement are emitted through
/// a closure: it receives a fresh [`AttachmentsBuilder`] and returns the
/// event to serialize. The builder's finished contents become the
/// directive's `attachments`.
impl<F, E> Emit<NoAck, HasBinary> for F
where
    E: SerializePayload + EventType<Ack = NoAck, Binary = HasBinary>,
    F: FnOnce(&mut AttachmentsBuilder) -> E,
{
    type Output = ();

    fn prepare(self) -> Result<(Directive, ()), PayloadError> {
        let mut attachments = AttachmentsBuilder::new();
        // Run the caller's closure against the builder, then serialize the
        // event it returned.
        let json = event_to_json(&self(&mut attachments))?;
        let directive = Directive::Event {
            payload: json.into(),
            tx: None,
            attachments: Some(attachments.finish()),
        };
        Ok((directive, ()))
    }
}
/// Events that both expect an acknowledgement and carry binary attachments.
///
/// As with the other binary impl, `self` is a closure that receives a fresh
/// [`AttachmentsBuilder`] and returns the event to serialize. The directive
/// gets the sending half of a new oneshot channel; the receiving half is
/// returned wrapped in an [`AckHandle`].
impl<F, E, A> Emit<HasAck<A>, HasBinary> for F
where
    A: AckType,
    E: SerializePayload + EventType<Ack = HasAck<A>, Binary = HasBinary>,
    F: FnOnce(&mut AttachmentsBuilder) -> E,
{
    type Output = AckHandle<A>;

    fn prepare(self) -> Result<(Directive, AckHandle<A>), PayloadError> {
        let (ack_tx, ack_rx) = oneshot::channel();
        let mut attachments = AttachmentsBuilder::new();
        // Run the caller's closure against the builder, then serialize the
        // event it returned.
        let json = event_to_json(&self(&mut attachments))?;
        let directive = Directive::Event {
            payload: json.into(),
            tx: Some(ack_tx),
            attachments: Some(attachments.finish()),
        };
        Ok((directive, AckHandle::new(ack_rx)))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::binary::AttachmentsBuilder;
    use crate::client::Emit;
    use crate::error::{AckIdError, AttachmentsError, EventError};
    use crate::marker::{HasAck, HasBinary, NoAck, NoBinary};
    use crate::packet::DynEvent;
    use crate::payload::{DeserializePayload, SerializePayload};
    use bytes::Bytes;
    use bytestring::ByteString;

    /// Shorthand for building a `ByteString` from a static literal.
    fn bss(s: &'static str) -> ByteString {
        ByteString::from_static(s)
    }

    /// Declares a unit-struct event named `"ping"` with the given ack and
    /// binary markers. Deserialization skips any remaining elements and
    /// serialization writes none, so the type carries no payload data.
    macro_rules! test_event {
        ($name:ident, ack = $ack:ty, binary = $binary:ty) => {
            #[derive(Debug, PartialEq)]
            struct $name;
            impl EventType for $name {
                const NAME: &'static str = "ping";
                type Ack = $ack;
                type Binary = $binary;
            }
            impl DeserializePayload for $name {
                fn deserialize_payload<'de, S>(seq: &mut S) -> Result<Self, S::Error>
                where
                    S: serde::de::SeqAccess<'de>,
                {
                    while let Some(serde::de::IgnoredAny) = seq.next_element()? {}
                    Ok($name)
                }
            }
            impl SerializePayload for $name {
                fn serialize_payload<S>(&self, _seq: &mut S) -> Result<(), S::Error>
                where
                    S: serde::ser::SerializeSeq,
                {
                    Ok(())
                }
            }
        };
    }

    test_event!(Ping, ack = NoAck, binary = NoBinary);
    test_event!(PingWithAck, ack = HasAck<()>, binary = NoBinary);
    test_event!(PingWithBinary, ack = NoAck, binary = HasBinary);
    test_event!(PingWithAckAndBinary, ack = HasAck<()>, binary = HasBinary);

    /// Builds a `DynEvent` whose payload is `["ping"]` with the given raw ack
    /// id and attachments.
    fn ping_event(id: Option<u64>, attachments: Option<Vec<Bytes>>) -> DynEvent {
        DynEvent {
            payload: bss(r#"["ping"]"#),
            id,
            attachments,
        }
    }

    #[test]
    fn try_from_with_ack_and_binary_succeeds() {
        let att = vec![Bytes::from_static(b"\xFF")];
        let ev: Event<PingWithAckAndBinary> =
            ping_event(Some(1), Some(att)).try_into().unwrap();
        assert_eq!(ev.id.get(), 1);
        // The attachment must survive the conversion as well, not just the id.
        assert_eq!(ev.attachments.len(), 1);
    }

    #[test]
    fn try_from_basic_event_succeeds() {
        let ev: Event<Ping> = ping_event(None, None).try_into().unwrap();
        assert_eq!(ev.payload, Ping);
    }

    #[test]
    fn try_from_wrong_name_fails() {
        let dyn_ev = DynEvent {
            payload: bss(r#"["pong"]"#),
            id: None,
            attachments: None,
        };
        assert!(matches!(
            Event::<Ping>::try_from(dyn_ev),
            Err(EventError::Payload(_))
        ));
    }

    #[test]
    fn try_from_unexpected_ack_id_fails() {
        assert!(matches!(
            Event::<Ping>::try_from(ping_event(Some(1), None)),
            Err(EventError::AckId(AckIdError::Unexpected))
        ));
    }

    #[test]
    fn try_from_missing_ack_id_fails() {
        assert!(matches!(
            Event::<PingWithAck>::try_from(ping_event(None, None)),
            Err(EventError::AckId(AckIdError::Missing))
        ));
    }

    #[test]
    fn try_from_with_ack_id_succeeds() {
        let ev: Event<PingWithAck> = ping_event(Some(5), None).try_into().unwrap();
        assert_eq!(ev.id.get(), 5);
    }

    #[test]
    fn try_from_unexpected_attachments_fails() {
        assert!(matches!(
            Event::<Ping>::try_from(ping_event(None, Some(vec![Bytes::from_static(b"x")]))),
            Err(EventError::Attachments(AttachmentsError::Unexpected))
        ));
    }

    #[test]
    fn try_from_missing_attachments_fails() {
        assert!(matches!(
            Event::<PingWithBinary>::try_from(ping_event(None, None)),
            Err(EventError::Attachments(AttachmentsError::Missing))
        ));
    }

    #[test]
    fn try_from_with_attachments_succeeds() {
        let att = vec![Bytes::from_static(b"\xFF")];
        let ev: Event<PingWithBinary> = ping_event(None, Some(att)).try_into().unwrap();
        assert_eq!(ev.attachments.len(), 1);
    }

    #[test]
    fn event_handler_handle_succeeds() {
        let ev = Event::<Ping>::handle(Ping, None, None).unwrap();
        assert_eq!(ev.payload, Ping);
    }

    #[test]
    fn event_handler_handle_bad_ack_fails() {
        // Pin the exact variant so an unrelated failure cannot satisfy the test.
        assert!(matches!(
            Event::<Ping>::handle(Ping, Some(1), None),
            Err(EventError::AckId(AckIdError::Unexpected))
        ));
    }

    #[test]
    fn event_handler_handle_bad_attachments_fails() {
        assert!(matches!(
            Event::<Ping>::handle(Ping, None, Some(vec![Bytes::from_static(b"x")])),
            Err(EventError::Attachments(AttachmentsError::Unexpected))
        ));
    }

    #[test]
    fn emit_no_ack_no_binary_prepare() {
        let (directive, ()) = Ping.prepare().unwrap();
        let Directive::Event {
            payload,
            tx,
            attachments,
        } = directive
        else {
            panic!("expected Event directive");
        };
        assert_eq!(&payload[..], r#"["ping"]"#);
        assert!(tx.is_none());
        assert!(attachments.is_none());
    }

    #[test]
    fn emit_has_ack_no_binary_prepare() {
        let (directive, _handle) = PingWithAck.prepare().unwrap();
        let Directive::Event {
            payload,
            tx,
            attachments,
        } = directive
        else {
            panic!("expected Event directive");
        };
        assert_eq!(&payload[..], r#"["ping"]"#);
        assert!(tx.is_some());
        assert!(attachments.is_none());
    }

    #[test]
    fn emit_no_ack_has_binary_prepare() {
        let closure = |builder: &mut AttachmentsBuilder| {
            let _p = builder.attach(Bytes::from_static(b"\xFF"));
            PingWithBinary
        };
        let (directive, ()) = closure.prepare().unwrap();
        let Directive::Event {
            payload,
            tx,
            attachments,
        } = directive
        else {
            panic!("expected Event directive");
        };
        assert_eq!(&payload[..], r#"["ping"]"#);
        assert!(tx.is_none());
        let att = attachments.expect("expected attachments");
        assert_eq!(att.len(), 1);
    }

    #[test]
    fn emit_has_ack_has_binary_prepare() {
        let closure = |builder: &mut AttachmentsBuilder| {
            let _p = builder.attach(Bytes::from_static(b"\xFF"));
            PingWithAckAndBinary
        };
        let (directive, _handle) = closure.prepare().unwrap();
        let Directive::Event {
            payload,
            tx,
            attachments,
        } = directive
        else {
            panic!("expected Event directive");
        };
        assert_eq!(&payload[..], r#"["ping"]"#);
        assert!(tx.is_some());
        let att = attachments.expect("expected attachments");
        assert_eq!(att.len(), 1);
    }
}