1use oversync_core::error::OversyncError;
2
3use crate::steps::*;
4use crate::{StepChain, TransformStep};
5
6pub fn parse_steps(defs: &[serde_json::Value]) -> Result<StepChain, OversyncError> {
29 let mut steps: Vec<Box<dyn TransformStep>> = Vec::with_capacity(defs.len());
30
31 for (i, def) in defs.iter().enumerate() {
32 let obj = def
33 .as_object()
34 .ok_or_else(|| OversyncError::Config(format!("transform step {i}: expected object")))?;
35
36 let step_type = obj
37 .get("type")
38 .and_then(|v| v.as_str())
39 .ok_or_else(|| OversyncError::Config(format!("transform step {i}: missing 'type'")))?;
40
41 let step: Box<dyn TransformStep> = match step_type {
42 "rename" => Box::new(Rename {
43 from: req_str(obj, "from", i)?,
44 to: req_str(obj, "to", i)?,
45 }),
46 "set" => Box::new(Set {
47 field: req_str(obj, "field", i)?,
48 value: req_val(obj, "value", i)?,
49 }),
50 "upper" => Box::new(Upper {
51 field: req_str(obj, "field", i)?,
52 }),
53 "lower" => Box::new(Lower {
54 field: req_str(obj, "field", i)?,
55 }),
56 "remove" => Box::new(Remove {
57 field: req_str(obj, "field", i)?,
58 }),
59 "copy" => Box::new(Copy {
60 from: req_str(obj, "from", i)?,
61 to: req_str(obj, "to", i)?,
62 }),
63 "default" => Box::new(Default {
64 field: req_str(obj, "field", i)?,
65 value: req_val(obj, "value", i)?,
66 }),
67 "filter" => {
68 let op_str = req_str(obj, "op", i)?;
69 let op = match op_str.as_str() {
70 "eq" => FilterOp::Eq,
71 "ne" => FilterOp::Ne,
72 "gt" => FilterOp::Gt,
73 "gte" => FilterOp::Gte,
74 "lt" => FilterOp::Lt,
75 "lte" => FilterOp::Lte,
76 "contains" => FilterOp::Contains,
77 "exists" => FilterOp::Exists,
78 other => {
79 return Err(OversyncError::Config(format!(
80 "transform step {i}: unknown filter op '{other}'"
81 )));
82 }
83 };
84 Box::new(Filter {
85 field: req_str(obj, "field", i)?,
86 op,
87 value: obj.get("value").cloned().unwrap_or(serde_json::Value::Null),
88 })
89 }
90 "map_value" => {
91 let mapping = obj
92 .get("mapping")
93 .and_then(|v| v.as_object())
94 .ok_or_else(|| {
95 OversyncError::Config(format!(
96 "transform step {i}: 'map_value' requires 'mapping' object"
97 ))
98 })?
99 .clone();
100 Box::new(MapValue {
101 field: req_str(obj, "field", i)?,
102 mapping,
103 })
104 }
105 "truncate" => {
106 let max_len = obj.get("max_len").and_then(|v| v.as_u64()).ok_or_else(|| {
107 OversyncError::Config(format!(
108 "transform step {i}: 'truncate' requires 'max_len' (integer)"
109 ))
110 })? as usize;
111 Box::new(Truncate {
112 field: req_str(obj, "field", i)?,
113 max_len,
114 })
115 }
116 "nest" => Box::new(Nest {
117 fields: req_str_array(obj, "fields", i)?,
118 into: req_str(obj, "into", i)?,
119 }),
120 "flatten" => Box::new(Flatten {
121 field: req_str(obj, "field", i)?,
122 }),
123 "hash" => Box::new(Hash {
124 field: req_str(obj, "field", i)?,
125 }),
126 "coalesce" => Box::new(Coalesce {
127 fields: req_str_array(obj, "fields", i)?,
128 into: req_str(obj, "into", i)?,
129 }),
130 "schema_filter" => {
131 let allow = parse_regex_array(obj, "allow", i)?;
132 let deny = parse_regex_array(obj, "deny", i)?;
133 if allow.is_empty() && deny.is_empty() {
134 return Err(OversyncError::Config(format!(
135 "transform step {i}: 'schema_filter' requires at least 'allow' or 'deny'"
136 )));
137 }
138 Box::new(SchemaFilter {
139 field: req_str(obj, "field", i)?,
140 allow,
141 deny,
142 })
143 }
144 #[cfg(feature = "js")]
145 "js" => {
146 let function = req_str(obj, "function", i)?;
147 let name = obj.get("name").and_then(|v| v.as_str()).unwrap_or("js");
148 Box::new(crate::js::JsStep::new(name, &function)?)
149 }
150 other => {
151 return Err(OversyncError::Config(format!(
152 "transform step {i}: unknown type '{other}'"
153 )));
154 }
155 };
156
157 steps.push(step);
158 }
159
160 Ok(StepChain::new(steps))
161}
162
163fn req_str(
164 obj: &serde_json::Map<String, serde_json::Value>,
165 key: &str,
166 idx: usize,
167) -> Result<String, OversyncError> {
168 obj.get(key)
169 .and_then(|v| v.as_str())
170 .map(String::from)
171 .ok_or_else(|| {
172 OversyncError::Config(format!("transform step {idx}: missing '{key}' (string)"))
173 })
174}
175
176fn req_val(
177 obj: &serde_json::Map<String, serde_json::Value>,
178 key: &str,
179 idx: usize,
180) -> Result<serde_json::Value, OversyncError> {
181 obj.get(key)
182 .cloned()
183 .ok_or_else(|| OversyncError::Config(format!("transform step {idx}: missing '{key}'")))
184}
185
186fn req_str_array(
187 obj: &serde_json::Map<String, serde_json::Value>,
188 key: &str,
189 idx: usize,
190) -> Result<Vec<String>, OversyncError> {
191 let arr = obj.get(key).and_then(|v| v.as_array()).ok_or_else(|| {
192 OversyncError::Config(format!(
193 "transform step {idx}: missing '{key}' (string array)"
194 ))
195 })?;
196 arr.iter()
197 .enumerate()
198 .map(|(j, v)| {
199 v.as_str().map(String::from).ok_or_else(|| {
200 OversyncError::Config(format!(
201 "transform step {idx}: '{key}[{j}]' must be a string"
202 ))
203 })
204 })
205 .collect()
206}
207
208fn parse_regex_array(
209 obj: &serde_json::Map<String, serde_json::Value>,
210 key: &str,
211 idx: usize,
212) -> Result<Vec<regex::Regex>, OversyncError> {
213 let arr = match obj.get(key).and_then(|v| v.as_array()) {
214 Some(a) => a,
215 None => return Ok(vec![]),
216 };
217 arr.iter()
218 .enumerate()
219 .map(|(j, v)| {
220 let pattern = v.as_str().ok_or_else(|| {
221 OversyncError::Config(format!(
222 "transform step {idx}: '{key}[{j}]' must be a string"
223 ))
224 })?;
225 regex::Regex::new(pattern).map_err(|e| {
226 OversyncError::Config(format!(
227 "transform step {idx}: invalid regex '{pattern}' in '{key}': {e}"
228 ))
229 })
230 })
231 .collect()
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Parses `defs` into a chain, panicking if the definitions are rejected.
    fn build_chain(defs: &[serde_json::Value]) -> StepChain {
        parse_steps(defs).unwrap()
    }

    /// Parses `defs`, which must be rejected, and returns the rendered error message.
    fn rejection_message(defs: &[serde_json::Value]) -> String {
        parse_steps(defs).unwrap_err().to_string()
    }

    // An empty definition list produces an empty chain.
    #[test]
    fn parse_empty_array() {
        assert!(build_chain(&[]).is_empty());
    }

    // A single rename step moves the value to the new key.
    #[test]
    fn parse_rename_step() {
        let chain = build_chain(&[json!({"type": "rename", "from": "old", "to": "new"})]);
        assert_eq!(chain.len(), 1);

        let mut record = json!({"old": "value"});
        chain.apply_one(&mut record).unwrap();
        assert_eq!(record, json!({"new": "value"}));
    }

    // An `eq` filter keeps matching records and rejects the others.
    #[test]
    fn parse_filter_step() {
        let chain = build_chain(&[json!({
            "type": "filter",
            "field": "status",
            "op": "eq",
            "value": "active"
        })]);

        let mut accepted = json!({"status": "active"});
        assert!(chain.apply_one(&mut accepted).unwrap());

        let mut rejected = json!({"status": "deleted"});
        assert!(!chain.apply_one(&mut rejected).unwrap());
    }

    // Several steps are parsed together and applied in order.
    #[test]
    fn parse_multi_step_chain() {
        let chain = build_chain(&[
            json!({"type": "rename", "from": "entity_id", "to": "id"}),
            json!({"type": "upper", "field": "name"}),
            json!({"type": "set", "field": "version", "value": 1}),
            json!({"type": "remove", "field": "secret"}),
        ]);
        assert_eq!(chain.len(), 4);

        let mut record = json!({"entity_id": "123", "name": "alice", "secret": "pw"});
        chain.apply_one(&mut record).unwrap();
        assert_eq!(
            record,
            json!({"id": "123", "name": "ALICE", "version": 1})
        );
    }

    // A `map_value` step replaces a value through its mapping table.
    #[test]
    fn parse_map_value_step() {
        let chain = build_chain(&[json!({
            "type": "map_value",
            "field": "op",
            "mapping": {"D": "deleted", "U": "updated", "I": "inserted"}
        })]);

        let mut record = json!({"op": "D"});
        chain.apply_one(&mut record).unwrap();
        assert_eq!(record["op"], "deleted");
    }

    // A `truncate` step shortens the field to `max_len`.
    #[test]
    fn parse_truncate_step() {
        let chain = build_chain(&[json!({"type": "truncate", "field": "desc", "max_len": 5})]);

        let mut record = json!({"desc": "hello world"});
        chain.apply_one(&mut record).unwrap();
        assert_eq!(record["desc"], "hello");
    }

    // A `nest` step moves the listed fields under the target key.
    #[test]
    fn parse_nest_step() {
        let chain = build_chain(&[json!({
            "type": "nest",
            "fields": ["city", "zip"],
            "into": "address"
        })]);

        let mut record = json!({"city": "NYC", "zip": "10001", "name": "alice"});
        chain.apply_one(&mut record).unwrap();
        assert_eq!(record["address"]["city"], "NYC");
        assert!(!record.as_object().unwrap().contains_key("city"));
    }

    // A `coalesce` step skips the null field and takes the next one.
    #[test]
    fn parse_coalesce_step() {
        let chain = build_chain(&[json!({
            "type": "coalesce",
            "fields": ["a", "b"],
            "into": "result"
        })]);

        let mut record = json!({"a": null, "b": 42});
        chain.apply_one(&mut record).unwrap();
        assert_eq!(record["result"], 42);
    }

    // An unrecognised step type is reported by name.
    #[test]
    fn parse_unknown_type_errors() {
        let message = rejection_message(&[json!({"type": "bogus"})]);
        assert!(message.contains("unknown type 'bogus'"));
    }

    // A definition without a `type` key is rejected.
    #[test]
    fn parse_missing_type_errors() {
        let message = rejection_message(&[json!({"field": "x"})]);
        assert!(message.contains("missing 'type'"));
    }

    // A step lacking one of its required keys is rejected.
    #[test]
    fn parse_missing_required_field_errors() {
        let message = rejection_message(&[json!({"type": "rename", "from": "x"})]);
        assert!(message.contains("missing 'to'"));
    }

    // An unrecognised filter operator is reported by name.
    #[test]
    fn parse_unknown_filter_op_errors() {
        let message = rejection_message(&[json!({
            "type": "filter",
            "field": "x",
            "op": "bogus",
            "value": 1
        })]);
        assert!(message.contains("unknown filter op 'bogus'"));
    }

    // One definition of every step type that does not need the `js` feature parses cleanly.
    #[test]
    fn parse_all_step_types() {
        let chain = build_chain(&[
            json!({"type": "rename", "from": "a", "to": "b"}),
            json!({"type": "set", "field": "x", "value": 1}),
            json!({"type": "upper", "field": "x"}),
            json!({"type": "lower", "field": "x"}),
            json!({"type": "remove", "field": "x"}),
            json!({"type": "copy", "from": "a", "to": "b"}),
            json!({"type": "default", "field": "x", "value": 0}),
            json!({"type": "filter", "field": "x", "op": "exists"}),
            json!({"type": "map_value", "field": "x", "mapping": {"a": "b"}}),
            json!({"type": "truncate", "field": "x", "max_len": 10}),
            json!({"type": "nest", "fields": ["a"], "into": "b"}),
            json!({"type": "flatten", "field": "x"}),
            json!({"type": "hash", "field": "x"}),
            json!({"type": "coalesce", "fields": ["a", "b"], "into": "c"}),
            json!({"type": "schema_filter", "field": "x", "allow": ["^public"]}),
        ]);
        assert_eq!(chain.len(), 15);
    }

    // With only `allow` patterns, a matching value is kept and a non-matching one is dropped.
    #[test]
    fn parse_schema_filter_allow() {
        let chain = build_chain(&[json!({
            "type": "schema_filter",
            "field": "schema",
            "allow": ["^public$", "^analytics$"]
        })]);

        let mut accepted = json!({"schema": "public"});
        assert!(chain.apply_one(&mut accepted).unwrap());

        let mut rejected = json!({"schema": "internal"});
        assert!(!chain.apply_one(&mut rejected).unwrap());
    }

    // With only `deny` patterns, a matching value is dropped and a non-matching one is kept.
    #[test]
    fn parse_schema_filter_deny() {
        let chain = build_chain(&[json!({
            "type": "schema_filter",
            "field": "table",
            "deny": ["^pg_catalog", "^information_schema"]
        })]);

        let mut accepted = json!({"table": "public.users"});
        assert!(chain.apply_one(&mut accepted).unwrap());

        let mut rejected = json!({"table": "pg_catalog.pg_class"});
        assert!(!chain.apply_one(&mut rejected).unwrap());
    }

    // A schema filter with neither `allow` nor `deny` is rejected.
    #[test]
    fn parse_schema_filter_requires_allow_or_deny() {
        let message = rejection_message(&[json!({
            "type": "schema_filter",
            "field": "x"
        })]);
        assert!(message.contains("requires at least"));
    }

    // A pattern that does not compile is reported as an invalid regex.
    #[test]
    fn parse_schema_filter_invalid_regex_errors() {
        let message = rejection_message(&[json!({
            "type": "schema_filter",
            "field": "x",
            "allow": ["[invalid"]
        })]);
        assert!(message.contains("invalid regex"));
    }
}