use async_trait::async_trait;
use indexmap::IndexMap;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use varpulis_core::Event;
// Settings for one connector: its type, URL, optional topic and extra properties.
//
// NOTE: plain `//` comments are used on purpose. This type derives `JsonSchema`,
// and schemars copies `///` doc comments into the generated schema as
// descriptions, so doc comments here would change the emitted schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ConnectorConfig {
// Connector type, stored as a free-form string (no validation happens in this type).
pub connector_type: String,
// URL string for the connector; stored as given.
pub url: String,
// Optional topic; `None` when no topic has been set.
pub topic: Option<String>,
// Additional string key/value properties.
pub properties: IndexMap<String, String>,
}
impl ConnectorConfig {
    /// Creates a configuration for the given connector type and URL.
    ///
    /// Both arguments are copied into owned strings. The topic starts as
    /// `None` and the property map starts empty.
    #[must_use]
    pub fn new(connector_type: &str, url: &str) -> Self {
        Self {
            connector_type: connector_type.to_owned(),
            url: url.to_owned(),
            topic: None,
            properties: IndexMap::new(),
        }
    }

    /// Returns the configuration with its topic set to `topic`.
    ///
    /// This consumes `self`; any previously set topic is replaced.
    #[must_use = "builder methods return the updated configuration instead of mutating in place"]
    pub fn with_topic(mut self, topic: &str) -> Self {
        self.topic = Some(topic.to_owned());
        self
    }

    /// Returns the configuration with the property `key` set to `value`.
    ///
    /// This consumes `self`. Inserting a key that is already present replaces
    /// its value.
    #[must_use = "builder methods return the updated configuration instead of mutating in place"]
    pub fn with_property(mut self, key: &str, value: &str) -> Self {
        self.properties.insert(key.to_owned(), value.to_owned());
        self
    }
}
/// Interface for source connectors.
///
/// A source is started with the sending half of an [`Event`] channel and can
/// later be stopped. Implementors must be `Send + Sync`; the async methods are
/// provided through the `async_trait` macro.
#[async_trait]
pub trait SourceConnector: Send + Sync {
    /// Returns the connector's name.
    fn name(&self) -> &str;

    /// Starts the connector, handing it the sender `tx` of an event channel.
    ///
    /// Presumably the implementation delivers the events it receives through
    /// `tx`; the exact contract is defined by each implementor.
    ///
    /// # Errors
    ///
    /// Returns a [`ConnectorError`] chosen by the implementation on failure.
    async fn start(&mut self, tx: mpsc::Sender<Event>) -> Result<(), ConnectorError>;

    /// Stops the connector.
    ///
    /// # Errors
    ///
    /// Returns a [`ConnectorError`] chosen by the implementation on failure.
    async fn stop(&mut self) -> Result<(), ConnectorError>;

    /// Reports whether the connector considers itself running.
    fn is_running(&self) -> bool;

    /// Returns a health snapshot for this connector.
    ///
    /// The default implementation is derived solely from
    /// [`is_running`](Self::is_running): a running connector is reported as
    /// healthy with a processed-event count of `0`, otherwise it is reported
    /// as unhealthy with the message `"not running"`.
    fn health_check(&self) -> ConnectorHealth {
        if !self.is_running() {
            return ConnectorHealth::unhealthy("not running");
        }
        ConnectorHealth::healthy(0)
    }
}
/// Interface for sink connectors, which accept [`Event`]s via `send`.
///
/// Implementors must be `Send + Sync`; the async methods are provided through
/// the `async_trait` macro.
#[async_trait]
pub trait SinkConnector: Send + Sync {
    /// Returns the connector's name.
    fn name(&self) -> &str;

    /// Establishes whatever connection the sink needs.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    async fn connect(&mut self) -> Result<(), ConnectorError> {
        Ok(())
    }

    /// Sends a single event to the sink.
    ///
    /// # Errors
    ///
    /// Returns a [`ConnectorError`] chosen by the implementation on failure.
    async fn send(&self, event: &Event) -> Result<(), ConnectorError>;

    /// Sends a batch of events for the given topic.
    ///
    /// The default implementation ignores the topic and forwards each event,
    /// in slice order, to [`send`](Self::send).
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error produced by `send`; later
    /// events in the batch are not sent.
    async fn send_to_topic(
        &self,
        events: &[std::sync::Arc<Event>],
        _topic: &str,
    ) -> Result<(), ConnectorError> {
        for shared in events {
            // Borrow the event out of its `Arc` explicitly for `send`.
            let event: &Event = &**shared;
            self.send(event).await?;
        }
        Ok(())
    }

    /// Flushes the sink; the precise semantics are defined by the implementor.
    async fn flush(&self) -> Result<(), ConnectorError>;

    /// Closes the sink; the precise semantics are defined by the implementor.
    async fn close(&self) -> Result<(), ConnectorError>;

    /// Returns a health snapshot for this sink.
    ///
    /// The default implementation always reports healthy with a
    /// processed-event count of `0`.
    fn health_check(&self) -> ConnectorHealth {
        ConnectorHealth::healthy(0)
    }
}
// Health snapshot for a connector: a status flag, a message and two counters.
//
// NOTE: plain `//` comments are used on purpose. This type derives `JsonSchema`,
// and schemars copies `///` doc comments into the generated schema as
// descriptions, so doc comments here would change the emitted schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ConnectorHealth {
// `true` when the connector is reported as healthy.
pub healthy: bool,
// Status message; "ok" for snapshots built by `ConnectorHealth::healthy`.
pub message: String,
// Event counter carried with the snapshot (presumably events handled so far — confirm with producers).
pub events_processed: u64,
// Error counter carried with the snapshot; both constructors in this file set it to 0.
pub errors: u64,
}
impl ConnectorHealth {
    /// Builds a healthy snapshot carrying the given processed-event count.
    ///
    /// The message is `"ok"` and the error counter is `0`.
    pub fn healthy(events_processed: u64) -> Self {
        let message = String::from("ok");
        Self {
            healthy: true,
            message,
            events_processed,
            errors: 0,
        }
    }

    /// Builds an unhealthy snapshot with the given message.
    ///
    /// Both counters are set to `0`.
    pub fn unhealthy(message: impl Into<String>) -> Self {
        let message: String = message.into();
        Self {
            healthy: false,
            message,
            events_processed: 0,
            errors: 0,
        }
    }
}
/// Errors returned by connector operations.
///
/// `Display` and `std::error::Error` are derived through `thiserror`; each
/// variant's `#[error(...)]` attribute is its display message, with `{0}`
/// replaced by the carried string.
#[derive(Debug, thiserror::Error)]
pub enum ConnectorError {
/// A connection could not be established; carries a detail string.
#[error("Connection failed: {0}")]
ConnectionFailed(String),
/// Sending failed; carries a detail string.
#[error("Send failed: {0}")]
SendFailed(String),
/// Receiving failed; carries a detail string.
#[error("Receive failed: {0}")]
ReceiveFailed(String),
/// The configuration was rejected; carries a detail string.
#[error("Configuration error: {0}")]
ConfigError(String),
/// An operation was attempted without a connection.
#[error("Not connected")]
NotConnected,
/// The connector is not available; carries a detail string
/// (presumably the connector name or the reason — confirm at call sites).
#[error("Connector not available: {0}")]
NotAvailable(String),
}