google_cloud_bigquery/
client.rs

1use backon::{ExponentialBuilder, Retryable};
2use core::time::Duration;
3use google_cloud_gax::conn::{ConnectionOptions, Environment};
4use google_cloud_gax::retry::RetrySetting;
5use google_cloud_googleapis::cloud::bigquery::storage::v1::{
6    read_session, CreateReadSessionRequest, DataFormat, ReadSession,
7};
8use google_cloud_token::{TokenSource, TokenSourceProvider};
9use std::borrow::Cow;
10use std::collections::VecDeque;
11use std::fmt::Debug;
12use std::future::Future;
13use std::marker::PhantomData;
14use std::sync::Arc;
15
16use crate::grpc::apiv1::conn_pool::ConnectionManager;
17use crate::http::bigquery_client::BigqueryClient;
18use crate::http::bigquery_dataset_client::BigqueryDatasetClient;
19use crate::http::bigquery_job_client::BigqueryJobClient;
20use crate::http::bigquery_model_client::BigqueryModelClient;
21use crate::http::bigquery_routine_client::BigqueryRoutineClient;
22use crate::http::bigquery_row_access_policy_client::BigqueryRowAccessPolicyClient;
23use crate::http::bigquery_table_client::BigqueryTableClient;
24use crate::http::bigquery_tabledata_client::BigqueryTabledataClient;
25use crate::http::job::get_query_results::GetQueryResultsRequest;
26use crate::http::job::query::QueryRequest;
27use crate::http::job::{is_script, is_select_query, JobConfiguration, JobReference, JobStatistics, JobType};
28use crate::http::table::TableReference;
29use crate::query::{QueryOption, QueryResult};
30use crate::storage;
31use crate::{http, query};
32
33#[cfg(feature = "auth")]
34pub use google_cloud_auth;
35
36const JOB_RETRY_REASONS: [&str; 3] = ["backendError", "rateLimitExceeded", "internalError"];
37
38#[derive(Debug)]
39pub struct HttpClientConfig {
40    client: Option<reqwest_middleware::ClientWithMiddleware>,
41    bigquery_endpoint: Cow<'static, str>,
42    token_source_provider: Box<dyn TokenSourceProvider>,
43    debug: bool,
44}
45
46impl HttpClientConfig {
47    pub fn new_with_emulator(http_addr: impl Into<Cow<'static, str>>) -> Self {
48        Self {
49            client: None,
50            bigquery_endpoint: http_addr.into(),
51            token_source_provider: Box::new(EmptyTokenSourceProvider {}),
52            debug: false,
53        }
54    }
55
56    pub fn new(http_token_source_provider: Box<dyn TokenSourceProvider>) -> Self {
57        Self {
58            client: None,
59            bigquery_endpoint: "https://bigquery.googleapis.com".into(),
60            token_source_provider: http_token_source_provider,
61            debug: false,
62        }
63    }
64
65    pub fn with_debug(mut self, value: bool) -> Self {
66        self.debug = value;
67        self
68    }
69
70    pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self {
71        self.client = Some(value);
72        self
73    }
74
75    pub fn with_endpoint(mut self, value: impl Into<Cow<'static, str>>) -> Self {
76        self.bigquery_endpoint = value.into();
77        self
78    }
79
80    pub fn create_client(self) -> Arc<BigqueryClient> {
81        let ts = self.token_source_provider.token_source();
82        Arc::new(BigqueryClient::new(
83            ts,
84            self.bigquery_endpoint.as_ref(),
85            self.client
86                .unwrap_or_else(|| reqwest_middleware::ClientBuilder::new(reqwest::Client::default()).build()),
87            self.debug,
88        ))
89    }
90}
91
92#[cfg(feature = "auth")]
93impl HttpClientConfig {
94    fn bigquery_http_auth_config() -> google_cloud_auth::project::Config<'static> {
95        google_cloud_auth::project::Config::default().with_scopes(&http::bigquery_client::SCOPES)
96    }
97
98    ///Creates new token provider for HTTP client
99    pub fn default_token_provider() -> impl Future<
100        Output = Result<google_cloud_auth::token::DefaultTokenSourceProvider, google_cloud_auth::error::Error>,
101    > + Send
102           + 'static {
103        google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::bigquery_http_auth_config())
104    }
105
106    ///Creates new token provider for HTTP client with specified `credentials`
107    pub fn default_token_provider_with(
108        credentials: google_cloud_auth::credentials::CredentialsFile,
109    ) -> impl Future<
110        Output = Result<google_cloud_auth::token::DefaultTokenSourceProvider, google_cloud_auth::error::Error>,
111    > + Send
112           + 'static {
113        google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
114            HttpClientConfig::bigquery_http_auth_config(),
115            Box::new(credentials.clone()),
116        )
117    }
118}
119
120#[derive(Debug)]
121pub struct ClientConfig {
122    http: HttpClientConfig,
123    environment: Environment,
124    streaming_read_config: ChannelConfig,
125    streaming_write_config: StreamingWriteConfig,
126}
127
128#[derive(Clone, Debug, Default)]
129pub struct StreamingWriteConfig {
130    channel_config: ChannelConfig,
131    max_insert_count: usize,
132}
133
134impl StreamingWriteConfig {
135    pub fn with_channel_config(mut self, value: ChannelConfig) -> Self {
136        self.channel_config = value;
137        self
138    }
139    pub fn with_max_insert_count(mut self, value: usize) -> Self {
140        self.max_insert_count = value;
141        self
142    }
143}
144
145#[derive(Clone, Debug)]
146pub struct ChannelConfig {
147    /// num_channels is the number of gRPC channels.
148    num_channels: usize,
149    connect_timeout: Option<Duration>,
150    timeout: Option<Duration>,
151}
152
153impl ChannelConfig {
154    pub fn with_num_channels(mut self, value: usize) -> Self {
155        self.num_channels = value;
156        self
157    }
158    pub fn with_connect_timeout(mut self, value: Duration) -> Self {
159        self.connect_timeout = Some(value);
160        self
161    }
162    pub fn with_timeout(mut self, value: Duration) -> Self {
163        self.timeout = Some(value);
164        self
165    }
166
167    async fn into_connection_manager(
168        self,
169        environment: &Environment,
170    ) -> Result<ConnectionManager, google_cloud_gax::conn::Error> {
171        ConnectionManager::new(
172            self.num_channels,
173            environment,
174            &ConnectionOptions {
175                timeout: self.timeout,
176                connect_timeout: self.connect_timeout,
177            },
178        )
179        .await
180    }
181}
182
183impl Default for ChannelConfig {
184    fn default() -> Self {
185        Self {
186            num_channels: 4,
187            connect_timeout: Some(Duration::from_secs(30)),
188            timeout: None,
189        }
190    }
191}
192
193#[derive(Debug)]
194pub struct EmptyTokenSourceProvider {}
195
196#[derive(Debug)]
197pub struct EmptyTokenSource {}
198
199#[async_trait::async_trait]
200impl TokenSource for EmptyTokenSource {
201    async fn token(&self) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
202        Ok("".to_string())
203    }
204}
205
206impl TokenSourceProvider for EmptyTokenSourceProvider {
207    fn token_source(&self) -> Arc<dyn TokenSource> {
208        Arc::new(EmptyTokenSource {})
209    }
210}
211
212impl ClientConfig {
213    pub fn new_with_emulator(grpc_host: &str, http_addr: impl Into<Cow<'static, str>>) -> Self {
214        Self {
215            http: HttpClientConfig::new_with_emulator(http_addr),
216            environment: Environment::Emulator(grpc_host.to_string()),
217            streaming_read_config: ChannelConfig::default(),
218            streaming_write_config: StreamingWriteConfig::default(),
219        }
220    }
221
222    pub fn new(
223        http_token_source_provider: Box<dyn TokenSourceProvider>,
224        grpc_token_source_provider: Box<dyn TokenSourceProvider>,
225    ) -> Self {
226        Self {
227            http: HttpClientConfig::new(http_token_source_provider),
228            environment: Environment::GoogleCloud(grpc_token_source_provider),
229            streaming_read_config: ChannelConfig::default(),
230            streaming_write_config: StreamingWriteConfig::default(),
231        }
232    }
233
234    pub fn with_debug(mut self, value: bool) -> Self {
235        self.http.debug = value;
236        self
237    }
238
239    pub fn with_streaming_read_config(mut self, value: ChannelConfig) -> Self {
240        self.streaming_read_config = value;
241        self
242    }
243
244    pub fn with_streaming_write_config(mut self, value: StreamingWriteConfig) -> Self {
245        self.streaming_write_config = value;
246        self
247    }
248
249    pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self {
250        self.http.client = Some(value);
251        self
252    }
253
254    pub fn with_endpoint(mut self, value: impl Into<Cow<'static, str>>) -> Self {
255        self.http.bigquery_endpoint = value.into();
256        self
257    }
258}
259
260use crate::http::job::get::GetJobRequest;
261use crate::http::job::list::ListJobsRequest;
262
263use crate::grpc::apiv1::bigquery_client::StreamingReadClient;
264use crate::storage_write::stream::{buffered, committed, default, pending};
265use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient;
266
267#[cfg(feature = "auth")]
268impl ClientConfig {
269    pub async fn new_with_auth() -> Result<(Self, Option<String>), google_cloud_auth::error::Error> {
270        let ts_http = HttpClientConfig::default_token_provider().await?;
271        let ts_grpc =
272            google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::bigquery_grpc_auth_config()).await?;
273        let project_id = ts_grpc.project_id.clone();
274        let config = Self::new(Box::new(ts_http), Box::new(ts_grpc));
275        Ok((config, project_id))
276    }
277
278    pub async fn new_with_credentials(
279        credentials: google_cloud_auth::credentials::CredentialsFile,
280    ) -> Result<(Self, Option<String>), google_cloud_auth::error::Error> {
281        let ts_http = HttpClientConfig::default_token_provider_with(credentials.clone()).await?;
282        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
283            Self::bigquery_grpc_auth_config(),
284            Box::new(credentials),
285        )
286        .await?;
287        let project_id = ts_grpc.project_id.clone();
288        let config = Self::new(Box::new(ts_http), Box::new(ts_grpc));
289        Ok((config, project_id))
290    }
291
292    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
293        google_cloud_auth::project::Config::default()
294            .with_audience(crate::grpc::apiv1::conn_pool::AUDIENCE)
295            .with_scopes(&crate::grpc::apiv1::conn_pool::SCOPES)
296    }
297}
298
299#[derive(thiserror::Error, Debug)]
300pub enum QueryError {
301    #[error(transparent)]
302    Storage(#[from] storage::Error),
303    #[error(transparent)]
304    JobHttp(#[from] http::error::Error),
305    #[error("job has no destination table to read : job={0:?}")]
306    NoDestinationTable(JobReference),
307    #[error("failed to resolve table for script job: no child jobs found : job={0:?}")]
308    NoChildJobs(JobReference),
309    #[error("job type must be query: job={0:?}, jobType={1:?}")]
310    InvalidJobType(JobReference, String),
311    #[error(transparent)]
312    RunQuery(#[from] query::run::Error),
313}
314
315#[derive(Clone)]
316pub struct Client {
317    dataset_client: BigqueryDatasetClient,
318    table_client: BigqueryTableClient,
319    tabledata_client: BigqueryTabledataClient,
320    job_client: BigqueryJobClient,
321    routine_client: BigqueryRoutineClient,
322    row_access_policy_client: BigqueryRowAccessPolicyClient,
323    model_client: BigqueryModelClient,
324    streaming_read_conn_pool: Arc<ConnectionManager>,
325    streaming_write_conn_pool: Arc<ConnectionManager>,
326    streaming_write_max_insert_count: usize,
327}
328
329impl Client {
330    /// New client
331    pub async fn new(config: ClientConfig) -> Result<Self, google_cloud_gax::conn::Error> {
332        let client = config.http.create_client();
333
334        Ok(Self {
335            dataset_client: BigqueryDatasetClient::new(client.clone()),
336            table_client: BigqueryTableClient::new(client.clone()),
337            tabledata_client: BigqueryTabledataClient::new(client.clone()),
338            job_client: BigqueryJobClient::new(client.clone()),
339            routine_client: BigqueryRoutineClient::new(client.clone()),
340            row_access_policy_client: BigqueryRowAccessPolicyClient::new(client.clone()),
341            model_client: BigqueryModelClient::new(client.clone()),
342            streaming_read_conn_pool: Arc::new(
343                config
344                    .streaming_read_config
345                    .into_connection_manager(&config.environment)
346                    .await?,
347            ),
348            streaming_write_conn_pool: Arc::new(
349                config
350                    .streaming_write_config
351                    .channel_config
352                    .into_connection_manager(&config.environment)
353                    .await?,
354            ),
355            streaming_write_max_insert_count: config.streaming_write_config.max_insert_count,
356        })
357    }
358
359    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets
360    /// [BigqueryDatasetClient](crate::http::bigquery_dataset_client::BigqueryDatasetClient)
361    pub fn dataset(&self) -> &BigqueryDatasetClient {
362        &self.dataset_client
363    }
364
365    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/tables
366    /// [BigqueryTableClient](crate::http::bigquery_table_client::BigqueryTableClient)
367    pub fn table(&self) -> &BigqueryTableClient {
368        &self.table_client
369    }
370
371    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata
372    /// [BigqueryTabledataClient](crate::http::bigquery_tabledata_client::BigqueryTabledataClient)
373    pub fn tabledata(&self) -> &BigqueryTabledataClient {
374        &self.tabledata_client
375    }
376
377    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs
378    /// [BigqueryJobClient](crate::http::bigquery_job_client::BigqueryJobClient)
379    pub fn job(&self) -> &BigqueryJobClient {
380        &self.job_client
381    }
382
383    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/routines
384    /// [BigqueryRoutineClient](crate::http::bigquery_routine_client::BigqueryRoutineClient)
385    pub fn routine(&self) -> &BigqueryRoutineClient {
386        &self.routine_client
387    }
388
389    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/rowAccessPolicy
390    /// [BigqueryRowAccessPolicyClient](crate::http::bigquery_row_access_policy_client::BigqueryRowAccessPolicyClient)
391    pub fn row_access_policy(&self) -> &BigqueryRowAccessPolicyClient {
392        &self.row_access_policy_client
393    }
394
395    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/models
396    /// [BigqueryModelClient](crate::http::bigquery_model_client::BigqueryModelClient)
397    pub fn model(&self) -> &BigqueryModelClient {
398        &self.model_client
399    }
400
401    /// Creates a new pending type storage writer for the specified table.
402    /// https://cloud.google.com/bigquery/docs/write-api#pending_type
403    /// ```
404    /// use prost_types::DescriptorProto;
405    /// use google_cloud_bigquery::client::Client;
406    /// use google_cloud_gax::grpc::Status;
407    /// use prost::Message;
408    /// use tokio::sync::futures;
409    /// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
410    /// use futures_util::stream::StreamExt;
411    ///
412    /// pub async fn run<T: Message>(client: &Client, table: &str, rows: Vec<T>, schema: DescriptorProto)
413    /// -> Result<(), Status> {
414    ///     let mut writer = client.pending_storage_writer(table);
415    ///     let stream = writer.create_write_stream().await?;
416    ///
417    ///     let mut data= vec![];
418    ///     for row in rows {
419    ///         let mut buf = Vec::new();
420    ///         row.encode(&mut buf).unwrap();
421    ///         data.push(buf);
422    ///     }
423    ///     let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
424    ///     while let Some(Ok(res)) = result.next().await {
425    ///         tracing::info!("append row errors = {:?}", res.row_errors.len());
426    ///     }
427    ///
428    ///     let _ = stream.finalize().await?;
429    ///     let _ = writer.commit().await?;
430    ///     Ok(())
431    /// }
432    /// ```
433    pub fn pending_storage_writer(&self, table: &str) -> pending::Writer {
434        pending::Writer::new(1, self.streaming_write_conn_pool.clone(), table.to_string())
435    }
436
437    /// Creates a new default type storage writer.
438    /// https://cloud.google.com/bigquery/docs/write-api#default_stream
439    /// ```
440    /// use prost_types::DescriptorProto;
441    /// use google_cloud_bigquery::client::Client;
442    /// use google_cloud_gax::grpc::Status;
443    /// use prost::Message;
444    /// use tokio::sync::futures;
445    /// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
446    /// use futures_util::stream::StreamExt;
447    ///
448    /// pub async fn run<T: Message>(client: &Client, table: &str, rows: Vec<T>, schema: DescriptorProto)
449    /// -> Result<(), Status> {
450    ///     let writer = client.default_storage_writer();
451    ///     let stream = writer.create_write_stream(table).await?;
452    ///
453    ///     let mut data= vec![];
454    ///     for row in rows {
455    ///         let mut buf = Vec::new();
456    ///         row.encode(&mut buf).unwrap();
457    ///         data.push(buf);
458    ///     }
459    ///     let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
460    ///     while let Some(Ok(res)) = result.next().await {
461    ///         tracing::info!("append row errors = {:?}", res.row_errors.len());
462    ///     }
463    ///     Ok(())
464    /// }
465    /// ```
466    pub fn default_storage_writer(&self) -> default::Writer {
467        default::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
468    }
469
470    /// Creates a new committed type storage writer.
471    /// https://cloud.google.com/bigquery/docs/write-api#committed_type
472    /// ```
473    /// use prost_types::DescriptorProto;
474    /// use google_cloud_bigquery::client::Client;
475    /// use google_cloud_gax::grpc::Status;
476    /// use prost::Message;
477    /// use tokio::sync::futures;
478    /// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
479    /// use futures_util::stream::StreamExt;
480    ///
481    /// pub async fn run<T: Message>(client: &Client, table: &str, rows: Vec<T>, schema: DescriptorProto)
482    /// -> Result<(), Status> {
483    ///     let writer = client.committed_storage_writer();
484    ///     let stream = writer.create_write_stream(table).await?;
485    ///
486    ///     let mut data= vec![];
487    ///     for row in rows {
488    ///         let mut buf = Vec::new();
489    ///         row.encode(&mut buf).unwrap();
490    ///         data.push(buf);
491    ///     }
492    ///     let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
493    ///     while let Some(Ok(res)) = result.next().await {
494    ///         tracing::info!("append row errors = {:?}", res.row_errors.len());
495    ///     }
496    ///
497    ///     let _ = stream.finalize().await?;
498    ///     Ok(())
499    /// }
500    /// ```
501    pub fn committed_storage_writer(&self) -> committed::Writer {
502        committed::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
503    }
504
505    /// Creates a new buffered type storage writer.
506    /// https://cloud.google.com/bigquery/docs/write-api#buffered_type
507    /// ```
508    /// use prost_types::DescriptorProto;
509    /// use google_cloud_bigquery::client::Client;
510    /// use prost::Message;
511    /// use tokio::sync::futures;
512    /// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
513    /// use futures_util::stream::StreamExt;
514    /// use google_cloud_gax::grpc::Status;
515    ///
516    /// pub async fn run<T: Message>(client: &Client, table: &str, rows: Vec<T>, schema: DescriptorProto)
517    /// -> Result<(), Status> {
518    ///     let writer = client.buffered_storage_writer();
519    ///     let stream = writer.create_write_stream(table).await?;
520    ///
521    ///     let mut data= vec![];
522    ///     for row in rows {
523    ///         let mut buf = Vec::new();
524    ///         row.encode(&mut buf).unwrap();
525    ///         data.push(buf);
526    ///     }
527    ///     let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
528    ///     while let Some(Ok(res)) = result.next().await {
529    ///         tracing::info!("append row errors = {:?}", res.row_errors.len());
530    ///     }
531    ///     let _ = stream.flush_rows(Some(0)).await?;
532    ///     let _ = stream.finalize().await?;
533    ///     Ok(())
534    /// }
535    /// ```
536    pub fn buffered_storage_writer(&self) -> buffered::Writer {
537        buffered::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
538    }
539
540    /// Run query job and get result.
541    /// ```rust
542    /// use google_cloud_bigquery::http::job::query::QueryRequest;
543    /// use google_cloud_bigquery::query::row::Row;
544    /// use google_cloud_bigquery::client::Client;
545    ///
546    /// async fn run(client: &Client, project_id: &str) {
547    ///     let request = QueryRequest {
548    ///         query: "SELECT * FROM dataset.table".to_string(),
549    ///         ..Default::default()
550    ///     };
551    ///     let mut iter = client.query::<Row>(project_id, request).await.unwrap();
552    ///     while let Some(row) = iter.next().await.unwrap() {
553    ///         let col1 = row.column::<String>(0);
554    ///         let col2 = row.column::<Option<String>>(1);
555    ///     }
556    /// }
557    pub async fn query<T>(&self, project_id: &str, request: QueryRequest) -> Result<query::Iterator<T>, QueryError>
558    where
559        T: http::query::value::StructDecodable + storage::value::StructDecodable,
560    {
561        self.query_with_option(project_id, request, QueryOption::default())
562            .await
563    }
564
565    /// Run query job and get result.
566    /// ```rust
567    /// use google_cloud_bigquery::http::job::query::QueryRequest;
568    /// use google_cloud_bigquery::query::row::Row;
569    /// use google_cloud_bigquery::client::Client;
570    /// use google_cloud_bigquery::query::QueryOption;
571    /// use google_cloud_bigquery::query::ExponentialBuilder;
572    ///
573    /// async fn run(client: &Client, project_id: &str) {
574    ///     let request = QueryRequest {
575    ///         query: "SELECT * FROM dataset.table".to_string(),
576    ///         ..Default::default()
577    ///     };
578    ///     let retry = ExponentialBuilder::default().with_max_times(10);
579    ///     let option = QueryOption::default().with_retry(retry).with_enable_storage_read(true);
580    ///     let mut iter = client.query_with_option::<Row>(project_id, request, option).await.unwrap();
581    ///     while let Some(row) = iter.next().await.unwrap() {
582    ///         let col1 = row.column::<String>(0);
583    ///         let col2 = row.column::<Option<String>>(1);
584    ///     }
585    /// }
586    pub async fn query_with_option<T>(
587        &self,
588        project_id: &str,
589        request: QueryRequest,
590        option: QueryOption,
591    ) -> Result<query::Iterator<T>, QueryError>
592    where
593        T: http::query::value::StructDecodable + storage::value::StructDecodable,
594    {
595        let result = self.job_client.query(project_id, &request).await?;
596        let (total_rows, page_token, rows, force_first_fetch) = if result.job_complete {
597            (
598                result.total_rows.unwrap_or_default(),
599                result.page_token,
600                result.rows.unwrap_or_default(),
601                false,
602            )
603        } else {
604            (
605                self.wait_for_query(&result.job_reference, option.retry, &request.timeout_ms)
606                    .await?,
607                None,
608                vec![],
609                true,
610            )
611        };
612
613        //use storage api instead of rest API
614        if option.enable_storage_read && (page_token.is_none() || page_token.as_ref().unwrap().is_empty()) {
615            tracing::trace!("use storage read api for query {:?}", result.job_reference);
616            let job = self
617                .job_client
618                .get(
619                    &result.job_reference.project_id,
620                    &result.job_reference.job_id,
621                    &GetJobRequest {
622                        location: result.job_reference.location.clone(),
623                    },
624                )
625                .await?;
626            let iter = self
627                .new_storage_row_iterator_from_job::<T>(job.job_reference, job.statistics, job.configuration)
628                .await?;
629            return Ok(query::Iterator {
630                inner: QueryResult::Storage(iter),
631                total_size: total_rows,
632            });
633        }
634
635        let http_query_iterator = http::query::Iterator {
636            client: self.job_client.clone(),
637            project_id: result.job_reference.project_id,
638            job_id: result.job_reference.job_id,
639            request: GetQueryResultsRequest {
640                start_index: 0,
641                page_token,
642                max_results: request.max_results,
643                timeout_ms: request.timeout_ms,
644                location: result.job_reference.location,
645                format_options: request.format_options,
646            },
647            chunk: VecDeque::from(rows),
648            total_size: total_rows,
649            force_first_fetch,
650            _marker: PhantomData,
651        };
652        Ok(query::Iterator {
653            inner: QueryResult::Http(http_query_iterator),
654            total_size: total_rows,
655        })
656    }
657
658    async fn new_storage_row_iterator_from_job<T>(
659        &self,
660        mut job: JobReference,
661        mut statistics: Option<JobStatistics>,
662        mut config: JobConfiguration,
663    ) -> Result<storage::Iterator<T>, QueryError>
664    where
665        T: http::query::value::StructDecodable + storage::value::StructDecodable,
666    {
667        loop {
668            tracing::trace!("check child job result {:?}, {:?}, {:?}", job, statistics, config);
669            let query_config = match &config.job {
670                JobType::Query(config) => config,
671                _ => return Err(QueryError::InvalidJobType(job.clone(), config.job_type.clone())),
672            };
673            if let Some(dst) = &query_config.destination_table {
674                return Ok(self.read_table(dst, None).await?);
675            }
676            if !is_script(&statistics, &config) {
677                return Err(QueryError::NoDestinationTable(job.clone()));
678            }
679            let children = self
680                .job_client
681                .list(
682                    &job.project_id,
683                    &ListJobsRequest {
684                        parent_job_id: job.job_id.to_string(),
685                        ..Default::default()
686                    },
687                )
688                .await?;
689
690            let mut found = false;
691            for j in children.into_iter() {
692                if !is_select_query(&j.statistics, &j.configuration) {
693                    continue;
694                }
695                job = j.job_reference;
696                statistics = j.statistics;
697                config = j.configuration;
698                found = true;
699                break;
700            }
701            if !found {
702                break;
703            }
704        }
705        Err(QueryError::NoChildJobs(job.clone()))
706    }
707
708    async fn wait_for_query(
709        &self,
710        job: &JobReference,
711        builder: ExponentialBuilder,
712        timeout_ms: &Option<i64>,
713    ) -> Result<i64, query::run::Error> {
714        // Use get_query_results only to wait for completion, not to read results.
715        let request = GetQueryResultsRequest {
716            max_results: Some(0),
717            timeout_ms: *timeout_ms,
718            location: job.location.clone(),
719            ..Default::default()
720        };
721        let action = || async {
722            tracing::debug!("waiting for job completion {:?}", job);
723            let result = self
724                .job_client
725                .get_query_results(&job.project_id, &job.job_id, &request)
726                .await
727                .map_err(query::run::Error::Http)?;
728            if result.job_complete {
729                Ok(result.total_rows)
730            } else {
731                Err(query::run::Error::JobIncomplete)
732            }
733        };
734        action
735            .retry(builder)
736            .when(|e: &query::run::Error| match e {
737                query::run::Error::JobIncomplete => true,
738                query::run::Error::Http(http::error::Error::HttpClient(_)) => true,
739                query::run::Error::Http(http::error::Error::Response(r)) => r.is_retryable(&JOB_RETRY_REASONS),
740                _ => false,
741            })
742            .await
743    }
744
745    /// Read table data by BigQuery Storage Read API.
746    /// ```rust
747    /// use google_cloud_bigquery::storage::row::Row;
748    /// use google_cloud_bigquery::client::Client;
749    /// use google_cloud_bigquery::http::table::TableReference;
750    ///
751    /// async fn run(client: &Client, project_id: &str) {
752    ///     let table = TableReference {
753    ///         project_id: project_id.to_string(),
754    ///         dataset_id: "dataset".to_string(),
755    ///         table_id: "table".to_string(),
756    ///     };
757    ///     let mut iter = client.read_table::<Row>(&table, None).await.unwrap();
758    ///     while let Some(row) = iter.next().await.unwrap() {
759    ///         let col1 = row.column::<String>(0);
760    ///         let col2 = row.column::<Option<String>>(1);
761    ///     }
762    /// }
763    /// ```
764    pub async fn read_table<T>(
765        &self,
766        table: &TableReference,
767        option: Option<ReadTableOption>,
768    ) -> Result<storage::Iterator<T>, storage::Error>
769    where
770        T: storage::value::StructDecodable,
771    {
772        let option = option.unwrap_or_default();
773
774        let mut client = StreamingReadClient::new(BigQueryReadClient::new(self.streaming_read_conn_pool.conn()));
775        let read_session = client
776            .create_read_session(
777                CreateReadSessionRequest {
778                    parent: format!("projects/{}", table.project_id),
779                    read_session: Some(ReadSession {
780                        name: "".to_string(),
781                        expire_time: None,
782                        data_format: DataFormat::Arrow.into(),
783                        table: table.resource(),
784                        table_modifiers: option.session_table_modifiers,
785                        read_options: option.session_read_options,
786                        streams: vec![],
787                        estimated_total_bytes_scanned: 0,
788                        estimated_total_physical_file_size: 0,
789                        estimated_row_count: 0,
790                        trace_id: "".to_string(),
791                        schema: option.session_schema,
792                    }),
793                    max_stream_count: option.max_stream_count,
794                    preferred_min_stream_count: 0,
795                },
796                option.session_retry_setting,
797            )
798            .await?
799            .into_inner();
800        storage::Iterator::new(client, read_session, option.read_rows_retry_setting).await
801    }
802}
803
804#[derive(Debug, Default, Clone)]
805pub struct ReadTableOption {
806    session_read_options: Option<read_session::TableReadOptions>,
807    session_table_modifiers: Option<read_session::TableModifiers>,
808    session_schema: Option<read_session::Schema>,
809    session_retry_setting: Option<RetrySetting>,
810    read_rows_retry_setting: Option<RetrySetting>,
811    max_stream_count: i32,
812}
813
814impl ReadTableOption {
815    pub fn with_session_read_options(mut self, value: read_session::TableReadOptions) -> Self {
816        self.session_read_options = Some(value);
817        self
818    }
819
820    pub fn with_session_table_modifiers(mut self, value: read_session::TableModifiers) -> Self {
821        self.session_table_modifiers = Some(value);
822        self
823    }
824
825    pub fn with_session_schema(mut self, value: read_session::Schema) -> Self {
826        self.session_schema = Some(value);
827        self
828    }
829
830    pub fn with_session_retry_setting(mut self, value: RetrySetting) -> Self {
831        self.session_retry_setting = Some(value);
832        self
833    }
834
835    pub fn with_read_rows_retry_setting(mut self, value: RetrySetting) -> Self {
836        self.read_rows_retry_setting = Some(value);
837        self
838    }
839
840    pub fn with_max_stream_count(mut self, value: i32) -> Self {
841        self.max_stream_count = value;
842        self
843    }
844}
845
846#[cfg(test)]
847mod tests {
848    use bigdecimal::BigDecimal;
849
850    use serial_test::serial;
851    use std::collections::HashMap;
852    use std::ops::AddAssign;
853    use std::time::Duration;
854    use time::{Date, OffsetDateTime, Time};
855
856    use google_cloud_googleapis::cloud::bigquery::storage::v1::read_session::TableReadOptions;
857
858    use crate::client::{Client, ClientConfig, ReadTableOption};
859    use crate::http::bigquery_client::test::{create_table_schema, dataset_name, TestData};
860    use crate::http::job::query::QueryRequest;
861    use crate::http::table::{Table, TableReference};
862    use crate::http::tabledata::insert_all::{InsertAllRequest, Row};
863    use crate::http::types::{QueryParameter, QueryParameterStructType, QueryParameterType, QueryParameterValue};
864    use crate::query;
865    use crate::query::QueryOption;
866
867    #[ctor::ctor]
868    fn init() {
869        let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
870            .add_directive("google_cloud_bigquery=trace".parse().unwrap());
871        let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
872    }
873
874    async fn create_client() -> (Client, String) {
875        let (client_config, project_id) = ClientConfig::new_with_auth().await.unwrap();
876        (Client::new(client_config).await.unwrap(), project_id.unwrap())
877    }
878
879    #[tokio::test]
880    #[serial]
881    async fn test_query_from_storage() {
882        let option = QueryOption::default().with_enable_storage_read(true);
883        test_query(option).await
884    }
885
886    #[tokio::test]
887    #[serial]
888    async fn test_query_from_rest() {
889        let option = QueryOption::default();
890        test_query(option).await
891    }
892
893    async fn test_query(option: QueryOption) {
894        let (client, project_id) = create_client().await;
895        let mut iterator = client
896            .query_with_option::<query::row::Row>(
897                &project_id,
898                QueryRequest {
899                    max_results: Some(2),
900                    query: "SELECT
901                        'A',
902                        TIMESTAMP_MICROS(1230219000000019),
903                        100,
904                        0.432899,
905                        DATE(2023,9,1),
906                        TIME(15, 30, 01),
907                        NULL,
908                        ['A','B'],
909                        [TIMESTAMP_MICROS(1230219000000019), TIMESTAMP_MICROS(1230219000000020)],
910                        [100,200],
911                        [0.432899,0.432900],
912                        [DATE(2023,9,1),DATE(2023,9,2)],
913                        [TIME_ADD(TIME(15,30,1), INTERVAL 10 MICROSECOND),TIME(0, 0, 0),TIME(23,59,59)],
914                        b'test',
915                        true,
916                        [b'test',b'test2'],
917                        [true,false],
918                        cast('-5.7896044618658097711785492504343953926634992332820282019728792003956564819968E+38' as BIGNUMERIC),
919                        cast('5.7896044618658097711785492504343953926634992332820282019728792003956564819967E+38' as BIGNUMERIC),
920                        cast('-9.9999999999999999999999999999999999999E+28' as NUMERIC),
921                        cast('9.9999999999999999999999999999999999999E+28' as NUMERIC),
922                        [cast('-5.7896044618658097711785492504343953926634992332820282019728792003956564819968E+38' as BIGNUMERIC),cast('5.7896044618658097711785492504343953926634992332820282019728792003956564819967E+38' as BIGNUMERIC)]
923                    ".to_string(),
924                    ..Default::default()
925                },
926                option,
927            )
928            .await
929            .unwrap();
930
931        assert_eq!(1, iterator.total_size);
932
933        while let Some(row) = iterator.next().await.unwrap() {
934            let v: String = row.column(0).unwrap();
935            assert_eq!(v, "A");
936            let v: OffsetDateTime = row.column(1).unwrap();
937            assert_eq!(v.unix_timestamp_nanos(), 1230219000000019000);
938            let v: i64 = row.column(2).unwrap();
939            assert_eq!(v, 100);
940            let v: f64 = row.column(3).unwrap();
941            assert_eq!(v, 0.432899);
942            let v: Date = row.column(4).unwrap();
943            assert_eq!(v, time::macros::date!(2023 - 09 - 01));
944            let v: Time = row.column(5).unwrap();
945            assert_eq!(v, time::macros::time!(15:30:01));
946            let v: Option<String> = row.column(6).unwrap();
947            assert!(v.is_none());
948            let v: Option<OffsetDateTime> = row.column(6).unwrap();
949            assert!(v.is_none());
950            let v: Option<i64> = row.column(6).unwrap();
951            assert!(v.is_none());
952            let v: Option<f64> = row.column(6).unwrap();
953            assert!(v.is_none());
954            let v: Option<Date> = row.column(6).unwrap();
955            assert!(v.is_none());
956            let v: Option<Time> = row.column(6).unwrap();
957            assert!(v.is_none());
958            let v: Option<Vec<Time>> = row.column(6).unwrap();
959            assert!(v.is_none());
960            let v: Option<BigDecimal> = row.column(6).unwrap();
961            assert!(v.is_none());
962            let v: Option<bool> = row.column(6).unwrap();
963            assert!(v.is_none());
964            let v: Option<String> = row.column(6).unwrap();
965            assert!(v.is_none());
966            let v: Option<Vec<u8>> = row.column(6).unwrap();
967            assert!(v.is_none());
968
969            let v: Vec<String> = row.column(7).unwrap();
970            assert_eq!(v, vec!["A", "B"]);
971            let v: Vec<OffsetDateTime> = row.column(8).unwrap();
972            assert_eq!(v[0].unix_timestamp_nanos(), 1230219000000019000);
973            assert_eq!(v[1].unix_timestamp_nanos(), 1230219000000020000);
974            let v: Vec<i64> = row.column(9).unwrap();
975            assert_eq!(v, vec![100, 200]);
976            let v: Vec<f64> = row.column(10).unwrap();
977            assert_eq!(v, vec![0.432899, 0.432900]);
978            let v: Vec<Date> = row.column(11).unwrap();
979            assert_eq!(v[0], time::macros::date!(2023 - 09 - 01));
980            assert_eq!(v[1], time::macros::date!(2023 - 09 - 02));
981            let v: Vec<Time> = row.column(12).unwrap();
982            let mut tm = time::macros::time!(15:30:01);
983            tm.add_assign(Duration::from_micros(10));
984            assert_eq!(v[0], tm);
985            assert_eq!(v[1], time::macros::time!(0:0:0));
986            assert_eq!(v[2], time::macros::time!(23:59:59));
987
988            let v: Vec<u8> = row.column(13).unwrap();
989            assert_eq!(v, b"test");
990            let v: bool = row.column(14).unwrap();
991            assert!(v);
992            let v: Vec<Vec<u8>> = row.column(15).unwrap();
993            assert_eq!(v[0], b"test");
994            assert_eq!(v[1], b"test2");
995            let v: Vec<bool> = row.column(16).unwrap();
996            assert!(v[0]);
997            assert!(!v[1]);
998            let v: BigDecimal = row.column(17).unwrap();
999            assert_eq!(
1000                v.to_string(),
1001                "-578960446186580977117854925043439539266.34992332820282019728792003956564819968"
1002            );
1003            let v: BigDecimal = row.column(18).unwrap();
1004            assert_eq!(
1005                v.to_string(),
1006                "578960446186580977117854925043439539266.34992332820282019728792003956564819967"
1007            );
1008            let v: BigDecimal = row.column(19).unwrap();
1009            assert_eq!(v.to_string(), "-99999999999999999999999999999.999999999");
1010            let v: BigDecimal = row.column(20).unwrap();
1011            assert_eq!(v.to_string(), "99999999999999999999999999999.999999999");
1012            let v: Vec<BigDecimal> = row.column(21).unwrap();
1013            assert_eq!(
1014                v[0].to_string(),
1015                "-578960446186580977117854925043439539266.34992332820282019728792003956564819968"
1016            );
1017            assert_eq!(
1018                v[1].to_string(),
1019                "578960446186580977117854925043439539266.34992332820282019728792003956564819967"
1020            );
1021        }
1022    }
1023
1024    #[tokio::test(flavor = "multi_thread")]
1025    #[serial]
1026    async fn test_query_table_from_storage() {
1027        test_query_table(None, QueryOption::default().with_enable_storage_read(true)).await
1028    }
1029
1030    #[tokio::test(flavor = "multi_thread")]
1031    #[serial]
1032    async fn test_query_table_from_rest() {
1033        test_query_table(Some(1), QueryOption::default()).await
1034    }
1035
1036    async fn insert(client: &Client, project: &str, dataset: &str, table: &str, size: usize, now: &OffsetDateTime) {
1037        let mut table1 = Table::default();
1038        table1.table_reference.dataset_id = dataset.to_string();
1039        table1.table_reference.project_id = project.to_string();
1040        table1.table_reference.table_id = table.to_string();
1041        table1.schema = Some(create_table_schema());
1042        let _table1 = client.table_client.create(&table1).await.unwrap();
1043        let mut req = InsertAllRequest::<TestData>::default();
1044        for i in 0..size {
1045            req.rows.push(Row {
1046                insert_id: None,
1047                json: TestData::default(i, *now + Duration::from_secs(i as u64)),
1048            });
1049        }
1050        client.tabledata().insert(project, dataset, table, &req).await.unwrap();
1051    }
1052
1053    async fn test_query_table(max_results: Option<i64>, option: QueryOption) {
1054        let dataset = dataset_name("table");
1055        let (client, project_id) = create_client().await;
1056        let now = OffsetDateTime::from_unix_timestamp(OffsetDateTime::now_utc().unix_timestamp()).unwrap();
1057        let table = format!("test_query_table_{}", now.unix_timestamp());
1058        insert(&client, &project_id, &dataset, &table, 3, &now).await;
1059
1060        // query
1061        let mut data_as_row: Vec<TestData> = vec![];
1062        let mut iterator_as_row = client
1063            .query_with_option::<query::row::Row>(
1064                &project_id,
1065                QueryRequest {
1066                    max_results,
1067                    query: format!("SELECT * FROM {}.{}", dataset, table),
1068                    ..Default::default()
1069                },
1070                option.clone(),
1071            )
1072            .await
1073            .unwrap();
1074        while let Some(row) = iterator_as_row.next().await.unwrap() {
1075            data_as_row.push(TestData {
1076                col_string: row.column(0).unwrap(),
1077                col_number: row.column(1).unwrap(),
1078                col_number_array: row.column(2).unwrap(),
1079                col_timestamp: row.column(3).unwrap(),
1080                col_json: row.column(4).unwrap(),
1081                col_json_array: row.column(5).unwrap(),
1082                col_struct: row.column(6).unwrap(),
1083                col_struct_array: row.column(7).unwrap(),
1084                col_binary: row.column(8).unwrap(),
1085            });
1086        }
1087        let mut data_as_struct: Vec<TestData> = vec![];
1088        let mut iterator_as_struct = client
1089            .query_with_option::<TestData>(
1090                &project_id,
1091                QueryRequest {
1092                    query: format!("SELECT * FROM {}.{}", dataset, table),
1093                    ..Default::default()
1094                },
1095                option,
1096            )
1097            .await
1098            .unwrap();
1099        while let Some(row) = iterator_as_struct.next().await.unwrap() {
1100            data_as_struct.push(row);
1101        }
1102        assert_eq!(iterator_as_struct.total_size, 3);
1103        assert_eq!(iterator_as_row.total_size, 3);
1104        assert_eq!(data_as_struct.len(), 3);
1105        assert_eq!(data_as_row.len(), 3);
1106
1107        assert_data(&now, data_as_struct);
1108        assert_data(&now, data_as_row);
1109    }
1110
1111    #[tokio::test(flavor = "multi_thread")]
1112    #[serial]
1113    async fn test_read_table() {
1114        let dataset = dataset_name("table");
1115        let (client, project_id) = create_client().await;
1116        let now = OffsetDateTime::from_unix_timestamp(OffsetDateTime::now_utc().unix_timestamp()).unwrap();
1117        let table = format!("test_read_table_{}", now.unix_timestamp());
1118        insert(&client, &project_id, &dataset, &table, 3, &now).await;
1119
1120        let table = TableReference {
1121            project_id,
1122            dataset_id: dataset.to_string(),
1123            table_id: table.to_string(),
1124        };
1125        let mut iterator_as_struct = client.read_table::<TestData>(&table, None).await.unwrap();
1126
1127        let option = ReadTableOption {
1128            session_read_options: Some(TableReadOptions {
1129                row_restriction: "col_string = \"test_0\"".to_string(),
1130                ..Default::default()
1131            }),
1132            ..Default::default()
1133        };
1134        let mut iterator_as_row = client
1135            .read_table::<crate::storage::row::Row>(&table, Some(option))
1136            .await
1137            .unwrap();
1138        let mut data_as_row: Vec<TestData> = vec![];
1139        let mut data_as_struct: Vec<TestData> = vec![];
1140        let mut finish1 = false;
1141        let mut finish2 = false;
1142        loop {
1143            tokio::select! {
1144                row = iterator_as_struct.next() => {
1145                    if let Some(row) = row.unwrap() {
1146                        tracing::info!("read struct some");
1147                        data_as_struct.push(row);
1148                    }else {
1149                        tracing::info!("read struct none");
1150                        finish1 = true;
1151                        if finish1 && finish2 {
1152                            break;
1153                        }
1154                    }
1155                },
1156                row = iterator_as_row.next() => {
1157                    if let Some(row) = row.unwrap() {
1158                        tracing::info!("read row some");
1159                        data_as_row.push(TestData {
1160                            col_string: row.column(0).unwrap(),
1161                            col_number: row.column(1).unwrap(),
1162                            col_number_array: row.column(2).unwrap(),
1163                            col_timestamp: row.column(3).unwrap(),
1164                            col_json: row.column(4).unwrap(),
1165                            col_json_array: row.column(5).unwrap(),
1166                            col_struct: row.column(6).unwrap(),
1167                            col_struct_array: row.column(7).unwrap(),
1168                            col_binary: row.column(8).unwrap(),
1169            }           );
1170                    }else {
1171                        tracing::info!("read row none");
1172                        finish2 = true;
1173                        if finish1 && finish2 {
1174                            break;
1175                        }
1176                    }
1177                }
1178            }
1179        }
1180        assert_eq!(data_as_struct.len(), 3);
1181        assert_eq!(data_as_row.len(), 1);
1182
1183        assert_data(&now, data_as_struct);
1184        assert_data(&now, data_as_row);
1185    }
1186
1187    #[tokio::test(flavor = "multi_thread")]
1188    #[serial]
1189    async fn test_query_job_incomplete_from_storage() {
1190        test_query_job_incomplete(None, QueryOption::default().with_enable_storage_read(true)).await
1191    }
1192
1193    #[tokio::test(flavor = "multi_thread")]
1194    #[serial]
1195    async fn test_query_job_incomplete_from_rest() {
1196        test_query_job_incomplete(Some(4999), QueryOption::default()).await
1197    }
1198
1199    async fn test_query_job_incomplete(max_results: Option<i64>, option: QueryOption) {
1200        let dataset = dataset_name("table");
1201        let (client, project_id) = create_client().await;
1202        let now = OffsetDateTime::now_utc();
1203        let table = format!("test_query_job_incomplete_{}", now.unix_timestamp());
1204        const SIZE: usize = 10000;
1205        insert(&client, &project_id, &dataset, &table, SIZE, &now).await;
1206
1207        let mut data: Vec<query::row::Row> = vec![];
1208        let mut iter = client
1209            .query_with_option::<query::row::Row>(
1210                &project_id,
1211                QueryRequest {
1212                    timeout_ms: Some(5), // pass wait_for_query
1213                    use_query_cache: Some(false),
1214                    max_results,
1215                    query: format!("SELECT 1 FROM {}.{}", dataset, table),
1216                    ..Default::default()
1217                },
1218                option,
1219            )
1220            .await
1221            .unwrap();
1222        while let Some(row) = iter.next().await.unwrap() {
1223            data.push(row);
1224        }
1225        assert_eq!(iter.total_size, SIZE as i64);
1226        assert_eq!(data.len(), SIZE);
1227    }
1228
1229    #[derive(Debug, Clone)]
1230    struct Val {
1231        pub val1: String,
1232        pub val2: String,
1233    }
1234
1235    #[tokio::test]
1236    #[serial]
1237    async fn test_query_with_parameter() {
1238        let array_val = [
1239            Val {
1240                val1: "val1-1".to_string(),
1241                val2: "val1-2".to_string(),
1242            },
1243            Val {
1244                val1: "val2-1".to_string(),
1245                val2: "val2-2".to_string(),
1246            },
1247        ];
1248
1249        let query_parameter = QueryParameter {
1250            name: Some("p1".to_string()),
1251            parameter_type: QueryParameterType {
1252                parameter_type: "ARRAY".to_string(),
1253                array_type: Some(Box::new(QueryParameterType {
1254                    parameter_type: "STRUCT".to_string(),
1255                    struct_types: Some(vec![
1256                        QueryParameterStructType {
1257                            name: Some("val1".to_string()),
1258                            field_type: QueryParameterType {
1259                                parameter_type: "STRING".to_string(),
1260                                ..Default::default()
1261                            },
1262                            description: None,
1263                        },
1264                        QueryParameterStructType {
1265                            name: Some("val2".to_string()),
1266                            field_type: QueryParameterType {
1267                                parameter_type: "STRING".to_string(),
1268                                ..Default::default()
1269                            },
1270                            description: None,
1271                        },
1272                    ]),
1273                    array_type: None,
1274                })),
1275                struct_types: None,
1276            },
1277            parameter_value: QueryParameterValue {
1278                array_values: Some(
1279                    array_val
1280                        .iter()
1281                        .map(|val| {
1282                            let mut param_map = HashMap::new();
1283                            param_map.insert(
1284                                "val1".to_string(),
1285                                QueryParameterValue {
1286                                    value: Some(val.val1.clone()),
1287                                    ..Default::default()
1288                                },
1289                            );
1290                            param_map.insert(
1291                                "val2".to_string(),
1292                                QueryParameterValue {
1293                                    value: Some(val.val2.clone()),
1294                                    ..Default::default()
1295                                },
1296                            );
1297                            QueryParameterValue {
1298                                struct_values: Some(param_map),
1299                                value: None,
1300                                array_values: None,
1301                            }
1302                        })
1303                        .collect(),
1304                ),
1305                ..Default::default()
1306            },
1307        };
1308        let (client, project_id) = create_client().await;
1309        let mut result = client
1310            .query::<query::row::Row>(
1311                &project_id,
1312                QueryRequest {
1313                    query: "
1314            WITH VAL AS (SELECT @p1 AS col1)
1315            SELECT
1316                ARRAY(SELECT val1 FROM UNNEST(col1)) AS val1,
1317                ARRAY(SELECT val2 FROM UNNEST(col1)) AS val2
1318            FROM VAL
1319            "
1320                    .to_string(),
1321                    query_parameters: vec![query_parameter],
1322                    ..QueryRequest::default()
1323                },
1324            )
1325            .await
1326            .unwrap();
1327        let row = result.next().await.unwrap().unwrap();
1328        let col = row.column::<Vec<String>>(0).unwrap();
1329        assert_eq!(col[0], "val1-1".to_string());
1330        assert_eq!(col[1], "val2-1".to_string());
1331        let col = row.column::<Vec<String>>(1).unwrap();
1332        assert_eq!(col[0], "val1-2".to_string());
1333        assert_eq!(col[1], "val2-2".to_string());
1334    }
1335
1336    fn assert_data(now: &OffsetDateTime, data: Vec<TestData>) {
1337        for (i, d) in data.iter().enumerate() {
1338            assert_eq!(&TestData::default(i, *now + Duration::from_secs(i as u64)), d);
1339        }
1340    }
1341}
1342
1343#[cfg(test)]
1344mod emulator_tests {
1345    use crate::client::{Client, ClientConfig};
1346    use crate::http::table::{Table, TableFieldSchema, TableFieldType, TableSchema};
1347    use crate::http::tabledata::insert_all::{InsertAllRequest, Row};
1348    use crate::http::tabledata::list::FetchDataRequest;
1349    use futures_util::StreamExt;
1350
1351    use prost::Message;
1352
1353    use std::time::SystemTime;
1354
1355    #[ignore]
1356    #[tokio::test]
1357    async fn test_emulator_use() {
1358        let config = ClientConfig::new_with_emulator("localhost:9060", "http://localhost:9050");
1359
1360        // Create Table
1361        let now = SystemTime::now()
1362            .duration_since(SystemTime::UNIX_EPOCH)
1363            .unwrap()
1364            .as_secs();
1365        let client = Client::new(config).await.unwrap();
1366        let mut table1 = Table::default();
1367        table1.table_reference.dataset_id = "dataset1".to_string();
1368        table1.table_reference.project_id = "local-project".to_string();
1369        table1.table_reference.table_id = format!("table{now}").to_string();
1370        table1.schema = Some(TableSchema {
1371            fields: vec![TableFieldSchema {
1372                name: "col_string".to_string(),
1373                data_type: TableFieldType::String,
1374                ..Default::default()
1375            }],
1376        });
1377        client.table_client.create(&table1).await.unwrap();
1378
1379        // Insert data
1380        let mut req = InsertAllRequest::<serde_json::Value>::default();
1381        req.rows.push(Row {
1382            insert_id: None,
1383            json: serde_json::from_str(
1384                r#"
1385                {"col_string": "test1"}
1386            "#,
1387            )
1388            .unwrap(),
1389        });
1390        client
1391            .tabledata_client
1392            .insert(
1393                &table1.table_reference.project_id,
1394                &table1.table_reference.dataset_id,
1395                &table1.table_reference.table_id,
1396                &req,
1397            )
1398            .await
1399            .unwrap();
1400
1401        // Streaming write
1402        let writer = client.default_storage_writer();
1403        let fqtn = &format!(
1404            "projects/local-project/datasets/dataset1/tables/{}",
1405            table1.table_reference.table_id
1406        );
1407        let stream = writer.create_write_stream(fqtn).await.unwrap();
1408
1409        let mut rows = vec![];
1410        for j in 0..5 {
1411            let data = crate::storage_write::stream::tests::TestData {
1412                col_string: format!("default_{j}"),
1413            };
1414            let mut buf = Vec::new();
1415            data.encode(&mut buf).unwrap();
1416            rows.push(crate::storage_write::stream::tests::create_append_rows_request(vec![
1417                buf.clone(),
1418                buf.clone(),
1419                buf,
1420            ]));
1421        }
1422        let mut result = stream.append_rows(rows).await.unwrap();
1423        while let Some(res) = result.next().await {
1424            let res = res.unwrap();
1425            tracing::info!("append row errors = {:?}", res.row_errors.len());
1426        }
1427
1428        // Read all data
1429        let tref = &table1.table_reference;
1430        let data = client
1431            .tabledata_client
1432            .read(
1433                &tref.project_id,
1434                &tref.dataset_id,
1435                &tref.table_id,
1436                &FetchDataRequest { ..Default::default() },
1437            )
1438            .await
1439            .unwrap();
1440        assert_eq!(16, data.total_rows);
1441
1442        /* TODO fix emulator stream
1443        // Read all data by storage
1444        let opt = ReadTableOption::default()
1445            .with_session_read_options(TableReadOptions::default())
1446            .with_max_stream_count(1);
1447        let mut records= client
1448            .read_table::<crate::storage::row::Row>(&tref, Some(opt))
1449            .await
1450            .unwrap();
1451        let mut count = 0;
1452        while let record = records.next().await.unwrap() {
1453            count += 1;
1454        }
1455        assert_eq!(count, data.total_rows);
1456         */
1457    }
1458}