timesource 0.1.3

Event sourcing with TimescaleDb
Documentation
use std::fmt::Debug;

#[cfg(feature = "binary")]
use lazy_static::lazy_static;
use timesource_core::{Persisted, PersistedError, TimesourceEventPayload};

/// A persisted event together with the optional id that preceded it in the
/// notification string.
///
/// Built from a comma-separated notification via the `TryFrom<&str>` impl in
/// this file. `Clone` and `PartialEq` are derived in test builds only.
#[derive(Debug)]
#[cfg_attr(test, derive(Clone, PartialEq))]
pub struct EventNotification<T> {
    /// Parsed from the first notification field; `None` when that field is empty.
    /// NOTE(review): presumably the id of the event stored just before `event` — confirm.
    pub prior_event_id: Option<u64>,
    /// The event built from the remaining notification fields.
    pub event: Persisted<T>,
}

/// Builds a base64 encoding that skips `\n` characters while decoding.
///
/// Postgres `encode` adds new lines when the data exceeds its line limit, so
/// the stock `data_encoding::BASE64` specification is extended to ignore them.
///
/// # Panics
///
/// Panics if the adjusted specification is rejected by `data_encoding`.
#[cfg(feature = "binary")]
fn base_64_postgres_encoding() -> data_encoding::Encoding {
    let newline_tolerant = {
        let mut base = data_encoding::BASE64.specification();
        base.ignore.push('\n');
        base
    };
    newline_tolerant.encoding().unwrap()
}

// Shared instance of the newline-tolerant base64 encoding returned by
// `base_64_postgres_encoding`; `lazy_static!` defers construction until the
// static is first dereferenced. Only compiled with the `binary` feature.
#[cfg(feature = "binary")]
lazy_static! {
    static ref BASE64_POSTGRES: data_encoding::Encoding = base_64_postgres_encoding();
}

impl<T> TryFrom<&str> for EventNotification<T>
where
    T: TimesourceEventPayload,
{
    type Error = PersistedError;

    /// Parses a comma-separated notification string.
    ///
    /// The input is split into at most six fields:
    ///
    /// 0. prior event id (`u64`); an empty field yields `None`
    /// 1. parsed into the first argument of `Persisted::new`
    /// 2. parsed into the third argument of `Persisted::new`
    /// 3. parsed into the fourth argument of `Persisted::new`
    /// 4. base64 payload, decoded only with the `binary` feature
    /// 5. text payload, read only with the `json` feature; empty means absent.
    ///    Being the last field, it may itself contain commas.
    ///
    /// # Errors
    ///
    /// Returns an error when a field fails to parse or when
    /// `T::decode_from_payload` fails. Missing fields are treated as empty
    /// strings, so truncated input surfaces as a parse error rather than a
    /// panic.
    ///
    /// # Panics
    ///
    /// With the `binary` feature enabled, panics if field 4 is not valid
    /// base64 (new lines are ignored).
    fn try_from(notification: &str) -> Result<Self, Self::Error> {
        // `splitn(6, ..)` yields at most six items, matching the vector capacity.
        let data: heapless::Vec<&str, 6> = notification.splitn(6, ',').collect();

        // Checked access: an absent field reads as "" instead of panicking on
        // an out-of-bounds index.
        let field = |idx: usize| data.get(idx).copied().unwrap_or_default();

        cfg_if::cfg_if! {
            if #[cfg(feature = "binary")] {
                // `decode` truncates the buffer to the number of bytes actually
                // produced; `decode_len` is only an upper bound once new lines
                // or padding are present, which used to leave trailing zeros.
                let bytes = Some(
                    BASE64_POSTGRES
                        .decode(field(4).as_bytes())
                        .expect("notification payload is not valid base64"),
                );
            } else {
                let bytes: Option<Vec<u8>> = None;
            }
        }

        cfg_if::cfg_if! {
            if #[cfg(feature = "json")] {
                let text = data.get(5).copied().filter(|x| !x.is_empty());
            } else {
                let text: Option<&str> = None;
            }
        }

        let event = Persisted::new(
            field(1).parse()?,
            T::decode_from_payload(text, bytes.as_deref())?,
            field(2).parse()?,
            field(3).parse()?,
        );

        let prior_event_id = data
            .first()
            .filter(|x| !x.is_empty())
            .map(|x| x.parse())
            .transpose()?;

        Ok(Self {
            prior_event_id,
            event,
        })
    }
}

// Tests for `EventNotification::try_from` covering an empty prior id, a
// base64 payload with and without an embedded new line, and a JSON payload.
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;
    // NOTE(review): presumably lets the derive macros resolve `timesource::…`
    // paths from inside the crate itself — confirm against timesource_derive.
    use crate as timesource;
    use minicbor::{Decode, Encode};
    use serde::{Deserialize, Serialize};
    use timesource_derive::TimesourceEvent;
    use uuid::Uuid;

    // Event type declared with the "cbor" encoding; used by the base64 tests.
    #[derive(Encode, Decode, TimesourceEvent, PartialEq, Debug)]
    #[timesource(encoding = "cbor")]
    enum TdbEventCbor {
        #[b(0)]
        Created,
    }

    // Nested payload carried by the JSON event below.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
    pub struct OrderItem {
        pub item_sku: String,
        pub quantity: u8,
        pub price: i32,
    }

    // Event type without an explicit encoding attribute; used by the JSON test.
    #[derive(Serialize, Deserialize, TimesourceEvent, PartialEq, Debug)]
    enum TdbEventJson {
        ItemAdded { item: OrderItem },
    }

    // An empty first field must produce `prior_event_id: None`.
    #[test]
    fn should_parse_notification_string_for_first_root() {
        let notification = ",6e909e1d-0139-493d-8627-0becb94e6c72,64,1639598252193019372,ggCA,";
        let parsed = EventNotification::try_from(notification).unwrap();
        let expected = EventNotification {
            prior_event_id: None,
            event: Persisted::new(
                Uuid::from_str("6e909e1d-0139-493d-8627-0becb94e6c72").unwrap(),
                TdbEventCbor::Created,
                64,
                1639598252193019372,
            ),
        };
        assert_eq!(parsed, expected);
    }

    // The base64 field `ggCA` is expected to decode to `TdbEventCbor::Created`.
    #[test]
    fn should_parse_bytes_notification_string() {
        let notification = "63,6e909e1d-0139-493d-8627-0becb94e6c72,64,1639598252193019372,ggCA,";
        let parsed = EventNotification::try_from(notification).unwrap();
        let expected = EventNotification {
            prior_event_id: Some(63),
            event: Persisted::new(
                Uuid::from_str("6e909e1d-0139-493d-8627-0becb94e6c72").unwrap(),
                TdbEventCbor::Created,
                64,
                1639598252193019372,
            ),
        };
        assert_eq!(parsed, expected);
    }

    // Same payload as above with a `\n` inside the base64 field; the result
    // must be identical.
    #[test]
    fn should_parse_bytes_notification_string_with_new_lines() {
        let notification = "63,6e909e1d-0139-493d-8627-0becb94e6c72,64,1639598252193019372,gg\nCA,";
        let parsed = EventNotification::try_from(notification).unwrap();
        let expected = EventNotification {
            prior_event_id: Some(63),
            event: Persisted::new(
                Uuid::from_str("6e909e1d-0139-493d-8627-0becb94e6c72").unwrap(),
                TdbEventCbor::Created,
                64,
                1639598252193019372,
            ),
        };
        assert_eq!(parsed, expected);
    }

    // Empty base64 field, JSON in the last field; the JSON contains commas,
    // so this also exercises the six-field split limit.
    #[test]
    fn should_parse_json_notification_string() {
        let notification = "63,6e909e1d-0139-493d-8627-0becb94e6c72,64,1639598252193019372,,{\"ItemAdded\": {\"item\": {\"price\": 12, \"item_sku\": \"sku-123\", \"quantity\": 1}}}";
        let parsed = EventNotification::try_from(notification).unwrap();
        let expected = EventNotification {
            prior_event_id: Some(63),
            event: Persisted::new(
                Uuid::from_str("6e909e1d-0139-493d-8627-0becb94e6c72").unwrap(),
                TdbEventJson::ItemAdded {
                    item: OrderItem {
                        item_sku: "sku-123".into(),
                        quantity: 1,
                        price: 12,
                    },
                },
                64,
                1639598252193019372,
            ),
        };
        assert_eq!(parsed, expected);
    }
}