clickhouse_readonly/pool/
mod.rs1use std::{
2 fmt, mem,
3 pin::Pin,
4 sync::atomic::{self, Ordering},
5 sync::Arc,
6 task::{Context, Poll, Waker},
7 time::Duration,
8};
9
10use futures_util::future::BoxFuture;
11
12use crate::{
13 client::{Client, ClientHandle},
14 error::Result,
15};
16
17pub use self::futures::GetHandle;
18use futures_util::FutureExt;
19use url::Url;
20
21mod futures;
22
23const CONN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
25const QUERY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
27
28#[derive(Debug, Clone)]
29pub struct PoolConfig {
30 pub(crate) addr: Url,
31 pub(crate) database: String,
32 pub(crate) username: String,
33 pub(crate) password: String,
34 pub(crate) connection_timeout: Option<Duration>,
35 pub(crate) query_timeout: Option<Duration>,
36 pub(crate) secure: bool,
37}
38
39pub struct PoolConfigBuilder(PoolConfig);
40
41impl PoolConfigBuilder {
42 pub fn new(
43 addr: Url,
44 database: String,
45 username: String,
46 password: String,
47 secure: bool,
48 ) -> Self {
49 Self(PoolConfig {
50 addr,
51 database,
52 username,
53 password,
54 connection_timeout: None,
55 query_timeout: None,
56 secure,
57 })
58 }
59
60 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
61 self.0.connection_timeout = Some(timeout);
62 self
63 }
64
65 pub fn with_query_timeout(mut self, timeout: Duration) -> Self {
66 self.0.query_timeout = Some(timeout);
67 self
68 }
69
70 pub fn build(mut self) -> PoolConfig {
71 if self.0.connection_timeout.is_none() {
72 self.0.connection_timeout = Some(CONN_TIMEOUT)
73 }
74
75 if self.0.query_timeout.is_none() {
76 self.0.query_timeout = Some(QUERY_TIMEOUT)
77 }
78
79 self.0
80 }
81}
82
83impl Default for PoolConfig {
84 fn default() -> Self {
85 Self {
86 addr: Url::parse("tcp://127.0.0.1:9000").unwrap(),
87 database: "default".to_string(),
88 username: Default::default(),
89 password: Default::default(),
90 connection_timeout: Some(CONN_TIMEOUT),
91 query_timeout: Some(QUERY_TIMEOUT),
92 secure: false,
93 }
94 }
95}
96
97#[derive(Debug)]
98pub(crate) struct Inner {
99 new: crossbeam::queue::ArrayQueue<BoxFuture<'static, Result<ClientHandle>>>,
100 idle: crossbeam::queue::ArrayQueue<ClientHandle>,
101 tasks: crossbeam::queue::SegQueue<Waker>,
102 ongoing: atomic::AtomicUsize,
103 hosts: Vec<Url>,
104 connections_num: atomic::AtomicUsize,
105}
106
107impl Inner {
108 pub(crate) fn release_conn(&self) {
109 self.ongoing.fetch_sub(1, Ordering::AcqRel);
110 while let Some(task) = self.tasks.pop() {
111 task.wake()
112 }
113 }
114
115 fn conn_count(&self) -> usize {
116 let is_new_some = self.new.len();
117 let ongoing = self.ongoing.load(Ordering::Acquire);
118 let idle_count = self.idle.len();
119 is_new_some + idle_count + ongoing
120 }
121}
122
123#[derive(Clone)]
124pub(crate) enum PoolBinding {
125 None,
126 Attached(Pool),
127 Detached(Pool),
128}
129
130impl From<PoolBinding> for Option<Pool> {
131 fn from(binding: PoolBinding) -> Self {
132 match binding {
133 PoolBinding::None => None,
134 PoolBinding::Attached(pool) | PoolBinding::Detached(pool) => Some(pool),
135 }
136 }
137}
138
139impl PoolBinding {
140 pub(crate) fn take(&mut self) -> Self {
141 mem::replace(self, PoolBinding::None)
142 }
143
144 fn return_conn(self, client: ClientHandle) {
145 if let Some(mut pool) = self.into() {
146 Pool::return_conn(&mut pool, client);
147 }
148 }
149
150 pub(crate) fn is_attached(&self) -> bool {
151 matches!(self, PoolBinding::Attached(_))
152 }
153
154 pub(crate) fn is_some(&self) -> bool {
155 !matches!(self, PoolBinding::None)
156 }
157
158 pub(crate) fn attach(&mut self) {
159 match self.take() {
160 PoolBinding::Detached(pool) => *self = PoolBinding::Attached(pool),
161 _ => unreachable!(),
162 }
163 }
164
165 pub(crate) fn detach(&mut self) {
166 match self.take() {
167 PoolBinding::Attached(pool) => *self = PoolBinding::Detached(pool),
168 _ => unreachable!(),
169 }
170 }
171}
172
173#[derive(Clone)]
175pub struct Pool {
176 pub(crate) config: PoolConfig,
177 pub(crate) inner: Arc<Inner>,
178 min: usize,
179 max: usize,
180}
181
182#[derive(Debug)]
183struct PoolInfo {
184 new_len: usize,
185 idle_len: usize,
186 tasks_len: usize,
187 ongoing: usize,
188}
189
190impl fmt::Debug for Pool {
191 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192 let info = self.info();
193 f.debug_struct("Pool")
194 .field("min", &self.min)
195 .field("max", &self.max)
196 .field("new connections count", &info.new_len)
197 .field("idle connections count", &info.idle_len)
198 .field("tasks count", &info.tasks_len)
199 .field("ongoing connections count", &info.ongoing)
200 .finish()
201 }
202}
203
204impl Pool {
205 pub fn new(config: PoolConfig) -> Self {
207 let min = 5;
208 let max = 10;
209 let hosts = vec![config.addr.clone()];
210
211 let inner = Arc::new(Inner {
212 new: crossbeam::queue::ArrayQueue::new(1),
213 idle: crossbeam::queue::ArrayQueue::new(max),
214 tasks: crossbeam::queue::SegQueue::new(),
215 ongoing: atomic::AtomicUsize::new(0),
216 connections_num: atomic::AtomicUsize::new(0),
217 hosts,
218 });
219
220 Self {
221 config,
222 inner,
223 min,
224 max,
225 }
226 }
227
228 fn info(&self) -> PoolInfo {
229 PoolInfo {
230 new_len: self.inner.new.len(),
231 idle_len: self.inner.idle.len(),
232 tasks_len: self.inner.tasks.len(),
233 ongoing: self.inner.ongoing.load(Ordering::Acquire),
234 }
235 }
236
237 pub fn get_handle(&self) -> GetHandle {
239 GetHandle::new(self)
240 }
241
242 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<ClientHandle>> {
243 self.handle_futures(cx)?;
244
245 match self.take_conn() {
246 Some(client) => Poll::Ready(Ok(client)),
247 None => {
248 let new_conn_created = {
249 let conn_count = self.inner.conn_count();
250
251 if conn_count < self.max && self.inner.new.push(self.new_connection()).is_ok() {
252 true
253 } else {
254 self.inner.tasks.push(cx.waker().clone());
255 false
256 }
257 };
258 if new_conn_created {
259 self.poll(cx)
260 } else {
261 Poll::Pending
262 }
263 }
264 }
265 }
266
267 fn new_connection(&self) -> BoxFuture<'static, Result<ClientHandle>> {
268 let source = self.config.clone();
269 let pool = Some(self.clone());
270 Box::pin(async move { Client::open(source, pool).await })
271 }
272
273 fn handle_futures(&mut self, cx: &mut Context<'_>) -> Result<()> {
274 if let Some(mut new) = self.inner.new.pop() {
275 match new.poll_unpin(cx) {
276 Poll::Ready(Ok(client)) => {
277 self.inner.idle.push(client).unwrap();
278 }
279 Poll::Pending => {
280 let _ = self.inner.new.push(new);
284 }
285 Poll::Ready(Err(err)) => {
286 return Err(err);
287 }
288 }
289 }
290
291 Ok(())
292 }
293
294 fn take_conn(&mut self) -> Option<ClientHandle> {
295 if let Some(mut client) = self.inner.idle.pop() {
296 client.pool = PoolBinding::Attached(self.clone());
297 client.set_inside(false);
298 self.inner.ongoing.fetch_add(1, Ordering::AcqRel);
299 Some(client)
300 } else {
301 None
302 }
303 }
304
305 fn return_conn(&mut self, mut client: ClientHandle) {
306 let min = self.min;
307
308 let is_attached = client.pool.is_attached();
309 client.pool = PoolBinding::None;
310 client.set_inside(true);
311
312 if self.inner.idle.len() < min && is_attached {
313 let _ = self.inner.idle.push(client);
314 }
315 self.inner.ongoing.fetch_sub(1, Ordering::AcqRel);
316
317 while let Some(task) = self.inner.tasks.pop() {
318 task.wake()
319 }
320 }
321
322 pub(crate) fn get_addr(&self) -> &Url {
323 let n = self.inner.hosts.len();
324 let index = self.inner.connections_num.fetch_add(1, Ordering::SeqCst);
325 &self.inner.hosts[index % n]
326 }
327}
328
329impl Drop for ClientHandle {
330 fn drop(&mut self) {
331 if let (pool, Some(inner)) = (self.pool.take(), self.inner.take()) {
332 if !pool.is_some() {
333 return;
334 }
335
336 let context = self.context.clone();
337 let client = Self {
338 inner: Some(inner),
339 pool: pool.clone(),
340 context,
341 };
342 pool.return_conn(client);
343 }
344 }
345}