gcp_bigquery_client/
tabledata.rs

//! Manage BigQuery table data API (streaming inserts and listing table rows).
use std::sync::Arc;

use crate::auth::Authenticator;
use crate::error::BQError;
use crate::model::data_format_options::DataFormatOptions;
use crate::model::table_data_insert_all_request::TableDataInsertAllRequest;
#[cfg(feature = "gzip")]
use crate::model::table_data_insert_all_request::TableDataInsertAllRequestGzipped;
use crate::model::table_data_insert_all_response::TableDataInsertAllResponse;
use crate::model::table_data_list_response::TableDataListResponse;
use crate::{process_response, urlencode, BIG_QUERY_V2_URL};
use reqwest::Client;
use serde::{Deserialize, Serialize};

#[cfg(feature = "gzip")]
use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE};

/// A table data API handler.
#[derive(Clone)]
pub struct TableDataApi {
    client: Client,
    auth: Arc<dyn Authenticator>,
    base_url: String,
}

impl TableDataApi {
    pub(crate) fn new(client: Client, auth: Arc<dyn Authenticator>) -> Self {
        Self {
            client,
            auth,
            base_url: BIG_QUERY_V2_URL.to_string(),
        }
    }

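    /// Overrides the default BigQuery v2 base URL used when building request URLs.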
    pub(crate) fn with_base_url(&mut self, base_url: String) -> &mut Self {
        self.base_url = base_url;
        self
    }

    /// Streams data into BigQuery one record at a time without needing to run a load job. Requires the WRITER dataset
    /// role.
    ///
    /// # Arguments
    /// * `project_id` - Project ID of the inserted data
    /// * `dataset_id` - Dataset ID of the inserted data
    /// * `table_id` - Table ID of the inserted data
    /// * `insert_request` - Data to insert.
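    ///
    /// # Example
    /// A minimal sketch (the service account key path, project, dataset and table IDs are
    /// placeholders):
    /// ```no_run
    /// # use gcp_bigquery_client::error::BQError;
    /// use gcp_bigquery_client::model::table_data_insert_all_request::TableDataInsertAllRequest;
    /// use gcp_bigquery_client::Client;
    ///
    /// #[derive(serde::Serialize)]
    /// struct MyRow {
    ///     col1: String,
    /// }
    ///
    /// # async fn example() -> Result<(), BQError> {
    /// let client = Client::from_service_account_key_file("sa_key.json").await?;
    ///
    /// // Build the insert request; any `Serialize` type can be used as a row.
    /// let mut insert_request = TableDataInsertAllRequest::new();
    /// insert_request.add_row(None, MyRow { col1: "val1".into() })?;
    ///
    /// let _response = client
    ///     .tabledata()
    ///     .insert_all("my_project", "my_dataset", "my_table", insert_request)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```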
    pub async fn insert_all(
        &self,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        insert_request: TableDataInsertAllRequest,
    ) -> Result<TableDataInsertAllResponse, BQError> {
        let req_url = format!(
            "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/insertAll",
            base_url = self.base_url,
            project_id = urlencode(project_id),
            dataset_id = urlencode(dataset_id),
            table_id = urlencode(table_id)
        );

        let access_token = self.auth.access_token().await?;

        // With the gzip feature enabled, the payload is gzip-compressed before it is sent.
        #[cfg(feature = "gzip")]
        let request = {
            let insert_request_gzipped = TableDataInsertAllRequestGzipped::try_from(insert_request)?;
            self.client
                .post(&req_url)
                .header(CONTENT_ENCODING, "gzip")
                .header(CONTENT_TYPE, "application/octet-stream")
                .bearer_auth(access_token)
                .body(insert_request_gzipped.data)
                .build()?
        };

        // Without the gzip feature, the payload is sent as plain JSON.
        #[cfg(not(feature = "gzip"))]
        let request = self
            .client
            .post(&req_url)
            .bearer_auth(access_token)
            .json(&insert_request)
            .build()?;

        let resp = self.client.execute(request).await?;

        process_response(resp).await
    }

    /// Streams already-gzipped data into BigQuery one record at a time without needing to run a load job. Requires
    /// the WRITER dataset role.
    ///
    /// # Arguments
    /// * `project_id` - Project ID of the inserted data
    /// * `dataset_id` - Dataset ID of the inserted data
    /// * `table_id` - Table ID of the inserted data
    /// * `insert_request_gzipped` - Gzipped data to insert.
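    ///
    /// # Example
    /// A minimal sketch (requires the `gzip` feature; the key path and IDs are placeholders):
    /// ```no_run
    /// # use gcp_bigquery_client::error::BQError;
    /// use gcp_bigquery_client::model::table_data_insert_all_request::{
    ///     TableDataInsertAllRequest, TableDataInsertAllRequestGzipped,
    /// };
    /// use gcp_bigquery_client::Client;
    ///
    /// #[derive(serde::Serialize)]
    /// struct MyRow {
    ///     col1: String,
    /// }
    ///
    /// # async fn example() -> Result<(), BQError> {
    /// let client = Client::from_service_account_key_file("sa_key.json").await?;
    ///
    /// let mut insert_request = TableDataInsertAllRequest::new();
    /// insert_request.add_row(None, MyRow { col1: "val1".into() })?;
    ///
    /// // Gzip-compress the insert request before sending it.
    /// let gzipped_request = TableDataInsertAllRequestGzipped::try_from(insert_request)?;
    ///
    /// let _response = client
    ///     .tabledata()
    ///     .insert_all_gzipped("my_project", "my_dataset", "my_table", gzipped_request)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```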
    #[cfg(feature = "gzip")]
    pub async fn insert_all_gzipped(
        &self,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        insert_request_gzipped: TableDataInsertAllRequestGzipped,
    ) -> Result<TableDataInsertAllResponse, BQError> {
        let req_url = format!(
            "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/insertAll",
            base_url = self.base_url,
            project_id = urlencode(project_id),
            dataset_id = urlencode(dataset_id),
            table_id = urlencode(table_id)
        );

        let access_token = self.auth.access_token().await?;

        let request = self
            .client
            .post(&req_url)
            .header(CONTENT_ENCODING, "gzip")
            .header(CONTENT_TYPE, "application/octet-stream")
            .bearer_auth(access_token)
            .body(insert_request_gzipped.data)
            .build()?;

        let resp = self.client.execute(request).await?;

        process_response(resp).await
    }

    /// Lists the content of a table in rows.
    ///
    /// # Arguments
    /// * `project_id` - Project ID of the table to list.
    /// * `dataset_id` - Dataset ID of the table to list.
    /// * `table_id` - Table ID of the table to list.
    /// * `parameters` - Additional query parameters.
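    ///
    /// # Example
    /// A minimal sketch (the key path and IDs are placeholders; the `ListQueryParameters`
    /// import path is assumed from this module's location):
    /// ```no_run
    /// # use gcp_bigquery_client::error::BQError;
    /// use gcp_bigquery_client::tabledata::ListQueryParameters;
    /// use gcp_bigquery_client::Client;
    ///
    /// # async fn example() -> Result<(), BQError> {
    /// let client = Client::from_service_account_key_file("sa_key.json").await?;
    ///
    /// // Fetch at most ten rows, leaving the other options unset.
    /// let parameters = ListQueryParameters {
    ///     start_index: None,
    ///     max_results: Some(10),
    ///     page_token: None,
    ///     selected_fields: None,
    ///     format_options: None,
    /// };
    ///
    /// let _rows = client
    ///     .tabledata()
    ///     .list("my_project", "my_dataset", "my_table", parameters)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```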
    pub async fn list(
        &self,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        parameters: ListQueryParameters,
    ) -> Result<TableDataListResponse, BQError> {
        let req_url = format!(
            "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/data",
            base_url = self.base_url,
            project_id = urlencode(project_id),
            dataset_id = urlencode(dataset_id),
            table_id = urlencode(table_id)
        );

        let access_token = self.auth.access_token().await?;

        let request = self
            .client
            .get(req_url.as_str())
            .bearer_auth(access_token)
            .query(&parameters)
            .build()?;

        let resp = self.client.execute(request).await?;

        process_response(resp).await
    }
}

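/// Optional query parameters accepted by [`TableDataApi::list`].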
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListQueryParameters {
    /// Start row index of the table.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub start_index: Option<String>,
    /// Row limit of the table.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_results: Option<u32>,
    /// Page token, returned by a previous call, identifying the result set to resume from.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub page_token: Option<String>,
    /// Subset of fields to return, supports select into sub fields.
    /// Example: selectedFields = "a,e.d.f";
    #[serde(skip_serializing_if = "Option::is_none")]
    pub selected_fields: Option<String>,
    /// Output format adjustments, e.g. returning timestamp field values as usec int64 instead of double.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub format_options: Option<DataFormatOptions>,
}

#[cfg(test)]
mod test {
    use crate::error::BQError;
    use crate::model::dataset::Dataset;
    use crate::model::field_type::FieldType;
    use crate::model::table::Table;
    use crate::model::table_data_insert_all_request::TableDataInsertAllRequest;
    #[cfg(feature = "gzip")]
    use crate::model::table_data_insert_all_request::TableDataInsertAllRequestGzipped;
    use crate::model::table_field_schema::TableFieldSchema;
    use crate::model::table_schema::TableSchema;
    use crate::{env_vars, Client};
    use serde::Serialize;

    #[derive(Serialize)]
    struct Row {
        col1: String,
        col2: i64,
        col3: bool,
    }

    #[tokio::test]
    async fn test() -> Result<(), BQError> {
        let (ref project_id, ref dataset_id, ref table_id, ref sa_key) = env_vars();
        let dataset_id = &format!("{dataset_id}_tabledata");

        let client = Client::from_service_account_key_file(sa_key).await?;

        client.table().delete_if_exists(project_id, dataset_id, table_id).await;
        client.dataset().delete_if_exists(project_id, dataset_id, true).await;

        // Create dataset
        let dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;

        let table = dataset
            .create_table(
                &client,
                Table::from_dataset(
                    &dataset,
                    table_id,
                    TableSchema::new(vec![
                        TableFieldSchema::new("col1", FieldType::String),
                        TableFieldSchema::new("col2", FieldType::Int64),
                        TableFieldSchema::new("col3", FieldType::Boolean),
                    ]),
                ),
            )
            .await?;

        // Insert data via BigQuery Streaming API
        let mut insert_request = TableDataInsertAllRequest::new();
        insert_request.add_row(
            None,
            Row {
                col1: "val1".into(),
                col2: 2,
                col3: false,
            },
        )?;

        let result = client
            .tabledata()
            .insert_all(project_id, dataset_id, table_id, insert_request)
            .await;
        assert!(result.is_ok(), "Error: {:?}", result);

        #[cfg(feature = "gzip")]
        {
            let mut insert_request = TableDataInsertAllRequest::new();
            insert_request.add_row(
                None,
                Row {
                    col1: "val2".into(),
                    col2: 3,
                    col3: true,
                },
            )?;

            let insert_request_gzipped =
                TableDataInsertAllRequestGzipped::try_from(insert_request).expect("Failed to gzip insert request");

            let result_gzipped = client
                .tabledata()
                .insert_all_gzipped(project_id, dataset_id, table_id, insert_request_gzipped)
                .await;
            assert!(result_gzipped.is_ok(), "Error: {:?}", result_gzipped);
        }

        // Remove table and dataset
        table.delete(&client).await?;
        dataset.delete(&client, true).await?;

        Ok(())
    }
}