clickhouse_driver/pool/
builder.rs1use std::{
2 convert::{TryFrom, TryInto},
3 sync::atomic,
4 sync::Arc,
5};
6
7use tokio::sync;
8
9use crate::{
10 client::InnerConection,
11 errors::{Result, UrlError},
12 sync::WakerSet,
13};
14
15use super::{CompressionMethod, Inner, Options, Pool, POOL_STATUS_SERVE};
16use parking_lot::Mutex;
17
18pub struct PoolBuilder {
31 addr: Vec<String>,
32 username: Option<String>,
33 password: Option<String>,
34 database: Option<String>,
35 pool_min: u16,
36 pool_max: u16,
37 compression: CompressionMethod,
38 ping: bool,
39}
40
41impl TryFrom<PoolBuilder> for Options {
42 type Error = UrlError;
43
44 fn try_from(mut value: PoolBuilder) -> std::result::Result<Self, UrlError> {
45 if value.addr.is_empty() {
46 return Err(UrlError::Invalid);
47 }
48
49 let mut options = crate::DEF_OPTIONS.clone();
50 std::mem::swap(&mut options.addr, &mut value.addr);
51 options.compression = value.compression;
52 options.ping_before_query = value.ping;
53 options.username = value.username.replace(options.username).unwrap_or_default();
54 options.password = value.password.replace(options.password).unwrap_or_default();
55 options.database = value.database.replace(options.database).unwrap_or_default();
56
57 options.pool_min = value.pool_min;
58 options.pool_max = value.pool_max;
59 Ok(options)
60 }
61}
62
63impl PoolBuilder {
64 #[inline]
67 pub fn with_compression(mut self) -> Self {
68 self.compression = CompressionMethod::LZ4;
69 self
70 }
71 #[inline]
74 pub fn with_ping(mut self) -> Self {
75 self.ping = true;
76 self
77 }
78 #[inline]
82 pub fn with_pool(mut self, min: u16, max: u16) -> Self {
83 self.pool_min = min;
84 self.pool_max = max;
85 self
86 }
87
88 #[inline]
90 pub fn with_username(mut self, value: impl ToString) -> Self {
91 self.username = Some(value.to_string());
92 self
93 }
94 #[inline]
97 pub fn with_database(mut self, value: impl ToString) -> Self {
98 self.database = Some(value.to_string());
99 self
100 }
101 #[inline]
103 pub fn with_password(mut self, value: impl ToString) -> Self {
104 self.password = Some(value.to_string());
105 self
106 }
107
108 #[inline]
121 pub fn add_addr(mut self, value: impl ToString) -> Self {
122 self.addr.push(value.to_string());
123 self
124 }
125 #[inline]
130 pub fn build(self) -> Result<Pool> {
131 let options: Options = self.try_into()?;
132 PoolBuilder::create(options)
133 }
134 pub(super) fn create(mut options: Options) -> Result<Pool> {
136 if options.pool_max < options.pool_min {
137 return Err(UrlError::InvalidPoolConstraints {
138 min: options.pool_min,
139 max: options.pool_max,
140 }
141 .into());
142 }
143
144 #[allow(unused_variables)]
145 if cfg!(feature = "recycle") {
146 let (tx, rx) = sync::mpsc::unbounded_channel::<Option<Box<InnerConection>>>();
147 }
148
149 let hosts = options.take_addr();
150
151 let inner = Arc::new(Inner {
152 new: crossbeam::queue::ArrayQueue::new(options.pool_min as usize),
153 options,
154 wakers: WakerSet::new(),
155 lock: Mutex::new(0),
156 connections_num: atomic::AtomicUsize::new(0),
157 #[cfg(feature = "recycle")]
159 recycler: Some(rx),
160 hosts,
161 close: POOL_STATUS_SERVE.into(),
162 });
163
164 let mut pool = Pool {
165 inner,
166 #[cfg(feature = "recycle")]
167 drop: tx,
168 };
169 pool.inner.spawn_recycler();
170 Ok(pool)
171 }
172}
173
174impl Default for PoolBuilder {
175 fn default() -> Self {
176 PoolBuilder {
177 addr: Vec::new(),
178 username: None,
179 password: None,
180 database: None,
181 pool_min: crate::DEF_OPTIONS.pool_min,
182 pool_max: crate::DEF_OPTIONS.pool_max,
183 ping: false,
184 compression: CompressionMethod::None,
185 }
186 }
187}