Skip to main content

hematite/tools/
data_query.rs

1use rusqlite::{types::Value as SqlValue, Connection};
2use serde_json::Value;
3use std::fs::File;
4use std::io::{BufRead, BufReader};
5use std::path::PathBuf;
6
7pub async fn query_data(args: &Value) -> Result<String, String> {
8    let sql = args
9        .get("sql")
10        .and_then(|v| v.as_str())
11        .ok_or("Missing 'sql' argument")?;
12    let path_str = args
13        .get("path")
14        .and_then(|v| v.as_str())
15        .ok_or("Missing 'path' argument")?;
16    let explain = args
17        .get("explain")
18        .and_then(|v| v.as_bool())
19        .unwrap_or(false);
20    let path = PathBuf::from(path_str);
21
22    if !path.exists() {
23        return Err(format!("File not found: {:?}", path));
24    }
25
26    let ext = path
27        .extension()
28        .and_then(|s| s.to_str())
29        .unwrap_or("")
30        .to_lowercase();
31
32    match ext.as_str() {
33        "db" | "sqlite" | "sqlite3" => query_sqlite(&path, sql, explain),
34        "csv" => query_csv_streaming(&path, sql, explain),
35        "json" => {
36            // JSON is harder to stream without a streaming parser, but we'll optimize the batching
37            query_json_optimized(&path, sql, explain)
38        }
39        _ => Err(format!(
40            "Unsupported file extension for SQL query: .{}",
41            ext
42        )),
43    }
44}
45
46fn query_sqlite(path: &PathBuf, sql: &str, explain: bool) -> Result<String, String> {
47    let conn = Connection::open(path).map_err(|e| format!("Failed to open database: {}", e))?;
48    let sql_to_run = if explain {
49        format!("EXPLAIN QUERY PLAN {}", sql)
50    } else {
51        sql.to_string()
52    };
53    execute_and_format(&conn, &sql_to_run)
54}
55
56fn query_csv_streaming(path: &PathBuf, sql: &str, explain: bool) -> Result<String, String> {
57    let file = File::open(path).map_err(|e| format!("Failed to open file: {}", e))?;
58    let reader = BufReader::new(file);
59    let mut lines = reader.lines();
60
61    let header = lines
62        .next()
63        .ok_or("CSV file is empty")?
64        .map_err(|e| e.to_string())?;
65    let delimiter = if header.contains(',') { "," } else { "\t" };
66    let raw_cols: Vec<String> = header
67        .split(delimiter)
68        .map(|s| s.trim().replace("\"", ""))
69        .collect();
70
71    // FAANG Signal: Clean identifiers for SQL
72    let clean_cols: Vec<String> = raw_cols
73        .iter()
74        .map(|s| {
75            s.chars()
76                .filter(|c| c.is_alphanumeric() || *c == '_')
77                .collect::<String>()
78        })
79        .map(|s| {
80            if s.is_empty() {
81                "column".to_string()
82            } else {
83                s
84            }
85        })
86        .collect();
87
88    // FAANG Signal: Schema Inference (Scan first 100 rows)
89    let mut sample_rows = Vec::new();
90    for _ in 0..100 {
91        if let Some(Ok(line)) = lines.next() {
92            sample_rows.push(line);
93        } else {
94            break;
95        }
96    }
97
98    let mut col_types = vec!["INTEGER"; clean_cols.len()];
99    for line in &sample_rows {
100        let vals: Vec<&str> = line.split(delimiter).map(|s| s.trim()).collect();
101        for (i, val) in vals.iter().enumerate() {
102            if i >= col_types.len() {
103                break;
104            }
105            if col_types[i] == "TEXT" {
106                continue;
107            }
108
109            if val.parse::<i64>().is_err() {
110                if val.parse::<f64>().is_ok() {
111                    col_types[i] = "REAL";
112                } else {
113                    col_types[i] = "TEXT";
114                }
115            }
116        }
117    }
118
119    let mut conn = Connection::open_in_memory().map_err(|e| format!("Memory DB Error: {}", e))?;
120
121    let mut create_sql = format!("CREATE TABLE source (");
122    for (i, col) in clean_cols.iter().enumerate() {
123        create_sql.push_str(&format!("{} {}", col, col_types[i]));
124        if i < clean_cols.len() - 1 {
125            create_sql.push_str(", ");
126        }
127    }
128    create_sql.push_str(")");
129
130    conn.execute(&create_sql, [])
131        .map_err(|e| format!("DDL Error: {}", e))?;
132
133    // FAANG Signal: Transactional Batch Ingestion
134    {
135        let tx = conn.transaction().map_err(|e| e.to_string())?;
136        let placeholders = vec!["?"; clean_cols.len()].join(",");
137        let insert_sql = format!("INSERT INTO source VALUES ({})", placeholders);
138
139        {
140            let mut stmt = tx.prepare(&insert_sql).map_err(|e| e.to_string())?;
141
142            // Insert sample rows
143            for line in sample_rows {
144                let vals: Vec<&str> = line.split(delimiter).map(|s| s.trim()).collect();
145                if vals.len() == clean_cols.len() {
146                    stmt.execute(rusqlite::params_from_iter(vals)).ok();
147                }
148            }
149
150            // Insert remaining rows (Streaming)
151            for line_res in lines {
152                if let Ok(line) = line_res {
153                    let vals: Vec<&str> = line.split(delimiter).map(|s| s.trim()).collect();
154                    if vals.len() == clean_cols.len() {
155                        stmt.execute(rusqlite::params_from_iter(vals)).ok();
156                    }
157                }
158            }
159        }
160        tx.commit().map_err(|e| e.to_string())?;
161    }
162
163    let sql_to_run = if explain {
164        format!("EXPLAIN QUERY PLAN {}", sql)
165    } else {
166        sql.to_string()
167    };
168    execute_and_format(&conn, &sql_to_run)
169}
170
171fn query_json_optimized(path: &PathBuf, sql: &str, explain: bool) -> Result<String, String> {
172    let content =
173        std::fs::read_to_string(path).map_err(|e| format!("Failed to read JSON: {}", e))?;
174    let json: Value =
175        serde_json::from_str(&content).map_err(|e| format!("Failed to parse JSON: {}", e))?;
176
177    let arr = json.as_array().ok_or("JSON must be an array of objects")?;
178    if arr.is_empty() {
179        return Err("JSON array is empty".into());
180    }
181
182    let first = arr[0].as_object().ok_or("First record must be an object")?;
183    let cols: Vec<String> = first.keys().cloned().collect();
184
185    let mut conn = Connection::open_in_memory().map_err(|e| e.to_string())?;
186
187    let mut create_sql = format!("CREATE TABLE source (");
188    for (i, col) in cols.iter().enumerate() {
189        create_sql.push_str(&format!("{} TEXT", col)); // JSON is dynamic, default to TEXT
190        if i < cols.len() - 1 {
191            create_sql.push_str(", ");
192        }
193    }
194    create_sql.push_str(")");
195
196    conn.execute(&create_sql, []).map_err(|e| e.to_string())?;
197
198    {
199        let tx = conn.transaction().map_err(|e| e.to_string())?;
200        let placeholders = vec!["?"; cols.len()].join(",");
201        let insert_sql = format!("INSERT INTO source VALUES ({})", placeholders);
202        {
203            let mut stmt = tx.prepare(&insert_sql).map_err(|e| e.to_string())?;
204            for item in arr {
205                if let Some(obj) = item.as_object() {
206                    let mut vals = Vec::new();
207                    for col in &cols {
208                        vals.push(obj.get(col).map(|v| v.to_string()).unwrap_or_default());
209                    }
210                    stmt.execute(rusqlite::params_from_iter(vals)).ok();
211                }
212            }
213        }
214        tx.commit().map_err(|e| e.to_string())?;
215    }
216
217    let sql_to_run = if explain {
218        format!("EXPLAIN QUERY PLAN {}", sql)
219    } else {
220        sql.to_string()
221    };
222    execute_and_format(&conn, &sql_to_run)
223}
224
225pub async fn export_as_table(args: &Value) -> Result<String, String> {
226    let items = args
227        .get("items")
228        .and_then(|v| v.as_array())
229        .ok_or("Missing 'items' array")?;
230    let path_str = args
231        .get("path")
232        .and_then(|v| v.as_str())
233        .ok_or("Missing 'path' argument")?;
234    let format = args
235        .get("format")
236        .and_then(|v| v.as_str())
237        .unwrap_or("csv")
238        .to_lowercase();
239    let path = PathBuf::from(path_str);
240
241    if items.is_empty() {
242        return Err("No items to export".into());
243    }
244
245    match format.as_str() {
246        "sqlite" | "db" => export_to_sqlite(&path, items),
247        "csv" => export_to_csv(&path, items),
248        _ => Err(format!("Unsupported export format: {}", format)),
249    }
250}
251
252fn export_to_sqlite(path: &PathBuf, items: &[Value]) -> Result<String, String> {
253    let first = items[0].as_object().ok_or("Items must be objects")?;
254    let cols: Vec<String> = first.keys().cloned().collect();
255
256    let conn = Connection::open(path).map_err(|e| format!("Failed to create DB: {}", e))?;
257
258    let mut create_sql = format!("CREATE TABLE IF NOT EXISTS data (");
259    for (i, col) in cols.iter().enumerate() {
260        create_sql.push_str(&format!("{} TEXT", col));
261        if i < cols.len() - 1 {
262            create_sql.push_str(", ");
263        }
264    }
265    create_sql.push_str(")");
266
267    conn.execute(&create_sql, [])
268        .map_err(|e| format!("DDL Error: {}", e))?;
269
270    {
271        let mut tx = Connection::open(path).map_err(|e| e.to_string())?;
272        let tx = tx.transaction().map_err(|e| e.to_string())?;
273        let placeholders = vec!["?"; cols.len()].join(",");
274        let insert_sql = format!("INSERT INTO data VALUES ({})", placeholders);
275
276        {
277            let mut stmt = tx.prepare(&insert_sql).map_err(|e| e.to_string())?;
278            for item in items {
279                if let Some(obj) = item.as_object() {
280                    let mut vals = Vec::new();
281                    for col in &cols {
282                        vals.push(obj.get(col).map(|v| v.to_string()).unwrap_or_default());
283                    }
284                    stmt.execute(rusqlite::params_from_iter(vals)).ok();
285                }
286            }
287        }
288        tx.commit().map_err(|e| e.to_string())?;
289    }
290
291    Ok(format!(
292        "Successfully exported {} items to SQLite: {:?}",
293        items.len(),
294        path
295    ))
296}
297
298fn export_to_csv(path: &PathBuf, items: &[Value]) -> Result<String, String> {
299    let first = items[0].as_object().ok_or("Items must be objects")?;
300    let cols: Vec<String> = first.keys().cloned().collect();
301
302    let mut content = cols.join(",") + "\n";
303    for item in items {
304        if let Some(obj) = item.as_object() {
305            let mut row = Vec::new();
306            for col in &cols {
307                let val = obj
308                    .get(col)
309                    .map(|v| {
310                        let s = v.to_string();
311                        if s.contains(',') || s.contains('"') {
312                            format!("\"{}\"", s.replace("\"", "\"\""))
313                        } else {
314                            s
315                        }
316                    })
317                    .unwrap_or_default();
318                row.push(val);
319            }
320            content.push_str(&(row.join(",") + "\n"));
321        }
322    }
323
324    std::fs::write(path, content).map_err(|e| format!("Failed to write CSV: {}", e))?;
325    Ok(format!(
326        "Successfully exported {} items to CSV: {:?}",
327        items.len(),
328        path
329    ))
330}
331
332fn execute_and_format(conn: &Connection, sql: &str) -> Result<String, String> {
333    let mut stmt = conn.prepare(sql).map_err(|e| format!("SQL Error: {}", e))?;
334    let col_count = stmt.column_count();
335    let col_names: Vec<String> = stmt
336        .column_names()
337        .into_iter()
338        .map(|s| s.to_string())
339        .collect();
340
341    let mut rows = stmt.query([]).map_err(|e| format!("Query Error: {}", e))?;
342
343    let mut out = String::new();
344    // Header
345    for name in &col_names {
346        out.push_str(&format!("{:<15} ", name));
347    }
348    out.push_str("\n");
349    out.push_str(&"-".repeat(col_names.len() * 16));
350    out.push_str("\n");
351
352    let mut count = 0;
353    while let Some(row) = rows.next().map_err(|e| e.to_string())? {
354        for i in 0..col_count {
355            let val: SqlValue = row.get(i).unwrap_or(SqlValue::Null);
356            let val_str = match val {
357                SqlValue::Null => "NULL".into(),
358                SqlValue::Integer(i) => i.to_string(),
359                SqlValue::Real(f) => format!("{:.4}", f),
360                SqlValue::Text(s) => s,
361                SqlValue::Blob(_) => "<BLOB>".into(),
362            };
363            // Truncate long strings for table view
364            let truncated = if val_str.len() > 14 {
365                format!("{}...", &val_str[..11])
366            } else {
367                val_str
368            };
369            out.push_str(&format!("{:<15} ", truncated));
370        }
371        out.push_str("\n");
372        count += 1;
373        if count >= 100 {
374            out.push_str("\n[Result truncated to first 100 rows]\n");
375            break;
376        }
377    }
378
379    if count == 0 {
380        out.push_str("(No results found)\n");
381    } else if !sql.to_uppercase().contains("EXPLAIN") {
382        out.push_str(&format!("\nReturned {} rows.\n", count));
383    }
384
385    Ok(out)
386}
387
388pub async fn analyze_trends(args: &Value) -> Result<String, String> {
389    let sql = args
390        .get("sql")
391        .and_then(|v| v.as_str())
392        .ok_or("Missing 'sql' argument")?;
393    let path_str = args
394        .get("path")
395        .and_then(|v| v.as_str())
396        .ok_or("Missing 'path' argument")?;
397    let path = PathBuf::from(path_str);
398
399    // 1. Get raw data from SQL
400    let data = query_to_json_helper(&path, sql).await?;
401    if data.is_empty() {
402        return Ok("No data found to analyze.".into());
403    }
404
405    // 2. Prepare Python script for statistical analysis and ASCII charting
406    let python_code = format!(
407        r#"
408import math
409data = {data_json}
410
411def get_stats(vals):
412    if not vals: return None
413    vals.sort()
414    n = len(vals)
415    mean = sum(vals) / n
416    median = vals[n // 2] if n % 2 != 0 else (vals[n // 2 - 1] + vals[n // 2]) / 2
417    variance = sum((x - mean) ** 2 for x in vals) / n
418    std_dev = math.sqrt(variance)
419    return {{"min": vals[0], "max": vals[-1], "mean": mean, "median": median, "std_dev": std_dev}}
420
421# Extract first numeric column
422column_name = None
423values = []
424for row in data:
425    for k, v in row.items():
426        try:
427            val = float(v)
428            if column_name is None: column_name = k
429            if k == column_name: values.append(val)
430        except:
431            continue
432
433if not values:
434    print("Error: No numeric columns found in the result set.")
435    sys.exit(0)
436
437stats = get_stats(values)
438print(f"--- Statistical Analysis for '{{column_name}}' ---")
439print(f"Count:  {{len(values)}}")
440print(f"Min:    {{stats['min']:.4f}}")
441print(f"Max:    {{stats['max']:.4f}}")
442print(f"Mean:   {{stats['mean']:.4f}}")
443print(f"Median: {{stats['median']:.4f}}")
444print(f"StdDev: {{stats['std_dev']:.4f}}")
445print("\n--- Distribution (ASCII Histogram) ---")
446
447bins = 10
448range_val = stats['max'] - stats['min']
449if range_val == 0: range_val = 1
450bin_size = range_val / bins
451hist = [0] * bins
452
453for v in values:
454    idx = int((v - stats['min']) / bin_size)
455    if idx >= bins: idx = bins - 1
456    hist[idx] += 1
457
458max_count = max(hist) if hist else 1
459for i in range(bins):
460    b_min = stats['min'] + (i * bin_size)
461    b_max = b_min + bin_size
462    bar = "█" * int((hist[i] / max_count) * 20)
463    print(f"{{b_min:8.2f}} - {{b_max:8.2f}} | {{bar:<20}} ({{hist[i]}})")
464"#,
465        data_json = serde_json::to_string(&data).unwrap()
466    );
467
468    // 3. Execute in Python sandbox
469    crate::tools::code_sandbox::execute(&serde_json::json!({
470        "language": "python",
471        "code": python_code
472    }))
473    .await
474}
475
476pub async fn query_to_json_helper(
477    path: &std::path::PathBuf,
478    sql: &str,
479) -> Result<Vec<Value>, String> {
480    let ext = path
481        .extension()
482        .and_then(|s| s.to_str())
483        .unwrap_or("")
484        .to_lowercase();
485    let conn = match ext.as_str() {
486        "db" | "sqlite" | "sqlite3" => Connection::open(path).map_err(|e| e.to_string())?,
487        "csv" | "json" => {
488            // For now, reuse the ingestion logic by calling query_data and then we'd need to extract.
489            // Actually, it's better to implement a proper "get_connection" helper.
490            return Err("Streaming SQL results to Python currently requires a local .db file or a pre-audited CSV. Please use query_data to create a .db file first, or run analyze_trends directly on the file.".into());
491        }
492        _ => return Err("Unsupported format".into()),
493    };
494
495    let mut stmt = conn.prepare(sql).map_err(|e| e.to_string())?;
496    let col_names: Vec<String> = stmt
497        .column_names()
498        .into_iter()
499        .map(|s| s.to_string())
500        .collect();
501    let mut rows = stmt.query([]).map_err(|e| e.to_string())?;
502
503    let mut results = Vec::new();
504    while let Some(row) = rows.next().map_err(|e| e.to_string())? {
505        let mut map = serde_json::Map::new();
506        for (i, name) in col_names.iter().enumerate() {
507            let val: SqlValue = row.get(i).unwrap_or(SqlValue::Null);
508            let json_val = match val {
509                SqlValue::Null => Value::Null,
510                SqlValue::Integer(i) => Value::Number(i.into()),
511                SqlValue::Real(f) => serde_json::Number::from_f64(f)
512                    .map(Value::Number)
513                    .unwrap_or(Value::Null),
514                SqlValue::Text(s) => Value::String(s),
515                SqlValue::Blob(_) => Value::String("<BLOB>".into()),
516            };
517            map.insert(name.clone(), json_val);
518        }
519        results.push(Value::Object(map));
520        if results.len() >= 1000 {
521            break;
522        } // FAANG Signal: Safety limit
523    }
524    Ok(results)
525}