Skip to main content

influxdb3_client/
query.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::ops::Index;
4use std::sync::Arc;
5
6use arrow_array::array::{
7    Array, BinaryArray, BooleanArray, Decimal128Array, Decimal256Array, DictionaryArray,
8    Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray,
9    StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
10    TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
11};
12use arrow_array::types::{
13    Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
14};
15use arrow_array::RecordBatch;
16use arrow_schema::SchemaRef;
17
18use crate::error::Error;
19
/// The query language a query operation is written in.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum QueryType {
    /// Standard SQL (default)
    #[default]
    Sql,
    /// InfluxQL, the InfluxDB 1.x query language
    InfluxQL,
}

impl QueryType {
    /// Returns the lowercase string identifier of this query language:
    /// `"sql"` for [`QueryType::Sql`] and `"influxql"` for
    /// [`QueryType::InfluxQL`].
    pub fn as_str(self) -> &'static str {
        match self {
            Self::InfluxQL => "influxql",
            Self::Sql => "sql",
        }
    }
}
38
/// Named query parameters for parameterised SQL / InfluxQL statements.
///
/// Maps each parameter name to its value, expressed as a
/// [`serde_json::Value`].
///
/// Prefer chaining `.param("k", v)` on [`crate::QueryRequest`]; use this type
/// directly when you need to assemble parameters dynamically.
pub type QueryParameters = HashMap<String, serde_json::Value>;
44
/// Options controlling a single query operation.
///
/// The derived `Default` selects [`QueryType::Sql`] and an empty header map.
#[derive(Debug, Clone, Default)]
pub struct QueryOptions {
    /// Query language the statement is written in.
    pub(crate) query_type: QueryType,
    /// Extra gRPC metadata headers sent with the Flight DoGet request.
    pub headers: HashMap<String, String>,
}
52
/// A dynamically typed value extracted from a query result row.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Bool(bool),
    I8(i8),
    I16(i16),
    I32(i32),
    I64(i64),
    U8(u8),
    U16(u16),
    U32(u32),
    U64(u64),
    F32(f32),
    F64(f64),
    String(String),
    Binary(Vec<u8>),
    /// Nanosecond-epoch timestamp
    Timestamp(i64),
    Null,
}

impl Value {
    /// Returns the value as an `f64` if it holds any integer or float variant.
    ///
    /// Conversions from `I64` and `U64` round to the nearest representable
    /// `f64` when the magnitude exceeds 2^53; every other numeric variant
    /// converts exactly. Non-numeric variants (`Bool`, `String`, `Binary`,
    /// `Timestamp`, `Null`) return `None`.
    pub fn as_f64(&self) -> Option<f64> {
        match self {
            Value::F64(v) => Some(*v),
            Value::F32(v) => Some(f64::from(*v)),
            Value::I8(v) => Some(f64::from(*v)),
            Value::I16(v) => Some(f64::from(*v)),
            Value::I32(v) => Some(f64::from(*v)),
            // 64-bit integers have no lossless `From` into f64; the cast
            // rounds large magnitudes.
            Value::I64(v) => Some(*v as f64),
            Value::U8(v) => Some(f64::from(*v)),
            Value::U16(v) => Some(f64::from(*v)),
            Value::U32(v) => Some(f64::from(*v)),
            Value::U64(v) => Some(*v as f64),
            Value::Bool(_)
            | Value::String(_)
            | Value::Binary(_)
            | Value::Timestamp(_)
            | Value::Null => None,
        }
    }

    /// Returns the value as an `i64` if it holds an integer that fits.
    ///
    /// All signed variants, `U8`/`U16`/`U32`, and `Timestamp` (its raw
    /// nanosecond count) always convert. `U64` converts only when the value
    /// is at most `i64::MAX`; larger values return `None` rather than
    /// wrapping. Floats and non-numeric variants return `None`.
    pub fn as_i64(&self) -> Option<i64> {
        match self {
            Value::I64(v) | Value::Timestamp(v) => Some(*v),
            Value::I32(v) => Some(i64::from(*v)),
            Value::I16(v) => Some(i64::from(*v)),
            Value::I8(v) => Some(i64::from(*v)),
            Value::U32(v) => Some(i64::from(*v)),
            Value::U16(v) => Some(i64::from(*v)),
            Value::U8(v) => Some(i64::from(*v)),
            Value::U64(v) => {
                // Guarded cast: only values within i64's range are accepted.
                if *v <= i64::MAX as u64 {
                    Some(*v as i64)
                } else {
                    None
                }
            }
            Value::Bool(_)
            | Value::F32(_)
            | Value::F64(_)
            | Value::String(_)
            | Value::Binary(_)
            | Value::Null => None,
        }
    }

    /// Returns the borrowed string for a `String` value, `None` otherwise.
    pub fn as_str(&self) -> Option<&str> {
        match self {
            Value::String(s) => Some(s.as_str()),
            _ => None,
        }
    }

    /// Returns the boolean for a `Bool` value, `None` otherwise.
    pub fn as_bool(&self) -> Option<bool> {
        match self {
            Value::Bool(b) => Some(*b),
            _ => None,
        }
    }

    /// Returns `true` if this value is `Null`.
    pub fn is_null(&self) -> bool {
        matches!(self, Value::Null)
    }
}
116
117impl fmt::Display for Value {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        match self {
120            Value::Bool(v) => write!(f, "{v}"),
121            Value::I8(v) => write!(f, "{v}"),
122            Value::I16(v) => write!(f, "{v}"),
123            Value::I32(v) => write!(f, "{v}"),
124            Value::I64(v) => write!(f, "{v}"),
125            Value::U8(v) => write!(f, "{v}"),
126            Value::U16(v) => write!(f, "{v}"),
127            Value::U32(v) => write!(f, "{v}"),
128            Value::U64(v) => write!(f, "{v}"),
129            Value::F32(v) => write!(f, "{v}"),
130            Value::F64(v) => write!(f, "{v}"),
131            Value::String(v) => f.write_str(v),
132            Value::Binary(v) => write!(f, "{}b", v.len()),
133            Value::Timestamp(v) => write!(f, "{v}"),
134            Value::Null => f.write_str("null"),
135        }
136    }
137}
138
/// A single row from a query result.
///
/// Holds the raw `Vec<Value>` (one slot per column) and a shared index mapping
/// column names to slot positions.  Lookup by name is O(1) via the shared
/// `Arc<HashMap>`, so iteration allocates no per-row map.
#[derive(Debug, Clone)]
pub struct Row {
    /// Cell values, one slot per column.
    values: Vec<Value>,
    /// Column names, held behind an `Arc` so rows can share one list.
    columns: Arc<Vec<String>>,
    /// Column name -> slot position in `values`, held behind an `Arc` so rows
    /// can share one map.
    index: Arc<HashMap<String, usize>>,
}
150
151impl Row {
152    /// Look up a value by column name.
153    pub fn get(&self, name: &str) -> Option<&Value> {
154        self.index.get(name).and_then(|&i| self.values.get(i))
155    }
156
157    /// Look up a value by column position.
158    pub fn at(&self, idx: usize) -> Option<&Value> {
159        self.values.get(idx)
160    }
161
162    /// All column names, in schema order.
163    pub fn columns(&self) -> &[String] {
164        &self.columns
165    }
166
167    /// All values, in schema order.
168    pub fn values(&self) -> &[Value] {
169        &self.values
170    }
171
172    /// Number of columns in this row.
173    pub fn len(&self) -> usize {
174        self.values.len()
175    }
176
177    pub fn is_empty(&self) -> bool {
178        self.values.is_empty()
179    }
180
181    /// Convert to a `HashMap<String, Value>` for callers that prefer map-shaped
182    /// rows.  Allocates one HashMap and clones every column name.
183    pub fn into_map(self) -> HashMap<String, Value> {
184        self.columns.iter().cloned().zip(self.values).collect()
185    }
186}
187
188impl Index<&str> for Row {
189    type Output = Value;
190    fn index(&self, name: &str) -> &Value {
191        self.get(name)
192            .unwrap_or_else(|| panic!("no column named '{name}'"))
193    }
194}
195
196impl Index<usize> for Row {
197    type Output = Value;
198    fn index(&self, idx: usize) -> &Value {
199        &self.values[idx]
200    }
201}
202
/// The complete result of a query: a collection of Arrow [`RecordBatch`]es.
///
/// Use `for row in result` (yields [`Row`]) for row-oriented access, or
/// [`QueryResult::record_batches()`] for direct Arrow access.
pub struct QueryResult {
    /// Arrow schema of the result; column names are read from its fields.
    pub(crate) schema: SchemaRef,
    /// The record batches that hold the result data.
    pub(crate) batches: Vec<RecordBatch>,
}
211
212impl QueryResult {
213    pub fn new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Self {
214        QueryResult { schema, batches }
215    }
216
217    pub fn schema(&self) -> &SchemaRef {
218        &self.schema
219    }
220
221    /// The underlying Arrow record batches (zero-copy).
222    pub fn record_batches(&self) -> &[RecordBatch] {
223        &self.batches
224    }
225
226    /// Total number of rows across all batches.
227    pub fn num_rows(&self) -> usize {
228        self.batches.iter().map(|b| b.num_rows()).sum()
229    }
230
231    /// Column names in schema order.
232    pub fn column_names(&self) -> Vec<&str> {
233        self.schema
234            .fields()
235            .iter()
236            .map(|f| f.name().as_str())
237            .collect()
238    }
239
240    /// Collect all rows into a `Vec<Row>`.
241    pub fn rows(self) -> Result<Vec<Row>, Error> {
242        self.into_iter().collect()
243    }
244
245    /// Convert the query result to a polars [`DataFrame`].
246    ///
247    /// Requires the `polars` Cargo feature.
248    ///
249    /// Note: this serialises the batches to Arrow IPC and reads them back
250    /// through polars, so it transiently holds roughly twice the result in
251    /// memory. For very large results, prefer streaming the
252    /// [`RecordBatch`]es via [`crate::Client::sql`]`(..).stream()` and
253    /// converting incrementally.
254    #[cfg(feature = "polars")]
255    pub fn to_polars(self) -> crate::Result<polars::prelude::DataFrame> {
256        use arrow::ipc::writer::FileWriter;
257        use polars::io::SerReader;
258        use polars::prelude::IpcReader;
259        use std::io::Cursor;
260
261        let mut buf: Vec<u8> = Vec::new();
262        {
263            let mut writer = FileWriter::try_new(&mut buf, &self.schema)?;
264            for batch in &self.batches {
265                writer.write(batch)?;
266            }
267            writer.finish()?;
268        }
269
270        let cursor = Cursor::new(buf);
271        IpcReader::new(cursor)
272            .finish()
273            .map_err(|e| crate::error::Error::Config(format!("polars conversion error: {e}")))
274    }
275}
276
277impl IntoIterator for QueryResult {
278    type Item = Result<Row, Error>;
279    type IntoIter = QueryIterator;
280
281    fn into_iter(self) -> Self::IntoIter {
282        QueryIterator::new(self.schema, self.batches)
283    }
284}
285
/// Row-by-row iterator over a [`QueryResult`].
///
/// Holds the column-name index in an `Arc` so each yielded [`Row`] can share
/// the same name-to-position map, so there is no per-row HashMap allocation.
pub struct QueryIterator {
    /// Result schema; its field count decides how many columns are read from
    /// each batch.
    schema: SchemaRef,
    /// The batches being iterated, in order.
    batches: Vec<RecordBatch>,
    /// Position in `batches` of the batch currently being read.
    batch_idx: usize,
    /// Position of the next row to yield within the current batch.
    row_idx: usize,
    /// Column names in schema order, shared with every yielded row.
    columns: Arc<Vec<String>>,
    /// Column name -> position map, shared with every yielded row.
    index: Arc<HashMap<String, usize>>,
}
298
299impl QueryIterator {
300    pub(crate) fn new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Self {
301        let columns: Vec<String> = schema.fields().iter().map(|f| f.name().clone()).collect();
302        let index: HashMap<String, usize> = columns
303            .iter()
304            .enumerate()
305            .map(|(i, n)| (n.clone(), i))
306            .collect();
307        QueryIterator {
308            schema,
309            batches,
310            batch_idx: 0,
311            row_idx: 0,
312            columns: Arc::new(columns),
313            index: Arc::new(index),
314        }
315    }
316
317    /// The column names, in schema order.
318    pub fn column_names(&self) -> &[String] {
319        &self.columns
320    }
321
322    /// Total number of rows across all batches.
323    pub fn num_rows(&self) -> usize {
324        self.batches.iter().map(|b| b.num_rows()).sum()
325    }
326}
327
328impl Iterator for QueryIterator {
329    type Item = Result<Row, Error>;
330
331    fn next(&mut self) -> Option<Self::Item> {
332        while self.batch_idx < self.batches.len()
333            && self.row_idx >= self.batches[self.batch_idx].num_rows()
334        {
335            self.batch_idx += 1;
336            self.row_idx = 0;
337        }
338
339        if self.batch_idx >= self.batches.len() {
340            return None;
341        }
342
343        let batch = &self.batches[self.batch_idx];
344        let row = self.row_idx;
345        self.row_idx += 1;
346
347        let mut values = Vec::with_capacity(batch.num_columns());
348        for col_idx in 0..self.schema.fields().len() {
349            let col = batch.column(col_idx);
350            values.push(extract_value(col.as_ref(), row));
351        }
352
353        Some(Ok(Row {
354            values,
355            columns: Arc::clone(&self.columns),
356            index: Arc::clone(&self.index),
357        }))
358    }
359}
360
361/// Extract a single row value from an Arrow array column.
362pub fn extract_value(array: &dyn Array, row: usize) -> Value {
363    use arrow_schema::DataType::*;
364
365    if array.is_null(row) {
366        return Value::Null;
367    }
368
369    match array.data_type() {
370        Boolean => Value::Bool(
371            array
372                .as_any()
373                .downcast_ref::<BooleanArray>()
374                .unwrap()
375                .value(row),
376        ),
377        Int8 => Value::I8(
378            array
379                .as_any()
380                .downcast_ref::<Int8Array>()
381                .unwrap()
382                .value(row),
383        ),
384        Int16 => Value::I16(
385            array
386                .as_any()
387                .downcast_ref::<Int16Array>()
388                .unwrap()
389                .value(row),
390        ),
391        Int32 => Value::I32(
392            array
393                .as_any()
394                .downcast_ref::<Int32Array>()
395                .unwrap()
396                .value(row),
397        ),
398        Int64 => Value::I64(
399            array
400                .as_any()
401                .downcast_ref::<Int64Array>()
402                .unwrap()
403                .value(row),
404        ),
405        UInt8 => Value::U8(
406            array
407                .as_any()
408                .downcast_ref::<UInt8Array>()
409                .unwrap()
410                .value(row),
411        ),
412        UInt16 => Value::U16(
413            array
414                .as_any()
415                .downcast_ref::<UInt16Array>()
416                .unwrap()
417                .value(row),
418        ),
419        UInt32 => Value::U32(
420            array
421                .as_any()
422                .downcast_ref::<UInt32Array>()
423                .unwrap()
424                .value(row),
425        ),
426        UInt64 => Value::U64(
427            array
428                .as_any()
429                .downcast_ref::<UInt64Array>()
430                .unwrap()
431                .value(row),
432        ),
433        Float32 => Value::F32(
434            array
435                .as_any()
436                .downcast_ref::<Float32Array>()
437                .unwrap()
438                .value(row),
439        ),
440        Float64 => Value::F64(
441            array
442                .as_any()
443                .downcast_ref::<Float64Array>()
444                .unwrap()
445                .value(row),
446        ),
447        Utf8 => Value::String(
448            array
449                .as_any()
450                .downcast_ref::<StringArray>()
451                .unwrap()
452                .value(row)
453                .to_owned(),
454        ),
455        LargeUtf8 => Value::String(
456            array
457                .as_any()
458                .downcast_ref::<LargeStringArray>()
459                .unwrap()
460                .value(row)
461                .to_owned(),
462        ),
463        Binary | LargeBinary => Value::Binary(
464            array
465                .as_any()
466                .downcast_ref::<BinaryArray>()
467                .unwrap()
468                .value(row)
469                .to_owned(),
470        ),
471        Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => Value::Timestamp(
472            array
473                .as_any()
474                .downcast_ref::<TimestampNanosecondArray>()
475                .unwrap()
476                .value(row),
477        ),
478        Timestamp(arrow_schema::TimeUnit::Microsecond, _) => Value::Timestamp(
479            array
480                .as_any()
481                .downcast_ref::<TimestampMicrosecondArray>()
482                .unwrap()
483                .value(row)
484                * 1_000,
485        ),
486        Timestamp(arrow_schema::TimeUnit::Millisecond, _) => Value::Timestamp(
487            array
488                .as_any()
489                .downcast_ref::<TimestampMillisecondArray>()
490                .unwrap()
491                .value(row)
492                * 1_000_000,
493        ),
494        Timestamp(arrow_schema::TimeUnit::Second, _) => Value::Timestamp(
495            array
496                .as_any()
497                .downcast_ref::<TimestampSecondArray>()
498                .unwrap()
499                .value(row)
500                * 1_000_000_000,
501        ),
502        // Dictionary-encoded columns: InfluxDB 3 returns tag columns as
503        // Dictionary(Int32, Utf8).  Resolve the key for this row and recurse
504        // into the values array, so the actual tag value is returned rather
505        // than a debug dump of the column.
506        Dictionary(key_type, _) => {
507            macro_rules! resolve {
508                ($t:ty) => {{
509                    let dict = array
510                        .as_any()
511                        .downcast_ref::<DictionaryArray<$t>>()
512                        .unwrap();
513                    let key = dict.keys().value(row) as usize;
514                    extract_value(dict.values().as_ref(), key)
515                }};
516            }
517            match key_type.as_ref() {
518                Int8 => resolve!(Int8Type),
519                Int16 => resolve!(Int16Type),
520                Int32 => resolve!(Int32Type),
521                Int64 => resolve!(Int64Type),
522                UInt8 => resolve!(UInt8Type),
523                UInt16 => resolve!(UInt16Type),
524                UInt32 => resolve!(UInt32Type),
525                UInt64 => resolve!(UInt64Type),
526                _ => Value::Null,
527            }
528        }
529        // Decimals carry a scale that doesn't map onto an f64/i64 cleanly;
530        // render them as their exact decimal string.
531        Decimal128(_, _) => Value::String(
532            array
533                .as_any()
534                .downcast_ref::<Decimal128Array>()
535                .unwrap()
536                .value_as_string(row),
537        ),
538        Decimal256(_, _) => Value::String(
539            array
540                .as_any()
541                .downcast_ref::<Decimal256Array>()
542                .unwrap()
543                .value_as_string(row),
544        ),
545        _other => Value::Null,
546    }
547}