questdb/
api.rs

1use reqwest::Client;
2use serde::de::DeserializeOwned;
3use crate::error::SQLError;
4use crate::Error;
5use std::fs::File;
6use std::io::{Read, Write};
7use std::path::Path;
8use crate::types::{Atomicity, SchemaMap};
9
10pub struct QuestDB {
11    client: Client,
12    url: String,
13}
14
15impl QuestDB {
16    /// Creates a new connection to questdb
17    ///
18    /// # Example
19    /// ```
20    /// use questdb::QuestDB;
21    /// let connection = QuestDB::new("http://192.168.1.37:9000");
22    /// ```
23    pub fn new(url: &str) -> Self {
24        QuestDB {
25            client: Client::new(),
26            url: String::from(url),
27        }
28    }
29
30    /// Compiles and executes the SQL query supplied
31    ///
32    /// # Arguments
33    /// * `query` - query text. It can be multi-line, but query separator, such as ; must not be
34    /// included.
35    /// * `limit` - This argument is used for paging. Limit can be either in format of X, Y where X
36    /// is the lower limit and Y is the upper, or just Y. For example, limit=10,20 will return row
37    /// numbers 10 thru to 20 inclusive. and limit=20 will return first 20 rows, which is
38    /// equivalent to limit=0,20
39    /// * `count` - Instructs /exec to count rows and return this value in message header. Default
40    /// value is false. There is slight performance hit for requesting row count.
41    /// * `nm` - Skips metadata section of the response when true. When metadata is known and client
42    /// is paging this flag should typically be set to true to reduce response size. Default value
43    /// is false and metadata is included in the response.
44    ///
45    /// # Example
46    /// ```no-test
47    /// use questdb::QuestDB;
48    /// use serde::{Serialize, Deserialize};
49    ///
50    /// #[derive(Serialize, Deserialize, Debug)]
51    /// struct TestData {
52    ///     id: i32,
53    ///     ts: String,
54    ///     temp: f64,
55    ///     sensor_id: i32,
56    /// }
57    ///
58    /// let connection = QuestDB::new("http://192.168.1.37:9000");
59    /// let res = connection.exec::<TestData>("select * from readings", Some(5), None, None)
60    ///     .await
61    ///     .unwrap();
62    /// ```
63    pub async fn exec<T: DeserializeOwned>(&self, query: &str, limit: Option<usize>, count: Option<bool>, nm: Option<bool>)
64        -> Result<Vec<T>, crate::error::Error>
65    {
66        let mut url = format!("{}/exec?query={}", self.url, query);
67
68        // Check all the optional arguments and add them to the URL
69        if let Some(l) = limit {
70            url += format!("&limit={}", l).as_str();
71        }
72        if let Some(c) = count {
73            url += format!("&count={}", c).as_str();
74        }
75        if let Some(n) = nm {
76            url += format!("&nm={}", n).as_str();
77        }
78
79        let res = self.client.get(url.as_str())
80            .send()
81            .await?
82            .json::<serde_json::Value>()
83            .await?;
84
85        let deserialized = match res.get("dataset") {
86            Some(d) => d,
87            None => {
88                // The SQL failed, return an error with the error data
89                let e: SQLError = serde_json::from_value(res)?;
90                return Err(Error::SQLError(e));
91            },
92        }.to_owned();
93
94        let deserialized: Vec<T> = serde_json::from_value(deserialized)?;
95
96        Ok(deserialized)
97    }
98
99    /// The function `imp` streams tabular text data directly into a table. It supports CSV, TAB and
100    /// Pipe (|) delimited inputs and optional headers. There are no restrictions on data size. Data
101    /// type and structure is detected automatically and usually without additional configuration.
102    /// However in some cases additional configuration can be provided to augment automatic
103    /// detection results.
104    ///
105    /// # Arguments
106    /// * `file_path` - Path to the file that is going to be imported
107    /// * `schema` - List of columns and their types. This will overwrite the default unless the
108    ///     table is already created.
109    /// * `table_name` - Name of the table where the data will be saved
110    /// * `overwrite` - Default value is false. Set it to true to have existing table deleted before
111    ///     appending data.
112    /// * `durable` - When request is durable QuestDB will flush relevant disk cache before
113    ///     responding. Default value is false
114    /// * `atomicity` - Available values are strict and relaxed. Default value is relaxed. When
115    ///     atomicity is relaxed data rows that cannot be appended to table are discarded, thus
116    ///     allowing partial uploads. In strict mode upload fails as soon as any data error is
117    ///     encountered and all previously appended rows are rolled back.
118    ///
119    /// # Example
120    /// ```no-test
121    /// let connection = QuestDB::new("http://192.168.1.37:9000");
122    /// let schema = new_schema!(("movieId", Schema::String), ("imdbId", Schema::Int));
123    ///
124    /// let res = connection.imp("./test.csv", Some(schema), "nu_table123", None, None, None)
125    ///     .await
126    ///     .unwrap();
127    /// ```
128    pub async fn imp(&self, file_path: &'static str, schema: Option<Vec<SchemaMap>>, table_name: &'static str,
129                     overwrite: Option<bool>, durable: Option<bool>, atomicity: Option<Atomicity>)
130        -> Result<(), crate::error::Error>
131    {
132        let mut form = reqwest::multipart::Form::new();
133        let mut url = format!("{}/imp?fmt=json&name={}", self.url, table_name);
134
135        // Check all the optional arguments and add them to the URL
136        if let Some(s) = schema {
137            let data = serde_json::to_string(&s).unwrap();
138            form = form.part("schema", reqwest::multipart::Part::text(data));
139        }
140        if let Some(o) = overwrite {
141            url += format!("&overwrite={}", o).as_str();
142        }
143        if let Some(d) = durable {
144            url += format!("&durable={}", d).as_str();
145        }
146        if let Some(a) = atomicity {
147            url += format!("&atomicity={}", a).as_str();
148        }
149
150        // Read the file as bytes
151        let filep = Path::new(file_path);
152        let mut file = File::open(&filep)?;
153        let mut file_bytes: Vec<u8> = Vec::new();
154        file.read_to_end(&mut file_bytes)?;
155
156        // Create a part with the file_name
157        let file_name = match filep.file_name() {
158            Some(name) => name.to_str().unwrap(),
159            None => filep.to_str().unwrap(),
160        };
161        let part = reqwest::multipart::Part::bytes(file_bytes)
162            .file_name(file_name);
163
164        // Create the form with the file part
165        form = form.part("data", part);
166
167        // Make the POST request
168        let _res = self.client.post(url.as_str())
169            .multipart(form)
170            .send()
171            .await?
172            .text()
173            .await?;
174
175        Ok(())
176    }
177
178    /// Exports the result of the query to a CSV file
179    ///
180    /// # Arguments
181    /// * `query` - query text. It can be multi-line, but query separator, such as ; must not be
182    /// included.
183    /// * `limit` - This argument is used for paging. Limit can be either in format of X, Y where X
184    /// is the lower limit and Y is the upper, or just Y. For example, limit=10,20 will return row
185    /// numbers 10 thru to 20 inclusive. and limit=20 will return first 20 rows, which is
186    /// equivalent to limit=0,20
187    ///
188    /// # Example
189    /// ```no-test
190    /// use questdb::QuestDB;
191    /// use std::fs::File;
192    ///
193    /// let connection = QuestDB::new("http://192.168.1.37:9000");
194    ///
195    /// let mut output_file = File::create("output.csv").unwrap();
196    /// let res = match connection.exp("select * from nu_table", Some(5), &mut output_file)
197    ///     .await
198    ///     .unwrap();
199    /// ```
200    pub async fn exp(&self, query: &str, limit: Option<usize>, output_file: &mut File) -> Result<(), Error> {
201        let mut url = format!("{}/exp?query={}", self.url, query);
202
203        // Check all the optional arguments and add them to the URL
204        if let Some(l) = limit {
205            url += format!("&limit={}", l).as_str();
206        }
207
208        // Make the GET request
209        let res: String = self.client.get(url.as_str())
210            .send()
211            .await?
212            .text()
213            .await?;
214
215        // Try to write data to the file
216        output_file.write_all(res.as_bytes())?;
217
218        Ok(())
219    }
220}