hbase_thrift/
lib.rs

1#[allow(clippy::all, dead_code)]
2pub mod hbase;
3
4use easy_ext::ext;
5use hbase::{BatchMutation, HbaseSyncClient, Mutation, THbaseSyncClient};
6use std::collections::BTreeMap;
7use thrift::protocol::{TInputProtocol, TOutputProtocol};
8use thrift_pool::{FromProtocol, ThriftConnection};
9
/// Byte-keyed, byte-valued attribute map; [`THbaseSyncClientExt::put`] forwards it
/// unchanged as the `attributes` argument of the underlying mutate call.
pub type Attributes = BTreeMap<Vec<u8>, Vec<u8>>;
11
12impl<IP: TInputProtocol, OP: TOutputProtocol> FromProtocol for HbaseSyncClient<IP, OP> {
13    type InputProtocol = IP;
14    type OutputProtocol = OP;
15
16    fn from_protocol(
17        input_protocol: Self::InputProtocol,
18        output_protocol: Self::OutputProtocol,
19    ) -> Self {
20        Self::new(input_protocol, output_protocol)
21    }
22}
23impl<IP: TInputProtocol, OP: TOutputProtocol> ThriftConnection for HbaseSyncClient<IP, OP> {
24    type Error = thrift::Error;
25    fn is_valid(&mut self) -> std::result::Result<(), Self::Error> {
26        let _ = self.get_table_names()?;
27        Ok(())
28    }
29}
30
31#[ext(THbaseSyncClientExt)]
32pub impl<H: THbaseSyncClient> H {
33    fn table_exists(&mut self, table_name: &str) -> thrift::Result<bool> {
34        let table_name: Vec<u8> = table_name.into();
35        Ok(self.get_table_names()?.into_iter().any(|x| x == table_name))
36    }
37    fn put(
38        &mut self,
39        table_name: &str,
40        row_batches: Vec<BatchMutation>,
41        timestamp: Option<i64>,
42        attributes: Option<Attributes>,
43    ) -> thrift::Result<()> {
44        let attributes = attributes.unwrap_or_default();
45
46        if let Some(timestamp) = timestamp {
47            self.mutate_rows_ts(table_name.into(), row_batches, timestamp, attributes)
48        } else {
49            self.mutate_rows(table_name.into(), row_batches, attributes)
50        }
51    }
52
53    fn remove_table(&mut self, table_name: &str) -> thrift::Result<()> {
54        self.disable_table(table_name.into())?;
55        self.delete_table(table_name.into())?;
56        Ok(())
57    }
58}
59
/// Collects the settings for a single `Mutation`; call `build` to produce it.
#[derive(Clone, Debug)]
pub struct MutationBuilder {
    /// Copied into the mutation's `is_delete` field by `build`.
    pub is_delete: bool,
    /// Copied into the mutation's `write_to_w_a_l` field by `build`.
    pub write_to_wal: bool,
    /// `(column_family, column_qualifier)`; `build` joins the two with a `:`.
    pub column: Option<(String, String)>,
    /// Raw bytes used as the mutation's value, if any.
    pub value: Option<Vec<u8>>,
}
67
68impl MutationBuilder {
69    pub fn is_delete(&mut self, is_delete: bool) -> &mut Self {
70        self.is_delete = is_delete;
71        self
72    }
73    pub fn column(
74        &mut self,
75        column_family: impl Into<String>,
76        column_qualifier: impl Into<String>,
77    ) -> &mut Self {
78        self.column = Some((column_family.into(), column_qualifier.into()));
79        self
80    }
81    pub fn value(&mut self, value: impl Into<Vec<u8>>) -> &mut Self {
82        self.value = Some(value.into());
83        self
84    }
85    pub fn write_to_wal(&mut self, write_to_wal: bool) -> &mut Self {
86        self.write_to_wal = write_to_wal;
87        self
88    }
89    pub fn build(&self) -> Mutation {
90        Mutation {
91            column: self
92                .column
93                .as_ref()
94                .map(|(column_family, column_qualifier)| {
95                    format!("{}:{}", column_family, column_qualifier).into()
96                }),
97            value: self.value.clone(),
98            is_delete: Some(self.is_delete),
99            write_to_w_a_l: Some(self.write_to_wal),
100        }
101    }
102}
103impl Default for MutationBuilder {
104    fn default() -> Self {
105        Self {
106            is_delete: false,
107            write_to_wal: true,
108            value: None,
109            column: None,
110        }
111    }
112}