1use crate::agent::truncation::safe_head;
2use rusqlite::{types::Value as SqlValue, Connection};
3use serde_json::Value;
4use std::fmt::Write as _;
5use std::fs::File;
6use std::io::{BufRead, BufReader};
7use std::path::PathBuf;
8
9pub async fn query_data(args: &Value) -> Result<String, String> {
10 let sql = args
11 .get("sql")
12 .and_then(|v| v.as_str())
13 .ok_or("Missing 'sql' argument")?;
14 let path_str = args
15 .get("path")
16 .and_then(|v| v.as_str())
17 .ok_or("Missing 'path' argument")?;
18 let explain = args
19 .get("explain")
20 .and_then(|v| v.as_bool())
21 .unwrap_or(false);
22 let path = PathBuf::from(path_str);
23
24 if !path.exists() {
25 return Err(format!("File not found: {:?}", path));
26 }
27
28 let ext = path
29 .extension()
30 .and_then(|s| s.to_str())
31 .unwrap_or("")
32 .to_lowercase();
33
34 match ext.as_str() {
35 "db" | "sqlite" | "sqlite3" => query_sqlite(&path, sql, explain),
36 "csv" => query_csv_streaming(&path, sql, explain),
37 "json" => {
38 query_json_optimized(&path, sql, explain)
40 }
41 _ => Err(format!(
42 "Unsupported file extension for SQL query: .{}",
43 ext
44 )),
45 }
46}
47
48fn query_sqlite(path: &PathBuf, sql: &str, explain: bool) -> Result<String, String> {
49 let conn = Connection::open(path).map_err(|e| format!("Failed to open database: {}", e))?;
50 let sql_to_run = if explain {
51 format!("EXPLAIN QUERY PLAN {}", sql)
52 } else {
53 sql.to_string()
54 };
55 execute_and_format(&conn, &sql_to_run)
56}
57
58fn query_csv_streaming(path: &PathBuf, sql: &str, explain: bool) -> Result<String, String> {
59 let file = File::open(path).map_err(|e| format!("Failed to open file: {}", e))?;
60 let reader = BufReader::new(file);
61 let mut lines = reader.lines();
62
63 let header = lines
64 .next()
65 .ok_or("CSV file is empty")?
66 .map_err(|e| e.to_string())?;
67 let delimiter = if header.contains(',') { "," } else { "\t" };
68 let raw_cols: Vec<String> = header
69 .split(delimiter)
70 .map(|s| s.trim().replace("\"", ""))
71 .collect();
72
73 let clean_cols: Vec<String> = raw_cols
75 .iter()
76 .map(|s| {
77 s.chars()
78 .filter(|c| c.is_alphanumeric() || *c == '_')
79 .collect::<String>()
80 })
81 .map(|s| {
82 if s.is_empty() {
83 "column".to_string()
84 } else {
85 s
86 }
87 })
88 .collect();
89
90 let mut sample_rows = Vec::new();
92 for _ in 0..100 {
93 if let Some(Ok(line)) = lines.next() {
94 sample_rows.push(line);
95 } else {
96 break;
97 }
98 }
99
100 let mut col_types = vec!["INTEGER"; clean_cols.len()];
101 for line in &sample_rows {
102 for (i, val) in line.split(delimiter).map(|s| s.trim()).enumerate() {
103 if i >= col_types.len() {
104 break;
105 }
106 if col_types[i] == "TEXT" {
107 continue;
108 }
109
110 if val.parse::<i64>().is_err() {
111 if val.parse::<f64>().is_ok() {
112 col_types[i] = "REAL";
113 } else {
114 col_types[i] = "TEXT";
115 }
116 }
117 }
118 }
119
120 let mut conn = Connection::open_in_memory().map_err(|e| format!("Memory DB Error: {}", e))?;
121
122 let mut create_sql = "CREATE TABLE source (".to_string();
123 for (i, col) in clean_cols.iter().enumerate() {
124 let _ = write!(create_sql, "{} {}", col, col_types[i]);
125 if i < clean_cols.len() - 1 {
126 create_sql.push_str(", ");
127 }
128 }
129 create_sql.push(')');
130
131 conn.execute(&create_sql, [])
132 .map_err(|e| format!("DDL Error: {}", e))?;
133
134 {
136 let tx = conn.transaction().map_err(|e| e.to_string())?;
137 let placeholders = vec!["?"; clean_cols.len()].join(",");
138 let insert_sql = format!("INSERT INTO source VALUES ({})", placeholders);
139
140 {
141 let mut stmt = tx.prepare(&insert_sql).map_err(|e| e.to_string())?;
142
143 let ncols = clean_cols.len();
145 for line in sample_rows {
146 let mut vals = Vec::with_capacity(ncols);
147 vals.extend(line.split(delimiter).map(|s| s.trim()));
148 if vals.len() == ncols {
149 stmt.execute(rusqlite::params_from_iter(vals)).ok();
150 }
151 }
152
153 for line in lines.map_while(Result::ok) {
155 let mut vals = Vec::with_capacity(ncols);
156 vals.extend(line.split(delimiter).map(|s| s.trim()));
157 if vals.len() == ncols {
158 stmt.execute(rusqlite::params_from_iter(vals)).ok();
159 }
160 }
161 }
162 tx.commit().map_err(|e| e.to_string())?;
163 }
164
165 let sql_to_run = if explain {
166 format!("EXPLAIN QUERY PLAN {}", sql)
167 } else {
168 sql.to_string()
169 };
170 execute_and_format(&conn, &sql_to_run)
171}
172
173fn query_json_optimized(path: &PathBuf, sql: &str, explain: bool) -> Result<String, String> {
174 let content =
175 std::fs::read_to_string(path).map_err(|e| format!("Failed to read JSON: {}", e))?;
176 let json: Value =
177 serde_json::from_str(&content).map_err(|e| format!("Failed to parse JSON: {}", e))?;
178
179 let arr = json.as_array().ok_or("JSON must be an array of objects")?;
180 if arr.is_empty() {
181 return Err("JSON array is empty".into());
182 }
183
184 let first = arr[0].as_object().ok_or("First record must be an object")?;
185 let cols: Vec<String> = first.keys().cloned().collect();
186
187 let mut conn = Connection::open_in_memory().map_err(|e| e.to_string())?;
188
189 let mut create_sql = "CREATE TABLE source (".to_string();
190 for (i, col) in cols.iter().enumerate() {
191 let _ = write!(create_sql, "{} TEXT", col); if i < cols.len() - 1 {
193 create_sql.push_str(", ");
194 }
195 }
196 create_sql.push(')');
197
198 conn.execute(&create_sql, []).map_err(|e| e.to_string())?;
199
200 {
201 let tx = conn.transaction().map_err(|e| e.to_string())?;
202 let placeholders = vec!["?"; cols.len()].join(",");
203 let insert_sql = format!("INSERT INTO source VALUES ({})", placeholders);
204 {
205 let mut stmt = tx.prepare(&insert_sql).map_err(|e| e.to_string())?;
206 for item in arr {
207 if let Some(obj) = item.as_object() {
208 let mut vals = Vec::with_capacity(cols.len());
209 for col in &cols {
210 vals.push(obj.get(col).map(|v| v.to_string()).unwrap_or_default());
211 }
212 stmt.execute(rusqlite::params_from_iter(vals)).ok();
213 }
214 }
215 }
216 tx.commit().map_err(|e| e.to_string())?;
217 }
218
219 let sql_to_run = if explain {
220 format!("EXPLAIN QUERY PLAN {}", sql)
221 } else {
222 sql.to_string()
223 };
224 execute_and_format(&conn, &sql_to_run)
225}
226
227pub async fn export_as_table(args: &Value) -> Result<String, String> {
228 let items = args
229 .get("items")
230 .and_then(|v| v.as_array())
231 .ok_or("Missing 'items' array")?;
232 let path_str = args
233 .get("path")
234 .and_then(|v| v.as_str())
235 .ok_or("Missing 'path' argument")?;
236 let format = args
237 .get("format")
238 .and_then(|v| v.as_str())
239 .unwrap_or("csv")
240 .to_lowercase();
241 let path = PathBuf::from(path_str);
242
243 if items.is_empty() {
244 return Err("No items to export".into());
245 }
246
247 match format.as_str() {
248 "sqlite" | "db" => export_to_sqlite(&path, items),
249 "csv" => export_to_csv(&path, items),
250 _ => Err(format!("Unsupported export format: {}", format)),
251 }
252}
253
254fn export_to_sqlite(path: &PathBuf, items: &[Value]) -> Result<String, String> {
255 let first = items[0].as_object().ok_or("Items must be objects")?;
256 let cols: Vec<String> = first.keys().cloned().collect();
257
258 let conn = Connection::open(path).map_err(|e| format!("Failed to create DB: {}", e))?;
259
260 let mut create_sql = "CREATE TABLE IF NOT EXISTS data (".to_string();
261 for (i, col) in cols.iter().enumerate() {
262 let _ = write!(create_sql, "{} TEXT", col);
263 if i < cols.len() - 1 {
264 create_sql.push_str(", ");
265 }
266 }
267 create_sql.push(')');
268
269 conn.execute(&create_sql, [])
270 .map_err(|e| format!("DDL Error: {}", e))?;
271
272 {
273 let mut tx = Connection::open(path).map_err(|e| e.to_string())?;
274 let tx = tx.transaction().map_err(|e| e.to_string())?;
275 let placeholders = vec!["?"; cols.len()].join(",");
276 let insert_sql = format!("INSERT INTO data VALUES ({})", placeholders);
277
278 {
279 let mut stmt = tx.prepare(&insert_sql).map_err(|e| e.to_string())?;
280 for item in items {
281 if let Some(obj) = item.as_object() {
282 let mut vals = Vec::with_capacity(cols.len());
283 for col in &cols {
284 vals.push(obj.get(col).map(|v| v.to_string()).unwrap_or_default());
285 }
286 stmt.execute(rusqlite::params_from_iter(vals)).ok();
287 }
288 }
289 }
290 tx.commit().map_err(|e| e.to_string())?;
291 }
292
293 Ok(format!(
294 "Successfully exported {} items to SQLite: {:?}",
295 items.len(),
296 path
297 ))
298}
299
300fn export_to_csv(path: &PathBuf, items: &[Value]) -> Result<String, String> {
301 let first = items[0].as_object().ok_or("Items must be objects")?;
302 let cols: Vec<String> = first.keys().cloned().collect();
303
304 let mut content = cols.join(",") + "\n";
305 for item in items {
306 if let Some(obj) = item.as_object() {
307 let mut row = Vec::with_capacity(cols.len());
308 for col in &cols {
309 let val = obj
310 .get(col)
311 .map(|v| {
312 let s = v.to_string();
313 if s.contains(',') || s.contains('"') {
314 format!("\"{}\"", s.replace("\"", "\"\""))
315 } else {
316 s
317 }
318 })
319 .unwrap_or_default();
320 row.push(val);
321 }
322 for (i, val) in row.iter().enumerate() {
323 if i > 0 {
324 content.push(',');
325 }
326 content.push_str(val);
327 }
328 content.push('\n');
329 }
330 }
331
332 std::fs::write(path, content).map_err(|e| format!("Failed to write CSV: {}", e))?;
333 Ok(format!(
334 "Successfully exported {} items to CSV: {:?}",
335 items.len(),
336 path
337 ))
338}
339
340fn execute_and_format(conn: &Connection, sql: &str) -> Result<String, String> {
341 let mut stmt = conn.prepare(sql).map_err(|e| format!("SQL Error: {}", e))?;
342 let col_count = stmt.column_count();
343 let col_names: Vec<String> = stmt
344 .column_names()
345 .into_iter()
346 .map(|s| s.to_string())
347 .collect();
348
349 let mut rows = stmt.query([]).map_err(|e| format!("Query Error: {}", e))?;
350
351 let mut out = String::with_capacity(col_names.len() * 16 * 50);
352 for name in &col_names {
354 let _ = write!(out, "{:<15} ", name);
355 }
356 out.push('\n');
357 out.push_str(&"-".repeat(col_names.len() * 16));
358 out.push('\n');
359
360 let mut count = 0;
361 while let Some(row) = rows.next().map_err(|e| e.to_string())? {
362 for i in 0..col_count {
363 let val: SqlValue = row.get(i).unwrap_or(SqlValue::Null);
364 let val_str = match val {
365 SqlValue::Null => "NULL".into(),
366 SqlValue::Integer(i) => i.to_string(),
367 SqlValue::Real(f) => format!("{:.4}", f),
368 SqlValue::Text(s) => s,
369 SqlValue::Blob(_) => "<BLOB>".into(),
370 };
371 let truncated = if val_str.len() > 14 {
373 format!("{}...", safe_head(&val_str, 11))
374 } else {
375 val_str
376 };
377 let _ = write!(out, "{:<15} ", truncated);
378 }
379 out.push('\n');
380 count += 1;
381 if count >= 100 {
382 out.push_str("\n[Result truncated to first 100 rows]\n");
383 break;
384 }
385 }
386
387 if count == 0 {
388 out.push_str("(No results found)\n");
389 } else if !sql.to_uppercase().contains("EXPLAIN") {
390 let _ = write!(out, "\nReturned {} rows.\n", count);
391 }
392
393 Ok(out)
394}
395
396pub async fn analyze_trends(args: &Value) -> Result<String, String> {
397 let sql = args
398 .get("sql")
399 .and_then(|v| v.as_str())
400 .ok_or("Missing 'sql' argument")?;
401 let path_str = args
402 .get("path")
403 .and_then(|v| v.as_str())
404 .ok_or("Missing 'path' argument")?;
405 let path = PathBuf::from(path_str);
406
407 let data = query_to_json_helper(&path, sql).await?;
409 if data.is_empty() {
410 return Ok("No data found to analyze.".into());
411 }
412
413 let python_code = format!(
415 r#"
416import math
417data = {data_json}
418
419def get_stats(vals):
420 if not vals: return None
421 vals.sort()
422 n = len(vals)
423 mean = sum(vals) / n
424 median = vals[n // 2] if n % 2 != 0 else (vals[n // 2 - 1] + vals[n // 2]) / 2
425 variance = sum((x - mean) ** 2 for x in vals) / n
426 std_dev = math.sqrt(variance)
427 return {{"min": vals[0], "max": vals[-1], "mean": mean, "median": median, "std_dev": std_dev}}
428
429# Extract first numeric column
430column_name = None
431values = []
432for row in data:
433 for k, v in row.items():
434 try:
435 val = float(v)
436 if column_name is None: column_name = k
437 if k == column_name: values.append(val)
438 except:
439 continue
440
441if not values:
442 print("Error: No numeric columns found in the result set.")
443 sys.exit(0)
444
445stats = get_stats(values)
446print(f"--- Statistical Analysis for '{{column_name}}' ---")
447print(f"Count: {{len(values)}}")
448print(f"Min: {{stats['min']:.4f}}")
449print(f"Max: {{stats['max']:.4f}}")
450print(f"Mean: {{stats['mean']:.4f}}")
451print(f"Median: {{stats['median']:.4f}}")
452print(f"StdDev: {{stats['std_dev']:.4f}}")
453print("\n--- Distribution (ASCII Histogram) ---")
454
455bins = 10
456range_val = stats['max'] - stats['min']
457if range_val == 0: range_val = 1
458bin_size = range_val / bins
459hist = [0] * bins
460
461for v in values:
462 idx = int((v - stats['min']) / bin_size)
463 if idx >= bins: idx = bins - 1
464 hist[idx] += 1
465
466max_count = max(hist) if hist else 1
467for i in range(bins):
468 b_min = stats['min'] + (i * bin_size)
469 b_max = b_min + bin_size
470 bar = "█" * int((hist[i] / max_count) * 20)
471 print(f"{{b_min:8.2f}} - {{b_max:8.2f}} | {{bar:<20}} ({{hist[i]}})")
472"#,
473 data_json = serde_json::to_string(&data).unwrap()
474 );
475
476 crate::tools::code_sandbox::execute(&serde_json::json!({
478 "language": "python",
479 "code": python_code
480 }))
481 .await
482}
483
484pub async fn query_to_json_helper(
485 path: &std::path::PathBuf,
486 sql: &str,
487) -> Result<Vec<Value>, String> {
488 let ext = path
489 .extension()
490 .and_then(|s| s.to_str())
491 .unwrap_or("")
492 .to_lowercase();
493 let conn = match ext.as_str() {
494 "db" | "sqlite" | "sqlite3" => Connection::open(path).map_err(|e| e.to_string())?,
495 "csv" | "json" => {
496 return Err("Streaming SQL results to Python currently requires a local .db file or a pre-audited CSV. Please use query_data to create a .db file first, or run analyze_trends directly on the file.".into());
499 }
500 _ => return Err("Unsupported format".into()),
501 };
502
503 let mut stmt = conn.prepare(sql).map_err(|e| e.to_string())?;
504 let col_names: Vec<String> = stmt
505 .column_names()
506 .into_iter()
507 .map(|s| s.to_string())
508 .collect();
509 let mut rows = stmt.query([]).map_err(|e| e.to_string())?;
510
511 let mut results = Vec::new();
512 while let Some(row) = rows.next().map_err(|e| e.to_string())? {
513 let mut map = serde_json::Map::new();
514 for (i, name) in col_names.iter().enumerate() {
515 let val: SqlValue = row.get(i).unwrap_or(SqlValue::Null);
516 let json_val = match val {
517 SqlValue::Null => Value::Null,
518 SqlValue::Integer(i) => Value::Number(i.into()),
519 SqlValue::Real(f) => serde_json::Number::from_f64(f)
520 .map(Value::Number)
521 .unwrap_or(Value::Null),
522 SqlValue::Text(s) => Value::String(s),
523 SqlValue::Blob(_) => Value::String("<BLOB>".into()),
524 };
525 map.insert(name.clone(), json_val);
526 }
527 results.push(Value::Object(map));
528 if results.len() >= 1000 {
529 break;
530 } }
532 Ok(results)
533}