Skip to main content

this/events/sinks/
mod.rs

//! Sink system — destinations for processed events
//!
//! Sinks are the final step in a pipeline: the `deliver` operator dispatches
//! the `_payload` to one or more registered sinks by name.
//!
//! # Architecture
//!
//! ```text
//! FlowRuntime → Pipeline → DeliverOp → SinkRegistry → Sink::deliver()
//! ```
//!
//! # Sink types
//!
//! - `InApp` — In-app notification store (list, mark_as_read, unread_count)
//! - `Push` — Push notifications (Expo/APNs/FCM) [Plan 3, T3.2]
//! - `WebSocket` — Real-time dispatch to connected clients [Plan 3, T3.3]
//! - `Webhook` — HTTP POST to external URLs [Plan 3, T3.3]
//! - `Counter` — Counter update on entity fields [Plan 3, T3.3]
//! - `Feed` — Not yet implemented (skipped by `SinkFactory::build_registry`)
//! - `Custom` — User-provided sink; must be registered manually
19
20pub mod counter;
21pub mod device_tokens;
22pub mod in_app;
23pub mod preferences;
24pub mod push;
25pub mod webhook;
26pub mod websocket;
27
28pub use counter::{CounterConfig, CounterOperation, CounterSink, EntityFieldUpdater};
29pub use device_tokens::{DeviceToken, DeviceTokenStore, Platform};
30pub use in_app::{InAppNotificationSink, NotificationStore};
31pub use preferences::{NotificationPreferencesStore, UserPreferences};
32#[cfg(feature = "push")]
33pub use push::ExpoPushProvider;
34pub use push::{PushNotificationSink, PushProvider};
35pub use webhook::{HttpSender, WebhookConfig, WebhookSink};
36pub use websocket::{WebSocketDispatcher, WebSocketSink};
37
38use crate::config::sinks::SinkType;
39use anyhow::Result;
40use async_trait::async_trait;
41use serde_json::Value;
42use std::collections::HashMap;
43use std::sync::{Arc, RwLock};
44
45/// Resolve the recipient ID from multiple sources
46///
47/// Priority: explicit parameter > payload field > context variable.
48/// Returns `None` if no recipient ID is found in any source.
49///
50/// Shared by all sinks that need a recipient (in_app, push, websocket).
51pub fn resolve_recipient(
52    explicit: Option<&str>,
53    payload: &Value,
54    context_vars: &HashMap<String, Value>,
55) -> Option<String> {
56    explicit
57        .map(|s| s.to_string())
58        .or_else(|| {
59            payload
60                .get("recipient_id")
61                .and_then(|v| v.as_str())
62                .map(|s| s.to_string())
63        })
64        .or_else(|| {
65            context_vars
66                .get("recipient_id")
67                .and_then(|v| v.as_str())
68                .map(|s| s.to_string())
69        })
70}
71
72/// Trait for event sinks — destinations that receive processed events
73///
74/// Each sink is registered in the `SinkRegistry` by name (matching the
75/// YAML `sinks[].name` field). The `deliver` operator looks up sinks
76/// by name and calls `deliver()` with the payload.
77///
78/// # Object Safety
79///
80/// This trait is object-safe: no generics, all methods take `&self`.
81/// It can be used as `Arc<dyn Sink>`.
82#[async_trait]
83pub trait Sink: Send + Sync + std::fmt::Debug {
84    /// Deliver a payload to this sink
85    ///
86    /// - `payload`: The JSON payload built by the `map` operator
87    /// - `recipient_id`: Optional recipient (e.g., user ID for notifications)
88    /// - `context_vars`: Additional context variables from the pipeline
89    async fn deliver(
90        &self,
91        payload: Value,
92        recipient_id: Option<&str>,
93        context_vars: &HashMap<String, Value>,
94    ) -> Result<()>;
95
96    /// Human-readable name for this sink instance
97    fn name(&self) -> &str;
98
99    /// The sink type (matches SinkConfig.sink_type)
100    fn sink_type(&self) -> SinkType;
101}
102
103/// Registry of named sinks
104///
105/// The SinkRegistry maps sink names (from YAML config) to sink
106/// implementations. The `deliver` operator uses this to dispatch
107/// payloads to the correct sinks.
108///
109/// # Thread Safety
110///
111/// Uses interior mutability (`RwLock`) so that sinks can be registered
112/// after initial construction (e.g., the WebSocket sink is wired when
113/// `WebSocketExposure::build_router()` is called, after the host is
114/// already wrapped in `Arc`).
115#[derive(Debug)]
116pub struct SinkRegistry {
117    sinks: RwLock<HashMap<String, Arc<dyn Sink>>>,
118}
119
120impl SinkRegistry {
121    /// Create an empty sink registry
122    pub fn new() -> Self {
123        Self {
124            sinks: RwLock::new(HashMap::new()),
125        }
126    }
127
128    /// Register a sink by name
129    ///
130    /// If a sink with the same name already exists, it is replaced.
131    /// This method uses interior mutability so it can be called through
132    /// `&self` (even behind `Arc`).
133    pub fn register(&self, name: impl Into<String>, sink: Arc<dyn Sink>) {
134        self.sinks.write().unwrap().insert(name.into(), sink);
135    }
136
137    /// Look up a sink by name
138    pub fn get(&self, name: &str) -> Option<Arc<dyn Sink>> {
139        self.sinks.read().unwrap().get(name).cloned()
140    }
141
142    /// Get all registered sink names
143    pub fn names(&self) -> Vec<String> {
144        self.sinks.read().unwrap().keys().cloned().collect()
145    }
146
147    /// Deliver a payload to a named sink
148    ///
149    /// Returns an error if the sink is not found.
150    pub async fn deliver(
151        &self,
152        sink_name: &str,
153        payload: Value,
154        recipient_id: Option<&str>,
155        context_vars: &HashMap<String, Value>,
156    ) -> Result<()> {
157        let sink = self
158            .get(sink_name)
159            .ok_or_else(|| anyhow::anyhow!("sink '{}' not found in registry", sink_name))?;
160
161        sink.deliver(payload, recipient_id, context_vars).await
162    }
163
164    /// Number of registered sinks
165    pub fn len(&self) -> usize {
166        self.sinks.read().unwrap().len()
167    }
168
169    /// Whether the registry is empty
170    pub fn is_empty(&self) -> bool {
171        self.sinks.read().unwrap().is_empty()
172    }
173}
174
175impl Default for SinkRegistry {
176    fn default() -> Self {
177        Self::new()
178    }
179}
180
181/// Factory for creating sinks from YAML configuration
182///
183/// Builds sink instances from `SinkConfig` entries. Some sinks can be
184/// auto-created (InApp), while others require external dependencies
185/// provided by the user (Push needs PushProvider, Counter needs EntityFieldUpdater,
186/// WebSocket needs WebSocketDispatcher, Webhook needs HttpSender).
187///
188/// Sinks that cannot be auto-created are logged as warnings and skipped.
189pub struct SinkFactory {
190    /// Shared notification store (created once, reused by all InApp sinks)
191    notification_store: Arc<NotificationStore>,
192
193    /// Shared preferences store
194    preferences_store: Arc<NotificationPreferencesStore>,
195
196    /// Shared device token store
197    device_token_store: Arc<DeviceTokenStore>,
198}
199
200impl SinkFactory {
201    /// Create a new SinkFactory with fresh stores
202    pub fn new() -> Self {
203        Self {
204            notification_store: Arc::new(NotificationStore::new()),
205            preferences_store: Arc::new(NotificationPreferencesStore::new()),
206            device_token_store: Arc::new(DeviceTokenStore::new()),
207        }
208    }
209
210    /// Create a SinkFactory with pre-existing stores
211    pub fn with_stores(
212        notification_store: Arc<NotificationStore>,
213        preferences_store: Arc<NotificationPreferencesStore>,
214        device_token_store: Arc<DeviceTokenStore>,
215    ) -> Self {
216        Self {
217            notification_store,
218            preferences_store,
219            device_token_store,
220        }
221    }
222
223    /// Get the notification store (for sharing with ServerHost)
224    pub fn notification_store(&self) -> &Arc<NotificationStore> {
225        &self.notification_store
226    }
227
228    /// Get the preferences store (for sharing with ServerHost)
229    pub fn preferences_store(&self) -> &Arc<NotificationPreferencesStore> {
230        &self.preferences_store
231    }
232
233    /// Get the device token store (for sharing with ServerHost)
234    pub fn device_token_store(&self) -> &Arc<DeviceTokenStore> {
235        &self.device_token_store
236    }
237
238    /// Build a SinkRegistry from a list of SinkConfigs
239    ///
240    /// Auto-creates sinks that don't need external dependencies (InApp).
241    /// Logs warnings for sinks that need manual wiring (Push, WebSocket,
242    /// Counter, Webhook).
243    pub fn build_registry(
244        &self,
245        sink_configs: &[crate::config::sinks::SinkConfig],
246    ) -> SinkRegistry {
247        let registry = SinkRegistry::new();
248
249        for config in sink_configs {
250            match config.sink_type {
251                SinkType::InApp => {
252                    let sink = InAppNotificationSink::with_preferences(
253                        self.notification_store.clone(),
254                        self.preferences_store.clone(),
255                    );
256                    registry.register(&config.name, Arc::new(sink));
257                    tracing::info!(
258                        sink = %config.name,
259                        "auto-wired InApp notification sink"
260                    );
261                }
262                SinkType::Push => {
263                    tracing::warn!(
264                        sink = %config.name,
265                        "Push sink requires a PushProvider — use ServerBuilder::with_push_provider() to wire it"
266                    );
267                }
268                SinkType::WebSocket => {
269                    tracing::warn!(
270                        sink = %config.name,
271                        "WebSocket sink will be wired automatically when WebSocketExposure is built"
272                    );
273                }
274                SinkType::Webhook => {
275                    tracing::warn!(
276                        sink = %config.name,
277                        "Webhook sink requires an HttpSender implementation — skipping auto-wire"
278                    );
279                }
280                SinkType::Counter => {
281                    tracing::warn!(
282                        sink = %config.name,
283                        "Counter sink requires an EntityFieldUpdater — use ServerBuilder::with_counter_updater() to wire it"
284                    );
285                }
286                SinkType::Feed => {
287                    tracing::warn!(
288                        sink = %config.name,
289                        "Feed sink is not yet implemented — skipping"
290                    );
291                }
292                SinkType::Custom => {
293                    tracing::warn!(
294                        sink = %config.name,
295                        "Custom sink requires manual registration — skipping auto-wire"
296                    );
297                }
298            }
299        }
300
301        registry
302    }
303}
304
305impl Default for SinkFactory {
306    fn default() -> Self {
307        Self::new()
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Ordered record of deliveries: each entry is the payload and its recipient.
    type DeliveryLog = Vec<(Value, Option<String>)>;

    /// Sink double that appends every delivery to a shared log for inspection.
    #[derive(Debug)]
    struct TestSink {
        sink_name: String,
        deliveries: Arc<tokio::sync::Mutex<DeliveryLog>>,
    }

    impl TestSink {
        /// Build a sink reporting `name` with an empty delivery log.
        fn new(name: &str) -> Self {
            let deliveries = Arc::new(tokio::sync::Mutex::new(DeliveryLog::new()));
            Self {
                sink_name: name.to_owned(),
                deliveries,
            }
        }
    }

    #[async_trait]
    impl Sink for TestSink {
        async fn deliver(
            &self,
            payload: Value,
            recipient_id: Option<&str>,
            _context_vars: &HashMap<String, Value>,
        ) -> Result<()> {
            let recipient = recipient_id.map(str::to_owned);
            let mut log = self.deliveries.lock().await;
            log.push((payload, recipient));
            Ok(())
        }

        fn name(&self) -> &str {
            self.sink_name.as_str()
        }

        fn sink_type(&self) -> SinkType {
            SinkType::Custom
        }
    }

    /// A registered sink is retrievable by its name; unknown names yield `None`.
    #[test]
    fn test_registry_register_and_get() {
        let registry = SinkRegistry::new();
        registry.register("test-sink", Arc::new(TestSink::new("test-sink")));

        assert_eq!(registry.len(), 1);
        assert!(registry.get("test-sink").is_some());
        assert!(registry.get("nonexistent").is_none());
    }

    /// `names()` reports every registered name (order is not guaranteed, so sort).
    #[test]
    fn test_registry_names() {
        let registry = SinkRegistry::new();
        for label in ["a", "b"] {
            registry.register(label, Arc::new(TestSink::new(label)));
        }

        let mut names = registry.names();
        names.sort();
        assert_eq!(names, vec!["a", "b"]);
    }

    /// Delivering through the registry forwards payload and recipient to the sink.
    #[tokio::test]
    async fn test_registry_deliver() {
        let registry = SinkRegistry::new();
        let sink = Arc::new(TestSink::new("test-sink"));
        // Keep a handle on the log before the sink is moved into the registry.
        let deliveries = Arc::clone(&sink.deliveries);
        registry.register("test-sink", sink);

        let payload = json!({"title": "Hello", "body": "World"});
        let no_context = HashMap::new();
        registry
            .deliver("test-sink", payload.clone(), Some("user-1"), &no_context)
            .await
            .unwrap();

        let recorded = deliveries.lock().await;
        assert_eq!(recorded.len(), 1);
        let (seen_payload, seen_recipient) = &recorded[0];
        assert_eq!(*seen_payload, payload);
        assert_eq!(seen_recipient.as_deref(), Some("user-1"));
    }

    /// Delivering to an unregistered name fails and the error names the sink.
    #[tokio::test]
    async fn test_registry_deliver_unknown_sink() {
        let registry = SinkRegistry::new();
        let no_context = HashMap::new();

        let outcome = registry
            .deliver("nonexistent", json!({}), None, &no_context)
            .await;

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("nonexistent"));
    }

    /// Registering twice under one name keeps only the most recent sink.
    #[test]
    fn test_registry_replace_sink() {
        let registry = SinkRegistry::new();
        for version in ["s-v1", "s-v2"] {
            registry.register("s", Arc::new(TestSink::new(version)));
        }

        assert_eq!(registry.len(), 1);
        let current = registry.get("s").unwrap();
        assert_eq!(current.name(), "s-v2");
    }

    /// A default-constructed registry holds no sinks.
    #[test]
    fn test_registry_default_is_empty() {
        let registry = SinkRegistry::default();
        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
    }
}