#![recursion_limit = "1024"]
extern crate byteorder;
extern crate chrono;
extern crate chrono_tz;
extern crate clickhouse_rs_cityhash_sys;
extern crate core;
extern crate failure;
#[macro_use]
extern crate futures;
extern crate hostname;
#[cfg(test)]
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate log;
extern crate lz4;
#[cfg(test)]
extern crate rand;
extern crate tokio;
extern crate tokio_timer;
extern crate url;
use std::fmt;
use futures::{Future, Stream};
use tokio::prelude::*;
use crate::{
connecting_stream::ConnectingStream,
errors::{DriverError, Error},
io::{BoxFuture, ClickhouseTransport},
pool::PoolBinding,
retry_guard::RetryGuard,
types::{Block, Cmd, Context, IntoOptions, Options, OptionsSource, Packet, Query, QueryResult},
};
pub use crate::pool::Pool;
mod binary;
mod client_info;
mod connecting_stream;
pub mod errors;
mod io;
mod pool;
mod retry_guard;
pub mod types;
/// Unwraps a `Result` inside a function that returns a boxed future.
///
/// On `Ok` the macro evaluates to the contained value. On `Err` it returns
/// early from the enclosing function with `Box::new(future::err(..))`, so a
/// `future` module providing `err` must be in scope at the call site.
macro_rules! try_opt {
    ($fallible:expr) => {
        match $fallible {
            Err(cause) => return Box::new(future::err(cause)),
            Ok(value) => value,
        }
    };
}
/// Entry point type whose associated functions open connections.
///
/// It carries no data: the only field is a private unit value, so the struct
/// cannot be built with a struct literal outside this module.
pub struct Client {
    // Private marker field; holds no state.
    _private: (),
}
/// A handle to one server connection.
///
/// Every operation consumes the handle and resolves to a new one once the
/// server has answered.
pub struct ClientHandle {
    // Transport of the connection. Methods `take()` it out to issue a command
    // and store the transport returned by the server reply in the next handle.
    inner: Option<ClickhouseTransport>,
    // Connection context: the options source and the server info received
    // during the handshake.
    context: Context,
    // Binding to the pool this handle belongs to (`PoolBinding::None` for
    // handles opened directly through `Client::open`).
    pool: PoolBinding,
}
/// Debug output shows only the server info stored in the handle's context.
impl fmt::Debug for ClientHandle {
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        let mut repr = formatter.debug_struct("ClientHandle");
        repr.field("server_info", &self.context.server_info);
        repr.finish()
    }
}
impl Client {
#[deprecated(since = "0.1.4", note = "please use Pool to connect")]
pub fn connect(options: Options) -> BoxFuture<ClientHandle> {
Self::open(&options.into_options_src())
}
pub(crate) fn open(source: &OptionsSource) -> BoxFuture<ClientHandle> {
let options = try_opt!(source.get()).as_ref().to_owned();
let compress = options.compression;
let timeout = options.connection_timeout;
let context = Context {
options: source.clone(),
..Context::default()
};
Box::new(
ConnectingStream::new(&options.addr)
.and_then(move |stream| {
stream.set_nodelay(options.nodelay)?;
stream.set_keepalive(options.keepalive)?;
let transport = ClickhouseTransport::new(stream, compress);
Ok(ClientHandle {
inner: Some(transport),
context,
pool: PoolBinding::None,
})
})
.map_err(|error| error.into())
.and_then(|client| client.hello())
.timeout(timeout)
.map_err(Error::from),
)
}
}
impl ClientHandle {
fn hello(mut self) -> BoxFuture<Self> {
let context = self.context.clone();
let pool = self.pool.clone();
info!("[hello] -> {:?}", &context);
Box::new(
self.inner
.take()
.unwrap()
.call(Cmd::Hello(context.clone()))
.fold(None, move |_, packet| match packet {
Packet::Hello(inner, server_info) => {
info!("[hello] <- {:?}", &server_info);
let context = Context {
server_info,
..context.clone()
};
let client = Self {
inner: Some(inner),
context,
pool: pool.clone(),
};
future::ok::<_, Error>(Some(client))
}
Packet::Exception(e) => future::err::<_, Error>(Error::Server(e)),
_ => future::err::<_, Error>(Error::Driver(DriverError::UnexpectedPacket)),
})
.map(Option::unwrap),
)
}
pub fn ping(mut self) -> BoxFuture<Self> {
let context = self.context.clone();
let timeout = try_opt!(self.context.options.get()).ping_timeout;
let pool = self.pool.clone();
info!("[ping]");
Box::new(
self.inner
.take()
.unwrap()
.call(Cmd::Ping)
.fold(None, move |_, packet| match packet {
Packet::Pong(inner) => {
let client = Self {
inner: Some(inner),
context: context.clone(),
pool: pool.clone(),
};
info!("[pong]");
future::ok::<_, Error>(Some(client))
}
Packet::Exception(exception) => {
future::err::<_, Error>(Error::Server(exception))
}
_ => future::err::<_, Error>(Error::Driver(DriverError::UnexpectedPacket)),
})
.map(Option::unwrap)
.timeout(timeout)
.map_err(Error::from),
)
}
/// Prepares `sql` for execution on this connection.
///
/// Nothing is sent here: the handle and the converted query are only packed
/// into a `QueryResult`.
pub fn query<Q>(self, sql: Q) -> QueryResult
where
    Query: From<Q>,
{
    QueryResult {
        query: Query::from(sql),
        client: self,
    }
}
/// Runs `sql` and resolves to the handle together with the fetched block.
///
/// Shorthand for `query(sql).fetch_all()`.
#[deprecated(since = "0.1.7", note = "please use query(sql).fetch_all() instead")]
pub fn query_all<Q>(self, sql: Q) -> BoxFuture<(Self, Block)>
where
    Query: From<Q>,
{
    let pending = self.query(sql);
    pending.fetch_all()
}
pub fn execute<Q>(self, sql: Q) -> BoxFuture<Self>
where
Query: From<Q>,
{
let context = self.context.clone();
let pool = self.pool.clone();
let query = Query::from(sql);
self.wrap_future(|mut c| {
info!("[execute] {}", query.get_sql());
c.inner
.take()
.unwrap()
.call(Cmd::SendQuery(query, context.clone()))
.fold(None, move |acc, packet| match packet {
Packet::Eof(inner) => {
let client = Self {
inner: Some(inner),
context: context.clone(),
pool: pool.clone(),
};
future::ok::<_, Error>(Some(client))
}
Packet::Block(_) | Packet::ProfileInfo(_) | Packet::Progress(_) => {
future::ok::<_, Error>(acc)
}
Packet::Exception(exception) => {
future::err::<_, Error>(Error::Server(exception))
}
_ => future::err::<_, Error>(Error::Driver(DriverError::UnexpectedPacket)),
})
.map(Option::unwrap)
})
}
/// Inserts `block` into `table`.
///
/// Builds an `INSERT INTO <table> (<columns>) VALUES` statement from the
/// block's column names, sends it, and then sends the block followed by an
/// empty block. The connection is checked first when the `ping_before_query`
/// option is set (see `wrap_future`).
///
/// NOTE(review): the table and column names are interpolated into the SQL
/// text verbatim, with no quoting or escaping.
///
/// # Panics
///
/// Panics if a handle in the chain has no transport (`inner` is `None`).
pub fn insert<Q>(self, table: Q, block: Block) -> BoxFuture<Self>
where
    Query: From<Q>,
{
    // Comma-separated column list, in the block's column order.
    let fields = block
        .as_ref()
        .columns()
        .iter()
        .map(|column| column.name().to_string())
        .collect::<Vec<_>>()
        .join(", ");

    let query = Query::from(table)
        .map_sql(|table| format!("INSERT INTO {} ({}) VALUES", table, fields));

    let context = self.context.clone();
    let pool = self.pool.clone();

    // The data block is followed by an empty block in a single command.
    let payload = Box::new(Cmd::SendData(block, context.clone()));
    let terminator = Box::new(Cmd::SendData(Block::default(), context.clone()));
    let send_cmd = Cmd::Union(payload, terminator);

    self.wrap_future(|mut handle| {
        info!("[insert] {}", query.get_sql());

        let transport = handle.inner.take().unwrap();
        transport
            .call(Cmd::SendQuery(query, context.clone()))
            .read_block(context.clone(), pool.clone())
            .and_then(move |mut ready| {
                let transport = ready.inner.take().unwrap();
                transport.call(send_cmd).read_block(context, pool)
            })
    })
}
/// Runs `f` on this handle, optionally verifying the connection first.
///
/// When the `ping_before_query` option is set, `check_connection` runs first
/// and `f` receives the handle it resolves to; otherwise `f` is called with
/// `self` directly. If the options cannot be read, an already-failed future
/// is returned and `f` is never called.
pub(crate) fn wrap_future<T, R, F>(self, f: F) -> BoxFuture<T>
where
    F: FnOnce(Self) -> R + Send + 'static,
    R: Future<Item = T, Error = Error> + Send + 'static,
    T: Send + 'static,
{
    let ping_before_query = try_opt!(self.context.options.get()).ping_before_query;
    if ping_before_query {
        // `R` is already a future, so `f` can be chained as-is; only the
        // outer box is needed to erase the type.
        Box::new(self.check_connection().and_then(f))
    } else {
        Box::new(f(self))
    }
}
/// Verifies the connection with a ping, reconnecting when the ping fails.
///
/// The handle is detached from its pool binding for the duration of the
/// check. A `RetryGuard` drives the ping, using `send_retries` and
/// `retry_timeout` from the options; its reconnect callback takes a handle
/// from the pool when there is one and opens a new connection otherwise.
/// The resulting handle is re-attached if it has a pool binding that is not
/// attached. If the options cannot be read, an already-failed future is
/// returned.
pub fn check_connection(mut self) -> BoxFuture<Self> {
    let pool: Option<Pool> = self.pool.clone().into();
    self.pool.detach();

    let source = self.context.options.clone();
    // Scoped so the options value read from `source` is gone before `source`
    // moves into the reconnect closure.
    let (send_retries, retry_timeout) = {
        let options = try_opt!(source.get());
        (options.send_retries, options.retry_timeout)
    };

    let reconnect = move || -> BoxFuture<Self> {
        warn!("[reconnect]");
        match pool.clone() {
            Some(p) => Box::new(p.get_handle()),
            None => Client::open(&source),
        }
    };

    let guarded = RetryGuard::new(self, |c| c.ping(), reconnect, send_retries, retry_timeout);
    Box::new(guarded.map(|mut handle| {
        let needs_attach = !handle.pool.is_attached() && handle.pool.is_some();
        if needs_attach {
            handle.pool.attach();
        }
        handle
    }))
}
}
#[cfg(test)]
mod test_misc {
    use std::env;

    lazy_static! {
        /// Connection URL used by the tests: the `DATABASE_URL` environment
        /// variable when it can be read, otherwise a local default.
        pub static ref DATABASE_URL: String = env::var("DATABASE_URL")
            .unwrap_or_else(|_| String::from("tcp://localhost:9000?compression=lz4"));
    }
}
}