1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
use reqwest::Client; use serde::de::DeserializeOwned; use crate::error::SQLError; use crate::Error; use std::fs::File; use std::io::{Read, Write}; use std::path::Path; use crate::types::{Atomicity, SchemaMap}; pub struct QuestDB { client: Client, url: String, } impl QuestDB { /// Creates a new connection to questdb /// /// # Example /// ``` /// use questdb::QuestDB; /// let connection = QuestDB::new("http://192.168.1.37:9000"); /// ``` pub fn new(url: &str) -> Self { QuestDB { client: Client::new(), url: String::from(url), } } /// Compiles and executes the SQL query supplied /// /// # Arguments /// * `query` - query text. It can be multi-line, but query separator, such as ; must not be /// included. /// * `limit` - This argument is used for paging. Limit can be either in format of X, Y where X /// is the lower limit and Y is the upper, or just Y. For example, limit=10,20 will return row /// numbers 10 thru to 20 inclusive. and limit=20 will return first 20 rows, which is /// equivalent to limit=0,20 /// * `count` - Instructs /exec to count rows and return this value in message header. Default /// value is false. There is slight performance hit for requesting row count. /// * `nm` - Skips metadata section of the response when true. When metadata is known and client /// is paging this flag should typically be set to true to reduce response size. Default value /// is false and metadata is included in the response. /// /// # Example /// ```no-test /// use questdb::QuestDB; /// use serde::{Serialize, Deserialize}; /// /// #[derive(Serialize, Deserialize, Debug)] /// struct TestData { /// id: i32, /// ts: String, /// temp: f64, /// sensor_id: i32, /// } /// /// let connection = QuestDB::new("http://192.168.1.37:9000"); /// let res = connection.exec::<TestData>("select * from readings", Some(5), None, None) /// .await /// .unwrap(); /// ``` pub async fn exec<T: DeserializeOwned>(&self, query: &str, limit: Option<usize>, count: Option<bool>, nm: Option<bool>) -> Result<Vec<T>, crate::error::Error> { let mut url = format!("{}/exec?query={}", self.url, query); // Check all the optional arguments and add them to the URL if let Some(l) = limit { url += format!("&limit={}", l).as_str(); } if let Some(c) = count { url += format!("&count={}", c).as_str(); } if let Some(n) = nm { url += format!("&nm={}", n).as_str(); } let res = self.client.get(url.as_str()) .send() .await? .json::<serde_json::Value>() .await?; let deserialized = match res.get("dataset") { Some(d) => d, None => { // The SQL failed, return an error with the error data let e: SQLError = serde_json::from_value(res)?; return Err(Error::SQLError(e)); }, }.to_owned(); let deserialized: Vec<T> = serde_json::from_value(deserialized)?; Ok(deserialized) } /// The function `imp` streams tabular text data directly into a table. It supports CSV, TAB and /// Pipe (|) delimited inputs and optional headers. There are no restrictions on data size. Data /// type and structure is detected automatically and usually without additional configuration. /// However in some cases additional configuration can be provided to augment automatic /// detection results. /// /// # Arguments /// * `file_path` - Path to the file that is going to be imported /// * `schema` - List of columns and their types. This will overwrite the default unless the /// table is already created. /// * `table_name` - Name of the table where the data will be saved /// * `overwrite` - Default value is false. Set it to true to have existing table deleted before /// appending data. /// * `durable` - When request is durable QuestDB will flush relevant disk cache before /// responding. Default value is false /// * `atomicity` - Available values are strict and relaxed. Default value is relaxed. When /// atomicity is relaxed data rows that cannot be appended to table are discarded, thus /// allowing partial uploads. In strict mode upload fails as soon as any data error is /// encountered and all previously appended rows are rolled back. /// /// # Example /// ```no-test /// let connection = QuestDB::new("http://192.168.1.37:9000"); /// let schema = new_schema!(("movieId", Schema::String), ("imdbId", Schema::Int)); /// /// let res = connection.imp("./test.csv", Some(schema), "nu_table123", None, None, None) /// .await /// .unwrap(); /// ``` pub async fn imp(&self, file_path: &'static str, schema: Option<Vec<SchemaMap>>, table_name: &'static str, overwrite: Option<bool>, durable: Option<bool>, atomicity: Option<Atomicity>) -> Result<(), crate::error::Error> { let mut form = reqwest::multipart::Form::new(); let mut url = format!("{}/imp?fmt=json&name={}", self.url, table_name); // Check all the optional arguments and add them to the URL if let Some(s) = schema { let data = serde_json::to_string(&s).unwrap(); form = form.part("schema", reqwest::multipart::Part::text(data)); } if let Some(o) = overwrite { url += format!("&overwrite={}", o).as_str(); } if let Some(d) = durable { url += format!("&durable={}", d).as_str(); } if let Some(a) = atomicity { url += format!("&atomicity={}", a).as_str(); } // Read the file as bytes let filep = Path::new(file_path); let mut file = File::open(&filep)?; let mut file_bytes: Vec<u8> = Vec::new(); file.read_to_end(&mut file_bytes)?; // Create a part with the file_name let file_name = match filep.file_name() { Some(name) => name.to_str().unwrap(), None => filep.to_str().unwrap(), }; let part = reqwest::multipart::Part::bytes(file_bytes) .file_name(file_name); // Create the form with the file part form = form.part("data", part); // Make the POST request let _res = self.client.post(url.as_str()) .multipart(form) .send() .await? .text() .await?; Ok(()) } /// Exports the result of the query to a CSV file /// /// # Arguments /// * `query` - query text. It can be multi-line, but query separator, such as ; must not be /// included. /// * `limit` - This argument is used for paging. Limit can be either in format of X, Y where X /// is the lower limit and Y is the upper, or just Y. For example, limit=10,20 will return row /// numbers 10 thru to 20 inclusive. and limit=20 will return first 20 rows, which is /// equivalent to limit=0,20 /// /// # Example /// ```no-test /// use questdb::QuestDB; /// use std::fs::File; /// /// let connection = QuestDB::new("http://192.168.1.37:9000"); /// /// let mut output_file = File::create("output.csv").unwrap(); /// let res = match connection.exp("select * from nu_table", Some(5), &mut output_file) /// .await /// .unwrap(); /// ``` pub async fn exp(&self, query: &str, limit: Option<usize>, output_file: &mut File) -> Result<(), Error> { let mut url = format!("{}/exp?query={}", self.url, query); // Check all the optional arguments and add them to the URL if let Some(l) = limit { url += format!("&limit={}", l).as_str(); } // Make the GET request let res: String = self.client.get(url.as_str()) .send() .await? .text() .await?; // Try to write data to the file output_file.write_all(res.as_bytes())?; Ok(()) } }