1use std::sync::Arc;
3
4use log::warn;
5use reqwest::Client;
6
7use crate::auth::Authenticator;
8use crate::error::BQError;
9use crate::model::get_iam_policy_request::GetIamPolicyRequest;
10use crate::model::policy::Policy;
11use crate::model::set_iam_policy_request::SetIamPolicyRequest;
12use crate::model::table::Table;
13use crate::model::table_list::TableList;
14use crate::model::test_iam_permissions_request::TestIamPermissionsRequest;
15use crate::model::test_iam_permissions_response::TestIamPermissionsResponse;
16use crate::{process_response, urlencode, BIG_QUERY_V2_URL};
17
18#[derive(Clone)]
20pub struct TableApi {
21 client: Client,
22 auth: Arc<dyn Authenticator>,
23 base_url: String,
24}
25
26impl TableApi {
27 pub(crate) fn new(client: Client, auth: Arc<dyn Authenticator>) -> Self {
28 Self {
29 client,
30 auth,
31 base_url: BIG_QUERY_V2_URL.to_string(),
32 }
33 }
34
35 pub(crate) fn with_base_url(&mut self, base_url: String) -> &mut Self {
36 self.base_url = base_url;
37 self
38 }
39
40 pub async fn create(&self, table: Table) -> Result<Table, BQError> {
44 let req_url = &format!(
45 "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables",
46 base_url = self.base_url,
47 project_id = urlencode(&table.table_reference.project_id),
48 dataset_id = urlencode(&table.table_reference.dataset_id)
49 );
50
51 let access_token = self.auth.access_token().await?;
52
53 let request = self
54 .client
55 .post(req_url.as_str())
56 .bearer_auth(access_token)
57 .json(&table)
58 .build()?;
59
60 let response = self.client.execute(request).await?;
61
62 process_response(response).await
63 }
64
65 pub async fn delete(&self, project_id: &str, dataset_id: &str, table_id: &str) -> Result<(), BQError> {
71 let req_url = &format!(
72 "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}",
73 base_url = self.base_url,
74 project_id = urlencode(project_id),
75 dataset_id = urlencode(dataset_id),
76 table_id = urlencode(table_id)
77 );
78
79 let access_token = self.auth.access_token().await?;
80
81 let request = self.client.delete(req_url.as_str()).bearer_auth(access_token).build()?;
82
83 let response = self.client.execute(request).await?;
84
85 if response.status().is_success() {
86 Ok(())
87 } else {
88 Err(BQError::ResponseError {
89 error: response.json().await?,
90 })
91 }
92 }
93
94 pub async fn delete_if_exists(&self, project_id: &str, dataset_id: &str, table_id: &str) -> bool {
95 match self.delete(project_id, dataset_id, table_id).await {
96 Err(BQError::ResponseError { error }) => {
97 if error.error.code != 404 {
98 warn!("table.delete_if_exists: unexpected error: {error:?}");
99 }
100 false
101 }
102 Err(err) => {
103 warn!("table.delete_if_exists: unexpected error: {err:?}");
104 false
105 }
106 Ok(_) => true,
107 }
108 }
109
110 pub async fn get(
121 &self,
122 project_id: &str,
123 dataset_id: &str,
124 table_id: &str,
125 selected_fields: Option<Vec<&str>>,
126 ) -> Result<Table, BQError> {
127 let req_url = &format!(
128 "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}",
129 base_url = self.base_url,
130 project_id = urlencode(project_id),
131 dataset_id = urlencode(dataset_id),
132 table_id = urlencode(table_id)
133 );
134
135 let access_token = self.auth.access_token().await?;
136
137 let mut request_builder = self.client.get(req_url.as_str()).bearer_auth(access_token);
138 if let Some(selected_fields) = selected_fields {
139 let selected_fields = selected_fields.join(",");
140 request_builder = request_builder.query(&[("selectedFields", selected_fields)]);
141 }
142
143 let request = request_builder.build()?;
144
145 let response = self.client.execute(request).await?;
146
147 process_response(response).await
148 }
149
150 pub async fn list(&self, project_id: &str, dataset_id: &str, options: ListOptions) -> Result<TableList, BQError> {
156 let req_url = &format!(
157 "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables",
158 base_url = self.base_url,
159 project_id = urlencode(project_id),
160 dataset_id = urlencode(dataset_id)
161 );
162
163 let access_token = self.auth.access_token().await?;
164
165 let mut request = self.client.get(req_url).bearer_auth(access_token);
166
167 if let Some(max_results) = options.max_results {
169 request = request.query(&[("maxResults", max_results.to_string())]);
170 }
171 if let Some(page_token) = options.page_token {
172 request = request.query(&[("pageToken", page_token)]);
173 }
174
175 let request = request.build()?;
176 let response = self.client.execute(request).await?;
177
178 process_response(response).await
179 }
180
181 pub async fn patch(
190 &self,
191 project_id: &str,
192 dataset_id: &str,
193 table_id: &str,
194 table: Table,
195 ) -> Result<Table, BQError> {
196 let req_url = &format!(
197 "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}",
198 base_url = self.base_url,
199 project_id = urlencode(project_id),
200 dataset_id = urlencode(dataset_id),
201 table_id = urlencode(table_id)
202 );
203
204 let access_token = self.auth.access_token().await?;
205
206 let request = self
207 .client
208 .patch(req_url)
209 .bearer_auth(access_token)
210 .json(&table)
211 .build()?;
212 let response = self.client.execute(request).await?;
213
214 process_response(response).await
215 }
216
217 pub async fn update(
225 &self,
226 project_id: &str,
227 dataset_id: &str,
228 table_id: &str,
229 table: Table,
230 ) -> Result<Table, BQError> {
231 let req_url = &format!(
232 "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}",
233 base_url = self.base_url,
234 project_id = urlencode(project_id),
235 dataset_id = urlencode(dataset_id),
236 table_id = urlencode(table_id)
237 );
238
239 let access_token = self.auth.access_token().await?;
240
241 let request = self
242 .client
243 .put(req_url)
244 .bearer_auth(access_token)
245 .json(&table)
246 .build()?;
247 let response = self.client.execute(request).await?;
248
249 process_response(response).await
250 }
251
252 pub async fn get_iam_policy(
258 &self,
259 resource: &str,
260 get_iam_policy_request: GetIamPolicyRequest,
261 ) -> Result<Policy, BQError> {
262 let req_url = &format!(
263 "{base_url}/projects/{resource}/:getIamPolicy",
264 base_url = self.base_url,
265 resource = urlencode(resource)
266 );
267
268 let access_token = self.auth.access_token().await?;
269
270 let request = self
271 .client
272 .post(req_url.as_str())
273 .bearer_auth(access_token)
274 .json(&get_iam_policy_request)
275 .build()?;
276
277 let response = self.client.execute(request).await?;
278
279 process_response(response).await
280 }
281
282 pub async fn set_iam_policy(
287 &self,
288 resource: &str,
289 set_iam_policy_request: SetIamPolicyRequest,
290 ) -> Result<Policy, BQError> {
291 let req_url = &format!(
292 "{base_url}/projects/{resource}/:setIamPolicy",
293 base_url = self.base_url,
294 resource = urlencode(resource)
295 );
296
297 let access_token = self.auth.access_token().await?;
298
299 let request = self
300 .client
301 .post(req_url.as_str())
302 .bearer_auth(access_token)
303 .json(&set_iam_policy_request)
304 .build()?;
305
306 let response = self.client.execute(request).await?;
307
308 process_response(response).await
309 }
310
311 pub async fn test_iam_permissions(
319 &self,
320 resource: &str,
321 test_iam_permissions_request: TestIamPermissionsRequest,
322 ) -> Result<TestIamPermissionsResponse, BQError> {
323 let req_url = &format!(
324 "{base_url}/projects/{resource}/:testIamPermissions",
325 base_url = self.base_url,
326 resource = urlencode(resource)
327 );
328
329 let access_token = self.auth.access_token().await?;
330
331 let request = self
332 .client
333 .post(req_url.as_str())
334 .bearer_auth(access_token)
335 .json(&test_iam_permissions_request)
336 .build()?;
337
338 let response = self.client.execute(request).await?;
339
340 process_response(response).await
341 }
342}
343
/// Optional parameters accepted by `TableApi::list`.
///
/// Start from `ListOptions::default()` (nothing set) and chain the builder methods:
/// each one consumes the value and returns it with one more field filled in.
///
/// The type is `Clone`, `Debug` and comparable so that a set of options can be
/// logged, reused across several paginated calls, and checked in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ListOptions {
    /// Sent as the `maxResults` query parameter when set.
    max_results: Option<u64>,
    /// Sent as the `pageToken` query parameter when set.
    page_token: Option<String>,
}

impl ListOptions {
    /// Sets the value sent as the `maxResults` query parameter.
    #[must_use]
    pub fn max_results(mut self, value: u64) -> Self {
        self.max_results = Some(value);
        self
    }

    /// Sets the value sent as the `pageToken` query parameter.
    #[must_use]
    pub fn page_token(mut self, value: String) -> Self {
        self.page_token = Some(value);
        self
    }
}
365
#[cfg(test)]
mod test {
    use crate::error::BQError;
    use crate::model::dataset::Dataset;
    use crate::model::field_type::FieldType;
    use crate::model::table::Table;
    use crate::model::table_field_schema::TableFieldSchema;
    use crate::model::table_schema::TableSchema;
    use crate::table::ListOptions;
    use crate::{env_vars, Client};
    use std::time::{Duration, SystemTime};

    /// Round-trips a table through create, get, update, patch, list and delete.
    ///
    /// The project, dataset, table and service-account key come from `env_vars()`;
    /// a dedicated dataset (suffix `_table`) is created first and removed at the end.
    #[tokio::test]
    async fn test() -> Result<(), BQError> {
        let (ref project_id, ref dataset_id, ref table_id, ref sa_key) = env_vars();
        let dataset_id = &format!("{dataset_id}_table");
        let expected_table_id = table_id.to_string();

        let client = Client::from_service_account_key_file(sa_key).await?;

        // Start from a clean slate: drop any dataset left over from an earlier run.
        client.dataset().delete_if_exists(project_id, dataset_id, true).await;

        // Create the dataset that will hold the table.
        let created_dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
        assert_eq!(created_dataset.id, Some(format!("{project_id}:{dataset_id}")));

        // Create the table.
        let schema = TableSchema::new(vec![
            TableFieldSchema::new("col1", FieldType::String),
            TableFieldSchema::new("col2", FieldType::Int64),
            TableFieldSchema::new("col3", FieldType::Boolean),
            TableFieldSchema::new("col4", FieldType::Datetime),
        ]);
        let table_definition = Table::new(project_id, dataset_id, table_id, schema)
            .description("A table used for unit tests")
            .label("owner", "me")
            .label("env", "prod")
            .expiration_time(SystemTime::now() + Duration::from_secs(3600));
        let created_table = client.table().create(table_definition).await?;
        assert_eq!(created_table.table_reference.table_id, expected_table_id);

        // Read it back, then push it through update (PUT) and patch (PATCH).
        let fetched = client.table().get(project_id, dataset_id, table_id, None).await?;
        assert_eq!(fetched.table_reference.table_id, expected_table_id);

        let updated = client.table().update(project_id, dataset_id, table_id, fetched).await?;
        assert_eq!(updated.table_reference.table_id, expected_table_id);

        let patched = client.table().patch(project_id, dataset_id, table_id, updated).await?;
        assert_eq!(patched.table_reference.table_id, expected_table_id);

        // List the dataset's tables.
        // NOTE(review): the match below is on the dataset id, so any table listed in this
        // dataset satisfies it; confirm whether the table id was meant to be checked.
        let listing = client
            .table()
            .list(project_id, dataset_id, ListOptions::default())
            .await?;
        let created_table_found = listing
            .tables
            .unwrap()
            .iter()
            .any(|entry| &entry.table_reference.dataset_id == dataset_id);
        assert!(created_table_found);

        // Clean up: the table first, then the dataset.
        client.table().delete(project_id, dataset_id, table_id).await?;

        client.dataset().delete(project_id, dataset_id, true).await?;

        Ok(())
    }
}