1#![recursion_limit = "1024"]
113
114extern crate byteorder;
115extern crate chrono;
116extern crate chrono_tz;
117extern crate clickhouse_rs_cityhash_sys;
118extern crate core;
119#[macro_use]
120extern crate futures;
121extern crate hostname;
122#[macro_use]
123extern crate lazy_static;
124#[macro_use]
125extern crate log;
126extern crate lz4;
127#[cfg(test)]
128extern crate rand;
129extern crate tokio;
130extern crate tokio_timer;
131extern crate url;
132
133use std::{fmt, time::Duration};
134
135use futures::{Future, Stream};
136use tokio::prelude::*;
137
138pub use crate::pool::Pool;
139use crate::{
140 connecting_stream::ConnectingStream,
141 errors::{DriverError, Error},
142 io::{BoxFuture, BoxStream, ClickhouseTransport},
143 pool::PoolBinding,
144 retry_guard::RetryGuard,
145 types::{
146 set_exception_handle, Block, Cmd, Complex, Context, Either, IntoOptions, Options,
147 OptionsSource, Packet, Query, QueryResult,
148 },
149};
150
151mod binary;
152mod client_info;
153mod connecting_stream;
154pub mod errors;
156mod io;
157pub mod pool;
159mod retry_guard;
160pub mod types;
162
163#[macro_export]
198macro_rules! row {
199 () => { $crate::types::RNil };
200 ( $i:ident, $($tail:tt)* ) => {
201 row!( $($tail)* ).put(stringify!($i).into(), $i.into())
202 };
203 ( $i:ident ) => { row!($i: $i) };
204
205 ( $k:ident: $v:expr ) => {
206 $crate::types::RNil.put(stringify!($k).into(), $v.into())
207 };
208
209 ( $k:ident: $v:expr, $($tail:tt)* ) => {
210 row!( $($tail)* ).put(stringify!($k).into(), $v.into())
211 };
212
213 ( $k:expr => $v:expr ) => {
214 $crate::types::RNil.put($k.into(), $v.into())
215 };
216
217 ( $k:expr => $v:expr, $($tail:tt)* ) => {
218 row!( $($tail)* ).put($k.into(), $v.into())
219 };
220}
221
222macro_rules! try_opt {
223 ($expr:expr) => {
224 match $expr {
225 Ok(val) => val,
226 Err(err) => return Either::Left(future::err(err)),
227 }
228 };
229}
230
231#[doc(hidden)]
232pub struct Client {
233 _private: (),
234}
235
236pub struct ClientHandle {
238 inner: Option<ClickhouseTransport>,
239 context: Context,
240 pool: PoolBinding,
241}
242
243impl fmt::Debug for ClientHandle {
244 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
245 f.debug_struct("ClientHandle")
246 .field("server_info", &self.context.server_info)
247 .finish()
248 }
249}
250
251impl Client {
252 #[deprecated(since = "0.1.4", note = "please use Pool to connect")]
253 pub fn connect(options: Options) -> impl Future<Item = ClientHandle, Error = Error> {
254 Self::open(&options.into_options_src(), None)
255 }
256
257 pub(crate) fn open(
258 source: &OptionsSource,
259 pool: Option<Pool>,
260 ) -> impl Future<Item = ClientHandle, Error = Error> {
261 let options = try_opt!(source.get()).as_ref().to_owned();
262 let compress = options.compression;
263 let timeout = options.connection_timeout;
264
265 let context = Context {
266 options: source.clone(),
267 ..Context::default()
268 };
269
270 Either::Right(
271 future::lazy(move || {
272 let addr = match &pool {
273 None => &options.addr,
274 Some(p) => p.get_addr(),
275 };
276
277 info!("try to connect to {}", addr);
278 ConnectingStream::new(addr, &options)
279 .and_then(move |mut stream| {
280 stream.set_nodelay(options.nodelay)?;
281 stream.set_keepalive(options.keepalive)?;
282
283 let transport = ClickhouseTransport::new(stream, compress, pool);
284 Ok(ClientHandle {
285 inner: Some(transport),
286 context,
287 pool: PoolBinding::None,
288 })
289 })
290 .map_err(Into::into)
291 .and_then(ClientHandle::hello)
292 .timeout(timeout)
293 .map_err(Error::from)
294 }),
295 )
296 }
297}
298
299impl ClientHandle {
300 fn hello(mut self) -> impl Future<Item = Self, Error = Error> {
301 let context = self.context.clone();
302 let pool = self.pool.clone();
303 debug!("[hello] -> {:?}", &context);
304
305 self.inner
306 .take()
307 .unwrap()
308 .call(Cmd::Hello(context.clone()))
309 .fold(None, move |_, packet| match packet {
310 Packet::Hello(inner, server_info) => {
311 info!("[hello] <- {:?}", &server_info);
312 let context = Context {
313 server_info,
314 ..context.clone()
315 };
316 let client = Self {
317 inner: Some(inner),
318 context,
319 pool: pool.clone(),
320 };
321 future::ok::<_, Error>(Some(client))
322 }
323 Packet::Exception(e, _) => future::err::<_, Error>(Error::Server(e)),
324 _ => future::err::<_, Error>(Error::Driver(DriverError::UnexpectedPacket)),
325 })
326 .map(Option::unwrap)
327 }
328
329 pub fn ping(mut self) -> impl Future<Item = Self, Error = Error> {
330 let context = self.context.clone();
331 let timeout = try_opt!(self.context.options.get()).ping_timeout;
332
333 let pool = self.pool.clone();
334 info!("[ping]");
335 let fut = self
336 .inner
337 .take()
338 .unwrap()
339 .call(Cmd::Ping)
340 .fold(None, move |_, packet| match packet {
341 Packet::Pong(inner) => {
342 let client = Self {
343 inner: Some(inner),
344 context: context.clone(),
345 pool: pool.clone(),
346 };
347 info!("[pong]");
348 future::ok::<_, Error>(Some(client))
349 }
350 Packet::Exception(mut exception, transport) => {
351 set_exception_handle(&mut exception, transport, context.clone(), pool.clone());
352 future::err::<_, Error>(Error::Server(exception))
353 }
354 _ => future::err::<_, Error>(Error::Driver(DriverError::UnexpectedPacket)),
355 })
356 .map(Option::unwrap)
357 .timeout(timeout)
358 .map_err(Error::from);
359
360 Either::Right(fut)
361 }
362
363 pub fn query<Q>(self, sql: Q) -> QueryResult
365 where
366 Query: From<Q>,
367 {
368 let query = Query::from(sql);
369 QueryResult {
370 client: self,
371 query,
372 }
373 }
374
375 #[deprecated(since = "0.1.7", note = "please use query(sql).fetch_all() instead")]
377 pub fn query_all<Q>(self, sql: Q) -> BoxFuture<(Self, Block<Complex>)>
378 where
379 Query: From<Q>,
380 {
381 self.query(sql).fetch_all()
382 }
383
384 pub fn execute<Q>(self, sql: Q) -> impl Future<Item = Self, Error = Error>
386 where
387 Query: From<Q>,
388 {
389 let context = self.context.clone();
390 let pool = self.pool.clone();
391 let timeout = try_opt!(context.options.get()).execute_timeout;
392
393 let query = Query::from(sql);
394 let fut = self.wrap_future(move |mut c| {
395 info!("[execute] {}", query.get_sql());
396
397 let future = c
398 .inner
399 .take()
400 .unwrap()
401 .call(Cmd::SendQuery(query, context.clone()))
402 .fold(None, move |acc, packet| match packet {
403 Packet::Eof(inner) => {
404 let client = Self {
405 inner: Some(inner),
406 context: context.clone(),
407 pool: pool.clone(),
408 };
409 future::ok::<_, Error>(Some(client))
410 }
411 Packet::Block(_) | Packet::ProfileInfo(_) | Packet::Progress(_) => {
412 future::ok::<_, Error>(acc)
413 }
414 Packet::Exception(mut exception, transport) => {
415 set_exception_handle(
416 &mut exception,
417 transport,
418 context.clone(),
419 pool.clone(),
420 );
421 future::err::<_, Error>(Error::Server(exception))
422 }
423 _ => future::err::<_, Error>(Error::Driver(DriverError::UnexpectedPacket)),
424 })
425 .map(Option::unwrap);
426
427 with_timeout(future, timeout)
428 });
429
430 Either::Right(fut)
431 }
432
433 pub fn insert<Q>(self, table: Q, block: Block) -> impl Future<Item = Self, Error = Error>
435 where
436 Query: From<Q>,
437 {
438 let mut names: Vec<_> = Vec::with_capacity(block.column_count());
439 for column in block.columns() {
440 names.push(try_opt!(column_name_to_string(column.name())));
441 }
442 let fields = names.join(", ");
443
444 let query = Query::from(table)
445 .map_sql(|table| format!("INSERT INTO {} ({}) VALUES", table, fields));
446
447 let context = self.context.clone();
448 let pool = self.pool.clone();
449 let timeout = try_opt!(context.options.get()).insert_timeout;
450
451 let fut = self.wrap_future(move |mut c| {
452 info!("[insert] {}", query.get_sql());
453
454 let future = c
455 .inner
456 .take()
457 .unwrap()
458 .call(Cmd::SendQuery(query, context.clone()))
459 .read_block(context.clone(), pool.clone())
460 .and_then(move |(mut c, b)| -> BoxFuture<Self> {
461 let dst_block = b.unwrap();
462
463 let casted_block = match block.cast_to(&dst_block) {
464 Ok(value) => value,
465 Err(err) => return Box::new(future::err::<Self, Error>(err)),
466 };
467
468 let send_cmd = Cmd::Union(
469 Box::new(Cmd::SendData(casted_block, context.clone())),
470 Box::new(Cmd::SendData(Block::default(), context.clone())),
471 );
472
473 Box::new(
474 c.inner
475 .take()
476 .unwrap()
477 .call(send_cmd)
478 .read_block(context, pool)
479 .map(|(c, _)| c),
480 )
481 });
482
483 with_timeout(future, timeout)
484 });
485
486 Either::Right(fut)
487 }
488
489 pub(crate) fn wrap_future<T, R, F>(self, f: F) -> impl Future<Item = T, Error = Error>
490 where
491 F: FnOnce(Self) -> R + Send + 'static,
492 R: Future<Item = T, Error = Error> + Send + 'static,
493 T: Send + 'static,
494 {
495 let ping_before_query = try_opt!(self.context.options.get()).ping_before_query;
496
497 let fut = if ping_before_query {
498 Either::Left(self.check_connection().and_then(move |c| Box::new(f(c))))
499 } else {
500 Either::Right(f(self))
501 };
502
503 Either::Right(fut)
504 }
505
506 pub(crate) fn wrap_stream<T, R, F>(self, f: F) -> BoxStream<T>
507 where
508 F: FnOnce(Self) -> R + Send + 'static,
509 R: Stream<Item = T, Error = Error> + Send + 'static,
510 T: Send + 'static,
511 {
512 let ping_before_query = match self.context.options.get() {
513 Ok(val) => val.ping_before_query,
514 Err(err) => return Box::new(stream::once(Err(err))),
515 };
516
517 if ping_before_query {
518 let fut = self
519 .check_connection()
520 .and_then(move |c| future::ok(Box::new(f(c))))
521 .flatten_stream();
522 Box::new(fut)
523 } else {
524 Box::new(f(self))
525 }
526 }
527
528 pub fn check_connection(mut self) -> impl Future<Item = Self, Error = Error> {
530 let pool: Option<Pool> = self.pool.clone().into();
531 self.pool.detach();
532
533 let source = self.context.options.clone();
534
535 let (send_retries, retry_timeout) = match source.get() {
536 Ok(val) => (val.send_retries, val.retry_timeout),
537 Err(err) => return Either::Left(future::err(err)),
538 };
539
540 let reconnect = move || -> BoxFuture<Self> {
541 warn!("[reconnect]");
542 match pool.clone() {
543 None => Box::new(Client::open(&source, None)),
544 Some(p) => Box::new(p.get_handle()),
545 }
546 };
547
548 let fut = RetryGuard::new(
549 self,
550 |c| Box::new(c.ping()),
551 reconnect,
552 send_retries,
553 retry_timeout,
554 )
555 .and_then(|mut c| {
556 if !c.pool.is_attached() && c.pool.is_some() {
557 c.pool.attach();
558 }
559 Ok(c)
560 });
561
562 Either::Right(fut)
563 }
564
565 pub(crate) fn set_inside(&self, value: bool) {
566 if let Some(ref inner) = self.inner {
567 inner.set_inside(value);
568 } else {
569 unreachable!()
570 }
571 }
572}
573
574fn column_name_to_string(name: &str) -> Result<String, Error> {
575 if name.chars().all(|ch| ch.is_alphanumeric()) {
576 return Ok(name.to_string());
577 }
578
579 if name.chars().any(|ch| ch == '`') {
580 return Err(Error::Other("Column name shouldn't contains backticks.".into()));
581 }
582
583 Ok(format!("`{}`", name))
584}
585
586pub(crate) fn with_timeout<F>(
587 f: F,
588 timeout: Option<Duration>,
589) -> impl Future<Item = F::Item, Error = Error>
590where
591 F: Future<Error = Error> + Send + 'static,
592{
593 if let Some(timeout) = timeout {
594 Either::Left(f.timeout(timeout).map_err(|err| err.into()))
595 } else {
596 Either::Right(f)
597 }
598}
599
600#[cfg(test)]
601mod test_misc {
602 use crate::*;
603 use std::env;
604
605 lazy_static! {
606 pub static ref DATABASE_URL: String = env::var("DATABASE_URL").unwrap_or_else(|_| {
607 "tcp://localhost:9000?compression=lz4&ping_timeout=5s&retry_timeout=5s".into()
608 });
609 }
610
611 #[test]
612 fn test_column_name_to_string() {
613 assert_eq!(column_name_to_string("id").unwrap(), "id");
614 assert_eq!(column_name_to_string("ns:attr").unwrap(), "`ns:attr`");
615 assert!(column_name_to_string("`").is_err());
616 }
617}