Skip to main content

oversync_core/
traits.rs

1use async_trait::async_trait;
2use tokio::sync::mpsc;
3
4use crate::error::OversyncError;
5use crate::model::{EventEnvelope, RawRow};
6
7/// Rust-native event transform hook. Consumers implement this to modify
8/// [`EventEnvelope`]s in-flight before they reach sinks. Takes precedence
9/// over SurrealQL `fn::*` transforms when set on a [`CycleRunner`].
10#[async_trait]
11pub trait TransformHook: Send + Sync {
12	async fn transform(
13		&self,
14		envelopes: Vec<EventEnvelope>,
15	) -> Result<Vec<EventEnvelope>, OversyncError>;
16}
17
18/// An origin connector fetches rows from an external data source.
19///
20/// Implementations exist for PostgreSQL, MySQL, Trino, ClickHouse, HTTP APIs,
21/// GraphQL, and Apache Arrow Flight SQL. Each connector is created via an
22/// [`OriginFactory`] from a JSON config object.
23///
24/// # Lifecycle
25///
26/// 1. Factory creates the connector (connection pool established)
27/// 2. [`fetch_all`] or [`fetch_into`] called once per cycle
28/// 3. Connector is reused across cycles (connection pooling)
29#[async_trait]
30pub trait OriginConnector: Send + Sync {
31	/// Human-readable name of this connector instance.
32	fn name(&self) -> &str;
33
34	/// Fetch all rows matching the query. Returns the full result set in memory.
35	async fn fetch_all(&self, sql: &str, key_column: &str) -> Result<Vec<RawRow>, OversyncError>;
36
37	/// Stream rows in batches into a channel. Memory bounded by batch_size * channel buffer.
38	/// Default: calls fetch_all, chunks, sends.
39	async fn fetch_into(
40		&self,
41		sql: &str,
42		key_column: &str,
43		batch_size: usize,
44		tx: mpsc::Sender<Vec<RawRow>>,
45	) -> Result<usize, OversyncError> {
46		let all = self.fetch_all(sql, key_column).await?;
47		let total = all.len();
48		for chunk in all.chunks(batch_size) {
49			tx.send(chunk.to_vec())
50				.await
51				.map_err(|_| OversyncError::Internal("channel closed".into()))?;
52		}
53		Ok(total)
54	}
55
56	async fn test_connection(&self) -> Result<(), OversyncError>;
57}
58
59/// A target connector delivers delta events to a destination.
60///
61/// Built-in implementations: stdout, HTTP webhook, Kafka, SurrealDB.
62/// Custom sinks implement this trait for application-specific delivery
63/// (e.g., DatacatSink transforms events into catalog entities).
64///
65/// # Batching
66///
67/// The default [`send_batch`] iterates and calls [`send_event`] per item.
68/// Override for targets that support native batching (e.g., Kafka produce).
69#[async_trait]
70pub trait Sink: Send + Sync {
71	/// Human-readable name of this target instance.
72	fn name(&self) -> &str;
73
74	/// Deliver a single event to the target.
75	async fn send_event(&self, envelope: &EventEnvelope) -> Result<(), OversyncError>;
76
77	/// Send a batch of envelopes. Default: iterates and calls send_event.
78	/// Override for sinks that support native batching (e.g., Kafka produce batch).
79	async fn send_batch(&self, envelopes: &[EventEnvelope]) -> Result<(), OversyncError> {
80		for envelope in envelopes {
81			self.send_event(envelope).await?;
82		}
83		Ok(())
84	}
85
86	async fn test_connection(&self) -> Result<(), OversyncError>;
87}
88
89/// Factory for creating [`OriginConnector`]s from a JSON config object.
90///
91/// Registered in the [`PluginRegistry`] by connector type name (e.g., `"postgres"`).
92/// The factory is called once per source to create a connector instance.
93#[async_trait]
94pub trait OriginFactory: Send + Sync {
95	fn connector_type(&self) -> &str;
96
97	async fn create(
98		&self,
99		name: &str,
100		config: &serde_json::Value,
101	) -> Result<Box<dyn OriginConnector>, OversyncError>;
102}
103
104/// Factory for creating [`Sink`] target connectors from a JSON config object.
105#[async_trait]
106pub trait TargetFactory: Send + Sync {
107	fn sink_type(&self) -> &str;
108
109	async fn create(
110		&self,
111		name: &str,
112		config: &serde_json::Value,
113	) -> Result<Box<dyn Sink>, OversyncError>;
114}
115
116/// Chain multiple [`TransformHook`]s into a sequential pipeline.
117///
118/// Each hook receives the output of the previous one. If any hook
119/// returns an error, the pipeline short-circuits.
120pub struct TransformPipeline {
121	hooks: Vec<std::sync::Arc<dyn TransformHook>>,
122}
123
124impl TransformPipeline {
125	pub fn new(hooks: Vec<std::sync::Arc<dyn TransformHook>>) -> Self {
126		Self { hooks }
127	}
128}
129
130#[async_trait]
131impl TransformHook for TransformPipeline {
132	async fn transform(
133		&self,
134		mut envelopes: Vec<EventEnvelope>,
135	) -> Result<Vec<EventEnvelope>, OversyncError> {
136		for hook in &self.hooks {
137			envelopes = hook.transform(envelopes).await?;
138		}
139		Ok(envelopes)
140	}
141}
142
#[cfg(test)]
mod tests {
	use super::*;
	use crate::model::{EventEnvelope, EventMeta, OpType};

	/// Test hook that emits every input envelope twice.
	struct DoubleTransform;

	#[async_trait]
	impl TransformHook for DoubleTransform {
		async fn transform(
			&self,
			envelopes: Vec<EventEnvelope>,
		) -> Result<Vec<EventEnvelope>, OversyncError> {
			let mut out = envelopes.clone();
			out.extend(envelopes);
			Ok(out)
		}
	}

	/// Minimal envelope with fixed metadata and an empty JSON object payload.
	fn test_envelope() -> EventEnvelope {
		EventEnvelope {
			meta: EventMeta {
				op: OpType::Created,
				origin_id: "s".into(),
				query_id: "q".into(),
				key: "k".into(),
				hash: "h".into(),
				cycle_id: 1,
				timestamp: chrono::Utc::now(),
			},
			data: serde_json::json!({}),
		}
	}

	#[tokio::test]
	async fn transform_hook_receives_and_returns_envelopes() {
		let hook = DoubleTransform;
		let input = vec![test_envelope()];
		let output = hook.transform(input).await.unwrap();
		assert_eq!(output.len(), 2);
	}

	#[tokio::test]
	async fn transform_hook_can_return_empty() {
		struct DropAll;
		#[async_trait]
		impl TransformHook for DropAll {
			async fn transform(
				&self,
				_envelopes: Vec<EventEnvelope>,
			) -> Result<Vec<EventEnvelope>, OversyncError> {
				Ok(vec![])
			}
		}

		let output = DropAll.transform(vec![test_envelope()]).await.unwrap();
		assert!(output.is_empty());
	}

	#[tokio::test]
	async fn transform_hook_can_return_error() {
		struct FailTransform;
		#[async_trait]
		impl TransformHook for FailTransform {
			async fn transform(
				&self,
				_envelopes: Vec<EventEnvelope>,
			) -> Result<Vec<EventEnvelope>, OversyncError> {
				Err(OversyncError::Internal("transform failed".into()))
			}
		}

		let result = FailTransform.transform(vec![test_envelope()]).await;
		let err = result.unwrap_err();
		assert!(
			matches!(err, OversyncError::Internal(_)),
			"expected Internal variant, got: {err}"
		);
		assert!(err.to_string().contains("transform failed"));
	}

	// ── TransformPipeline tests ──────────────────────────────

	#[tokio::test]
	async fn pipeline_chains_transforms_in_order() {
		/// Appends a fixed suffix to every envelope's `origin_id`, so the
		/// final value records the order in which stages ran.
		struct AppendSuffix(&'static str);
		#[async_trait]
		impl TransformHook for AppendSuffix {
			async fn transform(
				&self,
				envelopes: Vec<EventEnvelope>,
			) -> Result<Vec<EventEnvelope>, OversyncError> {
				Ok(envelopes
					.into_iter()
					.map(|mut e| {
						e.meta.origin_id.push_str(self.0);
						e
					})
					.collect())
			}
		}

		let pipeline = TransformPipeline::new(vec![
			std::sync::Arc::new(AppendSuffix("_a")),
			std::sync::Arc::new(AppendSuffix("_b")),
		]);
		let output = pipeline.transform(vec![test_envelope()]).await.unwrap();
		assert_eq!(output[0].meta.origin_id, "s_a_b");
	}

	#[tokio::test]
	async fn pipeline_empty_hooks_is_passthrough() {
		let pipeline = TransformPipeline::new(vec![]);
		let input = vec![test_envelope()];
		let output = pipeline.transform(input.clone()).await.unwrap();
		assert_eq!(output.len(), 1);
		assert_eq!(output[0].meta.key, input[0].meta.key);
	}

	#[tokio::test]
	async fn pipeline_short_circuits_on_error() {
		use std::sync::atomic::{AtomicBool, Ordering};
		use std::sync::Arc;

		struct Fail;
		#[async_trait]
		impl TransformHook for Fail {
			async fn transform(
				&self,
				_: Vec<EventEnvelope>,
			) -> Result<Vec<EventEnvelope>, OversyncError> {
				Err(OversyncError::Internal("stage 2 failed".into()))
			}
		}

		/// Pass-through hook that flips a shared flag when it is invoked.
		struct Recorder(Arc<AtomicBool>);
		#[async_trait]
		impl TransformHook for Recorder {
			async fn transform(
				&self,
				envelopes: Vec<EventEnvelope>,
			) -> Result<Vec<EventEnvelope>, OversyncError> {
				self.0.store(true, Ordering::SeqCst);
				Ok(envelopes)
			}
		}

		// The recorder sits *after* the failing stage: if the pipeline really
		// short-circuits, it must never be called.
		let reached = Arc::new(AtomicBool::new(false));
		let pipeline = TransformPipeline::new(vec![
			Arc::new(DoubleTransform),
			Arc::new(Fail),
			Arc::new(Recorder(Arc::clone(&reached))),
		]);
		let result = pipeline.transform(vec![test_envelope()]).await;
		assert!(result.is_err());
		assert!(result.unwrap_err().to_string().contains("stage 2"));
		assert!(
			!reached.load(Ordering::SeqCst),
			"hooks after the failing stage must not run"
		);
	}
}