1pub mod counter;
21pub mod device_tokens;
22pub mod in_app;
23pub mod preferences;
24pub mod push;
25pub mod webhook;
26pub mod websocket;
27
28pub use counter::{CounterConfig, CounterOperation, CounterSink, EntityFieldUpdater};
29pub use device_tokens::{DeviceToken, DeviceTokenStore, Platform};
30pub use in_app::{InAppNotificationSink, NotificationStore};
31pub use preferences::{NotificationPreferencesStore, UserPreferences};
32#[cfg(feature = "push")]
33pub use push::ExpoPushProvider;
34pub use push::{PushNotificationSink, PushProvider};
35pub use webhook::{HttpSender, WebhookConfig, WebhookSink};
36pub use websocket::{WebSocketDispatcher, WebSocketSink};
37
38use crate::config::sinks::SinkType;
39use anyhow::Result;
40use async_trait::async_trait;
41use serde_json::Value;
42use std::collections::HashMap;
43use std::sync::{Arc, RwLock};
44
45pub fn resolve_recipient(
52 explicit: Option<&str>,
53 payload: &Value,
54 context_vars: &HashMap<String, Value>,
55) -> Option<String> {
56 explicit
57 .map(|s| s.to_string())
58 .or_else(|| {
59 payload
60 .get("recipient_id")
61 .and_then(|v| v.as_str())
62 .map(|s| s.to_string())
63 })
64 .or_else(|| {
65 context_vars
66 .get("recipient_id")
67 .and_then(|v| v.as_str())
68 .map(|s| s.to_string())
69 })
70}
71
72#[async_trait]
83pub trait Sink: Send + Sync + std::fmt::Debug {
84 async fn deliver(
90 &self,
91 payload: Value,
92 recipient_id: Option<&str>,
93 context_vars: &HashMap<String, Value>,
94 ) -> Result<()>;
95
96 fn name(&self) -> &str;
98
99 fn sink_type(&self) -> SinkType;
101}
102
103#[derive(Debug)]
116pub struct SinkRegistry {
117 sinks: RwLock<HashMap<String, Arc<dyn Sink>>>,
118}
119
120impl SinkRegistry {
121 pub fn new() -> Self {
123 Self {
124 sinks: RwLock::new(HashMap::new()),
125 }
126 }
127
128 pub fn register(&self, name: impl Into<String>, sink: Arc<dyn Sink>) {
134 self.sinks.write().unwrap().insert(name.into(), sink);
135 }
136
137 pub fn get(&self, name: &str) -> Option<Arc<dyn Sink>> {
139 self.sinks.read().unwrap().get(name).cloned()
140 }
141
142 pub fn names(&self) -> Vec<String> {
144 self.sinks.read().unwrap().keys().cloned().collect()
145 }
146
147 pub async fn deliver(
151 &self,
152 sink_name: &str,
153 payload: Value,
154 recipient_id: Option<&str>,
155 context_vars: &HashMap<String, Value>,
156 ) -> Result<()> {
157 let sink = self
158 .get(sink_name)
159 .ok_or_else(|| anyhow::anyhow!("sink '{}' not found in registry", sink_name))?;
160
161 sink.deliver(payload, recipient_id, context_vars).await
162 }
163
164 pub fn len(&self) -> usize {
166 self.sinks.read().unwrap().len()
167 }
168
169 pub fn is_empty(&self) -> bool {
171 self.sinks.read().unwrap().is_empty()
172 }
173}
174
175impl Default for SinkRegistry {
176 fn default() -> Self {
177 Self::new()
178 }
179}
180
181pub struct SinkFactory {
190 notification_store: Arc<NotificationStore>,
192
193 preferences_store: Arc<NotificationPreferencesStore>,
195
196 device_token_store: Arc<DeviceTokenStore>,
198}
199
200impl SinkFactory {
201 pub fn new() -> Self {
203 Self {
204 notification_store: Arc::new(NotificationStore::new()),
205 preferences_store: Arc::new(NotificationPreferencesStore::new()),
206 device_token_store: Arc::new(DeviceTokenStore::new()),
207 }
208 }
209
210 pub fn with_stores(
212 notification_store: Arc<NotificationStore>,
213 preferences_store: Arc<NotificationPreferencesStore>,
214 device_token_store: Arc<DeviceTokenStore>,
215 ) -> Self {
216 Self {
217 notification_store,
218 preferences_store,
219 device_token_store,
220 }
221 }
222
223 pub fn notification_store(&self) -> &Arc<NotificationStore> {
225 &self.notification_store
226 }
227
228 pub fn preferences_store(&self) -> &Arc<NotificationPreferencesStore> {
230 &self.preferences_store
231 }
232
233 pub fn device_token_store(&self) -> &Arc<DeviceTokenStore> {
235 &self.device_token_store
236 }
237
238 pub fn build_registry(
244 &self,
245 sink_configs: &[crate::config::sinks::SinkConfig],
246 ) -> SinkRegistry {
247 let registry = SinkRegistry::new();
248
249 for config in sink_configs {
250 match config.sink_type {
251 SinkType::InApp => {
252 let sink = InAppNotificationSink::with_preferences(
253 self.notification_store.clone(),
254 self.preferences_store.clone(),
255 );
256 registry.register(&config.name, Arc::new(sink));
257 tracing::info!(
258 sink = %config.name,
259 "auto-wired InApp notification sink"
260 );
261 }
262 SinkType::Push => {
263 tracing::warn!(
264 sink = %config.name,
265 "Push sink requires a PushProvider — use ServerBuilder::with_push_provider() to wire it"
266 );
267 }
268 SinkType::WebSocket => {
269 tracing::warn!(
270 sink = %config.name,
271 "WebSocket sink will be wired automatically when WebSocketExposure is built"
272 );
273 }
274 SinkType::Webhook => {
275 tracing::warn!(
276 sink = %config.name,
277 "Webhook sink requires an HttpSender implementation — skipping auto-wire"
278 );
279 }
280 SinkType::Counter => {
281 tracing::warn!(
282 sink = %config.name,
283 "Counter sink requires an EntityFieldUpdater — use ServerBuilder::with_counter_updater() to wire it"
284 );
285 }
286 SinkType::Feed => {
287 tracing::warn!(
288 sink = %config.name,
289 "Feed sink is not yet implemented — skipping"
290 );
291 }
292 SinkType::Custom => {
293 tracing::warn!(
294 sink = %config.name,
295 "Custom sink requires manual registration — skipping auto-wire"
296 );
297 }
298 }
299 }
300
301 registry
302 }
303}
304
305impl Default for SinkFactory {
306 fn default() -> Self {
307 Self::new()
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Each recorded delivery: the payload and the recipient it was sent to.
    type DeliveryLog = Vec<(Value, Option<String>)>;

    /// Sink that records every delivery it receives instead of sending it.
    #[derive(Debug)]
    struct TestSink {
        sink_name: String,
        deliveries: Arc<tokio::sync::Mutex<DeliveryLog>>,
    }

    impl TestSink {
        fn new(name: &str) -> Self {
            let deliveries = Arc::new(tokio::sync::Mutex::new(DeliveryLog::new()));
            Self {
                sink_name: name.to_owned(),
                deliveries,
            }
        }
    }

    #[async_trait]
    impl Sink for TestSink {
        async fn deliver(
            &self,
            payload: Value,
            recipient_id: Option<&str>,
            _context_vars: &HashMap<String, Value>,
        ) -> Result<()> {
            let entry = (payload, recipient_id.map(str::to_owned));
            self.deliveries.lock().await.push(entry);
            Ok(())
        }

        fn name(&self) -> &str {
            self.sink_name.as_str()
        }

        fn sink_type(&self) -> SinkType {
            SinkType::Custom
        }
    }

    #[test]
    fn test_registry_register_and_get() {
        let registry = SinkRegistry::new();
        registry.register("test-sink", Arc::new(TestSink::new("test-sink")));

        assert_eq!(registry.len(), 1);
        assert!(registry.get("test-sink").is_some());
        assert!(registry.get("nonexistent").is_none());
    }

    #[test]
    fn test_registry_names() {
        let registry = SinkRegistry::new();
        for label in ["a", "b"] {
            registry.register(label, Arc::new(TestSink::new(label)));
        }

        // Map iteration order is unspecified, so sort before comparing.
        let mut listed = registry.names();
        listed.sort();
        assert_eq!(listed, vec!["a", "b"]);
    }

    #[tokio::test]
    async fn test_registry_deliver() {
        let registry = SinkRegistry::new();
        let sink = Arc::new(TestSink::new("test-sink"));
        let log = Arc::clone(&sink.deliveries);
        registry.register("test-sink", sink);

        let payload = json!({"title": "Hello", "body": "World"});
        let no_context = HashMap::new();
        registry
            .deliver("test-sink", payload.clone(), Some("user-1"), &no_context)
            .await
            .unwrap();

        let recorded = log.lock().await;
        assert_eq!(recorded.len(), 1);
        let (seen_payload, seen_recipient) = &recorded[0];
        assert_eq!(*seen_payload, payload);
        assert_eq!(seen_recipient.as_deref(), Some("user-1"));
    }

    #[tokio::test]
    async fn test_registry_deliver_unknown_sink() {
        let registry = SinkRegistry::new();

        let outcome = registry
            .deliver("nonexistent", json!({}), None, &HashMap::new())
            .await;

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("nonexistent"));
    }

    #[test]
    fn test_registry_replace_sink() {
        let registry = SinkRegistry::new();
        registry.register("s", Arc::new(TestSink::new("s-v1")));
        registry.register("s", Arc::new(TestSink::new("s-v2")));

        // The second registration under the same key wins.
        assert_eq!(registry.len(), 1);
        let current = registry.get("s").unwrap();
        assert_eq!(current.name(), "s-v2");
    }

    #[test]
    fn test_registry_default_is_empty() {
        let registry = SinkRegistry::default();
        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
    }
}