1use std::path::PathBuf;
7
8use arrow_array::RecordBatch;
9use serde::Serialize;
10use vortex::error::VortexResult;
11use vortex::error::vortex_err;
12use vortex::session::VortexSession;
13
14use crate::datafusion_helper::arrow_value_to_json;
15use crate::datafusion_helper::execute_vortex_query;
16
17#[derive(Debug, clap::Parser)]
19pub struct QueryArgs {
20 pub file: PathBuf,
22
23 #[arg(long, short)]
26 pub sql: String,
27}
28
/// Top-level JSON document printed by `exec_query`.
///
/// Field order matters: it is the order in which the keys are serialized.
#[derive(Serialize)]
struct QueryOutput {
    /// Schema of the result, taken from the first returned batch; the field list is empty
    /// when the query returned no batches.
    schema: SchemaInfo,
    /// Number of entries in `rows`.
    total_rows: u64,
    /// One JSON object per result row, keyed by column name.
    rows: Vec<serde_json::Value>,
}
35
/// Serializable description of a result schema.
#[derive(Serialize)]
struct SchemaInfo {
    /// One entry per column, in the order the Arrow schema lists its fields.
    fields: Vec<FieldInfo>,
}
40
/// Serializable description of a single result column.
#[derive(Serialize)]
struct FieldInfo {
    /// Column name as reported by the Arrow field.
    name: String,
    /// String rendering of the Arrow data type (its `to_string()` output).
    dtype: String,
    /// Whether the Arrow field is declared nullable.
    nullable: bool,
}
47
48pub async fn exec_query(session: &VortexSession, args: QueryArgs) -> VortexResult<()> {
54 let file_path = args
55 .file
56 .to_str()
57 .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
58
59 let batches: Vec<RecordBatch> = execute_vortex_query(session, file_path, &args.sql)
60 .await
61 .map_err(|e| vortex_err!("{e}"))?;
62
63 let schema = if let Some(batch) = batches.first() {
65 build_schema_from_arrow(batch.schema().as_ref())
66 } else {
67 SchemaInfo { fields: vec![] }
68 };
69
70 let mut rows = Vec::new();
72 for batch in &batches {
73 batch_to_json_rows(batch, &mut rows)?;
74 }
75
76 let total_rows = rows.len() as u64;
77
78 let output = QueryOutput {
79 schema,
80 total_rows,
81 rows,
82 };
83
84 let json_output = serde_json::to_string_pretty(&output)
85 .map_err(|e| vortex_err!("Failed to serialize JSON: {e}"))?;
86 println!("{json_output}");
87
88 Ok(())
89}
90
91fn build_schema_from_arrow(schema: &arrow_schema::Schema) -> SchemaInfo {
92 let fields = schema
93 .fields()
94 .iter()
95 .map(|f| FieldInfo {
96 name: f.name().clone(),
97 dtype: f.data_type().to_string(),
98 nullable: f.is_nullable(),
99 })
100 .collect();
101
102 SchemaInfo { fields }
103}
104
105fn batch_to_json_rows(batch: &RecordBatch, rows: &mut Vec<serde_json::Value>) -> VortexResult<()> {
106 let schema = batch.schema();
107
108 for row_idx in 0..batch.num_rows() {
109 let mut obj = serde_json::Map::new();
110
111 for (col_idx, field) in schema.fields().iter().enumerate() {
112 let column = batch.column(col_idx);
113 let value = arrow_value_to_json(column.as_ref(), row_idx);
114 obj.insert(field.name().clone(), value);
115 }
116
117 rows.push(serde_json::Value::Object(obj));
118 }
119
120 Ok(())
121}