// bq_rs/query.rs

1pub mod request {
2    #[derive(Debug)]
3    pub struct QueryRequestBuilder {
4        query_request: QueryRequest,
5    }
6
7    #[allow(dead_code)]
8    impl QueryRequestBuilder {
9        pub fn new(query: String) -> Self {
10            let query_request = QueryRequest::new(query);
11            QueryRequestBuilder { query_request }
12        }
13
14        pub fn max_results(mut self, max_results: i32) -> Self {
15            self.query_request.max_results = Some(max_results);
16            self
17        }
18
19        pub fn default_dataset(mut self, default_dataset: DatasetReference) -> Self {
20            self.query_request.default_dataset = Some(default_dataset);
21            self
22        }
23
24        pub fn dry_run(mut self) -> Self {
25            self.query_request.dry_run = true;
26            self
27        }
28
29        pub fn use_legacy_sql(mut self) -> Self {
30            self.query_request.use_legacy_sql = true;
31            self
32        }
33
34        pub fn create_session(mut self) -> Self {
35            self.query_request.create_session = true;
36            self
37        }
38
39        pub fn build(self) -> QueryRequest {
40            self.query_request
41        }
42    }
43
44    impl From<QueryRequestBuilder> for QueryRequest {
45        fn from(val: QueryRequestBuilder) -> Self {
46            val.build()
47        }
48    }
49
50    #[derive(Debug, serde::Deserialize, serde::Serialize)]
51    /// <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#queryrequest>
52    pub struct QueryRequest {
53        /// deprecated
54        kind: Option<String>,
55        query: String,
56        max_results: Option<i32>,
57        default_dataset: Option<DatasetReference>,
58        timeout_ms: Option<i32>,
59        dry_run: bool,
60        /// deprecated
61        preserve_nulls: bool,
62        use_query_cache: bool,
63        use_legacy_sql: bool,
64        parameter_mode: Option<String>,
65        query_parameters: Option<Vec<QueryParameter>>,
66        location: Option<String>,
67        format_options: Option<DataFormatOptions>,
68        connection_properties: Option<Vec<ConnectionProperty>>,
69        labels: Option<std::collections::HashMap<String, String>>,
70        /// Optional. Limits the bytes billed for this query.
71        ///
72        /// Queries with bytes billed above this limit will fail (without incurring a charge).
73        ///
74        /// If unspecified, the project default is used.
75        maximum_bytes_billed: Option<String>,
76        request_id: Option<String>,
77        /// <https://cloud.google.com/bigquery/docs/sessions-create>
78        create_session: bool,
79    }
80
81    impl QueryRequest {
82        pub fn new(query: String) -> Self {
83            Self {
84                kind: None,
85                query: query.replace('\n', ""),
86                max_results: None,
87                default_dataset: None,
88                timeout_ms: None,
89                dry_run: false,
90                preserve_nulls: false,
91                use_query_cache: false,
92                use_legacy_sql: false,
93                parameter_mode: None,
94                query_parameters: None,
95                location: None,
96                format_options: None,
97                connection_properties: None,
98                labels: None,
99                maximum_bytes_billed: None,
100                request_id: None,
101                create_session: false,
102            }
103        }
104    }
105
106    #[derive(Debug, serde::Deserialize, serde::Serialize)]
107    pub struct DatasetReference {
108        dataset_id: String,
109        project_id: String,
110    }
111
112    #[derive(Debug, serde::Deserialize, serde::Serialize)]
113    pub struct QueryParameter {
114        name: String,
115        parameter_type: QueryParameterType,
116        parameter_value: QueryParameterValue,
117    }
118
119    #[derive(Debug, serde::Deserialize, serde::Serialize)]
120    pub struct QueryParameterType {
121        #[serde(rename = "type")]
122        type_: String,
123        array_type: Option<Box<QueryParameterType>>,
124        struct_types: Option<Vec<StructType>>,
125    }
126
127    #[derive(Debug, serde::Deserialize, serde::Serialize)]
128    pub struct StructType {
129        name: String,
130        type_: QueryParameterType,
131        description: String,
132    }
133
134    #[derive(Debug, serde::Deserialize, serde::Serialize)]
135    pub struct QueryParameterValue {
136        value: Option<String>,
137        array_values: Option<Vec<QueryParameterValue>>,
138        struct_values: Option<std::collections::HashMap<String, QueryParameterValue>>,
139    }
140
141    #[derive(Debug, serde::Deserialize, serde::Serialize)]
142    pub struct DataFormatOptions {
143        use_int64_timestamp: Option<bool>,
144    }
145
146    #[derive(Debug, serde::Deserialize, serde::Serialize)]
147    pub struct ConnectionProperty {
148        key: String,
149        value: String,
150    }
151}
152
153pub mod response {
154    use core::time;
155
156    #[derive(Debug, serde::Deserialize, serde::Serialize)]
157    #[serde(rename_all = "camelCase")]
158    pub struct QueryResponseDryRun {
159        pub job_complete: bool,
160        pub job_reference: Option<JobReference>,
161        pub kind: String,
162        pub schema: TableSchema,
163        pub total_bytes_processed: Option<String>,
164    }
165
166    /* #[derive(Debug, serde::Deserialize, serde::Serialize)]
167    #[serde(crate = "rocket::serde")]
168    #[serde(rename_all = "camelCase")]
169    struct QueryResponse {
170        kind: String,
171        schema: TableSchema,
172        job_reference: JobReference,
173        total_rows: String,
174        page_token: String,
175        rows: Vec<serde_json::Value>,
176        total_bytes_processed: String,
177        job_complete: bool,
178        errors: Vec<ErrorProto>,
179        cache_hit: bool,
180        num_dml_affected_rows: String,
181        session_info: SessionInfo,
182        dml_stats: DmlStats,
183    }
184
185    #[derive(Debug, serde::Deserialize, serde::Serialize)]
186    #[serde(crate = "rocket::serde")]
187    #[serde(rename_all = "camelCase")]
188    struct QueryResultsResponse {
189        kind: String,
190        etag: String,
191        schema: TableSchema,
192        job_reference: JobReference,
193        total_rows: String,
194        page_token: String,
195        rows: Vec<serde_json::Value>,
196        total_bytes_processed: String,
197        job_complete: bool,
198        errors: Vec<ErrorProto>,
199        cache_hit: bool,
200        num_dml_affected_rows: String,
201    } */
202
203    #[derive(Debug, serde::Deserialize, serde::Serialize)]
204    #[serde(rename_all = "camelCase")]
205    pub struct QueryResponse {
206        pub kind: String,
207        pub etag: Option<String>,
208        pub schema: Option<TableSchema>,
209        pub job_reference: JobReference,
210        /// dry runs do not have total rows
211        pub total_rows: Option<String>,
212        pub page_token: Option<String>,
213        #[serde(default)]
214        pub rows: Vec<serde_json::Value>,
215        pub total_bytes_processed: Option<String>,
216        pub job_complete: bool,
217        pub errors: Option<Vec<ErrorProto>>,
218        #[serde(default)]
219        pub cache_hit: bool,
220        pub num_dml_affected_rows: Option<String>,
221    }
222
223    pub fn retry<T>(handler: impl Fn() -> Option<T>, retries: Option<u32>) -> T {
224        let retries = retries.unwrap_or(0);
225
226        if retries > 10 {
227            panic!("exceeded retry limit");
228        }
229
230        let base_delay = time::Duration::from_millis(400);
231        let delay = base_delay * 2_u32.pow(retries);
232        std::thread::sleep(delay);
233
234        if let Some(result) = handler() {
235            return result;
236        }
237
238        retry(handler, Some(retries + 1))
239    }
240
241    impl QueryResponse {
242        pub fn retry(self, client: &crate::api::Client) -> Self {
243            if self.job_complete {
244                return self;
245            }
246
247            let Some(job_id) = &self.job_reference.job_id else {
248                panic!("no id found for incomplete job");
249            };
250
251            let handler = || {
252                let response = client.jobs_query_results(job_id, &self.job_reference.location);
253
254                if response.job_complete {
255                    Some(response)
256                } else {
257                    None
258                }
259            };
260
261            retry(handler, None)
262        }
263
264        /// follow proper csv convention: https://stackoverflow.com/a/769820
265        fn csv_formatting_rules(mut row: String) -> String {
266            let mut add_quotes = row.contains([',', '\n']);
267
268            if row.contains('"') {
269                row = row.replace('"', "\"\"");
270                add_quotes = true;
271            }
272
273            if add_quotes {
274                row.insert(0, '"');
275                row.push('"');
276            }
277
278            row
279        }
280
281        pub fn into_csv(self) -> String {
282            let mut rows: Vec<String> = Vec::new();
283
284            if let Some(schema) = self.schema {
285                let header: Vec<String> = schema
286                    .fields
287                    .into_iter()
288                    .map(|c| c.name)
289                    .map(Self::csv_formatting_rules)
290                    .collect();
291                rows.push(header.join(","));
292            }
293
294            let mut values: Vec<String> = self
295                .rows
296                .into_iter()
297                .filter_map(|v| match v["f"].clone() {
298                    serde_json::Value::Array(a) => {
299                        let row: Vec<String> = a
300                            .into_iter()
301                            .map(|v| match v["v"].clone() {
302                                serde_json::Value::String(x) => x,
303                                serde_json::Value::Bool(x) => x.to_string(),
304                                serde_json::Value::Number(x) => x.to_string(),
305                                serde_json::Value::Null => String::new(),
306                                _ => String::new(),
307                                //serde_json::Value::Array(_) => todo!(),
308                                //serde_json::Value::Object(_) => todo!(),
309                            })
310                            // surround values with double quotes
311                            .map(Self::csv_formatting_rules)
312                            .collect();
313                        Some(row.join(","))
314                    }
315                    _ => None,
316                })
317                .collect();
318
319            rows.append(values.as_mut());
320
321            rows.join("\n")
322        }
323
324        #[allow(dead_code)]
325        /// this needs works
326        /// it is not outputting proper json format
327        /// it needs to convert from google bigqquery protobuf
328        pub fn into_json(self) -> serde_json::Value {
329            let mut rows: Vec<serde_json::Value> = Vec::new();
330
331            if let Some(schema) = self.schema {
332                let header: Vec<serde_json::Value> = schema
333                    .fields
334                    .into_iter()
335                    .map(|c| serde_json::Value::String(c.name))
336                    .collect();
337
338                rows.push(serde_json::Value::Array(header));
339            }
340
341            let mut values: Vec<serde_json::Value> = self
342                .rows
343                .into_iter()
344                .filter_map(|v| match v["f"].clone() {
345                    serde_json::Value::Array(a) => {
346                        let row = a.into_iter().map(|v| v["v"].clone()).collect();
347                        Some(serde_json::Value::Array(row))
348                    }
349                    _ => None,
350                })
351                .collect();
352
353            rows.append(values.as_mut());
354
355            serde_json::Value::Array(rows)
356        }
357    }
358
359    #[derive(Debug, serde::Deserialize, serde::Serialize)]
360    #[serde(rename_all = "camelCase")]
361    pub struct TableSchema {
362        pub fields: Vec<TableFieldSchema>,
363    }
364
365    #[derive(Debug, serde::Deserialize, serde::Serialize)]
366    #[serde(rename_all = "camelCase")]
367    pub struct TableFieldSchema {
368        pub name: String,
369        #[serde(rename = "type")]
370        pub field_type: String,
371        pub mode: String,
372        pub fields: Option<Vec<TableFieldSchema>>,
373        pub description: Option<String>,
374        pub policy_tags: Option<PolicyTags>,
375        pub max_length: Option<String>,
376        pub precision: Option<String>,
377        pub scale: Option<String>,
378        pub rounding_mode: Option<RoundingMode>,
379        pub collation: Option<String>,
380        pub default_value_expression: Option<String>,
381    }
382
383    #[derive(Debug, serde::Deserialize, serde::Serialize)]
384    #[serde(rename_all = "camelCase")]
385    pub struct PolicyTags {
386        pub names: Vec<String>,
387    }
388
389    #[derive(Debug, serde::Deserialize, serde::Serialize)]
390    #[serde(rename_all = "camelCase")]
391    #[allow(clippy::enum_variant_names)]
392    pub enum RoundingMode {
393        RoundingModeUnspecified,
394        RoundHalfAwayFromZero,
395        RoundHalfEven,
396    }
397
398    #[derive(Debug, serde::Deserialize, serde::Serialize)]
399    #[serde(rename_all = "camelCase")]
400    pub struct JobReference {
401        pub project_id: String,
402        /// dry runs do not contain a `job_id`
403        pub job_id: Option<String>,
404        pub location: String,
405    }
406
407    #[derive(Debug, serde::Deserialize, serde::Serialize)]
408    #[serde(rename_all = "camelCase")]
409    pub struct ErrorProto {
410        pub reason: String,
411        pub location: String,
412        pub debug_info: String,
413        pub message: String,
414    }
415}