use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
use thiserror::Error;
/// Errors produced by the broker layer in this file.
///
/// This is the error type of `MessageBroker::publish` and of the
/// `BrokerRegistry::publish_*` helpers. The `#[error(...)]` attributes supply
/// the `Display` text through `thiserror`.
#[derive(Debug, Error)]
pub enum BrokerError {
/// No broker is set for the requested backend; this is what
/// `BrokerRegistry::publish_kafka` / `publish_nats` return for an empty slot.
#[error("broker not configured")]
NotConfigured,
/// Connection failure; the string is the detail appended to the message.
/// NOTE(review): not constructed anywhere in this file.
#[error("connection failed: {0}")]
ConnectionFailed(String),
/// Publish failure; the string is the detail appended to the message.
/// NOTE(review): not constructed anywhere in this file — the brokers here
/// report failed publishes through `PublishResult` instead.
#[error("publish failed: {0}")]
PublishFailed(String),
/// The message was rejected as invalid; the string is the detail appended
/// to the message. NOTE(review): not constructed anywhere in this file.
#[error("invalid message: {0}")]
InvalidMessage(String),
/// An operation timed out. NOTE(review): not constructed anywhere in this file.
#[error("timeout")]
Timeout,
}
/// A message handed to a `MessageBroker` for publishing.
///
/// Serializable and deserializable with serde; `key` and `headers` may be
/// omitted from the input when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BrokerMessage {
/// Destination topic (the NATS stub in this file logs it as `subject`).
pub topic: String,
/// Optional message key; becomes `None` when absent on deserialization.
/// NOTE(review): presumably used for partitioning/routing — nothing in this
/// file reads it, confirm against the real broker clients.
#[serde(default)]
pub key: Option<String>,
/// Message body as a string. The tests in this file pass JSON text, but
/// nothing here validates the format.
pub payload: String,
/// String key/value headers; becomes an empty map when absent on
/// deserialization. `BTreeMap` keeps the entries ordered by key.
#[serde(default)]
pub headers: BTreeMap<String, String>,
}
/// Outcome of a publish attempt, as returned by `MessageBroker::publish`.
///
/// The optional fields are left out of the serialized form when they are
/// `None` (`skip_serializing_if`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PublishResult {
/// `true` when the publish was accepted, `false` when it failed.
pub success: bool,
/// Failure description; `PublishResult::failure` sets it and
/// `PublishResult::success` leaves it `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
/// Topic the message was addressed to.
pub topic: String,
/// Both constructors in this file leave this `None`.
/// NOTE(review): presumably the partition assigned by the broker — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub partition: Option<i32>,
/// Both constructors in this file leave this `None`.
/// NOTE(review): presumably the offset assigned by the broker — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub offset: Option<i64>,
}
impl PublishResult {
    /// Builds the result for a publish to `topic` that succeeded.
    ///
    /// `success` is `true`; `error`, `partition` and `offset` are all `None`.
    pub fn success(topic: String) -> Self {
        Self::from_outcome(topic, None)
    }

    /// Builds the result for a publish to `topic` that failed with `error`.
    ///
    /// `success` is `false`, `error` holds the given description, and
    /// `partition` / `offset` are `None`.
    pub fn failure(topic: String, error: String) -> Self {
        Self::from_outcome(topic, Some(error))
    }

    /// Shared constructor: the result counts as successful exactly when no
    /// error description is supplied.
    fn from_outcome(topic: String, error: Option<String>) -> Self {
        Self {
            success: error.is_none(),
            topic,
            error,
            partition: None,
            offset: None,
        }
    }
}
/// Abstraction over a message-broker backend.
///
/// The `Send + Sync` supertraits let implementations be shared across
/// threads, e.g. as the `Arc<dyn MessageBroker>` held by `BrokerRegistry`.
pub trait MessageBroker: Send + Sync {
/// Publishes `message`, taking ownership of it.
///
/// The implementations in this file report a failed delivery as `Ok` with
/// `PublishResult::success == false` rather than as `Err`, so callers should
/// inspect the returned result as well as the `Result` itself.
fn publish(&self, message: BrokerMessage) -> Result<PublishResult, BrokerError>;
/// Returns a static identifier for the backend; the implementations in this
/// file return `"mock"`, `"kafka"` and `"nats"`.
fn broker_type(&self) -> &'static str;
}
/// In-memory broker for tests: records published messages instead of sending
/// them, and can be told to fail publishes to one topic.
///
/// Both fields live behind `Arc`, so clones of a `MockBroker` share the same
/// recorded messages and the same failure setting. `Debug` is derived so the
/// mock can be printed in assertions and embedded in other `Debug` types.
#[derive(Debug, Clone, Default)]
pub struct MockBroker {
    /// Messages accepted so far, in publish order.
    messages: Arc<RwLock<Vec<BrokerMessage>>>,
    /// Topic whose publishes are reported as failed; `None` means never fail.
    fail_on_topic: Arc<RwLock<Option<String>>>,
}
impl MockBroker {
    /// Creates a mock with no recorded messages and no failing topic.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns a snapshot copy of every message recorded so far.
    ///
    /// # Panics
    ///
    /// Panics if the message lock is poisoned.
    pub fn messages(&self) -> Vec<BrokerMessage> {
        let recorded = self.messages.read().unwrap();
        recorded.to_vec()
    }

    /// Discards all recorded messages; the failing-topic setting is untouched.
    ///
    /// # Panics
    ///
    /// Panics if the message lock is poisoned.
    pub fn clear(&self) {
        let mut recorded = self.messages.write().unwrap();
        recorded.clear();
    }

    /// Makes subsequent publishes to `topic` report a failure.
    ///
    /// Only one topic is remembered: each call replaces the previous setting.
    ///
    /// # Panics
    ///
    /// Panics if the failing-topic lock is poisoned.
    pub fn fail_on(&self, topic: &str) {
        let trigger = Some(topic.to_owned());
        *self.fail_on_topic.write().unwrap() = trigger;
    }
}
impl MessageBroker for MockBroker {
    /// Records `message`, unless its topic is the one configured to fail.
    ///
    /// For the failing topic the message is not recorded and the call returns
    /// `Ok` with a failed `PublishResult` whose error is `"simulated failure"`.
    /// This implementation never returns `Err`.
    ///
    /// # Panics
    ///
    /// Panics if either internal lock is poisoned.
    fn publish(&self, message: BrokerMessage) -> Result<PublishResult, BrokerError> {
        // The read guard is a temporary of this statement, so it is released
        // before the write lock on `messages` is taken below.
        let simulate_failure =
            self.fail_on_topic.read().unwrap().as_deref() == Some(message.topic.as_str());

        if simulate_failure {
            let reason = "simulated failure".to_string();
            return Ok(PublishResult::failure(message.topic, reason));
        }

        let recorded_topic = message.topic.clone();
        let mut recorded = self.messages.write().unwrap();
        recorded.push(message);
        Ok(PublishResult::success(recorded_topic))
    }

    /// Identifies this backend as `"mock"`.
    fn broker_type(&self) -> &'static str {
        "mock"
    }
}
/// Placeholder Kafka broker: it stores the broker string it was created with,
/// but its `MessageBroker::publish` in this file does not send anything yet.
///
/// `Debug` is derived so the type can be logged and embedded in other `Debug`
/// types, as expected of public types.
#[derive(Debug)]
pub struct KafkaBroker {
    /// Broker string passed to `new`. Stored but not read by the stub, hence
    /// the leading underscore.
    _brokers: String,
}
impl KafkaBroker {
    /// Creates a Kafka broker handle that keeps an owned copy of `brokers`.
    ///
    /// The string is stored verbatim; nothing here parses or validates it.
    pub fn new(brokers: &str) -> Self {
        let _brokers = String::from(brokers);
        Self { _brokers }
    }
}
impl MessageBroker for KafkaBroker {
fn publish(&self, message: BrokerMessage) -> Result<PublishResult, BrokerError> {
tracing::warn!(
topic = %message.topic,
"Kafka publish not yet implemented, message dropped"
);
Ok(PublishResult::failure(
message.topic,
"Kafka client not yet implemented".to_string(),
))
}
fn broker_type(&self) -> &'static str {
"kafka"
}
}
/// Placeholder NATS broker: it stores the server string it was created with,
/// but its `MessageBroker::publish` in this file does not send anything yet.
///
/// `Debug` is derived so the type can be logged and embedded in other `Debug`
/// types, as expected of public types.
///
/// NOTE(review): the derived `Debug` prints `_servers` verbatim. If server
/// strings can ever embed credentials, replace it with a redacting impl.
#[derive(Debug)]
pub struct NatsBroker {
    /// Server string passed to `new`. Stored but not read by the stub, hence
    /// the leading underscore.
    _servers: String,
}
impl NatsBroker {
    /// Creates a NATS broker handle that keeps an owned copy of `servers`.
    ///
    /// The string is stored verbatim; nothing here parses or validates it.
    pub fn new(servers: &str) -> Self {
        let _servers = String::from(servers);
        Self { _servers }
    }
}
impl MessageBroker for NatsBroker {
fn publish(&self, message: BrokerMessage) -> Result<PublishResult, BrokerError> {
tracing::warn!(
subject = %message.topic,
"NATS publish not yet implemented, message dropped"
);
Ok(PublishResult::failure(
message.topic,
"NATS client not yet implemented".to_string(),
))
}
fn broker_type(&self) -> &'static str {
"nats"
}
}
/// Holds the optionally configured Kafka and NATS brokers as
/// `MessageBroker` trait objects.
///
/// Both slots are empty in the `Default` value.
#[derive(Default)]
pub struct BrokerRegistry {
    /// Broker used for Kafka publishes; `None` until one is set.
    kafka: Option<Arc<dyn MessageBroker>>,
    /// Broker used for NATS publishes; `None` until one is set.
    nats: Option<Arc<dyn MessageBroker>>,
}

/// `dyn MessageBroker` is not `Debug`, so `Debug` cannot be derived; each slot
/// is rendered as the `broker_type()` of the broker it holds, if any.
impl std::fmt::Debug for BrokerRegistry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label =
            |slot: &Option<Arc<dyn MessageBroker>>| slot.as_ref().map(|broker| broker.broker_type());
        f.debug_struct("BrokerRegistry")
            .field("kafka", &label(&self.kafka))
            .field("nats", &label(&self.nats))
            .finish()
    }
}
impl BrokerRegistry {
    /// Creates a registry with neither broker configured.
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates a registry whose Kafka and NATS slots each hold their own
    /// fresh `MockBroker`.
    pub fn with_mocks() -> Self {
        let mut registry = Self::new();
        registry.set_kafka(Arc::new(MockBroker::new()));
        registry.set_nats(Arc::new(MockBroker::new()));
        registry
    }

    /// Installs `broker` in the Kafka slot, replacing any previous one.
    pub fn set_kafka(&mut self, broker: Arc<dyn MessageBroker>) {
        self.kafka = Some(broker);
    }

    /// Installs `broker` in the NATS slot, replacing any previous one.
    pub fn set_nats(&mut self, broker: Arc<dyn MessageBroker>) {
        self.nats = Some(broker);
    }

    /// Returns the Kafka broker, or `None` when none is configured.
    pub fn kafka(&self) -> Option<&Arc<dyn MessageBroker>> {
        self.kafka.as_ref()
    }

    /// Returns the NATS broker, or `None` when none is configured.
    pub fn nats(&self) -> Option<&Arc<dyn MessageBroker>> {
        self.nats.as_ref()
    }

    /// Publishes `message` through the Kafka broker.
    ///
    /// # Errors
    ///
    /// Returns `BrokerError::NotConfigured` when no Kafka broker is set;
    /// otherwise returns whatever the broker's `publish` returns.
    pub fn publish_kafka(&self, message: BrokerMessage) -> Result<PublishResult, BrokerError> {
        Self::forward(self.kafka(), message)
    }

    /// Publishes `message` through the NATS broker.
    ///
    /// # Errors
    ///
    /// Returns `BrokerError::NotConfigured` when no NATS broker is set;
    /// otherwise returns whatever the broker's `publish` returns.
    pub fn publish_nats(&self, message: BrokerMessage) -> Result<PublishResult, BrokerError> {
        Self::forward(self.nats(), message)
    }

    /// Hands `message` to the broker in `slot`, or reports that the slot is empty.
    fn forward(
        slot: Option<&Arc<dyn MessageBroker>>,
        message: BrokerMessage,
    ) -> Result<PublishResult, BrokerError> {
        let broker = slot.ok_or(BrokerError::NotConfigured)?;
        broker.publish(message)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a header-less message with the given topic, key and payload.
    fn message(topic: &str, key: Option<&str>, payload: &str) -> BrokerMessage {
        BrokerMessage {
            topic: topic.to_owned(),
            key: key.map(str::to_owned),
            payload: payload.to_owned(),
            headers: BTreeMap::new(),
        }
    }

    /// A successful publish is reported as such and the message is recorded.
    #[test]
    fn mock_broker_records_messages() {
        let broker = MockBroker::new();

        let outcome = broker
            .publish(message("test-topic", Some("key-1"), r#"{"event":"test"}"#))
            .unwrap();
        assert!(outcome.success);
        assert_eq!(outcome.topic, "test-topic");

        let recorded = broker.messages();
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].topic, "test-topic");
        assert_eq!(recorded[0].key, Some("key-1".to_string()));
    }

    /// Publishing to the configured failing topic yields a failed result.
    #[test]
    fn mock_broker_simulates_failure() {
        let broker = MockBroker::new();
        broker.fail_on("fail-topic");

        let outcome = broker.publish(message("fail-topic", None, "{}")).unwrap();
        assert!(!outcome.success);
        assert!(outcome.error.is_some());
    }

    /// A registry built with mocks accepts publishes on both backends.
    #[test]
    fn broker_registry_with_mocks() {
        let registry = BrokerRegistry::with_mocks();
        let event = message("events", None, "{}");

        let via_kafka = registry.publish_kafka(event.clone()).unwrap();
        assert!(via_kafka.success);

        let via_nats = registry.publish_nats(event).unwrap();
        assert!(via_nats.success);
    }

    /// An empty registry rejects publishes with `NotConfigured`.
    #[test]
    fn broker_registry_not_configured() {
        let registry = BrokerRegistry::new();

        let outcome = registry.publish_kafka(message("events", None, "{}"));
        assert!(matches!(outcome, Err(BrokerError::NotConfigured)));
    }
}