1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
//! Synchronize version of influxdb::Client.

use influxdb::{Client, Error, Query, QueryTypes};
use reqwest::blocking as rb;
use reqwest::blocking::Client as ReqwestClient;
use reqwest::blocking::Response;
use reqwest::{StatusCode, Url};

pub struct SyncClient {
    client: Client,
}

/// A `r2d2` compatible InfluxDB Client,
/// simply get rid of async/await
impl SyncClient {
    pub fn new(client: Client) -> Self {
        SyncClient { client }
    }

    pub fn ping(&self) -> Result<(String, String), Error> {
        let res =
            rb::get(format!("{}/ping", self.client.database_url()).as_str()).map_err(|err| {
                Error::ProtocolError {
                    error: format!("{}", err),
                }
            })?;

        let build = res
            .headers()
            .get("X-Influxdb-Build")
            .unwrap()
            .to_str()
            .unwrap();
        let version = res
            .headers()
            .get("X-Influxdb-Version")
            .unwrap()
            .to_str()
            .unwrap();

        Ok((build.to_owned(), version.to_owned()))
    }

    pub fn query<'q, Q>(&self, q: &'q Q) -> Result<String, Error>
    where
        Q: Query,
        &'q Q: Into<QueryTypes<'q>>,
    {
        let query = q.build().map_err(|err| Error::InvalidQueryError {
            error: format!("{}", err),
        })?;

        let basic_parameters: Vec<(String, String)> = self.client.to_owned().into();

        let client = match q.into() {
            QueryTypes::Read(_) => {
                let read_query = query.get();
                let mut url = Url::parse_with_params(
                    format!("{url}/query", url = self.client.database_url()).as_str(),
                    basic_parameters,
                )
                .map_err(|err| Error::UrlConstructionError {
                    error: format!("{}", err),
                })?;

                url.query_pairs_mut().append_pair("q", &read_query);

                if read_query.contains("SELECT") || read_query.contains("SHOW") {
                    ReqwestClient::new().get(url)
                } else {
                    ReqwestClient::new().post(url)
                }
            }
            QueryTypes::Write(write_query) => {
                let mut url = Url::parse_with_params(
                    format!("{url}/write", url = self.client.database_url()).as_str(),
                    basic_parameters,
                )
                .map_err(|err| Error::InvalidQueryError {
                    error: format!("{}", err),
                })?;

                url.query_pairs_mut()
                    .append_pair("precision", &write_query.get_precision());

                ReqwestClient::new().post(url).body(query.get())
            }
        };

        let res: Response = client
            .send()
            .map_err(|err| Error::ConnectionError { error: err })?;

        match res.status() {
            StatusCode::UNAUTHORIZED => return Err(Error::AuthorizationError),
            StatusCode::FORBIDDEN => return Err(Error::AuthenticationError),
            _ => {}
        }

        let s = res.text().map_err(|_| Error::DeserializationError {
            error: "response could not be converted to UTF-8".to_string(),
        })?;

        // todo: improve error parsing without serde
        if s.contains("\"error\"") {
            return Err(Error::DatabaseError {
                error: format!("influxdb error: \"{}\"", s),
            });
        }

        Ok(s)
    }
}