Skip to main content

argentor_builtins/
csv_processor.rs

//! CSV processing skill for the Argentor AI agent framework.
//!
//! Provides CSV parsing, column selection, filtering, sorting, statistics,
//! and format conversion (CSV to JSON and back).
5
6use argentor_core::{ArgentorResult, ToolCall, ToolResult};
7use argentor_skills::skill::{Skill, SkillDescriptor};
8use async_trait::async_trait;
9use serde_json::{json, Value};
10
11/// CSV processing skill with parsing, filtering, sorting, and conversion.
12pub struct CsvProcessorSkill {
13    descriptor: SkillDescriptor,
14}
15
16impl CsvProcessorSkill {
17    /// Create a new CSV processor skill.
18    pub fn new() -> Self {
19        Self {
20            descriptor: SkillDescriptor {
21                name: "csv_processor".to_string(),
22                description: "CSV parsing, column selection, filtering, sorting, statistics, and CSV/JSON conversion.".to_string(),
23                parameters_schema: json!({
24                    "type": "object",
25                    "properties": {
26                        "operation": {
27                            "type": "string",
28                            "enum": ["parse", "to_json", "from_json", "select_columns", "filter", "sort", "statistics", "count_rows", "headers"],
29                            "description": "The CSV operation to perform"
30                        },
31                        "csv": {
32                            "type": "string",
33                            "description": "CSV content to process"
34                        },
35                        "json_data": {
36                            "type": "array",
37                            "description": "JSON array of objects to convert to CSV"
38                        },
39                        "columns": {
40                            "type": "array",
41                            "items": { "type": "string" },
42                            "description": "Column names to select"
43                        },
44                        "column": {
45                            "type": "string",
46                            "description": "Column name for filter/sort/statistics"
47                        },
48                        "value": {
49                            "type": "string",
50                            "description": "Value to filter by"
51                        },
52                        "delimiter": {
53                            "type": "string",
54                            "description": "Delimiter character (default: comma)"
55                        },
56                        "ascending": {
57                            "type": "boolean",
58                            "description": "Sort ascending (default: true)"
59                        }
60                    },
61                    "required": ["operation"]
62                }),
63                required_capabilities: vec![],
64                requires_approval: false,
65            },
66        }
67    }
68}
69
70impl Default for CsvProcessorSkill {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
/// Parse CSV text into rows (vector of vectors).
///
/// Behaviour:
/// - Records end at `\n` or `\r\n`; records made only of whitespace are skipped.
/// - Unquoted fields are trimmed of surrounding whitespace.
/// - A field whose first non-whitespace character is `"` is treated as quoted:
///   it may contain the delimiter, line breaks, and `""` (an escaped quote),
///   and its content is kept verbatim (not trimmed).
/// - A `"` that appears in the middle of an unquoted field is kept literally.
/// - An unterminated quoted field runs to the end of the input.
///
/// For input without quoted fields the result is the same as splitting each
/// non-blank line on `delimiter` and trimming every cell.
fn parse_csv(csv: &str, delimiter: char) -> Vec<Vec<String>> {
    /// Moves the accumulated field text into `row`, trimming it unless the
    /// field was quoted, and resets the per-field quote flag.
    fn finish_field(row: &mut Vec<String>, field: &mut String, was_quoted: &mut bool) {
        let text = std::mem::take(field);
        row.push(if *was_quoted {
            text
        } else {
            text.trim().to_string()
        });
        *was_quoted = false;
    }

    let mut rows: Vec<Vec<String>> = Vec::new();
    let mut row: Vec<String> = Vec::new();
    let mut field = String::new();
    let mut in_quotes = false; // currently between an opening and closing quote
    let mut was_quoted = false; // current field had a quoted section
    let mut has_content = false; // current record holds a non-whitespace char

    let mut chars = csv.chars().peekable();
    while let Some(c) = chars.next() {
        if !c.is_whitespace() {
            has_content = true;
        }

        if in_quotes {
            if c != '"' {
                field.push(c);
            } else if chars.peek() == Some(&'"') {
                // `""` inside a quoted field is an escaped quote.
                chars.next();
                field.push('"');
            } else {
                in_quotes = false;
            }
            continue;
        }

        // Line endings are checked before the delimiter so that a newline is
        // always a record separator, as it was with `str::lines`.
        let is_newline = c == '\n' || (c == '\r' && chars.peek() == Some(&'\n'));
        if is_newline {
            if c == '\r' {
                chars.next(); // consume the `\n` of a `\r\n` pair
            }
            if has_content {
                finish_field(&mut row, &mut field, &mut was_quoted);
                rows.push(std::mem::take(&mut row));
            } else {
                // Whitespace-only record: discard whatever was accumulated.
                field.clear();
                row.clear();
                was_quoted = false;
            }
            has_content = false;
        } else if c == delimiter {
            finish_field(&mut row, &mut field, &mut was_quoted);
        } else if c == '"' && !was_quoted && field.trim().is_empty() {
            // Opening quote at the start of a field (leading blanks ignored).
            field.clear();
            in_quotes = true;
            was_quoted = true;
        } else if was_quoted && c.is_whitespace() {
            // Padding between a closing quote and the next delimiter.
        } else {
            field.push(c);
        }
    }

    // Final record when the input does not end with a newline.
    if has_content {
        finish_field(&mut row, &mut field, &mut was_quoted);
        rows.push(row);
    }
    rows
}
87
88/// Convert parsed CSV rows (with header) to JSON array of objects.
89fn rows_to_json(rows: &[Vec<String>]) -> Value {
90    if rows.is_empty() {
91        return json!([]);
92    }
93    let headers = &rows[0];
94    let objects: Vec<Value> = rows[1..]
95        .iter()
96        .map(|row| {
97            let mut obj = serde_json::Map::new();
98            for (i, header) in headers.iter().enumerate() {
99                let val = row.get(i).map(std::string::String::as_str).unwrap_or("");
100                obj.insert(header.clone(), Value::String(val.to_string()));
101            }
102            Value::Object(obj)
103        })
104        .collect();
105    Value::Array(objects)
106}
107
108/// Convert JSON array of objects to CSV string.
109fn json_to_csv(data: &[Value], delimiter: char) -> Result<String, String> {
110    if data.is_empty() {
111        return Ok(String::new());
112    }
113    let first = data[0]
114        .as_object()
115        .ok_or("Each JSON element must be an object")?;
116    let headers: Vec<&String> = first.keys().collect();
117    let mut result = headers
118        .iter()
119        .map(|h| h.as_str())
120        .collect::<Vec<_>>()
121        .join(&delimiter.to_string());
122    result.push('\n');
123
124    for item in data {
125        let obj = item
126            .as_object()
127            .ok_or("Each JSON element must be an object")?;
128        let row: Vec<String> = headers
129            .iter()
130            .map(|h| {
131                obj.get(*h)
132                    .map(|v| match v {
133                        Value::String(s) => s.clone(),
134                        other => other.to_string(),
135                    })
136                    .unwrap_or_default()
137            })
138            .collect();
139        result.push_str(&row.join(&delimiter.to_string()));
140        result.push('\n');
141    }
142    Ok(result.trim_end().to_string())
143}
144
145#[async_trait]
146impl Skill for CsvProcessorSkill {
147    fn descriptor(&self) -> &SkillDescriptor {
148        &self.descriptor
149    }
150
151    async fn execute(&self, call: ToolCall) -> ArgentorResult<ToolResult> {
152        let operation = match call.arguments["operation"].as_str() {
153            Some(op) => op,
154            None => {
155                return Ok(ToolResult::error(
156                    &call.id,
157                    "Missing required parameter: 'operation'",
158                ))
159            }
160        };
161
162        let delimiter = call.arguments["delimiter"]
163            .as_str()
164            .and_then(|s| s.chars().next())
165            .unwrap_or(',');
166
167        match operation {
168            "parse" => {
169                let csv = match call.arguments["csv"].as_str() {
170                    Some(v) => v,
171                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'csv'")),
172                };
173                let rows = parse_csv(csv, delimiter);
174                let response = json!({
175                    "rows": rows,
176                    "row_count": rows.len(),
177                    "column_count": rows.first().map(std::vec::Vec::len).unwrap_or(0)
178                });
179                Ok(ToolResult::success(&call.id, response.to_string()))
180            }
181            "to_json" => {
182                let csv = match call.arguments["csv"].as_str() {
183                    Some(v) => v,
184                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'csv'")),
185                };
186                let rows = parse_csv(csv, delimiter);
187                let json_data = rows_to_json(&rows);
188                let response = json!({
189                    "data": json_data,
190                    "record_count": if rows.len() > 1 { rows.len() - 1 } else { 0 }
191                });
192                Ok(ToolResult::success(&call.id, response.to_string()))
193            }
194            "from_json" => {
195                let json_data = match call.arguments["json_data"].as_array() {
196                    Some(v) => v,
197                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'json_data' (array of objects)")),
198                };
199                match json_to_csv(json_data, delimiter) {
200                    Ok(csv) => {
201                        let response = json!({ "csv": csv });
202                        Ok(ToolResult::success(&call.id, response.to_string()))
203                    }
204                    Err(e) => Ok(ToolResult::error(&call.id, e)),
205                }
206            }
207            "select_columns" => {
208                let csv = match call.arguments["csv"].as_str() {
209                    Some(v) => v,
210                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'csv'")),
211                };
212                let columns: Vec<String> = match call.arguments["columns"].as_array() {
213                    Some(arr) => arr.iter().filter_map(|v| v.as_str().map(String::from)).collect(),
214                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'columns'")),
215                };
216                let rows = parse_csv(csv, delimiter);
217                if rows.is_empty() {
218                    return Ok(ToolResult::success(&call.id, json!({"rows": []}).to_string()));
219                }
220                let headers = &rows[0];
221                let indices: Vec<usize> = columns
222                    .iter()
223                    .filter_map(|c| headers.iter().position(|h| h == c))
224                    .collect();
225                let selected: Vec<Vec<String>> = rows
226                    .iter()
227                    .map(|row| indices.iter().filter_map(|&i| row.get(i).cloned()).collect())
228                    .collect();
229                let response = json!({
230                    "rows": selected,
231                    "selected_columns": columns
232                });
233                Ok(ToolResult::success(&call.id, response.to_string()))
234            }
235            "filter" => {
236                let csv = match call.arguments["csv"].as_str() {
237                    Some(v) => v,
238                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'csv'")),
239                };
240                let column = match call.arguments["column"].as_str() {
241                    Some(v) => v,
242                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'column'")),
243                };
244                let value = match call.arguments["value"].as_str() {
245                    Some(v) => v,
246                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'value'")),
247                };
248                let rows = parse_csv(csv, delimiter);
249                if rows.is_empty() {
250                    return Ok(ToolResult::success(&call.id, json!({"rows": [], "match_count": 0}).to_string()));
251                }
252                let headers = &rows[0];
253                let col_idx = match headers.iter().position(|h| h == column) {
254                    Some(i) => i,
255                    None => return Ok(ToolResult::error(&call.id, format!("Column '{column}' not found"))),
256                };
257                let mut filtered = vec![rows[0].clone()];
258                for row in &rows[1..] {
259                    if row.get(col_idx).map(|v| v == value).unwrap_or(false) {
260                        filtered.push(row.clone());
261                    }
262                }
263                let match_count = filtered.len() - 1;
264                let response = json!({
265                    "rows": filtered,
266                    "match_count": match_count
267                });
268                Ok(ToolResult::success(&call.id, response.to_string()))
269            }
270            "sort" => {
271                let csv = match call.arguments["csv"].as_str() {
272                    Some(v) => v,
273                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'csv'")),
274                };
275                let column = match call.arguments["column"].as_str() {
276                    Some(v) => v,
277                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'column'")),
278                };
279                let ascending = call.arguments["ascending"].as_bool().unwrap_or(true);
280                let rows = parse_csv(csv, delimiter);
281                if rows.len() < 2 {
282                    return Ok(ToolResult::success(&call.id, json!({"rows": rows}).to_string()));
283                }
284                let headers = &rows[0];
285                let col_idx = match headers.iter().position(|h| h == column) {
286                    Some(i) => i,
287                    None => return Ok(ToolResult::error(&call.id, format!("Column '{column}' not found"))),
288                };
289                let mut data_rows: Vec<Vec<String>> = rows[1..].to_vec();
290                data_rows.sort_by(|a, b| {
291                    let va = a.get(col_idx).map(std::string::String::as_str).unwrap_or("");
292                    let vb = b.get(col_idx).map(std::string::String::as_str).unwrap_or("");
293                    // Try numeric comparison first
294                    if let (Ok(na), Ok(nb)) = (va.parse::<f64>(), vb.parse::<f64>()) {
295                        let cmp = na.partial_cmp(&nb).unwrap_or(std::cmp::Ordering::Equal);
296                        if ascending { cmp } else { cmp.reverse() }
297                    } else {
298                        let cmp = va.cmp(vb);
299                        if ascending { cmp } else { cmp.reverse() }
300                    }
301                });
302                let mut result = vec![rows[0].clone()];
303                result.extend(data_rows);
304                let response = json!({ "rows": result });
305                Ok(ToolResult::success(&call.id, response.to_string()))
306            }
307            "statistics" => {
308                let csv = match call.arguments["csv"].as_str() {
309                    Some(v) => v,
310                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'csv'")),
311                };
312                let column = match call.arguments["column"].as_str() {
313                    Some(v) => v,
314                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'column'")),
315                };
316                let rows = parse_csv(csv, delimiter);
317                if rows.len() < 2 {
318                    return Ok(ToolResult::error(&call.id, "Not enough data rows"));
319                }
320                let headers = &rows[0];
321                let col_idx = match headers.iter().position(|h| h == column) {
322                    Some(i) => i,
323                    None => return Ok(ToolResult::error(&call.id, format!("Column '{column}' not found"))),
324                };
325                let values: Vec<f64> = rows[1..]
326                    .iter()
327                    .filter_map(|row| row.get(col_idx).and_then(|v| v.parse::<f64>().ok()))
328                    .collect();
329                if values.is_empty() {
330                    return Ok(ToolResult::error(&call.id, "No numeric values found in column"));
331                }
332                let count = values.len();
333                let sum: f64 = values.iter().sum();
334                let mean = sum / count as f64;
335                let min = values.iter().copied().fold(f64::INFINITY, f64::min);
336                let max = values.iter().copied().fold(f64::NEG_INFINITY, f64::max);
337                let mut sorted = values.clone();
338                sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
339                let median = if count % 2 == 0 {
340                    (sorted[count / 2 - 1] + sorted[count / 2]) / 2.0
341                } else {
342                    sorted[count / 2]
343                };
344                let response = json!({
345                    "column": column,
346                    "count": count,
347                    "sum": sum,
348                    "mean": mean,
349                    "min": min,
350                    "max": max,
351                    "median": median
352                });
353                Ok(ToolResult::success(&call.id, response.to_string()))
354            }
355            "count_rows" => {
356                let csv = match call.arguments["csv"].as_str() {
357                    Some(v) => v,
358                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'csv'")),
359                };
360                let rows = parse_csv(csv, delimiter);
361                let data_rows = if rows.is_empty() { 0 } else { rows.len() - 1 };
362                let response = json!({
363                    "total_rows": rows.len(),
364                    "data_rows": data_rows,
365                    "has_header": !rows.is_empty()
366                });
367                Ok(ToolResult::success(&call.id, response.to_string()))
368            }
369            "headers" => {
370                let csv = match call.arguments["csv"].as_str() {
371                    Some(v) => v,
372                    None => return Ok(ToolResult::error(&call.id, "Missing required parameter: 'csv'")),
373                };
374                let rows = parse_csv(csv, delimiter);
375                let headers = rows.first().cloned().unwrap_or_default();
376                let response = json!({
377                    "headers": headers,
378                    "count": headers.len()
379                });
380                Ok(ToolResult::success(&call.id, response.to_string()))
381            }
382            _ => Ok(ToolResult::error(
383                &call.id,
384                format!("Unknown operation: '{operation}'. Supported: parse, to_json, from_json, select_columns, filter, sort, statistics, count_rows, headers"),
385            )),
386        }
387    }
388}
389
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Three data rows under a `name,age,city` header.
    const SAMPLE_CSV: &str = "name,age,city\nAlice,30,NYC\nBob,25,LA\nCharlie,35,NYC";

    /// Executes the skill once with `args` and returns the raw tool result.
    async fn run(args: Value) -> ToolResult {
        let call = ToolCall {
            id: "test".to_string(),
            name: "csv_processor".to_string(),
            arguments: args,
        };
        CsvProcessorSkill::new().execute(call).await.unwrap()
    }

    /// Executes the skill, requires success, and decodes the JSON payload.
    async fn run_ok(args: Value) -> Value {
        let result = run(args).await;
        assert!(!result.is_error, "Result: {}", result.content);
        serde_json::from_str(&result.content).unwrap()
    }

    #[tokio::test]
    async fn test_parse() {
        let parsed = run_ok(json!({"operation": "parse", "csv": SAMPLE_CSV})).await;
        assert_eq!(parsed["row_count"], 4);
        assert_eq!(parsed["column_count"], 3);
    }

    #[tokio::test]
    async fn test_to_json() {
        let parsed = run_ok(json!({"operation": "to_json", "csv": SAMPLE_CSV})).await;
        assert_eq!(parsed["record_count"], 3);
        let data = parsed["data"].as_array().unwrap();
        assert_eq!(data[0]["name"], "Alice");
        assert_eq!(data[1]["age"], "25");
    }

    #[tokio::test]
    async fn test_from_json() {
        let parsed = run_ok(json!({
            "operation": "from_json",
            "json_data": [
                {"name": "Alice", "age": "30"},
                {"name": "Bob", "age": "25"}
            ]
        }))
        .await;
        let csv = parsed["csv"].as_str().unwrap();
        assert!(csv.contains("Alice"));
        assert!(csv.contains("Bob"));
    }

    #[tokio::test]
    async fn test_select_columns() {
        let parsed = run_ok(json!({
            "operation": "select_columns",
            "csv": SAMPLE_CSV,
            "columns": ["name", "city"]
        }))
        .await;
        let rows = parsed["rows"].as_array().unwrap();
        assert_eq!(rows[0], json!(["name", "city"]));
        assert_eq!(rows[1], json!(["Alice", "NYC"]));
    }

    #[tokio::test]
    async fn test_filter() {
        let parsed = run_ok(json!({
            "operation": "filter",
            "csv": SAMPLE_CSV,
            "column": "city",
            "value": "NYC"
        }))
        .await;
        assert_eq!(parsed["match_count"], 2);
    }

    #[tokio::test]
    async fn test_filter_no_match() {
        let parsed = run_ok(json!({
            "operation": "filter",
            "csv": SAMPLE_CSV,
            "column": "city",
            "value": "Chicago"
        }))
        .await;
        assert_eq!(parsed["match_count"], 0);
    }

    #[tokio::test]
    async fn test_sort_ascending() {
        let parsed = run_ok(json!({
            "operation": "sort",
            "csv": SAMPLE_CSV,
            "column": "age",
            "ascending": true
        }))
        .await;
        let rows = parsed["rows"].as_array().unwrap();
        assert_eq!(rows[1][1], "25"); // Bob first (youngest)
        assert_eq!(rows[3][1], "35"); // Charlie last (oldest)
    }

    #[tokio::test]
    async fn test_sort_descending() {
        let parsed = run_ok(json!({
            "operation": "sort",
            "csv": SAMPLE_CSV,
            "column": "age",
            "ascending": false
        }))
        .await;
        let rows = parsed["rows"].as_array().unwrap();
        assert_eq!(rows[1][1], "35"); // Charlie first (oldest)
    }

    #[tokio::test]
    async fn test_statistics() {
        let parsed = run_ok(json!({
            "operation": "statistics",
            "csv": SAMPLE_CSV,
            "column": "age"
        }))
        .await;
        assert_eq!(parsed["count"], 3);
        assert_eq!(parsed["min"], 25.0);
        assert_eq!(parsed["max"], 35.0);
        assert_eq!(parsed["mean"], 30.0);
        assert_eq!(parsed["median"], 30.0);
        assert_eq!(parsed["sum"], 90.0);
    }

    #[tokio::test]
    async fn test_count_rows() {
        let parsed = run_ok(json!({"operation": "count_rows", "csv": SAMPLE_CSV})).await;
        assert_eq!(parsed["total_rows"], 4);
        assert_eq!(parsed["data_rows"], 3);
    }

    #[tokio::test]
    async fn test_headers() {
        let parsed = run_ok(json!({"operation": "headers", "csv": SAMPLE_CSV})).await;
        assert_eq!(parsed["headers"], json!(["name", "age", "city"]));
        assert_eq!(parsed["count"], 3);
    }

    #[tokio::test]
    async fn test_custom_delimiter() {
        let tsv = "name\tage\nAlice\t30";
        let parsed = run_ok(json!({"operation": "parse", "csv": tsv, "delimiter": "\t"})).await;
        assert_eq!(parsed["column_count"], 2);
    }

    #[tokio::test]
    async fn test_filter_column_not_found() {
        let result = run(json!({
            "operation": "filter",
            "csv": SAMPLE_CSV,
            "column": "nonexistent",
            "value": "x"
        }))
        .await;
        assert!(result.is_error);
        assert!(result.content.contains("not found"));
    }

    #[tokio::test]
    async fn test_missing_operation() {
        let result = run(json!({"csv": "a,b"})).await;
        assert!(result.is_error);
    }

    #[tokio::test]
    async fn test_unknown_operation() {
        let result = run(json!({"operation": "pivot"})).await;
        assert!(result.is_error);
        assert!(result.content.contains("Unknown operation"));
    }

    #[test]
    fn test_descriptor_name() {
        let skill = CsvProcessorSkill::new();
        assert_eq!(skill.descriptor().name, "csv_processor");
    }
}