varpulis_connectors/
sink.rs1use std::sync::Arc;
7
8use async_trait::async_trait;
9use varpulis_core::Event;
10
11use crate::types::ConnectorError;
12
13#[derive(Debug, thiserror::Error)]
15pub enum SinkError {
16 #[error("I/O error: {0}")]
18 Io(#[from] std::io::Error),
19
20 #[error("serialization error: {0}")]
22 Serialization(#[from] serde_json::Error),
23
24 #[error("HTTP error: {0}")]
26 Http(#[from] reqwest::Error),
27
28 #[error("connector error: {0}")]
30 Connector(#[from] ConnectorError),
31
32 #[error("{0}")]
34 Other(String),
35}
36
37impl SinkError {
38 pub fn other(msg: impl std::fmt::Display) -> Self {
40 Self::Other(msg.to_string())
41 }
42}
43
44#[async_trait]
46pub trait Sink: Send + Sync {
47 fn name(&self) -> &str;
49
50 async fn connect(&self) -> Result<(), SinkError> {
55 Ok(())
56 }
57
58 async fn send(&self, event: &Event) -> Result<(), SinkError>;
60
61 async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
66 for event in events {
67 self.send(event).await?;
68 }
69 Ok(())
70 }
71
72 async fn send_batch_to_topic(
77 &self,
78 events: &[Arc<Event>],
79 _topic: &str,
80 ) -> Result<(), SinkError> {
81 self.send_batch(events).await
82 }
83
84 async fn flush(&self) -> Result<(), SinkError>;
86
87 async fn close(&self) -> Result<(), SinkError>;
89}
90
91pub struct SinkConnectorAdapter {
93 name: String,
94 inner: tokio::sync::Mutex<Box<dyn crate::types::SinkConnector>>,
95}
96
97impl std::fmt::Debug for SinkConnectorAdapter {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_struct("SinkConnectorAdapter")
100 .finish_non_exhaustive()
101 }
102}
103
104impl SinkConnectorAdapter {
105 pub fn new(name: &str, connector: Box<dyn crate::types::SinkConnector>) -> Self {
107 Self {
108 name: name.to_string(),
109 inner: tokio::sync::Mutex::new(connector),
110 }
111 }
112}
113
114#[async_trait]
115impl Sink for SinkConnectorAdapter {
116 fn name(&self) -> &str {
117 &self.name
118 }
119 async fn connect(&self) -> Result<(), SinkError> {
120 let mut inner = self.inner.lock().await;
121 inner.connect().await.map_err(SinkError::from)
122 }
123 async fn send(&self, event: &Event) -> Result<(), SinkError> {
124 let inner = self.inner.lock().await;
125 inner.send(event).await.map_err(SinkError::from)
126 }
127 async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
128 let inner = self.inner.lock().await;
129 for event in events {
130 inner.send(event).await.map_err(SinkError::from)?;
131 }
132 Ok(())
133 }
134 async fn send_batch_to_topic(
135 &self,
136 events: &[Arc<Event>],
137 topic: &str,
138 ) -> Result<(), SinkError> {
139 let inner = self.inner.lock().await;
140 inner
141 .send_to_topic(events, topic)
142 .await
143 .map_err(SinkError::from)
144 }
145 async fn flush(&self) -> Result<(), SinkError> {
146 let inner = self.inner.lock().await;
147 inner.flush().await.map_err(SinkError::from)
148 }
149 async fn close(&self) -> Result<(), SinkError> {
150 let inner = self.inner.lock().await;
151 inner.close().await.map_err(SinkError::from)
152 }
153}