1use std::sync::Arc;
3
4use async_stream::stream;
5use reqwest::Client;
6use tokio_stream::Stream;
7
8use crate::auth::Authenticator;
9use crate::error::BQError;
10use crate::model::get_query_results_parameters::GetQueryResultsParameters;
11use crate::model::get_query_results_response::GetQueryResultsResponse;
12use crate::model::job::Job;
13use crate::model::job_cancel_response::JobCancelResponse;
14use crate::model::job_configuration::JobConfiguration;
15use crate::model::job_configuration_query::JobConfigurationQuery;
16use crate::model::job_list::JobList;
17use crate::model::job_list_parameters::JobListParameters;
18use crate::model::job_reference::JobReference;
19use crate::model::query_request::QueryRequest;
20use crate::model::query_response::QueryResponse;
21use crate::model::table_row::TableRow;
22use crate::{process_response, urlencode, BIG_QUERY_V2_URL};
23
24#[derive(Clone)]
26pub struct JobApi {
27 client: Client,
28 auth: Arc<dyn Authenticator>,
29 base_url: String,
30}
31
32impl JobApi {
33 pub(crate) fn new(client: Client, auth: Arc<dyn Authenticator>) -> Self {
34 Self {
35 client,
36 auth,
37 base_url: BIG_QUERY_V2_URL.to_string(),
38 }
39 }
40
41 pub(crate) fn with_base_url(&mut self, base_url: String) -> &mut Self {
42 self.base_url = base_url;
43 self
44 }
45
46 pub async fn query(&self, project_id: &str, query_request: QueryRequest) -> Result<QueryResponse, BQError> {
52 let req_url = format!(
53 "{base_url}/projects/{project_id}/queries",
54 base_url = self.base_url,
55 project_id = urlencode(project_id)
56 );
57
58 let access_token = self.auth.access_token().await?;
59
60 let request = self
61 .client
62 .post(req_url.as_str())
63 .bearer_auth(access_token)
64 .json(&query_request)
65 .build()?;
66
67 let resp = self.client.execute(request).await?;
68
69 let query_response: QueryResponse = process_response(resp).await?;
70 Ok(query_response)
71 }
72
73 pub fn query_all<'a>(
80 &'a self,
81 project_id: &'a str,
82 query: JobConfigurationQuery,
83 page_size: Option<i32>,
84 ) -> impl Stream<Item = Result<Vec<TableRow>, BQError>> + 'a {
85 stream! {
86 let job = Job {
87 configuration: Some(JobConfiguration {
88 dry_run: Some(false),
89 query: Some(query),
90 ..Default::default()
91 }),
92 ..Default::default()
93 };
94
95 let job = self.insert(project_id, job).await?;
96
97 if let Some(ref job_id) = job.job_reference.and_then(|r| r.job_id) {
98 let mut page_token: Option<String> = None;
99 loop {
100 let qr = self
101 .get_query_results(
102 project_id,
103 job_id,
104 GetQueryResultsParameters {
105 page_token: page_token.clone(),
106 max_results: page_size,
107 ..Default::default()
108 },
109 )
110 .await?;
111
112 if !qr.job_complete.unwrap_or(false) {
114 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
115 continue;
116 }
117
118 yield Ok(qr.rows.unwrap_or_default());
121
122 page_token = match qr.page_token {
123 None => break,
124 f => f,
125 };
126 }
127 }
128 }
129 }
130
131 pub fn query_all_with_location<'a>(
140 &'a self,
141 project_id: &'a str,
142 location: &'a str,
143 query: JobConfigurationQuery,
144 page_size: Option<i32>,
145 ) -> impl Stream<Item = Result<Vec<TableRow>, BQError>> + 'a {
146 stream! {
147 let job = Job {
148 configuration: Some(JobConfiguration {
149 dry_run: Some(false),
150 query: Some(query),
151 ..Default::default()
152 }),
153 job_reference: Some(JobReference {
154 location: Some(location.to_string()),
155 project_id: Some(project_id.to_string()),
156 ..Default::default()
157 }),
158 ..Default::default()
159 };
160
161 let job = self.insert(project_id, job).await?;
162
163 if let Some(ref job_id) = job.job_reference.and_then(|r| r.job_id) {
164 let mut page_token: Option<String> = None;
165 loop {
166 let qr = self
167 .get_query_results(
168 project_id,
169 job_id,
170 GetQueryResultsParameters {
171 page_token: page_token.clone(),
172 max_results: page_size,
173 location: Some(location.to_string()),
174 ..Default::default()
175 },
176 )
177 .await?;
178
179 if !qr.job_complete.unwrap_or(false) {
181 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
182
183 continue;
184 }
185
186
187 yield Ok(qr.rows.unwrap_or_default());
190
191 page_token = match qr.page_token {
192 None => break,
193 f => f,
194 };
195 }
196 }
197 }
198 }
199
200 pub fn query_all_with_job_reference<'a>(
209 &'a self,
210 project_id: &'a str,
211 job_reference: JobReference,
212 query: JobConfigurationQuery,
213 page_size: Option<i32>,
214 ) -> impl Stream<Item = Result<Vec<TableRow>, BQError>> + 'a {
215 stream! {
216 let location = job_reference.location.as_ref().cloned();
217
218 let job = Job {
219 configuration: Some(JobConfiguration {
220 dry_run: Some(false),
221 query: Some(query),
222 ..Default::default()
223 }),
224 job_reference: Some(job_reference),
225 ..Default::default()
226 };
227
228 let job = self.insert(project_id, job).await?;
229
230 if let Some(ref job_id) = job.job_reference.and_then(|r| r.job_id) {
231 let mut page_token: Option<String> = None;
232 loop {
233 let gqrp = GetQueryResultsParameters {
234 page_token,
235 max_results: page_size,
236 location: location.clone(),
237 ..Default::default()
238 };
239 let qr = self
240 .get_query_results(
241 project_id,
242 job_id,
243 gqrp,
244 )
245 .await?;
246
247 yield Ok(qr.rows.expect("Rows are not present"));
249
250 page_token = match qr.page_token {
251 None => break,
252 f => f,
253 };
254 }
255 }
256 }
257 }
258
259 pub async fn insert(&self, project_id: &str, job: Job) -> Result<Job, BQError> {
264 let req_url = format!(
265 "{base_url}/projects/{project_id}/jobs",
266 base_url = self.base_url,
267 project_id = urlencode(project_id)
268 );
269
270 let access_token = self.auth.access_token().await?;
271
272 let request = self
273 .client
274 .post(req_url.as_str())
275 .bearer_auth(access_token)
276 .json(&job)
277 .build()?;
278
279 let resp = self.client.execute(request).await?;
280
281 process_response(resp).await
282 }
283
284 pub async fn list(&self, project_id: &str) -> Result<JobList, BQError> {
290 let req_url = format!(
291 "{base_url}/projects/{project_id}/jobs",
292 base_url = self.base_url,
293 project_id = urlencode(project_id)
294 );
295
296 let access_token = self.auth.access_token().await?;
297
298 let request = self.client.get(req_url.as_str()).bearer_auth(access_token).build()?;
299
300 let resp = self.client.execute(request).await?;
301
302 process_response(resp).await
303 }
304
305 pub fn get_job_list<'a>(
313 &'a self,
314 project_id: &'a str,
315 parameters: Option<JobListParameters>,
316 ) -> impl Stream<Item = Result<JobList, BQError>> + 'a {
317 stream! {
318 let req_url = format!(
319 "{base_url}/projects/{project_id}/jobs",
320 base_url = self.base_url,
321 project_id = urlencode(project_id),
322 );
323
324 let mut params = parameters.unwrap_or_default();
325 params.page_token = None;
326
327 loop {
328 let mut request_builder = self.client.get(req_url.as_str());
329
330 request_builder = request_builder.query(¶ms);
331
332 let access_token = self.auth.access_token().await?;
333 let request = request_builder.bearer_auth(access_token).build()?;
334
335 let resp = self.client.execute(request).await?;
336
337 let process_resp: Result<JobList, BQError> = process_response(resp).await;
338
339 yield match process_resp {
340 Err(e) => {params.page_token=None; Err(e)},
341 Ok(job_list) => {params.page_token.clone_from(&job_list.next_page_token); Ok(job_list.clone())}
342 };
343
344 if params.page_token.is_none() {
345 break;
346 }
347 }
348 }
349 }
350
351 pub async fn get_query_results(
357 &self,
358 project_id: &str,
359 job_id: &str,
360 parameters: GetQueryResultsParameters,
361 ) -> Result<GetQueryResultsResponse, BQError> {
362 let req_url = format!(
363 "{base_url}/projects/{project_id}/queries/{job_id}",
364 base_url = self.base_url,
365 project_id = urlencode(project_id),
366 job_id = urlencode(job_id),
367 );
368
369 let access_token = self.auth.access_token().await?;
370
371 let request = self
372 .client
373 .get(req_url.as_str())
374 .query(¶meters)
375 .bearer_auth(access_token)
376 .build()?;
377
378 let resp = self.client.execute(request).await?;
379
380 let get_query_results_response: GetQueryResultsResponse = process_response(resp).await?;
381 Ok(get_query_results_response)
382 }
383
384 pub async fn get_job(&self, project_id: &str, job_id: &str, location: Option<&str>) -> Result<Job, BQError> {
393 let req_url = format!(
394 "{base_url}/projects/{project_id}/jobs/{job_id}",
395 base_url = self.base_url,
396 project_id = urlencode(project_id),
397 job_id = urlencode(job_id),
398 );
399
400 let mut request_builder = self.client.get(req_url.as_str());
401
402 if let Some(location) = location {
403 request_builder = request_builder.query(&[("location", location)]);
404 }
405
406 let access_token = self.auth.access_token().await?;
407 let request = request_builder.bearer_auth(access_token).build()?;
408
409 let resp = self.client.execute(request).await?;
410
411 process_response(resp).await
412 }
413
414 pub async fn cancel_job(
423 &self,
424 project_id: &str,
425 job_id: &str,
426 location: Option<&str>,
427 ) -> Result<JobCancelResponse, BQError> {
428 let req_url = format!(
429 "{base_url}/projects/{project_id}/jobs/{job_id}/cancel",
430 base_url = self.base_url,
431 project_id = urlencode(project_id),
432 job_id = urlencode(job_id),
433 );
434
435 let mut request_builder = self.client.post(req_url.as_str());
436
437 if let Some(location) = location {
438 request_builder = request_builder.query(&[("location", location)]);
439 }
440
441 let access_token = self.auth.access_token().await?;
442
443 let request = request_builder.bearer_auth(access_token).build()?;
444
445 let resp = self.client.execute(request).await?;
446
447 process_response(resp).await
448 }
449}
450
451#[cfg(test)]
452mod test {
453 use serde::Serialize;
454 use tokio_stream::StreamExt;
455
456 use crate::error::BQError;
457 use crate::model::dataset::Dataset;
458 use crate::model::field_type::serialize_json_as_string;
459 use crate::model::job_configuration_query::JobConfigurationQuery;
460 use crate::model::job_reference::JobReference;
461 use crate::model::query_parameter::QueryParameter;
462 use crate::model::query_parameter_type::QueryParameterType;
463 use crate::model::query_parameter_value::QueryParameterValue;
464 use crate::model::query_request::QueryRequest;
465 use crate::model::query_response::{QueryResponse, ResultSet};
466 use crate::model::table::Table;
467 use crate::model::table_data_insert_all_request::TableDataInsertAllRequest;
468 use crate::model::table_field_schema::TableFieldSchema;
469 use crate::model::table_schema::TableSchema;
470 use crate::{env_vars, Client};
471
472 #[derive(Serialize)]
473 struct MyRow {
474 int_value: i64,
475 float_value: f64,
476 bool_value: bool,
477 string_value: String,
478 record_value: FirstRecordLevel,
479 #[serde(serialize_with = "serialize_json_as_string")]
481 json_value: serde_json::value::Value,
482 }
483
484 #[derive(Serialize)]
485 struct FirstRecordLevel {
486 int_value: i64,
487 string_value: String,
488 record_value: SecondRecordLevel,
489 }
490
491 #[derive(Serialize)]
492 struct SecondRecordLevel {
493 int_value: i64,
494 string_value: String,
495 }
496
497 #[tokio::test]
498 async fn test() -> Result<(), BQError> {
499 let (ref project_id, ref dataset_id, ref table_id, ref sa_key) = env_vars();
500 let dataset_id = &format!("{dataset_id}_job");
501
502 let client = Client::from_service_account_key_file(sa_key).await?;
503
504 client.table().delete_if_exists(project_id, dataset_id, table_id).await;
505 client.dataset().delete_if_exists(project_id, dataset_id, true).await;
506
507 let created_dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
509 assert_eq!(created_dataset.id, Some(format!("{project_id}:{dataset_id}")));
510
511 let table = Table::new(
513 project_id,
514 dataset_id,
515 table_id,
516 TableSchema::new(vec![
517 TableFieldSchema::integer("int_value"),
518 TableFieldSchema::float("float_value"),
519 TableFieldSchema::bool("bool_value"),
520 TableFieldSchema::string("string_value"),
521 TableFieldSchema::record(
522 "record_value",
523 vec![
524 TableFieldSchema::integer("int_value"),
525 TableFieldSchema::string("string_value"),
526 TableFieldSchema::record(
527 "record_value",
528 vec![
529 TableFieldSchema::integer("int_value"),
530 TableFieldSchema::string("string_value"),
531 ],
532 ),
533 ],
534 ),
535 TableFieldSchema::json("json_value"),
536 ]),
537 );
538
539 let created_table = client.table().create(table).await?;
540 assert_eq!(created_table.table_reference.table_id, table_id.to_string());
541
542 let mut insert_request = TableDataInsertAllRequest::new();
544 insert_request.add_row(
545 None,
546 MyRow {
547 int_value: 1,
548 float_value: 1.0,
549 bool_value: false,
550 string_value: "first".into(),
551 record_value: FirstRecordLevel {
552 int_value: 10,
553 string_value: "sub_level_1.1".into(),
554 record_value: SecondRecordLevel {
555 int_value: 20,
556 string_value: "leaf".to_string(),
557 },
558 },
559 json_value: serde_json::from_str("{\"a\":2,\"b\":\"hello\"}")?,
560 },
561 )?;
562 insert_request.add_row(
563 None,
564 MyRow {
565 int_value: 2,
566 float_value: 2.0,
567 bool_value: true,
568 string_value: "second".into(),
569 record_value: FirstRecordLevel {
570 int_value: 11,
571 string_value: "sub_level_1.2".into(),
572 record_value: SecondRecordLevel {
573 int_value: 21,
574 string_value: "leaf".to_string(),
575 },
576 },
577 json_value: serde_json::from_str("{\"a\":1,\"b\":\"goodbye\",\"c\":3}")?,
578 },
579 )?;
580 insert_request.add_row(
581 None,
582 MyRow {
583 int_value: 3,
584 float_value: 3.0,
585 bool_value: false,
586 string_value: "third".into(),
587 record_value: FirstRecordLevel {
588 int_value: 12,
589 string_value: "sub_level_1.3".into(),
590 record_value: SecondRecordLevel {
591 int_value: 22,
592 string_value: "leaf".to_string(),
593 },
594 },
595 json_value: serde_json::from_str("{\"b\":\"world\",\"c\":2}")?,
596 },
597 )?;
598 insert_request.add_row(
599 None,
600 MyRow {
601 int_value: 4,
602 float_value: 4.0,
603 bool_value: true,
604 string_value: "fourth".into(),
605 record_value: FirstRecordLevel {
606 int_value: 13,
607 string_value: "sub_level_1.4".into(),
608 record_value: SecondRecordLevel {
609 int_value: 23,
610 string_value: "leaf".to_string(),
611 },
612 },
613 json_value: serde_json::from_str("{\"a\":3,\"c\":1}")?,
614 },
615 )?;
616
617 let n_rows = insert_request.len();
618
619 let result = client
620 .tabledata()
621 .insert_all(project_id, dataset_id, table_id, insert_request)
622 .await;
623
624 assert!(result.is_ok(), "{:?}", result);
625 let result = result.unwrap();
626 assert!(result.insert_errors.is_none(), "{:?}", result);
627
628 let query_response = client
630 .job()
631 .query(
632 project_id,
633 QueryRequest::new(format!(
634 "SELECT COUNT(*) AS c FROM `{project_id}.{dataset_id}.{table_id}`"
635 )),
636 )
637 .await?;
638
639 let job_id = query_response
641 .job_reference
642 .as_ref()
643 .expect("expected job_reference")
644 .job_id
645 .clone()
646 .expect("expected job_id");
647
648 let mut rs = ResultSet::new_from_query_response(query_response);
649
650 while rs.next_row() {
651 assert!(rs.get_i64_by_name("c")?.is_some());
652 }
653
654 let job = client.job_api.get_job(project_id, &job_id, None).await?;
655 assert_eq!(job.status.unwrap().state.unwrap(), "DONE");
656
657 let query_results = client
659 .job()
660 .get_query_results(project_id, &job_id, Default::default())
661 .await?;
662 let mut query_results_rs = ResultSet::new_from_query_response(QueryResponse::from(query_results));
663 assert_eq!(query_results_rs.row_count(), rs.row_count());
664 while query_results_rs.next_row() {
665 assert!(rs.get_i64_by_name("c")?.is_some());
666 }
667
668 let query_all_results: Result<Vec<_>, _> = client
670 .job()
671 .query_all(
672 project_id,
673 JobConfigurationQuery {
674 query: format!("SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"),
675 query_parameters: None,
676 use_legacy_sql: Some(false),
677 ..Default::default()
678 },
679 Some(2),
680 )
681 .collect::<Result<Vec<_>, _>>()
682 .await
683 .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());
684
685 assert!(query_all_results.is_ok());
686 assert_eq!(query_all_results.unwrap().len(), n_rows);
687
688 let location = "us";
690 let query_all_results_with_location: Result<Vec<_>, _> = client
691 .job()
692 .query_all_with_location(
693 project_id,
694 location,
695 JobConfigurationQuery {
696 query: format!("SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"),
697 query_parameters: None,
698 use_legacy_sql: Some(false),
699 ..Default::default()
700 },
701 Some(2),
702 )
703 .collect::<Result<Vec<_>, _>>()
704 .await
705 .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());
706
707 assert!(query_all_results_with_location.is_ok());
708 assert_eq!(query_all_results_with_location.unwrap().len(), n_rows);
709
710 let job_reference = JobReference {
712 project_id: Some(project_id.to_string()),
713 location: Some(location.to_string()),
714 ..Default::default()
715 };
716 let query_all_results_with_job_reference: Result<Vec<_>, _> = client
717 .job()
718 .query_all_with_job_reference(
719 project_id,
720 job_reference,
721 JobConfigurationQuery {
722 query: format!("SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"),
723 query_parameters: None,
724 use_legacy_sql: Some(false),
725 ..Default::default()
726 },
727 Some(2),
728 )
729 .collect::<Result<Vec<_>, _>>()
730 .await
731 .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());
732
733 assert!(query_all_results_with_job_reference.is_ok());
734 assert_eq!(query_all_results_with_job_reference.unwrap().len(), n_rows);
735
736 let query_all_results_with_parameter: Result<Vec<_>, _> = client
738 .job()
739 .query_all(
740 project_id,
741 JobConfigurationQuery {
742 query: format!("SELECT int_value, json_value.a, json_value.b FROM `{project_id}.{dataset_id}.{table_id}` where CAST(JSON_VALUE(json_value,'$.a') as int) >= @compare"),
743 query_parameters: Some(vec![QueryParameter {
744 name: Some("compare".to_string()),
745 parameter_type: Some(QueryParameterType { array_type: None, struct_types: None, r#type: "INTEGER".to_string() }),
746 parameter_value: Some(QueryParameterValue { array_values: None, struct_values: None, value: Some("2".to_string()) }),
747 }]),
748 use_legacy_sql: Some(false),
749 ..Default::default()
750 },
751 Some(2),
752 )
753 .collect::<Result<Vec<_>, _>>()
754 .await
755 .map(|vec_of_vecs| vec_of_vecs.into_iter().flatten().collect());
756
757 assert!(query_all_results_with_parameter.is_ok());
758 assert_eq!(query_all_results_with_parameter.unwrap().len(), 2);
760
761 client.table().delete(project_id, dataset_id, table_id).await?;
763
764 client.dataset().delete(project_id, dataset_id, true).await?;
766
767 Ok(())
768 }
769}