1use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13 GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Default bind address for the router listener: loopback, port 5050.
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
18pub fn detect_runtime_config() -> RuntimeConfig {
20 let cpus = thread::available_parallelism()
21 .map(|n| n.get())
22 .unwrap_or(1);
23
24 let suggested_workers = cpus.saturating_sub(1).max(1);
26
27 RuntimeConfig {
28 available_cpus: cpus,
29 suggested_workers,
30 stack_size: 8 * 1024 * 1024, }
32}
33
/// Host-derived sizing hints for the server runtime.
#[derive(Clone, Debug)]
pub struct RuntimeConfig {
    /// Number of CPUs reported as available to this process.
    pub available_cpus: usize,
    /// Worker-thread count to use when the operator gives no explicit value.
    pub suggested_workers: usize,
    /// Stack size, in bytes, for runtime worker threads.
    pub stack_size: usize,
}
40
/// Network transports the server can expose.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ServerTransport {
    /// gRPC transport.
    Grpc,
    /// HTTP transport.
    Http,
    /// Wire-protocol transport.
    Wire,
}

impl ServerTransport {
    /// Human-readable label for the transport.
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Grpc => "gRPC",
            ServerTransport::Http => "HTTP",
            ServerTransport::Wire => "wire",
        }
    }

    /// Loopback address the transport binds to when none is configured.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Grpc => "127.0.0.1:5555",
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Wire => "127.0.0.1:5050",
        }
    }
}
65
66#[derive(Debug, Clone)]
67pub struct ServerCommandConfig {
68 pub path: Option<PathBuf>,
69 pub router_bind_addr: Option<String>,
70 pub grpc_bind_addr: Option<String>,
71 pub grpc_tls_bind_addr: Option<String>,
75 pub grpc_tls_cert: Option<PathBuf>,
81 pub grpc_tls_key: Option<PathBuf>,
84 pub grpc_tls_client_ca: Option<PathBuf>,
89 pub http_bind_addr: Option<String>,
90 pub http_tls_bind_addr: Option<String>,
94 pub http_tls_cert: Option<PathBuf>,
97 pub http_tls_key: Option<PathBuf>,
100 pub http_tls_client_ca: Option<PathBuf>,
104 pub wire_bind_addr: Option<String>,
105 pub wire_tls_bind_addr: Option<String>,
107 pub wire_tls_cert: Option<PathBuf>,
109 pub wire_tls_key: Option<PathBuf>,
111 pub pg_bind_addr: Option<String>,
115 pub create_if_missing: bool,
116 pub read_only: bool,
117 pub role: String,
118 pub primary_addr: Option<String>,
119 pub vault: bool,
120 pub workers: Option<usize>,
122 pub telemetry: Option<crate::telemetry::TelemetryConfig>,
125}
126
/// Inputs for rendering and installing the systemd unit.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix.
    pub service_name: String,
    /// Path of the server binary placed in `ExecStart=`.
    pub binary_path: PathBuf,
    /// System user the service runs as.
    pub run_user: String,
    /// System group the service runs as.
    pub run_group: String,
    /// Database file passed to the server via `--path`.
    pub data_path: PathBuf,
    /// Router bind address (`--bind`); takes precedence over `grpc_bind_addr`.
    pub router_bind_addr: Option<String>,
    /// gRPC bind address (`--grpc-bind`), used only without a router address.
    pub grpc_bind_addr: Option<String>,
    /// HTTP bind address (`--http-bind`).
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory that contains the data file.
    ///
    /// Falls back to `"."` when `data_path` has no usable parent. This
    /// includes a bare relative file name such as `data.rdb`, for which
    /// [`std::path::Path::parent`] returns `Some("")` rather than `None`.
    pub fn data_dir(&self) -> PathBuf {
        match self.data_path.parent() {
            Some(parent) if !parent.as_os_str().is_empty() => parent.to_path_buf(),
            _ => PathBuf::from("."),
        }
    }

    /// Location of the unit file under `/etc/systemd/system`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
151
152pub fn default_telemetry_for_path(
157 path: Option<&std::path::Path>,
158) -> crate::telemetry::TelemetryConfig {
159 let log_dir = match path {
160 Some(p) => p
161 .parent()
162 .map(|parent| parent.join("logs"))
163 .or_else(|| Some(std::path::PathBuf::from("./logs"))),
164 None => None, };
166 crate::telemetry::TelemetryConfig {
167 log_dir,
168 file_prefix: "reddb.log".to_string(),
169 level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
170 format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
171 crate::telemetry::LogFormat::Pretty
172 } else {
173 crate::telemetry::LogFormat::Json
174 },
175 rotation_keep_days: 14,
176 service_name: "reddb",
177 level_explicit: false,
179 format_explicit: false,
180 rotation_keep_days_explicit: false,
181 file_prefix_explicit: false,
182 log_dir_explicit: false,
183 log_file_disabled: false,
184 }
185}
186
187impl ServerCommandConfig {
188 fn to_db_options(&self) -> RedDBOptions {
189 let mut options = match &self.path {
190 Some(path) => RedDBOptions::persistent(path),
191 None => RedDBOptions::in_memory(),
192 };
193
194 options.mode = StorageMode::Persistent;
195 options.create_if_missing = self.create_if_missing;
196 options.read_only = self.read_only
203 || env_nonempty("RED_READONLY")
204 .or_else(|| env_nonempty("REDDB_READONLY"))
205 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
206 .unwrap_or(false)
207 || self.path.as_ref().is_some_and(|data_path| {
208 crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
209 data_path,
210 ))
211 .unwrap_or(false)
212 });
213
214 options.replication = match self.role.as_str() {
215 "primary" => ReplicationConfig::primary(),
216 "replica" => {
217 let primary_addr = self
218 .primary_addr
219 .clone()
220 .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
221 ReplicationConfig::replica(primary_addr)
228 }
229 _ => ReplicationConfig::standalone(),
230 };
231
232 if self.vault {
233 options.auth.vault_enabled = true;
234 }
235
236 configure_remote_backend_from_env(&mut options);
237
238 options
239 }
240
241 pub fn enabled_transports(&self) -> Vec<ServerTransport> {
242 let mut transports = Vec::with_capacity(3);
243 if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
244 transports.push(ServerTransport::Grpc);
245 }
246 if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
247 transports.push(ServerTransport::Http);
248 }
249 if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
250 transports.push(ServerTransport::Wire);
251 }
252 transports
253 }
254}
255
/// Looks up `name` through `crate::utils::env_with_file_fallback`.
///
/// NOTE(review): the name suggests empty values count as unset and that a
/// file-based fallback is consulted; both are properties of the helper, not
/// of this wrapper — confirm there.
fn env_nonempty(name: &str) -> Option<String> {
    crate::utils::env_with_file_fallback(name)
}
263
264fn env_truthy(name: &str) -> bool {
265 env_nonempty(name)
266 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
267 .unwrap_or(false)
268}
269
270fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
271 let backend = env_nonempty("RED_BACKEND")
277 .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
278 .unwrap_or_else(|| "none".to_string())
279 .to_ascii_lowercase();
280
281 match backend.as_str() {
282 "s3" | "minio" | "r2" => {
287 #[cfg(feature = "backend-s3")]
288 {
289 if let Some(config) = s3_config_from_env() {
290 let remote_key = env_nonempty("RED_REMOTE_KEY")
291 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
292 .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
293 let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
294 options.remote_backend = Some(backend.clone());
295 options.remote_backend_atomic = Some(backend);
296 options.remote_key = Some(remote_key);
297 }
298 }
299 #[cfg(not(feature = "backend-s3"))]
300 {
301 tracing::warn!(
302 backend = %backend,
303 "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
304 );
305 }
306 }
307 "fs" | "local" => {
312 let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
313 let remote_key = match (
314 base_path,
315 env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
316 ) {
317 (Some(base), Some(rel)) => Some(format!(
318 "{}/{}",
319 base.trim_end_matches('/'),
320 rel.trim_start_matches('/')
321 )),
322 (Some(base), None) => Some(format!(
323 "{}/clusters/dev/data.rdb",
324 base.trim_end_matches('/')
325 )),
326 (None, Some(rel)) => Some(rel),
327 (None, None) => None,
328 };
329 if let Some(key) = remote_key {
330 let backend = Arc::new(crate::storage::backend::LocalBackend);
331 options.remote_backend = Some(backend.clone());
332 options.remote_backend_atomic = Some(backend);
333 options.remote_key = Some(key);
334 }
335 }
336 "http" => {
341 let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
342 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
343 {
344 Some(u) => u,
345 None => {
346 tracing::warn!(
347 "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
348 );
349 return;
350 }
351 };
352 let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
353 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
354 .unwrap_or_default();
355 let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
356 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
357 {
358 std::fs::read_to_string(&path)
359 .ok()
360 .map(|s| s.trim().to_string())
361 .filter(|s| !s.is_empty())
362 } else {
363 env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
364 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
365 };
366
367 let mut config =
368 crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
369 if let Some(auth) = auth_header {
370 config = config.with_auth_header(auth);
371 }
372 let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
373 || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
374 || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
375 config = config.with_conditional_writes(conditional_writes);
376 if conditional_writes {
381 match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
382 Ok(atomic) => {
383 let atomic_arc = Arc::new(atomic);
384 options.remote_backend = Some(atomic_arc.clone());
385 options.remote_backend_atomic = Some(atomic_arc);
386 }
387 Err(err) => {
388 tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
389 options.remote_backend =
390 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
391 }
392 }
393 } else {
394 options.remote_backend =
395 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
396 }
397 options.remote_key = env_nonempty("RED_REMOTE_KEY")
398 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
399 .or_else(|| Some("clusters/dev/data.rdb".to_string()));
400 }
401 "none" | "" => {}
404 other => {
405 tracing::warn!(
406 backend = %other,
407 "unknown RED_BACKEND value — supported: s3 | fs | http | none"
408 );
409 }
410 }
411}
412
/// Reads `RED_S3_<suffix>`, falling back to `REDDB_S3_<suffix>`.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    ["RED_S3_", "REDDB_S3_"]
        .iter()
        .find_map(|prefix| env_nonempty(&format!("{prefix}{suffix}")))
}
422
/// Resolves an S3 secret, preferring a file reference over the inline value.
///
/// When `RED_S3_<suffix>_FILE` (or the `REDDB_` spelling) is set, the secret
/// is the trimmed contents of that file. An unreadable or blank file yields
/// `None` and does *not* fall back to the inline variable. Without a file
/// variable the lookup defers to `env_s3`.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let secret_file = env_nonempty(&format!("RED_S3_{suffix}_FILE"))
        .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}_FILE")));
    let Some(secret_file) = secret_file else {
        return env_s3(suffix);
    };

    let contents = std::fs::read_to_string(&secret_file).ok()?;
    let secret = contents.trim();
    if secret.is_empty() {
        None
    } else {
        Some(secret.to_string())
    }
}
440
/// Assembles the S3 configuration from `RED_S3_*` / `REDDB_S3_*` variables.
///
/// Returns `None` unless endpoint, bucket, access key and secret key are all
/// available. The region defaults to `us-east-1`, the key prefix (from
/// `KEY_PREFIX`, then `PREFIX`) to empty, and path-style addressing to `true`.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    let endpoint = env_s3("ENDPOINT")?;
    let bucket = env_s3("BUCKET")?;
    let access_key = env_s3_secret("ACCESS_KEY")?;
    let secret_key = env_s3_secret("SECRET_KEY")?;

    let region = match env_s3("REGION") {
        Some(region) => region,
        None => "us-east-1".to_string(),
    };
    let key_prefix = match env_s3("KEY_PREFIX").or_else(|| env_s3("PREFIX")) {
        Some(prefix) => prefix,
        None => String::new(),
    };
    let path_style = match env_s3("PATH_STYLE") {
        Some(flag) => matches!(
            flag.to_ascii_lowercase().as_str(),
            "1" | "true" | "yes" | "on"
        ),
        None => true,
    };

    Some(crate::storage::backend::S3Config {
        endpoint,
        bucket,
        key_prefix,
        access_key,
        secret_key,
        region,
        path_style,
    })
}
464
465pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
466 let data_dir = config.data_dir();
467 let exec_start = render_systemd_exec_start(config);
468 format!(
469 "[Unit]\n\
470Description=RedDB unified database service\n\
471After=network-online.target\n\
472Wants=network-online.target\n\
473\n\
474[Service]\n\
475Type=simple\n\
476User={user}\n\
477Group={group}\n\
478WorkingDirectory={workdir}\n\
479ExecStart={exec_start}\n\
480Restart=always\n\
481RestartSec=2\n\
482LimitSTACK=16M\n\
483NoNewPrivileges=true\n\
484PrivateTmp=true\n\
485ProtectSystem=strict\n\
486ProtectHome=true\n\
487ProtectControlGroups=true\n\
488ProtectKernelTunables=true\n\
489ProtectKernelModules=true\n\
490RestrictNamespaces=true\n\
491LockPersonality=true\n\
492MemoryDenyWriteExecute=true\n\
493ReadWritePaths={workdir}\n\
494\n\
495[Install]\n\
496WantedBy=multi-user.target\n",
497 user = config.run_user,
498 group = config.run_group,
499 workdir = data_dir.display(),
500 exec_start = exec_start,
501 )
502}
503
504#[cfg(target_os = "linux")]
513pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
514 ensure_root()?;
515 ensure_command_available("systemctl")?;
516 ensure_command_available("getent")?;
517 ensure_command_available("groupadd")?;
518 ensure_command_available("useradd")?;
519 ensure_command_available("install")?;
520 ensure_executable(&config.binary_path)?;
521
522 if !command_success("getent", ["group", config.run_group.as_str()])? {
523 run_command("groupadd", ["--system", config.run_group.as_str()])?;
524 }
525
526 if !command_success("id", ["-u", config.run_user.as_str()])? {
527 let data_dir = config.data_dir();
528 run_command(
529 "useradd",
530 [
531 "--system",
532 "--gid",
533 config.run_group.as_str(),
534 "--home-dir",
535 data_dir.to_string_lossy().as_ref(),
536 "--shell",
537 "/usr/sbin/nologin",
538 config.run_user.as_str(),
539 ],
540 )?;
541 }
542
543 let data_dir = config.data_dir();
544 run_command(
545 "install",
546 [
547 "-d",
548 "-o",
549 config.run_user.as_str(),
550 "-g",
551 config.run_group.as_str(),
552 "-m",
553 "0750",
554 data_dir.to_string_lossy().as_ref(),
555 ],
556 )?;
557
558 std::fs::write(config.unit_path(), render_systemd_unit(config))
559 .map_err(|err| format!("failed to write systemd unit: {err}"))?;
560
561 run_command("systemctl", ["daemon-reload"])?;
562 run_command(
563 "systemctl",
564 [
565 "enable",
566 "--now",
567 format!("{}.service", config.service_name).as_str(),
568 ],
569 )?;
570
571 Ok(())
572}
573
/// Non-Linux stand-in: always fails with a hint on installing the service
/// manually.
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    let message = concat!(
        "systemd install is Linux-only — use sc.exe (Windows) or ",
        "launchd (macOS) to install the service manually using the ",
        "unit printed by `red service print-unit`",
    );
    Err(String::from(message))
}
585
/// Fails unless the current process runs as uid 0, as reported by `id -u`.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let id_output = match Command::new("id").arg("-u").output() {
        Ok(output) => output,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !id_output.status.success() {
        return Err("failed to determine current uid".to_string());
    }

    match String::from_utf8_lossy(&id_output.stdout).trim() {
        "0" => Ok(()),
        _ => Err("run this command as root (sudo)".to_string()),
    }
}
601
/// Checks that `command` resolves in a login shell (`sh -lc 'command -v …'`).
///
/// `command` is interpolated into the shell line unescaped, so callers must
/// pass trusted names only.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    let probe = format!("command -v {command} >/dev/null 2>&1");
    match Command::new("sh").arg("-lc").arg(&probe).status() {
        Ok(status) if status.success() => Ok(()),
        Ok(_) => Err(format!("required command not found: {command}")),
        Err(err) => Err(format!("failed to check command '{command}': {err}")),
    }
}
614
/// Verifies that `path` points at a regular file with an execute bit set.
///
/// # Errors
/// Returns a message when the path cannot be inspected, is not a regular file
/// (directories carry execute bits too, so the mode check alone would accept
/// them), or has none of the owner/group/other execute bits.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    use std::os::unix::fs::PermissionsExt;

    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    if metadata.permissions().mode() & 0o111 == 0 {
        return Err(format!("binary is not executable: {}", path.display()));
    }
    Ok(())
}
634
/// Runs `program` with `args` and reports whether it exited successfully.
///
/// Only a failure to launch the program is an error; a non-zero exit is
/// `Ok(false)`. The child inherits this process's standard streams.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    match Command::new(program).args(args).status() {
        Ok(exit) => Ok(exit.success()),
        Err(err) => Err(format!("failed to run {program}: {err}")),
    }
}
643
/// Runs `program` with `args`, capturing its output, and fails on a non-zero
/// exit.
///
/// # Errors
/// The message is `"<program> failed: <detail>"`, where the detail is the
/// trimmed stderr, else the trimmed stdout, else the exit status. The status
/// is rendered with its own `Display` (already `exit status: N` or a signal
/// description), so no extra prefix is added.
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
    let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
    let detail = if !stderr.is_empty() {
        stderr
    } else if !stdout.is_empty() {
        stdout
    } else {
        output.status.to_string()
    };
    Err(format!("{program} failed: {detail}"))
}
665
666pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
667 let has_any = config.router_bind_addr.is_some()
668 || config.grpc_bind_addr.is_some()
669 || config.http_bind_addr.is_some()
670 || config.wire_bind_addr.is_some();
671 if !has_any {
672 return Err("at least one server bind address must be configured".into());
673 }
674 let thread_name = if config.router_bind_addr.is_some() {
675 "red-server-router"
676 } else {
677 match (
678 config.grpc_bind_addr.is_some(),
679 config.http_bind_addr.is_some(),
680 ) {
681 (true, true) => "red-server-dual",
682 (true, false) => "red-server-grpc",
683 (false, true) => "red-server-http",
684 (false, false) => "red-server-wire",
685 }
686 };
687
688 let handle = thread::Builder::new()
689 .name(thread_name.into())
690 .stack_size(8 * 1024 * 1024)
691 .spawn(move || run_configured_servers(config))
692 .map_err(|err| format!("failed to spawn server thread: {err}"))?;
693
694 match handle.join() {
695 Ok(result) => result,
696 Err(_) => Err("server thread panicked".to_string()),
697 }
698}
699
700fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
701 let mut parts = vec![
702 config.binary_path.display().to_string(),
703 "server".to_string(),
704 "--path".to_string(),
705 config.data_path.display().to_string(),
706 ];
707
708 if let Some(bind_addr) = &config.router_bind_addr {
709 parts.push("--bind".to_string());
710 parts.push(bind_addr.clone());
711 } else if let Some(bind_addr) = &config.grpc_bind_addr {
712 parts.push("--grpc-bind".to_string());
713 parts.push(bind_addr.clone());
714 }
715 if let Some(bind_addr) = &config.http_bind_addr {
716 parts.push("--http-bind".to_string());
717 parts.push(bind_addr.clone());
718 }
719
720 parts.join(" ")
721}
722
/// Returns `true` when a TCP connection to `target` succeeds within `timeout`.
///
/// `target` is resolved first; each resulting address is tried in turn with
/// the full timeout. Resolution failure counts as "not listening".
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    let Ok(resolved) = target.to_socket_addrs() else {
        return false;
    };

    for candidate in resolved {
        let address: SocketAddr = candidate;
        if TcpStream::connect_timeout(&address, timeout).is_ok() {
            return true;
        }
    }
    false
}
733
734#[inline(never)]
735fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
736 if let Some(router_bind_addr) = config.router_bind_addr.clone() {
742 return run_routed_server(config, router_bind_addr);
743 }
744
745 match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
746 (Some(grpc_bind_addr), Some(http_bind_addr)) => {
747 run_dual_server(config, grpc_bind_addr, http_bind_addr)
748 }
749 (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
750 (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
751 (None, None) => {
752 if let Some(wire_addr) = config.wire_bind_addr.clone() {
754 run_wire_only_server(config, wire_addr)
755 } else {
756 Err("at least one server bind address must be configured".to_string())
757 }
758 }
759 }
760}
761
/// Installs the process lifecycle signal handling and returns once the
/// background task has been spawned.
///
/// On Unix, SIGTERM and SIGINT trigger `runtime.graceful_shutdown(..)`
/// followed by process exit, while SIGHUP triggers `handle_sighup_reload` and
/// keeps the loop running. Elsewhere only Ctrl+C is handled. Whether a backup
/// is requested on shutdown is controlled by `RED_BACKUP_ON_SHUTDOWN`
/// (truthy: `1`/`true`/`yes`/`on`; default `true` when unset).
///
/// Must be called from within a Tokio runtime, since it uses `tokio::spawn`.
async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
        .ok()
        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
        .unwrap_or(true);

    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        // SIGTERM and SIGINT are mandatory: if either handler cannot be
        // installed, no lifecycle task is spawned at all.
        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
                );
                return;
            }
        };
        let mut sigint = match signal(SignalKind::interrupt()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGINT handler");
                return;
            }
        };
        // SIGHUP is optional: without it the shutdown signals still work.
        let mut sighup = match signal(SignalKind::hangup()) {
            Ok(s) => Some(s),
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
                None
            }
        };

        let reload_runtime = runtime.clone();
        tokio::spawn(async move {
            loop {
                // Wait for whichever signal arrives first; the SIGHUP arm
                // exists only when its handler was installed.
                let signal_name = match &mut sighup {
                    Some(hup) => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                        _ = hup.recv() => "SIGHUP",
                    },
                    None => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                    },
                };

                if signal_name == "SIGHUP" {
                    handle_sighup_reload(&reload_runtime);
                    continue;
                }

                tracing::info!(
                    signal = signal_name,
                    "lifecycle signal received; shutting down"
                );
                match runtime.graceful_shutdown(backup_on_shutdown) {
                    Ok(report) => {
                        tracing::info!(
                            duration_ms = report.duration_ms,
                            flushed_wal = report.flushed_wal,
                            final_checkpoint = report.final_checkpoint,
                            backup_uploaded = report.backup_uploaded,
                            "graceful shutdown complete"
                        );
                    }
                    Err(err) => {
                        tracing::error!(error = %err, "graceful shutdown failed");
                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
                            reason: format!("graceful shutdown failed: {err}"),
                        }
                        .emit_global();
                    }
                }
                // NOTE(review): the exit code is 0 even when graceful shutdown
                // failed — confirm this is intended.
                std::process::exit(0);
            }
        });
    }

    #[cfg(not(unix))]
    {
        tokio::spawn(async move {
            let interrupted = tokio::signal::ctrl_c().await;
            if let Err(err) = interrupted {
                tracing::warn!(error = %err, "could not install Ctrl+C handler");
                return;
            }

            tracing::info!(
                signal = "Ctrl+C",
                "lifecycle signal received; shutting down"
            );
            match runtime.graceful_shutdown(backup_on_shutdown) {
                Ok(report) => {
                    tracing::info!(
                        duration_ms = report.duration_ms,
                        flushed_wal = report.flushed_wal,
                        final_checkpoint = report.final_checkpoint,
                        backup_uploaded = report.backup_uploaded,
                        "graceful shutdown complete"
                    );
                }
                Err(err) => {
                    // NOTE(review): unlike the Unix branch, no `ShutdownForced`
                    // operator event is emitted here — confirm.
                    tracing::error!(error = %err, "graceful shutdown failed");
                }
            }
            std::process::exit(0);
        });
    }
}
907
908fn handle_sighup_reload(runtime: &RedDBRuntime) {
917 let now_ms = std::time::SystemTime::now()
918 .duration_since(std::time::UNIX_EPOCH)
919 .map(|d| d.as_millis() as u64)
920 .unwrap_or(0);
921 tracing::info!(
922 target: "reddb::secrets",
923 ts_unix_ms = now_ms,
924 "SIGHUP received; secrets will be re-read from *_FILE on next access"
925 );
926 use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
931 runtime.audit_log().record_event(
932 AuditEvent::builder("config/sighup_reload")
933 .source(AuditAuthSource::System)
934 .resource("secrets")
935 .outcome(Outcome::Success)
936 .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
937 .build(),
938 );
939}
940
/// Serves gRPC, HTTP and the wire protocol behind one router listener.
///
/// Each protocol backend binds an ephemeral loopback port (`127.0.0.1:0`);
/// the router at `router_bind_addr` is given those backend addresses and is
/// the only listener bound to the configured address. Blocks until the router
/// returns.
///
/// # Errors
/// Fails when the runtime or auth store cannot be built, an internal listener
/// cannot be bound or inspected, the HTTP backend has already exited after
/// the short startup delay, the Tokio runtime cannot be created, or the
/// router itself returns an error.
#[inline(never)]
fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
    let workers = config.workers;
    let cli_telemetry = config.telemetry.clone();
    let db_options = config.to_db_options();
    let rt_config = detect_runtime_config();
    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
    // `_telemetry_guard` stays bound until this function returns.
    let (runtime, auth_store, _telemetry_guard) =
        build_runtime_and_auth_store(db_options, cli_telemetry)?;

    spawn_admin_metrics_listeners(&runtime, &auth_store);

    // HTTP backend: started in the background, outside the Tokio runtime
    // built below.
    let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
        .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
    let http_backend = http_listener
        .local_addr()
        .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
    let http_server = build_http_server(
        runtime.clone(),
        auth_store.clone(),
        http_backend.to_string(),
    );
    let http_handle = http_server.serve_in_background_on(http_listener);

    // Give the HTTP backend a moment so an immediate failure is reported
    // here rather than surfacing later through the router.
    thread::sleep(Duration::from_millis(100));
    if http_handle.is_finished() {
        return match http_handle.join() {
            Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
            Ok(Err(err)) => Err(err.to_string()),
            Err(_) => Err("HTTP backend thread panicked".to_string()),
        };
    }

    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(worker_threads)
        .thread_stack_size(rt_config.stack_size)
        .build()
        .map_err(|err| format!("tokio runtime: {err}"))?;

    let signal_runtime = runtime.clone();
    tokio_runtime.block_on(async move {
        spawn_lifecycle_signal_handler(signal_runtime).await;
        // gRPC backend on its own loopback port, without TLS.
        let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
            .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
        let grpc_backend = grpc_listener
            .local_addr()
            .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
        let grpc_server = RedDBGrpcServer::with_options(
            runtime.clone(),
            GrpcServerOptions {
                bind_addr: grpc_backend.to_string(),
                tls: None,
            },
            auth_store,
        );
        tokio::spawn(async move {
            if let Err(err) = grpc_server.serve_on(grpc_listener).await {
                tracing::error!(err = %err, "gRPC backend error");
            }
        });

        // Wire-protocol backend on its own loopback port.
        let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .map_err(|err| format!("bind internal wire listener: {err}"))?;
        let wire_backend = wire_listener
            .local_addr()
            .map_err(|err| format!("inspect internal wire listener: {err}"))?;
        let wire_rt = Arc::new(runtime);
        tokio::spawn(async move {
            if let Err(err) =
                crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
                    .await
            {
                tracing::error!(err = %err, "redwire backend error");
            }
        });

        tracing::info!(
            bind = %router_bind_addr,
            cpus = rt_config.available_cpus,
            workers = worker_threads,
            "router bootstrapping"
        );
        // The router runs on this task; its result is the function's result.
        serve_tcp_router(TcpProtocolRouterConfig {
            bind_addr: router_bind_addr,
            grpc_backend,
            http_backend,
            wire_backend,
        })
        .await
        .map_err(|err| err.to_string())
    })
}
1035
1036fn spawn_wire_listeners(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1038 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1040 let wire_rt = Arc::new(runtime.clone());
1041 tokio::spawn(async move {
1042 #[cfg(unix)]
1045 {
1046 if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1047 if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1048 &wire_addr, wire_rt,
1049 )
1050 .await
1051 {
1052 tracing::error!(err = %e, "redwire unix listener error");
1053 }
1054 return;
1055 }
1056 }
1057 let cfg = crate::wire::RedWireConfig {
1058 bind_addr: wire_addr,
1059 auth_store: None,
1060 oauth: None,
1061 };
1062 if let Err(e) = crate::wire::start_redwire_listener(cfg, wire_rt).await {
1063 tracing::error!(err = %e, "redwire listener error");
1064 }
1065 });
1066 }
1067
1068 if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1070 let tls_config = resolve_wire_tls_config(config);
1071 match tls_config {
1072 Ok(tls_cfg) => {
1073 let wire_rt = Arc::new(runtime.clone());
1074 tokio::spawn(async move {
1075 if let Err(e) =
1076 crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1077 .await
1078 {
1079 tracing::error!(err = %e, "redwire+tls listener error");
1080 }
1081 });
1082 }
1083 Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1084 }
1085 }
1086}
1087
1088fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1095 if let Some(pg_addr) = config.pg_bind_addr.clone() {
1096 let rt = Arc::new(runtime.clone());
1097 tokio::spawn(async move {
1098 let cfg = crate::wire::PgWireConfig {
1099 bind_addr: pg_addr,
1100 ..Default::default()
1101 };
1102 if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1103 tracing::error!(err = %e, "pg wire listener error");
1104 }
1105 });
1106 }
1107}
1108
1109fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1123 use crate::utils::secret_file::expand_file_env;
1124
1125 for var in [
1129 "REDDB_GRPC_TLS_CERT",
1130 "REDDB_GRPC_TLS_KEY",
1131 "REDDB_GRPC_TLS_CLIENT_CA",
1132 ] {
1133 if let Err(err) = expand_file_env(var) {
1134 tracing::warn!(
1135 target: "reddb::secrets",
1136 env = %var,
1137 err = %err,
1138 "could not expand *_FILE companion for gRPC TLS"
1139 );
1140 }
1141 }
1142
1143 let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1144 (Some(cert), Some(key)) => {
1145 let cert_pem = std::fs::read(cert)
1146 .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1147 let key_pem =
1148 std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1149 (cert_pem, key_pem)
1150 }
1151 _ => {
1152 let dev = std::env::var("RED_GRPC_TLS_DEV")
1154 .ok()
1155 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1156 .unwrap_or(false);
1157 if !dev {
1158 return Err("gRPC TLS configured but no cert/key supplied — set \
1159 REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1160 RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1161 .to_string());
1162 }
1163 let dir = config
1164 .path
1165 .as_ref()
1166 .and_then(|p| p.parent())
1167 .map(PathBuf::from)
1168 .unwrap_or_else(|| PathBuf::from("."));
1169 let (cert_pem_str, key_pem_str) =
1170 crate::wire::tls::generate_self_signed_cert("localhost")
1171 .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1172
1173 let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1178 tracing::warn!(
1179 target: "reddb::security",
1180 transport = "grpc",
1181 cert_sha256 = %fp,
1182 "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1183 DO NOT use in production"
1184 );
1185 let cert_path = dir.join("grpc-tls-cert.pem");
1187 let key_path = dir.join("grpc-tls-key.pem");
1188 if !cert_path.exists() || !key_path.exists() {
1189 let _ = std::fs::create_dir_all(&dir);
1190 std::fs::write(&cert_path, cert_pem_str.as_bytes())
1191 .map_err(|e| format!("write grpc dev cert: {e}"))?;
1192 std::fs::write(&key_path, key_pem_str.as_bytes())
1193 .map_err(|e| format!("write grpc dev key: {e}"))?;
1194 #[cfg(unix)]
1195 {
1196 use std::os::unix::fs::PermissionsExt;
1197 let _ =
1198 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1199 }
1200 }
1201 (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1202 }
1203 };
1204
1205 let client_ca_pem = match &config.grpc_tls_client_ca {
1206 Some(path) => Some(
1207 std::fs::read(path)
1208 .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1209 ),
1210 None => None,
1211 };
1212
1213 Ok(crate::GrpcTlsOptions {
1214 cert_pem,
1215 key_pem,
1216 client_ca_pem,
1217 })
1218}
1219
1220fn spawn_grpc_tls_listener_if_configured(
1224 config: &ServerCommandConfig,
1225 runtime: RedDBRuntime,
1226 auth_store: Arc<AuthStore>,
1227) {
1228 let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1229 return;
1230 };
1231 let tls_opts = match resolve_grpc_tls_options(config) {
1232 Ok(opts) => opts,
1233 Err(err) => {
1234 tracing::error!(
1235 target: "reddb::security",
1236 transport = "grpc",
1237 err = %err,
1238 "gRPC TLS config error; TLS listener will not start"
1239 );
1240 return;
1241 }
1242 };
1243 tokio::spawn(async move {
1244 let server = RedDBGrpcServer::with_options(
1245 runtime,
1246 GrpcServerOptions {
1247 bind_addr: tls_bind.clone(),
1248 tls: Some(tls_opts),
1249 },
1250 auth_store,
1251 );
1252 tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1253 if let Err(err) = server.serve().await {
1254 tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1255 }
1256 });
1257}
1258
1259fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1262 use sha2::{Digest, Sha256};
1263 let mut h = Sha256::new();
1264 h.update(pem);
1265 let d = h.finalize();
1266 let mut buf = String::with_capacity(64);
1267 for b in d.iter() {
1268 buf.push_str(&format!("{b:02x}"));
1269 }
1270 buf
1271}
1272
1273fn resolve_wire_tls_config(
1275 config: &ServerCommandConfig,
1276) -> Result<crate::wire::WireTlsConfig, String> {
1277 match (&config.wire_tls_cert, &config.wire_tls_key) {
1278 (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1279 cert_path: cert.clone(),
1280 key_path: key.clone(),
1281 }),
1282 _ => {
1283 let dir = config
1285 .path
1286 .as_ref()
1287 .and_then(|p| p.parent())
1288 .map(PathBuf::from)
1289 .unwrap_or_else(|| PathBuf::from("."));
1290 crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1291 }
1292 }
1293}
1294
1295#[inline(never)]
1296fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1297 let rt_config = detect_runtime_config();
1298 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1299 let cli_telemetry = config.telemetry.clone();
1300 let db_options = config.to_db_options();
1301
1302 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1303 .enable_all()
1304 .worker_threads(workers)
1305 .thread_stack_size(rt_config.stack_size)
1306 .build()
1307 .map_err(|err| format!("tokio runtime: {err}"))?;
1308
1309 let (runtime, _auth_store, _telemetry_guard) =
1313 build_runtime_and_auth_store(db_options, cli_telemetry)?;
1314 let signal_runtime = runtime.clone();
1315 tokio_runtime.block_on(async move {
1316 spawn_lifecycle_signal_handler(signal_runtime).await;
1317 let wire_rt = Arc::new(runtime);
1318 let cfg = crate::wire::RedWireConfig {
1319 bind_addr: wire_addr,
1320 auth_store: None,
1321 oauth: None,
1322 };
1323 crate::wire::start_redwire_listener(cfg, wire_rt)
1324 .await
1325 .map_err(|e| e.to_string())
1326 })
1327}
1328
1329#[inline(never)]
1330fn build_runtime_and_auth_store(
1331 db_options: RedDBOptions,
1332 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1333) -> Result<
1334 (
1335 RedDBRuntime,
1336 Arc<AuthStore>,
1337 Option<crate::telemetry::TelemetryGuard>,
1338 ),
1339 String,
1340> {
1341 build_runtime_with_telemetry(db_options, cli_telemetry)
1348}
1349
1350pub(crate) fn build_runtime_with_telemetry(
1360 db_options: RedDBOptions,
1361 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1362) -> Result<
1363 (
1364 RedDBRuntime,
1365 Arc<AuthStore>,
1366 Option<crate::telemetry::TelemetryGuard>,
1367 ),
1368 String,
1369> {
1370 let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1371 let msg = err.to_string();
1377 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1378 phase: "runtime_construction".to_string(),
1379 error: msg.clone(),
1380 }
1381 .emit_global();
1382 msg
1383 })?;
1384
1385 crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1390 let msg = err.to_string();
1391 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1392 phase: "lease_loop".to_string(),
1393 error: msg.clone(),
1394 }
1395 .emit_global();
1396 msg
1397 })?;
1398
1399 if let Some(data_path) = db_options.data_path.as_deref() {
1403 let watch_dir = data_path.parent().unwrap_or(data_path);
1404 crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1405 }
1406
1407 {
1411 let config_path = crate::runtime::config_overlay::config_file_path();
1412 let store = runtime.db().store();
1413 crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1414 }
1415
1416 let merged = merge_telemetry_with_config(
1419 cli_telemetry
1420 .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1421 &runtime,
1422 );
1423 let telemetry_guard = crate::telemetry::init(merged);
1424
1425 let auth_store =
1426 if db_options.auth.vault_enabled {
1427 let pager =
1428 runtime.db().store().pager().cloned().ok_or_else(|| {
1429 "vault requires a paged database (persistent mode)".to_string()
1430 })?;
1431 let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1432 .map_err(|err| err.to_string())?;
1433 Arc::new(store)
1434 } else {
1435 Arc::new(AuthStore::new(db_options.auth.clone()))
1436 };
1437 auth_store.bootstrap_from_env();
1438
1439 {
1441 let store = Arc::clone(&auth_store);
1442 std::thread::Builder::new()
1443 .name("reddb-session-purge".into())
1444 .spawn(move || loop {
1445 std::thread::sleep(std::time::Duration::from_secs(300));
1446 store.purge_expired_sessions();
1447 })
1448 .ok();
1449 }
1450
1451 Ok((runtime, auth_store, telemetry_guard))
1452}
1453
1454fn merge_telemetry_with_config(
1465 mut cli: crate::telemetry::TelemetryConfig,
1466 runtime: &RedDBRuntime,
1467) -> crate::telemetry::TelemetryConfig {
1468 use crate::storage::schema::Value;
1469
1470 let store = runtime.db().store();
1471
1472 if !cli.level_explicit {
1473 if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
1474 cli.level_filter = v.to_string();
1475 }
1476 }
1477 if !cli.format_explicit {
1478 if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
1479 if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
1480 cli.format = parsed;
1481 }
1482 }
1483 }
1484 if !cli.rotation_keep_days_explicit {
1485 match store.get_config("red.logging.keep_days") {
1486 Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
1487 cli.rotation_keep_days = n as u16
1488 }
1489 Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
1490 cli.rotation_keep_days = n as u16
1491 }
1492 Some(Value::Text(v)) => {
1493 if let Ok(n) = v.parse::<u16>() {
1494 cli.rotation_keep_days = n;
1495 }
1496 }
1497 _ => {}
1498 }
1499 }
1500 if !cli.file_prefix_explicit {
1501 if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
1502 if !v.is_empty() {
1503 cli.file_prefix = v.to_string();
1504 }
1505 }
1506 }
1507 if !cli.log_dir_explicit && !cli.log_file_disabled {
1510 if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
1511 if !v.is_empty() {
1512 cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
1513 }
1514 }
1515 }
1516
1517 cli
1518}
1519
/// Tests for the precedence rules of `merge_telemetry_with_config`.
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};

    /// In-memory runtime with an empty config store.
    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
    }

    /// Stores `value` as a string under `key` in the runtime's config tree.
    fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
        let json = crate::serde_json::Value::String(value.to_string());
        runtime.db().store().set_config_tree(key, &json);
    }

    /// CLI configuration with a log dir and JSON format, nothing marked explicit.
    fn cli_base() -> TelemetryConfig {
        TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let rt = fresh_runtime();
        set_str(&rt, "red.logging.dir", "/var/log/reddb");

        let result = merge_telemetry_with_config(cli_base(), &rt);

        let expected = std::path::Path::new("/var/log/reddb");
        assert_eq!(result.log_dir.as_deref(), Some(expected));
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let rt = fresh_runtime();
        set_str(&rt, "red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/custom/dir")),
            log_dir_explicit: true,
            ..cli_base()
        };

        let result = merge_telemetry_with_config(cli, &rt);

        let expected = std::path::Path::new("/custom/dir");
        assert_eq!(result.log_dir.as_deref(), Some(expected));
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let rt = fresh_runtime();
        set_str(&rt, "red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: None,
            log_file_disabled: true,
            ..cli_base()
        };

        let result = merge_telemetry_with_config(cli, &rt);

        assert!(
            result.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        let rt = fresh_runtime();
        set_str(&rt, "red.logging.format", "pretty");

        let result = merge_telemetry_with_config(cli_base(), &rt);

        assert_eq!(result.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let rt = fresh_runtime();
        set_str(&rt, "red.logging.format", "pretty");
        let cli = TelemetryConfig {
            format: LogFormat::Json,
            format_explicit: true,
            ..cli_base()
        };

        let result = merge_telemetry_with_config(cli, &rt);

        assert_eq!(result.format, LogFormat::Json);
    }
}
1607
1608#[inline(never)]
1609fn build_http_server(
1610 runtime: RedDBRuntime,
1611 auth_store: Arc<AuthStore>,
1612 bind_addr: String,
1613) -> RedDBServer {
1614 RedDBServer::with_options(
1615 runtime,
1616 ServerOptions {
1617 bind_addr,
1618 ..ServerOptions::default()
1619 },
1620 )
1621 .with_auth(auth_store)
1622}
1623
1624#[inline(never)]
1628fn build_admin_only_server(
1629 runtime: RedDBRuntime,
1630 auth_store: Arc<AuthStore>,
1631 bind_addr: String,
1632) -> RedDBServer {
1633 RedDBServer::with_options(
1634 runtime,
1635 ServerOptions {
1636 bind_addr,
1637 surface: crate::server::ServerSurface::AdminOnly,
1638 ..ServerOptions::default()
1639 },
1640 )
1641 .with_auth(auth_store)
1642}
1643
1644#[inline(never)]
1648fn build_metrics_only_server(
1649 runtime: RedDBRuntime,
1650 auth_store: Arc<AuthStore>,
1651 bind_addr: String,
1652) -> RedDBServer {
1653 RedDBServer::with_options(
1654 runtime,
1655 ServerOptions {
1656 bind_addr,
1657 surface: crate::server::ServerSurface::MetricsOnly,
1658 ..ServerOptions::default()
1659 },
1660 )
1661 .with_auth(auth_store)
1662}
1663
1664fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
1668 if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
1669 let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
1670 let _ = server.serve_in_background();
1671 tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
1672 }
1673 if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
1674 let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
1675 let _ = server.serve_in_background();
1676 tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
1677 }
1678}
1679
1680#[inline(never)]
1681fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
1682 let cli_telemetry = config.telemetry.clone();
1683 let (runtime, auth_store, _telemetry_guard) =
1684 build_runtime_and_auth_store(config.to_db_options(), cli_telemetry)?;
1685 spawn_admin_metrics_listeners(&runtime, &auth_store);
1686 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
1687 let server = build_http_server(runtime, auth_store, bind_addr.clone());
1688 tracing::info!(transport = "http", bind = %bind_addr, "listener online");
1689 server.serve().map_err(|err| err.to_string())
1690}
1691
1692fn spawn_http_tls_listener(
1698 config: &ServerCommandConfig,
1699 runtime: &RedDBRuntime,
1700 auth_store: &Arc<AuthStore>,
1701) -> Result<(), String> {
1702 let Some(addr) = config.http_tls_bind_addr.clone() else {
1703 return Ok(());
1704 };
1705
1706 let tls_config = resolve_http_tls_config(config)?;
1707 let server_config = crate::server::tls::build_server_config(&tls_config)
1708 .map_err(|err| format!("HTTP TLS: {err}"))?;
1709
1710 let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
1711 let _handle = server.serve_tls_in_background(server_config);
1712 tracing::info!(
1713 transport = "https",
1714 bind = %addr,
1715 mtls = %tls_config.client_ca_path.is_some(),
1716 "TLS listener online"
1717 );
1718 Ok(())
1719}
1720
1721fn resolve_http_tls_config(
1723 config: &ServerCommandConfig,
1724) -> Result<crate::server::tls::HttpTlsConfig, String> {
1725 match (&config.http_tls_cert, &config.http_tls_key) {
1726 (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
1727 cert_path: cert.clone(),
1728 key_path: key.clone(),
1729 client_ca_path: config.http_tls_client_ca.clone(),
1730 }),
1731 (None, None) => {
1732 let dir = config
1734 .path
1735 .as_ref()
1736 .and_then(|p| p.parent().map(std::path::PathBuf::from))
1737 .unwrap_or_else(|| std::path::PathBuf::from("."));
1738 let auto = crate::server::tls::auto_generate_dev_cert(&dir)
1739 .map_err(|err| format!("HTTP TLS dev: {err}"))?;
1740 Ok(crate::server::tls::HttpTlsConfig {
1741 cert_path: auto.cert_path,
1742 key_path: auto.key_path,
1743 client_ca_path: config.http_tls_client_ca.clone(),
1744 })
1745 }
1746 _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
1747 }
1748}
1749
1750#[inline(never)]
1751fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
1752 let workers = config.workers;
1753 let cli_telemetry = config.telemetry.clone();
1754 let db_options = config.to_db_options();
1755 let rt_config = detect_runtime_config();
1756
1757 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1758
1759 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1760 .enable_all()
1761 .worker_threads(worker_threads)
1762 .thread_stack_size(rt_config.stack_size)
1763 .build()
1764 .map_err(|err| format!("tokio runtime: {err}"))?;
1765
1766 let (runtime, auth_store, _telemetry_guard) =
1768 build_runtime_and_auth_store(db_options, cli_telemetry)?;
1769 let signal_runtime = runtime.clone();
1770 tokio_runtime.block_on(async move {
1771 spawn_lifecycle_signal_handler(signal_runtime).await;
1772 spawn_wire_listeners(&config, &runtime);
1774
1775 spawn_pg_listener(&config, &runtime);
1777
1778 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
1782
1783 let server = RedDBGrpcServer::with_options(
1784 runtime,
1785 GrpcServerOptions {
1786 bind_addr: bind_addr.clone(),
1787 tls: None,
1788 },
1789 auth_store,
1790 );
1791
1792 tracing::info!(
1793 transport = "grpc",
1794 bind = %bind_addr,
1795 cpus = rt_config.available_cpus,
1796 workers = worker_threads,
1797 "listener online"
1798 );
1799 server.serve().await.map_err(|err| err.to_string())
1800 })
1801}
1802
1803#[inline(never)]
1804fn run_dual_server(
1805 config: ServerCommandConfig,
1806 grpc_bind_addr: String,
1807 http_bind_addr: String,
1808) -> Result<(), String> {
1809 let workers = config.workers;
1810 let wire_bind_addr = config.wire_bind_addr.clone();
1811 let cli_telemetry = config.telemetry.clone();
1812 let db_options = config.to_db_options();
1813 let rt_config = detect_runtime_config();
1814 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1815 let (runtime, auth_store, _telemetry_guard) =
1816 build_runtime_and_auth_store(db_options, cli_telemetry)?;
1817
1818 spawn_admin_metrics_listeners(&runtime, &auth_store);
1819 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
1820
1821 let http_server =
1822 build_http_server(runtime.clone(), auth_store.clone(), http_bind_addr.clone());
1823 let http_handle = http_server.serve_in_background();
1824
1825 thread::sleep(Duration::from_millis(150));
1826 if http_handle.is_finished() {
1827 return match http_handle.join() {
1828 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
1829 Ok(Err(err)) => Err(err.to_string()),
1830 Err(_) => Err("HTTP server thread panicked".to_string()),
1831 };
1832 }
1833
1834 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1835 .enable_all()
1836 .worker_threads(worker_threads)
1837 .thread_stack_size(rt_config.stack_size)
1838 .build()
1839 .map_err(|err| format!("tokio runtime: {err}"))?;
1840
1841 let signal_runtime = runtime.clone();
1842 tokio_runtime.block_on(async move {
1843 spawn_lifecycle_signal_handler(signal_runtime).await;
1844 spawn_wire_listeners(&config, &runtime);
1846
1847 spawn_pg_listener(&config, &runtime);
1849
1850 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
1852
1853 let server = RedDBGrpcServer::with_options(
1854 runtime,
1855 GrpcServerOptions {
1856 bind_addr: grpc_bind_addr.clone(),
1857 tls: None,
1858 },
1859 auth_store,
1860 );
1861
1862 tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
1863 tracing::info!(
1864 transport = "grpc",
1865 bind = %grpc_bind_addr,
1866 cpus = rt_config.available_cpus,
1867 workers = worker_threads,
1868 "listener online"
1869 );
1870 server.serve().await.map_err(|err| err.to_string())
1871 })
1872}
1873
/// Tests for systemd unit rendering and `SystemdServiceConfig` path helpers.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a service config with the binary, user and group shared by all
    /// tests; only the name, data path and bind addresses vary.
    fn service_config(
        service_name: &str,
        data_path: &str,
        router: Option<&str>,
        grpc: Option<&str>,
        http: Option<&str>,
    ) -> SystemdServiceConfig {
        SystemdServiceConfig {
            service_name: service_name.to_string(),
            binary_path: PathBuf::from("/usr/local/bin/red"),
            run_user: "reddb".to_string(),
            run_group: "reddb".to_string(),
            data_path: PathBuf::from(data_path),
            router_bind_addr: router.map(String::from),
            grpc_bind_addr: grpc.map(String::from),
            http_bind_addr: http.map(String::from),
        }
    }

    #[test]
    fn render_systemd_unit_contains_expected_execstart() {
        let cfg = service_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            None,
        );

        let rendered = render_systemd_unit(&cfg);

        assert!(rendered.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
        assert!(rendered.contains("ReadWritePaths=/var/lib/reddb"));
    }

    #[test]
    fn systemd_service_config_derives_paths() {
        let cfg = service_config(
            "reddb-api",
            "/srv/reddb/live/data.rdb",
            None,
            None,
            Some("127.0.0.1:5055"),
        );

        assert_eq!(cfg.data_dir(), PathBuf::from("/srv/reddb/live"));
        assert_eq!(
            cfg.unit_path(),
            PathBuf::from("/etc/systemd/system/reddb-api.service")
        );
    }

    #[test]
    fn render_systemd_unit_supports_dual_transport() {
        let cfg = service_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            Some("0.0.0.0:5055"),
        );

        let rendered = render_systemd_unit(&cfg);

        assert!(rendered.contains("--grpc-bind 0.0.0.0:5555"));
        assert!(rendered.contains("--http-bind 0.0.0.0:5055"));
    }

    #[test]
    fn render_systemd_unit_supports_router_mode() {
        let cfg = service_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            Some(DEFAULT_ROUTER_BIND_ADDR),
            None,
            None,
        );

        let rendered = render_systemd_unit(&cfg);

        assert!(rendered.contains("--bind 127.0.0.1:5050"));
        assert!(!rendered.contains("--grpc-bind"));
        assert!(!rendered.contains("--http-bind"));
    }
}
1952}