1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, InProcessRouterConfig};
12use crate::{
13 GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Default router bind address: loopback interface, port 5050.
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
/// Inspects the host and derives thread sizing hints.
///
/// The CPU count comes from [`thread::available_parallelism`] and falls back
/// to `1` when the platform cannot report it. The suggested worker count is
/// one less than the CPU count, but never below one.
pub fn detect_runtime_config() -> RuntimeConfig {
    let available_cpus = match thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    };

    let suggested_workers = if available_cpus > 1 {
        available_cpus - 1
    } else {
        1
    };

    RuntimeConfig {
        available_cpus,
        suggested_workers,
        // 8 MiB.
        stack_size: 8 * 1024 * 1024,
    }
}

/// Sizing hints produced by [`detect_runtime_config`].
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Parallelism reported by the platform (at least `1`).
    pub available_cpus: usize,
    /// `available_cpus - 1`, clamped to a minimum of `1`.
    pub suggested_workers: usize,
    /// Stack size hint in bytes.
    pub stack_size: usize,
}
40
/// Network transports the server can expose.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    Grpc,
    Http,
    Wire,
}

impl ServerTransport {
    /// Human-readable label for the transport.
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Wire => "wire",
            ServerTransport::Http => "HTTP",
            ServerTransport::Grpc => "gRPC",
        }
    }

    /// Loopback address used when no bind address is supplied for the transport.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Wire => "127.0.0.1:5050",
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Grpc => "127.0.0.1:5555",
        }
    }
}
65
/// Settings for the `server` command: storage location, per-transport bind
/// addresses, TLS material, replication role and auth switches.
///
/// NOTE(review): the `*_explicit` flags presumably record whether the operator
/// supplied the matching address (as opposed to a default being filled in) —
/// confirm against the CLI parser.
#[derive(Debug, Clone)]
pub struct ServerCommandConfig {
    /// Database path. `Some` selects persistent options, `None` in-memory options.
    pub path: Option<PathBuf>,
    /// Router bind address. When set, the routed server is started and all
    /// three transports are reported as enabled.
    pub router_bind_addr: Option<String>,
    pub router_bind_explicit: bool,
    /// gRPC bind address.
    pub grpc_bind_addr: Option<String>,
    pub grpc_bind_explicit: bool,
    // gRPC TLS listener address and certificate material — presumably file
    // paths to PEM data; TODO confirm.
    pub grpc_tls_bind_addr: Option<String>,
    pub grpc_tls_cert: Option<PathBuf>,
    pub grpc_tls_key: Option<PathBuf>,
    pub grpc_tls_client_ca: Option<PathBuf>,
    /// HTTP bind address.
    pub http_bind_addr: Option<String>,
    pub http_bind_explicit: bool,
    // HTTP TLS listener address and certificate material (same caveat as gRPC).
    pub http_tls_bind_addr: Option<String>,
    pub http_tls_cert: Option<PathBuf>,
    pub http_tls_key: Option<PathBuf>,
    pub http_tls_client_ca: Option<PathBuf>,
    /// Wire-protocol bind address.
    pub wire_bind_addr: Option<String>,
    pub wire_bind_explicit: bool,
    // Wire TLS listener address and certificate material (same caveat as gRPC).
    pub wire_tls_bind_addr: Option<String>,
    pub wire_tls_cert: Option<PathBuf>,
    pub wire_tls_key: Option<PathBuf>,
    /// Bind address for the PG listener; used on its own when no other
    /// transport is configured.
    pub pg_bind_addr: Option<String>,
    /// Copied verbatim into the database options.
    pub create_if_missing: bool,
    /// Requests read-only mode; environment variables and persisted runtime
    /// state can also turn read-only on.
    pub read_only: bool,
    /// Replication role: `"primary"` or `"replica"`; any other value selects
    /// standalone.
    pub role: String,
    /// Primary address for replicas; defaults to `http://127.0.0.1:5555`.
    pub primary_addr: Option<String>,
    /// Enables the auth vault (overridden to off by `no_auth`).
    pub vault: bool,
    /// Disables auth, the auth requirement and the vault.
    pub no_auth: bool,
    /// Worker count override — presumably `None` means auto-detect; TODO confirm.
    pub workers: Option<usize>,
    /// Optional telemetry configuration.
    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
    /// HTTP limit inputs collected from the CLI.
    pub http_limits_cli: crate::server::HttpLimitsCliInput,
}
146
/// A listener that was bound successfully.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerState {
    pub transport: String,
    pub bind_addr: String,
    pub explicit: bool,
}

/// A listener that could not be bound, with the reason it failed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerFailure {
    pub transport: String,
    pub bind_addr: String,
    pub explicit: bool,
    pub reason: String,
}

/// Accumulates the outcome of every listener bind attempt.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TransportReadiness {
    pub active: Vec<TransportListenerState>,
    pub failed: Vec<TransportListenerFailure>,
}

impl TransportReadiness {
    /// Appends a successfully bound listener to `active`.
    fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
        let entry = TransportListenerState {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
        };
        self.active.push(entry);
    }

    /// Appends a failed bind attempt, with its reason, to `failed`.
    fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
        let entry = TransportListenerFailure {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
            reason,
        };
        self.failed.push(entry);
    }
}
186
/// Inputs needed to render and install a systemd unit for the server.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix.
    pub service_name: String,
    /// Path to the server binary placed in `ExecStart`.
    pub binary_path: PathBuf,
    /// Account the service runs as.
    pub run_user: String,
    /// Group the service runs as.
    pub run_group: String,
    /// Database file path passed to the server via `--path`.
    pub data_path: PathBuf,
    pub router_bind_addr: Option<String>,
    pub grpc_bind_addr: Option<String>,
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory that contains `data_path`.
    ///
    /// Falls back to `"."` when the path has no usable parent. That includes a
    /// bare file name such as `data.rdb`, for which `Path::parent` returns
    /// `Some("")`: an empty directory would otherwise end up in the unit's
    /// `WorkingDirectory=` and in the `install -d` invocation.
    pub fn data_dir(&self) -> PathBuf {
        match self.data_path.parent() {
            Some(parent) if !parent.as_os_str().is_empty() => parent.to_path_buf(),
            _ => PathBuf::from("."),
        }
    }

    /// Location of the unit file under `/etc/systemd/system`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
211
212pub fn default_telemetry_for_path(
217 path: Option<&std::path::Path>,
218) -> crate::telemetry::TelemetryConfig {
219 let log_dir = match path {
220 Some(p) => p
221 .parent()
222 .map(|parent| parent.join("logs"))
223 .or_else(|| Some(std::path::PathBuf::from("./logs"))),
224 None => None, };
226 crate::telemetry::TelemetryConfig {
227 log_dir,
228 file_prefix: "reddb.log".to_string(),
229 level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
230 format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
231 crate::telemetry::LogFormat::Pretty
232 } else {
233 crate::telemetry::LogFormat::Json
234 },
235 rotation_keep_days: 14,
236 service_name: "reddb",
237 level_explicit: false,
239 format_explicit: false,
240 rotation_keep_days_explicit: false,
241 file_prefix_explicit: false,
242 log_dir_explicit: false,
243 log_file_disabled: false,
244 }
245}
246
// Metadata keys under which the backup bootstrap settings are stored on
// `RedDBOptions`, so the background backup tasks can read them back later.

/// Metadata key holding the checkpoint interval, in seconds.
const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
/// Metadata key holding the WAL flush/archive interval, in seconds.
const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
/// Metadata key holding the backup backend kind (written as `"s3"`).
const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
/// Metadata key holding the archive-lag pause threshold, in seconds.
const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";
260
/// Metadata key set to `"true"` on the database options when auth is disabled.
pub(crate) const NO_AUTH_META: &str = "red.boot.no_auth";
269
270pub(crate) fn no_auth_active(options: &RedDBOptions) -> bool {
273 options
274 .metadata
275 .get(NO_AUTH_META)
276 .map(|v| v == "true")
277 .unwrap_or(false)
278}
279
/// Operator-facing warning text for when auth has been disabled via `--no-auth`.
const NO_AUTH_WARNING: &str =
    "⚠ auth disabled (--no-auth) — anonymous access, do NOT use in production";
286
287impl ServerCommandConfig {
288 fn to_db_options(&self) -> Result<RedDBOptions, String> {
289 let mut options = match &self.path {
290 Some(path) => RedDBOptions::persistent(path),
291 None => RedDBOptions::in_memory(),
292 };
293
294 options.mode = StorageMode::Persistent;
295 options.create_if_missing = self.create_if_missing;
296 options.read_only = self.read_only
303 || env_nonempty("RED_READONLY")
304 .or_else(|| env_nonempty("REDDB_READONLY"))
305 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
306 .unwrap_or(false)
307 || self.path.as_ref().is_some_and(|data_path| {
308 crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
309 data_path,
310 ))
311 .unwrap_or(false)
312 });
313
314 options.replication = match self.role.as_str() {
315 "primary" => ReplicationConfig::primary(),
316 "replica" => {
317 let primary_addr = self
318 .primary_addr
319 .clone()
320 .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
321 ReplicationConfig::replica(primary_addr)
328 }
329 _ => ReplicationConfig::standalone(),
330 };
331
332 if self.vault {
333 options.auth.vault_enabled = true;
334 }
335
336 if self.no_auth {
347 options.auth.enabled = false;
348 options.auth.require_auth = false;
349 options.auth.vault_enabled = false;
350 options
351 .metadata
352 .insert(NO_AUTH_META.to_string(), "true".to_string());
353 }
354
355 if let Some(raw) = env_nonempty("REDDB_COMPLIANCE_MODE") {
360 options.control_events.compliance_mode = matches!(
361 raw.to_ascii_lowercase().as_str(),
362 "1" | "true" | "yes" | "on"
363 );
364 }
365 if env_nonempty(PRESET_ENV).is_some_and(|s| s.trim() == PRESET_REGULATED) {
366 options.control_events.compliance_mode = true;
367 options.query_audit = crate::runtime::query_audit::QueryAuditConfig::regulated();
368 }
369
370 match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
375 Err(msg) => {
376 return Err(format!("backup bootstrap: {msg}"));
377 }
378 Ok(Some(cfg)) => {
379 apply_backup_config(&mut options, &cfg);
380 }
381 Ok(None) => {
382 configure_remote_backend_from_env(&mut options);
383 }
384 }
385
386 Ok(options)
387 }
388
389 pub fn enabled_transports(&self) -> Vec<ServerTransport> {
390 let mut transports = Vec::with_capacity(3);
391 if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
392 transports.push(ServerTransport::Grpc);
393 }
394 if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
395 transports.push(ServerTransport::Http);
396 }
397 if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
398 transports.push(ServerTransport::Wire);
399 }
400 transports
401 }
402}
403
/// Looks up configuration value `name` by delegating to
/// `crate::utils::env_with_file_fallback`.
///
/// NOTE(review): going by the names, this presumably returns `None` for
/// unset/empty variables and can read the value from a companion file —
/// confirm against the helper's implementation.
fn env_nonempty(name: &str) -> Option<String> {
    crate::utils::env_with_file_fallback(name)
}
411
412fn env_truthy(name: &str) -> bool {
413 env_nonempty(name)
414 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
415 .unwrap_or(false)
416}
417
418fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
424 let endpoint_host = endpoint_host(&cfg.endpoint);
425
426 options.metadata.insert(
427 BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
428 cfg.checkpoint_interval_secs.to_string(),
429 );
430 options.metadata.insert(
431 BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
432 cfg.wal_flush_interval_secs.to_string(),
433 );
434 options
435 .metadata
436 .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
437 options.metadata.insert(
438 BACKUP_PAUSE_ON_LAG_META.to_string(),
439 cfg.pause_on_lag_secs.to_string(),
440 );
441
442 #[cfg(feature = "backend-s3")]
443 {
444 let s3_cfg = crate::storage::backend::S3Config {
445 endpoint: cfg.endpoint.clone(),
446 bucket: cfg.bucket.clone(),
447 key_prefix: cfg.prefix.clone(),
448 access_key: cfg.access_key_id.clone(),
449 secret_key: cfg.secret_access_key.clone(),
450 region: cfg.region.clone(),
451 path_style: true,
452 };
453 let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
454 options.remote_backend = Some(backend.clone());
455 options.remote_backend_atomic = Some(backend);
456 let trimmed = cfg.prefix.trim_end_matches('/');
461 options.remote_key = Some(format!("{}/data.rdb", trimmed));
462
463 tracing::info!(
464 backend = "s3",
465 endpoint = %endpoint_host,
466 bucket = %cfg.bucket,
467 prefix = %cfg.prefix,
468 checkpoint_interval_secs = cfg.checkpoint_interval_secs,
469 wal_flush_interval_secs = cfg.wal_flush_interval_secs,
470 "backup backend configured from REDDB_BACKUP_* env"
471 );
472 }
473
474 #[cfg(not(feature = "backend-s3"))]
475 {
476 tracing::warn!(
477 backend = "s3",
478 endpoint = %endpoint_host,
479 bucket = %cfg.bucket,
480 prefix = %cfg.prefix,
481 "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
482 backend wiring skipped (archiver/checkpointer also disabled)"
483 );
484 }
485}
486
/// Returns the `host[:port]` part of an endpoint: everything after the first
/// `://` (if any) and before the next `/`.
fn endpoint_host(endpoint: &str) -> &str {
    let without_scheme = match endpoint.find("://") {
        Some(at) => &endpoint[at + "://".len()..],
        None => endpoint,
    };
    match without_scheme.find('/') {
        Some(end) => &without_scheme[..end],
        None => without_scheme,
    }
}
494
495fn spawn_backup_tasks_if_configured(
501 options: &RedDBOptions,
502 runtime: &RedDBRuntime,
503) -> Option<BackupTasksHandle> {
504 let checkpoint_secs: u64 = options
505 .metadata
506 .get(BACKUP_INTERVAL_META_CHECKPOINT)?
507 .parse()
508 .ok()?;
509 let wal_secs: u64 = options
510 .metadata
511 .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
512 .parse()
513 .ok()?;
514 let pause_on_lag_secs: u64 = options
517 .metadata
518 .get(BACKUP_PAUSE_ON_LAG_META)
519 .and_then(|raw| raw.parse().ok())
520 .unwrap_or(0);
521 options.remote_backend.as_ref()?;
522
523 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
524
525 if pause_on_lag_secs > 0 {
530 let now_ms = std::time::SystemTime::now()
531 .duration_since(std::time::UNIX_EPOCH)
532 .map(|d| d.as_millis() as u64)
533 .unwrap_or(0);
534 runtime
535 .write_gate()
536 .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
537 tracing::info!(
538 pause_on_lag_secs,
539 "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
540 );
541 }
542
543 let checkpoint_handle = {
544 let stop = Arc::clone(&stop);
545 let runtime = runtime.clone();
546 let interval = Duration::from_secs(checkpoint_secs);
547 thread::Builder::new()
548 .name("red-checkpointer".into())
549 .spawn(move || {
550 periodic_loop(stop, interval, move || {
551 if let Err(err) = runtime.checkpoint() {
552 tracing::warn!(error = %err, "periodic checkpoint failed");
553 }
554 })
555 })
556 .ok()
557 };
558
559 let archiver_handle = {
560 let stop = Arc::clone(&stop);
561 let runtime = runtime.clone();
562 let interval = Duration::from_secs(wal_secs);
563 let lag_enabled = pause_on_lag_secs > 0;
564 thread::Builder::new()
565 .name("red-wal-archiver".into())
566 .spawn(move || {
567 periodic_loop(stop, interval, move || match runtime.trigger_backup() {
568 Ok(_) if lag_enabled => {
569 let now_ms = std::time::SystemTime::now()
570 .duration_since(std::time::UNIX_EPOCH)
571 .map(|d| d.as_millis() as u64)
572 .unwrap_or(0);
573 runtime.write_gate().record_archive_success(now_ms);
574 runtime.write_gate().evaluate_archive_lag(now_ms);
578 }
579 Ok(_) => {}
580 Err(err) => {
581 tracing::warn!(error = %err, "periodic WAL archive/backup failed");
582 }
583 })
584 })
585 .ok()
586 };
587
588 let lag_monitor_handle = if pause_on_lag_secs > 0 {
593 let stop = Arc::clone(&stop);
594 let runtime = runtime.clone();
595 let interval = Duration::from_secs(5);
599 thread::Builder::new()
600 .name("red-archive-lag-monitor".into())
601 .spawn(move || {
602 periodic_loop(stop, interval, move || {
603 let now_ms = std::time::SystemTime::now()
604 .duration_since(std::time::UNIX_EPOCH)
605 .map(|d| d.as_millis() as u64)
606 .unwrap_or(0);
607 let was_paused = runtime.write_gate().is_auto_paused();
608 let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
609 if now_paused && !was_paused {
610 tracing::warn!(
611 pause_on_lag_secs,
612 last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
613 "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
614 );
615 } else if !now_paused && was_paused {
616 tracing::info!(
617 "WAL archive caught up — exiting graceful read-only mode (issue #519)"
618 );
619 }
620 })
621 })
622 .ok()
623 } else {
624 None
625 };
626
627 tracing::info!(
628 checkpoint_interval_secs = checkpoint_secs,
629 wal_flush_interval_secs = wal_secs,
630 "backup tasks spawned (checkpointer + WAL archiver)"
631 );
632
633 Some(BackupTasksHandle {
634 stop,
635 _checkpoint_handle: checkpoint_handle,
636 _archiver_handle: archiver_handle,
637 _lag_monitor_handle: lag_monitor_handle,
638 })
639}
640
/// Owner of the background backup threads.
///
/// Dropping the handle raises the shared stop flag; the threads are not joined.
pub struct BackupTasksHandle {
    /// Stop flag shared with every backup thread.
    stop: Arc<std::sync::atomic::AtomicBool>,
    _checkpoint_handle: Option<thread::JoinHandle<()>>,
    _archiver_handle: Option<thread::JoinHandle<()>>,
    _lag_monitor_handle: Option<thread::JoinHandle<()>>,
}

impl Drop for BackupTasksHandle {
    /// Raises the stop flag so the periodic loops exit on their next check.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        self.stop.store(true, Ordering::Release);
    }
}
657
/// Runs `tick` roughly every `interval` until `stop` becomes `true`.
///
/// The loop sleeps in one-second steps so a stop request is noticed within
/// about a second, and the flag is checked before each step.
fn periodic_loop<F: FnMut()>(
    stop: Arc<std::sync::atomic::AtomicBool>,
    interval: Duration,
    mut tick: F,
) {
    use std::sync::atomic::Ordering;

    let step = Duration::from_secs(1);
    let mut waited = Duration::ZERO;
    loop {
        if stop.load(Ordering::Acquire) {
            return;
        }
        thread::sleep(step);
        waited += step;
        if waited < interval {
            continue;
        }
        tick();
        waited = Duration::ZERO;
    }
}
676
677fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
678 let backend = env_nonempty("RED_BACKEND")
684 .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
685 .unwrap_or_else(|| "none".to_string())
686 .to_ascii_lowercase();
687
688 match backend.as_str() {
689 "s3" | "minio" | "r2" => {
694 #[cfg(feature = "backend-s3")]
695 {
696 if let Some(config) = s3_config_from_env() {
697 let remote_key = env_nonempty("RED_REMOTE_KEY")
698 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
699 .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
700 let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
701 options.remote_backend = Some(backend.clone());
702 options.remote_backend_atomic = Some(backend);
703 options.remote_key = Some(remote_key);
704 }
705 }
706 #[cfg(not(feature = "backend-s3"))]
707 {
708 tracing::warn!(
709 backend = %backend,
710 "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
711 );
712 }
713 }
714 "fs" | "local" => {
719 let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
720 let remote_key = match (
721 base_path,
722 env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
723 ) {
724 (Some(base), Some(rel)) => Some(format!(
725 "{}/{}",
726 base.trim_end_matches('/'),
727 rel.trim_start_matches('/')
728 )),
729 (Some(base), None) => Some(format!(
730 "{}/clusters/dev/data.rdb",
731 base.trim_end_matches('/')
732 )),
733 (None, Some(rel)) => Some(rel),
734 (None, None) => None,
735 };
736 if let Some(key) = remote_key {
737 let backend = Arc::new(crate::storage::backend::LocalBackend);
738 options.remote_backend = Some(backend.clone());
739 options.remote_backend_atomic = Some(backend);
740 options.remote_key = Some(key);
741 }
742 }
743 "http" => {
748 let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
749 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
750 {
751 Some(u) => u,
752 None => {
753 tracing::warn!(
754 "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
755 );
756 return;
757 }
758 };
759 let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
760 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
761 .unwrap_or_default();
762 let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
763 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
764 {
765 std::fs::read_to_string(&path)
766 .ok()
767 .map(|s| s.trim().to_string())
768 .filter(|s| !s.is_empty())
769 } else {
770 env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
771 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
772 };
773
774 let mut config =
775 crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
776 if let Some(auth) = auth_header {
777 config = config.with_auth_header(auth);
778 }
779 let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
780 || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
781 || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
782 config = config.with_conditional_writes(conditional_writes);
783 if conditional_writes {
788 match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
789 Ok(atomic) => {
790 let atomic_arc = Arc::new(atomic);
791 options.remote_backend = Some(atomic_arc.clone());
792 options.remote_backend_atomic = Some(atomic_arc);
793 }
794 Err(err) => {
795 tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
796 options.remote_backend =
797 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
798 }
799 }
800 } else {
801 options.remote_backend =
802 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
803 }
804 options.remote_key = env_nonempty("RED_REMOTE_KEY")
805 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
806 .or_else(|| Some("clusters/dev/data.rdb".to_string()));
807 }
808 "none" | "" => {}
811 other => {
812 tracing::warn!(
813 backend = %other,
814 "unknown RED_BACKEND value — supported: s3 | fs | http | none"
815 );
816 }
817 }
818}
819
/// Reads `RED_S3_<suffix>`, falling back to the legacy `REDDB_S3_<suffix>`.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    let preferred = format!("RED_S3_{suffix}");
    if let Some(value) = env_nonempty(&preferred) {
        return Some(value);
    }
    let legacy = format!("REDDB_S3_{suffix}");
    env_nonempty(&legacy)
}
829
/// Resolves an S3 secret, preferring a file reference over an inline value.
///
/// When `RED_S3_<suffix>_FILE` (or the `REDDB_` variant) is set, the secret is
/// the trimmed content of that file; an unreadable or empty file yields `None`
/// without falling back. Otherwise the inline variable is read via [`env_s3`].
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let secret_file = env_nonempty(&format!("RED_S3_{suffix}_FILE"))
        .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}_FILE")));

    match secret_file {
        Some(path) => {
            let contents = std::fs::read_to_string(&path).ok()?;
            let secret = contents.trim();
            if secret.is_empty() {
                None
            } else {
                Some(secret.to_string())
            }
        }
        None => env_s3(suffix),
    }
}
847
/// Assembles an S3 configuration from the environment.
///
/// Endpoint, bucket, access key and secret key are mandatory; `None` is
/// returned as soon as one is missing. Region defaults to `us-east-1`, the
/// key prefix to empty, and path-style addressing to `true`.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    let Some(endpoint) = env_s3("ENDPOINT") else {
        return None;
    };
    let Some(bucket) = env_s3("BUCKET") else {
        return None;
    };
    let Some(access_key) = env_s3_secret("ACCESS_KEY") else {
        return None;
    };
    let Some(secret_key) = env_s3_secret("SECRET_KEY") else {
        return None;
    };

    let region = match env_s3("REGION") {
        Some(value) => value,
        None => "us-east-1".to_string(),
    };
    let key_prefix = match env_s3("KEY_PREFIX") {
        Some(value) => value,
        None => env_s3("PREFIX").unwrap_or_default(),
    };
    let path_style = match env_s3("PATH_STYLE") {
        Some(raw) => matches!(
            raw.to_ascii_lowercase().as_str(),
            "1" | "true" | "yes" | "on"
        ),
        None => true,
    };

    Some(crate::storage::backend::S3Config {
        endpoint,
        bucket,
        key_prefix,
        access_key,
        secret_key,
        region,
        path_style,
    })
}
871
/// Renders the full text of the systemd unit for `config`.
///
/// The service runs as the configured user and group, uses the data
/// directory both as `WorkingDirectory` and as the only `ReadWritePaths`
/// entry, and restarts automatically (`Restart=always`, `RestartSec=2`).
pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
    let data_dir = config.data_dir();
    let exec_start = render_systemd_exec_start(config);
    // Each `\n\` ends a unit line and continues the Rust literal; whitespace
    // at the start of a continuation line is not part of the string.
    format!(
        "[Unit]\n\
Description=RedDB unified database service\n\
After=network-online.target\n\
Wants=network-online.target\n\
\n\
[Service]\n\
Type=simple\n\
User={user}\n\
Group={group}\n\
WorkingDirectory={workdir}\n\
ExecStart={exec_start}\n\
Restart=always\n\
RestartSec=2\n\
LimitSTACK=16M\n\
NoNewPrivileges=true\n\
PrivateTmp=true\n\
ProtectSystem=strict\n\
ProtectHome=true\n\
ProtectControlGroups=true\n\
ProtectKernelTunables=true\n\
ProtectKernelModules=true\n\
RestrictNamespaces=true\n\
LockPersonality=true\n\
MemoryDenyWriteExecute=true\n\
ReadWritePaths={workdir}\n\
\n\
[Install]\n\
WantedBy=multi-user.target\n",
        user = config.run_user,
        group = config.run_group,
        workdir = data_dir.display(),
        exec_start = exec_start,
    )
}
910
911#[cfg(target_os = "linux")]
920pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
921 ensure_root()?;
922 ensure_command_available("systemctl")?;
923 ensure_command_available("getent")?;
924 ensure_command_available("groupadd")?;
925 ensure_command_available("useradd")?;
926 ensure_command_available("install")?;
927 ensure_executable(&config.binary_path)?;
928
929 if !command_success("getent", ["group", config.run_group.as_str()])? {
930 run_command("groupadd", ["--system", config.run_group.as_str()])?;
931 }
932
933 if !command_success("id", ["-u", config.run_user.as_str()])? {
934 let data_dir = config.data_dir();
935 run_command(
936 "useradd",
937 [
938 "--system",
939 "--gid",
940 config.run_group.as_str(),
941 "--home-dir",
942 data_dir.to_string_lossy().as_ref(),
943 "--shell",
944 "/usr/sbin/nologin",
945 config.run_user.as_str(),
946 ],
947 )?;
948 }
949
950 let data_dir = config.data_dir();
951 run_command(
952 "install",
953 [
954 "-d",
955 "-o",
956 config.run_user.as_str(),
957 "-g",
958 config.run_group.as_str(),
959 "-m",
960 "0750",
961 data_dir.to_string_lossy().as_ref(),
962 ],
963 )?;
964
965 std::fs::write(config.unit_path(), render_systemd_unit(config))
966 .map_err(|err| format!("failed to write systemd unit: {err}"))?;
967
968 run_command("systemctl", ["daemon-reload"])?;
969 run_command(
970 "systemctl",
971 [
972 "enable",
973 "--now",
974 format!("{}.service", config.service_name).as_str(),
975 ],
976 )?;
977
978 Ok(())
979}
980
/// Fallback for platforms other than Linux: systemd is unavailable, so this
/// always fails with a message pointing at manual installation.
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    Err("systemd install is Linux-only — use sc.exe (Windows) or \
         launchd (macOS) to install the service manually using the \
         unit printed by `red service print-unit`"
        .to_string())
}
992
/// Succeeds only when the current process runs as uid 0, as reported by `id -u`.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let id_output = match Command::new("id").arg("-u").output() {
        Ok(out) => out,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !id_output.status.success() {
        return Err("failed to determine current uid".to_string());
    }

    let reported_uid = String::from_utf8_lossy(&id_output.stdout);
    match reported_uid.trim() {
        "0" => Ok(()),
        _ => Err("run this command as root (sudo)".to_string()),
    }
}
1008
/// Checks that `command` can be resolved by a login shell (`command -v`).
///
/// The name is handed to the shell as the positional parameter `$1` instead
/// of being spliced into the script text, so whitespace or shell
/// metacharacters in `command` cannot change what is executed (for example
/// `"missing || true"` no longer reports as available).
///
/// # Errors
///
/// Returns `Err` when the shell cannot be started or the command is not found.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    let status = Command::new("sh")
        // After the script come `$0` ("sh") and `$1` (the name to look up).
        .args(["-lc", r#"command -v "$1" >/dev/null 2>&1"#, "sh", command])
        .status()
        .map_err(|err| format!("failed to check command '{command}': {err}"))?;
    if status.success() {
        Ok(())
    } else {
        Err(format!("required command not found: {command}"))
    }
}
1021
/// Verifies that `path` points at a regular file with an execute bit set.
///
/// Directories are rejected explicitly: they normally carry execute bits and
/// would otherwise pass the permission check.
///
/// # Errors
///
/// Returns `Err` when the path cannot be stat'ed, is not a regular file, or
/// has no execute permission bit set.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        // Any of the user/group/other execute bits is accepted.
        if metadata.permissions().mode() & 0o111 == 0 {
            return Err(format!("binary is not executable: {}", path.display()));
        }
    }
    Ok(())
}
1041
/// Runs `program` with `args` and reports whether it exited successfully.
///
/// # Errors
///
/// Returns `Err` only when the program could not be started.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    match Command::new(program).args(args).status() {
        Ok(exit) => Ok(exit.success()),
        Err(err) => Err(format!("failed to run {program}: {err}")),
    }
}
1050
/// Runs `program` with `args`, capturing its output, and fails unless it
/// exits successfully.
///
/// # Errors
///
/// Returns `"<program> failed: <detail>"`, where the detail is the trimmed
/// stderr, else the trimmed stdout, else the exit status. `ExitStatus`
/// already renders as e.g. `exit status: 2`, so it is used as-is rather than
/// prefixed a second time.
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
    let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
    let detail = if !stderr.is_empty() {
        stderr
    } else if !stdout.is_empty() {
        stdout
    } else {
        output.status.to_string()
    };
    Err(format!("{program} failed: {detail}"))
}
1072
1073pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1074 let has_any = config.router_bind_addr.is_some()
1075 || config.grpc_bind_addr.is_some()
1076 || config.http_bind_addr.is_some()
1077 || config.wire_bind_addr.is_some()
1078 || config.pg_bind_addr.is_some();
1079 if !has_any {
1080 return Err("at least one server bind address must be configured".into());
1081 }
1082 let thread_name = if config.router_bind_addr.is_some() {
1083 "red-server-router"
1084 } else {
1085 match (
1086 config.grpc_bind_addr.is_some(),
1087 config.http_bind_addr.is_some(),
1088 ) {
1089 (true, true) => "red-server-dual",
1090 (true, false) => "red-server-grpc",
1091 (false, true) => "red-server-http",
1092 (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1093 (false, false) => "red-server-pg-wire",
1094 }
1095 };
1096
1097 let handle = thread::Builder::new()
1098 .name(thread_name.into())
1099 .stack_size(8 * 1024 * 1024)
1100 .spawn(move || run_configured_servers(config))
1101 .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1102
1103 match handle.join() {
1104 Ok(result) => result,
1105 Err(_) => Err("server thread panicked".to_string()),
1106 }
1107}
1108
1109fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1110 let mut parts = vec![
1111 config.binary_path.display().to_string(),
1112 "server".to_string(),
1113 "--path".to_string(),
1114 config.data_path.display().to_string(),
1115 ];
1116
1117 if let Some(bind_addr) = &config.router_bind_addr {
1118 parts.push("--bind".to_string());
1119 parts.push(bind_addr.clone());
1120 } else if let Some(bind_addr) = &config.grpc_bind_addr {
1121 parts.push("--grpc-bind".to_string());
1122 parts.push(bind_addr.clone());
1123 }
1124 if let Some(bind_addr) = &config.http_bind_addr {
1125 parts.push("--http-bind".to_string());
1126 parts.push(bind_addr.clone());
1127 }
1128
1129 parts.join(" ")
1130}
1131
/// Returns `true` when a TCP connection to any address that `target`
/// resolves to succeeds within `timeout`.
///
/// An unresolvable target counts as not listening.
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    let Ok(resolved) = target.to_socket_addrs() else {
        return false;
    };

    for candidate in resolved {
        let candidate: SocketAddr = candidate;
        if TcpStream::connect_timeout(&candidate, timeout).is_ok() {
            return true;
        }
    }
    false
}
1142
1143#[inline(never)]
1144fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1145 if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1151 return run_routed_server(config, router_bind_addr);
1152 }
1153
1154 match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1155 (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1156 run_dual_server(config, grpc_bind_addr, http_bind_addr)
1157 }
1158 (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1159 (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1160 (None, None) => {
1161 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1162 run_wire_only_server(config, wire_addr)
1163 } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1164 run_pg_only_server(config, pg_addr)
1165 } else {
1166 Err("at least one server bind address must be configured".to_string())
1167 }
1168 }
1169 }
1170}
1171
1172pub fn bind_listener_for_startup(
1190 readiness: &mut TransportReadiness,
1191 transport: &str,
1192 bind_addr: &str,
1193 explicit: bool,
1194) -> Result<Option<TcpListener>, String> {
1195 match TcpListener::bind(bind_addr) {
1196 Ok(listener) => {
1197 readiness.active(transport, bind_addr, explicit);
1198 Ok(Some(listener))
1199 }
1200 Err(err) => {
1201 let reason = format!("{transport} listener bind {bind_addr}: {err}");
1202 readiness.failed(transport, bind_addr, explicit, reason.clone());
1203 if explicit {
1204 tracing::error!(
1205 transport,
1206 bind = %bind_addr,
1207 error = %err,
1208 "fatal explicit bind failure"
1209 );
1210 Err(format!("explicit {reason}"))
1211 } else {
1212 tracing::warn!(
1213 transport,
1214 bind = %bind_addr,
1215 error = %err,
1216 "non-fatal implicit bind failure; listener degraded"
1217 );
1218 Ok(None)
1219 }
1220 }
1221 }
1222}
1223
/// Installs the process lifecycle signal handling on a spawned tokio task.
///
/// On Unix, SIGTERM and SIGINT trigger `runtime.graceful_shutdown(..)`
/// followed by process exit; SIGHUP (when its handler can be installed)
/// triggers `handle_sighup_reload` and the task keeps waiting. On other
/// platforms only Ctrl+C is handled.
///
/// `RED_BACKUP_ON_SHUTDOWN` decides whether the shutdown uploads a backup: it
/// defaults to `true` when unset, and any set value other than `1`, `true`,
/// `yes` or `on` (case-insensitive) turns it off.
async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
        .ok()
        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
        .unwrap_or(true);

    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        // SIGTERM and SIGINT are mandatory: if either handler cannot be
        // installed, no lifecycle task is spawned at all.
        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
                );
                return;
            }
        };
        let mut sigint = match signal(SignalKind::interrupt()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGINT handler");
                return;
            }
        };
        // SIGHUP is optional: without it only signal-driven reload is lost.
        let mut sighup = match signal(SignalKind::hangup()) {
            Ok(s) => Some(s),
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
                None
            }
        };

        let reload_runtime = runtime.clone();
        tokio::spawn(async move {
            loop {
                // Wait for whichever installed signal arrives first.
                let signal_name = match &mut sighup {
                    Some(hup) => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                        _ = hup.recv() => "SIGHUP",
                    },
                    None => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                    },
                };

                // SIGHUP reloads and goes back to waiting; it never shuts down.
                if signal_name == "SIGHUP" {
                    handle_sighup_reload(&reload_runtime);
                    continue;
                }

                tracing::info!(
                    signal = signal_name,
                    "lifecycle signal received; shutting down"
                );
                match runtime.graceful_shutdown(backup_on_shutdown) {
                    Ok(report) => {
                        tracing::info!(
                            duration_ms = report.duration_ms,
                            flushed_wal = report.flushed_wal,
                            final_checkpoint = report.final_checkpoint,
                            backup_uploaded = report.backup_uploaded,
                            "graceful shutdown complete"
                        );
                    }
                    Err(err) => {
                        tracing::error!(error = %err, "graceful shutdown failed");
                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
                            reason: format!("graceful shutdown failed: {err}"),
                        }
                        .emit_global();
                    }
                }
                // NOTE(review): the exit code is 0 even when graceful shutdown
                // failed — confirm this is intended for supervisors.
                std::process::exit(0);
            }
        });
    }

    #[cfg(not(unix))]
    {
        tokio::spawn(async move {
            let interrupted = tokio::signal::ctrl_c().await;
            if let Err(err) = interrupted {
                tracing::warn!(error = %err, "could not install Ctrl+C handler");
                return;
            }

            tracing::info!(
                signal = "Ctrl+C",
                "lifecycle signal received; shutting down"
            );
            match runtime.graceful_shutdown(backup_on_shutdown) {
                Ok(report) => {
                    tracing::info!(
                        duration_ms = report.duration_ms,
                        flushed_wal = report.flushed_wal,
                        final_checkpoint = report.final_checkpoint,
                        backup_uploaded = report.backup_uploaded,
                        "graceful shutdown complete"
                    );
                }
                // Unlike the Unix branch, no operator event is emitted here.
                Err(err) => {
                    tracing::error!(error = %err, "graceful shutdown failed");
                }
            }
            std::process::exit(0);
        });
    }
}
1369
1370fn handle_sighup_reload(runtime: &RedDBRuntime) {
1379 let now_ms = std::time::SystemTime::now()
1380 .duration_since(std::time::UNIX_EPOCH)
1381 .map(|d| d.as_millis() as u64)
1382 .unwrap_or(0);
1383 tracing::info!(
1384 target: "reddb::secrets",
1385 ts_unix_ms = now_ms,
1386 "SIGHUP received; secrets will be re-read from *_FILE on next access"
1387 );
1388 use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1393 runtime.audit_log().record_event(
1394 AuditEvent::builder("config/sighup_reload")
1395 .source(AuditAuthSource::System)
1396 .resource("secrets")
1397 .outcome(Outcome::Success)
1398 .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1399 .build(),
1400 );
1401}
1402
1403#[inline(never)]
1404fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
1405 let workers = config.workers;
1406 let cli_telemetry = config.telemetry.clone();
1407 let db_options = config.to_db_options()?;
1408 let rt_config = detect_runtime_config();
1409 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1410 let (runtime, auth_store, _telemetry_guard) =
1411 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1412 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1413
1414 spawn_admin_metrics_listeners(&runtime, &auth_store);
1415
1416 let http_server = build_http_server(
1422 runtime.clone(),
1423 auth_store.clone(),
1424 router_bind_addr.clone(),
1425 );
1426 let http_server = apply_http_limits(http_server, &config, &runtime);
1427
1428 let grpc_server = RedDBGrpcServer::with_options(
1429 runtime.clone(),
1430 GrpcServerOptions {
1431 bind_addr: router_bind_addr.clone(),
1432 tls: None,
1433 },
1434 auth_store,
1435 );
1436
1437 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1438 .enable_all()
1439 .worker_threads(worker_threads)
1440 .thread_stack_size(rt_config.stack_size)
1441 .build()
1442 .map_err(|err| format!("tokio runtime: {err}"))?;
1443
1444 let signal_runtime = runtime.clone();
1445 let wire_runtime = Arc::new(runtime);
1446 tokio_runtime.block_on(async move {
1447 spawn_lifecycle_signal_handler(signal_runtime).await;
1448 tracing::info!(
1449 bind = %router_bind_addr,
1450 cpus = rt_config.available_cpus,
1451 workers = worker_threads,
1452 "router bootstrapping"
1453 );
1454 serve_tcp_router(InProcessRouterConfig {
1455 bind_addr: router_bind_addr,
1456 http_server,
1457 grpc_server,
1458 wire_runtime,
1459 })
1460 .await
1461 .map_err(|err| err.to_string())
1462 })
1463}
1464
1465async fn spawn_wire_listeners(
1467 config: &ServerCommandConfig,
1468 runtime: &RedDBRuntime,
1469 readiness: &mut TransportReadiness,
1470) -> Result<(), String> {
1471 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1473 let wire_rt = Arc::new(runtime.clone());
1474 #[cfg(unix)]
1477 {
1478 if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1479 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1480 tokio::spawn(async move {
1481 if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1482 &wire_addr, wire_rt,
1483 )
1484 .await
1485 {
1486 tracing::error!(err = %e, "redwire unix listener error");
1487 }
1488 });
1489 return Ok(());
1490 }
1491 }
1492 match tokio::net::TcpListener::bind(&wire_addr).await {
1493 Ok(listener) => {
1494 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1495 tokio::spawn(async move {
1496 if let Err(e) =
1497 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1498 .await
1499 {
1500 tracing::error!(err = %e, "redwire listener error");
1501 }
1502 });
1503 }
1504 Err(err) => {
1505 let reason = format!("wire listener bind {wire_addr}: {err}");
1506 readiness.failed(
1507 "wire",
1508 &wire_addr,
1509 config.wire_bind_explicit,
1510 reason.clone(),
1511 );
1512 if config.wire_bind_explicit {
1513 tracing::error!(
1514 transport = "wire",
1515 bind = %wire_addr,
1516 error = %err,
1517 "fatal explicit bind failure"
1518 );
1519 return Err(format!("explicit {reason}"));
1520 }
1521 tracing::warn!(
1522 transport = "wire",
1523 bind = %wire_addr,
1524 error = %err,
1525 "non-fatal implicit bind failure; listener degraded"
1526 );
1527 }
1528 }
1529 }
1530
1531 if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1533 let tls_config = resolve_wire_tls_config(config);
1534 match tls_config {
1535 Ok(tls_cfg) => {
1536 let wire_rt = Arc::new(runtime.clone());
1537 tokio::spawn(async move {
1538 if let Err(e) =
1539 crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1540 .await
1541 {
1542 tracing::error!(err = %e, "redwire+tls listener error");
1543 }
1544 });
1545 }
1546 Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1547 }
1548 }
1549 Ok(())
1550}
1551
1552fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1559 if let Some(pg_addr) = config.pg_bind_addr.clone() {
1560 let rt = Arc::new(runtime.clone());
1561 tokio::spawn(async move {
1562 let cfg = crate::wire::PgWireConfig {
1563 bind_addr: pg_addr,
1564 ..Default::default()
1565 };
1566 if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1567 tracing::error!(err = %e, "pg wire listener error");
1568 }
1569 });
1570 }
1571}
1572
1573fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1587 use crate::utils::secret_file::expand_file_env;
1588
1589 for var in [
1593 "REDDB_GRPC_TLS_CERT",
1594 "REDDB_GRPC_TLS_KEY",
1595 "REDDB_GRPC_TLS_CLIENT_CA",
1596 ] {
1597 if let Err(err) = expand_file_env(var) {
1598 tracing::warn!(
1599 target: "reddb::secrets",
1600 env = %var,
1601 err = %err,
1602 "could not expand *_FILE companion for gRPC TLS"
1603 );
1604 }
1605 }
1606
1607 let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1608 (Some(cert), Some(key)) => {
1609 let cert_pem = std::fs::read(cert)
1610 .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1611 let key_pem =
1612 std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1613 (cert_pem, key_pem)
1614 }
1615 _ => {
1616 let dev = std::env::var("RED_GRPC_TLS_DEV")
1618 .ok()
1619 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1620 .unwrap_or(false);
1621 if !dev {
1622 return Err("gRPC TLS configured but no cert/key supplied — set \
1623 REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1624 RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1625 .to_string());
1626 }
1627 let dir = config
1628 .path
1629 .as_ref()
1630 .and_then(|p| p.parent())
1631 .map(PathBuf::from)
1632 .unwrap_or_else(|| PathBuf::from("."));
1633 let (cert_pem_str, key_pem_str) =
1634 crate::wire::tls::generate_self_signed_cert("localhost")
1635 .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1636
1637 let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1642 tracing::warn!(
1643 target: "reddb::security",
1644 transport = "grpc",
1645 cert_sha256 = %fp,
1646 "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1647 DO NOT use in production"
1648 );
1649 let cert_path = dir.join("grpc-tls-cert.pem");
1651 let key_path = dir.join("grpc-tls-key.pem");
1652 if !cert_path.exists() || !key_path.exists() {
1653 let _ = std::fs::create_dir_all(&dir);
1654 std::fs::write(&cert_path, cert_pem_str.as_bytes())
1655 .map_err(|e| format!("write grpc dev cert: {e}"))?;
1656 std::fs::write(&key_path, key_pem_str.as_bytes())
1657 .map_err(|e| format!("write grpc dev key: {e}"))?;
1658 #[cfg(unix)]
1659 {
1660 use std::os::unix::fs::PermissionsExt;
1661 let _ =
1662 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1663 }
1664 }
1665 (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1666 }
1667 };
1668
1669 let client_ca_pem = match &config.grpc_tls_client_ca {
1670 Some(path) => Some(
1671 std::fs::read(path)
1672 .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1673 ),
1674 None => None,
1675 };
1676
1677 Ok(crate::GrpcTlsOptions {
1678 cert_pem,
1679 key_pem,
1680 client_ca_pem,
1681 })
1682}
1683
1684fn spawn_grpc_tls_listener_if_configured(
1688 config: &ServerCommandConfig,
1689 runtime: RedDBRuntime,
1690 auth_store: Arc<AuthStore>,
1691) {
1692 let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1693 return;
1694 };
1695 let tls_opts = match resolve_grpc_tls_options(config) {
1696 Ok(opts) => opts,
1697 Err(err) => {
1698 tracing::error!(
1699 target: "reddb::security",
1700 transport = "grpc",
1701 err = %err,
1702 "gRPC TLS config error; TLS listener will not start"
1703 );
1704 return;
1705 }
1706 };
1707 tokio::spawn(async move {
1708 let server = RedDBGrpcServer::with_options(
1709 runtime,
1710 GrpcServerOptions {
1711 bind_addr: tls_bind.clone(),
1712 tls: Some(tls_opts),
1713 },
1714 auth_store,
1715 );
1716 tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1717 if let Err(err) = server.serve().await {
1718 tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1719 }
1720 });
1721}
1722
1723fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1726 use sha2::{Digest, Sha256};
1727 let mut h = Sha256::new();
1728 h.update(pem);
1729 let d = h.finalize();
1730 let mut buf = String::with_capacity(64);
1731 for b in d.iter() {
1732 buf.push_str(&format!("{b:02x}"));
1733 }
1734 buf
1735}
1736
1737fn resolve_wire_tls_config(
1739 config: &ServerCommandConfig,
1740) -> Result<crate::wire::WireTlsConfig, String> {
1741 match (&config.wire_tls_cert, &config.wire_tls_key) {
1742 (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1743 cert_path: cert.clone(),
1744 key_path: key.clone(),
1745 }),
1746 _ => {
1747 let dir = config
1749 .path
1750 .as_ref()
1751 .and_then(|p| p.parent())
1752 .map(PathBuf::from)
1753 .unwrap_or_else(|| PathBuf::from("."));
1754 crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1755 }
1756 }
1757}
1758
1759#[inline(never)]
1760fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1761 let rt_config = detect_runtime_config();
1762 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1763 let cli_telemetry = config.telemetry.clone();
1764 let db_options = config.to_db_options()?;
1765 let mut transport_readiness = TransportReadiness::default();
1766
1767 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1768 .enable_all()
1769 .worker_threads(workers)
1770 .thread_stack_size(rt_config.stack_size)
1771 .build()
1772 .map_err(|err| format!("tokio runtime: {err}"))?;
1773
1774 let (runtime, _auth_store, _telemetry_guard) =
1778 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1779 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1780 let signal_runtime = runtime.clone();
1781 tokio_runtime.block_on(async move {
1782 spawn_lifecycle_signal_handler(signal_runtime).await;
1783 spawn_pg_listener(&config, &runtime);
1784 let wire_rt = Arc::new(runtime);
1785 let listener = tokio::net::TcpListener::bind(&wire_addr)
1786 .await
1787 .map_err(|err| {
1788 let reason = format!("wire listener bind {wire_addr}: {err}");
1789 transport_readiness.failed(
1790 "wire",
1791 &wire_addr,
1792 config.wire_bind_explicit,
1793 reason.clone(),
1794 );
1795 if config.wire_bind_explicit {
1796 format!("explicit {reason}")
1797 } else {
1798 reason
1799 }
1800 })?;
1801 transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1802 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1803 .await
1804 .map_err(|e| e.to_string())
1805 })
1806}
1807
1808#[inline(never)]
1809fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1810 let rt_config = detect_runtime_config();
1811 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1812 let cli_telemetry = config.telemetry.clone();
1813 let db_options = config.to_db_options()?;
1814
1815 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1816 .enable_all()
1817 .worker_threads(workers)
1818 .thread_stack_size(rt_config.stack_size)
1819 .build()
1820 .map_err(|err| format!("tokio runtime: {err}"))?;
1821
1822 let (runtime, _auth_store, _telemetry_guard) =
1823 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1824 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1825 let signal_runtime = runtime.clone();
1826 tokio_runtime.block_on(async move {
1827 spawn_lifecycle_signal_handler(signal_runtime).await;
1828 let cfg = crate::wire::PgWireConfig {
1829 bind_addr: pg_addr,
1830 ..Default::default()
1831 };
1832 crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1833 .await
1834 .map_err(|e| e.to_string())
1835 })
1836}
1837
1838#[inline(never)]
1839fn build_runtime_and_auth_store(
1840 db_options: RedDBOptions,
1841 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1842) -> Result<
1843 (
1844 RedDBRuntime,
1845 Arc<AuthStore>,
1846 Option<crate::telemetry::TelemetryGuard>,
1847 ),
1848 String,
1849> {
1850 build_runtime_with_telemetry(db_options, cli_telemetry)
1857}
1858
1859pub(crate) fn build_runtime_with_telemetry(
1869 db_options: RedDBOptions,
1870 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1871) -> Result<
1872 (
1873 RedDBRuntime,
1874 Arc<AuthStore>,
1875 Option<crate::telemetry::TelemetryGuard>,
1876 ),
1877 String,
1878> {
1879 let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1880 let msg = err.to_string();
1886 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1887 phase: "runtime_construction".to_string(),
1888 error: msg.clone(),
1889 }
1890 .emit_global();
1891 msg
1892 })?;
1893
1894 crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1899 let msg = err.to_string();
1900 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1901 phase: "lease_loop".to_string(),
1902 error: msg.clone(),
1903 }
1904 .emit_global();
1905 msg
1906 })?;
1907
1908 if let Some(data_path) = db_options.data_path.as_deref() {
1912 let watch_dir = data_path.parent().unwrap_or(data_path);
1913 crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1914 }
1915
1916 {
1920 let config_path = crate::runtime::config_overlay::config_file_path();
1921 let store = runtime.db().store();
1922 crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1923 }
1924
1925 let merged = merge_telemetry_with_config(
1928 cli_telemetry
1929 .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1930 &runtime,
1931 );
1932 let telemetry_guard = crate::telemetry::init(merged);
1933
1934 let no_auth = no_auth_active(&db_options);
1935 let auth_store =
1936 if db_options.auth.vault_enabled {
1937 let pager =
1938 runtime.db().store().pager().cloned().ok_or_else(|| {
1939 "vault requires a paged database (persistent mode)".to_string()
1940 })?;
1941 let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1942 .map_err(|err| err.to_string())?;
1943 Arc::new(store)
1944 } else {
1945 Arc::new(AuthStore::new(db_options.auth.clone()))
1946 };
1947 auth_store.configure_control_events(
1948 runtime.control_event_ledger(),
1949 runtime.control_event_config(),
1950 );
1951 if no_auth {
1956 eprintln!("{NO_AUTH_WARNING}");
1957 tracing::warn!("{NO_AUTH_WARNING}");
1958 } else {
1959 apply_preset(&runtime, &auth_store)?;
1960 maybe_apply_policy_break_glass(&auth_store);
1961 }
1962
1963 {
1965 let store = Arc::clone(&auth_store);
1966 std::thread::Builder::new()
1967 .name("reddb-session-purge".into())
1968 .spawn(move || loop {
1969 std::thread::sleep(std::time::Duration::from_secs(300));
1970 store.purge_expired_sessions();
1971 })
1972 .ok();
1973 }
1974
1975 Ok((runtime, auth_store, telemetry_guard))
1976}
1977
1978fn maybe_apply_policy_break_glass(auth_store: &Arc<AuthStore>) {
1982 use crate::auth::self_lock_guard::BREAK_GLASS_ENV;
1983
1984 let enabled = std::env::var(BREAK_GLASS_ENV)
1985 .ok()
1986 .map(|v| {
1987 let trimmed = v.trim().to_ascii_lowercase();
1988 matches!(trimmed.as_str(), "1" | "true" | "yes")
1989 })
1990 .unwrap_or(false);
1991 if !enabled {
1992 return;
1993 }
1994 let now = crate::utils::now_unix_millis() as u128;
1995 match auth_store.apply_policy_break_glass(now) {
1996 Ok(()) => {
1997 tracing::warn!(env = BREAK_GLASS_ENV, "policy break-glass recovery applied");
1998 }
1999 Err(err) => {
2000 tracing::error!(env = BREAK_GLASS_ENV, %err, "policy break-glass recovery failed");
2001 }
2002 }
2003}
2004
/// Config key whose presence marks bootstrap as done; `apply_preset` skips
/// preset application when it is set.
pub(crate) const BOOTSTRAP_COMPLETED_KEY: &str = "system.bootstrap.completed";
/// Config key under which `persist_bootstrap_state` records the applied
/// preset name.
pub(crate) const BOOTSTRAP_PRESET_KEY: &str = "system.bootstrap.preset";
/// Config key under which `persist_bootstrap_state` records the first admin
/// id, when one was created.
pub(crate) const BOOTSTRAP_FIRST_ADMIN_KEY: &str = "system.bootstrap.first_admin_id";

/// Environment variable `apply_preset` reads to select the bootstrap preset;
/// unset or blank falls back to [`PRESET_SIMPLE`].
pub(crate) const PRESET_ENV: &str = "REDDB_PRESET";
/// Preset name for which `apply_preset` performs no extra bootstrap steps.
pub(crate) const PRESET_SIMPLE: &str = "simple";
/// Preset name that bootstraps the first admin via `apply_production_preset`.
pub(crate) const PRESET_PRODUCTION: &str = "production";
/// Preset name that installs the guardrails of `apply_regulated_preset`.
pub(crate) const PRESET_REGULATED: &str = "regulated";

/// Id of the allow-all policy that `apply_production_preset` installs and
/// attaches to the first admin.
pub(crate) const FIRST_ADMIN_ALLOW_ALL_POLICY: &str = "system.bootstrap.first-admin-allow-all";
/// Id of the deny policy that `apply_regulated_preset` installs.
pub(crate) const REGULATED_PROTECT_MANAGED_POLICY: &str = "system.regulated.protect-managed";
/// Config namespace whose writes the regulated guardrail policy denies.
pub(crate) const REGULATED_AUDIT_CONFIG_NAMESPACE: &str = "red.config.audit";
/// Config namespace whose writes the regulated guardrail policy denies.
pub(crate) const REGULATED_EVIDENCE_CONFIG_NAMESPACE: &str = "red.config.evidence";
/// Config namespace whose writes the regulated guardrail policy denies.
pub(crate) const REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE: &str = "red.config.query_audit";
2027
2028pub(crate) fn apply_preset(
2037 runtime: &RedDBRuntime,
2038 auth_store: &Arc<AuthStore>,
2039) -> Result<(), String> {
2040 let store = runtime.db().store();
2041
2042 if store.get_config(BOOTSTRAP_COMPLETED_KEY).is_some() {
2043 crate::cli::bootstrap_manifest::rehydrate_manifest_registry(
2044 runtime,
2045 &runtime.config_registry(),
2046 )?;
2047 tracing::info!("bootstrap state present, skipping preset application");
2048 return Ok(());
2049 }
2050
2051 for var in ["REDDB_USERNAME", "REDDB_PASSWORD"] {
2055 crate::utils::expand_file_env(var).map_err(|err| format!("expand {var}_FILE: {err}"))?;
2056 }
2057
2058 let preset = std::env::var(PRESET_ENV)
2059 .ok()
2060 .map(|s| s.trim().to_string())
2061 .filter(|s| !s.is_empty())
2062 .unwrap_or_else(|| PRESET_SIMPLE.to_string());
2063
2064 if let Ok(path) = std::env::var(crate::cli::bootstrap_manifest::MANIFEST_ENV) {
2065 let path = path.trim();
2066 if !path.is_empty() {
2067 let first_admin_id = crate::cli::bootstrap_manifest::apply_manifest_file(
2068 runtime,
2069 auth_store,
2070 &runtime.config_registry(),
2071 std::path::Path::new(path),
2072 )?;
2073 persist_bootstrap_state(runtime, "manifest", Some(&first_admin_id));
2074 tracing::info!("bootstrap manifest applied");
2075 return Ok(());
2076 }
2077 }
2078
2079 let first_admin_id = match preset.as_str() {
2080 PRESET_SIMPLE => {
2081 None
2085 }
2086 PRESET_PRODUCTION => Some(apply_production_preset(auth_store)?),
2087 PRESET_REGULATED => {
2088 apply_regulated_preset(runtime, auth_store)?;
2089 None
2090 }
2091 other => {
2092 return Err(format!(
2093 "REDDB_PRESET={other:?} is not recognised (expected `simple`, `production`, or `regulated`)"
2094 ));
2095 }
2096 };
2097
2098 persist_bootstrap_state(runtime, &preset, first_admin_id.as_deref());
2099 tracing::info!(preset = %preset, "bootstrap preset applied");
2100 Ok(())
2101}
2102
2103fn apply_production_preset(auth_store: &Arc<AuthStore>) -> Result<String, String> {
2104 use crate::auth::store::PrincipalRef;
2105 use crate::auth::{policies::Policy, UserId};
2106
2107 let username = std::env::var("REDDB_USERNAME")
2108 .ok()
2109 .filter(|s| !s.is_empty())
2110 .ok_or_else(|| {
2111 "REDDB_PRESET=production requires REDDB_USERNAME (or REDDB_USERNAME_FILE)".to_string()
2112 })?;
2113 let password = std::env::var("REDDB_PASSWORD")
2114 .ok()
2115 .filter(|s| !s.is_empty())
2116 .ok_or_else(|| {
2117 "REDDB_PRESET=production requires REDDB_PASSWORD (or REDDB_PASSWORD_FILE)".to_string()
2118 })?;
2119
2120 let result = auth_store
2123 .bootstrap_system_admin(&username, &password)
2124 .map_err(|err| format!("bootstrap first admin: {err}"))?;
2125 let first_admin = UserId::platform(result.user.username.clone());
2126
2127 let policy = Policy::from_json_str(&format!(
2129 r#"{{
2130 "id": "{id}",
2131 "version": 1,
2132 "statements": [{{
2133 "effect": "allow",
2134 "actions": ["*"],
2135 "resources": ["*"]
2136 }}]
2137 }}"#,
2138 id = FIRST_ADMIN_ALLOW_ALL_POLICY
2139 ))
2140 .map_err(|err| format!("compile allow-all policy: {err}"))?;
2141 auth_store
2142 .put_policy(policy)
2143 .map_err(|err| format!("install allow-all policy: {err}"))?;
2144
2145 auth_store
2147 .attach_policy(
2148 PrincipalRef::User(first_admin.clone()),
2149 FIRST_ADMIN_ALLOW_ALL_POLICY,
2150 )
2151 .map_err(|err| format!("attach allow-all policy: {err}"))?;
2152
2153 Ok(first_admin.to_string())
2154}
2155
2156fn apply_regulated_preset(
2157 runtime: &RedDBRuntime,
2158 auth_store: &Arc<AuthStore>,
2159) -> Result<(), String> {
2160 use crate::auth::policies::Policy;
2161 use crate::auth::registry::EvidenceRequirement;
2162
2163 runtime.query_audit().enable_infrastructure();
2164
2165 let policy = Policy::from_json_str(&format!(
2166 r#"{{
2167 "id": "{id}",
2168 "version": 1,
2169 "statements": [
2170 {{
2171 "effect": "deny",
2172 "actions": ["policy:put", "policy:drop", "policy:attach", "policy:detach"],
2173 "resources": ["policy:{id}"]
2174 }},
2175 {{
2176 "effect": "deny",
2177 "actions": ["config:write"],
2178 "resources": [
2179 "config:{audit}.*",
2180 "config:{evidence}.*",
2181 "config:{query_audit}.*"
2182 ]
2183 }}
2184 ]
2185 }}"#,
2186 id = REGULATED_PROTECT_MANAGED_POLICY,
2187 audit = REGULATED_AUDIT_CONFIG_NAMESPACE,
2188 evidence = REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2189 query_audit = REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2190 ))
2191 .map_err(|err| format!("compile regulated guardrail policy: {err}"))?;
2192 auth_store
2193 .put_policy(policy)
2194 .map_err(|err| format!("install regulated guardrail policy: {err}"))?;
2195
2196 let now_ms = crate::utils::now_unix_millis() as u128;
2197 let entries = vec![
2198 regulated_registry_entry(
2199 REGULATED_PROTECT_MANAGED_POLICY,
2200 crate::auth::managed_policy::RESOURCE_TYPE_POLICY,
2201 "iam_policy",
2202 "policy:*",
2203 &format!("policy:{REGULATED_PROTECT_MANAGED_POLICY}"),
2204 EvidenceRequirement::Metadata,
2205 now_ms,
2206 ),
2207 regulated_registry_entry(
2208 REGULATED_AUDIT_CONFIG_NAMESPACE,
2209 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2210 "config_namespace",
2211 "config:write",
2212 &format!("config:{REGULATED_AUDIT_CONFIG_NAMESPACE}.*"),
2213 EvidenceRequirement::Metadata,
2214 now_ms,
2215 ),
2216 regulated_registry_entry(
2217 REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2218 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2219 "config_namespace",
2220 "config:write",
2221 &format!("config:{REGULATED_EVIDENCE_CONFIG_NAMESPACE}.*"),
2222 EvidenceRequirement::Metadata,
2223 now_ms,
2224 ),
2225 regulated_registry_entry(
2226 REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2227 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2228 "config_namespace",
2229 "config:write",
2230 &format!("config:{REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE}.*"),
2231 EvidenceRequirement::Metadata,
2232 now_ms,
2233 ),
2234 ];
2235
2236 for entry in entries.iter().cloned() {
2237 runtime
2238 .config_registry()
2239 .restore_bootstrap_entry(entry)
2240 .map_err(|err| format!("install regulated registry entry: {err}"))?;
2241 }
2242 crate::cli::bootstrap_manifest::persist_registry_state(runtime, &entries)?;
2243 Ok(())
2244}
2245
2246fn regulated_registry_entry(
2247 id: &str,
2248 resource_type: &str,
2249 schema: &str,
2250 required_action: &str,
2251 required_resource: &str,
2252 evidence_requirement: crate::auth::registry::EvidenceRequirement,
2253 updated_at_ms: u128,
2254) -> crate::auth::registry::ConfigRegistryEntry {
2255 crate::auth::registry::ConfigRegistryEntry {
2256 id: id.to_string(),
2257 version: 1,
2258 resource_type: resource_type.to_string(),
2259 schema: schema.to_string(),
2260 mutability: crate::auth::registry::Mutability::Immutable,
2261 sensitivity: crate::auth::registry::Sensitivity::Internal,
2262 managed: true,
2263 required_action: required_action.to_string(),
2264 required_resource: required_resource.to_string(),
2265 evidence_requirement,
2266 updated_by: "system:regulated-preset".to_string(),
2267 updated_at_ms,
2268 }
2269}
2270
2271fn persist_bootstrap_state(runtime: &RedDBRuntime, preset: &str, first_admin_id: Option<&str>) {
2272 let store = runtime.db().store();
2273 let mut tree = crate::serde_json::Map::new();
2274 tree.insert(
2275 BOOTSTRAP_COMPLETED_KEY.to_string(),
2276 crate::serde_json::Value::Bool(true),
2277 );
2278 tree.insert(
2279 BOOTSTRAP_PRESET_KEY.to_string(),
2280 crate::serde_json::Value::String(preset.to_string()),
2281 );
2282 if let Some(id) = first_admin_id {
2283 tree.insert(
2284 BOOTSTRAP_FIRST_ADMIN_KEY.to_string(),
2285 crate::serde_json::Value::String(id.to_string()),
2286 );
2287 }
2288 let json = crate::serde_json::Value::Object(tree);
2289 store.set_config_tree("", &json);
2290}
2291
2292fn merge_telemetry_with_config(
2303 mut cli: crate::telemetry::TelemetryConfig,
2304 runtime: &RedDBRuntime,
2305) -> crate::telemetry::TelemetryConfig {
2306 use crate::storage::schema::Value;
2307
2308 let store = runtime.db().store();
2309
2310 if !cli.level_explicit {
2311 if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
2312 cli.level_filter = v.to_string();
2313 }
2314 }
2315 if !cli.format_explicit {
2316 if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
2317 if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
2318 cli.format = parsed;
2319 }
2320 }
2321 }
2322 if !cli.rotation_keep_days_explicit {
2323 match store.get_config("red.logging.keep_days") {
2324 Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
2325 cli.rotation_keep_days = n as u16
2326 }
2327 Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
2328 cli.rotation_keep_days = n as u16
2329 }
2330 Some(Value::Text(v)) => {
2331 if let Ok(n) = v.parse::<u16>() {
2332 cli.rotation_keep_days = n;
2333 }
2334 }
2335 _ => {}
2336 }
2337 }
2338 if !cli.file_prefix_explicit {
2339 if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
2340 if !v.is_empty() {
2341 cli.file_prefix = v.to_string();
2342 }
2343 }
2344 }
2345 if !cli.log_dir_explicit && !cli.log_file_disabled {
2348 if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
2349 if !v.is_empty() {
2350 cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
2351 }
2352 }
2353 }
2354
2355 cli
2356}
2357
/// Tests for the precedence rules of `merge_telemetry_with_config`: explicit
/// CLI flags beat stored config, and stored config beats CLI defaults.
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};

    /// In-memory runtime with an empty config store.
    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
    }

    /// Stores `value` as a string config entry under `key`.
    fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
        let json = crate::serde_json::Value::String(value.to_string());
        runtime.db().store().set_config_tree(key, &json);
    }

    /// CLI config as it looks when no flag was passed explicitly.
    fn cli_base() -> TelemetryConfig {
        TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let rt = fresh_runtime();
        set_str(&rt, "red.logging.dir", "/var/log/reddb");

        let result = merge_telemetry_with_config(cli_base(), &rt);

        let expected = std::path::Path::new("/var/log/reddb");
        assert_eq!(result.log_dir.as_deref(), Some(expected));
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let rt = fresh_runtime();
        set_str(&rt, "red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/custom/dir")),
            log_dir_explicit: true,
            ..cli_base()
        };

        let result = merge_telemetry_with_config(cli, &rt);

        let expected = std::path::Path::new("/custom/dir");
        assert_eq!(result.log_dir.as_deref(), Some(expected));
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let rt = fresh_runtime();
        set_str(&rt, "red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: None,
            log_file_disabled: true,
            ..cli_base()
        };

        let result = merge_telemetry_with_config(cli, &rt);

        assert!(
            result.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        let rt = fresh_runtime();
        set_str(&rt, "red.logging.format", "pretty");

        let result = merge_telemetry_with_config(cli_base(), &rt);

        assert_eq!(result.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let rt = fresh_runtime();
        set_str(&rt, "red.logging.format", "pretty");
        let cli = TelemetryConfig {
            format: LogFormat::Json,
            format_explicit: true,
            ..cli_base()
        };

        let result = merge_telemetry_with_config(cli, &rt);

        assert_eq!(result.format, LogFormat::Json);
    }
}
2445
2446#[inline(never)]
2447fn build_http_server(
2448 runtime: RedDBRuntime,
2449 auth_store: Arc<AuthStore>,
2450 bind_addr: String,
2451) -> RedDBServer {
2452 build_http_server_with_transport_readiness(
2453 runtime,
2454 auth_store,
2455 bind_addr,
2456 TransportReadiness::default(),
2457 )
2458}
2459
2460fn apply_http_limits(
2466 server: RedDBServer,
2467 config: &ServerCommandConfig,
2468 runtime: &RedDBRuntime,
2469) -> RedDBServer {
2470 let store = runtime.db().store();
2471 let resolved =
2472 crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2473 .get_config(key)
2474 {
2475 Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2476 Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2477 Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2478 _ => None,
2479 });
2480 tracing::info!(
2481 target: "reddb::http_limits",
2482 max_handlers = resolved.max_handlers,
2483 handler_timeout_ms = resolved.handler_timeout_ms,
2484 retry_after_secs = resolved.retry_after_secs,
2485 max_inflight_per_principal = resolved.max_inflight_per_principal,
2486 "http_limits resolved"
2487 );
2488 server.with_http_limits(resolved)
2489}
2490
2491#[inline(never)]
2492fn build_http_server_with_transport_readiness(
2493 runtime: RedDBRuntime,
2494 auth_store: Arc<AuthStore>,
2495 bind_addr: String,
2496 transport_readiness: TransportReadiness,
2497) -> RedDBServer {
2498 RedDBServer::with_options(
2499 runtime,
2500 ServerOptions {
2501 bind_addr,
2502 transport_readiness,
2503 ..ServerOptions::default()
2504 },
2505 )
2506 .with_auth(auth_store)
2507}
2508
2509#[inline(never)]
2513fn build_admin_only_server(
2514 runtime: RedDBRuntime,
2515 auth_store: Arc<AuthStore>,
2516 bind_addr: String,
2517) -> RedDBServer {
2518 RedDBServer::with_options(
2519 runtime,
2520 ServerOptions {
2521 bind_addr,
2522 surface: crate::server::ServerSurface::AdminOnly,
2523 ..ServerOptions::default()
2524 },
2525 )
2526 .with_auth(auth_store)
2527}
2528
2529#[inline(never)]
2533fn build_metrics_only_server(
2534 runtime: RedDBRuntime,
2535 auth_store: Arc<AuthStore>,
2536 bind_addr: String,
2537) -> RedDBServer {
2538 RedDBServer::with_options(
2539 runtime,
2540 ServerOptions {
2541 bind_addr,
2542 surface: crate::server::ServerSurface::MetricsOnly,
2543 ..ServerOptions::default()
2544 },
2545 )
2546 .with_auth(auth_store)
2547}
2548
2549fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2553 if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2554 let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2555 let _ = server.serve_in_background();
2556 tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2557 }
2558 if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2559 let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2560 let _ = server.serve_in_background();
2561 tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2562 }
2563}
2564
2565#[inline(never)]
2566fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2567 let cli_telemetry = config.telemetry.clone();
2568 let mut transport_readiness = TransportReadiness::default();
2569 let Some(listener) = bind_listener_for_startup(
2570 &mut transport_readiness,
2571 "http",
2572 &bind_addr,
2573 config.http_bind_explicit,
2574 )?
2575 else {
2576 return Err(format!(
2577 "no HTTP listener started; implicit bind {} failed",
2578 bind_addr
2579 ));
2580 };
2581 let db_options = config.to_db_options()?;
2582 let (runtime, auth_store, _telemetry_guard) =
2583 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2584 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2585 spawn_admin_metrics_listeners(&runtime, &auth_store);
2586 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2587 let server = build_http_server_with_transport_readiness(
2588 runtime.clone(),
2589 auth_store,
2590 bind_addr.clone(),
2591 transport_readiness,
2592 );
2593 let server = apply_http_limits(server, &config, &runtime);
2594 tracing::info!(transport = "http", bind = %bind_addr, "listener online");
2595 server.serve_on(listener).map_err(|err| err.to_string())
2596}
2597
2598fn spawn_http_tls_listener(
2604 config: &ServerCommandConfig,
2605 runtime: &RedDBRuntime,
2606 auth_store: &Arc<AuthStore>,
2607) -> Result<(), String> {
2608 let Some(addr) = config.http_tls_bind_addr.clone() else {
2609 return Ok(());
2610 };
2611
2612 let tls_config = resolve_http_tls_config(config)?;
2613 let server_config = crate::server::tls::build_server_config(&tls_config)
2614 .map_err(|err| format!("HTTP TLS: {err}"))?;
2615
2616 let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
2617 let server = apply_http_limits(server, config, runtime);
2618 let _handle = server.serve_tls_in_background(server_config);
2619 tracing::info!(
2620 transport = "https",
2621 bind = %addr,
2622 mtls = %tls_config.client_ca_path.is_some(),
2623 "TLS listener online"
2624 );
2625 Ok(())
2626}
2627
2628fn resolve_http_tls_config(
2630 config: &ServerCommandConfig,
2631) -> Result<crate::server::tls::HttpTlsConfig, String> {
2632 match (&config.http_tls_cert, &config.http_tls_key) {
2633 (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
2634 cert_path: cert.clone(),
2635 key_path: key.clone(),
2636 client_ca_path: config.http_tls_client_ca.clone(),
2637 }),
2638 (None, None) => {
2639 let dir = config
2641 .path
2642 .as_ref()
2643 .and_then(|p| p.parent().map(std::path::PathBuf::from))
2644 .unwrap_or_else(|| std::path::PathBuf::from("."));
2645 let auto = crate::server::tls::auto_generate_dev_cert(&dir)
2646 .map_err(|err| format!("HTTP TLS dev: {err}"))?;
2647 Ok(crate::server::tls::HttpTlsConfig {
2648 cert_path: auto.cert_path,
2649 key_path: auto.key_path,
2650 client_ca_path: config.http_tls_client_ca.clone(),
2651 })
2652 }
2653 _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
2654 }
2655}
2656
2657#[inline(never)]
2658fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2659 let workers = config.workers;
2660 let cli_telemetry = config.telemetry.clone();
2661 let db_options = config.to_db_options()?;
2662 let rt_config = detect_runtime_config();
2663 let mut transport_readiness = TransportReadiness::default();
2664 let Some(grpc_listener) = bind_listener_for_startup(
2665 &mut transport_readiness,
2666 "grpc",
2667 &bind_addr,
2668 config.grpc_bind_explicit,
2669 )?
2670 else {
2671 return Err(format!(
2672 "no gRPC listener started; implicit bind {} failed",
2673 bind_addr
2674 ));
2675 };
2676
2677 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2678
2679 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2680 .enable_all()
2681 .worker_threads(worker_threads)
2682 .thread_stack_size(rt_config.stack_size)
2683 .build()
2684 .map_err(|err| format!("tokio runtime: {err}"))?;
2685
2686 let (runtime, auth_store, _telemetry_guard) =
2688 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2689 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2690 let signal_runtime = runtime.clone();
2691 tokio_runtime.block_on(async move {
2692 spawn_lifecycle_signal_handler(signal_runtime).await;
2693 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2695
2696 spawn_pg_listener(&config, &runtime);
2698
2699 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2703
2704 let server = RedDBGrpcServer::with_options(
2705 runtime,
2706 GrpcServerOptions {
2707 bind_addr: bind_addr.clone(),
2708 tls: None,
2709 },
2710 auth_store,
2711 );
2712
2713 tracing::info!(
2714 transport = "grpc",
2715 bind = %bind_addr,
2716 cpus = rt_config.available_cpus,
2717 workers = worker_threads,
2718 "listener online"
2719 );
2720 server
2721 .serve_on(grpc_listener)
2722 .await
2723 .map_err(|err| err.to_string())
2724 })
2725}
2726
2727#[inline(never)]
2728fn run_dual_server(
2729 config: ServerCommandConfig,
2730 grpc_bind_addr: String,
2731 http_bind_addr: String,
2732) -> Result<(), String> {
2733 let workers = config.workers;
2734 let cli_telemetry = config.telemetry.clone();
2735 let db_options = config.to_db_options()?;
2736 let rt_config = detect_runtime_config();
2737 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2738 let mut transport_readiness = TransportReadiness::default();
2739 let http_listener = bind_listener_for_startup(
2740 &mut transport_readiness,
2741 "http",
2742 &http_bind_addr,
2743 config.http_bind_explicit,
2744 )?;
2745 let grpc_listener = bind_listener_for_startup(
2746 &mut transport_readiness,
2747 "grpc",
2748 &grpc_bind_addr,
2749 config.grpc_bind_explicit,
2750 )?;
2751 if http_listener.is_none() && grpc_listener.is_none() {
2752 return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2753 }
2754 let (runtime, auth_store, _telemetry_guard) =
2755 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2756 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2757
2758 spawn_admin_metrics_listeners(&runtime, &auth_store);
2759 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2760
2761 let http_handle = if let Some(listener) = http_listener {
2762 let http_server = build_http_server_with_transport_readiness(
2763 runtime.clone(),
2764 auth_store.clone(),
2765 http_bind_addr.clone(),
2766 transport_readiness.clone(),
2767 );
2768 let http_server = apply_http_limits(http_server, &config, &runtime);
2769 Some(http_server.serve_in_background_on(listener))
2770 } else {
2771 None
2772 };
2773
2774 thread::sleep(Duration::from_millis(150));
2775 if let Some(handle) = http_handle.as_ref() {
2776 if handle.is_finished() {
2777 let handle = http_handle.unwrap();
2778 return match handle.join() {
2779 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2780 Ok(Err(err)) => Err(err.to_string()),
2781 Err(_) => Err("HTTP server thread panicked".to_string()),
2782 };
2783 }
2784 }
2785 if grpc_listener.is_none() {
2786 let Some(handle) = http_handle else {
2787 return Err("no listener started".to_string());
2788 };
2789 return match handle.join() {
2790 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2791 Ok(Err(err)) => Err(err.to_string()),
2792 Err(_) => Err("HTTP server thread panicked".to_string()),
2793 };
2794 }
2795 let grpc_listener = grpc_listener.expect("checked above");
2796
2797 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2798 .enable_all()
2799 .worker_threads(worker_threads)
2800 .thread_stack_size(rt_config.stack_size)
2801 .build()
2802 .map_err(|err| format!("tokio runtime: {err}"))?;
2803
2804 let signal_runtime = runtime.clone();
2805 tokio_runtime.block_on(async move {
2806 spawn_lifecycle_signal_handler(signal_runtime).await;
2807 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2809
2810 spawn_pg_listener(&config, &runtime);
2812
2813 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2815
2816 let server = RedDBGrpcServer::with_options(
2817 runtime,
2818 GrpcServerOptions {
2819 bind_addr: grpc_bind_addr.clone(),
2820 tls: None,
2821 },
2822 auth_store,
2823 );
2824
2825 tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2826 tracing::info!(
2827 transport = "grpc",
2828 bind = %grpc_bind_addr,
2829 cpus = rt_config.available_cpus,
2830 workers = worker_threads,
2831 "listener online"
2832 );
2833 server
2834 .serve_on(grpc_listener)
2835 .await
2836 .map_err(|err| err.to_string())
2837 })
2838}
2839
2840#[cfg(test)]
2841mod tests {
2842 use super::*;
2843
/// A gRPC-only service renders the expected `ExecStart` line and grants
/// write access to the data directory.
#[test]
fn render_systemd_unit_contains_expected_execstart() {
    let service = SystemdServiceConfig {
        service_name: String::from("reddb"),
        binary_path: PathBuf::from("/usr/local/bin/red"),
        run_user: String::from("reddb"),
        run_group: String::from("reddb"),
        data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
        router_bind_addr: None,
        grpc_bind_addr: Some(String::from("0.0.0.0:5555")),
        http_bind_addr: None,
    };

    let rendered = render_systemd_unit(&service);

    assert!(rendered.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
    assert!(rendered.contains("ReadWritePaths=/var/lib/reddb"));
}
2861
/// `data_dir()` is the parent of the data file and `unit_path()` is derived
/// from the service name under `/etc/systemd/system`.
#[test]
fn systemd_service_config_derives_paths() {
    let service = SystemdServiceConfig {
        service_name: String::from("reddb-api"),
        binary_path: PathBuf::from("/usr/local/bin/red"),
        run_user: String::from("reddb"),
        run_group: String::from("reddb"),
        data_path: PathBuf::from("/srv/reddb/live/data.rdb"),
        router_bind_addr: None,
        grpc_bind_addr: None,
        http_bind_addr: Some(String::from("127.0.0.1:5055")),
    };

    let expected_data_dir = PathBuf::from("/srv/reddb/live");
    let expected_unit_path = PathBuf::from("/etc/systemd/system/reddb-api.service");

    assert_eq!(service.data_dir(), expected_data_dir);
    assert_eq!(service.unit_path(), expected_unit_path);
}
2881
/// When both gRPC and HTTP bind addresses are configured, the rendered unit
/// carries both flags.
#[test]
fn render_systemd_unit_supports_dual_transport() {
    let service = SystemdServiceConfig {
        service_name: String::from("reddb"),
        binary_path: PathBuf::from("/usr/local/bin/red"),
        run_user: String::from("reddb"),
        run_group: String::from("reddb"),
        data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
        router_bind_addr: None,
        grpc_bind_addr: Some(String::from("0.0.0.0:5555")),
        http_bind_addr: Some(String::from("0.0.0.0:5055")),
    };

    let rendered = render_systemd_unit(&service);

    assert!(rendered.contains("--grpc-bind 0.0.0.0:5555"));
    assert!(rendered.contains("--http-bind 0.0.0.0:5055"));
}
2899
/// With only the router bind address set, the unit uses `--bind` and emits
/// neither of the transport-specific flags.
#[test]
fn render_systemd_unit_supports_router_mode() {
    let service = SystemdServiceConfig {
        service_name: String::from("reddb"),
        binary_path: PathBuf::from("/usr/local/bin/red"),
        run_user: String::from("reddb"),
        run_group: String::from("reddb"),
        data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
        router_bind_addr: Some(String::from(DEFAULT_ROUTER_BIND_ADDR)),
        grpc_bind_addr: None,
        http_bind_addr: None,
    };

    let rendered = render_systemd_unit(&service);

    assert!(rendered.contains("--bind 127.0.0.1:5050"));
    assert!(!rendered.contains("--grpc-bind"));
    assert!(!rendered.contains("--http-bind"));
}
2918
/// An explicit bind to an address that is already in use is an error, and
/// the failure is recorded in the readiness state as explicit.
#[test]
fn explicit_bind_collision_is_fatal() {
    // Occupy an ephemeral loopback port so the second bind has to collide.
    let occupied = TcpListener::bind("127.0.0.1:0").expect("hold test port");
    let occupied_addr = occupied.local_addr().expect("held addr").to_string();

    let mut readiness = TransportReadiness::default();
    let error =
        bind_listener_for_startup(&mut readiness, "http", &occupied_addr, true).unwrap_err();

    assert!(error.contains("explicit http listener bind"));
    assert_eq!(readiness.active.len(), 0);
    assert_eq!(readiness.failed.len(), 1);

    let failure = &readiness.failed[0];
    assert!(failure.explicit);
    assert_eq!(failure.bind_addr, occupied_addr);
}
2933
/// Returns the single process-wide mutex that tests take before touching
/// process environment variables. The mutex is created on first use.
fn no_auth_env_lock() -> &'static std::sync::Mutex<()> {
    use std::sync::{Mutex, OnceLock};

    static ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
    ENV_LOCK.get_or_init(Mutex::default)
}
2944
2945 fn no_auth_test_config(no_auth: bool) -> ServerCommandConfig {
2946 ServerCommandConfig {
2947 path: None,
2948 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
2949 router_bind_explicit: false,
2950 grpc_bind_addr: None,
2951 grpc_bind_explicit: false,
2952 grpc_tls_bind_addr: None,
2953 grpc_tls_cert: None,
2954 grpc_tls_key: None,
2955 grpc_tls_client_ca: None,
2956 http_bind_addr: None,
2957 http_bind_explicit: false,
2958 http_tls_bind_addr: None,
2959 http_tls_cert: None,
2960 http_tls_key: None,
2961 http_tls_client_ca: None,
2962 wire_bind_addr: None,
2963 wire_bind_explicit: false,
2964 wire_tls_bind_addr: None,
2965 wire_tls_cert: None,
2966 wire_tls_key: None,
2967 pg_bind_addr: None,
2968 create_if_missing: true,
2969 read_only: false,
2970 role: "standalone".to_string(),
2971 primary_addr: None,
2972 vault: true,
2975 no_auth,
2976 workers: None,
2977 telemetry: None,
2978 http_limits_cli: crate::server::HttpLimitsCliInput::default(),
2979 }
2980 }
2981
/// With `no_auth` set, the resolved options have every auth switch turned
/// off and carry the `NO_AUTH_META` marker, even though credentials are
/// present in the environment and the vault was requested.
#[test]
fn no_auth_flag_disables_every_auth_knob_and_stamps_metadata() {
    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    // SAFETY: tests that mutate these variables serialize on
    // `no_auth_env_lock`, which is held for the whole test.
    // NOTE(review): threads that read the environment without taking that
    // lock are not covered — confirm none exist in this test binary.
    unsafe {
        std::env::set_var("REDDB_USERNAME", "admin");
        std::env::set_var("REDDB_PASSWORD", "hunter2");
    }

    let options = no_auth_test_config(true)
        .to_db_options()
        .expect("to_db_options");

    assert!(no_auth_active(&options), "metadata should be stamped");

    let auth = &options.auth;
    assert!(!auth.enabled, "auth.enabled must be forced off");
    assert!(!auth.require_auth, "require_auth must be forced off");
    assert!(
        !auth.vault_enabled,
        "vault_enabled must be forced off (overrides --vault)"
    );

    let stamp = options.metadata.get(NO_AUTH_META).map(String::as_str);
    assert_eq!(stamp, Some("true"));

    // SAFETY: same serialization as above; the lock guard is still alive.
    unsafe {
        std::env::remove_var("REDDB_USERNAME");
        std::env::remove_var("REDDB_PASSWORD");
    }
}
3017
/// Without `no_auth`, the options are not marked no-auth, the metadata key is
/// absent, and the requested vault stays enabled.
#[test]
fn default_behaviour_without_no_auth_flag_is_unchanged() {
    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    let options = no_auth_test_config(false)
        .to_db_options()
        .expect("to_db_options");

    let marked_no_auth = no_auth_active(&options);
    assert!(!marked_no_auth, "default boot must not be marked no-auth");

    let stamp = options.metadata.get(NO_AUTH_META);
    assert!(
        stamp.is_none(),
        "metadata key must be absent when flag is off"
    );

    assert!(options.auth.vault_enabled);
}
3035
/// Reproduces the "skip env bootstrap when no-auth is active" gate: with
/// `no_auth` set, credentials in the environment must not create an admin.
#[test]
fn no_auth_active_blocks_bootstrap_from_env() {
    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    // SAFETY: tests that mutate these variables serialize on
    // `no_auth_env_lock`, which is held for the whole test.
    // NOTE(review): readers outside that lock are not covered — confirm.
    unsafe {
        std::env::set_var("REDDB_USERNAME", "admin");
        std::env::set_var("REDDB_PASSWORD", "hunter2");
    }

    let options = no_auth_test_config(true)
        .to_db_options()
        .expect("to_db_options");
    let auth_store = AuthStore::new(options.auth.clone());

    // Bootstrap from the environment only runs when no-auth is NOT active.
    let bootstrap_allowed = !no_auth_active(&options);
    if bootstrap_allowed {
        auth_store.bootstrap_from_env();
    }

    let still_unbootstrapped = auth_store.needs_bootstrap();
    assert!(
        still_unbootstrapped,
        "no admin user must be bootstrapped under --no-auth even with REDDB_USERNAME/PASSWORD set"
    );

    // SAFETY: same serialization as above; the lock guard is still alive.
    unsafe {
        std::env::remove_var("REDDB_USERNAME");
        std::env::remove_var("REDDB_PASSWORD");
    }
}
3071
3072 fn clear_preset_env() {
3079 unsafe {
3081 std::env::remove_var(PRESET_ENV);
3082 std::env::remove_var("REDDB_BOOTSTRAP_MANIFEST");
3083 std::env::remove_var("REDDB_USERNAME");
3084 std::env::remove_var("REDDB_PASSWORD");
3085 std::env::remove_var("REDDB_USERNAME_FILE");
3086 std::env::remove_var("REDDB_PASSWORD_FILE");
3087 }
3088 }
3089
3090 fn fresh_runtime_and_store() -> (RedDBRuntime, Arc<AuthStore>) {
3091 let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
3092 let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3093 (runtime, auth_store)
3094 }
3095
/// With no preset selected in the environment, `apply_preset` behaves as the
/// simple preset: no admin is created, and the bootstrap state (completed
/// flag + preset name, no first admin) is persisted in the config store.
#[test]
fn simple_preset_is_default_and_persists_state() {
    use crate::storage::schema::Value;

    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();

    let (runtime, auth_store) = fresh_runtime_and_store();
    apply_preset(&runtime, &auth_store).expect("simple preset applies cleanly");

    assert!(
        auth_store.needs_bootstrap(),
        "simple preset must not create an admin"
    );

    let config_store = runtime.db().store();

    let completed_flag = config_store
        .get_config(BOOTSTRAP_COMPLETED_KEY)
        .expect("completed key persisted");
    assert!(matches!(completed_flag, Value::Boolean(true)));

    let recorded_preset = config_store
        .get_config(BOOTSTRAP_PRESET_KEY)
        .expect("preset key persisted");
    match recorded_preset {
        Value::Text(name) => assert_eq!(name.as_ref(), PRESET_SIMPLE),
        other => panic!("expected Text(simple), got {other:?}"),
    }

    let first_admin = config_store.get_config(BOOTSTRAP_FIRST_ADMIN_KEY);
    assert!(
        first_admin.is_none(),
        "simple preset must not record a first admin"
    );

    clear_preset_env();
}
3133
/// The production preset, fed credentials from the environment, creates one
/// system-owned, platform-scoped admin, installs the allow-all policy for it,
/// and persists the first-admin id and preset name.
#[test]
fn production_preset_creates_first_admin_with_allow_all_policy() {
    use crate::auth::policies::{EvalContext, ResourceRef};
    use crate::auth::UserId;
    use crate::storage::schema::Value;

    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();
    // SAFETY: tests that mutate these variables serialize on
    // `no_auth_env_lock`, which is held for the whole test.
    // NOTE(review): readers outside that lock are not covered — confirm.
    unsafe {
        std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
        std::env::set_var("REDDB_USERNAME", "ops");
        std::env::set_var("REDDB_PASSWORD", "hunter2");
    }

    let (runtime, auth_store) = fresh_runtime_and_store();
    apply_preset(&runtime, &auth_store).expect("production preset applies cleanly");

    // Exactly one user exists and bootstrap is sealed.
    assert!(
        !auth_store.needs_bootstrap(),
        "production preset must seal bootstrap"
    );
    let all_users = auth_store.list_users();
    assert_eq!(all_users.len(), 1);
    let first_admin = &all_users[0];
    assert_eq!(first_admin.username, "ops");
    assert!(
        first_admin.system_owned,
        "first admin must be system-owned to pass the managed-config gate"
    );
    assert!(
        first_admin.tenant_id.is_none(),
        "first admin must be platform-scoped (tenant=None)"
    );

    // The allow-all policy exists and is not empty.
    let allow_all = auth_store
        .get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY)
        .expect("allow-all policy installed");
    assert!(!allow_all.statements.is_empty());

    // The policy evaluator grants an arbitrary action to the first admin.
    let ops = UserId::platform("ops");
    let eval_ctx = EvalContext {
        principal_tenant: None,
        current_tenant: None,
        peer_ip: None,
        mfa_present: false,
        now_ms: 1_700_000_000_000,
        principal_is_admin_role: true,
        principal_is_system_owned: true,
        principal_is_platform_scoped: true,
    };
    let some_config_key = ResourceRef::new("config", "red.config.audit.enabled");
    let granted = auth_store.check_policy_authz(&ops, "config:write", &some_config_key, &eval_ctx);
    assert!(
        granted,
        "allow-all policy must grant arbitrary actions via the evaluator"
    );

    // Bootstrap state is persisted in the config store.
    let config_store = runtime.db().store();
    let recorded_admin = config_store
        .get_config(BOOTSTRAP_FIRST_ADMIN_KEY)
        .expect("first_admin_id persisted");
    match recorded_admin {
        Value::Text(name) => assert_eq!(name.as_ref(), "ops"),
        other => panic!("expected Text(ops), got {other:?}"),
    }
    let recorded_preset = config_store.get_config(BOOTSTRAP_PRESET_KEY).unwrap();
    match recorded_preset {
        Value::Text(name) => assert_eq!(name.as_ref(), PRESET_PRODUCTION),
        other => panic!("expected Text(production), got {other:?}"),
    }

    clear_preset_env();
}
3210
/// The regulated preset switches query auditing on and creates the audit
/// collection, but installs no rules, so ordinary queries leave no audit rows.
#[test]
fn regulated_preset_enables_query_audit_infrastructure_without_rules() {
    use crate::runtime::query_audit::QUERY_AUDIT_COLLECTION;

    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();
    // SAFETY: tests that mutate this variable serialize on
    // `no_auth_env_lock`, which is held for the whole test.
    // NOTE(review): readers outside that lock are not covered — confirm.
    unsafe {
        std::env::set_var(PRESET_ENV, PRESET_REGULATED);
    }

    let (runtime, auth_store) = fresh_runtime_and_store();
    apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");

    // Infrastructure is on, rule set is empty, collection exists.
    assert!(runtime.query_audit().is_enabled());
    assert!(runtime.query_audit().rules().is_empty());
    let audit_collection = runtime.db().store().get_collection(QUERY_AUDIT_COLLECTION);
    assert!(
        audit_collection.is_some(),
        "regulated preset should create the query-audit stream"
    );

    // Run a few ordinary statements ...
    runtime
        .execute_query("CREATE TABLE docs (id INT)")
        .expect("create table");
    runtime
        .execute_query("INSERT INTO docs (id) VALUES (1)")
        .expect("insert");
    runtime.execute_query("SELECT * FROM docs").expect("select");

    // ... and check that none of them was audited.
    let audit_rows = runtime
        .db()
        .store()
        .get_collection(QUERY_AUDIT_COLLECTION)
        .expect("query audit collection")
        .query_all(|_| true);
    assert!(
        audit_rows.is_empty(),
        "regulated preset must not globally audit every query"
    );

    clear_preset_env();
}
3254
/// End-to-end check of the regulated preset: options enable compliance mode
/// and rule-less query auditing; the managed policy and audit config
/// namespace are registered; an ordinary admin holding an allow-all policy is
/// still denied when dropping the managed policy or writing managed config;
/// both denials show up in `red.control_events`; data-plane queries are not
/// audited.
#[test]
fn regulated_preset_installs_managed_evidence_guardrails_end_to_end() {
    use crate::auth::policies::{EvalContext, Policy, ResourceRef};
    use crate::auth::store::PrincipalRef;
    use crate::auth::{Role, UserId};
    use crate::runtime::mvcc::{clear_current_auth_identity, set_current_auth_identity};
    use crate::runtime::query_audit::QUERY_AUDIT_COLLECTION;
    use crate::storage::schema::Value;

    let guarded_policy = REGULATED_PROTECT_MANAGED_POLICY;

    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();
    // SAFETY: tests that mutate this variable serialize on
    // `no_auth_env_lock`, which is held for the whole test.
    // NOTE(review): readers outside that lock are not covered — confirm.
    unsafe {
        std::env::set_var(PRESET_ENV, PRESET_REGULATED);
    }

    // --- Options derived from the preset, before any runtime exists. ---
    let options = no_auth_test_config(false)
        .to_db_options()
        .expect("regulated options");
    assert!(
        options.control_events.compliance_mode,
        "regulated preset must enable fail-closed control evidence before runtime boot"
    );
    assert!(
        options.query_audit.enabled && options.query_audit.rules.is_empty(),
        "regulated preset must enable query-audit infrastructure without global rules"
    );

    // --- Boot the runtime and apply the preset. ---
    let runtime = RedDBRuntime::with_options(options).expect("runtime");
    let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
    apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
    runtime.set_auth_store(Arc::clone(&auth_store));

    assert!(runtime.control_events_require_persistence());
    assert!(runtime.query_audit().is_enabled());
    assert!(runtime.query_audit().rules().is_empty());
    assert!(auth_store.get_policy(guarded_policy).is_some());

    // --- Registry entries for the managed policy and config namespace. ---
    let policy_entry = runtime
        .config_registry()
        .get_active(guarded_policy)
        .expect("regulated managed policy registry entry");
    assert!(policy_entry.managed);
    assert_eq!(policy_entry.resource_type, "policy");

    let audit_namespace = runtime
        .config_registry()
        .get_active(REGULATED_AUDIT_CONFIG_NAMESPACE)
        .expect("regulated audit config namespace");
    assert!(audit_namespace.managed);

    // --- The same facts are visible through the SQL-facing views. ---
    let registry_sql = format!(
        "SELECT id, managed FROM red.registry WHERE id = '{}'",
        guarded_policy
    );
    let registry_rows = runtime
        .execute_query(&registry_sql)
        .expect("red.registry query");
    assert_eq!(registry_rows.result.records.len(), 1);
    assert_eq!(
        registry_rows.result.records[0].get("managed"),
        Some(&Value::Boolean(true))
    );

    let managed_policies_sql = format!(
        "SELECT policy_id FROM red.managed_policies WHERE policy_id = '{}'",
        guarded_policy
    );
    let managed_policy_rows = runtime
        .execute_query(&managed_policies_sql)
        .expect("red.managed_policies query");
    assert_eq!(managed_policy_rows.result.records.len(), 1);

    let capability_rows = runtime
        .execute_query(
            "SELECT action FROM red.control_capabilities WHERE action = 'evidence:export'",
        )
        .expect("red.control_capabilities query");
    assert_eq!(capability_rows.result.records.len(), 1);

    // --- An ordinary (not system-owned) admin with an allow-all policy. ---
    auth_store
        .create_user("alice", "p", Role::Admin)
        .expect("create ordinary admin");
    let alice_policy = Policy::from_json_str(
        r#"{
            "id": "alice-allow-all",
            "version": 1,
            "statements": [{
                "effect": "allow",
                "actions": ["*"],
                "resources": ["*"]
            }]
        }"#,
    )
    .expect("allow-all policy");
    auth_store
        .put_policy(alice_policy)
        .expect("install allow-all");
    auth_store
        .attach_policy(
            PrincipalRef::User(UserId::platform("alice")),
            "alice-allow-all",
        )
        .expect("attach allow-all");

    let alice_ctx = EvalContext {
        principal_tenant: None,
        current_tenant: None,
        peer_ip: None,
        mfa_present: false,
        now_ms: 1_700_000_000_000,
        principal_is_admin_role: true,
        principal_is_system_owned: false,
        principal_is_platform_scoped: true,
    };
    let policy_level_allow = auth_store.check_policy_authz(
        &UserId::platform("alice"),
        "policy:drop",
        &ResourceRef::new("policy", guarded_policy),
        &alice_ctx,
    );
    assert!(
        policy_level_allow,
        "ordinary allow-all policy should be broad enough that only the managed guardrail blocks"
    );

    // Runs one statement with alice installed as the current identity and
    // always clears the identity again before handing back the outcome.
    let run_as_alice = |sql: &str| {
        set_current_auth_identity("alice".to_string(), Role::Admin);
        let outcome = runtime.execute_query(sql);
        clear_current_auth_identity();
        outcome
    };

    // --- Dropping the managed policy is denied and leaves it installed. ---
    let drop_sql = format!("DROP POLICY '{}'", guarded_policy);
    let drop_outcome = run_as_alice(&drop_sql);
    let err = drop_outcome.expect_err("managed policy guardrail must deny ordinary admin");
    assert!(
        err.to_string().contains("managed policy"),
        "error should name the managed guardrail: {err}"
    );
    assert!(
        auth_store.get_policy(guarded_policy).is_some(),
        "denied mutation must leave managed policy installed"
    );

    let denied_drop_sql = format!(
        "SELECT action, resource, outcome FROM red.control_events \
         WHERE action = 'policy:drop' AND resource = 'policy:{}'",
        guarded_policy
    );
    let denied_events = runtime
        .execute_query(&denied_drop_sql)
        .expect("red.control_events denied policy drop");
    assert_eq!(denied_events.result.records.len(), 1);
    assert_eq!(
        denied_events.result.records[0].get("outcome"),
        Some(&Value::text("denied"))
    );

    // --- Writing managed config is denied as well. ---
    let config_outcome = run_as_alice("SET CONFIG red.config.audit.enabled = true");
    let err = config_outcome.expect_err("managed config guardrail must deny ordinary admin");
    assert!(
        err.to_string().contains("managed config"),
        "error should name the managed config guardrail: {err}"
    );

    let denied_config_events = runtime
        .execute_query(
            "SELECT action, resource, outcome FROM red.control_events \
             WHERE action = 'config:write' AND resource = 'config:red.config.audit.enabled'",
        )
        .expect("red.control_events denied config write");
    assert_eq!(denied_config_events.result.records.len(), 1);
    assert_eq!(
        denied_config_events.result.records[0].get("outcome"),
        Some(&Value::text("denied"))
    );

    // --- Data-plane queries do not produce query-audit rows. ---
    runtime
        .execute_query("CREATE TABLE regulated_docs (id INT)")
        .expect("create user table");
    runtime
        .execute_query("SELECT * FROM regulated_docs")
        .expect("select user table");
    let audit_rows = runtime
        .db()
        .store()
        .get_collection(QUERY_AUDIT_COLLECTION)
        .expect("query audit collection")
        .query_all(|_| true);
    assert!(
        audit_rows.is_empty(),
        "regulated preset must not globally audit data-plane queries"
    );

    clear_preset_env();
}
3448
    /// End-to-end check of the bootstrap-manifest path through `apply_preset`.
    ///
    /// Writes a JSON manifest to a temp file, points `REDDB_BOOTSTRAP_MANIFEST`
    /// at it, applies it to a fresh runtime, and then verifies that:
    /// * the declared user exists and is flagged `system_owned`;
    /// * the attached policy grants `app:read` on `collection:docs`;
    /// * the managed policy and managed config namespace are active in the
    ///   runtime's config registry;
    /// * plain config and the secret-ref config are readable from the store;
    /// * the bootstrap-completed marker and the internal registry-entries key
    ///   are persisted;
    /// * the registry can be rehydrated, and `apply_preset` re-run, after the
    ///   manifest file has been deleted.
    #[test]
    fn bootstrap_manifest_installs_initial_users_policies_guardrails_and_config() {
        use crate::auth::policies::{EvalContext, ResourceRef};
        use crate::auth::UserId;
        use crate::storage::schema::Value;

        // Hold the env lock for the whole test; a poisoned lock (an earlier
        // test panicked while holding it) is recovered rather than propagated.
        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
        clear_preset_env();

        // Per-process, per-millisecond file name so concurrent test binaries
        // do not share a manifest path.
        let manifest_path = std::env::temp_dir().join(format!(
            "reddb-bootstrap-manifest-{}-{}.json",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis()
        ));
        // Fixture: one admin user, one regular policy attached to that user,
        // one managed policy, one managed config namespace, and two config
        // entries (a plain value and a secret reference).
        std::fs::write(
            &manifest_path,
            r#"{
                "users": [
                    {
                        "username": "ops",
                        "password": "hunter2",
                        "role": "admin",
                        "system_owned": true
                    }
                ],
                "policies": [
                    {
                        "id": "bootstrap-registry-admin",
                        "version": 1,
                        "statements": [
                            {
                                "effect": "allow",
                                "actions": ["red.registry:*", "policy:*", "config:write", "app:read"],
                                "resources": ["registry:*", "policy:*", "config:*", "collection:docs"]
                            }
                        ]
                    }
                ],
                "managed_policies": [
                    {
                        "id": "managed-deny-drop",
                        "version": 1,
                        "statements": [
                            {
                                "effect": "deny",
                                "actions": ["policy:drop"],
                                "resources": ["policy:managed-deny-drop"]
                            }
                        ],
                        "required_resource": "policy:managed-deny-drop",
                        "evidence": "full"
                    }
                ],
                "attachments": [
                    {"user": "ops", "policy": "bootstrap-registry-admin"}
                ],
                "managed_config_namespaces": [
                    {
                        "id": "red.ai",
                        "required_action": "config:write",
                        "required_resource": "config:red.ai.*",
                        "evidence": "metadata"
                    }
                ],
                "config": [
                    {"key": "red.ai.default.provider", "value": "openai"},
                    {
                        "key": "red.ai.openai.default.secret_ref",
                        "secret_ref": {"collection": "red.vault", "key": "openai"}
                    }
                ],
                "actor": "ops"
            }"#,
        )
        .expect("write manifest");
        // SAFETY: `set_var` mutates process-global state. The guard bound to
        // `_g` above is held until the end of this test — assumes every other
        // env-touching test takes the same lock (TODO confirm).
        unsafe {
            std::env::set_var("REDDB_BOOTSTRAP_MANIFEST", &manifest_path);
        }

        let (runtime, auth_store) = fresh_runtime_and_store();
        apply_preset(&runtime, &auth_store).expect("manifest applies cleanly");

        // Exactly the one manifest user, carrying the `system_owned` flag.
        let users = auth_store.list_users();
        assert_eq!(users.len(), 1);
        assert_eq!(users[0].username, "ops");
        assert!(users[0].system_owned);

        // The attachment must make the manifest policy effective for "ops":
        // `app:read` on `collection:docs` is one of the allowed pairs.
        // The context flags mirror the manifest user (admin role, system
        // owned); platform scope presumably matches `UserId::platform`.
        let actor = UserId::platform("ops");
        let ctx = EvalContext {
            principal_tenant: None,
            current_tenant: None,
            peer_ip: None,
            mfa_present: false,
            now_ms: 1_700_000_000_000,
            principal_is_admin_role: true,
            principal_is_system_owned: true,
            principal_is_platform_scoped: true,
        };
        assert!(auth_store.check_policy_authz(
            &actor,
            "app:read",
            &ResourceRef::new("collection", "docs"),
            &ctx
        ));

        // Both managed entries must be active in the runtime's registry,
        // flagged as managed and typed by what they guard.
        let managed_policy = runtime
            .config_registry()
            .get_active("managed-deny-drop")
            .expect("managed policy registry entry");
        assert!(managed_policy.managed);
        assert_eq!(managed_policy.resource_type, "policy");
        let managed_config = runtime
            .config_registry()
            .get_active("red.ai")
            .expect("managed config namespace registry entry");
        assert!(managed_config.managed);
        assert_eq!(managed_config.resource_type, "config_namespace");

        // Plain config values come back as text.
        let store = runtime.db().store();
        match store
            .get_config("red.ai.default.provider")
            .expect("plain config persisted")
        {
            Value::Text(s) => assert_eq!(s.as_ref(), "openai"),
            other => panic!("expected provider text, got {other:?}"),
        }
        // Secret references are stored as a JSON document tagged
        // `"type": "secret_ref"`, not as a plain string.
        let Value::Json(bytes) = store
            .get_config("red.ai.openai.default.secret_ref")
            .expect("secret ref config persisted")
        else {
            panic!("secret ref must be stored as structured JSON");
        };
        let reference: crate::serde_json::Value =
            crate::serde_json::from_slice(&bytes).expect("secret ref json");
        assert_eq!(
            reference.get("type").and_then(|v| v.as_str()),
            Some("secret_ref")
        );
        // Guard against the manifest user's password literal ending up in
        // the stored secret-ref document.
        assert!(
            !String::from_utf8_lossy(&bytes).contains("hunter2"),
            "manifest password must not leak into secret ref config"
        );

        // Bootstrap state: the completion marker plus the internal copy of
        // the managed registry entries.
        let completed = store
            .get_config(BOOTSTRAP_COMPLETED_KEY)
            .expect("bootstrap completion persisted");
        assert!(matches!(completed, Value::Boolean(true)));
        assert!(
            store
                .get_config("system.bootstrap.manifest.registry_entries")
                .is_some(),
            "managed registry entries must be persisted internally"
        );

        // Simulate a later boot where the manifest file no longer exists.
        // The env var still points at the deleted path from here on.
        std::fs::remove_file(&manifest_path).expect("remove manifest after first boot");
        // A brand-new registry must be repopulated from the runtime alone —
        // presumably from the registry entries persisted above.
        let restored_registry = Arc::new(crate::auth::registry::ConfigRegistry::new());
        crate::cli::bootstrap_manifest::rehydrate_manifest_registry(&runtime, &restored_registry)
            .expect("registry rehydrates without manifest file");
        assert!(restored_registry.get_active("managed-deny-drop").is_some());
        assert!(restored_registry.get_active("red.ai").is_some());

        // Re-applying against the already-bootstrapped runtime with a new
        // auth store must succeed and leave that store without any user.
        let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
        apply_preset(&runtime, &fresh).expect("re-run must not need manifest file");
        assert!(fresh.needs_bootstrap());

        clear_preset_env();
    }
3620
3621 #[test]
3622 fn production_preset_refuses_to_start_without_password() {
3623 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3624 clear_preset_env();
3625 unsafe {
3627 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3628 std::env::set_var("REDDB_USERNAME", "ops");
3629 }
3630
3631 let (runtime, auth_store) = fresh_runtime_and_store();
3632 let err = apply_preset(&runtime, &auth_store).expect_err("must reject missing password");
3633 assert!(
3634 err.contains("REDDB_PASSWORD"),
3635 "error must name the missing env: {err}"
3636 );
3637
3638 assert!(auth_store.needs_bootstrap());
3640 assert!(runtime
3641 .db()
3642 .store()
3643 .get_config(BOOTSTRAP_COMPLETED_KEY)
3644 .is_none());
3645
3646 clear_preset_env();
3647 }
3648
3649 #[test]
3650 fn re_running_production_after_first_boot_is_a_silent_skip() {
3651 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3652 clear_preset_env();
3653 unsafe {
3655 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3656 std::env::set_var("REDDB_USERNAME", "ops");
3657 std::env::set_var("REDDB_PASSWORD", "hunter2");
3658 }
3659
3660 let (runtime, auth_store) = fresh_runtime_and_store();
3661 apply_preset(&runtime, &auth_store).expect("first apply");
3662 assert_eq!(auth_store.list_users().len(), 1);
3663
3664 let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3671 apply_preset(&runtime, &fresh).expect("re-run is silent-skip");
3672 assert!(
3673 fresh.needs_bootstrap(),
3674 "re-run must not create a second admin"
3675 );
3676 assert!(
3677 fresh.get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY).is_none(),
3678 "re-run must not re-install the allow-all policy on the fresh store"
3679 );
3680
3681 clear_preset_env();
3682 }
3683
3684 #[test]
3685 fn unrecognised_preset_value_is_rejected() {
3686 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3687 clear_preset_env();
3688 unsafe {
3690 std::env::set_var(PRESET_ENV, "weird");
3691 }
3692
3693 let (runtime, auth_store) = fresh_runtime_and_store();
3694 let err = apply_preset(&runtime, &auth_store).expect_err("must reject unknown preset");
3695 assert!(err.contains("weird"), "error must echo the value: {err}");
3696 assert!(auth_store.needs_bootstrap());
3697
3698 clear_preset_env();
3699 }
3700
3701 #[test]
3702 fn no_auth_short_circuits_preset_entirely() {
3703 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3704 clear_preset_env();
3705 unsafe {
3708 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3709 std::env::set_var("REDDB_USERNAME", "ops");
3710 std::env::set_var("REDDB_PASSWORD", "hunter2");
3711 }
3712
3713 let options = no_auth_test_config(true)
3714 .to_db_options()
3715 .expect("to_db_options");
3716 assert!(no_auth_active(&options));
3717
3718 let (runtime, auth_store) = fresh_runtime_and_store();
3721 if !no_auth_active(&options) {
3722 apply_preset(&runtime, &auth_store).expect("would apply preset");
3723 }
3724
3725 assert!(
3726 auth_store.needs_bootstrap(),
3727 "--no-auth must prevent any admin creation"
3728 );
3729 assert!(
3730 runtime
3731 .db()
3732 .store()
3733 .get_config(BOOTSTRAP_COMPLETED_KEY)
3734 .is_none(),
3735 "--no-auth must skip bootstrap-state persistence"
3736 );
3737
3738 clear_preset_env();
3739 }
3740
3741 #[test]
3742 fn implicit_bind_collision_degrades() {
3743 let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
3744 let addr = held.local_addr().expect("held addr").to_string();
3745 let mut readiness = TransportReadiness::default();
3746
3747 let listener =
3748 bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");
3749
3750 assert!(listener.is_none());
3751 assert_eq!(readiness.active.len(), 0);
3752 assert_eq!(readiness.failed.len(), 1);
3753 assert!(!readiness.failed[0].explicit);
3754 assert_eq!(readiness.failed[0].bind_addr, addr);
3755 }
3756}