Skip to main content

hyperdb_api/
process.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Hyper server process management.
5//!
6//! This module provides [`HyperProcess`] for spawning and managing local hyperd server instances.
7//!
8//! # Callback Connection Architecture
9//!
10//! The `HyperProcess` uses a **callback connection** mechanism for reliable process lifecycle
11//! management. This works as follows:
12//!
13//! 1. **Startup**: The client creates a TCP listener on an ephemeral port (the "callback proxy")
14//!    and passes this address to hyperd via `--callback-connection`. Hyper connects back to this
15//!    listener and sends its actual listen endpoint over this connection.
16//!
17//! 2. **Runtime**: The callback connection remains open for the lifetime of the `HyperProcess`.
18//!    This connection acts as a "dead man's switch" - Hyper monitors it continuously.
19//!
20//! 3. **Graceful Shutdown**: When `HyperProcess` is dropped or explicitly shut down, the callback
21//!    connection is closed. Hyper detects this and initiates graceful shutdown automatically.
22//!
23//! 4. **Crash Safety**: If the client process crashes or is killed, the OS automatically closes
24//!    the TCP connection. Hyper detects this and shuts down gracefully, preventing orphan
25//!    processes. This is the key advantage over signal-based shutdown.
26//!
27//! # Protocol Details
28//!
29//! The callback connection protocol is simple:
30//! - Client listens on `127.0.0.1:<ephemeral_port>`
31//! - Client starts hyperd with `--callback-connection=tab.tcp://127.0.0.1:<port>`
32//! - Hyper connects to this address and sends: `[1 byte length][N bytes descriptor]`
33//! - The descriptor is the actual listen endpoint (e.g., `tab.tcp://localhost:54321`)
34//! - Connection stays open until shutdown is desired
35//!
36//! # Listen Modes
37//!
38//! `HyperProcess` supports different listen modes via [`ListenMode`]:
39//!
40//! - **`LibPq`** (default): `PostgreSQL` wire protocol for full read/write access
41//! - **Grpc**: gRPC protocol for query-only Arrow-based access
42//! - **Both**: Both protocols enabled (libpq for read/write, gRPC for Arrow queries)
43//!
44//! ```no_run
45//! use hyperdb_api::{HyperProcess, ListenMode, Parameters, Result};
46//!
47//! fn main() -> Result<()> {
48//!     // gRPC only
49//!     let mut params = Parameters::new();
50//!     params.set_listen_mode(ListenMode::Grpc { port: 0 });
51//!     let hyper = HyperProcess::new(None, Some(&params))?;
52//!     println!("gRPC endpoint: {}", hyper.grpc_endpoint().unwrap());
53//!
54//!     // Both libpq and gRPC
55//!     let mut params = Parameters::new();
56//!     params.set_listen_mode(ListenMode::Both { grpc_port: 7484 });
57//!     let hyper = HyperProcess::new(None, Some(&params))?;
58//!     println!("libpq endpoint: {}", hyper.endpoint().unwrap());
59//!     println!("gRPC endpoint: {}", hyper.grpc_endpoint().unwrap());
60//!     Ok(())
61//! }
62//! ```
63
64use std::io::Read;
65use std::net::{Shutdown, TcpListener, TcpStream};
66use std::path::{Path, PathBuf};
67use std::process::{Child, Command, Stdio};
68use std::sync::atomic::{AtomicBool, Ordering};
69use std::sync::Arc;
70use std::thread;
71use std::time::Duration;
72
73#[cfg(unix)]
74use std::os::unix::process::CommandExt;
75
76#[cfg(any(unix, windows))]
77use hyperdb_api_core::client::ConnectionEndpoint;
78
79use tracing::info;
80
81use crate::error::{Error, Result};
82
83/// Specifies which protocols `HyperProcess` should listen on.
84///
85/// # Examples
86///
87/// ```
88/// use hyperdb_api::{ListenMode, Parameters};
89///
90/// // LibPq only (default) - for full read/write access
91/// let mut params = Parameters::new();
92/// params.set_listen_mode(ListenMode::LibPq);
93///
94/// // gRPC only - for Arrow-based query access
95/// let mut params = Parameters::new();
96/// params.set_listen_mode(ListenMode::Grpc { port: 0 }); // auto-assign port
97///
98/// // Both protocols - libpq for writes, gRPC for Arrow queries
99/// let mut params = Parameters::new();
100/// params.set_listen_mode(ListenMode::Both { grpc_port: 7484 });
101/// ```
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ListenMode {
    /// Serve only the `PostgreSQL` wire protocol (libpq). This is the default.
    ///
    /// The classic connection mode: every Hyper feature is available through
    /// it, reads and writes alike.
    #[default]
    LibPq,

    /// Serve only gRPC.
    ///
    /// Geared towards query-only workloads. Results come back in Arrow IPC
    /// format, and write operations are not supported in this mode.
    ///
    /// A `port` of 0 requests an automatically assigned port.
    Grpc {
        /// Port for the gRPC listener; 0 means auto-assign.
        port: u16,
    },

    /// Serve libpq and gRPC side by side.
    ///
    /// libpq provides full read/write access while gRPC offers Arrow-based
    /// queries. The libpq port is assigned automatically; the gRPC port is
    /// the one given here.
    ///
    /// Note: in `Both` mode the callback connection reports the libpq
    /// endpoint. Call `HyperProcess::grpc_endpoint()` for the gRPC endpoint.
    Both {
        /// Port for the gRPC listener. Must be a specific port; 0 is not allowed.
        grpc_port: u16,
    },
}
135
136/// A running Hyper server instance.
137///
138/// This struct manages the lifecycle of a local Hyper server process using a callback
139/// connection for reliable shutdown. The server is automatically shut down when this
140/// object is dropped.
141///
142/// # Callback Connection (Dead Man's Switch)
143///
144/// Unlike traditional process management that relies on signals, `HyperProcess` maintains
145/// a TCP connection to the Hyper server. When this connection is closed (either explicitly
146/// or because the client process exits), Hyper automatically shuts down gracefully.
147///
148/// This provides several benefits:
149/// - **No orphan processes**: If your application crashes, Hyper shuts down automatically
150/// - **Graceful shutdown**: Hyper can flush data and clean up properly
151/// - **Cross-platform**: Works reliably on macOS, Linux, and Windows
152///
153/// # Example
154///
155/// ```no_run
156/// use hyperdb_api::{HyperProcess, Result};
157///
158/// fn main() -> Result<()> {
159///     // Start a Hyper server (auto-detect hyperd location)
160///     let hyper = HyperProcess::new(None, None)?;
161///
162///     println!("Hyper server running at: {}", hyper.endpoint().unwrap());
163///
164///     // Server automatically shuts down when `hyper` goes out of scope
165///     // via the callback connection mechanism
166///     Ok(())
167/// }
168/// ```
169#[must_use = "HyperProcess will shut down when dropped; store it to keep the server running"]
170#[derive(Debug)]
171pub struct HyperProcess {
172    /// The child process handle.
173    child: Option<Child>,
174    /// The libpq endpoint descriptor (host:port or socket path), if libpq is enabled.
175    endpoint: Option<String>,
176    /// The parsed connection endpoint for libpq.
177    connection_endpoint: Option<ConnectionEndpoint>,
178    /// The gRPC endpoint (host:port), if gRPC is enabled.
179    grpc_endpoint: Option<String>,
180    /// Path to the hyperd executable.
181    #[expect(
182        dead_code,
183        reason = "retained for diagnostics and future restart/respawn paths"
184    )]
185    hyperd_path: PathBuf,
186    /// Whether shutdown has been initiated.
187    shutdown_initiated: Arc<AtomicBool>,
188    /// The callback connection to hyperd.
189    /// Keeping this open maintains the "dead man's switch" - when dropped, Hyper shuts down.
190    callback_connection: Option<TcpStream>,
191    /// The listen mode this process was started with.
192    listen_mode: ListenMode,
193    /// The transport mode this process was started with.
194    transport_mode: TransportMode,
195    /// The socket directory for UDS connections (Unix only).
196    /// This directory is automatically cleaned up on drop.
197    #[cfg(unix)]
198    socket_directory: Option<PathBuf>,
199    /// The pipe name for Named Pipe connections (Windows only).
200    #[cfg(windows)]
201    pipe_name: Option<String>,
202    /// The log directory where hyperd writes its log files.
203    log_dir: Option<PathBuf>,
204}
205
206impl HyperProcess {
207    /// Starts a new Hyper server instance.
208    ///
209    /// This method:
210    /// 1. Creates a callback listener on an ephemeral port
211    /// 2. Starts the hyperd process with the callback connection address
212    /// 3. Waits for Hyper to connect back and provide its listen endpoint
213    ///
214    /// # Arguments
215    ///
216    /// * `hyper_path` - Optional path to the hyperd executable. If `None`, searches
217    ///   in common locations (`HYPERD_PATH` env var, then known build output paths).
218    /// * `parameters` - Optional parameters for the server.
219    ///
220    /// # Errors
221    ///
222    /// Returns an error if:
223    /// - The hyperd executable cannot be found
224    /// - The callback listener cannot be created
225    /// - The server fails to start
226    /// - Hyper doesn't connect back within the timeout (60 seconds)
227    ///
228    /// # Example
229    ///
230    /// ```no_run
231    /// use hyperdb_api::{HyperProcess, Result};
232    /// use std::path::Path;
233    ///
234    /// fn main() -> Result<()> {
235    ///     // Auto-detect hyperd location
236    ///     let hyper = HyperProcess::new(None, None)?;
237    ///
238    ///     // Or specify explicit path
239    ///     let hyper2 = HyperProcess::new(
240    ///         Some(Path::new("/path/to/hyperd")),
241    ///         None,
242    ///     )?;
243    ///     Ok(())
244    /// }
245    /// ```
246    pub fn new(hyper_path: Option<&Path>, parameters: Option<&Parameters>) -> Result<Self> {
247        let hyperd_path = match hyper_path {
248            Some(path) => path.to_path_buf(),
249            None => Self::find_hyperd()?,
250        };
251        Self::start_server(&hyperd_path, parameters)
252    }
253
254    /// Resolves the hyperd executable from the `HYPERD_PATH` environment
255    /// variable. The value can point at the executable directly, or at a
256    /// directory containing it.
257    ///
258    /// If `HYPERD_PATH` is unset, returns an error instructing the caller
259    /// to either set it or run the `hyperd-bootstrap` downloader to
260    /// install a pinned release at `.hyperd/current/hyperd`.
261    fn find_hyperd() -> Result<PathBuf> {
262        #[cfg(windows)]
263        const HYPERD_EXE: &str = "hyperd.exe";
264        #[cfg(not(windows))]
265        const HYPERD_EXE: &str = "hyperd";
266
267        let Ok(path_str) = std::env::var("HYPERD_PATH") else {
268            // Walk up from CWD looking for .hyperd/current/<exe> written by
269            // `hyperd-bootstrap download`. This lets `node examples/foo.mjs`
270            // run from any subdirectory of the repo without exporting HYPERD_PATH.
271            if let Ok(cwd) = std::env::current_dir() {
272                let mut dir = cwd.as_path();
273                loop {
274                    let candidate = dir.join(".hyperd").join("current").join(HYPERD_EXE);
275                    if candidate.exists() {
276                        return Ok(candidate);
277                    }
278                    match dir.parent() {
279                        Some(parent) => dir = parent,
280                        None => break,
281                    }
282                }
283            }
284            return Err(Error::new(
285                "HYPERD_PATH is not set. Point it at a hyperd executable, \
286                or run `make download-hyperd` (or `cargo run -p hyperd-bootstrap -- download`) \
287                to install a pinned release at `.hyperd/current/hyperd`.",
288            ));
289        };
290
291        let path = PathBuf::from(&path_str);
292        if path.is_dir() {
293            let with_exe = path.join(HYPERD_EXE);
294            if with_exe.exists() {
295                return Ok(with_exe);
296            }
297            #[cfg(windows)]
298            {
299                let without_exe = path.join("hyperd");
300                if without_exe.exists() {
301                    return Ok(without_exe);
302                }
303            }
304            return Err(Error::new(format!(
305                "HYPERD_PATH set to '{path_str}' but {HYPERD_EXE} not found in that directory"
306            )));
307        }
308        if path.exists() {
309            return Ok(path);
310        }
311        #[cfg(windows)]
312        {
313            let with_ext = PathBuf::from(format!("{path_str}.exe"));
314            if with_ext.exists() {
315                return Ok(with_ext);
316            }
317        }
318        Err(Error::new(format!(
319            "HYPERD_PATH set to '{}' but hyperd executable not found (checked: {})",
320            path_str,
321            path.display()
322        )))
323    }
324
325    /// Starts the hyperd server process with callback connection.
326    fn start_server(hyperd_path: &Path, parameters: Option<&Parameters>) -> Result<Self> {
327        // Verify hyperd exists
328        if !hyperd_path.exists() {
329            return Err(Error::new(format!(
330                "Hyper executable not found at: {}",
331                hyperd_path.display()
332            )));
333        }
334
335        info!(
336            target: "hyperdb_api",
337            path = %hyperd_path.display(),
338            "hyperd-starting"
339        );
340
341        // Create callback listener on ephemeral port
342        // This is the "dead man's switch" - when this connection is closed, Hyper shuts down
343        let callback_listener = TcpListener::bind("127.0.0.1:0")
344            .map_err(|e| Error::new(format!("Failed to create callback listener: {e}")))?;
345
346        let callback_port = callback_listener
347            .local_addr()
348            .map_err(|e| Error::new(format!("Failed to get callback port: {e}")))?
349            .port();
350
351        // Set a timeout for accepting the callback connection
352        callback_listener
353            .set_nonblocking(false)
354            .map_err(|e| Error::new(format!("Failed to set callback listener to blocking: {e}")))?;
355
356        // Check if user wants to disable default parameters
357        let use_defaults = parameters.map_or(true, |p| !p.contains_key(NO_DEFAULT_PARAMETERS));
358
359        // Get the listen mode
360        let listen_mode = parameters.and_then(|p| p.listen_mode).unwrap_or_default();
361
362        // Get transport mode (default to TCP until UDS performance is validated)
363        // See IPC_IMPLEMENTATION.md for details
364        #[cfg(unix)]
365        let transport_mode = parameters
366            .and_then(|p| p.transport_mode)
367            .unwrap_or(TransportMode::Tcp);
368        #[cfg(windows)]
369        let transport_mode = parameters
370            .and_then(|p| p.transport_mode)
371            .unwrap_or(TransportMode::Tcp);
372        #[cfg(not(any(unix, windows)))]
373        let transport_mode = TransportMode::Tcp;
374
375        // Create socket directory for UDS if needed (Unix only)
376        #[cfg(unix)]
377        let socket_directory: Option<PathBuf> = if transport_mode == TransportMode::Ipc {
378            // Use custom directory if provided, otherwise create temp directory
379            let dir = if let Some(custom_dir) =
380                parameters.and_then(|p| p.domain_socket_directory.as_ref())
381            {
382                custom_dir.clone()
383            } else {
384                // Create a temp directory for the socket
385                let temp_dir = std::env::temp_dir().join(format!("hyper-{}", std::process::id()));
386                std::fs::create_dir_all(&temp_dir)
387                    .map_err(|e| Error::new(format!("Failed to create socket directory: {e}")))?;
388                temp_dir
389            };
390            Some(dir)
391        } else {
392            None
393        };
394
395        // On non-Unix platforms there is no UDS socket directory; the variable
396        // is only referenced inside `#[cfg(unix)]` blocks so we do not need a
397        // placeholder binding here.
398
399        // Create pipe name for Named Pipes if needed (Windows only)
400        #[cfg(windows)]
401        let pipe_name: Option<String> = if transport_mode == TransportMode::Ipc {
402            Some(format!("hyper-{}", std::process::id()))
403        } else {
404            None
405        };
406
407        // Build command arguments
408        let mut cmd = Command::new(hyperd_path);
409
410        // The "run" subcommand starts the server
411        cmd.arg("run");
412
413        // Callback connection - Hyper will connect to this and send its endpoint
414        // When this connection is closed, Hyper will shut down gracefully
415        cmd.arg("--callback-connection")
416            .arg(format!("tab.tcp://127.0.0.1:{callback_port}"));
417
418        // Configure listen connection based on mode and transport
419        // Connection string formats:
420        // - tab.tcp://host:port - libpq over TCP
421        // - tab.domain://<dir>/domain/<name> - libpq over Unix Domain Socket
422        // - tcp.grpc://host:port - gRPC
423        #[cfg(unix)]
424        let listen_connection = if transport_mode == TransportMode::Ipc {
425            let socket_dir = socket_directory.as_ref().unwrap();
426            match listen_mode {
427                ListenMode::LibPq => format!("tab.domain://{}/domain/hyper", socket_dir.display()),
428                ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
429                ListenMode::Both { grpc_port } => {
430                    format!(
431                        "tab.domain://{}/domain/hyper,tcp.grpc://127.0.0.1:{}",
432                        socket_dir.display(),
433                        grpc_port
434                    )
435                }
436            }
437        } else {
438            match listen_mode {
439                ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
440                ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
441                ListenMode::Both { grpc_port } => {
442                    format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{grpc_port}")
443                }
444            }
445        };
446
447        #[cfg(windows)]
448        let listen_connection = if transport_mode == TransportMode::Ipc {
449            let pname = pipe_name.as_ref().unwrap();
450            match listen_mode {
451                ListenMode::LibPq => format!("tab.pipe://./pipe/{pname}"),
452                ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
453                ListenMode::Both { grpc_port } => {
454                    format!("tab.pipe://./pipe/{pname},tcp.grpc://127.0.0.1:{grpc_port}")
455                }
456            }
457        } else {
458            match listen_mode {
459                ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
460                ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
461                ListenMode::Both { grpc_port } => {
462                    format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{grpc_port}")
463                }
464            }
465        };
466
467        #[cfg(not(any(unix, windows)))]
468        let listen_connection = match listen_mode {
469            ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
470            ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{}", port),
471            ListenMode::Both { grpc_port } => {
472                format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{}", grpc_port)
473            }
474        };
475
476        cmd.arg("--listen-connection").arg(&listen_connection);
477
478        // Helper to check if a parameter is already set by the user
479        let user_has_param =
480            |key: &str| -> bool { parameters.is_some_and(|p| p.contains_key(key)) };
481
482        // Apply default instance parameters (matching C++ HyperProcess behavior)
483        // These can be overridden by user parameters or disabled entirely with NO_DEFAULT_PARAMETERS
484        if use_defaults {
485            // Initial user for the Hyper instance
486            if !user_has_param("init_user") {
487                cmd.arg("--init-user=tableau_internal_user");
488            }
489
490            // Enable gRPC threads if gRPC mode is enabled
491            // Required for gRPC to function - without this, Hyper will fail to start with:
492            // "gRPC threads are required for running gRPC services"
493            // Using 4 threads as a reasonable default (can be overridden by user)
494            if matches!(
495                listen_mode,
496                ListenMode::Grpc { .. } | ListenMode::Both { .. }
497            ) && !user_has_param("grpc_threads")
498            {
499                cmd.arg("--grpc-threads=4");
500            }
501
502            // Enable gRPC result persistence if gRPC mode is enabled
503            // This is required for ADAPTIVE and ASYNC transfer modes
504            if matches!(
505                listen_mode,
506                ListenMode::Grpc { .. } | ListenMode::Both { .. }
507            ) && !user_has_param("grpc_persist_results")
508            {
509                cmd.arg("--grpc-persist-results=true");
510            }
511
512            // Default language setting
513            if !user_has_param("language") {
514                cmd.arg("--language=en_US");
515            }
516
517            // Log configuration: file-based JSON logging
518            if !user_has_param("log_config") {
519                cmd.arg(format!("--log-config={DEFAULT_LOG_CONFIG}"));
520            }
521
522            // Date style for date parsing (Month-Day-Year)
523            if !user_has_param("date_style") {
524                cmd.arg("--date-style=MDY");
525            }
526
527            // Enforce strict date_style (day/month/year ordering must match exactly)
528            if !user_has_param("date_style_lenient") {
529                cmd.arg("--date-style-lenient=false");
530            }
531
532            // Set default log directory to current directory
533            if !user_has_param("log_dir") {
534                if let Ok(cwd) = std::env::current_dir() {
535                    cmd.arg(format!("--log-dir={}", cwd.display()));
536                }
537            }
538
539            // Disable password requirement for local development
540            if !user_has_param("no_password") {
541                cmd.arg("--no-password");
542            }
543
544            // Skip license check for local development
545            if !user_has_param("skip_license") {
546                cmd.arg("--skip-license");
547            }
548
549            // Default new .hyper databases to file format version 3, which
550            // adds support for 128-bit NUMERICs (required to ingest parquet
551            // files whose decimal columns are stored as DECIMAL128).
552            // File format 3 has shipped since Hyper 2022.4.0.
553            if !user_has_param("default_database_version") {
554                cmd.arg("--default-database-version=3");
555            }
556        }
557
558        // Add custom parameters from user
559        if let Some(params) = parameters {
560            for (key, value) in params.iter() {
561                // Skip internal/special parameters
562                if key == "callback_connection"
563                    || key == "listen_connection"
564                    || key == NO_DEFAULT_PARAMETERS
565                {
566                    continue;
567                }
568
569                // Convert underscores to dashes for command-line arguments
570                let cli_key = key.replace('_', "-");
571
572                if value.is_empty() {
573                    cmd.arg(format!("--{cli_key}"));
574                } else {
575                    cmd.arg(format!("--{cli_key}={value}"));
576                }
577            }
578        }
579
580        // Resolve the effective log directory for later access via log_dir()
581        let resolved_log_dir =
582            if let Some(user_dir) = parameters.and_then(|p| p.get("log_dir")).map(PathBuf::from) {
583                Some(user_dir)
584            } else if use_defaults {
585                std::env::current_dir().ok()
586            } else {
587                None
588            };
589
590        // Redirect stdout/stderr to null - we get the endpoint via callback connection
591        cmd.stdout(Stdio::null());
592        cmd.stderr(Stdio::null());
593
594        // On Unix, start hyperd in its own process group so it doesn't receive
595        // Ctrl-C (SIGINT) signals meant for the parent process. This allows the
596        // parent to handle Ctrl-C gracefully and properly shut down hyperd via
597        // the callback connection mechanism.
598        #[cfg(unix)]
599        cmd.process_group(0);
600
601        // On Windows, prevent a console window from flashing when spawning hyperd.
602        // CREATE_NO_WINDOW (0x08000000) suppresses the creation of a visible console.
603        #[cfg(windows)]
604        {
605            use std::os::windows::process::CommandExt;
606            const CREATE_NO_WINDOW: u32 = 0x08000000;
607            cmd.creation_flags(CREATE_NO_WINDOW);
608        }
609
610        // Start the process
611        let child = cmd.spawn().map_err(|e| {
612            Error::new(format!(
613                "Failed to start Hyper server at {}: {}",
614                hyperd_path.display(),
615                e
616            ))
617        })?;
618
619        // Wait for Hyper to connect back to our callback listener
620        let (callback_connection, callback_endpoint) = Self::wait_for_callback(&callback_listener)?;
621
622        // Determine the endpoints based on listen mode and callback response
623        let (endpoint, grpc_endpoint) = match listen_mode {
624            ListenMode::LibPq => {
625                // Callback returns libpq endpoint
626                (Some(callback_endpoint), None)
627            }
628            ListenMode::Grpc { port } => {
629                // Callback returns gRPC endpoint (with resolved port if auto-assigned)
630                let grpc_ep = if port == 0 {
631                    // The callback returns the actual gRPC endpoint with resolved port
632                    callback_endpoint
633                } else {
634                    format!("127.0.0.1:{port}")
635                };
636                (None, Some(grpc_ep))
637            }
638            ListenMode::Both { grpc_port } => {
639                // Callback returns libpq endpoint, gRPC uses specified port
640                (
641                    Some(callback_endpoint),
642                    Some(format!("127.0.0.1:{grpc_port}")),
643                )
644            }
645        };
646
647        // Parse connection endpoint if we have a libpq endpoint
648        let connection_endpoint = endpoint.as_ref().map(|ep| {
649            #[cfg(unix)]
650            {
651                // Check if it's a UDS path (contains path separator but no colon with port)
652                if ep.starts_with('/') || socket_directory.is_some() {
653                    // UDS endpoint - construct from socket directory
654                    if let Some(ref dir) = socket_directory {
655                        return ConnectionEndpoint::domain_socket(dir, "hyper");
656                    }
657                    // Parse as path
658                    let path = std::path::Path::new(ep);
659                    let dir = path.parent().unwrap_or(std::path::Path::new("/"));
660                    let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("hyper");
661                    return ConnectionEndpoint::domain_socket(dir, name);
662                }
663            }
664            #[cfg(windows)]
665            {
666                // Check if it's a Named Pipe endpoint
667                if pipe_name.is_some() {
668                    if let Some(ref pname) = pipe_name {
669                        return ConnectionEndpoint::named_pipe(".", pname);
670                    }
671                }
672            }
673            // TCP endpoint (host:port format)
674            let parts: Vec<&str> = ep.split(':').collect();
675            if parts.len() == 2 {
676                if let Ok(port) = parts[1].parse::<u16>() {
677                    return ConnectionEndpoint::tcp(parts[0], port);
678                }
679            }
680            ConnectionEndpoint::tcp("localhost", 7483) // fallback
681        });
682
683        Ok(HyperProcess {
684            child: Some(child),
685            endpoint,
686            connection_endpoint,
687            grpc_endpoint,
688            hyperd_path: hyperd_path.to_path_buf(),
689            shutdown_initiated: Arc::new(AtomicBool::new(false)),
690            callback_connection: Some(callback_connection),
691            listen_mode,
692            transport_mode,
693            log_dir: resolved_log_dir,
694            #[cfg(unix)]
695            socket_directory,
696            #[cfg(windows)]
697            pipe_name,
698        })
699    }
700
701    /// Waits for Hyper to connect to our callback listener and send its endpoint.
702    ///
703    /// Protocol:
704    /// - Hyper connects to the callback listener
705    /// - Hyper sends: [1 byte length][N bytes connection descriptor string]
706    /// - Connection descriptor format: "tab.tcp://host:port"
707    fn wait_for_callback(listener: &TcpListener) -> Result<(TcpStream, String)> {
708        // Set a timeout for accepting connections
709        listener.set_nonblocking(true).ok();
710
711        let timeout = Duration::from_secs(60);
712        let start = std::time::Instant::now();
713
714        // Poll for incoming connection with timeout
715        let mut stream = loop {
716            if start.elapsed() > timeout {
717                return Err(Error::new(
718                    "Timeout waiting for Hyper to connect to callback listener. \
719                    Hyper may have failed to start - check hyperd logs for details.",
720                ));
721            }
722
723            match listener.accept() {
724                Ok((stream, _addr)) => break stream,
725                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
726                    thread::sleep(Duration::from_millis(50));
727                }
728                Err(e) => {
729                    return Err(Error::new(format!(
730                        "Failed to accept callback connection: {e}"
731                    )));
732                }
733            }
734        };
735
736        // Set stream back to blocking for reading
737        stream
738            .set_nonblocking(false)
739            .map_err(|e| Error::new(format!("Failed to set callback stream to blocking: {e}")))?;
740
741        // Set read timeout
742        stream.set_read_timeout(Some(Duration::from_secs(10))).ok();
743
744        // Read the endpoint descriptor from Hyper
745        // Protocol: [1 byte length][N bytes descriptor string]
746        let mut len_buf = [0u8; 1];
747        stream
748            .read_exact(&mut len_buf)
749            .map_err(|e| Error::new(format!("Failed to read endpoint length from Hyper: {e}")))?;
750
751        let len = len_buf[0] as usize;
752        if len == 0 {
753            return Err(Error::new("Hyper sent empty endpoint descriptor"));
754        }
755
756        let mut descriptor_buf = vec![0u8; len];
757        stream.read_exact(&mut descriptor_buf).map_err(|e| {
758            Error::new(format!(
759                "Failed to read endpoint descriptor from Hyper: {e}"
760            ))
761        })?;
762
763        let descriptor = String::from_utf8(descriptor_buf)
764            .map_err(|e| Error::new(format!("Invalid UTF-8 in endpoint descriptor: {e}")))?;
765
766        // Trim null bytes and whitespace that Hyper may include
767        let descriptor = descriptor.trim_matches(|c: char| c == '\0' || c.is_whitespace());
768
769        // Parse the connection descriptor (format: "tab.tcp://host:port")
770        let endpoint = Self::parse_connection_descriptor(descriptor)?;
771
772        // Clear read timeout for the connection we'll keep open
773        stream.set_read_timeout(None).ok();
774
775        info!(
776            target: "hyperdb_api",
777            %endpoint,
778            "hyperd-started"
779        );
780
781        Ok((stream, endpoint))
782    }
783
784    /// Parses a connection descriptor to extract host:port, socket path, or pipe path.
785    ///
786    /// Input formats:
787    /// - "tab.tcp://host:port" → "host:port"
788    /// - "tab.domain://<dir>/domain/<name>" → "<dir>/domain/<name>" (socket path)
789    /// - "tab.pipe://<host>/pipe/<name>" → "<host>/pipe/<name>" (named pipe)
790    /// - "tcp.grpc://host:port" → "host:port"
791    fn parse_connection_descriptor(descriptor: &str) -> Result<String> {
792        // Handle domain socket format
793        if let Some(rest) = descriptor.strip_prefix("tab.domain://") {
794            // Return the full path for UDS
795            if let Some(idx) = rest.find("/domain/") {
796                let dir = &rest[..idx];
797                let name = &rest[idx + 8..]; // "/domain/".len() == 8
798                let socket_path = format!("{dir}/domain/{name}");
799                return Ok(socket_path);
800            }
801            return Ok(rest.to_string());
802        }
803
804        // Handle named pipe format
805        if let Some(rest) = descriptor.strip_prefix("tab.pipe://") {
806            // Format: tab.pipe://<host>/pipe/<name>
807            // Return as pipe path: \\<host>\pipe\<name>
808            if let Some(idx) = rest.find("/pipe/") {
809                let host = &rest[..idx];
810                let name = &rest[idx + 6..]; // "/pipe/".len() == 6
811                let pipe_path = format!(r"\\{host}\pipe\{name}");
812                return Ok(pipe_path);
813            }
814            return Ok(rest.to_string());
815        }
816
817        // Handle TCP prefixes
818        let without_prefix = descriptor
819            .strip_prefix("tab.tcp://")
820            .or_else(|| descriptor.strip_prefix("tcp.grpc://"))
821            .or_else(|| descriptor.strip_prefix("tcp.grpctls://"))
822            .or_else(|| descriptor.strip_prefix("tcp://"))
823            .unwrap_or(descriptor);
824
825        // Validate it looks like host:port
826        if without_prefix.contains(':') && !without_prefix.is_empty() {
827            Ok(without_prefix.to_string())
828        } else {
829            Err(Error::new(format!(
830                "Invalid connection descriptor format: '{descriptor}'. Expected '<scheme>://host:port' or 'tab.domain://<dir>/domain/<name>'"
831            )))
832        }
833    }
834
    /// Returns the libpq endpoint for connecting to this instance.
    ///
    /// The endpoint is in the format "host:port" (e.g., "localhost:54321").
    ///
    /// Returns `None` if the process was started in gRPC-only mode.
    /// Use [`grpc_endpoint`](Self::grpc_endpoint) for gRPC connections, or
    /// [`require_endpoint`](Self::require_endpoint) to get an error instead of `None`.
    // NOTE(review): the "host:port" format presumably describes TCP transport only;
    // confirm what is stored here when the process uses a domain socket or named pipe.
    #[must_use]
    pub fn endpoint(&self) -> Option<&str> {
        // Borrow the stored endpoint as `&str` rather than cloning it.
        self.endpoint.as_deref()
    }
845
846    /// Returns the libpq endpoint, or an error if not available.
847    ///
848    /// This is a convenience method to avoid `unwrap()` calls when you need
849    /// the endpoint and want proper error handling.
850    ///
851    /// # Errors
852    ///
853    /// Returns an error if this process was started in gRPC-only mode.
854    ///
855    /// # Example
856    ///
857    /// ```no_run
858    /// use hyperdb_api::{HyperProcess, Result};
859    ///
860    /// fn main() -> Result<()> {
861    ///     let hyper = HyperProcess::new(None, None)?;
862    ///     let endpoint = hyper.require_endpoint()?; // No unwrap() needed!
863    ///     println!("Server running at: {}", endpoint);
864    ///     Ok(())
865    /// }
866    /// ```
867    pub fn require_endpoint(&self) -> crate::error::Result<&str> {
868        self.endpoint().ok_or_else(|| {
869            crate::error::Error::new(
870                "HyperProcess does not have a libpq endpoint (gRPC-only mode). \
871                 Use grpc_endpoint() instead or start with LibPq or Both listen mode.",
872            )
873        })
874    }
875
    /// Returns the gRPC endpoint for connecting to this instance.
    ///
    /// The endpoint is in the format "host:port" (e.g., "127.0.0.1:7484").
    /// Use [`grpc_url`](Self::grpc_url) for the same address prefixed with `http://`.
    ///
    /// Returns `None` if the process was started in libpq-only mode.
    #[must_use]
    pub fn grpc_endpoint(&self) -> Option<&str> {
        // Borrow the stored endpoint as `&str` rather than cloning it.
        self.grpc_endpoint.as_deref()
    }
885
886    /// Returns the gRPC endpoint, or an error if not available.
887    ///
888    /// This is a convenience method to avoid `unwrap()` calls when you need
889    /// the gRPC endpoint and want proper error handling.
890    ///
891    /// # Errors
892    ///
893    /// Returns an error if this process was started in libpq-only mode.
894    pub fn require_grpc_endpoint(&self) -> crate::error::Result<&str> {
895        self.grpc_endpoint().ok_or_else(|| {
896            crate::error::Error::new(
897                "HyperProcess does not have a gRPC endpoint (libpq-only mode). \
898                 Use endpoint() instead or start with Grpc or Both listen mode.",
899            )
900        })
901    }
902
903    /// Returns the gRPC endpoint as a full URL suitable for gRPC clients.
904    ///
905    /// Returns the endpoint prefixed with "http://" (e.g., "<http://127.0.0.1:7484>").
906    /// Returns `None` if the process was started in libpq-only mode.
907    #[must_use]
908    pub fn grpc_url(&self) -> Option<String> {
909        self.grpc_endpoint.as_ref().map(|ep| format!("http://{ep}"))
910    }
911
912    /// Returns the gRPC URL, or an error if not available.
913    ///
914    /// This is a convenience method to avoid `unwrap()` calls when you need
915    /// the gRPC URL and want proper error handling.
916    ///
917    /// # Errors
918    ///
919    /// Returns an error if this process was started in libpq-only mode.
920    pub fn require_grpc_url(&self) -> crate::error::Result<String> {
921        Ok(format!("http://{}", self.require_grpc_endpoint()?))
922    }
923
    /// Returns the listen mode this process was started with.
    ///
    /// The value is returned as a copy; see [`ListenMode`] for the available modes.
    #[must_use]
    pub fn listen_mode(&self) -> ListenMode {
        self.listen_mode
    }
929
    /// Returns the transport mode this process was started with.
    ///
    /// The value is returned as a copy; see [`TransportMode`] for the available modes.
    #[must_use]
    pub fn transport_mode(&self) -> TransportMode {
        self.transport_mode
    }
935
    /// Returns the connection endpoint for this process.
    ///
    /// This returns a [`ConnectionEndpoint`] that can be used to connect
    /// to this Hyper instance via TCP, Unix Domain Socket, or Named Pipe.
    ///
    /// Returns `None` if no connection endpoint is stored for this process.
    #[must_use]
    pub fn connection_endpoint(&self) -> Option<&ConnectionEndpoint> {
        // Hand out a borrow; the endpoint stays owned by the process handle.
        self.connection_endpoint.as_ref()
    }
944
    /// Returns the log directory where hyperd writes its log files.
    ///
    /// The log file is typically `hyperd.log` within this directory.
    /// Returns `None` if the log directory could not be determined (e.g.,
    /// default parameters were disabled and no `log_dir` was specified).
    ///
    /// This is useful for setting up [`QueryStatsProvider`](crate::QueryStatsProvider)
    /// implementations that parse the Hyper log file.
    #[must_use]
    pub fn log_dir(&self) -> Option<&Path> {
        // Borrow the stored path as `&Path` rather than cloning it.
        self.log_dir.as_deref()
    }
957
    /// Returns the socket directory used for UDS connections (Unix only).
    ///
    /// Returns `None` if the process is using TCP or if no socket directory was created.
    ///
    /// Note that dropping the `HyperProcess` removes this directory when its
    /// final path component starts with `hyper-`.
    #[cfg(unix)]
    #[must_use]
    pub fn socket_directory(&self) -> Option<&Path> {
        // Borrow the stored path as `&Path` rather than cloning it.
        self.socket_directory.as_deref()
    }
966
967    /// Returns the pipe name used for Named Pipe connections (Windows only).
968    ///
969    /// Returns `None` if the process is using TCP.
970    #[cfg(windows)]
971    pub fn pipe_name(&self) -> Option<&str> {
972        self.pipe_name.as_deref()
973    }
974
975    /// Returns the process ID of the Hyper server.
976    #[must_use]
977    pub fn pid(&self) -> Option<u32> {
978        self.child.as_ref().map(std::process::Child::id)
979    }
980
981    /// Returns whether the Hyper server process is still running.
982    #[must_use]
983    pub fn is_running(&self) -> bool {
984        if let Some(ref child) = self.child {
985            // Try to check if process is alive without waiting
986            #[cfg(unix)]
987            {
988                match Command::new("kill")
989                    .args(["-0", &child.id().to_string()])
990                    .output()
991                {
992                    Ok(output) => output.status.success(),
993                    Err(_) => false,
994                }
995            }
996            #[cfg(not(unix))]
997            {
998                // On Windows, we can't easily check without waiting
999                // Assume it's running if we have a handle
1000                let _ = child; // Silence unused variable warning
1001                true
1002            }
1003        } else {
1004            false
1005        }
1006    }
1007
    /// Shuts down the Hyper server gracefully with a timeout.
    ///
    /// This closes the callback connection, which signals Hyper to shut down gracefully.
    /// If Hyper doesn't exit within the timeout, it will be forcefully terminated.
    ///
    /// This method consumes the `HyperProcess`.
    ///
    /// # Arguments
    ///
    /// * `timeout` - Maximum time to wait for graceful shutdown before force-killing.
    ///
    /// # Errors
    ///
    /// Returns an error if waiting for the hyperd process to exit fails.
    pub fn shutdown_timeout(mut self, timeout: Duration) -> Result<()> {
        // Record that shutdown is being handled explicitly so that `Drop`, which
        // runs when `self` goes out of scope after this call, does not attempt a
        // second shutdown.
        self.shutdown_initiated.store(true, Ordering::SeqCst);
        self.do_shutdown(Some(timeout))
    }
1024
    /// Shuts down the Hyper server gracefully, waiting indefinitely.
    ///
    /// This closes the callback connection and waits for Hyper to exit.
    /// No force-kill is attempted, so this call blocks until the process exits.
    /// Use [`shutdown_timeout`](Self::shutdown_timeout) if you need a timeout.
    ///
    /// This method consumes the `HyperProcess`.
    ///
    /// # Errors
    ///
    /// Returns an error if waiting for the hyperd process to exit fails.
    pub fn shutdown_graceful(mut self) -> Result<()> {
        // Record that shutdown is being handled explicitly so that `Drop`, which
        // runs when `self` goes out of scope after this call, does not attempt a
        // second shutdown.
        self.shutdown_initiated.store(true, Ordering::SeqCst);
        self.do_shutdown(None)
    }
1037
1038    /// Closes the callback connection to signal Hyper to shut down.
1039    ///
1040    /// This is the graceful shutdown mechanism - Hyper monitors the callback connection
1041    /// and will initiate shutdown when it's closed.
1042    fn close_callback_connection(&mut self) {
1043        if let Some(conn) = self.callback_connection.take() {
1044            // Gracefully shutdown both directions
1045            let _ = conn.shutdown(Shutdown::Both);
1046            // Connection is dropped here, closing the socket
1047        }
1048    }
1049
1050    /// Internal shutdown implementation.
1051    fn do_shutdown(&mut self, timeout: Option<Duration>) -> Result<()> {
1052        info!(target: "hyperdb_api", "hyperd-shutdown");
1053
1054        // Step 1: Close the callback connection to signal graceful shutdown
1055        // Hyper will detect this and begin shutting down
1056        self.close_callback_connection();
1057
1058        if let Some(mut child) = self.child.take() {
1059            // Step 2: Wait for the process to exit
1060            let wait_result = if let Some(timeout) = timeout {
1061                // Wait with timeout
1062                let start = std::time::Instant::now();
1063                loop {
1064                    match child.try_wait() {
1065                        Ok(Some(status)) => break Ok(status),
1066                        Ok(None) => {
1067                            if start.elapsed() > timeout {
1068                                // Step 3: Force kill ONLY after timeout
1069                                // This should rarely happen if Hyper is healthy
1070                                #[cfg(unix)]
1071                                {
1072                                    // Try SIGTERM first
1073                                    let _ = Command::new("kill")
1074                                        .args(["-TERM", &child.id().to_string()])
1075                                        .output();
1076                                    thread::sleep(Duration::from_millis(100));
1077                                }
1078                                // Then force kill
1079                                let _ = child.kill();
1080                                break child.wait().map_err(|e| {
1081                                    Error::new(format!("Failed to wait for hyperd: {e}"))
1082                                });
1083                            }
1084                            thread::sleep(Duration::from_millis(100));
1085                        }
1086                        Err(e) => break Err(Error::new(format!("Failed to wait for hyperd: {e}"))),
1087                    }
1088                }
1089            } else {
1090                // Wait indefinitely
1091                child
1092                    .wait()
1093                    .map_err(|e| Error::new(format!("Failed to wait for hyperd: {e}")))
1094            };
1095
1096            wait_result?;
1097        }
1098
1099        Ok(())
1100    }
1101}
1102
1103impl Drop for HyperProcess {
1104    fn drop(&mut self) {
1105        if !self.shutdown_initiated.load(Ordering::SeqCst) {
1106            // Try to gracefully shutdown with a short timeout
1107            let _ = self.do_shutdown(Some(Duration::from_secs(5)));
1108        }
1109
1110        // Clean up socket directory if we created one
1111        #[cfg(unix)]
1112        if let Some(ref dir) = self.socket_directory {
1113            // Only clean up if it's a temp directory we created (contains our PID)
1114            let dir_name = dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
1115            if dir_name.starts_with("hyper-") {
1116                let _ = std::fs::remove_dir_all(dir);
1117            }
1118        }
1119    }
1120}
1121
// SAFETY: `HyperProcess` owns its `std::process::Child` handle and (optionally)
// a TCP connection. Both are themselves `Send`, and no field holds thread-local
// state or a non-`Send` raw pointer. Ownership of a `HyperProcess` therefore
// transfers cleanly across thread boundaries.
//
// NOTE(review): `Send` is an auto trait, so if every field is already `Send` the
// compiler provides this impl on its own and the `unsafe impl` is redundant; if
// some field is not `Send`, the justification above should name that field and
// say why sending it is sound. The struct definition is not visible from this
// part of the file - confirm which case applies.
unsafe impl Send for HyperProcess {}
1127
/// Special parameter key that disables default instance parameters when present.
///
/// By default, [`HyperProcess`] starts hyperd with a set of sensible default parameters
/// (matching the C++ `HyperProcess` behavior). If you need full control over all parameters,
/// include this key in your [`Parameters`] to disable all defaults, for example with
/// `params.set("no_default_parameters", "")`.
// NOTE(review): the documented usage passes an empty value; whether a non-empty
// value is treated differently is decided by the code that reads this key.
pub(crate) const NO_DEFAULT_PARAMETERS: &str = "no_default_parameters";
1134
/// Default log configuration for hyperd: file-based JSON logging.
///
/// This is the value listed as the `log_config` default in the [`Parameters`]
/// documentation.
// NOTE(review): the meaning of the remaining comma-separated fields
// ("all", "hyperd", "0") is not documented in this file - confirm against the
// hyperd documentation before changing them.
const DEFAULT_LOG_CONFIG: &str = "file,json,all,hyperd,0";
1137
// NOTE(review): the type-level text below says TCP is always used on Windows,
// while the `Ipc` variant mentions Named Pipes on Windows - confirm which of the
// two statements is current and align them.
/// Transport mode for `HyperProcess` connections.
///
/// Controls whether the server uses TCP or Unix Domain Sockets (IPC) for connections.
/// On Unix systems, IPC is the default for better local performance.
/// On Windows, TCP is always used.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TransportMode {
    /// Use IPC (Unix Domain Sockets on Unix, Named Pipes on Windows).
    /// This is the default mode and provides better performance for local connections.
    #[default]
    Ipc,

    /// Use TCP/IP connections.
    /// Required when connecting from remote clients or when IPC is not available.
    Tcp,
}

/// Parameters for configuring the Hyper server.
///
/// When starting a [`HyperProcess`], a set of default parameters is automatically applied
/// (matching the C++ `HyperProcess` behavior):
///
/// | Parameter | Default Value | Description |
/// |-----------|---------------|-------------|
/// | `init_user` | `tableau_internal_user` | Initial user for the Hyper instance |
/// | `language` | `en_US` | Default language setting |
/// | `log_config` | `file,json,all,hyperd,0` | Log configuration |
/// | `date_style` | `MDY` | Date format (Month-Day-Year) |
/// | `date_style_lenient` | `false` | Strict date parsing |
/// | `log_dir` | Current directory | Log file directory |
/// | `no_password` | (flag) | Disable password requirement |
/// | `skip_license` | (flag) | Skip license check |
/// | `default_database_version` | `3` | File format version for newly created `.hyper` databases (v3 adds 128-bit NUMERIC support, required for DECIMAL128 parquet columns) |
///
/// You can override these defaults, or disable them entirely by adding the
/// `no_default_parameters` key (for example `params.set("no_default_parameters", "")`
/// via [`Parameters::set`]).
///
/// # Listen Modes
///
/// Use [`set_listen_mode`](Parameters::set_listen_mode) to configure which protocols Hyper listens on:
///
/// ```
/// use hyperdb_api::{ListenMode, Parameters};
///
/// // gRPC only (for Arrow-based queries)
/// let mut params = Parameters::new();
/// params.set_listen_mode(ListenMode::Grpc { port: 0 });
///
/// // Both libpq and gRPC
/// let mut params = Parameters::new();
/// params.set_listen_mode(ListenMode::Both { grpc_port: 7484 });
/// ```
///
/// # Transport Modes
///
/// Use [`set_transport_mode`](Self::set_transport_mode) to control whether Hyper uses
/// TCP or IPC (Unix Domain Sockets on Unix systems):
///
/// ```
/// use hyperdb_api::{Parameters, TransportMode};
///
/// let mut params = Parameters::new();
/// params.set_transport_mode(TransportMode::Tcp); // Force TCP instead of IPC
/// ```
///
/// # Example
///
/// ```
/// use hyperdb_api::Parameters;
///
/// let mut params = Parameters::new();
/// params.set("log_file_size_limit", "100k");
/// params.set("log_file_max_count", "7");
/// ```
#[derive(Debug, Clone, Default)]
pub struct Parameters {
    /// Key/value pairs in insertion order; [`Parameters::set`] keeps each key unique.
    values: Vec<(String, String)>,
    /// The listen mode for the Hyper server.
    pub(crate) listen_mode: Option<ListenMode>,
    /// The transport mode (TCP or IPC/UDS).
    pub(crate) transport_mode: Option<TransportMode>,
    /// Custom domain socket directory (Unix only).
    #[cfg(unix)]
    pub(crate) domain_socket_directory: Option<PathBuf>,
}
1245
1246impl Parameters {
1247    /// Creates a new empty Parameters instance.
1248    #[must_use]
1249    pub fn new() -> Self {
1250        Parameters {
1251            values: Vec::new(),
1252            listen_mode: None,
1253            transport_mode: None,
1254            #[cfg(unix)]
1255            domain_socket_directory: None,
1256        }
1257    }
1258
1259    /// Sets the transport mode (TCP or IPC/UDS).
1260    ///
1261    /// By default, `HyperProcess` uses IPC (Unix Domain Sockets on Unix) for better
1262    /// performance. Use `TransportMode::Tcp` if you need TCP connections.
1263    ///
1264    /// # Example
1265    ///
1266    /// ```
1267    /// use hyperdb_api::{Parameters, TransportMode};
1268    ///
1269    /// let mut params = Parameters::new();
1270    /// params.set_transport_mode(TransportMode::Tcp); // Use TCP instead of IPC
1271    /// ```
1272    pub fn set_transport_mode(&mut self, mode: TransportMode) -> &mut Self {
1273        self.transport_mode = Some(mode);
1274        self
1275    }
1276
1277    /// Returns the configured transport mode.
1278    #[must_use]
1279    pub fn transport_mode(&self) -> Option<TransportMode> {
1280        self.transport_mode
1281    }
1282
1283    /// Sets a custom domain socket directory (Unix only).
1284    ///
1285    /// By default, `HyperProcess` creates sockets in a temporary directory.
1286    /// Use this to specify a custom location.
1287    #[cfg(unix)]
1288    pub fn set_domain_socket_directory(&mut self, dir: impl Into<PathBuf>) -> &mut Self {
1289        self.domain_socket_directory = Some(dir.into());
1290        self
1291    }
1292
1293    /// Returns the configured domain socket directory (Unix only).
1294    #[cfg(unix)]
1295    #[must_use]
1296    pub fn domain_socket_directory(&self) -> Option<&Path> {
1297        self.domain_socket_directory.as_deref()
1298    }
1299
1300    /// Sets a parameter value.
1301    ///
1302    /// # Arguments
1303    ///
1304    /// * `key` - The parameter name.
1305    /// * `value` - The parameter value (empty string for flags).
1306    pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
1307        let key = key.into();
1308        let value = value.into();
1309
1310        // Update existing or add new
1311        if let Some(entry) = self.values.iter_mut().find(|(k, _)| k == &key) {
1312            entry.1 = value;
1313        } else {
1314            self.values.push((key, value));
1315        }
1316
1317        self
1318    }
1319
1320    /// Sets the listen mode for the Hyper server.
1321    ///
1322    /// This controls which protocols the server listens on:
1323    /// - [`ListenMode::LibPq`]: `PostgreSQL` wire protocol only (default)
1324    /// - [`ListenMode::Grpc`]: gRPC protocol only (query-only, Arrow results)
1325    /// - [`ListenMode::Both`]: Both protocols enabled
1326    ///
1327    /// # Example
1328    ///
1329    /// ```
1330    /// use hyperdb_api::{ListenMode, Parameters};
1331    ///
1332    /// let mut params = Parameters::new();
1333    /// params.set_listen_mode(ListenMode::Grpc { port: 0 }); // Auto-assign port
1334    /// ```
1335    pub fn set_listen_mode(&mut self, mode: ListenMode) -> &mut Self {
1336        self.listen_mode = Some(mode);
1337        self
1338    }
1339
1340    /// Returns the configured listen mode, if any.
1341    #[must_use]
1342    pub fn listen_mode(&self) -> Option<ListenMode> {
1343        self.listen_mode
1344    }
1345
1346    /// Gets a parameter value.
1347    #[must_use]
1348    pub fn get(&self, key: &str) -> Option<&str> {
1349        self.values
1350            .iter()
1351            .find(|(k, _)| k == key)
1352            .map(|(_, v)| v.as_str())
1353    }
1354
1355    /// Returns whether the parameters contain the given key.
1356    #[must_use]
1357    pub fn contains_key(&self, key: &str) -> bool {
1358        self.values.iter().any(|(k, _)| k == key)
1359    }
1360
1361    /// Returns an iterator over the parameters.
1362    pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
1363        self.values.iter().map(|(k, v)| (k.as_str(), v.as_str()))
1364    }
1365
1366    /// Returns whether the parameters are empty.
1367    #[must_use]
1368    pub fn is_empty(&self) -> bool {
1369        self.values.is_empty()
1370    }
1371
1372    /// Returns the number of parameters.
1373    #[must_use]
1374    pub fn len(&self) -> usize {
1375        self.values.len()
1376    }
1377}
1378
#[cfg(test)]
mod tests {
    use super::*;

    /// Distinct keys are stored side by side and can be read back.
    #[test]
    fn test_parameters() {
        let mut params = Parameters::new();
        params.set("key1", "value1").set("key2", "value2");

        let lookups = [
            ("key1", Some("value1")),
            ("key2", Some("value2")),
            ("key3", None),
        ];
        for &(key, expected) in &lookups {
            assert_eq!(params.get(key), expected);
        }
        assert_eq!(params.len(), 2);
    }

    /// Setting an existing key replaces its value instead of adding an entry.
    #[test]
    fn test_parameters_update() {
        let mut params = Parameters::new();
        for value in ["value1", "value2"].iter().copied() {
            params.set("key", value);
        }

        assert_eq!(params.get("key"), Some("value2"));
        assert_eq!(params.len(), 1);
    }

    /// TCP descriptors lose their scheme; bare "host:port" passes through.
    #[test]
    fn test_parse_connection_descriptor() {
        let cases = [
            ("tab.tcp://localhost:12345", "localhost:12345"),
            ("tab.tcp://127.0.0.1:7483", "127.0.0.1:7483"),
            ("tcp://localhost:8080", "localhost:8080"),
            // Already in host:port format
            ("localhost:9999", "localhost:9999"),
        ];

        for &(descriptor, expected) in &cases {
            assert_eq!(
                HyperProcess::parse_connection_descriptor(descriptor).unwrap(),
                expected
            );
        }
    }

    /// Named pipe descriptors are rewritten to `\\host\pipe\name` paths.
    #[test]
    fn test_parse_connection_descriptor_named_pipe() {
        let cases = [
            ("tab.pipe://./pipe/hyper-12345", r"\\.\pipe\hyper-12345"),
            ("tab.pipe://server1/pipe/mydb", r"\\server1\pipe\mydb"),
        ];

        for &(descriptor, expected) in &cases {
            assert_eq!(
                HyperProcess::parse_connection_descriptor(descriptor).unwrap(),
                expected
            );
        }
    }

    /// Descriptors without a host:port shape are rejected.
    #[test]
    fn test_parse_connection_descriptor_invalid() {
        for descriptor in ["", "invalid"].iter().copied() {
            assert!(HyperProcess::parse_connection_descriptor(descriptor).is_err());
        }
    }

    /// `contains_key` reports only keys that were actually set.
    #[test]
    fn test_parameters_contains_key() {
        let mut params = Parameters::new();
        params.set("key1", "value1");

        let has_key1 = params.contains_key("key1");
        let has_key2 = params.contains_key("key2");
        assert!(has_key1);
        assert!(!has_key2);
    }

    /// The key string must match what the C++ implementation uses.
    #[test]
    fn test_no_default_parameters_constant() {
        assert_eq!(NO_DEFAULT_PARAMETERS, "no_default_parameters");
    }

    /// The opt-out key can be combined with explicitly chosen parameters.
    #[test]
    fn test_parameters_with_no_defaults() {
        let mut params = Parameters::new();
        params
            .set(NO_DEFAULT_PARAMETERS, "")
            .set("init_user", "custom_user");

        assert!(params.contains_key(NO_DEFAULT_PARAMETERS));
        assert_eq!(params.get("init_user"), Some("custom_user"));
    }
}