1#![recursion_limit = "1024"]
111
112#[cfg(all(feature = "tls-native-tls", feature = "tls-rustls"))]
113compile_error!("tls-native-tls and tls-rustls are mutually exclusive and cannot be enabled together");
114
115use std::{fmt, future::Future, time::Duration};
116
117use futures_util::{
118 future, future::BoxFuture, future::FutureExt, stream, stream::BoxStream, StreamExt,
119};
120use log::{info, warn};
121
122use crate::{
123 connecting_stream::ConnectingStream,
124 errors::{DriverError, Error, Result},
125 io::ClickhouseTransport,
126 pool::PoolBinding,
127 retry_guard::retry_guard,
128 types::{
129 block::{ChunkIterator, INSERT_BLOCK_SIZE},
130 query_result::stream_blocks::BlockStream,
131 Cmd, Context, IntoOptions, OptionsSource, Packet, Query, QueryResult, SqlType,
132 },
133};
134pub use crate::{
135 errors::ConnectionError,
136 pool::Pool,
137 types::{block::Block, Options, Simple},
138};
139
140mod binary;
141mod client_info;
142mod connecting_stream;
143pub mod errors;
145mod io;
146pub mod pool;
148mod retry_guard;
149pub mod types;
151
152#[macro_export]
203macro_rules! row {
204 () => { $crate::types::RNil };
205 ( $i:ident, $($tail:tt)* ) => {
206 row!( $($tail)* ).put(stringify!($i).into(), $i.into())
207 };
208 ( $i:ident ) => { row!($i: $i) };
209
210 ( $k:ident: $v:expr ) => {
211 $crate::types::RNil.put(stringify!($k).into(), $v.into())
212 };
213
214 ( $k:ident: $v:expr, $($tail:tt)* ) => {
215 row!( $($tail)* ).put(stringify!($k).into(), $v.into())
216 };
217
218 ( $k:expr => $v:expr ) => {
219 $crate::types::RNil.put($k.into(), $v.into())
220 };
221
222 ( $k:expr => $v:expr, $($tail:tt)* ) => {
223 row!( $($tail)* ).put($k.into(), $v.into())
224 };
225}
226
227#[macro_export]
228macro_rules! try_opt {
229 ($expr:expr) => {
230 match $expr {
231 Ok(val) => val,
232 Err(err) => return Err(err),
233 }
234 };
235}
236
237#[doc(hidden)]
238pub struct Client {
239 _private: (),
240}
241
242pub struct ClientHandle {
244 inner: Option<ClickhouseTransport>,
245 context: Context,
246 pool: PoolBinding,
247}
248
249impl fmt::Debug for ClientHandle {
250 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
251 f.debug_struct("ClientHandle")
252 .field("server_info", &self.context.server_info)
253 .finish()
254 }
255}
256
257impl Client {
258 #[deprecated(since = "0.1.4", note = "please use Pool to connect")]
259 pub async fn connect(options: Options) -> Result<ClientHandle> {
260 let source = options.into_options_src();
261 Self::open(source, None).await
262 }
263
264 pub(crate) async fn open(source: OptionsSource, pool: Option<Pool>) -> Result<ClientHandle> {
265 let options = try_opt!(source.get());
266 let compress = options.compression;
267 let timeout = options.connection_timeout;
268
269 let context = Context {
270 options: source.clone(),
271 ..Context::default()
272 };
273
274 with_timeout(
275 async move {
276 let addr = match &pool {
277 None => &options.addr,
278 Some(p) => p.get_addr(),
279 };
280
281 info!("try to connect to {}", addr);
282 if addr.port() == Some(8123) {
283 warn!("You should use port 9000 instead of 8123 because clickhouse-rs work through the binary interface.");
284 }
285 let mut stream = ConnectingStream::new(addr, &options).await?;
286 stream.set_nodelay(options.nodelay)?;
287 stream.set_keepalive(options.keepalive)?;
288
289 let transport = ClickhouseTransport::new(stream, compress, pool.clone());
290 let mut handle = ClientHandle {
291 inner: Some(transport),
292 context,
293 pool: match pool {
294 None => PoolBinding::None,
295 Some(p) => PoolBinding::Detached(p),
296 },
297 };
298
299 handle.hello().await?;
300 Ok(handle)
301 },
302 timeout,
303 )
304 .await
305 }
306}
307
308impl ClientHandle {
309 pub(crate) async fn hello(&mut self) -> Result<()> {
310 let context = self.context.clone();
311 info!("[hello] -> {:?}", &context);
312
313 let mut h = None;
314 let mut info = None;
315 let mut stream = self.inner.take().unwrap().call(Cmd::Hello(context.clone()));
316
317 while let Some(packet) = stream.next().await {
318 match packet {
319 Ok(Packet::Hello(inner, server_info)) => {
320 info!("[hello] <- {:?}", &server_info);
321 h = Some(inner);
322 info = Some(server_info);
323 }
324 Ok(Packet::Exception(e)) => return Err(Error::Server(e)),
325 Err(e) => return Err(Error::Io(e)),
326 _ => return Err(Error::Driver(DriverError::UnexpectedPacket)),
327 }
328 }
329
330 self.inner = h;
331 self.context.server_info = info.unwrap();
332 Ok(())
333 }
334
335 pub async fn ping(&mut self) -> Result<()> {
336 let timeout = try_opt!(self.context.options.get()).ping_timeout;
337
338 with_timeout(
339 async move {
340 info!("[ping]");
341
342 let mut h = None;
343
344 let transport = self.get_inner()?.clear().await?;
345 let mut stream = transport.call(Cmd::Ping);
346
347 while let Some(packet) = stream.next().await {
348 match packet {
349 Ok(Packet::Pong(inner)) => {
350 info!("[pong]");
351 h = Some(inner);
352 }
353 Ok(Packet::Exception(e)) => return Err(Error::Server(e)),
354 Err(e) => return Err(Error::Io(e)),
355 _ => return Err(Error::Driver(DriverError::UnexpectedPacket)),
356 }
357 }
358
359 match h {
360 None => Err(Error::Connection(ConnectionError::Broken)),
361 Some(h) => {
362 self.inner = Some(h);
363 Ok(())
364 }
365 }
366 },
367 timeout,
368 )
369 .await
370 }
371
372 pub fn query<Q>(&mut self, sql: Q) -> QueryResult
374 where
375 Query: From<Q>,
376 {
377 let query = Query::from(sql);
378 QueryResult {
379 client: self,
380 query,
381 }
382 }
383
384 pub async fn execute<Q>(&mut self, sql: Q) -> Result<()>
386 where
387 Query: From<Q>,
388 {
389 let transport = self.execute_(sql).await?;
390 self.inner = Some(transport);
391 Ok(())
392 }
393
394 async fn execute_<Q>(&mut self, sql: Q) -> Result<ClickhouseTransport>
395 where
396 Query: From<Q>,
397 {
398 let timeout = try_opt!(self.context.options.get())
399 .execute_timeout
400 .unwrap_or_else(|| Duration::from_secs(0));
401 let context = self.context.clone();
402 let query = Query::from(sql);
403 with_timeout(
404 async {
405 self.wrap_future(move |c| {
406 info!("[execute query] {}", query.get_sql());
407
408 let transport = c.get_inner();
409
410 async move {
411 let transport = transport?;
412 let mut h = None;
413
414 let transport = transport.clear().await?;
415 let mut stream = transport.call(Cmd::SendQuery(query, context.clone()));
416
417 while let Some(packet) = stream.next().await {
418 match packet {
419 Ok(Packet::Eof(inner)) => h = Some(inner),
420 Ok(Packet::Block(_))
421 | Ok(Packet::ProfileInfo(_))
422 | Ok(Packet::Progress(_)) => (),
423 Ok(Packet::Exception(e)) => return Err(Error::Server(e)),
424 Err(e) => return Err(Error::Io(e)),
425 _ => return Err(Error::Driver(DriverError::UnexpectedPacket)),
426 }
427 }
428
429 Ok(h.unwrap())
430 }
431 })
432 .await
433 },
434 timeout,
435 )
436 .await
437 }
438
439 pub async fn insert<Q, B>(&mut self, table: Q, block: B) -> Result<()>
441 where
442 Query: From<Q>,
443 B: AsRef<Block>,
444 {
445 let query = Self::make_query(table, block.as_ref())?;
446 let transport = self.insert_(query.clone(), block.as_ref()).await?;
447 self.inner = Some(transport);
448 Ok(())
449 }
450
451 async fn insert_(&mut self, query: Query, block: &Block) -> Result<ClickhouseTransport> {
452 let timeout = try_opt!(self.context.options.get())
453 .insert_timeout
454 .unwrap_or_else(|| Duration::from_secs(0));
455
456 let context = self.context.clone();
457
458 with_timeout(
459 async {
460 self.wrap_future(move |c| {
461 info!("[insert] {}", query.get_sql());
462 let transport = c.get_inner();
463
464 async move {
465 let transport = transport?.clear().await?;
466 let (transport, dst_block) =
467 Self::send_insert_query_(transport, context.clone(), query.clone())
468 .await?;
469 let casted_block = block.cast_to(&dst_block)?;
470 let mut chunks = casted_block.chunks(INSERT_BLOCK_SIZE);
471 let transport =
472 Self::insert_block_(transport, context.clone(), chunks.next().unwrap())
473 .await?;
474 Self::insert_tail_(transport, context, query, chunks).await
475 }
476 })
477 .await
478 },
479 timeout,
480 )
481 .await
482 }
483
484 async fn insert_tail_(
485 mut transport: ClickhouseTransport,
486 context: Context,
487 query: Query,
488 chunks: ChunkIterator<Simple>,
489 ) -> Result<ClickhouseTransport> {
490 for chunk in chunks {
491 let (transport_, _) =
492 Self::send_insert_query_(transport, context.clone(), query.clone()).await?;
493 transport = Self::insert_block_(transport_, context.clone(), chunk).await?;
494 }
495 Ok(transport)
496 }
497
498 async fn send_insert_query_(
499 transport: ClickhouseTransport,
500 context: Context,
501 query: Query,
502 ) -> Result<(ClickhouseTransport, Block)> {
503 let stream = transport.call(Cmd::SendQuery(query, context));
504 let (transport, b) = stream.read_block().await?;
505 let dst_block = b.unwrap();
506 Ok((transport, dst_block))
507 }
508
509 async fn insert_block_(
510 transport: ClickhouseTransport,
511 context: Context,
512 block: Block,
513 ) -> Result<ClickhouseTransport> {
514 let send_cmd = Cmd::Union(
515 Box::new(Cmd::SendData(block, context.clone())),
516 Box::new(Cmd::SendData(Block::default(), context)),
517 );
518 let (transport, _) = transport.call(send_cmd).read_block().await?;
519 Ok(transport)
520 }
521
522 fn make_query<Q>(table: Q, block: &Block) -> Result<Query>
523 where
524 Query: From<Q>,
525 {
526 let mut names: Vec<_> = Vec::with_capacity(block.as_ref().column_count());
527 for column in block.as_ref().columns() {
528 names.push(try_opt!(column_name_to_string(column.name())));
529 }
530 let fields = names.join(", ");
531 Ok(Query::from(table).map_sql(|table| format!("INSERT INTO {table} ({fields}) VALUES")))
532 }
533
534 pub(crate) async fn wrap_future<T, R, F>(&mut self, f: F) -> Result<T>
535 where
536 F: FnOnce(&mut Self) -> R + Send,
537 R: Future<Output = Result<T>>,
538 T: 'static,
539 {
540 let ping_before_query = try_opt!(self.context.options.get()).ping_before_query;
541
542 if ping_before_query {
543 self.check_connection().await?;
544 }
545 f(self).await
546 }
547
548 pub(crate) fn wrap_stream<'a, F>(&'a mut self, f: F) -> BoxStream<'a, Result<Block>>
549 where
550 F: (FnOnce(&'a mut Self) -> Result<BlockStream<'a>>) + Send + 'static,
551 {
552 let ping_before_query = match self.context.options.get() {
553 Ok(val) => val.ping_before_query,
554 Err(err) => return Box::pin(stream::once(future::err(err))),
555 };
556
557 if ping_before_query {
558 let fut: BoxFuture<'a, BoxStream<'a, Result<Block>>> = Box::pin(async move {
559 let inner: BoxStream<'a, Result<Block>> = match self.check_connection().await {
560 Ok(_) => match f(self) {
561 Ok(s) => Box::pin(s),
562 Err(err) => Box::pin(stream::once(future::err(err))),
563 },
564 Err(err) => Box::pin(stream::once(future::err(err))),
565 };
566 inner
567 });
568
569 Box::pin(fut.flatten_stream())
570 } else {
571 match f(self) {
572 Ok(s) => Box::pin(s),
573 Err(err) => Box::pin(stream::once(future::err(err))),
574 }
575 }
576 }
577
578 pub async fn check_connection(&mut self) -> Result<()> {
580 self.pool.detach();
581
582 let source = self.context.options.clone();
583 let pool = self.pool.clone();
584
585 let (send_retries, retry_timeout) = {
586 let options = try_opt!(source.get());
587 (options.send_retries, options.retry_timeout)
588 };
589
590 retry_guard(self, &source, pool.into(), send_retries, retry_timeout).await?;
591
592 if !self.pool.is_attached() && self.pool.is_some() {
593 self.pool.attach();
594 }
595
596 Ok(())
597 }
598
599 pub(crate) fn set_inside(&self, value: bool) {
600 if let Some(ref inner) = self.inner {
601 inner.set_inside(value);
602 } else {
603 unreachable!()
604 }
605 }
606
607 fn get_inner(&mut self) -> Result<ClickhouseTransport> {
608 self.inner
609 .take()
610 .ok_or_else(|| Error::Connection(ConnectionError::Broken))
611 }
612}
613
614fn column_name_to_string(name: &str) -> Result<String> {
615 if name.chars().all(|ch| ch.is_numeric()) {
616 return Ok(name.to_string());
617 }
618
619 if name.chars().any(|ch| ch == '`') {
620 let err = format!("Column name {name:?} shouldn't contains backticks.");
621 return Err(Error::Other(err.into()));
622 }
623
624 Ok(format!("`{name}`"))
625}
626
627#[cfg(feature = "async_std")]
628async fn with_timeout<F, T>(future: F, duration: Duration) -> F::Output
629where
630 F: Future<Output = Result<T>>,
631{
632 use async_std::io;
633 use futures_util::future::TryFutureExt;
634
635 io::timeout(duration, future.map_err(Into::into))
636 .map_err(Into::into)
637 .await
638}
639
640#[cfg(not(feature = "async_std"))]
641async fn with_timeout<F, T>(future: F, timeout: Duration) -> F::Output
642where
643 F: Future<Output = Result<T>>,
644{
645 tokio::time::timeout(timeout, future).await?
646}
647
648#[cfg(test)]
649pub(crate) mod test_misc {
650 use crate::*;
651 use std::env;
652
653 use lazy_static::lazy_static;
654
655 lazy_static! {
656 pub static ref DATABASE_URL: String = env::var("DATABASE_URL").unwrap_or_else(|_| {
657 "tcp://localhost:9000?compression=lz4&ping_timeout=1s&retry_timeout=2s".into()
658 });
659 }
660
661 #[test]
662 fn test_column_name_to_string() {
663 assert_eq!(column_name_to_string("id").unwrap(), "`id`");
664 assert_eq!(column_name_to_string("234").unwrap(), "234");
665 assert_eq!(column_name_to_string("ns:attr").unwrap(), "`ns:attr`");
666 assert!(column_name_to_string("`").is_err());
667 }
668}