use std::io::Read;
use std::net::{Shutdown, TcpListener, TcpStream};
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
#[cfg(unix)]
use std::os::unix::process::CommandExt;
#[cfg(any(unix, windows))]
use hyperdb_api_core::client::ConnectionEndpoint;
use tracing::info;
use crate::error::{Error, Result};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ListenMode {
#[default]
LibPq,
Grpc {
port: u16,
},
Both {
grpc_port: u16,
},
}
#[must_use = "HyperProcess will shut down when dropped; store it to keep the server running"]
#[derive(Debug)]
pub struct HyperProcess {
child: Option<Child>,
endpoint: Option<String>,
connection_endpoint: Option<ConnectionEndpoint>,
grpc_endpoint: Option<String>,
#[expect(
dead_code,
reason = "retained for diagnostics and future restart/respawn paths"
)]
hyperd_path: PathBuf,
shutdown_initiated: Arc<AtomicBool>,
callback_connection: Option<TcpStream>,
listen_mode: ListenMode,
transport_mode: TransportMode,
#[cfg(unix)]
socket_directory: Option<PathBuf>,
#[cfg(windows)]
pipe_name: Option<String>,
log_dir: Option<PathBuf>,
}
impl HyperProcess {
pub fn new(hyper_path: Option<&Path>, parameters: Option<&Parameters>) -> Result<Self> {
let hyperd_path = match hyper_path {
Some(path) => path.to_path_buf(),
None => Self::find_hyperd()?,
};
Self::start_server(&hyperd_path, parameters)
}
fn find_hyperd() -> Result<PathBuf> {
#[cfg(windows)]
const HYPERD_EXE: &str = "hyperd.exe";
#[cfg(not(windows))]
const HYPERD_EXE: &str = "hyperd";
let Ok(path_str) = std::env::var("HYPERD_PATH") else {
if let Ok(cwd) = std::env::current_dir() {
let mut dir = cwd.as_path();
loop {
let candidate = dir.join(".hyperd").join("current").join(HYPERD_EXE);
if candidate.exists() {
return Ok(candidate);
}
match dir.parent() {
Some(parent) => dir = parent,
None => break,
}
}
}
return Err(Error::new(
"HYPERD_PATH is not set. Point it at a hyperd executable, \
or run `make download-hyperd` (or `cargo run -p hyperd-bootstrap -- download`) \
to install a pinned release at `.hyperd/current/hyperd`.",
));
};
let path = PathBuf::from(&path_str);
if path.is_dir() {
let with_exe = path.join(HYPERD_EXE);
if with_exe.exists() {
return Ok(with_exe);
}
#[cfg(windows)]
{
let without_exe = path.join("hyperd");
if without_exe.exists() {
return Ok(without_exe);
}
}
return Err(Error::new(format!(
"HYPERD_PATH set to '{path_str}' but {HYPERD_EXE} not found in that directory"
)));
}
if path.exists() {
return Ok(path);
}
#[cfg(windows)]
{
let with_ext = PathBuf::from(format!("{path_str}.exe"));
if with_ext.exists() {
return Ok(with_ext);
}
}
Err(Error::new(format!(
"HYPERD_PATH set to '{}' but hyperd executable not found (checked: {})",
path_str,
path.display()
)))
}
fn start_server(hyperd_path: &Path, parameters: Option<&Parameters>) -> Result<Self> {
if !hyperd_path.exists() {
return Err(Error::new(format!(
"Hyper executable not found at: {}",
hyperd_path.display()
)));
}
info!(
target: "hyperdb_api",
path = %hyperd_path.display(),
"hyperd-starting"
);
let callback_listener = TcpListener::bind("127.0.0.1:0")
.map_err(|e| Error::new(format!("Failed to create callback listener: {e}")))?;
let callback_port = callback_listener
.local_addr()
.map_err(|e| Error::new(format!("Failed to get callback port: {e}")))?
.port();
callback_listener
.set_nonblocking(false)
.map_err(|e| Error::new(format!("Failed to set callback listener to blocking: {e}")))?;
let use_defaults = parameters.map_or(true, |p| !p.contains_key(NO_DEFAULT_PARAMETERS));
let listen_mode = parameters.and_then(|p| p.listen_mode).unwrap_or_default();
#[cfg(unix)]
let transport_mode = parameters
.and_then(|p| p.transport_mode)
.unwrap_or(TransportMode::Tcp);
#[cfg(windows)]
let transport_mode = parameters
.and_then(|p| p.transport_mode)
.unwrap_or(TransportMode::Tcp);
#[cfg(not(any(unix, windows)))]
let transport_mode = TransportMode::Tcp;
#[cfg(unix)]
let socket_directory: Option<PathBuf> = if transport_mode == TransportMode::Ipc {
let dir = if let Some(custom_dir) =
parameters.and_then(|p| p.domain_socket_directory.as_ref())
{
custom_dir.clone()
} else {
let temp_dir = std::env::temp_dir().join(format!("hyper-{}", std::process::id()));
std::fs::create_dir_all(&temp_dir)
.map_err(|e| Error::new(format!("Failed to create socket directory: {e}")))?;
temp_dir
};
Some(dir)
} else {
None
};
#[cfg(windows)]
let pipe_name: Option<String> = if transport_mode == TransportMode::Ipc {
Some(format!("hyper-{}", std::process::id()))
} else {
None
};
let mut cmd = Command::new(hyperd_path);
cmd.arg("run");
cmd.arg("--callback-connection")
.arg(format!("tab.tcp://127.0.0.1:{callback_port}"));
#[cfg(unix)]
let listen_connection = if transport_mode == TransportMode::Ipc {
let socket_dir = socket_directory.as_ref().unwrap();
match listen_mode {
ListenMode::LibPq => format!("tab.domain://{}/domain/hyper", socket_dir.display()),
ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
ListenMode::Both { grpc_port } => {
format!(
"tab.domain://{}/domain/hyper,tcp.grpc://127.0.0.1:{}",
socket_dir.display(),
grpc_port
)
}
}
} else {
match listen_mode {
ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
ListenMode::Both { grpc_port } => {
format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{grpc_port}")
}
}
};
#[cfg(windows)]
let listen_connection = if transport_mode == TransportMode::Ipc {
let pname = pipe_name.as_ref().unwrap();
match listen_mode {
ListenMode::LibPq => format!("tab.pipe://./pipe/{pname}"),
ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
ListenMode::Both { grpc_port } => {
format!("tab.pipe://./pipe/{pname},tcp.grpc://127.0.0.1:{grpc_port}")
}
}
} else {
match listen_mode {
ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{port}"),
ListenMode::Both { grpc_port } => {
format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{grpc_port}")
}
}
};
#[cfg(not(any(unix, windows)))]
let listen_connection = match listen_mode {
ListenMode::LibPq => "tab.tcp://localhost:0".to_string(),
ListenMode::Grpc { port } => format!("tcp.grpc://127.0.0.1:{}", port),
ListenMode::Both { grpc_port } => {
format!("tab.tcp://localhost:0,tcp.grpc://127.0.0.1:{}", grpc_port)
}
};
cmd.arg("--listen-connection").arg(&listen_connection);
let user_has_param =
|key: &str| -> bool { parameters.is_some_and(|p| p.contains_key(key)) };
if use_defaults {
if !user_has_param("init_user") {
cmd.arg("--init-user=tableau_internal_user");
}
if matches!(
listen_mode,
ListenMode::Grpc { .. } | ListenMode::Both { .. }
) && !user_has_param("grpc_threads")
{
cmd.arg("--grpc-threads=4");
}
if matches!(
listen_mode,
ListenMode::Grpc { .. } | ListenMode::Both { .. }
) && !user_has_param("grpc_persist_results")
{
cmd.arg("--grpc-persist-results=true");
}
if !user_has_param("language") {
cmd.arg("--language=en_US");
}
if !user_has_param("log_config") {
cmd.arg(format!("--log-config={DEFAULT_LOG_CONFIG}"));
}
if !user_has_param("date_style") {
cmd.arg("--date-style=MDY");
}
if !user_has_param("date_style_lenient") {
cmd.arg("--date-style-lenient=false");
}
if !user_has_param("log_dir") {
if let Ok(cwd) = std::env::current_dir() {
cmd.arg(format!("--log-dir={}", cwd.display()));
}
}
if !user_has_param("no_password") {
cmd.arg("--no-password");
}
if !user_has_param("skip_license") {
cmd.arg("--skip-license");
}
if !user_has_param("default_database_version") {
cmd.arg("--default-database-version=3");
}
}
if let Some(params) = parameters {
for (key, value) in params.iter() {
if key == "callback_connection"
|| key == "listen_connection"
|| key == NO_DEFAULT_PARAMETERS
{
continue;
}
let cli_key = key.replace('_', "-");
if value.is_empty() {
cmd.arg(format!("--{cli_key}"));
} else {
cmd.arg(format!("--{cli_key}={value}"));
}
}
}
let resolved_log_dir =
if let Some(user_dir) = parameters.and_then(|p| p.get("log_dir")).map(PathBuf::from) {
Some(user_dir)
} else if use_defaults {
std::env::current_dir().ok()
} else {
None
};
cmd.stdout(Stdio::null());
cmd.stderr(Stdio::null());
#[cfg(unix)]
cmd.process_group(0);
#[cfg(windows)]
{
use std::os::windows::process::CommandExt;
const CREATE_NO_WINDOW: u32 = 0x08000000;
cmd.creation_flags(CREATE_NO_WINDOW);
}
let child = cmd.spawn().map_err(|e| {
Error::new(format!(
"Failed to start Hyper server at {}: {}",
hyperd_path.display(),
e
))
})?;
let (callback_connection, callback_endpoint) = Self::wait_for_callback(&callback_listener)?;
let (endpoint, grpc_endpoint) = match listen_mode {
ListenMode::LibPq => {
(Some(callback_endpoint), None)
}
ListenMode::Grpc { port } => {
let grpc_ep = if port == 0 {
callback_endpoint
} else {
format!("127.0.0.1:{port}")
};
(None, Some(grpc_ep))
}
ListenMode::Both { grpc_port } => {
(
Some(callback_endpoint),
Some(format!("127.0.0.1:{grpc_port}")),
)
}
};
let connection_endpoint = endpoint.as_ref().map(|ep| {
#[cfg(unix)]
{
if ep.starts_with('/') || socket_directory.is_some() {
if let Some(ref dir) = socket_directory {
return ConnectionEndpoint::domain_socket(dir, "hyper");
}
let path = std::path::Path::new(ep);
let dir = path.parent().unwrap_or(std::path::Path::new("/"));
let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("hyper");
return ConnectionEndpoint::domain_socket(dir, name);
}
}
#[cfg(windows)]
{
if pipe_name.is_some() {
if let Some(ref pname) = pipe_name {
return ConnectionEndpoint::named_pipe(".", pname);
}
}
}
let parts: Vec<&str> = ep.split(':').collect();
if parts.len() == 2 {
if let Ok(port) = parts[1].parse::<u16>() {
return ConnectionEndpoint::tcp(parts[0], port);
}
}
ConnectionEndpoint::tcp("localhost", 7483) });
Ok(HyperProcess {
child: Some(child),
endpoint,
connection_endpoint,
grpc_endpoint,
hyperd_path: hyperd_path.to_path_buf(),
shutdown_initiated: Arc::new(AtomicBool::new(false)),
callback_connection: Some(callback_connection),
listen_mode,
transport_mode,
log_dir: resolved_log_dir,
#[cfg(unix)]
socket_directory,
#[cfg(windows)]
pipe_name,
})
}
fn wait_for_callback(listener: &TcpListener) -> Result<(TcpStream, String)> {
listener.set_nonblocking(true).ok();
let timeout = Duration::from_secs(60);
let start = std::time::Instant::now();
let mut stream = loop {
if start.elapsed() > timeout {
return Err(Error::new(
"Timeout waiting for Hyper to connect to callback listener. \
Hyper may have failed to start - check hyperd logs for details.",
));
}
match listener.accept() {
Ok((stream, _addr)) => break stream,
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(50));
}
Err(e) => {
return Err(Error::new(format!(
"Failed to accept callback connection: {e}"
)));
}
}
};
stream
.set_nonblocking(false)
.map_err(|e| Error::new(format!("Failed to set callback stream to blocking: {e}")))?;
stream.set_read_timeout(Some(Duration::from_secs(10))).ok();
let mut len_buf = [0u8; 1];
stream
.read_exact(&mut len_buf)
.map_err(|e| Error::new(format!("Failed to read endpoint length from Hyper: {e}")))?;
let len = len_buf[0] as usize;
if len == 0 {
return Err(Error::new("Hyper sent empty endpoint descriptor"));
}
let mut descriptor_buf = vec![0u8; len];
stream.read_exact(&mut descriptor_buf).map_err(|e| {
Error::new(format!(
"Failed to read endpoint descriptor from Hyper: {e}"
))
})?;
let descriptor = String::from_utf8(descriptor_buf)
.map_err(|e| Error::new(format!("Invalid UTF-8 in endpoint descriptor: {e}")))?;
let descriptor = descriptor.trim_matches(|c: char| c == '\0' || c.is_whitespace());
let endpoint = Self::parse_connection_descriptor(descriptor)?;
stream.set_read_timeout(None).ok();
info!(
target: "hyperdb_api",
%endpoint,
"hyperd-started"
);
Ok((stream, endpoint))
}
fn parse_connection_descriptor(descriptor: &str) -> Result<String> {
if let Some(rest) = descriptor.strip_prefix("tab.domain://") {
if let Some(idx) = rest.find("/domain/") {
let dir = &rest[..idx];
let name = &rest[idx + 8..]; let socket_path = format!("{dir}/domain/{name}");
return Ok(socket_path);
}
return Ok(rest.to_string());
}
if let Some(rest) = descriptor.strip_prefix("tab.pipe://") {
if let Some(idx) = rest.find("/pipe/") {
let host = &rest[..idx];
let name = &rest[idx + 6..]; let pipe_path = format!(r"\\{host}\pipe\{name}");
return Ok(pipe_path);
}
return Ok(rest.to_string());
}
let without_prefix = descriptor
.strip_prefix("tab.tcp://")
.or_else(|| descriptor.strip_prefix("tcp.grpc://"))
.or_else(|| descriptor.strip_prefix("tcp.grpctls://"))
.or_else(|| descriptor.strip_prefix("tcp://"))
.unwrap_or(descriptor);
if without_prefix.contains(':') && !without_prefix.is_empty() {
Ok(without_prefix.to_string())
} else {
Err(Error::new(format!(
"Invalid connection descriptor format: '{descriptor}'. Expected '<scheme>://host:port' or 'tab.domain://<dir>/domain/<name>'"
)))
}
}
#[must_use]
pub fn endpoint(&self) -> Option<&str> {
self.endpoint.as_deref()
}
pub fn require_endpoint(&self) -> crate::error::Result<&str> {
self.endpoint().ok_or_else(|| {
crate::error::Error::new(
"HyperProcess does not have a libpq endpoint (gRPC-only mode). \
Use grpc_endpoint() instead or start with LibPq or Both listen mode.",
)
})
}
#[must_use]
pub fn grpc_endpoint(&self) -> Option<&str> {
self.grpc_endpoint.as_deref()
}
pub fn require_grpc_endpoint(&self) -> crate::error::Result<&str> {
self.grpc_endpoint().ok_or_else(|| {
crate::error::Error::new(
"HyperProcess does not have a gRPC endpoint (libpq-only mode). \
Use endpoint() instead or start with Grpc or Both listen mode.",
)
})
}
#[must_use]
pub fn grpc_url(&self) -> Option<String> {
self.grpc_endpoint.as_ref().map(|ep| format!("http://{ep}"))
}
pub fn require_grpc_url(&self) -> crate::error::Result<String> {
Ok(format!("http://{}", self.require_grpc_endpoint()?))
}
#[must_use]
pub fn listen_mode(&self) -> ListenMode {
self.listen_mode
}
#[must_use]
pub fn transport_mode(&self) -> TransportMode {
self.transport_mode
}
#[must_use]
pub fn connection_endpoint(&self) -> Option<&ConnectionEndpoint> {
self.connection_endpoint.as_ref()
}
#[must_use]
pub fn log_dir(&self) -> Option<&Path> {
self.log_dir.as_deref()
}
#[cfg(unix)]
#[must_use]
pub fn socket_directory(&self) -> Option<&Path> {
self.socket_directory.as_deref()
}
#[cfg(windows)]
pub fn pipe_name(&self) -> Option<&str> {
self.pipe_name.as_deref()
}
#[must_use]
pub fn pid(&self) -> Option<u32> {
self.child.as_ref().map(std::process::Child::id)
}
#[must_use]
pub fn is_running(&self) -> bool {
if let Some(ref child) = self.child {
#[cfg(unix)]
{
match Command::new("kill")
.args(["-0", &child.id().to_string()])
.output()
{
Ok(output) => output.status.success(),
Err(_) => false,
}
}
#[cfg(not(unix))]
{
let _ = child; true
}
} else {
false
}
}
pub fn shutdown_timeout(mut self, timeout: Duration) -> Result<()> {
self.shutdown_initiated.store(true, Ordering::SeqCst);
self.do_shutdown(Some(timeout))
}
pub fn shutdown_graceful(mut self) -> Result<()> {
self.shutdown_initiated.store(true, Ordering::SeqCst);
self.do_shutdown(None)
}
fn close_callback_connection(&mut self) {
if let Some(conn) = self.callback_connection.take() {
let _ = conn.shutdown(Shutdown::Both);
}
}
fn do_shutdown(&mut self, timeout: Option<Duration>) -> Result<()> {
info!(target: "hyperdb_api", "hyperd-shutdown");
self.close_callback_connection();
if let Some(mut child) = self.child.take() {
let wait_result = if let Some(timeout) = timeout {
let start = std::time::Instant::now();
loop {
match child.try_wait() {
Ok(Some(status)) => break Ok(status),
Ok(None) => {
if start.elapsed() > timeout {
#[cfg(unix)]
{
let _ = Command::new("kill")
.args(["-TERM", &child.id().to_string()])
.output();
thread::sleep(Duration::from_millis(100));
}
let _ = child.kill();
break child.wait().map_err(|e| {
Error::new(format!("Failed to wait for hyperd: {e}"))
});
}
thread::sleep(Duration::from_millis(100));
}
Err(e) => break Err(Error::new(format!("Failed to wait for hyperd: {e}"))),
}
}
} else {
child
.wait()
.map_err(|e| Error::new(format!("Failed to wait for hyperd: {e}")))
};
wait_result?;
}
Ok(())
}
}
impl Drop for HyperProcess {
fn drop(&mut self) {
if !self.shutdown_initiated.load(Ordering::SeqCst) {
let _ = self.do_shutdown(Some(Duration::from_secs(5)));
}
#[cfg(unix)]
if let Some(ref dir) = self.socket_directory {
let dir_name = dir.file_name().and_then(|n| n.to_str()).unwrap_or("");
if dir_name.starts_with("hyper-") {
let _ = std::fs::remove_dir_all(dir);
}
}
}
}
unsafe impl Send for HyperProcess {}
pub(crate) const NO_DEFAULT_PARAMETERS: &str = "no_default_parameters";
const DEFAULT_LOG_CONFIG: &str = "file,json,all,hyperd,0";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TransportMode {
#[default]
Ipc,
Tcp,
}
#[derive(Debug, Clone, Default)]
pub struct Parameters {
values: Vec<(String, String)>,
pub(crate) listen_mode: Option<ListenMode>,
pub(crate) transport_mode: Option<TransportMode>,
#[cfg(unix)]
pub(crate) domain_socket_directory: Option<PathBuf>,
}
impl Parameters {
#[must_use]
pub fn new() -> Self {
Parameters {
values: Vec::new(),
listen_mode: None,
transport_mode: None,
#[cfg(unix)]
domain_socket_directory: None,
}
}
pub fn set_transport_mode(&mut self, mode: TransportMode) -> &mut Self {
self.transport_mode = Some(mode);
self
}
#[must_use]
pub fn transport_mode(&self) -> Option<TransportMode> {
self.transport_mode
}
#[cfg(unix)]
pub fn set_domain_socket_directory(&mut self, dir: impl Into<PathBuf>) -> &mut Self {
self.domain_socket_directory = Some(dir.into());
self
}
#[cfg(unix)]
#[must_use]
pub fn domain_socket_directory(&self) -> Option<&Path> {
self.domain_socket_directory.as_deref()
}
pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
let key = key.into();
let value = value.into();
if let Some(entry) = self.values.iter_mut().find(|(k, _)| k == &key) {
entry.1 = value;
} else {
self.values.push((key, value));
}
self
}
pub fn set_listen_mode(&mut self, mode: ListenMode) -> &mut Self {
self.listen_mode = Some(mode);
self
}
#[must_use]
pub fn listen_mode(&self) -> Option<ListenMode> {
self.listen_mode
}
#[must_use]
pub fn get(&self, key: &str) -> Option<&str> {
self.values
.iter()
.find(|(k, _)| k == key)
.map(|(_, v)| v.as_str())
}
#[must_use]
pub fn contains_key(&self, key: &str) -> bool {
self.values.iter().any(|(k, _)| k == key)
}
pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
self.values.iter().map(|(k, v)| (k.as_str(), v.as_str()))
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
#[must_use]
pub fn len(&self) -> usize {
self.values.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parameters() {
let mut params = Parameters::new();
params.set("key1", "value1");
params.set("key2", "value2");
assert_eq!(params.get("key1"), Some("value1"));
assert_eq!(params.get("key2"), Some("value2"));
assert_eq!(params.get("key3"), None);
assert_eq!(params.len(), 2);
}
#[test]
fn test_parameters_update() {
let mut params = Parameters::new();
params.set("key", "value1");
params.set("key", "value2");
assert_eq!(params.get("key"), Some("value2"));
assert_eq!(params.len(), 1);
}
#[test]
fn test_parse_connection_descriptor() {
assert_eq!(
HyperProcess::parse_connection_descriptor("tab.tcp://localhost:12345").unwrap(),
"localhost:12345"
);
assert_eq!(
HyperProcess::parse_connection_descriptor("tab.tcp://127.0.0.1:7483").unwrap(),
"127.0.0.1:7483"
);
assert_eq!(
HyperProcess::parse_connection_descriptor("tcp://localhost:8080").unwrap(),
"localhost:8080"
);
assert_eq!(
HyperProcess::parse_connection_descriptor("localhost:9999").unwrap(),
"localhost:9999"
);
}
#[test]
fn test_parse_connection_descriptor_named_pipe() {
assert_eq!(
HyperProcess::parse_connection_descriptor("tab.pipe://./pipe/hyper-12345").unwrap(),
r"\\.\pipe\hyper-12345"
);
assert_eq!(
HyperProcess::parse_connection_descriptor("tab.pipe://server1/pipe/mydb").unwrap(),
r"\\server1\pipe\mydb"
);
}
#[test]
fn test_parse_connection_descriptor_invalid() {
assert!(HyperProcess::parse_connection_descriptor("").is_err());
assert!(HyperProcess::parse_connection_descriptor("invalid").is_err());
}
#[test]
fn test_parameters_contains_key() {
let mut params = Parameters::new();
params.set("key1", "value1");
assert!(params.contains_key("key1"));
assert!(!params.contains_key("key2"));
}
#[test]
fn test_no_default_parameters_constant() {
assert_eq!(NO_DEFAULT_PARAMETERS, "no_default_parameters");
}
#[test]
fn test_parameters_with_no_defaults() {
let mut params = Parameters::new();
params.set(NO_DEFAULT_PARAMETERS, "");
params.set("init_user", "custom_user");
assert!(params.contains_key(NO_DEFAULT_PARAMETERS));
assert_eq!(params.get("init_user"), Some("custom_user"));
}
}