// v_common/search/clickhouse_client.rs

1use v_authorization_lmdb_impl::LmdbAzContext;
2use crate::search::common::{is_identifier, AuthorizationLevel, FTQuery, QueryResult, ResultFormat};
3use crate::storage::async_storage::AuthorizationProvider;
4use crate::v_authorization::common::AuthorizationContext;
5use crate::v_api::common_type::ResultCode;
6use crate::v_api::common_type::OptAuthorize;
7use chrono::prelude::*;
8use chrono::DateTime;
9use chrono_tz::Tz;
10use futures::executor::block_on;
11use serde_json::json;
12use serde_json::Value;
13use std::collections::HashSet;
14use std::time::*;
15use url::Url;
16use v_authorization::common::Access;
17use v_clickhouse_rs::errors::Error;
18use v_clickhouse_rs::types::{Column, SqlType};
19use v_clickhouse_rs::types::{FromSql, Row};
20use v_clickhouse_rs::Pool;
21
22pub struct CHClient {
23    client: Option<Pool>,
24    addr: String,
25    is_ready: bool,
26    az: LmdbAzContext,
27}
28
29impl CHClient {
30    pub fn new(client_addr: String) -> CHClient {
31        CHClient {
32            client: None,
33            addr: client_addr,
34            is_ready: false,
35            az: LmdbAzContext::new(1000),
36        }
37    }
38
39    pub fn connect(&mut self) -> bool {
40        info!("Configuration to connect to Clickhouse: {}", self.addr);
41        match Url::parse(self.addr.as_ref()) {
42            Ok(url) => {
43                let host = url.host_str().unwrap_or("127.0.0.1");
44                let port = url.port().unwrap_or(9000);
45                let user = url.username();
46                let pass = url.password().unwrap_or("123");
47                let url = format!("tcp://{}:{}@{}:{}/", user, pass, host, port);
48                info!("Trying to connect to Clickhouse, host: {}, port: {}, user: {}, password: {}", host, port, user, pass);
49                info!("Connection url: {}", url);
50                let pool = Pool::new(url);
51                self.client = Some(pool);
52                self.is_ready = true;
53            },
54            Err(e) => {
55                error!("Invalid connection url, err={:?}", e);
56                self.is_ready = false;
57            },
58        }
59        self.is_ready
60    }
61
62    pub fn select(&mut self, req: FTQuery, op_auth: OptAuthorize) -> QueryResult {
63        if !self.is_ready {
64            self.connect();
65        }
66
67        let start = Instant::now();
68        let mut res = QueryResult::default();
69
70        if let Some(c) = &self.client {
71            if let Err(e) = block_on(select_from_clickhouse(req, c, op_auth, &mut res, &mut self.az)) {
72                error!("fail read from clickhouse: {:?}", e);
73                res.result_code = ResultCode::InternalServerError
74            }
75        }
76
77        res.total_time = start.elapsed().as_millis() as i64;
78        res.query_time = res.total_time - res.authorize_time;
79        debug!("result={:?}", res);
80
81        res
82    }
83
84    pub async fn select_async(&mut self, req: FTQuery, op_auth: OptAuthorize) -> Result<QueryResult, Error> {
85        let start = Instant::now();
86        let mut res = QueryResult::default();
87
88        if let Some(c) = &self.client {
89            select_from_clickhouse(req, c, op_auth, &mut res, &mut self.az).await?;
90        }
91        res.total_time = start.elapsed().as_millis() as i64;
92        res.query_time = res.total_time - res.authorize_time;
93        debug!("result={:?}", res);
94
95        Ok(res)
96    }
97
98    pub async fn query_select_async(
99        &mut self,
100        user_uri: &str,
101        query: &str,
102        res_format: ResultFormat,
103        authorization_level: AuthorizationLevel,
104        az: &AuthorizationProvider,
105    ) -> Result<Value, Error> {
106        let mut jres = Value::default();
107        if let Some(pool) = &self.client {
108            let mut client = pool.get_handle().await?;
109            let block = client.query(query).fetch_all().await?;
110
111            //warn!("@ block={:?}", block);
112
113            let mut excluded_rows = HashSet::new();
114
115            if res_format == ResultFormat::Cols {
116                for col in block.columns() {
117                    let mut jrow = Value::Array(vec![]);
118                    let mut row_count = 0;
119                    for row in block.rows() {
120                        if !col_to_json(&row, col, &mut jrow, user_uri, &res_format, &authorization_level, az).await? {
121                            if authorization_level == AuthorizationLevel::RowColumn {
122                                excluded_rows.insert(row_count);
123                            }
124                        }
125                        row_count += 1;
126                    }
127                    jres[col.name().to_owned()] = jrow;
128                }
129            } else {
130                let mut v_cols = vec![];
131                for col in block.columns() {
132                    v_cols.push(Value::String(col.name().to_owned()));
133                }
134                jres["cols"] = Value::Array(v_cols);
135                let mut jrows = vec![];
136                for row in block.rows() {
137                    let mut skip_row = false;
138                    let mut jrow = if res_format == ResultFormat::Full {
139                        Value::from(serde_json::Map::new())
140                    } else {
141                        Value::Array(vec![])
142                    };
143                    for col in block.columns() {
144                        //println!("{} {}", col.name(), col.sql_type());
145                        if !col_to_json(&row, col, &mut jrow, user_uri, &res_format, &authorization_level, az).await? {
146                            skip_row = true;
147                            break;
148                        }
149                    }
150                    if !skip_row {
151                        jrows.push(jrow);
152                    }
153                }
154
155                jres["rows"] = Value::Array(jrows);
156            }
157
158            if res_format == ResultFormat::Cols && authorization_level == AuthorizationLevel::RowColumn {
159                for (_col_name, col_values) in jres.as_object_mut().unwrap().iter_mut() {
160                    if let Value::Array(ref mut rows) = col_values {
161                        let mut i = 0;
162                        rows.retain(|_| {
163                            let retain = !excluded_rows.contains(&i);
164                            i += 1;
165                            retain
166                        });
167                    }
168                }
169            }
170        }
171
172        //println!("{}", res);
173        Ok(jres)
174    }
175}
176
177async fn cltjs<'a, K: v_clickhouse_rs::types::ColumnType, T: FromSql<'a> + serde::Serialize>(
178    row: &'a Row<'_, K>,
179    col: &'a Column<K>,
180    jrow: &mut Value,
181    user_uri: &str,
182    res_format: &ResultFormat,
183    authorization_level: &AuthorizationLevel,
184    az: &AuthorizationProvider,
185) -> Result<bool, Error> {
186    let v: T = row.get(col.name())?;
187    let jv = json!(v);
188
189    async fn check_authorization(
190        jv: &Value,
191        jrow: &mut Value,
192        col_name: &str,
193        user_uri: &str,
194        res_format: &ResultFormat,
195        authorization_level: &AuthorizationLevel,
196        az: &AuthorizationProvider,
197    ) -> Result<bool, Error> {
198        match jv {
199            Value::String(vc) => {
200                let authorized = process_authorization(vc, user_uri, authorization_level, az).await?;
201                if authorized {
202                    insert_value(jrow, col_name, jv.clone());
203                } else {
204                    match authorization_level {
205                        AuthorizationLevel::Cell => insert_value(jrow, col_name, json!("d:NotAuthorized")),
206                        _ => {
207                            if res_format == &ResultFormat::Cols {
208                                insert_value(jrow, col_name, json!("d:NotAuthorized"))
209                            }
210                            return Ok(false);
211                        },
212                    }
213                }
214                Ok(true)
215            },
216            Value::Array(array) => {
217                let mut new_array = Vec::new();
218                for item in array {
219                    match item {
220                        Value::String(vc) => {
221                            let authorized = process_authorization(vc, user_uri, authorization_level, az).await?;
222                            if authorized {
223                                new_array.push(json!(vc));
224                            } else {
225                                match authorization_level {
226                                    AuthorizationLevel::Cell => new_array.push(json!("v-s:NotAuthorized")),
227                                    _ => {
228                                        if res_format == &ResultFormat::Cols {
229                                            new_array.push(json!("v-s:NotAuthorized"))
230                                        }
231                                        return Ok(false);
232                                    },
233                                }
234                            }
235                        },
236                        _ => new_array.push(item.clone()), // Non-string elements are inserted without changes
237                    }
238                }
239                insert_value(jrow, col_name, Value::Array(new_array));
240                Ok(true)
241            },
242            _ => {
243                insert_value(jrow, col_name, jv.clone());
244                Ok(true)
245            },
246        }
247    }
248
249    async fn process_authorization(vc: &str, user_uri: &str, authorization_level: &AuthorizationLevel, az: &AuthorizationProvider) -> Result<bool, Error> {
250        if (authorization_level == &AuthorizationLevel::Cell || authorization_level == &AuthorizationLevel::RowColumn) && is_identifier(vc) {
251            let authorized = az.authorize(vc, user_uri, Access::CanRead as u8, false).await?;
252            Ok(authorized == Access::CanRead as u8)
253        } else {
254            // If value is not an identifier, authorization is not required
255            Ok(true)
256        }
257    }
258
259    fn insert_value(jrow: &mut Value, col_name: &str, value: Value) {
260        if let Some(o) = jrow.as_object_mut() {
261            o.insert(col_name.to_owned(), value);
262        } else if let Some(o) = jrow.as_array_mut() {
263            o.push(value);
264        }
265    }
266
267    check_authorization(&jv, jrow, col.name(), user_uri, res_format, authorization_level, az).await
268}
269
270async fn col_to_json<K: v_clickhouse_rs::types::ColumnType>(
271    row: &Row<'_, K>,
272    col: &Column<K>,
273    jrow: &mut Value,
274    user_uri: &str,
275    res_format: &ResultFormat,
276    authorization_level: &AuthorizationLevel,
277    az: &AuthorizationProvider,
278) -> Result<bool, v_clickhouse_rs::errors::Error> {
279    let mut res = true;
280    let sql_type = col.sql_type();
281    match sql_type {
282        SqlType::UInt8 => {
283            res = cltjs::<K, u8>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
284        },
285        SqlType::UInt16 => {
286            res = cltjs::<K, u16>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
287        },
288        SqlType::UInt32 => {
289            res = cltjs::<K, u32>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
290        },
291        SqlType::UInt64 => {
292            res = cltjs::<K, u64>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
293        },
294        SqlType::Int8 => {
295            res = cltjs::<K, i8>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
296        },
297        SqlType::Int16 => {
298            res = cltjs::<K, i16>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
299        },
300        SqlType::Int32 => {
301            res = cltjs::<K, i32>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
302        },
303        SqlType::Int64 => {
304            res = cltjs::<K, i64>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
305        },
306        SqlType::String => {
307            res = cltjs::<K, String>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
308        },
309        SqlType::FixedString(_) => {
310            res = cltjs::<K, String>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
311        },
312        SqlType::Float32 => {
313            res = cltjs::<K, f32>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
314        },
315        SqlType::Float64 => {
316            res = cltjs::<K, f64>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
317        },
318        SqlType::Date => {
319            let v: DateTime<Tz> = row.get(col.name())?;
320            if let Some(o) = jrow.as_object_mut() {
321                o.insert(col.name().to_owned(), json!(v.date_naive().to_string()));
322            } else if let Some(o) = jrow.as_array_mut() {
323                o.push(json!(v.date_naive().to_string()));
324            }
325        },
326        SqlType::DateTime(_) => {
327            let v: DateTime<Tz> = row.get(col.name())?;
328            if let Some(o) = jrow.as_object_mut() {
329                o.insert(col.name().to_owned(), json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
330            } else if let Some(o) = jrow.as_array_mut() {
331                o.push(json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
332            }
333        },
334        SqlType::Decimal(_, _) => {
335            let v: f64 = row.get(col.name())?;
336            if let Some(o) = jrow.as_object_mut() {
337                o.insert(col.name().to_owned(), json!(v));
338            } else if let Some(o) = jrow.as_array_mut() {
339                o.push(json!(v));
340            }
341        },
342        SqlType::Array(ref stype) => match stype {
343            SqlType::UInt8 => {
344                res = cltjs::<K, Vec<u8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
345            },
346            SqlType::UInt16 => {
347                res = cltjs::<K, Vec<u16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
348            },
349            SqlType::UInt32 => {
350                res = cltjs::<K, Vec<u32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
351            },
352            SqlType::UInt64 => {
353                res = cltjs::<K, Vec<u64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
354            },
355            SqlType::Int8 => {
356                res = cltjs::<K, Vec<i8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
357            },
358            SqlType::Int16 => {
359                res = cltjs::<K, Vec<i16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
360            },
361            SqlType::Int32 => {
362                res = cltjs::<K, Vec<i32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
363            },
364            SqlType::Int64 => {
365                res = cltjs::<K, Vec<i64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
366            },
367            SqlType::String => {
368                res = cltjs::<K, Vec<String>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
369            },
370            SqlType::FixedString(_) => {
371                res = cltjs::<K, Vec<String>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
372            },
373            SqlType::Float32 => {
374                res = cltjs::<K, Vec<f32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
375            },
376            SqlType::Float64 => {
377                res = cltjs::<K, Vec<f64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
378            },
379            SqlType::Date => {
380                let v: Vec<DateTime<Tz>> = row.get(col.name())?;
381                let mut a = vec![];
382                for ev in v {
383                    a.push(json!(ev.date_naive().to_string()));
384                }
385                if let Some(o) = jrow.as_object_mut() {
386                    o.insert(col.name().to_owned(), json!(a));
387                } else if let Some(o) = jrow.as_array_mut() {
388                    o.push(json!(a));
389                }
390            },
391            SqlType::DateTime(_) => {
392                let v: Vec<DateTime<Tz>> = row.get(col.name())?;
393                let mut a = vec![];
394                for ev in v {
395                    a.push(json!(ev.to_rfc3339_opts(SecondsFormat::Millis, false)));
396                }
397                if let Some(o) = jrow.as_object_mut() {
398                    o.insert(col.name().to_owned(), json!(a));
399                } else if let Some(o) = jrow.as_array_mut() {
400                    o.push(json!(a));
401                }
402            },
403            SqlType::Decimal(_, _) => {
404                let v: Vec<f64> = row.get(col.name())?;
405                let mut a = vec![];
406                for ev in v {
407                    a.push(json!(ev));
408                }
409                if let Some(o) = jrow.as_object_mut() {
410                    o.insert(col.name().to_owned(), json!(a));
411                } else if let Some(o) = jrow.as_array_mut() {
412                    o.push(json!(a));
413                }
414            },
415            _ => {
416                println!("unknown array type {:?}", stype);
417            },
418        },
419        SqlType::Nullable(ref inner_type) => match inner_type {
420            SqlType::UInt8 => {
421                res = cltjs::<K, Option<u8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
422            },
423            SqlType::UInt16 => {
424                res = cltjs::<K, Option<u16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
425            },
426            SqlType::UInt32 => {
427                res = cltjs::<K, Option<u32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
428            },
429            SqlType::UInt64 => {
430                res = cltjs::<K, Option<u64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
431            },
432            SqlType::Int8 => {
433                res = cltjs::<K, Option<i8>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
434            },
435            SqlType::Int16 => {
436                res = cltjs::<K, Option<i16>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
437            },
438            SqlType::Int32 => {
439                res = cltjs::<K, Option<i32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
440            },
441            SqlType::Int64 => {
442                res = cltjs::<K, Option<i64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
443            },
444            SqlType::Float32 => {
445                res = cltjs::<K, Option<f32>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
446            },
447            SqlType::Float64 => {
448                res = cltjs::<K, Option<f64>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
449            },
450            SqlType::String => {
451                res = cltjs::<K, Option<String>>(row, col, jrow, user_uri, res_format, authorization_level, az).await?;
452            },
453            SqlType::Date => {
454                let v: DateTime<Tz> = row.get(col.name())?;
455                if let Some(o) = jrow.as_object_mut() {
456                    o.insert(col.name().to_owned(), json!(v.date_naive().to_string()));
457                } else if let Some(o) = jrow.as_array_mut() {
458                    o.push(json!(v.date_naive().to_string()));
459                }
460            },
461            SqlType::DateTime(_) => {
462                let v: DateTime<Tz> = row.get(col.name())?;
463                if let Some(o) = jrow.as_object_mut() {
464                    o.insert(col.name().to_owned(), json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
465                } else if let Some(o) = jrow.as_array_mut() {
466                    o.push(json!(v.to_rfc3339_opts(SecondsFormat::Millis, false)));
467                }
468            },
469            _ => {
470                println!("unknown nullable type {:?}", inner_type);
471            },
472        },
473        _ => {
474            println!("unknown type {:?}", col.sql_type());
475        },
476    }
477    Ok(res)
478}
479
480async fn select_from_clickhouse(req: FTQuery, pool: &Pool, op_auth: OptAuthorize, out_res: &mut QueryResult, az: &mut LmdbAzContext) -> Result<(), Error> {
481    let mut authorized_count = 0;
482    let mut total_count = 0;
483
484    if req
485        .query
486        .to_uppercase()
487        .split([':', '-', ' ', '(', ')', '<', '<', '=', ','].as_ref())
488        .any(|x| x.trim() == "INSERT" || x.trim() == "UPDATE" || x.trim() == "DROP" || x.trim() == "DELETE" || x.trim() == "ALTER" || x.trim() == "EXEC")
489    {
490        out_res.result_code = ResultCode::BadRequest;
491        return Ok(());
492    }
493
494    let fq = if req.limit > 0 {
495        format!("{} LIMIT {} OFFSET {}", req.query, req.limit, req.from)
496    } else {
497        format!("{} OFFSET {}", req.query, req.from)
498    };
499
500    debug!("query={}", fq);
501
502    let mut client = pool.get_handle().await?;
503    let block = client.query(fq).fetch_all().await?;
504    for row in block.rows() {
505        total_count += 1;
506
507        let id: String = row.get(row.name(0)?)?;
508
509        if op_auth == OptAuthorize::YES {
510            let start = Instant::now();
511
512            match az.authorize(&id, &req.user, Access::CanRead as u8, false) {
513                Ok(res) => {
514                    if res == Access::CanRead as u8 {
515                        out_res.result.push(id);
516                        authorized_count += 1;
517
518                        if authorized_count >= req.top {
519                            break;
520                        }
521                    }
522                },
523                Err(e) => error!("fail authorization {}, err={}", req.user, e),
524            }
525            out_res.authorize_time += start.elapsed().as_micros() as i64;
526        } else {
527            out_res.result.push(id);
528        }
529
530        if req.limit > 0 && total_count >= req.limit {
531            break;
532        }
533    }
534
535    out_res.result_code = ResultCode::Ok;
536    out_res.estimated = (req.from + block.row_count() as i32) as i64;
537    out_res.count = authorized_count as i64;
538    out_res.processed = total_count as i64;
539    out_res.cursor = (req.from + total_count) as i64;
540    out_res.authorize_time /= 1000;
541
542    Ok(())
543}