use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::time::Duration;
use tokio::process::Command;
use crate::cli::RedisCli;
use crate::error::{Error, Result};
/// Plain-data description of a Redis server instance: one public field per setting.
///
/// Required settings are stored directly (`port`, `bind`, `dir`, ...); everything
/// else is an `Option`, a `Vec`, or a `HashMap`. The `Default` impl in this file
/// initialises every `Option` to `None` and every collection to empty.
///
/// NOTE(review): presumably a `None` value means the corresponding directive is
/// not emitted, so the server's own default applies — confirm against the code
/// that renders this struct into a config file / command line.
///
/// The `// ---` section markers below only group fields by name prefix; they
/// carry no runtime meaning.
#[derive(Debug, Clone)]
pub struct RedisServerConfig {
// --- Listening sockets / connections ---
pub port: u16,
pub bind: String,
pub protected_mode: bool,
pub tcp_backlog: Option<u32>,
pub unixsocket: Option<PathBuf>,
pub unixsocketperm: Option<u32>,
pub timeout: Option<u32>,
pub tcp_keepalive: Option<u32>,
// --- TLS (`tls_*`) ---
pub tls_port: Option<u16>,
pub tls_cert_file: Option<PathBuf>,
pub tls_key_file: Option<PathBuf>,
pub tls_key_file_pass: Option<String>,
pub tls_ca_cert_file: Option<PathBuf>,
pub tls_ca_cert_dir: Option<PathBuf>,
pub tls_auth_clients: Option<bool>,
pub tls_client_cert_file: Option<PathBuf>,
pub tls_client_key_file: Option<PathBuf>,
pub tls_client_key_file_pass: Option<String>,
pub tls_dh_params_file: Option<PathBuf>,
pub tls_ciphers: Option<String>,
pub tls_ciphersuites: Option<String>,
pub tls_protocols: Option<String>,
pub tls_prefer_server_ciphers: Option<bool>,
pub tls_session_caching: Option<bool>,
pub tls_session_cache_size: Option<u32>,
pub tls_session_cache_timeout: Option<u32>,
pub tls_replication: Option<bool>,
pub tls_cluster: Option<bool>,
// --- Process, working directory, logging ---
pub daemonize: bool,
pub dir: PathBuf,
pub logfile: Option<String>,
pub loglevel: LogLevel,
pub databases: Option<u32>,
// --- Memory limits, eviction, expiry ---
pub maxmemory: Option<String>,
pub maxmemory_policy: Option<String>,
pub maxmemory_samples: Option<u32>,
pub maxmemory_clients: Option<String>,
pub maxmemory_eviction_tenacity: Option<u32>,
pub maxclients: Option<u32>,
pub lfu_log_factor: Option<u32>,
pub lfu_decay_time: Option<u32>,
pub active_expire_effort: Option<u32>,
// --- Lazy freeing (`lazyfree_*`) ---
pub lazyfree_lazy_eviction: Option<bool>,
pub lazyfree_lazy_expire: Option<bool>,
pub lazyfree_lazy_server_del: Option<bool>,
pub lazyfree_lazy_user_del: Option<bool>,
pub lazyfree_lazy_user_flush: Option<bool>,
// --- Persistence: snapshots and append-only file ---
/// Snapshot policy; see [`SavePolicy`].
pub save: SavePolicy,
pub appendonly: bool,
pub appendfsync: Option<AppendFsync>,
pub appendfilename: Option<String>,
pub appenddirname: Option<PathBuf>,
pub aof_use_rdb_preamble: Option<bool>,
pub aof_load_truncated: Option<bool>,
pub aof_load_corrupt_tail_max_size: Option<String>,
pub aof_rewrite_incremental_fsync: Option<bool>,
pub aof_timestamp_enabled: Option<bool>,
pub auto_aof_rewrite_percentage: Option<u32>,
pub auto_aof_rewrite_min_size: Option<String>,
pub no_appendfsync_on_rewrite: Option<bool>,
// --- Replication ---
/// `(host, port)` pair, as assembled by `RedisServer::replicaof`.
pub replicaof: Option<(String, u16)>,
pub masterauth: Option<String>,
pub masteruser: Option<String>,
pub repl_backlog_size: Option<String>,
pub repl_backlog_ttl: Option<u32>,
pub repl_disable_tcp_nodelay: Option<bool>,
pub repl_diskless_load: Option<ReplDisklessLoad>,
pub repl_diskless_sync: Option<bool>,
pub repl_diskless_sync_delay: Option<u32>,
pub repl_diskless_sync_max_replicas: Option<u32>,
pub repl_ping_replica_period: Option<u32>,
pub repl_timeout: Option<u32>,
pub replica_announce_ip: Option<String>,
pub replica_announce_port: Option<u16>,
pub replica_announced: Option<bool>,
pub replica_full_sync_buffer_limit: Option<String>,
pub replica_ignore_disk_write_errors: Option<bool>,
pub replica_ignore_maxmemory: Option<bool>,
pub replica_lazy_flush: Option<bool>,
pub replica_priority: Option<u32>,
pub replica_read_only: Option<bool>,
pub replica_serve_stale_data: Option<bool>,
pub min_replicas_to_write: Option<u32>,
pub min_replicas_max_lag: Option<u32>,
// --- Authentication ---
pub password: Option<String>,
pub acl_file: Option<PathBuf>,
// --- Cluster (`cluster_*`) ---
pub cluster_enabled: bool,
pub cluster_node_timeout: Option<u64>,
pub cluster_config_file: Option<PathBuf>,
pub cluster_require_full_coverage: Option<bool>,
pub cluster_allow_reads_when_down: Option<bool>,
pub cluster_allow_pubsubshard_when_down: Option<bool>,
pub cluster_allow_replica_migration: Option<bool>,
pub cluster_migration_barrier: Option<u32>,
pub cluster_replica_no_failover: Option<bool>,
pub cluster_replica_validity_factor: Option<u32>,
pub cluster_announce_ip: Option<String>,
pub cluster_announce_port: Option<u16>,
pub cluster_announce_bus_port: Option<u16>,
pub cluster_announce_tls_port: Option<u16>,
pub cluster_announce_hostname: Option<String>,
pub cluster_announce_human_nodename: Option<String>,
pub cluster_port: Option<u16>,
pub cluster_preferred_endpoint_type: Option<String>,
pub cluster_link_sendbuf_limit: Option<u64>,
pub cluster_compatibility_sample_ratio: Option<u32>,
pub cluster_slot_migration_handoff_max_lag_bytes: Option<u64>,
pub cluster_slot_migration_write_pause_timeout: Option<u64>,
pub cluster_slot_stats_enabled: Option<bool>,
// --- Per-data-type encoding thresholds ---
pub hash_max_listpack_entries: Option<u32>,
pub hash_max_listpack_value: Option<u32>,
// Signed, unlike its neighbours (the setter takes an `i32`).
pub list_max_listpack_size: Option<i32>,
pub list_compress_depth: Option<u32>,
pub set_max_intset_entries: Option<u32>,
pub set_max_listpack_entries: Option<u32>,
pub set_max_listpack_value: Option<u32>,
pub zset_max_listpack_entries: Option<u32>,
pub zset_max_listpack_value: Option<u32>,
pub hll_sparse_max_bytes: Option<u32>,
pub stream_node_max_bytes: Option<u32>,
pub stream_node_max_entries: Option<u32>,
pub stream_idmp_duration: Option<u64>,
pub stream_idmp_maxsize: Option<u64>,
// --- Modules, threading, monitoring ---
/// Accumulates one entry per `RedisServer::loadmodule` call.
pub loadmodule: Vec<PathBuf>,
pub hz: Option<u32>,
pub io_threads: Option<u32>,
pub io_threads_do_reads: Option<bool>,
pub notify_keyspace_events: Option<String>,
// Signed (`i64`), unlike most numeric settings here.
pub slowlog_log_slower_than: Option<i64>,
pub slowlog_max_len: Option<u32>,
pub latency_monitor_threshold: Option<u64>,
pub latency_tracking: Option<bool>,
pub latency_tracking_info_percentiles: Option<String>,
// --- Active defragmentation ---
pub activedefrag: Option<bool>,
pub active_defrag_ignore_bytes: Option<String>,
pub active_defrag_threshold_lower: Option<u32>,
pub active_defrag_threshold_upper: Option<u32>,
pub active_defrag_cycle_min: Option<u32>,
pub active_defrag_cycle_max: Option<u32>,
pub active_defrag_max_scan_fields: Option<u32>,
// --- Syslog / supervision / process title ---
pub syslog_enabled: Option<bool>,
pub syslog_ident: Option<String>,
pub syslog_facility: Option<String>,
pub supervised: Option<String>,
pub always_show_logo: Option<bool>,
pub set_proc_title: Option<bool>,
pub proc_title_template: Option<String>,
// --- ACL and command hardening ---
pub acl_pubsub_default: Option<String>,
pub acllog_max_len: Option<u32>,
pub enable_debug_command: Option<String>,
pub enable_module_command: Option<String>,
pub enable_protected_configs: Option<String>,
/// `(command, new_name)` pairs, one per `RedisServer::rename_command` call.
pub rename_command: Vec<(String, String)>,
pub sanitize_dump_payload: Option<String>,
pub hide_user_data_from_log: Option<bool>,
// --- Client / protocol limits ---
pub bind_source_addr: Option<String>,
pub busy_reply_threshold: Option<u64>,
/// Accumulates one entry per `RedisServer::client_output_buffer_limit` call.
pub client_output_buffer_limit: Vec<String>,
pub client_query_buffer_limit: Option<String>,
pub proto_max_bulk_len: Option<String>,
pub max_new_connections_per_cycle: Option<u32>,
pub max_new_tls_connections_per_cycle: Option<u32>,
pub socket_mark_id: Option<u32>,
// --- RDB file handling ---
pub dbfilename: Option<String>,
pub rdbcompression: Option<bool>,
pub rdbchecksum: Option<bool>,
pub rdb_save_incremental_fsync: Option<bool>,
pub rdb_del_sync_files: Option<bool>,
pub stop_writes_on_bgsave_error: Option<bool>,
// --- Shutdown behaviour ---
pub shutdown_on_sigint: Option<String>,
pub shutdown_on_sigterm: Option<String>,
pub shutdown_timeout: Option<u32>,
// --- Miscellaneous ---
pub activerehashing: Option<bool>,
pub crash_log_enabled: Option<bool>,
pub crash_memcheck_enabled: Option<bool>,
pub disable_thp: Option<bool>,
pub dynamic_hz: Option<bool>,
pub ignore_warnings: Option<String>,
pub include: Vec<PathBuf>,
pub jemalloc_bg_thread: Option<bool>,
pub locale_collate: Option<String>,
pub lua_time_limit: Option<u64>,
pub oom_score_adj: Option<String>,
pub oom_score_adj_values: Option<String>,
pub propagation_error_behavior: Option<String>,
pub tracking_table_max_keys: Option<u64>,
// NOTE(review): presumably free-form `directive -> value` pairs for settings
// without a dedicated field — confirm against the rendering code.
pub extra: HashMap<String, String>,
// Executable names; default to "redis-server" / "redis-cli" (see `Default`).
// NOTE(review): presumably resolved via PATH when spawned — confirm.
pub redis_server_bin: String,
pub redis_cli_bin: String,
}
/// Value for the `appendfsync` setting stored in `RedisServerConfig::appendfsync`.
///
/// The `Display` impl renders each variant as a lowercase keyword
/// (`always`, `everysec`, `no`).
#[derive(Debug, Clone, Copy)]
pub enum AppendFsync {
    Always,
    Everysec,
    No,
}

impl std::fmt::Display for AppendFsync {
    /// Writes the variant's keyword verbatim; formatter width/padding flags are
    /// not applied because the text goes straight through `write_str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let keyword = match *self {
            Self::Always => "always",
            Self::Everysec => "everysec",
            Self::No => "no",
        };
        f.write_str(keyword)
    }
}
/// Value for the `repl_diskless_load` setting stored in
/// `RedisServerConfig::repl_diskless_load`.
///
/// The `Display` impl renders each variant as `disabled`, `on-empty-db`
/// or `swapdb`.
#[derive(Debug, Clone, Copy)]
pub enum ReplDisklessLoad {
    Disabled,
    OnEmptyDb,
    Swapdb,
}

impl std::fmt::Display for ReplDisklessLoad {
    /// Writes the variant's keyword verbatim; formatter width/padding flags are
    /// not applied because the text goes straight through `write_str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let keyword = match *self {
            Self::Disabled => "disabled",
            Self::OnEmptyDb => "on-empty-db",
            Self::Swapdb => "swapdb",
        };
        f.write_str(keyword)
    }
}
/// Snapshot ("save") policy stored in `RedisServerConfig::save`.
///
/// `Disabled` is the value produced by `SavePolicy::default()` (it carries the
/// `#[default]` attribute).
#[derive(Debug, Clone, Default)]
pub enum SavePolicy {
/// Selected by `RedisServer::save(false)` and by `Default`.
#[default]
Disabled,
/// Selected by `RedisServer::save(true)`.
/// NOTE(review): presumably leaves the server's built-in save schedule in
/// place — confirm against the code that renders the config.
Default,
/// Explicit schedule supplied through `RedisServer::save_schedule`.
/// NOTE(review): pairs are presumably `(seconds, changes)` as in Redis'
/// `save` directive — TODO confirm the tuple order.
Custom(Vec<(u64, u64)>),
}
/// Log verbosity stored in `RedisServerConfig::loglevel`.
///
/// The `Display` impl renders each variant as its lowercase name
/// (`debug`, `verbose`, `notice`, `warning`).
#[derive(Debug, Clone, Copy)]
pub enum LogLevel {
    Debug,
    Verbose,
    Notice,
    Warning,
}

impl std::fmt::Display for LogLevel {
    /// Writes the variant's keyword verbatim; formatter width/padding flags are
    /// not applied because the text goes straight through `write_str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let keyword = match *self {
            Self::Debug => "debug",
            Self::Verbose => "verbose",
            Self::Notice => "notice",
            Self::Warning => "warning",
        };
        f.write_str(keyword)
    }
}
impl Default for RedisServerConfig {
/// Builds the baseline configuration.
///
/// Non-empty defaults set here:
/// - `port` = 6379, `bind` = "127.0.0.1", `protected_mode` = false
/// - `daemonize` = true
/// - `dir` = `<OS temp dir>/redis-server-wrapper`
/// - `loglevel` = `LogLevel::Notice`
/// - `save` = `SavePolicy::Disabled`, `appendonly` = false
/// - `cluster_enabled` = false
/// - `redis_server_bin` = "redis-server", `redis_cli_bin` = "redis-cli"
///
/// Every `Option` field is `None`; every `Vec` and the `extra` map are empty.
/// The literal names every field explicitly, so adding a field to the struct
/// without a default here is a compile error.
fn default() -> Self {
Self {
port: 6379,
// Loopback only by default.
bind: "127.0.0.1".into(),
protected_mode: false,
tcp_backlog: None,
unixsocket: None,
unixsocketperm: None,
timeout: None,
tcp_keepalive: None,
tls_port: None,
tls_cert_file: None,
tls_key_file: None,
tls_key_file_pass: None,
tls_ca_cert_file: None,
tls_ca_cert_dir: None,
tls_auth_clients: None,
tls_client_cert_file: None,
tls_client_key_file: None,
tls_client_key_file_pass: None,
tls_dh_params_file: None,
tls_ciphers: None,
tls_ciphersuites: None,
tls_protocols: None,
tls_prefer_server_ciphers: None,
tls_session_caching: None,
tls_session_cache_size: None,
tls_session_cache_timeout: None,
tls_replication: None,
tls_cluster: None,
daemonize: true,
// `temp_dir()` is evaluated each time `default()` runs, not cached.
dir: std::env::temp_dir().join("redis-server-wrapper"),
logfile: None,
loglevel: LogLevel::Notice,
databases: None,
maxmemory: None,
maxmemory_policy: None,
maxmemory_samples: None,
maxmemory_clients: None,
maxmemory_eviction_tenacity: None,
maxclients: None,
lfu_log_factor: None,
lfu_decay_time: None,
active_expire_effort: None,
lazyfree_lazy_eviction: None,
lazyfree_lazy_expire: None,
lazyfree_lazy_server_del: None,
lazyfree_lazy_user_del: None,
lazyfree_lazy_user_flush: None,
// Persistence is off by default: no snapshots, no AOF.
save: SavePolicy::Disabled,
appendonly: false,
appendfsync: None,
appendfilename: None,
appenddirname: None,
aof_use_rdb_preamble: None,
aof_load_truncated: None,
aof_load_corrupt_tail_max_size: None,
aof_rewrite_incremental_fsync: None,
aof_timestamp_enabled: None,
auto_aof_rewrite_percentage: None,
auto_aof_rewrite_min_size: None,
no_appendfsync_on_rewrite: None,
replicaof: None,
masterauth: None,
masteruser: None,
repl_backlog_size: None,
repl_backlog_ttl: None,
repl_disable_tcp_nodelay: None,
repl_diskless_load: None,
repl_diskless_sync: None,
repl_diskless_sync_delay: None,
repl_diskless_sync_max_replicas: None,
repl_ping_replica_period: None,
repl_timeout: None,
replica_announce_ip: None,
replica_announce_port: None,
replica_announced: None,
replica_full_sync_buffer_limit: None,
replica_ignore_disk_write_errors: None,
replica_ignore_maxmemory: None,
replica_lazy_flush: None,
replica_priority: None,
replica_read_only: None,
replica_serve_stale_data: None,
min_replicas_to_write: None,
min_replicas_max_lag: None,
password: None,
acl_file: None,
cluster_enabled: false,
cluster_node_timeout: None,
cluster_config_file: None,
cluster_require_full_coverage: None,
cluster_allow_reads_when_down: None,
cluster_allow_pubsubshard_when_down: None,
cluster_allow_replica_migration: None,
cluster_migration_barrier: None,
cluster_replica_no_failover: None,
cluster_replica_validity_factor: None,
cluster_announce_ip: None,
cluster_announce_port: None,
cluster_announce_bus_port: None,
cluster_announce_tls_port: None,
cluster_announce_hostname: None,
cluster_announce_human_nodename: None,
cluster_port: None,
cluster_preferred_endpoint_type: None,
cluster_link_sendbuf_limit: None,
cluster_compatibility_sample_ratio: None,
cluster_slot_migration_handoff_max_lag_bytes: None,
cluster_slot_migration_write_pause_timeout: None,
cluster_slot_stats_enabled: None,
hash_max_listpack_entries: None,
hash_max_listpack_value: None,
list_max_listpack_size: None,
list_compress_depth: None,
set_max_intset_entries: None,
set_max_listpack_entries: None,
set_max_listpack_value: None,
zset_max_listpack_entries: None,
zset_max_listpack_value: None,
hll_sparse_max_bytes: None,
stream_node_max_bytes: None,
stream_node_max_entries: None,
stream_idmp_duration: None,
stream_idmp_maxsize: None,
loadmodule: Vec::new(),
hz: None,
io_threads: None,
io_threads_do_reads: None,
notify_keyspace_events: None,
slowlog_log_slower_than: None,
slowlog_max_len: None,
latency_monitor_threshold: None,
latency_tracking: None,
latency_tracking_info_percentiles: None,
activedefrag: None,
active_defrag_ignore_bytes: None,
active_defrag_threshold_lower: None,
active_defrag_threshold_upper: None,
active_defrag_cycle_min: None,
active_defrag_cycle_max: None,
active_defrag_max_scan_fields: None,
syslog_enabled: None,
syslog_ident: None,
syslog_facility: None,
supervised: None,
always_show_logo: None,
set_proc_title: None,
proc_title_template: None,
acl_pubsub_default: None,
acllog_max_len: None,
enable_debug_command: None,
enable_module_command: None,
enable_protected_configs: None,
rename_command: Vec::new(),
sanitize_dump_payload: None,
hide_user_data_from_log: None,
bind_source_addr: None,
busy_reply_threshold: None,
client_output_buffer_limit: Vec::new(),
client_query_buffer_limit: None,
proto_max_bulk_len: None,
max_new_connections_per_cycle: None,
max_new_tls_connections_per_cycle: None,
socket_mark_id: None,
dbfilename: None,
rdbcompression: None,
rdbchecksum: None,
rdb_save_incremental_fsync: None,
rdb_del_sync_files: None,
stop_writes_on_bgsave_error: None,
shutdown_on_sigint: None,
shutdown_on_sigterm: None,
shutdown_timeout: None,
activerehashing: None,
crash_log_enabled: None,
crash_memcheck_enabled: None,
disable_thp: None,
dynamic_hz: None,
ignore_warnings: None,
include: Vec::new(),
jemalloc_bg_thread: None,
locale_collate: None,
lua_time_limit: None,
oom_score_adj: None,
oom_score_adj_values: None,
propagation_error_behavior: None,
tracking_table_max_keys: None,
extra: HashMap::new(),
// Bare executable names rather than absolute paths.
redis_server_bin: "redis-server".into(),
redis_cli_bin: "redis-cli".into(),
}
}
}
/// Builder-style wrapper around a [`RedisServerConfig`].
///
/// The setter methods in the `impl` block take the value by move, update one
/// field of `config`, and return the value so calls can be chained.
pub struct RedisServer {
// Pending configuration that the builder methods mutate.
config: RedisServerConfig,
}
impl RedisServer {
/// Creates a builder whose configuration starts as `RedisServerConfig::default()`.
pub fn new() -> Self {
    let config = RedisServerConfig::default();
    Self { config }
}
/// Overwrites `config.port` with `port` and returns the builder for chaining.
pub fn port(mut self, port: u16) -> Self {
    let cfg = &mut self.config;
    cfg.port = port;
    self
}
/// Converts `bind` into a `String`, stores it in `config.bind`, and returns the builder.
pub fn bind(mut self, bind: impl Into<String>) -> Self {
    let bind: String = bind.into();
    self.config.bind = bind;
    self
}
/// Overwrites `config.protected_mode` with `protected` and returns the builder.
pub fn protected_mode(mut self, protected: bool) -> Self {
    let cfg = &mut self.config;
    cfg.protected_mode = protected;
    self
}
/// Stores `Some(backlog)` in `config.tcp_backlog` and returns the builder.
pub fn tcp_backlog(mut self, backlog: u32) -> Self {
    let cfg = &mut self.config;
    cfg.tcp_backlog = Some(backlog);
    self
}
/// Converts `path` into a `PathBuf`, stores it as `config.unixsocket`, and returns the builder.
pub fn unixsocket(mut self, path: impl Into<PathBuf>) -> Self {
    let path: PathBuf = path.into();
    self.config.unixsocket = Some(path);
    self
}
/// Stores `Some(perm)` in `config.unixsocketperm` and returns the builder.
pub fn unixsocketperm(mut self, perm: u32) -> Self {
    let cfg = &mut self.config;
    cfg.unixsocketperm = Some(perm);
    self
}
/// Stores `Some(seconds)` in `config.timeout` and returns the builder.
pub fn timeout(mut self, seconds: u32) -> Self {
    let cfg = &mut self.config;
    cfg.timeout = Some(seconds);
    self
}
/// Stores `Some(seconds)` in `config.tcp_keepalive` and returns the builder.
pub fn tcp_keepalive(mut self, seconds: u32) -> Self {
    let cfg = &mut self.config;
    cfg.tcp_keepalive = Some(seconds);
    self
}
/// Stores `Some(port)` in `config.tls_port` and returns the builder.
pub fn tls_port(mut self, port: u16) -> Self {
    let cfg = &mut self.config;
    cfg.tls_port = Some(port);
    self
}
/// Converts `path` into a `PathBuf`, stores it as `config.tls_cert_file`, and returns the builder.
pub fn tls_cert_file(mut self, path: impl Into<PathBuf>) -> Self {
    let path: PathBuf = path.into();
    self.config.tls_cert_file = Some(path);
    self
}
/// Converts `path` into a `PathBuf`, stores it as `config.tls_key_file`, and returns the builder.
pub fn tls_key_file(mut self, path: impl Into<PathBuf>) -> Self {
    let path: PathBuf = path.into();
    self.config.tls_key_file = Some(path);
    self
}
/// Converts `path` into a `PathBuf`, stores it as `config.tls_ca_cert_file`, and returns the builder.
pub fn tls_ca_cert_file(mut self, path: impl Into<PathBuf>) -> Self {
    let path: PathBuf = path.into();
    self.config.tls_ca_cert_file = Some(path);
    self
}
/// Stores `Some(require)` in `config.tls_auth_clients` and returns the builder.
pub fn tls_auth_clients(mut self, require: bool) -> Self {
    let cfg = &mut self.config;
    cfg.tls_auth_clients = Some(require);
    self
}
/// Converts `pass` into a `String`, stores it as `config.tls_key_file_pass`, and returns the builder.
pub fn tls_key_file_pass(mut self, pass: impl Into<String>) -> Self {
    let pass: String = pass.into();
    self.config.tls_key_file_pass = Some(pass);
    self
}
/// Converts `path` into a `PathBuf`, stores it as `config.tls_ca_cert_dir`, and returns the builder.
pub fn tls_ca_cert_dir(mut self, path: impl Into<PathBuf>) -> Self {
    let path: PathBuf = path.into();
    self.config.tls_ca_cert_dir = Some(path);
    self
}
/// Converts `path` into a `PathBuf`, stores it as `config.tls_client_cert_file`, and returns the builder.
pub fn tls_client_cert_file(mut self, path: impl Into<PathBuf>) -> Self {
    let path: PathBuf = path.into();
    self.config.tls_client_cert_file = Some(path);
    self
}
/// Converts `path` into a `PathBuf`, stores it as `config.tls_client_key_file`, and returns the builder.
pub fn tls_client_key_file(mut self, path: impl Into<PathBuf>) -> Self {
    let path: PathBuf = path.into();
    self.config.tls_client_key_file = Some(path);
    self
}
/// Converts `pass` into a `String`, stores it as `config.tls_client_key_file_pass`, and returns the builder.
pub fn tls_client_key_file_pass(mut self, pass: impl Into<String>) -> Self {
    let pass: String = pass.into();
    self.config.tls_client_key_file_pass = Some(pass);
    self
}
/// Converts `path` into a `PathBuf`, stores it as `config.tls_dh_params_file`, and returns the builder.
pub fn tls_dh_params_file(mut self, path: impl Into<PathBuf>) -> Self {
    let path: PathBuf = path.into();
    self.config.tls_dh_params_file = Some(path);
    self
}
/// Converts `ciphers` into a `String`, stores it as `config.tls_ciphers`, and returns the builder.
pub fn tls_ciphers(mut self, ciphers: impl Into<String>) -> Self {
    let ciphers: String = ciphers.into();
    self.config.tls_ciphers = Some(ciphers);
    self
}
/// Converts `suites` into a `String`, stores it as `config.tls_ciphersuites`, and returns the builder.
pub fn tls_ciphersuites(mut self, suites: impl Into<String>) -> Self {
    let suites: String = suites.into();
    self.config.tls_ciphersuites = Some(suites);
    self
}
/// Converts `protocols` into a `String`, stores it as `config.tls_protocols`, and returns the builder.
pub fn tls_protocols(mut self, protocols: impl Into<String>) -> Self {
    let protocols: String = protocols.into();
    self.config.tls_protocols = Some(protocols);
    self
}
/// Stores `Some(prefer)` in `config.tls_prefer_server_ciphers` and returns the builder.
pub fn tls_prefer_server_ciphers(mut self, prefer: bool) -> Self {
    let cfg = &mut self.config;
    cfg.tls_prefer_server_ciphers = Some(prefer);
    self
}
/// Stores `Some(enable)` in `config.tls_session_caching` and returns the builder.
pub fn tls_session_caching(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.tls_session_caching = Some(enable);
    self
}
/// Stores `Some(size)` in `config.tls_session_cache_size` and returns the builder.
pub fn tls_session_cache_size(mut self, size: u32) -> Self {
    let cfg = &mut self.config;
    cfg.tls_session_cache_size = Some(size);
    self
}
/// Stores `Some(seconds)` in `config.tls_session_cache_timeout` and returns the builder.
pub fn tls_session_cache_timeout(mut self, seconds: u32) -> Self {
    let cfg = &mut self.config;
    cfg.tls_session_cache_timeout = Some(seconds);
    self
}
/// Stores `Some(enable)` in `config.tls_replication` and returns the builder.
pub fn tls_replication(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.tls_replication = Some(enable);
    self
}
/// Stores `Some(enable)` in `config.tls_cluster` and returns the builder.
pub fn tls_cluster(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.tls_cluster = Some(enable);
    self
}
/// Converts `dir` into a `PathBuf`, overwrites `config.dir` with it, and returns the builder.
pub fn dir(mut self, dir: impl Into<PathBuf>) -> Self {
    let dir: PathBuf = dir.into();
    self.config.dir = dir;
    self
}
/// Overwrites `config.loglevel` with `level` and returns the builder.
pub fn loglevel(mut self, level: LogLevel) -> Self {
    let cfg = &mut self.config;
    cfg.loglevel = level;
    self
}
/// Converts `path` into a `String`, stores it as `config.logfile`, and returns the builder.
pub fn logfile(mut self, path: impl Into<String>) -> Self {
    let path: String = path.into();
    self.config.logfile = Some(path);
    self
}
/// Stores `Some(n)` in `config.databases` and returns the builder.
pub fn databases(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.databases = Some(n);
    self
}
/// Converts `limit` into a `String`, stores it as `config.maxmemory`, and returns the builder.
pub fn maxmemory(mut self, limit: impl Into<String>) -> Self {
    let limit: String = limit.into();
    self.config.maxmemory = Some(limit);
    self
}
/// Converts `policy` into a `String`, stores it as `config.maxmemory_policy`, and returns the builder.
pub fn maxmemory_policy(mut self, policy: impl Into<String>) -> Self {
    let policy: String = policy.into();
    self.config.maxmemory_policy = Some(policy);
    self
}
/// Stores `Some(n)` in `config.maxmemory_samples` and returns the builder.
pub fn maxmemory_samples(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.maxmemory_samples = Some(n);
    self
}
/// Converts `limit` into a `String`, stores it as `config.maxmemory_clients`, and returns the builder.
pub fn maxmemory_clients(mut self, limit: impl Into<String>) -> Self {
    let limit: String = limit.into();
    self.config.maxmemory_clients = Some(limit);
    self
}
/// Stores `Some(tenacity)` in `config.maxmemory_eviction_tenacity` and returns the builder.
pub fn maxmemory_eviction_tenacity(mut self, tenacity: u32) -> Self {
    let cfg = &mut self.config;
    cfg.maxmemory_eviction_tenacity = Some(tenacity);
    self
}
/// Stores `Some(n)` in `config.maxclients` and returns the builder.
pub fn maxclients(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.maxclients = Some(n);
    self
}
/// Stores `Some(factor)` in `config.lfu_log_factor` and returns the builder.
pub fn lfu_log_factor(mut self, factor: u32) -> Self {
    let cfg = &mut self.config;
    cfg.lfu_log_factor = Some(factor);
    self
}
/// Stores `Some(minutes)` in `config.lfu_decay_time` and returns the builder.
pub fn lfu_decay_time(mut self, minutes: u32) -> Self {
    let cfg = &mut self.config;
    cfg.lfu_decay_time = Some(minutes);
    self
}
/// Stores `Some(effort)` in `config.active_expire_effort` and returns the builder.
pub fn active_expire_effort(mut self, effort: u32) -> Self {
    let cfg = &mut self.config;
    cfg.active_expire_effort = Some(effort);
    self
}
/// Stores `Some(enable)` in `config.lazyfree_lazy_eviction` and returns the builder.
pub fn lazyfree_lazy_eviction(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.lazyfree_lazy_eviction = Some(enable);
    self
}
/// Stores `Some(enable)` in `config.lazyfree_lazy_expire` and returns the builder.
pub fn lazyfree_lazy_expire(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.lazyfree_lazy_expire = Some(enable);
    self
}
/// Stores `Some(enable)` in `config.lazyfree_lazy_server_del` and returns the builder.
pub fn lazyfree_lazy_server_del(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.lazyfree_lazy_server_del = Some(enable);
    self
}
/// Stores `Some(enable)` in `config.lazyfree_lazy_user_del` and returns the builder.
pub fn lazyfree_lazy_user_del(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.lazyfree_lazy_user_del = Some(enable);
    self
}
/// Stores `Some(enable)` in `config.lazyfree_lazy_user_flush` and returns the builder.
pub fn lazyfree_lazy_user_flush(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.lazyfree_lazy_user_flush = Some(enable);
    self
}
/// Switches the snapshot policy on or off.
///
/// `true` selects `SavePolicy::Default`, `false` selects `SavePolicy::Disabled`;
/// either way any previously stored policy is replaced. Returns the builder.
pub fn save(mut self, save: bool) -> Self {
    let policy = if save {
        SavePolicy::Default
    } else {
        SavePolicy::Disabled
    };
    self.config.save = policy;
    self
}
/// Replaces the snapshot policy with `SavePolicy::Custom(schedule)` and returns the builder.
pub fn save_schedule(mut self, schedule: Vec<(u64, u64)>) -> Self {
    let policy = SavePolicy::Custom(schedule);
    self.config.save = policy;
    self
}
/// Overwrites `config.appendonly` with `appendonly` and returns the builder.
pub fn appendonly(mut self, appendonly: bool) -> Self {
    let cfg = &mut self.config;
    cfg.appendonly = appendonly;
    self
}
/// Stores `Some(policy)` in `config.appendfsync` and returns the builder.
pub fn appendfsync(mut self, policy: AppendFsync) -> Self {
    let cfg = &mut self.config;
    cfg.appendfsync = Some(policy);
    self
}
/// Converts `name` into a `String`, stores it as `config.appendfilename`, and returns the builder.
pub fn appendfilename(mut self, name: impl Into<String>) -> Self {
    let name: String = name.into();
    self.config.appendfilename = Some(name);
    self
}
/// Converts `name` into a `PathBuf`, stores it as `config.appenddirname`, and returns the builder.
pub fn appenddirname(mut self, name: impl Into<PathBuf>) -> Self {
    let name: PathBuf = name.into();
    self.config.appenddirname = Some(name);
    self
}
/// Stores `Some(enable)` in `config.aof_use_rdb_preamble` and returns the builder.
pub fn aof_use_rdb_preamble(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.aof_use_rdb_preamble = Some(enable);
    self
}
/// Stores `Some(enable)` in `config.aof_load_truncated` and returns the builder.
pub fn aof_load_truncated(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.aof_load_truncated = Some(enable);
    self
}
/// Converts `size` into a `String`, stores it as `config.aof_load_corrupt_tail_max_size`, and returns the builder.
pub fn aof_load_corrupt_tail_max_size(mut self, size: impl Into<String>) -> Self {
    let size: String = size.into();
    self.config.aof_load_corrupt_tail_max_size = Some(size);
    self
}
/// Stores `Some(enable)` in `config.aof_rewrite_incremental_fsync` and returns the builder.
pub fn aof_rewrite_incremental_fsync(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.aof_rewrite_incremental_fsync = Some(enable);
    self
}
/// Stores `Some(enable)` in `config.aof_timestamp_enabled` and returns the builder.
pub fn aof_timestamp_enabled(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.aof_timestamp_enabled = Some(enable);
    self
}
/// Stores `Some(pct)` in `config.auto_aof_rewrite_percentage` and returns the builder.
pub fn auto_aof_rewrite_percentage(mut self, pct: u32) -> Self {
    let cfg = &mut self.config;
    cfg.auto_aof_rewrite_percentage = Some(pct);
    self
}
/// Converts `size` into a `String`, stores it as `config.auto_aof_rewrite_min_size`, and returns the builder.
pub fn auto_aof_rewrite_min_size(mut self, size: impl Into<String>) -> Self {
    let size: String = size.into();
    self.config.auto_aof_rewrite_min_size = Some(size);
    self
}
/// Stores `Some(enable)` in `config.no_appendfsync_on_rewrite` and returns the builder.
pub fn no_appendfsync_on_rewrite(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.no_appendfsync_on_rewrite = Some(enable);
    self
}
/// Stores the `(host, port)` pair in `config.replicaof` (with `host` converted
/// into a `String`) and returns the builder.
pub fn replicaof(mut self, host: impl Into<String>, port: u16) -> Self {
    let host: String = host.into();
    self.config.replicaof = Some((host, port));
    self
}
/// Converts `password` into a `String`, stores it as `config.masterauth`, and returns the builder.
pub fn masterauth(mut self, password: impl Into<String>) -> Self {
    let password: String = password.into();
    self.config.masterauth = Some(password);
    self
}
/// Converts `user` into a `String`, stores it as `config.masteruser`, and returns the builder.
pub fn masteruser(mut self, user: impl Into<String>) -> Self {
    let user: String = user.into();
    self.config.masteruser = Some(user);
    self
}
/// Converts `size` into a `String`, stores it as `config.repl_backlog_size`, and returns the builder.
pub fn repl_backlog_size(mut self, size: impl Into<String>) -> Self {
    let size: String = size.into();
    self.config.repl_backlog_size = Some(size);
    self
}
/// Stores `Some(seconds)` in `config.repl_backlog_ttl` and returns the builder.
pub fn repl_backlog_ttl(mut self, seconds: u32) -> Self {
    let cfg = &mut self.config;
    cfg.repl_backlog_ttl = Some(seconds);
    self
}
/// Stores `Some(disable)` in `config.repl_disable_tcp_nodelay` and returns the builder.
pub fn repl_disable_tcp_nodelay(mut self, disable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.repl_disable_tcp_nodelay = Some(disable);
    self
}
/// Stores `Some(policy)` in `config.repl_diskless_load` and returns the builder.
pub fn repl_diskless_load(mut self, policy: ReplDisklessLoad) -> Self {
    let cfg = &mut self.config;
    cfg.repl_diskless_load = Some(policy);
    self
}
/// Stores `Some(enable)` in `config.repl_diskless_sync` and returns the builder.
pub fn repl_diskless_sync(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.repl_diskless_sync = Some(enable);
    self
}
/// Stores `Some(seconds)` in `config.repl_diskless_sync_delay` and returns the builder.
pub fn repl_diskless_sync_delay(mut self, seconds: u32) -> Self {
    let cfg = &mut self.config;
    cfg.repl_diskless_sync_delay = Some(seconds);
    self
}
/// Stores `Some(n)` in `config.repl_diskless_sync_max_replicas` and returns the builder.
pub fn repl_diskless_sync_max_replicas(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.repl_diskless_sync_max_replicas = Some(n);
    self
}
/// Stores `Some(seconds)` in `config.repl_ping_replica_period` and returns the builder.
pub fn repl_ping_replica_period(mut self, seconds: u32) -> Self {
    let cfg = &mut self.config;
    cfg.repl_ping_replica_period = Some(seconds);
    self
}
/// Stores `Some(seconds)` in `config.repl_timeout` and returns the builder.
pub fn repl_timeout(mut self, seconds: u32) -> Self {
    let cfg = &mut self.config;
    cfg.repl_timeout = Some(seconds);
    self
}
/// Converts `ip` into a `String`, stores it as `config.replica_announce_ip`, and returns the builder.
pub fn replica_announce_ip(mut self, ip: impl Into<String>) -> Self {
    let ip: String = ip.into();
    self.config.replica_announce_ip = Some(ip);
    self
}
/// Stores `Some(port)` in `config.replica_announce_port` and returns the builder.
pub fn replica_announce_port(mut self, port: u16) -> Self {
    let cfg = &mut self.config;
    cfg.replica_announce_port = Some(port);
    self
}
/// Stores `Some(announced)` in `config.replica_announced` and returns the builder.
pub fn replica_announced(mut self, announced: bool) -> Self {
    let cfg = &mut self.config;
    cfg.replica_announced = Some(announced);
    self
}
/// Converts `size` into a `String`, stores it as `config.replica_full_sync_buffer_limit`, and returns the builder.
pub fn replica_full_sync_buffer_limit(mut self, size: impl Into<String>) -> Self {
    let size: String = size.into();
    self.config.replica_full_sync_buffer_limit = Some(size);
    self
}
/// Stores `Some(ignore)` in `config.replica_ignore_disk_write_errors` and returns the builder.
pub fn replica_ignore_disk_write_errors(mut self, ignore: bool) -> Self {
    let cfg = &mut self.config;
    cfg.replica_ignore_disk_write_errors = Some(ignore);
    self
}
/// Stores `Some(ignore)` in `config.replica_ignore_maxmemory` and returns the builder.
pub fn replica_ignore_maxmemory(mut self, ignore: bool) -> Self {
    let cfg = &mut self.config;
    cfg.replica_ignore_maxmemory = Some(ignore);
    self
}
/// Stores `Some(enable)` in `config.replica_lazy_flush` and returns the builder.
pub fn replica_lazy_flush(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.replica_lazy_flush = Some(enable);
    self
}
/// Stores `Some(priority)` in `config.replica_priority` and returns the builder.
pub fn replica_priority(mut self, priority: u32) -> Self {
    let cfg = &mut self.config;
    cfg.replica_priority = Some(priority);
    self
}
/// Stores `Some(read_only)` in `config.replica_read_only` and returns the builder.
pub fn replica_read_only(mut self, read_only: bool) -> Self {
    let cfg = &mut self.config;
    cfg.replica_read_only = Some(read_only);
    self
}
/// Stores `Some(serve)` in `config.replica_serve_stale_data` and returns the builder.
pub fn replica_serve_stale_data(mut self, serve: bool) -> Self {
    let cfg = &mut self.config;
    cfg.replica_serve_stale_data = Some(serve);
    self
}
/// Stores `Some(n)` in `config.min_replicas_to_write` and returns the builder.
pub fn min_replicas_to_write(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.min_replicas_to_write = Some(n);
    self
}
/// Stores `Some(seconds)` in `config.min_replicas_max_lag` and returns the builder.
pub fn min_replicas_max_lag(mut self, seconds: u32) -> Self {
    let cfg = &mut self.config;
    cfg.min_replicas_max_lag = Some(seconds);
    self
}
/// Converts `password` into a `String`, stores it as `config.password`, and returns the builder.
pub fn password(mut self, password: impl Into<String>) -> Self {
    let password: String = password.into();
    self.config.password = Some(password);
    self
}
/// Converts `path` into a `PathBuf`, stores it as `config.acl_file`, and returns the builder.
pub fn acl_file(mut self, path: impl Into<PathBuf>) -> Self {
    let path: PathBuf = path.into();
    self.config.acl_file = Some(path);
    self
}
/// Overwrites `config.cluster_enabled` with `enabled` and returns the builder.
pub fn cluster_enabled(mut self, enabled: bool) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_enabled = enabled;
    self
}
/// Stores `Some(ms)` in `config.cluster_node_timeout` and returns the builder.
pub fn cluster_node_timeout(mut self, ms: u64) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_node_timeout = Some(ms);
    self
}
/// Converts `path` into a `PathBuf`, stores it as `config.cluster_config_file`, and returns the builder.
pub fn cluster_config_file(mut self, path: impl Into<PathBuf>) -> Self {
    let path: PathBuf = path.into();
    self.config.cluster_config_file = Some(path);
    self
}
/// Stores `Some(require)` in `config.cluster_require_full_coverage` and returns the builder.
pub fn cluster_require_full_coverage(mut self, require: bool) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_require_full_coverage = Some(require);
    self
}
/// Stores `Some(allow)` in `config.cluster_allow_reads_when_down` and returns the builder.
pub fn cluster_allow_reads_when_down(mut self, allow: bool) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_allow_reads_when_down = Some(allow);
    self
}
/// Stores `Some(allow)` in `config.cluster_allow_pubsubshard_when_down` and returns the builder.
pub fn cluster_allow_pubsubshard_when_down(mut self, allow: bool) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_allow_pubsubshard_when_down = Some(allow);
    self
}
/// Stores `Some(allow)` in `config.cluster_allow_replica_migration` and returns the builder.
pub fn cluster_allow_replica_migration(mut self, allow: bool) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_allow_replica_migration = Some(allow);
    self
}
/// Stores `Some(barrier)` in `config.cluster_migration_barrier` and returns the builder.
pub fn cluster_migration_barrier(mut self, barrier: u32) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_migration_barrier = Some(barrier);
    self
}
/// Stores `Some(no_failover)` in `config.cluster_replica_no_failover` and returns the builder.
pub fn cluster_replica_no_failover(mut self, no_failover: bool) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_replica_no_failover = Some(no_failover);
    self
}
/// Stores `Some(factor)` in `config.cluster_replica_validity_factor` and returns the builder.
pub fn cluster_replica_validity_factor(mut self, factor: u32) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_replica_validity_factor = Some(factor);
    self
}
/// Converts `ip` into a `String`, stores it as `config.cluster_announce_ip`, and returns the builder.
pub fn cluster_announce_ip(mut self, ip: impl Into<String>) -> Self {
    let ip: String = ip.into();
    self.config.cluster_announce_ip = Some(ip);
    self
}
/// Stores `Some(port)` in `config.cluster_announce_port` and returns the builder.
pub fn cluster_announce_port(mut self, port: u16) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_announce_port = Some(port);
    self
}
/// Stores `Some(port)` in `config.cluster_announce_bus_port` and returns the builder.
pub fn cluster_announce_bus_port(mut self, port: u16) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_announce_bus_port = Some(port);
    self
}
/// Stores `Some(port)` in `config.cluster_announce_tls_port` and returns the builder.
pub fn cluster_announce_tls_port(mut self, port: u16) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_announce_tls_port = Some(port);
    self
}
/// Converts `hostname` into a `String`, stores it as `config.cluster_announce_hostname`, and returns the builder.
pub fn cluster_announce_hostname(mut self, hostname: impl Into<String>) -> Self {
    let hostname: String = hostname.into();
    self.config.cluster_announce_hostname = Some(hostname);
    self
}
/// Converts `name` into a `String`, stores it as `config.cluster_announce_human_nodename`, and returns the builder.
pub fn cluster_announce_human_nodename(mut self, name: impl Into<String>) -> Self {
    let name: String = name.into();
    self.config.cluster_announce_human_nodename = Some(name);
    self
}
/// Stores `Some(port)` in `config.cluster_port` and returns the builder.
pub fn cluster_port(mut self, port: u16) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_port = Some(port);
    self
}
/// Converts `endpoint_type` into a `String`, stores it as
/// `config.cluster_preferred_endpoint_type`, and returns the builder.
pub fn cluster_preferred_endpoint_type(mut self, endpoint_type: impl Into<String>) -> Self {
    let endpoint_type: String = endpoint_type.into();
    self.config.cluster_preferred_endpoint_type = Some(endpoint_type);
    self
}
/// Stores `Some(limit)` in `config.cluster_link_sendbuf_limit` and returns the builder.
pub fn cluster_link_sendbuf_limit(mut self, limit: u64) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_link_sendbuf_limit = Some(limit);
    self
}
/// Stores `Some(ratio)` in `config.cluster_compatibility_sample_ratio` and returns the builder.
pub fn cluster_compatibility_sample_ratio(mut self, ratio: u32) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_compatibility_sample_ratio = Some(ratio);
    self
}
/// Stores `Some(bytes)` in `config.cluster_slot_migration_handoff_max_lag_bytes` and returns the builder.
pub fn cluster_slot_migration_handoff_max_lag_bytes(mut self, bytes: u64) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_slot_migration_handoff_max_lag_bytes = Some(bytes);
    self
}
/// Stores `Some(ms)` in `config.cluster_slot_migration_write_pause_timeout` and returns the builder.
pub fn cluster_slot_migration_write_pause_timeout(mut self, ms: u64) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_slot_migration_write_pause_timeout = Some(ms);
    self
}
/// Stores `Some(enable)` in `config.cluster_slot_stats_enabled` and returns the builder.
pub fn cluster_slot_stats_enabled(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.cluster_slot_stats_enabled = Some(enable);
    self
}
/// Stores `Some(n)` in `config.hash_max_listpack_entries` and returns the builder.
pub fn hash_max_listpack_entries(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.hash_max_listpack_entries = Some(n);
    self
}
/// Stores `Some(n)` in `config.hash_max_listpack_value` and returns the builder.
pub fn hash_max_listpack_value(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.hash_max_listpack_value = Some(n);
    self
}
/// Stores `Some(n)` in `config.list_max_listpack_size` and returns the builder.
///
/// Unlike the neighbouring limits this one is signed (`i32`).
pub fn list_max_listpack_size(mut self, n: i32) -> Self {
    let cfg = &mut self.config;
    cfg.list_max_listpack_size = Some(n);
    self
}
/// Stores `Some(n)` in `config.list_compress_depth` and returns the builder.
pub fn list_compress_depth(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.list_compress_depth = Some(n);
    self
}
/// Stores `Some(n)` in `config.set_max_intset_entries` and returns the builder.
pub fn set_max_intset_entries(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.set_max_intset_entries = Some(n);
    self
}
/// Stores `Some(n)` in `config.set_max_listpack_entries` and returns the builder.
pub fn set_max_listpack_entries(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.set_max_listpack_entries = Some(n);
    self
}
/// Stores `Some(n)` in `config.set_max_listpack_value` and returns the builder.
pub fn set_max_listpack_value(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.set_max_listpack_value = Some(n);
    self
}
/// Stores `Some(n)` in `config.zset_max_listpack_entries` and returns the builder.
pub fn zset_max_listpack_entries(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.zset_max_listpack_entries = Some(n);
    self
}
/// Stores `Some(n)` in `config.zset_max_listpack_value` and returns the builder.
pub fn zset_max_listpack_value(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.zset_max_listpack_value = Some(n);
    self
}
/// Stores `Some(n)` in `config.hll_sparse_max_bytes` and returns the builder.
pub fn hll_sparse_max_bytes(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.hll_sparse_max_bytes = Some(n);
    self
}
/// Stores `Some(n)` in `config.stream_node_max_bytes` and returns the builder.
pub fn stream_node_max_bytes(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.stream_node_max_bytes = Some(n);
    self
}
/// Stores `Some(n)` in `config.stream_node_max_entries` and returns the builder.
pub fn stream_node_max_entries(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.stream_node_max_entries = Some(n);
    self
}
/// Stores `Some(ms)` in `config.stream_idmp_duration` and returns the builder.
pub fn stream_idmp_duration(mut self, ms: u64) -> Self {
    let cfg = &mut self.config;
    cfg.stream_idmp_duration = Some(ms);
    self
}
/// Stores `Some(n)` in `config.stream_idmp_maxsize` and returns the builder.
pub fn stream_idmp_maxsize(mut self, n: u64) -> Self {
    let cfg = &mut self.config;
    cfg.stream_idmp_maxsize = Some(n);
    self
}
/// Appends `path` (converted into a `PathBuf`) to `config.loadmodule` and returns the builder.
///
/// Each call adds one entry; earlier entries are kept.
pub fn loadmodule(mut self, path: impl Into<PathBuf>) -> Self {
    let module: PathBuf = path.into();
    self.config.loadmodule.push(module);
    self
}
/// Stores `Some(hz)` in `config.hz` and returns the builder.
pub fn hz(mut self, hz: u32) -> Self {
    let cfg = &mut self.config;
    cfg.hz = Some(hz);
    self
}
/// Stores `Some(n)` in `config.io_threads` and returns the builder.
pub fn io_threads(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.io_threads = Some(n);
    self
}
/// Stores `Some(enable)` in `config.io_threads_do_reads` and returns the builder.
pub fn io_threads_do_reads(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.io_threads_do_reads = Some(enable);
    self
}
/// Converts `events` into a `String`, stores it as `config.notify_keyspace_events`, and returns the builder.
pub fn notify_keyspace_events(mut self, events: impl Into<String>) -> Self {
    let events: String = events.into();
    self.config.notify_keyspace_events = Some(events);
    self
}
/// Stores `Some(us)` in `config.slowlog_log_slower_than` and returns the builder.
///
/// The value is signed (`i64`), so negative inputs are accepted as-is.
pub fn slowlog_log_slower_than(mut self, us: i64) -> Self {
    let cfg = &mut self.config;
    cfg.slowlog_log_slower_than = Some(us);
    self
}
/// Stores `Some(n)` in `config.slowlog_max_len` and returns the builder.
pub fn slowlog_max_len(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.slowlog_max_len = Some(n);
    self
}
/// Stores `Some(ms)` in `config.latency_monitor_threshold` and returns the builder.
pub fn latency_monitor_threshold(mut self, ms: u64) -> Self {
    let cfg = &mut self.config;
    cfg.latency_monitor_threshold = Some(ms);
    self
}
/// Stores `Some(enable)` in `config.latency_tracking` and returns the builder.
pub fn latency_tracking(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.latency_tracking = Some(enable);
    self
}
/// Converts `percentiles` into a `String`, stores it as
/// `config.latency_tracking_info_percentiles`, and returns the builder.
pub fn latency_tracking_info_percentiles(mut self, percentiles: impl Into<String>) -> Self {
    let percentiles: String = percentiles.into();
    self.config.latency_tracking_info_percentiles = Some(percentiles);
    self
}
/// Stores `Some(enable)` in `config.activedefrag` and returns the builder.
pub fn activedefrag(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.activedefrag = Some(enable);
    self
}
/// Converts `bytes` into a `String`, stores it as `config.active_defrag_ignore_bytes`, and returns the builder.
pub fn active_defrag_ignore_bytes(mut self, bytes: impl Into<String>) -> Self {
    let bytes: String = bytes.into();
    self.config.active_defrag_ignore_bytes = Some(bytes);
    self
}
/// Stores `Some(pct)` in `config.active_defrag_threshold_lower` and returns the builder.
pub fn active_defrag_threshold_lower(mut self, pct: u32) -> Self {
    let cfg = &mut self.config;
    cfg.active_defrag_threshold_lower = Some(pct);
    self
}
/// Stores `Some(pct)` in `config.active_defrag_threshold_upper` and returns the builder.
pub fn active_defrag_threshold_upper(mut self, pct: u32) -> Self {
    let cfg = &mut self.config;
    cfg.active_defrag_threshold_upper = Some(pct);
    self
}
/// Stores `Some(pct)` in `config.active_defrag_cycle_min` and returns the builder.
pub fn active_defrag_cycle_min(mut self, pct: u32) -> Self {
    let cfg = &mut self.config;
    cfg.active_defrag_cycle_min = Some(pct);
    self
}
/// Stores `Some(pct)` in `config.active_defrag_cycle_max` and returns the builder.
pub fn active_defrag_cycle_max(mut self, pct: u32) -> Self {
    let cfg = &mut self.config;
    cfg.active_defrag_cycle_max = Some(pct);
    self
}
/// Stores `Some(n)` in `config.active_defrag_max_scan_fields` and returns the builder.
pub fn active_defrag_max_scan_fields(mut self, n: u32) -> Self {
    let cfg = &mut self.config;
    cfg.active_defrag_max_scan_fields = Some(n);
    self
}
/// Stores `Some(enable)` in `config.syslog_enabled` and returns the builder.
pub fn syslog_enabled(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.syslog_enabled = Some(enable);
    self
}
/// Converts `ident` into a `String`, stores it as `config.syslog_ident`, and returns the builder.
pub fn syslog_ident(mut self, ident: impl Into<String>) -> Self {
    let ident: String = ident.into();
    self.config.syslog_ident = Some(ident);
    self
}
/// Converts `facility` into a `String`, stores it as `config.syslog_facility`, and returns the builder.
pub fn syslog_facility(mut self, facility: impl Into<String>) -> Self {
    let facility: String = facility.into();
    self.config.syslog_facility = Some(facility);
    self
}
/// Converts `mode` into a `String`, stores it as `config.supervised`, and returns the builder.
pub fn supervised(mut self, mode: impl Into<String>) -> Self {
    let mode: String = mode.into();
    self.config.supervised = Some(mode);
    self
}
/// Stores `Some(enable)` in `config.always_show_logo` and returns the builder.
pub fn always_show_logo(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.always_show_logo = Some(enable);
    self
}
/// Stores `Some(enable)` in `config.set_proc_title` and returns the builder.
pub fn set_proc_title(mut self, enable: bool) -> Self {
    let cfg = &mut self.config;
    cfg.set_proc_title = Some(enable);
    self
}
/// Converts `template` into a `String`, stores it as `config.proc_title_template`, and returns the builder.
pub fn proc_title_template(mut self, template: impl Into<String>) -> Self {
    let template: String = template.into();
    self.config.proc_title_template = Some(template);
    self
}
/// Converts `default` into a `String`, stores it as `config.acl_pubsub_default`, and returns the builder.
pub fn acl_pubsub_default(mut self, default: impl Into<String>) -> Self {
    let default: String = default.into();
    self.config.acl_pubsub_default = Some(default);
    self
}
/// Sets the `acllog-max-len` directive in the generated config.
pub fn acllog_max_len(mut self, n: u32) -> Self {
    let config = &mut self.config;
    config.acllog_max_len = Some(n);
    self
}
/// Sets the `enable-debug-command` directive; the string is written verbatim.
pub fn enable_debug_command(mut self, mode: impl Into<String>) -> Self {
    let mode: String = mode.into();
    self.config.enable_debug_command = Some(mode);
    self
}
/// Sets the `enable-module-command` directive; the string is written verbatim.
pub fn enable_module_command(mut self, mode: impl Into<String>) -> Self {
    let mode: String = mode.into();
    self.config.enable_module_command = Some(mode);
    self
}
/// Sets the `enable-protected-configs` directive; the string is written verbatim.
pub fn enable_protected_configs(mut self, mode: impl Into<String>) -> Self {
    let mode: String = mode.into();
    self.config.enable_protected_configs = Some(mode);
    self
}
/// Adds one `rename-command <command> "<new_name>"` line to the generated
/// config. Each call appends another entry.
pub fn rename_command(
    mut self,
    command: impl Into<String>,
    new_name: impl Into<String>,
) -> Self {
    let entry = (command.into(), new_name.into());
    self.config.rename_command.push(entry);
    self
}
/// Sets the `sanitize-dump-payload` directive; the string is written verbatim.
pub fn sanitize_dump_payload(mut self, mode: impl Into<String>) -> Self {
    let mode: String = mode.into();
    self.config.sanitize_dump_payload = Some(mode);
    self
}
/// Sets `hide-user-data-from-log`, written to the generated config as `yes` or `no`.
pub fn hide_user_data_from_log(mut self, enable: bool) -> Self {
    let config = &mut self.config;
    config.hide_user_data_from_log = Some(enable);
    self
}
/// Sets the `bind-source-addr` directive; the string is written verbatim.
pub fn bind_source_addr(mut self, addr: impl Into<String>) -> Self {
    let addr: String = addr.into();
    self.config.bind_source_addr = Some(addr);
    self
}
/// Sets the `busy-reply-threshold` directive in the generated config.
pub fn busy_reply_threshold(mut self, ms: u64) -> Self {
    let config = &mut self.config;
    config.busy_reply_threshold = Some(ms);
    self
}
/// Adds one `client-output-buffer-limit <limit>` line to the generated config.
/// Each call appends another entry.
pub fn client_output_buffer_limit(mut self, limit: impl Into<String>) -> Self {
    let limit: String = limit.into();
    self.config.client_output_buffer_limit.push(limit);
    self
}
/// Sets the `client-query-buffer-limit` directive; the string is written verbatim.
pub fn client_query_buffer_limit(mut self, limit: impl Into<String>) -> Self {
    let limit: String = limit.into();
    self.config.client_query_buffer_limit = Some(limit);
    self
}
/// Sets the `proto-max-bulk-len` directive; the string is written verbatim.
pub fn proto_max_bulk_len(mut self, len: impl Into<String>) -> Self {
    let len: String = len.into();
    self.config.proto_max_bulk_len = Some(len);
    self
}
/// Sets the `max-new-connections-per-cycle` directive in the generated config.
pub fn max_new_connections_per_cycle(mut self, n: u32) -> Self {
    let config = &mut self.config;
    config.max_new_connections_per_cycle = Some(n);
    self
}
/// Sets the `max-new-tls-connections-per-cycle` directive in the generated config.
pub fn max_new_tls_connections_per_cycle(mut self, n: u32) -> Self {
    let config = &mut self.config;
    config.max_new_tls_connections_per_cycle = Some(n);
    self
}
/// Sets the `socket-mark-id` directive in the generated config.
pub fn socket_mark_id(mut self, id: u32) -> Self {
    let config = &mut self.config;
    config.socket_mark_id = Some(id);
    self
}
/// Sets the `dbfilename` directive; the name is written verbatim (unquoted).
pub fn dbfilename(mut self, name: impl Into<String>) -> Self {
    let name: String = name.into();
    self.config.dbfilename = Some(name);
    self
}
/// Sets `rdbcompression`, written to the generated config as `yes` or `no`.
pub fn rdbcompression(mut self, enable: bool) -> Self {
    let config = &mut self.config;
    config.rdbcompression = Some(enable);
    self
}
/// Sets `rdbchecksum`, written to the generated config as `yes` or `no`.
pub fn rdbchecksum(mut self, enable: bool) -> Self {
    let config = &mut self.config;
    config.rdbchecksum = Some(enable);
    self
}
/// Sets `rdb-save-incremental-fsync`, written to the generated config as
/// `yes` or `no`.
pub fn rdb_save_incremental_fsync(mut self, enable: bool) -> Self {
    let config = &mut self.config;
    config.rdb_save_incremental_fsync = Some(enable);
    self
}
/// Sets `rdb-del-sync-files`, written to the generated config as `yes` or `no`.
pub fn rdb_del_sync_files(mut self, enable: bool) -> Self {
    let config = &mut self.config;
    config.rdb_del_sync_files = Some(enable);
    self
}
/// Sets `stop-writes-on-bgsave-error`, written to the generated config as
/// `yes` or `no`.
pub fn stop_writes_on_bgsave_error(mut self, enable: bool) -> Self {
    let config = &mut self.config;
    config.stop_writes_on_bgsave_error = Some(enable);
    self
}
/// Sets the `shutdown-on-sigint` directive; the string is written verbatim.
pub fn shutdown_on_sigint(mut self, behavior: impl Into<String>) -> Self {
    let behavior: String = behavior.into();
    self.config.shutdown_on_sigint = Some(behavior);
    self
}
/// Sets the `shutdown-on-sigterm` directive; the string is written verbatim.
pub fn shutdown_on_sigterm(mut self, behavior: impl Into<String>) -> Self {
    let behavior: String = behavior.into();
    self.config.shutdown_on_sigterm = Some(behavior);
    self
}
/// Sets the `shutdown-timeout` directive in the generated config.
pub fn shutdown_timeout(mut self, seconds: u32) -> Self {
    let config = &mut self.config;
    config.shutdown_timeout = Some(seconds);
    self
}
/// Sets `activerehashing`, written to the generated config as `yes` or `no`.
pub fn activerehashing(mut self, enable: bool) -> Self {
    let config = &mut self.config;
    config.activerehashing = Some(enable);
    self
}
/// Sets `crash-log-enabled`, written to the generated config as `yes` or `no`.
pub fn crash_log_enabled(mut self, enable: bool) -> Self {
    let config = &mut self.config;
    config.crash_log_enabled = Some(enable);
    self
}
/// Sets `crash-memcheck-enabled`, written to the generated config as `yes` or `no`.
pub fn crash_memcheck_enabled(mut self, enable: bool) -> Self {
    let config = &mut self.config;
    config.crash_memcheck_enabled = Some(enable);
    self
}
/// Sets `disable-thp`, written to the generated config as `yes` or `no`.
pub fn disable_thp(mut self, enable: bool) -> Self {
    let config = &mut self.config;
    config.disable_thp = Some(enable);
    self
}
/// Sets `dynamic-hz`, written to the generated config as `yes` or `no`.
pub fn dynamic_hz(mut self, enable: bool) -> Self {
    let config = &mut self.config;
    config.dynamic_hz = Some(enable);
    self
}
/// Sets the `ignore-warnings` directive; the string is written verbatim.
pub fn ignore_warnings(mut self, warning: impl Into<String>) -> Self {
    let warning: String = warning.into();
    self.config.ignore_warnings = Some(warning);
    self
}
/// Adds one `include <path>` line to the generated config. Each call appends
/// another entry.
pub fn include(mut self, path: impl Into<PathBuf>) -> Self {
    let path: PathBuf = path.into();
    self.config.include.push(path);
    self
}
/// Sets `jemalloc-bg-thread`, written to the generated config as `yes` or `no`.
pub fn jemalloc_bg_thread(mut self, enable: bool) -> Self {
    let config = &mut self.config;
    config.jemalloc_bg_thread = Some(enable);
    self
}
/// Sets the `locale-collate` directive; the string is written verbatim.
pub fn locale_collate(mut self, locale: impl Into<String>) -> Self {
    let locale: String = locale.into();
    self.config.locale_collate = Some(locale);
    self
}
/// Sets the `lua-time-limit` directive in the generated config.
pub fn lua_time_limit(mut self, ms: u64) -> Self {
    let config = &mut self.config;
    config.lua_time_limit = Some(ms);
    self
}
/// Sets the `oom-score-adj` directive; the string is written verbatim.
pub fn oom_score_adj(mut self, mode: impl Into<String>) -> Self {
    let mode: String = mode.into();
    self.config.oom_score_adj = Some(mode);
    self
}
/// Sets the `oom-score-adj-values` directive; the string is written verbatim.
pub fn oom_score_adj_values(mut self, values: impl Into<String>) -> Self {
    let values: String = values.into();
    self.config.oom_score_adj_values = Some(values);
    self
}
/// Sets the `propagation-error-behavior` directive; the string is written verbatim.
pub fn propagation_error_behavior(mut self, behavior: impl Into<String>) -> Self {
    let behavior: String = behavior.into();
    self.config.propagation_error_behavior = Some(behavior);
    self
}
/// Sets the `tracking-table-max-keys` directive in the generated config.
pub fn tracking_table_max_keys(mut self, n: u64) -> Self {
    let config = &mut self.config;
    config.tracking_table_max_keys = Some(n);
    self
}
/// Sets the server binary that `start` looks up with `which` and launches.
pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
    let bin: String = bin.into();
    self.config.redis_server_bin = bin;
    self
}
/// Sets the CLI binary that `start` looks up with `which` and hands to the
/// `RedisCli` it builds for the server.
pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
    let bin: String = bin.into();
    self.config.redis_cli_bin = bin;
    self
}
/// Stores an arbitrary `key value` pair that is written to the generated
/// config after all named directives.
pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
    let (key, value) = (key.into(), value.into());
    self.config.extra.insert(key, value);
    self
}
pub async fn start(self) -> Result<RedisServerHandle> {
if which::which(&self.config.redis_server_bin).is_err() {
return Err(Error::BinaryNotFound {
binary: self.config.redis_server_bin.clone(),
});
}
if which::which(&self.config.redis_cli_bin).is_err() {
return Err(Error::BinaryNotFound {
binary: self.config.redis_cli_bin.clone(),
});
}
let node_dir = self.config.dir.join(format!("node-{}", self.config.port));
fs::create_dir_all(&node_dir)?;
let conf_path = node_dir.join("redis.conf");
let conf_content = self.generate_config(&node_dir);
fs::write(&conf_path, conf_content)?;
let status = Command::new(&self.config.redis_server_bin)
.arg(&conf_path)
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status()
.await?;
if !status.success() {
return Err(Error::ServerStart {
port: self.config.port,
});
}
let mut cli = RedisCli::new()
.bin(&self.config.redis_cli_bin)
.host(&self.config.bind)
.port(self.config.port);
if let Some(ref pw) = self.config.password {
cli = cli.password(pw);
}
cli.wait_for_ready(Duration::from_secs(10)).await?;
let pid_path = node_dir.join("redis.pid");
let pid: u32 = fs::read_to_string(&pid_path)
.map_err(Error::Io)?
.trim()
.parse()
.map_err(|_| Error::ServerStart {
port: self.config.port,
})?;
Ok(RedisServerHandle {
config: self.config,
cli,
pid,
detached: false,
})
}
fn generate_config(&self, node_dir: &std::path::Path) -> String {
let yn = |b: bool| if b { "yes" } else { "no" };
let mut conf = format!(
"port {port}\n\
bind {bind}\n\
daemonize {daemonize}\n\
pidfile {dir}/redis.pid\n\
dir {dir}\n\
loglevel {level}\n\
protected-mode {protected}\n",
port = self.config.port,
bind = self.config.bind,
daemonize = yn(self.config.daemonize),
dir = node_dir.display(),
level = self.config.loglevel,
protected = yn(self.config.protected_mode),
);
let logfile = self
.config
.logfile
.as_deref()
.map(str::to_owned)
.unwrap_or_else(|| format!("{}/redis.log", node_dir.display()));
conf.push_str(&format!("logfile {logfile}\n"));
if let Some(backlog) = self.config.tcp_backlog {
conf.push_str(&format!("tcp-backlog {backlog}\n"));
}
if let Some(ref path) = self.config.unixsocket {
conf.push_str(&format!("unixsocket {}\n", path.display()));
}
if let Some(perm) = self.config.unixsocketperm {
conf.push_str(&format!("unixsocketperm {perm}\n"));
}
if let Some(t) = self.config.timeout {
conf.push_str(&format!("timeout {t}\n"));
}
if let Some(ka) = self.config.tcp_keepalive {
conf.push_str(&format!("tcp-keepalive {ka}\n"));
}
if let Some(port) = self.config.tls_port {
conf.push_str(&format!("tls-port {port}\n"));
}
if let Some(ref path) = self.config.tls_cert_file {
conf.push_str(&format!("tls-cert-file {}\n", path.display()));
}
if let Some(ref path) = self.config.tls_key_file {
conf.push_str(&format!("tls-key-file {}\n", path.display()));
}
if let Some(ref pass) = self.config.tls_key_file_pass {
conf.push_str(&format!("tls-key-file-pass {pass}\n"));
}
if let Some(ref path) = self.config.tls_ca_cert_file {
conf.push_str(&format!("tls-ca-cert-file {}\n", path.display()));
}
if let Some(ref path) = self.config.tls_ca_cert_dir {
conf.push_str(&format!("tls-ca-cert-dir {}\n", path.display()));
}
if let Some(auth) = self.config.tls_auth_clients {
conf.push_str(&format!("tls-auth-clients {}\n", yn(auth)));
}
if let Some(ref path) = self.config.tls_client_cert_file {
conf.push_str(&format!("tls-client-cert-file {}\n", path.display()));
}
if let Some(ref path) = self.config.tls_client_key_file {
conf.push_str(&format!("tls-client-key-file {}\n", path.display()));
}
if let Some(ref pass) = self.config.tls_client_key_file_pass {
conf.push_str(&format!("tls-client-key-file-pass {pass}\n"));
}
if let Some(ref path) = self.config.tls_dh_params_file {
conf.push_str(&format!("tls-dh-params-file {}\n", path.display()));
}
if let Some(ref ciphers) = self.config.tls_ciphers {
conf.push_str(&format!("tls-ciphers {ciphers}\n"));
}
if let Some(ref suites) = self.config.tls_ciphersuites {
conf.push_str(&format!("tls-ciphersuites {suites}\n"));
}
if let Some(ref protocols) = self.config.tls_protocols {
conf.push_str(&format!("tls-protocols {protocols}\n"));
}
if let Some(v) = self.config.tls_prefer_server_ciphers {
conf.push_str(&format!("tls-prefer-server-ciphers {}\n", yn(v)));
}
if let Some(v) = self.config.tls_session_caching {
conf.push_str(&format!("tls-session-caching {}\n", yn(v)));
}
if let Some(size) = self.config.tls_session_cache_size {
conf.push_str(&format!("tls-session-cache-size {size}\n"));
}
if let Some(timeout) = self.config.tls_session_cache_timeout {
conf.push_str(&format!("tls-session-cache-timeout {timeout}\n"));
}
if let Some(v) = self.config.tls_replication {
conf.push_str(&format!("tls-replication {}\n", yn(v)));
}
if let Some(v) = self.config.tls_cluster {
conf.push_str(&format!("tls-cluster {}\n", yn(v)));
}
if let Some(n) = self.config.databases {
conf.push_str(&format!("databases {n}\n"));
}
if let Some(ref limit) = self.config.maxmemory {
conf.push_str(&format!("maxmemory {limit}\n"));
}
if let Some(ref policy) = self.config.maxmemory_policy {
conf.push_str(&format!("maxmemory-policy {policy}\n"));
}
if let Some(n) = self.config.maxmemory_samples {
conf.push_str(&format!("maxmemory-samples {n}\n"));
}
if let Some(ref limit) = self.config.maxmemory_clients {
conf.push_str(&format!("maxmemory-clients {limit}\n"));
}
if let Some(n) = self.config.maxmemory_eviction_tenacity {
conf.push_str(&format!("maxmemory-eviction-tenacity {n}\n"));
}
if let Some(n) = self.config.maxclients {
conf.push_str(&format!("maxclients {n}\n"));
}
if let Some(n) = self.config.lfu_log_factor {
conf.push_str(&format!("lfu-log-factor {n}\n"));
}
if let Some(n) = self.config.lfu_decay_time {
conf.push_str(&format!("lfu-decay-time {n}\n"));
}
if let Some(n) = self.config.active_expire_effort {
conf.push_str(&format!("active-expire-effort {n}\n"));
}
if let Some(v) = self.config.lazyfree_lazy_eviction {
conf.push_str(&format!("lazyfree-lazy-eviction {}\n", yn(v)));
}
if let Some(v) = self.config.lazyfree_lazy_expire {
conf.push_str(&format!("lazyfree-lazy-expire {}\n", yn(v)));
}
if let Some(v) = self.config.lazyfree_lazy_server_del {
conf.push_str(&format!("lazyfree-lazy-server-del {}\n", yn(v)));
}
if let Some(v) = self.config.lazyfree_lazy_user_del {
conf.push_str(&format!("lazyfree-lazy-user-del {}\n", yn(v)));
}
if let Some(v) = self.config.lazyfree_lazy_user_flush {
conf.push_str(&format!("lazyfree-lazy-user-flush {}\n", yn(v)));
}
match &self.config.save {
SavePolicy::Disabled => conf.push_str("save \"\"\n"),
SavePolicy::Default => {}
SavePolicy::Custom(pairs) => {
for (secs, changes) in pairs {
conf.push_str(&format!("save {secs} {changes}\n"));
}
}
}
if self.config.appendonly {
conf.push_str("appendonly yes\n");
}
if let Some(ref policy) = self.config.appendfsync {
conf.push_str(&format!("appendfsync {policy}\n"));
}
if let Some(ref name) = self.config.appendfilename {
conf.push_str(&format!("appendfilename \"{name}\"\n"));
}
if let Some(ref name) = self.config.appenddirname {
conf.push_str(&format!("appenddirname \"{}\"\n", name.display()));
}
if let Some(v) = self.config.aof_use_rdb_preamble {
conf.push_str(&format!("aof-use-rdb-preamble {}\n", yn(v)));
}
if let Some(v) = self.config.aof_load_truncated {
conf.push_str(&format!("aof-load-truncated {}\n", yn(v)));
}
if let Some(ref size) = self.config.aof_load_corrupt_tail_max_size {
conf.push_str(&format!("aof-load-corrupt-tail-max-size {size}\n"));
}
if let Some(v) = self.config.aof_rewrite_incremental_fsync {
conf.push_str(&format!("aof-rewrite-incremental-fsync {}\n", yn(v)));
}
if let Some(v) = self.config.aof_timestamp_enabled {
conf.push_str(&format!("aof-timestamp-enabled {}\n", yn(v)));
}
if let Some(pct) = self.config.auto_aof_rewrite_percentage {
conf.push_str(&format!("auto-aof-rewrite-percentage {pct}\n"));
}
if let Some(ref size) = self.config.auto_aof_rewrite_min_size {
conf.push_str(&format!("auto-aof-rewrite-min-size {size}\n"));
}
if let Some(v) = self.config.no_appendfsync_on_rewrite {
conf.push_str(&format!("no-appendfsync-on-rewrite {}\n", yn(v)));
}
if let Some((ref host, port)) = self.config.replicaof {
conf.push_str(&format!("replicaof {host} {port}\n"));
}
if let Some(ref pw) = self.config.masterauth {
conf.push_str(&format!("masterauth {pw}\n"));
}
if let Some(ref user) = self.config.masteruser {
conf.push_str(&format!("masteruser {user}\n"));
}
if let Some(ref size) = self.config.repl_backlog_size {
conf.push_str(&format!("repl-backlog-size {size}\n"));
}
if let Some(ttl) = self.config.repl_backlog_ttl {
conf.push_str(&format!("repl-backlog-ttl {ttl}\n"));
}
if let Some(v) = self.config.repl_disable_tcp_nodelay {
conf.push_str(&format!("repl-disable-tcp-nodelay {}\n", yn(v)));
}
if let Some(ref policy) = self.config.repl_diskless_load {
conf.push_str(&format!("repl-diskless-load {policy}\n"));
}
if let Some(v) = self.config.repl_diskless_sync {
conf.push_str(&format!("repl-diskless-sync {}\n", yn(v)));
}
if let Some(delay) = self.config.repl_diskless_sync_delay {
conf.push_str(&format!("repl-diskless-sync-delay {delay}\n"));
}
if let Some(n) = self.config.repl_diskless_sync_max_replicas {
conf.push_str(&format!("repl-diskless-sync-max-replicas {n}\n"));
}
if let Some(period) = self.config.repl_ping_replica_period {
conf.push_str(&format!("repl-ping-replica-period {period}\n"));
}
if let Some(t) = self.config.repl_timeout {
conf.push_str(&format!("repl-timeout {t}\n"));
}
if let Some(ref ip) = self.config.replica_announce_ip {
conf.push_str(&format!("replica-announce-ip {ip}\n"));
}
if let Some(port) = self.config.replica_announce_port {
conf.push_str(&format!("replica-announce-port {port}\n"));
}
if let Some(v) = self.config.replica_announced {
conf.push_str(&format!("replica-announced {}\n", yn(v)));
}
if let Some(ref size) = self.config.replica_full_sync_buffer_limit {
conf.push_str(&format!("replica-full-sync-buffer-limit {size}\n"));
}
if let Some(v) = self.config.replica_ignore_disk_write_errors {
conf.push_str(&format!("replica-ignore-disk-write-errors {}\n", yn(v)));
}
if let Some(v) = self.config.replica_ignore_maxmemory {
conf.push_str(&format!("replica-ignore-maxmemory {}\n", yn(v)));
}
if let Some(v) = self.config.replica_lazy_flush {
conf.push_str(&format!("replica-lazy-flush {}\n", yn(v)));
}
if let Some(priority) = self.config.replica_priority {
conf.push_str(&format!("replica-priority {priority}\n"));
}
if let Some(v) = self.config.replica_read_only {
conf.push_str(&format!("replica-read-only {}\n", yn(v)));
}
if let Some(v) = self.config.replica_serve_stale_data {
conf.push_str(&format!("replica-serve-stale-data {}\n", yn(v)));
}
if let Some(n) = self.config.min_replicas_to_write {
conf.push_str(&format!("min-replicas-to-write {n}\n"));
}
if let Some(lag) = self.config.min_replicas_max_lag {
conf.push_str(&format!("min-replicas-max-lag {lag}\n"));
}
if let Some(ref pw) = self.config.password {
conf.push_str(&format!("requirepass {pw}\n"));
}
if let Some(ref path) = self.config.acl_file {
conf.push_str(&format!("aclfile {}\n", path.display()));
}
if self.config.cluster_enabled {
conf.push_str("cluster-enabled yes\n");
if let Some(ref path) = self.config.cluster_config_file {
conf.push_str(&format!("cluster-config-file {}\n", path.display()));
} else {
conf.push_str(&format!(
"cluster-config-file {}/nodes.conf\n",
node_dir.display()
));
}
if let Some(timeout) = self.config.cluster_node_timeout {
conf.push_str(&format!("cluster-node-timeout {timeout}\n"));
}
if let Some(v) = self.config.cluster_require_full_coverage {
conf.push_str(&format!("cluster-require-full-coverage {}\n", yn(v)));
}
if let Some(v) = self.config.cluster_allow_reads_when_down {
conf.push_str(&format!("cluster-allow-reads-when-down {}\n", yn(v)));
}
if let Some(v) = self.config.cluster_allow_pubsubshard_when_down {
conf.push_str(&format!("cluster-allow-pubsubshard-when-down {}\n", yn(v)));
}
if let Some(v) = self.config.cluster_allow_replica_migration {
conf.push_str(&format!("cluster-allow-replica-migration {}\n", yn(v)));
}
if let Some(barrier) = self.config.cluster_migration_barrier {
conf.push_str(&format!("cluster-migration-barrier {barrier}\n"));
}
if let Some(v) = self.config.cluster_replica_no_failover {
conf.push_str(&format!("cluster-replica-no-failover {}\n", yn(v)));
}
if let Some(factor) = self.config.cluster_replica_validity_factor {
conf.push_str(&format!("cluster-replica-validity-factor {factor}\n"));
}
if let Some(ref ip) = self.config.cluster_announce_ip {
conf.push_str(&format!("cluster-announce-ip {ip}\n"));
}
if let Some(port) = self.config.cluster_announce_port {
conf.push_str(&format!("cluster-announce-port {port}\n"));
}
if let Some(port) = self.config.cluster_announce_bus_port {
conf.push_str(&format!("cluster-announce-bus-port {port}\n"));
}
if let Some(port) = self.config.cluster_announce_tls_port {
conf.push_str(&format!("cluster-announce-tls-port {port}\n"));
}
if let Some(ref hostname) = self.config.cluster_announce_hostname {
conf.push_str(&format!("cluster-announce-hostname {hostname}\n"));
}
if let Some(ref name) = self.config.cluster_announce_human_nodename {
conf.push_str(&format!("cluster-announce-human-nodename {name}\n"));
}
if let Some(port) = self.config.cluster_port {
conf.push_str(&format!("cluster-port {port}\n"));
}
if let Some(ref endpoint_type) = self.config.cluster_preferred_endpoint_type {
conf.push_str(&format!(
"cluster-preferred-endpoint-type {endpoint_type}\n"
));
}
if let Some(limit) = self.config.cluster_link_sendbuf_limit {
conf.push_str(&format!("cluster-link-sendbuf-limit {limit}\n"));
}
if let Some(ratio) = self.config.cluster_compatibility_sample_ratio {
conf.push_str(&format!("cluster-compatibility-sample-ratio {ratio}\n"));
}
if let Some(bytes) = self.config.cluster_slot_migration_handoff_max_lag_bytes {
conf.push_str(&format!(
"cluster-slot-migration-handoff-max-lag-bytes {bytes}\n"
));
}
if let Some(ms) = self.config.cluster_slot_migration_write_pause_timeout {
conf.push_str(&format!(
"cluster-slot-migration-write-pause-timeout {ms}\n"
));
}
if let Some(v) = self.config.cluster_slot_stats_enabled {
conf.push_str(&format!("cluster-slot-stats-enabled {}\n", yn(v)));
}
}
if let Some(n) = self.config.hash_max_listpack_entries {
conf.push_str(&format!("hash-max-listpack-entries {n}\n"));
}
if let Some(n) = self.config.hash_max_listpack_value {
conf.push_str(&format!("hash-max-listpack-value {n}\n"));
}
if let Some(n) = self.config.list_max_listpack_size {
conf.push_str(&format!("list-max-listpack-size {n}\n"));
}
if let Some(n) = self.config.list_compress_depth {
conf.push_str(&format!("list-compress-depth {n}\n"));
}
if let Some(n) = self.config.set_max_intset_entries {
conf.push_str(&format!("set-max-intset-entries {n}\n"));
}
if let Some(n) = self.config.set_max_listpack_entries {
conf.push_str(&format!("set-max-listpack-entries {n}\n"));
}
if let Some(n) = self.config.set_max_listpack_value {
conf.push_str(&format!("set-max-listpack-value {n}\n"));
}
if let Some(n) = self.config.zset_max_listpack_entries {
conf.push_str(&format!("zset-max-listpack-entries {n}\n"));
}
if let Some(n) = self.config.zset_max_listpack_value {
conf.push_str(&format!("zset-max-listpack-value {n}\n"));
}
if let Some(n) = self.config.hll_sparse_max_bytes {
conf.push_str(&format!("hll-sparse-max-bytes {n}\n"));
}
if let Some(n) = self.config.stream_node_max_bytes {
conf.push_str(&format!("stream-node-max-bytes {n}\n"));
}
if let Some(n) = self.config.stream_node_max_entries {
conf.push_str(&format!("stream-node-max-entries {n}\n"));
}
if let Some(ms) = self.config.stream_idmp_duration {
conf.push_str(&format!("stream-idmp-duration {ms}\n"));
}
if let Some(n) = self.config.stream_idmp_maxsize {
conf.push_str(&format!("stream-idmp-maxsize {n}\n"));
}
for path in &self.config.loadmodule {
conf.push_str(&format!("loadmodule {}\n", path.display()));
}
if let Some(hz) = self.config.hz {
conf.push_str(&format!("hz {hz}\n"));
}
if let Some(n) = self.config.io_threads {
conf.push_str(&format!("io-threads {n}\n"));
}
if let Some(enable) = self.config.io_threads_do_reads {
conf.push_str(&format!("io-threads-do-reads {}\n", yn(enable)));
}
if let Some(ref events) = self.config.notify_keyspace_events {
conf.push_str(&format!("notify-keyspace-events {events}\n"));
}
if let Some(us) = self.config.slowlog_log_slower_than {
conf.push_str(&format!("slowlog-log-slower-than {us}\n"));
}
if let Some(n) = self.config.slowlog_max_len {
conf.push_str(&format!("slowlog-max-len {n}\n"));
}
if let Some(ms) = self.config.latency_monitor_threshold {
conf.push_str(&format!("latency-monitor-threshold {ms}\n"));
}
if let Some(enable) = self.config.latency_tracking {
conf.push_str(&format!("latency-tracking {}\n", yn(enable)));
}
if let Some(ref pcts) = self.config.latency_tracking_info_percentiles {
conf.push_str(&format!("latency-tracking-info-percentiles \"{pcts}\"\n"));
}
if let Some(enable) = self.config.activedefrag {
conf.push_str(&format!("activedefrag {}\n", yn(enable)));
}
if let Some(ref bytes) = self.config.active_defrag_ignore_bytes {
conf.push_str(&format!("active-defrag-ignore-bytes {bytes}\n"));
}
if let Some(pct) = self.config.active_defrag_threshold_lower {
conf.push_str(&format!("active-defrag-threshold-lower {pct}\n"));
}
if let Some(pct) = self.config.active_defrag_threshold_upper {
conf.push_str(&format!("active-defrag-threshold-upper {pct}\n"));
}
if let Some(pct) = self.config.active_defrag_cycle_min {
conf.push_str(&format!("active-defrag-cycle-min {pct}\n"));
}
if let Some(pct) = self.config.active_defrag_cycle_max {
conf.push_str(&format!("active-defrag-cycle-max {pct}\n"));
}
if let Some(n) = self.config.active_defrag_max_scan_fields {
conf.push_str(&format!("active-defrag-max-scan-fields {n}\n"));
}
if let Some(enable) = self.config.syslog_enabled {
conf.push_str(&format!("syslog-enabled {}\n", yn(enable)));
}
if let Some(ref ident) = self.config.syslog_ident {
conf.push_str(&format!("syslog-ident {ident}\n"));
}
if let Some(ref facility) = self.config.syslog_facility {
conf.push_str(&format!("syslog-facility {facility}\n"));
}
if let Some(ref mode) = self.config.supervised {
conf.push_str(&format!("supervised {mode}\n"));
}
if let Some(enable) = self.config.always_show_logo {
conf.push_str(&format!("always-show-logo {}\n", yn(enable)));
}
if let Some(enable) = self.config.set_proc_title {
conf.push_str(&format!("set-proc-title {}\n", yn(enable)));
}
if let Some(ref template) = self.config.proc_title_template {
conf.push_str(&format!("proc-title-template \"{template}\"\n"));
}
if let Some(ref default) = self.config.acl_pubsub_default {
conf.push_str(&format!("acl-pubsub-default {default}\n"));
}
if let Some(n) = self.config.acllog_max_len {
conf.push_str(&format!("acllog-max-len {n}\n"));
}
if let Some(ref mode) = self.config.enable_debug_command {
conf.push_str(&format!("enable-debug-command {mode}\n"));
}
if let Some(ref mode) = self.config.enable_module_command {
conf.push_str(&format!("enable-module-command {mode}\n"));
}
if let Some(ref mode) = self.config.enable_protected_configs {
conf.push_str(&format!("enable-protected-configs {mode}\n"));
}
for (cmd, new_name) in &self.config.rename_command {
conf.push_str(&format!("rename-command {cmd} \"{new_name}\"\n"));
}
if let Some(ref mode) = self.config.sanitize_dump_payload {
conf.push_str(&format!("sanitize-dump-payload {mode}\n"));
}
if let Some(enable) = self.config.hide_user_data_from_log {
conf.push_str(&format!("hide-user-data-from-log {}\n", yn(enable)));
}
if let Some(ref addr) = self.config.bind_source_addr {
conf.push_str(&format!("bind-source-addr {addr}\n"));
}
if let Some(ms) = self.config.busy_reply_threshold {
conf.push_str(&format!("busy-reply-threshold {ms}\n"));
}
for limit in &self.config.client_output_buffer_limit {
conf.push_str(&format!("client-output-buffer-limit {limit}\n"));
}
if let Some(ref limit) = self.config.client_query_buffer_limit {
conf.push_str(&format!("client-query-buffer-limit {limit}\n"));
}
if let Some(ref len) = self.config.proto_max_bulk_len {
conf.push_str(&format!("proto-max-bulk-len {len}\n"));
}
if let Some(n) = self.config.max_new_connections_per_cycle {
conf.push_str(&format!("max-new-connections-per-cycle {n}\n"));
}
if let Some(n) = self.config.max_new_tls_connections_per_cycle {
conf.push_str(&format!("max-new-tls-connections-per-cycle {n}\n"));
}
if let Some(id) = self.config.socket_mark_id {
conf.push_str(&format!("socket-mark-id {id}\n"));
}
if let Some(ref name) = self.config.dbfilename {
conf.push_str(&format!("dbfilename {name}\n"));
}
if let Some(enable) = self.config.rdbcompression {
conf.push_str(&format!("rdbcompression {}\n", yn(enable)));
}
if let Some(enable) = self.config.rdbchecksum {
conf.push_str(&format!("rdbchecksum {}\n", yn(enable)));
}
if let Some(enable) = self.config.rdb_save_incremental_fsync {
conf.push_str(&format!("rdb-save-incremental-fsync {}\n", yn(enable)));
}
if let Some(enable) = self.config.rdb_del_sync_files {
conf.push_str(&format!("rdb-del-sync-files {}\n", yn(enable)));
}
if let Some(enable) = self.config.stop_writes_on_bgsave_error {
conf.push_str(&format!("stop-writes-on-bgsave-error {}\n", yn(enable)));
}
if let Some(ref behavior) = self.config.shutdown_on_sigint {
conf.push_str(&format!("shutdown-on-sigint {behavior}\n"));
}
if let Some(ref behavior) = self.config.shutdown_on_sigterm {
conf.push_str(&format!("shutdown-on-sigterm {behavior}\n"));
}
if let Some(seconds) = self.config.shutdown_timeout {
conf.push_str(&format!("shutdown-timeout {seconds}\n"));
}
if let Some(enable) = self.config.activerehashing {
conf.push_str(&format!("activerehashing {}\n", yn(enable)));
}
if let Some(enable) = self.config.crash_log_enabled {
conf.push_str(&format!("crash-log-enabled {}\n", yn(enable)));
}
if let Some(enable) = self.config.crash_memcheck_enabled {
conf.push_str(&format!("crash-memcheck-enabled {}\n", yn(enable)));
}
if let Some(enable) = self.config.disable_thp {
conf.push_str(&format!("disable-thp {}\n", yn(enable)));
}
if let Some(enable) = self.config.dynamic_hz {
conf.push_str(&format!("dynamic-hz {}\n", yn(enable)));
}
if let Some(ref warning) = self.config.ignore_warnings {
conf.push_str(&format!("ignore-warnings {warning}\n"));
}
for path in &self.config.include {
conf.push_str(&format!("include {}\n", path.display()));
}
if let Some(enable) = self.config.jemalloc_bg_thread {
conf.push_str(&format!("jemalloc-bg-thread {}\n", yn(enable)));
}
if let Some(ref locale) = self.config.locale_collate {
conf.push_str(&format!("locale-collate {locale}\n"));
}
if let Some(ms) = self.config.lua_time_limit {
conf.push_str(&format!("lua-time-limit {ms}\n"));
}
if let Some(ref mode) = self.config.oom_score_adj {
conf.push_str(&format!("oom-score-adj {mode}\n"));
}
if let Some(ref values) = self.config.oom_score_adj_values {
conf.push_str(&format!("oom-score-adj-values {values}\n"));
}
if let Some(ref behavior) = self.config.propagation_error_behavior {
conf.push_str(&format!("propagation-error-behavior {behavior}\n"));
}
if let Some(n) = self.config.tracking_table_max_keys {
conf.push_str(&format!("tracking-table-max-keys {n}\n"));
}
for (key, value) in &self.config.extra {
conf.push_str(&format!("{key} {value}\n"));
}
conf
}
}
impl Default for RedisServer {
fn default() -> Self {
Self::new()
}
}
/// Handle to a server launched by `RedisServer::start`.
///
/// Dropping the handle calls `stop` unless the handle was detached first.
pub struct RedisServerHandle {
/// Configuration the server was started with.
config: RedisServerConfig,
/// CLI client built in `start` for this server's bind address and port
/// (and password, when one is configured).
cli: RedisCli,
/// Process id parsed from the node's `redis.pid` file at startup.
pid: u32,
/// When `true`, dropping the handle does not stop the server.
detached: bool,
}
impl RedisServerHandle {
pub fn addr(&self) -> String {
format!("{}:{}", self.config.bind, self.config.port)
}
pub fn port(&self) -> u16 {
self.config.port
}
pub fn host(&self) -> &str {
&self.config.bind
}
pub fn pid(&self) -> u32 {
self.pid
}
pub async fn is_alive(&self) -> bool {
self.cli.ping().await
}
pub fn cli(&self) -> &RedisCli {
&self.cli
}
pub async fn run(&self, args: &[&str]) -> Result<String> {
self.cli.run(args).await
}
pub fn detach(mut self) {
self.detached = true;
}
pub fn stop(&self) {
self.cli.shutdown();
}
pub async fn wait_for_ready(&self, timeout: Duration) -> Result<()> {
self.cli.wait_for_ready(timeout).await
}
}
/// Stops the server when the handle goes away, unless it was detached.
impl Drop for RedisServerHandle {
    fn drop(&mut self) {
        if self.detached {
            return;
        }
        self.stop();
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_config() {
let s = RedisServer::new();
assert_eq!(s.config.port, 6379);
assert_eq!(s.config.bind, "127.0.0.1");
assert!(matches!(s.config.save, SavePolicy::Disabled));
}
#[test]
fn builder_chain() {
let s = RedisServer::new()
.port(6400)
.bind("0.0.0.0")
.save(true)
.appendonly(true)
.password("secret")
.logfile("/tmp/redis.log")
.loglevel(LogLevel::Warning)
.extra("maxmemory", "100mb");
assert_eq!(s.config.port, 6400);
assert_eq!(s.config.bind, "0.0.0.0");
assert!(matches!(s.config.save, SavePolicy::Default));
assert!(s.config.appendonly);
assert_eq!(s.config.password.as_deref(), Some("secret"));
assert_eq!(s.config.logfile.as_deref(), Some("/tmp/redis.log"));
assert_eq!(s.config.extra.get("maxmemory").unwrap(), "100mb");
}
#[test]
fn save_schedule() {
let s = RedisServer::new().save_schedule(vec![(900, 1), (300, 10)]);
match &s.config.save {
SavePolicy::Custom(pairs) => {
assert_eq!(pairs, &[(900, 1), (300, 10)]);
}
_ => panic!("expected SavePolicy::Custom"),
}
}
#[test]
fn aof_tuning() {
let s = RedisServer::new()
.appendonly(true)
.appendfsync(AppendFsync::Always)
.appendfilename("my.aof")
.aof_use_rdb_preamble(true)
.auto_aof_rewrite_percentage(100)
.auto_aof_rewrite_min_size("64mb")
.no_appendfsync_on_rewrite(true);
assert!(s.config.appendonly);
assert!(matches!(s.config.appendfsync, Some(AppendFsync::Always)));
assert_eq!(s.config.appendfilename.as_deref(), Some("my.aof"));
assert_eq!(s.config.aof_use_rdb_preamble, Some(true));
assert_eq!(s.config.auto_aof_rewrite_percentage, Some(100));
assert_eq!(s.config.auto_aof_rewrite_min_size.as_deref(), Some("64mb"));
assert_eq!(s.config.no_appendfsync_on_rewrite, Some(true));
}
#[test]
fn memory_eviction_and_lazyfree() {
let s = RedisServer::new()
.maxmemory("256mb")
.maxmemory_policy("allkeys-lfu")
.maxmemory_samples(10)
.maxmemory_clients("0")
.maxmemory_eviction_tenacity(50)
.lfu_log_factor(10)
.lfu_decay_time(1)
.active_expire_effort(25)
.lazyfree_lazy_eviction(true)
.lazyfree_lazy_expire(true)
.lazyfree_lazy_server_del(true)
.lazyfree_lazy_user_del(false)
.lazyfree_lazy_user_flush(true);
assert_eq!(s.config.maxmemory.as_deref(), Some("256mb"));
assert_eq!(s.config.maxmemory_policy.as_deref(), Some("allkeys-lfu"));
assert_eq!(s.config.maxmemory_samples, Some(10));
assert_eq!(s.config.maxmemory_clients.as_deref(), Some("0"));
assert_eq!(s.config.maxmemory_eviction_tenacity, Some(50));
assert_eq!(s.config.lfu_log_factor, Some(10));
assert_eq!(s.config.lfu_decay_time, Some(1));
assert_eq!(s.config.active_expire_effort, Some(25));
assert_eq!(s.config.lazyfree_lazy_eviction, Some(true));
assert_eq!(s.config.lazyfree_lazy_expire, Some(true));
assert_eq!(s.config.lazyfree_lazy_server_del, Some(true));
assert_eq!(s.config.lazyfree_lazy_user_del, Some(false));
assert_eq!(s.config.lazyfree_lazy_user_flush, Some(true));
}
/// Checks that the replication-related builder calls (`replicaof`, master
/// credentials, `repl_*`, `replica_*`, `min_replicas_*`) are each stored in
/// the matching `config` field.
#[test]
fn replication_tuning() {
    let server = RedisServer::new()
        .replicaof("127.0.0.1", 6379)
        .masterauth("secret")
        .masteruser("repl-user")
        .repl_backlog_size("1mb")
        .repl_backlog_ttl(3600)
        .repl_disable_tcp_nodelay(true)
        .repl_diskless_load(ReplDisklessLoad::Swapdb)
        .repl_diskless_sync(true)
        .repl_diskless_sync_delay(5)
        .repl_diskless_sync_max_replicas(3)
        .repl_ping_replica_period(10)
        .repl_timeout(60)
        .replica_announce_ip("10.0.0.1")
        .replica_announce_port(6380)
        .replica_announced(true)
        .replica_full_sync_buffer_limit("256mb")
        .replica_ignore_disk_write_errors(false)
        .replica_ignore_maxmemory(true)
        .replica_lazy_flush(true)
        .replica_priority(100)
        .replica_read_only(true)
        .replica_serve_stale_data(false)
        .min_replicas_to_write(2)
        .min_replicas_max_lag(10);
    let cfg = &server.config;

    // Upstream master and credentials.
    assert_eq!(cfg.replicaof, Some(("127.0.0.1".into(), 6379)));
    assert_eq!(cfg.masterauth.as_deref(), Some("secret"));
    assert_eq!(cfg.masteruser.as_deref(), Some("repl-user"));

    // repl-* settings.
    assert_eq!(cfg.repl_backlog_size.as_deref(), Some("1mb"));
    assert_eq!(cfg.repl_backlog_ttl, Some(3600));
    assert_eq!(cfg.repl_disable_tcp_nodelay, Some(true));
    assert!(matches!(cfg.repl_diskless_load, Some(ReplDisklessLoad::Swapdb)));
    assert_eq!(cfg.repl_diskless_sync, Some(true));
    assert_eq!(cfg.repl_diskless_sync_delay, Some(5));
    assert_eq!(cfg.repl_diskless_sync_max_replicas, Some(3));
    assert_eq!(cfg.repl_ping_replica_period, Some(10));
    assert_eq!(cfg.repl_timeout, Some(60));

    // replica-* settings.
    assert_eq!(cfg.replica_announce_ip.as_deref(), Some("10.0.0.1"));
    assert_eq!(cfg.replica_announce_port, Some(6380));
    assert_eq!(cfg.replica_announced, Some(true));
    assert_eq!(cfg.replica_full_sync_buffer_limit.as_deref(), Some("256mb"));
    assert_eq!(cfg.replica_ignore_disk_write_errors, Some(false));
    assert_eq!(cfg.replica_ignore_maxmemory, Some(true));
    assert_eq!(cfg.replica_lazy_flush, Some(true));
    assert_eq!(cfg.replica_priority, Some(100));
    assert_eq!(cfg.replica_read_only, Some(true));
    assert_eq!(cfg.replica_serve_stale_data, Some(false));

    // Write-safety thresholds.
    assert_eq!(cfg.min_replicas_to_write, Some(2));
    assert_eq!(cfg.min_replicas_max_lag, Some(10));
}
/// Checks that the cluster-related builder calls, and the plain `port` set
/// alongside them, are each stored in the matching `config` field.
#[test]
fn cluster_config() {
    let server = RedisServer::new()
        .port(7000)
        .cluster_enabled(true)
        .cluster_node_timeout(5000)
        .cluster_config_file("/tmp/nodes.conf")
        .cluster_require_full_coverage(false)
        .cluster_allow_reads_when_down(true)
        .cluster_allow_pubsubshard_when_down(true)
        .cluster_allow_replica_migration(true)
        .cluster_migration_barrier(1)
        .cluster_replica_no_failover(false)
        .cluster_replica_validity_factor(10)
        .cluster_announce_ip("10.0.0.1")
        .cluster_announce_port(7000)
        .cluster_announce_bus_port(17000)
        .cluster_announce_tls_port(7100)
        .cluster_announce_hostname("node1.example.com")
        .cluster_announce_human_nodename("node-1")
        .cluster_port(17000)
        .cluster_preferred_endpoint_type("ip")
        .cluster_link_sendbuf_limit(67108864)
        .cluster_compatibility_sample_ratio(50)
        .cluster_slot_migration_handoff_max_lag_bytes(1048576)
        .cluster_slot_migration_write_pause_timeout(5000)
        .cluster_slot_stats_enabled(true);
    let cfg = &server.config;

    // The listening port is configured above, so verify it like every other
    // builder call; previously it was set but never checked.
    assert_eq!(cfg.port, 7000);

    // Core cluster switches.
    assert!(cfg.cluster_enabled);
    assert_eq!(cfg.cluster_node_timeout, Some(5000));
    assert_eq!(cfg.cluster_config_file, Some(PathBuf::from("/tmp/nodes.conf")));
    assert_eq!(cfg.cluster_require_full_coverage, Some(false));
    assert_eq!(cfg.cluster_allow_reads_when_down, Some(true));
    assert_eq!(cfg.cluster_allow_pubsubshard_when_down, Some(true));
    assert_eq!(cfg.cluster_allow_replica_migration, Some(true));
    assert_eq!(cfg.cluster_migration_barrier, Some(1));
    assert_eq!(cfg.cluster_replica_no_failover, Some(false));
    assert_eq!(cfg.cluster_replica_validity_factor, Some(10));

    // Announced endpoints.
    assert_eq!(cfg.cluster_announce_ip.as_deref(), Some("10.0.0.1"));
    assert_eq!(cfg.cluster_announce_port, Some(7000));
    assert_eq!(cfg.cluster_announce_bus_port, Some(17000));
    assert_eq!(cfg.cluster_announce_tls_port, Some(7100));
    assert_eq!(
        cfg.cluster_announce_hostname.as_deref(),
        Some("node1.example.com")
    );
    assert_eq!(cfg.cluster_announce_human_nodename.as_deref(), Some("node-1"));
    assert_eq!(cfg.cluster_port, Some(17000));
    assert_eq!(cfg.cluster_preferred_endpoint_type.as_deref(), Some("ip"));

    // Link buffers, sampling and slot migration.
    assert_eq!(cfg.cluster_link_sendbuf_limit, Some(67108864));
    assert_eq!(cfg.cluster_compatibility_sample_ratio, Some(50));
    assert_eq!(
        cfg.cluster_slot_migration_handoff_max_lag_bytes,
        Some(1048576)
    );
    assert_eq!(cfg.cluster_slot_migration_write_pause_timeout, Some(5000));
    assert_eq!(cfg.cluster_slot_stats_enabled, Some(true));
}
/// Checks that the per-data-type encoding thresholds (hash, list, set, zset,
/// HyperLogLog, stream) set through the builder are stored in `config`.
#[test]
fn data_structure_tuning() {
    let server = RedisServer::new()
        .hash_max_listpack_entries(128)
        .hash_max_listpack_value(64)
        .list_max_listpack_size(-2)
        .list_compress_depth(1)
        .set_max_intset_entries(512)
        .set_max_listpack_entries(128)
        .set_max_listpack_value(64)
        .zset_max_listpack_entries(128)
        .zset_max_listpack_value(64)
        .hll_sparse_max_bytes(3000)
        .stream_node_max_bytes(4096)
        .stream_node_max_entries(100)
        .stream_idmp_duration(5000)
        .stream_idmp_maxsize(1000);
    let cfg = &server.config;

    // Hashes.
    assert_eq!(cfg.hash_max_listpack_entries, Some(128));
    assert_eq!(cfg.hash_max_listpack_value, Some(64));

    // Lists; the size is given as a negative value here.
    assert_eq!(cfg.list_max_listpack_size, Some(-2));
    assert_eq!(cfg.list_compress_depth, Some(1));

    // Sets and sorted sets.
    assert_eq!(cfg.set_max_intset_entries, Some(512));
    assert_eq!(cfg.set_max_listpack_entries, Some(128));
    assert_eq!(cfg.set_max_listpack_value, Some(64));
    assert_eq!(cfg.zset_max_listpack_entries, Some(128));
    assert_eq!(cfg.zset_max_listpack_value, Some(64));

    // HyperLogLog and streams.
    assert_eq!(cfg.hll_sparse_max_bytes, Some(3000));
    assert_eq!(cfg.stream_node_max_bytes, Some(4096));
    assert_eq!(cfg.stream_node_max_entries, Some(100));
    assert_eq!(cfg.stream_idmp_duration, Some(5000));
    assert_eq!(cfg.stream_idmp_maxsize, Some(1000));
}
/// Checks that the TLS-related builder calls, and the plain `port` set
/// alongside them, are each stored in the matching `config` field.
#[test]
fn tls_config() {
    // Builds the `Option<&Path>` shape that `Option<PathBuf>::as_deref()`
    // yields, so the path assertions below stay on one line each.
    fn path(p: &str) -> Option<&std::path::Path> {
        Some(std::path::Path::new(p))
    }

    let server = RedisServer::new()
        .port(6400)
        .tls_port(6401)
        .tls_cert_file("/etc/tls/redis.crt")
        .tls_key_file("/etc/tls/redis.key")
        .tls_key_file_pass("keypass")
        .tls_ca_cert_file("/etc/tls/ca.crt")
        .tls_ca_cert_dir("/etc/tls/certs")
        .tls_auth_clients(true)
        .tls_client_cert_file("/etc/tls/client.crt")
        .tls_client_key_file("/etc/tls/client.key")
        .tls_client_key_file_pass("clientpass")
        .tls_dh_params_file("/etc/tls/dhparams.pem")
        .tls_ciphers("ECDHE-RSA-AES256-GCM-SHA384")
        .tls_ciphersuites("TLS_AES_256_GCM_SHA384")
        .tls_protocols("TLSv1.2 TLSv1.3")
        .tls_prefer_server_ciphers(true)
        .tls_session_caching(true)
        .tls_session_cache_size(20480)
        .tls_session_cache_timeout(300)
        .tls_replication(true)
        .tls_cluster(true);
    let cfg = &server.config;

    // The plain port is configured above, so verify it too; previously it was
    // set but never checked, so a mix-up between `port` and `tls_port` would
    // have gone unnoticed.
    assert_eq!(cfg.port, 6400);
    assert_eq!(cfg.tls_port, Some(6401));

    // Server certificate, key and CA material.
    assert_eq!(cfg.tls_cert_file.as_deref(), path("/etc/tls/redis.crt"));
    assert_eq!(cfg.tls_key_file.as_deref(), path("/etc/tls/redis.key"));
    assert_eq!(cfg.tls_key_file_pass.as_deref(), Some("keypass"));
    assert_eq!(cfg.tls_ca_cert_file.as_deref(), path("/etc/tls/ca.crt"));
    assert_eq!(cfg.tls_ca_cert_dir.as_deref(), path("/etc/tls/certs"));

    // Client authentication material.
    assert_eq!(cfg.tls_auth_clients, Some(true));
    assert_eq!(
        cfg.tls_client_cert_file.as_deref(),
        path("/etc/tls/client.crt")
    );
    assert_eq!(
        cfg.tls_client_key_file.as_deref(),
        path("/etc/tls/client.key")
    );
    assert_eq!(cfg.tls_client_key_file_pass.as_deref(), Some("clientpass"));

    // Protocol and cipher selection.
    assert_eq!(
        cfg.tls_dh_params_file.as_deref(),
        path("/etc/tls/dhparams.pem")
    );
    assert_eq!(
        cfg.tls_ciphers.as_deref(),
        Some("ECDHE-RSA-AES256-GCM-SHA384")
    );
    assert_eq!(
        cfg.tls_ciphersuites.as_deref(),
        Some("TLS_AES_256_GCM_SHA384")
    );
    assert_eq!(cfg.tls_protocols.as_deref(), Some("TLSv1.2 TLSv1.3"));
    assert_eq!(cfg.tls_prefer_server_ciphers, Some(true));

    // Session cache and TLS for replication / cluster bus.
    assert_eq!(cfg.tls_session_caching, Some(true));
    assert_eq!(cfg.tls_session_cache_size, Some(20480));
    assert_eq!(cfg.tls_session_cache_timeout, Some(300));
    assert_eq!(cfg.tls_replication, Some(true));
    assert_eq!(cfg.tls_cluster, Some(true));
}
}