gcp_bigquery_client/
job.rs

1//! Manage BigQuery jobs.
2use std::sync::Arc;
3
4use async_stream::stream;
5use reqwest::Client;
6use tokio_stream::Stream;
7
8use crate::auth::Authenticator;
9use crate::error::BQError;
10use crate::model::get_query_results_parameters::GetQueryResultsParameters;
11use crate::model::get_query_results_response::GetQueryResultsResponse;
12use crate::model::job::Job;
13use crate::model::job_cancel_response::JobCancelResponse;
14use crate::model::job_configuration::JobConfiguration;
15use crate::model::job_configuration_query::JobConfigurationQuery;
16use crate::model::job_list::JobList;
17use crate::model::job_list_parameters::JobListParameters;
18use crate::model::job_reference::JobReference;
19use crate::model::query_request::QueryRequest;
20use crate::model::query_response::QueryResponse;
21use crate::model::table_row::TableRow;
22use crate::{process_response, urlencode, BIG_QUERY_V2_URL};
23
/// A job API handler.
#[derive(Clone)]
pub struct JobApi {
    // HTTP client used to issue every request to the BigQuery REST API.
    client: Client,
    // Token source; `access_token()` is called before each request.
    auth: Arc<dyn Authenticator>,
    // Base URL of the BigQuery v2 API; defaults to BIG_QUERY_V2_URL and can be
    // overridden via `with_base_url` (e.g. to point at an emulator).
    base_url: String,
}
31
32impl JobApi {
33    pub(crate) fn new(client: Client, auth: Arc<dyn Authenticator>) -> Self {
34        Self {
35            client,
36            auth,
37            base_url: BIG_QUERY_V2_URL.to_string(),
38        }
39    }
40
41    pub(crate) fn with_base_url(&mut self, base_url: String) -> &mut Self {
42        self.base_url = base_url;
43        self
44    }
45
46    /// Runs a BigQuery SQL query synchronously and returns query results if the query completes within a specified
47    /// timeout.
48    /// # Arguments
49    /// * `project_id` - Project ID of the query request.
50    /// * `query_request` - The request body contains an instance of QueryRequest.
51    pub async fn query(&self, project_id: &str, query_request: QueryRequest) -> Result<QueryResponse, BQError> {
52        let req_url = format!(
53            "{base_url}/projects/{project_id}/queries",
54            base_url = self.base_url,
55            project_id = urlencode(project_id)
56        );
57
58        let access_token = self.auth.access_token().await?;
59
60        let request = self
61            .client
62            .post(req_url.as_str())
63            .bearer_auth(access_token)
64            .json(&query_request)
65            .build()?;
66
67        let resp = self.client.execute(request).await?;
68
69        let query_response: QueryResponse = process_response(resp).await?;
70        Ok(query_response)
71    }
72
73    /// Runs a BigQuery SQL query, paginating through all the results synchronously.
74    /// # Arguments
75    /// * `project_id`- Project ID of the query request.
76    /// * `query` - The initial query configuration that is submitted when the job is inserted.
77    /// * `page_size` - The size of each page fetched. By default, this is set to `None`, and the limit is 10 MB of
78    ///   rows instead.
79    pub fn query_all<'a>(
80        &'a self,
81        project_id: &'a str,
82        query: JobConfigurationQuery,
83        page_size: Option<i32>,
84    ) -> impl Stream<Item = Result<Vec<TableRow>, BQError>> + 'a {
85        stream! {
86            let job = Job {
87                configuration: Some(JobConfiguration {
88                    dry_run: Some(false),
89                    query:   Some(query),
90                    ..Default::default()
91                }),
92                ..Default::default()
93            };
94
95            let job = self.insert(project_id, job).await?;
96
97            if let Some(ref job_id) = job.job_reference.and_then(|r| r.job_id) {
98                let mut page_token: Option<String> = None;
99                loop {
100                    let qr = self
101                        .get_query_results(
102                            project_id,
103                            job_id,
104                            GetQueryResultsParameters {
105                                page_token: page_token.clone(),
106                                max_results: page_size,
107                                ..Default::default()
108                            },
109                        )
110                        .await?;
111
112                    // Waiting for the job to be completed.
113                    if !qr.job_complete.unwrap_or(false) {
114                        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
115                        continue;
116                    }
117
118                    // Rows is present when the query finishes successfully.
119                    // Rows is empty when query result is empty.
120                    yield Ok(qr.rows.unwrap_or_default());
121
122                    page_token = match qr.page_token {
123                        None => break,
124                        f => f,
125                    };
126                }
127            }
128        }
129    }
130
131    /// Runs a BigQuery SQL query, paginating through all the results synchronously.
132    /// Use this function when location of the job differs from the default value (US)
133    /// # Arguments
134    /// * `project_id`- Project ID of the query request.
135    /// * `location`  - Geographic location of the job.
136    /// * `query` - The initial query configuration that is submitted when the job is inserted.
137    /// * `page_size` - The size of each page fetched. By default, this is set to `None`, and the limit is 10 MB of
138    ///   rows instead.
139    pub fn query_all_with_location<'a>(
140        &'a self,
141        project_id: &'a str,
142        location: &'a str,
143        query: JobConfigurationQuery,
144        page_size: Option<i32>,
145    ) -> impl Stream<Item = Result<Vec<TableRow>, BQError>> + 'a {
146        stream! {
147            let job = Job {
148                configuration: Some(JobConfiguration {
149                    dry_run: Some(false),
150                    query:   Some(query),
151                    ..Default::default()
152                }),
153                job_reference: Some(JobReference {
154                    location:   Some(location.to_string()),
155                    project_id: Some(project_id.to_string()),
156                    ..Default::default()
157                }),
158                ..Default::default()
159            };
160
161            let job = self.insert(project_id, job).await?;
162
163            if let Some(ref job_id) = job.job_reference.and_then(|r| r.job_id) {
164                let mut page_token: Option<String> = None;
165                loop {
166                    let qr = self
167                        .get_query_results(
168                            project_id,
169                            job_id,
170                            GetQueryResultsParameters {
171                                page_token: page_token.clone(),
172                                max_results: page_size,
173                                location:    Some(location.to_string()),
174                                ..Default::default()
175                            },
176                        )
177                        .await?;
178
179                        // Waiting for completed the job.
180                        if !qr.job_complete.unwrap_or(false) {
181                            tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
182
183                            continue;
184                        }
185
186
187                    // Rows is present when the query finishes successfully.
188                    // Rows be empty when query result is empty.
189                    yield Ok(qr.rows.unwrap_or_default());
190
191                    page_token = match qr.page_token {
192                        None => break,
193                        f => f,
194                    };
195                }
196            }
197        }
198    }
199
200    /// Runs a BigQuery SQL query, paginating through all the results synchronously.
201    /// Use this function when you need to have your job with non-default location, project_id & job_id values
202    /// # Arguments
203    /// * `project_id`- Project ID of the query request.
204    /// * `job_reference` - The initital job reference configuration that is submitted when the job is inserted
205    /// * `query` - The initial query configuration that is submitted when the job is inserted.
206    /// * `page_size` - The size of each page fetched. By default, this is set to `None`, and the limit is 10 MB of
207    ///   rows instead.
208    pub fn query_all_with_job_reference<'a>(
209        &'a self,
210        project_id: &'a str,
211        job_reference: JobReference,
212        query: JobConfigurationQuery,
213        page_size: Option<i32>,
214    ) -> impl Stream<Item = Result<Vec<TableRow>, BQError>> + 'a {
215        stream! {
216            let location = job_reference.location.as_ref().cloned();
217
218            let job = Job {
219                configuration: Some(JobConfiguration {
220                    dry_run: Some(false),
221                    query:   Some(query),
222                    ..Default::default()
223                }),
224                job_reference: Some(job_reference),
225                ..Default::default()
226            };
227
228            let job = self.insert(project_id, job).await?;
229
230            if let Some(ref job_id) = job.job_reference.and_then(|r| r.job_id) {
231                let mut page_token: Option<String> = None;
232                loop {
233                    let gqrp = GetQueryResultsParameters {
234                                page_token,
235                                max_results: page_size,
236                                location:    location.clone(),
237                                ..Default::default()
238                            };
239                    let qr = self
240                        .get_query_results(
241                            project_id,
242                            job_id,
243                            gqrp,
244                        )
245                        .await?;
246
247                    // Rows is present when the query finishes successfully.
248                    yield Ok(qr.rows.expect("Rows are not present"));
249
250                    page_token = match qr.page_token {
251                        None => break,
252                        f => f,
253                    };
254                }
255            }
256        }
257    }
258
259    /// Starts a new asynchronous job.
260    /// # Arguments
261    /// * `project_id` - Project ID of project that will be billed for the job.
262    /// * `job` - The request body contains an instance of Job.
263    pub async fn insert(&self, project_id: &str, job: Job) -> Result<Job, BQError> {
264        let req_url = format!(
265            "{base_url}/projects/{project_id}/jobs",
266            base_url = self.base_url,
267            project_id = urlencode(project_id)
268        );
269
270        let access_token = self.auth.access_token().await?;
271
272        let request = self
273            .client
274            .post(req_url.as_str())
275            .bearer_auth(access_token)
276            .json(&job)
277            .build()?;
278
279        let resp = self.client.execute(request).await?;
280
281        process_response(resp).await
282    }
283
284    /// Lists all jobs that you started in the specified project. Job information is available for a six month period
285    /// after creation. The job list is sorted in reverse chronological order, by job creation time. Requires the Can
286    /// View project role, or the Is Owner project role if you set the allUsers property.
287    /// # Arguments
288    /// * `project_id` - Project ID of the jobs to list.
289    pub async fn list(&self, project_id: &str) -> Result<JobList, BQError> {
290        let req_url = format!(
291            "{base_url}/projects/{project_id}/jobs",
292            base_url = self.base_url,
293            project_id = urlencode(project_id)
294        );
295
296        let access_token = self.auth.access_token().await?;
297
298        let request = self.client.get(req_url.as_str()).bearer_auth(access_token).build()?;
299
300        let resp = self.client.execute(request).await?;
301
302        process_response(resp).await
303    }
304
305    /// Lists all jobs that you started in the specified project paginating through all the results synchronously.
306    /// Job information is available for a six month period after creation.
307    /// The job list is sorted in reverse chronological order, by job creation time. Requires the Can
308    /// View project role, or the Is Owner project role if you set the allUsers property.
309    /// # Arguments
310    /// * `project_id` - Project ID of the jobs to list.
311    /// * `parameters` - The query parameters for jobs.list.
312    pub fn get_job_list<'a>(
313        &'a self,
314        project_id: &'a str,
315        parameters: Option<JobListParameters>,
316    ) -> impl Stream<Item = Result<JobList, BQError>> + 'a {
317        stream! {
318            let req_url = format!(
319                "{base_url}/projects/{project_id}/jobs",
320                base_url = self.base_url,
321                project_id = urlencode(project_id),
322                );
323
324            let mut params = parameters.unwrap_or_default();
325            params.page_token = None;
326
327            loop {
328                let mut request_builder = self.client.get(req_url.as_str());
329
330                request_builder = request_builder.query(&params);
331
332                let access_token = self.auth.access_token().await?;
333                let request = request_builder.bearer_auth(access_token).build()?;
334
335                let resp = self.client.execute(request).await?;
336
337                let process_resp: Result<JobList, BQError> = process_response(resp).await;
338
339                yield match process_resp {
340                    Err(e) => {params.page_token=None; Err(e)},
341                    Ok(job_list) => {params.page_token.clone_from(&job_list.next_page_token); Ok(job_list.clone())}
342                };
343
344                if params.page_token.is_none() {
345                    break;
346                }
347            }
348        }
349    }
350
351    /// RPC to get the results of a query job.
352    /// # Arguments
353    /// * `project_id` - Project ID of the query request.
354    /// * `job_id` - Job ID of the query job.
355    /// * `parameters` - The query parameters for jobs.getQueryResults.
356    pub async fn get_query_results(
357        &self,
358        project_id: &str,
359        job_id: &str,
360        parameters: GetQueryResultsParameters,
361    ) -> Result<GetQueryResultsResponse, BQError> {
362        let req_url = format!(
363            "{base_url}/projects/{project_id}/queries/{job_id}",
364            base_url = self.base_url,
365            project_id = urlencode(project_id),
366            job_id = urlencode(job_id),
367        );
368
369        let access_token = self.auth.access_token().await?;
370
371        let request = self
372            .client
373            .get(req_url.as_str())
374            .query(&parameters)
375            .bearer_auth(access_token)
376            .build()?;
377
378        let resp = self.client.execute(request).await?;
379
380        let get_query_results_response: GetQueryResultsResponse = process_response(resp).await?;
381        Ok(get_query_results_response)
382    }
383
384    /// Returns information about a specific job. Job information is available for a six month
385    /// period after creation. Requires that you're the person who ran the job, or have the Is
386    /// Owner project role.
387    /// # Arguments
388    /// * `project_id` - Project ID of the requested job.
389    /// * `job_id` - Job ID of the requested job.
390    /// * `location` - The geographic location of the job. Required except for US and EU. See
391    ///   details at https://cloud.google.com/bigquery/docs/locations#specifying_your_location.
392    pub async fn get_job(&self, project_id: &str, job_id: &str, location: Option<&str>) -> Result<Job, BQError> {
393        let req_url = format!(
394            "{base_url}/projects/{project_id}/jobs/{job_id}",
395            base_url = self.base_url,
396            project_id = urlencode(project_id),
397            job_id = urlencode(job_id),
398        );
399
400        let mut request_builder = self.client.get(req_url.as_str());
401
402        if let Some(location) = location {
403            request_builder = request_builder.query(&[("location", location)]);
404        }
405
406        let access_token = self.auth.access_token().await?;
407        let request = request_builder.bearer_auth(access_token).build()?;
408
409        let resp = self.client.execute(request).await?;
410
411        process_response(resp).await
412    }
413
414    /// Requests that a job be cancelled. This call will return immediately, and the client will
415    /// need to poll for the job status to see if the cancel completed successfully. Cancelled jobs
416    /// may still incur costs.
417    /// # Arguments
418    /// * `project_id` - Project ID of the job to cancel.
419    /// * `job_id` - Job ID of the job to cancel.
420    /// * `location` - The geographic location of the job. Required except for US and EU. See
421    ///   details at https://cloud.google.com/bigquery/docs/locations#specifying_your_location.
422    pub async fn cancel_job(
423        &self,
424        project_id: &str,
425        job_id: &str,
426        location: Option<&str>,
427    ) -> Result<JobCancelResponse, BQError> {
428        let req_url = format!(
429            "{base_url}/projects/{project_id}/jobs/{job_id}/cancel",
430            base_url = self.base_url,
431            project_id = urlencode(project_id),
432            job_id = urlencode(job_id),
433        );
434
435        let mut request_builder = self.client.post(req_url.as_str());
436
437        if let Some(location) = location {
438            request_builder = request_builder.query(&[("location", location)]);
439        }
440
441        let access_token = self.auth.access_token().await?;
442
443        let request = request_builder.bearer_auth(access_token).build()?;
444
445        let resp = self.client.execute(request).await?;
446
447        process_response(resp).await
448    }
449}
450
#[cfg(test)]
mod test {
    use serde::Serialize;
    use tokio_stream::StreamExt;

    use crate::error::BQError;
    use crate::model::dataset::Dataset;
    use crate::model::field_type::serialize_json_as_string;
    use crate::model::job_configuration_query::JobConfigurationQuery;
    use crate::model::job_reference::JobReference;
    use crate::model::query_parameter::QueryParameter;
    use crate::model::query_parameter_type::QueryParameterType;
    use crate::model::query_parameter_value::QueryParameterValue;
    use crate::model::query_request::QueryRequest;
    use crate::model::query_response::{QueryResponse, ResultSet};
    use crate::model::table::Table;
    use crate::model::table_data_insert_all_request::TableDataInsertAllRequest;
    use crate::model::table_field_schema::TableFieldSchema;
    use crate::model::table_schema::TableSchema;
    use crate::{env_vars, Client};

    // Row fixture matching the table schema created in `test` below.
    #[derive(Serialize)]
    struct MyRow {
        int_value: i64,
        float_value: f64,
        bool_value: bool,
        string_value: String,
        record_value: FirstRecordLevel,
        // Serialized as string but deserialized as serde json value.
        #[serde(serialize_with = "serialize_json_as_string")]
        json_value: serde_json::value::Value,
    }

    // Nested RECORD level used by `MyRow::record_value`.
    #[derive(Serialize)]
    struct FirstRecordLevel {
        int_value: i64,
        string_value: String,
        record_value: SecondRecordLevel,
    }

    // Innermost RECORD level.
    #[derive(Serialize)]
    struct SecondRecordLevel {
        int_value: i64,
        string_value: String,
    }

    // End-to-end integration test: creates a dataset and table, inserts rows,
    // then exercises query, get_job, get_query_results and the three query_all
    // variants before cleaning up. NOTE(review): runs against live BigQuery
    // using credentials/ids from `env_vars()` — not a hermetic unit test.
    #[tokio::test]
    async fn test() -> Result<(), BQError> {
        let (ref project_id, ref dataset_id, ref table_id, ref sa_key) = env_vars();
        let dataset_id = &format!("{dataset_id}_job");

        let client = Client::from_service_account_key_file(sa_key).await?;

        // Best-effort cleanup from any previous run; results deliberately ignored.
        client.table().delete_if_exists(project_id, dataset_id, table_id).await;
        client.dataset().delete_if_exists(project_id, dataset_id, true).await;

        // Create dataset
        let created_dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
        assert_eq!(created_dataset.id, Some(format!("{project_id}:{dataset_id}")));

        // Create table
        let table = Table::new(
            project_id,
            dataset_id,
            table_id,
            TableSchema::new(vec![
                TableFieldSchema::integer("int_value"),
                TableFieldSchema::float("float_value"),
                TableFieldSchema::bool("bool_value"),
                TableFieldSchema::string("string_value"),
                TableFieldSchema::record(
                    "record_value",
                    vec![
                        TableFieldSchema::integer("int_value"),
                        TableFieldSchema::string("string_value"),
                        TableFieldSchema::record(
                            "record_value",
                            vec![
                                TableFieldSchema::integer("int_value"),
                                TableFieldSchema::string("string_value"),
                            ],
                        ),
                    ],
                ),
                TableFieldSchema::json("json_value"),
            ]),
        );

        let created_table = client.table().create(table).await?;
        assert_eq!(created_table.table_reference.table_id, table_id.to_string());

        // Insert data: four rows with distinct json_value payloads, referenced
        // again by the parameterized query assertion at the end of the test.
        let mut insert_request = TableDataInsertAllRequest::new();
        insert_request.add_row(
            None,
            MyRow {
                int_value: 1,
                float_value: 1.0,
                bool_value: false,
                string_value: "first".into(),
                record_value: FirstRecordLevel {
                    int_value: 10,
                    string_value: "sub_level_1.1".into(),
                    record_value: SecondRecordLevel {
                        int_value: 20,
                        string_value: "leaf".to_string(),
                    },
                },
                json_value: serde_json::from_str("{\"a\":2,\"b\":\"hello\"}")?,
            },
        )?;
        insert_request.add_row(
            None,
            MyRow {
                int_value: 2,
                float_value: 2.0,
                bool_value: true,
                string_value: "second".into(),
                record_value: FirstRecordLevel {
                    int_value: 11,
                    string_value: "sub_level_1.2".into(),
                    record_value: SecondRecordLevel {
                        int_value: 21,
                        string_value: "leaf".to_string(),
                    },
                },
                json_value: serde_json::from_str("{\"a\":1,\"b\":\"goodbye\",\"c\":3}")?,
            },
        )?;
        insert_request.add_row(
            None,
            MyRow {
                int_value: 3,
                float_value: 3.0,
                bool_value: false,
                string_value: "third".into(),
                record_value: FirstRecordLevel {
                    int_value: 12,
                    string_value: "sub_level_1.3".into(),
                    record_value: SecondRecordLevel {
                        int_value: 22,
                        string_value: "leaf".to_string(),
                    },
                },
                json_value: serde_json::from_str("{\"b\":\"world\",\"c\":2}")?,
            },
        )?;
        insert_request.add_row(
            None,
            MyRow {
                int_value: 4,
                float_value: 4.0,
                bool_value: true,
                string_value: "fourth".into(),
                record_value: FirstRecordLevel {
                    int_value: 13,
                    string_value: "sub_level_1.4".into(),
                    record_value: SecondRecordLevel {
                        int_value: 23,
                        string_value: "leaf".to_string(),
                    },
                },
                json_value: serde_json::from_str("{\"a\":3,\"c\":1}")?,
            },
        )?;

        let n_rows = insert_request.len();

        let result = client
            .tabledata()
            .insert_all(project_id, dataset_id, table_id, insert_request)
            .await;

        assert!(result.is_ok(), "{:?}", result);
        let result = result.unwrap();
        assert!(result.insert_errors.is_none(), "{:?}", result);

        // Query
        let query_response = client
            .job()
            .query(
                project_id,
                QueryRequest::new(format!(
                    "SELECT COUNT(*) AS c FROM `{project_id}.{dataset_id}.{table_id}`"
                )),
            )
            .await?;

        // Get job id
        let job_id = query_response
            .job_reference
            .as_ref()
            .expect("expected job_reference")
            .job_id
            .clone()
            .expect("expected job_id");

        let mut rs = ResultSet::new_from_query_response(query_response);

        while rs.next_row() {
            assert!(rs.get_i64_by_name("c")?.is_some());
        }

        // The job that served the COUNT query above should already be done.
        let job = client.job_api.get_job(project_id, &job_id, None).await?;
        assert_eq!(job.status.unwrap().state.unwrap(), "DONE");

        // GetQueryResults: re-fetching by job id must match the original result set.
        let query_results = client
            .job()
            .get_query_results(project_id, &job_id, Default::default())
            .await?;
        let mut query_results_rs = ResultSet::new_from_query_response(QueryResponse::from(query_results));
        assert_eq!(query_results_rs.row_count(), rs.row_count());
        while query_results_rs.next_row() {
            assert!(rs.get_i64_by_name("c")?.is_some());
        }

        // Query all (page_size 2 forces pagination across the 4 inserted rows)
        let query_all_results: Result<Vec<_>, _> = client
            .job()
            .query_all(
                project_id,
                JobConfigurationQuery {
                    query: format!("SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"),
                    query_parameters: None,
                    use_legacy_sql: Some(false),
                    ..Default::default()
                },
                Some(2),
            )
            .collect::<Result<Vec<_>, _>>()
            .await
            .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());

        assert!(query_all_results.is_ok());
        assert_eq!(query_all_results.unwrap().len(), n_rows);

        // Query all with location
        let location = "us";
        let query_all_results_with_location: Result<Vec<_>, _> = client
            .job()
            .query_all_with_location(
                project_id,
                location,
                JobConfigurationQuery {
                    query: format!("SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"),
                    query_parameters: None,
                    use_legacy_sql: Some(false),
                    ..Default::default()
                },
                Some(2),
            )
            .collect::<Result<Vec<_>, _>>()
            .await
            .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());

        assert!(query_all_results_with_location.is_ok());
        assert_eq!(query_all_results_with_location.unwrap().len(), n_rows);

        // Query all with JobReference
        let job_reference = JobReference {
            project_id: Some(project_id.to_string()),
            location: Some(location.to_string()),
            ..Default::default()
        };
        let query_all_results_with_job_reference: Result<Vec<_>, _> = client
            .job()
            .query_all_with_job_reference(
                project_id,
                job_reference,
                JobConfigurationQuery {
                    query: format!("SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"),
                    query_parameters: None,
                    use_legacy_sql: Some(false),
                    ..Default::default()
                },
                Some(2),
            )
            .collect::<Result<Vec<_>, _>>()
            .await
            .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());

        assert!(query_all_results_with_job_reference.is_ok());
        assert_eq!(query_all_results_with_job_reference.unwrap().len(), n_rows);

        // Query all with json parameter
        let query_all_results_with_parameter: Result<Vec<_>, _> = client
            .job()
            .query_all(
                project_id,
                JobConfigurationQuery {
                    query: format!("SELECT int_value, json_value.a, json_value.b FROM `{project_id}.{dataset_id}.{table_id}` where CAST(JSON_VALUE(json_value,'$.a') as int) >= @compare"),
                    query_parameters: Some(vec![QueryParameter {
                        name: Some("compare".to_string()),
                        parameter_type: Some(QueryParameterType { array_type: None, struct_types: None, r#type: "INTEGER".to_string() }),
                        parameter_value: Some(QueryParameterValue { array_values: None, struct_values: None, value: Some("2".to_string()) }),
                    }]),
                    use_legacy_sql: Some(false),
                    ..Default::default()
                },
                Some(2),
            )
            .collect::<Result<Vec<_>, _>>()
            .await
            .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());

        assert!(query_all_results_with_parameter.is_ok());
        // 2 rows match the query: {"a":2,"b":"hello"} and {"a":3,"c":1}
        assert_eq!(query_all_results_with_parameter.unwrap().len(), 2);

        // Delete table
        client.table().delete(project_id, dataset_id, table_id).await?;

        // Delete dataset
        client.dataset().delete(project_id, dataset_id, true).await?;

        Ok(())
    }
}