use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum BrokerError {
#[error("broker not configured")]
NotConfigured,
#[error("connection failed: {0}")]
ConnectionFailed(String),
#[error("publish failed: {0}")]
PublishFailed(String),
#[error("invalid message: {0}")]
InvalidMessage(String),
#[error("timeout")]
Timeout,
}
/// Serializable description of a message addressed to a broker topic.
///
/// Only `topic` and `payload` are required when deserializing; every other
/// field falls back to its default (`None` / empty map) when absent.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BrokerMessage {
    /// Optional broker URL; `None` when the field is missing from the input.
    // NOTE(review): how a missing URL is resolved is not visible in this file —
    // confirm against the code that consumes this struct.
    #[serde(default)]
    pub url: Option<String>,

    /// Topic the message is addressed to (required).
    pub topic: String,

    /// Optional message key; `None` when the field is missing from the input.
    #[serde(default)]
    pub key: Option<String>,

    /// Message body, carried as a plain string (required).
    pub payload: String,

    /// Header name/value pairs; empty when the field is missing from the
    /// input. A `BTreeMap` keeps the entries ordered by header name.
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
}
/// Outcome of a publish attempt, in a form that can be (de)serialized.
///
/// Each optional field is left out of the serialized output entirely when it
/// is `None` instead of being written as `null`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PublishResult {
    /// `true` when the publish succeeded.
    pub success: bool,

    /// Failure description; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,

    /// Topic the publish was addressed to.
    pub topic: String,

    /// Partition number; omitted from serialized output when `None`.
    // NOTE(review): presumably the broker-assigned partition (the tests refer
    // to Kafka metadata) — confirm with the producer code.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub partition: Option<i32>,

    /// Offset value; omitted from serialized output when `None`.
    // NOTE(review): presumably the broker-assigned offset — confirm with the
    // producer code.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub offset: Option<i64>,
}
impl PublishResult {
    /// Builds a result that marks a publish to `topic` as successful.
    ///
    /// `error`, `partition` and `offset` are all left as `None`.
    pub fn success(topic: String) -> Self {
        Self {
            topic,
            success: true,
            error: None,
            partition: None,
            offset: None,
        }
    }

    /// Builds a result that marks a publish to `topic` as failed, recording
    /// `error` as the failure description.
    ///
    /// `partition` and `offset` are left as `None`.
    pub fn failure(topic: String, error: String) -> Self {
        // A failure differs from a success only in the flag and the message,
        // so the remaining fields are taken from the success constructor.
        Self {
            success: false,
            error: Some(error),
            ..Self::success(topic)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // A message with a URL survives a JSON round trip with url, topic and
    // payload intact.
    #[test]
    fn broker_message_url_roundtrip() {
        let original = BrokerMessage {
            url: Some(String::from("nats://localhost:4222")),
            topic: String::from("test.subject"),
            key: None,
            payload: String::from(r#"{"data":"hello"}"#),
            headers: BTreeMap::new(),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded = serde_json::from_str::<BrokerMessage>(&encoded).unwrap();

        assert_eq!(decoded.url.as_deref(), Some("nats://localhost:4222"));
        assert_eq!(decoded.topic.as_str(), "test.subject");
        assert_eq!(decoded.payload.as_str(), r#"{"data":"hello"}"#);
    }

    // A missing `url` field deserializes to `None`.
    #[test]
    fn broker_message_url_absent() {
        let decoded =
            serde_json::from_str::<BrokerMessage>(r#"{"topic":"events","payload":"{}"}"#).unwrap();

        assert_eq!(decoded.url, None);
        assert_eq!(decoded.topic.as_str(), "events");
    }

    // Key and headers survive a JSON round trip.
    #[test]
    fn broker_message_with_key_and_headers() {
        let headers: BTreeMap<String, String> =
            [("x-request-id", "req-123"), ("x-trace-id", "trace-456")]
                .iter()
                .map(|&(name, value)| (name.to_owned(), value.to_owned()))
                .collect();
        let original = BrokerMessage {
            url: Some(String::from("kafka:9092")),
            topic: String::from("orders.placed"),
            key: Some(String::from("order-789")),
            payload: String::from(r#"{"orderId":"789"}"#),
            headers,
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded = serde_json::from_str::<BrokerMessage>(&encoded).unwrap();

        assert_eq!(decoded.key.as_deref(), Some("order-789"));
        assert_eq!(decoded.headers.len(), 2);
        assert_eq!(
            decoded.headers.get("x-request-id").map(String::as_str),
            Some("req-123")
        );
        assert_eq!(
            decoded.headers.get("x-trace-id").map(String::as_str),
            Some("trace-456")
        );
    }

    // Missing `key` and `headers` fields fall back to `None` and an empty map.
    #[test]
    fn broker_message_key_defaults_to_none() {
        let decoded =
            serde_json::from_str::<BrokerMessage>(r#"{"topic":"events","payload":"{}"}"#).unwrap();

        assert_eq!(decoded.key, None);
        assert_eq!(decoded.headers.len(), 0);
    }

    // `success` sets the flag and leaves every optional field unset.
    #[test]
    fn publish_result_success_constructor() {
        let outcome = PublishResult::success(String::from("orders.placed"));

        assert_eq!(outcome.success, true);
        assert_eq!(outcome.error, None);
        assert_eq!(outcome.topic.as_str(), "orders.placed");
        assert_eq!(outcome.partition, None);
        assert_eq!(outcome.offset, None);
    }

    // `failure` clears the flag and records the error text.
    #[test]
    fn publish_result_failure_constructor() {
        let topic = String::from("orders.placed");
        let reason = String::from("connection refused");
        let outcome = PublishResult::failure(topic, reason);

        assert_eq!(outcome.success, false);
        assert_eq!(outcome.error.as_deref(), Some("connection refused"));
        assert_eq!(outcome.topic.as_str(), "orders.placed");
    }

    // Unset optional fields are left out of the serialized JSON.
    #[test]
    fn publish_result_success_serialization_skips_optional() {
        let encoded =
            serde_json::to_string(&PublishResult::success(String::from("events"))).unwrap();

        for absent in &["error", "partition", "offset"] {
            assert!(!encoded.contains(*absent));
        }
        assert!(encoded.contains(r#""success":true"#));
    }

    // A failure result serializes both the error text and the cleared flag.
    #[test]
    fn publish_result_failure_serialization_includes_error() {
        let outcome = PublishResult::failure(String::from("events"), String::from("timeout"));
        let encoded = serde_json::to_string(&outcome).unwrap();

        for present in &[r#""error":"timeout""#, r#""success":false"#] {
            assert!(encoded.contains(*present));
        }
    }

    // Partition and offset survive a JSON round trip when set.
    #[test]
    fn publish_result_with_kafka_metadata_roundtrip() {
        let original = PublishResult {
            partition: Some(3),
            offset: Some(42),
            ..PublishResult::success(String::from("orders"))
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded = serde_json::from_str::<PublishResult>(&encoded).unwrap();

        assert_eq!(decoded.partition, Some(3));
        assert_eq!(decoded.offset, Some(42));
    }

    // Every error variant renders the exact text given in its `#[error]` attribute.
    #[test]
    fn broker_error_display() {
        let cases = [
            (BrokerError::NotConfigured, "broker not configured"),
            (BrokerError::Timeout, "timeout"),
            (
                BrokerError::ConnectionFailed(String::from("refused")),
                "connection failed: refused",
            ),
            (
                BrokerError::PublishFailed(String::from("full")),
                "publish failed: full",
            ),
            (
                BrokerError::InvalidMessage(String::from("bad json")),
                "invalid message: bad json",
            ),
        ];

        for (error, expected) in &cases {
            assert_eq!(error.to_string(), *expected);
        }
    }
}