use super::*;
/// Reads a JSON or NDJSON file and flattens it into per-path string columns.
///
/// When `ext` is exactly `"json"` the whole file is parsed as one document:
/// * a top-level array is flattened element by element with
///   `json_reader::collect_json`, and values are merged per path in
///   first-seen path order; the row count is the number of array elements;
/// * a top-level object is flattened once; the row count is whatever the
///   resulting path map reports;
/// * any other top-level value is rejected.
///
/// For every other `ext` the file is handed to `json_reader::collect_ndjson`.
///
/// # Returns
/// `(headers, columns, row_count)` where `columns[i]` holds the present,
/// non-empty values observed for `headers[i]`. Missing (`None`) and empty
/// values are dropped, so columns are *not* row-aligned with each other.
///
/// # Errors
/// Fails if the file cannot be read or opened, if the JSON is malformed, if
/// the top-level JSON value is a scalar, or if the NDJSON reader reports an
/// error.
pub(crate) fn read_json_input(
    file: &std::path::Path,
    ext: &str,
) -> Result<(Vec<String>, Vec<Vec<String>>, usize)> {
    use finetype_core::json_reader;

    // Keeps only present, non-empty values. Filtering happens on borrowed
    // `&str`s so strings that are going to be discarded are never cloned.
    fn non_empty_values<'a>(values: impl Iterator<Item = &'a Option<String>>) -> Vec<String> {
        values
            .filter_map(|v| v.as_deref())
            .filter(|s| !s.is_empty())
            .map(str::to_owned)
            .collect()
    }

    if ext == "json" {
        let content = std::fs::read_to_string(file)?;
        let value: serde_json::Value = serde_json::from_str(&content)
            .map_err(|e| anyhow::anyhow!("Malformed JSON in {:?}: {}", file, e))?;
        match &value {
            serde_json::Value::Array(arr) => {
                // Insertion-ordered so headers come out in first-seen order.
                // Only surviving values are stored: `None` placeholders would
                // be filtered out before returning anyway, so padding absent
                // paths per element is pure overhead.
                let mut merged: indexmap::IndexMap<String, Vec<String>> =
                    indexmap::IndexMap::new();
                for item in arr {
                    let item_map = json_reader::collect_json(item);
                    for (path, values) in item_map.all_paths() {
                        // The entry is created even when nothing survives the
                        // filter, so all-null paths still appear as headers.
                        merged
                            .entry(path.clone())
                            .or_default()
                            .extend(non_empty_values(values.iter()));
                    }
                }
                let row_count = arr.len();
                let (headers, columns): (Vec<String>, Vec<Vec<String>>) =
                    merged.into_iter().unzip();
                eprintln!(
                    "Found {} paths across {} array elements",
                    headers.len(),
                    row_count
                );
                Ok((headers, columns, row_count))
            }
            serde_json::Value::Object(_) => {
                let path_map = json_reader::collect_json(&value);
                let headers: Vec<String> = path_map.paths().cloned().collect();
                let columns: Vec<Vec<String>> = headers
                    .iter()
                    .map(|h| {
                        path_map
                            .get(h)
                            .map(|vals| non_empty_values(vals.iter()))
                            .unwrap_or_default()
                    })
                    .collect();
                let row_count = path_map.row_count();
                eprintln!("Found {} paths in single JSON document", headers.len());
                Ok((headers, columns, row_count))
            }
            _ => {
                anyhow::bail!(
                    "JSON input must be an object or array of objects, got scalar value in {:?}",
                    file
                );
            }
        }
    } else {
        let reader = std::fs::File::open(file)?;
        let path_map = json_reader::collect_ndjson(reader)
            .map_err(|e| anyhow::anyhow!("Error reading NDJSON from {:?}: {}", file, e))?;
        let headers: Vec<String> = path_map.paths().cloned().collect();
        let columns: Vec<Vec<String>> = headers
            .iter()
            .map(|h| {
                path_map
                    .get(h)
                    .map(|vals| non_empty_values(vals.iter()))
                    .unwrap_or_default()
            })
            .collect();
        let row_count = path_map.row_count();
        eprintln!(
            "Found {} paths across {} NDJSON documents",
            headers.len(),
            row_count
        );
        Ok((headers, columns, row_count))
    }
}
/// Exact (case-sensitive) spellings that are treated as "no value".
const NULLISH_TOKENS: [&str; 7] = ["NULL", "null", "NA", "N/A", "nan", "NaN", "None"];

/// Returns `true` when `trimmed` is empty or is exactly one of
/// [`NULLISH_TOKENS`].
///
/// The comparison is exact: callers are expected to pass an already-trimmed
/// string, and differently-cased spellings such as `"Null"` do not match.
fn is_nullish(trimmed: &str) -> bool {
    if trimmed.is_empty() {
        return true;
    }
    NULLISH_TOKENS.iter().any(|&token| token == trimmed)
}
/// Loads a delimited-text or Parquet file into per-column string vectors by
/// shelling out to the `duckdb` CLI and parsing its CSV output.
///
/// * Files whose extension is `parquet` (ASCII case-insensitive) are read with
///   `read_parquet`, casting every column to `VARCHAR`.
/// * Everything else is read with `read_csv` using auto-detection, all columns
///   as `VARCHAR` and null padding; `delimiter`, when given, is passed as
///   `sep`. If that first attempt fails it is retried once with
///   `parallel=false`.
///
/// # Returns
/// `(headers, columns, row_count)`. Each cell is trimmed, and cells that are
/// empty or match a null-like token (see `is_nullish`) are skipped, so columns
/// are not row-aligned with each other. `row_count` counts every data record.
///
/// # Errors
/// Fails if `duckdb` cannot be spawned, if it exits unsuccessfully (after the
/// retry, where applicable), or if its output cannot be parsed as CSV.
pub(crate) fn read_csv_input(
    file: &std::path::Path,
    delimiter: Option<char>,
) -> Result<(Vec<String>, Vec<Vec<String>>, usize)> {
    // Runs a single SQL statement through the duckdb CLI in CSV output mode,
    // with NULLs rendered as empty strings.
    fn run_duckdb(sql: &str) -> Result<std::process::Output> {
        let statement = format!("{sql};");
        std::process::Command::new("duckdb")
            .args(["-csv", "-c", ".nullvalue ''", "-c"])
            .arg(&statement)
            .output()
            .map_err(|e| {
                anyhow::anyhow!(
                    "could not invoke duckdb CLI (is duckdb on PATH?): {e}. \
                     Install it from https://duckdb.org/docs/installation"
                )
            })
    }

    let is_parquet = matches!(
        file.extension().and_then(|e| e.to_str()),
        Some(e) if e.eq_ignore_ascii_case("parquet")
    );
    let input_literal = crate::sql::sql_quote(&file.to_string_lossy());

    // `Some(options)` for the CSV reader, `None` for Parquet; the retry below
    // only applies when CSV options exist.
    let csv_opts: Option<String> = if is_parquet {
        None
    } else {
        let mut options = String::from("auto_detect=true, all_varchar=true, null_padding=true");
        if let Some(sep) = delimiter {
            let quoted_sep = crate::sql::sql_quote(&sep.to_string());
            options.push_str(", sep=");
            options.push_str(&quoted_sep);
        }
        Some(options)
    };
    let source = match &csv_opts {
        Some(options) => format!("SELECT * FROM read_csv({input_literal}, {options})"),
        None => format!("SELECT COLUMNS(*)::VARCHAR FROM read_parquet({input_literal})"),
    };

    let first_attempt = run_duckdb(&source)?;
    let out = match csv_opts {
        // CSV only: one more try with parallel=false before giving up.
        Some(options) if !first_attempt.status.success() => run_duckdb(&format!(
            "SELECT * FROM read_csv({input_literal}, {options}, parallel=false)"
        ))?,
        _ => first_attempt,
    };
    if !out.status.success() {
        anyhow::bail!(
            "duckdb failed to read {:?}: {}",
            file,
            String::from_utf8_lossy(&out.stderr).trim()
        );
    }

    let mut reader = csv::ReaderBuilder::new().from_reader(out.stdout.as_slice());
    let headers: Vec<String> = reader.headers()?.iter().map(String::from).collect();
    let n_cols = headers.len();
    eprintln!("Found {} columns: {:?}", n_cols, headers);

    let mut columns: Vec<Vec<String>> = vec![Vec::new(); n_cols];
    let mut row_count = 0;
    for result in reader.records() {
        let record = result?;
        row_count += 1;
        // `zip` stops at the shorter side, so fields beyond the header width
        // are ignored.
        for (column, field) in columns.iter_mut().zip(record.iter()) {
            let trimmed = field.trim();
            if is_nullish(trimmed) {
                continue;
            }
            column.push(trimmed.to_string());
        }
    }
    Ok((headers, columns, row_count))
}
/// Returns the last dot-separated segment of a flattened JSON path, with any
/// trailing `[]` array markers removed first.
///
/// For example `"orders[].items[]"` yields `"items"`, and a path without dots
/// is returned as-is (minus trailing `[]`).
pub(crate) fn path_leaf(path: &str) -> String {
    let stripped = path.trim_end_matches("[]");
    stripped
        .rsplit_once('.')
        .map_or(stripped, |(_, leaf)| leaf)
        .to_owned()
}
/// Rebuilds a nested JSON object from flattened per-path profiling results.
///
/// Each profile is `(path, label, broad_type, confidence)`. Profiles whose
/// label is `"unknown"` are skipped. Every other profile becomes an object
/// with a `type` key, an optional `broad_type` key, and a `confidence` key
/// holding the confidence formatted as a percentage string with one decimal
/// place. That object is placed into the result at `path` via `insert_path`.
pub(crate) fn reconstruct_json_schema(
    profiles: &[(String, String, Option<String>, f32)],
) -> serde_json::Value {
    let mut root = serde_json::Map::new();

    let classified = profiles
        .iter()
        .filter(|(_, label, _, _)| label.as_str() != "unknown");

    for (name, label, broad_type, confidence) in classified {
        let mut info = serde_json::Map::new();
        info.insert("type".to_string(), json!(label));
        if let Some(bt) = broad_type {
            info.insert("broad_type".to_string(), json!(bt));
        }
        let percent = format!("{:.1}%", confidence * 100.0);
        info.insert("confidence".to_string(), json!(percent));

        insert_path(&mut root, name, serde_json::Value::Object(info));
    }

    serde_json::Value::Object(root)
}
/// Inserts `value` into `root` at a dot-separated `path`, creating
/// intermediate objects as needed.
///
/// A segment ending in `[]` denotes an array: it is stored under the segment
/// name (without the `[]`) as an object carrying `"_array": true`, with the
/// element description under `"_items"`. The `_array` marker is always set on
/// an array segment, including when an object already existed under that name.
///
/// * Plain leaf segment: `value` replaces whatever was stored under the key.
/// * Array leaf segment: `value` replaces `_items`.
/// * Non-leaf segment: the remainder of the path is inserted recursively into
///   the child object (or into `_items` for an array segment).
///
/// If an existing value that would need to be descended into is not a JSON
/// object, the insertion is silently skipped and `value` is dropped.
pub(crate) fn insert_path(
    root: &mut serde_json::Map<String, serde_json::Value>,
    path: &str,
    value: serde_json::Value,
) {
    // Split off only the first segment; the remainder is passed down as a
    // borrowed slice instead of being re-joined into a new String per level.
    let (key, rest) = match path.split_once('.') {
        Some((head, tail)) => (head, Some(tail)),
        None => (path, None),
    };

    match (key.strip_suffix("[]"), rest) {
        // Plain leaf: overwrite.
        (None, None) => {
            root.insert(key.to_string(), value);
        }
        // Plain intermediate object: descend.
        (None, Some(rest)) => {
            let child = root
                .entry(key.to_string())
                .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
            if let serde_json::Value::Object(obj) = child {
                insert_path(obj, rest, value);
            }
        }
        // Array segment, leaf or intermediate.
        (Some(name), rest) => {
            let wrapper = root
                .entry(name.to_string())
                .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
            if let serde_json::Value::Object(obj) = wrapper {
                // Set the marker unconditionally so a key first seen as a
                // plain object is still flagged once it is seen as an array.
                obj.insert("_array".to_string(), serde_json::Value::Bool(true));
                match rest {
                    None => {
                        obj.insert("_items".to_string(), value);
                    }
                    Some(rest) => {
                        let items = obj
                            .entry("_items".to_string())
                            .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
                        if let serde_json::Value::Object(items_map) = items {
                            insert_path(items_map, rest, value);
                        }
                    }
                }
            }
        }
    }
}