google_cloud_bigquery/http/
bigquery_tabledata_client.rs

1use std::sync::Arc;
2
3use serde::Serialize;
4
5use crate::http::bigquery_client::BigqueryClient;
6use crate::http::error::Error;
7use crate::http::tabledata;
8use crate::http::tabledata::insert_all::{InsertAllRequest, InsertAllResponse};
9use crate::http::tabledata::list::{FetchDataRequest, FetchDataResponse};
10
11#[derive(Debug, Clone)]
12pub struct BigqueryTabledataClient {
13    inner: Arc<BigqueryClient>,
14}
15
16impl BigqueryTabledataClient {
17    pub fn new(inner: Arc<BigqueryClient>) -> Self {
18        Self { inner }
19    }
20
21    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insert
22    /// ```rust
23    /// use google_cloud_bigquery::http::tabledata::insert_all::{InsertAllRequest, Row};
24    /// use google_cloud_bigquery::http::bigquery_tabledata_client::BigqueryTabledataClient;
25    ///
26    /// #[derive(serde::Serialize)]
27    /// pub struct TestData {
28    ///     pub col1: String,
29    /// }
30    ///
31    /// async fn run(client: &BigqueryTabledataClient, project_id: &str, data: TestData) {
32    ///     let data1 = Row {
33    ///         insert_id: None,
34    ///         json: data,
35    ///     };
36    ///     let request = InsertAllRequest {
37    ///         rows: vec![data1],
38    ///         ..Default::default()
39    ///     };
40    ///     let result = client.insert(project_id, "dataset", "table", &request).await.unwrap();
41    ///     let error = result.insert_errors;
42    /// }
43    /// ```
44    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
45    pub async fn insert<T: Serialize>(
46        &self,
47        project_id: &str,
48        dataset_id: &str,
49        table_id: &str,
50        req: &InsertAllRequest<T>,
51    ) -> Result<InsertAllResponse, Error> {
52        let builder = tabledata::insert_all::build(
53            self.inner.endpoint(),
54            self.inner.http(),
55            project_id,
56            dataset_id,
57            table_id,
58            req,
59        );
60        self.inner.send(builder).await
61    }
62
63    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list
64    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
65    pub async fn read(
66        &self,
67        project_id: &str,
68        dataset_id: &str,
69        table_id: &str,
70        req: &FetchDataRequest,
71    ) -> Result<FetchDataResponse, Error> {
72        let builder =
73            tabledata::list::build(self.inner.endpoint(), self.inner.http(), project_id, dataset_id, table_id, req);
74        self.inner.send(builder).await
75    }
76}
77
78#[cfg(test)]
79mod test {
80
81    use std::sync::Arc;
82
83    use serial_test::serial;
84    use time::OffsetDateTime;
85
86    use crate::http::bigquery_client::test::{create_client, create_table_schema, dataset_name, TestData};
87    use crate::http::bigquery_table_client::BigqueryTableClient;
88    use crate::http::bigquery_tabledata_client::BigqueryTabledataClient;
89    use crate::http::table::Table;
90    use crate::http::tabledata::insert_all::{InsertAllRequest, Row};
91    use crate::http::tabledata::list;
92    use crate::http::tabledata::list::FetchDataRequest;
93
94    #[tokio::test]
95    #[serial]
96    pub async fn insert_all() {
97        let dataset = dataset_name("table");
98        let (client, project) = create_client().await;
99        let client = Arc::new(client);
100        let table_client = BigqueryTableClient::new(client.clone());
101        let client = BigqueryTabledataClient::new(client.clone());
102        let mut table1 = Table::default();
103        table1.table_reference.dataset_id = dataset.to_string();
104        table1.table_reference.project_id = project.to_string();
105        table1.table_reference.table_id = format!("table_data_{}", OffsetDateTime::now_utc().unix_timestamp());
106        table1.schema = Some(create_table_schema());
107        let table1 = table_client.create(&table1).await.unwrap();
108        let ref1 = table1.table_reference;
109
110        // insert as json string
111        let mut req = InsertAllRequest::<serde_json::Value>::default();
112        req.rows.push(Row {
113            insert_id: None,
114            json: serde_json::from_str(
115                r#"
116                {"col_string": "test1", "col_number": 1, "col_number_array": [1,2,3], "col_timestamp":"2022-10-23T00:00:00", "col_json":"{\"field\":100}","col_json_array":["{\"field\":100}","{\"field\":200}"],"col_struct": {"f1":true, "f2":[3,4]},"col_struct_array": [{"f1":true, "f2":[3,4]},{"f1":false, "f2":[30,40]}], "col_binary": "dGVzdAo="}
117            "#,
118            )
119                .unwrap(),
120        });
121        let res = client
122            .insert(ref1.project_id.as_str(), ref1.dataset_id.as_str(), ref1.table_id.as_str(), &req)
123            .await
124            .unwrap();
125        assert!(res.insert_errors.is_none());
126
127        // isnert as struct
128        let mut req2 = InsertAllRequest::<TestData>::default();
129        req2.rows.push(Row {
130            insert_id: None,
131            json: TestData::default(1, OffsetDateTime::now_utc()),
132        });
133        let res2 = client
134            .insert(
135                ref1.project_id.as_str(),
136                ref1.dataset_id.as_str(),
137                ref1.table_id.as_str(),
138                &req2,
139            )
140            .await
141            .unwrap();
142        assert!(res2.insert_errors.is_none());
143
144        table_client
145            .delete(ref1.project_id.as_str(), ref1.dataset_id.as_str(), ref1.table_id.as_str())
146            .await
147            .unwrap();
148    }
149
150    #[tokio::test]
151    #[serial]
152    pub async fn read_all() {
153        let dataset = dataset_name("job");
154        let (client, project) = create_client().await;
155        let client = Arc::new(client);
156        let client = BigqueryTabledataClient::new(client.clone());
157
158        // fetch
159        let mut fetch_request = FetchDataRequest {
160            max_results: Some(500),
161            ..Default::default()
162        };
163        let mut data: Vec<list::Tuple> = vec![];
164        loop {
165            let result = client
166                .read(project.as_str(), dataset.as_str(), "reading_data", &fetch_request)
167                .await
168                .unwrap();
169            if let Some(rows) = result.rows {
170                data.extend(rows);
171            }
172            if result.page_token.is_none() {
173                break;
174            }
175            fetch_request.page_token = result.page_token
176        }
177        assert_eq!(data.len(), 1000, "{:?}", data.pop());
178    }
179}