Skip to main content

oversync_transforms/
parse.rs

1use oversync_core::error::OversyncError;
2
3use crate::steps::*;
4use crate::{StepChain, TransformStep};
5
6/// Parse a JSON array of step definitions into a [`StepChain`].
7///
8/// Each element must be an object with a `type` field and step-specific parameters.
9///
10/// # Supported types
11///
12/// | type | params |
13/// |------|--------|
14/// | `rename` | `from`, `to` |
15/// | `set` | `field`, `value` |
16/// | `upper` | `field` |
17/// | `lower` | `field` |
18/// | `remove` | `field` |
19/// | `copy` | `from`, `to` |
20/// | `default` | `field`, `value` |
21/// | `filter` | `field`, `op`, `value` |
22/// | `map_value` | `field`, `mapping` |
23/// | `truncate` | `field`, `max_len` |
24/// | `nest` | `fields`, `into` |
25/// | `flatten` | `field` |
26/// | `hash` | `field` |
27/// | `coalesce` | `fields`, `into` |
28pub fn parse_steps(defs: &[serde_json::Value]) -> Result<StepChain, OversyncError> {
29	let mut steps: Vec<Box<dyn TransformStep>> = Vec::with_capacity(defs.len());
30
31	for (i, def) in defs.iter().enumerate() {
32		let obj = def
33			.as_object()
34			.ok_or_else(|| OversyncError::Config(format!("transform step {i}: expected object")))?;
35
36		let step_type = obj
37			.get("type")
38			.and_then(|v| v.as_str())
39			.ok_or_else(|| OversyncError::Config(format!("transform step {i}: missing 'type'")))?;
40
41		let step: Box<dyn TransformStep> = match step_type {
42			"rename" => Box::new(Rename {
43				from: req_str(obj, "from", i)?,
44				to: req_str(obj, "to", i)?,
45			}),
46			"set" => Box::new(Set {
47				field: req_str(obj, "field", i)?,
48				value: req_val(obj, "value", i)?,
49			}),
50			"upper" => Box::new(Upper {
51				field: req_str(obj, "field", i)?,
52			}),
53			"lower" => Box::new(Lower {
54				field: req_str(obj, "field", i)?,
55			}),
56			"remove" => Box::new(Remove {
57				field: req_str(obj, "field", i)?,
58			}),
59			"copy" => Box::new(Copy {
60				from: req_str(obj, "from", i)?,
61				to: req_str(obj, "to", i)?,
62			}),
63			"default" => Box::new(Default {
64				field: req_str(obj, "field", i)?,
65				value: req_val(obj, "value", i)?,
66			}),
67			"filter" => {
68				let op_str = req_str(obj, "op", i)?;
69				let op = match op_str.as_str() {
70					"eq" => FilterOp::Eq,
71					"ne" => FilterOp::Ne,
72					"gt" => FilterOp::Gt,
73					"gte" => FilterOp::Gte,
74					"lt" => FilterOp::Lt,
75					"lte" => FilterOp::Lte,
76					"contains" => FilterOp::Contains,
77					"exists" => FilterOp::Exists,
78					other => {
79						return Err(OversyncError::Config(format!(
80							"transform step {i}: unknown filter op '{other}'"
81						)));
82					}
83				};
84				Box::new(Filter {
85					field: req_str(obj, "field", i)?,
86					op,
87					value: obj.get("value").cloned().unwrap_or(serde_json::Value::Null),
88				})
89			}
90			"map_value" => {
91				let mapping = obj
92					.get("mapping")
93					.and_then(|v| v.as_object())
94					.ok_or_else(|| {
95						OversyncError::Config(format!(
96							"transform step {i}: 'map_value' requires 'mapping' object"
97						))
98					})?
99					.clone();
100				Box::new(MapValue {
101					field: req_str(obj, "field", i)?,
102					mapping,
103				})
104			}
105			"truncate" => {
106				let max_len = obj.get("max_len").and_then(|v| v.as_u64()).ok_or_else(|| {
107					OversyncError::Config(format!(
108						"transform step {i}: 'truncate' requires 'max_len' (integer)"
109					))
110				})? as usize;
111				Box::new(Truncate {
112					field: req_str(obj, "field", i)?,
113					max_len,
114				})
115			}
116			"nest" => Box::new(Nest {
117				fields: req_str_array(obj, "fields", i)?,
118				into: req_str(obj, "into", i)?,
119			}),
120			"flatten" => Box::new(Flatten {
121				field: req_str(obj, "field", i)?,
122			}),
123			"hash" => Box::new(Hash {
124				field: req_str(obj, "field", i)?,
125			}),
126			"coalesce" => Box::new(Coalesce {
127				fields: req_str_array(obj, "fields", i)?,
128				into: req_str(obj, "into", i)?,
129			}),
130			"schema_filter" => {
131				let allow = parse_regex_array(obj, "allow", i)?;
132				let deny = parse_regex_array(obj, "deny", i)?;
133				if allow.is_empty() && deny.is_empty() {
134					return Err(OversyncError::Config(format!(
135						"transform step {i}: 'schema_filter' requires at least 'allow' or 'deny'"
136					)));
137				}
138				Box::new(SchemaFilter {
139					field: req_str(obj, "field", i)?,
140					allow,
141					deny,
142				})
143			}
144			other => {
145				return Err(OversyncError::Config(format!(
146					"transform step {i}: unknown type '{other}'"
147				)));
148			}
149		};
150
151		steps.push(step);
152	}
153
154	Ok(StepChain::new(steps))
155}
156
157fn req_str(
158	obj: &serde_json::Map<String, serde_json::Value>,
159	key: &str,
160	idx: usize,
161) -> Result<String, OversyncError> {
162	obj.get(key)
163		.and_then(|v| v.as_str())
164		.map(String::from)
165		.ok_or_else(|| {
166			OversyncError::Config(format!("transform step {idx}: missing '{key}' (string)"))
167		})
168}
169
170fn req_val(
171	obj: &serde_json::Map<String, serde_json::Value>,
172	key: &str,
173	idx: usize,
174) -> Result<serde_json::Value, OversyncError> {
175	obj.get(key)
176		.cloned()
177		.ok_or_else(|| OversyncError::Config(format!("transform step {idx}: missing '{key}'")))
178}
179
180fn req_str_array(
181	obj: &serde_json::Map<String, serde_json::Value>,
182	key: &str,
183	idx: usize,
184) -> Result<Vec<String>, OversyncError> {
185	let arr = obj.get(key).and_then(|v| v.as_array()).ok_or_else(|| {
186		OversyncError::Config(format!(
187			"transform step {idx}: missing '{key}' (string array)"
188		))
189	})?;
190	arr.iter()
191		.enumerate()
192		.map(|(j, v)| {
193			v.as_str().map(String::from).ok_or_else(|| {
194				OversyncError::Config(format!(
195					"transform step {idx}: '{key}[{j}]' must be a string"
196				))
197			})
198		})
199		.collect()
200}
201
202fn parse_regex_array(
203	obj: &serde_json::Map<String, serde_json::Value>,
204	key: &str,
205	idx: usize,
206) -> Result<Vec<regex::Regex>, OversyncError> {
207	let arr = match obj.get(key).and_then(|v| v.as_array()) {
208		Some(a) => a,
209		None => return Ok(vec![]),
210	};
211	arr.iter()
212		.enumerate()
213		.map(|(j, v)| {
214			let pattern = v.as_str().ok_or_else(|| {
215				OversyncError::Config(format!(
216					"transform step {idx}: '{key}[{j}]' must be a string"
217				))
218			})?;
219			regex::Regex::new(pattern).map_err(|e| {
220				OversyncError::Config(format!(
221					"transform step {idx}: invalid regex '{pattern}' in '{key}': {e}"
222				))
223			})
224		})
225		.collect()
226}
227
#[cfg(test)]
mod tests {
	use super::*;
	use serde_json::json;

	/// Parse `defs` and return the resulting chain, panicking on a parse error.
	fn build(defs: &[serde_json::Value]) -> StepChain {
		parse_steps(defs).unwrap()
	}

	/// Parse `defs`, expecting failure, and return the rendered error message.
	fn failure_message(defs: &[serde_json::Value]) -> String {
		parse_steps(defs).unwrap_err().to_string()
	}

	#[test]
	fn parse_empty_array() {
		assert!(build(&[]).is_empty());
	}

	#[test]
	fn parse_rename_step() {
		let chain = build(&[json!({"type": "rename", "from": "old", "to": "new"})]);
		assert_eq!(chain.len(), 1);

		let mut record = json!({"old": "value"});
		chain.apply_one(&mut record).unwrap();
		assert_eq!(record, json!({"new": "value"}));
	}

	#[test]
	fn parse_filter_step() {
		let chain = build(&[json!({
			"type": "filter",
			"field": "status",
			"op": "eq",
			"value": "active"
		})]);

		let mut accepted = json!({"status": "active"});
		assert!(chain.apply_one(&mut accepted).unwrap());

		let mut rejected = json!({"status": "deleted"});
		assert!(!chain.apply_one(&mut rejected).unwrap());
	}

	#[test]
	fn parse_multi_step_chain() {
		let chain = build(&[
			json!({"type": "rename", "from": "entity_id", "to": "id"}),
			json!({"type": "upper", "field": "name"}),
			json!({"type": "set", "field": "version", "value": 1}),
			json!({"type": "remove", "field": "secret"}),
		]);
		assert_eq!(chain.len(), 4);

		let mut record = json!({"entity_id": "123", "name": "alice", "secret": "pw"});
		chain.apply_one(&mut record).unwrap();

		let expected = json!({"id": "123", "name": "ALICE", "version": 1});
		assert_eq!(record, expected);
	}

	#[test]
	fn parse_map_value_step() {
		let chain = build(&[json!({
			"type": "map_value",
			"field": "op",
			"mapping": {"D": "deleted", "U": "updated", "I": "inserted"}
		})]);

		let mut record = json!({"op": "D"});
		chain.apply_one(&mut record).unwrap();
		assert_eq!(record["op"], "deleted");
	}

	#[test]
	fn parse_truncate_step() {
		let chain = build(&[json!({"type": "truncate", "field": "desc", "max_len": 5})]);

		let mut record = json!({"desc": "hello world"});
		chain.apply_one(&mut record).unwrap();
		assert_eq!(record["desc"], "hello");
	}

	#[test]
	fn parse_nest_step() {
		let chain = build(&[json!({
			"type": "nest",
			"fields": ["city", "zip"],
			"into": "address"
		})]);

		let mut record = json!({"city": "NYC", "zip": "10001", "name": "alice"});
		chain.apply_one(&mut record).unwrap();

		assert_eq!(record["address"]["city"], "NYC");
		let top_level = record.as_object().unwrap();
		assert!(!top_level.contains_key("city"));
	}

	#[test]
	fn parse_coalesce_step() {
		let chain = build(&[json!({
			"type": "coalesce",
			"fields": ["a", "b"],
			"into": "result"
		})]);

		let mut record = json!({"a": null, "b": 42});
		chain.apply_one(&mut record).unwrap();
		assert_eq!(record["result"], 42);
	}

	#[test]
	fn parse_unknown_type_errors() {
		let message = failure_message(&[json!({"type": "bogus"})]);
		assert!(message.contains("unknown type 'bogus'"));
	}

	#[test]
	fn parse_missing_type_errors() {
		let message = failure_message(&[json!({"field": "x"})]);
		assert!(message.contains("missing 'type'"));
	}

	#[test]
	fn parse_missing_required_field_errors() {
		let message = failure_message(&[json!({"type": "rename", "from": "x"})]);
		assert!(message.contains("missing 'to'"));
	}

	#[test]
	fn parse_unknown_filter_op_errors() {
		let message = failure_message(&[json!({
			"type": "filter",
			"field": "x",
			"op": "bogus",
			"value": 1
		})]);
		assert!(message.contains("unknown filter op 'bogus'"));
	}

	#[test]
	fn parse_all_step_types() {
		// One definition per supported step type.
		let defs = [
			json!({"type": "rename", "from": "a", "to": "b"}),
			json!({"type": "set", "field": "x", "value": 1}),
			json!({"type": "upper", "field": "x"}),
			json!({"type": "lower", "field": "x"}),
			json!({"type": "remove", "field": "x"}),
			json!({"type": "copy", "from": "a", "to": "b"}),
			json!({"type": "default", "field": "x", "value": 0}),
			json!({"type": "filter", "field": "x", "op": "exists"}),
			json!({"type": "map_value", "field": "x", "mapping": {"a": "b"}}),
			json!({"type": "truncate", "field": "x", "max_len": 10}),
			json!({"type": "nest", "fields": ["a"], "into": "b"}),
			json!({"type": "flatten", "field": "x"}),
			json!({"type": "hash", "field": "x"}),
			json!({"type": "coalesce", "fields": ["a", "b"], "into": "c"}),
			json!({"type": "schema_filter", "field": "x", "allow": ["^public"]}),
		];
		assert_eq!(build(&defs).len(), 15);
	}

	#[test]
	fn parse_schema_filter_allow() {
		let chain = build(&[json!({
			"type": "schema_filter",
			"field": "schema",
			"allow": ["^public$", "^analytics$"]
		})]);

		let mut accepted = json!({"schema": "public"});
		assert!(chain.apply_one(&mut accepted).unwrap());

		let mut rejected = json!({"schema": "internal"});
		assert!(!chain.apply_one(&mut rejected).unwrap());
	}

	#[test]
	fn parse_schema_filter_deny() {
		let chain = build(&[json!({
			"type": "schema_filter",
			"field": "table",
			"deny": ["^pg_catalog", "^information_schema"]
		})]);

		let mut accepted = json!({"table": "public.users"});
		assert!(chain.apply_one(&mut accepted).unwrap());

		let mut rejected = json!({"table": "pg_catalog.pg_class"});
		assert!(!chain.apply_one(&mut rejected).unwrap());
	}

	#[test]
	fn parse_schema_filter_requires_allow_or_deny() {
		let message = failure_message(&[json!({
			"type": "schema_filter",
			"field": "x"
		})]);
		assert!(message.contains("requires at least"));
	}

	#[test]
	fn parse_schema_filter_invalid_regex_errors() {
		let message = failure_message(&[json!({
			"type": "schema_filter",
			"field": "x",
			"allow": ["[invalid"]
		})]);
		assert!(message.contains("invalid regex"));
	}
}