influxdb2/api/
query.rs

1//! Query
2//!
3//! Query InfluxDB using InfluxQL or Flux Query
4
5use std::collections::{BTreeMap, HashMap, HashSet};
6use std::str::FromStr;
7
8use crate::{Client, Http, RequestError, ReqwestProcessing, Serializing};
9
10use base64::decode;
11use chrono::DateTime;
12use csv::StringRecord;
13use fallible_iterator::FallibleIterator;
14use go_parse_duration::parse_duration;
15use influxdb2_structmap::value::Value;
16use influxdb2_structmap::{FromMap, GenericMap};
17use ordered_float::OrderedFloat;
18use reqwest::{Method, StatusCode};
19use snafu::ResultExt;
20
21use crate::models::{
22    AnalyzeQueryResponse, AstResponse, FluxSuggestion, FluxSuggestions, LanguageRequest, Query,
23};
24
25/// Allows for multiple iterators over the result
26#[derive(Clone, Debug)]
27pub struct QueryTableIter {
28    text: String,
29}
30
31impl<'a> QueryTableIter {
32    fn new(text: String) -> Self {
33        Self { text }
34    }
35
36    /// Get the iterator
37    pub fn result(&'a self) -> impl FallibleIterator<Item = FluxRecord, Error = RequestError> + 'a {
38        QueryTableResult::new(&self.text)
39    }
40
41    /// Is the response empty?
42    pub fn is_empty(&self) -> bool {
43        matches!(QueryTableResult::new(&self.text).next(), Ok(None))
44    }
45}
46
47impl Client {
48    /// Get Query Suggestions
49    pub async fn query_suggestions(&self) -> Result<FluxSuggestions, RequestError> {
50        let req_url = self.url("/api/v2/query/suggestions");
51        let response = self
52            .request(Method::GET, &req_url)
53            .send()
54            .await
55            .context(ReqwestProcessing)?;
56
57        match response.status() {
58            StatusCode::OK => Ok(response
59                .json::<FluxSuggestions>()
60                .await
61                .context(ReqwestProcessing)?),
62            status => {
63                let text = response.text().await.context(ReqwestProcessing)?;
64                Http { status, text }.fail()?
65            }
66        }
67    }
68
69    /// Query Suggestions with name
70    pub async fn query_suggestions_name(&self, name: &str) -> Result<FluxSuggestion, RequestError> {
71        let req_url = self.url(&format!(
72            "/api/v2/query/suggestions/{name}",
73            name = crate::common::urlencode(name)
74        ));
75
76        let response = self
77            .request(Method::GET, &req_url)
78            .send()
79            .await
80            .context(ReqwestProcessing)?;
81
82        match response.status() {
83            StatusCode::OK => Ok(response
84                .json::<FluxSuggestion>()
85                .await
86                .context(ReqwestProcessing)?),
87            status => {
88                let text = response.text().await.context(ReqwestProcessing)?;
89                Http { status, text }.fail()?
90            }
91        }
92    }
93
94    /// Query
95    pub async fn query<T: FromMap>(&self, query: Option<Query>) -> Result<Vec<T>, RequestError> {
96        let req_url = self.url("/api/v2/query");
97        let body = serde_json::to_string(&query.unwrap_or_default()).context(Serializing)?;
98
99        let response = self
100            .request(Method::POST, &req_url)
101            .header("Accepting-Encoding", "identity")
102            .header("Content-Type", "application/json")
103            .query(&[("org", &self.org)])
104            .body(body)
105            .send()
106            .await
107            .context(ReqwestProcessing)?;
108
109        match response.status() {
110            StatusCode::OK => {
111                let text = response.text().await.unwrap();
112                let qtr = QueryTableResult::new(&text[..]);
113                let qr = QueryResult::new(qtr)?;
114                let mut res = vec![];
115                for item in qr.items {
116                    res.push(T::from_genericmap(item));
117                }
118                Ok(res)
119            }
120            status => {
121                let text = response.text().await.context(ReqwestProcessing)?;
122                Http { status, text }.fail()?
123            }
124        }
125    }
126
127    /// Query Raw
128    pub async fn query_raw(&self, query: Option<Query>) -> Result<Vec<FluxRecord>, RequestError> {
129        let req_url = self.url("/api/v2/query");
130        let body = serde_json::to_string(&query.unwrap_or_default()).context(Serializing)?;
131
132        let response = self
133            .request(Method::POST, &req_url)
134            .header("Accepting-Encoding", "identity")
135            .header("Content-Type", "application/json")
136            .query(&[("org", &self.org)])
137            .body(body)
138            .send()
139            .await
140            .context(ReqwestProcessing)?;
141
142        match response.status() {
143            StatusCode::OK => {
144                let text = response.text().await.unwrap();
145                let qtr = QueryTableResult::new(&text[..]);
146                let mut records = vec![];
147                for record in qtr.iterator() {
148                    records.push(record?);
149                }
150                Ok(records)
151            }
152            status => {
153                let text = response.text().await.context(ReqwestProcessing)?;
154                Http { status, text }.fail()?
155            }
156        }
157    }
158
159    /// Query return an iterator over the raw results
160    /// Saves on memory usage
161    pub async fn query_raw_iter(
162        &self,
163        query: Option<Query>,
164    ) -> Result<QueryTableIter, RequestError> {
165        let req_url = self.url("/api/v2/query");
166        let body = serde_json::to_string(&query.unwrap_or_default()).context(Serializing)?;
167
168        let response = self
169            .request(Method::POST, &req_url)
170            .header("Accepting-Encoding", "identity")
171            .header("Content-Type", "application/json")
172            .query(&[("org", &self.org)])
173            .body(body)
174            .send()
175            .await
176            .context(ReqwestProcessing)?;
177
178        match response.status() {
179            StatusCode::OK => {
180                let text = response.text().await.context(ReqwestProcessing)?;
181
182                Ok(QueryTableIter::new(text))
183            }
184            status => {
185                let text = response.text().await.context(ReqwestProcessing)?;
186                Http { status, text }.fail()?
187            }
188        }
189    }
190
191    /// Analyze Query
192    pub async fn query_analyze(
193        &self,
194        query: Option<Query>,
195    ) -> Result<AnalyzeQueryResponse, RequestError> {
196        let req_url = self.url("/api/v2/query/analyze");
197
198        let response = self
199            .request(Method::POST, &req_url)
200            .header("Content-Type", "application/json")
201            .body(serde_json::to_string(&query.unwrap_or_default()).context(Serializing)?)
202            .send()
203            .await
204            .context(ReqwestProcessing)?;
205
206        match response.status() {
207            StatusCode::OK => Ok(response
208                .json::<AnalyzeQueryResponse>()
209                .await
210                .context(ReqwestProcessing)?),
211            status => {
212                let text = response.text().await.context(ReqwestProcessing)?;
213                Http { status, text }.fail()?
214            }
215        }
216    }
217
218    /// Get Query AST Response
219    pub async fn query_ast(
220        &self,
221        language_request: Option<LanguageRequest>,
222    ) -> Result<AstResponse, RequestError> {
223        let req_url = self.url("/api/v2/query/ast");
224
225        let response = self
226            .request(Method::POST, &req_url)
227            .header("Content-Type", "application/json")
228            .body(
229                serde_json::to_string(&language_request.unwrap_or_default())
230                    .context(Serializing)?,
231            )
232            .send()
233            .await
234            .context(ReqwestProcessing)?;
235
236        match response.status() {
237            StatusCode::OK => Ok(response
238                .json::<AstResponse>()
239                .await
240                .context(ReqwestProcessing)?),
241            status => {
242                let text = response.text().await.context(ReqwestProcessing)?;
243                Http { status, text }.fail()?
244            }
245        }
246    }
247
248    /// Returns bucket measurements
249    ///
250    /// # Arguments
251    ///
252    /// * `bucket` - The bucket name
253    /// * `start` - Optional start time. Default is `-30d`
254    /// * `stop` - Optional stop time. Default is `now()`
255    pub async fn list_measurements(
256        &self,
257        bucket: &str,
258        start: Option<&str>,
259        stop: Option<&str>,
260    ) -> Result<Vec<String>, RequestError> {
261        let mut params = vec![];
262        params.push(format!(r#"bucket: "{bucket}""#));
263        if let Some(start) = start {
264            params.push(format!("start: {start}"));
265        }
266        if let Some(stop) = stop {
267            params.push(format!("stop: {stop}"));
268        }
269        let params = params.join(", ");
270
271        let query = Query::new(format!(
272            r#"import "influxdata/influxdb/schema"
273
274            schema.measurements({params})"#
275        ));
276        self.exec_schema_query(query).await
277    }
278
279    /// List field keys for measurement
280    ///
281    /// # Arguments
282    ///
283    /// * `bucket` - The bucket name
284    /// * `measurement` - The measurement name
285    /// * `start` - Optional start time. Default is `-30d`
286    /// * `stop` - Optional stop time. Default is `now()`
287    pub async fn list_measurement_field_keys(
288        &self,
289        bucket: &str,
290        measurement: &str,
291        start: Option<&str>,
292        stop: Option<&str>,
293    ) -> Result<Vec<String>, RequestError> {
294        let mut params = vec![];
295        params.push(format!(r#"bucket: "{bucket}""#));
296        params.push(format!(r#"measurement: "{measurement}""#));
297        if let Some(start) = start {
298            params.push(format!("start: {start}"));
299        }
300        if let Some(stop) = stop {
301            params.push(format!("stop: {stop}"));
302        }
303        let params = params.join(", ");
304
305        let query = Query::new(format!(
306            r#"import "influxdata/influxdb/schema"
307
308            schema.measurementFieldKeys({params})"#,
309        ));
310        self.exec_schema_query(query).await
311    }
312
313    /// List all tag values for measurement tag
314    ///
315    /// # Arguments
316    ///
317    /// * `bucket` - The bucket name
318    /// * `measurement` - The measurement name
319    /// * `tag` - The tag name
320    /// * `start` - Optional start time. Default is `-30d`
321    /// * `stop` - Optional stop time. Default is `now()`
322    pub async fn list_measurement_tag_values(
323        &self,
324        bucket: &str,
325        measurement: &str,
326        tag: &str,
327        start: Option<&str>,
328        stop: Option<&str>,
329    ) -> Result<Vec<String>, RequestError> {
330        let mut params = vec![];
331        params.push(format!(r#"bucket: "{bucket}""#));
332        params.push(format!(r#"measurement: "{measurement}""#));
333        params.push(format!(r#"tag: "{tag}""#));
334        if let Some(start) = start {
335            params.push(format!("start: {start}"));
336        }
337        if let Some(stop) = stop {
338            params.push(format!("stop: {stop}"));
339        }
340        let params = params.join(", ");
341
342        let query = Query::new(format!(
343            r#"import "influxdata/influxdb/schema"
344
345            schema.measurementTagValues({params})"#,
346        ));
347        self.exec_schema_query(query).await
348    }
349
350    /// List all tag keys for measurement
351    ///
352    /// # Arguments
353    ///
354    /// * `bucket` - The bucket name
355    /// * `measurement` - The measurement name
356    /// * `start` - Optional start time. Default is `-30d`
357    /// * `stop` - Optional stop time. Default is `now()`
358    pub async fn list_measurement_tag_keys(
359        &self,
360        bucket: &str,
361        measurement: &str,
362        start: Option<&str>,
363        stop: Option<&str>,
364    ) -> Result<Vec<String>, RequestError> {
365        let mut params = vec![];
366        params.push(format!(r#"bucket: "{bucket}""#));
367        params.push(format!(r#"measurement: "{measurement}""#));
368        if let Some(start) = start {
369            params.push(format!("start: {start}"));
370        }
371        if let Some(stop) = stop {
372            params.push(format!("stop: {stop}"));
373        }
374        let params = params.join(", ");
375
376        let query = Query::new(format!(
377            r#"import "influxdata/influxdb/schema"
378
379            schema.measurementTagKeys({params})"#,
380        ));
381        self.exec_schema_query(query).await
382    }
383
384    async fn exec_schema_query(&self, query: Query) -> Result<Vec<String>, RequestError> {
385        let req_url = self.url("/api/v2/query");
386        let body = serde_json::to_string(&query).context(Serializing)?;
387
388        let response = self
389            .request(Method::POST, &req_url)
390            .header("Accepting-Encoding", "identity")
391            .header("Content-Type", "application/json")
392            .query(&[("org", &self.org)])
393            .body(body)
394            .send()
395            .await
396            .context(ReqwestProcessing)?;
397
398        match response.status() {
399            StatusCode::OK => {
400                let text: String = response.text().await.unwrap();
401
402                let mut reader = csv::ReaderBuilder::new()
403                    .has_headers(true)
404                    .comment(Some(b'#'))
405                    .from_reader(text.as_bytes());
406
407                Ok(reader
408                    .records()
409                    .flatten()
410                    .flat_map(|r: StringRecord| r.get(3).map(|s| s.to_owned()))
411                    .collect())
412            }
413            status => {
414                let text = response.text().await.context(ReqwestProcessing)?;
415                Http { status, text }.fail()?
416            }
417        }
418    }
419}
420
421#[derive(Clone, Copy, Debug, PartialEq)]
422enum DataType {
423    String,
424    Double,
425    Bool,
426    Long,
427    UnsignedLong,
428    Duration,
429    Base64Binary,
430    TimeRFC,
431}
432
433impl FromStr for DataType {
434    type Err = RequestError;
435
436    fn from_str(input: &str) -> Result<Self, RequestError> {
437        match input {
438            "string" => Ok(Self::String),
439            "double" => Ok(Self::Double),
440            "boolean" => Ok(Self::Bool),
441            "long" => Ok(Self::Long),
442            "unsignedLong" => Ok(Self::UnsignedLong),
443            "duration" => Ok(Self::Duration),
444            "base64Binary" => Ok(Self::Base64Binary),
445            "dateTime:RFC3339" => Ok(Self::TimeRFC),
446            "dateTime:RFC3339Nano" => Ok(Self::TimeRFC),
447            _ => Err(RequestError::Deserializing {
448                text: format!("unknown datatype: {}", input),
449            }),
450        }
451    }
452}
453
454struct FluxColumn {
455    name: String,
456    data_type: DataType,
457    group: bool,
458    default_value: String,
459}
460
461/// Represents a flux record returned from a query.
462#[derive(Clone, Debug, PartialEq)]
463pub struct FluxRecord {
464    /// Table id
465    pub table: i32,
466    /// Map of key/value pairs
467    pub values: GenericMap,
468}
469
470struct FluxTableMetadata {
471    position: i32,
472    columns: Vec<FluxColumn>,
473}
474
475struct QueryTableResult<'a> {
476    csv_reader: csv::Reader<&'a [u8]>,
477    table_position: i32,
478    table_changed: bool,
479    table: Option<FluxTableMetadata>,
480}
481
482#[derive(PartialEq)]
483enum ParsingState {
484    Normal,
485    Annotation,
486    Error,
487}
488
489impl<'a> QueryTableResult<'a> {
490    fn new(text: &'a str) -> Self {
491        let reader = csv::ReaderBuilder::new()
492            .has_headers(false)
493            .flexible(true)
494            .from_reader(text.as_bytes());
495        Self {
496            csv_reader: reader,
497            table_position: 0,
498            table_changed: false,
499            table: None,
500        }
501    }
502}
503
504impl<'a> FallibleIterator for QueryTableResult<'a> {
505    type Item = FluxRecord;
506    type Error = RequestError;
507
508    fn next(&mut self) -> Result<Option<FluxRecord>, RequestError> {
509        // Hold the FluxRecord to be returned.
510        let record: FluxRecord;
511
512        self.table_changed = false;
513        let mut row = StringRecord::new();
514        let mut parsing_state = ParsingState::Normal;
515        let mut data_type_annotation_found = false;
516        loop {
517            if !self.csv_reader.read_record(&mut row).unwrap() {
518                // EOF
519                return Ok(None);
520            }
521            if row.len() <= 1 {
522                continue;
523            }
524            if let Some(s) = row.get(0) {
525                if !s.is_empty() && s.chars().nth(0).unwrap() == '#' {
526                    // Finding new table, prepare for annotation parsing
527                    if parsing_state == ParsingState::Normal {
528                        self.table = Some(FluxTableMetadata {
529                            position: self.table_position,
530                            columns: Vec::new(),
531                        });
532                        self.table_position += 1;
533                        self.table_changed = true;
534                        for _ in 1..row.len() {
535                            self.table.as_mut().unwrap().columns.push(FluxColumn {
536                                name: String::from(""),
537                                data_type: DataType::String,
538                                group: false,
539                                default_value: String::from(""),
540                            });
541                        }
542                        parsing_state = ParsingState::Annotation;
543                    }
544                }
545            }
546            if self.table.is_none() {
547                return Err(RequestError::Deserializing {
548                    text: String::from("annotations not found"),
549                });
550            }
551            if row.len() - 1 != self.table.as_ref().unwrap().columns.len() {
552                return Err(RequestError::Deserializing {
553                    text: format!(
554                        "row has different number of columns than the table: {} vs {}",
555                        row.len() - 1,
556                        self.table.as_ref().unwrap().columns.len(),
557                    ),
558                });
559            }
560            if let Some(s) = row.get(0) {
561                match s {
562                    "" => {
563                        match parsing_state {
564                            ParsingState::Annotation => {
565                                // Parse column name (csv header)
566                                if !data_type_annotation_found {
567                                    return Err(RequestError::Deserializing {
568                                        text: String::from("datatype annotation not found"),
569                                    });
570                                }
571                                if row.get(1).unwrap() == "error" {
572                                    parsing_state = ParsingState::Error;
573                                } else {
574                                    for i in 1..row.len() {
575                                        let column =
576                                            &mut self.table.as_mut().unwrap().columns[i - 1];
577                                        column.name = String::from(row.get(i).unwrap());
578                                    }
579                                    parsing_state = ParsingState::Normal;
580                                }
581                                continue;
582                            }
583                            ParsingState::Error => {
584                                let msg = if row.len() > 1 && !row.get(1).unwrap().is_empty() {
585                                    row.get(1).unwrap()
586                                } else {
587                                    "unknown query error"
588                                };
589                                let mut reference = String::from("");
590                                if row.len() > 2 && !row.get(2).unwrap().is_empty() {
591                                    let s = row.get(2).unwrap();
592                                    reference = format!(",{}", s);
593                                }
594                                return Err(RequestError::Deserializing {
595                                    text: format!("{}{}", msg, reference),
596                                });
597                            }
598                            _ => {}
599                        }
600                        let mut values = BTreeMap::new();
601                        for i in 1..row.len() {
602                            let column = &self.table.as_mut().unwrap().columns[i - 1];
603                            let mut v = row.get(i).unwrap();
604                            if v.is_empty() {
605                                v = &column.default_value[..];
606                            }
607                            let value = parse_value(v, column.data_type, column.name.as_str())?;
608                            values.entry(column.name.clone()).or_insert(value);
609                        }
610                        record = FluxRecord {
611                            table: self.table.as_ref().unwrap().position,
612                            values,
613                        };
614                        break;
615                    }
616                    "#datatype" => {
617                        data_type_annotation_found = true;
618                        for i in 1..row.len() {
619                            let column = &mut self.table.as_mut().unwrap().columns[i - 1];
620                            let dt = DataType::from_str(row.get(i).unwrap())?;
621                            column.data_type = dt;
622                        }
623                    }
624                    "#group" => {
625                        for i in 1..row.len() {
626                            let column = &mut self.table.as_mut().unwrap().columns[i - 1];
627                            column.group = row.get(i).unwrap() == "true";
628                        }
629                    }
630                    "#default" => {
631                        for i in 1..row.len() {
632                            let column = &mut self.table.as_mut().unwrap().columns[i - 1];
633                            column.default_value = String::from(row.get(i).unwrap());
634                        }
635                    }
636                    _ => {
637                        return Err(RequestError::Deserializing {
638                            text: format!("invalid first cell: {}", s),
639                        });
640                    }
641                }
642            }
643        }
644        Ok(Some(record))
645    }
646}
647
648struct QueryResult {
649    items: Vec<GenericMap>,
650}
651
652impl QueryResult {
653    fn new(qtr: QueryTableResult<'_>) -> Result<Self, RequestError> {
654        let ignored_keys = vec!["_field", "_value", "table"];
655        let ignored_keys: HashSet<&str> = ignored_keys.into_iter().collect();
656
657        // Construct build table, this groups values with the same tags and
658        // timestamp but in different table.
659        //
660        // We need to do this because influxdb v2 stores multiple fields in
661        // different tables even though it's part of the same measurement.
662        let mut build_table = HashMap::<GenericMap, GenericMap>::new();
663        let mut key_order: Vec<GenericMap> = vec![];
664        for record in qtr.iterator() {
665            let mut record_values = record?.values;
666
667            // Construct key
668            let mut key = record_values.clone();
669            key.retain(|k, _| !ignored_keys.contains(k.as_str()));
670
671            match build_table.get_mut(&key) {
672                Some(entry) => {
673                    // Set field value
674                    let field;
675                    if let Value::String(f) = record_values.get("_field").unwrap() {
676                        field = f.clone();
677                    } else {
678                        unreachable!();
679                    }
680                    let value = record_values.get("_value").unwrap();
681                    entry.insert(field, value.clone());
682                }
683                None => {
684                    // Set field value
685                    let field;
686                    if let Value::String(f) = record_values.get("_field").unwrap() {
687                        field = f.clone();
688                    } else {
689                        unreachable!();
690                    }
691                    let value = record_values.get("_value").unwrap();
692                    record_values.insert(field, value.clone());
693
694                    build_table.insert(key.clone(), record_values);
695                    key_order.push(key);
696                }
697            }
698        }
699
700        // Build items based on the order the `key` is inserted
701        let mut items = vec![];
702        for key in key_order {
703            let entry = build_table.get(&key).unwrap();
704            items.push(entry.clone());
705        }
706
707        Ok(Self { items })
708    }
709}
710
711fn parse_value(s: &str, t: DataType, name: &str) -> Result<Value, RequestError> {
712    match t {
713        DataType::String => Ok(Value::String(String::from(s))),
714        DataType::Double => {
715            let v = s.parse::<f64>().unwrap();
716            Ok(Value::Double(OrderedFloat::from(v)))
717        }
718        DataType::Bool => {
719            if s.to_lowercase() == "false" {
720                Ok(Value::Bool(false))
721            } else {
722                Ok(Value::Bool(true))
723            }
724        }
725        DataType::Long => {
726            let v = s.parse::<i64>().unwrap();
727            Ok(Value::Long(v))
728        }
729        DataType::UnsignedLong => {
730            let v = s.parse::<u64>().unwrap();
731            Ok(Value::UnsignedLong(v))
732        }
733        DataType::Duration => match parse_duration(s) {
734            Ok(d) => Ok(Value::Duration(chrono::Duration::nanoseconds(d))),
735            Err(_) => Err(RequestError::Deserializing {
736                text: format!("invalid duration: {}, name: {}", s, name),
737            }),
738        },
739        DataType::Base64Binary => {
740            let b = decode(s).unwrap();
741            Ok(Value::Base64Binary(b))
742        }
743        DataType::TimeRFC => {
744            let t = DateTime::parse_from_rfc3339(s).unwrap();
745            Ok(Value::TimeRFC(t))
746        }
747    }
748}
749
750#[cfg(test)]
751mod tests {
752    use super::*;
753    use crate::FromDataPoint;
754    use mockito::{mock, Matcher};
755
756    #[derive(Default, FromDataPoint)]
757    struct Empty {}
758
759    #[tokio::test]
760    async fn query_suggestions() {
761        let token = "some-token";
762
763        let mock_server = mock("GET", "/api/v2/query/suggestions")
764            .match_header("Authorization", format!("Token {}", token).as_str())
765            .create();
766
767        let client = Client::new(mockito::server_url(), "org", token);
768
769        let _result = client.query_suggestions().await;
770
771        mock_server.assert();
772    }
773
774    #[tokio::test]
775    async fn query_suggestions_name() {
776        let token = "some-token";
777        let suggestion_name = "some-name";
778
779        let mock_server = mock(
780            "GET",
781            format!(
782                "/api/v2/query/suggestions/{name}",
783                name = crate::common::urlencode(suggestion_name)
784            )
785            .as_str(),
786        )
787        .match_header("Authorization", format!("Token {}", token).as_str())
788        .create();
789
790        let client = Client::new(mockito::server_url(), "org", token);
791
792        let _result = client.query_suggestions_name(suggestion_name).await;
793
794        mock_server.assert();
795    }
796
797    #[tokio::test]
798    async fn query() {
799        let token = "some-token";
800        let org = "some-org";
801        let query: Option<Query> = Some(Query::new("some-influx-query-string".to_string()));
802        let mock_server = mock("POST", "/api/v2/query")
803            .match_header("Authorization", format!("Token {}", token).as_str())
804            .match_header("Accepting-Encoding", "identity")
805            .match_header("Content-Type", "application/json")
806            .match_query(Matcher::UrlEncoded("org".into(), org.into()))
807            .match_body(
808                serde_json::to_string(&query.clone().unwrap_or_default())
809                    .unwrap()
810                    .as_str(),
811            )
812            .create();
813
814        let client = Client::new(mockito::server_url(), org, token);
815
816        let _result = client.query::<Empty>(query).await;
817
818        mock_server.assert();
819    }
820
821    #[tokio::test]
822    async fn query_opt() {
823        let token = "some-token";
824        let org = "some-org";
825
826        let mock_server = mock("POST", "/api/v2/query")
827            .match_header("Authorization", format!("Token {}", token).as_str())
828            .match_header("Accepting-Encoding", "identity")
829            .match_header("Content-Type", "application/json")
830            .match_query(Matcher::UrlEncoded("org".into(), org.into()))
831            .match_body(serde_json::to_string(&Query::default()).unwrap().as_str())
832            .create();
833
834        let client = Client::new(mockito::server_url(), org, token);
835
836        let _result = client.query::<Empty>(None).await;
837
838        mock_server.assert();
839    }
840
841    #[tokio::test]
842    async fn query_analyze() {
843        let token = "some-token";
844        let query: Option<Query> = Some(Query::new("some-influx-query-string".to_string()));
845        let mock_server = mock("POST", "/api/v2/query/analyze")
846            .match_header("Authorization", format!("Token {}", token).as_str())
847            .match_header("Content-Type", "application/json")
848            .match_body(
849                serde_json::to_string(&query.clone().unwrap_or_default())
850                    .unwrap()
851                    .as_str(),
852            )
853            .create();
854
855        let client = Client::new(mockito::server_url(), "org", token);
856
857        let _result = client.query_analyze(query).await;
858
859        mock_server.assert();
860    }
861
862    #[tokio::test]
863    async fn query_analyze_opt() {
864        let token = "some-token";
865        let query: Option<Query> = None;
866        let mock_server = mock("POST", "/api/v2/query/analyze")
867            .match_header("Authorization", format!("Token {}", token).as_str())
868            .match_header("Content-Type", "application/json")
869            .match_body(
870                serde_json::to_string(&query.clone().unwrap_or_default())
871                    .unwrap()
872                    .as_str(),
873            )
874            .create();
875
876        let client = Client::new(mockito::server_url(), "org", token);
877
878        let _result = client.query_analyze(query).await;
879
880        mock_server.assert();
881    }
882
883    #[tokio::test]
884    async fn query_ast() {
885        let token = "some-token";
886        let language_request: Option<LanguageRequest> =
887            Some(LanguageRequest::new("some-influx-query-string".to_string()));
888        let mock_server = mock("POST", "/api/v2/query/ast")
889            .match_header("Authorization", format!("Token {}", token).as_str())
890            .match_header("Content-Type", "application/json")
891            .match_body(
892                serde_json::to_string(&language_request.clone().unwrap_or_default())
893                    .unwrap()
894                    .as_str(),
895            )
896            .create();
897
898        let client = Client::new(mockito::server_url(), "org", token);
899
900        let _result = client.query_ast(language_request).await;
901
902        mock_server.assert();
903    }
904
905    #[tokio::test]
906    async fn query_ast_opt() {
907        let token = "some-token";
908        let language_request: Option<LanguageRequest> = None;
909        let mock_server = mock("POST", "/api/v2/query/ast")
910            .match_header("Authorization", format!("Token {}", token).as_str())
911            .match_header("Content-Type", "application/json")
912            .match_body(
913                serde_json::to_string(&language_request.clone().unwrap_or_default())
914                    .unwrap()
915                    .as_str(),
916            )
917            .create();
918
919        let client = Client::new(mockito::server_url(), "org", token);
920
921        let _result = client.query_ast(language_request).await;
922
923        mock_server.assert();
924    }
925
926    #[test]
927    fn test_query_table_result() {
928        let text = "#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
929#group,false,false,true,true,false,false,true,true,true,true
930#default,_result,,,,,,,,,
931,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
932,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
933,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
934";
935        let qtr = QueryTableResult::new(text);
936        #[rustfmt::skip]
937        let expected: [FluxRecord; 2] = [
938            FluxRecord {
939                table: 0,
940                values: [
941                    (String::from("result"), Value::String(String::from("_result"))),
942                    (String::from("table"), Value::Long(0)),
943                    (String::from("_start"), parse_value("2020-02-17T22:19:49.747562847Z", DataType::TimeRFC, "_start").unwrap()),
944                    (String::from("_stop"), parse_value("2020-02-18T22:19:49.747562847Z", DataType::TimeRFC, "_stop").unwrap()),
945                    (String::from("_time"), parse_value("2020-02-18T10:34:08.135814545Z", DataType::TimeRFC, "_time").unwrap()),
946                    (String::from("_field"), Value::String(String::from("f"))),
947                    (String::from("_measurement"), Value::String(String::from("test"))),
948                    (String::from("_value"), Value::Double(OrderedFloat::from(1.4))),
949                    (String::from("a"), Value::String(String::from("1"))),
950                    (String::from("b"), Value::String(String::from("adsfasdf"))),
951                ].iter().cloned().collect(),
952            },
953            FluxRecord {
954                table: 0,
955                values: [
956                    (String::from("result"), Value::String(String::from("_result"))),
957                    (String::from("table"), Value::Long(0)),
958                    (String::from("_start"), parse_value("2020-02-17T22:19:49.747562847Z", DataType::TimeRFC, "_start").unwrap()),
959                    (String::from("_stop"), parse_value("2020-02-18T22:19:49.747562847Z", DataType::TimeRFC, "_stop").unwrap()),
960                    (String::from("_time"), parse_value("2020-02-18T22:08:44.850214724Z", DataType::TimeRFC, "_time").unwrap()),
961                    (String::from("_field"), Value::String(String::from("f"))),
962                    (String::from("_measurement"), Value::String(String::from("test"))),
963                    (String::from("_value"), Value::Double(OrderedFloat::from(6.6))),
964                    (String::from("a"), Value::String(String::from("1"))),
965                    (String::from("b"), Value::String(String::from("adsfasdf"))),
966                ].iter().cloned().collect(),
967            },
968        ];
969        for (i, item) in qtr.iterator().enumerate() {
970            match item {
971                Ok(record) => {
972                    assert_eq!(record, expected[i]);
973                }
974                Err(e) => {
975                    assert_eq!(format!("{}", e), "");
976                }
977            }
978        }
979    }
980}