Skip to main content

oversync_transforms/
parse.rs

1use oversync_core::error::OversyncError;
2
3use crate::steps::*;
4use crate::{StepChain, TransformStep};
5
6/// Parse a JSON array of step definitions into a [`StepChain`].
7///
8/// Each element must be an object with a `type` field and step-specific parameters.
9///
10/// # Supported types
11///
12/// | type | params |
13/// |------|--------|
14/// | `rename` | `from`, `to` |
15/// | `set` | `field`, `value` |
16/// | `upper` | `field` |
17/// | `lower` | `field` |
18/// | `remove` | `field` |
19/// | `copy` | `from`, `to` |
20/// | `default` | `field`, `value` |
21/// | `filter` | `field`, `op`, `value` |
22/// | `map_value` | `field`, `mapping` |
23/// | `truncate` | `field`, `max_len` |
24/// | `nest` | `fields`, `into` |
25/// | `flatten` | `field` |
26/// | `hash` | `field` |
27/// | `coalesce` | `fields`, `into` |
28pub fn parse_steps(defs: &[serde_json::Value]) -> Result<StepChain, OversyncError> {
29	let mut steps: Vec<Box<dyn TransformStep>> = Vec::with_capacity(defs.len());
30
31	for (i, def) in defs.iter().enumerate() {
32		let obj = def
33			.as_object()
34			.ok_or_else(|| OversyncError::Config(format!("transform step {i}: expected object")))?;
35
36		let step_type = obj
37			.get("type")
38			.and_then(|v| v.as_str())
39			.ok_or_else(|| OversyncError::Config(format!("transform step {i}: missing 'type'")))?;
40
41		let step: Box<dyn TransformStep> = match step_type {
42			"rename" => Box::new(Rename {
43				from: req_str(obj, "from", i)?,
44				to: req_str(obj, "to", i)?,
45			}),
46			"set" => Box::new(Set {
47				field: req_str(obj, "field", i)?,
48				value: req_val(obj, "value", i)?,
49			}),
50			"upper" => Box::new(Upper {
51				field: req_str(obj, "field", i)?,
52			}),
53			"lower" => Box::new(Lower {
54				field: req_str(obj, "field", i)?,
55			}),
56			"remove" => Box::new(Remove {
57				field: req_str(obj, "field", i)?,
58			}),
59			"copy" => Box::new(Copy {
60				from: req_str(obj, "from", i)?,
61				to: req_str(obj, "to", i)?,
62			}),
63			"default" => Box::new(Default {
64				field: req_str(obj, "field", i)?,
65				value: req_val(obj, "value", i)?,
66			}),
67			"filter" => {
68				let op_str = req_str(obj, "op", i)?;
69				let op = match op_str.as_str() {
70					"eq" => FilterOp::Eq,
71					"ne" => FilterOp::Ne,
72					"gt" => FilterOp::Gt,
73					"gte" => FilterOp::Gte,
74					"lt" => FilterOp::Lt,
75					"lte" => FilterOp::Lte,
76					"contains" => FilterOp::Contains,
77					"exists" => FilterOp::Exists,
78					other => {
79						return Err(OversyncError::Config(format!(
80							"transform step {i}: unknown filter op '{other}'"
81						)));
82					}
83				};
84				Box::new(Filter {
85					field: req_str(obj, "field", i)?,
86					op,
87					value: obj.get("value").cloned().unwrap_or(serde_json::Value::Null),
88				})
89			}
90			"map_value" => {
91				let mapping = obj
92					.get("mapping")
93					.and_then(|v| v.as_object())
94					.ok_or_else(|| {
95						OversyncError::Config(format!(
96							"transform step {i}: 'map_value' requires 'mapping' object"
97						))
98					})?
99					.clone();
100				Box::new(MapValue {
101					field: req_str(obj, "field", i)?,
102					mapping,
103				})
104			}
105			"truncate" => {
106				let max_len = obj.get("max_len").and_then(|v| v.as_u64()).ok_or_else(|| {
107					OversyncError::Config(format!(
108						"transform step {i}: 'truncate' requires 'max_len' (integer)"
109					))
110				})? as usize;
111				Box::new(Truncate {
112					field: req_str(obj, "field", i)?,
113					max_len,
114				})
115			}
116			"nest" => Box::new(Nest {
117				fields: req_str_array(obj, "fields", i)?,
118				into: req_str(obj, "into", i)?,
119			}),
120			"flatten" => Box::new(Flatten {
121				field: req_str(obj, "field", i)?,
122			}),
123			"hash" => Box::new(Hash {
124				field: req_str(obj, "field", i)?,
125			}),
126			"coalesce" => Box::new(Coalesce {
127				fields: req_str_array(obj, "fields", i)?,
128				into: req_str(obj, "into", i)?,
129			}),
130			"schema_filter" => {
131				let allow = parse_regex_array(obj, "allow", i)?;
132				let deny = parse_regex_array(obj, "deny", i)?;
133				if allow.is_empty() && deny.is_empty() {
134					return Err(OversyncError::Config(format!(
135						"transform step {i}: 'schema_filter' requires at least 'allow' or 'deny'"
136					)));
137				}
138				Box::new(SchemaFilter {
139					field: req_str(obj, "field", i)?,
140					allow,
141					deny,
142				})
143			}
144			#[cfg(feature = "js")]
145			"js" => {
146				let function = req_str(obj, "function", i)?;
147				let name = obj.get("name").and_then(|v| v.as_str()).unwrap_or("js");
148				Box::new(crate::js::JsStep::new(name, &function)?)
149			}
150			other => {
151				return Err(OversyncError::Config(format!(
152					"transform step {i}: unknown type '{other}'"
153				)));
154			}
155		};
156
157		steps.push(step);
158	}
159
160	Ok(StepChain::new(steps))
161}
162
163fn req_str(
164	obj: &serde_json::Map<String, serde_json::Value>,
165	key: &str,
166	idx: usize,
167) -> Result<String, OversyncError> {
168	obj.get(key)
169		.and_then(|v| v.as_str())
170		.map(String::from)
171		.ok_or_else(|| {
172			OversyncError::Config(format!("transform step {idx}: missing '{key}' (string)"))
173		})
174}
175
176fn req_val(
177	obj: &serde_json::Map<String, serde_json::Value>,
178	key: &str,
179	idx: usize,
180) -> Result<serde_json::Value, OversyncError> {
181	obj.get(key)
182		.cloned()
183		.ok_or_else(|| OversyncError::Config(format!("transform step {idx}: missing '{key}'")))
184}
185
186fn req_str_array(
187	obj: &serde_json::Map<String, serde_json::Value>,
188	key: &str,
189	idx: usize,
190) -> Result<Vec<String>, OversyncError> {
191	let arr = obj.get(key).and_then(|v| v.as_array()).ok_or_else(|| {
192		OversyncError::Config(format!(
193			"transform step {idx}: missing '{key}' (string array)"
194		))
195	})?;
196	arr.iter()
197		.enumerate()
198		.map(|(j, v)| {
199			v.as_str().map(String::from).ok_or_else(|| {
200				OversyncError::Config(format!(
201					"transform step {idx}: '{key}[{j}]' must be a string"
202				))
203			})
204		})
205		.collect()
206}
207
208fn parse_regex_array(
209	obj: &serde_json::Map<String, serde_json::Value>,
210	key: &str,
211	idx: usize,
212) -> Result<Vec<regex::Regex>, OversyncError> {
213	let arr = match obj.get(key).and_then(|v| v.as_array()) {
214		Some(a) => a,
215		None => return Ok(vec![]),
216	};
217	arr.iter()
218		.enumerate()
219		.map(|(j, v)| {
220			let pattern = v.as_str().ok_or_else(|| {
221				OversyncError::Config(format!(
222					"transform step {idx}: '{key}[{j}]' must be a string"
223				))
224			})?;
225			regex::Regex::new(pattern).map_err(|e| {
226				OversyncError::Config(format!(
227					"transform step {idx}: invalid regex '{pattern}' in '{key}': {e}"
228				))
229			})
230		})
231		.collect()
232}
233
#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn parse_empty_array() {
		let chain = parse_steps(&[]).unwrap();
		assert!(chain.is_empty());
	}

	#[test]
	fn parse_rename_step() {
		let defs = vec![serde_json::json!({"type": "rename", "from": "old", "to": "new"})];
		let chain = parse_steps(&defs).unwrap();
		assert_eq!(chain.len(), 1);

		let mut data = serde_json::json!({"old": "value"});
		chain.apply_one(&mut data).unwrap();
		assert_eq!(data, serde_json::json!({"new": "value"}));
	}

	#[test]
	fn parse_filter_step() {
		let defs = vec![serde_json::json!({
			"type": "filter",
			"field": "status",
			"op": "eq",
			"value": "active"
		})];
		let chain = parse_steps(&defs).unwrap();

		let mut keep = serde_json::json!({"status": "active"});
		assert!(chain.apply_one(&mut keep).unwrap());

		let mut drop = serde_json::json!({"status": "deleted"});
		assert!(!chain.apply_one(&mut drop).unwrap());
	}

	#[test]
	fn parse_multi_step_chain() {
		let defs = vec![
			serde_json::json!({"type": "rename", "from": "entity_id", "to": "id"}),
			serde_json::json!({"type": "upper", "field": "name"}),
			serde_json::json!({"type": "set", "field": "version", "value": 1}),
			serde_json::json!({"type": "remove", "field": "secret"}),
		];
		let chain = parse_steps(&defs).unwrap();
		assert_eq!(chain.len(), 4);

		let mut data = serde_json::json!({"entity_id": "123", "name": "alice", "secret": "pw"});
		chain.apply_one(&mut data).unwrap();
		assert_eq!(
			data,
			serde_json::json!({"id": "123", "name": "ALICE", "version": 1})
		);
	}

	#[test]
	fn parse_map_value_step() {
		let defs = vec![serde_json::json!({
			"type": "map_value",
			"field": "op",
			"mapping": {"D": "deleted", "U": "updated", "I": "inserted"}
		})];
		let chain = parse_steps(&defs).unwrap();
		let mut data = serde_json::json!({"op": "D"});
		chain.apply_one(&mut data).unwrap();
		assert_eq!(data["op"], "deleted");
	}

	#[test]
	fn parse_truncate_step() {
		let defs = vec![serde_json::json!({"type": "truncate", "field": "desc", "max_len": 5})];
		let chain = parse_steps(&defs).unwrap();
		let mut data = serde_json::json!({"desc": "hello world"});
		chain.apply_one(&mut data).unwrap();
		assert_eq!(data["desc"], "hello");
	}

	#[test]
	fn parse_nest_step() {
		let defs = vec![serde_json::json!({
			"type": "nest",
			"fields": ["city", "zip"],
			"into": "address"
		})];
		let chain = parse_steps(&defs).unwrap();
		let mut data = serde_json::json!({"city": "NYC", "zip": "10001", "name": "alice"});
		chain.apply_one(&mut data).unwrap();
		assert_eq!(data["address"]["city"], "NYC");
		assert!(!data.as_object().unwrap().contains_key("city"));
	}

	#[test]
	fn parse_coalesce_step() {
		let defs = vec![serde_json::json!({
			"type": "coalesce",
			"fields": ["a", "b"],
			"into": "result"
		})];
		let chain = parse_steps(&defs).unwrap();
		let mut data = serde_json::json!({"a": null, "b": 42});
		chain.apply_one(&mut data).unwrap();
		assert_eq!(data["result"], 42);
	}

	#[test]
	fn parse_unknown_type_errors() {
		let defs = vec![serde_json::json!({"type": "bogus"})];
		let err = parse_steps(&defs).unwrap_err();
		assert!(err.to_string().contains("unknown type 'bogus'"));
	}

	#[test]
	fn parse_missing_type_errors() {
		let defs = vec![serde_json::json!({"field": "x"})];
		let err = parse_steps(&defs).unwrap_err();
		assert!(err.to_string().contains("missing 'type'"));
	}

	#[test]
	fn parse_missing_required_field_errors() {
		let defs = vec![serde_json::json!({"type": "rename", "from": "x"})];
		let err = parse_steps(&defs).unwrap_err();
		assert!(err.to_string().contains("missing 'to'"));
	}

	#[test]
	fn parse_unknown_filter_op_errors() {
		let defs = vec![serde_json::json!({
			"type": "filter",
			"field": "x",
			"op": "bogus",
			"value": 1
		})];
		let err = parse_steps(&defs).unwrap_err();
		assert!(err.to_string().contains("unknown filter op 'bogus'"));
	}

	#[test]
	fn parse_all_step_types() {
		let defs = vec![
			serde_json::json!({"type": "rename", "from": "a", "to": "b"}),
			serde_json::json!({"type": "set", "field": "x", "value": 1}),
			serde_json::json!({"type": "upper", "field": "x"}),
			serde_json::json!({"type": "lower", "field": "x"}),
			serde_json::json!({"type": "remove", "field": "x"}),
			serde_json::json!({"type": "copy", "from": "a", "to": "b"}),
			serde_json::json!({"type": "default", "field": "x", "value": 0}),
			serde_json::json!({"type": "filter", "field": "x", "op": "exists"}),
			serde_json::json!({"type": "map_value", "field": "x", "mapping": {"a": "b"}}),
			serde_json::json!({"type": "truncate", "field": "x", "max_len": 10}),
			serde_json::json!({"type": "nest", "fields": ["a"], "into": "b"}),
			serde_json::json!({"type": "flatten", "field": "x"}),
			serde_json::json!({"type": "hash", "field": "x"}),
			serde_json::json!({"type": "coalesce", "fields": ["a", "b"], "into": "c"}),
			serde_json::json!({"type": "schema_filter", "field": "x", "allow": ["^public"]}),
		];
		let chain = parse_steps(&defs).unwrap();
		assert_eq!(chain.len(), 15);
	}

	#[test]
	fn parse_schema_filter_allow() {
		let defs = vec![serde_json::json!({
			"type": "schema_filter",
			"field": "schema",
			"allow": ["^public$", "^analytics$"]
		})];
		let chain = parse_steps(&defs).unwrap();

		let mut keep = serde_json::json!({"schema": "public"});
		assert!(chain.apply_one(&mut keep).unwrap());

		let mut drop = serde_json::json!({"schema": "internal"});
		assert!(!chain.apply_one(&mut drop).unwrap());
	}

	#[test]
	fn parse_schema_filter_deny() {
		let defs = vec![serde_json::json!({
			"type": "schema_filter",
			"field": "table",
			"deny": ["^pg_catalog", "^information_schema"]
		})];
		let chain = parse_steps(&defs).unwrap();

		let mut keep = serde_json::json!({"table": "public.users"});
		assert!(chain.apply_one(&mut keep).unwrap());

		let mut drop = serde_json::json!({"table": "pg_catalog.pg_class"});
		assert!(!chain.apply_one(&mut drop).unwrap());
	}

	#[test]
	fn parse_schema_filter_requires_allow_or_deny() {
		let defs = vec![serde_json::json!({
			"type": "schema_filter",
			"field": "x"
		})];
		let err = parse_steps(&defs).unwrap_err();
		assert!(err.to_string().contains("requires at least"));
	}

	#[test]
	fn parse_schema_filter_invalid_regex_errors() {
		let defs = vec![serde_json::json!({
			"type": "schema_filter",
			"field": "x",
			"allow": ["[invalid"]
		})];
		let err = parse_steps(&defs).unwrap_err();
		assert!(err.to_string().contains("invalid regex"));
	}

	#[test]
	fn parse_non_object_step_errors() {
		let defs = vec![serde_json::json!("rename")];
		let err = parse_steps(&defs).unwrap_err();
		assert!(err.to_string().contains("expected object"));
	}

	#[test]
	fn parse_error_reports_failing_step_index() {
		let defs = vec![
			serde_json::json!({"type": "upper", "field": "name"}),
			serde_json::json!({"type": "bogus"}),
		];
		let err = parse_steps(&defs).unwrap_err();
		assert!(err.to_string().contains("transform step 1:"));
	}

	#[test]
	fn parse_truncate_requires_integer_max_len() {
		let missing = vec![serde_json::json!({"type": "truncate", "field": "x"})];
		let err = parse_steps(&missing).unwrap_err();
		assert!(err.to_string().contains("requires 'max_len'"));

		let negative = vec![serde_json::json!({"type": "truncate", "field": "x", "max_len": -1})];
		let err = parse_steps(&negative).unwrap_err();
		assert!(err.to_string().contains("requires 'max_len'"));
	}

	#[test]
	fn parse_map_value_requires_mapping_object() {
		let defs = vec![serde_json::json!({
			"type": "map_value",
			"field": "x",
			"mapping": ["a", "b"]
		})];
		let err = parse_steps(&defs).unwrap_err();
		assert!(err.to_string().contains("requires 'mapping' object"));
	}

	#[test]
	fn parse_string_array_rejects_non_string_element() {
		let defs = vec![serde_json::json!({
			"type": "nest",
			"fields": ["a", 1],
			"into": "b"
		})];
		let err = parse_steps(&defs).unwrap_err();
		assert!(err.to_string().contains("'fields[1]' must be a string"));
	}
}