use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use super::types::{CtrlRx, DataSource, Tags};
use crate::{SourceDefProvider, SourceResult, types::ParamMap};
/// Asynchronous acceptor side of a source service.
///
/// Implementors must be `Send`; [`AcceptorHandle`] stores them as
/// `Box<dyn ServiceAcceptor + Send>`.
#[async_trait]
pub trait ServiceAcceptor: Send {
/// Runs the acceptor with exclusive access to `self`, taking ownership of `ctrl_rx`.
///
/// NOTE(review): presumably `ctrl_rx` delivers control/shutdown messages and the
/// returned future resolves once accepting stops — confirm against implementors.
async fn accept_connection(&mut self, ctrl_rx: CtrlRx) -> SourceResult<()>;
}
/// Context passed by reference to [`SourceFactory::build`].
///
/// It currently carries a single piece of state: the working root directory.
#[derive(Debug, Clone)]
pub struct SourceBuildCtx {
    /// Working root path stored for the build.
    pub work_root: PathBuf,
}

impl SourceBuildCtx {
    /// Creates a context that takes ownership of `work_root`.
    pub fn new(work_root: PathBuf) -> Self {
        SourceBuildCtx { work_root }
    }
}
/// Descriptive metadata for a source: its name, its kind and its tags.
#[derive(Debug, Clone)]
pub struct SourceMeta {
    /// Name of the source.
    pub name: String,
    /// Kind label of the source.
    pub kind: String,
    /// Tags attached to the source.
    pub tags: Tags,
}

impl SourceMeta {
    /// Builds metadata from `name` and `kind`, both converted into owned strings.
    ///
    /// `tags` starts out as `Tags::default()`.
    pub fn new(name: impl Into<String>, kind: impl Into<String>) -> Self {
        let name = name.into();
        let kind = kind.into();
        let tags = Tags::default();
        SourceMeta { name, kind, tags }
    }
}
/// Pairs a boxed [`DataSource`] with the [`SourceMeta`] that describes it.
pub struct SourceHandle {
    /// The owned data source trait object.
    pub source: Box<dyn DataSource + 'static>,
    /// Metadata stored alongside `source`.
    pub metadata: SourceMeta,
}

impl std::fmt::Debug for SourceHandle {
    /// Formats the handle as a struct; `source` is shown as a fixed placeholder
    /// string instead of being formatted itself, `metadata` uses its own `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SourceHandle");
        out.field("source", &"Box<dyn DataSource>");
        out.field("metadata", &self.metadata);
        out.finish()
    }
}

impl SourceHandle {
    /// Wraps an already-boxed data source together with its metadata.
    pub fn new(source: Box<dyn DataSource + 'static>, metadata: SourceMeta) -> Self {
        SourceHandle { source, metadata }
    }
}
/// A named, boxed [`ServiceAcceptor`].
pub struct AcceptorHandle {
    /// Name given to the acceptor.
    pub name: String,
    /// The owned acceptor trait object.
    pub acceptor: Box<dyn ServiceAcceptor + Send>,
}

impl std::fmt::Debug for AcceptorHandle {
    /// Formats the handle as a struct; `name` is printed as-is while `acceptor`
    /// is shown as a fixed placeholder string instead of being formatted itself.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("AcceptorHandle");
        out.field("name", &self.name);
        out.field("acceptor", &"Box<dyn ServiceAcceptor>");
        out.finish()
    }
}

impl AcceptorHandle {
    /// Creates a handle from `name` (converted into an owned string) and a boxed acceptor.
    pub fn new(name: impl Into<String>, acceptor: Box<dyn ServiceAcceptor + Send>) -> Self {
        let name = name.into();
        AcceptorHandle { name, acceptor }
    }
}
/// Bundle returned by [`SourceFactory::build`]: a list of source handles plus
/// an optional acceptor handle.
///
/// The default value has no sources and no acceptor.
#[derive(Default)]
pub struct SourceSvcIns {
    /// Source handles held by this bundle.
    pub sources: Vec<SourceHandle>,
    /// Acceptor handle, if one was set.
    pub acceptor: Option<AcceptorHandle>,
}

impl std::fmt::Debug for SourceSvcIns {
    /// Formats a summary only: the number of sources and whether an acceptor
    /// is present. Both summaries are printed as (quoted) strings.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let source_summary = format!("len={}", self.sources.len());
        let acceptor_summary = match &self.acceptor {
            Some(_) => "Some(AcceptorHandle)",
            None => "None",
        };
        f.debug_struct("SourceSvcIns")
            .field("sources", &source_summary)
            .field("acceptor", &acceptor_summary)
            .finish()
    }
}

impl SourceSvcIns {
    /// Creates an empty bundle (same as `Default::default()`).
    pub fn new() -> Self {
        SourceSvcIns::default()
    }

    /// Replaces the whole source list with `sources` and returns the bundle.
    pub fn with_sources(self, sources: Vec<SourceHandle>) -> Self {
        Self { sources, ..self }
    }

    /// Appends one source handle to the list.
    pub fn push_source(&mut self, instance: SourceHandle) {
        self.sources.push(instance);
    }

    /// Sets the acceptor, replacing any previous one, and returns the bundle.
    pub fn with_acceptor(self, acceptor: AcceptorHandle) -> Self {
        Self {
            acceptor: Some(acceptor),
            ..self
        }
    }
}
/// Serializable specification of a source, consumed by [`SourceFactory`].
///
/// `name`, `kind` and `connector_id` are required when deserializing;
/// `params` and `tags` fall back to their default (empty) values when absent.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ResolvedSourceSpec {
    /// Name of the source.
    pub name: String,
    /// Kind label of the source.
    pub kind: String,
    /// Connector identifier associated with this source.
    pub connector_id: String,
    /// Parameter map; defaults when the field is missing from the input.
    #[serde(default)]
    pub params: ParamMap,
    /// Tag strings; defaults to an empty list when the field is missing.
    #[serde(default)]
    pub tags: Vec<String>,
}
/// Factory that builds a [`SourceSvcIns`] from a [`ResolvedSourceSpec`].
///
/// Implementors must also implement `SourceDefProvider` and be
/// `Send + Sync + 'static`.
#[async_trait]
pub trait SourceFactory: SourceDefProvider + Send + Sync + 'static {
/// Returns the static kind string reported by this factory.
///
/// NOTE(review): presumably this is matched against `ResolvedSourceSpec::kind`
/// when selecting a factory — confirm against the registry code.
fn kind(&self) -> &'static str;
/// Validates `spec` without building anything.
///
/// The default implementation ignores the spec and returns `Ok(())`;
/// implementors may override it to reject specs.
fn validate_spec(&self, _spec: &ResolvedSourceSpec) -> SourceResult<()> {
Ok(())
}
/// Asynchronously builds the service bundle for `spec`, with `ctx` providing
/// the build context (its working root).
async fn build(
&self,
spec: &ResolvedSourceSpec,
ctx: &SourceBuildCtx,
) -> SourceResult<SourceSvcIns>;
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::source::SourceBatch;
    use async_trait::async_trait;
    use serde_json::json;
    use std::path::PathBuf;

    /// Test double for `DataSource`: yields empty batches and reports a fixed id.
    #[derive(Default)]
    struct DummySource {
        id: &'static str,
    }

    #[async_trait]
    impl DataSource for DummySource {
        async fn receive(&mut self) -> SourceResult<SourceBatch> {
            Ok(Vec::new())
        }

        fn try_receive(&mut self) -> Option<SourceBatch> {
            None
        }

        fn identifier(&self) -> String {
            String::from(self.id)
        }
    }

    /// Test double for `ServiceAcceptor` that returns success immediately.
    #[derive(Default)]
    struct DummyAcceptor;

    #[async_trait]
    impl ServiceAcceptor for DummyAcceptor {
        async fn accept_connection(&mut self, _ctrl_rx: CtrlRx) -> SourceResult<()> {
            Ok(())
        }
    }

    /// Builds a handle around a `DummySource` whose name and id are both `id`.
    fn make_source_handle(id: &'static str) -> SourceHandle {
        let source = Box::new(DummySource { id });
        let metadata = SourceMeta::new(id, "dummy");
        SourceHandle::new(source, metadata)
    }

    #[test]
    fn source_build_ctx_and_meta_helpers() {
        let root = PathBuf::from("/tmp/source");
        let ctx = SourceBuildCtx::new(root.clone());
        assert_eq!(ctx.work_root, root);

        let meta = SourceMeta::new("orders", "http");
        assert_eq!(meta.name, "orders");
        assert_eq!(meta.kind, "http");
        assert_eq!(meta.tags.len(), 0);
    }

    #[test]
    fn handle_constructors_store_inner_state() {
        let expected_meta = SourceMeta::new("alpha", "kafka");
        let source = Box::new(DummySource { id: "alpha" });
        let source_handle = SourceHandle::new(source, expected_meta.clone());
        assert_eq!(source_handle.metadata.name, expected_meta.name);
        assert_eq!(source_handle.source.identifier(), "alpha");

        let acceptor_handle = AcceptorHandle::new("http", Box::new(DummyAcceptor));
        assert_eq!(acceptor_handle.name, "http");
    }

    #[test]
    fn source_svc_ins_builders_manage_members() {
        let initial = vec![make_source_handle("a")];
        let mut bundle = SourceSvcIns::new().with_sources(initial);
        assert_eq!(bundle.sources.len(), 1);

        bundle.push_source(make_source_handle("b"));
        assert_eq!(bundle.sources.len(), 2);

        let acceptor = AcceptorHandle::new("svc", Box::new(DummyAcceptor));
        let bundle = bundle.with_acceptor(acceptor);
        assert!(bundle.acceptor.is_some());
    }

    #[test]
    fn resolved_source_spec_defaults_optional_fields() {
        // Only the required fields are supplied; `params` and `tags` must default.
        let raw = json!({
            "name": "demo",
            "kind": "http",
            "connector_id": "conn-1"
        });
        let spec: ResolvedSourceSpec = serde_json::from_value(raw).unwrap();
        assert!(spec.params.is_empty());
        assert!(spec.tags.is_empty());
    }
}