// oversync_transforms/parse.rs

1use oversync_core::error::OversyncError;
2
3use crate::steps::*;
4use crate::{StepChain, TransformStep};
5
6/// Parse a JSON array of step definitions into a [`StepChain`].
7///
8/// Each element must be an object with a `type` field and step-specific parameters.
9///
10/// # Supported types
11///
12/// | type | params |
13/// |------|--------|
14/// | `rename` | `from`, `to` |
15/// | `set` | `field`, `value` |
16/// | `upper` | `field` |
17/// | `lower` | `field` |
18/// | `remove` | `field` |
19/// | `copy` | `from`, `to` |
20/// | `default` | `field`, `value` |
21/// | `filter` | `field`, `op`, `value` |
22/// | `map_value` | `field`, `mapping` |
23/// | `truncate` | `field`, `max_len` |
24/// | `nest` | `fields`, `into` |
25/// | `flatten` | `field` |
26/// | `hash` | `field` |
27/// | `coalesce` | `fields`, `into` |
28pub fn parse_steps(defs: &[serde_json::Value]) -> Result<StepChain, OversyncError> {
29	let mut steps: Vec<Box<dyn TransformStep>> = Vec::with_capacity(defs.len());
30
31	for (i, def) in defs.iter().enumerate() {
32		let obj = def
33			.as_object()
34			.ok_or_else(|| OversyncError::Config(format!("transform step {i}: expected object")))?;
35
36		let step_type = obj
37			.get("type")
38			.and_then(|v| v.as_str())
39			.ok_or_else(|| OversyncError::Config(format!("transform step {i}: missing 'type'")))?;
40
41		let step: Box<dyn TransformStep> = match step_type {
42			"rename" => Box::new(Rename {
43				from: req_str(obj, "from", i)?,
44				to: req_str(obj, "to", i)?,
45			}),
46			"set" => Box::new(Set {
47				field: req_str(obj, "field", i)?,
48				value: req_val(obj, "value", i)?,
49			}),
50			"upper" => Box::new(Upper {
51				field: req_str(obj, "field", i)?,
52			}),
53			"lower" => Box::new(Lower {
54				field: req_str(obj, "field", i)?,
55			}),
56			"remove" => Box::new(Remove {
57				field: req_str(obj, "field", i)?,
58			}),
59			"copy" => Box::new(Copy {
60				from: req_str(obj, "from", i)?,
61				to: req_str(obj, "to", i)?,
62			}),
63			"default" => Box::new(Default {
64				field: req_str(obj, "field", i)?,
65				value: req_val(obj, "value", i)?,
66			}),
67			"filter" => {
68				let op_str = req_str(obj, "op", i)?;
69				let op = match op_str.as_str() {
70					"eq" => FilterOp::Eq,
71					"ne" => FilterOp::Ne,
72					"gt" => FilterOp::Gt,
73					"gte" => FilterOp::Gte,
74					"lt" => FilterOp::Lt,
75					"lte" => FilterOp::Lte,
76					"contains" => FilterOp::Contains,
77					"exists" => FilterOp::Exists,
78					other => {
79						return Err(OversyncError::Config(format!(
80							"transform step {i}: unknown filter op '{other}'"
81						)));
82					}
83				};
84				Box::new(Filter {
85					field: req_str(obj, "field", i)?,
86					op,
87					value: obj.get("value").cloned().unwrap_or(serde_json::Value::Null),
88				})
89			}
90			"map_value" => {
91				let mapping = obj
92					.get("mapping")
93					.and_then(|v| v.as_object())
94					.ok_or_else(|| {
95						OversyncError::Config(format!(
96							"transform step {i}: 'map_value' requires 'mapping' object"
97						))
98					})?
99					.clone();
100				Box::new(MapValue {
101					field: req_str(obj, "field", i)?,
102					mapping,
103				})
104			}
105			"truncate" => {
106				let max_len = obj.get("max_len").and_then(|v| v.as_u64()).ok_or_else(|| {
107					OversyncError::Config(format!(
108						"transform step {i}: 'truncate' requires 'max_len' (integer)"
109					))
110				})? as usize;
111				Box::new(Truncate {
112					field: req_str(obj, "field", i)?,
113					max_len,
114				})
115			}
116			"nest" => Box::new(Nest {
117				fields: req_str_array(obj, "fields", i)?,
118				into: req_str(obj, "into", i)?,
119			}),
120			"flatten" => Box::new(Flatten {
121				field: req_str(obj, "field", i)?,
122			}),
123			"hash" => Box::new(Hash {
124				field: req_str(obj, "field", i)?,
125			}),
126			"coalesce" => Box::new(Coalesce {
127				fields: req_str_array(obj, "fields", i)?,
128				into: req_str(obj, "into", i)?,
129			}),
130			"schema_filter" => {
131				let allow = parse_regex_array(obj, "allow", i)?;
132				let deny = parse_regex_array(obj, "deny", i)?;
133				if allow.is_empty() && deny.is_empty() {
134					return Err(OversyncError::Config(format!(
135						"transform step {i}: 'schema_filter' requires at least 'allow' or 'deny'"
136					)));
137				}
138				Box::new(SchemaFilter {
139					field: req_str(obj, "field", i)?,
140					allow,
141					deny,
142				})
143			}
144			other => {
145				return Err(OversyncError::Config(format!(
146					"transform step {i}: unknown type '{other}'"
147				)));
148			}
149		};
150
151		steps.push(step);
152	}
153
154	Ok(StepChain::new(steps))
155}
156
157fn req_str(
158	obj: &serde_json::Map<String, serde_json::Value>,
159	key: &str,
160	idx: usize,
161) -> Result<String, OversyncError> {
162	obj.get(key)
163		.and_then(|v| v.as_str())
164		.map(String::from)
165		.ok_or_else(|| {
166			OversyncError::Config(format!("transform step {idx}: missing '{key}' (string)"))
167		})
168}
169
170fn req_val(
171	obj: &serde_json::Map<String, serde_json::Value>,
172	key: &str,
173	idx: usize,
174) -> Result<serde_json::Value, OversyncError> {
175	obj.get(key)
176		.cloned()
177		.ok_or_else(|| OversyncError::Config(format!("transform step {idx}: missing '{key}'")))
178}
179
180fn req_str_array(
181	obj: &serde_json::Map<String, serde_json::Value>,
182	key: &str,
183	idx: usize,
184) -> Result<Vec<String>, OversyncError> {
185	let arr = obj
186		.get(key)
187		.and_then(|v| v.as_array())
188		.ok_or_else(|| {
189			OversyncError::Config(format!(
190				"transform step {idx}: missing '{key}' (string array)"
191			))
192		})?;
193	arr.iter()
194		.enumerate()
195		.map(|(j, v)| {
196			v.as_str()
197				.map(String::from)
198				.ok_or_else(|| {
199					OversyncError::Config(format!(
200						"transform step {idx}: '{key}[{j}]' must be a string"
201					))
202				})
203		})
204		.collect()
205}
206
207fn parse_regex_array(
208	obj: &serde_json::Map<String, serde_json::Value>,
209	key: &str,
210	idx: usize,
211) -> Result<Vec<regex::Regex>, OversyncError> {
212	let arr = match obj.get(key).and_then(|v| v.as_array()) {
213		Some(a) => a,
214		None => return Ok(vec![]),
215	};
216	arr.iter()
217		.enumerate()
218		.map(|(j, v)| {
219			let pattern = v.as_str().ok_or_else(|| {
220				OversyncError::Config(format!(
221					"transform step {idx}: '{key}[{j}]' must be a string"
222				))
223			})?;
224			regex::Regex::new(pattern).map_err(|e| {
225				OversyncError::Config(format!(
226					"transform step {idx}: invalid regex '{pattern}' in '{key}': {e}"
227				))
228			})
229		})
230		.collect()
231}
232
#[cfg(test)]
mod tests {
	use super::*;
	use serde_json::json;

	// An empty definition list produces an empty chain.
	#[test]
	fn parse_empty_array() {
		let parsed = parse_steps(&[]).unwrap();
		assert!(parsed.is_empty());
	}

	// A single `rename` step is parsed and moves the value to the new key.
	#[test]
	fn parse_rename_step() {
		let step_defs = vec![json!({"type": "rename", "from": "old", "to": "new"})];
		let parsed = parse_steps(&step_defs).unwrap();
		assert_eq!(parsed.len(), 1);

		let mut record = json!({"old": "value"});
		parsed.apply_one(&mut record).unwrap();
		assert_eq!(record, json!({"new": "value"}));
	}

	// A `filter` step with `eq` keeps matching records and rejects the rest.
	#[test]
	fn parse_filter_step() {
		let step_defs = vec![json!({
			"type": "filter",
			"field": "status",
			"op": "eq",
			"value": "active"
		})];
		let parsed = parse_steps(&step_defs).unwrap();

		let mut accepted = json!({"status": "active"});
		assert!(parsed.apply_one(&mut accepted).unwrap());

		let mut rejected = json!({"status": "deleted"});
		assert!(!parsed.apply_one(&mut rejected).unwrap());
	}

	// Several steps are applied in definition order.
	#[test]
	fn parse_multi_step_chain() {
		let step_defs = vec![
			json!({"type": "rename", "from": "entity_id", "to": "id"}),
			json!({"type": "upper", "field": "name"}),
			json!({"type": "set", "field": "version", "value": 1}),
			json!({"type": "remove", "field": "secret"}),
		];
		let parsed = parse_steps(&step_defs).unwrap();
		assert_eq!(parsed.len(), 4);

		let mut record = json!({"entity_id": "123", "name": "alice", "secret": "pw"});
		parsed.apply_one(&mut record).unwrap();

		let expected = json!({"id": "123", "name": "ALICE", "version": 1});
		assert_eq!(record, expected);
	}

	// `map_value` replaces a field's value via the configured mapping.
	#[test]
	fn parse_map_value_step() {
		let step_defs = vec![json!({
			"type": "map_value",
			"field": "op",
			"mapping": {"D": "deleted", "U": "updated", "I": "inserted"}
		})];
		let parsed = parse_steps(&step_defs).unwrap();

		let mut record = json!({"op": "D"});
		parsed.apply_one(&mut record).unwrap();
		assert_eq!(record["op"], "deleted");
	}

	// `truncate` shortens a string field to `max_len`.
	#[test]
	fn parse_truncate_step() {
		let step_defs = vec![json!({"type": "truncate", "field": "desc", "max_len": 5})];
		let parsed = parse_steps(&step_defs).unwrap();

		let mut record = json!({"desc": "hello world"});
		parsed.apply_one(&mut record).unwrap();
		assert_eq!(record["desc"], "hello");
	}

	// `nest` moves the listed fields under a new parent object.
	#[test]
	fn parse_nest_step() {
		let step_defs = vec![json!({
			"type": "nest",
			"fields": ["city", "zip"],
			"into": "address"
		})];
		let parsed = parse_steps(&step_defs).unwrap();

		let mut record = json!({"city": "NYC", "zip": "10001", "name": "alice"});
		parsed.apply_one(&mut record).unwrap();
		assert_eq!(record["address"]["city"], "NYC");

		let top_level = record.as_object().unwrap();
		assert!(!top_level.contains_key("city"));
	}

	// `coalesce` writes the first non-null candidate into the target field.
	#[test]
	fn parse_coalesce_step() {
		let step_defs = vec![json!({
			"type": "coalesce",
			"fields": ["a", "b"],
			"into": "result"
		})];
		let parsed = parse_steps(&step_defs).unwrap();

		let mut record = json!({"a": null, "b": 42});
		parsed.apply_one(&mut record).unwrap();
		assert_eq!(record["result"], 42);
	}

	// An unrecognised `type` is reported by name.
	#[test]
	fn parse_unknown_type_errors() {
		let step_defs = vec![json!({"type": "bogus"})];
		let message = parse_steps(&step_defs).unwrap_err().to_string();
		assert!(message.contains("unknown type 'bogus'"));
	}

	// A step without `type` is rejected.
	#[test]
	fn parse_missing_type_errors() {
		let step_defs = vec![json!({"field": "x"})];
		let message = parse_steps(&step_defs).unwrap_err().to_string();
		assert!(message.contains("missing 'type'"));
	}

	// A step missing one of its required parameters is rejected.
	#[test]
	fn parse_missing_required_field_errors() {
		let step_defs = vec![json!({"type": "rename", "from": "x"})];
		let message = parse_steps(&step_defs).unwrap_err().to_string();
		assert!(message.contains("missing 'to'"));
	}

	// An unrecognised filter `op` is reported by name.
	#[test]
	fn parse_unknown_filter_op_errors() {
		let step_defs = vec![json!({
			"type": "filter",
			"field": "x",
			"op": "bogus",
			"value": 1
		})];
		let message = parse_steps(&step_defs).unwrap_err().to_string();
		assert!(message.contains("unknown filter op 'bogus'"));
	}

	// Every supported step type parses successfully.
	#[test]
	fn parse_all_step_types() {
		let step_defs = vec![
			json!({"type": "rename", "from": "a", "to": "b"}),
			json!({"type": "set", "field": "x", "value": 1}),
			json!({"type": "upper", "field": "x"}),
			json!({"type": "lower", "field": "x"}),
			json!({"type": "remove", "field": "x"}),
			json!({"type": "copy", "from": "a", "to": "b"}),
			json!({"type": "default", "field": "x", "value": 0}),
			json!({"type": "filter", "field": "x", "op": "exists"}),
			json!({"type": "map_value", "field": "x", "mapping": {"a": "b"}}),
			json!({"type": "truncate", "field": "x", "max_len": 10}),
			json!({"type": "nest", "fields": ["a"], "into": "b"}),
			json!({"type": "flatten", "field": "x"}),
			json!({"type": "hash", "field": "x"}),
			json!({"type": "coalesce", "fields": ["a", "b"], "into": "c"}),
			json!({"type": "schema_filter", "field": "x", "allow": ["^public"]}),
		];
		let parsed = parse_steps(&step_defs).unwrap();
		assert_eq!(parsed.len(), 15);
	}

	// With only `allow`, records matching a pattern pass and others are rejected.
	#[test]
	fn parse_schema_filter_allow() {
		let step_defs = vec![json!({
			"type": "schema_filter",
			"field": "schema",
			"allow": ["^public$", "^analytics$"]
		})];
		let parsed = parse_steps(&step_defs).unwrap();

		let mut accepted = json!({"schema": "public"});
		assert!(parsed.apply_one(&mut accepted).unwrap());

		let mut rejected = json!({"schema": "internal"});
		assert!(!parsed.apply_one(&mut rejected).unwrap());
	}

	// With only `deny`, records matching a pattern are rejected and others pass.
	#[test]
	fn parse_schema_filter_deny() {
		let step_defs = vec![json!({
			"type": "schema_filter",
			"field": "table",
			"deny": ["^pg_catalog", "^information_schema"]
		})];
		let parsed = parse_steps(&step_defs).unwrap();

		let mut accepted = json!({"table": "public.users"});
		assert!(parsed.apply_one(&mut accepted).unwrap());

		let mut rejected = json!({"table": "pg_catalog.pg_class"});
		assert!(!parsed.apply_one(&mut rejected).unwrap());
	}

	// A `schema_filter` with neither `allow` nor `deny` is rejected.
	#[test]
	fn parse_schema_filter_requires_allow_or_deny() {
		let step_defs = vec![json!({
			"type": "schema_filter",
			"field": "x"
		})];
		let message = parse_steps(&step_defs).unwrap_err().to_string();
		assert!(message.contains("requires at least"));
	}

	// A pattern that does not compile is reported as an invalid regex.
	#[test]
	fn parse_schema_filter_invalid_regex_errors() {
		let step_defs = vec![json!({
			"type": "schema_filter",
			"field": "x",
			"allow": ["[invalid"]
		})];
		let message = parse_steps(&step_defs).unwrap_err().to_string();
		assert!(message.contains("invalid regex"));
	}
}