1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, InProcessRouterConfig};
12use crate::storage::StorageProfileSelection;
13use crate::{
14 GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
15};
16
17pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
18
19pub fn detect_runtime_config() -> RuntimeConfig {
21 let cpus = thread::available_parallelism()
22 .map(|n| n.get())
23 .unwrap_or(1);
24
25 let suggested_workers = cpus.saturating_sub(1).max(1);
27
28 RuntimeConfig {
29 available_cpus: cpus,
30 suggested_workers,
31 stack_size: 8 * 1024 * 1024, }
33}
34
35#[derive(Debug, Clone)]
36pub struct RuntimeConfig {
37 pub available_cpus: usize,
38 pub suggested_workers: usize,
39 pub stack_size: usize,
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum ServerTransport {
44 Grpc,
45 Http,
46 Wire,
47}
48
49impl ServerTransport {
50 pub const fn as_str(self) -> &'static str {
51 match self {
52 Self::Grpc => "gRPC",
53 Self::Http => "HTTP",
54 Self::Wire => "wire",
55 }
56 }
57
58 pub const fn default_bind_addr(self) -> &'static str {
59 match self {
60 Self::Grpc => "127.0.0.1:55055",
61 Self::Http => "127.0.0.1:5000",
62 Self::Wire => "127.0.0.1:5050",
63 }
64 }
65}
66
67#[derive(Debug, Clone)]
68pub struct ServerCommandConfig {
69 pub path: Option<PathBuf>,
70 pub router_bind_addr: Option<String>,
71 pub router_bind_explicit: bool,
72 pub grpc_bind_addr: Option<String>,
73 pub grpc_bind_explicit: bool,
74 pub grpc_tls_bind_addr: Option<String>,
78 pub grpc_tls_cert: Option<PathBuf>,
84 pub grpc_tls_key: Option<PathBuf>,
87 pub grpc_tls_client_ca: Option<PathBuf>,
92 pub http_bind_addr: Option<String>,
93 pub http_bind_explicit: bool,
94 pub http_tls_bind_addr: Option<String>,
98 pub http_tls_cert: Option<PathBuf>,
101 pub http_tls_key: Option<PathBuf>,
104 pub http_tls_client_ca: Option<PathBuf>,
108 pub wire_bind_addr: Option<String>,
109 pub wire_bind_explicit: bool,
110 pub wire_tls_bind_addr: Option<String>,
112 pub wire_tls_cert: Option<PathBuf>,
114 pub wire_tls_key: Option<PathBuf>,
116 pub pg_bind_addr: Option<String>,
120 pub create_if_missing: bool,
121 pub read_only: bool,
122 pub role: String,
123 pub primary_addr: Option<String>,
124 pub storage_profile: StorageProfileSelection,
125 pub auth: bool,
128 pub require_auth: bool,
130 pub vault: bool,
131 pub no_auth: bool,
142 pub workers: Option<usize>,
144 pub telemetry: Option<crate::telemetry::TelemetryConfig>,
147 pub http_limits_cli: crate::server::HttpLimitsCliInput,
152 pub ui: bool,
157 pub ui_dir: Option<PathBuf>,
161 pub bootstrap: BootstrapConfig,
164}
165
166#[derive(Debug, Clone, Default)]
167pub struct BootstrapConfig {
168 pub preset: Option<String>,
169 pub manifest: Option<PathBuf>,
170 pub admin_username: Option<String>,
171 pub admin_password: Option<String>,
172 pub cloud_head_admin: Option<String>,
173 pub cloud_head_admin_password: Option<String>,
174 pub customer_admin: Option<String>,
175 pub customer_admin_password: Option<String>,
176}
177
178#[derive(Debug, Clone, PartialEq, Eq)]
179pub struct TransportListenerState {
180 pub transport: String,
181 pub bind_addr: String,
182 pub explicit: bool,
183}
184
185#[derive(Debug, Clone, PartialEq, Eq)]
186pub struct TransportListenerFailure {
187 pub transport: String,
188 pub bind_addr: String,
189 pub explicit: bool,
190 pub reason: String,
191}
192
193#[derive(Debug, Clone, Default, PartialEq, Eq)]
194pub struct TransportReadiness {
195 pub active: Vec<TransportListenerState>,
196 pub failed: Vec<TransportListenerFailure>,
197}
198
199impl TransportReadiness {
200 fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
201 self.active.push(TransportListenerState {
202 transport: transport.to_string(),
203 bind_addr: bind_addr.to_string(),
204 explicit,
205 });
206 }
207
208 fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
209 self.failed.push(TransportListenerFailure {
210 transport: transport.to_string(),
211 bind_addr: bind_addr.to_string(),
212 explicit,
213 reason,
214 });
215 }
216}
217
218#[derive(Debug, Clone)]
219pub struct SystemdServiceConfig {
220 pub service_name: String,
221 pub binary_path: PathBuf,
222 pub run_user: String,
223 pub run_group: String,
224 pub data_path: PathBuf,
225 pub router_bind_addr: Option<String>,
226 pub grpc_bind_addr: Option<String>,
227 pub http_bind_addr: Option<String>,
228}
229
230impl SystemdServiceConfig {
231 pub fn data_dir(&self) -> PathBuf {
232 self.data_path
233 .parent()
234 .map(PathBuf::from)
235 .unwrap_or_else(|| PathBuf::from("."))
236 }
237
238 pub fn unit_path(&self) -> PathBuf {
239 PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
240 }
241}
242
243pub fn default_telemetry_for_path(
248 path: Option<&std::path::Path>,
249) -> crate::telemetry::TelemetryConfig {
250 let log_dir = match path {
251 Some(p) => p
252 .parent()
253 .map(|parent| parent.join("logs"))
254 .or_else(|| Some(std::path::PathBuf::from("./logs"))),
255 None => None, };
257 crate::telemetry::TelemetryConfig {
258 log_dir,
259 file_prefix: "reddb.log".to_string(),
260 level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
261 format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
262 crate::telemetry::LogFormat::Pretty
263 } else {
264 crate::telemetry::LogFormat::Json
265 },
266 rotation_keep_days: 14,
267 service_name: "reddb",
268 level_explicit: false,
270 format_explicit: false,
271 rotation_keep_days_explicit: false,
272 file_prefix_explicit: false,
273 log_dir_explicit: false,
274 log_file_disabled: false,
275 }
276}
277
278const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
285const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
286const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
287const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";
291
292pub(crate) const NO_AUTH_META: &str = "red.boot.no_auth";
300
301pub(crate) fn no_auth_active(options: &RedDBOptions) -> bool {
304 options
305 .metadata
306 .get(NO_AUTH_META)
307 .map(|v| v == "true")
308 .unwrap_or(false)
309}
310
311const NO_AUTH_WARNING: &str =
316 "⚠ auth disabled (--no-auth) — anonymous access, do NOT use in production";
317
318impl ServerCommandConfig {
319 fn to_db_options(&self) -> Result<RedDBOptions, String> {
320 let mut options = match &self.path {
321 Some(path) => RedDBOptions::persistent(path),
322 None => RedDBOptions::in_memory(),
323 };
324
325 options.mode = StorageMode::Persistent;
326 options.create_if_missing = self.create_if_missing;
327 options.read_only = self.read_only
334 || env_nonempty("RED_READONLY")
335 .or_else(|| env_nonempty("REDDB_READONLY"))
336 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
337 .unwrap_or(false)
338 || self.path.as_ref().is_some_and(|data_path| {
339 crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
340 data_path,
341 ))
342 .unwrap_or(false)
343 });
344
345 options.replication = match self.role.as_str() {
346 "primary" => ReplicationConfig::primary(),
347 "replica" => {
348 let primary_addr = self
349 .primary_addr
350 .clone()
351 .unwrap_or_else(|| "http://127.0.0.1:55055".to_string());
352 ReplicationConfig::replica(primary_addr)
359 }
360 _ => ReplicationConfig::standalone(),
361 };
362 options.storage_profile = self.storage_profile.validate()?;
363
364 let no_auth = self.no_auth || env_truthy("REDDB_NO_AUTH") || env_truthy("REDDB_DEV");
365 let preset = self.bootstrap.resolved_preset();
366 let preset_requires_auth = matches!(preset.as_str(), PRESET_PRODUCTION | PRESET_CLOUD);
367
368 if self.auth || env_truthy("REDDB_AUTH") || preset_requires_auth {
369 options.auth.enabled = true;
370 }
371 if self.require_auth || env_truthy("REDDB_REQUIRE_AUTH") || preset_requires_auth {
372 options.auth.enabled = true;
373 options.auth.require_auth = true;
374 }
375 if self.vault || env_truthy("REDDB_VAULT") || preset_requires_auth {
376 options.auth.vault_enabled = true;
377 }
378
379 if no_auth {
390 options.auth.enabled = false;
391 options.auth.require_auth = false;
392 options.auth.vault_enabled = false;
393 options
394 .metadata
395 .insert(NO_AUTH_META.to_string(), "true".to_string());
396 }
397
398 if let Some(raw) = env_nonempty("REDDB_COMPLIANCE_MODE") {
403 options.control_events.compliance_mode = matches!(
404 raw.to_ascii_lowercase().as_str(),
405 "1" | "true" | "yes" | "on"
406 );
407 }
408 if preset == PRESET_REGULATED {
409 options.control_events.compliance_mode = true;
410 options.query_audit = crate::runtime::query_audit::QueryAuditConfig::regulated();
411 }
412
413 match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
418 Err(msg) => {
419 return Err(format!("backup bootstrap: {msg}"));
420 }
421 Ok(Some(cfg)) => {
422 apply_backup_config(&mut options, &cfg);
423 }
424 Ok(None) => {
425 configure_remote_backend_from_env(&mut options);
426 }
427 }
428
429 if options.remote_backend.is_some()
430 || options
431 .metadata
432 .contains_key(BACKUP_INTERVAL_META_CHECKPOINT)
433 {
434 let mut selection = options.storage_profile;
435 selection.managed_backup = true;
436 options.storage_profile = selection.validate()?;
437 }
438
439 Ok(options)
440 }
441
442 pub fn enabled_transports(&self) -> Vec<ServerTransport> {
443 let mut transports = Vec::with_capacity(3);
444 if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
445 transports.push(ServerTransport::Grpc);
446 }
447 if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
448 transports.push(ServerTransport::Http);
449 }
450 if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
451 transports.push(ServerTransport::Wire);
452 }
453 transports
454 }
455}
456
457fn env_nonempty(name: &str) -> Option<String> {
462 crate::utils::env_with_file_fallback(name)
463}
464
465fn env_truthy(name: &str) -> bool {
466 env_nonempty(name)
467 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
468 .unwrap_or(false)
469}
470
471impl BootstrapConfig {
472 fn resolved_preset(&self) -> String {
473 self.preset
474 .as_deref()
475 .map(str::trim)
476 .filter(|s| !s.is_empty())
477 .map(str::to_string)
478 .or_else(|| env_nonempty(BOOTSTRAP_PRESET_ENV).or_else(|| env_nonempty(PRESET_ENV)))
479 .unwrap_or_else(|| PRESET_SIMPLE.to_string())
480 }
481
482 fn selected_manifest(&self) -> Option<PathBuf> {
483 self.manifest.clone().or_else(|| {
484 env_nonempty(crate::cli::bootstrap_manifest::MANIFEST_ENV).map(PathBuf::from)
485 })
486 }
487
488 fn auth_bootstrap_input(&self) -> crate::cluster::AuthBootstrapInput {
492 use crate::cluster::AuthBootstrapInput;
493 if self.selected_manifest().is_some() {
494 return AuthBootstrapInput::Manifest;
495 }
496 let preset = self.resolved_preset();
497 let preset_creates_auth = matches!(
498 preset.as_str(),
499 PRESET_PRODUCTION | PRESET_CLOUD | PRESET_REGULATED
500 );
501 let credentials_present =
506 self.production_username().is_some() && self.production_password().is_some();
507 if preset_creates_auth || credentials_present {
508 AuthBootstrapInput::Env
509 } else {
510 AuthBootstrapInput::None
511 }
512 }
513
514 fn explicit_value(value: &Option<String>) -> Option<String> {
515 value
516 .as_deref()
517 .map(str::trim)
518 .filter(|s| !s.is_empty())
519 .map(str::to_string)
520 }
521
522 fn value_or_env(value: &Option<String>, env: &str) -> Option<String> {
523 Self::explicit_value(value).or_else(|| env_nonempty(env))
524 }
525
526 fn production_username(&self) -> Option<String> {
527 Self::value_or_env(&self.admin_username, "REDDB_USERNAME")
528 }
529
530 fn production_password(&self) -> Option<String> {
531 Self::value_or_env(&self.admin_password, "REDDB_PASSWORD")
532 }
533
534 fn cloud_head_username(&self) -> Option<String> {
535 Self::explicit_value(&self.cloud_head_admin)
536 .or_else(|| Self::explicit_value(&self.admin_username))
537 .or_else(|| env_nonempty("REDDB_CLOUD_HEAD_ADMIN"))
538 .or_else(|| env_nonempty("REDDB_USERNAME"))
539 }
540
541 fn cloud_head_password(&self) -> Option<String> {
542 Self::explicit_value(&self.cloud_head_admin_password)
543 .or_else(|| Self::explicit_value(&self.admin_password))
544 .or_else(|| env_nonempty("REDDB_CLOUD_HEAD_ADMIN_PASSWORD"))
545 .or_else(|| env_nonempty("REDDB_PASSWORD"))
546 }
547
548 fn customer_username(&self) -> Option<String> {
549 Self::value_or_env(&self.customer_admin, "REDDB_CUSTOMER_ADMIN")
550 }
551
552 fn customer_password(&self) -> Option<String> {
553 Self::value_or_env(
554 &self.customer_admin_password,
555 "REDDB_CUSTOMER_ADMIN_PASSWORD",
556 )
557 }
558
559 fn secret_env_vars_to_expand(&self, preset: &str) -> Vec<&'static str> {
560 let mut vars = Vec::new();
561 match preset {
562 PRESET_PRODUCTION => {
563 if Self::explicit_value(&self.admin_username).is_none() {
564 vars.push("REDDB_USERNAME");
565 }
566 if Self::explicit_value(&self.admin_password).is_none() {
567 vars.push("REDDB_PASSWORD");
568 }
569 }
570 PRESET_CLOUD => {
571 if Self::explicit_value(&self.cloud_head_admin).is_none()
572 && Self::explicit_value(&self.admin_username).is_none()
573 {
574 vars.push("REDDB_USERNAME");
575 }
576 if Self::explicit_value(&self.cloud_head_admin_password).is_none()
577 && Self::explicit_value(&self.admin_password).is_none()
578 {
579 vars.push("REDDB_CLOUD_HEAD_ADMIN_PASSWORD");
580 vars.push("REDDB_PASSWORD");
581 }
582 if Self::explicit_value(&self.customer_admin_password).is_none() {
583 vars.push("REDDB_CUSTOMER_ADMIN_PASSWORD");
584 }
585 }
586 _ => {}
587 }
588 vars
589 }
590}
591
592fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
598 let endpoint_host = endpoint_host(&cfg.endpoint);
599
600 options.metadata.insert(
601 BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
602 cfg.checkpoint_interval_secs.to_string(),
603 );
604 options.metadata.insert(
605 BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
606 cfg.wal_flush_interval_secs.to_string(),
607 );
608 options
609 .metadata
610 .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
611 options.metadata.insert(
612 BACKUP_PAUSE_ON_LAG_META.to_string(),
613 cfg.pause_on_lag_secs.to_string(),
614 );
615
616 #[cfg(feature = "backend-s3")]
617 {
618 let s3_cfg = crate::storage::backend::S3Config {
619 endpoint: cfg.endpoint.clone(),
620 bucket: cfg.bucket.clone(),
621 key_prefix: cfg.prefix.clone(),
622 access_key: cfg.access_key_id.clone(),
623 secret_key: cfg.secret_access_key.clone(),
624 region: cfg.region.clone(),
625 path_style: true,
626 };
627 let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
628 options.remote_backend = Some(backend.clone());
629 options.remote_backend_atomic = Some(backend);
630 let trimmed = cfg.prefix.trim_end_matches('/');
635 options.remote_key = Some(reddb_file::remote_database_key(trimmed));
636
637 tracing::info!(
638 backend = "s3",
639 endpoint = %endpoint_host,
640 bucket = %cfg.bucket,
641 prefix = %cfg.prefix,
642 checkpoint_interval_secs = cfg.checkpoint_interval_secs,
643 wal_flush_interval_secs = cfg.wal_flush_interval_secs,
644 "backup backend configured from REDDB_BACKUP_* env"
645 );
646 }
647
648 #[cfg(not(feature = "backend-s3"))]
649 {
650 tracing::warn!(
651 backend = "s3",
652 endpoint = %endpoint_host,
653 bucket = %cfg.bucket,
654 prefix = %cfg.prefix,
655 "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
656 backend wiring skipped (archiver/checkpointer also disabled)"
657 );
658 }
659}
660
661fn endpoint_host(endpoint: &str) -> &str {
662 let after_scheme = endpoint
663 .split_once("://")
664 .map(|(_, r)| r)
665 .unwrap_or(endpoint);
666 after_scheme.split('/').next().unwrap_or(after_scheme)
667}
668
669fn spawn_backup_tasks_if_configured(
675 options: &RedDBOptions,
676 runtime: &RedDBRuntime,
677) -> Option<BackupTasksHandle> {
678 let checkpoint_secs: u64 = options
679 .metadata
680 .get(BACKUP_INTERVAL_META_CHECKPOINT)?
681 .parse()
682 .ok()?;
683 let wal_secs: u64 = options
684 .metadata
685 .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
686 .parse()
687 .ok()?;
688 let pause_on_lag_secs: u64 = options
691 .metadata
692 .get(BACKUP_PAUSE_ON_LAG_META)
693 .and_then(|raw| raw.parse().ok())
694 .unwrap_or(0);
695 options.remote_backend.as_ref()?;
696
697 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
698
699 if pause_on_lag_secs > 0 {
704 let now_ms = std::time::SystemTime::now()
705 .duration_since(std::time::UNIX_EPOCH)
706 .map(|d| d.as_millis() as u64)
707 .unwrap_or(0);
708 runtime
709 .write_gate()
710 .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
711 tracing::info!(
712 pause_on_lag_secs,
713 "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
714 );
715 }
716
717 let checkpoint_handle = {
718 let stop = Arc::clone(&stop);
719 let runtime = runtime.clone();
720 let interval = Duration::from_secs(checkpoint_secs);
721 thread::Builder::new()
722 .name("red-checkpointer".into())
723 .spawn(move || {
724 periodic_loop(stop, interval, move || {
725 if let Err(err) = runtime.checkpoint() {
726 tracing::warn!(error = %err, "periodic checkpoint failed");
727 }
728 })
729 })
730 .ok()
731 };
732
733 let archiver_handle = {
734 let stop = Arc::clone(&stop);
735 let runtime = runtime.clone();
736 let interval = Duration::from_secs(wal_secs);
737 let lag_enabled = pause_on_lag_secs > 0;
738 thread::Builder::new()
739 .name("red-wal-archiver".into())
740 .spawn(move || {
741 periodic_loop(stop, interval, move || match runtime.trigger_backup() {
742 Ok(_) if lag_enabled => {
743 let now_ms = std::time::SystemTime::now()
744 .duration_since(std::time::UNIX_EPOCH)
745 .map(|d| d.as_millis() as u64)
746 .unwrap_or(0);
747 runtime.write_gate().record_archive_success(now_ms);
748 runtime.write_gate().evaluate_archive_lag(now_ms);
752 }
753 Ok(_) => {}
754 Err(err) => {
755 tracing::warn!(error = %err, "periodic WAL archive/backup failed");
756 }
757 })
758 })
759 .ok()
760 };
761
762 let lag_monitor_handle = if pause_on_lag_secs > 0 {
767 let stop = Arc::clone(&stop);
768 let runtime = runtime.clone();
769 let interval = Duration::from_secs(5);
773 thread::Builder::new()
774 .name("red-archive-lag-monitor".into())
775 .spawn(move || {
776 periodic_loop(stop, interval, move || {
777 let now_ms = std::time::SystemTime::now()
778 .duration_since(std::time::UNIX_EPOCH)
779 .map(|d| d.as_millis() as u64)
780 .unwrap_or(0);
781 let was_paused = runtime.write_gate().is_auto_paused();
782 let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
783 if now_paused && !was_paused {
784 tracing::warn!(
785 pause_on_lag_secs,
786 last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
787 "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
788 );
789 } else if !now_paused && was_paused {
790 tracing::info!(
791 "WAL archive caught up — exiting graceful read-only mode (issue #519)"
792 );
793 }
794 })
795 })
796 .ok()
797 } else {
798 None
799 };
800
801 tracing::info!(
802 checkpoint_interval_secs = checkpoint_secs,
803 wal_flush_interval_secs = wal_secs,
804 "backup tasks spawned (checkpointer + WAL archiver)"
805 );
806
807 Some(BackupTasksHandle {
808 stop,
809 _checkpoint_handle: checkpoint_handle,
810 _archiver_handle: archiver_handle,
811 _lag_monitor_handle: lag_monitor_handle,
812 })
813}
814
815pub struct BackupTasksHandle {
818 stop: Arc<std::sync::atomic::AtomicBool>,
819 _checkpoint_handle: Option<thread::JoinHandle<()>>,
820 _archiver_handle: Option<thread::JoinHandle<()>>,
821 _lag_monitor_handle: Option<thread::JoinHandle<()>>,
824}
825
826impl Drop for BackupTasksHandle {
827 fn drop(&mut self) {
828 self.stop.store(true, std::sync::atomic::Ordering::Release);
829 }
830}
831
832fn periodic_loop<F: FnMut()>(
833 stop: Arc<std::sync::atomic::AtomicBool>,
834 interval: Duration,
835 mut tick: F,
836) {
837 let wake = Duration::from_secs(1);
840 let mut elapsed = Duration::ZERO;
841 while !stop.load(std::sync::atomic::Ordering::Acquire) {
842 thread::sleep(wake);
843 elapsed += wake;
844 if elapsed >= interval {
845 tick();
846 elapsed = Duration::ZERO;
847 }
848 }
849}
850
851fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
852 let backend = env_nonempty("RED_BACKEND")
858 .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
859 .unwrap_or_else(|| "none".to_string())
860 .to_ascii_lowercase();
861
862 match backend.as_str() {
863 "s3" | "minio" | "r2" => {
868 #[cfg(feature = "backend-s3")]
869 {
870 if let Some(config) = s3_config_from_env() {
871 let remote_key = env_nonempty("RED_REMOTE_KEY")
872 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
873 .unwrap_or_else(|| reddb_file::remote_database_key("clusters/dev"));
874 let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
875 options.remote_backend = Some(backend.clone());
876 options.remote_backend_atomic = Some(backend);
877 options.remote_key = Some(remote_key);
878 }
879 }
880 #[cfg(not(feature = "backend-s3"))]
881 {
882 tracing::warn!(
883 backend = %backend,
884 "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
885 );
886 }
887 }
888 "fs" | "local" => {
893 let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
894 let remote_key = match (
895 base_path,
896 env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
897 ) {
898 (Some(base), Some(rel)) => Some(format!(
899 "{}/{}",
900 base.trim_end_matches('/'),
901 rel.trim_start_matches('/')
902 )),
903 (Some(base), None) => Some(reddb_file::remote_database_key(&format!(
904 "{}/clusters/dev",
905 base.trim_end_matches('/')
906 ))),
907 (None, Some(rel)) => Some(rel),
908 (None, None) => None,
909 };
910 if let Some(key) = remote_key {
911 let backend = Arc::new(crate::storage::backend::LocalBackend);
912 options.remote_backend = Some(backend.clone());
913 options.remote_backend_atomic = Some(backend);
914 options.remote_key = Some(key);
915 }
916 }
917 "http" => {
922 let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
923 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
924 {
925 Some(u) => u,
926 None => {
927 tracing::warn!(
928 "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
929 );
930 return;
931 }
932 };
933 let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
934 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
935 .unwrap_or_default();
936 let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
937 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
938 {
939 std::fs::read_to_string(&path)
940 .ok()
941 .map(|s| s.trim().to_string())
942 .filter(|s| !s.is_empty())
943 } else {
944 env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
945 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
946 };
947
948 let mut config =
949 crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
950 if let Some(auth) = auth_header {
951 config = config.with_auth_header(auth);
952 }
953 let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
954 || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
955 || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
956 config = config.with_conditional_writes(conditional_writes);
957 if conditional_writes {
962 match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
963 Ok(atomic) => {
964 let atomic_arc = Arc::new(atomic);
965 options.remote_backend = Some(atomic_arc.clone());
966 options.remote_backend_atomic = Some(atomic_arc);
967 }
968 Err(err) => {
969 tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
970 options.remote_backend =
971 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
972 }
973 }
974 } else {
975 options.remote_backend =
976 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
977 }
978 options.remote_key = env_nonempty("RED_REMOTE_KEY")
979 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
980 .or_else(|| Some(reddb_file::remote_database_key("clusters/dev")));
981 }
982 "none" | "" => {}
985 other => {
986 tracing::warn!(
987 backend = %other,
988 "unknown RED_BACKEND value — supported: s3 | fs | http | none"
989 );
990 }
991 }
992}
993
994#[cfg(feature = "backend-s3")]
999fn env_s3(suffix: &str) -> Option<String> {
1000 env_nonempty(&format!("RED_S3_{suffix}"))
1001 .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}")))
1002}
1003
1004#[cfg(feature = "backend-s3")]
1010fn env_s3_secret(suffix: &str) -> Option<String> {
1011 let file_key_red = format!("RED_S3_{suffix}_FILE");
1012 let file_key_legacy = format!("REDDB_S3_{suffix}_FILE");
1013 if let Some(path) = env_nonempty(&file_key_red).or_else(|| env_nonempty(&file_key_legacy)) {
1014 return std::fs::read_to_string(&path)
1015 .ok()
1016 .map(|s| s.trim().to_string())
1017 .filter(|s| !s.is_empty());
1018 }
1019 env_s3(suffix)
1020}
1021
1022#[cfg(feature = "backend-s3")]
1023fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
1024 let endpoint = env_s3("ENDPOINT")?;
1025 let bucket = env_s3("BUCKET")?;
1026 let access_key = env_s3_secret("ACCESS_KEY")?;
1027 let secret_key = env_s3_secret("SECRET_KEY")?;
1028 let region = env_s3("REGION").unwrap_or_else(|| "us-east-1".to_string());
1029 let key_prefix = env_s3("KEY_PREFIX")
1030 .or_else(|| env_s3("PREFIX"))
1031 .unwrap_or_default();
1032 let path_style = env_s3("PATH_STYLE")
1033 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1034 .unwrap_or(true);
1035 Some(crate::storage::backend::S3Config {
1036 endpoint,
1037 bucket,
1038 key_prefix,
1039 access_key,
1040 secret_key,
1041 region,
1042 path_style,
1043 })
1044}
1045
1046pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
1047 let data_dir = config.data_dir();
1048 let exec_start = render_systemd_exec_start(config);
1049 format!(
1050 "[Unit]\n\
1051Description=RedDB unified database service\n\
1052After=network-online.target\n\
1053Wants=network-online.target\n\
1054\n\
1055[Service]\n\
1056Type=simple\n\
1057User={user}\n\
1058Group={group}\n\
1059WorkingDirectory={workdir}\n\
1060ExecStart={exec_start}\n\
1061Restart=always\n\
1062RestartSec=2\n\
1063LimitSTACK=16M\n\
1064NoNewPrivileges=true\n\
1065PrivateTmp=true\n\
1066ProtectSystem=strict\n\
1067ProtectHome=true\n\
1068ProtectControlGroups=true\n\
1069ProtectKernelTunables=true\n\
1070ProtectKernelModules=true\n\
1071RestrictNamespaces=true\n\
1072LockPersonality=true\n\
1073MemoryDenyWriteExecute=true\n\
1074ReadWritePaths={workdir}\n\
1075\n\
1076[Install]\n\
1077WantedBy=multi-user.target\n",
1078 user = config.run_user,
1079 group = config.run_group,
1080 workdir = data_dir.display(),
1081 exec_start = exec_start,
1082 )
1083}
1084
1085#[cfg(target_os = "linux")]
1094pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
1095 ensure_root()?;
1096 ensure_command_available("systemctl")?;
1097 ensure_command_available("getent")?;
1098 ensure_command_available("groupadd")?;
1099 ensure_command_available("useradd")?;
1100 ensure_command_available("install")?;
1101 ensure_executable(&config.binary_path)?;
1102
1103 if !command_success("getent", ["group", config.run_group.as_str()])? {
1104 run_command("groupadd", ["--system", config.run_group.as_str()])?;
1105 }
1106
1107 if !command_success("id", ["-u", config.run_user.as_str()])? {
1108 let data_dir = config.data_dir();
1109 run_command(
1110 "useradd",
1111 [
1112 "--system",
1113 "--gid",
1114 config.run_group.as_str(),
1115 "--home-dir",
1116 data_dir.to_string_lossy().as_ref(),
1117 "--shell",
1118 "/usr/sbin/nologin",
1119 config.run_user.as_str(),
1120 ],
1121 )?;
1122 }
1123
1124 let data_dir = config.data_dir();
1125 run_command(
1126 "install",
1127 [
1128 "-d",
1129 "-o",
1130 config.run_user.as_str(),
1131 "-g",
1132 config.run_group.as_str(),
1133 "-m",
1134 "0750",
1135 data_dir.to_string_lossy().as_ref(),
1136 ],
1137 )?;
1138
1139 std::fs::write(config.unit_path(), render_systemd_unit(config))
1140 .map_err(|err| format!("failed to write systemd unit: {err}"))?;
1141
1142 run_command("systemctl", ["daemon-reload"])?;
1143 run_command(
1144 "systemctl",
1145 [
1146 "enable",
1147 "--now",
1148 format!("{}.service", config.service_name).as_str(),
1149 ],
1150 )?;
1151
1152 Ok(())
1153}
1154
1155#[cfg(not(target_os = "linux"))]
1160pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
1161 Err("systemd install is Linux-only — use sc.exe (Windows) or \
1162 launchd (macOS) to install the service manually using the \
1163 unit printed by `red service print-unit`"
1164 .to_string())
1165}
1166
1167#[cfg(target_os = "linux")]
1168fn ensure_root() -> Result<(), String> {
1169 let output = Command::new("id")
1170 .arg("-u")
1171 .output()
1172 .map_err(|err| format!("failed to determine current uid: {err}"))?;
1173 if !output.status.success() {
1174 return Err("failed to determine current uid".to_string());
1175 }
1176 let uid = String::from_utf8_lossy(&output.stdout);
1177 if uid.trim() != "0" {
1178 return Err("run this command as root (sudo)".to_string());
1179 }
1180 Ok(())
1181}
1182
1183#[cfg(target_os = "linux")]
1184fn ensure_command_available(command: &str) -> Result<(), String> {
1185 let status = Command::new("sh")
1186 .args(["-lc", &format!("command -v {command} >/dev/null 2>&1")])
1187 .status()
1188 .map_err(|err| format!("failed to check command '{command}': {err}"))?;
1189 if status.success() {
1190 Ok(())
1191 } else {
1192 Err(format!("required command not found: {command}"))
1193 }
1194}
1195
1196#[cfg(target_os = "linux")]
1197fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
1198 let metadata = std::fs::metadata(path)
1199 .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
1200 #[cfg(unix)]
1201 {
1202 use std::os::unix::fs::PermissionsExt;
1203 if metadata.permissions().mode() & 0o111 == 0 {
1204 return Err(format!("binary is not executable: {}", path.display()));
1205 }
1206 }
1207 #[cfg(not(unix))]
1208 {
1209 if !metadata.is_file() {
1210 return Err(format!("binary is not a file: {}", path.display()));
1211 }
1212 }
1213 Ok(())
1214}
1215
1216#[cfg(target_os = "linux")]
1217fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
1218 Command::new(program)
1219 .args(args)
1220 .status()
1221 .map(|status| status.success())
1222 .map_err(|err| format!("failed to run {program}: {err}"))
1223}
1224
1225#[cfg(target_os = "linux")]
1226fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
1227 let output = Command::new(program)
1228 .args(args)
1229 .output()
1230 .map_err(|err| format!("failed to run {program}: {err}"))?;
1231 if output.status.success() {
1232 return Ok(());
1233 }
1234
1235 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
1236 let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
1237 let detail = if !stderr.is_empty() {
1238 stderr
1239 } else if !stdout.is_empty() {
1240 stdout
1241 } else {
1242 format!("exit status {}", output.status)
1243 };
1244 Err(format!("{program} failed: {detail}"))
1245}
1246
1247pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1248 let has_any = config.router_bind_addr.is_some()
1249 || config.grpc_bind_addr.is_some()
1250 || config.http_bind_addr.is_some()
1251 || config.wire_bind_addr.is_some()
1252 || config.pg_bind_addr.is_some();
1253 if !has_any {
1254 return Err("at least one server bind address must be configured".into());
1255 }
1256 let thread_name = if config.router_bind_addr.is_some() {
1257 "red-server-router"
1258 } else {
1259 match (
1260 config.grpc_bind_addr.is_some(),
1261 config.http_bind_addr.is_some(),
1262 ) {
1263 (true, true) => "red-server-dual",
1264 (true, false) => "red-server-grpc",
1265 (false, true) => "red-server-http",
1266 (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1267 (false, false) => "red-server-pg-wire",
1268 }
1269 };
1270
1271 let handle = thread::Builder::new()
1272 .name(thread_name.into())
1273 .stack_size(8 * 1024 * 1024)
1274 .spawn(move || run_configured_servers(config))
1275 .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1276
1277 match handle.join() {
1278 Ok(result) => result,
1279 Err(_) => Err("server thread panicked".to_string()),
1280 }
1281}
1282
1283fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1284 let mut parts = vec![
1285 config.binary_path.display().to_string(),
1286 "server".to_string(),
1287 "--path".to_string(),
1288 config.data_path.display().to_string(),
1289 ];
1290
1291 if let Some(bind_addr) = &config.router_bind_addr {
1292 parts.push("--bind".to_string());
1293 parts.push(bind_addr.clone());
1294 } else if let Some(bind_addr) = &config.grpc_bind_addr {
1295 parts.push("--grpc-bind".to_string());
1296 parts.push(bind_addr.clone());
1297 }
1298 if let Some(bind_addr) = &config.http_bind_addr {
1299 parts.push("--http-bind".to_string());
1300 parts.push(bind_addr.clone());
1301 }
1302
1303 parts.join(" ")
1304}
1305
1306pub fn probe_listener(target: &str, timeout: Duration) -> bool {
1307 let addresses: Vec<SocketAddr> = match target.to_socket_addrs() {
1308 Ok(addresses) => addresses.collect(),
1309 Err(_) => return false,
1310 };
1311
1312 addresses
1313 .into_iter()
1314 .any(|address| TcpStream::connect_timeout(&address, timeout).is_ok())
1315}
1316
1317#[inline(never)]
1318fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1319 if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1325 return run_routed_server(config, router_bind_addr);
1326 }
1327
1328 match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1329 (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1330 run_dual_server(config, grpc_bind_addr, http_bind_addr)
1331 }
1332 (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1333 (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1334 (None, None) => {
1335 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1336 run_wire_only_server(config, wire_addr)
1337 } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1338 run_pg_only_server(config, pg_addr)
1339 } else {
1340 Err("at least one server bind address must be configured".to_string())
1341 }
1342 }
1343 }
1344}
1345
1346pub fn bind_listener_for_startup(
1364 readiness: &mut TransportReadiness,
1365 transport: &str,
1366 bind_addr: &str,
1367 explicit: bool,
1368) -> Result<Option<TcpListener>, String> {
1369 match TcpListener::bind(bind_addr) {
1370 Ok(listener) => {
1371 readiness.active(transport, bind_addr, explicit);
1372 Ok(Some(listener))
1373 }
1374 Err(err) => {
1375 let reason = format!("{transport} listener bind {bind_addr}: {err}");
1376 readiness.failed(transport, bind_addr, explicit, reason.clone());
1377 if explicit {
1378 tracing::error!(
1379 transport,
1380 bind = %bind_addr,
1381 error = %err,
1382 "fatal explicit bind failure"
1383 );
1384 Err(format!("explicit {reason}"))
1385 } else {
1386 tracing::warn!(
1387 transport,
1388 bind = %bind_addr,
1389 error = %err,
1390 "non-fatal implicit bind failure; listener degraded"
1391 );
1392 Ok(None)
1393 }
1394 }
1395 }
1396}
1397
1398async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
1421 let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
1422 .ok()
1423 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1424 .unwrap_or(true);
1425
1426 #[cfg(unix)]
1427 {
1428 use tokio::signal::unix::{signal, SignalKind};
1429
1430 let mut sigterm = match signal(SignalKind::terminate()) {
1431 Ok(s) => s,
1432 Err(err) => {
1433 tracing::warn!(
1434 error = %err,
1435 "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
1436 );
1437 return;
1438 }
1439 };
1440 let mut sigint = match signal(SignalKind::interrupt()) {
1441 Ok(s) => s,
1442 Err(err) => {
1443 tracing::warn!(error = %err, "could not install SIGINT handler");
1444 return;
1445 }
1446 };
1447 let mut sighup = match signal(SignalKind::hangup()) {
1453 Ok(s) => Some(s),
1454 Err(err) => {
1455 tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
1456 None
1457 }
1458 };
1459
1460 let reload_runtime = runtime.clone();
1461 tokio::spawn(async move {
1462 loop {
1463 let signal_name = match &mut sighup {
1464 Some(hup) => tokio::select! {
1465 _ = sigterm.recv() => "SIGTERM",
1466 _ = sigint.recv() => "SIGINT",
1467 _ = hup.recv() => "SIGHUP",
1468 },
1469 None => tokio::select! {
1470 _ = sigterm.recv() => "SIGTERM",
1471 _ = sigint.recv() => "SIGINT",
1472 },
1473 };
1474
1475 if signal_name == "SIGHUP" {
1476 handle_sighup_reload(&reload_runtime);
1477 continue; }
1479
1480 tracing::info!(
1481 signal = signal_name,
1482 "lifecycle signal received; shutting down"
1483 );
1484 match runtime.graceful_shutdown(backup_on_shutdown) {
1485 Ok(report) => {
1486 tracing::info!(
1487 duration_ms = report.duration_ms,
1488 flushed_wal = report.flushed_wal,
1489 final_checkpoint = report.final_checkpoint,
1490 backup_uploaded = report.backup_uploaded,
1491 "graceful shutdown complete"
1492 );
1493 }
1494 Err(err) => {
1495 tracing::error!(error = %err, "graceful shutdown failed");
1496 crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
1502 reason: format!("graceful shutdown failed: {err}"),
1503 }
1504 .emit_global();
1505 }
1506 }
1507 std::process::exit(0);
1508 }
1509 });
1510 }
1511
1512 #[cfg(not(unix))]
1513 {
1514 tokio::spawn(async move {
1515 let interrupted = tokio::signal::ctrl_c().await;
1516 if let Err(err) = interrupted {
1517 tracing::warn!(error = %err, "could not install Ctrl+C handler");
1518 return;
1519 }
1520
1521 tracing::info!(
1522 signal = "Ctrl+C",
1523 "lifecycle signal received; shutting down"
1524 );
1525 match runtime.graceful_shutdown(backup_on_shutdown) {
1526 Ok(report) => {
1527 tracing::info!(
1528 duration_ms = report.duration_ms,
1529 flushed_wal = report.flushed_wal,
1530 final_checkpoint = report.final_checkpoint,
1531 backup_uploaded = report.backup_uploaded,
1532 "graceful shutdown complete"
1533 );
1534 }
1535 Err(err) => {
1536 tracing::error!(error = %err, "graceful shutdown failed");
1537 }
1538 }
1539 std::process::exit(0);
1540 });
1541 }
1542}
1543
1544fn handle_sighup_reload(runtime: &RedDBRuntime) {
1553 let now_ms = std::time::SystemTime::now()
1554 .duration_since(std::time::UNIX_EPOCH)
1555 .map(|d| d.as_millis() as u64)
1556 .unwrap_or(0);
1557 tracing::info!(
1558 target: "reddb::secrets",
1559 ts_unix_ms = now_ms,
1560 "SIGHUP received; secrets will be re-read from *_FILE on next access"
1561 );
1562 use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1567 runtime.audit_log().record_event(
1568 AuditEvent::builder("config/sighup_reload")
1569 .source(AuditAuthSource::System)
1570 .resource("secrets")
1571 .outcome(Outcome::Success)
1572 .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1573 .build(),
1574 );
1575}
1576
1577#[inline(never)]
1578fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
1579 let workers = config.workers;
1580 let db_options = config.to_db_options()?;
1581 let rt_config = detect_runtime_config();
1582 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1583 let (runtime, auth_store, _telemetry_guard) =
1584 build_runtime_and_auth_store(&config, db_options.clone())?;
1585 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1586
1587 spawn_admin_metrics_listeners(&runtime, &auth_store);
1588
1589 let http_server = build_http_server(
1595 runtime.clone(),
1596 auth_store.clone(),
1597 router_bind_addr.clone(),
1598 );
1599 let http_server = apply_http_limits(http_server, &config, &runtime);
1600 let http_server = apply_ui_bundle(http_server, &config)?;
1601
1602 let grpc_server = RedDBGrpcServer::with_options(
1603 runtime.clone(),
1604 GrpcServerOptions {
1605 bind_addr: router_bind_addr.clone(),
1606 tls: None,
1607 },
1608 auth_store,
1609 );
1610
1611 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1612 .enable_all()
1613 .worker_threads(worker_threads)
1614 .thread_stack_size(rt_config.stack_size)
1615 .build()
1616 .map_err(|err| format!("tokio runtime: {err}"))?;
1617
1618 let signal_runtime = runtime.clone();
1619 let wire_runtime = Arc::new(runtime);
1620 tokio_runtime.block_on(async move {
1621 spawn_lifecycle_signal_handler(signal_runtime).await;
1622 tracing::info!(
1623 bind = %router_bind_addr,
1624 cpus = rt_config.available_cpus,
1625 workers = worker_threads,
1626 "router bootstrapping"
1627 );
1628 serve_tcp_router(InProcessRouterConfig {
1629 bind_addr: router_bind_addr,
1630 http_server,
1631 grpc_server,
1632 wire_runtime,
1633 })
1634 .await
1635 .map_err(|err| err.to_string())
1636 })
1637}
1638
1639async fn spawn_wire_listeners(
1641 config: &ServerCommandConfig,
1642 runtime: &RedDBRuntime,
1643 readiness: &mut TransportReadiness,
1644) -> Result<(), String> {
1645 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1647 let wire_rt = Arc::new(runtime.clone());
1648 #[cfg(unix)]
1651 {
1652 if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1653 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1654 tokio::spawn(async move {
1655 if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1656 &wire_addr, wire_rt,
1657 )
1658 .await
1659 {
1660 tracing::error!(err = %e, "redwire unix listener error");
1661 }
1662 });
1663 return Ok(());
1664 }
1665 }
1666 match tokio::net::TcpListener::bind(&wire_addr).await {
1667 Ok(listener) => {
1668 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1669 tokio::spawn(async move {
1670 if let Err(e) =
1671 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1672 .await
1673 {
1674 tracing::error!(err = %e, "redwire listener error");
1675 }
1676 });
1677 }
1678 Err(err) => {
1679 let reason = format!("wire listener bind {wire_addr}: {err}");
1680 readiness.failed(
1681 "wire",
1682 &wire_addr,
1683 config.wire_bind_explicit,
1684 reason.clone(),
1685 );
1686 if config.wire_bind_explicit {
1687 tracing::error!(
1688 transport = "wire",
1689 bind = %wire_addr,
1690 error = %err,
1691 "fatal explicit bind failure"
1692 );
1693 return Err(format!("explicit {reason}"));
1694 }
1695 tracing::warn!(
1696 transport = "wire",
1697 bind = %wire_addr,
1698 error = %err,
1699 "non-fatal implicit bind failure; listener degraded"
1700 );
1701 }
1702 }
1703 }
1704
1705 if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1707 let tls_config = resolve_wire_tls_config(config);
1708 match tls_config {
1709 Ok(tls_cfg) => {
1710 let wire_rt = Arc::new(runtime.clone());
1711 tokio::spawn(async move {
1712 if let Err(e) =
1713 crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1714 .await
1715 {
1716 tracing::error!(err = %e, "redwire+tls listener error");
1717 }
1718 });
1719 }
1720 Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1721 }
1722 }
1723 Ok(())
1724}
1725
1726fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1733 if let Some(pg_addr) = config.pg_bind_addr.clone() {
1734 let rt = Arc::new(runtime.clone());
1735 tokio::spawn(async move {
1736 let cfg = crate::wire::PgWireConfig {
1737 bind_addr: pg_addr,
1738 ..Default::default()
1739 };
1740 if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1741 tracing::error!(err = %e, "pg wire listener error");
1742 }
1743 });
1744 }
1745}
1746
1747fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1761 use crate::utils::secret_file::expand_file_env;
1762
1763 for var in [
1767 "REDDB_GRPC_TLS_CERT",
1768 "REDDB_GRPC_TLS_KEY",
1769 "REDDB_GRPC_TLS_CLIENT_CA",
1770 ] {
1771 if let Err(err) = expand_file_env(var) {
1772 tracing::warn!(
1773 target: "reddb::secrets",
1774 env = %var,
1775 err = %err,
1776 "could not expand *_FILE companion for gRPC TLS"
1777 );
1778 }
1779 }
1780
1781 let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1782 (Some(cert), Some(key)) => {
1783 let cert_pem = std::fs::read(cert)
1784 .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1785 let key_pem =
1786 std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1787 (cert_pem, key_pem)
1788 }
1789 _ => {
1790 let dev = std::env::var("RED_GRPC_TLS_DEV")
1792 .ok()
1793 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1794 .unwrap_or(false);
1795 if !dev {
1796 return Err("gRPC TLS configured but no cert/key supplied — set \
1797 REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1798 RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1799 .to_string());
1800 }
1801 let dir = config
1802 .path
1803 .as_ref()
1804 .and_then(|p| p.parent())
1805 .map(PathBuf::from)
1806 .unwrap_or_else(|| PathBuf::from("."));
1807 let (cert_pem_str, key_pem_str) =
1808 crate::wire::tls::generate_self_signed_cert("localhost")
1809 .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1810
1811 let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1816 tracing::warn!(
1817 target: "reddb::security",
1818 transport = "grpc",
1819 cert_sha256 = %fp,
1820 "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1821 DO NOT use in production"
1822 );
1823 let cert_path = dir.join("grpc-tls-cert.pem");
1825 let key_path = dir.join("grpc-tls-key.pem");
1826 if !cert_path.exists() || !key_path.exists() {
1827 let _ = std::fs::create_dir_all(&dir);
1828 std::fs::write(&cert_path, cert_pem_str.as_bytes())
1829 .map_err(|e| format!("write grpc dev cert: {e}"))?;
1830 std::fs::write(&key_path, key_pem_str.as_bytes())
1831 .map_err(|e| format!("write grpc dev key: {e}"))?;
1832 #[cfg(unix)]
1833 {
1834 use std::os::unix::fs::PermissionsExt;
1835 let _ =
1836 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1837 }
1838 }
1839 (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1840 }
1841 };
1842
1843 let client_ca_pem = match &config.grpc_tls_client_ca {
1844 Some(path) => Some(
1845 std::fs::read(path)
1846 .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1847 ),
1848 None => None,
1849 };
1850
1851 Ok(crate::GrpcTlsOptions {
1852 cert_pem,
1853 key_pem,
1854 client_ca_pem,
1855 })
1856}
1857
1858fn spawn_grpc_tls_listener_if_configured(
1862 config: &ServerCommandConfig,
1863 runtime: RedDBRuntime,
1864 auth_store: Arc<AuthStore>,
1865) {
1866 let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1867 return;
1868 };
1869 let tls_opts = match resolve_grpc_tls_options(config) {
1870 Ok(opts) => opts,
1871 Err(err) => {
1872 tracing::error!(
1873 target: "reddb::security",
1874 transport = "grpc",
1875 err = %err,
1876 "gRPC TLS config error; TLS listener will not start"
1877 );
1878 return;
1879 }
1880 };
1881 tokio::spawn(async move {
1882 let server = RedDBGrpcServer::with_options(
1883 runtime,
1884 GrpcServerOptions {
1885 bind_addr: tls_bind.clone(),
1886 tls: Some(tls_opts),
1887 },
1888 auth_store,
1889 );
1890 tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1891 if let Err(err) = server.serve().await {
1892 tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1893 }
1894 });
1895}
1896
1897fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1900 use sha2::{Digest, Sha256};
1901 let mut h = Sha256::new();
1902 h.update(pem);
1903 let d = h.finalize();
1904 let mut buf = String::with_capacity(64);
1905 for b in d.iter() {
1906 buf.push_str(&format!("{b:02x}"));
1907 }
1908 buf
1909}
1910
1911fn resolve_wire_tls_config(
1913 config: &ServerCommandConfig,
1914) -> Result<crate::wire::WireTlsConfig, String> {
1915 match (&config.wire_tls_cert, &config.wire_tls_key) {
1916 (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1917 cert_path: cert.clone(),
1918 key_path: key.clone(),
1919 }),
1920 _ => {
1921 let dir = config
1923 .path
1924 .as_ref()
1925 .and_then(|p| p.parent())
1926 .map(PathBuf::from)
1927 .unwrap_or_else(|| PathBuf::from("."));
1928 crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1929 }
1930 }
1931}
1932
1933#[inline(never)]
1934fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1935 let rt_config = detect_runtime_config();
1936 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1937 let db_options = config.to_db_options()?;
1938 let mut transport_readiness = TransportReadiness::default();
1939
1940 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1941 .enable_all()
1942 .worker_threads(workers)
1943 .thread_stack_size(rt_config.stack_size)
1944 .build()
1945 .map_err(|err| format!("tokio runtime: {err}"))?;
1946
1947 let (runtime, _auth_store, _telemetry_guard) =
1951 build_runtime_and_auth_store(&config, db_options.clone())?;
1952 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1953 let signal_runtime = runtime.clone();
1954 tokio_runtime.block_on(async move {
1955 spawn_lifecycle_signal_handler(signal_runtime).await;
1956 spawn_pg_listener(&config, &runtime);
1957 let wire_rt = Arc::new(runtime);
1958 let listener = tokio::net::TcpListener::bind(&wire_addr)
1959 .await
1960 .map_err(|err| {
1961 let reason = format!("wire listener bind {wire_addr}: {err}");
1962 transport_readiness.failed(
1963 "wire",
1964 &wire_addr,
1965 config.wire_bind_explicit,
1966 reason.clone(),
1967 );
1968 if config.wire_bind_explicit {
1969 format!("explicit {reason}")
1970 } else {
1971 reason
1972 }
1973 })?;
1974 transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1975 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1976 .await
1977 .map_err(|e| e.to_string())
1978 })
1979}
1980
1981#[inline(never)]
1982fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1983 let rt_config = detect_runtime_config();
1984 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1985 let db_options = config.to_db_options()?;
1986
1987 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1988 .enable_all()
1989 .worker_threads(workers)
1990 .thread_stack_size(rt_config.stack_size)
1991 .build()
1992 .map_err(|err| format!("tokio runtime: {err}"))?;
1993
1994 let (runtime, _auth_store, _telemetry_guard) =
1995 build_runtime_and_auth_store(&config, db_options.clone())?;
1996 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1997 let signal_runtime = runtime.clone();
1998 tokio_runtime.block_on(async move {
1999 spawn_lifecycle_signal_handler(signal_runtime).await;
2000 let cfg = crate::wire::PgWireConfig {
2001 bind_addr: pg_addr,
2002 ..Default::default()
2003 };
2004 crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
2005 .await
2006 .map_err(|e| e.to_string())
2007 })
2008}
2009
2010#[inline(never)]
2011fn build_runtime_and_auth_store(
2012 config: &ServerCommandConfig,
2013 db_options: RedDBOptions,
2014) -> Result<
2015 (
2016 RedDBRuntime,
2017 Arc<AuthStore>,
2018 Option<crate::telemetry::TelemetryGuard>,
2019 ),
2020 String,
2021> {
2022 build_runtime_with_bootstrap(db_options, config.telemetry.clone(), &config.bootstrap)
2029}
2030
2031pub(crate) fn build_runtime_with_telemetry(
2041 db_options: RedDBOptions,
2042 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
2043) -> Result<
2044 (
2045 RedDBRuntime,
2046 Arc<AuthStore>,
2047 Option<crate::telemetry::TelemetryGuard>,
2048 ),
2049 String,
2050> {
2051 let bootstrap = BootstrapConfig::default();
2052 build_runtime_with_bootstrap(db_options, cli_telemetry, &bootstrap)
2053}
2054
2055fn build_runtime_with_bootstrap(
2056 db_options: RedDBOptions,
2057 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
2058 bootstrap: &BootstrapConfig,
2059) -> Result<
2060 (
2061 RedDBRuntime,
2062 Arc<AuthStore>,
2063 Option<crate::telemetry::TelemetryGuard>,
2064 ),
2065 String,
2066> {
2067 let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
2068 let msg = err.to_string();
2074 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
2075 phase: "runtime_construction".to_string(),
2076 error: msg.clone(),
2077 }
2078 .emit_global();
2079 msg
2080 })?;
2081
2082 crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
2087 let msg = err.to_string();
2088 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
2089 phase: "lease_loop".to_string(),
2090 error: msg.clone(),
2091 }
2092 .emit_global();
2093 msg
2094 })?;
2095
2096 if let Some(data_path) = db_options.data_path.as_deref() {
2100 let watch_dir = data_path.parent().unwrap_or(data_path);
2101 crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
2102 }
2103
2104 {
2108 let config_path = crate::runtime::config_overlay::config_file_path();
2109 let store = runtime.db().store();
2110 crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
2111 }
2112
2113 let merged = merge_telemetry_with_config(
2116 cli_telemetry
2117 .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
2118 &runtime,
2119 );
2120 let telemetry_guard = crate::telemetry::init(merged);
2121
2122 let no_auth = no_auth_active(&db_options);
2123 let auth_store =
2124 if db_options.auth.vault_enabled {
2125 let pager =
2126 runtime.db().store().pager().cloned().ok_or_else(|| {
2127 "vault requires a paged database (persistent mode)".to_string()
2128 })?;
2129 let store = AuthStore::with_vault(db_options.auth.clone(), pager)
2130 .map_err(|err| err.to_string())?;
2131 Arc::new(store)
2132 } else {
2133 Arc::new(AuthStore::new(db_options.auth.clone()))
2134 };
2135 auth_store.configure_control_events(
2136 runtime.control_event_ledger(),
2137 runtime.control_event_config(),
2138 );
2139 let bootstrap_already_completed = runtime
2156 .db()
2157 .store()
2158 .get_config(BOOTSTRAP_COMPLETED_KEY)
2159 .is_some();
2160 let disposition = crate::cluster::authorize_cluster_bootstrap(
2161 db_options.storage_profile.deploy_profile,
2162 no_auth,
2163 bootstrap.auth_bootstrap_input(),
2164 bootstrap_already_completed,
2165 )?;
2166 let vault_plan = crate::cluster::plan_vault_bootstrap(disposition);
2175 match disposition {
2176 crate::cluster::BootstrapDisposition::SkipDevBypass => {
2177 debug_assert_eq!(vault_plan, crate::cluster::VaultBootstrapPlan::SkipNoVault);
2178 eprintln!("{NO_AUTH_WARNING}");
2179 tracing::warn!("{NO_AUTH_WARNING}");
2180 }
2181 crate::cluster::BootstrapDisposition::AlreadyComplete => {
2182 debug_assert_eq!(
2189 vault_plan,
2190 crate::cluster::VaultBootstrapPlan::OpenClusterGlobalStore {
2191 consume_secret_inputs: false,
2192 }
2193 );
2194 apply_preset_from_config(&runtime, &auth_store, bootstrap)?;
2195 tracing::info!("bootstrap already completed, unsealing existing cluster auth store");
2196 }
2197 crate::cluster::BootstrapDisposition::ProceedLocal => {
2198 debug_assert_eq!(
2201 vault_plan,
2202 crate::cluster::VaultBootstrapPlan::OpenClusterGlobalStore {
2203 consume_secret_inputs: true,
2204 }
2205 );
2206 apply_preset_from_config(&runtime, &auth_store, bootstrap)?;
2207 maybe_apply_policy_break_glass(&auth_store);
2208 }
2209 }
2210
2211 {
2213 let store = Arc::clone(&auth_store);
2214 std::thread::Builder::new()
2215 .name("reddb-session-purge".into())
2216 .spawn(move || loop {
2217 std::thread::sleep(std::time::Duration::from_secs(300));
2218 store.purge_expired_sessions();
2219 })
2220 .ok();
2221 }
2222
2223 Ok((runtime, auth_store, telemetry_guard))
2224}
2225
2226fn maybe_apply_policy_break_glass(auth_store: &Arc<AuthStore>) {
2230 use crate::auth::self_lock_guard::BREAK_GLASS_ENV;
2231
2232 let enabled = std::env::var(BREAK_GLASS_ENV)
2233 .ok()
2234 .map(|v| {
2235 let trimmed = v.trim().to_ascii_lowercase();
2236 matches!(trimmed.as_str(), "1" | "true" | "yes")
2237 })
2238 .unwrap_or(false);
2239 if !enabled {
2240 return;
2241 }
2242 let now = crate::utils::now_unix_millis() as u128;
2243 match auth_store.apply_policy_break_glass(now) {
2244 Ok(()) => {
2245 tracing::warn!(env = BREAK_GLASS_ENV, "policy break-glass recovery applied");
2246 }
2247 Err(err) => {
2248 tracing::error!(env = BREAK_GLASS_ENV, %err, "policy break-glass recovery failed");
2249 }
2250 }
2251}
2252
2253pub(crate) const BOOTSTRAP_COMPLETED_KEY: &str = "system.bootstrap.completed";
2258pub(crate) const BOOTSTRAP_PRESET_KEY: &str = "system.bootstrap.preset";
2259pub(crate) const BOOTSTRAP_FIRST_ADMIN_KEY: &str = "system.bootstrap.first_admin_id";
2260
2261pub(crate) const BOOTSTRAP_PRESET_ENV: &str = "REDDB_BOOTSTRAP_PRESET";
2264pub(crate) const PRESET_ENV: &str = "REDDB_PRESET";
2265pub(crate) const PRESET_SIMPLE: &str = "simple";
2266pub(crate) const PRESET_PRODUCTION: &str = "production";
2267pub(crate) const PRESET_REGULATED: &str = "regulated";
2268pub(crate) const PRESET_CLOUD: &str = "cloud";
2269
2270pub(crate) const FIRST_ADMIN_ALLOW_ALL_POLICY: &str = "system.bootstrap.first-admin-allow-all";
2274pub(crate) const REGULATED_PROTECT_MANAGED_POLICY: &str = "system.regulated.protect-managed";
2275pub(crate) const CLOUD_PROTECT_MANAGED_POLICY: &str = "system.cloud.protect-managed";
2276pub(crate) const CLOUD_CONFIG_NAMESPACE: &str = "red.config.cloud";
2277pub(crate) const REGULATED_AUDIT_CONFIG_NAMESPACE: &str = "red.config.audit";
2278pub(crate) const REGULATED_EVIDENCE_CONFIG_NAMESPACE: &str = "red.config.evidence";
2279pub(crate) const REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE: &str = "red.config.query_audit";
2280
2281pub(crate) fn apply_preset(
2291 runtime: &RedDBRuntime,
2292 auth_store: &Arc<AuthStore>,
2293) -> Result<(), String> {
2294 let bootstrap = BootstrapConfig::default();
2295 apply_preset_from_config(runtime, auth_store, &bootstrap)
2296}
2297
2298fn apply_preset_from_config(
2299 runtime: &RedDBRuntime,
2300 auth_store: &Arc<AuthStore>,
2301 bootstrap: &BootstrapConfig,
2302) -> Result<(), String> {
2303 let store = runtime.db().store();
2304
2305 if store.get_config(BOOTSTRAP_COMPLETED_KEY).is_some() {
2306 crate::cli::bootstrap_manifest::rehydrate_manifest_registry(
2307 runtime,
2308 &runtime.config_registry(),
2309 )?;
2310 tracing::info!("bootstrap state present, skipping preset application");
2311 return Ok(());
2312 }
2313
2314 let preset = bootstrap.resolved_preset();
2315
2316 for var in bootstrap.secret_env_vars_to_expand(&preset) {
2321 crate::utils::expand_file_env(var).map_err(|err| format!("expand {var}_FILE: {err}"))?;
2322 }
2323
2324 if let Some(path) = bootstrap.selected_manifest() {
2325 let first_admin_id = crate::cli::bootstrap_manifest::apply_manifest_file(
2326 runtime,
2327 auth_store,
2328 &runtime.config_registry(),
2329 &path,
2330 )?;
2331 persist_bootstrap_state(runtime, "manifest", Some(&first_admin_id));
2332 tracing::info!("bootstrap manifest applied");
2333 return Ok(());
2334 }
2335
2336 let first_admin_id = match preset.as_str() {
2337 PRESET_SIMPLE => {
2338 None
2342 }
2343 PRESET_PRODUCTION => Some(apply_production_preset_from_config(auth_store, bootstrap)?),
2344 PRESET_CLOUD => Some(apply_cloud_preset(runtime, auth_store, bootstrap)?),
2345 PRESET_REGULATED => {
2346 apply_regulated_preset(runtime, auth_store)?;
2347 None
2348 }
2349 other => {
2350 return Err(format!(
2351 "bootstrap preset {other:?} is not recognised (expected `simple`, `production`, `regulated`, or `cloud`)"
2352 ));
2353 }
2354 };
2355
2356 persist_bootstrap_state(runtime, &preset, first_admin_id.as_deref());
2357 tracing::info!(preset = %preset, "bootstrap preset applied");
2358 Ok(())
2359}
2360
2361fn apply_production_preset_from_config(
2362 auth_store: &Arc<AuthStore>,
2363 bootstrap: &BootstrapConfig,
2364) -> Result<String, String> {
2365 use crate::auth::store::PrincipalRef;
2366 use crate::auth::UserId;
2367
2368 let username = bootstrap.production_username().ok_or_else(|| {
2369 "production preset requires --bootstrap-admin or REDDB_USERNAME (or REDDB_USERNAME_FILE)"
2370 .to_string()
2371 })?;
2372 let password = bootstrap.production_password().ok_or_else(|| {
2373 "production preset requires --bootstrap-admin-password or REDDB_PASSWORD (or REDDB_PASSWORD_FILE)"
2374 .to_string()
2375 })?;
2376
2377 let result = auth_store
2381 .bootstrap(&username, &password)
2382 .map_err(|err| format!("bootstrap first admin: {err}"))?;
2383 if let Some(cert) = result.certificate.as_deref() {
2384 eprintln!("[reddb] CERTIFICATE: {}", cert);
2385 tracing::warn!(
2386 "vault certificate issued by REDDB_PRESET=production -- save it and update the runtime secret before restart"
2387 );
2388 }
2389 let first_admin = UserId::platform(result.user.username.clone());
2390
2391 install_allow_all_policy(auth_store)?;
2393
2394 auth_store
2396 .attach_policy(
2397 PrincipalRef::User(first_admin.clone()),
2398 FIRST_ADMIN_ALLOW_ALL_POLICY,
2399 )
2400 .map_err(|err| format!("attach allow-all policy: {err}"))?;
2401
2402 Ok(first_admin.to_string())
2403}
2404
2405fn apply_cloud_preset(
2406 runtime: &RedDBRuntime,
2407 auth_store: &Arc<AuthStore>,
2408 bootstrap: &BootstrapConfig,
2409) -> Result<String, String> {
2410 use crate::auth::store::PrincipalRef;
2411 use crate::auth::{Role, UserId};
2412
2413 let head_username = bootstrap.cloud_head_username().ok_or_else(|| {
2414 "cloud preset requires --cloud-head-admin or REDDB_CLOUD_HEAD_ADMIN (or REDDB_USERNAME)"
2415 .to_string()
2416 })?;
2417 let head_password = bootstrap.cloud_head_password().ok_or_else(|| {
2418 "cloud preset requires --cloud-head-admin-password or REDDB_CLOUD_HEAD_ADMIN_PASSWORD (or REDDB_PASSWORD)"
2419 .to_string()
2420 })?;
2421 let customer_username = bootstrap.customer_username().ok_or_else(|| {
2422 "cloud preset requires --customer-admin or REDDB_CUSTOMER_ADMIN".to_string()
2423 })?;
2424 let customer_password = bootstrap.customer_password().ok_or_else(|| {
2425 "cloud preset requires --customer-admin-password or REDDB_CUSTOMER_ADMIN_PASSWORD"
2426 .to_string()
2427 })?;
2428 if head_username == customer_username {
2429 return Err("cloud preset requires distinct head and customer admin usernames".to_string());
2430 }
2431
2432 let head = auth_store
2433 .bootstrap_system_admin(&head_username, &head_password)
2434 .map_err(|err| format!("bootstrap cloud head admin: {err}"))?;
2435 let head_id = UserId::platform(head.user.username.clone());
2436
2437 auth_store
2438 .create_user(&customer_username, &customer_password, Role::Admin)
2439 .map_err(|err| format!("create cloud customer admin: {err}"))?;
2440 let customer_id = UserId::platform(customer_username.clone());
2441
2442 install_allow_all_policy(auth_store)?;
2443 for user_id in [&head_id, &customer_id] {
2444 auth_store
2445 .attach_policy(
2446 PrincipalRef::User(user_id.clone()),
2447 FIRST_ADMIN_ALLOW_ALL_POLICY,
2448 )
2449 .map_err(|err| format!("attach allow-all policy: {err}"))?;
2450 }
2451 install_cloud_guardrails(runtime, auth_store)?;
2452 auth_store
2453 .attach_policy(
2454 PrincipalRef::User(customer_id),
2455 CLOUD_PROTECT_MANAGED_POLICY,
2456 )
2457 .map_err(|err| format!("attach cloud guardrail policy: {err}"))?;
2458
2459 Ok(head_id.to_string())
2460}
2461
2462fn install_allow_all_policy(auth_store: &Arc<AuthStore>) -> Result<(), String> {
2463 use crate::auth::policies::Policy;
2464
2465 let policy = Policy::from_json_str(&format!(
2466 r#"{{
2467 "id": "{id}",
2468 "version": 1,
2469 "statements": [{{
2470 "effect": "allow",
2471 "actions": ["*"],
2472 "resources": ["*"]
2473 }}]
2474 }}"#,
2475 id = FIRST_ADMIN_ALLOW_ALL_POLICY
2476 ))
2477 .map_err(|err| format!("compile allow-all policy: {err}"))?;
2478 auth_store
2479 .put_policy(policy)
2480 .map_err(|err| format!("install allow-all policy: {err}"))
2481}
2482
2483fn install_cloud_guardrails(
2484 runtime: &RedDBRuntime,
2485 auth_store: &Arc<AuthStore>,
2486) -> Result<(), String> {
2487 use crate::auth::policies::Policy;
2488 use crate::auth::registry::EvidenceRequirement;
2489
2490 let policy = Policy::from_json_str(&format!(
2491 r#"{{
2492 "id": "{id}",
2493 "version": 1,
2494 "statements": [
2495 {{
2496 "effect": "deny",
2497 "actions": ["policy:put", "policy:drop", "policy:attach", "policy:detach"],
2498 "resources": ["policy:{id}"]
2499 }},
2500 {{
2501 "effect": "deny",
2502 "actions": ["config:write"],
2503 "resources": ["config:{cloud}.*"]
2504 }}
2505 ]
2506 }}"#,
2507 id = CLOUD_PROTECT_MANAGED_POLICY,
2508 cloud = CLOUD_CONFIG_NAMESPACE,
2509 ))
2510 .map_err(|err| format!("compile cloud guardrail policy: {err}"))?;
2511 auth_store
2512 .put_policy(policy)
2513 .map_err(|err| format!("install cloud guardrail policy: {err}"))?;
2514
2515 let now_ms = crate::utils::now_unix_millis() as u128;
2516 let entries = vec![
2517 regulated_registry_entry(
2518 CLOUD_PROTECT_MANAGED_POLICY,
2519 crate::auth::managed_policy::RESOURCE_TYPE_POLICY,
2520 "policy",
2521 "policy:*",
2522 &format!("policy:{CLOUD_PROTECT_MANAGED_POLICY}"),
2523 EvidenceRequirement::Metadata,
2524 now_ms,
2525 ),
2526 regulated_registry_entry(
2527 CLOUD_CONFIG_NAMESPACE,
2528 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2529 "config_namespace",
2530 "config:write",
2531 &format!("config:{CLOUD_CONFIG_NAMESPACE}.*"),
2532 EvidenceRequirement::Metadata,
2533 now_ms,
2534 ),
2535 ];
2536 for entry in entries.iter().cloned() {
2537 runtime
2538 .config_registry()
2539 .restore_bootstrap_entry(entry)
2540 .map_err(|err| format!("install cloud registry entry: {err}"))?;
2541 }
2542 crate::cli::bootstrap_manifest::persist_registry_state(runtime, &entries)?;
2543 Ok(())
2544}
2545
2546fn apply_regulated_preset(
2547 runtime: &RedDBRuntime,
2548 auth_store: &Arc<AuthStore>,
2549) -> Result<(), String> {
2550 use crate::auth::policies::Policy;
2551 use crate::auth::registry::EvidenceRequirement;
2552 use crate::auth::store::{PrincipalRef, PUBLIC_IAM_GROUP};
2553
2554 runtime.query_audit().enable_infrastructure();
2555
2556 let policy = Policy::from_json_str(&format!(
2557 r#"{{
2558 "id": "{id}",
2559 "version": 1,
2560 "statements": [
2561 {{
2562 "effect": "deny",
2563 "actions": ["policy:put", "policy:drop", "policy:attach", "policy:detach"],
2564 "resources": ["policy:{id}"]
2565 }},
2566 {{
2567 "effect": "deny",
2568 "actions": ["config:write"],
2569 "resources": [
2570 "config:{audit}.*",
2571 "config:{evidence}.*",
2572 "config:{query_audit}.*"
2573 ]
2574 }}
2575 ]
2576 }}"#,
2577 id = REGULATED_PROTECT_MANAGED_POLICY,
2578 audit = REGULATED_AUDIT_CONFIG_NAMESPACE,
2579 evidence = REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2580 query_audit = REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2581 ))
2582 .map_err(|err| format!("compile regulated guardrail policy: {err}"))?;
2583 auth_store
2584 .put_policy(policy)
2585 .map_err(|err| format!("install regulated guardrail policy: {err}"))?;
2586 auth_store
2587 .attach_policy(
2588 PrincipalRef::Group(PUBLIC_IAM_GROUP.to_string()),
2589 REGULATED_PROTECT_MANAGED_POLICY,
2590 )
2591 .map_err(|err| format!("attach regulated guardrail policy: {err}"))?;
2592
2593 let now_ms = crate::utils::now_unix_millis() as u128;
2594 let entries = vec![
2595 regulated_registry_entry(
2596 REGULATED_PROTECT_MANAGED_POLICY,
2597 crate::auth::managed_policy::RESOURCE_TYPE_POLICY,
2598 "iam_policy",
2599 "policy:*",
2600 &format!("policy:{REGULATED_PROTECT_MANAGED_POLICY}"),
2601 EvidenceRequirement::Metadata,
2602 now_ms,
2603 ),
2604 regulated_registry_entry(
2605 REGULATED_AUDIT_CONFIG_NAMESPACE,
2606 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2607 "config_namespace",
2608 "config:write",
2609 &format!("config:{REGULATED_AUDIT_CONFIG_NAMESPACE}.*"),
2610 EvidenceRequirement::Metadata,
2611 now_ms,
2612 ),
2613 regulated_registry_entry(
2614 REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2615 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2616 "config_namespace",
2617 "config:write",
2618 &format!("config:{REGULATED_EVIDENCE_CONFIG_NAMESPACE}.*"),
2619 EvidenceRequirement::Metadata,
2620 now_ms,
2621 ),
2622 regulated_registry_entry(
2623 REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2624 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2625 "config_namespace",
2626 "config:write",
2627 &format!("config:{REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE}.*"),
2628 EvidenceRequirement::Metadata,
2629 now_ms,
2630 ),
2631 ];
2632
2633 for entry in entries.iter().cloned() {
2634 runtime
2635 .config_registry()
2636 .restore_bootstrap_entry(entry)
2637 .map_err(|err| format!("install regulated registry entry: {err}"))?;
2638 }
2639 crate::cli::bootstrap_manifest::persist_registry_state(runtime, &entries)?;
2640 Ok(())
2641}
2642
2643fn regulated_registry_entry(
2644 id: &str,
2645 resource_type: &str,
2646 schema: &str,
2647 required_action: &str,
2648 required_resource: &str,
2649 evidence_requirement: crate::auth::registry::EvidenceRequirement,
2650 updated_at_ms: u128,
2651) -> crate::auth::registry::ConfigRegistryEntry {
2652 crate::auth::registry::ConfigRegistryEntry {
2653 id: id.to_string(),
2654 version: 1,
2655 resource_type: resource_type.to_string(),
2656 schema: schema.to_string(),
2657 mutability: crate::auth::registry::Mutability::Immutable,
2658 sensitivity: crate::auth::registry::Sensitivity::Internal,
2659 managed: true,
2660 required_action: required_action.to_string(),
2661 required_resource: required_resource.to_string(),
2662 evidence_requirement,
2663 updated_by: "system:regulated-preset".to_string(),
2664 updated_at_ms,
2665 }
2666}
2667
2668fn persist_bootstrap_state(runtime: &RedDBRuntime, preset: &str, first_admin_id: Option<&str>) {
2669 let store = runtime.db().store();
2670 let mut tree = crate::serde_json::Map::new();
2671 tree.insert(
2672 BOOTSTRAP_COMPLETED_KEY.to_string(),
2673 crate::serde_json::Value::Bool(true),
2674 );
2675 tree.insert(
2676 BOOTSTRAP_PRESET_KEY.to_string(),
2677 crate::serde_json::Value::String(preset.to_string()),
2678 );
2679 if let Some(id) = first_admin_id {
2680 tree.insert(
2681 BOOTSTRAP_FIRST_ADMIN_KEY.to_string(),
2682 crate::serde_json::Value::String(id.to_string()),
2683 );
2684 }
2685 let json = crate::serde_json::Value::Object(tree);
2686 store.set_config_tree("", &json);
2687}
2688
2689fn merge_telemetry_with_config(
2700 mut cli: crate::telemetry::TelemetryConfig,
2701 runtime: &RedDBRuntime,
2702) -> crate::telemetry::TelemetryConfig {
2703 use crate::storage::schema::Value;
2704
2705 let store = runtime.db().store();
2706
2707 if !cli.level_explicit {
2708 if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
2709 cli.level_filter = v.to_string();
2710 }
2711 }
2712 if !cli.format_explicit {
2713 if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
2714 if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
2715 cli.format = parsed;
2716 }
2717 }
2718 }
2719 if !cli.rotation_keep_days_explicit {
2720 match store.get_config("red.logging.keep_days") {
2721 Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
2722 cli.rotation_keep_days = n as u16
2723 }
2724 Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
2725 cli.rotation_keep_days = n as u16
2726 }
2727 Some(Value::Text(v)) => {
2728 if let Ok(n) = v.parse::<u16>() {
2729 cli.rotation_keep_days = n;
2730 }
2731 }
2732 _ => {}
2733 }
2734 }
2735 if !cli.file_prefix_explicit {
2736 if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
2737 if !v.is_empty() {
2738 cli.file_prefix = v.to_string();
2739 }
2740 }
2741 }
2742 if !cli.log_dir_explicit && !cli.log_file_disabled {
2745 if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
2746 if !v.is_empty() {
2747 cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
2748 }
2749 }
2750 }
2751
2752 cli
2753}
2754
2755#[cfg(test)]
2756mod telemetry_merge_tests {
2757 use super::*;
2758 use crate::telemetry::{LogFormat, TelemetryConfig};
2759
2760 fn fresh_runtime() -> RedDBRuntime {
2761 RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
2762 }
2763
2764 fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
2765 runtime
2766 .db()
2767 .store()
2768 .set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
2769 }
2770
2771 fn cli_base() -> TelemetryConfig {
2772 TelemetryConfig {
2775 log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
2776 format: LogFormat::Json,
2777 ..Default::default()
2778 }
2779 }
2780
2781 #[test]
2782 fn config_log_dir_promoted_when_flag_absent() {
2783 let runtime = fresh_runtime();
2784 set_str(&runtime, "red.logging.dir", "/var/log/reddb");
2785 let merged = merge_telemetry_with_config(cli_base(), &runtime);
2786 assert_eq!(
2787 merged.log_dir.as_deref(),
2788 Some(std::path::Path::new("/var/log/reddb"))
2789 );
2790 }
2791
2792 #[test]
2793 fn explicit_log_dir_wins_over_config() {
2794 let runtime = fresh_runtime();
2795 set_str(&runtime, "red.logging.dir", "/var/log/reddb");
2796 let mut cli = cli_base();
2797 cli.log_dir = Some(std::path::PathBuf::from("/custom/dir"));
2798 cli.log_dir_explicit = true;
2799 let merged = merge_telemetry_with_config(cli, &runtime);
2800 assert_eq!(
2801 merged.log_dir.as_deref(),
2802 Some(std::path::Path::new("/custom/dir"))
2803 );
2804 }
2805
2806 #[test]
2807 fn no_log_file_beats_config_log_dir() {
2808 let runtime = fresh_runtime();
2809 set_str(&runtime, "red.logging.dir", "/var/log/reddb");
2810 let mut cli = cli_base();
2811 cli.log_dir = None;
2812 cli.log_file_disabled = true;
2813 let merged = merge_telemetry_with_config(cli, &runtime);
2814 assert!(
2815 merged.log_dir.is_none(),
2816 "--no-log-file must veto config dir"
2817 );
2818 }
2819
2820 #[test]
2821 fn config_format_promoted_on_non_tty_default() {
2822 let runtime = fresh_runtime();
2826 set_str(&runtime, "red.logging.format", "pretty");
2827 let merged = merge_telemetry_with_config(cli_base(), &runtime);
2828 assert_eq!(merged.format, LogFormat::Pretty);
2829 }
2830
2831 #[test]
2832 fn explicit_format_wins_over_config() {
2833 let runtime = fresh_runtime();
2834 set_str(&runtime, "red.logging.format", "pretty");
2835 let mut cli = cli_base();
2836 cli.format = LogFormat::Json;
2837 cli.format_explicit = true;
2838 let merged = merge_telemetry_with_config(cli, &runtime);
2839 assert_eq!(merged.format, LogFormat::Json);
2840 }
2841}
2842
2843#[inline(never)]
2844fn build_http_server(
2845 runtime: RedDBRuntime,
2846 auth_store: Arc<AuthStore>,
2847 bind_addr: String,
2848) -> RedDBServer {
2849 build_http_server_with_transport_readiness(
2850 runtime,
2851 auth_store,
2852 bind_addr,
2853 TransportReadiness::default(),
2854 )
2855}
2856
2857fn apply_http_limits(
2863 server: RedDBServer,
2864 config: &ServerCommandConfig,
2865 runtime: &RedDBRuntime,
2866) -> RedDBServer {
2867 let store = runtime.db().store();
2868 let resolved =
2869 crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2870 .get_config(key)
2871 {
2872 Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2873 Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2874 Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2875 _ => None,
2876 });
2877 tracing::info!(
2878 target: "reddb::http_limits",
2879 max_handlers = resolved.max_handlers,
2880 handler_timeout_ms = resolved.handler_timeout_ms,
2881 retry_after_secs = resolved.retry_after_secs,
2882 max_inflight_per_principal = resolved.max_inflight_per_principal,
2883 "http_limits resolved"
2884 );
2885 server.with_http_limits(resolved)
2886}
2887
2888fn apply_ui_bundle(
2898 server: RedDBServer,
2899 config: &ServerCommandConfig,
2900) -> Result<RedDBServer, String> {
2901 if !config.ui {
2902 return Ok(server);
2903 }
2904 let ui_dir = match &config.ui_dir {
2905 Some(dir) => dir.clone(),
2906 None => {
2907 let cache_root = crate::server::ui_bundle_resolver::reddb_user_cache_root()
2908 .unwrap_or_else(|_| std::env::temp_dir().join("reddb"));
2909 crate::server::ui_bundle_resolver::resolve_ui_bundle(
2910 &cache_root,
2911 &crate::server::ui_bundle_resolver::HttpFetcher,
2912 )
2913 .map_err(|err| format!("resolve red-ui bundle for --ui: {err}"))?
2914 }
2915 };
2916 tracing::info!(target: "reddb::ui", dir = %ui_dir.display(), "serving red-ui bundle on HTTP surface");
2917 Ok(server.with_ui_dir(ui_dir))
2918}
2919
2920#[inline(never)]
2921fn build_http_server_with_transport_readiness(
2922 runtime: RedDBRuntime,
2923 auth_store: Arc<AuthStore>,
2924 bind_addr: String,
2925 transport_readiness: TransportReadiness,
2926) -> RedDBServer {
2927 RedDBServer::with_options(
2928 runtime,
2929 ServerOptions {
2930 bind_addr,
2931 transport_readiness,
2932 ..ServerOptions::default()
2933 },
2934 )
2935 .with_auth(auth_store)
2936}
2937
2938#[inline(never)]
2942fn build_admin_only_server(
2943 runtime: RedDBRuntime,
2944 auth_store: Arc<AuthStore>,
2945 bind_addr: String,
2946) -> RedDBServer {
2947 RedDBServer::with_options(
2948 runtime,
2949 ServerOptions {
2950 bind_addr,
2951 surface: crate::server::ServerSurface::AdminOnly,
2952 ..ServerOptions::default()
2953 },
2954 )
2955 .with_auth(auth_store)
2956}
2957
2958#[inline(never)]
2962fn build_metrics_only_server(
2963 runtime: RedDBRuntime,
2964 auth_store: Arc<AuthStore>,
2965 bind_addr: String,
2966) -> RedDBServer {
2967 RedDBServer::with_options(
2968 runtime,
2969 ServerOptions {
2970 bind_addr,
2971 surface: crate::server::ServerSurface::MetricsOnly,
2972 ..ServerOptions::default()
2973 },
2974 )
2975 .with_auth(auth_store)
2976}
2977
2978fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2982 if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2983 let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2984 let _ = server.serve_in_background();
2985 tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2986 }
2987 if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2988 let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2989 let _ = server.serve_in_background();
2990 tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2991 }
2992}
2993
2994#[inline(never)]
2995fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2996 let mut transport_readiness = TransportReadiness::default();
2997 let Some(listener) = bind_listener_for_startup(
2998 &mut transport_readiness,
2999 "http",
3000 &bind_addr,
3001 config.http_bind_explicit,
3002 )?
3003 else {
3004 return Err(format!(
3005 "no HTTP listener started; implicit bind {} failed",
3006 bind_addr
3007 ));
3008 };
3009 let db_options = config.to_db_options()?;
3010 let (runtime, auth_store, _telemetry_guard) =
3011 build_runtime_and_auth_store(&config, db_options.clone())?;
3012 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
3013 spawn_admin_metrics_listeners(&runtime, &auth_store);
3014 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
3015 let server = build_http_server_with_transport_readiness(
3016 runtime.clone(),
3017 auth_store,
3018 bind_addr.clone(),
3019 transport_readiness,
3020 );
3021 let server = apply_http_limits(server, &config, &runtime);
3022 let server = apply_ui_bundle(server, &config)?;
3023 tracing::info!(transport = "http", bind = %bind_addr, "listener online");
3024 server.serve_on(listener).map_err(|err| err.to_string())
3025}
3026
3027fn spawn_http_tls_listener(
3033 config: &ServerCommandConfig,
3034 runtime: &RedDBRuntime,
3035 auth_store: &Arc<AuthStore>,
3036) -> Result<(), String> {
3037 let Some(addr) = config.http_tls_bind_addr.clone() else {
3038 return Ok(());
3039 };
3040
3041 let tls_config = resolve_http_tls_config(config)?;
3042 let server_config = crate::server::tls::build_server_config(&tls_config)
3043 .map_err(|err| format!("HTTP TLS: {err}"))?;
3044
3045 let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
3046 let server = apply_http_limits(server, config, runtime);
3047 let _handle = server.serve_tls_in_background(server_config);
3048 tracing::info!(
3049 transport = "https",
3050 bind = %addr,
3051 mtls = %tls_config.client_ca_path.is_some(),
3052 "TLS listener online"
3053 );
3054 Ok(())
3055}
3056
3057fn resolve_http_tls_config(
3059 config: &ServerCommandConfig,
3060) -> Result<crate::server::tls::HttpTlsConfig, String> {
3061 match (&config.http_tls_cert, &config.http_tls_key) {
3062 (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
3063 cert_path: cert.clone(),
3064 key_path: key.clone(),
3065 client_ca_path: config.http_tls_client_ca.clone(),
3066 }),
3067 (None, None) => {
3068 let dir = config
3070 .path
3071 .as_ref()
3072 .and_then(|p| p.parent().map(std::path::PathBuf::from))
3073 .unwrap_or_else(|| std::path::PathBuf::from("."));
3074 let auto = crate::server::tls::auto_generate_dev_cert(&dir)
3075 .map_err(|err| format!("HTTP TLS dev: {err}"))?;
3076 Ok(crate::server::tls::HttpTlsConfig {
3077 cert_path: auto.cert_path,
3078 key_path: auto.key_path,
3079 client_ca_path: config.http_tls_client_ca.clone(),
3080 })
3081 }
3082 _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
3083 }
3084}
3085
3086#[inline(never)]
3087fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
3088 let workers = config.workers;
3089 let db_options = config.to_db_options()?;
3090 let rt_config = detect_runtime_config();
3091 let mut transport_readiness = TransportReadiness::default();
3092 let Some(grpc_listener) = bind_listener_for_startup(
3093 &mut transport_readiness,
3094 "grpc",
3095 &bind_addr,
3096 config.grpc_bind_explicit,
3097 )?
3098 else {
3099 return Err(format!(
3100 "no gRPC listener started; implicit bind {} failed",
3101 bind_addr
3102 ));
3103 };
3104
3105 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
3106
3107 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
3108 .enable_all()
3109 .worker_threads(worker_threads)
3110 .thread_stack_size(rt_config.stack_size)
3111 .build()
3112 .map_err(|err| format!("tokio runtime: {err}"))?;
3113
3114 let (runtime, auth_store, _telemetry_guard) =
3116 build_runtime_and_auth_store(&config, db_options.clone())?;
3117 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
3118 let signal_runtime = runtime.clone();
3119 tokio_runtime.block_on(async move {
3120 spawn_lifecycle_signal_handler(signal_runtime).await;
3121 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
3123
3124 spawn_pg_listener(&config, &runtime);
3126
3127 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
3131
3132 let server = RedDBGrpcServer::with_options(
3133 runtime,
3134 GrpcServerOptions {
3135 bind_addr: bind_addr.clone(),
3136 tls: None,
3137 },
3138 auth_store,
3139 );
3140
3141 tracing::info!(
3142 transport = "grpc",
3143 bind = %bind_addr,
3144 cpus = rt_config.available_cpus,
3145 workers = worker_threads,
3146 "listener online"
3147 );
3148 server
3149 .serve_on(grpc_listener)
3150 .await
3151 .map_err(|err| err.to_string())
3152 })
3153}
3154
3155#[inline(never)]
3156fn run_dual_server(
3157 config: ServerCommandConfig,
3158 grpc_bind_addr: String,
3159 http_bind_addr: String,
3160) -> Result<(), String> {
3161 let workers = config.workers;
3162 let db_options = config.to_db_options()?;
3163 let rt_config = detect_runtime_config();
3164 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
3165 let mut transport_readiness = TransportReadiness::default();
3166 let http_listener = bind_listener_for_startup(
3167 &mut transport_readiness,
3168 "http",
3169 &http_bind_addr,
3170 config.http_bind_explicit,
3171 )?;
3172 let grpc_listener = bind_listener_for_startup(
3173 &mut transport_readiness,
3174 "grpc",
3175 &grpc_bind_addr,
3176 config.grpc_bind_explicit,
3177 )?;
3178 if http_listener.is_none() && grpc_listener.is_none() {
3179 return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
3180 }
3181 let (runtime, auth_store, _telemetry_guard) =
3182 build_runtime_and_auth_store(&config, db_options.clone())?;
3183 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
3184
3185 spawn_admin_metrics_listeners(&runtime, &auth_store);
3186 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
3187
3188 let http_handle = if let Some(listener) = http_listener {
3189 let http_server = build_http_server_with_transport_readiness(
3190 runtime.clone(),
3191 auth_store.clone(),
3192 http_bind_addr.clone(),
3193 transport_readiness.clone(),
3194 );
3195 let http_server = apply_http_limits(http_server, &config, &runtime);
3196 let http_server = apply_ui_bundle(http_server, &config)?;
3197 Some(http_server.serve_in_background_on(listener))
3198 } else {
3199 None
3200 };
3201
3202 thread::sleep(Duration::from_millis(150));
3203 if let Some(handle) = http_handle.as_ref() {
3204 if handle.is_finished() {
3205 let handle = http_handle.unwrap();
3206 return match handle.join() {
3207 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
3208 Ok(Err(err)) => Err(err.to_string()),
3209 Err(_) => Err("HTTP server thread panicked".to_string()),
3210 };
3211 }
3212 }
3213 if grpc_listener.is_none() {
3214 let Some(handle) = http_handle else {
3215 return Err("no listener started".to_string());
3216 };
3217 return match handle.join() {
3218 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
3219 Ok(Err(err)) => Err(err.to_string()),
3220 Err(_) => Err("HTTP server thread panicked".to_string()),
3221 };
3222 }
3223 let grpc_listener = grpc_listener.expect("checked above");
3224
3225 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
3226 .enable_all()
3227 .worker_threads(worker_threads)
3228 .thread_stack_size(rt_config.stack_size)
3229 .build()
3230 .map_err(|err| format!("tokio runtime: {err}"))?;
3231
3232 let signal_runtime = runtime.clone();
3233 tokio_runtime.block_on(async move {
3234 spawn_lifecycle_signal_handler(signal_runtime).await;
3235 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
3237
3238 spawn_pg_listener(&config, &runtime);
3240
3241 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
3243
3244 let server = RedDBGrpcServer::with_options(
3245 runtime,
3246 GrpcServerOptions {
3247 bind_addr: grpc_bind_addr.clone(),
3248 tls: None,
3249 },
3250 auth_store,
3251 );
3252
3253 tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
3254 tracing::info!(
3255 transport = "grpc",
3256 bind = %grpc_bind_addr,
3257 cpus = rt_config.available_cpus,
3258 workers = worker_threads,
3259 "listener online"
3260 );
3261 server
3262 .serve_on(grpc_listener)
3263 .await
3264 .map_err(|err| err.to_string())
3265 })
3266}
3267
3268#[cfg(test)]
3269mod tests {
3270 use super::*;
3271
3272 #[test]
3273 fn render_systemd_unit_contains_expected_execstart() {
3274 let config = SystemdServiceConfig {
3275 service_name: "reddb".to_string(),
3276 binary_path: PathBuf::from("/usr/local/bin/red"),
3277 run_user: "reddb".to_string(),
3278 run_group: "reddb".to_string(),
3279 data_path: reddb_file::default_service_database_path(),
3280 router_bind_addr: None,
3281 grpc_bind_addr: Some("0.0.0.0:55055".to_string()),
3282 http_bind_addr: None,
3283 };
3284
3285 let unit = render_systemd_unit(&config);
3286 assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:55055"));
3287 assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
3288 }
3289
3290 #[test]
3291 fn systemd_service_config_derives_paths() {
3292 let config = SystemdServiceConfig {
3293 service_name: "reddb-api".to_string(),
3294 binary_path: PathBuf::from("/usr/local/bin/red"),
3295 run_user: "reddb".to_string(),
3296 run_group: "reddb".to_string(),
3297 data_path: PathBuf::from("/srv/reddb/live/data.rdb"),
3298 router_bind_addr: None,
3299 grpc_bind_addr: None,
3300 http_bind_addr: Some("127.0.0.1:5000".to_string()),
3301 };
3302
3303 assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
3304 assert_eq!(
3305 config.unit_path(),
3306 PathBuf::from("/etc/systemd/system/reddb-api.service")
3307 );
3308 }
3309
3310 #[test]
3311 fn render_systemd_unit_supports_dual_transport() {
3312 let config = SystemdServiceConfig {
3313 service_name: "reddb".to_string(),
3314 binary_path: PathBuf::from("/usr/local/bin/red"),
3315 run_user: "reddb".to_string(),
3316 run_group: "reddb".to_string(),
3317 data_path: reddb_file::default_service_database_path(),
3318 router_bind_addr: None,
3319 grpc_bind_addr: Some("0.0.0.0:55055".to_string()),
3320 http_bind_addr: Some("0.0.0.0:5000".to_string()),
3321 };
3322
3323 let unit = render_systemd_unit(&config);
3324 assert!(unit.contains("--grpc-bind 0.0.0.0:55055"));
3325 assert!(unit.contains("--http-bind 0.0.0.0:5000"));
3326 }
3327
3328 #[test]
3329 fn render_systemd_unit_supports_router_mode() {
3330 let config = SystemdServiceConfig {
3331 service_name: "reddb".to_string(),
3332 binary_path: PathBuf::from("/usr/local/bin/red"),
3333 run_user: "reddb".to_string(),
3334 run_group: "reddb".to_string(),
3335 data_path: reddb_file::default_service_database_path(),
3336 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
3337 grpc_bind_addr: None,
3338 http_bind_addr: None,
3339 };
3340
3341 let unit = render_systemd_unit(&config);
3342 assert!(unit.contains("--bind 127.0.0.1:5050"));
3343 assert!(!unit.contains("--grpc-bind"));
3344 assert!(!unit.contains("--http-bind"));
3345 }
3346
3347 #[test]
3348 fn explicit_bind_collision_is_fatal() {
3349 let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
3350 let addr = held.local_addr().expect("held addr").to_string();
3351 let mut readiness = TransportReadiness::default();
3352
3353 let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();
3354
3355 assert!(error.contains("explicit http listener bind"));
3356 assert_eq!(readiness.active.len(), 0);
3357 assert_eq!(readiness.failed.len(), 1);
3358 assert!(readiness.failed[0].explicit);
3359 assert_eq!(readiness.failed[0].bind_addr, addr);
3360 }
3361
3362 fn no_auth_env_lock() -> &'static std::sync::Mutex<()> {
3369 static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
3370 LOCK.get_or_init(|| std::sync::Mutex::new(()))
3371 }
3372
3373 fn no_auth_test_config(no_auth: bool) -> ServerCommandConfig {
3374 ServerCommandConfig {
3375 path: None,
3376 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
3377 router_bind_explicit: false,
3378 grpc_bind_addr: None,
3379 grpc_bind_explicit: false,
3380 grpc_tls_bind_addr: None,
3381 grpc_tls_cert: None,
3382 grpc_tls_key: None,
3383 grpc_tls_client_ca: None,
3384 http_bind_addr: None,
3385 http_bind_explicit: false,
3386 http_tls_bind_addr: None,
3387 http_tls_cert: None,
3388 http_tls_key: None,
3389 http_tls_client_ca: None,
3390 wire_bind_addr: None,
3391 wire_bind_explicit: false,
3392 wire_tls_bind_addr: None,
3393 wire_tls_cert: None,
3394 wire_tls_key: None,
3395 pg_bind_addr: None,
3396 create_if_missing: true,
3397 read_only: false,
3398 role: "standalone".to_string(),
3399 primary_addr: None,
3400 storage_profile: StorageProfileSelection::embedded_single_file(),
3401 auth: false,
3402 require_auth: false,
3403 vault: true,
3406 no_auth,
3407 workers: None,
3408 telemetry: None,
3409 http_limits_cli: crate::server::HttpLimitsCliInput::default(),
3410 ui: false,
3411 ui_dir: None,
3412 bootstrap: BootstrapConfig::default(),
3413 }
3414 }
3415
3416 #[test]
3417 fn no_auth_flag_disables_every_auth_knob_and_stamps_metadata() {
3418 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3419 unsafe {
3424 std::env::set_var("REDDB_USERNAME", "admin");
3425 std::env::set_var("REDDB_PASSWORD", "hunter2");
3426 }
3427 let config = no_auth_test_config(true);
3428 let options = config.to_db_options().expect("to_db_options");
3429
3430 assert!(no_auth_active(&options), "metadata should be stamped");
3431 assert!(!options.auth.enabled, "auth.enabled must be forced off");
3432 assert!(
3433 !options.auth.require_auth,
3434 "require_auth must be forced off"
3435 );
3436 assert!(
3437 !options.auth.vault_enabled,
3438 "vault_enabled must be forced off (overrides --vault)"
3439 );
3440 assert_eq!(
3441 options.metadata.get(NO_AUTH_META).map(String::as_str),
3442 Some("true"),
3443 );
3444
3445 unsafe {
3447 std::env::remove_var("REDDB_USERNAME");
3448 std::env::remove_var("REDDB_PASSWORD");
3449 }
3450 }
3451
3452 #[test]
3453 fn default_behaviour_without_no_auth_flag_is_unchanged() {
3454 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3455 let config = no_auth_test_config(false);
3456 let options = config.to_db_options().expect("to_db_options");
3457
3458 assert!(
3459 !no_auth_active(&options),
3460 "default boot must not be marked no-auth"
3461 );
3462 assert!(
3463 options.metadata.get(NO_AUTH_META).is_none(),
3464 "metadata key must be absent when flag is off"
3465 );
3466 assert!(options.auth.vault_enabled);
3468 }
3469
3470 #[test]
3471 fn no_auth_active_blocks_bootstrap_from_env() {
3472 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3473 unsafe {
3478 std::env::set_var("REDDB_USERNAME", "admin");
3479 std::env::set_var("REDDB_PASSWORD", "hunter2");
3480 }
3481
3482 let options = no_auth_test_config(true)
3483 .to_db_options()
3484 .expect("to_db_options");
3485
3486 let auth_store = AuthStore::new(options.auth.clone());
3490 if !no_auth_active(&options) {
3491 auth_store.bootstrap_from_env();
3492 }
3493
3494 assert!(
3495 auth_store.needs_bootstrap(),
3496 "no admin user must be bootstrapped under --no-auth even with REDDB_USERNAME/PASSWORD set"
3497 );
3498
3499 unsafe {
3501 std::env::remove_var("REDDB_USERNAME");
3502 std::env::remove_var("REDDB_PASSWORD");
3503 }
3504 }
3505
3506 fn clear_preset_env() {
3513 unsafe {
3515 std::env::remove_var(BOOTSTRAP_PRESET_ENV);
3516 std::env::remove_var(PRESET_ENV);
3517 std::env::remove_var("REDDB_BOOTSTRAP_MANIFEST");
3518 std::env::remove_var("REDDB_AUTH");
3519 std::env::remove_var("REDDB_REQUIRE_AUTH");
3520 std::env::remove_var("REDDB_NO_AUTH");
3521 std::env::remove_var("REDDB_DEV");
3522 std::env::remove_var("REDDB_VAULT");
3523 std::env::remove_var("REDDB_USERNAME");
3524 std::env::remove_var("REDDB_PASSWORD");
3525 std::env::remove_var("REDDB_USERNAME_FILE");
3526 std::env::remove_var("REDDB_PASSWORD_FILE");
3527 std::env::remove_var("REDDB_CLOUD_HEAD_ADMIN");
3528 std::env::remove_var("REDDB_CLOUD_HEAD_ADMIN_PASSWORD");
3529 std::env::remove_var("REDDB_CLOUD_HEAD_ADMIN_PASSWORD_FILE");
3530 std::env::remove_var("REDDB_CUSTOMER_ADMIN");
3531 std::env::remove_var("REDDB_CUSTOMER_ADMIN_PASSWORD");
3532 std::env::remove_var("REDDB_CUSTOMER_ADMIN_PASSWORD_FILE");
3533 }
3534 }
3535
3536 fn clear_backup_env() {
3537 unsafe {
3539 std::env::remove_var("REDDB_BACKUP_S3_ENDPOINT");
3540 std::env::remove_var("REDDB_BACKUP_S3_BUCKET");
3541 std::env::remove_var("REDDB_BACKUP_S3_PREFIX");
3542 std::env::remove_var("REDDB_BACKUP_S3_ACCESS_KEY_ID");
3543 std::env::remove_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY");
3544 std::env::remove_var("REDDB_BACKUP_S3_REGION");
3545 std::env::remove_var("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS");
3546 std::env::remove_var("REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS");
3547 std::env::remove_var("REDDB_BACKUP_PAUSE_ON_LAG_SECS");
3548 }
3549 }
3550
3551 fn fresh_runtime_and_store() -> (RedDBRuntime, Arc<AuthStore>) {
3552 let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
3553 let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3554 (runtime, auth_store)
3555 }
3556
3557 #[test]
3558 fn auth_env_knobs_enable_auth_require_auth_and_vault() {
3559 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3560 clear_preset_env();
3561 unsafe {
3563 std::env::set_var("REDDB_AUTH", "true");
3564 std::env::set_var("REDDB_REQUIRE_AUTH", "true");
3565 std::env::set_var("REDDB_VAULT", "true");
3566 }
3567
3568 let mut config = no_auth_test_config(false);
3569 config.vault = false;
3570 let options = config.to_db_options().expect("to_db_options");
3571
3572 assert!(options.auth.enabled);
3573 assert!(options.auth.require_auth);
3574 assert!(options.auth.vault_enabled);
3575
3576 clear_preset_env();
3577 }
3578
3579 #[test]
3580 fn production_and_cloud_presets_force_auth_require_auth_and_vault() {
3581 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3582 clear_preset_env();
3583
3584 for preset in [PRESET_PRODUCTION, PRESET_CLOUD] {
3585 let mut config = no_auth_test_config(false);
3586 config.vault = false;
3587 config.bootstrap.preset = Some(preset.to_string());
3588
3589 let options = config.to_db_options().expect("to_db_options");
3590 assert!(options.auth.enabled, "{preset} should enable auth");
3591 assert!(
3592 options.auth.require_auth,
3593 "{preset} should require authenticated requests"
3594 );
3595 assert!(options.auth.vault_enabled, "{preset} should enable vault");
3596 assert!(!no_auth_active(&options));
3597 }
3598
3599 clear_preset_env();
3600 }
3601
3602 #[test]
3603 fn no_auth_env_overrides_preset_forced_auth() {
3604 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3605 clear_preset_env();
3606 unsafe {
3608 std::env::set_var("REDDB_NO_AUTH", "true");
3609 std::env::set_var(BOOTSTRAP_PRESET_ENV, PRESET_CLOUD);
3610 }
3611
3612 let mut config = no_auth_test_config(false);
3613 config.auth = true;
3614 config.require_auth = true;
3615 let options = config.to_db_options().expect("to_db_options");
3616
3617 assert!(no_auth_active(&options));
3618 assert!(!options.auth.enabled);
3619 assert!(!options.auth.require_auth);
3620 assert!(!options.auth.vault_enabled);
3621
3622 clear_preset_env();
3623 }
3624
3625 #[test]
3626 fn simple_preset_is_default_and_persists_state() {
3627 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3628 clear_preset_env();
3629
3630 let (runtime, auth_store) = fresh_runtime_and_store();
3631 apply_preset(&runtime, &auth_store).expect("simple preset applies cleanly");
3632
3633 assert!(
3635 auth_store.needs_bootstrap(),
3636 "simple preset must not create an admin"
3637 );
3638
3639 let store = runtime.db().store();
3641 let completed = store
3642 .get_config(BOOTSTRAP_COMPLETED_KEY)
3643 .expect("completed key persisted");
3644 assert!(matches!(
3645 completed,
3646 crate::storage::schema::Value::Boolean(true)
3647 ));
3648 let preset = store
3649 .get_config(BOOTSTRAP_PRESET_KEY)
3650 .expect("preset key persisted");
3651 match preset {
3652 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_SIMPLE),
3653 other => panic!("expected Text(simple), got {other:?}"),
3654 }
3655 assert!(
3656 store.get_config(BOOTSTRAP_FIRST_ADMIN_KEY).is_none(),
3657 "simple preset must not record a first admin"
3658 );
3659
3660 clear_preset_env();
3661 }
3662
3663 #[test]
3664 fn production_preset_creates_first_admin_with_allow_all_policy() {
3665 use crate::auth::policies::{EvalContext, ResourceRef};
3666 use crate::auth::UserId;
3667
3668 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3669 clear_preset_env();
3670 unsafe {
3672 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3673 std::env::set_var("REDDB_USERNAME", "ops");
3674 std::env::set_var("REDDB_PASSWORD", "hunter2");
3675 }
3676
3677 let (runtime, auth_store) = fresh_runtime_and_store();
3678 apply_preset(&runtime, &auth_store).expect("production preset applies cleanly");
3679
3680 assert!(
3682 !auth_store.needs_bootstrap(),
3683 "production preset must seal bootstrap"
3684 );
3685 let users = auth_store.list_users();
3686 assert_eq!(users.len(), 1);
3687 let admin = &users[0];
3688 assert_eq!(admin.username, "ops");
3689 assert!(
3690 admin.tenant_id.is_none(),
3691 "first admin must be platform-scoped (tenant=None)"
3692 );
3693
3694 let policy = auth_store
3696 .get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY)
3697 .expect("allow-all policy installed");
3698 assert!(!policy.statements.is_empty());
3699
3700 let actor = UserId::platform("ops");
3703 let ctx = EvalContext {
3704 principal_tenant: None,
3705 current_tenant: None,
3706 peer_ip: None,
3707 mfa_present: false,
3708 now_ms: 1_700_000_000_000,
3709 principal_is_admin_role: true,
3710 principal_is_platform_scoped: true,
3711 };
3712 let arbitrary_resource = ResourceRef::new("config", "red.config.audit.enabled");
3713 assert!(
3714 auth_store.check_policy_authz(&actor, "config:write", &arbitrary_resource, &ctx),
3715 "allow-all policy must grant arbitrary actions via the evaluator"
3716 );
3717
3718 let store = runtime.db().store();
3720 match store
3721 .get_config(BOOTSTRAP_FIRST_ADMIN_KEY)
3722 .expect("first_admin_id persisted")
3723 {
3724 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), "ops"),
3725 other => panic!("expected Text(ops), got {other:?}"),
3726 }
3727 match store.get_config(BOOTSTRAP_PRESET_KEY).unwrap() {
3728 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_PRODUCTION),
3729 other => panic!("expected Text(production), got {other:?}"),
3730 }
3731
3732 clear_preset_env();
3733 }
3734
3735 #[test]
3736 fn bootstrap_preset_env_takes_precedence_over_legacy_preset_env() {
3737 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3738 clear_preset_env();
3739 unsafe {
3741 std::env::set_var(BOOTSTRAP_PRESET_ENV, PRESET_REGULATED);
3742 std::env::set_var(PRESET_ENV, PRESET_SIMPLE);
3743 }
3744
3745 let options = no_auth_test_config(false)
3746 .to_db_options()
3747 .expect("regulated options");
3748 assert!(
3749 options.control_events.compliance_mode,
3750 "canonical REDDB_BOOTSTRAP_PRESET should win over REDDB_PRESET"
3751 );
3752 assert!(options.query_audit.enabled);
3753
3754 clear_preset_env();
3755 }
3756
3757 #[test]
3758 fn cloud_preset_creates_system_head_and_customer_admins() {
3759 use crate::auth::policies::{EvalContext, ResourceRef};
3760 use crate::auth::UserId;
3761
3762 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3763 clear_preset_env();
3764 unsafe {
3766 std::env::set_var(BOOTSTRAP_PRESET_ENV, PRESET_CLOUD);
3767 std::env::set_var("REDDB_CLOUD_HEAD_ADMIN", "head");
3768 std::env::set_var("REDDB_CLOUD_HEAD_ADMIN_PASSWORD", "head-pass");
3769 std::env::set_var("REDDB_CUSTOMER_ADMIN", "customer");
3770 std::env::set_var("REDDB_CUSTOMER_ADMIN_PASSWORD", "customer-pass");
3771 }
3772
3773 let (runtime, auth_store) = fresh_runtime_and_store();
3774 apply_preset(&runtime, &auth_store).expect("cloud preset applies cleanly");
3775
3776 let head = auth_store
3777 .get_user(None, "head")
3778 .expect("head admin should exist");
3779 assert_eq!(head.tenant_id, None);
3780 let customer = auth_store
3781 .get_user(None, "customer")
3782 .expect("customer admin should exist");
3783 assert_eq!(customer.tenant_id, None);
3784
3785 let ctx = EvalContext {
3786 principal_tenant: None,
3787 current_tenant: None,
3788 peer_ip: None,
3789 mfa_present: false,
3790 now_ms: 1_700_000_000_000,
3791 principal_is_admin_role: true,
3792 principal_is_platform_scoped: true,
3793 };
3794 assert!(auth_store.check_policy_authz(
3795 &UserId::platform("customer"),
3796 "config:write",
3797 &ResourceRef::new("config", "red.config.customer.enabled"),
3798 &ctx,
3799 ));
3800 assert!(
3801 auth_store.check_policy_authz(
3802 &UserId::platform("head"),
3803 "config:write",
3804 &ResourceRef::new("config", "red.config.cloud.enabled"),
3805 &ctx,
3806 ),
3807 "cloud head keeps allow-all authority over managed cloud config"
3808 );
3809 assert!(
3810 !auth_store.check_policy_authz(
3811 &UserId::platform("customer"),
3812 "config:write",
3813 &ResourceRef::new("config", "red.config.cloud.enabled"),
3814 &ctx,
3815 ),
3816 "cloud customer must be denied managed cloud config writes"
3817 );
3818 assert!(
3819 !auth_store.check_policy_authz(
3820 &UserId::platform("customer"),
3821 "policy:drop",
3822 &ResourceRef::new("policy", CLOUD_PROTECT_MANAGED_POLICY),
3823 &ctx,
3824 ),
3825 "cloud customer must be denied mutations of the managed guardrail policy"
3826 );
3827
3828 assert!(auth_store.get_user(None, "head").is_some());
3829 assert!(auth_store
3830 .get_policy(CLOUD_PROTECT_MANAGED_POLICY)
3831 .is_some());
3832 assert!(runtime
3833 .config_registry()
3834 .get_active(CLOUD_CONFIG_NAMESPACE)
3835 .is_some());
3836
3837 let store = runtime.db().store();
3838 match store
3839 .get_config(BOOTSTRAP_FIRST_ADMIN_KEY)
3840 .expect("head admin id persisted")
3841 {
3842 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), "head"),
3843 other => panic!("expected Text(head), got {other:?}"),
3844 }
3845 match store.get_config(BOOTSTRAP_PRESET_KEY).unwrap() {
3846 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_CLOUD),
3847 other => panic!("expected Text(cloud), got {other:?}"),
3848 }
3849
3850 clear_preset_env();
3851 }
3852
3853 #[test]
3854 fn cloud_preset_cli_admin_alias_wins_over_cloud_env_password() {
3855 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3856 clear_preset_env();
3857 unsafe {
3859 std::env::set_var("REDDB_CLOUD_HEAD_ADMIN_PASSWORD", "env-head-pass");
3860 }
3861
3862 let bootstrap = BootstrapConfig {
3863 preset: Some(PRESET_CLOUD.to_string()),
3864 admin_username: Some("head".to_string()),
3865 admin_password: Some("cli-head-pass".to_string()),
3866 customer_admin: Some("customer".to_string()),
3867 customer_admin_password: Some("customer-pass".to_string()),
3868 ..BootstrapConfig::default()
3869 };
3870 let (runtime, auth_store) = fresh_runtime_and_store();
3871 apply_preset_from_config(&runtime, &auth_store, &bootstrap)
3872 .expect("cloud preset applies cleanly");
3873
3874 auth_store
3875 .authenticate("head", "cli-head-pass")
3876 .expect("CLI alias password should win");
3877 assert!(
3878 auth_store.authenticate("head", "env-head-pass").is_err(),
3879 "cloud-specific env password must not beat CLI alias"
3880 );
3881
3882 clear_preset_env();
3883 }
3884
3885 #[test]
3886 fn regulated_preset_enables_query_audit_infrastructure_without_rules() {
3887 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3888 clear_preset_env();
3889 unsafe {
3891 std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3892 }
3893
3894 let (runtime, auth_store) = fresh_runtime_and_store();
3895 apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3896
3897 assert!(runtime.query_audit().is_enabled());
3898 assert!(runtime.query_audit().rules().is_empty());
3899 assert!(
3900 runtime
3901 .db()
3902 .store()
3903 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3904 .is_some(),
3905 "regulated preset should create the query-audit stream"
3906 );
3907
3908 runtime
3909 .execute_query("CREATE TABLE docs (id INT)")
3910 .expect("create table");
3911 runtime
3912 .execute_query("INSERT INTO docs (id) VALUES (1)")
3913 .expect("insert");
3914 runtime.execute_query("SELECT * FROM docs").expect("select");
3915 let rows = runtime
3916 .db()
3917 .store()
3918 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3919 .expect("query audit collection")
3920 .query_all(|_| true);
3921 assert!(
3922 rows.is_empty(),
3923 "regulated preset must not globally audit every query"
3924 );
3925
3926 clear_preset_env();
3927 }
3928
3929 #[test]
3930 fn managed_backup_env_rejects_primary_replica_single_file_storage() {
3931 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3932 clear_backup_env();
3933 unsafe {
3935 std::env::set_var("REDDB_BACKUP_S3_ENDPOINT", "https://s3.example.test");
3936 std::env::set_var("REDDB_BACKUP_S3_BUCKET", "reddb");
3937 std::env::set_var("REDDB_BACKUP_S3_PREFIX", "clusters/prod");
3938 std::env::set_var("REDDB_BACKUP_S3_ACCESS_KEY_ID", "AK");
3939 std::env::set_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY", "SK");
3940 }
3941
3942 let mut config = no_auth_test_config(false);
3943 config.role = "primary".to_string();
3944 config.storage_profile = crate::storage::StorageDeployPreset::PrimaryReplicaDev.selection();
3945
3946 let err = config.to_db_options().unwrap_err();
3947 assert!(err.contains("managed backup"), "got: {err}");
3948 assert!(err.contains("operational-directory"), "got: {err}");
3949
3950 clear_backup_env();
3951 }
3952
3953 #[test]
3954 fn regulated_preset_installs_managed_evidence_guardrails_end_to_end() {
3955 use crate::auth::policies::{EvalContext, Policy, ResourceRef};
3956 use crate::auth::store::PrincipalRef;
3957 use crate::auth::{Role, UserId};
3958 use crate::runtime::mvcc::{clear_current_auth_identity, set_current_auth_identity};
3959 use crate::storage::schema::Value;
3960
3961 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3962 clear_preset_env();
3963 unsafe {
3965 std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3966 }
3967
3968 let options = no_auth_test_config(false)
3969 .to_db_options()
3970 .expect("regulated options");
3971 assert!(
3972 options.control_events.compliance_mode,
3973 "regulated preset must enable fail-closed control evidence before runtime boot"
3974 );
3975 assert!(
3976 options.query_audit.enabled && options.query_audit.rules.is_empty(),
3977 "regulated preset must enable query-audit infrastructure without global rules"
3978 );
3979
3980 let runtime = RedDBRuntime::with_options(options).expect("runtime");
3981 let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3982 apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3983 runtime.set_auth_store(Arc::clone(&auth_store));
3984
3985 assert!(runtime.control_events_require_persistence());
3986 assert!(runtime.query_audit().is_enabled());
3987 assert!(runtime.query_audit().rules().is_empty());
3988 assert!(auth_store
3989 .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
3990 .is_some());
3991
3992 let managed_policy = runtime
3993 .config_registry()
3994 .get_active(REGULATED_PROTECT_MANAGED_POLICY)
3995 .expect("regulated managed policy registry entry");
3996 assert!(managed_policy.managed);
3997 assert_eq!(managed_policy.resource_type, "policy");
3998 assert!(
3999 runtime
4000 .config_registry()
4001 .get_active(REGULATED_AUDIT_CONFIG_NAMESPACE)
4002 .expect("regulated audit config namespace")
4003 .managed
4004 );
4005
4006 let registry_rows = runtime
4007 .execute_query(&format!(
4008 "SELECT id, managed FROM red.registry WHERE id = '{}'",
4009 REGULATED_PROTECT_MANAGED_POLICY
4010 ))
4011 .expect("red.registry query");
4012 assert_eq!(registry_rows.result.records.len(), 1);
4013 assert_eq!(
4014 registry_rows.result.records[0].get("managed"),
4015 Some(&Value::Boolean(true))
4016 );
4017
4018 let managed_policy_rows = runtime
4019 .execute_query(&format!(
4020 "SELECT policy_id FROM red.managed_policies WHERE policy_id = '{}'",
4021 REGULATED_PROTECT_MANAGED_POLICY
4022 ))
4023 .expect("red.managed_policies query");
4024 assert_eq!(managed_policy_rows.result.records.len(), 1);
4025
4026 let capability_rows = runtime
4027 .execute_query(
4028 "SELECT action FROM red.control_capabilities WHERE action = 'evidence:export'",
4029 )
4030 .expect("red.control_capabilities query");
4031 assert_eq!(capability_rows.result.records.len(), 1);
4032
4033 auth_store
4034 .create_user("alice", "p", Role::Admin)
4035 .expect("create ordinary admin");
4036 let allow_all = Policy::from_json_str(
4037 r#"{
4038 "id": "alice-allow-all",
4039 "version": 1,
4040 "statements": [{
4041 "effect": "allow",
4042 "actions": ["*"],
4043 "resources": ["*"]
4044 }]
4045 }"#,
4046 )
4047 .expect("allow-all policy");
4048 auth_store.put_policy(allow_all).expect("install allow-all");
4049 auth_store
4050 .attach_policy(
4051 PrincipalRef::User(UserId::platform("alice")),
4052 "alice-allow-all",
4053 )
4054 .expect("attach allow-all");
4055 let ctx = EvalContext {
4056 principal_tenant: None,
4057 current_tenant: None,
4058 peer_ip: None,
4059 mfa_present: false,
4060 now_ms: 1_700_000_000_000,
4061 principal_is_admin_role: true,
4062 principal_is_platform_scoped: true,
4063 };
4064 assert!(
4065 !auth_store.check_policy_authz(
4066 &UserId::platform("alice"),
4067 "policy:drop",
4068 &ResourceRef::new("policy", REGULATED_PROTECT_MANAGED_POLICY),
4069 &ctx,
4070 ),
4071 "managed guardrail deny policy must be effective for ordinary admins"
4072 );
4073
4074 set_current_auth_identity("alice".to_string(), Role::Admin);
4075 let denied = runtime.execute_query(&format!(
4076 "DROP POLICY '{}'",
4077 REGULATED_PROTECT_MANAGED_POLICY
4078 ));
4079 clear_current_auth_identity();
4080 let err = denied.expect_err("managed policy guardrail must deny ordinary admin");
4081 assert!(
4082 err.to_string().contains("managed policy"),
4083 "error should name the managed guardrail: {err}"
4084 );
4085 assert!(
4086 auth_store
4087 .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
4088 .is_some(),
4089 "denied mutation must leave managed policy installed"
4090 );
4091
4092 let denied_events = runtime
4093 .execute_query(&format!(
4094 "SELECT action, resource, outcome FROM red.control_events \
4095 WHERE action = 'policy:drop' AND resource = 'policy:{}'",
4096 REGULATED_PROTECT_MANAGED_POLICY
4097 ))
4098 .expect("red.control_events denied policy drop");
4099 assert_eq!(denied_events.result.records.len(), 1);
4100 assert_eq!(
4101 denied_events.result.records[0].get("outcome"),
4102 Some(&Value::text("denied"))
4103 );
4104
4105 set_current_auth_identity("alice".to_string(), Role::Admin);
4106 let config_denied = runtime.execute_query("SET CONFIG red.config.audit.enabled = true");
4107 clear_current_auth_identity();
4108 let err = config_denied.expect_err("managed config guardrail must deny ordinary admin");
4109 assert!(
4110 err.to_string().contains("managed config"),
4111 "error should name the managed config guardrail: {err}"
4112 );
4113
4114 let denied_config_events = runtime
4115 .execute_query(
4116 "SELECT action, resource, outcome FROM red.control_events \
4117 WHERE action = 'config:write' AND resource = 'config:red.config.audit.enabled'",
4118 )
4119 .expect("red.control_events denied config write");
4120 assert_eq!(denied_config_events.result.records.len(), 1);
4121 assert_eq!(
4122 denied_config_events.result.records[0].get("outcome"),
4123 Some(&Value::text("denied"))
4124 );
4125
4126 runtime
4127 .execute_query("CREATE TABLE regulated_docs (id INT)")
4128 .expect("create user table");
4129 runtime
4130 .execute_query("SELECT * FROM regulated_docs")
4131 .expect("select user table");
4132 let audit_rows = runtime
4133 .db()
4134 .store()
4135 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
4136 .expect("query audit collection")
4137 .query_all(|_| true);
4138 assert!(
4139 audit_rows.is_empty(),
4140 "regulated preset must not globally audit data-plane queries"
4141 );
4142
4143 clear_preset_env();
4144 }
4145
4146 #[test]
4147 fn bootstrap_manifest_installs_initial_users_policies_guardrails_and_config() {
4148 use crate::auth::policies::{EvalContext, ResourceRef};
4149 use crate::auth::UserId;
4150 use crate::storage::schema::Value;
4151
4152 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4153 clear_preset_env();
4154
4155 let manifest_dir = std::env::current_dir()
4156 .expect("current dir")
4157 .join(".red/tmp/bootstrap-manifest-tests");
4158 std::fs::create_dir_all(&manifest_dir).expect("create manifest test dir");
4159 let manifest_path = manifest_dir.join(format!(
4160 "reddb-bootstrap-manifest-{}-{}.json",
4161 std::process::id(),
4162 std::time::SystemTime::now()
4163 .duration_since(std::time::UNIX_EPOCH)
4164 .unwrap_or_default()
4165 .as_millis()
4166 ));
4167 std::fs::write(
4168 &manifest_path,
4169 r#"{
4170 "users": [
4171 {
4172 "username": "ops",
4173 "password": "hunter2",
4174 "role": "admin"
4175 }
4176 ],
4177 "policies": [
4178 {
4179 "id": "bootstrap-registry-admin",
4180 "version": 1,
4181 "statements": [
4182 {
4183 "effect": "allow",
4184 "actions": ["red.registry:*", "policy:*", "config:write", "select"],
4185 "resources": ["registry:*", "policy:*", "config:*", "collection:docs"]
4186 }
4187 ]
4188 }
4189 ],
4190 "managed_policies": [
4191 {
4192 "id": "managed-deny-drop",
4193 "version": 1,
4194 "statements": [
4195 {
4196 "effect": "deny",
4197 "actions": ["policy:drop"],
4198 "resources": ["policy:managed-deny-drop"]
4199 }
4200 ],
4201 "required_resource": "policy:managed-deny-drop",
4202 "evidence": "full"
4203 }
4204 ],
4205 "attachments": [
4206 {"user": "ops", "policy": "bootstrap-registry-admin"}
4207 ],
4208 "managed_config_namespaces": [
4209 {
4210 "id": "red.ai",
4211 "required_action": "config:write",
4212 "required_resource": "config:red.ai.*",
4213 "evidence": "metadata"
4214 }
4215 ],
4216 "config": [
4217 {"key": "red.ai.default.provider", "value": "openai"},
4218 {
4219 "key": "red.ai.openai.default.secret_ref",
4220 "secret_ref": {"collection": "red.vault", "key": "openai"}
4221 }
4222 ],
4223 "actor": "ops"
4224 }"#,
4225 )
4226 .expect("write manifest");
4227 unsafe {
4229 std::env::set_var("REDDB_BOOTSTRAP_MANIFEST", &manifest_path);
4230 }
4231
4232 let (runtime, auth_store) = fresh_runtime_and_store();
4233 apply_preset(&runtime, &auth_store).expect("manifest applies cleanly");
4234
4235 let users = auth_store.list_users();
4236 assert_eq!(users.len(), 1);
4237 assert_eq!(users[0].username, "ops");
4238 assert!(users[0].tenant_id.is_none());
4239
4240 let actor = UserId::platform("ops");
4241 let ctx = EvalContext {
4242 principal_tenant: None,
4243 current_tenant: None,
4244 peer_ip: None,
4245 mfa_present: false,
4246 now_ms: 1_700_000_000_000,
4247 principal_is_admin_role: true,
4248 principal_is_platform_scoped: true,
4249 };
4250 assert!(auth_store.check_policy_authz(
4252 &actor,
4253 "select",
4254 &ResourceRef::new("collection", "docs"),
4255 &ctx
4256 ));
4257
4258 let managed_policy = runtime
4259 .config_registry()
4260 .get_active("managed-deny-drop")
4261 .expect("managed policy registry entry");
4262 assert!(managed_policy.managed);
4263 assert_eq!(managed_policy.resource_type, "policy");
4264 let managed_config = runtime
4265 .config_registry()
4266 .get_active("red.ai")
4267 .expect("managed config namespace registry entry");
4268 assert!(managed_config.managed);
4269 assert_eq!(managed_config.resource_type, "config_namespace");
4270
4271 let store = runtime.db().store();
4272 match store
4273 .get_config("red.ai.default.provider")
4274 .expect("plain config persisted")
4275 {
4276 Value::Text(s) => assert_eq!(s.as_ref(), "openai"),
4277 other => panic!("expected provider text, got {other:?}"),
4278 }
4279 let Value::Json(bytes) = store
4280 .get_config("red.ai.openai.default.secret_ref")
4281 .expect("secret ref config persisted")
4282 else {
4283 panic!("secret ref must be stored as structured JSON");
4284 };
4285 let reference: crate::serde_json::Value =
4286 crate::serde_json::from_slice(&bytes).expect("secret ref json");
4287 assert_eq!(
4288 reference.get("type").and_then(|v| v.as_str()),
4289 Some("secret_ref")
4290 );
4291 assert!(
4292 !String::from_utf8_lossy(&bytes).contains("hunter2"),
4293 "manifest password must not leak into secret ref config"
4294 );
4295
4296 let completed = store
4297 .get_config(BOOTSTRAP_COMPLETED_KEY)
4298 .expect("bootstrap completion persisted");
4299 assert!(matches!(completed, Value::Boolean(true)));
4300 assert!(
4301 store
4302 .get_config("system.bootstrap.manifest.registry_entries")
4303 .is_some(),
4304 "managed registry entries must be persisted internally"
4305 );
4306
4307 std::fs::remove_file(&manifest_path).expect("remove manifest after first boot");
4308 let restored_registry = Arc::new(crate::auth::registry::ConfigRegistry::new());
4309 crate::cli::bootstrap_manifest::rehydrate_manifest_registry(&runtime, &restored_registry)
4310 .expect("registry rehydrates without manifest file");
4311 assert!(restored_registry.get_active("managed-deny-drop").is_some());
4312 assert!(restored_registry.get_active("red.ai").is_some());
4313
4314 let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
4315 apply_preset(&runtime, &fresh).expect("re-run must not need manifest file");
4316 assert!(fresh.needs_bootstrap());
4317
4318 clear_preset_env();
4319 }
4320
4321 #[test]
4322 fn production_preset_refuses_to_start_without_password() {
4323 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4324 clear_preset_env();
4325 unsafe {
4327 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
4328 std::env::set_var("REDDB_USERNAME", "ops");
4329 }
4330
4331 let (runtime, auth_store) = fresh_runtime_and_store();
4332 let err = apply_preset(&runtime, &auth_store).expect_err("must reject missing password");
4333 assert!(
4334 err.contains("REDDB_PASSWORD"),
4335 "error must name the missing env: {err}"
4336 );
4337
4338 assert!(auth_store.needs_bootstrap());
4340 assert!(runtime
4341 .db()
4342 .store()
4343 .get_config(BOOTSTRAP_COMPLETED_KEY)
4344 .is_none());
4345
4346 clear_preset_env();
4347 }
4348
4349 #[test]
4350 fn re_running_production_after_first_boot_is_a_silent_skip() {
4351 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4352 clear_preset_env();
4353 unsafe {
4355 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
4356 std::env::set_var("REDDB_USERNAME", "ops");
4357 std::env::set_var("REDDB_PASSWORD", "hunter2");
4358 }
4359
4360 let (runtime, auth_store) = fresh_runtime_and_store();
4361 apply_preset(&runtime, &auth_store).expect("first apply");
4362 assert_eq!(auth_store.list_users().len(), 1);
4363
4364 let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
4371 apply_preset(&runtime, &fresh).expect("re-run is silent-skip");
4372 assert!(
4373 fresh.needs_bootstrap(),
4374 "re-run must not create a second admin"
4375 );
4376 assert!(
4377 fresh.get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY).is_none(),
4378 "re-run must not re-install the allow-all policy on the fresh store"
4379 );
4380
4381 clear_preset_env();
4382 }
4383
4384 #[test]
4385 fn unrecognised_preset_value_is_rejected() {
4386 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4387 clear_preset_env();
4388 unsafe {
4390 std::env::set_var(PRESET_ENV, "weird");
4391 }
4392
4393 let (runtime, auth_store) = fresh_runtime_and_store();
4394 let err = apply_preset(&runtime, &auth_store).expect_err("must reject unknown preset");
4395 assert!(err.contains("weird"), "error must echo the value: {err}");
4396 assert!(auth_store.needs_bootstrap());
4397
4398 clear_preset_env();
4399 }
4400
4401 #[test]
4402 fn no_auth_short_circuits_preset_entirely() {
4403 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4404 clear_preset_env();
4405 unsafe {
4408 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
4409 std::env::set_var("REDDB_USERNAME", "ops");
4410 std::env::set_var("REDDB_PASSWORD", "hunter2");
4411 }
4412
4413 let options = no_auth_test_config(true)
4414 .to_db_options()
4415 .expect("to_db_options");
4416 assert!(no_auth_active(&options));
4417
4418 let (runtime, auth_store) = fresh_runtime_and_store();
4421 if !no_auth_active(&options) {
4422 apply_preset(&runtime, &auth_store).expect("would apply preset");
4423 }
4424
4425 assert!(
4426 auth_store.needs_bootstrap(),
4427 "--no-auth must prevent any admin creation"
4428 );
4429 assert!(
4430 runtime
4431 .db()
4432 .store()
4433 .get_config(BOOTSTRAP_COMPLETED_KEY)
4434 .is_none(),
4435 "--no-auth must skip bootstrap-state persistence"
4436 );
4437
4438 clear_preset_env();
4439 }
4440
4441 #[test]
4442 fn implicit_bind_collision_degrades() {
4443 let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
4444 let addr = held.local_addr().expect("held addr").to_string();
4445 let mut readiness = TransportReadiness::default();
4446
4447 let listener =
4448 bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");
4449
4450 assert!(listener.is_none());
4451 assert_eq!(readiness.active.len(), 0);
4452 assert_eq!(readiness.failed.len(), 1);
4453 assert!(!readiness.failed[0].explicit);
4454 assert_eq!(readiness.failed[0].bind_addr, addr);
4455 }
4456}