Skip to main content

greentic_start/notifier/
mod.rs

1//! Activity push notifier — informs WS sessions when a conversation has new activities.
2//!
//! Two backends (in-memory and Redis) are supported via the `ActivityNotifier` trait. This
//! module ships the trait, types, and backend selection. NATS lives in a follow-up plan.
5
6use async_trait::async_trait;
7use futures_util::Stream;
8use std::pin::Pin;
9
10pub mod config;
11pub mod memory;
12pub mod redis;
13
14pub use memory::InMemoryNotifier;
15
/// Identifies an activity-write event for a single conversation.
///
/// This is the payload accepted by [`ActivityNotifier::publish`] and the item
/// type yielded by an [`EventStream`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NotifyEvent {
    /// Tenant half of the (tenant, conversation) subscription key.
    pub tenant_id: String,
    /// Conversation half of the (tenant, conversation) subscription key.
    pub conversation_id: String,
    /// Watermark carried with the event.
    // NOTE(review): presumably the conversation's latest activity position after
    // the write — confirm against the activity store before relying on ordering.
    pub new_watermark: u64,
}
23
/// Errors reported by notifier backends.
///
/// This is the error type of [`ActivityNotifier::subscribe`]. Each variant
/// carries a free-form detail string that is interpolated into its `Display`
/// message.
#[derive(Debug, thiserror::Error)]
pub enum NotifierError {
    /// A subscription could not be established.
    #[error("subscribe failed: {0}")]
    Subscribe(String),
    /// The backend connection is gone.
    // NOTE(review): which backends emit this variant is not visible from this
    // module — confirm in the backend implementations.
    #[error("backend disconnected: {0}")]
    Disconnected(String),
}
31
/// Owned, type-erased stream of [`NotifyEvent`]s, as returned by
/// [`ActivityNotifier::subscribe`].
///
/// The stream is boxed and pinned behind a trait object, so the alias does not
/// expose any backend-specific stream type; it is also `Send + 'static`.
pub type EventStream = Pin<Box<dyn Stream<Item = NotifyEvent> + Send + 'static>>;
33
/// Publish/subscribe interface for conversation activity notifications.
///
/// `build_notifier` hands implementations out as `Arc<dyn ActivityNotifier>`,
/// so the trait is object-safe (via `async_trait`) and requires
/// `Send + Sync + 'static`.
#[async_trait]
pub trait ActivityNotifier: Send + Sync + 'static {
    /// Fire-and-forget publish. Failures are logged but not propagated.
    ///
    /// The method returns nothing, so callers cannot observe delivery failures.
    async fn publish(&self, event: NotifyEvent);

    /// Subscribe to events for a specific (tenant, conversation_id). Drop the
    /// returned stream to unsubscribe.
    ///
    /// # Errors
    ///
    /// Returns a [`NotifierError`] when the backend cannot set up the
    /// subscription.
    async fn subscribe(
        &self,
        tenant_id: &str,
        conversation_id: &str,
    ) -> Result<EventStream, NotifierError>;
}
47
/// Backend selector for `build_notifier`.
///
/// Deserialized from the `webchat.notifier` section of `greentic.yaml`.
/// Absent or unset → defaults to `Memory { capacity: 64 }`.
///
/// The enum is internally tagged on the `backend` key with lowercase variant
/// names, so the YAML form is `backend: memory` or `backend: redis` followed by
/// the variant's own fields.
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(tag = "backend", rename_all = "lowercase")]
pub enum NotifierConfig {
    /// In-process backend; `build_notifier` turns it into
    /// `InMemoryNotifier::new(capacity)`.
    Memory {
        /// Capacity handed to `InMemoryNotifier::new`. Falls back to
        /// `default_capacity()` (64) when the key is omitted.
        #[serde(default = "default_capacity")]
        capacity: usize,
    },
    /// Redis backend; `build_notifier` turns it into
    /// `RedisNotifier::build(url, channel, capacity)`.
    Redis {
        /// Optional explicit URL. If `None`, resolved from the state-redis
        /// provider's `ConfigEnvelope` at boot time. `build_notifier` returns
        /// an error if the URL is still `None` when it is called.
        #[serde(default)]
        url: Option<String>,
        /// Channel name override. Default: `greentic:webchat:notify`.
        #[serde(default)]
        channel: Option<String>,
        /// Local in-memory broadcast capacity (forwarded to the inner
        /// `InMemoryNotifier`). Falls back to `default_capacity()` (64) when
        /// the key is omitted.
        #[serde(default = "default_capacity")]
        capacity: usize,
    },
}
73
/// Serde fallback for the `capacity` fields of `NotifierConfig`.
fn default_capacity() -> usize {
    const DEFAULT_CAPACITY: usize = 64;
    DEFAULT_CAPACITY
}
77
78impl Default for NotifierConfig {
79    fn default() -> Self {
80        NotifierConfig::Memory { capacity: 64 }
81    }
82}
83
84pub async fn build_notifier(
85    config: NotifierConfig,
86) -> anyhow::Result<std::sync::Arc<dyn ActivityNotifier>> {
87    match config {
88        NotifierConfig::Memory { capacity } => {
89            Ok(std::sync::Arc::new(InMemoryNotifier::new(capacity)))
90        }
91        NotifierConfig::Redis {
92            url,
93            channel,
94            capacity,
95        } => {
96            let url = url.ok_or_else(|| {
97                anyhow::anyhow!(
98                    "Redis notifier built without a URL — call resolve_notifier_config first"
99                )
100            })?;
101            let notifier =
102                crate::notifier::redis::RedisNotifier::build(&url, channel, capacity).await?;
103            Ok(notifier as std::sync::Arc<dyn ActivityNotifier>)
104        }
105    }
106}
107
#[cfg(test)]
mod build_tests {
    use super::*;
    use futures_util::StreamExt;

    /// Building from the default config must yield a usable backend: an event
    /// published after subscribing reaches that subscriber.
    #[tokio::test]
    async fn build_default_returns_memory_backend() {
        let backend = build_notifier(NotifierConfig::default())
            .await
            .expect("build");
        let mut events = backend.subscribe("t", "c").await.unwrap();

        let event = NotifyEvent {
            tenant_id: "t".into(),
            conversation_id: "c".into(),
            new_watermark: 1,
        };
        backend.publish(event).await;

        let delivered = events.next().await.unwrap();
        assert_eq!(delivered.new_watermark, 1);
    }
}
129
/// Deserialization tests for [`NotifierConfig`].
#[cfg(test)]
mod config_tests {
    use super::*;

    /// `backend: memory` with no `capacity` key falls back to the serde
    /// default capacity.
    #[test]
    fn notifier_config_serde_default_yaml_empty() {
        // Only the `backend` tag is given (the enum is internally tagged, so the
        // tag itself is supplied here); the omitted `capacity` must default to 64.
        let cfg: NotifierConfig = serde_yaml_bw::from_str("backend: memory").expect("parse");
        match cfg {
            NotifierConfig::Memory { capacity } => assert_eq!(capacity, 64),
            // Print what was actually parsed so a failure is diagnosable.
            other => panic!("expected Memory variant, got {:?}", other),
        }
    }

    /// `backend: redis` alone leaves both optional fields unset and applies
    /// the default capacity.
    #[test]
    fn notifier_config_serde_redis_minimal() {
        let yaml = "backend: redis";
        let cfg: NotifierConfig = serde_yaml_bw::from_str(yaml).expect("parse");
        match cfg {
            NotifierConfig::Redis {
                url,
                channel,
                capacity,
            } => {
                assert!(url.is_none());
                assert!(channel.is_none());
                assert_eq!(capacity, 64);
            }
            other => panic!("expected Redis variant, got {:?}", other),
        }
    }

    /// Every Redis field given explicitly is carried through unchanged.
    #[test]
    fn notifier_config_serde_redis_full() {
        let yaml = "\
backend: redis
url: redis://localhost:6379
channel: greentic:webchat:notify
capacity: 128
";
        let cfg: NotifierConfig = serde_yaml_bw::from_str(yaml).expect("parse");
        match cfg {
            NotifierConfig::Redis {
                url,
                channel,
                capacity,
            } => {
                assert_eq!(url.as_deref(), Some("redis://localhost:6379"));
                assert_eq!(channel.as_deref(), Some("greentic:webchat:notify"));
                assert_eq!(capacity, 128);
            }
            other => panic!("expected Redis variant, got {:?}", other),
        }
    }
}