hyperdb_api/process.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Hyper server process management.
5//!
6//! This module provides [`HyperProcess`] for spawning and managing local hyperd server instances.
7//!
8//! # Callback Connection Architecture
9//!
10//! The `HyperProcess` uses a **callback connection** mechanism for reliable process lifecycle
11//! management. This works as follows:
12//!
13//! 1. **Startup**: The client creates a TCP listener on an ephemeral port (the "callback proxy")
14//! and passes this address to hyperd via `--callback-connection`. Hyper connects back to this
15//! listener and sends its actual listen endpoint over this connection.
16//!
17//! 2. **Runtime**: The callback connection remains open for the lifetime of the `HyperProcess`.
18//! This connection acts as a "dead man's switch" - Hyper monitors it continuously.
19//!
20//! 3. **Graceful Shutdown**: When `HyperProcess` is dropped or explicitly shut down, the callback
21//! connection is closed. Hyper detects this and initiates graceful shutdown automatically.
22//!
23//! 4. **Crash Safety**: If the client process crashes or is killed, the OS automatically closes
24//! the TCP connection. Hyper detects this and shuts down gracefully, preventing orphan
25//! processes. This is the key advantage over signal-based shutdown.
26//!
27//! # Protocol Details
28//!
29//! The callback connection protocol is simple:
30//! - Client listens on `127.0.0.1:<ephemeral_port>`
31//! - Client starts hyperd with `--callback-connection=tab.tcp://127.0.0.1:<port>`
32//! - Hyper connects to this address and sends: `[1 byte length][N bytes descriptor]`
33//! - The descriptor is the actual listen endpoint (e.g., `tab.tcp://localhost:54321`)
34//! - Connection stays open until shutdown is desired
35//!
36//! # Listen Modes
37//!
38//! `HyperProcess` supports different listen modes via [`ListenMode`]:
39//!
40//! - **`LibPq`** (default): `PostgreSQL` wire protocol for full read/write access
41//! - **Grpc**: gRPC protocol for query-only Arrow-based access
42//! - **Both**: Both protocols enabled (libpq for read/write, gRPC for Arrow queries)
43//!
44//! ```no_run
45//! use hyperdb_api::{HyperProcess, ListenMode, Parameters, Result};
46//!
47//! fn main() -> Result<()> {
48//! // gRPC only
49//! let mut params = Parameters::new();
50//! params.set_listen_mode(ListenMode::Grpc { port: 0 });
//! let hyper = HyperProcess::new(None, Some(&params))?;
52//! println!("gRPC endpoint: {}", hyper.grpc_endpoint().unwrap());
53//!
54//! // Both libpq and gRPC
55//! let mut params = Parameters::new();
56//! params.set_listen_mode(ListenMode::Both { grpc_port: 7484 });
//! let hyper = HyperProcess::new(None, Some(&params))?;
58//! println!("libpq endpoint: {}", hyper.endpoint().unwrap());
59//! println!("gRPC endpoint: {}", hyper.grpc_endpoint().unwrap());
60//! Ok(())
61//! }
62//! ```
63
64use std::io::Read;
65use std::net::{Shutdown, TcpListener, TcpStream};
66use std::path::{Path, PathBuf};
67use std::process::{Child, Command, Stdio};
68use std::sync::atomic::{AtomicBool, Ordering};
69use std::sync::Arc;
70use std::thread;
71use std::time::Duration;
72
73#[cfg(unix)]
74use std::os::unix::process::CommandExt;
75
76#[cfg(any(unix, windows))]
77use hyperdb_api_core::client::ConnectionEndpoint;
78
79use tracing::info;
80
81use crate::error::{Error, Result};
82
/// Specifies which protocols `HyperProcess` should listen on.
///
/// The selected mode determines the `--listen-connection` descriptor that is
/// passed to hyperd at startup and which of `HyperProcess::endpoint()` /
/// `HyperProcess::grpc_endpoint()` return a value afterwards.
///
/// # Examples
///
/// ```
/// use hyperdb_api::{ListenMode, Parameters};
///
/// // LibPq only (default) - for full read/write access
/// let mut params = Parameters::new();
/// params.set_listen_mode(ListenMode::LibPq);
///
/// // gRPC only - for Arrow-based query access
/// let mut params = Parameters::new();
/// params.set_listen_mode(ListenMode::Grpc { port: 0 }); // auto-assign port
///
/// // Both protocols - libpq for writes, gRPC for Arrow queries
/// let mut params = Parameters::new();
/// params.set_listen_mode(ListenMode::Both { grpc_port: 7484 });
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ListenMode {
    /// `PostgreSQL` wire protocol only (default).
    ///
    /// This is the traditional connection mode that supports all Hyper features
    /// including read and write operations. Only the libpq endpoint is
    /// populated on the resulting `HyperProcess`; the gRPC endpoint is `None`.
    #[default]
    LibPq,

    /// gRPC protocol only.
    ///
    /// This mode is optimized for query-only workloads and returns results in
    /// Arrow IPC format. Note that gRPC mode does not support write operations.
    ///
    /// Set `port` to 0 to auto-assign an available port; in that case the
    /// gRPC endpoint is taken from the descriptor hyperd reports over the
    /// callback connection. With a non-zero port the endpoint is reported as
    /// `127.0.0.1:<port>`. The libpq endpoint is `None` in this mode.
    Grpc {
        /// The port to listen on (0 for auto-assign).
        port: u16,
    },

    /// Both libpq and gRPC protocols.
    ///
    /// This mode enables full read/write access via libpq while also providing
    /// gRPC access for Arrow-based queries. The libpq port is auto-assigned,
    /// while the gRPC port is specified.
    ///
    /// Note: When using `Both` mode, the callback connection returns the libpq
    /// endpoint. Use `HyperProcess::grpc_endpoint()` to get the gRPC endpoint,
    /// which is always reported as `127.0.0.1:<grpc_port>`.
    Both {
        /// The gRPC port to listen on (cannot be 0 - must be a specific port).
        ///
        /// NOTE(review): the value is forwarded to hyperd as given; this type
        /// does not itself reject 0.
        grpc_port: u16,
    },
}
135
/// A running Hyper server instance.
///
/// This struct manages the lifecycle of a local Hyper server process using a callback
/// connection for reliable shutdown. The server is automatically shut down when this
/// object is dropped.
///
/// # Callback Connection (Dead Man's Switch)
///
/// Unlike traditional process management that relies on signals, `HyperProcess` maintains
/// a TCP connection to the Hyper server. When this connection is closed (either explicitly
/// or because the client process exits), Hyper automatically shuts down gracefully.
///
/// This provides several benefits:
/// - **No orphan processes**: If your application crashes, Hyper shuts down automatically
/// - **Graceful shutdown**: Hyper can flush data and clean up properly
/// - **Cross-platform**: Works reliably on macOS, Linux, and Windows
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{HyperProcess, Result};
///
/// fn main() -> Result<()> {
///     // Start a Hyper server (auto-detect hyperd location)
///     let hyper = HyperProcess::new(None, None)?;
///
///     println!("Hyper server running at: {}", hyper.endpoint().unwrap());
///
///     // Server automatically shuts down when `hyper` goes out of scope
///     // via the callback connection mechanism
///     Ok(())
/// }
/// ```
#[must_use = "HyperProcess will shut down when dropped; store it to keep the server running"]
#[derive(Debug)]
pub struct HyperProcess {
    /// The child process handle for the spawned hyperd.
    child: Option<Child>,
    /// The libpq endpoint descriptor (host:port or socket path), if libpq is enabled.
    /// `None` when the process was started in gRPC-only mode.
    endpoint: Option<String>,
    /// The parsed connection endpoint for libpq (TCP, Unix domain socket or named pipe).
    /// `None` whenever `endpoint` is `None`.
    connection_endpoint: Option<ConnectionEndpoint>,
    /// The gRPC endpoint (host:port), if gRPC is enabled.
    grpc_endpoint: Option<String>,
    /// Path to the hyperd executable this instance was started from.
    #[expect(
        dead_code,
        reason = "retained for diagnostics and future restart/respawn paths"
    )]
    hyperd_path: PathBuf,
    /// Whether shutdown has been initiated. Starts out as `false`.
    shutdown_initiated: Arc<AtomicBool>,
    /// The callback connection to hyperd.
    /// Keeping this open maintains the "dead man's switch" - when dropped, Hyper shuts down.
    callback_connection: Option<TcpStream>,
    /// The listen mode this process was started with.
    listen_mode: ListenMode,
    /// The transport mode this process was started with.
    transport_mode: TransportMode,
    /// The socket directory for UDS connections (Unix only).
    /// `Some` only when the IPC transport is in use.
    /// This directory is automatically cleaned up on drop.
    #[cfg(unix)]
    socket_directory: Option<PathBuf>,
    /// The pipe name for Named Pipe connections (Windows only).
    /// `Some` only when the IPC transport is in use.
    #[cfg(windows)]
    pipe_name: Option<String>,
    /// The log directory where hyperd writes its log files.
    /// Taken from the user's `log_dir` parameter if given, otherwise the current
    /// working directory when default parameters are enabled, otherwise `None`.
    log_dir: Option<PathBuf>,
}
205
206impl HyperProcess {
207 /// Starts a new Hyper server instance.
208 ///
209 /// This method:
210 /// 1. Creates a callback listener on an ephemeral port
211 /// 2. Starts the hyperd process with the callback connection address
212 /// 3. Waits for Hyper to connect back and provide its listen endpoint
213 ///
214 /// # Arguments
215 ///
216 /// * `hyper_path` - Optional path to the hyperd executable. If `None`, searches
217 /// in common locations (`HYPERD_PATH` env var, then known build output paths).
218 /// * `parameters` - Optional parameters for the server.
219 ///
220 /// # Errors
221 ///
222 /// Returns an error if:
223 /// - The hyperd executable cannot be found
224 /// - The callback listener cannot be created
225 /// - The server fails to start
226 /// - Hyper doesn't connect back within the timeout (30 seconds)
227 ///
228 /// # Example
229 ///
230 /// ```no_run
231 /// use hyperdb_api::{HyperProcess, Result};
232 /// use std::path::Path;
233 ///
234 /// fn main() -> Result<()> {
235 /// // Auto-detect hyperd location
236 /// let hyper = HyperProcess::new(None, None)?;
237 ///
238 /// // Or specify explicit path
239 /// let hyper2 = HyperProcess::new(
240 /// Some(Path::new("/path/to/hyperd")),
241 /// None,
242 /// )?;
243 /// Ok(())
244 /// }
245 /// ```
246 pub fn new(hyper_path: Option<&Path>, parameters: Option<&Parameters>) -> Result<Self> {
247 let hyperd_path = match hyper_path {
248 Some(path) => path.to_path_buf(),
249 None => Self::find_hyperd()?,
250 };
251 Self::start_server(&hyperd_path, parameters)
252 }
253
254 /// Resolves the hyperd executable from the `HYPERD_PATH` environment
255 /// variable. The value can point at the executable directly, or at a
256 /// directory containing it.
257 ///
258 /// If `HYPERD_PATH` is unset, returns an error instructing the caller
259 /// to either set it or run the `hyperd-bootstrap` downloader to
260 /// install a pinned release at `.hyperd/current/hyperd`.
261 fn find_hyperd() -> Result<PathBuf> {
262 #[cfg(windows)]
263 const HYPERD_EXE: &str = "hyperd.exe";
264 #[cfg(not(windows))]
265 const HYPERD_EXE: &str = "hyperd";
266
267 let Ok(path_str) = std::env::var("HYPERD_PATH") else {
268 // Walk up from CWD looking for .hyperd/current/<exe> written by
269 // `hyperd-bootstrap download`. This lets `node examples/foo.mjs`
270 // run from any subdirectory of the repo without exporting HYPERD_PATH.
271 if let Ok(cwd) = std::env::current_dir() {
272 let mut dir = cwd.as_path();
273 loop {
274 let candidate = dir.join(".hyperd").join("current").join(HYPERD_EXE);
275 if candidate.exists() {
276 return Ok(candidate);
277 }
278 match dir.parent() {
279 Some(parent) => dir = parent,
280 None => break,
281 }
282 }
283 }
284 return Err(Error::new(
285 "HYPERD_PATH is not set. Point it at a hyperd executable, \
286 or run `make download-hyperd` (or `cargo run -p hyperd-bootstrap -- download`) \
287 to install a pinned release at `.hyperd/current/hyperd`.",
288 ));
289 };
290
291 let path = PathBuf::from(&path_str);
292 if path.is_dir() {
293 let with_exe = path.join(HYPERD_EXE);
294 if with_exe.exists() {
295 return Ok(with_exe);
296 }
297 #[cfg(windows)]
298 {
299 let without_exe = path.join("hyperd");
300 if without_exe.exists() {
301 return Ok(without_exe);
302 }
303 }
304 return Err(Error::new(format!(
305 "HYPERD_PATH set to '{path_str}' but {HYPERD_EXE} not found in that directory"
306 )));
307 }
308 if path.exists() {
309 return Ok(path);
310 }
311 #[cfg(windows)]
312 {
313 let with_ext = PathBuf::from(format!("{path_str}.exe"));
314 if with_ext.exists() {
315 return Ok(with_ext);
316 }
317 }
318 Err(Error::new(format!(
319 "HYPERD_PATH set to '{}' but hyperd executable not found (checked: {})",
320 path_str,
321 path.display()
322 )))
323 }
324
325 /// Starts the hyperd server process with callback connection.
326 fn start_server(hyperd_path: &Path, parameters: Option<&Parameters>) -> Result<Self> {
327 // Verify hyperd exists
328 if !hyperd_path.exists() {
329 return Err(Error::new(format!(
330 "Hyper executable not found at: {}",
331 hyperd_path.display()
332 )));
333 }
334
335 info!(
336 target: "hyperdb_api",
337 path = %hyperd_path.display(),
338 "hyperd-starting"
339 );
340
341 // Create callback listener on ephemeral port
342 // This is the "dead man's switch" - when this connection is closed, Hyper shuts down
343 let callback_listener = TcpListener::bind("127.0.0.1:0")
344 .map_err(|e| Error::new(format!("Failed to create callback listener: {e}")))?;
345
346 let callback_port = callback_listener
347 .local_addr()
348 .map_err(|e| Error::new(format!("Failed to get callback port: {e}")))?
349 .port();
350
351 // Set a timeout for accepting the callback connection
352 callback_listener
353 .set_nonblocking(false)
354 .map_err(|e| Error::new(format!("Failed to set callback listener to blocking: {e}")))?;
355
356 // Check if user wants to disable default parameters
357 let use_defaults = parameters.map_or(true, |p| !p.contains_key(NO_DEFAULT_PARAMETERS));
358
359 // Get the listen mode
360 let listen_mode = parameters.and_then(|p| p.listen_mode).unwrap_or_default();
361
362 // Get transport mode (default to TCP until UDS performance is validated)
363 // See IPC_IMPLEMENTATION.md for details
364 #[cfg(unix)]
365 let transport_mode = parameters
366 .and_then(|p| p.transport_mode)
367 .unwrap_or(TransportMode::Tcp);
368 #[cfg(windows)]
369 let transport_mode = parameters
370 .and_then(|p| p.transport_mode)
371 .unwrap_or(TransportMode::Tcp);
372 #[cfg(not(any(unix, windows)))]
373 let transport_mode = TransportMode::Tcp;
374
375 // Create socket directory for UDS if needed (Unix only)
376 #[cfg(unix)]
377 let socket_directory: Option<PathBuf> = if transport_mode == TransportMode::Ipc {
378 // Use custom directory if provided, otherwise create temp directory
379 let dir = if let Some(custom_dir) =
380 parameters.and_then(|p| p.domain_socket_directory.as_ref())
381 {
382 custom_dir.clone()
383 } else {
384 // Create a temp directory for the socket
385 let temp_dir = std::env::temp_dir().join(format!("hyper-{}", std::process::id()));
386 std::fs::create_dir_all(&temp_dir)
387 .map_err(|e| Error::new(format!("Failed to create socket directory: {e}")))?;
388 temp_dir
389 };
390 Some(dir)
391 } else {
392 None
393 };
394
395 // On non-Unix platforms there is no UDS socket directory; the variable
396 // is only referenced inside `#[cfg(unix)]` blocks so we do not need a
397 // placeholder binding here.
398
399 // Create pipe name for Named Pipes if needed (Windows only)
400 #[cfg(windows)]
401 let pipe_name: Option<String> = if transport_mode == TransportMode::Ipc {
402 Some(format!("hyper-{}", std::process::id()))
403 } else {
404 None
405 };
406
407 // Build command arguments
408 let mut cmd = Command::new(hyperd_path);
409
410 // The "run" subcommand starts the server
411 cmd.arg("run");
412
413 // Callback connection - Hyper will connect to this and send its endpoint
414 // When this connection is closed, Hyper will shut down gracefully
415 cmd.arg("--callback-connection")
416 .arg(format!("tab.tcp://127.0.0.1:{callback_port}"));
417
418 // Configure listen connection based on mode and transport
419 // Connection string formats:
420 // - tab.tcp://host:port - libpq over TCP
421 // - tab.domain://<dir>/domain/<name> - libpq over Unix Domain Socket
422 // - tcp.grpc://host:port - gRPC
423 #[cfg(unix)]
424 let listen_connection = if transport_mode == TransportMode::Ipc {
425 let socket_dir = socket_directory.as_ref().unwrap();
426 match listen_mode {
427 ListenMode::LibPq => format!("tab.domain://{}/domain/hyper", socket_dir.display()),
428 ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
429 ListenMode::Both { grpc_port } => {
430 format!(
431 "tab.domain://{}/domain/hyper,tcp.grpc://127.0.0.1:{}",
432 socket_dir.display(),
433 grpc_port
434 )
435 }
436 }
437 } else {
438 match listen_mode {
439 ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
440 ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
441 ListenMode::Both { grpc_port } => {
442 format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{grpc_port}")
443 }
444 }
445 };
446
447 #[cfg(windows)]
448 let listen_connection = if transport_mode == TransportMode::Ipc {
449 let pname = pipe_name.as_ref().unwrap();
450 match listen_mode {
451 ListenMode::LibPq => format!("tab.pipe://./pipe/{pname}"),
452 ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
453 ListenMode::Both { grpc_port } => {
454 format!("tab.pipe://./pipe/{pname},tcp.grpc://127.0.0.1:{grpc_port}")
455 }
456 }
457 } else {
458 match listen_mode {
459 ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
460 ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
461 ListenMode::Both { grpc_port } => {
462 format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{grpc_port}")
463 }
464 }
465 };
466
467 #[cfg(not(any(unix, windows)))]
468 let listen_connection = match listen_mode {
469 ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
470 ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{}", port),
471 ListenMode::Both { grpc_port } => {
472 format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{}", grpc_port)
473 }
474 };
475
476 cmd.arg("--listen-connection").arg(&listen_connection);
477
478 // Helper to check if a parameter is already set by the user
479 let user_has_param =
480 |key: &str| -> bool { parameters.is_some_and(|p| p.contains_key(key)) };
481
482 // Apply default instance parameters (matching C++ HyperProcess behavior)
483 // These can be overridden by user parameters or disabled entirely with NO_DEFAULT_PARAMETERS
484 if use_defaults {
485 // Initial user for the Hyper instance
486 if !user_has_param("init_user") {
487 cmd.arg("--init-user=tableau_internal_user");
488 }
489
490 // Enable gRPC threads if gRPC mode is enabled
491 // Required for gRPC to function - without this, Hyper will fail to start with:
492 // "gRPC threads are required for running gRPC services"
493 // Using 4 threads as a reasonable default (can be overridden by user)
494 if matches!(
495 listen_mode,
496 ListenMode::Grpc { .. } | ListenMode::Both { .. }
497 ) && !user_has_param("grpc_threads")
498 {
499 cmd.arg("--grpc-threads=4");
500 }
501
502 // Enable gRPC result persistence if gRPC mode is enabled
503 // This is required for ADAPTIVE and ASYNC transfer modes
504 if matches!(
505 listen_mode,
506 ListenMode::Grpc { .. } | ListenMode::Both { .. }
507 ) && !user_has_param("grpc_persist_results")
508 {
509 cmd.arg("--grpc-persist-results=true");
510 }
511
512 // Default language setting
513 if !user_has_param("language") {
514 cmd.arg("--language=en_US");
515 }
516
517 // Log configuration: file-based JSON logging
518 if !user_has_param("log_config") {
519 cmd.arg(format!("--log-config={DEFAULT_LOG_CONFIG}"));
520 }
521
522 // Date style for date parsing (Month-Day-Year)
523 if !user_has_param("date_style") {
524 cmd.arg("--date-style=MDY");
525 }
526
527 // Enforce strict date_style (day/month/year ordering must match exactly)
528 if !user_has_param("date_style_lenient") {
529 cmd.arg("--date-style-lenient=false");
530 }
531
532 // Set default log directory to current directory
533 if !user_has_param("log_dir") {
534 if let Ok(cwd) = std::env::current_dir() {
535 cmd.arg(format!("--log-dir={}", cwd.display()));
536 }
537 }
538
539 // Disable password requirement for local development
540 if !user_has_param("no_password") {
541 cmd.arg("--no-password");
542 }
543
544 // Skip license check for local development
545 if !user_has_param("skip_license") {
546 cmd.arg("--skip-license");
547 }
548
549 // Default new .hyper databases to file format version 3, which
550 // adds support for 128-bit NUMERICs (required to ingest parquet
551 // files whose decimal columns are stored as DECIMAL128).
552 // File format 3 has shipped since Hyper 2022.4.0.
553 if !user_has_param("default_database_version") {
554 cmd.arg("--default-database-version=3");
555 }
556 }
557
558 // Add custom parameters from user
559 if let Some(params) = parameters {
560 for (key, value) in params.iter() {
561 // Skip internal/special parameters
562 if key == "callback_connection"
563 || key == "listen_connection"
564 || key == NO_DEFAULT_PARAMETERS
565 {
566 continue;
567 }
568
569 // Convert underscores to dashes for command-line arguments
570 let cli_key = key.replace('_', "-");
571
572 if value.is_empty() {
573 cmd.arg(format!("--{cli_key}"));
574 } else {
575 cmd.arg(format!("--{cli_key}={value}"));
576 }
577 }
578 }
579
580 // Resolve the effective log directory for later access via log_dir()
581 let resolved_log_dir =
582 if let Some(user_dir) = parameters.and_then(|p| p.get("log_dir")).map(PathBuf::from) {
583 Some(user_dir)
584 } else if use_defaults {
585 std::env::current_dir().ok()
586 } else {
587 None
588 };
589
590 // Redirect stdout/stderr to null - we get the endpoint via callback connection
591 cmd.stdout(Stdio::null());
592 cmd.stderr(Stdio::null());
593
594 // On Unix, start hyperd in its own process group so it doesn't receive
595 // Ctrl-C (SIGINT) signals meant for the parent process. This allows the
596 // parent to handle Ctrl-C gracefully and properly shut down hyperd via
597 // the callback connection mechanism.
598 #[cfg(unix)]
599 cmd.process_group(0);
600
601 // On Windows, prevent a console window from flashing when spawning hyperd.
602 // CREATE_NO_WINDOW (0x08000000) suppresses the creation of a visible console.
603 #[cfg(windows)]
604 {
605 use std::os::windows::process::CommandExt;
606 const CREATE_NO_WINDOW: u32 = 0x08000000;
607 cmd.creation_flags(CREATE_NO_WINDOW);
608 }
609
610 // Start the process
611 let child = cmd.spawn().map_err(|e| {
612 Error::new(format!(
613 "Failed to start Hyper server at {}: {}",
614 hyperd_path.display(),
615 e
616 ))
617 })?;
618
619 // Wait for Hyper to connect back to our callback listener
620 let (callback_connection, callback_endpoint) = Self::wait_for_callback(&callback_listener)?;
621
622 // Determine the endpoints based on listen mode and callback response
623 let (endpoint, grpc_endpoint) = match listen_mode {
624 ListenMode::LibPq => {
625 // Callback returns libpq endpoint
626 (Some(callback_endpoint), None)
627 }
628 ListenMode::Grpc { port } => {
629 // Callback returns gRPC endpoint (with resolved port if auto-assigned)
630 let grpc_ep = if port == 0 {
631 // The callback returns the actual gRPC endpoint with resolved port
632 callback_endpoint
633 } else {
634 format!("127.0.0.1:{port}")
635 };
636 (None, Some(grpc_ep))
637 }
638 ListenMode::Both { grpc_port } => {
639 // Callback returns libpq endpoint, gRPC uses specified port
640 (
641 Some(callback_endpoint),
642 Some(format!("127.0.0.1:{grpc_port}")),
643 )
644 }
645 };
646
647 // Parse connection endpoint if we have a libpq endpoint
648 let connection_endpoint = endpoint.as_ref().map(|ep| {
649 #[cfg(unix)]
650 {
651 // Check if it's a UDS path (contains path separator but no colon with port)
652 if ep.starts_with('/') || socket_directory.is_some() {
653 // UDS endpoint - construct from socket directory
654 if let Some(ref dir) = socket_directory {
655 return ConnectionEndpoint::domain_socket(dir, "hyper");
656 }
657 // Parse as path
658 let path = std::path::Path::new(ep);
659 let dir = path.parent().unwrap_or(std::path::Path::new("/"));
660 let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("hyper");
661 return ConnectionEndpoint::domain_socket(dir, name);
662 }
663 }
664 #[cfg(windows)]
665 {
666 // Check if it's a Named Pipe endpoint
667 if pipe_name.is_some() {
668 if let Some(ref pname) = pipe_name {
669 return ConnectionEndpoint::named_pipe(".", pname);
670 }
671 }
672 }
673 // TCP endpoint (host:port format)
674 let parts: Vec<&str> = ep.split(':').collect();
675 if parts.len() == 2 {
676 if let Ok(port) = parts[1].parse::<u16>() {
677 return ConnectionEndpoint::tcp(parts[0], port);
678 }
679 }
680 ConnectionEndpoint::tcp("localhost", 7483) // fallback
681 });
682
683 Ok(HyperProcess {
684 child: Some(child),
685 endpoint,
686 connection_endpoint,
687 grpc_endpoint,
688 hyperd_path: hyperd_path.to_path_buf(),
689 shutdown_initiated: Arc::new(AtomicBool::new(false)),
690 callback_connection: Some(callback_connection),
691 listen_mode,
692 transport_mode,
693 log_dir: resolved_log_dir,
694 #[cfg(unix)]
695 socket_directory,
696 #[cfg(windows)]
697 pipe_name,
698 })
699 }
700
701 /// Waits for Hyper to connect to our callback listener and send its endpoint.
702 ///
703 /// Protocol:
704 /// - Hyper connects to the callback listener
705 /// - Hyper sends: [1 byte length][N bytes connection descriptor string]
706 /// - Connection descriptor format: "tab.tcp://host:port"
707 fn wait_for_callback(listener: &TcpListener) -> Result<(TcpStream, String)> {
708 // Set a timeout for accepting connections
709 listener.set_nonblocking(true).ok();
710
711 let timeout = Duration::from_secs(60);
712 let start = std::time::Instant::now();
713
714 // Poll for incoming connection with timeout
715 let mut stream = loop {
716 if start.elapsed() > timeout {
717 return Err(Error::new(
718 "Timeout waiting for Hyper to connect to callback listener. \
719 Hyper may have failed to start - check hyperd logs for details.",
720 ));
721 }
722
723 match listener.accept() {
724 Ok((stream, _addr)) => break stream,
725 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
726 thread::sleep(Duration::from_millis(50));
727 }
728 Err(e) => {
729 return Err(Error::new(format!(
730 "Failed to accept callback connection: {e}"
731 )));
732 }
733 }
734 };
735
736 // Set stream back to blocking for reading
737 stream
738 .set_nonblocking(false)
739 .map_err(|e| Error::new(format!("Failed to set callback stream to blocking: {e}")))?;
740
741 // Set read timeout
742 stream.set_read_timeout(Some(Duration::from_secs(10))).ok();
743
744 // Read the endpoint descriptor from Hyper
745 // Protocol: [1 byte length][N bytes descriptor string]
746 let mut len_buf = [0u8; 1];
747 stream
748 .read_exact(&mut len_buf)
749 .map_err(|e| Error::new(format!("Failed to read endpoint length from Hyper: {e}")))?;
750
751 let len = len_buf[0] as usize;
752 if len == 0 {
753 return Err(Error::new("Hyper sent empty endpoint descriptor"));
754 }
755
756 let mut descriptor_buf = vec![0u8; len];
757 stream.read_exact(&mut descriptor_buf).map_err(|e| {
758 Error::new(format!(
759 "Failed to read endpoint descriptor from Hyper: {e}"
760 ))
761 })?;
762
763 let descriptor = String::from_utf8(descriptor_buf)
764 .map_err(|e| Error::new(format!("Invalid UTF-8 in endpoint descriptor: {e}")))?;
765
766 // Trim null bytes and whitespace that Hyper may include
767 let descriptor = descriptor.trim_matches(|c: char| c == '\0' || c.is_whitespace());
768
769 // Parse the connection descriptor (format: "tab.tcp://host:port")
770 let endpoint = Self::parse_connection_descriptor(descriptor)?;
771
772 // Clear read timeout for the connection we'll keep open
773 stream.set_read_timeout(None).ok();
774
775 info!(
776 target: "hyperdb_api",
777 %endpoint,
778 "hyperd-started"
779 );
780
781 Ok((stream, endpoint))
782 }
783
784 /// Parses a connection descriptor to extract host:port, socket path, or pipe path.
785 ///
786 /// Input formats:
787 /// - "tab.tcp://host:port" → "host:port"
788 /// - "tab.domain://<dir>/domain/<name>" → "<dir>/domain/<name>" (socket path)
789 /// - "tab.pipe://<host>/pipe/<name>" → "<host>/pipe/<name>" (named pipe)
790 /// - "tcp.grpc://host:port" → "host:port"
791 fn parse_connection_descriptor(descriptor: &str) -> Result<String> {
792 // Handle domain socket format
793 if let Some(rest) = descriptor.strip_prefix("tab.domain://") {
794 // Return the full path for UDS
795 if let Some(idx) = rest.find("/domain/") {
796 let dir = &rest[..idx];
797 let name = &rest[idx + 8..]; // "/domain/".len() == 8
798 let socket_path = format!("{dir}/domain/{name}");
799 return Ok(socket_path);
800 }
801 return Ok(rest.to_string());
802 }
803
804 // Handle named pipe format
805 if let Some(rest) = descriptor.strip_prefix("tab.pipe://") {
806 // Format: tab.pipe://<host>/pipe/<name>
807 // Return as pipe path: \\<host>\pipe\<name>
808 if let Some(idx) = rest.find("/pipe/") {
809 let host = &rest[..idx];
810 let name = &rest[idx + 6..]; // "/pipe/".len() == 6
811 let pipe_path = format!(r"\\{host}\pipe\{name}");
812 return Ok(pipe_path);
813 }
814 return Ok(rest.to_string());
815 }
816
817 // Handle TCP prefixes
818 let without_prefix = descriptor
819 .strip_prefix("tab.tcp://")
820 .or_else(|| descriptor.strip_prefix("tcp.grpc://"))
821 .or_else(|| descriptor.strip_prefix("tcp.grpctls://"))
822 .or_else(|| descriptor.strip_prefix("tcp://"))
823 .unwrap_or(descriptor);
824
825 // Validate it looks like host:port
826 if without_prefix.contains(':') && !without_prefix.is_empty() {
827 Ok(without_prefix.to_string())
828 } else {
829 Err(Error::new(format!(
830 "Invalid connection descriptor format: '{descriptor}'. Expected '<scheme>://host:port' or 'tab.domain://<dir>/domain/<name>'"
831 )))
832 }
833 }
834
835 /// Returns the libpq endpoint for connecting to this instance.
836 ///
837 /// The endpoint is in the format "host:port" (e.g., "localhost:54321").
838 ///
839 /// Returns `None` if the process was started in gRPC-only mode.
840 /// Use [`grpc_endpoint`](Self::grpc_endpoint) for gRPC connections.
841 #[must_use]
842 pub fn endpoint(&self) -> Option<&str> {
843 self.endpoint.as_deref()
844 }
845
846 /// Returns the libpq endpoint, or an error if not available.
847 ///
848 /// This is a convenience method to avoid `unwrap()` calls when you need
849 /// the endpoint and want proper error handling.
850 ///
851 /// # Errors
852 ///
853 /// Returns an error if this process was started in gRPC-only mode.
854 ///
855 /// # Example
856 ///
857 /// ```no_run
858 /// use hyperdb_api::{HyperProcess, Result};
859 ///
860 /// fn main() -> Result<()> {
861 /// let hyper = HyperProcess::new(None, None)?;
862 /// let endpoint = hyper.require_endpoint()?; // No unwrap() needed!
863 /// println!("Server running at: {}", endpoint);
864 /// Ok(())
865 /// }
866 /// ```
867 pub fn require_endpoint(&self) -> crate::error::Result<&str> {
868 self.endpoint().ok_or_else(|| {
869 crate::error::Error::new(
870 "HyperProcess does not have a libpq endpoint (gRPC-only mode). \
871 Use grpc_endpoint() instead or start with LibPq or Both listen mode.",
872 )
873 })
874 }
875
876 /// Returns the gRPC endpoint for connecting to this instance.
877 ///
878 /// The endpoint is in the format "host:port" (e.g., "127.0.0.1:7484").
879 ///
880 /// Returns `None` if the process was started in libpq-only mode.
881 #[must_use]
882 pub fn grpc_endpoint(&self) -> Option<&str> {
883 self.grpc_endpoint.as_deref()
884 }
885
886 /// Returns the gRPC endpoint, or an error if not available.
887 ///
888 /// This is a convenience method to avoid `unwrap()` calls when you need
889 /// the gRPC endpoint and want proper error handling.
890 ///
891 /// # Errors
892 ///
893 /// Returns an error if this process was started in libpq-only mode.
894 pub fn require_grpc_endpoint(&self) -> crate::error::Result<&str> {
895 self.grpc_endpoint().ok_or_else(|| {
896 crate::error::Error::new(
897 "HyperProcess does not have a gRPC endpoint (libpq-only mode). \
898 Use endpoint() instead or start with Grpc or Both listen mode.",
899 )
900 })
901 }
902
903 /// Returns the gRPC endpoint as a full URL suitable for gRPC clients.
904 ///
905 /// Returns the endpoint prefixed with "http://" (e.g., "<http://127.0.0.1:7484>").
906 /// Returns `None` if the process was started in libpq-only mode.
907 #[must_use]
908 pub fn grpc_url(&self) -> Option<String> {
909 self.grpc_endpoint.as_ref().map(|ep| format!("http://{ep}"))
910 }
911
912 /// Returns the gRPC URL, or an error if not available.
913 ///
914 /// This is a convenience method to avoid `unwrap()` calls when you need
915 /// the gRPC URL and want proper error handling.
916 ///
917 /// # Errors
918 ///
919 /// Returns an error if this process was started in libpq-only mode.
920 pub fn require_grpc_url(&self) -> crate::error::Result<String> {
921 Ok(format!("http://{}", self.require_grpc_endpoint()?))
922 }
923
924 /// Returns the listen mode this process was started with.
925 #[must_use]
926 pub fn listen_mode(&self) -> ListenMode {
927 self.listen_mode
928 }
929
930 /// Returns the transport mode this process was started with.
931 #[must_use]
932 pub fn transport_mode(&self) -> TransportMode {
933 self.transport_mode
934 }
935
936 /// Returns the connection endpoint for this process.
937 ///
938 /// This returns a [`ConnectionEndpoint`] that can be used to connect
939 /// to this Hyper instance via TCP, Unix Domain Socket, or Named Pipe.
940 #[must_use]
941 pub fn connection_endpoint(&self) -> Option<&ConnectionEndpoint> {
942 self.connection_endpoint.as_ref()
943 }
944
945 /// Returns the log directory where hyperd writes its log files.
946 ///
947 /// The log file is typically `hyperd.log` within this directory.
948 /// Returns `None` if the log directory could not be determined (e.g.,
949 /// default parameters were disabled and no `log_dir` was specified).
950 ///
951 /// This is useful for setting up [`QueryStatsProvider`](crate::QueryStatsProvider)
952 /// implementations that parse the Hyper log file.
953 #[must_use]
954 pub fn log_dir(&self) -> Option<&Path> {
955 self.log_dir.as_deref()
956 }
957
958 /// Returns the socket directory used for UDS connections (Unix only).
959 ///
960 /// Returns `None` if the process is using TCP or if no socket directory was created.
961 #[cfg(unix)]
962 #[must_use]
963 pub fn socket_directory(&self) -> Option<&Path> {
964 self.socket_directory.as_deref()
965 }
966
/// Returns the pipe name used for Named Pipe connections (Windows only).
///
/// Returns `None` if the process is using TCP.
#[cfg(windows)]
#[must_use]
pub fn pipe_name(&self) -> Option<&str> {
    self.pipe_name.as_deref()
}
975 /// Returns the process ID of the Hyper server.
976 #[must_use]
977 pub fn pid(&self) -> Option<u32> {
978 self.child.as_ref().map(std::process::Child::id)
979 }
980
981 /// Returns whether the Hyper server process is still running.
982 #[must_use]
983 pub fn is_running(&self) -> bool {
984 if let Some(ref child) = self.child {
985 // Try to check if process is alive without waiting
986 #[cfg(unix)]
987 {
988 match Command::new("kill")
989 .args(["-0", &child.id().to_string()])
990 .output()
991 {
992 Ok(output) => output.status.success(),
993 Err(_) => false,
994 }
995 }
996 #[cfg(not(unix))]
997 {
998 // On Windows, we can't easily check without waiting
999 // Assume it's running if we have a handle
1000 let _ = child; // Silence unused variable warning
1001 true
1002 }
1003 } else {
1004 false
1005 }
1006 }
1007
1008 /// Shuts down the Hyper server gracefully with a timeout.
1009 ///
1010 /// This closes the callback connection, which signals Hyper to shut down gracefully.
1011 /// If Hyper doesn't exit within the timeout, it will be forcefully terminated.
1012 ///
1013 /// # Arguments
1014 ///
1015 /// * `timeout` - Maximum time to wait for graceful shutdown before force-killing.
1016 ///
1017 /// # Errors
1018 ///
1019 /// Returns an error if the shutdown fails.
1020 pub fn shutdown_timeout(mut self, timeout: Duration) -> Result<()> {
1021 self.shutdown_initiated.store(true, Ordering::SeqCst);
1022 self.do_shutdown(Some(timeout))
1023 }
1024
1025 /// Shuts down the Hyper server gracefully, waiting indefinitely.
1026 ///
1027 /// This closes the callback connection and waits for Hyper to exit.
1028 /// Use [`shutdown_timeout`](Self::shutdown_timeout) if you need a timeout.
1029 ///
1030 /// # Errors
1031 ///
1032 /// Returns an error if the shutdown fails.
1033 pub fn shutdown_graceful(mut self) -> Result<()> {
1034 self.shutdown_initiated.store(true, Ordering::SeqCst);
1035 self.do_shutdown(None)
1036 }
1037
1038 /// Closes the callback connection to signal Hyper to shut down.
1039 ///
1040 /// This is the graceful shutdown mechanism - Hyper monitors the callback connection
1041 /// and will initiate shutdown when it's closed.
1042 fn close_callback_connection(&mut self) {
1043 if let Some(conn) = self.callback_connection.take() {
1044 // Gracefully shutdown both directions
1045 let _ = conn.shutdown(Shutdown::Both);
1046 // Connection is dropped here, closing the socket
1047 }
1048 }
1049
1050 /// Internal shutdown implementation.
1051 fn do_shutdown(&mut self, timeout: Option<Duration>) -> Result<()> {
1052 info!(target: "hyperdb_api", "hyperd-shutdown");
1053
1054 // Step 1: Close the callback connection to signal graceful shutdown
1055 // Hyper will detect this and begin shutting down
1056 self.close_callback_connection();
1057
1058 if let Some(mut child) = self.child.take() {
1059 // Step 2: Wait for the process to exit
1060 let wait_result = if let Some(timeout) = timeout {
1061 // Wait with timeout
1062 let start = std::time::Instant::now();
1063 loop {
1064 match child.try_wait() {
1065 Ok(Some(status)) => break Ok(status),
1066 Ok(None) => {
1067 if start.elapsed() > timeout {
1068 // Step 3: Force kill ONLY after timeout
1069 // This should rarely happen if Hyper is healthy
1070 #[cfg(unix)]
1071 {
1072 // Try SIGTERM first
1073 let _ = Command::new("kill")
1074 .args(["-TERM", &child.id().to_string()])
1075 .output();
1076 thread::sleep(Duration::from_millis(100));
1077 }
1078 // Then force kill
1079 let _ = child.kill();
1080 break child.wait().map_err(|e| {
1081 Error::new(format!("Failed to wait for hyperd: {e}"))
1082 });
1083 }
1084 thread::sleep(Duration::from_millis(100));
1085 }
1086 Err(e) => break Err(Error::new(format!("Failed to wait for hyperd: {e}"))),
1087 }
1088 }
1089 } else {
1090 // Wait indefinitely
1091 child
1092 .wait()
1093 .map_err(|e| Error::new(format!("Failed to wait for hyperd: {e}")))
1094 };
1095
1096 wait_result?;
1097 }
1098
1099 Ok(())
1100 }
1101}
1102
1103impl Drop for HyperProcess {
1104 fn drop(&mut self) {
1105 if !self.shutdown_initiated.load(Ordering::SeqCst) {
1106 // Try to gracefully shutdown with a short timeout
1107 let _ = self.do_shutdown(Some(Duration::from_secs(5)));
1108 }
1109
1110 // Clean up socket directory if we created one
1111 #[cfg(unix)]
1112 if let Some(ref dir) = self.socket_directory {
1113 // Only clean up if it's a temp directory we created (contains our PID)
1114 let dir_name = dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
1115 if dir_name.starts_with("hyper-") {
1116 let _ = std::fs::remove_dir_all(dir);
1117 }
1118 }
1119 }
1120}
1121
1122// SAFETY: `HyperProcess` owns its `std::process::Child` handle and (optionally)
1123// a TCP connection. Both are themselves `Send`, and no field holds thread-local
1124// state or a non-`Send` raw pointer. Ownership of a `HyperProcess` therefore
1125// transfers cleanly across thread boundaries.
1126unsafe impl Send for HyperProcess {}
1127
/// Parameter key whose presence switches off the default instance parameters.
///
/// [`HyperProcess`] normally starts hyperd with a set of sensible defaults
/// (matching the C++ `HyperProcess` behavior). Add this key to your
/// [`Parameters`] when you need full control over every parameter; all
/// defaults are then disabled.
pub(crate) const NO_DEFAULT_PARAMETERS: &str = "no_default_parameters";

/// Log configuration applied to hyperd by default: file-based JSON logging.
const DEFAULT_LOG_CONFIG: &str = "file,json,all,hyperd,0";
1137
1138/// Parameters for configuring the Hyper server.
1139///
1140/// When starting a [`HyperProcess`], a set of default parameters are automatically applied
1141/// (matching the C++ `HyperProcess` behavior):
1142///
1143/// | Parameter | Default Value | Description |
1144/// |-----------|---------------|-------------|
1145/// | `init_user` | `tableau_internal_user` | Initial user for the Hyper instance |
1146/// | `language` | `en_US` | Default language setting |
1147/// | `log_config` | `file,json,all,hyperd,0` | Log configuration |
1148/// | `date_style` | `MDY` | Date format (Month-Day-Year) |
1149/// | `date_style_lenient` | `false` | Strict date parsing |
1150/// | `log_dir` | Current directory | Log file directory |
1151/// | `no_password` | (flag) | Disable password requirement |
1152/// | `skip_license` | (flag) | Skip license check |
1153/// | `default_database_version` | `3` | File format version for newly created `.hyper` databases (v3 adds 128-bit NUMERIC support, required for DECIMAL128 parquet columns) |
1154///
1155/// To disable these defaults, add the `no_default_parameters` key (for example
1156/// `params.set("no_default_parameters", "")` via [`Parameters::set`].
1157///
1158/// # Listen Modes
1159///
1160/// Use [`set_listen_mode`](Parameters::set_listen_mode) to configure which protocols Hyper listens on:
1161///
1162/// ```
1163/// use hyperdb_api::{ListenMode, Parameters};
1164///
1165/// // gRPC only (for Arrow-based queries)
1166/// let mut params = Parameters::new();
1167/// params.set_listen_mode(ListenMode::Grpc { port: 0 });
1168///
1169/// // Both libpq and gRPC
1170/// let mut params = Parameters::new();
1171/// params.set_listen_mode(ListenMode::Both { grpc_port: 7484 });
1172/// ```
1173///
1174/// # Example
1175///
1176/// ```
1177/// use hyperdb_api::Parameters;
1178///
1179/// let mut params = Parameters::new();
1180/// params.set("log_file_size_limit", "100k");
1181/// params.set("log_file_max_count", "7");
1182/// ```
1183///
1184/// # Transport Modes
1185///
1186/// Use [`set_transport_mode`](Parameters::set_transport_mode) to control whether Hyper uses
1187/// TCP or IPC (Unix Domain Sockets):
1188///
1189/// ```
1190/// use hyperdb_api::{Parameters, TransportMode};
1191///
1192/// let mut params = Parameters::new();
1193/// params.set_transport_mode(TransportMode::Tcp); // Force TCP instead of IPC
1194/// ```
1195///
1196/// Transport mode for `HyperProcess` connections.
1197///
1198/// Controls whether the server uses TCP or Unix Domain Sockets (IPC) for connections.
1199/// On Unix systems, IPC is the default for better local performance.
1200/// On Windows, TCP is always used.
1201#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1202pub enum TransportMode {
1203 /// Use IPC (Unix Domain Sockets on Unix, Named Pipes on Windows).
1204 /// This is the default mode and provides better performance for local connections.
1205 #[default]
1206 Ipc,
1207
1208 /// Use TCP/IP connections.
1209 /// Required when connecting from remote clients or when IPC is not available.
1210 Tcp,
1211}
1212
1213/// Parameters for configuring the Hyper server.
1214///
1215/// When starting a [`HyperProcess`], a set of default parameters are automatically applied
1216/// (matching the C++ `HyperProcess` behavior). You can override these defaults or disable
1217/// them entirely by adding the `no_default_parameters` key (for example
1218/// `params.set("no_default_parameters", "")` via [`Parameters::set`].
1219///
1220/// # Transport Modes
1221///
1222/// Use [`set_transport_mode`](Self::set_transport_mode) to control whether Hyper uses
1223/// TCP or IPC (Unix Domain Sockets on Unix systems).
1224///
1225/// # Example
1226///
1227/// ```
1228/// use hyperdb_api::{Parameters, TransportMode};
1229///
1230/// let mut params = Parameters::new();
1231/// params.set("log_file_size_limit", "100k");
1232/// params.set_transport_mode(TransportMode::Tcp); // Force TCP instead of IPC
1233/// ```
1234#[derive(Debug, Clone, Default)]
1235pub struct Parameters {
1236 values: Vec<(String, String)>,
1237 /// The listen mode for the Hyper server.
1238 pub(crate) listen_mode: Option<ListenMode>,
1239 /// The transport mode (TCP or IPC/UDS).
1240 pub(crate) transport_mode: Option<TransportMode>,
1241 /// Custom domain socket directory (Unix only).
1242 #[cfg(unix)]
1243 pub(crate) domain_socket_directory: Option<PathBuf>,
1244}
1245
1246impl Parameters {
1247 /// Creates a new empty Parameters instance.
1248 #[must_use]
1249 pub fn new() -> Self {
1250 Parameters {
1251 values: Vec::new(),
1252 listen_mode: None,
1253 transport_mode: None,
1254 #[cfg(unix)]
1255 domain_socket_directory: None,
1256 }
1257 }
1258
1259 /// Sets the transport mode (TCP or IPC/UDS).
1260 ///
1261 /// By default, `HyperProcess` uses IPC (Unix Domain Sockets on Unix) for better
1262 /// performance. Use `TransportMode::Tcp` if you need TCP connections.
1263 ///
1264 /// # Example
1265 ///
1266 /// ```
1267 /// use hyperdb_api::{Parameters, TransportMode};
1268 ///
1269 /// let mut params = Parameters::new();
1270 /// params.set_transport_mode(TransportMode::Tcp); // Use TCP instead of IPC
1271 /// ```
1272 pub fn set_transport_mode(&mut self, mode: TransportMode) -> &mut Self {
1273 self.transport_mode = Some(mode);
1274 self
1275 }
1276
1277 /// Returns the configured transport mode.
1278 #[must_use]
1279 pub fn transport_mode(&self) -> Option<TransportMode> {
1280 self.transport_mode
1281 }
1282
1283 /// Sets a custom domain socket directory (Unix only).
1284 ///
1285 /// By default, `HyperProcess` creates sockets in a temporary directory.
1286 /// Use this to specify a custom location.
1287 #[cfg(unix)]
1288 pub fn set_domain_socket_directory(&mut self, dir: impl Into<PathBuf>) -> &mut Self {
1289 self.domain_socket_directory = Some(dir.into());
1290 self
1291 }
1292
1293 /// Returns the configured domain socket directory (Unix only).
1294 #[cfg(unix)]
1295 #[must_use]
1296 pub fn domain_socket_directory(&self) -> Option<&Path> {
1297 self.domain_socket_directory.as_deref()
1298 }
1299
1300 /// Sets a parameter value.
1301 ///
1302 /// # Arguments
1303 ///
1304 /// * `key` - The parameter name.
1305 /// * `value` - The parameter value (empty string for flags).
1306 pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
1307 let key = key.into();
1308 let value = value.into();
1309
1310 // Update existing or add new
1311 if let Some(entry) = self.values.iter_mut().find(|(k, _)| k == &key) {
1312 entry.1 = value;
1313 } else {
1314 self.values.push((key, value));
1315 }
1316
1317 self
1318 }
1319
1320 /// Sets the listen mode for the Hyper server.
1321 ///
1322 /// This controls which protocols the server listens on:
1323 /// - [`ListenMode::LibPq`]: `PostgreSQL` wire protocol only (default)
1324 /// - [`ListenMode::Grpc`]: gRPC protocol only (query-only, Arrow results)
1325 /// - [`ListenMode::Both`]: Both protocols enabled
1326 ///
1327 /// # Example
1328 ///
1329 /// ```
1330 /// use hyperdb_api::{ListenMode, Parameters};
1331 ///
1332 /// let mut params = Parameters::new();
1333 /// params.set_listen_mode(ListenMode::Grpc { port: 0 }); // Auto-assign port
1334 /// ```
1335 pub fn set_listen_mode(&mut self, mode: ListenMode) -> &mut Self {
1336 self.listen_mode = Some(mode);
1337 self
1338 }
1339
1340 /// Returns the configured listen mode, if any.
1341 #[must_use]
1342 pub fn listen_mode(&self) -> Option<ListenMode> {
1343 self.listen_mode
1344 }
1345
1346 /// Gets a parameter value.
1347 #[must_use]
1348 pub fn get(&self, key: &str) -> Option<&str> {
1349 self.values
1350 .iter()
1351 .find(|(k, _)| k == key)
1352 .map(|(_, v)| v.as_str())
1353 }
1354
1355 /// Returns whether the parameters contain the given key.
1356 #[must_use]
1357 pub fn contains_key(&self, key: &str) -> bool {
1358 self.values.iter().any(|(k, _)| k == key)
1359 }
1360
1361 /// Returns an iterator over the parameters.
1362 pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
1363 self.values.iter().map(|(k, v)| (k.as_str(), v.as_str()))
1364 }
1365
1366 /// Returns whether the parameters are empty.
1367 #[must_use]
1368 pub fn is_empty(&self) -> bool {
1369 self.values.is_empty()
1370 }
1371
1372 /// Returns the number of parameters.
1373 #[must_use]
1374 pub fn len(&self) -> usize {
1375 self.values.len()
1376 }
1377}
1378
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every `(descriptor, expected)` pair parses to the expected endpoint.
    fn assert_descriptors_parse(cases: &[(&str, &str)]) {
        for &(descriptor, expected) in cases {
            assert_eq!(
                HyperProcess::parse_connection_descriptor(descriptor).unwrap(),
                expected
            );
        }
    }

    #[test]
    fn test_parameters() {
        let mut params = Parameters::new();
        params.set("key1", "value1");
        params.set("key2", "value2");

        let expectations = [
            ("key1", Some("value1")),
            ("key2", Some("value2")),
            ("key3", None),
        ];
        for (key, expected) in expectations {
            assert_eq!(params.get(key), expected);
        }
        assert_eq!(params.len(), 2);
    }

    #[test]
    fn test_parameters_update() {
        let mut params = Parameters::new();
        // Setting the same key twice must overwrite instead of appending.
        for value in ["value1", "value2"] {
            params.set("key", value);
        }

        assert_eq!(params.get("key"), Some("value2"));
        assert_eq!(params.len(), 1);
    }

    #[test]
    fn test_parse_connection_descriptor() {
        assert_descriptors_parse(&[
            ("tab.tcp://localhost:12345", "localhost:12345"),
            ("tab.tcp://127.0.0.1:7483", "127.0.0.1:7483"),
            ("tcp://localhost:8080", "localhost:8080"),
            // Already in host:port format
            ("localhost:9999", "localhost:9999"),
        ]);
    }

    #[test]
    fn test_parse_connection_descriptor_named_pipe() {
        assert_descriptors_parse(&[
            ("tab.pipe://./pipe/hyper-12345", r"\\.\pipe\hyper-12345"),
            ("tab.pipe://server1/pipe/mydb", r"\\server1\pipe\mydb"),
        ]);
    }

    #[test]
    fn test_parse_connection_descriptor_invalid() {
        for descriptor in ["", "invalid"] {
            assert!(HyperProcess::parse_connection_descriptor(descriptor).is_err());
        }
    }

    #[test]
    fn test_parameters_contains_key() {
        let mut params = Parameters::new();
        params.set("key1", "value1");

        assert!(params.contains_key("key1"));
        assert!(!params.contains_key("key2"));
    }

    #[test]
    fn test_no_default_parameters_constant() {
        // The key has to match the one used by the C++ implementation.
        assert_eq!(NO_DEFAULT_PARAMETERS, "no_default_parameters");
    }

    #[test]
    fn test_parameters_with_no_defaults() {
        let mut params = Parameters::new();
        params
            .set(NO_DEFAULT_PARAMETERS, "")
            .set("init_user", "custom_user");

        assert!(params.contains_key(NO_DEFAULT_PARAMETERS));
        assert_eq!(params.get("init_user"), Some("custom_user"));
    }
}