routers_tiles/datasource/connectors/
bigtable.rs1use async_trait::async_trait;
2use bigtable_rs::bigtable::{BigTableConnection, RowCell};
3use bigtable_rs::google::bigtable::v2::{
4 ReadRowsRequest, RowFilter, RowRange, RowSet, SampleRowKeysRequest,
5};
6use std::env;
7
8use crate::error::TileError;
9use crate::repository::{DEFAULT_APP_PROFILE, Repository};
10use crate::{Query, RepositorySet};
11
12use super::repositories::big_table;
13use super::repositories::big_table::BigTableRepository;
14
15type RowKey = Vec<u8>;
16pub type BigTableOutput = Vec<(RowKey, Vec<RowCell>)>;
17pub type BigTableInput = Query<Vec<RowRange>, Option<RowFilter>>;
18pub type BigTableRepositorySet = RepositorySet<BigTableInput, BigTableOutput>;
19
20#[async_trait]
21impl Repository<BigTableInput, BigTableOutput> for BigTableRepository {
22 async fn new(project_id: &str, instance_name: &str, table_id: &str) -> Result<Self, TileError>
23 where
24 Self: Sized,
25 {
26 let connection = BigTableConnection::new(
27 project_id,
28 instance_name,
29 big_table::READ_ONLY,
30 big_table::CHANNEL_SIZE,
31 big_table::TIMEOUT,
32 )
33 .await
34 .map_err(|e| TileError::DataSourceError(e.to_string()))?;
35
36 let client = connection.client();
37
38 Ok(Self {
39 connection,
40 table_name: client.get_full_table_name(table_id),
41 })
42 }
43
44 async fn ping(&self) -> Result<(), TileError> {
45 let mut client = self.connection.client();
46
47 let req = SampleRowKeysRequest {
48 table_name: self.table_name.clone(),
49 app_profile_id: DEFAULT_APP_PROFILE.to_string(),
50 ..SampleRowKeysRequest::default()
51 };
52
53 client
54 .sample_row_keys(req)
55 .await
56 .map(|_| ())
57 .map_err(|e| TileError::DataSourceError(e.to_string()))
58 }
59
60 async fn query(&self, req: BigTableInput) -> Result<BigTableOutput, TileError> {
61 let mut client = self.connection.client();
62
63 let request = ReadRowsRequest::from(BigTableQuery(req.add_param(self.table_name.clone())));
64
65 client
66 .read_rows(request)
67 .await
68 .map_err(|err| TileError::DataSourceError(err.to_string()))
69 }
70}
71
72pub struct BigTableQuery(pub Query<(Vec<RowRange>, String), Option<RowFilter>>);
73
74impl From<BigTableQuery> for ReadRowsRequest {
75 fn from(value: BigTableQuery) -> ReadRowsRequest {
76 let (row_ranges, table_name) = value.0.parameters;
77
78 ReadRowsRequest {
79 table_name,
80
81 app_profile_id: DEFAULT_APP_PROFILE.to_string(),
82 request_stats_view: 0, rows_limit: 0, filter: value.0.filter,
86 rows: Some(RowSet {
87 row_keys: vec![],
88 row_ranges,
89 }),
90 reversed: false,
91
92 ..ReadRowsRequest::default()
93 }
94 }
95}
96
97fn get_env(key: &str) -> Result<String, TileError> {
98 env::var(key).map_err(|e| TileError::MissingEnvironment(e.to_string()))
99}
100
101pub async fn init_bq() -> Result<BigTableRepository, TileError> {
102 let project_id = get_env("BIGTABLE_PROJECT")?;
103 let instance_name = get_env("BIGTABLE_INSTANCE")?;
104 let table_id = get_env("BIGTABLE_TABLE")?;
105
106 BigTableRepository::new(&project_id, &instance_name, &table_id).await
107}