clickhouse_arrow/client/builder.rs
1use std::hash::{Hash, Hasher};
2use std::net::SocketAddr;
3use std::path::Path;
4use std::sync::Arc;
5
6use tracing::error;
7
8use super::tcp::Destination;
9use super::{
10 ArrowOptions, Client, ClientFormat, CompressionMethod, ConnectionContext, Extension, Secret,
11};
12#[cfg(feature = "pool")]
13use crate::pool::ConnectionManager;
14use crate::prelude::SettingValue;
15use crate::settings::Settings;
16use crate::telemetry::TraceContext;
17use crate::{ArrowFormat, ClientOptions, Error, NativeFormat, Result};
18
19/// A builder for configuring and creating a `ClickHouse` client.
20///
21/// The `ClientBuilder` provides a fluent interface to set up a [`Client`] with
22/// custom connection parameters, such as the server address, credentials, TLS,
23/// compression, and session settings. It supports creating either a single
24/// [`Client`] (via [`ClientBuilder::build`]) or a connection pool (via
25/// [`ClientBuilder::build_pool_manager`], with the `pool` feature enabled).
26///
27/// Use this builder for fine-grained control over the client configuration. The
28/// builder ensures that the destination address is verified before establishing a
29/// connection, either explicitly via [`ClientBuilder::verify`] or implicitly during
30/// the build process.
31///
32/// # Examples
33/// ```rust,ignore
34/// use clickhouse_arrow::prelude::*;
35///
36/// let client = ClientBuilder::new()
37/// .with_endpoint("localhost:9000")
38/// .with_username("default")
39/// .with_password("")
40/// .build_arrow()
41/// .await
42/// .unwrap();
43///
44/// // Use the client to query `ClickHouse`
45/// client.query("SELECT 1").await.unwrap();
46/// ```
47#[derive(Default, Debug, Clone)]
48pub struct ClientBuilder {
49 destination: Option<Destination>,
50 options: ClientOptions,
51 settings: Option<Settings>,
52 context: Option<ConnectionContext>,
53 verified: bool,
54}
55
56impl ClientBuilder {
57 /// Creates a new `ClientBuilder` with default configuration.
58 ///
59 /// This method initializes a builder with default [`ClientOptions`], no destination,
60 /// settings, or context. Use this as the starting point to configure a `ClickHouse`
61 /// client with methods like [`ClientBuilder::with_endpoint`],
62 /// [`ClientBuilder::with_username`], or [`ClientBuilder::with_tls`].
63 ///
64 /// # Returns
65 /// A new [`ClientBuilder`] instance ready for configuration.
66 ///
67 /// # Examples
68 /// ```rust,ignore
69 /// use clickhouse_arrow::prelude::*;
70 ///
71 /// let builder = ClientBuilder::new()
72 /// .with_endpoint("localhost:9000")
73 /// .with_username("default");
74 /// ```
75 pub fn new() -> Self {
76 ClientBuilder {
77 destination: None,
78 options: ClientOptions::default(),
79 settings: None,
80 context: None,
81 verified: false,
82 }
83 }
84
85 /// Retrieves the configured destination, if set.
86 ///
87 /// This method returns the `ClickHouse` server address (as a [`Destination`]) that
88 /// has been configured via methods like [`ClientBuilder::with_endpoint`] or
89 /// [`ClientBuilder::with_socket_addr`]. Returns `None` if no destination is set.
90 ///
91 /// # Returns
92 /// An `Option<&Destination>` containing the configured destination, or `None` if
93 /// not set.
94 ///
95 /// # Examples
96 /// ```rust,ignore
97 /// use clickhouse_arrow::prelude::*;
98 ///
99 /// let builder = ClientBuilder::new()
100 /// .with_endpoint("localhost:9000");
101 /// if let Some(dest) = builder.destination() {
102 /// println!("Destination: {:?}", dest);
103 /// }
104 /// ```
105 pub fn destination(&self) -> Option<&Destination> { self.destination.as_ref() }
106
107 /// Retrieves the current connection options.
108 ///
109 /// This method returns a reference to the [`ClientOptions`] configured for the
110 /// builder, which includes settings like username, password, TLS, and compression.
111 /// These options can be set via methods like [`ClientBuilder::with_username`],
112 /// [`ClientBuilder::with_tls`], or [`ClientBuilder::with_options`].
113 ///
114 /// # Returns
115 /// A reference to the current [`ClientOptions`].
116 ///
117 /// # Examples
118 /// ```rust,ignore
119 /// use clickhouse_arrow::prelude::*;
120 ///
121 /// let builder = ClientBuilder::new()
122 /// .with_username("default")
123 /// .with_endpoint("localhost:9000");
124 /// let options = builder.options();
125 /// println!("Username: {}", options.username);
126 /// ```
127 pub fn options(&self) -> &ClientOptions { &self.options }
128
129 /// Retrieves the configured session settings, if set.
130 ///
131 /// This method returns the `ClickHouse` session settings (as `Settings`)
132 /// that have been configured via [`ClientBuilder::with_settings`]. These settings
133 /// control query behavior, such as timeouts or maximum rows. Returns `None` if no
134 /// settings are set.
135 ///
136 /// # Returns
137 /// An `Option<&Settings>` containing the configured settings, or `None` if not set.
138 ///
139 /// # Examples
140 /// ```rust,ignore
141 /// use clickhouse_arrow::prelude::*;
142 /// use std::sync::Arc;
143 ///
144 /// let settings = Settings::default();
145 /// let builder = ClientBuilder::new()
146 /// .with_settings(settings.clone());
147 /// if let Some(config_settings) = builder.settings() {
148 /// println!("Settings: {config_settings:?}");
149 /// assert_eq!(config_settings, &settings)
150 /// }
151 /// ```
152 pub fn settings(&self) -> Option<&Settings> { self.settings.as_ref() }
153
154 /// Checks whether the builder's destination has been verified.
155 ///
156 /// A verified builder indicates that the `ClickHouse` server's destination address
157 /// has been resolved into valid socket addresses (via [`ClientBuilder::verify`] or
/// during [`ClientBuilder::build`]). Verification resolves the address and checks
/// that at least one socket address was produced. This method returns `false` if the
/// destination is unset or has been modified since the last verification.
161 ///
162 /// # Returns
163 /// A `bool` indicating whether the destination is verified.
164 ///
165 /// # Examples
166 /// ```rust,ignore
167 /// use clickhouse_arrow::prelude::*;
168 ///
169 /// let builder = ClientBuilder::new()
170 /// .with_endpoint("localhost:9000");
171 /// println!("Verified: {}", builder.verified()); // false
172 ///
173 /// let verified_builder = builder.verify().await.unwrap();
174 /// println!("Verified: {}", verified_builder.verified()); // true
175 /// ```
176 pub fn verified(&self) -> bool { self.verified }
177
178 /// Sets the `ClickHouse` server address using a socket address.
179 ///
180 /// This method configures the destination using a [`SocketAddr`] (e.g.,
181 /// `127.0.0.1:9000`). It is useful when the address is already resolved. For
182 /// hostname-based addresses, use [`ClientBuilder::with_endpoint`] or
183 /// [`ClientBuilder::with_host_port`].
184 ///
185 /// # Parameters
186 /// - `addr`: The socket address of the `ClickHouse` server.
187 ///
188 /// # Returns
189 /// A new [`ClientBuilder`] with the updated destination.
190 ///
191 /// # Examples
192 /// ```rust,ignore
193 /// use clickhouse_arrow::prelude::*;
194 /// use std::net::{Ipv4Addr, SocketAddr};
195 ///
196 /// let addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 9000);
197 /// let builder = ClientBuilder::new()
198 /// .with_socket_addr(addr);
199 /// ```
200 #[must_use]
201 pub fn with_socket_addr(self, addr: SocketAddr) -> Self { self.with_destination(addr) }
202
203 /// Sets the `ClickHouse` server address using a hostname and port.
204 ///
205 /// This method configures the destination using a hostname (e.g., `"localhost"`) and
206 /// port number (e.g., `9000`). The hostname will be resolved during verification
207 /// (via [`ClientBuilder::verify`] or [`ClientBuilder::build`]). For resolved
208 /// addresses, use [`ClientBuilder::with_socket_addr`].
209 ///
210 /// # Parameters
211 /// - `host`: The hostname or IP address of the `ClickHouse` server.
212 /// - `port`: The port number of the `ClickHouse` server.
213 ///
214 /// # Returns
215 /// A new [`ClientBuilder`] with the updated destination.
216 ///
217 /// # Examples
218 /// ```rust,ignore
219 /// use clickhouse_arrow::prelude::*;
220 ///
221 /// let builder = ClientBuilder::new()
222 /// .with_host_port("localhost", 9000);
223 /// ```
224 #[must_use]
225 pub fn with_host_port(self, host: impl Into<String>, port: u16) -> Self {
226 self.with_destination((host.into(), port))
227 }
228
229 /// Sets the `ClickHouse` server address using a string endpoint.
230 ///
231 /// This method configures the destination using a string in the format
232 /// `"host:port"` (e.g., `"localhost:9000"`). The endpoint will be resolved during
233 /// verification (via [`ClientBuilder::verify`] or [`ClientBuilder::build`]). For
234 /// separate host and port, use [`ClientBuilder::with_host_port`].
235 ///
236 /// # Parameters
237 /// - `endpoint`: The `ClickHouse` server address as a string (e.g., `"localhost:9000"`).
238 ///
239 /// # Returns
240 /// A new [`ClientBuilder`] with the updated destination.
241 ///
242 /// # Examples
243 /// ```rust,ignore
244 /// use clickhouse_arrow::prelude::*;
245 ///
246 /// let builder = ClientBuilder::new()
247 /// .with_endpoint("localhost:9000");
248 /// ```
249 #[must_use]
250 pub fn with_endpoint(self, endpoint: impl Into<String>) -> Self {
251 self.with_destination(endpoint.into())
252 }
253
254 /// Sets the `ClickHouse` server address using any compatible destination type.
255 ///
256 /// This method configures the destination using a type that can be converted into
257 /// a [`Destination`], such as a string endpoint (e.g., `"localhost:9000"`), a
258 /// `(host, port)` tuple, or a [`SocketAddr`]. It is the most flexible way to set
259 /// the destination. For specific formats, use [`ClientBuilder::with_endpoint`],
260 /// [`ClientBuilder::with_host_port`], or [`ClientBuilder::with_socket_addr`].
261 ///
262 /// # Parameters
263 /// - `destination`: The `ClickHouse` server address, convertible to [`Destination`].
264 ///
265 /// # Returns
266 /// A new [`ClientBuilder`] with the updated destination.
267 ///
268 /// # Examples
269 /// ```rust,ignore
270 /// use clickhouse_arrow::prelude::*;
271 ///
272 /// let builder = ClientBuilder::new()
273 /// .with_destination("localhost:9000");
274 /// ```
275 #[must_use]
276 pub fn with_destination<D>(mut self, destination: D) -> Self
277 where
278 D: Into<Destination>,
279 {
280 self.destination = Some(destination.into());
281 self.verified = false;
282 self
283 }
284
285 /// Sets the connection options directly.
286 ///
287 /// This method replaces the current [`ClientOptions`] with the provided options,
288 /// overriding any settings configured via methods like
289 /// [`ClientBuilder::with_username`] or [`ClientBuilder::with_tls`]. Use this when
290 /// you have a pre-configured [`ClientOptions`] instance or need to set multiple
291 /// options at once.
292 ///
293 /// # Parameters
294 /// - `options`: The [`ClientOptions`] to use for the connection.
295 ///
296 /// # Returns
297 /// A new [`ClientBuilder`] with the updated options.
298 ///
299 /// # Examples
300 /// ```rust,ignore
301 /// use clickhouse_arrow::prelude::*;
302 ///
303 /// let options = ClientOptions::default()
304 /// .username("default")
305 /// .password("");
306 /// let builder = ClientBuilder::new()
307 /// .with_options(options);
308 /// ```
309 #[must_use]
310 pub fn with_options(mut self, options: ClientOptions) -> Self {
311 self.options = options;
312 self.verified = false;
313 self
314 }
315
316 #[must_use]
317 pub fn with_ext<F>(mut self, cb: F) -> Self
318 where
319 F: FnOnce(Extension) -> Extension,
320 {
321 self.options.ext = cb(self.options.ext);
322 self
323 }
324
325 /// Enables or disables TLS for the `ClickHouse` connection.
326 ///
327 /// This method configures whether the client will use a secure TLS connection to
328 /// communicate with `ClickHouse`. If TLS is enabled, you may also need to set a CA
329 /// file (via [`ClientBuilder::with_cafile`]) or domain (via
330 /// [`ClientBuilder::with_domain`]) for secure connections.
331 ///
332 /// # Parameters
333 /// - `tls`: If `true`, enables TLS; if `false`, uses plain TCP.
334 ///
335 /// # Returns
336 /// A new [`ClientBuilder`] with the updated TLS setting.
337 ///
338 /// # Examples
339 /// ```rust,ignore
340 /// use clickhouse_arrow::prelude::*;
341 ///
342 /// let builder = ClientBuilder::new()
343 /// .with_endpoint("localhost:9000")
344 /// .with_tls(true);
345 /// ```
346 #[must_use]
347 pub fn with_tls(mut self, tls: bool) -> Self {
348 if self.options.use_tls != tls {
349 self.options.use_tls = tls;
350 self.verified = false;
351 }
352 self
353 }
354
355 /// Sets the CA file for TLS connections to `ClickHouse`.
356 ///
357 /// This method specifies the path to a certificate authority (CA) file used to
358 /// verify the `ClickHouse` server's certificate during TLS connections. It is
359 /// required when TLS is enabled (via [`ClientBuilder::with_tls`]) and the server
360 /// uses a custom or self-signed certificate.
361 ///
362 /// # Parameters
363 /// - `cafile`: The path to the CA file.
364 ///
365 /// # Returns
366 /// A new [`ClientBuilder`] with the updated CA file setting.
367 ///
368 /// # Examples
369 /// ```rust,ignore
370 /// use clickhouse_arrow::prelude::*;
371 ///
372 /// let builder = ClientBuilder::new()
373 /// .with_endpoint("localhost:9000")
374 /// .with_tls(true)
375 /// .with_cafile("/path/to/ca.crt");
376 /// ```
377 #[must_use]
378 pub fn with_cafile<P: AsRef<Path>>(mut self, cafile: P) -> Self {
379 self.options.cafile = Some(cafile.as_ref().to_path_buf());
380 self
381 }
382
383 /// Forces the use of IPv4-only resolution for the `ClickHouse` server address.
384 ///
385 /// This method configures whether the destination address resolution (during
386 /// [`ClientBuilder::verify`] or [`ClientBuilder::build`]) should use only IPv4
387 /// addresses. By default, both IPv4 and IPv6 are considered. Enable this for
388 /// environments where IPv6 is unsupported.
389 ///
390 /// # Parameters
391 /// - `enabled`: If `true`, restricts resolution to IPv4; if `false`, allows both IPv4 and IPv6.
392 ///
393 /// # Returns
394 /// A new [`ClientBuilder`] with the updated IPv4 setting.
395 ///
396 /// # Examples
397 /// ```rust,ignore
398 /// use clickhouse_arrow::prelude::*;
399 ///
400 /// let builder = ClientBuilder::new()
401 /// .with_endpoint("localhost:9000")
402 /// .with_ipv4_only(true);
403 /// ```
404 #[must_use]
405 pub fn with_ipv4_only(mut self, enabled: bool) -> Self {
406 self.options.ipv4_only = enabled;
407 self
408 }
409
410 /// Sets the `ClickHouse` session settings.
411 ///
412 /// This method configures the session settings (e.g., query timeouts, max rows) for
413 /// the client. These settings are applied to all queries executed by the client.
414 /// Use this to customize query behavior beyond the defaults.
415 ///
416 /// # Parameters
417 /// - `settings`: The session settings as an `impl Into<Settings>`.
418 ///
419 /// # Returns
420 /// A new [`ClientBuilder`] with the updated settings.
421 ///
422 /// # Examples
423 /// ```rust,ignore
424 /// use clickhouse_arrow::prelude::*;
425 /// use std::sync::Arc;
426 ///
427 /// let settings = vec![
428 /// ("select_sequential_consistency", 1)
429 /// ];
430 /// let builder = ClientBuilder::new()
431 /// .with_endpoint("localhost:9000")
432 /// .with_settings(settings);
433 /// ```
434 #[must_use]
435 pub fn with_settings(mut self, settings: impl Into<Settings>) -> Self {
436 self.settings = Some(settings.into());
437 self
438 }
439
440 /// Set a `ClickHouse` session setting.
441 ///
442 /// This method configures the session settings (e.g., query timeouts, max rows) for
443 /// the client. These settings are applied to all queries executed by the client.
444 /// Use this to customize query behavior beyond the defaults.
445 ///
446 /// # Parameters
/// - `name`: The session setting's name.
448 /// - `setting`: The session setting as an `impl Into<SettingValue>`.
449 ///
450 /// # Returns
451 /// A new [`ClientBuilder`] with the updated settings.
452 ///
453 /// # Examples
454 /// ```rust,ignore
455 /// use clickhouse_arrow::prelude::*;
456 /// use std::sync::Arc;
457 ///
458 /// let builder = ClientBuilder::new()
459 /// .with_endpoint("localhost:9000")
460 /// .with_setting("select_sequential_consistency", 1);
461 /// ```
462 #[must_use]
463 pub fn with_setting(
464 mut self,
465 name: impl Into<String>,
466 setting: impl Into<SettingValue>,
467 ) -> Self {
468 let setting: SettingValue = setting.into();
469 let settings = self.settings.unwrap_or_default().with_setting(name, setting);
470 self.settings = Some(settings);
471 self
472 }
473
474 /// Sets the username for authenticating with `ClickHouse`.
475 ///
476 /// This method configures the username used to authenticate the client with the
477 /// `ClickHouse` server. The default username is typically `"default"`, but this
478 /// can be customized based on the server's configuration.
479 ///
480 /// # Parameters
481 /// - `username`: The username for authentication.
482 ///
483 /// # Returns
484 /// A new [`ClientBuilder`] with the updated username.
485 ///
486 /// # Examples
487 /// ```rust,ignore
488 /// use clickhouse_arrow::prelude::*;
489 ///
490 /// let builder = ClientBuilder::new()
491 /// .with_endpoint("localhost:9000")
492 /// .with_username("admin");
493 /// ```
494 #[must_use]
495 pub fn with_username(mut self, username: impl Into<String>) -> Self {
496 self.options.username = username.into();
497 self
498 }
499
500 /// Sets the password for authenticating with `ClickHouse`.
501 ///
502 /// This method configures the password used to authenticate the client with the
503 /// `ClickHouse` server. The password is stored securely as a [`Secret`]. If no
504 /// password is required, an empty string can be used.
505 ///
506 /// # Parameters
507 /// - `password`: The password for authentication, convertible to [`Secret`].
508 ///
509 /// # Returns
510 /// A new [`ClientBuilder`] with the updated password.
511 ///
512 /// # Examples
513 /// ```rust,ignore
514 /// use clickhouse_arrow::prelude::*;
515 ///
516 /// let builder = ClientBuilder::new()
517 /// .with_endpoint("localhost:9000")
518 /// .with_username("admin")
519 /// .with_password("secret");
520 /// ```
521 #[must_use]
522 pub fn with_password<T>(mut self, password: T) -> Self
523 where
524 Secret: From<T>,
525 {
526 self.options.password = Secret::from(password);
527 self
528 }
529
530 /// Sets the default database for the `ClickHouse` connection.
531 ///
532 /// This method configures the default database used by the client for queries and
533 /// operations. If not set, the `ClickHouse` server defaults to the `"default"`
534 /// database. Specifying a database is optional but recommended when working with a
535 /// specific database to avoid explicit database prefixes in queries.
536 ///
537 /// # Parameters
538 /// - `database`: The name of the default database.
539 ///
540 /// # Returns
541 /// A new [`ClientBuilder`] with the updated database.
542 ///
543 /// # Examples
544 /// ```rust,ignore
545 /// use clickhouse_arrow::prelude::*;
546 ///
547 /// let builder = ClientBuilder::new()
548 /// .with_endpoint("localhost:9000")
549 /// .with_database("my_db");
550 /// ```
551 #[must_use]
552 pub fn with_database(mut self, database: impl Into<String>) -> Self {
553 self.options.default_database = database.into();
554 self
555 }
556
557 /// Sets the domain for secure TLS connections to `ClickHouse`.
558 ///
559 /// This method specifies the domain name used for TLS verification when connecting
560 /// to a `ClickHouse` server with TLS enabled (via [`ClientBuilder::with_tls`]). If
561 /// not set, the domain is inferred from the destination during verification. Use
562 /// this to explicitly set the domain for cloud or custom TLS configurations.
563 ///
564 /// # Parameters
565 /// - `domain`: The domain name for TLS verification.
566 ///
567 /// # Returns
568 /// A new [`ClientBuilder`] with the updated domain.
569 ///
570 /// # Examples
571 /// ```rust,ignore
572 /// use clickhouse_arrow::prelude::*;
573 ///
574 /// let builder = ClientBuilder::new()
575 /// .with_endpoint("localhost:9000")
576 /// .with_tls(true)
577 /// .with_domain("clickhouse.example.com");
578 /// ```
579 #[must_use]
580 pub fn with_domain(mut self, domain: impl Into<String>) -> Self {
581 self.options.domain = Some(domain.into());
582 self.verified = false;
583 self
584 }
585
586 /// Sets the compression method for data exchange with `ClickHouse`.
587 ///
588 /// This method configures the compression algorithm used for sending and receiving
589 /// data to/from `ClickHouse`. The default is [`CompressionMethod::LZ4`], which
590 /// balances performance and compression ratio. Other options may be available
591 /// depending on the `ClickHouse` server configuration.
592 ///
593 /// # Parameters
594 /// - `compression`: The compression method to use.
595 ///
596 /// # Returns
597 /// A new [`ClientBuilder`] with the updated compression setting.
598 ///
599 /// # Examples
600 /// ```rust,ignore
601 /// use clickhouse_arrow::prelude::*;
602 ///
603 /// let builder = ClientBuilder::new()
604 /// .with_endpoint("localhost:9000")
605 /// .with_compression(CompressionMethod::LZ4);
606 /// ```
607 #[must_use]
608 pub fn with_compression(mut self, compression: CompressionMethod) -> Self {
609 self.options.compression = compression;
610 self
611 }
612
613 /// Sets the Arrow-specific options for `ClickHouse` connections.
614 ///
615 /// This method configures options specific to the Arrow format (used by
616 /// [`Client<ArrowFormat>`]), such as schema mapping or data type conversions. These options
617 /// are applied when the client is built with [`ClientBuilder::build`] for
618 /// [`ArrowFormat`]. Use this to customize Arrow interoperability.
619 ///
620 /// # Parameters
621 /// - `options`: The Arrow-specific options.
622 ///
623 /// # Returns
624 /// A new [`ClientBuilder`] with the updated Arrow options.
625 ///
626 /// # Examples
627 /// ```rust,ignore
628 /// use clickhouse_arrow::prelude::*;
629 ///
630 /// let arrow_options = ArrowOptions::default();
631 /// let builder = ClientBuilder::new()
632 /// .with_endpoint("localhost:9000")
633 /// .with_arrow_options(arrow_options);
634 /// ```
635 #[must_use]
636 pub fn with_arrow_options(mut self, options: ArrowOptions) -> Self {
637 self.options.ext.arrow = Some(options);
638 self
639 }
640
641 /// Sets a tracing context for `ClickHouse` connections and queries.
642 ///
643 /// This method configures a [`TraceContext`] to enable distributed tracing for
644 /// client operations, such as connections and queries. The tracing context is
645 /// included in logs and can be used to monitor and debug client behavior across
646 /// distributed systems.
647 ///
648 /// # Parameters
649 /// - `trace_context`: The tracing context to use.
650 ///
651 /// # Returns
652 /// A new [`ClientBuilder`] with the updated tracing context.
653 ///
654 /// # Examples
655 /// ```rust,ignore
656 /// use clickhouse_arrow::prelude::*;
657 ///
658 /// let trace_context = TraceContext::default();
659 /// let builder = ClientBuilder::new()
660 /// .with_endpoint("localhost:9000")
661 /// .with_trace_context(trace_context);
662 /// ```
663 #[must_use]
664 pub fn with_trace_context(mut self, trace_context: TraceContext) -> Self {
665 let mut context = self.context.unwrap_or_default();
666 context.trace = Some(trace_context);
667 self.context = Some(context);
668 self
669 }
670
671 /// Resolves and verifies the `ClickHouse` server destination early.
672 ///
673 /// This method resolves the configured destination (set via
674 /// [`ClientBuilder::with_endpoint`], etc.) into socket addresses and verifies that
675 /// it is valid and reachable. It also ensures that a domain is set for TLS
676 /// connections if required. Verification is performed automatically during
677 /// [`ClientBuilder::build`], but calling this method explicitly allows early error
678 /// detection.
679 ///
680 /// # Returns
681 /// A [`Result`] containing the verified [`ClientBuilder`], or an error if
682 /// verification fails.
683 ///
684 /// # Errors
685 /// - Fails if no destination is set ([`Error::MissingConnectionInformation`]).
686 /// - Fails if the destination cannot be resolved ([`Error::MalformedConnectionInformation`]).
687 /// - Fails if TLS is enabled but no domain is provided and cannot be inferred
688 /// ([`Error::MalformedConnectionInformation`]).
689 ///
690 /// # Examples
691 /// ```rust,ignore
692 /// use clickhouse_arrow::prelude::*;
693 ///
694 /// let builder = ClientBuilder::new()
695 /// .with_endpoint("localhost:9000");
696 /// let verified_builder = builder.verify().await.unwrap();
697 /// println!("Destination verified!");
698 /// ```
699 pub async fn verify(mut self) -> Result<Self> {
700 let (addrs, domain) = {
701 let destination =
702 self.destination.as_ref().ok_or(Error::MissingConnectionInformation)?;
703 let addrs = destination
704 .resolve(self.options.ipv4_only)
705 .await
706 .inspect_err(|error| error!(?error, "Failed to resolve destination"))?;
707 if addrs.is_empty() {
708 return Err(Error::MalformedConnectionInformation(
709 "Socket addresses cannot be empty".into(),
710 ));
711 }
712
713 if self.options.use_tls && self.options.domain.is_none() {
714 let domain = destination.domain();
715 if domain.is_empty() {
716 return Err(Error::MalformedConnectionInformation(
717 "Domain required for TLS, couldn't be determined from destination".into(),
718 ));
719 }
720 (addrs, Some(domain))
721 } else {
722 (addrs, self.options.domain)
723 }
724 };
725
726 self.options.domain = domain;
727 self.destination = Some(Destination::from(addrs));
728 self.verified = true;
729
730 Ok(self)
731 }
732
733 /// Builds a `ClickHouse` client by connecting to the configured destination.
734 ///
735 /// This method creates a [`Client`] using the configured destination, options,
736 /// settings, and context. It verifies the destination (via
737 /// [`ClientBuilder::verify`] if not already verified) and establishes a connection
738 /// to the `ClickHouse` server. The client type (`NativeClient` or `Client<ArrowFormat>`)
739 /// is determined by the format `T` (e.g., [`NativeFormat`] or [`ArrowFormat`]).
740 ///
741 /// # Parameters
742 /// - `T`: The client format, either [`NativeFormat`] or [`ArrowFormat`].
743 ///
744 /// # Returns
745 /// A [`Result`] containing the connected [`Client<T>`], or an error if the
746 /// connection fails.
747 ///
748 /// # Errors
749 /// - Fails if the destination is unset or invalid ([`Error::MissingConnectionInformation`],
750 /// [`Error::MalformedConnectionInformation`]).
751 /// - Fails if the connection cannot be established (e.g., network issues, authentication
752 /// failure).
753 /// - Fails if TLS or cloud-specific configurations are invalid.
754 ///
755 /// # Panics
756 /// - Shouldn't panic, verification guarantees destination.
757 ///
758 /// # Examples
759 /// ```rust,ignore
760 /// use clickhouse_arrow::prelude::*;
761 ///
762 /// let client = ClientBuilder::new()
763 /// .with_endpoint("localhost:9000")
764 /// .with_username("default")
765 /// .build_arrow()
766 /// .await
767 /// .unwrap();
768 /// client.query("SELECT 1").await.unwrap();
769 /// ```
770 pub async fn build<T: ClientFormat>(self) -> Result<Client<T>> {
771 let verified_builder = if self.verified { self } else { self.verify().await? };
772
773 Client::connect(
774 verified_builder.destination.unwrap(), // Guaranteed in verify above
775 verified_builder.options,
776 verified_builder.settings.map(Arc::new),
777 verified_builder.context,
778 )
779 .await
780 }
781
782 /// A helper method to build a [`Client<ArrowFormat>`] directly
783 ///
784 /// # Errors
785 /// - Returns an error if destination verification fails.
786 ///
787 /// # Panics
788 /// - Shouldn't panic, verification guarantees destination.
789 pub async fn build_arrow(self) -> Result<Client<ArrowFormat>> {
790 Self::build::<ArrowFormat>(self).await
791 }
792
793 /// A helper method to build a [`Client<NativeFormat>`] directly
794 ///
795 /// # Errors
796 /// - Returns an error if destination verification fails.
797 ///
798 /// # Panics
799 /// - Shouldn't panic, verification guarantees destination.
800 pub async fn build_native(self) -> Result<Client<NativeFormat>> {
801 Self::build::<NativeFormat>(self).await
802 }
803
804 /// Builds a connection pool manager for `ClickHouse` clients.
805 ///
806 /// This method creates a [`ConnectionManager<T>`] for managing a pool of
807 /// [`Client<T>`] instances, allowing efficient reuse of connections. It verifies
808 /// the destination and configures the manager with the specified health check
809 /// behavior. The client type (`NativeClient` or `Client<ArrowFormat>`) is determined by
810 /// the format `T` (e.g., [`NativeFormat`] or [`ArrowFormat`]).
811 ///
812 /// # Parameters
813 /// - `check_health`: If `true`, the manager performs health checks on connections before
814 /// reusing them.
815 ///
816 /// # Returns
817 /// A [`Result`] containing the [`ConnectionManager<T>`], or an error if
818 /// verification or setup fails.
819 ///
820 /// # Errors
821 /// - Fails if the destination is unset or invalid ([`Error::MissingConnectionInformation`],
822 /// [`Error::MalformedConnectionInformation`]).
823 /// - Fails if the manager cannot be initialized (e.g., invalid configuration).
824 ///
825 /// # Feature
826 /// Requires the `pool` feature to be enabled.
827 ///
828 /// # Examples
829 /// ```rust,ignore
830 /// use clickhouse_arrow::prelude::*;
831 ///
832 /// let manager = ClientBuilder::new()
833 /// .with_endpoint("localhost:9000")
834 /// .with_username("default")
835 /// .build_pool_manager::<ArrowFormat>(true)
836 /// .await
837 /// .unwrap();
838 /// // Use the manager to get a client from the pool
839 /// let client = manager.get().await.unwrap();
840 /// client.query("SELECT 1").await.unwrap();
841 /// ```
#[cfg(feature = "pool")]
pub async fn build_pool_manager<T: ClientFormat>(
    self,
    check_health: bool,
) -> Result<ConnectionManager<T>> {
    // Construct the manager from this builder, then apply the health-check preference.
    let pool = ConnectionManager::<T>::try_new_with_builder(self).await?;
    Ok(pool.with_check(check_health))
}
851}
852
853impl ClientBuilder {
854 /// Generates a unique identifier for the connection configuration.
855 ///
856 /// This method creates a string identifier based on the destination, username,
857 /// password, database, and domain configured in the builder. It is useful for
858 /// distinguishing between different connection configurations, especially when
859 /// managing multiple clients or pools. The identifier includes a hashed password
860 /// for security.
861 ///
862 /// # Returns
863 /// A `String` representing the connection configuration.
864 ///
865 /// # Examples
866 /// ```rust,ignore
867 /// use clickhouse_arrow::prelude::*;
868 ///
869 /// let builder = ClientBuilder::new()
870 /// .with_endpoint("localhost:9000")
871 /// .with_username("default")
872 /// .with_password("secret");
873 /// let id = builder.connection_identifier();
874 /// println!("Connection ID: {}", id);
875 /// ```
876 pub fn connection_identifier(&self) -> String {
877 let mut dest_str = self.destination.as_ref().map_or(String::new(), Destination::domain);
878 dest_str.push_str(&self.options.username);
879 let mut hasher = rustc_hash::FxHasher::default();
880 self.options.password.hash(&mut hasher);
881 dest_str.push_str(&hasher.finish().to_string());
882 dest_str.push_str(&self.options.default_database);
883 if let Some(d) = self.options.domain.as_ref() {
884 dest_str.push_str(d);
885 }
886 dest_str
887 }
888}
889
// Cloud-specific builder options (only compiled with the `cloud` feature).
#[cfg(feature = "cloud")]
impl ClientBuilder {
    /// Turns the cloud wakeup ping on or off.
    ///
    /// When enabled, the client is configured to send a lightweight ping intended to
    /// wake a `ClickHouse` cloud instance before connecting (see
    /// [`ClientBuilder::build`]), which helps make sure the instance is active and
    /// cuts down connection latency. This setter only records the flag in the
    /// builder's cloud options.
    ///
    /// # Parameters
    /// - `ping`: `true` to enable the wakeup ping, `false` to disable it.
    ///
    /// # Returns
    /// The [`ClientBuilder`] with the cloud wakeup flag updated.
    ///
    /// # Feature
    /// Only available when the `cloud` feature is enabled.
    ///
    /// # Examples
    /// ```rust,ignore
    /// use clickhouse_arrow::prelude::*;
    ///
    /// let builder = ClientBuilder::new()
    ///     .with_endpoint("cloud.clickhouse.com:9000")
    ///     .with_cloud_wakeup(true);
    /// ```
    #[must_use]
    pub fn with_cloud_wakeup(mut self, ping: bool) -> Self {
        let cloud_options = &mut self.options.ext.cloud;
        cloud_options.wakeup = ping;
        self
    }

    /// Sets the upper bound on how long the cloud wakeup ping may wait.
    ///
    /// The value is the maximum time, in seconds, that the wakeup ping (see
    /// [`ClientBuilder::with_cloud_wakeup`]) waits for the `ClickHouse` cloud instance
    /// to respond. When this is never called the option stays unset and a default
    /// timeout applies.
    ///
    /// # Parameters
    /// - `timeout`: The timeout in seconds.
    ///
    /// # Returns
    /// The [`ClientBuilder`] with the cloud timeout updated.
    ///
    /// # Feature
    /// Only available when the `cloud` feature is enabled.
    ///
    /// # Examples
    /// ```rust,ignore
    /// use clickhouse_arrow::prelude::*;
    ///
    /// let builder = ClientBuilder::new()
    ///     .with_endpoint("cloud.clickhouse.com:9000")
    ///     .with_cloud_wakeup(true)
    ///     .with_cloud_timeout(10);
    /// ```
    #[must_use]
    pub fn with_cloud_timeout(mut self, timeout: u64) -> Self {
        let cloud_options = &mut self.options.ext.cloud;
        cloud_options.timeout = Some(timeout);
        self
    }

    /// Installs a shared flag used to track cloud wakeup pings.
    ///
    /// The supplied `Arc<AtomicBool>` can be shared by several client instances to
    /// record whether a `ClickHouse` cloud instance has already been pinged, which
    /// helps coordinate wakeups in a multi-client setup. It is relevant when the
    /// wakeup ping is enabled via [`ClientBuilder::with_cloud_wakeup`].
    ///
    /// The flag is stored on the builder's connection context; a default context is
    /// created if the builder does not have one yet, and any other context fields
    /// already set are left untouched.
    ///
    /// # Parameters
    /// - `track`: A shared `Arc<AtomicBool>` holding the ping status.
    ///
    /// # Returns
    /// The [`ClientBuilder`] with the cloud tracker updated.
    ///
    /// # Feature
    /// Only available when the `cloud` feature is enabled.
    ///
    /// # Examples
    /// ```rust,ignore
    /// use clickhouse_arrow::prelude::*;
    /// use std::sync::{Arc, atomic::AtomicBool};
    ///
    /// let tracker = Arc::new(AtomicBool::new(false));
    /// let builder = ClientBuilder::new()
    ///     .with_endpoint("cloud.clickhouse.com:9000")
    ///     .with_cloud_wakeup(true)
    ///     .with_cloud_track(tracker);
    /// ```
    #[must_use]
    pub fn with_cloud_track(mut self, track: Arc<std::sync::atomic::AtomicBool>) -> Self {
        // Reuse the existing context when present; otherwise start from the default.
        self.context.get_or_insert_with(Default::default).cloud = Some(track);
        self
    }
}
989
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use std::path::PathBuf;

    use super::*;

    /// Shorthand for a builder with nothing configured.
    fn default_builder() -> ClientBuilder { ClientBuilder::new() }

    /// A fresh builder exposes no destination or settings, has TLS off, and is unverified.
    #[test]
    fn test_accessors_empty() {
        let builder = default_builder();
        assert_eq!(builder.destination(), None);
        assert!(!builder.options().use_tls);
        assert_eq!(builder.settings(), None);
        assert!(!builder.verified());
    }

    /// Accessors reflect what was configured; configuring alone does not verify.
    #[test]
    fn test_accessors_configured() {
        let settings = Settings::default();
        let builder = default_builder()
            .with_socket_addr(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9000))
            .with_settings(settings.clone())
            .with_options(ClientOptions { use_tls: true, ..Default::default() });
        assert!(builder.destination().is_some());
        assert!(builder.options().use_tls);
        assert_eq!(builder.settings(), Some(&settings));
        assert!(!builder.verified());
    }

    /// A socket address is stored as the equivalent `Destination`.
    #[test]
    fn test_with_socket_addr() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9000);
        let builder = default_builder().with_socket_addr(addr);
        assert_eq!(builder.destination(), Some(&Destination::from(addr)));
        assert!(!builder.verified());
    }

    /// A host/port pair is stored as the equivalent `Destination`.
    #[test]
    fn test_with_host_port() {
        let builder = default_builder().with_host_port("localhost", 9000);
        assert_eq!(
            builder.destination(),
            Some(&Destination::from(("localhost".to_string(), 9000)))
        );
        assert!(!builder.verified());
    }

    /// `with_options` replaces the builder's options wholesale.
    #[test]
    fn test_with_options() {
        let options =
            ClientOptions { username: "test".to_string(), use_tls: true, ..Default::default() };
        let builder = default_builder().with_options(options.clone());
        assert_eq!(builder.options(), &options);
        assert!(!builder.verified());
    }

    /// Toggling TLS updates the option and never leaves the builder marked verified.
    #[test]
    fn test_with_tls() {
        let builder = default_builder().with_tls(true);
        assert!(builder.options().use_tls);
        assert!(!builder.verified());

        let builder = builder.with_tls(true); // Same value, no change
        assert!(!builder.verified());

        let builder = builder.with_tls(false); // Change value
        assert!(!builder.options().use_tls);
        assert!(!builder.verified());
    }

    /// The CA file path is stored as given.
    #[test]
    fn test_with_cafile() {
        let cafile = PathBuf::from("/path/to/ca.pem");
        let builder = default_builder().with_cafile(&cafile);
        assert_eq!(builder.options().cafile, Some(cafile));
    }

    /// Settings passed to the builder are returned by the accessor.
    #[test]
    fn test_with_settings() {
        let settings = Settings::default();
        let builder = default_builder().with_settings(settings.clone());
        assert_eq!(builder.settings(), Some(&settings));
    }

    /// The default database is stored as given.
    #[test]
    fn test_with_database() {
        let builder = default_builder().with_database("test_db");
        assert_eq!(builder.options().default_database, "test_db");
    }

    /// The domain override is stored as given and does not verify the builder.
    #[test]
    fn test_with_domain() {
        let builder = default_builder().with_domain("example.com");
        assert_eq!(builder.options().domain, Some("example.com".to_string()));
        assert!(!builder.verified());
    }

    /// The trace context ends up on the builder's connection context.
    #[test]
    fn test_with_trace_context() {
        let trace_context = TraceContext::default();
        let builder = default_builder().with_trace_context(trace_context);
        assert_eq!(builder.context.unwrap().trace, Some(trace_context));
    }

    /// The identifier contains every plain-text component but never the raw password.
    #[test]
    fn test_connection_identifier() {
        let builder = default_builder()
            .with_endpoint("localhost:9000")
            .with_username("user")
            .with_password("pass")
            .with_database("db")
            .with_domain("example.com");
        let id = builder.connection_identifier();
        assert!(id.contains("localhost"));
        assert!(id.contains("user"));
        assert!(id.contains("db"));
        assert!(id.contains("example.com"));
        // Only the password's digest may be embedded, never the plaintext.
        assert!(!id.contains("pass"));

        // NOTE(review): this literal pins the default username followed by the
        // `FxHasher` digest of the default password; it presumably needs updating if
        // the hasher implementation (or the default password) ever changes.
        let empty_builder = default_builder();
        assert_eq!(empty_builder.connection_identifier(), "default13933120620573868840");
    }

    /// Verifying a destination with an empty address list is rejected as malformed.
    #[tokio::test]
    async fn test_verify_empty_addrs() {
        let builder = default_builder()
            .with_destination(Destination::from(vec![])) // Empty SocketAddrs
            .verify()
            .await;
        assert!(matches!(
            builder,
            Err(Error::MalformedConnectionInformation(msg))
                if msg.contains("Socket addresses cannot be empty")
        ));
    }

    /// Verifying without any destination reports missing connection information.
    #[tokio::test]
    async fn test_verify_no_connection_information() {
        let builder = default_builder().verify().await;
        assert!(matches!(builder, Err(Error::MissingConnectionInformation)));
    }

    /// A verified builder can be turned into a pool manager.
    #[cfg(feature = "pool")]
    #[tokio::test]
    async fn test_build_pool_manager() {
        use crate::formats::ArrowFormat;

        let builder = default_builder()
            .with_endpoint("localhost:9000")
            .with_username("user")
            .verify()
            .await
            .unwrap();
        let manager = builder.build_pool_manager::<ArrowFormat>(true).await;
        assert!(manager.is_ok());
    }

    /// The cloud timeout is stored as given.
    #[cfg(feature = "cloud")]
    #[test]
    fn test_with_cloud_timeout() {
        let builder = default_builder().with_cloud_timeout(5000);
        assert_eq!(builder.options().ext.cloud.timeout, Some(5000));
    }

    /// The cloud wakeup flag is stored as given.
    #[cfg(feature = "cloud")]
    #[test]
    fn test_with_cloud_wakeup() {
        let builder = default_builder().with_cloud_wakeup(true);
        assert!(builder.options().ext.cloud.wakeup);
    }

    /// The builder must keep the caller's tracker itself, not merely an equal-valued flag.
    #[cfg(feature = "cloud")]
    #[test]
    fn test_with_cloud_track() {
        use std::sync::atomic::Ordering;

        let track = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let builder = default_builder().with_cloud_track(Arc::clone(&track));
        let stored = builder.context.unwrap().cloud.unwrap();

        // Comparing loaded values alone would also pass for an unrelated `false`
        // flag, so check that both handles point at the same allocation.
        assert!(Arc::ptr_eq(&stored, &track));
        assert_eq!(stored.load(Ordering::SeqCst), track.load(Ordering::SeqCst));

        // A write through the caller's handle must be visible through the stored one.
        track.store(true, Ordering::SeqCst);
        assert!(stored.load(Ordering::SeqCst));
    }
}