Skip to main content

varpulis_connector_api/
sink.rs

1//! Sink trait and error types for outputting processed events
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use varpulis_core::Event;
7
8use crate::types::ConnectorError;
9
10/// Errors produced by sink operations.
11#[derive(Debug, thiserror::Error)]
12pub enum SinkError {
13    /// I/O error (file writes, network, etc.)
14    #[error("I/O error: {0}")]
15    Io(#[from] std::io::Error),
16
17    /// Serialization error (JSON encoding)
18    #[error("serialization error: {0}")]
19    Serialization(#[from] serde_json::Error),
20
21    /// HTTP request error
22    #[error("HTTP error: {0}")]
23    Http(#[from] reqwest::Error),
24
25    /// Connector-level error
26    #[error("connector error: {0}")]
27    Connector(#[from] ConnectorError),
28
29    /// Generic error with message
30    #[error("{0}")]
31    Other(String),
32}
33
34impl SinkError {
35    /// Create a generic error from a displayable value.
36    pub fn other(msg: impl std::fmt::Display) -> Self {
37        Self::Other(msg.to_string())
38    }
39}
40
41/// Trait for event sinks
42#[async_trait]
43pub trait Sink: Send + Sync {
44    /// Name of this sink
45    fn name(&self) -> &str;
46
47    /// Establish connection to the external system.
48    async fn connect(&self) -> Result<(), SinkError> {
49        Ok(())
50    }
51
52    /// Send an event to this sink
53    async fn send(&self, event: &Event) -> Result<(), SinkError>;
54
55    /// Send a batch of events to this sink.
56    async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
57        for event in events {
58            self.send(event).await?;
59        }
60        Ok(())
61    }
62
63    /// Send a batch of events to a specific topic (for dynamic routing).
64    async fn send_batch_to_topic(
65        &self,
66        events: &[Arc<Event>],
67        _topic: &str,
68    ) -> Result<(), SinkError> {
69        self.send_batch(events).await
70    }
71
72    /// Flush any buffered data
73    async fn flush(&self) -> Result<(), SinkError>;
74
75    /// Close the sink
76    async fn close(&self) -> Result<(), SinkError>;
77}
78
79/// Adapter: wraps a `SinkConnector` as a `Sink` for use in the sink registry.
80pub struct SinkConnectorAdapter {
81    /// The name of this adapter.
82    pub name: String,
83    /// The inner sink connector.
84    pub inner: tokio::sync::Mutex<Box<dyn crate::types::SinkConnector>>,
85}
86
87impl std::fmt::Debug for SinkConnectorAdapter {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        f.debug_struct("SinkConnectorAdapter")
90            .finish_non_exhaustive()
91    }
92}
93
94impl SinkConnectorAdapter {
95    /// Create a new adapter wrapping a `SinkConnector`.
96    pub fn new(name: &str, connector: Box<dyn crate::types::SinkConnector>) -> Self {
97        Self {
98            name: name.to_string(),
99            inner: tokio::sync::Mutex::new(connector),
100        }
101    }
102}
103
104#[async_trait]
105impl Sink for SinkConnectorAdapter {
106    fn name(&self) -> &str {
107        &self.name
108    }
109    async fn connect(&self) -> Result<(), SinkError> {
110        let mut inner = self.inner.lock().await;
111        inner.connect().await.map_err(SinkError::from)
112    }
113    async fn send(&self, event: &Event) -> Result<(), SinkError> {
114        let inner = self.inner.lock().await;
115        inner.send(event).await.map_err(SinkError::from)
116    }
117    async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
118        let inner = self.inner.lock().await;
119        for event in events {
120            inner.send(event).await.map_err(SinkError::from)?;
121        }
122        Ok(())
123    }
124    async fn send_batch_to_topic(
125        &self,
126        events: &[Arc<Event>],
127        topic: &str,
128    ) -> Result<(), SinkError> {
129        let inner = self.inner.lock().await;
130        inner
131            .send_to_topic(events, topic)
132            .await
133            .map_err(SinkError::from)
134    }
135    async fn flush(&self) -> Result<(), SinkError> {
136        let inner = self.inner.lock().await;
137        inner.flush().await.map_err(SinkError::from)
138    }
139    async fn close(&self) -> Result<(), SinkError> {
140        let inner = self.inner.lock().await;
141        inner.close().await.map_err(SinkError::from)
142    }
143}