Skip to main content

gcloud_bigquery/
client.rs

1use backon::{ExponentialBuilder, Retryable};
2use core::time::Duration;
3use google_cloud_gax::conn::{ConnectionOptions, Environment};
4use google_cloud_gax::retry::RetrySetting;
5use google_cloud_googleapis::cloud::bigquery::storage::v1::{
6    read_session, CreateReadSessionRequest, DataFormat, ReadSession,
7};
8use std::borrow::Cow;
9use std::collections::VecDeque;
10use std::fmt::Debug;
11use std::future::Future;
12use std::marker::PhantomData;
13use std::sync::Arc;
14use token_source::{TokenSource, TokenSourceProvider};
15
16use crate::grpc::apiv1::conn_pool::ConnectionManager;
17use crate::http::bigquery_client::BigqueryClient;
18use crate::http::bigquery_dataset_client::BigqueryDatasetClient;
19use crate::http::bigquery_job_client::BigqueryJobClient;
20use crate::http::bigquery_model_client::BigqueryModelClient;
21use crate::http::bigquery_routine_client::BigqueryRoutineClient;
22use crate::http::bigquery_row_access_policy_client::BigqueryRowAccessPolicyClient;
23use crate::http::bigquery_table_client::BigqueryTableClient;
24use crate::http::bigquery_tabledata_client::BigqueryTabledataClient;
25use crate::http::job::get_query_results::GetQueryResultsRequest;
26use crate::http::job::query::QueryRequest;
27use crate::http::job::{is_script, is_select_query, JobConfiguration, JobReference, JobStatistics, JobType};
28use crate::http::table::TableReference;
29use crate::query::{QueryOption, QueryResult};
30use crate::storage;
31use crate::{http, query};
32
33#[cfg(feature = "auth")]
34pub use google_cloud_auth;
35
/// Error reasons that make a failed job-status response retryable.
///
/// Passed to `is_retryable` by `Client::wait_for_query` while polling for query completion.
const JOB_RETRY_REASONS: [&str; 3] = ["backendError", "rateLimitExceeded", "internalError"];
37
/// Configuration used to build the HTTP (REST) BigQuery client.
#[derive(Debug)]
pub struct HttpClientConfig {
    /// Caller-supplied HTTP client; when `None`, `create_client` builds a default one.
    client: Option<reqwest_middleware::ClientWithMiddleware>,
    /// Endpoint handed to `BigqueryClient::new`.
    bigquery_endpoint: Cow<'static, str>,
    /// Provider of the token source handed to `BigqueryClient::new`.
    token_source_provider: Box<dyn TokenSourceProvider>,
    /// Debug flag handed to `BigqueryClient::new`.
    debug: bool,
}
45
46impl HttpClientConfig {
47    pub fn new_with_emulator(http_addr: impl Into<Cow<'static, str>>) -> Self {
48        Self {
49            client: None,
50            bigquery_endpoint: http_addr.into(),
51            token_source_provider: Box::new(EmptyTokenSourceProvider {}),
52            debug: false,
53        }
54    }
55
56    pub fn new(http_token_source_provider: Box<dyn TokenSourceProvider>) -> Self {
57        Self {
58            client: None,
59            bigquery_endpoint: "https://bigquery.googleapis.com".into(),
60            token_source_provider: http_token_source_provider,
61            debug: false,
62        }
63    }
64
65    pub fn with_debug(mut self, value: bool) -> Self {
66        self.debug = value;
67        self
68    }
69
70    pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self {
71        self.client = Some(value);
72        self
73    }
74
75    pub fn with_endpoint(mut self, value: impl Into<Cow<'static, str>>) -> Self {
76        self.bigquery_endpoint = value.into();
77        self
78    }
79
80    pub fn create_client(self) -> Arc<BigqueryClient> {
81        let ts = self.token_source_provider.token_source();
82        Arc::new(BigqueryClient::new(
83            ts,
84            self.bigquery_endpoint.as_ref(),
85            self.client
86                .unwrap_or_else(|| reqwest_middleware::ClientBuilder::new(reqwest::Client::default()).build()),
87            self.debug,
88        ))
89    }
90}
91
#[cfg(feature = "auth")]
impl HttpClientConfig {
    /// Returns the auth project configuration carrying the BigQuery HTTP client scopes.
    fn bigquery_http_auth_config() -> google_cloud_auth::project::Config<'static> {
        google_cloud_auth::project::Config::default().with_scopes(&http::bigquery_client::SCOPES)
    }

    /// Creates a new token provider for the HTTP client.
    ///
    /// # Errors
    ///
    /// The returned future resolves to the error reported by
    /// `DefaultTokenSourceProvider::new` when the provider cannot be created.
    pub fn default_token_provider() -> impl Future<
        Output = Result<google_cloud_auth::token::DefaultTokenSourceProvider, google_cloud_auth::error::Error>,
    > + Send
           + 'static {
        google_cloud_auth::token::DefaultTokenSourceProvider::new(Self::bigquery_http_auth_config())
    }

    /// Creates a new token provider for the HTTP client with the specified `credentials`.
    ///
    /// # Errors
    ///
    /// The returned future resolves to the error reported by
    /// `DefaultTokenSourceProvider::new_with_credentials` when the provider cannot be created.
    pub fn default_token_provider_with(
        credentials: google_cloud_auth::credentials::CredentialsFile,
    ) -> impl Future<
        Output = Result<google_cloud_auth::token::DefaultTokenSourceProvider, google_cloud_auth::error::Error>,
    > + Send
           + 'static {
        google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_http_auth_config(),
            // `credentials` is owned here, so it is moved into the box instead of being cloned.
            Box::new(credentials),
        )
    }
}
119
/// Configuration consumed by `Client::new`.
#[derive(Debug)]
pub struct ClientConfig {
    /// Settings for the HTTP (REST) client.
    http: HttpClientConfig,
    /// gRPC environment passed to the connection managers.
    environment: Environment,
    /// Channel settings for the streaming-read connection pool.
    streaming_read_config: ChannelConfig,
    /// Settings for the streaming-write connection pool and writers.
    streaming_write_config: StreamingWriteConfig,
}
127
/// Settings for the storage-write (streaming write) side of the client.
///
/// `Default` is derived, so the default `max_insert_count` is `0`.
/// NOTE(review): confirm that the writers treat a `max_insert_count` of `0` as intended.
#[derive(Clone, Debug, Default)]
pub struct StreamingWriteConfig {
    /// Channel settings for the streaming-write connection pool.
    channel_config: ChannelConfig,
    /// Value handed to the default, committed and buffered writers created by `Client`.
    max_insert_count: usize,
}
133
134impl StreamingWriteConfig {
135    pub fn with_channel_config(mut self, value: ChannelConfig) -> Self {
136        self.channel_config = value;
137        self
138    }
139    pub fn with_max_insert_count(mut self, value: usize) -> Self {
140        self.max_insert_count = value;
141        self
142    }
143}
144
/// gRPC channel settings used to build a `ConnectionManager`.
///
/// Every field except `num_channels` is forwarded unchanged into `ConnectionOptions`.
#[derive(Clone, Debug)]
pub struct ChannelConfig {
    /// num_channels is the number of gRPC channels.
    num_channels: usize,
    /// Forwarded as `ConnectionOptions::connect_timeout`.
    connect_timeout: Option<Duration>,
    /// Forwarded as `ConnectionOptions::timeout`.
    timeout: Option<Duration>,
    /// Forwarded as `ConnectionOptions::http2_keep_alive_interval`.
    http2_keep_alive_interval: Option<Duration>,
    /// Forwarded as `ConnectionOptions::keep_alive_timeout`.
    keep_alive_timeout: Option<Duration>,
    /// Forwarded as `ConnectionOptions::keep_alive_while_idle`.
    keep_alive_while_idle: Option<bool>,
}
155
156impl ChannelConfig {
157    pub fn with_num_channels(mut self, value: usize) -> Self {
158        self.num_channels = value;
159        self
160    }
161    pub fn with_connect_timeout(mut self, value: Duration) -> Self {
162        self.connect_timeout = Some(value);
163        self
164    }
165    pub fn with_timeout(mut self, value: Duration) -> Self {
166        self.timeout = Some(value);
167        self
168    }
169    pub fn with_http2_keep_alive_interval(mut self, value: Duration) -> Self {
170        self.http2_keep_alive_interval = Some(value);
171        self
172    }
173    pub fn with_keep_alive_timeout(mut self, value: Duration) -> Self {
174        self.keep_alive_timeout = Some(value);
175        self
176    }
177    pub fn with_keep_alive_while_idle(mut self, value: bool) -> Self {
178        self.keep_alive_while_idle = Some(value);
179        self
180    }
181
182    async fn into_connection_manager(
183        self,
184        environment: &Environment,
185    ) -> Result<ConnectionManager, google_cloud_gax::conn::Error> {
186        ConnectionManager::new(
187            self.num_channels,
188            environment,
189            &ConnectionOptions {
190                timeout: self.timeout,
191                connect_timeout: self.connect_timeout,
192                http2_keep_alive_interval: self.http2_keep_alive_interval,
193                keep_alive_timeout: self.keep_alive_timeout,
194                keep_alive_while_idle: self.keep_alive_while_idle,
195            },
196        )
197        .await
198    }
199}
200
201impl Default for ChannelConfig {
202    fn default() -> Self {
203        Self {
204            num_channels: 4,
205            connect_timeout: Some(Duration::from_secs(30)),
206            timeout: None,
207            http2_keep_alive_interval: None,
208            keep_alive_timeout: None,
209            keep_alive_while_idle: None,
210        }
211    }
212}
213
/// Token source provider that hands out [`EmptyTokenSource`]; used by the emulator configuration.
#[derive(Debug)]
pub struct EmptyTokenSourceProvider {}
216
/// Token source whose token is always the empty string.
#[derive(Debug)]
pub struct EmptyTokenSource {}
219
220#[async_trait::async_trait]
221impl TokenSource for EmptyTokenSource {
222    async fn token(&self) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
223        Ok("".to_string())
224    }
225}
226
227impl TokenSourceProvider for EmptyTokenSourceProvider {
228    fn token_source(&self) -> Arc<dyn TokenSource> {
229        Arc::new(EmptyTokenSource {})
230    }
231}
232
233impl ClientConfig {
234    pub fn new_with_emulator(grpc_host: &str, http_addr: impl Into<Cow<'static, str>>) -> Self {
235        Self {
236            http: HttpClientConfig::new_with_emulator(http_addr),
237            environment: Environment::Emulator(grpc_host.to_string()),
238            streaming_read_config: ChannelConfig::default(),
239            streaming_write_config: StreamingWriteConfig::default(),
240        }
241    }
242
243    pub fn new(
244        http_token_source_provider: Box<dyn TokenSourceProvider>,
245        grpc_token_source_provider: Box<dyn TokenSourceProvider>,
246    ) -> Self {
247        Self {
248            http: HttpClientConfig::new(http_token_source_provider),
249            environment: Environment::GoogleCloud(grpc_token_source_provider),
250            streaming_read_config: ChannelConfig::default(),
251            streaming_write_config: StreamingWriteConfig::default(),
252        }
253    }
254
255    pub fn with_debug(mut self, value: bool) -> Self {
256        self.http.debug = value;
257        self
258    }
259
260    pub fn with_streaming_read_config(mut self, value: ChannelConfig) -> Self {
261        self.streaming_read_config = value;
262        self
263    }
264
265    pub fn with_streaming_write_config(mut self, value: StreamingWriteConfig) -> Self {
266        self.streaming_write_config = value;
267        self
268    }
269
270    pub fn with_http_client(mut self, value: reqwest_middleware::ClientWithMiddleware) -> Self {
271        self.http.client = Some(value);
272        self
273    }
274
275    pub fn with_endpoint(mut self, value: impl Into<Cow<'static, str>>) -> Self {
276        self.http.bigquery_endpoint = value.into();
277        self
278    }
279}
280
281use crate::http::job::get::GetJobRequest;
282use crate::http::job::list::ListJobsRequest;
283
284use crate::grpc::apiv1::bigquery_client::StreamingReadClient;
285use crate::storage_write::stream::{buffered, committed, default, pending};
286use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient;
287
#[cfg(feature = "auth")]
impl ClientConfig {
    /// Builds a configuration from default token source providers.
    ///
    /// Returns the configuration together with the project id reported by the gRPC token
    /// source provider.
    ///
    /// # Errors
    ///
    /// Fails when either token source provider cannot be created.
    pub async fn new_with_auth() -> Result<(Self, Option<String>), google_cloud_auth::error::Error> {
        let http_provider = HttpClientConfig::default_token_provider().await?;
        let grpc_config = Self::bigquery_grpc_auth_config();
        let grpc_provider = google_cloud_auth::token::DefaultTokenSourceProvider::new(grpc_config).await?;
        Ok(Self::from_default_providers(http_provider, grpc_provider))
    }

    /// Builds a configuration from token source providers created with `credentials`.
    ///
    /// Returns the configuration together with the project id reported by the gRPC token
    /// source provider.
    ///
    /// # Errors
    ///
    /// Fails when either token source provider cannot be created.
    pub async fn new_with_credentials(
        credentials: google_cloud_auth::credentials::CredentialsFile,
    ) -> Result<(Self, Option<String>), google_cloud_auth::error::Error> {
        // The HTTP provider gets its own copy; the original is moved into the gRPC provider.
        let http_provider = HttpClientConfig::default_token_provider_with(credentials.clone()).await?;
        let grpc_config = Self::bigquery_grpc_auth_config();
        let grpc_provider =
            google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(grpc_config, Box::new(credentials))
                .await?;
        Ok(Self::from_default_providers(http_provider, grpc_provider))
    }

    /// Pairs a configuration built from both providers with the gRPC provider's project id.
    fn from_default_providers(
        http_provider: google_cloud_auth::token::DefaultTokenSourceProvider,
        grpc_provider: google_cloud_auth::token::DefaultTokenSourceProvider,
    ) -> (Self, Option<String>) {
        let project_id = grpc_provider.project_id.clone();
        let config = Self::new(Box::new(http_provider), Box::new(grpc_provider));
        (config, project_id)
    }

    /// Returns the auth project configuration carrying the gRPC audience and scopes.
    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        let base = google_cloud_auth::project::Config::default();
        base.with_audience(crate::grpc::apiv1::conn_pool::AUDIENCE)
            .with_scopes(&crate::grpc::apiv1::conn_pool::SCOPES)
    }
}
319
/// Errors returned by the query methods of `Client`.
#[derive(thiserror::Error, Debug)]
pub enum QueryError {
    /// Error from the storage read path.
    #[error(transparent)]
    Storage(#[from] storage::Error),
    /// Error from an HTTP job request.
    #[error(transparent)]
    JobHttp(#[from] http::error::Error),
    /// The job is not a script and has no destination table to read from.
    #[error("job has no destination table to read : job={0:?}")]
    NoDestinationTable(JobReference),
    /// No select-query child job was found while resolving the table of a script job.
    #[error("failed to resolve table for script job: no child jobs found : job={0:?}")]
    NoChildJobs(JobReference),
    /// The job configuration is not a query job; carries the job and its reported job type.
    #[error("job type must be query: job={0:?}, jobType={1:?}")]
    InvalidJobType(JobReference, String),
    /// Error raised while waiting for a query job to complete.
    #[error(transparent)]
    RunQuery(#[from] query::run::Error),
}
335
/// BigQuery client bundling the HTTP resource clients and the gRPC connection pools.
#[derive(Clone)]
pub struct Client {
    /// HTTP client returned by `dataset()`.
    dataset_client: BigqueryDatasetClient,
    /// HTTP client returned by `table()`.
    table_client: BigqueryTableClient,
    /// HTTP client returned by `tabledata()`.
    tabledata_client: BigqueryTabledataClient,
    /// HTTP client returned by `job()`; also used internally to run and poll queries.
    job_client: BigqueryJobClient,
    /// HTTP client returned by `routine()`.
    routine_client: BigqueryRoutineClient,
    /// HTTP client returned by `row_access_policy()`.
    row_access_policy_client: BigqueryRowAccessPolicyClient,
    /// HTTP client returned by `model()`.
    model_client: BigqueryModelClient,
    /// Connection pool used by `read_table`.
    streaming_read_conn_pool: Arc<ConnectionManager>,
    /// Connection pool shared with the storage writers.
    streaming_write_conn_pool: Arc<ConnectionManager>,
    /// Maximum insert count handed to the default, committed and buffered writers.
    streaming_write_max_insert_count: usize,
}
349
350impl Client {
351    /// New client
352    pub async fn new(config: ClientConfig) -> Result<Self, google_cloud_gax::conn::Error> {
353        let client = config.http.create_client();
354
355        Ok(Self {
356            dataset_client: BigqueryDatasetClient::new(client.clone()),
357            table_client: BigqueryTableClient::new(client.clone()),
358            tabledata_client: BigqueryTabledataClient::new(client.clone()),
359            job_client: BigqueryJobClient::new(client.clone()),
360            routine_client: BigqueryRoutineClient::new(client.clone()),
361            row_access_policy_client: BigqueryRowAccessPolicyClient::new(client.clone()),
362            model_client: BigqueryModelClient::new(client.clone()),
363            streaming_read_conn_pool: Arc::new(
364                config
365                    .streaming_read_config
366                    .into_connection_manager(&config.environment)
367                    .await?,
368            ),
369            streaming_write_conn_pool: Arc::new(
370                config
371                    .streaming_write_config
372                    .channel_config
373                    .into_connection_manager(&config.environment)
374                    .await?,
375            ),
376            streaming_write_max_insert_count: config.streaming_write_config.max_insert_count,
377        })
378    }
379
    /// Returns the HTTP client for the datasets resource.
    ///
    /// API reference: <https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets>
    ///
    /// See [BigqueryDatasetClient](crate::http::bigquery_dataset_client::BigqueryDatasetClient).
    pub fn dataset(&self) -> &BigqueryDatasetClient {
        &self.dataset_client
    }
385
    /// Returns the HTTP client for the tables resource.
    ///
    /// API reference: <https://cloud.google.com/bigquery/docs/reference/rest/v2/tables>
    ///
    /// See [BigqueryTableClient](crate::http::bigquery_table_client::BigqueryTableClient).
    pub fn table(&self) -> &BigqueryTableClient {
        &self.table_client
    }
391
    /// Returns the HTTP client for the tabledata resource.
    ///
    /// API reference: <https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata>
    ///
    /// See [BigqueryTabledataClient](crate::http::bigquery_tabledata_client::BigqueryTabledataClient).
    pub fn tabledata(&self) -> &BigqueryTabledataClient {
        &self.tabledata_client
    }
397
    /// Returns the HTTP client for the jobs resource.
    ///
    /// API reference: <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs>
    ///
    /// See [BigqueryJobClient](crate::http::bigquery_job_client::BigqueryJobClient).
    pub fn job(&self) -> &BigqueryJobClient {
        &self.job_client
    }
403
    /// Returns the HTTP client for the routines resource.
    ///
    /// API reference: <https://cloud.google.com/bigquery/docs/reference/rest/v2/routines>
    ///
    /// See [BigqueryRoutineClient](crate::http::bigquery_routine_client::BigqueryRoutineClient).
    pub fn routine(&self) -> &BigqueryRoutineClient {
        &self.routine_client
    }
409
    /// Returns the HTTP client for the row access policy resource.
    ///
    /// API reference: <https://cloud.google.com/bigquery/docs/reference/rest/v2/rowAccessPolicy>
    ///
    /// See [BigqueryRowAccessPolicyClient](crate::http::bigquery_row_access_policy_client::BigqueryRowAccessPolicyClient).
    pub fn row_access_policy(&self) -> &BigqueryRowAccessPolicyClient {
        &self.row_access_policy_client
    }
415
    /// Returns the HTTP client for the models resource.
    ///
    /// API reference: <https://cloud.google.com/bigquery/docs/reference/rest/v2/models>
    ///
    /// See [BigqueryModelClient](crate::http::bigquery_model_client::BigqueryModelClient).
    pub fn model(&self) -> &BigqueryModelClient {
        &self.model_client
    }
421
422    /// Creates a new pending type storage writer for the specified table.
423    /// https://cloud.google.com/bigquery/docs/write-api#pending_type
424    /// ```
425    /// use prost_types::DescriptorProto;
426    /// use google_cloud_bigquery::client::Client;
427    /// use google_cloud_gax::grpc::Status;
428    /// use prost::Message;
429    /// use tokio::sync::futures;
430    /// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
431    /// use futures_util::stream::StreamExt;
432    ///
433    /// pub async fn run<T: Message>(client: &Client, table: &str, rows: Vec<T>, schema: DescriptorProto)
434    /// -> Result<(), Status> {
435    ///     let mut writer = client.pending_storage_writer(table);
436    ///     let stream = writer.create_write_stream().await?;
437    ///
438    ///     let mut data= vec![];
439    ///     for row in rows {
440    ///         let mut buf = Vec::new();
441    ///         row.encode(&mut buf).unwrap();
442    ///         data.push(buf);
443    ///     }
444    ///     let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
445    ///     while let Some(Ok(res)) = result.next().await {
446    ///         tracing::info!("append row errors = {:?}", res.row_errors.len());
447    ///     }
448    ///
449    ///     let _ = stream.finalize().await?;
450    ///     let _ = writer.commit().await?;
451    ///     Ok(())
452    /// }
453    /// ```
454    pub fn pending_storage_writer(&self, table: &str) -> pending::Writer {
455        pending::Writer::new(1, self.streaming_write_conn_pool.clone(), table.to_string())
456    }
457
458    /// Creates a new default type storage writer.
459    /// https://cloud.google.com/bigquery/docs/write-api#default_stream
460    /// ```
461    /// use prost_types::DescriptorProto;
462    /// use google_cloud_bigquery::client::Client;
463    /// use google_cloud_gax::grpc::Status;
464    /// use prost::Message;
465    /// use tokio::sync::futures;
466    /// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
467    /// use futures_util::stream::StreamExt;
468    ///
469    /// pub async fn run<T: Message>(client: &Client, table: &str, rows: Vec<T>, schema: DescriptorProto)
470    /// -> Result<(), Status> {
471    ///     let writer = client.default_storage_writer();
472    ///     let stream = writer.create_write_stream(table).await?;
473    ///
474    ///     let mut data= vec![];
475    ///     for row in rows {
476    ///         let mut buf = Vec::new();
477    ///         row.encode(&mut buf).unwrap();
478    ///         data.push(buf);
479    ///     }
480    ///     let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
481    ///     while let Some(Ok(res)) = result.next().await {
482    ///         tracing::info!("append row errors = {:?}", res.row_errors.len());
483    ///     }
484    ///     Ok(())
485    /// }
486    /// ```
487    pub fn default_storage_writer(&self) -> default::Writer {
488        default::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
489    }
490
491    /// Creates a new committed type storage writer.
492    /// https://cloud.google.com/bigquery/docs/write-api#committed_type
493    /// ```
494    /// use prost_types::DescriptorProto;
495    /// use google_cloud_bigquery::client::Client;
496    /// use google_cloud_gax::grpc::Status;
497    /// use prost::Message;
498    /// use tokio::sync::futures;
499    /// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
500    /// use futures_util::stream::StreamExt;
501    ///
502    /// pub async fn run<T: Message>(client: &Client, table: &str, rows: Vec<T>, schema: DescriptorProto)
503    /// -> Result<(), Status> {
504    ///     let writer = client.committed_storage_writer();
505    ///     let stream = writer.create_write_stream(table).await?;
506    ///
507    ///     let mut data= vec![];
508    ///     for row in rows {
509    ///         let mut buf = Vec::new();
510    ///         row.encode(&mut buf).unwrap();
511    ///         data.push(buf);
512    ///     }
513    ///     let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
514    ///     while let Some(Ok(res)) = result.next().await {
515    ///         tracing::info!("append row errors = {:?}", res.row_errors.len());
516    ///     }
517    ///
518    ///     let _ = stream.finalize().await?;
519    ///     Ok(())
520    /// }
521    /// ```
522    pub fn committed_storage_writer(&self) -> committed::Writer {
523        committed::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
524    }
525
526    /// Creates a new buffered type storage writer.
527    /// https://cloud.google.com/bigquery/docs/write-api#buffered_type
528    /// ```
529    /// use prost_types::DescriptorProto;
530    /// use google_cloud_bigquery::client::Client;
531    /// use prost::Message;
532    /// use tokio::sync::futures;
533    /// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
534    /// use futures_util::stream::StreamExt;
535    /// use google_cloud_gax::grpc::Status;
536    ///
537    /// pub async fn run<T: Message>(client: &Client, table: &str, rows: Vec<T>, schema: DescriptorProto)
538    /// -> Result<(), Status> {
539    ///     let writer = client.buffered_storage_writer();
540    ///     let stream = writer.create_write_stream(table).await?;
541    ///
542    ///     let mut data= vec![];
543    ///     for row in rows {
544    ///         let mut buf = Vec::new();
545    ///         row.encode(&mut buf).unwrap();
546    ///         data.push(buf);
547    ///     }
548    ///     let mut result = stream.append_rows(vec![AppendRowsRequestBuilder::new(schema, data)]).await.unwrap();
549    ///     while let Some(Ok(res)) = result.next().await {
550    ///         tracing::info!("append row errors = {:?}", res.row_errors.len());
551    ///     }
552    ///     let _ = stream.flush_rows(Some(0)).await?;
553    ///     let _ = stream.finalize().await?;
554    ///     Ok(())
555    /// }
556    /// ```
557    pub fn buffered_storage_writer(&self) -> buffered::Writer {
558        buffered::Writer::new(self.streaming_write_max_insert_count, self.streaming_write_conn_pool.clone())
559    }
560
    /// Run query job and get result.
    ///
    /// Equivalent to calling `query_with_option` with `QueryOption::default()`.
    ///
    /// # Errors
    ///
    /// Returns the [`QueryError`] produced by `query_with_option`.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use google_cloud_bigquery::http::job::query::QueryRequest;
    /// use google_cloud_bigquery::query::row::Row;
    /// use google_cloud_bigquery::client::Client;
    ///
    /// async fn run(client: &Client, project_id: &str) {
    ///     let request = QueryRequest {
    ///         query: "SELECT * FROM dataset.table".to_string(),
    ///         ..Default::default()
    ///     };
    ///     let mut iter = client.query::<Row>(project_id, request).await.unwrap();
    ///     while let Some(row) = iter.next().await.unwrap() {
    ///         let col1 = row.column::<String>(0);
    ///         let col2 = row.column::<Option<String>>(1);
    ///     }
    /// }
    /// ```
    pub async fn query<T>(&self, project_id: &str, request: QueryRequest) -> Result<query::Iterator<T>, QueryError>
    where
        T: http::query::value::StructDecodable + storage::value::StructDecodable,
    {
        self.query_with_option(project_id, request, QueryOption::default())
            .await
    }
585
586    /// Run query job and get result.
587    /// ```rust
588    /// use google_cloud_bigquery::http::job::query::QueryRequest;
589    /// use google_cloud_bigquery::query::row::Row;
590    /// use google_cloud_bigquery::client::Client;
591    /// use google_cloud_bigquery::query::QueryOption;
592    /// use google_cloud_bigquery::query::ExponentialBuilder;
593    ///
594    /// async fn run(client: &Client, project_id: &str) {
595    ///     let request = QueryRequest {
596    ///         query: "SELECT * FROM dataset.table".to_string(),
597    ///         ..Default::default()
598    ///     };
599    ///     let retry = ExponentialBuilder::default().with_max_times(10);
600    ///     let option = QueryOption::default().with_retry(retry).with_enable_storage_read(true);
601    ///     let mut iter = client.query_with_option::<Row>(project_id, request, option).await.unwrap();
602    ///     while let Some(row) = iter.next().await.unwrap() {
603    ///         let col1 = row.column::<String>(0);
604    ///         let col2 = row.column::<Option<String>>(1);
605    ///     }
606    /// }
607    pub async fn query_with_option<T>(
608        &self,
609        project_id: &str,
610        request: QueryRequest,
611        option: QueryOption,
612    ) -> Result<query::Iterator<T>, QueryError>
613    where
614        T: http::query::value::StructDecodable + storage::value::StructDecodable,
615    {
616        let result = self.job_client.query(project_id, &request).await?;
617        let (total_rows, page_token, rows, force_first_fetch) = if result.job_complete {
618            (
619                result.total_rows.unwrap_or_default(),
620                result.page_token,
621                result.rows.unwrap_or_default(),
622                false,
623            )
624        } else {
625            (
626                self.wait_for_query(&result.job_reference, option.retry, &request.timeout_ms)
627                    .await?,
628                None,
629                vec![],
630                true,
631            )
632        };
633
634        //use storage api instead of rest API
635        if option.enable_storage_read && (page_token.is_none() || page_token.as_ref().unwrap().is_empty()) {
636            tracing::trace!("use storage read api for query {:?}", result.job_reference);
637            let job = self
638                .job_client
639                .get(
640                    &result.job_reference.project_id,
641                    &result.job_reference.job_id,
642                    &GetJobRequest {
643                        location: result.job_reference.location.clone(),
644                    },
645                )
646                .await?;
647            let iter = self
648                .new_storage_row_iterator_from_job::<T>(job.job_reference, job.statistics, job.configuration)
649                .await?;
650            return Ok(query::Iterator {
651                inner: QueryResult::Storage(iter),
652                total_size: total_rows,
653            });
654        }
655
656        let http_query_iterator = http::query::Iterator {
657            client: self.job_client.clone(),
658            project_id: result.job_reference.project_id,
659            job_id: result.job_reference.job_id,
660            request: GetQueryResultsRequest {
661                start_index: 0,
662                page_token,
663                max_results: request.max_results,
664                timeout_ms: request.timeout_ms,
665                location: result.job_reference.location,
666                format_options: request.format_options,
667            },
668            chunk: VecDeque::from(rows),
669            total_size: total_rows,
670            force_first_fetch,
671            _marker: PhantomData,
672        };
673        Ok(query::Iterator {
674            inner: QueryResult::Http(http_query_iterator),
675            total_size: total_rows,
676        })
677    }
678
679    async fn new_storage_row_iterator_from_job<T>(
680        &self,
681        mut job: JobReference,
682        mut statistics: Option<JobStatistics>,
683        mut config: JobConfiguration,
684    ) -> Result<storage::Iterator<T>, QueryError>
685    where
686        T: http::query::value::StructDecodable + storage::value::StructDecodable,
687    {
688        loop {
689            tracing::trace!("check child job result {:?}, {:?}, {:?}", job, statistics, config);
690            let query_config = match &config.job {
691                JobType::Query(config) => config,
692                _ => return Err(QueryError::InvalidJobType(job.clone(), config.job_type.clone())),
693            };
694            if let Some(dst) = &query_config.destination_table {
695                return Ok(self.read_table(dst, None).await?);
696            }
697            if !is_script(&statistics, &config) {
698                return Err(QueryError::NoDestinationTable(job.clone()));
699            }
700            let children = self
701                .job_client
702                .list(
703                    &job.project_id,
704                    &ListJobsRequest {
705                        parent_job_id: job.job_id.to_string(),
706                        ..Default::default()
707                    },
708                )
709                .await?;
710
711            let mut found = false;
712            for j in children.into_iter() {
713                if !is_select_query(&j.statistics, &j.configuration) {
714                    continue;
715                }
716                job = j.job_reference;
717                statistics = j.statistics;
718                config = j.configuration;
719                found = true;
720                break;
721            }
722            if !found {
723                break;
724            }
725        }
726        Err(QueryError::NoChildJobs(job.clone()))
727    }
728
729    async fn wait_for_query(
730        &self,
731        job: &JobReference,
732        builder: ExponentialBuilder,
733        timeout_ms: &Option<i64>,
734    ) -> Result<i64, query::run::Error> {
735        // Use get_query_results only to wait for completion, not to read results.
736        let request = GetQueryResultsRequest {
737            max_results: Some(0),
738            timeout_ms: *timeout_ms,
739            location: job.location.clone(),
740            ..Default::default()
741        };
742        let action = || async {
743            tracing::debug!("waiting for job completion {:?}", job);
744            let result = self
745                .job_client
746                .get_query_results(&job.project_id, &job.job_id, &request)
747                .await
748                .map_err(query::run::Error::Http)?;
749            if result.job_complete {
750                Ok(result.total_rows)
751            } else {
752                Err(query::run::Error::JobIncomplete)
753            }
754        };
755        action
756            .retry(builder)
757            .when(|e: &query::run::Error| match e {
758                query::run::Error::JobIncomplete => true,
759                query::run::Error::Http(http::error::Error::HttpClient(_)) => true,
760                query::run::Error::Http(http::error::Error::Response(r)) => r.is_retryable(&JOB_RETRY_REASONS),
761                _ => false,
762            })
763            .await
764    }
765
766    /// Read table data by BigQuery Storage Read API.
767    /// ```rust
768    /// use google_cloud_bigquery::storage::row::Row;
769    /// use google_cloud_bigquery::client::Client;
770    /// use google_cloud_bigquery::http::table::TableReference;
771    ///
772    /// async fn run(client: &Client, project_id: &str) {
773    ///     let table = TableReference {
774    ///         project_id: project_id.to_string(),
775    ///         dataset_id: "dataset".to_string(),
776    ///         table_id: "table".to_string(),
777    ///     };
778    ///     let mut iter = client.read_table::<Row>(&table, None).await.unwrap();
779    ///     while let Some(row) = iter.next().await.unwrap() {
780    ///         let col1 = row.column::<String>(0);
781    ///         let col2 = row.column::<Option<String>>(1);
782    ///     }
783    /// }
784    /// ```
785    pub async fn read_table<T>(
786        &self,
787        table: &TableReference,
788        option: Option<ReadTableOption>,
789    ) -> Result<storage::Iterator<T>, storage::Error>
790    where
791        T: storage::value::StructDecodable,
792    {
793        let option = option.unwrap_or_default();
794
795        let mut client = StreamingReadClient::new(BigQueryReadClient::new(self.streaming_read_conn_pool.conn()));
796        let read_session = client
797            .create_read_session(
798                CreateReadSessionRequest {
799                    parent: format!("projects/{}", table.project_id),
800                    read_session: Some(ReadSession {
801                        name: "".to_string(),
802                        expire_time: None,
803                        data_format: DataFormat::Arrow.into(),
804                        table: table.resource(),
805                        table_modifiers: option.session_table_modifiers,
806                        read_options: option.session_read_options,
807                        streams: vec![],
808                        estimated_total_bytes_scanned: 0,
809                        estimated_total_physical_file_size: 0,
810                        estimated_row_count: 0,
811                        trace_id: "".to_string(),
812                        schema: option.session_schema,
813                    }),
814                    max_stream_count: option.max_stream_count,
815                    preferred_min_stream_count: 0,
816                },
817                option.session_retry_setting,
818            )
819            .await?
820            .into_inner();
821        storage::Iterator::new(client, read_session, option.read_rows_retry_setting).await
822    }
823}
824
825#[derive(Debug, Default, Clone)]
826pub struct ReadTableOption {
827    session_read_options: Option<read_session::TableReadOptions>,
828    session_table_modifiers: Option<read_session::TableModifiers>,
829    session_schema: Option<read_session::Schema>,
830    session_retry_setting: Option<RetrySetting>,
831    read_rows_retry_setting: Option<RetrySetting>,
832    max_stream_count: i32,
833}
834
835impl ReadTableOption {
836    pub fn with_session_read_options(mut self, value: read_session::TableReadOptions) -> Self {
837        self.session_read_options = Some(value);
838        self
839    }
840
841    pub fn with_session_table_modifiers(mut self, value: read_session::TableModifiers) -> Self {
842        self.session_table_modifiers = Some(value);
843        self
844    }
845
846    pub fn with_session_schema(mut self, value: read_session::Schema) -> Self {
847        self.session_schema = Some(value);
848        self
849    }
850
851    pub fn with_session_retry_setting(mut self, value: RetrySetting) -> Self {
852        self.session_retry_setting = Some(value);
853        self
854    }
855
856    pub fn with_read_rows_retry_setting(mut self, value: RetrySetting) -> Self {
857        self.read_rows_retry_setting = Some(value);
858        self
859    }
860
861    pub fn with_max_stream_count(mut self, value: i32) -> Self {
862        self.max_stream_count = value;
863        self
864    }
865}
866
// Integration tests: every test builds its client through
// `ClientConfig::new_with_auth` and talks to the BigQuery service.
#[cfg(test)]
mod tests {
    use bigdecimal::BigDecimal;

    use serial_test::serial;
    use std::collections::HashMap;
    use std::ops::AddAssign;
    use std::time::Duration;
    use time::{Date, OffsetDateTime, Time};

    use google_cloud_googleapis::cloud::bigquery::storage::v1::read_session::TableReadOptions;

    use crate::client::{Client, ClientConfig, ReadTableOption};
    use crate::http::bigquery_client::test::{create_table_schema, dataset_name, TestData};
    use crate::http::job::query::QueryRequest;
    use crate::http::table::{Table, TableReference};
    use crate::http::tabledata::insert_all::{InsertAllRequest, Row};
    use crate::http::types::{QueryParameter, QueryParameterStructType, QueryParameterType, QueryParameterValue};
    use crate::query;
    use crate::query::QueryOption;

    // Builds a `TestData` from the first nine columns of a row. A macro is
    // used because the query row and the storage row are distinct types that
    // both expose a generic `column(index)` accessor.
    macro_rules! test_data_from_row {
        ($row:expr) => {
            TestData {
                col_string: $row.column(0).unwrap(),
                col_number: $row.column(1).unwrap(),
                col_number_array: $row.column(2).unwrap(),
                col_timestamp: $row.column(3).unwrap(),
                col_json: $row.column(4).unwrap(),
                col_json_array: $row.column(5).unwrap(),
                col_struct: $row.column(6).unwrap(),
                col_struct_array: $row.column(7).unwrap(),
                col_binary: $row.column(8).unwrap(),
            }
        };
    }

    /// Installs a tracing subscriber with trace-level output for this crate.
    #[ctor::ctor]
    fn init() {
        let directive = "google_cloud_bigquery=trace".parse().unwrap();
        let env_filter = tracing_subscriber::filter::EnvFilter::from_default_env().add_directive(directive);
        // The outcome of the installation attempt is deliberately discarded.
        let _ = tracing_subscriber::fmt().with_env_filter(env_filter).try_init();
    }

    /// Creates a client from `ClientConfig::new_with_auth` and returns it
    /// together with the project id reported by that configuration.
    async fn create_client() -> (Client, String) {
        let (config, project) = ClientConfig::new_with_auth().await.unwrap();
        let client = Client::new(config).await.unwrap();
        let project_id = project.unwrap();
        (client, project_id)
    }

    #[tokio::test]
    #[serial]
    async fn test_query_from_storage() {
        test_query(QueryOption::default().with_enable_storage_read(true)).await
    }

    #[tokio::test]
    #[serial]
    async fn test_query_from_rest() {
        test_query(QueryOption::default()).await
    }

    /// Runs a literal one-row query and checks how every scalar and array
    /// column is decoded.
    async fn test_query(option: QueryOption) {
        let (client, project_id) = create_client().await;
        let mut rows = client
            .query_with_option::<query::row::Row>(
                &project_id,
                QueryRequest {
                    max_results: Some(2),
                    query: "SELECT
                        'A',
                        TIMESTAMP_MICROS(1230219000000019),
                        100,
                        0.432899,
                        DATE(2023,9,1),
                        TIME(15, 30, 01),
                        NULL,
                        ['A','B'],
                        [TIMESTAMP_MICROS(1230219000000019), TIMESTAMP_MICROS(1230219000000020)],
                        [100,200],
                        [0.432899,0.432900],
                        [DATE(2023,9,1),DATE(2023,9,2)],
                        [TIME_ADD(TIME(15,30,1), INTERVAL 10 MICROSECOND),TIME(0, 0, 0),TIME(23,59,59)],
                        b'test',
                        true,
                        [b'test',b'test2'],
                        [true,false],
                        cast('-5.7896044618658097711785492504343953926634992332820282019728792003956564819968E+38' as BIGNUMERIC),
                        cast('5.7896044618658097711785492504343953926634992332820282019728792003956564819967E+38' as BIGNUMERIC),
                        cast('-9.9999999999999999999999999999999999999E+28' as NUMERIC),
                        cast('9.9999999999999999999999999999999999999E+28' as NUMERIC),
                        [cast('-5.7896044618658097711785492504343953926634992332820282019728792003956564819968E+38' as BIGNUMERIC),cast('5.7896044618658097711785492504343953926634992332820282019728792003956564819967E+38' as BIGNUMERIC)]
                    ".to_string(),
                    ..Default::default()
                },
                option,
            )
            .await
            .unwrap();

        assert_eq!(1, rows.total_size);

        while let Some(row) = rows.next().await.unwrap() {
            // Scalar columns.
            assert_eq!(row.column::<String>(0).unwrap(), "A");
            let timestamp = row.column::<OffsetDateTime>(1).unwrap();
            assert_eq!(timestamp.unix_timestamp_nanos(), 1230219000000019000);
            assert_eq!(row.column::<i64>(2).unwrap(), 100);
            assert_eq!(row.column::<f64>(3).unwrap(), 0.432899);
            assert_eq!(row.column::<Date>(4).unwrap(), time::macros::date!(2023 - 09 - 01));
            assert_eq!(row.column::<Time>(5).unwrap(), time::macros::time!(15:30:01));

            // The NULL column must decode to `None` for every optional type.
            assert!(row.column::<Option<String>>(6).unwrap().is_none());
            assert!(row.column::<Option<OffsetDateTime>>(6).unwrap().is_none());
            assert!(row.column::<Option<i64>>(6).unwrap().is_none());
            assert!(row.column::<Option<f64>>(6).unwrap().is_none());
            assert!(row.column::<Option<Date>>(6).unwrap().is_none());
            assert!(row.column::<Option<Time>>(6).unwrap().is_none());
            assert!(row.column::<Option<Vec<Time>>>(6).unwrap().is_none());
            assert!(row.column::<Option<BigDecimal>>(6).unwrap().is_none());
            assert!(row.column::<Option<bool>>(6).unwrap().is_none());
            assert!(row.column::<Option<String>>(6).unwrap().is_none());
            assert!(row.column::<Option<Vec<u8>>>(6).unwrap().is_none());

            // Array columns.
            assert_eq!(row.column::<Vec<String>>(7).unwrap(), vec!["A", "B"]);
            let timestamps = row.column::<Vec<OffsetDateTime>>(8).unwrap();
            assert_eq!(timestamps[0].unix_timestamp_nanos(), 1230219000000019000);
            assert_eq!(timestamps[1].unix_timestamp_nanos(), 1230219000000020000);
            assert_eq!(row.column::<Vec<i64>>(9).unwrap(), vec![100, 200]);
            assert_eq!(row.column::<Vec<f64>>(10).unwrap(), vec![0.432899, 0.432900]);
            let dates = row.column::<Vec<Date>>(11).unwrap();
            assert_eq!(dates[0], time::macros::date!(2023 - 09 - 01));
            assert_eq!(dates[1], time::macros::date!(2023 - 09 - 02));
            let times = row.column::<Vec<Time>>(12).unwrap();
            let mut shifted = time::macros::time!(15:30:01);
            shifted.add_assign(Duration::from_micros(10));
            assert_eq!(times[0], shifted);
            assert_eq!(times[1], time::macros::time!(0:0:0));
            assert_eq!(times[2], time::macros::time!(23:59:59));

            // Bytes and booleans.
            assert_eq!(row.column::<Vec<u8>>(13).unwrap(), b"test");
            assert!(row.column::<bool>(14).unwrap());
            let byte_arrays = row.column::<Vec<Vec<u8>>>(15).unwrap();
            assert_eq!(byte_arrays[0], b"test");
            assert_eq!(byte_arrays[1], b"test2");
            let flags = row.column::<Vec<bool>>(16).unwrap();
            assert!(flags[0]);
            assert!(!flags[1]);

            // BIGNUMERIC / NUMERIC extremes.
            let bignumeric_min = row.column::<BigDecimal>(17).unwrap();
            assert_eq!(
                bignumeric_min.to_string(),
                "-578960446186580977117854925043439539266.34992332820282019728792003956564819968"
            );
            let bignumeric_max = row.column::<BigDecimal>(18).unwrap();
            assert_eq!(
                bignumeric_max.to_string(),
                "578960446186580977117854925043439539266.34992332820282019728792003956564819967"
            );
            let numeric_min = row.column::<BigDecimal>(19).unwrap();
            assert_eq!(numeric_min.to_string(), "-99999999999999999999999999999.999999999");
            let numeric_max = row.column::<BigDecimal>(20).unwrap();
            assert_eq!(numeric_max.to_string(), "99999999999999999999999999999.999999999");
            let bignumerics = row.column::<Vec<BigDecimal>>(21).unwrap();
            assert_eq!(
                bignumerics[0].to_string(),
                "-578960446186580977117854925043439539266.34992332820282019728792003956564819968"
            );
            assert_eq!(
                bignumerics[1].to_string(),
                "578960446186580977117854925043439539266.34992332820282019728792003956564819967"
            );
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_query_table_from_storage() {
        let option = QueryOption::default().with_enable_storage_read(true);
        test_query_table(None, option).await
    }

    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_query_table_from_rest() {
        let option = QueryOption::default();
        test_query_table(Some(1), option).await
    }

    /// Creates `project.dataset.table` with the shared test schema and inserts
    /// `size` rows; row `i` is `TestData::default(i, now + i seconds)`.
    async fn insert(client: &Client, project: &str, dataset: &str, table: &str, size: usize, now: &OffsetDateTime) {
        let definition = Table {
            table_reference: TableReference {
                project_id: project.to_string(),
                dataset_id: dataset.to_string(),
                table_id: table.to_string(),
            },
            schema: Some(create_table_schema()),
            ..Default::default()
        };
        let _created = client.table_client.create(&definition).await.unwrap();

        let mut request = InsertAllRequest::<TestData>::default();
        request.rows.extend((0..size).map(|i| Row {
            insert_id: None,
            json: TestData::default(i, *now + Duration::from_secs(i as u64)),
        }));
        client.tabledata().insert(project, dataset, table, &request).await.unwrap();
    }

    /// Inserts three rows, then reads them back once as untyped rows and once
    /// as `TestData`, and compares both results with the inserted data.
    async fn test_query_table(max_results: Option<i64>, option: QueryOption) {
        let dataset = dataset_name("table");
        let (client, project_id) = create_client().await;
        // Round-trip through the unix timestamp to drop sub-second precision.
        let now = OffsetDateTime::from_unix_timestamp(OffsetDateTime::now_utc().unix_timestamp()).unwrap();
        let table = format!("test_query_table_{}", now.unix_timestamp());
        insert(&client, &project_id, &dataset, &table, 3, &now).await;

        let sql = format!("SELECT * FROM {dataset}.{table}");

        // Read as untyped rows.
        let mut data_as_row: Vec<TestData> = Vec::new();
        let row_request = QueryRequest {
            max_results,
            query: sql.clone(),
            ..Default::default()
        };
        let mut iterator_as_row = client
            .query_with_option::<query::row::Row>(&project_id, row_request, option.clone())
            .await
            .unwrap();
        while let Some(row) = iterator_as_row.next().await.unwrap() {
            data_as_row.push(test_data_from_row!(row));
        }

        // Read as decoded structs.
        let mut data_as_struct: Vec<TestData> = Vec::new();
        let struct_request = QueryRequest {
            query: sql,
            ..Default::default()
        };
        let mut iterator_as_struct = client
            .query_with_option::<TestData>(&project_id, struct_request, option)
            .await
            .unwrap();
        while let Some(item) = iterator_as_struct.next().await.unwrap() {
            data_as_struct.push(item);
        }

        assert_eq!(iterator_as_struct.total_size, 3);
        assert_eq!(iterator_as_row.total_size, 3);
        assert_eq!(data_as_struct.len(), 3);
        assert_eq!(data_as_row.len(), 3);

        assert_data(&now, data_as_struct);
        assert_data(&now, data_as_row);
    }

    /// Reads the same table through two storage iterators that are polled
    /// concurrently: one unrestricted (decoded as `TestData`) and one limited
    /// by a row restriction (read as untyped rows).
    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_read_table() {
        let dataset = dataset_name("table");
        let (client, project_id) = create_client().await;
        let now = OffsetDateTime::from_unix_timestamp(OffsetDateTime::now_utc().unix_timestamp()).unwrap();
        let table_name = format!("test_read_table_{}", now.unix_timestamp());
        insert(&client, &project_id, &dataset, &table_name, 3, &now).await;

        let table = TableReference {
            project_id,
            dataset_id: dataset.to_string(),
            table_id: table_name,
        };
        let mut iterator_as_struct = client.read_table::<TestData>(&table, None).await.unwrap();

        let restricted = ReadTableOption::default().with_session_read_options(TableReadOptions {
            row_restriction: "col_string = \"test_0\"".to_string(),
            ..Default::default()
        });
        let mut iterator_as_row = client
            .read_table::<crate::storage::row::Row>(&table, Some(restricted))
            .await
            .unwrap();

        let mut data_as_row: Vec<TestData> = Vec::new();
        let mut data_as_struct: Vec<TestData> = Vec::new();
        let mut struct_done = false;
        let mut row_done = false;
        // Keep polling both iterators until each one has reported `None`.
        loop {
            tokio::select! {
                next = iterator_as_struct.next() => {
                    match next.unwrap() {
                        Some(item) => {
                            tracing::info!("read struct some");
                            data_as_struct.push(item);
                        }
                        None => {
                            tracing::info!("read struct none");
                            struct_done = true;
                            if row_done {
                                break;
                            }
                        }
                    }
                },
                next = iterator_as_row.next() => {
                    match next.unwrap() {
                        Some(row) => {
                            tracing::info!("read row some");
                            data_as_row.push(test_data_from_row!(row));
                        }
                        None => {
                            tracing::info!("read row none");
                            row_done = true;
                            if struct_done {
                                break;
                            }
                        }
                    }
                }
            }
        }
        assert_eq!(data_as_struct.len(), 3);
        assert_eq!(data_as_row.len(), 1);

        assert_data(&now, data_as_struct);
        assert_data(&now, data_as_row);
    }

    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_query_job_incomplete_from_storage() {
        let option = QueryOption::default().with_enable_storage_read(true);
        test_query_job_incomplete(None, option).await
    }

    #[tokio::test(flavor = "multi_thread")]
    #[serial]
    async fn test_query_job_incomplete_from_rest() {
        let option = QueryOption::default();
        test_query_job_incomplete(Some(4999), option).await
    }

    /// Queries a 10000-row table with a 5 ms request timeout and checks that
    /// all rows are still delivered.
    async fn test_query_job_incomplete(max_results: Option<i64>, option: QueryOption) {
        const SIZE: usize = 10000;

        let dataset = dataset_name("table");
        let (client, project_id) = create_client().await;
        let now = OffsetDateTime::now_utc();
        let table = format!("test_query_job_incomplete_{}", now.unix_timestamp());
        insert(&client, &project_id, &dataset, &table, SIZE, &now).await;

        let request = QueryRequest {
            // Tiny timeout: presumably the query call returns before the job
            // finishes, so the wait-for-completion path is exercised.
            timeout_ms: Some(5),
            use_query_cache: Some(false),
            max_results,
            query: format!("SELECT 1 FROM {dataset}.{table}"),
            ..Default::default()
        };
        let mut iter = client
            .query_with_option::<query::row::Row>(&project_id, request, option)
            .await
            .unwrap();

        let mut collected: Vec<query::row::Row> = Vec::new();
        while let Some(row) = iter.next().await.unwrap() {
            collected.push(row);
        }
        assert_eq!(iter.total_size, SIZE as i64);
        assert_eq!(collected.len(), SIZE);
    }

    /// Pair of strings sent as one STRUCT element of the array parameter.
    #[derive(Debug, Clone)]
    struct Val {
        pub val1: String,
        pub val2: String,
    }

    /// Sends an `ARRAY<STRUCT<val1 STRING, val2 STRING>>` named parameter and
    /// checks that both struct fields come back as string arrays.
    #[tokio::test]
    #[serial]
    async fn test_query_with_parameter() {
        let array_val = [
            Val {
                val1: "val1-1".to_string(),
                val2: "val1-2".to_string(),
            },
            Val {
                val1: "val2-1".to_string(),
                val2: "val2-2".to_string(),
            },
        ];

        // Small builders for the repetitive STRING field / value descriptions.
        let string_field = |name: &str| QueryParameterStructType {
            name: Some(name.to_string()),
            field_type: QueryParameterType {
                parameter_type: "STRING".to_string(),
                ..Default::default()
            },
            description: None,
        };
        let string_value = |text: &str| QueryParameterValue {
            value: Some(text.to_string()),
            ..Default::default()
        };

        let element_type = QueryParameterType {
            parameter_type: "STRUCT".to_string(),
            struct_types: Some(vec![string_field("val1"), string_field("val2")]),
            array_type: None,
        };
        let query_parameter = QueryParameter {
            name: Some("p1".to_string()),
            parameter_type: QueryParameterType {
                parameter_type: "ARRAY".to_string(),
                array_type: Some(Box::new(element_type)),
                struct_types: None,
            },
            parameter_value: QueryParameterValue {
                array_values: Some(
                    array_val
                        .iter()
                        .map(|val| QueryParameterValue {
                            struct_values: Some(HashMap::from([
                                ("val1".to_string(), string_value(&val.val1)),
                                ("val2".to_string(), string_value(&val.val2)),
                            ])),
                            value: None,
                            array_values: None,
                        })
                        .collect(),
                ),
                ..Default::default()
            },
        };

        let (client, project_id) = create_client().await;
        let mut result = client
            .query::<query::row::Row>(
                &project_id,
                QueryRequest {
                    query: "
            WITH VAL AS (SELECT @p1 AS col1)
            SELECT
                ARRAY(SELECT val1 FROM UNNEST(col1)) AS val1,
                ARRAY(SELECT val2 FROM UNNEST(col1)) AS val2
            FROM VAL
            "
                    .to_string(),
                    query_parameters: vec![query_parameter],
                    ..QueryRequest::default()
                },
            )
            .await
            .unwrap();

        let row = result.next().await.unwrap().unwrap();
        let first = row.column::<Vec<String>>(0).unwrap();
        assert_eq!(first[0], "val1-1");
        assert_eq!(first[1], "val2-1");
        let second = row.column::<Vec<String>>(1).unwrap();
        assert_eq!(second[0], "val1-2");
        assert_eq!(second[1], "val2-2");
    }

    /// Asserts that element `i` of `data` equals `TestData::default(i, now + i seconds)`.
    fn assert_data(now: &OffsetDateTime, data: Vec<TestData>) {
        data.iter().enumerate().for_each(|(index, actual)| {
            let expected = TestData::default(index, *now + Duration::from_secs(index as u64));
            assert_eq!(&expected, actual);
        });
    }
}
1363
// Tests that need a locally running BigQuery emulator; ignored by default.
#[cfg(test)]
mod emulator_tests {
    use crate::client::{Client, ClientConfig};
    use crate::http::table::{Table, TableFieldSchema, TableFieldType, TableSchema};
    use crate::http::tabledata::insert_all::{InsertAllRequest, Row};
    use crate::http::tabledata::list::FetchDataRequest;
    use futures_util::StreamExt;

    use prost::Message;

    use std::time::SystemTime;

    /// Creates a table on the emulator, fills it through `insert` and through
    /// the storage write stream, then counts the rows returned by a table read.
    #[ignore]
    #[tokio::test]
    async fn test_emulator_use() {
        let config = ClientConfig::new_with_emulator("localhost:9060", "http://localhost:9050");

        // Create the table; the current unix time keeps the table name unique per run.
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let client = Client::new(config).await.unwrap();
        let mut table1 = Table {
            schema: Some(TableSchema {
                fields: vec![TableFieldSchema {
                    name: "col_string".to_string(),
                    data_type: TableFieldType::String,
                    ..Default::default()
                }],
            }),
            ..Default::default()
        };
        table1.table_reference.dataset_id = "dataset1".to_string();
        table1.table_reference.project_id = "local-project".to_string();
        table1.table_reference.table_id = format!("table{now}");
        client.table_client.create(&table1).await.unwrap();
        let tref = &table1.table_reference;

        // Insert a single JSON row.
        let mut req = InsertAllRequest::<serde_json::Value>::default();
        let json_row = serde_json::from_str(
            r#"
                {"col_string": "test1"}
            "#,
        )
        .unwrap();
        req.rows.push(Row {
            insert_id: None,
            json: json_row,
        });
        client
            .tabledata_client
            .insert(&tref.project_id, &tref.dataset_id, &tref.table_id, &req)
            .await
            .unwrap();

        // Streaming write: five append requests, each carrying the same
        // encoded message three times.
        let writer = client.default_storage_writer();
        let fqtn = format!("projects/local-project/datasets/dataset1/tables/{}", tref.table_id);
        let stream = writer.create_write_stream(&fqtn).await.unwrap();

        let rows: Vec<_> = (0..5)
            .map(|j| {
                let data = crate::storage_write::stream::tests::TestData {
                    col_string: format!("default_{j}"),
                };
                let mut buf = Vec::new();
                data.encode(&mut buf).unwrap();
                crate::storage_write::stream::tests::create_append_rows_request(vec![buf.clone(), buf.clone(), buf])
            })
            .collect();
        let mut result = stream.append_rows(rows).await.unwrap();
        while let Some(res) = result.next().await {
            let res = res.unwrap();
            tracing::info!("append row errors = {:?}", res.row_errors.len());
        }

        // Read everything back: presumably 1 inserted row + 5 * 3 appended rows.
        let data = client
            .tabledata_client
            .read(
                &tref.project_id,
                &tref.dataset_id,
                &tref.table_id,
                &FetchDataRequest::default(),
            )
            .await
            .unwrap();
        assert_eq!(16, data.total_rows);

        // TODO fix emulator stream, then also read all data through the storage API:
        //
        // let opt = ReadTableOption::default()
        //     .with_session_read_options(TableReadOptions::default())
        //     .with_max_stream_count(1);
        // let mut records= client
        //     .read_table::<crate::storage::row::Row>(&tref, Some(opt))
        //     .await
        //     .unwrap();
        // let mut count = 0;
        // while let record = records.next().await.unwrap() {
        //     count += 1;
        // }
        // assert_eq!(count, data.total_rows);
    }
}