Skip to main content

rsigma_runtime/sources/
file.rs

1//! File source resolver: reads data from local files.
2
3use std::path::Path;
4use std::time::Instant;
5
6use rsigma_eval::pipeline::sources::{DataFormat, ExtractExpr};
7
8use super::extract::apply_extract;
9use super::{ResolvedValue, SourceError, SourceErrorKind};
10
11/// Resolve a file source by reading and parsing the file at `path`.
12pub async fn resolve_file(
13    path: &Path,
14    format: DataFormat,
15    extract_expr: Option<&ExtractExpr>,
16) -> Result<ResolvedValue, SourceError> {
17    let contents = tokio::fs::read_to_string(path)
18        .await
19        .map_err(|e| SourceError {
20            source_id: String::new(),
21            kind: SourceErrorKind::Fetch(format!("failed to read {}: {e}", path.display())),
22        })?;
23
24    let parsed = parse_data(&contents, format)?;
25
26    let data = if let Some(expr) = extract_expr {
27        apply_extract(&parsed, expr)?
28    } else {
29        parsed
30    };
31
32    Ok(ResolvedValue {
33        data,
34        resolved_at: Instant::now(),
35        from_cache: false,
36    })
37}
38
39/// Parse raw string data according to the specified format.
40pub fn parse_data(raw: &str, format: DataFormat) -> Result<serde_json::Value, SourceError> {
41    match format {
42        DataFormat::Json => serde_json::from_str(raw).map_err(|e| SourceError {
43            source_id: String::new(),
44            kind: SourceErrorKind::Parse(format!("invalid JSON: {e}")),
45        }),
46        DataFormat::Yaml => {
47            let yaml: serde_yaml::Value = serde_yaml::from_str(raw).map_err(|e| SourceError {
48                source_id: String::new(),
49                kind: SourceErrorKind::Parse(format!("invalid YAML: {e}")),
50            })?;
51            Ok(super::yaml_value_to_json(&yaml))
52        }
53        DataFormat::Lines => {
54            let lines: Vec<serde_json::Value> = raw
55                .lines()
56                .filter(|l| !l.is_empty())
57                .map(|l| serde_json::Value::String(l.to_string()))
58                .collect();
59            Ok(serde_json::Value::Array(lines))
60        }
61        DataFormat::Csv => {
62            let mut reader = csv::ReaderBuilder::new()
63                .has_headers(true)
64                .from_reader(raw.as_bytes());
65            let headers: Vec<String> = reader
66                .headers()
67                .map_err(|e| SourceError {
68                    source_id: String::new(),
69                    kind: SourceErrorKind::Parse(format!("CSV header error: {e}")),
70                })?
71                .iter()
72                .map(|h| h.to_string())
73                .collect();
74
75            let mut rows = Vec::new();
76            for result in reader.records() {
77                let record = result.map_err(|e| SourceError {
78                    source_id: String::new(),
79                    kind: SourceErrorKind::Parse(format!("CSV row error: {e}")),
80                })?;
81                let obj: serde_json::Map<String, serde_json::Value> = headers
82                    .iter()
83                    .zip(record.iter())
84                    .map(|(h, v)| (h.clone(), serde_json::Value::String(v.to_string())))
85                    .collect();
86                rows.push(serde_json::Value::Object(obj));
87            }
88            Ok(serde_json::Value::Array(rows))
89        }
90    }
91}