Skip to main content

hyperdb_api/
process.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Hyper server process management.
5//!
6//! This module provides [`HyperProcess`] for spawning and managing local hyperd server instances.
7//!
8//! # Callback Connection Architecture
9//!
10//! The `HyperProcess` uses a **callback connection** mechanism for reliable process lifecycle
11//! management. This works as follows:
12//!
13//! 1. **Startup**: The client creates a TCP listener on an ephemeral port (the "callback proxy")
14//!    and passes this address to hyperd via `--callback-connection`. Hyper connects back to this
15//!    listener and sends its actual listen endpoint over this connection.
16//!
17//! 2. **Runtime**: The callback connection remains open for the lifetime of the `HyperProcess`.
18//!    This connection acts as a "dead man's switch" - Hyper monitors it continuously.
19//!
20//! 3. **Graceful Shutdown**: When `HyperProcess` is dropped or explicitly shut down, the callback
21//!    connection is closed. Hyper detects this and initiates graceful shutdown automatically.
22//!
23//! 4. **Crash Safety**: If the client process crashes or is killed, the OS automatically closes
24//!    the TCP connection. Hyper detects this and shuts down gracefully, preventing orphan
25//!    processes. This is the key advantage over signal-based shutdown.
26//!
27//! # Protocol Details
28//!
29//! The callback connection protocol is simple:
30//! - Client listens on `127.0.0.1:<ephemeral_port>`
31//! - Client starts hyperd with `--callback-connection=tab.tcp://127.0.0.1:<port>`
32//! - Hyper connects to this address and sends: `[1 byte length][N bytes descriptor]`
33//! - The descriptor is the actual listen endpoint (e.g., `tab.tcp://localhost:54321`)
34//! - Connection stays open until shutdown is desired
35//!
36//! # Listen Modes
37//!
38//! `HyperProcess` supports different listen modes via [`ListenMode`]:
39//!
40//! - **`LibPq`** (default): `PostgreSQL` wire protocol for full read/write access
41//! - **Grpc**: gRPC protocol for query-only Arrow-based access
42//! - **Both**: Both protocols enabled (libpq for read/write, gRPC for Arrow queries)
43//!
44//! ```no_run
45//! use hyperdb_api::{HyperProcess, ListenMode, Parameters, Result};
46//!
47//! fn main() -> Result<()> {
48//!     // gRPC only
49//!     let mut params = Parameters::new();
50//!     params.set_listen_mode(ListenMode::Grpc { port: 0 });
51//!     let hyper = HyperProcess::new(None, Some(&params))?;
52//!     println!("gRPC endpoint: {}", hyper.grpc_endpoint().unwrap());
53//!
54//!     // Both libpq and gRPC
55//!     let mut params = Parameters::new();
56//!     params.set_listen_mode(ListenMode::Both { grpc_port: 7484 });
57//!     let hyper = HyperProcess::new(None, Some(&params))?;
58//!     println!("libpq endpoint: {}", hyper.endpoint().unwrap());
59//!     println!("gRPC endpoint: {}", hyper.grpc_endpoint().unwrap());
60//!     Ok(())
61//! }
62//! ```
63
64use std::io::Read;
65use std::net::{Shutdown, TcpListener, TcpStream};
66use std::path::{Path, PathBuf};
67use std::process::{Child, Command, Stdio};
68use std::sync::atomic::{AtomicBool, Ordering};
69use std::sync::Arc;
70use std::thread;
71use std::time::Duration;
72
73#[cfg(unix)]
74use std::os::unix::process::CommandExt;
75
76#[cfg(any(unix, windows))]
77use hyperdb_api_core::client::ConnectionEndpoint;
78
79use tracing::info;
80
81use crate::error::{Error, Result};
82
83/// Specifies which protocols `HyperProcess` should listen on.
84///
85/// # Examples
86///
87/// ```
88/// use hyperdb_api::{ListenMode, Parameters};
89///
90/// // LibPq only (default) - for full read/write access
91/// let mut params = Parameters::new();
92/// params.set_listen_mode(ListenMode::LibPq);
93///
94/// // gRPC only - for Arrow-based query access
95/// let mut params = Parameters::new();
96/// params.set_listen_mode(ListenMode::Grpc { port: 0 }); // auto-assign port
97///
98/// // Both protocols - libpq for writes, gRPC for Arrow queries
99/// let mut params = Parameters::new();
100/// params.set_listen_mode(ListenMode::Both { grpc_port: 7484 });
101/// ```
102#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
103pub enum ListenMode {
104    /// `PostgreSQL` wire protocol only (default).
105    ///
106    /// This is the traditional connection mode that supports all Hyper features
107    /// including read and write operations.
108    #[default]
109    LibPq,
110
111    /// gRPC protocol only.
112    ///
113    /// This mode is optimized for query-only workloads and returns results in
114    /// Arrow IPC format. Note that gRPC mode does not support write operations.
115    ///
116    /// Set `port` to 0 to auto-assign an available port.
117    Grpc {
118        /// The port to listen on (0 for auto-assign).
119        port: u16,
120    },
121
122    /// Both libpq and gRPC protocols.
123    ///
124    /// This mode enables full read/write access via libpq while also providing
125    /// gRPC access for Arrow-based queries. The libpq port is auto-assigned,
126    /// while the gRPC port is specified.
127    ///
128    /// Note: When using `Both` mode, the callback connection returns the libpq
129    /// endpoint. Use `HyperProcess::grpc_endpoint()` to get the gRPC endpoint.
130    Both {
131        /// The gRPC port to listen on (cannot be 0 - must be a specific port).
132        grpc_port: u16,
133    },
134}
135
136/// A running Hyper server instance.
137///
138/// This struct manages the lifecycle of a local Hyper server process using a callback
139/// connection for reliable shutdown. The server is automatically shut down when this
140/// object is dropped.
141///
142/// # Callback Connection (Dead Man's Switch)
143///
144/// Unlike traditional process management that relies on signals, `HyperProcess` maintains
145/// a TCP connection to the Hyper server. When this connection is closed (either explicitly
146/// or because the client process exits), Hyper automatically shuts down gracefully.
147///
148/// This provides several benefits:
149/// - **No orphan processes**: If your application crashes, Hyper shuts down automatically
150/// - **Graceful shutdown**: Hyper can flush data and clean up properly
151/// - **Cross-platform**: Works reliably on macOS, Linux, and Windows
152///
153/// # Example
154///
155/// ```no_run
156/// use hyperdb_api::{HyperProcess, Result};
157///
158/// fn main() -> Result<()> {
159///     // Start a Hyper server (auto-detect hyperd location)
160///     let hyper = HyperProcess::new(None, None)?;
161///
162///     println!("Hyper server running at: {}", hyper.endpoint().unwrap());
163///
164///     // Server automatically shuts down when `hyper` goes out of scope
165///     // via the callback connection mechanism
166///     Ok(())
167/// }
168/// ```
169#[must_use = "HyperProcess will shut down when dropped; store it to keep the server running"]
170#[derive(Debug)]
171pub struct HyperProcess {
172    /// The child process handle.
173    child: Option<Child>,
174    /// The libpq endpoint descriptor (host:port or socket path), if libpq is enabled.
175    endpoint: Option<String>,
176    /// The parsed connection endpoint for libpq.
177    connection_endpoint: Option<ConnectionEndpoint>,
178    /// The gRPC endpoint (host:port), if gRPC is enabled.
179    grpc_endpoint: Option<String>,
180    /// Path to the hyperd executable.
181    #[expect(
182        dead_code,
183        reason = "retained for diagnostics and future restart/respawn paths"
184    )]
185    hyperd_path: PathBuf,
186    /// Whether shutdown has been initiated.
187    shutdown_initiated: Arc<AtomicBool>,
188    /// The callback connection to hyperd.
189    /// Keeping this open maintains the "dead man's switch" - when dropped, Hyper shuts down.
190    callback_connection: Option<TcpStream>,
191    /// The listen mode this process was started with.
192    listen_mode: ListenMode,
193    /// The transport mode this process was started with.
194    transport_mode: TransportMode,
195    /// The socket directory for UDS connections (Unix only).
196    /// This directory is automatically cleaned up on drop.
197    #[cfg(unix)]
198    socket_directory: Option<PathBuf>,
199    /// The pipe name for Named Pipe connections (Windows only).
200    #[cfg(windows)]
201    pipe_name: Option<String>,
202    /// The log directory where hyperd writes its log files.
203    log_dir: Option<PathBuf>,
204}
205
206impl HyperProcess {
207    /// Starts a new Hyper server instance.
208    ///
209    /// This method:
210    /// 1. Creates a callback listener on an ephemeral port
211    /// 2. Starts the hyperd process with the callback connection address
212    /// 3. Waits for Hyper to connect back and provide its listen endpoint
213    ///
214    /// # Arguments
215    ///
216    /// * `hyper_path` - Optional path to the hyperd executable. If `None`, searches
217    ///   in common locations (`HYPERD_PATH` env var, then known build output paths).
218    /// * `parameters` - Optional parameters for the server.
219    ///
220    /// # Errors
221    ///
222    /// Returns an error if:
223    /// - The hyperd executable cannot be found
224    /// - The callback listener cannot be created
225    /// - The server fails to start
226    /// - Hyper doesn't connect back within the timeout (30 seconds)
227    ///
228    /// # Example
229    ///
230    /// ```no_run
231    /// use hyperdb_api::{HyperProcess, Result};
232    /// use std::path::Path;
233    ///
234    /// fn main() -> Result<()> {
235    ///     // Auto-detect hyperd location
236    ///     let hyper = HyperProcess::new(None, None)?;
237    ///
238    ///     // Or specify explicit path
239    ///     let hyper2 = HyperProcess::new(
240    ///         Some(Path::new("/path/to/hyperd")),
241    ///         None,
242    ///     )?;
243    ///     Ok(())
244    /// }
245    /// ```
246    pub fn new(hyper_path: Option<&Path>, parameters: Option<&Parameters>) -> Result<Self> {
247        let hyperd_path = match hyper_path {
248            Some(path) => path.to_path_buf(),
249            None => Self::find_hyperd()?,
250        };
251        Self::start_server(&hyperd_path, parameters)
252    }
253
254    /// Resolves the hyperd executable from the `HYPERD_PATH` environment
255    /// variable. The value can point at the executable directly, or at a
256    /// directory containing it.
257    ///
258    /// If `HYPERD_PATH` is unset, returns an error instructing the caller
259    /// to either set it or run the `hyperd-bootstrap` downloader to
260    /// install a pinned release at `.hyperd/current/hyperd`.
261    fn find_hyperd() -> Result<PathBuf> {
262        #[cfg(windows)]
263        const HYPERD_EXE: &str = "hyperd.exe";
264        #[cfg(not(windows))]
265        const HYPERD_EXE: &str = "hyperd";
266
267        let Ok(path_str) = std::env::var("HYPERD_PATH") else {
268            // Walk up from CWD looking for .hyperd/current/<exe> written by
269            // `hyperd-bootstrap download`. This lets `node examples/foo.mjs`
270            // run from any subdirectory of the repo without exporting HYPERD_PATH.
271            if let Ok(cwd) = std::env::current_dir() {
272                let mut dir = cwd.as_path();
273                loop {
274                    let candidate = dir.join(".hyperd").join("current").join(HYPERD_EXE);
275                    if candidate.exists() {
276                        return Ok(candidate);
277                    }
278                    match dir.parent() {
279                        Some(parent) => dir = parent,
280                        None => break,
281                    }
282                }
283            }
284            return Err(Error::config(
285                "HYPERD_PATH is not set. Point it at a hyperd executable, \
286                or run `make download-hyperd` (or `cargo run -p hyperd-bootstrap -- download`) \
287                to install a pinned release at `.hyperd/current/hyperd`.",
288            ));
289        };
290
291        let path = PathBuf::from(&path_str);
292        if path.is_dir() {
293            let with_exe = path.join(HYPERD_EXE);
294            if with_exe.exists() {
295                return Ok(with_exe);
296            }
297            #[cfg(windows)]
298            {
299                let without_exe = path.join("hyperd");
300                if without_exe.exists() {
301                    return Ok(without_exe);
302                }
303            }
304            return Err(Error::config(format!(
305                "HYPERD_PATH set to '{path_str}' but {HYPERD_EXE} not found in that directory"
306            )));
307        }
308        if path.exists() {
309            return Ok(path);
310        }
311        #[cfg(windows)]
312        {
313            let with_ext = PathBuf::from(format!("{path_str}.exe"));
314            if with_ext.exists() {
315                return Ok(with_ext);
316            }
317        }
318        Err(Error::config(format!(
319            "HYPERD_PATH set to '{}' but hyperd executable not found (checked: {})",
320            path_str,
321            path.display()
322        )))
323    }
324
325    /// Starts the hyperd server process with callback connection.
326    fn start_server(hyperd_path: &Path, parameters: Option<&Parameters>) -> Result<Self> {
327        // Verify hyperd exists
328        if !hyperd_path.exists() {
329            return Err(Error::config(format!(
330                "Hyper executable not found at: {}",
331                hyperd_path.display()
332            )));
333        }
334
335        info!(
336            target: "hyperdb_api",
337            path = %hyperd_path.display(),
338            "hyperd-starting"
339        );
340
341        // Create callback listener on ephemeral port
342        // This is the "dead man's switch" - when this connection is closed, Hyper shuts down
343        let callback_listener = TcpListener::bind("127.0.0.1:0")
344            .map_err(|e| Error::connection_with_io("Failed to create callback listener", e))?;
345
346        let callback_port = callback_listener
347            .local_addr()
348            .map_err(|e| Error::connection_with_io("Failed to get callback port", e))?
349            .port();
350
351        // Set a timeout for accepting the callback connection
352        callback_listener.set_nonblocking(false).map_err(|e| {
353            Error::connection_with_io("Failed to set callback listener to blocking", e)
354        })?;
355
356        // Check if user wants to disable default parameters
357        let use_defaults = parameters.map_or(true, |p| !p.contains_key(NO_DEFAULT_PARAMETERS));
358
359        // Get the listen mode
360        let listen_mode = parameters.and_then(|p| p.listen_mode).unwrap_or_default();
361
362        // Get transport mode (default to TCP until UDS performance is validated)
363        // See IPC_IMPLEMENTATION.md for details
364        #[cfg(unix)]
365        let transport_mode = parameters
366            .and_then(|p| p.transport_mode)
367            .unwrap_or(TransportMode::Tcp);
368        #[cfg(windows)]
369        let transport_mode = parameters
370            .and_then(|p| p.transport_mode)
371            .unwrap_or(TransportMode::Tcp);
372        #[cfg(not(any(unix, windows)))]
373        let transport_mode = TransportMode::Tcp;
374
375        // Create socket directory for UDS if needed (Unix only)
376        #[cfg(unix)]
377        let socket_directory: Option<PathBuf> = if transport_mode == TransportMode::Ipc {
378            // Use custom directory if provided, otherwise create temp directory
379            let dir = if let Some(custom_dir) =
380                parameters.and_then(|p| p.domain_socket_directory.as_ref())
381            {
382                custom_dir.clone()
383            } else {
384                // Create a temp directory for the socket
385                let temp_dir = std::env::temp_dir().join(format!("hyper-{}", std::process::id()));
386                std::fs::create_dir_all(&temp_dir).map_err(|e| {
387                    Error::connection_with_io("Failed to create socket directory", e)
388                })?;
389                temp_dir
390            };
391            Some(dir)
392        } else {
393            None
394        };
395
396        // On non-Unix platforms there is no UDS socket directory; the variable
397        // is only referenced inside `#[cfg(unix)]` blocks so we do not need a
398        // placeholder binding here.
399
400        // Create pipe name for Named Pipes if needed (Windows only)
401        #[cfg(windows)]
402        let pipe_name: Option<String> = if transport_mode == TransportMode::Ipc {
403            Some(format!("hyper-{}", std::process::id()))
404        } else {
405            None
406        };
407
408        // Build command arguments
409        let mut cmd = Command::new(hyperd_path);
410
411        // The "run" subcommand starts the server
412        cmd.arg("run");
413
414        // Callback connection - Hyper will connect to this and send its endpoint
415        // When this connection is closed, Hyper will shut down gracefully
416        cmd.arg("--callback-connection")
417            .arg(format!("tab.tcp://127.0.0.1:{callback_port}"));
418
419        // Configure listen connection based on mode and transport
420        // Connection string formats:
421        // - tab.tcp://host:port - libpq over TCP
422        // - tab.domain://<dir>/domain/<name> - libpq over Unix Domain Socket
423        // - tcp.grpc://host:port - gRPC
424        #[cfg(unix)]
425        let listen_connection = if transport_mode == TransportMode::Ipc {
426            let socket_dir = socket_directory.as_ref().unwrap();
427            match listen_mode {
428                ListenMode::LibPq => format!("tab.domain://{}/domain/hyper", socket_dir.display()),
429                ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
430                ListenMode::Both { grpc_port } => {
431                    format!(
432                        "tab.domain://{}/domain/hyper,tcp.grpc://127.0.0.1:{}",
433                        socket_dir.display(),
434                        grpc_port
435                    )
436                }
437            }
438        } else {
439            match listen_mode {
440                ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
441                ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
442                ListenMode::Both { grpc_port } => {
443                    format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{grpc_port}")
444                }
445            }
446        };
447
448        #[cfg(windows)]
449        let listen_connection = if transport_mode == TransportMode::Ipc {
450            let pname = pipe_name.as_ref().unwrap();
451            match listen_mode {
452                ListenMode::LibPq => format!("tab.pipe://./pipe/{pname}"),
453                ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
454                ListenMode::Both { grpc_port } => {
455                    format!("tab.pipe://./pipe/{pname},tcp.grpc://127.0.0.1:{grpc_port}")
456                }
457            }
458        } else {
459            match listen_mode {
460                ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
461                ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
462                ListenMode::Both { grpc_port } => {
463                    format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{grpc_port}")
464                }
465            }
466        };
467
468        #[cfg(not(any(unix, windows)))]
469        let listen_connection = match listen_mode {
470            ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
471            ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{}", port),
472            ListenMode::Both { grpc_port } => {
473                format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{}", grpc_port)
474            }
475        };
476
477        cmd.arg("--listen-connection").arg(&listen_connection);
478
479        // Helper to check if a parameter is already set by the user
480        let user_has_param =
481            |key: &str| -> bool { parameters.is_some_and(|p| p.contains_key(key)) };
482
483        // Apply default instance parameters (matching C++ HyperProcess behavior)
484        // These can be overridden by user parameters or disabled entirely with NO_DEFAULT_PARAMETERS
485        if use_defaults {
486            // Initial user for the Hyper instance
487            if !user_has_param("init_user") {
488                cmd.arg("--init-user=tableau_internal_user");
489            }
490
491            // Enable gRPC threads if gRPC mode is enabled
492            // Required for gRPC to function - without this, Hyper will fail to start with:
493            // "gRPC threads are required for running gRPC services"
494            // Using 4 threads as a reasonable default (can be overridden by user)
495            if matches!(
496                listen_mode,
497                ListenMode::Grpc { .. } | ListenMode::Both { .. }
498            ) && !user_has_param("grpc_threads")
499            {
500                cmd.arg("--grpc-threads=4");
501            }
502
503            // Enable gRPC result persistence if gRPC mode is enabled
504            // This is required for ADAPTIVE and ASYNC transfer modes
505            if matches!(
506                listen_mode,
507                ListenMode::Grpc { .. } | ListenMode::Both { .. }
508            ) && !user_has_param("grpc_persist_results")
509            {
510                cmd.arg("--grpc-persist-results=true");
511            }
512
513            // Default language setting
514            if !user_has_param("language") {
515                cmd.arg("--language=en_US");
516            }
517
518            // Log configuration: file-based JSON logging
519            if !user_has_param("log_config") {
520                cmd.arg(format!("--log-config={DEFAULT_LOG_CONFIG}"));
521            }
522
523            // Date style for date parsing (Month-Day-Year)
524            if !user_has_param("date_style") {
525                cmd.arg("--date-style=MDY");
526            }
527
528            // Enforce strict date_style (day/month/year ordering must match exactly)
529            if !user_has_param("date_style_lenient") {
530                cmd.arg("--date-style-lenient=false");
531            }
532
533            // Set default log directory to current directory
534            if !user_has_param("log_dir") {
535                if let Ok(cwd) = std::env::current_dir() {
536                    cmd.arg(format!("--log-dir={}", cwd.display()));
537                }
538            }
539
540            // Disable password requirement for local development
541            if !user_has_param("no_password") {
542                cmd.arg("--no-password");
543            }
544
545            // Skip license check for local development
546            if !user_has_param("skip_license") {
547                cmd.arg("--skip-license");
548            }
549
550            // Default new .hyper databases to file format version 3, which
551            // adds support for 128-bit NUMERICs (required to ingest parquet
552            // files whose decimal columns are stored as DECIMAL128).
553            // File format 3 has shipped since Hyper 2022.4.0.
554            if !user_has_param("default_database_version") {
555                cmd.arg("--default-database-version=3");
556            }
557        }
558
559        // Add custom parameters from user
560        if let Some(params) = parameters {
561            for (key, value) in params.iter() {
562                // Skip internal/special parameters
563                if key == "callback_connection"
564                    || key == "listen_connection"
565                    || key == NO_DEFAULT_PARAMETERS
566                {
567                    continue;
568                }
569
570                // Convert underscores to dashes for command-line arguments
571                let cli_key = key.replace('_', "-");
572
573                if value.is_empty() {
574                    cmd.arg(format!("--{cli_key}"));
575                } else {
576                    cmd.arg(format!("--{cli_key}={value}"));
577                }
578            }
579        }
580
581        // Resolve the effective log directory for later access via log_dir()
582        let resolved_log_dir =
583            if let Some(user_dir) = parameters.and_then(|p| p.get("log_dir")).map(PathBuf::from) {
584                Some(user_dir)
585            } else if use_defaults {
586                std::env::current_dir().ok()
587            } else {
588                None
589            };
590
591        // Redirect stdout/stderr to null - we get the endpoint via callback connection
592        cmd.stdout(Stdio::null());
593        cmd.stderr(Stdio::null());
594
595        // On Unix, start hyperd in its own process group so it doesn't receive
596        // Ctrl-C (SIGINT) signals meant for the parent process. This allows the
597        // parent to handle Ctrl-C gracefully and properly shut down hyperd via
598        // the callback connection mechanism.
599        #[cfg(unix)]
600        cmd.process_group(0);
601
602        // On Windows, prevent a console window from flashing when spawning hyperd.
603        // CREATE_NO_WINDOW (0x08000000) suppresses the creation of a visible console.
604        #[cfg(windows)]
605        {
606            use std::os::windows::process::CommandExt;
607            const CREATE_NO_WINDOW: u32 = 0x08000000;
608            cmd.creation_flags(CREATE_NO_WINDOW);
609        }
610
611        // Start the process
612        let child = cmd.spawn().map_err(|e| {
613            Error::connection_with_io(
614                format!("Failed to start Hyper server at {}", hyperd_path.display()),
615                e,
616            )
617        })?;
618
619        // Wait for Hyper to connect back to our callback listener
620        let (callback_connection, callback_endpoint) = Self::wait_for_callback(&callback_listener)?;
621
622        // Determine the endpoints based on listen mode and callback response
623        let (endpoint, grpc_endpoint) = match listen_mode {
624            ListenMode::LibPq => {
625                // Callback returns libpq endpoint
626                (Some(callback_endpoint), None)
627            }
628            ListenMode::Grpc { port } => {
629                // Callback returns gRPC endpoint (with resolved port if auto-assigned)
630                let grpc_ep = if port == 0 {
631                    // The callback returns the actual gRPC endpoint with resolved port
632                    callback_endpoint
633                } else {
634                    format!("127.0.0.1:{port}")
635                };
636                (None, Some(grpc_ep))
637            }
638            ListenMode::Both { grpc_port } => {
639                // Callback returns libpq endpoint, gRPC uses specified port
640                (
641                    Some(callback_endpoint),
642                    Some(format!("127.0.0.1:{grpc_port}")),
643                )
644            }
645        };
646
647        // Parse connection endpoint if we have a libpq endpoint
648        let connection_endpoint = endpoint.as_ref().map(|ep| {
649            #[cfg(unix)]
650            {
651                // Check if it's a UDS path (contains path separator but no colon with port)
652                if ep.starts_with('/') || socket_directory.is_some() {
653                    // UDS endpoint - construct from socket directory
654                    if let Some(ref dir) = socket_directory {
655                        return ConnectionEndpoint::domain_socket(dir, "hyper");
656                    }
657                    // Parse as path
658                    let path = std::path::Path::new(ep);
659                    let dir = path.parent().unwrap_or(std::path::Path::new("/"));
660                    let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("hyper");
661                    return ConnectionEndpoint::domain_socket(dir, name);
662                }
663            }
664            #[cfg(windows)]
665            {
666                // Check if it's a Named Pipe endpoint
667                if pipe_name.is_some() {
668                    if let Some(ref pname) = pipe_name {
669                        return ConnectionEndpoint::named_pipe(".", pname);
670                    }
671                }
672            }
673            // TCP endpoint (host:port format)
674            let parts: Vec<&str> = ep.split(':').collect();
675            if parts.len() == 2 {
676                if let Ok(port) = parts[1].parse::<u16>() {
677                    return ConnectionEndpoint::tcp(parts[0], port);
678                }
679            }
680            ConnectionEndpoint::tcp("localhost", 7483) // fallback
681        });
682
683        Ok(HyperProcess {
684            child: Some(child),
685            endpoint,
686            connection_endpoint,
687            grpc_endpoint,
688            hyperd_path: hyperd_path.to_path_buf(),
689            shutdown_initiated: Arc::new(AtomicBool::new(false)),
690            callback_connection: Some(callback_connection),
691            listen_mode,
692            transport_mode,
693            log_dir: resolved_log_dir,
694            #[cfg(unix)]
695            socket_directory,
696            #[cfg(windows)]
697            pipe_name,
698        })
699    }
700
701    /// Waits for Hyper to connect to our callback listener and send its endpoint.
702    ///
703    /// Protocol:
704    /// - Hyper connects to the callback listener
705    /// - Hyper sends: [1 byte length][N bytes connection descriptor string]
706    /// - Connection descriptor format: "tab.tcp://host:port"
707    fn wait_for_callback(listener: &TcpListener) -> Result<(TcpStream, String)> {
708        // Set a timeout for accepting connections
709        listener.set_nonblocking(true).ok();
710
711        let timeout = Duration::from_secs(60);
712        let start = std::time::Instant::now();
713
714        // Poll for incoming connection with timeout
715        let mut stream = loop {
716            if start.elapsed() > timeout {
717                return Err(Error::internal(
718                    "Timeout waiting for Hyper to connect to callback listener. \
719                    Hyper may have failed to start - check hyperd logs for details.",
720                ));
721            }
722
723            match listener.accept() {
724                Ok((stream, _addr)) => break stream,
725                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
726                    thread::sleep(Duration::from_millis(50));
727                }
728                Err(e) => {
729                    return Err(Error::connection_with_io(
730                        "Failed to accept callback connection",
731                        e,
732                    ));
733                }
734            }
735        };
736
737        // Set stream back to blocking for reading
738        stream.set_nonblocking(false).map_err(|e| {
739            Error::connection_with_io("Failed to set callback stream to blocking", e)
740        })?;
741
742        // Set read timeout
743        stream.set_read_timeout(Some(Duration::from_secs(10))).ok();
744
745        // Read the endpoint descriptor from Hyper
746        // Protocol: [1 byte length][N bytes descriptor string]
747        let mut len_buf = [0u8; 1];
748        stream.read_exact(&mut len_buf).map_err(|e| {
749            Error::connection_with_io("Failed to read endpoint length from Hyper", e)
750        })?;
751
752        let len = len_buf[0] as usize;
753        if len == 0 {
754            return Err(Error::internal("Hyper sent empty endpoint descriptor"));
755        }
756
757        let mut descriptor_buf = vec![0u8; len];
758        stream.read_exact(&mut descriptor_buf).map_err(|e| {
759            Error::connection_with_io("Failed to read endpoint descriptor from Hyper", e)
760        })?;
761
762        let descriptor = String::from_utf8(descriptor_buf)
763            .map_err(|e| Error::internal(format!("Invalid UTF-8 in endpoint descriptor: {e}")))?;
764
765        // Trim null bytes and whitespace that Hyper may include
766        let descriptor = descriptor.trim_matches(|c: char| c == '\0' || c.is_whitespace());
767
768        // Parse the connection descriptor (format: "tab.tcp://host:port")
769        let endpoint = Self::parse_connection_descriptor(descriptor)?;
770
771        // Clear read timeout for the connection we'll keep open
772        stream.set_read_timeout(None).ok();
773
774        info!(
775            target: "hyperdb_api",
776            %endpoint,
777            "hyperd-started"
778        );
779
780        Ok((stream, endpoint))
781    }
782
783    /// Parses a connection descriptor to extract host:port, socket path, or pipe path.
784    ///
785    /// Input formats:
786    /// - "tab.tcp://host:port" → "host:port"
787    /// - "tab.domain://<dir>/domain/<name>" → "<dir>/domain/<name>" (socket path)
788    /// - "tab.pipe://<host>/pipe/<name>" → "<host>/pipe/<name>" (named pipe)
789    /// - "tcp.grpc://host:port" → "host:port"
790    fn parse_connection_descriptor(descriptor: &str) -> Result<String> {
791        // Handle domain socket format
792        if let Some(rest) = descriptor.strip_prefix("tab.domain://") {
793            // Return the full path for UDS
794            if let Some(idx) = rest.find("/domain/") {
795                let dir = &rest[..idx];
796                let name = &rest[idx + 8..]; // "/domain/".len() == 8
797                let socket_path = format!("{dir}/domain/{name}");
798                return Ok(socket_path);
799            }
800            return Ok(rest.to_string());
801        }
802
803        // Handle named pipe format
804        if let Some(rest) = descriptor.strip_prefix("tab.pipe://") {
805            // Format: tab.pipe://<host>/pipe/<name>
806            // Return as pipe path: \\<host>\pipe\<name>
807            if let Some(idx) = rest.find("/pipe/") {
808                let host = &rest[..idx];
809                let name = &rest[idx + 6..]; // "/pipe/".len() == 6
810                let pipe_path = format!(r"\\{host}\pipe\{name}");
811                return Ok(pipe_path);
812            }
813            return Ok(rest.to_string());
814        }
815
816        // Handle TCP prefixes
817        let without_prefix = descriptor
818            .strip_prefix("tab.tcp://")
819            .or_else(|| descriptor.strip_prefix("tcp.grpc://"))
820            .or_else(|| descriptor.strip_prefix("tcp.grpctls://"))
821            .or_else(|| descriptor.strip_prefix("tcp://"))
822            .unwrap_or(descriptor);
823
824        // Validate it looks like host:port
825        if without_prefix.contains(':') && !without_prefix.is_empty() {
826            Ok(without_prefix.to_string())
827        } else {
828            Err(Error::internal(format!(
829                "Invalid connection descriptor format: '{descriptor}'. Expected '<scheme>://host:port' or 'tab.domain://<dir>/domain/<name>'"
830            )))
831        }
832    }
833
834    /// Returns the libpq endpoint for connecting to this instance.
835    ///
836    /// The endpoint is in the format "host:port" (e.g., "localhost:54321").
837    ///
838    /// Returns `None` if the process was started in gRPC-only mode.
839    /// Use [`grpc_endpoint`](Self::grpc_endpoint) for gRPC connections.
840    #[must_use]
841    pub fn endpoint(&self) -> Option<&str> {
842        self.endpoint.as_deref()
843    }
844
845    /// Returns the libpq endpoint, or an error if not available.
846    ///
847    /// This is a convenience method to avoid `unwrap()` calls when you need
848    /// the endpoint and want proper error handling.
849    ///
850    /// # Errors
851    ///
852    /// Returns an error if this process was started in gRPC-only mode.
853    ///
854    /// # Example
855    ///
856    /// ```no_run
857    /// use hyperdb_api::{HyperProcess, Result};
858    ///
859    /// fn main() -> Result<()> {
860    ///     let hyper = HyperProcess::new(None, None)?;
861    ///     let endpoint = hyper.require_endpoint()?; // No unwrap() needed!
862    ///     println!("Server running at: {}", endpoint);
863    ///     Ok(())
864    /// }
865    /// ```
866    pub fn require_endpoint(&self) -> crate::error::Result<&str> {
867        self.endpoint().ok_or_else(|| {
868            crate::error::Error::internal(
869                "HyperProcess does not have a libpq endpoint (gRPC-only mode). \
870                 Use grpc_endpoint() instead or start with LibPq or Both listen mode.",
871            )
872        })
873    }
874
875    /// Returns the gRPC endpoint for connecting to this instance.
876    ///
877    /// The endpoint is in the format "host:port" (e.g., "127.0.0.1:7484").
878    ///
879    /// Returns `None` if the process was started in libpq-only mode.
880    #[must_use]
881    pub fn grpc_endpoint(&self) -> Option<&str> {
882        self.grpc_endpoint.as_deref()
883    }
884
885    /// Returns the gRPC endpoint, or an error if not available.
886    ///
887    /// This is a convenience method to avoid `unwrap()` calls when you need
888    /// the gRPC endpoint and want proper error handling.
889    ///
890    /// # Errors
891    ///
892    /// Returns an error if this process was started in libpq-only mode.
893    pub fn require_grpc_endpoint(&self) -> crate::error::Result<&str> {
894        self.grpc_endpoint().ok_or_else(|| {
895            crate::error::Error::internal(
896                "HyperProcess does not have a gRPC endpoint (libpq-only mode). \
897                 Use endpoint() instead or start with Grpc or Both listen mode.",
898            )
899        })
900    }
901
902    /// Returns the gRPC endpoint as a full URL suitable for gRPC clients.
903    ///
904    /// Returns the endpoint prefixed with "http://" (e.g., "<http://127.0.0.1:7484>").
905    /// Returns `None` if the process was started in libpq-only mode.
906    #[must_use]
907    pub fn grpc_url(&self) -> Option<String> {
908        self.grpc_endpoint.as_ref().map(|ep| format!("http://{ep}"))
909    }
910
911    /// Returns the gRPC URL, or an error if not available.
912    ///
913    /// This is a convenience method to avoid `unwrap()` calls when you need
914    /// the gRPC URL and want proper error handling.
915    ///
916    /// # Errors
917    ///
918    /// Returns an error if this process was started in libpq-only mode.
919    pub fn require_grpc_url(&self) -> crate::error::Result<String> {
920        Ok(format!("http://{}", self.require_grpc_endpoint()?))
921    }
922
923    /// Returns the listen mode this process was started with.
924    #[must_use]
925    pub fn listen_mode(&self) -> ListenMode {
926        self.listen_mode
927    }
928
929    /// Returns the transport mode this process was started with.
930    #[must_use]
931    pub fn transport_mode(&self) -> TransportMode {
932        self.transport_mode
933    }
934
935    /// Returns the connection endpoint for this process.
936    ///
937    /// This returns a [`ConnectionEndpoint`] that can be used to connect
938    /// to this Hyper instance via TCP, Unix Domain Socket, or Named Pipe.
939    #[must_use]
940    pub fn connection_endpoint(&self) -> Option<&ConnectionEndpoint> {
941        self.connection_endpoint.as_ref()
942    }
943
944    /// Returns the log directory where hyperd writes its log files.
945    ///
946    /// The log file is typically `hyperd.log` within this directory.
947    /// Returns `None` if the log directory could not be determined (e.g.,
948    /// default parameters were disabled and no `log_dir` was specified).
949    ///
950    /// This is useful for setting up [`QueryStatsProvider`](crate::QueryStatsProvider)
951    /// implementations that parse the Hyper log file.
952    #[must_use]
953    pub fn log_dir(&self) -> Option<&Path> {
954        self.log_dir.as_deref()
955    }
956
957    /// Returns the socket directory used for UDS connections (Unix only).
958    ///
959    /// Returns `None` if the process is using TCP or if no socket directory was created.
960    #[cfg(unix)]
961    #[must_use]
962    pub fn socket_directory(&self) -> Option<&Path> {
963        self.socket_directory.as_deref()
964    }
965
966    /// Returns the pipe name used for Named Pipe connections (Windows only).
967    ///
968    /// Returns `None` if the process is using TCP.
969    #[cfg(windows)]
970    pub fn pipe_name(&self) -> Option<&str> {
971        self.pipe_name.as_deref()
972    }
973
974    /// Returns the process ID of the Hyper server.
975    #[must_use]
976    pub fn pid(&self) -> Option<u32> {
977        self.child.as_ref().map(std::process::Child::id)
978    }
979
980    /// Returns whether the Hyper server process is still running.
981    #[must_use]
982    pub fn is_running(&self) -> bool {
983        if let Some(ref child) = self.child {
984            // Try to check if process is alive without waiting
985            #[cfg(unix)]
986            {
987                match Command::new("kill")
988                    .args(["-0", &child.id().to_string()])
989                    .output()
990                {
991                    Ok(output) => output.status.success(),
992                    Err(_) => false,
993                }
994            }
995            #[cfg(not(unix))]
996            {
997                // On Windows, we can't easily check without waiting
998                // Assume it's running if we have a handle
999                let _ = child; // Silence unused variable warning
1000                true
1001            }
1002        } else {
1003            false
1004        }
1005    }
1006
1007    /// Returns true if the hyperd child process has exited (or no child exists).
1008    ///
1009    /// Uses [`std::process::Child::try_wait`] under the hood, which is correct
1010    /// on both Unix and Windows. On Unix this also reaps any zombie state as a
1011    /// side effect — a hyperd that has been SIGKILLed but not yet `wait()`ed
1012    /// on by the parent will be observed as exited and cleaned up here.
1013    ///
1014    /// Prefer this over [`Self::is_running`] when the caller owns the
1015    /// `HyperProcess` mutably and needs an authoritative liveness signal.
1016    /// `is_running` uses `kill -0` on Unix (which incorrectly reports zombies
1017    /// as alive) and is a no-op on Windows.
1018    pub fn has_exited(&mut self) -> bool {
1019        match self.child.as_mut() {
1020            Some(child) => match child.try_wait() {
1021                Ok(Some(_status)) => true,
1022                Ok(None) => false,
1023                Err(_) => true,
1024            },
1025            None => true,
1026        }
1027    }
1028
1029    /// Shuts down the Hyper server gracefully with a timeout.
1030    ///
1031    /// This closes the callback connection, which signals Hyper to shut down gracefully.
1032    /// If Hyper doesn't exit within the timeout, it will be forcefully terminated.
1033    ///
1034    /// # Arguments
1035    ///
1036    /// * `timeout` - Maximum time to wait for graceful shutdown before force-killing.
1037    ///
1038    /// # Errors
1039    ///
1040    /// Returns an error if the shutdown fails.
1041    pub fn shutdown_timeout(mut self, timeout: Duration) -> Result<()> {
1042        self.shutdown_initiated.store(true, Ordering::SeqCst);
1043        self.do_shutdown(Some(timeout))
1044    }
1045
1046    /// Shuts down the Hyper server gracefully, waiting indefinitely.
1047    ///
1048    /// This closes the callback connection and waits for Hyper to exit.
1049    /// Use [`shutdown_timeout`](Self::shutdown_timeout) if you need a timeout.
1050    ///
1051    /// # Errors
1052    ///
1053    /// Returns an error if the shutdown fails.
1054    pub fn shutdown_graceful(mut self) -> Result<()> {
1055        self.shutdown_initiated.store(true, Ordering::SeqCst);
1056        self.do_shutdown(None)
1057    }
1058
1059    /// Closes the callback connection to signal Hyper to shut down.
1060    ///
1061    /// This is the graceful shutdown mechanism - Hyper monitors the callback connection
1062    /// and will initiate shutdown when it's closed.
1063    fn close_callback_connection(&mut self) {
1064        if let Some(conn) = self.callback_connection.take() {
1065            // Gracefully shutdown both directions
1066            let _ = conn.shutdown(Shutdown::Both);
1067            // Connection is dropped here, closing the socket
1068        }
1069    }
1070
1071    /// Internal shutdown implementation.
1072    fn do_shutdown(&mut self, timeout: Option<Duration>) -> Result<()> {
1073        info!(target: "hyperdb_api", "hyperd-shutdown");
1074
1075        // Step 1: Close the callback connection to signal graceful shutdown
1076        // Hyper will detect this and begin shutting down
1077        self.close_callback_connection();
1078
1079        if let Some(mut child) = self.child.take() {
1080            // Step 2: Wait for the process to exit
1081            let wait_result = if let Some(timeout) = timeout {
1082                // Wait with timeout
1083                let start = std::time::Instant::now();
1084                loop {
1085                    match child.try_wait() {
1086                        Ok(Some(status)) => break Ok(status),
1087                        Ok(None) => {
1088                            if start.elapsed() > timeout {
1089                                // Step 3: Force kill ONLY after timeout
1090                                // This should rarely happen if Hyper is healthy
1091                                #[cfg(unix)]
1092                                {
1093                                    // Try SIGTERM first
1094                                    let _ = Command::new("kill")
1095                                        .args(["-TERM", &child.id().to_string()])
1096                                        .output();
1097                                    thread::sleep(Duration::from_millis(100));
1098                                }
1099                                // Then force kill
1100                                let _ = child.kill();
1101                                break child.wait().map_err(|e| {
1102                                    Error::connection_with_io("Failed to wait for hyperd", e)
1103                                });
1104                            }
1105                            thread::sleep(Duration::from_millis(100));
1106                        }
1107                        Err(e) => {
1108                            break Err(Error::connection_with_io("Failed to wait for hyperd", e))
1109                        }
1110                    }
1111                }
1112            } else {
1113                // Wait indefinitely
1114                child
1115                    .wait()
1116                    .map_err(|e| Error::connection_with_io("Failed to wait for hyperd", e))
1117            };
1118
1119            wait_result?;
1120        }
1121
1122        Ok(())
1123    }
1124}
1125
1126impl Drop for HyperProcess {
1127    fn drop(&mut self) {
1128        if !self.shutdown_initiated.load(Ordering::SeqCst) {
1129            // Try to gracefully shutdown with a short timeout
1130            let _ = self.do_shutdown(Some(Duration::from_secs(5)));
1131        }
1132
1133        // Clean up socket directory if we created one
1134        #[cfg(unix)]
1135        if let Some(ref dir) = self.socket_directory {
1136            // Only clean up if it's a temp directory we created (contains our PID)
1137            let dir_name = dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
1138            if dir_name.starts_with("hyper-") {
1139                let _ = std::fs::remove_dir_all(dir);
1140            }
1141        }
1142    }
1143}
1144
1145// SAFETY: `HyperProcess` owns its `std::process::Child` handle and (optionally)
1146// a TCP connection. Both are themselves `Send`, and no field holds thread-local
1147// state or a non-`Send` raw pointer. Ownership of a `HyperProcess` therefore
1148// transfers cleanly across thread boundaries.
1149unsafe impl Send for HyperProcess {}
1150
1151/// Special parameter key that disables default instance parameters when present.
1152///
1153/// By default, [`HyperProcess`] starts hyperd with a set of sensible default parameters
1154/// (matching the C++ `HyperProcess` behavior). If you need full control over all parameters,
1155/// include this key in your [`Parameters`] to disable all defaults.
1156pub(crate) const NO_DEFAULT_PARAMETERS: &str = "no_default_parameters";
1157
1158/// Default log configuration for hyperd: file-based JSON logging.
1159const DEFAULT_LOG_CONFIG: &str = "file,json,all,hyperd,0";
1160
1161/// Parameters for configuring the Hyper server.
1162///
1163/// When starting a [`HyperProcess`], a set of default parameters are automatically applied
1164/// (matching the C++ `HyperProcess` behavior):
1165///
1166/// | Parameter | Default Value | Description |
1167/// |-----------|---------------|-------------|
1168/// | `init_user` | `tableau_internal_user` | Initial user for the Hyper instance |
1169/// | `language` | `en_US` | Default language setting |
1170/// | `log_config` | `file,json,all,hyperd,0` | Log configuration |
1171/// | `date_style` | `MDY` | Date format (Month-Day-Year) |
1172/// | `date_style_lenient` | `false` | Strict date parsing |
1173/// | `log_dir` | Current directory | Log file directory |
1174/// | `no_password` | (flag) | Disable password requirement |
1175/// | `skip_license` | (flag) | Skip license check |
1176/// | `default_database_version` | `3` | File format version for newly created `.hyper` databases (v3 adds 128-bit NUMERIC support, required for DECIMAL128 parquet columns) |
1177///
1178/// To disable these defaults, add the `no_default_parameters` key (for example
1179/// `params.set("no_default_parameters", "")` via [`Parameters::set`].
1180///
1181/// # Listen Modes
1182///
1183/// Use [`set_listen_mode`](Parameters::set_listen_mode) to configure which protocols Hyper listens on:
1184///
1185/// ```
1186/// use hyperdb_api::{ListenMode, Parameters};
1187///
1188/// // gRPC only (for Arrow-based queries)
1189/// let mut params = Parameters::new();
1190/// params.set_listen_mode(ListenMode::Grpc { port: 0 });
1191///
1192/// // Both libpq and gRPC
1193/// let mut params = Parameters::new();
1194/// params.set_listen_mode(ListenMode::Both { grpc_port: 7484 });
1195/// ```
1196///
1197/// # Example
1198///
1199/// ```
1200/// use hyperdb_api::Parameters;
1201///
1202/// let mut params = Parameters::new();
1203/// params.set("log_file_size_limit", "100k");
1204/// params.set("log_file_max_count", "7");
1205/// ```
1206///
1207/// # Transport Modes
1208///
1209/// Use [`set_transport_mode`](Parameters::set_transport_mode) to control whether Hyper uses
1210/// TCP or IPC (Unix Domain Sockets):
1211///
1212/// ```
1213/// use hyperdb_api::{Parameters, TransportMode};
1214///
1215/// let mut params = Parameters::new();
1216/// params.set_transport_mode(TransportMode::Tcp); // Force TCP instead of IPC
1217/// ```
1218///
1219/// Transport mode for `HyperProcess` connections.
1220///
1221/// Controls whether the server uses TCP or Unix Domain Sockets (IPC) for connections.
1222/// On Unix systems, IPC is the default for better local performance.
1223/// On Windows, TCP is always used.
1224#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
1225pub enum TransportMode {
1226    /// Use IPC (Unix Domain Sockets on Unix, Named Pipes on Windows).
1227    /// This is the default mode and provides better performance for local connections.
1228    #[default]
1229    Ipc,
1230
1231    /// Use TCP/IP connections.
1232    /// Required when connecting from remote clients or when IPC is not available.
1233    Tcp,
1234}
1235
1236/// Parameters for configuring the Hyper server.
1237///
1238/// When starting a [`HyperProcess`], a set of default parameters are automatically applied
1239/// (matching the C++ `HyperProcess` behavior). You can override these defaults or disable
1240/// them entirely by adding the `no_default_parameters` key (for example
1241/// `params.set("no_default_parameters", "")` via [`Parameters::set`].
1242///
1243/// # Transport Modes
1244///
1245/// Use [`set_transport_mode`](Self::set_transport_mode) to control whether Hyper uses
1246/// TCP or IPC (Unix Domain Sockets on Unix systems).
1247///
1248/// # Example
1249///
1250/// ```
1251/// use hyperdb_api::{Parameters, TransportMode};
1252///
1253/// let mut params = Parameters::new();
1254/// params.set("log_file_size_limit", "100k");
1255/// params.set_transport_mode(TransportMode::Tcp); // Force TCP instead of IPC
1256/// ```
1257#[derive(Debug, Clone, Default)]
1258pub struct Parameters {
1259    values: Vec<(String, String)>,
1260    /// The listen mode for the Hyper server.
1261    pub(crate) listen_mode: Option<ListenMode>,
1262    /// The transport mode (TCP or IPC/UDS).
1263    pub(crate) transport_mode: Option<TransportMode>,
1264    /// Custom domain socket directory (Unix only).
1265    #[cfg(unix)]
1266    pub(crate) domain_socket_directory: Option<PathBuf>,
1267}
1268
1269impl Parameters {
1270    /// Creates a new empty Parameters instance.
1271    #[must_use]
1272    pub fn new() -> Self {
1273        Parameters {
1274            values: Vec::new(),
1275            listen_mode: None,
1276            transport_mode: None,
1277            #[cfg(unix)]
1278            domain_socket_directory: None,
1279        }
1280    }
1281
1282    /// Sets the transport mode (TCP or IPC/UDS).
1283    ///
1284    /// By default, `HyperProcess` uses IPC (Unix Domain Sockets on Unix) for better
1285    /// performance. Use `TransportMode::Tcp` if you need TCP connections.
1286    ///
1287    /// # Example
1288    ///
1289    /// ```
1290    /// use hyperdb_api::{Parameters, TransportMode};
1291    ///
1292    /// let mut params = Parameters::new();
1293    /// params.set_transport_mode(TransportMode::Tcp); // Use TCP instead of IPC
1294    /// ```
1295    pub fn set_transport_mode(&mut self, mode: TransportMode) -> &mut Self {
1296        self.transport_mode = Some(mode);
1297        self
1298    }
1299
1300    /// Returns the configured transport mode.
1301    #[must_use]
1302    pub fn transport_mode(&self) -> Option<TransportMode> {
1303        self.transport_mode
1304    }
1305
1306    /// Sets a custom domain socket directory (Unix only).
1307    ///
1308    /// By default, `HyperProcess` creates sockets in a temporary directory.
1309    /// Use this to specify a custom location.
1310    #[cfg(unix)]
1311    pub fn set_domain_socket_directory(&mut self, dir: impl Into<PathBuf>) -> &mut Self {
1312        self.domain_socket_directory = Some(dir.into());
1313        self
1314    }
1315
1316    /// Returns the configured domain socket directory (Unix only).
1317    #[cfg(unix)]
1318    #[must_use]
1319    pub fn domain_socket_directory(&self) -> Option<&Path> {
1320        self.domain_socket_directory.as_deref()
1321    }
1322
1323    /// Sets a parameter value.
1324    ///
1325    /// # Arguments
1326    ///
1327    /// * `key` - The parameter name.
1328    /// * `value` - The parameter value (empty string for flags).
1329    pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
1330        let key = key.into();
1331        let value = value.into();
1332
1333        // Update existing or add new
1334        if let Some(entry) = self.values.iter_mut().find(|(k, _)| k == &key) {
1335            entry.1 = value;
1336        } else {
1337            self.values.push((key, value));
1338        }
1339
1340        self
1341    }
1342
1343    /// Sets the listen mode for the Hyper server.
1344    ///
1345    /// This controls which protocols the server listens on:
1346    /// - [`ListenMode::LibPq`]: `PostgreSQL` wire protocol only (default)
1347    /// - [`ListenMode::Grpc`]: gRPC protocol only (query-only, Arrow results)
1348    /// - [`ListenMode::Both`]: Both protocols enabled
1349    ///
1350    /// # Example
1351    ///
1352    /// ```
1353    /// use hyperdb_api::{ListenMode, Parameters};
1354    ///
1355    /// let mut params = Parameters::new();
1356    /// params.set_listen_mode(ListenMode::Grpc { port: 0 }); // Auto-assign port
1357    /// ```
1358    pub fn set_listen_mode(&mut self, mode: ListenMode) -> &mut Self {
1359        self.listen_mode = Some(mode);
1360        self
1361    }
1362
1363    /// Returns the configured listen mode, if any.
1364    #[must_use]
1365    pub fn listen_mode(&self) -> Option<ListenMode> {
1366        self.listen_mode
1367    }
1368
1369    /// Gets a parameter value.
1370    #[must_use]
1371    pub fn get(&self, key: &str) -> Option<&str> {
1372        self.values
1373            .iter()
1374            .find(|(k, _)| k == key)
1375            .map(|(_, v)| v.as_str())
1376    }
1377
1378    /// Returns whether the parameters contain the given key.
1379    #[must_use]
1380    pub fn contains_key(&self, key: &str) -> bool {
1381        self.values.iter().any(|(k, _)| k == key)
1382    }
1383
1384    /// Returns an iterator over the parameters.
1385    pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
1386        self.values.iter().map(|(k, v)| (k.as_str(), v.as_str()))
1387    }
1388
1389    /// Returns whether the parameters are empty.
1390    #[must_use]
1391    pub fn is_empty(&self) -> bool {
1392        self.values.is_empty()
1393    }
1394
1395    /// Returns the number of parameters.
1396    #[must_use]
1397    pub fn len(&self) -> usize {
1398        self.values.len()
1399    }
1400}
1401
1402#[cfg(test)]
1403mod tests {
1404    use super::*;
1405
1406    #[test]
1407    fn test_parameters() {
1408        let mut params = Parameters::new();
1409        params.set("key1", "value1");
1410        params.set("key2", "value2");
1411
1412        assert_eq!(params.get("key1"), Some("value1"));
1413        assert_eq!(params.get("key2"), Some("value2"));
1414        assert_eq!(params.get("key3"), None);
1415        assert_eq!(params.len(), 2);
1416    }
1417
1418    #[test]
1419    fn test_parameters_update() {
1420        let mut params = Parameters::new();
1421        params.set("key", "value1");
1422        params.set("key", "value2");
1423
1424        assert_eq!(params.get("key"), Some("value2"));
1425        assert_eq!(params.len(), 1);
1426    }
1427
1428    #[test]
1429    fn test_parse_connection_descriptor() {
1430        assert_eq!(
1431            HyperProcess::parse_connection_descriptor("tab.tcp://localhost:12345").unwrap(),
1432            "localhost:12345"
1433        );
1434        assert_eq!(
1435            HyperProcess::parse_connection_descriptor("tab.tcp://127.0.0.1:7483").unwrap(),
1436            "127.0.0.1:7483"
1437        );
1438        assert_eq!(
1439            HyperProcess::parse_connection_descriptor("tcp://localhost:8080").unwrap(),
1440            "localhost:8080"
1441        );
1442        // Already in host:port format
1443        assert_eq!(
1444            HyperProcess::parse_connection_descriptor("localhost:9999").unwrap(),
1445            "localhost:9999"
1446        );
1447    }
1448
1449    #[test]
1450    fn test_parse_connection_descriptor_named_pipe() {
1451        assert_eq!(
1452            HyperProcess::parse_connection_descriptor("tab.pipe://./pipe/hyper-12345").unwrap(),
1453            r"\\.\pipe\hyper-12345"
1454        );
1455        assert_eq!(
1456            HyperProcess::parse_connection_descriptor("tab.pipe://server1/pipe/mydb").unwrap(),
1457            r"\\server1\pipe\mydb"
1458        );
1459    }
1460
1461    #[test]
1462    fn test_parse_connection_descriptor_invalid() {
1463        assert!(HyperProcess::parse_connection_descriptor("").is_err());
1464        assert!(HyperProcess::parse_connection_descriptor("invalid").is_err());
1465    }
1466
1467    #[test]
1468    fn test_parameters_contains_key() {
1469        let mut params = Parameters::new();
1470        params.set("key1", "value1");
1471
1472        assert!(params.contains_key("key1"));
1473        assert!(!params.contains_key("key2"));
1474    }
1475
1476    #[test]
1477    fn test_no_default_parameters_constant() {
1478        // Verify the constant matches what C++ uses
1479        assert_eq!(NO_DEFAULT_PARAMETERS, "no_default_parameters");
1480    }
1481
1482    #[test]
1483    fn test_parameters_with_no_defaults() {
1484        let mut params = Parameters::new();
1485        params.set(NO_DEFAULT_PARAMETERS, "");
1486        params.set("init_user", "custom_user");
1487
1488        assert!(params.contains_key(NO_DEFAULT_PARAMETERS));
1489        assert_eq!(params.get("init_user"), Some("custom_user"));
1490    }
1491}