a3s_flow/nodes/
csv_parse.rs1use async_trait::async_trait;
59use csv::ReaderBuilder;
60use serde_json::{json, Value};
61
62use crate::condition::get_path;
63use crate::error::{FlowError, Result};
64use crate::node::{ExecContext, Node};
65
/// Flow node registered as `"csv-parse"`: turns CSV text taken from an upstream
/// node's output into a JSON array of rows.
///
/// The node carries no state of its own; everything it needs arrives through the
/// `ExecContext` passed to `execute`.
#[derive(Debug, Clone, Copy, Default)]
pub struct CsvParseNode;
68
69#[async_trait]
70impl Node for CsvParseNode {
71 fn node_type(&self) -> &str {
72 "csv-parse"
73 }
74
75 async fn execute(&self, ctx: ExecContext) -> Result<Value> {
76 let input_selector = ctx.data["input_selector"].as_str().ok_or_else(|| {
78 FlowError::InvalidDefinition("csv-parse: missing data.input_selector".into())
79 })?;
80
81 let csv_text = resolve_input_text(&ctx.inputs, input_selector)?;
82
83 let delimiter = ctx.data["delimiter"]
85 .as_str()
86 .and_then(|s| s.chars().next())
87 .unwrap_or(',');
88
89 let has_header = ctx.data["has_header"].as_bool().unwrap_or(true);
90
91 let output = if has_header {
93 parse_csv_with_header(&csv_text, delimiter)?
94 } else {
95 parse_csv_without_header(&csv_text, delimiter)?
96 };
97
98 Ok(json!({ "output": output }))
99 }
100}
101
102fn resolve_input_text(
106 inputs: &std::collections::HashMap<String, Value>,
107 selector: &str,
108) -> Result<String> {
109 let parts: Vec<&str> = selector.split('.').collect();
110 if parts.is_empty() {
111 return Err(FlowError::InvalidDefinition(
112 "csv-parse: input_selector cannot be empty".into(),
113 ));
114 }
115
116 let node_id = parts[0];
117 let node_output = inputs.get(node_id).ok_or_else(|| {
118 FlowError::Internal(format!(
119 "csv-parse: upstream node '{}' not found in inputs",
120 node_id
121 ))
122 })?;
123
124 let value = if parts.len() == 1 {
125 node_output
126 } else {
127 let path = parts[1..].join(".");
128 get_path(node_output, &path).unwrap_or(&Value::Null)
129 };
130
131 value.as_str().map(|s| s.to_string()).ok_or_else(|| {
132 FlowError::InvalidDefinition(format!(
133 "csv-parse: input at '{}' is not a string",
134 selector
135 ))
136 })
137}
138
139fn parse_csv_with_header(csv_text: &str, delimiter: char) -> Result<Vec<Value>> {
141 let mut reader = ReaderBuilder::new()
142 .delimiter(delimiter as u8)
143 .from_reader(csv_text.as_bytes());
144
145 let headers = reader
146 .headers()
147 .map_err(|e| FlowError::Internal(format!("csv-parse: failed to read headers: {}", e)))?
148 .iter()
149 .map(|h| h.to_string())
150 .collect::<Vec<_>>();
151
152 let mut output = Vec::new();
153
154 for result in reader.records() {
155 let record = result
156 .map_err(|e| FlowError::Internal(format!("csv-parse: failed to read record: {}", e)))?;
157
158 let mut obj = serde_json::Map::new();
159 for (i, field) in record.iter().enumerate() {
160 let key = headers.get(i).map(|s| s.as_str()).unwrap_or("");
161 obj.insert(key.to_string(), Value::String(field.to_string()));
162 }
163 output.push(Value::Object(obj));
164 }
165
166 Ok(output)
167}
168
169fn parse_csv_without_header(csv_text: &str, delimiter: char) -> Result<Vec<Value>> {
171 let mut reader = ReaderBuilder::new()
172 .delimiter(delimiter as u8)
173 .has_headers(false)
174 .from_reader(csv_text.as_bytes());
175
176 let mut output = Vec::new();
177
178 for result in reader.records() {
179 let record = result
180 .map_err(|e| FlowError::Internal(format!("csv-parse: failed to read record: {}", e)))?;
181
182 let row: Vec<Value> = record
183 .iter()
184 .map(|field| Value::String(field.to_string()))
185 .collect();
186
187 output.push(Value::Array(row));
188 }
189
190 Ok(output)
191}
192
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds a context holding `data` as node data and a single upstream output.
    fn ctx_with_input(data: Value, input_node: &str, input_value: Value) -> ExecContext {
        let inputs: HashMap<String, Value> =
            std::iter::once((input_node.to_string(), input_value)).collect();
        ExecContext {
            data,
            inputs,
            ..Default::default()
        }
    }

    /// Runs the node against an upstream node named `fetch` and returns the
    /// parsed rows, panicking if execution fails or `output` is not an array.
    async fn rows_from_fetch(data: Value, upstream: Value) -> Vec<Value> {
        let out = CsvParseNode
            .execute(ctx_with_input(data, "fetch", upstream))
            .await
            .unwrap();
        out["output"].as_array().unwrap().clone()
    }

    /// Runs the node on `ctx` and returns the error it is expected to produce.
    async fn error_from(ctx: ExecContext) -> FlowError {
        CsvParseNode.execute(ctx).await.unwrap_err()
    }

    #[tokio::test]
    async fn parses_csv_with_header() {
        let rows = rows_from_fetch(
            json!({ "input_selector": "fetch" }),
            json!("name,age,city\nAlice,30,NYC\nBob,25,LA"),
        )
        .await;

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], "30");
        assert_eq!(rows[0]["city"], "NYC");
        assert_eq!(rows[1]["name"], "Bob");
    }

    #[tokio::test]
    async fn parses_csv_without_header() {
        let rows = rows_from_fetch(
            json!({ "input_selector": "fetch", "has_header": false }),
            json!("Alice,30,NYC\nBob,25,LA"),
        )
        .await;

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0], json!(["Alice", "30", "NYC"]));
        assert_eq!(rows[1], json!(["Bob", "25", "LA"]));
    }

    #[tokio::test]
    async fn parses_csv_with_custom_delimiter() {
        let rows = rows_from_fetch(
            json!({ "input_selector": "fetch", "delimiter": ";" }),
            json!("name;age;city\nAlice;30;NYC\nBob;25;LA"),
        )
        .await;

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[0]["age"], "30");
    }

    #[tokio::test]
    async fn parses_csv_from_nested_path() {
        let rows = rows_from_fetch(
            json!({ "input_selector": "fetch.body" }),
            json!({ "body": "name,age\nAlice,30" }),
        )
        .await;

        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
    }

    #[tokio::test]
    async fn empty_csv_returns_empty_array() {
        let rows = rows_from_fetch(json!({ "input_selector": "fetch" }), json!("name,age\n")).await;

        assert_eq!(rows.len(), 0);
    }

    #[tokio::test]
    async fn missing_input_selector_returns_error() {
        let ctx = ExecContext {
            data: json!({}),
            ..Default::default()
        };

        let err = error_from(ctx).await;
        assert!(matches!(err, FlowError::InvalidDefinition(_)));
    }

    #[tokio::test]
    async fn non_string_input_returns_error() {
        let ctx = ctx_with_input(json!({ "input_selector": "fetch" }), "fetch", json!(123));

        let err = error_from(ctx).await;
        assert!(matches!(err, FlowError::InvalidDefinition(_)));
    }

    #[tokio::test]
    async fn missing_upstream_node_returns_error() {
        let ctx = ExecContext {
            data: json!({ "input_selector": "nonexistent" }),
            ..Default::default()
        };

        let err = error_from(ctx).await;
        assert!(matches!(err, FlowError::Internal(_)));
    }
}