Skip to main content

vortex_tui/
query.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Execute SQL queries against Vortex files using DataFusion.
5
6use std::path::PathBuf;
7
8use arrow_array::RecordBatch;
9use serde::Serialize;
10use vortex::error::VortexResult;
11use vortex::error::vortex_err;
12use vortex::session::VortexSession;
13
14use crate::datafusion_helper::arrow_value_to_json;
15use crate::datafusion_helper::execute_vortex_query;
16
/// Command-line arguments for the query command.
#[derive(Debug, clap::Parser)]
pub struct QueryArgs {
    /// Path to the Vortex file
    pub file: PathBuf,

    /// SQL query to execute. The table is available as 'data'.
    /// Example: "SELECT * FROM data WHERE col > 10 LIMIT 100"
    // Exposed as `--sql` and the derived short flag `-s`.
    #[arg(long, short)]
    pub sql: String,
}
28
/// JSON-serializable payload printed to stdout by `exec_query`.
#[derive(Serialize)]
struct QueryOutput {
    // Schema of the result set; empty when the query produced no batches.
    schema: SchemaInfo,
    // Number of rows in `rows` (u64 for stable JSON output).
    total_rows: u64,
    // One JSON object per row, keyed by column name.
    rows: Vec<serde_json::Value>,
}
35
/// Serializable description of a result schema.
#[derive(Serialize)]
struct SchemaInfo {
    // One entry per column of the query result.
    fields: Vec<FieldInfo>,
}
40
/// Serializable description of a single result column.
#[derive(Serialize)]
struct FieldInfo {
    // Column name.
    name: String,
    // Arrow data type rendered via `DataType::to_string()`.
    dtype: String,
    // Whether the column may contain nulls.
    nullable: bool,
}
47
48/// Execute a SQL query against a Vortex file.
49///
50/// # Errors
51///
52/// Returns an error if the file cannot be opened or the query fails.
53pub async fn exec_query(session: &VortexSession, args: QueryArgs) -> VortexResult<()> {
54    let file_path = args
55        .file
56        .to_str()
57        .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
58
59    let batches: Vec<RecordBatch> = execute_vortex_query(session, file_path, &args.sql)
60        .await
61        .map_err(|e| vortex_err!("{e}"))?;
62
63    // Build schema info from the result
64    let schema = if let Some(batch) = batches.first() {
65        build_schema_from_arrow(batch.schema().as_ref())
66    } else {
67        SchemaInfo { fields: vec![] }
68    };
69
70    // Convert batches to JSON rows
71    let mut rows = Vec::new();
72    for batch in &batches {
73        batch_to_json_rows(batch, &mut rows)?;
74    }
75
76    let total_rows = rows.len() as u64;
77
78    let output = QueryOutput {
79        schema,
80        total_rows,
81        rows,
82    };
83
84    let json_output = serde_json::to_string_pretty(&output)
85        .map_err(|e| vortex_err!("Failed to serialize JSON: {e}"))?;
86    println!("{json_output}");
87
88    Ok(())
89}
90
91fn build_schema_from_arrow(schema: &arrow_schema::Schema) -> SchemaInfo {
92    let fields = schema
93        .fields()
94        .iter()
95        .map(|f| FieldInfo {
96            name: f.name().clone(),
97            dtype: f.data_type().to_string(),
98            nullable: f.is_nullable(),
99        })
100        .collect();
101
102    SchemaInfo { fields }
103}
104
105fn batch_to_json_rows(batch: &RecordBatch, rows: &mut Vec<serde_json::Value>) -> VortexResult<()> {
106    let schema = batch.schema();
107
108    for row_idx in 0..batch.num_rows() {
109        let mut obj = serde_json::Map::new();
110
111        for (col_idx, field) in schema.fields().iter().enumerate() {
112            let column = batch.column(col_idx);
113            let value = arrow_value_to_json(column.as_ref(), row_idx);
114            obj.insert(field.name().clone(), value);
115        }
116
117        rows.push(serde_json::Value::Object(obj));
118    }
119
120    Ok(())
121}