Skip to main content

varpulis_connectors/
sink.rs

1//! Sink trait and error types for outputting processed events
2//!
3//! This module defines the core `Sink` trait that all event output destinations
4//! must implement, along with the `SinkError` error type.
5
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use varpulis_core::Event;
10
11use crate::types::ConnectorError;
12
13/// Errors produced by sink operations.
14#[derive(Debug, thiserror::Error)]
15pub enum SinkError {
16    /// I/O error (file writes, network, etc.)
17    #[error("I/O error: {0}")]
18    Io(#[from] std::io::Error),
19
20    /// Serialization error (JSON encoding)
21    #[error("serialization error: {0}")]
22    Serialization(#[from] serde_json::Error),
23
24    /// HTTP request error
25    #[error("HTTP error: {0}")]
26    Http(#[from] reqwest::Error),
27
28    /// Connector-level error
29    #[error("connector error: {0}")]
30    Connector(#[from] ConnectorError),
31
32    /// Generic error with message
33    #[error("{0}")]
34    Other(String),
35}
36
37impl SinkError {
38    /// Create a generic error from a displayable value.
39    pub fn other(msg: impl std::fmt::Display) -> Self {
40        Self::Other(msg.to_string())
41    }
42}
43
44/// Trait for event sinks
45#[async_trait]
46pub trait Sink: Send + Sync {
47    /// Name of this sink
48    fn name(&self) -> &str;
49
50    /// Establish connection to the external system.
51    ///
52    /// Called once after sink creation to establish any necessary connections.
53    /// The default implementation is a no-op for sinks that connect eagerly.
54    async fn connect(&self) -> Result<(), SinkError> {
55        Ok(())
56    }
57
58    /// Send an event to this sink
59    async fn send(&self, event: &Event) -> Result<(), SinkError>;
60
61    /// Send a batch of events to this sink.
62    ///
63    /// Default implementation calls `send()` for each event.
64    /// Connectors should override this to amortize lock/syscall overhead.
65    async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
66        for event in events {
67            self.send(event).await?;
68        }
69        Ok(())
70    }
71
72    /// Send a batch of events to a specific topic (for dynamic routing).
73    ///
74    /// Default implementation ignores the topic and delegates to `send_batch()`.
75    /// Connectors that support per-message topics (Kafka, MQTT) should override.
76    async fn send_batch_to_topic(
77        &self,
78        events: &[Arc<Event>],
79        _topic: &str,
80    ) -> Result<(), SinkError> {
81        self.send_batch(events).await
82    }
83
84    /// Flush any buffered data
85    async fn flush(&self) -> Result<(), SinkError>;
86
87    /// Close the sink
88    async fn close(&self) -> Result<(), SinkError>;
89}
90
91/// Adapter: wraps a SinkConnector as a Sink for use in the sink registry.
92pub struct SinkConnectorAdapter {
93    name: String,
94    inner: tokio::sync::Mutex<Box<dyn crate::types::SinkConnector>>,
95}
96
97impl std::fmt::Debug for SinkConnectorAdapter {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_struct("SinkConnectorAdapter")
100            .finish_non_exhaustive()
101    }
102}
103
104impl SinkConnectorAdapter {
105    /// Create a new adapter wrapping a SinkConnector.
106    pub fn new(name: &str, connector: Box<dyn crate::types::SinkConnector>) -> Self {
107        Self {
108            name: name.to_string(),
109            inner: tokio::sync::Mutex::new(connector),
110        }
111    }
112}
113
114#[async_trait]
115impl Sink for SinkConnectorAdapter {
116    fn name(&self) -> &str {
117        &self.name
118    }
119    async fn connect(&self) -> Result<(), SinkError> {
120        let mut inner = self.inner.lock().await;
121        inner.connect().await.map_err(SinkError::from)
122    }
123    async fn send(&self, event: &Event) -> Result<(), SinkError> {
124        let inner = self.inner.lock().await;
125        inner.send(event).await.map_err(SinkError::from)
126    }
127    async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
128        let inner = self.inner.lock().await;
129        for event in events {
130            inner.send(event).await.map_err(SinkError::from)?;
131        }
132        Ok(())
133    }
134    async fn send_batch_to_topic(
135        &self,
136        events: &[Arc<Event>],
137        topic: &str,
138    ) -> Result<(), SinkError> {
139        let inner = self.inner.lock().await;
140        inner
141            .send_to_topic(events, topic)
142            .await
143            .map_err(SinkError::from)
144    }
145    async fn flush(&self) -> Result<(), SinkError> {
146        let inner = self.inner.lock().await;
147        inner.flush().await.map_err(SinkError::from)
148    }
149    async fn close(&self) -> Result<(), SinkError> {
150        let inner = self.inner.lock().await;
151        inner.close().await.map_err(SinkError::from)
152    }
153}