1use std::sync::Arc;
2
3use crate::http::bigquery_client::BigqueryClient;
4use crate::http::error::Error;
5use crate::http::job;
6use crate::http::job::cancel::{CancelJobRequest, CancelJobResponse};
7use crate::http::job::get::GetJobRequest;
8use crate::http::job::get_query_results::{GetQueryResultsRequest, GetQueryResultsResponse};
9use crate::http::job::list::{JobOverview, ListJobsRequest, ListJobsResponse};
10use crate::http::job::query::{QueryRequest, QueryResponse};
11use crate::http::job::Job;
12
13#[derive(Debug, Clone)]
14pub struct BigqueryJobClient {
15 inner: Arc<BigqueryClient>,
16}
17
18impl BigqueryJobClient {
19 pub fn new(inner: Arc<BigqueryClient>) -> Self {
20 Self { inner }
21 }
22
23 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
65 pub async fn create(&self, metadata: &Job) -> Result<Job, Error> {
66 let builder = job::insert::build(self.inner.endpoint(), self.inner.http(), metadata);
67 self.inner.send(builder).await
68 }
69
70 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
72 pub async fn delete(&self, project_id: &str, job_id: &str) -> Result<(), Error> {
73 let builder = job::delete::build(self.inner.endpoint(), self.inner.http(), project_id, job_id);
74 self.inner.send_get_empty(builder).await
75 }
76
77 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
79 pub async fn get(&self, project_id: &str, job_id: &str, data: &GetJobRequest) -> Result<Job, Error> {
80 let builder = job::get::build(self.inner.endpoint(), self.inner.http(), project_id, job_id, data);
81 self.inner.send(builder).await
82 }
83
84 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
99 pub async fn cancel(
100 &self,
101 project_id: &str,
102 job_id: &str,
103 data: &CancelJobRequest,
104 ) -> Result<CancelJobResponse, Error> {
105 let builder = job::cancel::build(self.inner.endpoint(), self.inner.http(), project_id, job_id, data);
106 self.inner.send(builder).await
107 }
108
109 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
111 pub async fn query(&self, project_id: &str, data: &QueryRequest) -> Result<QueryResponse, Error> {
112 let builder = job::query::build(self.inner.endpoint(), self.inner.http(), project_id, data);
113 self.inner.send(builder).await
114 }
115
116 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
118 pub async fn get_query_results(
119 &self,
120 project_id: &str,
121 job_id: &str,
122 data: &GetQueryResultsRequest,
123 ) -> Result<GetQueryResultsResponse, Error> {
124 let builder = job::get_query_results::build(self.inner.endpoint(), self.inner.http(), project_id, job_id, data);
125 self.inner.send(builder).await
126 }
127
128 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
130 pub async fn list(&self, project_id: &str, req: &ListJobsRequest) -> Result<Vec<JobOverview>, Error> {
131 let mut page_token: Option<String> = None;
132 let mut jobs = vec![];
133 loop {
134 let builder = job::list::build(self.inner.endpoint(), self.inner.http(), project_id, req, page_token);
135 let response: ListJobsResponse = self.inner.send(builder).await?;
136 jobs.extend(response.jobs);
137 if response.next_page_token.is_none() {
138 break;
139 }
140 page_token = response.next_page_token;
141 }
142 Ok(jobs)
143 }
144}
145
146#[cfg(test)]
147mod test {
148 use core::default::Default;
149 use std::sync::Arc;
150
151 use serial_test::serial;
152 use time::OffsetDateTime;
153
154 use crate::http::bigquery_client::test::{bucket_name, create_client, create_table_schema, dataset_name, TestData};
155 use crate::http::bigquery_job_client::BigqueryJobClient;
156 use crate::http::bigquery_table_client::BigqueryTableClient;
157 use crate::http::bigquery_tabledata_client::BigqueryTabledataClient;
158 use crate::http::job::cancel::CancelJobRequest;
159
160 use crate::http::job::get_query_results::GetQueryResultsRequest;
161 use crate::http::job::query::QueryRequest;
162 use crate::http::job::{
163 CreateDisposition, Job, JobConfiguration, JobConfigurationExtract, JobConfigurationExtractSource,
164 JobConfigurationLoad, JobConfigurationQuery, JobConfigurationSourceTable, JobConfigurationTableCopy, JobState,
165 JobType, OperationType, WriteDisposition,
166 };
167
168 use crate::http::table::{DestinationFormat, SourceFormat, Table, TableReference};
169 use crate::http::tabledata::insert_all::{InsertAllRequest, Row};
170
171 #[ctor::ctor]
172 fn init() {
173 let _ = tracing_subscriber::fmt::try_init();
174 }
175
176 #[tokio::test]
177 #[serial]
178 pub async fn create_job_error() {
179 let (client, project) = create_client().await;
180 let client = BigqueryJobClient::new(Arc::new(client));
181
182 let mut job1 = Job::default();
183 job1.job_reference.job_id = format!("test_{}", OffsetDateTime::now_utc().unix_timestamp());
184 job1.job_reference.project_id = project.to_string();
185 job1.job_reference.location = Some("asia-northeast1".to_string());
186 job1.configuration = JobConfiguration {
187 job: JobType::Query(JobConfigurationQuery {
188 query: "SELECT 1 FROM invalid_table".to_string(),
189 ..Default::default()
190 }),
191 ..Default::default()
192 };
193 let job1 = client.create(&job1).await.unwrap();
194 assert!(job1.status.errors.is_some());
195 assert!(job1.status.error_result.is_some());
196 let error_result = job1.status.error_result.unwrap();
197 assert_eq!(error_result.reason.unwrap().as_str(), "invalid");
198 assert_eq!(error_result.location.unwrap().as_str(), "invalid_table");
199 assert_eq!(job1.status.state, JobState::Done);
200 }
201
202 #[tokio::test]
203 #[serial]
204 pub async fn create_job() {
205 let dataset = dataset_name("job");
206 let (client, project) = create_client().await;
207 let client = Arc::new(client);
208 let client = BigqueryJobClient::new(client);
209
210 let mut job1 = Job::default();
212 job1.job_reference.job_id = format!("test_query_{}", OffsetDateTime::now_utc().unix_timestamp());
213 job1.job_reference.project_id = project.to_string();
214 job1.job_reference.location = Some("us-central1".to_string());
215 job1.configuration = JobConfiguration {
216 job: JobType::Query(JobConfigurationQuery {
217 use_legacy_sql: Some(false),
218 query: "SELECT 1".to_string(),
219 ..Default::default()
220 }),
221 ..Default::default()
222 };
223 let job1 = client.create(&job1).await.unwrap();
224 assert!(job1.status.errors.is_none());
225 assert!(job1.status.error_result.is_none());
226 assert_eq!(job1.status.state, JobState::Done);
227 assert_eq!(
228 job1.statistics.unwrap().query.unwrap().statement_type.unwrap().as_str(),
229 "SELECT"
230 );
231
232 let bucket_name = bucket_name(&project, "job");
233 let mut job1 = Job::default();
235 job1.job_reference.job_id = format!("test_load_{}", OffsetDateTime::now_utc().unix_timestamp());
236 job1.job_reference.project_id = project.to_string();
237 job1.job_reference.location = Some("us-central1".to_string());
238 job1.configuration = JobConfiguration {
239 job: JobType::Load(JobConfigurationLoad {
240 source_uris: vec![format!("gs://{}/external_data.csv", bucket_name)],
241 source_format: Some(SourceFormat::Csv),
242 field_delimiter: Some("|".to_string()),
243 encoding: Some("UTF-8".to_string()),
244 skip_leading_rows: Some(0),
245 autodetect: Some(true),
246 write_disposition: Some(WriteDisposition::WriteTruncate),
247 destination_table: TableReference {
248 project_id: project.to_string(),
249 dataset_id: dataset.clone(),
250 table_id: "external_data".to_string(),
251 },
252 ..Default::default()
253 }),
254 ..Default::default()
255 };
256 let job1 = client.create(&job1).await.unwrap();
257 assert!(job1.status.errors.is_none());
258 assert!(job1.status.error_result.is_none());
259 assert!(job1.status.state == JobState::Running || job1.status.state == JobState::Done);
260
261 let mut job2 = Job::default();
263 job2.job_reference.job_id = format!("test_copy_{}", OffsetDateTime::now_utc().unix_timestamp());
264 job2.job_reference.project_id = project.to_string();
265 job2.job_reference.location = Some("us-central1".to_string());
266 job2.configuration = JobConfiguration {
267 job: JobType::Copy(JobConfigurationTableCopy {
268 source_table: JobConfigurationSourceTable::SourceTable(TableReference {
269 project_id: project.to_string(),
270 dataset_id: dataset.clone(),
271 table_id: "external_data".to_string(),
272 }),
273 destination_table: TableReference {
274 project_id: project.to_string(),
275 dataset_id: dataset.clone(),
276 table_id: "external_data_copy".to_string(),
277 },
278 create_disposition: Some(CreateDisposition::CreateIfNeeded),
279 write_disposition: Some(WriteDisposition::WriteTruncate),
280 operation_type: Some(OperationType::Copy),
281 ..Default::default()
282 }),
283 ..Default::default()
284 };
285 let job2 = client.create(&job2).await.unwrap();
286 assert!(job2.status.errors.is_none());
287 assert!(job2.status.error_result.is_none());
288 assert!(job2.status.state == JobState::Running || job2.status.state == JobState::Done);
289
290 let mut job3 = Job::default();
292 job3.job_reference.job_id = format!("test_extract_{}", OffsetDateTime::now_utc().unix_timestamp());
293 job3.job_reference.project_id = project.to_string();
294 job3.job_reference.location = Some("us-central1".to_string());
295 job3.configuration = JobConfiguration {
296 job: JobType::Extract(JobConfigurationExtract {
297 destination_uris: vec![format!("gs://{}/extracted_data.json", project)],
298 destination_format: Some(DestinationFormat::NewlineDelimitedJson),
299 source: JobConfigurationExtractSource::SourceTable(TableReference {
300 project_id: project.to_string(),
301 dataset_id: dataset.clone(),
302 table_id: "external_data_copy".to_string(),
303 }),
304 ..Default::default()
305 }),
306 ..Default::default()
307 };
308 let job3 = client.create(&job3).await.unwrap();
309 assert!(job3.status.errors.is_none());
310 assert!(job3.status.error_result.is_none());
311 assert!(job3.status.state == JobState::Running || job3.status.state == JobState::Done);
312
313 let cancelled = client
315 .cancel(
316 job3.job_reference.project_id.as_str(),
317 job3.job_reference.job_id.as_str(),
318 &CancelJobRequest {
319 location: job3.job_reference.location,
320 },
321 )
322 .await
323 .unwrap();
324 assert!(cancelled.job.status.state == JobState::Running || cancelled.job.status.state == JobState::Done);
325 }
326
327 #[tokio::test]
328 #[serial]
329 pub async fn query() {
330 let dataset = dataset_name("job_temp");
331 let (client, project) = create_client().await;
332 let client = Arc::new(client);
333 let table_client = BigqueryTableClient::new(client.clone());
334 let tabledata_client = BigqueryTabledataClient::new(client.clone());
335
336 let mut table1 = Table::default();
338 table1.table_reference.dataset_id.clone_from(&dataset);
339 table1.table_reference.project_id = project.to_string();
340 table1.table_reference.table_id = format!("table_data_{}", OffsetDateTime::now_utc().unix_timestamp());
341 table1.schema = Some(create_table_schema());
342 let table1 = table_client.create(&table1).await.unwrap();
343 let ref1 = table1.table_reference;
344
345 let mut req = InsertAllRequest::<TestData>::default();
347 for i in 0..3 {
348 req.rows.push(Row {
349 insert_id: None,
350 json: TestData::default(i, OffsetDateTime::now_utc()),
351 });
352 }
353 let res = tabledata_client
354 .insert(ref1.project_id.as_str(), ref1.dataset_id.as_str(), ref1.table_id.as_str(), &req)
355 .await
356 .unwrap();
357 assert!(res.insert_errors.is_none());
358
359 let client = BigqueryJobClient::new(client);
361 let result = client
362 .query(
363 project.as_str(),
364 &QueryRequest {
365 max_results: Some(2),
366 query: format!("SELECT * FROM {}.{}", ref1.dataset_id.as_str(), ref1.table_id.as_str()),
367 ..Default::default()
368 },
369 )
370 .await
371 .unwrap();
372 assert!(result.page_token.is_some());
373 assert_eq!(result.rows.unwrap().len(), 2);
374 assert_eq!(result.total_rows.unwrap(), 3);
375 assert_eq!(result.total_bytes_processed.unwrap(), 0);
376 assert!(result.job_complete);
377
378 let mut page_token = result.page_token;
380 let location = result.job_reference.location;
381 loop {
382 let query_results = client
383 .get_query_results(
384 result.job_reference.project_id.as_str(),
385 result.job_reference.job_id.as_str(),
386 &GetQueryResultsRequest {
387 page_token,
388 location: location.clone(),
389 ..Default::default()
390 },
391 )
392 .await
393 .unwrap();
394 assert_eq!(query_results.rows.unwrap().len(), 1);
395 assert_eq!(query_results.total_rows, 3);
396 if query_results.page_token.is_none() {
397 break;
398 }
399 page_token = query_results.page_token
400 }
401
402 let result = client
404 .query(
405 project.as_str(),
406 &QueryRequest {
407 dry_run: Some(true),
408 max_results: Some(10),
409 query: format!("SELECT * FROM {}.{}", ref1.dataset_id.as_str(), ref1.table_id.as_str()),
410 ..Default::default()
411 },
412 )
413 .await
414 .unwrap();
415 assert!(result.job_reference.job_id.is_empty());
416 assert!(result.total_rows.is_none());
417 assert_eq!(result.total_bytes_processed.unwrap(), 0);
418 assert!(result.job_complete);
419
420 table_client
421 .delete(&ref1.project_id, &ref1.dataset_id, &ref1.table_id)
422 .await
423 .unwrap();
424 }
425}