1use oversync_core::error::OversyncError;
2
3use crate::steps::*;
4use crate::{StepChain, TransformStep};
5
6pub fn parse_steps(defs: &[serde_json::Value]) -> Result<StepChain, OversyncError> {
29 let mut steps: Vec<Box<dyn TransformStep>> = Vec::with_capacity(defs.len());
30
31 for (i, def) in defs.iter().enumerate() {
32 let obj = def
33 .as_object()
34 .ok_or_else(|| OversyncError::Config(format!("transform step {i}: expected object")))?;
35
36 let step_type = obj
37 .get("type")
38 .and_then(|v| v.as_str())
39 .ok_or_else(|| OversyncError::Config(format!("transform step {i}: missing 'type'")))?;
40
41 let step: Box<dyn TransformStep> = match step_type {
42 "rename" => Box::new(Rename {
43 from: req_str(obj, "from", i)?,
44 to: req_str(obj, "to", i)?,
45 }),
46 "set" => Box::new(Set {
47 field: req_str(obj, "field", i)?,
48 value: req_val(obj, "value", i)?,
49 }),
50 "upper" => Box::new(Upper {
51 field: req_str(obj, "field", i)?,
52 }),
53 "lower" => Box::new(Lower {
54 field: req_str(obj, "field", i)?,
55 }),
56 "remove" => Box::new(Remove {
57 field: req_str(obj, "field", i)?,
58 }),
59 "copy" => Box::new(Copy {
60 from: req_str(obj, "from", i)?,
61 to: req_str(obj, "to", i)?,
62 }),
63 "default" => Box::new(Default {
64 field: req_str(obj, "field", i)?,
65 value: req_val(obj, "value", i)?,
66 }),
67 "filter" => {
68 let op_str = req_str(obj, "op", i)?;
69 let op = match op_str.as_str() {
70 "eq" => FilterOp::Eq,
71 "ne" => FilterOp::Ne,
72 "gt" => FilterOp::Gt,
73 "gte" => FilterOp::Gte,
74 "lt" => FilterOp::Lt,
75 "lte" => FilterOp::Lte,
76 "contains" => FilterOp::Contains,
77 "exists" => FilterOp::Exists,
78 other => {
79 return Err(OversyncError::Config(format!(
80 "transform step {i}: unknown filter op '{other}'"
81 )));
82 }
83 };
84 Box::new(Filter {
85 field: req_str(obj, "field", i)?,
86 op,
87 value: obj.get("value").cloned().unwrap_or(serde_json::Value::Null),
88 })
89 }
90 "map_value" => {
91 let mapping = obj
92 .get("mapping")
93 .and_then(|v| v.as_object())
94 .ok_or_else(|| {
95 OversyncError::Config(format!(
96 "transform step {i}: 'map_value' requires 'mapping' object"
97 ))
98 })?
99 .clone();
100 Box::new(MapValue {
101 field: req_str(obj, "field", i)?,
102 mapping,
103 })
104 }
105 "truncate" => {
106 let max_len = obj.get("max_len").and_then(|v| v.as_u64()).ok_or_else(|| {
107 OversyncError::Config(format!(
108 "transform step {i}: 'truncate' requires 'max_len' (integer)"
109 ))
110 })? as usize;
111 Box::new(Truncate {
112 field: req_str(obj, "field", i)?,
113 max_len,
114 })
115 }
116 "nest" => Box::new(Nest {
117 fields: req_str_array(obj, "fields", i)?,
118 into: req_str(obj, "into", i)?,
119 }),
120 "flatten" => Box::new(Flatten {
121 field: req_str(obj, "field", i)?,
122 }),
123 "hash" => Box::new(Hash {
124 field: req_str(obj, "field", i)?,
125 }),
126 "coalesce" => Box::new(Coalesce {
127 fields: req_str_array(obj, "fields", i)?,
128 into: req_str(obj, "into", i)?,
129 }),
130 "schema_filter" => {
131 let allow = parse_regex_array(obj, "allow", i)?;
132 let deny = parse_regex_array(obj, "deny", i)?;
133 if allow.is_empty() && deny.is_empty() {
134 return Err(OversyncError::Config(format!(
135 "transform step {i}: 'schema_filter' requires at least 'allow' or 'deny'"
136 )));
137 }
138 Box::new(SchemaFilter {
139 field: req_str(obj, "field", i)?,
140 allow,
141 deny,
142 })
143 }
144 other => {
145 return Err(OversyncError::Config(format!(
146 "transform step {i}: unknown type '{other}'"
147 )));
148 }
149 };
150
151 steps.push(step);
152 }
153
154 Ok(StepChain::new(steps))
155}
156
157fn req_str(
158 obj: &serde_json::Map<String, serde_json::Value>,
159 key: &str,
160 idx: usize,
161) -> Result<String, OversyncError> {
162 obj.get(key)
163 .and_then(|v| v.as_str())
164 .map(String::from)
165 .ok_or_else(|| {
166 OversyncError::Config(format!("transform step {idx}: missing '{key}' (string)"))
167 })
168}
169
170fn req_val(
171 obj: &serde_json::Map<String, serde_json::Value>,
172 key: &str,
173 idx: usize,
174) -> Result<serde_json::Value, OversyncError> {
175 obj.get(key)
176 .cloned()
177 .ok_or_else(|| OversyncError::Config(format!("transform step {idx}: missing '{key}'")))
178}
179
180fn req_str_array(
181 obj: &serde_json::Map<String, serde_json::Value>,
182 key: &str,
183 idx: usize,
184) -> Result<Vec<String>, OversyncError> {
185 let arr = obj
186 .get(key)
187 .and_then(|v| v.as_array())
188 .ok_or_else(|| {
189 OversyncError::Config(format!(
190 "transform step {idx}: missing '{key}' (string array)"
191 ))
192 })?;
193 arr.iter()
194 .enumerate()
195 .map(|(j, v)| {
196 v.as_str()
197 .map(String::from)
198 .ok_or_else(|| {
199 OversyncError::Config(format!(
200 "transform step {idx}: '{key}[{j}]' must be a string"
201 ))
202 })
203 })
204 .collect()
205}
206
207fn parse_regex_array(
208 obj: &serde_json::Map<String, serde_json::Value>,
209 key: &str,
210 idx: usize,
211) -> Result<Vec<regex::Regex>, OversyncError> {
212 let arr = match obj.get(key).and_then(|v| v.as_array()) {
213 Some(a) => a,
214 None => return Ok(vec![]),
215 };
216 arr.iter()
217 .enumerate()
218 .map(|(j, v)| {
219 let pattern = v.as_str().ok_or_else(|| {
220 OversyncError::Config(format!(
221 "transform step {idx}: '{key}[{j}]' must be a string"
222 ))
223 })?;
224 regex::Regex::new(pattern).map_err(|e| {
225 OversyncError::Config(format!(
226 "transform step {idx}: invalid regex '{pattern}' in '{key}': {e}"
227 ))
228 })
229 })
230 .collect()
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_empty_array() {
        let chain = parse_steps(&[]).unwrap();
        assert!(chain.is_empty());
    }

    #[test]
    fn parse_rename_step() {
        let defs = vec![serde_json::json!({"type": "rename", "from": "old", "to": "new"})];
        let chain = parse_steps(&defs).unwrap();
        assert_eq!(chain.len(), 1);

        let mut data = serde_json::json!({"old": "value"});
        chain.apply_one(&mut data).unwrap();
        assert_eq!(data, serde_json::json!({"new": "value"}));
    }

    #[test]
    fn parse_filter_step() {
        let defs = vec![serde_json::json!({
            "type": "filter",
            "field": "status",
            "op": "eq",
            "value": "active"
        })];
        let chain = parse_steps(&defs).unwrap();

        let mut keep = serde_json::json!({"status": "active"});
        assert!(chain.apply_one(&mut keep).unwrap());

        let mut drop = serde_json::json!({"status": "deleted"});
        assert!(!chain.apply_one(&mut drop).unwrap());
    }

    #[test]
    fn parse_multi_step_chain() {
        let defs = vec![
            serde_json::json!({"type": "rename", "from": "entity_id", "to": "id"}),
            serde_json::json!({"type": "upper", "field": "name"}),
            serde_json::json!({"type": "set", "field": "version", "value": 1}),
            serde_json::json!({"type": "remove", "field": "secret"}),
        ];
        let chain = parse_steps(&defs).unwrap();
        assert_eq!(chain.len(), 4);

        let mut data = serde_json::json!({"entity_id": "123", "name": "alice", "secret": "pw"});
        chain.apply_one(&mut data).unwrap();
        assert_eq!(
            data,
            serde_json::json!({"id": "123", "name": "ALICE", "version": 1})
        );
    }

    #[test]
    fn parse_map_value_step() {
        let defs = vec![serde_json::json!({
            "type": "map_value",
            "field": "op",
            "mapping": {"D": "deleted", "U": "updated", "I": "inserted"}
        })];
        let chain = parse_steps(&defs).unwrap();
        let mut data = serde_json::json!({"op": "D"});
        chain.apply_one(&mut data).unwrap();
        assert_eq!(data["op"], "deleted");
    }

    #[test]
    fn parse_truncate_step() {
        let defs = vec![serde_json::json!({"type": "truncate", "field": "desc", "max_len": 5})];
        let chain = parse_steps(&defs).unwrap();
        let mut data = serde_json::json!({"desc": "hello world"});
        chain.apply_one(&mut data).unwrap();
        assert_eq!(data["desc"], "hello");
    }

    #[test]
    fn parse_nest_step() {
        let defs = vec![serde_json::json!({
            "type": "nest",
            "fields": ["city", "zip"],
            "into": "address"
        })];
        let chain = parse_steps(&defs).unwrap();
        let mut data = serde_json::json!({"city": "NYC", "zip": "10001", "name": "alice"});
        chain.apply_one(&mut data).unwrap();
        assert_eq!(data["address"]["city"], "NYC");
        assert!(!data.as_object().unwrap().contains_key("city"));
    }

    #[test]
    fn parse_coalesce_step() {
        let defs = vec![serde_json::json!({
            "type": "coalesce",
            "fields": ["a", "b"],
            "into": "result"
        })];
        let chain = parse_steps(&defs).unwrap();
        let mut data = serde_json::json!({"a": null, "b": 42});
        chain.apply_one(&mut data).unwrap();
        assert_eq!(data["result"], 42);
    }

    #[test]
    fn parse_unknown_type_errors() {
        let defs = vec![serde_json::json!({"type": "bogus"})];
        let err = parse_steps(&defs).unwrap_err();
        assert!(err.to_string().contains("unknown type 'bogus'"));
    }

    #[test]
    fn parse_missing_type_errors() {
        let defs = vec![serde_json::json!({"field": "x"})];
        let err = parse_steps(&defs).unwrap_err();
        assert!(err.to_string().contains("missing 'type'"));
    }

    #[test]
    fn parse_missing_required_field_errors() {
        let defs = vec![serde_json::json!({"type": "rename", "from": "x"})];
        let err = parse_steps(&defs).unwrap_err();
        assert!(err.to_string().contains("missing 'to'"));
    }

    #[test]
    fn parse_unknown_filter_op_errors() {
        let defs = vec![serde_json::json!({
            "type": "filter",
            "field": "x",
            "op": "bogus",
            "value": 1
        })];
        let err = parse_steps(&defs).unwrap_err();
        assert!(err.to_string().contains("unknown filter op 'bogus'"));
    }

    #[test]
    fn parse_all_step_types() {
        let defs = vec![
            serde_json::json!({"type": "rename", "from": "a", "to": "b"}),
            serde_json::json!({"type": "set", "field": "x", "value": 1}),
            serde_json::json!({"type": "upper", "field": "x"}),
            serde_json::json!({"type": "lower", "field": "x"}),
            serde_json::json!({"type": "remove", "field": "x"}),
            serde_json::json!({"type": "copy", "from": "a", "to": "b"}),
            serde_json::json!({"type": "default", "field": "x", "value": 0}),
            serde_json::json!({"type": "filter", "field": "x", "op": "exists"}),
            serde_json::json!({"type": "map_value", "field": "x", "mapping": {"a": "b"}}),
            serde_json::json!({"type": "truncate", "field": "x", "max_len": 10}),
            serde_json::json!({"type": "nest", "fields": ["a"], "into": "b"}),
            serde_json::json!({"type": "flatten", "field": "x"}),
            serde_json::json!({"type": "hash", "field": "x"}),
            serde_json::json!({"type": "coalesce", "fields": ["a", "b"], "into": "c"}),
            serde_json::json!({"type": "schema_filter", "field": "x", "allow": ["^public"]}),
        ];
        let chain = parse_steps(&defs).unwrap();
        assert_eq!(chain.len(), 15);
    }

    #[test]
    fn parse_schema_filter_allow() {
        let defs = vec![serde_json::json!({
            "type": "schema_filter",
            "field": "schema",
            "allow": ["^public$", "^analytics$"]
        })];
        let chain = parse_steps(&defs).unwrap();

        let mut keep = serde_json::json!({"schema": "public"});
        assert!(chain.apply_one(&mut keep).unwrap());

        let mut drop = serde_json::json!({"schema": "internal"});
        assert!(!chain.apply_one(&mut drop).unwrap());
    }

    #[test]
    fn parse_schema_filter_deny() {
        let defs = vec![serde_json::json!({
            "type": "schema_filter",
            "field": "table",
            "deny": ["^pg_catalog", "^information_schema"]
        })];
        let chain = parse_steps(&defs).unwrap();

        let mut keep = serde_json::json!({"table": "public.users"});
        assert!(chain.apply_one(&mut keep).unwrap());

        let mut drop = serde_json::json!({"table": "pg_catalog.pg_class"});
        assert!(!chain.apply_one(&mut drop).unwrap());
    }

    #[test]
    fn parse_schema_filter_requires_allow_or_deny() {
        let defs = vec![serde_json::json!({
            "type": "schema_filter",
            "field": "x"
        })];
        let err = parse_steps(&defs).unwrap_err();
        assert!(err.to_string().contains("requires at least"));
    }

    #[test]
    fn parse_schema_filter_invalid_regex_errors() {
        let defs = vec![serde_json::json!({
            "type": "schema_filter",
            "field": "x",
            "allow": ["[invalid"]
        })];
        let err = parse_steps(&defs).unwrap_err();
        assert!(err.to_string().contains("invalid regex"));
    }

    // ---- error-path coverage ----

    #[test]
    fn parse_non_object_step_errors() {
        let defs = vec![serde_json::json!("rename")];
        let err = parse_steps(&defs).unwrap_err();
        assert!(err.to_string().contains("transform step 0: expected object"));
    }

    #[test]
    fn parse_error_reports_failing_step_index() {
        let defs = vec![
            serde_json::json!({"type": "rename", "from": "a", "to": "b"}),
            serde_json::json!({"type": "bogus"}),
        ];
        let err = parse_steps(&defs).unwrap_err();
        assert!(err
            .to_string()
            .contains("transform step 1: unknown type 'bogus'"));
    }

    #[test]
    fn parse_truncate_missing_max_len_errors() {
        let defs = vec![serde_json::json!({"type": "truncate", "field": "x"})];
        let err = parse_steps(&defs).unwrap_err();
        assert!(err.to_string().contains("requires 'max_len'"));
    }

    #[test]
    fn parse_truncate_negative_max_len_errors() {
        let defs = vec![serde_json::json!({"type": "truncate", "field": "x", "max_len": -1})];
        let err = parse_steps(&defs).unwrap_err();
        assert!(err.to_string().contains("requires 'max_len'"));
    }

    #[test]
    fn parse_map_value_missing_mapping_errors() {
        let defs = vec![serde_json::json!({"type": "map_value", "field": "x"})];
        let err = parse_steps(&defs).unwrap_err();
        assert!(err.to_string().contains("requires 'mapping' object"));
    }

    #[test]
    fn parse_nest_non_string_field_errors() {
        let defs = vec![serde_json::json!({"type": "nest", "fields": ["a", 1], "into": "b"})];
        let err = parse_steps(&defs).unwrap_err();
        assert!(err.to_string().contains("'fields[1]' must be a string"));
    }

    #[test]
    fn parse_schema_filter_non_string_pattern_errors() {
        let defs = vec![serde_json::json!({"type": "schema_filter", "field": "x", "deny": [1]})];
        let err = parse_steps(&defs).unwrap_err();
        assert!(err.to_string().contains("'deny[0]' must be a string"));
    }

    #[test]
    fn parse_set_accepts_explicit_null_value() {
        let defs = vec![serde_json::json!({"type": "set", "field": "x", "value": null})];
        let chain = parse_steps(&defs).unwrap();
        assert_eq!(chain.len(), 1);
    }
}