1use async_trait::async_trait;
2
3use oversync_core::error::OversyncError;
4use oversync_core::traits::{OriginConnector, OriginFactory};
5
6use crate::PostgresConnector;
7use crate::clickhouse::{ClickHouseConfig, ClickHouseConnector};
8use crate::flight_sql::FlightSqlConnector;
9use crate::graphql::{GraphqlConfig, GraphqlConnector};
10use crate::http_source::{HttpSource, HttpSourceConfig};
11use crate::kafka_source::KafkaSourceConnector;
12use crate::mcp::{McpConfig, McpOriginConnector};
13use crate::mysql::MysqlConnector;
14use crate::surrealdb_source::{SurrealDbConnector, SurrealDbLiveConnector};
15use crate::trino::{TrinoConfig, TrinoConnector};
16
17pub struct PostgresOriginFactory;
18
19#[async_trait]
20impl OriginFactory for PostgresOriginFactory {
21 fn connector_type(&self) -> &str {
22 "postgres"
23 }
24
25 async fn create(
26 &self,
27 name: &str,
28 config: &serde_json::Value,
29 ) -> Result<Box<dyn OriginConnector>, OversyncError> {
30 let dsn = config
31 .get("dsn")
32 .and_then(|v| v.as_str())
33 .ok_or_else(|| OversyncError::Config("postgres: missing 'dsn'".into()))?;
34
35 let connector = PostgresConnector::new(name, dsn).await?;
36 Ok(Box::new(connector))
37 }
38}
39
40pub struct HttpOriginFactory;
41
42#[async_trait]
43impl OriginFactory for HttpOriginFactory {
44 fn connector_type(&self) -> &str {
45 "http"
46 }
47
48 async fn create(
49 &self,
50 name: &str,
51 config: &serde_json::Value,
52 ) -> Result<Box<dyn OriginConnector>, OversyncError> {
53 let http_config: HttpSourceConfig = serde_json::from_value(config.clone())
54 .map_err(|e| OversyncError::Config(format!("http source: {e}")))?;
55 Ok(Box::new(HttpSource::new(name, http_config)?))
56 }
57}
58
59pub struct MysqlOriginFactory;
60
61#[async_trait]
62impl OriginFactory for MysqlOriginFactory {
63 fn connector_type(&self) -> &str {
64 "mysql"
65 }
66
67 async fn create(
68 &self,
69 name: &str,
70 config: &serde_json::Value,
71 ) -> Result<Box<dyn OriginConnector>, OversyncError> {
72 let dsn = config
73 .get("dsn")
74 .and_then(|v| v.as_str())
75 .ok_or_else(|| OversyncError::Config("mysql: missing 'dsn'".into()))?;
76
77 let connector = MysqlConnector::new(name, dsn).await?;
78 Ok(Box::new(connector))
79 }
80}
81
82pub struct FlightSqlOriginFactory;
83
84#[async_trait]
85impl OriginFactory for FlightSqlOriginFactory {
86 fn connector_type(&self) -> &str {
87 "flight-sql"
88 }
89
90 async fn create(
91 &self,
92 name: &str,
93 config: &serde_json::Value,
94 ) -> Result<Box<dyn OriginConnector>, OversyncError> {
95 let dsn = config
96 .get("dsn")
97 .and_then(|v| v.as_str())
98 .ok_or_else(|| OversyncError::Config("flight-sql: missing 'dsn'".into()))?;
99
100 let connector = FlightSqlConnector::new(name, dsn)?;
101 Ok(Box::new(connector))
102 }
103}
104
105pub struct TrinoOriginFactory;
106
107#[async_trait]
108impl OriginFactory for TrinoOriginFactory {
109 fn connector_type(&self) -> &str {
110 "trino"
111 }
112
113 async fn create(
114 &self,
115 name: &str,
116 config: &serde_json::Value,
117 ) -> Result<Box<dyn OriginConnector>, OversyncError> {
118 let trino_config: TrinoConfig = serde_json::from_value(config.clone())
119 .map_err(|e| OversyncError::Config(format!("trino: {e}")))?;
120 let connector = TrinoConnector::new(name, trino_config)?;
121 Ok(Box::new(connector))
122 }
123}
124
125pub struct ClickHouseOriginFactory;
126
127#[async_trait]
128impl OriginFactory for ClickHouseOriginFactory {
129 fn connector_type(&self) -> &str {
130 "clickhouse"
131 }
132
133 async fn create(
134 &self,
135 name: &str,
136 config: &serde_json::Value,
137 ) -> Result<Box<dyn OriginConnector>, OversyncError> {
138 let ch_config: ClickHouseConfig = serde_json::from_value(config.clone())
139 .map_err(|e| OversyncError::Config(format!("clickhouse: {e}")))?;
140 let connector = ClickHouseConnector::new(name, ch_config)?;
141 Ok(Box::new(connector))
142 }
143}
144
145pub struct McpOriginFactory;
146
147#[async_trait]
148impl OriginFactory for McpOriginFactory {
149 fn connector_type(&self) -> &str {
150 "mcp"
151 }
152
153 async fn create(
154 &self,
155 name: &str,
156 config: &serde_json::Value,
157 ) -> Result<Box<dyn OriginConnector>, OversyncError> {
158 let mcp_config: McpConfig = serde_json::from_value(config.clone())
159 .map_err(|e| OversyncError::Config(format!("mcp: {e}")))?;
160 Ok(Box::new(McpOriginConnector::new(name, mcp_config)))
161 }
162}
163
164pub struct GraphqlOriginFactory;
165
166#[async_trait]
167impl OriginFactory for GraphqlOriginFactory {
168 fn connector_type(&self) -> &str {
169 "graphql"
170 }
171
172 async fn create(
173 &self,
174 name: &str,
175 config: &serde_json::Value,
176 ) -> Result<Box<dyn OriginConnector>, OversyncError> {
177 let gql_config: GraphqlConfig = serde_json::from_value(config.clone())
178 .map_err(|e| OversyncError::Config(format!("graphql: {e}")))?;
179 let connector = GraphqlConnector::new(name, gql_config)?;
180 Ok(Box::new(connector))
181 }
182}
183
184pub struct KafkaOriginFactory;
185
186#[async_trait]
187impl OriginFactory for KafkaOriginFactory {
188 fn connector_type(&self) -> &str {
189 "kafka"
190 }
191
192 async fn create(
193 &self,
194 name: &str,
195 config: &serde_json::Value,
196 ) -> Result<Box<dyn OriginConnector>, OversyncError> {
197 let brokers = config
198 .get("brokers")
199 .and_then(|v| v.as_str())
200 .ok_or_else(|| OversyncError::Config("kafka: missing 'brokers'".into()))?;
201 let topic = config
202 .get("topic")
203 .and_then(|v| v.as_str())
204 .ok_or_else(|| OversyncError::Config("kafka: missing 'topic'".into()))?;
205 let group_id = config
206 .get("group_id")
207 .and_then(|v| v.as_str())
208 .ok_or_else(|| OversyncError::Config("kafka: missing 'group_id'".into()))?;
209 let auto_offset_reset = config.get("auto_offset_reset").and_then(|v| v.as_str());
210 let auth: Option<oversync_core::model::KafkaAuth> = config
211 .get("auth")
212 .map(|v| {
213 serde_json::from_value(v.clone())
214 .map_err(|e| OversyncError::Config(format!("kafka auth: {e}")))
215 })
216 .transpose()?;
217 let connector = KafkaSourceConnector::with_auth(
218 name,
219 brokers,
220 topic,
221 group_id,
222 auto_offset_reset,
223 auth.as_ref(),
224 )?;
225 Ok(Box::new(connector))
226 }
227}
228
229pub struct SurrealDbOriginFactory;
230
231#[async_trait]
232impl OriginFactory for SurrealDbOriginFactory {
233 fn connector_type(&self) -> &str {
234 "surrealdb"
235 }
236
237 async fn create(
238 &self,
239 name: &str,
240 config: &serde_json::Value,
241 ) -> Result<Box<dyn OriginConnector>, OversyncError> {
242 let url = config
243 .get("url")
244 .and_then(|v| v.as_str())
245 .ok_or_else(|| OversyncError::Config("surrealdb: missing 'url'".into()))?;
246 let namespace = config
247 .get("namespace")
248 .and_then(|v| v.as_str())
249 .ok_or_else(|| OversyncError::Config("surrealdb: missing 'namespace'".into()))?;
250 let database = config
251 .get("database")
252 .and_then(|v| v.as_str())
253 .ok_or_else(|| OversyncError::Config("surrealdb: missing 'database'".into()))?;
254 let username = config
255 .get("username")
256 .and_then(|v| v.as_str())
257 .unwrap_or("root");
258 let password = config
259 .get("password")
260 .and_then(|v| v.as_str())
261 .unwrap_or("root");
262 let live = config
263 .get("live")
264 .and_then(|v| v.as_bool())
265 .unwrap_or(false);
266
267 if live {
268 let table = config
269 .get("table")
270 .and_then(|v| v.as_str())
271 .ok_or_else(|| OversyncError::Config("surrealdb live: missing 'table'".into()))?;
272 let key_column = config
273 .get("key_column")
274 .and_then(|v| v.as_str())
275 .unwrap_or("id");
276 let connector = SurrealDbLiveConnector::new(
277 name, url, namespace, database, username, password, table, key_column,
278 )
279 .await?;
280 Ok(Box::new(connector))
281 } else {
282 let connector =
283 SurrealDbConnector::new(name, url, namespace, database, username, password).await?;
284 Ok(Box::new(connector))
285 }
286 }
287}
288
289#[cfg(test)]
290mod tests {
291 use super::*;
292
293 #[test]
296 fn all_factories_report_correct_connector_type() {
297 assert_eq!(PostgresOriginFactory.connector_type(), "postgres");
298 assert_eq!(HttpOriginFactory.connector_type(), "http");
299 assert_eq!(MysqlOriginFactory.connector_type(), "mysql");
300 assert_eq!(FlightSqlOriginFactory.connector_type(), "flight-sql");
301 assert_eq!(TrinoOriginFactory.connector_type(), "trino");
302 assert_eq!(ClickHouseOriginFactory.connector_type(), "clickhouse");
303 assert_eq!(McpOriginFactory.connector_type(), "mcp");
304 assert_eq!(GraphqlOriginFactory.connector_type(), "graphql");
305 assert_eq!(KafkaOriginFactory.connector_type(), "kafka");
306 assert_eq!(SurrealDbOriginFactory.connector_type(), "surrealdb");
307 }
308
309 #[tokio::test]
312 async fn postgres_factory_missing_dsn() {
313 let config = serde_json::json!({});
314 let err = PostgresOriginFactory
315 .create("test", &config)
316 .await
317 .err()
318 .expect("should fail");
319 assert!(err.to_string().contains("missing 'dsn'"));
320 }
321
322 #[tokio::test]
325 async fn mysql_factory_missing_dsn() {
326 let config = serde_json::json!({});
327 let err = MysqlOriginFactory
328 .create("test", &config)
329 .await
330 .err()
331 .expect("should fail");
332 assert!(err.to_string().contains("missing 'dsn'"));
333 }
334
335 #[tokio::test]
338 async fn flight_sql_factory_missing_dsn() {
339 let config = serde_json::json!({});
340 let err = FlightSqlOriginFactory
341 .create("test", &config)
342 .await
343 .err()
344 .expect("should fail");
345 assert!(err.to_string().contains("missing 'dsn'"));
346 }
347
348 #[tokio::test]
351 async fn http_factory_invalid_config() {
352 let config = serde_json::json!("not-an-object");
353 let err = HttpOriginFactory
354 .create("test", &config)
355 .await
356 .err()
357 .expect("should fail");
358 assert!(err.to_string().contains("http source"));
359 }
360
361 #[tokio::test]
364 async fn trino_factory_invalid_config() {
365 let config = serde_json::json!("not-an-object");
366 let err = TrinoOriginFactory
367 .create("test", &config)
368 .await
369 .err()
370 .expect("should fail");
371 assert!(err.to_string().contains("trino"));
372 }
373
374 #[tokio::test]
377 async fn clickhouse_factory_invalid_config() {
378 let config = serde_json::json!("not-an-object");
379 let err = ClickHouseOriginFactory
380 .create("test", &config)
381 .await
382 .err()
383 .expect("should fail");
384 assert!(err.to_string().contains("clickhouse"));
385 }
386
387 #[tokio::test]
390 async fn graphql_factory_invalid_config() {
391 let config = serde_json::json!("not-an-object");
392 let err = GraphqlOriginFactory
393 .create("test", &config)
394 .await
395 .err()
396 .expect("should fail");
397 assert!(err.to_string().contains("graphql"));
398 }
399
400 #[tokio::test]
403 async fn kafka_factory_missing_brokers() {
404 let config = serde_json::json!({"topic": "t", "group_id": "g"});
405 let err = KafkaOriginFactory
406 .create("test", &config)
407 .await
408 .err()
409 .expect("should fail");
410 assert!(err.to_string().contains("missing 'brokers'"));
411 }
412
413 #[tokio::test]
414 async fn kafka_factory_missing_topic() {
415 let config = serde_json::json!({"brokers": "localhost:9092", "group_id": "g"});
416 let err = KafkaOriginFactory
417 .create("test", &config)
418 .await
419 .err()
420 .expect("should fail");
421 assert!(err.to_string().contains("missing 'topic'"));
422 }
423
424 #[tokio::test]
425 async fn kafka_factory_missing_group_id() {
426 let config = serde_json::json!({"brokers": "localhost:9092", "topic": "t"});
427 let err = KafkaOriginFactory
428 .create("test", &config)
429 .await
430 .err()
431 .expect("should fail");
432 assert!(err.to_string().contains("missing 'group_id'"));
433 }
434
435 #[tokio::test]
436 async fn kafka_factory_invalid_auth() {
437 let config = serde_json::json!({
438 "brokers": "localhost:9092",
439 "topic": "t",
440 "group_id": "g",
441 "auth": "not-an-object"
442 });
443 let err = KafkaOriginFactory
444 .create("test", &config)
445 .await
446 .err()
447 .expect("should fail");
448 assert!(err.to_string().contains("kafka auth"));
449 }
450
451 #[tokio::test]
454 async fn surrealdb_factory_missing_url() {
455 let config = serde_json::json!({"namespace": "ns", "database": "db"});
456 let err = SurrealDbOriginFactory
457 .create("test", &config)
458 .await
459 .err()
460 .expect("should fail");
461 assert!(err.to_string().contains("missing 'url'"));
462 }
463
464 #[tokio::test]
465 async fn surrealdb_factory_missing_namespace() {
466 let config = serde_json::json!({"url": "ws://localhost:8000", "database": "db"});
467 let err = SurrealDbOriginFactory
468 .create("test", &config)
469 .await
470 .err()
471 .expect("should fail");
472 assert!(err.to_string().contains("missing 'namespace'"));
473 }
474
475 #[tokio::test]
476 async fn surrealdb_factory_missing_database() {
477 let config = serde_json::json!({"url": "ws://localhost:8000", "namespace": "ns"});
478 let err = SurrealDbOriginFactory
479 .create("test", &config)
480 .await
481 .err()
482 .expect("should fail");
483 assert!(err.to_string().contains("missing 'database'"));
484 }
485
486 #[tokio::test]
487 async fn surrealdb_live_factory_missing_table() {
488 let config = serde_json::json!({
489 "url": "ws://localhost:8000",
490 "namespace": "ns",
491 "database": "db",
492 "live": true
493 });
494 let err = SurrealDbOriginFactory
495 .create("test", &config)
496 .await
497 .err()
498 .expect("should fail");
499 assert!(err.to_string().contains("missing 'table'"));
500 }
501
502 #[tokio::test]
505 async fn mcp_factory_creates_with_valid_config() {
506 let config = serde_json::json!({"dsn": "npx -y @modelcontextprotocol/server-memory"});
507 let connector = McpOriginFactory.create("mcp-test", &config).await.unwrap();
508 assert_eq!(connector.name(), "mcp-test");
509 }
510
511 #[tokio::test]
512 async fn mcp_factory_invalid_config() {
513 let config = serde_json::json!("not-an-object");
514 let err = McpOriginFactory
515 .create("test", &config)
516 .await
517 .err()
518 .expect("should fail");
519 assert!(err.to_string().contains("mcp"));
520 }
521}