// greentic_start/notifier/mod.rs
use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use futures_util::Stream;

pub mod config;
pub mod memory;
pub mod redis;

pub use memory::InMemoryNotifier;

/// Event carried by an [`ActivityNotifier`]: it is what `publish` accepts and
/// what an [`EventStream`] yields.
///
/// NOTE(review): `new_watermark` is presumably the conversation's watermark
/// after the change being announced — confirm against the publishers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NotifyEvent {
    /// Tenant identifier.
    pub tenant_id: String,
    /// Conversation identifier.
    pub conversation_id: String,
    /// Watermark value carried by the event.
    pub new_watermark: u64,
}

24#[derive(Debug, thiserror::Error)]
25pub enum NotifierError {
26 #[error("subscribe failed: {0}")]
27 Subscribe(String),
28 #[error("backend disconnected: {0}")]
29 Disconnected(String),
30}
31
32pub type EventStream = Pin<Box<dyn Stream<Item = NotifyEvent> + Send + 'static>>;
33
34#[async_trait]
35pub trait ActivityNotifier: Send + Sync + 'static {
36 async fn publish(&self, event: NotifyEvent);
38
39 async fn subscribe(
42 &self,
43 tenant_id: &str,
44 conversation_id: &str,
45 ) -> Result<EventStream, NotifierError>;
46}
47
48#[derive(Debug, Clone, serde::Deserialize)]
53#[serde(tag = "backend", rename_all = "lowercase")]
54pub enum NotifierConfig {
55 Memory {
56 #[serde(default = "default_capacity")]
57 capacity: usize,
58 },
59 Redis {
60 #[serde(default)]
63 url: Option<String>,
64 #[serde(default)]
66 channel: Option<String>,
67 #[serde(default = "default_capacity")]
70 capacity: usize,
71 },
72}
73
/// Fallback used by serde for the `capacity` fields of `NotifierConfig` when
/// the key is absent from the input.
fn default_capacity() -> usize {
    64
}

78impl Default for NotifierConfig {
79 fn default() -> Self {
80 NotifierConfig::Memory { capacity: 64 }
81 }
82}
83
84pub async fn build_notifier(
85 config: NotifierConfig,
86) -> anyhow::Result<std::sync::Arc<dyn ActivityNotifier>> {
87 match config {
88 NotifierConfig::Memory { capacity } => {
89 Ok(std::sync::Arc::new(InMemoryNotifier::new(capacity)))
90 }
91 NotifierConfig::Redis {
92 url,
93 channel,
94 capacity,
95 } => {
96 let url = url.ok_or_else(|| {
97 anyhow::anyhow!(
98 "Redis notifier built without a URL — call resolve_notifier_config first"
99 )
100 })?;
101 let notifier =
102 crate::notifier::redis::RedisNotifier::build(&url, channel, capacity).await?;
103 Ok(notifier as std::sync::Arc<dyn ActivityNotifier>)
104 }
105 }
106}
107
#[cfg(test)]
mod build_tests {
    use super::*;
    use futures_util::StreamExt;

    /// The default config must produce a working backend: an event published
    /// after subscribing with the same tenant/conversation reaches the stream.
    #[tokio::test]
    async fn build_default_returns_memory_backend() {
        let notifier = build_notifier(NotifierConfig::default())
            .await
            .expect("build");
        let mut stream = notifier.subscribe("t", "c").await.expect("subscribe");
        notifier
            .publish(NotifyEvent {
                tenant_id: "t".into(),
                conversation_id: "c".into(),
                new_watermark: 1,
            })
            .await;
        let received = stream.next().await.expect("stream ended before event");
        assert_eq!(received.new_watermark, 1);
    }
}

#[cfg(test)]
mod config_tests {
    use super::*;

    /// `backend: memory` with no other keys falls back to the default capacity.
    #[test]
    fn notifier_config_serde_default_yaml_empty() {
        let cfg: NotifierConfig = serde_yaml_bw::from_str("backend: memory").expect("parse");
        match cfg {
            NotifierConfig::Memory { capacity } => assert_eq!(capacity, 64),
            other => panic!("expected Memory variant, got {:?}", other),
        }
    }

    /// `backend: redis` alone parses: URL and channel stay unset, capacity defaults.
    #[test]
    fn notifier_config_serde_redis_minimal() {
        let yaml = "backend: redis";
        let cfg: NotifierConfig = serde_yaml_bw::from_str(yaml).expect("parse");
        match cfg {
            NotifierConfig::Redis {
                url,
                channel,
                capacity,
            } => {
                assert!(url.is_none());
                assert!(channel.is_none());
                assert_eq!(capacity, 64);
            }
            other => panic!("expected Redis variant, got {:?}", other),
        }
    }

    /// Every Redis key given explicitly is carried through unchanged.
    #[test]
    fn notifier_config_serde_redis_full() {
        // Built with `concat!` so the YAML does not depend on source indentation.
        let yaml = concat!(
            "backend: redis\n",
            "url: redis://localhost:6379\n",
            "channel: greentic:webchat:notify\n",
            "capacity: 128\n",
        );
        let cfg: NotifierConfig = serde_yaml_bw::from_str(yaml).expect("parse");
        match cfg {
            NotifierConfig::Redis {
                url,
                channel,
                capacity,
            } => {
                assert_eq!(url.as_deref(), Some("redis://localhost:6379"));
                assert_eq!(channel.as_deref(), Some("greentic:webchat:notify"));
                assert_eq!(capacity, 128);
            }
            other => panic!("expected Redis variant, got {:?}", other),
        }
    }
}