1use async_trait::async_trait;
2use tokio::sync::mpsc;
3
4use crate::error::OversyncError;
5use crate::model::{EventEnvelope, RawRow};
6
7#[async_trait]
11pub trait TransformHook: Send + Sync {
12 async fn transform(
13 &self,
14 envelopes: Vec<EventEnvelope>,
15 ) -> Result<Vec<EventEnvelope>, OversyncError>;
16}
17
18#[async_trait]
30pub trait OriginConnector: Send + Sync {
31 fn name(&self) -> &str;
33
34 async fn fetch_all(&self, sql: &str, key_column: &str) -> Result<Vec<RawRow>, OversyncError>;
36
37 async fn fetch_into(
40 &self,
41 sql: &str,
42 key_column: &str,
43 batch_size: usize,
44 tx: mpsc::Sender<Vec<RawRow>>,
45 ) -> Result<usize, OversyncError> {
46 let all = self.fetch_all(sql, key_column).await?;
47 let total = all.len();
48 for chunk in all.chunks(batch_size) {
49 tx.send(chunk.to_vec())
50 .await
51 .map_err(|_| OversyncError::Internal("channel closed".into()))?;
52 }
53 Ok(total)
54 }
55
56 async fn test_connection(&self) -> Result<(), OversyncError>;
57}
58
59#[async_trait]
70pub trait Sink: Send + Sync {
71 fn name(&self) -> &str;
73
74 async fn send_event(&self, envelope: &EventEnvelope) -> Result<(), OversyncError>;
76
77 async fn send_batch(&self, envelopes: &[EventEnvelope]) -> Result<(), OversyncError> {
80 for envelope in envelopes {
81 self.send_event(envelope).await?;
82 }
83 Ok(())
84 }
85
86 async fn test_connection(&self) -> Result<(), OversyncError>;
87}
88
89#[async_trait]
94pub trait OriginFactory: Send + Sync {
95 fn connector_type(&self) -> &str;
96
97 async fn create(
98 &self,
99 name: &str,
100 config: &serde_json::Value,
101 ) -> Result<Box<dyn OriginConnector>, OversyncError>;
102}
103
104#[async_trait]
106pub trait TargetFactory: Send + Sync {
107 fn sink_type(&self) -> &str;
108
109 async fn create(
110 &self,
111 name: &str,
112 config: &serde_json::Value,
113 ) -> Result<Box<dyn Sink>, OversyncError>;
114}
115
116pub struct TransformPipeline {
121 hooks: Vec<std::sync::Arc<dyn TransformHook>>,
122}
123
124impl TransformPipeline {
125 pub fn new(hooks: Vec<std::sync::Arc<dyn TransformHook>>) -> Self {
126 Self { hooks }
127 }
128}
129
130#[async_trait]
131impl TransformHook for TransformPipeline {
132 async fn transform(
133 &self,
134 mut envelopes: Vec<EventEnvelope>,
135 ) -> Result<Vec<EventEnvelope>, OversyncError> {
136 for hook in &self.hooks {
137 envelopes = hook.transform(envelopes).await?;
138 }
139 Ok(envelopes)
140 }
141}
142
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    use super::*;
    use crate::model::{EventEnvelope, EventMeta, OpType};

    /// Hook that returns every incoming envelope twice.
    struct DoubleTransform;

    #[async_trait]
    impl TransformHook for DoubleTransform {
        async fn transform(
            &self,
            envelopes: Vec<EventEnvelope>,
        ) -> Result<Vec<EventEnvelope>, OversyncError> {
            let mut doubled = envelopes.clone();
            doubled.extend(envelopes);
            Ok(doubled)
        }
    }

    /// Builds a minimal envelope with fixed metadata and an empty JSON object
    /// as payload.
    fn test_envelope() -> EventEnvelope {
        let meta = EventMeta {
            op: OpType::Created,
            origin_id: "s".into(),
            query_id: "q".into(),
            key: "k".into(),
            hash: "h".into(),
            cycle_id: 1,
            timestamp: chrono::Utc::now(),
        };
        EventEnvelope {
            meta,
            data: serde_json::json!({}),
        }
    }

    #[tokio::test]
    async fn transform_hook_receives_and_returns_envelopes() {
        let output = DoubleTransform
            .transform(vec![test_envelope()])
            .await
            .unwrap();
        assert_eq!(output.len(), 2);
    }

    #[tokio::test]
    async fn transform_hook_can_return_empty() {
        struct DropAll;
        #[async_trait]
        impl TransformHook for DropAll {
            async fn transform(
                &self,
                _envelopes: Vec<EventEnvelope>,
            ) -> Result<Vec<EventEnvelope>, OversyncError> {
                Ok(Vec::new())
            }
        }

        let output = DropAll.transform(vec![test_envelope()]).await.unwrap();
        assert!(output.is_empty());
    }

    #[tokio::test]
    async fn transform_hook_can_return_error() {
        struct FailTransform;
        #[async_trait]
        impl TransformHook for FailTransform {
            async fn transform(
                &self,
                _envelopes: Vec<EventEnvelope>,
            ) -> Result<Vec<EventEnvelope>, OversyncError> {
                Err(OversyncError::Internal("transform failed".into()))
            }
        }

        let err = FailTransform
            .transform(vec![test_envelope()])
            .await
            .unwrap_err();
        assert!(
            matches!(err, OversyncError::Internal(_)),
            "expected Internal variant, got: {err}"
        );
        assert!(err.to_string().contains("transform failed"));
    }

    #[tokio::test]
    async fn pipeline_chains_transforms_in_order() {
        /// Appends a fixed suffix to every envelope's `origin_id`.
        struct AppendSuffix(&'static str);
        #[async_trait]
        impl TransformHook for AppendSuffix {
            async fn transform(
                &self,
                envelopes: Vec<EventEnvelope>,
            ) -> Result<Vec<EventEnvelope>, OversyncError> {
                let mut envelopes = envelopes;
                for envelope in &mut envelopes {
                    envelope.meta.origin_id.push_str(self.0);
                }
                Ok(envelopes)
            }
        }

        let pipeline = TransformPipeline::new(vec![
            Arc::new(AppendSuffix("_a")),
            Arc::new(AppendSuffix("_b")),
        ]);
        let output = pipeline.transform(vec![test_envelope()]).await.unwrap();
        // "_a" must be applied before "_b" for the suffixes to appear in this order.
        assert_eq!(output[0].meta.origin_id, "s_a_b");
    }

    #[tokio::test]
    async fn pipeline_empty_hooks_is_passthrough() {
        let pipeline = TransformPipeline::new(vec![]);
        let input = vec![test_envelope()];
        let output = pipeline.transform(input.clone()).await.unwrap();
        assert_eq!(output.len(), 1);
        assert_eq!(output[0].meta.key, input[0].meta.key);
    }

    #[tokio::test]
    async fn pipeline_short_circuits_on_error() {
        struct Fail;
        #[async_trait]
        impl TransformHook for Fail {
            async fn transform(
                &self,
                _: Vec<EventEnvelope>,
            ) -> Result<Vec<EventEnvelope>, OversyncError> {
                Err(OversyncError::Internal("stage 2 failed".into()))
            }
        }

        /// Passes envelopes through unchanged and records that it was invoked.
        struct Spy(Arc<AtomicBool>);
        #[async_trait]
        impl TransformHook for Spy {
            async fn transform(
                &self,
                envelopes: Vec<EventEnvelope>,
            ) -> Result<Vec<EventEnvelope>, OversyncError> {
                self.0.store(true, Ordering::SeqCst);
                Ok(envelopes)
            }
        }

        // The spy sits after the failing stage: if the pipeline kept going
        // after an error, the flag would flip to `true`.
        let reached = Arc::new(AtomicBool::new(false));
        let pipeline = TransformPipeline::new(vec![
            Arc::new(DoubleTransform),
            Arc::new(Fail),
            Arc::new(Spy(Arc::clone(&reached))),
        ]);

        let result = pipeline.transform(vec![test_envelope()]).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("stage 2"));
        assert!(
            !reached.load(Ordering::SeqCst),
            "hook after the failing stage must not run"
        );
    }
}