hyperdb_api/process.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Hyper server process management.
5//!
6//! This module provides [`HyperProcess`] for spawning and managing local hyperd server instances.
7//!
8//! # Callback Connection Architecture
9//!
10//! The `HyperProcess` uses a **callback connection** mechanism for reliable process lifecycle
11//! management. This works as follows:
12//!
13//! 1. **Startup**: The client creates a TCP listener on an ephemeral port (the "callback proxy")
14//! and passes this address to hyperd via `--callback-connection`. Hyper connects back to this
15//! listener and sends its actual listen endpoint over this connection.
16//!
17//! 2. **Runtime**: The callback connection remains open for the lifetime of the `HyperProcess`.
18//! This connection acts as a "dead man's switch" - Hyper monitors it continuously.
19//!
20//! 3. **Graceful Shutdown**: When `HyperProcess` is dropped or explicitly shut down, the callback
21//! connection is closed. Hyper detects this and initiates graceful shutdown automatically.
22//!
23//! 4. **Crash Safety**: If the client process crashes or is killed, the OS automatically closes
24//! the TCP connection. Hyper detects this and shuts down gracefully, preventing orphan
25//! processes. This is the key advantage over signal-based shutdown.
26//!
27//! # Protocol Details
28//!
29//! The callback connection protocol is simple:
30//! - Client listens on `127.0.0.1:<ephemeral_port>`
31//! - Client starts hyperd with `--callback-connection=tab.tcp://127.0.0.1:<port>`
32//! - Hyper connects to this address and sends: `[1 byte length][N bytes descriptor]`
33//! - The descriptor is the actual listen endpoint (e.g., `tab.tcp://localhost:54321`)
34//! - Connection stays open until shutdown is desired
35//!
36//! # Listen Modes
37//!
38//! `HyperProcess` supports different listen modes via [`ListenMode`]:
39//!
40//! - **`LibPq`** (default): `PostgreSQL` wire protocol for full read/write access
41//! - **Grpc**: gRPC protocol for query-only Arrow-based access
42//! - **Both**: Both protocols enabled (libpq for read/write, gRPC for Arrow queries)
43//!
44//! ```no_run
45//! use hyperdb_api::{HyperProcess, ListenMode, Parameters, Result};
46//!
47//! fn main() -> Result<()> {
48//! // gRPC only
49//! let mut params = Parameters::new();
50//! params.set_listen_mode(ListenMode::Grpc { port: 0 });
51//! let hyper = HyperProcess::new(None, Some(¶ms))?;
52//! println!("gRPC endpoint: {}", hyper.grpc_endpoint().unwrap());
53//!
54//! // Both libpq and gRPC
55//! let mut params = Parameters::new();
56//! params.set_listen_mode(ListenMode::Both { grpc_port: 7484 });
57//! let hyper = HyperProcess::new(None, Some(¶ms))?;
58//! println!("libpq endpoint: {}", hyper.endpoint().unwrap());
59//! println!("gRPC endpoint: {}", hyper.grpc_endpoint().unwrap());
60//! Ok(())
61//! }
62//! ```
63
64use std::io::Read;
65use std::net::{Shutdown, TcpListener, TcpStream};
66use std::path::{Path, PathBuf};
67use std::process::{Child, Command, Stdio};
68use std::sync::atomic::{AtomicBool, Ordering};
69use std::sync::Arc;
70use std::thread;
71use std::time::Duration;
72
73#[cfg(unix)]
74use std::os::unix::process::CommandExt;
75
76#[cfg(any(unix, windows))]
77use hyperdb_api_core::client::ConnectionEndpoint;
78
79use tracing::info;
80
81use crate::error::{Error, Result};
82
83/// Specifies which protocols `HyperProcess` should listen on.
84///
85/// # Examples
86///
87/// ```
88/// use hyperdb_api::{ListenMode, Parameters};
89///
90/// // LibPq only (default) - for full read/write access
91/// let mut params = Parameters::new();
92/// params.set_listen_mode(ListenMode::LibPq);
93///
94/// // gRPC only - for Arrow-based query access
95/// let mut params = Parameters::new();
96/// params.set_listen_mode(ListenMode::Grpc { port: 0 }); // auto-assign port
97///
98/// // Both protocols - libpq for writes, gRPC for Arrow queries
99/// let mut params = Parameters::new();
100/// params.set_listen_mode(ListenMode::Both { grpc_port: 7484 });
101/// ```
102#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
103pub enum ListenMode {
104 /// `PostgreSQL` wire protocol only (default).
105 ///
106 /// This is the traditional connection mode that supports all Hyper features
107 /// including read and write operations.
108 #[default]
109 LibPq,
110
111 /// gRPC protocol only.
112 ///
113 /// This mode is optimized for query-only workloads and returns results in
114 /// Arrow IPC format. Note that gRPC mode does not support write operations.
115 ///
116 /// Set `port` to 0 to auto-assign an available port.
117 Grpc {
118 /// The port to listen on (0 for auto-assign).
119 port: u16,
120 },
121
122 /// Both libpq and gRPC protocols.
123 ///
124 /// This mode enables full read/write access via libpq while also providing
125 /// gRPC access for Arrow-based queries. The libpq port is auto-assigned,
126 /// while the gRPC port is specified.
127 ///
128 /// Note: When using `Both` mode, the callback connection returns the libpq
129 /// endpoint. Use `HyperProcess::grpc_endpoint()` to get the gRPC endpoint.
130 Both {
131 /// The gRPC port to listen on (cannot be 0 - must be a specific port).
132 grpc_port: u16,
133 },
134}
135
136/// A running Hyper server instance.
137///
138/// This struct manages the lifecycle of a local Hyper server process using a callback
139/// connection for reliable shutdown. The server is automatically shut down when this
140/// object is dropped.
141///
142/// # Callback Connection (Dead Man's Switch)
143///
144/// Unlike traditional process management that relies on signals, `HyperProcess` maintains
145/// a TCP connection to the Hyper server. When this connection is closed (either explicitly
146/// or because the client process exits), Hyper automatically shuts down gracefully.
147///
148/// This provides several benefits:
149/// - **No orphan processes**: If your application crashes, Hyper shuts down automatically
150/// - **Graceful shutdown**: Hyper can flush data and clean up properly
151/// - **Cross-platform**: Works reliably on macOS, Linux, and Windows
152///
153/// # Example
154///
155/// ```no_run
156/// use hyperdb_api::{HyperProcess, Result};
157///
158/// fn main() -> Result<()> {
159/// // Start a Hyper server (auto-detect hyperd location)
160/// let hyper = HyperProcess::new(None, None)?;
161///
162/// println!("Hyper server running at: {}", hyper.endpoint().unwrap());
163///
164/// // Server automatically shuts down when `hyper` goes out of scope
165/// // via the callback connection mechanism
166/// Ok(())
167/// }
168/// ```
169#[must_use = "HyperProcess will shut down when dropped; store it to keep the server running"]
170#[derive(Debug)]
171pub struct HyperProcess {
172 /// The child process handle.
173 child: Option<Child>,
174 /// The libpq endpoint descriptor (host:port or socket path), if libpq is enabled.
175 endpoint: Option<String>,
176 /// The parsed connection endpoint for libpq.
177 connection_endpoint: Option<ConnectionEndpoint>,
178 /// The gRPC endpoint (host:port), if gRPC is enabled.
179 grpc_endpoint: Option<String>,
180 /// Path to the hyperd executable.
181 #[expect(
182 dead_code,
183 reason = "retained for diagnostics and future restart/respawn paths"
184 )]
185 hyperd_path: PathBuf,
186 /// Whether shutdown has been initiated.
187 shutdown_initiated: Arc<AtomicBool>,
188 /// The callback connection to hyperd.
189 /// Keeping this open maintains the "dead man's switch" - when dropped, Hyper shuts down.
190 callback_connection: Option<TcpStream>,
191 /// The listen mode this process was started with.
192 listen_mode: ListenMode,
193 /// The transport mode this process was started with.
194 transport_mode: TransportMode,
195 /// The socket directory for UDS connections (Unix only).
196 /// This directory is automatically cleaned up on drop.
197 #[cfg(unix)]
198 socket_directory: Option<PathBuf>,
199 /// The pipe name for Named Pipe connections (Windows only).
200 #[cfg(windows)]
201 pipe_name: Option<String>,
202 /// The log directory where hyperd writes its log files.
203 log_dir: Option<PathBuf>,
204}
205
206impl HyperProcess {
207 /// Starts a new Hyper server instance.
208 ///
209 /// This method:
210 /// 1. Creates a callback listener on an ephemeral port
211 /// 2. Starts the hyperd process with the callback connection address
212 /// 3. Waits for Hyper to connect back and provide its listen endpoint
213 ///
214 /// # Arguments
215 ///
216 /// * `hyper_path` - Optional path to the hyperd executable. If `None`, searches
217 /// in common locations (`HYPERD_PATH` env var, then known build output paths).
218 /// * `parameters` - Optional parameters for the server.
219 ///
220 /// # Errors
221 ///
222 /// Returns an error if:
223 /// - The hyperd executable cannot be found
224 /// - The callback listener cannot be created
225 /// - The server fails to start
226 /// - Hyper doesn't connect back within the timeout (30 seconds)
227 ///
228 /// # Example
229 ///
230 /// ```no_run
231 /// use hyperdb_api::{HyperProcess, Result};
232 /// use std::path::Path;
233 ///
234 /// fn main() -> Result<()> {
235 /// // Auto-detect hyperd location
236 /// let hyper = HyperProcess::new(None, None)?;
237 ///
238 /// // Or specify explicit path
239 /// let hyper2 = HyperProcess::new(
240 /// Some(Path::new("/path/to/hyperd")),
241 /// None,
242 /// )?;
243 /// Ok(())
244 /// }
245 /// ```
246 pub fn new(hyper_path: Option<&Path>, parameters: Option<&Parameters>) -> Result<Self> {
247 let hyperd_path = match hyper_path {
248 Some(path) => path.to_path_buf(),
249 None => Self::find_hyperd()?,
250 };
251 Self::start_server(&hyperd_path, parameters)
252 }
253
254 /// Resolves the hyperd executable from the `HYPERD_PATH` environment
255 /// variable. The value can point at the executable directly, or at a
256 /// directory containing it.
257 ///
258 /// If `HYPERD_PATH` is unset, returns an error instructing the caller
259 /// to either set it or run the `hyperd-bootstrap` downloader to
260 /// install a pinned release at `.hyperd/current/hyperd`.
261 fn find_hyperd() -> Result<PathBuf> {
262 #[cfg(windows)]
263 const HYPERD_EXE: &str = "hyperd.exe";
264 #[cfg(not(windows))]
265 const HYPERD_EXE: &str = "hyperd";
266
267 let Ok(path_str) = std::env::var("HYPERD_PATH") else {
268 // Walk up from CWD looking for .hyperd/current/<exe> written by
269 // `hyperd-bootstrap download`. This lets `node examples/foo.mjs`
270 // run from any subdirectory of the repo without exporting HYPERD_PATH.
271 if let Ok(cwd) = std::env::current_dir() {
272 let mut dir = cwd.as_path();
273 loop {
274 let candidate = dir.join(".hyperd").join("current").join(HYPERD_EXE);
275 if candidate.exists() {
276 return Ok(candidate);
277 }
278 match dir.parent() {
279 Some(parent) => dir = parent,
280 None => break,
281 }
282 }
283 }
284 return Err(Error::config(
285 "HYPERD_PATH is not set. Point it at a hyperd executable, \
286 or run `make download-hyperd` (or `cargo run -p hyperd-bootstrap -- download`) \
287 to install a pinned release at `.hyperd/current/hyperd`.",
288 ));
289 };
290
291 let path = PathBuf::from(&path_str);
292 if path.is_dir() {
293 let with_exe = path.join(HYPERD_EXE);
294 if with_exe.exists() {
295 return Ok(with_exe);
296 }
297 #[cfg(windows)]
298 {
299 let without_exe = path.join("hyperd");
300 if without_exe.exists() {
301 return Ok(without_exe);
302 }
303 }
304 return Err(Error::config(format!(
305 "HYPERD_PATH set to '{path_str}' but {HYPERD_EXE} not found in that directory"
306 )));
307 }
308 if path.exists() {
309 return Ok(path);
310 }
311 #[cfg(windows)]
312 {
313 let with_ext = PathBuf::from(format!("{path_str}.exe"));
314 if with_ext.exists() {
315 return Ok(with_ext);
316 }
317 }
318 Err(Error::config(format!(
319 "HYPERD_PATH set to '{}' but hyperd executable not found (checked: {})",
320 path_str,
321 path.display()
322 )))
323 }
324
325 /// Starts the hyperd server process with callback connection.
326 fn start_server(hyperd_path: &Path, parameters: Option<&Parameters>) -> Result<Self> {
327 // Verify hyperd exists
328 if !hyperd_path.exists() {
329 return Err(Error::config(format!(
330 "Hyper executable not found at: {}",
331 hyperd_path.display()
332 )));
333 }
334
335 info!(
336 target: "hyperdb_api",
337 path = %hyperd_path.display(),
338 "hyperd-starting"
339 );
340
341 // Create callback listener on ephemeral port
342 // This is the "dead man's switch" - when this connection is closed, Hyper shuts down
343 let callback_listener = TcpListener::bind("127.0.0.1:0")
344 .map_err(|e| Error::connection_with_io("Failed to create callback listener", e))?;
345
346 let callback_port = callback_listener
347 .local_addr()
348 .map_err(|e| Error::connection_with_io("Failed to get callback port", e))?
349 .port();
350
351 // Set a timeout for accepting the callback connection
352 callback_listener.set_nonblocking(false).map_err(|e| {
353 Error::connection_with_io("Failed to set callback listener to blocking", e)
354 })?;
355
356 // Check if user wants to disable default parameters
357 let use_defaults = parameters.map_or(true, |p| !p.contains_key(NO_DEFAULT_PARAMETERS));
358
359 // Get the listen mode
360 let listen_mode = parameters.and_then(|p| p.listen_mode).unwrap_or_default();
361
362 // Get transport mode (default to TCP until UDS performance is validated)
363 // See IPC_IMPLEMENTATION.md for details
364 #[cfg(unix)]
365 let transport_mode = parameters
366 .and_then(|p| p.transport_mode)
367 .unwrap_or(TransportMode::Tcp);
368 #[cfg(windows)]
369 let transport_mode = parameters
370 .and_then(|p| p.transport_mode)
371 .unwrap_or(TransportMode::Tcp);
372 #[cfg(not(any(unix, windows)))]
373 let transport_mode = TransportMode::Tcp;
374
375 // Create socket directory for UDS if needed (Unix only)
376 #[cfg(unix)]
377 let socket_directory: Option<PathBuf> = if transport_mode == TransportMode::Ipc {
378 // Use custom directory if provided, otherwise create temp directory
379 let dir = if let Some(custom_dir) =
380 parameters.and_then(|p| p.domain_socket_directory.as_ref())
381 {
382 custom_dir.clone()
383 } else {
384 // Create a temp directory for the socket
385 let temp_dir = std::env::temp_dir().join(format!("hyper-{}", std::process::id()));
386 std::fs::create_dir_all(&temp_dir).map_err(|e| {
387 Error::connection_with_io("Failed to create socket directory", e)
388 })?;
389 temp_dir
390 };
391 Some(dir)
392 } else {
393 None
394 };
395
396 // On non-Unix platforms there is no UDS socket directory; the variable
397 // is only referenced inside `#[cfg(unix)]` blocks so we do not need a
398 // placeholder binding here.
399
400 // Create pipe name for Named Pipes if needed (Windows only)
401 #[cfg(windows)]
402 let pipe_name: Option<String> = if transport_mode == TransportMode::Ipc {
403 Some(format!("hyper-{}", std::process::id()))
404 } else {
405 None
406 };
407
408 // Build command arguments
409 let mut cmd = Command::new(hyperd_path);
410
411 // The "run" subcommand starts the server
412 cmd.arg("run");
413
414 // Callback connection - Hyper will connect to this and send its endpoint
415 // When this connection is closed, Hyper will shut down gracefully
416 cmd.arg("--callback-connection")
417 .arg(format!("tab.tcp://127.0.0.1:{callback_port}"));
418
419 // Configure listen connection based on mode and transport
420 // Connection string formats:
421 // - tab.tcp://host:port - libpq over TCP
422 // - tab.domain://<dir>/domain/<name> - libpq over Unix Domain Socket
423 // - tcp.grpc://host:port - gRPC
424 #[cfg(unix)]
425 let listen_connection = if transport_mode == TransportMode::Ipc {
426 let socket_dir = socket_directory.as_ref().unwrap();
427 match listen_mode {
428 ListenMode::LibPq => format!("tab.domain://{}/domain/hyper", socket_dir.display()),
429 ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
430 ListenMode::Both { grpc_port } => {
431 format!(
432 "tab.domain://{}/domain/hyper,tcp.grpc://127.0.0.1:{}",
433 socket_dir.display(),
434 grpc_port
435 )
436 }
437 }
438 } else {
439 match listen_mode {
440 ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
441 ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
442 ListenMode::Both { grpc_port } => {
443 format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{grpc_port}")
444 }
445 }
446 };
447
448 #[cfg(windows)]
449 let listen_connection = if transport_mode == TransportMode::Ipc {
450 let pname = pipe_name.as_ref().unwrap();
451 match listen_mode {
452 ListenMode::LibPq => format!("tab.pipe://./pipe/{pname}"),
453 ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
454 ListenMode::Both { grpc_port } => {
455 format!("tab.pipe://./pipe/{pname},tcp.grpc://127.0.0.1:{grpc_port}")
456 }
457 }
458 } else {
459 match listen_mode {
460 ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
461 ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
462 ListenMode::Both { grpc_port } => {
463 format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{grpc_port}")
464 }
465 }
466 };
467
468 #[cfg(not(any(unix, windows)))]
469 let listen_connection = match listen_mode {
470 ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
471 ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{}", port),
472 ListenMode::Both { grpc_port } => {
473 format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{}", grpc_port)
474 }
475 };
476
477 cmd.arg("--listen-connection").arg(&listen_connection);
478
479 // Helper to check if a parameter is already set by the user
480 let user_has_param =
481 |key: &str| -> bool { parameters.is_some_and(|p| p.contains_key(key)) };
482
483 // Apply default instance parameters (matching C++ HyperProcess behavior)
484 // These can be overridden by user parameters or disabled entirely with NO_DEFAULT_PARAMETERS
485 if use_defaults {
486 // Initial user for the Hyper instance
487 if !user_has_param("init_user") {
488 cmd.arg("--init-user=tableau_internal_user");
489 }
490
491 // Enable gRPC threads if gRPC mode is enabled
492 // Required for gRPC to function - without this, Hyper will fail to start with:
493 // "gRPC threads are required for running gRPC services"
494 // Using 4 threads as a reasonable default (can be overridden by user)
495 if matches!(
496 listen_mode,
497 ListenMode::Grpc { .. } | ListenMode::Both { .. }
498 ) && !user_has_param("grpc_threads")
499 {
500 cmd.arg("--grpc-threads=4");
501 }
502
503 // Enable gRPC result persistence if gRPC mode is enabled
504 // This is required for ADAPTIVE and ASYNC transfer modes
505 if matches!(
506 listen_mode,
507 ListenMode::Grpc { .. } | ListenMode::Both { .. }
508 ) && !user_has_param("grpc_persist_results")
509 {
510 cmd.arg("--grpc-persist-results=true");
511 }
512
513 // Default language setting
514 if !user_has_param("language") {
515 cmd.arg("--language=en_US");
516 }
517
518 // Log configuration: file-based JSON logging
519 if !user_has_param("log_config") {
520 cmd.arg(format!("--log-config={DEFAULT_LOG_CONFIG}"));
521 }
522
523 // Date style for date parsing (Month-Day-Year)
524 if !user_has_param("date_style") {
525 cmd.arg("--date-style=MDY");
526 }
527
528 // Enforce strict date_style (day/month/year ordering must match exactly)
529 if !user_has_param("date_style_lenient") {
530 cmd.arg("--date-style-lenient=false");
531 }
532
533 // Set default log directory to current directory
534 if !user_has_param("log_dir") {
535 if let Ok(cwd) = std::env::current_dir() {
536 cmd.arg(format!("--log-dir={}", cwd.display()));
537 }
538 }
539
540 // Disable password requirement for local development
541 if !user_has_param("no_password") {
542 cmd.arg("--no-password");
543 }
544
545 // Skip license check for local development
546 if !user_has_param("skip_license") {
547 cmd.arg("--skip-license");
548 }
549
550 // Default new .hyper databases to file format version 3, which
551 // adds support for 128-bit NUMERICs (required to ingest parquet
552 // files whose decimal columns are stored as DECIMAL128).
553 // File format 3 has shipped since Hyper 2022.4.0.
554 if !user_has_param("default_database_version") {
555 cmd.arg("--default-database-version=3");
556 }
557 }
558
559 // Add custom parameters from user
560 if let Some(params) = parameters {
561 for (key, value) in params.iter() {
562 // Skip internal/special parameters
563 if key == "callback_connection"
564 || key == "listen_connection"
565 || key == NO_DEFAULT_PARAMETERS
566 {
567 continue;
568 }
569
570 // Convert underscores to dashes for command-line arguments
571 let cli_key = key.replace('_', "-");
572
573 if value.is_empty() {
574 cmd.arg(format!("--{cli_key}"));
575 } else {
576 cmd.arg(format!("--{cli_key}={value}"));
577 }
578 }
579 }
580
581 // Resolve the effective log directory for later access via log_dir()
582 let resolved_log_dir =
583 if let Some(user_dir) = parameters.and_then(|p| p.get("log_dir")).map(PathBuf::from) {
584 Some(user_dir)
585 } else if use_defaults {
586 std::env::current_dir().ok()
587 } else {
588 None
589 };
590
591 // Redirect stdout/stderr to null - we get the endpoint via callback connection
592 cmd.stdout(Stdio::null());
593 cmd.stderr(Stdio::null());
594
595 // On Unix, start hyperd in its own process group so it doesn't receive
596 // Ctrl-C (SIGINT) signals meant for the parent process. This allows the
597 // parent to handle Ctrl-C gracefully and properly shut down hyperd via
598 // the callback connection mechanism.
599 #[cfg(unix)]
600 cmd.process_group(0);
601
602 // On Windows, prevent a console window from flashing when spawning hyperd.
603 // CREATE_NO_WINDOW (0x08000000) suppresses the creation of a visible console.
604 #[cfg(windows)]
605 {
606 use std::os::windows::process::CommandExt;
607 const CREATE_NO_WINDOW: u32 = 0x08000000;
608 cmd.creation_flags(CREATE_NO_WINDOW);
609 }
610
611 // Start the process
612 let child = cmd.spawn().map_err(|e| {
613 Error::connection_with_io(
614 format!("Failed to start Hyper server at {}", hyperd_path.display()),
615 e,
616 )
617 })?;
618
619 // Wait for Hyper to connect back to our callback listener
620 let (callback_connection, callback_endpoint) = Self::wait_for_callback(&callback_listener)?;
621
622 // Determine the endpoints based on listen mode and callback response
623 let (endpoint, grpc_endpoint) = match listen_mode {
624 ListenMode::LibPq => {
625 // Callback returns libpq endpoint
626 (Some(callback_endpoint), None)
627 }
628 ListenMode::Grpc { port } => {
629 // Callback returns gRPC endpoint (with resolved port if auto-assigned)
630 let grpc_ep = if port == 0 {
631 // The callback returns the actual gRPC endpoint with resolved port
632 callback_endpoint
633 } else {
634 format!("127.0.0.1:{port}")
635 };
636 (None, Some(grpc_ep))
637 }
638 ListenMode::Both { grpc_port } => {
639 // Callback returns libpq endpoint, gRPC uses specified port
640 (
641 Some(callback_endpoint),
642 Some(format!("127.0.0.1:{grpc_port}")),
643 )
644 }
645 };
646
647 // Parse connection endpoint if we have a libpq endpoint
648 let connection_endpoint = endpoint.as_ref().map(|ep| {
649 #[cfg(unix)]
650 {
651 // Check if it's a UDS path (contains path separator but no colon with port)
652 if ep.starts_with('/') || socket_directory.is_some() {
653 // UDS endpoint - construct from socket directory
654 if let Some(ref dir) = socket_directory {
655 return ConnectionEndpoint::domain_socket(dir, "hyper");
656 }
657 // Parse as path
658 let path = std::path::Path::new(ep);
659 let dir = path.parent().unwrap_or(std::path::Path::new("/"));
660 let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("hyper");
661 return ConnectionEndpoint::domain_socket(dir, name);
662 }
663 }
664 #[cfg(windows)]
665 {
666 // Check if it's a Named Pipe endpoint
667 if pipe_name.is_some() {
668 if let Some(ref pname) = pipe_name {
669 return ConnectionEndpoint::named_pipe(".", pname);
670 }
671 }
672 }
673 // TCP endpoint (host:port format)
674 let parts: Vec<&str> = ep.split(':').collect();
675 if parts.len() == 2 {
676 if let Ok(port) = parts[1].parse::<u16>() {
677 return ConnectionEndpoint::tcp(parts[0], port);
678 }
679 }
680 ConnectionEndpoint::tcp("localhost", 7483) // fallback
681 });
682
683 Ok(HyperProcess {
684 child: Some(child),
685 endpoint,
686 connection_endpoint,
687 grpc_endpoint,
688 hyperd_path: hyperd_path.to_path_buf(),
689 shutdown_initiated: Arc::new(AtomicBool::new(false)),
690 callback_connection: Some(callback_connection),
691 listen_mode,
692 transport_mode,
693 log_dir: resolved_log_dir,
694 #[cfg(unix)]
695 socket_directory,
696 #[cfg(windows)]
697 pipe_name,
698 })
699 }
700
701 /// Waits for Hyper to connect to our callback listener and send its endpoint.
702 ///
703 /// Protocol:
704 /// - Hyper connects to the callback listener
705 /// - Hyper sends: [1 byte length][N bytes connection descriptor string]
706 /// - Connection descriptor format: "tab.tcp://host:port"
707 fn wait_for_callback(listener: &TcpListener) -> Result<(TcpStream, String)> {
708 // Set a timeout for accepting connections
709 listener.set_nonblocking(true).ok();
710
711 let timeout = Duration::from_secs(60);
712 let start = std::time::Instant::now();
713
714 // Poll for incoming connection with timeout
715 let mut stream = loop {
716 if start.elapsed() > timeout {
717 return Err(Error::internal(
718 "Timeout waiting for Hyper to connect to callback listener. \
719 Hyper may have failed to start - check hyperd logs for details.",
720 ));
721 }
722
723 match listener.accept() {
724 Ok((stream, _addr)) => break stream,
725 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
726 thread::sleep(Duration::from_millis(50));
727 }
728 Err(e) => {
729 return Err(Error::connection_with_io(
730 "Failed to accept callback connection",
731 e,
732 ));
733 }
734 }
735 };
736
737 // Set stream back to blocking for reading
738 stream.set_nonblocking(false).map_err(|e| {
739 Error::connection_with_io("Failed to set callback stream to blocking", e)
740 })?;
741
742 // Set read timeout
743 stream.set_read_timeout(Some(Duration::from_secs(10))).ok();
744
745 // Read the endpoint descriptor from Hyper
746 // Protocol: [1 byte length][N bytes descriptor string]
747 let mut len_buf = [0u8; 1];
748 stream.read_exact(&mut len_buf).map_err(|e| {
749 Error::connection_with_io("Failed to read endpoint length from Hyper", e)
750 })?;
751
752 let len = len_buf[0] as usize;
753 if len == 0 {
754 return Err(Error::internal("Hyper sent empty endpoint descriptor"));
755 }
756
757 let mut descriptor_buf = vec![0u8; len];
758 stream.read_exact(&mut descriptor_buf).map_err(|e| {
759 Error::connection_with_io("Failed to read endpoint descriptor from Hyper", e)
760 })?;
761
762 let descriptor = String::from_utf8(descriptor_buf)
763 .map_err(|e| Error::internal(format!("Invalid UTF-8 in endpoint descriptor: {e}")))?;
764
765 // Trim null bytes and whitespace that Hyper may include
766 let descriptor = descriptor.trim_matches(|c: char| c == '\0' || c.is_whitespace());
767
768 // Parse the connection descriptor (format: "tab.tcp://host:port")
769 let endpoint = Self::parse_connection_descriptor(descriptor)?;
770
771 // Clear read timeout for the connection we'll keep open
772 stream.set_read_timeout(None).ok();
773
774 info!(
775 target: "hyperdb_api",
776 %endpoint,
777 "hyperd-started"
778 );
779
780 Ok((stream, endpoint))
781 }
782
783 /// Parses a connection descriptor to extract host:port, socket path, or pipe path.
784 ///
785 /// Input formats:
786 /// - "tab.tcp://host:port" → "host:port"
787 /// - "tab.domain://<dir>/domain/<name>" → "<dir>/domain/<name>" (socket path)
788 /// - "tab.pipe://<host>/pipe/<name>" → "<host>/pipe/<name>" (named pipe)
789 /// - "tcp.grpc://host:port" → "host:port"
790 fn parse_connection_descriptor(descriptor: &str) -> Result<String> {
791 // Handle domain socket format
792 if let Some(rest) = descriptor.strip_prefix("tab.domain://") {
793 // Return the full path for UDS
794 if let Some(idx) = rest.find("/domain/") {
795 let dir = &rest[..idx];
796 let name = &rest[idx + 8..]; // "/domain/".len() == 8
797 let socket_path = format!("{dir}/domain/{name}");
798 return Ok(socket_path);
799 }
800 return Ok(rest.to_string());
801 }
802
803 // Handle named pipe format
804 if let Some(rest) = descriptor.strip_prefix("tab.pipe://") {
805 // Format: tab.pipe://<host>/pipe/<name>
806 // Return as pipe path: \\<host>\pipe\<name>
807 if let Some(idx) = rest.find("/pipe/") {
808 let host = &rest[..idx];
809 let name = &rest[idx + 6..]; // "/pipe/".len() == 6
810 let pipe_path = format!(r"\\{host}\pipe\{name}");
811 return Ok(pipe_path);
812 }
813 return Ok(rest.to_string());
814 }
815
816 // Handle TCP prefixes
817 let without_prefix = descriptor
818 .strip_prefix("tab.tcp://")
819 .or_else(|| descriptor.strip_prefix("tcp.grpc://"))
820 .or_else(|| descriptor.strip_prefix("tcp.grpctls://"))
821 .or_else(|| descriptor.strip_prefix("tcp://"))
822 .unwrap_or(descriptor);
823
824 // Validate it looks like host:port
825 if without_prefix.contains(':') && !without_prefix.is_empty() {
826 Ok(without_prefix.to_string())
827 } else {
828 Err(Error::internal(format!(
829 "Invalid connection descriptor format: '{descriptor}'. Expected '<scheme>://host:port' or 'tab.domain://<dir>/domain/<name>'"
830 )))
831 }
832 }
833
834 /// Returns the libpq endpoint for connecting to this instance.
835 ///
836 /// The endpoint is in the format "host:port" (e.g., "localhost:54321").
837 ///
838 /// Returns `None` if the process was started in gRPC-only mode.
839 /// Use [`grpc_endpoint`](Self::grpc_endpoint) for gRPC connections.
840 #[must_use]
841 pub fn endpoint(&self) -> Option<&str> {
842 self.endpoint.as_deref()
843 }
844
845 /// Returns the libpq endpoint, or an error if not available.
846 ///
847 /// This is a convenience method to avoid `unwrap()` calls when you need
848 /// the endpoint and want proper error handling.
849 ///
850 /// # Errors
851 ///
852 /// Returns an error if this process was started in gRPC-only mode.
853 ///
854 /// # Example
855 ///
856 /// ```no_run
857 /// use hyperdb_api::{HyperProcess, Result};
858 ///
859 /// fn main() -> Result<()> {
860 /// let hyper = HyperProcess::new(None, None)?;
861 /// let endpoint = hyper.require_endpoint()?; // No unwrap() needed!
862 /// println!("Server running at: {}", endpoint);
863 /// Ok(())
864 /// }
865 /// ```
866 pub fn require_endpoint(&self) -> crate::error::Result<&str> {
867 self.endpoint().ok_or_else(|| {
868 crate::error::Error::internal(
869 "HyperProcess does not have a libpq endpoint (gRPC-only mode). \
870 Use grpc_endpoint() instead or start with LibPq or Both listen mode.",
871 )
872 })
873 }
874
875 /// Returns the gRPC endpoint for connecting to this instance.
876 ///
877 /// The endpoint is in the format "host:port" (e.g., "127.0.0.1:7484").
878 ///
879 /// Returns `None` if the process was started in libpq-only mode.
880 #[must_use]
881 pub fn grpc_endpoint(&self) -> Option<&str> {
882 self.grpc_endpoint.as_deref()
883 }
884
885 /// Returns the gRPC endpoint, or an error if not available.
886 ///
887 /// This is a convenience method to avoid `unwrap()` calls when you need
888 /// the gRPC endpoint and want proper error handling.
889 ///
890 /// # Errors
891 ///
892 /// Returns an error if this process was started in libpq-only mode.
893 pub fn require_grpc_endpoint(&self) -> crate::error::Result<&str> {
894 self.grpc_endpoint().ok_or_else(|| {
895 crate::error::Error::internal(
896 "HyperProcess does not have a gRPC endpoint (libpq-only mode). \
897 Use endpoint() instead or start with Grpc or Both listen mode.",
898 )
899 })
900 }
901
902 /// Returns the gRPC endpoint as a full URL suitable for gRPC clients.
903 ///
904 /// Returns the endpoint prefixed with "http://" (e.g., "<http://127.0.0.1:7484>").
905 /// Returns `None` if the process was started in libpq-only mode.
906 #[must_use]
907 pub fn grpc_url(&self) -> Option<String> {
908 self.grpc_endpoint.as_ref().map(|ep| format!("http://{ep}"))
909 }
910
911 /// Returns the gRPC URL, or an error if not available.
912 ///
913 /// This is a convenience method to avoid `unwrap()` calls when you need
914 /// the gRPC URL and want proper error handling.
915 ///
916 /// # Errors
917 ///
918 /// Returns an error if this process was started in libpq-only mode.
919 pub fn require_grpc_url(&self) -> crate::error::Result<String> {
920 Ok(format!("http://{}", self.require_grpc_endpoint()?))
921 }
922
923 /// Returns the listen mode this process was started with.
924 #[must_use]
925 pub fn listen_mode(&self) -> ListenMode {
926 self.listen_mode
927 }
928
929 /// Returns the transport mode this process was started with.
930 #[must_use]
931 pub fn transport_mode(&self) -> TransportMode {
932 self.transport_mode
933 }
934
935 /// Returns the connection endpoint for this process.
936 ///
937 /// This returns a [`ConnectionEndpoint`] that can be used to connect
938 /// to this Hyper instance via TCP, Unix Domain Socket, or Named Pipe.
939 #[must_use]
940 pub fn connection_endpoint(&self) -> Option<&ConnectionEndpoint> {
941 self.connection_endpoint.as_ref()
942 }
943
944 /// Returns the log directory where hyperd writes its log files.
945 ///
946 /// The log file is typically `hyperd.log` within this directory.
947 /// Returns `None` if the log directory could not be determined (e.g.,
948 /// default parameters were disabled and no `log_dir` was specified).
949 ///
950 /// This is useful for setting up [`QueryStatsProvider`](crate::QueryStatsProvider)
951 /// implementations that parse the Hyper log file.
952 #[must_use]
953 pub fn log_dir(&self) -> Option<&Path> {
954 self.log_dir.as_deref()
955 }
956
957 /// Returns the socket directory used for UDS connections (Unix only).
958 ///
959 /// Returns `None` if the process is using TCP or if no socket directory was created.
960 #[cfg(unix)]
961 #[must_use]
962 pub fn socket_directory(&self) -> Option<&Path> {
963 self.socket_directory.as_deref()
964 }
965
966 /// Returns the pipe name used for Named Pipe connections (Windows only).
967 ///
968 /// Returns `None` if the process is using TCP.
969 #[cfg(windows)]
970 pub fn pipe_name(&self) -> Option<&str> {
971 self.pipe_name.as_deref()
972 }
973
974 /// Returns the process ID of the Hyper server.
975 #[must_use]
976 pub fn pid(&self) -> Option<u32> {
977 self.child.as_ref().map(std::process::Child::id)
978 }
979
980 /// Returns whether the Hyper server process is still running.
981 #[must_use]
982 pub fn is_running(&self) -> bool {
983 if let Some(ref child) = self.child {
984 // Try to check if process is alive without waiting
985 #[cfg(unix)]
986 {
987 match Command::new("kill")
988 .args(["-0", &child.id().to_string()])
989 .output()
990 {
991 Ok(output) => output.status.success(),
992 Err(_) => false,
993 }
994 }
995 #[cfg(not(unix))]
996 {
997 // On Windows, we can't easily check without waiting
998 // Assume it's running if we have a handle
999 let _ = child; // Silence unused variable warning
1000 true
1001 }
1002 } else {
1003 false
1004 }
1005 }
1006
1007 /// Returns true if the hyperd child process has exited (or no child exists).
1008 ///
1009 /// Uses [`std::process::Child::try_wait`] under the hood, which is correct
1010 /// on both Unix and Windows. On Unix this also reaps any zombie state as a
1011 /// side effect — a hyperd that has been SIGKILLed but not yet `wait()`ed
1012 /// on by the parent will be observed as exited and cleaned up here.
1013 ///
1014 /// Prefer this over [`Self::is_running`] when the caller owns the
1015 /// `HyperProcess` mutably and needs an authoritative liveness signal.
1016 /// `is_running` uses `kill -0` on Unix (which incorrectly reports zombies
1017 /// as alive) and is a no-op on Windows.
1018 pub fn has_exited(&mut self) -> bool {
1019 match self.child.as_mut() {
1020 Some(child) => match child.try_wait() {
1021 Ok(Some(_status)) => true,
1022 Ok(None) => false,
1023 Err(_) => true,
1024 },
1025 None => true,
1026 }
1027 }
1028
1029 /// Shuts down the Hyper server gracefully with a timeout.
1030 ///
1031 /// This closes the callback connection, which signals Hyper to shut down gracefully.
1032 /// If Hyper doesn't exit within the timeout, it will be forcefully terminated.
1033 ///
1034 /// # Arguments
1035 ///
1036 /// * `timeout` - Maximum time to wait for graceful shutdown before force-killing.
1037 ///
1038 /// # Errors
1039 ///
1040 /// Returns an error if the shutdown fails.
1041 pub fn shutdown_timeout(mut self, timeout: Duration) -> Result<()> {
1042 self.shutdown_initiated.store(true, Ordering::SeqCst);
1043 self.do_shutdown(Some(timeout))
1044 }
1045
1046 /// Shuts down the Hyper server gracefully, waiting indefinitely.
1047 ///
1048 /// This closes the callback connection and waits for Hyper to exit.
1049 /// Use [`shutdown_timeout`](Self::shutdown_timeout) if you need a timeout.
1050 ///
1051 /// # Errors
1052 ///
1053 /// Returns an error if the shutdown fails.
1054 pub fn shutdown_graceful(mut self) -> Result<()> {
1055 self.shutdown_initiated.store(true, Ordering::SeqCst);
1056 self.do_shutdown(None)
1057 }
1058
1059 /// Closes the callback connection to signal Hyper to shut down.
1060 ///
1061 /// This is the graceful shutdown mechanism - Hyper monitors the callback connection
1062 /// and will initiate shutdown when it's closed.
1063 fn close_callback_connection(&mut self) {
1064 if let Some(conn) = self.callback_connection.take() {
1065 // Gracefully shutdown both directions
1066 let _ = conn.shutdown(Shutdown::Both);
1067 // Connection is dropped here, closing the socket
1068 }
1069 }
1070
1071 /// Internal shutdown implementation.
1072 fn do_shutdown(&mut self, timeout: Option<Duration>) -> Result<()> {
1073 info!(target: "hyperdb_api", "hyperd-shutdown");
1074
1075 // Step 1: Close the callback connection to signal graceful shutdown
1076 // Hyper will detect this and begin shutting down
1077 self.close_callback_connection();
1078
1079 if let Some(mut child) = self.child.take() {
1080 // Step 2: Wait for the process to exit
1081 let wait_result = if let Some(timeout) = timeout {
1082 // Wait with timeout
1083 let start = std::time::Instant::now();
1084 loop {
1085 match child.try_wait() {
1086 Ok(Some(status)) => break Ok(status),
1087 Ok(None) => {
1088 if start.elapsed() > timeout {
1089 // Step 3: Force kill ONLY after timeout
1090 // This should rarely happen if Hyper is healthy
1091 #[cfg(unix)]
1092 {
1093 // Try SIGTERM first
1094 let _ = Command::new("kill")
1095 .args(["-TERM", &child.id().to_string()])
1096 .output();
1097 thread::sleep(Duration::from_millis(100));
1098 }
1099 // Then force kill
1100 let _ = child.kill();
1101 break child.wait().map_err(|e| {
1102 Error::connection_with_io("Failed to wait for hyperd", e)
1103 });
1104 }
1105 thread::sleep(Duration::from_millis(100));
1106 }
1107 Err(e) => {
1108 break Err(Error::connection_with_io("Failed to wait for hyperd", e))
1109 }
1110 }
1111 }
1112 } else {
1113 // Wait indefinitely
1114 child
1115 .wait()
1116 .map_err(|e| Error::connection_with_io("Failed to wait for hyperd", e))
1117 };
1118
1119 wait_result?;
1120 }
1121
1122 Ok(())
1123 }
1124}
1125
1126impl Drop for HyperProcess {
1127 fn drop(&mut self) {
1128 if !self.shutdown_initiated.load(Ordering::SeqCst) {
1129 // Try to gracefully shutdown with a short timeout
1130 let _ = self.do_shutdown(Some(Duration::from_secs(5)));
1131 }
1132
1133 // Clean up socket directory if we created one
1134 #[cfg(unix)]
1135 if let Some(ref dir) = self.socket_directory {
1136 // Only clean up if it's a temp directory we created (contains our PID)
1137 let dir_name = dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
1138 if dir_name.starts_with("hyper-") {
1139 let _ = std::fs::remove_dir_all(dir);
1140 }
1141 }
1142 }
1143}
1144
1145// SAFETY: `HyperProcess` owns its `std::process::Child` handle and (optionally)
1146// a TCP connection. Both are themselves `Send`, and no field holds thread-local
1147// state or a non-`Send` raw pointer. Ownership of a `HyperProcess` therefore
1148// transfers cleanly across thread boundaries.
1149unsafe impl Send for HyperProcess {}
1150
1151/// Special parameter key that disables default instance parameters when present.
1152///
1153/// By default, [`HyperProcess`] starts hyperd with a set of sensible default parameters
1154/// (matching the C++ `HyperProcess` behavior). If you need full control over all parameters,
1155/// include this key in your [`Parameters`] to disable all defaults.
1156pub(crate) const NO_DEFAULT_PARAMETERS: &str = "no_default_parameters";
1157
1158/// Default log configuration for hyperd: file-based JSON logging.
1159const DEFAULT_LOG_CONFIG: &str = "file,json,all,hyperd,0";
1160
1161/// Parameters for configuring the Hyper server.
1162///
1163/// When starting a [`HyperProcess`], a set of default parameters are automatically applied
1164/// (matching the C++ `HyperProcess` behavior):
1165///
1166/// | Parameter | Default Value | Description |
1167/// |-----------|---------------|-------------|
1168/// | `init_user` | `tableau_internal_user` | Initial user for the Hyper instance |
1169/// | `language` | `en_US` | Default language setting |
1170/// | `log_config` | `file,json,all,hyperd,0` | Log configuration |
1171/// | `date_style` | `MDY` | Date format (Month-Day-Year) |
1172/// | `date_style_lenient` | `false` | Strict date parsing |
1173/// | `log_dir` | Current directory | Log file directory |
1174/// | `no_password` | (flag) | Disable password requirement |
1175/// | `skip_license` | (flag) | Skip license check |
1176/// | `default_database_version` | `3` | File format version for newly created `.hyper` databases (v3 adds 128-bit NUMERIC support, required for DECIMAL128 parquet columns) |
1177///
1178/// To disable these defaults, add the `no_default_parameters` key (for example
1179/// `params.set("no_default_parameters", "")` via [`Parameters::set`].
1180///
1181/// # Listen Modes
1182///
1183/// Use [`set_listen_mode`](Parameters::set_listen_mode) to configure which protocols Hyper listens on:
1184///
1185/// ```
1186/// use hyperdb_api::{ListenMode, Parameters};
1187///
1188/// // gRPC only (for Arrow-based queries)
1189/// let mut params = Parameters::new();
1190/// params.set_listen_mode(ListenMode::Grpc { port: 0 });
1191///
1192/// // Both libpq and gRPC
1193/// let mut params = Parameters::new();
1194/// params.set_listen_mode(ListenMode::Both { grpc_port: 7484 });
1195/// ```
1196///
1197/// # Example
1198///
1199/// ```
1200/// use hyperdb_api::Parameters;
1201///
1202/// let mut params = Parameters::new();
1203/// params.set("log_file_size_limit", "100k");
1204/// params.set("log_file_max_count", "7");
1205/// ```
1206///
1207/// # Transport Modes
1208///
1209/// Use [`set_transport_mode`](Parameters::set_transport_mode) to control whether Hyper uses
1210/// TCP or IPC (Unix Domain Sockets):
1211///
1212/// ```
1213/// use hyperdb_api::{Parameters, TransportMode};
1214///
1215/// let mut params = Parameters::new();
1216/// params.set_transport_mode(TransportMode::Tcp); // Force TCP instead of IPC
1217/// ```
1218///
1219/// Transport mode for `HyperProcess` connections.
1220///
1221/// Controls whether the server uses TCP or Unix Domain Sockets (IPC) for connections.
1222/// On Unix systems, IPC is the default for better local performance.
1223/// On Windows, TCP is always used.
1224#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1225pub enum TransportMode {
1226 /// Use IPC (Unix Domain Sockets on Unix, Named Pipes on Windows).
1227 /// This is the default mode and provides better performance for local connections.
1228 #[default]
1229 Ipc,
1230
1231 /// Use TCP/IP connections.
1232 /// Required when connecting from remote clients or when IPC is not available.
1233 Tcp,
1234}
1235
1236/// Parameters for configuring the Hyper server.
1237///
1238/// When starting a [`HyperProcess`], a set of default parameters are automatically applied
1239/// (matching the C++ `HyperProcess` behavior). You can override these defaults or disable
1240/// them entirely by adding the `no_default_parameters` key (for example
1241/// `params.set("no_default_parameters", "")` via [`Parameters::set`].
1242///
1243/// # Transport Modes
1244///
1245/// Use [`set_transport_mode`](Self::set_transport_mode) to control whether Hyper uses
1246/// TCP or IPC (Unix Domain Sockets on Unix systems).
1247///
1248/// # Example
1249///
1250/// ```
1251/// use hyperdb_api::{Parameters, TransportMode};
1252///
1253/// let mut params = Parameters::new();
1254/// params.set("log_file_size_limit", "100k");
1255/// params.set_transport_mode(TransportMode::Tcp); // Force TCP instead of IPC
1256/// ```
1257#[derive(Debug, Clone, Default)]
1258pub struct Parameters {
1259 values: Vec<(String, String)>,
1260 /// The listen mode for the Hyper server.
1261 pub(crate) listen_mode: Option<ListenMode>,
1262 /// The transport mode (TCP or IPC/UDS).
1263 pub(crate) transport_mode: Option<TransportMode>,
1264 /// Custom domain socket directory (Unix only).
1265 #[cfg(unix)]
1266 pub(crate) domain_socket_directory: Option<PathBuf>,
1267}
1268
1269impl Parameters {
1270 /// Creates a new empty Parameters instance.
1271 #[must_use]
1272 pub fn new() -> Self {
1273 Parameters {
1274 values: Vec::new(),
1275 listen_mode: None,
1276 transport_mode: None,
1277 #[cfg(unix)]
1278 domain_socket_directory: None,
1279 }
1280 }
1281
1282 /// Sets the transport mode (TCP or IPC/UDS).
1283 ///
1284 /// By default, `HyperProcess` uses IPC (Unix Domain Sockets on Unix) for better
1285 /// performance. Use `TransportMode::Tcp` if you need TCP connections.
1286 ///
1287 /// # Example
1288 ///
1289 /// ```
1290 /// use hyperdb_api::{Parameters, TransportMode};
1291 ///
1292 /// let mut params = Parameters::new();
1293 /// params.set_transport_mode(TransportMode::Tcp); // Use TCP instead of IPC
1294 /// ```
1295 pub fn set_transport_mode(&mut self, mode: TransportMode) -> &mut Self {
1296 self.transport_mode = Some(mode);
1297 self
1298 }
1299
1300 /// Returns the configured transport mode.
1301 #[must_use]
1302 pub fn transport_mode(&self) -> Option<TransportMode> {
1303 self.transport_mode
1304 }
1305
1306 /// Sets a custom domain socket directory (Unix only).
1307 ///
1308 /// By default, `HyperProcess` creates sockets in a temporary directory.
1309 /// Use this to specify a custom location.
1310 #[cfg(unix)]
1311 pub fn set_domain_socket_directory(&mut self, dir: impl Into<PathBuf>) -> &mut Self {
1312 self.domain_socket_directory = Some(dir.into());
1313 self
1314 }
1315
1316 /// Returns the configured domain socket directory (Unix only).
1317 #[cfg(unix)]
1318 #[must_use]
1319 pub fn domain_socket_directory(&self) -> Option<&Path> {
1320 self.domain_socket_directory.as_deref()
1321 }
1322
1323 /// Sets a parameter value.
1324 ///
1325 /// # Arguments
1326 ///
1327 /// * `key` - The parameter name.
1328 /// * `value` - The parameter value (empty string for flags).
1329 pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
1330 let key = key.into();
1331 let value = value.into();
1332
1333 // Update existing or add new
1334 if let Some(entry) = self.values.iter_mut().find(|(k, _)| k == &key) {
1335 entry.1 = value;
1336 } else {
1337 self.values.push((key, value));
1338 }
1339
1340 self
1341 }
1342
1343 /// Sets the listen mode for the Hyper server.
1344 ///
1345 /// This controls which protocols the server listens on:
1346 /// - [`ListenMode::LibPq`]: `PostgreSQL` wire protocol only (default)
1347 /// - [`ListenMode::Grpc`]: gRPC protocol only (query-only, Arrow results)
1348 /// - [`ListenMode::Both`]: Both protocols enabled
1349 ///
1350 /// # Example
1351 ///
1352 /// ```
1353 /// use hyperdb_api::{ListenMode, Parameters};
1354 ///
1355 /// let mut params = Parameters::new();
1356 /// params.set_listen_mode(ListenMode::Grpc { port: 0 }); // Auto-assign port
1357 /// ```
1358 pub fn set_listen_mode(&mut self, mode: ListenMode) -> &mut Self {
1359 self.listen_mode = Some(mode);
1360 self
1361 }
1362
1363 /// Returns the configured listen mode, if any.
1364 #[must_use]
1365 pub fn listen_mode(&self) -> Option<ListenMode> {
1366 self.listen_mode
1367 }
1368
1369 /// Gets a parameter value.
1370 #[must_use]
1371 pub fn get(&self, key: &str) -> Option<&str> {
1372 self.values
1373 .iter()
1374 .find(|(k, _)| k == key)
1375 .map(|(_, v)| v.as_str())
1376 }
1377
1378 /// Returns whether the parameters contain the given key.
1379 #[must_use]
1380 pub fn contains_key(&self, key: &str) -> bool {
1381 self.values.iter().any(|(k, _)| k == key)
1382 }
1383
1384 /// Returns an iterator over the parameters.
1385 pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
1386 self.values.iter().map(|(k, v)| (k.as_str(), v.as_str()))
1387 }
1388
1389 /// Returns whether the parameters are empty.
1390 #[must_use]
1391 pub fn is_empty(&self) -> bool {
1392 self.values.is_empty()
1393 }
1394
1395 /// Returns the number of parameters.
1396 #[must_use]
1397 pub fn len(&self) -> usize {
1398 self.values.len()
1399 }
1400}
1401
1402#[cfg(test)]
1403mod tests {
1404 use super::*;
1405
1406 #[test]
1407 fn test_parameters() {
1408 let mut params = Parameters::new();
1409 params.set("key1", "value1");
1410 params.set("key2", "value2");
1411
1412 assert_eq!(params.get("key1"), Some("value1"));
1413 assert_eq!(params.get("key2"), Some("value2"));
1414 assert_eq!(params.get("key3"), None);
1415 assert_eq!(params.len(), 2);
1416 }
1417
1418 #[test]
1419 fn test_parameters_update() {
1420 let mut params = Parameters::new();
1421 params.set("key", "value1");
1422 params.set("key", "value2");
1423
1424 assert_eq!(params.get("key"), Some("value2"));
1425 assert_eq!(params.len(), 1);
1426 }
1427
1428 #[test]
1429 fn test_parse_connection_descriptor() {
1430 assert_eq!(
1431 HyperProcess::parse_connection_descriptor("tab.tcp://localhost:12345").unwrap(),
1432 "localhost:12345"
1433 );
1434 assert_eq!(
1435 HyperProcess::parse_connection_descriptor("tab.tcp://127.0.0.1:7483").unwrap(),
1436 "127.0.0.1:7483"
1437 );
1438 assert_eq!(
1439 HyperProcess::parse_connection_descriptor("tcp://localhost:8080").unwrap(),
1440 "localhost:8080"
1441 );
1442 // Already in host:port format
1443 assert_eq!(
1444 HyperProcess::parse_connection_descriptor("localhost:9999").unwrap(),
1445 "localhost:9999"
1446 );
1447 }
1448
1449 #[test]
1450 fn test_parse_connection_descriptor_named_pipe() {
1451 assert_eq!(
1452 HyperProcess::parse_connection_descriptor("tab.pipe://./pipe/hyper-12345").unwrap(),
1453 r"\\.\pipe\hyper-12345"
1454 );
1455 assert_eq!(
1456 HyperProcess::parse_connection_descriptor("tab.pipe://server1/pipe/mydb").unwrap(),
1457 r"\\server1\pipe\mydb"
1458 );
1459 }
1460
1461 #[test]
1462 fn test_parse_connection_descriptor_invalid() {
1463 assert!(HyperProcess::parse_connection_descriptor("").is_err());
1464 assert!(HyperProcess::parse_connection_descriptor("invalid").is_err());
1465 }
1466
1467 #[test]
1468 fn test_parameters_contains_key() {
1469 let mut params = Parameters::new();
1470 params.set("key1", "value1");
1471
1472 assert!(params.contains_key("key1"));
1473 assert!(!params.contains_key("key2"));
1474 }
1475
1476 #[test]
1477 fn test_no_default_parameters_constant() {
1478 // Verify the constant matches what C++ uses
1479 assert_eq!(NO_DEFAULT_PARAMETERS, "no_default_parameters");
1480 }
1481
1482 #[test]
1483 fn test_parameters_with_no_defaults() {
1484 let mut params = Parameters::new();
1485 params.set(NO_DEFAULT_PARAMETERS, "");
1486 params.set("init_user", "custom_user");
1487
1488 assert!(params.contains_key(NO_DEFAULT_PARAMETERS));
1489 assert_eq!(params.get("init_user"), Some("custom_user"));
1490 }
1491}