clickhouse_rs_async/
lib.rs

1//! ## clickhouse-rs
2//! Asynchronous [Yandex ClickHouse](https://clickhouse.yandex/) client library for rust programming language.
3//!
4//! ### Installation
5//! Library hosted on [crates.io](https://crates.io/crates/clickhouse-rs/).
6//!
7//! ```toml
8//! [dependencies]
9//! clickhouse-rs = "*"
10//! ```
11//!
12//! ### Supported data types
13//!
14//! * Date
15//! * DateTime
16//! * Decimal(P, S)
17//! * Float32, Float64
18//! * String, FixedString(N)
19//! * UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64
20//! * Nullable(T)
21//! * Array(UInt/Int/String/Date/DateTime)
22//! * SimpleAggregateFunction(F, T)
23//! * IPv4/IPv6
24//! * UUID
25//!
//! ### DSN
27//!
28//! ```url
29//! schema://user:password@host[:port]/database?param1=value1&...&paramN=valueN
30//! ```
31//!
32//! parameters:
33//!
//! - `compression` - Whether or not to use compression (defaults to `none`). Possible choices:
35//!     * `none`
36//!     * `lz4`
37//!
//! - `readonly` - Restricts permissions for read-data, write-data and change-settings queries (defaults to `none`). Possible choices:
39//!     * `0` - All queries are allowed.
40//!     * `1` - Only read data queries are allowed.
41//!     * `2` - Read data and change settings queries are allowed.
42//!
43//! - `connection_timeout` - Timeout for connection (defaults to `500 ms`)
44//! - `keepalive` - TCP keep alive timeout in milliseconds.
45//! - `nodelay` - Whether to enable `TCP_NODELAY` (defaults to `true`).
46//!
47//! - `pool_min` - Lower bound of opened connections for `Pool` (defaults to `10`).
48//! - `pool_max` - Upper bound of opened connections for `Pool` (defaults to `20`).
49//!
//! - `ping_before_query` - Ping the server before executing each query (defaults to `true`).
//! - `send_retries` - Number of retries when sending a request to the server (defaults to `3`).
52//! - `retry_timeout` - Amount of time to wait before next retry. (defaults to `5 sec`).
53//! - `ping_timeout` - Timeout for ping (defaults to `500 ms`).
54//!
//! - `alt_hosts` - Comma-separated list of single-address hosts used for load balancing.
56//!
57//! example:
58//! ```url
59//! tcp://user:password@host:9000/clicks?compression=lz4&ping_timeout=42ms
60//! ```
61//!
62//! ## Optional features
63//!
64//! `clickhouse-rs` puts some functionality behind optional features to optimize compile time
65//! for the most common use cases. The following features are available.
66//!
67//! - `tokio_io` *(enabled by default)* — I/O based on [Tokio](https://tokio.rs/).
68//! - `async_std` — I/O based on [async-std](https://async.rs/) (doesn't work together with `tokio_io`).
69//! - `tls-native-tls` — TLS support with native-tls (allowed only with `tokio_io`).
70//! - `tls-rustls` — TLS support with rustls (allowed only with `tokio_io`).
71//!
72//! ### Example
73//!
74//! ```rust
75//! # use std::env;
76//! use clickhouse_rs::{Block, Pool, errors::Error};
77//!
78//! #[tokio::main]
79//! async fn main() -> Result<(), Error> {
80//!     let ddl = r"
81//!         CREATE TABLE IF NOT EXISTS payment (
82//!             customer_id  UInt32,
83//!             amount       UInt32,
84//!             account_name Nullable(FixedString(3))
85//!         ) Engine=Memory";
86//!
87//!     let block = Block::new()
88//!         .column("customer_id",  vec![1_u32,  3,  5,  7,  9])
89//!         .column("amount",       vec![2_u32,  4,  6,  8, 10])
90//!         .column("account_name", vec![Some("foo"), None, None, None, Some("bar")]);
91//!
92//!     # let database_url = env::var("DATABASE_URL").unwrap_or("tcp://localhost:9000?compression=lz4".into());
93//!     let pool = Pool::new(database_url);
94//!
95//!     let mut client = pool.get_handle().await?;
96//!     client.execute(ddl).await?;
97//!     client.insert("payment", block).await?;
98//!     let block = client.query("SELECT * FROM payment").fetch_all().await?;
99//!
100//!     for row in block.rows() {
101//!         let id: u32             = row.get("customer_id")?;
102//!         let amount: u32         = row.get("amount")?;
103//!         let name: Option<&str>  = row.get("account_name")?;
104//!         println!("Found payment {id}: {amount} {name:?}");
105//!     }
106//!     Ok(())
107//! }
108//! ```
109
// Raised well above the compiler default (128). NOTE(review): presumably needed
// for deeply nested macro/async expansions in this crate — confirm it is still required.
#![recursion_limit = "1024"]

// The two TLS backends are alternatives; refuse to build when both are enabled.
#[cfg(all(feature = "tls-native-tls", feature = "tls-rustls"))]
compile_error!("tls-native-tls and tls-rustls are mutually exclusive and cannot be enabled together");
114
115use std::{fmt, future::Future, time::Duration};
116
117use futures_util::{
118    future, future::BoxFuture, future::FutureExt, stream, stream::BoxStream, StreamExt,
119};
120use log::{info, warn};
121
122use crate::{
123    connecting_stream::ConnectingStream,
124    errors::{DriverError, Error, Result},
125    io::ClickhouseTransport,
126    pool::PoolBinding,
127    retry_guard::retry_guard,
128    types::{
129        block::{ChunkIterator, INSERT_BLOCK_SIZE},
130        query_result::stream_blocks::BlockStream,
131        Cmd, Context, IntoOptions, OptionsSource, Packet, Query, QueryResult, SqlType,
132    },
133};
134pub use crate::{
135    errors::ConnectionError,
136    pool::Pool,
137    types::{block::Block, Options, Simple},
138};
139
// NOTE(review): not referenced directly in this file; presumably binary
// encoding helpers for the native protocol — confirm.
mod binary;
// NOTE(review): not referenced directly in this file.
mod client_info;
// Provides `ConnectingStream`, used by `Client::open` to open the socket.
mod connecting_stream;
/// Error types.
pub mod errors;
// Provides `ClickhouseTransport`, the packet transport held by `ClientHandle`.
mod io;
/// Pool types.
pub mod pool;
// Provides `retry_guard`, used by `ClientHandle::check_connection`.
mod retry_guard;
/// Clickhouse types.
pub mod types;
151
/// This macro is a convenient way to pass row into a block.
///
/// ```rust
/// # use clickhouse_rs::{Block, row, errors::Error};
/// # fn make_block() -> Result<(), Error> {
///       let mut block = Block::new();
///       block.push(row!{customer_id: 1, amount: 2, account_name: "foo"})?;
///       block.push(row!{customer_id: 4, amount: 4, account_name: "bar"})?;
///       block.push(row!{customer_id: 5, amount: 5, account_name: "baz"})?;
/// #     assert_eq!(block.row_count(), 3);
/// #     Ok(())
/// # }
/// # make_block().unwrap()
/// ```
///
/// If a column name has special characters, you can use the alternative syntax
/// with `=>` to pass an expression as column name:
///
/// ```rust
/// # use clickhouse_rs::{Block, row, errors::Error};
/// # fn make_block() -> Result<(), Error> {
///       let mut block = Block::new();
///       block.push(row!{"customer.id" => 1, amount: 2, "account.name" => "foo"})?;
///       block.push(row!{"customer.id" => 4, amount: 4, "account.name" => "bar"})?;
///       block.push(row!{"customer.id" => 5, amount: 5, "account.name" => "baz"})?;
/// #     assert_eq!(block.row_count(), 3);
/// #     Ok(())
/// # }
/// # make_block().unwrap()
/// ```
///
/// You can also use `Vec<(String, Value)>` to construct a row and insert it into a block:
///
/// ```rust
/// # use clickhouse_rs::{Block, errors::Error, types::Value};
/// # fn make_block() -> Result<(), Error> {
///       let mut block = Block::new();
///       for i in 1..10 {
///           let mut row = Vec::new();
///           for j in 1..10 {
///               row.push((format!("#{}", j), Value::from(i * j)));
///           }
///           block.push(row)?;
///       }
///       assert_eq!(block.row_count(), 9);
/// #     println!("{:?}", block);
/// #     Ok(())
/// # }
/// # make_block().unwrap()
/// ```
///
/// Entries may be written as `name: value`, as a bare identifier (shorthand for
/// `ident: ident`), or as `"expr" => value`; a trailing comma is accepted.
#[macro_export]
macro_rules! row {
    () => { $crate::types::RNil };
    // Recursive calls go through `$crate::` so the macro also works when it is
    // invoked by path (e.g. `clickhouse_rs::row!{...}`) without importing `row`.
    ( $i:ident, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put(stringify!($i).into(), $i.into())
    };
    ( $i:ident ) => { $crate::row!($i: $i) };

    ( $k:ident: $v:expr ) => {
        $crate::types::RNil.put(stringify!($k).into(), $v.into())
    };

    ( $k:ident: $v:expr, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put(stringify!($k).into(), $v.into())
    };

    ( $k:expr => $v:expr ) => {
        $crate::types::RNil.put($k.into(), $v.into())
    };

    ( $k:expr => $v:expr, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put($k.into(), $v.into())
    };
}
226
/// Unwraps a `Result`, returning early from the enclosing function with the
/// error unchanged.
///
/// Unlike `?`, no `From` conversion is applied to the error. The `Result`
/// variants are named through absolute paths so the macro keeps working even
/// where `Ok`/`Err` are shadowed at the call site.
#[macro_export]
macro_rules! try_opt {
    ($expr:expr) => {
        match $expr {
            ::std::result::Result::Ok(val) => val,
            ::std::result::Result::Err(err) => return ::std::result::Result::Err(err),
        }
    };
}
236
/// Holder of the connection constructors `connect` (deprecated) and `open`.
///
/// Its methods are associated functions (they take no `self`). The private
/// unit field prevents values of this type from being built outside this crate.
#[doc(hidden)]
pub struct Client {
    _private: (),
}
241
/// Clickhouse client handle.
///
/// Obtained from a [`Pool`] (e.g. `pool.get_handle().await?`) or from the
/// deprecated `Client::connect`.
pub struct ClientHandle {
    // Transport to the server. It is taken out (leaving `None`) while an
    // operation uses it and put back when the operation succeeds; `get_inner`
    // reports `ConnectionError::Broken` if it is missing.
    inner: Option<ClickhouseTransport>,
    // Connection options plus the server information received during `hello`.
    context: Context,
    // Link to the pool this handle came from, if any; detached/re-attached by
    // `check_connection`.
    pool: PoolBinding,
}
248
249impl fmt::Debug for ClientHandle {
250    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
251        f.debug_struct("ClientHandle")
252            .field("server_info", &self.context.server_info)
253            .finish()
254    }
255}
256
257impl Client {
258    #[deprecated(since = "0.1.4", note = "please use Pool to connect")]
259    pub async fn connect(options: Options) -> Result<ClientHandle> {
260        let source = options.into_options_src();
261        Self::open(source, None).await
262    }
263
264    pub(crate) async fn open(source: OptionsSource, pool: Option<Pool>) -> Result<ClientHandle> {
265        let options = try_opt!(source.get());
266        let compress = options.compression;
267        let timeout = options.connection_timeout;
268
269        let context = Context {
270            options: source.clone(),
271            ..Context::default()
272        };
273
274        with_timeout(
275            async move {
276                let addr = match &pool {
277                    None => &options.addr,
278                    Some(p) => p.get_addr(),
279                };
280
281                info!("try to connect to {}", addr);
282                if addr.port() == Some(8123) {
283                    warn!("You should use port 9000 instead of 8123 because clickhouse-rs work through the binary interface.");
284                }
285                let mut stream = ConnectingStream::new(addr, &options).await?;
286                stream.set_nodelay(options.nodelay)?;
287                stream.set_keepalive(options.keepalive)?;
288
289                let transport = ClickhouseTransport::new(stream, compress, pool.clone());
290                let mut handle = ClientHandle {
291                    inner: Some(transport),
292                    context,
293                    pool: match pool {
294                        None => PoolBinding::None,
295                        Some(p) => PoolBinding::Detached(p),
296                    },
297                };
298
299                handle.hello().await?;
300                Ok(handle)
301            },
302            timeout,
303        )
304        .await
305    }
306}
307
308impl ClientHandle {
309    pub(crate) async fn hello(&mut self) -> Result<()> {
310        let context = self.context.clone();
311        info!("[hello] -> {:?}", &context);
312
313        let mut h = None;
314        let mut info = None;
315        let mut stream = self.inner.take().unwrap().call(Cmd::Hello(context.clone()));
316
317        while let Some(packet) = stream.next().await {
318            match packet {
319                Ok(Packet::Hello(inner, server_info)) => {
320                    info!("[hello] <- {:?}", &server_info);
321                    h = Some(inner);
322                    info = Some(server_info);
323                }
324                Ok(Packet::Exception(e)) => return Err(Error::Server(e)),
325                Err(e) => return Err(Error::Io(e)),
326                _ => return Err(Error::Driver(DriverError::UnexpectedPacket)),
327            }
328        }
329
330        self.inner = h;
331        self.context.server_info = info.unwrap();
332        Ok(())
333    }
334
335    pub async fn ping(&mut self) -> Result<()> {
336        let timeout = try_opt!(self.context.options.get()).ping_timeout;
337
338        with_timeout(
339            async move {
340                info!("[ping]");
341
342                let mut h = None;
343
344                let transport = self.get_inner()?.clear().await?;
345                let mut stream = transport.call(Cmd::Ping);
346
347                while let Some(packet) = stream.next().await {
348                    match packet {
349                        Ok(Packet::Pong(inner)) => {
350                            info!("[pong]");
351                            h = Some(inner);
352                        }
353                        Ok(Packet::Exception(e)) => return Err(Error::Server(e)),
354                        Err(e) => return Err(Error::Io(e)),
355                        _ => return Err(Error::Driver(DriverError::UnexpectedPacket)),
356                    }
357                }
358
359                match h {
360                    None => Err(Error::Connection(ConnectionError::Broken)),
361                    Some(h) => {
362                        self.inner = Some(h);
363                        Ok(())
364                    }
365                }
366            },
367            timeout,
368        )
369        .await
370    }
371
372    /// Executes Clickhouse `query` on Conn.
373    pub fn query<Q>(&mut self, sql: Q) -> QueryResult
374    where
375        Query: From<Q>,
376    {
377        let query = Query::from(sql);
378        QueryResult {
379            client: self,
380            query,
381        }
382    }
383
384    /// Convenience method to prepare and execute a single SQL statement.
385    pub async fn execute<Q>(&mut self, sql: Q) -> Result<()>
386    where
387        Query: From<Q>,
388    {
389        let transport = self.execute_(sql).await?;
390        self.inner = Some(transport);
391        Ok(())
392    }
393
394    async fn execute_<Q>(&mut self, sql: Q) -> Result<ClickhouseTransport>
395    where
396        Query: From<Q>,
397    {
398        let timeout = try_opt!(self.context.options.get())
399            .execute_timeout
400            .unwrap_or_else(|| Duration::from_secs(0));
401        let context = self.context.clone();
402        let query = Query::from(sql);
403        with_timeout(
404            async {
405                self.wrap_future(move |c| {
406                    info!("[execute query] {}", query.get_sql());
407
408                    let transport = c.get_inner();
409
410                    async move {
411                        let transport = transport?;
412                        let mut h = None;
413
414                        let transport = transport.clear().await?;
415                        let mut stream = transport.call(Cmd::SendQuery(query, context.clone()));
416
417                        while let Some(packet) = stream.next().await {
418                            match packet {
419                                Ok(Packet::Eof(inner)) => h = Some(inner),
420                                Ok(Packet::Block(_))
421                                | Ok(Packet::ProfileInfo(_))
422                                | Ok(Packet::Progress(_)) => (),
423                                Ok(Packet::Exception(e)) => return Err(Error::Server(e)),
424                                Err(e) => return Err(Error::Io(e)),
425                                _ => return Err(Error::Driver(DriverError::UnexpectedPacket)),
426                            }
427                        }
428
429                        Ok(h.unwrap())
430                    }
431                })
432                .await
433            },
434            timeout,
435        )
436        .await
437    }
438
439    /// Convenience method to insert block of data.
440    pub async fn insert<Q, B>(&mut self, table: Q, block: B) -> Result<()>
441    where
442        Query: From<Q>,
443        B: AsRef<Block>,
444    {
445        let query = Self::make_query(table, block.as_ref())?;
446        let transport = self.insert_(query.clone(), block.as_ref()).await?;
447        self.inner = Some(transport);
448        Ok(())
449    }
450
451    async fn insert_(&mut self, query: Query, block: &Block) -> Result<ClickhouseTransport> {
452        let timeout = try_opt!(self.context.options.get())
453            .insert_timeout
454            .unwrap_or_else(|| Duration::from_secs(0));
455
456        let context = self.context.clone();
457
458        with_timeout(
459            async {
460                self.wrap_future(move |c| {
461                    info!("[insert]     {}", query.get_sql());
462                    let transport = c.get_inner();
463
464                    async move {
465                        let transport = transport?.clear().await?;
466                        let (transport, dst_block) =
467                            Self::send_insert_query_(transport, context.clone(), query.clone())
468                                .await?;
469                        let casted_block = block.cast_to(&dst_block)?;
470                        let mut chunks = casted_block.chunks(INSERT_BLOCK_SIZE);
471                        let transport =
472                            Self::insert_block_(transport, context.clone(), chunks.next().unwrap())
473                                .await?;
474                        Self::insert_tail_(transport, context, query, chunks).await
475                    }
476                })
477                .await
478            },
479            timeout,
480        )
481        .await
482    }
483
484    async fn insert_tail_(
485        mut transport: ClickhouseTransport,
486        context: Context,
487        query: Query,
488        chunks: ChunkIterator<Simple>,
489    ) -> Result<ClickhouseTransport> {
490        for chunk in chunks {
491            let (transport_, _) =
492                Self::send_insert_query_(transport, context.clone(), query.clone()).await?;
493            transport = Self::insert_block_(transport_, context.clone(), chunk).await?;
494        }
495        Ok(transport)
496    }
497
498    async fn send_insert_query_(
499        transport: ClickhouseTransport,
500        context: Context,
501        query: Query,
502    ) -> Result<(ClickhouseTransport, Block)> {
503        let stream = transport.call(Cmd::SendQuery(query, context));
504        let (transport, b) = stream.read_block().await?;
505        let dst_block = b.unwrap();
506        Ok((transport, dst_block))
507    }
508
509    async fn insert_block_(
510        transport: ClickhouseTransport,
511        context: Context,
512        block: Block,
513    ) -> Result<ClickhouseTransport> {
514        let send_cmd = Cmd::Union(
515            Box::new(Cmd::SendData(block, context.clone())),
516            Box::new(Cmd::SendData(Block::default(), context)),
517        );
518        let (transport, _) = transport.call(send_cmd).read_block().await?;
519        Ok(transport)
520    }
521
522    fn make_query<Q>(table: Q, block: &Block) -> Result<Query>
523    where
524        Query: From<Q>,
525    {
526        let mut names: Vec<_> = Vec::with_capacity(block.as_ref().column_count());
527        for column in block.as_ref().columns() {
528            names.push(try_opt!(column_name_to_string(column.name())));
529        }
530        let fields = names.join(", ");
531        Ok(Query::from(table).map_sql(|table| format!("INSERT INTO {table} ({fields}) VALUES")))
532    }
533
534    pub(crate) async fn wrap_future<T, R, F>(&mut self, f: F) -> Result<T>
535    where
536        F: FnOnce(&mut Self) -> R + Send,
537        R: Future<Output = Result<T>>,
538        T: 'static,
539    {
540        let ping_before_query = try_opt!(self.context.options.get()).ping_before_query;
541
542        if ping_before_query {
543            self.check_connection().await?;
544        }
545        f(self).await
546    }
547
548    pub(crate) fn wrap_stream<'a, F>(&'a mut self, f: F) -> BoxStream<'a, Result<Block>>
549    where
550        F: (FnOnce(&'a mut Self) -> Result<BlockStream<'a>>) + Send + 'static,
551    {
552        let ping_before_query = match self.context.options.get() {
553            Ok(val) => val.ping_before_query,
554            Err(err) => return Box::pin(stream::once(future::err(err))),
555        };
556
557        if ping_before_query {
558            let fut: BoxFuture<'a, BoxStream<'a, Result<Block>>> = Box::pin(async move {
559                let inner: BoxStream<'a, Result<Block>> = match self.check_connection().await {
560                    Ok(_) => match f(self) {
561                        Ok(s) => Box::pin(s),
562                        Err(err) => Box::pin(stream::once(future::err(err))),
563                    },
564                    Err(err) => Box::pin(stream::once(future::err(err))),
565                };
566                inner
567            });
568
569            Box::pin(fut.flatten_stream())
570        } else {
571            match f(self) {
572                Ok(s) => Box::pin(s),
573                Err(err) => Box::pin(stream::once(future::err(err))),
574            }
575        }
576    }
577
578    /// Check connection and try to reconnect if necessary.
579    pub async fn check_connection(&mut self) -> Result<()> {
580        self.pool.detach();
581
582        let source = self.context.options.clone();
583        let pool = self.pool.clone();
584
585        let (send_retries, retry_timeout) = {
586            let options = try_opt!(source.get());
587            (options.send_retries, options.retry_timeout)
588        };
589
590        retry_guard(self, &source, pool.into(), send_retries, retry_timeout).await?;
591
592        if !self.pool.is_attached() && self.pool.is_some() {
593            self.pool.attach();
594        }
595
596        Ok(())
597    }
598
599    pub(crate) fn set_inside(&self, value: bool) {
600        if let Some(ref inner) = self.inner {
601            inner.set_inside(value);
602        } else {
603            unreachable!()
604        }
605    }
606
607    fn get_inner(&mut self) -> Result<ClickhouseTransport> {
608        self.inner
609            .take()
610            .ok_or_else(|| Error::Connection(ConnectionError::Broken))
611    }
612}
613
614fn column_name_to_string(name: &str) -> Result<String> {
615    if name.chars().all(|ch| ch.is_numeric()) {
616        return Ok(name.to_string());
617    }
618
619    if name.chars().any(|ch| ch == '`') {
620        let err = format!("Column name {name:?} shouldn't contains backticks.");
621        return Err(Error::Other(err.into()));
622    }
623
624    Ok(format!("`{name}`"))
625}
626
/// Awaits `future`, failing with a `TimedOut` I/O error if it does not
/// complete within `duration` (async-std runtime).
///
/// The future's own result, including its error, is returned unchanged.
/// Wrapping it in `async_std::io::timeout` instead would require converting
/// the crate error into `std::io::Error` and back, which can replace the
/// original variant (e.g. a server exception) with an I/O error.
#[cfg(feature = "async_std")]
async fn with_timeout<F, T>(future: F, duration: Duration) -> F::Output
where
    F: Future<Output = Result<T>>,
{
    match async_std::future::timeout(duration, future).await {
        Ok(output) => output,
        // Same kind and message as the error `async_std::io::timeout` reports.
        Err(_elapsed) => Err(std::io::Error::new(
            std::io::ErrorKind::TimedOut,
            "future timed out",
        )
        .into()),
    }
}
639
640#[cfg(not(feature = "async_std"))]
641async fn with_timeout<F, T>(future: F, timeout: Duration) -> F::Output
642where
643    F: Future<Output = Result<T>>,
644{
645    tokio::time::timeout(timeout, future).await?
646}
647
#[cfg(test)]
pub(crate) mod test_misc {
    use std::env;

    use lazy_static::lazy_static;

    use crate::*;

    lazy_static! {
        /// Server used by the integration tests: `DATABASE_URL` from the
        /// environment, or a local default.
        pub static ref DATABASE_URL: String = env::var("DATABASE_URL").unwrap_or_else(|_| {
            String::from("tcp://localhost:9000?compression=lz4&ping_timeout=1s&retry_timeout=2s")
        });
    }

    #[test]
    fn test_column_name_to_string() {
        let accepted = [("id", "`id`"), ("234", "234"), ("ns:attr", "`ns:attr`")];
        for &(name, expected) in accepted.iter() {
            assert_eq!(column_name_to_string(name).unwrap(), expected);
        }

        assert!(column_name_to_string("`").is_err());
    }
}