1use std::collections::{BTreeMap, BTreeSet};
2use std::path::Path;
3
4use polars::prelude::{DataFrame, NamedFrom, Series};
5use roxmltree::{Document, Node};
6
7use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
8use crate::io::read::xml_selector::{parse_selector, SelectorToken};
9use crate::{config, FloeResult};
10
/// Stateless adapter type that implements `InputAdapter` for the `xml` format.
struct XmlInputAdapter;

/// Single shared instance handed out by `xml_input_adapter`.
static XML_INPUT_ADAPTER: XmlInputAdapter = XmlInputAdapter;
14
/// Returns the shared static XML adapter as an `InputAdapter` trait object.
pub(crate) fn xml_input_adapter() -> &'static dyn InputAdapter {
    &XML_INPUT_ADAPTER
}
18
/// Error produced while configuring, reading or interpreting an XML input.
///
/// The `Display` form is `<rule>: <message>`.
#[derive(Debug, Clone)]
pub struct XmlReadError {
    /// Short identifier of the rule that failed (for example `xml_parse_error`).
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}

impl std::fmt::Display for XmlReadError {
    /// Writes the rule identifier, a colon and a space, then the message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { rule, message } = self;
        write!(f, "{rule}: {message}")
    }
}

impl std::error::Error for XmlReadError {}
32
/// A column selector together with its parsed token sequence.
struct SelectorPlan {
    /// Selector text exactly as obtained from the column configuration.
    source: String,
    /// Tokens produced by `parse_selector` for `source`.
    tokens: Vec<SelectorToken>,
}
37
38fn build_selector_plan(
39 columns: &[config::ColumnConfig],
40) -> Result<Vec<SelectorPlan>, XmlReadError> {
41 let mut plans = Vec::with_capacity(columns.len());
42 let mut seen = std::collections::HashSet::new();
43 for column in columns {
44 let source = column.source_or_name().to_string();
45 if !seen.insert(source.clone()) {
46 return Err(XmlReadError {
47 rule: "xml_selector_invalid".to_string(),
48 message: format!("duplicate xml selector source: {}", source),
49 });
50 }
51 let tokens = parse_selector(&source).map_err(|err| XmlReadError {
52 rule: "xml_selector_invalid".to_string(),
53 message: format!("invalid selector {}: {}", source, err.message),
54 })?;
55 plans.push(SelectorPlan { source, tokens });
56 }
57 Ok(plans)
58}
59
/// Splits `tag` at its first `:` into `(Some(prefix), local_name)`.
///
/// A tag without a colon is returned unchanged as `(None, tag)`.
fn split_tag(tag: &str) -> (Option<&str>, &str) {
    match tag.split_once(':') {
        Some((prefix, local)) => (Some(prefix), local),
        None => (None, tag),
    }
}
67
68fn matches_tag(node: Node<'_, '_>, tag: &str, namespace: Option<&str>) -> bool {
69 let (prefix, local) = split_tag(tag);
70 let name = node.tag_name();
71 if name.name() != local {
72 return false;
73 }
74 if prefix.is_some() {
75 if let Some(ns) = namespace {
76 return name.namespace() == Some(ns);
77 }
78 return true;
79 }
80 namespace.is_none_or(|ns| name.namespace() == Some(ns))
81}
82
83fn matches_namespace(node: Node<'_, '_>, namespace: Option<&str>) -> bool {
84 namespace.is_none_or(|ns| node.tag_name().namespace() == Some(ns))
85}
86
87fn collect_text(node: Node<'_, '_>) -> Option<String> {
88 let mut text = String::new();
89 for descendant in node.descendants() {
90 if descendant.is_text() {
91 if let Some(value) = descendant.text() {
92 text.push_str(value);
93 }
94 }
95 }
96 let trimmed = text.trim();
97 if trimmed.is_empty() {
98 None
99 } else {
100 Some(trimmed.to_string())
101 }
102}
103
104fn resolve_value_node<'a>(
105 node: Node<'a, 'a>,
106 value_tag: Option<&str>,
107 namespace: Option<&str>,
108) -> Node<'a, 'a> {
109 let Some(value_tag) = value_tag else {
110 return node;
111 };
112 for descendant in node.descendants().skip(1) {
113 if descendant.is_element() && matches_tag(descendant, value_tag, namespace) {
114 return descendant;
115 }
116 }
117 node
118}
119
120fn find_child<'a>(node: Node<'a, 'a>, tag: &str, namespace: Option<&str>) -> Option<Node<'a, 'a>> {
121 node.children()
122 .find(|child| child.is_element() && matches_tag(*child, tag, namespace))
123}
124
125fn evaluate_selector(
126 row: Node<'_, '_>,
127 tokens: &[SelectorToken],
128 namespace: Option<&str>,
129 value_tag: Option<&str>,
130) -> Option<String> {
131 let mut current = row;
132 for token in tokens {
133 match token {
134 SelectorToken::Element(tag) => {
135 let next = find_child(current, tag.as_str(), namespace)?;
136 current = next;
137 }
138 SelectorToken::Attribute(attr) => {
139 return current
140 .attribute(attr.as_str())
141 .map(|value| value.to_string());
142 }
143 }
144 }
145 let value_node = resolve_value_node(current, value_tag, namespace);
146 collect_text(value_node)
147}
148
149fn read_xml_columns(
150 input_path: &Path,
151 row_tag: &str,
152 namespace: Option<&str>,
153) -> Result<Vec<String>, XmlReadError> {
154 let content = std::fs::read_to_string(input_path).map_err(|err| XmlReadError {
155 rule: "xml_parse_error".to_string(),
156 message: format!("failed to read xml at {}: {err}", input_path.display()),
157 })?;
158 let doc = Document::parse(&content).map_err(|err| XmlReadError {
159 rule: "xml_parse_error".to_string(),
160 message: format!("xml parse error: {err}"),
161 })?;
162 let row = doc
163 .descendants()
164 .find(|node| node.is_element() && matches_tag(*node, row_tag, namespace))
165 .ok_or_else(|| XmlReadError {
166 rule: "xml_parse_error".to_string(),
167 message: format!("row_tag={} not found in xml", row_tag),
168 })?;
169
170 let mut names = BTreeSet::new();
171 for child in row.children().filter(|node| node.is_element()) {
172 if !matches_namespace(child, namespace) {
173 continue;
174 }
175 names.insert(child.tag_name().name().to_string());
176 }
177 Ok(names.into_iter().collect())
178}
179
180fn read_xml_file(
181 input_path: &Path,
182 columns: &[config::ColumnConfig],
183 row_tag: &str,
184 namespace: Option<&str>,
185 value_tag: Option<&str>,
186) -> Result<DataFrame, XmlReadError> {
187 let content = std::fs::read_to_string(input_path).map_err(|err| XmlReadError {
188 rule: "xml_parse_error".to_string(),
189 message: format!("failed to read xml at {}: {err}", input_path.display()),
190 })?;
191 let doc = Document::parse(&content).map_err(|err| XmlReadError {
192 rule: "xml_parse_error".to_string(),
193 message: format!("xml parse error: {err}"),
194 })?;
195 let plans = build_selector_plan(columns)?;
196 let mut rows: Vec<BTreeMap<String, Option<String>>> = Vec::new();
197
198 for row in doc
199 .descendants()
200 .filter(|node| node.is_element() && matches_tag(*node, row_tag, namespace))
201 {
202 let mut record = BTreeMap::new();
203 for plan in &plans {
204 let value = evaluate_selector(row, &plan.tokens, namespace, value_tag);
205 record.insert(plan.source.clone(), value);
206 }
207 rows.push(record);
208 }
209
210 if rows.is_empty() {
211 return Err(XmlReadError {
212 rule: "xml_parse_error".to_string(),
213 message: format!("row_tag={} produced no rows", row_tag),
214 });
215 }
216
217 let columns = plans
218 .iter()
219 .map(|plan| plan.source.clone())
220 .collect::<Vec<_>>();
221 build_dataframe(&columns, &rows)
222}
223
224fn build_dataframe(
225 columns: &[String],
226 rows: &[BTreeMap<String, Option<String>>],
227) -> Result<DataFrame, XmlReadError> {
228 let mut series = Vec::with_capacity(columns.len());
229 for name in columns {
230 let mut values = Vec::with_capacity(rows.len());
231 for row in rows {
232 values.push(row.get(name).cloned().unwrap_or(None));
233 }
234 series.push(Series::new(name.as_str().into(), values).into());
235 }
236
237 DataFrame::new(series).map_err(|err| XmlReadError {
238 rule: "xml_parse_error".to_string(),
239 message: format!("failed to build dataframe: {err}"),
240 })
241}
242
243fn xml_options(
244 entity: &config::EntityConfig,
245) -> Result<(String, Option<String>, Option<String>), XmlReadError> {
246 let options = entity.source.options.as_ref().ok_or_else(|| XmlReadError {
247 rule: "xml_parse_error".to_string(),
248 message: "source.options is required for xml input".to_string(),
249 })?;
250 let row_tag = options
251 .row_tag
252 .as_deref()
253 .ok_or_else(|| XmlReadError {
254 rule: "xml_parse_error".to_string(),
255 message: "source.options.row_tag is required for xml input".to_string(),
256 })?
257 .trim()
258 .to_string();
259 if row_tag.is_empty() {
260 return Err(XmlReadError {
261 rule: "xml_parse_error".to_string(),
262 message: "source.options.row_tag is required for xml input".to_string(),
263 });
264 }
265 let namespace = options
266 .namespace
267 .as_deref()
268 .map(|value| value.trim())
269 .filter(|value| !value.is_empty())
270 .map(|value| value.to_string());
271 let value_tag = options
272 .value_tag
273 .as_deref()
274 .map(|value| value.trim())
275 .filter(|value| !value.is_empty())
276 .map(|value| value.to_string());
277 Ok((row_tag, namespace, value_tag))
278}
279
280impl InputAdapter for XmlInputAdapter {
281 fn format(&self) -> &'static str {
282 "xml"
283 }
284
285 fn read_input_columns(
286 &self,
287 entity: &config::EntityConfig,
288 input_file: &InputFile,
289 _columns: &[config::ColumnConfig],
290 ) -> Result<Vec<String>, FileReadError> {
291 let (row_tag, namespace, _value_tag) =
292 xml_options(entity).map_err(|err| FileReadError {
293 rule: err.rule,
294 message: err.message,
295 })?;
296 read_xml_columns(
297 &input_file.source_local_path,
298 &row_tag,
299 namespace.as_deref(),
300 )
301 .map_err(|err| FileReadError {
302 rule: err.rule,
303 message: err.message,
304 })
305 }
306
307 fn read_inputs(
308 &self,
309 entity: &config::EntityConfig,
310 files: &[InputFile],
311 columns: &[config::ColumnConfig],
312 normalize_strategy: Option<&str>,
313 collect_raw: bool,
314 ) -> FloeResult<Vec<ReadInput>> {
315 let mut inputs = Vec::with_capacity(files.len());
316 let (row_tag, namespace, value_tag) = match xml_options(entity) {
317 Ok(options) => options,
318 Err(err) => {
319 for input_file in files {
320 inputs.push(ReadInput::FileError {
321 input_file: input_file.clone(),
322 error: FileReadError {
323 rule: err.rule.clone(),
324 message: err.message.clone(),
325 },
326 });
327 }
328 return Ok(inputs);
329 }
330 };
331
332 for input_file in files {
333 let path = &input_file.source_local_path;
334 let read_result = read_xml_file(
335 path,
336 columns,
337 &row_tag,
338 namespace.as_deref(),
339 value_tag.as_deref(),
340 );
341 match read_result {
342 Ok(df) => {
343 let input = format::read_input_from_df(
344 input_file,
345 &df,
346 columns,
347 normalize_strategy,
348 collect_raw,
349 )?;
350 inputs.push(input);
351 }
352 Err(err) => {
353 inputs.push(ReadInput::FileError {
354 input_file: input_file.clone(),
355 error: FileReadError {
356 rule: err.rule,
357 message: err.message,
358 },
359 });
360 }
361 }
362 }
363 Ok(inputs)
364 }
365}