1use std::collections::{BTreeMap, HashMap, HashSet};
6use std::str::FromStr;
7
8use crate::{Client, Http, RequestError, ReqwestProcessing, Serializing};
9
10use base64::decode;
11use chrono::DateTime;
12use csv::StringRecord;
13use fallible_iterator::FallibleIterator;
14use go_parse_duration::parse_duration;
15use influxdb2_structmap::value::Value;
16use influxdb2_structmap::{FromMap, GenericMap};
17use ordered_float::OrderedFloat;
18use reqwest::{Method, StatusCode};
19use snafu::ResultExt;
20
21use crate::models::{
22 AnalyzeQueryResponse, AstResponse, FluxSuggestion, FluxSuggestions, LanguageRequest, Query,
23};
24
25#[derive(Clone, Debug)]
27pub struct QueryTableIter {
28 text: String,
29}
30
31impl<'a> QueryTableIter {
32 fn new(text: String) -> Self {
33 Self { text }
34 }
35
36 pub fn result(&'a self) -> impl FallibleIterator<Item = FluxRecord, Error = RequestError> + 'a {
38 QueryTableResult::new(&self.text)
39 }
40
41 pub fn is_empty(&self) -> bool {
43 matches!(QueryTableResult::new(&self.text).next(), Ok(None))
44 }
45}
46
47impl Client {
48 pub async fn query_suggestions(&self) -> Result<FluxSuggestions, RequestError> {
50 let req_url = self.url("/api/v2/query/suggestions");
51 let response = self
52 .request(Method::GET, &req_url)
53 .send()
54 .await
55 .context(ReqwestProcessing)?;
56
57 match response.status() {
58 StatusCode::OK => Ok(response
59 .json::<FluxSuggestions>()
60 .await
61 .context(ReqwestProcessing)?),
62 status => {
63 let text = response.text().await.context(ReqwestProcessing)?;
64 Http { status, text }.fail()?
65 }
66 }
67 }
68
69 pub async fn query_suggestions_name(&self, name: &str) -> Result<FluxSuggestion, RequestError> {
71 let req_url = self.url(&format!(
72 "/api/v2/query/suggestions/{name}",
73 name = crate::common::urlencode(name)
74 ));
75
76 let response = self
77 .request(Method::GET, &req_url)
78 .send()
79 .await
80 .context(ReqwestProcessing)?;
81
82 match response.status() {
83 StatusCode::OK => Ok(response
84 .json::<FluxSuggestion>()
85 .await
86 .context(ReqwestProcessing)?),
87 status => {
88 let text = response.text().await.context(ReqwestProcessing)?;
89 Http { status, text }.fail()?
90 }
91 }
92 }
93
94 pub async fn query<T: FromMap>(&self, query: Option<Query>) -> Result<Vec<T>, RequestError> {
96 let req_url = self.url("/api/v2/query");
97 let body = serde_json::to_string(&query.unwrap_or_default()).context(Serializing)?;
98
99 let response = self
100 .request(Method::POST, &req_url)
101 .header("Accepting-Encoding", "identity")
102 .header("Content-Type", "application/json")
103 .query(&[("org", &self.org)])
104 .body(body)
105 .send()
106 .await
107 .context(ReqwestProcessing)?;
108
109 match response.status() {
110 StatusCode::OK => {
111 let text = response.text().await.unwrap();
112 let qtr = QueryTableResult::new(&text[..]);
113 let qr = QueryResult::new(qtr)?;
114 let mut res = vec![];
115 for item in qr.items {
116 res.push(T::from_genericmap(item));
117 }
118 Ok(res)
119 }
120 status => {
121 let text = response.text().await.context(ReqwestProcessing)?;
122 Http { status, text }.fail()?
123 }
124 }
125 }
126
127 pub async fn query_raw(&self, query: Option<Query>) -> Result<Vec<FluxRecord>, RequestError> {
129 let req_url = self.url("/api/v2/query");
130 let body = serde_json::to_string(&query.unwrap_or_default()).context(Serializing)?;
131
132 let response = self
133 .request(Method::POST, &req_url)
134 .header("Accepting-Encoding", "identity")
135 .header("Content-Type", "application/json")
136 .query(&[("org", &self.org)])
137 .body(body)
138 .send()
139 .await
140 .context(ReqwestProcessing)?;
141
142 match response.status() {
143 StatusCode::OK => {
144 let text = response.text().await.unwrap();
145 let qtr = QueryTableResult::new(&text[..]);
146 let mut records = vec![];
147 for record in qtr.iterator() {
148 records.push(record?);
149 }
150 Ok(records)
151 }
152 status => {
153 let text = response.text().await.context(ReqwestProcessing)?;
154 Http { status, text }.fail()?
155 }
156 }
157 }
158
159 pub async fn query_raw_iter(
162 &self,
163 query: Option<Query>,
164 ) -> Result<QueryTableIter, RequestError> {
165 let req_url = self.url("/api/v2/query");
166 let body = serde_json::to_string(&query.unwrap_or_default()).context(Serializing)?;
167
168 let response = self
169 .request(Method::POST, &req_url)
170 .header("Accepting-Encoding", "identity")
171 .header("Content-Type", "application/json")
172 .query(&[("org", &self.org)])
173 .body(body)
174 .send()
175 .await
176 .context(ReqwestProcessing)?;
177
178 match response.status() {
179 StatusCode::OK => {
180 let text = response.text().await.context(ReqwestProcessing)?;
181
182 Ok(QueryTableIter::new(text))
183 }
184 status => {
185 let text = response.text().await.context(ReqwestProcessing)?;
186 Http { status, text }.fail()?
187 }
188 }
189 }
190
191 pub async fn query_analyze(
193 &self,
194 query: Option<Query>,
195 ) -> Result<AnalyzeQueryResponse, RequestError> {
196 let req_url = self.url("/api/v2/query/analyze");
197
198 let response = self
199 .request(Method::POST, &req_url)
200 .header("Content-Type", "application/json")
201 .body(serde_json::to_string(&query.unwrap_or_default()).context(Serializing)?)
202 .send()
203 .await
204 .context(ReqwestProcessing)?;
205
206 match response.status() {
207 StatusCode::OK => Ok(response
208 .json::<AnalyzeQueryResponse>()
209 .await
210 .context(ReqwestProcessing)?),
211 status => {
212 let text = response.text().await.context(ReqwestProcessing)?;
213 Http { status, text }.fail()?
214 }
215 }
216 }
217
218 pub async fn query_ast(
220 &self,
221 language_request: Option<LanguageRequest>,
222 ) -> Result<AstResponse, RequestError> {
223 let req_url = self.url("/api/v2/query/ast");
224
225 let response = self
226 .request(Method::POST, &req_url)
227 .header("Content-Type", "application/json")
228 .body(
229 serde_json::to_string(&language_request.unwrap_or_default())
230 .context(Serializing)?,
231 )
232 .send()
233 .await
234 .context(ReqwestProcessing)?;
235
236 match response.status() {
237 StatusCode::OK => Ok(response
238 .json::<AstResponse>()
239 .await
240 .context(ReqwestProcessing)?),
241 status => {
242 let text = response.text().await.context(ReqwestProcessing)?;
243 Http { status, text }.fail()?
244 }
245 }
246 }
247
248 pub async fn list_measurements(
256 &self,
257 bucket: &str,
258 start: Option<&str>,
259 stop: Option<&str>,
260 ) -> Result<Vec<String>, RequestError> {
261 let mut params = vec![];
262 params.push(format!(r#"bucket: "{bucket}""#));
263 if let Some(start) = start {
264 params.push(format!("start: {start}"));
265 }
266 if let Some(stop) = stop {
267 params.push(format!("stop: {stop}"));
268 }
269 let params = params.join(", ");
270
271 let query = Query::new(format!(
272 r#"import "influxdata/influxdb/schema"
273
274 schema.measurements({params})"#
275 ));
276 self.exec_schema_query(query).await
277 }
278
279 pub async fn list_measurement_field_keys(
288 &self,
289 bucket: &str,
290 measurement: &str,
291 start: Option<&str>,
292 stop: Option<&str>,
293 ) -> Result<Vec<String>, RequestError> {
294 let mut params = vec![];
295 params.push(format!(r#"bucket: "{bucket}""#));
296 params.push(format!(r#"measurement: "{measurement}""#));
297 if let Some(start) = start {
298 params.push(format!("start: {start}"));
299 }
300 if let Some(stop) = stop {
301 params.push(format!("stop: {stop}"));
302 }
303 let params = params.join(", ");
304
305 let query = Query::new(format!(
306 r#"import "influxdata/influxdb/schema"
307
308 schema.measurementFieldKeys({params})"#,
309 ));
310 self.exec_schema_query(query).await
311 }
312
313 pub async fn list_measurement_tag_values(
323 &self,
324 bucket: &str,
325 measurement: &str,
326 tag: &str,
327 start: Option<&str>,
328 stop: Option<&str>,
329 ) -> Result<Vec<String>, RequestError> {
330 let mut params = vec![];
331 params.push(format!(r#"bucket: "{bucket}""#));
332 params.push(format!(r#"measurement: "{measurement}""#));
333 params.push(format!(r#"tag: "{tag}""#));
334 if let Some(start) = start {
335 params.push(format!("start: {start}"));
336 }
337 if let Some(stop) = stop {
338 params.push(format!("stop: {stop}"));
339 }
340 let params = params.join(", ");
341
342 let query = Query::new(format!(
343 r#"import "influxdata/influxdb/schema"
344
345 schema.measurementTagValues({params})"#,
346 ));
347 self.exec_schema_query(query).await
348 }
349
350 pub async fn list_measurement_tag_keys(
359 &self,
360 bucket: &str,
361 measurement: &str,
362 start: Option<&str>,
363 stop: Option<&str>,
364 ) -> Result<Vec<String>, RequestError> {
365 let mut params = vec![];
366 params.push(format!(r#"bucket: "{bucket}""#));
367 params.push(format!(r#"measurement: "{measurement}""#));
368 if let Some(start) = start {
369 params.push(format!("start: {start}"));
370 }
371 if let Some(stop) = stop {
372 params.push(format!("stop: {stop}"));
373 }
374 let params = params.join(", ");
375
376 let query = Query::new(format!(
377 r#"import "influxdata/influxdb/schema"
378
379 schema.measurementTagKeys({params})"#,
380 ));
381 self.exec_schema_query(query).await
382 }
383
384 async fn exec_schema_query(&self, query: Query) -> Result<Vec<String>, RequestError> {
385 let req_url = self.url("/api/v2/query");
386 let body = serde_json::to_string(&query).context(Serializing)?;
387
388 let response = self
389 .request(Method::POST, &req_url)
390 .header("Accepting-Encoding", "identity")
391 .header("Content-Type", "application/json")
392 .query(&[("org", &self.org)])
393 .body(body)
394 .send()
395 .await
396 .context(ReqwestProcessing)?;
397
398 match response.status() {
399 StatusCode::OK => {
400 let text: String = response.text().await.unwrap();
401
402 let mut reader = csv::ReaderBuilder::new()
403 .has_headers(true)
404 .comment(Some(b'#'))
405 .from_reader(text.as_bytes());
406
407 Ok(reader
408 .records()
409 .flatten()
410 .flat_map(|r: StringRecord| r.get(3).map(|s| s.to_owned()))
411 .collect())
412 }
413 status => {
414 let text = response.text().await.context(ReqwestProcessing)?;
415 Http { status, text }.fail()?
416 }
417 }
418 }
419}
420
421#[derive(Clone, Copy, Debug, PartialEq)]
422enum DataType {
423 String,
424 Double,
425 Bool,
426 Long,
427 UnsignedLong,
428 Duration,
429 Base64Binary,
430 TimeRFC,
431}
432
433impl FromStr for DataType {
434 type Err = RequestError;
435
436 fn from_str(input: &str) -> Result<Self, RequestError> {
437 match input {
438 "string" => Ok(Self::String),
439 "double" => Ok(Self::Double),
440 "boolean" => Ok(Self::Bool),
441 "long" => Ok(Self::Long),
442 "unsignedLong" => Ok(Self::UnsignedLong),
443 "duration" => Ok(Self::Duration),
444 "base64Binary" => Ok(Self::Base64Binary),
445 "dateTime:RFC3339" => Ok(Self::TimeRFC),
446 "dateTime:RFC3339Nano" => Ok(Self::TimeRFC),
447 _ => Err(RequestError::Deserializing {
448 text: format!("unknown datatype: {}", input),
449 }),
450 }
451 }
452}
453
454struct FluxColumn {
455 name: String,
456 data_type: DataType,
457 group: bool,
458 default_value: String,
459}
460
461#[derive(Clone, Debug, PartialEq)]
463pub struct FluxRecord {
464 pub table: i32,
466 pub values: GenericMap,
468}
469
470struct FluxTableMetadata {
471 position: i32,
472 columns: Vec<FluxColumn>,
473}
474
475struct QueryTableResult<'a> {
476 csv_reader: csv::Reader<&'a [u8]>,
477 table_position: i32,
478 table_changed: bool,
479 table: Option<FluxTableMetadata>,
480}
481
482#[derive(PartialEq)]
483enum ParsingState {
484 Normal,
485 Annotation,
486 Error,
487}
488
489impl<'a> QueryTableResult<'a> {
490 fn new(text: &'a str) -> Self {
491 let reader = csv::ReaderBuilder::new()
492 .has_headers(false)
493 .flexible(true)
494 .from_reader(text.as_bytes());
495 Self {
496 csv_reader: reader,
497 table_position: 0,
498 table_changed: false,
499 table: None,
500 }
501 }
502}
503
504impl<'a> FallibleIterator for QueryTableResult<'a> {
505 type Item = FluxRecord;
506 type Error = RequestError;
507
508 fn next(&mut self) -> Result<Option<FluxRecord>, RequestError> {
509 let record: FluxRecord;
511
512 self.table_changed = false;
513 let mut row = StringRecord::new();
514 let mut parsing_state = ParsingState::Normal;
515 let mut data_type_annotation_found = false;
516 loop {
517 if !self.csv_reader.read_record(&mut row).unwrap() {
518 return Ok(None);
520 }
521 if row.len() <= 1 {
522 continue;
523 }
524 if let Some(s) = row.get(0) {
525 if !s.is_empty() && s.chars().nth(0).unwrap() == '#' {
526 if parsing_state == ParsingState::Normal {
528 self.table = Some(FluxTableMetadata {
529 position: self.table_position,
530 columns: Vec::new(),
531 });
532 self.table_position += 1;
533 self.table_changed = true;
534 for _ in 1..row.len() {
535 self.table.as_mut().unwrap().columns.push(FluxColumn {
536 name: String::from(""),
537 data_type: DataType::String,
538 group: false,
539 default_value: String::from(""),
540 });
541 }
542 parsing_state = ParsingState::Annotation;
543 }
544 }
545 }
546 if self.table.is_none() {
547 return Err(RequestError::Deserializing {
548 text: String::from("annotations not found"),
549 });
550 }
551 if row.len() - 1 != self.table.as_ref().unwrap().columns.len() {
552 return Err(RequestError::Deserializing {
553 text: format!(
554 "row has different number of columns than the table: {} vs {}",
555 row.len() - 1,
556 self.table.as_ref().unwrap().columns.len(),
557 ),
558 });
559 }
560 if let Some(s) = row.get(0) {
561 match s {
562 "" => {
563 match parsing_state {
564 ParsingState::Annotation => {
565 if !data_type_annotation_found {
567 return Err(RequestError::Deserializing {
568 text: String::from("datatype annotation not found"),
569 });
570 }
571 if row.get(1).unwrap() == "error" {
572 parsing_state = ParsingState::Error;
573 } else {
574 for i in 1..row.len() {
575 let column =
576 &mut self.table.as_mut().unwrap().columns[i - 1];
577 column.name = String::from(row.get(i).unwrap());
578 }
579 parsing_state = ParsingState::Normal;
580 }
581 continue;
582 }
583 ParsingState::Error => {
584 let msg = if row.len() > 1 && !row.get(1).unwrap().is_empty() {
585 row.get(1).unwrap()
586 } else {
587 "unknown query error"
588 };
589 let mut reference = String::from("");
590 if row.len() > 2 && !row.get(2).unwrap().is_empty() {
591 let s = row.get(2).unwrap();
592 reference = format!(",{}", s);
593 }
594 return Err(RequestError::Deserializing {
595 text: format!("{}{}", msg, reference),
596 });
597 }
598 _ => {}
599 }
600 let mut values = BTreeMap::new();
601 for i in 1..row.len() {
602 let column = &self.table.as_mut().unwrap().columns[i - 1];
603 let mut v = row.get(i).unwrap();
604 if v.is_empty() {
605 v = &column.default_value[..];
606 }
607 let value = parse_value(v, column.data_type, column.name.as_str())?;
608 values.entry(column.name.clone()).or_insert(value);
609 }
610 record = FluxRecord {
611 table: self.table.as_ref().unwrap().position,
612 values,
613 };
614 break;
615 }
616 "#datatype" => {
617 data_type_annotation_found = true;
618 for i in 1..row.len() {
619 let column = &mut self.table.as_mut().unwrap().columns[i - 1];
620 let dt = DataType::from_str(row.get(i).unwrap())?;
621 column.data_type = dt;
622 }
623 }
624 "#group" => {
625 for i in 1..row.len() {
626 let column = &mut self.table.as_mut().unwrap().columns[i - 1];
627 column.group = row.get(i).unwrap() == "true";
628 }
629 }
630 "#default" => {
631 for i in 1..row.len() {
632 let column = &mut self.table.as_mut().unwrap().columns[i - 1];
633 column.default_value = String::from(row.get(i).unwrap());
634 }
635 }
636 _ => {
637 return Err(RequestError::Deserializing {
638 text: format!("invalid first cell: {}", s),
639 });
640 }
641 }
642 }
643 }
644 Ok(Some(record))
645 }
646}
647
648struct QueryResult {
649 items: Vec<GenericMap>,
650}
651
652impl QueryResult {
653 fn new(qtr: QueryTableResult<'_>) -> Result<Self, RequestError> {
654 let ignored_keys = vec!["_field", "_value", "table"];
655 let ignored_keys: HashSet<&str> = ignored_keys.into_iter().collect();
656
657 let mut build_table = HashMap::<GenericMap, GenericMap>::new();
663 let mut key_order: Vec<GenericMap> = vec![];
664 for record in qtr.iterator() {
665 let mut record_values = record?.values;
666
667 let mut key = record_values.clone();
669 key.retain(|k, _| !ignored_keys.contains(k.as_str()));
670
671 match build_table.get_mut(&key) {
672 Some(entry) => {
673 let field;
675 if let Value::String(f) = record_values.get("_field").unwrap() {
676 field = f.clone();
677 } else {
678 unreachable!();
679 }
680 let value = record_values.get("_value").unwrap();
681 entry.insert(field, value.clone());
682 }
683 None => {
684 let field;
686 if let Value::String(f) = record_values.get("_field").unwrap() {
687 field = f.clone();
688 } else {
689 unreachable!();
690 }
691 let value = record_values.get("_value").unwrap();
692 record_values.insert(field, value.clone());
693
694 build_table.insert(key.clone(), record_values);
695 key_order.push(key);
696 }
697 }
698 }
699
700 let mut items = vec![];
702 for key in key_order {
703 let entry = build_table.get(&key).unwrap();
704 items.push(entry.clone());
705 }
706
707 Ok(Self { items })
708 }
709}
710
711fn parse_value(s: &str, t: DataType, name: &str) -> Result<Value, RequestError> {
712 match t {
713 DataType::String => Ok(Value::String(String::from(s))),
714 DataType::Double => {
715 let v = s.parse::<f64>().unwrap();
716 Ok(Value::Double(OrderedFloat::from(v)))
717 }
718 DataType::Bool => {
719 if s.to_lowercase() == "false" {
720 Ok(Value::Bool(false))
721 } else {
722 Ok(Value::Bool(true))
723 }
724 }
725 DataType::Long => {
726 let v = s.parse::<i64>().unwrap();
727 Ok(Value::Long(v))
728 }
729 DataType::UnsignedLong => {
730 let v = s.parse::<u64>().unwrap();
731 Ok(Value::UnsignedLong(v))
732 }
733 DataType::Duration => match parse_duration(s) {
734 Ok(d) => Ok(Value::Duration(chrono::Duration::nanoseconds(d))),
735 Err(_) => Err(RequestError::Deserializing {
736 text: format!("invalid duration: {}, name: {}", s, name),
737 }),
738 },
739 DataType::Base64Binary => {
740 let b = decode(s).unwrap();
741 Ok(Value::Base64Binary(b))
742 }
743 DataType::TimeRFC => {
744 let t = DateTime::parse_from_rfc3339(s).unwrap();
745 Ok(Value::TimeRFC(t))
746 }
747 }
748}
749
750#[cfg(test)]
751mod tests {
752 use super::*;
753 use crate::FromDataPoint;
754 use mockito::{mock, Matcher};
755
756 #[derive(Default, FromDataPoint)]
757 struct Empty {}
758
759 #[tokio::test]
760 async fn query_suggestions() {
761 let token = "some-token";
762
763 let mock_server = mock("GET", "/api/v2/query/suggestions")
764 .match_header("Authorization", format!("Token {}", token).as_str())
765 .create();
766
767 let client = Client::new(mockito::server_url(), "org", token);
768
769 let _result = client.query_suggestions().await;
770
771 mock_server.assert();
772 }
773
774 #[tokio::test]
775 async fn query_suggestions_name() {
776 let token = "some-token";
777 let suggestion_name = "some-name";
778
779 let mock_server = mock(
780 "GET",
781 format!(
782 "/api/v2/query/suggestions/{name}",
783 name = crate::common::urlencode(suggestion_name)
784 )
785 .as_str(),
786 )
787 .match_header("Authorization", format!("Token {}", token).as_str())
788 .create();
789
790 let client = Client::new(mockito::server_url(), "org", token);
791
792 let _result = client.query_suggestions_name(suggestion_name).await;
793
794 mock_server.assert();
795 }
796
797 #[tokio::test]
798 async fn query() {
799 let token = "some-token";
800 let org = "some-org";
801 let query: Option<Query> = Some(Query::new("some-influx-query-string".to_string()));
802 let mock_server = mock("POST", "/api/v2/query")
803 .match_header("Authorization", format!("Token {}", token).as_str())
804 .match_header("Accepting-Encoding", "identity")
805 .match_header("Content-Type", "application/json")
806 .match_query(Matcher::UrlEncoded("org".into(), org.into()))
807 .match_body(
808 serde_json::to_string(&query.clone().unwrap_or_default())
809 .unwrap()
810 .as_str(),
811 )
812 .create();
813
814 let client = Client::new(mockito::server_url(), org, token);
815
816 let _result = client.query::<Empty>(query).await;
817
818 mock_server.assert();
819 }
820
821 #[tokio::test]
822 async fn query_opt() {
823 let token = "some-token";
824 let org = "some-org";
825
826 let mock_server = mock("POST", "/api/v2/query")
827 .match_header("Authorization", format!("Token {}", token).as_str())
828 .match_header("Accepting-Encoding", "identity")
829 .match_header("Content-Type", "application/json")
830 .match_query(Matcher::UrlEncoded("org".into(), org.into()))
831 .match_body(serde_json::to_string(&Query::default()).unwrap().as_str())
832 .create();
833
834 let client = Client::new(mockito::server_url(), org, token);
835
836 let _result = client.query::<Empty>(None).await;
837
838 mock_server.assert();
839 }
840
841 #[tokio::test]
842 async fn query_analyze() {
843 let token = "some-token";
844 let query: Option<Query> = Some(Query::new("some-influx-query-string".to_string()));
845 let mock_server = mock("POST", "/api/v2/query/analyze")
846 .match_header("Authorization", format!("Token {}", token).as_str())
847 .match_header("Content-Type", "application/json")
848 .match_body(
849 serde_json::to_string(&query.clone().unwrap_or_default())
850 .unwrap()
851 .as_str(),
852 )
853 .create();
854
855 let client = Client::new(mockito::server_url(), "org", token);
856
857 let _result = client.query_analyze(query).await;
858
859 mock_server.assert();
860 }
861
862 #[tokio::test]
863 async fn query_analyze_opt() {
864 let token = "some-token";
865 let query: Option<Query> = None;
866 let mock_server = mock("POST", "/api/v2/query/analyze")
867 .match_header("Authorization", format!("Token {}", token).as_str())
868 .match_header("Content-Type", "application/json")
869 .match_body(
870 serde_json::to_string(&query.clone().unwrap_or_default())
871 .unwrap()
872 .as_str(),
873 )
874 .create();
875
876 let client = Client::new(mockito::server_url(), "org", token);
877
878 let _result = client.query_analyze(query).await;
879
880 mock_server.assert();
881 }
882
883 #[tokio::test]
884 async fn query_ast() {
885 let token = "some-token";
886 let language_request: Option<LanguageRequest> =
887 Some(LanguageRequest::new("some-influx-query-string".to_string()));
888 let mock_server = mock("POST", "/api/v2/query/ast")
889 .match_header("Authorization", format!("Token {}", token).as_str())
890 .match_header("Content-Type", "application/json")
891 .match_body(
892 serde_json::to_string(&language_request.clone().unwrap_or_default())
893 .unwrap()
894 .as_str(),
895 )
896 .create();
897
898 let client = Client::new(mockito::server_url(), "org", token);
899
900 let _result = client.query_ast(language_request).await;
901
902 mock_server.assert();
903 }
904
905 #[tokio::test]
906 async fn query_ast_opt() {
907 let token = "some-token";
908 let language_request: Option<LanguageRequest> = None;
909 let mock_server = mock("POST", "/api/v2/query/ast")
910 .match_header("Authorization", format!("Token {}", token).as_str())
911 .match_header("Content-Type", "application/json")
912 .match_body(
913 serde_json::to_string(&language_request.clone().unwrap_or_default())
914 .unwrap()
915 .as_str(),
916 )
917 .create();
918
919 let client = Client::new(mockito::server_url(), "org", token);
920
921 let _result = client.query_ast(language_request).await;
922
923 mock_server.assert();
924 }
925
926 #[test]
927 fn test_query_table_result() {
928 let text = "#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
929#group,false,false,true,true,false,false,true,true,true,true
930#default,_result,,,,,,,,,
931,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
932,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
933,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
934";
935 let qtr = QueryTableResult::new(text);
936 #[rustfmt::skip]
937 let expected: [FluxRecord; 2] = [
938 FluxRecord {
939 table: 0,
940 values: [
941 (String::from("result"), Value::String(String::from("_result"))),
942 (String::from("table"), Value::Long(0)),
943 (String::from("_start"), parse_value("2020-02-17T22:19:49.747562847Z", DataType::TimeRFC, "_start").unwrap()),
944 (String::from("_stop"), parse_value("2020-02-18T22:19:49.747562847Z", DataType::TimeRFC, "_stop").unwrap()),
945 (String::from("_time"), parse_value("2020-02-18T10:34:08.135814545Z", DataType::TimeRFC, "_time").unwrap()),
946 (String::from("_field"), Value::String(String::from("f"))),
947 (String::from("_measurement"), Value::String(String::from("test"))),
948 (String::from("_value"), Value::Double(OrderedFloat::from(1.4))),
949 (String::from("a"), Value::String(String::from("1"))),
950 (String::from("b"), Value::String(String::from("adsfasdf"))),
951 ].iter().cloned().collect(),
952 },
953 FluxRecord {
954 table: 0,
955 values: [
956 (String::from("result"), Value::String(String::from("_result"))),
957 (String::from("table"), Value::Long(0)),
958 (String::from("_start"), parse_value("2020-02-17T22:19:49.747562847Z", DataType::TimeRFC, "_start").unwrap()),
959 (String::from("_stop"), parse_value("2020-02-18T22:19:49.747562847Z", DataType::TimeRFC, "_stop").unwrap()),
960 (String::from("_time"), parse_value("2020-02-18T22:08:44.850214724Z", DataType::TimeRFC, "_time").unwrap()),
961 (String::from("_field"), Value::String(String::from("f"))),
962 (String::from("_measurement"), Value::String(String::from("test"))),
963 (String::from("_value"), Value::Double(OrderedFloat::from(6.6))),
964 (String::from("a"), Value::String(String::from("1"))),
965 (String::from("b"), Value::String(String::from("adsfasdf"))),
966 ].iter().cloned().collect(),
967 },
968 ];
969 for (i, item) in qtr.iterator().enumerate() {
970 match item {
971 Ok(record) => {
972 assert_eq!(record, expected[i]);
973 }
974 Err(e) => {
975 assert_eq!(format!("{}", e), "");
976 }
977 }
978 }
979 }
980}