use std::fmt::Debug;
#[cfg(feature = "binary")]
use lazy_static::lazy_static;
use timesource_core::{Persisted, PersistedError, TimesourceEventPayload};
/// A parsed event notification: a persisted event together with an optional
/// prior event id.
///
/// In this file, values are produced by the `TryFrom<&str>` implementation,
/// which parses a comma-separated notification string. `Clone` and
/// `PartialEq` are only derived in test builds.
#[derive(Debug)]
#[cfg_attr(test, derive(Clone, PartialEq))]
pub struct EventNotification<T> {
/// Parsed from the first field of the notification string; `None` when
/// that field is empty. Judging by the name, this is the id of the event
/// that preceded `event` (to be confirmed against the notification producer).
pub prior_event_id: Option<u64>,
/// The event assembled from the remaining notification fields and the
/// decoded payload of type `T`.
pub event: Persisted<T>,
}
/// Builds a variant of the standard base64 encoding whose decoder skips
/// `'\n'` characters in the input.
///
/// Presumably needed because the base64 text arriving from Postgres may be
/// wrapped across lines (inferred from the function name; confirm against
/// the notification producer).
///
/// # Panics
///
/// Panics if the modified specification is rejected by `data_encoding`.
#[cfg(feature = "binary")]
fn base_64_postgres_encoding() -> data_encoding::Encoding {
    let mut newline_tolerant = data_encoding::BASE64.specification();
    // Characters listed in `ignore` are skipped while decoding.
    newline_tolerant.ignore.push('\n');
    let built = newline_tolerant.encoding();
    built.unwrap()
}
// Shared, lazily-initialised instance of the encoding returned by
// `base_64_postgres_encoding`, so the encoding is built once on first use
// rather than on every notification. Only compiled with the `binary` feature.
#[cfg(feature = "binary")]
lazy_static! {
static ref BASE64_POSTGRES: data_encoding::Encoding = base_64_postgres_encoding();
}
impl<T> TryFrom<&str> for EventNotification<T>
where
    T: TimesourceEventPayload,
{
    type Error = PersistedError;

    /// Parses a comma-separated notification string into an
    /// [`EventNotification`].
    ///
    /// The input is split on `,` into at most six fields:
    ///
    /// | index | use                                                          |
    /// |-------|--------------------------------------------------------------|
    /// | 0     | prior event id; an empty field yields `None`                 |
    /// | 1     | parsed and passed as the first argument of `Persisted::new`  |
    /// | 2     | parsed and passed as the third argument of `Persisted::new`  |
    /// | 3     | parsed and passed as the fourth argument of `Persisted::new` |
    /// | 4     | base64 binary payload (read only with the `binary` feature)  |
    /// | 5     | text payload (read only with the `json` feature)             |
    ///
    /// Because the split stops after six fields, the text payload may itself
    /// contain commas.
    ///
    /// # Errors
    ///
    /// Returns a [`PersistedError`] when a field cannot be parsed or when
    /// `T::decode_from_payload` rejects the payload. A missing field 1-3 is
    /// treated as an empty string and therefore reported as a parse error
    /// instead of panicking.
    ///
    /// # Panics
    ///
    /// With the `binary` feature enabled, panics if field 4 is present but is
    /// not valid base64.
    fn try_from(notification: &str) -> Result<Self, Self::Error> {
        // `splitn(6, ..)` yields at most six items, matching the capacity.
        let data: heapless::Vec<&str, 6> = notification.splitn(6, ',').collect();

        cfg_if::cfg_if! {
            if #[cfg(feature = "binary")] {
                // `decode` truncates its output to the number of bytes that
                // were actually decoded. Sizing a buffer with `decode_len`
                // alone leaves trailing zero bytes whenever the input holds
                // padding or ignored newlines. An absent field yields `None`.
                let bytes: Option<Vec<u8>> = data.get(4).map(|encoded| {
                    BASE64_POSTGRES
                        .decode(encoded.as_bytes())
                        .expect("binary payload field is not valid base64")
                });
            } else {
                let bytes: Option<Vec<u8>> = None;
            }
        }

        cfg_if::cfg_if! {
            if #[cfg(feature = "json")] {
                // An empty text field means "no text payload".
                let text: Option<&str> =
                    data.get(5).copied().filter(|payload| !payload.is_empty());
            } else {
                let text: Option<&str> = None;
            }
        }

        // Checked access: a missing field becomes "" and fails in `parse`,
        // so malformed input is reported as an error rather than a panic.
        let event = Persisted::new(
            data.get(1).copied().unwrap_or_default().parse()?,
            T::decode_from_payload(text, bytes.as_deref())?,
            data.get(2).copied().unwrap_or_default().parse()?,
            data.get(3).copied().unwrap_or_default().parse()?,
        );

        let prior_event_id: Option<u64> = data
            .first()
            .filter(|id| !id.is_empty())
            .map(|id| id.parse())
            .transpose()?;

        Ok(Self {
            prior_event_id,
            event,
        })
    }
}
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;
    use crate as timesource;
    use minicbor::{Decode, Encode};
    use serde::{Deserialize, Serialize};
    use timesource_derive::TimesourceEvent;
    use uuid::Uuid;

    // Payload type used by the fixtures that carry a binary (base64) payload.
    #[derive(Encode, Decode, TimesourceEvent, PartialEq, Debug)]
    #[timesource(encoding = "cbor")]
    enum TdbEventCbor {
        #[b(0)]
        Created,
    }

    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    pub struct OrderItem {
        pub item_sku: String,
        pub quantity: u8,
        pub price: i32,
    }

    // Payload type used by the fixture that carries a JSON text payload.
    #[derive(Serialize, Deserialize, TimesourceEvent, PartialEq, Debug)]
    enum TdbEventJson {
        ItemAdded { item: OrderItem },
    }

    // Id that appears as the second field of every notification fixture below.
    const FIXTURE_ID: &str = "6e909e1d-0139-493d-8627-0becb94e6c72";

    // Parses `FIXTURE_ID` into the `Uuid` the expected events are built with.
    fn fixture_id() -> Uuid {
        Uuid::from_str(FIXTURE_ID).unwrap()
    }

    #[test]
    fn should_parse_notification_string_for_first_root() {
        // Empty first field: there is no prior event.
        let raw = ",6e909e1d-0139-493d-8627-0becb94e6c72,64,1639598252193019372,ggCA,";
        let expected = EventNotification {
            prior_event_id: None,
            event: Persisted::new(
                fixture_id(),
                TdbEventCbor::Created,
                64,
                1639598252193019372,
            ),
        };

        let parsed = EventNotification::try_from(raw).unwrap();

        assert_eq!(parsed, expected);
    }

    #[test]
    fn should_parse_bytes_notification_string() {
        let raw = "63,6e909e1d-0139-493d-8627-0becb94e6c72,64,1639598252193019372,ggCA,";
        let expected = EventNotification {
            prior_event_id: Some(63),
            event: Persisted::new(
                fixture_id(),
                TdbEventCbor::Created,
                64,
                1639598252193019372,
            ),
        };

        let parsed = EventNotification::try_from(raw).unwrap();

        assert_eq!(parsed, expected);
    }

    #[test]
    fn should_parse_bytes_notification_string_with_new_lines() {
        // Same payload as above, with a newline inside the base64 text.
        let raw = "63,6e909e1d-0139-493d-8627-0becb94e6c72,64,1639598252193019372,gg\nCA,";
        let expected = EventNotification {
            prior_event_id: Some(63),
            event: Persisted::new(
                fixture_id(),
                TdbEventCbor::Created,
                64,
                1639598252193019372,
            ),
        };

        let parsed = EventNotification::try_from(raw).unwrap();

        assert_eq!(parsed, expected);
    }

    #[test]
    fn should_parse_json_notification_string() {
        // Empty binary field; the JSON payload (which contains commas) is last.
        let raw = "63,6e909e1d-0139-493d-8627-0becb94e6c72,64,1639598252193019372,,{\"ItemAdded\": {\"item\": {\"price\": 12, \"item_sku\": \"sku-123\", \"quantity\": 1}}}";
        let item = OrderItem {
            item_sku: "sku-123".into(),
            quantity: 1,
            price: 12,
        };
        let expected = EventNotification {
            prior_event_id: Some(63),
            event: Persisted::new(
                fixture_id(),
                TdbEventJson::ItemAdded { item },
                64,
                1639598252193019372,
            ),
        };

        let parsed = EventNotification::try_from(raw).unwrap();

        assert_eq!(parsed, expected);
    }
}