vortex_tui/
datafusion_helper.rs1use std::sync::Arc;
7
8use arrow_array::Array as ArrowArray;
9use arrow_array::RecordBatch;
10use datafusion::datasource::listing::ListingOptions;
11use datafusion::datasource::listing::ListingTable;
12use datafusion::datasource::listing::ListingTableConfig;
13use datafusion::datasource::listing::ListingTableUrl;
14use datafusion::prelude::SessionContext;
15use vortex::session::VortexSession;
16use vortex_datafusion::VortexFormat;
17
18pub async fn execute_vortex_query(
27 session: &VortexSession,
28 file_path: &str,
29 sql: &str,
30) -> Result<Vec<RecordBatch>, String> {
31 let ctx = create_vortex_context(session, file_path).await?;
32
33 let df = ctx.sql(sql).await.map_err(|e| format!("SQL error: {e}"))?;
34
35 df.collect()
36 .await
37 .map_err(|e| format!("Query execution error: {e}"))
38}
39
40pub async fn create_vortex_context(
46 session: &VortexSession,
47 file_path: &str,
48) -> Result<SessionContext, String> {
49 let ctx = SessionContext::new();
50 let format = Arc::new(VortexFormat::new(session.clone()));
51
52 let table_url =
53 ListingTableUrl::parse(file_path).map_err(|e| format!("Failed to parse file path: {e}"))?;
54
55 let config = ListingTableConfig::new(table_url)
56 .with_listing_options(
57 ListingOptions::new(format).with_session_config_options(ctx.state().config()),
58 )
59 .infer_schema(&ctx.state())
60 .await
61 .map_err(|e| format!("Failed to infer schema: {e}"))?;
62
63 let listing_table = Arc::new(
64 ListingTable::try_new(config).map_err(|e| format!("Failed to create table: {e}"))?,
65 );
66
67 ctx.register_table("data", listing_table)
68 .map_err(|e| format!("Failed to register table: {e}"))?;
69
70 Ok(ctx)
71}
72
73#[allow(clippy::unwrap_used)]
80pub fn arrow_value_to_json(array: &dyn ArrowArray, idx: usize) -> serde_json::Value {
81 use arrow_array::*;
82 use arrow_schema::DataType;
83
84 if array.is_null(idx) {
85 return serde_json::Value::Null;
86 }
87
88 match array.data_type() {
89 DataType::Null => serde_json::Value::Null,
90 DataType::Boolean => {
91 let arr = array.as_any().downcast_ref::<BooleanArray>().unwrap();
92 serde_json::Value::Bool(arr.value(idx))
93 }
94 DataType::Int8 => {
95 let arr = array.as_any().downcast_ref::<Int8Array>().unwrap();
96 serde_json::json!(arr.value(idx))
97 }
98 DataType::Int16 => {
99 let arr = array.as_any().downcast_ref::<Int16Array>().unwrap();
100 serde_json::json!(arr.value(idx))
101 }
102 DataType::Int32 => {
103 let arr = array.as_any().downcast_ref::<Int32Array>().unwrap();
104 serde_json::json!(arr.value(idx))
105 }
106 DataType::Int64 => {
107 let arr = array.as_any().downcast_ref::<Int64Array>().unwrap();
108 serde_json::json!(arr.value(idx))
109 }
110 DataType::UInt8 => {
111 let arr = array.as_any().downcast_ref::<UInt8Array>().unwrap();
112 serde_json::json!(arr.value(idx))
113 }
114 DataType::UInt16 => {
115 let arr = array.as_any().downcast_ref::<UInt16Array>().unwrap();
116 serde_json::json!(arr.value(idx))
117 }
118 DataType::UInt32 => {
119 let arr = array.as_any().downcast_ref::<UInt32Array>().unwrap();
120 serde_json::json!(arr.value(idx))
121 }
122 DataType::UInt64 => {
123 let arr = array.as_any().downcast_ref::<UInt64Array>().unwrap();
124 serde_json::json!(arr.value(idx))
125 }
126 DataType::Float16 => {
127 let arr = array.as_any().downcast_ref::<Float16Array>().unwrap();
128 serde_json::json!(arr.value(idx).to_f32())
129 }
130 DataType::Float32 => {
131 let arr = array.as_any().downcast_ref::<Float32Array>().unwrap();
132 serde_json::json!(arr.value(idx))
133 }
134 DataType::Float64 => {
135 let arr = array.as_any().downcast_ref::<Float64Array>().unwrap();
136 serde_json::json!(arr.value(idx))
137 }
138 DataType::Utf8 => {
139 let arr = array.as_any().downcast_ref::<StringArray>().unwrap();
140 serde_json::Value::String(arr.value(idx).to_string())
141 }
142 DataType::LargeUtf8 => {
143 let arr = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
144 serde_json::Value::String(arr.value(idx).to_string())
145 }
146 DataType::Utf8View => {
147 let arr = array.as_any().downcast_ref::<StringViewArray>().unwrap();
148 serde_json::Value::String(arr.value(idx).to_string())
149 }
150 DataType::Binary => {
151 let arr = array.as_any().downcast_ref::<BinaryArray>().unwrap();
152 let hex: String = arr.value(idx).iter().map(|b| format!("{b:02x}")).collect();
153 serde_json::Value::String(hex)
154 }
155 DataType::LargeBinary => {
156 let arr = array.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
157 let hex: String = arr.value(idx).iter().map(|b| format!("{b:02x}")).collect();
158 serde_json::Value::String(hex)
159 }
160 DataType::BinaryView => {
161 let arr = array.as_any().downcast_ref::<BinaryViewArray>().unwrap();
162 let hex: String = arr.value(idx).iter().map(|b| format!("{b:02x}")).collect();
163 serde_json::Value::String(hex)
164 }
165 DataType::Date32 => {
166 let arr = array.as_any().downcast_ref::<Date32Array>().unwrap();
167 serde_json::json!(arr.value(idx))
168 }
169 DataType::Date64 => {
170 let arr = array.as_any().downcast_ref::<Date64Array>().unwrap();
171 serde_json::json!(arr.value(idx))
172 }
173 DataType::Timestamp(..) => {
174 if let Some(arr) = array.as_any().downcast_ref::<TimestampMicrosecondArray>() {
175 serde_json::json!(arr.value(idx))
176 } else if let Some(arr) = array.as_any().downcast_ref::<TimestampMillisecondArray>() {
177 serde_json::json!(arr.value(idx))
178 } else if let Some(arr) = array.as_any().downcast_ref::<TimestampSecondArray>() {
179 serde_json::json!(arr.value(idx))
180 } else if let Some(arr) = array.as_any().downcast_ref::<TimestampNanosecondArray>() {
181 serde_json::json!(arr.value(idx))
182 } else {
183 serde_json::Value::String("<timestamp>".to_string())
184 }
185 }
186 DataType::Decimal128(..) => {
187 let arr = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
188 serde_json::Value::String(arr.value_as_string(idx))
189 }
190 DataType::Decimal256(..) => {
191 let arr = array.as_any().downcast_ref::<Decimal256Array>().unwrap();
192 serde_json::Value::String(arr.value_as_string(idx))
193 }
194 DataType::List(_) => {
195 let arr = array.as_any().downcast_ref::<ListArray>().unwrap();
196 let value_arr = arr.value(idx);
197 let elements: Vec<serde_json::Value> = (0..value_arr.len())
198 .map(|i| arrow_value_to_json(value_arr.as_ref(), i))
199 .collect();
200 serde_json::Value::Array(elements)
201 }
202 DataType::LargeList(_) => {
203 let arr = array.as_any().downcast_ref::<LargeListArray>().unwrap();
204 let value_arr = arr.value(idx);
205 let elements: Vec<serde_json::Value> = (0..value_arr.len())
206 .map(|i| arrow_value_to_json(value_arr.as_ref(), i))
207 .collect();
208 serde_json::Value::Array(elements)
209 }
210 DataType::Struct(_) => {
211 let arr = array.as_any().downcast_ref::<StructArray>().unwrap();
212 let mut obj = serde_json::Map::new();
213 for (i, field) in arr.fields().iter().enumerate() {
214 let col = arr.column(i);
215 obj.insert(field.name().clone(), arrow_value_to_json(col.as_ref(), idx));
216 }
217 serde_json::Value::Object(obj)
218 }
219 _ => {
220 serde_json::Value::String(format!("<{}>", array.data_type()))
222 }
223 }
224}
225
226pub fn json_value_to_display(value: serde_json::Value) -> String {
232 match value {
233 serde_json::Value::Null => "NULL".to_string(),
234 serde_json::Value::String(s) => s,
235 other => other.to_string(),
236 }
237}