Skip to main content

a3s_flow/nodes/
csv_parse.rs

1//! `"csv-parse"` node — parse CSV text into a JSON array.
2//!
3//! Parses CSV text from an upstream node output or variable into a JSON array.
4//! Supports configurable delimiter and header row handling.
5//!
6//! # Config schema
7//!
8//! ```json
9//! {
10//!   "input_selector": "fetch.body",
11//!   "delimiter": ",",
12//!   "has_header": true
13//! }
14//! ```
15//!
16//! | Field | Type | Required | Description |
17//! |-------|------|:--------:|-------------|
18//! | `input_selector` | string | ✅ | Dot path into upstream inputs: `"node_id"` or `"node_id.field.subfield"` |
//! | `delimiter` | string | — | Field delimiter; only the first character of the string is used (default: `","`) |
20//! | `has_header` | boolean | — | Whether the first row is a header (default: `true`) |
21//!
22//! # Output schema
23//!
24//! **With header (`has_header: true`, default):**
25//! ```json
26//! {
27//!   "output": [
28//!     { "name": "Alice", "age": "30", "city": "NYC" },
29//!     { "name": "Bob", "age": "25", "city": "LA" }
30//!   ]
31//! }
32//! ```
33//!
34//! **Without header (`has_header: false`):**
35//! ```json
36//! {
37//!   "output": [
38//!     ["Alice", "30", "NYC"],
39//!     ["Bob", "25", "LA"]
40//!   ]
41//! }
42//! ```
43//!
44//! # Example
45//!
46//! ```json
47//! {
48//!   "id": "parse_csv",
49//!   "type": "csv-parse",
50//!   "data": {
51//!     "input_selector": "fetch.body",
52//!     "delimiter": ",",
53//!     "has_header": true
54//!   }
55//! }
56//! ```
57
58use async_trait::async_trait;
59use csv::ReaderBuilder;
60use serde_json::{json, Value};
61
62use crate::condition::get_path;
63use crate::error::{FlowError, Result};
64use crate::node::{ExecContext, Node};
65
/// CSV parse node — converts CSV text into a JSON array.
///
/// This is a stateless unit struct: everything the node needs (selector,
/// delimiter, header flag) is read from the execution context at run time.
#[derive(Debug, Clone, Copy, Default)]
pub struct CsvParseNode;
68
69#[async_trait]
70impl Node for CsvParseNode {
71    fn node_type(&self) -> &str {
72        "csv-parse"
73    }
74
75    async fn execute(&self, ctx: ExecContext) -> Result<Value> {
76        // ── Resolve input CSV text ────────────────────────────────────────
77        let input_selector = ctx.data["input_selector"].as_str().ok_or_else(|| {
78            FlowError::InvalidDefinition("csv-parse: missing data.input_selector".into())
79        })?;
80
81        let csv_text = resolve_input_text(&ctx.inputs, input_selector)?;
82
83        // ── Parse configuration ───────────────────────────────────────────
84        let delimiter = ctx.data["delimiter"]
85            .as_str()
86            .and_then(|s| s.chars().next())
87            .unwrap_or(',');
88
89        let has_header = ctx.data["has_header"].as_bool().unwrap_or(true);
90
91        // ── Parse CSV ─────────────────────────────────────────────────────
92        let output = if has_header {
93            parse_csv_with_header(&csv_text, delimiter)?
94        } else {
95            parse_csv_without_header(&csv_text, delimiter)?
96        };
97
98        Ok(json!({ "output": output }))
99    }
100}
101
102// ── Helper functions ──────────────────────────────────────────────────────
103
104/// Resolve the input CSV text from upstream inputs using a dot-separated path.
105fn resolve_input_text(
106    inputs: &std::collections::HashMap<String, Value>,
107    selector: &str,
108) -> Result<String> {
109    let parts: Vec<&str> = selector.split('.').collect();
110    if parts.is_empty() {
111        return Err(FlowError::InvalidDefinition(
112            "csv-parse: input_selector cannot be empty".into(),
113        ));
114    }
115
116    let node_id = parts[0];
117    let node_output = inputs.get(node_id).ok_or_else(|| {
118        FlowError::Internal(format!(
119            "csv-parse: upstream node '{}' not found in inputs",
120            node_id
121        ))
122    })?;
123
124    let value = if parts.len() == 1 {
125        node_output
126    } else {
127        let path = parts[1..].join(".");
128        get_path(node_output, &path).unwrap_or(&Value::Null)
129    };
130
131    value.as_str().map(|s| s.to_string()).ok_or_else(|| {
132        FlowError::InvalidDefinition(format!(
133            "csv-parse: input at '{}' is not a string",
134            selector
135        ))
136    })
137}
138
139/// Parse CSV with header row — returns array of objects.
140fn parse_csv_with_header(csv_text: &str, delimiter: char) -> Result<Vec<Value>> {
141    let mut reader = ReaderBuilder::new()
142        .delimiter(delimiter as u8)
143        .from_reader(csv_text.as_bytes());
144
145    let headers = reader
146        .headers()
147        .map_err(|e| FlowError::Internal(format!("csv-parse: failed to read headers: {}", e)))?
148        .iter()
149        .map(|h| h.to_string())
150        .collect::<Vec<_>>();
151
152    let mut output = Vec::new();
153
154    for result in reader.records() {
155        let record = result
156            .map_err(|e| FlowError::Internal(format!("csv-parse: failed to read record: {}", e)))?;
157
158        let mut obj = serde_json::Map::new();
159        for (i, field) in record.iter().enumerate() {
160            let key = headers.get(i).map(|s| s.as_str()).unwrap_or("");
161            obj.insert(key.to_string(), Value::String(field.to_string()));
162        }
163        output.push(Value::Object(obj));
164    }
165
166    Ok(output)
167}
168
169/// Parse CSV without header row — returns array of arrays.
170fn parse_csv_without_header(csv_text: &str, delimiter: char) -> Result<Vec<Value>> {
171    let mut reader = ReaderBuilder::new()
172        .delimiter(delimiter as u8)
173        .has_headers(false)
174        .from_reader(csv_text.as_bytes());
175
176    let mut output = Vec::new();
177
178    for result in reader.records() {
179        let record = result
180            .map_err(|e| FlowError::Internal(format!("csv-parse: failed to read record: {}", e)))?;
181
182        let row: Vec<Value> = record
183            .iter()
184            .map(|field| Value::String(field.to_string()))
185            .collect();
186
187        output.push(Value::Array(row));
188    }
189
190    Ok(output)
191}
192
193// ── Tests ─────────────────────────────────────────────────────────────────
194
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Build an execution context with `data` as the node config and a single
    /// upstream input registered under `input_node`.
    fn ctx_with_input(data: Value, input_node: &str, input_value: Value) -> ExecContext {
        let inputs: HashMap<String, Value> =
            std::iter::once((input_node.to_owned(), input_value)).collect();
        ExecContext {
            data,
            inputs,
            ..Default::default()
        }
    }

    /// Run the node against one upstream input named `fetch` and return the
    /// rows of its `output` array, panicking if execution fails.
    async fn parsed_rows(data: Value, upstream: Value) -> Vec<Value> {
        let result = CsvParseNode
            .execute(ctx_with_input(data, "fetch", upstream))
            .await
            .unwrap();
        result["output"].as_array().unwrap().clone()
    }

    #[tokio::test]
    async fn parses_csv_with_header() {
        let rows = parsed_rows(
            json!({ "input_selector": "fetch" }),
            json!("name,age,city\nAlice,30,NYC\nBob,25,LA"),
        )
        .await;

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], "30");
        assert_eq!(rows[0]["city"], "NYC");
        assert_eq!(rows[1]["name"], "Bob");
    }

    #[tokio::test]
    async fn parses_csv_without_header() {
        let rows = parsed_rows(
            json!({ "input_selector": "fetch", "has_header": false }),
            json!("Alice,30,NYC\nBob,25,LA"),
        )
        .await;

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0], json!(["Alice", "30", "NYC"]));
        assert_eq!(rows[1], json!(["Bob", "25", "LA"]));
    }

    #[tokio::test]
    async fn parses_csv_with_custom_delimiter() {
        let rows = parsed_rows(
            json!({ "input_selector": "fetch", "delimiter": ";" }),
            json!("name;age;city\nAlice;30;NYC\nBob;25;LA"),
        )
        .await;

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], "30");
    }

    #[tokio::test]
    async fn parses_csv_from_nested_path() {
        let rows = parsed_rows(
            json!({ "input_selector": "fetch.body" }),
            json!({ "body": "name,age\nAlice,30" }),
        )
        .await;

        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
    }

    #[tokio::test]
    async fn empty_csv_returns_empty_array() {
        // Header row only: there are no data records to emit.
        let rows = parsed_rows(json!({ "input_selector": "fetch" }), json!("name,age\n")).await;

        assert_eq!(rows.len(), 0);
    }

    #[tokio::test]
    async fn missing_input_selector_returns_error() {
        let ctx = ExecContext {
            data: json!({}),
            ..Default::default()
        };

        let outcome = CsvParseNode.execute(ctx).await;

        assert!(matches!(outcome, Err(FlowError::InvalidDefinition(_))));
    }

    #[tokio::test]
    async fn non_string_input_returns_error() {
        let ctx = ctx_with_input(json!({ "input_selector": "fetch" }), "fetch", json!(123));

        let outcome = CsvParseNode.execute(ctx).await;

        assert!(matches!(outcome, Err(FlowError::InvalidDefinition(_))));
    }

    #[tokio::test]
    async fn missing_upstream_node_returns_error() {
        let ctx = ExecContext {
            data: json!({ "input_selector": "nonexistent" }),
            ..Default::default()
        };

        let outcome = CsvParseNode.execute(ctx).await;

        assert!(matches!(outcome, Err(FlowError::Internal(_))));
    }
}