Skip to main content

hematite/tools/
data_query.rs

1use crate::agent::truncation::safe_head;
2use rusqlite::{types::Value as SqlValue, Connection};
3use serde_json::Value;
4use std::fmt::Write as _;
5use std::fs::File;
6use std::io::{BufRead, BufReader};
7use std::path::PathBuf;
8
9pub async fn query_data(args: &Value) -> Result<String, String> {
10    let sql = args
11        .get("sql")
12        .and_then(|v| v.as_str())
13        .ok_or("Missing 'sql' argument")?;
14    let path_str = args
15        .get("path")
16        .and_then(|v| v.as_str())
17        .ok_or("Missing 'path' argument")?;
18    let explain = args
19        .get("explain")
20        .and_then(|v| v.as_bool())
21        .unwrap_or(false);
22    let path = PathBuf::from(path_str);
23
24    if !path.exists() {
25        return Err(format!("File not found: {:?}", path));
26    }
27
28    let ext = path
29        .extension()
30        .and_then(|s| s.to_str())
31        .unwrap_or("")
32        .to_lowercase();
33
34    match ext.as_str() {
35        "db" | "sqlite" | "sqlite3" => query_sqlite(&path, sql, explain),
36        "csv" => query_csv_streaming(&path, sql, explain),
37        "json" => {
38            // JSON is harder to stream without a streaming parser, but we'll optimize the batching
39            query_json_optimized(&path, sql, explain)
40        }
41        _ => Err(format!(
42            "Unsupported file extension for SQL query: .{}",
43            ext
44        )),
45    }
46}
47
48fn query_sqlite(path: &PathBuf, sql: &str, explain: bool) -> Result<String, String> {
49    let conn = Connection::open(path).map_err(|e| format!("Failed to open database: {}", e))?;
50    let sql_to_run = if explain {
51        format!("EXPLAIN QUERY PLAN {}", sql)
52    } else {
53        sql.to_string()
54    };
55    execute_and_format(&conn, &sql_to_run)
56}
57
58fn query_csv_streaming(path: &PathBuf, sql: &str, explain: bool) -> Result<String, String> {
59    let file = File::open(path).map_err(|e| format!("Failed to open file: {}", e))?;
60    let reader = BufReader::new(file);
61    let mut lines = reader.lines();
62
63    let header = lines
64        .next()
65        .ok_or("CSV file is empty")?
66        .map_err(|e| e.to_string())?;
67    let delimiter = if header.contains(',') { "," } else { "\t" };
68    let raw_cols: Vec<String> = header
69        .split(delimiter)
70        .map(|s| s.trim().replace("\"", ""))
71        .collect();
72
73    // FAANG Signal: Clean identifiers for SQL
74    let clean_cols: Vec<String> = raw_cols
75        .iter()
76        .map(|s| {
77            s.chars()
78                .filter(|c| c.is_alphanumeric() || *c == '_')
79                .collect::<String>()
80        })
81        .map(|s| {
82            if s.is_empty() {
83                "column".to_string()
84            } else {
85                s
86            }
87        })
88        .collect();
89
90    // FAANG Signal: Schema Inference (Scan first 100 rows)
91    let mut sample_rows = Vec::new();
92    for _ in 0..100 {
93        if let Some(Ok(line)) = lines.next() {
94            sample_rows.push(line);
95        } else {
96            break;
97        }
98    }
99
100    let mut col_types = vec!["INTEGER"; clean_cols.len()];
101    for line in &sample_rows {
102        for (i, val) in line.split(delimiter).map(|s| s.trim()).enumerate() {
103            if i >= col_types.len() {
104                break;
105            }
106            if col_types[i] == "TEXT" {
107                continue;
108            }
109
110            if val.parse::<i64>().is_err() {
111                if val.parse::<f64>().is_ok() {
112                    col_types[i] = "REAL";
113                } else {
114                    col_types[i] = "TEXT";
115                }
116            }
117        }
118    }
119
120    let mut conn = Connection::open_in_memory().map_err(|e| format!("Memory DB Error: {}", e))?;
121
122    let mut create_sql = "CREATE TABLE source (".to_string();
123    for (i, col) in clean_cols.iter().enumerate() {
124        let _ = write!(create_sql, "{} {}", col, col_types[i]);
125        if i < clean_cols.len() - 1 {
126            create_sql.push_str(", ");
127        }
128    }
129    create_sql.push(')');
130
131    conn.execute(&create_sql, [])
132        .map_err(|e| format!("DDL Error: {}", e))?;
133
134    // FAANG Signal: Transactional Batch Ingestion
135    {
136        let tx = conn.transaction().map_err(|e| e.to_string())?;
137        let placeholders = vec!["?"; clean_cols.len()].join(",");
138        let insert_sql = format!("INSERT INTO source VALUES ({})", placeholders);
139
140        {
141            let mut stmt = tx.prepare(&insert_sql).map_err(|e| e.to_string())?;
142
143            // Insert sample rows
144            let ncols = clean_cols.len();
145            for line in sample_rows {
146                let mut vals = Vec::with_capacity(ncols);
147                vals.extend(line.split(delimiter).map(|s| s.trim()));
148                if vals.len() == ncols {
149                    stmt.execute(rusqlite::params_from_iter(vals)).ok();
150                }
151            }
152
153            // Insert remaining rows (Streaming)
154            for line in lines.map_while(Result::ok) {
155                let mut vals = Vec::with_capacity(ncols);
156                vals.extend(line.split(delimiter).map(|s| s.trim()));
157                if vals.len() == ncols {
158                    stmt.execute(rusqlite::params_from_iter(vals)).ok();
159                }
160            }
161        }
162        tx.commit().map_err(|e| e.to_string())?;
163    }
164
165    let sql_to_run = if explain {
166        format!("EXPLAIN QUERY PLAN {}", sql)
167    } else {
168        sql.to_string()
169    };
170    execute_and_format(&conn, &sql_to_run)
171}
172
173fn query_json_optimized(path: &PathBuf, sql: &str, explain: bool) -> Result<String, String> {
174    let content =
175        std::fs::read_to_string(path).map_err(|e| format!("Failed to read JSON: {}", e))?;
176    let json: Value =
177        serde_json::from_str(&content).map_err(|e| format!("Failed to parse JSON: {}", e))?;
178
179    let arr = json.as_array().ok_or("JSON must be an array of objects")?;
180    if arr.is_empty() {
181        return Err("JSON array is empty".into());
182    }
183
184    let first = arr[0].as_object().ok_or("First record must be an object")?;
185    let cols: Vec<String> = first.keys().cloned().collect();
186
187    let mut conn = Connection::open_in_memory().map_err(|e| e.to_string())?;
188
189    let mut create_sql = "CREATE TABLE source (".to_string();
190    for (i, col) in cols.iter().enumerate() {
191        let _ = write!(create_sql, "{} TEXT", col); // JSON is dynamic, default to TEXT
192        if i < cols.len() - 1 {
193            create_sql.push_str(", ");
194        }
195    }
196    create_sql.push(')');
197
198    conn.execute(&create_sql, []).map_err(|e| e.to_string())?;
199
200    {
201        let tx = conn.transaction().map_err(|e| e.to_string())?;
202        let placeholders = vec!["?"; cols.len()].join(",");
203        let insert_sql = format!("INSERT INTO source VALUES ({})", placeholders);
204        {
205            let mut stmt = tx.prepare(&insert_sql).map_err(|e| e.to_string())?;
206            for item in arr {
207                if let Some(obj) = item.as_object() {
208                    let mut vals = Vec::with_capacity(cols.len());
209                    for col in &cols {
210                        vals.push(obj.get(col).map(|v| v.to_string()).unwrap_or_default());
211                    }
212                    stmt.execute(rusqlite::params_from_iter(vals)).ok();
213                }
214            }
215        }
216        tx.commit().map_err(|e| e.to_string())?;
217    }
218
219    let sql_to_run = if explain {
220        format!("EXPLAIN QUERY PLAN {}", sql)
221    } else {
222        sql.to_string()
223    };
224    execute_and_format(&conn, &sql_to_run)
225}
226
227pub async fn export_as_table(args: &Value) -> Result<String, String> {
228    let items = args
229        .get("items")
230        .and_then(|v| v.as_array())
231        .ok_or("Missing 'items' array")?;
232    let path_str = args
233        .get("path")
234        .and_then(|v| v.as_str())
235        .ok_or("Missing 'path' argument")?;
236    let format = args
237        .get("format")
238        .and_then(|v| v.as_str())
239        .unwrap_or("csv")
240        .to_lowercase();
241    let path = PathBuf::from(path_str);
242
243    if items.is_empty() {
244        return Err("No items to export".into());
245    }
246
247    match format.as_str() {
248        "sqlite" | "db" => export_to_sqlite(&path, items),
249        "csv" => export_to_csv(&path, items),
250        _ => Err(format!("Unsupported export format: {}", format)),
251    }
252}
253
254fn export_to_sqlite(path: &PathBuf, items: &[Value]) -> Result<String, String> {
255    let first = items[0].as_object().ok_or("Items must be objects")?;
256    let cols: Vec<String> = first.keys().cloned().collect();
257
258    let conn = Connection::open(path).map_err(|e| format!("Failed to create DB: {}", e))?;
259
260    let mut create_sql = "CREATE TABLE IF NOT EXISTS data (".to_string();
261    for (i, col) in cols.iter().enumerate() {
262        let _ = write!(create_sql, "{} TEXT", col);
263        if i < cols.len() - 1 {
264            create_sql.push_str(", ");
265        }
266    }
267    create_sql.push(')');
268
269    conn.execute(&create_sql, [])
270        .map_err(|e| format!("DDL Error: {}", e))?;
271
272    {
273        let mut tx = Connection::open(path).map_err(|e| e.to_string())?;
274        let tx = tx.transaction().map_err(|e| e.to_string())?;
275        let placeholders = vec!["?"; cols.len()].join(",");
276        let insert_sql = format!("INSERT INTO data VALUES ({})", placeholders);
277
278        {
279            let mut stmt = tx.prepare(&insert_sql).map_err(|e| e.to_string())?;
280            for item in items {
281                if let Some(obj) = item.as_object() {
282                    let mut vals = Vec::with_capacity(cols.len());
283                    for col in &cols {
284                        vals.push(obj.get(col).map(|v| v.to_string()).unwrap_or_default());
285                    }
286                    stmt.execute(rusqlite::params_from_iter(vals)).ok();
287                }
288            }
289        }
290        tx.commit().map_err(|e| e.to_string())?;
291    }
292
293    Ok(format!(
294        "Successfully exported {} items to SQLite: {:?}",
295        items.len(),
296        path
297    ))
298}
299
300fn export_to_csv(path: &PathBuf, items: &[Value]) -> Result<String, String> {
301    let first = items[0].as_object().ok_or("Items must be objects")?;
302    let cols: Vec<String> = first.keys().cloned().collect();
303
304    let mut content = cols.join(",") + "\n";
305    for item in items {
306        if let Some(obj) = item.as_object() {
307            let mut row = Vec::with_capacity(cols.len());
308            for col in &cols {
309                let val = obj
310                    .get(col)
311                    .map(|v| {
312                        let s = v.to_string();
313                        if s.contains(',') || s.contains('"') {
314                            format!("\"{}\"", s.replace("\"", "\"\""))
315                        } else {
316                            s
317                        }
318                    })
319                    .unwrap_or_default();
320                row.push(val);
321            }
322            for (i, val) in row.iter().enumerate() {
323                if i > 0 {
324                    content.push(',');
325                }
326                content.push_str(val);
327            }
328            content.push('\n');
329        }
330    }
331
332    std::fs::write(path, content).map_err(|e| format!("Failed to write CSV: {}", e))?;
333    Ok(format!(
334        "Successfully exported {} items to CSV: {:?}",
335        items.len(),
336        path
337    ))
338}
339
340fn execute_and_format(conn: &Connection, sql: &str) -> Result<String, String> {
341    let mut stmt = conn.prepare(sql).map_err(|e| format!("SQL Error: {}", e))?;
342    let col_count = stmt.column_count();
343    let col_names: Vec<String> = stmt
344        .column_names()
345        .into_iter()
346        .map(|s| s.to_string())
347        .collect();
348
349    let mut rows = stmt.query([]).map_err(|e| format!("Query Error: {}", e))?;
350
351    let mut out = String::with_capacity(col_names.len() * 16 * 50);
352    // Header
353    for name in &col_names {
354        let _ = write!(out, "{:<15} ", name);
355    }
356    out.push('\n');
357    out.push_str(&"-".repeat(col_names.len() * 16));
358    out.push('\n');
359
360    let mut count = 0;
361    while let Some(row) = rows.next().map_err(|e| e.to_string())? {
362        for i in 0..col_count {
363            let val: SqlValue = row.get(i).unwrap_or(SqlValue::Null);
364            let val_str = match val {
365                SqlValue::Null => "NULL".into(),
366                SqlValue::Integer(i) => i.to_string(),
367                SqlValue::Real(f) => format!("{:.4}", f),
368                SqlValue::Text(s) => s,
369                SqlValue::Blob(_) => "<BLOB>".into(),
370            };
371            // Truncate long strings for table view
372            let truncated = if val_str.len() > 14 {
373                format!("{}...", safe_head(&val_str, 11))
374            } else {
375                val_str
376            };
377            let _ = write!(out, "{:<15} ", truncated);
378        }
379        out.push('\n');
380        count += 1;
381        if count >= 100 {
382            out.push_str("\n[Result truncated to first 100 rows]\n");
383            break;
384        }
385    }
386
387    if count == 0 {
388        out.push_str("(No results found)\n");
389    } else if !sql.to_uppercase().contains("EXPLAIN") {
390        let _ = write!(out, "\nReturned {} rows.\n", count);
391    }
392
393    Ok(out)
394}
395
396pub async fn analyze_trends(args: &Value) -> Result<String, String> {
397    let sql = args
398        .get("sql")
399        .and_then(|v| v.as_str())
400        .ok_or("Missing 'sql' argument")?;
401    let path_str = args
402        .get("path")
403        .and_then(|v| v.as_str())
404        .ok_or("Missing 'path' argument")?;
405    let path = PathBuf::from(path_str);
406
407    // 1. Get raw data from SQL
408    let data = query_to_json_helper(&path, sql).await?;
409    if data.is_empty() {
410        return Ok("No data found to analyze.".into());
411    }
412
413    // 2. Prepare Python script for statistical analysis and ASCII charting
414    let python_code = format!(
415        r#"
416import math
417data = {data_json}
418
419def get_stats(vals):
420    if not vals: return None
421    vals.sort()
422    n = len(vals)
423    mean = sum(vals) / n
424    median = vals[n // 2] if n % 2 != 0 else (vals[n // 2 - 1] + vals[n // 2]) / 2
425    variance = sum((x - mean) ** 2 for x in vals) / n
426    std_dev = math.sqrt(variance)
427    return {{"min": vals[0], "max": vals[-1], "mean": mean, "median": median, "std_dev": std_dev}}
428
429# Extract first numeric column
430column_name = None
431values = []
432for row in data:
433    for k, v in row.items():
434        try:
435            val = float(v)
436            if column_name is None: column_name = k
437            if k == column_name: values.append(val)
438        except:
439            continue
440
441if not values:
442    print("Error: No numeric columns found in the result set.")
443    sys.exit(0)
444
445stats = get_stats(values)
446print(f"--- Statistical Analysis for '{{column_name}}' ---")
447print(f"Count:  {{len(values)}}")
448print(f"Min:    {{stats['min']:.4f}}")
449print(f"Max:    {{stats['max']:.4f}}")
450print(f"Mean:   {{stats['mean']:.4f}}")
451print(f"Median: {{stats['median']:.4f}}")
452print(f"StdDev: {{stats['std_dev']:.4f}}")
453print("\n--- Distribution (ASCII Histogram) ---")
454
455bins = 10
456range_val = stats['max'] - stats['min']
457if range_val == 0: range_val = 1
458bin_size = range_val / bins
459hist = [0] * bins
460
461for v in values:
462    idx = int((v - stats['min']) / bin_size)
463    if idx >= bins: idx = bins - 1
464    hist[idx] += 1
465
466max_count = max(hist) if hist else 1
467for i in range(bins):
468    b_min = stats['min'] + (i * bin_size)
469    b_max = b_min + bin_size
470    bar = "█" * int((hist[i] / max_count) * 20)
471    print(f"{{b_min:8.2f}} - {{b_max:8.2f}} | {{bar:<20}} ({{hist[i]}})")
472"#,
473        data_json = serde_json::to_string(&data).unwrap()
474    );
475
476    // 3. Execute in Python sandbox
477    crate::tools::code_sandbox::execute(&serde_json::json!({
478        "language": "python",
479        "code": python_code
480    }))
481    .await
482}
483
484pub async fn query_to_json_helper(
485    path: &std::path::PathBuf,
486    sql: &str,
487) -> Result<Vec<Value>, String> {
488    let ext = path
489        .extension()
490        .and_then(|s| s.to_str())
491        .unwrap_or("")
492        .to_lowercase();
493    let conn = match ext.as_str() {
494        "db" | "sqlite" | "sqlite3" => Connection::open(path).map_err(|e| e.to_string())?,
495        "csv" | "json" => {
496            // For now, reuse the ingestion logic by calling query_data and then we'd need to extract.
497            // Actually, it's better to implement a proper "get_connection" helper.
498            return Err("Streaming SQL results to Python currently requires a local .db file or a pre-audited CSV. Please use query_data to create a .db file first, or run analyze_trends directly on the file.".into());
499        }
500        _ => return Err("Unsupported format".into()),
501    };
502
503    let mut stmt = conn.prepare(sql).map_err(|e| e.to_string())?;
504    let col_names: Vec<String> = stmt
505        .column_names()
506        .into_iter()
507        .map(|s| s.to_string())
508        .collect();
509    let mut rows = stmt.query([]).map_err(|e| e.to_string())?;
510
511    let mut results = Vec::new();
512    while let Some(row) = rows.next().map_err(|e| e.to_string())? {
513        let mut map = serde_json::Map::new();
514        for (i, name) in col_names.iter().enumerate() {
515            let val: SqlValue = row.get(i).unwrap_or(SqlValue::Null);
516            let json_val = match val {
517                SqlValue::Null => Value::Null,
518                SqlValue::Integer(i) => Value::Number(i.into()),
519                SqlValue::Real(f) => serde_json::Number::from_f64(f)
520                    .map(Value::Number)
521                    .unwrap_or(Value::Null),
522                SqlValue::Text(s) => Value::String(s),
523                SqlValue::Blob(_) => Value::String("<BLOB>".into()),
524            };
525            map.insert(name.clone(), json_val);
526        }
527        results.push(Value::Object(map));
528        if results.len() >= 1000 {
529            break;
530        } // FAANG Signal: Safety limit
531    }
532    Ok(results)
533}