use crate::connection::ConnectionManager;
use crate::types::{Error, Event, NatsConfig, Result};
use futures::StreamExt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;
#[derive(Debug, Clone)]
pub enum EventType {
Custom(String),
Heartbeat,
Started,
Stopped,
Error,
}
impl EventType {
pub fn subject(&self) -> &str {
match self {
EventType::Custom(s) => s,
EventType::Heartbeat => "heartbeat",
EventType::Started => "started",
EventType::Stopped => "stopped",
EventType::Error => "error",
}
}
pub fn with_entity(&self, entity_id: &str) -> String {
format!("{}.{}", entity_id, self.subject())
}
}
pub type EventHandler =
Box<dyn Fn(Event) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
pub struct Publisher {
source_id: String,
conn: ConnectionManager,
}
impl Publisher {
pub fn new(source_id: impl Into<String>, config: NatsConfig) -> Self {
Self {
source_id: source_id.into(),
conn: ConnectionManager::new(config),
}
}
pub async fn connect(&mut self) -> Result<()> {
self.conn.connect().await
}
pub async fn disconnect(&mut self) -> Result<()> {
self.conn.disconnect().await
}
pub async fn publish(&self, subject: &str, payload: &[u8]) -> Result<()> {
let client = self.conn.client()?;
client
.publish(subject.to_string(), payload.to_vec().into())
.await
.map_err(|e| Error::Connection(e.to_string()))
}
pub async fn emit(&self, event_type: &EventType, payload: serde_json::Value) -> Result<()> {
let event = Event::new(event_type.subject(), &self.source_id, payload);
let data = serde_json::to_vec(&event)?;
self.publish(event_type.subject(), &data).await
}
pub async fn emit_being(
&self,
being_id: &str,
event_type: &EventType,
payload: serde_json::Value,
) -> Result<()> {
let subject = event_type.with_entity(being_id);
let event = Event::new(event_type.subject(), &self.source_id, payload);
let data = serde_json::to_vec(&event)?;
self.publish(&subject, &data).await
}
}
pub struct Subscriber {
subscriber_id: String,
conn: ConnectionManager,
handlers: Vec<(String, Arc<EventHandler>)>,
shutdown: tokio::sync::watch::Sender<bool>,
shutdown_rx: tokio::sync::watch::Receiver<bool>,
}
impl Subscriber {
pub fn new(subscriber_id: impl Into<String>, config: NatsConfig) -> Self {
let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
Self {
subscriber_id: subscriber_id.into(),
conn: ConnectionManager::new(config),
handlers: Vec::new(),
shutdown,
shutdown_rx,
}
}
pub async fn connect(&mut self) -> Result<()> {
self.conn.connect().await
}
pub async fn disconnect(&mut self) -> Result<()> {
let _ = self.shutdown.send(true);
self.conn.disconnect().await
}
pub fn subscribe<F, Fut>(&mut self, subject: impl Into<String>, handler: F)
where
F: Fn(Event) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let handler: EventHandler = Box::new(move |event| Box::pin(handler(event)));
self.handlers.push((subject.into(), Arc::new(handler)));
}
pub async fn run(&self) -> Result<()> {
let client = self.conn.client()?;
let mut tasks = Vec::new();
for (subject, handler) in &self.handlers {
let mut sub = client
.subscribe(subject.clone())
.await
.map_err(|e| Error::Connection(e.to_string()))?;
let handler = Arc::clone(handler);
let mut shutdown_rx = self.shutdown_rx.clone();
let _sub_id = self.subscriber_id.clone();
tasks.push(tokio::spawn(async move {
loop {
tokio::select! {
msg = sub.next() => {
if let Some(msg) = msg {
if let Ok(event) = serde_json::from_slice::<Event>(&msg.payload) {
handler(event).await;
}
} else {
break;
}
}
_ = shutdown_rx.changed() => {
break;
}
}
}
}));
}
for task in tasks {
let _ = task.await;
}
Ok(())
}
}
pub struct PubSub {
source_id: String,
conn: Arc<Mutex<ConnectionManager>>,
handlers: Vec<(String, Arc<EventHandler>)>,
shutdown: tokio::sync::watch::Sender<bool>,
shutdown_rx: tokio::sync::watch::Receiver<bool>,
}
impl PubSub {
pub fn new(bus_id: impl Into<String>, config: NatsConfig) -> Self {
let (shutdown, shutdown_rx) = tokio::sync::watch::channel(false);
Self {
source_id: bus_id.into(),
conn: Arc::new(Mutex::new(ConnectionManager::new(config))),
handlers: Vec::new(),
shutdown,
shutdown_rx,
}
}
pub async fn connect(&self) -> Result<()> {
self.conn.lock().await.connect().await
}
pub fn subscribe<F, Fut>(&mut self, subject: impl Into<String>, handler: F)
where
F: Fn(Event) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let handler: EventHandler = Box::new(move |event| Box::pin(handler(event)));
self.handlers.push((subject.into(), Arc::new(handler)));
}
pub async fn emit(&self, event_type: &EventType, payload: serde_json::Value) -> Result<()> {
let event = Event::new(event_type.subject(), &self.source_id, payload);
let data = serde_json::to_vec(&event)?;
let conn = self.conn.lock().await;
let client = conn.client()?;
client
.publish(event_type.subject().to_string(), data.into())
.await
.map_err(|e| Error::Connection(e.to_string()))
}
pub async fn publish(&self, subject: &str, payload: &[u8]) -> Result<()> {
let conn = self.conn.lock().await;
let client = conn.client()?;
client
.publish(subject.to_string(), payload.to_vec().into())
.await
.map_err(|e| Error::Connection(e.to_string()))
}
pub async fn run(&self) -> Result<()> {
let conn = self.conn.lock().await;
let client = conn.client()?.clone();
drop(conn);
let mut tasks = Vec::new();
for (subject, handler) in &self.handlers {
let mut sub = client
.subscribe(subject.clone())
.await
.map_err(|e| Error::Connection(e.to_string()))?;
let handler = Arc::clone(handler);
let mut shutdown_rx = self.shutdown_rx.clone();
tasks.push(tokio::spawn(async move {
loop {
tokio::select! {
msg = sub.next() => {
if let Some(msg) = msg {
if let Ok(event) = serde_json::from_slice::<Event>(&msg.payload) {
handler(event).await;
}
} else {
break;
}
}
_ = shutdown_rx.changed() => {
break;
}
}
}
}));
}
for task in tasks {
let _ = task.await;
}
Ok(())
}
pub fn shutdown(&self) {
let _ = self.shutdown.send(true);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn event_type_subject() {
assert_eq!(EventType::Heartbeat.subject(), "heartbeat");
assert_eq!(EventType::Started.subject(), "started");
assert_eq!(EventType::Stopped.subject(), "stopped");
assert_eq!(EventType::Error.subject(), "error");
assert_eq!(EventType::Custom("my.event".into()).subject(), "my.event");
}
#[test]
fn event_type_with_entity() {
assert_eq!(
EventType::Heartbeat.with_entity("being-1"),
"being-1.heartbeat"
);
assert_eq!(
EventType::Custom("status".into()).with_entity("mini"),
"mini.status"
);
}
#[test]
fn publisher_new_not_connected() {
let pub_ = Publisher::new("test", NatsConfig::default());
assert!(!pub_.conn.is_connected());
}
#[test]
fn subscriber_new_not_connected() {
let sub = Subscriber::new("test", NatsConfig::default());
assert!(!sub.conn.is_connected());
}
#[test]
fn pubsub_new() {
let _bus = PubSub::new("test-bus", NatsConfig::default());
}
#[test]
fn subscriber_register_handler() {
let mut sub = Subscriber::new("test", NatsConfig::default());
sub.subscribe("test.>", |_event| async {});
assert_eq!(sub.handlers.len(), 1);
}
#[test]
fn pubsub_register_handler() {
let mut bus = PubSub::new("test", NatsConfig::default());
bus.subscribe("events.>", |_event| async {});
assert_eq!(bus.handlers.len(), 1);
}
}