Skip to main content

oversync_connectors/
factory.rs

1use async_trait::async_trait;
2
3use oversync_core::error::OversyncError;
4use oversync_core::traits::{OriginConnector, OriginFactory};
5
6use crate::PostgresConnector;
7use crate::clickhouse::{ClickHouseConfig, ClickHouseConnector};
8use crate::flight_sql::FlightSqlConnector;
9use crate::graphql::{GraphqlConfig, GraphqlConnector};
10use crate::http_source::{HttpSource, HttpSourceConfig};
11use crate::kafka_source::KafkaSourceConnector;
12use crate::mcp::{McpConfig, McpOriginConnector};
13use crate::mysql::MysqlConnector;
14use crate::surrealdb_source::{SurrealDbConnector, SurrealDbLiveConnector};
15use crate::trino::{TrinoConfig, TrinoConnector};
16
17pub struct PostgresOriginFactory;
18
19#[async_trait]
20impl OriginFactory for PostgresOriginFactory {
21	fn connector_type(&self) -> &str {
22		"postgres"
23	}
24
25	async fn create(
26		&self,
27		name: &str,
28		config: &serde_json::Value,
29	) -> Result<Box<dyn OriginConnector>, OversyncError> {
30		let dsn = config
31			.get("dsn")
32			.and_then(|v| v.as_str())
33			.ok_or_else(|| OversyncError::Config("postgres: missing 'dsn'".into()))?;
34
35		let connector = PostgresConnector::new(name, dsn).await?;
36		Ok(Box::new(connector))
37	}
38}
39
40pub struct HttpOriginFactory;
41
42#[async_trait]
43impl OriginFactory for HttpOriginFactory {
44	fn connector_type(&self) -> &str {
45		"http"
46	}
47
48	async fn create(
49		&self,
50		name: &str,
51		config: &serde_json::Value,
52	) -> Result<Box<dyn OriginConnector>, OversyncError> {
53		let http_config: HttpSourceConfig = serde_json::from_value(config.clone())
54			.map_err(|e| OversyncError::Config(format!("http source: {e}")))?;
55		Ok(Box::new(HttpSource::new(name, http_config)?))
56	}
57}
58
59pub struct MysqlOriginFactory;
60
61#[async_trait]
62impl OriginFactory for MysqlOriginFactory {
63	fn connector_type(&self) -> &str {
64		"mysql"
65	}
66
67	async fn create(
68		&self,
69		name: &str,
70		config: &serde_json::Value,
71	) -> Result<Box<dyn OriginConnector>, OversyncError> {
72		let dsn = config
73			.get("dsn")
74			.and_then(|v| v.as_str())
75			.ok_or_else(|| OversyncError::Config("mysql: missing 'dsn'".into()))?;
76
77		let connector = MysqlConnector::new(name, dsn).await?;
78		Ok(Box::new(connector))
79	}
80}
81
82pub struct FlightSqlOriginFactory;
83
84#[async_trait]
85impl OriginFactory for FlightSqlOriginFactory {
86	fn connector_type(&self) -> &str {
87		"flight-sql"
88	}
89
90	async fn create(
91		&self,
92		name: &str,
93		config: &serde_json::Value,
94	) -> Result<Box<dyn OriginConnector>, OversyncError> {
95		let dsn = config
96			.get("dsn")
97			.and_then(|v| v.as_str())
98			.ok_or_else(|| OversyncError::Config("flight-sql: missing 'dsn'".into()))?;
99
100		let connector = FlightSqlConnector::new(name, dsn)?;
101		Ok(Box::new(connector))
102	}
103}
104
105pub struct TrinoOriginFactory;
106
107#[async_trait]
108impl OriginFactory for TrinoOriginFactory {
109	fn connector_type(&self) -> &str {
110		"trino"
111	}
112
113	async fn create(
114		&self,
115		name: &str,
116		config: &serde_json::Value,
117	) -> Result<Box<dyn OriginConnector>, OversyncError> {
118		let trino_config: TrinoConfig = serde_json::from_value(config.clone())
119			.map_err(|e| OversyncError::Config(format!("trino: {e}")))?;
120		let connector = TrinoConnector::new(name, trino_config)?;
121		Ok(Box::new(connector))
122	}
123}
124
125pub struct ClickHouseOriginFactory;
126
127#[async_trait]
128impl OriginFactory for ClickHouseOriginFactory {
129	fn connector_type(&self) -> &str {
130		"clickhouse"
131	}
132
133	async fn create(
134		&self,
135		name: &str,
136		config: &serde_json::Value,
137	) -> Result<Box<dyn OriginConnector>, OversyncError> {
138		let ch_config: ClickHouseConfig = serde_json::from_value(config.clone())
139			.map_err(|e| OversyncError::Config(format!("clickhouse: {e}")))?;
140		let connector = ClickHouseConnector::new(name, ch_config)?;
141		Ok(Box::new(connector))
142	}
143}
144
145pub struct McpOriginFactory;
146
147#[async_trait]
148impl OriginFactory for McpOriginFactory {
149	fn connector_type(&self) -> &str {
150		"mcp"
151	}
152
153	async fn create(
154		&self,
155		name: &str,
156		config: &serde_json::Value,
157	) -> Result<Box<dyn OriginConnector>, OversyncError> {
158		let mcp_config: McpConfig = serde_json::from_value(config.clone())
159			.map_err(|e| OversyncError::Config(format!("mcp: {e}")))?;
160		Ok(Box::new(McpOriginConnector::new(name, mcp_config)))
161	}
162}
163
164pub struct GraphqlOriginFactory;
165
166#[async_trait]
167impl OriginFactory for GraphqlOriginFactory {
168	fn connector_type(&self) -> &str {
169		"graphql"
170	}
171
172	async fn create(
173		&self,
174		name: &str,
175		config: &serde_json::Value,
176	) -> Result<Box<dyn OriginConnector>, OversyncError> {
177		let gql_config: GraphqlConfig = serde_json::from_value(config.clone())
178			.map_err(|e| OversyncError::Config(format!("graphql: {e}")))?;
179		let connector = GraphqlConnector::new(name, gql_config)?;
180		Ok(Box::new(connector))
181	}
182}
183
184pub struct KafkaOriginFactory;
185
186#[async_trait]
187impl OriginFactory for KafkaOriginFactory {
188	fn connector_type(&self) -> &str {
189		"kafka"
190	}
191
192	async fn create(
193		&self,
194		name: &str,
195		config: &serde_json::Value,
196	) -> Result<Box<dyn OriginConnector>, OversyncError> {
197		let brokers = config
198			.get("brokers")
199			.and_then(|v| v.as_str())
200			.ok_or_else(|| OversyncError::Config("kafka: missing 'brokers'".into()))?;
201		let topic = config
202			.get("topic")
203			.and_then(|v| v.as_str())
204			.ok_or_else(|| OversyncError::Config("kafka: missing 'topic'".into()))?;
205		let group_id = config
206			.get("group_id")
207			.and_then(|v| v.as_str())
208			.ok_or_else(|| OversyncError::Config("kafka: missing 'group_id'".into()))?;
209		let auto_offset_reset = config.get("auto_offset_reset").and_then(|v| v.as_str());
210		let auth: Option<oversync_core::model::KafkaAuth> = config
211			.get("auth")
212			.map(|v| {
213				serde_json::from_value(v.clone())
214					.map_err(|e| OversyncError::Config(format!("kafka auth: {e}")))
215			})
216			.transpose()?;
217		let connector = KafkaSourceConnector::with_auth(
218			name,
219			brokers,
220			topic,
221			group_id,
222			auto_offset_reset,
223			auth.as_ref(),
224		)?;
225		Ok(Box::new(connector))
226	}
227}
228
229pub struct SurrealDbOriginFactory;
230
231#[async_trait]
232impl OriginFactory for SurrealDbOriginFactory {
233	fn connector_type(&self) -> &str {
234		"surrealdb"
235	}
236
237	async fn create(
238		&self,
239		name: &str,
240		config: &serde_json::Value,
241	) -> Result<Box<dyn OriginConnector>, OversyncError> {
242		let url = config
243			.get("url")
244			.and_then(|v| v.as_str())
245			.ok_or_else(|| OversyncError::Config("surrealdb: missing 'url'".into()))?;
246		let namespace = config
247			.get("namespace")
248			.and_then(|v| v.as_str())
249			.ok_or_else(|| OversyncError::Config("surrealdb: missing 'namespace'".into()))?;
250		let database = config
251			.get("database")
252			.and_then(|v| v.as_str())
253			.ok_or_else(|| OversyncError::Config("surrealdb: missing 'database'".into()))?;
254		let username = config
255			.get("username")
256			.and_then(|v| v.as_str())
257			.unwrap_or("root");
258		let password = config
259			.get("password")
260			.and_then(|v| v.as_str())
261			.unwrap_or("root");
262		let live = config
263			.get("live")
264			.and_then(|v| v.as_bool())
265			.unwrap_or(false);
266
267		if live {
268			let table = config
269				.get("table")
270				.and_then(|v| v.as_str())
271				.ok_or_else(|| OversyncError::Config("surrealdb live: missing 'table'".into()))?;
272			let key_column = config
273				.get("key_column")
274				.and_then(|v| v.as_str())
275				.unwrap_or("id");
276			let connector = SurrealDbLiveConnector::new(
277				name, url, namespace, database, username, password, table, key_column,
278			)
279			.await?;
280			Ok(Box::new(connector))
281		} else {
282			let connector =
283				SurrealDbConnector::new(name, url, namespace, database, username, password).await?;
284			Ok(Box::new(connector))
285		}
286	}
287}
288
#[cfg(test)]
mod tests {
	use super::*;

	// ── Shared helpers ────────────────────────────────────────────

	/// Returns the connector type reported by `factory` as an owned string.
	fn type_of<F: OriginFactory>(factory: F) -> String {
		factory.connector_type().to_owned()
	}

	/// Calls `factory.create("test", &config)` and returns the rendered error.
	///
	/// Panics with "should fail" when creation unexpectedly succeeds.
	async fn create_err<F: OriginFactory>(factory: F, config: serde_json::Value) -> String {
		match factory.create("test", &config).await {
			Ok(_) => panic!("should fail"),
			Err(err) => err.to_string(),
		}
	}

	// ── Connector type identity ───────────────────────────────────

	#[test]
	fn all_factories_report_correct_connector_type() {
		assert_eq!(type_of(PostgresOriginFactory), "postgres");
		assert_eq!(type_of(HttpOriginFactory), "http");
		assert_eq!(type_of(MysqlOriginFactory), "mysql");
		assert_eq!(type_of(FlightSqlOriginFactory), "flight-sql");
		assert_eq!(type_of(TrinoOriginFactory), "trino");
		assert_eq!(type_of(ClickHouseOriginFactory), "clickhouse");
		assert_eq!(type_of(McpOriginFactory), "mcp");
		assert_eq!(type_of(GraphqlOriginFactory), "graphql");
		assert_eq!(type_of(KafkaOriginFactory), "kafka");
		assert_eq!(type_of(SurrealDbOriginFactory), "surrealdb");
	}

	// ── Postgres factory ──────────────────────────────────────────

	#[tokio::test]
	async fn postgres_factory_missing_dsn() {
		let message = create_err(PostgresOriginFactory, serde_json::json!({})).await;
		assert!(message.contains("missing 'dsn'"));
	}

	// ── MySQL factory ─────────────────────────────────────────────

	#[tokio::test]
	async fn mysql_factory_missing_dsn() {
		let message = create_err(MysqlOriginFactory, serde_json::json!({})).await;
		assert!(message.contains("missing 'dsn'"));
	}

	// ── FlightSQL factory ─────────────────────────────────────────

	#[tokio::test]
	async fn flight_sql_factory_missing_dsn() {
		let message = create_err(FlightSqlOriginFactory, serde_json::json!({})).await;
		assert!(message.contains("missing 'dsn'"));
	}

	// ── HTTP factory ──────────────────────────────────────────────

	#[tokio::test]
	async fn http_factory_invalid_config() {
		let message = create_err(HttpOriginFactory, serde_json::json!("not-an-object")).await;
		assert!(message.contains("http source"));
	}

	// ── Trino factory ─────────────────────────────────────────────

	#[tokio::test]
	async fn trino_factory_invalid_config() {
		let message = create_err(TrinoOriginFactory, serde_json::json!("not-an-object")).await;
		assert!(message.contains("trino"));
	}

	// ── ClickHouse factory ────────────────────────────────────────

	#[tokio::test]
	async fn clickhouse_factory_invalid_config() {
		let message =
			create_err(ClickHouseOriginFactory, serde_json::json!("not-an-object")).await;
		assert!(message.contains("clickhouse"));
	}

	// ── GraphQL factory ───────────────────────────────────────────

	#[tokio::test]
	async fn graphql_factory_invalid_config() {
		let message = create_err(GraphqlOriginFactory, serde_json::json!("not-an-object")).await;
		assert!(message.contains("graphql"));
	}

	// ── Kafka factory ─────────────────────────────────────────────

	#[tokio::test]
	async fn kafka_factory_missing_brokers() {
		let config = serde_json::json!({"topic": "t", "group_id": "g"});
		let message = create_err(KafkaOriginFactory, config).await;
		assert!(message.contains("missing 'brokers'"));
	}

	#[tokio::test]
	async fn kafka_factory_missing_topic() {
		let config = serde_json::json!({"brokers": "localhost:9092", "group_id": "g"});
		let message = create_err(KafkaOriginFactory, config).await;
		assert!(message.contains("missing 'topic'"));
	}

	#[tokio::test]
	async fn kafka_factory_missing_group_id() {
		let config = serde_json::json!({"brokers": "localhost:9092", "topic": "t"});
		let message = create_err(KafkaOriginFactory, config).await;
		assert!(message.contains("missing 'group_id'"));
	}

	#[tokio::test]
	async fn kafka_factory_invalid_auth() {
		let config = serde_json::json!({
			"brokers": "localhost:9092",
			"topic": "t",
			"group_id": "g",
			"auth": "not-an-object"
		});
		let message = create_err(KafkaOriginFactory, config).await;
		assert!(message.contains("kafka auth"));
	}

	// ── SurrealDB factory ─────────────────────────────────────────

	#[tokio::test]
	async fn surrealdb_factory_missing_url() {
		let config = serde_json::json!({"namespace": "ns", "database": "db"});
		let message = create_err(SurrealDbOriginFactory, config).await;
		assert!(message.contains("missing 'url'"));
	}

	#[tokio::test]
	async fn surrealdb_factory_missing_namespace() {
		let config = serde_json::json!({"url": "ws://localhost:8000", "database": "db"});
		let message = create_err(SurrealDbOriginFactory, config).await;
		assert!(message.contains("missing 'namespace'"));
	}

	#[tokio::test]
	async fn surrealdb_factory_missing_database() {
		let config = serde_json::json!({"url": "ws://localhost:8000", "namespace": "ns"});
		let message = create_err(SurrealDbOriginFactory, config).await;
		assert!(message.contains("missing 'database'"));
	}

	#[tokio::test]
	async fn surrealdb_live_factory_missing_table() {
		let config = serde_json::json!({
			"url": "ws://localhost:8000",
			"namespace": "ns",
			"database": "db",
			"live": true
		});
		let message = create_err(SurrealDbOriginFactory, config).await;
		assert!(message.contains("missing 'table'"));
	}

	// ── MCP factory ───────────────────────────────────────────────

	#[tokio::test]
	async fn mcp_factory_creates_with_valid_config() {
		let config = serde_json::json!({"dsn": "npx -y @modelcontextprotocol/server-memory"});
		let created = McpOriginFactory.create("mcp-test", &config).await;
		let connector = created.unwrap();
		assert_eq!(connector.name(), "mcp-test");
	}

	#[tokio::test]
	async fn mcp_factory_invalid_config() {
		let message = create_err(McpOriginFactory, serde_json::json!("not-an-object")).await;
		assert!(message.contains("mcp"));
	}
}