1use std::collections::HashMap;
4use std::fs;
5use std::path::PathBuf;
6use std::time::Duration;
7
8use tokio::process::Command;
9
10use crate::cli::RedisCli;
11use crate::error::{Error, Result};
12
13#[derive(Debug, Clone)]
38pub struct RedisServerConfig {
39 pub port: u16,
42 pub bind: String,
44 pub protected_mode: bool,
46 pub tcp_backlog: Option<u32>,
48 pub unixsocket: Option<PathBuf>,
50 pub unixsocketperm: Option<u32>,
52 pub timeout: Option<u32>,
54 pub tcp_keepalive: Option<u32>,
56
57 pub tls_port: Option<u16>,
60 pub tls_cert_file: Option<PathBuf>,
62 pub tls_key_file: Option<PathBuf>,
64 pub tls_ca_cert_file: Option<PathBuf>,
66 pub tls_auth_clients: Option<bool>,
68
69 pub daemonize: bool,
72 pub dir: PathBuf,
74 pub logfile: Option<String>,
76 pub loglevel: LogLevel,
78 pub databases: Option<u32>,
80
81 pub maxmemory: Option<String>,
84 pub maxmemory_policy: Option<String>,
86 pub maxclients: Option<u32>,
88
89 pub save: bool,
92 pub appendonly: bool,
94
95 pub replicaof: Option<(String, u16)>,
98 pub masterauth: Option<String>,
100
101 pub password: Option<String>,
104 pub acl_file: Option<PathBuf>,
106
107 pub cluster_enabled: bool,
110 pub cluster_node_timeout: Option<u64>,
112
113 pub loadmodule: Vec<PathBuf>,
116
117 pub hz: Option<u32>,
120 pub io_threads: Option<u32>,
122 pub io_threads_do_reads: Option<bool>,
124 pub notify_keyspace_events: Option<String>,
126
127 pub extra: HashMap<String, String>,
130
131 pub redis_server_bin: String,
134 pub redis_cli_bin: String,
136}
137
138#[derive(Debug, Clone, Copy)]
140pub enum LogLevel {
141 Debug,
143 Verbose,
145 Notice,
147 Warning,
149}
150
151impl std::fmt::Display for LogLevel {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 match self {
154 LogLevel::Debug => f.write_str("debug"),
155 LogLevel::Verbose => f.write_str("verbose"),
156 LogLevel::Notice => f.write_str("notice"),
157 LogLevel::Warning => f.write_str("warning"),
158 }
159 }
160}
161
162impl Default for RedisServerConfig {
163 fn default() -> Self {
164 Self {
165 port: 6379,
166 bind: "127.0.0.1".into(),
167 protected_mode: false,
168 tcp_backlog: None,
169 unixsocket: None,
170 unixsocketperm: None,
171 timeout: None,
172 tcp_keepalive: None,
173 tls_port: None,
174 tls_cert_file: None,
175 tls_key_file: None,
176 tls_ca_cert_file: None,
177 tls_auth_clients: None,
178 daemonize: true,
179 dir: std::env::temp_dir().join("redis-server-wrapper"),
180 logfile: None,
181 loglevel: LogLevel::Notice,
182 databases: None,
183 maxmemory: None,
184 maxmemory_policy: None,
185 maxclients: None,
186 save: false,
187 appendonly: false,
188 replicaof: None,
189 masterauth: None,
190 password: None,
191 acl_file: None,
192 cluster_enabled: false,
193 cluster_node_timeout: None,
194 loadmodule: Vec::new(),
195 hz: None,
196 io_threads: None,
197 io_threads_do_reads: None,
198 notify_keyspace_events: None,
199 extra: HashMap::new(),
200 redis_server_bin: "redis-server".into(),
201 redis_cli_bin: "redis-cli".into(),
202 }
203 }
204}
205
206pub struct RedisServer {
208 config: RedisServerConfig,
209}
210
211impl RedisServer {
212 pub fn new() -> Self {
214 Self {
215 config: RedisServerConfig::default(),
216 }
217 }
218
219 pub fn port(mut self, port: u16) -> Self {
223 self.config.port = port;
224 self
225 }
226
227 pub fn bind(mut self, bind: impl Into<String>) -> Self {
229 self.config.bind = bind.into();
230 self
231 }
232
233 pub fn protected_mode(mut self, protected: bool) -> Self {
235 self.config.protected_mode = protected;
236 self
237 }
238
239 pub fn tcp_backlog(mut self, backlog: u32) -> Self {
241 self.config.tcp_backlog = Some(backlog);
242 self
243 }
244
245 pub fn unixsocket(mut self, path: impl Into<PathBuf>) -> Self {
247 self.config.unixsocket = Some(path.into());
248 self
249 }
250
251 pub fn unixsocketperm(mut self, perm: u32) -> Self {
253 self.config.unixsocketperm = Some(perm);
254 self
255 }
256
257 pub fn timeout(mut self, seconds: u32) -> Self {
259 self.config.timeout = Some(seconds);
260 self
261 }
262
263 pub fn tcp_keepalive(mut self, seconds: u32) -> Self {
265 self.config.tcp_keepalive = Some(seconds);
266 self
267 }
268
269 pub fn tls_port(mut self, port: u16) -> Self {
273 self.config.tls_port = Some(port);
274 self
275 }
276
277 pub fn tls_cert_file(mut self, path: impl Into<PathBuf>) -> Self {
279 self.config.tls_cert_file = Some(path.into());
280 self
281 }
282
283 pub fn tls_key_file(mut self, path: impl Into<PathBuf>) -> Self {
285 self.config.tls_key_file = Some(path.into());
286 self
287 }
288
289 pub fn tls_ca_cert_file(mut self, path: impl Into<PathBuf>) -> Self {
291 self.config.tls_ca_cert_file = Some(path.into());
292 self
293 }
294
295 pub fn tls_auth_clients(mut self, require: bool) -> Self {
297 self.config.tls_auth_clients = Some(require);
298 self
299 }
300
301 pub fn dir(mut self, dir: impl Into<PathBuf>) -> Self {
305 self.config.dir = dir.into();
306 self
307 }
308
309 pub fn loglevel(mut self, level: LogLevel) -> Self {
311 self.config.loglevel = level;
312 self
313 }
314
315 pub fn logfile(mut self, path: impl Into<String>) -> Self {
317 self.config.logfile = Some(path.into());
318 self
319 }
320
321 pub fn databases(mut self, n: u32) -> Self {
323 self.config.databases = Some(n);
324 self
325 }
326
327 pub fn maxmemory(mut self, limit: impl Into<String>) -> Self {
331 self.config.maxmemory = Some(limit.into());
332 self
333 }
334
335 pub fn maxmemory_policy(mut self, policy: impl Into<String>) -> Self {
337 self.config.maxmemory_policy = Some(policy.into());
338 self
339 }
340
341 pub fn maxclients(mut self, n: u32) -> Self {
343 self.config.maxclients = Some(n);
344 self
345 }
346
347 pub fn save(mut self, save: bool) -> Self {
351 self.config.save = save;
352 self
353 }
354
355 pub fn appendonly(mut self, appendonly: bool) -> Self {
357 self.config.appendonly = appendonly;
358 self
359 }
360
361 pub fn replicaof(mut self, host: impl Into<String>, port: u16) -> Self {
365 self.config.replicaof = Some((host.into(), port));
366 self
367 }
368
369 pub fn masterauth(mut self, password: impl Into<String>) -> Self {
371 self.config.masterauth = Some(password.into());
372 self
373 }
374
375 pub fn password(mut self, password: impl Into<String>) -> Self {
379 self.config.password = Some(password.into());
380 self
381 }
382
383 pub fn acl_file(mut self, path: impl Into<PathBuf>) -> Self {
385 self.config.acl_file = Some(path.into());
386 self
387 }
388
389 pub fn cluster_enabled(mut self, enabled: bool) -> Self {
393 self.config.cluster_enabled = enabled;
394 self
395 }
396
397 pub fn cluster_node_timeout(mut self, ms: u64) -> Self {
399 self.config.cluster_node_timeout = Some(ms);
400 self
401 }
402
403 pub fn loadmodule(mut self, path: impl Into<PathBuf>) -> Self {
407 self.config.loadmodule.push(path.into());
408 self
409 }
410
411 pub fn hz(mut self, hz: u32) -> Self {
415 self.config.hz = Some(hz);
416 self
417 }
418
419 pub fn io_threads(mut self, n: u32) -> Self {
421 self.config.io_threads = Some(n);
422 self
423 }
424
425 pub fn io_threads_do_reads(mut self, enable: bool) -> Self {
427 self.config.io_threads_do_reads = Some(enable);
428 self
429 }
430
431 pub fn notify_keyspace_events(mut self, events: impl Into<String>) -> Self {
433 self.config.notify_keyspace_events = Some(events.into());
434 self
435 }
436
437 pub fn redis_server_bin(mut self, bin: impl Into<String>) -> Self {
441 self.config.redis_server_bin = bin.into();
442 self
443 }
444
445 pub fn redis_cli_bin(mut self, bin: impl Into<String>) -> Self {
447 self.config.redis_cli_bin = bin.into();
448 self
449 }
450
451 pub fn extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
453 self.config.extra.insert(key.into(), value.into());
454 self
455 }
456
457 pub async fn start(self) -> Result<RedisServerHandle> {
462 if which::which(&self.config.redis_server_bin).is_err() {
463 return Err(Error::BinaryNotFound {
464 binary: self.config.redis_server_bin.clone(),
465 });
466 }
467 if which::which(&self.config.redis_cli_bin).is_err() {
468 return Err(Error::BinaryNotFound {
469 binary: self.config.redis_cli_bin.clone(),
470 });
471 }
472
473 let node_dir = self.config.dir.join(format!("node-{}", self.config.port));
474 fs::create_dir_all(&node_dir)?;
475
476 let conf_path = node_dir.join("redis.conf");
477 let conf_content = self.generate_config(&node_dir);
478 fs::write(&conf_path, conf_content)?;
479
480 let status = Command::new(&self.config.redis_server_bin)
481 .arg(&conf_path)
482 .stdout(std::process::Stdio::null())
483 .stderr(std::process::Stdio::null())
484 .status()
485 .await?;
486
487 if !status.success() {
488 return Err(Error::ServerStart {
489 port: self.config.port,
490 });
491 }
492
493 let mut cli = RedisCli::new()
494 .bin(&self.config.redis_cli_bin)
495 .host(&self.config.bind)
496 .port(self.config.port);
497 if let Some(ref pw) = self.config.password {
498 cli = cli.password(pw);
499 }
500
501 cli.wait_for_ready(Duration::from_secs(10)).await?;
502
503 let pid_path = node_dir.join("redis.pid");
504 let pid: u32 = fs::read_to_string(&pid_path)
505 .map_err(Error::Io)?
506 .trim()
507 .parse()
508 .map_err(|_| Error::ServerStart {
509 port: self.config.port,
510 })?;
511
512 Ok(RedisServerHandle {
513 config: self.config,
514 cli,
515 pid,
516 detached: false,
517 })
518 }
519
520 fn generate_config(&self, node_dir: &std::path::Path) -> String {
521 let yn = |b: bool| if b { "yes" } else { "no" };
522
523 let mut conf = format!(
524 "port {port}\n\
525 bind {bind}\n\
526 daemonize {daemonize}\n\
527 pidfile {dir}/redis.pid\n\
528 dir {dir}\n\
529 loglevel {level}\n\
530 protected-mode {protected}\n",
531 port = self.config.port,
532 bind = self.config.bind,
533 daemonize = yn(self.config.daemonize),
534 dir = node_dir.display(),
535 level = self.config.loglevel,
536 protected = yn(self.config.protected_mode),
537 );
538
539 let logfile = self
540 .config
541 .logfile
542 .as_deref()
543 .map(str::to_owned)
544 .unwrap_or_else(|| format!("{}/redis.log", node_dir.display()));
545 conf.push_str(&format!("logfile {logfile}\n"));
546
547 if let Some(backlog) = self.config.tcp_backlog {
549 conf.push_str(&format!("tcp-backlog {backlog}\n"));
550 }
551 if let Some(ref path) = self.config.unixsocket {
552 conf.push_str(&format!("unixsocket {}\n", path.display()));
553 }
554 if let Some(perm) = self.config.unixsocketperm {
555 conf.push_str(&format!("unixsocketperm {perm}\n"));
556 }
557 if let Some(t) = self.config.timeout {
558 conf.push_str(&format!("timeout {t}\n"));
559 }
560 if let Some(ka) = self.config.tcp_keepalive {
561 conf.push_str(&format!("tcp-keepalive {ka}\n"));
562 }
563
564 if let Some(port) = self.config.tls_port {
566 conf.push_str(&format!("tls-port {port}\n"));
567 }
568 if let Some(ref path) = self.config.tls_cert_file {
569 conf.push_str(&format!("tls-cert-file {}\n", path.display()));
570 }
571 if let Some(ref path) = self.config.tls_key_file {
572 conf.push_str(&format!("tls-key-file {}\n", path.display()));
573 }
574 if let Some(ref path) = self.config.tls_ca_cert_file {
575 conf.push_str(&format!("tls-ca-cert-file {}\n", path.display()));
576 }
577 if let Some(auth) = self.config.tls_auth_clients {
578 conf.push_str(&format!("tls-auth-clients {}\n", yn(auth)));
579 }
580
581 if let Some(n) = self.config.databases {
583 conf.push_str(&format!("databases {n}\n"));
584 }
585
586 if let Some(ref limit) = self.config.maxmemory {
588 conf.push_str(&format!("maxmemory {limit}\n"));
589 }
590 if let Some(ref policy) = self.config.maxmemory_policy {
591 conf.push_str(&format!("maxmemory-policy {policy}\n"));
592 }
593 if let Some(n) = self.config.maxclients {
594 conf.push_str(&format!("maxclients {n}\n"));
595 }
596
597 if !self.config.save {
599 conf.push_str("save \"\"\n");
600 }
601 if self.config.appendonly {
602 conf.push_str("appendonly yes\n");
603 }
604
605 if let Some((ref host, port)) = self.config.replicaof {
607 conf.push_str(&format!("replicaof {host} {port}\n"));
608 }
609 if let Some(ref pw) = self.config.masterauth {
610 conf.push_str(&format!("masterauth {pw}\n"));
611 }
612
613 if let Some(ref pw) = self.config.password {
615 conf.push_str(&format!("requirepass {pw}\n"));
616 }
617 if let Some(ref path) = self.config.acl_file {
618 conf.push_str(&format!("aclfile {}\n", path.display()));
619 }
620
621 if self.config.cluster_enabled {
623 conf.push_str("cluster-enabled yes\n");
624 conf.push_str(&format!(
625 "cluster-config-file {}/nodes.conf\n",
626 node_dir.display()
627 ));
628 if let Some(timeout) = self.config.cluster_node_timeout {
629 conf.push_str(&format!("cluster-node-timeout {timeout}\n"));
630 }
631 }
632
633 for path in &self.config.loadmodule {
635 conf.push_str(&format!("loadmodule {}\n", path.display()));
636 }
637
638 if let Some(hz) = self.config.hz {
640 conf.push_str(&format!("hz {hz}\n"));
641 }
642 if let Some(n) = self.config.io_threads {
643 conf.push_str(&format!("io-threads {n}\n"));
644 }
645 if let Some(enable) = self.config.io_threads_do_reads {
646 conf.push_str(&format!("io-threads-do-reads {}\n", yn(enable)));
647 }
648 if let Some(ref events) = self.config.notify_keyspace_events {
649 conf.push_str(&format!("notify-keyspace-events {events}\n"));
650 }
651
652 for (key, value) in &self.config.extra {
654 conf.push_str(&format!("{key} {value}\n"));
655 }
656
657 conf
658 }
659}
660
661impl Default for RedisServer {
662 fn default() -> Self {
663 Self::new()
664 }
665}
666
667pub struct RedisServerHandle {
669 config: RedisServerConfig,
670 cli: RedisCli,
671 pid: u32,
672 detached: bool,
673}
674
675impl RedisServerHandle {
676 pub fn addr(&self) -> String {
678 format!("{}:{}", self.config.bind, self.config.port)
679 }
680
681 pub fn port(&self) -> u16 {
683 self.config.port
684 }
685
686 pub fn host(&self) -> &str {
688 &self.config.bind
689 }
690
691 pub fn pid(&self) -> u32 {
693 self.pid
694 }
695
696 pub async fn is_alive(&self) -> bool {
698 self.cli.ping().await
699 }
700
701 pub fn cli(&self) -> &RedisCli {
703 &self.cli
704 }
705
706 pub async fn run(&self, args: &[&str]) -> Result<String> {
708 self.cli.run(args).await
709 }
710
711 pub fn detach(mut self) {
713 self.detached = true;
714 }
715
716 pub fn stop(&self) {
718 self.cli.shutdown();
719 }
720
721 pub async fn wait_for_ready(&self, timeout: Duration) -> Result<()> {
723 self.cli.wait_for_ready(timeout).await
724 }
725}
726
727impl Drop for RedisServerHandle {
728 fn drop(&mut self) {
729 if !self.detached {
730 self.stop();
731 }
732 }
733}
734
735#[cfg(test)]
736mod tests {
737 use super::*;
738
739 #[test]
740 fn default_config() {
741 let s = RedisServer::new();
742 assert_eq!(s.config.port, 6379);
743 assert_eq!(s.config.bind, "127.0.0.1");
744 assert!(!s.config.save);
745 }
746
747 #[test]
748 fn builder_chain() {
749 let s = RedisServer::new()
750 .port(6400)
751 .bind("0.0.0.0")
752 .save(true)
753 .appendonly(true)
754 .password("secret")
755 .logfile("/tmp/redis.log")
756 .loglevel(LogLevel::Warning)
757 .extra("maxmemory", "100mb");
758
759 assert_eq!(s.config.port, 6400);
760 assert_eq!(s.config.bind, "0.0.0.0");
761 assert!(s.config.save);
762 assert!(s.config.appendonly);
763 assert_eq!(s.config.password.as_deref(), Some("secret"));
764 assert_eq!(s.config.logfile.as_deref(), Some("/tmp/redis.log"));
765 assert_eq!(s.config.extra.get("maxmemory").unwrap(), "100mb");
766 }
767
768 #[test]
769 fn cluster_config() {
770 let s = RedisServer::new()
771 .port(7000)
772 .cluster_enabled(true)
773 .cluster_node_timeout(5000);
774
775 assert!(s.config.cluster_enabled);
776 assert_eq!(s.config.cluster_node_timeout, Some(5000));
777 }
778}