use std::collections::HashMap;
use crate::{
error::BroccoliError,
queue::{ConsumeOptions, PublishOptions},
};
#[async_trait::async_trait]
pub trait Broker: Send + Sync {
async fn connect(&mut self, broker_url: &str) -> Result<(), BroccoliError>;
async fn publish(
&self,
queue_name: &str,
disambiguator: Option<String>,
message: &[InternalBrokerMessage],
options: Option<PublishOptions>,
) -> Result<Vec<InternalBrokerMessage>, BroccoliError>;
async fn try_consume(
&self,
queue_name: &str,
options: Option<ConsumeOptions>,
) -> Result<Option<InternalBrokerMessage>, BroccoliError>;
async fn try_consume_batch(
&self,
queue_name: &str,
batch_size: usize,
options: Option<ConsumeOptions>,
) -> Result<Vec<InternalBrokerMessage>, BroccoliError> {
let mut messages = Vec::with_capacity(batch_size);
let mut i = 0;
while i < batch_size && messages.len() < batch_size {
if let Ok(Some(msg)) = self.try_consume(queue_name, options.clone()).await {
messages.push(msg);
}
i += 1;
}
Ok(messages)
}
async fn consume(
&self,
queue_name: &str,
options: Option<ConsumeOptions>,
) -> Result<InternalBrokerMessage, BroccoliError>;
async fn acknowledge(
&self,
queue_name: &str,
message: InternalBrokerMessage,
) -> Result<(), BroccoliError>;
async fn reject(
&self,
queue_name: &str,
message: InternalBrokerMessage,
) -> Result<(), BroccoliError>;
async fn cancel(&self, queue_name: &str, message_id: String) -> Result<(), BroccoliError>;
async fn size(&self, queue_name: &str) -> Result<HashMap<String, u64>, BroccoliError>;
}
#[derive(Debug, Clone)]
pub struct BrokerConfig {
pub retry_attempts: Option<u8>,
pub retry_failed: Option<bool>,
pub pool_connections: Option<u8>,
pub enable_scheduling: Option<bool>,
#[cfg(feature = "surrealdb")]
pub surrealdb_connection: Option<surrealdb::Surreal<surrealdb::engine::any::Any>>,
}
impl Default for BrokerConfig {
fn default() -> Self {
Self {
retry_attempts: Some(3),
retry_failed: Some(true),
pool_connections: Some(10),
enable_scheduling: Some(false),
#[cfg(feature = "surrealdb")]
surrealdb_connection: None,
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BrokerMessage<T: Clone> {
pub task_id: uuid::Uuid,
pub payload: T,
pub attempts: u8,
pub disambiguator: Option<String>,
#[serde(skip)]
pub(crate) metadata: Option<HashMap<String, MetadataTypes>>,
}
impl<T: Clone + serde::Serialize> BrokerMessage<T> {
pub fn new(payload: T, disambiguator: Option<String>) -> Self {
Self {
task_id: uuid::Uuid::new_v4(),
payload,
attempts: 0,
disambiguator,
metadata: None,
}
}
}
#[derive(Debug, Clone)]
pub(crate) enum MetadataTypes {
String(String),
U64(u64),
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct InternalBrokerMessage {
pub task_id: String,
pub payload: String,
pub attempts: u8,
pub disambiguator: Option<String>,
#[serde(skip)]
pub(crate) metadata: Option<HashMap<String, MetadataTypes>>,
}
impl InternalBrokerMessage {
#[must_use]
pub const fn new(
task_id: String,
payload: String,
attempts: u8,
disambiguator: Option<String>,
) -> Self {
Self {
task_id,
payload,
attempts,
disambiguator,
metadata: None,
}
}
}
impl<T: Clone + serde::Serialize> From<BrokerMessage<T>> for InternalBrokerMessage {
fn from(msg: BrokerMessage<T>) -> Self {
Self {
task_id: msg.task_id.to_string(),
payload: serde_json::to_string(&msg.payload).unwrap_or_default(),
attempts: msg.attempts,
disambiguator: msg.disambiguator,
metadata: msg.metadata,
}
}
}
impl<T: Clone + serde::Serialize> From<&BrokerMessage<T>> for InternalBrokerMessage {
fn from(msg: &BrokerMessage<T>) -> Self {
Self {
task_id: msg.task_id.to_string(),
payload: serde_json::to_string(&msg.payload).unwrap_or_default(),
attempts: msg.attempts,
disambiguator: msg.disambiguator.clone(),
metadata: msg.metadata.clone(),
}
}
}
impl InternalBrokerMessage {
pub fn into_message<T: Clone + serde::de::DeserializeOwned + serde::Serialize>(
&self,
) -> Result<BrokerMessage<T>, BroccoliError> {
Ok(BrokerMessage {
task_id: self.task_id.parse().unwrap_or_default(),
payload: serde_json::from_str(&self.payload).map_err(|e| {
BroccoliError::Broker(format!("Failed to parse message payload: {e}"))
})?,
attempts: self.attempts,
disambiguator: self.disambiguator.clone(),
metadata: self.metadata.clone(),
})
}
}
pub enum BrokerType {
#[cfg(feature = "redis")]
Redis,
#[cfg(feature = "rabbitmq")]
RabbitMQ,
#[cfg(feature = "surrealdb")]
SurrealDB,
}