restrepo 0.5.12

A collection of components for building RESTful web services with actix-web
Documentation
use crate::server::{ApiErrorV1, error_codes};
use serde::Serialize;
use tokio::sync::mpsc;

use topmesys::{EventEmitter, EventMessage, EventSubmission, EventTopic};

/// Represents API resource related events published to the event bus by CRUD
/// service implementations.
///
/// Each variant names the CRUD verb and carries the name of the affected
/// resource. The type is cheap to clone and can be compared for equality,
/// which makes it convenient to store, deduplicate and assert on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CrudEvent {
    /// A resource with the given name was created.
    Create(String),
    /// A resource with the given name was read.
    Read(String),
    /// A resource with the given name was updated.
    Update(String),
    /// A resource with the given name was deleted.
    Delete(String),
}

/// Represents the submission side of the event bus channel for use within request contexts, tasks etc.
///
/// Wraps a [mpsc::Sender<EventMessage>] together with a `subject_prefix`. The prefix is
/// placed in front of every topic built by [CrudEventDispatcher::publish] and
/// [CrudEventDispatcher::publish_many], giving topics of the form
/// `<subject_prefix>.<resource name>.<crud verb>`.
#[derive(Clone, Debug)]
pub struct CrudEventDispatcher {
    // Sending half of the event bus channel; exposed to the `EventEmitter` trait via `get_sender`.
    channel: mpsc::Sender<EventMessage>,
    // First segment of every topic text produced by this dispatcher.
    subject_prefix: String,
}

/// Lets the dispatcher act as an [EventEmitter] by exposing its channel sender.
impl EventEmitter for CrudEventDispatcher {
    /// Returns a reference to the sender this dispatcher was constructed with.
    fn get_sender(&self) -> &mpsc::Sender<EventMessage> {
        &self.channel
    }
}

impl CrudEventDispatcher {
    /// Creates a dispatcher that submits events through `channel` and starts
    /// every topic with `subject_prefix`.
    pub fn new(channel: mpsc::Sender<EventMessage>, subject_prefix: impl Into<String>) -> Self {
        Self {
            channel,
            subject_prefix: subject_prefix.into(),
        }
    }

    /// Builds the topic text for `crud_event` in the form
    /// `<subject_prefix>.<resource name>.<verb>`, where the verb is one of
    /// `create`, `read`, `update` or `delete`.
    fn make_topic_text(&self, crud_event: &CrudEvent) -> String {
        let (event_name, resource_name) = match crud_event {
            CrudEvent::Create(name) => ("create", name),
            CrudEvent::Read(name) => ("read", name),
            CrudEvent::Update(name) => ("update", name),
            CrudEvent::Delete(name) => ("delete", name),
        };
        format!("{}.{resource_name}.{event_name}", self.subject_prefix)
    }

    /// Wraps any displayable failure into an [ApiErrorV1::MalformedDataError]
    /// tagged with the service error code.
    fn malformed_data_error(cause: impl ToString) -> ApiErrorV1 {
        ApiErrorV1::MalformedDataError {
            detail: cause.to_string(),
            code: error_codes::SERVICE_ERROR.to_string(),
        }
    }

    /// Collects a number of [Serialize] types under the same [CrudEvent] into a [EventSubmission::Batch] and submits it to the event bus.
    ///
    /// Every element of `content` is serialized to JSON and becomes one message;
    /// all messages share the topic derived from `crud_event`. Any slice-like
    /// input is accepted (`&Vec<T>`, `&[T]`, arrays).
    ///
    /// # Errors
    ///
    /// Returns an [ApiErrorV1] if the topic cannot be turned into a routing key,
    /// if serializing an element fails (nothing is submitted in that case), or
    /// if submitting the batch fails. These errors are converted through the
    /// `From` implementations of [ApiErrorV1].
    pub async fn publish_many(
        &self,
        crud_event: &CrudEvent,
        content: &[impl Serialize],
    ) -> Result<(), ApiErrorV1> {
        let message_base = EventMessage::default()
            .with_topic(EventTopic::new(self.make_topic_text(crud_event)).as_routing_key()?);
        // Serialize everything up front so a failing element aborts before
        // anything reaches the event bus.
        let messages = content
            .iter()
            .map(|item| {
                serde_json::to_vec(item).map(|payload| message_base.clone().with_content(payload))
            })
            .collect::<Result<Vec<_>, _>>()?;
        Ok(self
            .submit_event(EventSubmission::from_iter(messages))
            .await?)
    }

    /// Creates an [EventSubmission::Single] from a [CrudEvent] and any [Serialize] type and submits it to the event bus.
    ///
    /// # Errors
    ///
    /// Returns [ApiErrorV1::MalformedDataError] if `content` cannot be
    /// serialized to JSON or if the message cannot be constructed for the
    /// derived topic. A failure while submitting is converted through the
    /// `From` implementation of [ApiErrorV1].
    pub async fn publish(
        &self,
        crud_event: &CrudEvent,
        content: &impl Serialize,
    ) -> Result<(), ApiErrorV1> {
        let content = serde_json::to_vec(content).map_err(Self::malformed_data_error)?;
        let msg = EventMessage::new(self.make_topic_text(crud_event), content)
            .map_err(Self::malformed_data_error)?;
        Ok(self.submit_event(EventSubmission::Single(msg)).await?)
    }
}

#[cfg(test)]
mod tests {
    use topmesys::EventMessage;

    use super::{CrudEvent, CrudEventDispatcher};

    /// Buffer size of the channels created by these tests. Also used as the
    /// upper bound when draining the receiver, so the limit does not depend on
    /// how many slots happen to be free at that moment.
    const CHANNEL_CAPACITY: usize = 12;

    /// Every CRUD verb must map to `<prefix>.<resource>.<verb>`.
    #[test]
    fn test_resource_event_dispatcher_topic() {
        let (tx, _) = tokio::sync::mpsc::channel::<EventMessage>(CHANNEL_CAPACITY);

        let event_dispatcher = CrudEventDispatcher::new(tx, "test");

        let cases = [
            (CrudEvent::Create("foo".to_string()), "test.foo.create"),
            (CrudEvent::Read("foo".to_string()), "test.foo.read"),
            (CrudEvent::Update("foo".to_string()), "test.foo.update"),
            (CrudEvent::Delete("foo".to_string()), "test.foo.delete"),
        ];

        for (event, expected) in &cases {
            assert_eq!(event_dispatcher.make_topic_text(event), *expected);
        }
    }

    /// A batch of two followed by a single event must arrive on the channel as
    /// three messages, in submission order, each carrying its JSON payload.
    #[tokio::test]
    async fn test_resource_event_dispatcher_publishing() {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<EventMessage>(CHANNEL_CAPACITY);

        let event_dispatcher = CrudEventDispatcher::new(tx, "test");

        let many_content = vec![
            serde_json::json!({"number": 1, "title": "test payload"}),
            serde_json::json!({"number": 2, "title": "test payload"}),
        ];
        let single_content = serde_json::json!({"number": 3, "title": "test payload"});

        event_dispatcher
            .publish_many(&CrudEvent::Create("dispatcher".to_string()), &many_content)
            .await
            .unwrap();

        event_dispatcher
            .publish(
                &CrudEvent::Create("dispatcher".to_string()),
                &single_content,
            )
            .await
            .unwrap();

        let expected_payloads = [
            r#"{"number":1,"title":"test payload"}"#,
            r#"{"number":2,"title":"test payload"}"#,
            r#"{"number":3,"title":"test payload"}"#,
        ];

        let mut buf = Vec::with_capacity(CHANNEL_CAPACITY);
        let received = rx.recv_many(&mut buf, CHANNEL_CAPACITY).await;

        // Fail with a clear message if a message went missing, instead of
        // panicking on an `unwrap` of an empty buffer further down.
        assert_eq!(received, expected_payloads.len());

        for (message, expected) in buf.into_iter().zip(expected_payloads) {
            assert_eq!(message.content().as_ref(), expected.as_bytes());
        }
    }
}