rsigma_runtime/sources/
file.rs1use std::path::Path;
4use std::time::Instant;
5
6use rsigma_eval::pipeline::sources::{DataFormat, ExtractExpr};
7
8use super::extract::apply_extract;
9use super::{ResolvedValue, SourceError, SourceErrorKind};
10
11pub async fn resolve_file(
13 path: &Path,
14 format: DataFormat,
15 extract_expr: Option<&ExtractExpr>,
16) -> Result<ResolvedValue, SourceError> {
17 let contents = tokio::fs::read_to_string(path)
18 .await
19 .map_err(|e| SourceError {
20 source_id: String::new(),
21 kind: SourceErrorKind::Fetch(format!("failed to read {}: {e}", path.display())),
22 })?;
23
24 let parsed = parse_data(&contents, format)?;
25
26 let data = if let Some(expr) = extract_expr {
27 apply_extract(&parsed, expr)?
28 } else {
29 parsed
30 };
31
32 Ok(ResolvedValue {
33 data,
34 resolved_at: Instant::now(),
35 from_cache: false,
36 })
37}
38
39pub fn parse_data(raw: &str, format: DataFormat) -> Result<serde_json::Value, SourceError> {
41 match format {
42 DataFormat::Json => serde_json::from_str(raw).map_err(|e| SourceError {
43 source_id: String::new(),
44 kind: SourceErrorKind::Parse(format!("invalid JSON: {e}")),
45 }),
46 DataFormat::Yaml => {
47 let yaml: yaml_serde::Value = yaml_serde::from_str(raw).map_err(|e| SourceError {
48 source_id: String::new(),
49 kind: SourceErrorKind::Parse(format!("invalid YAML: {e}")),
50 })?;
51 Ok(super::yaml_value_to_json(&yaml))
52 }
53 DataFormat::Lines => {
54 let lines: Vec<serde_json::Value> = raw
55 .lines()
56 .filter(|l| !l.is_empty())
57 .map(|l| serde_json::Value::String(l.to_string()))
58 .collect();
59 Ok(serde_json::Value::Array(lines))
60 }
61 DataFormat::Csv => {
62 let mut reader = csv::ReaderBuilder::new()
63 .has_headers(true)
64 .from_reader(raw.as_bytes());
65 let headers: Vec<String> = reader
66 .headers()
67 .map_err(|e| SourceError {
68 source_id: String::new(),
69 kind: SourceErrorKind::Parse(format!("CSV header error: {e}")),
70 })?
71 .iter()
72 .map(|h| h.to_string())
73 .collect();
74
75 let mut rows = Vec::new();
76 for result in reader.records() {
77 let record = result.map_err(|e| SourceError {
78 source_id: String::new(),
79 kind: SourceErrorKind::Parse(format!("CSV row error: {e}")),
80 })?;
81 let obj: serde_json::Map<String, serde_json::Value> = headers
82 .iter()
83 .zip(record.iter())
84 .map(|(h, v)| (h.clone(), serde_json::Value::String(v.to_string())))
85 .collect();
86 rows.push(serde_json::Value::Object(obj));
87 }
88 Ok(serde_json::Value::Array(rows))
89 }
90 }
91}