wp_connector_api/runtime/source/
factory.rs1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::path::PathBuf;
4
5use super::types::{CtrlRx, DataSource, Tags};
6use crate::{SourceDefProvider, SourceResult, types::ParamMap};
7
8#[async_trait]
9pub trait ServiceAcceptor: Send {
10 async fn accept_connection(&mut self, ctrl_rx: CtrlRx) -> SourceResult<()>;
12}
13
14#[derive(Clone, Debug)]
15pub struct SourceBuildCtx {
16 pub work_root: PathBuf,
17}
18
19impl SourceBuildCtx {
20 pub fn new(work_root: PathBuf) -> Self {
21 Self { work_root }
22 }
23}
24
25#[derive(Clone, Debug)]
27pub struct SourceMeta {
28 pub name: String,
29 pub kind: String,
30 pub tags: Tags,
31}
32
33impl SourceMeta {
34 pub fn new(name: impl Into<String>, kind: impl Into<String>) -> Self {
35 Self {
36 name: name.into(),
37 kind: kind.into(),
38 tags: Tags::default(),
39 }
40 }
41}
42
43pub struct SourceHandle {
45 pub source: Box<dyn DataSource + 'static>,
46 pub metadata: SourceMeta,
47}
48
49impl std::fmt::Debug for SourceHandle {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 f.debug_struct("SourceHandle")
52 .field("source", &"Box<dyn DataSource>")
53 .field("metadata", &self.metadata)
54 .finish()
55 }
56}
57
58impl SourceHandle {
59 pub fn new(source: Box<dyn DataSource + 'static>, metadata: SourceMeta) -> Self {
60 Self { source, metadata }
61 }
62}
63
64pub struct AcceptorHandle {
66 pub name: String,
67 pub acceptor: Box<dyn ServiceAcceptor + Send>,
68}
69
70impl std::fmt::Debug for AcceptorHandle {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("AcceptorHandle")
73 .field("name", &self.name)
74 .field("acceptor", &"Box<dyn ServiceAcceptor>")
75 .finish()
76 }
77}
78
79impl AcceptorHandle {
80 pub fn new(name: impl Into<String>, acceptor: Box<dyn ServiceAcceptor + Send>) -> Self {
81 Self {
82 name: name.into(),
83 acceptor,
84 }
85 }
86}
87
88#[derive(Default)]
90pub struct SourceSvcIns {
91 pub sources: Vec<SourceHandle>,
92 pub acceptor: Option<AcceptorHandle>,
93}
94
95impl std::fmt::Debug for SourceSvcIns {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 f.debug_struct("SourceSvcIns")
98 .field("sources", &format!("len={}", self.sources.len()))
99 .field(
100 "acceptor",
101 if self.acceptor.is_some() {
102 &"Some(AcceptorHandle)"
103 } else {
104 &"None"
105 },
106 )
107 .finish()
108 }
109}
110
111impl SourceSvcIns {
112 pub fn new() -> Self {
113 Self::default()
114 }
115
116 pub fn with_sources(mut self, sources: Vec<SourceHandle>) -> Self {
117 self.sources = sources;
118 self
119 }
120
121 pub fn push_source(&mut self, instance: SourceHandle) {
122 self.sources.push(instance);
123 }
124
125 pub fn with_acceptor(mut self, acceptor: AcceptorHandle) -> Self {
126 self.acceptor = Some(acceptor);
127 self
128 }
129}
130
131#[derive(Deserialize, Serialize, Clone, Debug)]
133pub struct ResolvedSourceSpec {
134 pub name: String,
135 pub kind: String,
136 pub connector_id: String,
137 #[serde(default)]
138 pub params: ParamMap,
139 #[serde(default)]
141 pub tags: Vec<String>,
142}
143
144#[async_trait]
145pub trait SourceFactory: SourceDefProvider + Send + Sync + 'static {
146 fn kind(&self) -> &'static str;
147 fn validate_spec(&self, _spec: &ResolvedSourceSpec) -> SourceResult<()> {
149 Ok(())
150 }
151 async fn build(
152 &self,
153 spec: &ResolvedSourceSpec,
154 ctx: &SourceBuildCtx,
155 ) -> SourceResult<SourceSvcIns>;
156}
157
158#[cfg(test)]
159mod tests {
160 use super::*;
161 use crate::runtime::source::SourceBatch;
162 use async_trait::async_trait;
163 use serde_json::json;
164 use std::path::PathBuf;
165
166 #[derive(Default)]
167 struct DummySource {
168 id: &'static str,
169 }
170
171 #[async_trait]
172 impl DataSource for DummySource {
173 async fn receive(&mut self) -> SourceResult<SourceBatch> {
174 Ok(Vec::new())
175 }
176
177 fn try_receive(&mut self) -> Option<SourceBatch> {
178 None
179 }
180
181 fn identifier(&self) -> String {
182 self.id.to_string()
183 }
184 }
185
186 #[derive(Default)]
187 struct DummyAcceptor;
188
189 #[async_trait]
190 impl ServiceAcceptor for DummyAcceptor {
191 async fn accept_connection(&mut self, _ctrl_rx: CtrlRx) -> SourceResult<()> {
192 Ok(())
193 }
194 }
195
196 fn make_source_handle(id: &'static str) -> SourceHandle {
197 SourceHandle::new(Box::new(DummySource { id }), SourceMeta::new(id, "dummy"))
198 }
199
200 #[test]
201 fn source_build_ctx_and_meta_helpers() {
202 let ctx = SourceBuildCtx::new(PathBuf::from("/tmp/source"));
203 assert_eq!(ctx.work_root, PathBuf::from("/tmp/source"));
204
205 let meta = SourceMeta::new("orders", "http");
206 assert_eq!(meta.name, "orders");
207 assert_eq!(meta.kind, "http");
208 assert_eq!(meta.tags.len(), 0);
209 }
210
211 #[test]
212 fn handle_constructors_store_inner_state() {
213 let meta = SourceMeta::new("alpha", "kafka");
214 let handle = SourceHandle::new(Box::new(DummySource { id: "alpha" }), meta.clone());
215 assert_eq!(handle.metadata.name, meta.name);
216 assert_eq!(handle.source.identifier(), "alpha");
217
218 let acceptor = AcceptorHandle::new("http", Box::new(DummyAcceptor));
219 assert_eq!(acceptor.name, "http");
220 }
221
222 #[test]
223 fn source_svc_ins_builders_manage_members() {
224 let mut svc = SourceSvcIns::new().with_sources(vec![make_source_handle("a")]);
225 assert_eq!(svc.sources.len(), 1);
226
227 svc.push_source(make_source_handle("b"));
228 assert_eq!(svc.sources.len(), 2);
229
230 let svc = svc.with_acceptor(AcceptorHandle::new("svc", Box::new(DummyAcceptor)));
231 assert!(svc.acceptor.is_some());
232 }
233
234 #[test]
235 fn resolved_source_spec_defaults_optional_fields() {
236 let spec: ResolvedSourceSpec = serde_json::from_value(json!({
237 "name": "demo",
238 "kind": "http",
239 "connector_id": "conn-1"
240 }))
241 .unwrap();
242
243 assert!(spec.params.is_empty());
244 assert!(spec.tags.is_empty());
245 }
246}