Skip to main content

hyperdb_api/
async_connection_builder.rs

// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! Fluent builder for [`AsyncConnection`].
//!
//! Mirrors [`ConnectionBuilder`](crate::ConnectionBuilder) one-for-one,
//! differing only in that `build()` and the transport-specific builders
//! are `async`. The field set and defaults are identical so users can
//! swap between the two with minimal friction.
10
11use std::path::{Path, PathBuf};
12use std::time::Duration;
13
14use crate::async_connection::AsyncConnection;
15use crate::async_transport::AsyncTransport;
16use crate::connection::CreateMode;
17use crate::error::{Error, Result};
18use crate::transport::{detect_transport_type, TransportType};
19use hyperdb_api_core::client::{AsyncClient, Config};
20
21/// An async builder for creating database connections.
22///
23/// See [`ConnectionBuilder`](crate::ConnectionBuilder) for the sync
24/// equivalent and the full contract. All setters are identical; only
25/// the terminal `build()` calls are async.
26///
27/// # Example
28///
29/// ```no_run
30/// use hyperdb_api::{AsyncConnectionBuilder, CreateMode, Result};
31///
32/// #[tokio::main]
33/// async fn main() -> Result<()> {
34///     let conn = AsyncConnectionBuilder::new("localhost:7483")
35///         .database("example.hyper")
36///         .create_mode(CreateMode::CreateIfNotExists)
37///         .auth("myuser", "mypassword")
38///         .build()
39///         .await?;
40///     Ok(())
41/// }
42/// ```
43#[derive(Debug, Clone)]
44pub struct AsyncConnectionBuilder {
45    endpoint: String,
46    database: Option<PathBuf>,
47    create_mode: CreateMode,
48    user: Option<String>,
49    password: Option<String>,
50    login_timeout: Option<Duration>,
51    query_timeout: Option<Duration>,
52    application_name: Option<String>,
53    transfer_mode: Option<hyperdb_api_core::client::grpc::TransferMode>,
54}
55
56impl Default for AsyncConnectionBuilder {
57    fn default() -> Self {
58        Self::new("localhost:7483")
59    }
60}
61
62impl AsyncConnectionBuilder {
63    /// Creates a new builder for the given endpoint.
64    pub fn new(endpoint: impl Into<String>) -> Self {
65        Self {
66            endpoint: endpoint.into(),
67            database: None,
68            create_mode: CreateMode::default(),
69            user: Some("tableau_internal_user".to_string()),
70            password: None,
71            login_timeout: None,
72            query_timeout: None,
73            application_name: None,
74            transfer_mode: None,
75        }
76    }
77
78    #[must_use]
79    /// Sets the database path.
80    pub fn database(mut self, path: impl AsRef<Path>) -> Self {
81        self.database = Some(path.as_ref().to_path_buf());
82        self
83    }
84
85    /// Sets the database creation mode.
86    #[must_use]
87    pub fn create_mode(mut self, mode: CreateMode) -> Self {
88        self.create_mode = mode;
89        self
90    }
91
92    #[must_use]
93    /// Sets the username for authentication.
94    pub fn user(mut self, user: impl Into<String>) -> Self {
95        self.user = Some(user.into());
96        self
97    }
98
99    #[must_use]
100    /// Sets the password for authentication.
101    pub fn password(mut self, password: impl Into<String>) -> Self {
102        self.password = Some(password.into());
103        self
104    }
105
106    /// Sets the login timeout.
107    #[must_use]
108    pub fn login_timeout(mut self, timeout: Duration) -> Self {
109        self.login_timeout = Some(timeout);
110        self
111    }
112
113    /// Sets the query timeout.
114    #[must_use]
115    pub fn query_timeout(mut self, timeout: Duration) -> Self {
116        self.query_timeout = Some(timeout);
117        self
118    }
119
120    #[must_use]
121    /// Sets the application name sent to the server.
122    pub fn application_name(mut self, name: impl Into<String>) -> Self {
123        self.application_name = Some(name.into());
124        self
125    }
126
127    #[must_use]
128    /// Convenience method to set user and password at once.
129    pub fn auth(mut self, user: impl Into<String>, password: impl Into<String>) -> Self {
130        self.user = Some(user.into());
131        self.password = Some(password.into());
132        self
133    }
134
135    #[must_use]
136    /// Convenience method to create a new database.
137    pub fn create_new_database(mut self, database_path: impl AsRef<Path>) -> Self {
138        self.database = Some(database_path.as_ref().to_path_buf());
139        self.create_mode = CreateMode::Create;
140        self
141    }
142
143    #[must_use]
144    /// Convenience method to create database if it doesn't exist.
145    pub fn create_or_open_database(mut self, database_path: impl AsRef<Path>) -> Self {
146        self.database = Some(database_path.as_ref().to_path_buf());
147        self.create_mode = CreateMode::CreateIfNotExists;
148        self
149    }
150
151    #[must_use]
152    /// Convenience method to open an existing database.
153    pub fn open_database(mut self, database_path: impl AsRef<Path>) -> Self {
154        self.database = Some(database_path.as_ref().to_path_buf());
155        self.create_mode = CreateMode::DoNotCreate;
156        self
157    }
158
159    /// Sets the transfer mode for gRPC connections (ignored for TCP).
160    #[must_use]
161    pub fn transfer_mode(mut self, mode: hyperdb_api_core::client::grpc::TransferMode) -> Self {
162        self.transfer_mode = Some(mode);
163        self
164    }
165
166    /// Builds and establishes the connection (async).
167    ///
168    /// Transport is auto-detected from the endpoint URL:
169    /// - `https://` / `http://` → gRPC
170    /// - `tab.domain://` → Unix domain socket (Unix only)
171    /// - `tab.pipe://` → named pipe (Windows only)
172    /// - otherwise → TCP
173    ///
174    /// # Errors
175    ///
176    /// - Returns [`Error::Io`] or [`Error::Client`] if the transport
177    ///   handshake fails (TCP refused, TLS rejected, named-pipe not
178    ///   found, gRPC channel setup failure).
179    /// - Returns [`Error::Client`] if authentication is rejected.
180    /// - Returns [`Error::Client`] if the `CreateMode` SQL is rejected
181    ///   for a builder that configured a database path.
182    pub async fn build(self) -> Result<AsyncConnection> {
183        let transport_type = detect_transport_type(&self.endpoint);
184        match transport_type {
185            TransportType::Tcp => self.build_tcp().await,
186            #[cfg(unix)]
187            TransportType::UnixSocket => self.build_unix().await,
188            #[cfg(windows)]
189            TransportType::NamedPipe => self.build_named_pipe().await,
190            TransportType::Grpc => self.build_grpc().await,
191        }
192    }
193
194    /// Build a TCP connection (async).
195    async fn build_tcp(self) -> Result<AsyncConnection> {
196        let mut config: Config = self
197            .endpoint
198            .parse()
199            .map_err(|e| Error::new(format!("invalid endpoint: {e}")))?;
200
201        if let Some(user) = &self.user {
202            config = config.with_user(user);
203        }
204        if let Some(password) = &self.password {
205            config = config.with_password(password);
206        }
207        if let Some(ref app_name) = self.application_name {
208            config = config.with_application_name(app_name);
209        }
210        if let Some(timeout) = self.login_timeout {
211            config = config.with_connect_timeout(timeout);
212        }
213
214        let db_path_str = self
215            .database
216            .as_ref()
217            .map(|p| p.to_string_lossy().to_string());
218
219        let client = AsyncClient::connect(&config).await?;
220        let conn = AsyncConnection::from_async_client(client, db_path_str.clone());
221
222        if let Some(db_path) = db_path_str {
223            conn.handle_creation_mode_public(&db_path, self.create_mode)
224                .await?;
225            conn.attach_and_set_path_public(&db_path).await?;
226        }
227
228        Ok(conn)
229    }
230
231    /// Build a Unix domain socket connection (async, Unix only).
232    #[cfg(unix)]
233    async fn build_unix(self) -> Result<AsyncConnection> {
234        use hyperdb_api_core::client::ConnectionEndpoint;
235
236        let socket_path = if self.endpoint.starts_with("tab.domain://") {
237            let endpoint = ConnectionEndpoint::parse(&self.endpoint)
238                .map_err(|e| Error::new(format!("invalid Unix socket endpoint: {e}")))?;
239            match endpoint {
240                ConnectionEndpoint::DomainSocket { directory, name } => directory.join(&name),
241                ConnectionEndpoint::Tcp { .. } => {
242                    return Err(Error::new("expected Unix domain socket endpoint"))
243                }
244            }
245        } else {
246            std::path::PathBuf::from(&self.endpoint)
247        };
248
249        let mut config = Config::new();
250        if let Some(user) = &self.user {
251            config = config.with_user(user);
252        }
253        if let Some(password) = &self.password {
254            config = config.with_password(password);
255        }
256
257        let db_path_str = self
258            .database
259            .as_ref()
260            .map(|p| p.to_string_lossy().to_string());
261
262        let client = AsyncClient::connect_unix(&socket_path, &config).await?;
263        let conn = AsyncConnection::from_async_client(client, db_path_str.clone());
264
265        if let Some(db_path) = db_path_str {
266            conn.handle_creation_mode_public(&db_path, self.create_mode)
267                .await?;
268            conn.attach_and_set_path_public(&db_path).await?;
269        }
270
271        Ok(conn)
272    }
273
274    /// Build a Windows Named Pipe connection (async, Windows only).
275    #[cfg(windows)]
276    async fn build_named_pipe(self) -> Result<AsyncConnection> {
277        use hyperdb_api_core::client::ConnectionEndpoint;
278
279        let pipe_path = if self.endpoint.starts_with("tab.pipe://") {
280            let endpoint = ConnectionEndpoint::parse(&self.endpoint)
281                .map_err(|e| Error::new(format!("invalid named pipe endpoint: {e}")))?;
282            match endpoint {
283                ConnectionEndpoint::NamedPipe { host, name } => {
284                    format!(r"\\{host}\pipe\{name}")
285                }
286                _ => return Err(Error::new("expected named pipe endpoint")),
287            }
288        } else {
289            self.endpoint.clone()
290        };
291
292        let mut config = Config::new();
293        if let Some(user) = &self.user {
294            config = config.with_user(user);
295        }
296        if let Some(password) = &self.password {
297            config = config.with_password(password);
298        }
299
300        let db_path_str = self
301            .database
302            .as_ref()
303            .map(|p| p.to_string_lossy().to_string());
304
305        let client = AsyncClient::connect_named_pipe(&pipe_path, &config).await?;
306        let conn = AsyncConnection::from_async_client(client, db_path_str.clone());
307
308        if let Some(db_path) = db_path_str {
309            conn.handle_creation_mode_public(&db_path, self.create_mode)
310                .await?;
311            conn.attach_and_set_path_public(&db_path).await?;
312        }
313
314        Ok(conn)
315    }
316
317    /// Build a gRPC connection (async).
318    async fn build_grpc(self) -> Result<AsyncConnection> {
319        if self.create_mode != CreateMode::DoNotCreate {
320            return Err(Error::new(
321                "gRPC transport is read-only. Use CreateMode::DoNotCreate for gRPC connections.",
322            ));
323        }
324
325        let db_path_str = self
326            .database
327            .as_ref()
328            .map(|p| p.to_string_lossy().to_string());
329
330        let mut grpc_config = hyperdb_api_core::client::grpc::GrpcConfig::new(&self.endpoint);
331        if let Some(ref db_path) = db_path_str {
332            grpc_config = grpc_config.database(db_path);
333        }
334        if let Some(mode) = self.transfer_mode {
335            grpc_config = grpc_config.transfer_mode(mode);
336        }
337
338        let transport = AsyncTransport::connect_grpc(grpc_config).await?;
339        Ok(AsyncConnection::from_transport(transport, db_path_str))
340    }
341}