1use std::sync::Arc;
3
4use log::warn;
5use reqwest::Client;
6
7use crate::auth::Authenticator;
8use crate::error::BQError;
9use crate::model::dataset::Dataset;
10use crate::model::datasets::Datasets;
11use crate::model::information_schema::schemata::Schemata;
12use crate::model::query_request::QueryRequest;
13use crate::model::query_response::{QueryResponse, ResultSet};
14use crate::{process_response, urlencode, BIG_QUERY_V2_URL};
15
16#[derive(Clone)]
18pub struct DatasetApi {
19 client: Client,
20 auth: Arc<dyn Authenticator>,
21 base_url: String,
22}
23
24impl DatasetApi {
25 pub(crate) fn new(client: Client, auth: Arc<dyn Authenticator>) -> Self {
26 Self {
27 client,
28 auth,
29 base_url: BIG_QUERY_V2_URL.to_string(),
30 }
31 }
32
33 pub(crate) fn with_base_url(&mut self, base_url: String) -> &mut Self {
34 self.base_url = base_url;
35 self
36 }
37
38 pub async fn create(&self, dataset: Dataset) -> Result<Dataset, BQError> {
60 let req_url = &format!(
61 "{base_url}/projects/{project_id}/datasets",
62 base_url = self.base_url,
63 project_id = urlencode(&dataset.dataset_reference.project_id)
64 );
65
66 let access_token = self.auth.access_token().await?;
67
68 let request = self
69 .client
70 .post(req_url.as_str())
71 .bearer_auth(access_token)
72 .json(&dataset)
73 .build()?;
74
75 let response = self.client.execute(request).await?;
76
77 process_response(response).await
78 }
79
80 pub async fn list(&self, project_id: &str, options: ListOptions) -> Result<Datasets, BQError> {
106 let req_url = &format!(
107 "{base_url}/projects/{project_id}/datasets",
108 base_url = self.base_url,
109 project_id = urlencode(project_id)
110 );
111
112 let access_token = self.auth.access_token().await?;
113
114 let mut request = self.client.get(req_url).bearer_auth(access_token);
115
116 if let Some(max_results) = options.max_results {
118 request = request.query(&[("maxResults", max_results.to_string())]);
119 }
120 if let Some(page_token) = options.page_token {
121 request = request.query(&[("pageToken", page_token)]);
122 }
123 if let Some(all) = options.all {
124 request = request.query(&[("all", all.to_string())]);
125 }
126 if let Some(filter) = options.filter {
127 request = request.query(&[("filter", filter)]);
128 }
129
130 let request = request.build()?;
131 let response = self.client.execute(request).await?;
132
133 process_response(response).await
134 }
135
136 pub async fn delete(&self, project_id: &str, dataset_id: &str, delete_contents: bool) -> Result<(), BQError> {
164 let req_url = &format!(
165 "{base_url}/projects/{project_id}/datasets/{dataset_id}",
166 base_url = self.base_url,
167 project_id = urlencode(project_id),
168 dataset_id = urlencode(dataset_id)
169 );
170
171 let access_token = self.auth.access_token().await?;
172
173 let request = self
174 .client
175 .delete(req_url)
176 .bearer_auth(access_token)
177 .query(&[("deleteContents", delete_contents.to_string())])
178 .build()?;
179 let response = self.client.execute(request).await?;
180
181 if response.status().is_success() {
182 Ok(())
183 } else {
184 Err(BQError::ResponseError {
185 error: response.json().await?,
186 })
187 }
188 }
189
190 pub async fn delete_if_exists(&self, project_id: &str, dataset_id: &str, delete_contents: bool) -> bool {
217 match self.delete(project_id, dataset_id, delete_contents).await {
218 Err(BQError::ResponseError { error }) => {
219 if error.error.code != 404 {
220 warn!("dataset.delete_if_exists: unexpected error: {:?}", error);
221 }
222 false
223 }
224 Err(err) => {
225 warn!("dataset.delete_if_exists: unexpected error: {:?}", err);
226 false
227 }
228 Ok(_) => true,
229 }
230 }
231
232 pub async fn get(&self, project_id: &str, dataset_id: &str) -> Result<Dataset, BQError> {
257 let req_url = &format!(
258 "{base_url}/projects/{project_id}/datasets/{dataset_id}",
259 base_url = self.base_url,
260 project_id = urlencode(project_id),
261 dataset_id = urlencode(dataset_id)
262 );
263
264 let access_token = self.auth.access_token().await?;
265
266 let request = self.client.get(req_url).bearer_auth(access_token).build()?;
267 let response = self.client.execute(request).await?;
268
269 process_response(response).await
270 }
271
272 pub async fn patch(&self, project_id: &str, dataset_id: &str, dataset: Dataset) -> Result<Dataset, BQError> {
278 let req_url = &format!(
279 "{base_url}/projects/{project_id}/datasets/{dataset_id}",
280 base_url = self.base_url,
281 project_id = urlencode(project_id),
282 dataset_id = urlencode(dataset_id)
283 );
284
285 let access_token = self.auth.access_token().await?;
286
287 let request = self
288 .client
289 .patch(req_url)
290 .bearer_auth(access_token)
291 .json(&dataset)
292 .build()?;
293 let response = self.client.execute(request).await?;
294
295 process_response(response).await
296 }
297
298 pub async fn update(&self, project_id: &str, dataset_id: &str, dataset: Dataset) -> Result<Dataset, BQError> {
303 let req_url = &format!(
304 "{base_url}/projects/{project_id}/datasets/{dataset_id}",
305 base_url = self.base_url,
306 project_id = urlencode(project_id),
307 dataset_id = urlencode(dataset_id)
308 );
309
310 let access_token = self.auth.access_token().await?;
311
312 let request = self
313 .client
314 .put(req_url)
315 .bearer_auth(access_token)
316 .json(&dataset)
317 .build()?;
318 let response = self.client.execute(request).await?;
319
320 process_response(response).await
321 }
322
323 pub async fn information_schema_schemata(&self, project_id: &str, region: &str) -> Result<Vec<Schemata>, BQError> {
324 let req_url = format!(
325 "{base_url}/projects/{project_id}/queries",
326 base_url = self.base_url,
327 project_id = urlencode(project_id)
328 );
329
330 let access_token = self.auth.access_token().await?;
331 let query_request = QueryRequest::new(format!("SELECT * FROM {region}.INFORMATION_SCHEMA.SCHEMATA"));
332
333 let request = self
334 .client
335 .post(req_url.as_str())
336 .bearer_auth(access_token)
337 .json(&query_request)
338 .build()?;
339
340 let resp = self.client.execute(request).await?;
341
342 let query_response: QueryResponse = process_response(resp).await?;
343 let mut rs = ResultSet::new_from_query_response(query_response);
344 let mut result = vec![];
345 let catalog_name_pos = *rs
346 .column_index("catalog_name")
347 .expect("The catalog_name column is expected");
348 let schema_name_pos = *rs
349 .column_index("schema_name")
350 .expect("The schema_name column is expected");
351 let schema_owner_pos = *rs
352 .column_index("schema_owner")
353 .expect("The schema_owner column is expected");
354 let creation_time_pos = *rs
355 .column_index("creation_time")
356 .expect("The creation_time column is expected");
357 let last_modified_time_pos = *rs
358 .column_index("last_modified_time")
359 .expect("The last_modified_time column is expected");
360 let location_pos = *rs.column_index("location").expect("The location column is expected");
361
362 while rs.next_row() {
363 result.push(Schemata {
364 catalog_name: rs.get_string(catalog_name_pos)?.expect("A catalog name is expected"),
365 schema_name: rs.get_string(schema_name_pos)?.expect("A schema_name is expected"),
366 schema_owner: rs.get_string(schema_owner_pos)?,
367 creation_time: rs.get_string(creation_time_pos)?.expect("A creation_time is expected"),
368 last_modified_time: rs
369 .get_string(last_modified_time_pos)?
370 .expect("A last_modified_time is expected"),
371 location: rs.get_string(location_pos)?.expect("A location is expected"),
372 });
373 }
374
375 Ok(result)
378 }
379}
380
/// Optional query parameters accepted by `DatasetApi::list`.
///
/// Every field starts out as `None` (via `Default`). Each builder-style method sets exactly
/// one field and returns the updated value, so calls can be chained. Fields left as `None`
/// are not added to the request.
#[derive(Debug, Clone, Default)]
pub struct ListOptions {
    /// Sent as the `maxResults` query parameter.
    max_results: Option<u64>,
    /// Sent as the `pageToken` query parameter.
    page_token: Option<String>,
    /// Sent as the `all` query parameter (`"true"` or `"false"`).
    all: Option<bool>,
    /// Sent as the `filter` query parameter.
    filter: Option<String>,
}

impl ListOptions {
    /// Sets the value forwarded as `maxResults`.
    pub fn max_results(mut self, value: u64) -> Self {
        self.max_results = Some(value);
        self
    }

    /// Sets the value forwarded as `pageToken`.
    pub fn page_token(mut self, value: String) -> Self {
        self.page_token = Some(value);
        self
    }

    /// Sets the value forwarded as `all`.
    ///
    /// NOTE(review): presumably controls whether hidden datasets are listed; confirm against
    /// the BigQuery `datasets.list` reference.
    pub fn all(mut self, value: bool) -> Self {
        self.all = Some(value);
        self
    }

    /// Sets the value forwarded as `filter`.
    pub fn filter(mut self, value: String) -> Self {
        self.filter = Some(value);
        self
    }
}
418
#[cfg(test)]
mod test {
    use crate::dataset::ListOptions;
    use crate::error::BQError;
    use crate::model::dataset::Dataset;
    use crate::{env_vars, Client};

    /// Walks a dataset through delete, create, get, patch, update, list and delete again.
    ///
    /// Connection settings come from `env_vars()`; the dataset id used is the configured one
    /// with a `_dataset` suffix.
    #[tokio::test]
    async fn test() -> Result<(), BQError> {
        let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
        let dataset_id = &format!("{dataset_id}_dataset");
        // Every call below is expected to report the dataset under this id.
        let expected_id = Some(format!("{project_id}:{dataset_id}"));

        let client = Client::from_service_account_key_file(sa_key).await?;
        let api = client.dataset();

        // Best-effort cleanup of a previous run; a failure here is deliberately ignored.
        if api.delete(project_id, dataset_id, true).await.is_ok() {
            println!("Removed previous dataset '{dataset_id}'");
        }

        let spec = Dataset::new(project_id, dataset_id)
            .friendly_name("A dataset used for unit tests")
            .location("US")
            .label("owner", "me")
            .label("env", "prod");
        let created_dataset = api.create(spec).await?;
        assert_eq!(created_dataset.id, expected_id);

        let dataset = api.get(project_id, dataset_id).await?;
        assert_eq!(dataset.id, expected_id);

        let dataset = api.patch(project_id, dataset_id, dataset).await?;
        assert_eq!(dataset.id, expected_id);

        let dataset = api.update(project_id, dataset_id, dataset).await?;
        assert_eq!(dataset.id, expected_id);

        let datasets = api.list(project_id, ListOptions::default().all(true)).await?;
        let created_dataset_found = datasets
            .datasets
            .iter()
            .any(|entry| entry.dataset_reference.dataset_id == *dataset_id);
        assert!(created_dataset_found);

        api.delete(project_id, dataset_id, true).await?;

        Ok(())
    }

    /// Queries the schemata of the `region-us` region and dumps the result for inspection.
    #[tokio::test]
    async fn test_information_schema() -> Result<(), BQError> {
        let (ref project_id, ref _dataset_id, ref _table_id, ref sa_key) = env_vars();
        let client = Client::from_service_account_key_file(sa_key).await?;

        let api = client.dataset();
        let result = api.information_schema_schemata(project_id, "region-us").await?;
        dbg!(result);

        Ok(())
    }
}