// questdb/api.rs
1use reqwest::Client;
2use serde::de::DeserializeOwned;
3use crate::error::SQLError;
4use crate::Error;
5use std::fs::File;
6use std::io::{Read, Write};
7use std::path::Path;
8use crate::types::{Atomicity, SchemaMap};
9
10pub struct QuestDB {
11 client: Client,
12 url: String,
13}
14
15impl QuestDB {
16 /// Creates a new connection to questdb
17 ///
18 /// # Example
19 /// ```
20 /// use questdb::QuestDB;
21 /// let connection = QuestDB::new("http://192.168.1.37:9000");
22 /// ```
23 pub fn new(url: &str) -> Self {
24 QuestDB {
25 client: Client::new(),
26 url: String::from(url),
27 }
28 }
29
30 /// Compiles and executes the SQL query supplied
31 ///
32 /// # Arguments
33 /// * `query` - query text. It can be multi-line, but query separator, such as ; must not be
34 /// included.
35 /// * `limit` - This argument is used for paging. Limit can be either in format of X, Y where X
36 /// is the lower limit and Y is the upper, or just Y. For example, limit=10,20 will return row
37 /// numbers 10 thru to 20 inclusive. and limit=20 will return first 20 rows, which is
38 /// equivalent to limit=0,20
39 /// * `count` - Instructs /exec to count rows and return this value in message header. Default
40 /// value is false. There is slight performance hit for requesting row count.
41 /// * `nm` - Skips metadata section of the response when true. When metadata is known and client
42 /// is paging this flag should typically be set to true to reduce response size. Default value
43 /// is false and metadata is included in the response.
44 ///
45 /// # Example
46 /// ```no-test
47 /// use questdb::QuestDB;
48 /// use serde::{Serialize, Deserialize};
49 ///
50 /// #[derive(Serialize, Deserialize, Debug)]
51 /// struct TestData {
52 /// id: i32,
53 /// ts: String,
54 /// temp: f64,
55 /// sensor_id: i32,
56 /// }
57 ///
58 /// let connection = QuestDB::new("http://192.168.1.37:9000");
59 /// let res = connection.exec::<TestData>("select * from readings", Some(5), None, None)
60 /// .await
61 /// .unwrap();
62 /// ```
63 pub async fn exec<T: DeserializeOwned>(&self, query: &str, limit: Option<usize>, count: Option<bool>, nm: Option<bool>)
64 -> Result<Vec<T>, crate::error::Error>
65 {
66 let mut url = format!("{}/exec?query={}", self.url, query);
67
68 // Check all the optional arguments and add them to the URL
69 if let Some(l) = limit {
70 url += format!("&limit={}", l).as_str();
71 }
72 if let Some(c) = count {
73 url += format!("&count={}", c).as_str();
74 }
75 if let Some(n) = nm {
76 url += format!("&nm={}", n).as_str();
77 }
78
79 let res = self.client.get(url.as_str())
80 .send()
81 .await?
82 .json::<serde_json::Value>()
83 .await?;
84
85 let deserialized = match res.get("dataset") {
86 Some(d) => d,
87 None => {
88 // The SQL failed, return an error with the error data
89 let e: SQLError = serde_json::from_value(res)?;
90 return Err(Error::SQLError(e));
91 },
92 }.to_owned();
93
94 let deserialized: Vec<T> = serde_json::from_value(deserialized)?;
95
96 Ok(deserialized)
97 }
98
99 /// The function `imp` streams tabular text data directly into a table. It supports CSV, TAB and
100 /// Pipe (|) delimited inputs and optional headers. There are no restrictions on data size. Data
101 /// type and structure is detected automatically and usually without additional configuration.
102 /// However in some cases additional configuration can be provided to augment automatic
103 /// detection results.
104 ///
105 /// # Arguments
106 /// * `file_path` - Path to the file that is going to be imported
107 /// * `schema` - List of columns and their types. This will overwrite the default unless the
108 /// table is already created.
109 /// * `table_name` - Name of the table where the data will be saved
110 /// * `overwrite` - Default value is false. Set it to true to have existing table deleted before
111 /// appending data.
112 /// * `durable` - When request is durable QuestDB will flush relevant disk cache before
113 /// responding. Default value is false
114 /// * `atomicity` - Available values are strict and relaxed. Default value is relaxed. When
115 /// atomicity is relaxed data rows that cannot be appended to table are discarded, thus
116 /// allowing partial uploads. In strict mode upload fails as soon as any data error is
117 /// encountered and all previously appended rows are rolled back.
118 ///
119 /// # Example
120 /// ```no-test
121 /// let connection = QuestDB::new("http://192.168.1.37:9000");
122 /// let schema = new_schema!(("movieId", Schema::String), ("imdbId", Schema::Int));
123 ///
124 /// let res = connection.imp("./test.csv", Some(schema), "nu_table123", None, None, None)
125 /// .await
126 /// .unwrap();
127 /// ```
128 pub async fn imp(&self, file_path: &'static str, schema: Option<Vec<SchemaMap>>, table_name: &'static str,
129 overwrite: Option<bool>, durable: Option<bool>, atomicity: Option<Atomicity>)
130 -> Result<(), crate::error::Error>
131 {
132 let mut form = reqwest::multipart::Form::new();
133 let mut url = format!("{}/imp?fmt=json&name={}", self.url, table_name);
134
135 // Check all the optional arguments and add them to the URL
136 if let Some(s) = schema {
137 let data = serde_json::to_string(&s).unwrap();
138 form = form.part("schema", reqwest::multipart::Part::text(data));
139 }
140 if let Some(o) = overwrite {
141 url += format!("&overwrite={}", o).as_str();
142 }
143 if let Some(d) = durable {
144 url += format!("&durable={}", d).as_str();
145 }
146 if let Some(a) = atomicity {
147 url += format!("&atomicity={}", a).as_str();
148 }
149
150 // Read the file as bytes
151 let filep = Path::new(file_path);
152 let mut file = File::open(&filep)?;
153 let mut file_bytes: Vec<u8> = Vec::new();
154 file.read_to_end(&mut file_bytes)?;
155
156 // Create a part with the file_name
157 let file_name = match filep.file_name() {
158 Some(name) => name.to_str().unwrap(),
159 None => filep.to_str().unwrap(),
160 };
161 let part = reqwest::multipart::Part::bytes(file_bytes)
162 .file_name(file_name);
163
164 // Create the form with the file part
165 form = form.part("data", part);
166
167 // Make the POST request
168 let _res = self.client.post(url.as_str())
169 .multipart(form)
170 .send()
171 .await?
172 .text()
173 .await?;
174
175 Ok(())
176 }
177
178 /// Exports the result of the query to a CSV file
179 ///
180 /// # Arguments
181 /// * `query` - query text. It can be multi-line, but query separator, such as ; must not be
182 /// included.
183 /// * `limit` - This argument is used for paging. Limit can be either in format of X, Y where X
184 /// is the lower limit and Y is the upper, or just Y. For example, limit=10,20 will return row
185 /// numbers 10 thru to 20 inclusive. and limit=20 will return first 20 rows, which is
186 /// equivalent to limit=0,20
187 ///
188 /// # Example
189 /// ```no-test
190 /// use questdb::QuestDB;
191 /// use std::fs::File;
192 ///
193 /// let connection = QuestDB::new("http://192.168.1.37:9000");
194 ///
195 /// let mut output_file = File::create("output.csv").unwrap();
196 /// let res = match connection.exp("select * from nu_table", Some(5), &mut output_file)
197 /// .await
198 /// .unwrap();
199 /// ```
200 pub async fn exp(&self, query: &str, limit: Option<usize>, output_file: &mut File) -> Result<(), Error> {
201 let mut url = format!("{}/exp?query={}", self.url, query);
202
203 // Check all the optional arguments and add them to the URL
204 if let Some(l) = limit {
205 url += format!("&limit={}", l).as_str();
206 }
207
208 // Make the GET request
209 let res: String = self.client.get(url.as_str())
210 .send()
211 .await?
212 .text()
213 .await?;
214
215 // Try to write data to the file
216 output_file.write_all(res.as_bytes())?;
217
218 Ok(())
219 }
220}