use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Tunable settings attached to a sink: concurrency limit, batching knobs,
/// an idempotence flag and a timeout.
///
/// Every field is public, so a value can be built with a struct literal,
/// with `SinkConfig::default()`, or through the chained builder methods.
#[derive(Clone, Debug)]
pub struct SinkConfig {
    /// Limit on concurrent requests.
    /// NOTE(review): presumably enforced by whatever drives the sink — confirm.
    pub max_concurrent_requests: usize,
    /// Whether batching is switched on.
    pub batching_enabled: bool,
    /// Batch size, counted in items.
    pub batch_size: usize,
    /// Batch timeout in milliseconds.
    /// NOTE(review): presumably the longest wait before a partial batch is
    /// flushed — confirm against the batching implementation.
    pub batch_timeout_ms: u64,
    /// Flag recording that the sink is considered idempotent.
    pub is_idempotent: bool,
    /// Timeout in milliseconds.
    pub timeout_ms: u64,
}
impl Default for SinkConfig {
fn default() -> Self {
Self {
max_concurrent_requests: 1000,
batching_enabled: false,
batch_size: 1000,
batch_timeout_ms: 100,
is_idempotent: false,
timeout_ms: 5000, }
}
}
/// Chainable builder API for [`SinkConfig`]. Each method consumes the
/// configuration and returns it with the named field(s) replaced; all other
/// fields are carried over unchanged.
impl SinkConfig {
    /// Creates a configuration holding the `Default` values.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Turns batching on and sets the batch size to `size`.
    pub fn with_batching(self, size: usize) -> Self {
        Self {
            batching_enabled: true,
            batch_size: size,
            ..self
        }
    }

    /// Sets the batch timeout, in milliseconds.
    pub fn with_batch_timeout(self, timeout_ms: u64) -> Self {
        Self {
            batch_timeout_ms: timeout_ms,
            ..self
        }
    }

    /// Sets the `is_idempotent` flag to `true`.
    pub fn idempotent(self) -> Self {
        Self {
            is_idempotent: true,
            ..self
        }
    }

    /// Sets the timeout, in milliseconds.
    pub fn with_timeout(self, timeout_ms: u64) -> Self {
        Self { timeout_ms, ..self }
    }

    /// Sets the maximum number of concurrent requests.
    pub fn with_max_concurrent(self, max: usize) -> Self {
        Self {
            max_concurrent_requests: max,
            ..self
        }
    }
}
/// Outcome of a write: an overall success flag, written/failed counters,
/// an optional top-level error message, per-item errors and free-form
/// string metadata.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct WriteResult {
    /// Overall success flag.
    pub success: bool,
    /// Number of items reported as written.
    pub items_written: usize,
    /// Number of items reported as failed.
    pub items_failed: usize,
    /// Top-level error message; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Per-item errors. Falls back to an empty list when missing from the
    /// input and is left out of the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub item_errors: Vec<ItemError>,
    /// Extra key/value pairs. Falls back to an empty map when missing from
    /// the input and is left out of the serialized form when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub metadata: HashMap<String, String>,
}
/// Error attached to a single item of a write.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ItemError {
    /// Index of the affected item.
    /// NOTE(review): presumably its position in the batch handed to the
    /// sink's `write` — confirm with the sink implementations.
    pub index: usize,
    /// Error message for this item.
    pub error: String,
    /// Whether the error is flagged as retryable; `false` when the field is
    /// missing from deserialized input.
    #[serde(default)]
    pub retryable: bool,
}
/// Constructors and chainable helpers for [`WriteResult`].
impl WriteResult {
    /// Builds a successful result: `success` is `true`, `items_written` is
    /// the given count, nothing failed, and there is no error, item error or
    /// metadata.
    pub fn success(items_written: usize) -> Self {
        Self {
            success: true,
            items_written,
            items_failed: 0,
            error: None,
            item_errors: Vec::new(),
            metadata: HashMap::new(),
        }
    }

    /// Builds a failed result carrying `error` as the top-level message.
    /// Both counters are zero and `success` is `false`.
    pub fn failure(error: impl Into<String>) -> Self {
        Self {
            success: false,
            items_written: 0,
            items_failed: 0,
            error: Some(error.into()),
            item_errors: Vec::new(),
            metadata: HashMap::new(),
        }
    }

    /// Builds a result from explicit written/failed counts.
    ///
    /// `success` is `true` only when `items_failed` is zero.
    pub fn partial(items_written: usize, items_failed: usize) -> Self {
        Self {
            success: items_failed == 0,
            items_written,
            items_failed,
            error: None,
            item_errors: Vec::new(),
            metadata: HashMap::new(),
        }
    }

    /// Appends a per-item error and returns the updated result.
    ///
    /// Each call adds one to `items_failed`, on top of any count already
    /// stored. A count passed to [`WriteResult::partial`] should therefore
    /// not include the items reported through this method.
    ///
    /// The call also clears `success`: a result with at least one failed
    /// item is never reported as successful, matching the rule `partial`
    /// applies.
    pub fn with_item_error(
        mut self,
        index: usize,
        error: impl Into<String>,
        retryable: bool,
    ) -> Self {
        self.item_errors.push(ItemError {
            index,
            error: error.into(),
            retryable,
        });
        self.items_failed += 1;
        // `items_failed` is now at least 1, so the overall flag must not
        // stay `true` (it would if the chain started from `success(..)`).
        self.success = false;
        self
    }

    /// Inserts a metadata entry, replacing any previous value stored under
    /// the same key, and returns the updated result.
    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.metadata.insert(key.into(), value.into());
        self
    }
}
#[async_trait]
pub trait SinkConnector: Send + Sync {
type Item: Send + for<'de> Deserialize<'de>;
async fn write(&self, items: Vec<Self::Item>) -> WriteResult;
fn sink_config(&self) -> &SinkConfig;
async fn write_one(&self, item: Self::Item) -> WriteResult {
self.write(vec![item]).await
}
fn sink_metadata(&self) -> HashMap<String, String> {
let config = self.sink_config();
let mut meta = HashMap::new();
meta.insert(
"supports_batching".to_string(),
config.batching_enabled.to_string(),
);
meta.insert("max_batch_size".to_string(), config.batch_size.to_string());
meta.insert(
"is_idempotent".to_string(),
config.is_idempotent.to_string(),
);
meta.insert(
"batching_enabled".to_string(),
config.batching_enabled.to_string(),
);
meta.insert("timeout_ms".to_string(), config.timeout_ms.to_string());
meta
}
fn timeout_ms(&self) -> u64 {
self.sink_config().timeout_ms
}
}
/// Marker trait layered on [`SinkConnector`]; it declares no methods of its own.
///
/// NOTE(review): presumably implemented by sinks whose writes can be repeated
/// safely. Nothing here ties it to `SinkConfig::is_idempotent` — confirm the
/// intended relationship with the implementors.
pub trait IdempotentSink: SinkConnector {}
#[cfg(test)]
mod tests {
    use super::*;

    /// Chained builder calls each land in their own field.
    #[test]
    fn test_sink_config_builder() {
        let built = SinkConfig::new()
            .with_batching(500)
            .with_batch_timeout(200)
            .idempotent()
            .with_timeout(10000);

        let SinkConfig {
            batching_enabled,
            batch_size,
            batch_timeout_ms,
            is_idempotent,
            timeout_ms,
            ..
        } = built;

        assert!(batching_enabled);
        assert!(is_idempotent);
        assert_eq!((batch_size, batch_timeout_ms, timeout_ms), (500, 200, 10000));
    }

    /// `success` reports the written count with no failures and no error.
    #[test]
    fn test_write_result_success() {
        let outcome = WriteResult::success(100);

        assert!(outcome.success);
        assert_eq!((outcome.items_written, outcome.items_failed), (100, 0));
        assert_eq!(outcome.error, None);
    }

    /// `failure` keeps the message and reports nothing written.
    #[test]
    fn test_write_result_failure() {
        let outcome = WriteResult::failure("Connection failed");

        assert!(!outcome.success);
        assert_eq!(outcome.items_written, 0);
        assert_eq!(outcome.error.as_deref(), Some("Connection failed"));
    }

    /// `partial` followed by item errors keeps the counters and per-item flags.
    #[test]
    fn test_write_result_partial() {
        let outcome = WriteResult::partial(80, 20)
            .with_item_error(5, "Duplicate key", true)
            .with_item_error(10, "Invalid format", false);

        assert!(!outcome.success);
        assert_eq!(outcome.items_written, 80);
        // Each `with_item_error` call adds one on top of the 20 given to `partial`.
        assert_eq!(outcome.items_failed, 22);

        let retry_flags: Vec<bool> = outcome
            .item_errors
            .iter()
            .map(|item| item.retryable)
            .collect();
        assert_eq!(retry_flags, [true, false]);
    }
}