1use std::sync::Arc;
3
4use crate::auth::Authenticator;
5use crate::error::BQError;
6use crate::model::data_format_options::DataFormatOptions;
7use crate::model::table_data_insert_all_request::TableDataInsertAllRequest;
8#[cfg(feature = "gzip")]
9use crate::model::table_data_insert_all_request::TableDataInsertAllRequestGzipped;
10use crate::model::table_data_insert_all_response::TableDataInsertAllResponse;
11use crate::model::table_data_list_response::TableDataListResponse;
12use crate::{process_response, urlencode, BIG_QUERY_V2_URL};
13use reqwest::Client;
14
15#[cfg(feature = "gzip")]
16use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE};
17
18#[derive(Clone)]
20pub struct TableDataApi {
21 client: Client,
22 auth: Arc<dyn Authenticator>,
23 base_url: String,
24}
25
26impl TableDataApi {
27 pub(crate) fn new(client: Client, auth: Arc<dyn Authenticator>) -> Self {
28 Self {
29 client,
30 auth,
31 base_url: BIG_QUERY_V2_URL.to_string(),
32 }
33 }
34
35 pub(crate) fn with_base_url(&mut self, base_url: String) -> &mut Self {
36 self.base_url = base_url;
37 self
38 }
39
40 pub async fn insert_all(
48 &self,
49 project_id: &str,
50 dataset_id: &str,
51 table_id: &str,
52 insert_request: TableDataInsertAllRequest,
53 ) -> Result<TableDataInsertAllResponse, BQError> {
54 let req_url = format!(
55 "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/insertAll",
56 base_url = self.base_url,
57 project_id = urlencode(project_id),
58 dataset_id = urlencode(dataset_id),
59 table_id = urlencode(table_id)
60 );
61
62 let access_token = self.auth.access_token().await?;
63
64 #[cfg(feature = "gzip")]
65 let request = {
66 let insert_request_gzipped = TableDataInsertAllRequestGzipped::try_from(insert_request)?;
67 self.client
68 .post(&req_url)
69 .header(CONTENT_ENCODING, "gzip")
70 .header(CONTENT_TYPE, "application/octet-stream")
71 .bearer_auth(access_token)
72 .body(insert_request_gzipped.data)
73 .build()?
74 };
75
76 #[cfg(not(feature = "gzip"))]
77 let request = self
78 .client
79 .post(&req_url)
80 .bearer_auth(access_token)
81 .json(&insert_request)
82 .build()?;
83
84 let resp = self.client.execute(request).await?;
85
86 process_response(resp).await
87 }
88
89 #[cfg(feature = "gzip")]
97 pub async fn insert_all_gzipped(
98 &self,
99 project_id: &str,
100 dataset_id: &str,
101 table_id: &str,
102 insert_request_gzipped: TableDataInsertAllRequestGzipped,
103 ) -> Result<TableDataInsertAllResponse, BQError> {
104 let req_url = format!(
105 "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/insertAll",
106 base_url = self.base_url,
107 project_id = urlencode(project_id),
108 dataset_id = urlencode(dataset_id),
109 table_id = urlencode(table_id)
110 );
111
112 let access_token = self.auth.access_token().await?;
113
114 let request = self
115 .client
116 .post(&req_url)
117 .header(CONTENT_ENCODING, "gzip")
118 .header(CONTENT_TYPE, "application/octet-stream")
119 .bearer_auth(access_token)
120 .body(insert_request_gzipped.data)
121 .build()?;
122
123 let resp = self.client.execute(request).await?;
124
125 process_response(resp).await
126 }
127
128 pub async fn list(
135 &self,
136 project_id: &str,
137 dataset_id: &str,
138 table_id: &str,
139 parameters: ListQueryParameters,
140 ) -> Result<TableDataListResponse, BQError> {
141 let req_url = format!(
142 "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/data",
143 base_url = self.base_url,
144 project_id = urlencode(project_id),
145 dataset_id = urlencode(dataset_id),
146 table_id = urlencode(table_id)
147 );
148
149 let access_token = self.auth.access_token().await?;
150
151 let request = self
152 .client
153 .get(req_url.as_str())
154 .bearer_auth(access_token)
155 .query(¶meters)
156 .build()?;
157
158 let resp = self.client.execute(request).await?;
159
160 process_response(resp).await
161 }
162}
163
164#[derive(Debug, Serialize, Deserialize)]
165#[serde(rename_all = "camelCase")]
166pub struct ListQueryParameters {
167 #[serde(skip_serializing_if = "Option::is_none")]
169 pub start_index: Option<String>,
170 #[serde(skip_serializing_if = "Option::is_none")]
172 pub max_results: Option<u32>,
173 #[serde(skip_serializing_if = "Option::is_none")]
176 pub page_token: Option<String>,
177 #[serde(skip_serializing_if = "Option::is_none")]
180 pub selected_fields: Option<String>,
181 #[serde(skip_serializing_if = "Option::is_none")]
183 pub format_options: Option<DataFormatOptions>,
184}
185
#[cfg(test)]
mod test {
    use crate::error::BQError;
    use crate::model::dataset::Dataset;
    use crate::model::field_type::FieldType;
    use crate::model::table::Table;
    use crate::model::table_data_insert_all_request::TableDataInsertAllRequest;
    #[cfg(feature = "gzip")]
    use crate::model::table_data_insert_all_request::TableDataInsertAllRequestGzipped;
    use crate::model::table_field_schema::TableFieldSchema;
    use crate::model::table_schema::TableSchema;
    use crate::{env_vars, Client};

    /// Row shape matching the three-column schema created by the test below.
    #[derive(Serialize)]
    struct Row {
        col1: String,
        col2: i64,
        col3: bool,
    }

    /// Builds an insert request holding exactly one [`Row`], passing `None`
    /// as the first argument of `add_row`.
    fn single_row_request(col1: &str, col2: i64, col3: bool) -> Result<TableDataInsertAllRequest, BQError> {
        let mut request = TableDataInsertAllRequest::new();
        request.add_row(
            None,
            Row {
                col1: col1.into(),
                col2,
                col3,
            },
        )?;
        Ok(request)
    }

    /// Creates a dataset and table from the values returned by `env_vars`,
    /// inserts one row via `insert_all` (and one more via `insert_all_gzipped`
    /// when the `gzip` feature is on), then deletes the table and dataset.
    #[tokio::test]
    async fn test() -> Result<(), BQError> {
        let (project_id, dataset_prefix, table_id, sa_key) = env_vars();
        let dataset_id = format!("{dataset_prefix}_tabledata");
        let (project_id, dataset_id, table_id) = (&project_id, &dataset_id, &table_id);

        let client = Client::from_service_account_key_file(&sa_key).await?;

        // Clear anything a previous run may have left; the outcomes are not inspected.
        client.table().delete_if_exists(project_id, dataset_id, table_id).await;
        client.dataset().delete_if_exists(project_id, dataset_id, true).await;

        let dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;

        let schema = TableSchema::new(vec![
            TableFieldSchema::new("col1", FieldType::String),
            TableFieldSchema::new("col2", FieldType::Int64),
            TableFieldSchema::new("col3", FieldType::Boolean),
        ]);
        let table = dataset
            .create_table(&client, Table::from_dataset(&dataset, table_id, schema))
            .await?;

        // Default insert path.
        let plain_request = single_row_request("val1", 2, false)?;
        let plain_outcome = client
            .tabledata()
            .insert_all(project_id, dataset_id, table_id, plain_request)
            .await;
        assert!(plain_outcome.is_ok(), "Error: {:?}", plain_outcome);

        // Explicit pre-compressed insert path.
        #[cfg(feature = "gzip")]
        {
            let gzipped_request = TableDataInsertAllRequestGzipped::try_from(single_row_request("val2", 3, true)?)
                .expect("Failed to gzip insert request");

            let gzipped_outcome = client
                .tabledata()
                .insert_all_gzipped(project_id, dataset_id, table_id, gzipped_request)
                .await;
            assert!(gzipped_outcome.is_ok(), "Error: {:?}", gzipped_outcome);
        }

        table.delete(&client).await?;
        dataset.delete(&client, true).await?;

        Ok(())
    }
}