Skip to main content

floe_core/io/read/
xml.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::path::Path;
3
4use polars::prelude::{DataFrame, NamedFrom, Series};
5use roxmltree::{Document, Node};
6
7use crate::io::format::{self, FileReadError, InputAdapter, LocalInputFile, ReadInput};
8use crate::io::read::xml_selector::{parse_selector, SelectorToken};
9use crate::{config, FloeResult};
10
/// Zero-sized adapter type; its `InputAdapter` implementation reports the `"xml"` format.
struct XmlInputAdapter;

/// Single shared adapter instance handed out by `xml_input_adapter`.
static XML_INPUT_ADAPTER: XmlInputAdapter = XmlInputAdapter;

/// Returns the shared XML adapter as a `'static` trait object.
pub(crate) fn xml_input_adapter() -> &'static dyn InputAdapter {
    &XML_INPUT_ADAPTER
}
18
/// Error produced while configuring, reading or interpreting an XML input.
#[derive(Debug, Clone)]
pub struct XmlReadError {
    /// Identifier of the rule the failure is reported under (for example `xml_parse_error`).
    pub rule: String,
    /// Human-readable description of what went wrong.
    pub message: String,
}

impl std::fmt::Display for XmlReadError {
    /// Renders the error as `<rule>: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { rule, message } = self;
        write!(f, "{rule}: {message}")
    }
}

impl std::error::Error for XmlReadError {}
32
/// A column's selector string together with the tokens parsed from it.
struct SelectorPlan {
    /// Selector text taken from the column's `source_or_name()`; `read_xml_file` also uses it
    /// as the name of the resulting dataframe column.
    source: String,
    /// Tokens produced by `parse_selector` for `source`.
    tokens: Vec<SelectorToken>,
}
37
38fn build_selector_plan(
39    columns: &[config::ColumnConfig],
40) -> Result<Vec<SelectorPlan>, XmlReadError> {
41    let mut plans = Vec::with_capacity(columns.len());
42    let mut seen = std::collections::HashSet::new();
43    for column in columns {
44        let source = column.source_or_name().to_string();
45        if !seen.insert(source.clone()) {
46            return Err(XmlReadError {
47                rule: "xml_selector_invalid".to_string(),
48                message: format!("duplicate xml selector source: {}", source),
49            });
50        }
51        let tokens = parse_selector(&source).map_err(|err| XmlReadError {
52            rule: "xml_selector_invalid".to_string(),
53            message: format!("invalid selector {}: {}", source, err.message),
54        })?;
55        plans.push(SelectorPlan { source, tokens });
56    }
57    Ok(plans)
58}
59
/// Splits a possibly prefixed tag such as `ns:item` at its first `:`.
///
/// Returns `(Some(prefix), local)` when a colon is present and `(None, tag)` otherwise.
fn split_tag(tag: &str) -> (Option<&str>, &str) {
    match tag.split_once(':') {
        Some((prefix, local)) => (Some(prefix), local),
        None => (None, tag),
    }
}
67
68fn matches_tag(node: Node<'_, '_>, tag: &str, namespace: Option<&str>) -> bool {
69    let (prefix, local) = split_tag(tag);
70    let name = node.tag_name();
71    if name.name() != local {
72        return false;
73    }
74    if prefix.is_some() {
75        if let Some(ns) = namespace {
76            return name.namespace() == Some(ns);
77        }
78        return true;
79    }
80    namespace.is_none_or(|ns| name.namespace() == Some(ns))
81}
82
83fn matches_namespace(node: Node<'_, '_>, namespace: Option<&str>) -> bool {
84    namespace.is_none_or(|ns| node.tag_name().namespace() == Some(ns))
85}
86
87fn collect_text(node: Node<'_, '_>) -> Option<String> {
88    let mut text = String::new();
89    for descendant in node.descendants() {
90        if descendant.is_text() {
91            if let Some(value) = descendant.text() {
92                text.push_str(value);
93            }
94        }
95    }
96    let trimmed = text.trim();
97    if trimmed.is_empty() {
98        None
99    } else {
100        Some(trimmed.to_string())
101    }
102}
103
104fn resolve_value_node<'a>(
105    node: Node<'a, 'a>,
106    value_tag: Option<&str>,
107    namespace: Option<&str>,
108) -> Node<'a, 'a> {
109    let Some(value_tag) = value_tag else {
110        return node;
111    };
112    for descendant in node.descendants().skip(1) {
113        if descendant.is_element() && matches_tag(descendant, value_tag, namespace) {
114            return descendant;
115        }
116    }
117    node
118}
119
120fn find_child<'a>(node: Node<'a, 'a>, tag: &str, namespace: Option<&str>) -> Option<Node<'a, 'a>> {
121    node.children()
122        .find(|child| child.is_element() && matches_tag(*child, tag, namespace))
123}
124
125fn evaluate_selector(
126    row: Node<'_, '_>,
127    tokens: &[SelectorToken],
128    namespace: Option<&str>,
129    value_tag: Option<&str>,
130) -> Option<String> {
131    let mut current = row;
132    for token in tokens {
133        match token {
134            SelectorToken::Element(tag) => {
135                let next = find_child(current, tag.as_str(), namespace)?;
136                current = next;
137            }
138            SelectorToken::Attribute(attr) => {
139                return current
140                    .attribute(attr.as_str())
141                    .map(|value| value.to_string());
142            }
143        }
144    }
145    let value_node = resolve_value_node(current, value_tag, namespace);
146    collect_text(value_node)
147}
148
149fn read_xml_columns(
150    input_path: &Path,
151    row_tag: &str,
152    namespace: Option<&str>,
153) -> Result<Vec<String>, XmlReadError> {
154    let content = std::fs::read_to_string(input_path).map_err(|err| XmlReadError {
155        rule: "xml_parse_error".to_string(),
156        message: format!("failed to read xml at {}: {err}", input_path.display()),
157    })?;
158    let doc = Document::parse(&content).map_err(|err| XmlReadError {
159        rule: "xml_parse_error".to_string(),
160        message: format!("xml parse error: {err}"),
161    })?;
162    let row = doc
163        .descendants()
164        .find(|node| node.is_element() && matches_tag(*node, row_tag, namespace))
165        .ok_or_else(|| XmlReadError {
166            rule: "xml_parse_error".to_string(),
167            message: format!("row_tag={} not found in xml", row_tag),
168        })?;
169
170    let mut names = BTreeSet::new();
171    for child in row.children().filter(|node| node.is_element()) {
172        if !matches_namespace(child, namespace) {
173            continue;
174        }
175        names.insert(child.tag_name().name().to_string());
176    }
177    Ok(names.into_iter().collect())
178}
179
180fn read_xml_file(
181    input_path: &Path,
182    columns: &[config::ColumnConfig],
183    row_tag: &str,
184    namespace: Option<&str>,
185    value_tag: Option<&str>,
186) -> Result<DataFrame, XmlReadError> {
187    let content = std::fs::read_to_string(input_path).map_err(|err| XmlReadError {
188        rule: "xml_parse_error".to_string(),
189        message: format!("failed to read xml at {}: {err}", input_path.display()),
190    })?;
191    let doc = Document::parse(&content).map_err(|err| XmlReadError {
192        rule: "xml_parse_error".to_string(),
193        message: format!("xml parse error: {err}"),
194    })?;
195    let plans = build_selector_plan(columns)?;
196    let mut rows: Vec<BTreeMap<String, Option<String>>> = Vec::new();
197
198    for row in doc
199        .descendants()
200        .filter(|node| node.is_element() && matches_tag(*node, row_tag, namespace))
201    {
202        let mut record = BTreeMap::new();
203        for plan in &plans {
204            let value = evaluate_selector(row, &plan.tokens, namespace, value_tag);
205            record.insert(plan.source.clone(), value);
206        }
207        rows.push(record);
208    }
209
210    if rows.is_empty() {
211        return Err(XmlReadError {
212            rule: "xml_parse_error".to_string(),
213            message: format!("row_tag={} produced no rows", row_tag),
214        });
215    }
216
217    let columns = plans
218        .iter()
219        .map(|plan| plan.source.clone())
220        .collect::<Vec<_>>();
221    build_dataframe(&columns, &rows)
222}
223
224fn build_dataframe(
225    columns: &[String],
226    rows: &[BTreeMap<String, Option<String>>],
227) -> Result<DataFrame, XmlReadError> {
228    let mut series = Vec::with_capacity(columns.len());
229    for name in columns {
230        let mut values = Vec::with_capacity(rows.len());
231        for row in rows {
232            values.push(row.get(name).cloned().unwrap_or(None));
233        }
234        series.push(Series::new(name.as_str().into(), values).into());
235    }
236
237    DataFrame::new(series).map_err(|err| XmlReadError {
238        rule: "xml_parse_error".to_string(),
239        message: format!("failed to build dataframe: {err}"),
240    })
241}
242
243fn xml_options(
244    entity: &config::EntityConfig,
245) -> Result<(String, Option<String>, Option<String>), XmlReadError> {
246    let options = entity.source.options.as_ref().ok_or_else(|| XmlReadError {
247        rule: "xml_parse_error".to_string(),
248        message: "source.options is required for xml input".to_string(),
249    })?;
250    let row_tag = options
251        .row_tag
252        .as_deref()
253        .ok_or_else(|| XmlReadError {
254            rule: "xml_parse_error".to_string(),
255            message: "source.options.row_tag is required for xml input".to_string(),
256        })?
257        .trim()
258        .to_string();
259    if row_tag.is_empty() {
260        return Err(XmlReadError {
261            rule: "xml_parse_error".to_string(),
262            message: "source.options.row_tag is required for xml input".to_string(),
263        });
264    }
265    let namespace = options
266        .namespace
267        .as_deref()
268        .map(|value| value.trim())
269        .filter(|value| !value.is_empty())
270        .map(|value| value.to_string());
271    let value_tag = options
272        .value_tag
273        .as_deref()
274        .map(|value| value.trim())
275        .filter(|value| !value.is_empty())
276        .map(|value| value.to_string());
277    Ok((row_tag, namespace, value_tag))
278}
279
280impl InputAdapter for XmlInputAdapter {
281    fn format(&self) -> &'static str {
282        "xml"
283    }
284
285    fn read_input_columns(
286        &self,
287        entity: &config::EntityConfig,
288        input_file: &LocalInputFile,
289        _columns: &[config::ColumnConfig],
290    ) -> Result<Vec<String>, FileReadError> {
291        let (row_tag, namespace, _value_tag) =
292            xml_options(entity).map_err(|err| FileReadError {
293                rule: err.rule,
294                message: err.message,
295            })?;
296        read_xml_columns(&input_file.local_path, &row_tag, namespace.as_deref()).map_err(|err| {
297            FileReadError {
298                rule: err.rule,
299                message: err.message,
300            }
301        })
302    }
303
304    fn read_inputs(
305        &self,
306        entity: &config::EntityConfig,
307        files: &[LocalInputFile],
308        columns: &[config::ColumnConfig],
309        normalize_strategy: Option<&str>,
310        collect_raw: bool,
311    ) -> FloeResult<Vec<ReadInput>> {
312        let mut inputs = Vec::with_capacity(files.len());
313        let (row_tag, namespace, value_tag) = match xml_options(entity) {
314            Ok(options) => options,
315            Err(err) => {
316                for input_file in files {
317                    inputs.push(ReadInput::FileError {
318                        input_file: input_file.file.clone(),
319                        error: FileReadError {
320                            rule: err.rule.clone(),
321                            message: err.message.clone(),
322                        },
323                    });
324                }
325                return Ok(inputs);
326            }
327        };
328
329        for input_file in files {
330            let path = &input_file.local_path;
331            let read_result = read_xml_file(
332                path,
333                columns,
334                &row_tag,
335                namespace.as_deref(),
336                value_tag.as_deref(),
337            );
338            match read_result {
339                Ok(df) => {
340                    let input = format::read_input_from_df(
341                        input_file,
342                        &df,
343                        columns,
344                        normalize_strategy,
345                        collect_raw,
346                    )?;
347                    inputs.push(input);
348                }
349                Err(err) => {
350                    inputs.push(ReadInput::FileError {
351                        input_file: input_file.file.clone(),
352                        error: FileReadError {
353                            rule: err.rule,
354                            message: err.message,
355                        },
356                    });
357                }
358            }
359        }
360        Ok(inputs)
361    }
362}