1use rusqlite::{types::Value as SqlValue, Connection};
2use serde_json::Value;
3use std::fs::File;
4use std::io::{BufRead, BufReader};
5use std::path::PathBuf;
6
7pub async fn query_data(args: &Value) -> Result<String, String> {
8 let sql = args
9 .get("sql")
10 .and_then(|v| v.as_str())
11 .ok_or("Missing 'sql' argument")?;
12 let path_str = args
13 .get("path")
14 .and_then(|v| v.as_str())
15 .ok_or("Missing 'path' argument")?;
16 let explain = args
17 .get("explain")
18 .and_then(|v| v.as_bool())
19 .unwrap_or(false);
20 let path = PathBuf::from(path_str);
21
22 if !path.exists() {
23 return Err(format!("File not found: {:?}", path));
24 }
25
26 let ext = path
27 .extension()
28 .and_then(|s| s.to_str())
29 .unwrap_or("")
30 .to_lowercase();
31
32 match ext.as_str() {
33 "db" | "sqlite" | "sqlite3" => query_sqlite(&path, sql, explain),
34 "csv" => query_csv_streaming(&path, sql, explain),
35 "json" => {
36 query_json_optimized(&path, sql, explain)
38 }
39 _ => Err(format!(
40 "Unsupported file extension for SQL query: .{}",
41 ext
42 )),
43 }
44}
45
46fn query_sqlite(path: &PathBuf, sql: &str, explain: bool) -> Result<String, String> {
47 let conn = Connection::open(path).map_err(|e| format!("Failed to open database: {}", e))?;
48 let sql_to_run = if explain {
49 format!("EXPLAIN QUERY PLAN {}", sql)
50 } else {
51 sql.to_string()
52 };
53 execute_and_format(&conn, &sql_to_run)
54}
55
56fn query_csv_streaming(path: &PathBuf, sql: &str, explain: bool) -> Result<String, String> {
57 let file = File::open(path).map_err(|e| format!("Failed to open file: {}", e))?;
58 let reader = BufReader::new(file);
59 let mut lines = reader.lines();
60
61 let header = lines
62 .next()
63 .ok_or("CSV file is empty")?
64 .map_err(|e| e.to_string())?;
65 let delimiter = if header.contains(',') { "," } else { "\t" };
66 let raw_cols: Vec<String> = header
67 .split(delimiter)
68 .map(|s| s.trim().replace("\"", ""))
69 .collect();
70
71 let clean_cols: Vec<String> = raw_cols
73 .iter()
74 .map(|s| {
75 s.chars()
76 .filter(|c| c.is_alphanumeric() || *c == '_')
77 .collect::<String>()
78 })
79 .map(|s| {
80 if s.is_empty() {
81 "column".to_string()
82 } else {
83 s
84 }
85 })
86 .collect();
87
88 let mut sample_rows = Vec::new();
90 for _ in 0..100 {
91 if let Some(Ok(line)) = lines.next() {
92 sample_rows.push(line);
93 } else {
94 break;
95 }
96 }
97
98 let mut col_types = vec!["INTEGER"; clean_cols.len()];
99 for line in &sample_rows {
100 let vals: Vec<&str> = line.split(delimiter).map(|s| s.trim()).collect();
101 for (i, val) in vals.iter().enumerate() {
102 if i >= col_types.len() {
103 break;
104 }
105 if col_types[i] == "TEXT" {
106 continue;
107 }
108
109 if val.parse::<i64>().is_err() {
110 if val.parse::<f64>().is_ok() {
111 col_types[i] = "REAL";
112 } else {
113 col_types[i] = "TEXT";
114 }
115 }
116 }
117 }
118
119 let mut conn = Connection::open_in_memory().map_err(|e| format!("Memory DB Error: {}", e))?;
120
121 let mut create_sql = format!("CREATE TABLE source (");
122 for (i, col) in clean_cols.iter().enumerate() {
123 create_sql.push_str(&format!("{} {}", col, col_types[i]));
124 if i < clean_cols.len() - 1 {
125 create_sql.push_str(", ");
126 }
127 }
128 create_sql.push_str(")");
129
130 conn.execute(&create_sql, [])
131 .map_err(|e| format!("DDL Error: {}", e))?;
132
133 {
135 let tx = conn.transaction().map_err(|e| e.to_string())?;
136 let placeholders = vec!["?"; clean_cols.len()].join(",");
137 let insert_sql = format!("INSERT INTO source VALUES ({})", placeholders);
138
139 {
140 let mut stmt = tx.prepare(&insert_sql).map_err(|e| e.to_string())?;
141
142 for line in sample_rows {
144 let vals: Vec<&str> = line.split(delimiter).map(|s| s.trim()).collect();
145 if vals.len() == clean_cols.len() {
146 stmt.execute(rusqlite::params_from_iter(vals)).ok();
147 }
148 }
149
150 for line_res in lines {
152 if let Ok(line) = line_res {
153 let vals: Vec<&str> = line.split(delimiter).map(|s| s.trim()).collect();
154 if vals.len() == clean_cols.len() {
155 stmt.execute(rusqlite::params_from_iter(vals)).ok();
156 }
157 }
158 }
159 }
160 tx.commit().map_err(|e| e.to_string())?;
161 }
162
163 let sql_to_run = if explain {
164 format!("EXPLAIN QUERY PLAN {}", sql)
165 } else {
166 sql.to_string()
167 };
168 execute_and_format(&conn, &sql_to_run)
169}
170
171fn query_json_optimized(path: &PathBuf, sql: &str, explain: bool) -> Result<String, String> {
172 let content =
173 std::fs::read_to_string(path).map_err(|e| format!("Failed to read JSON: {}", e))?;
174 let json: Value =
175 serde_json::from_str(&content).map_err(|e| format!("Failed to parse JSON: {}", e))?;
176
177 let arr = json.as_array().ok_or("JSON must be an array of objects")?;
178 if arr.is_empty() {
179 return Err("JSON array is empty".into());
180 }
181
182 let first = arr[0].as_object().ok_or("First record must be an object")?;
183 let cols: Vec<String> = first.keys().cloned().collect();
184
185 let mut conn = Connection::open_in_memory().map_err(|e| e.to_string())?;
186
187 let mut create_sql = format!("CREATE TABLE source (");
188 for (i, col) in cols.iter().enumerate() {
189 create_sql.push_str(&format!("{} TEXT", col)); if i < cols.len() - 1 {
191 create_sql.push_str(", ");
192 }
193 }
194 create_sql.push_str(")");
195
196 conn.execute(&create_sql, []).map_err(|e| e.to_string())?;
197
198 {
199 let tx = conn.transaction().map_err(|e| e.to_string())?;
200 let placeholders = vec!["?"; cols.len()].join(",");
201 let insert_sql = format!("INSERT INTO source VALUES ({})", placeholders);
202 {
203 let mut stmt = tx.prepare(&insert_sql).map_err(|e| e.to_string())?;
204 for item in arr {
205 if let Some(obj) = item.as_object() {
206 let mut vals = Vec::new();
207 for col in &cols {
208 vals.push(obj.get(col).map(|v| v.to_string()).unwrap_or_default());
209 }
210 stmt.execute(rusqlite::params_from_iter(vals)).ok();
211 }
212 }
213 }
214 tx.commit().map_err(|e| e.to_string())?;
215 }
216
217 let sql_to_run = if explain {
218 format!("EXPLAIN QUERY PLAN {}", sql)
219 } else {
220 sql.to_string()
221 };
222 execute_and_format(&conn, &sql_to_run)
223}
224
225pub async fn export_as_table(args: &Value) -> Result<String, String> {
226 let items = args
227 .get("items")
228 .and_then(|v| v.as_array())
229 .ok_or("Missing 'items' array")?;
230 let path_str = args
231 .get("path")
232 .and_then(|v| v.as_str())
233 .ok_or("Missing 'path' argument")?;
234 let format = args
235 .get("format")
236 .and_then(|v| v.as_str())
237 .unwrap_or("csv")
238 .to_lowercase();
239 let path = PathBuf::from(path_str);
240
241 if items.is_empty() {
242 return Err("No items to export".into());
243 }
244
245 match format.as_str() {
246 "sqlite" | "db" => export_to_sqlite(&path, items),
247 "csv" => export_to_csv(&path, items),
248 _ => Err(format!("Unsupported export format: {}", format)),
249 }
250}
251
252fn export_to_sqlite(path: &PathBuf, items: &[Value]) -> Result<String, String> {
253 let first = items[0].as_object().ok_or("Items must be objects")?;
254 let cols: Vec<String> = first.keys().cloned().collect();
255
256 let conn = Connection::open(path).map_err(|e| format!("Failed to create DB: {}", e))?;
257
258 let mut create_sql = format!("CREATE TABLE IF NOT EXISTS data (");
259 for (i, col) in cols.iter().enumerate() {
260 create_sql.push_str(&format!("{} TEXT", col));
261 if i < cols.len() - 1 {
262 create_sql.push_str(", ");
263 }
264 }
265 create_sql.push_str(")");
266
267 conn.execute(&create_sql, [])
268 .map_err(|e| format!("DDL Error: {}", e))?;
269
270 {
271 let mut tx = Connection::open(path).map_err(|e| e.to_string())?;
272 let tx = tx.transaction().map_err(|e| e.to_string())?;
273 let placeholders = vec!["?"; cols.len()].join(",");
274 let insert_sql = format!("INSERT INTO data VALUES ({})", placeholders);
275
276 {
277 let mut stmt = tx.prepare(&insert_sql).map_err(|e| e.to_string())?;
278 for item in items {
279 if let Some(obj) = item.as_object() {
280 let mut vals = Vec::new();
281 for col in &cols {
282 vals.push(obj.get(col).map(|v| v.to_string()).unwrap_or_default());
283 }
284 stmt.execute(rusqlite::params_from_iter(vals)).ok();
285 }
286 }
287 }
288 tx.commit().map_err(|e| e.to_string())?;
289 }
290
291 Ok(format!(
292 "Successfully exported {} items to SQLite: {:?}",
293 items.len(),
294 path
295 ))
296}
297
298fn export_to_csv(path: &PathBuf, items: &[Value]) -> Result<String, String> {
299 let first = items[0].as_object().ok_or("Items must be objects")?;
300 let cols: Vec<String> = first.keys().cloned().collect();
301
302 let mut content = cols.join(",") + "\n";
303 for item in items {
304 if let Some(obj) = item.as_object() {
305 let mut row = Vec::new();
306 for col in &cols {
307 let val = obj
308 .get(col)
309 .map(|v| {
310 let s = v.to_string();
311 if s.contains(',') || s.contains('"') {
312 format!("\"{}\"", s.replace("\"", "\"\""))
313 } else {
314 s
315 }
316 })
317 .unwrap_or_default();
318 row.push(val);
319 }
320 content.push_str(&(row.join(",") + "\n"));
321 }
322 }
323
324 std::fs::write(path, content).map_err(|e| format!("Failed to write CSV: {}", e))?;
325 Ok(format!(
326 "Successfully exported {} items to CSV: {:?}",
327 items.len(),
328 path
329 ))
330}
331
332fn execute_and_format(conn: &Connection, sql: &str) -> Result<String, String> {
333 let mut stmt = conn.prepare(sql).map_err(|e| format!("SQL Error: {}", e))?;
334 let col_count = stmt.column_count();
335 let col_names: Vec<String> = stmt
336 .column_names()
337 .into_iter()
338 .map(|s| s.to_string())
339 .collect();
340
341 let mut rows = stmt.query([]).map_err(|e| format!("Query Error: {}", e))?;
342
343 let mut out = String::new();
344 for name in &col_names {
346 out.push_str(&format!("{:<15} ", name));
347 }
348 out.push_str("\n");
349 out.push_str(&"-".repeat(col_names.len() * 16));
350 out.push_str("\n");
351
352 let mut count = 0;
353 while let Some(row) = rows.next().map_err(|e| e.to_string())? {
354 for i in 0..col_count {
355 let val: SqlValue = row.get(i).unwrap_or(SqlValue::Null);
356 let val_str = match val {
357 SqlValue::Null => "NULL".into(),
358 SqlValue::Integer(i) => i.to_string(),
359 SqlValue::Real(f) => format!("{:.4}", f),
360 SqlValue::Text(s) => s,
361 SqlValue::Blob(_) => "<BLOB>".into(),
362 };
363 let truncated = if val_str.len() > 14 {
365 format!("{}...", &val_str[..11])
366 } else {
367 val_str
368 };
369 out.push_str(&format!("{:<15} ", truncated));
370 }
371 out.push_str("\n");
372 count += 1;
373 if count >= 100 {
374 out.push_str("\n[Result truncated to first 100 rows]\n");
375 break;
376 }
377 }
378
379 if count == 0 {
380 out.push_str("(No results found)\n");
381 } else if !sql.to_uppercase().contains("EXPLAIN") {
382 out.push_str(&format!("\nReturned {} rows.\n", count));
383 }
384
385 Ok(out)
386}
387
388pub async fn analyze_trends(args: &Value) -> Result<String, String> {
389 let sql = args
390 .get("sql")
391 .and_then(|v| v.as_str())
392 .ok_or("Missing 'sql' argument")?;
393 let path_str = args
394 .get("path")
395 .and_then(|v| v.as_str())
396 .ok_or("Missing 'path' argument")?;
397 let path = PathBuf::from(path_str);
398
399 let data = query_to_json_helper(&path, sql).await?;
401 if data.is_empty() {
402 return Ok("No data found to analyze.".into());
403 }
404
405 let python_code = format!(
407 r#"
408import math
409data = {data_json}
410
411def get_stats(vals):
412 if not vals: return None
413 vals.sort()
414 n = len(vals)
415 mean = sum(vals) / n
416 median = vals[n // 2] if n % 2 != 0 else (vals[n // 2 - 1] + vals[n // 2]) / 2
417 variance = sum((x - mean) ** 2 for x in vals) / n
418 std_dev = math.sqrt(variance)
419 return {{"min": vals[0], "max": vals[-1], "mean": mean, "median": median, "std_dev": std_dev}}
420
421# Extract first numeric column
422column_name = None
423values = []
424for row in data:
425 for k, v in row.items():
426 try:
427 val = float(v)
428 if column_name is None: column_name = k
429 if k == column_name: values.append(val)
430 except:
431 continue
432
433if not values:
434 print("Error: No numeric columns found in the result set.")
435 sys.exit(0)
436
437stats = get_stats(values)
438print(f"--- Statistical Analysis for '{{column_name}}' ---")
439print(f"Count: {{len(values)}}")
440print(f"Min: {{stats['min']:.4f}}")
441print(f"Max: {{stats['max']:.4f}}")
442print(f"Mean: {{stats['mean']:.4f}}")
443print(f"Median: {{stats['median']:.4f}}")
444print(f"StdDev: {{stats['std_dev']:.4f}}")
445print("\n--- Distribution (ASCII Histogram) ---")
446
447bins = 10
448range_val = stats['max'] - stats['min']
449if range_val == 0: range_val = 1
450bin_size = range_val / bins
451hist = [0] * bins
452
453for v in values:
454 idx = int((v - stats['min']) / bin_size)
455 if idx >= bins: idx = bins - 1
456 hist[idx] += 1
457
458max_count = max(hist) if hist else 1
459for i in range(bins):
460 b_min = stats['min'] + (i * bin_size)
461 b_max = b_min + bin_size
462 bar = "█" * int((hist[i] / max_count) * 20)
463 print(f"{{b_min:8.2f}} - {{b_max:8.2f}} | {{bar:<20}} ({{hist[i]}})")
464"#,
465 data_json = serde_json::to_string(&data).unwrap()
466 );
467
468 crate::tools::code_sandbox::execute(&serde_json::json!({
470 "language": "python",
471 "code": python_code
472 }))
473 .await
474}
475
476pub async fn query_to_json_helper(
477 path: &std::path::PathBuf,
478 sql: &str,
479) -> Result<Vec<Value>, String> {
480 let ext = path
481 .extension()
482 .and_then(|s| s.to_str())
483 .unwrap_or("")
484 .to_lowercase();
485 let conn = match ext.as_str() {
486 "db" | "sqlite" | "sqlite3" => Connection::open(path).map_err(|e| e.to_string())?,
487 "csv" | "json" => {
488 return Err("Streaming SQL results to Python currently requires a local .db file or a pre-audited CSV. Please use query_data to create a .db file first, or run analyze_trends directly on the file.".into());
491 }
492 _ => return Err("Unsupported format".into()),
493 };
494
495 let mut stmt = conn.prepare(sql).map_err(|e| e.to_string())?;
496 let col_names: Vec<String> = stmt
497 .column_names()
498 .into_iter()
499 .map(|s| s.to_string())
500 .collect();
501 let mut rows = stmt.query([]).map_err(|e| e.to_string())?;
502
503 let mut results = Vec::new();
504 while let Some(row) = rows.next().map_err(|e| e.to_string())? {
505 let mut map = serde_json::Map::new();
506 for (i, name) in col_names.iter().enumerate() {
507 let val: SqlValue = row.get(i).unwrap_or(SqlValue::Null);
508 let json_val = match val {
509 SqlValue::Null => Value::Null,
510 SqlValue::Integer(i) => Value::Number(i.into()),
511 SqlValue::Real(f) => serde_json::Number::from_f64(f)
512 .map(Value::Number)
513 .unwrap_or(Value::Null),
514 SqlValue::Text(s) => Value::String(s),
515 SqlValue::Blob(_) => Value::String("<BLOB>".into()),
516 };
517 map.insert(name.clone(), json_val);
518 }
519 results.push(Value::Object(map));
520 if results.len() >= 1000 {
521 break;
522 } }
524 Ok(results)
525}