1use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13 GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
16pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
18pub fn detect_runtime_config() -> RuntimeConfig {
20 let cpus = thread::available_parallelism()
21 .map(|n| n.get())
22 .unwrap_or(1);
23
24 let suggested_workers = cpus.saturating_sub(1).max(1);
26
27 RuntimeConfig {
28 available_cpus: cpus,
29 suggested_workers,
30 stack_size: 8 * 1024 * 1024, }
32}
33
34#[derive(Debug, Clone)]
35pub struct RuntimeConfig {
36 pub available_cpus: usize,
37 pub suggested_workers: usize,
38 pub stack_size: usize,
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum ServerTransport {
43 Grpc,
44 Http,
45 Wire,
46}
47
48impl ServerTransport {
49 pub const fn as_str(self) -> &'static str {
50 match self {
51 Self::Grpc => "gRPC",
52 Self::Http => "HTTP",
53 Self::Wire => "wire",
54 }
55 }
56
57 pub const fn default_bind_addr(self) -> &'static str {
58 match self {
59 Self::Grpc => "127.0.0.1:5555",
60 Self::Http => "127.0.0.1:5055",
61 Self::Wire => "127.0.0.1:5050",
62 }
63 }
64}
65
66#[derive(Debug, Clone)]
67pub struct ServerCommandConfig {
68 pub path: Option<PathBuf>,
69 pub router_bind_addr: Option<String>,
70 pub grpc_bind_addr: Option<String>,
71 pub grpc_tls_bind_addr: Option<String>,
75 pub grpc_tls_cert: Option<PathBuf>,
81 pub grpc_tls_key: Option<PathBuf>,
84 pub grpc_tls_client_ca: Option<PathBuf>,
89 pub http_bind_addr: Option<String>,
90 pub http_tls_bind_addr: Option<String>,
94 pub http_tls_cert: Option<PathBuf>,
97 pub http_tls_key: Option<PathBuf>,
100 pub http_tls_client_ca: Option<PathBuf>,
104 pub wire_bind_addr: Option<String>,
105 pub wire_tls_bind_addr: Option<String>,
107 pub wire_tls_cert: Option<PathBuf>,
109 pub wire_tls_key: Option<PathBuf>,
111 pub pg_bind_addr: Option<String>,
115 pub create_if_missing: bool,
116 pub read_only: bool,
117 pub role: String,
118 pub primary_addr: Option<String>,
119 pub vault: bool,
120 pub workers: Option<usize>,
122 pub telemetry: Option<crate::telemetry::TelemetryConfig>,
125}
126
127#[derive(Debug, Clone)]
128pub struct SystemdServiceConfig {
129 pub service_name: String,
130 pub binary_path: PathBuf,
131 pub run_user: String,
132 pub run_group: String,
133 pub data_path: PathBuf,
134 pub router_bind_addr: Option<String>,
135 pub grpc_bind_addr: Option<String>,
136 pub http_bind_addr: Option<String>,
137}
138
139impl SystemdServiceConfig {
140 pub fn data_dir(&self) -> PathBuf {
141 self.data_path
142 .parent()
143 .map(PathBuf::from)
144 .unwrap_or_else(|| PathBuf::from("."))
145 }
146
147 pub fn unit_path(&self) -> PathBuf {
148 PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
149 }
150}
151
152pub fn default_telemetry_for_path(
157 path: Option<&std::path::Path>,
158) -> crate::telemetry::TelemetryConfig {
159 let log_dir = match path {
160 Some(p) => p
161 .parent()
162 .map(|parent| parent.join("logs"))
163 .or_else(|| Some(std::path::PathBuf::from("./logs"))),
164 None => None, };
166 crate::telemetry::TelemetryConfig {
167 log_dir,
168 file_prefix: "reddb.log".to_string(),
169 level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
170 format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
171 crate::telemetry::LogFormat::Pretty
172 } else {
173 crate::telemetry::LogFormat::Json
174 },
175 rotation_keep_days: 14,
176 service_name: "reddb",
177 level_explicit: false,
179 format_explicit: false,
180 rotation_keep_days_explicit: false,
181 file_prefix_explicit: false,
182 log_dir_explicit: false,
183 log_file_disabled: false,
184 }
185}
186
187impl ServerCommandConfig {
188 fn to_db_options(&self) -> RedDBOptions {
189 let mut options = match &self.path {
190 Some(path) => RedDBOptions::persistent(path),
191 None => RedDBOptions::in_memory(),
192 };
193
194 options.mode = StorageMode::Persistent;
195 options.create_if_missing = self.create_if_missing;
196 options.read_only = self.read_only
203 || env_nonempty("RED_READONLY")
204 .or_else(|| env_nonempty("REDDB_READONLY"))
205 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
206 .unwrap_or(false)
207 || self.path.as_ref().is_some_and(|data_path| {
208 crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
209 data_path,
210 ))
211 .unwrap_or(false)
212 });
213
214 options.replication = match self.role.as_str() {
215 "primary" => ReplicationConfig::primary(),
216 "replica" => {
217 let primary_addr = self
218 .primary_addr
219 .clone()
220 .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
221 ReplicationConfig::replica(primary_addr)
228 }
229 _ => ReplicationConfig::standalone(),
230 };
231
232 if self.vault {
233 options.auth.vault_enabled = true;
234 }
235
236 configure_remote_backend_from_env(&mut options);
237
238 options
239 }
240
241 pub fn enabled_transports(&self) -> Vec<ServerTransport> {
242 let mut transports = Vec::with_capacity(3);
243 if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
244 transports.push(ServerTransport::Grpc);
245 }
246 if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
247 transports.push(ServerTransport::Http);
248 }
249 if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
250 transports.push(ServerTransport::Wire);
251 }
252 transports
253 }
254}
255
256fn env_nonempty(name: &str) -> Option<String> {
261 crate::utils::env_with_file_fallback(name)
262}
263
264fn env_truthy(name: &str) -> bool {
265 env_nonempty(name)
266 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
267 .unwrap_or(false)
268}
269
270fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
271 let backend = env_nonempty("RED_BACKEND")
277 .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
278 .unwrap_or_else(|| "none".to_string())
279 .to_ascii_lowercase();
280
281 match backend.as_str() {
282 "s3" | "minio" | "r2" => {
287 #[cfg(feature = "backend-s3")]
288 {
289 if let Some(config) = s3_config_from_env() {
290 let remote_key = env_nonempty("RED_REMOTE_KEY")
291 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
292 .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
293 let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
294 options.remote_backend = Some(backend.clone());
295 options.remote_backend_atomic = Some(backend);
296 options.remote_key = Some(remote_key);
297 }
298 }
299 #[cfg(not(feature = "backend-s3"))]
300 {
301 tracing::warn!(
302 backend = %backend,
303 "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
304 );
305 }
306 }
307 "fs" | "local" => {
312 let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
313 let remote_key = match (
314 base_path,
315 env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
316 ) {
317 (Some(base), Some(rel)) => Some(format!(
318 "{}/{}",
319 base.trim_end_matches('/'),
320 rel.trim_start_matches('/')
321 )),
322 (Some(base), None) => Some(format!(
323 "{}/clusters/dev/data.rdb",
324 base.trim_end_matches('/')
325 )),
326 (None, Some(rel)) => Some(rel),
327 (None, None) => None,
328 };
329 if let Some(key) = remote_key {
330 let backend = Arc::new(crate::storage::backend::LocalBackend);
331 options.remote_backend = Some(backend.clone());
332 options.remote_backend_atomic = Some(backend);
333 options.remote_key = Some(key);
334 }
335 }
336 "http" => {
341 let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
342 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
343 {
344 Some(u) => u,
345 None => {
346 tracing::warn!(
347 "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
348 );
349 return;
350 }
351 };
352 let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
353 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
354 .unwrap_or_default();
355 let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
356 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
357 {
358 std::fs::read_to_string(&path)
359 .ok()
360 .map(|s| s.trim().to_string())
361 .filter(|s| !s.is_empty())
362 } else {
363 env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
364 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
365 };
366
367 let mut config =
368 crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
369 if let Some(auth) = auth_header {
370 config = config.with_auth_header(auth);
371 }
372 let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
373 || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
374 || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
375 config = config.with_conditional_writes(conditional_writes);
376 if conditional_writes {
381 match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
382 Ok(atomic) => {
383 let atomic_arc = Arc::new(atomic);
384 options.remote_backend = Some(atomic_arc.clone());
385 options.remote_backend_atomic = Some(atomic_arc);
386 }
387 Err(err) => {
388 tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
389 options.remote_backend =
390 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
391 }
392 }
393 } else {
394 options.remote_backend =
395 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
396 }
397 options.remote_key = env_nonempty("RED_REMOTE_KEY")
398 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
399 .or_else(|| Some("clusters/dev/data.rdb".to_string()));
400 }
401 "none" | "" => {}
404 other => {
405 tracing::warn!(
406 backend = %other,
407 "unknown RED_BACKEND value — supported: s3 | fs | http | none"
408 );
409 }
410 }
411}
412
413#[cfg(feature = "backend-s3")]
418fn env_s3(suffix: &str) -> Option<String> {
419 env_nonempty(&format!("RED_S3_{suffix}"))
420 .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}")))
421}
422
423#[cfg(feature = "backend-s3")]
429fn env_s3_secret(suffix: &str) -> Option<String> {
430 let file_key_red = format!("RED_S3_{suffix}_FILE");
431 let file_key_legacy = format!("REDDB_S3_{suffix}_FILE");
432 if let Some(path) = env_nonempty(&file_key_red).or_else(|| env_nonempty(&file_key_legacy)) {
433 return std::fs::read_to_string(&path)
434 .ok()
435 .map(|s| s.trim().to_string())
436 .filter(|s| !s.is_empty());
437 }
438 env_s3(suffix)
439}
440
441#[cfg(feature = "backend-s3")]
442fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
443 let endpoint = env_s3("ENDPOINT")?;
444 let bucket = env_s3("BUCKET")?;
445 let access_key = env_s3_secret("ACCESS_KEY")?;
446 let secret_key = env_s3_secret("SECRET_KEY")?;
447 let region = env_s3("REGION").unwrap_or_else(|| "us-east-1".to_string());
448 let key_prefix = env_s3("KEY_PREFIX")
449 .or_else(|| env_s3("PREFIX"))
450 .unwrap_or_default();
451 let path_style = env_s3("PATH_STYLE")
452 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
453 .unwrap_or(true);
454 Some(crate::storage::backend::S3Config {
455 endpoint,
456 bucket,
457 key_prefix,
458 access_key,
459 secret_key,
460 region,
461 path_style,
462 })
463}
464
465pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
466 let data_dir = config.data_dir();
467 let exec_start = render_systemd_exec_start(config);
468 format!(
469 "[Unit]\n\
470Description=RedDB unified database service\n\
471After=network-online.target\n\
472Wants=network-online.target\n\
473\n\
474[Service]\n\
475Type=simple\n\
476User={user}\n\
477Group={group}\n\
478WorkingDirectory={workdir}\n\
479ExecStart={exec_start}\n\
480Restart=always\n\
481RestartSec=2\n\
482LimitSTACK=16M\n\
483NoNewPrivileges=true\n\
484PrivateTmp=true\n\
485ProtectSystem=strict\n\
486ProtectHome=true\n\
487ProtectControlGroups=true\n\
488ProtectKernelTunables=true\n\
489ProtectKernelModules=true\n\
490RestrictNamespaces=true\n\
491LockPersonality=true\n\
492MemoryDenyWriteExecute=true\n\
493ReadWritePaths={workdir}\n\
494\n\
495[Install]\n\
496WantedBy=multi-user.target\n",
497 user = config.run_user,
498 group = config.run_group,
499 workdir = data_dir.display(),
500 exec_start = exec_start,
501 )
502}
503
504#[cfg(target_os = "linux")]
513pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
514 ensure_root()?;
515 ensure_command_available("systemctl")?;
516 ensure_command_available("getent")?;
517 ensure_command_available("groupadd")?;
518 ensure_command_available("useradd")?;
519 ensure_command_available("install")?;
520 ensure_executable(&config.binary_path)?;
521
522 if !command_success("getent", ["group", config.run_group.as_str()])? {
523 run_command("groupadd", ["--system", config.run_group.as_str()])?;
524 }
525
526 if !command_success("id", ["-u", config.run_user.as_str()])? {
527 let data_dir = config.data_dir();
528 run_command(
529 "useradd",
530 [
531 "--system",
532 "--gid",
533 config.run_group.as_str(),
534 "--home-dir",
535 data_dir.to_string_lossy().as_ref(),
536 "--shell",
537 "/usr/sbin/nologin",
538 config.run_user.as_str(),
539 ],
540 )?;
541 }
542
543 let data_dir = config.data_dir();
544 run_command(
545 "install",
546 [
547 "-d",
548 "-o",
549 config.run_user.as_str(),
550 "-g",
551 config.run_group.as_str(),
552 "-m",
553 "0750",
554 data_dir.to_string_lossy().as_ref(),
555 ],
556 )?;
557
558 std::fs::write(config.unit_path(), render_systemd_unit(config))
559 .map_err(|err| format!("failed to write systemd unit: {err}"))?;
560
561 run_command("systemctl", ["daemon-reload"])?;
562 run_command(
563 "systemctl",
564 [
565 "enable",
566 "--now",
567 format!("{}.service", config.service_name).as_str(),
568 ],
569 )?;
570
571 Ok(())
572}
573
574#[cfg(not(target_os = "linux"))]
579pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
580 Err("systemd install is Linux-only — use sc.exe (Windows) or \
581 launchd (macOS) to install the service manually using the \
582 unit printed by `red service print-unit`"
583 .to_string())
584}
585
586#[cfg(target_os = "linux")]
587fn ensure_root() -> Result<(), String> {
588 let output = Command::new("id")
589 .arg("-u")
590 .output()
591 .map_err(|err| format!("failed to determine current uid: {err}"))?;
592 if !output.status.success() {
593 return Err("failed to determine current uid".to_string());
594 }
595 let uid = String::from_utf8_lossy(&output.stdout);
596 if uid.trim() != "0" {
597 return Err("run this command as root (sudo)".to_string());
598 }
599 Ok(())
600}
601
602#[cfg(target_os = "linux")]
603fn ensure_command_available(command: &str) -> Result<(), String> {
604 let status = Command::new("sh")
605 .args(["-lc", &format!("command -v {command} >/dev/null 2>&1")])
606 .status()
607 .map_err(|err| format!("failed to check command '{command}': {err}"))?;
608 if status.success() {
609 Ok(())
610 } else {
611 Err(format!("required command not found: {command}"))
612 }
613}
614
615#[cfg(target_os = "linux")]
616fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
617 let metadata = std::fs::metadata(path)
618 .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
619 #[cfg(unix)]
620 {
621 use std::os::unix::fs::PermissionsExt;
622 if metadata.permissions().mode() & 0o111 == 0 {
623 return Err(format!("binary is not executable: {}", path.display()));
624 }
625 }
626 #[cfg(not(unix))]
627 {
628 if !metadata.is_file() {
629 return Err(format!("binary is not a file: {}", path.display()));
630 }
631 }
632 Ok(())
633}
634
635#[cfg(target_os = "linux")]
636fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
637 Command::new(program)
638 .args(args)
639 .status()
640 .map(|status| status.success())
641 .map_err(|err| format!("failed to run {program}: {err}"))
642}
643
644#[cfg(target_os = "linux")]
645fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
646 let output = Command::new(program)
647 .args(args)
648 .output()
649 .map_err(|err| format!("failed to run {program}: {err}"))?;
650 if output.status.success() {
651 return Ok(());
652 }
653
654 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
655 let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
656 let detail = if !stderr.is_empty() {
657 stderr
658 } else if !stdout.is_empty() {
659 stdout
660 } else {
661 format!("exit status {}", output.status)
662 };
663 Err(format!("{program} failed: {detail}"))
664}
665
666pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
667 let has_any = config.router_bind_addr.is_some()
668 || config.grpc_bind_addr.is_some()
669 || config.http_bind_addr.is_some()
670 || config.wire_bind_addr.is_some()
671 || config.pg_bind_addr.is_some();
672 if !has_any {
673 return Err("at least one server bind address must be configured".into());
674 }
675 let thread_name = if config.router_bind_addr.is_some() {
676 "red-server-router"
677 } else {
678 match (
679 config.grpc_bind_addr.is_some(),
680 config.http_bind_addr.is_some(),
681 ) {
682 (true, true) => "red-server-dual",
683 (true, false) => "red-server-grpc",
684 (false, true) => "red-server-http",
685 (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
686 (false, false) => "red-server-pg-wire",
687 }
688 };
689
690 let handle = thread::Builder::new()
691 .name(thread_name.into())
692 .stack_size(8 * 1024 * 1024)
693 .spawn(move || run_configured_servers(config))
694 .map_err(|err| format!("failed to spawn server thread: {err}"))?;
695
696 match handle.join() {
697 Ok(result) => result,
698 Err(_) => Err("server thread panicked".to_string()),
699 }
700}
701
702fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
703 let mut parts = vec![
704 config.binary_path.display().to_string(),
705 "server".to_string(),
706 "--path".to_string(),
707 config.data_path.display().to_string(),
708 ];
709
710 if let Some(bind_addr) = &config.router_bind_addr {
711 parts.push("--bind".to_string());
712 parts.push(bind_addr.clone());
713 } else if let Some(bind_addr) = &config.grpc_bind_addr {
714 parts.push("--grpc-bind".to_string());
715 parts.push(bind_addr.clone());
716 }
717 if let Some(bind_addr) = &config.http_bind_addr {
718 parts.push("--http-bind".to_string());
719 parts.push(bind_addr.clone());
720 }
721
722 parts.join(" ")
723}
724
725pub fn probe_listener(target: &str, timeout: Duration) -> bool {
726 let addresses: Vec<SocketAddr> = match target.to_socket_addrs() {
727 Ok(addresses) => addresses.collect(),
728 Err(_) => return false,
729 };
730
731 addresses
732 .into_iter()
733 .any(|address| TcpStream::connect_timeout(&address, timeout).is_ok())
734}
735
736#[inline(never)]
737fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
738 if let Some(router_bind_addr) = config.router_bind_addr.clone() {
744 return run_routed_server(config, router_bind_addr);
745 }
746
747 match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
748 (Some(grpc_bind_addr), Some(http_bind_addr)) => {
749 run_dual_server(config, grpc_bind_addr, http_bind_addr)
750 }
751 (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
752 (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
753 (None, None) => {
754 if let Some(wire_addr) = config.wire_bind_addr.clone() {
755 run_wire_only_server(config, wire_addr)
756 } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
757 run_pg_only_server(config, pg_addr)
758 } else {
759 Err("at least one server bind address must be configured".to_string())
760 }
761 }
762 }
763}
764
765async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
788 let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
789 .ok()
790 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
791 .unwrap_or(true);
792
793 #[cfg(unix)]
794 {
795 use tokio::signal::unix::{signal, SignalKind};
796
797 let mut sigterm = match signal(SignalKind::terminate()) {
798 Ok(s) => s,
799 Err(err) => {
800 tracing::warn!(
801 error = %err,
802 "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
803 );
804 return;
805 }
806 };
807 let mut sigint = match signal(SignalKind::interrupt()) {
808 Ok(s) => s,
809 Err(err) => {
810 tracing::warn!(error = %err, "could not install SIGINT handler");
811 return;
812 }
813 };
814 let mut sighup = match signal(SignalKind::hangup()) {
820 Ok(s) => Some(s),
821 Err(err) => {
822 tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
823 None
824 }
825 };
826
827 let reload_runtime = runtime.clone();
828 tokio::spawn(async move {
829 loop {
830 let signal_name = match &mut sighup {
831 Some(hup) => tokio::select! {
832 _ = sigterm.recv() => "SIGTERM",
833 _ = sigint.recv() => "SIGINT",
834 _ = hup.recv() => "SIGHUP",
835 },
836 None => tokio::select! {
837 _ = sigterm.recv() => "SIGTERM",
838 _ = sigint.recv() => "SIGINT",
839 },
840 };
841
842 if signal_name == "SIGHUP" {
843 handle_sighup_reload(&reload_runtime);
844 continue; }
846
847 tracing::info!(
848 signal = signal_name,
849 "lifecycle signal received; shutting down"
850 );
851 match runtime.graceful_shutdown(backup_on_shutdown) {
852 Ok(report) => {
853 tracing::info!(
854 duration_ms = report.duration_ms,
855 flushed_wal = report.flushed_wal,
856 final_checkpoint = report.final_checkpoint,
857 backup_uploaded = report.backup_uploaded,
858 "graceful shutdown complete"
859 );
860 }
861 Err(err) => {
862 tracing::error!(error = %err, "graceful shutdown failed");
863 crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
869 reason: format!("graceful shutdown failed: {err}"),
870 }
871 .emit_global();
872 }
873 }
874 std::process::exit(0);
875 }
876 });
877 }
878
879 #[cfg(not(unix))]
880 {
881 tokio::spawn(async move {
882 let interrupted = tokio::signal::ctrl_c().await;
883 if let Err(err) = interrupted {
884 tracing::warn!(error = %err, "could not install Ctrl+C handler");
885 return;
886 }
887
888 tracing::info!(
889 signal = "Ctrl+C",
890 "lifecycle signal received; shutting down"
891 );
892 match runtime.graceful_shutdown(backup_on_shutdown) {
893 Ok(report) => {
894 tracing::info!(
895 duration_ms = report.duration_ms,
896 flushed_wal = report.flushed_wal,
897 final_checkpoint = report.final_checkpoint,
898 backup_uploaded = report.backup_uploaded,
899 "graceful shutdown complete"
900 );
901 }
902 Err(err) => {
903 tracing::error!(error = %err, "graceful shutdown failed");
904 }
905 }
906 std::process::exit(0);
907 });
908 }
909}
910
911fn handle_sighup_reload(runtime: &RedDBRuntime) {
920 let now_ms = std::time::SystemTime::now()
921 .duration_since(std::time::UNIX_EPOCH)
922 .map(|d| d.as_millis() as u64)
923 .unwrap_or(0);
924 tracing::info!(
925 target: "reddb::secrets",
926 ts_unix_ms = now_ms,
927 "SIGHUP received; secrets will be re-read from *_FILE on next access"
928 );
929 use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
934 runtime.audit_log().record_event(
935 AuditEvent::builder("config/sighup_reload")
936 .source(AuditAuthSource::System)
937 .resource("secrets")
938 .outcome(Outcome::Success)
939 .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
940 .build(),
941 );
942}
943
944#[inline(never)]
945fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
946 let workers = config.workers;
947 let cli_telemetry = config.telemetry.clone();
948 let db_options = config.to_db_options();
949 let rt_config = detect_runtime_config();
950 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
951 let (runtime, auth_store, _telemetry_guard) =
952 build_runtime_and_auth_store(db_options, cli_telemetry)?;
953
954 spawn_admin_metrics_listeners(&runtime, &auth_store);
955
956 let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
957 .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
958 let http_backend = http_listener
959 .local_addr()
960 .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
961 let http_server = build_http_server(
962 runtime.clone(),
963 auth_store.clone(),
964 http_backend.to_string(),
965 );
966 let http_handle = http_server.serve_in_background_on(http_listener);
967
968 thread::sleep(Duration::from_millis(100));
969 if http_handle.is_finished() {
970 return match http_handle.join() {
971 Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
972 Ok(Err(err)) => Err(err.to_string()),
973 Err(_) => Err("HTTP backend thread panicked".to_string()),
974 };
975 }
976
977 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
978 .enable_all()
979 .worker_threads(worker_threads)
980 .thread_stack_size(rt_config.stack_size)
981 .build()
982 .map_err(|err| format!("tokio runtime: {err}"))?;
983
984 let signal_runtime = runtime.clone();
985 tokio_runtime.block_on(async move {
986 spawn_lifecycle_signal_handler(signal_runtime).await;
987 let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
988 .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
989 let grpc_backend = grpc_listener
990 .local_addr()
991 .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
992 let grpc_server = RedDBGrpcServer::with_options(
993 runtime.clone(),
994 GrpcServerOptions {
995 bind_addr: grpc_backend.to_string(),
996 tls: None,
997 },
998 auth_store,
999 );
1000 tokio::spawn(async move {
1001 if let Err(err) = grpc_server.serve_on(grpc_listener).await {
1002 tracing::error!(err = %err, "gRPC backend error");
1003 }
1004 });
1005
1006 let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
1007 .await
1008 .map_err(|err| format!("bind internal wire listener: {err}"))?;
1009 let wire_backend = wire_listener
1010 .local_addr()
1011 .map_err(|err| format!("inspect internal wire listener: {err}"))?;
1012 let wire_rt = Arc::new(runtime);
1013 tokio::spawn(async move {
1014 if let Err(err) =
1015 crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
1016 .await
1017 {
1018 tracing::error!(err = %err, "redwire backend error");
1019 }
1020 });
1021
1022 tracing::info!(
1023 bind = %router_bind_addr,
1024 cpus = rt_config.available_cpus,
1025 workers = worker_threads,
1026 "router bootstrapping"
1027 );
1028 serve_tcp_router(TcpProtocolRouterConfig {
1029 bind_addr: router_bind_addr,
1030 grpc_backend,
1031 http_backend,
1032 wire_backend,
1033 })
1034 .await
1035 .map_err(|err| err.to_string())
1036 })
1037}
1038
1039fn spawn_wire_listeners(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1041 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1043 let wire_rt = Arc::new(runtime.clone());
1044 tokio::spawn(async move {
1045 #[cfg(unix)]
1048 {
1049 if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1050 if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1051 &wire_addr, wire_rt,
1052 )
1053 .await
1054 {
1055 tracing::error!(err = %e, "redwire unix listener error");
1056 }
1057 return;
1058 }
1059 }
1060 let cfg = crate::wire::RedWireConfig {
1061 bind_addr: wire_addr,
1062 auth_store: None,
1063 oauth: None,
1064 };
1065 if let Err(e) = crate::wire::start_redwire_listener(cfg, wire_rt).await {
1066 tracing::error!(err = %e, "redwire listener error");
1067 }
1068 });
1069 }
1070
1071 if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1073 let tls_config = resolve_wire_tls_config(config);
1074 match tls_config {
1075 Ok(tls_cfg) => {
1076 let wire_rt = Arc::new(runtime.clone());
1077 tokio::spawn(async move {
1078 if let Err(e) =
1079 crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1080 .await
1081 {
1082 tracing::error!(err = %e, "redwire+tls listener error");
1083 }
1084 });
1085 }
1086 Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1087 }
1088 }
1089}
1090
1091fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1098 if let Some(pg_addr) = config.pg_bind_addr.clone() {
1099 let rt = Arc::new(runtime.clone());
1100 tokio::spawn(async move {
1101 let cfg = crate::wire::PgWireConfig {
1102 bind_addr: pg_addr,
1103 ..Default::default()
1104 };
1105 if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1106 tracing::error!(err = %e, "pg wire listener error");
1107 }
1108 });
1109 }
1110}
1111
1112fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1126 use crate::utils::secret_file::expand_file_env;
1127
1128 for var in [
1132 "REDDB_GRPC_TLS_CERT",
1133 "REDDB_GRPC_TLS_KEY",
1134 "REDDB_GRPC_TLS_CLIENT_CA",
1135 ] {
1136 if let Err(err) = expand_file_env(var) {
1137 tracing::warn!(
1138 target: "reddb::secrets",
1139 env = %var,
1140 err = %err,
1141 "could not expand *_FILE companion for gRPC TLS"
1142 );
1143 }
1144 }
1145
1146 let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1147 (Some(cert), Some(key)) => {
1148 let cert_pem = std::fs::read(cert)
1149 .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1150 let key_pem =
1151 std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1152 (cert_pem, key_pem)
1153 }
1154 _ => {
1155 let dev = std::env::var("RED_GRPC_TLS_DEV")
1157 .ok()
1158 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1159 .unwrap_or(false);
1160 if !dev {
1161 return Err("gRPC TLS configured but no cert/key supplied — set \
1162 REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1163 RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1164 .to_string());
1165 }
1166 let dir = config
1167 .path
1168 .as_ref()
1169 .and_then(|p| p.parent())
1170 .map(PathBuf::from)
1171 .unwrap_or_else(|| PathBuf::from("."));
1172 let (cert_pem_str, key_pem_str) =
1173 crate::wire::tls::generate_self_signed_cert("localhost")
1174 .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1175
1176 let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1181 tracing::warn!(
1182 target: "reddb::security",
1183 transport = "grpc",
1184 cert_sha256 = %fp,
1185 "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1186 DO NOT use in production"
1187 );
1188 let cert_path = dir.join("grpc-tls-cert.pem");
1190 let key_path = dir.join("grpc-tls-key.pem");
1191 if !cert_path.exists() || !key_path.exists() {
1192 let _ = std::fs::create_dir_all(&dir);
1193 std::fs::write(&cert_path, cert_pem_str.as_bytes())
1194 .map_err(|e| format!("write grpc dev cert: {e}"))?;
1195 std::fs::write(&key_path, key_pem_str.as_bytes())
1196 .map_err(|e| format!("write grpc dev key: {e}"))?;
1197 #[cfg(unix)]
1198 {
1199 use std::os::unix::fs::PermissionsExt;
1200 let _ =
1201 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1202 }
1203 }
1204 (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1205 }
1206 };
1207
1208 let client_ca_pem = match &config.grpc_tls_client_ca {
1209 Some(path) => Some(
1210 std::fs::read(path)
1211 .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1212 ),
1213 None => None,
1214 };
1215
1216 Ok(crate::GrpcTlsOptions {
1217 cert_pem,
1218 key_pem,
1219 client_ca_pem,
1220 })
1221}
1222
1223fn spawn_grpc_tls_listener_if_configured(
1227 config: &ServerCommandConfig,
1228 runtime: RedDBRuntime,
1229 auth_store: Arc<AuthStore>,
1230) {
1231 let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1232 return;
1233 };
1234 let tls_opts = match resolve_grpc_tls_options(config) {
1235 Ok(opts) => opts,
1236 Err(err) => {
1237 tracing::error!(
1238 target: "reddb::security",
1239 transport = "grpc",
1240 err = %err,
1241 "gRPC TLS config error; TLS listener will not start"
1242 );
1243 return;
1244 }
1245 };
1246 tokio::spawn(async move {
1247 let server = RedDBGrpcServer::with_options(
1248 runtime,
1249 GrpcServerOptions {
1250 bind_addr: tls_bind.clone(),
1251 tls: Some(tls_opts),
1252 },
1253 auth_store,
1254 );
1255 tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1256 if let Err(err) = server.serve().await {
1257 tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1258 }
1259 });
1260}
1261
1262fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1265 use sha2::{Digest, Sha256};
1266 let mut h = Sha256::new();
1267 h.update(pem);
1268 let d = h.finalize();
1269 let mut buf = String::with_capacity(64);
1270 for b in d.iter() {
1271 buf.push_str(&format!("{b:02x}"));
1272 }
1273 buf
1274}
1275
1276fn resolve_wire_tls_config(
1278 config: &ServerCommandConfig,
1279) -> Result<crate::wire::WireTlsConfig, String> {
1280 match (&config.wire_tls_cert, &config.wire_tls_key) {
1281 (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1282 cert_path: cert.clone(),
1283 key_path: key.clone(),
1284 }),
1285 _ => {
1286 let dir = config
1288 .path
1289 .as_ref()
1290 .and_then(|p| p.parent())
1291 .map(PathBuf::from)
1292 .unwrap_or_else(|| PathBuf::from("."));
1293 crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1294 }
1295 }
1296}
1297
1298#[inline(never)]
1299fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1300 let rt_config = detect_runtime_config();
1301 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1302 let cli_telemetry = config.telemetry.clone();
1303 let db_options = config.to_db_options();
1304
1305 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1306 .enable_all()
1307 .worker_threads(workers)
1308 .thread_stack_size(rt_config.stack_size)
1309 .build()
1310 .map_err(|err| format!("tokio runtime: {err}"))?;
1311
1312 let (runtime, _auth_store, _telemetry_guard) =
1316 build_runtime_and_auth_store(db_options, cli_telemetry)?;
1317 let signal_runtime = runtime.clone();
1318 tokio_runtime.block_on(async move {
1319 spawn_lifecycle_signal_handler(signal_runtime).await;
1320 spawn_pg_listener(&config, &runtime);
1321 let wire_rt = Arc::new(runtime);
1322 let cfg = crate::wire::RedWireConfig {
1323 bind_addr: wire_addr,
1324 auth_store: None,
1325 oauth: None,
1326 };
1327 crate::wire::start_redwire_listener(cfg, wire_rt)
1328 .await
1329 .map_err(|e| e.to_string())
1330 })
1331}
1332
1333#[inline(never)]
1334fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1335 let rt_config = detect_runtime_config();
1336 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1337 let cli_telemetry = config.telemetry.clone();
1338 let db_options = config.to_db_options();
1339
1340 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1341 .enable_all()
1342 .worker_threads(workers)
1343 .thread_stack_size(rt_config.stack_size)
1344 .build()
1345 .map_err(|err| format!("tokio runtime: {err}"))?;
1346
1347 let (runtime, _auth_store, _telemetry_guard) =
1348 build_runtime_and_auth_store(db_options, cli_telemetry)?;
1349 let signal_runtime = runtime.clone();
1350 tokio_runtime.block_on(async move {
1351 spawn_lifecycle_signal_handler(signal_runtime).await;
1352 let cfg = crate::wire::PgWireConfig {
1353 bind_addr: pg_addr,
1354 ..Default::default()
1355 };
1356 crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1357 .await
1358 .map_err(|e| e.to_string())
1359 })
1360}
1361
1362#[inline(never)]
1363fn build_runtime_and_auth_store(
1364 db_options: RedDBOptions,
1365 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1366) -> Result<
1367 (
1368 RedDBRuntime,
1369 Arc<AuthStore>,
1370 Option<crate::telemetry::TelemetryGuard>,
1371 ),
1372 String,
1373> {
1374 build_runtime_with_telemetry(db_options, cli_telemetry)
1381}
1382
1383pub(crate) fn build_runtime_with_telemetry(
1393 db_options: RedDBOptions,
1394 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1395) -> Result<
1396 (
1397 RedDBRuntime,
1398 Arc<AuthStore>,
1399 Option<crate::telemetry::TelemetryGuard>,
1400 ),
1401 String,
1402> {
1403 let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1404 let msg = err.to_string();
1410 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1411 phase: "runtime_construction".to_string(),
1412 error: msg.clone(),
1413 }
1414 .emit_global();
1415 msg
1416 })?;
1417
1418 crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1423 let msg = err.to_string();
1424 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1425 phase: "lease_loop".to_string(),
1426 error: msg.clone(),
1427 }
1428 .emit_global();
1429 msg
1430 })?;
1431
1432 if let Some(data_path) = db_options.data_path.as_deref() {
1436 let watch_dir = data_path.parent().unwrap_or(data_path);
1437 crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1438 }
1439
1440 {
1444 let config_path = crate::runtime::config_overlay::config_file_path();
1445 let store = runtime.db().store();
1446 crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1447 }
1448
1449 let merged = merge_telemetry_with_config(
1452 cli_telemetry
1453 .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1454 &runtime,
1455 );
1456 let telemetry_guard = crate::telemetry::init(merged);
1457
1458 let auth_store =
1459 if db_options.auth.vault_enabled {
1460 let pager =
1461 runtime.db().store().pager().cloned().ok_or_else(|| {
1462 "vault requires a paged database (persistent mode)".to_string()
1463 })?;
1464 let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1465 .map_err(|err| err.to_string())?;
1466 Arc::new(store)
1467 } else {
1468 Arc::new(AuthStore::new(db_options.auth.clone()))
1469 };
1470 auth_store.bootstrap_from_env();
1471
1472 {
1474 let store = Arc::clone(&auth_store);
1475 std::thread::Builder::new()
1476 .name("reddb-session-purge".into())
1477 .spawn(move || loop {
1478 std::thread::sleep(std::time::Duration::from_secs(300));
1479 store.purge_expired_sessions();
1480 })
1481 .ok();
1482 }
1483
1484 Ok((runtime, auth_store, telemetry_guard))
1485}
1486
1487fn merge_telemetry_with_config(
1498 mut cli: crate::telemetry::TelemetryConfig,
1499 runtime: &RedDBRuntime,
1500) -> crate::telemetry::TelemetryConfig {
1501 use crate::storage::schema::Value;
1502
1503 let store = runtime.db().store();
1504
1505 if !cli.level_explicit {
1506 if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
1507 cli.level_filter = v.to_string();
1508 }
1509 }
1510 if !cli.format_explicit {
1511 if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
1512 if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
1513 cli.format = parsed;
1514 }
1515 }
1516 }
1517 if !cli.rotation_keep_days_explicit {
1518 match store.get_config("red.logging.keep_days") {
1519 Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
1520 cli.rotation_keep_days = n as u16
1521 }
1522 Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
1523 cli.rotation_keep_days = n as u16
1524 }
1525 Some(Value::Text(v)) => {
1526 if let Ok(n) = v.parse::<u16>() {
1527 cli.rotation_keep_days = n;
1528 }
1529 }
1530 _ => {}
1531 }
1532 }
1533 if !cli.file_prefix_explicit {
1534 if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
1535 if !v.is_empty() {
1536 cli.file_prefix = v.to_string();
1537 }
1538 }
1539 }
1540 if !cli.log_dir_explicit && !cli.log_file_disabled {
1543 if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
1544 if !v.is_empty() {
1545 cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
1546 }
1547 }
1548 }
1549
1550 cli
1551}
1552
1553#[cfg(test)]
1554mod telemetry_merge_tests {
1555 use super::*;
1556 use crate::telemetry::{LogFormat, TelemetryConfig};
1557
1558 fn fresh_runtime() -> RedDBRuntime {
1559 RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
1560 }
1561
1562 fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
1563 runtime
1564 .db()
1565 .store()
1566 .set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
1567 }
1568
1569 fn cli_base() -> TelemetryConfig {
1570 TelemetryConfig {
1573 log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
1574 format: LogFormat::Json,
1575 ..Default::default()
1576 }
1577 }
1578
1579 #[test]
1580 fn config_log_dir_promoted_when_flag_absent() {
1581 let runtime = fresh_runtime();
1582 set_str(&runtime, "red.logging.dir", "/var/log/reddb");
1583 let merged = merge_telemetry_with_config(cli_base(), &runtime);
1584 assert_eq!(
1585 merged.log_dir.as_deref(),
1586 Some(std::path::Path::new("/var/log/reddb"))
1587 );
1588 }
1589
1590 #[test]
1591 fn explicit_log_dir_wins_over_config() {
1592 let runtime = fresh_runtime();
1593 set_str(&runtime, "red.logging.dir", "/var/log/reddb");
1594 let mut cli = cli_base();
1595 cli.log_dir = Some(std::path::PathBuf::from("/custom/dir"));
1596 cli.log_dir_explicit = true;
1597 let merged = merge_telemetry_with_config(cli, &runtime);
1598 assert_eq!(
1599 merged.log_dir.as_deref(),
1600 Some(std::path::Path::new("/custom/dir"))
1601 );
1602 }
1603
1604 #[test]
1605 fn no_log_file_beats_config_log_dir() {
1606 let runtime = fresh_runtime();
1607 set_str(&runtime, "red.logging.dir", "/var/log/reddb");
1608 let mut cli = cli_base();
1609 cli.log_dir = None;
1610 cli.log_file_disabled = true;
1611 let merged = merge_telemetry_with_config(cli, &runtime);
1612 assert!(
1613 merged.log_dir.is_none(),
1614 "--no-log-file must veto config dir"
1615 );
1616 }
1617
1618 #[test]
1619 fn config_format_promoted_on_non_tty_default() {
1620 let runtime = fresh_runtime();
1624 set_str(&runtime, "red.logging.format", "pretty");
1625 let merged = merge_telemetry_with_config(cli_base(), &runtime);
1626 assert_eq!(merged.format, LogFormat::Pretty);
1627 }
1628
1629 #[test]
1630 fn explicit_format_wins_over_config() {
1631 let runtime = fresh_runtime();
1632 set_str(&runtime, "red.logging.format", "pretty");
1633 let mut cli = cli_base();
1634 cli.format = LogFormat::Json;
1635 cli.format_explicit = true;
1636 let merged = merge_telemetry_with_config(cli, &runtime);
1637 assert_eq!(merged.format, LogFormat::Json);
1638 }
1639}
1640
1641#[inline(never)]
1642fn build_http_server(
1643 runtime: RedDBRuntime,
1644 auth_store: Arc<AuthStore>,
1645 bind_addr: String,
1646) -> RedDBServer {
1647 RedDBServer::with_options(
1648 runtime,
1649 ServerOptions {
1650 bind_addr,
1651 ..ServerOptions::default()
1652 },
1653 )
1654 .with_auth(auth_store)
1655}
1656
1657#[inline(never)]
1661fn build_admin_only_server(
1662 runtime: RedDBRuntime,
1663 auth_store: Arc<AuthStore>,
1664 bind_addr: String,
1665) -> RedDBServer {
1666 RedDBServer::with_options(
1667 runtime,
1668 ServerOptions {
1669 bind_addr,
1670 surface: crate::server::ServerSurface::AdminOnly,
1671 ..ServerOptions::default()
1672 },
1673 )
1674 .with_auth(auth_store)
1675}
1676
1677#[inline(never)]
1681fn build_metrics_only_server(
1682 runtime: RedDBRuntime,
1683 auth_store: Arc<AuthStore>,
1684 bind_addr: String,
1685) -> RedDBServer {
1686 RedDBServer::with_options(
1687 runtime,
1688 ServerOptions {
1689 bind_addr,
1690 surface: crate::server::ServerSurface::MetricsOnly,
1691 ..ServerOptions::default()
1692 },
1693 )
1694 .with_auth(auth_store)
1695}
1696
1697fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
1701 if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
1702 let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
1703 let _ = server.serve_in_background();
1704 tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
1705 }
1706 if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
1707 let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
1708 let _ = server.serve_in_background();
1709 tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
1710 }
1711}
1712
1713#[inline(never)]
1714fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
1715 let cli_telemetry = config.telemetry.clone();
1716 let (runtime, auth_store, _telemetry_guard) =
1717 build_runtime_and_auth_store(config.to_db_options(), cli_telemetry)?;
1718 spawn_admin_metrics_listeners(&runtime, &auth_store);
1719 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
1720 let server = build_http_server(runtime, auth_store, bind_addr.clone());
1721 tracing::info!(transport = "http", bind = %bind_addr, "listener online");
1722 server.serve().map_err(|err| err.to_string())
1723}
1724
1725fn spawn_http_tls_listener(
1731 config: &ServerCommandConfig,
1732 runtime: &RedDBRuntime,
1733 auth_store: &Arc<AuthStore>,
1734) -> Result<(), String> {
1735 let Some(addr) = config.http_tls_bind_addr.clone() else {
1736 return Ok(());
1737 };
1738
1739 let tls_config = resolve_http_tls_config(config)?;
1740 let server_config = crate::server::tls::build_server_config(&tls_config)
1741 .map_err(|err| format!("HTTP TLS: {err}"))?;
1742
1743 let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
1744 let _handle = server.serve_tls_in_background(server_config);
1745 tracing::info!(
1746 transport = "https",
1747 bind = %addr,
1748 mtls = %tls_config.client_ca_path.is_some(),
1749 "TLS listener online"
1750 );
1751 Ok(())
1752}
1753
1754fn resolve_http_tls_config(
1756 config: &ServerCommandConfig,
1757) -> Result<crate::server::tls::HttpTlsConfig, String> {
1758 match (&config.http_tls_cert, &config.http_tls_key) {
1759 (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
1760 cert_path: cert.clone(),
1761 key_path: key.clone(),
1762 client_ca_path: config.http_tls_client_ca.clone(),
1763 }),
1764 (None, None) => {
1765 let dir = config
1767 .path
1768 .as_ref()
1769 .and_then(|p| p.parent().map(std::path::PathBuf::from))
1770 .unwrap_or_else(|| std::path::PathBuf::from("."));
1771 let auto = crate::server::tls::auto_generate_dev_cert(&dir)
1772 .map_err(|err| format!("HTTP TLS dev: {err}"))?;
1773 Ok(crate::server::tls::HttpTlsConfig {
1774 cert_path: auto.cert_path,
1775 key_path: auto.key_path,
1776 client_ca_path: config.http_tls_client_ca.clone(),
1777 })
1778 }
1779 _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
1780 }
1781}
1782
1783#[inline(never)]
1784fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
1785 let workers = config.workers;
1786 let cli_telemetry = config.telemetry.clone();
1787 let db_options = config.to_db_options();
1788 let rt_config = detect_runtime_config();
1789
1790 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1791
1792 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1793 .enable_all()
1794 .worker_threads(worker_threads)
1795 .thread_stack_size(rt_config.stack_size)
1796 .build()
1797 .map_err(|err| format!("tokio runtime: {err}"))?;
1798
1799 let (runtime, auth_store, _telemetry_guard) =
1801 build_runtime_and_auth_store(db_options, cli_telemetry)?;
1802 let signal_runtime = runtime.clone();
1803 tokio_runtime.block_on(async move {
1804 spawn_lifecycle_signal_handler(signal_runtime).await;
1805 spawn_wire_listeners(&config, &runtime);
1807
1808 spawn_pg_listener(&config, &runtime);
1810
1811 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
1815
1816 let server = RedDBGrpcServer::with_options(
1817 runtime,
1818 GrpcServerOptions {
1819 bind_addr: bind_addr.clone(),
1820 tls: None,
1821 },
1822 auth_store,
1823 );
1824
1825 tracing::info!(
1826 transport = "grpc",
1827 bind = %bind_addr,
1828 cpus = rt_config.available_cpus,
1829 workers = worker_threads,
1830 "listener online"
1831 );
1832 server.serve().await.map_err(|err| err.to_string())
1833 })
1834}
1835
1836#[inline(never)]
1837fn run_dual_server(
1838 config: ServerCommandConfig,
1839 grpc_bind_addr: String,
1840 http_bind_addr: String,
1841) -> Result<(), String> {
1842 let workers = config.workers;
1843 let wire_bind_addr = config.wire_bind_addr.clone();
1844 let cli_telemetry = config.telemetry.clone();
1845 let db_options = config.to_db_options();
1846 let rt_config = detect_runtime_config();
1847 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1848 let (runtime, auth_store, _telemetry_guard) =
1849 build_runtime_and_auth_store(db_options, cli_telemetry)?;
1850
1851 spawn_admin_metrics_listeners(&runtime, &auth_store);
1852 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
1853
1854 let http_server =
1855 build_http_server(runtime.clone(), auth_store.clone(), http_bind_addr.clone());
1856 let http_handle = http_server.serve_in_background();
1857
1858 thread::sleep(Duration::from_millis(150));
1859 if http_handle.is_finished() {
1860 return match http_handle.join() {
1861 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
1862 Ok(Err(err)) => Err(err.to_string()),
1863 Err(_) => Err("HTTP server thread panicked".to_string()),
1864 };
1865 }
1866
1867 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1868 .enable_all()
1869 .worker_threads(worker_threads)
1870 .thread_stack_size(rt_config.stack_size)
1871 .build()
1872 .map_err(|err| format!("tokio runtime: {err}"))?;
1873
1874 let signal_runtime = runtime.clone();
1875 tokio_runtime.block_on(async move {
1876 spawn_lifecycle_signal_handler(signal_runtime).await;
1877 spawn_wire_listeners(&config, &runtime);
1879
1880 spawn_pg_listener(&config, &runtime);
1882
1883 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
1885
1886 let server = RedDBGrpcServer::with_options(
1887 runtime,
1888 GrpcServerOptions {
1889 bind_addr: grpc_bind_addr.clone(),
1890 tls: None,
1891 },
1892 auth_store,
1893 );
1894
1895 tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
1896 tracing::info!(
1897 transport = "grpc",
1898 bind = %grpc_bind_addr,
1899 cpus = rt_config.available_cpus,
1900 workers = worker_threads,
1901 "listener online"
1902 );
1903 server.serve().await.map_err(|err| err.to_string())
1904 })
1905}
1906
1907#[cfg(test)]
1908mod tests {
1909 use super::*;
1910
1911 #[test]
1912 fn render_systemd_unit_contains_expected_execstart() {
1913 let config = SystemdServiceConfig {
1914 service_name: "reddb".to_string(),
1915 binary_path: PathBuf::from("/usr/local/bin/red"),
1916 run_user: "reddb".to_string(),
1917 run_group: "reddb".to_string(),
1918 data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
1919 router_bind_addr: None,
1920 grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
1921 http_bind_addr: None,
1922 };
1923
1924 let unit = render_systemd_unit(&config);
1925 assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
1926 assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
1927 }
1928
1929 #[test]
1930 fn systemd_service_config_derives_paths() {
1931 let config = SystemdServiceConfig {
1932 service_name: "reddb-api".to_string(),
1933 binary_path: PathBuf::from("/usr/local/bin/red"),
1934 run_user: "reddb".to_string(),
1935 run_group: "reddb".to_string(),
1936 data_path: PathBuf::from("/srv/reddb/live/data.rdb"),
1937 router_bind_addr: None,
1938 grpc_bind_addr: None,
1939 http_bind_addr: Some("127.0.0.1:5055".to_string()),
1940 };
1941
1942 assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
1943 assert_eq!(
1944 config.unit_path(),
1945 PathBuf::from("/etc/systemd/system/reddb-api.service")
1946 );
1947 }
1948
1949 #[test]
1950 fn render_systemd_unit_supports_dual_transport() {
1951 let config = SystemdServiceConfig {
1952 service_name: "reddb".to_string(),
1953 binary_path: PathBuf::from("/usr/local/bin/red"),
1954 run_user: "reddb".to_string(),
1955 run_group: "reddb".to_string(),
1956 data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
1957 router_bind_addr: None,
1958 grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
1959 http_bind_addr: Some("0.0.0.0:5055".to_string()),
1960 };
1961
1962 let unit = render_systemd_unit(&config);
1963 assert!(unit.contains("--grpc-bind 0.0.0.0:5555"));
1964 assert!(unit.contains("--http-bind 0.0.0.0:5055"));
1965 }
1966
1967 #[test]
1968 fn render_systemd_unit_supports_router_mode() {
1969 let config = SystemdServiceConfig {
1970 service_name: "reddb".to_string(),
1971 binary_path: PathBuf::from("/usr/local/bin/red"),
1972 run_user: "reddb".to_string(),
1973 run_group: "reddb".to_string(),
1974 data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
1975 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
1976 grpc_bind_addr: None,
1977 http_bind_addr: None,
1978 };
1979
1980 let unit = render_systemd_unit(&config);
1981 assert!(unit.contains("--bind 127.0.0.1:5050"));
1982 assert!(!unit.contains("--grpc-bind"));
1983 assert!(!unit.contains("--http-bind"));
1984 }
1985}