1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13 GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Default bind address for the router listener: loopback, port 5050.
///
/// This is the same address `ServerTransport::Wire.default_bind_addr()` returns.
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
/// Probes the host and derives thread sizing hints.
///
/// The CPU count comes from [`thread::available_parallelism`] and falls back
/// to 1 when the platform cannot report it. When more than one CPU is
/// available one is left unclaimed; at least one worker is always suggested.
pub fn detect_runtime_config() -> RuntimeConfig {
    let available_cpus = thread::available_parallelism().map_or(1, |count| count.get());

    // Never suggest zero workers, even on a single-CPU host.
    let suggested_workers = match available_cpus {
        0 | 1 => 1,
        many => many - 1,
    };

    RuntimeConfig {
        available_cpus,
        suggested_workers,
        stack_size: 8 << 20, // 8 MiB
    }
}

/// Sizing hints produced by [`detect_runtime_config`].
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// CPUs reported by the platform (1 when detection fails).
    pub available_cpus: usize,
    /// `available_cpus - 1`, clamped to a minimum of 1.
    pub suggested_workers: usize,
    /// Stack size hint; `detect_runtime_config` always sets 8 MiB
    /// (presumably bytes, as `thread::Builder::stack_size` expects).
    pub stack_size: usize,
}
40
/// Network protocols the server can expose.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    Grpc,
    Http,
    Wire,
}

impl ServerTransport {
    /// Display label for this transport (`"gRPC"`, `"HTTP"` or `"wire"`).
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Wire => "wire",
            ServerTransport::Http => "HTTP",
            ServerTransport::Grpc => "gRPC",
        }
    }

    /// Loopback address this transport binds to when none is configured.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Wire => "127.0.0.1:5050",
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Grpc => "127.0.0.1:5555",
        }
    }
}
65
/// Everything the `server` command needs to boot: listener addresses, TLS
/// material, storage location and engine/auth switches.
///
/// `to_db_options` turns the storage/auth/replication fields into
/// `RedDBOptions`; the bind-address fields decide which server flavour
/// `run_configured_servers` starts.
#[derive(Debug, Clone)]
pub struct ServerCommandConfig {
    /// Database file path. `Some` selects `RedDBOptions::persistent`, `None`
    /// selects `RedDBOptions::in_memory`.
    pub path: Option<PathBuf>,
    /// Router listener address. When set, `run_configured_servers` dispatches
    /// to the routed server and `enabled_transports` reports all transports.
    pub router_bind_addr: Option<String>,
    /// NOTE(review): presumably `true` when the operator supplied the router
    /// address rather than a default being filled in — confirm against the CLI
    /// parser. The same applies to the other `*_explicit` flags below.
    pub router_bind_explicit: bool,
    /// gRPC listener address.
    pub grpc_bind_addr: Option<String>,
    pub grpc_bind_explicit: bool,
    /// gRPC TLS listener address and its certificate / key / client-CA paths.
    /// Not consumed in the visible part of this file.
    pub grpc_tls_bind_addr: Option<String>,
    pub grpc_tls_cert: Option<PathBuf>,
    pub grpc_tls_key: Option<PathBuf>,
    pub grpc_tls_client_ca: Option<PathBuf>,
    /// HTTP listener address.
    pub http_bind_addr: Option<String>,
    pub http_bind_explicit: bool,
    /// HTTP TLS listener address and its certificate / key / client-CA paths.
    /// Not consumed in the visible part of this file.
    pub http_tls_bind_addr: Option<String>,
    pub http_tls_cert: Option<PathBuf>,
    pub http_tls_key: Option<PathBuf>,
    pub http_tls_client_ca: Option<PathBuf>,
    /// Wire-protocol listener address.
    pub wire_bind_addr: Option<String>,
    pub wire_bind_explicit: bool,
    /// Wire-protocol TLS listener address and its certificate / key paths.
    /// Not consumed in the visible part of this file.
    pub wire_tls_bind_addr: Option<String>,
    pub wire_tls_cert: Option<PathBuf>,
    pub wire_tls_key: Option<PathBuf>,
    /// Listener address for the PG-wire-only server (`run_pg_only_server`).
    pub pg_bind_addr: Option<String>,
    /// Copied verbatim into `RedDBOptions::create_if_missing`.
    pub create_if_missing: bool,
    /// Requests read-only mode. `to_db_options` also turns read-only on when
    /// `RED_READONLY` / `REDDB_READONLY` is truthy or the persisted runtime
    /// flag for `path` says so.
    pub read_only: bool,
    /// Replication role: `"primary"` or `"replica"`; any other value is
    /// treated as standalone.
    pub role: String,
    /// Primary address used when `role` is `"replica"`; defaults to
    /// `http://127.0.0.1:5555` when absent.
    pub primary_addr: Option<String>,
    /// Enables `options.auth.vault_enabled` (overridden by `no_auth`).
    pub vault: bool,
    /// Disables auth, `require_auth` and the vault, and records
    /// `NO_AUTH_META = "true"` in the options metadata.
    pub no_auth: bool,
    /// Worker-count override. Not consumed in the visible part of this file.
    pub workers: Option<usize>,
    /// Telemetry settings. Not consumed in the visible part of this file.
    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
    /// HTTP limit overrides from the CLI. Not consumed in the visible part of
    /// this file.
    pub http_limits_cli: crate::server::HttpLimitsCliInput,
}
146
/// A listener that bound successfully during startup.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerState {
    pub transport: String,
    pub bind_addr: String,
    pub explicit: bool,
}

/// A listener that failed to bind during startup, with the failure text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerFailure {
    pub transport: String,
    pub bind_addr: String,
    pub explicit: bool,
    pub reason: String,
}

/// Startup ledger of which listeners came up and which did not.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TransportReadiness {
    pub active: Vec<TransportListenerState>,
    pub failed: Vec<TransportListenerFailure>,
}

impl TransportReadiness {
    /// Appends a successfully bound listener to `self.active`.
    fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
        let entry = TransportListenerState {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
        };
        self.active.push(entry);
    }

    /// Appends a bind failure, with its reason, to `self.failed`.
    fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
        let entry = TransportListenerFailure {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
            reason,
        };
        self.failed.push(entry);
    }
}
186
/// Inputs for rendering and installing the systemd unit.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix.
    pub service_name: String,
    /// Binary that `ExecStart` launches.
    pub binary_path: PathBuf,
    /// Account and group the service runs as.
    pub run_user: String,
    pub run_group: String,
    /// Database file passed to the server via `--path`.
    pub data_path: PathBuf,
    /// Optional listener addresses forwarded on the command line.
    pub router_bind_addr: Option<String>,
    pub grpc_bind_addr: Option<String>,
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory that contains `data_path`.
    ///
    /// Returns `"."` when `data_path` has no usable parent. That covers both
    /// `parent() == None` (a root or empty path) and the empty parent that
    /// `Path::parent` reports for a bare relative file name such as
    /// `data.rdb`. An empty path would otherwise reach `WorkingDirectory=` and
    /// `install -d` as an empty argument.
    pub fn data_dir(&self) -> PathBuf {
        match self.data_path.parent() {
            Some(parent) if !parent.as_os_str().is_empty() => parent.to_path_buf(),
            _ => PathBuf::from("."),
        }
    }

    /// Location of the unit file: `/etc/systemd/system/<service_name>.service`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
211
212pub fn default_telemetry_for_path(
217 path: Option<&std::path::Path>,
218) -> crate::telemetry::TelemetryConfig {
219 let log_dir = match path {
220 Some(p) => p
221 .parent()
222 .map(|parent| parent.join("logs"))
223 .or_else(|| Some(std::path::PathBuf::from("./logs"))),
224 None => None, };
226 crate::telemetry::TelemetryConfig {
227 log_dir,
228 file_prefix: "reddb.log".to_string(),
229 level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
230 format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
231 crate::telemetry::LogFormat::Pretty
232 } else {
233 crate::telemetry::LogFormat::Json
234 },
235 rotation_keep_days: 14,
236 service_name: "reddb",
237 level_explicit: false,
239 format_explicit: false,
240 rotation_keep_days_explicit: false,
241 file_prefix_explicit: false,
242 log_dir_explicit: false,
243 log_file_disabled: false,
244 }
245}
246
// Metadata keys that carry backup settings from `apply_backup_config`
// (which writes them into `RedDBOptions::metadata`) to
// `spawn_backup_tasks_if_configured` (which reads them back).

/// Seconds between periodic checkpoints.
const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
/// Seconds between periodic WAL archive/backup runs.
const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
/// Backend kind; `apply_backup_config` always records `"s3"`.
const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
/// Archive-lag threshold in seconds; a missing, unparsable or zero value
/// leaves the archive-lag pause disabled.
const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";
260
/// Metadata key that `to_db_options` sets to `"true"` when the server was
/// started with auth disabled; read back by `no_auth_active`.
pub(crate) const NO_AUTH_META: &str = "red.boot.no_auth";
269
270pub(crate) fn no_auth_active(options: &RedDBOptions) -> bool {
273 options
274 .metadata
275 .get(NO_AUTH_META)
276 .map(|v| v == "true")
277 .unwrap_or(false)
278}
279
/// Operator-facing warning text for running with `--no-auth`.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// logged once at startup when `no_auth_active` is true — confirm at the use
/// site.
const NO_AUTH_WARNING: &str =
    "⚠ auth disabled (--no-auth) — anonymous access, do NOT use in production";
286
287impl ServerCommandConfig {
288 fn to_db_options(&self) -> Result<RedDBOptions, String> {
289 let mut options = match &self.path {
290 Some(path) => RedDBOptions::persistent(path),
291 None => RedDBOptions::in_memory(),
292 };
293
294 options.mode = StorageMode::Persistent;
295 options.create_if_missing = self.create_if_missing;
296 options.read_only = self.read_only
303 || env_nonempty("RED_READONLY")
304 .or_else(|| env_nonempty("REDDB_READONLY"))
305 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
306 .unwrap_or(false)
307 || self.path.as_ref().is_some_and(|data_path| {
308 crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
309 data_path,
310 ))
311 .unwrap_or(false)
312 });
313
314 options.replication = match self.role.as_str() {
315 "primary" => ReplicationConfig::primary(),
316 "replica" => {
317 let primary_addr = self
318 .primary_addr
319 .clone()
320 .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
321 ReplicationConfig::replica(primary_addr)
328 }
329 _ => ReplicationConfig::standalone(),
330 };
331
332 if self.vault {
333 options.auth.vault_enabled = true;
334 }
335
336 if self.no_auth {
347 options.auth.enabled = false;
348 options.auth.require_auth = false;
349 options.auth.vault_enabled = false;
350 options
351 .metadata
352 .insert(NO_AUTH_META.to_string(), "true".to_string());
353 }
354
355 if let Some(raw) = env_nonempty("REDDB_COMPLIANCE_MODE") {
360 options.control_events.compliance_mode = matches!(
361 raw.to_ascii_lowercase().as_str(),
362 "1" | "true" | "yes" | "on"
363 );
364 }
365 if env_nonempty(PRESET_ENV).is_some_and(|s| s.trim() == PRESET_REGULATED) {
366 options.control_events.compliance_mode = true;
367 options.query_audit = crate::runtime::query_audit::QueryAuditConfig::regulated();
368 }
369
370 match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
375 Err(msg) => {
376 return Err(format!("backup bootstrap: {msg}"));
377 }
378 Ok(Some(cfg)) => {
379 apply_backup_config(&mut options, &cfg);
380 }
381 Ok(None) => {
382 configure_remote_backend_from_env(&mut options);
383 }
384 }
385
386 Ok(options)
387 }
388
389 pub fn enabled_transports(&self) -> Vec<ServerTransport> {
390 let mut transports = Vec::with_capacity(3);
391 if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
392 transports.push(ServerTransport::Grpc);
393 }
394 if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
395 transports.push(ServerTransport::Http);
396 }
397 if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
398 transports.push(ServerTransport::Wire);
399 }
400 transports
401 }
402}
403
/// Looks up the configuration value `name` by delegating to
/// `crate::utils::env_with_file_fallback`.
///
/// NOTE(review): the names suggest that empty values come back as `None` and
/// that a file-based fallback is consulted, but the helper's body is not
/// visible from this file — confirm there.
fn env_nonempty(name: &str) -> Option<String> {
    crate::utils::env_with_file_fallback(name)
}
411
412fn env_truthy(name: &str) -> bool {
413 env_nonempty(name)
414 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
415 .unwrap_or(false)
416}
417
418fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
424 let endpoint_host = endpoint_host(&cfg.endpoint);
425
426 options.metadata.insert(
427 BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
428 cfg.checkpoint_interval_secs.to_string(),
429 );
430 options.metadata.insert(
431 BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
432 cfg.wal_flush_interval_secs.to_string(),
433 );
434 options
435 .metadata
436 .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
437 options.metadata.insert(
438 BACKUP_PAUSE_ON_LAG_META.to_string(),
439 cfg.pause_on_lag_secs.to_string(),
440 );
441
442 #[cfg(feature = "backend-s3")]
443 {
444 let s3_cfg = crate::storage::backend::S3Config {
445 endpoint: cfg.endpoint.clone(),
446 bucket: cfg.bucket.clone(),
447 key_prefix: cfg.prefix.clone(),
448 access_key: cfg.access_key_id.clone(),
449 secret_key: cfg.secret_access_key.clone(),
450 region: cfg.region.clone(),
451 path_style: true,
452 };
453 let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
454 options.remote_backend = Some(backend.clone());
455 options.remote_backend_atomic = Some(backend);
456 let trimmed = cfg.prefix.trim_end_matches('/');
461 options.remote_key = Some(format!("{}/data.rdb", trimmed));
462
463 tracing::info!(
464 backend = "s3",
465 endpoint = %endpoint_host,
466 bucket = %cfg.bucket,
467 prefix = %cfg.prefix,
468 checkpoint_interval_secs = cfg.checkpoint_interval_secs,
469 wal_flush_interval_secs = cfg.wal_flush_interval_secs,
470 "backup backend configured from REDDB_BACKUP_* env"
471 );
472 }
473
474 #[cfg(not(feature = "backend-s3"))]
475 {
476 tracing::warn!(
477 backend = "s3",
478 endpoint = %endpoint_host,
479 bucket = %cfg.bucket,
480 prefix = %cfg.prefix,
481 "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
482 backend wiring skipped (archiver/checkpointer also disabled)"
483 );
484 }
485}
486
/// Extracts the `host[:port]` part of an endpoint URL for logging.
///
/// The scheme (`scheme://`), any `user:password@` userinfo, and everything
/// from the first `/`, `?` or `#` onwards are removed. Dropping the userinfo
/// matters because the result is written to logs, and credentials embedded in
/// the URL must not end up there. An input without a scheme is treated as
/// starting at the authority.
fn endpoint_host(endpoint: &str) -> &str {
    let after_scheme = endpoint
        .split_once("://")
        .map_or(endpoint, |(_, rest)| rest);

    // The authority ends at the first path, query or fragment delimiter.
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(after_scheme);

    // Userinfo, when present, is everything up to the last `@`.
    authority
        .rsplit_once('@')
        .map_or(authority, |(_, host)| host)
}
494
495fn spawn_backup_tasks_if_configured(
501 options: &RedDBOptions,
502 runtime: &RedDBRuntime,
503) -> Option<BackupTasksHandle> {
504 let checkpoint_secs: u64 = options
505 .metadata
506 .get(BACKUP_INTERVAL_META_CHECKPOINT)?
507 .parse()
508 .ok()?;
509 let wal_secs: u64 = options
510 .metadata
511 .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
512 .parse()
513 .ok()?;
514 let pause_on_lag_secs: u64 = options
517 .metadata
518 .get(BACKUP_PAUSE_ON_LAG_META)
519 .and_then(|raw| raw.parse().ok())
520 .unwrap_or(0);
521 options.remote_backend.as_ref()?;
522
523 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
524
525 if pause_on_lag_secs > 0 {
530 let now_ms = std::time::SystemTime::now()
531 .duration_since(std::time::UNIX_EPOCH)
532 .map(|d| d.as_millis() as u64)
533 .unwrap_or(0);
534 runtime
535 .write_gate()
536 .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
537 tracing::info!(
538 pause_on_lag_secs,
539 "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
540 );
541 }
542
543 let checkpoint_handle = {
544 let stop = Arc::clone(&stop);
545 let runtime = runtime.clone();
546 let interval = Duration::from_secs(checkpoint_secs);
547 thread::Builder::new()
548 .name("red-checkpointer".into())
549 .spawn(move || {
550 periodic_loop(stop, interval, move || {
551 if let Err(err) = runtime.checkpoint() {
552 tracing::warn!(error = %err, "periodic checkpoint failed");
553 }
554 })
555 })
556 .ok()
557 };
558
559 let archiver_handle = {
560 let stop = Arc::clone(&stop);
561 let runtime = runtime.clone();
562 let interval = Duration::from_secs(wal_secs);
563 let lag_enabled = pause_on_lag_secs > 0;
564 thread::Builder::new()
565 .name("red-wal-archiver".into())
566 .spawn(move || {
567 periodic_loop(stop, interval, move || match runtime.trigger_backup() {
568 Ok(_) if lag_enabled => {
569 let now_ms = std::time::SystemTime::now()
570 .duration_since(std::time::UNIX_EPOCH)
571 .map(|d| d.as_millis() as u64)
572 .unwrap_or(0);
573 runtime.write_gate().record_archive_success(now_ms);
574 runtime.write_gate().evaluate_archive_lag(now_ms);
578 }
579 Ok(_) => {}
580 Err(err) => {
581 tracing::warn!(error = %err, "periodic WAL archive/backup failed");
582 }
583 })
584 })
585 .ok()
586 };
587
588 let lag_monitor_handle = if pause_on_lag_secs > 0 {
593 let stop = Arc::clone(&stop);
594 let runtime = runtime.clone();
595 let interval = Duration::from_secs(5);
599 thread::Builder::new()
600 .name("red-archive-lag-monitor".into())
601 .spawn(move || {
602 periodic_loop(stop, interval, move || {
603 let now_ms = std::time::SystemTime::now()
604 .duration_since(std::time::UNIX_EPOCH)
605 .map(|d| d.as_millis() as u64)
606 .unwrap_or(0);
607 let was_paused = runtime.write_gate().is_auto_paused();
608 let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
609 if now_paused && !was_paused {
610 tracing::warn!(
611 pause_on_lag_secs,
612 last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
613 "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
614 );
615 } else if !now_paused && was_paused {
616 tracing::info!(
617 "WAL archive caught up — exiting graceful read-only mode (issue #519)"
618 );
619 }
620 })
621 })
622 .ok()
623 } else {
624 None
625 };
626
627 tracing::info!(
628 checkpoint_interval_secs = checkpoint_secs,
629 wal_flush_interval_secs = wal_secs,
630 "backup tasks spawned (checkpointer + WAL archiver)"
631 );
632
633 Some(BackupTasksHandle {
634 stop,
635 _checkpoint_handle: checkpoint_handle,
636 _archiver_handle: archiver_handle,
637 _lag_monitor_handle: lag_monitor_handle,
638 })
639}
640
/// Owner of the background backup threads started by
/// `spawn_backup_tasks_if_configured`.
///
/// Dropping the handle raises the shared `stop` flag. The threads are not
/// joined: the join handles are simply dropped along with the struct, and
/// each thread exits on its own once its `periodic_loop` next observes the
/// flag.
pub struct BackupTasksHandle {
    /// Shared stop flag polled by every backup thread.
    stop: Arc<std::sync::atomic::AtomicBool>,
    /// Join handles are held but never joined; `None` when the thread failed
    /// to spawn (or, for the lag monitor, when the lag pause is disabled).
    _checkpoint_handle: Option<thread::JoinHandle<()>>,
    _archiver_handle: Option<thread::JoinHandle<()>>,
    _lag_monitor_handle: Option<thread::JoinHandle<()>>,
}

impl Drop for BackupTasksHandle {
    fn drop(&mut self) {
        // `Release` pairs with the `Acquire` load in `periodic_loop`.
        self.stop.store(true, std::sync::atomic::Ordering::Release);
    }
}
657
/// Runs `tick` roughly every `interval` until `stop` becomes true.
///
/// The loop sleeps in one-second slices and checks `stop` before each slice,
/// so a stop request is noticed within about a second rather than after a
/// full interval. Time spent inside `tick` is not counted towards the next
/// interval, and an interval shorter than one second still ticks once per
/// slice.
fn periodic_loop<F: FnMut()>(
    stop: Arc<std::sync::atomic::AtomicBool>,
    interval: Duration,
    mut tick: F,
) {
    use std::sync::atomic::Ordering;

    const SLICE: Duration = Duration::from_secs(1);

    let mut since_last_tick = Duration::ZERO;
    loop {
        if stop.load(Ordering::Acquire) {
            break;
        }
        thread::sleep(SLICE);
        since_last_tick += SLICE;
        if since_last_tick < interval {
            continue;
        }
        tick();
        since_last_tick = Duration::ZERO;
    }
}
676
677fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
678 let backend = env_nonempty("RED_BACKEND")
684 .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
685 .unwrap_or_else(|| "none".to_string())
686 .to_ascii_lowercase();
687
688 match backend.as_str() {
689 "s3" | "minio" | "r2" => {
694 #[cfg(feature = "backend-s3")]
695 {
696 if let Some(config) = s3_config_from_env() {
697 let remote_key = env_nonempty("RED_REMOTE_KEY")
698 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
699 .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
700 let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
701 options.remote_backend = Some(backend.clone());
702 options.remote_backend_atomic = Some(backend);
703 options.remote_key = Some(remote_key);
704 }
705 }
706 #[cfg(not(feature = "backend-s3"))]
707 {
708 tracing::warn!(
709 backend = %backend,
710 "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
711 );
712 }
713 }
714 "fs" | "local" => {
719 let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
720 let remote_key = match (
721 base_path,
722 env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
723 ) {
724 (Some(base), Some(rel)) => Some(format!(
725 "{}/{}",
726 base.trim_end_matches('/'),
727 rel.trim_start_matches('/')
728 )),
729 (Some(base), None) => Some(format!(
730 "{}/clusters/dev/data.rdb",
731 base.trim_end_matches('/')
732 )),
733 (None, Some(rel)) => Some(rel),
734 (None, None) => None,
735 };
736 if let Some(key) = remote_key {
737 let backend = Arc::new(crate::storage::backend::LocalBackend);
738 options.remote_backend = Some(backend.clone());
739 options.remote_backend_atomic = Some(backend);
740 options.remote_key = Some(key);
741 }
742 }
743 "http" => {
748 let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
749 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
750 {
751 Some(u) => u,
752 None => {
753 tracing::warn!(
754 "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
755 );
756 return;
757 }
758 };
759 let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
760 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
761 .unwrap_or_default();
762 let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
763 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
764 {
765 std::fs::read_to_string(&path)
766 .ok()
767 .map(|s| s.trim().to_string())
768 .filter(|s| !s.is_empty())
769 } else {
770 env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
771 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
772 };
773
774 let mut config =
775 crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
776 if let Some(auth) = auth_header {
777 config = config.with_auth_header(auth);
778 }
779 let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
780 || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
781 || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
782 config = config.with_conditional_writes(conditional_writes);
783 if conditional_writes {
788 match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
789 Ok(atomic) => {
790 let atomic_arc = Arc::new(atomic);
791 options.remote_backend = Some(atomic_arc.clone());
792 options.remote_backend_atomic = Some(atomic_arc);
793 }
794 Err(err) => {
795 tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
796 options.remote_backend =
797 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
798 }
799 }
800 } else {
801 options.remote_backend =
802 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
803 }
804 options.remote_key = env_nonempty("RED_REMOTE_KEY")
805 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
806 .or_else(|| Some("clusters/dev/data.rdb".to_string()));
807 }
808 "none" | "" => {}
811 other => {
812 tracing::warn!(
813 backend = %other,
814 "unknown RED_BACKEND value — supported: s3 | fs | http | none"
815 );
816 }
817 }
818}
819
/// Reads the S3 setting `RED_S3_<suffix>`, falling back to the legacy
/// `REDDB_S3_<suffix>` spelling. The legacy name is only looked up when the
/// primary one yields nothing.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    let primary = format!("RED_S3_{suffix}");
    match env_nonempty(&primary) {
        Some(value) => Some(value),
        None => {
            let legacy = format!("REDDB_S3_{suffix}");
            env_nonempty(&legacy)
        }
    }
}
829
/// Reads an S3 secret, preferring a file reference over an inline value.
///
/// When `RED_S3_<suffix>_FILE` (or the legacy `REDDB_S3_<suffix>_FILE`) is
/// set, the secret is the trimmed content of that file. An unreadable file
/// yields `None` and logs a warning that names the file but never the secret;
/// an empty file also yields `None`. In both cases the inline variable is
/// deliberately not consulted. Only when no `*_FILE` variable is set does the
/// lookup fall through to `env_s3(suffix)`.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let file_key_red = format!("RED_S3_{suffix}_FILE");
    let file_key_legacy = format!("REDDB_S3_{suffix}_FILE");
    let Some(path) = env_nonempty(&file_key_red).or_else(|| env_nonempty(&file_key_legacy)) else {
        return env_s3(suffix);
    };

    match std::fs::read_to_string(&path) {
        Ok(contents) => {
            let secret = contents.trim();
            (!secret.is_empty()).then(|| secret.to_string())
        }
        Err(err) => {
            tracing::warn!(
                path = %path,
                suffix,
                error = %err,
                "could not read S3 secret file"
            );
            None
        }
    }
}
847
/// Assembles an `S3Config` from the `RED_S3_*` / `REDDB_S3_*` variables.
///
/// `ENDPOINT`, `BUCKET`, `ACCESS_KEY` and `SECRET_KEY` are required; the
/// function returns `None` as soon as one of them is missing. `REGION`
/// defaults to `us-east-1`, the key prefix (`KEY_PREFIX`, then `PREFIX`)
/// defaults to empty, and `PATH_STYLE` defaults to `true`.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    let endpoint = env_s3("ENDPOINT")?;
    let bucket = env_s3("BUCKET")?;
    let access_key = env_s3_secret("ACCESS_KEY")?;
    let secret_key = env_s3_secret("SECRET_KEY")?;

    let region = match env_s3("REGION") {
        Some(region) => region,
        None => "us-east-1".to_string(),
    };
    let key_prefix = match env_s3("KEY_PREFIX") {
        Some(prefix) => prefix,
        None => env_s3("PREFIX").unwrap_or_default(),
    };
    let path_style = match env_s3("PATH_STYLE") {
        Some(raw) => matches!(
            raw.to_ascii_lowercase().as_str(),
            "1" | "true" | "yes" | "on"
        ),
        None => true,
    };

    Some(crate::storage::backend::S3Config {
        endpoint,
        bucket,
        key_prefix,
        access_key,
        secret_key,
        region,
        path_style,
    })
}
871
872pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
873 let data_dir = config.data_dir();
874 let exec_start = render_systemd_exec_start(config);
875 format!(
876 "[Unit]\n\
877Description=RedDB unified database service\n\
878After=network-online.target\n\
879Wants=network-online.target\n\
880\n\
881[Service]\n\
882Type=simple\n\
883User={user}\n\
884Group={group}\n\
885WorkingDirectory={workdir}\n\
886ExecStart={exec_start}\n\
887Restart=always\n\
888RestartSec=2\n\
889LimitSTACK=16M\n\
890NoNewPrivileges=true\n\
891PrivateTmp=true\n\
892ProtectSystem=strict\n\
893ProtectHome=true\n\
894ProtectControlGroups=true\n\
895ProtectKernelTunables=true\n\
896ProtectKernelModules=true\n\
897RestrictNamespaces=true\n\
898LockPersonality=true\n\
899MemoryDenyWriteExecute=true\n\
900ReadWritePaths={workdir}\n\
901\n\
902[Install]\n\
903WantedBy=multi-user.target\n",
904 user = config.run_user,
905 group = config.run_group,
906 workdir = data_dir.display(),
907 exec_start = exec_start,
908 )
909}
910
911#[cfg(target_os = "linux")]
920pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
921 ensure_root()?;
922 ensure_command_available("systemctl")?;
923 ensure_command_available("getent")?;
924 ensure_command_available("groupadd")?;
925 ensure_command_available("useradd")?;
926 ensure_command_available("install")?;
927 ensure_executable(&config.binary_path)?;
928
929 if !command_success("getent", ["group", config.run_group.as_str()])? {
930 run_command("groupadd", ["--system", config.run_group.as_str()])?;
931 }
932
933 if !command_success("id", ["-u", config.run_user.as_str()])? {
934 let data_dir = config.data_dir();
935 run_command(
936 "useradd",
937 [
938 "--system",
939 "--gid",
940 config.run_group.as_str(),
941 "--home-dir",
942 data_dir.to_string_lossy().as_ref(),
943 "--shell",
944 "/usr/sbin/nologin",
945 config.run_user.as_str(),
946 ],
947 )?;
948 }
949
950 let data_dir = config.data_dir();
951 run_command(
952 "install",
953 [
954 "-d",
955 "-o",
956 config.run_user.as_str(),
957 "-g",
958 config.run_group.as_str(),
959 "-m",
960 "0750",
961 data_dir.to_string_lossy().as_ref(),
962 ],
963 )?;
964
965 std::fs::write(config.unit_path(), render_systemd_unit(config))
966 .map_err(|err| format!("failed to write systemd unit: {err}"))?;
967
968 run_command("systemctl", ["daemon-reload"])?;
969 run_command(
970 "systemctl",
971 [
972 "enable",
973 "--now",
974 format!("{}.service", config.service_name).as_str(),
975 ],
976 )?;
977
978 Ok(())
979}
980
/// Non-Linux stand-in: systemd is unavailable, so installation always fails
/// with a message that points at the platform's own service manager.
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    let message = concat!(
        "systemd install is Linux-only — use sc.exe (Windows) or ",
        "launchd (macOS) to install the service manually using the ",
        "unit printed by `red service print-unit`",
    );
    Err(String::from(message))
}
992
/// Verifies that the process runs as root by checking that `id -u` prints `0`.
///
/// # Errors
/// Fails when `id` cannot be run, exits unsuccessfully, or reports a non-zero
/// uid.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let probe = match Command::new("id").arg("-u").output() {
        Ok(output) => output,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !probe.status.success() {
        return Err("failed to determine current uid".to_string());
    }

    let is_root = String::from_utf8_lossy(&probe.stdout).trim() == "0";
    if is_root {
        Ok(())
    } else {
        Err("run this command as root (sudo)".to_string())
    }
}
1008
/// Checks that `command` resolves on the `PATH` of a login shell
/// (`sh -lc 'command -v <command>'`).
///
/// # Errors
/// Fails when the shell cannot be started or the command is not found.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    let probe = format!("command -v {command} >/dev/null 2>&1");
    let found = Command::new("sh")
        .args(["-lc", &probe])
        .status()
        .map_err(|err| format!("failed to check command '{command}': {err}"))?
        .success();

    if !found {
        return Err(format!("required command not found: {command}"));
    }
    Ok(())
}
1021
/// Verifies that `path` exists and can be executed.
///
/// On Unix this means at least one execute permission bit is set; elsewhere
/// it only checks that the path is a regular file.
///
/// # Errors
/// Fails when the metadata cannot be read or the check does not pass.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    let metadata = match std::fs::metadata(path) {
        Ok(metadata) => metadata,
        Err(err) => return Err(format!("binary not found '{}': {err}", path.display())),
    };

    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        let any_exec_bit = metadata.permissions().mode() & 0o111 != 0;
        if !any_exec_bit {
            return Err(format!("binary is not executable: {}", path.display()));
        }
    }
    #[cfg(not(unix))]
    {
        if !metadata.is_file() {
            return Err(format!("binary is not a file: {}", path.display()));
        }
    }

    Ok(())
}
1041
/// Runs `program` with `args` and reports whether it exited successfully.
/// The child's output is not captured.
///
/// # Errors
/// Fails only when the program cannot be started.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    match Command::new(program).args(args).status() {
        Ok(status) => Ok(status.success()),
        Err(err) => Err(format!("failed to run {program}: {err}")),
    }
}
1050
/// Runs `program` with `args`, capturing its output, and requires success.
///
/// # Errors
/// Fails when the program cannot be started, or when it exits unsuccessfully.
/// In the latter case the message carries the trimmed stderr, else the
/// trimmed stdout, else the exit status.
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = match Command::new(program).args(args).output() {
        Ok(output) => output,
        Err(err) => return Err(format!("failed to run {program}: {err}")),
    };
    if output.status.success() {
        return Ok(());
    }

    // Prefer stderr, then stdout; fall back to the bare exit status.
    let detail = [&output.stderr, &output.stdout]
        .iter()
        .map(|bytes| String::from_utf8_lossy(bytes).trim().to_string())
        .find(|text| !text.is_empty())
        .unwrap_or_else(|| format!("exit status {}", output.status));
    Err(format!("{program} failed: {detail}"))
}
1072
1073pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1074 let has_any = config.router_bind_addr.is_some()
1075 || config.grpc_bind_addr.is_some()
1076 || config.http_bind_addr.is_some()
1077 || config.wire_bind_addr.is_some()
1078 || config.pg_bind_addr.is_some();
1079 if !has_any {
1080 return Err("at least one server bind address must be configured".into());
1081 }
1082 let thread_name = if config.router_bind_addr.is_some() {
1083 "red-server-router"
1084 } else {
1085 match (
1086 config.grpc_bind_addr.is_some(),
1087 config.http_bind_addr.is_some(),
1088 ) {
1089 (true, true) => "red-server-dual",
1090 (true, false) => "red-server-grpc",
1091 (false, true) => "red-server-http",
1092 (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1093 (false, false) => "red-server-pg-wire",
1094 }
1095 };
1096
1097 let handle = thread::Builder::new()
1098 .name(thread_name.into())
1099 .stack_size(8 * 1024 * 1024)
1100 .spawn(move || run_configured_servers(config))
1101 .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1102
1103 match handle.join() {
1104 Ok(result) => result,
1105 Err(_) => Err("server thread panicked".to_string()),
1106 }
1107}
1108
1109fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1110 let mut parts = vec![
1111 config.binary_path.display().to_string(),
1112 "server".to_string(),
1113 "--path".to_string(),
1114 config.data_path.display().to_string(),
1115 ];
1116
1117 if let Some(bind_addr) = &config.router_bind_addr {
1118 parts.push("--bind".to_string());
1119 parts.push(bind_addr.clone());
1120 } else if let Some(bind_addr) = &config.grpc_bind_addr {
1121 parts.push("--grpc-bind".to_string());
1122 parts.push(bind_addr.clone());
1123 }
1124 if let Some(bind_addr) = &config.http_bind_addr {
1125 parts.push("--http-bind".to_string());
1126 parts.push(bind_addr.clone());
1127 }
1128
1129 parts.join(" ")
1130}
1131
/// Reports whether something accepts TCP connections at `target`.
///
/// `target` is resolved to socket addresses; the probe succeeds as soon as
/// one of them accepts a connection within `timeout`. A target that cannot
/// be resolved counts as "not listening".
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    let Ok(resolved) = target.to_socket_addrs() else {
        return false;
    };

    let candidates: Vec<SocketAddr> = resolved.collect();
    for candidate in &candidates {
        if TcpStream::connect_timeout(candidate, timeout).is_ok() {
            return true;
        }
    }
    false
}
1142
1143#[inline(never)]
1144fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1145 if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1151 return run_routed_server(config, router_bind_addr);
1152 }
1153
1154 match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1155 (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1156 run_dual_server(config, grpc_bind_addr, http_bind_addr)
1157 }
1158 (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1159 (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1160 (None, None) => {
1161 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1162 run_wire_only_server(config, wire_addr)
1163 } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1164 run_pg_only_server(config, pg_addr)
1165 } else {
1166 Err("at least one server bind address must be configured".to_string())
1167 }
1168 }
1169 }
1170}
1171
1172pub fn bind_listener_for_startup(
1190 readiness: &mut TransportReadiness,
1191 transport: &str,
1192 bind_addr: &str,
1193 explicit: bool,
1194) -> Result<Option<TcpListener>, String> {
1195 match TcpListener::bind(bind_addr) {
1196 Ok(listener) => {
1197 readiness.active(transport, bind_addr, explicit);
1198 Ok(Some(listener))
1199 }
1200 Err(err) => {
1201 let reason = format!("{transport} listener bind {bind_addr}: {err}");
1202 readiness.failed(transport, bind_addr, explicit, reason.clone());
1203 if explicit {
1204 tracing::error!(
1205 transport,
1206 bind = %bind_addr,
1207 error = %err,
1208 "fatal explicit bind failure"
1209 );
1210 Err(format!("explicit {reason}"))
1211 } else {
1212 tracing::warn!(
1213 transport,
1214 bind = %bind_addr,
1215 error = %err,
1216 "non-fatal implicit bind failure; listener degraded"
1217 );
1218 Ok(None)
1219 }
1220 }
1221 }
1222}
1223
/// Installs the process lifecycle signal handlers and spawns the task that
/// reacts to them.
///
/// On Unix, SIGTERM and SIGINT trigger `runtime.graceful_shutdown(..)`
/// followed by `std::process::exit(0)`; SIGHUP (when its handler could be
/// installed) calls `handle_sighup_reload` and keeps the loop running. On
/// other platforms only Ctrl+C is handled.
///
/// `RED_BACKUP_ON_SHUTDOWN` controls whether a backup is requested during
/// shutdown: unset means `true`; when set, only `1`/`true`/`yes`/`on`
/// (case-insensitive) mean `true`.
///
/// The function body itself never awaits; all waiting happens inside the
/// tasks handed to `tokio::spawn`.
async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
        .ok()
        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
        .unwrap_or(true);

    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        // SIGTERM and SIGINT are mandatory: if either handler cannot be
        // installed, return without spawning the signal task at all.
        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
                );
                return;
            }
        };
        let mut sigint = match signal(SignalKind::interrupt()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGINT handler");
                return;
            }
        };
        // SIGHUP is optional: without it the shutdown signals still work.
        let mut sighup = match signal(SignalKind::hangup()) {
            Ok(s) => Some(s),
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
                None
            }
        };

        let reload_runtime = runtime.clone();
        tokio::spawn(async move {
            loop {
                // Wait for whichever installed signal arrives first.
                let signal_name = match &mut sighup {
                    Some(hup) => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                        _ = hup.recv() => "SIGHUP",
                    },
                    None => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                    },
                };

                // SIGHUP reloads and keeps serving; everything else shuts down.
                if signal_name == "SIGHUP" {
                    handle_sighup_reload(&reload_runtime);
                    continue;
                }

                tracing::info!(
                    signal = signal_name,
                    "lifecycle signal received; shutting down"
                );
                match runtime.graceful_shutdown(backup_on_shutdown) {
                    Ok(report) => {
                        tracing::info!(
                            duration_ms = report.duration_ms,
                            flushed_wal = report.flushed_wal,
                            final_checkpoint = report.final_checkpoint,
                            backup_uploaded = report.backup_uploaded,
                            "graceful shutdown complete"
                        );
                    }
                    Err(err) => {
                        tracing::error!(error = %err, "graceful shutdown failed");
                        // Also surface the failure as an operator event.
                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
                            reason: format!("graceful shutdown failed: {err}"),
                        }
                        .emit_global();
                    }
                }
                // The process exits with status 0 whether or not the graceful
                // shutdown succeeded.
                std::process::exit(0);
            }
        });
    }

    #[cfg(not(unix))]
    {
        tokio::spawn(async move {
            let interrupted = tokio::signal::ctrl_c().await;
            if let Err(err) = interrupted {
                tracing::warn!(error = %err, "could not install Ctrl+C handler");
                return;
            }

            tracing::info!(
                signal = "Ctrl+C",
                "lifecycle signal received; shutting down"
            );
            match runtime.graceful_shutdown(backup_on_shutdown) {
                Ok(report) => {
                    tracing::info!(
                        duration_ms = report.duration_ms,
                        flushed_wal = report.flushed_wal,
                        final_checkpoint = report.final_checkpoint,
                        backup_uploaded = report.backup_uploaded,
                        "graceful shutdown complete"
                    );
                }
                Err(err) => {
                    // Unlike the Unix branch, no operator event is emitted here.
                    tracing::error!(error = %err, "graceful shutdown failed");
                }
            }
            std::process::exit(0);
        });
    }
}
1369
1370fn handle_sighup_reload(runtime: &RedDBRuntime) {
1379 let now_ms = std::time::SystemTime::now()
1380 .duration_since(std::time::UNIX_EPOCH)
1381 .map(|d| d.as_millis() as u64)
1382 .unwrap_or(0);
1383 tracing::info!(
1384 target: "reddb::secrets",
1385 ts_unix_ms = now_ms,
1386 "SIGHUP received; secrets will be re-read from *_FILE on next access"
1387 );
1388 use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1393 runtime.audit_log().record_event(
1394 AuditEvent::builder("config/sighup_reload")
1395 .source(AuditAuthSource::System)
1396 .resource("secrets")
1397 .outcome(Outcome::Success)
1398 .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1399 .build(),
1400 );
1401}
1402
/// Runs the server behind a single TCP protocol router bound to
/// `router_bind_addr`.
///
/// The HTTP, gRPC and wire backends each listen on an OS-assigned loopback
/// port (`127.0.0.1:0`); their resolved addresses are handed to
/// `serve_tcp_router`, which this function awaits until it returns.
///
/// # Errors
///
/// Returns a stringified error when the DB options cannot be derived from
/// `config`, the runtime/auth store cannot be built, an internal listener
/// cannot be bound or inspected, the HTTP backend thread has already finished
/// shortly after start, the Tokio runtime cannot be built, or the router
/// itself returns an error.
#[inline(never)]
fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
    let workers = config.workers;
    let cli_telemetry = config.telemetry.clone();
    let db_options = config.to_db_options()?;
    let rt_config = detect_runtime_config();
    // An explicit worker count wins over the detected suggestion.
    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
    // `_telemetry_guard` and `_backup_tasks` are named bindings so they stay
    // alive until this function returns.
    let (runtime, auth_store, _telemetry_guard) =
        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);

    spawn_admin_metrics_listeners(&runtime, &auth_store);

    // Internal HTTP backend: bind first so the resolved port can be passed to
    // both the server and the router.
    let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
        .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
    let http_backend = http_listener
        .local_addr()
        .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
    let http_server = build_http_server(
        runtime.clone(),
        auth_store.clone(),
        http_backend.to_string(),
    );
    let http_server = apply_http_limits(http_server, &config, &runtime);
    let http_handle = http_server.serve_in_background_on(http_listener);

    // Give the backend thread a moment, then treat an already-finished thread
    // as a start-up failure.
    thread::sleep(Duration::from_millis(100));
    if http_handle.is_finished() {
        return match http_handle.join() {
            Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
            Ok(Err(err)) => Err(err.to_string()),
            Err(_) => Err("HTTP backend thread panicked".to_string()),
        };
    }

    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(worker_threads)
        .thread_stack_size(rt_config.stack_size)
        .build()
        .map_err(|err| format!("tokio runtime: {err}"))?;

    let signal_runtime = runtime.clone();
    tokio_runtime.block_on(async move {
        spawn_lifecycle_signal_handler(signal_runtime).await;
        // Internal gRPC backend (no TLS) on an ephemeral loopback port.
        let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
            .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
        let grpc_backend = grpc_listener
            .local_addr()
            .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
        let grpc_server = RedDBGrpcServer::with_options(
            runtime.clone(),
            GrpcServerOptions {
                bind_addr: grpc_backend.to_string(),
                tls: None,
            },
            auth_store,
        );
        // Backend errors are logged from the spawned task; they do not abort the router.
        tokio::spawn(async move {
            if let Err(err) = grpc_server.serve_on(grpc_listener).await {
                tracing::error!(err = %err, "gRPC backend error");
            }
        });

        // Internal wire backend on an ephemeral loopback port.
        let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .map_err(|err| format!("bind internal wire listener: {err}"))?;
        let wire_backend = wire_listener
            .local_addr()
            .map_err(|err| format!("inspect internal wire listener: {err}"))?;
        let wire_rt = Arc::new(runtime);
        tokio::spawn(async move {
            if let Err(err) =
                crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
                    .await
            {
                tracing::error!(err = %err, "redwire backend error");
            }
        });

        tracing::info!(
            bind = %router_bind_addr,
            cpus = rt_config.available_cpus,
            workers = worker_threads,
            "router bootstrapping"
        );
        // The router's result is the result of the whole function.
        serve_tcp_router(TcpProtocolRouterConfig {
            bind_addr: router_bind_addr,
            grpc_backend,
            http_backend,
            wire_backend,
        })
        .await
        .map_err(|err| err.to_string())
    })
}
1499
1500async fn spawn_wire_listeners(
1502 config: &ServerCommandConfig,
1503 runtime: &RedDBRuntime,
1504 readiness: &mut TransportReadiness,
1505) -> Result<(), String> {
1506 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1508 let wire_rt = Arc::new(runtime.clone());
1509 #[cfg(unix)]
1512 {
1513 if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1514 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1515 tokio::spawn(async move {
1516 if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1517 &wire_addr, wire_rt,
1518 )
1519 .await
1520 {
1521 tracing::error!(err = %e, "redwire unix listener error");
1522 }
1523 });
1524 return Ok(());
1525 }
1526 }
1527 match tokio::net::TcpListener::bind(&wire_addr).await {
1528 Ok(listener) => {
1529 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1530 tokio::spawn(async move {
1531 if let Err(e) =
1532 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1533 .await
1534 {
1535 tracing::error!(err = %e, "redwire listener error");
1536 }
1537 });
1538 }
1539 Err(err) => {
1540 let reason = format!("wire listener bind {wire_addr}: {err}");
1541 readiness.failed(
1542 "wire",
1543 &wire_addr,
1544 config.wire_bind_explicit,
1545 reason.clone(),
1546 );
1547 if config.wire_bind_explicit {
1548 tracing::error!(
1549 transport = "wire",
1550 bind = %wire_addr,
1551 error = %err,
1552 "fatal explicit bind failure"
1553 );
1554 return Err(format!("explicit {reason}"));
1555 }
1556 tracing::warn!(
1557 transport = "wire",
1558 bind = %wire_addr,
1559 error = %err,
1560 "non-fatal implicit bind failure; listener degraded"
1561 );
1562 }
1563 }
1564 }
1565
1566 if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1568 let tls_config = resolve_wire_tls_config(config);
1569 match tls_config {
1570 Ok(tls_cfg) => {
1571 let wire_rt = Arc::new(runtime.clone());
1572 tokio::spawn(async move {
1573 if let Err(e) =
1574 crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1575 .await
1576 {
1577 tracing::error!(err = %e, "redwire+tls listener error");
1578 }
1579 });
1580 }
1581 Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1582 }
1583 }
1584 Ok(())
1585}
1586
1587fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1594 if let Some(pg_addr) = config.pg_bind_addr.clone() {
1595 let rt = Arc::new(runtime.clone());
1596 tokio::spawn(async move {
1597 let cfg = crate::wire::PgWireConfig {
1598 bind_addr: pg_addr,
1599 ..Default::default()
1600 };
1601 if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1602 tracing::error!(err = %e, "pg wire listener error");
1603 }
1604 });
1605 }
1606}
1607
1608fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1622 use crate::utils::secret_file::expand_file_env;
1623
1624 for var in [
1628 "REDDB_GRPC_TLS_CERT",
1629 "REDDB_GRPC_TLS_KEY",
1630 "REDDB_GRPC_TLS_CLIENT_CA",
1631 ] {
1632 if let Err(err) = expand_file_env(var) {
1633 tracing::warn!(
1634 target: "reddb::secrets",
1635 env = %var,
1636 err = %err,
1637 "could not expand *_FILE companion for gRPC TLS"
1638 );
1639 }
1640 }
1641
1642 let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1643 (Some(cert), Some(key)) => {
1644 let cert_pem = std::fs::read(cert)
1645 .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1646 let key_pem =
1647 std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1648 (cert_pem, key_pem)
1649 }
1650 _ => {
1651 let dev = std::env::var("RED_GRPC_TLS_DEV")
1653 .ok()
1654 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1655 .unwrap_or(false);
1656 if !dev {
1657 return Err("gRPC TLS configured but no cert/key supplied — set \
1658 REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1659 RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1660 .to_string());
1661 }
1662 let dir = config
1663 .path
1664 .as_ref()
1665 .and_then(|p| p.parent())
1666 .map(PathBuf::from)
1667 .unwrap_or_else(|| PathBuf::from("."));
1668 let (cert_pem_str, key_pem_str) =
1669 crate::wire::tls::generate_self_signed_cert("localhost")
1670 .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1671
1672 let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1677 tracing::warn!(
1678 target: "reddb::security",
1679 transport = "grpc",
1680 cert_sha256 = %fp,
1681 "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1682 DO NOT use in production"
1683 );
1684 let cert_path = dir.join("grpc-tls-cert.pem");
1686 let key_path = dir.join("grpc-tls-key.pem");
1687 if !cert_path.exists() || !key_path.exists() {
1688 let _ = std::fs::create_dir_all(&dir);
1689 std::fs::write(&cert_path, cert_pem_str.as_bytes())
1690 .map_err(|e| format!("write grpc dev cert: {e}"))?;
1691 std::fs::write(&key_path, key_pem_str.as_bytes())
1692 .map_err(|e| format!("write grpc dev key: {e}"))?;
1693 #[cfg(unix)]
1694 {
1695 use std::os::unix::fs::PermissionsExt;
1696 let _ =
1697 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1698 }
1699 }
1700 (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1701 }
1702 };
1703
1704 let client_ca_pem = match &config.grpc_tls_client_ca {
1705 Some(path) => Some(
1706 std::fs::read(path)
1707 .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1708 ),
1709 None => None,
1710 };
1711
1712 Ok(crate::GrpcTlsOptions {
1713 cert_pem,
1714 key_pem,
1715 client_ca_pem,
1716 })
1717}
1718
1719fn spawn_grpc_tls_listener_if_configured(
1723 config: &ServerCommandConfig,
1724 runtime: RedDBRuntime,
1725 auth_store: Arc<AuthStore>,
1726) {
1727 let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1728 return;
1729 };
1730 let tls_opts = match resolve_grpc_tls_options(config) {
1731 Ok(opts) => opts,
1732 Err(err) => {
1733 tracing::error!(
1734 target: "reddb::security",
1735 transport = "grpc",
1736 err = %err,
1737 "gRPC TLS config error; TLS listener will not start"
1738 );
1739 return;
1740 }
1741 };
1742 tokio::spawn(async move {
1743 let server = RedDBGrpcServer::with_options(
1744 runtime,
1745 GrpcServerOptions {
1746 bind_addr: tls_bind.clone(),
1747 tls: Some(tls_opts),
1748 },
1749 auth_store,
1750 );
1751 tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1752 if let Err(err) = server.serve().await {
1753 tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1754 }
1755 });
1756}
1757
1758fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1761 use sha2::{Digest, Sha256};
1762 let mut h = Sha256::new();
1763 h.update(pem);
1764 let d = h.finalize();
1765 let mut buf = String::with_capacity(64);
1766 for b in d.iter() {
1767 buf.push_str(&format!("{b:02x}"));
1768 }
1769 buf
1770}
1771
1772fn resolve_wire_tls_config(
1774 config: &ServerCommandConfig,
1775) -> Result<crate::wire::WireTlsConfig, String> {
1776 match (&config.wire_tls_cert, &config.wire_tls_key) {
1777 (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1778 cert_path: cert.clone(),
1779 key_path: key.clone(),
1780 }),
1781 _ => {
1782 let dir = config
1784 .path
1785 .as_ref()
1786 .and_then(|p| p.parent())
1787 .map(PathBuf::from)
1788 .unwrap_or_else(|| PathBuf::from("."));
1789 crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1790 }
1791 }
1792}
1793
1794#[inline(never)]
1795fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1796 let rt_config = detect_runtime_config();
1797 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1798 let cli_telemetry = config.telemetry.clone();
1799 let db_options = config.to_db_options()?;
1800 let mut transport_readiness = TransportReadiness::default();
1801
1802 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1803 .enable_all()
1804 .worker_threads(workers)
1805 .thread_stack_size(rt_config.stack_size)
1806 .build()
1807 .map_err(|err| format!("tokio runtime: {err}"))?;
1808
1809 let (runtime, _auth_store, _telemetry_guard) =
1813 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1814 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1815 let signal_runtime = runtime.clone();
1816 tokio_runtime.block_on(async move {
1817 spawn_lifecycle_signal_handler(signal_runtime).await;
1818 spawn_pg_listener(&config, &runtime);
1819 let wire_rt = Arc::new(runtime);
1820 let listener = tokio::net::TcpListener::bind(&wire_addr)
1821 .await
1822 .map_err(|err| {
1823 let reason = format!("wire listener bind {wire_addr}: {err}");
1824 transport_readiness.failed(
1825 "wire",
1826 &wire_addr,
1827 config.wire_bind_explicit,
1828 reason.clone(),
1829 );
1830 if config.wire_bind_explicit {
1831 format!("explicit {reason}")
1832 } else {
1833 reason
1834 }
1835 })?;
1836 transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1837 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1838 .await
1839 .map_err(|e| e.to_string())
1840 })
1841}
1842
1843#[inline(never)]
1844fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1845 let rt_config = detect_runtime_config();
1846 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1847 let cli_telemetry = config.telemetry.clone();
1848 let db_options = config.to_db_options()?;
1849
1850 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1851 .enable_all()
1852 .worker_threads(workers)
1853 .thread_stack_size(rt_config.stack_size)
1854 .build()
1855 .map_err(|err| format!("tokio runtime: {err}"))?;
1856
1857 let (runtime, _auth_store, _telemetry_guard) =
1858 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1859 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1860 let signal_runtime = runtime.clone();
1861 tokio_runtime.block_on(async move {
1862 spawn_lifecycle_signal_handler(signal_runtime).await;
1863 let cfg = crate::wire::PgWireConfig {
1864 bind_addr: pg_addr,
1865 ..Default::default()
1866 };
1867 crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1868 .await
1869 .map_err(|e| e.to_string())
1870 })
1871}
1872
/// Builds the database runtime, the auth store and the optional telemetry
/// guard for the server entry points in this file.
///
/// Delegates directly to [`build_runtime_with_telemetry`] with the same
/// arguments and returns its result unchanged.
#[inline(never)]
fn build_runtime_and_auth_store(
    db_options: RedDBOptions,
    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
) -> Result<
    (
        RedDBRuntime,
        Arc<AuthStore>,
        Option<crate::telemetry::TelemetryGuard>,
    ),
    String,
> {
    build_runtime_with_telemetry(db_options, cli_telemetry)
}
1893
/// Builds the database runtime, initialises telemetry and constructs the
/// auth store.
///
/// Steps, in order: construct the runtime; start the lease loop if required;
/// spawn the disk-space monitor (only when a data path is set) and the config
/// watcher; merge CLI telemetry with stored config and initialise telemetry;
/// build the auth store (vault-backed when `auth.vault_enabled`); wire
/// control events; then either print the no-auth warning or apply the
/// bootstrap preset and the optional break-glass recovery; finally start the
/// session-purge thread.
///
/// # Errors
///
/// Returns a stringified error when the runtime cannot be constructed, the
/// lease loop cannot start, vault mode is requested without a pager, the
/// vault-backed auth store cannot be created, or the preset fails to apply.
pub(crate) fn build_runtime_with_telemetry(
    db_options: RedDBOptions,
    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
) -> Result<
    (
        RedDBRuntime,
        Arc<AuthStore>,
        Option<crate::telemetry::TelemetryGuard>,
    ),
    String,
> {
    // A construction failure is also reported as a `StartupFailed` operator event.
    let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
        let msg = err.to_string();
        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
            phase: "runtime_construction".to_string(),
            error: msg.clone(),
        }
        .emit_global();
        msg
    })?;

    // Same reporting for a lease-loop start failure, under its own phase name.
    crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
        let msg = err.to_string();
        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
            phase: "lease_loop".to_string(),
            error: msg.clone(),
        }
        .emit_global();
        msg
    })?;

    if let Some(data_path) = db_options.data_path.as_deref() {
        // Watch the data file's parent directory, or the path itself when it has no parent.
        let watch_dir = data_path.parent().unwrap_or(data_path);
        // NOTE(review): `90` is presumably a usage-percentage threshold — confirm
        // against `DiskSpaceMonitor::new`.
        crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
    }

    {
        let config_path = crate::runtime::config_overlay::config_file_path();
        let store = runtime.db().store();
        crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
    }

    // CLI telemetry, or a default derived from the data path, overlaid with stored config.
    let merged = merge_telemetry_with_config(
        cli_telemetry
            .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
        &runtime,
    );
    let telemetry_guard = crate::telemetry::init(merged);

    let no_auth = no_auth_active(&db_options);
    let auth_store =
        if db_options.auth.vault_enabled {
            // The vault-backed store needs the store's pager; without one this is an error.
            let pager =
                runtime.db().store().pager().cloned().ok_or_else(|| {
                    "vault requires a paged database (persistent mode)".to_string()
                })?;
            let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
                .map_err(|err| err.to_string())?;
            Arc::new(store)
        } else {
            Arc::new(AuthStore::new(db_options.auth.clone()))
        };
    auth_store.configure_control_events(
        runtime.control_event_ledger(),
        runtime.control_event_config(),
    );
    if no_auth {
        // Warn on stderr as well as through tracing; no preset is applied in this mode.
        eprintln!("{NO_AUTH_WARNING}");
        tracing::warn!("{NO_AUTH_WARNING}");
    } else {
        apply_preset(&runtime, &auth_store)?;
        maybe_apply_policy_break_glass(&auth_store);
    }

    {
        // Background thread that purges expired sessions every 300 seconds.
        // A failure to spawn it is ignored (`.ok()`).
        let store = Arc::clone(&auth_store);
        std::thread::Builder::new()
            .name("reddb-session-purge".into())
            .spawn(move || loop {
                std::thread::sleep(std::time::Duration::from_secs(300));
                store.purge_expired_sessions();
            })
            .ok();
    }

    Ok((runtime, auth_store, telemetry_guard))
}
2012
2013fn maybe_apply_policy_break_glass(auth_store: &Arc<AuthStore>) {
2017 use crate::auth::self_lock_guard::BREAK_GLASS_ENV;
2018
2019 let enabled = std::env::var(BREAK_GLASS_ENV)
2020 .ok()
2021 .map(|v| {
2022 let trimmed = v.trim().to_ascii_lowercase();
2023 matches!(trimmed.as_str(), "1" | "true" | "yes")
2024 })
2025 .unwrap_or(false);
2026 if !enabled {
2027 return;
2028 }
2029 let now = crate::utils::now_unix_millis() as u128;
2030 match auth_store.apply_policy_break_glass(now) {
2031 Ok(()) => {
2032 tracing::warn!(env = BREAK_GLASS_ENV, "policy break-glass recovery applied");
2033 }
2034 Err(err) => {
2035 tracing::error!(env = BREAK_GLASS_ENV, %err, "policy break-glass recovery failed");
2036 }
2037 }
2038}
2039
/// Config key whose presence tells `apply_preset` that bootstrap already ran;
/// `persist_bootstrap_state` stores `true` under it.
pub(crate) const BOOTSTRAP_COMPLETED_KEY: &str = "system.bootstrap.completed";
/// Config key under which `persist_bootstrap_state` stores the applied preset name.
pub(crate) const BOOTSTRAP_PRESET_KEY: &str = "system.bootstrap.preset";
/// Config key under which `persist_bootstrap_state` stores the first admin id,
/// when there is one.
pub(crate) const BOOTSTRAP_FIRST_ADMIN_KEY: &str = "system.bootstrap.first_admin_id";

/// Environment variable that selects the bootstrap preset (read by `apply_preset`).
pub(crate) const PRESET_ENV: &str = "REDDB_PRESET";
/// Preset name used when `PRESET_ENV` is unset or blank.
pub(crate) const PRESET_SIMPLE: &str = "simple";
/// Preset name that bootstraps a first admin from `REDDB_USERNAME` / `REDDB_PASSWORD`.
pub(crate) const PRESET_PRODUCTION: &str = "production";
/// Preset name that installs the regulated guardrail policy and registry entries.
pub(crate) const PRESET_REGULATED: &str = "regulated";

/// Id of the allow-all policy attached to the first admin by the production preset.
pub(crate) const FIRST_ADMIN_ALLOW_ALL_POLICY: &str = "system.bootstrap.first-admin-allow-all";
/// Id of the deny policy installed by the regulated preset.
pub(crate) const REGULATED_PROTECT_MANAGED_POLICY: &str = "system.regulated.protect-managed";
/// Config namespace the regulated preset protects against `config:write` (audit).
pub(crate) const REGULATED_AUDIT_CONFIG_NAMESPACE: &str = "red.config.audit";
/// Config namespace the regulated preset protects against `config:write` (evidence).
pub(crate) const REGULATED_EVIDENCE_CONFIG_NAMESPACE: &str = "red.config.evidence";
/// Config namespace the regulated preset protects against `config:write` (query audit).
pub(crate) const REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE: &str = "red.config.query_audit";
2062
2063pub(crate) fn apply_preset(
2072 runtime: &RedDBRuntime,
2073 auth_store: &Arc<AuthStore>,
2074) -> Result<(), String> {
2075 let store = runtime.db().store();
2076
2077 if store.get_config(BOOTSTRAP_COMPLETED_KEY).is_some() {
2078 crate::cli::bootstrap_manifest::rehydrate_manifest_registry(
2079 runtime,
2080 &runtime.config_registry(),
2081 )?;
2082 tracing::info!("bootstrap state present, skipping preset application");
2083 return Ok(());
2084 }
2085
2086 for var in ["REDDB_USERNAME", "REDDB_PASSWORD"] {
2090 crate::utils::expand_file_env(var).map_err(|err| format!("expand {var}_FILE: {err}"))?;
2091 }
2092
2093 let preset = std::env::var(PRESET_ENV)
2094 .ok()
2095 .map(|s| s.trim().to_string())
2096 .filter(|s| !s.is_empty())
2097 .unwrap_or_else(|| PRESET_SIMPLE.to_string());
2098
2099 if let Ok(path) = std::env::var(crate::cli::bootstrap_manifest::MANIFEST_ENV) {
2100 let path = path.trim();
2101 if !path.is_empty() {
2102 let first_admin_id = crate::cli::bootstrap_manifest::apply_manifest_file(
2103 runtime,
2104 auth_store,
2105 &runtime.config_registry(),
2106 std::path::Path::new(path),
2107 )?;
2108 persist_bootstrap_state(runtime, "manifest", Some(&first_admin_id));
2109 tracing::info!("bootstrap manifest applied");
2110 return Ok(());
2111 }
2112 }
2113
2114 let first_admin_id = match preset.as_str() {
2115 PRESET_SIMPLE => {
2116 None
2120 }
2121 PRESET_PRODUCTION => Some(apply_production_preset(auth_store)?),
2122 PRESET_REGULATED => {
2123 apply_regulated_preset(runtime, auth_store)?;
2124 None
2125 }
2126 other => {
2127 return Err(format!(
2128 "REDDB_PRESET={other:?} is not recognised (expected `simple`, `production`, or `regulated`)"
2129 ));
2130 }
2131 };
2132
2133 persist_bootstrap_state(runtime, &preset, first_admin_id.as_deref());
2134 tracing::info!(preset = %preset, "bootstrap preset applied");
2135 Ok(())
2136}
2137
2138fn apply_production_preset(auth_store: &Arc<AuthStore>) -> Result<String, String> {
2139 use crate::auth::store::PrincipalRef;
2140 use crate::auth::{policies::Policy, UserId};
2141
2142 let username = std::env::var("REDDB_USERNAME")
2143 .ok()
2144 .filter(|s| !s.is_empty())
2145 .ok_or_else(|| {
2146 "REDDB_PRESET=production requires REDDB_USERNAME (or REDDB_USERNAME_FILE)".to_string()
2147 })?;
2148 let password = std::env::var("REDDB_PASSWORD")
2149 .ok()
2150 .filter(|s| !s.is_empty())
2151 .ok_or_else(|| {
2152 "REDDB_PRESET=production requires REDDB_PASSWORD (or REDDB_PASSWORD_FILE)".to_string()
2153 })?;
2154
2155 let result = auth_store
2158 .bootstrap_system_admin(&username, &password)
2159 .map_err(|err| format!("bootstrap first admin: {err}"))?;
2160 let first_admin = UserId::platform(result.user.username.clone());
2161
2162 let policy = Policy::from_json_str(&format!(
2164 r#"{{
2165 "id": "{id}",
2166 "version": 1,
2167 "statements": [{{
2168 "effect": "allow",
2169 "actions": ["*"],
2170 "resources": ["*"]
2171 }}]
2172 }}"#,
2173 id = FIRST_ADMIN_ALLOW_ALL_POLICY
2174 ))
2175 .map_err(|err| format!("compile allow-all policy: {err}"))?;
2176 auth_store
2177 .put_policy(policy)
2178 .map_err(|err| format!("install allow-all policy: {err}"))?;
2179
2180 auth_store
2182 .attach_policy(
2183 PrincipalRef::User(first_admin.clone()),
2184 FIRST_ADMIN_ALLOW_ALL_POLICY,
2185 )
2186 .map_err(|err| format!("attach allow-all policy: {err}"))?;
2187
2188 Ok(first_admin.to_string())
2189}
2190
2191fn apply_regulated_preset(
2192 runtime: &RedDBRuntime,
2193 auth_store: &Arc<AuthStore>,
2194) -> Result<(), String> {
2195 use crate::auth::policies::Policy;
2196 use crate::auth::registry::EvidenceRequirement;
2197
2198 runtime.query_audit().enable_infrastructure();
2199
2200 let policy = Policy::from_json_str(&format!(
2201 r#"{{
2202 "id": "{id}",
2203 "version": 1,
2204 "statements": [
2205 {{
2206 "effect": "deny",
2207 "actions": ["policy:put", "policy:drop", "policy:attach", "policy:detach"],
2208 "resources": ["policy:{id}"]
2209 }},
2210 {{
2211 "effect": "deny",
2212 "actions": ["config:write"],
2213 "resources": [
2214 "config:{audit}.*",
2215 "config:{evidence}.*",
2216 "config:{query_audit}.*"
2217 ]
2218 }}
2219 ]
2220 }}"#,
2221 id = REGULATED_PROTECT_MANAGED_POLICY,
2222 audit = REGULATED_AUDIT_CONFIG_NAMESPACE,
2223 evidence = REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2224 query_audit = REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2225 ))
2226 .map_err(|err| format!("compile regulated guardrail policy: {err}"))?;
2227 auth_store
2228 .put_policy(policy)
2229 .map_err(|err| format!("install regulated guardrail policy: {err}"))?;
2230
2231 let now_ms = crate::utils::now_unix_millis() as u128;
2232 let entries = vec![
2233 regulated_registry_entry(
2234 REGULATED_PROTECT_MANAGED_POLICY,
2235 crate::auth::managed_policy::RESOURCE_TYPE_POLICY,
2236 "iam_policy",
2237 "policy:*",
2238 &format!("policy:{REGULATED_PROTECT_MANAGED_POLICY}"),
2239 EvidenceRequirement::Metadata,
2240 now_ms,
2241 ),
2242 regulated_registry_entry(
2243 REGULATED_AUDIT_CONFIG_NAMESPACE,
2244 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2245 "config_namespace",
2246 "config:write",
2247 &format!("config:{REGULATED_AUDIT_CONFIG_NAMESPACE}.*"),
2248 EvidenceRequirement::Metadata,
2249 now_ms,
2250 ),
2251 regulated_registry_entry(
2252 REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2253 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2254 "config_namespace",
2255 "config:write",
2256 &format!("config:{REGULATED_EVIDENCE_CONFIG_NAMESPACE}.*"),
2257 EvidenceRequirement::Metadata,
2258 now_ms,
2259 ),
2260 regulated_registry_entry(
2261 REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2262 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2263 "config_namespace",
2264 "config:write",
2265 &format!("config:{REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE}.*"),
2266 EvidenceRequirement::Metadata,
2267 now_ms,
2268 ),
2269 ];
2270
2271 for entry in entries.iter().cloned() {
2272 runtime
2273 .config_registry()
2274 .restore_bootstrap_entry(entry)
2275 .map_err(|err| format!("install regulated registry entry: {err}"))?;
2276 }
2277 crate::cli::bootstrap_manifest::persist_registry_state(runtime, &entries)?;
2278 Ok(())
2279}
2280
2281fn regulated_registry_entry(
2282 id: &str,
2283 resource_type: &str,
2284 schema: &str,
2285 required_action: &str,
2286 required_resource: &str,
2287 evidence_requirement: crate::auth::registry::EvidenceRequirement,
2288 updated_at_ms: u128,
2289) -> crate::auth::registry::ConfigRegistryEntry {
2290 crate::auth::registry::ConfigRegistryEntry {
2291 id: id.to_string(),
2292 version: 1,
2293 resource_type: resource_type.to_string(),
2294 schema: schema.to_string(),
2295 mutability: crate::auth::registry::Mutability::Immutable,
2296 sensitivity: crate::auth::registry::Sensitivity::Internal,
2297 managed: true,
2298 required_action: required_action.to_string(),
2299 required_resource: required_resource.to_string(),
2300 evidence_requirement,
2301 updated_by: "system:regulated-preset".to_string(),
2302 updated_at_ms,
2303 }
2304}
2305
2306fn persist_bootstrap_state(runtime: &RedDBRuntime, preset: &str, first_admin_id: Option<&str>) {
2307 let store = runtime.db().store();
2308 let mut tree = crate::serde_json::Map::new();
2309 tree.insert(
2310 BOOTSTRAP_COMPLETED_KEY.to_string(),
2311 crate::serde_json::Value::Bool(true),
2312 );
2313 tree.insert(
2314 BOOTSTRAP_PRESET_KEY.to_string(),
2315 crate::serde_json::Value::String(preset.to_string()),
2316 );
2317 if let Some(id) = first_admin_id {
2318 tree.insert(
2319 BOOTSTRAP_FIRST_ADMIN_KEY.to_string(),
2320 crate::serde_json::Value::String(id.to_string()),
2321 );
2322 }
2323 let json = crate::serde_json::Value::Object(tree);
2324 store.set_config_tree("", &json);
2325}
2326
2327fn merge_telemetry_with_config(
2338 mut cli: crate::telemetry::TelemetryConfig,
2339 runtime: &RedDBRuntime,
2340) -> crate::telemetry::TelemetryConfig {
2341 use crate::storage::schema::Value;
2342
2343 let store = runtime.db().store();
2344
2345 if !cli.level_explicit {
2346 if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
2347 cli.level_filter = v.to_string();
2348 }
2349 }
2350 if !cli.format_explicit {
2351 if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
2352 if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
2353 cli.format = parsed;
2354 }
2355 }
2356 }
2357 if !cli.rotation_keep_days_explicit {
2358 match store.get_config("red.logging.keep_days") {
2359 Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
2360 cli.rotation_keep_days = n as u16
2361 }
2362 Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
2363 cli.rotation_keep_days = n as u16
2364 }
2365 Some(Value::Text(v)) => {
2366 if let Ok(n) = v.parse::<u16>() {
2367 cli.rotation_keep_days = n;
2368 }
2369 }
2370 _ => {}
2371 }
2372 }
2373 if !cli.file_prefix_explicit {
2374 if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
2375 if !v.is_empty() {
2376 cli.file_prefix = v.to_string();
2377 }
2378 }
2379 }
2380 if !cli.log_dir_explicit && !cli.log_file_disabled {
2383 if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
2384 if !v.is_empty() {
2385 cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
2386 }
2387 }
2388 }
2389
2390 cli
2391}
2392
/// Tests for `merge_telemetry_with_config`: stored `red.logging.*` values
/// apply only where the CLI did not set the field explicitly.
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};

    /// In-memory runtime with an empty config store.
    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
    }

    /// Stores `value` as a string under `key` in the runtime's config store.
    fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
        runtime
            .db()
            .store()
            .set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
    }

    /// CLI baseline: a log dir and JSON format, with everything else left at
    /// `Default` (the tests below rely on the `*_explicit` flags defaulting to false).
    fn cli_base() -> TelemetryConfig {
        TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    // The stored dir replaces the non-explicit CLI dir.
    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/var/log/reddb"))
        );
    }

    // An explicit CLI dir is kept even when a dir is stored.
    #[test]
    fn explicit_log_dir_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let mut cli = cli_base();
        cli.log_dir = Some(std::path::PathBuf::from("/custom/dir"));
        cli.log_dir_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/custom/dir"))
        );
    }

    // Disabling log files keeps `log_dir` empty regardless of the stored dir.
    #[test]
    fn no_log_file_beats_config_log_dir() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let mut cli = cli_base();
        cli.log_dir = None;
        cli.log_file_disabled = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert!(
            merged.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    // The stored format replaces the non-explicit CLI format.
    #[test]
    fn config_format_promoted_on_non_tty_default() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.format", "pretty");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.format, LogFormat::Pretty);
    }

    // An explicit CLI format is kept even when a format is stored.
    #[test]
    fn explicit_format_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.format", "pretty");
        let mut cli = cli_base();
        cli.format = LogFormat::Json;
        cli.format_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.format, LogFormat::Json);
    }
}
2480
2481#[inline(never)]
2482fn build_http_server(
2483 runtime: RedDBRuntime,
2484 auth_store: Arc<AuthStore>,
2485 bind_addr: String,
2486) -> RedDBServer {
2487 build_http_server_with_transport_readiness(
2488 runtime,
2489 auth_store,
2490 bind_addr,
2491 TransportReadiness::default(),
2492 )
2493}
2494
2495fn apply_http_limits(
2501 server: RedDBServer,
2502 config: &ServerCommandConfig,
2503 runtime: &RedDBRuntime,
2504) -> RedDBServer {
2505 let store = runtime.db().store();
2506 let resolved =
2507 crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2508 .get_config(key)
2509 {
2510 Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2511 Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2512 Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2513 _ => None,
2514 });
2515 tracing::info!(
2516 target: "reddb::http_limits",
2517 max_handlers = resolved.max_handlers,
2518 handler_timeout_ms = resolved.handler_timeout_ms,
2519 retry_after_secs = resolved.retry_after_secs,
2520 "http_limits resolved"
2521 );
2522 server.with_http_limits(resolved)
2523}
2524
2525#[inline(never)]
2526fn build_http_server_with_transport_readiness(
2527 runtime: RedDBRuntime,
2528 auth_store: Arc<AuthStore>,
2529 bind_addr: String,
2530 transport_readiness: TransportReadiness,
2531) -> RedDBServer {
2532 RedDBServer::with_options(
2533 runtime,
2534 ServerOptions {
2535 bind_addr,
2536 transport_readiness,
2537 ..ServerOptions::default()
2538 },
2539 )
2540 .with_auth(auth_store)
2541}
2542
2543#[inline(never)]
2547fn build_admin_only_server(
2548 runtime: RedDBRuntime,
2549 auth_store: Arc<AuthStore>,
2550 bind_addr: String,
2551) -> RedDBServer {
2552 RedDBServer::with_options(
2553 runtime,
2554 ServerOptions {
2555 bind_addr,
2556 surface: crate::server::ServerSurface::AdminOnly,
2557 ..ServerOptions::default()
2558 },
2559 )
2560 .with_auth(auth_store)
2561}
2562
2563#[inline(never)]
2567fn build_metrics_only_server(
2568 runtime: RedDBRuntime,
2569 auth_store: Arc<AuthStore>,
2570 bind_addr: String,
2571) -> RedDBServer {
2572 RedDBServer::with_options(
2573 runtime,
2574 ServerOptions {
2575 bind_addr,
2576 surface: crate::server::ServerSurface::MetricsOnly,
2577 ..ServerOptions::default()
2578 },
2579 )
2580 .with_auth(auth_store)
2581}
2582
2583fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2587 if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2588 let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2589 let _ = server.serve_in_background();
2590 tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2591 }
2592 if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2593 let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2594 let _ = server.serve_in_background();
2595 tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2596 }
2597}
2598
2599#[inline(never)]
2600fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2601 let cli_telemetry = config.telemetry.clone();
2602 let mut transport_readiness = TransportReadiness::default();
2603 let Some(listener) = bind_listener_for_startup(
2604 &mut transport_readiness,
2605 "http",
2606 &bind_addr,
2607 config.http_bind_explicit,
2608 )?
2609 else {
2610 return Err(format!(
2611 "no HTTP listener started; implicit bind {} failed",
2612 bind_addr
2613 ));
2614 };
2615 let db_options = config.to_db_options()?;
2616 let (runtime, auth_store, _telemetry_guard) =
2617 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2618 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2619 spawn_admin_metrics_listeners(&runtime, &auth_store);
2620 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2621 let server = build_http_server_with_transport_readiness(
2622 runtime.clone(),
2623 auth_store,
2624 bind_addr.clone(),
2625 transport_readiness,
2626 );
2627 let server = apply_http_limits(server, &config, &runtime);
2628 tracing::info!(transport = "http", bind = %bind_addr, "listener online");
2629 server.serve_on(listener).map_err(|err| err.to_string())
2630}
2631
2632fn spawn_http_tls_listener(
2638 config: &ServerCommandConfig,
2639 runtime: &RedDBRuntime,
2640 auth_store: &Arc<AuthStore>,
2641) -> Result<(), String> {
2642 let Some(addr) = config.http_tls_bind_addr.clone() else {
2643 return Ok(());
2644 };
2645
2646 let tls_config = resolve_http_tls_config(config)?;
2647 let server_config = crate::server::tls::build_server_config(&tls_config)
2648 .map_err(|err| format!("HTTP TLS: {err}"))?;
2649
2650 let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
2651 let server = apply_http_limits(server, config, runtime);
2652 let _handle = server.serve_tls_in_background(server_config);
2653 tracing::info!(
2654 transport = "https",
2655 bind = %addr,
2656 mtls = %tls_config.client_ca_path.is_some(),
2657 "TLS listener online"
2658 );
2659 Ok(())
2660}
2661
2662fn resolve_http_tls_config(
2664 config: &ServerCommandConfig,
2665) -> Result<crate::server::tls::HttpTlsConfig, String> {
2666 match (&config.http_tls_cert, &config.http_tls_key) {
2667 (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
2668 cert_path: cert.clone(),
2669 key_path: key.clone(),
2670 client_ca_path: config.http_tls_client_ca.clone(),
2671 }),
2672 (None, None) => {
2673 let dir = config
2675 .path
2676 .as_ref()
2677 .and_then(|p| p.parent().map(std::path::PathBuf::from))
2678 .unwrap_or_else(|| std::path::PathBuf::from("."));
2679 let auto = crate::server::tls::auto_generate_dev_cert(&dir)
2680 .map_err(|err| format!("HTTP TLS dev: {err}"))?;
2681 Ok(crate::server::tls::HttpTlsConfig {
2682 cert_path: auto.cert_path,
2683 key_path: auto.key_path,
2684 client_ca_path: config.http_tls_client_ca.clone(),
2685 })
2686 }
2687 _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
2688 }
2689}
2690
2691#[inline(never)]
2692fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2693 let workers = config.workers;
2694 let cli_telemetry = config.telemetry.clone();
2695 let db_options = config.to_db_options()?;
2696 let rt_config = detect_runtime_config();
2697 let mut transport_readiness = TransportReadiness::default();
2698 let Some(grpc_listener) = bind_listener_for_startup(
2699 &mut transport_readiness,
2700 "grpc",
2701 &bind_addr,
2702 config.grpc_bind_explicit,
2703 )?
2704 else {
2705 return Err(format!(
2706 "no gRPC listener started; implicit bind {} failed",
2707 bind_addr
2708 ));
2709 };
2710
2711 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2712
2713 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2714 .enable_all()
2715 .worker_threads(worker_threads)
2716 .thread_stack_size(rt_config.stack_size)
2717 .build()
2718 .map_err(|err| format!("tokio runtime: {err}"))?;
2719
2720 let (runtime, auth_store, _telemetry_guard) =
2722 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2723 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2724 let signal_runtime = runtime.clone();
2725 tokio_runtime.block_on(async move {
2726 spawn_lifecycle_signal_handler(signal_runtime).await;
2727 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2729
2730 spawn_pg_listener(&config, &runtime);
2732
2733 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2737
2738 let server = RedDBGrpcServer::with_options(
2739 runtime,
2740 GrpcServerOptions {
2741 bind_addr: bind_addr.clone(),
2742 tls: None,
2743 },
2744 auth_store,
2745 );
2746
2747 tracing::info!(
2748 transport = "grpc",
2749 bind = %bind_addr,
2750 cpus = rt_config.available_cpus,
2751 workers = worker_threads,
2752 "listener online"
2753 );
2754 server
2755 .serve_on(grpc_listener)
2756 .await
2757 .map_err(|err| err.to_string())
2758 })
2759}
2760
2761#[inline(never)]
2762fn run_dual_server(
2763 config: ServerCommandConfig,
2764 grpc_bind_addr: String,
2765 http_bind_addr: String,
2766) -> Result<(), String> {
2767 let workers = config.workers;
2768 let cli_telemetry = config.telemetry.clone();
2769 let db_options = config.to_db_options()?;
2770 let rt_config = detect_runtime_config();
2771 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2772 let mut transport_readiness = TransportReadiness::default();
2773 let http_listener = bind_listener_for_startup(
2774 &mut transport_readiness,
2775 "http",
2776 &http_bind_addr,
2777 config.http_bind_explicit,
2778 )?;
2779 let grpc_listener = bind_listener_for_startup(
2780 &mut transport_readiness,
2781 "grpc",
2782 &grpc_bind_addr,
2783 config.grpc_bind_explicit,
2784 )?;
2785 if http_listener.is_none() && grpc_listener.is_none() {
2786 return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2787 }
2788 let (runtime, auth_store, _telemetry_guard) =
2789 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2790 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2791
2792 spawn_admin_metrics_listeners(&runtime, &auth_store);
2793 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2794
2795 let http_handle = if let Some(listener) = http_listener {
2796 let http_server = build_http_server_with_transport_readiness(
2797 runtime.clone(),
2798 auth_store.clone(),
2799 http_bind_addr.clone(),
2800 transport_readiness.clone(),
2801 );
2802 let http_server = apply_http_limits(http_server, &config, &runtime);
2803 Some(http_server.serve_in_background_on(listener))
2804 } else {
2805 None
2806 };
2807
2808 thread::sleep(Duration::from_millis(150));
2809 if let Some(handle) = http_handle.as_ref() {
2810 if handle.is_finished() {
2811 let handle = http_handle.unwrap();
2812 return match handle.join() {
2813 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2814 Ok(Err(err)) => Err(err.to_string()),
2815 Err(_) => Err("HTTP server thread panicked".to_string()),
2816 };
2817 }
2818 }
2819 if grpc_listener.is_none() {
2820 let Some(handle) = http_handle else {
2821 return Err("no listener started".to_string());
2822 };
2823 return match handle.join() {
2824 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2825 Ok(Err(err)) => Err(err.to_string()),
2826 Err(_) => Err("HTTP server thread panicked".to_string()),
2827 };
2828 }
2829 let grpc_listener = grpc_listener.expect("checked above");
2830
2831 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2832 .enable_all()
2833 .worker_threads(worker_threads)
2834 .thread_stack_size(rt_config.stack_size)
2835 .build()
2836 .map_err(|err| format!("tokio runtime: {err}"))?;
2837
2838 let signal_runtime = runtime.clone();
2839 tokio_runtime.block_on(async move {
2840 spawn_lifecycle_signal_handler(signal_runtime).await;
2841 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2843
2844 spawn_pg_listener(&config, &runtime);
2846
2847 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2849
2850 let server = RedDBGrpcServer::with_options(
2851 runtime,
2852 GrpcServerOptions {
2853 bind_addr: grpc_bind_addr.clone(),
2854 tls: None,
2855 },
2856 auth_store,
2857 );
2858
2859 tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2860 tracing::info!(
2861 transport = "grpc",
2862 bind = %grpc_bind_addr,
2863 cpus = rt_config.available_cpus,
2864 workers = worker_threads,
2865 "listener online"
2866 );
2867 server
2868 .serve_on(grpc_listener)
2869 .await
2870 .map_err(|err| err.to_string())
2871 })
2872}
2873
2874#[cfg(test)]
2875mod tests {
2876 use super::*;
2877
2878 #[test]
2879 fn render_systemd_unit_contains_expected_execstart() {
2880 let config = SystemdServiceConfig {
2881 service_name: "reddb".to_string(),
2882 binary_path: PathBuf::from("/usr/local/bin/red"),
2883 run_user: "reddb".to_string(),
2884 run_group: "reddb".to_string(),
2885 data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
2886 router_bind_addr: None,
2887 grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2888 http_bind_addr: None,
2889 };
2890
2891 let unit = render_systemd_unit(&config);
2892 assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
2893 assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
2894 }
2895
2896 #[test]
2897 fn systemd_service_config_derives_paths() {
2898 let config = SystemdServiceConfig {
2899 service_name: "reddb-api".to_string(),
2900 binary_path: PathBuf::from("/usr/local/bin/red"),
2901 run_user: "reddb".to_string(),
2902 run_group: "reddb".to_string(),
2903 data_path: PathBuf::from("/srv/reddb/live/data.rdb"),
2904 router_bind_addr: None,
2905 grpc_bind_addr: None,
2906 http_bind_addr: Some("127.0.0.1:5055".to_string()),
2907 };
2908
2909 assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
2910 assert_eq!(
2911 config.unit_path(),
2912 PathBuf::from("/etc/systemd/system/reddb-api.service")
2913 );
2914 }
2915
2916 #[test]
2917 fn render_systemd_unit_supports_dual_transport() {
2918 let config = SystemdServiceConfig {
2919 service_name: "reddb".to_string(),
2920 binary_path: PathBuf::from("/usr/local/bin/red"),
2921 run_user: "reddb".to_string(),
2922 run_group: "reddb".to_string(),
2923 data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
2924 router_bind_addr: None,
2925 grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2926 http_bind_addr: Some("0.0.0.0:5055".to_string()),
2927 };
2928
2929 let unit = render_systemd_unit(&config);
2930 assert!(unit.contains("--grpc-bind 0.0.0.0:5555"));
2931 assert!(unit.contains("--http-bind 0.0.0.0:5055"));
2932 }
2933
2934 #[test]
2935 fn render_systemd_unit_supports_router_mode() {
2936 let config = SystemdServiceConfig {
2937 service_name: "reddb".to_string(),
2938 binary_path: PathBuf::from("/usr/local/bin/red"),
2939 run_user: "reddb".to_string(),
2940 run_group: "reddb".to_string(),
2941 data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
2942 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
2943 grpc_bind_addr: None,
2944 http_bind_addr: None,
2945 };
2946
2947 let unit = render_systemd_unit(&config);
2948 assert!(unit.contains("--bind 127.0.0.1:5050"));
2949 assert!(!unit.contains("--grpc-bind"));
2950 assert!(!unit.contains("--http-bind"));
2951 }
2952
2953 #[test]
2954 fn explicit_bind_collision_is_fatal() {
2955 let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
2956 let addr = held.local_addr().expect("held addr").to_string();
2957 let mut readiness = TransportReadiness::default();
2958
2959 let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();
2960
2961 assert!(error.contains("explicit http listener bind"));
2962 assert_eq!(readiness.active.len(), 0);
2963 assert_eq!(readiness.failed.len(), 1);
2964 assert!(readiness.failed[0].explicit);
2965 assert_eq!(readiness.failed[0].bind_addr, addr);
2966 }
2967
2968 fn no_auth_env_lock() -> &'static std::sync::Mutex<()> {
2975 static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
2976 LOCK.get_or_init(|| std::sync::Mutex::new(()))
2977 }
2978
2979 fn no_auth_test_config(no_auth: bool) -> ServerCommandConfig {
2980 ServerCommandConfig {
2981 path: None,
2982 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
2983 router_bind_explicit: false,
2984 grpc_bind_addr: None,
2985 grpc_bind_explicit: false,
2986 grpc_tls_bind_addr: None,
2987 grpc_tls_cert: None,
2988 grpc_tls_key: None,
2989 grpc_tls_client_ca: None,
2990 http_bind_addr: None,
2991 http_bind_explicit: false,
2992 http_tls_bind_addr: None,
2993 http_tls_cert: None,
2994 http_tls_key: None,
2995 http_tls_client_ca: None,
2996 wire_bind_addr: None,
2997 wire_bind_explicit: false,
2998 wire_tls_bind_addr: None,
2999 wire_tls_cert: None,
3000 wire_tls_key: None,
3001 pg_bind_addr: None,
3002 create_if_missing: true,
3003 read_only: false,
3004 role: "standalone".to_string(),
3005 primary_addr: None,
3006 vault: true,
3009 no_auth,
3010 workers: None,
3011 telemetry: None,
3012 http_limits_cli: crate::server::HttpLimitsCliInput::default(),
3013 }
3014 }
3015
3016 #[test]
3017 fn no_auth_flag_disables_every_auth_knob_and_stamps_metadata() {
3018 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3019 unsafe {
3024 std::env::set_var("REDDB_USERNAME", "admin");
3025 std::env::set_var("REDDB_PASSWORD", "hunter2");
3026 }
3027 let config = no_auth_test_config(true);
3028 let options = config.to_db_options().expect("to_db_options");
3029
3030 assert!(no_auth_active(&options), "metadata should be stamped");
3031 assert!(!options.auth.enabled, "auth.enabled must be forced off");
3032 assert!(
3033 !options.auth.require_auth,
3034 "require_auth must be forced off"
3035 );
3036 assert!(
3037 !options.auth.vault_enabled,
3038 "vault_enabled must be forced off (overrides --vault)"
3039 );
3040 assert_eq!(
3041 options.metadata.get(NO_AUTH_META).map(String::as_str),
3042 Some("true"),
3043 );
3044
3045 unsafe {
3047 std::env::remove_var("REDDB_USERNAME");
3048 std::env::remove_var("REDDB_PASSWORD");
3049 }
3050 }
3051
3052 #[test]
3053 fn default_behaviour_without_no_auth_flag_is_unchanged() {
3054 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3055 let config = no_auth_test_config(false);
3056 let options = config.to_db_options().expect("to_db_options");
3057
3058 assert!(
3059 !no_auth_active(&options),
3060 "default boot must not be marked no-auth"
3061 );
3062 assert!(
3063 options.metadata.get(NO_AUTH_META).is_none(),
3064 "metadata key must be absent when flag is off"
3065 );
3066 assert!(options.auth.vault_enabled);
3068 }
3069
3070 #[test]
3071 fn no_auth_active_blocks_bootstrap_from_env() {
3072 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3073 unsafe {
3078 std::env::set_var("REDDB_USERNAME", "admin");
3079 std::env::set_var("REDDB_PASSWORD", "hunter2");
3080 }
3081
3082 let options = no_auth_test_config(true)
3083 .to_db_options()
3084 .expect("to_db_options");
3085
3086 let auth_store = AuthStore::new(options.auth.clone());
3090 if !no_auth_active(&options) {
3091 auth_store.bootstrap_from_env();
3092 }
3093
3094 assert!(
3095 auth_store.needs_bootstrap(),
3096 "no admin user must be bootstrapped under --no-auth even with REDDB_USERNAME/PASSWORD set"
3097 );
3098
3099 unsafe {
3101 std::env::remove_var("REDDB_USERNAME");
3102 std::env::remove_var("REDDB_PASSWORD");
3103 }
3104 }
3105
3106 fn clear_preset_env() {
3113 unsafe {
3115 std::env::remove_var(PRESET_ENV);
3116 std::env::remove_var("REDDB_BOOTSTRAP_MANIFEST");
3117 std::env::remove_var("REDDB_USERNAME");
3118 std::env::remove_var("REDDB_PASSWORD");
3119 std::env::remove_var("REDDB_USERNAME_FILE");
3120 std::env::remove_var("REDDB_PASSWORD_FILE");
3121 }
3122 }
3123
3124 fn fresh_runtime_and_store() -> (RedDBRuntime, Arc<AuthStore>) {
3125 let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
3126 let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3127 (runtime, auth_store)
3128 }
3129
3130 #[test]
3131 fn simple_preset_is_default_and_persists_state() {
3132 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3133 clear_preset_env();
3134
3135 let (runtime, auth_store) = fresh_runtime_and_store();
3136 apply_preset(&runtime, &auth_store).expect("simple preset applies cleanly");
3137
3138 assert!(
3140 auth_store.needs_bootstrap(),
3141 "simple preset must not create an admin"
3142 );
3143
3144 let store = runtime.db().store();
3146 let completed = store
3147 .get_config(BOOTSTRAP_COMPLETED_KEY)
3148 .expect("completed key persisted");
3149 assert!(matches!(
3150 completed,
3151 crate::storage::schema::Value::Boolean(true)
3152 ));
3153 let preset = store
3154 .get_config(BOOTSTRAP_PRESET_KEY)
3155 .expect("preset key persisted");
3156 match preset {
3157 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_SIMPLE),
3158 other => panic!("expected Text(simple), got {other:?}"),
3159 }
3160 assert!(
3161 store.get_config(BOOTSTRAP_FIRST_ADMIN_KEY).is_none(),
3162 "simple preset must not record a first admin"
3163 );
3164
3165 clear_preset_env();
3166 }
3167
3168 #[test]
3169 fn production_preset_creates_first_admin_with_allow_all_policy() {
3170 use crate::auth::policies::{EvalContext, ResourceRef};
3171 use crate::auth::UserId;
3172
3173 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3174 clear_preset_env();
3175 unsafe {
3177 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3178 std::env::set_var("REDDB_USERNAME", "ops");
3179 std::env::set_var("REDDB_PASSWORD", "hunter2");
3180 }
3181
3182 let (runtime, auth_store) = fresh_runtime_and_store();
3183 apply_preset(&runtime, &auth_store).expect("production preset applies cleanly");
3184
3185 assert!(
3187 !auth_store.needs_bootstrap(),
3188 "production preset must seal bootstrap"
3189 );
3190 let users = auth_store.list_users();
3191 assert_eq!(users.len(), 1);
3192 let admin = &users[0];
3193 assert_eq!(admin.username, "ops");
3194 assert!(
3195 admin.system_owned,
3196 "first admin must be system-owned to pass the managed-config gate"
3197 );
3198 assert!(
3199 admin.tenant_id.is_none(),
3200 "first admin must be platform-scoped (tenant=None)"
3201 );
3202
3203 let policy = auth_store
3205 .get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY)
3206 .expect("allow-all policy installed");
3207 assert!(!policy.statements.is_empty());
3208
3209 let actor = UserId::platform("ops");
3212 let ctx = EvalContext {
3213 principal_tenant: None,
3214 current_tenant: None,
3215 peer_ip: None,
3216 mfa_present: false,
3217 now_ms: 1_700_000_000_000,
3218 principal_is_admin_role: true,
3219 principal_is_system_owned: true,
3220 principal_is_platform_scoped: true,
3221 };
3222 let arbitrary_resource = ResourceRef::new("config", "red.config.audit.enabled");
3223 assert!(
3224 auth_store.check_policy_authz(&actor, "config:write", &arbitrary_resource, &ctx),
3225 "allow-all policy must grant arbitrary actions via the evaluator"
3226 );
3227
3228 let store = runtime.db().store();
3230 match store
3231 .get_config(BOOTSTRAP_FIRST_ADMIN_KEY)
3232 .expect("first_admin_id persisted")
3233 {
3234 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), "ops"),
3235 other => panic!("expected Text(ops), got {other:?}"),
3236 }
3237 match store.get_config(BOOTSTRAP_PRESET_KEY).unwrap() {
3238 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_PRODUCTION),
3239 other => panic!("expected Text(production), got {other:?}"),
3240 }
3241
3242 clear_preset_env();
3243 }
3244
3245 #[test]
3246 fn regulated_preset_enables_query_audit_infrastructure_without_rules() {
3247 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3248 clear_preset_env();
3249 unsafe {
3251 std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3252 }
3253
3254 let (runtime, auth_store) = fresh_runtime_and_store();
3255 apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3256
3257 assert!(runtime.query_audit().is_enabled());
3258 assert!(runtime.query_audit().rules().is_empty());
3259 assert!(
3260 runtime
3261 .db()
3262 .store()
3263 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3264 .is_some(),
3265 "regulated preset should create the query-audit stream"
3266 );
3267
3268 runtime
3269 .execute_query("CREATE TABLE docs (id INT)")
3270 .expect("create table");
3271 runtime
3272 .execute_query("INSERT INTO docs (id) VALUES (1)")
3273 .expect("insert");
3274 runtime.execute_query("SELECT * FROM docs").expect("select");
3275 let rows = runtime
3276 .db()
3277 .store()
3278 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3279 .expect("query audit collection")
3280 .query_all(|_| true);
3281 assert!(
3282 rows.is_empty(),
3283 "regulated preset must not globally audit every query"
3284 );
3285
3286 clear_preset_env();
3287 }
3288
3289 #[test]
3290 fn regulated_preset_installs_managed_evidence_guardrails_end_to_end() {
3291 use crate::auth::policies::{EvalContext, Policy, ResourceRef};
3292 use crate::auth::store::PrincipalRef;
3293 use crate::auth::{Role, UserId};
3294 use crate::runtime::mvcc::{clear_current_auth_identity, set_current_auth_identity};
3295 use crate::storage::schema::Value;
3296
3297 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3298 clear_preset_env();
3299 unsafe {
3301 std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3302 }
3303
3304 let options = no_auth_test_config(false)
3305 .to_db_options()
3306 .expect("regulated options");
3307 assert!(
3308 options.control_events.compliance_mode,
3309 "regulated preset must enable fail-closed control evidence before runtime boot"
3310 );
3311 assert!(
3312 options.query_audit.enabled && options.query_audit.rules.is_empty(),
3313 "regulated preset must enable query-audit infrastructure without global rules"
3314 );
3315
3316 let runtime = RedDBRuntime::with_options(options).expect("runtime");
3317 let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3318 apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3319 runtime.set_auth_store(Arc::clone(&auth_store));
3320
3321 assert!(runtime.control_events_require_persistence());
3322 assert!(runtime.query_audit().is_enabled());
3323 assert!(runtime.query_audit().rules().is_empty());
3324 assert!(auth_store
3325 .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
3326 .is_some());
3327
3328 let managed_policy = runtime
3329 .config_registry()
3330 .get_active(REGULATED_PROTECT_MANAGED_POLICY)
3331 .expect("regulated managed policy registry entry");
3332 assert!(managed_policy.managed);
3333 assert_eq!(managed_policy.resource_type, "policy");
3334 assert!(
3335 runtime
3336 .config_registry()
3337 .get_active(REGULATED_AUDIT_CONFIG_NAMESPACE)
3338 .expect("regulated audit config namespace")
3339 .managed
3340 );
3341
3342 let registry_rows = runtime
3343 .execute_query(&format!(
3344 "SELECT id, managed FROM red.registry WHERE id = '{}'",
3345 REGULATED_PROTECT_MANAGED_POLICY
3346 ))
3347 .expect("red.registry query");
3348 assert_eq!(registry_rows.result.records.len(), 1);
3349 assert_eq!(
3350 registry_rows.result.records[0].get("managed"),
3351 Some(&Value::Boolean(true))
3352 );
3353
3354 let managed_policy_rows = runtime
3355 .execute_query(&format!(
3356 "SELECT policy_id FROM red.managed_policies WHERE policy_id = '{}'",
3357 REGULATED_PROTECT_MANAGED_POLICY
3358 ))
3359 .expect("red.managed_policies query");
3360 assert_eq!(managed_policy_rows.result.records.len(), 1);
3361
3362 let capability_rows = runtime
3363 .execute_query(
3364 "SELECT action FROM red.control_capabilities WHERE action = 'evidence:export'",
3365 )
3366 .expect("red.control_capabilities query");
3367 assert_eq!(capability_rows.result.records.len(), 1);
3368
3369 auth_store
3370 .create_user("alice", "p", Role::Admin)
3371 .expect("create ordinary admin");
3372 let allow_all = Policy::from_json_str(
3373 r#"{
3374 "id": "alice-allow-all",
3375 "version": 1,
3376 "statements": [{
3377 "effect": "allow",
3378 "actions": ["*"],
3379 "resources": ["*"]
3380 }]
3381 }"#,
3382 )
3383 .expect("allow-all policy");
3384 auth_store.put_policy(allow_all).expect("install allow-all");
3385 auth_store
3386 .attach_policy(
3387 PrincipalRef::User(UserId::platform("alice")),
3388 "alice-allow-all",
3389 )
3390 .expect("attach allow-all");
3391 let ctx = EvalContext {
3392 principal_tenant: None,
3393 current_tenant: None,
3394 peer_ip: None,
3395 mfa_present: false,
3396 now_ms: 1_700_000_000_000,
3397 principal_is_admin_role: true,
3398 principal_is_system_owned: false,
3399 principal_is_platform_scoped: true,
3400 };
3401 assert!(
3402 auth_store.check_policy_authz(
3403 &UserId::platform("alice"),
3404 "policy:drop",
3405 &ResourceRef::new("policy", REGULATED_PROTECT_MANAGED_POLICY),
3406 &ctx,
3407 ),
3408 "ordinary allow-all policy should be broad enough that only the managed guardrail blocks"
3409 );
3410
3411 set_current_auth_identity("alice".to_string(), Role::Admin);
3412 let denied = runtime.execute_query(&format!(
3413 "DROP POLICY '{}'",
3414 REGULATED_PROTECT_MANAGED_POLICY
3415 ));
3416 clear_current_auth_identity();
3417 let err = denied.expect_err("managed policy guardrail must deny ordinary admin");
3418 assert!(
3419 err.to_string().contains("managed policy"),
3420 "error should name the managed guardrail: {err}"
3421 );
3422 assert!(
3423 auth_store
3424 .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
3425 .is_some(),
3426 "denied mutation must leave managed policy installed"
3427 );
3428
3429 let denied_events = runtime
3430 .execute_query(&format!(
3431 "SELECT action, resource, outcome FROM red.control_events \
3432 WHERE action = 'policy:drop' AND resource = 'policy:{}'",
3433 REGULATED_PROTECT_MANAGED_POLICY
3434 ))
3435 .expect("red.control_events denied policy drop");
3436 assert_eq!(denied_events.result.records.len(), 1);
3437 assert_eq!(
3438 denied_events.result.records[0].get("outcome"),
3439 Some(&Value::text("denied"))
3440 );
3441
3442 set_current_auth_identity("alice".to_string(), Role::Admin);
3443 let config_denied = runtime.execute_query("SET CONFIG red.config.audit.enabled = true");
3444 clear_current_auth_identity();
3445 let err = config_denied.expect_err("managed config guardrail must deny ordinary admin");
3446 assert!(
3447 err.to_string().contains("managed config"),
3448 "error should name the managed config guardrail: {err}"
3449 );
3450
3451 let denied_config_events = runtime
3452 .execute_query(
3453 "SELECT action, resource, outcome FROM red.control_events \
3454 WHERE action = 'config:write' AND resource = 'config:red.config.audit.enabled'",
3455 )
3456 .expect("red.control_events denied config write");
3457 assert_eq!(denied_config_events.result.records.len(), 1);
3458 assert_eq!(
3459 denied_config_events.result.records[0].get("outcome"),
3460 Some(&Value::text("denied"))
3461 );
3462
3463 runtime
3464 .execute_query("CREATE TABLE regulated_docs (id INT)")
3465 .expect("create user table");
3466 runtime
3467 .execute_query("SELECT * FROM regulated_docs")
3468 .expect("select user table");
3469 let audit_rows = runtime
3470 .db()
3471 .store()
3472 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3473 .expect("query audit collection")
3474 .query_all(|_| true);
3475 assert!(
3476 audit_rows.is_empty(),
3477 "regulated preset must not globally audit data-plane queries"
3478 );
3479
3480 clear_preset_env();
3481 }
3482
/// End-to-end check of the bootstrap manifest path.
///
/// Writes a manifest to a temp file, points `REDDB_BOOTSTRAP_MANIFEST` at it,
/// runs `apply_preset`, and then verifies that the user, the attached policy,
/// the managed registry entries and the config keys named in the manifest are
/// all observable. Afterwards the manifest file is deleted to prove that both
/// registry rehydration and a second `apply_preset` work without it.
#[test]
fn bootstrap_manifest_installs_initial_users_policies_guardrails_and_config() {
    use crate::auth::policies::{EvalContext, ResourceRef};
    use crate::auth::UserId;
    use crate::storage::schema::Value;

    // Manifest under test: one system-owned admin user, one ordinary policy
    // attached to that user, one managed policy, one managed config
    // namespace, and two config entries (a plain value and a secret ref).
    const MANIFEST_JSON: &str = r#"{
    "users": [
        {
            "username": "ops",
            "password": "hunter2",
            "role": "admin",
            "system_owned": true
        }
    ],
    "policies": [
        {
            "id": "bootstrap-registry-admin",
            "version": 1,
            "statements": [
                {
                    "effect": "allow",
                    "actions": ["red.registry:*", "policy:*", "config:write", "app:read"],
                    "resources": ["registry:*", "policy:*", "config:*", "collection:docs"]
                }
            ]
        }
    ],
    "managed_policies": [
        {
            "id": "managed-deny-drop",
            "version": 1,
            "statements": [
                {
                    "effect": "deny",
                    "actions": ["policy:drop"],
                    "resources": ["policy:managed-deny-drop"]
                }
            ],
            "required_resource": "policy:managed-deny-drop",
            "evidence": "full"
        }
    ],
    "attachments": [
        {"user": "ops", "policy": "bootstrap-registry-admin"}
    ],
    "managed_config_namespaces": [
        {
            "id": "red.ai",
            "required_action": "config:write",
            "required_resource": "config:red.ai.*",
            "evidence": "metadata"
        }
    ],
    "config": [
        {"key": "red.ai.default.provider", "value": "openai"},
        {
            "key": "red.ai.openai.default.secret_ref",
            "secret_ref": {"collection": "red.vault", "key": "openai"}
        }
    ],
    "actor": "ops"
}"#;

    // Hold the shared env lock for the whole test; a poisoned lock (an
    // earlier test panicked while holding it) is recovered rather than fatal.
    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();

    // File name is made unique per process and per millisecond.
    let pid = std::process::id();
    let unix_millis = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    let manifest_path = std::env::temp_dir()
        .join(format!("reddb-bootstrap-manifest-{pid}-{unix_millis}.json"));
    std::fs::write(&manifest_path, MANIFEST_JSON).expect("write manifest");
    // SAFETY: mutating the process environment is only sound when no other
    // thread touches it concurrently; the tests in this module that change
    // these variables serialise on `no_auth_env_lock()`, whose guard is held
    // above.
    unsafe {
        std::env::set_var("REDDB_BOOTSTRAP_MANIFEST", &manifest_path);
    }

    let (runtime, auth_store) = fresh_runtime_and_store();
    apply_preset(&runtime, &auth_store).expect("manifest applies cleanly");

    // --- users ---------------------------------------------------------
    let installed_users = auth_store.list_users();
    assert_eq!(installed_users.len(), 1);
    let ops_user = &installed_users[0];
    assert_eq!(ops_user.username, "ops");
    assert!(ops_user.system_owned);

    // --- policy attachment ---------------------------------------------
    // The attached policy allows `app:read` on `collection:docs`.
    let ops_principal = UserId::platform("ops");
    let eval_ctx = EvalContext {
        principal_tenant: None,
        current_tenant: None,
        peer_ip: None,
        mfa_present: false,
        now_ms: 1_700_000_000_000,
        principal_is_admin_role: true,
        principal_is_system_owned: true,
        principal_is_platform_scoped: true,
    };
    let docs_collection = ResourceRef::new("collection", "docs");
    let read_allowed =
        auth_store.check_policy_authz(&ops_principal, "app:read", &docs_collection, &eval_ctx);
    assert!(read_allowed);

    // --- managed registry entries --------------------------------------
    let policy_entry = runtime
        .config_registry()
        .get_active("managed-deny-drop")
        .expect("managed policy registry entry");
    assert!(policy_entry.managed);
    assert_eq!(policy_entry.resource_type, "policy");

    let namespace_entry = runtime
        .config_registry()
        .get_active("red.ai")
        .expect("managed config namespace registry entry");
    assert!(namespace_entry.managed);
    assert_eq!(namespace_entry.resource_type, "config_namespace");

    // --- persisted config ----------------------------------------------
    let store = runtime.db().store();

    let provider_value = store
        .get_config("red.ai.default.provider")
        .expect("plain config persisted");
    match provider_value {
        Value::Text(provider) => assert_eq!(provider.as_ref(), "openai"),
        other => panic!("expected provider text, got {other:?}"),
    }

    let secret_ref_value = store
        .get_config("red.ai.openai.default.secret_ref")
        .expect("secret ref config persisted");
    let bytes = match secret_ref_value {
        Value::Json(bytes) => bytes,
        _ => panic!("secret ref must be stored as structured JSON"),
    };
    let reference: crate::serde_json::Value =
        crate::serde_json::from_slice(&bytes).expect("secret ref json");
    let reference_type = reference.get("type").and_then(|field| field.as_str());
    assert_eq!(reference_type, Some("secret_ref"));
    // "hunter2" is the manifest user's password; it must never show up in
    // the stored secret reference.
    let secret_ref_text = String::from_utf8_lossy(&bytes);
    assert!(
        !secret_ref_text.contains("hunter2"),
        "manifest password must not leak into secret ref config"
    );

    // --- bootstrap bookkeeping -----------------------------------------
    let completed = store
        .get_config(BOOTSTRAP_COMPLETED_KEY)
        .expect("bootstrap completion persisted");
    assert!(matches!(completed, Value::Boolean(true)));
    let persisted_entries = store.get_config("system.bootstrap.manifest.registry_entries");
    assert!(
        persisted_entries.is_some(),
        "managed registry entries must be persisted internally"
    );

    // --- behaviour once the manifest file is gone -----------------------
    // The env var still points at the (now deleted) path from here on.
    std::fs::remove_file(&manifest_path).expect("remove manifest after first boot");

    let restored_registry = Arc::new(crate::auth::registry::ConfigRegistry::new());
    crate::cli::bootstrap_manifest::rehydrate_manifest_registry(&runtime, &restored_registry)
        .expect("registry rehydrates without manifest file");
    assert!(restored_registry.get_active("managed-deny-drop").is_some());
    assert!(restored_registry.get_active("red.ai").is_some());

    // A second apply against the same runtime must succeed and leave a
    // brand-new auth store untouched.
    let second_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
    apply_preset(&runtime, &second_store).expect("re-run must not need manifest file");
    assert!(second_store.needs_bootstrap());

    clear_preset_env();
}
3654
/// With the production preset selected and only `REDDB_USERNAME` set,
/// `apply_preset` must fail, name `REDDB_PASSWORD` in the error, and leave
/// both the auth store and the bootstrap-completed marker untouched.
#[test]
fn production_preset_refuses_to_start_without_password() {
    // Hold the shared env lock for the whole test, recovering from poison.
    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();
    // SAFETY: mutating the process environment is only sound when no other
    // thread touches it concurrently; the tests in this module that change
    // these variables serialise on `no_auth_env_lock()`, whose guard is held
    // above.
    unsafe {
        std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
        std::env::set_var("REDDB_USERNAME", "ops");
    }

    let (runtime, auth_store) = fresh_runtime_and_store();
    let outcome = apply_preset(&runtime, &auth_store);
    let err = outcome.expect_err("must reject missing password");
    assert!(
        err.contains("REDDB_PASSWORD"),
        "error must name the missing env: {err}"
    );

    // The failed apply must not have created a user ...
    assert!(auth_store.needs_bootstrap());
    // ... nor recorded bootstrap as completed.
    let completion_marker = runtime.db().store().get_config(BOOTSTRAP_COMPLETED_KEY);
    assert!(completion_marker.is_none());

    clear_preset_env();
}
3682
/// After a successful first production bootstrap, calling `apply_preset`
/// again on the same runtime with a brand-new `AuthStore` must succeed
/// without creating a user or installing `FIRST_ADMIN_ALLOW_ALL_POLICY`.
#[test]
fn re_running_production_after_first_boot_is_a_silent_skip() {
    // Hold the shared env lock for the whole test, recovering from poison.
    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();
    // SAFETY: mutating the process environment is only sound when no other
    // thread touches it concurrently; the tests in this module that change
    // these variables serialise on `no_auth_env_lock()`, whose guard is held
    // above.
    unsafe {
        std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
        std::env::set_var("REDDB_USERNAME", "ops");
        std::env::set_var("REDDB_PASSWORD", "hunter2");
    }

    // First boot: exactly one user is created.
    let (runtime, first_store) = fresh_runtime_and_store();
    apply_preset(&runtime, &first_store).expect("first apply");
    let users_after_first_boot = first_store.list_users().len();
    assert_eq!(users_after_first_boot, 1);

    // Second boot: same runtime, empty auth store. Nothing may be installed.
    let second_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
    apply_preset(&runtime, &second_store).expect("re-run is silent-skip");
    assert!(
        second_store.needs_bootstrap(),
        "re-run must not create a second admin"
    );
    let reinstalled_policy = second_store.get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY);
    assert!(
        reinstalled_policy.is_none(),
        "re-run must not re-install the allow-all policy on the fresh store"
    );

    clear_preset_env();
}
3717
/// A preset value that is not recognised must make `apply_preset` fail with
/// an error that echoes the offending value, and must not bootstrap a user.
#[test]
fn unrecognised_preset_value_is_rejected() {
    // Hold the shared env lock for the whole test, recovering from poison.
    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();
    // SAFETY: mutating the process environment is only sound when no other
    // thread touches it concurrently; the tests in this module that change
    // this variable serialise on `no_auth_env_lock()`, whose guard is held
    // above.
    unsafe {
        std::env::set_var(PRESET_ENV, "weird");
    }

    let (runtime, auth_store) = fresh_runtime_and_store();
    let outcome = apply_preset(&runtime, &auth_store);
    let err = outcome.expect_err("must reject unknown preset");
    assert!(err.contains("weird"), "error must echo the value: {err}");
    assert!(auth_store.needs_bootstrap());

    clear_preset_env();
}
3734
/// Even with a complete production preset in the environment, options for
/// which `no_auth_active` is true must keep `apply_preset` from running: no
/// user is created and no bootstrap-completed marker is persisted.
#[test]
fn no_auth_short_circuits_preset_entirely() {
    // Hold the shared env lock for the whole test, recovering from poison.
    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();
    // SAFETY: mutating the process environment is only sound when no other
    // thread touches it concurrently; the tests in this module that change
    // these variables serialise on `no_auth_env_lock()`, whose guard is held
    // above.
    unsafe {
        std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
        std::env::set_var("REDDB_USERNAME", "ops");
        std::env::set_var("REDDB_PASSWORD", "hunter2");
    }

    let options = no_auth_test_config(true)
        .to_db_options()
        .expect("to_db_options");
    assert!(no_auth_active(&options));

    let (runtime, auth_store) = fresh_runtime_and_store();
    // NOTE(review): presumably this mirrors the gate the real startup path
    // puts around `apply_preset` -- confirm against the caller. With the
    // assertion above holding, the branch is never taken in this test.
    let preset_allowed = !no_auth_active(&options);
    if preset_allowed {
        apply_preset(&runtime, &auth_store).expect("would apply preset");
    }

    assert!(
        auth_store.needs_bootstrap(),
        "--no-auth must prevent any admin creation"
    );
    let completion_marker = runtime.db().store().get_config(BOOTSTRAP_COMPLETED_KEY);
    assert!(
        completion_marker.is_none(),
        "--no-auth must skip bootstrap-state persistence"
    );

    clear_preset_env();
}
3774
/// Binding an address that is already occupied, with the final argument of
/// `bind_listener_for_startup` set to `false` (presumably the "explicit"
/// flag, given the `explicit` field checked below), must not be an error:
/// the call returns `Ok(None)` and the failure is recorded in `readiness`.
#[test]
fn implicit_bind_collision_degrades() {
    // Occupy an OS-assigned loopback port for the duration of the test.
    let occupied = TcpListener::bind("127.0.0.1:0").expect("hold test port");
    let addr = occupied.local_addr().expect("held addr").to_string();

    let mut readiness = TransportReadiness::default();
    let bind_outcome = bind_listener_for_startup(&mut readiness, "http", &addr, false);
    let listener = bind_outcome.expect("nonfatal");

    // No listener was produced and nothing was marked active ...
    assert!(listener.is_none());
    assert_eq!(readiness.active.len(), 0);

    // ... while exactly one non-explicit failure was recorded for the address.
    assert_eq!(readiness.failed.len(), 1);
    let failure = &readiness.failed[0];
    assert!(!failure.explicit);
    assert_eq!(failure.bind_addr, addr);
}
3790}