clickhouse_readonly/
client.rs

1use std::fmt;
2
3use crate::{
4    block::Block,
5    error::{Error, Result},
6    pool::{Pool, PoolBinding, PoolConfig},
7    query::{block_stream::BlockStream, *},
8    stream::ConnectingStream,
9    transport::ClickhouseTransport,
10    types::{Cmd, Packet, ServerInfo},
11};
12use futures_core::{future::BoxFuture, stream::BoxStream};
13use futures_util::{FutureExt, StreamExt};
14use log::{info, warn};
15
/// Retry guard max attempts: the bound the attempt counter in `retry` is
/// compared against before the last error is returned to the caller.
const MAX_RETRY_ATTEMTS: usize = 3;

/// Time limit handed to `with_timeout` for one `ClientHandle::ping` exchange.
pub(crate) const PING_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
/// Retry guard timeout between attempts: how long `retry` sleeps after a
/// failed reconnect before trying again.
pub(crate) const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
22
/// Namespace type for establishing connections; see `Client::open`.
///
/// It holds no data. The private `PhantomData` field only prevents code
/// outside this module from building a `Client` with a struct literal.
pub struct Client {
    _private: std::marker::PhantomData<()>,
}
26
/// Per-connection state that is cloned into the `Cmd::Hello` command.
#[derive(Clone)]
pub(crate) struct Context {
    /// Server description; overwritten with the value received in the
    /// server's `Hello` packet once the handshake completes.
    pub(crate) server_info: ServerInfo,
    /// Host name of the local machine (filled in by `Context::default`).
    pub(crate) hostname: String,
    /// Configuration the connection was opened with.
    pub(crate) config: PoolConfig,
}
33
34impl Default for Context {
35    fn default() -> Self {
36        Self {
37            server_info: ServerInfo::default(),
38            hostname: hostname::get().unwrap().into_string().unwrap(),
39            config: PoolConfig::default(),
40        }
41    }
42}
43
/// Clickhouse client handle.
pub struct ClientHandle {
    /// Underlying transport. The `hello` and `ping` methods take it out of the
    /// option while a command is running and store the transport returned by
    /// the server reply back afterwards.
    pub(crate) inner: Option<ClickhouseTransport>,
    /// Connection context (server info, local host name, configuration).
    pub(crate) context: Context,
    /// Binding to the pool this handle was created for, if any.
    pub(crate) pool: PoolBinding,
}
50
51impl fmt::Debug for ClientHandle {
52    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
53        f.debug_struct("ClientHandle")
54            .field("server_info", &self.context.server_info)
55            .finish()
56    }
57}
58
59impl Client {
60    pub(crate) async fn open(config: PoolConfig, pool: Option<Pool>) -> Result<ClientHandle> {
61        let timeout = match config.connection_timeout {
62            Some(timeout) => timeout,
63            None => {
64                return Err(Error::Other(
65                    "Connection timeout was not set on `PoolConfig`".into(),
66                ))
67            }
68        };
69
70        let context = Context {
71            config: config.clone(),
72            ..Default::default()
73        };
74
75        with_timeout(
76            async move {
77                let addr = match &pool {
78                    None => &config.addr,
79                    Some(p) => p.get_addr(),
80                };
81
82                info!("try to connect to {}", addr);
83                if addr.port() == Some(8123) {
84                    warn!("You should use port 9000 instead of 8123 because clickhouse-rs work through the binary interface.");
85                }
86                let mut stream = ConnectingStream::new(addr, &config).await?;
87                stream.set_nodelay(true)?;
88
89                let transport = ClickhouseTransport::new(stream, pool.clone());
90
91                let mut handle = ClientHandle {
92                    inner: Some(transport),
93                    pool: match pool {
94                        None => PoolBinding::None,
95                        Some(p) => PoolBinding::Detached(p),
96                    },
97                    context
98                };
99
100                handle.hello().await?;
101                Ok(handle)
102            },
103            timeout,
104        )
105        .await
106    }
107}
108
109impl ClientHandle {
110    async fn hello(&mut self) -> Result<()> {
111        let context = self.context.clone();
112        info!(
113            "[hello] -> \n{:?}\n{:?}\n{:?}\n{:?}\n{:?}",
114            &context.hostname,
115            &context.server_info,
116            &context.config.addr.host(),
117            &context.config.database,
118            &context.config.username,
119        );
120
121        let mut h = None;
122        let mut info = None;
123        let mut stream = self.inner.take().unwrap().call(Cmd::Hello(context.clone()));
124
125        while let Some(packet) = stream.next().await {
126            match packet {
127                Ok(Packet::Hello(inner, server_info)) => {
128                    info!("[hello] <- {:?}", &server_info);
129                    h = Some(inner);
130                    info = Some(server_info);
131                }
132                Ok(Packet::Exception(e)) => return Err(Error::Server(e)),
133                Err(e) => return Err(Error::IO(e)),
134                _ => {}
135            }
136        }
137
138        self.inner = h;
139        self.context.server_info = info.unwrap();
140        Ok(())
141    }
142
143    async fn ping(&mut self) -> Result<()> {
144        with_timeout(
145            async move {
146                info!("[ping]");
147
148                let mut h = None;
149
150                let transport = self.inner.take().unwrap().clear().await?;
151                let mut stream = transport.call(Cmd::Ping);
152
153                while let Some(packet) = stream.next().await {
154                    match packet {
155                        Ok(Packet::Pong(inner)) => {
156                            info!("[pong]");
157                            h = Some(inner);
158                        }
159                        Ok(Packet::Exception(e)) => return Err(Error::Server(e)),
160                        Err(e) => return Err(Error::IO(e)),
161                        _ => {}
162                    }
163                }
164
165                self.inner = h;
166                Ok(())
167            },
168            PING_TIMEOUT,
169        )
170        .await
171    }
172
173    /// Executes Clickhouse `query` on Conn.
174    pub fn query<Q>(&mut self, sql: Q) -> QueryResult
175    where
176        Query: From<Q>,
177    {
178        let query = Query::from(sql);
179        QueryResult {
180            client: self,
181            query,
182        }
183    }
184
185    pub(crate) fn wrap_stream<'a, F>(&'a mut self, f: F) -> BoxStream<'a, Result<Block>>
186    where
187        F: (FnOnce(&'a mut Self) -> BlockStream<'a>) + Send + 'static,
188    {
189        let fut: BoxFuture<'a, BoxStream<'a, Result<Block>>> = Box::pin(async move {
190            let inner: BoxStream<'a, Result<Block>> = match self.check_connection().await {
191                Ok(_) => Box::pin(f(self)),
192                Err(err) => Box::pin(futures_util::stream::once(futures_util::future::err(err))),
193            };
194            inner
195        });
196
197        Box::pin(fut.flatten_stream())
198    }
199
200    /// Check connection and try to reconnect if necessary.
201    async fn check_connection(&mut self) -> Result<()> {
202        self.pool.detach();
203
204        let source = self.context.config.clone();
205        let pool = self.pool.clone();
206
207        retry(self, &source, pool.into()).await?;
208
209        if !self.pool.is_attached() && self.pool.is_some() {
210            self.pool.attach();
211        }
212
213        Ok(())
214    }
215
216    /// Switch Transport AtomicUsize status on takeing/returning connection
217    pub(crate) fn set_inside(&self, value: bool) {
218        if let Some(ref inner) = self.inner {
219            inner.set_inside(value);
220        } else {
221            unreachable!()
222        }
223    }
224}
225
226pub(crate) async fn retry(
227    handle: &mut ClientHandle,
228    source: &PoolConfig,
229    pool: Option<Pool>,
230) -> Result<()> {
231    let mut attempt = 0;
232    let mut skip_check = false;
233
234    loop {
235        if skip_check {
236            skip_check = false;
237        } else {
238            match handle.ping().await {
239                Ok(()) => return Ok(()),
240                Err(err) => {
241                    if attempt >= MAX_RETRY_ATTEMTS {
242                        return Err(err);
243                    }
244                }
245            }
246        }
247
248        match reconnect(handle, source, pool.clone()).await {
249            Ok(()) => continue,
250            Err(err) => {
251                skip_check = true;
252                if attempt >= MAX_RETRY_ATTEMTS {
253                    return Err(err);
254                }
255
256                tokio::time::sleep(RETRY_TIMEOUT).await;
257            }
258        }
259
260        attempt += 1;
261    }
262}
263
264async fn reconnect(conn: &mut ClientHandle, source: &PoolConfig, pool: Option<Pool>) -> Result<()> {
265    warn!("[reconnect]");
266    let mut new_conn = match pool {
267        None => Client::open(source.clone(), pool).await?,
268        Some(p) => p.get_handle().await?,
269    };
270    std::mem::swap(conn, &mut new_conn);
271    Ok(())
272}
273
274pub(crate) async fn with_timeout<F, T>(future: F, timeout: std::time::Duration) -> F::Output
275where
276    F: std::future::Future<Output = crate::error::Result<T>>,
277{
278    tokio::time::timeout(timeout, future).await?
279}