google_cloud_bigquery/http/
bigquery_client.rs

1use std::sync::Arc;
2
3use reqwest::Response;
4use reqwest_middleware::{ClientWithMiddleware as Client, RequestBuilder};
5
6use google_cloud_token::TokenSource;
7
8use crate::http::error::{Error, ErrorWrapper};
9
/// Authorization scope URLs requested when building a token source for this client.
///
/// The list mixes BigQuery-specific scopes with the broader cloud-platform and
/// storage (`devstorage`) scopes; the order is kept exactly as declared.
pub const SCOPES: [&str; 7] = [
    // BigQuery scopes.
    "https://www.googleapis.com/auth/bigquery",
    "https://www.googleapis.com/auth/bigquery.insertdata",
    // Cloud platform scopes.
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/cloud-platform.read-only",
    // Storage scopes.
    "https://www.googleapis.com/auth/devstorage.full_control",
    "https://www.googleapis.com/auth/devstorage.read_only",
    "https://www.googleapis.com/auth/devstorage.read_write",
];
19
20#[derive(Debug, Clone)]
21pub struct BigqueryClient {
22    ts: Arc<dyn TokenSource>,
23    endpoint: String,
24    http: Client,
25    debug: bool,
26}
27
28impl BigqueryClient {
29    pub(crate) fn new(ts: Arc<dyn TokenSource>, endpoint: &str, http: Client, debug: bool) -> Self {
30        Self {
31            ts,
32            endpoint: format!("{endpoint}/bigquery/v2"),
33            http,
34            debug,
35        }
36    }
37
38    pub(crate) fn endpoint(&self) -> &str {
39        self.endpoint.as_str()
40    }
41
42    pub(crate) fn http(&self) -> &Client {
43        &self.http
44    }
45
46    async fn with_headers(&self, builder: RequestBuilder) -> Result<RequestBuilder, Error> {
47        let token = self.ts.token().await.map_err(Error::TokenSource)?;
48        Ok(builder
49            .header("X-Goog-Api-Client", "rust")
50            .header(reqwest::header::USER_AGENT, "google-cloud-bigquery")
51            .header(reqwest::header::AUTHORIZATION, token))
52    }
53
54    pub async fn send<T>(&self, builder: RequestBuilder) -> Result<T, Error>
55    where
56        T: serde::de::DeserializeOwned,
57    {
58        let request = self.with_headers(builder).await?;
59        let response = request.send().await?;
60        let response = Self::check_response_status(response).await?;
61        if self.debug {
62            let text = response.text().await?;
63            tracing::info!("{}", text);
64            Ok(serde_json::from_str(text.as_str()).unwrap())
65        } else {
66            Ok(response.json().await?)
67        }
68    }
69
70    pub async fn send_get_empty(&self, builder: RequestBuilder) -> Result<(), Error> {
71        let builder = self.with_headers(builder).await?;
72        let response = builder.send().await?;
73        Self::check_response_status(response).await?;
74        Ok(())
75    }
76
77    /// Checks whether an HTTP response is successful and returns it, or returns an error.
78    async fn check_response_status(response: Response) -> Result<Response, Error> {
79        // Check the status code, returning the response if it is not an error.
80        let error = match response.error_for_status_ref() {
81            Ok(_) => return Ok(response),
82            Err(error) => error,
83        };
84
85        // try to extract a response error, falling back to the status error if it can not be parsed.
86        Err(response
87            .json::<ErrorWrapper>()
88            .await
89            .map(|wrapper| Error::Response(wrapper.error))
90            .unwrap_or(Error::HttpClient(error)))
91    }
92}
93
/// Shared fixtures for the crate's tests: client construction, sample row
/// types with their decoders, and the matching table schema.
#[cfg(test)]
pub(crate) mod test {
    use std::str::FromStr;

    use base64::engine::general_purpose::STANDARD;
    use base64_serde::base64_serde_type;
    use bigdecimal::BigDecimal;
    use time::OffsetDateTime;

    use google_cloud_auth::project::Config;
    use google_cloud_auth::token::DefaultTokenSourceProvider;
    use google_cloud_token::TokenSourceProvider;

    use crate::http::bigquery_client::{BigqueryClient, SCOPES};
    use crate::http::query;
    use crate::http::query::value::Decodable as QueryDecodable;
    use crate::http::table::{TableFieldMode, TableFieldSchema, TableFieldType, TableSchema};
    use crate::http::tabledata::list::Tuple;
    use crate::storage;
    use crate::storage::array::ArrayRef;
    use crate::storage::value::Decodable as StorageDecodable;

    base64_serde_type!(Base64Standard, STANDARD);

    /// Installs a tracing subscriber with trace-level output for this crate.
    ///
    /// The result of `try_init` is ignored on purpose, so an already
    /// installed subscriber is not treated as a failure.
    #[ctor::ctor]
    fn init() {
        let env_filter = tracing_subscriber::filter::EnvFilter::from_default_env()
            .add_directive("google_cloud_bigquery=trace".parse().unwrap());
        let _ = tracing_subscriber::fmt().with_env_filter(env_filter).try_init();
    }

    /// Returns `name` prefixed with `gcrbq_`.
    pub fn dataset_name(name: &str) -> String {
        format!("gcrbq_{name}")
    }

    /// Returns `<project>_gcrbq_<name>`.
    pub fn bucket_name(project: &str, name: &str) -> String {
        format!("{project}_gcrbq_{name}")
    }

    /// Creates a client against `https://bigquery.googleapis.com` using the
    /// default token source, and returns it with the credentials' project id.
    ///
    /// Panics if the token source cannot be created or if the credentials or
    /// their project id are missing.
    pub async fn create_client() -> (BigqueryClient, String) {
        let provider = DefaultTokenSourceProvider::new(Config::default().with_scopes(&SCOPES))
            .await
            .unwrap();
        let credentials = provider.source_credentials.clone();
        let token_source = provider.token_source();
        let http = reqwest_middleware::ClientBuilder::new(reqwest::Client::new()).build();
        let client = BigqueryClient::new(token_source, "https://bigquery.googleapis.com", http, false);
        let project_id = credentials.unwrap().project_id.unwrap();
        (client, project_id)
    }

    /// Nested struct value used by the `col_struct` and `col_struct_array` columns.
    #[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone, PartialEq)]
    pub struct TestDataStruct {
        pub f1: bool,
        pub f2: Vec<i64>,
    }

    impl query::value::StructDecodable for TestDataStruct {
        /// Decodes `f1` from cell 0 and `f2` from cell 1 of the tuple.
        fn decode(value: Tuple) -> Result<Self, query::value::Error> {
            let cells = &value.f;
            let f1 = bool::decode(&cells[0].v)?;
            let f2 = Vec::<i64>::decode(&cells[1].v)?;
            Ok(Self { f1, f2 })
        }
    }

    impl storage::value::StructDecodable for TestDataStruct {
        /// Decodes `f1` from column 0 and `f2` from column 1 at row `row_no`.
        fn decode_arrow(col: &[ArrayRef], row_no: usize) -> Result<TestDataStruct, storage::value::Error> {
            Ok(Self {
                f1: bool::decode_arrow(&col[0], row_no)?,
                f2: Vec::<i64>::decode_arrow(&col[1], row_no)?,
            })
        }
    }

    /// One row of the test table; field order matches `create_table_schema`.
    #[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, PartialEq)]
    pub struct TestData {
        pub col_string: Option<String>,
        pub col_number: Option<BigDecimal>,
        pub col_number_array: Vec<BigDecimal>,
        #[serde(default, with = "time::serde::rfc3339::option")]
        pub col_timestamp: Option<OffsetDateTime>,
        pub col_json: Option<String>,
        pub col_json_array: Vec<String>,
        pub col_struct: Option<TestDataStruct>,
        pub col_struct_array: Vec<TestDataStruct>,
        #[serde(default, with = "Base64Standard")]
        pub col_binary: Vec<u8>,
    }

    impl query::value::StructDecodable for TestData {
        /// Decodes the nine columns from the tuple's cells, in declaration order.
        fn decode(value: Tuple) -> Result<Self, query::value::Error> {
            let cells = &value.f;
            let col_string = Option::<String>::decode(&cells[0].v)?;
            let col_number = Option::<BigDecimal>::decode(&cells[1].v)?;
            let col_number_array = Vec::<BigDecimal>::decode(&cells[2].v)?;
            let col_timestamp = Option::<OffsetDateTime>::decode(&cells[3].v)?;
            let col_json = Option::<String>::decode(&cells[4].v)?;
            let col_json_array = Vec::<String>::decode(&cells[5].v)?;
            let col_struct = Option::<TestDataStruct>::decode(&cells[6].v)?;
            let col_struct_array = Vec::<TestDataStruct>::decode(&cells[7].v)?;
            let col_binary = Vec::<u8>::decode(&cells[8].v)?;
            Ok(Self {
                col_string,
                col_number,
                col_number_array,
                col_timestamp,
                col_json,
                col_json_array,
                col_struct,
                col_struct_array,
                col_binary,
            })
        }
    }

    impl storage::value::StructDecodable for TestData {
        /// Decodes the nine columns at row `row_no`, in declaration order.
        fn decode_arrow(col: &[ArrayRef], row_no: usize) -> Result<TestData, storage::value::Error> {
            let col_string = Option::<String>::decode_arrow(&col[0], row_no)?;
            let col_number = Option::<BigDecimal>::decode_arrow(&col[1], row_no)?;
            let col_number_array = Vec::<BigDecimal>::decode_arrow(&col[2], row_no)?;
            let col_timestamp = Option::<OffsetDateTime>::decode_arrow(&col[3], row_no)?;
            let col_json = Option::<String>::decode_arrow(&col[4], row_no)?;
            let col_json_array = Vec::<String>::decode_arrow(&col[5], row_no)?;
            let col_struct = Option::<TestDataStruct>::decode_arrow(&col[6], row_no)?;
            let col_struct_array = Vec::<TestDataStruct>::decode_arrow(&col[7], row_no)?;
            let col_binary = Vec::<u8>::decode_arrow(&col[8], row_no)?;
            Ok(Self {
                col_string,
                col_number,
                col_number_array,
                col_timestamp,
                col_json,
                col_json_array,
                col_struct,
                col_struct_array,
                col_binary,
            })
        }
    }

    impl TestData {
        /// Builds the fixture row for `index`, stamped with `now`.
        ///
        /// This is an inherent two-argument constructor, separate from the
        /// derived zero-argument `Default::default()`.
        pub fn default(index: usize, now: OffsetDateTime) -> TestData {
            let idx = index as i64;
            let col_string = Some(format!("test_{index}"));
            let col_number = Some(BigDecimal::from_str("-99999999999999999999999999999.999999999").unwrap());
            let col_number_array = vec![
                BigDecimal::from_str("578960446186580977117854925043439539266.34992332820282019728792003956564819967")
                    .unwrap(),
                BigDecimal::from_str("-578960446186580977117854925043439539266.34992332820282019728792003956564819968")
                    .unwrap(),
            ];
            let col_json = Some("{\"field\":100}".to_string());
            let col_json_array = vec!["{\"field\":100}".to_string(), "{\"field\":200}".to_string()];
            let col_struct = Some(TestDataStruct {
                f1: true,
                f2: vec![idx, 3, 4],
            });
            let col_struct_array = vec![
                TestDataStruct {
                    f1: true,
                    f2: vec![idx, 5, 6],
                },
                TestDataStruct {
                    f1: false,
                    f2: vec![idx, 30, 40],
                },
            ];

            TestData {
                col_string,
                col_number,
                col_number_array,
                col_timestamp: Some(now),
                col_json,
                col_json_array,
                col_struct,
                col_struct_array,
                col_binary: b"test".to_vec(),
            }
        }
    }

    /// Returns the schema of the test table: one column per `TestData` field.
    pub fn create_table_schema() -> TableSchema {
        // A column that sets only a name and a type; everything else stays default.
        fn column(name: &str, data_type: TableFieldType) -> TableFieldSchema {
            TableFieldSchema {
                name: name.to_string(),
                data_type,
                ..Default::default()
            }
        }

        // A column that additionally sets an explicit mode.
        fn column_with_mode(name: &str, data_type: TableFieldType, mode: TableFieldMode) -> TableFieldSchema {
            TableFieldSchema {
                mode: Some(mode),
                ..column(name, data_type)
            }
        }

        // Sub-fields shared by `col_struct` and `col_struct_array`.
        fn struct_members() -> Vec<TableFieldSchema> {
            vec![
                column("f1", TableFieldType::Bool),
                column_with_mode("f2", TableFieldType::Int64, TableFieldMode::Repeated),
            ]
        }

        let fields = vec![
            TableFieldSchema {
                max_length: Some(32),
                ..column("col_string", TableFieldType::String)
            },
            column("col_number", TableFieldType::Numeric),
            column_with_mode("col_number_array", TableFieldType::Bignumeric, TableFieldMode::Repeated),
            column("col_timestamp", TableFieldType::Timestamp),
            column("col_json", TableFieldType::Json),
            column_with_mode("col_json_array", TableFieldType::Json, TableFieldMode::Repeated),
            TableFieldSchema {
                fields: Some(struct_members()),
                ..column("col_struct", TableFieldType::Struct)
            },
            TableFieldSchema {
                fields: Some(struct_members()),
                ..column_with_mode("col_struct_array", TableFieldType::Struct, TableFieldMode::Repeated)
            },
            column_with_mode("col_binary", TableFieldType::Bytes, TableFieldMode::Required),
        ];

        TableSchema { fields }
    }
}