1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, InProcessRouterConfig};
12use crate::storage::StorageProfileSelection;
13use crate::{
14 GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
15};
16
17pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
18
19pub fn detect_runtime_config() -> RuntimeConfig {
21 let cpus = thread::available_parallelism()
22 .map(|n| n.get())
23 .unwrap_or(1);
24
25 let suggested_workers = cpus.saturating_sub(1).max(1);
27
28 RuntimeConfig {
29 available_cpus: cpus,
30 suggested_workers,
31 stack_size: 8 * 1024 * 1024, }
33}
34
35#[derive(Debug, Clone)]
36pub struct RuntimeConfig {
37 pub available_cpus: usize,
38 pub suggested_workers: usize,
39 pub stack_size: usize,
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum ServerTransport {
44 Grpc,
45 Http,
46 Wire,
47}
48
49impl ServerTransport {
50 pub const fn as_str(self) -> &'static str {
51 match self {
52 Self::Grpc => "gRPC",
53 Self::Http => "HTTP",
54 Self::Wire => "wire",
55 }
56 }
57
58 pub const fn default_bind_addr(self) -> &'static str {
59 match self {
60 Self::Grpc => "127.0.0.1:5555",
61 Self::Http => "127.0.0.1:5055",
62 Self::Wire => "127.0.0.1:5050",
63 }
64 }
65}
66
67#[derive(Debug, Clone)]
68pub struct ServerCommandConfig {
69 pub path: Option<PathBuf>,
70 pub router_bind_addr: Option<String>,
71 pub router_bind_explicit: bool,
72 pub grpc_bind_addr: Option<String>,
73 pub grpc_bind_explicit: bool,
74 pub grpc_tls_bind_addr: Option<String>,
78 pub grpc_tls_cert: Option<PathBuf>,
84 pub grpc_tls_key: Option<PathBuf>,
87 pub grpc_tls_client_ca: Option<PathBuf>,
92 pub http_bind_addr: Option<String>,
93 pub http_bind_explicit: bool,
94 pub http_tls_bind_addr: Option<String>,
98 pub http_tls_cert: Option<PathBuf>,
101 pub http_tls_key: Option<PathBuf>,
104 pub http_tls_client_ca: Option<PathBuf>,
108 pub wire_bind_addr: Option<String>,
109 pub wire_bind_explicit: bool,
110 pub wire_tls_bind_addr: Option<String>,
112 pub wire_tls_cert: Option<PathBuf>,
114 pub wire_tls_key: Option<PathBuf>,
116 pub pg_bind_addr: Option<String>,
120 pub create_if_missing: bool,
121 pub read_only: bool,
122 pub role: String,
123 pub primary_addr: Option<String>,
124 pub storage_profile: StorageProfileSelection,
125 pub auth: bool,
128 pub require_auth: bool,
130 pub vault: bool,
131 pub no_auth: bool,
142 pub workers: Option<usize>,
144 pub telemetry: Option<crate::telemetry::TelemetryConfig>,
147 pub http_limits_cli: crate::server::HttpLimitsCliInput,
152 pub ui: bool,
157 pub ui_dir: Option<PathBuf>,
161 pub bootstrap: BootstrapConfig,
164}
165
166#[derive(Debug, Clone, Default)]
167pub struct BootstrapConfig {
168 pub preset: Option<String>,
169 pub manifest: Option<PathBuf>,
170 pub admin_username: Option<String>,
171 pub admin_password: Option<String>,
172 pub cloud_head_admin: Option<String>,
173 pub cloud_head_admin_password: Option<String>,
174 pub customer_admin: Option<String>,
175 pub customer_admin_password: Option<String>,
176}
177
178#[derive(Debug, Clone, PartialEq, Eq)]
179pub struct TransportListenerState {
180 pub transport: String,
181 pub bind_addr: String,
182 pub explicit: bool,
183}
184
185#[derive(Debug, Clone, PartialEq, Eq)]
186pub struct TransportListenerFailure {
187 pub transport: String,
188 pub bind_addr: String,
189 pub explicit: bool,
190 pub reason: String,
191}
192
193#[derive(Debug, Clone, Default, PartialEq, Eq)]
194pub struct TransportReadiness {
195 pub active: Vec<TransportListenerState>,
196 pub failed: Vec<TransportListenerFailure>,
197}
198
199impl TransportReadiness {
200 fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
201 self.active.push(TransportListenerState {
202 transport: transport.to_string(),
203 bind_addr: bind_addr.to_string(),
204 explicit,
205 });
206 }
207
208 fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
209 self.failed.push(TransportListenerFailure {
210 transport: transport.to_string(),
211 bind_addr: bind_addr.to_string(),
212 explicit,
213 reason,
214 });
215 }
216}
217
218#[derive(Debug, Clone)]
219pub struct SystemdServiceConfig {
220 pub service_name: String,
221 pub binary_path: PathBuf,
222 pub run_user: String,
223 pub run_group: String,
224 pub data_path: PathBuf,
225 pub router_bind_addr: Option<String>,
226 pub grpc_bind_addr: Option<String>,
227 pub http_bind_addr: Option<String>,
228}
229
230impl SystemdServiceConfig {
231 pub fn data_dir(&self) -> PathBuf {
232 self.data_path
233 .parent()
234 .map(PathBuf::from)
235 .unwrap_or_else(|| PathBuf::from("."))
236 }
237
238 pub fn unit_path(&self) -> PathBuf {
239 PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
240 }
241}
242
243pub fn default_telemetry_for_path(
248 path: Option<&std::path::Path>,
249) -> crate::telemetry::TelemetryConfig {
250 let log_dir = match path {
251 Some(p) => p
252 .parent()
253 .map(|parent| parent.join("logs"))
254 .or_else(|| Some(std::path::PathBuf::from("./logs"))),
255 None => None, };
257 crate::telemetry::TelemetryConfig {
258 log_dir,
259 file_prefix: "reddb.log".to_string(),
260 level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
261 format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
262 crate::telemetry::LogFormat::Pretty
263 } else {
264 crate::telemetry::LogFormat::Json
265 },
266 rotation_keep_days: 14,
267 service_name: "reddb",
268 level_explicit: false,
270 format_explicit: false,
271 rotation_keep_days_explicit: false,
272 file_prefix_explicit: false,
273 log_dir_explicit: false,
274 log_file_disabled: false,
275 }
276}
277
278const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
285const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
286const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
287const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";
291
292pub(crate) const NO_AUTH_META: &str = "red.boot.no_auth";
300
301pub(crate) fn no_auth_active(options: &RedDBOptions) -> bool {
304 options
305 .metadata
306 .get(NO_AUTH_META)
307 .map(|v| v == "true")
308 .unwrap_or(false)
309}
310
311const NO_AUTH_WARNING: &str =
316 "⚠ auth disabled (--no-auth) — anonymous access, do NOT use in production";
317
318impl ServerCommandConfig {
319 fn to_db_options(&self) -> Result<RedDBOptions, String> {
320 let mut options = match &self.path {
321 Some(path) => RedDBOptions::persistent(path),
322 None => RedDBOptions::in_memory(),
323 };
324
325 options.mode = StorageMode::Persistent;
326 options.create_if_missing = self.create_if_missing;
327 options.read_only = self.read_only
334 || env_nonempty("RED_READONLY")
335 .or_else(|| env_nonempty("REDDB_READONLY"))
336 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
337 .unwrap_or(false)
338 || self.path.as_ref().is_some_and(|data_path| {
339 crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
340 data_path,
341 ))
342 .unwrap_or(false)
343 });
344
345 options.replication = match self.role.as_str() {
346 "primary" => ReplicationConfig::primary(),
347 "replica" => {
348 let primary_addr = self
349 .primary_addr
350 .clone()
351 .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
352 ReplicationConfig::replica(primary_addr)
359 }
360 _ => ReplicationConfig::standalone(),
361 };
362 options.storage_profile = self.storage_profile.validate()?;
363
364 let no_auth = self.no_auth || env_truthy("REDDB_NO_AUTH") || env_truthy("REDDB_DEV");
365 let preset = self.bootstrap.resolved_preset();
366 let preset_requires_auth = matches!(preset.as_str(), PRESET_PRODUCTION | PRESET_CLOUD);
367
368 if self.auth || env_truthy("REDDB_AUTH") || preset_requires_auth {
369 options.auth.enabled = true;
370 }
371 if self.require_auth || env_truthy("REDDB_REQUIRE_AUTH") || preset_requires_auth {
372 options.auth.enabled = true;
373 options.auth.require_auth = true;
374 }
375 if self.vault || env_truthy("REDDB_VAULT") || preset_requires_auth {
376 options.auth.vault_enabled = true;
377 }
378
379 if no_auth {
390 options.auth.enabled = false;
391 options.auth.require_auth = false;
392 options.auth.vault_enabled = false;
393 options
394 .metadata
395 .insert(NO_AUTH_META.to_string(), "true".to_string());
396 }
397
398 if let Some(raw) = env_nonempty("REDDB_COMPLIANCE_MODE") {
403 options.control_events.compliance_mode = matches!(
404 raw.to_ascii_lowercase().as_str(),
405 "1" | "true" | "yes" | "on"
406 );
407 }
408 if preset == PRESET_REGULATED {
409 options.control_events.compliance_mode = true;
410 options.query_audit = crate::runtime::query_audit::QueryAuditConfig::regulated();
411 }
412
413 match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
418 Err(msg) => {
419 return Err(format!("backup bootstrap: {msg}"));
420 }
421 Ok(Some(cfg)) => {
422 apply_backup_config(&mut options, &cfg);
423 }
424 Ok(None) => {
425 configure_remote_backend_from_env(&mut options);
426 }
427 }
428
429 if options.remote_backend.is_some()
430 || options
431 .metadata
432 .contains_key(BACKUP_INTERVAL_META_CHECKPOINT)
433 {
434 let mut selection = options.storage_profile;
435 selection.managed_backup = true;
436 options.storage_profile = selection.validate()?;
437 }
438
439 Ok(options)
440 }
441
442 pub fn enabled_transports(&self) -> Vec<ServerTransport> {
443 let mut transports = Vec::with_capacity(3);
444 if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
445 transports.push(ServerTransport::Grpc);
446 }
447 if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
448 transports.push(ServerTransport::Http);
449 }
450 if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
451 transports.push(ServerTransport::Wire);
452 }
453 transports
454 }
455}
456
457fn env_nonempty(name: &str) -> Option<String> {
462 crate::utils::env_with_file_fallback(name)
463}
464
465fn env_truthy(name: &str) -> bool {
466 env_nonempty(name)
467 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
468 .unwrap_or(false)
469}
470
471impl BootstrapConfig {
472 fn resolved_preset(&self) -> String {
473 self.preset
474 .as_deref()
475 .map(str::trim)
476 .filter(|s| !s.is_empty())
477 .map(str::to_string)
478 .or_else(|| env_nonempty(BOOTSTRAP_PRESET_ENV).or_else(|| env_nonempty(PRESET_ENV)))
479 .unwrap_or_else(|| PRESET_SIMPLE.to_string())
480 }
481
482 fn selected_manifest(&self) -> Option<PathBuf> {
483 self.manifest.clone().or_else(|| {
484 env_nonempty(crate::cli::bootstrap_manifest::MANIFEST_ENV).map(PathBuf::from)
485 })
486 }
487
488 fn auth_bootstrap_input(&self) -> crate::cluster::AuthBootstrapInput {
492 use crate::cluster::AuthBootstrapInput;
493 if self.selected_manifest().is_some() {
494 return AuthBootstrapInput::Manifest;
495 }
496 let preset = self.resolved_preset();
497 let preset_creates_auth = matches!(
498 preset.as_str(),
499 PRESET_PRODUCTION | PRESET_CLOUD | PRESET_REGULATED
500 );
501 let credentials_present =
506 self.production_username().is_some() && self.production_password().is_some();
507 if preset_creates_auth || credentials_present {
508 AuthBootstrapInput::Env
509 } else {
510 AuthBootstrapInput::None
511 }
512 }
513
514 fn explicit_value(value: &Option<String>) -> Option<String> {
515 value
516 .as_deref()
517 .map(str::trim)
518 .filter(|s| !s.is_empty())
519 .map(str::to_string)
520 }
521
522 fn value_or_env(value: &Option<String>, env: &str) -> Option<String> {
523 Self::explicit_value(value).or_else(|| env_nonempty(env))
524 }
525
526 fn production_username(&self) -> Option<String> {
527 Self::value_or_env(&self.admin_username, "REDDB_USERNAME")
528 }
529
530 fn production_password(&self) -> Option<String> {
531 Self::value_or_env(&self.admin_password, "REDDB_PASSWORD")
532 }
533
534 fn cloud_head_username(&self) -> Option<String> {
535 Self::explicit_value(&self.cloud_head_admin)
536 .or_else(|| Self::explicit_value(&self.admin_username))
537 .or_else(|| env_nonempty("REDDB_CLOUD_HEAD_ADMIN"))
538 .or_else(|| env_nonempty("REDDB_USERNAME"))
539 }
540
541 fn cloud_head_password(&self) -> Option<String> {
542 Self::explicit_value(&self.cloud_head_admin_password)
543 .or_else(|| Self::explicit_value(&self.admin_password))
544 .or_else(|| env_nonempty("REDDB_CLOUD_HEAD_ADMIN_PASSWORD"))
545 .or_else(|| env_nonempty("REDDB_PASSWORD"))
546 }
547
548 fn customer_username(&self) -> Option<String> {
549 Self::value_or_env(&self.customer_admin, "REDDB_CUSTOMER_ADMIN")
550 }
551
552 fn customer_password(&self) -> Option<String> {
553 Self::value_or_env(
554 &self.customer_admin_password,
555 "REDDB_CUSTOMER_ADMIN_PASSWORD",
556 )
557 }
558
559 fn secret_env_vars_to_expand(&self, preset: &str) -> Vec<&'static str> {
560 let mut vars = Vec::new();
561 match preset {
562 PRESET_PRODUCTION => {
563 if Self::explicit_value(&self.admin_username).is_none() {
564 vars.push("REDDB_USERNAME");
565 }
566 if Self::explicit_value(&self.admin_password).is_none() {
567 vars.push("REDDB_PASSWORD");
568 }
569 }
570 PRESET_CLOUD => {
571 if Self::explicit_value(&self.cloud_head_admin).is_none()
572 && Self::explicit_value(&self.admin_username).is_none()
573 {
574 vars.push("REDDB_USERNAME");
575 }
576 if Self::explicit_value(&self.cloud_head_admin_password).is_none()
577 && Self::explicit_value(&self.admin_password).is_none()
578 {
579 vars.push("REDDB_CLOUD_HEAD_ADMIN_PASSWORD");
580 vars.push("REDDB_PASSWORD");
581 }
582 if Self::explicit_value(&self.customer_admin_password).is_none() {
583 vars.push("REDDB_CUSTOMER_ADMIN_PASSWORD");
584 }
585 }
586 _ => {}
587 }
588 vars
589 }
590}
591
592fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
598 let endpoint_host = endpoint_host(&cfg.endpoint);
599
600 options.metadata.insert(
601 BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
602 cfg.checkpoint_interval_secs.to_string(),
603 );
604 options.metadata.insert(
605 BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
606 cfg.wal_flush_interval_secs.to_string(),
607 );
608 options
609 .metadata
610 .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
611 options.metadata.insert(
612 BACKUP_PAUSE_ON_LAG_META.to_string(),
613 cfg.pause_on_lag_secs.to_string(),
614 );
615
616 #[cfg(feature = "backend-s3")]
617 {
618 let s3_cfg = crate::storage::backend::S3Config {
619 endpoint: cfg.endpoint.clone(),
620 bucket: cfg.bucket.clone(),
621 key_prefix: cfg.prefix.clone(),
622 access_key: cfg.access_key_id.clone(),
623 secret_key: cfg.secret_access_key.clone(),
624 region: cfg.region.clone(),
625 path_style: true,
626 };
627 let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
628 options.remote_backend = Some(backend.clone());
629 options.remote_backend_atomic = Some(backend);
630 let trimmed = cfg.prefix.trim_end_matches('/');
635 options.remote_key = Some(reddb_file::remote_database_key(trimmed));
636
637 tracing::info!(
638 backend = "s3",
639 endpoint = %endpoint_host,
640 bucket = %cfg.bucket,
641 prefix = %cfg.prefix,
642 checkpoint_interval_secs = cfg.checkpoint_interval_secs,
643 wal_flush_interval_secs = cfg.wal_flush_interval_secs,
644 "backup backend configured from REDDB_BACKUP_* env"
645 );
646 }
647
648 #[cfg(not(feature = "backend-s3"))]
649 {
650 tracing::warn!(
651 backend = "s3",
652 endpoint = %endpoint_host,
653 bucket = %cfg.bucket,
654 prefix = %cfg.prefix,
655 "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
656 backend wiring skipped (archiver/checkpointer also disabled)"
657 );
658 }
659}
660
661fn endpoint_host(endpoint: &str) -> &str {
662 let after_scheme = endpoint
663 .split_once("://")
664 .map(|(_, r)| r)
665 .unwrap_or(endpoint);
666 after_scheme.split('/').next().unwrap_or(after_scheme)
667}
668
669fn spawn_backup_tasks_if_configured(
675 options: &RedDBOptions,
676 runtime: &RedDBRuntime,
677) -> Option<BackupTasksHandle> {
678 let checkpoint_secs: u64 = options
679 .metadata
680 .get(BACKUP_INTERVAL_META_CHECKPOINT)?
681 .parse()
682 .ok()?;
683 let wal_secs: u64 = options
684 .metadata
685 .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
686 .parse()
687 .ok()?;
688 let pause_on_lag_secs: u64 = options
691 .metadata
692 .get(BACKUP_PAUSE_ON_LAG_META)
693 .and_then(|raw| raw.parse().ok())
694 .unwrap_or(0);
695 options.remote_backend.as_ref()?;
696
697 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
698
699 if pause_on_lag_secs > 0 {
704 let now_ms = std::time::SystemTime::now()
705 .duration_since(std::time::UNIX_EPOCH)
706 .map(|d| d.as_millis() as u64)
707 .unwrap_or(0);
708 runtime
709 .write_gate()
710 .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
711 tracing::info!(
712 pause_on_lag_secs,
713 "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
714 );
715 }
716
717 let checkpoint_handle = {
718 let stop = Arc::clone(&stop);
719 let runtime = runtime.clone();
720 let interval = Duration::from_secs(checkpoint_secs);
721 thread::Builder::new()
722 .name("red-checkpointer".into())
723 .spawn(move || {
724 periodic_loop(stop, interval, move || {
725 if let Err(err) = runtime.checkpoint() {
726 tracing::warn!(error = %err, "periodic checkpoint failed");
727 }
728 })
729 })
730 .ok()
731 };
732
733 let archiver_handle = {
734 let stop = Arc::clone(&stop);
735 let runtime = runtime.clone();
736 let interval = Duration::from_secs(wal_secs);
737 let lag_enabled = pause_on_lag_secs > 0;
738 thread::Builder::new()
739 .name("red-wal-archiver".into())
740 .spawn(move || {
741 periodic_loop(stop, interval, move || match runtime.trigger_backup() {
742 Ok(_) if lag_enabled => {
743 let now_ms = std::time::SystemTime::now()
744 .duration_since(std::time::UNIX_EPOCH)
745 .map(|d| d.as_millis() as u64)
746 .unwrap_or(0);
747 runtime.write_gate().record_archive_success(now_ms);
748 runtime.write_gate().evaluate_archive_lag(now_ms);
752 }
753 Ok(_) => {}
754 Err(err) => {
755 tracing::warn!(error = %err, "periodic WAL archive/backup failed");
756 }
757 })
758 })
759 .ok()
760 };
761
762 let lag_monitor_handle = if pause_on_lag_secs > 0 {
767 let stop = Arc::clone(&stop);
768 let runtime = runtime.clone();
769 let interval = Duration::from_secs(5);
773 thread::Builder::new()
774 .name("red-archive-lag-monitor".into())
775 .spawn(move || {
776 periodic_loop(stop, interval, move || {
777 let now_ms = std::time::SystemTime::now()
778 .duration_since(std::time::UNIX_EPOCH)
779 .map(|d| d.as_millis() as u64)
780 .unwrap_or(0);
781 let was_paused = runtime.write_gate().is_auto_paused();
782 let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
783 if now_paused && !was_paused {
784 tracing::warn!(
785 pause_on_lag_secs,
786 last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
787 "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
788 );
789 } else if !now_paused && was_paused {
790 tracing::info!(
791 "WAL archive caught up — exiting graceful read-only mode (issue #519)"
792 );
793 }
794 })
795 })
796 .ok()
797 } else {
798 None
799 };
800
801 tracing::info!(
802 checkpoint_interval_secs = checkpoint_secs,
803 wal_flush_interval_secs = wal_secs,
804 "backup tasks spawned (checkpointer + WAL archiver)"
805 );
806
807 Some(BackupTasksHandle {
808 stop,
809 _checkpoint_handle: checkpoint_handle,
810 _archiver_handle: archiver_handle,
811 _lag_monitor_handle: lag_monitor_handle,
812 })
813}
814
815pub struct BackupTasksHandle {
818 stop: Arc<std::sync::atomic::AtomicBool>,
819 _checkpoint_handle: Option<thread::JoinHandle<()>>,
820 _archiver_handle: Option<thread::JoinHandle<()>>,
821 _lag_monitor_handle: Option<thread::JoinHandle<()>>,
824}
825
826impl Drop for BackupTasksHandle {
827 fn drop(&mut self) {
828 self.stop.store(true, std::sync::atomic::Ordering::Release);
829 }
830}
831
832fn periodic_loop<F: FnMut()>(
833 stop: Arc<std::sync::atomic::AtomicBool>,
834 interval: Duration,
835 mut tick: F,
836) {
837 let wake = Duration::from_secs(1);
840 let mut elapsed = Duration::ZERO;
841 while !stop.load(std::sync::atomic::Ordering::Acquire) {
842 thread::sleep(wake);
843 elapsed += wake;
844 if elapsed >= interval {
845 tick();
846 elapsed = Duration::ZERO;
847 }
848 }
849}
850
851fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
852 let backend = env_nonempty("RED_BACKEND")
858 .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
859 .unwrap_or_else(|| "none".to_string())
860 .to_ascii_lowercase();
861
862 match backend.as_str() {
863 "s3" | "minio" | "r2" => {
868 #[cfg(feature = "backend-s3")]
869 {
870 if let Some(config) = s3_config_from_env() {
871 let remote_key = env_nonempty("RED_REMOTE_KEY")
872 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
873 .unwrap_or_else(|| reddb_file::remote_database_key("clusters/dev"));
874 let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
875 options.remote_backend = Some(backend.clone());
876 options.remote_backend_atomic = Some(backend);
877 options.remote_key = Some(remote_key);
878 }
879 }
880 #[cfg(not(feature = "backend-s3"))]
881 {
882 tracing::warn!(
883 backend = %backend,
884 "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
885 );
886 }
887 }
888 "fs" | "local" => {
893 let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
894 let remote_key = match (
895 base_path,
896 env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
897 ) {
898 (Some(base), Some(rel)) => Some(format!(
899 "{}/{}",
900 base.trim_end_matches('/'),
901 rel.trim_start_matches('/')
902 )),
903 (Some(base), None) => Some(reddb_file::remote_database_key(&format!(
904 "{}/clusters/dev",
905 base.trim_end_matches('/')
906 ))),
907 (None, Some(rel)) => Some(rel),
908 (None, None) => None,
909 };
910 if let Some(key) = remote_key {
911 let backend = Arc::new(crate::storage::backend::LocalBackend);
912 options.remote_backend = Some(backend.clone());
913 options.remote_backend_atomic = Some(backend);
914 options.remote_key = Some(key);
915 }
916 }
917 "http" => {
922 let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
923 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
924 {
925 Some(u) => u,
926 None => {
927 tracing::warn!(
928 "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
929 );
930 return;
931 }
932 };
933 let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
934 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
935 .unwrap_or_default();
936 let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
937 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
938 {
939 std::fs::read_to_string(&path)
940 .ok()
941 .map(|s| s.trim().to_string())
942 .filter(|s| !s.is_empty())
943 } else {
944 env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
945 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
946 };
947
948 let mut config =
949 crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
950 if let Some(auth) = auth_header {
951 config = config.with_auth_header(auth);
952 }
953 let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
954 || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
955 || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
956 config = config.with_conditional_writes(conditional_writes);
957 if conditional_writes {
962 match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
963 Ok(atomic) => {
964 let atomic_arc = Arc::new(atomic);
965 options.remote_backend = Some(atomic_arc.clone());
966 options.remote_backend_atomic = Some(atomic_arc);
967 }
968 Err(err) => {
969 tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
970 options.remote_backend =
971 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
972 }
973 }
974 } else {
975 options.remote_backend =
976 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
977 }
978 options.remote_key = env_nonempty("RED_REMOTE_KEY")
979 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
980 .or_else(|| Some(reddb_file::remote_database_key("clusters/dev")));
981 }
982 "none" | "" => {}
985 other => {
986 tracing::warn!(
987 backend = %other,
988 "unknown RED_BACKEND value — supported: s3 | fs | http | none"
989 );
990 }
991 }
992}
993
994#[cfg(feature = "backend-s3")]
999fn env_s3(suffix: &str) -> Option<String> {
1000 env_nonempty(&format!("RED_S3_{suffix}"))
1001 .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}")))
1002}
1003
1004#[cfg(feature = "backend-s3")]
1010fn env_s3_secret(suffix: &str) -> Option<String> {
1011 let file_key_red = format!("RED_S3_{suffix}_FILE");
1012 let file_key_legacy = format!("REDDB_S3_{suffix}_FILE");
1013 if let Some(path) = env_nonempty(&file_key_red).or_else(|| env_nonempty(&file_key_legacy)) {
1014 return std::fs::read_to_string(&path)
1015 .ok()
1016 .map(|s| s.trim().to_string())
1017 .filter(|s| !s.is_empty());
1018 }
1019 env_s3(suffix)
1020}
1021
1022#[cfg(feature = "backend-s3")]
1023fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
1024 let endpoint = env_s3("ENDPOINT")?;
1025 let bucket = env_s3("BUCKET")?;
1026 let access_key = env_s3_secret("ACCESS_KEY")?;
1027 let secret_key = env_s3_secret("SECRET_KEY")?;
1028 let region = env_s3("REGION").unwrap_or_else(|| "us-east-1".to_string());
1029 let key_prefix = env_s3("KEY_PREFIX")
1030 .or_else(|| env_s3("PREFIX"))
1031 .unwrap_or_default();
1032 let path_style = env_s3("PATH_STYLE")
1033 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1034 .unwrap_or(true);
1035 Some(crate::storage::backend::S3Config {
1036 endpoint,
1037 bucket,
1038 key_prefix,
1039 access_key,
1040 secret_key,
1041 region,
1042 path_style,
1043 })
1044}
1045
1046pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
1047 let data_dir = config.data_dir();
1048 let exec_start = render_systemd_exec_start(config);
1049 format!(
1050 "[Unit]\n\
1051Description=RedDB unified database service\n\
1052After=network-online.target\n\
1053Wants=network-online.target\n\
1054\n\
1055[Service]\n\
1056Type=simple\n\
1057User={user}\n\
1058Group={group}\n\
1059WorkingDirectory={workdir}\n\
1060ExecStart={exec_start}\n\
1061Restart=always\n\
1062RestartSec=2\n\
1063LimitSTACK=16M\n\
1064NoNewPrivileges=true\n\
1065PrivateTmp=true\n\
1066ProtectSystem=strict\n\
1067ProtectHome=true\n\
1068ProtectControlGroups=true\n\
1069ProtectKernelTunables=true\n\
1070ProtectKernelModules=true\n\
1071RestrictNamespaces=true\n\
1072LockPersonality=true\n\
1073MemoryDenyWriteExecute=true\n\
1074ReadWritePaths={workdir}\n\
1075\n\
1076[Install]\n\
1077WantedBy=multi-user.target\n",
1078 user = config.run_user,
1079 group = config.run_group,
1080 workdir = data_dir.display(),
1081 exec_start = exec_start,
1082 )
1083}
1084
1085#[cfg(target_os = "linux")]
1094pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
1095 ensure_root()?;
1096 ensure_command_available("systemctl")?;
1097 ensure_command_available("getent")?;
1098 ensure_command_available("groupadd")?;
1099 ensure_command_available("useradd")?;
1100 ensure_command_available("install")?;
1101 ensure_executable(&config.binary_path)?;
1102
1103 if !command_success("getent", ["group", config.run_group.as_str()])? {
1104 run_command("groupadd", ["--system", config.run_group.as_str()])?;
1105 }
1106
1107 if !command_success("id", ["-u", config.run_user.as_str()])? {
1108 let data_dir = config.data_dir();
1109 run_command(
1110 "useradd",
1111 [
1112 "--system",
1113 "--gid",
1114 config.run_group.as_str(),
1115 "--home-dir",
1116 data_dir.to_string_lossy().as_ref(),
1117 "--shell",
1118 "/usr/sbin/nologin",
1119 config.run_user.as_str(),
1120 ],
1121 )?;
1122 }
1123
1124 let data_dir = config.data_dir();
1125 run_command(
1126 "install",
1127 [
1128 "-d",
1129 "-o",
1130 config.run_user.as_str(),
1131 "-g",
1132 config.run_group.as_str(),
1133 "-m",
1134 "0750",
1135 data_dir.to_string_lossy().as_ref(),
1136 ],
1137 )?;
1138
1139 std::fs::write(config.unit_path(), render_systemd_unit(config))
1140 .map_err(|err| format!("failed to write systemd unit: {err}"))?;
1141
1142 run_command("systemctl", ["daemon-reload"])?;
1143 run_command(
1144 "systemctl",
1145 [
1146 "enable",
1147 "--now",
1148 format!("{}.service", config.service_name).as_str(),
1149 ],
1150 )?;
1151
1152 Ok(())
1153}
1154
1155#[cfg(not(target_os = "linux"))]
1160pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
1161 Err("systemd install is Linux-only — use sc.exe (Windows) or \
1162 launchd (macOS) to install the service manually using the \
1163 unit printed by `red service print-unit`"
1164 .to_string())
1165}
1166
1167#[cfg(target_os = "linux")]
1168fn ensure_root() -> Result<(), String> {
1169 let output = Command::new("id")
1170 .arg("-u")
1171 .output()
1172 .map_err(|err| format!("failed to determine current uid: {err}"))?;
1173 if !output.status.success() {
1174 return Err("failed to determine current uid".to_string());
1175 }
1176 let uid = String::from_utf8_lossy(&output.stdout);
1177 if uid.trim() != "0" {
1178 return Err("run this command as root (sudo)".to_string());
1179 }
1180 Ok(())
1181}
1182
1183#[cfg(target_os = "linux")]
1184fn ensure_command_available(command: &str) -> Result<(), String> {
1185 let status = Command::new("sh")
1186 .args(["-lc", &format!("command -v {command} >/dev/null 2>&1")])
1187 .status()
1188 .map_err(|err| format!("failed to check command '{command}': {err}"))?;
1189 if status.success() {
1190 Ok(())
1191 } else {
1192 Err(format!("required command not found: {command}"))
1193 }
1194}
1195
1196#[cfg(target_os = "linux")]
1197fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
1198 let metadata = std::fs::metadata(path)
1199 .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
1200 #[cfg(unix)]
1201 {
1202 use std::os::unix::fs::PermissionsExt;
1203 if metadata.permissions().mode() & 0o111 == 0 {
1204 return Err(format!("binary is not executable: {}", path.display()));
1205 }
1206 }
1207 #[cfg(not(unix))]
1208 {
1209 if !metadata.is_file() {
1210 return Err(format!("binary is not a file: {}", path.display()));
1211 }
1212 }
1213 Ok(())
1214}
1215
1216#[cfg(target_os = "linux")]
1217fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
1218 Command::new(program)
1219 .args(args)
1220 .status()
1221 .map(|status| status.success())
1222 .map_err(|err| format!("failed to run {program}: {err}"))
1223}
1224
1225#[cfg(target_os = "linux")]
1226fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
1227 let output = Command::new(program)
1228 .args(args)
1229 .output()
1230 .map_err(|err| format!("failed to run {program}: {err}"))?;
1231 if output.status.success() {
1232 return Ok(());
1233 }
1234
1235 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
1236 let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
1237 let detail = if !stderr.is_empty() {
1238 stderr
1239 } else if !stdout.is_empty() {
1240 stdout
1241 } else {
1242 format!("exit status {}", output.status)
1243 };
1244 Err(format!("{program} failed: {detail}"))
1245}
1246
1247pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1248 let has_any = config.router_bind_addr.is_some()
1249 || config.grpc_bind_addr.is_some()
1250 || config.http_bind_addr.is_some()
1251 || config.wire_bind_addr.is_some()
1252 || config.pg_bind_addr.is_some();
1253 if !has_any {
1254 return Err("at least one server bind address must be configured".into());
1255 }
1256 let thread_name = if config.router_bind_addr.is_some() {
1257 "red-server-router"
1258 } else {
1259 match (
1260 config.grpc_bind_addr.is_some(),
1261 config.http_bind_addr.is_some(),
1262 ) {
1263 (true, true) => "red-server-dual",
1264 (true, false) => "red-server-grpc",
1265 (false, true) => "red-server-http",
1266 (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1267 (false, false) => "red-server-pg-wire",
1268 }
1269 };
1270
1271 let handle = thread::Builder::new()
1272 .name(thread_name.into())
1273 .stack_size(8 * 1024 * 1024)
1274 .spawn(move || run_configured_servers(config))
1275 .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1276
1277 match handle.join() {
1278 Ok(result) => result,
1279 Err(_) => Err("server thread panicked".to_string()),
1280 }
1281}
1282
1283fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1284 let mut parts = vec![
1285 config.binary_path.display().to_string(),
1286 "server".to_string(),
1287 "--path".to_string(),
1288 config.data_path.display().to_string(),
1289 ];
1290
1291 if let Some(bind_addr) = &config.router_bind_addr {
1292 parts.push("--bind".to_string());
1293 parts.push(bind_addr.clone());
1294 } else if let Some(bind_addr) = &config.grpc_bind_addr {
1295 parts.push("--grpc-bind".to_string());
1296 parts.push(bind_addr.clone());
1297 }
1298 if let Some(bind_addr) = &config.http_bind_addr {
1299 parts.push("--http-bind".to_string());
1300 parts.push(bind_addr.clone());
1301 }
1302
1303 parts.join(" ")
1304}
1305
1306pub fn probe_listener(target: &str, timeout: Duration) -> bool {
1307 let addresses: Vec<SocketAddr> = match target.to_socket_addrs() {
1308 Ok(addresses) => addresses.collect(),
1309 Err(_) => return false,
1310 };
1311
1312 addresses
1313 .into_iter()
1314 .any(|address| TcpStream::connect_timeout(&address, timeout).is_ok())
1315}
1316
1317#[inline(never)]
1318fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1319 if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1325 return run_routed_server(config, router_bind_addr);
1326 }
1327
1328 match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1329 (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1330 run_dual_server(config, grpc_bind_addr, http_bind_addr)
1331 }
1332 (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1333 (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1334 (None, None) => {
1335 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1336 run_wire_only_server(config, wire_addr)
1337 } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1338 run_pg_only_server(config, pg_addr)
1339 } else {
1340 Err("at least one server bind address must be configured".to_string())
1341 }
1342 }
1343 }
1344}
1345
1346pub fn bind_listener_for_startup(
1364 readiness: &mut TransportReadiness,
1365 transport: &str,
1366 bind_addr: &str,
1367 explicit: bool,
1368) -> Result<Option<TcpListener>, String> {
1369 match TcpListener::bind(bind_addr) {
1370 Ok(listener) => {
1371 readiness.active(transport, bind_addr, explicit);
1372 Ok(Some(listener))
1373 }
1374 Err(err) => {
1375 let reason = format!("{transport} listener bind {bind_addr}: {err}");
1376 readiness.failed(transport, bind_addr, explicit, reason.clone());
1377 if explicit {
1378 tracing::error!(
1379 transport,
1380 bind = %bind_addr,
1381 error = %err,
1382 "fatal explicit bind failure"
1383 );
1384 Err(format!("explicit {reason}"))
1385 } else {
1386 tracing::warn!(
1387 transport,
1388 bind = %bind_addr,
1389 error = %err,
1390 "non-fatal implicit bind failure; listener degraded"
1391 );
1392 Ok(None)
1393 }
1394 }
1395 }
1396}
1397
1398async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
1421 let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
1422 .ok()
1423 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1424 .unwrap_or(true);
1425
1426 #[cfg(unix)]
1427 {
1428 use tokio::signal::unix::{signal, SignalKind};
1429
1430 let mut sigterm = match signal(SignalKind::terminate()) {
1431 Ok(s) => s,
1432 Err(err) => {
1433 tracing::warn!(
1434 error = %err,
1435 "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
1436 );
1437 return;
1438 }
1439 };
1440 let mut sigint = match signal(SignalKind::interrupt()) {
1441 Ok(s) => s,
1442 Err(err) => {
1443 tracing::warn!(error = %err, "could not install SIGINT handler");
1444 return;
1445 }
1446 };
1447 let mut sighup = match signal(SignalKind::hangup()) {
1453 Ok(s) => Some(s),
1454 Err(err) => {
1455 tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
1456 None
1457 }
1458 };
1459
1460 let reload_runtime = runtime.clone();
1461 tokio::spawn(async move {
1462 loop {
1463 let signal_name = match &mut sighup {
1464 Some(hup) => tokio::select! {
1465 _ = sigterm.recv() => "SIGTERM",
1466 _ = sigint.recv() => "SIGINT",
1467 _ = hup.recv() => "SIGHUP",
1468 },
1469 None => tokio::select! {
1470 _ = sigterm.recv() => "SIGTERM",
1471 _ = sigint.recv() => "SIGINT",
1472 },
1473 };
1474
1475 if signal_name == "SIGHUP" {
1476 handle_sighup_reload(&reload_runtime);
1477 continue; }
1479
1480 tracing::info!(
1481 signal = signal_name,
1482 "lifecycle signal received; shutting down"
1483 );
1484 match runtime.graceful_shutdown(backup_on_shutdown) {
1485 Ok(report) => {
1486 tracing::info!(
1487 duration_ms = report.duration_ms,
1488 flushed_wal = report.flushed_wal,
1489 final_checkpoint = report.final_checkpoint,
1490 backup_uploaded = report.backup_uploaded,
1491 "graceful shutdown complete"
1492 );
1493 }
1494 Err(err) => {
1495 tracing::error!(error = %err, "graceful shutdown failed");
1496 crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
1502 reason: format!("graceful shutdown failed: {err}"),
1503 }
1504 .emit_global();
1505 }
1506 }
1507 std::process::exit(0);
1508 }
1509 });
1510 }
1511
1512 #[cfg(not(unix))]
1513 {
1514 tokio::spawn(async move {
1515 let interrupted = tokio::signal::ctrl_c().await;
1516 if let Err(err) = interrupted {
1517 tracing::warn!(error = %err, "could not install Ctrl+C handler");
1518 return;
1519 }
1520
1521 tracing::info!(
1522 signal = "Ctrl+C",
1523 "lifecycle signal received; shutting down"
1524 );
1525 match runtime.graceful_shutdown(backup_on_shutdown) {
1526 Ok(report) => {
1527 tracing::info!(
1528 duration_ms = report.duration_ms,
1529 flushed_wal = report.flushed_wal,
1530 final_checkpoint = report.final_checkpoint,
1531 backup_uploaded = report.backup_uploaded,
1532 "graceful shutdown complete"
1533 );
1534 }
1535 Err(err) => {
1536 tracing::error!(error = %err, "graceful shutdown failed");
1537 }
1538 }
1539 std::process::exit(0);
1540 });
1541 }
1542}
1543
1544fn handle_sighup_reload(runtime: &RedDBRuntime) {
1553 let now_ms = std::time::SystemTime::now()
1554 .duration_since(std::time::UNIX_EPOCH)
1555 .map(|d| d.as_millis() as u64)
1556 .unwrap_or(0);
1557 tracing::info!(
1558 target: "reddb::secrets",
1559 ts_unix_ms = now_ms,
1560 "SIGHUP received; secrets will be re-read from *_FILE on next access"
1561 );
1562 use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1567 runtime.audit_log().record_event(
1568 AuditEvent::builder("config/sighup_reload")
1569 .source(AuditAuthSource::System)
1570 .resource("secrets")
1571 .outcome(Outcome::Success)
1572 .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1573 .build(),
1574 );
1575}
1576
1577#[inline(never)]
1578fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
1579 let workers = config.workers;
1580 let db_options = config.to_db_options()?;
1581 let rt_config = detect_runtime_config();
1582 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1583 let (runtime, auth_store, _telemetry_guard) =
1584 build_runtime_and_auth_store(&config, db_options.clone())?;
1585 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1586
1587 spawn_admin_metrics_listeners(&runtime, &auth_store);
1588
1589 let http_server = build_http_server(
1595 runtime.clone(),
1596 auth_store.clone(),
1597 router_bind_addr.clone(),
1598 );
1599 let http_server = apply_http_limits(http_server, &config, &runtime);
1600 let http_server = apply_ui_bundle(http_server, &config)?;
1601
1602 let grpc_server = RedDBGrpcServer::with_options(
1603 runtime.clone(),
1604 GrpcServerOptions {
1605 bind_addr: router_bind_addr.clone(),
1606 tls: None,
1607 },
1608 auth_store,
1609 );
1610
1611 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1612 .enable_all()
1613 .worker_threads(worker_threads)
1614 .thread_stack_size(rt_config.stack_size)
1615 .build()
1616 .map_err(|err| format!("tokio runtime: {err}"))?;
1617
1618 let signal_runtime = runtime.clone();
1619 let wire_runtime = Arc::new(runtime);
1620 tokio_runtime.block_on(async move {
1621 spawn_lifecycle_signal_handler(signal_runtime).await;
1622 tracing::info!(
1623 bind = %router_bind_addr,
1624 cpus = rt_config.available_cpus,
1625 workers = worker_threads,
1626 "router bootstrapping"
1627 );
1628 serve_tcp_router(InProcessRouterConfig {
1629 bind_addr: router_bind_addr,
1630 http_server,
1631 grpc_server,
1632 wire_runtime,
1633 })
1634 .await
1635 .map_err(|err| err.to_string())
1636 })
1637}
1638
1639async fn spawn_wire_listeners(
1641 config: &ServerCommandConfig,
1642 runtime: &RedDBRuntime,
1643 readiness: &mut TransportReadiness,
1644) -> Result<(), String> {
1645 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1647 let wire_rt = Arc::new(runtime.clone());
1648 #[cfg(unix)]
1651 {
1652 if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1653 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1654 tokio::spawn(async move {
1655 if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1656 &wire_addr, wire_rt,
1657 )
1658 .await
1659 {
1660 tracing::error!(err = %e, "redwire unix listener error");
1661 }
1662 });
1663 return Ok(());
1664 }
1665 }
1666 match tokio::net::TcpListener::bind(&wire_addr).await {
1667 Ok(listener) => {
1668 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1669 tokio::spawn(async move {
1670 if let Err(e) =
1671 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1672 .await
1673 {
1674 tracing::error!(err = %e, "redwire listener error");
1675 }
1676 });
1677 }
1678 Err(err) => {
1679 let reason = format!("wire listener bind {wire_addr}: {err}");
1680 readiness.failed(
1681 "wire",
1682 &wire_addr,
1683 config.wire_bind_explicit,
1684 reason.clone(),
1685 );
1686 if config.wire_bind_explicit {
1687 tracing::error!(
1688 transport = "wire",
1689 bind = %wire_addr,
1690 error = %err,
1691 "fatal explicit bind failure"
1692 );
1693 return Err(format!("explicit {reason}"));
1694 }
1695 tracing::warn!(
1696 transport = "wire",
1697 bind = %wire_addr,
1698 error = %err,
1699 "non-fatal implicit bind failure; listener degraded"
1700 );
1701 }
1702 }
1703 }
1704
1705 if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1707 let tls_config = resolve_wire_tls_config(config);
1708 match tls_config {
1709 Ok(tls_cfg) => {
1710 let wire_rt = Arc::new(runtime.clone());
1711 tokio::spawn(async move {
1712 if let Err(e) =
1713 crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1714 .await
1715 {
1716 tracing::error!(err = %e, "redwire+tls listener error");
1717 }
1718 });
1719 }
1720 Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1721 }
1722 }
1723 Ok(())
1724}
1725
1726fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1733 if let Some(pg_addr) = config.pg_bind_addr.clone() {
1734 let rt = Arc::new(runtime.clone());
1735 tokio::spawn(async move {
1736 let cfg = crate::wire::PgWireConfig {
1737 bind_addr: pg_addr,
1738 ..Default::default()
1739 };
1740 if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1741 tracing::error!(err = %e, "pg wire listener error");
1742 }
1743 });
1744 }
1745}
1746
1747fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1761 use crate::utils::secret_file::expand_file_env;
1762
1763 for var in [
1767 "REDDB_GRPC_TLS_CERT",
1768 "REDDB_GRPC_TLS_KEY",
1769 "REDDB_GRPC_TLS_CLIENT_CA",
1770 ] {
1771 if let Err(err) = expand_file_env(var) {
1772 tracing::warn!(
1773 target: "reddb::secrets",
1774 env = %var,
1775 err = %err,
1776 "could not expand *_FILE companion for gRPC TLS"
1777 );
1778 }
1779 }
1780
1781 let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1782 (Some(cert), Some(key)) => {
1783 let cert_pem = std::fs::read(cert)
1784 .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1785 let key_pem =
1786 std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1787 (cert_pem, key_pem)
1788 }
1789 _ => {
1790 let dev = std::env::var("RED_GRPC_TLS_DEV")
1792 .ok()
1793 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1794 .unwrap_or(false);
1795 if !dev {
1796 return Err("gRPC TLS configured but no cert/key supplied — set \
1797 REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1798 RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1799 .to_string());
1800 }
1801 let dir = config
1802 .path
1803 .as_ref()
1804 .and_then(|p| p.parent())
1805 .map(PathBuf::from)
1806 .unwrap_or_else(|| PathBuf::from("."));
1807 let (cert_pem_str, key_pem_str) =
1808 crate::wire::tls::generate_self_signed_cert("localhost")
1809 .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1810
1811 let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1816 tracing::warn!(
1817 target: "reddb::security",
1818 transport = "grpc",
1819 cert_sha256 = %fp,
1820 "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1821 DO NOT use in production"
1822 );
1823 let cert_path = dir.join("grpc-tls-cert.pem");
1825 let key_path = dir.join("grpc-tls-key.pem");
1826 if !cert_path.exists() || !key_path.exists() {
1827 let _ = std::fs::create_dir_all(&dir);
1828 std::fs::write(&cert_path, cert_pem_str.as_bytes())
1829 .map_err(|e| format!("write grpc dev cert: {e}"))?;
1830 std::fs::write(&key_path, key_pem_str.as_bytes())
1831 .map_err(|e| format!("write grpc dev key: {e}"))?;
1832 #[cfg(unix)]
1833 {
1834 use std::os::unix::fs::PermissionsExt;
1835 let _ =
1836 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1837 }
1838 }
1839 (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1840 }
1841 };
1842
1843 let client_ca_pem = match &config.grpc_tls_client_ca {
1844 Some(path) => Some(
1845 std::fs::read(path)
1846 .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1847 ),
1848 None => None,
1849 };
1850
1851 Ok(crate::GrpcTlsOptions {
1852 cert_pem,
1853 key_pem,
1854 client_ca_pem,
1855 })
1856}
1857
1858fn spawn_grpc_tls_listener_if_configured(
1862 config: &ServerCommandConfig,
1863 runtime: RedDBRuntime,
1864 auth_store: Arc<AuthStore>,
1865) {
1866 let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1867 return;
1868 };
1869 let tls_opts = match resolve_grpc_tls_options(config) {
1870 Ok(opts) => opts,
1871 Err(err) => {
1872 tracing::error!(
1873 target: "reddb::security",
1874 transport = "grpc",
1875 err = %err,
1876 "gRPC TLS config error; TLS listener will not start"
1877 );
1878 return;
1879 }
1880 };
1881 tokio::spawn(async move {
1882 let server = RedDBGrpcServer::with_options(
1883 runtime,
1884 GrpcServerOptions {
1885 bind_addr: tls_bind.clone(),
1886 tls: Some(tls_opts),
1887 },
1888 auth_store,
1889 );
1890 tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1891 if let Err(err) = server.serve().await {
1892 tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1893 }
1894 });
1895}
1896
1897fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1900 use sha2::{Digest, Sha256};
1901 let mut h = Sha256::new();
1902 h.update(pem);
1903 let d = h.finalize();
1904 let mut buf = String::with_capacity(64);
1905 for b in d.iter() {
1906 buf.push_str(&format!("{b:02x}"));
1907 }
1908 buf
1909}
1910
1911fn resolve_wire_tls_config(
1913 config: &ServerCommandConfig,
1914) -> Result<crate::wire::WireTlsConfig, String> {
1915 match (&config.wire_tls_cert, &config.wire_tls_key) {
1916 (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1917 cert_path: cert.clone(),
1918 key_path: key.clone(),
1919 }),
1920 _ => {
1921 let dir = config
1923 .path
1924 .as_ref()
1925 .and_then(|p| p.parent())
1926 .map(PathBuf::from)
1927 .unwrap_or_else(|| PathBuf::from("."));
1928 crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1929 }
1930 }
1931}
1932
1933#[inline(never)]
1934fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1935 let rt_config = detect_runtime_config();
1936 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1937 let db_options = config.to_db_options()?;
1938 let mut transport_readiness = TransportReadiness::default();
1939
1940 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1941 .enable_all()
1942 .worker_threads(workers)
1943 .thread_stack_size(rt_config.stack_size)
1944 .build()
1945 .map_err(|err| format!("tokio runtime: {err}"))?;
1946
1947 let (runtime, _auth_store, _telemetry_guard) =
1951 build_runtime_and_auth_store(&config, db_options.clone())?;
1952 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1953 let signal_runtime = runtime.clone();
1954 tokio_runtime.block_on(async move {
1955 spawn_lifecycle_signal_handler(signal_runtime).await;
1956 spawn_pg_listener(&config, &runtime);
1957 let wire_rt = Arc::new(runtime);
1958 let listener = tokio::net::TcpListener::bind(&wire_addr)
1959 .await
1960 .map_err(|err| {
1961 let reason = format!("wire listener bind {wire_addr}: {err}");
1962 transport_readiness.failed(
1963 "wire",
1964 &wire_addr,
1965 config.wire_bind_explicit,
1966 reason.clone(),
1967 );
1968 if config.wire_bind_explicit {
1969 format!("explicit {reason}")
1970 } else {
1971 reason
1972 }
1973 })?;
1974 transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1975 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1976 .await
1977 .map_err(|e| e.to_string())
1978 })
1979}
1980
1981#[inline(never)]
1982fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1983 let rt_config = detect_runtime_config();
1984 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1985 let db_options = config.to_db_options()?;
1986
1987 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1988 .enable_all()
1989 .worker_threads(workers)
1990 .thread_stack_size(rt_config.stack_size)
1991 .build()
1992 .map_err(|err| format!("tokio runtime: {err}"))?;
1993
1994 let (runtime, _auth_store, _telemetry_guard) =
1995 build_runtime_and_auth_store(&config, db_options.clone())?;
1996 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1997 let signal_runtime = runtime.clone();
1998 tokio_runtime.block_on(async move {
1999 spawn_lifecycle_signal_handler(signal_runtime).await;
2000 let cfg = crate::wire::PgWireConfig {
2001 bind_addr: pg_addr,
2002 ..Default::default()
2003 };
2004 crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
2005 .await
2006 .map_err(|e| e.to_string())
2007 })
2008}
2009
2010#[inline(never)]
2011fn build_runtime_and_auth_store(
2012 config: &ServerCommandConfig,
2013 db_options: RedDBOptions,
2014) -> Result<
2015 (
2016 RedDBRuntime,
2017 Arc<AuthStore>,
2018 Option<crate::telemetry::TelemetryGuard>,
2019 ),
2020 String,
2021> {
2022 build_runtime_with_bootstrap(db_options, config.telemetry.clone(), &config.bootstrap)
2029}
2030
2031pub(crate) fn build_runtime_with_telemetry(
2041 db_options: RedDBOptions,
2042 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
2043) -> Result<
2044 (
2045 RedDBRuntime,
2046 Arc<AuthStore>,
2047 Option<crate::telemetry::TelemetryGuard>,
2048 ),
2049 String,
2050> {
2051 let bootstrap = BootstrapConfig::default();
2052 build_runtime_with_bootstrap(db_options, cli_telemetry, &bootstrap)
2053}
2054
2055fn build_runtime_with_bootstrap(
2056 db_options: RedDBOptions,
2057 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
2058 bootstrap: &BootstrapConfig,
2059) -> Result<
2060 (
2061 RedDBRuntime,
2062 Arc<AuthStore>,
2063 Option<crate::telemetry::TelemetryGuard>,
2064 ),
2065 String,
2066> {
2067 let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
2068 let msg = err.to_string();
2074 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
2075 phase: "runtime_construction".to_string(),
2076 error: msg.clone(),
2077 }
2078 .emit_global();
2079 msg
2080 })?;
2081
2082 crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
2087 let msg = err.to_string();
2088 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
2089 phase: "lease_loop".to_string(),
2090 error: msg.clone(),
2091 }
2092 .emit_global();
2093 msg
2094 })?;
2095
2096 if let Some(data_path) = db_options.data_path.as_deref() {
2100 let watch_dir = data_path.parent().unwrap_or(data_path);
2101 crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
2102 }
2103
2104 {
2108 let config_path = crate::runtime::config_overlay::config_file_path();
2109 let store = runtime.db().store();
2110 crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
2111 }
2112
2113 let merged = merge_telemetry_with_config(
2116 cli_telemetry
2117 .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
2118 &runtime,
2119 );
2120 let telemetry_guard = crate::telemetry::init(merged);
2121
2122 let no_auth = no_auth_active(&db_options);
2123 let auth_store =
2124 if db_options.auth.vault_enabled {
2125 let pager =
2126 runtime.db().store().pager().cloned().ok_or_else(|| {
2127 "vault requires a paged database (persistent mode)".to_string()
2128 })?;
2129 let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
2130 .map_err(|err| err.to_string())?;
2131 Arc::new(store)
2132 } else {
2133 Arc::new(AuthStore::new(db_options.auth.clone()))
2134 };
2135 auth_store.configure_control_events(
2136 runtime.control_event_ledger(),
2137 runtime.control_event_config(),
2138 );
2139 let bootstrap_already_completed = runtime
2156 .db()
2157 .store()
2158 .get_config(BOOTSTRAP_COMPLETED_KEY)
2159 .is_some();
2160 let disposition = crate::cluster::authorize_cluster_bootstrap(
2161 db_options.storage_profile.deploy_profile,
2162 no_auth,
2163 bootstrap.auth_bootstrap_input(),
2164 bootstrap_already_completed,
2165 )?;
2166 let vault_plan = crate::cluster::plan_vault_bootstrap(disposition);
2175 match disposition {
2176 crate::cluster::BootstrapDisposition::SkipDevBypass => {
2177 debug_assert_eq!(vault_plan, crate::cluster::VaultBootstrapPlan::SkipNoVault);
2178 eprintln!("{NO_AUTH_WARNING}");
2179 tracing::warn!("{NO_AUTH_WARNING}");
2180 }
2181 crate::cluster::BootstrapDisposition::AlreadyComplete => {
2182 debug_assert_eq!(
2189 vault_plan,
2190 crate::cluster::VaultBootstrapPlan::OpenClusterGlobalStore {
2191 consume_secret_inputs: false,
2192 }
2193 );
2194 apply_preset_from_config(&runtime, &auth_store, bootstrap)?;
2195 tracing::info!("bootstrap already completed, unsealing existing cluster auth store");
2196 }
2197 crate::cluster::BootstrapDisposition::ProceedLocal => {
2198 debug_assert_eq!(
2201 vault_plan,
2202 crate::cluster::VaultBootstrapPlan::OpenClusterGlobalStore {
2203 consume_secret_inputs: true,
2204 }
2205 );
2206 apply_preset_from_config(&runtime, &auth_store, bootstrap)?;
2207 maybe_apply_policy_break_glass(&auth_store);
2208 }
2209 }
2210
2211 {
2213 let store = Arc::clone(&auth_store);
2214 std::thread::Builder::new()
2215 .name("reddb-session-purge".into())
2216 .spawn(move || loop {
2217 std::thread::sleep(std::time::Duration::from_secs(300));
2218 store.purge_expired_sessions();
2219 })
2220 .ok();
2221 }
2222
2223 Ok((runtime, auth_store, telemetry_guard))
2224}
2225
2226fn maybe_apply_policy_break_glass(auth_store: &Arc<AuthStore>) {
2230 use crate::auth::self_lock_guard::BREAK_GLASS_ENV;
2231
2232 let enabled = std::env::var(BREAK_GLASS_ENV)
2233 .ok()
2234 .map(|v| {
2235 let trimmed = v.trim().to_ascii_lowercase();
2236 matches!(trimmed.as_str(), "1" | "true" | "yes")
2237 })
2238 .unwrap_or(false);
2239 if !enabled {
2240 return;
2241 }
2242 let now = crate::utils::now_unix_millis() as u128;
2243 match auth_store.apply_policy_break_glass(now) {
2244 Ok(()) => {
2245 tracing::warn!(env = BREAK_GLASS_ENV, "policy break-glass recovery applied");
2246 }
2247 Err(err) => {
2248 tracing::error!(env = BREAK_GLASS_ENV, %err, "policy break-glass recovery failed");
2249 }
2250 }
2251}
2252
2253pub(crate) const BOOTSTRAP_COMPLETED_KEY: &str = "system.bootstrap.completed";
2258pub(crate) const BOOTSTRAP_PRESET_KEY: &str = "system.bootstrap.preset";
2259pub(crate) const BOOTSTRAP_FIRST_ADMIN_KEY: &str = "system.bootstrap.first_admin_id";
2260
2261pub(crate) const BOOTSTRAP_PRESET_ENV: &str = "REDDB_BOOTSTRAP_PRESET";
2264pub(crate) const PRESET_ENV: &str = "REDDB_PRESET";
2265pub(crate) const PRESET_SIMPLE: &str = "simple";
2266pub(crate) const PRESET_PRODUCTION: &str = "production";
2267pub(crate) const PRESET_REGULATED: &str = "regulated";
2268pub(crate) const PRESET_CLOUD: &str = "cloud";
2269
2270pub(crate) const FIRST_ADMIN_ALLOW_ALL_POLICY: &str = "system.bootstrap.first-admin-allow-all";
2274pub(crate) const REGULATED_PROTECT_MANAGED_POLICY: &str = "system.regulated.protect-managed";
2275pub(crate) const CLOUD_PROTECT_MANAGED_POLICY: &str = "system.cloud.protect-managed";
2276pub(crate) const CLOUD_CONFIG_NAMESPACE: &str = "red.config.cloud";
2277pub(crate) const REGULATED_AUDIT_CONFIG_NAMESPACE: &str = "red.config.audit";
2278pub(crate) const REGULATED_EVIDENCE_CONFIG_NAMESPACE: &str = "red.config.evidence";
2279pub(crate) const REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE: &str = "red.config.query_audit";
2280
2281pub(crate) fn apply_preset(
2291 runtime: &RedDBRuntime,
2292 auth_store: &Arc<AuthStore>,
2293) -> Result<(), String> {
2294 let bootstrap = BootstrapConfig::default();
2295 apply_preset_from_config(runtime, auth_store, &bootstrap)
2296}
2297
2298fn apply_preset_from_config(
2299 runtime: &RedDBRuntime,
2300 auth_store: &Arc<AuthStore>,
2301 bootstrap: &BootstrapConfig,
2302) -> Result<(), String> {
2303 let store = runtime.db().store();
2304
2305 if store.get_config(BOOTSTRAP_COMPLETED_KEY).is_some() {
2306 crate::cli::bootstrap_manifest::rehydrate_manifest_registry(
2307 runtime,
2308 &runtime.config_registry(),
2309 )?;
2310 tracing::info!("bootstrap state present, skipping preset application");
2311 return Ok(());
2312 }
2313
2314 let preset = bootstrap.resolved_preset();
2315
2316 for var in bootstrap.secret_env_vars_to_expand(&preset) {
2321 crate::utils::expand_file_env(var).map_err(|err| format!("expand {var}_FILE: {err}"))?;
2322 }
2323
2324 if let Some(path) = bootstrap.selected_manifest() {
2325 let first_admin_id = crate::cli::bootstrap_manifest::apply_manifest_file(
2326 runtime,
2327 auth_store,
2328 &runtime.config_registry(),
2329 &path,
2330 )?;
2331 persist_bootstrap_state(runtime, "manifest", Some(&first_admin_id));
2332 tracing::info!("bootstrap manifest applied");
2333 return Ok(());
2334 }
2335
2336 let first_admin_id = match preset.as_str() {
2337 PRESET_SIMPLE => {
2338 None
2342 }
2343 PRESET_PRODUCTION => Some(apply_production_preset_from_config(auth_store, bootstrap)?),
2344 PRESET_CLOUD => Some(apply_cloud_preset(runtime, auth_store, bootstrap)?),
2345 PRESET_REGULATED => {
2346 apply_regulated_preset(runtime, auth_store)?;
2347 None
2348 }
2349 other => {
2350 return Err(format!(
2351 "bootstrap preset {other:?} is not recognised (expected `simple`, `production`, `regulated`, or `cloud`)"
2352 ));
2353 }
2354 };
2355
2356 persist_bootstrap_state(runtime, &preset, first_admin_id.as_deref());
2357 tracing::info!(preset = %preset, "bootstrap preset applied");
2358 Ok(())
2359}
2360
2361fn apply_production_preset_from_config(
2362 auth_store: &Arc<AuthStore>,
2363 bootstrap: &BootstrapConfig,
2364) -> Result<String, String> {
2365 use crate::auth::store::PrincipalRef;
2366 use crate::auth::UserId;
2367
2368 let username = bootstrap.production_username().ok_or_else(|| {
2369 "production preset requires --bootstrap-admin or REDDB_USERNAME (or REDDB_USERNAME_FILE)"
2370 .to_string()
2371 })?;
2372 let password = bootstrap.production_password().ok_or_else(|| {
2373 "production preset requires --bootstrap-admin-password or REDDB_PASSWORD (or REDDB_PASSWORD_FILE)"
2374 .to_string()
2375 })?;
2376
2377 let result = auth_store
2381 .bootstrap(&username, &password)
2382 .map_err(|err| format!("bootstrap first admin: {err}"))?;
2383 if let Some(cert) = result.certificate.as_deref() {
2384 eprintln!("[reddb] CERTIFICATE: {}", cert);
2385 tracing::warn!(
2386 "vault certificate issued by REDDB_PRESET=production -- save it and update the runtime secret before restart"
2387 );
2388 }
2389 let first_admin = UserId::platform(result.user.username.clone());
2390
2391 install_allow_all_policy(auth_store)?;
2393
2394 auth_store
2396 .attach_policy(
2397 PrincipalRef::User(first_admin.clone()),
2398 FIRST_ADMIN_ALLOW_ALL_POLICY,
2399 )
2400 .map_err(|err| format!("attach allow-all policy: {err}"))?;
2401
2402 Ok(first_admin.to_string())
2403}
2404
2405fn apply_cloud_preset(
2406 runtime: &RedDBRuntime,
2407 auth_store: &Arc<AuthStore>,
2408 bootstrap: &BootstrapConfig,
2409) -> Result<String, String> {
2410 use crate::auth::store::PrincipalRef;
2411 use crate::auth::{Role, UserId};
2412
2413 let head_username = bootstrap.cloud_head_username().ok_or_else(|| {
2414 "cloud preset requires --cloud-head-admin or REDDB_CLOUD_HEAD_ADMIN (or REDDB_USERNAME)"
2415 .to_string()
2416 })?;
2417 let head_password = bootstrap.cloud_head_password().ok_or_else(|| {
2418 "cloud preset requires --cloud-head-admin-password or REDDB_CLOUD_HEAD_ADMIN_PASSWORD (or REDDB_PASSWORD)"
2419 .to_string()
2420 })?;
2421 let customer_username = bootstrap.customer_username().ok_or_else(|| {
2422 "cloud preset requires --customer-admin or REDDB_CUSTOMER_ADMIN".to_string()
2423 })?;
2424 let customer_password = bootstrap.customer_password().ok_or_else(|| {
2425 "cloud preset requires --customer-admin-password or REDDB_CUSTOMER_ADMIN_PASSWORD"
2426 .to_string()
2427 })?;
2428 if head_username == customer_username {
2429 return Err("cloud preset requires distinct head and customer admin usernames".to_string());
2430 }
2431
2432 let head = auth_store
2433 .bootstrap_system_admin(&head_username, &head_password)
2434 .map_err(|err| format!("bootstrap cloud head admin: {err}"))?;
2435 let head_id = UserId::platform(head.user.username.clone());
2436
2437 auth_store
2438 .create_user(&customer_username, &customer_password, Role::Admin)
2439 .map_err(|err| format!("create cloud customer admin: {err}"))?;
2440 let customer_id = UserId::platform(customer_username.clone());
2441
2442 install_allow_all_policy(auth_store)?;
2443 for user_id in [&head_id, &customer_id] {
2444 auth_store
2445 .attach_policy(
2446 PrincipalRef::User(user_id.clone()),
2447 FIRST_ADMIN_ALLOW_ALL_POLICY,
2448 )
2449 .map_err(|err| format!("attach allow-all policy: {err}"))?;
2450 }
2451 install_cloud_guardrails(runtime, auth_store)?;
2452
2453 Ok(head_id.to_string())
2454}
2455
2456fn install_allow_all_policy(auth_store: &Arc<AuthStore>) -> Result<(), String> {
2457 use crate::auth::policies::Policy;
2458
2459 let policy = Policy::from_json_str(&format!(
2460 r#"{{
2461 "id": "{id}",
2462 "version": 1,
2463 "statements": [{{
2464 "effect": "allow",
2465 "actions": ["*"],
2466 "resources": ["*"]
2467 }}]
2468 }}"#,
2469 id = FIRST_ADMIN_ALLOW_ALL_POLICY
2470 ))
2471 .map_err(|err| format!("compile allow-all policy: {err}"))?;
2472 auth_store
2473 .put_policy(policy)
2474 .map_err(|err| format!("install allow-all policy: {err}"))
2475}
2476
2477fn install_cloud_guardrails(
2478 runtime: &RedDBRuntime,
2479 auth_store: &Arc<AuthStore>,
2480) -> Result<(), String> {
2481 use crate::auth::policies::Policy;
2482 use crate::auth::registry::EvidenceRequirement;
2483
2484 let policy = Policy::from_json_str(&format!(
2485 r#"{{
2486 "id": "{id}",
2487 "version": 1,
2488 "statements": [
2489 {{
2490 "effect": "deny",
2491 "actions": ["policy:put", "policy:drop", "policy:attach", "policy:detach"],
2492 "resources": ["policy:{id}"]
2493 }},
2494 {{
2495 "effect": "deny",
2496 "actions": ["config:write"],
2497 "resources": ["config:{cloud}.*"]
2498 }}
2499 ]
2500 }}"#,
2501 id = CLOUD_PROTECT_MANAGED_POLICY,
2502 cloud = CLOUD_CONFIG_NAMESPACE,
2503 ))
2504 .map_err(|err| format!("compile cloud guardrail policy: {err}"))?;
2505 auth_store
2506 .put_policy(policy)
2507 .map_err(|err| format!("install cloud guardrail policy: {err}"))?;
2508
2509 let now_ms = crate::utils::now_unix_millis() as u128;
2510 let entries = vec![
2511 regulated_registry_entry(
2512 CLOUD_PROTECT_MANAGED_POLICY,
2513 crate::auth::managed_policy::RESOURCE_TYPE_POLICY,
2514 "policy",
2515 "policy:*",
2516 &format!("policy:{CLOUD_PROTECT_MANAGED_POLICY}"),
2517 EvidenceRequirement::Metadata,
2518 now_ms,
2519 ),
2520 regulated_registry_entry(
2521 CLOUD_CONFIG_NAMESPACE,
2522 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2523 "config_namespace",
2524 "config:write",
2525 &format!("config:{CLOUD_CONFIG_NAMESPACE}.*"),
2526 EvidenceRequirement::Metadata,
2527 now_ms,
2528 ),
2529 ];
2530 for entry in entries.iter().cloned() {
2531 runtime
2532 .config_registry()
2533 .restore_bootstrap_entry(entry)
2534 .map_err(|err| format!("install cloud registry entry: {err}"))?;
2535 }
2536 crate::cli::bootstrap_manifest::persist_registry_state(runtime, &entries)?;
2537 Ok(())
2538}
2539
2540fn apply_regulated_preset(
2541 runtime: &RedDBRuntime,
2542 auth_store: &Arc<AuthStore>,
2543) -> Result<(), String> {
2544 use crate::auth::policies::Policy;
2545 use crate::auth::registry::EvidenceRequirement;
2546
2547 runtime.query_audit().enable_infrastructure();
2548
2549 let policy = Policy::from_json_str(&format!(
2550 r#"{{
2551 "id": "{id}",
2552 "version": 1,
2553 "statements": [
2554 {{
2555 "effect": "deny",
2556 "actions": ["policy:put", "policy:drop", "policy:attach", "policy:detach"],
2557 "resources": ["policy:{id}"]
2558 }},
2559 {{
2560 "effect": "deny",
2561 "actions": ["config:write"],
2562 "resources": [
2563 "config:{audit}.*",
2564 "config:{evidence}.*",
2565 "config:{query_audit}.*"
2566 ]
2567 }}
2568 ]
2569 }}"#,
2570 id = REGULATED_PROTECT_MANAGED_POLICY,
2571 audit = REGULATED_AUDIT_CONFIG_NAMESPACE,
2572 evidence = REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2573 query_audit = REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2574 ))
2575 .map_err(|err| format!("compile regulated guardrail policy: {err}"))?;
2576 auth_store
2577 .put_policy(policy)
2578 .map_err(|err| format!("install regulated guardrail policy: {err}"))?;
2579
2580 let now_ms = crate::utils::now_unix_millis() as u128;
2581 let entries = vec![
2582 regulated_registry_entry(
2583 REGULATED_PROTECT_MANAGED_POLICY,
2584 crate::auth::managed_policy::RESOURCE_TYPE_POLICY,
2585 "iam_policy",
2586 "policy:*",
2587 &format!("policy:{REGULATED_PROTECT_MANAGED_POLICY}"),
2588 EvidenceRequirement::Metadata,
2589 now_ms,
2590 ),
2591 regulated_registry_entry(
2592 REGULATED_AUDIT_CONFIG_NAMESPACE,
2593 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2594 "config_namespace",
2595 "config:write",
2596 &format!("config:{REGULATED_AUDIT_CONFIG_NAMESPACE}.*"),
2597 EvidenceRequirement::Metadata,
2598 now_ms,
2599 ),
2600 regulated_registry_entry(
2601 REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2602 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2603 "config_namespace",
2604 "config:write",
2605 &format!("config:{REGULATED_EVIDENCE_CONFIG_NAMESPACE}.*"),
2606 EvidenceRequirement::Metadata,
2607 now_ms,
2608 ),
2609 regulated_registry_entry(
2610 REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2611 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2612 "config_namespace",
2613 "config:write",
2614 &format!("config:{REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE}.*"),
2615 EvidenceRequirement::Metadata,
2616 now_ms,
2617 ),
2618 ];
2619
2620 for entry in entries.iter().cloned() {
2621 runtime
2622 .config_registry()
2623 .restore_bootstrap_entry(entry)
2624 .map_err(|err| format!("install regulated registry entry: {err}"))?;
2625 }
2626 crate::cli::bootstrap_manifest::persist_registry_state(runtime, &entries)?;
2627 Ok(())
2628}
2629
2630fn regulated_registry_entry(
2631 id: &str,
2632 resource_type: &str,
2633 schema: &str,
2634 required_action: &str,
2635 required_resource: &str,
2636 evidence_requirement: crate::auth::registry::EvidenceRequirement,
2637 updated_at_ms: u128,
2638) -> crate::auth::registry::ConfigRegistryEntry {
2639 crate::auth::registry::ConfigRegistryEntry {
2640 id: id.to_string(),
2641 version: 1,
2642 resource_type: resource_type.to_string(),
2643 schema: schema.to_string(),
2644 mutability: crate::auth::registry::Mutability::Immutable,
2645 sensitivity: crate::auth::registry::Sensitivity::Internal,
2646 managed: true,
2647 required_action: required_action.to_string(),
2648 required_resource: required_resource.to_string(),
2649 evidence_requirement,
2650 updated_by: "system:regulated-preset".to_string(),
2651 updated_at_ms,
2652 }
2653}
2654
2655fn persist_bootstrap_state(runtime: &RedDBRuntime, preset: &str, first_admin_id: Option<&str>) {
2656 let store = runtime.db().store();
2657 let mut tree = crate::serde_json::Map::new();
2658 tree.insert(
2659 BOOTSTRAP_COMPLETED_KEY.to_string(),
2660 crate::serde_json::Value::Bool(true),
2661 );
2662 tree.insert(
2663 BOOTSTRAP_PRESET_KEY.to_string(),
2664 crate::serde_json::Value::String(preset.to_string()),
2665 );
2666 if let Some(id) = first_admin_id {
2667 tree.insert(
2668 BOOTSTRAP_FIRST_ADMIN_KEY.to_string(),
2669 crate::serde_json::Value::String(id.to_string()),
2670 );
2671 }
2672 let json = crate::serde_json::Value::Object(tree);
2673 store.set_config_tree("", &json);
2674}
2675
2676fn merge_telemetry_with_config(
2687 mut cli: crate::telemetry::TelemetryConfig,
2688 runtime: &RedDBRuntime,
2689) -> crate::telemetry::TelemetryConfig {
2690 use crate::storage::schema::Value;
2691
2692 let store = runtime.db().store();
2693
2694 if !cli.level_explicit {
2695 if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
2696 cli.level_filter = v.to_string();
2697 }
2698 }
2699 if !cli.format_explicit {
2700 if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
2701 if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
2702 cli.format = parsed;
2703 }
2704 }
2705 }
2706 if !cli.rotation_keep_days_explicit {
2707 match store.get_config("red.logging.keep_days") {
2708 Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
2709 cli.rotation_keep_days = n as u16
2710 }
2711 Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
2712 cli.rotation_keep_days = n as u16
2713 }
2714 Some(Value::Text(v)) => {
2715 if let Ok(n) = v.parse::<u16>() {
2716 cli.rotation_keep_days = n;
2717 }
2718 }
2719 _ => {}
2720 }
2721 }
2722 if !cli.file_prefix_explicit {
2723 if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
2724 if !v.is_empty() {
2725 cli.file_prefix = v.to_string();
2726 }
2727 }
2728 }
2729 if !cli.log_dir_explicit && !cli.log_file_disabled {
2732 if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
2733 if !v.is_empty() {
2734 cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
2735 }
2736 }
2737 }
2738
2739 cli
2740}
2741
2742#[cfg(test)]
2743mod telemetry_merge_tests {
2744 use super::*;
2745 use crate::telemetry::{LogFormat, TelemetryConfig};
2746
2747 fn fresh_runtime() -> RedDBRuntime {
2748 RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
2749 }
2750
2751 fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
2752 runtime
2753 .db()
2754 .store()
2755 .set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
2756 }
2757
2758 fn cli_base() -> TelemetryConfig {
2759 TelemetryConfig {
2762 log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
2763 format: LogFormat::Json,
2764 ..Default::default()
2765 }
2766 }
2767
2768 #[test]
2769 fn config_log_dir_promoted_when_flag_absent() {
2770 let runtime = fresh_runtime();
2771 set_str(&runtime, "red.logging.dir", "/var/log/reddb");
2772 let merged = merge_telemetry_with_config(cli_base(), &runtime);
2773 assert_eq!(
2774 merged.log_dir.as_deref(),
2775 Some(std::path::Path::new("/var/log/reddb"))
2776 );
2777 }
2778
2779 #[test]
2780 fn explicit_log_dir_wins_over_config() {
2781 let runtime = fresh_runtime();
2782 set_str(&runtime, "red.logging.dir", "/var/log/reddb");
2783 let mut cli = cli_base();
2784 cli.log_dir = Some(std::path::PathBuf::from("/custom/dir"));
2785 cli.log_dir_explicit = true;
2786 let merged = merge_telemetry_with_config(cli, &runtime);
2787 assert_eq!(
2788 merged.log_dir.as_deref(),
2789 Some(std::path::Path::new("/custom/dir"))
2790 );
2791 }
2792
2793 #[test]
2794 fn no_log_file_beats_config_log_dir() {
2795 let runtime = fresh_runtime();
2796 set_str(&runtime, "red.logging.dir", "/var/log/reddb");
2797 let mut cli = cli_base();
2798 cli.log_dir = None;
2799 cli.log_file_disabled = true;
2800 let merged = merge_telemetry_with_config(cli, &runtime);
2801 assert!(
2802 merged.log_dir.is_none(),
2803 "--no-log-file must veto config dir"
2804 );
2805 }
2806
2807 #[test]
2808 fn config_format_promoted_on_non_tty_default() {
2809 let runtime = fresh_runtime();
2813 set_str(&runtime, "red.logging.format", "pretty");
2814 let merged = merge_telemetry_with_config(cli_base(), &runtime);
2815 assert_eq!(merged.format, LogFormat::Pretty);
2816 }
2817
2818 #[test]
2819 fn explicit_format_wins_over_config() {
2820 let runtime = fresh_runtime();
2821 set_str(&runtime, "red.logging.format", "pretty");
2822 let mut cli = cli_base();
2823 cli.format = LogFormat::Json;
2824 cli.format_explicit = true;
2825 let merged = merge_telemetry_with_config(cli, &runtime);
2826 assert_eq!(merged.format, LogFormat::Json);
2827 }
2828}
2829
2830#[inline(never)]
2831fn build_http_server(
2832 runtime: RedDBRuntime,
2833 auth_store: Arc<AuthStore>,
2834 bind_addr: String,
2835) -> RedDBServer {
2836 build_http_server_with_transport_readiness(
2837 runtime,
2838 auth_store,
2839 bind_addr,
2840 TransportReadiness::default(),
2841 )
2842}
2843
2844fn apply_http_limits(
2850 server: RedDBServer,
2851 config: &ServerCommandConfig,
2852 runtime: &RedDBRuntime,
2853) -> RedDBServer {
2854 let store = runtime.db().store();
2855 let resolved =
2856 crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2857 .get_config(key)
2858 {
2859 Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2860 Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2861 Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2862 _ => None,
2863 });
2864 tracing::info!(
2865 target: "reddb::http_limits",
2866 max_handlers = resolved.max_handlers,
2867 handler_timeout_ms = resolved.handler_timeout_ms,
2868 retry_after_secs = resolved.retry_after_secs,
2869 max_inflight_per_principal = resolved.max_inflight_per_principal,
2870 "http_limits resolved"
2871 );
2872 server.with_http_limits(resolved)
2873}
2874
2875fn apply_ui_bundle(
2885 server: RedDBServer,
2886 config: &ServerCommandConfig,
2887) -> Result<RedDBServer, String> {
2888 if !config.ui {
2889 return Ok(server);
2890 }
2891 let ui_dir = match &config.ui_dir {
2892 Some(dir) => dir.clone(),
2893 None => {
2894 let cache_root = crate::server::ui_bundle_resolver::reddb_user_cache_root()
2895 .unwrap_or_else(|_| std::env::temp_dir().join("reddb"));
2896 crate::server::ui_bundle_resolver::resolve_ui_bundle(
2897 &cache_root,
2898 &crate::server::ui_bundle_resolver::HttpFetcher,
2899 )
2900 .map_err(|err| format!("resolve red-ui bundle for --ui: {err}"))?
2901 }
2902 };
2903 tracing::info!(target: "reddb::ui", dir = %ui_dir.display(), "serving red-ui bundle on HTTP surface");
2904 Ok(server.with_ui_dir(ui_dir))
2905}
2906
2907#[inline(never)]
2908fn build_http_server_with_transport_readiness(
2909 runtime: RedDBRuntime,
2910 auth_store: Arc<AuthStore>,
2911 bind_addr: String,
2912 transport_readiness: TransportReadiness,
2913) -> RedDBServer {
2914 RedDBServer::with_options(
2915 runtime,
2916 ServerOptions {
2917 bind_addr,
2918 transport_readiness,
2919 ..ServerOptions::default()
2920 },
2921 )
2922 .with_auth(auth_store)
2923}
2924
2925#[inline(never)]
2929fn build_admin_only_server(
2930 runtime: RedDBRuntime,
2931 auth_store: Arc<AuthStore>,
2932 bind_addr: String,
2933) -> RedDBServer {
2934 RedDBServer::with_options(
2935 runtime,
2936 ServerOptions {
2937 bind_addr,
2938 surface: crate::server::ServerSurface::AdminOnly,
2939 ..ServerOptions::default()
2940 },
2941 )
2942 .with_auth(auth_store)
2943}
2944
2945#[inline(never)]
2949fn build_metrics_only_server(
2950 runtime: RedDBRuntime,
2951 auth_store: Arc<AuthStore>,
2952 bind_addr: String,
2953) -> RedDBServer {
2954 RedDBServer::with_options(
2955 runtime,
2956 ServerOptions {
2957 bind_addr,
2958 surface: crate::server::ServerSurface::MetricsOnly,
2959 ..ServerOptions::default()
2960 },
2961 )
2962 .with_auth(auth_store)
2963}
2964
2965fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2969 if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2970 let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2971 let _ = server.serve_in_background();
2972 tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2973 }
2974 if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2975 let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2976 let _ = server.serve_in_background();
2977 tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2978 }
2979}
2980
2981#[inline(never)]
2982fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2983 let mut transport_readiness = TransportReadiness::default();
2984 let Some(listener) = bind_listener_for_startup(
2985 &mut transport_readiness,
2986 "http",
2987 &bind_addr,
2988 config.http_bind_explicit,
2989 )?
2990 else {
2991 return Err(format!(
2992 "no HTTP listener started; implicit bind {} failed",
2993 bind_addr
2994 ));
2995 };
2996 let db_options = config.to_db_options()?;
2997 let (runtime, auth_store, _telemetry_guard) =
2998 build_runtime_and_auth_store(&config, db_options.clone())?;
2999 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
3000 spawn_admin_metrics_listeners(&runtime, &auth_store);
3001 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
3002 let server = build_http_server_with_transport_readiness(
3003 runtime.clone(),
3004 auth_store,
3005 bind_addr.clone(),
3006 transport_readiness,
3007 );
3008 let server = apply_http_limits(server, &config, &runtime);
3009 let server = apply_ui_bundle(server, &config)?;
3010 tracing::info!(transport = "http", bind = %bind_addr, "listener online");
3011 server.serve_on(listener).map_err(|err| err.to_string())
3012}
3013
3014fn spawn_http_tls_listener(
3020 config: &ServerCommandConfig,
3021 runtime: &RedDBRuntime,
3022 auth_store: &Arc<AuthStore>,
3023) -> Result<(), String> {
3024 let Some(addr) = config.http_tls_bind_addr.clone() else {
3025 return Ok(());
3026 };
3027
3028 let tls_config = resolve_http_tls_config(config)?;
3029 let server_config = crate::server::tls::build_server_config(&tls_config)
3030 .map_err(|err| format!("HTTP TLS: {err}"))?;
3031
3032 let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
3033 let server = apply_http_limits(server, config, runtime);
3034 let _handle = server.serve_tls_in_background(server_config);
3035 tracing::info!(
3036 transport = "https",
3037 bind = %addr,
3038 mtls = %tls_config.client_ca_path.is_some(),
3039 "TLS listener online"
3040 );
3041 Ok(())
3042}
3043
3044fn resolve_http_tls_config(
3046 config: &ServerCommandConfig,
3047) -> Result<crate::server::tls::HttpTlsConfig, String> {
3048 match (&config.http_tls_cert, &config.http_tls_key) {
3049 (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
3050 cert_path: cert.clone(),
3051 key_path: key.clone(),
3052 client_ca_path: config.http_tls_client_ca.clone(),
3053 }),
3054 (None, None) => {
3055 let dir = config
3057 .path
3058 .as_ref()
3059 .and_then(|p| p.parent().map(std::path::PathBuf::from))
3060 .unwrap_or_else(|| std::path::PathBuf::from("."));
3061 let auto = crate::server::tls::auto_generate_dev_cert(&dir)
3062 .map_err(|err| format!("HTTP TLS dev: {err}"))?;
3063 Ok(crate::server::tls::HttpTlsConfig {
3064 cert_path: auto.cert_path,
3065 key_path: auto.key_path,
3066 client_ca_path: config.http_tls_client_ca.clone(),
3067 })
3068 }
3069 _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
3070 }
3071}
3072
3073#[inline(never)]
3074fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
3075 let workers = config.workers;
3076 let db_options = config.to_db_options()?;
3077 let rt_config = detect_runtime_config();
3078 let mut transport_readiness = TransportReadiness::default();
3079 let Some(grpc_listener) = bind_listener_for_startup(
3080 &mut transport_readiness,
3081 "grpc",
3082 &bind_addr,
3083 config.grpc_bind_explicit,
3084 )?
3085 else {
3086 return Err(format!(
3087 "no gRPC listener started; implicit bind {} failed",
3088 bind_addr
3089 ));
3090 };
3091
3092 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
3093
3094 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
3095 .enable_all()
3096 .worker_threads(worker_threads)
3097 .thread_stack_size(rt_config.stack_size)
3098 .build()
3099 .map_err(|err| format!("tokio runtime: {err}"))?;
3100
3101 let (runtime, auth_store, _telemetry_guard) =
3103 build_runtime_and_auth_store(&config, db_options.clone())?;
3104 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
3105 let signal_runtime = runtime.clone();
3106 tokio_runtime.block_on(async move {
3107 spawn_lifecycle_signal_handler(signal_runtime).await;
3108 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
3110
3111 spawn_pg_listener(&config, &runtime);
3113
3114 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
3118
3119 let server = RedDBGrpcServer::with_options(
3120 runtime,
3121 GrpcServerOptions {
3122 bind_addr: bind_addr.clone(),
3123 tls: None,
3124 },
3125 auth_store,
3126 );
3127
3128 tracing::info!(
3129 transport = "grpc",
3130 bind = %bind_addr,
3131 cpus = rt_config.available_cpus,
3132 workers = worker_threads,
3133 "listener online"
3134 );
3135 server
3136 .serve_on(grpc_listener)
3137 .await
3138 .map_err(|err| err.to_string())
3139 })
3140}
3141
3142#[inline(never)]
3143fn run_dual_server(
3144 config: ServerCommandConfig,
3145 grpc_bind_addr: String,
3146 http_bind_addr: String,
3147) -> Result<(), String> {
3148 let workers = config.workers;
3149 let db_options = config.to_db_options()?;
3150 let rt_config = detect_runtime_config();
3151 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
3152 let mut transport_readiness = TransportReadiness::default();
3153 let http_listener = bind_listener_for_startup(
3154 &mut transport_readiness,
3155 "http",
3156 &http_bind_addr,
3157 config.http_bind_explicit,
3158 )?;
3159 let grpc_listener = bind_listener_for_startup(
3160 &mut transport_readiness,
3161 "grpc",
3162 &grpc_bind_addr,
3163 config.grpc_bind_explicit,
3164 )?;
3165 if http_listener.is_none() && grpc_listener.is_none() {
3166 return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
3167 }
3168 let (runtime, auth_store, _telemetry_guard) =
3169 build_runtime_and_auth_store(&config, db_options.clone())?;
3170 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
3171
3172 spawn_admin_metrics_listeners(&runtime, &auth_store);
3173 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
3174
3175 let http_handle = if let Some(listener) = http_listener {
3176 let http_server = build_http_server_with_transport_readiness(
3177 runtime.clone(),
3178 auth_store.clone(),
3179 http_bind_addr.clone(),
3180 transport_readiness.clone(),
3181 );
3182 let http_server = apply_http_limits(http_server, &config, &runtime);
3183 let http_server = apply_ui_bundle(http_server, &config)?;
3184 Some(http_server.serve_in_background_on(listener))
3185 } else {
3186 None
3187 };
3188
3189 thread::sleep(Duration::from_millis(150));
3190 if let Some(handle) = http_handle.as_ref() {
3191 if handle.is_finished() {
3192 let handle = http_handle.unwrap();
3193 return match handle.join() {
3194 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
3195 Ok(Err(err)) => Err(err.to_string()),
3196 Err(_) => Err("HTTP server thread panicked".to_string()),
3197 };
3198 }
3199 }
3200 if grpc_listener.is_none() {
3201 let Some(handle) = http_handle else {
3202 return Err("no listener started".to_string());
3203 };
3204 return match handle.join() {
3205 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
3206 Ok(Err(err)) => Err(err.to_string()),
3207 Err(_) => Err("HTTP server thread panicked".to_string()),
3208 };
3209 }
3210 let grpc_listener = grpc_listener.expect("checked above");
3211
3212 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
3213 .enable_all()
3214 .worker_threads(worker_threads)
3215 .thread_stack_size(rt_config.stack_size)
3216 .build()
3217 .map_err(|err| format!("tokio runtime: {err}"))?;
3218
3219 let signal_runtime = runtime.clone();
3220 tokio_runtime.block_on(async move {
3221 spawn_lifecycle_signal_handler(signal_runtime).await;
3222 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
3224
3225 spawn_pg_listener(&config, &runtime);
3227
3228 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
3230
3231 let server = RedDBGrpcServer::with_options(
3232 runtime,
3233 GrpcServerOptions {
3234 bind_addr: grpc_bind_addr.clone(),
3235 tls: None,
3236 },
3237 auth_store,
3238 );
3239
3240 tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
3241 tracing::info!(
3242 transport = "grpc",
3243 bind = %grpc_bind_addr,
3244 cpus = rt_config.available_cpus,
3245 workers = worker_threads,
3246 "listener online"
3247 );
3248 server
3249 .serve_on(grpc_listener)
3250 .await
3251 .map_err(|err| err.to_string())
3252 })
3253}
3254
3255#[cfg(test)]
3256mod tests {
3257 use super::*;
3258
3259 #[test]
3260 fn render_systemd_unit_contains_expected_execstart() {
3261 let config = SystemdServiceConfig {
3262 service_name: "reddb".to_string(),
3263 binary_path: PathBuf::from("/usr/local/bin/red"),
3264 run_user: "reddb".to_string(),
3265 run_group: "reddb".to_string(),
3266 data_path: reddb_file::default_service_database_path(),
3267 router_bind_addr: None,
3268 grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
3269 http_bind_addr: None,
3270 };
3271
3272 let unit = render_systemd_unit(&config);
3273 assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
3274 assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
3275 }
3276
3277 #[test]
3278 fn systemd_service_config_derives_paths() {
3279 let config = SystemdServiceConfig {
3280 service_name: "reddb-api".to_string(),
3281 binary_path: PathBuf::from("/usr/local/bin/red"),
3282 run_user: "reddb".to_string(),
3283 run_group: "reddb".to_string(),
3284 data_path: PathBuf::from("/srv/reddb/live/data.rdb"),
3285 router_bind_addr: None,
3286 grpc_bind_addr: None,
3287 http_bind_addr: Some("127.0.0.1:5055".to_string()),
3288 };
3289
3290 assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
3291 assert_eq!(
3292 config.unit_path(),
3293 PathBuf::from("/etc/systemd/system/reddb-api.service")
3294 );
3295 }
3296
3297 #[test]
3298 fn render_systemd_unit_supports_dual_transport() {
3299 let config = SystemdServiceConfig {
3300 service_name: "reddb".to_string(),
3301 binary_path: PathBuf::from("/usr/local/bin/red"),
3302 run_user: "reddb".to_string(),
3303 run_group: "reddb".to_string(),
3304 data_path: reddb_file::default_service_database_path(),
3305 router_bind_addr: None,
3306 grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
3307 http_bind_addr: Some("0.0.0.0:5055".to_string()),
3308 };
3309
3310 let unit = render_systemd_unit(&config);
3311 assert!(unit.contains("--grpc-bind 0.0.0.0:5555"));
3312 assert!(unit.contains("--http-bind 0.0.0.0:5055"));
3313 }
3314
3315 #[test]
3316 fn render_systemd_unit_supports_router_mode() {
3317 let config = SystemdServiceConfig {
3318 service_name: "reddb".to_string(),
3319 binary_path: PathBuf::from("/usr/local/bin/red"),
3320 run_user: "reddb".to_string(),
3321 run_group: "reddb".to_string(),
3322 data_path: reddb_file::default_service_database_path(),
3323 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
3324 grpc_bind_addr: None,
3325 http_bind_addr: None,
3326 };
3327
3328 let unit = render_systemd_unit(&config);
3329 assert!(unit.contains("--bind 127.0.0.1:5050"));
3330 assert!(!unit.contains("--grpc-bind"));
3331 assert!(!unit.contains("--http-bind"));
3332 }
3333
3334 #[test]
3335 fn explicit_bind_collision_is_fatal() {
3336 let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
3337 let addr = held.local_addr().expect("held addr").to_string();
3338 let mut readiness = TransportReadiness::default();
3339
3340 let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();
3341
3342 assert!(error.contains("explicit http listener bind"));
3343 assert_eq!(readiness.active.len(), 0);
3344 assert_eq!(readiness.failed.len(), 1);
3345 assert!(readiness.failed[0].explicit);
3346 assert_eq!(readiness.failed[0].bind_addr, addr);
3347 }
3348
3349 fn no_auth_env_lock() -> &'static std::sync::Mutex<()> {
3356 static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
3357 LOCK.get_or_init(|| std::sync::Mutex::new(()))
3358 }
3359
3360 fn no_auth_test_config(no_auth: bool) -> ServerCommandConfig {
3361 ServerCommandConfig {
3362 path: None,
3363 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
3364 router_bind_explicit: false,
3365 grpc_bind_addr: None,
3366 grpc_bind_explicit: false,
3367 grpc_tls_bind_addr: None,
3368 grpc_tls_cert: None,
3369 grpc_tls_key: None,
3370 grpc_tls_client_ca: None,
3371 http_bind_addr: None,
3372 http_bind_explicit: false,
3373 http_tls_bind_addr: None,
3374 http_tls_cert: None,
3375 http_tls_key: None,
3376 http_tls_client_ca: None,
3377 wire_bind_addr: None,
3378 wire_bind_explicit: false,
3379 wire_tls_bind_addr: None,
3380 wire_tls_cert: None,
3381 wire_tls_key: None,
3382 pg_bind_addr: None,
3383 create_if_missing: true,
3384 read_only: false,
3385 role: "standalone".to_string(),
3386 primary_addr: None,
3387 storage_profile: StorageProfileSelection::embedded_single_file(),
3388 auth: false,
3389 require_auth: false,
3390 vault: true,
3393 no_auth,
3394 workers: None,
3395 telemetry: None,
3396 http_limits_cli: crate::server::HttpLimitsCliInput::default(),
3397 ui: false,
3398 ui_dir: None,
3399 bootstrap: BootstrapConfig::default(),
3400 }
3401 }
3402
3403 #[test]
3404 fn no_auth_flag_disables_every_auth_knob_and_stamps_metadata() {
3405 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3406 unsafe {
3411 std::env::set_var("REDDB_USERNAME", "admin");
3412 std::env::set_var("REDDB_PASSWORD", "hunter2");
3413 }
3414 let config = no_auth_test_config(true);
3415 let options = config.to_db_options().expect("to_db_options");
3416
3417 assert!(no_auth_active(&options), "metadata should be stamped");
3418 assert!(!options.auth.enabled, "auth.enabled must be forced off");
3419 assert!(
3420 !options.auth.require_auth,
3421 "require_auth must be forced off"
3422 );
3423 assert!(
3424 !options.auth.vault_enabled,
3425 "vault_enabled must be forced off (overrides --vault)"
3426 );
3427 assert_eq!(
3428 options.metadata.get(NO_AUTH_META).map(String::as_str),
3429 Some("true"),
3430 );
3431
3432 unsafe {
3434 std::env::remove_var("REDDB_USERNAME");
3435 std::env::remove_var("REDDB_PASSWORD");
3436 }
3437 }
3438
3439 #[test]
3440 fn default_behaviour_without_no_auth_flag_is_unchanged() {
3441 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3442 let config = no_auth_test_config(false);
3443 let options = config.to_db_options().expect("to_db_options");
3444
3445 assert!(
3446 !no_auth_active(&options),
3447 "default boot must not be marked no-auth"
3448 );
3449 assert!(
3450 options.metadata.get(NO_AUTH_META).is_none(),
3451 "metadata key must be absent when flag is off"
3452 );
3453 assert!(options.auth.vault_enabled);
3455 }
3456
3457 #[test]
3458 fn no_auth_active_blocks_bootstrap_from_env() {
3459 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3460 unsafe {
3465 std::env::set_var("REDDB_USERNAME", "admin");
3466 std::env::set_var("REDDB_PASSWORD", "hunter2");
3467 }
3468
3469 let options = no_auth_test_config(true)
3470 .to_db_options()
3471 .expect("to_db_options");
3472
3473 let auth_store = AuthStore::new(options.auth.clone());
3477 if !no_auth_active(&options) {
3478 auth_store.bootstrap_from_env();
3479 }
3480
3481 assert!(
3482 auth_store.needs_bootstrap(),
3483 "no admin user must be bootstrapped under --no-auth even with REDDB_USERNAME/PASSWORD set"
3484 );
3485
3486 unsafe {
3488 std::env::remove_var("REDDB_USERNAME");
3489 std::env::remove_var("REDDB_PASSWORD");
3490 }
3491 }
3492
3493 fn clear_preset_env() {
3500 unsafe {
3502 std::env::remove_var(BOOTSTRAP_PRESET_ENV);
3503 std::env::remove_var(PRESET_ENV);
3504 std::env::remove_var("REDDB_BOOTSTRAP_MANIFEST");
3505 std::env::remove_var("REDDB_AUTH");
3506 std::env::remove_var("REDDB_REQUIRE_AUTH");
3507 std::env::remove_var("REDDB_NO_AUTH");
3508 std::env::remove_var("REDDB_DEV");
3509 std::env::remove_var("REDDB_VAULT");
3510 std::env::remove_var("REDDB_USERNAME");
3511 std::env::remove_var("REDDB_PASSWORD");
3512 std::env::remove_var("REDDB_USERNAME_FILE");
3513 std::env::remove_var("REDDB_PASSWORD_FILE");
3514 std::env::remove_var("REDDB_CLOUD_HEAD_ADMIN");
3515 std::env::remove_var("REDDB_CLOUD_HEAD_ADMIN_PASSWORD");
3516 std::env::remove_var("REDDB_CLOUD_HEAD_ADMIN_PASSWORD_FILE");
3517 std::env::remove_var("REDDB_CUSTOMER_ADMIN");
3518 std::env::remove_var("REDDB_CUSTOMER_ADMIN_PASSWORD");
3519 std::env::remove_var("REDDB_CUSTOMER_ADMIN_PASSWORD_FILE");
3520 }
3521 }
3522
3523 fn clear_backup_env() {
3524 unsafe {
3526 std::env::remove_var("REDDB_BACKUP_S3_ENDPOINT");
3527 std::env::remove_var("REDDB_BACKUP_S3_BUCKET");
3528 std::env::remove_var("REDDB_BACKUP_S3_PREFIX");
3529 std::env::remove_var("REDDB_BACKUP_S3_ACCESS_KEY_ID");
3530 std::env::remove_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY");
3531 std::env::remove_var("REDDB_BACKUP_S3_REGION");
3532 std::env::remove_var("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS");
3533 std::env::remove_var("REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS");
3534 std::env::remove_var("REDDB_BACKUP_PAUSE_ON_LAG_SECS");
3535 }
3536 }
3537
3538 fn fresh_runtime_and_store() -> (RedDBRuntime, Arc<AuthStore>) {
3539 let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
3540 let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3541 (runtime, auth_store)
3542 }
3543
3544 #[test]
3545 fn auth_env_knobs_enable_auth_require_auth_and_vault() {
3546 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3547 clear_preset_env();
3548 unsafe {
3550 std::env::set_var("REDDB_AUTH", "true");
3551 std::env::set_var("REDDB_REQUIRE_AUTH", "true");
3552 std::env::set_var("REDDB_VAULT", "true");
3553 }
3554
3555 let mut config = no_auth_test_config(false);
3556 config.vault = false;
3557 let options = config.to_db_options().expect("to_db_options");
3558
3559 assert!(options.auth.enabled);
3560 assert!(options.auth.require_auth);
3561 assert!(options.auth.vault_enabled);
3562
3563 clear_preset_env();
3564 }
3565
3566 #[test]
3567 fn production_and_cloud_presets_force_auth_require_auth_and_vault() {
3568 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3569 clear_preset_env();
3570
3571 for preset in [PRESET_PRODUCTION, PRESET_CLOUD] {
3572 let mut config = no_auth_test_config(false);
3573 config.vault = false;
3574 config.bootstrap.preset = Some(preset.to_string());
3575
3576 let options = config.to_db_options().expect("to_db_options");
3577 assert!(options.auth.enabled, "{preset} should enable auth");
3578 assert!(
3579 options.auth.require_auth,
3580 "{preset} should require authenticated requests"
3581 );
3582 assert!(options.auth.vault_enabled, "{preset} should enable vault");
3583 assert!(!no_auth_active(&options));
3584 }
3585
3586 clear_preset_env();
3587 }
3588
3589 #[test]
3590 fn no_auth_env_overrides_preset_forced_auth() {
3591 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3592 clear_preset_env();
3593 unsafe {
3595 std::env::set_var("REDDB_NO_AUTH", "true");
3596 std::env::set_var(BOOTSTRAP_PRESET_ENV, PRESET_CLOUD);
3597 }
3598
3599 let mut config = no_auth_test_config(false);
3600 config.auth = true;
3601 config.require_auth = true;
3602 let options = config.to_db_options().expect("to_db_options");
3603
3604 assert!(no_auth_active(&options));
3605 assert!(!options.auth.enabled);
3606 assert!(!options.auth.require_auth);
3607 assert!(!options.auth.vault_enabled);
3608
3609 clear_preset_env();
3610 }
3611
3612 #[test]
3613 fn simple_preset_is_default_and_persists_state() {
3614 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3615 clear_preset_env();
3616
3617 let (runtime, auth_store) = fresh_runtime_and_store();
3618 apply_preset(&runtime, &auth_store).expect("simple preset applies cleanly");
3619
3620 assert!(
3622 auth_store.needs_bootstrap(),
3623 "simple preset must not create an admin"
3624 );
3625
3626 let store = runtime.db().store();
3628 let completed = store
3629 .get_config(BOOTSTRAP_COMPLETED_KEY)
3630 .expect("completed key persisted");
3631 assert!(matches!(
3632 completed,
3633 crate::storage::schema::Value::Boolean(true)
3634 ));
3635 let preset = store
3636 .get_config(BOOTSTRAP_PRESET_KEY)
3637 .expect("preset key persisted");
3638 match preset {
3639 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_SIMPLE),
3640 other => panic!("expected Text(simple), got {other:?}"),
3641 }
3642 assert!(
3643 store.get_config(BOOTSTRAP_FIRST_ADMIN_KEY).is_none(),
3644 "simple preset must not record a first admin"
3645 );
3646
3647 clear_preset_env();
3648 }
3649
3650 #[test]
3651 fn production_preset_creates_first_admin_with_allow_all_policy() {
3652 use crate::auth::policies::{EvalContext, ResourceRef};
3653 use crate::auth::UserId;
3654
3655 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3656 clear_preset_env();
3657 unsafe {
3659 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3660 std::env::set_var("REDDB_USERNAME", "ops");
3661 std::env::set_var("REDDB_PASSWORD", "hunter2");
3662 }
3663
3664 let (runtime, auth_store) = fresh_runtime_and_store();
3665 apply_preset(&runtime, &auth_store).expect("production preset applies cleanly");
3666
3667 assert!(
3669 !auth_store.needs_bootstrap(),
3670 "production preset must seal bootstrap"
3671 );
3672 let users = auth_store.list_users();
3673 assert_eq!(users.len(), 1);
3674 let admin = &users[0];
3675 assert_eq!(admin.username, "ops");
3676 assert!(
3677 admin.tenant_id.is_none(),
3678 "first admin must be platform-scoped (tenant=None)"
3679 );
3680
3681 let policy = auth_store
3683 .get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY)
3684 .expect("allow-all policy installed");
3685 assert!(!policy.statements.is_empty());
3686
3687 let actor = UserId::platform("ops");
3690 let ctx = EvalContext {
3691 principal_tenant: None,
3692 current_tenant: None,
3693 peer_ip: None,
3694 mfa_present: false,
3695 now_ms: 1_700_000_000_000,
3696 principal_is_admin_role: true,
3697 principal_is_platform_scoped: true,
3698 };
3699 let arbitrary_resource = ResourceRef::new("config", "red.config.audit.enabled");
3700 assert!(
3701 auth_store.check_policy_authz(&actor, "config:write", &arbitrary_resource, &ctx),
3702 "allow-all policy must grant arbitrary actions via the evaluator"
3703 );
3704
3705 let store = runtime.db().store();
3707 match store
3708 .get_config(BOOTSTRAP_FIRST_ADMIN_KEY)
3709 .expect("first_admin_id persisted")
3710 {
3711 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), "ops"),
3712 other => panic!("expected Text(ops), got {other:?}"),
3713 }
3714 match store.get_config(BOOTSTRAP_PRESET_KEY).unwrap() {
3715 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_PRODUCTION),
3716 other => panic!("expected Text(production), got {other:?}"),
3717 }
3718
3719 clear_preset_env();
3720 }
3721
3722 #[test]
3723 fn bootstrap_preset_env_takes_precedence_over_legacy_preset_env() {
3724 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3725 clear_preset_env();
3726 unsafe {
3728 std::env::set_var(BOOTSTRAP_PRESET_ENV, PRESET_REGULATED);
3729 std::env::set_var(PRESET_ENV, PRESET_SIMPLE);
3730 }
3731
3732 let options = no_auth_test_config(false)
3733 .to_db_options()
3734 .expect("regulated options");
3735 assert!(
3736 options.control_events.compliance_mode,
3737 "canonical REDDB_BOOTSTRAP_PRESET should win over REDDB_PRESET"
3738 );
3739 assert!(options.query_audit.enabled);
3740
3741 clear_preset_env();
3742 }
3743
3744 #[test]
3745 fn cloud_preset_creates_head_and_customer_admins() {
3746 use crate::auth::policies::{EvalContext, ResourceRef};
3747 use crate::auth::UserId;
3748
3749 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3750 clear_preset_env();
3751 unsafe {
3753 std::env::set_var(BOOTSTRAP_PRESET_ENV, PRESET_CLOUD);
3754 std::env::set_var("REDDB_CLOUD_HEAD_ADMIN", "head");
3755 std::env::set_var("REDDB_CLOUD_HEAD_ADMIN_PASSWORD", "head-pass");
3756 std::env::set_var("REDDB_CUSTOMER_ADMIN", "customer");
3757 std::env::set_var("REDDB_CUSTOMER_ADMIN_PASSWORD", "customer-pass");
3758 }
3759
3760 let (runtime, auth_store) = fresh_runtime_and_store();
3761 apply_preset(&runtime, &auth_store).expect("cloud preset applies cleanly");
3762
3763 let head = auth_store
3764 .get_user(None, "head")
3765 .expect("head admin should exist");
3766 assert_eq!(head.tenant_id, None);
3767 let customer = auth_store
3768 .get_user(None, "customer")
3769 .expect("customer admin should exist");
3770 assert_eq!(customer.tenant_id, None);
3771
3772 let ctx = EvalContext {
3773 principal_tenant: None,
3774 current_tenant: None,
3775 peer_ip: None,
3776 mfa_present: false,
3777 now_ms: 1_700_000_000_000,
3778 principal_is_admin_role: true,
3779 principal_is_platform_scoped: true,
3780 };
3781 assert!(auth_store.check_policy_authz(
3782 &UserId::platform("customer"),
3783 "config:write",
3784 &ResourceRef::new("config", "red.config.customer.enabled"),
3785 &ctx,
3786 ));
3787
3788 assert!(auth_store.get_user(None, "head").is_some());
3789 assert!(auth_store
3790 .get_policy(CLOUD_PROTECT_MANAGED_POLICY)
3791 .is_some());
3792 assert!(runtime
3793 .config_registry()
3794 .get_active(CLOUD_CONFIG_NAMESPACE)
3795 .is_some());
3796
3797 let store = runtime.db().store();
3798 match store
3799 .get_config(BOOTSTRAP_FIRST_ADMIN_KEY)
3800 .expect("head admin id persisted")
3801 {
3802 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), "head"),
3803 other => panic!("expected Text(head), got {other:?}"),
3804 }
3805 match store.get_config(BOOTSTRAP_PRESET_KEY).unwrap() {
3806 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_CLOUD),
3807 other => panic!("expected Text(cloud), got {other:?}"),
3808 }
3809
3810 clear_preset_env();
3811 }
3812
3813 #[test]
3814 fn cloud_preset_cli_admin_alias_wins_over_cloud_env_password() {
3815 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3816 clear_preset_env();
3817 unsafe {
3819 std::env::set_var("REDDB_CLOUD_HEAD_ADMIN_PASSWORD", "env-head-pass");
3820 }
3821
3822 let bootstrap = BootstrapConfig {
3823 preset: Some(PRESET_CLOUD.to_string()),
3824 admin_username: Some("head".to_string()),
3825 admin_password: Some("cli-head-pass".to_string()),
3826 customer_admin: Some("customer".to_string()),
3827 customer_admin_password: Some("customer-pass".to_string()),
3828 ..BootstrapConfig::default()
3829 };
3830 let (runtime, auth_store) = fresh_runtime_and_store();
3831 apply_preset_from_config(&runtime, &auth_store, &bootstrap)
3832 .expect("cloud preset applies cleanly");
3833
3834 auth_store
3835 .authenticate("head", "cli-head-pass")
3836 .expect("CLI alias password should win");
3837 assert!(
3838 auth_store.authenticate("head", "env-head-pass").is_err(),
3839 "cloud-specific env password must not beat CLI alias"
3840 );
3841
3842 clear_preset_env();
3843 }
3844
3845 #[test]
3846 fn regulated_preset_enables_query_audit_infrastructure_without_rules() {
3847 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3848 clear_preset_env();
3849 unsafe {
3851 std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3852 }
3853
3854 let (runtime, auth_store) = fresh_runtime_and_store();
3855 apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3856
3857 assert!(runtime.query_audit().is_enabled());
3858 assert!(runtime.query_audit().rules().is_empty());
3859 assert!(
3860 runtime
3861 .db()
3862 .store()
3863 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3864 .is_some(),
3865 "regulated preset should create the query-audit stream"
3866 );
3867
3868 runtime
3869 .execute_query("CREATE TABLE docs (id INT)")
3870 .expect("create table");
3871 runtime
3872 .execute_query("INSERT INTO docs (id) VALUES (1)")
3873 .expect("insert");
3874 runtime.execute_query("SELECT * FROM docs").expect("select");
3875 let rows = runtime
3876 .db()
3877 .store()
3878 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3879 .expect("query audit collection")
3880 .query_all(|_| true);
3881 assert!(
3882 rows.is_empty(),
3883 "regulated preset must not globally audit every query"
3884 );
3885
3886 clear_preset_env();
3887 }
3888
3889 #[test]
3890 fn managed_backup_env_rejects_primary_replica_single_file_storage() {
3891 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3892 clear_backup_env();
3893 unsafe {
3895 std::env::set_var("REDDB_BACKUP_S3_ENDPOINT", "https://s3.example.test");
3896 std::env::set_var("REDDB_BACKUP_S3_BUCKET", "reddb");
3897 std::env::set_var("REDDB_BACKUP_S3_PREFIX", "clusters/prod");
3898 std::env::set_var("REDDB_BACKUP_S3_ACCESS_KEY_ID", "AK");
3899 std::env::set_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY", "SK");
3900 }
3901
3902 let mut config = no_auth_test_config(false);
3903 config.role = "primary".to_string();
3904 config.storage_profile = crate::storage::StorageDeployPreset::PrimaryReplicaDev.selection();
3905
3906 let err = config.to_db_options().unwrap_err();
3907 assert!(err.contains("managed backup"), "got: {err}");
3908 assert!(err.contains("operational-directory"), "got: {err}");
3909
3910 clear_backup_env();
3911 }
3912
3913 #[test]
3914 fn regulated_preset_installs_managed_evidence_guardrails_end_to_end() {
3915 use crate::auth::policies::{EvalContext, Policy, ResourceRef};
3916 use crate::auth::store::PrincipalRef;
3917 use crate::auth::{Role, UserId};
3918 use crate::runtime::mvcc::{clear_current_auth_identity, set_current_auth_identity};
3919 use crate::storage::schema::Value;
3920
3921 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3922 clear_preset_env();
3923 unsafe {
3925 std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3926 }
3927
3928 let options = no_auth_test_config(false)
3929 .to_db_options()
3930 .expect("regulated options");
3931 assert!(
3932 options.control_events.compliance_mode,
3933 "regulated preset must enable fail-closed control evidence before runtime boot"
3934 );
3935 assert!(
3936 options.query_audit.enabled && options.query_audit.rules.is_empty(),
3937 "regulated preset must enable query-audit infrastructure without global rules"
3938 );
3939
3940 let runtime = RedDBRuntime::with_options(options).expect("runtime");
3941 let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3942 apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3943 runtime.set_auth_store(Arc::clone(&auth_store));
3944
3945 assert!(runtime.control_events_require_persistence());
3946 assert!(runtime.query_audit().is_enabled());
3947 assert!(runtime.query_audit().rules().is_empty());
3948 assert!(auth_store
3949 .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
3950 .is_some());
3951
3952 let managed_policy = runtime
3953 .config_registry()
3954 .get_active(REGULATED_PROTECT_MANAGED_POLICY)
3955 .expect("regulated managed policy registry entry");
3956 assert!(managed_policy.managed);
3957 assert_eq!(managed_policy.resource_type, "policy");
3958 assert!(
3959 runtime
3960 .config_registry()
3961 .get_active(REGULATED_AUDIT_CONFIG_NAMESPACE)
3962 .expect("regulated audit config namespace")
3963 .managed
3964 );
3965
3966 let registry_rows = runtime
3967 .execute_query(&format!(
3968 "SELECT id, managed FROM red.registry WHERE id = '{}'",
3969 REGULATED_PROTECT_MANAGED_POLICY
3970 ))
3971 .expect("red.registry query");
3972 assert_eq!(registry_rows.result.records.len(), 1);
3973 assert_eq!(
3974 registry_rows.result.records[0].get("managed"),
3975 Some(&Value::Boolean(true))
3976 );
3977
3978 let managed_policy_rows = runtime
3979 .execute_query(&format!(
3980 "SELECT policy_id FROM red.managed_policies WHERE policy_id = '{}'",
3981 REGULATED_PROTECT_MANAGED_POLICY
3982 ))
3983 .expect("red.managed_policies query");
3984 assert_eq!(managed_policy_rows.result.records.len(), 1);
3985
3986 let capability_rows = runtime
3987 .execute_query(
3988 "SELECT action FROM red.control_capabilities WHERE action = 'evidence:export'",
3989 )
3990 .expect("red.control_capabilities query");
3991 assert_eq!(capability_rows.result.records.len(), 1);
3992
3993 auth_store
3994 .create_user("alice", "p", Role::Admin)
3995 .expect("create ordinary admin");
3996 let allow_all = Policy::from_json_str(
3997 r#"{
3998 "id": "alice-allow-all",
3999 "version": 1,
4000 "statements": [{
4001 "effect": "allow",
4002 "actions": ["*"],
4003 "resources": ["*"]
4004 }]
4005 }"#,
4006 )
4007 .expect("allow-all policy");
4008 auth_store.put_policy(allow_all).expect("install allow-all");
4009 auth_store
4010 .attach_policy(
4011 PrincipalRef::User(UserId::platform("alice")),
4012 "alice-allow-all",
4013 )
4014 .expect("attach allow-all");
4015 let ctx = EvalContext {
4016 principal_tenant: None,
4017 current_tenant: None,
4018 peer_ip: None,
4019 mfa_present: false,
4020 now_ms: 1_700_000_000_000,
4021 principal_is_admin_role: true,
4022 principal_is_platform_scoped: true,
4023 };
4024 assert!(
4025 auth_store.check_policy_authz(
4026 &UserId::platform("alice"),
4027 "policy:drop",
4028 &ResourceRef::new("policy", REGULATED_PROTECT_MANAGED_POLICY),
4029 &ctx,
4030 ),
4031 "ordinary allow-all policy should be broad enough that only the managed guardrail blocks"
4032 );
4033
4034 set_current_auth_identity("alice".to_string(), Role::Admin);
4035 let denied = runtime.execute_query(&format!(
4036 "DROP POLICY '{}'",
4037 REGULATED_PROTECT_MANAGED_POLICY
4038 ));
4039 clear_current_auth_identity();
4040 let err = denied.expect_err("managed policy guardrail must deny ordinary admin");
4041 assert!(
4042 err.to_string().contains("managed policy"),
4043 "error should name the managed guardrail: {err}"
4044 );
4045 assert!(
4046 auth_store
4047 .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
4048 .is_some(),
4049 "denied mutation must leave managed policy installed"
4050 );
4051
4052 let denied_events = runtime
4053 .execute_query(&format!(
4054 "SELECT action, resource, outcome FROM red.control_events \
4055 WHERE action = 'policy:drop' AND resource = 'policy:{}'",
4056 REGULATED_PROTECT_MANAGED_POLICY
4057 ))
4058 .expect("red.control_events denied policy drop");
4059 assert_eq!(denied_events.result.records.len(), 1);
4060 assert_eq!(
4061 denied_events.result.records[0].get("outcome"),
4062 Some(&Value::text("denied"))
4063 );
4064
4065 set_current_auth_identity("alice".to_string(), Role::Admin);
4066 let config_denied = runtime.execute_query("SET CONFIG red.config.audit.enabled = true");
4067 clear_current_auth_identity();
4068 let err = config_denied.expect_err("managed config guardrail must deny ordinary admin");
4069 assert!(
4070 err.to_string().contains("managed config"),
4071 "error should name the managed config guardrail: {err}"
4072 );
4073
4074 let denied_config_events = runtime
4075 .execute_query(
4076 "SELECT action, resource, outcome FROM red.control_events \
4077 WHERE action = 'config:write' AND resource = 'config:red.config.audit.enabled'",
4078 )
4079 .expect("red.control_events denied config write");
4080 assert_eq!(denied_config_events.result.records.len(), 1);
4081 assert_eq!(
4082 denied_config_events.result.records[0].get("outcome"),
4083 Some(&Value::text("denied"))
4084 );
4085
4086 runtime
4087 .execute_query("CREATE TABLE regulated_docs (id INT)")
4088 .expect("create user table");
4089 runtime
4090 .execute_query("SELECT * FROM regulated_docs")
4091 .expect("select user table");
4092 let audit_rows = runtime
4093 .db()
4094 .store()
4095 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
4096 .expect("query audit collection")
4097 .query_all(|_| true);
4098 assert!(
4099 audit_rows.is_empty(),
4100 "regulated preset must not globally audit data-plane queries"
4101 );
4102
4103 clear_preset_env();
4104 }
4105
4106 #[test]
4107 fn bootstrap_manifest_installs_initial_users_policies_guardrails_and_config() {
4108 use crate::auth::policies::{EvalContext, ResourceRef};
4109 use crate::auth::UserId;
4110 use crate::storage::schema::Value;
4111
4112 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4113 clear_preset_env();
4114
4115 let manifest_dir = std::env::current_dir()
4116 .expect("current dir")
4117 .join(".red/tmp/bootstrap-manifest-tests");
4118 std::fs::create_dir_all(&manifest_dir).expect("create manifest test dir");
4119 let manifest_path = manifest_dir.join(format!(
4120 "reddb-bootstrap-manifest-{}-{}.json",
4121 std::process::id(),
4122 std::time::SystemTime::now()
4123 .duration_since(std::time::UNIX_EPOCH)
4124 .unwrap_or_default()
4125 .as_millis()
4126 ));
4127 std::fs::write(
4128 &manifest_path,
4129 r#"{
4130 "users": [
4131 {
4132 "username": "ops",
4133 "password": "hunter2",
4134 "role": "admin"
4135 }
4136 ],
4137 "policies": [
4138 {
4139 "id": "bootstrap-registry-admin",
4140 "version": 1,
4141 "statements": [
4142 {
4143 "effect": "allow",
4144 "actions": ["red.registry:*", "policy:*", "config:write", "select"],
4145 "resources": ["registry:*", "policy:*", "config:*", "collection:docs"]
4146 }
4147 ]
4148 }
4149 ],
4150 "managed_policies": [
4151 {
4152 "id": "managed-deny-drop",
4153 "version": 1,
4154 "statements": [
4155 {
4156 "effect": "deny",
4157 "actions": ["policy:drop"],
4158 "resources": ["policy:managed-deny-drop"]
4159 }
4160 ],
4161 "required_resource": "policy:managed-deny-drop",
4162 "evidence": "full"
4163 }
4164 ],
4165 "attachments": [
4166 {"user": "ops", "policy": "bootstrap-registry-admin"}
4167 ],
4168 "managed_config_namespaces": [
4169 {
4170 "id": "red.ai",
4171 "required_action": "config:write",
4172 "required_resource": "config:red.ai.*",
4173 "evidence": "metadata"
4174 }
4175 ],
4176 "config": [
4177 {"key": "red.ai.default.provider", "value": "openai"},
4178 {
4179 "key": "red.ai.openai.default.secret_ref",
4180 "secret_ref": {"collection": "red.vault", "key": "openai"}
4181 }
4182 ],
4183 "actor": "ops"
4184 }"#,
4185 )
4186 .expect("write manifest");
4187 unsafe {
4189 std::env::set_var("REDDB_BOOTSTRAP_MANIFEST", &manifest_path);
4190 }
4191
4192 let (runtime, auth_store) = fresh_runtime_and_store();
4193 apply_preset(&runtime, &auth_store).expect("manifest applies cleanly");
4194
4195 let users = auth_store.list_users();
4196 assert_eq!(users.len(), 1);
4197 assert_eq!(users[0].username, "ops");
4198 assert!(users[0].tenant_id.is_none());
4199
4200 let actor = UserId::platform("ops");
4201 let ctx = EvalContext {
4202 principal_tenant: None,
4203 current_tenant: None,
4204 peer_ip: None,
4205 mfa_present: false,
4206 now_ms: 1_700_000_000_000,
4207 principal_is_admin_role: true,
4208 principal_is_platform_scoped: true,
4209 };
4210 assert!(auth_store.check_policy_authz(
4212 &actor,
4213 "select",
4214 &ResourceRef::new("collection", "docs"),
4215 &ctx
4216 ));
4217
4218 let managed_policy = runtime
4219 .config_registry()
4220 .get_active("managed-deny-drop")
4221 .expect("managed policy registry entry");
4222 assert!(managed_policy.managed);
4223 assert_eq!(managed_policy.resource_type, "policy");
4224 let managed_config = runtime
4225 .config_registry()
4226 .get_active("red.ai")
4227 .expect("managed config namespace registry entry");
4228 assert!(managed_config.managed);
4229 assert_eq!(managed_config.resource_type, "config_namespace");
4230
4231 let store = runtime.db().store();
4232 match store
4233 .get_config("red.ai.default.provider")
4234 .expect("plain config persisted")
4235 {
4236 Value::Text(s) => assert_eq!(s.as_ref(), "openai"),
4237 other => panic!("expected provider text, got {other:?}"),
4238 }
4239 let Value::Json(bytes) = store
4240 .get_config("red.ai.openai.default.secret_ref")
4241 .expect("secret ref config persisted")
4242 else {
4243 panic!("secret ref must be stored as structured JSON");
4244 };
4245 let reference: crate::serde_json::Value =
4246 crate::serde_json::from_slice(&bytes).expect("secret ref json");
4247 assert_eq!(
4248 reference.get("type").and_then(|v| v.as_str()),
4249 Some("secret_ref")
4250 );
4251 assert!(
4252 !String::from_utf8_lossy(&bytes).contains("hunter2"),
4253 "manifest password must not leak into secret ref config"
4254 );
4255
4256 let completed = store
4257 .get_config(BOOTSTRAP_COMPLETED_KEY)
4258 .expect("bootstrap completion persisted");
4259 assert!(matches!(completed, Value::Boolean(true)));
4260 assert!(
4261 store
4262 .get_config("system.bootstrap.manifest.registry_entries")
4263 .is_some(),
4264 "managed registry entries must be persisted internally"
4265 );
4266
4267 std::fs::remove_file(&manifest_path).expect("remove manifest after first boot");
4268 let restored_registry = Arc::new(crate::auth::registry::ConfigRegistry::new());
4269 crate::cli::bootstrap_manifest::rehydrate_manifest_registry(&runtime, &restored_registry)
4270 .expect("registry rehydrates without manifest file");
4271 assert!(restored_registry.get_active("managed-deny-drop").is_some());
4272 assert!(restored_registry.get_active("red.ai").is_some());
4273
4274 let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
4275 apply_preset(&runtime, &fresh).expect("re-run must not need manifest file");
4276 assert!(fresh.needs_bootstrap());
4277
4278 clear_preset_env();
4279 }
4280
4281 #[test]
4282 fn production_preset_refuses_to_start_without_password() {
4283 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4284 clear_preset_env();
4285 unsafe {
4287 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
4288 std::env::set_var("REDDB_USERNAME", "ops");
4289 }
4290
4291 let (runtime, auth_store) = fresh_runtime_and_store();
4292 let err = apply_preset(&runtime, &auth_store).expect_err("must reject missing password");
4293 assert!(
4294 err.contains("REDDB_PASSWORD"),
4295 "error must name the missing env: {err}"
4296 );
4297
4298 assert!(auth_store.needs_bootstrap());
4300 assert!(runtime
4301 .db()
4302 .store()
4303 .get_config(BOOTSTRAP_COMPLETED_KEY)
4304 .is_none());
4305
4306 clear_preset_env();
4307 }
4308
4309 #[test]
4310 fn re_running_production_after_first_boot_is_a_silent_skip() {
4311 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4312 clear_preset_env();
4313 unsafe {
4315 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
4316 std::env::set_var("REDDB_USERNAME", "ops");
4317 std::env::set_var("REDDB_PASSWORD", "hunter2");
4318 }
4319
4320 let (runtime, auth_store) = fresh_runtime_and_store();
4321 apply_preset(&runtime, &auth_store).expect("first apply");
4322 assert_eq!(auth_store.list_users().len(), 1);
4323
4324 let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
4331 apply_preset(&runtime, &fresh).expect("re-run is silent-skip");
4332 assert!(
4333 fresh.needs_bootstrap(),
4334 "re-run must not create a second admin"
4335 );
4336 assert!(
4337 fresh.get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY).is_none(),
4338 "re-run must not re-install the allow-all policy on the fresh store"
4339 );
4340
4341 clear_preset_env();
4342 }
4343
4344 #[test]
4345 fn unrecognised_preset_value_is_rejected() {
4346 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4347 clear_preset_env();
4348 unsafe {
4350 std::env::set_var(PRESET_ENV, "weird");
4351 }
4352
4353 let (runtime, auth_store) = fresh_runtime_and_store();
4354 let err = apply_preset(&runtime, &auth_store).expect_err("must reject unknown preset");
4355 assert!(err.contains("weird"), "error must echo the value: {err}");
4356 assert!(auth_store.needs_bootstrap());
4357
4358 clear_preset_env();
4359 }
4360
4361 #[test]
4362 fn no_auth_short_circuits_preset_entirely() {
4363 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4364 clear_preset_env();
4365 unsafe {
4368 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
4369 std::env::set_var("REDDB_USERNAME", "ops");
4370 std::env::set_var("REDDB_PASSWORD", "hunter2");
4371 }
4372
4373 let options = no_auth_test_config(true)
4374 .to_db_options()
4375 .expect("to_db_options");
4376 assert!(no_auth_active(&options));
4377
4378 let (runtime, auth_store) = fresh_runtime_and_store();
4381 if !no_auth_active(&options) {
4382 apply_preset(&runtime, &auth_store).expect("would apply preset");
4383 }
4384
4385 assert!(
4386 auth_store.needs_bootstrap(),
4387 "--no-auth must prevent any admin creation"
4388 );
4389 assert!(
4390 runtime
4391 .db()
4392 .store()
4393 .get_config(BOOTSTRAP_COMPLETED_KEY)
4394 .is_none(),
4395 "--no-auth must skip bootstrap-state persistence"
4396 );
4397
4398 clear_preset_env();
4399 }
4400
4401 #[test]
4402 fn implicit_bind_collision_degrades() {
4403 let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
4404 let addr = held.local_addr().expect("held addr").to_string();
4405 let mut readiness = TransportReadiness::default();
4406
4407 let listener =
4408 bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");
4409
4410 assert!(listener.is_none());
4411 assert_eq!(readiness.active.len(), 0);
4412 assert_eq!(readiness.failed.len(), 1);
4413 assert!(!readiness.failed[0].explicit);
4414 assert_eq!(readiness.failed[0].bind_addr, addr);
4415 }
4416}