use crate::connection::ConnectionManager;
use crate::types::{Error, NatsConfig, Result, ShipEvent, StreamConfig};
use futures::StreamExt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;
pub fn default_stream_config() -> StreamConfig {
StreamConfig::new("SHIP_EVENTS", vec!["ship.events.>".to_string()])
}
pub type ShipEventHandler =
Box<dyn Fn(ShipEvent) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
pub struct EventBus {
config: NatsConfig,
stream_config: StreamConfig,
subject_prefix: String,
conn: Arc<Mutex<ConnectionManager>>,
source: String,
}
impl EventBus {
pub fn new(config: NatsConfig) -> Self {
Self::with_stream(config, default_stream_config(), "ship.events")
}
pub fn with_stream(
config: NatsConfig,
stream_config: StreamConfig,
subject_prefix: impl Into<String>,
) -> Self {
Self {
conn: Arc::new(Mutex::new(ConnectionManager::new(config.clone()))),
config,
stream_config,
subject_prefix: subject_prefix.into(),
source: String::new(),
}
}
pub fn with_source(mut self, source: impl Into<String>) -> Self {
self.source = source.into();
self
}
pub async fn connect(&self) -> Result<()> {
let mut conn = self.conn.lock().await;
conn.connect().await?;
conn.ensure_stream(&self.stream_config).await?;
Ok(())
}
pub async fn disconnect(&self) -> Result<()> {
self.conn.lock().await.disconnect().await
}
pub async fn is_connected(&self) -> bool {
self.conn.lock().await.is_connected()
}
pub async fn emit(
&self,
event_type: &str,
source: &str,
payload: serde_json::Value,
correlation_id: Option<&str>,
) -> Result<()> {
let subject = format!("{}.{}", self.subject_prefix, event_type);
let event = match correlation_id {
Some(cid) => ShipEvent::with_correlation(event_type, source, payload, cid),
None => ShipEvent::new(event_type, source, payload),
};
let data = serde_json::to_vec(&event)?;
let conn = self.conn.lock().await;
let js = conn.jetstream()?;
js.publish(subject, data.into())
.await
.map_err(|e| Error::JetStream(e.to_string()))?
.await
.map_err(|e| Error::JetStream(e.to_string()))?;
Ok(())
}
pub async fn emit_event(&self, event_type: &str, payload: serde_json::Value) -> Result<()> {
self.emit(event_type, &self.source, payload, None).await
}
pub async fn subscribe(
&self,
pattern: &str,
consumer_name: &str,
handler: impl Fn(ShipEvent) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
) -> Result<tokio::task::JoinHandle<()>> {
let full_pattern = format!("{}.{}", self.subject_prefix, pattern);
let conn = self.conn.lock().await;
let js = conn.jetstream()?;
let consumer = js
.create_consumer_on_stream(
async_nats::jetstream::consumer::push::Config {
filter_subject: full_pattern,
durable_name: Some(consumer_name.to_string()),
deliver_subject: format!("_deliver.{}.{}", consumer_name, uuid::Uuid::new_v4()),
..Default::default()
},
&self.stream_config.name,
)
.await
.map_err(|e| Error::JetStream(e.to_string()))?;
let handler = Arc::new(handler);
let handle = tokio::spawn(async move {
let mut messages = match consumer.messages().await {
Ok(m) => m,
Err(_) => return,
};
while let Some(Ok(msg)) = messages.next().await {
if let Ok(event) = serde_json::from_slice::<ShipEvent>(&msg.payload) {
handler(event).await;
}
let _ = msg.ack().await;
}
});
Ok(handle)
}
pub fn config(&self) -> &NatsConfig {
&self.config
}
pub fn stream_config(&self) -> &StreamConfig {
&self.stream_config
}
}
static GLOBAL_EVENT_BUS: tokio::sync::OnceCell<EventBus> = tokio::sync::OnceCell::const_new();
pub async fn init_global_event_bus(config: NatsConfig) -> Result<()> {
let bus = EventBus::new(config).with_source("global");
bus.connect().await?;
GLOBAL_EVENT_BUS
.set(bus)
.map_err(|_| Error::Connection("global event bus already initialized".into()))
}
pub async fn emit_event(event_type: &str, source: &str, payload: serde_json::Value) -> Result<()> {
let bus = GLOBAL_EVENT_BUS.get().ok_or(Error::NotConnected)?;
bus.emit(event_type, source, payload, None).await
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn default_stream_config_values() {
let config = default_stream_config();
assert_eq!(config.name, "SHIP_EVENTS");
assert_eq!(config.subjects, vec!["ship.events.>"]);
assert_eq!(config.max_age_secs, 86400); assert_eq!(config.max_msgs, 100_000);
assert_eq!(config.storage, "file");
}
#[test]
fn event_bus_new_defaults() {
let bus = EventBus::new(NatsConfig::default());
assert_eq!(bus.subject_prefix, "ship.events");
assert_eq!(bus.stream_config.name, "SHIP_EVENTS");
}
#[test]
fn event_bus_with_custom_stream() {
let stream = StreamConfig::new("CUSTOM", vec!["custom.>".to_string()])
.with_max_age(3600)
.with_memory_storage();
let bus = EventBus::with_stream(NatsConfig::default(), stream, "custom");
assert_eq!(bus.subject_prefix, "custom");
assert_eq!(bus.stream_config.name, "CUSTOM");
assert_eq!(bus.stream_config.max_age_secs, 3600);
}
#[test]
fn event_bus_with_source() {
let bus = EventBus::new(NatsConfig::default()).with_source("kanban-server");
assert_eq!(bus.source, "kanban-server");
}
#[test]
fn ship_event_envelope_subject_format() {
let prefix = "ship.events";
let event_type = "item.created";
let subject = format!("{}.{}", prefix, event_type);
assert_eq!(subject, "ship.events.item.created");
}
#[test]
fn ship_event_envelope_creation() {
let event = ShipEvent::new("item.created", "kanban-server", json!({"id": "EX-3001"}));
assert_eq!(event.event_type, "item.created");
assert_eq!(event.source, "kanban-server");
assert_eq!(event.version, "1.0");
assert_eq!(event.correlation_id.len(), 8);
}
#[test]
fn ship_event_with_explicit_correlation() {
let event = ShipEvent::with_correlation(
"item.moved",
"server",
json!({"from": "backlog", "to": "in_progress"}),
"abc12345",
);
assert_eq!(event.correlation_id, "abc12345");
}
}