// v_common/search/clickhouse_client.rs

1use v_authorization_impl_tt2_lmdb::AzContext;
2use crate::search::common::{is_identifier, AuthorizationLevel, FTQuery, QueryResult, ResultFormat};
3use crate::v_authorization::common::AuthorizationContext;
4use crate::v_api::common_type::ResultCode;
5use crate::v_api::common_type::OptAuthorize;
6use chrono::prelude::*;
7use chrono::DateTime;
8use chrono_tz::Tz;
9use futures::executor::block_on;
10use futures::lock::Mutex;
11use serde_json::json;
12use serde_json::Value;
13use std::collections::HashSet;
14use std::time::*;
15use url::Url;
16use v_authorization::common::Access;
17use v_clickhouse_rs::errors::Error;
18use v_clickhouse_rs::types::{Column, SqlType};
19use v_clickhouse_rs::types::{FromSql, Row};
20use v_clickhouse_rs::Pool;
21
22pub struct CHClient {
23    client: Option<Pool>,
24    addr: String,
25    is_ready: bool,
26    az: AzContext,
27}
28
29impl CHClient {
30    pub fn new(client_addr: String) -> CHClient {
31        CHClient {
32            client: None,
33            addr: client_addr,
34            is_ready: false,
35            az: AzContext::new_lmdb(1000),
36        }
37    }
38
39    pub fn connect(&mut self) -> bool {
40        info!("Configuration to connect to Clickhouse: {}", self.addr);
41        match Url::parse(self.addr.as_ref()) {
42            Ok(url) => {
43                let host = url.host_str().unwrap_or("127.0.0.1");
44                let port = url.port().unwrap_or(9000);
45                let user = url.username();
46                let pass = url.password().unwrap_or("123");
47                let url = format!("tcp://{}:{}@{}:{}/", user, pass, host, port);
48                info!("Trying to connect to Clickhouse, host: {}, port: {}, user: {}, password: {}", host, port, user, pass);
49                info!("Connection url: {}", url);
50                let pool = Pool::new(url);
51                self.client = Some(pool);
52                self.is_ready = true;
53            },
54            Err(e) => {
55                error!("Invalid connection url, err={:?}", e);
56                self.is_ready = false;
57            },
58        }
59        self.is_ready
60    }
61
62    pub fn select(&mut self, req: FTQuery, op_auth: OptAuthorize) -> QueryResult {
63        if !self.is_ready {
64            self.connect();
65        }
66
67        let start = Instant::now();
68        let mut res = QueryResult::default();
69
70        if let Some(c) = &self.client {
71            if let Err(e) = block_on(select_from_clickhouse(req, c, op_auth, &mut res, &mut self.az)) {
72                error!("fail read from clickhouse: {:?}", e);
73                res.result_code = ResultCode::InternalServerError
74            }
75        }
76
77        res.total_time = start.elapsed().as_millis() as i64;
78        res.query_time = res.total_time - res.authorize_time;
79        debug!("result={:?}", res);
80
81        res
82    }
83
84    pub async fn select_async(&mut self, req: FTQuery, op_auth: OptAuthorize) -> Result<QueryResult, Error> {
85        let start = Instant::now();
86        let mut res = QueryResult::default();
87
88        if let Some(c) = &self.client {
89            select_from_clickhouse(req, c, op_auth, &mut res, &mut self.az).await?;
90        }
91        res.total_time = start.elapsed().as_millis() as i64;
92        res.query_time = res.total_time - res.authorize_time;
93        debug!("result={:?}", res);
94
95        Ok(res)
96    }
97
98    pub async fn query_select_async(
99        &mut self,
100        user_uri: &str,
101        query: &str,
102        res_format: ResultFormat,
103        authorization_level: AuthorizationLevel,
104        az: &Mutex<AzContext>,
105    ) -> Result<Value, Error> {
106        let mut jres = Value::default();
107        if let Some(pool) = &self.client {
108            let mut client = pool.get_handle().await?;
109            let block = client.query(query).fetch_all().await?;
110
111            //warn!("@ block={:?}", block);
112
113            let mut excluded_rows = HashSet::new();
114
115            if res_format == ResultFormat::Cols {
116                for col in block.columns() {
117                    let mut jrow = Value::Array(vec![]);
118                    let mut row_count = 0;
119                    for row in block.rows() {
120                        if !col_to_json(&row, col, &mut jrow, user_uri, &res_format, &authorization_level, az).await? {
121                            if authorization_level == AuthorizationLevel::RowColumn {
122                                excluded_rows.insert(row_count);
123                            }
124                        }
125                        row_count += 1;
126                    }
127                    jres[col.name().to_owned()] = jrow;
128                }
129            } else {
130                let mut v_cols = vec![];
131                for col in block.columns() {
132                    v_cols.push(Value::String(col.name().to_owned()));
133                }
134                jres["cols"] = Value::Array(v_cols);
135                let mut jrows = vec![];
136                for row in block.rows() {
137                    let mut skip_row = false;
138                    let mut jrow = if res_format == ResultFormat::Full {
139                        Value::from(serde_json::Map::new())
140                    } else {
141                        Value::Array(vec![])
142                    };
143                    for col in block.columns() {
144                        //println!("{} {}", col.name(), col.sql_type());
145                        if !col_to_json(&row, col, &mut jrow, user_uri, &res_format, &authorization_level, az).await? {
146                            skip_row = true;
147                            break;
148                        }
149                    }
150                    if !skip_row {
151                        jrows.push(jrow);
152                    }
153                }
154
155                jres["rows"] = Value::Array(jrows);
156            }
157
158            if res_format == ResultFormat::Cols && authorization_level == AuthorizationLevel::RowColumn {
159                for (_col_name, col_values) in jres.as_object_mut().unwrap().iter_mut() {
160                    if let Value::Array(ref mut rows) = col_values {
161                        let mut i = 0;
162                        rows.retain(|_| {
163                            let retain = !excluded_rows.contains(&i);
164                            i += 1;
165                            retain
166                        });
167                    }
168                }
169            }
170        }
171
172        //println!("{}", res);
173        Ok(jres)
174    }
175}
176
177async fn cltjs<'a, K: v_clickhouse_rs::types::ColumnType, T: FromSql<'a> + serde::Serialize>(
178    row: &'a Row<'_, K>,
179    col: &'a Column<K>,
180    jrow: &mut Value,
181    user_uri: &str,
182    res_format: &ResultFormat,
183    authorization_level: &AuthorizationLevel,
184    az: &Mutex<AzContext>,
185) -> Result<bool, Error> {
186    let v: T = row.get(col.name())?;
187    let jv = json!(v);
188
189    async fn check_authorization(
190        jv: &Value,
191        jrow: &mut Value,
192        col_name: &str,
193        user_uri: &str,
194        res_format: &ResultFormat,
195        authorization_level: &AuthorizationLevel,
196        az: &Mutex<AzContext>,
197    ) -> Result<bool, Error> {
198        match jv {
199            Value::String(vc) => {
200                let authorized = process_authorization(vc, user_uri, authorization_level, az).await?;
201                if authorized {
202                    insert_value(jrow, col_name, jv.clone());
203                } else {
204                    match authorization_level {
205                        AuthorizationLevel::Cell => insert_value(jrow, col_name, json!("d:NotAuthorized")),
206                        _ => {
207                            if res_format == &ResultFormat::Cols {
208                                insert_value(jrow, col_name, json!("d:NotAuthorized"))
209                            }
210                            return Ok(false);
211                        },
212                    }
213                }
214                Ok(true)
215            },
216            Value::Array(array) => {
217                let mut new_array = Vec::new();
218                for item in array {
219                    match item {
220                        Value::String(vc) => {
221                            let authorized = process_authorization(vc, user_uri, authorization_level, az).await?;
222                            if authorized {
223                                new_array.push(json!(vc));
224                            } else {
225                                match authorization_level {
226                                    AuthorizationLevel::Cell => new_array.push(json!("v-s:NotAuthorized")),
227                                    _ => {
228                                        if res_format == &ResultFormat::Cols {
229                                            new_array.push(json!("v-s:NotAuthorized"))
230                                        }
231                                        return Ok(false);
232                                    },
233                                }
234                            }
235                        },
236                        _ => new_array.push(item.clone()), // Для не строковых элементов вставка без изменений
237                    }
238                }
239                insert_value(jrow, col_name, Value::Array(new_array));
240                Ok(true)
241            },
242            _ => {
243                insert_value(jrow, col_name, jv.clone());
244                Ok(true)
245            },
246        }
247    }
248
249    async fn process_authorization(vc: &str, user_uri: &str, authorization_level: &AuthorizationLevel, az: &Mutex<AzContext>) -> Result<bool, Error> {
250        if (authorization_level == &AuthorizationLevel::Cell || authorization_level == &AuthorizationLevel::RowColumn) && is_identifier(vc) {
251            let mut az_lock = az.lock().await;
252            let authorized = az_lock.authorize(vc, user_uri, Access::CanRead as u8, false)?;
253            Ok(authorized == Access::CanRead as u8)
254        } else {
255            // Если значение не является идентификатором, считаем, что авторизация не требуется
256            Ok(true)
257        }
258    }
259
260    fn insert_value(jrow: &mut Value, col_name: &str, value: Value) {
261        if let Some(o) = jrow.as_object_mut() {
262            o.insert(col_name.to_owned(), value);
263        } else if let Some(o) = jrow.as_array_mut() {
264            o.push(value);
265        }
266    }
267
268    check_authorization(&jv, jrow, col.name(), user_uri, res_format, authorization_level, az).await
269}
270
271async fn col_to_json<K: v_clickhouse_rs::types::ColumnType>(
272    row: &Row<'_, K>,
273    col: &Column<K>,
274    jrow: &mut Value,
275    user_uri: &str,
276    res_format: &ResultFormat,
277    authorization_level: &AuthorizationLevel,
278    az: &Mutex<AzContext>,
279) -> Result<bool, v_clickhouse_rs::errors::Error> {
280    let mut res = true;
281    let sql_type = col.sql_type();
282    match sql_type {
283        SqlType::UInt8 => {
284            res = cltjs::<K, u8>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
285        },
286        SqlType::UInt16 => {
287            res = cltjs::<K, u16>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
288        },
289        SqlType::UInt32 => {
290            res = cltjs::<K, u32>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
291        },
292        SqlType::UInt64 => {
293            res = cltjs::<K, u64>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
294        },
295        SqlType::Int8 => {
296            res = cltjs::<K, i8>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
297        },
298        SqlType::Int16 => {
299            res = cltjs::<K, i16>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
300        },
301        SqlType::Int32 => {
302            res = cltjs::<K, i32>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
303        },
304        SqlType::Int64 => {
305            res = cltjs::<K, i64>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
306        },
307        SqlType::String => {
308            res = cltjs::<K, String>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
309        },
310        SqlType::FixedString(_) => {
311            res = cltjs::<K, String>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
312        },
313        SqlType::Float32 => {
314            res = cltjs::<K, f32>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
315        },
316        SqlType::Float64 => {
317            res = cltjs::<K, f64>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
318        },
319        SqlType::Date => {
320            let v: DateTime<Tz> = row.get(col.name())?;
321            if let Some(o) = jrow.as_object_mut() {
322                o.insert(col.name().to_owned(), json!(v.date_naive().to_string()));
323            } else if let Some(o) = jrow.as_array_mut() {
324                o.push(json!(v.date_naive().to_string()));
325            }
326        },
327        SqlType::DateTime(_) => {
328            let v: DateTime<Tz> = row.get(col.name())?;
329            if let Some(o) = jrow.as_object_mut() {
330                o.insert(col.name().to_owned(), json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
331            } else if let Some(o) = jrow.as_array_mut() {
332                o.push(json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
333            }
334        },
335        SqlType::Decimal(_, _) => {
336            let v: f64 = row.get(col.name())?;
337            if let Some(o) = jrow.as_object_mut() {
338                o.insert(col.name().to_owned(), json!(v));
339            } else if let Some(o) = jrow.as_array_mut() {
340                o.push(json!(v));
341            }
342        },
343        SqlType::Array(ref stype) => match stype {
344            SqlType::UInt8 => {
345                res = cltjs::<K, Vec<u8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
346            },
347            SqlType::UInt16 => {
348                res = cltjs::<K, Vec<u16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
349            },
350            SqlType::UInt32 => {
351                res = cltjs::<K, Vec<u32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
352            },
353            SqlType::UInt64 => {
354                res = cltjs::<K, Vec<u64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
355            },
356            SqlType::Int8 => {
357                res = cltjs::<K, Vec<i8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
358            },
359            SqlType::Int16 => {
360                res = cltjs::<K, Vec<i16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
361            },
362            SqlType::Int32 => {
363                res = cltjs::<K, Vec<i32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
364            },
365            SqlType::Int64 => {
366                res = cltjs::<K, Vec<i64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
367            },
368            SqlType::String => {
369                res = cltjs::<K, Vec<String>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
370            },
371            SqlType::FixedString(_) => {
372                res = cltjs::<K, Vec<String>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
373            },
374            SqlType::Float32 => {
375                res = cltjs::<K, Vec<f32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
376            },
377            SqlType::Float64 => {
378                res = cltjs::<K, Vec<f64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
379            },
380            SqlType::Date => {
381                let v: Vec<DateTime<Tz>> = row.get(col.name())?;
382                let mut a = vec![];
383                for ev in v {
384                    a.push(json!(ev.date_naive().to_string()));
385                }
386                if let Some(o) = jrow.as_object_mut() {
387                    o.insert(col.name().to_owned(), json!(a));
388                } else if let Some(o) = jrow.as_array_mut() {
389                    o.push(json!(a));
390                }
391            },
392            SqlType::DateTime(_) => {
393                let v: Vec<DateTime<Tz>> = row.get(col.name())?;
394                let mut a = vec![];
395                for ev in v {
396                    a.push(json!(ev.to_rfc3339_opts(SecondsFormat::Millis, false)));
397                }
398                if let Some(o) = jrow.as_object_mut() {
399                    o.insert(col.name().to_owned(), json!(a));
400                } else if let Some(o) = jrow.as_array_mut() {
401                    o.push(json!(a));
402                }
403            },
404            SqlType::Decimal(_, _) => {
405                let v: Vec<f64> = row.get(col.name())?;
406                let mut a = vec![];
407                for ev in v {
408                    a.push(json!(ev));
409                }
410                if let Some(o) = jrow.as_object_mut() {
411                    o.insert(col.name().to_owned(), json!(a));
412                } else if let Some(o) = jrow.as_array_mut() {
413                    o.push(json!(a));
414                }
415            },
416            _ => {
417                println!("unknown array type {:?}", stype);
418            },
419        },
420        SqlType::Nullable(ref inner_type) => match inner_type {
421            SqlType::UInt8 => {
422                res = cltjs::<K, Option<u8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
423            },
424            SqlType::UInt16 => {
425                res = cltjs::<K, Option<u16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
426            },
427            SqlType::UInt32 => {
428                res = cltjs::<K, Option<u32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
429            },
430            SqlType::UInt64 => {
431                res = cltjs::<K, Option<u64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
432            },
433            SqlType::Int8 => {
434                res = cltjs::<K, Option<i8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
435            },
436            SqlType::Int16 => {
437                res = cltjs::<K, Option<i16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
438            },
439            SqlType::Int32 => {
440                res = cltjs::<K, Option<i32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
441            },
442            SqlType::Int64 => {
443                res = cltjs::<K, Option<i64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
444            },
445            SqlType::Float32 => {
446                res = cltjs::<K, Option<f32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
447            },
448            SqlType::Float64 => {
449                res = cltjs::<K, Option<f64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
450            },
451            SqlType::String => {
452                res = cltjs::<K, Option<String>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
453            },
454            SqlType::Date => {
455                let v: DateTime<Tz> = row.get(col.name())?;
456                if let Some(o) = jrow.as_object_mut() {
457                    o.insert(col.name().to_owned(), json!(v.date_naive().to_string()));
458                } else if let Some(o) = jrow.as_array_mut() {
459                    o.push(json!(v.date_naive().to_string()));
460                }
461            },
462            SqlType::DateTime(_) => {
463                let v: DateTime<Tz> = row.get(col.name())?;
464                if let Some(o) = jrow.as_object_mut() {
465                    o.insert(col.name().to_owned(), json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
466                } else if let Some(o) = jrow.as_array_mut() {
467                    o.push(json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
468                }
469            },
470            _ => {
471                println!("unknown nullable type {:?}", inner_type);
472            },
473        },
474        _ => {
475            println!("unknown type {:?}", col.sql_type());
476        },
477    }
478    Ok(res)
479}
480
481async fn select_from_clickhouse(req: FTQuery, pool: &Pool, op_auth: OptAuthorize, out_res: &mut QueryResult, az: &mut AzContext) -> Result<(), Error> {
482    let mut authorized_count = 0;
483    let mut total_count = 0;
484
485    if req
486        .query
487        .to_uppercase()
488        .split([':', '-', ' ', '(', ')', '<', '<', '=', ','].as_ref())
489        .any(|x| x.trim() == "INSERT" || x.trim() == "UPDATE" || x.trim() == "DROP" || x.trim() == "DELETE" || x.trim() == "ALTER" || x.trim() == "EXEC")
490    {
491        out_res.result_code = ResultCode::BadRequest;
492        return Ok(());
493    }
494
495    let fq = if req.limit > 0 {
496        format!("{} LIMIT {} OFFSET {}", req.query, req.limit, req.from)
497    } else {
498        format!("{} OFFSET {}", req.query, req.from)
499    };
500
501    debug!("query={}", fq);
502
503    let mut client = pool.get_handle().await?;
504    let block = client.query(fq).fetch_all().await?;
505    for row in block.rows() {
506        total_count += 1;
507
508        let id: String = row.get(row.name(0)?)?;
509
510        if op_auth == OptAuthorize::YES {
511            let start = Instant::now();
512
513            match az.authorize(&id, &req.user, Access::CanRead as u8, false) {
514                Ok(res) => {
515                    if res == Access::CanRead as u8 {
516                        out_res.result.push(id);
517                        authorized_count += 1;
518
519                        if authorized_count >= req.top {
520                            break;
521                        }
522                    }
523                },
524                Err(e) => error!("fail authorization {}, err={}", req.user, e),
525            }
526            out_res.authorize_time += start.elapsed().as_micros() as i64;
527        } else {
528            out_res.result.push(id);
529        }
530
531        if req.limit > 0 && total_count >= req.limit {
532            break;
533        }
534    }
535
536    out_res.result_code = ResultCode::Ok;
537    out_res.estimated = (req.from + block.row_count() as i32) as i64;
538    out_res.count = authorized_count as i64;
539    out_res.processed = total_count as i64;
540    out_res.cursor = (req.from + total_count) as i64;
541    out_res.authorize_time /= 1000;
542
543    Ok(())
544}