varpulis_connector_api/
sink.rs1use std::sync::Arc;
4
5use async_trait::async_trait;
6use varpulis_core::Event;
7
8use crate::types::ConnectorError;
9
10#[derive(Debug, thiserror::Error)]
12pub enum SinkError {
13 #[error("I/O error: {0}")]
15 Io(#[from] std::io::Error),
16
17 #[error("serialization error: {0}")]
19 Serialization(#[from] serde_json::Error),
20
21 #[error("HTTP error: {0}")]
23 Http(#[from] reqwest::Error),
24
25 #[error("connector error: {0}")]
27 Connector(#[from] ConnectorError),
28
29 #[error("{0}")]
31 Other(String),
32}
33
34impl SinkError {
35 pub fn other(msg: impl std::fmt::Display) -> Self {
37 Self::Other(msg.to_string())
38 }
39}
40
41#[async_trait]
43pub trait Sink: Send + Sync {
44 fn name(&self) -> &str;
46
47 async fn connect(&self) -> Result<(), SinkError> {
49 Ok(())
50 }
51
52 async fn send(&self, event: &Event) -> Result<(), SinkError>;
54
55 async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
57 for event in events {
58 self.send(event).await?;
59 }
60 Ok(())
61 }
62
63 async fn send_batch_to_topic(
65 &self,
66 events: &[Arc<Event>],
67 _topic: &str,
68 ) -> Result<(), SinkError> {
69 self.send_batch(events).await
70 }
71
72 async fn flush(&self) -> Result<(), SinkError>;
74
75 async fn close(&self) -> Result<(), SinkError>;
77}
78
79pub struct SinkConnectorAdapter {
81 pub name: String,
83 pub inner: tokio::sync::Mutex<Box<dyn crate::types::SinkConnector>>,
85}
86
87impl std::fmt::Debug for SinkConnectorAdapter {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 f.debug_struct("SinkConnectorAdapter")
90 .finish_non_exhaustive()
91 }
92}
93
94impl SinkConnectorAdapter {
95 pub fn new(name: &str, connector: Box<dyn crate::types::SinkConnector>) -> Self {
97 Self {
98 name: name.to_string(),
99 inner: tokio::sync::Mutex::new(connector),
100 }
101 }
102}
103
104#[async_trait]
105impl Sink for SinkConnectorAdapter {
106 fn name(&self) -> &str {
107 &self.name
108 }
109 async fn connect(&self) -> Result<(), SinkError> {
110 let mut inner = self.inner.lock().await;
111 inner.connect().await.map_err(SinkError::from)
112 }
113 async fn send(&self, event: &Event) -> Result<(), SinkError> {
114 let inner = self.inner.lock().await;
115 inner.send(event).await.map_err(SinkError::from)
116 }
117 async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
118 let inner = self.inner.lock().await;
119 for event in events {
120 inner.send(event).await.map_err(SinkError::from)?;
121 }
122 Ok(())
123 }
124 async fn send_batch_to_topic(
125 &self,
126 events: &[Arc<Event>],
127 topic: &str,
128 ) -> Result<(), SinkError> {
129 let inner = self.inner.lock().await;
130 inner
131 .send_to_topic(events, topic)
132 .await
133 .map_err(SinkError::from)
134 }
135 async fn flush(&self) -> Result<(), SinkError> {
136 let inner = self.inner.lock().await;
137 inner.flush().await.map_err(SinkError::from)
138 }
139 async fn close(&self) -> Result<(), SinkError> {
140 let inner = self.inner.lock().await;
141 inner.close().await.map_err(SinkError::from)
142 }
143}