1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13 GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Default loopback address (`127.0.0.1:5050`) for the router bind.
///
/// NOTE(review): the consumers of this constant are outside this part of the
/// file; the value matches `ServerTransport::Wire`'s default bind address.
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
18pub fn detect_runtime_config() -> RuntimeConfig {
20 let cpus = thread::available_parallelism()
21 .map(|n| n.get())
22 .unwrap_or(1);
23
24 let suggested_workers = cpus.saturating_sub(1).max(1);
26
27 RuntimeConfig {
28 available_cpus: cpus,
29 suggested_workers,
30 stack_size: 8 * 1024 * 1024, }
32}
33
/// Host-derived threading parameters produced by `detect_runtime_config`.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Number of CPUs reported as available to this process (at least 1).
    pub available_cpus: usize,
    /// Recommended worker-thread count: one less than `available_cpus`, minimum 1.
    pub suggested_workers: usize,
    /// Thread stack size in bytes.
    pub stack_size: usize,
}
40
/// Network transports the server can expose.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    Grpc,
    Http,
    Wire,
}

impl ServerTransport {
    /// Display label and default bind address of each transport, kept side by
    /// side so the two public accessors cannot drift apart.
    const fn profile(self) -> (&'static str, &'static str) {
        match self {
            Self::Grpc => ("gRPC", "127.0.0.1:5555"),
            Self::Http => ("HTTP", "127.0.0.1:5055"),
            Self::Wire => ("wire", "127.0.0.1:5050"),
        }
    }

    /// Human-readable transport label (`"gRPC"`, `"HTTP"` or `"wire"`).
    pub const fn as_str(self) -> &'static str {
        self.profile().0
    }

    /// Loopback address this transport binds to when none is configured.
    pub const fn default_bind_addr(self) -> &'static str {
        self.profile().1
    }
}
65
/// Settings collected for the `server` command: storage location, listener
/// addresses, TLS material, replication role and runtime tuning.
///
/// NOTE(review): the `*_explicit` flags presumably record whether the operator
/// supplied the address rather than a default being filled in (compare the
/// `explicit` argument of `bind_listener_for_startup`) — confirm at the call
/// sites, which are outside this part of the file.
#[derive(Debug, Clone)]
pub struct ServerCommandConfig {
    /// Database path; `None` makes `to_db_options` start from in-memory options.
    pub path: Option<PathBuf>,
    /// Router bind address. When set, the server runs in routed mode and
    /// `enabled_transports` reports gRPC, HTTP and wire as enabled.
    pub router_bind_addr: Option<String>,
    pub router_bind_explicit: bool,
    /// Dedicated gRPC listener address.
    pub grpc_bind_addr: Option<String>,
    pub grpc_bind_explicit: bool,
    /// gRPC TLS listener address and its certificate/key/client-CA paths
    /// (semantics are defined by consumers outside this part of the file).
    pub grpc_tls_bind_addr: Option<String>,
    pub grpc_tls_cert: Option<PathBuf>,
    pub grpc_tls_key: Option<PathBuf>,
    pub grpc_tls_client_ca: Option<PathBuf>,
    /// Dedicated HTTP listener address.
    pub http_bind_addr: Option<String>,
    pub http_bind_explicit: bool,
    /// HTTP TLS listener address and its certificate/key/client-CA paths
    /// (semantics are defined by consumers outside this part of the file).
    pub http_tls_bind_addr: Option<String>,
    pub http_tls_cert: Option<PathBuf>,
    pub http_tls_key: Option<PathBuf>,
    pub http_tls_client_ca: Option<PathBuf>,
    /// Dedicated wire-protocol listener address.
    pub wire_bind_addr: Option<String>,
    pub wire_bind_explicit: bool,
    /// Wire TLS listener address and its certificate/key paths
    /// (semantics are defined by consumers outside this part of the file).
    pub wire_tls_bind_addr: Option<String>,
    pub wire_tls_cert: Option<PathBuf>,
    pub wire_tls_key: Option<PathBuf>,
    /// Listener address handed to `run_pg_only_server` when no other listener
    /// is configured.
    pub pg_bind_addr: Option<String>,
    /// Copied verbatim into `RedDBOptions::create_if_missing`.
    pub create_if_missing: bool,
    /// Requests read-only mode; `to_db_options` also turns read-only on from
    /// the `RED_READONLY` / `REDDB_READONLY` environment variables or the
    /// persisted runtime flag.
    pub read_only: bool,
    /// Replication role: `"primary"` or `"replica"`; any other value selects
    /// standalone.
    pub role: String,
    /// Primary address used by a replica; defaults to `http://127.0.0.1:5555`.
    pub primary_addr: Option<String>,
    /// When true, sets `auth.vault_enabled` on the database options.
    pub vault: bool,
    /// Worker-thread override; when `None` the suggested worker count from
    /// `detect_runtime_config` is used.
    pub workers: Option<usize>,
    /// Telemetry configuration supplied on the command line, if any.
    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
    /// HTTP limit inputs supplied on the command line.
    pub http_limits_cli: crate::server::HttpLimitsCliInput,
}
135
/// Record of a listener that bound successfully during startup.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerState {
    /// Transport label as passed by the caller.
    pub transport: String,
    /// Address the listener was bound to.
    pub bind_addr: String,
    /// The `explicit` flag the caller passed for this listener.
    pub explicit: bool,
}
142
/// Record of a listener that failed to bind during startup.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerFailure {
    /// Transport label as passed by the caller.
    pub transport: String,
    /// Address the bind was attempted on.
    pub bind_addr: String,
    /// The `explicit` flag the caller passed for this listener.
    pub explicit: bool,
    /// Human-readable description of why the bind failed.
    pub reason: String,
}
150
151#[derive(Debug, Clone, Default, PartialEq, Eq)]
152pub struct TransportReadiness {
153 pub active: Vec<TransportListenerState>,
154 pub failed: Vec<TransportListenerFailure>,
155}
156
157impl TransportReadiness {
158 fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
159 self.active.push(TransportListenerState {
160 transport: transport.to_string(),
161 bind_addr: bind_addr.to_string(),
162 explicit,
163 });
164 }
165
166 fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
167 self.failed.push(TransportListenerFailure {
168 transport: transport.to_string(),
169 bind_addr: bind_addr.to_string(),
170 explicit,
171 reason,
172 });
173 }
174}
175
/// Inputs for rendering and installing the RedDB systemd unit.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix.
    pub service_name: String,
    /// Path of the server binary placed first on the `ExecStart=` line.
    pub binary_path: PathBuf,
    /// Account written to `User=`.
    pub run_user: String,
    /// Group written to `Group=`.
    pub run_group: String,
    /// Database file path passed to the server via `--path`.
    pub data_path: PathBuf,
    /// Router bind address (`--bind`); takes precedence over `grpc_bind_addr`.
    pub router_bind_addr: Option<String>,
    /// gRPC bind address (`--grpc-bind`), used only without a router address.
    pub grpc_bind_addr: Option<String>,
    /// HTTP bind address (`--http-bind`).
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory that contains `data_path`.
    ///
    /// Falls back to `"."` when the path has no usable parent. That covers the
    /// filesystem root (where `parent()` is `None`) and a bare relative file
    /// name such as `data.rdb`, for which `Path::parent` returns `Some("")`
    /// rather than `None`. An empty path must not leak into the unit file or
    /// into the `install`/`useradd` arguments.
    pub fn data_dir(&self) -> PathBuf {
        match self.data_path.parent() {
            Some(parent) if !parent.as_os_str().is_empty() => parent.to_path_buf(),
            _ => PathBuf::from("."),
        }
    }

    /// Location of the unit file: `/etc/systemd/system/<service_name>.service`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
200
201pub fn default_telemetry_for_path(
206 path: Option<&std::path::Path>,
207) -> crate::telemetry::TelemetryConfig {
208 let log_dir = match path {
209 Some(p) => p
210 .parent()
211 .map(|parent| parent.join("logs"))
212 .or_else(|| Some(std::path::PathBuf::from("./logs"))),
213 None => None, };
215 crate::telemetry::TelemetryConfig {
216 log_dir,
217 file_prefix: "reddb.log".to_string(),
218 level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
219 format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
220 crate::telemetry::LogFormat::Pretty
221 } else {
222 crate::telemetry::LogFormat::Json
223 },
224 rotation_keep_days: 14,
225 service_name: "reddb",
226 level_explicit: false,
228 format_explicit: false,
229 rotation_keep_days_explicit: false,
230 file_prefix_explicit: false,
231 log_dir_explicit: false,
232 log_file_disabled: false,
233 }
234}
235
// Keys in `RedDBOptions::metadata` that carry the backup bootstrap settings
// from `apply_backup_config` to `spawn_backup_tasks_if_configured`.

/// Metadata key holding the checkpoint interval in seconds (stored as a decimal string).
const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
/// Metadata key holding the WAL flush interval in seconds (stored as a decimal string).
const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
/// Metadata key holding the backup backend kind (`apply_backup_config` writes `"s3"`).
const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
245
246impl ServerCommandConfig {
247 fn to_db_options(&self) -> Result<RedDBOptions, String> {
248 let mut options = match &self.path {
249 Some(path) => RedDBOptions::persistent(path),
250 None => RedDBOptions::in_memory(),
251 };
252
253 options.mode = StorageMode::Persistent;
254 options.create_if_missing = self.create_if_missing;
255 options.read_only = self.read_only
262 || env_nonempty("RED_READONLY")
263 .or_else(|| env_nonempty("REDDB_READONLY"))
264 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
265 .unwrap_or(false)
266 || self.path.as_ref().is_some_and(|data_path| {
267 crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
268 data_path,
269 ))
270 .unwrap_or(false)
271 });
272
273 options.replication = match self.role.as_str() {
274 "primary" => ReplicationConfig::primary(),
275 "replica" => {
276 let primary_addr = self
277 .primary_addr
278 .clone()
279 .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
280 ReplicationConfig::replica(primary_addr)
287 }
288 _ => ReplicationConfig::standalone(),
289 };
290
291 if self.vault {
292 options.auth.vault_enabled = true;
293 }
294
295 match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
300 Err(msg) => {
301 return Err(format!("backup bootstrap: {msg}"));
302 }
303 Ok(Some(cfg)) => {
304 apply_backup_config(&mut options, &cfg);
305 }
306 Ok(None) => {
307 configure_remote_backend_from_env(&mut options);
308 }
309 }
310
311 Ok(options)
312 }
313
314 pub fn enabled_transports(&self) -> Vec<ServerTransport> {
315 let mut transports = Vec::with_capacity(3);
316 if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
317 transports.push(ServerTransport::Grpc);
318 }
319 if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
320 transports.push(ServerTransport::Http);
321 }
322 if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
323 transports.push(ServerTransport::Wire);
324 }
325 transports
326 }
327}
328
/// Looks up the setting `name` by delegating to
/// `crate::utils::env_with_file_fallback`.
///
/// NOTE(review): judging by the two names, this presumably yields `None` for
/// unset or empty values and can fall back to a companion file-based source —
/// confirm against `env_with_file_fallback`, which is not visible here.
fn env_nonempty(name: &str) -> Option<String> {
    crate::utils::env_with_file_fallback(name)
}
336
337fn env_truthy(name: &str) -> bool {
338 env_nonempty(name)
339 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
340 .unwrap_or(false)
341}
342
343fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
349 let endpoint_host = endpoint_host(&cfg.endpoint);
350
351 options.metadata.insert(
352 BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
353 cfg.checkpoint_interval_secs.to_string(),
354 );
355 options.metadata.insert(
356 BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
357 cfg.wal_flush_interval_secs.to_string(),
358 );
359 options
360 .metadata
361 .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
362
363 #[cfg(feature = "backend-s3")]
364 {
365 let s3_cfg = crate::storage::backend::S3Config {
366 endpoint: cfg.endpoint.clone(),
367 bucket: cfg.bucket.clone(),
368 key_prefix: cfg.prefix.clone(),
369 access_key: cfg.access_key_id.clone(),
370 secret_key: cfg.secret_access_key.clone(),
371 region: cfg.region.clone(),
372 path_style: true,
373 };
374 let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
375 options.remote_backend = Some(backend.clone());
376 options.remote_backend_atomic = Some(backend);
377 let trimmed = cfg.prefix.trim_end_matches('/');
382 options.remote_key = Some(format!("{}/data.rdb", trimmed));
383
384 tracing::info!(
385 backend = "s3",
386 endpoint = %endpoint_host,
387 bucket = %cfg.bucket,
388 prefix = %cfg.prefix,
389 checkpoint_interval_secs = cfg.checkpoint_interval_secs,
390 wal_flush_interval_secs = cfg.wal_flush_interval_secs,
391 "backup backend configured from REDDB_BACKUP_* env"
392 );
393 }
394
395 #[cfg(not(feature = "backend-s3"))]
396 {
397 tracing::warn!(
398 backend = "s3",
399 endpoint = %endpoint_host,
400 bucket = %cfg.bucket,
401 prefix = %cfg.prefix,
402 "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
403 backend wiring skipped (archiver/checkpointer also disabled)"
404 );
405 }
406}
407
/// Extracts the authority part (`host[:port]`) of an endpoint string.
///
/// Everything up to and including the first `://` is dropped when present,
/// then the result is cut at the first `/`. Input without a scheme or path is
/// returned unchanged.
fn endpoint_host(endpoint: &str) -> &str {
    const SCHEME_SEPARATOR: &str = "://";

    let without_scheme = match endpoint.find(SCHEME_SEPARATOR) {
        Some(idx) => &endpoint[idx + SCHEME_SEPARATOR.len()..],
        None => endpoint,
    };

    match without_scheme.find('/') {
        Some(slash) => &without_scheme[..slash],
        None => without_scheme,
    }
}
415
416fn spawn_backup_tasks_if_configured(
422 options: &RedDBOptions,
423 runtime: &RedDBRuntime,
424) -> Option<BackupTasksHandle> {
425 let checkpoint_secs: u64 = options
426 .metadata
427 .get(BACKUP_INTERVAL_META_CHECKPOINT)?
428 .parse()
429 .ok()?;
430 let wal_secs: u64 = options
431 .metadata
432 .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
433 .parse()
434 .ok()?;
435 if options.remote_backend.is_none() {
436 return None;
437 }
438
439 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
440
441 let checkpoint_handle = {
442 let stop = Arc::clone(&stop);
443 let runtime = runtime.clone();
444 let interval = Duration::from_secs(checkpoint_secs);
445 thread::Builder::new()
446 .name("red-checkpointer".into())
447 .spawn(move || {
448 periodic_loop(stop, interval, move || {
449 if let Err(err) = runtime.checkpoint() {
450 tracing::warn!(error = %err, "periodic checkpoint failed");
451 }
452 })
453 })
454 .ok()
455 };
456
457 let archiver_handle = {
458 let stop = Arc::clone(&stop);
459 let runtime = runtime.clone();
460 let interval = Duration::from_secs(wal_secs);
461 thread::Builder::new()
462 .name("red-wal-archiver".into())
463 .spawn(move || {
464 periodic_loop(stop, interval, move || {
465 if let Err(err) = runtime.trigger_backup() {
466 tracing::warn!(error = %err, "periodic WAL archive/backup failed");
467 }
468 })
469 })
470 .ok()
471 };
472
473 tracing::info!(
474 checkpoint_interval_secs = checkpoint_secs,
475 wal_flush_interval_secs = wal_secs,
476 "backup tasks spawned (checkpointer + WAL archiver)"
477 );
478
479 Some(BackupTasksHandle {
480 stop,
481 _checkpoint_handle: checkpoint_handle,
482 _archiver_handle: archiver_handle,
483 })
484}
485
/// Owner of the background backup threads' shared stop flag.
///
/// Dropping the handle raises the flag so both periodic loops exit on their
/// next wake-up. The join handles are held but never joined.
pub struct BackupTasksHandle {
    stop: Arc<std::sync::atomic::AtomicBool>,
    _checkpoint_handle: Option<thread::JoinHandle<()>>,
    _archiver_handle: Option<thread::JoinHandle<()>>,
}

impl Drop for BackupTasksHandle {
    /// Signals the background loops to stop; does not wait for them.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        let stop_flag = &self.stop;
        stop_flag.store(true, Ordering::Release);
    }
}
499
/// Runs `tick` roughly every `interval` until `stop` becomes true.
///
/// The loop sleeps in one-second slices so a stop request is noticed within
/// about a second. Elapsed time is counted in whole slices, so the time spent
/// inside `tick` is not included. The stop flag is checked before each slice,
/// not between the sleep and the tick.
fn periodic_loop<F: FnMut()>(
    stop: Arc<std::sync::atomic::AtomicBool>,
    interval: Duration,
    mut tick: F,
) {
    use std::sync::atomic::Ordering;

    const WAKE: Duration = Duration::from_secs(1);

    let mut since_last_tick = Duration::ZERO;
    loop {
        if stop.load(Ordering::Acquire) {
            return;
        }

        thread::sleep(WAKE);
        since_last_tick += WAKE;
        if since_last_tick < interval {
            continue;
        }

        tick();
        since_last_tick = Duration::ZERO;
    }
}
518
519fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
520 let backend = env_nonempty("RED_BACKEND")
526 .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
527 .unwrap_or_else(|| "none".to_string())
528 .to_ascii_lowercase();
529
530 match backend.as_str() {
531 "s3" | "minio" | "r2" => {
536 #[cfg(feature = "backend-s3")]
537 {
538 if let Some(config) = s3_config_from_env() {
539 let remote_key = env_nonempty("RED_REMOTE_KEY")
540 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
541 .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
542 let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
543 options.remote_backend = Some(backend.clone());
544 options.remote_backend_atomic = Some(backend);
545 options.remote_key = Some(remote_key);
546 }
547 }
548 #[cfg(not(feature = "backend-s3"))]
549 {
550 tracing::warn!(
551 backend = %backend,
552 "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
553 );
554 }
555 }
556 "fs" | "local" => {
561 let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
562 let remote_key = match (
563 base_path,
564 env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
565 ) {
566 (Some(base), Some(rel)) => Some(format!(
567 "{}/{}",
568 base.trim_end_matches('/'),
569 rel.trim_start_matches('/')
570 )),
571 (Some(base), None) => Some(format!(
572 "{}/clusters/dev/data.rdb",
573 base.trim_end_matches('/')
574 )),
575 (None, Some(rel)) => Some(rel),
576 (None, None) => None,
577 };
578 if let Some(key) = remote_key {
579 let backend = Arc::new(crate::storage::backend::LocalBackend);
580 options.remote_backend = Some(backend.clone());
581 options.remote_backend_atomic = Some(backend);
582 options.remote_key = Some(key);
583 }
584 }
585 "http" => {
590 let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
591 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
592 {
593 Some(u) => u,
594 None => {
595 tracing::warn!(
596 "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
597 );
598 return;
599 }
600 };
601 let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
602 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
603 .unwrap_or_default();
604 let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
605 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
606 {
607 std::fs::read_to_string(&path)
608 .ok()
609 .map(|s| s.trim().to_string())
610 .filter(|s| !s.is_empty())
611 } else {
612 env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
613 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
614 };
615
616 let mut config =
617 crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
618 if let Some(auth) = auth_header {
619 config = config.with_auth_header(auth);
620 }
621 let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
622 || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
623 || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
624 config = config.with_conditional_writes(conditional_writes);
625 if conditional_writes {
630 match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
631 Ok(atomic) => {
632 let atomic_arc = Arc::new(atomic);
633 options.remote_backend = Some(atomic_arc.clone());
634 options.remote_backend_atomic = Some(atomic_arc);
635 }
636 Err(err) => {
637 tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
638 options.remote_backend =
639 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
640 }
641 }
642 } else {
643 options.remote_backend =
644 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
645 }
646 options.remote_key = env_nonempty("RED_REMOTE_KEY")
647 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
648 .or_else(|| Some("clusters/dev/data.rdb".to_string()));
649 }
650 "none" | "" => {}
653 other => {
654 tracing::warn!(
655 backend = %other,
656 "unknown RED_BACKEND value — supported: s3 | fs | http | none"
657 );
658 }
659 }
660}
661
/// Reads the S3 setting `suffix`, preferring `RED_S3_<suffix>` over the
/// legacy `REDDB_S3_<suffix>`.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    let primary = format!("RED_S3_{suffix}");
    if let Some(value) = env_nonempty(&primary) {
        return Some(value);
    }
    env_nonempty(&format!("REDDB_S3_{suffix}"))
}
671
/// Reads an S3 secret, preferring a file reference over the inline setting.
///
/// When `RED_S3_<suffix>_FILE` (or legacy `REDDB_S3_<suffix>_FILE`) is set,
/// the secret is the trimmed content of that file. An unreadable or blank
/// file yields `None`; there is no fallback to the inline setting in that
/// case. Without a file reference the value comes from `env_s3`.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let secret_file = env_nonempty(&format!("RED_S3_{suffix}_FILE"))
        .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}_FILE")));

    match secret_file {
        Some(path) => {
            let contents = std::fs::read_to_string(&path).ok()?;
            let secret = contents.trim();
            if secret.is_empty() {
                None
            } else {
                Some(secret.to_string())
            }
        }
        None => env_s3(suffix),
    }
}
689
/// Assembles an `S3Config` from the `RED_S3_*` / `REDDB_S3_*` settings.
///
/// Endpoint, bucket, access key and secret key are mandatory; `None` is
/// returned as soon as one of them is missing. Defaults: region `us-east-1`,
/// empty key prefix (`KEY_PREFIX`, then `PREFIX`), path-style addressing on
/// unless `PATH_STYLE` is set to something other than `1`/`true`/`yes`/`on`.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    // Fields are listed in lookup order: struct-literal fields are evaluated
    // as written, so a missing mandatory setting stops the later lookups.
    Some(crate::storage::backend::S3Config {
        endpoint: env_s3("ENDPOINT")?,
        bucket: env_s3("BUCKET")?,
        access_key: env_s3_secret("ACCESS_KEY")?,
        secret_key: env_s3_secret("SECRET_KEY")?,
        region: match env_s3("REGION") {
            Some(region) => region,
            None => "us-east-1".to_string(),
        },
        key_prefix: env_s3("KEY_PREFIX")
            .or_else(|| env_s3("PREFIX"))
            .unwrap_or_default(),
        path_style: match env_s3("PATH_STYLE") {
            Some(flag) => matches!(
                flag.to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            ),
            None => true,
        },
    })
}
713
714pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
715 let data_dir = config.data_dir();
716 let exec_start = render_systemd_exec_start(config);
717 format!(
718 "[Unit]\n\
719Description=RedDB unified database service\n\
720After=network-online.target\n\
721Wants=network-online.target\n\
722\n\
723[Service]\n\
724Type=simple\n\
725User={user}\n\
726Group={group}\n\
727WorkingDirectory={workdir}\n\
728ExecStart={exec_start}\n\
729Restart=always\n\
730RestartSec=2\n\
731LimitSTACK=16M\n\
732NoNewPrivileges=true\n\
733PrivateTmp=true\n\
734ProtectSystem=strict\n\
735ProtectHome=true\n\
736ProtectControlGroups=true\n\
737ProtectKernelTunables=true\n\
738ProtectKernelModules=true\n\
739RestrictNamespaces=true\n\
740LockPersonality=true\n\
741MemoryDenyWriteExecute=true\n\
742ReadWritePaths={workdir}\n\
743\n\
744[Install]\n\
745WantedBy=multi-user.target\n",
746 user = config.run_user,
747 group = config.run_group,
748 workdir = data_dir.display(),
749 exec_start = exec_start,
750 )
751}
752
753#[cfg(target_os = "linux")]
762pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
763 ensure_root()?;
764 ensure_command_available("systemctl")?;
765 ensure_command_available("getent")?;
766 ensure_command_available("groupadd")?;
767 ensure_command_available("useradd")?;
768 ensure_command_available("install")?;
769 ensure_executable(&config.binary_path)?;
770
771 if !command_success("getent", ["group", config.run_group.as_str()])? {
772 run_command("groupadd", ["--system", config.run_group.as_str()])?;
773 }
774
775 if !command_success("id", ["-u", config.run_user.as_str()])? {
776 let data_dir = config.data_dir();
777 run_command(
778 "useradd",
779 [
780 "--system",
781 "--gid",
782 config.run_group.as_str(),
783 "--home-dir",
784 data_dir.to_string_lossy().as_ref(),
785 "--shell",
786 "/usr/sbin/nologin",
787 config.run_user.as_str(),
788 ],
789 )?;
790 }
791
792 let data_dir = config.data_dir();
793 run_command(
794 "install",
795 [
796 "-d",
797 "-o",
798 config.run_user.as_str(),
799 "-g",
800 config.run_group.as_str(),
801 "-m",
802 "0750",
803 data_dir.to_string_lossy().as_ref(),
804 ],
805 )?;
806
807 std::fs::write(config.unit_path(), render_systemd_unit(config))
808 .map_err(|err| format!("failed to write systemd unit: {err}"))?;
809
810 run_command("systemctl", ["daemon-reload"])?;
811 run_command(
812 "systemctl",
813 [
814 "enable",
815 "--now",
816 format!("{}.service", config.service_name).as_str(),
817 ],
818 )?;
819
820 Ok(())
821}
822
/// Non-Linux stand-in for the systemd installer: always fails with a message
/// pointing at the platform's own service manager.
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    let message = String::from(
        "systemd install is Linux-only — use sc.exe (Windows) or \
         launchd (macOS) to install the service manually using the \
         unit printed by `red service print-unit`",
    );
    Err(message)
}
834
/// Verifies the process runs as root by checking that `id -u` prints `0`.
///
/// # Errors
///
/// Fails when `id` cannot be run, exits unsuccessfully, or reports a non-zero
/// uid.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let id_output = match Command::new("id").arg("-u").output() {
        Ok(output) => output,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !id_output.status.success() {
        return Err("failed to determine current uid".to_string());
    }

    let uid_text = String::from_utf8_lossy(&id_output.stdout);
    match uid_text.trim() {
        "0" => Ok(()),
        _ => Err("run this command as root (sudo)".to_string()),
    }
}
850
/// Checks that `command` resolves in a login shell via `command -v`.
///
/// `command` is interpolated into a shell line, so callers must pass trusted
/// names only.
///
/// # Errors
///
/// Fails when the shell cannot be started or the command is not found.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    let probe = format!("command -v {command} >/dev/null 2>&1");
    let found = Command::new("sh")
        .arg("-lc")
        .arg(&probe)
        .status()
        .map_err(|err| format!("failed to check command '{command}': {err}"))?
        .success();

    if !found {
        return Err(format!("required command not found: {command}"));
    }
    Ok(())
}
863
/// Verifies that `path` names an existing regular file with at least one
/// execute permission bit set.
///
/// Directories are rejected explicitly: they normally carry search (`x`) bits
/// and would otherwise pass the permission check. The whole function is
/// compiled for Linux only, so the Unix permission API is always available.
///
/// # Errors
///
/// * `binary not found '<path>': …` — the metadata lookup failed.
/// * `binary is not a file: <path>` — the path is not a regular file.
/// * `binary is not executable: <path>` — no execute bit is set.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    use std::os::unix::fs::PermissionsExt;

    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;

    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    // Any of the owner/group/other execute bits is enough.
    if metadata.permissions().mode() & 0o111 == 0 {
        return Err(format!("binary is not executable: {}", path.display()));
    }
    Ok(())
}
883
/// Runs `program` with `args` and reports whether it exited successfully.
///
/// The child inherits the parent's standard streams.
///
/// # Errors
///
/// Fails only when the program cannot be started; an unsuccessful exit is
/// `Ok(false)`.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    match Command::new(program).args(args).status() {
        Ok(status) => Ok(status.success()),
        Err(err) => Err(format!("failed to run {program}: {err}")),
    }
}
892
/// Runs `program` with `args`, capturing its output, and fails with a useful
/// detail when it exits unsuccessfully.
///
/// # Errors
///
/// * `failed to run <program>: …` — the program could not be started.
/// * `<program> failed: <detail>` — unsuccessful exit. The detail is the
///   trimmed stderr, else the trimmed stdout, else the exit status. The status
///   is rendered through `ExitStatus`'s own `Display`, which already reads
///   `exit status: N`, so no extra prefix is added.
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    let stdout = String::from_utf8_lossy(&output.stdout);
    let detail = match (stderr.trim(), stdout.trim()) {
        ("", "") => output.status.to_string(),
        ("", out) => out.to_string(),
        (err, _) => err.to_string(),
    };
    Err(format!("{program} failed: {detail}"))
}
914
915pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
916 let has_any = config.router_bind_addr.is_some()
917 || config.grpc_bind_addr.is_some()
918 || config.http_bind_addr.is_some()
919 || config.wire_bind_addr.is_some()
920 || config.pg_bind_addr.is_some();
921 if !has_any {
922 return Err("at least one server bind address must be configured".into());
923 }
924 let thread_name = if config.router_bind_addr.is_some() {
925 "red-server-router"
926 } else {
927 match (
928 config.grpc_bind_addr.is_some(),
929 config.http_bind_addr.is_some(),
930 ) {
931 (true, true) => "red-server-dual",
932 (true, false) => "red-server-grpc",
933 (false, true) => "red-server-http",
934 (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
935 (false, false) => "red-server-pg-wire",
936 }
937 };
938
939 let handle = thread::Builder::new()
940 .name(thread_name.into())
941 .stack_size(8 * 1024 * 1024)
942 .spawn(move || run_configured_servers(config))
943 .map_err(|err| format!("failed to spawn server thread: {err}"))?;
944
945 match handle.join() {
946 Ok(result) => result,
947 Err(_) => Err("server thread panicked".to_string()),
948 }
949}
950
951fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
952 let mut parts = vec![
953 config.binary_path.display().to_string(),
954 "server".to_string(),
955 "--path".to_string(),
956 config.data_path.display().to_string(),
957 ];
958
959 if let Some(bind_addr) = &config.router_bind_addr {
960 parts.push("--bind".to_string());
961 parts.push(bind_addr.clone());
962 } else if let Some(bind_addr) = &config.grpc_bind_addr {
963 parts.push("--grpc-bind".to_string());
964 parts.push(bind_addr.clone());
965 }
966 if let Some(bind_addr) = &config.http_bind_addr {
967 parts.push("--http-bind".to_string());
968 parts.push(bind_addr.clone());
969 }
970
971 parts.join(" ")
972}
973
/// Reports whether a TCP connection to `target` succeeds within `timeout`.
///
/// `target` is resolved with `ToSocketAddrs`; each resolved address is tried
/// in turn with its own `timeout`. Returns `false` when resolution fails or
/// no address accepts the connection.
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    let Ok(resolved) = target.to_socket_addrs() else {
        return false;
    };

    for address in resolved {
        if TcpStream::connect_timeout(&address, timeout).is_ok() {
            return true;
        }
    }
    false
}
984
985#[inline(never)]
986fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
987 if let Some(router_bind_addr) = config.router_bind_addr.clone() {
993 return run_routed_server(config, router_bind_addr);
994 }
995
996 match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
997 (Some(grpc_bind_addr), Some(http_bind_addr)) => {
998 run_dual_server(config, grpc_bind_addr, http_bind_addr)
999 }
1000 (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1001 (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1002 (None, None) => {
1003 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1004 run_wire_only_server(config, wire_addr)
1005 } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1006 run_pg_only_server(config, pg_addr)
1007 } else {
1008 Err("at least one server bind address must be configured".to_string())
1009 }
1010 }
1011 }
1012}
1013
1014pub fn bind_listener_for_startup(
1032 readiness: &mut TransportReadiness,
1033 transport: &str,
1034 bind_addr: &str,
1035 explicit: bool,
1036) -> Result<Option<TcpListener>, String> {
1037 match TcpListener::bind(bind_addr) {
1038 Ok(listener) => {
1039 readiness.active(transport, bind_addr, explicit);
1040 Ok(Some(listener))
1041 }
1042 Err(err) => {
1043 let reason = format!("{transport} listener bind {bind_addr}: {err}");
1044 readiness.failed(transport, bind_addr, explicit, reason.clone());
1045 if explicit {
1046 tracing::error!(
1047 transport,
1048 bind = %bind_addr,
1049 error = %err,
1050 "fatal explicit bind failure"
1051 );
1052 Err(format!("explicit {reason}"))
1053 } else {
1054 tracing::warn!(
1055 transport,
1056 bind = %bind_addr,
1057 error = %err,
1058 "non-fatal implicit bind failure; listener degraded"
1059 );
1060 Ok(None)
1061 }
1062 }
1063 }
1064}
1065
/// Installs the process lifecycle signal handling and spawns a detached task
/// that reacts to the signals.
///
/// On Unix the task handles SIGTERM and SIGINT (graceful shutdown, then
/// process exit) and, when its handler could be installed, SIGHUP (delegates
/// to `handle_sighup_reload` and keeps running). On other platforms it waits
/// for Ctrl+C and shuts down the same way.
///
/// The function itself awaits nothing; it calls `tokio::spawn`, so it is
/// expected to run inside a Tokio runtime.
async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
    // Backup-on-shutdown defaults to enabled when RED_BACKUP_ON_SHUTDOWN is
    // unset. When set, only 1/true/yes/on (case-insensitive) keep it enabled.
    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
        .ok()
        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
        .unwrap_or(true);

    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        // SIGTERM and SIGINT are mandatory: if either handler cannot be
        // installed, a warning is logged and no handler task is spawned.
        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
                );
                return;
            }
        };
        let mut sigint = match signal(SignalKind::interrupt()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGINT handler");
                return;
            }
        };
        // SIGHUP is optional: without it the task still handles shutdown signals.
        let mut sighup = match signal(SignalKind::hangup()) {
            Ok(s) => Some(s),
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
                None
            }
        };

        let reload_runtime = runtime.clone();
        tokio::spawn(async move {
            loop {
                // Wait for whichever signal arrives first. The SIGHUP arm only
                // exists when its handler was installed.
                let signal_name = match &mut sighup {
                    Some(hup) => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                        _ = hup.recv() => "SIGHUP",
                    },
                    None => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                    },
                };

                // SIGHUP never shuts down: record the reload and keep waiting.
                if signal_name == "SIGHUP" {
                    handle_sighup_reload(&reload_runtime);
                    continue;
                }

                tracing::info!(
                    signal = signal_name,
                    "lifecycle signal received; shutting down"
                );
                match runtime.graceful_shutdown(backup_on_shutdown) {
                    Ok(report) => {
                        tracing::info!(
                            duration_ms = report.duration_ms,
                            flushed_wal = report.flushed_wal,
                            final_checkpoint = report.final_checkpoint,
                            backup_uploaded = report.backup_uploaded,
                            "graceful shutdown complete"
                        );
                    }
                    Err(err) => {
                        tracing::error!(error = %err, "graceful shutdown failed");
                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
                            reason: format!("graceful shutdown failed: {err}"),
                        }
                        .emit_global();
                    }
                }
                // NOTE(review): the process exits with status 0 even when the
                // graceful shutdown failed — confirm this is intended.
                std::process::exit(0);
            }
        });
    }

    #[cfg(not(unix))]
    {
        tokio::spawn(async move {
            let interrupted = tokio::signal::ctrl_c().await;
            if let Err(err) = interrupted {
                tracing::warn!(error = %err, "could not install Ctrl+C handler");
                return;
            }

            tracing::info!(
                signal = "Ctrl+C",
                "lifecycle signal received; shutting down"
            );
            match runtime.graceful_shutdown(backup_on_shutdown) {
                Ok(report) => {
                    tracing::info!(
                        duration_ms = report.duration_ms,
                        flushed_wal = report.flushed_wal,
                        final_checkpoint = report.final_checkpoint,
                        backup_uploaded = report.backup_uploaded,
                        "graceful shutdown complete"
                    );
                }
                // Unlike the Unix path, a failure here is only logged; no
                // operator event is emitted.
                Err(err) => {
                    tracing::error!(error = %err, "graceful shutdown failed");
                }
            }
            std::process::exit(0);
        });
    }
}
1211
1212fn handle_sighup_reload(runtime: &RedDBRuntime) {
1221 let now_ms = std::time::SystemTime::now()
1222 .duration_since(std::time::UNIX_EPOCH)
1223 .map(|d| d.as_millis() as u64)
1224 .unwrap_or(0);
1225 tracing::info!(
1226 target: "reddb::secrets",
1227 ts_unix_ms = now_ms,
1228 "SIGHUP received; secrets will be re-read from *_FILE on next access"
1229 );
1230 use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1235 runtime.audit_log().record_event(
1236 AuditEvent::builder("config/sighup_reload")
1237 .source(AuditAuthSource::System)
1238 .resource("secrets")
1239 .outcome(Outcome::Success)
1240 .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1241 .build(),
1242 );
1243}
1244
1245#[inline(never)]
1246fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
1247 let workers = config.workers;
1248 let cli_telemetry = config.telemetry.clone();
1249 let db_options = config.to_db_options()?;
1250 let rt_config = detect_runtime_config();
1251 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1252 let (runtime, auth_store, _telemetry_guard) =
1253 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1254 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1255
1256 spawn_admin_metrics_listeners(&runtime, &auth_store);
1257
1258 let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
1259 .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
1260 let http_backend = http_listener
1261 .local_addr()
1262 .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
1263 let http_server = build_http_server(
1264 runtime.clone(),
1265 auth_store.clone(),
1266 http_backend.to_string(),
1267 );
1268 let http_server = apply_http_limits(http_server, &config, &runtime);
1269 let http_handle = http_server.serve_in_background_on(http_listener);
1270
1271 thread::sleep(Duration::from_millis(100));
1272 if http_handle.is_finished() {
1273 return match http_handle.join() {
1274 Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
1275 Ok(Err(err)) => Err(err.to_string()),
1276 Err(_) => Err("HTTP backend thread panicked".to_string()),
1277 };
1278 }
1279
1280 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1281 .enable_all()
1282 .worker_threads(worker_threads)
1283 .thread_stack_size(rt_config.stack_size)
1284 .build()
1285 .map_err(|err| format!("tokio runtime: {err}"))?;
1286
1287 let signal_runtime = runtime.clone();
1288 tokio_runtime.block_on(async move {
1289 spawn_lifecycle_signal_handler(signal_runtime).await;
1290 let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
1291 .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
1292 let grpc_backend = grpc_listener
1293 .local_addr()
1294 .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
1295 let grpc_server = RedDBGrpcServer::with_options(
1296 runtime.clone(),
1297 GrpcServerOptions {
1298 bind_addr: grpc_backend.to_string(),
1299 tls: None,
1300 },
1301 auth_store,
1302 );
1303 tokio::spawn(async move {
1304 if let Err(err) = grpc_server.serve_on(grpc_listener).await {
1305 tracing::error!(err = %err, "gRPC backend error");
1306 }
1307 });
1308
1309 let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
1310 .await
1311 .map_err(|err| format!("bind internal wire listener: {err}"))?;
1312 let wire_backend = wire_listener
1313 .local_addr()
1314 .map_err(|err| format!("inspect internal wire listener: {err}"))?;
1315 let wire_rt = Arc::new(runtime);
1316 tokio::spawn(async move {
1317 if let Err(err) =
1318 crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
1319 .await
1320 {
1321 tracing::error!(err = %err, "redwire backend error");
1322 }
1323 });
1324
1325 tracing::info!(
1326 bind = %router_bind_addr,
1327 cpus = rt_config.available_cpus,
1328 workers = worker_threads,
1329 "router bootstrapping"
1330 );
1331 serve_tcp_router(TcpProtocolRouterConfig {
1332 bind_addr: router_bind_addr,
1333 grpc_backend,
1334 http_backend,
1335 wire_backend,
1336 })
1337 .await
1338 .map_err(|err| err.to_string())
1339 })
1340}
1341
1342async fn spawn_wire_listeners(
1344 config: &ServerCommandConfig,
1345 runtime: &RedDBRuntime,
1346 readiness: &mut TransportReadiness,
1347) -> Result<(), String> {
1348 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1350 let wire_rt = Arc::new(runtime.clone());
1351 #[cfg(unix)]
1354 {
1355 if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1356 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1357 tokio::spawn(async move {
1358 if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1359 &wire_addr, wire_rt,
1360 )
1361 .await
1362 {
1363 tracing::error!(err = %e, "redwire unix listener error");
1364 }
1365 });
1366 return Ok(());
1367 }
1368 }
1369 match tokio::net::TcpListener::bind(&wire_addr).await {
1370 Ok(listener) => {
1371 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1372 tokio::spawn(async move {
1373 if let Err(e) =
1374 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1375 .await
1376 {
1377 tracing::error!(err = %e, "redwire listener error");
1378 }
1379 });
1380 }
1381 Err(err) => {
1382 let reason = format!("wire listener bind {wire_addr}: {err}");
1383 readiness.failed(
1384 "wire",
1385 &wire_addr,
1386 config.wire_bind_explicit,
1387 reason.clone(),
1388 );
1389 if config.wire_bind_explicit {
1390 tracing::error!(
1391 transport = "wire",
1392 bind = %wire_addr,
1393 error = %err,
1394 "fatal explicit bind failure"
1395 );
1396 return Err(format!("explicit {reason}"));
1397 }
1398 tracing::warn!(
1399 transport = "wire",
1400 bind = %wire_addr,
1401 error = %err,
1402 "non-fatal implicit bind failure; listener degraded"
1403 );
1404 }
1405 }
1406 }
1407
1408 if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1410 let tls_config = resolve_wire_tls_config(config);
1411 match tls_config {
1412 Ok(tls_cfg) => {
1413 let wire_rt = Arc::new(runtime.clone());
1414 tokio::spawn(async move {
1415 if let Err(e) =
1416 crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1417 .await
1418 {
1419 tracing::error!(err = %e, "redwire+tls listener error");
1420 }
1421 });
1422 }
1423 Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1424 }
1425 }
1426 Ok(())
1427}
1428
1429fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1436 if let Some(pg_addr) = config.pg_bind_addr.clone() {
1437 let rt = Arc::new(runtime.clone());
1438 tokio::spawn(async move {
1439 let cfg = crate::wire::PgWireConfig {
1440 bind_addr: pg_addr,
1441 ..Default::default()
1442 };
1443 if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1444 tracing::error!(err = %e, "pg wire listener error");
1445 }
1446 });
1447 }
1448}
1449
1450fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1464 use crate::utils::secret_file::expand_file_env;
1465
1466 for var in [
1470 "REDDB_GRPC_TLS_CERT",
1471 "REDDB_GRPC_TLS_KEY",
1472 "REDDB_GRPC_TLS_CLIENT_CA",
1473 ] {
1474 if let Err(err) = expand_file_env(var) {
1475 tracing::warn!(
1476 target: "reddb::secrets",
1477 env = %var,
1478 err = %err,
1479 "could not expand *_FILE companion for gRPC TLS"
1480 );
1481 }
1482 }
1483
1484 let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1485 (Some(cert), Some(key)) => {
1486 let cert_pem = std::fs::read(cert)
1487 .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1488 let key_pem =
1489 std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1490 (cert_pem, key_pem)
1491 }
1492 _ => {
1493 let dev = std::env::var("RED_GRPC_TLS_DEV")
1495 .ok()
1496 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1497 .unwrap_or(false);
1498 if !dev {
1499 return Err("gRPC TLS configured but no cert/key supplied — set \
1500 REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1501 RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1502 .to_string());
1503 }
1504 let dir = config
1505 .path
1506 .as_ref()
1507 .and_then(|p| p.parent())
1508 .map(PathBuf::from)
1509 .unwrap_or_else(|| PathBuf::from("."));
1510 let (cert_pem_str, key_pem_str) =
1511 crate::wire::tls::generate_self_signed_cert("localhost")
1512 .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1513
1514 let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1519 tracing::warn!(
1520 target: "reddb::security",
1521 transport = "grpc",
1522 cert_sha256 = %fp,
1523 "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1524 DO NOT use in production"
1525 );
1526 let cert_path = dir.join("grpc-tls-cert.pem");
1528 let key_path = dir.join("grpc-tls-key.pem");
1529 if !cert_path.exists() || !key_path.exists() {
1530 let _ = std::fs::create_dir_all(&dir);
1531 std::fs::write(&cert_path, cert_pem_str.as_bytes())
1532 .map_err(|e| format!("write grpc dev cert: {e}"))?;
1533 std::fs::write(&key_path, key_pem_str.as_bytes())
1534 .map_err(|e| format!("write grpc dev key: {e}"))?;
1535 #[cfg(unix)]
1536 {
1537 use std::os::unix::fs::PermissionsExt;
1538 let _ =
1539 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1540 }
1541 }
1542 (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1543 }
1544 };
1545
1546 let client_ca_pem = match &config.grpc_tls_client_ca {
1547 Some(path) => Some(
1548 std::fs::read(path)
1549 .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1550 ),
1551 None => None,
1552 };
1553
1554 Ok(crate::GrpcTlsOptions {
1555 cert_pem,
1556 key_pem,
1557 client_ca_pem,
1558 })
1559}
1560
1561fn spawn_grpc_tls_listener_if_configured(
1565 config: &ServerCommandConfig,
1566 runtime: RedDBRuntime,
1567 auth_store: Arc<AuthStore>,
1568) {
1569 let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1570 return;
1571 };
1572 let tls_opts = match resolve_grpc_tls_options(config) {
1573 Ok(opts) => opts,
1574 Err(err) => {
1575 tracing::error!(
1576 target: "reddb::security",
1577 transport = "grpc",
1578 err = %err,
1579 "gRPC TLS config error; TLS listener will not start"
1580 );
1581 return;
1582 }
1583 };
1584 tokio::spawn(async move {
1585 let server = RedDBGrpcServer::with_options(
1586 runtime,
1587 GrpcServerOptions {
1588 bind_addr: tls_bind.clone(),
1589 tls: Some(tls_opts),
1590 },
1591 auth_store,
1592 );
1593 tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1594 if let Err(err) = server.serve().await {
1595 tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1596 }
1597 });
1598}
1599
1600fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1603 use sha2::{Digest, Sha256};
1604 let mut h = Sha256::new();
1605 h.update(pem);
1606 let d = h.finalize();
1607 let mut buf = String::with_capacity(64);
1608 for b in d.iter() {
1609 buf.push_str(&format!("{b:02x}"));
1610 }
1611 buf
1612}
1613
1614fn resolve_wire_tls_config(
1616 config: &ServerCommandConfig,
1617) -> Result<crate::wire::WireTlsConfig, String> {
1618 match (&config.wire_tls_cert, &config.wire_tls_key) {
1619 (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1620 cert_path: cert.clone(),
1621 key_path: key.clone(),
1622 }),
1623 _ => {
1624 let dir = config
1626 .path
1627 .as_ref()
1628 .and_then(|p| p.parent())
1629 .map(PathBuf::from)
1630 .unwrap_or_else(|| PathBuf::from("."));
1631 crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1632 }
1633 }
1634}
1635
1636#[inline(never)]
1637fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1638 let rt_config = detect_runtime_config();
1639 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1640 let cli_telemetry = config.telemetry.clone();
1641 let db_options = config.to_db_options()?;
1642 let mut transport_readiness = TransportReadiness::default();
1643
1644 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1645 .enable_all()
1646 .worker_threads(workers)
1647 .thread_stack_size(rt_config.stack_size)
1648 .build()
1649 .map_err(|err| format!("tokio runtime: {err}"))?;
1650
1651 let (runtime, _auth_store, _telemetry_guard) =
1655 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1656 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1657 let signal_runtime = runtime.clone();
1658 tokio_runtime.block_on(async move {
1659 spawn_lifecycle_signal_handler(signal_runtime).await;
1660 spawn_pg_listener(&config, &runtime);
1661 let wire_rt = Arc::new(runtime);
1662 let listener = tokio::net::TcpListener::bind(&wire_addr)
1663 .await
1664 .map_err(|err| {
1665 let reason = format!("wire listener bind {wire_addr}: {err}");
1666 transport_readiness.failed(
1667 "wire",
1668 &wire_addr,
1669 config.wire_bind_explicit,
1670 reason.clone(),
1671 );
1672 if config.wire_bind_explicit {
1673 format!("explicit {reason}")
1674 } else {
1675 reason
1676 }
1677 })?;
1678 transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1679 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1680 .await
1681 .map_err(|e| e.to_string())
1682 })
1683}
1684
1685#[inline(never)]
1686fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1687 let rt_config = detect_runtime_config();
1688 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1689 let cli_telemetry = config.telemetry.clone();
1690 let db_options = config.to_db_options()?;
1691
1692 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1693 .enable_all()
1694 .worker_threads(workers)
1695 .thread_stack_size(rt_config.stack_size)
1696 .build()
1697 .map_err(|err| format!("tokio runtime: {err}"))?;
1698
1699 let (runtime, _auth_store, _telemetry_guard) =
1700 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1701 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1702 let signal_runtime = runtime.clone();
1703 tokio_runtime.block_on(async move {
1704 spawn_lifecycle_signal_handler(signal_runtime).await;
1705 let cfg = crate::wire::PgWireConfig {
1706 bind_addr: pg_addr,
1707 ..Default::default()
1708 };
1709 crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1710 .await
1711 .map_err(|e| e.to_string())
1712 })
1713}
1714
1715#[inline(never)]
1716fn build_runtime_and_auth_store(
1717 db_options: RedDBOptions,
1718 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1719) -> Result<
1720 (
1721 RedDBRuntime,
1722 Arc<AuthStore>,
1723 Option<crate::telemetry::TelemetryGuard>,
1724 ),
1725 String,
1726> {
1727 build_runtime_with_telemetry(db_options, cli_telemetry)
1734}
1735
1736pub(crate) fn build_runtime_with_telemetry(
1746 db_options: RedDBOptions,
1747 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1748) -> Result<
1749 (
1750 RedDBRuntime,
1751 Arc<AuthStore>,
1752 Option<crate::telemetry::TelemetryGuard>,
1753 ),
1754 String,
1755> {
1756 let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1757 let msg = err.to_string();
1763 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1764 phase: "runtime_construction".to_string(),
1765 error: msg.clone(),
1766 }
1767 .emit_global();
1768 msg
1769 })?;
1770
1771 crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1776 let msg = err.to_string();
1777 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1778 phase: "lease_loop".to_string(),
1779 error: msg.clone(),
1780 }
1781 .emit_global();
1782 msg
1783 })?;
1784
1785 if let Some(data_path) = db_options.data_path.as_deref() {
1789 let watch_dir = data_path.parent().unwrap_or(data_path);
1790 crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1791 }
1792
1793 {
1797 let config_path = crate::runtime::config_overlay::config_file_path();
1798 let store = runtime.db().store();
1799 crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1800 }
1801
1802 let merged = merge_telemetry_with_config(
1805 cli_telemetry
1806 .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1807 &runtime,
1808 );
1809 let telemetry_guard = crate::telemetry::init(merged);
1810
1811 let auth_store =
1812 if db_options.auth.vault_enabled {
1813 let pager =
1814 runtime.db().store().pager().cloned().ok_or_else(|| {
1815 "vault requires a paged database (persistent mode)".to_string()
1816 })?;
1817 let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1818 .map_err(|err| err.to_string())?;
1819 Arc::new(store)
1820 } else {
1821 Arc::new(AuthStore::new(db_options.auth.clone()))
1822 };
1823 auth_store.bootstrap_from_env();
1824
1825 {
1827 let store = Arc::clone(&auth_store);
1828 std::thread::Builder::new()
1829 .name("reddb-session-purge".into())
1830 .spawn(move || loop {
1831 std::thread::sleep(std::time::Duration::from_secs(300));
1832 store.purge_expired_sessions();
1833 })
1834 .ok();
1835 }
1836
1837 Ok((runtime, auth_store, telemetry_guard))
1838}
1839
1840fn merge_telemetry_with_config(
1851 mut cli: crate::telemetry::TelemetryConfig,
1852 runtime: &RedDBRuntime,
1853) -> crate::telemetry::TelemetryConfig {
1854 use crate::storage::schema::Value;
1855
1856 let store = runtime.db().store();
1857
1858 if !cli.level_explicit {
1859 if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
1860 cli.level_filter = v.to_string();
1861 }
1862 }
1863 if !cli.format_explicit {
1864 if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
1865 if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
1866 cli.format = parsed;
1867 }
1868 }
1869 }
1870 if !cli.rotation_keep_days_explicit {
1871 match store.get_config("red.logging.keep_days") {
1872 Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
1873 cli.rotation_keep_days = n as u16
1874 }
1875 Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
1876 cli.rotation_keep_days = n as u16
1877 }
1878 Some(Value::Text(v)) => {
1879 if let Ok(n) = v.parse::<u16>() {
1880 cli.rotation_keep_days = n;
1881 }
1882 }
1883 _ => {}
1884 }
1885 }
1886 if !cli.file_prefix_explicit {
1887 if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
1888 if !v.is_empty() {
1889 cli.file_prefix = v.to_string();
1890 }
1891 }
1892 }
1893 if !cli.log_dir_explicit && !cli.log_file_disabled {
1896 if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
1897 if !v.is_empty() {
1898 cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
1899 }
1900 }
1901 }
1902
1903 cli
1904}
1905
/// Tests for the precedence rules in `merge_telemetry_with_config`:
/// explicit CLI values beat stored config, stored config beats defaults.
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};
    use std::path::PathBuf;

    /// In-memory runtime with an empty config store.
    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
    }

    /// Stores `value` as a string under `key` in the runtime's config tree.
    fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
        let json = crate::serde_json::Value::String(value.to_string());
        runtime.db().store().set_config_tree(key, &json);
    }

    /// CLI baseline: a default log dir and JSON format, nothing marked explicit.
    fn cli_base() -> TelemetryConfig {
        TelemetryConfig {
            log_dir: Some(PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");

        let merged = merge_telemetry_with_config(cli_base(), &runtime);

        assert_eq!(merged.log_dir, Some(PathBuf::from("/var/log/reddb")));
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: Some(PathBuf::from("/custom/dir")),
            log_dir_explicit: true,
            ..cli_base()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert_eq!(merged.log_dir, Some(PathBuf::from("/custom/dir")));
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: None,
            log_file_disabled: true,
            ..cli_base()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert!(
            merged.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.format", "pretty");

        let merged = merge_telemetry_with_config(cli_base(), &runtime);

        assert_eq!(merged.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.format", "pretty");
        let cli = TelemetryConfig {
            format: LogFormat::Json,
            format_explicit: true,
            ..cli_base()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert_eq!(merged.format, LogFormat::Json);
    }
}
1993
1994#[inline(never)]
1995fn build_http_server(
1996 runtime: RedDBRuntime,
1997 auth_store: Arc<AuthStore>,
1998 bind_addr: String,
1999) -> RedDBServer {
2000 build_http_server_with_transport_readiness(
2001 runtime,
2002 auth_store,
2003 bind_addr,
2004 TransportReadiness::default(),
2005 )
2006}
2007
2008fn apply_http_limits(
2014 server: RedDBServer,
2015 config: &ServerCommandConfig,
2016 runtime: &RedDBRuntime,
2017) -> RedDBServer {
2018 let store = runtime.db().store();
2019 let resolved =
2020 crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2021 .get_config(key)
2022 {
2023 Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2024 Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2025 Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2026 _ => None,
2027 });
2028 tracing::info!(
2029 target: "reddb::http_limits",
2030 max_handlers = resolved.max_handlers,
2031 handler_timeout_ms = resolved.handler_timeout_ms,
2032 retry_after_secs = resolved.retry_after_secs,
2033 "http_limits resolved"
2034 );
2035 server.with_http_limits(resolved)
2036}
2037
2038#[inline(never)]
2039fn build_http_server_with_transport_readiness(
2040 runtime: RedDBRuntime,
2041 auth_store: Arc<AuthStore>,
2042 bind_addr: String,
2043 transport_readiness: TransportReadiness,
2044) -> RedDBServer {
2045 RedDBServer::with_options(
2046 runtime,
2047 ServerOptions {
2048 bind_addr,
2049 transport_readiness,
2050 ..ServerOptions::default()
2051 },
2052 )
2053 .with_auth(auth_store)
2054}
2055
2056#[inline(never)]
2060fn build_admin_only_server(
2061 runtime: RedDBRuntime,
2062 auth_store: Arc<AuthStore>,
2063 bind_addr: String,
2064) -> RedDBServer {
2065 RedDBServer::with_options(
2066 runtime,
2067 ServerOptions {
2068 bind_addr,
2069 surface: crate::server::ServerSurface::AdminOnly,
2070 ..ServerOptions::default()
2071 },
2072 )
2073 .with_auth(auth_store)
2074}
2075
2076#[inline(never)]
2080fn build_metrics_only_server(
2081 runtime: RedDBRuntime,
2082 auth_store: Arc<AuthStore>,
2083 bind_addr: String,
2084) -> RedDBServer {
2085 RedDBServer::with_options(
2086 runtime,
2087 ServerOptions {
2088 bind_addr,
2089 surface: crate::server::ServerSurface::MetricsOnly,
2090 ..ServerOptions::default()
2091 },
2092 )
2093 .with_auth(auth_store)
2094}
2095
2096fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2100 if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2101 let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2102 let _ = server.serve_in_background();
2103 tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2104 }
2105 if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2106 let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2107 let _ = server.serve_in_background();
2108 tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2109 }
2110}
2111
2112#[inline(never)]
2113fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2114 let cli_telemetry = config.telemetry.clone();
2115 let mut transport_readiness = TransportReadiness::default();
2116 let Some(listener) = bind_listener_for_startup(
2117 &mut transport_readiness,
2118 "http",
2119 &bind_addr,
2120 config.http_bind_explicit,
2121 )?
2122 else {
2123 return Err(format!(
2124 "no HTTP listener started; implicit bind {} failed",
2125 bind_addr
2126 ));
2127 };
2128 let db_options = config.to_db_options()?;
2129 let (runtime, auth_store, _telemetry_guard) =
2130 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2131 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2132 spawn_admin_metrics_listeners(&runtime, &auth_store);
2133 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2134 let server = build_http_server_with_transport_readiness(
2135 runtime.clone(),
2136 auth_store,
2137 bind_addr.clone(),
2138 transport_readiness,
2139 );
2140 let server = apply_http_limits(server, &config, &runtime);
2141 tracing::info!(transport = "http", bind = %bind_addr, "listener online");
2142 server.serve_on(listener).map_err(|err| err.to_string())
2143}
2144
2145fn spawn_http_tls_listener(
2151 config: &ServerCommandConfig,
2152 runtime: &RedDBRuntime,
2153 auth_store: &Arc<AuthStore>,
2154) -> Result<(), String> {
2155 let Some(addr) = config.http_tls_bind_addr.clone() else {
2156 return Ok(());
2157 };
2158
2159 let tls_config = resolve_http_tls_config(config)?;
2160 let server_config = crate::server::tls::build_server_config(&tls_config)
2161 .map_err(|err| format!("HTTP TLS: {err}"))?;
2162
2163 let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
2164 let server = apply_http_limits(server, config, runtime);
2165 let _handle = server.serve_tls_in_background(server_config);
2166 tracing::info!(
2167 transport = "https",
2168 bind = %addr,
2169 mtls = %tls_config.client_ca_path.is_some(),
2170 "TLS listener online"
2171 );
2172 Ok(())
2173}
2174
2175fn resolve_http_tls_config(
2177 config: &ServerCommandConfig,
2178) -> Result<crate::server::tls::HttpTlsConfig, String> {
2179 match (&config.http_tls_cert, &config.http_tls_key) {
2180 (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
2181 cert_path: cert.clone(),
2182 key_path: key.clone(),
2183 client_ca_path: config.http_tls_client_ca.clone(),
2184 }),
2185 (None, None) => {
2186 let dir = config
2188 .path
2189 .as_ref()
2190 .and_then(|p| p.parent().map(std::path::PathBuf::from))
2191 .unwrap_or_else(|| std::path::PathBuf::from("."));
2192 let auto = crate::server::tls::auto_generate_dev_cert(&dir)
2193 .map_err(|err| format!("HTTP TLS dev: {err}"))?;
2194 Ok(crate::server::tls::HttpTlsConfig {
2195 cert_path: auto.cert_path,
2196 key_path: auto.key_path,
2197 client_ca_path: config.http_tls_client_ca.clone(),
2198 })
2199 }
2200 _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
2201 }
2202}
2203
2204#[inline(never)]
2205fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2206 let workers = config.workers;
2207 let cli_telemetry = config.telemetry.clone();
2208 let db_options = config.to_db_options()?;
2209 let rt_config = detect_runtime_config();
2210 let mut transport_readiness = TransportReadiness::default();
2211 let Some(grpc_listener) = bind_listener_for_startup(
2212 &mut transport_readiness,
2213 "grpc",
2214 &bind_addr,
2215 config.grpc_bind_explicit,
2216 )?
2217 else {
2218 return Err(format!(
2219 "no gRPC listener started; implicit bind {} failed",
2220 bind_addr
2221 ));
2222 };
2223
2224 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2225
2226 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2227 .enable_all()
2228 .worker_threads(worker_threads)
2229 .thread_stack_size(rt_config.stack_size)
2230 .build()
2231 .map_err(|err| format!("tokio runtime: {err}"))?;
2232
2233 let (runtime, auth_store, _telemetry_guard) =
2235 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2236 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2237 let signal_runtime = runtime.clone();
2238 tokio_runtime.block_on(async move {
2239 spawn_lifecycle_signal_handler(signal_runtime).await;
2240 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2242
2243 spawn_pg_listener(&config, &runtime);
2245
2246 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2250
2251 let server = RedDBGrpcServer::with_options(
2252 runtime,
2253 GrpcServerOptions {
2254 bind_addr: bind_addr.clone(),
2255 tls: None,
2256 },
2257 auth_store,
2258 );
2259
2260 tracing::info!(
2261 transport = "grpc",
2262 bind = %bind_addr,
2263 cpus = rt_config.available_cpus,
2264 workers = worker_threads,
2265 "listener online"
2266 );
2267 server
2268 .serve_on(grpc_listener)
2269 .await
2270 .map_err(|err| err.to_string())
2271 })
2272}
2273
2274#[inline(never)]
2275fn run_dual_server(
2276 config: ServerCommandConfig,
2277 grpc_bind_addr: String,
2278 http_bind_addr: String,
2279) -> Result<(), String> {
2280 let workers = config.workers;
2281 let cli_telemetry = config.telemetry.clone();
2282 let db_options = config.to_db_options()?;
2283 let rt_config = detect_runtime_config();
2284 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2285 let mut transport_readiness = TransportReadiness::default();
2286 let http_listener = bind_listener_for_startup(
2287 &mut transport_readiness,
2288 "http",
2289 &http_bind_addr,
2290 config.http_bind_explicit,
2291 )?;
2292 let grpc_listener = bind_listener_for_startup(
2293 &mut transport_readiness,
2294 "grpc",
2295 &grpc_bind_addr,
2296 config.grpc_bind_explicit,
2297 )?;
2298 if http_listener.is_none() && grpc_listener.is_none() {
2299 return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2300 }
2301 let (runtime, auth_store, _telemetry_guard) =
2302 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2303 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2304
2305 spawn_admin_metrics_listeners(&runtime, &auth_store);
2306 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2307
2308 let http_handle = if let Some(listener) = http_listener {
2309 let http_server = build_http_server_with_transport_readiness(
2310 runtime.clone(),
2311 auth_store.clone(),
2312 http_bind_addr.clone(),
2313 transport_readiness.clone(),
2314 );
2315 let http_server = apply_http_limits(http_server, &config, &runtime);
2316 Some(http_server.serve_in_background_on(listener))
2317 } else {
2318 None
2319 };
2320
2321 thread::sleep(Duration::from_millis(150));
2322 if let Some(handle) = http_handle.as_ref() {
2323 if handle.is_finished() {
2324 let handle = http_handle.unwrap();
2325 return match handle.join() {
2326 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2327 Ok(Err(err)) => Err(err.to_string()),
2328 Err(_) => Err("HTTP server thread panicked".to_string()),
2329 };
2330 }
2331 }
2332 if grpc_listener.is_none() {
2333 let Some(handle) = http_handle else {
2334 return Err("no listener started".to_string());
2335 };
2336 return match handle.join() {
2337 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2338 Ok(Err(err)) => Err(err.to_string()),
2339 Err(_) => Err("HTTP server thread panicked".to_string()),
2340 };
2341 }
2342 let grpc_listener = grpc_listener.expect("checked above");
2343
2344 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2345 .enable_all()
2346 .worker_threads(worker_threads)
2347 .thread_stack_size(rt_config.stack_size)
2348 .build()
2349 .map_err(|err| format!("tokio runtime: {err}"))?;
2350
2351 let signal_runtime = runtime.clone();
2352 tokio_runtime.block_on(async move {
2353 spawn_lifecycle_signal_handler(signal_runtime).await;
2354 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2356
2357 spawn_pg_listener(&config, &runtime);
2359
2360 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2362
2363 let server = RedDBGrpcServer::with_options(
2364 runtime,
2365 GrpcServerOptions {
2366 bind_addr: grpc_bind_addr.clone(),
2367 tls: None,
2368 },
2369 auth_store,
2370 );
2371
2372 tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2373 tracing::info!(
2374 transport = "grpc",
2375 bind = %grpc_bind_addr,
2376 cpus = rt_config.available_cpus,
2377 workers = worker_threads,
2378 "listener online"
2379 );
2380 server
2381 .serve_on(grpc_listener)
2382 .await
2383 .map_err(|err| err.to_string())
2384 })
2385}
2386
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `SystemdServiceConfig` fixture.
    ///
    /// Every test in this module installs the same binary (`/usr/local/bin/red`) under the
    /// `reddb` user and group, so only the service name, the data file and the three optional
    /// bind addresses are parameters.
    fn unit_fixture(
        service_name: &str,
        data_path: &str,
        router_bind_addr: Option<&str>,
        grpc_bind_addr: Option<&str>,
        http_bind_addr: Option<&str>,
    ) -> SystemdServiceConfig {
        SystemdServiceConfig {
            service_name: String::from(service_name),
            binary_path: PathBuf::from("/usr/local/bin/red"),
            run_user: String::from("reddb"),
            run_group: String::from("reddb"),
            data_path: PathBuf::from(data_path),
            router_bind_addr: router_bind_addr.map(String::from),
            grpc_bind_addr: grpc_bind_addr.map(String::from),
            http_bind_addr: http_bind_addr.map(String::from),
        }
    }

    /// Binds an OS-assigned loopback port and returns the listener together with its address.
    ///
    /// The caller must keep the returned listener alive for as long as the port is supposed to
    /// stay occupied.
    fn occupy_loopback_port() -> (TcpListener, String) {
        let listener = TcpListener::bind("127.0.0.1:0").expect("hold test port");
        let bound_addr = listener.local_addr().expect("held addr").to_string();
        (listener, bound_addr)
    }

    /// A gRPC-only config renders the full `ExecStart` line and grants write access to the
    /// directory that holds the data file.
    #[test]
    fn render_systemd_unit_contains_expected_execstart() {
        let rendered = render_systemd_unit(&unit_fixture(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            None,
        ));

        let expected_exec_start = "ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555";
        assert!(rendered.contains(expected_exec_start));
        assert!(rendered.contains("ReadWritePaths=/var/lib/reddb"));
    }

    /// The data directory is the parent of the data file, and the unit path is derived from
    /// the service name.
    #[test]
    fn systemd_service_config_derives_paths() {
        let fixture = unit_fixture(
            "reddb-api",
            "/srv/reddb/live/data.rdb",
            None,
            None,
            Some("127.0.0.1:5055"),
        );

        let expected_dir = PathBuf::from("/srv/reddb/live");
        let expected_unit = PathBuf::from("/etc/systemd/system/reddb-api.service");

        assert_eq!(fixture.data_dir(), expected_dir);
        assert_eq!(fixture.unit_path(), expected_unit);
    }

    /// With both gRPC and HTTP addresses configured, both flags appear in the rendered unit.
    #[test]
    fn render_systemd_unit_supports_dual_transport() {
        let rendered = render_systemd_unit(&unit_fixture(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            Some("0.0.0.0:5055"),
        ));

        for flag in ["--grpc-bind 0.0.0.0:5555", "--http-bind 0.0.0.0:5055"] {
            assert!(rendered.contains(flag));
        }
    }

    /// With only the router address configured, the unit carries `--bind` and neither of the
    /// per-transport flags.
    #[test]
    fn render_systemd_unit_supports_router_mode() {
        let rendered = render_systemd_unit(&unit_fixture(
            "reddb",
            "/var/lib/reddb/data.rdb",
            Some(DEFAULT_ROUTER_BIND_ADDR),
            None,
            None,
        ));

        assert!(rendered.contains("--bind 127.0.0.1:5050"));
        for absent_flag in ["--grpc-bind", "--http-bind"] {
            assert!(!rendered.contains(absent_flag));
        }
    }

    /// Binding an already-occupied address with the explicit flag set returns an error and
    /// records the failure as explicit.
    #[test]
    fn explicit_bind_collision_is_fatal() {
        // `_occupied` keeps the port taken until the end of the test.
        let (_occupied, addr) = occupy_loopback_port();
        let mut readiness = TransportReadiness::default();

        let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();
        assert!(error.contains("explicit http listener bind"));

        assert_eq!(readiness.active.len(), 0);
        assert_eq!(readiness.failed.len(), 1);
        let failure = &readiness.failed[0];
        assert!(failure.explicit);
        assert_eq!(failure.bind_addr, addr);
    }

    /// Binding an already-occupied address without the explicit flag succeeds with no
    /// listener and records the failure as non-explicit.
    #[test]
    fn implicit_bind_collision_degrades() {
        // `_occupied` keeps the port taken until the end of the test.
        let (_occupied, addr) = occupy_loopback_port();
        let mut readiness = TransportReadiness::default();

        let listener =
            bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");
        assert!(listener.is_none());

        assert_eq!(readiness.active.len(), 0);
        assert_eq!(readiness.failed.len(), 1);
        let failure = &readiness.failed[0];
        assert!(!failure.explicit);
        assert_eq!(failure.bind_addr, addr);
    }
}