1use std::sync::Arc;
2
3use reqwest::Response;
4use reqwest_middleware::{ClientWithMiddleware as Client, RequestBuilder};
5
6use google_cloud_token::TokenSource;
7
8use crate::http::error::{Error, ErrorWrapper};
9
/// Authorization scope URLs requested when obtaining tokens for this client.
///
/// The list covers BigQuery (including `insertdata`), Cloud Platform
/// (full and read-only) and the three `devstorage` scopes.
pub const SCOPES: [&str; 7] = [
    // BigQuery
    "https://www.googleapis.com/auth/bigquery",
    "https://www.googleapis.com/auth/bigquery.insertdata",
    // Cloud Platform
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/cloud-platform.read-only",
    // Cloud Storage (devstorage)
    "https://www.googleapis.com/auth/devstorage.full_control",
    "https://www.googleapis.com/auth/devstorage.read_only",
    "https://www.googleapis.com/auth/devstorage.read_write",
];
19
20#[derive(Debug, Clone)]
21pub struct BigqueryClient {
22 ts: Arc<dyn TokenSource>,
23 endpoint: String,
24 http: Client,
25 debug: bool,
26}
27
28impl BigqueryClient {
29 pub(crate) fn new(ts: Arc<dyn TokenSource>, endpoint: &str, http: Client, debug: bool) -> Self {
30 Self {
31 ts,
32 endpoint: format!("{endpoint}/bigquery/v2"),
33 http,
34 debug,
35 }
36 }
37
38 pub(crate) fn endpoint(&self) -> &str {
39 self.endpoint.as_str()
40 }
41
42 pub(crate) fn http(&self) -> &Client {
43 &self.http
44 }
45
46 async fn with_headers(&self, builder: RequestBuilder) -> Result<RequestBuilder, Error> {
47 let token = self.ts.token().await.map_err(Error::TokenSource)?;
48 Ok(builder
49 .header("X-Goog-Api-Client", "rust")
50 .header(reqwest::header::USER_AGENT, "google-cloud-bigquery")
51 .header(reqwest::header::AUTHORIZATION, token))
52 }
53
54 pub async fn send<T>(&self, builder: RequestBuilder) -> Result<T, Error>
55 where
56 T: serde::de::DeserializeOwned,
57 {
58 let request = self.with_headers(builder).await?;
59 let response = request.send().await?;
60 let response = Self::check_response_status(response).await?;
61 if self.debug {
62 let text = response.text().await?;
63 tracing::info!("{}", text);
64 Ok(serde_json::from_str(text.as_str()).unwrap())
65 } else {
66 Ok(response.json().await?)
67 }
68 }
69
70 pub async fn send_get_empty(&self, builder: RequestBuilder) -> Result<(), Error> {
71 let builder = self.with_headers(builder).await?;
72 let response = builder.send().await?;
73 Self::check_response_status(response).await?;
74 Ok(())
75 }
76
77 async fn check_response_status(response: Response) -> Result<Response, Error> {
79 let error = match response.error_for_status_ref() {
81 Ok(_) => return Ok(response),
82 Err(error) => error,
83 };
84
85 Err(response
87 .json::<ErrorWrapper>()
88 .await
89 .map(|wrapper| Error::Response(wrapper.error))
90 .unwrap_or(Error::HttpClient(error)))
91 }
92}
93
94#[cfg(test)]
95pub(crate) mod test {
96 use std::str::FromStr;
97
98 use base64::engine::general_purpose::STANDARD;
99 use base64_serde::base64_serde_type;
100 use bigdecimal::BigDecimal;
101 use time::OffsetDateTime;
102
103 use google_cloud_auth::project::Config;
104 use google_cloud_auth::token::DefaultTokenSourceProvider;
105 use google_cloud_token::TokenSourceProvider;
106
107 use crate::http::bigquery_client::{BigqueryClient, SCOPES};
108 use crate::http::query;
109 use crate::http::query::value::Decodable as QueryDecodable;
110 use crate::http::table::{TableFieldMode, TableFieldSchema, TableFieldType, TableSchema};
111 use crate::http::tabledata::list::Tuple;
112 use crate::storage;
113 use crate::storage::array::ArrayRef;
114 use crate::storage::value::Decodable as StorageDecodable;
115
116 base64_serde_type!(Base64Standard, STANDARD);
117
118 #[ctor::ctor]
119 fn init() {
120 let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
121 .add_directive("google_cloud_bigquery=trace".parse().unwrap());
122 let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
123 }
124
/// Returns `name` prefixed with `gcrbq_`, the dataset naming scheme used by the tests.
pub fn dataset_name(name: &str) -> String {
    ["gcrbq_", name].concat()
}
128
/// Returns the test bucket name `{project}_gcrbq_{name}`.
pub fn bucket_name(project: &str, name: &str) -> String {
    [project, "_gcrbq_", name].concat()
}
132
133 pub async fn create_client() -> (BigqueryClient, String) {
134 let tsp = DefaultTokenSourceProvider::new(Config::default().with_scopes(&SCOPES))
135 .await
136 .unwrap();
137 let cred = tsp.source_credentials.clone();
138 let ts = tsp.token_source();
139 let client = BigqueryClient::new(
140 ts,
141 "https://bigquery.googleapis.com",
142 reqwest_middleware::ClientBuilder::new(reqwest::Client::new()).build(),
143 false,
144 );
145 (client, cred.unwrap().project_id.unwrap())
146 }
147
148 #[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone, PartialEq)]
149 pub struct TestDataStruct {
150 pub f1: bool,
151 pub f2: Vec<i64>,
152 }
153
154 impl query::value::StructDecodable for TestDataStruct {
155 fn decode(value: Tuple) -> Result<Self, query::value::Error> {
156 let col = &value.f;
157 Ok(Self {
158 f1: bool::decode(&col[0].v)?,
159 f2: Vec::<i64>::decode(&col[1].v)?,
160 })
161 }
162 }
163
164 impl storage::value::StructDecodable for TestDataStruct {
165 fn decode_arrow(col: &[ArrayRef], row_no: usize) -> Result<TestDataStruct, storage::value::Error> {
166 let f1 = bool::decode_arrow(&col[0], row_no)?;
167 let f2 = Vec::<i64>::decode_arrow(&col[1], row_no)?;
168 Ok(TestDataStruct { f1, f2 })
169 }
170 }
171
172 #[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, PartialEq)]
173 pub struct TestData {
174 pub col_string: Option<String>,
175 pub col_number: Option<BigDecimal>,
176 pub col_number_array: Vec<BigDecimal>,
177 #[serde(default, with = "time::serde::rfc3339::option")]
178 pub col_timestamp: Option<OffsetDateTime>,
179 pub col_json: Option<String>,
180 pub col_json_array: Vec<String>,
181 pub col_struct: Option<TestDataStruct>,
182 pub col_struct_array: Vec<TestDataStruct>,
183 #[serde(default, with = "Base64Standard")]
184 pub col_binary: Vec<u8>,
185 }
186
187 impl query::value::StructDecodable for TestData {
188 fn decode(value: Tuple) -> Result<Self, query::value::Error> {
189 let col = &value.f;
190 Ok(TestData {
191 col_string: Option::<String>::decode(&col[0].v)?,
192 col_number: Option::<BigDecimal>::decode(&col[1].v)?,
193 col_number_array: Vec::<BigDecimal>::decode(&col[2].v)?,
194 col_timestamp: Option::<OffsetDateTime>::decode(&col[3].v)?,
195 col_json: Option::<String>::decode(&col[4].v)?,
196 col_json_array: Vec::<String>::decode(&col[5].v)?,
197 col_struct: Option::<TestDataStruct>::decode(&col[6].v)?,
198 col_struct_array: Vec::<TestDataStruct>::decode(&col[7].v)?,
199 col_binary: Vec::<u8>::decode(&col[8].v)?,
200 })
201 }
202 }
203
204 impl storage::value::StructDecodable for TestData {
205 fn decode_arrow(col: &[ArrayRef], row_no: usize) -> Result<TestData, storage::value::Error> {
206 Ok(TestData {
207 col_string: Option::<String>::decode_arrow(&col[0], row_no)?,
208 col_number: Option::<BigDecimal>::decode_arrow(&col[1], row_no)?,
209 col_number_array: Vec::<BigDecimal>::decode_arrow(&col[2], row_no)?,
210 col_timestamp: Option::<OffsetDateTime>::decode_arrow(&col[3], row_no)?,
211 col_json: Option::<String>::decode_arrow(&col[4], row_no)?,
212 col_json_array: Vec::<String>::decode_arrow(&col[5], row_no)?,
213 col_struct: Option::<TestDataStruct>::decode_arrow(&col[6], row_no)?,
214 col_struct_array: Vec::<TestDataStruct>::decode_arrow(&col[7], row_no)?,
215 col_binary: Vec::<u8>::decode_arrow(&col[8], row_no)?,
216 })
217 }
218 }
219
220 impl TestData {
221 pub fn default(index: usize, now: OffsetDateTime) -> TestData {
222 TestData {
223 col_string: Some(format!("test_{}", index)),
224 col_number: Some(BigDecimal::from_str("-99999999999999999999999999999.999999999").unwrap()),
225 col_number_array: vec![
226 BigDecimal::from_str(
227 "578960446186580977117854925043439539266.34992332820282019728792003956564819967",
228 )
229 .unwrap(),
230 BigDecimal::from_str(
231 "-578960446186580977117854925043439539266.34992332820282019728792003956564819968",
232 )
233 .unwrap(),
234 ],
235 col_timestamp: Some(now),
236 col_json: Some("{\"field\":100}".to_string()),
237 col_json_array: vec!["{\"field\":100}".to_string(), "{\"field\":200}".to_string()],
238 col_struct: Some(TestDataStruct {
239 f1: true,
240 f2: vec![index as i64, 3, 4],
241 }),
242 col_struct_array: vec![
243 TestDataStruct {
244 f1: true,
245 f2: vec![index as i64, 5, 6],
246 },
247 TestDataStruct {
248 f1: false,
249 f2: vec![index as i64, 30, 40],
250 },
251 ],
252 col_binary: b"test".to_vec(),
253 }
254 }
255 }
256
257 pub fn create_table_schema() -> TableSchema {
258 TableSchema {
259 fields: vec![
260 TableFieldSchema {
261 name: "col_string".to_string(),
262 data_type: TableFieldType::String,
263 max_length: Some(32),
264 ..Default::default()
265 },
266 TableFieldSchema {
267 name: "col_number".to_string(),
268 data_type: TableFieldType::Numeric,
269 ..Default::default()
270 },
271 TableFieldSchema {
272 name: "col_number_array".to_string(),
273 data_type: TableFieldType::Bignumeric,
274 mode: Some(TableFieldMode::Repeated),
275 ..Default::default()
276 },
277 TableFieldSchema {
278 name: "col_timestamp".to_string(),
279 data_type: TableFieldType::Timestamp,
280 ..Default::default()
281 },
282 TableFieldSchema {
283 name: "col_json".to_string(),
284 data_type: TableFieldType::Json,
285 ..Default::default()
286 },
287 TableFieldSchema {
288 name: "col_json_array".to_string(),
289 data_type: TableFieldType::Json,
290 mode: Some(TableFieldMode::Repeated),
291 ..Default::default()
292 },
293 TableFieldSchema {
294 name: "col_struct".to_string(),
295 data_type: TableFieldType::Struct,
296 fields: Some(vec![
297 TableFieldSchema {
298 name: "f1".to_string(),
299 data_type: TableFieldType::Bool,
300 ..Default::default()
301 },
302 TableFieldSchema {
303 name: "f2".to_string(),
304 data_type: TableFieldType::Int64,
305 mode: Some(TableFieldMode::Repeated),
306 ..Default::default()
307 },
308 ]),
309 ..Default::default()
310 },
311 TableFieldSchema {
312 name: "col_struct_array".to_string(),
313 data_type: TableFieldType::Struct,
314 fields: Some(vec![
315 TableFieldSchema {
316 name: "f1".to_string(),
317 data_type: TableFieldType::Bool,
318 ..Default::default()
319 },
320 TableFieldSchema {
321 name: "f2".to_string(),
322 data_type: TableFieldType::Int64,
323 mode: Some(TableFieldMode::Repeated),
324 ..Default::default()
325 },
326 ]),
327 mode: Some(TableFieldMode::Repeated),
328 ..Default::default()
329 },
330 TableFieldSchema {
331 name: "col_binary".to_string(),
332 data_type: TableFieldType::Bytes,
333 mode: Some(TableFieldMode::Required),
334 ..Default::default()
335 },
336 ],
337 }
338 }
339}