use crate::server::{ApiErrorV1, error_codes};
use serde::Serialize;
use tokio::sync::mpsc;
use topmesys::{EventEmitter, EventMessage, EventSubmission, EventTopic};
/// A CRUD operation performed on a named resource.
///
/// Every variant carries the resource name as a `String`. The dispatcher in
/// this file uses that name as the middle segment of the topic text and the
/// variant itself as the trailing operation segment (`create`, `read`,
/// `update` or `delete`).
///
/// `Clone`, `PartialEq` and `Eq` are derived so callers can copy events and
/// compare them directly (for example in tests).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CrudEvent {
    /// The named resource was created.
    Create(String),
    /// The named resource was read.
    Read(String),
    /// The named resource was updated.
    Update(String),
    /// The named resource was deleted.
    Delete(String),
}
/// Dispatcher that turns [`CrudEvent`]s plus serializable payloads into
/// [`EventMessage`]s and submits them via the [`EventEmitter`] trait.
///
/// Topic texts have the form `<subject_prefix>.<resource>.<operation>`
/// (see `make_topic_text`).
#[derive(Clone, Debug)]
pub struct CrudEventDispatcher {
/// Sender half of the tokio mpsc channel; this is what `get_sender` returns.
channel: mpsc::Sender<EventMessage>,
/// First dot-separated segment of every topic text built by this dispatcher.
subject_prefix: String,
}
// Implements `EventEmitter` by exposing the dispatcher's channel sender.
// NOTE(review): `submit_event`, used by `publish` and `publish_many`, is not
// defined in this file — presumably a provided method of `EventEmitter` built
// on top of `get_sender`; confirm against the `topmesys` crate.
impl EventEmitter for CrudEventDispatcher {
/// Returns a reference to the sender stored in the `channel` field.
fn get_sender(&self) -> &mpsc::Sender<EventMessage> {
&self.channel
}
}
impl CrudEventDispatcher {
    /// Creates a dispatcher that submits its events through `channel`.
    ///
    /// `subject_prefix` is converted into an owned `String` and becomes the
    /// first dot-separated segment of every topic text this dispatcher builds.
    pub fn new(channel: mpsc::Sender<EventMessage>, subject_prefix: impl Into<String>) -> Self {
        Self {
            channel,
            subject_prefix: subject_prefix.into(),
        }
    }

    /// Builds the topic text `<subject_prefix>.<resource>.<operation>`.
    ///
    /// `<resource>` is the name carried by `crud_event` and `<operation>` is
    /// one of `create`, `read`, `update` or `delete`, depending on the variant.
    fn make_topic_text(&self, crud_event: &CrudEvent) -> String {
        // Exhaustive on purpose: adding a variant must fail to compile here.
        let (event_name, resource_name) = match crud_event {
            CrudEvent::Create(name) => ("create", name),
            CrudEvent::Read(name) => ("read", name),
            CrudEvent::Update(name) => ("update", name),
            CrudEvent::Delete(name) => ("delete", name),
        };
        format!("{}.{resource_name}.{event_name}", self.subject_prefix)
    }

    /// Publishes one message per element of `content`, all under the topic
    /// derived from `crud_event`, as a single [`EventSubmission`].
    ///
    /// `content` is a slice, so `&Vec<T>`, arrays and sub-slices are all
    /// accepted. Each element is serialized to JSON bytes and attached to a
    /// clone of a shared base message that carries the topic.
    ///
    /// # Errors
    ///
    /// Returns an [`ApiErrorV1`] (through its `From` conversions, applied by
    /// `?`) when the topic cannot be turned into a routing key, when any
    /// element fails to serialize, or when `submit_event` fails. Nothing is
    /// submitted if serialization of any element fails.
    pub async fn publish_many(
        &self,
        crud_event: &CrudEvent,
        content: &[impl Serialize],
    ) -> Result<(), ApiErrorV1> {
        // The topic is the same for every message, so resolve it only once.
        let message_base = EventMessage::default()
            .with_topic(EventTopic::new(self.make_topic_text(crud_event)).as_routing_key()?);

        // Collecting into `Result` stops at the first serialization error.
        let messages = content
            .iter()
            .map(|item| {
                serde_json::to_vec(item).map(|payload| message_base.clone().with_content(payload))
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(self
            .submit_event(EventSubmission::from_iter(messages))
            .await?)
    }

    /// Publishes a single message carrying `content` (serialized as JSON)
    /// under the topic derived from `crud_event`.
    ///
    /// # Errors
    ///
    /// Returns [`ApiErrorV1::MalformedDataError`] with code
    /// `error_codes::SERVICE_ERROR` when `content` cannot be serialized or
    /// when `EventMessage::new` rejects the topic/content pair. A failure of
    /// `submit_event` is converted into [`ApiErrorV1`] by `?`.
    pub async fn publish(
        &self,
        crud_event: &CrudEvent,
        content: &impl Serialize,
    ) -> Result<(), ApiErrorV1> {
        let content = serde_json::to_vec(content).map_err(|e| ApiErrorV1::MalformedDataError {
            detail: e.to_string(),
            code: error_codes::SERVICE_ERROR.to_string(),
        })?;
        let msg = EventMessage::new(self.make_topic_text(crud_event), content).map_err(|e| {
            ApiErrorV1::MalformedDataError {
                detail: e.to_string(),
                code: error_codes::SERVICE_ERROR.to_string(),
            }
        })?;
        Ok(self.submit_event(EventSubmission::Single(msg)).await?)
    }
}
#[cfg(test)]
mod tests {
    use super::{CrudEvent, CrudEventDispatcher};
    use topmesys::EventMessage;

    /// Every CRUD variant maps to `<prefix>.<resource>.<operation>`.
    #[test]
    fn test_resource_event_dispatcher_topic() {
        // The receiver half is dropped right away; this test never sends.
        let (tx, _) = tokio::sync::mpsc::channel::<EventMessage>(12);
        let event_dispatcher = CrudEventDispatcher::new(tx, "test");

        let cases = [
            (CrudEvent::Create("foo".to_string()), "test.foo.create"),
            (CrudEvent::Read("foo".to_string()), "test.foo.read"),
            (CrudEvent::Update("foo".to_string()), "test.foo.update"),
            (CrudEvent::Delete("foo".to_string()), "test.foo.delete"),
        ];
        for (crud_event, expected_topic) in cases {
            assert_eq!(
                event_dispatcher.make_topic_text(&crud_event),
                expected_topic
            );
        }
    }

    /// A batch of two payloads followed by a single payload must reach the
    /// receiver as three messages, in publishing order.
    #[tokio::test]
    async fn test_resource_event_dispatcher_publishing() {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<EventMessage>(12);
        let event_dispatcher = CrudEventDispatcher::new(tx, "test");

        let single_content = serde_json::json!({"number": 3, "title": "test payload"});
        let many_content = vec![
            serde_json::json!({"number": 1, "title": "test payload"}),
            serde_json::json!({"number": 2, "title": "test payload"}),
        ];

        let batch_event = CrudEvent::Create("dispatcher".to_string());
        event_dispatcher
            .publish_many(&batch_event, &many_content)
            .await
            .unwrap();

        let single_event = CrudEvent::Create("dispatcher".to_string());
        event_dispatcher
            .publish(&single_event, &single_content)
            .await
            .unwrap();

        // NOTE(review): `rx.capacity()` is read after the sends above; per the
        // tokio docs it appears to report the *remaining* capacity rather than
        // the configured maximum — confirm. It only needs to be >= 3 here.
        let limit = rx.capacity();
        let mut buf = Vec::with_capacity(limit);
        rx.recv_many(&mut buf, limit).await;

        // `pop` takes from the back, so the payloads come out newest-first.
        for expected in [
            r#"{"number":3,"title":"test payload"}"#,
            r#"{"number":2,"title":"test payload"}"#,
            r#"{"number":1,"title":"test payload"}"#,
        ] {
            let message = buf.pop().unwrap();
            assert_eq!(message.content().as_ref(), expected.as_bytes());
        }
    }
}