clickhouse_readonly/pool/
mod.rs

1use std::{
2    fmt, mem,
3    pin::Pin,
4    sync::atomic::{self, Ordering},
5    sync::Arc,
6    task::{Context, Poll, Waker},
7    time::Duration,
8};
9
10use futures_util::future::BoxFuture;
11
12use crate::{
13    client::{Client, ClientHandle},
14    error::Result,
15};
16
17pub use self::futures::GetHandle;
18use futures_util::FutureExt;
19use url::Url;
20
21mod futures;
22
23/// Default connection timeout
24const CONN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
25/// Default connection timeout
26const QUERY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
27
28#[derive(Debug, Clone)]
29pub struct PoolConfig {
30    pub(crate) addr: Url,
31    pub(crate) database: String,
32    pub(crate) username: String,
33    pub(crate) password: String,
34    pub(crate) connection_timeout: Option<Duration>,
35    pub(crate) query_timeout: Option<Duration>,
36    pub(crate) secure: bool,
37}
38
39pub struct PoolConfigBuilder(PoolConfig);
40
41impl PoolConfigBuilder {
42    pub fn new(
43        addr: Url,
44        database: String,
45        username: String,
46        password: String,
47        secure: bool,
48    ) -> Self {
49        Self(PoolConfig {
50            addr,
51            database,
52            username,
53            password,
54            connection_timeout: None,
55            query_timeout: None,
56            secure,
57        })
58    }
59
60    pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
61        self.0.connection_timeout = Some(timeout);
62        self
63    }
64
65    pub fn with_query_timeout(mut self, timeout: Duration) -> Self {
66        self.0.query_timeout = Some(timeout);
67        self
68    }
69
70    pub fn build(mut self) -> PoolConfig {
71        if self.0.connection_timeout.is_none() {
72            self.0.connection_timeout = Some(CONN_TIMEOUT)
73        }
74
75        if self.0.query_timeout.is_none() {
76            self.0.query_timeout = Some(QUERY_TIMEOUT)
77        }
78
79        self.0
80    }
81}
82
83impl Default for PoolConfig {
84    fn default() -> Self {
85        Self {
86            addr: Url::parse("tcp://127.0.0.1:9000").unwrap(),
87            database: "default".to_string(),
88            username: Default::default(),
89            password: Default::default(),
90            connection_timeout: Some(CONN_TIMEOUT),
91            query_timeout: Some(QUERY_TIMEOUT),
92            secure: false,
93        }
94    }
95}
96
97#[derive(Debug)]
98pub(crate) struct Inner {
99    new: crossbeam::queue::ArrayQueue<BoxFuture<'static, Result<ClientHandle>>>,
100    idle: crossbeam::queue::ArrayQueue<ClientHandle>,
101    tasks: crossbeam::queue::SegQueue<Waker>,
102    ongoing: atomic::AtomicUsize,
103    hosts: Vec<Url>,
104    connections_num: atomic::AtomicUsize,
105}
106
107impl Inner {
108    pub(crate) fn release_conn(&self) {
109        self.ongoing.fetch_sub(1, Ordering::AcqRel);
110        while let Some(task) = self.tasks.pop() {
111            task.wake()
112        }
113    }
114
115    fn conn_count(&self) -> usize {
116        let is_new_some = self.new.len();
117        let ongoing = self.ongoing.load(Ordering::Acquire);
118        let idle_count = self.idle.len();
119        is_new_some + idle_count + ongoing
120    }
121}
122
123#[derive(Clone)]
124pub(crate) enum PoolBinding {
125    None,
126    Attached(Pool),
127    Detached(Pool),
128}
129
130impl From<PoolBinding> for Option<Pool> {
131    fn from(binding: PoolBinding) -> Self {
132        match binding {
133            PoolBinding::None => None,
134            PoolBinding::Attached(pool) | PoolBinding::Detached(pool) => Some(pool),
135        }
136    }
137}
138
139impl PoolBinding {
140    pub(crate) fn take(&mut self) -> Self {
141        mem::replace(self, PoolBinding::None)
142    }
143
144    fn return_conn(self, client: ClientHandle) {
145        if let Some(mut pool) = self.into() {
146            Pool::return_conn(&mut pool, client);
147        }
148    }
149
150    pub(crate) fn is_attached(&self) -> bool {
151        matches!(self, PoolBinding::Attached(_))
152    }
153
154    pub(crate) fn is_some(&self) -> bool {
155        !matches!(self, PoolBinding::None)
156    }
157
158    pub(crate) fn attach(&mut self) {
159        match self.take() {
160            PoolBinding::Detached(pool) => *self = PoolBinding::Attached(pool),
161            _ => unreachable!(),
162        }
163    }
164
165    pub(crate) fn detach(&mut self) {
166        match self.take() {
167            PoolBinding::Attached(pool) => *self = PoolBinding::Detached(pool),
168            _ => unreachable!(),
169        }
170    }
171}
172
173/// Asynchronous pool of Clickhouse connections.
174#[derive(Clone)]
175pub struct Pool {
176    pub(crate) config: PoolConfig,
177    pub(crate) inner: Arc<Inner>,
178    min: usize,
179    max: usize,
180}
181
182#[derive(Debug)]
183struct PoolInfo {
184    new_len: usize,
185    idle_len: usize,
186    tasks_len: usize,
187    ongoing: usize,
188}
189
190impl fmt::Debug for Pool {
191    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192        let info = self.info();
193        f.debug_struct("Pool")
194            .field("min", &self.min)
195            .field("max", &self.max)
196            .field("new connections count", &info.new_len)
197            .field("idle connections count", &info.idle_len)
198            .field("tasks count", &info.tasks_len)
199            .field("ongoing connections count", &info.ongoing)
200            .finish()
201    }
202}
203
204impl Pool {
205    /// Constructs a new Pool.
206    pub fn new(config: PoolConfig) -> Self {
207        let min = 5;
208        let max = 10;
209        let hosts = vec![config.addr.clone()];
210
211        let inner = Arc::new(Inner {
212            new: crossbeam::queue::ArrayQueue::new(1),
213            idle: crossbeam::queue::ArrayQueue::new(max),
214            tasks: crossbeam::queue::SegQueue::new(),
215            ongoing: atomic::AtomicUsize::new(0),
216            connections_num: atomic::AtomicUsize::new(0),
217            hosts,
218        });
219
220        Self {
221            config,
222            inner,
223            min,
224            max,
225        }
226    }
227
228    fn info(&self) -> PoolInfo {
229        PoolInfo {
230            new_len: self.inner.new.len(),
231            idle_len: self.inner.idle.len(),
232            tasks_len: self.inner.tasks.len(),
233            ongoing: self.inner.ongoing.load(Ordering::Acquire),
234        }
235    }
236
237    /// Returns future that resolves to `ClientHandle`.
238    pub fn get_handle(&self) -> GetHandle {
239        GetHandle::new(self)
240    }
241
242    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<ClientHandle>> {
243        self.handle_futures(cx)?;
244
245        match self.take_conn() {
246            Some(client) => Poll::Ready(Ok(client)),
247            None => {
248                let new_conn_created = {
249                    let conn_count = self.inner.conn_count();
250
251                    if conn_count < self.max && self.inner.new.push(self.new_connection()).is_ok() {
252                        true
253                    } else {
254                        self.inner.tasks.push(cx.waker().clone());
255                        false
256                    }
257                };
258                if new_conn_created {
259                    self.poll(cx)
260                } else {
261                    Poll::Pending
262                }
263            }
264        }
265    }
266
267    fn new_connection(&self) -> BoxFuture<'static, Result<ClientHandle>> {
268        let source = self.config.clone();
269        let pool = Some(self.clone());
270        Box::pin(async move { Client::open(source, pool).await })
271    }
272
273    fn handle_futures(&mut self, cx: &mut Context<'_>) -> Result<()> {
274        if let Some(mut new) = self.inner.new.pop() {
275            match new.poll_unpin(cx) {
276                Poll::Ready(Ok(client)) => {
277                    self.inner.idle.push(client).unwrap();
278                }
279                Poll::Pending => {
280                    // NOTE: it is okay to drop the construction task
281                    // because another construction will be attempted
282                    // later in Pool::poll
283                    let _ = self.inner.new.push(new);
284                }
285                Poll::Ready(Err(err)) => {
286                    return Err(err);
287                }
288            }
289        }
290
291        Ok(())
292    }
293
294    fn take_conn(&mut self) -> Option<ClientHandle> {
295        if let Some(mut client) = self.inner.idle.pop() {
296            client.pool = PoolBinding::Attached(self.clone());
297            client.set_inside(false);
298            self.inner.ongoing.fetch_add(1, Ordering::AcqRel);
299            Some(client)
300        } else {
301            None
302        }
303    }
304
305    fn return_conn(&mut self, mut client: ClientHandle) {
306        let min = self.min;
307
308        let is_attached = client.pool.is_attached();
309        client.pool = PoolBinding::None;
310        client.set_inside(true);
311
312        if self.inner.idle.len() < min && is_attached {
313            let _ = self.inner.idle.push(client);
314        }
315        self.inner.ongoing.fetch_sub(1, Ordering::AcqRel);
316
317        while let Some(task) = self.inner.tasks.pop() {
318            task.wake()
319        }
320    }
321
322    pub(crate) fn get_addr(&self) -> &Url {
323        let n = self.inner.hosts.len();
324        let index = self.inner.connections_num.fetch_add(1, Ordering::SeqCst);
325        &self.inner.hosts[index % n]
326    }
327}
328
329impl Drop for ClientHandle {
330    fn drop(&mut self) {
331        if let (pool, Some(inner)) = (self.pool.take(), self.inner.take()) {
332            if !pool.is_some() {
333                return;
334            }
335
336            let context = self.context.clone();
337            let client = Self {
338                inner: Some(inner),
339                pool: pool.clone(),
340                context,
341            };
342            pool.return_conn(client);
343        }
344    }
345}