1pub mod request {
2 #[derive(Debug)]
3 pub struct QueryRequestBuilder {
4 query_request: QueryRequest,
5 }
6
7 #[allow(dead_code)]
8 impl QueryRequestBuilder {
9 pub fn new(query: String) -> Self {
10 let query_request = QueryRequest::new(query);
11 QueryRequestBuilder { query_request }
12 }
13
14 pub fn max_results(mut self, max_results: i32) -> Self {
15 self.query_request.max_results = Some(max_results);
16 self
17 }
18
19 pub fn default_dataset(mut self, default_dataset: DatasetReference) -> Self {
20 self.query_request.default_dataset = Some(default_dataset);
21 self
22 }
23
24 pub fn dry_run(mut self) -> Self {
25 self.query_request.dry_run = true;
26 self
27 }
28
29 pub fn use_legacy_sql(mut self) -> Self {
30 self.query_request.use_legacy_sql = true;
31 self
32 }
33
34 pub fn create_session(mut self) -> Self {
35 self.query_request.create_session = true;
36 self
37 }
38
39 pub fn build(self) -> QueryRequest {
40 self.query_request
41 }
42 }
43
44 impl From<QueryRequestBuilder> for QueryRequest {
45 fn from(val: QueryRequestBuilder) -> Self {
46 val.build()
47 }
48 }
49
50 #[derive(Debug, serde::Deserialize, serde::Serialize)]
51 pub struct QueryRequest {
53 kind: Option<String>,
55 query: String,
56 max_results: Option<i32>,
57 default_dataset: Option<DatasetReference>,
58 timeout_ms: Option<i32>,
59 dry_run: bool,
60 preserve_nulls: bool,
62 use_query_cache: bool,
63 use_legacy_sql: bool,
64 parameter_mode: Option<String>,
65 query_parameters: Option<Vec<QueryParameter>>,
66 location: Option<String>,
67 format_options: Option<DataFormatOptions>,
68 connection_properties: Option<Vec<ConnectionProperty>>,
69 labels: Option<std::collections::HashMap<String, String>>,
70 maximum_bytes_billed: Option<String>,
76 request_id: Option<String>,
77 create_session: bool,
79 }
80
81 impl QueryRequest {
82 pub fn new(query: String) -> Self {
83 Self {
84 kind: None,
85 query: query.replace('\n', ""),
86 max_results: None,
87 default_dataset: None,
88 timeout_ms: None,
89 dry_run: false,
90 preserve_nulls: false,
91 use_query_cache: false,
92 use_legacy_sql: false,
93 parameter_mode: None,
94 query_parameters: None,
95 location: None,
96 format_options: None,
97 connection_properties: None,
98 labels: None,
99 maximum_bytes_billed: None,
100 request_id: None,
101 create_session: false,
102 }
103 }
104 }
105
106 #[derive(Debug, serde::Deserialize, serde::Serialize)]
107 pub struct DatasetReference {
108 dataset_id: String,
109 project_id: String,
110 }
111
112 #[derive(Debug, serde::Deserialize, serde::Serialize)]
113 pub struct QueryParameter {
114 name: String,
115 parameter_type: QueryParameterType,
116 parameter_value: QueryParameterValue,
117 }
118
119 #[derive(Debug, serde::Deserialize, serde::Serialize)]
120 pub struct QueryParameterType {
121 #[serde(rename = "type")]
122 type_: String,
123 array_type: Option<Box<QueryParameterType>>,
124 struct_types: Option<Vec<StructType>>,
125 }
126
127 #[derive(Debug, serde::Deserialize, serde::Serialize)]
128 pub struct StructType {
129 name: String,
130 type_: QueryParameterType,
131 description: String,
132 }
133
134 #[derive(Debug, serde::Deserialize, serde::Serialize)]
135 pub struct QueryParameterValue {
136 value: Option<String>,
137 array_values: Option<Vec<QueryParameterValue>>,
138 struct_values: Option<std::collections::HashMap<String, QueryParameterValue>>,
139 }
140
141 #[derive(Debug, serde::Deserialize, serde::Serialize)]
142 pub struct DataFormatOptions {
143 use_int64_timestamp: Option<bool>,
144 }
145
146 #[derive(Debug, serde::Deserialize, serde::Serialize)]
147 pub struct ConnectionProperty {
148 key: String,
149 value: String,
150 }
151}
152
153pub mod response {
154 use core::time;
155
156 #[derive(Debug, serde::Deserialize, serde::Serialize)]
157 #[serde(rename_all = "camelCase")]
158 pub struct QueryResponseDryRun {
159 pub job_complete: bool,
160 pub job_reference: Option<JobReference>,
161 pub kind: String,
162 pub schema: TableSchema,
163 pub total_bytes_processed: Option<String>,
164 }
165
166 #[derive(Debug, serde::Deserialize, serde::Serialize)]
204 #[serde(rename_all = "camelCase")]
205 pub struct QueryResponse {
206 pub kind: String,
207 pub etag: Option<String>,
208 pub schema: Option<TableSchema>,
209 pub job_reference: JobReference,
210 pub total_rows: Option<String>,
212 pub page_token: Option<String>,
213 #[serde(default)]
214 pub rows: Vec<serde_json::Value>,
215 pub total_bytes_processed: Option<String>,
216 pub job_complete: bool,
217 pub errors: Option<Vec<ErrorProto>>,
218 #[serde(default)]
219 pub cache_hit: bool,
220 pub num_dml_affected_rows: Option<String>,
221 }
222
223 pub fn retry<T>(handler: impl Fn() -> Option<T>, retries: Option<u32>) -> T {
224 let retries = retries.unwrap_or(0);
225
226 if retries > 10 {
227 panic!("exceeded retry limit");
228 }
229
230 let base_delay = time::Duration::from_millis(400);
231 let delay = base_delay * 2_u32.pow(retries);
232 std::thread::sleep(delay);
233
234 if let Some(result) = handler() {
235 return result;
236 }
237
238 retry(handler, Some(retries + 1))
239 }
240
241 impl QueryResponse {
242 pub fn retry(self, client: &crate::api::Client) -> Self {
243 if self.job_complete {
244 return self;
245 }
246
247 let Some(job_id) = &self.job_reference.job_id else {
248 panic!("no id found for incomplete job");
249 };
250
251 let handler = || {
252 let response = client.jobs_query_results(job_id, &self.job_reference.location);
253
254 if response.job_complete {
255 Some(response)
256 } else {
257 None
258 }
259 };
260
261 retry(handler, None)
262 }
263
264 fn csv_formatting_rules(mut row: String) -> String {
266 let mut add_quotes = row.contains([',', '\n']);
267
268 if row.contains('"') {
269 row = row.replace('"', "\"\"");
270 add_quotes = true;
271 }
272
273 if add_quotes {
274 row.insert(0, '"');
275 row.push('"');
276 }
277
278 row
279 }
280
281 pub fn into_csv(self) -> String {
282 let mut rows: Vec<String> = Vec::new();
283
284 if let Some(schema) = self.schema {
285 let header: Vec<String> = schema
286 .fields
287 .into_iter()
288 .map(|c| c.name)
289 .map(Self::csv_formatting_rules)
290 .collect();
291 rows.push(header.join(","));
292 }
293
294 let mut values: Vec<String> = self
295 .rows
296 .into_iter()
297 .filter_map(|v| match v["f"].clone() {
298 serde_json::Value::Array(a) => {
299 let row: Vec<String> = a
300 .into_iter()
301 .map(|v| match v["v"].clone() {
302 serde_json::Value::String(x) => x,
303 serde_json::Value::Bool(x) => x.to_string(),
304 serde_json::Value::Number(x) => x.to_string(),
305 serde_json::Value::Null => String::new(),
306 _ => String::new(),
307 })
310 .map(Self::csv_formatting_rules)
312 .collect();
313 Some(row.join(","))
314 }
315 _ => None,
316 })
317 .collect();
318
319 rows.append(values.as_mut());
320
321 rows.join("\n")
322 }
323
324 #[allow(dead_code)]
325 pub fn into_json(self) -> serde_json::Value {
329 let mut rows: Vec<serde_json::Value> = Vec::new();
330
331 if let Some(schema) = self.schema {
332 let header: Vec<serde_json::Value> = schema
333 .fields
334 .into_iter()
335 .map(|c| serde_json::Value::String(c.name))
336 .collect();
337
338 rows.push(serde_json::Value::Array(header));
339 }
340
341 let mut values: Vec<serde_json::Value> = self
342 .rows
343 .into_iter()
344 .filter_map(|v| match v["f"].clone() {
345 serde_json::Value::Array(a) => {
346 let row = a.into_iter().map(|v| v["v"].clone()).collect();
347 Some(serde_json::Value::Array(row))
348 }
349 _ => None,
350 })
351 .collect();
352
353 rows.append(values.as_mut());
354
355 serde_json::Value::Array(rows)
356 }
357 }
358
359 #[derive(Debug, serde::Deserialize, serde::Serialize)]
360 #[serde(rename_all = "camelCase")]
361 pub struct TableSchema {
362 pub fields: Vec<TableFieldSchema>,
363 }
364
365 #[derive(Debug, serde::Deserialize, serde::Serialize)]
366 #[serde(rename_all = "camelCase")]
367 pub struct TableFieldSchema {
368 pub name: String,
369 #[serde(rename = "type")]
370 pub field_type: String,
371 pub mode: String,
372 pub fields: Option<Vec<TableFieldSchema>>,
373 pub description: Option<String>,
374 pub policy_tags: Option<PolicyTags>,
375 pub max_length: Option<String>,
376 pub precision: Option<String>,
377 pub scale: Option<String>,
378 pub rounding_mode: Option<RoundingMode>,
379 pub collation: Option<String>,
380 pub default_value_expression: Option<String>,
381 }
382
383 #[derive(Debug, serde::Deserialize, serde::Serialize)]
384 #[serde(rename_all = "camelCase")]
385 pub struct PolicyTags {
386 pub names: Vec<String>,
387 }
388
389 #[derive(Debug, serde::Deserialize, serde::Serialize)]
390 #[serde(rename_all = "camelCase")]
391 #[allow(clippy::enum_variant_names)]
392 pub enum RoundingMode {
393 RoundingModeUnspecified,
394 RoundHalfAwayFromZero,
395 RoundHalfEven,
396 }
397
398 #[derive(Debug, serde::Deserialize, serde::Serialize)]
399 #[serde(rename_all = "camelCase")]
400 pub struct JobReference {
401 pub project_id: String,
402 pub job_id: Option<String>,
404 pub location: String,
405 }
406
407 #[derive(Debug, serde::Deserialize, serde::Serialize)]
408 #[serde(rename_all = "camelCase")]
409 pub struct ErrorProto {
410 pub reason: String,
411 pub location: String,
412 pub debug_info: String,
413 pub message: String,
414 }
415}