floe-core 0.3.6

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use std::collections::HashSet;
use std::io::BufRead;
use std::path::Path;

use polars::prelude::{DataFrame, NamedFrom, Series};
use serde_json::Value;

use crate::io::format::{self, FileReadError, InputAdapter, LocalInputFile, ReadInput};
use crate::io::read::json_selector::{
    compact_json, evaluate_selector, parse_selector, SelectorToken, SelectorValue,
};
use crate::{config, FloeResult};

struct JsonInputAdapter;

static JSON_INPUT_ADAPTER: JsonInputAdapter = JsonInputAdapter;

pub(crate) fn json_input_adapter() -> &'static dyn InputAdapter {
    &JSON_INPUT_ADAPTER
}

#[derive(Debug, Clone)]
pub struct JsonReadError {
    pub rule: String,
    pub message: String,
}

impl std::fmt::Display for JsonReadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}: {}", self.rule, self.message)
    }
}

impl std::error::Error for JsonReadError {}

struct SelectorPlan {
    source: String,
    tokens: Vec<SelectorToken>,
    top_level_key: Option<String>,
}

fn build_selector_plan(
    columns: &[config::ColumnConfig],
) -> Result<Vec<SelectorPlan>, JsonReadError> {
    let mut plans = Vec::with_capacity(columns.len());
    let mut seen = HashSet::new();
    for column in columns {
        let source = column.source_or_name().to_string();
        if !seen.insert(source.clone()) {
            return Err(JsonReadError {
                rule: "json_selector_invalid".to_string(),
                message: format!("duplicate json selector source: {}", source),
            });
        }
        let tokens = parse_selector(&source).map_err(|err| JsonReadError {
            rule: "json_selector_invalid".to_string(),
            message: format!("invalid selector {}: {}", source, err.message),
        })?;
        let top_level_key = match tokens.as_slice() {
            [SelectorToken::Field(name)] => Some(name.clone()),
            _ => None,
        };
        plans.push(SelectorPlan {
            source,
            tokens,
            top_level_key,
        });
    }
    Ok(plans)
}

fn evaluate_selector_plan(
    value: &Value,
    plan: &SelectorPlan,
) -> Result<SelectorValue, JsonReadError> {
    if let Some(key) = plan.top_level_key.as_deref() {
        let Some(object) = value.as_object() else {
            return Ok(SelectorValue::Null);
        };
        let Some(current) = object.get(key) else {
            return Ok(SelectorValue::Null);
        };
        if current.is_null() {
            return Ok(SelectorValue::Null);
        }
        if current.is_object() || current.is_array() {
            return Ok(SelectorValue::NonScalar(current.clone()));
        }
        return match current {
            Value::String(value) => Ok(SelectorValue::Scalar(value.clone())),
            Value::Bool(value) => Ok(SelectorValue::Scalar(value.to_string())),
            Value::Number(value) => Ok(SelectorValue::Scalar(value.to_string())),
            Value::Null => Ok(SelectorValue::Null),
            Value::Object(_) | Value::Array(_) => Ok(SelectorValue::NonScalar(current.clone())),
        };
    }

    evaluate_selector(value, &plan.tokens).map_err(|err| JsonReadError {
        rule: "json_selector_invalid".to_string(),
        message: format!("invalid selector {}: {}", plan.source, err.message),
    })
}

fn append_extracted_row_into(
    value: &Value,
    plans: &[SelectorPlan],
    cast_mode: &str,
    location_kind: &'static str,
    location_index: usize,
    output_columns: &mut [Vec<Option<String>>],
) -> Result<(), JsonReadError> {
    for (output, plan) in output_columns.iter_mut().zip(plans) {
        let selected = evaluate_selector_plan(value, plan)?;
        let cell = match selected {
            SelectorValue::Null => None,
            SelectorValue::Scalar(value) => Some(value),
            SelectorValue::NonScalar(value) => {
                let location = format!("{location_kind} {location_index}");
                if cast_mode == "coerce" {
                    Some(compact_json(&value).map_err(|err| JsonReadError {
                        rule: "json_selector_non_scalar".to_string(),
                        message: format!(
                            "failed to stringify selector {} at {}: {}",
                            plan.source, location, err.message
                        ),
                    })?)
                } else {
                    return Err(JsonReadError {
                        rule: "json_selector_non_scalar".to_string(),
                        message: format!(
                            "non-scalar value for selector {} at {}",
                            plan.source, location
                        ),
                    });
                }
            }
        };
        output.push(cell);
    }
    Ok(())
}

fn read_ndjson_file(
    input_path: &Path,
    columns: &[config::ColumnConfig],
    cast_mode: &str,
) -> Result<DataFrame, JsonReadError> {
    let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
        rule: "json_parse_error".to_string(),
        message: format!("failed to read json at {}: {err}", input_path.display()),
    })?;

    let plans = build_selector_plan(columns)?;
    let mut column_values: Vec<Vec<Option<String>>> = vec![Vec::new(); plans.len()];
    for (idx, line) in content.lines().enumerate() {
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        let value: Value = serde_json::from_str(line).map_err(|err| JsonReadError {
            rule: "json_parse_error".to_string(),
            message: format!("json parse error at line {}: {err}", idx + 1),
        })?;
        if !value.is_object() {
            return Err(JsonReadError {
                rule: "json_parse_error".to_string(),
                message: format!("expected json object at line {}", idx + 1),
            });
        }

        append_extracted_row_into(
            &value,
            &plans,
            cast_mode,
            "line",
            idx + 1,
            &mut column_values,
        )?;
    }

    build_dataframe(&plans, column_values)
}

fn read_ndjson_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
    let file = std::fs::File::open(input_path).map_err(|err| JsonReadError {
        rule: "json_parse_error".to_string(),
        message: format!("failed to read json at {}: {err}", input_path.display()),
    })?;
    let reader = std::io::BufReader::new(file);
    let mut first_error: Option<JsonReadError> = None;

    for (idx, line) in reader.lines().enumerate() {
        let line = line.map_err(|err| JsonReadError {
            rule: "json_parse_error".to_string(),
            message: format!("failed to read json at {}: {err}", input_path.display()),
        })?;
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        match serde_json::from_str::<Value>(line) {
            Ok(value) => {
                let object = value.as_object().ok_or_else(|| JsonReadError {
                    rule: "json_parse_error".to_string(),
                    message: format!("expected json object at line {}", idx + 1),
                })?;
                let mut keys = object.keys().cloned().collect::<Vec<_>>();
                keys.sort();
                return Ok(keys);
            }
            Err(err) => {
                if first_error.is_none() {
                    first_error = Some(JsonReadError {
                        rule: "json_parse_error".to_string(),
                        message: format!("json parse error at line {}: {err}", idx + 1),
                    });
                }
                continue;
            }
        }
    }

    Err(first_error.unwrap_or_else(|| JsonReadError {
        rule: "json_parse_error".to_string(),
        message: format!("no json objects found in {}", input_path.display()),
    }))
}

fn read_json_array_file(
    input_path: &Path,
    columns: &[config::ColumnConfig],
    cast_mode: &str,
) -> Result<DataFrame, JsonReadError> {
    let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
        rule: "json_parse_error".to_string(),
        message: format!("failed to read json at {}: {err}", input_path.display()),
    })?;

    let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
        rule: "json_parse_error".to_string(),
        message: format!("json parse error: {err}"),
    })?;
    let array = value.as_array().ok_or_else(|| JsonReadError {
        rule: "json_parse_error".to_string(),
        message: "expected json array at root".to_string(),
    })?;

    let plans = build_selector_plan(columns)?;
    let mut column_values: Vec<Vec<Option<String>>> = (0..plans.len())
        .map(|_| Vec::with_capacity(array.len()))
        .collect();

    for (idx, value) in array.iter().enumerate() {
        if !value.is_object() {
            return Err(JsonReadError {
                rule: "json_parse_error".to_string(),
                message: format!("expected json object at index {}", idx),
            });
        }
        append_extracted_row_into(value, &plans, cast_mode, "index", idx, &mut column_values)?;
    }

    build_dataframe(&plans, column_values)
}

fn read_json_array_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
    let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
        rule: "json_parse_error".to_string(),
        message: format!("failed to read json at {}: {err}", input_path.display()),
    })?;
    let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
        rule: "json_parse_error".to_string(),
        message: format!("json parse error: {err}"),
    })?;
    let array = value.as_array().ok_or_else(|| JsonReadError {
        rule: "json_parse_error".to_string(),
        message: "expected json array at root".to_string(),
    })?;
    if array.is_empty() {
        return Err(JsonReadError {
            rule: "json_parse_error".to_string(),
            message: "json array is empty".to_string(),
        });
    }
    let object = array[0].as_object().ok_or_else(|| JsonReadError {
        rule: "json_parse_error".to_string(),
        message: "expected json object at index 0".to_string(),
    })?;
    let mut keys = object.keys().cloned().collect::<Vec<_>>();
    keys.sort();
    Ok(keys)
}

fn build_dataframe(
    plans: &[SelectorPlan],
    column_values: Vec<Vec<Option<String>>>,
) -> Result<DataFrame, JsonReadError> {
    let mut series = Vec::with_capacity(plans.len());
    for (plan, values) in plans.iter().zip(column_values) {
        series.push(Series::new(plan.source.as_str().into(), values).into());
    }

    DataFrame::new(series).map_err(|err| JsonReadError {
        rule: "json_parse_error".to_string(),
        message: format!("failed to build dataframe: {err}"),
    })
}

impl InputAdapter for JsonInputAdapter {
    fn format(&self) -> &'static str {
        "json"
    }

    fn read_input_columns(
        &self,
        entity: &config::EntityConfig,
        input_file: &LocalInputFile,
        _columns: &[config::ColumnConfig],
    ) -> Result<Vec<String>, FileReadError> {
        let json_mode = entity
            .source
            .options
            .as_ref()
            .and_then(|options| options.json_mode.as_deref())
            .unwrap_or("array");
        let path = &input_file.local_path;
        let read_result = match json_mode {
            "ndjson" => read_ndjson_columns(path),
            "array" => read_json_array_columns(path),
            other => Err(JsonReadError {
                rule: "json_parse_error".to_string(),
                message: format!(
                    "unsupported source.options.json_mode={} (allowed: array, ndjson)",
                    other
                ),
            }),
        };
        read_result.map_err(|err| FileReadError {
            rule: err.rule,
            message: err.message,
        })
    }

    fn read_inputs(
        &self,
        entity: &config::EntityConfig,
        files: &[LocalInputFile],
        columns: &[config::ColumnConfig],
        normalize_strategy: Option<&str>,
        collect_raw: bool,
    ) -> FloeResult<Vec<ReadInput>> {
        let mut inputs = Vec::with_capacity(files.len());
        let json_mode = entity
            .source
            .options
            .as_ref()
            .and_then(|options| options.json_mode.as_deref())
            .unwrap_or("array");
        let cast_mode = entity.source.cast_mode.as_deref().unwrap_or("strict");
        for input_file in files {
            let path = &input_file.local_path;
            let read_result = match json_mode {
                "ndjson" => read_ndjson_file(path, columns, cast_mode),
                "array" => read_json_array_file(path, columns, cast_mode),
                other => Err(JsonReadError {
                    rule: "json_parse_error".to_string(),
                    message: format!(
                        "unsupported source.options.json_mode={} (allowed: array, ndjson)",
                        other
                    ),
                }),
            };
            match read_result {
                Ok(df) => {
                    let input = format::read_input_from_df(
                        input_file,
                        &df,
                        columns,
                        normalize_strategy,
                        collect_raw,
                    )?;
                    inputs.push(input);
                }
                Err(err) => {
                    inputs.push(ReadInput::FileError {
                        input_file: input_file.file.clone(),
                        error: FileReadError {
                            rule: err.rule,
                            message: err.message,
                        },
                    });
                }
            }
        }
        Ok(inputs)
    }
}