1use std::collections::HashMap;
2use std::fmt;
3use std::ops::Index;
4use std::sync::Arc;
5
6use arrow_array::array::{
7 Array, BinaryArray, BooleanArray, Decimal128Array, Decimal256Array, DictionaryArray,
8 Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray,
9 StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
10 TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
11};
12use arrow_array::types::{
13 Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
14};
15use arrow_array::RecordBatch;
16use arrow_schema::SchemaRef;
17
18use crate::error::Error;
19
/// Query language in which a query string is written.
///
/// [`QueryType::Sql`] is the `Default`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum QueryType {
    /// SQL (the default).
    #[default]
    Sql,
    /// InfluxQL.
    InfluxQL,
}

impl QueryType {
    /// Returns the lowercase textual name of this query type:
    /// `"sql"` or `"influxql"`.
    pub fn as_str(self) -> &'static str {
        let name: &'static str = match self {
            Self::InfluxQL => "influxql",
            Self::Sql => "sql",
        };
        name
    }
}
38
39pub type QueryParameters = HashMap<String, serde_json::Value>;
44
45#[derive(Debug, Clone, Default)]
47pub struct QueryOptions {
48 pub(crate) query_type: QueryType,
49 pub headers: HashMap<String, String>,
51}
52
/// A single cell read out of a query result.
///
/// Each variant wraps the Rust type of one supported column type.
/// [`Value::Null`] represents a missing value; `extract_value` also yields it
/// for column types it does not support.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    /// Boolean.
    Bool(bool),
    /// Signed 8-bit integer.
    I8(i8),
    /// Signed 16-bit integer.
    I16(i16),
    /// Signed 32-bit integer.
    I32(i32),
    /// Signed 64-bit integer.
    I64(i64),
    /// Unsigned 8-bit integer.
    U8(u8),
    /// Unsigned 16-bit integer.
    U16(u16),
    /// Unsigned 32-bit integer.
    U32(u32),
    /// Unsigned 64-bit integer.
    U64(u64),
    /// 32-bit float.
    F32(f32),
    /// 64-bit float.
    F64(f64),
    /// UTF-8 text. `extract_value` also uses this for decimal columns.
    String(String),
    /// Raw bytes.
    Binary(Vec<u8>),
    /// Timestamp as a raw `i64`. `extract_value` scales every Arrow time unit
    /// to nanoseconds before storing it here.
    Timestamp(i64),
    /// Missing value.
    Null,
}

impl Value {
    /// Returns the value as an `f64` if it is any numeric variant.
    ///
    /// All integer and float variants are accepted. `I64` and `U64` are
    /// converted with `as`, so magnitudes above 2^53 may lose precision.
    /// Non-numeric variants (including `Timestamp`) yield `None`.
    pub fn as_f64(&self) -> Option<f64> {
        match self {
            Value::F64(v) => Some(*v),
            Value::F32(v) => Some(f64::from(*v)),
            // 64-bit integers have no lossless `From` conversion to f64.
            Value::I64(v) => Some(*v as f64),
            Value::U64(v) => Some(*v as f64),
            Value::I32(v) => Some(f64::from(*v)),
            Value::I16(v) => Some(f64::from(*v)),
            Value::I8(v) => Some(f64::from(*v)),
            Value::U32(v) => Some(f64::from(*v)),
            Value::U16(v) => Some(f64::from(*v)),
            Value::U8(v) => Some(f64::from(*v)),
            Value::Bool(_)
            | Value::String(_)
            | Value::Binary(_)
            | Value::Timestamp(_)
            | Value::Null => None,
        }
    }

    /// Returns the value as an `i64` if it is an integer or a timestamp.
    ///
    /// Every integer variant that fits is widened losslessly; a `U64` larger
    /// than `i64::MAX` yields `None` rather than wrapping. `Timestamp` returns
    /// its raw integer. Floats and non-numeric variants yield `None`.
    pub fn as_i64(&self) -> Option<i64> {
        match self {
            Value::I64(v) | Value::Timestamp(v) => Some(*v),
            Value::I32(v) => Some(i64::from(*v)),
            Value::I16(v) => Some(i64::from(*v)),
            Value::I8(v) => Some(i64::from(*v)),
            Value::U32(v) => Some(i64::from(*v)),
            Value::U16(v) => Some(i64::from(*v)),
            Value::U8(v) => Some(i64::from(*v)),
            Value::U64(v) => i64::try_from(*v).ok(),
            Value::Bool(_)
            | Value::F32(_)
            | Value::F64(_)
            | Value::String(_)
            | Value::Binary(_)
            | Value::Null => None,
        }
    }

    /// Returns the text if this is a `String` value, otherwise `None`.
    pub fn as_str(&self) -> Option<&str> {
        match self {
            Value::String(s) => Some(s.as_str()),
            _ => None,
        }
    }

    /// Returns the flag if this is a `Bool` value, otherwise `None`.
    pub fn as_bool(&self) -> Option<bool> {
        match self {
            Value::Bool(b) => Some(*b),
            _ => None,
        }
    }

    /// Returns `true` only for [`Value::Null`].
    pub fn is_null(&self) -> bool {
        matches!(self, Value::Null)
    }
}
116
117impl fmt::Display for Value {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 match self {
120 Value::Bool(v) => write!(f, "{v}"),
121 Value::I8(v) => write!(f, "{v}"),
122 Value::I16(v) => write!(f, "{v}"),
123 Value::I32(v) => write!(f, "{v}"),
124 Value::I64(v) => write!(f, "{v}"),
125 Value::U8(v) => write!(f, "{v}"),
126 Value::U16(v) => write!(f, "{v}"),
127 Value::U32(v) => write!(f, "{v}"),
128 Value::U64(v) => write!(f, "{v}"),
129 Value::F32(v) => write!(f, "{v}"),
130 Value::F64(v) => write!(f, "{v}"),
131 Value::String(v) => f.write_str(v),
132 Value::Binary(v) => write!(f, "{}b", v.len()),
133 Value::Timestamp(v) => write!(f, "{v}"),
134 Value::Null => f.write_str("null"),
135 }
136 }
137}
138
139#[derive(Debug, Clone)]
145pub struct Row {
146 values: Vec<Value>,
147 columns: Arc<Vec<String>>,
148 index: Arc<HashMap<String, usize>>,
149}
150
151impl Row {
152 pub fn get(&self, name: &str) -> Option<&Value> {
154 self.index.get(name).and_then(|&i| self.values.get(i))
155 }
156
157 pub fn at(&self, idx: usize) -> Option<&Value> {
159 self.values.get(idx)
160 }
161
162 pub fn columns(&self) -> &[String] {
164 &self.columns
165 }
166
167 pub fn values(&self) -> &[Value] {
169 &self.values
170 }
171
172 pub fn len(&self) -> usize {
174 self.values.len()
175 }
176
177 pub fn is_empty(&self) -> bool {
178 self.values.is_empty()
179 }
180
181 pub fn into_map(self) -> HashMap<String, Value> {
184 self.columns.iter().cloned().zip(self.values).collect()
185 }
186}
187
188impl Index<&str> for Row {
189 type Output = Value;
190 fn index(&self, name: &str) -> &Value {
191 self.get(name)
192 .unwrap_or_else(|| panic!("no column named '{name}'"))
193 }
194}
195
196impl Index<usize> for Row {
197 type Output = Value;
198 fn index(&self, idx: usize) -> &Value {
199 &self.values[idx]
200 }
201}
202
203pub struct QueryResult {
208 pub(crate) schema: SchemaRef,
209 pub(crate) batches: Vec<RecordBatch>,
210}
211
212impl QueryResult {
213 pub fn new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Self {
214 QueryResult { schema, batches }
215 }
216
217 pub fn schema(&self) -> &SchemaRef {
218 &self.schema
219 }
220
221 pub fn record_batches(&self) -> &[RecordBatch] {
223 &self.batches
224 }
225
226 pub fn num_rows(&self) -> usize {
228 self.batches.iter().map(|b| b.num_rows()).sum()
229 }
230
231 pub fn column_names(&self) -> Vec<&str> {
233 self.schema
234 .fields()
235 .iter()
236 .map(|f| f.name().as_str())
237 .collect()
238 }
239
240 pub fn rows(self) -> Result<Vec<Row>, Error> {
242 self.into_iter().collect()
243 }
244
245 #[cfg(feature = "polars")]
255 pub fn to_polars(self) -> crate::Result<polars::prelude::DataFrame> {
256 use arrow::ipc::writer::FileWriter;
257 use polars::io::SerReader;
258 use polars::prelude::IpcReader;
259 use std::io::Cursor;
260
261 let mut buf: Vec<u8> = Vec::new();
262 {
263 let mut writer = FileWriter::try_new(&mut buf, &self.schema)?;
264 for batch in &self.batches {
265 writer.write(batch)?;
266 }
267 writer.finish()?;
268 }
269
270 let cursor = Cursor::new(buf);
271 IpcReader::new(cursor)
272 .finish()
273 .map_err(|e| crate::error::Error::Config(format!("polars conversion error: {e}")))
274 }
275}
276
277impl IntoIterator for QueryResult {
278 type Item = Result<Row, Error>;
279 type IntoIter = QueryIterator;
280
281 fn into_iter(self) -> Self::IntoIter {
282 QueryIterator::new(self.schema, self.batches)
283 }
284}
285
286pub struct QueryIterator {
291 schema: SchemaRef,
292 batches: Vec<RecordBatch>,
293 batch_idx: usize,
294 row_idx: usize,
295 columns: Arc<Vec<String>>,
296 index: Arc<HashMap<String, usize>>,
297}
298
299impl QueryIterator {
300 pub(crate) fn new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Self {
301 let columns: Vec<String> = schema.fields().iter().map(|f| f.name().clone()).collect();
302 let index: HashMap<String, usize> = columns
303 .iter()
304 .enumerate()
305 .map(|(i, n)| (n.clone(), i))
306 .collect();
307 QueryIterator {
308 schema,
309 batches,
310 batch_idx: 0,
311 row_idx: 0,
312 columns: Arc::new(columns),
313 index: Arc::new(index),
314 }
315 }
316
317 pub fn column_names(&self) -> &[String] {
319 &self.columns
320 }
321
322 pub fn num_rows(&self) -> usize {
324 self.batches.iter().map(|b| b.num_rows()).sum()
325 }
326}
327
328impl Iterator for QueryIterator {
329 type Item = Result<Row, Error>;
330
331 fn next(&mut self) -> Option<Self::Item> {
332 while self.batch_idx < self.batches.len()
333 && self.row_idx >= self.batches[self.batch_idx].num_rows()
334 {
335 self.batch_idx += 1;
336 self.row_idx = 0;
337 }
338
339 if self.batch_idx >= self.batches.len() {
340 return None;
341 }
342
343 let batch = &self.batches[self.batch_idx];
344 let row = self.row_idx;
345 self.row_idx += 1;
346
347 let mut values = Vec::with_capacity(batch.num_columns());
348 for col_idx in 0..self.schema.fields().len() {
349 let col = batch.column(col_idx);
350 values.push(extract_value(col.as_ref(), row));
351 }
352
353 Some(Ok(Row {
354 values,
355 columns: Arc::clone(&self.columns),
356 index: Arc::clone(&self.index),
357 }))
358 }
359}
360
361pub fn extract_value(array: &dyn Array, row: usize) -> Value {
363 use arrow_schema::DataType::*;
364
365 if array.is_null(row) {
366 return Value::Null;
367 }
368
369 match array.data_type() {
370 Boolean => Value::Bool(
371 array
372 .as_any()
373 .downcast_ref::<BooleanArray>()
374 .unwrap()
375 .value(row),
376 ),
377 Int8 => Value::I8(
378 array
379 .as_any()
380 .downcast_ref::<Int8Array>()
381 .unwrap()
382 .value(row),
383 ),
384 Int16 => Value::I16(
385 array
386 .as_any()
387 .downcast_ref::<Int16Array>()
388 .unwrap()
389 .value(row),
390 ),
391 Int32 => Value::I32(
392 array
393 .as_any()
394 .downcast_ref::<Int32Array>()
395 .unwrap()
396 .value(row),
397 ),
398 Int64 => Value::I64(
399 array
400 .as_any()
401 .downcast_ref::<Int64Array>()
402 .unwrap()
403 .value(row),
404 ),
405 UInt8 => Value::U8(
406 array
407 .as_any()
408 .downcast_ref::<UInt8Array>()
409 .unwrap()
410 .value(row),
411 ),
412 UInt16 => Value::U16(
413 array
414 .as_any()
415 .downcast_ref::<UInt16Array>()
416 .unwrap()
417 .value(row),
418 ),
419 UInt32 => Value::U32(
420 array
421 .as_any()
422 .downcast_ref::<UInt32Array>()
423 .unwrap()
424 .value(row),
425 ),
426 UInt64 => Value::U64(
427 array
428 .as_any()
429 .downcast_ref::<UInt64Array>()
430 .unwrap()
431 .value(row),
432 ),
433 Float32 => Value::F32(
434 array
435 .as_any()
436 .downcast_ref::<Float32Array>()
437 .unwrap()
438 .value(row),
439 ),
440 Float64 => Value::F64(
441 array
442 .as_any()
443 .downcast_ref::<Float64Array>()
444 .unwrap()
445 .value(row),
446 ),
447 Utf8 => Value::String(
448 array
449 .as_any()
450 .downcast_ref::<StringArray>()
451 .unwrap()
452 .value(row)
453 .to_owned(),
454 ),
455 LargeUtf8 => Value::String(
456 array
457 .as_any()
458 .downcast_ref::<LargeStringArray>()
459 .unwrap()
460 .value(row)
461 .to_owned(),
462 ),
463 Binary | LargeBinary => Value::Binary(
464 array
465 .as_any()
466 .downcast_ref::<BinaryArray>()
467 .unwrap()
468 .value(row)
469 .to_owned(),
470 ),
471 Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => Value::Timestamp(
472 array
473 .as_any()
474 .downcast_ref::<TimestampNanosecondArray>()
475 .unwrap()
476 .value(row),
477 ),
478 Timestamp(arrow_schema::TimeUnit::Microsecond, _) => Value::Timestamp(
479 array
480 .as_any()
481 .downcast_ref::<TimestampMicrosecondArray>()
482 .unwrap()
483 .value(row)
484 * 1_000,
485 ),
486 Timestamp(arrow_schema::TimeUnit::Millisecond, _) => Value::Timestamp(
487 array
488 .as_any()
489 .downcast_ref::<TimestampMillisecondArray>()
490 .unwrap()
491 .value(row)
492 * 1_000_000,
493 ),
494 Timestamp(arrow_schema::TimeUnit::Second, _) => Value::Timestamp(
495 array
496 .as_any()
497 .downcast_ref::<TimestampSecondArray>()
498 .unwrap()
499 .value(row)
500 * 1_000_000_000,
501 ),
502 Dictionary(key_type, _) => {
507 macro_rules! resolve {
508 ($t:ty) => {{
509 let dict = array
510 .as_any()
511 .downcast_ref::<DictionaryArray<$t>>()
512 .unwrap();
513 let key = dict.keys().value(row) as usize;
514 extract_value(dict.values().as_ref(), key)
515 }};
516 }
517 match key_type.as_ref() {
518 Int8 => resolve!(Int8Type),
519 Int16 => resolve!(Int16Type),
520 Int32 => resolve!(Int32Type),
521 Int64 => resolve!(Int64Type),
522 UInt8 => resolve!(UInt8Type),
523 UInt16 => resolve!(UInt16Type),
524 UInt32 => resolve!(UInt32Type),
525 UInt64 => resolve!(UInt64Type),
526 _ => Value::Null,
527 }
528 }
529 Decimal128(_, _) => Value::String(
532 array
533 .as_any()
534 .downcast_ref::<Decimal128Array>()
535 .unwrap()
536 .value_as_string(row),
537 ),
538 Decimal256(_, _) => Value::String(
539 array
540 .as_any()
541 .downcast_ref::<Decimal256Array>()
542 .unwrap()
543 .value_as_string(row),
544 ),
545 _other => Value::Null,
546 }
547}