use std::sync::Arc;
use async_trait::async_trait;
use varpulis_core::Event;
use crate::types::ConnectorError;
/// Errors that a [`Sink`] operation can return.
///
/// The `Io`, `Serialization`, `Http` and `Connector` variants each wrap an
/// underlying error and carry `#[from]`, so a `From` conversion exists for
/// them and `?` can convert those error types into `SinkError`.
#[derive(Debug, thiserror::Error)]
pub enum SinkError {
/// Wraps a [`std::io::Error`]; displayed as `I/O error: <inner>`.
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
/// Wraps a `serde_json::Error`; displayed as `serialization error: <inner>`.
#[error("serialization error: {0}")]
Serialization(#[from] serde_json::Error),
/// Wraps a `reqwest::Error`; displayed as `HTTP error: <inner>`.
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
/// Wraps a [`ConnectorError`]; displayed as `connector error: <inner>`.
#[error("connector error: {0}")]
Connector(#[from] ConnectorError),
/// Free-form error message, displayed verbatim with no prefix.
#[error("{0}")]
Other(String),
}
impl SinkError {
pub fn other(msg: impl std::fmt::Display) -> Self {
Self::Other(msg.to_string())
}
}
/// An asynchronous destination for [`Event`]s.
///
/// Implementors must provide [`Sink::name`], [`Sink::send`], [`Sink::flush`]
/// and [`Sink::close`]; the remaining methods have default implementations.
#[async_trait]
pub trait Sink: Send + Sync {
    /// Returns the name of this sink.
    fn name(&self) -> &str;

    /// Connects the sink.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn connect(&self) -> Result<(), SinkError> {
        Ok(())
    }

    /// Sends a single event to the sink.
    async fn send(&self, event: &Event) -> Result<(), SinkError>;

    /// Sends every event in `events`, in slice order.
    ///
    /// The default implementation calls [`Sink::send`] once per event.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `send`; events after the failing
    /// one are not attempted.
    async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
        for item in events.iter() {
            // `&Arc<Event>` deref-coerces to the `&Event` that `send` expects.
            self.send(item).await?;
        }
        Ok(())
    }

    /// Sends a batch of events addressed to `_topic`.
    ///
    /// The default implementation ignores the topic and forwards the batch
    /// to [`Sink::send_batch`].
    async fn send_batch_to_topic(
        &self,
        events: &[Arc<Event>],
        _topic: &str,
    ) -> Result<(), SinkError> {
        self.send_batch(events).await
    }

    /// Flushes the sink. Exact semantics are defined by the implementor.
    async fn flush(&self) -> Result<(), SinkError>;

    /// Closes the sink. Exact semantics are defined by the implementor.
    async fn close(&self) -> Result<(), SinkError>;
}
/// Adapter that exposes a boxed `crate::types::SinkConnector` through the
/// [`Sink`] trait.
pub struct SinkConnectorAdapter {
/// Name returned by this adapter's `Sink::name` implementation.
pub name: String,
/// The wrapped connector behind an async mutex; the adapter's `Sink`
/// methods lock it before delegating to the connector.
pub inner: tokio::sync::Mutex<Box<dyn crate::types::SinkConnector>>,
}
/// Manual `Debug` implementation for [`SinkConnectorAdapter`].
///
/// Prints the adapter's `name` and marks the output as non-exhaustive
/// (`..`) because the `inner` connector is not printed.
impl std::fmt::Debug for SinkConnectorAdapter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SinkConnectorAdapter")
            // Include the name so that several adapters can be told apart
            // in logs and diagnostics.
            .field("name", &self.name)
            // `inner` is deliberately left out; presumably the boxed trait
            // object is not `Debug` (confirm against `SinkConnector`).
            .finish_non_exhaustive()
    }
}
impl SinkConnectorAdapter {
    /// Creates an adapter around `connector`.
    ///
    /// `name` is copied into an owned `String`, and the connector is placed
    /// behind a `tokio::sync::Mutex`.
    pub fn new(name: &str, connector: Box<dyn crate::types::SinkConnector>) -> Self {
        let inner = tokio::sync::Mutex::new(connector);
        let name = name.to_owned();
        Self { name, inner }
    }
}
/// [`Sink`] implementation that delegates to the wrapped connector.
///
/// Every method except `name` locks `inner` and keeps the guard until the
/// delegated call (or the whole batch) has finished. Errors returned by the
/// connector are converted into [`SinkError`] through its `From` impls.
#[async_trait]
impl Sink for SinkConnectorAdapter {
    /// Returns the name given to the adapter at construction.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Locks the connector and calls its `connect`.
    async fn connect(&self) -> Result<(), SinkError> {
        let mut connector = self.inner.lock().await;
        connector.connect().await?;
        Ok(())
    }

    /// Locks the connector and sends a single event through it.
    async fn send(&self, event: &Event) -> Result<(), SinkError> {
        let connector = self.inner.lock().await;
        connector.send(event).await?;
        Ok(())
    }

    /// Sends every event in slice order while holding the lock once for the
    /// whole batch; stops at the first error.
    async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
        let connector = self.inner.lock().await;
        for item in events {
            connector.send(item).await?;
        }
        Ok(())
    }

    /// Locks the connector and hands the batch and `topic` to its
    /// `send_to_topic`.
    async fn send_batch_to_topic(
        &self,
        events: &[Arc<Event>],
        topic: &str,
    ) -> Result<(), SinkError> {
        let connector = self.inner.lock().await;
        connector.send_to_topic(events, topic).await?;
        Ok(())
    }

    /// Locks the connector and calls its `flush`.
    async fn flush(&self) -> Result<(), SinkError> {
        let connector = self.inner.lock().await;
        connector.flush().await?;
        Ok(())
    }

    /// Locks the connector and calls its `close`.
    async fn close(&self) -> Result<(), SinkError> {
        let connector = self.inner.lock().await;
        connector.close().await?;
        Ok(())
    }
}