netsky 0.1.7

netsky CLI: the viable system launcher and subcommand dispatcher
Documentation
//! DataFusion SQL over netsky observability tables.

use chrono::Utc;
use netsky_db::Db;
use serde_json::{Value, json};

use super::db_diag::{wrap_open_error, wrap_query_error};

pub fn run(sql: &str, json_out: bool) -> netsky_core::Result<()> {
    let db = Db::open_read_only().map_err(wrap_open_error)?;
    if !json_out {
        let output = db.query(sql).map_err(|e| wrap_query_error(&db, e))?;
        println!("{output}");
        return Ok(());
    }
    let batches = db
        .query_batches(sql)
        .map_err(|e| wrap_query_error(&db, e))?;
    let rows = batches_to_rows(&batches)?;
    let envelope = json!({
        "command": "query",
        "status": "green",
        "summary": format!("{} row(s)", rows.len()),
        "generated_at": Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
        "data": {
            "sql": sql,
            "row_count": rows.len(),
            "rows": rows,
        },
    });
    println!("{}", serde_json::to_string_pretty(&envelope)?);
    Ok(())
}

fn batches_to_rows(batches: &[netsky_db::ArrowRecordBatch]) -> netsky_core::Result<Vec<Value>> {
    use netsky_db::arrow_array::ArrayRef;
    let mut rows = Vec::new();
    for batch in batches {
        let schema = batch.schema();
        let columns: Vec<(String, ArrayRef)> = schema
            .fields()
            .iter()
            .enumerate()
            .map(|(i, f)| (f.name().clone(), batch.column(i).clone()))
            .collect();
        for row_idx in 0..batch.num_rows() {
            let mut obj = serde_json::Map::with_capacity(columns.len());
            for (name, col) in &columns {
                obj.insert(name.clone(), array_value(col.as_ref(), row_idx));
            }
            rows.push(Value::Object(obj));
        }
    }
    Ok(rows)
}

fn array_value(arr: &dyn netsky_db::arrow_array::Array, idx: usize) -> Value {
    use netsky_db::arrow_array::{BooleanArray, Float64Array, Int32Array, Int64Array, StringArray};
    if arr.is_null(idx) {
        return Value::Null;
    }
    if let Some(a) = arr.as_any().downcast_ref::<Int64Array>() {
        return json!(a.value(idx));
    }
    if let Some(a) = arr.as_any().downcast_ref::<Int32Array>() {
        return json!(a.value(idx));
    }
    if let Some(a) = arr.as_any().downcast_ref::<Float64Array>() {
        return json!(a.value(idx));
    }
    if let Some(a) = arr.as_any().downcast_ref::<BooleanArray>() {
        return json!(a.value(idx));
    }
    if let Some(a) = arr.as_any().downcast_ref::<StringArray>() {
        return json!(a.value(idx));
    }
    // Fallback: stringify whatever DataFusion handed us so the row is still
    // emittable. Lossy on exotic types (timestamps with tz, lists, structs)
    // but never panics, which is the contract for `query --json` over
    // arbitrary user SQL.
    Value::String(format!("{:?}", arr.slice(idx, 1)))
}