google_cloud_bigquery/http/bigquery_job_client.rs

use std::sync::Arc;

use crate::http::bigquery_client::BigqueryClient;
use crate::http::error::Error;
use crate::http::job;
use crate::http::job::cancel::{CancelJobRequest, CancelJobResponse};
use crate::http::job::get::GetJobRequest;
use crate::http::job::get_query_results::{GetQueryResultsRequest, GetQueryResultsResponse};
use crate::http::job::list::{JobOverview, ListJobsRequest, ListJobsResponse};
use crate::http::job::query::{QueryRequest, QueryResponse};
use crate::http::job::Job;

#[derive(Debug, Clone)]
pub struct BigqueryJobClient {
    inner: Arc<BigqueryClient>,
}

impl BigqueryJobClient {
    pub fn new(inner: Arc<BigqueryClient>) -> Self {
        Self { inner }
    }

    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
    /// ```rust
    /// use google_cloud_bigquery::http::bigquery_job_client::BigqueryJobClient;
    /// use google_cloud_bigquery::http::job::{CreateDisposition, Job, JobConfiguration, JobConfigurationSourceTable, JobConfigurationTableCopy, JobReference, JobState, JobType, OperationType, WriteDisposition};
    /// use google_cloud_bigquery::http::table::TableReference;
    ///
    /// async fn run(client: BigqueryJobClient) {
    ///     let job = Job {
    ///         job_reference: JobReference {
    ///             project_id: "project".to_string(),
    ///             job_id: "job".to_string(),
    ///             location: Some("asia-northeast1".to_string()),
    ///         },
    ///         configuration: JobConfiguration {
    ///             job: JobType::Copy(JobConfigurationTableCopy {
    ///                 source_table: JobConfigurationSourceTable::SourceTable(TableReference {
    ///                     project_id: "project".to_string(),
    ///                     dataset_id: "dataset".to_string(),
    ///                     table_id: "source_table".to_string(),
    ///                 }),
    ///                 destination_table: TableReference {
    ///                     project_id: "project".to_string(),
    ///                     dataset_id: "dataset".to_string(),
    ///                     table_id: "destination_table".to_string(),
    ///                 },
    ///                 create_disposition: Some(CreateDisposition::CreateIfNeeded),
    ///                 write_disposition: Some(WriteDisposition::WriteTruncate),
    ///                 operation_type: Some(OperationType::Copy),
    ///                 ..Default::default()
    ///             }),
    ///             ..Default::default()
    ///         },
    ///         ..Default::default()
    ///     };
    ///     let created = client.create(&job).await.unwrap();
    ///     assert!(created.status.errors.is_none());
    ///     assert!(created.status.error_result.is_none());
    ///     assert!(created.status.state == JobState::Running || created.status.state == JobState::Done);
    /// }
    /// ```
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn create(&self, metadata: &Job) -> Result<Job, Error> {
        let builder = job::insert::build(self.inner.endpoint(), self.inner.http(), metadata);
        self.inner.send(builder).await
    }

    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/delete
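    ///
    /// A minimal usage sketch; "project" and "job" below are placeholder ids.
    /// ```rust
    /// use google_cloud_bigquery::http::bigquery_job_client::BigqueryJobClient;
    ///
    /// async fn run(client: BigqueryJobClient) {
    ///     client.delete("project", "job").await.unwrap();
    /// }
    /// ```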
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn delete(&self, project_id: &str, job_id: &str) -> Result<(), Error> {
        let builder = job::delete::build(self.inner.endpoint(), self.inner.http(), project_id, job_id);
        self.inner.send_get_empty(builder).await
    }

    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
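    ///
    /// A minimal usage sketch; "project" and "job" are placeholder ids, and the
    /// request is assumed to carry default (empty) options.
    /// ```rust
    /// use google_cloud_bigquery::http::bigquery_job_client::BigqueryJobClient;
    /// use google_cloud_bigquery::http::job::get::GetJobRequest;
    ///
    /// async fn run(client: BigqueryJobClient) {
    ///     let _job = client.get("project", "job", &GetJobRequest::default()).await.unwrap();
    /// }
    /// ```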
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn get(&self, project_id: &str, job_id: &str, data: &GetJobRequest) -> Result<Job, Error> {
        let builder = job::get::build(self.inner.endpoint(), self.inner.http(), project_id, job_id, data);
        self.inner.send(builder).await
    }

    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel
    /// ```rust
    /// use google_cloud_bigquery::http::bigquery_job_client::BigqueryJobClient;
    /// use google_cloud_bigquery::http::job::cancel::CancelJobRequest;
    /// use google_cloud_bigquery::http::job::{JobReference, JobState};
    ///
    /// async fn run(client: BigqueryJobClient, job: JobReference) {
    ///     let request = CancelJobRequest {
    ///         location: job.location,
    ///     };
    ///     let cancelled = client.cancel(&job.project_id, &job.job_id, &request).await.unwrap();
    ///     assert!(cancelled.job.status.state == JobState::Running || cancelled.job.status.state == JobState::Done);
    /// }
    /// ```
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn cancel(
        &self,
        project_id: &str,
        job_id: &str,
        data: &CancelJobRequest,
    ) -> Result<CancelJobResponse, Error> {
        let builder = job::cancel::build(self.inner.endpoint(), self.inner.http(), project_id, job_id, data);
        self.inner.send(builder).await
    }

    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
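    ///
    /// A minimal usage sketch; "project" is a placeholder project id.
    /// ```rust
    /// use google_cloud_bigquery::http::bigquery_job_client::BigqueryJobClient;
    /// use google_cloud_bigquery::http::job::query::QueryRequest;
    ///
    /// async fn run(client: BigqueryJobClient) {
    ///     let request = QueryRequest {
    ///         query: "SELECT 1".to_string(),
    ///         ..Default::default()
    ///     };
    ///     let result = client.query("project", &request).await.unwrap();
    ///     assert!(result.job_complete);
    /// }
    /// ```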
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn query(&self, project_id: &str, data: &QueryRequest) -> Result<QueryResponse, Error> {
        let builder = job::query::build(self.inner.endpoint(), self.inner.http(), project_id, data);
        self.inner.send(builder).await
    }

    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults
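    ///
    /// A sketch of draining the remaining pages of a completed query, modeled
    /// on the pagination loop in this module's tests.
    /// ```rust
    /// use google_cloud_bigquery::http::bigquery_job_client::BigqueryJobClient;
    /// use google_cloud_bigquery::http::job::get_query_results::GetQueryResultsRequest;
    /// use google_cloud_bigquery::http::job::query::QueryResponse;
    ///
    /// async fn run(client: BigqueryJobClient, result: QueryResponse) {
    ///     let mut page_token = result.page_token;
    ///     loop {
    ///         let page = client
    ///             .get_query_results(
    ///                 result.job_reference.project_id.as_str(),
    ///                 result.job_reference.job_id.as_str(),
    ///                 &GetQueryResultsRequest {
    ///                     page_token,
    ///                     location: result.job_reference.location.clone(),
    ///                     ..Default::default()
    ///                 },
    ///             )
    ///             .await
    ///             .unwrap();
    ///         if page.page_token.is_none() {
    ///             break;
    ///         }
    ///         page_token = page.page_token;
    ///     }
    /// }
    /// ```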
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn get_query_results(
        &self,
        project_id: &str,
        job_id: &str,
        data: &GetQueryResultsRequest,
    ) -> Result<GetQueryResultsResponse, Error> {
        let builder = job::get_query_results::build(self.inner.endpoint(), self.inner.http(), project_id, job_id, data);
        self.inner.send(builder).await
    }

    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/list
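    ///
    /// A minimal usage sketch; "project" is a placeholder id, and the default
    /// request is assumed to apply no filters. Pagination is handled internally.
    /// ```rust
    /// use google_cloud_bigquery::http::bigquery_job_client::BigqueryJobClient;
    /// use google_cloud_bigquery::http::job::list::ListJobsRequest;
    ///
    /// async fn run(client: BigqueryJobClient) {
    ///     let _jobs = client.list("project", &ListJobsRequest::default()).await.unwrap();
    /// }
    /// ```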
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn list(&self, project_id: &str, req: &ListJobsRequest) -> Result<Vec<JobOverview>, Error> {
        let mut page_token: Option<String> = None;
        let mut jobs = vec![];
        loop {
            let builder = job::list::build(self.inner.endpoint(), self.inner.http(), project_id, req, page_token);
            let response: ListJobsResponse = self.inner.send(builder).await?;
            jobs.extend(response.jobs);
            if response.next_page_token.is_none() {
                break;
            }
            page_token = response.next_page_token;
        }
        Ok(jobs)
    }
}

#[cfg(test)]
mod test {
    use core::default::Default;
    use std::sync::Arc;

    use serial_test::serial;
    use time::OffsetDateTime;

    use crate::http::bigquery_client::test::{bucket_name, create_client, create_table_schema, dataset_name, TestData};
    use crate::http::bigquery_job_client::BigqueryJobClient;
    use crate::http::bigquery_table_client::BigqueryTableClient;
    use crate::http::bigquery_tabledata_client::BigqueryTabledataClient;
    use crate::http::job::cancel::CancelJobRequest;
    use crate::http::job::get_query_results::GetQueryResultsRequest;
    use crate::http::job::query::QueryRequest;
    use crate::http::job::{
        CreateDisposition, Job, JobConfiguration, JobConfigurationExtract, JobConfigurationExtractSource,
        JobConfigurationLoad, JobConfigurationQuery, JobConfigurationSourceTable, JobConfigurationTableCopy, JobState,
        JobType, OperationType, WriteDisposition,
    };
    use crate::http::table::{DestinationFormat, SourceFormat, Table, TableReference};
    use crate::http::tabledata::insert_all::{InsertAllRequest, Row};

    #[ctor::ctor]
    fn init() {
        let _ = tracing_subscriber::fmt::try_init();
    }

    #[tokio::test]
    #[serial]
    pub async fn create_job_error() {
        let (client, project) = create_client().await;
        let client = BigqueryJobClient::new(Arc::new(client));

        let mut job1 = Job::default();
        job1.job_reference.job_id = format!("test_{}", OffsetDateTime::now_utc().unix_timestamp());
        job1.job_reference.project_id = project.to_string();
        job1.job_reference.location = Some("asia-northeast1".to_string());
        job1.configuration = JobConfiguration {
            job: JobType::Query(JobConfigurationQuery {
                query: "SELECT 1 FROM invalid_table".to_string(),
                ..Default::default()
            }),
            ..Default::default()
        };
        let job1 = client.create(&job1).await.unwrap();
        assert!(job1.status.errors.is_some());
        assert!(job1.status.error_result.is_some());
        let error_result = job1.status.error_result.unwrap();
        assert_eq!(error_result.reason.unwrap().as_str(), "invalid");
        assert_eq!(error_result.location.unwrap().as_str(), "invalid_table");
        assert_eq!(job1.status.state, JobState::Done);
    }

    #[tokio::test]
    #[serial]
    pub async fn create_job() {
        let dataset = dataset_name("job");
        let (client, project) = create_client().await;
        let client = Arc::new(client);
        let client = BigqueryJobClient::new(client);

        // query job
        let mut job1 = Job::default();
        job1.job_reference.job_id = format!("test_query_{}", OffsetDateTime::now_utc().unix_timestamp());
        job1.job_reference.project_id = project.to_string();
        job1.job_reference.location = Some("us-central1".to_string());
        job1.configuration = JobConfiguration {
            job: JobType::Query(JobConfigurationQuery {
                use_legacy_sql: Some(false),
                query: "SELECT 1".to_string(),
                ..Default::default()
            }),
            ..Default::default()
        };
        let job1 = client.create(&job1).await.unwrap();
        assert!(job1.status.errors.is_none());
        assert!(job1.status.error_result.is_none());
        assert_eq!(job1.status.state, JobState::Done);
        assert_eq!(
            job1.statistics.unwrap().query.unwrap().statement_type.unwrap().as_str(),
            "SELECT"
        );

        let bucket_name = bucket_name(&project, "job");
        // load job
        let mut job1 = Job::default();
        job1.job_reference.job_id = format!("test_load_{}", OffsetDateTime::now_utc().unix_timestamp());
        job1.job_reference.project_id = project.to_string();
        job1.job_reference.location = Some("us-central1".to_string());
        job1.configuration = JobConfiguration {
            job: JobType::Load(JobConfigurationLoad {
                source_uris: vec![format!("gs://{}/external_data.csv", bucket_name)],
                source_format: Some(SourceFormat::Csv),
                field_delimiter: Some("|".to_string()),
                encoding: Some("UTF-8".to_string()),
                skip_leading_rows: Some(0),
                autodetect: Some(true),
                write_disposition: Some(WriteDisposition::WriteTruncate),
                destination_table: TableReference {
                    project_id: project.to_string(),
                    dataset_id: dataset.clone(),
                    table_id: "external_data".to_string(),
                },
                ..Default::default()
            }),
            ..Default::default()
        };
        let job1 = client.create(&job1).await.unwrap();
        assert!(job1.status.errors.is_none());
        assert!(job1.status.error_result.is_none());
        assert!(job1.status.state == JobState::Running || job1.status.state == JobState::Done);

        // copy job
        let mut job2 = Job::default();
        job2.job_reference.job_id = format!("test_copy_{}", OffsetDateTime::now_utc().unix_timestamp());
        job2.job_reference.project_id = project.to_string();
        job2.job_reference.location = Some("us-central1".to_string());
        job2.configuration = JobConfiguration {
            job: JobType::Copy(JobConfigurationTableCopy {
                source_table: JobConfigurationSourceTable::SourceTable(TableReference {
                    project_id: project.to_string(),
                    dataset_id: dataset.clone(),
                    table_id: "external_data".to_string(),
                }),
                destination_table: TableReference {
                    project_id: project.to_string(),
                    dataset_id: dataset.clone(),
                    table_id: "external_data_copy".to_string(),
                },
                create_disposition: Some(CreateDisposition::CreateIfNeeded),
                write_disposition: Some(WriteDisposition::WriteTruncate),
                operation_type: Some(OperationType::Copy),
                ..Default::default()
            }),
            ..Default::default()
        };
        let job2 = client.create(&job2).await.unwrap();
        assert!(job2.status.errors.is_none());
        assert!(job2.status.error_result.is_none());
        assert!(job2.status.state == JobState::Running || job2.status.state == JobState::Done);

        // extract table job
        let mut job3 = Job::default();
        job3.job_reference.job_id = format!("test_extract_{}", OffsetDateTime::now_utc().unix_timestamp());
        job3.job_reference.project_id = project.to_string();
        job3.job_reference.location = Some("us-central1".to_string());
        job3.configuration = JobConfiguration {
            job: JobType::Extract(JobConfigurationExtract {
                destination_uris: vec![format!("gs://{}/extracted_data.json", bucket_name)],
                destination_format: Some(DestinationFormat::NewlineDelimitedJson),
                source: JobConfigurationExtractSource::SourceTable(TableReference {
                    project_id: project.to_string(),
                    dataset_id: dataset.clone(),
                    table_id: "external_data_copy".to_string(),
                }),
                ..Default::default()
            }),
            ..Default::default()
        };
        let job3 = client.create(&job3).await.unwrap();
        assert!(job3.status.errors.is_none());
        assert!(job3.status.error_result.is_none());
        assert!(job3.status.state == JobState::Running || job3.status.state == JobState::Done);

        // cancel
        let cancelled = client
            .cancel(
                job3.job_reference.project_id.as_str(),
                job3.job_reference.job_id.as_str(),
                &CancelJobRequest {
                    location: job3.job_reference.location,
                },
            )
            .await
            .unwrap();
        assert!(cancelled.job.status.state == JobState::Running || cancelled.job.status.state == JobState::Done);
    }

    #[tokio::test]
    #[serial]
    pub async fn query() {
        let dataset = dataset_name("job_temp");
        let (client, project) = create_client().await;
        let client = Arc::new(client);
        let table_client = BigqueryTableClient::new(client.clone());
        let tabledata_client = BigqueryTabledataClient::new(client.clone());

        // insert test data
        let mut table1 = Table::default();
        table1.table_reference.dataset_id.clone_from(&dataset);
        table1.table_reference.project_id = project.to_string();
        table1.table_reference.table_id = format!("table_data_{}", OffsetDateTime::now_utc().unix_timestamp());
        table1.schema = Some(create_table_schema());
        let table1 = table_client.create(&table1).await.unwrap();
        let ref1 = table1.table_reference;

        // json value
        let mut req = InsertAllRequest::<TestData>::default();
        for i in 0..3 {
            req.rows.push(Row {
                insert_id: None,
                json: TestData::default(i, OffsetDateTime::now_utc()),
            });
        }
        let res = tabledata_client
            .insert(ref1.project_id.as_str(), ref1.dataset_id.as_str(), ref1.table_id.as_str(), &req)
            .await
            .unwrap();
        assert!(res.insert_errors.is_none());

        // query
        let client = BigqueryJobClient::new(client);
        let result = client
            .query(
                project.as_str(),
                &QueryRequest {
                    max_results: Some(2),
                    query: format!("SELECT * FROM {}.{}", ref1.dataset_id.as_str(), ref1.table_id.as_str()),
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        assert!(result.page_token.is_some());
        assert_eq!(result.rows.unwrap().len(), 2);
        assert_eq!(result.total_rows.unwrap(), 3);
        assert_eq!(result.total_bytes_processed.unwrap(), 0);
        assert!(result.job_complete);

        // query all results
        let mut page_token = result.page_token;
        let location = result.job_reference.location;
        loop {
            let query_results = client
                .get_query_results(
                    result.job_reference.project_id.as_str(),
                    result.job_reference.job_id.as_str(),
                    &GetQueryResultsRequest {
                        page_token,
                        location: location.clone(),
                        ..Default::default()
                    },
                )
                .await
                .unwrap();
            assert_eq!(query_results.rows.unwrap().len(), 1);
            assert_eq!(query_results.total_rows, 3);
            if query_results.page_token.is_none() {
                break;
            }
            page_token = query_results.page_token;
        }

        // dry run
        let result = client
            .query(
                project.as_str(),
                &QueryRequest {
                    dry_run: Some(true),
                    max_results: Some(10),
                    query: format!("SELECT * FROM {}.{}", ref1.dataset_id.as_str(), ref1.table_id.as_str()),
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        assert!(result.job_reference.job_id.is_empty());
        assert!(result.total_rows.is_none());
        assert_eq!(result.total_bytes_processed.unwrap(), 0);
        assert!(result.job_complete);

        table_client
            .delete(&ref1.project_id, &ref1.dataset_id, &ref1.table_id)
            .await
            .unwrap();
    }
}