1use oversync_core::error::OversyncError;
2
3use crate::steps::*;
4use crate::{StepChain, TransformStep};
5
6pub fn parse_steps(defs: &[serde_json::Value]) -> Result<StepChain, OversyncError> {
29 let mut steps: Vec<Box<dyn TransformStep>> = Vec::with_capacity(defs.len());
30
31 for (i, def) in defs.iter().enumerate() {
32 let obj = def
33 .as_object()
34 .ok_or_else(|| OversyncError::Config(format!("transform step {i}: expected object")))?;
35
36 let step_type = obj
37 .get("type")
38 .and_then(|v| v.as_str())
39 .ok_or_else(|| OversyncError::Config(format!("transform step {i}: missing 'type'")))?;
40
41 let step: Box<dyn TransformStep> = match step_type {
42 "rename" => Box::new(Rename {
43 from: req_str(obj, "from", i)?,
44 to: req_str(obj, "to", i)?,
45 }),
46 "set" => Box::new(Set {
47 field: req_str(obj, "field", i)?,
48 value: req_val(obj, "value", i)?,
49 }),
50 "upper" => Box::new(Upper {
51 field: req_str(obj, "field", i)?,
52 }),
53 "lower" => Box::new(Lower {
54 field: req_str(obj, "field", i)?,
55 }),
56 "remove" => Box::new(Remove {
57 field: req_str(obj, "field", i)?,
58 }),
59 "copy" => Box::new(Copy {
60 from: req_str(obj, "from", i)?,
61 to: req_str(obj, "to", i)?,
62 }),
63 "default" => Box::new(Default {
64 field: req_str(obj, "field", i)?,
65 value: req_val(obj, "value", i)?,
66 }),
67 "filter" => {
68 let op_str = req_str(obj, "op", i)?;
69 let op = match op_str.as_str() {
70 "eq" => FilterOp::Eq,
71 "ne" => FilterOp::Ne,
72 "gt" => FilterOp::Gt,
73 "gte" => FilterOp::Gte,
74 "lt" => FilterOp::Lt,
75 "lte" => FilterOp::Lte,
76 "contains" => FilterOp::Contains,
77 "exists" => FilterOp::Exists,
78 other => {
79 return Err(OversyncError::Config(format!(
80 "transform step {i}: unknown filter op '{other}'"
81 )));
82 }
83 };
84 Box::new(Filter {
85 field: req_str(obj, "field", i)?,
86 op,
87 value: obj.get("value").cloned().unwrap_or(serde_json::Value::Null),
88 })
89 }
90 "map_value" => {
91 let mapping = obj
92 .get("mapping")
93 .and_then(|v| v.as_object())
94 .ok_or_else(|| {
95 OversyncError::Config(format!(
96 "transform step {i}: 'map_value' requires 'mapping' object"
97 ))
98 })?
99 .clone();
100 Box::new(MapValue {
101 field: req_str(obj, "field", i)?,
102 mapping,
103 })
104 }
105 "truncate" => {
106 let max_len = obj.get("max_len").and_then(|v| v.as_u64()).ok_or_else(|| {
107 OversyncError::Config(format!(
108 "transform step {i}: 'truncate' requires 'max_len' (integer)"
109 ))
110 })? as usize;
111 Box::new(Truncate {
112 field: req_str(obj, "field", i)?,
113 max_len,
114 })
115 }
116 "nest" => Box::new(Nest {
117 fields: req_str_array(obj, "fields", i)?,
118 into: req_str(obj, "into", i)?,
119 }),
120 "flatten" => Box::new(Flatten {
121 field: req_str(obj, "field", i)?,
122 }),
123 "hash" => Box::new(Hash {
124 field: req_str(obj, "field", i)?,
125 }),
126 "coalesce" => Box::new(Coalesce {
127 fields: req_str_array(obj, "fields", i)?,
128 into: req_str(obj, "into", i)?,
129 }),
130 "schema_filter" => {
131 let allow = parse_regex_array(obj, "allow", i)?;
132 let deny = parse_regex_array(obj, "deny", i)?;
133 if allow.is_empty() && deny.is_empty() {
134 return Err(OversyncError::Config(format!(
135 "transform step {i}: 'schema_filter' requires at least 'allow' or 'deny'"
136 )));
137 }
138 Box::new(SchemaFilter {
139 field: req_str(obj, "field", i)?,
140 allow,
141 deny,
142 })
143 }
144 other => {
145 return Err(OversyncError::Config(format!(
146 "transform step {i}: unknown type '{other}'"
147 )));
148 }
149 };
150
151 steps.push(step);
152 }
153
154 Ok(StepChain::new(steps))
155}
156
157fn req_str(
158 obj: &serde_json::Map<String, serde_json::Value>,
159 key: &str,
160 idx: usize,
161) -> Result<String, OversyncError> {
162 obj.get(key)
163 .and_then(|v| v.as_str())
164 .map(String::from)
165 .ok_or_else(|| {
166 OversyncError::Config(format!("transform step {idx}: missing '{key}' (string)"))
167 })
168}
169
170fn req_val(
171 obj: &serde_json::Map<String, serde_json::Value>,
172 key: &str,
173 idx: usize,
174) -> Result<serde_json::Value, OversyncError> {
175 obj.get(key)
176 .cloned()
177 .ok_or_else(|| OversyncError::Config(format!("transform step {idx}: missing '{key}'")))
178}
179
180fn req_str_array(
181 obj: &serde_json::Map<String, serde_json::Value>,
182 key: &str,
183 idx: usize,
184) -> Result<Vec<String>, OversyncError> {
185 let arr = obj.get(key).and_then(|v| v.as_array()).ok_or_else(|| {
186 OversyncError::Config(format!(
187 "transform step {idx}: missing '{key}' (string array)"
188 ))
189 })?;
190 arr.iter()
191 .enumerate()
192 .map(|(j, v)| {
193 v.as_str().map(String::from).ok_or_else(|| {
194 OversyncError::Config(format!(
195 "transform step {idx}: '{key}[{j}]' must be a string"
196 ))
197 })
198 })
199 .collect()
200}
201
202fn parse_regex_array(
203 obj: &serde_json::Map<String, serde_json::Value>,
204 key: &str,
205 idx: usize,
206) -> Result<Vec<regex::Regex>, OversyncError> {
207 let arr = match obj.get(key).and_then(|v| v.as_array()) {
208 Some(a) => a,
209 None => return Ok(vec![]),
210 };
211 arr.iter()
212 .enumerate()
213 .map(|(j, v)| {
214 let pattern = v.as_str().ok_or_else(|| {
215 OversyncError::Config(format!(
216 "transform step {idx}: '{key}[{j}]' must be a string"
217 ))
218 })?;
219 regex::Regex::new(pattern).map_err(|e| {
220 OversyncError::Config(format!(
221 "transform step {idx}: invalid regex '{pattern}' in '{key}': {e}"
222 ))
223 })
224 })
225 .collect()
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Parses `defs`, failing the test if they are rejected.
    fn chain_of(defs: &[serde_json::Value]) -> StepChain {
        parse_steps(defs).unwrap()
    }

    /// Parses `defs`, failing the test if they are accepted; returns the error text.
    fn error_of(defs: &[serde_json::Value]) -> String {
        parse_steps(defs).unwrap_err().to_string()
    }

    #[test]
    fn parse_empty_array() {
        assert!(chain_of(&[]).is_empty());
    }

    #[test]
    fn parse_rename_step() {
        let chain = chain_of(&[json!({"type": "rename", "from": "old", "to": "new"})]);
        assert_eq!(chain.len(), 1);

        let mut record = json!({"old": "value"});
        chain.apply_one(&mut record).unwrap();
        assert_eq!(record, json!({"new": "value"}));
    }

    #[test]
    fn parse_filter_step() {
        let chain = chain_of(&[json!({
            "type": "filter",
            "field": "status",
            "op": "eq",
            "value": "active"
        })]);

        let mut active = json!({"status": "active"});
        assert!(chain.apply_one(&mut active).unwrap());

        let mut deleted = json!({"status": "deleted"});
        assert!(!chain.apply_one(&mut deleted).unwrap());
    }

    #[test]
    fn parse_multi_step_chain() {
        let chain = chain_of(&[
            json!({"type": "rename", "from": "entity_id", "to": "id"}),
            json!({"type": "upper", "field": "name"}),
            json!({"type": "set", "field": "version", "value": 1}),
            json!({"type": "remove", "field": "secret"}),
        ]);
        assert_eq!(chain.len(), 4);

        let mut record = json!({"entity_id": "123", "name": "alice", "secret": "pw"});
        chain.apply_one(&mut record).unwrap();
        assert_eq!(record, json!({"id": "123", "name": "ALICE", "version": 1}));
    }

    #[test]
    fn parse_map_value_step() {
        let chain = chain_of(&[json!({
            "type": "map_value",
            "field": "op",
            "mapping": {"D": "deleted", "U": "updated", "I": "inserted"}
        })]);

        let mut record = json!({"op": "D"});
        chain.apply_one(&mut record).unwrap();
        assert_eq!(record["op"], "deleted");
    }

    #[test]
    fn parse_truncate_step() {
        let chain = chain_of(&[json!({"type": "truncate", "field": "desc", "max_len": 5})]);

        let mut record = json!({"desc": "hello world"});
        chain.apply_one(&mut record).unwrap();
        assert_eq!(record["desc"], "hello");
    }

    #[test]
    fn parse_nest_step() {
        let chain = chain_of(&[json!({
            "type": "nest",
            "fields": ["city", "zip"],
            "into": "address"
        })]);

        let mut record = json!({"city": "NYC", "zip": "10001", "name": "alice"});
        chain.apply_one(&mut record).unwrap();
        assert_eq!(record["address"]["city"], "NYC");
        assert!(!record.as_object().unwrap().contains_key("city"));
    }

    #[test]
    fn parse_coalesce_step() {
        let chain = chain_of(&[json!({
            "type": "coalesce",
            "fields": ["a", "b"],
            "into": "result"
        })]);

        let mut record = json!({"a": null, "b": 42});
        chain.apply_one(&mut record).unwrap();
        assert_eq!(record["result"], 42);
    }

    #[test]
    fn parse_unknown_type_errors() {
        let message = error_of(&[json!({"type": "bogus"})]);
        assert!(message.contains("unknown type 'bogus'"));
    }

    #[test]
    fn parse_missing_type_errors() {
        let message = error_of(&[json!({"field": "x"})]);
        assert!(message.contains("missing 'type'"));
    }

    #[test]
    fn parse_missing_required_field_errors() {
        let message = error_of(&[json!({"type": "rename", "from": "x"})]);
        assert!(message.contains("missing 'to'"));
    }

    #[test]
    fn parse_unknown_filter_op_errors() {
        let message = error_of(&[json!({
            "type": "filter",
            "field": "x",
            "op": "bogus",
            "value": 1
        })]);
        assert!(message.contains("unknown filter op 'bogus'"));
    }

    #[test]
    fn parse_all_step_types() {
        let every_kind = [
            json!({"type": "rename", "from": "a", "to": "b"}),
            json!({"type": "set", "field": "x", "value": 1}),
            json!({"type": "upper", "field": "x"}),
            json!({"type": "lower", "field": "x"}),
            json!({"type": "remove", "field": "x"}),
            json!({"type": "copy", "from": "a", "to": "b"}),
            json!({"type": "default", "field": "x", "value": 0}),
            json!({"type": "filter", "field": "x", "op": "exists"}),
            json!({"type": "map_value", "field": "x", "mapping": {"a": "b"}}),
            json!({"type": "truncate", "field": "x", "max_len": 10}),
            json!({"type": "nest", "fields": ["a"], "into": "b"}),
            json!({"type": "flatten", "field": "x"}),
            json!({"type": "hash", "field": "x"}),
            json!({"type": "coalesce", "fields": ["a", "b"], "into": "c"}),
            json!({"type": "schema_filter", "field": "x", "allow": ["^public"]}),
        ];
        assert_eq!(chain_of(&every_kind).len(), 15);
    }

    #[test]
    fn parse_schema_filter_allow() {
        let chain = chain_of(&[json!({
            "type": "schema_filter",
            "field": "schema",
            "allow": ["^public$", "^analytics$"]
        })]);

        let mut allowed = json!({"schema": "public"});
        assert!(chain.apply_one(&mut allowed).unwrap());

        let mut rejected = json!({"schema": "internal"});
        assert!(!chain.apply_one(&mut rejected).unwrap());
    }

    #[test]
    fn parse_schema_filter_deny() {
        let chain = chain_of(&[json!({
            "type": "schema_filter",
            "field": "table",
            "deny": ["^pg_catalog", "^information_schema"]
        })]);

        let mut allowed = json!({"table": "public.users"});
        assert!(chain.apply_one(&mut allowed).unwrap());

        let mut rejected = json!({"table": "pg_catalog.pg_class"});
        assert!(!chain.apply_one(&mut rejected).unwrap());
    }

    #[test]
    fn parse_schema_filter_requires_allow_or_deny() {
        let message = error_of(&[json!({
            "type": "schema_filter",
            "field": "x"
        })]);
        assert!(message.contains("requires at least"));
    }

    #[test]
    fn parse_schema_filter_invalid_regex_errors() {
        let message = error_of(&[json!({
            "type": "schema_filter",
            "field": "x",
            "allow": ["[invalid"]
        })]);
        assert!(message.contains("invalid regex"));
    }
}