1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use crate::common::QueryResult;
use clickhouse_rs::errors::Error;
use clickhouse_rs::Pool;
use futures::executor::block_on;
use std::str;
use std::time::*;
use url::Url;
use v_api::app::{ResultCode, OptAuthorize};
use v_authorization::common::Access;
use v_az_lmdb::_authorize;
pub struct CHClient {
client: Pool,
addr: String,
is_ready: bool,
}
impl CHClient {
pub fn new(client_addr: String) -> CHClient {
CHClient {
client: Pool::new(String::new()),
addr: client_addr,
is_ready: false,
}
}
pub fn connect(&mut self) -> bool {
info!("Configuration to connect to Clickhouse: {}", self.addr);
match Url::parse(self.addr.as_ref()) {
Ok(url) => {
let host = url.host_str().unwrap_or("127.0.0.1");
let port = url.port().unwrap_or(9000);
let user = url.username();
let pass = url.password().unwrap_or("123");
let url = format!("tcp://{}:{}@{}:{}/", user, pass, host, port);
info!("Trying to connect to Clickhouse, host: {}, port: {}, user: {}, password: {}", host, port, user, pass);
info!("Connection url: {}", url);
let pool = Pool::new(url);
self.client = pool;
self.is_ready = true;
true
}
Err(e) => {
error!("Invalid connection url, err={:?}", e);
self.is_ready = false;
false
}
}
}
pub fn select(&mut self, user_uri: &str, query: &str, top: i64, limit: i64, from: i64, op_auth: OptAuthorize,) -> QueryResult {
let start = Instant::now();
let mut res = QueryResult::default();
if let Err(e) = block_on(select_from_clickhouse(&mut self.client, &user_uri, query, top, limit, from, op_auth, &mut res)) {
error!("fail read from clickhouse: {:?}", e);
res.result_code = ResultCode::InternalServerError
}
res.total_time = start.elapsed().as_millis() as i64;
res.query_time = res.total_time - res.authorize_time;
debug!("result={:?}", res);
res
}
}
async fn select_from_clickhouse(pool: &mut Pool, user_uri: &str, query: &str, top: i64, limit: i64, from: i64, op_auth: OptAuthorize, out_res: &mut QueryResult) -> Result<(), Error> {
let mut authorized_count = 0;
let mut total_count = 0;
if query.to_uppercase().split_whitespace().any(|x| x == "INSERT" || x == "UPDATE" || x == "DROP" || x == "DELETE") {
out_res.result_code = ResultCode::BadRequest;
return Ok(());
}
let fq = format!("{} LIMIT {} OFFSET {}", query, limit, from);
let mut client = pool.get_handle().await?;
let block = client.query(fq).fetch_all().await?;
for row in block.rows() {
total_count += 1;
let id: String = row.get(row.name(0)?)?;
if op_auth == OptAuthorize::YES {
let start = Instant::now();
match _authorize(&id, user_uri, Access::CanRead as u8, true, None) {
Ok(res) => {
if res == Access::CanRead as u8 {
out_res.result.push(id);
authorized_count += 1;
if authorized_count >= top {
break;
}
}
}
Err(e) => error!("fail authorization {}, err={}", user_uri, e),
}
out_res.authorize_time += start.elapsed().as_micros() as i64;
} else {
out_res.result.push(id);
}
if total_count >= limit {
break;
}
}
out_res.result_code = ResultCode::Ok;
out_res.estimated = from + (block.row_count() as i64);
out_res.count = authorized_count;
out_res.processed = total_count;
out_res.cursor = from + total_count;
out_res.authorize_time /= 1000;
Ok(())
}