1use std::fmt;
2
3use crate::{
4 block::Block,
5 error::{Error, Result},
6 pool::{Pool, PoolBinding, PoolConfig},
7 query::{block_stream::BlockStream, *},
8 stream::ConnectingStream,
9 transport::ClickhouseTransport,
10 types::{Cmd, Packet, ServerInfo},
11};
12use futures_core::{future::BoxFuture, stream::BoxStream};
13use futures_util::{FutureExt, StreamExt};
14use log::{info, warn};
15
16const MAX_RETRY_ATTEMTS: usize = 3;
18
19pub(crate) const PING_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
20pub(crate) const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
22
23pub struct Client {
24 _private: std::marker::PhantomData<()>,
25}
26
27#[derive(Clone)]
28pub(crate) struct Context {
29 pub(crate) server_info: ServerInfo,
30 pub(crate) hostname: String,
31 pub(crate) config: PoolConfig,
32}
33
34impl Default for Context {
35 fn default() -> Self {
36 Self {
37 server_info: ServerInfo::default(),
38 hostname: hostname::get().unwrap().into_string().unwrap(),
39 config: PoolConfig::default(),
40 }
41 }
42}
43
44pub struct ClientHandle {
46 pub(crate) inner: Option<ClickhouseTransport>,
47 pub(crate) context: Context,
48 pub(crate) pool: PoolBinding,
49}
50
51impl fmt::Debug for ClientHandle {
52 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
53 f.debug_struct("ClientHandle")
54 .field("server_info", &self.context.server_info)
55 .finish()
56 }
57}
58
59impl Client {
60 pub(crate) async fn open(config: PoolConfig, pool: Option<Pool>) -> Result<ClientHandle> {
61 let timeout = match config.connection_timeout {
62 Some(timeout) => timeout,
63 None => {
64 return Err(Error::Other(
65 "Connection timeout was not set on `PoolConfig`".into(),
66 ))
67 }
68 };
69
70 let context = Context {
71 config: config.clone(),
72 ..Default::default()
73 };
74
75 with_timeout(
76 async move {
77 let addr = match &pool {
78 None => &config.addr,
79 Some(p) => p.get_addr(),
80 };
81
82 info!("try to connect to {}", addr);
83 if addr.port() == Some(8123) {
84 warn!("You should use port 9000 instead of 8123 because clickhouse-rs work through the binary interface.");
85 }
86 let mut stream = ConnectingStream::new(addr, &config).await?;
87 stream.set_nodelay(true)?;
88
89 let transport = ClickhouseTransport::new(stream, pool.clone());
90
91 let mut handle = ClientHandle {
92 inner: Some(transport),
93 pool: match pool {
94 None => PoolBinding::None,
95 Some(p) => PoolBinding::Detached(p),
96 },
97 context
98 };
99
100 handle.hello().await?;
101 Ok(handle)
102 },
103 timeout,
104 )
105 .await
106 }
107}
108
109impl ClientHandle {
110 async fn hello(&mut self) -> Result<()> {
111 let context = self.context.clone();
112 info!(
113 "[hello] -> \n{:?}\n{:?}\n{:?}\n{:?}\n{:?}",
114 &context.hostname,
115 &context.server_info,
116 &context.config.addr.host(),
117 &context.config.database,
118 &context.config.username,
119 );
120
121 let mut h = None;
122 let mut info = None;
123 let mut stream = self.inner.take().unwrap().call(Cmd::Hello(context.clone()));
124
125 while let Some(packet) = stream.next().await {
126 match packet {
127 Ok(Packet::Hello(inner, server_info)) => {
128 info!("[hello] <- {:?}", &server_info);
129 h = Some(inner);
130 info = Some(server_info);
131 }
132 Ok(Packet::Exception(e)) => return Err(Error::Server(e)),
133 Err(e) => return Err(Error::IO(e)),
134 _ => {}
135 }
136 }
137
138 self.inner = h;
139 self.context.server_info = info.unwrap();
140 Ok(())
141 }
142
143 async fn ping(&mut self) -> Result<()> {
144 with_timeout(
145 async move {
146 info!("[ping]");
147
148 let mut h = None;
149
150 let transport = self.inner.take().unwrap().clear().await?;
151 let mut stream = transport.call(Cmd::Ping);
152
153 while let Some(packet) = stream.next().await {
154 match packet {
155 Ok(Packet::Pong(inner)) => {
156 info!("[pong]");
157 h = Some(inner);
158 }
159 Ok(Packet::Exception(e)) => return Err(Error::Server(e)),
160 Err(e) => return Err(Error::IO(e)),
161 _ => {}
162 }
163 }
164
165 self.inner = h;
166 Ok(())
167 },
168 PING_TIMEOUT,
169 )
170 .await
171 }
172
173 pub fn query<Q>(&mut self, sql: Q) -> QueryResult
175 where
176 Query: From<Q>,
177 {
178 let query = Query::from(sql);
179 QueryResult {
180 client: self,
181 query,
182 }
183 }
184
185 pub(crate) fn wrap_stream<'a, F>(&'a mut self, f: F) -> BoxStream<'a, Result<Block>>
186 where
187 F: (FnOnce(&'a mut Self) -> BlockStream<'a>) + Send + 'static,
188 {
189 let fut: BoxFuture<'a, BoxStream<'a, Result<Block>>> = Box::pin(async move {
190 let inner: BoxStream<'a, Result<Block>> = match self.check_connection().await {
191 Ok(_) => Box::pin(f(self)),
192 Err(err) => Box::pin(futures_util::stream::once(futures_util::future::err(err))),
193 };
194 inner
195 });
196
197 Box::pin(fut.flatten_stream())
198 }
199
200 async fn check_connection(&mut self) -> Result<()> {
202 self.pool.detach();
203
204 let source = self.context.config.clone();
205 let pool = self.pool.clone();
206
207 retry(self, &source, pool.into()).await?;
208
209 if !self.pool.is_attached() && self.pool.is_some() {
210 self.pool.attach();
211 }
212
213 Ok(())
214 }
215
216 pub(crate) fn set_inside(&self, value: bool) {
218 if let Some(ref inner) = self.inner {
219 inner.set_inside(value);
220 } else {
221 unreachable!()
222 }
223 }
224}
225
226pub(crate) async fn retry(
227 handle: &mut ClientHandle,
228 source: &PoolConfig,
229 pool: Option<Pool>,
230) -> Result<()> {
231 let mut attempt = 0;
232 let mut skip_check = false;
233
234 loop {
235 if skip_check {
236 skip_check = false;
237 } else {
238 match handle.ping().await {
239 Ok(()) => return Ok(()),
240 Err(err) => {
241 if attempt >= MAX_RETRY_ATTEMTS {
242 return Err(err);
243 }
244 }
245 }
246 }
247
248 match reconnect(handle, source, pool.clone()).await {
249 Ok(()) => continue,
250 Err(err) => {
251 skip_check = true;
252 if attempt >= MAX_RETRY_ATTEMTS {
253 return Err(err);
254 }
255
256 tokio::time::sleep(RETRY_TIMEOUT).await;
257 }
258 }
259
260 attempt += 1;
261 }
262}
263
264async fn reconnect(conn: &mut ClientHandle, source: &PoolConfig, pool: Option<Pool>) -> Result<()> {
265 warn!("[reconnect]");
266 let mut new_conn = match pool {
267 None => Client::open(source.clone(), pool).await?,
268 Some(p) => p.get_handle().await?,
269 };
270 std::mem::swap(conn, &mut new_conn);
271 Ok(())
272}
273
274pub(crate) async fn with_timeout<F, T>(future: F, timeout: std::time::Duration) -> F::Output
275where
276 F: std::future::Future<Output = crate::error::Result<T>>,
277{
278 tokio::time::timeout(timeout, future).await?
279}