1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13 GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Default router bind address: loopback (`127.0.0.1`) on port 5050.
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
18pub fn detect_runtime_config() -> RuntimeConfig {
20 let cpus = thread::available_parallelism()
21 .map(|n| n.get())
22 .unwrap_or(1);
23
24 let suggested_workers = cpus.saturating_sub(1).max(1);
26
27 RuntimeConfig {
28 available_cpus: cpus,
29 suggested_workers,
30 stack_size: 8 * 1024 * 1024, }
32}
33
/// Threading parameters produced by [`detect_runtime_config`].
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Parallelism reported by the OS (1 when it cannot be determined).
    pub available_cpus: usize,
    /// Suggested worker-thread count: one fewer than `available_cpus`, but at least 1.
    pub suggested_workers: usize,
    /// Stack size handed to the tokio runtime builder for its worker threads.
    pub stack_size: usize,
}
40
/// Network transports the server can expose.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    Grpc,
    Http,
    Wire,
}

impl ServerTransport {
    /// Human-readable label for this transport.
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Grpc => "gRPC",
            ServerTransport::Http => "HTTP",
            ServerTransport::Wire => "wire",
        }
    }

    /// Loopback `host:port` this transport binds to when no address is given.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Grpc => "127.0.0.1:5555",
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Wire => "127.0.0.1:5050",
        }
    }
}
65
/// Everything the `server` command needs to open the database and start its
/// listeners.
///
/// The `*_explicit` flags are recorded alongside each listener's readiness
/// state; for the wire transport an explicit bind failure is fatal while an
/// implicit one only degrades the listener.
#[derive(Debug, Clone)]
pub struct ServerCommandConfig {
    /// Database file path; `None` selects the in-memory option set.
    pub path: Option<PathBuf>,
    /// Address of the single-port protocol router; when set, the routed
    /// server is started instead of per-transport listeners.
    pub router_bind_addr: Option<String>,
    pub router_bind_explicit: bool,
    /// Plain gRPC listener address.
    pub grpc_bind_addr: Option<String>,
    pub grpc_bind_explicit: bool,
    /// gRPC TLS listener address and PEM material.
    pub grpc_tls_bind_addr: Option<String>,
    pub grpc_tls_cert: Option<PathBuf>,
    pub grpc_tls_key: Option<PathBuf>,
    pub grpc_tls_client_ca: Option<PathBuf>,
    /// Plain HTTP listener address.
    pub http_bind_addr: Option<String>,
    pub http_bind_explicit: bool,
    /// HTTP TLS listener address and PEM material.
    pub http_tls_bind_addr: Option<String>,
    pub http_tls_cert: Option<PathBuf>,
    pub http_tls_key: Option<PathBuf>,
    pub http_tls_client_ca: Option<PathBuf>,
    /// Wire-protocol listener address. On Unix a value starting with
    /// `unix://` or `/` is served over a Unix socket instead of TCP.
    pub wire_bind_addr: Option<String>,
    pub wire_bind_explicit: bool,
    /// Wire-protocol TLS listener address and PEM material.
    pub wire_tls_bind_addr: Option<String>,
    pub wire_tls_cert: Option<PathBuf>,
    pub wire_tls_key: Option<PathBuf>,
    /// PostgreSQL wire-protocol listener address.
    pub pg_bind_addr: Option<String>,
    /// Copied into the database options unchanged.
    pub create_if_missing: bool,
    /// Requests read-only mode; environment variables and persisted runtime
    /// state can also turn it on.
    pub read_only: bool,
    /// Replication role: `"primary"`, `"replica"`, anything else means standalone.
    pub role: String,
    /// Primary address used when `role` is `"replica"`
    /// (defaults to `http://127.0.0.1:5555`).
    pub primary_addr: Option<String>,
    /// Enables the auth vault in the database options.
    pub vault: bool,
    /// Overrides the suggested tokio worker-thread count for the routed server.
    pub workers: Option<usize>,
    /// Telemetry settings passed through when the runtime is built.
    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
}
130
/// Record of a transport listener that was brought up during startup.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerState {
    /// Transport label (for example `"wire"`).
    pub transport: String,
    /// Address the listener was bound to.
    pub bind_addr: String,
    /// Whether the bind address was flagged as explicit in the configuration.
    pub explicit: bool,
}
137
/// Record of a transport listener that could not be bound during startup.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerFailure {
    /// Transport label (for example `"wire"`).
    pub transport: String,
    /// Address the bind was attempted on.
    pub bind_addr: String,
    /// Whether the bind address was flagged as explicit in the configuration.
    pub explicit: bool,
    /// Human-readable description of the bind error.
    pub reason: String,
}
145
146#[derive(Debug, Clone, Default, PartialEq, Eq)]
147pub struct TransportReadiness {
148 pub active: Vec<TransportListenerState>,
149 pub failed: Vec<TransportListenerFailure>,
150}
151
152impl TransportReadiness {
153 fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
154 self.active.push(TransportListenerState {
155 transport: transport.to_string(),
156 bind_addr: bind_addr.to_string(),
157 explicit,
158 });
159 }
160
161 fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
162 self.failed.push(TransportListenerFailure {
163 transport: transport.to_string(),
164 bind_addr: bind_addr.to_string(),
165 explicit,
166 reason,
167 });
168 }
169}
170
/// Inputs for rendering and installing the systemd unit.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix.
    pub service_name: String,
    /// Path of the server binary placed in `ExecStart`.
    pub binary_path: PathBuf,
    /// Account the service runs as (`User=`).
    pub run_user: String,
    /// Group the service runs as (`Group=`).
    pub run_group: String,
    /// Database file path passed to the server via `--path`.
    pub data_path: PathBuf,
    /// Router address (`--bind`); takes precedence over `grpc_bind_addr`.
    pub router_bind_addr: Option<String>,
    /// gRPC address (`--grpc-bind`), used only when no router address is set.
    pub grpc_bind_addr: Option<String>,
    /// HTTP address (`--http-bind`).
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory that contains the data file.
    ///
    /// Falls back to `"."` when `data_path` has no usable parent. That covers
    /// a root or empty path (where `Path::parent` returns `None`) and also a
    /// bare file name such as `data.rdb`, for which `Path::parent` returns
    /// `Some("")` — an empty path that would otherwise be returned as-is.
    pub fn data_dir(&self) -> PathBuf {
        self.data_path
            .parent()
            .filter(|parent| !parent.as_os_str().is_empty())
            .map(PathBuf::from)
            .unwrap_or_else(|| PathBuf::from("."))
    }

    /// Location of the unit file under `/etc/systemd/system`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
195
196pub fn default_telemetry_for_path(
201 path: Option<&std::path::Path>,
202) -> crate::telemetry::TelemetryConfig {
203 let log_dir = match path {
204 Some(p) => p
205 .parent()
206 .map(|parent| parent.join("logs"))
207 .or_else(|| Some(std::path::PathBuf::from("./logs"))),
208 None => None, };
210 crate::telemetry::TelemetryConfig {
211 log_dir,
212 file_prefix: "reddb.log".to_string(),
213 level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
214 format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
215 crate::telemetry::LogFormat::Pretty
216 } else {
217 crate::telemetry::LogFormat::Json
218 },
219 rotation_keep_days: 14,
220 service_name: "reddb",
221 level_explicit: false,
223 format_explicit: false,
224 rotation_keep_days_explicit: false,
225 file_prefix_explicit: false,
226 log_dir_explicit: false,
227 log_file_disabled: false,
228 }
229}
230
231impl ServerCommandConfig {
232 fn to_db_options(&self) -> RedDBOptions {
233 let mut options = match &self.path {
234 Some(path) => RedDBOptions::persistent(path),
235 None => RedDBOptions::in_memory(),
236 };
237
238 options.mode = StorageMode::Persistent;
239 options.create_if_missing = self.create_if_missing;
240 options.read_only = self.read_only
247 || env_nonempty("RED_READONLY")
248 .or_else(|| env_nonempty("REDDB_READONLY"))
249 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
250 .unwrap_or(false)
251 || self.path.as_ref().is_some_and(|data_path| {
252 crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
253 data_path,
254 ))
255 .unwrap_or(false)
256 });
257
258 options.replication = match self.role.as_str() {
259 "primary" => ReplicationConfig::primary(),
260 "replica" => {
261 let primary_addr = self
262 .primary_addr
263 .clone()
264 .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
265 ReplicationConfig::replica(primary_addr)
272 }
273 _ => ReplicationConfig::standalone(),
274 };
275
276 if self.vault {
277 options.auth.vault_enabled = true;
278 }
279
280 configure_remote_backend_from_env(&mut options);
281
282 options
283 }
284
285 pub fn enabled_transports(&self) -> Vec<ServerTransport> {
286 let mut transports = Vec::with_capacity(3);
287 if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
288 transports.push(ServerTransport::Grpc);
289 }
290 if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
291 transports.push(ServerTransport::Http);
292 }
293 if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
294 transports.push(ServerTransport::Wire);
295 }
296 transports
297 }
298}
299
/// Looks up configuration variable `name` by delegating to
/// `crate::utils::env_with_file_fallback`.
// NOTE(review): by its name this presumably yields `None` for unset or empty
// values and honours a `*_FILE` companion — confirm against the helper.
fn env_nonempty(name: &str) -> Option<String> {
    crate::utils::env_with_file_fallback(name)
}
307
308fn env_truthy(name: &str) -> bool {
309 env_nonempty(name)
310 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
311 .unwrap_or(false)
312}
313
/// Selects and configures the remote storage backend from environment
/// variables, writing the result into `options`.
///
/// The backend kind comes from `RED_BACKEND` (falling back to
/// `REDDB_REMOTE_BACKEND`), lower-cased, defaulting to `none`. Throughout,
/// the `RED_*` name is tried first and the `REDDB_*` name second.
///
/// * `s3` / `minio` / `r2` — needs the `backend-s3` feature and a complete
///   S3 configuration; otherwise `options` is left untouched.
/// * `fs` / `local` — local backend; the remote key is built from the
///   optional base path and the optional relative key.
/// * `http` — needs a backend URL; supports an optional prefix, auth header
///   and conditional writes.
/// * `none` / empty — nothing to do.
/// * anything else — a warning is logged and `options` is left untouched.
fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
    let backend = env_nonempty("RED_BACKEND")
        .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
        .unwrap_or_else(|| "none".to_string())
        .to_ascii_lowercase();

    match backend.as_str() {
        "s3" | "minio" | "r2" => {
            #[cfg(feature = "backend-s3")]
            {
                // Without a complete S3 config the backend is silently left unset.
                if let Some(config) = s3_config_from_env() {
                    let remote_key = env_nonempty("RED_REMOTE_KEY")
                        .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
                        .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
                    let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
                    // The same instance serves both the plain and the atomic slot.
                    options.remote_backend = Some(backend.clone());
                    options.remote_backend_atomic = Some(backend);
                    options.remote_key = Some(remote_key);
                }
            }
            #[cfg(not(feature = "backend-s3"))]
            {
                tracing::warn!(
                    backend = %backend,
                    "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
                );
            }
        }
        "fs" | "local" => {
            let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
            // Join base path and relative key with exactly one `/` between them.
            let remote_key = match (
                base_path,
                env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
            ) {
                (Some(base), Some(rel)) => Some(format!(
                    "{}/{}",
                    base.trim_end_matches('/'),
                    rel.trim_start_matches('/')
                )),
                (Some(base), None) => Some(format!(
                    "{}/clusters/dev/data.rdb",
                    base.trim_end_matches('/')
                )),
                (None, Some(rel)) => Some(rel),
                (None, None) => None,
            };
            // With neither a base path nor a key, the backend stays disabled.
            if let Some(key) = remote_key {
                let backend = Arc::new(crate::storage::backend::LocalBackend);
                options.remote_backend = Some(backend.clone());
                options.remote_backend_atomic = Some(backend);
                options.remote_key = Some(key);
            }
        }
        "http" => {
            let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
            {
                Some(u) => u,
                None => {
                    tracing::warn!(
                        "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
                    );
                    return;
                }
            };
            let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
                .unwrap_or_default();
            // A header file, when named, takes precedence over the inline
            // variable. An unreadable or blank file yields no header at all;
            // the inline variable is not consulted in that case.
            let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
            {
                std::fs::read_to_string(&path)
                    .ok()
                    .map(|s| s.trim().to_string())
                    .filter(|s| !s.is_empty())
            } else {
                env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
                    .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
            };

            let mut config =
                crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
            if let Some(auth) = auth_header {
                config = config.with_auth_header(auth);
            }
            let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
                || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
                || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
            config = config.with_conditional_writes(conditional_writes);
            if conditional_writes {
                match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
                    Ok(atomic) => {
                        let atomic_arc = Arc::new(atomic);
                        options.remote_backend = Some(atomic_arc.clone());
                        options.remote_backend_atomic = Some(atomic_arc);
                    }
                    Err(err) => {
                        // Fallback sets only the plain backend; the atomic slot is left as it was.
                        tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
                        options.remote_backend =
                            Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
                    }
                }
            } else {
                options.remote_backend =
                    Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
            }
            options.remote_key = env_nonempty("RED_REMOTE_KEY")
                .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
                .or_else(|| Some("clusters/dev/data.rdb".to_string()));
        }
        "none" | "" => {}
        other => {
            tracing::warn!(
                backend = %other,
                "unknown RED_BACKEND value — supported: s3 | fs | http | none"
            );
        }
    }
}
456
/// Reads an S3 setting, trying `RED_S3_<suffix>` first and
/// `REDDB_S3_<suffix>` second.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    let preferred = format!("RED_S3_{suffix}");
    match env_nonempty(&preferred) {
        Some(value) => Some(value),
        None => env_nonempty(&format!("REDDB_S3_{suffix}")),
    }
}
466
/// Reads an S3 secret, preferring a file named by `RED_S3_<suffix>_FILE`
/// (then `REDDB_S3_<suffix>_FILE`) over the plain variable.
///
/// When a file is named, its trimmed contents are the secret; an unreadable
/// or blank file yields `None` and the plain variable is not consulted.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let secret_file = env_nonempty(&format!("RED_S3_{suffix}_FILE"))
        .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}_FILE")));

    match secret_file {
        Some(path) => {
            let contents = std::fs::read_to_string(&path).ok()?;
            let secret = contents.trim();
            if secret.is_empty() {
                None
            } else {
                Some(secret.to_string())
            }
        }
        None => env_s3(suffix),
    }
}
484
/// Assembles the S3 configuration from the environment.
///
/// Endpoint, bucket, access key and secret key are mandatory; if any is
/// missing the function returns `None`. Region defaults to `us-east-1`, the
/// key prefix (`KEY_PREFIX`, then `PREFIX`) to empty, and path-style
/// addressing to enabled unless `PATH_STYLE` is set to a non-truthy value.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    // Mandatory settings, read in a fixed order; the first gap aborts.
    let endpoint = env_s3("ENDPOINT")?;
    let bucket = env_s3("BUCKET")?;
    let access_key = env_s3_secret("ACCESS_KEY")?;
    let secret_key = env_s3_secret("SECRET_KEY")?;

    // Optional settings with defaults.
    let region = match env_s3("REGION") {
        Some(region) => region,
        None => "us-east-1".to_string(),
    };
    let key_prefix = match env_s3("KEY_PREFIX").or_else(|| env_s3("PREFIX")) {
        Some(prefix) => prefix,
        None => String::new(),
    };
    let path_style = match env_s3("PATH_STYLE") {
        Some(raw) => matches!(
            raw.to_ascii_lowercase().as_str(),
            "1" | "true" | "yes" | "on"
        ),
        None => true,
    };

    Some(crate::storage::backend::S3Config {
        endpoint,
        bucket,
        key_prefix,
        access_key,
        secret_key,
        region,
        path_style,
    })
}
508
509pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
510 let data_dir = config.data_dir();
511 let exec_start = render_systemd_exec_start(config);
512 format!(
513 "[Unit]\n\
514Description=RedDB unified database service\n\
515After=network-online.target\n\
516Wants=network-online.target\n\
517\n\
518[Service]\n\
519Type=simple\n\
520User={user}\n\
521Group={group}\n\
522WorkingDirectory={workdir}\n\
523ExecStart={exec_start}\n\
524Restart=always\n\
525RestartSec=2\n\
526LimitSTACK=16M\n\
527NoNewPrivileges=true\n\
528PrivateTmp=true\n\
529ProtectSystem=strict\n\
530ProtectHome=true\n\
531ProtectControlGroups=true\n\
532ProtectKernelTunables=true\n\
533ProtectKernelModules=true\n\
534RestrictNamespaces=true\n\
535LockPersonality=true\n\
536MemoryDenyWriteExecute=true\n\
537ReadWritePaths={workdir}\n\
538\n\
539[Install]\n\
540WantedBy=multi-user.target\n",
541 user = config.run_user,
542 group = config.run_group,
543 workdir = data_dir.display(),
544 exec_start = exec_start,
545 )
546}
547
548#[cfg(target_os = "linux")]
557pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
558 ensure_root()?;
559 ensure_command_available("systemctl")?;
560 ensure_command_available("getent")?;
561 ensure_command_available("groupadd")?;
562 ensure_command_available("useradd")?;
563 ensure_command_available("install")?;
564 ensure_executable(&config.binary_path)?;
565
566 if !command_success("getent", ["group", config.run_group.as_str()])? {
567 run_command("groupadd", ["--system", config.run_group.as_str()])?;
568 }
569
570 if !command_success("id", ["-u", config.run_user.as_str()])? {
571 let data_dir = config.data_dir();
572 run_command(
573 "useradd",
574 [
575 "--system",
576 "--gid",
577 config.run_group.as_str(),
578 "--home-dir",
579 data_dir.to_string_lossy().as_ref(),
580 "--shell",
581 "/usr/sbin/nologin",
582 config.run_user.as_str(),
583 ],
584 )?;
585 }
586
587 let data_dir = config.data_dir();
588 run_command(
589 "install",
590 [
591 "-d",
592 "-o",
593 config.run_user.as_str(),
594 "-g",
595 config.run_group.as_str(),
596 "-m",
597 "0750",
598 data_dir.to_string_lossy().as_ref(),
599 ],
600 )?;
601
602 std::fs::write(config.unit_path(), render_systemd_unit(config))
603 .map_err(|err| format!("failed to write systemd unit: {err}"))?;
604
605 run_command("systemctl", ["daemon-reload"])?;
606 run_command(
607 "systemctl",
608 [
609 "enable",
610 "--now",
611 format!("{}.service", config.service_name).as_str(),
612 ],
613 )?;
614
615 Ok(())
616}
617
/// Stand-in for platforms without systemd: always fails with guidance on
/// installing the service manually.
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    let guidance = "systemd install is Linux-only — use sc.exe (Windows) or \
                    launchd (macOS) to install the service manually using the \
                    unit printed by `red service print-unit`";
    Err(String::from(guidance))
}
629
/// Succeeds only when `id -u` reports uid 0.
///
/// # Errors
///
/// Fails when `id` cannot be run, exits unsuccessfully, or reports a
/// non-root uid.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let id_output = Command::new("id")
        .arg("-u")
        .output()
        .map_err(|err| format!("failed to determine current uid: {err}"))?;

    if !id_output.status.success() {
        return Err("failed to determine current uid".to_string());
    }

    match String::from_utf8_lossy(&id_output.stdout).trim() {
        "0" => Ok(()),
        _ => Err("run this command as root (sudo)".to_string()),
    }
}
645
/// Checks that `command` resolves in a login shell (`sh -lc 'command -v …'`).
///
/// # Errors
///
/// Fails when the shell cannot be started or the command is not found.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    let probe = format!("command -v {command} >/dev/null 2>&1");
    let found = Command::new("sh")
        .arg("-lc")
        .arg(&probe)
        .status()
        .map_err(|err| format!("failed to check command '{command}': {err}"))?
        .success();

    if !found {
        return Err(format!("required command not found: {command}"));
    }
    Ok(())
}
658
/// Verifies that `path` names a regular file with at least one execute bit.
///
/// Symlinks are followed. Directories are rejected explicitly: they normally
/// carry execute (search) bits and would otherwise pass the permission check.
/// This function is only compiled on Linux, so the Unix permission API is
/// always available and no non-Unix fallback is needed.
///
/// # Errors
///
/// Returns a message when the path cannot be inspected, is not a regular
/// file, or has no execute bit set.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    use std::os::unix::fs::PermissionsExt;

    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;

    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    // Any of the user/group/other execute bits counts.
    if metadata.permissions().mode() & 0o111 == 0 {
        return Err(format!("binary is not executable: {}", path.display()));
    }
    Ok(())
}
678
/// Runs `program` with `args` and reports whether it exited successfully.
///
/// The child inherits this process's standard streams.
///
/// # Errors
///
/// Fails only when the program cannot be started.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    match Command::new(program).args(args).status() {
        Ok(exit) => Ok(exit.success()),
        Err(err) => Err(format!("failed to run {program}: {err}")),
    }
}
687
/// Runs `program` with `args`, capturing its output, and turns a failing
/// exit into an error message.
///
/// The error detail is the trimmed stderr if non-empty, otherwise the trimmed
/// stdout, otherwise the exit status itself. `ExitStatus`'s `Display` output
/// already reads "exit status: N" (or names the signal), so it is used as-is
/// rather than prefixed with another "exit status".
///
/// # Errors
///
/// Fails when the program cannot be started or exits unsuccessfully.
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
    let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
    let detail = if !stderr.is_empty() {
        stderr
    } else if !stdout.is_empty() {
        stdout
    } else {
        output.status.to_string()
    };
    Err(format!("{program} failed: {detail}"))
}
709
710pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
711 let has_any = config.router_bind_addr.is_some()
712 || config.grpc_bind_addr.is_some()
713 || config.http_bind_addr.is_some()
714 || config.wire_bind_addr.is_some()
715 || config.pg_bind_addr.is_some();
716 if !has_any {
717 return Err("at least one server bind address must be configured".into());
718 }
719 let thread_name = if config.router_bind_addr.is_some() {
720 "red-server-router"
721 } else {
722 match (
723 config.grpc_bind_addr.is_some(),
724 config.http_bind_addr.is_some(),
725 ) {
726 (true, true) => "red-server-dual",
727 (true, false) => "red-server-grpc",
728 (false, true) => "red-server-http",
729 (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
730 (false, false) => "red-server-pg-wire",
731 }
732 };
733
734 let handle = thread::Builder::new()
735 .name(thread_name.into())
736 .stack_size(8 * 1024 * 1024)
737 .spawn(move || run_configured_servers(config))
738 .map_err(|err| format!("failed to spawn server thread: {err}"))?;
739
740 match handle.join() {
741 Ok(result) => result,
742 Err(_) => Err("server thread panicked".to_string()),
743 }
744}
745
746fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
747 let mut parts = vec![
748 config.binary_path.display().to_string(),
749 "server".to_string(),
750 "--path".to_string(),
751 config.data_path.display().to_string(),
752 ];
753
754 if let Some(bind_addr) = &config.router_bind_addr {
755 parts.push("--bind".to_string());
756 parts.push(bind_addr.clone());
757 } else if let Some(bind_addr) = &config.grpc_bind_addr {
758 parts.push("--grpc-bind".to_string());
759 parts.push(bind_addr.clone());
760 }
761 if let Some(bind_addr) = &config.http_bind_addr {
762 parts.push("--http-bind".to_string());
763 parts.push(bind_addr.clone());
764 }
765
766 parts.join(" ")
767}
768
/// Returns `true` when a TCP connection to `target` succeeds within `timeout`.
///
/// `target` is resolved to socket addresses; each is tried in turn, with the
/// timeout applied per attempt. A target that cannot be resolved yields
/// `false`.
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    let Ok(resolved) = target.to_socket_addrs() else {
        return false;
    };
    let candidates: Vec<SocketAddr> = resolved.collect();

    for address in &candidates {
        if TcpStream::connect_timeout(address, timeout).is_ok() {
            return true;
        }
    }
    false
}
779
780#[inline(never)]
781fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
782 if let Some(router_bind_addr) = config.router_bind_addr.clone() {
788 return run_routed_server(config, router_bind_addr);
789 }
790
791 match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
792 (Some(grpc_bind_addr), Some(http_bind_addr)) => {
793 run_dual_server(config, grpc_bind_addr, http_bind_addr)
794 }
795 (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
796 (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
797 (None, None) => {
798 if let Some(wire_addr) = config.wire_bind_addr.clone() {
799 run_wire_only_server(config, wire_addr)
800 } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
801 run_pg_only_server(config, pg_addr)
802 } else {
803 Err("at least one server bind address must be configured".to_string())
804 }
805 }
806 }
807}
808
809fn bind_listener_for_startup(
810 readiness: &mut TransportReadiness,
811 transport: &str,
812 bind_addr: &str,
813 explicit: bool,
814) -> Result<Option<TcpListener>, String> {
815 match TcpListener::bind(bind_addr) {
816 Ok(listener) => {
817 readiness.active(transport, bind_addr, explicit);
818 Ok(Some(listener))
819 }
820 Err(err) => {
821 let reason = format!("{transport} listener bind {bind_addr}: {err}");
822 readiness.failed(transport, bind_addr, explicit, reason.clone());
823 if explicit {
824 tracing::error!(
825 transport,
826 bind = %bind_addr,
827 error = %err,
828 "fatal explicit bind failure"
829 );
830 Err(format!("explicit {reason}"))
831 } else {
832 tracing::warn!(
833 transport,
834 bind = %bind_addr,
835 error = %err,
836 "non-fatal implicit bind failure; listener degraded"
837 );
838 Ok(None)
839 }
840 }
841 }
842}
843
/// Installs the process lifecycle signal handling and spawns the task that
/// reacts to it.
///
/// On Unix the task waits for SIGTERM, SIGINT and (if installable) SIGHUP.
/// SIGHUP triggers [`handle_sighup_reload`] and keeps waiting; SIGTERM and
/// SIGINT run `runtime.graceful_shutdown` and then exit the process. On other
/// platforms the task waits for Ctrl+C only.
///
/// `RED_BACKUP_ON_SHUTDOWN` controls the flag passed to the shutdown: it
/// defaults to `true` when unset, and when set is `true` only for
/// `1` / `true` / `yes` / `on` (case-insensitive).
///
/// Note that the process exits with status 0 even when the graceful shutdown
/// reports an error.
async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
        .ok()
        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
        .unwrap_or(true);

    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        // SIGTERM and SIGINT are mandatory: if either cannot be installed, no
        // handler task is spawned at all.
        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
                );
                return;
            }
        };
        let mut sigint = match signal(SignalKind::interrupt()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGINT handler");
                return;
            }
        };
        // SIGHUP is optional: without it only the reload-by-signal path is lost.
        let mut sighup = match signal(SignalKind::hangup()) {
            Ok(s) => Some(s),
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
                None
            }
        };

        let reload_runtime = runtime.clone();
        tokio::spawn(async move {
            loop {
                // Wait for whichever installed signal arrives first.
                let signal_name = match &mut sighup {
                    Some(hup) => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                        _ = hup.recv() => "SIGHUP",
                    },
                    None => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                    },
                };

                // SIGHUP is a reload request, not a shutdown: handle it and keep looping.
                if signal_name == "SIGHUP" {
                    handle_sighup_reload(&reload_runtime);
                    continue;
                }

                tracing::info!(
                    signal = signal_name,
                    "lifecycle signal received; shutting down"
                );
                match runtime.graceful_shutdown(backup_on_shutdown) {
                    Ok(report) => {
                        tracing::info!(
                            duration_ms = report.duration_ms,
                            flushed_wal = report.flushed_wal,
                            final_checkpoint = report.final_checkpoint,
                            backup_uploaded = report.backup_uploaded,
                            "graceful shutdown complete"
                        );
                    }
                    Err(err) => {
                        tracing::error!(error = %err, "graceful shutdown failed");
                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
                            reason: format!("graceful shutdown failed: {err}"),
                        }
                        .emit_global();
                    }
                }
                // Exits with status 0 regardless of the shutdown outcome.
                std::process::exit(0);
            }
        });
    }

    #[cfg(not(unix))]
    {
        tokio::spawn(async move {
            let interrupted = tokio::signal::ctrl_c().await;
            if let Err(err) = interrupted {
                tracing::warn!(error = %err, "could not install Ctrl+C handler");
                return;
            }

            tracing::info!(
                signal = "Ctrl+C",
                "lifecycle signal received; shutting down"
            );
            // Unlike the Unix branch, a failed shutdown is only logged here;
            // no operator event is emitted.
            match runtime.graceful_shutdown(backup_on_shutdown) {
                Ok(report) => {
                    tracing::info!(
                        duration_ms = report.duration_ms,
                        flushed_wal = report.flushed_wal,
                        final_checkpoint = report.final_checkpoint,
                        backup_uploaded = report.backup_uploaded,
                        "graceful shutdown complete"
                    );
                }
                Err(err) => {
                    tracing::error!(error = %err, "graceful shutdown failed");
                }
            }
            std::process::exit(0);
        });
    }
}
989
990fn handle_sighup_reload(runtime: &RedDBRuntime) {
999 let now_ms = std::time::SystemTime::now()
1000 .duration_since(std::time::UNIX_EPOCH)
1001 .map(|d| d.as_millis() as u64)
1002 .unwrap_or(0);
1003 tracing::info!(
1004 target: "reddb::secrets",
1005 ts_unix_ms = now_ms,
1006 "SIGHUP received; secrets will be re-read from *_FILE on next access"
1007 );
1008 use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1013 runtime.audit_log().record_event(
1014 AuditEvent::builder("config/sighup_reload")
1015 .source(AuditAuthSource::System)
1016 .resource("secrets")
1017 .outcome(Outcome::Success)
1018 .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1019 .build(),
1020 );
1021}
1022
/// Runs the single-port routed server.
///
/// Internal HTTP, gRPC and wire backends are each bound to an ephemeral
/// loopback port (`127.0.0.1:0`); the protocol router then listens on
/// `router_bind_addr` and is given the three backend addresses. The call
/// blocks until the router returns.
///
/// # Errors
///
/// Fails when the runtime cannot be built, an internal listener cannot be
/// bound or inspected, the HTTP backend stops during startup, the tokio
/// runtime cannot be created, or the router itself returns an error.
#[inline(never)]
fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
    let workers = config.workers;
    let cli_telemetry = config.telemetry.clone();
    let db_options = config.to_db_options();
    let rt_config = detect_runtime_config();
    // An explicit worker count wins over the detected suggestion.
    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
    let (runtime, auth_store, _telemetry_guard) =
        build_runtime_and_auth_store(db_options, cli_telemetry)?;

    spawn_admin_metrics_listeners(&runtime, &auth_store);

    // HTTP backend: bound here and served on a background thread.
    let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
        .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
    let http_backend = http_listener
        .local_addr()
        .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
    let http_server = build_http_server(
        runtime.clone(),
        auth_store.clone(),
        http_backend.to_string(),
    );
    let http_handle = http_server.serve_in_background_on(http_listener);

    // Brief pause, then check whether the HTTP backend has already stopped —
    // presumably to surface immediate startup failures before continuing.
    thread::sleep(Duration::from_millis(100));
    if http_handle.is_finished() {
        return match http_handle.join() {
            Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
            Ok(Err(err)) => Err(err.to_string()),
            Err(_) => Err("HTTP backend thread panicked".to_string()),
        };
    }

    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(worker_threads)
        .thread_stack_size(rt_config.stack_size)
        .build()
        .map_err(|err| format!("tokio runtime: {err}"))?;

    let signal_runtime = runtime.clone();
    tokio_runtime.block_on(async move {
        spawn_lifecycle_signal_handler(signal_runtime).await;
        // gRPC backend: plaintext on loopback, served as a spawned task.
        let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
            .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
        let grpc_backend = grpc_listener
            .local_addr()
            .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
        let grpc_server = RedDBGrpcServer::with_options(
            runtime.clone(),
            GrpcServerOptions {
                bind_addr: grpc_backend.to_string(),
                tls: None,
            },
            auth_store,
        );
        tokio::spawn(async move {
            if let Err(err) = grpc_server.serve_on(grpc_listener).await {
                tracing::error!(err = %err, "gRPC backend error");
            }
        });

        // Wire backend: served as a spawned task.
        let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .map_err(|err| format!("bind internal wire listener: {err}"))?;
        let wire_backend = wire_listener
            .local_addr()
            .map_err(|err| format!("inspect internal wire listener: {err}"))?;
        let wire_rt = Arc::new(runtime);
        tokio::spawn(async move {
            if let Err(err) =
                crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
                    .await
            {
                tracing::error!(err = %err, "redwire backend error");
            }
        });

        tracing::info!(
            bind = %router_bind_addr,
            cpus = rt_config.available_cpus,
            workers = worker_threads,
            "router bootstrapping"
        );
        // The router runs on this task; its result is the function's result.
        serve_tcp_router(TcpProtocolRouterConfig {
            bind_addr: router_bind_addr,
            grpc_backend,
            http_backend,
            wire_backend,
        })
        .await
        .map_err(|err| err.to_string())
    })
}
1117
/// Starts the wire-protocol listeners requested by `config` as background
/// tasks and records the plain listener's outcome in `readiness`.
///
/// * Plain wire listener (`wire_bind_addr`): on Unix, an address starting
///   with `unix://` or `/` is served over a Unix socket and the function
///   returns right away, so the TLS listener below is not started in that
///   case. Otherwise a TCP listener is bound here.
/// * TLS wire listener (`wire_tls_bind_addr`): started when its TLS
///   configuration resolves; a configuration error is only logged, and
///   nothing is recorded in `readiness` for this listener.
///
/// # Errors
///
/// Fails only when the plain TCP bind fails and `wire_bind_explicit` is set.
async fn spawn_wire_listeners(
    config: &ServerCommandConfig,
    runtime: &RedDBRuntime,
    readiness: &mut TransportReadiness,
) -> Result<(), String> {
    if let Some(wire_addr) = config.wire_bind_addr.clone() {
        let wire_rt = Arc::new(runtime.clone());
        #[cfg(unix)]
        {
            if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
                // Recorded as active before the spawned task attempts the bind.
                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
                tokio::spawn(async move {
                    if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
                        &wire_addr, wire_rt,
                    )
                    .await
                    {
                        tracing::error!(err = %e, "redwire unix listener error");
                    }
                });
                return Ok(());
            }
        }
        match tokio::net::TcpListener::bind(&wire_addr).await {
            Ok(listener) => {
                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
                tokio::spawn(async move {
                    if let Err(e) =
                        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
                            .await
                    {
                        tracing::error!(err = %e, "redwire listener error");
                    }
                });
            }
            Err(err) => {
                let reason = format!("wire listener bind {wire_addr}: {err}");
                readiness.failed(
                    "wire",
                    &wire_addr,
                    config.wire_bind_explicit,
                    reason.clone(),
                );
                // Explicit address: abort startup. Implicit: log and carry on.
                if config.wire_bind_explicit {
                    tracing::error!(
                        transport = "wire",
                        bind = %wire_addr,
                        error = %err,
                        "fatal explicit bind failure"
                    );
                    return Err(format!("explicit {reason}"));
                }
                tracing::warn!(
                    transport = "wire",
                    bind = %wire_addr,
                    error = %err,
                    "non-fatal implicit bind failure; listener degraded"
                );
            }
        }
    }

    if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
        let tls_config = resolve_wire_tls_config(config);
        match tls_config {
            Ok(tls_cfg) => {
                let wire_rt = Arc::new(runtime.clone());
                tokio::spawn(async move {
                    if let Err(e) =
                        crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
                            .await
                    {
                        tracing::error!(err = %e, "redwire+tls listener error");
                    }
                });
            }
            Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
        }
    }
    Ok(())
}
1204
1205fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1212 if let Some(pg_addr) = config.pg_bind_addr.clone() {
1213 let rt = Arc::new(runtime.clone());
1214 tokio::spawn(async move {
1215 let cfg = crate::wire::PgWireConfig {
1216 bind_addr: pg_addr,
1217 ..Default::default()
1218 };
1219 if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1220 tracing::error!(err = %e, "pg wire listener error");
1221 }
1222 });
1223 }
1224}
1225
1226fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1240 use crate::utils::secret_file::expand_file_env;
1241
1242 for var in [
1246 "REDDB_GRPC_TLS_CERT",
1247 "REDDB_GRPC_TLS_KEY",
1248 "REDDB_GRPC_TLS_CLIENT_CA",
1249 ] {
1250 if let Err(err) = expand_file_env(var) {
1251 tracing::warn!(
1252 target: "reddb::secrets",
1253 env = %var,
1254 err = %err,
1255 "could not expand *_FILE companion for gRPC TLS"
1256 );
1257 }
1258 }
1259
1260 let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1261 (Some(cert), Some(key)) => {
1262 let cert_pem = std::fs::read(cert)
1263 .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1264 let key_pem =
1265 std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1266 (cert_pem, key_pem)
1267 }
1268 _ => {
1269 let dev = std::env::var("RED_GRPC_TLS_DEV")
1271 .ok()
1272 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1273 .unwrap_or(false);
1274 if !dev {
1275 return Err("gRPC TLS configured but no cert/key supplied — set \
1276 REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1277 RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1278 .to_string());
1279 }
1280 let dir = config
1281 .path
1282 .as_ref()
1283 .and_then(|p| p.parent())
1284 .map(PathBuf::from)
1285 .unwrap_or_else(|| PathBuf::from("."));
1286 let (cert_pem_str, key_pem_str) =
1287 crate::wire::tls::generate_self_signed_cert("localhost")
1288 .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1289
1290 let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1295 tracing::warn!(
1296 target: "reddb::security",
1297 transport = "grpc",
1298 cert_sha256 = %fp,
1299 "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1300 DO NOT use in production"
1301 );
1302 let cert_path = dir.join("grpc-tls-cert.pem");
1304 let key_path = dir.join("grpc-tls-key.pem");
1305 if !cert_path.exists() || !key_path.exists() {
1306 let _ = std::fs::create_dir_all(&dir);
1307 std::fs::write(&cert_path, cert_pem_str.as_bytes())
1308 .map_err(|e| format!("write grpc dev cert: {e}"))?;
1309 std::fs::write(&key_path, key_pem_str.as_bytes())
1310 .map_err(|e| format!("write grpc dev key: {e}"))?;
1311 #[cfg(unix)]
1312 {
1313 use std::os::unix::fs::PermissionsExt;
1314 let _ =
1315 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1316 }
1317 }
1318 (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1319 }
1320 };
1321
1322 let client_ca_pem = match &config.grpc_tls_client_ca {
1323 Some(path) => Some(
1324 std::fs::read(path)
1325 .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1326 ),
1327 None => None,
1328 };
1329
1330 Ok(crate::GrpcTlsOptions {
1331 cert_pem,
1332 key_pem,
1333 client_ca_pem,
1334 })
1335}
1336
1337fn spawn_grpc_tls_listener_if_configured(
1341 config: &ServerCommandConfig,
1342 runtime: RedDBRuntime,
1343 auth_store: Arc<AuthStore>,
1344) {
1345 let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1346 return;
1347 };
1348 let tls_opts = match resolve_grpc_tls_options(config) {
1349 Ok(opts) => opts,
1350 Err(err) => {
1351 tracing::error!(
1352 target: "reddb::security",
1353 transport = "grpc",
1354 err = %err,
1355 "gRPC TLS config error; TLS listener will not start"
1356 );
1357 return;
1358 }
1359 };
1360 tokio::spawn(async move {
1361 let server = RedDBGrpcServer::with_options(
1362 runtime,
1363 GrpcServerOptions {
1364 bind_addr: tls_bind.clone(),
1365 tls: Some(tls_opts),
1366 },
1367 auth_store,
1368 );
1369 tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1370 if let Err(err) = server.serve().await {
1371 tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1372 }
1373 });
1374}
1375
1376fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1379 use sha2::{Digest, Sha256};
1380 let mut h = Sha256::new();
1381 h.update(pem);
1382 let d = h.finalize();
1383 let mut buf = String::with_capacity(64);
1384 for b in d.iter() {
1385 buf.push_str(&format!("{b:02x}"));
1386 }
1387 buf
1388}
1389
1390fn resolve_wire_tls_config(
1392 config: &ServerCommandConfig,
1393) -> Result<crate::wire::WireTlsConfig, String> {
1394 match (&config.wire_tls_cert, &config.wire_tls_key) {
1395 (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1396 cert_path: cert.clone(),
1397 key_path: key.clone(),
1398 }),
1399 _ => {
1400 let dir = config
1402 .path
1403 .as_ref()
1404 .and_then(|p| p.parent())
1405 .map(PathBuf::from)
1406 .unwrap_or_else(|| PathBuf::from("."));
1407 crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1408 }
1409 }
1410}
1411
1412#[inline(never)]
1413fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1414 let rt_config = detect_runtime_config();
1415 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1416 let cli_telemetry = config.telemetry.clone();
1417 let db_options = config.to_db_options();
1418 let mut transport_readiness = TransportReadiness::default();
1419
1420 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1421 .enable_all()
1422 .worker_threads(workers)
1423 .thread_stack_size(rt_config.stack_size)
1424 .build()
1425 .map_err(|err| format!("tokio runtime: {err}"))?;
1426
1427 let (runtime, _auth_store, _telemetry_guard) =
1431 build_runtime_and_auth_store(db_options, cli_telemetry)?;
1432 let signal_runtime = runtime.clone();
1433 tokio_runtime.block_on(async move {
1434 spawn_lifecycle_signal_handler(signal_runtime).await;
1435 spawn_pg_listener(&config, &runtime);
1436 let wire_rt = Arc::new(runtime);
1437 let listener = tokio::net::TcpListener::bind(&wire_addr)
1438 .await
1439 .map_err(|err| {
1440 let reason = format!("wire listener bind {wire_addr}: {err}");
1441 transport_readiness.failed(
1442 "wire",
1443 &wire_addr,
1444 config.wire_bind_explicit,
1445 reason.clone(),
1446 );
1447 if config.wire_bind_explicit {
1448 format!("explicit {reason}")
1449 } else {
1450 reason
1451 }
1452 })?;
1453 transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1454 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1455 .await
1456 .map_err(|e| e.to_string())
1457 })
1458}
1459
1460#[inline(never)]
1461fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1462 let rt_config = detect_runtime_config();
1463 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1464 let cli_telemetry = config.telemetry.clone();
1465 let db_options = config.to_db_options();
1466
1467 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1468 .enable_all()
1469 .worker_threads(workers)
1470 .thread_stack_size(rt_config.stack_size)
1471 .build()
1472 .map_err(|err| format!("tokio runtime: {err}"))?;
1473
1474 let (runtime, _auth_store, _telemetry_guard) =
1475 build_runtime_and_auth_store(db_options, cli_telemetry)?;
1476 let signal_runtime = runtime.clone();
1477 tokio_runtime.block_on(async move {
1478 spawn_lifecycle_signal_handler(signal_runtime).await;
1479 let cfg = crate::wire::PgWireConfig {
1480 bind_addr: pg_addr,
1481 ..Default::default()
1482 };
1483 crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1484 .await
1485 .map_err(|e| e.to_string())
1486 })
1487}
1488
1489#[inline(never)]
1490fn build_runtime_and_auth_store(
1491 db_options: RedDBOptions,
1492 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1493) -> Result<
1494 (
1495 RedDBRuntime,
1496 Arc<AuthStore>,
1497 Option<crate::telemetry::TelemetryGuard>,
1498 ),
1499 String,
1500> {
1501 build_runtime_with_telemetry(db_options, cli_telemetry)
1508}
1509
1510pub(crate) fn build_runtime_with_telemetry(
1520 db_options: RedDBOptions,
1521 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1522) -> Result<
1523 (
1524 RedDBRuntime,
1525 Arc<AuthStore>,
1526 Option<crate::telemetry::TelemetryGuard>,
1527 ),
1528 String,
1529> {
1530 let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1531 let msg = err.to_string();
1537 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1538 phase: "runtime_construction".to_string(),
1539 error: msg.clone(),
1540 }
1541 .emit_global();
1542 msg
1543 })?;
1544
1545 crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1550 let msg = err.to_string();
1551 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1552 phase: "lease_loop".to_string(),
1553 error: msg.clone(),
1554 }
1555 .emit_global();
1556 msg
1557 })?;
1558
1559 if let Some(data_path) = db_options.data_path.as_deref() {
1563 let watch_dir = data_path.parent().unwrap_or(data_path);
1564 crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1565 }
1566
1567 {
1571 let config_path = crate::runtime::config_overlay::config_file_path();
1572 let store = runtime.db().store();
1573 crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1574 }
1575
1576 let merged = merge_telemetry_with_config(
1579 cli_telemetry
1580 .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1581 &runtime,
1582 );
1583 let telemetry_guard = crate::telemetry::init(merged);
1584
1585 let auth_store =
1586 if db_options.auth.vault_enabled {
1587 let pager =
1588 runtime.db().store().pager().cloned().ok_or_else(|| {
1589 "vault requires a paged database (persistent mode)".to_string()
1590 })?;
1591 let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1592 .map_err(|err| err.to_string())?;
1593 Arc::new(store)
1594 } else {
1595 Arc::new(AuthStore::new(db_options.auth.clone()))
1596 };
1597 auth_store.bootstrap_from_env();
1598
1599 {
1601 let store = Arc::clone(&auth_store);
1602 std::thread::Builder::new()
1603 .name("reddb-session-purge".into())
1604 .spawn(move || loop {
1605 std::thread::sleep(std::time::Duration::from_secs(300));
1606 store.purge_expired_sessions();
1607 })
1608 .ok();
1609 }
1610
1611 Ok((runtime, auth_store, telemetry_guard))
1612}
1613
1614fn merge_telemetry_with_config(
1625 mut cli: crate::telemetry::TelemetryConfig,
1626 runtime: &RedDBRuntime,
1627) -> crate::telemetry::TelemetryConfig {
1628 use crate::storage::schema::Value;
1629
1630 let store = runtime.db().store();
1631
1632 if !cli.level_explicit {
1633 if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
1634 cli.level_filter = v.to_string();
1635 }
1636 }
1637 if !cli.format_explicit {
1638 if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
1639 if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
1640 cli.format = parsed;
1641 }
1642 }
1643 }
1644 if !cli.rotation_keep_days_explicit {
1645 match store.get_config("red.logging.keep_days") {
1646 Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
1647 cli.rotation_keep_days = n as u16
1648 }
1649 Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
1650 cli.rotation_keep_days = n as u16
1651 }
1652 Some(Value::Text(v)) => {
1653 if let Ok(n) = v.parse::<u16>() {
1654 cli.rotation_keep_days = n;
1655 }
1656 }
1657 _ => {}
1658 }
1659 }
1660 if !cli.file_prefix_explicit {
1661 if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
1662 if !v.is_empty() {
1663 cli.file_prefix = v.to_string();
1664 }
1665 }
1666 }
1667 if !cli.log_dir_explicit && !cli.log_file_disabled {
1670 if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
1671 if !v.is_empty() {
1672 cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
1673 }
1674 }
1675 }
1676
1677 cli
1678}
1679
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};
    use std::path::Path;

    /// In-memory runtime with the given string config entries already stored.
    fn runtime_with(entries: &[(&str, &str)]) -> RedDBRuntime {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        for &(key, value) in entries {
            runtime
                .db()
                .store()
                .set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
        }
        runtime
    }

    /// CLI baseline: a default log dir, JSON format, nothing marked explicit.
    fn cli_base() -> TelemetryConfig {
        TelemetryConfig {
            log_dir: Some(PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let runtime = runtime_with(&[("red.logging.dir", "/var/log/reddb")]);

        let merged = merge_telemetry_with_config(cli_base(), &runtime);

        assert_eq!(merged.log_dir.as_deref(), Some(Path::new("/var/log/reddb")));
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let runtime = runtime_with(&[("red.logging.dir", "/var/log/reddb")]);
        let cli = TelemetryConfig {
            log_dir: Some(PathBuf::from("/custom/dir")),
            log_dir_explicit: true,
            ..cli_base()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert_eq!(merged.log_dir.as_deref(), Some(Path::new("/custom/dir")));
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let runtime = runtime_with(&[("red.logging.dir", "/var/log/reddb")]);
        let cli = TelemetryConfig {
            log_dir: None,
            log_file_disabled: true,
            ..cli_base()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert!(
            merged.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        let runtime = runtime_with(&[("red.logging.format", "pretty")]);

        let merged = merge_telemetry_with_config(cli_base(), &runtime);

        assert_eq!(merged.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let runtime = runtime_with(&[("red.logging.format", "pretty")]);
        let cli = TelemetryConfig {
            format: LogFormat::Json,
            format_explicit: true,
            ..cli_base()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert_eq!(merged.format, LogFormat::Json);
    }
}
1767
1768#[inline(never)]
1769fn build_http_server(
1770 runtime: RedDBRuntime,
1771 auth_store: Arc<AuthStore>,
1772 bind_addr: String,
1773) -> RedDBServer {
1774 build_http_server_with_transport_readiness(
1775 runtime,
1776 auth_store,
1777 bind_addr,
1778 TransportReadiness::default(),
1779 )
1780}
1781
1782#[inline(never)]
1783fn build_http_server_with_transport_readiness(
1784 runtime: RedDBRuntime,
1785 auth_store: Arc<AuthStore>,
1786 bind_addr: String,
1787 transport_readiness: TransportReadiness,
1788) -> RedDBServer {
1789 RedDBServer::with_options(
1790 runtime,
1791 ServerOptions {
1792 bind_addr,
1793 transport_readiness,
1794 ..ServerOptions::default()
1795 },
1796 )
1797 .with_auth(auth_store)
1798}
1799
1800#[inline(never)]
1804fn build_admin_only_server(
1805 runtime: RedDBRuntime,
1806 auth_store: Arc<AuthStore>,
1807 bind_addr: String,
1808) -> RedDBServer {
1809 RedDBServer::with_options(
1810 runtime,
1811 ServerOptions {
1812 bind_addr,
1813 surface: crate::server::ServerSurface::AdminOnly,
1814 ..ServerOptions::default()
1815 },
1816 )
1817 .with_auth(auth_store)
1818}
1819
1820#[inline(never)]
1824fn build_metrics_only_server(
1825 runtime: RedDBRuntime,
1826 auth_store: Arc<AuthStore>,
1827 bind_addr: String,
1828) -> RedDBServer {
1829 RedDBServer::with_options(
1830 runtime,
1831 ServerOptions {
1832 bind_addr,
1833 surface: crate::server::ServerSurface::MetricsOnly,
1834 ..ServerOptions::default()
1835 },
1836 )
1837 .with_auth(auth_store)
1838}
1839
1840fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
1844 if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
1845 let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
1846 let _ = server.serve_in_background();
1847 tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
1848 }
1849 if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
1850 let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
1851 let _ = server.serve_in_background();
1852 tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
1853 }
1854}
1855
1856#[inline(never)]
1857fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
1858 let cli_telemetry = config.telemetry.clone();
1859 let mut transport_readiness = TransportReadiness::default();
1860 let Some(listener) = bind_listener_for_startup(
1861 &mut transport_readiness,
1862 "http",
1863 &bind_addr,
1864 config.http_bind_explicit,
1865 )?
1866 else {
1867 return Err(format!(
1868 "no HTTP listener started; implicit bind {} failed",
1869 bind_addr
1870 ));
1871 };
1872 let (runtime, auth_store, _telemetry_guard) =
1873 build_runtime_and_auth_store(config.to_db_options(), cli_telemetry)?;
1874 spawn_admin_metrics_listeners(&runtime, &auth_store);
1875 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
1876 let server = build_http_server_with_transport_readiness(
1877 runtime,
1878 auth_store,
1879 bind_addr.clone(),
1880 transport_readiness,
1881 );
1882 tracing::info!(transport = "http", bind = %bind_addr, "listener online");
1883 server.serve_on(listener).map_err(|err| err.to_string())
1884}
1885
1886fn spawn_http_tls_listener(
1892 config: &ServerCommandConfig,
1893 runtime: &RedDBRuntime,
1894 auth_store: &Arc<AuthStore>,
1895) -> Result<(), String> {
1896 let Some(addr) = config.http_tls_bind_addr.clone() else {
1897 return Ok(());
1898 };
1899
1900 let tls_config = resolve_http_tls_config(config)?;
1901 let server_config = crate::server::tls::build_server_config(&tls_config)
1902 .map_err(|err| format!("HTTP TLS: {err}"))?;
1903
1904 let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
1905 let _handle = server.serve_tls_in_background(server_config);
1906 tracing::info!(
1907 transport = "https",
1908 bind = %addr,
1909 mtls = %tls_config.client_ca_path.is_some(),
1910 "TLS listener online"
1911 );
1912 Ok(())
1913}
1914
1915fn resolve_http_tls_config(
1917 config: &ServerCommandConfig,
1918) -> Result<crate::server::tls::HttpTlsConfig, String> {
1919 match (&config.http_tls_cert, &config.http_tls_key) {
1920 (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
1921 cert_path: cert.clone(),
1922 key_path: key.clone(),
1923 client_ca_path: config.http_tls_client_ca.clone(),
1924 }),
1925 (None, None) => {
1926 let dir = config
1928 .path
1929 .as_ref()
1930 .and_then(|p| p.parent().map(std::path::PathBuf::from))
1931 .unwrap_or_else(|| std::path::PathBuf::from("."));
1932 let auto = crate::server::tls::auto_generate_dev_cert(&dir)
1933 .map_err(|err| format!("HTTP TLS dev: {err}"))?;
1934 Ok(crate::server::tls::HttpTlsConfig {
1935 cert_path: auto.cert_path,
1936 key_path: auto.key_path,
1937 client_ca_path: config.http_tls_client_ca.clone(),
1938 })
1939 }
1940 _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
1941 }
1942}
1943
1944#[inline(never)]
1945fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
1946 let workers = config.workers;
1947 let cli_telemetry = config.telemetry.clone();
1948 let db_options = config.to_db_options();
1949 let rt_config = detect_runtime_config();
1950 let mut transport_readiness = TransportReadiness::default();
1951 let Some(grpc_listener) = bind_listener_for_startup(
1952 &mut transport_readiness,
1953 "grpc",
1954 &bind_addr,
1955 config.grpc_bind_explicit,
1956 )?
1957 else {
1958 return Err(format!(
1959 "no gRPC listener started; implicit bind {} failed",
1960 bind_addr
1961 ));
1962 };
1963
1964 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1965
1966 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1967 .enable_all()
1968 .worker_threads(worker_threads)
1969 .thread_stack_size(rt_config.stack_size)
1970 .build()
1971 .map_err(|err| format!("tokio runtime: {err}"))?;
1972
1973 let (runtime, auth_store, _telemetry_guard) =
1975 build_runtime_and_auth_store(db_options, cli_telemetry)?;
1976 let signal_runtime = runtime.clone();
1977 tokio_runtime.block_on(async move {
1978 spawn_lifecycle_signal_handler(signal_runtime).await;
1979 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
1981
1982 spawn_pg_listener(&config, &runtime);
1984
1985 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
1989
1990 let server = RedDBGrpcServer::with_options(
1991 runtime,
1992 GrpcServerOptions {
1993 bind_addr: bind_addr.clone(),
1994 tls: None,
1995 },
1996 auth_store,
1997 );
1998
1999 tracing::info!(
2000 transport = "grpc",
2001 bind = %bind_addr,
2002 cpus = rt_config.available_cpus,
2003 workers = worker_threads,
2004 "listener online"
2005 );
2006 server
2007 .serve_on(grpc_listener)
2008 .await
2009 .map_err(|err| err.to_string())
2010 })
2011}
2012
2013#[inline(never)]
2014fn run_dual_server(
2015 config: ServerCommandConfig,
2016 grpc_bind_addr: String,
2017 http_bind_addr: String,
2018) -> Result<(), String> {
2019 let workers = config.workers;
2020 let cli_telemetry = config.telemetry.clone();
2021 let db_options = config.to_db_options();
2022 let rt_config = detect_runtime_config();
2023 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2024 let mut transport_readiness = TransportReadiness::default();
2025 let http_listener = bind_listener_for_startup(
2026 &mut transport_readiness,
2027 "http",
2028 &http_bind_addr,
2029 config.http_bind_explicit,
2030 )?;
2031 let grpc_listener = bind_listener_for_startup(
2032 &mut transport_readiness,
2033 "grpc",
2034 &grpc_bind_addr,
2035 config.grpc_bind_explicit,
2036 )?;
2037 if http_listener.is_none() && grpc_listener.is_none() {
2038 return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2039 }
2040 let (runtime, auth_store, _telemetry_guard) =
2041 build_runtime_and_auth_store(db_options, cli_telemetry)?;
2042
2043 spawn_admin_metrics_listeners(&runtime, &auth_store);
2044 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2045
2046 let http_handle = if let Some(listener) = http_listener {
2047 let http_server = build_http_server_with_transport_readiness(
2048 runtime.clone(),
2049 auth_store.clone(),
2050 http_bind_addr.clone(),
2051 transport_readiness.clone(),
2052 );
2053 Some(http_server.serve_in_background_on(listener))
2054 } else {
2055 None
2056 };
2057
2058 thread::sleep(Duration::from_millis(150));
2059 if let Some(handle) = http_handle.as_ref() {
2060 if handle.is_finished() {
2061 let handle = http_handle.unwrap();
2062 return match handle.join() {
2063 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2064 Ok(Err(err)) => Err(err.to_string()),
2065 Err(_) => Err("HTTP server thread panicked".to_string()),
2066 };
2067 }
2068 }
2069 if grpc_listener.is_none() {
2070 let Some(handle) = http_handle else {
2071 return Err("no listener started".to_string());
2072 };
2073 return match handle.join() {
2074 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2075 Ok(Err(err)) => Err(err.to_string()),
2076 Err(_) => Err("HTTP server thread panicked".to_string()),
2077 };
2078 }
2079 let grpc_listener = grpc_listener.expect("checked above");
2080
2081 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2082 .enable_all()
2083 .worker_threads(worker_threads)
2084 .thread_stack_size(rt_config.stack_size)
2085 .build()
2086 .map_err(|err| format!("tokio runtime: {err}"))?;
2087
2088 let signal_runtime = runtime.clone();
2089 tokio_runtime.block_on(async move {
2090 spawn_lifecycle_signal_handler(signal_runtime).await;
2091 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2093
2094 spawn_pg_listener(&config, &runtime);
2096
2097 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2099
2100 let server = RedDBGrpcServer::with_options(
2101 runtime,
2102 GrpcServerOptions {
2103 bind_addr: grpc_bind_addr.clone(),
2104 tls: None,
2105 },
2106 auth_store,
2107 );
2108
2109 tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2110 tracing::info!(
2111 transport = "grpc",
2112 bind = %grpc_bind_addr,
2113 cpus = rt_config.available_cpus,
2114 workers = worker_threads,
2115 "listener online"
2116 );
2117 server
2118 .serve_on(grpc_listener)
2119 .await
2120 .map_err(|err| err.to_string())
2121 })
2122}
2123
#[cfg(test)]
mod tests {
    use super::*;

    /// Systemd config for the `red` binary running as `reddb:reddb`; only the
    /// fields that vary between tests are parameters.
    fn service_config(
        service_name: &str,
        data_path: &str,
        router: Option<&str>,
        grpc: Option<&str>,
        http: Option<&str>,
    ) -> SystemdServiceConfig {
        SystemdServiceConfig {
            service_name: service_name.to_string(),
            binary_path: PathBuf::from("/usr/local/bin/red"),
            run_user: "reddb".to_string(),
            run_group: "reddb".to_string(),
            data_path: PathBuf::from(data_path),
            router_bind_addr: router.map(str::to_string),
            grpc_bind_addr: grpc.map(str::to_string),
            http_bind_addr: http.map(str::to_string),
        }
    }

    /// Binds an ephemeral loopback port and returns the listener (to keep the
    /// port occupied) together with its address.
    fn occupied_port() -> (TcpListener, String) {
        let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
        let addr = held.local_addr().expect("held addr").to_string();
        (held, addr)
    }

    #[test]
    fn render_systemd_unit_contains_expected_execstart() {
        let config = service_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            None,
        );

        let unit = render_systemd_unit(&config);

        assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
        assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
    }

    #[test]
    fn systemd_service_config_derives_paths() {
        let config = service_config(
            "reddb-api",
            "/srv/reddb/live/data.rdb",
            None,
            None,
            Some("127.0.0.1:5055"),
        );

        assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
        assert_eq!(
            config.unit_path(),
            PathBuf::from("/etc/systemd/system/reddb-api.service")
        );
    }

    #[test]
    fn render_systemd_unit_supports_dual_transport() {
        let config = service_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            Some("0.0.0.0:5055"),
        );

        let unit = render_systemd_unit(&config);

        assert!(unit.contains("--grpc-bind 0.0.0.0:5555"));
        assert!(unit.contains("--http-bind 0.0.0.0:5055"));
    }

    #[test]
    fn render_systemd_unit_supports_router_mode() {
        let config = service_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            Some(DEFAULT_ROUTER_BIND_ADDR),
            None,
            None,
        );

        let unit = render_systemd_unit(&config);

        assert!(unit.contains("--bind 127.0.0.1:5050"));
        assert!(!unit.contains("--grpc-bind"));
        assert!(!unit.contains("--http-bind"));
    }

    #[test]
    fn explicit_bind_collision_is_fatal() {
        let (_held, addr) = occupied_port();
        let mut readiness = TransportReadiness::default();

        let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();

        assert!(error.contains("explicit http listener bind"));
        assert_eq!(readiness.active.len(), 0);
        assert_eq!(readiness.failed.len(), 1);
        assert!(readiness.failed[0].explicit);
        assert_eq!(readiness.failed[0].bind_addr, addr);
    }

    #[test]
    fn implicit_bind_collision_degrades() {
        let (_held, addr) = occupied_port();
        let mut readiness = TransportReadiness::default();

        let listener =
            bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");

        assert!(listener.is_none());
        assert_eq!(readiness.active.len(), 0);
        assert_eq!(readiness.failed.len(), 1);
        assert!(!readiness.failed[0].explicit);
        assert_eq!(readiness.failed[0].bind_addr, addr);
    }
}