use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::broadcast;
use crate::common::message::SharedMessage;
use crate::error::Result;
// Submodules of this module. Each one behind `#[cfg(feature = "...")]` is compiled
// only when the named Cargo feature is enabled; `system` is always compiled.
#[cfg(feature = "file")]
pub mod file;
#[cfg(feature = "http-client")]
pub mod http_client;
#[cfg(feature = "http-server")]
pub mod http_server;
#[cfg(feature = "redis")]
pub mod redis;
// Note: the `sql` module is gated by the `database` feature, not a feature named `sql`.
#[cfg(feature = "database")]
pub mod sql;
pub mod system;
// Re-export selected items from `system` so callers can import them from this module
// directly instead of going through the `system` path.
pub use system::{
AUDIT_SOURCE_ID, AuditSource, DLQ_SOURCE_ID, DlqSource, EVENT_SOURCE_ID, EventSource,
};
/// Shared, thread-safe callback taking no arguments.
///
/// [`MessageSender::send`] invokes it once after each successful send.
pub type SendHook = Arc<dyn Fn() + Send + Sync>;
/// Shared, thread-safe callback taking a string slice.
///
/// [`MessageSender::send`] invokes it after each successful send, passing the string
/// form of the sent message's `meta.id`.
pub type AuditHook = Arc<dyn Fn(&str) + Send + Sync>;
/// Wrapper around a broadcast sender of [`SharedMessage`] values that can run optional
/// hooks after a message has been sent successfully.
///
/// Cloning is derived: a clone holds a clone of the underlying broadcast sender and of
/// the `Arc`-backed hooks, so clones share the same hook closures.
#[derive(Clone)]
pub struct MessageSender {
// Underlying channel handle that messages are forwarded to.
inner: broadcast::Sender<SharedMessage>,
// Optional no-argument hook run after a successful send.
on_send: Option<SendHook>,
// Optional hook run after a successful send with the message id as a string.
on_audit: Option<AuditHook>,
}
impl MessageSender {
pub fn new(inner: broadcast::Sender<SharedMessage>, on_send: Option<SendHook>) -> Self {
Self {
inner,
on_send,
on_audit: None,
}
}
pub fn with_audit_hook(mut self, hook: AuditHook) -> Self {
self.on_audit = Some(hook);
self
}
pub fn send(&self, msg: SharedMessage) -> Result<usize> {
let message_id = msg.meta.id.to_string();
let result = self
.inner
.send(msg)
.map_err(|err| crate::error::Error::source(format!("failed to send message: {err}")));
if result.is_ok() {
if let Some(ref hook) = self.on_send {
hook();
}
if let Some(ref audit_hook) = self.on_audit {
audit_hook(&message_id);
}
}
result
}
}
/// A message source that can be shared across threads (`Send + Sync`).
///
/// `#[async_trait]` is applied so that the trait can declare an `async fn`.
#[async_trait]
pub trait Source: Send + Sync {
/// Returns this source's identifier string.
// NOTE(review): presumably unique per source (cf. the `*_SOURCE_ID` constants
// re-exported from `system`) — confirm against implementations.
fn id(&self) -> &str;
/// Runs the source, giving it `sender` to send messages with and a `shutdown`
/// broadcast receiver.
///
/// Returns `Ok(())` or an error, as decided by the implementation.
// NOTE(review): presumably implementations emit messages through `sender` and
// return once a value arrives on `shutdown` — the trait itself does not enforce
// this; verify against implementations.
async fn run(&self, sender: MessageSender, shutdown: broadcast::Receiver<()>) -> Result<()>;
}