Skip to main content

oversync_transforms/
lib.rs

1#[cfg(feature = "js")]
2pub mod js;
3mod parse;
4pub mod steps;
5#[cfg(feature = "wasm")]
6pub mod wasm;
7
8use async_trait::async_trait;
9use oversync_core::error::OversyncError;
10use oversync_core::model::EventEnvelope;
11use oversync_core::traits::TransformHook;
12
13pub use parse::parse_steps;
14pub use steps::*;
15
16/// A single transform operation applied to one record's data.
17///
18/// Each step receives a mutable reference to the record's JSON `data` field
19/// and returns `Ok(true)` to keep the record or `Ok(false)` to filter it out.
20pub trait TransformStep: Send + Sync {
21	fn apply(&self, data: &mut serde_json::Value) -> Result<bool, OversyncError>;
22
23	fn step_name(&self) -> &str;
24}
25
26/// Ordered chain of [`TransformStep`]s applied sequentially to each record.
27///
28/// If any step returns `Ok(false)`, the record is dropped from the output.
29/// If any step returns `Err`, the entire chain short-circuits.
30pub struct StepChain {
31	steps: Vec<Box<dyn TransformStep>>,
32}
33
34impl std::fmt::Debug for StepChain {
35	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36		let names: Vec<&str> = self.steps.iter().map(|s| s.step_name()).collect();
37		f.debug_struct("StepChain").field("steps", &names).finish()
38	}
39}
40
41impl StepChain {
42	pub fn new(steps: Vec<Box<dyn TransformStep>>) -> Self {
43		Self { steps }
44	}
45
46	pub fn apply_one(&self, data: &mut serde_json::Value) -> Result<bool, OversyncError> {
47		for step in &self.steps {
48			if !step.apply(data)? {
49				return Ok(false);
50			}
51		}
52		Ok(true)
53	}
54
55	pub fn len(&self) -> usize {
56		self.steps.len()
57	}
58
59	pub fn is_empty(&self) -> bool {
60		self.steps.is_empty()
61	}
62
63	/// Filter rows pre-delta: applies steps to each RawRow.row_data,
64	/// keeping only rows where all steps return true.
65	pub fn filter_rows(
66		&self,
67		rows: Vec<oversync_core::model::RawRow>,
68	) -> Result<Vec<oversync_core::model::RawRow>, OversyncError> {
69		let mut kept = Vec::with_capacity(rows.len());
70		for mut row in rows {
71			if self.apply_one(&mut row.row_data)? {
72				kept.push(row);
73			}
74		}
75		Ok(kept)
76	}
77}
78
79#[async_trait]
80impl TransformHook for StepChain {
81	async fn transform(
82		&self,
83		envelopes: Vec<EventEnvelope>,
84	) -> Result<Vec<EventEnvelope>, OversyncError> {
85		let mut output = Vec::with_capacity(envelopes.len());
86		for mut envelope in envelopes {
87			if self.apply_one(&mut envelope.data)? {
88				output.push(envelope);
89			}
90		}
91		Ok(output)
92	}
93}
94
#[cfg(test)]
mod tests {
	use super::*;
	use oversync_core::model::{EventMeta, OpType, RawRow};

	/// Build an envelope with fixed metadata around `data`.
	fn test_envelope(data: serde_json::Value) -> EventEnvelope {
		EventEnvelope {
			meta: EventMeta {
				op: OpType::Created,
				origin_id: "test".into(),
				query_id: "q".into(),
				key: "k".into(),
				hash: "h".into(),
				cycle_id: 1,
				timestamp: chrono::Utc::now(),
			},
			data,
		}
	}

	struct AlwaysKeep;
	impl TransformStep for AlwaysKeep {
		fn apply(&self, _data: &mut serde_json::Value) -> Result<bool, OversyncError> {
			Ok(true)
		}
		fn step_name(&self) -> &str {
			"always_keep"
		}
	}

	struct AlwaysDrop;
	impl TransformStep for AlwaysDrop {
		fn apply(&self, _data: &mut serde_json::Value) -> Result<bool, OversyncError> {
			Ok(false)
		}
		fn step_name(&self) -> &str {
			"always_drop"
		}
	}

	struct FailStep;
	impl TransformStep for FailStep {
		fn apply(&self, _data: &mut serde_json::Value) -> Result<bool, OversyncError> {
			Err(OversyncError::Internal("step failed".into()))
		}
		fn step_name(&self) -> &str {
			"fail"
		}
	}

	/// Mutating step: sets `"seen": true` on object payloads and keeps the record.
	/// Used to check that in-place edits survive (or are skipped) as expected.
	struct MarkSeen;
	impl TransformStep for MarkSeen {
		fn apply(&self, data: &mut serde_json::Value) -> Result<bool, OversyncError> {
			if let Some(obj) = data.as_object_mut() {
				obj.insert("seen".into(), serde_json::Value::Bool(true));
			}
			Ok(true)
		}
		fn step_name(&self) -> &str {
			"mark_seen"
		}
	}

	#[test]
	fn empty_chain_is_passthrough() {
		let chain = StepChain::new(vec![]);
		assert!(chain.is_empty());
		let mut data = serde_json::json!({"x": 1});
		assert!(chain.apply_one(&mut data).unwrap());
		assert_eq!(data, serde_json::json!({"x": 1}));
	}

	#[test]
	fn len_counts_steps() {
		let chain = StepChain::new(vec![Box::new(AlwaysKeep), Box::new(AlwaysDrop)]);
		assert_eq!(chain.len(), 2);
		assert!(!chain.is_empty());
	}

	#[test]
	fn debug_lists_step_names_in_order() {
		let chain = StepChain::new(vec![Box::new(AlwaysKeep), Box::new(AlwaysDrop)]);
		assert_eq!(
			format!("{:?}", chain),
			r#"StepChain { steps: ["always_keep", "always_drop"] }"#
		);
	}

	#[test]
	fn chain_keeps_record() {
		let chain = StepChain::new(vec![Box::new(AlwaysKeep)]);
		let mut data = serde_json::json!({"x": 1});
		assert!(chain.apply_one(&mut data).unwrap());
	}

	#[test]
	fn chain_drops_record() {
		let chain = StepChain::new(vec![Box::new(AlwaysDrop)]);
		let mut data = serde_json::json!({"x": 1});
		assert!(!chain.apply_one(&mut data).unwrap());
	}

	#[test]
	fn chain_short_circuits_on_drop() {
		let chain = StepChain::new(vec![Box::new(AlwaysDrop), Box::new(FailStep)]);
		let mut data = serde_json::json!({});
		assert!(!chain.apply_one(&mut data).unwrap());
	}

	#[test]
	fn chain_short_circuits_on_error() {
		let chain = StepChain::new(vec![Box::new(FailStep), Box::new(AlwaysKeep)]);
		let mut data = serde_json::json!({});
		assert!(chain.apply_one(&mut data).is_err());
	}

	#[test]
	fn apply_one_keeps_mutations() {
		let chain = StepChain::new(vec![Box::new(MarkSeen), Box::new(AlwaysKeep)]);
		let mut data = serde_json::json!({"x": 1});
		assert!(chain.apply_one(&mut data).unwrap());
		assert_eq!(data, serde_json::json!({"x": 1, "seen": true}));
	}

	#[test]
	fn apply_one_skips_steps_after_drop() {
		// MarkSeen must not run once AlwaysDrop has rejected the record.
		let chain = StepChain::new(vec![Box::new(AlwaysDrop), Box::new(MarkSeen)]);
		let mut data = serde_json::json!({"x": 1});
		assert!(!chain.apply_one(&mut data).unwrap());
		assert_eq!(data, serde_json::json!({"x": 1}));
	}

	#[tokio::test]
	async fn chain_as_transform_hook() {
		let chain = StepChain::new(vec![Box::new(AlwaysKeep)]);
		let input = vec![
			test_envelope(serde_json::json!({"a": 1})),
			test_envelope(serde_json::json!({"b": 2})),
		];
		let output = chain.transform(input).await.unwrap();
		assert_eq!(output.len(), 2);
	}

	#[tokio::test]
	async fn chain_hook_filters_records() {
		let chain = StepChain::new(vec![Box::new(AlwaysDrop)]);
		let input = vec![
			test_envelope(serde_json::json!({"a": 1})),
			test_envelope(serde_json::json!({"b": 2})),
		];
		let output = chain.transform(input).await.unwrap();
		assert!(output.is_empty());
	}

	#[tokio::test]
	async fn chain_hook_propagates_error() {
		let chain = StepChain::new(vec![Box::new(FailStep)]);
		let input = vec![test_envelope(serde_json::json!({}))];
		let result = chain.transform(input).await;
		assert!(result.is_err());
	}

	#[tokio::test]
	async fn chain_hook_keeps_mutations() {
		let chain = StepChain::new(vec![Box::new(MarkSeen)]);
		let input = vec![test_envelope(serde_json::json!({"a": 1}))];
		let output = chain.transform(input).await.unwrap();
		assert_eq!(output.len(), 1);
		assert_eq!(output[0].data, serde_json::json!({"a": 1, "seen": true}));
	}

	// ── filter_rows tests ───────────────────────────────────────

	fn test_rows() -> Vec<RawRow> {
		vec![
			RawRow {
				row_key: "1".into(),
				row_data: serde_json::json!({"name": "alice"}),
			},
			RawRow {
				row_key: "2".into(),
				row_data: serde_json::json!({"name": "bob"}),
			},
			RawRow {
				row_key: "3".into(),
				row_data: serde_json::json!({"name": "charlie"}),
			},
		]
	}

	#[test]
	fn filter_rows_keeps_all() {
		let chain = StepChain::new(vec![Box::new(AlwaysKeep)]);
		let result = chain.filter_rows(test_rows()).unwrap();
		assert_eq!(result.len(), 3);
	}

	#[test]
	fn filter_rows_drops_all() {
		let chain = StepChain::new(vec![Box::new(AlwaysDrop)]);
		let result = chain.filter_rows(test_rows()).unwrap();
		assert!(result.is_empty());
	}

	#[test]
	fn filter_rows_empty_chain_keeps_all() {
		let chain = StepChain::new(vec![]);
		let result = chain.filter_rows(test_rows()).unwrap();
		assert_eq!(result.len(), 3);
	}

	#[test]
	fn filter_rows_error_propagates() {
		let chain = StepChain::new(vec![Box::new(FailStep)]);
		let result = chain.filter_rows(test_rows());
		assert!(result.is_err());
	}

	#[test]
	fn filter_rows_returns_mutated_rows() {
		let chain = StepChain::new(vec![Box::new(MarkSeen)]);
		let result = chain.filter_rows(test_rows()).unwrap();
		assert_eq!(result.len(), 3);
		assert!(result
			.iter()
			.all(|row| row.row_data["seen"] == serde_json::json!(true)));
	}

	#[test]
	fn filter_rows_partial() {
		use crate::steps::{Filter, FilterOp};
		let chain = StepChain::new(vec![Box::new(Filter {
			field: "name".into(),
			op: FilterOp::Eq,
			value: serde_json::json!("alice"),
		})]);
		let result = chain.filter_rows(test_rows()).unwrap();
		assert_eq!(result.len(), 1);
		assert_eq!(result[0].row_key, "1");
	}
}