1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use crate::common::QueryResult;
use clickhouse_rs::errors::Error;
use clickhouse_rs::Pool;
use futures::executor::block_on;
use std::str;
use std::time::*;
use url::Url;
use v_api::app::{OptAuthorize, ResultCode};
use v_authorization::common::Access;
use v_az_lmdb::_authorize;

/// Client wrapper around a `clickhouse_rs` connection [`Pool`].
///
/// Built by `CHClient::new` from an address string; `CHClient::connect` parses
/// that address and replaces the pool, and `CHClient::select` runs queries
/// through it.
pub struct CHClient {
    /// Pool handed to the query routine by `select`. Created from an empty
    /// address in `new` and replaced in `connect`.
    client: Pool,
    /// Address string supplied to `new`; parsed as a URL in `connect`.
    addr: String,
    /// Set to `true` by `connect` when the address parsed successfully,
    /// `false` otherwise (and initially).
    is_ready: bool,
}

impl CHClient {
    /// Creates a client for `client_addr`.
    ///
    /// The pool is built from an empty address and `is_ready` is `false`;
    /// call [`CHClient::connect`] to build the pool from `client_addr`.
    pub fn new(client_addr: String) -> CHClient {
        CHClient {
            client: Pool::new(String::new()),
            addr: client_addr,
            is_ready: false,
        }
    }

    /// Parses the configured address and builds the connection pool from it.
    ///
    /// Missing URL parts fall back to defaults: host `127.0.0.1`, port `9000`,
    /// password `123`. The user name is taken from the URL as is (possibly
    /// empty).
    ///
    /// Returns `true` when the address parsed and the pool was replaced,
    /// `false` when the address is not a valid URL. No query is issued here.
    ///
    /// Credentials are never written to the log: the password is masked and
    /// the raw configured address (which may embed a password) is not logged.
    pub fn connect(&mut self) -> bool {
        match Url::parse(self.addr.as_ref()) {
            Ok(url) => {
                let host = url.host_str().unwrap_or("127.0.0.1");
                let port = url.port().unwrap_or(9000);
                let user = url.username();
                // NOTE(review): a hard-coded fallback password is kept for
                // backward compatibility; consider requiring it in the config.
                let pass = url.password().unwrap_or("123");

                info!("Trying to connect to Clickhouse, host: {}, port: {}, user: {}", host, port, user);
                info!("Connection url: tcp://{}:***@{}:{}/", user, host, port);

                self.client = Pool::new(format!("tcp://{}:{}@{}:{}/", user, pass, host, port));
                self.is_ready = true;
                true
            }
            Err(e) => {
                error!("Invalid connection url, err={:?}", e);
                self.is_ready = false;
                false
            }
        }
    }

    /// Runs `query` synchronously and returns the collected result.
    ///
    /// Blocks the calling thread until the query routine finishes. `limit` and
    /// `from` are appended to the query as `LIMIT`/`OFFSET`; `top` caps the
    /// number of authorized rows when `op_auth` is `OptAuthorize::YES`, in
    /// which case each row id is checked for read access by `user_uri`.
    ///
    /// On any error from the query routine the error is logged and
    /// `result_code` is set to `ResultCode::InternalServerError`.
    /// `total_time` is the wall time of this call in milliseconds and
    /// `query_time` is `total_time` minus `authorize_time`.
    pub fn select(&mut self, user_uri: &str, query: &str, top: i64, limit: i64, from: i64, op_auth: OptAuthorize) -> QueryResult {
        let start = Instant::now();
        let mut res = QueryResult::default();

        if let Err(e) = block_on(select_from_clickhouse(&mut self.client, user_uri, query, top, limit, from, op_auth, &mut res)) {
            error!("fail read from clickhouse: {:?}", e);
            res.result_code = ResultCode::InternalServerError
        }

        res.total_time = start.elapsed().as_millis() as i64;
        res.query_time = res.total_time - res.authorize_time;
        debug!("result={:?}", res);

        res
    }
}

/// Executes `query` with `LIMIT limit OFFSET from` appended and stores the
/// outcome in `out_res`.
///
/// The first column of every returned row is read as a `String` id. When
/// `op_auth` is `OptAuthorize::YES` an id is kept only if `_authorize` grants
/// `Access::CanRead` to `user_uri`, and scanning stops once `top` ids were
/// kept; otherwise every id is kept. Scanning also stops after `limit` rows.
///
/// A query containing `INSERT`, `UPDATE`, `DROP` or `DELETE` as a
/// whitespace-separated word (case-insensitive) is rejected with
/// `ResultCode::BadRequest` and `Ok(())` is returned without touching the
/// database.
///
/// On success `out_res` receives `result_code = Ok`, `estimated`, `count`
/// (number of authorized ids), `processed` (rows scanned), `cursor` and
/// `authorize_time` in milliseconds.
///
/// # Errors
///
/// Returns the `clickhouse_rs` error when a handle cannot be obtained, the
/// query fails, or the first column of a row cannot be read. In that case
/// `authorize_time` and the other summary fields are left untouched (ids
/// already pushed to `out_res.result` remain).
async fn select_from_clickhouse(
    pool: &mut Pool,
    user_uri: &str,
    query: &str,
    top: i64,
    limit: i64,
    from: i64,
    op_auth: OptAuthorize,
    out_res: &mut QueryResult,
) -> Result<(), Error> {
    let mut authorized_count = 0;
    let mut total_count = 0;
    // Accumulated locally in microseconds and converted once at the end, so an
    // early error return never leaves a microsecond value in a millisecond field.
    let mut authorize_micros: i64 = 0;

    // NOTE(review): this only matches whole whitespace-separated words, so it
    // misses e.g. `;DROP` and other statements (ALTER, TRUNCATE, ...); it is
    // not a substitute for a read-only database account.
    if query.to_uppercase().split_whitespace().any(|x| x == "INSERT" || x == "UPDATE" || x == "DROP" || x == "DELETE") {
        out_res.result_code = ResultCode::BadRequest;
        return Ok(());
    }

    let fq = format!("{} LIMIT {} OFFSET {}", query, limit, from);

    let mut client = pool.get_handle().await?;
    let block = client.query(fq).fetch_all().await?;
    for row in block.rows() {
        total_count += 1;

        let id: String = row.get(row.name(0)?)?;

        if op_auth == OptAuthorize::YES {
            // Time the authorization call on its own so the measurement is
            // recorded even when the `top` limit ends the loop below.
            let start = Instant::now();
            let verdict = _authorize(&id, user_uri, Access::CanRead as u8, true, None);
            authorize_micros += start.elapsed().as_micros() as i64;

            match verdict {
                Ok(res) => {
                    if res == Access::CanRead as u8 {
                        out_res.result.push(id);
                        authorized_count += 1;

                        if authorized_count >= top {
                            break;
                        }
                    }
                }
                Err(e) => error!("fail authorization {}, err={}", user_uri, e),
            }
        } else {
            out_res.result.push(id);
        }

        if total_count >= limit {
            break;
        }
    }

    out_res.result_code = ResultCode::Ok;
    out_res.estimated = from + (block.row_count() as i64);
    // NOTE(review): stays 0 when authorization is disabled even though ids
    // were pushed to `result` — confirm callers expect that.
    out_res.count = authorized_count;
    out_res.processed = total_count;
    out_res.cursor = from + total_count;
    out_res.authorize_time = authorize_micros / 1000;

    Ok(())
}