clickhouse_rs/
lib.rs

1//! ## clickhouse-rs
2//! Tokio based asynchronous [Yandex ClickHouse](https://clickhouse.yandex/) client library for rust programming language.
3//!
4//! ### Installation
5//! Library hosted on [crates.io](https://crates.io/crates/clickhouse-rs/).
6//!
7//! ```toml
8//! [dependencies]
9//! clickhouse-rs = "*"
10//! ```
11//!
12//! ### Supported data types
13//!
14//! * Date
15//! * DateTime
16//! * Decimal(P, S)
17//! * Float32, Float64
18//! * String, FixedString(N)
19//! * UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64
20//! * Nullable(T)
21//! * Array(UInt/Int/String/Date/DateTime)
22//! * IPv4/IPv6
23//! * UUID
24//!
//! ### DSN
26//!
27//! ```url
28//! schema://user:password@host[:port]/database?param1=value1&...&paramN=valueN
29//! ```
30//!
31//! parameters:
32//!
//! - `compression` - Whether or not to use compression (defaults to `none`). Possible choices:
34//!     * `none`
35//!     * `lz4`
36//!
37//! - `readonly` - Restricts permissions for read data, write data and change settings queries. (defaults to `none`). Possible choices:
38//!     * `0` - All queries are allowed.
39//!     * `1` - Only read data queries are allowed.
40//!     * `2` - Read data and change settings queries are allowed.
41//!
42//! - `connection_timeout` - Timeout for connection (defaults to `500 ms`)
43//! - `keepalive` - TCP keep alive timeout in milliseconds.
44//! - `nodelay` - Whether to enable `TCP_NODELAY` (defaults to `true`).
45//!
//! - `pool_min` - Lower bound of opened connections for `Pool` (defaults to `10`).
//! - `pool_max` - Upper bound of opened connections for `Pool` (defaults to `20`).
48//!
49//! - `ping_before_query` - Ping server every time before execute any query. (defaults to `true`).
50//! - `send_retries` - Count of retry to send request to server. (defaults to `3`).
51//! - `retry_timeout` - Amount of time to wait before next retry. (defaults to `5 sec`).
52//! - `ping_timeout` - Timeout for ping (defaults to `500 ms`).
53//!
//! - `alt_hosts` - Comma-separated list of single-address hosts for load balancing.
55//!
56//! - `query_timeout` - Timeout for queries (defaults to `180 sec`).
57//! - `query_block_timeout` - Timeout for each block in a query (defaults to `180 sec`).
58//! - `insert_timeout` - Timeout for inserts (defaults to `180 sec`).
59//! - `execute_timeout` - Timeout for execute (defaults to `180 sec`).
60//!
61//! SSL/TLS parameters:
62//!
//! - `secure` - establish secure connection (default is `false`).
//! - `skip_verify` - skip certificate verification (default is `false`).
65//!
66//! example:
67//! ```url
68//! tcp://user:password@host:9000/clicks?compression=lz4&ping_timeout=42ms
69//! ```
70//!
71//! ### Example
72//!
73//! ```rust
74//! use futures::Future;
75//! use clickhouse_rs::{Pool, types::Block};
76//! # use std::env;
77//!
78//!  let ddl = "
79//!      CREATE TABLE IF NOT EXISTS payment (
80//!          customer_id  UInt32,
81//!          amount       UInt32,
82//!          account_name Nullable(FixedString(3))
83//!      ) Engine=Memory";
84//!
85//!  let block = Block::new()
86//!      .column("customer_id",  vec![1_u32,  3,  5,  7,  9])
87//!      .column("amount",       vec![2_u32,  4,  6,  8, 10])
88//!      .column("account_name", vec![Some("foo"), None, None, None, Some("bar")]);
89//!
90//!  # let database_url = env::var("DATABASE_URL").unwrap_or("tcp://localhost:9000?compression=lz4".into());
91//!  let pool = Pool::new(database_url);
92//!
93//!  let done = pool
94//!     .get_handle()
95//!     .and_then(move |c| c.execute(ddl))
96//!     .and_then(move |c| c.insert("payment", block))
97//!     .and_then(move |c| c.query("SELECT * FROM payment").fetch_all())
98//!     .and_then(move |(_, block)| {
99//!         for row in block.rows() {
100//!             let id: u32     = row.get("customer_id")?;
101//!             let amount: u32 = row.get("amount")?;
102//!             let name: Option<&str>  = row.get("account_name")?;
103//!             println!("Found payment {}: {} {:?}", id, amount, name);
104//!         }
105//!         Ok(())
106//!     })
107//!     .map_err(|err| eprintln!("database error: {}", err));
108//!
109//! tokio::run(done)
110//! ```
111
112#![recursion_limit = "1024"]
113
114extern crate byteorder;
115extern crate chrono;
116extern crate chrono_tz;
117extern crate clickhouse_rs_cityhash_sys;
118extern crate core;
119#[macro_use]
120extern crate futures;
121extern crate hostname;
122#[macro_use]
123extern crate lazy_static;
124#[macro_use]
125extern crate log;
126extern crate lz4;
127#[cfg(test)]
128extern crate rand;
129extern crate tokio;
130extern crate tokio_timer;
131extern crate url;
132
133use std::{fmt, time::Duration};
134
135use futures::{Future, Stream};
136use tokio::prelude::*;
137
138pub use crate::pool::Pool;
139use crate::{
140    connecting_stream::ConnectingStream,
141    errors::{DriverError, Error},
142    io::{BoxFuture, BoxStream, ClickhouseTransport},
143    pool::PoolBinding,
144    retry_guard::RetryGuard,
145    types::{
146        set_exception_handle, Block, Cmd, Complex, Context, Either, IntoOptions, Options,
147        OptionsSource, Packet, Query, QueryResult,
148    },
149};
150
151mod binary;
152mod client_info;
153mod connecting_stream;
154/// Error types.
155pub mod errors;
156mod io;
157/// Pool types.
158pub mod pool;
159mod retry_guard;
160/// Clickhouse types.
161pub mod types;
162
/// This macro is a convenient way to pass a row into a block.
///
/// Entries may be written as `name: value`, as a bare identifier `name`
/// (shorthand for `name: name`), or as `key_expr => value`.
///
/// ```rust
/// # use clickhouse_rs::{types::Block, row, errors::Error};
/// # fn make_block() -> Result<(), Error> {
///       let mut block = Block::new();
///       block.push(row!{customer_id: 1, amount: 2, account_name: "foo"})?;
///       block.push(row!{customer_id: 4, amount: 4, account_name: "bar"})?;
///       block.push(row!{customer_id: 5, amount: 5, account_name: "baz"})?;
/// #     assert_eq!(block.row_count(), 3);
/// #     Ok(())
/// # }
/// # make_block().unwrap()
/// ```
///
/// You can also use a `Vec<(String, Value)>` to construct a row to insert into a block:
///
/// ```rust
/// # use clickhouse_rs::{types::Block, errors::Error, types::Value};
/// # fn make_block() -> Result<(), Error> {
///       let mut block = Block::new();
///       for i in 1..10 {
///           let mut row = Vec::new();
///           for j in 1..10 {
///               row.push((format!("#{}", j), Value::from(i * j)));
///           }
///           block.push(row)?;
///       }
///       assert_eq!(block.row_count(), 9);
/// #     println!("{:?}", block);
/// #     Ok(())
/// # }
/// # make_block().unwrap()
/// ```
// The recursive arms go through `$crate::row!` so that the expansion resolves
// even when the caller invokes the macro by path without importing `row`.
#[macro_export]
macro_rules! row {
    // Empty row.
    () => { $crate::types::RNil };

    // `name, ...` — the identifier is both the column name and the value.
    ( $i:ident, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put(stringify!($i).into(), $i.into())
    };
    ( $i:ident ) => { $crate::row!($i: $i) };

    // `name: value`
    ( $k:ident: $v:expr ) => {
        $crate::types::RNil.put(stringify!($k).into(), $v.into())
    };

    ( $k:ident: $v:expr, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put(stringify!($k).into(), $v.into())
    };

    // `key_expr => value`
    ( $k:expr => $v:expr ) => {
        $crate::types::RNil.put($k.into(), $v.into())
    };

    ( $k:expr => $v:expr, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put($k.into(), $v.into())
    };
}
221
/// Unwraps a `Result`, or makes the enclosing function return early with
/// `Either::Left(future::err(..))` wrapping the error.
///
/// The call site must have `Either` and `future` in scope.
macro_rules! try_opt {
    ($result:expr) => {
        match $result {
            Err(cause) => return Either::Left(future::err(cause)),
            Ok(value) => value,
        }
    };
}
230
// Namespace type for the `connect` / `open` associated functions.
// It is hidden from the generated docs, and the private `_private` field keeps
// it from being built with a struct literal outside this module.
#[doc(hidden)]
pub struct Client {
    _private: (),
}
235
/// Clickhouse client handle.
///
/// Operations such as `ping`, `execute` and `insert` consume the handle and
/// resolve to a handle again once the server has answered.
pub struct ClientHandle {
    // Transport to the server. Each command takes it out (`take().unwrap()`)
    // and a fresh handle is built around the transport carried by the reply.
    inner: Option<ClickhouseTransport>,
    // Holds the options source and the server info learned during the hello
    // handshake.
    context: Context,
    // Binding between this handle and a connection pool, if any.
    pool: PoolBinding,
}
242
243impl fmt::Debug for ClientHandle {
244    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
245        f.debug_struct("ClientHandle")
246            .field("server_info", &self.context.server_info)
247            .finish()
248    }
249}
250
251impl Client {
252    #[deprecated(since = "0.1.4", note = "please use Pool to connect")]
253    pub fn connect(options: Options) -> impl Future<Item = ClientHandle, Error = Error> {
254        Self::open(&options.into_options_src(), None)
255    }
256
257    pub(crate) fn open(
258        source: &OptionsSource,
259        pool: Option<Pool>,
260    ) -> impl Future<Item = ClientHandle, Error = Error> {
261        let options = try_opt!(source.get()).as_ref().to_owned();
262        let compress = options.compression;
263        let timeout = options.connection_timeout;
264
265        let context = Context {
266            options: source.clone(),
267            ..Context::default()
268        };
269
270        Either::Right(
271            future::lazy(move || {
272                let addr = match &pool {
273                    None => &options.addr,
274                    Some(p) => p.get_addr(),
275                };
276
277                info!("try to connect to {}", addr);
278                ConnectingStream::new(addr, &options)
279                    .and_then(move |mut stream| {
280                        stream.set_nodelay(options.nodelay)?;
281                        stream.set_keepalive(options.keepalive)?;
282
283                        let transport = ClickhouseTransport::new(stream, compress, pool);
284                        Ok(ClientHandle {
285                            inner: Some(transport),
286                            context,
287                            pool: PoolBinding::None,
288                        })
289                    })
290                    .map_err(Into::into)
291                    .and_then(ClientHandle::hello)
292                    .timeout(timeout)
293                    .map_err(Error::from)
294            }),
295        )
296    }
297}
298
299impl ClientHandle {
300    fn hello(mut self) -> impl Future<Item = Self, Error = Error> {
301        let context = self.context.clone();
302        let pool = self.pool.clone();
303        debug!("[hello] -> {:?}", &context);
304
305        self.inner
306            .take()
307            .unwrap()
308            .call(Cmd::Hello(context.clone()))
309            .fold(None, move |_, packet| match packet {
310                Packet::Hello(inner, server_info) => {
311                    info!("[hello] <- {:?}", &server_info);
312                    let context = Context {
313                        server_info,
314                        ..context.clone()
315                    };
316                    let client = Self {
317                        inner: Some(inner),
318                        context,
319                        pool: pool.clone(),
320                    };
321                    future::ok::<_, Error>(Some(client))
322                }
323                Packet::Exception(e, _) => future::err::<_, Error>(Error::Server(e)),
324                _ => future::err::<_, Error>(Error::Driver(DriverError::UnexpectedPacket)),
325            })
326            .map(Option::unwrap)
327    }
328
329    pub fn ping(mut self) -> impl Future<Item = Self, Error = Error> {
330        let context = self.context.clone();
331        let timeout = try_opt!(self.context.options.get()).ping_timeout;
332
333        let pool = self.pool.clone();
334        info!("[ping]");
335        let fut = self
336            .inner
337            .take()
338            .unwrap()
339            .call(Cmd::Ping)
340            .fold(None, move |_, packet| match packet {
341                Packet::Pong(inner) => {
342                    let client = Self {
343                        inner: Some(inner),
344                        context: context.clone(),
345                        pool: pool.clone(),
346                    };
347                    info!("[pong]");
348                    future::ok::<_, Error>(Some(client))
349                }
350                Packet::Exception(mut exception, transport) => {
351                    set_exception_handle(&mut exception, transport, context.clone(), pool.clone());
352                    future::err::<_, Error>(Error::Server(exception))
353                }
354                _ => future::err::<_, Error>(Error::Driver(DriverError::UnexpectedPacket)),
355            })
356            .map(Option::unwrap)
357            .timeout(timeout)
358            .map_err(Error::from);
359
360        Either::Right(fut)
361    }
362
363    /// Executes Clickhouse `query` on Conn.
364    pub fn query<Q>(self, sql: Q) -> QueryResult
365    where
366        Query: From<Q>,
367    {
368        let query = Query::from(sql);
369        QueryResult {
370            client: self,
371            query,
372        }
373    }
374
375    /// Fetch data from table. It returns a block that contains all rows.
376    #[deprecated(since = "0.1.7", note = "please use query(sql).fetch_all() instead")]
377    pub fn query_all<Q>(self, sql: Q) -> BoxFuture<(Self, Block<Complex>)>
378    where
379        Query: From<Q>,
380    {
381        self.query(sql).fetch_all()
382    }
383
384    /// Convenience method to prepare and execute a single SQL statement.
385    pub fn execute<Q>(self, sql: Q) -> impl Future<Item = Self, Error = Error>
386    where
387        Query: From<Q>,
388    {
389        let context = self.context.clone();
390        let pool = self.pool.clone();
391        let timeout = try_opt!(context.options.get()).execute_timeout;
392
393        let query = Query::from(sql);
394        let fut = self.wrap_future(move |mut c| {
395            info!("[execute]    {}", query.get_sql());
396
397            let future = c
398                .inner
399                .take()
400                .unwrap()
401                .call(Cmd::SendQuery(query, context.clone()))
402                .fold(None, move |acc, packet| match packet {
403                    Packet::Eof(inner) => {
404                        let client = Self {
405                            inner: Some(inner),
406                            context: context.clone(),
407                            pool: pool.clone(),
408                        };
409                        future::ok::<_, Error>(Some(client))
410                    }
411                    Packet::Block(_) | Packet::ProfileInfo(_) | Packet::Progress(_) => {
412                        future::ok::<_, Error>(acc)
413                    }
414                    Packet::Exception(mut exception, transport) => {
415                        set_exception_handle(
416                            &mut exception,
417                            transport,
418                            context.clone(),
419                            pool.clone(),
420                        );
421                        future::err::<_, Error>(Error::Server(exception))
422                    }
423                    _ => future::err::<_, Error>(Error::Driver(DriverError::UnexpectedPacket)),
424                })
425                .map(Option::unwrap);
426
427            with_timeout(future, timeout)
428        });
429
430        Either::Right(fut)
431    }
432
433    /// Convenience method to insert block of data.
434    pub fn insert<Q>(self, table: Q, block: Block) -> impl Future<Item = Self, Error = Error>
435    where
436        Query: From<Q>,
437    {
438        let mut names: Vec<_> = Vec::with_capacity(block.column_count());
439        for column in block.columns() {
440            names.push(try_opt!(column_name_to_string(column.name())));
441        }
442        let fields = names.join(", ");
443
444        let query = Query::from(table)
445            .map_sql(|table| format!("INSERT INTO {} ({}) VALUES", table, fields));
446
447        let context = self.context.clone();
448        let pool = self.pool.clone();
449        let timeout = try_opt!(context.options.get()).insert_timeout;
450
451        let fut = self.wrap_future(move |mut c| {
452            info!("[insert]     {}", query.get_sql());
453
454            let future = c
455                .inner
456                .take()
457                .unwrap()
458                .call(Cmd::SendQuery(query, context.clone()))
459                .read_block(context.clone(), pool.clone())
460                .and_then(move |(mut c, b)| -> BoxFuture<Self> {
461                    let dst_block = b.unwrap();
462
463                    let casted_block = match block.cast_to(&dst_block) {
464                        Ok(value) => value,
465                        Err(err) => return Box::new(future::err::<Self, Error>(err)),
466                    };
467
468                    let send_cmd = Cmd::Union(
469                        Box::new(Cmd::SendData(casted_block, context.clone())),
470                        Box::new(Cmd::SendData(Block::default(), context.clone())),
471                    );
472
473                    Box::new(
474                        c.inner
475                            .take()
476                            .unwrap()
477                            .call(send_cmd)
478                            .read_block(context, pool)
479                            .map(|(c, _)| c),
480                    )
481                });
482
483            with_timeout(future, timeout)
484        });
485
486        Either::Right(fut)
487    }
488
489    pub(crate) fn wrap_future<T, R, F>(self, f: F) -> impl Future<Item = T, Error = Error>
490    where
491        F: FnOnce(Self) -> R + Send + 'static,
492        R: Future<Item = T, Error = Error> + Send + 'static,
493        T: Send + 'static,
494    {
495        let ping_before_query = try_opt!(self.context.options.get()).ping_before_query;
496
497        let fut = if ping_before_query {
498            Either::Left(self.check_connection().and_then(move |c| Box::new(f(c))))
499        } else {
500            Either::Right(f(self))
501        };
502
503        Either::Right(fut)
504    }
505
506    pub(crate) fn wrap_stream<T, R, F>(self, f: F) -> BoxStream<T>
507    where
508        F: FnOnce(Self) -> R + Send + 'static,
509        R: Stream<Item = T, Error = Error> + Send + 'static,
510        T: Send + 'static,
511    {
512        let ping_before_query = match self.context.options.get() {
513            Ok(val) => val.ping_before_query,
514            Err(err) => return Box::new(stream::once(Err(err))),
515        };
516
517        if ping_before_query {
518            let fut = self
519                .check_connection()
520                .and_then(move |c| future::ok(Box::new(f(c))))
521                .flatten_stream();
522            Box::new(fut)
523        } else {
524            Box::new(f(self))
525        }
526    }
527
528    /// Check connection and try to reconnect if necessary.
529    pub fn check_connection(mut self) -> impl Future<Item = Self, Error = Error> {
530        let pool: Option<Pool> = self.pool.clone().into();
531        self.pool.detach();
532
533        let source = self.context.options.clone();
534
535        let (send_retries, retry_timeout) = match source.get() {
536            Ok(val) => (val.send_retries, val.retry_timeout),
537            Err(err) => return Either::Left(future::err(err)),
538        };
539
540        let reconnect = move || -> BoxFuture<Self> {
541            warn!("[reconnect]");
542            match pool.clone() {
543                None => Box::new(Client::open(&source, None)),
544                Some(p) => Box::new(p.get_handle()),
545            }
546        };
547
548        let fut = RetryGuard::new(
549            self,
550            |c| Box::new(c.ping()),
551            reconnect,
552            send_retries,
553            retry_timeout,
554        )
555        .and_then(|mut c| {
556            if !c.pool.is_attached() && c.pool.is_some() {
557                c.pool.attach();
558            }
559            Ok(c)
560        });
561
562        Either::Right(fut)
563    }
564
565    pub(crate) fn set_inside(&self, value: bool) {
566        if let Some(ref inner) = self.inner {
567            inner.set_inside(value);
568        } else {
569            unreachable!()
570        }
571    }
572}
573
574fn column_name_to_string(name: &str) -> Result<String, Error> {
575    if name.chars().all(|ch| ch.is_alphanumeric()) {
576        return Ok(name.to_string());
577    }
578
579    if name.chars().any(|ch| ch == '`') {
580        return Err(Error::Other("Column name shouldn't contains backticks.".into()));
581    }
582
583    Ok(format!("`{}`", name))
584}
585
586pub(crate) fn with_timeout<F>(
587    f: F,
588    timeout: Option<Duration>,
589) -> impl Future<Item = F::Item, Error = Error>
590where
591    F: Future<Error = Error> + Send + 'static,
592{
593    if let Some(timeout) = timeout {
594        Either::Left(f.timeout(timeout).map_err(|err| err.into()))
595    } else {
596        Either::Right(f)
597    }
598}
599
#[cfg(test)]
mod test_misc {
    use crate::*;
    use std::env;

    lazy_static! {
        /// Connection URL for tests: the `DATABASE_URL` environment variable
        /// when it is set, otherwise a local default.
        pub static ref DATABASE_URL: String = match env::var("DATABASE_URL") {
            Ok(url) => url,
            Err(_) => {
                "tcp://localhost:9000?compression=lz4&ping_timeout=5s&retry_timeout=5s".into()
            }
        };
    }

    /// Plain names pass through, other names get backticks, and a name that
    /// contains a backtick is rejected.
    #[test]
    fn test_column_name_to_string() {
        let plain = column_name_to_string("id").unwrap();
        assert_eq!(plain, "id");

        let quoted = column_name_to_string("ns:attr").unwrap();
        assert_eq!(quoted, "`ns:attr`");

        let rejected = column_name_to_string("`");
        assert!(rejected.is_err());
    }
}
617}