hyperdb_api/
async_connection_builder.rs1use std::path::{Path, PathBuf};
12use std::time::Duration;
13
14use crate::async_connection::AsyncConnection;
15use crate::async_transport::AsyncTransport;
16use crate::connection::CreateMode;
17use crate::error::{Error, Result};
18use crate::transport::{detect_transport_type, TransportType};
19use hyperdb_api_core::client::{AsyncClient, Config};
20
21#[derive(Debug, Clone)]
44pub struct AsyncConnectionBuilder {
45 endpoint: String,
46 database: Option<PathBuf>,
47 create_mode: CreateMode,
48 user: Option<String>,
49 password: Option<String>,
50 login_timeout: Option<Duration>,
51 query_timeout: Option<Duration>,
52 application_name: Option<String>,
53 transfer_mode: Option<hyperdb_api_core::client::grpc::TransferMode>,
54}
55
56impl Default for AsyncConnectionBuilder {
57 fn default() -> Self {
58 Self::new("localhost:7483")
59 }
60}
61
62impl AsyncConnectionBuilder {
63 pub fn new(endpoint: impl Into<String>) -> Self {
65 Self {
66 endpoint: endpoint.into(),
67 database: None,
68 create_mode: CreateMode::default(),
69 user: Some("tableau_internal_user".to_string()),
70 password: None,
71 login_timeout: None,
72 query_timeout: None,
73 application_name: None,
74 transfer_mode: None,
75 }
76 }
77
78 #[must_use]
79 pub fn database(mut self, path: impl AsRef<Path>) -> Self {
81 self.database = Some(path.as_ref().to_path_buf());
82 self
83 }
84
85 #[must_use]
87 pub fn create_mode(mut self, mode: CreateMode) -> Self {
88 self.create_mode = mode;
89 self
90 }
91
92 #[must_use]
93 pub fn user(mut self, user: impl Into<String>) -> Self {
95 self.user = Some(user.into());
96 self
97 }
98
99 #[must_use]
100 pub fn password(mut self, password: impl Into<String>) -> Self {
102 self.password = Some(password.into());
103 self
104 }
105
106 #[must_use]
108 pub fn login_timeout(mut self, timeout: Duration) -> Self {
109 self.login_timeout = Some(timeout);
110 self
111 }
112
113 #[must_use]
115 pub fn query_timeout(mut self, timeout: Duration) -> Self {
116 self.query_timeout = Some(timeout);
117 self
118 }
119
120 #[must_use]
121 pub fn application_name(mut self, name: impl Into<String>) -> Self {
123 self.application_name = Some(name.into());
124 self
125 }
126
127 #[must_use]
128 pub fn auth(mut self, user: impl Into<String>, password: impl Into<String>) -> Self {
130 self.user = Some(user.into());
131 self.password = Some(password.into());
132 self
133 }
134
135 #[must_use]
136 pub fn create_new_database(mut self, database_path: impl AsRef<Path>) -> Self {
138 self.database = Some(database_path.as_ref().to_path_buf());
139 self.create_mode = CreateMode::Create;
140 self
141 }
142
143 #[must_use]
144 pub fn create_or_open_database(mut self, database_path: impl AsRef<Path>) -> Self {
146 self.database = Some(database_path.as_ref().to_path_buf());
147 self.create_mode = CreateMode::CreateIfNotExists;
148 self
149 }
150
151 #[must_use]
152 pub fn open_database(mut self, database_path: impl AsRef<Path>) -> Self {
154 self.database = Some(database_path.as_ref().to_path_buf());
155 self.create_mode = CreateMode::DoNotCreate;
156 self
157 }
158
159 #[must_use]
161 pub fn transfer_mode(mut self, mode: hyperdb_api_core::client::grpc::TransferMode) -> Self {
162 self.transfer_mode = Some(mode);
163 self
164 }
165
166 pub async fn build(self) -> Result<AsyncConnection> {
183 let transport_type = detect_transport_type(&self.endpoint);
184 match transport_type {
185 TransportType::Tcp => self.build_tcp().await,
186 #[cfg(unix)]
187 TransportType::UnixSocket => self.build_unix().await,
188 #[cfg(windows)]
189 TransportType::NamedPipe => self.build_named_pipe().await,
190 TransportType::Grpc => self.build_grpc().await,
191 }
192 }
193
194 async fn build_tcp(self) -> Result<AsyncConnection> {
196 let mut config: Config = self
197 .endpoint
198 .parse()
199 .map_err(|e| Error::new(format!("invalid endpoint: {e}")))?;
200
201 if let Some(user) = &self.user {
202 config = config.with_user(user);
203 }
204 if let Some(password) = &self.password {
205 config = config.with_password(password);
206 }
207 if let Some(ref app_name) = self.application_name {
208 config = config.with_application_name(app_name);
209 }
210 if let Some(timeout) = self.login_timeout {
211 config = config.with_connect_timeout(timeout);
212 }
213
214 let db_path_str = self
215 .database
216 .as_ref()
217 .map(|p| p.to_string_lossy().to_string());
218
219 let client = AsyncClient::connect(&config).await?;
220 let conn = AsyncConnection::from_async_client(client, db_path_str.clone());
221
222 if let Some(db_path) = db_path_str {
223 conn.handle_creation_mode_public(&db_path, self.create_mode)
224 .await?;
225 conn.attach_and_set_path_public(&db_path).await?;
226 }
227
228 Ok(conn)
229 }
230
231 #[cfg(unix)]
233 async fn build_unix(self) -> Result<AsyncConnection> {
234 use hyperdb_api_core::client::ConnectionEndpoint;
235
236 let socket_path = if self.endpoint.starts_with("tab.domain://") {
237 let endpoint = ConnectionEndpoint::parse(&self.endpoint)
238 .map_err(|e| Error::new(format!("invalid Unix socket endpoint: {e}")))?;
239 match endpoint {
240 ConnectionEndpoint::DomainSocket { directory, name } => directory.join(&name),
241 ConnectionEndpoint::Tcp { .. } => {
242 return Err(Error::new("expected Unix domain socket endpoint"))
243 }
244 }
245 } else {
246 std::path::PathBuf::from(&self.endpoint)
247 };
248
249 let mut config = Config::new();
250 if let Some(user) = &self.user {
251 config = config.with_user(user);
252 }
253 if let Some(password) = &self.password {
254 config = config.with_password(password);
255 }
256
257 let db_path_str = self
258 .database
259 .as_ref()
260 .map(|p| p.to_string_lossy().to_string());
261
262 let client = AsyncClient::connect_unix(&socket_path, &config).await?;
263 let conn = AsyncConnection::from_async_client(client, db_path_str.clone());
264
265 if let Some(db_path) = db_path_str {
266 conn.handle_creation_mode_public(&db_path, self.create_mode)
267 .await?;
268 conn.attach_and_set_path_public(&db_path).await?;
269 }
270
271 Ok(conn)
272 }
273
274 #[cfg(windows)]
276 async fn build_named_pipe(self) -> Result<AsyncConnection> {
277 use hyperdb_api_core::client::ConnectionEndpoint;
278
279 let pipe_path = if self.endpoint.starts_with("tab.pipe://") {
280 let endpoint = ConnectionEndpoint::parse(&self.endpoint)
281 .map_err(|e| Error::new(format!("invalid named pipe endpoint: {e}")))?;
282 match endpoint {
283 ConnectionEndpoint::NamedPipe { host, name } => {
284 format!(r"\\{host}\pipe\{name}")
285 }
286 _ => return Err(Error::new("expected named pipe endpoint")),
287 }
288 } else {
289 self.endpoint.clone()
290 };
291
292 let mut config = Config::new();
293 if let Some(user) = &self.user {
294 config = config.with_user(user);
295 }
296 if let Some(password) = &self.password {
297 config = config.with_password(password);
298 }
299
300 let db_path_str = self
301 .database
302 .as_ref()
303 .map(|p| p.to_string_lossy().to_string());
304
305 let client = AsyncClient::connect_named_pipe(&pipe_path, &config).await?;
306 let conn = AsyncConnection::from_async_client(client, db_path_str.clone());
307
308 if let Some(db_path) = db_path_str {
309 conn.handle_creation_mode_public(&db_path, self.create_mode)
310 .await?;
311 conn.attach_and_set_path_public(&db_path).await?;
312 }
313
314 Ok(conn)
315 }
316
317 async fn build_grpc(self) -> Result<AsyncConnection> {
319 if self.create_mode != CreateMode::DoNotCreate {
320 return Err(Error::new(
321 "gRPC transport is read-only. Use CreateMode::DoNotCreate for gRPC connections.",
322 ));
323 }
324
325 let db_path_str = self
326 .database
327 .as_ref()
328 .map(|p| p.to_string_lossy().to_string());
329
330 let mut grpc_config = hyperdb_api_core::client::grpc::GrpcConfig::new(&self.endpoint);
331 if let Some(ref db_path) = db_path_str {
332 grpc_config = grpc_config.database(db_path);
333 }
334 if let Some(mode) = self.transfer_mode {
335 grpc_config = grpc_config.transfer_mode(mode);
336 }
337
338 let transport = AsyncTransport::connect_grpc(grpc_config).await?;
339 Ok(AsyncConnection::from_transport(transport, db_path_str))
340 }
341}