use std::path::Path;
use std::time::Instant;
use rsigma_eval::pipeline::sources::{DataFormat, ExtractExpr};
use super::extract::apply_extract;
use super::{ResolvedValue, SourceError, SourceErrorKind};
pub async fn resolve_file(
path: &Path,
format: DataFormat,
extract_expr: Option<&ExtractExpr>,
) -> Result<ResolvedValue, SourceError> {
let contents = tokio::fs::read_to_string(path)
.await
.map_err(|e| SourceError {
source_id: String::new(),
kind: SourceErrorKind::Fetch(format!("failed to read {}: {e}", path.display())),
})?;
let parsed = parse_data(&contents, format)?;
let data = if let Some(expr) = extract_expr {
apply_extract(&parsed, expr)?
} else {
parsed
};
Ok(ResolvedValue {
data,
resolved_at: Instant::now(),
from_cache: false,
})
}
pub fn parse_data(raw: &str, format: DataFormat) -> Result<serde_json::Value, SourceError> {
match format {
DataFormat::Json => serde_json::from_str(raw).map_err(|e| SourceError {
source_id: String::new(),
kind: SourceErrorKind::Parse(format!("invalid JSON: {e}")),
}),
DataFormat::Yaml => {
let yaml: yaml_serde::Value = yaml_serde::from_str(raw).map_err(|e| SourceError {
source_id: String::new(),
kind: SourceErrorKind::Parse(format!("invalid YAML: {e}")),
})?;
Ok(super::yaml_value_to_json(&yaml))
}
DataFormat::Lines => {
let lines: Vec<serde_json::Value> = raw
.lines()
.filter(|l| !l.is_empty())
.map(|l| serde_json::Value::String(l.to_string()))
.collect();
Ok(serde_json::Value::Array(lines))
}
DataFormat::Csv => {
let mut reader = csv::ReaderBuilder::new()
.has_headers(true)
.from_reader(raw.as_bytes());
let headers: Vec<String> = reader
.headers()
.map_err(|e| SourceError {
source_id: String::new(),
kind: SourceErrorKind::Parse(format!("CSV header error: {e}")),
})?
.iter()
.map(|h| h.to_string())
.collect();
let mut rows = Vec::new();
for result in reader.records() {
let record = result.map_err(|e| SourceError {
source_id: String::new(),
kind: SourceErrorKind::Parse(format!("CSV row error: {e}")),
})?;
let obj: serde_json::Map<String, serde_json::Value> = headers
.iter()
.zip(record.iter())
.map(|(h, v)| (h.clone(), serde_json::Value::String(v.to_string())))
.collect();
rows.push(serde_json::Value::Object(obj));
}
Ok(serde_json::Value::Array(rows))
}
}
}