1use v_authorization_impl_tt2_lmdb::AzContext;
2use crate::search::common::{is_identifier, AuthorizationLevel, FTQuery, QueryResult, ResultFormat};
3use crate::v_authorization::common::AuthorizationContext;
4use crate::v_api::common_type::ResultCode;
5use crate::v_api::common_type::OptAuthorize;
6use chrono::prelude::*;
7use chrono::DateTime;
8use chrono_tz::Tz;
9use futures::executor::block_on;
10use futures::lock::Mutex;
11use serde_json::json;
12use serde_json::Value;
13use std::collections::HashSet;
14use std::time::*;
15use url::Url;
16use v_authorization::common::Access;
17use v_clickhouse_rs::errors::Error;
18use v_clickhouse_rs::types::{Column, SqlType};
19use v_clickhouse_rs::types::{FromSql, Row};
20use v_clickhouse_rs::Pool;
21
22pub struct CHClient {
23 client: Option<Pool>,
24 addr: String,
25 is_ready: bool,
26 az: AzContext,
27}
28
29impl CHClient {
30 pub fn new(client_addr: String) -> CHClient {
31 CHClient {
32 client: None,
33 addr: client_addr,
34 is_ready: false,
35 az: AzContext::new_lmdb(1000),
36 }
37 }
38
39 pub fn connect(&mut self) -> bool {
40 info!("Configuration to connect to Clickhouse: {}", self.addr);
41 match Url::parse(self.addr.as_ref()) {
42 Ok(url) => {
43 let host = url.host_str().unwrap_or("127.0.0.1");
44 let port = url.port().unwrap_or(9000);
45 let user = url.username();
46 let pass = url.password().unwrap_or("123");
47 let url = format!("tcp://{}:{}@{}:{}/", user, pass, host, port);
48 info!("Trying to connect to Clickhouse, host: {}, port: {}, user: {}, password: {}", host, port, user, pass);
49 info!("Connection url: {}", url);
50 let pool = Pool::new(url);
51 self.client = Some(pool);
52 self.is_ready = true;
53 },
54 Err(e) => {
55 error!("Invalid connection url, err={:?}", e);
56 self.is_ready = false;
57 },
58 }
59 self.is_ready
60 }
61
62 pub fn select(&mut self, req: FTQuery, op_auth: OptAuthorize) -> QueryResult {
63 if !self.is_ready {
64 self.connect();
65 }
66
67 let start = Instant::now();
68 let mut res = QueryResult::default();
69
70 if let Some(c) = &self.client {
71 if let Err(e) = block_on(select_from_clickhouse(req, c, op_auth, &mut res, &mut self.az)) {
72 error!("fail read from clickhouse: {:?}", e);
73 res.result_code = ResultCode::InternalServerError
74 }
75 }
76
77 res.total_time = start.elapsed().as_millis() as i64;
78 res.query_time = res.total_time - res.authorize_time;
79 debug!("result={:?}", res);
80
81 res
82 }
83
84 pub async fn select_async(&mut self, req: FTQuery, op_auth: OptAuthorize) -> Result<QueryResult, Error> {
85 let start = Instant::now();
86 let mut res = QueryResult::default();
87
88 if let Some(c) = &self.client {
89 select_from_clickhouse(req, c, op_auth, &mut res, &mut self.az).await?;
90 }
91 res.total_time = start.elapsed().as_millis() as i64;
92 res.query_time = res.total_time - res.authorize_time;
93 debug!("result={:?}", res);
94
95 Ok(res)
96 }
97
98 pub async fn query_select_async(
99 &mut self,
100 user_uri: &str,
101 query: &str,
102 res_format: ResultFormat,
103 authorization_level: AuthorizationLevel,
104 az: &Mutex<AzContext>,
105 ) -> Result<Value, Error> {
106 let mut jres = Value::default();
107 if let Some(pool) = &self.client {
108 let mut client = pool.get_handle().await?;
109 let block = client.query(query).fetch_all().await?;
110
111 let mut excluded_rows = HashSet::new();
114
115 if res_format == ResultFormat::Cols {
116 for col in block.columns() {
117 let mut jrow = Value::Array(vec![]);
118 let mut row_count = 0;
119 for row in block.rows() {
120 if !col_to_json(&row, col, &mut jrow, user_uri, &res_format, &authorization_level, az).await? {
121 if authorization_level == AuthorizationLevel::RowColumn {
122 excluded_rows.insert(row_count);
123 }
124 }
125 row_count += 1;
126 }
127 jres[col.name().to_owned()] = jrow;
128 }
129 } else {
130 let mut v_cols = vec![];
131 for col in block.columns() {
132 v_cols.push(Value::String(col.name().to_owned()));
133 }
134 jres["cols"] = Value::Array(v_cols);
135 let mut jrows = vec![];
136 for row in block.rows() {
137 let mut skip_row = false;
138 let mut jrow = if res_format == ResultFormat::Full {
139 Value::from(serde_json::Map::new())
140 } else {
141 Value::Array(vec![])
142 };
143 for col in block.columns() {
144 if !col_to_json(&row, col, &mut jrow, user_uri, &res_format, &authorization_level, az).await? {
146 skip_row = true;
147 break;
148 }
149 }
150 if !skip_row {
151 jrows.push(jrow);
152 }
153 }
154
155 jres["rows"] = Value::Array(jrows);
156 }
157
158 if res_format == ResultFormat::Cols && authorization_level == AuthorizationLevel::RowColumn {
159 for (_col_name, col_values) in jres.as_object_mut().unwrap().iter_mut() {
160 if let Value::Array(ref mut rows) = col_values {
161 let mut i = 0;
162 rows.retain(|_| {
163 let retain = !excluded_rows.contains(&i);
164 i += 1;
165 retain
166 });
167 }
168 }
169 }
170 }
171
172 Ok(jres)
174 }
175}
176
177async fn cltjs<'a, K: v_clickhouse_rs::types::ColumnType, T: FromSql<'a> + serde::Serialize>(
178 row: &'a Row<'_, K>,
179 col: &'a Column<K>,
180 jrow: &mut Value,
181 user_uri: &str,
182 res_format: &ResultFormat,
183 authorization_level: &AuthorizationLevel,
184 az: &Mutex<AzContext>,
185) -> Result<bool, Error> {
186 let v: T = row.get(col.name())?;
187 let jv = json!(v);
188
189 async fn check_authorization(
190 jv: &Value,
191 jrow: &mut Value,
192 col_name: &str,
193 user_uri: &str,
194 res_format: &ResultFormat,
195 authorization_level: &AuthorizationLevel,
196 az: &Mutex<AzContext>,
197 ) -> Result<bool, Error> {
198 match jv {
199 Value::String(vc) => {
200 let authorized = process_authorization(vc, user_uri, authorization_level, az).await?;
201 if authorized {
202 insert_value(jrow, col_name, jv.clone());
203 } else {
204 match authorization_level {
205 AuthorizationLevel::Cell => insert_value(jrow, col_name, json!("d:NotAuthorized")),
206 _ => {
207 if res_format == &ResultFormat::Cols {
208 insert_value(jrow, col_name, json!("d:NotAuthorized"))
209 }
210 return Ok(false);
211 },
212 }
213 }
214 Ok(true)
215 },
216 Value::Array(array) => {
217 let mut new_array = Vec::new();
218 for item in array {
219 match item {
220 Value::String(vc) => {
221 let authorized = process_authorization(vc, user_uri, authorization_level, az).await?;
222 if authorized {
223 new_array.push(json!(vc));
224 } else {
225 match authorization_level {
226 AuthorizationLevel::Cell => new_array.push(json!("v-s:NotAuthorized")),
227 _ => {
228 if res_format == &ResultFormat::Cols {
229 new_array.push(json!("v-s:NotAuthorized"))
230 }
231 return Ok(false);
232 },
233 }
234 }
235 },
236 _ => new_array.push(item.clone()), }
238 }
239 insert_value(jrow, col_name, Value::Array(new_array));
240 Ok(true)
241 },
242 _ => {
243 insert_value(jrow, col_name, jv.clone());
244 Ok(true)
245 },
246 }
247 }
248
249 async fn process_authorization(vc: &str, user_uri: &str, authorization_level: &AuthorizationLevel, az: &Mutex<AzContext>) -> Result<bool, Error> {
250 if (authorization_level == &AuthorizationLevel::Cell || authorization_level == &AuthorizationLevel::RowColumn) && is_identifier(vc) {
251 let mut az_lock = az.lock().await;
252 let authorized = az_lock.authorize(vc, user_uri, Access::CanRead as u8, false)?;
253 Ok(authorized == Access::CanRead as u8)
254 } else {
255 Ok(true)
257 }
258 }
259
260 fn insert_value(jrow: &mut Value, col_name: &str, value: Value) {
261 if let Some(o) = jrow.as_object_mut() {
262 o.insert(col_name.to_owned(), value);
263 } else if let Some(o) = jrow.as_array_mut() {
264 o.push(value);
265 }
266 }
267
268 check_authorization(&jv, jrow, col.name(), user_uri, res_format, authorization_level, az).await
269}
270
271async fn col_to_json<K: v_clickhouse_rs::types::ColumnType>(
272 row: &Row<'_, K>,
273 col: &Column<K>,
274 jrow: &mut Value,
275 user_uri: &str,
276 res_format: &ResultFormat,
277 authorization_level: &AuthorizationLevel,
278 az: &Mutex<AzContext>,
279) -> Result<bool, v_clickhouse_rs::errors::Error> {
280 let mut res = true;
281 let sql_type = col.sql_type();
282 match sql_type {
283 SqlType::UInt8 => {
284 res = cltjs::<K, u8>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
285 },
286 SqlType::UInt16 => {
287 res = cltjs::<K, u16>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
288 },
289 SqlType::UInt32 => {
290 res = cltjs::<K, u32>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
291 },
292 SqlType::UInt64 => {
293 res = cltjs::<K, u64>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
294 },
295 SqlType::Int8 => {
296 res = cltjs::<K, i8>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
297 },
298 SqlType::Int16 => {
299 res = cltjs::<K, i16>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
300 },
301 SqlType::Int32 => {
302 res = cltjs::<K, i32>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
303 },
304 SqlType::Int64 => {
305 res = cltjs::<K, i64>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
306 },
307 SqlType::String => {
308 res = cltjs::<K, String>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
309 },
310 SqlType::FixedString(_) => {
311 res = cltjs::<K, String>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
312 },
313 SqlType::Float32 => {
314 res = cltjs::<K, f32>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
315 },
316 SqlType::Float64 => {
317 res = cltjs::<K, f64>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
318 },
319 SqlType::Date => {
320 let v: DateTime<Tz> = row.get(col.name())?;
321 if let Some(o) = jrow.as_object_mut() {
322 o.insert(col.name().to_owned(), json!(v.date_naive().to_string()));
323 } else if let Some(o) = jrow.as_array_mut() {
324 o.push(json!(v.date_naive().to_string()));
325 }
326 },
327 SqlType::DateTime(_) => {
328 let v: DateTime<Tz> = row.get(col.name())?;
329 if let Some(o) = jrow.as_object_mut() {
330 o.insert(col.name().to_owned(), json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
331 } else if let Some(o) = jrow.as_array_mut() {
332 o.push(json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
333 }
334 },
335 SqlType::Decimal(_, _) => {
336 let v: f64 = row.get(col.name())?;
337 if let Some(o) = jrow.as_object_mut() {
338 o.insert(col.name().to_owned(), json!(v));
339 } else if let Some(o) = jrow.as_array_mut() {
340 o.push(json!(v));
341 }
342 },
343 SqlType::Array(ref stype) => match stype {
344 SqlType::UInt8 => {
345 res = cltjs::<K, Vec<u8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
346 },
347 SqlType::UInt16 => {
348 res = cltjs::<K, Vec<u16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
349 },
350 SqlType::UInt32 => {
351 res = cltjs::<K, Vec<u32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
352 },
353 SqlType::UInt64 => {
354 res = cltjs::<K, Vec<u64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
355 },
356 SqlType::Int8 => {
357 res = cltjs::<K, Vec<i8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
358 },
359 SqlType::Int16 => {
360 res = cltjs::<K, Vec<i16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
361 },
362 SqlType::Int32 => {
363 res = cltjs::<K, Vec<i32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
364 },
365 SqlType::Int64 => {
366 res = cltjs::<K, Vec<i64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
367 },
368 SqlType::String => {
369 res = cltjs::<K, Vec<String>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
370 },
371 SqlType::FixedString(_) => {
372 res = cltjs::<K, Vec<String>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
373 },
374 SqlType::Float32 => {
375 res = cltjs::<K, Vec<f32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
376 },
377 SqlType::Float64 => {
378 res = cltjs::<K, Vec<f64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
379 },
380 SqlType::Date => {
381 let v: Vec<DateTime<Tz>> = row.get(col.name())?;
382 let mut a = vec![];
383 for ev in v {
384 a.push(json!(ev.date_naive().to_string()));
385 }
386 if let Some(o) = jrow.as_object_mut() {
387 o.insert(col.name().to_owned(), json!(a));
388 } else if let Some(o) = jrow.as_array_mut() {
389 o.push(json!(a));
390 }
391 },
392 SqlType::DateTime(_) => {
393 let v: Vec<DateTime<Tz>> = row.get(col.name())?;
394 let mut a = vec![];
395 for ev in v {
396 a.push(json!(ev.to_rfc3339_opts(SecondsFormat::Millis, false)));
397 }
398 if let Some(o) = jrow.as_object_mut() {
399 o.insert(col.name().to_owned(), json!(a));
400 } else if let Some(o) = jrow.as_array_mut() {
401 o.push(json!(a));
402 }
403 },
404 SqlType::Decimal(_, _) => {
405 let v: Vec<f64> = row.get(col.name())?;
406 let mut a = vec![];
407 for ev in v {
408 a.push(json!(ev));
409 }
410 if let Some(o) = jrow.as_object_mut() {
411 o.insert(col.name().to_owned(), json!(a));
412 } else if let Some(o) = jrow.as_array_mut() {
413 o.push(json!(a));
414 }
415 },
416 _ => {
417 println!("unknown array type {:?}", stype);
418 },
419 },
420 SqlType::Nullable(ref inner_type) => match inner_type {
421 SqlType::UInt8 => {
422 res = cltjs::<K, Option<u8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
423 },
424 SqlType::UInt16 => {
425 res = cltjs::<K, Option<u16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
426 },
427 SqlType::UInt32 => {
428 res = cltjs::<K, Option<u32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
429 },
430 SqlType::UInt64 => {
431 res = cltjs::<K, Option<u64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
432 },
433 SqlType::Int8 => {
434 res = cltjs::<K, Option<i8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
435 },
436 SqlType::Int16 => {
437 res = cltjs::<K, Option<i16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
438 },
439 SqlType::Int32 => {
440 res = cltjs::<K, Option<i32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
441 },
442 SqlType::Int64 => {
443 res = cltjs::<K, Option<i64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
444 },
445 SqlType::Float32 => {
446 res = cltjs::<K, Option<f32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
447 },
448 SqlType::Float64 => {
449 res = cltjs::<K, Option<f64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
450 },
451 SqlType::String => {
452 res = cltjs::<K, Option<String>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
453 },
454 SqlType::Date => {
455 let v: DateTime<Tz> = row.get(col.name())?;
456 if let Some(o) = jrow.as_object_mut() {
457 o.insert(col.name().to_owned(), json!(v.date_naive().to_string()));
458 } else if let Some(o) = jrow.as_array_mut() {
459 o.push(json!(v.date_naive().to_string()));
460 }
461 },
462 SqlType::DateTime(_) => {
463 let v: DateTime<Tz> = row.get(col.name())?;
464 if let Some(o) = jrow.as_object_mut() {
465 o.insert(col.name().to_owned(), json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
466 } else if let Some(o) = jrow.as_array_mut() {
467 o.push(json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
468 }
469 },
470 _ => {
471 println!("unknown nullable type {:?}", inner_type);
472 },
473 },
474 _ => {
475 println!("unknown type {:?}", col.sql_type());
476 },
477 }
478 Ok(res)
479}
480
481async fn select_from_clickhouse(req: FTQuery, pool: &Pool, op_auth: OptAuthorize, out_res: &mut QueryResult, az: &mut AzContext) -> Result<(), Error> {
482 let mut authorized_count = 0;
483 let mut total_count = 0;
484
485 if req
486 .query
487 .to_uppercase()
488 .split([':', '-', ' ', '(', ')', '<', '<', '=', ','].as_ref())
489 .any(|x| x.trim() == "INSERT" || x.trim() == "UPDATE" || x.trim() == "DROP" || x.trim() == "DELETE" || x.trim() == "ALTER" || x.trim() == "EXEC")
490 {
491 out_res.result_code = ResultCode::BadRequest;
492 return Ok(());
493 }
494
495 let fq = if req.limit > 0 {
496 format!("{} LIMIT {} OFFSET {}", req.query, req.limit, req.from)
497 } else {
498 format!("{} OFFSET {}", req.query, req.from)
499 };
500
501 debug!("query={}", fq);
502
503 let mut client = pool.get_handle().await?;
504 let block = client.query(fq).fetch_all().await?;
505 for row in block.rows() {
506 total_count += 1;
507
508 let id: String = row.get(row.name(0)?)?;
509
510 if op_auth == OptAuthorize::YES {
511 let start = Instant::now();
512
513 match az.authorize(&id, &req.user, Access::CanRead as u8, false) {
514 Ok(res) => {
515 if res == Access::CanRead as u8 {
516 out_res.result.push(id);
517 authorized_count += 1;
518
519 if authorized_count >= req.top {
520 break;
521 }
522 }
523 },
524 Err(e) => error!("fail authorization {}, err={}", req.user, e),
525 }
526 out_res.authorize_time += start.elapsed().as_micros() as i64;
527 } else {
528 out_res.result.push(id);
529 }
530
531 if req.limit > 0 && total_count >= req.limit {
532 break;
533 }
534 }
535
536 out_res.result_code = ResultCode::Ok;
537 out_res.estimated = (req.from + block.row_count() as i32) as i64;
538 out_res.count = authorized_count as i64;
539 out_res.processed = total_count as i64;
540 out_res.cursor = (req.from + total_count) as i64;
541 out_res.authorize_time /= 1000;
542
543 Ok(())
544}