Skip to main content

clickhouse_driver/pool/
builder.rs

1use std::{
2    convert::{TryFrom, TryInto},
3    sync::atomic,
4    sync::Arc,
5};
6
7use tokio::sync;
8
9use crate::{
10    client::InnerConection,
11    errors::{Result, UrlError},
12    sync::WakerSet,
13};
14
15use super::{CompressionMethod, Inner, Options, Pool, POOL_STATUS_SERVE};
16use parking_lot::Mutex;
17
18/// Connection pool builder
19///
20/// # Example
21/// `
22/// let pool = PoolBuilder::default()
23///    .with_database("default")
24///    .with_username("default")
25///    .with_password("")
26///    .add_addr("www.example.com:9000")
27///    .build()
28///    .unwrap();
29/// `
30pub struct PoolBuilder {
31    addr: Vec<String>,
32    username: Option<String>,
33    password: Option<String>,
34    database: Option<String>,
35    pool_min: u16,
36    pool_max: u16,
37    compression: CompressionMethod,
38    ping: bool,
39}
40
41impl TryFrom<PoolBuilder> for Options {
42    type Error = UrlError;
43
44    fn try_from(mut value: PoolBuilder) -> std::result::Result<Self, UrlError> {
45        if value.addr.is_empty() {
46            return Err(UrlError::Invalid);
47        }
48
49        let mut options = crate::DEF_OPTIONS.clone();
50        std::mem::swap(&mut options.addr, &mut value.addr);
51        options.compression = value.compression;
52        options.ping_before_query = value.ping;
53        options.username = value.username.replace(options.username).unwrap_or_default();
54        options.password = value.password.replace(options.password).unwrap_or_default();
55        options.database = value.database.replace(options.database).unwrap_or_default();
56
57        options.pool_min = value.pool_min;
58        options.pool_max = value.pool_max;
59        Ok(options)
60    }
61}
62
63impl PoolBuilder {
64    /// Set compression option
65    /// This make connection use LZ4 compression for block data transfer
66    #[inline]
67    pub fn with_compression(mut self) -> Self {
68        self.compression = CompressionMethod::LZ4;
69        self
70    }
71    /// If set, this option make connection check server availability after it
72    /// is received from pool.
73    #[inline]
74    pub fn with_ping(mut self) -> Self {
75        self.ping = true;
76        self
77    }
78    /// Set connection pool boundaries
79    /// min - set the number of idle connection that the pool can keep up to
80    /// max - set maximum number of connection that pool can issued
81    #[inline]
82    pub fn with_pool(mut self, min: u16, max: u16) -> Self {
83        self.pool_min = min;
84        self.pool_max = max;
85        self
86    }
87
88    /// Set the username that is used in authentication
89    #[inline]
90    pub fn with_username(mut self, value: impl ToString) -> Self {
91        self.username = Some(value.to_string());
92        self
93    }
94    /// Set the default database that is used in query processing if
95    /// the query doesn't explicitly specify another database name
96    #[inline]
97    pub fn with_database(mut self, value: impl ToString) -> Self {
98        self.database = Some(value.to_string());
99        self
100    }
101    /// Set password that is used in authentication
102    #[inline]
103    pub fn with_password(mut self, value: impl ToString) -> Self {
104        self.password = Some(value.to_string());
105        self
106    }
107
108    /// Set server host address.
109    ///
110    /// Address must have domain name and port number
111    /// # Example
112    /// `
113    /// PoolBuilder::new()
114    ///   .with_addr('example1.com:9000')
115    ///   .with_addr('example2.com:9000');
116    /// `
117    /// Connection pool can have multiple addresses
118    /// In this case next connection randomly chooses any
119    /// available one if it's reachable
120    #[inline]
121    pub fn add_addr(mut self, value: impl ToString) -> Self {
122        self.addr.push(value.to_string());
123        self
124    }
125    /// Convert the Builder into Pool using specified options.
126    /// Note! Created Pool does not have connection.
127    /// First connection will be created by executing pool.connection()
128    ///
129    #[inline]
130    pub fn build(self) -> Result<Pool> {
131        let options: Options = self.try_into()?;
132        PoolBuilder::create(options)
133    }
134    /// Construct Pool from Option object
135    pub(super) fn create(mut options: Options) -> Result<Pool> {
136        if options.pool_max < options.pool_min {
137            return Err(UrlError::InvalidPoolConstraints {
138                min: options.pool_min,
139                max: options.pool_max,
140            }
141            .into());
142        }
143
144        #[allow(unused_variables)]
145        if cfg!(feature = "recycle") {
146            let (tx, rx) = sync::mpsc::unbounded_channel::<Option<Box<InnerConection>>>();
147        }
148
149        let hosts = options.take_addr();
150
151        let inner = Arc::new(Inner {
152            new: crossbeam::queue::ArrayQueue::new(options.pool_min as usize),
153            options,
154            wakers: WakerSet::new(),
155            lock: Mutex::new(0),
156            connections_num: atomic::AtomicUsize::new(0),
157            //wait: atomic::AtomicUsize::new(0),
158            #[cfg(feature = "recycle")]
159            recycler: Some(rx),
160            hosts,
161            close: POOL_STATUS_SERVE.into(),
162        });
163
164        let mut pool = Pool {
165            inner,
166            #[cfg(feature = "recycle")]
167            drop: tx,
168        };
169        pool.inner.spawn_recycler();
170        Ok(pool)
171    }
172}
173
174impl Default for PoolBuilder {
175    fn default() -> Self {
176        PoolBuilder {
177            addr: Vec::new(),
178            username: None,
179            password: None,
180            database: None,
181            pool_min: crate::DEF_OPTIONS.pool_min,
182            pool_max: crate::DEF_OPTIONS.pool_max,
183            ping: false,
184            compression: CompressionMethod::None,
185        }
186    }
187}