Skip to main content

this/config/
sinks.rs

1//! Configuration types for event sinks (notification destinations)
2//!
3//! These structs are deserialized from the `sinks` section of `this.yaml`.
4//! Sinks define where processed events are delivered: push notifications,
5//! in-app storage, WebSocket, webhook, counter updates, etc.
6
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10/// Sink configuration — a destination where events are delivered
11///
12/// ```yaml
13/// sinks:
14///   - name: push-notification
15///     type: push
16///     config:
17///       provider: expo
18///
19///   - name: in-app-notification
20///     type: in_app
21///     config:
22///       ttl: 30d
23///
24///   - name: analytics-webhook
25///     type: webhook
26///     config:
27///       url: https://analytics.example.com/events
28///       method: POST
29///       headers:
30///         Authorization: "Bearer {{ env.ANALYTICS_TOKEN }}"
31/// ```
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct SinkConfig {
34    /// Unique sink name (referenced by `deliver` operators in flows)
35    pub name: String,
36
37    /// Sink type
38    #[serde(rename = "type")]
39    pub sink_type: SinkType,
40
41    /// Type-specific configuration
42    #[serde(default)]
43    pub config: HashMap<String, serde_json::Value>,
44}
45
46/// Available sink types
47#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
48#[serde(rename_all = "snake_case")]
49pub enum SinkType {
50    /// Push notifications (Expo, APNs, FCM)
51    Push,
52    /// In-app notification store (list, mark_as_read, unread_count)
53    InApp,
54    /// Feed (ordered event stream per user)
55    Feed,
56    /// WebSocket dispatch to connected clients
57    WebSocket,
58    /// HTTP webhook (POST/PUT to external URL)
59    Webhook,
60    /// Counter update on an entity field
61    Counter,
62    /// Custom sink (user-provided implementation)
63    Custom,
64}
65
66#[cfg(test)]
67mod tests {
68    use super::*;
69
70    #[test]
71    fn test_sink_config_push() {
72        let yaml = r#"
73name: push-notification
74type: push
75config:
76  provider: expo
77  retry_count: 3
78"#;
79
80        let config: SinkConfig = serde_yaml::from_str(yaml).unwrap();
81        assert_eq!(config.name, "push-notification");
82        assert_eq!(config.sink_type, SinkType::Push);
83        assert_eq!(
84            config.config.get("provider").unwrap(),
85            &serde_json::Value::String("expo".to_string())
86        );
87        assert_eq!(
88            config.config.get("retry_count").unwrap(),
89            &serde_json::json!(3)
90        );
91    }
92
93    #[test]
94    fn test_sink_config_in_app() {
95        let yaml = r#"
96name: in-app-notification
97type: in_app
98config:
99  ttl: 30d
100"#;
101
102        let config: SinkConfig = serde_yaml::from_str(yaml).unwrap();
103        assert_eq!(config.name, "in-app-notification");
104        assert_eq!(config.sink_type, SinkType::InApp);
105    }
106
107    #[test]
108    fn test_sink_config_webhook() {
109        let yaml = r#"
110name: analytics-webhook
111type: webhook
112config:
113  url: https://analytics.example.com/events
114  method: POST
115  headers:
116    Authorization: "Bearer token123"
117"#;
118
119        let config: SinkConfig = serde_yaml::from_str(yaml).unwrap();
120        assert_eq!(config.name, "analytics-webhook");
121        assert_eq!(config.sink_type, SinkType::Webhook);
122        assert!(config.config.contains_key("url"));
123        assert!(config.config.contains_key("headers"));
124    }
125
126    #[test]
127    fn test_sink_config_websocket() {
128        let yaml = r#"
129name: live-updates
130type: web_socket
131config:
132  filter_by: recipient_id
133"#;
134
135        let config: SinkConfig = serde_yaml::from_str(yaml).unwrap();
136        assert_eq!(config.name, "live-updates");
137        assert_eq!(config.sink_type, SinkType::WebSocket);
138    }
139
140    #[test]
141    fn test_sink_config_counter() {
142        let yaml = r#"
143name: like-counter
144type: counter
145config:
146  field: like_count
147  operation: increment
148"#;
149
150        let config: SinkConfig = serde_yaml::from_str(yaml).unwrap();
151        assert_eq!(config.name, "like-counter");
152        assert_eq!(config.sink_type, SinkType::Counter);
153    }
154
155    #[test]
156    fn test_sink_config_no_config() {
157        let yaml = r#"
158name: simple-sink
159type: in_app
160"#;
161
162        let config: SinkConfig = serde_yaml::from_str(yaml).unwrap();
163        assert_eq!(config.name, "simple-sink");
164        assert!(config.config.is_empty());
165    }
166
167    #[test]
168    fn test_sink_type_serde_roundtrip() {
169        let types = vec![
170            SinkType::Push,
171            SinkType::InApp,
172            SinkType::Feed,
173            SinkType::WebSocket,
174            SinkType::Webhook,
175            SinkType::Counter,
176            SinkType::Custom,
177        ];
178
179        for t in &types {
180            let json = serde_json::to_string(t).unwrap();
181            let roundtrip: SinkType = serde_json::from_str(&json).unwrap();
182            assert_eq!(*t, roundtrip);
183        }
184    }
185
186    #[test]
187    fn test_multiple_sinks_yaml() {
188        let yaml = r#"
189- name: push-notification
190  type: push
191  config:
192    provider: expo
193
194- name: in-app-notification
195  type: in_app
196  config:
197    ttl: 30d
198
199- name: websocket
200  type: web_socket
201  config:
202    filter_by: recipient_id
203"#;
204
205        let sinks: Vec<SinkConfig> = serde_yaml::from_str(yaml).unwrap();
206        assert_eq!(sinks.len(), 3);
207        assert_eq!(sinks[0].sink_type, SinkType::Push);
208        assert_eq!(sinks[1].sink_type, SinkType::InApp);
209        assert_eq!(sinks[2].sink_type, SinkType::WebSocket);
210    }
211}