1#[allow(clippy::all, dead_code)]
2pub mod hbase;
3
4use easy_ext::ext;
5use hbase::{BatchMutation, HbaseSyncClient, Mutation, THbaseSyncClient};
6use std::collections::BTreeMap;
7use thrift::protocol::{TInputProtocol, TOutputProtocol};
8use thrift_pool::{FromProtocol, ThriftConnection};
9
/// Byte-keyed, byte-valued attribute map handed to the row-mutation calls issued by
/// `THbaseSyncClientExt::put`.
pub type Attributes = BTreeMap<Vec<u8>, Vec<u8>>;
11
12impl<IP: TInputProtocol, OP: TOutputProtocol> FromProtocol for HbaseSyncClient<IP, OP> {
13 type InputProtocol = IP;
14 type OutputProtocol = OP;
15
16 fn from_protocol(
17 input_protocol: Self::InputProtocol,
18 output_protocol: Self::OutputProtocol,
19 ) -> Self {
20 Self::new(input_protocol, output_protocol)
21 }
22}
23impl<IP: TInputProtocol, OP: TOutputProtocol> ThriftConnection for HbaseSyncClient<IP, OP> {
24 type Error = thrift::Error;
25 fn is_valid(&mut self) -> std::result::Result<(), Self::Error> {
26 let _ = self.get_table_names()?;
27 Ok(())
28 }
29}
30
31#[ext(THbaseSyncClientExt)]
32pub impl<H: THbaseSyncClient> H {
33 fn table_exists(&mut self, table_name: &str) -> thrift::Result<bool> {
34 let table_name: Vec<u8> = table_name.into();
35 Ok(self.get_table_names()?.into_iter().any(|x| x == table_name))
36 }
37 fn put(
38 &mut self,
39 table_name: &str,
40 row_batches: Vec<BatchMutation>,
41 timestamp: Option<i64>,
42 attributes: Option<Attributes>,
43 ) -> thrift::Result<()> {
44 let attributes = attributes.unwrap_or_default();
45
46 if let Some(timestamp) = timestamp {
47 self.mutate_rows_ts(table_name.into(), row_batches, timestamp, attributes)
48 } else {
49 self.mutate_rows(table_name.into(), row_batches, attributes)
50 }
51 }
52
53 fn remove_table(&mut self, table_name: &str) -> thrift::Result<()> {
54 self.disable_table(table_name.into())?;
55 self.delete_table(table_name.into())?;
56 Ok(())
57 }
58}
59
/// Accumulates the settings that `MutationBuilder::build` turns into a `Mutation`.
#[derive(Clone, Debug)]
pub struct MutationBuilder {
    /// Copied into the built mutation's `is_delete` field.
    pub is_delete: bool,
    /// Copied into the built mutation's `write_to_w_a_l` field.
    pub write_to_wal: bool,
    /// Optional `(column family, column qualifier)` pair; `build` joins the two with `:`.
    pub column: Option<(String, String)>,
    /// Optional raw value bytes; cloned into the built mutation.
    pub value: Option<Vec<u8>>,
}
67
68impl MutationBuilder {
69 pub fn is_delete(&mut self, is_delete: bool) -> &mut Self {
70 self.is_delete = is_delete;
71 self
72 }
73 pub fn column(
74 &mut self,
75 column_family: impl Into<String>,
76 column_qualifier: impl Into<String>,
77 ) -> &mut Self {
78 self.column = Some((column_family.into(), column_qualifier.into()));
79 self
80 }
81 pub fn value(&mut self, value: impl Into<Vec<u8>>) -> &mut Self {
82 self.value = Some(value.into());
83 self
84 }
85 pub fn write_to_wal(&mut self, write_to_wal: bool) -> &mut Self {
86 self.write_to_wal = write_to_wal;
87 self
88 }
89 pub fn build(&self) -> Mutation {
90 Mutation {
91 column: self
92 .column
93 .as_ref()
94 .map(|(column_family, column_qualifier)| {
95 format!("{}:{}", column_family, column_qualifier).into()
96 }),
97 value: self.value.clone(),
98 is_delete: Some(self.is_delete),
99 write_to_w_a_l: Some(self.write_to_wal),
100 }
101 }
102}
103impl Default for MutationBuilder {
104 fn default() -> Self {
105 Self {
106 is_delete: false,
107 write_to_wal: true,
108 value: None,
109 column: None,
110 }
111 }
112}