1use backon::{ExponentialBuilder, Retryable};
2use core::time::Duration;
3use google_cloud_gax::conn::{ConnectionOptions, Environment};
4use google_cloud_gax::retry::RetrySetting;
5use google_cloud_googleapis::cloud::bigquery::storage::v1::{
6 read_session, CreateReadSessionRequest, DataFormat, ReadSession,
7};
8use std::borrow::Cow;
9use std::collections::VecDeque;
10use std::fmt::Debug;
11use std::future::Future;
12use std::marker::PhantomData;
13use std::sync::Arc;
14use token_source::{TokenSource, TokenSourceProvider};
15
16use crate::grpc::apiv1::conn_pool::ConnectionManager;
17use crate::http::bigquery_client::BigqueryClient;
18use crate::http::bigquery_dataset_client::BigqueryDatasetClient;
19use crate::http::bigquery_job_client::BigqueryJobClient;
20use crate::http::bigquery_model_client::BigqueryModelClient;
21use crate::http::bigquery_routine_client::BigqueryRoutineClient;
22use crate::http::bigquery_row_access_policy_client::BigqueryRowAccessPolicyClient;
23use crate::http::bigquery_table_client::BigqueryTableClient;
24use crate::http::bigquery_tabledata_client::BigqueryTabledataClient;
25use crate::http::job::get_query_results::GetQueryResultsRequest;
26use crate::http::job::query::QueryRequest;
27use crate::http::job::{is_script, is_select_query, JobConfiguration, JobReference, JobStatistics, JobType};
28use crate::http::table::TableReference;
29use crate::query::{QueryOption, QueryResult};
30use crate::storage;
31use crate::{http, query};
32
33#[cfg(feature = "auth")]
34pub use google_cloud_auth;
35
/// Error reasons returned by BigQuery for which polling a query job is retried
/// (see `Client::wait_for_query`).
const JOB_RETRY_REASONS: [&str; 3] = [
    "backendError",
    "rateLimitExceeded",
    "internalError",
];
37
38#[derive(Debug)]
39pub struct HttpClientConfig {
40 client: Option<reqwest_middleware::ClientWithMiddleware>,
41 bigquery_endpoint: Cow<'static, str>,
42 token_source_provider: Box<dyn TokenSourceProvider>,
43 debug: bool,
44}
45
46impl HttpClientConfig {
47 pub fn new_with_emulator(http_addr: impl Into<Cow<'static, str>>) -> Self {
48 Self {
49 client: None,
50 bigquery_endpoint: http_addr.into(),
51 token_source_provider: Box::new(EmptyTokenSourceProvider {}),
52 debug: false,
53 }
54 }
55
56 pub fn new(http_token_source_provider: Box<dyn TokenSourceProvider>) -> Self {
57 Self {
58 client: None,
59 bigquery_endpoint: "https://bigquery.googleapis.com".into(),
60 token_source_provider: http_token_source_provider,
61 debug: false,
62 }
63 }
64
65 pub fn with_debug(mut self, value: bool) -> Self {
66 self.debug = value;
67 self
68 }
69
70 pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self {
71 self.client = Some(value);
72 self
73 }
74
75 pub fn with_endpoint(mut self, value: impl Into<Cow<'static, str>>) -> Self {
76 self.bigquery_endpoint = value.into();
77 self
78 }
79
80 pub fn create_client(self) -> Arc<BigqueryClient> {
81 let ts = self.token_source_provider.token_source();
82 Arc::new(BigqueryClient::new(
83 ts,
84 self.bigquery_endpoint.as_ref(),
85 self.client
86 .unwrap_or_else(|| reqwest_middleware::ClientBuilder::new(reqwest::Client::default()).build()),
87 self.debug,
88 ))
89 }
90}
91
#[cfg(feature = "auth")]
impl HttpClientConfig {
    /// Auth configuration for the REST transport: the default project config with the
    /// BigQuery HTTP scopes (`http::bigquery_client::SCOPES`).
    fn bigquery_http_auth_config() -> google_cloud_auth::project::Config<'static> {
        google_cloud_auth::project::Config::default().with_scopes(&http::bigquery_client::SCOPES)
    }

    /// Returns a future resolving to a `DefaultTokenSourceProvider` configured with the
    /// BigQuery HTTP scopes.
    ///
    /// # Errors
    ///
    /// The future yields the auth error raised while constructing the provider.
    pub fn default_token_provider() -> impl Future<
        Output = Result<google_cloud_auth::token::DefaultTokenSourceProvider, google_cloud_auth::error::Error>,
    > + Send
           + 'static {
        google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::bigquery_http_auth_config())
    }

    /// Like [`Self::default_token_provider`], but builds the provider from an explicit
    /// credentials file.
    ///
    /// # Errors
    ///
    /// The future yields the auth error raised while constructing the provider.
    pub fn default_token_provider_with(
        credentials: google_cloud_auth::credentials::CredentialsFile,
    ) -> impl Future<
        Output = Result<google_cloud_auth::token::DefaultTokenSourceProvider, google_cloud_auth::error::Error>,
    > + Send
           + 'static {
        // `credentials` is owned and not used afterwards, so it is moved into the box
        // rather than cloned.
        google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_http_auth_config(),
            Box::new(credentials),
        )
    }
}
119
120#[derive(Debug)]
121pub struct ClientConfig {
122 http: HttpClientConfig,
123 environment: Environment,
124 streaming_read_config: ChannelConfig,
125 streaming_write_config: StreamingWriteConfig,
126}
127
128#[derive(Clone, Debug, Default)]
129pub struct StreamingWriteConfig {
130 channel_config: ChannelConfig,
131 max_insert_count: usize,
132}
133
134impl StreamingWriteConfig {
135 pub fn with_channel_config(mut self, value: ChannelConfig) -> Self {
136 self.channel_config = value;
137 self
138 }
139 pub fn with_max_insert_count(mut self, value: usize) -> Self {
140 self.max_insert_count = value;
141 self
142 }
143}
144
145#[derive(Clone, Debug)]
146pub struct ChannelConfig {
147 num_channels: usize,
149 connect_timeout: Option<Duration>,
150 timeout: Option<Duration>,
151 http2_keep_alive_interval: Option<Duration>,
152 keep_alive_timeout: Option<Duration>,
153 keep_alive_while_idle: Option<bool>,
154}
155
156impl ChannelConfig {
157 pub fn with_num_channels(mut self, value: usize) -> Self {
158 self.num_channels = value;
159 self
160 }
161 pub fn with_connect_timeout(mut self, value: Duration) -> Self {
162 self.connect_timeout = Some(value);
163 self
164 }
165 pub fn with_timeout(mut self, value: Duration) -> Self {
166 self.timeout = Some(value);
167 self
168 }
169 pub fn with_http2_keep_alive_interval(mut self, value: Duration) -> Self {
170 self.http2_keep_alive_interval = Some(value);
171 self
172 }
173 pub fn with_keep_alive_timeout(mut self, value: Duration) -> Self {
174 self.keep_alive_timeout = Some(value);
175 self
176 }
177 pub fn with_keep_alive_while_idle(mut self, value: bool) -> Self {
178 self.keep_alive_while_idle = Some(value);
179 self
180 }
181
182 async fn into_connection_manager(
183 self,
184 environment: &Environment,
185 ) -> Result<ConnectionManager, google_cloud_gax::conn::Error> {
186 ConnectionManager::new(
187 self.num_channels,
188 environment,
189 &ConnectionOptions {
190 timeout: self.timeout,
191 connect_timeout: self.connect_timeout,
192 http2_keep_alive_interval: self.http2_keep_alive_interval,
193 keep_alive_timeout: self.keep_alive_timeout,
194 keep_alive_while_idle: self.keep_alive_while_idle,
195 },
196 )
197 .await
198 }
199}
200
201impl Default for ChannelConfig {
202 fn default() -> Self {
203 Self {
204 num_channels: 4,
205 connect_timeout: Some(Duration::from_secs(30)),
206 timeout: None,
207 http2_keep_alive_interval: None,
208 keep_alive_timeout: None,
209 keep_alive_while_idle: None,
210 }
211 }
212}
213
214#[derive(Debug)]
215pub struct EmptyTokenSourceProvider {}
216
217#[derive(Debug)]
218pub struct EmptyTokenSource {}
219
220#[async_trait::async_trait]
221impl TokenSource for EmptyTokenSource {
222 async fn token(&self) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
223 Ok("".to_string())
224 }
225}
226
227impl TokenSourceProvider for EmptyTokenSourceProvider {
228 fn token_source(&self) -> Arc<dyn TokenSource> {
229 Arc::new(EmptyTokenSource {})
230 }
231}
232
233impl ClientConfig {
234 pub fn new_with_emulator(grpc_host: &str, http_addr: impl Into<Cow<'static, str>>) -> Self {
235 Self {
236 http: HttpClientConfig::new_with_emulator(http_addr),
237 environment: Environment::Emulator(grpc_host.to_string()),
238 streaming_read_config: ChannelConfig::default(),
239 streaming_write_config: StreamingWriteConfig::default(),
240 }
241 }
242
243 pub fn new(
244 http_token_source_provider: Box<dyn TokenSourceProvider>,
245 grpc_token_source_provider: Box<dyn TokenSourceProvider>,
246 ) -> Self {
247 Self {
248 http: HttpClientConfig::new(http_token_source_provider),
249 environment: Environment::GoogleCloud(grpc_token_source_provider),
250 streaming_read_config: ChannelConfig::default(),
251 streaming_write_config: StreamingWriteConfig::default(),
252 }
253 }
254
255 pub fn with_debug(mut self, value: bool) -> Self {
256 self.http.debug = value;
257 self
258 }
259
260 pub fn with_streaming_read_config(mut self, value: ChannelConfig) -> Self {
261 self.streaming_read_config = value;
262 self
263 }
264
265 pub fn with_streaming_write_config(mut self, value: StreamingWriteConfig) -> Self {
266 self.streaming_write_config = value;
267 self
268 }
269
270 pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self {
271 self.http.client = Some(value);
272 self
273 }
274
275 pub fn with_endpoint(mut self, value: impl Into<Cow<'static, str>>) -> Self {
276 self.http.bigquery_endpoint = value.into();
277 self
278 }
279}
280
281use crate::http::job::get::GetJobRequest;
282use crate::http::job::list::ListJobsRequest;
283
284use crate::grpc::apiv1::bigquery_client::StreamingReadClient;
285use crate::storage_write::stream::{buffered, committed, default, pending};
286use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient;
287
#[cfg(feature = "auth")]
impl ClientConfig {
    /// Builds a configuration whose REST and gRPC token providers are
    /// `DefaultTokenSourceProvider`s.
    ///
    /// Returns the configuration together with the project id reported by the gRPC
    /// provider, if any.
    ///
    /// # Errors
    ///
    /// Returns the auth error raised while constructing either provider.
    pub async fn new_with_auth() -> Result<(Self, Option<String>), google_cloud_auth::error::Error> {
        let http_provider = HttpClientConfig::default_token_provider().await?;
        let grpc_provider =
            google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::bigquery_grpc_auth_config()).await?;
        Ok(Self::from_default_providers(http_provider, grpc_provider))
    }

    /// Same as [`Self::new_with_auth`], but both providers are built from `credentials`.
    ///
    /// # Errors
    ///
    /// Returns the auth error raised while constructing either provider.
    pub async fn new_with_credentials(
        credentials: google_cloud_auth::credentials::CredentialsFile,
    ) -> Result<(Self, Option<String>), google_cloud_auth::error::Error> {
        let http_provider = HttpClientConfig::default_token_provider_with(credentials.clone()).await?;
        let grpc_provider = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_grpc_auth_config(),
            Box::new(credentials),
        )
        .await?;
        Ok(Self::from_default_providers(http_provider, grpc_provider))
    }

    /// Wraps both providers into a configuration and extracts the gRPC provider's project id.
    fn from_default_providers(
        http_provider: google_cloud_auth::token::DefaultTokenSourceProvider,
        grpc_provider: google_cloud_auth::token::DefaultTokenSourceProvider,
    ) -> (Self, Option<String>) {
        let project_id = grpc_provider.project_id.clone();
        (Self::new(Box::new(http_provider), Box::new(grpc_provider)), project_id)
    }

    /// Auth configuration for the gRPC transport: the conn-pool audience and scopes.
    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        google_cloud_auth::project::Config::default()
            .with_audience(crate::grpc::apiv1::conn_pool::AUDIENCE)
            .with_scopes(&crate::grpc::apiv1::conn_pool::SCOPES)
    }
}
319
320#[derive(thiserror::Error, Debug)]
321pub enum QueryError {
322 #[error(transparent)]
323 Storage(#[from] storage::Error),
324 #[error(transparent)]
325 JobHttp(#[from] http::error::Error),
326 #[error("job has no destination table to read : job={0:?}")]
327 NoDestinationTable(JobReference),
328 #[error("failed to resolve table for script job: no child jobs found : job={0:?}")]
329 NoChildJobs(JobReference),
330 #[error("job type must be query: job={0:?}, jobType={1:?}")]
331 InvalidJobType(JobReference, String),
332 #[error(transparent)]
333 RunQuery(#[from] query::run::Error),
334}
335
336#[derive(Clone)]
337pub struct Client {
338 dataset_client: BigqueryDatasetClient,
339 table_client: BigqueryTableClient,
340 tabledata_client: BigqueryTabledataClient,
341 job_client: BigqueryJobClient,
342 routine_client: BigqueryRoutineClient,
343 row_access_policy_client: BigqueryRowAccessPolicyClient,
344 model_client: BigqueryModelClient,
345 streaming_read_conn_pool: Arc<ConnectionManager>,
346 streaming_write_conn_pool: Arc<ConnectionManager>,
347 streaming_write_max_insert_count: usize,
348}
349
350impl Client {
351 pub async fn new(config: ClientConfig) -> Result<Self, google_cloud_gax::conn::Error> {
353 let client = config.http.create_client();
354
355 Ok(Self {
356 dataset_client: BigqueryDatasetClient::new(client.clone()),
357 table_client: BigqueryTableClient::new(client.clone()),
358 tabledata_client: BigqueryTabledataClient::new(client.clone()),
359 job_client: BigqueryJobClient::new(client.clone()),
360 routine_client: BigqueryRoutineClient::new(client.clone()),
361 row_access_policy_client: BigqueryRowAccessPolicyClient::new(client.clone()),
362 model_client: BigqueryModelClient::new(client.clone()),
363 streaming_read_conn_pool: Arc::new(
364 config
365 .streaming_read_config
366 .into_connection_manager(&config.environment)
367 .await?,
368 ),
369 streaming_write_conn_pool: Arc::new(
370 config
371 .streaming_write_config
372 .channel_config
373 .into_connection_manager(&config.environment)
374 .await?,
375 ),
376 streaming_write_max_insert_count: config.streaming_write_config.max_insert_count,
377 })
378 }
379
380 pub fn dataset(&self) -> &BigqueryDatasetClient {
383 &self.dataset_client
384 }
385
386 pub fn table(&self) -> &BigqueryTableClient {
389 &self.table_client
390 }
391
392 pub fn tabledata(&self) -> &BigqueryTabledataClient {
395 &self.tabledata_client
396 }
397
398 pub fn job(&self) -> &BigqueryJobClient {
401 &self.job_client
402 }
403
404 pub fn routine(&self) -> &BigqueryRoutineClient {
407 &self.routine_client
408 }
409
410 pub fn row_access_policy(&self) -> &BigqueryRowAccessPolicyClient {
413 &self.row_access_policy_client
414 }
415
416 pub fn model(&self) -> &BigqueryModelClient {
419 &self.model_client
420 }
421
422 pub fn pending_storage_writer(&self, table: &str) -> pending::Writer {
455 pending::Writer::new(1, self.streaming_write_conn_pool.clone(), table.to_string())
456 }
457
458 pub fn default_storage_writer(&self) -> default::Writer {
488 default::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
489 }
490
491 pub fn committed_storage_writer(&self) -> committed::Writer {
523 committed::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
524 }
525
526 pub fn buffered_storage_writer(&self) -> buffered::Writer {
558 buffered::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
559 }
560
561 pub async fn query<T>(&self, project_id: &str, request: QueryRequest) -> Result<query::Iterator<T>, QueryError>
579 where
580 T: http::query::value::StructDecodable + storage::value::StructDecodable,
581 {
582 self.query_with_option(project_id, request, QueryOption::default())
583 .await
584 }
585
586 pub async fn query_with_option<T>(
608 &self,
609 project_id: &str,
610 request: QueryRequest,
611 option: QueryOption,
612 ) -> Result<query::Iterator<T>, QueryError>
613 where
614 T: http::query::value::StructDecodable + storage::value::StructDecodable,
615 {
616 let result = self.job_client.query(project_id, &request).await?;
617 let (total_rows, page_token, rows, force_first_fetch) = if result.job_complete {
618 (
619 result.total_rows.unwrap_or_default(),
620 result.page_token,
621 result.rows.unwrap_or_default(),
622 false,
623 )
624 } else {
625 (
626 self.wait_for_query(&result.job_reference, option.retry, &request.timeout_ms)
627 .await?,
628 None,
629 vec![],
630 true,
631 )
632 };
633
634 if option.enable_storage_read && (page_token.is_none() || page_token.as_ref().unwrap().is_empty()) {
636 tracing::trace!("use storage read api for query {:?}", result.job_reference);
637 let job = self
638 .job_client
639 .get(
640 &result.job_reference.project_id,
641 &result.job_reference.job_id,
642 &GetJobRequest {
643 location: result.job_reference.location.clone(),
644 },
645 )
646 .await?;
647 let iter = self
648 .new_storage_row_iterator_from_job::<T>(job.job_reference, job.statistics, job.configuration)
649 .await?;
650 return Ok(query::Iterator {
651 inner: QueryResult::Storage(iter),
652 total_size: total_rows,
653 });
654 }
655
656 let http_query_iterator = http::query::Iterator {
657 client: self.job_client.clone(),
658 project_id: result.job_reference.project_id,
659 job_id: result.job_reference.job_id,
660 request: GetQueryResultsRequest {
661 start_index: 0,
662 page_token,
663 max_results: request.max_results,
664 timeout_ms: request.timeout_ms,
665 location: result.job_reference.location,
666 format_options: request.format_options,
667 },
668 chunk: VecDeque::from(rows),
669 total_size: total_rows,
670 force_first_fetch,
671 _marker: PhantomData,
672 };
673 Ok(query::Iterator {
674 inner: QueryResult::Http(http_query_iterator),
675 total_size: total_rows,
676 })
677 }
678
679 async fn new_storage_row_iterator_from_job<T>(
680 &self,
681 mut job: JobReference,
682 mut statistics: Option<JobStatistics>,
683 mut config: JobConfiguration,
684 ) -> Result<storage::Iterator<T>, QueryError>
685 where
686 T: http::query::value::StructDecodable + storage::value::StructDecodable,
687 {
688 loop {
689 tracing::trace!("check child job result {:?}, {:?}, {:?}", job, statistics, config);
690 let query_config = match &config.job {
691 JobType::Query(config) => config,
692 _ => return Err(QueryError::InvalidJobType(job.clone(), config.job_type.clone())),
693 };
694 if let Some(dst) = &query_config.destination_table {
695 return Ok(self.read_table(dst, None).await?);
696 }
697 if !is_script(&statistics, &config) {
698 return Err(QueryError::NoDestinationTable(job.clone()));
699 }
700 let children = self
701 .job_client
702 .list(
703 &job.project_id,
704 &ListJobsRequest {
705 parent_job_id: job.job_id.to_string(),
706 ..Default::default()
707 },
708 )
709 .await?;
710
711 let mut found = false;
712 for j in children.into_iter() {
713 if !is_select_query(&j.statistics, &j.configuration) {
714 continue;
715 }
716 job = j.job_reference;
717 statistics = j.statistics;
718 config = j.configuration;
719 found = true;
720 break;
721 }
722 if !found {
723 break;
724 }
725 }
726 Err(QueryError::NoChildJobs(job.clone()))
727 }
728
729 async fn wait_for_query(
730 &self,
731 job: &JobReference,
732 builder: ExponentialBuilder,
733 timeout_ms: &Option<i64>,
734 ) -> Result<i64, query::run::Error> {
735 let request = GetQueryResultsRequest {
737 max_results: Some(0),
738 timeout_ms: *timeout_ms,
739 location: job.location.clone(),
740 ..Default::default()
741 };
742 let action = || async {
743 tracing::debug!("waiting for job completion {:?}", job);
744 let result = self
745 .job_client
746 .get_query_results(&job.project_id, &job.job_id, &request)
747 .await
748 .map_err(query::run::Error::Http)?;
749 if result.job_complete {
750 Ok(result.total_rows)
751 } else {
752 Err(query::run::Error::JobIncomplete)
753 }
754 };
755 action
756 .retry(builder)
757 .when(|e: &query::run::Error| match e {
758 query::run::Error::JobIncomplete => true,
759 query::run::Error::Http(http::error::Error::HttpClient(_)) => true,
760 query::run::Error::Http(http::error::Error::Response(r)) => r.is_retryable(&JOB_RETRY_REASONS),
761 _ => false,
762 })
763 .await
764 }
765
766 pub async fn read_table<T>(
786 &self,
787 table: &TableReference,
788 option: Option<ReadTableOption>,
789 ) -> Result<storage::Iterator<T>, storage::Error>
790 where
791 T: storage::value::StructDecodable,
792 {
793 let option = option.unwrap_or_default();
794
795 let mut client = StreamingReadClient::new(BigQueryReadClient::new(self.streaming_read_conn_pool.conn()));
796 let read_session = client
797 .create_read_session(
798 CreateReadSessionRequest {
799 parent: format!("projects/{}", table.project_id),
800 read_session: Some(ReadSession {
801 name: "".to_string(),
802 expire_time: None,
803 data_format: DataFormat::Arrow.into(),
804 table: table.resource(),
805 table_modifiers: option.session_table_modifiers,
806 read_options: option.session_read_options,
807 streams: vec![],
808 estimated_total_bytes_scanned: 0,
809 estimated_total_physical_file_size: 0,
810 estimated_row_count: 0,
811 trace_id: "".to_string(),
812 schema: option.session_schema,
813 }),
814 max_stream_count: option.max_stream_count,
815 preferred_min_stream_count: 0,
816 },
817 option.session_retry_setting,
818 )
819 .await?
820 .into_inner();
821 storage::Iterator::new(client, read_session, option.read_rows_retry_setting).await
822 }
823}
824
825#[derive(Debug, Default, Clone)]
826pub struct ReadTableOption {
827 session_read_options: Option<read_session::TableReadOptions>,
828 session_table_modifiers: Option<read_session::TableModifiers>,
829 session_schema: Option<read_session::Schema>,
830 session_retry_setting: Option<RetrySetting>,
831 read_rows_retry_setting: Option<RetrySetting>,
832 max_stream_count: i32,
833}
834
835impl ReadTableOption {
836 pub fn with_session_read_options(mut self, value: read_session::TableReadOptions) -> Self {
837 self.session_read_options = Some(value);
838 self
839 }
840
841 pub fn with_session_table_modifiers(mut self, value: read_session::TableModifiers) -> Self {
842 self.session_table_modifiers = Some(value);
843 self
844 }
845
846 pub fn with_session_schema(mut self, value: read_session::Schema) -> Self {
847 self.session_schema = Some(value);
848 self
849 }
850
851 pub fn with_session_retry_setting(mut self, value: RetrySetting) -> Self {
852 self.session_retry_setting = Some(value);
853 self
854 }
855
856 pub fn with_read_rows_retry_setting(mut self, value: RetrySetting) -> Self {
857 self.read_rows_retry_setting = Some(value);
858 self
859 }
860
861 pub fn with_max_stream_count(mut self, value: i32) -> Self {
862 self.max_stream_count = value;
863 self
864 }
865}
866
867#[cfg(test)]
868mod tests {
869 use bigdecimal::BigDecimal;
870
871 use serial_test::serial;
872 use std::collections::HashMap;
873 use std::ops::AddAssign;
874 use std::time::Duration;
875 use time::{Date, OffsetDateTime, Time};
876
877 use google_cloud_googleapis::cloud::bigquery::storage::v1::read_session::TableReadOptions;
878
879 use crate::client::{Client, ClientConfig, ReadTableOption};
880 use crate::http::bigquery_client::test::{create_table_schema, dataset_name, TestData};
881 use crate::http::job::query::QueryRequest;
882 use crate::http::table::{Table, TableReference};
883 use crate::http::tabledata::insert_all::{InsertAllRequest, Row};
884 use crate::http::types::{QueryParameter, QueryParameterStructType, QueryParameterType, QueryParameterValue};
885 use crate::query;
886 use crate::query::QueryOption;
887
888 #[ctor::ctor]
889 fn init() {
890 let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
891 .add_directive("google_cloud_bigquery=trace".parse().unwrap());
892 let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
893 }
894
895 async fn create_client() -> (Client, String) {
896 let (client_config, project_id) = ClientConfig::new_with_auth().await.unwrap();
897 (Client::new(client_config).await.unwrap(), project_id.unwrap())
898 }
899
900 #[tokio::test]
901 #[serial]
902 async fn test_query_from_storage() {
903 let option = QueryOption::default().with_enable_storage_read(true);
904 test_query(option).await
905 }
906
907 #[tokio::test]
908 #[serial]
909 async fn test_query_from_rest() {
910 let option = QueryOption::default();
911 test_query(option).await
912 }
913
914 async fn test_query(option: QueryOption) {
915 let (client, project_id) = create_client().await;
916 let mut iterator = client
917 .query_with_option::<query::row::Row>(
918 &project_id,
919 QueryRequest {
920 max_results: Some(2),
921 query: "SELECT
922 'A',
923 TIMESTAMP_MICROS(1230219000000019),
924 100,
925 0.432899,
926 DATE(2023,9,1),
927 TIME(15, 30, 01),
928 NULL,
929 ['A','B'],
930 [TIMESTAMP_MICROS(1230219000000019), TIMESTAMP_MICROS(1230219000000020)],
931 [100,200],
932 [0.432899,0.432900],
933 [DATE(2023,9,1),DATE(2023,9,2)],
934 [TIME_ADD(TIME(15,30,1), INTERVAL 10 MICROSECOND),TIME(0, 0, 0),TIME(23,59,59)],
935 b'test',
936 true,
937 [b'test',b'test2'],
938 [true,false],
939 cast('-5.7896044618658097711785492504343953926634992332820282019728792003956564819968E+38' as BIGNUMERIC),
940 cast('5.7896044618658097711785492504343953926634992332820282019728792003956564819967E+38' as BIGNUMERIC),
941 cast('-9.9999999999999999999999999999999999999E+28' as NUMERIC),
942 cast('9.9999999999999999999999999999999999999E+28' as NUMERIC),
943 [cast('-5.7896044618658097711785492504343953926634992332820282019728792003956564819968E+38' as BIGNUMERIC),cast('5.7896044618658097711785492504343953926634992332820282019728792003956564819967E+38' as BIGNUMERIC)]
944 ".to_string(),
945 ..Default::default()
946 },
947 option,
948 )
949 .await
950 .unwrap();
951
952 assert_eq!(1, iterator.total_size);
953
954 while let Some(row) = iterator.next().await.unwrap() {
955 let v: String = row.column(0).unwrap();
956 assert_eq!(v, "A");
957 let v: OffsetDateTime = row.column(1).unwrap();
958 assert_eq!(v.unix_timestamp_nanos(), 1230219000000019000);
959 let v: i64 = row.column(2).unwrap();
960 assert_eq!(v, 100);
961 let v: f64 = row.column(3).unwrap();
962 assert_eq!(v, 0.432899);
963 let v: Date = row.column(4).unwrap();
964 assert_eq!(v, time::macros::date!(2023 - 09 - 01));
965 let v: Time = row.column(5).unwrap();
966 assert_eq!(v, time::macros::time!(15:30:01));
967 let v: Option<String> = row.column(6).unwrap();
968 assert!(v.is_none());
969 let v: Option<OffsetDateTime> = row.column(6).unwrap();
970 assert!(v.is_none());
971 let v: Option<i64> = row.column(6).unwrap();
972 assert!(v.is_none());
973 let v: Option<f64> = row.column(6).unwrap();
974 assert!(v.is_none());
975 let v: Option<Date> = row.column(6).unwrap();
976 assert!(v.is_none());
977 let v: Option<Time> = row.column(6).unwrap();
978 assert!(v.is_none());
979 let v: Option<Vec<Time>> = row.column(6).unwrap();
980 assert!(v.is_none());
981 let v: Option<BigDecimal> = row.column(6).unwrap();
982 assert!(v.is_none());
983 let v: Option<bool> = row.column(6).unwrap();
984 assert!(v.is_none());
985 let v: Option<String> = row.column(6).unwrap();
986 assert!(v.is_none());
987 let v: Option<Vec<u8>> = row.column(6).unwrap();
988 assert!(v.is_none());
989
990 let v: Vec<String> = row.column(7).unwrap();
991 assert_eq!(v, vec!["A", "B"]);
992 let v: Vec<OffsetDateTime> = row.column(8).unwrap();
993 assert_eq!(v[0].unix_timestamp_nanos(), 1230219000000019000);
994 assert_eq!(v[1].unix_timestamp_nanos(), 1230219000000020000);
995 let v: Vec<i64> = row.column(9).unwrap();
996 assert_eq!(v, vec![100, 200]);
997 let v: Vec<f64> = row.column(10).unwrap();
998 assert_eq!(v, vec![0.432899, 0.432900]);
999 let v: Vec<Date> = row.column(11).unwrap();
1000 assert_eq!(v[0], time::macros::date!(2023 - 09 - 01));
1001 assert_eq!(v[1], time::macros::date!(2023 - 09 - 02));
1002 let v: Vec<Time> = row.column(12).unwrap();
1003 let mut tm = time::macros::time!(15:30:01);
1004 tm.add_assign(Duration::from_micros(10));
1005 assert_eq!(v[0], tm);
1006 assert_eq!(v[1], time::macros::time!(0:0:0));
1007 assert_eq!(v[2], time::macros::time!(23:59:59));
1008
1009 let v: Vec<u8> = row.column(13).unwrap();
1010 assert_eq!(v, b"test");
1011 let v: bool = row.column(14).unwrap();
1012 assert!(v);
1013 let v: Vec<Vec<u8>> = row.column(15).unwrap();
1014 assert_eq!(v[0], b"test");
1015 assert_eq!(v[1], b"test2");
1016 let v: Vec<bool> = row.column(16).unwrap();
1017 assert!(v[0]);
1018 assert!(!v[1]);
1019 let v: BigDecimal = row.column(17).unwrap();
1020 assert_eq!(
1021 v.to_string(),
1022 "-578960446186580977117854925043439539266.34992332820282019728792003956564819968"
1023 );
1024 let v: BigDecimal = row.column(18).unwrap();
1025 assert_eq!(
1026 v.to_string(),
1027 "578960446186580977117854925043439539266.34992332820282019728792003956564819967"
1028 );
1029 let v: BigDecimal = row.column(19).unwrap();
1030 assert_eq!(v.to_string(), "-99999999999999999999999999999.999999999");
1031 let v: BigDecimal = row.column(20).unwrap();
1032 assert_eq!(v.to_string(), "99999999999999999999999999999.999999999");
1033 let v: Vec<BigDecimal> = row.column(21).unwrap();
1034 assert_eq!(
1035 v[0].to_string(),
1036 "-578960446186580977117854925043439539266.34992332820282019728792003956564819968"
1037 );
1038 assert_eq!(
1039 v[1].to_string(),
1040 "578960446186580977117854925043439539266.34992332820282019728792003956564819967"
1041 );
1042 }
1043 }
1044
1045 #[tokio::test(flavor = "multi_thread")]
1046 #[serial]
1047 async fn test_query_table_from_storage() {
1048 test_query_table(None, QueryOption::default().with_enable_storage_read(true)).await
1049 }
1050
1051 #[tokio::test(flavor = "multi_thread")]
1052 #[serial]
1053 async fn test_query_table_from_rest() {
1054 test_query_table(Some(1), QueryOption::default()).await
1055 }
1056
1057 async fn insert(client: &Client, project: &str, dataset: &str, table: &str, size: usize, now: &OffsetDateTime) {
1058 let mut table1 = Table::default();
1059 table1.table_reference.dataset_id = dataset.to_string();
1060 table1.table_reference.project_id = project.to_string();
1061 table1.table_reference.table_id = table.to_string();
1062 table1.schema = Some(create_table_schema());
1063 let _table1 = client.table_client.create(&table1).await.unwrap();
1064 let mut req = InsertAllRequest::<TestData>::default();
1065 for i in 0..size {
1066 req.rows.push(Row {
1067 insert_id: None,
1068 json: TestData::default(i, *now + Duration::from_secs(i as u64)),
1069 });
1070 }
1071 client.tabledata().insert(project, dataset, table, &req).await.unwrap();
1072 }
1073
1074 async fn test_query_table(max_results: Option<i64>, option: QueryOption) {
1075 let dataset = dataset_name("table");
1076 let (client, project_id) = create_client().await;
1077 let now = OffsetDateTime::from_unix_timestamp(OffsetDateTime::now_utc().unix_timestamp()).unwrap();
1078 let table = format!("test_query_table_{}", now.unix_timestamp());
1079 insert(&client, &project_id, &dataset, &table, 3, &now).await;
1080
1081 let mut data_as_row: Vec<TestData> = vec![];
1083 let mut iterator_as_row = client
1084 .query_with_option::<query::row::Row>(
1085 &project_id,
1086 QueryRequest {
1087 max_results,
1088 query: format!("SELECT * FROM {dataset}.{table}"),
1089 ..Default::default()
1090 },
1091 option.clone(),
1092 )
1093 .await
1094 .unwrap();
1095 while let Some(row) = iterator_as_row.next().await.unwrap() {
1096 data_as_row.push(TestData {
1097 col_string: row.column(0).unwrap(),
1098 col_number: row.column(1).unwrap(),
1099 col_number_array: row.column(2).unwrap(),
1100 col_timestamp: row.column(3).unwrap(),
1101 col_json: row.column(4).unwrap(),
1102 col_json_array: row.column(5).unwrap(),
1103 col_struct: row.column(6).unwrap(),
1104 col_struct_array: row.column(7).unwrap(),
1105 col_binary: row.column(8).unwrap(),
1106 });
1107 }
1108 let mut data_as_struct: Vec<TestData> = vec![];
1109 let mut iterator_as_struct = client
1110 .query_with_option::<TestData>(
1111 &project_id,
1112 QueryRequest {
1113 query: format!("SELECT * FROM {dataset}.{table}"),
1114 ..Default::default()
1115 },
1116 option,
1117 )
1118 .await
1119 .unwrap();
1120 while let Some(row) = iterator_as_struct.next().await.unwrap() {
1121 data_as_struct.push(row);
1122 }
1123 assert_eq!(iterator_as_struct.total_size, 3);
1124 assert_eq!(iterator_as_row.total_size, 3);
1125 assert_eq!(data_as_struct.len(), 3);
1126 assert_eq!(data_as_row.len(), 3);
1127
1128 assert_data(&now, data_as_struct);
1129 assert_data(&now, data_as_row);
1130 }
1131
1132 #[tokio::test(flavor = "multi_thread")]
1133 #[serial]
1134 async fn test_read_table() {
1135 let dataset = dataset_name("table");
1136 let (client, project_id) = create_client().await;
1137 let now = OffsetDateTime::from_unix_timestamp(OffsetDateTime::now_utc().unix_timestamp()).unwrap();
1138 let table = format!("test_read_table_{}", now.unix_timestamp());
1139 insert(&client, &project_id, &dataset, &table, 3, &now).await;
1140
1141 let table = TableReference {
1142 project_id,
1143 dataset_id: dataset.to_string(),
1144 table_id: table.to_string(),
1145 };
1146 let mut iterator_as_struct = client.read_table::<TestData>(&table, None).await.unwrap();
1147
1148 let option = ReadTableOption {
1149 session_read_options: Some(TableReadOptions {
1150 row_restriction: "col_string = \"test_0\"".to_string(),
1151 ..Default::default()
1152 }),
1153 ..Default::default()
1154 };
1155 let mut iterator_as_row = client
1156 .read_table::<crate::storage::row::Row>(&table, Some(option))
1157 .await
1158 .unwrap();
1159 let mut data_as_row: Vec<TestData> = vec![];
1160 let mut data_as_struct: Vec<TestData> = vec![];
1161 let mut finish1 = false;
1162 let mut finish2 = false;
1163 loop {
1164 tokio::select! {
1165 row = iterator_as_struct.next() => {
1166 if let Some(row) = row.unwrap() {
1167 tracing::info!("read struct some");
1168 data_as_struct.push(row);
1169 }else {
1170 tracing::info!("read struct none");
1171 finish1 = true;
1172 if finish1 && finish2 {
1173 break;
1174 }
1175 }
1176 },
1177 row = iterator_as_row.next() => {
1178 if let Some(row) = row.unwrap() {
1179 tracing::info!("read row some");
1180 data_as_row.push(TestData {
1181 col_string: row.column(0).unwrap(),
1182 col_number: row.column(1).unwrap(),
1183 col_number_array: row.column(2).unwrap(),
1184 col_timestamp: row.column(3).unwrap(),
1185 col_json: row.column(4).unwrap(),
1186 col_json_array: row.column(5).unwrap(),
1187 col_struct: row.column(6).unwrap(),
1188 col_struct_array: row.column(7).unwrap(),
1189 col_binary: row.column(8).unwrap(),
1190 } );
1191 }else {
1192 tracing::info!("read row none");
1193 finish2 = true;
1194 if finish1 && finish2 {
1195 break;
1196 }
1197 }
1198 }
1199 }
1200 }
1201 assert_eq!(data_as_struct.len(), 3);
1202 assert_eq!(data_as_row.len(), 1);
1203
1204 assert_data(&now, data_as_struct);
1205 assert_data(&now, data_as_row);
1206 }
1207
1208 #[tokio::test(flavor = "multi_thread")]
1209 #[serial]
1210 async fn test_query_job_incomplete_from_storage() {
1211 test_query_job_incomplete(None, QueryOption::default().with_enable_storage_read(true)).await
1212 }
1213
1214 #[tokio::test(flavor = "multi_thread")]
1215 #[serial]
1216 async fn test_query_job_incomplete_from_rest() {
1217 test_query_job_incomplete(Some(4999), QueryOption::default()).await
1218 }
1219
1220 async fn test_query_job_incomplete(max_results: Option<i64>, option: QueryOption) {
1221 let dataset = dataset_name("table");
1222 let (client, project_id) = create_client().await;
1223 let now = OffsetDateTime::now_utc();
1224 let table = format!("test_query_job_incomplete_{}", now.unix_timestamp());
1225 const SIZE: usize = 10000;
1226 insert(&client, &project_id, &dataset, &table, SIZE, &now).await;
1227
1228 let mut data: Vec<query::row::Row> = vec![];
1229 let mut iter = client
1230 .query_with_option::<query::row::Row>(
1231 &project_id,
1232 QueryRequest {
1233 timeout_ms: Some(5), use_query_cache: Some(false),
1235 max_results,
1236 query: format!("SELECT 1 FROM {dataset}.{table}"),
1237 ..Default::default()
1238 },
1239 option,
1240 )
1241 .await
1242 .unwrap();
1243 while let Some(row) = iter.next().await.unwrap() {
1244 data.push(row);
1245 }
1246 assert_eq!(iter.total_size, SIZE as i64);
1247 assert_eq!(data.len(), SIZE);
1248 }
1249
1250 #[derive(Debug, Clone)]
1251 struct Val {
1252 pub val1: String,
1253 pub val2: String,
1254 }
1255
1256 #[tokio::test]
1257 #[serial]
1258 async fn test_query_with_parameter() {
1259 let array_val = [
1260 Val {
1261 val1: "val1-1".to_string(),
1262 val2: "val1-2".to_string(),
1263 },
1264 Val {
1265 val1: "val2-1".to_string(),
1266 val2: "val2-2".to_string(),
1267 },
1268 ];
1269
1270 let query_parameter = QueryParameter {
1271 name: Some("p1".to_string()),
1272 parameter_type: QueryParameterType {
1273 parameter_type: "ARRAY".to_string(),
1274 array_type: Some(Box::new(QueryParameterType {
1275 parameter_type: "STRUCT".to_string(),
1276 struct_types: Some(vec![
1277 QueryParameterStructType {
1278 name: Some("val1".to_string()),
1279 field_type: QueryParameterType {
1280 parameter_type: "STRING".to_string(),
1281 ..Default::default()
1282 },
1283 description: None,
1284 },
1285 QueryParameterStructType {
1286 name: Some("val2".to_string()),
1287 field_type: QueryParameterType {
1288 parameter_type: "STRING".to_string(),
1289 ..Default::default()
1290 },
1291 description: None,
1292 },
1293 ]),
1294 array_type: None,
1295 })),
1296 struct_types: None,
1297 },
1298 parameter_value: QueryParameterValue {
1299 array_values: Some(
1300 array_val
1301 .iter()
1302 .map(|val| {
1303 let mut param_map = HashMap::new();
1304 param_map.insert(
1305 "val1".to_string(),
1306 QueryParameterValue {
1307 value: Some(val.val1.clone()),
1308 ..Default::default()
1309 },
1310 );
1311 param_map.insert(
1312 "val2".to_string(),
1313 QueryParameterValue {
1314 value: Some(val.val2.clone()),
1315 ..Default::default()
1316 },
1317 );
1318 QueryParameterValue {
1319 struct_values: Some(param_map),
1320 value: None,
1321 array_values: None,
1322 }
1323 })
1324 .collect(),
1325 ),
1326 ..Default::default()
1327 },
1328 };
1329 let (client, project_id) = create_client().await;
1330 let mut result = client
1331 .query::<query::row::Row>(
1332 &project_id,
1333 QueryRequest {
1334 query: "
1335 WITH VAL AS (SELECT @p1 AS col1)
1336 SELECT
1337 ARRAY(SELECT val1 FROM UNNEST(col1)) AS val1,
1338 ARRAY(SELECT val2 FROM UNNEST(col1)) AS val2
1339 FROM VAL
1340 "
1341 .to_string(),
1342 query_parameters: vec![query_parameter],
1343 ..QueryRequest::default()
1344 },
1345 )
1346 .await
1347 .unwrap();
1348 let row = result.next().await.unwrap().unwrap();
1349 let col = row.column::<Vec<String>>(0).unwrap();
1350 assert_eq!(col[0], "val1-1".to_string());
1351 assert_eq!(col[1], "val2-1".to_string());
1352 let col = row.column::<Vec<String>>(1).unwrap();
1353 assert_eq!(col[0], "val1-2".to_string());
1354 assert_eq!(col[1], "val2-2".to_string());
1355 }
1356
1357 fn assert_data(now: &OffsetDateTime, data: Vec<TestData>) {
1358 for (i, d) in data.iter().enumerate() {
1359 assert_eq!(&TestData::default(i, *now + Duration::from_secs(i as u64)), d);
1360 }
1361 }
1362}
1363
#[cfg(test)]
mod emulator_tests {
    use crate::client::{Client, ClientConfig};
    use crate::http::table::{Table, TableFieldSchema, TableFieldType, TableSchema};
    use crate::http::tabledata::insert_all::{InsertAllRequest, Row};
    use crate::http::tabledata::list::FetchDataRequest;
    use futures_util::StreamExt;

    use prost::Message;

    use std::time::SystemTime;

    /// End-to-end smoke test against local emulators.
    ///
    /// Steps:
    /// - creates a one-column table;
    /// - inserts 1 row with `tabledata.insert`;
    /// - appends 5 requests of 3 rows each through the storage writer;
    /// - checks that `tabledata.read` reports 1 + 15 = 16 rows.
    ///
    /// Ignored by default: it needs emulators reachable at `localhost:9060` and
    /// `http://localhost:9050`.
    #[ignore]
    #[tokio::test]
    async fn test_emulator_use() {
        let config = ClientConfig::new_with_emulator("localhost:9060", "http://localhost:9050");

        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let client = Client::new(config).await.unwrap();
        let mut table1 = Table::default();
        table1.table_reference.dataset_id = "dataset1".to_string();
        table1.table_reference.project_id = "local-project".to_string();
        table1.table_reference.table_id = format!("table{now}");
        table1.schema = Some(TableSchema {
            fields: vec![TableFieldSchema {
                name: "col_string".to_string(),
                data_type: TableFieldType::String,
                ..Default::default()
            }],
        });
        client.table_client.create(&table1).await.unwrap();
        let tref = &table1.table_reference;

        // 1 row through the REST insert API.
        let mut req = InsertAllRequest::<serde_json::Value>::default();
        req.rows.push(Row {
            insert_id: None,
            json: serde_json::from_str(
                r#"
            {"col_string": "test1"}
            "#,
            )
            .unwrap(),
        });
        client
            .tabledata_client
            .insert(&tref.project_id, &tref.dataset_id, &tref.table_id, &req)
            .await
            .unwrap();

        // 5 append requests x 3 identical rows through `default_storage_writer`. The
        // fully-qualified table name is derived from `tref`, so it always names the table
        // created above.
        let writer = client.default_storage_writer();
        let fqtn = format!(
            "projects/{}/datasets/{}/tables/{}",
            tref.project_id, tref.dataset_id, tref.table_id
        );
        let stream = writer.create_write_stream(&fqtn).await.unwrap();

        let rows = (0..5)
            .map(|j| {
                let data = crate::storage_write::stream::tests::TestData {
                    col_string: format!("default_{j}"),
                };
                let mut buf = Vec::new();
                data.encode(&mut buf).unwrap();
                crate::storage_write::stream::tests::create_append_rows_request(vec![
                    buf.clone(),
                    buf.clone(),
                    buf,
                ])
            })
            .collect::<Vec<_>>();
        let mut result = stream.append_rows(rows).await.unwrap();
        while let Some(res) = result.next().await {
            let res = res.unwrap();
            tracing::info!("append row errors = {:?}", res.row_errors.len());
        }

        let data = client
            .tabledata_client
            .read(
                &tref.project_id,
                &tref.dataset_id,
                &tref.table_id,
                &FetchDataRequest::default(),
            )
            .await
            .unwrap();
        assert_eq!(16, data.total_rows);
    }
}