1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
use reqwest::Client;
use serde::de::DeserializeOwned;
use crate::error::SQLError;
use crate::Error;
use std::fs::File;
use std::io::{Read, Write};
use std::path::Path;
use crate::types::{Atomicity, SchemaMap};

pub struct QuestDB {
    client: Client,
    url: String,
}

impl QuestDB {
    /// Creates a new connection to questdb
    ///
    /// # Example
    /// ```
    /// use questdb::QuestDB;
    /// let connection = QuestDB::new("http://192.168.1.37:9000");
    /// ```
    pub fn new(url: &str) -> Self {
        QuestDB {
            client: Client::new(),
            url: String::from(url),
        }
    }

    /// Compiles and executes the SQL query supplied
    ///
    /// # Arguments
    /// * `query` - query text. It can be multi-line, but query separator, such as ; must not be
    /// included.
    /// * `limit` - This argument is used for paging. Limit can be either in format of X, Y where X
    /// is the lower limit and Y is the upper, or just Y. For example, limit=10,20 will return row
    /// numbers 10 thru to 20 inclusive. and limit=20 will return first 20 rows, which is
    /// equivalent to limit=0,20
    /// * `count` - Instructs /exec to count rows and return this value in message header. Default
    /// value is false. There is slight performance hit for requesting row count.
    /// * `nm` - Skips metadata section of the response when true. When metadata is known and client
    /// is paging this flag should typically be set to true to reduce response size. Default value
    /// is false and metadata is included in the response.
    ///
    /// # Example
    /// ```no-test
    /// use questdb::QuestDB;
    /// use serde::{Serialize, Deserialize};
    ///
    /// #[derive(Serialize, Deserialize, Debug)]
    /// struct TestData {
    ///     id: i32,
    ///     ts: String,
    ///     temp: f64,
    ///     sensor_id: i32,
    /// }
    ///
    /// let connection = QuestDB::new("http://192.168.1.37:9000");
    /// let res = connection.exec::<TestData>("select * from readings", Some(5), None, None)
    ///     .await
    ///     .unwrap();
    /// ```
    pub async fn exec<T: DeserializeOwned>(&self, query: &str, limit: Option<usize>, count: Option<bool>, nm: Option<bool>)
        -> Result<Vec<T>, crate::error::Error>
    {
        let mut url = format!("{}/exec?query={}", self.url, query);

        // Check all the optional arguments and add them to the URL
        if let Some(l) = limit {
            url += format!("&limit={}", l).as_str();
        }
        if let Some(c) = count {
            url += format!("&count={}", c).as_str();
        }
        if let Some(n) = nm {
            url += format!("&nm={}", n).as_str();
        }

        let res = self.client.get(url.as_str())
            .send()
            .await?
            .json::<serde_json::Value>()
            .await?;

        let deserialized = match res.get("dataset") {
            Some(d) => d,
            None => {
                // The SQL failed, return an error with the error data
                let e: SQLError = serde_json::from_value(res)?;
                return Err(Error::SQLError(e));
            },
        }.to_owned();

        let deserialized: Vec<T> = serde_json::from_value(deserialized)?;

        Ok(deserialized)
    }

    /// The function `imp` streams tabular text data directly into a table. It supports CSV, TAB and
    /// Pipe (|) delimited inputs and optional headers. There are no restrictions on data size. Data
    /// type and structure is detected automatically and usually without additional configuration.
    /// However in some cases additional configuration can be provided to augment automatic
    /// detection results.
    ///
    /// # Arguments
    /// * `file_path` - Path to the file that is going to be imported
    /// * `schema` - List of columns and their types. This will overwrite the default unless the
    ///     table is already created.
    /// * `table_name` - Name of the table where the data will be saved
    /// * `overwrite` - Default value is false. Set it to true to have existing table deleted before
    ///     appending data.
    /// * `durable` - When request is durable QuestDB will flush relevant disk cache before
    ///     responding. Default value is false
    /// * `atomicity` - Available values are strict and relaxed. Default value is relaxed. When
    ///     atomicity is relaxed data rows that cannot be appended to table are discarded, thus
    ///     allowing partial uploads. In strict mode upload fails as soon as any data error is
    ///     encountered and all previously appended rows are rolled back.
    ///
    /// # Example
    /// ```no-test
    /// let connection = QuestDB::new("http://192.168.1.37:9000");
    /// let schema = new_schema!(("movieId", Schema::String), ("imdbId", Schema::Int));
    ///
    /// let res = connection.imp("./test.csv", Some(schema), "nu_table123", None, None, None)
    ///     .await
    ///     .unwrap();
    /// ```
    pub async fn imp(&self, file_path: &'static str, schema: Option<Vec<SchemaMap>>, table_name: &'static str,
                     overwrite: Option<bool>, durable: Option<bool>, atomicity: Option<Atomicity>)
        -> Result<(), crate::error::Error>
    {
        let mut form = reqwest::multipart::Form::new();
        let mut url = format!("{}/imp?fmt=json&name={}", self.url, table_name);

        // Check all the optional arguments and add them to the URL
        if let Some(s) = schema {
            let data = serde_json::to_string(&s).unwrap();
            form = form.part("schema", reqwest::multipart::Part::text(data));
        }
        if let Some(o) = overwrite {
            url += format!("&overwrite={}", o).as_str();
        }
        if let Some(d) = durable {
            url += format!("&durable={}", d).as_str();
        }
        if let Some(a) = atomicity {
            url += format!("&atomicity={}", a).as_str();
        }

        // Read the file as bytes
        let filep = Path::new(file_path);
        let mut file = File::open(&filep)?;
        let mut file_bytes: Vec<u8> = Vec::new();
        file.read_to_end(&mut file_bytes)?;

        // Create a part with the file_name
        let file_name = match filep.file_name() {
            Some(name) => name.to_str().unwrap(),
            None => filep.to_str().unwrap(),
        };
        let part = reqwest::multipart::Part::bytes(file_bytes)
            .file_name(file_name);

        // Create the form with the file part
        form = form.part("data", part);

        // Make the POST request
        let _res = self.client.post(url.as_str())
            .multipart(form)
            .send()
            .await?
            .text()
            .await?;

        Ok(())
    }

    /// Exports the result of the query to a CSV file
    ///
    /// # Arguments
    /// * `query` - query text. It can be multi-line, but query separator, such as ; must not be
    /// included.
    /// * `limit` - This argument is used for paging. Limit can be either in format of X, Y where X
    /// is the lower limit and Y is the upper, or just Y. For example, limit=10,20 will return row
    /// numbers 10 thru to 20 inclusive. and limit=20 will return first 20 rows, which is
    /// equivalent to limit=0,20
    ///
    /// # Example
    /// ```no-test
    /// use questdb::QuestDB;
    /// use std::fs::File;
    ///
    /// let connection = QuestDB::new("http://192.168.1.37:9000");
    ///
    /// let mut output_file = File::create("output.csv").unwrap();
    /// let res = match connection.exp("select * from nu_table", Some(5), &mut output_file)
    ///     .await
    ///     .unwrap();
    /// ```
    pub async fn exp(&self, query: &str, limit: Option<usize>, output_file: &mut File) -> Result<(), Error> {
        let mut url = format!("{}/exp?query={}", self.url, query);

        // Check all the optional arguments and add them to the URL
        if let Some(l) = limit {
            url += format!("&limit={}", l).as_str();
        }

        // Make the GET request
        let res: String = self.client.get(url.as_str())
            .send()
            .await?
            .text()
            .await?;

        // Try to write data to the file
        output_file.write_all(res.as_bytes())?;

        Ok(())
    }
}