Skip to main content

routers_tiles/datasource/connectors/
bigtable.rs

1use async_trait::async_trait;
2use bigtable_rs::bigtable::{BigTableConnection, RowCell};
3use bigtable_rs::google::bigtable::v2::{
4    ReadRowsRequest, RowFilter, RowRange, RowSet, SampleRowKeysRequest,
5};
6use std::env;
7
8use crate::error::TileError;
9use crate::repository::{DEFAULT_APP_PROFILE, Repository};
10use crate::{Query, RepositorySet};
11
12use super::repositories::big_table;
13use super::repositories::big_table::BigTableRepository;
14
// Byte-vector alias used as the key half of every `BigTableOutput` entry.
15type RowKey = Vec<u8>;
// Success type of `query`: a list of `(row key, cells)` pairs.
16pub type BigTableOutput = Vec<(RowKey, Vec<RowCell>)>;
// Query accepted by the repository: row ranges as the first type argument and an
// optional row filter as the second (presumably `Query`'s `parameters` and `filter`
// fields, as used in the `ReadRowsRequest` conversion below — confirm against `Query`).
17pub type BigTableInput = Query<Vec<RowRange>, Option<RowFilter>>;
// `RepositorySet` specialised to the Bigtable input and output types above.
18pub type BigTableRepositorySet = RepositorySet<BigTableInput, BigTableOutput>;
19
20#[async_trait]
21impl Repository<BigTableInput, BigTableOutput> for BigTableRepository {
22    async fn new(project_id: &str, instance_name: &str, table_id: &str) -> Result<Self, TileError>
23    where
24        Self: Sized,
25    {
26        let connection = BigTableConnection::new(
27            project_id,
28            instance_name,
29            big_table::READ_ONLY,
30            big_table::CHANNEL_SIZE,
31            big_table::TIMEOUT,
32        )
33        .await
34        .map_err(|e| TileError::DataSourceError(e.to_string()))?;
35
36        let client = connection.client();
37
38        Ok(Self {
39            connection,
40            table_name: client.get_full_table_name(table_id),
41        })
42    }
43
44    async fn ping(&self) -> Result<(), TileError> {
45        let mut client = self.connection.client();
46
47        let req = SampleRowKeysRequest {
48            table_name: self.table_name.clone(),
49            app_profile_id: DEFAULT_APP_PROFILE.to_string(),
50            ..SampleRowKeysRequest::default()
51        };
52
53        client
54            .sample_row_keys(req)
55            .await
56            .map(|_| ())
57            .map_err(|e| TileError::DataSourceError(e.to_string()))
58    }
59
60    async fn query(&self, req: BigTableInput) -> Result<BigTableOutput, TileError> {
61        let mut client = self.connection.client();
62
63        let request = ReadRowsRequest::from(BigTableQuery(req.add_param(self.table_name.clone())));
64
65        client
66            .read_rows(request)
67            .await
68            .map_err(|err| TileError::DataSourceError(err.to_string()))
69    }
70}
71
72pub struct BigTableQuery(pub Query<(Vec<RowRange>, String), Option<RowFilter>>);
73
74impl From<BigTableQuery> for ReadRowsRequest {
75    fn from(value: BigTableQuery) -> ReadRowsRequest {
76        let (row_ranges, table_name) = value.0.parameters;
77
78        ReadRowsRequest {
79            table_name,
80
81            app_profile_id: DEFAULT_APP_PROFILE.to_string(),
82            request_stats_view: 0, // new field, not sure what to do
83            rows_limit: 0,         // boundless rows, implement limit via row filter / row-ranges
84
85            filter: value.0.filter,
86            rows: Some(RowSet {
87                row_keys: vec![],
88                row_ranges,
89            }),
90            reversed: false,
91
92            ..ReadRowsRequest::default()
93        }
94    }
95}
96
97fn get_env(key: &str) -> Result<String, TileError> {
98    env::var(key).map_err(|e| TileError::MissingEnvironment(e.to_string()))
99}
100
101pub async fn init_bq() -> Result<BigTableRepository, TileError> {
102    let project_id = get_env("BIGTABLE_PROJECT")?;
103    let instance_name = get_env("BIGTABLE_INSTANCE")?;
104    let table_id = get_env("BIGTABLE_TABLE")?;
105
106    BigTableRepository::new(&project_id, &instance_name, &table_id).await
107}