use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Tunable behavior and capability flags for a data-source connector.
///
/// Built via [`SourceConfig::new`] plus the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct SourceConfig {
/// Maximum number of requests allowed in flight at once.
pub max_concurrent_requests: usize,
/// Whether individual requests may be coalesced into batches.
pub batching_enabled: bool,
/// Maximum number of requests per batch (only meaningful when
/// `batching_enabled` is true).
pub batch_size: usize,
/// How long, in milliseconds, a partial batch may wait before being flushed.
pub batch_timeout_ms: u64,
/// Whether the source can stream results incrementally.
pub supports_streaming: bool,
/// Whether the source supports cursor-based pagination.
pub supports_cursor: bool,
/// Per-request timeout in milliseconds.
pub timeout_ms: u64,
}
impl Default for SourceConfig {
fn default() -> Self {
Self {
max_concurrent_requests: 1000,
batching_enabled: false,
batch_size: 100,
batch_timeout_ms: 10,
supports_streaming: false,
supports_cursor: false,
timeout_ms: 30000, }
}
}
impl SourceConfig {
    /// Creates a configuration with the default settings (see [`Default`]).
    pub fn new() -> Self {
        Self::default()
    }

    /// Enables batching and sets the maximum batch size.
    ///
    /// `#[must_use]` because the builders consume `self` and return the
    /// updated value; dropping the result silently loses the change.
    #[must_use]
    pub fn with_batching(mut self, size: usize) -> Self {
        self.batching_enabled = true;
        self.batch_size = size;
        self
    }

    /// Sets how long (in milliseconds) a partial batch may wait before flushing.
    #[must_use]
    pub fn with_batch_timeout(mut self, timeout_ms: u64) -> Self {
        self.batch_timeout_ms = timeout_ms;
        self
    }

    /// Marks the source as capable of streaming results.
    #[must_use]
    pub fn with_streaming(mut self) -> Self {
        self.supports_streaming = true;
        self
    }

    /// Marks the source as supporting cursor-based pagination.
    #[must_use]
    pub fn with_cursor_support(mut self) -> Self {
        self.supports_cursor = true;
        self
    }

    /// Sets the per-request timeout in milliseconds.
    #[must_use]
    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
        self.timeout_ms = timeout_ms;
        self
    }

    /// Sets the maximum number of in-flight requests.
    #[must_use]
    pub fn with_max_concurrent(mut self, max: usize) -> Self {
        self.max_concurrent_requests = max;
        self
    }
}
/// Parameters for a single fetch against a [`SourceConnector`].
///
/// All fields are optional on deserialization: collections default to empty
/// and the `Option` fields to `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FetchRequest {
/// Arbitrary query terms (name -> JSON value); interpretation is up to the
/// connector.
#[serde(default)]
pub query: HashMap<String, serde_json::Value>,
/// Opaque pagination token from a previous response's `next_cursor`.
#[serde(skip_serializing_if = "Option::is_none")]
pub cursor: Option<String>,
/// Maximum number of items to return.
#[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<usize>,
/// Free-form string context forwarded alongside the request.
#[serde(default)]
pub context: HashMap<String, String>,
}
impl FetchRequest {
    /// An empty request: no query terms, no cursor, no limit.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds a single query term, overwriting any existing value for the key.
    pub fn query(mut self, key: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
        let (k, v) = (key.into(), value.into());
        self.query.insert(k, v);
        self
    }

    /// Resumes pagination from the given opaque cursor token.
    pub fn cursor(mut self, cursor: impl Into<String>) -> Self {
        self.cursor.replace(cursor.into());
        self
    }

    /// Caps the number of items the source should return.
    pub fn limit(mut self, limit: usize) -> Self {
        self.limit.replace(limit);
        self
    }
}
/// One page of items fetched from a source, with optional pagination state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FetchResponse<T> {
/// The fetched items. `#[serde(default)]` keeps deserialization lenient
/// (an absent `items` field yields an empty page), matching the defaulted
/// fields below and on `FetchRequest`. `Vec<T>` is `Default` for any `T`,
/// so no extra bound is introduced.
#[serde(default)]
pub items: Vec<T>,
/// Opaque token for fetching the next page, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub next_cursor: Option<String>,
/// Whether more items remain after this page.
#[serde(default)]
pub has_more: bool,
/// Total number of matching items across all pages, when known.
#[serde(skip_serializing_if = "Option::is_none")]
pub total_count: Option<usize>,
/// Free-form string metadata about this response.
#[serde(default)]
pub metadata: HashMap<String, String>,
}
impl<T> FetchResponse<T> {
    /// Wraps `items` in a response with no pagination state or metadata.
    pub fn items(items: Vec<T>) -> Self {
        Self {
            items,
            next_cursor: None,
            has_more: false,
            total_count: None,
            metadata: HashMap::new(),
        }
    }

    /// A response containing no items.
    pub fn empty() -> Self {
        Self::items(Vec::new())
    }

    /// Attaches a continuation cursor; also flips `has_more` to `true`,
    /// since a cursor implies another page exists.
    pub fn with_cursor(mut self, cursor: impl Into<String>) -> Self {
        self.has_more = true;
        self.next_cursor = Some(cursor.into());
        self
    }

    /// Explicitly sets whether more items remain.
    pub fn with_has_more(mut self, has_more: bool) -> Self {
        self.has_more = has_more;
        self
    }

    /// Records the total number of matching items across all pages.
    pub fn with_total(mut self, total: usize) -> Self {
        self.total_count = Some(total);
        self
    }

    /// Adds one metadata entry, overwriting any existing value for the key.
    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (k, v) = (key.into(), value.into());
        self.metadata.insert(k, v);
        self
    }
}
impl<T> Default for FetchResponse<T> {
fn default() -> Self {
Self::empty()
}
}
/// A pluggable data source that yields pages of serializable items.
#[async_trait]
pub trait SourceConnector: Send + Sync {
    /// The record type this connector yields.
    type Item: Send + Serialize + for<'de> Deserialize<'de>;

    /// Fetches one page of items for `request`.
    async fn fetch(&self, request: FetchRequest) -> FetchResponse<Self::Item>;

    /// The connector's static configuration.
    fn source_config(&self) -> &SourceConfig;

    /// Capability/configuration summary as string key-value pairs, derived
    /// entirely from [`Self::source_config`].
    fn source_metadata(&self) -> HashMap<String, String> {
        let config = self.source_config();
        HashMap::from([
            (
                "supports_streaming".to_string(),
                config.supports_streaming.to_string(),
            ),
            (
                "supports_cursor".to_string(),
                config.supports_cursor.to_string(),
            ),
            ("max_batch_size".to_string(), config.batch_size.to_string()),
            (
                "batching_enabled".to_string(),
                config.batching_enabled.to_string(),
            ),
            ("timeout_ms".to_string(), config.timeout_ms.to_string()),
        ])
    }

    /// Convenience accessor for the configured request timeout.
    fn timeout_ms(&self) -> u64 {
        self.source_config().timeout_ms
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder calls on `SourceConfig` accumulate into a single value.
    #[test]
    fn test_source_config_builder() {
        let cfg = SourceConfig::new()
            .with_batching(500)
            .with_streaming()
            .with_cursor_support()
            .with_timeout(60000);
        assert_eq!(cfg.batch_size, 500);
        assert_eq!(cfg.timeout_ms, 60000);
        assert!(cfg.batching_enabled);
        assert!(cfg.supports_streaming && cfg.supports_cursor);
    }

    /// `FetchRequest` builder records query terms, cursor, and limit.
    #[test]
    fn test_fetch_request_builder() {
        let request = FetchRequest::new()
            .query("filter", "active")
            .cursor("page-2")
            .limit(50);
        assert_eq!(request.limit, Some(50));
        assert_eq!(request.cursor.as_deref(), Some("page-2"));
        assert_eq!(request.query.get("filter").unwrap(), "active");
    }

    /// `FetchResponse` builder sets cursor (implying `has_more`), total, and metadata.
    #[test]
    fn test_fetch_response_builder() {
        let items = vec!["a".to_string(), "b".to_string()];
        let response: FetchResponse<String> = FetchResponse::items(items)
            .with_cursor("next-token")
            .with_total(100)
            .with_metadata("source", "database");
        assert_eq!(response.items.len(), 2);
        assert!(response.has_more);
        assert_eq!(response.next_cursor.as_deref(), Some("next-token"));
        assert_eq!(response.total_count, Some(100));
        assert_eq!(response.metadata.get("source").unwrap(), "database");
    }
}