Skip to main content

oversync_transforms/
lib.rs

1mod parse;
2pub mod steps;
3#[cfg(feature = "wasm")]
4pub mod wasm;
5
6use async_trait::async_trait;
7use oversync_core::error::OversyncError;
8use oversync_core::model::EventEnvelope;
9use oversync_core::traits::TransformHook;
10
11pub use parse::parse_steps;
12pub use steps::*;
13
14/// A single transform operation applied to one record's data.
15///
16/// Each step receives a mutable reference to the record's JSON `data` field
17/// and returns `Ok(true)` to keep the record or `Ok(false)` to filter it out.
18pub trait TransformStep: Send + Sync {
19	fn apply(&self, data: &mut serde_json::Value) -> Result<bool, OversyncError>;
20
21	fn step_name(&self) -> &str;
22}
23
24/// Ordered chain of [`TransformStep`]s applied sequentially to each record.
25///
26/// If any step returns `Ok(false)`, the record is dropped from the output.
27/// If any step returns `Err`, the entire chain short-circuits.
28pub struct StepChain {
29	steps: Vec<Box<dyn TransformStep>>,
30}
31
32impl std::fmt::Debug for StepChain {
33	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34		let names: Vec<&str> = self.steps.iter().map(|s| s.step_name()).collect();
35		f.debug_struct("StepChain").field("steps", &names).finish()
36	}
37}
38
39impl StepChain {
40	pub fn new(steps: Vec<Box<dyn TransformStep>>) -> Self {
41		Self { steps }
42	}
43
44	pub fn apply_one(&self, data: &mut serde_json::Value) -> Result<bool, OversyncError> {
45		for step in &self.steps {
46			if !step.apply(data)? {
47				return Ok(false);
48			}
49		}
50		Ok(true)
51	}
52
53	pub fn len(&self) -> usize {
54		self.steps.len()
55	}
56
57	pub fn is_empty(&self) -> bool {
58		self.steps.is_empty()
59	}
60
61	/// Filter rows pre-delta: applies steps to each RawRow.row_data,
62	/// keeping only rows where all steps return true.
63	pub fn filter_rows(
64		&self,
65		rows: Vec<oversync_core::model::RawRow>,
66	) -> Result<Vec<oversync_core::model::RawRow>, OversyncError> {
67		let mut kept = Vec::with_capacity(rows.len());
68		for mut row in rows {
69			if self.apply_one(&mut row.row_data)? {
70				kept.push(row);
71			}
72		}
73		Ok(kept)
74	}
75}
76
77#[async_trait]
78impl TransformHook for StepChain {
79	async fn transform(
80		&self,
81		envelopes: Vec<EventEnvelope>,
82	) -> Result<Vec<EventEnvelope>, OversyncError> {
83		let mut output = Vec::with_capacity(envelopes.len());
84		for mut envelope in envelopes {
85			if self.apply_one(&mut envelope.data)? {
86				output.push(envelope);
87			}
88		}
89		Ok(output)
90	}
91}
92
#[cfg(test)]
mod tests {
	use super::*;
	use oversync_core::model::{EventMeta, OpType, RawRow};

	fn test_envelope(data: serde_json::Value) -> EventEnvelope {
		EventEnvelope {
			meta: EventMeta {
				op: OpType::Created,
				origin_id: "test".into(),
				query_id: "q".into(),
				key: "k".into(),
				hash: "h".into(),
				cycle_id: 1,
				timestamp: chrono::Utc::now(),
			},
			data,
		}
	}

	/// Keeps every record untouched.
	struct AlwaysKeep;
	impl TransformStep for AlwaysKeep {
		fn apply(&self, _data: &mut serde_json::Value) -> Result<bool, OversyncError> {
			Ok(true)
		}
		fn step_name(&self) -> &str {
			"always_keep"
		}
	}

	/// Drops every record.
	struct AlwaysDrop;
	impl TransformStep for AlwaysDrop {
		fn apply(&self, _data: &mut serde_json::Value) -> Result<bool, OversyncError> {
			Ok(false)
		}
		fn step_name(&self) -> &str {
			"always_drop"
		}
	}

	/// Fails on every record.
	struct FailStep;
	impl TransformStep for FailStep {
		fn apply(&self, _data: &mut serde_json::Value) -> Result<bool, OversyncError> {
			Err(OversyncError::Internal("step failed".into()))
		}
		fn step_name(&self) -> &str {
			"fail"
		}
	}

	/// Appends its label to the record's `trail` array and keeps the record,
	/// so tests can observe which steps ran, in what order, and that their
	/// mutations survive. Errors if `trail` is missing or not an array.
	struct TraceStep(&'static str);
	impl TransformStep for TraceStep {
		fn apply(&self, data: &mut serde_json::Value) -> Result<bool, OversyncError> {
			data["trail"]
				.as_array_mut()
				.ok_or_else(|| OversyncError::Internal("missing trail".into()))?
				.push(serde_json::json!(self.0));
			Ok(true)
		}
		fn step_name(&self) -> &str {
			self.0
		}
	}

	#[test]
	fn empty_chain_is_passthrough() {
		let chain = StepChain::new(vec![]);
		assert!(chain.is_empty());
		let mut data = serde_json::json!({"x": 1});
		assert!(chain.apply_one(&mut data).unwrap());
		assert_eq!(data, serde_json::json!({"x": 1}));
	}

	#[test]
	fn chain_keeps_record() {
		let chain = StepChain::new(vec![Box::new(AlwaysKeep)]);
		let mut data = serde_json::json!({"x": 1});
		assert!(chain.apply_one(&mut data).unwrap());
	}

	#[test]
	fn chain_drops_record() {
		let chain = StepChain::new(vec![Box::new(AlwaysDrop)]);
		let mut data = serde_json::json!({"x": 1});
		assert!(!chain.apply_one(&mut data).unwrap());
	}

	#[test]
	fn chain_short_circuits_on_drop() {
		let chain = StepChain::new(vec![Box::new(AlwaysDrop), Box::new(FailStep)]);
		let mut data = serde_json::json!({});
		assert!(!chain.apply_one(&mut data).unwrap());
	}

	#[test]
	fn chain_short_circuits_on_error() {
		let chain = StepChain::new(vec![Box::new(FailStep), Box::new(AlwaysKeep)]);
		let mut data = serde_json::json!({});
		assert!(chain.apply_one(&mut data).is_err());
	}

	#[test]
	fn chain_applies_steps_in_order_and_keeps_mutations() {
		let chain = StepChain::new(vec![Box::new(TraceStep("a")), Box::new(TraceStep("b"))]);
		let mut data = serde_json::json!({"trail": []});
		assert!(chain.apply_one(&mut data).unwrap());
		assert_eq!(data, serde_json::json!({"trail": ["a", "b"]}));
	}

	#[test]
	fn steps_after_a_drop_do_not_run() {
		let chain = StepChain::new(vec![
			Box::new(TraceStep("a")),
			Box::new(AlwaysDrop),
			Box::new(TraceStep("b")),
		]);
		let mut data = serde_json::json!({"trail": []});
		assert!(!chain.apply_one(&mut data).unwrap());
		assert_eq!(data, serde_json::json!({"trail": ["a"]}));
	}

	#[test]
	fn debug_lists_step_names_in_order() {
		let chain = StepChain::new(vec![Box::new(AlwaysKeep), Box::new(AlwaysDrop)]);
		assert_eq!(chain.len(), 2);
		assert!(!chain.is_empty());
		assert_eq!(
			format!("{chain:?}"),
			r#"StepChain { steps: ["always_keep", "always_drop"] }"#
		);
	}

	#[tokio::test]
	async fn chain_as_transform_hook() {
		let chain = StepChain::new(vec![Box::new(AlwaysKeep)]);
		let input = vec![
			test_envelope(serde_json::json!({"a": 1})),
			test_envelope(serde_json::json!({"b": 2})),
		];
		let output = chain.transform(input).await.unwrap();
		assert_eq!(output.len(), 2);
	}

	#[tokio::test]
	async fn chain_hook_filters_records() {
		let chain = StepChain::new(vec![Box::new(AlwaysDrop)]);
		let input = vec![
			test_envelope(serde_json::json!({"a": 1})),
			test_envelope(serde_json::json!({"b": 2})),
		];
		let output = chain.transform(input).await.unwrap();
		assert!(output.is_empty());
	}

	#[tokio::test]
	async fn chain_hook_propagates_error() {
		let chain = StepChain::new(vec![Box::new(FailStep)]);
		let input = vec![test_envelope(serde_json::json!({}))];
		let result = chain.transform(input).await;
		assert!(result.is_err());
	}

	#[tokio::test]
	async fn chain_hook_keeps_mutated_data() {
		let chain = StepChain::new(vec![Box::new(TraceStep("a"))]);
		let input = vec![test_envelope(serde_json::json!({"trail": []}))];
		let output = chain.transform(input).await.unwrap();
		assert_eq!(output.len(), 1);
		assert_eq!(output[0].data, serde_json::json!({"trail": ["a"]}));
	}

	// ── filter_rows tests ───────────────────────────────────────

	fn test_rows() -> Vec<RawRow> {
		vec![
			RawRow {
				row_key: "1".into(),
				row_data: serde_json::json!({"name": "alice"}),
			},
			RawRow {
				row_key: "2".into(),
				row_data: serde_json::json!({"name": "bob"}),
			},
			RawRow {
				row_key: "3".into(),
				row_data: serde_json::json!({"name": "charlie"}),
			},
		]
	}

	#[test]
	fn filter_rows_keeps_all() {
		let chain = StepChain::new(vec![Box::new(AlwaysKeep)]);
		let result = chain.filter_rows(test_rows()).unwrap();
		assert_eq!(result.len(), 3);
	}

	#[test]
	fn filter_rows_drops_all() {
		let chain = StepChain::new(vec![Box::new(AlwaysDrop)]);
		let result = chain.filter_rows(test_rows()).unwrap();
		assert!(result.is_empty());
	}

	#[test]
	fn filter_rows_empty_chain_keeps_all() {
		let chain = StepChain::new(vec![]);
		let result = chain.filter_rows(test_rows()).unwrap();
		assert_eq!(result.len(), 3);
		assert_eq!(result[0].row_key, "1");
		assert_eq!(result[2].row_key, "3");
	}

	#[test]
	fn filter_rows_error_propagates() {
		let chain = StepChain::new(vec![Box::new(FailStep)]);
		let result = chain.filter_rows(test_rows());
		assert!(result.is_err());
	}

	#[test]
	fn filter_rows_partial() {
		use crate::steps::{Filter, FilterOp};
		let chain = StepChain::new(vec![Box::new(Filter {
			field: "name".into(),
			op: FilterOp::Eq,
			value: serde_json::json!("alice"),
		})]);
		let result = chain.filter_rows(test_rows()).unwrap();
		assert_eq!(result.len(), 1);
		assert_eq!(result[0].row_key, "1");
	}

	#[test]
	fn filter_rows_keeps_mutated_row_data() {
		let chain = StepChain::new(vec![Box::new(TraceStep("a"))]);
		let rows = vec![RawRow {
			row_key: "1".into(),
			row_data: serde_json::json!({"trail": []}),
		}];
		let result = chain.filter_rows(rows).unwrap();
		assert_eq!(result.len(), 1);
		assert_eq!(result[0].row_key, "1");
		assert_eq!(result[0].row_data, serde_json::json!({"trail": ["a"]}));
	}
}