use backon::{ExponentialBuilder, Retryable};
use core::time::Duration;
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::retry::RetrySetting;
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    read_session, CreateReadSessionRequest, DataFormat, ReadSession,
};
use google_cloud_token::{TokenSource, TokenSourceProvider};
use std::borrow::Cow;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;

use crate::grpc::apiv1::conn_pool::ConnectionManager;
use crate::http::bigquery_client::BigqueryClient;
use crate::http::bigquery_dataset_client::BigqueryDatasetClient;
use crate::http::bigquery_job_client::BigqueryJobClient;
use crate::http::bigquery_model_client::BigqueryModelClient;
use crate::http::bigquery_routine_client::BigqueryRoutineClient;
use crate::http::bigquery_row_access_policy_client::BigqueryRowAccessPolicyClient;
use crate::http::bigquery_table_client::BigqueryTableClient;
use crate::http::bigquery_tabledata_client::BigqueryTabledataClient;
use crate::http::job::get_query_results::GetQueryResultsRequest;
use crate::http::job::query::QueryRequest;
use crate::http::job::{is_script, is_select_query, JobConfiguration, JobReference, JobStatistics, JobType};
use crate::http::table::TableReference;
use crate::query::{QueryOption, QueryResult};
use crate::storage;
use crate::{http, query};

#[cfg(feature = "auth")]
pub use google_cloud_auth;

const JOB_RETRY_REASONS: [&str; 3] = ["backendError", "rateLimitExceeded", "internalError"];

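/// Configuration for the BigQuery REST (HTTP) client.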
#[derive(Debug)]
pub struct HttpClientConfig {
    client: Option<reqwest_middleware::ClientWithMiddleware>,
    bigquery_endpoint: Cow<'static, str>,
    token_source_provider: Box<dyn TokenSourceProvider>,
    debug: bool,
}

impl HttpClientConfig {
    pub fn new_with_emulator(http_addr: impl Into<Cow<'static, str>>) -> Self {
        Self {
            client: None,
            bigquery_endpoint: http_addr.into(),
            token_source_provider: Box::new(EmptyTokenSourceProvider {}),
            debug: false,
        }
    }

    pub fn new(http_token_source_provider: Box<dyn TokenSourceProvider>) -> Self {
        Self {
            client: None,
            bigquery_endpoint: "https://bigquery.googleapis.com".into(),
            token_source_provider: http_token_source_provider,
            debug: false,
        }
    }

    pub fn with_debug(mut self, value: bool) -> Self {
        self.debug = value;
        self
    }

    pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self {
        self.client = Some(value);
        self
    }

    pub fn with_endpoint(mut self, value: impl Into<Cow<'static, str>>) -> Self {
        self.bigquery_endpoint = value.into();
        self
    }

    pub fn create_client(self) -> Arc<BigqueryClient> {
        let ts = self.token_source_provider.token_source();
        Arc::new(BigqueryClient::new(
            ts,
            self.bigquery_endpoint.as_ref(),
            self.client
                .unwrap_or_else(|| reqwest_middleware::ClientBuilder::new(reqwest::Client::default()).build()),
            self.debug,
        ))
    }
}

#[cfg(feature = "auth")]
impl HttpClientConfig {
    fn bigquery_http_auth_config() -> google_cloud_auth::project::Config<'static> {
        google_cloud_auth::project::Config::default().with_scopes(&http::bigquery_client::SCOPES)
    }

    pub fn default_token_provider() -> impl Future<
        Output = Result<google_cloud_auth::token::DefaultTokenSourceProvider, google_cloud_auth::error::Error>,
    > + Send
           + 'static {
        google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::bigquery_http_auth_config())
    }

    pub fn default_token_provider_with(
        credentials: google_cloud_auth::credentials::CredentialsFile,
    ) -> impl Future<
        Output = Result<google_cloud_auth::token::DefaultTokenSourceProvider, google_cloud_auth::error::Error>,
    > + Send
           + 'static {
        google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            HttpClientConfig::bigquery_http_auth_config(),
            Box::new(credentials),
        )
    }
}

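/// Configuration for [`Client`]: REST client settings plus the gRPC
/// environment and per-direction streaming channel settings.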
#[derive(Debug)]
pub struct ClientConfig {
    http: HttpClientConfig,
    environment: Environment,
    streaming_read_config: ChannelConfig,
    streaming_write_config: StreamingWriteConfig,
}

#[derive(Clone, Debug, Default)]
pub struct StreamingWriteConfig {
    channel_config: ChannelConfig,
    max_insert_count: usize,
}

impl StreamingWriteConfig {
    pub fn with_channel_config(mut self, value: ChannelConfig) -> Self {
        self.channel_config = value;
        self
    }
    pub fn with_max_insert_count(mut self, value: usize) -> Self {
        self.max_insert_count = value;
        self
    }
}

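/// Settings for a pool of gRPC channels used by the Storage API clients.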
#[derive(Clone, Debug)]
pub struct ChannelConfig {
    num_channels: usize,
    connect_timeout: Option<Duration>,
    timeout: Option<Duration>,
}

impl ChannelConfig {
    pub fn with_num_channels(mut self, value: usize) -> Self {
        self.num_channels = value;
        self
    }
    pub fn with_connect_timeout(mut self, value: Duration) -> Self {
        self.connect_timeout = Some(value);
        self
    }
    pub fn with_timeout(mut self, value: Duration) -> Self {
        self.timeout = Some(value);
        self
    }

    async fn into_connection_manager(
        self,
        environment: &Environment,
    ) -> Result<ConnectionManager, google_cloud_gax::conn::Error> {
        ConnectionManager::new(
            self.num_channels,
            environment,
            &ConnectionOptions {
                timeout: self.timeout,
                connect_timeout: self.connect_timeout,
            },
        )
        .await
    }
}

impl Default for ChannelConfig {
    fn default() -> Self {
        Self {
            num_channels: 4,
            connect_timeout: Some(Duration::from_secs(30)),
            timeout: None,
        }
    }
}

#[derive(Debug)]
pub struct EmptyTokenSourceProvider {}

#[derive(Debug)]
pub struct EmptyTokenSource {}

#[async_trait::async_trait]
impl TokenSource for EmptyTokenSource {
    async fn token(&self) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
        Ok("".to_string())
    }
}

impl TokenSourceProvider for EmptyTokenSourceProvider {
    fn token_source(&self) -> Arc<dyn TokenSource> {
        Arc::new(EmptyTokenSource {})
    }
}

impl ClientConfig {
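    /// Creates a config that targets a local emulator instead of Google Cloud:
    /// `grpc_host` for the Storage API and `http_addr` for the REST API.
    /// A minimal sketch (assumes an emulator listening on the ports below):
    /// ```no_run
    /// use google_cloud_bigquery::client::ClientConfig;
    ///
    /// let config = ClientConfig::new_with_emulator("localhost:9060", "http://localhost:9050");
    /// ```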
    pub fn new_with_emulator(grpc_host: &str, http_addr: impl Into<Cow<'static, str>>) -> Self {
        Self {
            http: HttpClientConfig::new_with_emulator(http_addr),
            environment: Environment::Emulator(grpc_host.to_string()),
            streaming_read_config: ChannelConfig::default(),
            streaming_write_config: StreamingWriteConfig::default(),
        }
    }

    pub fn new(
        http_token_source_provider: Box<dyn TokenSourceProvider>,
        grpc_token_source_provider: Box<dyn TokenSourceProvider>,
    ) -> Self {
        Self {
            http: HttpClientConfig::new(http_token_source_provider),
            environment: Environment::GoogleCloud(grpc_token_source_provider),
            streaming_read_config: ChannelConfig::default(),
            streaming_write_config: StreamingWriteConfig::default(),
        }
    }

    pub fn with_debug(mut self, value: bool) -> Self {
        self.http.debug = value;
        self
    }

    pub fn with_streaming_read_config(mut self, value: ChannelConfig) -> Self {
        self.streaming_read_config = value;
        self
    }

    pub fn with_streaming_write_config(mut self, value: StreamingWriteConfig) -> Self {
        self.streaming_write_config = value;
        self
    }

    pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self {
        self.http.client = Some(value);
        self
    }

    pub fn with_endpoint(mut self, value: impl Into<Cow<'static, str>>) -> Self {
        self.http.bigquery_endpoint = value.into();
        self
    }
}

use crate::http::job::get::GetJobRequest;
use crate::http::job::list::ListJobsRequest;

use crate::grpc::apiv1::bigquery_client::StreamingReadClient;
use crate::storage_write::stream::{buffered, committed, default, pending};
use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient;

#[cfg(feature = "auth")]
impl ClientConfig {
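    /// Builds a config from Application Default Credentials and returns it
    /// together with the detected project id, if any.
    /// A minimal sketch (assumes the `auth` feature and that ADC are available
    /// in the environment):
    /// ```ignore
    /// use google_cloud_bigquery::client::{Client, ClientConfig};
    ///
    /// async fn run() {
    ///     let (config, _project_id) = ClientConfig::new_with_auth().await.unwrap();
    ///     let client = Client::new(config).await.unwrap();
    /// }
    /// ```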
    pub async fn new_with_auth() -> Result<(Self, Option<String>), google_cloud_auth::error::Error> {
        let ts_http = HttpClientConfig::default_token_provider().await?;
        let ts_grpc =
            google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::bigquery_grpc_auth_config()).await?;
        let project_id = ts_grpc.project_id.clone();
        let config = Self::new(Box::new(ts_http), Box::new(ts_grpc));
        Ok((config, project_id))
    }

    pub async fn new_with_credentials(
        credentials: google_cloud_auth::credentials::CredentialsFile,
    ) -> Result<(Self, Option<String>), google_cloud_auth::error::Error> {
        let ts_http = HttpClientConfig::default_token_provider_with(credentials.clone()).await?;
        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_grpc_auth_config(),
            Box::new(credentials),
        )
        .await?;
        let project_id = ts_grpc.project_id.clone();
        let config = Self::new(Box::new(ts_http), Box::new(ts_grpc));
        Ok((config, project_id))
    }

    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        google_cloud_auth::project::Config::default()
            .with_audience(crate::grpc::apiv1::conn_pool::AUDIENCE)
            .with_scopes(&crate::grpc::apiv1::conn_pool::SCOPES)
    }
}

#[derive(thiserror::Error, Debug)]
pub enum QueryError {
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
    JobHttp(#[from] http::error::Error),
    #[error("job has no destination table to read: job={0:?}")]
    NoDestinationTable(JobReference),
    #[error("failed to resolve table for script job: no child jobs found: job={0:?}")]
    NoChildJobs(JobReference),
    #[error("job type must be query: job={0:?}, jobType={1:?}")]
    InvalidJobType(JobReference, String),
    #[error(transparent)]
    RunQuery(#[from] query::run::Error),
}

#[derive(Clone)]
pub struct Client {
    dataset_client: BigqueryDatasetClient,
    table_client: BigqueryTableClient,
    tabledata_client: BigqueryTabledataClient,
    job_client: BigqueryJobClient,
    routine_client: BigqueryRoutineClient,
    row_access_policy_client: BigqueryRowAccessPolicyClient,
    model_client: BigqueryModelClient,
    streaming_read_conn_pool: Arc<ConnectionManager>,
    streaming_write_conn_pool: Arc<ConnectionManager>,
    streaming_write_max_insert_count: usize,
}

impl Client {
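    /// Builds a new client from the given config: one shared HTTP client for
    /// the REST API plus separate gRPC connection pools for Storage API reads
    /// and writes.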
    pub async fn new(config: ClientConfig) -> Result<Self, google_cloud_gax::conn::Error> {
        let client = config.http.create_client();

        Ok(Self {
            dataset_client: BigqueryDatasetClient::new(client.clone()),
            table_client: BigqueryTableClient::new(client.clone()),
            tabledata_client: BigqueryTabledataClient::new(client.clone()),
            job_client: BigqueryJobClient::new(client.clone()),
            routine_client: BigqueryRoutineClient::new(client.clone()),
            row_access_policy_client: BigqueryRowAccessPolicyClient::new(client.clone()),
            model_client: BigqueryModelClient::new(client.clone()),
            streaming_read_conn_pool: Arc::new(
                config
                    .streaming_read_config
                    .into_connection_manager(&config.environment)
                    .await?,
            ),
            streaming_write_conn_pool: Arc::new(
                config
                    .streaming_write_config
                    .channel_config
                    .into_connection_manager(&config.environment)
                    .await?,
            ),
            streaming_write_max_insert_count: config.streaming_write_config.max_insert_count,
        })
    }

    pub fn dataset(&self) -> &BigqueryDatasetClient {
        &self.dataset_client
    }

    pub fn table(&self) -> &BigqueryTableClient {
        &self.table_client
    }

    pub fn tabledata(&self) -> &BigqueryTabledataClient {
        &self.tabledata_client
    }

    pub fn job(&self) -> &BigqueryJobClient {
        &self.job_client
    }

    pub fn routine(&self) -> &BigqueryRoutineClient {
        &self.routine_client
    }

    pub fn row_access_policy(&self) -> &BigqueryRowAccessPolicyClient {
        &self.row_access_policy_client
    }

    pub fn model(&self) -> &BigqueryModelClient {
        &self.model_client
    }

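    /// Creates a Storage Write API writer for the pending stream type: rows
    /// appended to the stream are buffered server-side and only become visible
    /// once the stream is finalized and committed
    /// (https://cloud.google.com/bigquery/docs/write-api#pending_type).
    /// `table` is a fully qualified name such as
    /// `projects/{project}/datasets/{dataset}/tables/{table}`.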
    pub fn pending_storage_writer(&self, table: &str) -> pending::Writer {
        pending::Writer::new(1, self.streaming_write_conn_pool.clone(), table.to_string())
    }

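    /// Creates a Storage Write API writer for the default stream: rows are
    /// readable immediately after each successful append.
    /// A minimal sketch of the append loop (assumes `rows` holds
    /// `AppendRowsRequest` values built from prost-encoded messages, as in the
    /// emulator test in this file):
    /// ```ignore
    /// use futures_util::StreamExt;
    ///
    /// let writer = client.default_storage_writer();
    /// let stream = writer
    ///     .create_write_stream("projects/p/datasets/d/tables/t")
    ///     .await
    ///     .unwrap();
    /// let mut result = stream.append_rows(rows).await.unwrap();
    /// while let Some(res) = result.next().await {
    ///     let res = res.unwrap();
    ///     tracing::info!("row errors = {:?}", res.row_errors.len());
    /// }
    /// ```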
    pub fn default_storage_writer(&self) -> default::Writer {
        default::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
    }

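    /// Creates a Storage Write API writer for the committed stream type: rows
    /// become readable as soon as each append call succeeds
    /// (https://cloud.google.com/bigquery/docs/write-api#committed_type).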
    pub fn committed_storage_writer(&self) -> committed::Writer {
        committed::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
    }

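    /// Creates a Storage Write API writer for the buffered stream type: rows
    /// are held in a stream-level buffer and become readable when the stream
    /// offset is flushed
    /// (https://cloud.google.com/bigquery/docs/write-api#buffered_type).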
    pub fn buffered_storage_writer(&self) -> buffered::Writer {
        buffered::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
    }

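    /// Runs a query job with the default [`QueryOption`] and returns an
    /// iterator that decodes each result row into `T`.
    /// A minimal sketch (assumes an authenticated client and a project id):
    /// ```no_run
    /// use google_cloud_bigquery::client::Client;
    /// use google_cloud_bigquery::http::job::query::QueryRequest;
    /// use google_cloud_bigquery::query::row::Row;
    ///
    /// async fn run(client: Client, project_id: &str) {
    ///     let request = QueryRequest {
    ///         query: "SELECT 'A'".to_string(),
    ///         ..Default::default()
    ///     };
    ///     let mut iter = client.query::<Row>(project_id, request).await.unwrap();
    ///     while let Some(row) = iter.next().await.unwrap() {
    ///         let value: String = row.column(0).unwrap();
    ///         println!("{value}");
    ///     }
    /// }
    /// ```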
    pub async fn query<T>(&self, project_id: &str, request: QueryRequest) -> Result<query::Iterator<T>, QueryError>
    where
        T: http::query::value::StructDecodable + storage::value::StructDecodable,
    {
        self.query_with_option(project_id, request, QueryOption::default())
            .await
    }

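    /// Like [`Client::query`], but with explicit [`QueryOption`]s. For example,
    /// `QueryOption::default().with_enable_storage_read(true)` fetches the
    /// result rows through the Storage Read API rather than the REST API.
    /// If the job has not completed yet, the call polls until it finishes.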
    pub async fn query_with_option<T>(
        &self,
        project_id: &str,
        request: QueryRequest,
        option: QueryOption,
    ) -> Result<query::Iterator<T>, QueryError>
    where
        T: http::query::value::StructDecodable + storage::value::StructDecodable,
    {
        let result = self.job_client.query(project_id, &request).await?;
        let (total_rows, page_token, rows, force_first_fetch) = if result.job_complete {
            (
                result.total_rows.unwrap_or_default(),
                result.page_token,
                result.rows.unwrap_or_default(),
                false,
            )
        } else {
            (
                self.wait_for_query(&result.job_reference, option.retry, &request.timeout_ms)
                    .await?,
                None,
                vec![],
                true,
            )
        };

        if option.enable_storage_read && page_token.as_deref().map_or(true, |token| token.is_empty()) {
            tracing::trace!("use storage read api for query {:?}", result.job_reference);
            let job = self
                .job_client
                .get(
                    &result.job_reference.project_id,
                    &result.job_reference.job_id,
                    &GetJobRequest {
                        location: result.job_reference.location.clone(),
                    },
                )
                .await?;
            let iter = self
                .new_storage_row_iterator_from_job::<T>(job.job_reference, job.statistics, job.configuration)
                .await?;
            return Ok(query::Iterator {
                inner: QueryResult::Storage(iter),
                total_size: total_rows,
            });
        }

        let http_query_iterator = http::query::Iterator {
            client: self.job_client.clone(),
            project_id: result.job_reference.project_id,
            job_id: result.job_reference.job_id,
            request: GetQueryResultsRequest {
                start_index: 0,
                page_token,
                max_results: request.max_results,
                timeout_ms: request.timeout_ms,
                location: result.job_reference.location,
                format_options: request.format_options,
            },
            chunk: VecDeque::from(rows),
            total_size: total_rows,
            force_first_fetch,
            _marker: PhantomData,
        };
        Ok(query::Iterator {
            inner: QueryResult::Http(http_query_iterator),
            total_size: total_rows,
        })
    }

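    // Resolves the table to read for a finished query job. For a plain query
    // this is the job's destination table; for a script it descends into the
    // child jobs looking for a SELECT statement with a destination table.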
    async fn new_storage_row_iterator_from_job<T>(
        &self,
        mut job: JobReference,
        mut statistics: Option<JobStatistics>,
        mut config: JobConfiguration,
    ) -> Result<storage::Iterator<T>, QueryError>
    where
        T: http::query::value::StructDecodable + storage::value::StructDecodable,
    {
        loop {
            tracing::trace!("check child job result {:?}, {:?}, {:?}", job, statistics, config);
            let query_config = match &config.job {
                JobType::Query(config) => config,
                _ => return Err(QueryError::InvalidJobType(job.clone(), config.job_type.clone())),
            };
            if let Some(dst) = &query_config.destination_table {
                return Ok(self.read_table(dst, None).await?);
            }
            if !is_script(&statistics, &config) {
                return Err(QueryError::NoDestinationTable(job.clone()));
            }
            let children = self
                .job_client
                .list(
                    &job.project_id,
                    &ListJobsRequest {
                        parent_job_id: job.job_id.to_string(),
                        ..Default::default()
                    },
                )
                .await?;

            let mut found = false;
            for j in children.into_iter() {
                if !is_select_query(&j.statistics, &j.configuration) {
                    continue;
                }
                job = j.job_reference;
                statistics = j.statistics;
                config = j.configuration;
                found = true;
                break;
            }
            if !found {
                break;
            }
        }
        Err(QueryError::NoChildJobs(job.clone()))
    }

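    // Polls getQueryResults until the job completes, retrying with the given
    // exponential backoff on incomplete jobs, transport errors, and the
    // retryable reasons in JOB_RETRY_REASONS. Returns the total row count.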
    async fn wait_for_query(
        &self,
        job: &JobReference,
        builder: ExponentialBuilder,
        timeout_ms: &Option<i64>,
    ) -> Result<i64, query::run::Error> {
        let request = GetQueryResultsRequest {
            max_results: Some(0),
            timeout_ms: *timeout_ms,
            location: job.location.clone(),
            ..Default::default()
        };
        let action = || async {
            tracing::debug!("waiting for job completion {:?}", job);
            let result = self
                .job_client
                .get_query_results(&job.project_id, &job.job_id, &request)
                .await
                .map_err(query::run::Error::Http)?;
            if result.job_complete {
                Ok(result.total_rows)
            } else {
                Err(query::run::Error::JobIncomplete)
            }
        };
        action
            .retry(builder)
            .when(|e: &query::run::Error| match e {
                query::run::Error::JobIncomplete => true,
                query::run::Error::Http(http::error::Error::HttpClient(_)) => true,
                query::run::Error::Http(http::error::Error::Response(r)) => r.is_retryable(&JOB_RETRY_REASONS),
                _ => false,
            })
            .await
    }

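    /// Reads a whole table through the Storage Read API, decoding each row
    /// into `T`. Pass a [`ReadTableOption`] to restrict columns or rows.
    /// A minimal sketch (assumes the referenced table exists and its first
    /// column is a STRING):
    /// ```no_run
    /// use google_cloud_bigquery::client::Client;
    /// use google_cloud_bigquery::http::table::TableReference;
    /// use google_cloud_bigquery::storage::row::Row;
    ///
    /// async fn run(client: Client) {
    ///     let table = TableReference {
    ///         project_id: "project".to_string(),
    ///         dataset_id: "dataset".to_string(),
    ///         table_id: "table".to_string(),
    ///     };
    ///     let mut rows = client.read_table::<Row>(&table, None).await.unwrap();
    ///     while let Some(row) = rows.next().await.unwrap() {
    ///         let first_column: String = row.column(0).unwrap();
    ///         println!("{first_column}");
    ///     }
    /// }
    /// ```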
    pub async fn read_table<T>(
        &self,
        table: &TableReference,
        option: Option<ReadTableOption>,
    ) -> Result<storage::Iterator<T>, storage::Error>
    where
        T: storage::value::StructDecodable,
    {
        let option = option.unwrap_or_default();

        let mut client = StreamingReadClient::new(BigQueryReadClient::new(self.streaming_read_conn_pool.conn()));
        let read_session = client
            .create_read_session(
                CreateReadSessionRequest {
                    parent: format!("projects/{}", table.project_id),
                    read_session: Some(ReadSession {
                        name: "".to_string(),
                        expire_time: None,
                        data_format: DataFormat::Arrow.into(),
                        table: table.resource(),
                        table_modifiers: option.session_table_modifiers,
                        read_options: option.session_read_options,
                        streams: vec![],
                        estimated_total_bytes_scanned: 0,
                        estimated_total_physical_file_size: 0,
                        estimated_row_count: 0,
                        trace_id: "".to_string(),
                        schema: option.session_schema,
                    }),
                    max_stream_count: option.max_stream_count,
                    preferred_min_stream_count: 0,
                },
                option.session_retry_setting,
            )
            .await?
            .into_inner();
        storage::Iterator::new(client, read_session, option.read_rows_retry_setting).await
    }
}

#[derive(Debug, Default, Clone)]
pub struct ReadTableOption {
    session_read_options: Option<read_session::TableReadOptions>,
    session_table_modifiers: Option<read_session::TableModifiers>,
    session_schema: Option<read_session::Schema>,
    session_retry_setting: Option<RetrySetting>,
    read_rows_retry_setting: Option<RetrySetting>,
    max_stream_count: i32,
}

impl ReadTableOption {
    pub fn with_session_read_options(mut self, value: read_session::TableReadOptions) -> Self {
        self.session_read_options = Some(value);
        self
    }

    pub fn with_session_table_modifiers(mut self, value: read_session::TableModifiers) -> Self {
        self.session_table_modifiers = Some(value);
        self
    }

    pub fn with_session_schema(mut self, value: read_session::Schema) -> Self {
        self.session_schema = Some(value);
        self
    }

    pub fn with_session_retry_setting(mut self, value: RetrySetting) -> Self {
        self.session_retry_setting = Some(value);
        self
    }

    pub fn with_read_rows_retry_setting(mut self, value: RetrySetting) -> Self {
        self.read_rows_retry_setting = Some(value);
        self
    }

    pub fn with_max_stream_count(mut self, value: i32) -> Self {
        self.max_stream_count = value;
        self
    }
}

#[cfg(test)]
mod tests {
    use bigdecimal::BigDecimal;

    use serial_test::serial;
    use std::collections::HashMap;
    use std::ops::AddAssign;
    use std::time::Duration;
    use time::{Date, OffsetDateTime, Time};

    use google_cloud_googleapis::cloud::bigquery::storage::v1::read_session::TableReadOptions;

    use crate::client::{Client, ClientConfig, ReadTableOption};
    use crate::http::bigquery_client::test::{create_table_schema, dataset_name, TestData};
    use crate::http::job::query::QueryRequest;
    use crate::http::table::{Table, TableReference};
    use crate::http::tabledata::insert_all::{InsertAllRequest, Row};
    use crate::http::types::{QueryParameter, QueryParameterStructType, QueryParameterType, QueryParameterValue};
    use crate::query;
    use crate::query::QueryOption;

    #[ctor::ctor]
    fn init() {
        let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
            .add_directive("google_cloud_bigquery=trace".parse().unwrap());
        let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
    }

    async fn create_client() -> (Client, String) {
        let (client_config, project_id) = ClientConfig::new_with_auth().await.unwrap();
        (Client::new(client_config).await.unwrap(), project_id.unwrap())
    }

    #[tokio::test]
    #[serial]
    async fn test_query_from_storage() {
        let option = QueryOption::default().with_enable_storage_read(true);
        test_query(option).await
    }

    #[tokio::test]
    #[serial]
    async fn test_query_from_rest() {
        let option = QueryOption::default();
        test_query(option).await
    }

    async fn test_query(option: QueryOption) {
        let (client, project_id) = create_client().await;
        let mut iterator = client
            .query_with_option::<query::row::Row>(
                &project_id,
                QueryRequest {
                    max_results: Some(2),
                    query: "SELECT
                        'A',
                        TIMESTAMP_MICROS(1230219000000019),
                        100,
                        0.432899,
                        DATE(2023,9,1),
                        TIME(15, 30, 01),
                        NULL,
                        ['A','B'],
                        [TIMESTAMP_MICROS(1230219000000019), TIMESTAMP_MICROS(1230219000000020)],
                        [100,200],
                        [0.432899,0.432900],
                        [DATE(2023,9,1),DATE(2023,9,2)],
                        [TIME_ADD(TIME(15,30,1), INTERVAL 10 MICROSECOND),TIME(0, 0, 0),TIME(23,59,59)],
                        b'test',
                        true,
                        [b'test',b'test2'],
                        [true,false],
                        cast('-5.7896044618658097711785492504343953926634992332820282019728792003956564819968E+38' as BIGNUMERIC),
                        cast('5.7896044618658097711785492504343953926634992332820282019728792003956564819967E+38' as BIGNUMERIC),
                        cast('-9.9999999999999999999999999999999999999E+28' as NUMERIC),
                        cast('9.9999999999999999999999999999999999999E+28' as NUMERIC),
                        [cast('-5.7896044618658097711785492504343953926634992332820282019728792003956564819968E+38' as BIGNUMERIC),cast('5.7896044618658097711785492504343953926634992332820282019728792003956564819967E+38' as BIGNUMERIC)]
                    "
                    .to_string(),
                    ..Default::default()
                },
                option,
            )
            .await
            .unwrap();

        assert_eq!(1, iterator.total_size);

        while let Some(row) = iterator.next().await.unwrap() {
            let v: String = row.column(0).unwrap();
            assert_eq!(v, "A");
            let v: OffsetDateTime = row.column(1).unwrap();
            assert_eq!(v.unix_timestamp_nanos(), 1230219000000019000);
            let v: i64 = row.column(2).unwrap();
            assert_eq!(v, 100);
            let v: f64 = row.column(3).unwrap();
            assert_eq!(v, 0.432899);
            let v: Date = row.column(4).unwrap();
            assert_eq!(v, time::macros::date!(2023 - 09 - 01));
            let v: Time = row.column(5).unwrap();
            assert_eq!(v, time::macros::time!(15:30:01));
            let v: Option<String> = row.column(6).unwrap();
            assert!(v.is_none());
            let v: Option<OffsetDateTime> = row.column(6).unwrap();
            assert!(v.is_none());
            let v: Option<i64> = row.column(6).unwrap();
            assert!(v.is_none());
            let v: Option<f64> = row.column(6).unwrap();
            assert!(v.is_none());
            let v: Option<Date> = row.column(6).unwrap();
            assert!(v.is_none());
            let v: Option<Time> = row.column(6).unwrap();
            assert!(v.is_none());
            let v: Option<Vec<Time>> = row.column(6).unwrap();
            assert!(v.is_none());
            let v: Option<BigDecimal> = row.column(6).unwrap();
            assert!(v.is_none());
            let v: Option<bool> = row.column(6).unwrap();
            assert!(v.is_none());
            let v: Option<String> = row.column(6).unwrap();
            assert!(v.is_none());
            let v: Option<Vec<u8>> = row.column(6).unwrap();
            assert!(v.is_none());

            let v: Vec<String> = row.column(7).unwrap();
            assert_eq!(v, vec!["A", "B"]);
            let v: Vec<OffsetDateTime> = row.column(8).unwrap();
            assert_eq!(v[0].unix_timestamp_nanos(), 1230219000000019000);
            assert_eq!(v[1].unix_timestamp_nanos(), 1230219000000020000);
            let v: Vec<i64> = row.column(9).unwrap();
            assert_eq!(v, vec![100, 200]);
            let v: Vec<f64> = row.column(10).unwrap();
            assert_eq!(v, vec![0.432899, 0.432900]);
            let v: Vec<Date> = row.column(11).unwrap();
            assert_eq!(v[0], time::macros::date!(2023 - 09 - 01));
            assert_eq!(v[1], time::macros::date!(2023 - 09 - 02));
            let v: Vec<Time> = row.column(12).unwrap();
            let mut tm = time::macros::time!(15:30:01);
            tm.add_assign(Duration::from_micros(10));
            assert_eq!(v[0], tm);
            assert_eq!(v[1], time::macros::time!(0:0:0));
            assert_eq!(v[2], time::macros::time!(23:59:59));

            let v: Vec<u8> = row.column(13).unwrap();
            assert_eq!(v, b"test");
            let v: bool = row.column(14).unwrap();
            assert!(v);
            let v: Vec<Vec<u8>> = row.column(15).unwrap();
            assert_eq!(v[0], b"test");
            assert_eq!(v[1], b"test2");
            let v: Vec<bool> = row.column(16).unwrap();
            assert!(v[0]);
            assert!(!v[1]);
            let v: BigDecimal = row.column(17).unwrap();
            assert_eq!(
                v.to_string(),
                "-578960446186580977117854925043439539266.34992332820282019728792003956564819968"
            );
            let v: BigDecimal = row.column(18).unwrap();
            assert_eq!(
                v.to_string(),
                "578960446186580977117854925043439539266.34992332820282019728792003956564819967"
            );
            let v: BigDecimal = row.column(19).unwrap();
            assert_eq!(v.to_string(), "-99999999999999999999999999999.999999999");
            let v: BigDecimal = row.column(20).unwrap();
            assert_eq!(v.to_string(), "99999999999999999999999999999.999999999");
            let v: Vec<BigDecimal> = row.column(21).unwrap();
            assert_eq!(
                v[0].to_string(),
                "-578960446186580977117854925043439539266.34992332820282019728792003956564819968"
            );
            assert_eq!(
                v[1].to_string(),
                "578960446186580977117854925043439539266.34992332820282019728792003956564819967"
            );
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_query_table_from_storage() {
        test_query_table(None, QueryOption::default().with_enable_storage_read(true)).await
    }

    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_query_table_from_rest() {
        test_query_table(Some(1), QueryOption::default()).await
    }

    async fn insert(client: &Client, project: &str, dataset: &str, table: &str, size: usize, now: &OffsetDateTime) {
        let mut table1 = Table::default();
        table1.table_reference.dataset_id = dataset.to_string();
        table1.table_reference.project_id = project.to_string();
        table1.table_reference.table_id = table.to_string();
        table1.schema = Some(create_table_schema());
        let _table1 = client.table_client.create(&table1).await.unwrap();
        let mut req = InsertAllRequest::<TestData>::default();
        for i in 0..size {
            req.rows.push(Row {
                insert_id: None,
                json: TestData::default(i, *now + Duration::from_secs(i as u64)),
            });
        }
        client.tabledata().insert(project, dataset, table, &req).await.unwrap();
    }

    async fn test_query_table(max_results: Option<i64>, option: QueryOption) {
        let dataset = dataset_name("table");
        let (client, project_id) = create_client().await;
        let now = OffsetDateTime::from_unix_timestamp(OffsetDateTime::now_utc().unix_timestamp()).unwrap();
        let table = format!("test_query_table_{}", now.unix_timestamp());
        insert(&client, &project_id, &dataset, &table, 3, &now).await;

        let mut data_as_row: Vec<TestData> = vec![];
        let mut iterator_as_row = client
            .query_with_option::<query::row::Row>(
                &project_id,
                QueryRequest {
                    max_results,
                    query: format!("SELECT * FROM {}.{}", dataset, table),
                    ..Default::default()
                },
                option.clone(),
            )
            .await
            .unwrap();
        while let Some(row) = iterator_as_row.next().await.unwrap() {
            data_as_row.push(TestData {
                col_string: row.column(0).unwrap(),
                col_number: row.column(1).unwrap(),
                col_number_array: row.column(2).unwrap(),
                col_timestamp: row.column(3).unwrap(),
                col_json: row.column(4).unwrap(),
                col_json_array: row.column(5).unwrap(),
                col_struct: row.column(6).unwrap(),
                col_struct_array: row.column(7).unwrap(),
                col_binary: row.column(8).unwrap(),
            });
        }
        let mut data_as_struct: Vec<TestData> = vec![];
        let mut iterator_as_struct = client
            .query_with_option::<TestData>(
                &project_id,
                QueryRequest {
                    query: format!("SELECT * FROM {}.{}", dataset, table),
                    ..Default::default()
                },
                option,
            )
            .await
            .unwrap();
        while let Some(row) = iterator_as_struct.next().await.unwrap() {
            data_as_struct.push(row);
        }
        assert_eq!(iterator_as_struct.total_size, 3);
        assert_eq!(iterator_as_row.total_size, 3);
        assert_eq!(data_as_struct.len(), 3);
        assert_eq!(data_as_row.len(), 3);

        assert_data(&now, data_as_struct);
        assert_data(&now, data_as_row);
    }

    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_read_table() {
        let dataset = dataset_name("table");
        let (client, project_id) = create_client().await;
        let now = OffsetDateTime::from_unix_timestamp(OffsetDateTime::now_utc().unix_timestamp()).unwrap();
        let table = format!("test_read_table_{}", now.unix_timestamp());
        insert(&client, &project_id, &dataset, &table, 3, &now).await;

        let table = TableReference {
            project_id,
            dataset_id: dataset.to_string(),
            table_id: table.to_string(),
        };
        let mut iterator_as_struct = client.read_table::<TestData>(&table, None).await.unwrap();

        let option = ReadTableOption {
            session_read_options: Some(TableReadOptions {
                row_restriction: "col_string = \"test_0\"".to_string(),
                ..Default::default()
            }),
            ..Default::default()
        };
        let mut iterator_as_row = client
            .read_table::<crate::storage::row::Row>(&table, Some(option))
            .await
            .unwrap();
        let mut data_as_row: Vec<TestData> = vec![];
        let mut data_as_struct: Vec<TestData> = vec![];
        let mut finish1 = false;
        let mut finish2 = false;
        loop {
            tokio::select! {
                row = iterator_as_struct.next() => {
                    if let Some(row) = row.unwrap() {
                        tracing::info!("read struct some");
                        data_as_struct.push(row);
                    } else {
                        tracing::info!("read struct none");
                        finish1 = true;
                        if finish1 && finish2 {
                            break;
                        }
                    }
                },
                row = iterator_as_row.next() => {
                    if let Some(row) = row.unwrap() {
                        tracing::info!("read row some");
                        data_as_row.push(TestData {
                            col_string: row.column(0).unwrap(),
                            col_number: row.column(1).unwrap(),
                            col_number_array: row.column(2).unwrap(),
                            col_timestamp: row.column(3).unwrap(),
                            col_json: row.column(4).unwrap(),
                            col_json_array: row.column(5).unwrap(),
                            col_struct: row.column(6).unwrap(),
                            col_struct_array: row.column(7).unwrap(),
                            col_binary: row.column(8).unwrap(),
                        });
                    } else {
                        tracing::info!("read row none");
                        finish2 = true;
                        if finish1 && finish2 {
                            break;
                        }
                    }
                }
            }
        }
        assert_eq!(data_as_struct.len(), 3);
        assert_eq!(data_as_row.len(), 1);

        assert_data(&now, data_as_struct);
        assert_data(&now, data_as_row);
    }

    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_query_job_incomplete_from_storage() {
        test_query_job_incomplete(None, QueryOption::default().with_enable_storage_read(true)).await
    }

    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_query_job_incomplete_from_rest() {
        test_query_job_incomplete(Some(4999), QueryOption::default()).await
    }

    async fn test_query_job_incomplete(max_results: Option<i64>, option: QueryOption) {
        let dataset = dataset_name("table");
        let (client, project_id) = create_client().await;
        let now = OffsetDateTime::now_utc();
        let table = format!("test_query_job_incomplete_{}", now.unix_timestamp());
        const SIZE: usize = 10000;
        insert(&client, &project_id, &dataset, &table, SIZE, &now).await;

        let mut data: Vec<query::row::Row> = vec![];
        let mut iter = client
            .query_with_option::<query::row::Row>(
                &project_id,
                QueryRequest {
                    timeout_ms: Some(5),
                    use_query_cache: Some(false),
                    max_results,
                    query: format!("SELECT 1 FROM {}.{}", dataset, table),
                    ..Default::default()
                },
                option,
            )
            .await
            .unwrap();
        while let Some(row) = iter.next().await.unwrap() {
            data.push(row);
        }
        assert_eq!(iter.total_size, SIZE as i64);
        assert_eq!(data.len(), SIZE);
    }

    #[derive(Debug, Clone)]
    struct Val {
        pub val1: String,
        pub val2: String,
    }

    #[tokio::test]
    #[serial]
    async fn test_query_with_parameter() {
        let array_val = [
            Val {
                val1: "val1-1".to_string(),
                val2: "val1-2".to_string(),
            },
            Val {
                val1: "val2-1".to_string(),
                val2: "val2-2".to_string(),
            },
        ];

        let query_parameter = QueryParameter {
            name: Some("p1".to_string()),
            parameter_type: QueryParameterType {
                parameter_type: "ARRAY".to_string(),
                array_type: Some(Box::new(QueryParameterType {
                    parameter_type: "STRUCT".to_string(),
                    struct_types: Some(vec![
                        QueryParameterStructType {
                            name: Some("val1".to_string()),
                            field_type: QueryParameterType {
                                parameter_type: "STRING".to_string(),
                                ..Default::default()
                            },
                            description: None,
                        },
                        QueryParameterStructType {
                            name: Some("val2".to_string()),
                            field_type: QueryParameterType {
                                parameter_type: "STRING".to_string(),
                                ..Default::default()
                            },
                            description: None,
                        },
                    ]),
                    array_type: None,
                })),
                struct_types: None,
            },
            parameter_value: QueryParameterValue {
                array_values: Some(
                    array_val
                        .iter()
                        .map(|val| {
                            let mut param_map = HashMap::new();
                            param_map.insert(
                                "val1".to_string(),
                                QueryParameterValue {
                                    value: Some(val.val1.clone()),
                                    ..Default::default()
                                },
                            );
                            param_map.insert(
                                "val2".to_string(),
                                QueryParameterValue {
                                    value: Some(val.val2.clone()),
                                    ..Default::default()
                                },
                            );
                            QueryParameterValue {
                                struct_values: Some(param_map),
                                value: None,
                                array_values: None,
                            }
                        })
                        .collect(),
                ),
                ..Default::default()
            },
        };
        let (client, project_id) = create_client().await;
        let mut result = client
            .query::<query::row::Row>(
                &project_id,
                QueryRequest {
                    query: "
                        WITH VAL AS (SELECT @p1 AS col1)
                        SELECT
                            ARRAY(SELECT val1 FROM UNNEST(col1)) AS val1,
                            ARRAY(SELECT val2 FROM UNNEST(col1)) AS val2
                        FROM VAL
                    "
                    .to_string(),
                    query_parameters: vec![query_parameter],
                    ..QueryRequest::default()
                },
            )
            .await
            .unwrap();
        let row = result.next().await.unwrap().unwrap();
        let col = row.column::<Vec<String>>(0).unwrap();
        assert_eq!(col[0], "val1-1".to_string());
        assert_eq!(col[1], "val2-1".to_string());
        let col = row.column::<Vec<String>>(1).unwrap();
        assert_eq!(col[0], "val1-2".to_string());
        assert_eq!(col[1], "val2-2".to_string());
    }

    fn assert_data(now: &OffsetDateTime, data: Vec<TestData>) {
        for (i, d) in data.iter().enumerate() {
            assert_eq!(&TestData::default(i, *now + Duration::from_secs(i as u64)), d);
        }
    }
}

#[cfg(test)]
mod emulator_tests {
    use crate::client::{Client, ClientConfig};
    use crate::http::table::{Table, TableFieldSchema, TableFieldType, TableSchema};
    use crate::http::tabledata::insert_all::{InsertAllRequest, Row};
    use crate::http::tabledata::list::FetchDataRequest;
    use futures_util::StreamExt;

    use prost::Message;

    use std::time::SystemTime;

    #[ignore]
    #[tokio::test]
    async fn test_emulator_use() {
        let config = ClientConfig::new_with_emulator("localhost:9060", "http://localhost:9050");

        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let client = Client::new(config).await.unwrap();
        let mut table1 = Table::default();
        table1.table_reference.dataset_id = "dataset1".to_string();
        table1.table_reference.project_id = "local-project".to_string();
        table1.table_reference.table_id = format!("table{now}");
        table1.schema = Some(TableSchema {
            fields: vec![TableFieldSchema {
                name: "col_string".to_string(),
                data_type: TableFieldType::String,
                ..Default::default()
            }],
        });
        client.table_client.create(&table1).await.unwrap();

        let mut req = InsertAllRequest::<serde_json::Value>::default();
        req.rows.push(Row {
            insert_id: None,
            json: serde_json::from_str(
                r#"
                {"col_string": "test1"}
                "#,
            )
            .unwrap(),
        });
        client
            .tabledata_client
            .insert(
                &table1.table_reference.project_id,
                &table1.table_reference.dataset_id,
                &table1.table_reference.table_id,
                &req,
            )
            .await
            .unwrap();

        let writer = client.default_storage_writer();
        let fqtn = &format!(
            "projects/local-project/datasets/dataset1/tables/{}",
            table1.table_reference.table_id
        );
        let stream = writer.create_write_stream(fqtn).await.unwrap();

        let mut rows = vec![];
        for j in 0..5 {
            let data = crate::storage_write::stream::tests::TestData {
                col_string: format!("default_{j}"),
            };
            let mut buf = Vec::new();
            data.encode(&mut buf).unwrap();
            rows.push(crate::storage_write::stream::tests::create_append_rows_request(vec![
                buf.clone(),
                buf.clone(),
                buf,
            ]));
        }
        let mut result = stream.append_rows(rows).await.unwrap();
        while let Some(res) = result.next().await {
            let res = res.unwrap();
            tracing::info!("append row errors = {:?}", res.row_errors.len());
        }

        let tref = &table1.table_reference;
        let data = client
            .tabledata_client
            .read(
                &tref.project_id,
                &tref.dataset_id,
                &tref.table_id,
                &FetchDataRequest { ..Default::default() },
            )
            .await
            .unwrap();
        assert_eq!(16, data.total_rows);
    }
}