use async_trait::async_trait;
use futures_util::Stream;
use std::pin::Pin;
pub mod memory;
pub use memory::InMemoryNotifier;
/// Notification payload passed to [`ActivityNotifier::publish`] and yielded by
/// an [`EventStream`].
///
/// Carries a tenant identifier, a conversation identifier and a `u64`
/// watermark value.
/// NOTE(review): the exact meaning of the watermark (e.g. a sequence number)
/// is not defined in this file — confirm with the producers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NotifyEvent {
/// Identifier of the tenant the event belongs to.
pub tenant_id: String,
/// Identifier of the conversation the event refers to.
pub conversation_id: String,
/// New watermark value announced by this event
/// (presumably monotonically increasing — TODO confirm).
pub new_watermark: u64,
}
/// Errors reported by [`ActivityNotifier`] operations.
///
/// In this file only [`ActivityNotifier::subscribe`] returns this type. Each
/// variant wraps a detail string that is interpolated into its `Display`
/// message.
#[derive(Debug, thiserror::Error)]
pub enum NotifierError {
/// Creating a subscription failed; displayed as `subscribe failed: <detail>`.
#[error("subscribe failed: {0}")]
Subscribe(String),
/// The backend disconnected; displayed as `backend disconnected: <detail>`.
#[error("backend disconnected: {0}")]
Disconnected(String),
}
/// Pinned, boxed stream of [`NotifyEvent`]s, as returned by
/// [`ActivityNotifier::subscribe`].
///
/// The trait object is `Send + 'static`, so it can be sent across threads and
/// holds no non-`'static` borrows. Items are plain `NotifyEvent` values rather
/// than `Result`s, so the stream has no in-band way to report an error.
pub type EventStream = Pin<Box<dyn Stream<Item = NotifyEvent> + Send + 'static>>;
/// Backend-agnostic interface for publishing and subscribing to
/// [`NotifyEvent`]s.
///
/// Implementors must be `Send + Sync + 'static`; [`build_notifier`] hands them
/// out as `Arc<dyn ActivityNotifier>`.
#[async_trait]
pub trait ActivityNotifier: Send + Sync + 'static {
/// Publishes `event` through the backend.
///
/// The method returns `()`, so a delivery failure cannot be reported to the
/// caller through this signature.
async fn publish(&self, event: NotifyEvent);
/// Opens a stream of events for the given tenant and conversation.
///
/// NOTE(review): presumably only events whose `tenant_id` and
/// `conversation_id` match the arguments are delivered; the filtering is
/// implemented by each backend — confirm against the implementations.
///
/// # Errors
///
/// Returns a [`NotifierError`] when the backend reports that the
/// subscription could not be created.
async fn subscribe(
&self,
tenant_id: &str,
conversation_id: &str,
) -> Result<EventStream, NotifierError>;
}
/// Selects which [`ActivityNotifier`] backend [`build_notifier`] constructs.
#[derive(Debug, Clone)]
pub enum NotifierConfig {
    /// In-process backend; `capacity` is forwarded to `InMemoryNotifier::new`.
    Memory { capacity: usize },
}

impl Default for NotifierConfig {
    /// Returns the in-memory backend configured with a capacity of 64.
    fn default() -> Self {
        // Named so the default is not an anonymous magic number.
        const DEFAULT_MEMORY_CAPACITY: usize = 64;

        Self::Memory {
            capacity: DEFAULT_MEMORY_CAPACITY,
        }
    }
}
/// Builds the notifier backend selected by `config` and returns it as a
/// shared trait object.
///
/// The `match` is kept exhaustive (no wildcard arm) so that adding a
/// [`NotifierConfig`] variant forces this function to be updated.
pub fn build_notifier(config: NotifierConfig) -> std::sync::Arc<dyn ActivityNotifier> {
    use std::sync::Arc;

    match config {
        NotifierConfig::Memory { capacity } => {
            let backend = InMemoryNotifier::new(capacity);
            Arc::new(backend)
        }
    }
}
#[cfg(test)]
mod build_tests {
    use super::*;
    use futures_util::StreamExt;

    /// The default configuration must yield a working backend: an event
    /// published after subscribing to the same tenant/conversation pair is
    /// delivered to the subscriber.
    #[tokio::test]
    async fn build_default_returns_memory_backend() {
        let notifier = build_notifier(NotifierConfig::default());

        // Subscribe before publishing so the event has a receiver.
        let mut events = notifier.subscribe("t", "c").await.unwrap();

        let published = NotifyEvent {
            tenant_id: "t".into(),
            conversation_id: "c".into(),
            new_watermark: 1,
        };
        notifier.publish(published).await;

        let delivered = events.next().await.unwrap();
        assert_eq!(delivered.new_watermark, 1);
    }
}