Skip to main content

a3s_event/
sink.rs

1//! Event sink — delivery targets for event routing
2//!
3//! `EventSink` defines where events are delivered. Implementations include
4//! publishing to a topic, calling an in-process handler, or logging for
5//! debugging. Used by the Broker/Trigger pattern for event routing.
6
7use crate::error::{EventError, Result};
8use crate::provider::EventProvider;
9use crate::types::{BoxFuture, Event};
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13
14/// Trait for event delivery targets
15///
16/// Sinks receive events from the Broker when a Trigger's filter matches.
17/// Implementations decide how to deliver the event — publish to a topic,
18/// call a handler, log it, etc.
19#[async_trait]
20pub trait EventSink: Send + Sync {
21    /// Deliver an event to this sink
22    async fn deliver(&self, event: &Event) -> Result<()>;
23
24    /// Human-readable sink name for logging
25    fn name(&self) -> &str;
26}
27
28/// Sink that publishes events to an EventProvider topic
29///
30/// Re-publishes matched events to the underlying provider, enabling
31/// event forwarding and fan-out patterns.
32pub struct TopicSink {
33    provider: Arc<dyn EventProvider>,
34    name: String,
35}
36
37impl TopicSink {
38    /// Create a new topic sink backed by a provider
39    pub fn new(name: impl Into<String>, provider: Arc<dyn EventProvider>) -> Self {
40        Self {
41            provider,
42            name: name.into(),
43        }
44    }
45}
46
47#[async_trait]
48impl EventSink for TopicSink {
49    async fn deliver(&self, event: &Event) -> Result<()> {
50        self.provider.publish(event).await?;
51        Ok(())
52    }
53
54    fn name(&self) -> &str {
55        &self.name
56    }
57}
58
59/// Type alias for the async handler function used by InProcessSink
60type HandlerFn =
61    dyn Fn(Event) -> BoxFuture<'static, Result<()>> + Send + Sync;
62
63/// Sink that calls an in-process async handler
64///
65/// Useful for direct event processing without going through a provider.
66pub struct InProcessSink {
67    handler: Arc<HandlerFn>,
68    name: String,
69}
70
71impl InProcessSink {
72    /// Create a new in-process sink with an async handler
73    pub fn new<F, Fut>(name: impl Into<String>, handler: F) -> Self
74    where
75        F: Fn(Event) -> Fut + Send + Sync + 'static,
76        Fut: std::future::Future<Output = Result<()>> + Send + 'static,
77    {
78        let handler = Arc::new(move |event: Event| -> BoxFuture<'static, Result<()>> {
79            Box::pin(handler(event))
80        }) as Arc<HandlerFn>;
81
82        Self {
83            handler,
84            name: name.into(),
85        }
86    }
87}
88
89#[async_trait]
90impl EventSink for InProcessSink {
91    async fn deliver(&self, event: &Event) -> Result<()> {
92        (self.handler)(event.clone()).await
93    }
94
95    fn name(&self) -> &str {
96        &self.name
97    }
98}
99
100/// Sink that logs events via tracing (for debugging)
101///
102/// Does not perform any delivery — just logs the event at info level.
103pub struct LogSink {
104    name: String,
105}
106
107impl LogSink {
108    /// Create a new log sink
109    pub fn new(name: impl Into<String>) -> Self {
110        Self { name: name.into() }
111    }
112}
113
114impl Default for LogSink {
115    fn default() -> Self {
116        Self::new("log-sink")
117    }
118}
119
120#[async_trait]
121impl EventSink for LogSink {
122    async fn deliver(&self, event: &Event) -> Result<()> {
123        tracing::info!(
124            sink = %self.name,
125            event_id = %event.id,
126            subject = %event.subject,
127            event_type = %event.event_type,
128            "Event delivered to log sink"
129        );
130        Ok(())
131    }
132
133    fn name(&self) -> &str {
134        &self.name
135    }
136}
137
138/// Sink that collects events in memory (for testing)
139pub struct CollectorSink {
140    events: Arc<Mutex<Vec<Event>>>,
141    name: String,
142}
143
144impl CollectorSink {
145    /// Create a new collector sink
146    pub fn new(name: impl Into<String>) -> Self {
147        Self {
148            events: Arc::new(Mutex::new(Vec::new())),
149            name: name.into(),
150        }
151    }
152
153    /// Get collected events
154    pub async fn events(&self) -> Vec<Event> {
155        self.events.lock().await.clone()
156    }
157
158    /// Get count of collected events
159    pub async fn count(&self) -> usize {
160        self.events.lock().await.len()
161    }
162}
163
164#[async_trait]
165impl EventSink for CollectorSink {
166    async fn deliver(&self, event: &Event) -> Result<()> {
167        self.events.lock().await.push(event.clone());
168        Ok(())
169    }
170
171    fn name(&self) -> &str {
172        &self.name
173    }
174}
175
176/// Sink that always fails delivery (for testing error paths)
177pub struct FailingSink {
178    name: String,
179    reason: String,
180}
181
182impl FailingSink {
183    /// Create a new failing sink
184    pub fn new(name: impl Into<String>, reason: impl Into<String>) -> Self {
185        Self {
186            name: name.into(),
187            reason: reason.into(),
188        }
189    }
190}
191
192#[async_trait]
193impl EventSink for FailingSink {
194    async fn deliver(&self, _event: &Event) -> Result<()> {
195        Err(EventError::SinkDelivery {
196            sink: self.name.clone(),
197            reason: self.reason.clone(),
198        })
199    }
200
201    fn name(&self) -> &str {
202        &self.name
203    }
204}
205
206#[cfg(test)]
207mod tests {
208    use super::*;
209    use crate::provider::memory::MemoryProvider;
210
211    fn test_event() -> Event {
212        Event::new(
213            "events.test.a",
214            "test",
215            "Test event",
216            "test-src",
217            serde_json::json!({"key": "value"}),
218        )
219    }
220
221    #[tokio::test]
222    async fn test_topic_sink_delivers() {
223        let provider = Arc::new(MemoryProvider::default());
224        let sink = TopicSink::new("test-topic-sink", provider.clone());
225
226        assert_eq!(sink.name(), "test-topic-sink");
227
228        let event = test_event();
229        sink.deliver(&event).await.unwrap();
230
231        let history = provider.history(None, 10).await.unwrap();
232        assert_eq!(history.len(), 1);
233        assert_eq!(history[0].id, event.id);
234    }
235
236    #[tokio::test]
237    async fn test_in_process_sink_calls_handler() {
238        let received = Arc::new(Mutex::new(Vec::new()));
239        let received_clone = received.clone();
240
241        let sink = InProcessSink::new("test-handler", move |event: Event| {
242            let received = received_clone.clone();
243            async move {
244                received.lock().await.push(event);
245                Ok(())
246            }
247        });
248
249        assert_eq!(sink.name(), "test-handler");
250
251        let event = test_event();
252        sink.deliver(&event).await.unwrap();
253
254        let events = received.lock().await;
255        assert_eq!(events.len(), 1);
256        assert_eq!(events[0].id, event.id);
257    }
258
259    #[tokio::test]
260    async fn test_log_sink_succeeds() {
261        let sink = LogSink::default();
262        assert_eq!(sink.name(), "log-sink");
263
264        let event = test_event();
265        // Should not error
266        sink.deliver(&event).await.unwrap();
267    }
268
269    #[tokio::test]
270    async fn test_log_sink_custom_name() {
271        let sink = LogSink::new("debug-sink");
272        assert_eq!(sink.name(), "debug-sink");
273    }
274
275    #[tokio::test]
276    async fn test_collector_sink() {
277        let sink = CollectorSink::new("collector");
278        assert_eq!(sink.name(), "collector");
279        assert_eq!(sink.count().await, 0);
280
281        let e1 = test_event();
282        let e2 = Event::new("events.test.b", "test", "B", "src", serde_json::json!({}));
283
284        sink.deliver(&e1).await.unwrap();
285        sink.deliver(&e2).await.unwrap();
286
287        assert_eq!(sink.count().await, 2);
288        let events = sink.events().await;
289        assert_eq!(events[0].id, e1.id);
290        assert_eq!(events[1].id, e2.id);
291    }
292
293    #[tokio::test]
294    async fn test_failing_sink_returns_error() {
295        let sink = FailingSink::new("bad-sink", "connection refused");
296        assert_eq!(sink.name(), "bad-sink");
297
298        let event = test_event();
299        let err = sink.deliver(&event).await.unwrap_err();
300        let msg = err.to_string();
301        assert!(msg.contains("bad-sink"));
302        assert!(msg.contains("connection refused"));
303    }
304
305    #[tokio::test]
306    async fn test_in_process_sink_error_propagation() {
307        let sink = InProcessSink::new("err-handler", |_event: Event| async {
308            Err(EventError::SinkDelivery {
309                sink: "err-handler".to_string(),
310                reason: "processing failed".to_string(),
311            })
312        });
313
314        let event = test_event();
315        assert!(sink.deliver(&event).await.is_err());
316    }
317
318    #[tokio::test]
319    async fn test_dyn_event_sink_trait_object() {
320        let sinks: Vec<Box<dyn EventSink>> = vec![
321            Box::new(LogSink::default()),
322            Box::new(CollectorSink::new("collector")),
323        ];
324
325        let event = test_event();
326        for sink in &sinks {
327            sink.deliver(&event).await.unwrap();
328        }
329
330        assert_eq!(sinks.len(), 2);
331    }
332}