1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use google_cloud_gax::conn::{ConnectionManager as GRPCConnectionManager, ConnectionOptions, Environment, Error};
use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient;
use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_write_client::BigQueryWriteClient;

use crate::grpc::apiv1::bigquery_client::{StreamingReadClient, StreamingWriteClient};

/// Audience string that both connection managers in this file pass to the
/// underlying gRPC connection manager when it is constructed.
pub const AUDIENCE: &str = "https://bigquerystorage.googleapis.com/";
/// Host name of the BigQuery Storage service.
///
/// NOTE(review): not referenced in this part of the file; presumably the
/// default value callers supply as the `domain` argument of the managers'
/// `new` functions — confirm against callers.
pub const DOMAIN: &str = "bigquerystorage.googleapis.com";
/// Google API auth scope URLs: BigQuery, BigQuery insert-data and Cloud Platform.
///
/// NOTE(review): not referenced in this part of the file; presumably requested
/// when credentials are built elsewhere — confirm against callers.
pub const SCOPES: [&str; 3] = [
    "https://www.googleapis.com/auth/bigquery",
    "https://www.googleapis.com/auth/bigquery.insertdata",
    "https://www.googleapis.com/auth/cloud-platform",
];

/// Wrapper around a gRPC connection manager that hands out
/// [`StreamingReadClient`] values.
pub struct ReadConnectionManager {
    /// The wrapped gRPC connection manager every method delegates to.
    inner: GRPCConnectionManager,
}

impl ReadConnectionManager {
    /// Builds a manager by constructing the wrapped gRPC connection manager.
    ///
    /// `pool_size`, `domain`, `environment` and `conn_options` are forwarded
    /// unchanged; the audience is always [`AUDIENCE`].
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped manager's constructor reports.
    pub async fn new(
        pool_size: usize,
        environment: &Environment,
        domain: &str,
        conn_options: &ConnectionOptions,
    ) -> Result<Self, Error> {
        let inner = GRPCConnectionManager::new(pool_size, domain, AUDIENCE, environment, conn_options).await?;
        Ok(Self { inner })
    }

    /// Returns the value reported by the wrapped manager's `num()`
    /// (presumably the number of pooled connections — confirm in gax).
    pub fn num(&self) -> usize {
        let pool = &self.inner;
        pool.num()
    }

    /// Takes a connection from the wrapped manager and wraps it first in a
    /// `BigQueryReadClient`, then in a [`StreamingReadClient`].
    pub fn conn(&self) -> StreamingReadClient {
        let channel = self.inner.conn();
        let read_client = BigQueryReadClient::new(channel);
        StreamingReadClient::new(read_client)
    }
}

/// Wrapper around a gRPC connection manager that hands out
/// [`StreamingWriteClient`] values.
pub struct WriteConnectionManager {
    /// The wrapped gRPC connection manager every method delegates to.
    inner: GRPCConnectionManager,
}

impl WriteConnectionManager {
    /// Builds a manager by constructing the wrapped gRPC connection manager.
    ///
    /// `pool_size`, `domain`, `environment` and `conn_options` are forwarded
    /// unchanged; the audience is always [`AUDIENCE`].
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped manager's constructor reports.
    pub async fn new(
        pool_size: usize,
        environment: &Environment,
        domain: &str,
        conn_options: &ConnectionOptions,
    ) -> Result<Self, Error> {
        let inner = GRPCConnectionManager::new(pool_size, domain, AUDIENCE, environment, conn_options).await?;
        Ok(Self { inner })
    }

    /// Returns the value reported by the wrapped manager's `num()`
    /// (presumably the number of pooled connections — confirm in gax).
    pub fn num(&self) -> usize {
        let pool = &self.inner;
        pool.num()
    }

    /// Takes a connection from the wrapped manager and wraps it first in a
    /// `BigQueryWriteClient`, then in a [`StreamingWriteClient`].
    pub fn conn(&self) -> StreamingWriteClient {
        let channel = self.inner.conn();
        let write_client = BigQueryWriteClient::new(channel);
        StreamingWriteClient::new(write_client)
    }
}