1use v_authorization_lmdb_impl::LmdbAzContext;
2use crate::search::common::{is_identifier, AuthorizationLevel, FTQuery, QueryResult, ResultFormat};
3use crate::storage::async_storage::AuthorizationProvider;
4use crate::v_authorization::common::AuthorizationContext;
5use crate::v_api::common_type::ResultCode;
6use crate::v_api::common_type::OptAuthorize;
7use chrono::prelude::*;
8use chrono::DateTime;
9use chrono_tz::Tz;
10use futures::executor::block_on;
11use serde_json::json;
12use serde_json::Value;
13use std::collections::HashSet;
14use std::time::*;
15use url::Url;
16use v_authorization::common::Access;
17use v_clickhouse_rs::errors::Error;
18use v_clickhouse_rs::types::{Column, SqlType};
19use v_clickhouse_rs::types::{FromSql, Row};
20use v_clickhouse_rs::Pool;
21
22pub struct CHClient {
23 client: Option<Pool>,
24 addr: String,
25 is_ready: bool,
26 az: LmdbAzContext,
27}
28
29impl CHClient {
30 pub fn new(client_addr: String) -> CHClient {
31 CHClient {
32 client: None,
33 addr: client_addr,
34 is_ready: false,
35 az: LmdbAzContext::new(1000),
36 }
37 }
38
39 pub fn connect(&mut self) -> bool {
40 info!("Configuration to connect to Clickhouse: {}", self.addr);
41 match Url::parse(self.addr.as_ref()) {
42 Ok(url) => {
43 let host = url.host_str().unwrap_or("127.0.0.1");
44 let port = url.port().unwrap_or(9000);
45 let user = url.username();
46 let pass = url.password().unwrap_or("123");
47 let url = format!("tcp://{}:{}@{}:{}/", user, pass, host, port);
48 info!("Trying to connect to Clickhouse, host: {}, port: {}, user: {}, password: {}", host, port, user, pass);
49 info!("Connection url: {}", url);
50 let pool = Pool::new(url);
51 self.client = Some(pool);
52 self.is_ready = true;
53 },
54 Err(e) => {
55 error!("Invalid connection url, err={:?}", e);
56 self.is_ready = false;
57 },
58 }
59 self.is_ready
60 }
61
62 pub fn select(&mut self, req: FTQuery, op_auth: OptAuthorize) -> QueryResult {
63 if !self.is_ready {
64 self.connect();
65 }
66
67 let start = Instant::now();
68 let mut res = QueryResult::default();
69
70 if let Some(c) = &self.client {
71 if let Err(e) = block_on(select_from_clickhouse(req, c, op_auth, &mut res, &mut self.az)) {
72 error!("fail read from clickhouse: {:?}", e);
73 res.result_code = ResultCode::InternalServerError
74 }
75 }
76
77 res.total_time = start.elapsed().as_millis() as i64;
78 res.query_time = res.total_time - res.authorize_time;
79 debug!("result={:?}", res);
80
81 res
82 }
83
84 pub async fn select_async(&mut self, req: FTQuery, op_auth: OptAuthorize) -> Result<QueryResult, Error> {
85 let start = Instant::now();
86 let mut res = QueryResult::default();
87
88 if let Some(c) = &self.client {
89 select_from_clickhouse(req, c, op_auth, &mut res, &mut self.az).await?;
90 }
91 res.total_time = start.elapsed().as_millis() as i64;
92 res.query_time = res.total_time - res.authorize_time;
93 debug!("result={:?}", res);
94
95 Ok(res)
96 }
97
98 pub async fn query_select_async(
99 &mut self,
100 user_uri: &str,
101 query: &str,
102 res_format: ResultFormat,
103 authorization_level: AuthorizationLevel,
104 az: &AuthorizationProvider,
105 ) -> Result<Value, Error> {
106 let mut jres = Value::default();
107 if let Some(pool) = &self.client {
108 let mut client = pool.get_handle().await?;
109 let block = client.query(query).fetch_all().await?;
110
111 let mut excluded_rows = HashSet::new();
114
115 if res_format == ResultFormat::Cols {
116 for col in block.columns() {
117 let mut jrow = Value::Array(vec![]);
118 let mut row_count = 0;
119 for row in block.rows() {
120 if !col_to_json(&row, col, &mut jrow, user_uri, &res_format, &authorization_level, az).await? {
121 if authorization_level == AuthorizationLevel::RowColumn {
122 excluded_rows.insert(row_count);
123 }
124 }
125 row_count += 1;
126 }
127 jres[col.name().to_owned()] = jrow;
128 }
129 } else {
130 let mut v_cols = vec![];
131 for col in block.columns() {
132 v_cols.push(Value::String(col.name().to_owned()));
133 }
134 jres["cols"] = Value::Array(v_cols);
135 let mut jrows = vec![];
136 for row in block.rows() {
137 let mut skip_row = false;
138 let mut jrow = if res_format == ResultFormat::Full {
139 Value::from(serde_json::Map::new())
140 } else {
141 Value::Array(vec![])
142 };
143 for col in block.columns() {
144 if !col_to_json(&row, col, &mut jrow, user_uri, &res_format, &authorization_level, az).await? {
146 skip_row = true;
147 break;
148 }
149 }
150 if !skip_row {
151 jrows.push(jrow);
152 }
153 }
154
155 jres["rows"] = Value::Array(jrows);
156 }
157
158 if res_format == ResultFormat::Cols && authorization_level == AuthorizationLevel::RowColumn {
159 for (_col_name, col_values) in jres.as_object_mut().unwrap().iter_mut() {
160 if let Value::Array(ref mut rows) = col_values {
161 let mut i = 0;
162 rows.retain(|_| {
163 let retain = !excluded_rows.contains(&i);
164 i += 1;
165 retain
166 });
167 }
168 }
169 }
170 }
171
172 Ok(jres)
174 }
175}
176
177async fn cltjs<'a, K: v_clickhouse_rs::types::ColumnType, T: FromSql<'a> + serde::Serialize>(
178 row: &'a Row<'_, K>,
179 col: &'a Column<K>,
180 jrow: &mut Value,
181 user_uri: &str,
182 res_format: &ResultFormat,
183 authorization_level: &AuthorizationLevel,
184 az: &AuthorizationProvider,
185) -> Result<bool, Error> {
186 let v: T = row.get(col.name())?;
187 let jv = json!(v);
188
189 async fn check_authorization(
190 jv: &Value,
191 jrow: &mut Value,
192 col_name: &str,
193 user_uri: &str,
194 res_format: &ResultFormat,
195 authorization_level: &AuthorizationLevel,
196 az: &AuthorizationProvider,
197 ) -> Result<bool, Error> {
198 match jv {
199 Value::String(vc) => {
200 let authorized = process_authorization(vc, user_uri, authorization_level, az).await?;
201 if authorized {
202 insert_value(jrow, col_name, jv.clone());
203 } else {
204 match authorization_level {
205 AuthorizationLevel::Cell => insert_value(jrow, col_name, json!("d:NotAuthorized")),
206 _ => {
207 if res_format == &ResultFormat::Cols {
208 insert_value(jrow, col_name, json!("d:NotAuthorized"))
209 }
210 return Ok(false);
211 },
212 }
213 }
214 Ok(true)
215 },
216 Value::Array(array) => {
217 let mut new_array = Vec::new();
218 for item in array {
219 match item {
220 Value::String(vc) => {
221 let authorized = process_authorization(vc, user_uri, authorization_level, az).await?;
222 if authorized {
223 new_array.push(json!(vc));
224 } else {
225 match authorization_level {
226 AuthorizationLevel::Cell => new_array.push(json!("v-s:NotAuthorized")),
227 _ => {
228 if res_format == &ResultFormat::Cols {
229 new_array.push(json!("v-s:NotAuthorized"))
230 }
231 return Ok(false);
232 },
233 }
234 }
235 },
236 _ => new_array.push(item.clone()), }
238 }
239 insert_value(jrow, col_name, Value::Array(new_array));
240 Ok(true)
241 },
242 _ => {
243 insert_value(jrow, col_name, jv.clone());
244 Ok(true)
245 },
246 }
247 }
248
249 async fn process_authorization(vc: &str, user_uri: &str, authorization_level: &AuthorizationLevel, az: &AuthorizationProvider) -> Result<bool, Error> {
250 if (authorization_level == &AuthorizationLevel::Cell || authorization_level == &AuthorizationLevel::RowColumn) && is_identifier(vc) {
251 let authorized = az.authorize(vc, user_uri, Access::CanRead as u8, false).await?;
252 Ok(authorized == Access::CanRead as u8)
253 } else {
254 Ok(true)
256 }
257 }
258
259 fn insert_value(jrow: &mut Value, col_name: &str, value: Value) {
260 if let Some(o) = jrow.as_object_mut() {
261 o.insert(col_name.to_owned(), value);
262 } else if let Some(o) = jrow.as_array_mut() {
263 o.push(value);
264 }
265 }
266
267 check_authorization(&jv, jrow, col.name(), user_uri, res_format, authorization_level, az).await
268}
269
270async fn col_to_json<K: v_clickhouse_rs::types::ColumnType>(
271 row: &Row<'_, K>,
272 col: &Column<K>,
273 jrow: &mut Value,
274 user_uri: &str,
275 res_format: &ResultFormat,
276 authorization_level: &AuthorizationLevel,
277 az: &AuthorizationProvider,
278) -> Result<bool, v_clickhouse_rs::errors::Error> {
279 let mut res = true;
280 let sql_type = col.sql_type();
281 match sql_type {
282 SqlType::UInt8 => {
283 res = cltjs::<K, u8>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
284 },
285 SqlType::UInt16 => {
286 res = cltjs::<K, u16>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
287 },
288 SqlType::UInt32 => {
289 res = cltjs::<K, u32>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
290 },
291 SqlType::UInt64 => {
292 res = cltjs::<K, u64>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
293 },
294 SqlType::Int8 => {
295 res = cltjs::<K, i8>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
296 },
297 SqlType::Int16 => {
298 res = cltjs::<K, i16>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
299 },
300 SqlType::Int32 => {
301 res = cltjs::<K, i32>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
302 },
303 SqlType::Int64 => {
304 res = cltjs::<K, i64>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
305 },
306 SqlType::String => {
307 res = cltjs::<K, String>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
308 },
309 SqlType::FixedString(_) => {
310 res = cltjs::<K, String>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
311 },
312 SqlType::Float32 => {
313 res = cltjs::<K, f32>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
314 },
315 SqlType::Float64 => {
316 res = cltjs::<K, f64>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
317 },
318 SqlType::Date => {
319 let v: DateTime<Tz> = row.get(col.name())?;
320 if let Some(o) = jrow.as_object_mut() {
321 o.insert(col.name().to_owned(), json!(v.date_naive().to_string()));
322 } else if let Some(o) = jrow.as_array_mut() {
323 o.push(json!(v.date_naive().to_string()));
324 }
325 },
326 SqlType::DateTime(_) => {
327 let v: DateTime<Tz> = row.get(col.name())?;
328 if let Some(o) = jrow.as_object_mut() {
329 o.insert(col.name().to_owned(), json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
330 } else if let Some(o) = jrow.as_array_mut() {
331 o.push(json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
332 }
333 },
334 SqlType::Decimal(_, _) => {
335 let v: f64 = row.get(col.name())?;
336 if let Some(o) = jrow.as_object_mut() {
337 o.insert(col.name().to_owned(), json!(v));
338 } else if let Some(o) = jrow.as_array_mut() {
339 o.push(json!(v));
340 }
341 },
342 SqlType::Array(ref stype) => match stype {
343 SqlType::UInt8 => {
344 res = cltjs::<K, Vec<u8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
345 },
346 SqlType::UInt16 => {
347 res = cltjs::<K, Vec<u16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
348 },
349 SqlType::UInt32 => {
350 res = cltjs::<K, Vec<u32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
351 },
352 SqlType::UInt64 => {
353 res = cltjs::<K, Vec<u64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
354 },
355 SqlType::Int8 => {
356 res = cltjs::<K, Vec<i8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
357 },
358 SqlType::Int16 => {
359 res = cltjs::<K, Vec<i16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
360 },
361 SqlType::Int32 => {
362 res = cltjs::<K, Vec<i32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
363 },
364 SqlType::Int64 => {
365 res = cltjs::<K, Vec<i64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
366 },
367 SqlType::String => {
368 res = cltjs::<K, Vec<String>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
369 },
370 SqlType::FixedString(_) => {
371 res = cltjs::<K, Vec<String>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
372 },
373 SqlType::Float32 => {
374 res = cltjs::<K, Vec<f32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
375 },
376 SqlType::Float64 => {
377 res = cltjs::<K, Vec<f64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
378 },
379 SqlType::Date => {
380 let v: Vec<DateTime<Tz>> = row.get(col.name())?;
381 let mut a = vec![];
382 for ev in v {
383 a.push(json!(ev.date_naive().to_string()));
384 }
385 if let Some(o) = jrow.as_object_mut() {
386 o.insert(col.name().to_owned(), json!(a));
387 } else if let Some(o) = jrow.as_array_mut() {
388 o.push(json!(a));
389 }
390 },
391 SqlType::DateTime(_) => {
392 let v: Vec<DateTime<Tz>> = row.get(col.name())?;
393 let mut a = vec![];
394 for ev in v {
395 a.push(json!(ev.to_rfc3339_opts(SecondsFormat::Millis, false)));
396 }
397 if let Some(o) = jrow.as_object_mut() {
398 o.insert(col.name().to_owned(), json!(a));
399 } else if let Some(o) = jrow.as_array_mut() {
400 o.push(json!(a));
401 }
402 },
403 SqlType::Decimal(_, _) => {
404 let v: Vec<f64> = row.get(col.name())?;
405 let mut a = vec![];
406 for ev in v {
407 a.push(json!(ev));
408 }
409 if let Some(o) = jrow.as_object_mut() {
410 o.insert(col.name().to_owned(), json!(a));
411 } else if let Some(o) = jrow.as_array_mut() {
412 o.push(json!(a));
413 }
414 },
415 _ => {
416 println!("unknown array type {:?}", stype);
417 },
418 },
419 SqlType::Nullable(ref inner_type) => match inner_type {
420 SqlType::UInt8 => {
421 res = cltjs::<K, Option<u8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
422 },
423 SqlType::UInt16 => {
424 res = cltjs::<K, Option<u16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
425 },
426 SqlType::UInt32 => {
427 res = cltjs::<K, Option<u32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
428 },
429 SqlType::UInt64 => {
430 res = cltjs::<K, Option<u64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
431 },
432 SqlType::Int8 => {
433 res = cltjs::<K, Option<i8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
434 },
435 SqlType::Int16 => {
436 res = cltjs::<K, Option<i16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
437 },
438 SqlType::Int32 => {
439 res = cltjs::<K, Option<i32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
440 },
441 SqlType::Int64 => {
442 res = cltjs::<K, Option<i64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
443 },
444 SqlType::Float32 => {
445 res = cltjs::<K, Option<f32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
446 },
447 SqlType::Float64 => {
448 res = cltjs::<K, Option<f64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
449 },
450 SqlType::String => {
451 res = cltjs::<K, Option<String>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
452 },
453 SqlType::Date => {
454 let v: DateTime<Tz> = row.get(col.name())?;
455 if let Some(o) = jrow.as_object_mut() {
456 o.insert(col.name().to_owned(), json!(v.date_naive().to_string()));
457 } else if let Some(o) = jrow.as_array_mut() {
458 o.push(json!(v.date_naive().to_string()));
459 }
460 },
461 SqlType::DateTime(_) => {
462 let v: DateTime<Tz> = row.get(col.name())?;
463 if let Some(o) = jrow.as_object_mut() {
464 o.insert(col.name().to_owned(), json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
465 } else if let Some(o) = jrow.as_array_mut() {
466 o.push(json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
467 }
468 },
469 _ => {
470 println!("unknown nullable type {:?}", inner_type);
471 },
472 },
473 _ => {
474 println!("unknown type {:?}", col.sql_type());
475 },
476 }
477 Ok(res)
478}
479
480async fn select_from_clickhouse(req: FTQuery, pool: &Pool, op_auth: OptAuthorize, out_res: &mut QueryResult, az: &mut LmdbAzContext) -> Result<(), Error> {
481 let mut authorized_count = 0;
482 let mut total_count = 0;
483
484 if req
485 .query
486 .to_uppercase()
487 .split([':', '-', ' ', '(', ')', '<', '<', '=', ','].as_ref())
488 .any(|x| x.trim() == "INSERT" || x.trim() == "UPDATE" || x.trim() == "DROP" || x.trim() == "DELETE" || x.trim() == "ALTER" || x.trim() == "EXEC")
489 {
490 out_res.result_code = ResultCode::BadRequest;
491 return Ok(());
492 }
493
494 let fq = if req.limit > 0 {
495 format!("{} LIMIT {} OFFSET {}", req.query, req.limit, req.from)
496 } else {
497 format!("{} OFFSET {}", req.query, req.from)
498 };
499
500 debug!("query={}", fq);
501
502 let mut client = pool.get_handle().await?;
503 let block = client.query(fq).fetch_all().await?;
504 for row in block.rows() {
505 total_count += 1;
506
507 let id: String = row.get(row.name(0)?)?;
508
509 if op_auth == OptAuthorize::YES {
510 let start = Instant::now();
511
512 match az.authorize(&id, &req.user, Access::CanRead as u8, false) {
513 Ok(res) => {
514 if res == Access::CanRead as u8 {
515 out_res.result.push(id);
516 authorized_count += 1;
517
518 if authorized_count >= req.top {
519 break;
520 }
521 }
522 },
523 Err(e) => error!("fail authorization {}, err={}", req.user, e),
524 }
525 out_res.authorize_time += start.elapsed().as_micros() as i64;
526 } else {
527 out_res.result.push(id);
528 }
529
530 if req.limit > 0 && total_count >= req.limit {
531 break;
532 }
533 }
534
535 out_res.result_code = ResultCode::Ok;
536 out_res.estimated = (req.from + block.row_count() as i32) as i64;
537 out_res.count = authorized_count as i64;
538 out_res.processed = total_count as i64;
539 out_res.cursor = (req.from + total_count) as i64;
540 out_res.authorize_time /= 1000;
541
542 Ok(())
543}