Skip to main content

varpulis_connectors/
sink.rs

1//! Sink trait and error types for outputting processed events
2//!
3//! This module defines the core `Sink` trait that all event output destinations
4//! must implement, along with the `SinkError` error type.
5
6use crate::types::ConnectorError;
7use async_trait::async_trait;
8use std::sync::Arc;
9use varpulis_core::Event;
10
11/// Errors produced by sink operations.
12#[derive(Debug, thiserror::Error)]
13pub enum SinkError {
14    /// I/O error (file writes, network, etc.)
15    #[error("I/O error: {0}")]
16    Io(#[from] std::io::Error),
17
18    /// Serialization error (JSON encoding)
19    #[error("serialization error: {0}")]
20    Serialization(#[from] serde_json::Error),
21
22    /// HTTP request error
23    #[error("HTTP error: {0}")]
24    Http(#[from] reqwest::Error),
25
26    /// Connector-level error
27    #[error("connector error: {0}")]
28    Connector(#[from] ConnectorError),
29
30    /// Generic error with message
31    #[error("{0}")]
32    Other(String),
33}
34
35impl SinkError {
36    /// Create a generic error from a displayable value.
37    pub fn other(msg: impl std::fmt::Display) -> Self {
38        Self::Other(msg.to_string())
39    }
40}
41
42/// Trait for event sinks
43#[async_trait]
44pub trait Sink: Send + Sync {
45    /// Name of this sink
46    fn name(&self) -> &str;
47
48    /// Establish connection to the external system.
49    ///
50    /// Called once after sink creation to establish any necessary connections.
51    /// The default implementation is a no-op for sinks that connect eagerly.
52    async fn connect(&self) -> Result<(), SinkError> {
53        Ok(())
54    }
55
56    /// Send an event to this sink
57    async fn send(&self, event: &Event) -> Result<(), SinkError>;
58
59    /// Send a batch of events to this sink.
60    ///
61    /// Default implementation calls `send()` for each event.
62    /// Connectors should override this to amortize lock/syscall overhead.
63    async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
64        for event in events {
65            self.send(event).await?;
66        }
67        Ok(())
68    }
69
70    /// Flush any buffered data
71    async fn flush(&self) -> Result<(), SinkError>;
72
73    /// Close the sink
74    async fn close(&self) -> Result<(), SinkError>;
75}
76
77/// Adapter: wraps a SinkConnector as a Sink for use in the sink registry.
78pub struct SinkConnectorAdapter {
79    name: String,
80    inner: tokio::sync::Mutex<Box<dyn crate::types::SinkConnector>>,
81}
82
83impl std::fmt::Debug for SinkConnectorAdapter {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct("SinkConnectorAdapter")
86            .finish_non_exhaustive()
87    }
88}
89
90impl SinkConnectorAdapter {
91    /// Create a new adapter wrapping a SinkConnector.
92    pub fn new(name: &str, connector: Box<dyn crate::types::SinkConnector>) -> Self {
93        Self {
94            name: name.to_string(),
95            inner: tokio::sync::Mutex::new(connector),
96        }
97    }
98}
99
100#[async_trait]
101impl Sink for SinkConnectorAdapter {
102    fn name(&self) -> &str {
103        &self.name
104    }
105    async fn connect(&self) -> Result<(), SinkError> {
106        let mut inner = self.inner.lock().await;
107        inner.connect().await.map_err(SinkError::from)
108    }
109    async fn send(&self, event: &Event) -> Result<(), SinkError> {
110        let inner = self.inner.lock().await;
111        inner.send(event).await.map_err(SinkError::from)
112    }
113    async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
114        let inner = self.inner.lock().await;
115        for event in events {
116            inner.send(event).await.map_err(SinkError::from)?;
117        }
118        Ok(())
119    }
120    async fn flush(&self) -> Result<(), SinkError> {
121        let inner = self.inner.lock().await;
122        inner.flush().await.map_err(SinkError::from)
123    }
124    async fn close(&self) -> Result<(), SinkError> {
125        let inner = self.inner.lock().await;
126        inner.close().await.map_err(SinkError::from)
127    }
128}