// floe-core 0.3.6
//
// Core library for Floe, a YAML-driven technical ingestion tool.
// This file provides the XML input adapter (`XmlInputAdapter`).
use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;

use polars::prelude::{DataFrame, NamedFrom, Series};
use roxmltree::{Document, Node};

use crate::io::format::{self, FileReadError, InputAdapter, LocalInputFile, ReadInput};
use crate::io::read::xml_selector::{parse_selector, SelectorToken};
use crate::{config, FloeResult};

/// Input adapter for the `xml` source format.
///
/// Stateless unit struct; callers obtain the single shared instance through
/// `xml_input_adapter()`.
struct XmlInputAdapter;

/// The one shared `XmlInputAdapter` instance returned by `xml_input_adapter()`.
static XML_INPUT_ADAPTER: XmlInputAdapter = XmlInputAdapter;

/// Returns the XML input adapter as a `'static` `InputAdapter` trait object.
pub(crate) fn xml_input_adapter() -> &'static dyn InputAdapter {
    &XML_INPUT_ADAPTER
}

/// Error produced while reading or interpreting an XML input.
///
/// `rule` is a machine-readable identifier; this module emits
/// `xml_parse_error` and `xml_selector_invalid`. `message` carries the
/// human-readable detail.
#[derive(Debug, Clone)]
pub struct XmlReadError {
    pub rule: String,
    pub message: String,
}

impl std::fmt::Display for XmlReadError {
    /// Renders the error as `<rule>: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { rule, message } = self;
        write!(f, "{rule}: {message}")
    }
}

impl std::error::Error for XmlReadError {}

/// A column selector parsed once so it can be evaluated against every row element.
struct SelectorPlan {
    /// Raw selector string, taken from `ColumnConfig::source_or_name()`; it is
    /// also used as the output column name.
    source: String,
    /// Parsed form of `source`, as returned by `parse_selector`.
    tokens: Vec<SelectorToken>,
}

/// Parses the selector of every configured column, in column order.
///
/// # Errors
/// Returns an `xml_selector_invalid` error for the first column whose selector
/// source duplicates an earlier one or fails to parse.
fn build_selector_plan(
    columns: &[config::ColumnConfig],
) -> Result<Vec<SelectorPlan>, XmlReadError> {
    let invalid = |message: String| XmlReadError {
        rule: "xml_selector_invalid".to_string(),
        message,
    };
    let mut sources_seen = std::collections::HashSet::with_capacity(columns.len());
    columns
        .iter()
        .map(|column| -> Result<SelectorPlan, XmlReadError> {
            let source = column.source_or_name().to_string();
            // Sources double as output column names, so they must be unique.
            if !sources_seen.insert(source.clone()) {
                return Err(invalid(format!("duplicate xml selector source: {}", source)));
            }
            let tokens = parse_selector(&source)
                .map_err(|err| invalid(format!("invalid selector {}: {}", source, err.message)))?;
            Ok(SelectorPlan { source, tokens })
        })
        // Collecting into `Result` stops at the first error, like an early return.
        .collect()
}

/// Splits a possibly prefixed tag such as `ns:item` at its first `:`.
///
/// Returns `(Some(prefix), local)` when a colon is present, otherwise
/// `(None, tag)`.
fn split_tag(tag: &str) -> (Option<&str>, &str) {
    tag.split_once(':')
        .map_or((None, tag), |(prefix, local)| (Some(prefix), local))
}

/// Returns `true` when `node`'s local name equals the local part of `tag` and,
/// if a `namespace` URI is configured, `node` belongs to that namespace.
///
/// Any `prefix:` in `tag` is stripped and otherwise ignored: only the
/// configured `namespace` URI restricts matches. With no namespace configured,
/// every element with the right local name matches, whatever its namespace.
fn matches_tag(node: Node<'_, '_>, tag: &str, namespace: Option<&str>) -> bool {
    // The prefix is deliberately discarded; the namespace URI is the only
    // namespace check (see the doc comment).
    let (_prefix, local) = split_tag(tag);
    let name = node.tag_name();
    name.name() == local && namespace.is_none_or(|ns| name.namespace() == Some(ns))
}

/// Returns `true` when no `namespace` is configured, or when `node`'s
/// namespace URI equals the configured one.
fn matches_namespace(node: Node<'_, '_>, namespace: Option<&str>) -> bool {
    match namespace {
        Some(expected) => node.tag_name().namespace() == Some(expected),
        None => true,
    }
}

/// Concatenates the text of every text node under `node` (including `node`
/// itself) in document order, with no separator, then trims the result.
///
/// Returns `None` when the trimmed text is empty.
fn collect_text(node: Node<'_, '_>) -> Option<String> {
    let joined: String = node
        .descendants()
        .filter(|descendant| descendant.is_text())
        .filter_map(|descendant| descendant.text())
        .collect();
    let trimmed = joined.trim();
    (!trimmed.is_empty()).then(|| trimmed.to_string())
}

/// Picks the node whose text becomes a cell value.
///
/// Without a `value_tag`, this is `node` itself. Otherwise it is the first
/// element strictly below `node`, in document order, matching `value_tag`
/// (per `matches_tag`). If no such element exists, it falls back to `node`.
fn resolve_value_node<'a>(
    node: Node<'a, 'a>,
    value_tag: Option<&str>,
    namespace: Option<&str>,
) -> Node<'a, 'a> {
    match value_tag {
        None => node,
        Some(tag) => node
            .descendants()
            // `descendants()` yields `node` itself first; only look below it.
            .skip(1)
            .find(|candidate| candidate.is_element() && matches_tag(*candidate, tag, namespace))
            .unwrap_or(node),
    }
}

/// Returns the first direct element child of `node` that matches `tag`
/// (per `matches_tag`), if any.
fn find_child<'a>(node: Node<'a, 'a>, tag: &str, namespace: Option<&str>) -> Option<Node<'a, 'a>> {
    node.children()
        .filter(|child| child.is_element())
        .find(|child| matches_tag(*child, tag, namespace))
}

/// Evaluates a parsed selector relative to one row element.
///
/// `Element` tokens step to the first matching direct child; if any step
/// finds nothing, the result is `None`. The first `Attribute` token returns
/// that attribute of the current element, and any remaining tokens are not
/// evaluated. If no attribute is hit, the value is the trimmed text of the
/// node chosen by `resolve_value_node`.
fn evaluate_selector(
    row: Node<'_, '_>,
    tokens: &[SelectorToken],
    namespace: Option<&str>,
    value_tag: Option<&str>,
) -> Option<String> {
    let mut cursor = row;
    for token in tokens {
        match token {
            SelectorToken::Attribute(attr) => {
                return cursor.attribute(attr.as_str()).map(str::to_string);
            }
            SelectorToken::Element(tag) => {
                cursor = find_child(cursor, tag.as_str(), namespace)?;
            }
        }
    }
    collect_text(resolve_value_node(cursor, value_tag, namespace))
}

/// Lists the distinct local names of the element children of the first
/// element matching `row_tag`, sorted alphabetically.
///
/// When `namespace` is set, only children in that namespace are listed.
///
/// # Errors
/// Returns an `xml_parse_error` error when the file cannot be read or parsed,
/// or when no element matches `row_tag`.
fn read_xml_columns(
    input_path: &Path,
    row_tag: &str,
    namespace: Option<&str>,
) -> Result<Vec<String>, XmlReadError> {
    let parse_error = |message: String| XmlReadError {
        rule: "xml_parse_error".to_string(),
        message,
    };
    let content = std::fs::read_to_string(input_path).map_err(|err| {
        parse_error(format!("failed to read xml at {}: {err}", input_path.display()))
    })?;
    let doc = Document::parse(&content)
        .map_err(|err| parse_error(format!("xml parse error: {err}")))?;

    // Only the first row is inspected to discover column names.
    let Some(first_row) = doc
        .descendants()
        .find(|node| node.is_element() && matches_tag(*node, row_tag, namespace))
    else {
        return Err(parse_error(format!("row_tag={} not found in xml", row_tag)));
    };

    // A BTreeSet both de-duplicates repeated children and sorts the names.
    let distinct: BTreeSet<String> = first_row
        .children()
        .filter(|child| child.is_element() && matches_namespace(*child, namespace))
        .map(|child| child.tag_name().name().to_string())
        .collect();
    Ok(distinct.into_iter().collect())
}

/// Reads the XML file at `input_path` into a DataFrame with one row per
/// element matching `row_tag` and one column per entry in `columns`.
///
/// Each column's source is parsed as a selector (`build_selector_plan`) and
/// evaluated against every row element (`evaluate_selector`). A selector that
/// yields nothing produces a null cell.
///
/// # Errors
/// - `xml_selector_invalid` when the column selectors are duplicated or unparsable.
/// - `xml_parse_error` when the file cannot be read or parsed, when no element
///   matches `row_tag`, or when the DataFrame cannot be built.
fn read_xml_file(
    input_path: &Path,
    columns: &[config::ColumnConfig],
    row_tag: &str,
    namespace: Option<&str>,
    value_tag: Option<&str>,
) -> Result<DataFrame, XmlReadError> {
    // The selector plan depends only on `columns`, so validate it first. A bad
    // configuration is then reported without reading or parsing the file.
    let plans = build_selector_plan(columns)?;

    let content = std::fs::read_to_string(input_path).map_err(|err| XmlReadError {
        rule: "xml_parse_error".to_string(),
        message: format!("failed to read xml at {}: {err}", input_path.display()),
    })?;
    let doc = Document::parse(&content).map_err(|err| XmlReadError {
        rule: "xml_parse_error".to_string(),
        message: format!("xml parse error: {err}"),
    })?;

    // One map per matching row element: selector source -> extracted value.
    let rows: Vec<BTreeMap<String, Option<String>>> = doc
        .descendants()
        .filter(|node| node.is_element() && matches_tag(*node, row_tag, namespace))
        .map(|row| {
            plans
                .iter()
                .map(|plan| {
                    let value = evaluate_selector(row, &plan.tokens, namespace, value_tag);
                    (plan.source.clone(), value)
                })
                .collect()
        })
        .collect();

    if rows.is_empty() {
        return Err(XmlReadError {
            rule: "xml_parse_error".to_string(),
            message: format!("row_tag={} produced no rows", row_tag),
        });
    }

    let column_names: Vec<String> = plans.iter().map(|plan| plan.source.clone()).collect();
    build_dataframe(&column_names, &rows)
}

/// Builds a DataFrame with one string column per entry in `columns`, in that
/// order and with one row per entry in `rows`.
///
/// A cell is null when its row map has no entry for the column or holds
/// `None`.
///
/// # Errors
/// Returns an `xml_parse_error` error when polars rejects the assembled columns.
fn build_dataframe(
    columns: &[String],
    rows: &[BTreeMap<String, Option<String>>],
) -> Result<DataFrame, XmlReadError> {
    let mut series = Vec::with_capacity(columns.len());
    for name in columns {
        // Borrow the cell text: `Series::new` copies it into its own buffers,
        // so cloning every `String` first would only double peak memory.
        let values: Vec<Option<&str>> = rows
            .iter()
            .map(|row| row.get(name).and_then(|value| value.as_deref()))
            .collect();
        series.push(Series::new(name.as_str().into(), values).into());
    }

    DataFrame::new(series).map_err(|err| XmlReadError {
        rule: "xml_parse_error".to_string(),
        message: format!("failed to build dataframe: {err}"),
    })
}

/// Extracts `(row_tag, namespace, value_tag)` from the entity's source options.
///
/// All three values are whitespace-trimmed. `namespace` and `value_tag` become
/// `None` when absent or blank.
///
/// # Errors
/// Returns an `xml_parse_error` error when `source.options` is missing, or when
/// `row_tag` is missing or blank.
fn xml_options(
    entity: &config::EntityConfig,
) -> Result<(String, Option<String>, Option<String>), XmlReadError> {
    let parse_error = |message: &str| XmlReadError {
        rule: "xml_parse_error".to_string(),
        message: message.to_string(),
    };
    let Some(options) = entity.source.options.as_ref() else {
        return Err(parse_error("source.options is required for xml input"));
    };
    // A missing row_tag and a whitespace-only one are reported identically.
    let row_tag = match options.row_tag.as_deref().map(str::trim) {
        Some(tag) if !tag.is_empty() => tag.to_string(),
        _ => return Err(parse_error("source.options.row_tag is required for xml input")),
    };
    let non_blank = |raw: Option<&str>| {
        raw.map(str::trim)
            .filter(|value| !value.is_empty())
            .map(str::to_string)
    };
    let namespace = non_blank(options.namespace.as_deref());
    let value_tag = non_blank(options.value_tag.as_deref());
    Ok((row_tag, namespace, value_tag))
}

impl InputAdapter for XmlInputAdapter {
    fn format(&self) -> &'static str {
        "xml"
    }

    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &LocalInputFile,
        _columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError> {
        let (row_tag, namespace, _value_tag) =
            xml_options(entity).map_err(|err| FileReadError {
                rule: err.rule,
                message: err.message,
            })?;
        read_xml_columns(&input_file.local_path, &row_tag, namespace.as_deref()).map_err(|err| {
            FileReadError {
                rule: err.rule,
                message: err.message,
            }
        })
    }

    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[LocalInputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>> {
        let mut inputs = Vec::with_capacity(files.len());
        let (row_tag, namespace, value_tag) = match xml_options(entity) {
            Ok(options) => options,
            Err(err) => {
                for input_file in files {
                    inputs.push(ReadInput::FileError {
                        input_file: input_file.file.clone(),
                        error: FileReadError {
                            rule: err.rule.clone(),
                            message: err.message.clone(),
                        },
                    });
                }
                return Ok(inputs);
            }
        };

        for input_file in files {
            let path = &input_file.local_path;
            let read_result = read_xml_file(
                path,
                columns,
                &row_tag,
                namespace.as_deref(),
                value_tag.as_deref(),
            );
            match read_result {
                Ok(df) => {
                    let input = format::read_input_from_df(
                        input_file,
                        &df,
                        columns,
                        normalize_strategy,
                        collect_raw,
                    )?;
                    inputs.push(input);
                }
                Err(err) => {
                    inputs.push(ReadInput::FileError {
                        input_file: input_file.file.clone(),
                        error: FileReadError {
                            rule: err.rule,
                            message: err.message,
                        },
                    });
                }
            }
        }
        Ok(inputs)
    }
}