Skip to main content

floe_core/io/read/
xml.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::path::Path;
3
4use polars::prelude::{DataFrame, NamedFrom, Series};
5use roxmltree::{Document, Node};
6
7use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
8use crate::io::read::xml_selector::{parse_selector, SelectorToken};
9use crate::{config, FloeResult};
10
11struct XmlInputAdapter;
12
13static XML_INPUT_ADAPTER: XmlInputAdapter = XmlInputAdapter;
14
15pub(crate) fn xml_input_adapter() -> &'static dyn InputAdapter {
16    &XML_INPUT_ADAPTER
17}
18
/// Error produced while reading or interpreting an XML input.
///
/// `rule` is a short identifier for the failed check (this file emits
/// `xml_selector_invalid` and `xml_parse_error`); `message` carries the
/// human-readable detail.
#[derive(Debug, Clone)]
pub struct XmlReadError {
    pub rule: String,
    pub message: String,
}

impl std::fmt::Display for XmlReadError {
    /// Renders the error as `<rule>: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { rule, message } = self;
        f.write_str(rule)?;
        f.write_str(": ")?;
        f.write_str(message)
    }
}

impl std::error::Error for XmlReadError {}
32
33struct SelectorPlan {
34    source: String,
35    tokens: Vec<SelectorToken>,
36}
37
38fn build_selector_plan(
39    columns: &[config::ColumnConfig],
40) -> Result<Vec<SelectorPlan>, XmlReadError> {
41    let mut plans = Vec::with_capacity(columns.len());
42    let mut seen = std::collections::HashSet::new();
43    for column in columns {
44        let source = column.source_or_name().to_string();
45        if !seen.insert(source.clone()) {
46            return Err(XmlReadError {
47                rule: "xml_selector_invalid".to_string(),
48                message: format!("duplicate xml selector source: {}", source),
49            });
50        }
51        let tokens = parse_selector(&source).map_err(|err| XmlReadError {
52            rule: "xml_selector_invalid".to_string(),
53            message: format!("invalid selector {}: {}", source, err.message),
54        })?;
55        plans.push(SelectorPlan { source, tokens });
56    }
57    Ok(plans)
58}
59
/// Splits `tag` at its first `:` into `(Some(prefix), local)`.
///
/// A tag without a colon is returned unchanged as `(None, tag)`.
fn split_tag(tag: &str) -> (Option<&str>, &str) {
    tag.split_once(':')
        .map_or((None, tag), |(prefix, local)| (Some(prefix), local))
}
67
68fn matches_tag(node: Node<'_, '_>, tag: &str, namespace: Option<&str>) -> bool {
69    let (prefix, local) = split_tag(tag);
70    let name = node.tag_name();
71    if name.name() != local {
72        return false;
73    }
74    if prefix.is_some() {
75        if let Some(ns) = namespace {
76            return name.namespace() == Some(ns);
77        }
78        return true;
79    }
80    namespace.is_none_or(|ns| name.namespace() == Some(ns))
81}
82
83fn matches_namespace(node: Node<'_, '_>, namespace: Option<&str>) -> bool {
84    namespace.is_none_or(|ns| node.tag_name().namespace() == Some(ns))
85}
86
87fn collect_text(node: Node<'_, '_>) -> Option<String> {
88    let mut text = String::new();
89    for descendant in node.descendants() {
90        if descendant.is_text() {
91            if let Some(value) = descendant.text() {
92                text.push_str(value);
93            }
94        }
95    }
96    let trimmed = text.trim();
97    if trimmed.is_empty() {
98        None
99    } else {
100        Some(trimmed.to_string())
101    }
102}
103
104fn resolve_value_node<'a>(
105    node: Node<'a, 'a>,
106    value_tag: Option<&str>,
107    namespace: Option<&str>,
108) -> Node<'a, 'a> {
109    let Some(value_tag) = value_tag else {
110        return node;
111    };
112    for descendant in node.descendants().skip(1) {
113        if descendant.is_element() && matches_tag(descendant, value_tag, namespace) {
114            return descendant;
115        }
116    }
117    node
118}
119
120fn find_child<'a>(node: Node<'a, 'a>, tag: &str, namespace: Option<&str>) -> Option<Node<'a, 'a>> {
121    node.children()
122        .find(|child| child.is_element() && matches_tag(*child, tag, namespace))
123}
124
125fn evaluate_selector(
126    row: Node<'_, '_>,
127    tokens: &[SelectorToken],
128    namespace: Option<&str>,
129    value_tag: Option<&str>,
130) -> Option<String> {
131    let mut current = row;
132    for token in tokens {
133        match token {
134            SelectorToken::Element(tag) => {
135                let next = find_child(current, tag.as_str(), namespace)?;
136                current = next;
137            }
138            SelectorToken::Attribute(attr) => {
139                return current
140                    .attribute(attr.as_str())
141                    .map(|value| value.to_string());
142            }
143        }
144    }
145    let value_node = resolve_value_node(current, value_tag, namespace);
146    collect_text(value_node)
147}
148
149fn read_xml_columns(
150    input_path: &Path,
151    row_tag: &str,
152    namespace: Option<&str>,
153) -> Result<Vec<String>, XmlReadError> {
154    let content = std::fs::read_to_string(input_path).map_err(|err| XmlReadError {
155        rule: "xml_parse_error".to_string(),
156        message: format!("failed to read xml at {}: {err}", input_path.display()),
157    })?;
158    let doc = Document::parse(&content).map_err(|err| XmlReadError {
159        rule: "xml_parse_error".to_string(),
160        message: format!("xml parse error: {err}"),
161    })?;
162    let row = doc
163        .descendants()
164        .find(|node| node.is_element() && matches_tag(*node, row_tag, namespace))
165        .ok_or_else(|| XmlReadError {
166            rule: "xml_parse_error".to_string(),
167            message: format!("row_tag={} not found in xml", row_tag),
168        })?;
169
170    let mut names = BTreeSet::new();
171    for child in row.children().filter(|node| node.is_element()) {
172        if !matches_namespace(child, namespace) {
173            continue;
174        }
175        names.insert(child.tag_name().name().to_string());
176    }
177    Ok(names.into_iter().collect())
178}
179
180fn read_xml_file(
181    input_path: &Path,
182    columns: &[config::ColumnConfig],
183    row_tag: &str,
184    namespace: Option<&str>,
185    value_tag: Option<&str>,
186) -> Result<DataFrame, XmlReadError> {
187    let content = std::fs::read_to_string(input_path).map_err(|err| XmlReadError {
188        rule: "xml_parse_error".to_string(),
189        message: format!("failed to read xml at {}: {err}", input_path.display()),
190    })?;
191    let doc = Document::parse(&content).map_err(|err| XmlReadError {
192        rule: "xml_parse_error".to_string(),
193        message: format!("xml parse error: {err}"),
194    })?;
195    let plans = build_selector_plan(columns)?;
196    let mut rows: Vec<BTreeMap<String, Option<String>>> = Vec::new();
197
198    for row in doc
199        .descendants()
200        .filter(|node| node.is_element() && matches_tag(*node, row_tag, namespace))
201    {
202        let mut record = BTreeMap::new();
203        for plan in &plans {
204            let value = evaluate_selector(row, &plan.tokens, namespace, value_tag);
205            record.insert(plan.source.clone(), value);
206        }
207        rows.push(record);
208    }
209
210    if rows.is_empty() {
211        return Err(XmlReadError {
212            rule: "xml_parse_error".to_string(),
213            message: format!("row_tag={} produced no rows", row_tag),
214        });
215    }
216
217    let columns = plans
218        .iter()
219        .map(|plan| plan.source.clone())
220        .collect::<Vec<_>>();
221    build_dataframe(&columns, &rows)
222}
223
224fn build_dataframe(
225    columns: &[String],
226    rows: &[BTreeMap<String, Option<String>>],
227) -> Result<DataFrame, XmlReadError> {
228    let mut series = Vec::with_capacity(columns.len());
229    for name in columns {
230        let mut values = Vec::with_capacity(rows.len());
231        for row in rows {
232            values.push(row.get(name).cloned().unwrap_or(None));
233        }
234        series.push(Series::new(name.as_str().into(), values).into());
235    }
236
237    DataFrame::new(series).map_err(|err| XmlReadError {
238        rule: "xml_parse_error".to_string(),
239        message: format!("failed to build dataframe: {err}"),
240    })
241}
242
243fn xml_options(
244    entity: &config::EntityConfig,
245) -> Result<(String, Option<String>, Option<String>), XmlReadError> {
246    let options = entity.source.options.as_ref().ok_or_else(|| XmlReadError {
247        rule: "xml_parse_error".to_string(),
248        message: "source.options is required for xml input".to_string(),
249    })?;
250    let row_tag = options
251        .row_tag
252        .as_deref()
253        .ok_or_else(|| XmlReadError {
254            rule: "xml_parse_error".to_string(),
255            message: "source.options.row_tag is required for xml input".to_string(),
256        })?
257        .trim()
258        .to_string();
259    if row_tag.is_empty() {
260        return Err(XmlReadError {
261            rule: "xml_parse_error".to_string(),
262            message: "source.options.row_tag is required for xml input".to_string(),
263        });
264    }
265    let namespace = options
266        .namespace
267        .as_deref()
268        .map(|value| value.trim())
269        .filter(|value| !value.is_empty())
270        .map(|value| value.to_string());
271    let value_tag = options
272        .value_tag
273        .as_deref()
274        .map(|value| value.trim())
275        .filter(|value| !value.is_empty())
276        .map(|value| value.to_string());
277    Ok((row_tag, namespace, value_tag))
278}
279
280impl InputAdapter for XmlInputAdapter {
281    fn format(&self) -> &'static str {
282        "xml"
283    }
284
285    fn read_input_columns(
286        &self,
287        entity: &config::EntityConfig,
288        input_file: &InputFile,
289        _columns: &[config::ColumnConfig],
290    ) -> Result<Vec<String>, FileReadError> {
291        let (row_tag, namespace, _value_tag) =
292            xml_options(entity).map_err(|err| FileReadError {
293                rule: err.rule,
294                message: err.message,
295            })?;
296        read_xml_columns(
297            &input_file.source_local_path,
298            &row_tag,
299            namespace.as_deref(),
300        )
301        .map_err(|err| FileReadError {
302            rule: err.rule,
303            message: err.message,
304        })
305    }
306
307    fn read_inputs(
308        &self,
309        entity: &config::EntityConfig,
310        files: &[InputFile],
311        columns: &[config::ColumnConfig],
312        normalize_strategy: Option<&str>,
313        collect_raw: bool,
314    ) -> FloeResult<Vec<ReadInput>> {
315        let mut inputs = Vec::with_capacity(files.len());
316        let (row_tag, namespace, value_tag) = match xml_options(entity) {
317            Ok(options) => options,
318            Err(err) => {
319                for input_file in files {
320                    inputs.push(ReadInput::FileError {
321                        input_file: input_file.clone(),
322                        error: FileReadError {
323                            rule: err.rule.clone(),
324                            message: err.message.clone(),
325                        },
326                    });
327                }
328                return Ok(inputs);
329            }
330        };
331
332        for input_file in files {
333            let path = &input_file.source_local_path;
334            let read_result = read_xml_file(
335                path,
336                columns,
337                &row_tag,
338                namespace.as_deref(),
339                value_tag.as_deref(),
340            );
341            match read_result {
342                Ok(df) => {
343                    let input = format::read_input_from_df(
344                        input_file,
345                        &df,
346                        columns,
347                        normalize_strategy,
348                        collect_raw,
349                    )?;
350                    inputs.push(input);
351                }
352                Err(err) => {
353                    inputs.push(ReadInput::FileError {
354                        input_file: input_file.clone(),
355                        error: FileReadError {
356                            rule: err.rule,
357                            message: err.message,
358                        },
359                    });
360                }
361            }
362        }
363        Ok(inputs)
364    }
365}