1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13 GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Default bind address for the TCP protocol router (the `--bind` listener).
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
/// Inspects the host and derives thread-sizing defaults for the server.
///
/// Suggests one worker fewer than the number of CPUs, but never fewer than
/// one. If the OS cannot report its parallelism, a single CPU is assumed.
pub fn detect_runtime_config() -> RuntimeConfig {
    let available_cpus = match thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    };

    RuntimeConfig {
        available_cpus,
        suggested_workers: available_cpus.saturating_sub(1).max(1),
        stack_size: 8 * 1024 * 1024,
    }
}

/// Host-derived runtime sizing produced by [`detect_runtime_config`].
#[derive(Clone, Debug)]
pub struct RuntimeConfig {
    /// CPUs reported by the OS (1 when unknown).
    pub available_cpus: usize,
    /// Suggested worker-thread count: `available_cpus - 1`, floored at 1.
    pub suggested_workers: usize,
    /// Thread stack size in bytes (8 MiB).
    pub stack_size: usize,
}
40
/// Network protocols the server can expose on their own listeners.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ServerTransport {
    /// gRPC listener.
    Grpc,
    /// HTTP listener.
    Http,
    /// `wire` protocol listener.
    Wire,
}

impl ServerTransport {
    /// Short display label for the transport.
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Grpc => "gRPC",
            ServerTransport::Http => "HTTP",
            ServerTransport::Wire => "wire",
        }
    }

    /// Loopback address the transport uses when no bind address is given.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Wire => "127.0.0.1:5050",
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Grpc => "127.0.0.1:5555",
        }
    }
}
65
/// Settings collected by the `server` command and used to start the RedDB
/// server process (see `run_server_with_large_stack`).
#[derive(Debug, Clone)]
pub struct ServerCommandConfig {
    /// Data file path; `None` makes `to_db_options` start from in-memory options.
    pub path: Option<PathBuf>,
    /// TCP protocol router address; when set, `run_configured_servers` runs the
    /// routed server instead of the per-transport servers.
    pub router_bind_addr: Option<String>,
    /// NOTE(review): presumably true when the router address was supplied
    /// explicitly rather than defaulted — confirm against the CLI parser.
    pub router_bind_explicit: bool,
    /// gRPC listener address.
    pub grpc_bind_addr: Option<String>,
    /// Presumably true when the gRPC address was supplied explicitly — confirm.
    pub grpc_bind_explicit: bool,
    /// gRPC TLS listener address.
    pub grpc_tls_bind_addr: Option<String>,
    /// gRPC TLS certificate path.
    pub grpc_tls_cert: Option<PathBuf>,
    /// gRPC TLS private key path.
    pub grpc_tls_key: Option<PathBuf>,
    /// gRPC TLS client CA path (presumably for client-certificate auth — confirm).
    pub grpc_tls_client_ca: Option<PathBuf>,
    /// HTTP listener address.
    pub http_bind_addr: Option<String>,
    /// Presumably true when the HTTP address was supplied explicitly — confirm.
    pub http_bind_explicit: bool,
    /// HTTP TLS listener address.
    pub http_tls_bind_addr: Option<String>,
    /// HTTP TLS certificate path.
    pub http_tls_cert: Option<PathBuf>,
    /// HTTP TLS private key path.
    pub http_tls_key: Option<PathBuf>,
    /// HTTP TLS client CA path (presumably for client-certificate auth — confirm).
    pub http_tls_client_ca: Option<PathBuf>,
    /// `wire` protocol listener address.
    pub wire_bind_addr: Option<String>,
    /// Presumably true when the wire address was supplied explicitly — confirm.
    pub wire_bind_explicit: bool,
    /// `wire` protocol TLS listener address.
    pub wire_tls_bind_addr: Option<String>,
    /// `wire` protocol TLS certificate path.
    pub wire_tls_cert: Option<PathBuf>,
    /// `wire` protocol TLS private key path.
    pub wire_tls_key: Option<PathBuf>,
    /// Listener address served by `run_pg_only_server` when nothing else is set
    /// (presumably a PostgreSQL-compatible wire protocol — confirm).
    pub pg_bind_addr: Option<String>,
    /// Copied into `RedDBOptions::create_if_missing`.
    pub create_if_missing: bool,
    /// Forces read-only mode; env vars or persisted state may also enable it.
    pub read_only: bool,
    /// Replication role: `"primary"`, `"replica"`, anything else is standalone.
    pub role: String,
    /// Primary address for replicas; defaults to `http://127.0.0.1:5555`.
    pub primary_addr: Option<String>,
    /// When true, sets `options.auth.vault_enabled`.
    pub vault: bool,
    /// Worker thread count; `None` uses `detect_runtime_config().suggested_workers`.
    pub workers: Option<usize>,
    /// CLI telemetry overrides passed to `build_runtime_and_auth_store`.
    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
    /// HTTP limit overrides supplied on the command line.
    pub http_limits_cli: crate::server::HttpLimitsCliInput,
}
135
/// A transport listener that bound successfully during startup.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TransportListenerState {
    pub transport: String,
    pub bind_addr: String,
    pub explicit: bool,
}

/// A transport listener that failed to bind during startup, with the reason.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TransportListenerFailure {
    pub transport: String,
    pub bind_addr: String,
    pub explicit: bool,
    pub reason: String,
}

/// Bind outcomes of every transport listener attempted at startup.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct TransportReadiness {
    pub active: Vec<TransportListenerState>,
    pub failed: Vec<TransportListenerFailure>,
}

impl TransportReadiness {
    /// Records a listener that bound successfully.
    fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
        let entry = TransportListenerState {
            bind_addr: bind_addr.to_owned(),
            explicit,
            transport: transport.to_owned(),
        };
        self.active.push(entry);
    }

    /// Records a listener that failed to bind, together with `reason`.
    fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
        let entry = TransportListenerFailure {
            bind_addr: bind_addr.to_owned(),
            explicit,
            reason,
            transport: transport.to_owned(),
        };
        self.failed.push(entry);
    }
}
175
/// Inputs for rendering and installing the RedDB systemd unit.
#[derive(Clone, Debug)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix.
    pub service_name: String,
    /// Absolute path of the `red` binary used in `ExecStart=`.
    pub binary_path: PathBuf,
    /// System user the service runs as.
    pub run_user: String,
    /// System group the service runs as.
    pub run_group: String,
    /// Database file path; its parent directory becomes the working directory.
    pub data_path: PathBuf,
    /// Optional router bind address passed as `--bind`.
    pub router_bind_addr: Option<String>,
    /// Optional gRPC bind address passed as `--grpc-bind`.
    pub grpc_bind_addr: Option<String>,
    /// Optional HTTP bind address passed as `--http-bind`.
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory containing `data_path`, or `.` when the path has no parent.
    pub fn data_dir(&self) -> PathBuf {
        match self.data_path.parent() {
            Some(dir) => dir.to_path_buf(),
            None => PathBuf::from("."),
        }
    }

    /// Location of the unit file under `/etc/systemd/system`.
    pub fn unit_path(&self) -> PathBuf {
        let unit_file = format!("/etc/systemd/system/{}.service", self.service_name);
        PathBuf::from(unit_file)
    }
}
200
201pub fn default_telemetry_for_path(
206 path: Option<&std::path::Path>,
207) -> crate::telemetry::TelemetryConfig {
208 let log_dir = match path {
209 Some(p) => p
210 .parent()
211 .map(|parent| parent.join("logs"))
212 .or_else(|| Some(std::path::PathBuf::from("./logs"))),
213 None => None, };
215 crate::telemetry::TelemetryConfig {
216 log_dir,
217 file_prefix: "reddb.log".to_string(),
218 level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
219 format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
220 crate::telemetry::LogFormat::Pretty
221 } else {
222 crate::telemetry::LogFormat::Json
223 },
224 rotation_keep_days: 14,
225 service_name: "reddb",
226 level_explicit: false,
228 format_explicit: false,
229 rotation_keep_days_explicit: false,
230 file_prefix_explicit: false,
231 log_dir_explicit: false,
232 log_file_disabled: false,
233 }
234}
235
/// `RedDBOptions::metadata` key holding the periodic checkpoint interval in seconds.
const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
/// `RedDBOptions::metadata` key holding the WAL archive/backup interval in seconds.
const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
/// `RedDBOptions::metadata` key naming the backup backend kind (`"s3"` when set).
const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
/// `RedDBOptions::metadata` key holding the archive-lag pause threshold in seconds;
/// `0` (or absent) disables the pause.
const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";
249
250impl ServerCommandConfig {
251 fn to_db_options(&self) -> Result<RedDBOptions, String> {
252 let mut options = match &self.path {
253 Some(path) => RedDBOptions::persistent(path),
254 None => RedDBOptions::in_memory(),
255 };
256
257 options.mode = StorageMode::Persistent;
258 options.create_if_missing = self.create_if_missing;
259 options.read_only = self.read_only
266 || env_nonempty("RED_READONLY")
267 .or_else(|| env_nonempty("REDDB_READONLY"))
268 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
269 .unwrap_or(false)
270 || self.path.as_ref().is_some_and(|data_path| {
271 crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
272 data_path,
273 ))
274 .unwrap_or(false)
275 });
276
277 options.replication = match self.role.as_str() {
278 "primary" => ReplicationConfig::primary(),
279 "replica" => {
280 let primary_addr = self
281 .primary_addr
282 .clone()
283 .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
284 ReplicationConfig::replica(primary_addr)
291 }
292 _ => ReplicationConfig::standalone(),
293 };
294
295 if self.vault {
296 options.auth.vault_enabled = true;
297 }
298
299 match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
304 Err(msg) => {
305 return Err(format!("backup bootstrap: {msg}"));
306 }
307 Ok(Some(cfg)) => {
308 apply_backup_config(&mut options, &cfg);
309 }
310 Ok(None) => {
311 configure_remote_backend_from_env(&mut options);
312 }
313 }
314
315 Ok(options)
316 }
317
318 pub fn enabled_transports(&self) -> Vec<ServerTransport> {
319 let mut transports = Vec::with_capacity(3);
320 if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
321 transports.push(ServerTransport::Grpc);
322 }
323 if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
324 transports.push(ServerTransport::Http);
325 }
326 if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
327 transports.push(ServerTransport::Wire);
328 }
329 transports
330 }
331}
332
333fn env_nonempty(name: &str) -> Option<String> {
338 crate::utils::env_with_file_fallback(name)
339}
340
341fn env_truthy(name: &str) -> bool {
342 env_nonempty(name)
343 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
344 .unwrap_or(false)
345}
346
347fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
353 let endpoint_host = endpoint_host(&cfg.endpoint);
354
355 options.metadata.insert(
356 BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
357 cfg.checkpoint_interval_secs.to_string(),
358 );
359 options.metadata.insert(
360 BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
361 cfg.wal_flush_interval_secs.to_string(),
362 );
363 options
364 .metadata
365 .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
366 options.metadata.insert(
367 BACKUP_PAUSE_ON_LAG_META.to_string(),
368 cfg.pause_on_lag_secs.to_string(),
369 );
370
371 #[cfg(feature = "backend-s3")]
372 {
373 let s3_cfg = crate::storage::backend::S3Config {
374 endpoint: cfg.endpoint.clone(),
375 bucket: cfg.bucket.clone(),
376 key_prefix: cfg.prefix.clone(),
377 access_key: cfg.access_key_id.clone(),
378 secret_key: cfg.secret_access_key.clone(),
379 region: cfg.region.clone(),
380 path_style: true,
381 };
382 let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
383 options.remote_backend = Some(backend.clone());
384 options.remote_backend_atomic = Some(backend);
385 let trimmed = cfg.prefix.trim_end_matches('/');
390 options.remote_key = Some(format!("{}/data.rdb", trimmed));
391
392 tracing::info!(
393 backend = "s3",
394 endpoint = %endpoint_host,
395 bucket = %cfg.bucket,
396 prefix = %cfg.prefix,
397 checkpoint_interval_secs = cfg.checkpoint_interval_secs,
398 wal_flush_interval_secs = cfg.wal_flush_interval_secs,
399 "backup backend configured from REDDB_BACKUP_* env"
400 );
401 }
402
403 #[cfg(not(feature = "backend-s3"))]
404 {
405 tracing::warn!(
406 backend = "s3",
407 endpoint = %endpoint_host,
408 bucket = %cfg.bucket,
409 prefix = %cfg.prefix,
410 "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
411 backend wiring skipped (archiver/checkpointer also disabled)"
412 );
413 }
414}
415
/// Extracts the `host[:port]` part of an endpoint URL for logging.
///
/// Strips the scheme (`https://`), anything from the first `/`, `?` or `#`
/// onward, and any `user:password@` userinfo, so credentials embedded in the
/// endpoint never reach the logs. Input without a scheme is treated as if it
/// starts at the host.
fn endpoint_host(endpoint: &str) -> &str {
    let after_scheme = endpoint
        .split_once("://")
        .map_or(endpoint, |(_, rest)| rest);
    // The authority ends at the first path, query or fragment delimiter.
    let authority_end = after_scheme
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(after_scheme.len());
    let authority = &after_scheme[..authority_end];
    // Drop `userinfo@`. Splitting on the last `@` tolerates `@` inside passwords.
    authority
        .rsplit_once('@')
        .map_or(authority, |(_, host)| host)
}
423
424fn spawn_backup_tasks_if_configured(
430 options: &RedDBOptions,
431 runtime: &RedDBRuntime,
432) -> Option<BackupTasksHandle> {
433 let checkpoint_secs: u64 = options
434 .metadata
435 .get(BACKUP_INTERVAL_META_CHECKPOINT)?
436 .parse()
437 .ok()?;
438 let wal_secs: u64 = options
439 .metadata
440 .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
441 .parse()
442 .ok()?;
443 let pause_on_lag_secs: u64 = options
446 .metadata
447 .get(BACKUP_PAUSE_ON_LAG_META)
448 .and_then(|raw| raw.parse().ok())
449 .unwrap_or(0);
450 options.remote_backend.as_ref()?;
451
452 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
453
454 if pause_on_lag_secs > 0 {
459 let now_ms = std::time::SystemTime::now()
460 .duration_since(std::time::UNIX_EPOCH)
461 .map(|d| d.as_millis() as u64)
462 .unwrap_or(0);
463 runtime
464 .write_gate()
465 .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
466 tracing::info!(
467 pause_on_lag_secs,
468 "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
469 );
470 }
471
472 let checkpoint_handle = {
473 let stop = Arc::clone(&stop);
474 let runtime = runtime.clone();
475 let interval = Duration::from_secs(checkpoint_secs);
476 thread::Builder::new()
477 .name("red-checkpointer".into())
478 .spawn(move || {
479 periodic_loop(stop, interval, move || {
480 if let Err(err) = runtime.checkpoint() {
481 tracing::warn!(error = %err, "periodic checkpoint failed");
482 }
483 })
484 })
485 .ok()
486 };
487
488 let archiver_handle = {
489 let stop = Arc::clone(&stop);
490 let runtime = runtime.clone();
491 let interval = Duration::from_secs(wal_secs);
492 let lag_enabled = pause_on_lag_secs > 0;
493 thread::Builder::new()
494 .name("red-wal-archiver".into())
495 .spawn(move || {
496 periodic_loop(stop, interval, move || match runtime.trigger_backup() {
497 Ok(_) if lag_enabled => {
498 let now_ms = std::time::SystemTime::now()
499 .duration_since(std::time::UNIX_EPOCH)
500 .map(|d| d.as_millis() as u64)
501 .unwrap_or(0);
502 runtime.write_gate().record_archive_success(now_ms);
503 runtime.write_gate().evaluate_archive_lag(now_ms);
507 }
508 Ok(_) => {}
509 Err(err) => {
510 tracing::warn!(error = %err, "periodic WAL archive/backup failed");
511 }
512 })
513 })
514 .ok()
515 };
516
517 let lag_monitor_handle = if pause_on_lag_secs > 0 {
522 let stop = Arc::clone(&stop);
523 let runtime = runtime.clone();
524 let interval = Duration::from_secs(5);
528 thread::Builder::new()
529 .name("red-archive-lag-monitor".into())
530 .spawn(move || {
531 periodic_loop(stop, interval, move || {
532 let now_ms = std::time::SystemTime::now()
533 .duration_since(std::time::UNIX_EPOCH)
534 .map(|d| d.as_millis() as u64)
535 .unwrap_or(0);
536 let was_paused = runtime.write_gate().is_auto_paused();
537 let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
538 if now_paused && !was_paused {
539 tracing::warn!(
540 pause_on_lag_secs,
541 last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
542 "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
543 );
544 } else if !now_paused && was_paused {
545 tracing::info!(
546 "WAL archive caught up — exiting graceful read-only mode (issue #519)"
547 );
548 }
549 })
550 })
551 .ok()
552 } else {
553 None
554 };
555
556 tracing::info!(
557 checkpoint_interval_secs = checkpoint_secs,
558 wal_flush_interval_secs = wal_secs,
559 "backup tasks spawned (checkpointer + WAL archiver)"
560 );
561
562 Some(BackupTasksHandle {
563 stop,
564 _checkpoint_handle: checkpoint_handle,
565 _archiver_handle: archiver_handle,
566 _lag_monitor_handle: lag_monitor_handle,
567 })
568}
569
/// Owns the background backup threads; dropping it signals them to stop.
///
/// Drop only raises the shared stop flag. The threads are not joined; each
/// worker checks the flag between its one-second sleeps.
pub struct BackupTasksHandle {
    stop: Arc<std::sync::atomic::AtomicBool>,
    _checkpoint_handle: Option<thread::JoinHandle<()>>,
    _archiver_handle: Option<thread::JoinHandle<()>>,
    _lag_monitor_handle: Option<thread::JoinHandle<()>>,
}

impl Drop for BackupTasksHandle {
    fn drop(&mut self) {
        // Release pairs with the Acquire load performed by the worker loop.
        let flag: &std::sync::atomic::AtomicBool = &self.stop;
        flag.store(true, std::sync::atomic::Ordering::Release);
    }
}
586
/// Calls `tick` roughly every `interval` until `stop` becomes `true`.
///
/// The loop sleeps in one-second steps and checks `stop` before each sleep.
/// Only the sleeps count toward `interval`; time spent inside `tick` does not,
/// so the effective period drifts by the tick duration. An `interval` of one
/// second or less ticks after every sleep.
fn periodic_loop<F: FnMut()>(
    stop: Arc<std::sync::atomic::AtomicBool>,
    interval: Duration,
    mut tick: F,
) {
    const STEP: Duration = Duration::from_secs(1);
    let mut since_last_tick = Duration::ZERO;
    loop {
        if stop.load(std::sync::atomic::Ordering::Acquire) {
            break;
        }
        thread::sleep(STEP);
        since_last_tick += STEP;
        if since_last_tick < interval {
            continue;
        }
        tick();
        since_last_tick = Duration::ZERO;
    }
}
605
606fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
607 let backend = env_nonempty("RED_BACKEND")
613 .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
614 .unwrap_or_else(|| "none".to_string())
615 .to_ascii_lowercase();
616
617 match backend.as_str() {
618 "s3" | "minio" | "r2" => {
623 #[cfg(feature = "backend-s3")]
624 {
625 if let Some(config) = s3_config_from_env() {
626 let remote_key = env_nonempty("RED_REMOTE_KEY")
627 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
628 .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
629 let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
630 options.remote_backend = Some(backend.clone());
631 options.remote_backend_atomic = Some(backend);
632 options.remote_key = Some(remote_key);
633 }
634 }
635 #[cfg(not(feature = "backend-s3"))]
636 {
637 tracing::warn!(
638 backend = %backend,
639 "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
640 );
641 }
642 }
643 "fs" | "local" => {
648 let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
649 let remote_key = match (
650 base_path,
651 env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
652 ) {
653 (Some(base), Some(rel)) => Some(format!(
654 "{}/{}",
655 base.trim_end_matches('/'),
656 rel.trim_start_matches('/')
657 )),
658 (Some(base), None) => Some(format!(
659 "{}/clusters/dev/data.rdb",
660 base.trim_end_matches('/')
661 )),
662 (None, Some(rel)) => Some(rel),
663 (None, None) => None,
664 };
665 if let Some(key) = remote_key {
666 let backend = Arc::new(crate::storage::backend::LocalBackend);
667 options.remote_backend = Some(backend.clone());
668 options.remote_backend_atomic = Some(backend);
669 options.remote_key = Some(key);
670 }
671 }
672 "http" => {
677 let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
678 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
679 {
680 Some(u) => u,
681 None => {
682 tracing::warn!(
683 "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
684 );
685 return;
686 }
687 };
688 let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
689 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
690 .unwrap_or_default();
691 let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
692 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
693 {
694 std::fs::read_to_string(&path)
695 .ok()
696 .map(|s| s.trim().to_string())
697 .filter(|s| !s.is_empty())
698 } else {
699 env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
700 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
701 };
702
703 let mut config =
704 crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
705 if let Some(auth) = auth_header {
706 config = config.with_auth_header(auth);
707 }
708 let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
709 || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
710 || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
711 config = config.with_conditional_writes(conditional_writes);
712 if conditional_writes {
717 match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
718 Ok(atomic) => {
719 let atomic_arc = Arc::new(atomic);
720 options.remote_backend = Some(atomic_arc.clone());
721 options.remote_backend_atomic = Some(atomic_arc);
722 }
723 Err(err) => {
724 tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
725 options.remote_backend =
726 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
727 }
728 }
729 } else {
730 options.remote_backend =
731 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
732 }
733 options.remote_key = env_nonempty("RED_REMOTE_KEY")
734 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
735 .or_else(|| Some("clusters/dev/data.rdb".to_string()));
736 }
737 "none" | "" => {}
740 other => {
741 tracing::warn!(
742 backend = %other,
743 "unknown RED_BACKEND value — supported: s3 | fs | http | none"
744 );
745 }
746 }
747}
748
#[cfg(feature = "backend-s3")]
/// Reads `RED_S3_<suffix>`, falling back to the legacy `REDDB_S3_<suffix>`.
fn env_s3(suffix: &str) -> Option<String> {
    let preferred = format!("RED_S3_{suffix}");
    match env_nonempty(&preferred) {
        Some(value) => Some(value),
        None => env_nonempty(&format!("REDDB_S3_{suffix}")),
    }
}
758
#[cfg(feature = "backend-s3")]
/// Reads an S3 secret. A `RED_S3_<suffix>_FILE` or `REDDB_S3_<suffix>_FILE`
/// file (trimmed) takes precedence over the plain `RED_S3_<suffix>` variable.
///
/// When a `_FILE` variable is set, the file is authoritative. An unreadable or
/// empty file yields `None` without falling back to the plain variable, and a
/// warning names the file (never its contents).
fn env_s3_secret(suffix: &str) -> Option<String> {
    let file_var = format!("RED_S3_{suffix}_FILE");
    let legacy_file_var = format!("REDDB_S3_{suffix}_FILE");
    let Some(path) = env_nonempty(&file_var).or_else(|| env_nonempty(&legacy_file_var)) else {
        return env_s3(suffix);
    };
    match std::fs::read_to_string(&path) {
        Ok(contents) => {
            let secret = contents.trim();
            if secret.is_empty() {
                tracing::warn!(path = %path, "S3 secret file for {suffix} is empty; ignoring it");
                None
            } else {
                Some(secret.to_string())
            }
        }
        Err(err) => {
            tracing::warn!(path = %path, error = %err, "could not read S3 secret file for {suffix}");
            None
        }
    }
}
776
#[cfg(feature = "backend-s3")]
/// Builds an S3 backend configuration from `RED_S3_*` (or `REDDB_S3_*`) variables.
///
/// `ENDPOINT`, `BUCKET`, `ACCESS_KEY` and `SECRET_KEY` are required; the keys
/// may come from `_FILE` variants. Optional settings: `REGION` (default
/// `us-east-1`), `KEY_PREFIX`/`PREFIX` (default empty) and `PATH_STYLE`
/// (default on). Returns `None` when a required setting is missing.
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    let is_truthy =
        |raw: String| matches!(raw.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on");

    // Fields are evaluated in this order, so lookups stop at the first missing one.
    Some(crate::storage::backend::S3Config {
        endpoint: env_s3("ENDPOINT")?,
        bucket: env_s3("BUCKET")?,
        access_key: env_s3_secret("ACCESS_KEY")?,
        secret_key: env_s3_secret("SECRET_KEY")?,
        region: env_s3("REGION").unwrap_or_else(|| "us-east-1".to_string()),
        key_prefix: env_s3("KEY_PREFIX")
            .or_else(|| env_s3("PREFIX"))
            .unwrap_or_default(),
        path_style: env_s3("PATH_STYLE").map_or(true, is_truthy),
    })
}
800
801pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
802 let data_dir = config.data_dir();
803 let exec_start = render_systemd_exec_start(config);
804 format!(
805 "[Unit]\n\
806Description=RedDB unified database service\n\
807After=network-online.target\n\
808Wants=network-online.target\n\
809\n\
810[Service]\n\
811Type=simple\n\
812User={user}\n\
813Group={group}\n\
814WorkingDirectory={workdir}\n\
815ExecStart={exec_start}\n\
816Restart=always\n\
817RestartSec=2\n\
818LimitSTACK=16M\n\
819NoNewPrivileges=true\n\
820PrivateTmp=true\n\
821ProtectSystem=strict\n\
822ProtectHome=true\n\
823ProtectControlGroups=true\n\
824ProtectKernelTunables=true\n\
825ProtectKernelModules=true\n\
826RestrictNamespaces=true\n\
827LockPersonality=true\n\
828MemoryDenyWriteExecute=true\n\
829ReadWritePaths={workdir}\n\
830\n\
831[Install]\n\
832WantedBy=multi-user.target\n",
833 user = config.run_user,
834 group = config.run_group,
835 workdir = data_dir.display(),
836 exec_start = exec_start,
837 )
838}
839
840#[cfg(target_os = "linux")]
849pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
850 ensure_root()?;
851 ensure_command_available("systemctl")?;
852 ensure_command_available("getent")?;
853 ensure_command_available("groupadd")?;
854 ensure_command_available("useradd")?;
855 ensure_command_available("install")?;
856 ensure_executable(&config.binary_path)?;
857
858 if !command_success("getent", ["group", config.run_group.as_str()])? {
859 run_command("groupadd", ["--system", config.run_group.as_str()])?;
860 }
861
862 if !command_success("id", ["-u", config.run_user.as_str()])? {
863 let data_dir = config.data_dir();
864 run_command(
865 "useradd",
866 [
867 "--system",
868 "--gid",
869 config.run_group.as_str(),
870 "--home-dir",
871 data_dir.to_string_lossy().as_ref(),
872 "--shell",
873 "/usr/sbin/nologin",
874 config.run_user.as_str(),
875 ],
876 )?;
877 }
878
879 let data_dir = config.data_dir();
880 run_command(
881 "install",
882 [
883 "-d",
884 "-o",
885 config.run_user.as_str(),
886 "-g",
887 config.run_group.as_str(),
888 "-m",
889 "0750",
890 data_dir.to_string_lossy().as_ref(),
891 ],
892 )?;
893
894 std::fs::write(config.unit_path(), render_systemd_unit(config))
895 .map_err(|err| format!("failed to write systemd unit: {err}"))?;
896
897 run_command("systemctl", ["daemon-reload"])?;
898 run_command(
899 "systemctl",
900 [
901 "enable",
902 "--now",
903 format!("{}.service", config.service_name).as_str(),
904 ],
905 )?;
906
907 Ok(())
908}
909
#[cfg(not(target_os = "linux"))]
/// Non-Linux stub: systemd installation is unsupported, so this always returns
/// an error that points to the platform's native service manager.
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    const UNSUPPORTED: &str = "systemd install is Linux-only — use sc.exe (Windows) or \
        launchd (macOS) to install the service manually using the \
        unit printed by `red service print-unit`";
    Err(UNSUPPORTED.to_owned())
}
921
#[cfg(target_os = "linux")]
/// Fails unless the current process runs as uid 0, as reported by `id -u`.
fn ensure_root() -> Result<(), String> {
    let output = match Command::new("id").arg("-u").output() {
        Ok(output) => output,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !output.status.success() {
        return Err(String::from("failed to determine current uid"));
    }
    match String::from_utf8_lossy(&output.stdout).trim() {
        "0" => Ok(()),
        _ => Err(String::from("run this command as root (sudo)")),
    }
}
937
#[cfg(target_os = "linux")]
/// Checks that `command` resolves to something runnable on the current `PATH`.
///
/// The name reaches `sh` as a positional argument (`"$1"`) instead of being
/// spliced into the script, so shell metacharacters in it cannot change the
/// check (e.g. `"x; true"` no longer passes). A plain, non-login shell is used
/// so the lookup sees the same `PATH` that `Command::new` later uses. A login
/// shell may source profile scripts that change `PATH`.
///
/// # Errors
/// Returns an error if `sh` cannot be run or the command is not found.
fn ensure_command_available(command: &str) -> Result<(), String> {
    let status = Command::new("sh")
        .args(["-c", "command -v \"$1\" >/dev/null 2>&1", "sh", command])
        .status()
        .map_err(|err| format!("failed to check command '{command}': {err}"))?;
    if status.success() {
        Ok(())
    } else {
        Err(format!("required command not found: {command}"))
    }
}
950
#[cfg(target_os = "linux")]
/// Verifies that `path` names a regular file the service can execute.
///
/// Symlinks are followed. Directories are rejected explicitly because they
/// usually carry `x` permission bits, so a mode check alone would accept them.
///
/// # Errors
/// Returns an error if the path does not exist, is not a regular file, or
/// (on unix) has no execute bit set.
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        if metadata.permissions().mode() & 0o111 == 0 {
            return Err(format!("binary is not executable: {}", path.display()));
        }
    }
    Ok(())
}
970
#[cfg(target_os = "linux")]
/// Runs `program` with `args` (stdio inherited) and reports whether it exited successfully.
///
/// # Errors
/// Returns an error only when the process cannot be spawned.
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    let mut cmd = Command::new(program);
    cmd.args(args);
    match cmd.status() {
        Ok(status) => Ok(status.success()),
        Err(err) => Err(format!("failed to run {program}: {err}")),
    }
}
979
#[cfg(target_os = "linux")]
/// Runs `program` with `args`, capturing its output, and fails on a non-zero exit.
///
/// # Errors
/// Returns an error when the process cannot be spawned. On a non-zero exit it
/// returns `"<program> failed: <detail>"`, where `<detail>` is the trimmed
/// stderr, otherwise the trimmed stdout, otherwise the exit status
/// (e.g. `exit status: 1`).
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
    let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
    let detail = if !stderr.is_empty() {
        stderr
    } else if !stdout.is_empty() {
        stdout
    } else {
        // `ExitStatus`'s Display already reads "exit status: N" (or "signal: N");
        // prefixing it again produced "exit status exit status: N".
        output.status.to_string()
    };
    Err(format!("{program} failed: {detail}"))
}
1001
1002pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1003 let has_any = config.router_bind_addr.is_some()
1004 || config.grpc_bind_addr.is_some()
1005 || config.http_bind_addr.is_some()
1006 || config.wire_bind_addr.is_some()
1007 || config.pg_bind_addr.is_some();
1008 if !has_any {
1009 return Err("at least one server bind address must be configured".into());
1010 }
1011 let thread_name = if config.router_bind_addr.is_some() {
1012 "red-server-router"
1013 } else {
1014 match (
1015 config.grpc_bind_addr.is_some(),
1016 config.http_bind_addr.is_some(),
1017 ) {
1018 (true, true) => "red-server-dual",
1019 (true, false) => "red-server-grpc",
1020 (false, true) => "red-server-http",
1021 (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1022 (false, false) => "red-server-pg-wire",
1023 }
1024 };
1025
1026 let handle = thread::Builder::new()
1027 .name(thread_name.into())
1028 .stack_size(8 * 1024 * 1024)
1029 .spawn(move || run_configured_servers(config))
1030 .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1031
1032 match handle.join() {
1033 Ok(result) => result,
1034 Err(_) => Err("server thread panicked".to_string()),
1035 }
1036}
1037
1038fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1039 let mut parts = vec![
1040 config.binary_path.display().to_string(),
1041 "server".to_string(),
1042 "--path".to_string(),
1043 config.data_path.display().to_string(),
1044 ];
1045
1046 if let Some(bind_addr) = &config.router_bind_addr {
1047 parts.push("--bind".to_string());
1048 parts.push(bind_addr.clone());
1049 } else if let Some(bind_addr) = &config.grpc_bind_addr {
1050 parts.push("--grpc-bind".to_string());
1051 parts.push(bind_addr.clone());
1052 }
1053 if let Some(bind_addr) = &config.http_bind_addr {
1054 parts.push("--http-bind".to_string());
1055 parts.push(bind_addr.clone());
1056 }
1057
1058 parts.join(" ")
1059}
1060
/// Reports whether something accepts TCP connections at `target` within `timeout`.
///
/// `target` is resolved with [`ToSocketAddrs`], so `host:port` names work. Each
/// resolved address is tried in order until one connects. A resolution
/// failure counts as "not listening".
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    let accepts = |address: SocketAddr| TcpStream::connect_timeout(&address, timeout).is_ok();
    match target.to_socket_addrs() {
        Ok(mut resolved) => resolved.any(accepts),
        Err(_) => false,
    }
}
1071
1072#[inline(never)]
1073fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1074 if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1080 return run_routed_server(config, router_bind_addr);
1081 }
1082
1083 match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1084 (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1085 run_dual_server(config, grpc_bind_addr, http_bind_addr)
1086 }
1087 (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1088 (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1089 (None, None) => {
1090 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1091 run_wire_only_server(config, wire_addr)
1092 } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1093 run_pg_only_server(config, pg_addr)
1094 } else {
1095 Err("at least one server bind address must be configured".to_string())
1096 }
1097 }
1098 }
1099}
1100
1101pub fn bind_listener_for_startup(
1119 readiness: &mut TransportReadiness,
1120 transport: &str,
1121 bind_addr: &str,
1122 explicit: bool,
1123) -> Result<Option<TcpListener>, String> {
1124 match TcpListener::bind(bind_addr) {
1125 Ok(listener) => {
1126 readiness.active(transport, bind_addr, explicit);
1127 Ok(Some(listener))
1128 }
1129 Err(err) => {
1130 let reason = format!("{transport} listener bind {bind_addr}: {err}");
1131 readiness.failed(transport, bind_addr, explicit, reason.clone());
1132 if explicit {
1133 tracing::error!(
1134 transport,
1135 bind = %bind_addr,
1136 error = %err,
1137 "fatal explicit bind failure"
1138 );
1139 Err(format!("explicit {reason}"))
1140 } else {
1141 tracing::warn!(
1142 transport,
1143 bind = %bind_addr,
1144 error = %err,
1145 "non-fatal implicit bind failure; listener degraded"
1146 );
1147 Ok(None)
1148 }
1149 }
1150 }
1151}
1152
1153async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
1176 let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
1177 .ok()
1178 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1179 .unwrap_or(true);
1180
1181 #[cfg(unix)]
1182 {
1183 use tokio::signal::unix::{signal, SignalKind};
1184
1185 let mut sigterm = match signal(SignalKind::terminate()) {
1186 Ok(s) => s,
1187 Err(err) => {
1188 tracing::warn!(
1189 error = %err,
1190 "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
1191 );
1192 return;
1193 }
1194 };
1195 let mut sigint = match signal(SignalKind::interrupt()) {
1196 Ok(s) => s,
1197 Err(err) => {
1198 tracing::warn!(error = %err, "could not install SIGINT handler");
1199 return;
1200 }
1201 };
1202 let mut sighup = match signal(SignalKind::hangup()) {
1208 Ok(s) => Some(s),
1209 Err(err) => {
1210 tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
1211 None
1212 }
1213 };
1214
1215 let reload_runtime = runtime.clone();
1216 tokio::spawn(async move {
1217 loop {
1218 let signal_name = match &mut sighup {
1219 Some(hup) => tokio::select! {
1220 _ = sigterm.recv() => "SIGTERM",
1221 _ = sigint.recv() => "SIGINT",
1222 _ = hup.recv() => "SIGHUP",
1223 },
1224 None => tokio::select! {
1225 _ = sigterm.recv() => "SIGTERM",
1226 _ = sigint.recv() => "SIGINT",
1227 },
1228 };
1229
1230 if signal_name == "SIGHUP" {
1231 handle_sighup_reload(&reload_runtime);
1232 continue; }
1234
1235 tracing::info!(
1236 signal = signal_name,
1237 "lifecycle signal received; shutting down"
1238 );
1239 match runtime.graceful_shutdown(backup_on_shutdown) {
1240 Ok(report) => {
1241 tracing::info!(
1242 duration_ms = report.duration_ms,
1243 flushed_wal = report.flushed_wal,
1244 final_checkpoint = report.final_checkpoint,
1245 backup_uploaded = report.backup_uploaded,
1246 "graceful shutdown complete"
1247 );
1248 }
1249 Err(err) => {
1250 tracing::error!(error = %err, "graceful shutdown failed");
1251 crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
1257 reason: format!("graceful shutdown failed: {err}"),
1258 }
1259 .emit_global();
1260 }
1261 }
1262 std::process::exit(0);
1263 }
1264 });
1265 }
1266
1267 #[cfg(not(unix))]
1268 {
1269 tokio::spawn(async move {
1270 let interrupted = tokio::signal::ctrl_c().await;
1271 if let Err(err) = interrupted {
1272 tracing::warn!(error = %err, "could not install Ctrl+C handler");
1273 return;
1274 }
1275
1276 tracing::info!(
1277 signal = "Ctrl+C",
1278 "lifecycle signal received; shutting down"
1279 );
1280 match runtime.graceful_shutdown(backup_on_shutdown) {
1281 Ok(report) => {
1282 tracing::info!(
1283 duration_ms = report.duration_ms,
1284 flushed_wal = report.flushed_wal,
1285 final_checkpoint = report.final_checkpoint,
1286 backup_uploaded = report.backup_uploaded,
1287 "graceful shutdown complete"
1288 );
1289 }
1290 Err(err) => {
1291 tracing::error!(error = %err, "graceful shutdown failed");
1292 }
1293 }
1294 std::process::exit(0);
1295 });
1296 }
1297}
1298
1299fn handle_sighup_reload(runtime: &RedDBRuntime) {
1308 let now_ms = std::time::SystemTime::now()
1309 .duration_since(std::time::UNIX_EPOCH)
1310 .map(|d| d.as_millis() as u64)
1311 .unwrap_or(0);
1312 tracing::info!(
1313 target: "reddb::secrets",
1314 ts_unix_ms = now_ms,
1315 "SIGHUP received; secrets will be re-read from *_FILE on next access"
1316 );
1317 use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1322 runtime.audit_log().record_event(
1323 AuditEvent::builder("config/sighup_reload")
1324 .source(AuditAuthSource::System)
1325 .resource("secrets")
1326 .outcome(Outcome::Success)
1327 .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1328 .build(),
1329 );
1330}
1331
/// Run the single-port protocol router on `router_bind_addr`.
///
/// HTTP, gRPC and RedWire backends are each bound to an ephemeral loopback
/// port (`127.0.0.1:0`). `serve_tcp_router` is then handed those backend
/// addresses. Startup order:
///
/// 1. build the database runtime and auth store, spawn backup tasks and the
///    optional admin/metrics listeners;
/// 2. start the HTTP backend on a background thread and wait 100 ms. If that
///    thread has already finished, its outcome is returned as the error;
/// 3. build a multi-threaded Tokio runtime. Inside it, install the lifecycle
///    signal handler, spawn the gRPC and RedWire backends, and serve the
///    router until it returns.
///
/// NOTE(review): this function does not call `spawn_wire_listeners`,
/// `spawn_pg_listener`, `spawn_grpc_tls_listener_if_configured` or
/// `spawn_http_tls_listener`. The pg, public wire and TLS settings in
/// `config` therefore appear to be ignored in routed mode; confirm that is
/// intended.
#[inline(never)]
fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
    let workers = config.workers;
    let cli_telemetry = config.telemetry.clone();
    let db_options = config.to_db_options()?;
    let rt_config = detect_runtime_config();
    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
    let (runtime, auth_store, _telemetry_guard) =
        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);

    spawn_admin_metrics_listeners(&runtime, &auth_store);

    // Internal HTTP backend: ephemeral loopback port, only reachable via the router.
    let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
        .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
    let http_backend = http_listener
        .local_addr()
        .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
    let http_server = build_http_server(
        runtime.clone(),
        auth_store.clone(),
        http_backend.to_string(),
    );
    let http_server = apply_http_limits(http_server, &config, &runtime);
    let http_handle = http_server.serve_in_background_on(http_listener);

    // Heuristic fail-fast probe. If the backend thread already exited, report
    // why instead of routing traffic to a dead port.
    thread::sleep(Duration::from_millis(100));
    if http_handle.is_finished() {
        return match http_handle.join() {
            Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
            Ok(Err(err)) => Err(err.to_string()),
            Err(_) => Err("HTTP backend thread panicked".to_string()),
        };
    }

    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(worker_threads)
        .thread_stack_size(rt_config.stack_size)
        .build()
        .map_err(|err| format!("tokio runtime: {err}"))?;

    let signal_runtime = runtime.clone();
    tokio_runtime.block_on(async move {
        spawn_lifecycle_signal_handler(signal_runtime).await;
        // Internal gRPC backend (plaintext, loopback only).
        let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
            .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
        let grpc_backend = grpc_listener
            .local_addr()
            .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
        let grpc_server = RedDBGrpcServer::with_options(
            runtime.clone(),
            GrpcServerOptions {
                bind_addr: grpc_backend.to_string(),
                tls: None,
            },
            auth_store,
        );
        tokio::spawn(async move {
            if let Err(err) = grpc_server.serve_on(grpc_listener).await {
                tracing::error!(err = %err, "gRPC backend error");
            }
        });

        // Internal RedWire backend (loopback only).
        let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .map_err(|err| format!("bind internal wire listener: {err}"))?;
        let wire_backend = wire_listener
            .local_addr()
            .map_err(|err| format!("inspect internal wire listener: {err}"))?;
        let wire_rt = Arc::new(runtime);
        tokio::spawn(async move {
            if let Err(err) =
                crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
                    .await
            {
                tracing::error!(err = %err, "redwire backend error");
            }
        });

        tracing::info!(
            bind = %router_bind_addr,
            cpus = rt_config.available_cpus,
            workers = worker_threads,
            "router bootstrapping"
        );
        // Blocks until the public router exits; backend errors are only logged.
        serve_tcp_router(TcpProtocolRouterConfig {
            bind_addr: router_bind_addr,
            grpc_backend,
            http_backend,
            wire_backend,
        })
        .await
        .map_err(|err| err.to_string())
    })
}
1428
1429async fn spawn_wire_listeners(
1431 config: &ServerCommandConfig,
1432 runtime: &RedDBRuntime,
1433 readiness: &mut TransportReadiness,
1434) -> Result<(), String> {
1435 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1437 let wire_rt = Arc::new(runtime.clone());
1438 #[cfg(unix)]
1441 {
1442 if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1443 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1444 tokio::spawn(async move {
1445 if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1446 &wire_addr, wire_rt,
1447 )
1448 .await
1449 {
1450 tracing::error!(err = %e, "redwire unix listener error");
1451 }
1452 });
1453 return Ok(());
1454 }
1455 }
1456 match tokio::net::TcpListener::bind(&wire_addr).await {
1457 Ok(listener) => {
1458 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1459 tokio::spawn(async move {
1460 if let Err(e) =
1461 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1462 .await
1463 {
1464 tracing::error!(err = %e, "redwire listener error");
1465 }
1466 });
1467 }
1468 Err(err) => {
1469 let reason = format!("wire listener bind {wire_addr}: {err}");
1470 readiness.failed(
1471 "wire",
1472 &wire_addr,
1473 config.wire_bind_explicit,
1474 reason.clone(),
1475 );
1476 if config.wire_bind_explicit {
1477 tracing::error!(
1478 transport = "wire",
1479 bind = %wire_addr,
1480 error = %err,
1481 "fatal explicit bind failure"
1482 );
1483 return Err(format!("explicit {reason}"));
1484 }
1485 tracing::warn!(
1486 transport = "wire",
1487 bind = %wire_addr,
1488 error = %err,
1489 "non-fatal implicit bind failure; listener degraded"
1490 );
1491 }
1492 }
1493 }
1494
1495 if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1497 let tls_config = resolve_wire_tls_config(config);
1498 match tls_config {
1499 Ok(tls_cfg) => {
1500 let wire_rt = Arc::new(runtime.clone());
1501 tokio::spawn(async move {
1502 if let Err(e) =
1503 crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1504 .await
1505 {
1506 tracing::error!(err = %e, "redwire+tls listener error");
1507 }
1508 });
1509 }
1510 Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1511 }
1512 }
1513 Ok(())
1514}
1515
1516fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1523 if let Some(pg_addr) = config.pg_bind_addr.clone() {
1524 let rt = Arc::new(runtime.clone());
1525 tokio::spawn(async move {
1526 let cfg = crate::wire::PgWireConfig {
1527 bind_addr: pg_addr,
1528 ..Default::default()
1529 };
1530 if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1531 tracing::error!(err = %e, "pg wire listener error");
1532 }
1533 });
1534 }
1535}
1536
1537fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1551 use crate::utils::secret_file::expand_file_env;
1552
1553 for var in [
1557 "REDDB_GRPC_TLS_CERT",
1558 "REDDB_GRPC_TLS_KEY",
1559 "REDDB_GRPC_TLS_CLIENT_CA",
1560 ] {
1561 if let Err(err) = expand_file_env(var) {
1562 tracing::warn!(
1563 target: "reddb::secrets",
1564 env = %var,
1565 err = %err,
1566 "could not expand *_FILE companion for gRPC TLS"
1567 );
1568 }
1569 }
1570
1571 let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1572 (Some(cert), Some(key)) => {
1573 let cert_pem = std::fs::read(cert)
1574 .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1575 let key_pem =
1576 std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1577 (cert_pem, key_pem)
1578 }
1579 _ => {
1580 let dev = std::env::var("RED_GRPC_TLS_DEV")
1582 .ok()
1583 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1584 .unwrap_or(false);
1585 if !dev {
1586 return Err("gRPC TLS configured but no cert/key supplied — set \
1587 REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1588 RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1589 .to_string());
1590 }
1591 let dir = config
1592 .path
1593 .as_ref()
1594 .and_then(|p| p.parent())
1595 .map(PathBuf::from)
1596 .unwrap_or_else(|| PathBuf::from("."));
1597 let (cert_pem_str, key_pem_str) =
1598 crate::wire::tls::generate_self_signed_cert("localhost")
1599 .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1600
1601 let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1606 tracing::warn!(
1607 target: "reddb::security",
1608 transport = "grpc",
1609 cert_sha256 = %fp,
1610 "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1611 DO NOT use in production"
1612 );
1613 let cert_path = dir.join("grpc-tls-cert.pem");
1615 let key_path = dir.join("grpc-tls-key.pem");
1616 if !cert_path.exists() || !key_path.exists() {
1617 let _ = std::fs::create_dir_all(&dir);
1618 std::fs::write(&cert_path, cert_pem_str.as_bytes())
1619 .map_err(|e| format!("write grpc dev cert: {e}"))?;
1620 std::fs::write(&key_path, key_pem_str.as_bytes())
1621 .map_err(|e| format!("write grpc dev key: {e}"))?;
1622 #[cfg(unix)]
1623 {
1624 use std::os::unix::fs::PermissionsExt;
1625 let _ =
1626 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1627 }
1628 }
1629 (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1630 }
1631 };
1632
1633 let client_ca_pem = match &config.grpc_tls_client_ca {
1634 Some(path) => Some(
1635 std::fs::read(path)
1636 .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1637 ),
1638 None => None,
1639 };
1640
1641 Ok(crate::GrpcTlsOptions {
1642 cert_pem,
1643 key_pem,
1644 client_ca_pem,
1645 })
1646}
1647
1648fn spawn_grpc_tls_listener_if_configured(
1652 config: &ServerCommandConfig,
1653 runtime: RedDBRuntime,
1654 auth_store: Arc<AuthStore>,
1655) {
1656 let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1657 return;
1658 };
1659 let tls_opts = match resolve_grpc_tls_options(config) {
1660 Ok(opts) => opts,
1661 Err(err) => {
1662 tracing::error!(
1663 target: "reddb::security",
1664 transport = "grpc",
1665 err = %err,
1666 "gRPC TLS config error; TLS listener will not start"
1667 );
1668 return;
1669 }
1670 };
1671 tokio::spawn(async move {
1672 let server = RedDBGrpcServer::with_options(
1673 runtime,
1674 GrpcServerOptions {
1675 bind_addr: tls_bind.clone(),
1676 tls: Some(tls_opts),
1677 },
1678 auth_store,
1679 );
1680 tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1681 if let Err(err) = server.serve().await {
1682 tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1683 }
1684 });
1685}
1686
1687fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1690 use sha2::{Digest, Sha256};
1691 let mut h = Sha256::new();
1692 h.update(pem);
1693 let d = h.finalize();
1694 let mut buf = String::with_capacity(64);
1695 for b in d.iter() {
1696 buf.push_str(&format!("{b:02x}"));
1697 }
1698 buf
1699}
1700
1701fn resolve_wire_tls_config(
1703 config: &ServerCommandConfig,
1704) -> Result<crate::wire::WireTlsConfig, String> {
1705 match (&config.wire_tls_cert, &config.wire_tls_key) {
1706 (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1707 cert_path: cert.clone(),
1708 key_path: key.clone(),
1709 }),
1710 _ => {
1711 let dir = config
1713 .path
1714 .as_ref()
1715 .and_then(|p| p.parent())
1716 .map(PathBuf::from)
1717 .unwrap_or_else(|| PathBuf::from("."));
1718 crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1719 }
1720 }
1721}
1722
1723#[inline(never)]
1724fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1725 let rt_config = detect_runtime_config();
1726 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1727 let cli_telemetry = config.telemetry.clone();
1728 let db_options = config.to_db_options()?;
1729 let mut transport_readiness = TransportReadiness::default();
1730
1731 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1732 .enable_all()
1733 .worker_threads(workers)
1734 .thread_stack_size(rt_config.stack_size)
1735 .build()
1736 .map_err(|err| format!("tokio runtime: {err}"))?;
1737
1738 let (runtime, _auth_store, _telemetry_guard) =
1742 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1743 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1744 let signal_runtime = runtime.clone();
1745 tokio_runtime.block_on(async move {
1746 spawn_lifecycle_signal_handler(signal_runtime).await;
1747 spawn_pg_listener(&config, &runtime);
1748 let wire_rt = Arc::new(runtime);
1749 let listener = tokio::net::TcpListener::bind(&wire_addr)
1750 .await
1751 .map_err(|err| {
1752 let reason = format!("wire listener bind {wire_addr}: {err}");
1753 transport_readiness.failed(
1754 "wire",
1755 &wire_addr,
1756 config.wire_bind_explicit,
1757 reason.clone(),
1758 );
1759 if config.wire_bind_explicit {
1760 format!("explicit {reason}")
1761 } else {
1762 reason
1763 }
1764 })?;
1765 transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1766 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1767 .await
1768 .map_err(|e| e.to_string())
1769 })
1770}
1771
1772#[inline(never)]
1773fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1774 let rt_config = detect_runtime_config();
1775 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1776 let cli_telemetry = config.telemetry.clone();
1777 let db_options = config.to_db_options()?;
1778
1779 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1780 .enable_all()
1781 .worker_threads(workers)
1782 .thread_stack_size(rt_config.stack_size)
1783 .build()
1784 .map_err(|err| format!("tokio runtime: {err}"))?;
1785
1786 let (runtime, _auth_store, _telemetry_guard) =
1787 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1788 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1789 let signal_runtime = runtime.clone();
1790 tokio_runtime.block_on(async move {
1791 spawn_lifecycle_signal_handler(signal_runtime).await;
1792 let cfg = crate::wire::PgWireConfig {
1793 bind_addr: pg_addr,
1794 ..Default::default()
1795 };
1796 crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1797 .await
1798 .map_err(|e| e.to_string())
1799 })
1800}
1801
1802#[inline(never)]
1803fn build_runtime_and_auth_store(
1804 db_options: RedDBOptions,
1805 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1806) -> Result<
1807 (
1808 RedDBRuntime,
1809 Arc<AuthStore>,
1810 Option<crate::telemetry::TelemetryGuard>,
1811 ),
1812 String,
1813> {
1814 build_runtime_with_telemetry(db_options, cli_telemetry)
1821}
1822
1823pub(crate) fn build_runtime_with_telemetry(
1833 db_options: RedDBOptions,
1834 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1835) -> Result<
1836 (
1837 RedDBRuntime,
1838 Arc<AuthStore>,
1839 Option<crate::telemetry::TelemetryGuard>,
1840 ),
1841 String,
1842> {
1843 let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1844 let msg = err.to_string();
1850 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1851 phase: "runtime_construction".to_string(),
1852 error: msg.clone(),
1853 }
1854 .emit_global();
1855 msg
1856 })?;
1857
1858 crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1863 let msg = err.to_string();
1864 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1865 phase: "lease_loop".to_string(),
1866 error: msg.clone(),
1867 }
1868 .emit_global();
1869 msg
1870 })?;
1871
1872 if let Some(data_path) = db_options.data_path.as_deref() {
1876 let watch_dir = data_path.parent().unwrap_or(data_path);
1877 crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1878 }
1879
1880 {
1884 let config_path = crate::runtime::config_overlay::config_file_path();
1885 let store = runtime.db().store();
1886 crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1887 }
1888
1889 let merged = merge_telemetry_with_config(
1892 cli_telemetry
1893 .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1894 &runtime,
1895 );
1896 let telemetry_guard = crate::telemetry::init(merged);
1897
1898 let auth_store =
1899 if db_options.auth.vault_enabled {
1900 let pager =
1901 runtime.db().store().pager().cloned().ok_or_else(|| {
1902 "vault requires a paged database (persistent mode)".to_string()
1903 })?;
1904 let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1905 .map_err(|err| err.to_string())?;
1906 Arc::new(store)
1907 } else {
1908 Arc::new(AuthStore::new(db_options.auth.clone()))
1909 };
1910 auth_store.bootstrap_from_env();
1911
1912 {
1914 let store = Arc::clone(&auth_store);
1915 std::thread::Builder::new()
1916 .name("reddb-session-purge".into())
1917 .spawn(move || loop {
1918 std::thread::sleep(std::time::Duration::from_secs(300));
1919 store.purge_expired_sessions();
1920 })
1921 .ok();
1922 }
1923
1924 Ok((runtime, auth_store, telemetry_guard))
1925}
1926
1927fn merge_telemetry_with_config(
1938 mut cli: crate::telemetry::TelemetryConfig,
1939 runtime: &RedDBRuntime,
1940) -> crate::telemetry::TelemetryConfig {
1941 use crate::storage::schema::Value;
1942
1943 let store = runtime.db().store();
1944
1945 if !cli.level_explicit {
1946 if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
1947 cli.level_filter = v.to_string();
1948 }
1949 }
1950 if !cli.format_explicit {
1951 if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
1952 if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
1953 cli.format = parsed;
1954 }
1955 }
1956 }
1957 if !cli.rotation_keep_days_explicit {
1958 match store.get_config("red.logging.keep_days") {
1959 Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
1960 cli.rotation_keep_days = n as u16
1961 }
1962 Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
1963 cli.rotation_keep_days = n as u16
1964 }
1965 Some(Value::Text(v)) => {
1966 if let Ok(n) = v.parse::<u16>() {
1967 cli.rotation_keep_days = n;
1968 }
1969 }
1970 _ => {}
1971 }
1972 }
1973 if !cli.file_prefix_explicit {
1974 if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
1975 if !v.is_empty() {
1976 cli.file_prefix = v.to_string();
1977 }
1978 }
1979 }
1980 if !cli.log_dir_explicit && !cli.log_file_disabled {
1983 if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
1984 if !v.is_empty() {
1985 cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
1986 }
1987 }
1988 }
1989
1990 cli
1991}
1992
#[cfg(test)]
mod telemetry_merge_tests {
    //! Precedence tests for `merge_telemetry_with_config`. Explicit CLI flags
    //! beat values stored under `red.logging.*`, stored values beat
    //! non-explicit CLI defaults, and disabling the log file vetoes a stored
    //! log directory.
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};

    /// In-memory runtime, so each test gets its own config store.
    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
    }

    /// Store `value` as a JSON string under the dotted config `key`.
    fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
        runtime
            .db()
            .store()
            .set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
    }

    /// CLI telemetry as if no logging flags were given: a default log dir and
    /// JSON format. All other fields, including every `*_explicit` flag, come
    /// from `Default`.
    fn cli_base() -> TelemetryConfig {
        TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/var/log/reddb"))
        );
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let mut cli = cli_base();
        cli.log_dir = Some(std::path::PathBuf::from("/custom/dir"));
        cli.log_dir_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/custom/dir"))
        );
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let mut cli = cli_base();
        cli.log_dir = None;
        cli.log_file_disabled = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert!(
            merged.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        let runtime = fresh_runtime();
        // `cli_base` leaves `format_explicit` unset, so the stored value applies.
        set_str(&runtime, "red.logging.format", "pretty");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.format", "pretty");
        let mut cli = cli_base();
        cli.format = LogFormat::Json;
        cli.format_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.format, LogFormat::Json);
    }
}
2080
2081#[inline(never)]
2082fn build_http_server(
2083 runtime: RedDBRuntime,
2084 auth_store: Arc<AuthStore>,
2085 bind_addr: String,
2086) -> RedDBServer {
2087 build_http_server_with_transport_readiness(
2088 runtime,
2089 auth_store,
2090 bind_addr,
2091 TransportReadiness::default(),
2092 )
2093}
2094
2095fn apply_http_limits(
2101 server: RedDBServer,
2102 config: &ServerCommandConfig,
2103 runtime: &RedDBRuntime,
2104) -> RedDBServer {
2105 let store = runtime.db().store();
2106 let resolved =
2107 crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2108 .get_config(key)
2109 {
2110 Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2111 Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2112 Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2113 _ => None,
2114 });
2115 tracing::info!(
2116 target: "reddb::http_limits",
2117 max_handlers = resolved.max_handlers,
2118 handler_timeout_ms = resolved.handler_timeout_ms,
2119 retry_after_secs = resolved.retry_after_secs,
2120 "http_limits resolved"
2121 );
2122 server.with_http_limits(resolved)
2123}
2124
2125#[inline(never)]
2126fn build_http_server_with_transport_readiness(
2127 runtime: RedDBRuntime,
2128 auth_store: Arc<AuthStore>,
2129 bind_addr: String,
2130 transport_readiness: TransportReadiness,
2131) -> RedDBServer {
2132 RedDBServer::with_options(
2133 runtime,
2134 ServerOptions {
2135 bind_addr,
2136 transport_readiness,
2137 ..ServerOptions::default()
2138 },
2139 )
2140 .with_auth(auth_store)
2141}
2142
2143#[inline(never)]
2147fn build_admin_only_server(
2148 runtime: RedDBRuntime,
2149 auth_store: Arc<AuthStore>,
2150 bind_addr: String,
2151) -> RedDBServer {
2152 RedDBServer::with_options(
2153 runtime,
2154 ServerOptions {
2155 bind_addr,
2156 surface: crate::server::ServerSurface::AdminOnly,
2157 ..ServerOptions::default()
2158 },
2159 )
2160 .with_auth(auth_store)
2161}
2162
2163#[inline(never)]
2167fn build_metrics_only_server(
2168 runtime: RedDBRuntime,
2169 auth_store: Arc<AuthStore>,
2170 bind_addr: String,
2171) -> RedDBServer {
2172 RedDBServer::with_options(
2173 runtime,
2174 ServerOptions {
2175 bind_addr,
2176 surface: crate::server::ServerSurface::MetricsOnly,
2177 ..ServerOptions::default()
2178 },
2179 )
2180 .with_auth(auth_store)
2181}
2182
2183fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2187 if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2188 let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2189 let _ = server.serve_in_background();
2190 tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2191 }
2192 if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2193 let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2194 let _ = server.serve_in_background();
2195 tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2196 }
2197}
2198
/// Run the HTTP server on `bind_addr`, blocking the calling thread until it exits.
///
/// The listener is bound before any database work. `bind_listener_for_startup`
/// propagates an error through `?`, and its `None` case is reported below as a
/// failed implicit bind. Then the runtime, backup tasks, optional admin/metrics
/// listeners and optional HTTPS listener are started. Finally the main server
/// is served with the collected `transport_readiness` and the resolved HTTP
/// limits.
///
/// NOTE(review): this path builds no Tokio runtime and never calls
/// `spawn_lifecycle_signal_handler`, so the graceful SIGTERM/SIGINT shutdown
/// used by the other modes does not seem to be installed here. The wire and pg
/// listeners are also not started. Confirm this is intended.
#[inline(never)]
fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
    let cli_telemetry = config.telemetry.clone();
    let mut transport_readiness = TransportReadiness::default();
    let Some(listener) = bind_listener_for_startup(
        &mut transport_readiness,
        "http",
        &bind_addr,
        config.http_bind_explicit,
    )?
    else {
        return Err(format!(
            "no HTTP listener started; implicit bind {} failed",
            bind_addr
        ));
    };
    let db_options = config.to_db_options()?;
    let (runtime, auth_store, _telemetry_guard) =
        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
    spawn_admin_metrics_listeners(&runtime, &auth_store);
    // A misconfigured HTTPS listener aborts startup (error propagated via `?`).
    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
    let server = build_http_server_with_transport_readiness(
        runtime.clone(),
        auth_store,
        bind_addr.clone(),
        transport_readiness,
    );
    let server = apply_http_limits(server, &config, &runtime);
    tracing::info!(transport = "http", bind = %bind_addr, "listener online");
    server.serve_on(listener).map_err(|err| err.to_string())
}
2231
2232fn spawn_http_tls_listener(
2238 config: &ServerCommandConfig,
2239 runtime: &RedDBRuntime,
2240 auth_store: &Arc<AuthStore>,
2241) -> Result<(), String> {
2242 let Some(addr) = config.http_tls_bind_addr.clone() else {
2243 return Ok(());
2244 };
2245
2246 let tls_config = resolve_http_tls_config(config)?;
2247 let server_config = crate::server::tls::build_server_config(&tls_config)
2248 .map_err(|err| format!("HTTP TLS: {err}"))?;
2249
2250 let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
2251 let server = apply_http_limits(server, config, runtime);
2252 let _handle = server.serve_tls_in_background(server_config);
2253 tracing::info!(
2254 transport = "https",
2255 bind = %addr,
2256 mtls = %tls_config.client_ca_path.is_some(),
2257 "TLS listener online"
2258 );
2259 Ok(())
2260}
2261
2262fn resolve_http_tls_config(
2264 config: &ServerCommandConfig,
2265) -> Result<crate::server::tls::HttpTlsConfig, String> {
2266 match (&config.http_tls_cert, &config.http_tls_key) {
2267 (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
2268 cert_path: cert.clone(),
2269 key_path: key.clone(),
2270 client_ca_path: config.http_tls_client_ca.clone(),
2271 }),
2272 (None, None) => {
2273 let dir = config
2275 .path
2276 .as_ref()
2277 .and_then(|p| p.parent().map(std::path::PathBuf::from))
2278 .unwrap_or_else(|| std::path::PathBuf::from("."));
2279 let auto = crate::server::tls::auto_generate_dev_cert(&dir)
2280 .map_err(|err| format!("HTTP TLS dev: {err}"))?;
2281 Ok(crate::server::tls::HttpTlsConfig {
2282 cert_path: auto.cert_path,
2283 key_path: auto.key_path,
2284 client_ca_path: config.http_tls_client_ca.clone(),
2285 })
2286 }
2287 _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
2288 }
2289}
2290
/// Run the gRPC server on `bind_addr`, blocking until it exits.
///
/// The public listener is bound first. A failed implicit bind (the `None`
/// case) is reported as an error, and `bind_listener_for_startup` propagates
/// other errors via `?`. Then the Tokio runtime and the database runtime are
/// built and backup tasks spawned.
///
/// Inside the runtime, this function:
/// 1. installs the lifecycle signal handler;
/// 2. starts the optional wire listeners (an explicit wire bind failure aborts
///    startup), the pg listener and the gRPC TLS listener;
/// 3. serves plaintext gRPC on the pre-bound listener.
///
/// NOTE(review): unlike `run_http_server`, this path does not call
/// `spawn_admin_metrics_listeners` or `spawn_http_tls_listener`, so
/// `RED_ADMIN_BIND` / `RED_METRICS_BIND` appear to be ignored in gRPC-only
/// mode. `transport_readiness` is also updated but not handed to any consumer
/// here. Confirm both are intended.
#[inline(never)]
fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
    let workers = config.workers;
    let cli_telemetry = config.telemetry.clone();
    let db_options = config.to_db_options()?;
    let rt_config = detect_runtime_config();
    let mut transport_readiness = TransportReadiness::default();
    let Some(grpc_listener) = bind_listener_for_startup(
        &mut transport_readiness,
        "grpc",
        &bind_addr,
        config.grpc_bind_explicit,
    )?
    else {
        return Err(format!(
            "no gRPC listener started; implicit bind {} failed",
            bind_addr
        ));
    };

    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);

    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(worker_threads)
        .thread_stack_size(rt_config.stack_size)
        .build()
        .map_err(|err| format!("tokio runtime: {err}"))?;

    let (runtime, auth_store, _telemetry_guard) =
        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
    let signal_runtime = runtime.clone();
    tokio_runtime.block_on(async move {
        spawn_lifecycle_signal_handler(signal_runtime).await;
        // Must run inside the runtime: the listeners are spawned as Tokio tasks.
        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;

        spawn_pg_listener(&config, &runtime);

        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());

        // Plaintext gRPC on the listener bound at the top of this function.
        let server = RedDBGrpcServer::with_options(
            runtime,
            GrpcServerOptions {
                bind_addr: bind_addr.clone(),
                tls: None,
            },
            auth_store,
        );

        tracing::info!(
            transport = "grpc",
            bind = %bind_addr,
            cpus = rt_config.available_cpus,
            workers = worker_threads,
            "listener online"
        );
        server
            .serve_on(grpc_listener)
            .await
            .map_err(|err| err.to_string())
    })
}
2360
2361#[inline(never)]
2362fn run_dual_server(
2363 config: ServerCommandConfig,
2364 grpc_bind_addr: String,
2365 http_bind_addr: String,
2366) -> Result<(), String> {
2367 let workers = config.workers;
2368 let cli_telemetry = config.telemetry.clone();
2369 let db_options = config.to_db_options()?;
2370 let rt_config = detect_runtime_config();
2371 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2372 let mut transport_readiness = TransportReadiness::default();
2373 let http_listener = bind_listener_for_startup(
2374 &mut transport_readiness,
2375 "http",
2376 &http_bind_addr,
2377 config.http_bind_explicit,
2378 )?;
2379 let grpc_listener = bind_listener_for_startup(
2380 &mut transport_readiness,
2381 "grpc",
2382 &grpc_bind_addr,
2383 config.grpc_bind_explicit,
2384 )?;
2385 if http_listener.is_none() && grpc_listener.is_none() {
2386 return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2387 }
2388 let (runtime, auth_store, _telemetry_guard) =
2389 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2390 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2391
2392 spawn_admin_metrics_listeners(&runtime, &auth_store);
2393 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2394
2395 let http_handle = if let Some(listener) = http_listener {
2396 let http_server = build_http_server_with_transport_readiness(
2397 runtime.clone(),
2398 auth_store.clone(),
2399 http_bind_addr.clone(),
2400 transport_readiness.clone(),
2401 );
2402 let http_server = apply_http_limits(http_server, &config, &runtime);
2403 Some(http_server.serve_in_background_on(listener))
2404 } else {
2405 None
2406 };
2407
2408 thread::sleep(Duration::from_millis(150));
2409 if let Some(handle) = http_handle.as_ref() {
2410 if handle.is_finished() {
2411 let handle = http_handle.unwrap();
2412 return match handle.join() {
2413 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2414 Ok(Err(err)) => Err(err.to_string()),
2415 Err(_) => Err("HTTP server thread panicked".to_string()),
2416 };
2417 }
2418 }
2419 if grpc_listener.is_none() {
2420 let Some(handle) = http_handle else {
2421 return Err("no listener started".to_string());
2422 };
2423 return match handle.join() {
2424 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2425 Ok(Err(err)) => Err(err.to_string()),
2426 Err(_) => Err("HTTP server thread panicked".to_string()),
2427 };
2428 }
2429 let grpc_listener = grpc_listener.expect("checked above");
2430
2431 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2432 .enable_all()
2433 .worker_threads(worker_threads)
2434 .thread_stack_size(rt_config.stack_size)
2435 .build()
2436 .map_err(|err| format!("tokio runtime: {err}"))?;
2437
2438 let signal_runtime = runtime.clone();
2439 tokio_runtime.block_on(async move {
2440 spawn_lifecycle_signal_handler(signal_runtime).await;
2441 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2443
2444 spawn_pg_listener(&config, &runtime);
2446
2447 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2449
2450 let server = RedDBGrpcServer::with_options(
2451 runtime,
2452 GrpcServerOptions {
2453 bind_addr: grpc_bind_addr.clone(),
2454 tls: None,
2455 },
2456 auth_store,
2457 );
2458
2459 tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2460 tracing::info!(
2461 transport = "grpc",
2462 bind = %grpc_bind_addr,
2463 cpus = rt_config.available_cpus,
2464 workers = worker_threads,
2465 "listener online"
2466 );
2467 server
2468 .serve_on(grpc_listener)
2469 .await
2470 .map_err(|err| err.to_string())
2471 })
2472}
2473
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `SystemdServiceConfig` for the tests below.
    ///
    /// The binary path, run user and run group are shared by every test. Only
    /// the fields that differ between tests are parameters.
    fn systemd_config(
        service_name: &str,
        data_path: &str,
        router_bind_addr: Option<&str>,
        grpc_bind_addr: Option<&str>,
        http_bind_addr: Option<&str>,
    ) -> SystemdServiceConfig {
        SystemdServiceConfig {
            service_name: service_name.to_string(),
            binary_path: PathBuf::from("/usr/local/bin/red"),
            run_user: "reddb".to_string(),
            run_group: "reddb".to_string(),
            data_path: PathBuf::from(data_path),
            router_bind_addr: router_bind_addr.map(str::to_string),
            grpc_bind_addr: grpc_bind_addr.map(str::to_string),
            http_bind_addr: http_bind_addr.map(str::to_string),
        }
    }

    /// Occupies an ephemeral loopback port and returns the listener with its address.
    ///
    /// The returned listener must stay alive for the whole test, so that a
    /// second bind on the same address collides.
    fn occupied_loopback_addr() -> (TcpListener, String) {
        let guard = TcpListener::bind("127.0.0.1:0").expect("hold test port");
        let addr = guard.local_addr().expect("held addr").to_string();
        (guard, addr)
    }

    #[test]
    fn render_systemd_unit_contains_expected_execstart() {
        let config = systemd_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            None,
        );
        let expected_exec_start =
            "ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555";

        let rendered = render_systemd_unit(&config);

        assert!(rendered.contains(expected_exec_start));
        assert!(rendered.contains("ReadWritePaths=/var/lib/reddb"));
    }

    #[test]
    fn systemd_service_config_derives_paths() {
        let config = systemd_config(
            "reddb-api",
            "/srv/reddb/live/data.rdb",
            None,
            None,
            Some("127.0.0.1:5055"),
        );

        let expected_unit = PathBuf::from("/etc/systemd/system/reddb-api.service");
        assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
        assert_eq!(config.unit_path(), expected_unit);
    }

    #[test]
    fn render_systemd_unit_supports_dual_transport() {
        let config = systemd_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            Some("0.0.0.0:5055"),
        );

        let rendered = render_systemd_unit(&config);

        for expected in ["--grpc-bind 0.0.0.0:5555", "--http-bind 0.0.0.0:5055"] {
            assert!(rendered.contains(expected));
        }
    }

    #[test]
    fn render_systemd_unit_supports_router_mode() {
        let config = systemd_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            Some(DEFAULT_ROUTER_BIND_ADDR),
            None,
            None,
        );

        let rendered = render_systemd_unit(&config);

        assert!(rendered.contains("--bind 127.0.0.1:5050"));
        for unexpected in ["--grpc-bind", "--http-bind"] {
            assert!(!rendered.contains(unexpected));
        }
    }

    #[test]
    fn explicit_bind_collision_is_fatal() {
        let (_guard, addr) = occupied_loopback_addr();
        let mut readiness = TransportReadiness::default();

        let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();

        assert!(error.contains("explicit http listener bind"));
        assert_eq!(readiness.active.len(), 0);
        assert_eq!(readiness.failed.len(), 1);
        let failure = &readiness.failed[0];
        assert!(failure.explicit);
        assert_eq!(failure.bind_addr, addr);
    }

    #[test]
    fn implicit_bind_collision_degrades() {
        let (_guard, addr) = occupied_loopback_addr();
        let mut readiness = TransportReadiness::default();

        let outcome =
            bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");

        assert!(outcome.is_none());
        assert_eq!(readiness.active.len(), 0);
        assert_eq!(readiness.failed.len(), 1);
        let failure = &readiness.failed[0];
        assert!(!failure.explicit);
        assert_eq!(failure.bind_addr, addr);
    }
}