google_cloud_bigquery/http/
bigquery_tabledata_client.rs1use std::sync::Arc;
2
3use serde::Serialize;
4
5use crate::http::bigquery_client::BigqueryClient;
6use crate::http::error::Error;
7use crate::http::tabledata;
8use crate::http::tabledata::insert_all::{InsertAllRequest, InsertAllResponse};
9use crate::http::tabledata::list::{FetchDataRequest, FetchDataResponse};
10
/// Client for the BigQuery `tabledata` REST resource.
///
/// Thin wrapper around a shared [`BigqueryClient`], which supplies the API
/// endpoint and the HTTP transport used to build and send requests.
/// `Clone` is cheap: only the `Arc` handle is duplicated.
#[derive(Debug, Clone)]
pub struct BigqueryTabledataClient {
    // Shared lower-level client; provides endpoint(), http() and send().
    inner: Arc<BigqueryClient>,
}
15
16impl BigqueryTabledataClient {
17 pub fn new(inner: Arc<BigqueryClient>) -> Self {
18 Self { inner }
19 }
20
21 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
45 pub async fn insert<T: Serialize>(
46 &self,
47 project_id: &str,
48 dataset_id: &str,
49 table_id: &str,
50 req: &InsertAllRequest<T>,
51 ) -> Result<InsertAllResponse, Error> {
52 let builder = tabledata::insert_all::build(
53 self.inner.endpoint(),
54 self.inner.http(),
55 project_id,
56 dataset_id,
57 table_id,
58 req,
59 );
60 self.inner.send(builder).await
61 }
62
63 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
65 pub async fn read(
66 &self,
67 project_id: &str,
68 dataset_id: &str,
69 table_id: &str,
70 req: &FetchDataRequest,
71 ) -> Result<FetchDataResponse, Error> {
72 let builder =
73 tabledata::list::build(self.inner.endpoint(), self.inner.http(), project_id, dataset_id, table_id, req);
74 self.inner.send(builder).await
75 }
76}
77
#[cfg(test)]
mod test {

    use std::sync::Arc;

    use serial_test::serial;
    use time::OffsetDateTime;

    use crate::http::bigquery_client::test::{create_client, create_table_schema, dataset_name, TestData};
    use crate::http::bigquery_table_client::BigqueryTableClient;
    use crate::http::bigquery_tabledata_client::BigqueryTabledataClient;
    use crate::http::table::Table;
    use crate::http::tabledata::insert_all::{InsertAllRequest, Row};
    use crate::http::tabledata::list;
    use crate::http::tabledata::list::FetchDataRequest;

    /// Integration test for `insert`: creates a throwaway table, streams one
    /// row given as raw JSON and one given as a typed `TestData` struct, and
    /// asserts neither call reports per-row insert errors. Cleans up the
    /// table at the end. Requires live BigQuery credentials.
    #[tokio::test]
    #[serial]
    pub async fn insert_all() {
        let dataset = dataset_name("table");
        let (client, project) = create_client().await;
        let client = Arc::new(client);
        let table_client = BigqueryTableClient::new(client.clone());
        // Last use of the Arc handle: move it instead of cloning
        // (fixes a clippy::redundant_clone).
        let client = BigqueryTabledataClient::new(client);
        // Timestamp suffix keeps concurrent runs from colliding on table names.
        let mut table1 = Table::default();
        table1.table_reference.dataset_id = dataset.to_string();
        table1.table_reference.project_id = project.to_string();
        table1.table_reference.table_id = format!("table_data_{}", OffsetDateTime::now_utc().unix_timestamp());
        table1.schema = Some(create_table_schema());
        let table1 = table_client.create(&table1).await.unwrap();
        let ref1 = table1.table_reference;

        // Row 1: untyped payload exercising every column kind in the schema
        // (scalars, arrays, nested structs, JSON-in-string, base64 binary).
        let mut req = InsertAllRequest::<serde_json::Value>::default();
        req.rows.push(Row {
            insert_id: None,
            json: serde_json::from_str(
                r#"
            {"col_string": "test1", "col_number": 1, "col_number_array": [1,2,3], "col_timestamp":"2022-10-23T00:00:00", "col_json":"{\"field\":100}","col_json_array":["{\"field\":100}","{\"field\":200}"],"col_struct": {"f1":true, "f2":[3,4]},"col_struct_array": [{"f1":true, "f2":[3,4]},{"f1":false, "f2":[30,40]}], "col_binary": "dGVzdAo="}
            "#,
            )
            .unwrap(),
        });
        let res = client
            .insert(ref1.project_id.as_str(), ref1.dataset_id.as_str(), ref1.table_id.as_str(), &req)
            .await
            .unwrap();
        assert!(res.insert_errors.is_none());

        // Row 2: same table, but the payload is a typed struct serialized by serde.
        let mut req2 = InsertAllRequest::<TestData>::default();
        req2.rows.push(Row {
            insert_id: None,
            json: TestData::default(1, OffsetDateTime::now_utc()),
        });
        let res2 = client
            .insert(
                ref1.project_id.as_str(),
                ref1.dataset_id.as_str(),
                ref1.table_id.as_str(),
                &req2,
            )
            .await
            .unwrap();
        assert!(res2.insert_errors.is_none());

        table_client
            .delete(ref1.project_id.as_str(), ref1.dataset_id.as_str(), ref1.table_id.as_str())
            .await
            .unwrap();
    }

    /// Integration test for `read`: pages through the pre-provisioned
    /// `reading_data` table 500 rows at a time until `page_token` runs out,
    /// and asserts exactly 1000 rows were fetched in total.
    #[tokio::test]
    #[serial]
    pub async fn read_all() {
        let dataset = dataset_name("job");
        let (client, project) = create_client().await;
        let client = Arc::new(client);
        // Only consumer of the Arc: move it instead of cloning
        // (fixes a clippy::redundant_clone).
        let client = BigqueryTabledataClient::new(client);

        let mut fetch_request = FetchDataRequest {
            max_results: Some(500),
            ..Default::default()
        };
        let mut data: Vec<list::Tuple> = vec![];
        loop {
            let result = client
                .read(project.as_str(), dataset.as_str(), "reading_data", &fetch_request)
                .await
                .unwrap();
            if let Some(rows) = result.rows {
                data.extend(rows);
            }
            if result.page_token.is_none() {
                break;
            }
            // Carry the cursor forward for the next page.
            fetch_request.page_token = result.page_token;
        }
        assert_eq!(data.len(), 1000, "{:?}", data.pop());
    }
}