integrationos-domain 8.0.0

Shared library for IntegrationOS
Documentation
pub mod duplicates;
pub mod event_access;
pub mod event_response;
pub mod event_state;
pub mod event_with_context;
pub mod hashes;

use chrono::{DateTime, SubsecRound, Utc};
use http::HeaderMap;
use serde::{Deserialize, Serialize};

use crate::id::{prefix::IdPrefix, Id};

use self::{
    duplicates::Duplicates,
    event_state::EventState,
    hashes::{HashValue, Hashes},
};

use super::{
    access_key::{encrypted_access_key::EncryptedAccessKey, AccessKey},
    configuration::environment::Environment,
    shared::{ownership::Ownership, record_metadata::RecordMetadata},
};

#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[cfg_attr(feature = "dummy", derive(fake::Dummy))]
#[serde(rename_all = "camelCase")]
pub struct Event {
    #[serde(rename = "_id")]
    pub id: Id,
    pub key: Id,
    pub name: String,
    pub r#type: String,
    pub group: String,
    pub access_key: String,
    pub topic: String,
    pub environment: Environment,
    pub body: String,
    #[serde(with = "http_serde_ext::header_map")]
    pub headers: HeaderMap,
    #[serde(with = "chrono::serde::ts_milliseconds")]
    pub arrived_at: DateTime<Utc>,
    pub arrived_date: DateTime<Utc>,
    pub state: EventState,
    pub ownership: Ownership,
    pub hashes: [HashValue; 3],
    pub payload_byte_length: usize,
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub duplicates: Option<Duplicates>,
    #[serde(flatten, default)]
    pub record_metadata: RecordMetadata,
}

#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[cfg_attr(feature = "dummy", derive(fake::Dummy))]
#[serde(rename_all = "camelCase")]
pub struct PublicEvent {
    #[serde(rename = "_id")]
    pub id: Id,
    pub key: Id,
    pub name: String,
    pub r#type: String,
    pub group: String,
    pub topic: String,
    pub environment: Environment,
    pub body: String,
    #[serde(with = "http_serde_ext::header_map")]
    pub headers: HeaderMap,
    #[serde(with = "chrono::serde::ts_milliseconds")]
    pub arrived_at: DateTime<Utc>,
    pub arrived_date: DateTime<Utc>,
    pub state: EventState,
    pub ownership: Ownership,
    pub hashes: [HashValue; 3],
    pub payload_byte_length: usize,
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub duplicates: Option<Duplicates>,
    #[serde(flatten, default)]
    pub record_metadata: RecordMetadata,
}

struct IntermediateEventFields<'a> {
    access_key: &'a AccessKey,
    encrypted_access_key: &'a EncryptedAccessKey<'a>,
    event_name: &'a str,
    headers: HeaderMap,
    body: String,
    timestamp: DateTime<Utc>,
    id: Id,
    key: Id,
}

impl Event {
    pub fn new(
        access_key: &AccessKey,
        encrypted_access_key: &EncryptedAccessKey,
        event_name: &str,
        headers: HeaderMap,
        body: String,
    ) -> Self {
        let timestamp = Utc::now().round_subsecs(3);
        let id = Id::new(IdPrefix::Event, timestamp);
        let key = Id::new(IdPrefix::EventKey, timestamp);
        let fields = IntermediateEventFields {
            access_key,
            encrypted_access_key,
            event_name,
            headers,
            body,
            timestamp,
            id,
            key,
        };
        Self::new_with_timestamp_and_ids(fields)
    }

    pub fn add_duplicates(mut self, duplicates: Duplicates) -> Self {
        self.duplicates = Some(duplicates);
        self
    }

    fn new_with_timestamp_and_ids(fields: IntermediateEventFields<'_>) -> Self {
        let topic = fields.access_key.get_topic(fields.event_name);
        let access_key_data = &fields.access_key.data;
        let environment = fields.access_key.prefix.environment;
        let state = EventState::Pending;
        let buildable_id = access_key_data.id.to_owned();
        let ownership = Ownership {
            id: buildable_id.clone().into(),
            client_id: buildable_id.to_owned(),
            organization_id: Some(buildable_id.to_owned()),
            project_id: Some(buildable_id.to_owned()),
            user_id: Some(buildable_id),
        };
        let hashes = Hashes::new(
            &topic,
            environment,
            &fields.body,
            &access_key_data.event_type,
            &access_key_data.group,
        );
        let hashes = hashes.get_hashes();

        let payload_byte_length = fields.body.len();
        Event {
            id: fields.id,
            key: fields.key,
            name: fields.event_name.to_owned(),
            r#type: fields.access_key.data.event_type.clone(),
            group: fields.access_key.data.group.clone(),
            access_key: fields.encrypted_access_key.to_string(),
            topic,
            environment,
            body: fields.body,
            headers: fields.headers,
            arrived_at: fields.timestamp,
            arrived_date: fields.timestamp,
            state,
            ownership,
            hashes,
            payload_byte_length,
            duplicates: None,
            record_metadata: Default::default(),
        }
    }

    pub fn to_public(self) -> PublicEvent {
        PublicEvent {
            id: self.id,
            key: self.key,
            name: self.name.clone(),
            r#type: self.r#type.clone(),
            group: self.group.clone(),
            topic: self.topic.clone(),
            environment: self.environment,
            body: self.body.clone(),
            headers: self.headers.clone(),
            arrived_at: self.arrived_at,
            arrived_date: self.arrived_date,
            state: self.state,
            ownership: self.ownership.clone(),
            hashes: self.hashes,
            payload_byte_length: self.payload_byte_length,
            duplicates: self.duplicates.clone(),
            record_metadata: self.record_metadata.clone(),
        }
    }
}

#[cfg(test)]
mod test {

    use crate::prelude::access_key::{
        access_key_data::AccessKeyData, access_key_prefix::AccessKeyPrefix, event_type::EventType,
    };

    use super::*;
    use chrono::TimeZone;
    use http::{HeaderMap, HeaderValue};
    use once_cell::sync::Lazy;
    use test::hashes::HashType;
    use uuid::Uuid;

    static HEADERS: Lazy<HeaderMap> = Lazy::new(|| {
        let mut headers = HeaderMap::new();
        headers.insert("foo", HeaderValue::from_static("bar"));
        headers.insert("baz", HeaderValue::from_static("qux"));
        headers
    });

    static ACCESS_KEY: Lazy<AccessKey> = Lazy::new(|| AccessKey {
        prefix: AccessKeyPrefix {
            environment: Environment::Test,
            event_type: EventType::Id,
            version: 1,
        },
        data: AccessKeyData {
            id: "foo".to_owned(),
            event_type: "bar".to_owned(),
            group: "baz".to_owned(),
            namespace: "qux".to_owned(),
            event_path: "quux".to_owned(),
            event_object_id_path: None,
            timestamp_path: None,
            parent_access_key: None,
        },
    });

    #[test]
    fn test_event_with_timestamp_and_ids() {
        let timestamp = Utc.timestamp_opt(0, 0).single().unwrap();
        let event_id = Id::new_with_uuid(IdPrefix::Event, timestamp, Uuid::nil());
        let key_id = Id::new_with_uuid(IdPrefix::EventKey, timestamp, Uuid::nil());
        let event_name = "event.received";
        let event = Event::new_with_timestamp_and_ids(IntermediateEventFields {
            access_key: &ACCESS_KEY,
            encrypted_access_key: &EncryptedAccessKey::parse("id_live_1_foo").unwrap(),
            event_name,
            headers: HEADERS.clone(),
            body: "hello world".to_owned(),
            timestamp,
            id: event_id,
            key: key_id,
        });
        assert_eq!(event.topic, ACCESS_KEY.get_topic(event_name));
        assert_eq!(event.environment, Environment::Test);
        assert_eq!(event.body, "hello world");
        assert_eq!(event.headers, *HEADERS);
        assert_eq!(event.arrived_at, timestamp);
        assert_eq!(event.state, EventState::Pending);
        assert_eq!(
            event.ownership,
            Ownership {
                id: ACCESS_KEY.data.id.to_owned().into(),
                client_id: ACCESS_KEY.data.id.to_owned(),
                organization_id: Some(ACCESS_KEY.data.id.to_owned()),
                project_id: Some(ACCESS_KEY.data.id.to_owned()),
                user_id: Some(ACCESS_KEY.data.id.to_owned())
            }
        );
        assert_eq!(
            event.hashes,
            [
                HashValue {
                    r#type: HashType::Body,
                    hash: "39c898e492b3eadc9798e23e28d8f89392c584ef4e495992e08a146d6b71a535"
                        .to_owned(),
                },
                HashValue {
                    r#type: HashType::Event,
                    hash: "fb6d7839ce31c8a72e3f9396c569bff26af7e10e361d9a731b813ec9a60693be"
                        .to_owned(),
                },
                HashValue {
                    r#type: HashType::ModelBody,
                    hash: "85ac81f9ee4268c027c6b35f4dbc613673280630ee85e676c005a5fe69b3be63"
                        .to_owned(),
                },
            ]
        );
        assert_eq!(event.payload_byte_length, 11);
    }

    #[test]
    fn test_event_serde() {
        let event = Event::new(
            &ACCESS_KEY,
            &EncryptedAccessKey::parse("id_live_1_foo").unwrap(),
            "event.received",
            HEADERS.clone(),
            "hello world".to_owned(),
        );
        let json = serde_json::to_string(&event).unwrap();
        let deserialized: Event = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized, event);
    }
}