// wp_connector_api/runtime/source/factory.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::path::PathBuf;
4
5use super::types::{CtrlRx, DataSource, Tags};
6use crate::{SourceDefProvider, SourceResult, types::ParamMap};
7
8#[async_trait]
9pub trait ServiceAcceptor: Send {
10    /// 接受连接/启动服务端式源,处理外部控制事件
11    async fn accept_connection(&mut self, ctrl_rx: CtrlRx) -> SourceResult<()>;
12}
13
/// Build-time context passed by reference to `SourceFactory::build`.
#[derive(Clone, Debug)]
pub struct SourceBuildCtx {
    /// Work root path; stored exactly as given to [`SourceBuildCtx::new`].
    pub work_root: PathBuf,
}

impl SourceBuildCtx {
    /// Creates a context around `work_root`.
    ///
    /// The path is only stored; it is neither checked nor touched on disk.
    pub fn new(work_root: PathBuf) -> Self {
        SourceBuildCtx { work_root }
    }
}
24
25/// 数据源元信息,供 orchestrator/调度层用于统计与展示。
26#[derive(Clone, Debug)]
27pub struct SourceMeta {
28    pub name: String,
29    pub kind: String,
30    pub tags: Tags,
31}
32
33impl SourceMeta {
34    pub fn new(name: impl Into<String>, kind: impl Into<String>) -> Self {
35        Self {
36            name: name.into(),
37            kind: kind.into(),
38            tags: Tags::default(),
39        }
40    }
41}
42
43/// 单个可注册的数据源实例。
44pub struct SourceHandle {
45    pub source: Box<dyn DataSource + 'static>,
46    pub metadata: SourceMeta,
47}
48
49impl std::fmt::Debug for SourceHandle {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        f.debug_struct("SourceHandle")
52            .field("source", &"Box<dyn DataSource>")
53            .field("metadata", &self.metadata)
54            .finish()
55    }
56}
57
58impl SourceHandle {
59    pub fn new(source: Box<dyn DataSource + 'static>, metadata: SourceMeta) -> Self {
60        Self { source, metadata }
61    }
62}
63
64/// 包含 acceptor 具体实例及可读名称。
65pub struct AcceptorHandle {
66    pub name: String,
67    pub acceptor: Box<dyn ServiceAcceptor + Send>,
68}
69
70impl std::fmt::Debug for AcceptorHandle {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("AcceptorHandle")
73            .field("name", &self.name)
74            .field("acceptor", &"Box<dyn ServiceAcceptor>")
75            .finish()
76    }
77}
78
79impl AcceptorHandle {
80    pub fn new(name: impl Into<String>, acceptor: Box<dyn ServiceAcceptor + Send>) -> Self {
81        Self {
82            name: name.into(),
83            acceptor,
84        }
85    }
86}
87
88/// SourceFactory::build 的统一返回结构。
89#[derive(Default)]
90pub struct SourceSvcIns {
91    pub sources: Vec<SourceHandle>,
92    pub acceptor: Option<AcceptorHandle>,
93}
94
95impl std::fmt::Debug for SourceSvcIns {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        f.debug_struct("SourceSvcIns")
98            .field("sources", &format!("len={}", self.sources.len()))
99            .field(
100                "acceptor",
101                if self.acceptor.is_some() {
102                    &"Some(AcceptorHandle)"
103                } else {
104                    &"None"
105                },
106            )
107            .finish()
108    }
109}
110
111impl SourceSvcIns {
112    pub fn new() -> Self {
113        Self::default()
114    }
115
116    pub fn with_sources(mut self, sources: Vec<SourceHandle>) -> Self {
117        self.sources = sources;
118        self
119    }
120
121    pub fn push_source(&mut self, instance: SourceHandle) {
122        self.sources.push(instance);
123    }
124
125    pub fn with_acceptor(mut self, acceptor: AcceptorHandle) -> Self {
126        self.acceptor = Some(acceptor);
127        self
128    }
129}
130
131/// ResolvedSourceSpec:统一 Factory 构建使用的规格(包含 connector_id,参数一律扁平)。
132#[derive(Deserialize, Serialize, Clone, Debug)]
133pub struct ResolvedSourceSpec {
134    pub name: String,
135    pub kind: String,
136    pub connector_id: String,
137    #[serde(default)]
138    pub params: ParamMap,
139    /// Optional tags propagated from CoreSpec/config. Keep here to ease adapters.
140    #[serde(default)]
141    pub tags: Vec<String>,
142}
143
144#[async_trait]
145pub trait SourceFactory: SourceDefProvider + Send + Sync + 'static {
146    fn kind(&self) -> &'static str;
147    /// 可选:轻量级参数校验(不产生 I/O),用于尽早暴露参数错误。
148    fn validate_spec(&self, _spec: &ResolvedSourceSpec) -> SourceResult<()> {
149        Ok(())
150    }
151    async fn build(
152        &self,
153        spec: &ResolvedSourceSpec,
154        ctx: &SourceBuildCtx,
155    ) -> SourceResult<SourceSvcIns>;
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::source::SourceBatch;
    use async_trait::async_trait;
    use serde_json::json;
    use std::path::PathBuf;

    /// Stand-in `DataSource` that never produces data and reports a fixed id.
    #[derive(Default)]
    struct DummySource {
        id: &'static str,
    }

    #[async_trait]
    impl DataSource for DummySource {
        /// Always succeeds with an empty batch.
        async fn receive(&mut self) -> SourceResult<SourceBatch> {
            Ok(Vec::new())
        }

        /// Never has anything ready.
        fn try_receive(&mut self) -> Option<SourceBatch> {
            None
        }

        /// Echoes the id the dummy was created with.
        fn identifier(&self) -> String {
            self.id.to_owned()
        }
    }

    /// Stand-in acceptor that returns success immediately.
    #[derive(Default)]
    struct DummyAcceptor;

    #[async_trait]
    impl ServiceAcceptor for DummyAcceptor {
        async fn accept_connection(&mut self, _ctrl_rx: CtrlRx) -> SourceResult<()> {
            Ok(())
        }
    }

    /// Builds a handle around a `DummySource`, using `id` as both the source
    /// identifier and the metadata name; the kind is always "dummy".
    fn make_source_handle(id: &'static str) -> SourceHandle {
        let source = Box::new(DummySource { id });
        let metadata = SourceMeta::new(id, "dummy");
        SourceHandle::new(source, metadata)
    }

    #[test]
    fn source_build_ctx_and_meta_helpers() {
        let root = PathBuf::from("/tmp/source");
        let ctx = SourceBuildCtx::new(root);
        assert_eq!(ctx.work_root, PathBuf::from("/tmp/source"));

        let meta = SourceMeta::new("orders", "http");
        assert_eq!(meta.name, "orders");
        assert_eq!(meta.kind, "http");
        assert_eq!(meta.tags.len(), 0);
    }

    #[test]
    fn handle_constructors_store_inner_state() {
        let meta = SourceMeta::new("alpha", "kafka");
        let source = Box::new(DummySource { id: "alpha" });
        let handle = SourceHandle::new(source, meta.clone());
        assert_eq!(handle.metadata.name, meta.name);
        assert_eq!(handle.source.identifier(), "alpha");

        let acceptor = AcceptorHandle::new("http", Box::new(DummyAcceptor));
        assert_eq!(acceptor.name, "http");
    }

    #[test]
    fn source_svc_ins_builders_manage_members() {
        let initial = vec![make_source_handle("a")];
        let mut svc = SourceSvcIns::new().with_sources(initial);
        assert_eq!(svc.sources.len(), 1);

        svc.push_source(make_source_handle("b"));
        assert_eq!(svc.sources.len(), 2);

        let acceptor = AcceptorHandle::new("svc", Box::new(DummyAcceptor));
        let svc = svc.with_acceptor(acceptor);
        assert!(svc.acceptor.is_some());
    }

    #[test]
    fn resolved_source_spec_defaults_optional_fields() {
        // `params` and `tags` are deliberately omitted from the input.
        let raw = json!({
            "name": "demo",
            "kind": "http",
            "connector_id": "conn-1"
        });
        let spec: ResolvedSourceSpec = serde_json::from_value(raw).unwrap();

        assert!(spec.params.is_empty());
        assert!(spec.tags.is_empty());
    }
}