Skip to main content

vortex_tui/
datafusion_helper.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Shared DataFusion query execution utilities for both CLI and TUI.
5
6use std::sync::Arc;
7
8use arrow_array::Array as ArrowArray;
9use arrow_array::RecordBatch;
10use datafusion::datasource::listing::ListingOptions;
11use datafusion::datasource::listing::ListingTable;
12use datafusion::datasource::listing::ListingTableConfig;
13use datafusion::datasource::listing::ListingTableUrl;
14use datafusion::prelude::SessionContext;
15use vortex::session::VortexSession;
16use vortex_datafusion::VortexFormat;
17
18/// Execute a SQL query against a Vortex file.
19///
20/// The file is registered as a table named "data".
21/// Returns the result as a vector of RecordBatches.
22///
23/// # Errors
24///
25/// Returns an error if the query fails to parse or execute.
26pub async fn execute_vortex_query(
27    session: &VortexSession,
28    file_path: &str,
29    sql: &str,
30) -> Result<Vec<RecordBatch>, String> {
31    let ctx = create_vortex_context(session, file_path).await?;
32
33    let df = ctx.sql(sql).await.map_err(|e| format!("SQL error: {e}"))?;
34
35    df.collect()
36        .await
37        .map_err(|e| format!("Query execution error: {e}"))
38}
39
40/// Create a DataFusion SessionContext with a Vortex file registered as "data".
41///
42/// # Errors
43///
44/// Returns an error if the context cannot be created.
45pub async fn create_vortex_context(
46    session: &VortexSession,
47    file_path: &str,
48) -> Result<SessionContext, String> {
49    let ctx = SessionContext::new();
50    let format = Arc::new(VortexFormat::new(session.clone()));
51
52    let table_url =
53        ListingTableUrl::parse(file_path).map_err(|e| format!("Failed to parse file path: {e}"))?;
54
55    let config = ListingTableConfig::new(table_url)
56        .with_listing_options(
57            ListingOptions::new(format).with_session_config_options(ctx.state().config()),
58        )
59        .infer_schema(&ctx.state())
60        .await
61        .map_err(|e| format!("Failed to infer schema: {e}"))?;
62
63    let listing_table = Arc::new(
64        ListingTable::try_new(config).map_err(|e| format!("Failed to create table: {e}"))?,
65    );
66
67    ctx.register_table("data", listing_table)
68        .map_err(|e| format!("Failed to register table: {e}"))?;
69
70    Ok(ctx)
71}
72
73/// Convert an Arrow array value at a given index to a JSON value.
74///
75/// # Panics
76///
77/// Panics if the array type doesn't match the expected Arrow array type during downcast.
78/// This should not happen for well-formed Arrow arrays.
79#[allow(clippy::unwrap_used)]
80pub fn arrow_value_to_json(array: &dyn ArrowArray, idx: usize) -> serde_json::Value {
81    use arrow_array::*;
82    use arrow_schema::DataType;
83
84    if array.is_null(idx) {
85        return serde_json::Value::Null;
86    }
87
88    match array.data_type() {
89        DataType::Null => serde_json::Value::Null,
90        DataType::Boolean => {
91            let arr = array.as_any().downcast_ref::<BooleanArray>().unwrap();
92            serde_json::Value::Bool(arr.value(idx))
93        }
94        DataType::Int8 => {
95            let arr = array.as_any().downcast_ref::<Int8Array>().unwrap();
96            serde_json::json!(arr.value(idx))
97        }
98        DataType::Int16 => {
99            let arr = array.as_any().downcast_ref::<Int16Array>().unwrap();
100            serde_json::json!(arr.value(idx))
101        }
102        DataType::Int32 => {
103            let arr = array.as_any().downcast_ref::<Int32Array>().unwrap();
104            serde_json::json!(arr.value(idx))
105        }
106        DataType::Int64 => {
107            let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
108            serde_json::json!(arr.value(idx))
109        }
110        DataType::UInt8 => {
111            let arr = array.as_any().downcast_ref::<UInt8Array>().unwrap();
112            serde_json::json!(arr.value(idx))
113        }
114        DataType::UInt16 => {
115            let arr = array.as_any().downcast_ref::<UInt16Array>().unwrap();
116            serde_json::json!(arr.value(idx))
117        }
118        DataType::UInt32 => {
119            let arr = array.as_any().downcast_ref::<UInt32Array>().unwrap();
120            serde_json::json!(arr.value(idx))
121        }
122        DataType::UInt64 => {
123            let arr = array.as_any().downcast_ref::<UInt64Array>().unwrap();
124            serde_json::json!(arr.value(idx))
125        }
126        DataType::Float16 => {
127            let arr = array.as_any().downcast_ref::<Float16Array>().unwrap();
128            serde_json::json!(arr.value(idx).to_f32())
129        }
130        DataType::Float32 => {
131            let arr = array.as_any().downcast_ref::<Float32Array>().unwrap();
132            serde_json::json!(arr.value(idx))
133        }
134        DataType::Float64 => {
135            let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
136            serde_json::json!(arr.value(idx))
137        }
138        DataType::Utf8 => {
139            let arr = array.as_any().downcast_ref::<StringArray>().unwrap();
140            serde_json::Value::String(arr.value(idx).to_string())
141        }
142        DataType::LargeUtf8 => {
143            let arr = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
144            serde_json::Value::String(arr.value(idx).to_string())
145        }
146        DataType::Utf8View => {
147            let arr = array.as_any().downcast_ref::<StringViewArray>().unwrap();
148            serde_json::Value::String(arr.value(idx).to_string())
149        }
150        DataType::Binary => {
151            let arr = array.as_any().downcast_ref::<BinaryArray>().unwrap();
152            let hex: String = arr.value(idx).iter().map(|b| format!("{b:02x}")).collect();
153            serde_json::Value::String(hex)
154        }
155        DataType::LargeBinary => {
156            let arr = array.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
157            let hex: String = arr.value(idx).iter().map(|b| format!("{b:02x}")).collect();
158            serde_json::Value::String(hex)
159        }
160        DataType::BinaryView => {
161            let arr = array.as_any().downcast_ref::<BinaryViewArray>().unwrap();
162            let hex: String = arr.value(idx).iter().map(|b| format!("{b:02x}")).collect();
163            serde_json::Value::String(hex)
164        }
165        DataType::Date32 => {
166            let arr = array.as_any().downcast_ref::<Date32Array>().unwrap();
167            serde_json::json!(arr.value(idx))
168        }
169        DataType::Date64 => {
170            let arr = array.as_any().downcast_ref::<Date64Array>().unwrap();
171            serde_json::json!(arr.value(idx))
172        }
173        DataType::Timestamp(..) => {
174            if let Some(arr) = array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
175                serde_json::json!(arr.value(idx))
176            } else if let Some(arr) = array.as_any().downcast_ref::<TimestampMillisecondArray>() {
177                serde_json::json!(arr.value(idx))
178            } else if let Some(arr) = array.as_any().downcast_ref::<TimestampSecondArray>() {
179                serde_json::json!(arr.value(idx))
180            } else if let Some(arr) = array.as_any().downcast_ref::<TimestampNanosecondArray>() {
181                serde_json::json!(arr.value(idx))
182            } else {
183                serde_json::Value::String("<timestamp>".to_string())
184            }
185        }
186        DataType::Decimal128(..) => {
187            let arr = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
188            serde_json::Value::String(arr.value_as_string(idx))
189        }
190        DataType::Decimal256(..) => {
191            let arr = array.as_any().downcast_ref::<Decimal256Array>().unwrap();
192            serde_json::Value::String(arr.value_as_string(idx))
193        }
194        DataType::List(_) => {
195            let arr = array.as_any().downcast_ref::<ListArray>().unwrap();
196            let value_arr = arr.value(idx);
197            let elements: Vec<serde_json::Value> = (0..value_arr.len())
198                .map(|i| arrow_value_to_json(value_arr.as_ref(), i))
199                .collect();
200            serde_json::Value::Array(elements)
201        }
202        DataType::LargeList(_) => {
203            let arr = array.as_any().downcast_ref::<LargeListArray>().unwrap();
204            let value_arr = arr.value(idx);
205            let elements: Vec<serde_json::Value> = (0..value_arr.len())
206                .map(|i| arrow_value_to_json(value_arr.as_ref(), i))
207                .collect();
208            serde_json::Value::Array(elements)
209        }
210        DataType::Struct(_) => {
211            let arr = array.as_any().downcast_ref::<StructArray>().unwrap();
212            let mut obj = serde_json::Map::new();
213            for (i, field) in arr.fields().iter().enumerate() {
214                let col = arr.column(i);
215                obj.insert(field.name().clone(), arrow_value_to_json(col.as_ref(), idx));
216            }
217            serde_json::Value::Object(obj)
218        }
219        _ => {
220            // Fallback for unsupported types
221            serde_json::Value::String(format!("<{}>", array.data_type()))
222        }
223    }
224}
225
226/// Format a JSON value for display in the TUI.
227///
228/// - Null becomes "NULL"
229/// - Strings are displayed without quotes
230/// - Other values use their JSON string representation
231pub fn json_value_to_display(value: serde_json::Value) -> String {
232    match value {
233        serde_json::Value::Null => "NULL".to_string(),
234        serde_json::Value::String(s) => s,
235        other => other.to_string(),
236    }
237}