varpulis_connectors/
sink.rs1use crate::types::ConnectorError;
7use async_trait::async_trait;
8use std::sync::Arc;
9use varpulis_core::Event;
10
11#[derive(Debug, thiserror::Error)]
13pub enum SinkError {
14 #[error("I/O error: {0}")]
16 Io(#[from] std::io::Error),
17
18 #[error("serialization error: {0}")]
20 Serialization(#[from] serde_json::Error),
21
22 #[error("HTTP error: {0}")]
24 Http(#[from] reqwest::Error),
25
26 #[error("connector error: {0}")]
28 Connector(#[from] ConnectorError),
29
30 #[error("{0}")]
32 Other(String),
33}
34
35impl SinkError {
36 pub fn other(msg: impl std::fmt::Display) -> Self {
38 Self::Other(msg.to_string())
39 }
40}
41
42#[async_trait]
44pub trait Sink: Send + Sync {
45 fn name(&self) -> &str;
47
48 async fn connect(&self) -> Result<(), SinkError> {
53 Ok(())
54 }
55
56 async fn send(&self, event: &Event) -> Result<(), SinkError>;
58
59 async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
64 for event in events {
65 self.send(event).await?;
66 }
67 Ok(())
68 }
69
70 async fn flush(&self) -> Result<(), SinkError>;
72
73 async fn close(&self) -> Result<(), SinkError>;
75}
76
77pub struct SinkConnectorAdapter {
79 name: String,
80 inner: tokio::sync::Mutex<Box<dyn crate::types::SinkConnector>>,
81}
82
83impl std::fmt::Debug for SinkConnectorAdapter {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct("SinkConnectorAdapter")
86 .finish_non_exhaustive()
87 }
88}
89
90impl SinkConnectorAdapter {
91 pub fn new(name: &str, connector: Box<dyn crate::types::SinkConnector>) -> Self {
93 Self {
94 name: name.to_string(),
95 inner: tokio::sync::Mutex::new(connector),
96 }
97 }
98}
99
100#[async_trait]
101impl Sink for SinkConnectorAdapter {
102 fn name(&self) -> &str {
103 &self.name
104 }
105 async fn connect(&self) -> Result<(), SinkError> {
106 let mut inner = self.inner.lock().await;
107 inner.connect().await.map_err(SinkError::from)
108 }
109 async fn send(&self, event: &Event) -> Result<(), SinkError> {
110 let inner = self.inner.lock().await;
111 inner.send(event).await.map_err(SinkError::from)
112 }
113 async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
114 let inner = self.inner.lock().await;
115 for event in events {
116 inner.send(event).await.map_err(SinkError::from)?;
117 }
118 Ok(())
119 }
120 async fn flush(&self) -> Result<(), SinkError> {
121 let inner = self.inner.lock().await;
122 inner.flush().await.map_err(SinkError::from)
123 }
124 async fn close(&self) -> Result<(), SinkError> {
125 let inner = self.inner.lock().await;
126 inner.close().await.map_err(SinkError::from)
127 }
128}