use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::time::Duration;
/// Error type returned by this module's fallible operations.
///
/// The `Display` text of each variant is the format string in its `#[error]`
/// attribute; `{0}` is replaced by the variant's payload.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// A connection-related failure, described by the contained message.
#[error("connection error: {0}")]
Connection(String),
/// A JetStream-related failure, described by the contained message.
#[error("jetstream error: {0}")]
JetStream(String),
/// A wrapped `serde_json::Error`. `#[from]` generates the `From` impl, so
/// `?` converts JSON errors into this variant automatically.
#[error("serialization error: {0}")]
Serialization(#[from] serde_json::Error),
/// An operation timed out. The duration is rendered with `{:?}` (e.g. `5s`);
/// presumably it is the timeout that elapsed — confirm against callers.
#[error("operation timed out after {0:?}")]
Timeout(Duration),
/// No connection to the NATS server is available.
#[error("not connected to NATS server")]
NotConnected,
/// A key lookup failed; the payload is shown after "key not found: ".
#[error("key not found: {0}")]
KeyNotFound(String),
/// An object lookup failed; the payload is shown after "object not found: ".
#[error("object not found: {0}")]
ObjectNotFound(String),
/// A bucket lookup failed; the payload is shown after "bucket not found: ".
#[error("bucket not found: {0}")]
BucketNotFound(String),
/// A stream with this identifier already exists.
#[error("stream already exists: {0}")]
StreamExists(String),
}
/// Shorthand for `std::result::Result` with this module's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
/// A versioned event envelope that always carries a correlation id.
///
/// Serializes to and from a flat record of the six fields below, in
/// declaration order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShipEvent {
/// Caller-supplied event type string.
pub event_type: String,
/// Creation time as text; the constructors fill it with the current UTC
/// time in RFC 3339 format.
pub timestamp: String,
/// Caller-supplied source string (presumably the emitting component — confirm).
pub source: String,
/// Arbitrary JSON payload.
pub payload: serde_json::Value,
/// Correlation id. `ShipEvent::new` generates an 8-character one;
/// `ShipEvent::with_correlation` stores the caller's value unchanged.
pub correlation_id: String,
/// Envelope version; both constructors set it to `"1.0"`.
pub version: String,
}
impl ShipEvent {
    /// Builds an event stamped with the current UTC time and a freshly
    /// generated correlation id.
    ///
    /// The correlation id is the first eight characters of a random (v4)
    /// UUID's string form. `version` is set to `"1.0"`.
    pub fn new(
        event_type: impl Into<String>,
        source: impl Into<String>,
        payload: serde_json::Value,
    ) -> Self {
        // The UUID text is ASCII, so slicing on a byte index is safe.
        let uuid_text = uuid::Uuid::new_v4().to_string();
        let short_id = uuid_text[..8].to_string();
        Self::with_correlation(event_type, source, payload, short_id)
    }

    /// Builds an event stamped with the current UTC time that reuses the
    /// caller-provided `correlation_id` verbatim.
    ///
    /// `version` is set to `"1.0"`.
    pub fn with_correlation(
        event_type: impl Into<String>,
        source: impl Into<String>,
        payload: serde_json::Value,
        correlation_id: impl Into<String>,
    ) -> Self {
        let event_type = event_type.into();
        let timestamp = Utc::now().to_rfc3339();
        let source = source.into();
        let correlation_id = correlation_id.into();
        Self {
            event_type,
            timestamp,
            source,
            payload,
            correlation_id,
            version: String::from("1.0"),
        }
    }
}
/// A lightweight event record with an optional correlation id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
/// Caller-supplied event type string.
pub event_type: String,
/// Caller-supplied source string (presumably the emitting component — confirm).
pub source: String,
/// Arbitrary JSON payload.
pub payload: serde_json::Value,
/// Creation time as text; `Event::new` fills it with the current UTC time
/// in RFC 3339 format.
pub timestamp: String,
/// Optional correlation id; `Event::new` leaves it as `None`.
pub correlation_id: Option<String>,
}
impl Event {
    /// Creates an event stamped with the current UTC time (RFC 3339) and no
    /// correlation id.
    pub fn new(
        event_type: impl Into<String>,
        source: impl Into<String>,
        payload: serde_json::Value,
    ) -> Self {
        let event_type = event_type.into();
        let source = source.into();
        let created_at = Utc::now().to_rfc3339();
        Self {
            event_type,
            source,
            payload,
            timestamp: created_at,
            correlation_id: None,
        }
    }
}
/// A message posted by a sender to a named channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelMessage {
/// Caller-supplied sender string.
pub sender: String,
/// Message text.
pub content: String,
/// Creation time as a floating-point number; `ChannelMessage::new` fills it
/// with the current UTC Unix time in seconds, including the sub-second fraction.
pub timestamp: f64,
/// Name of the channel the message belongs to.
pub channel: String,
/// Message identifier; `ChannelMessage::new` builds it as
/// `"<channel>-<nanosecond timestamp>"`.
pub message_id: String,
/// Optional JSON metadata. The field is left out of the serialized output
/// entirely when it is `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
}
impl ChannelMessage {
    /// Creates a message without metadata, stamped with the current UTC time.
    ///
    /// `timestamp` is the Unix time in seconds with the sub-second part as a
    /// fraction. `message_id` is `"<channel>-<nanos>"`, where `<nanos>` is the
    /// nanosecond Unix timestamp, or `0` if chrono cannot represent it.
    pub fn new(
        sender: impl Into<String>,
        content: impl Into<String>,
        channel: impl Into<String>,
    ) -> Self {
        let channel: String = channel.into();
        let created = Utc::now();

        let whole_secs = created.timestamp() as f64;
        let frac_secs = f64::from(created.timestamp_subsec_nanos()) / 1_000_000_000.0;
        let nanos_since_epoch = created.timestamp_nanos_opt().unwrap_or(0);

        Self {
            // Built before `channel` is moved into its own field below.
            message_id: format!("{}-{}", channel, nanos_since_epoch),
            sender: sender.into(),
            content: content.into(),
            timestamp: whole_secs + frac_secs,
            channel,
            metadata: None,
        }
    }

    /// Same as [`ChannelMessage::new`], but attaches `metadata` to the message.
    pub fn with_metadata(
        sender: impl Into<String>,
        content: impl Into<String>,
        channel: impl Into<String>,
        metadata: serde_json::Value,
    ) -> Self {
        Self {
            metadata: Some(metadata),
            ..Self::new(sender, content, channel)
        }
    }
}
/// Connection settings for a NATS server.
///
/// Both timeouts are (de)serialized through `duration_serde`, i.e. as a whole
/// number of seconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NatsConfig {
/// Server URL, e.g. `nats://localhost:4222` (the value used by `Default`).
pub url: String,
/// Connect timeout; `NatsConfig::new` sets it to 5 seconds.
#[serde(with = "duration_serde")]
pub connect_timeout: Duration,
/// Request timeout; `NatsConfig::new` sets it to 10 seconds.
#[serde(with = "duration_serde")]
pub request_timeout: Duration,
}
impl NatsConfig {
    /// Creates a configuration for `url` with a 5-second connect timeout and
    /// a 10-second request timeout.
    pub fn new(url: impl Into<String>) -> Self {
        let url = url.into();
        let connect_timeout = Duration::from_secs(5);
        let request_timeout = Duration::from_secs(10);
        Self {
            url,
            connect_timeout,
            request_timeout,
        }
    }

    /// Returns the configuration with `connect_timeout` replaced by `timeout`.
    pub fn with_connect_timeout(self, timeout: Duration) -> Self {
        Self {
            connect_timeout: timeout,
            ..self
        }
    }

    /// Returns the configuration with `request_timeout` replaced by `timeout`.
    pub fn with_request_timeout(self, timeout: Duration) -> Self {
        Self {
            request_timeout: timeout,
            ..self
        }
    }
}
impl Default for NatsConfig {
fn default() -> Self {
Self::new("nats://localhost:4222")
}
}
/// Settings describing a stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
/// Stream name.
pub name: String,
/// Subjects associated with the stream (e.g. `events.>` in the tests).
pub subjects: Vec<String>,
/// Maximum age in seconds; `StreamConfig::new` sets it to 86400.
pub max_age_secs: u64,
/// Maximum message count; `StreamConfig::new` sets it to 100_000.
pub max_msgs: i64,
/// Storage kind as a string; this file only ever writes `"file"` (the
/// value set by `new`) or `"memory"` (set by `with_memory_storage`).
pub storage: String,
}
impl StreamConfig {
    /// Creates a stream configuration for `name` and `subjects`.
    ///
    /// Starts with a maximum age of 86400 seconds, a cap of 100 000 messages
    /// and `"file"` storage; the `with_*` methods override these.
    pub fn new(name: impl Into<String>, subjects: Vec<String>) -> Self {
        let name = name.into();
        let storage = String::from("file");
        Self {
            name,
            subjects,
            max_age_secs: 86_400,
            max_msgs: 100_000,
            storage,
        }
    }

    /// Returns the configuration with the maximum age set to `secs` seconds.
    pub fn with_max_age(self, secs: u64) -> Self {
        Self {
            max_age_secs: secs,
            ..self
        }
    }

    /// Returns the configuration with the message cap set to `max`.
    pub fn with_max_msgs(self, max: i64) -> Self {
        Self {
            max_msgs: max,
            ..self
        }
    }

    /// Returns the configuration with storage switched to `"memory"`.
    pub fn with_memory_storage(self) -> Self {
        Self {
            storage: String::from("memory"),
            ..self
        }
    }
}
/// Settings for a key-value bucket.
///
/// `ttl` is (de)serialized through `option_duration_serde`, i.e. as an
/// optional whole number of seconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KvBucketConfig {
    /// Bucket name.
    pub bucket: String,
    /// History setting; `KvBucketConfig::new` sets it to 5.
    pub history: u64,
    /// Optional time-to-live; `None` when no TTL is configured.
    ///
    /// `default` is needed because a custom `with` module turns off serde's
    /// built-in handling of absent `Option` fields. Without it, input that
    /// omits `ttl` is rejected with a "missing field" error instead of
    /// yielding `None`.
    #[serde(default, with = "option_duration_serde")]
    pub ttl: Option<Duration>,
}
impl KvBucketConfig {
    /// Creates a configuration for `bucket` with a history of 5 and no TTL.
    pub fn new(bucket: impl Into<String>) -> Self {
        let bucket = bucket.into();
        Self {
            bucket,
            history: 5,
            ttl: None,
        }
    }

    /// Returns the configuration with the TTL set to `secs` seconds.
    pub fn with_ttl_secs(self, secs: u64) -> Self {
        let ttl = Duration::from_secs(secs);
        Self {
            ttl: Some(ttl),
            ..self
        }
    }

    /// Returns the configuration with the history setting replaced by `history`.
    pub fn with_history(self, history: u64) -> Self {
        Self { history, ..self }
    }
}
/// Serde adapter (for `#[serde(with = "duration_serde")]`) that represents a
/// `Duration` as an unsigned integer number of seconds.
///
/// Any sub-second part is dropped on serialization, because only
/// `Duration::as_secs` is written.
mod duration_serde {
    use serde::{Deserialize, Deserializer, Serializer};
    use std::time::Duration;

    /// Writes the whole-second count of `d` as a `u64`.
    pub fn serialize<S>(d: &Duration, s: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let whole_secs = d.as_secs();
        s.serialize_u64(whole_secs)
    }

    /// Reads a `u64` and interprets it as a number of seconds.
    pub fn deserialize<'de, D>(d: D) -> Result<Duration, D::Error>
    where
        D: Deserializer<'de>,
    {
        u64::deserialize(d).map(Duration::from_secs)
    }
}
/// Serde adapter (for `#[serde(with = "option_duration_serde")]`) that
/// represents an `Option<Duration>` as an optional unsigned integer number of
/// seconds.
///
/// Any sub-second part is dropped on serialization, because only
/// `Duration::as_secs` is written.
mod option_duration_serde {
    use serde::{Deserialize, Deserializer, Serializer};
    use std::time::Duration;

    /// Writes `Some(whole seconds)` for a present duration and `None` otherwise.
    pub fn serialize<S>(d: &Option<Duration>, s: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        if let Some(duration) = d {
            s.serialize_some(&duration.as_secs())
        } else {
            s.serialize_none()
        }
    }

    /// Reads an optional `u64` and converts a present value into a duration
    /// of that many seconds.
    pub fn deserialize<'de, D>(d: D) -> Result<Option<Duration>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let maybe_secs = Option::<u64>::deserialize(d)?;
        Ok(maybe_secs.map(Duration::from_secs))
    }
}
/// Unit tests for the event, message, configuration and error types above.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // ---- ShipEvent ----

    #[test]
    fn ship_event_new_generates_timestamp() {
        let event = ShipEvent::new("test.event", "test-source", json!({"key": "value"}));
        assert!(!event.timestamp.is_empty());
        // RFC 3339 timestamps separate date and time with 'T'.
        assert!(event.timestamp.contains('T'));
    }

    #[test]
    fn ship_event_new_generates_correlation_id() {
        let event = ShipEvent::new("test.event", "test-source", json!(null));
        assert_eq!(event.correlation_id.len(), 8);
    }

    #[test]
    fn ship_event_defaults_version_to_1_0() {
        let event = ShipEvent::new("test.event", "test-source", json!(null));
        assert_eq!(event.version, "1.0");
    }

    #[test]
    fn ship_event_serialization_roundtrip() {
        let event = ShipEvent::new("test.event", "source", json!({"count": 42}));
        let json_str = serde_json::to_string(&event).unwrap();
        let deserialized: ShipEvent = serde_json::from_str(&json_str).unwrap();
        assert_eq!(deserialized.event_type, "test.event");
        assert_eq!(deserialized.source, "source");
        assert_eq!(deserialized.payload, json!({"count": 42}));
        assert_eq!(deserialized.version, "1.0");
    }

    #[test]
    fn ship_event_with_correlation() {
        let event = ShipEvent::with_correlation("test", "src", json!(null), "abcd1234");
        assert_eq!(event.correlation_id, "abcd1234");
    }

    // ---- Event ----

    #[test]
    fn event_new_generates_timestamp() {
        let event = Event::new("heartbeat", "agent-1", json!(null));
        assert!(!event.timestamp.is_empty());
        assert!(event.timestamp.contains('T'));
    }

    #[test]
    fn event_has_no_correlation_by_default() {
        let event = Event::new("heartbeat", "agent-1", json!(null));
        assert!(event.correlation_id.is_none());
    }

    #[test]
    fn event_serialization_roundtrip() {
        let event = Event::new("started", "mini", json!({"pid": 1234}));
        let json_str = serde_json::to_string(&event).unwrap();
        let deserialized: Event = serde_json::from_str(&json_str).unwrap();
        assert_eq!(deserialized.event_type, "started");
        assert_eq!(deserialized.source, "mini");
    }

    // ---- ChannelMessage ----

    #[test]
    fn channel_message_generates_id() {
        let msg = ChannelMessage::new("alice", "hello", "general");
        assert!(msg.message_id.starts_with("general-"));
        assert!(msg.timestamp > 0.0);
    }

    #[test]
    fn channel_message_serialization_roundtrip() {
        let msg = ChannelMessage::new("bob", "world", "dev");
        let json_str = serde_json::to_string(&msg).unwrap();
        let deserialized: ChannelMessage = serde_json::from_str(&json_str).unwrap();
        assert_eq!(deserialized.sender, "bob");
        assert_eq!(deserialized.content, "world");
        assert_eq!(deserialized.channel, "dev");
    }

    #[test]
    fn channel_message_metadata_none_skipped_in_json() {
        let msg = ChannelMessage::new("alice", "hi", "ch");
        let json_str = serde_json::to_string(&msg).unwrap();
        assert!(!json_str.contains("metadata"));
    }

    #[test]
    fn channel_message_with_metadata() {
        let msg = ChannelMessage::with_metadata("alice", "hi", "ch", json!({"priority": "high"}));
        assert!(msg.metadata.is_some());
        let json_str = serde_json::to_string(&msg).unwrap();
        assert!(json_str.contains("priority"));
    }

    // ---- NatsConfig ----

    #[test]
    fn nats_config_defaults() {
        let config = NatsConfig::default();
        assert_eq!(config.url, "nats://localhost:4222");
        assert_eq!(config.connect_timeout, Duration::from_secs(5));
        assert_eq!(config.request_timeout, Duration::from_secs(10));
    }

    #[test]
    fn nats_config_custom_url() {
        let config = NatsConfig::new("nats://192.168.8.110:4222");
        assert_eq!(config.url, "nats://192.168.8.110:4222");
    }

    #[test]
    fn nats_config_builder_pattern() {
        let config = NatsConfig::new("nats://localhost:4222")
            .with_connect_timeout(Duration::from_secs(30))
            .with_request_timeout(Duration::from_secs(60));
        assert_eq!(config.connect_timeout, Duration::from_secs(30));
        assert_eq!(config.request_timeout, Duration::from_secs(60));
    }

    #[test]
    fn nats_config_serialization_roundtrip() {
        let config = NatsConfig::new("nats://example.com:4222");
        let json_str = serde_json::to_string(&config).unwrap();
        let deserialized: NatsConfig = serde_json::from_str(&json_str).unwrap();
        assert_eq!(deserialized.url, "nats://example.com:4222");
    }

    // ---- StreamConfig ----

    #[test]
    fn stream_config_defaults() {
        let config = StreamConfig::new("EVENTS", vec!["events.>".to_string()]);
        assert_eq!(config.max_age_secs, 86400);
        assert_eq!(config.max_msgs, 100_000);
        assert_eq!(config.storage, "file");
    }

    #[test]
    fn stream_config_builder() {
        let config = StreamConfig::new("TEST", vec!["test.>".to_string()])
            .with_max_age(3600)
            .with_max_msgs(1000)
            .with_memory_storage();
        assert_eq!(config.max_age_secs, 3600);
        assert_eq!(config.max_msgs, 1000);
        assert_eq!(config.storage, "memory");
    }

    // ---- KvBucketConfig ----

    #[test]
    fn kv_bucket_config_defaults() {
        let config = KvBucketConfig::new("test_bucket");
        assert_eq!(config.bucket, "test_bucket");
        assert_eq!(config.history, 5);
        assert!(config.ttl.is_none());
    }

    #[test]
    fn kv_bucket_config_with_ttl() {
        let config = KvBucketConfig::new("status").with_ttl_secs(300);
        assert_eq!(config.ttl, Some(Duration::from_secs(300)));
    }

    #[test]
    fn kv_bucket_config_with_history() {
        let config = KvBucketConfig::new("status").with_history(10);
        assert_eq!(config.history, 10);
    }

    // ---- Error ----

    #[test]
    fn error_display_connection() {
        let err = Error::Connection("refused".to_string());
        assert_eq!(err.to_string(), "connection error: refused");
    }

    #[test]
    fn error_display_not_connected() {
        let err = Error::NotConnected;
        assert_eq!(err.to_string(), "not connected to NATS server");
    }

    #[test]
    fn error_display_key_not_found() {
        let err = Error::KeyNotFound("my_key".to_string());
        assert_eq!(err.to_string(), "key not found: my_key");
    }

    #[test]
    fn error_display_timeout() {
        let err = Error::Timeout(Duration::from_secs(5));
        assert_eq!(err.to_string(), "operation timed out after 5s");
    }

    #[test]
    fn error_from_serde_json() {
        let result: std::result::Result<serde_json::Value, _> = serde_json::from_str("not json");
        let err: Error = result.unwrap_err().into();
        // `matches!` only yields a bool; it must be asserted or the test can
        // never fail.
        assert!(matches!(err, Error::Serialization(_)));
    }
}