use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "consumer")]
mod router;
#[cfg(feature = "kafka")]
pub use kafka::{KafkaConsumer, KafkaProducer};
/// A message passed through the messaging layer.
///
/// Derives `Serialize`/`Deserialize`, so it can be encoded and decoded with
/// any serde-compatible format, and `Clone`, so it can be duplicated freely.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
/// Topic associated with this message.
pub topic: String,
/// Sender identifier.
/// NOTE(review): presumably the name of the originating service/component — confirm against callers.
pub from: String,
/// Arbitrary JSON payload.
pub data: serde_json::Value,
/// Timestamp of the message. `Message::new` fills this with the current
/// UTC time in milliseconds since the Unix epoch.
pub timestamp: i64,
}
impl Message {
    /// Creates a message stamped with the current UTC time.
    ///
    /// `topic` and `from` accept anything convertible into a `String`;
    /// `data` is stored unchanged. `timestamp` is set to the current UTC time
    /// expressed in milliseconds since the Unix epoch.
    pub fn new(
        topic: impl Into<String>,
        from: impl Into<String>,
        data: serde_json::Value,
    ) -> Self {
        // Bind in declaration order so the conversions run before the clock is read.
        let topic = topic.into();
        let from = from.into();
        let timestamp = chrono::Utc::now().timestamp_millis();

        Self {
            topic,
            from,
            data,
            timestamp,
        }
    }
}
/// Abstraction over a sink that messages can be published to.
///
/// Implementors must be `Send + Sync` so a producer can be shared across
/// threads/tasks (e.g. behind an `Arc`).
#[async_trait]
pub trait MessageProducer: Send + Sync {
    /// Publishes a single `message` to `topic`.
    ///
    /// NOTE(review): `Message` also carries its own `topic` field; how an
    /// implementation reconciles it with the `topic` argument is not visible
    /// from this trait — confirm against the concrete producers.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation reports for a failed publish.
    async fn publish(
        &self,
        topic: &str,
        message: Message,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;

    /// Publishes every `(topic, message)` pair in `messages`.
    ///
    /// The default implementation calls [`publish`](Self::publish) for each
    /// pair, one after another, in the order given. Implementors may override
    /// it.
    ///
    /// # Errors
    ///
    /// The default implementation stops at the first failing publish and
    /// returns that error; pairs after it are not attempted, and pairs
    /// already published are not rolled back.
    async fn publish_batch(
        &self,
        messages: Vec<(String, Message)>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        for entry in messages {
            let (destination, payload) = entry;
            self.publish(destination.as_str(), payload).await?;
        }

        Ok(())
    }
}
/// Abstraction over a message source that delivers [`Message`]s to a callback.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait MessageConsumer: Send + Sync {
/// Subscribes `handler` to a single `topic`.
///
/// `handler` is a synchronous callback that takes each `Message` by value
/// and returns nothing, so it has no way to report failure to the consumer.
///
/// NOTE(review): whether this returns once the subscription is registered
/// or only after the consume loop ends is implementation-defined and not
/// visible from this trait — confirm against the concrete consumers.
///
/// # Errors
///
/// Returns whatever error the implementation reports.
async fn subscribe(
&self,
topic: &str,
handler: Arc<dyn Fn(Message) + Send + Sync>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
/// Subscribes one shared `handler` to several `topics` at once.
///
/// The same callback receives messages for all listed topics.
///
/// # Errors
///
/// Returns whatever error the implementation reports.
async fn subscribe_topics(
&self,
topics: Vec<String>,
handler: Arc<dyn Fn(Message) + Send + Sync>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
/// Shared, reference-counted handle to a type-erased [`MessageProducer`].
///
/// The explicit `+ Send + Sync` repeats the trait's supertraits but is part of
/// the type's identity (`dyn T + Send + Sync` is a distinct type from `dyn T`),
/// so it must stay spelled this way for existing users of the alias.
pub type MessageProducerType = Arc<dyn MessageProducer + Send + Sync>;
/// Shared, reference-counted handle to a type-erased [`MessageConsumer`].
///
/// The explicit `+ Send + Sync` repeats the trait's supertraits but is part of
/// the type's identity (`dyn T + Send + Sync` is a distinct type from `dyn T`),
/// so it must stay spelled this way for existing users of the alias.
pub type MessageConsumerType = Arc<dyn MessageConsumer + Send + Sync>;
/// A handler that declares which topics it wants and processes their messages.
///
/// Only compiled when the `consumer` feature is enabled.
/// NOTE(review): presumably driven by `KafkaMessageRouter`, which is exported
/// under the same feature — confirm in the `router` module.
#[cfg(feature = "consumer")]
#[async_trait]
pub trait KafkaMessageHandler: Send + Sync {
/// Returns the topics this handler is interested in.
fn topics(&self) -> Vec<String>;
/// Returns the group id for this handler.
/// NOTE(review): presumably the Kafka consumer group id — confirm against the router.
fn group_id(&self) -> String;
/// Processes one message.
///
/// Returns nothing, so any failure has to be dealt with inside the
/// implementation.
async fn handle(&self, message: Message);
}
#[cfg(feature = "consumer")]
pub use router::KafkaMessageRouter;