1use crate::error::{EventError, Result};
8use crate::provider::EventProvider;
9use crate::types::{BoxFuture, Event};
10use async_trait::async_trait;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13
14#[async_trait]
20pub trait EventSink: Send + Sync {
21 async fn deliver(&self, event: &Event) -> Result<()>;
23
24 fn name(&self) -> &str;
26}
27
28pub struct TopicSink {
33 provider: Arc<dyn EventProvider>,
34 name: String,
35}
36
37impl TopicSink {
38 pub fn new(name: impl Into<String>, provider: Arc<dyn EventProvider>) -> Self {
40 Self {
41 provider,
42 name: name.into(),
43 }
44 }
45}
46
47#[async_trait]
48impl EventSink for TopicSink {
49 async fn deliver(&self, event: &Event) -> Result<()> {
50 self.provider.publish(event).await?;
51 Ok(())
52 }
53
54 fn name(&self) -> &str {
55 &self.name
56 }
57}
58
59type HandlerFn =
61 dyn Fn(Event) -> BoxFuture<'static, Result<()>> + Send + Sync;
62
63pub struct InProcessSink {
67 handler: Arc<HandlerFn>,
68 name: String,
69}
70
71impl InProcessSink {
72 pub fn new<F, Fut>(name: impl Into<String>, handler: F) -> Self
74 where
75 F: Fn(Event) -> Fut + Send + Sync + 'static,
76 Fut: std::future::Future<Output = Result<()>> + Send + 'static,
77 {
78 let handler = Arc::new(move |event: Event| -> BoxFuture<'static, Result<()>> {
79 Box::pin(handler(event))
80 }) as Arc<HandlerFn>;
81
82 Self {
83 handler,
84 name: name.into(),
85 }
86 }
87}
88
89#[async_trait]
90impl EventSink for InProcessSink {
91 async fn deliver(&self, event: &Event) -> Result<()> {
92 (self.handler)(event.clone()).await
93 }
94
95 fn name(&self) -> &str {
96 &self.name
97 }
98}
99
/// An [`EventSink`] that records each delivered event as a `tracing` log line.
///
/// Derives `Debug` and `Clone` so the sink can be inspected in diagnostics and
/// duplicated cheaply; it holds nothing but its name.
#[derive(Debug, Clone)]
pub struct LogSink {
    /// Name reported by [`EventSink::name`] and included in each log line.
    name: String,
}

impl LogSink {
    /// Creates a log sink called `name`.
    pub fn new(name: impl Into<String>) -> Self {
        Self { name: name.into() }
    }
}

impl Default for LogSink {
    /// Creates a log sink with the name `"log-sink"`.
    fn default() -> Self {
        Self::new("log-sink")
    }
}
119
120#[async_trait]
121impl EventSink for LogSink {
122 async fn deliver(&self, event: &Event) -> Result<()> {
123 tracing::info!(
124 sink = %self.name,
125 event_id = %event.id,
126 subject = %event.subject,
127 event_type = %event.event_type,
128 "Event delivered to log sink"
129 );
130 Ok(())
131 }
132
133 fn name(&self) -> &str {
134 &self.name
135 }
136}
137
138pub struct CollectorSink {
140 events: Arc<Mutex<Vec<Event>>>,
141 name: String,
142}
143
144impl CollectorSink {
145 pub fn new(name: impl Into<String>) -> Self {
147 Self {
148 events: Arc::new(Mutex::new(Vec::new())),
149 name: name.into(),
150 }
151 }
152
153 pub async fn events(&self) -> Vec<Event> {
155 self.events.lock().await.clone()
156 }
157
158 pub async fn count(&self) -> usize {
160 self.events.lock().await.len()
161 }
162}
163
164#[async_trait]
165impl EventSink for CollectorSink {
166 async fn deliver(&self, event: &Event) -> Result<()> {
167 self.events.lock().await.push(event.clone());
168 Ok(())
169 }
170
171 fn name(&self) -> &str {
172 &self.name
173 }
174}
175
/// An [`EventSink`] whose deliveries always fail with a fixed reason.
///
/// Derives `Debug` and `Clone` so the sink can be inspected in diagnostics and
/// duplicated; it holds only two strings.
#[derive(Debug, Clone)]
pub struct FailingSink {
    /// Name reported by [`EventSink::name`] and included in the error.
    name: String,
    /// Failure reason included in every delivery error.
    reason: String,
}

impl FailingSink {
    /// Creates a sink called `name` that fails every delivery with `reason`.
    pub fn new(name: impl Into<String>, reason: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            reason: reason.into(),
        }
    }
}
191
192#[async_trait]
193impl EventSink for FailingSink {
194 async fn deliver(&self, _event: &Event) -> Result<()> {
195 Err(EventError::SinkDelivery {
196 sink: self.name.clone(),
197 reason: self.reason.clone(),
198 })
199 }
200
201 fn name(&self) -> &str {
202 &self.name
203 }
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use crate::provider::memory::MemoryProvider;

    /// Builds the fixture event shared by most tests.
    fn test_event() -> Event {
        Event::new(
            "events.test.a",
            "test",
            "Test event",
            "test-src",
            serde_json::json!({"key": "value"}),
        )
    }

    /// A delivered event must show up in the backing provider's history.
    #[tokio::test]
    async fn test_topic_sink_delivers() {
        let provider = Arc::new(MemoryProvider::default());
        let sink = TopicSink::new("test-topic-sink", provider.clone());

        assert_eq!(sink.name(), "test-topic-sink");

        let event = test_event();
        sink.deliver(&event).await.unwrap();

        let history = provider.history(None, 10).await.unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].id, event.id);
    }

    /// The handler must be invoked once with a copy of the delivered event.
    #[tokio::test]
    async fn test_in_process_sink_calls_handler() {
        let received = Arc::new(Mutex::new(Vec::new()));
        let received_clone = received.clone();

        let sink = InProcessSink::new("test-handler", move |event: Event| {
            let received = received_clone.clone();
            async move {
                received.lock().await.push(event);
                Ok(())
            }
        });

        assert_eq!(sink.name(), "test-handler");

        let event = test_event();
        sink.deliver(&event).await.unwrap();

        let events = received.lock().await;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, event.id);
    }

    /// The default log sink uses the name `log-sink` and delivery succeeds.
    #[tokio::test]
    async fn test_log_sink_succeeds() {
        let sink = LogSink::default();
        assert_eq!(sink.name(), "log-sink");

        let event = test_event();
        sink.deliver(&event).await.unwrap();
    }

    /// A custom name passed to `LogSink::new` is reported by `name()`.
    #[tokio::test]
    async fn test_log_sink_custom_name() {
        let sink = LogSink::new("debug-sink");
        assert_eq!(sink.name(), "debug-sink");
    }

    /// Collected events are counted and returned in delivery order.
    #[tokio::test]
    async fn test_collector_sink() {
        let sink = CollectorSink::new("collector");
        assert_eq!(sink.name(), "collector");
        assert_eq!(sink.count().await, 0);

        let e1 = test_event();
        let e2 = Event::new("events.test.b", "test", "B", "src", serde_json::json!({}));

        sink.deliver(&e1).await.unwrap();
        sink.deliver(&e2).await.unwrap();

        assert_eq!(sink.count().await, 2);
        let events = sink.events().await;
        assert_eq!(events[0].id, e1.id);
        assert_eq!(events[1].id, e2.id);
    }

    /// The failing sink's error message names both the sink and the reason.
    #[tokio::test]
    async fn test_failing_sink_returns_error() {
        let sink = FailingSink::new("bad-sink", "connection refused");
        assert_eq!(sink.name(), "bad-sink");

        let event = test_event();
        let err = sink.deliver(&event).await.unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("bad-sink"));
        assert!(msg.contains("connection refused"));
    }

    /// An error returned by the handler must reach the caller unchanged.
    #[tokio::test]
    async fn test_in_process_sink_error_propagation() {
        let sink = InProcessSink::new("err-handler", |_event: Event| async {
            Err(EventError::SinkDelivery {
                sink: "err-handler".to_string(),
                reason: "processing failed".to_string(),
            })
        });

        let event = test_event();
        let err = sink.deliver(&event).await.unwrap_err();
        // Check the handler's own error came through, not just "some error".
        assert!(err.to_string().contains("processing failed"));
    }

    /// Sinks must be usable through `Box<dyn EventSink>`.
    #[tokio::test]
    async fn test_dyn_event_sink_trait_object() {
        let sinks: Vec<Box<dyn EventSink>> = vec![
            Box::new(LogSink::default()),
            Box::new(CollectorSink::new("collector")),
        ];

        let event = test_event();
        for sink in &sinks {
            sink.deliver(&event).await.unwrap();
        }

        // Asserting on names read through the trait objects checks dynamic
        // dispatch; asserting on the vec length alone would always pass.
        let names: Vec<&str> = sinks.iter().map(|sink| sink.name()).collect();
        assert_eq!(names, ["log-sink", "collector"]);
    }
}