1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13 GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Loopback address `127.0.0.1:5050`.
///
/// NOTE(review): by its name this is the default bind address for the router
/// listener; the consumer is outside this section — confirm. It is the same
/// value `ServerTransport::Wire.default_bind_addr()` returns.
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
18pub fn detect_runtime_config() -> RuntimeConfig {
20 let cpus = thread::available_parallelism()
21 .map(|n| n.get())
22 .unwrap_or(1);
23
24 let suggested_workers = cpus.saturating_sub(1).max(1);
26
27 RuntimeConfig {
28 available_cpus: cpus,
29 suggested_workers,
30 stack_size: 8 * 1024 * 1024, }
32}
33
/// Host-derived sizing hints, as produced by `detect_runtime_config`.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// CPU count from `std::thread::available_parallelism` (1 when the query fails).
    pub available_cpus: usize,
    /// `available_cpus - 1`, clamped to a minimum of 1.
    pub suggested_workers: usize,
    /// Stack size in bytes; `detect_runtime_config` sets this to 8 MiB.
    pub stack_size: usize,
}
40
/// Transports the server can expose.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    Grpc,
    Http,
    Wire,
}

impl ServerTransport {
    /// Single lookup table: `(display label, default bind address)` per transport.
    const fn profile(self) -> (&'static str, &'static str) {
        match self {
            ServerTransport::Grpc => ("gRPC", "127.0.0.1:5555"),
            ServerTransport::Http => ("HTTP", "127.0.0.1:5055"),
            ServerTransport::Wire => ("wire", "127.0.0.1:5050"),
        }
    }

    /// Display label of the transport: `"gRPC"`, `"HTTP"` or `"wire"`.
    pub const fn as_str(self) -> &'static str {
        self.profile().0
    }

    /// Loopback `host:port` used when no bind address was given for the transport.
    pub const fn default_bind_addr(self) -> &'static str {
        self.profile().1
    }
}
65
/// Settings for one server invocation: storage location, per-transport bind
/// addresses (plain and TLS), replication role and auth switches.
///
/// NOTE(review): fields that this section never reads (the TLS bind/cert/key
/// paths, the `*_explicit` flags, `workers`, `telemetry`, `http_limits_cli`)
/// are described by name only — confirm their semantics at the call sites.
#[derive(Debug, Clone)]
pub struct ServerCommandConfig {
    /// Database file. `Some` builds `RedDBOptions::persistent(path)`, `None`
    /// builds `RedDBOptions::in_memory()` (see `to_db_options`).
    pub path: Option<PathBuf>,
    /// When set, `run_configured_servers` hands everything to the routed
    /// server and `enabled_transports` reports all three transports.
    pub router_bind_addr: Option<String>,
    pub router_bind_explicit: bool,
    // --- gRPC listener ---
    pub grpc_bind_addr: Option<String>,
    pub grpc_bind_explicit: bool,
    pub grpc_tls_bind_addr: Option<String>,
    pub grpc_tls_cert: Option<PathBuf>,
    pub grpc_tls_key: Option<PathBuf>,
    pub grpc_tls_client_ca: Option<PathBuf>,
    // --- HTTP listener ---
    pub http_bind_addr: Option<String>,
    pub http_bind_explicit: bool,
    pub http_tls_bind_addr: Option<String>,
    pub http_tls_cert: Option<PathBuf>,
    pub http_tls_key: Option<PathBuf>,
    pub http_tls_client_ca: Option<PathBuf>,
    // --- wire-protocol listener ---
    pub wire_bind_addr: Option<String>,
    pub wire_bind_explicit: bool,
    pub wire_tls_bind_addr: Option<String>,
    pub wire_tls_cert: Option<PathBuf>,
    pub wire_tls_key: Option<PathBuf>,
    /// Used by `run_configured_servers` only when no router, gRPC, HTTP or
    /// wire address is configured (it then starts the pg-only server).
    pub pg_bind_addr: Option<String>,
    /// Copied verbatim into `RedDBOptions::create_if_missing`.
    pub create_if_missing: bool,
    /// Requests read-only mode; `to_db_options` also turns read-only on from
    /// `RED_READONLY` / `REDDB_READONLY` or the persisted runtime flag.
    pub read_only: bool,
    /// `"primary"` or `"replica"`; any other value selects standalone replication.
    pub role: String,
    /// Primary address for the replica role; defaults to `http://127.0.0.1:5555`.
    pub primary_addr: Option<String>,
    /// Sets `options.auth.vault_enabled` when true.
    pub vault: bool,
    /// Disables auth, required auth and the vault, and records `NO_AUTH_META`.
    pub no_auth: bool,
    pub workers: Option<usize>,
    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
    pub http_limits_cli: crate::server::HttpLimitsCliInput,
}
146
/// A listener that was bound successfully.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerState {
    pub transport: String,
    pub bind_addr: String,
    pub explicit: bool,
}

/// A listener that could not be bound, with the reason text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerFailure {
    pub transport: String,
    pub bind_addr: String,
    pub explicit: bool,
    pub reason: String,
}

/// Running tally of listeners that came up (`active`) and those that did not (`failed`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TransportReadiness {
    pub active: Vec<TransportListenerState>,
    pub failed: Vec<TransportListenerFailure>,
}

impl TransportReadiness {
    /// Appends a successful bind to `active`.
    fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
        let entry = TransportListenerState {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
        };
        self.active.push(entry);
    }

    /// Appends a failed bind, together with its `reason`, to `failed`.
    fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
        let entry = TransportListenerFailure {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
            reason,
        };
        self.failed.push(entry);
    }
}
186
/// Inputs for rendering and installing the systemd unit.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix (see [`Self::unit_path`]).
    pub service_name: String,
    /// Server binary placed first on the `ExecStart=` line.
    pub binary_path: PathBuf,
    /// Account written to `User=`.
    pub run_user: String,
    /// Group written to `Group=`.
    pub run_group: String,
    /// Database file passed as `--path`; its parent is the service data directory.
    pub data_path: PathBuf,
    /// Emitted as `--bind` when set.
    pub router_bind_addr: Option<String>,
    /// Emitted as `--grpc-bind` when set and no router address is present.
    pub grpc_bind_addr: Option<String>,
    /// Emitted as `--http-bind` when set.
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory containing `data_path`, or `"."` when there is no usable parent.
    ///
    /// `Path::parent` returns `Some("")` for a bare relative file name such as
    /// `data.rdb`. That empty path is treated like a missing parent so callers
    /// never receive an empty directory (it would render `WorkingDirectory=`
    /// with no value and hand `install -d` an empty argument).
    pub fn data_dir(&self) -> PathBuf {
        match self.data_path.parent() {
            Some(parent) if !parent.as_os_str().is_empty() => parent.to_path_buf(),
            _ => PathBuf::from("."),
        }
    }

    /// Location of the unit file: `/etc/systemd/system/<service_name>.service`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
211
212pub fn default_telemetry_for_path(
217 path: Option<&std::path::Path>,
218) -> crate::telemetry::TelemetryConfig {
219 let log_dir = match path {
220 Some(p) => p
221 .parent()
222 .map(|parent| parent.join("logs"))
223 .or_else(|| Some(std::path::PathBuf::from("./logs"))),
224 None => None, };
226 crate::telemetry::TelemetryConfig {
227 log_dir,
228 file_prefix: "reddb.log".to_string(),
229 level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
230 format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
231 crate::telemetry::LogFormat::Pretty
232 } else {
233 crate::telemetry::LogFormat::Json
234 },
235 rotation_keep_days: 14,
236 service_name: "reddb",
237 level_explicit: false,
239 format_explicit: false,
240 rotation_keep_days_explicit: false,
241 file_prefix_explicit: false,
242 log_dir_explicit: false,
243 log_file_disabled: false,
244 }
245}
246
// Metadata keys: `apply_backup_config` writes them into `RedDBOptions::metadata`
// and `spawn_backup_tasks_if_configured` reads the interval / lag ones back.

/// Checkpoint interval in seconds, stored as a decimal string.
const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
/// WAL flush / archive interval in seconds, stored as a decimal string.
const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
/// Backend kind marker; `apply_backup_config` always stores `"s3"`.
const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
/// Archive-lag pause threshold in seconds; a missing or unparsable value is read as 0.
const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";

/// Metadata key set to `"true"` by `to_db_options` when `no_auth` is requested;
/// `no_auth_active` checks it.
pub(crate) const NO_AUTH_META: &str = "red.boot.no_auth";
269
270pub(crate) fn no_auth_active(options: &RedDBOptions) -> bool {
273 options
274 .metadata
275 .get(NO_AUTH_META)
276 .map(|v| v == "true")
277 .unwrap_or(false)
278}
279
/// Operator-facing warning text for running with auth disabled.
/// NOTE(review): the emitting call site is outside this section.
const NO_AUTH_WARNING: &str =
    "⚠ auth disabled (--no-auth) — anonymous access, do NOT use in production";
286
287impl ServerCommandConfig {
288 fn to_db_options(&self) -> Result<RedDBOptions, String> {
289 let mut options = match &self.path {
290 Some(path) => RedDBOptions::persistent(path),
291 None => RedDBOptions::in_memory(),
292 };
293
294 options.mode = StorageMode::Persistent;
295 options.create_if_missing = self.create_if_missing;
296 options.read_only = self.read_only
303 || env_nonempty("RED_READONLY")
304 .or_else(|| env_nonempty("REDDB_READONLY"))
305 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
306 .unwrap_or(false)
307 || self.path.as_ref().is_some_and(|data_path| {
308 crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
309 data_path,
310 ))
311 .unwrap_or(false)
312 });
313
314 options.replication = match self.role.as_str() {
315 "primary" => ReplicationConfig::primary(),
316 "replica" => {
317 let primary_addr = self
318 .primary_addr
319 .clone()
320 .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
321 ReplicationConfig::replica(primary_addr)
328 }
329 _ => ReplicationConfig::standalone(),
330 };
331
332 if self.vault {
333 options.auth.vault_enabled = true;
334 }
335
336 if self.no_auth {
347 options.auth.enabled = false;
348 options.auth.require_auth = false;
349 options.auth.vault_enabled = false;
350 options
351 .metadata
352 .insert(NO_AUTH_META.to_string(), "true".to_string());
353 }
354
355 match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
360 Err(msg) => {
361 return Err(format!("backup bootstrap: {msg}"));
362 }
363 Ok(Some(cfg)) => {
364 apply_backup_config(&mut options, &cfg);
365 }
366 Ok(None) => {
367 configure_remote_backend_from_env(&mut options);
368 }
369 }
370
371 Ok(options)
372 }
373
374 pub fn enabled_transports(&self) -> Vec<ServerTransport> {
375 let mut transports = Vec::with_capacity(3);
376 if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
377 transports.push(ServerTransport::Grpc);
378 }
379 if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
380 transports.push(ServerTransport::Http);
381 }
382 if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
383 transports.push(ServerTransport::Wire);
384 }
385 transports
386 }
387}
388
/// Looks up `name` through `crate::utils::env_with_file_fallback`.
///
/// NOTE(review): the names suggest that a `<name>_FILE` companion is consulted
/// and that empty values come back as `None`; both behaviors live in
/// `crate::utils` and are not visible here — confirm there.
fn env_nonempty(name: &str) -> Option<String> {
    crate::utils::env_with_file_fallback(name)
}
396
397fn env_truthy(name: &str) -> bool {
398 env_nonempty(name)
399 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
400 .unwrap_or(false)
401}
402
403fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
409 let endpoint_host = endpoint_host(&cfg.endpoint);
410
411 options.metadata.insert(
412 BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
413 cfg.checkpoint_interval_secs.to_string(),
414 );
415 options.metadata.insert(
416 BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
417 cfg.wal_flush_interval_secs.to_string(),
418 );
419 options
420 .metadata
421 .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
422 options.metadata.insert(
423 BACKUP_PAUSE_ON_LAG_META.to_string(),
424 cfg.pause_on_lag_secs.to_string(),
425 );
426
427 #[cfg(feature = "backend-s3")]
428 {
429 let s3_cfg = crate::storage::backend::S3Config {
430 endpoint: cfg.endpoint.clone(),
431 bucket: cfg.bucket.clone(),
432 key_prefix: cfg.prefix.clone(),
433 access_key: cfg.access_key_id.clone(),
434 secret_key: cfg.secret_access_key.clone(),
435 region: cfg.region.clone(),
436 path_style: true,
437 };
438 let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
439 options.remote_backend = Some(backend.clone());
440 options.remote_backend_atomic = Some(backend);
441 let trimmed = cfg.prefix.trim_end_matches('/');
446 options.remote_key = Some(format!("{}/data.rdb", trimmed));
447
448 tracing::info!(
449 backend = "s3",
450 endpoint = %endpoint_host,
451 bucket = %cfg.bucket,
452 prefix = %cfg.prefix,
453 checkpoint_interval_secs = cfg.checkpoint_interval_secs,
454 wal_flush_interval_secs = cfg.wal_flush_interval_secs,
455 "backup backend configured from REDDB_BACKUP_* env"
456 );
457 }
458
459 #[cfg(not(feature = "backend-s3"))]
460 {
461 tracing::warn!(
462 backend = "s3",
463 endpoint = %endpoint_host,
464 bucket = %cfg.bucket,
465 prefix = %cfg.prefix,
466 "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
467 backend wiring skipped (archiver/checkpointer also disabled)"
468 );
469 }
470}
471
/// Extracts the `host[:port]` portion of an endpoint URL for logging.
///
/// Strips, in order: the `scheme://` prefix, everything from the first `/`,
/// `?` or `#`, and any `userinfo@` prefix. Stripping userinfo keeps
/// credentials embedded in the URL out of log output. An input that has none
/// of these parts is returned unchanged.
fn endpoint_host(endpoint: &str) -> &str {
    let after_scheme = endpoint
        .split_once("://")
        .map_or(endpoint, |(_, rest)| rest);

    // The authority ends at the first path, query or fragment delimiter.
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(after_scheme);

    // Use the last '@' so a password that itself contains '@' is dropped whole.
    authority
        .rsplit_once('@')
        .map_or(authority, |(_, host)| host)
}
479
480fn spawn_backup_tasks_if_configured(
486 options: &RedDBOptions,
487 runtime: &RedDBRuntime,
488) -> Option<BackupTasksHandle> {
489 let checkpoint_secs: u64 = options
490 .metadata
491 .get(BACKUP_INTERVAL_META_CHECKPOINT)?
492 .parse()
493 .ok()?;
494 let wal_secs: u64 = options
495 .metadata
496 .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
497 .parse()
498 .ok()?;
499 let pause_on_lag_secs: u64 = options
502 .metadata
503 .get(BACKUP_PAUSE_ON_LAG_META)
504 .and_then(|raw| raw.parse().ok())
505 .unwrap_or(0);
506 options.remote_backend.as_ref()?;
507
508 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
509
510 if pause_on_lag_secs > 0 {
515 let now_ms = std::time::SystemTime::now()
516 .duration_since(std::time::UNIX_EPOCH)
517 .map(|d| d.as_millis() as u64)
518 .unwrap_or(0);
519 runtime
520 .write_gate()
521 .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
522 tracing::info!(
523 pause_on_lag_secs,
524 "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
525 );
526 }
527
528 let checkpoint_handle = {
529 let stop = Arc::clone(&stop);
530 let runtime = runtime.clone();
531 let interval = Duration::from_secs(checkpoint_secs);
532 thread::Builder::new()
533 .name("red-checkpointer".into())
534 .spawn(move || {
535 periodic_loop(stop, interval, move || {
536 if let Err(err) = runtime.checkpoint() {
537 tracing::warn!(error = %err, "periodic checkpoint failed");
538 }
539 })
540 })
541 .ok()
542 };
543
544 let archiver_handle = {
545 let stop = Arc::clone(&stop);
546 let runtime = runtime.clone();
547 let interval = Duration::from_secs(wal_secs);
548 let lag_enabled = pause_on_lag_secs > 0;
549 thread::Builder::new()
550 .name("red-wal-archiver".into())
551 .spawn(move || {
552 periodic_loop(stop, interval, move || match runtime.trigger_backup() {
553 Ok(_) if lag_enabled => {
554 let now_ms = std::time::SystemTime::now()
555 .duration_since(std::time::UNIX_EPOCH)
556 .map(|d| d.as_millis() as u64)
557 .unwrap_or(0);
558 runtime.write_gate().record_archive_success(now_ms);
559 runtime.write_gate().evaluate_archive_lag(now_ms);
563 }
564 Ok(_) => {}
565 Err(err) => {
566 tracing::warn!(error = %err, "periodic WAL archive/backup failed");
567 }
568 })
569 })
570 .ok()
571 };
572
573 let lag_monitor_handle = if pause_on_lag_secs > 0 {
578 let stop = Arc::clone(&stop);
579 let runtime = runtime.clone();
580 let interval = Duration::from_secs(5);
584 thread::Builder::new()
585 .name("red-archive-lag-monitor".into())
586 .spawn(move || {
587 periodic_loop(stop, interval, move || {
588 let now_ms = std::time::SystemTime::now()
589 .duration_since(std::time::UNIX_EPOCH)
590 .map(|d| d.as_millis() as u64)
591 .unwrap_or(0);
592 let was_paused = runtime.write_gate().is_auto_paused();
593 let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
594 if now_paused && !was_paused {
595 tracing::warn!(
596 pause_on_lag_secs,
597 last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
598 "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
599 );
600 } else if !now_paused && was_paused {
601 tracing::info!(
602 "WAL archive caught up — exiting graceful read-only mode (issue #519)"
603 );
604 }
605 })
606 })
607 .ok()
608 } else {
609 None
610 };
611
612 tracing::info!(
613 checkpoint_interval_secs = checkpoint_secs,
614 wal_flush_interval_secs = wal_secs,
615 "backup tasks spawned (checkpointer + WAL archiver)"
616 );
617
618 Some(BackupTasksHandle {
619 stop,
620 _checkpoint_handle: checkpoint_handle,
621 _archiver_handle: archiver_handle,
622 _lag_monitor_handle: lag_monitor_handle,
623 })
624}
625
/// Keeps the backup worker threads' shared stop flag and their join handles.
///
/// Dropping the handle raises the stop flag. The threads are not joined.
pub struct BackupTasksHandle {
    /// Shared with every worker's `periodic_loop`.
    stop: Arc<std::sync::atomic::AtomicBool>,
    _checkpoint_handle: Option<thread::JoinHandle<()>>,
    _archiver_handle: Option<thread::JoinHandle<()>>,
    _lag_monitor_handle: Option<thread::JoinHandle<()>>,
}

impl Drop for BackupTasksHandle {
    /// Signals the workers to stop, paired with the `Acquire` load in `periodic_loop`.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        self.stop.store(true, Ordering::Release);
    }
}
642
/// Runs `tick` roughly every `interval` until `stop` becomes true.
///
/// The loop sleeps in 1-second slices so a stop request is noticed within
/// about a second regardless of `interval`. The flag is checked again right
/// after each sleep, so no tick is started once a stop has been requested
/// while the thread was asleep. An `interval` shorter than one second ticks
/// once per slice.
fn periodic_loop<F: FnMut()>(
    stop: Arc<std::sync::atomic::AtomicBool>,
    interval: Duration,
    mut tick: F,
) {
    use std::sync::atomic::Ordering;

    let wake = Duration::from_secs(1);
    let mut elapsed = Duration::ZERO;
    while !stop.load(Ordering::Acquire) {
        thread::sleep(wake);
        // A stop may have arrived during the sleep; do not begin new work after it.
        if stop.load(Ordering::Acquire) {
            break;
        }
        elapsed += wake;
        if elapsed >= interval {
            tick();
            elapsed = Duration::ZERO;
        }
    }
}
661
662fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
663 let backend = env_nonempty("RED_BACKEND")
669 .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
670 .unwrap_or_else(|| "none".to_string())
671 .to_ascii_lowercase();
672
673 match backend.as_str() {
674 "s3" | "minio" | "r2" => {
679 #[cfg(feature = "backend-s3")]
680 {
681 if let Some(config) = s3_config_from_env() {
682 let remote_key = env_nonempty("RED_REMOTE_KEY")
683 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
684 .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
685 let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
686 options.remote_backend = Some(backend.clone());
687 options.remote_backend_atomic = Some(backend);
688 options.remote_key = Some(remote_key);
689 }
690 }
691 #[cfg(not(feature = "backend-s3"))]
692 {
693 tracing::warn!(
694 backend = %backend,
695 "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
696 );
697 }
698 }
699 "fs" | "local" => {
704 let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
705 let remote_key = match (
706 base_path,
707 env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
708 ) {
709 (Some(base), Some(rel)) => Some(format!(
710 "{}/{}",
711 base.trim_end_matches('/'),
712 rel.trim_start_matches('/')
713 )),
714 (Some(base), None) => Some(format!(
715 "{}/clusters/dev/data.rdb",
716 base.trim_end_matches('/')
717 )),
718 (None, Some(rel)) => Some(rel),
719 (None, None) => None,
720 };
721 if let Some(key) = remote_key {
722 let backend = Arc::new(crate::storage::backend::LocalBackend);
723 options.remote_backend = Some(backend.clone());
724 options.remote_backend_atomic = Some(backend);
725 options.remote_key = Some(key);
726 }
727 }
728 "http" => {
733 let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
734 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
735 {
736 Some(u) => u,
737 None => {
738 tracing::warn!(
739 "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
740 );
741 return;
742 }
743 };
744 let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
745 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
746 .unwrap_or_default();
747 let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
748 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
749 {
750 std::fs::read_to_string(&path)
751 .ok()
752 .map(|s| s.trim().to_string())
753 .filter(|s| !s.is_empty())
754 } else {
755 env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
756 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
757 };
758
759 let mut config =
760 crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
761 if let Some(auth) = auth_header {
762 config = config.with_auth_header(auth);
763 }
764 let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
765 || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
766 || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
767 config = config.with_conditional_writes(conditional_writes);
768 if conditional_writes {
773 match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
774 Ok(atomic) => {
775 let atomic_arc = Arc::new(atomic);
776 options.remote_backend = Some(atomic_arc.clone());
777 options.remote_backend_atomic = Some(atomic_arc);
778 }
779 Err(err) => {
780 tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
781 options.remote_backend =
782 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
783 }
784 }
785 } else {
786 options.remote_backend =
787 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
788 }
789 options.remote_key = env_nonempty("RED_REMOTE_KEY")
790 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
791 .or_else(|| Some("clusters/dev/data.rdb".to_string()));
792 }
793 "none" | "" => {}
796 other => {
797 tracing::warn!(
798 backend = %other,
799 "unknown RED_BACKEND value — supported: s3 | fs | http | none"
800 );
801 }
802 }
803}
804
/// Reads `RED_S3_<suffix>`, falling back to the legacy `REDDB_S3_<suffix>`.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    ["RED_S3_", "REDDB_S3_"]
        .iter()
        .find_map(|prefix| env_nonempty(&format!("{prefix}{suffix}")))
}
814
/// Resolves an S3 secret.
///
/// When `RED_S3_<suffix>_FILE` (or the legacy `REDDB_S3_<suffix>_FILE`) names
/// a file, the secret is that file's trimmed contents. An unreadable or
/// empty file yields `None` and the plain variable is not consulted. Without
/// a file variable the lookup falls through to `env_s3(suffix)`.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let secret_file = env_nonempty(&format!("RED_S3_{suffix}_FILE"))
        .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}_FILE")));

    match secret_file {
        Some(path) => {
            let contents = std::fs::read_to_string(&path).ok()?;
            let secret = contents.trim();
            if secret.is_empty() {
                None
            } else {
                Some(secret.to_string())
            }
        }
        None => env_s3(suffix),
    }
}
832
/// Assembles an `S3Config` from `RED_S3_*` / `REDDB_S3_*` variables.
///
/// Returns `None` unless endpoint, bucket, access key and secret key are all
/// present. Defaults: region `us-east-1`, empty key prefix (`KEY_PREFIX`,
/// then `PREFIX`), and path-style addressing on unless `PATH_STYLE` is set to
/// something other than `1` / `true` / `yes` / `on`.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    // Required settings, read in the same order as the struct fields are named.
    let endpoint = env_s3("ENDPOINT")?;
    let bucket = env_s3("BUCKET")?;
    let access_key = env_s3_secret("ACCESS_KEY")?;
    let secret_key = env_s3_secret("SECRET_KEY")?;

    // Optional settings.
    let region = match env_s3("REGION") {
        Some(region) => region,
        None => String::from("us-east-1"),
    };
    let key_prefix = match env_s3("KEY_PREFIX").or_else(|| env_s3("PREFIX")) {
        Some(prefix) => prefix,
        None => String::new(),
    };
    let path_style = match env_s3("PATH_STYLE") {
        Some(raw) => matches!(
            raw.to_ascii_lowercase().as_str(),
            "1" | "true" | "yes" | "on"
        ),
        None => true,
    };

    Some(crate::storage::backend::S3Config {
        endpoint,
        bucket,
        key_prefix,
        access_key,
        secret_key,
        region,
        path_style,
    })
}
856
857pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
858 let data_dir = config.data_dir();
859 let exec_start = render_systemd_exec_start(config);
860 format!(
861 "[Unit]\n\
862Description=RedDB unified database service\n\
863After=network-online.target\n\
864Wants=network-online.target\n\
865\n\
866[Service]\n\
867Type=simple\n\
868User={user}\n\
869Group={group}\n\
870WorkingDirectory={workdir}\n\
871ExecStart={exec_start}\n\
872Restart=always\n\
873RestartSec=2\n\
874LimitSTACK=16M\n\
875NoNewPrivileges=true\n\
876PrivateTmp=true\n\
877ProtectSystem=strict\n\
878ProtectHome=true\n\
879ProtectControlGroups=true\n\
880ProtectKernelTunables=true\n\
881ProtectKernelModules=true\n\
882RestrictNamespaces=true\n\
883LockPersonality=true\n\
884MemoryDenyWriteExecute=true\n\
885ReadWritePaths={workdir}\n\
886\n\
887[Install]\n\
888WantedBy=multi-user.target\n",
889 user = config.run_user,
890 group = config.run_group,
891 workdir = data_dir.display(),
892 exec_start = exec_start,
893 )
894}
895
896#[cfg(target_os = "linux")]
905pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
906 ensure_root()?;
907 ensure_command_available("systemctl")?;
908 ensure_command_available("getent")?;
909 ensure_command_available("groupadd")?;
910 ensure_command_available("useradd")?;
911 ensure_command_available("install")?;
912 ensure_executable(&config.binary_path)?;
913
914 if !command_success("getent", ["group", config.run_group.as_str()])? {
915 run_command("groupadd", ["--system", config.run_group.as_str()])?;
916 }
917
918 if !command_success("id", ["-u", config.run_user.as_str()])? {
919 let data_dir = config.data_dir();
920 run_command(
921 "useradd",
922 [
923 "--system",
924 "--gid",
925 config.run_group.as_str(),
926 "--home-dir",
927 data_dir.to_string_lossy().as_ref(),
928 "--shell",
929 "/usr/sbin/nologin",
930 config.run_user.as_str(),
931 ],
932 )?;
933 }
934
935 let data_dir = config.data_dir();
936 run_command(
937 "install",
938 [
939 "-d",
940 "-o",
941 config.run_user.as_str(),
942 "-g",
943 config.run_group.as_str(),
944 "-m",
945 "0750",
946 data_dir.to_string_lossy().as_ref(),
947 ],
948 )?;
949
950 std::fs::write(config.unit_path(), render_systemd_unit(config))
951 .map_err(|err| format!("failed to write systemd unit: {err}"))?;
952
953 run_command("systemctl", ["daemon-reload"])?;
954 run_command(
955 "systemctl",
956 [
957 "enable",
958 "--now",
959 format!("{}.service", config.service_name).as_str(),
960 ],
961 )?;
962
963 Ok(())
964}
965
/// Non-Linux stand-in: always fails with a message pointing the operator at
/// the platform's own service manager.
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    let message = "systemd install is Linux-only — use sc.exe (Windows) or \
                   launchd (macOS) to install the service manually using the \
                   unit printed by `red service print-unit`";
    Err(String::from(message))
}
977
/// Succeeds only when `id -u` prints `0`.
///
/// # Errors
/// Fails when `id` cannot be run, exits unsuccessfully, or reports a
/// non-zero uid.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let id = match Command::new("id").arg("-u").output() {
        Ok(output) => output,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !id.status.success() {
        return Err(String::from("failed to determine current uid"));
    }

    match String::from_utf8_lossy(&id.stdout).trim() {
        "0" => Ok(()),
        _ => Err(String::from("run this command as root (sudo)")),
    }
}
993
/// Checks that `command` resolves in a login shell, using
/// `sh -lc "command -v <command>"`.
///
/// `command` is interpolated into the shell line verbatim, so callers must
/// pass trusted names only.
///
/// # Errors
/// Fails when the shell cannot be started or the lookup exits unsuccessfully.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    let probe = format!("command -v {command} >/dev/null 2>&1");
    match Command::new("sh").arg("-lc").arg(&probe).status() {
        Ok(status) if status.success() => Ok(()),
        Ok(_) => Err(format!("required command not found: {command}")),
        Err(err) => Err(format!("failed to check command '{command}': {err}")),
    }
}
1006
/// Verifies that `path` is an existing regular file with an execute bit set.
///
/// The regular-file check runs on every platform, so a directory — which
/// normally carries execute bits — is rejected instead of being accepted as
/// a binary. Symlinks are followed because `std::fs::metadata` is used.
///
/// # Errors
/// * `binary not found '<path>': <io error>` — metadata could not be read;
/// * `binary is not a file: <path>` — the path is not a regular file;
/// * `binary is not executable: <path>` — no execute bit is set (unix only).
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;

    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }

    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        // Any of the owner / group / other execute bits is enough.
        if metadata.permissions().mode() & 0o111 == 0 {
            return Err(format!("binary is not executable: {}", path.display()));
        }
    }

    Ok(())
}
1026
/// Runs `program` with `args` and reports whether it exited successfully.
///
/// The child inherits stdio. A non-zero exit is `Ok(false)`; only a failure
/// to start the program is an `Err`.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    match Command::new(program).args(args).status() {
        Ok(status) => Ok(status.success()),
        Err(err) => Err(format!("failed to run {program}: {err}")),
    }
}
1035
/// Runs `program` with `args`, capturing its output, and fails unless it
/// exits successfully.
///
/// # Errors
/// * `failed to run <program>: <io error>` — the program could not be started;
/// * `<program> failed: <detail>` — it exited unsuccessfully. `<detail>` is
///   the trimmed stderr, else the trimmed stdout, else the exit status.
///   `ExitStatus` already displays as `exit status: N` (or `signal: N`), so
///   it is used as-is rather than prefixed a second time.
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let clean = |bytes: &[u8]| String::from_utf8_lossy(bytes).trim().to_string();
    let stderr = clean(&output.stderr);
    let detail = if !stderr.is_empty() {
        stderr
    } else {
        let stdout = clean(&output.stdout);
        if !stdout.is_empty() {
            stdout
        } else {
            output.status.to_string()
        }
    };
    Err(format!("{program} failed: {detail}"))
}
1057
1058pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1059 let has_any = config.router_bind_addr.is_some()
1060 || config.grpc_bind_addr.is_some()
1061 || config.http_bind_addr.is_some()
1062 || config.wire_bind_addr.is_some()
1063 || config.pg_bind_addr.is_some();
1064 if !has_any {
1065 return Err("at least one server bind address must be configured".into());
1066 }
1067 let thread_name = if config.router_bind_addr.is_some() {
1068 "red-server-router"
1069 } else {
1070 match (
1071 config.grpc_bind_addr.is_some(),
1072 config.http_bind_addr.is_some(),
1073 ) {
1074 (true, true) => "red-server-dual",
1075 (true, false) => "red-server-grpc",
1076 (false, true) => "red-server-http",
1077 (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1078 (false, false) => "red-server-pg-wire",
1079 }
1080 };
1081
1082 let handle = thread::Builder::new()
1083 .name(thread_name.into())
1084 .stack_size(8 * 1024 * 1024)
1085 .spawn(move || run_configured_servers(config))
1086 .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1087
1088 match handle.join() {
1089 Ok(result) => result,
1090 Err(_) => Err("server thread panicked".to_string()),
1091 }
1092}
1093
1094fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1095 let mut parts = vec![
1096 config.binary_path.display().to_string(),
1097 "server".to_string(),
1098 "--path".to_string(),
1099 config.data_path.display().to_string(),
1100 ];
1101
1102 if let Some(bind_addr) = &config.router_bind_addr {
1103 parts.push("--bind".to_string());
1104 parts.push(bind_addr.clone());
1105 } else if let Some(bind_addr) = &config.grpc_bind_addr {
1106 parts.push("--grpc-bind".to_string());
1107 parts.push(bind_addr.clone());
1108 }
1109 if let Some(bind_addr) = &config.http_bind_addr {
1110 parts.push("--http-bind".to_string());
1111 parts.push(bind_addr.clone());
1112 }
1113
1114 parts.join(" ")
1115}
1116
/// Returns true when a TCP connection to any address `target` resolves to
/// succeeds within `timeout` (applied per address).
///
/// A `target` that cannot be resolved or parsed yields `false`.
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    let candidates = match target.to_socket_addrs() {
        Ok(resolved) => resolved,
        Err(_) => return false,
    };

    for address in candidates {
        if TcpStream::connect_timeout(&address, timeout).is_ok() {
            return true;
        }
    }
    false
}
1127
1128#[inline(never)]
1129fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1130 if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1136 return run_routed_server(config, router_bind_addr);
1137 }
1138
1139 match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1140 (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1141 run_dual_server(config, grpc_bind_addr, http_bind_addr)
1142 }
1143 (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1144 (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1145 (None, None) => {
1146 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1147 run_wire_only_server(config, wire_addr)
1148 } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1149 run_pg_only_server(config, pg_addr)
1150 } else {
1151 Err("at least one server bind address must be configured".to_string())
1152 }
1153 }
1154 }
1155}
1156
1157pub fn bind_listener_for_startup(
1175 readiness: &mut TransportReadiness,
1176 transport: &str,
1177 bind_addr: &str,
1178 explicit: bool,
1179) -> Result<Option<TcpListener>, String> {
1180 match TcpListener::bind(bind_addr) {
1181 Ok(listener) => {
1182 readiness.active(transport, bind_addr, explicit);
1183 Ok(Some(listener))
1184 }
1185 Err(err) => {
1186 let reason = format!("{transport} listener bind {bind_addr}: {err}");
1187 readiness.failed(transport, bind_addr, explicit, reason.clone());
1188 if explicit {
1189 tracing::error!(
1190 transport,
1191 bind = %bind_addr,
1192 error = %err,
1193 "fatal explicit bind failure"
1194 );
1195 Err(format!("explicit {reason}"))
1196 } else {
1197 tracing::warn!(
1198 transport,
1199 bind = %bind_addr,
1200 error = %err,
1201 "non-fatal implicit bind failure; listener degraded"
1202 );
1203 Ok(None)
1204 }
1205 }
1206 }
1207}
1208
/// Installs the process lifecycle signal handling and returns once the
/// handler task has been spawned.
///
/// On unix the spawned task waits for SIGTERM, SIGINT and (when available)
/// SIGHUP. SIGHUP triggers `handle_sighup_reload` and the loop keeps waiting;
/// SIGTERM / SIGINT run `runtime.graceful_shutdown(..)` and then exit the
/// process. On other platforms the task waits for Ctrl+C and does the same
/// shutdown.
///
/// `RED_BACKUP_ON_SHUTDOWN` controls the flag passed to `graceful_shutdown`:
/// `1` / `true` / `yes` / `on` (case-insensitive) enable it, any other set
/// value disables it, and an unset variable defaults to enabled.
///
/// NOTE(review): the process exits with status 0 even when the graceful
/// shutdown returned an error — confirm that this is intended.
async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
        .ok()
        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
        .unwrap_or(true);

    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        // SIGTERM and SIGINT are mandatory: if either cannot be installed the
        // function returns without spawning the handler task at all.
        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
                );
                return;
            }
        };
        let mut sigint = match signal(SignalKind::interrupt()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGINT handler");
                return;
            }
        };
        // SIGHUP is optional: without it only signal-driven reload is lost.
        let mut sighup = match signal(SignalKind::hangup()) {
            Ok(s) => Some(s),
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
                None
            }
        };

        // Separate clone for the reload path; `runtime` itself is used for shutdown.
        let reload_runtime = runtime.clone();
        tokio::spawn(async move {
            loop {
                // Wait for whichever installed signal arrives first.
                let signal_name = match &mut sighup {
                    Some(hup) => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                        _ = hup.recv() => "SIGHUP",
                    },
                    None => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                    },
                };

                // SIGHUP is a reload request, not a shutdown: handle it and keep waiting.
                if signal_name == "SIGHUP" {
                    handle_sighup_reload(&reload_runtime);
                    continue;
                }

                tracing::info!(
                    signal = signal_name,
                    "lifecycle signal received; shutting down"
                );
                match runtime.graceful_shutdown(backup_on_shutdown) {
                    Ok(report) => {
                        tracing::info!(
                            duration_ms = report.duration_ms,
                            flushed_wal = report.flushed_wal,
                            final_checkpoint = report.final_checkpoint,
                            backup_uploaded = report.backup_uploaded,
                            "graceful shutdown complete"
                        );
                    }
                    Err(err) => {
                        tracing::error!(error = %err, "graceful shutdown failed");
                        // Also emit an operator event describing the failed shutdown.
                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
                            reason: format!("graceful shutdown failed: {err}"),
                        }
                        .emit_global();
                    }
                }
                // Terminates the whole process; the loop never iterates past this point.
                std::process::exit(0);
            }
        });
    }

    #[cfg(not(unix))]
    {
        tokio::spawn(async move {
            // Ctrl+C is the only lifecycle signal handled on non-unix targets.
            let interrupted = tokio::signal::ctrl_c().await;
            if let Err(err) = interrupted {
                tracing::warn!(error = %err, "could not install Ctrl+C handler");
                return;
            }

            tracing::info!(
                signal = "Ctrl+C",
                "lifecycle signal received; shutting down"
            );
            match runtime.graceful_shutdown(backup_on_shutdown) {
                Ok(report) => {
                    tracing::info!(
                        duration_ms = report.duration_ms,
                        flushed_wal = report.flushed_wal,
                        final_checkpoint = report.final_checkpoint,
                        backup_uploaded = report.backup_uploaded,
                        "graceful shutdown complete"
                    );
                }
                Err(err) => {
                    tracing::error!(error = %err, "graceful shutdown failed");
                }
            }
            std::process::exit(0);
        });
    }
}
1354
1355fn handle_sighup_reload(runtime: &RedDBRuntime) {
1364 let now_ms = std::time::SystemTime::now()
1365 .duration_since(std::time::UNIX_EPOCH)
1366 .map(|d| d.as_millis() as u64)
1367 .unwrap_or(0);
1368 tracing::info!(
1369 target: "reddb::secrets",
1370 ts_unix_ms = now_ms,
1371 "SIGHUP received; secrets will be re-read from *_FILE on next access"
1372 );
1373 use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1378 runtime.audit_log().record_event(
1379 AuditEvent::builder("config/sighup_reload")
1380 .source(AuditAuthSource::System)
1381 .resource("secrets")
1382 .outcome(Outcome::Success)
1383 .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1384 .build(),
1385 );
1386}
1387
1388#[inline(never)]
1389fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
1390 let workers = config.workers;
1391 let cli_telemetry = config.telemetry.clone();
1392 let db_options = config.to_db_options()?;
1393 let rt_config = detect_runtime_config();
1394 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1395 let (runtime, auth_store, _telemetry_guard) =
1396 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1397 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1398
1399 spawn_admin_metrics_listeners(&runtime, &auth_store);
1400
1401 let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
1402 .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
1403 let http_backend = http_listener
1404 .local_addr()
1405 .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
1406 let http_server = build_http_server(
1407 runtime.clone(),
1408 auth_store.clone(),
1409 http_backend.to_string(),
1410 );
1411 let http_server = apply_http_limits(http_server, &config, &runtime);
1412 let http_handle = http_server.serve_in_background_on(http_listener);
1413
1414 thread::sleep(Duration::from_millis(100));
1415 if http_handle.is_finished() {
1416 return match http_handle.join() {
1417 Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
1418 Ok(Err(err)) => Err(err.to_string()),
1419 Err(_) => Err("HTTP backend thread panicked".to_string()),
1420 };
1421 }
1422
1423 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1424 .enable_all()
1425 .worker_threads(worker_threads)
1426 .thread_stack_size(rt_config.stack_size)
1427 .build()
1428 .map_err(|err| format!("tokio runtime: {err}"))?;
1429
1430 let signal_runtime = runtime.clone();
1431 tokio_runtime.block_on(async move {
1432 spawn_lifecycle_signal_handler(signal_runtime).await;
1433 let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
1434 .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
1435 let grpc_backend = grpc_listener
1436 .local_addr()
1437 .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
1438 let grpc_server = RedDBGrpcServer::with_options(
1439 runtime.clone(),
1440 GrpcServerOptions {
1441 bind_addr: grpc_backend.to_string(),
1442 tls: None,
1443 },
1444 auth_store,
1445 );
1446 tokio::spawn(async move {
1447 if let Err(err) = grpc_server.serve_on(grpc_listener).await {
1448 tracing::error!(err = %err, "gRPC backend error");
1449 }
1450 });
1451
1452 let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
1453 .await
1454 .map_err(|err| format!("bind internal wire listener: {err}"))?;
1455 let wire_backend = wire_listener
1456 .local_addr()
1457 .map_err(|err| format!("inspect internal wire listener: {err}"))?;
1458 let wire_rt = Arc::new(runtime);
1459 tokio::spawn(async move {
1460 if let Err(err) =
1461 crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
1462 .await
1463 {
1464 tracing::error!(err = %err, "redwire backend error");
1465 }
1466 });
1467
1468 tracing::info!(
1469 bind = %router_bind_addr,
1470 cpus = rt_config.available_cpus,
1471 workers = worker_threads,
1472 "router bootstrapping"
1473 );
1474 serve_tcp_router(TcpProtocolRouterConfig {
1475 bind_addr: router_bind_addr,
1476 grpc_backend,
1477 http_backend,
1478 wire_backend,
1479 })
1480 .await
1481 .map_err(|err| err.to_string())
1482 })
1483}
1484
1485async fn spawn_wire_listeners(
1487 config: &ServerCommandConfig,
1488 runtime: &RedDBRuntime,
1489 readiness: &mut TransportReadiness,
1490) -> Result<(), String> {
1491 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1493 let wire_rt = Arc::new(runtime.clone());
1494 #[cfg(unix)]
1497 {
1498 if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1499 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1500 tokio::spawn(async move {
1501 if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1502 &wire_addr, wire_rt,
1503 )
1504 .await
1505 {
1506 tracing::error!(err = %e, "redwire unix listener error");
1507 }
1508 });
1509 return Ok(());
1510 }
1511 }
1512 match tokio::net::TcpListener::bind(&wire_addr).await {
1513 Ok(listener) => {
1514 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1515 tokio::spawn(async move {
1516 if let Err(e) =
1517 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1518 .await
1519 {
1520 tracing::error!(err = %e, "redwire listener error");
1521 }
1522 });
1523 }
1524 Err(err) => {
1525 let reason = format!("wire listener bind {wire_addr}: {err}");
1526 readiness.failed(
1527 "wire",
1528 &wire_addr,
1529 config.wire_bind_explicit,
1530 reason.clone(),
1531 );
1532 if config.wire_bind_explicit {
1533 tracing::error!(
1534 transport = "wire",
1535 bind = %wire_addr,
1536 error = %err,
1537 "fatal explicit bind failure"
1538 );
1539 return Err(format!("explicit {reason}"));
1540 }
1541 tracing::warn!(
1542 transport = "wire",
1543 bind = %wire_addr,
1544 error = %err,
1545 "non-fatal implicit bind failure; listener degraded"
1546 );
1547 }
1548 }
1549 }
1550
1551 if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1553 let tls_config = resolve_wire_tls_config(config);
1554 match tls_config {
1555 Ok(tls_cfg) => {
1556 let wire_rt = Arc::new(runtime.clone());
1557 tokio::spawn(async move {
1558 if let Err(e) =
1559 crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1560 .await
1561 {
1562 tracing::error!(err = %e, "redwire+tls listener error");
1563 }
1564 });
1565 }
1566 Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1567 }
1568 }
1569 Ok(())
1570}
1571
1572fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1579 if let Some(pg_addr) = config.pg_bind_addr.clone() {
1580 let rt = Arc::new(runtime.clone());
1581 tokio::spawn(async move {
1582 let cfg = crate::wire::PgWireConfig {
1583 bind_addr: pg_addr,
1584 ..Default::default()
1585 };
1586 if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1587 tracing::error!(err = %e, "pg wire listener error");
1588 }
1589 });
1590 }
1591}
1592
1593fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1607 use crate::utils::secret_file::expand_file_env;
1608
1609 for var in [
1613 "REDDB_GRPC_TLS_CERT",
1614 "REDDB_GRPC_TLS_KEY",
1615 "REDDB_GRPC_TLS_CLIENT_CA",
1616 ] {
1617 if let Err(err) = expand_file_env(var) {
1618 tracing::warn!(
1619 target: "reddb::secrets",
1620 env = %var,
1621 err = %err,
1622 "could not expand *_FILE companion for gRPC TLS"
1623 );
1624 }
1625 }
1626
1627 let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1628 (Some(cert), Some(key)) => {
1629 let cert_pem = std::fs::read(cert)
1630 .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1631 let key_pem =
1632 std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1633 (cert_pem, key_pem)
1634 }
1635 _ => {
1636 let dev = std::env::var("RED_GRPC_TLS_DEV")
1638 .ok()
1639 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1640 .unwrap_or(false);
1641 if !dev {
1642 return Err("gRPC TLS configured but no cert/key supplied — set \
1643 REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1644 RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1645 .to_string());
1646 }
1647 let dir = config
1648 .path
1649 .as_ref()
1650 .and_then(|p| p.parent())
1651 .map(PathBuf::from)
1652 .unwrap_or_else(|| PathBuf::from("."));
1653 let (cert_pem_str, key_pem_str) =
1654 crate::wire::tls::generate_self_signed_cert("localhost")
1655 .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1656
1657 let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1662 tracing::warn!(
1663 target: "reddb::security",
1664 transport = "grpc",
1665 cert_sha256 = %fp,
1666 "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1667 DO NOT use in production"
1668 );
1669 let cert_path = dir.join("grpc-tls-cert.pem");
1671 let key_path = dir.join("grpc-tls-key.pem");
1672 if !cert_path.exists() || !key_path.exists() {
1673 let _ = std::fs::create_dir_all(&dir);
1674 std::fs::write(&cert_path, cert_pem_str.as_bytes())
1675 .map_err(|e| format!("write grpc dev cert: {e}"))?;
1676 std::fs::write(&key_path, key_pem_str.as_bytes())
1677 .map_err(|e| format!("write grpc dev key: {e}"))?;
1678 #[cfg(unix)]
1679 {
1680 use std::os::unix::fs::PermissionsExt;
1681 let _ =
1682 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1683 }
1684 }
1685 (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1686 }
1687 };
1688
1689 let client_ca_pem = match &config.grpc_tls_client_ca {
1690 Some(path) => Some(
1691 std::fs::read(path)
1692 .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1693 ),
1694 None => None,
1695 };
1696
1697 Ok(crate::GrpcTlsOptions {
1698 cert_pem,
1699 key_pem,
1700 client_ca_pem,
1701 })
1702}
1703
1704fn spawn_grpc_tls_listener_if_configured(
1708 config: &ServerCommandConfig,
1709 runtime: RedDBRuntime,
1710 auth_store: Arc<AuthStore>,
1711) {
1712 let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1713 return;
1714 };
1715 let tls_opts = match resolve_grpc_tls_options(config) {
1716 Ok(opts) => opts,
1717 Err(err) => {
1718 tracing::error!(
1719 target: "reddb::security",
1720 transport = "grpc",
1721 err = %err,
1722 "gRPC TLS config error; TLS listener will not start"
1723 );
1724 return;
1725 }
1726 };
1727 tokio::spawn(async move {
1728 let server = RedDBGrpcServer::with_options(
1729 runtime,
1730 GrpcServerOptions {
1731 bind_addr: tls_bind.clone(),
1732 tls: Some(tls_opts),
1733 },
1734 auth_store,
1735 );
1736 tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1737 if let Err(err) = server.serve().await {
1738 tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1739 }
1740 });
1741}
1742
1743fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1746 use sha2::{Digest, Sha256};
1747 let mut h = Sha256::new();
1748 h.update(pem);
1749 let d = h.finalize();
1750 let mut buf = String::with_capacity(64);
1751 for b in d.iter() {
1752 buf.push_str(&format!("{b:02x}"));
1753 }
1754 buf
1755}
1756
1757fn resolve_wire_tls_config(
1759 config: &ServerCommandConfig,
1760) -> Result<crate::wire::WireTlsConfig, String> {
1761 match (&config.wire_tls_cert, &config.wire_tls_key) {
1762 (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1763 cert_path: cert.clone(),
1764 key_path: key.clone(),
1765 }),
1766 _ => {
1767 let dir = config
1769 .path
1770 .as_ref()
1771 .and_then(|p| p.parent())
1772 .map(PathBuf::from)
1773 .unwrap_or_else(|| PathBuf::from("."));
1774 crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1775 }
1776 }
1777}
1778
1779#[inline(never)]
1780fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1781 let rt_config = detect_runtime_config();
1782 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1783 let cli_telemetry = config.telemetry.clone();
1784 let db_options = config.to_db_options()?;
1785 let mut transport_readiness = TransportReadiness::default();
1786
1787 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1788 .enable_all()
1789 .worker_threads(workers)
1790 .thread_stack_size(rt_config.stack_size)
1791 .build()
1792 .map_err(|err| format!("tokio runtime: {err}"))?;
1793
1794 let (runtime, _auth_store, _telemetry_guard) =
1798 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1799 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1800 let signal_runtime = runtime.clone();
1801 tokio_runtime.block_on(async move {
1802 spawn_lifecycle_signal_handler(signal_runtime).await;
1803 spawn_pg_listener(&config, &runtime);
1804 let wire_rt = Arc::new(runtime);
1805 let listener = tokio::net::TcpListener::bind(&wire_addr)
1806 .await
1807 .map_err(|err| {
1808 let reason = format!("wire listener bind {wire_addr}: {err}");
1809 transport_readiness.failed(
1810 "wire",
1811 &wire_addr,
1812 config.wire_bind_explicit,
1813 reason.clone(),
1814 );
1815 if config.wire_bind_explicit {
1816 format!("explicit {reason}")
1817 } else {
1818 reason
1819 }
1820 })?;
1821 transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1822 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1823 .await
1824 .map_err(|e| e.to_string())
1825 })
1826}
1827
1828#[inline(never)]
1829fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1830 let rt_config = detect_runtime_config();
1831 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1832 let cli_telemetry = config.telemetry.clone();
1833 let db_options = config.to_db_options()?;
1834
1835 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1836 .enable_all()
1837 .worker_threads(workers)
1838 .thread_stack_size(rt_config.stack_size)
1839 .build()
1840 .map_err(|err| format!("tokio runtime: {err}"))?;
1841
1842 let (runtime, _auth_store, _telemetry_guard) =
1843 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1844 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1845 let signal_runtime = runtime.clone();
1846 tokio_runtime.block_on(async move {
1847 spawn_lifecycle_signal_handler(signal_runtime).await;
1848 let cfg = crate::wire::PgWireConfig {
1849 bind_addr: pg_addr,
1850 ..Default::default()
1851 };
1852 crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1853 .await
1854 .map_err(|e| e.to_string())
1855 })
1856}
1857
1858#[inline(never)]
1859fn build_runtime_and_auth_store(
1860 db_options: RedDBOptions,
1861 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1862) -> Result<
1863 (
1864 RedDBRuntime,
1865 Arc<AuthStore>,
1866 Option<crate::telemetry::TelemetryGuard>,
1867 ),
1868 String,
1869> {
1870 build_runtime_with_telemetry(db_options, cli_telemetry)
1877}
1878
1879pub(crate) fn build_runtime_with_telemetry(
1889 db_options: RedDBOptions,
1890 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1891) -> Result<
1892 (
1893 RedDBRuntime,
1894 Arc<AuthStore>,
1895 Option<crate::telemetry::TelemetryGuard>,
1896 ),
1897 String,
1898> {
1899 let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1900 let msg = err.to_string();
1906 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1907 phase: "runtime_construction".to_string(),
1908 error: msg.clone(),
1909 }
1910 .emit_global();
1911 msg
1912 })?;
1913
1914 crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1919 let msg = err.to_string();
1920 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1921 phase: "lease_loop".to_string(),
1922 error: msg.clone(),
1923 }
1924 .emit_global();
1925 msg
1926 })?;
1927
1928 if let Some(data_path) = db_options.data_path.as_deref() {
1932 let watch_dir = data_path.parent().unwrap_or(data_path);
1933 crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1934 }
1935
1936 {
1940 let config_path = crate::runtime::config_overlay::config_file_path();
1941 let store = runtime.db().store();
1942 crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1943 }
1944
1945 let merged = merge_telemetry_with_config(
1948 cli_telemetry
1949 .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1950 &runtime,
1951 );
1952 let telemetry_guard = crate::telemetry::init(merged);
1953
1954 let no_auth = no_auth_active(&db_options);
1955 let auth_store =
1956 if db_options.auth.vault_enabled {
1957 let pager =
1958 runtime.db().store().pager().cloned().ok_or_else(|| {
1959 "vault requires a paged database (persistent mode)".to_string()
1960 })?;
1961 let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1962 .map_err(|err| err.to_string())?;
1963 Arc::new(store)
1964 } else {
1965 Arc::new(AuthStore::new(db_options.auth.clone()))
1966 };
1967 if no_auth {
1973 eprintln!("{NO_AUTH_WARNING}");
1974 tracing::warn!("{NO_AUTH_WARNING}");
1975 } else {
1976 auth_store.bootstrap_from_env();
1977 }
1978
1979 {
1981 let store = Arc::clone(&auth_store);
1982 std::thread::Builder::new()
1983 .name("reddb-session-purge".into())
1984 .spawn(move || loop {
1985 std::thread::sleep(std::time::Duration::from_secs(300));
1986 store.purge_expired_sessions();
1987 })
1988 .ok();
1989 }
1990
1991 Ok((runtime, auth_store, telemetry_guard))
1992}
1993
1994fn merge_telemetry_with_config(
2005 mut cli: crate::telemetry::TelemetryConfig,
2006 runtime: &RedDBRuntime,
2007) -> crate::telemetry::TelemetryConfig {
2008 use crate::storage::schema::Value;
2009
2010 let store = runtime.db().store();
2011
2012 if !cli.level_explicit {
2013 if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
2014 cli.level_filter = v.to_string();
2015 }
2016 }
2017 if !cli.format_explicit {
2018 if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
2019 if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
2020 cli.format = parsed;
2021 }
2022 }
2023 }
2024 if !cli.rotation_keep_days_explicit {
2025 match store.get_config("red.logging.keep_days") {
2026 Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
2027 cli.rotation_keep_days = n as u16
2028 }
2029 Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
2030 cli.rotation_keep_days = n as u16
2031 }
2032 Some(Value::Text(v)) => {
2033 if let Ok(n) = v.parse::<u16>() {
2034 cli.rotation_keep_days = n;
2035 }
2036 }
2037 _ => {}
2038 }
2039 }
2040 if !cli.file_prefix_explicit {
2041 if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
2042 if !v.is_empty() {
2043 cli.file_prefix = v.to_string();
2044 }
2045 }
2046 }
2047 if !cli.log_dir_explicit && !cli.log_file_disabled {
2050 if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
2051 if !v.is_empty() {
2052 cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
2053 }
2054 }
2055 }
2056
2057 cli
2058}
2059
/// Tests for the precedence rules of `merge_telemetry_with_config`.
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};

    /// In-memory runtime with an empty config store.
    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
    }

    /// Stores `value` as a string under `key` in the runtime's config tree.
    fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
        let json = crate::serde_json::Value::String(value.to_string());
        runtime.db().store().set_config_tree(key, &json);
    }

    /// Baseline CLI config: a default log directory and JSON format, with
    /// every other field left at its default.
    fn cli_base() -> TelemetryConfig {
        TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    /// Seeds one stored config key on a fresh runtime and merges `cli` with it.
    fn merge_with_stored(cli: TelemetryConfig, key: &str, value: &str) -> TelemetryConfig {
        let runtime = fresh_runtime();
        set_str(&runtime, key, value);
        merge_telemetry_with_config(cli, &runtime)
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let merged = merge_with_stored(cli_base(), "red.logging.dir", "/var/log/reddb");
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/var/log/reddb"))
        );
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let cli = TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/custom/dir")),
            log_dir_explicit: true,
            ..cli_base()
        };
        let merged = merge_with_stored(cli, "red.logging.dir", "/var/log/reddb");
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/custom/dir"))
        );
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let cli = TelemetryConfig {
            log_dir: None,
            log_file_disabled: true,
            ..cli_base()
        };
        let merged = merge_with_stored(cli, "red.logging.dir", "/var/log/reddb");
        assert!(
            merged.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        let merged = merge_with_stored(cli_base(), "red.logging.format", "pretty");
        assert_eq!(merged.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let cli = TelemetryConfig {
            format: LogFormat::Json,
            format_explicit: true,
            ..cli_base()
        };
        let merged = merge_with_stored(cli, "red.logging.format", "pretty");
        assert_eq!(merged.format, LogFormat::Json);
    }
}
2147
2148#[inline(never)]
2149fn build_http_server(
2150 runtime: RedDBRuntime,
2151 auth_store: Arc<AuthStore>,
2152 bind_addr: String,
2153) -> RedDBServer {
2154 build_http_server_with_transport_readiness(
2155 runtime,
2156 auth_store,
2157 bind_addr,
2158 TransportReadiness::default(),
2159 )
2160}
2161
2162fn apply_http_limits(
2168 server: RedDBServer,
2169 config: &ServerCommandConfig,
2170 runtime: &RedDBRuntime,
2171) -> RedDBServer {
2172 let store = runtime.db().store();
2173 let resolved =
2174 crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2175 .get_config(key)
2176 {
2177 Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2178 Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2179 Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2180 _ => None,
2181 });
2182 tracing::info!(
2183 target: "reddb::http_limits",
2184 max_handlers = resolved.max_handlers,
2185 handler_timeout_ms = resolved.handler_timeout_ms,
2186 retry_after_secs = resolved.retry_after_secs,
2187 "http_limits resolved"
2188 );
2189 server.with_http_limits(resolved)
2190}
2191
2192#[inline(never)]
2193fn build_http_server_with_transport_readiness(
2194 runtime: RedDBRuntime,
2195 auth_store: Arc<AuthStore>,
2196 bind_addr: String,
2197 transport_readiness: TransportReadiness,
2198) -> RedDBServer {
2199 RedDBServer::with_options(
2200 runtime,
2201 ServerOptions {
2202 bind_addr,
2203 transport_readiness,
2204 ..ServerOptions::default()
2205 },
2206 )
2207 .with_auth(auth_store)
2208}
2209
2210#[inline(never)]
2214fn build_admin_only_server(
2215 runtime: RedDBRuntime,
2216 auth_store: Arc<AuthStore>,
2217 bind_addr: String,
2218) -> RedDBServer {
2219 RedDBServer::with_options(
2220 runtime,
2221 ServerOptions {
2222 bind_addr,
2223 surface: crate::server::ServerSurface::AdminOnly,
2224 ..ServerOptions::default()
2225 },
2226 )
2227 .with_auth(auth_store)
2228}
2229
2230#[inline(never)]
2234fn build_metrics_only_server(
2235 runtime: RedDBRuntime,
2236 auth_store: Arc<AuthStore>,
2237 bind_addr: String,
2238) -> RedDBServer {
2239 RedDBServer::with_options(
2240 runtime,
2241 ServerOptions {
2242 bind_addr,
2243 surface: crate::server::ServerSurface::MetricsOnly,
2244 ..ServerOptions::default()
2245 },
2246 )
2247 .with_auth(auth_store)
2248}
2249
2250fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2254 if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2255 let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2256 let _ = server.serve_in_background();
2257 tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2258 }
2259 if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2260 let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2261 let _ = server.serve_in_background();
2262 tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2263 }
2264}
2265
2266#[inline(never)]
2267fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2268 let cli_telemetry = config.telemetry.clone();
2269 let mut transport_readiness = TransportReadiness::default();
2270 let Some(listener) = bind_listener_for_startup(
2271 &mut transport_readiness,
2272 "http",
2273 &bind_addr,
2274 config.http_bind_explicit,
2275 )?
2276 else {
2277 return Err(format!(
2278 "no HTTP listener started; implicit bind {} failed",
2279 bind_addr
2280 ));
2281 };
2282 let db_options = config.to_db_options()?;
2283 let (runtime, auth_store, _telemetry_guard) =
2284 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2285 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2286 spawn_admin_metrics_listeners(&runtime, &auth_store);
2287 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2288 let server = build_http_server_with_transport_readiness(
2289 runtime.clone(),
2290 auth_store,
2291 bind_addr.clone(),
2292 transport_readiness,
2293 );
2294 let server = apply_http_limits(server, &config, &runtime);
2295 tracing::info!(transport = "http", bind = %bind_addr, "listener online");
2296 server.serve_on(listener).map_err(|err| err.to_string())
2297}
2298
2299fn spawn_http_tls_listener(
2305 config: &ServerCommandConfig,
2306 runtime: &RedDBRuntime,
2307 auth_store: &Arc<AuthStore>,
2308) -> Result<(), String> {
2309 let Some(addr) = config.http_tls_bind_addr.clone() else {
2310 return Ok(());
2311 };
2312
2313 let tls_config = resolve_http_tls_config(config)?;
2314 let server_config = crate::server::tls::build_server_config(&tls_config)
2315 .map_err(|err| format!("HTTP TLS: {err}"))?;
2316
2317 let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
2318 let server = apply_http_limits(server, config, runtime);
2319 let _handle = server.serve_tls_in_background(server_config);
2320 tracing::info!(
2321 transport = "https",
2322 bind = %addr,
2323 mtls = %tls_config.client_ca_path.is_some(),
2324 "TLS listener online"
2325 );
2326 Ok(())
2327}
2328
2329fn resolve_http_tls_config(
2331 config: &ServerCommandConfig,
2332) -> Result<crate::server::tls::HttpTlsConfig, String> {
2333 match (&config.http_tls_cert, &config.http_tls_key) {
2334 (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
2335 cert_path: cert.clone(),
2336 key_path: key.clone(),
2337 client_ca_path: config.http_tls_client_ca.clone(),
2338 }),
2339 (None, None) => {
2340 let dir = config
2342 .path
2343 .as_ref()
2344 .and_then(|p| p.parent().map(std::path::PathBuf::from))
2345 .unwrap_or_else(|| std::path::PathBuf::from("."));
2346 let auto = crate::server::tls::auto_generate_dev_cert(&dir)
2347 .map_err(|err| format!("HTTP TLS dev: {err}"))?;
2348 Ok(crate::server::tls::HttpTlsConfig {
2349 cert_path: auto.cert_path,
2350 key_path: auto.key_path,
2351 client_ca_path: config.http_tls_client_ca.clone(),
2352 })
2353 }
2354 _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
2355 }
2356}
2357
2358#[inline(never)]
2359fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2360 let workers = config.workers;
2361 let cli_telemetry = config.telemetry.clone();
2362 let db_options = config.to_db_options()?;
2363 let rt_config = detect_runtime_config();
2364 let mut transport_readiness = TransportReadiness::default();
2365 let Some(grpc_listener) = bind_listener_for_startup(
2366 &mut transport_readiness,
2367 "grpc",
2368 &bind_addr,
2369 config.grpc_bind_explicit,
2370 )?
2371 else {
2372 return Err(format!(
2373 "no gRPC listener started; implicit bind {} failed",
2374 bind_addr
2375 ));
2376 };
2377
2378 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2379
2380 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2381 .enable_all()
2382 .worker_threads(worker_threads)
2383 .thread_stack_size(rt_config.stack_size)
2384 .build()
2385 .map_err(|err| format!("tokio runtime: {err}"))?;
2386
2387 let (runtime, auth_store, _telemetry_guard) =
2389 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2390 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2391 let signal_runtime = runtime.clone();
2392 tokio_runtime.block_on(async move {
2393 spawn_lifecycle_signal_handler(signal_runtime).await;
2394 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2396
2397 spawn_pg_listener(&config, &runtime);
2399
2400 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2404
2405 let server = RedDBGrpcServer::with_options(
2406 runtime,
2407 GrpcServerOptions {
2408 bind_addr: bind_addr.clone(),
2409 tls: None,
2410 },
2411 auth_store,
2412 );
2413
2414 tracing::info!(
2415 transport = "grpc",
2416 bind = %bind_addr,
2417 cpus = rt_config.available_cpus,
2418 workers = worker_threads,
2419 "listener online"
2420 );
2421 server
2422 .serve_on(grpc_listener)
2423 .await
2424 .map_err(|err| err.to_string())
2425 })
2426}
2427
2428#[inline(never)]
2429fn run_dual_server(
2430 config: ServerCommandConfig,
2431 grpc_bind_addr: String,
2432 http_bind_addr: String,
2433) -> Result<(), String> {
2434 let workers = config.workers;
2435 let cli_telemetry = config.telemetry.clone();
2436 let db_options = config.to_db_options()?;
2437 let rt_config = detect_runtime_config();
2438 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2439 let mut transport_readiness = TransportReadiness::default();
2440 let http_listener = bind_listener_for_startup(
2441 &mut transport_readiness,
2442 "http",
2443 &http_bind_addr,
2444 config.http_bind_explicit,
2445 )?;
2446 let grpc_listener = bind_listener_for_startup(
2447 &mut transport_readiness,
2448 "grpc",
2449 &grpc_bind_addr,
2450 config.grpc_bind_explicit,
2451 )?;
2452 if http_listener.is_none() && grpc_listener.is_none() {
2453 return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2454 }
2455 let (runtime, auth_store, _telemetry_guard) =
2456 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2457 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2458
2459 spawn_admin_metrics_listeners(&runtime, &auth_store);
2460 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2461
2462 let http_handle = if let Some(listener) = http_listener {
2463 let http_server = build_http_server_with_transport_readiness(
2464 runtime.clone(),
2465 auth_store.clone(),
2466 http_bind_addr.clone(),
2467 transport_readiness.clone(),
2468 );
2469 let http_server = apply_http_limits(http_server, &config, &runtime);
2470 Some(http_server.serve_in_background_on(listener))
2471 } else {
2472 None
2473 };
2474
2475 thread::sleep(Duration::from_millis(150));
2476 if let Some(handle) = http_handle.as_ref() {
2477 if handle.is_finished() {
2478 let handle = http_handle.unwrap();
2479 return match handle.join() {
2480 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2481 Ok(Err(err)) => Err(err.to_string()),
2482 Err(_) => Err("HTTP server thread panicked".to_string()),
2483 };
2484 }
2485 }
2486 if grpc_listener.is_none() {
2487 let Some(handle) = http_handle else {
2488 return Err("no listener started".to_string());
2489 };
2490 return match handle.join() {
2491 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2492 Ok(Err(err)) => Err(err.to_string()),
2493 Err(_) => Err("HTTP server thread panicked".to_string()),
2494 };
2495 }
2496 let grpc_listener = grpc_listener.expect("checked above");
2497
2498 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2499 .enable_all()
2500 .worker_threads(worker_threads)
2501 .thread_stack_size(rt_config.stack_size)
2502 .build()
2503 .map_err(|err| format!("tokio runtime: {err}"))?;
2504
2505 let signal_runtime = runtime.clone();
2506 tokio_runtime.block_on(async move {
2507 spawn_lifecycle_signal_handler(signal_runtime).await;
2508 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2510
2511 spawn_pg_listener(&config, &runtime);
2513
2514 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2516
2517 let server = RedDBGrpcServer::with_options(
2518 runtime,
2519 GrpcServerOptions {
2520 bind_addr: grpc_bind_addr.clone(),
2521 tls: None,
2522 },
2523 auth_store,
2524 );
2525
2526 tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2527 tracing::info!(
2528 transport = "grpc",
2529 bind = %grpc_bind_addr,
2530 cpus = rt_config.available_cpus,
2531 workers = worker_threads,
2532 "listener online"
2533 );
2534 server
2535 .serve_on(grpc_listener)
2536 .await
2537 .map_err(|err| err.to_string())
2538 })
2539}
2540
2541#[cfg(test)]
2542mod tests {
2543 use super::*;
2544
2545 #[test]
2546 fn render_systemd_unit_contains_expected_execstart() {
2547 let config = SystemdServiceConfig {
2548 service_name: "reddb".to_string(),
2549 binary_path: PathBuf::from("/usr/local/bin/red"),
2550 run_user: "reddb".to_string(),
2551 run_group: "reddb".to_string(),
2552 data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
2553 router_bind_addr: None,
2554 grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2555 http_bind_addr: None,
2556 };
2557
2558 let unit = render_systemd_unit(&config);
2559 assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
2560 assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
2561 }
2562
2563 #[test]
2564 fn systemd_service_config_derives_paths() {
2565 let config = SystemdServiceConfig {
2566 service_name: "reddb-api".to_string(),
2567 binary_path: PathBuf::from("/usr/local/bin/red"),
2568 run_user: "reddb".to_string(),
2569 run_group: "reddb".to_string(),
2570 data_path: PathBuf::from("/srv/reddb/live/data.rdb"),
2571 router_bind_addr: None,
2572 grpc_bind_addr: None,
2573 http_bind_addr: Some("127.0.0.1:5055".to_string()),
2574 };
2575
2576 assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
2577 assert_eq!(
2578 config.unit_path(),
2579 PathBuf::from("/etc/systemd/system/reddb-api.service")
2580 );
2581 }
2582
2583 #[test]
2584 fn render_systemd_unit_supports_dual_transport() {
2585 let config = SystemdServiceConfig {
2586 service_name: "reddb".to_string(),
2587 binary_path: PathBuf::from("/usr/local/bin/red"),
2588 run_user: "reddb".to_string(),
2589 run_group: "reddb".to_string(),
2590 data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
2591 router_bind_addr: None,
2592 grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2593 http_bind_addr: Some("0.0.0.0:5055".to_string()),
2594 };
2595
2596 let unit = render_systemd_unit(&config);
2597 assert!(unit.contains("--grpc-bind 0.0.0.0:5555"));
2598 assert!(unit.contains("--http-bind 0.0.0.0:5055"));
2599 }
2600
2601 #[test]
2602 fn render_systemd_unit_supports_router_mode() {
2603 let config = SystemdServiceConfig {
2604 service_name: "reddb".to_string(),
2605 binary_path: PathBuf::from("/usr/local/bin/red"),
2606 run_user: "reddb".to_string(),
2607 run_group: "reddb".to_string(),
2608 data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
2609 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
2610 grpc_bind_addr: None,
2611 http_bind_addr: None,
2612 };
2613
2614 let unit = render_systemd_unit(&config);
2615 assert!(unit.contains("--bind 127.0.0.1:5050"));
2616 assert!(!unit.contains("--grpc-bind"));
2617 assert!(!unit.contains("--http-bind"));
2618 }
2619
2620 #[test]
2621 fn explicit_bind_collision_is_fatal() {
2622 let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
2623 let addr = held.local_addr().expect("held addr").to_string();
2624 let mut readiness = TransportReadiness::default();
2625
2626 let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();
2627
2628 assert!(error.contains("explicit http listener bind"));
2629 assert_eq!(readiness.active.len(), 0);
2630 assert_eq!(readiness.failed.len(), 1);
2631 assert!(readiness.failed[0].explicit);
2632 assert_eq!(readiness.failed[0].bind_addr, addr);
2633 }
2634
2635 fn no_auth_env_lock() -> &'static std::sync::Mutex<()> {
2642 static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
2643 LOCK.get_or_init(|| std::sync::Mutex::new(()))
2644 }
2645
2646 fn no_auth_test_config(no_auth: bool) -> ServerCommandConfig {
2647 ServerCommandConfig {
2648 path: None,
2649 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
2650 router_bind_explicit: false,
2651 grpc_bind_addr: None,
2652 grpc_bind_explicit: false,
2653 grpc_tls_bind_addr: None,
2654 grpc_tls_cert: None,
2655 grpc_tls_key: None,
2656 grpc_tls_client_ca: None,
2657 http_bind_addr: None,
2658 http_bind_explicit: false,
2659 http_tls_bind_addr: None,
2660 http_tls_cert: None,
2661 http_tls_key: None,
2662 http_tls_client_ca: None,
2663 wire_bind_addr: None,
2664 wire_bind_explicit: false,
2665 wire_tls_bind_addr: None,
2666 wire_tls_cert: None,
2667 wire_tls_key: None,
2668 pg_bind_addr: None,
2669 create_if_missing: true,
2670 read_only: false,
2671 role: "standalone".to_string(),
2672 primary_addr: None,
2673 vault: true,
2676 no_auth,
2677 workers: None,
2678 telemetry: None,
2679 http_limits_cli: crate::server::HttpLimitsCliInput::default(),
2680 }
2681 }
2682
2683 #[test]
2684 fn no_auth_flag_disables_every_auth_knob_and_stamps_metadata() {
2685 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
2686 unsafe {
2691 std::env::set_var("REDDB_USERNAME", "admin");
2692 std::env::set_var("REDDB_PASSWORD", "hunter2");
2693 }
2694 let config = no_auth_test_config(true);
2695 let options = config.to_db_options().expect("to_db_options");
2696
2697 assert!(no_auth_active(&options), "metadata should be stamped");
2698 assert!(!options.auth.enabled, "auth.enabled must be forced off");
2699 assert!(
2700 !options.auth.require_auth,
2701 "require_auth must be forced off"
2702 );
2703 assert!(
2704 !options.auth.vault_enabled,
2705 "vault_enabled must be forced off (overrides --vault)"
2706 );
2707 assert_eq!(
2708 options.metadata.get(NO_AUTH_META).map(String::as_str),
2709 Some("true"),
2710 );
2711
2712 unsafe {
2714 std::env::remove_var("REDDB_USERNAME");
2715 std::env::remove_var("REDDB_PASSWORD");
2716 }
2717 }
2718
2719 #[test]
2720 fn default_behaviour_without_no_auth_flag_is_unchanged() {
2721 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
2722 let config = no_auth_test_config(false);
2723 let options = config.to_db_options().expect("to_db_options");
2724
2725 assert!(
2726 !no_auth_active(&options),
2727 "default boot must not be marked no-auth"
2728 );
2729 assert!(
2730 options.metadata.get(NO_AUTH_META).is_none(),
2731 "metadata key must be absent when flag is off"
2732 );
2733 assert!(options.auth.vault_enabled);
2735 }
2736
2737 #[test]
2738 fn no_auth_active_blocks_bootstrap_from_env() {
2739 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
2740 unsafe {
2745 std::env::set_var("REDDB_USERNAME", "admin");
2746 std::env::set_var("REDDB_PASSWORD", "hunter2");
2747 }
2748
2749 let options = no_auth_test_config(true)
2750 .to_db_options()
2751 .expect("to_db_options");
2752
2753 let auth_store = AuthStore::new(options.auth.clone());
2757 if !no_auth_active(&options) {
2758 auth_store.bootstrap_from_env();
2759 }
2760
2761 assert!(
2762 auth_store.needs_bootstrap(),
2763 "no admin user must be bootstrapped under --no-auth even with REDDB_USERNAME/PASSWORD set"
2764 );
2765
2766 unsafe {
2768 std::env::remove_var("REDDB_USERNAME");
2769 std::env::remove_var("REDDB_PASSWORD");
2770 }
2771 }
2772
2773 #[test]
2774 fn implicit_bind_collision_degrades() {
2775 let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
2776 let addr = held.local_addr().expect("held addr").to_string();
2777 let mut readiness = TransportReadiness::default();
2778
2779 let listener =
2780 bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");
2781
2782 assert!(listener.is_none());
2783 assert_eq!(readiness.active.len(), 0);
2784 assert_eq!(readiness.failed.len(), 1);
2785 assert!(!readiness.failed[0].explicit);
2786 assert_eq!(readiness.failed[0].bind_addr, addr);
2787 }
2788}