1use faucet_core::FaucetError;
7use quick_xml::events::Event;
8use quick_xml::reader::Reader;
9use serde_json::{Map, Value, json};
10
11pub fn xml_to_json(xml: &str) -> Result<Value, FaucetError> {
16 let mut reader = Reader::from_str(xml);
17 let mut stack: Vec<(String, Map<String, Value>)> = vec![("$root".into(), Map::new())];
18
19 loop {
20 match reader.read_event() {
21 Ok(Event::Start(e)) => {
22 let name = String::from_utf8_lossy(e.name().as_ref()).into_owned();
23 let mut obj = Map::new();
24
25 for attr in e.attributes().flatten() {
27 let key = format!("@{}", String::from_utf8_lossy(attr.key.as_ref()));
28 let val = String::from_utf8_lossy(&attr.value).into_owned();
29 obj.insert(key, Value::String(val));
30 }
31
32 stack.push((name, obj));
33 }
34 Ok(Event::End(_)) => {
35 let (name, obj) = stack.pop().ok_or_else(|| {
36 FaucetError::Transform("malformed XML: unexpected end tag".into())
37 })?;
38
39 let value = if obj.len() == 1 && obj.contains_key("#text") {
40 obj.into_iter().next().unwrap().1
42 } else {
43 Value::Object(obj)
44 };
45
46 let parent = stack.last_mut().ok_or_else(|| {
47 FaucetError::Transform("malformed XML: no parent element".into())
48 })?;
49
50 match parent.1.get_mut(&name) {
52 Some(Value::Array(arr)) => arr.push(value),
53 Some(existing) => {
54 let prev = existing.clone();
55 *existing = Value::Array(vec![prev, value]);
56 }
57 None => {
58 parent.1.insert(name, value);
59 }
60 }
61 }
62 Ok(Event::Text(e)) => {
63 let text = e
64 .unescape()
65 .map_err(|err| FaucetError::Transform(format!("XML decode error: {err}")))?
66 .trim()
67 .to_string();
68
69 if !text.is_empty()
70 && let Some(current) = stack.last_mut()
71 {
72 match current.1.get_mut("#text") {
73 Some(Value::String(s)) => {
74 s.push(' ');
75 s.push_str(&text);
76 }
77 _ => {
78 current.1.insert("#text".into(), Value::String(text));
79 }
80 }
81 }
82 }
83 Ok(Event::CData(e)) => {
84 let text = e
90 .decode()
91 .map_err(|err| {
92 FaucetError::Transform(format!("XML CDATA decode error: {err}"))
93 })?
94 .trim()
95 .to_string();
96
97 if !text.is_empty()
98 && let Some(current) = stack.last_mut()
99 {
100 match current.1.get_mut("#text") {
101 Some(Value::String(s)) => {
102 s.push(' ');
103 s.push_str(&text);
104 }
105 _ => {
106 current.1.insert("#text".into(), Value::String(text));
107 }
108 }
109 }
110 }
111 Ok(Event::Empty(e)) => {
112 let name = String::from_utf8_lossy(e.name().as_ref()).into_owned();
113 let mut obj = Map::new();
114 for attr in e.attributes().flatten() {
115 let key = format!("@{}", String::from_utf8_lossy(attr.key.as_ref()));
116 let val = String::from_utf8_lossy(&attr.value).into_owned();
117 obj.insert(key, Value::String(val));
118 }
119 let value = if obj.is_empty() {
120 json!(null)
121 } else {
122 Value::Object(obj)
123 };
124
125 if let Some(parent) = stack.last_mut() {
126 match parent.1.get_mut(&name) {
127 Some(Value::Array(arr)) => arr.push(value),
128 Some(existing) => {
129 let prev = existing.clone();
130 *existing = Value::Array(vec![prev, value]);
131 }
132 None => {
133 parent.1.insert(name, value);
134 }
135 }
136 }
137 }
138 Ok(Event::Eof) => break,
139 Ok(_) => {} Err(e) => {
141 return Err(FaucetError::Transform(format!("XML parse error: {e}")));
142 }
143 }
144 }
145
146 let (_, root) = stack
147 .pop()
148 .ok_or_else(|| FaucetError::Transform("empty XML document".into()))?;
149
150 Ok(Value::Object(root))
151}
152
153pub fn stream_extract<F: FnMut(Value)>(
172 xml: &str,
173 records_element_path: Option<&str>,
174 mut on_record: F,
175) -> Result<(), FaucetError> {
176 let target_segments: Option<Vec<&str>> = records_element_path.map(|p| p.split('.').collect());
177
178 let mut reader = Reader::from_str(xml);
179
180 let mut path: Vec<String> = Vec::new();
182
183 let mut start_depth: Option<usize> = None;
188 let mut subtree: Vec<(String, Map<String, Value>)> = Vec::new();
189
190 let mut full_doc: Option<Vec<(String, Map<String, Value>)>> = if target_segments.is_none() {
194 Some(vec![("$root".into(), Map::new())])
195 } else {
196 None
197 };
198
199 fn path_matches(path: &[String], target: &[&str]) -> bool {
203 path.len() == target.len() && path.iter().zip(target).all(|(a, b)| a.as_str() == *b)
204 }
205
206 fn append_child(parent: &mut Map<String, Value>, name: String, value: Value) {
209 match parent.get_mut(&name) {
210 Some(Value::Array(arr)) => arr.push(value),
211 Some(existing) => {
212 let prev = existing.clone();
213 *existing = Value::Array(vec![prev, value]);
214 }
215 None => {
216 parent.insert(name, value);
217 }
218 }
219 }
220
221 loop {
222 match reader.read_event() {
223 Ok(Event::Start(e)) => {
224 let name = String::from_utf8_lossy(e.name().as_ref()).into_owned();
225 let mut obj = Map::new();
226 for attr in e.attributes().flatten() {
227 let key = format!("@{}", String::from_utf8_lossy(attr.key.as_ref()));
228 let val = String::from_utf8_lossy(&attr.value).into_owned();
229 obj.insert(key, Value::String(val));
230 }
231
232 path.push(name.clone());
233
234 if let Some(doc) = full_doc.as_mut() {
235 doc.push((name, obj));
236 } else if let Some(target) = target_segments.as_deref() {
237 if start_depth.is_some() {
238 subtree.push((name, obj));
239 } else if path_matches(&path, target) {
240 start_depth = Some(path.len() - 1);
243 subtree.push((name, obj));
244 }
245 }
248 }
249 Ok(Event::Empty(e)) => {
250 let name = String::from_utf8_lossy(e.name().as_ref()).into_owned();
251 let mut obj = Map::new();
252 for attr in e.attributes().flatten() {
253 let key = format!("@{}", String::from_utf8_lossy(attr.key.as_ref()));
254 let val = String::from_utf8_lossy(&attr.value).into_owned();
255 obj.insert(key, Value::String(val));
256 }
257 let value = if obj.is_empty() {
258 json!(null)
259 } else {
260 Value::Object(obj)
261 };
262
263 path.push(name.clone());
266 let matches_target = target_segments
267 .as_deref()
268 .map(|t| path_matches(&path, t))
269 .unwrap_or(false);
270 path.pop();
271
272 if let Some(doc) = full_doc.as_mut() {
273 if let Some(parent) = doc.last_mut() {
274 append_child(&mut parent.1, name, value);
275 }
276 } else if matches_target && start_depth.is_none() {
277 on_record(value);
279 } else if start_depth.is_some()
280 && let Some(parent) = subtree.last_mut()
281 {
282 append_child(&mut parent.1, name, value);
283 }
284 }
285 Ok(Event::End(_)) => {
286 let name = path.pop().ok_or_else(|| {
287 FaucetError::Transform("malformed XML: unexpected end tag".into())
288 })?;
289
290 if let Some(doc) = full_doc.as_mut() {
291 let (popped_name, obj) = doc.pop().ok_or_else(|| {
292 FaucetError::Transform("malformed XML: no element on stack".into())
293 })?;
294 debug_assert_eq!(popped_name, name);
295 let value = if obj.len() == 1 && obj.contains_key("#text") {
296 obj.into_iter().next().unwrap().1
297 } else {
298 Value::Object(obj)
299 };
300 let parent = doc.last_mut().ok_or_else(|| {
301 FaucetError::Transform("malformed XML: no parent element".into())
302 })?;
303 append_child(&mut parent.1, popped_name, value);
304 } else if let Some(depth) = start_depth {
305 let (popped_name, obj) = subtree.pop().ok_or_else(|| {
306 FaucetError::Transform("malformed XML: no element on subtree stack".into())
307 })?;
308 debug_assert_eq!(popped_name, name);
309 let value = if obj.len() == 1 && obj.contains_key("#text") {
310 obj.into_iter().next().unwrap().1
311 } else {
312 Value::Object(obj)
313 };
314
315 if subtree.is_empty() {
316 debug_assert_eq!(path.len(), depth);
319 start_depth = None;
320 on_record(value);
321 } else if let Some(parent) = subtree.last_mut() {
322 append_child(&mut parent.1, popped_name, value);
323 }
324 }
325 }
327 Ok(Event::Text(e)) => {
328 let text = e
329 .unescape()
330 .map_err(|err| FaucetError::Transform(format!("XML decode error: {err}")))?
331 .trim()
332 .to_string();
333 if text.is_empty() {
334 continue;
335 }
336
337 if let Some(doc) = full_doc.as_mut() {
338 if let Some(current) = doc.last_mut() {
339 match current.1.get_mut("#text") {
340 Some(Value::String(s)) => {
341 s.push(' ');
342 s.push_str(&text);
343 }
344 _ => {
345 current.1.insert("#text".into(), Value::String(text));
346 }
347 }
348 }
349 } else if start_depth.is_some()
350 && let Some(current) = subtree.last_mut()
351 {
352 match current.1.get_mut("#text") {
353 Some(Value::String(s)) => {
354 s.push(' ');
355 s.push_str(&text);
356 }
357 _ => {
358 current.1.insert("#text".into(), Value::String(text));
359 }
360 }
361 }
362 }
363 Ok(Event::CData(e)) => {
364 let text = e
368 .decode()
369 .map_err(|err| {
370 FaucetError::Transform(format!("XML CDATA decode error: {err}"))
371 })?
372 .trim()
373 .to_string();
374 if text.is_empty() {
375 continue;
376 }
377 if let Some(doc) = full_doc.as_mut() {
378 if let Some(current) = doc.last_mut() {
379 match current.1.get_mut("#text") {
380 Some(Value::String(s)) => {
381 s.push(' ');
382 s.push_str(&text);
383 }
384 _ => {
385 current.1.insert("#text".into(), Value::String(text));
386 }
387 }
388 }
389 } else if start_depth.is_some()
390 && let Some(current) = subtree.last_mut()
391 {
392 match current.1.get_mut("#text") {
393 Some(Value::String(s)) => {
394 s.push(' ');
395 s.push_str(&text);
396 }
397 _ => {
398 current.1.insert("#text".into(), Value::String(text));
399 }
400 }
401 }
402 }
403 Ok(Event::Eof) => break,
404 Ok(_) => {} Err(e) => {
406 return Err(FaucetError::Transform(format!("XML parse error: {e}")));
407 }
408 }
409 }
410
411 if let Some(mut doc) = full_doc {
412 let (_, root) = doc
413 .pop()
414 .ok_or_else(|| FaucetError::Transform("empty XML document".into()))?;
415 on_record(Value::Object(root));
416 }
417
418 Ok(())
419}
420
421pub fn extract_at_path(value: &Value, path: &str) -> Vec<Value> {
425 let segments: Vec<&str> = path.split('.').collect();
426 let mut current = value.clone();
427
428 for seg in &segments {
429 current = match current {
430 Value::Object(ref map) => match map.get(*seg) {
431 Some(v) => v.clone(),
432 None => return vec![],
433 },
434 _ => return vec![],
435 };
436 }
437
438 match current {
439 Value::Array(arr) => arr,
440 other => vec![other],
441 }
442}
443
#[cfg(test)]
mod tests {
    use super::*;

    // SOAP response shared by the eager and streaming namespace tests.
    const SOAP_USERS_XML: &str = r#"
        <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
            <soap:Body>
                <GetUsersResponse>
                    <User><Name>Alice</Name></User>
                    <User><Name>Bob</Name></User>
                </GetUsersResponse>
            </soap:Body>
        </soap:Envelope>"#;

    // Path of the `User` records inside `SOAP_USERS_XML`.
    const SOAP_USERS_PATH: &str = "soap:Envelope.soap:Body.GetUsersResponse.User";

    // Runs `stream_extract` and gathers every emitted record, in order.
    fn collect_stream_extract(xml: &str, path: Option<&str>) -> Vec<Value> {
        let mut records = Vec::new();
        stream_extract(xml, path, |record| records.push(record)).unwrap();
        records
    }

    // ---- xml_to_json -------------------------------------------------------

    #[test]
    fn simple_xml_to_json() {
        let doc = xml_to_json(r#"<root><name>Alice</name><age>30</age></root>"#).unwrap();
        let root = &doc["root"];
        assert_eq!(root["name"], "Alice");
        assert_eq!(root["age"], "30");
    }

    #[test]
    fn repeated_elements_become_array() {
        let doc =
            xml_to_json(r#"<root><item>a</item><item>b</item><item>c</item></root>"#).unwrap();
        let items = doc["root"]["item"].as_array().unwrap();
        assert_eq!(items.len(), 3);
        assert_eq!(items[0], "a");
        assert_eq!(items[1], "b");
    }

    #[test]
    fn attributes_prefixed() {
        let doc = xml_to_json(r#"<user id="42"><name>Bob</name></user>"#).unwrap();
        let user = &doc["user"];
        assert_eq!(user["@id"], "42");
        assert_eq!(user["name"], "Bob");
    }

    #[test]
    fn nested_elements() {
        let doc =
            xml_to_json(r#"<root><user><address><city>NYC</city></address></user></root>"#)
                .unwrap();
        assert_eq!(doc["root"]["user"]["address"]["city"], "NYC");
    }

    #[test]
    fn cdata_content_is_captured_not_dropped() {
        let doc =
            xml_to_json(r#"<root><body><![CDATA[<b>hi</b> & bye]]></body></root>"#).unwrap();
        assert_eq!(doc["root"]["body"], "<b>hi</b> & bye");
    }

    #[test]
    fn cdata_content_captured_in_streaming_path() {
        let records = collect_stream_extract(
            r#"<feed><item><html><![CDATA[<p>x</p>]]></html></item></feed>"#,
            Some("feed.item"),
        );
        assert_eq!(records.len(), 1);
        assert_eq!(records[0]["html"], "<p>x</p>");
    }

    #[test]
    fn empty_elements() {
        let doc = xml_to_json(r#"<root><flag/></root>"#).unwrap();
        assert!(doc["root"]["flag"].is_null());
    }

    #[test]
    fn empty_element_with_attr() {
        let doc = xml_to_json(r#"<root><flag enabled="true"/></root>"#).unwrap();
        assert_eq!(doc["root"]["flag"]["@enabled"], "true");
    }

    // ---- extract_at_path ---------------------------------------------------

    #[test]
    fn extract_at_path_nested() {
        let doc = json!({"root": {"users": {"user": [{"id": 1}, {"id": 2}]}}});
        let found = extract_at_path(&doc, "root.users.user");
        assert_eq!(found.len(), 2);
        assert_eq!(found[0]["id"], 1);
    }

    #[test]
    fn extract_at_path_single_element() {
        let doc = json!({"root": {"user": {"id": 1}}});
        let found = extract_at_path(&doc, "root.user");
        assert_eq!(found.len(), 1);
        assert_eq!(found[0]["id"], 1);
    }

    #[test]
    fn extract_at_path_missing() {
        let doc = json!({"root": {}});
        assert!(extract_at_path(&doc, "root.users.user").is_empty());
    }

    #[test]
    fn soap_envelope() {
        let doc = xml_to_json(SOAP_USERS_XML).unwrap();
        let users = extract_at_path(&doc, SOAP_USERS_PATH);
        assert_eq!(users.len(), 2);
    }

    // ---- stream_extract ----------------------------------------------------

    #[test]
    fn stream_extract_matches_eager_path_extraction() {
        let xml = r#"<root>
            <user id="1"><name>Alice</name><age>30</age></user>
            <user id="2"><name>Bob</name><age>25</age></user>
            <user id="3"><name>Carol</name><age>40</age></user>
        </root>"#;
        let streamed = collect_stream_extract(xml, Some("root.user"));
        let eager = extract_at_path(&xml_to_json(xml).unwrap(), "root.user");
        assert_eq!(streamed, eager);
        assert_eq!(streamed.len(), 3);
        assert_eq!(streamed[0]["@id"], "1");
        assert_eq!(streamed[0]["name"], "Alice");
        assert_eq!(streamed[2]["name"], "Carol");
    }

    #[test]
    fn stream_extract_handles_nested_children_and_attrs() {
        let xml = r#"<root>
            <order id="A"><line><sku>X</sku><qty>2</qty></line><line><sku>Y</sku><qty>5</qty></line></order>
            <order id="B"><line><sku>Z</sku><qty>1</qty></line></order>
        </root>"#;
        let streamed = collect_stream_extract(xml, Some("root.order"));
        let eager = extract_at_path(&xml_to_json(xml).unwrap(), "root.order");
        assert_eq!(streamed, eager);
        assert_eq!(streamed.len(), 2);
        let lines = streamed[0]["line"].as_array().expect("repeated children");
        assert_eq!(lines.len(), 2);
        assert_eq!(lines[1]["sku"], "Y");
    }

    #[test]
    fn stream_extract_no_path_returns_full_doc_once() {
        let xml = r#"<root><a>1</a><b>2</b></root>"#;
        let streamed = collect_stream_extract(xml, None);
        let eager = xml_to_json(xml).unwrap();
        assert_eq!(streamed.len(), 1);
        assert_eq!(streamed[0], eager);
    }

    #[test]
    fn stream_extract_no_matches_emits_nothing() {
        let streamed = collect_stream_extract(r#"<root><a>1</a></root>"#, Some("root.missing"));
        assert!(streamed.is_empty());
    }

    #[test]
    fn stream_extract_self_closing_matched_element() {
        let streamed = collect_stream_extract(
            r#"<root><item id="1"/><item id="2"/><item id="3"/></root>"#,
            Some("root.item"),
        );
        assert_eq!(streamed.len(), 3);
        assert_eq!(streamed[0]["@id"], "1");
        assert_eq!(streamed[2]["@id"], "3");
    }

    #[test]
    fn stream_extract_preserves_soap_namespaces() {
        let streamed = collect_stream_extract(SOAP_USERS_XML, Some(SOAP_USERS_PATH));
        let eager = extract_at_path(&xml_to_json(SOAP_USERS_XML).unwrap(), SOAP_USERS_PATH);
        assert_eq!(streamed, eager);
        assert_eq!(streamed.len(), 2);
        assert_eq!(streamed[1]["Name"], "Bob");
    }
}