1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, InProcessRouterConfig};
12use crate::storage::StorageProfileSelection;
13use crate::{
14 GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
15};
16
17pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
18
19pub fn detect_runtime_config() -> RuntimeConfig {
21 let cpus = thread::available_parallelism()
22 .map(|n| n.get())
23 .unwrap_or(1);
24
25 let suggested_workers = cpus.saturating_sub(1).max(1);
27
28 RuntimeConfig {
29 available_cpus: cpus,
30 suggested_workers,
31 stack_size: 8 * 1024 * 1024, }
33}
34
35#[derive(Debug, Clone)]
36pub struct RuntimeConfig {
37 pub available_cpus: usize,
38 pub suggested_workers: usize,
39 pub stack_size: usize,
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum ServerTransport {
44 Grpc,
45 Http,
46 Wire,
47}
48
49impl ServerTransport {
50 pub const fn as_str(self) -> &'static str {
51 match self {
52 Self::Grpc => "gRPC",
53 Self::Http => "HTTP",
54 Self::Wire => "wire",
55 }
56 }
57
58 pub const fn default_bind_addr(self) -> &'static str {
59 match self {
60 Self::Grpc => "127.0.0.1:5555",
61 Self::Http => "127.0.0.1:5055",
62 Self::Wire => "127.0.0.1:5050",
63 }
64 }
65}
66
67#[derive(Debug, Clone)]
68pub struct ServerCommandConfig {
69 pub path: Option<PathBuf>,
70 pub router_bind_addr: Option<String>,
71 pub router_bind_explicit: bool,
72 pub grpc_bind_addr: Option<String>,
73 pub grpc_bind_explicit: bool,
74 pub grpc_tls_bind_addr: Option<String>,
78 pub grpc_tls_cert: Option<PathBuf>,
84 pub grpc_tls_key: Option<PathBuf>,
87 pub grpc_tls_client_ca: Option<PathBuf>,
92 pub http_bind_addr: Option<String>,
93 pub http_bind_explicit: bool,
94 pub http_tls_bind_addr: Option<String>,
98 pub http_tls_cert: Option<PathBuf>,
101 pub http_tls_key: Option<PathBuf>,
104 pub http_tls_client_ca: Option<PathBuf>,
108 pub wire_bind_addr: Option<String>,
109 pub wire_bind_explicit: bool,
110 pub wire_tls_bind_addr: Option<String>,
112 pub wire_tls_cert: Option<PathBuf>,
114 pub wire_tls_key: Option<PathBuf>,
116 pub pg_bind_addr: Option<String>,
120 pub create_if_missing: bool,
121 pub read_only: bool,
122 pub role: String,
123 pub primary_addr: Option<String>,
124 pub storage_profile: StorageProfileSelection,
125 pub vault: bool,
126 pub no_auth: bool,
137 pub workers: Option<usize>,
139 pub telemetry: Option<crate::telemetry::TelemetryConfig>,
142 pub http_limits_cli: crate::server::HttpLimitsCliInput,
147}
148
149#[derive(Debug, Clone, PartialEq, Eq)]
150pub struct TransportListenerState {
151 pub transport: String,
152 pub bind_addr: String,
153 pub explicit: bool,
154}
155
156#[derive(Debug, Clone, PartialEq, Eq)]
157pub struct TransportListenerFailure {
158 pub transport: String,
159 pub bind_addr: String,
160 pub explicit: bool,
161 pub reason: String,
162}
163
164#[derive(Debug, Clone, Default, PartialEq, Eq)]
165pub struct TransportReadiness {
166 pub active: Vec<TransportListenerState>,
167 pub failed: Vec<TransportListenerFailure>,
168}
169
170impl TransportReadiness {
171 fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
172 self.active.push(TransportListenerState {
173 transport: transport.to_string(),
174 bind_addr: bind_addr.to_string(),
175 explicit,
176 });
177 }
178
179 fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
180 self.failed.push(TransportListenerFailure {
181 transport: transport.to_string(),
182 bind_addr: bind_addr.to_string(),
183 explicit,
184 reason,
185 });
186 }
187}
188
189#[derive(Debug, Clone)]
190pub struct SystemdServiceConfig {
191 pub service_name: String,
192 pub binary_path: PathBuf,
193 pub run_user: String,
194 pub run_group: String,
195 pub data_path: PathBuf,
196 pub router_bind_addr: Option<String>,
197 pub grpc_bind_addr: Option<String>,
198 pub http_bind_addr: Option<String>,
199}
200
201impl SystemdServiceConfig {
202 pub fn data_dir(&self) -> PathBuf {
203 self.data_path
204 .parent()
205 .map(PathBuf::from)
206 .unwrap_or_else(|| PathBuf::from("."))
207 }
208
209 pub fn unit_path(&self) -> PathBuf {
210 PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
211 }
212}
213
214pub fn default_telemetry_for_path(
219 path: Option<&std::path::Path>,
220) -> crate::telemetry::TelemetryConfig {
221 let log_dir = match path {
222 Some(p) => p
223 .parent()
224 .map(|parent| parent.join("logs"))
225 .or_else(|| Some(std::path::PathBuf::from("./logs"))),
226 None => None, };
228 crate::telemetry::TelemetryConfig {
229 log_dir,
230 file_prefix: "reddb.log".to_string(),
231 level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
232 format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
233 crate::telemetry::LogFormat::Pretty
234 } else {
235 crate::telemetry::LogFormat::Json
236 },
237 rotation_keep_days: 14,
238 service_name: "reddb",
239 level_explicit: false,
241 format_explicit: false,
242 rotation_keep_days_explicit: false,
243 file_prefix_explicit: false,
244 log_dir_explicit: false,
245 log_file_disabled: false,
246 }
247}
248
249const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
256const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
257const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
258const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";
262
263pub(crate) const NO_AUTH_META: &str = "red.boot.no_auth";
271
272pub(crate) fn no_auth_active(options: &RedDBOptions) -> bool {
275 options
276 .metadata
277 .get(NO_AUTH_META)
278 .map(|v| v == "true")
279 .unwrap_or(false)
280}
281
282const NO_AUTH_WARNING: &str =
287 "⚠ auth disabled (--no-auth) — anonymous access, do NOT use in production";
288
289impl ServerCommandConfig {
290 fn to_db_options(&self) -> Result<RedDBOptions, String> {
291 let mut options = match &self.path {
292 Some(path) => RedDBOptions::persistent(path),
293 None => RedDBOptions::in_memory(),
294 };
295
296 options.mode = StorageMode::Persistent;
297 options.create_if_missing = self.create_if_missing;
298 options.read_only = self.read_only
305 || env_nonempty("RED_READONLY")
306 .or_else(|| env_nonempty("REDDB_READONLY"))
307 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
308 .unwrap_or(false)
309 || self.path.as_ref().is_some_and(|data_path| {
310 crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
311 data_path,
312 ))
313 .unwrap_or(false)
314 });
315
316 options.replication = match self.role.as_str() {
317 "primary" => ReplicationConfig::primary(),
318 "replica" => {
319 let primary_addr = self
320 .primary_addr
321 .clone()
322 .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
323 ReplicationConfig::replica(primary_addr)
330 }
331 _ => ReplicationConfig::standalone(),
332 };
333 options.storage_profile = self.storage_profile.validate()?;
334
335 if self.vault {
336 options.auth.vault_enabled = true;
337 }
338
339 if self.no_auth {
350 options.auth.enabled = false;
351 options.auth.require_auth = false;
352 options.auth.vault_enabled = false;
353 options
354 .metadata
355 .insert(NO_AUTH_META.to_string(), "true".to_string());
356 }
357
358 if let Some(raw) = env_nonempty("REDDB_COMPLIANCE_MODE") {
363 options.control_events.compliance_mode = matches!(
364 raw.to_ascii_lowercase().as_str(),
365 "1" | "true" | "yes" | "on"
366 );
367 }
368 if env_nonempty(PRESET_ENV).is_some_and(|s| s.trim() == PRESET_REGULATED) {
369 options.control_events.compliance_mode = true;
370 options.query_audit = crate::runtime::query_audit::QueryAuditConfig::regulated();
371 }
372
373 match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
378 Err(msg) => {
379 return Err(format!("backup bootstrap: {msg}"));
380 }
381 Ok(Some(cfg)) => {
382 apply_backup_config(&mut options, &cfg);
383 }
384 Ok(None) => {
385 configure_remote_backend_from_env(&mut options);
386 }
387 }
388
389 if options.remote_backend.is_some()
390 || options
391 .metadata
392 .contains_key(BACKUP_INTERVAL_META_CHECKPOINT)
393 {
394 let mut selection = options.storage_profile;
395 selection.managed_backup = true;
396 options.storage_profile = selection.validate()?;
397 }
398
399 Ok(options)
400 }
401
402 pub fn enabled_transports(&self) -> Vec<ServerTransport> {
403 let mut transports = Vec::with_capacity(3);
404 if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
405 transports.push(ServerTransport::Grpc);
406 }
407 if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
408 transports.push(ServerTransport::Http);
409 }
410 if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
411 transports.push(ServerTransport::Wire);
412 }
413 transports
414 }
415}
416
417fn env_nonempty(name: &str) -> Option<String> {
422 crate::utils::env_with_file_fallback(name)
423}
424
425fn env_truthy(name: &str) -> bool {
426 env_nonempty(name)
427 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
428 .unwrap_or(false)
429}
430
431fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
437 let endpoint_host = endpoint_host(&cfg.endpoint);
438
439 options.metadata.insert(
440 BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
441 cfg.checkpoint_interval_secs.to_string(),
442 );
443 options.metadata.insert(
444 BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
445 cfg.wal_flush_interval_secs.to_string(),
446 );
447 options
448 .metadata
449 .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
450 options.metadata.insert(
451 BACKUP_PAUSE_ON_LAG_META.to_string(),
452 cfg.pause_on_lag_secs.to_string(),
453 );
454
455 #[cfg(feature = "backend-s3")]
456 {
457 let s3_cfg = crate::storage::backend::S3Config {
458 endpoint: cfg.endpoint.clone(),
459 bucket: cfg.bucket.clone(),
460 key_prefix: cfg.prefix.clone(),
461 access_key: cfg.access_key_id.clone(),
462 secret_key: cfg.secret_access_key.clone(),
463 region: cfg.region.clone(),
464 path_style: true,
465 };
466 let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
467 options.remote_backend = Some(backend.clone());
468 options.remote_backend_atomic = Some(backend);
469 let trimmed = cfg.prefix.trim_end_matches('/');
474 options.remote_key = Some(reddb_file::remote_database_key(trimmed));
475
476 tracing::info!(
477 backend = "s3",
478 endpoint = %endpoint_host,
479 bucket = %cfg.bucket,
480 prefix = %cfg.prefix,
481 checkpoint_interval_secs = cfg.checkpoint_interval_secs,
482 wal_flush_interval_secs = cfg.wal_flush_interval_secs,
483 "backup backend configured from REDDB_BACKUP_* env"
484 );
485 }
486
487 #[cfg(not(feature = "backend-s3"))]
488 {
489 tracing::warn!(
490 backend = "s3",
491 endpoint = %endpoint_host,
492 bucket = %cfg.bucket,
493 prefix = %cfg.prefix,
494 "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
495 backend wiring skipped (archiver/checkpointer also disabled)"
496 );
497 }
498}
499
500fn endpoint_host(endpoint: &str) -> &str {
501 let after_scheme = endpoint
502 .split_once("://")
503 .map(|(_, r)| r)
504 .unwrap_or(endpoint);
505 after_scheme.split('/').next().unwrap_or(after_scheme)
506}
507
508fn spawn_backup_tasks_if_configured(
514 options: &RedDBOptions,
515 runtime: &RedDBRuntime,
516) -> Option<BackupTasksHandle> {
517 let checkpoint_secs: u64 = options
518 .metadata
519 .get(BACKUP_INTERVAL_META_CHECKPOINT)?
520 .parse()
521 .ok()?;
522 let wal_secs: u64 = options
523 .metadata
524 .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
525 .parse()
526 .ok()?;
527 let pause_on_lag_secs: u64 = options
530 .metadata
531 .get(BACKUP_PAUSE_ON_LAG_META)
532 .and_then(|raw| raw.parse().ok())
533 .unwrap_or(0);
534 options.remote_backend.as_ref()?;
535
536 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
537
538 if pause_on_lag_secs > 0 {
543 let now_ms = std::time::SystemTime::now()
544 .duration_since(std::time::UNIX_EPOCH)
545 .map(|d| d.as_millis() as u64)
546 .unwrap_or(0);
547 runtime
548 .write_gate()
549 .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
550 tracing::info!(
551 pause_on_lag_secs,
552 "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
553 );
554 }
555
556 let checkpoint_handle = {
557 let stop = Arc::clone(&stop);
558 let runtime = runtime.clone();
559 let interval = Duration::from_secs(checkpoint_secs);
560 thread::Builder::new()
561 .name("red-checkpointer".into())
562 .spawn(move || {
563 periodic_loop(stop, interval, move || {
564 if let Err(err) = runtime.checkpoint() {
565 tracing::warn!(error = %err, "periodic checkpoint failed");
566 }
567 })
568 })
569 .ok()
570 };
571
572 let archiver_handle = {
573 let stop = Arc::clone(&stop);
574 let runtime = runtime.clone();
575 let interval = Duration::from_secs(wal_secs);
576 let lag_enabled = pause_on_lag_secs > 0;
577 thread::Builder::new()
578 .name("red-wal-archiver".into())
579 .spawn(move || {
580 periodic_loop(stop, interval, move || match runtime.trigger_backup() {
581 Ok(_) if lag_enabled => {
582 let now_ms = std::time::SystemTime::now()
583 .duration_since(std::time::UNIX_EPOCH)
584 .map(|d| d.as_millis() as u64)
585 .unwrap_or(0);
586 runtime.write_gate().record_archive_success(now_ms);
587 runtime.write_gate().evaluate_archive_lag(now_ms);
591 }
592 Ok(_) => {}
593 Err(err) => {
594 tracing::warn!(error = %err, "periodic WAL archive/backup failed");
595 }
596 })
597 })
598 .ok()
599 };
600
601 let lag_monitor_handle = if pause_on_lag_secs > 0 {
606 let stop = Arc::clone(&stop);
607 let runtime = runtime.clone();
608 let interval = Duration::from_secs(5);
612 thread::Builder::new()
613 .name("red-archive-lag-monitor".into())
614 .spawn(move || {
615 periodic_loop(stop, interval, move || {
616 let now_ms = std::time::SystemTime::now()
617 .duration_since(std::time::UNIX_EPOCH)
618 .map(|d| d.as_millis() as u64)
619 .unwrap_or(0);
620 let was_paused = runtime.write_gate().is_auto_paused();
621 let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
622 if now_paused && !was_paused {
623 tracing::warn!(
624 pause_on_lag_secs,
625 last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
626 "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
627 );
628 } else if !now_paused && was_paused {
629 tracing::info!(
630 "WAL archive caught up — exiting graceful read-only mode (issue #519)"
631 );
632 }
633 })
634 })
635 .ok()
636 } else {
637 None
638 };
639
640 tracing::info!(
641 checkpoint_interval_secs = checkpoint_secs,
642 wal_flush_interval_secs = wal_secs,
643 "backup tasks spawned (checkpointer + WAL archiver)"
644 );
645
646 Some(BackupTasksHandle {
647 stop,
648 _checkpoint_handle: checkpoint_handle,
649 _archiver_handle: archiver_handle,
650 _lag_monitor_handle: lag_monitor_handle,
651 })
652}
653
654pub struct BackupTasksHandle {
657 stop: Arc<std::sync::atomic::AtomicBool>,
658 _checkpoint_handle: Option<thread::JoinHandle<()>>,
659 _archiver_handle: Option<thread::JoinHandle<()>>,
660 _lag_monitor_handle: Option<thread::JoinHandle<()>>,
663}
664
665impl Drop for BackupTasksHandle {
666 fn drop(&mut self) {
667 self.stop.store(true, std::sync::atomic::Ordering::Release);
668 }
669}
670
671fn periodic_loop<F: FnMut()>(
672 stop: Arc<std::sync::atomic::AtomicBool>,
673 interval: Duration,
674 mut tick: F,
675) {
676 let wake = Duration::from_secs(1);
679 let mut elapsed = Duration::ZERO;
680 while !stop.load(std::sync::atomic::Ordering::Acquire) {
681 thread::sleep(wake);
682 elapsed += wake;
683 if elapsed >= interval {
684 tick();
685 elapsed = Duration::ZERO;
686 }
687 }
688}
689
690fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
691 let backend = env_nonempty("RED_BACKEND")
697 .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
698 .unwrap_or_else(|| "none".to_string())
699 .to_ascii_lowercase();
700
701 match backend.as_str() {
702 "s3" | "minio" | "r2" => {
707 #[cfg(feature = "backend-s3")]
708 {
709 if let Some(config) = s3_config_from_env() {
710 let remote_key = env_nonempty("RED_REMOTE_KEY")
711 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
712 .unwrap_or_else(|| reddb_file::remote_database_key("clusters/dev"));
713 let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
714 options.remote_backend = Some(backend.clone());
715 options.remote_backend_atomic = Some(backend);
716 options.remote_key = Some(remote_key);
717 }
718 }
719 #[cfg(not(feature = "backend-s3"))]
720 {
721 tracing::warn!(
722 backend = %backend,
723 "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
724 );
725 }
726 }
727 "fs" | "local" => {
732 let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
733 let remote_key = match (
734 base_path,
735 env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
736 ) {
737 (Some(base), Some(rel)) => Some(format!(
738 "{}/{}",
739 base.trim_end_matches('/'),
740 rel.trim_start_matches('/')
741 )),
742 (Some(base), None) => Some(reddb_file::remote_database_key(&format!(
743 "{}/clusters/dev",
744 base.trim_end_matches('/')
745 ))),
746 (None, Some(rel)) => Some(rel),
747 (None, None) => None,
748 };
749 if let Some(key) = remote_key {
750 let backend = Arc::new(crate::storage::backend::LocalBackend);
751 options.remote_backend = Some(backend.clone());
752 options.remote_backend_atomic = Some(backend);
753 options.remote_key = Some(key);
754 }
755 }
756 "http" => {
761 let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
762 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
763 {
764 Some(u) => u,
765 None => {
766 tracing::warn!(
767 "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
768 );
769 return;
770 }
771 };
772 let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
773 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
774 .unwrap_or_default();
775 let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
776 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
777 {
778 std::fs::read_to_string(&path)
779 .ok()
780 .map(|s| s.trim().to_string())
781 .filter(|s| !s.is_empty())
782 } else {
783 env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
784 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
785 };
786
787 let mut config =
788 crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
789 if let Some(auth) = auth_header {
790 config = config.with_auth_header(auth);
791 }
792 let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
793 || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
794 || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
795 config = config.with_conditional_writes(conditional_writes);
796 if conditional_writes {
801 match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
802 Ok(atomic) => {
803 let atomic_arc = Arc::new(atomic);
804 options.remote_backend = Some(atomic_arc.clone());
805 options.remote_backend_atomic = Some(atomic_arc);
806 }
807 Err(err) => {
808 tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
809 options.remote_backend =
810 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
811 }
812 }
813 } else {
814 options.remote_backend =
815 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
816 }
817 options.remote_key = env_nonempty("RED_REMOTE_KEY")
818 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
819 .or_else(|| Some(reddb_file::remote_database_key("clusters/dev")));
820 }
821 "none" | "" => {}
824 other => {
825 tracing::warn!(
826 backend = %other,
827 "unknown RED_BACKEND value — supported: s3 | fs | http | none"
828 );
829 }
830 }
831}
832
833#[cfg(feature = "backend-s3")]
838fn env_s3(suffix: &str) -> Option<String> {
839 env_nonempty(&format!("RED_S3_{suffix}"))
840 .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}")))
841}
842
843#[cfg(feature = "backend-s3")]
849fn env_s3_secret(suffix: &str) -> Option<String> {
850 let file_key_red = format!("RED_S3_{suffix}_FILE");
851 let file_key_legacy = format!("REDDB_S3_{suffix}_FILE");
852 if let Some(path) = env_nonempty(&file_key_red).or_else(|| env_nonempty(&file_key_legacy)) {
853 return std::fs::read_to_string(&path)
854 .ok()
855 .map(|s| s.trim().to_string())
856 .filter(|s| !s.is_empty());
857 }
858 env_s3(suffix)
859}
860
861#[cfg(feature = "backend-s3")]
862fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
863 let endpoint = env_s3("ENDPOINT")?;
864 let bucket = env_s3("BUCKET")?;
865 let access_key = env_s3_secret("ACCESS_KEY")?;
866 let secret_key = env_s3_secret("SECRET_KEY")?;
867 let region = env_s3("REGION").unwrap_or_else(|| "us-east-1".to_string());
868 let key_prefix = env_s3("KEY_PREFIX")
869 .or_else(|| env_s3("PREFIX"))
870 .unwrap_or_default();
871 let path_style = env_s3("PATH_STYLE")
872 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
873 .unwrap_or(true);
874 Some(crate::storage::backend::S3Config {
875 endpoint,
876 bucket,
877 key_prefix,
878 access_key,
879 secret_key,
880 region,
881 path_style,
882 })
883}
884
885pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
886 let data_dir = config.data_dir();
887 let exec_start = render_systemd_exec_start(config);
888 format!(
889 "[Unit]\n\
890Description=RedDB unified database service\n\
891After=network-online.target\n\
892Wants=network-online.target\n\
893\n\
894[Service]\n\
895Type=simple\n\
896User={user}\n\
897Group={group}\n\
898WorkingDirectory={workdir}\n\
899ExecStart={exec_start}\n\
900Restart=always\n\
901RestartSec=2\n\
902LimitSTACK=16M\n\
903NoNewPrivileges=true\n\
904PrivateTmp=true\n\
905ProtectSystem=strict\n\
906ProtectHome=true\n\
907ProtectControlGroups=true\n\
908ProtectKernelTunables=true\n\
909ProtectKernelModules=true\n\
910RestrictNamespaces=true\n\
911LockPersonality=true\n\
912MemoryDenyWriteExecute=true\n\
913ReadWritePaths={workdir}\n\
914\n\
915[Install]\n\
916WantedBy=multi-user.target\n",
917 user = config.run_user,
918 group = config.run_group,
919 workdir = data_dir.display(),
920 exec_start = exec_start,
921 )
922}
923
924#[cfg(target_os = "linux")]
933pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
934 ensure_root()?;
935 ensure_command_available("systemctl")?;
936 ensure_command_available("getent")?;
937 ensure_command_available("groupadd")?;
938 ensure_command_available("useradd")?;
939 ensure_command_available("install")?;
940 ensure_executable(&config.binary_path)?;
941
942 if !command_success("getent", ["group", config.run_group.as_str()])? {
943 run_command("groupadd", ["--system", config.run_group.as_str()])?;
944 }
945
946 if !command_success("id", ["-u", config.run_user.as_str()])? {
947 let data_dir = config.data_dir();
948 run_command(
949 "useradd",
950 [
951 "--system",
952 "--gid",
953 config.run_group.as_str(),
954 "--home-dir",
955 data_dir.to_string_lossy().as_ref(),
956 "--shell",
957 "/usr/sbin/nologin",
958 config.run_user.as_str(),
959 ],
960 )?;
961 }
962
963 let data_dir = config.data_dir();
964 run_command(
965 "install",
966 [
967 "-d",
968 "-o",
969 config.run_user.as_str(),
970 "-g",
971 config.run_group.as_str(),
972 "-m",
973 "0750",
974 data_dir.to_string_lossy().as_ref(),
975 ],
976 )?;
977
978 std::fs::write(config.unit_path(), render_systemd_unit(config))
979 .map_err(|err| format!("failed to write systemd unit: {err}"))?;
980
981 run_command("systemctl", ["daemon-reload"])?;
982 run_command(
983 "systemctl",
984 [
985 "enable",
986 "--now",
987 format!("{}.service", config.service_name).as_str(),
988 ],
989 )?;
990
991 Ok(())
992}
993
994#[cfg(not(target_os = "linux"))]
999pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
1000 Err("systemd install is Linux-only — use sc.exe (Windows) or \
1001 launchd (macOS) to install the service manually using the \
1002 unit printed by `red service print-unit`"
1003 .to_string())
1004}
1005
1006#[cfg(target_os = "linux")]
1007fn ensure_root() -> Result<(), String> {
1008 let output = Command::new("id")
1009 .arg("-u")
1010 .output()
1011 .map_err(|err| format!("failed to determine current uid: {err}"))?;
1012 if !output.status.success() {
1013 return Err("failed to determine current uid".to_string());
1014 }
1015 let uid = String::from_utf8_lossy(&output.stdout);
1016 if uid.trim() != "0" {
1017 return Err("run this command as root (sudo)".to_string());
1018 }
1019 Ok(())
1020}
1021
1022#[cfg(target_os = "linux")]
1023fn ensure_command_available(command: &str) -> Result<(), String> {
1024 let status = Command::new("sh")
1025 .args(["-lc", &format!("command -v {command} >/dev/null 2>&1")])
1026 .status()
1027 .map_err(|err| format!("failed to check command '{command}': {err}"))?;
1028 if status.success() {
1029 Ok(())
1030 } else {
1031 Err(format!("required command not found: {command}"))
1032 }
1033}
1034
1035#[cfg(target_os = "linux")]
1036fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
1037 let metadata = std::fs::metadata(path)
1038 .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
1039 #[cfg(unix)]
1040 {
1041 use std::os::unix::fs::PermissionsExt;
1042 if metadata.permissions().mode() & 0o111 == 0 {
1043 return Err(format!("binary is not executable: {}", path.display()));
1044 }
1045 }
1046 #[cfg(not(unix))]
1047 {
1048 if !metadata.is_file() {
1049 return Err(format!("binary is not a file: {}", path.display()));
1050 }
1051 }
1052 Ok(())
1053}
1054
1055#[cfg(target_os = "linux")]
1056fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
1057 Command::new(program)
1058 .args(args)
1059 .status()
1060 .map(|status| status.success())
1061 .map_err(|err| format!("failed to run {program}: {err}"))
1062}
1063
1064#[cfg(target_os = "linux")]
1065fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
1066 let output = Command::new(program)
1067 .args(args)
1068 .output()
1069 .map_err(|err| format!("failed to run {program}: {err}"))?;
1070 if output.status.success() {
1071 return Ok(());
1072 }
1073
1074 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
1075 let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
1076 let detail = if !stderr.is_empty() {
1077 stderr
1078 } else if !stdout.is_empty() {
1079 stdout
1080 } else {
1081 format!("exit status {}", output.status)
1082 };
1083 Err(format!("{program} failed: {detail}"))
1084}
1085
1086pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1087 let has_any = config.router_bind_addr.is_some()
1088 || config.grpc_bind_addr.is_some()
1089 || config.http_bind_addr.is_some()
1090 || config.wire_bind_addr.is_some()
1091 || config.pg_bind_addr.is_some();
1092 if !has_any {
1093 return Err("at least one server bind address must be configured".into());
1094 }
1095 let thread_name = if config.router_bind_addr.is_some() {
1096 "red-server-router"
1097 } else {
1098 match (
1099 config.grpc_bind_addr.is_some(),
1100 config.http_bind_addr.is_some(),
1101 ) {
1102 (true, true) => "red-server-dual",
1103 (true, false) => "red-server-grpc",
1104 (false, true) => "red-server-http",
1105 (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1106 (false, false) => "red-server-pg-wire",
1107 }
1108 };
1109
1110 let handle = thread::Builder::new()
1111 .name(thread_name.into())
1112 .stack_size(8 * 1024 * 1024)
1113 .spawn(move || run_configured_servers(config))
1114 .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1115
1116 match handle.join() {
1117 Ok(result) => result,
1118 Err(_) => Err("server thread panicked".to_string()),
1119 }
1120}
1121
1122fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1123 let mut parts = vec![
1124 config.binary_path.display().to_string(),
1125 "server".to_string(),
1126 "--path".to_string(),
1127 config.data_path.display().to_string(),
1128 ];
1129
1130 if let Some(bind_addr) = &config.router_bind_addr {
1131 parts.push("--bind".to_string());
1132 parts.push(bind_addr.clone());
1133 } else if let Some(bind_addr) = &config.grpc_bind_addr {
1134 parts.push("--grpc-bind".to_string());
1135 parts.push(bind_addr.clone());
1136 }
1137 if let Some(bind_addr) = &config.http_bind_addr {
1138 parts.push("--http-bind".to_string());
1139 parts.push(bind_addr.clone());
1140 }
1141
1142 parts.join(" ")
1143}
1144
1145pub fn probe_listener(target: &str, timeout: Duration) -> bool {
1146 let addresses: Vec<SocketAddr> = match target.to_socket_addrs() {
1147 Ok(addresses) => addresses.collect(),
1148 Err(_) => return false,
1149 };
1150
1151 addresses
1152 .into_iter()
1153 .any(|address| TcpStream::connect_timeout(&address, timeout).is_ok())
1154}
1155
1156#[inline(never)]
1157fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1158 if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1164 return run_routed_server(config, router_bind_addr);
1165 }
1166
1167 match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1168 (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1169 run_dual_server(config, grpc_bind_addr, http_bind_addr)
1170 }
1171 (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1172 (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1173 (None, None) => {
1174 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1175 run_wire_only_server(config, wire_addr)
1176 } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1177 run_pg_only_server(config, pg_addr)
1178 } else {
1179 Err("at least one server bind address must be configured".to_string())
1180 }
1181 }
1182 }
1183}
1184
1185pub fn bind_listener_for_startup(
1203 readiness: &mut TransportReadiness,
1204 transport: &str,
1205 bind_addr: &str,
1206 explicit: bool,
1207) -> Result<Option<TcpListener>, String> {
1208 match TcpListener::bind(bind_addr) {
1209 Ok(listener) => {
1210 readiness.active(transport, bind_addr, explicit);
1211 Ok(Some(listener))
1212 }
1213 Err(err) => {
1214 let reason = format!("{transport} listener bind {bind_addr}: {err}");
1215 readiness.failed(transport, bind_addr, explicit, reason.clone());
1216 if explicit {
1217 tracing::error!(
1218 transport,
1219 bind = %bind_addr,
1220 error = %err,
1221 "fatal explicit bind failure"
1222 );
1223 Err(format!("explicit {reason}"))
1224 } else {
1225 tracing::warn!(
1226 transport,
1227 bind = %bind_addr,
1228 error = %err,
1229 "non-fatal implicit bind failure; listener degraded"
1230 );
1231 Ok(None)
1232 }
1233 }
1234 }
1235}
1236
1237async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
1260 let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
1261 .ok()
1262 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1263 .unwrap_or(true);
1264
1265 #[cfg(unix)]
1266 {
1267 use tokio::signal::unix::{signal, SignalKind};
1268
1269 let mut sigterm = match signal(SignalKind::terminate()) {
1270 Ok(s) => s,
1271 Err(err) => {
1272 tracing::warn!(
1273 error = %err,
1274 "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
1275 );
1276 return;
1277 }
1278 };
1279 let mut sigint = match signal(SignalKind::interrupt()) {
1280 Ok(s) => s,
1281 Err(err) => {
1282 tracing::warn!(error = %err, "could not install SIGINT handler");
1283 return;
1284 }
1285 };
1286 let mut sighup = match signal(SignalKind::hangup()) {
1292 Ok(s) => Some(s),
1293 Err(err) => {
1294 tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
1295 None
1296 }
1297 };
1298
1299 let reload_runtime = runtime.clone();
1300 tokio::spawn(async move {
1301 loop {
1302 let signal_name = match &mut sighup {
1303 Some(hup) => tokio::select! {
1304 _ = sigterm.recv() => "SIGTERM",
1305 _ = sigint.recv() => "SIGINT",
1306 _ = hup.recv() => "SIGHUP",
1307 },
1308 None => tokio::select! {
1309 _ = sigterm.recv() => "SIGTERM",
1310 _ = sigint.recv() => "SIGINT",
1311 },
1312 };
1313
1314 if signal_name == "SIGHUP" {
1315 handle_sighup_reload(&reload_runtime);
1316 continue; }
1318
1319 tracing::info!(
1320 signal = signal_name,
1321 "lifecycle signal received; shutting down"
1322 );
1323 match runtime.graceful_shutdown(backup_on_shutdown) {
1324 Ok(report) => {
1325 tracing::info!(
1326 duration_ms = report.duration_ms,
1327 flushed_wal = report.flushed_wal,
1328 final_checkpoint = report.final_checkpoint,
1329 backup_uploaded = report.backup_uploaded,
1330 "graceful shutdown complete"
1331 );
1332 }
1333 Err(err) => {
1334 tracing::error!(error = %err, "graceful shutdown failed");
1335 crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
1341 reason: format!("graceful shutdown failed: {err}"),
1342 }
1343 .emit_global();
1344 }
1345 }
1346 std::process::exit(0);
1347 }
1348 });
1349 }
1350
1351 #[cfg(not(unix))]
1352 {
1353 tokio::spawn(async move {
1354 let interrupted = tokio::signal::ctrl_c().await;
1355 if let Err(err) = interrupted {
1356 tracing::warn!(error = %err, "could not install Ctrl+C handler");
1357 return;
1358 }
1359
1360 tracing::info!(
1361 signal = "Ctrl+C",
1362 "lifecycle signal received; shutting down"
1363 );
1364 match runtime.graceful_shutdown(backup_on_shutdown) {
1365 Ok(report) => {
1366 tracing::info!(
1367 duration_ms = report.duration_ms,
1368 flushed_wal = report.flushed_wal,
1369 final_checkpoint = report.final_checkpoint,
1370 backup_uploaded = report.backup_uploaded,
1371 "graceful shutdown complete"
1372 );
1373 }
1374 Err(err) => {
1375 tracing::error!(error = %err, "graceful shutdown failed");
1376 }
1377 }
1378 std::process::exit(0);
1379 });
1380 }
1381}
1382
1383fn handle_sighup_reload(runtime: &RedDBRuntime) {
1392 let now_ms = std::time::SystemTime::now()
1393 .duration_since(std::time::UNIX_EPOCH)
1394 .map(|d| d.as_millis() as u64)
1395 .unwrap_or(0);
1396 tracing::info!(
1397 target: "reddb::secrets",
1398 ts_unix_ms = now_ms,
1399 "SIGHUP received; secrets will be re-read from *_FILE on next access"
1400 );
1401 use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1406 runtime.audit_log().record_event(
1407 AuditEvent::builder("config/sighup_reload")
1408 .source(AuditAuthSource::System)
1409 .resource("secrets")
1410 .outcome(Outcome::Success)
1411 .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1412 .build(),
1413 );
1414}
1415
1416#[inline(never)]
1417fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
1418 let workers = config.workers;
1419 let cli_telemetry = config.telemetry.clone();
1420 let db_options = config.to_db_options()?;
1421 let rt_config = detect_runtime_config();
1422 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1423 let (runtime, auth_store, _telemetry_guard) =
1424 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1425 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1426
1427 spawn_admin_metrics_listeners(&runtime, &auth_store);
1428
1429 let http_server = build_http_server(
1435 runtime.clone(),
1436 auth_store.clone(),
1437 router_bind_addr.clone(),
1438 );
1439 let http_server = apply_http_limits(http_server, &config, &runtime);
1440
1441 let grpc_server = RedDBGrpcServer::with_options(
1442 runtime.clone(),
1443 GrpcServerOptions {
1444 bind_addr: router_bind_addr.clone(),
1445 tls: None,
1446 },
1447 auth_store,
1448 );
1449
1450 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1451 .enable_all()
1452 .worker_threads(worker_threads)
1453 .thread_stack_size(rt_config.stack_size)
1454 .build()
1455 .map_err(|err| format!("tokio runtime: {err}"))?;
1456
1457 let signal_runtime = runtime.clone();
1458 let wire_runtime = Arc::new(runtime);
1459 tokio_runtime.block_on(async move {
1460 spawn_lifecycle_signal_handler(signal_runtime).await;
1461 tracing::info!(
1462 bind = %router_bind_addr,
1463 cpus = rt_config.available_cpus,
1464 workers = worker_threads,
1465 "router bootstrapping"
1466 );
1467 serve_tcp_router(InProcessRouterConfig {
1468 bind_addr: router_bind_addr,
1469 http_server,
1470 grpc_server,
1471 wire_runtime,
1472 })
1473 .await
1474 .map_err(|err| err.to_string())
1475 })
1476}
1477
1478async fn spawn_wire_listeners(
1480 config: &ServerCommandConfig,
1481 runtime: &RedDBRuntime,
1482 readiness: &mut TransportReadiness,
1483) -> Result<(), String> {
1484 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1486 let wire_rt = Arc::new(runtime.clone());
1487 #[cfg(unix)]
1490 {
1491 if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1492 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1493 tokio::spawn(async move {
1494 if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1495 &wire_addr, wire_rt,
1496 )
1497 .await
1498 {
1499 tracing::error!(err = %e, "redwire unix listener error");
1500 }
1501 });
1502 return Ok(());
1503 }
1504 }
1505 match tokio::net::TcpListener::bind(&wire_addr).await {
1506 Ok(listener) => {
1507 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1508 tokio::spawn(async move {
1509 if let Err(e) =
1510 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1511 .await
1512 {
1513 tracing::error!(err = %e, "redwire listener error");
1514 }
1515 });
1516 }
1517 Err(err) => {
1518 let reason = format!("wire listener bind {wire_addr}: {err}");
1519 readiness.failed(
1520 "wire",
1521 &wire_addr,
1522 config.wire_bind_explicit,
1523 reason.clone(),
1524 );
1525 if config.wire_bind_explicit {
1526 tracing::error!(
1527 transport = "wire",
1528 bind = %wire_addr,
1529 error = %err,
1530 "fatal explicit bind failure"
1531 );
1532 return Err(format!("explicit {reason}"));
1533 }
1534 tracing::warn!(
1535 transport = "wire",
1536 bind = %wire_addr,
1537 error = %err,
1538 "non-fatal implicit bind failure; listener degraded"
1539 );
1540 }
1541 }
1542 }
1543
1544 if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1546 let tls_config = resolve_wire_tls_config(config);
1547 match tls_config {
1548 Ok(tls_cfg) => {
1549 let wire_rt = Arc::new(runtime.clone());
1550 tokio::spawn(async move {
1551 if let Err(e) =
1552 crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1553 .await
1554 {
1555 tracing::error!(err = %e, "redwire+tls listener error");
1556 }
1557 });
1558 }
1559 Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1560 }
1561 }
1562 Ok(())
1563}
1564
1565fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1572 if let Some(pg_addr) = config.pg_bind_addr.clone() {
1573 let rt = Arc::new(runtime.clone());
1574 tokio::spawn(async move {
1575 let cfg = crate::wire::PgWireConfig {
1576 bind_addr: pg_addr,
1577 ..Default::default()
1578 };
1579 if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1580 tracing::error!(err = %e, "pg wire listener error");
1581 }
1582 });
1583 }
1584}
1585
1586fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1600 use crate::utils::secret_file::expand_file_env;
1601
1602 for var in [
1606 "REDDB_GRPC_TLS_CERT",
1607 "REDDB_GRPC_TLS_KEY",
1608 "REDDB_GRPC_TLS_CLIENT_CA",
1609 ] {
1610 if let Err(err) = expand_file_env(var) {
1611 tracing::warn!(
1612 target: "reddb::secrets",
1613 env = %var,
1614 err = %err,
1615 "could not expand *_FILE companion for gRPC TLS"
1616 );
1617 }
1618 }
1619
1620 let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1621 (Some(cert), Some(key)) => {
1622 let cert_pem = std::fs::read(cert)
1623 .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1624 let key_pem =
1625 std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1626 (cert_pem, key_pem)
1627 }
1628 _ => {
1629 let dev = std::env::var("RED_GRPC_TLS_DEV")
1631 .ok()
1632 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1633 .unwrap_or(false);
1634 if !dev {
1635 return Err("gRPC TLS configured but no cert/key supplied — set \
1636 REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1637 RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1638 .to_string());
1639 }
1640 let dir = config
1641 .path
1642 .as_ref()
1643 .and_then(|p| p.parent())
1644 .map(PathBuf::from)
1645 .unwrap_or_else(|| PathBuf::from("."));
1646 let (cert_pem_str, key_pem_str) =
1647 crate::wire::tls::generate_self_signed_cert("localhost")
1648 .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1649
1650 let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1655 tracing::warn!(
1656 target: "reddb::security",
1657 transport = "grpc",
1658 cert_sha256 = %fp,
1659 "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1660 DO NOT use in production"
1661 );
1662 let cert_path = dir.join("grpc-tls-cert.pem");
1664 let key_path = dir.join("grpc-tls-key.pem");
1665 if !cert_path.exists() || !key_path.exists() {
1666 let _ = std::fs::create_dir_all(&dir);
1667 std::fs::write(&cert_path, cert_pem_str.as_bytes())
1668 .map_err(|e| format!("write grpc dev cert: {e}"))?;
1669 std::fs::write(&key_path, key_pem_str.as_bytes())
1670 .map_err(|e| format!("write grpc dev key: {e}"))?;
1671 #[cfg(unix)]
1672 {
1673 use std::os::unix::fs::PermissionsExt;
1674 let _ =
1675 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1676 }
1677 }
1678 (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1679 }
1680 };
1681
1682 let client_ca_pem = match &config.grpc_tls_client_ca {
1683 Some(path) => Some(
1684 std::fs::read(path)
1685 .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1686 ),
1687 None => None,
1688 };
1689
1690 Ok(crate::GrpcTlsOptions {
1691 cert_pem,
1692 key_pem,
1693 client_ca_pem,
1694 })
1695}
1696
1697fn spawn_grpc_tls_listener_if_configured(
1701 config: &ServerCommandConfig,
1702 runtime: RedDBRuntime,
1703 auth_store: Arc<AuthStore>,
1704) {
1705 let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1706 return;
1707 };
1708 let tls_opts = match resolve_grpc_tls_options(config) {
1709 Ok(opts) => opts,
1710 Err(err) => {
1711 tracing::error!(
1712 target: "reddb::security",
1713 transport = "grpc",
1714 err = %err,
1715 "gRPC TLS config error; TLS listener will not start"
1716 );
1717 return;
1718 }
1719 };
1720 tokio::spawn(async move {
1721 let server = RedDBGrpcServer::with_options(
1722 runtime,
1723 GrpcServerOptions {
1724 bind_addr: tls_bind.clone(),
1725 tls: Some(tls_opts),
1726 },
1727 auth_store,
1728 );
1729 tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1730 if let Err(err) = server.serve().await {
1731 tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1732 }
1733 });
1734}
1735
1736fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1739 use sha2::{Digest, Sha256};
1740 let mut h = Sha256::new();
1741 h.update(pem);
1742 let d = h.finalize();
1743 let mut buf = String::with_capacity(64);
1744 for b in d.iter() {
1745 buf.push_str(&format!("{b:02x}"));
1746 }
1747 buf
1748}
1749
1750fn resolve_wire_tls_config(
1752 config: &ServerCommandConfig,
1753) -> Result<crate::wire::WireTlsConfig, String> {
1754 match (&config.wire_tls_cert, &config.wire_tls_key) {
1755 (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1756 cert_path: cert.clone(),
1757 key_path: key.clone(),
1758 }),
1759 _ => {
1760 let dir = config
1762 .path
1763 .as_ref()
1764 .and_then(|p| p.parent())
1765 .map(PathBuf::from)
1766 .unwrap_or_else(|| PathBuf::from("."));
1767 crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1768 }
1769 }
1770}
1771
1772#[inline(never)]
1773fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1774 let rt_config = detect_runtime_config();
1775 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1776 let cli_telemetry = config.telemetry.clone();
1777 let db_options = config.to_db_options()?;
1778 let mut transport_readiness = TransportReadiness::default();
1779
1780 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1781 .enable_all()
1782 .worker_threads(workers)
1783 .thread_stack_size(rt_config.stack_size)
1784 .build()
1785 .map_err(|err| format!("tokio runtime: {err}"))?;
1786
1787 let (runtime, _auth_store, _telemetry_guard) =
1791 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1792 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1793 let signal_runtime = runtime.clone();
1794 tokio_runtime.block_on(async move {
1795 spawn_lifecycle_signal_handler(signal_runtime).await;
1796 spawn_pg_listener(&config, &runtime);
1797 let wire_rt = Arc::new(runtime);
1798 let listener = tokio::net::TcpListener::bind(&wire_addr)
1799 .await
1800 .map_err(|err| {
1801 let reason = format!("wire listener bind {wire_addr}: {err}");
1802 transport_readiness.failed(
1803 "wire",
1804 &wire_addr,
1805 config.wire_bind_explicit,
1806 reason.clone(),
1807 );
1808 if config.wire_bind_explicit {
1809 format!("explicit {reason}")
1810 } else {
1811 reason
1812 }
1813 })?;
1814 transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1815 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1816 .await
1817 .map_err(|e| e.to_string())
1818 })
1819}
1820
1821#[inline(never)]
1822fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1823 let rt_config = detect_runtime_config();
1824 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1825 let cli_telemetry = config.telemetry.clone();
1826 let db_options = config.to_db_options()?;
1827
1828 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1829 .enable_all()
1830 .worker_threads(workers)
1831 .thread_stack_size(rt_config.stack_size)
1832 .build()
1833 .map_err(|err| format!("tokio runtime: {err}"))?;
1834
1835 let (runtime, _auth_store, _telemetry_guard) =
1836 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1837 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1838 let signal_runtime = runtime.clone();
1839 tokio_runtime.block_on(async move {
1840 spawn_lifecycle_signal_handler(signal_runtime).await;
1841 let cfg = crate::wire::PgWireConfig {
1842 bind_addr: pg_addr,
1843 ..Default::default()
1844 };
1845 crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1846 .await
1847 .map_err(|e| e.to_string())
1848 })
1849}
1850
1851#[inline(never)]
1852fn build_runtime_and_auth_store(
1853 db_options: RedDBOptions,
1854 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1855) -> Result<
1856 (
1857 RedDBRuntime,
1858 Arc<AuthStore>,
1859 Option<crate::telemetry::TelemetryGuard>,
1860 ),
1861 String,
1862> {
1863 build_runtime_with_telemetry(db_options, cli_telemetry)
1870}
1871
1872pub(crate) fn build_runtime_with_telemetry(
1882 db_options: RedDBOptions,
1883 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1884) -> Result<
1885 (
1886 RedDBRuntime,
1887 Arc<AuthStore>,
1888 Option<crate::telemetry::TelemetryGuard>,
1889 ),
1890 String,
1891> {
1892 let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1893 let msg = err.to_string();
1899 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1900 phase: "runtime_construction".to_string(),
1901 error: msg.clone(),
1902 }
1903 .emit_global();
1904 msg
1905 })?;
1906
1907 crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1912 let msg = err.to_string();
1913 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1914 phase: "lease_loop".to_string(),
1915 error: msg.clone(),
1916 }
1917 .emit_global();
1918 msg
1919 })?;
1920
1921 if let Some(data_path) = db_options.data_path.as_deref() {
1925 let watch_dir = data_path.parent().unwrap_or(data_path);
1926 crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1927 }
1928
1929 {
1933 let config_path = crate::runtime::config_overlay::config_file_path();
1934 let store = runtime.db().store();
1935 crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1936 }
1937
1938 let merged = merge_telemetry_with_config(
1941 cli_telemetry
1942 .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1943 &runtime,
1944 );
1945 let telemetry_guard = crate::telemetry::init(merged);
1946
1947 let no_auth = no_auth_active(&db_options);
1948 let auth_store =
1949 if db_options.auth.vault_enabled {
1950 let pager =
1951 runtime.db().store().pager().cloned().ok_or_else(|| {
1952 "vault requires a paged database (persistent mode)".to_string()
1953 })?;
1954 let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1955 .map_err(|err| err.to_string())?;
1956 Arc::new(store)
1957 } else {
1958 Arc::new(AuthStore::new(db_options.auth.clone()))
1959 };
1960 auth_store.configure_control_events(
1961 runtime.control_event_ledger(),
1962 runtime.control_event_config(),
1963 );
1964 if no_auth {
1969 eprintln!("{NO_AUTH_WARNING}");
1970 tracing::warn!("{NO_AUTH_WARNING}");
1971 } else {
1972 apply_preset(&runtime, &auth_store)?;
1973 maybe_apply_policy_break_glass(&auth_store);
1974 }
1975
1976 {
1978 let store = Arc::clone(&auth_store);
1979 std::thread::Builder::new()
1980 .name("reddb-session-purge".into())
1981 .spawn(move || loop {
1982 std::thread::sleep(std::time::Duration::from_secs(300));
1983 store.purge_expired_sessions();
1984 })
1985 .ok();
1986 }
1987
1988 Ok((runtime, auth_store, telemetry_guard))
1989}
1990
1991fn maybe_apply_policy_break_glass(auth_store: &Arc<AuthStore>) {
1995 use crate::auth::self_lock_guard::BREAK_GLASS_ENV;
1996
1997 let enabled = std::env::var(BREAK_GLASS_ENV)
1998 .ok()
1999 .map(|v| {
2000 let trimmed = v.trim().to_ascii_lowercase();
2001 matches!(trimmed.as_str(), "1" | "true" | "yes")
2002 })
2003 .unwrap_or(false);
2004 if !enabled {
2005 return;
2006 }
2007 let now = crate::utils::now_unix_millis() as u128;
2008 match auth_store.apply_policy_break_glass(now) {
2009 Ok(()) => {
2010 tracing::warn!(env = BREAK_GLASS_ENV, "policy break-glass recovery applied");
2011 }
2012 Err(err) => {
2013 tracing::error!(env = BREAK_GLASS_ENV, %err, "policy break-glass recovery failed");
2014 }
2015 }
2016}
2017
2018pub(crate) const BOOTSTRAP_COMPLETED_KEY: &str = "system.bootstrap.completed";
2023pub(crate) const BOOTSTRAP_PRESET_KEY: &str = "system.bootstrap.preset";
2024pub(crate) const BOOTSTRAP_FIRST_ADMIN_KEY: &str = "system.bootstrap.first_admin_id";
2025
2026pub(crate) const PRESET_ENV: &str = "REDDB_PRESET";
2028pub(crate) const PRESET_SIMPLE: &str = "simple";
2029pub(crate) const PRESET_PRODUCTION: &str = "production";
2030pub(crate) const PRESET_REGULATED: &str = "regulated";
2031
2032pub(crate) const FIRST_ADMIN_ALLOW_ALL_POLICY: &str = "system.bootstrap.first-admin-allow-all";
2036pub(crate) const REGULATED_PROTECT_MANAGED_POLICY: &str = "system.regulated.protect-managed";
2037pub(crate) const REGULATED_AUDIT_CONFIG_NAMESPACE: &str = "red.config.audit";
2038pub(crate) const REGULATED_EVIDENCE_CONFIG_NAMESPACE: &str = "red.config.evidence";
2039pub(crate) const REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE: &str = "red.config.query_audit";
2040
2041pub(crate) fn apply_preset(
2050 runtime: &RedDBRuntime,
2051 auth_store: &Arc<AuthStore>,
2052) -> Result<(), String> {
2053 let store = runtime.db().store();
2054
2055 if store.get_config(BOOTSTRAP_COMPLETED_KEY).is_some() {
2056 crate::cli::bootstrap_manifest::rehydrate_manifest_registry(
2057 runtime,
2058 &runtime.config_registry(),
2059 )?;
2060 tracing::info!("bootstrap state present, skipping preset application");
2061 return Ok(());
2062 }
2063
2064 for var in ["REDDB_USERNAME", "REDDB_PASSWORD"] {
2068 crate::utils::expand_file_env(var).map_err(|err| format!("expand {var}_FILE: {err}"))?;
2069 }
2070
2071 let preset = std::env::var(PRESET_ENV)
2072 .ok()
2073 .map(|s| s.trim().to_string())
2074 .filter(|s| !s.is_empty())
2075 .unwrap_or_else(|| PRESET_SIMPLE.to_string());
2076
2077 if let Ok(path) = std::env::var(crate::cli::bootstrap_manifest::MANIFEST_ENV) {
2078 let path = path.trim();
2079 if !path.is_empty() {
2080 let first_admin_id = crate::cli::bootstrap_manifest::apply_manifest_file(
2081 runtime,
2082 auth_store,
2083 &runtime.config_registry(),
2084 std::path::Path::new(path),
2085 )?;
2086 persist_bootstrap_state(runtime, "manifest", Some(&first_admin_id));
2087 tracing::info!("bootstrap manifest applied");
2088 return Ok(());
2089 }
2090 }
2091
2092 let first_admin_id = match preset.as_str() {
2093 PRESET_SIMPLE => {
2094 None
2098 }
2099 PRESET_PRODUCTION => Some(apply_production_preset(auth_store)?),
2100 PRESET_REGULATED => {
2101 apply_regulated_preset(runtime, auth_store)?;
2102 None
2103 }
2104 other => {
2105 return Err(format!(
2106 "REDDB_PRESET={other:?} is not recognised (expected `simple`, `production`, or `regulated`)"
2107 ));
2108 }
2109 };
2110
2111 persist_bootstrap_state(runtime, &preset, first_admin_id.as_deref());
2112 tracing::info!(preset = %preset, "bootstrap preset applied");
2113 Ok(())
2114}
2115
2116fn apply_production_preset(auth_store: &Arc<AuthStore>) -> Result<String, String> {
2117 use crate::auth::store::PrincipalRef;
2118 use crate::auth::{policies::Policy, UserId};
2119
2120 let username = std::env::var("REDDB_USERNAME")
2121 .ok()
2122 .filter(|s| !s.is_empty())
2123 .ok_or_else(|| {
2124 "REDDB_PRESET=production requires REDDB_USERNAME (or REDDB_USERNAME_FILE)".to_string()
2125 })?;
2126 let password = std::env::var("REDDB_PASSWORD")
2127 .ok()
2128 .filter(|s| !s.is_empty())
2129 .ok_or_else(|| {
2130 "REDDB_PRESET=production requires REDDB_PASSWORD (or REDDB_PASSWORD_FILE)".to_string()
2131 })?;
2132
2133 let result = auth_store
2136 .bootstrap_system_admin(&username, &password)
2137 .map_err(|err| format!("bootstrap first admin: {err}"))?;
2138 let first_admin = UserId::platform(result.user.username.clone());
2139
2140 let policy = Policy::from_json_str(&format!(
2142 r#"{{
2143 "id": "{id}",
2144 "version": 1,
2145 "statements": [{{
2146 "effect": "allow",
2147 "actions": ["*"],
2148 "resources": ["*"]
2149 }}]
2150 }}"#,
2151 id = FIRST_ADMIN_ALLOW_ALL_POLICY
2152 ))
2153 .map_err(|err| format!("compile allow-all policy: {err}"))?;
2154 auth_store
2155 .put_policy(policy)
2156 .map_err(|err| format!("install allow-all policy: {err}"))?;
2157
2158 auth_store
2160 .attach_policy(
2161 PrincipalRef::User(first_admin.clone()),
2162 FIRST_ADMIN_ALLOW_ALL_POLICY,
2163 )
2164 .map_err(|err| format!("attach allow-all policy: {err}"))?;
2165
2166 Ok(first_admin.to_string())
2167}
2168
2169fn apply_regulated_preset(
2170 runtime: &RedDBRuntime,
2171 auth_store: &Arc<AuthStore>,
2172) -> Result<(), String> {
2173 use crate::auth::policies::Policy;
2174 use crate::auth::registry::EvidenceRequirement;
2175
2176 runtime.query_audit().enable_infrastructure();
2177
2178 let policy = Policy::from_json_str(&format!(
2179 r#"{{
2180 "id": "{id}",
2181 "version": 1,
2182 "statements": [
2183 {{
2184 "effect": "deny",
2185 "actions": ["policy:put", "policy:drop", "policy:attach", "policy:detach"],
2186 "resources": ["policy:{id}"]
2187 }},
2188 {{
2189 "effect": "deny",
2190 "actions": ["config:write"],
2191 "resources": [
2192 "config:{audit}.*",
2193 "config:{evidence}.*",
2194 "config:{query_audit}.*"
2195 ]
2196 }}
2197 ]
2198 }}"#,
2199 id = REGULATED_PROTECT_MANAGED_POLICY,
2200 audit = REGULATED_AUDIT_CONFIG_NAMESPACE,
2201 evidence = REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2202 query_audit = REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2203 ))
2204 .map_err(|err| format!("compile regulated guardrail policy: {err}"))?;
2205 auth_store
2206 .put_policy(policy)
2207 .map_err(|err| format!("install regulated guardrail policy: {err}"))?;
2208
2209 let now_ms = crate::utils::now_unix_millis() as u128;
2210 let entries = vec![
2211 regulated_registry_entry(
2212 REGULATED_PROTECT_MANAGED_POLICY,
2213 crate::auth::managed_policy::RESOURCE_TYPE_POLICY,
2214 "iam_policy",
2215 "policy:*",
2216 &format!("policy:{REGULATED_PROTECT_MANAGED_POLICY}"),
2217 EvidenceRequirement::Metadata,
2218 now_ms,
2219 ),
2220 regulated_registry_entry(
2221 REGULATED_AUDIT_CONFIG_NAMESPACE,
2222 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2223 "config_namespace",
2224 "config:write",
2225 &format!("config:{REGULATED_AUDIT_CONFIG_NAMESPACE}.*"),
2226 EvidenceRequirement::Metadata,
2227 now_ms,
2228 ),
2229 regulated_registry_entry(
2230 REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2231 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2232 "config_namespace",
2233 "config:write",
2234 &format!("config:{REGULATED_EVIDENCE_CONFIG_NAMESPACE}.*"),
2235 EvidenceRequirement::Metadata,
2236 now_ms,
2237 ),
2238 regulated_registry_entry(
2239 REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2240 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2241 "config_namespace",
2242 "config:write",
2243 &format!("config:{REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE}.*"),
2244 EvidenceRequirement::Metadata,
2245 now_ms,
2246 ),
2247 ];
2248
2249 for entry in entries.iter().cloned() {
2250 runtime
2251 .config_registry()
2252 .restore_bootstrap_entry(entry)
2253 .map_err(|err| format!("install regulated registry entry: {err}"))?;
2254 }
2255 crate::cli::bootstrap_manifest::persist_registry_state(runtime, &entries)?;
2256 Ok(())
2257}
2258
2259fn regulated_registry_entry(
2260 id: &str,
2261 resource_type: &str,
2262 schema: &str,
2263 required_action: &str,
2264 required_resource: &str,
2265 evidence_requirement: crate::auth::registry::EvidenceRequirement,
2266 updated_at_ms: u128,
2267) -> crate::auth::registry::ConfigRegistryEntry {
2268 crate::auth::registry::ConfigRegistryEntry {
2269 id: id.to_string(),
2270 version: 1,
2271 resource_type: resource_type.to_string(),
2272 schema: schema.to_string(),
2273 mutability: crate::auth::registry::Mutability::Immutable,
2274 sensitivity: crate::auth::registry::Sensitivity::Internal,
2275 managed: true,
2276 required_action: required_action.to_string(),
2277 required_resource: required_resource.to_string(),
2278 evidence_requirement,
2279 updated_by: "system:regulated-preset".to_string(),
2280 updated_at_ms,
2281 }
2282}
2283
2284fn persist_bootstrap_state(runtime: &RedDBRuntime, preset: &str, first_admin_id: Option<&str>) {
2285 let store = runtime.db().store();
2286 let mut tree = crate::serde_json::Map::new();
2287 tree.insert(
2288 BOOTSTRAP_COMPLETED_KEY.to_string(),
2289 crate::serde_json::Value::Bool(true),
2290 );
2291 tree.insert(
2292 BOOTSTRAP_PRESET_KEY.to_string(),
2293 crate::serde_json::Value::String(preset.to_string()),
2294 );
2295 if let Some(id) = first_admin_id {
2296 tree.insert(
2297 BOOTSTRAP_FIRST_ADMIN_KEY.to_string(),
2298 crate::serde_json::Value::String(id.to_string()),
2299 );
2300 }
2301 let json = crate::serde_json::Value::Object(tree);
2302 store.set_config_tree("", &json);
2303}
2304
2305fn merge_telemetry_with_config(
2316 mut cli: crate::telemetry::TelemetryConfig,
2317 runtime: &RedDBRuntime,
2318) -> crate::telemetry::TelemetryConfig {
2319 use crate::storage::schema::Value;
2320
2321 let store = runtime.db().store();
2322
2323 if !cli.level_explicit {
2324 if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
2325 cli.level_filter = v.to_string();
2326 }
2327 }
2328 if !cli.format_explicit {
2329 if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
2330 if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
2331 cli.format = parsed;
2332 }
2333 }
2334 }
2335 if !cli.rotation_keep_days_explicit {
2336 match store.get_config("red.logging.keep_days") {
2337 Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
2338 cli.rotation_keep_days = n as u16
2339 }
2340 Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
2341 cli.rotation_keep_days = n as u16
2342 }
2343 Some(Value::Text(v)) => {
2344 if let Ok(n) = v.parse::<u16>() {
2345 cli.rotation_keep_days = n;
2346 }
2347 }
2348 _ => {}
2349 }
2350 }
2351 if !cli.file_prefix_explicit {
2352 if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
2353 if !v.is_empty() {
2354 cli.file_prefix = v.to_string();
2355 }
2356 }
2357 }
2358 if !cli.log_dir_explicit && !cli.log_file_disabled {
2361 if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
2362 if !v.is_empty() {
2363 cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
2364 }
2365 }
2366 }
2367
2368 cli
2369}
2370
2371#[cfg(test)]
2372mod telemetry_merge_tests {
2373 use super::*;
2374 use crate::telemetry::{LogFormat, TelemetryConfig};
2375
2376 fn fresh_runtime() -> RedDBRuntime {
2377 RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
2378 }
2379
2380 fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
2381 runtime
2382 .db()
2383 .store()
2384 .set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
2385 }
2386
2387 fn cli_base() -> TelemetryConfig {
2388 TelemetryConfig {
2391 log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
2392 format: LogFormat::Json,
2393 ..Default::default()
2394 }
2395 }
2396
2397 #[test]
2398 fn config_log_dir_promoted_when_flag_absent() {
2399 let runtime = fresh_runtime();
2400 set_str(&runtime, "red.logging.dir", "/var/log/reddb");
2401 let merged = merge_telemetry_with_config(cli_base(), &runtime);
2402 assert_eq!(
2403 merged.log_dir.as_deref(),
2404 Some(std::path::Path::new("/var/log/reddb"))
2405 );
2406 }
2407
2408 #[test]
2409 fn explicit_log_dir_wins_over_config() {
2410 let runtime = fresh_runtime();
2411 set_str(&runtime, "red.logging.dir", "/var/log/reddb");
2412 let mut cli = cli_base();
2413 cli.log_dir = Some(std::path::PathBuf::from("/custom/dir"));
2414 cli.log_dir_explicit = true;
2415 let merged = merge_telemetry_with_config(cli, &runtime);
2416 assert_eq!(
2417 merged.log_dir.as_deref(),
2418 Some(std::path::Path::new("/custom/dir"))
2419 );
2420 }
2421
2422 #[test]
2423 fn no_log_file_beats_config_log_dir() {
2424 let runtime = fresh_runtime();
2425 set_str(&runtime, "red.logging.dir", "/var/log/reddb");
2426 let mut cli = cli_base();
2427 cli.log_dir = None;
2428 cli.log_file_disabled = true;
2429 let merged = merge_telemetry_with_config(cli, &runtime);
2430 assert!(
2431 merged.log_dir.is_none(),
2432 "--no-log-file must veto config dir"
2433 );
2434 }
2435
2436 #[test]
2437 fn config_format_promoted_on_non_tty_default() {
2438 let runtime = fresh_runtime();
2442 set_str(&runtime, "red.logging.format", "pretty");
2443 let merged = merge_telemetry_with_config(cli_base(), &runtime);
2444 assert_eq!(merged.format, LogFormat::Pretty);
2445 }
2446
2447 #[test]
2448 fn explicit_format_wins_over_config() {
2449 let runtime = fresh_runtime();
2450 set_str(&runtime, "red.logging.format", "pretty");
2451 let mut cli = cli_base();
2452 cli.format = LogFormat::Json;
2453 cli.format_explicit = true;
2454 let merged = merge_telemetry_with_config(cli, &runtime);
2455 assert_eq!(merged.format, LogFormat::Json);
2456 }
2457}
2458
2459#[inline(never)]
2460fn build_http_server(
2461 runtime: RedDBRuntime,
2462 auth_store: Arc<AuthStore>,
2463 bind_addr: String,
2464) -> RedDBServer {
2465 build_http_server_with_transport_readiness(
2466 runtime,
2467 auth_store,
2468 bind_addr,
2469 TransportReadiness::default(),
2470 )
2471}
2472
2473fn apply_http_limits(
2479 server: RedDBServer,
2480 config: &ServerCommandConfig,
2481 runtime: &RedDBRuntime,
2482) -> RedDBServer {
2483 let store = runtime.db().store();
2484 let resolved =
2485 crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2486 .get_config(key)
2487 {
2488 Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2489 Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2490 Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2491 _ => None,
2492 });
2493 tracing::info!(
2494 target: "reddb::http_limits",
2495 max_handlers = resolved.max_handlers,
2496 handler_timeout_ms = resolved.handler_timeout_ms,
2497 retry_after_secs = resolved.retry_after_secs,
2498 max_inflight_per_principal = resolved.max_inflight_per_principal,
2499 "http_limits resolved"
2500 );
2501 server.with_http_limits(resolved)
2502}
2503
2504#[inline(never)]
2505fn build_http_server_with_transport_readiness(
2506 runtime: RedDBRuntime,
2507 auth_store: Arc<AuthStore>,
2508 bind_addr: String,
2509 transport_readiness: TransportReadiness,
2510) -> RedDBServer {
2511 RedDBServer::with_options(
2512 runtime,
2513 ServerOptions {
2514 bind_addr,
2515 transport_readiness,
2516 ..ServerOptions::default()
2517 },
2518 )
2519 .with_auth(auth_store)
2520}
2521
2522#[inline(never)]
2526fn build_admin_only_server(
2527 runtime: RedDBRuntime,
2528 auth_store: Arc<AuthStore>,
2529 bind_addr: String,
2530) -> RedDBServer {
2531 RedDBServer::with_options(
2532 runtime,
2533 ServerOptions {
2534 bind_addr,
2535 surface: crate::server::ServerSurface::AdminOnly,
2536 ..ServerOptions::default()
2537 },
2538 )
2539 .with_auth(auth_store)
2540}
2541
2542#[inline(never)]
2546fn build_metrics_only_server(
2547 runtime: RedDBRuntime,
2548 auth_store: Arc<AuthStore>,
2549 bind_addr: String,
2550) -> RedDBServer {
2551 RedDBServer::with_options(
2552 runtime,
2553 ServerOptions {
2554 bind_addr,
2555 surface: crate::server::ServerSurface::MetricsOnly,
2556 ..ServerOptions::default()
2557 },
2558 )
2559 .with_auth(auth_store)
2560}
2561
2562fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2566 if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2567 let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2568 let _ = server.serve_in_background();
2569 tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2570 }
2571 if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2572 let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2573 let _ = server.serve_in_background();
2574 tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2575 }
2576}
2577
2578#[inline(never)]
2579fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2580 let cli_telemetry = config.telemetry.clone();
2581 let mut transport_readiness = TransportReadiness::default();
2582 let Some(listener) = bind_listener_for_startup(
2583 &mut transport_readiness,
2584 "http",
2585 &bind_addr,
2586 config.http_bind_explicit,
2587 )?
2588 else {
2589 return Err(format!(
2590 "no HTTP listener started; implicit bind {} failed",
2591 bind_addr
2592 ));
2593 };
2594 let db_options = config.to_db_options()?;
2595 let (runtime, auth_store, _telemetry_guard) =
2596 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2597 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2598 spawn_admin_metrics_listeners(&runtime, &auth_store);
2599 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2600 let server = build_http_server_with_transport_readiness(
2601 runtime.clone(),
2602 auth_store,
2603 bind_addr.clone(),
2604 transport_readiness,
2605 );
2606 let server = apply_http_limits(server, &config, &runtime);
2607 tracing::info!(transport = "http", bind = %bind_addr, "listener online");
2608 server.serve_on(listener).map_err(|err| err.to_string())
2609}
2610
2611fn spawn_http_tls_listener(
2617 config: &ServerCommandConfig,
2618 runtime: &RedDBRuntime,
2619 auth_store: &Arc<AuthStore>,
2620) -> Result<(), String> {
2621 let Some(addr) = config.http_tls_bind_addr.clone() else {
2622 return Ok(());
2623 };
2624
2625 let tls_config = resolve_http_tls_config(config)?;
2626 let server_config = crate::server::tls::build_server_config(&tls_config)
2627 .map_err(|err| format!("HTTP TLS: {err}"))?;
2628
2629 let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
2630 let server = apply_http_limits(server, config, runtime);
2631 let _handle = server.serve_tls_in_background(server_config);
2632 tracing::info!(
2633 transport = "https",
2634 bind = %addr,
2635 mtls = %tls_config.client_ca_path.is_some(),
2636 "TLS listener online"
2637 );
2638 Ok(())
2639}
2640
2641fn resolve_http_tls_config(
2643 config: &ServerCommandConfig,
2644) -> Result<crate::server::tls::HttpTlsConfig, String> {
2645 match (&config.http_tls_cert, &config.http_tls_key) {
2646 (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
2647 cert_path: cert.clone(),
2648 key_path: key.clone(),
2649 client_ca_path: config.http_tls_client_ca.clone(),
2650 }),
2651 (None, None) => {
2652 let dir = config
2654 .path
2655 .as_ref()
2656 .and_then(|p| p.parent().map(std::path::PathBuf::from))
2657 .unwrap_or_else(|| std::path::PathBuf::from("."));
2658 let auto = crate::server::tls::auto_generate_dev_cert(&dir)
2659 .map_err(|err| format!("HTTP TLS dev: {err}"))?;
2660 Ok(crate::server::tls::HttpTlsConfig {
2661 cert_path: auto.cert_path,
2662 key_path: auto.key_path,
2663 client_ca_path: config.http_tls_client_ca.clone(),
2664 })
2665 }
2666 _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
2667 }
2668}
2669
2670#[inline(never)]
2671fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2672 let workers = config.workers;
2673 let cli_telemetry = config.telemetry.clone();
2674 let db_options = config.to_db_options()?;
2675 let rt_config = detect_runtime_config();
2676 let mut transport_readiness = TransportReadiness::default();
2677 let Some(grpc_listener) = bind_listener_for_startup(
2678 &mut transport_readiness,
2679 "grpc",
2680 &bind_addr,
2681 config.grpc_bind_explicit,
2682 )?
2683 else {
2684 return Err(format!(
2685 "no gRPC listener started; implicit bind {} failed",
2686 bind_addr
2687 ));
2688 };
2689
2690 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2691
2692 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2693 .enable_all()
2694 .worker_threads(worker_threads)
2695 .thread_stack_size(rt_config.stack_size)
2696 .build()
2697 .map_err(|err| format!("tokio runtime: {err}"))?;
2698
2699 let (runtime, auth_store, _telemetry_guard) =
2701 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2702 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2703 let signal_runtime = runtime.clone();
2704 tokio_runtime.block_on(async move {
2705 spawn_lifecycle_signal_handler(signal_runtime).await;
2706 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2708
2709 spawn_pg_listener(&config, &runtime);
2711
2712 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2716
2717 let server = RedDBGrpcServer::with_options(
2718 runtime,
2719 GrpcServerOptions {
2720 bind_addr: bind_addr.clone(),
2721 tls: None,
2722 },
2723 auth_store,
2724 );
2725
2726 tracing::info!(
2727 transport = "grpc",
2728 bind = %bind_addr,
2729 cpus = rt_config.available_cpus,
2730 workers = worker_threads,
2731 "listener online"
2732 );
2733 server
2734 .serve_on(grpc_listener)
2735 .await
2736 .map_err(|err| err.to_string())
2737 })
2738}
2739
2740#[inline(never)]
2741fn run_dual_server(
2742 config: ServerCommandConfig,
2743 grpc_bind_addr: String,
2744 http_bind_addr: String,
2745) -> Result<(), String> {
2746 let workers = config.workers;
2747 let cli_telemetry = config.telemetry.clone();
2748 let db_options = config.to_db_options()?;
2749 let rt_config = detect_runtime_config();
2750 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2751 let mut transport_readiness = TransportReadiness::default();
2752 let http_listener = bind_listener_for_startup(
2753 &mut transport_readiness,
2754 "http",
2755 &http_bind_addr,
2756 config.http_bind_explicit,
2757 )?;
2758 let grpc_listener = bind_listener_for_startup(
2759 &mut transport_readiness,
2760 "grpc",
2761 &grpc_bind_addr,
2762 config.grpc_bind_explicit,
2763 )?;
2764 if http_listener.is_none() && grpc_listener.is_none() {
2765 return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2766 }
2767 let (runtime, auth_store, _telemetry_guard) =
2768 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2769 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2770
2771 spawn_admin_metrics_listeners(&runtime, &auth_store);
2772 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2773
2774 let http_handle = if let Some(listener) = http_listener {
2775 let http_server = build_http_server_with_transport_readiness(
2776 runtime.clone(),
2777 auth_store.clone(),
2778 http_bind_addr.clone(),
2779 transport_readiness.clone(),
2780 );
2781 let http_server = apply_http_limits(http_server, &config, &runtime);
2782 Some(http_server.serve_in_background_on(listener))
2783 } else {
2784 None
2785 };
2786
2787 thread::sleep(Duration::from_millis(150));
2788 if let Some(handle) = http_handle.as_ref() {
2789 if handle.is_finished() {
2790 let handle = http_handle.unwrap();
2791 return match handle.join() {
2792 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2793 Ok(Err(err)) => Err(err.to_string()),
2794 Err(_) => Err("HTTP server thread panicked".to_string()),
2795 };
2796 }
2797 }
2798 if grpc_listener.is_none() {
2799 let Some(handle) = http_handle else {
2800 return Err("no listener started".to_string());
2801 };
2802 return match handle.join() {
2803 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2804 Ok(Err(err)) => Err(err.to_string()),
2805 Err(_) => Err("HTTP server thread panicked".to_string()),
2806 };
2807 }
2808 let grpc_listener = grpc_listener.expect("checked above");
2809
2810 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2811 .enable_all()
2812 .worker_threads(worker_threads)
2813 .thread_stack_size(rt_config.stack_size)
2814 .build()
2815 .map_err(|err| format!("tokio runtime: {err}"))?;
2816
2817 let signal_runtime = runtime.clone();
2818 tokio_runtime.block_on(async move {
2819 spawn_lifecycle_signal_handler(signal_runtime).await;
2820 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2822
2823 spawn_pg_listener(&config, &runtime);
2825
2826 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2828
2829 let server = RedDBGrpcServer::with_options(
2830 runtime,
2831 GrpcServerOptions {
2832 bind_addr: grpc_bind_addr.clone(),
2833 tls: None,
2834 },
2835 auth_store,
2836 );
2837
2838 tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2839 tracing::info!(
2840 transport = "grpc",
2841 bind = %grpc_bind_addr,
2842 cpus = rt_config.available_cpus,
2843 workers = worker_threads,
2844 "listener online"
2845 );
2846 server
2847 .serve_on(grpc_listener)
2848 .await
2849 .map_err(|err| err.to_string())
2850 })
2851}
2852
2853#[cfg(test)]
2854mod tests {
2855 use super::*;
2856
2857 #[test]
2858 fn render_systemd_unit_contains_expected_execstart() {
2859 let config = SystemdServiceConfig {
2860 service_name: "reddb".to_string(),
2861 binary_path: PathBuf::from("/usr/local/bin/red"),
2862 run_user: "reddb".to_string(),
2863 run_group: "reddb".to_string(),
2864 data_path: reddb_file::default_service_database_path(),
2865 router_bind_addr: None,
2866 grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2867 http_bind_addr: None,
2868 };
2869
2870 let unit = render_systemd_unit(&config);
2871 assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
2872 assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
2873 }
2874
2875 #[test]
2876 fn systemd_service_config_derives_paths() {
2877 let config = SystemdServiceConfig {
2878 service_name: "reddb-api".to_string(),
2879 binary_path: PathBuf::from("/usr/local/bin/red"),
2880 run_user: "reddb".to_string(),
2881 run_group: "reddb".to_string(),
2882 data_path: PathBuf::from("/srv/reddb/live/data.rdb"),
2883 router_bind_addr: None,
2884 grpc_bind_addr: None,
2885 http_bind_addr: Some("127.0.0.1:5055".to_string()),
2886 };
2887
2888 assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
2889 assert_eq!(
2890 config.unit_path(),
2891 PathBuf::from("/etc/systemd/system/reddb-api.service")
2892 );
2893 }
2894
2895 #[test]
2896 fn render_systemd_unit_supports_dual_transport() {
2897 let config = SystemdServiceConfig {
2898 service_name: "reddb".to_string(),
2899 binary_path: PathBuf::from("/usr/local/bin/red"),
2900 run_user: "reddb".to_string(),
2901 run_group: "reddb".to_string(),
2902 data_path: reddb_file::default_service_database_path(),
2903 router_bind_addr: None,
2904 grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2905 http_bind_addr: Some("0.0.0.0:5055".to_string()),
2906 };
2907
2908 let unit = render_systemd_unit(&config);
2909 assert!(unit.contains("--grpc-bind 0.0.0.0:5555"));
2910 assert!(unit.contains("--http-bind 0.0.0.0:5055"));
2911 }
2912
2913 #[test]
2914 fn render_systemd_unit_supports_router_mode() {
2915 let config = SystemdServiceConfig {
2916 service_name: "reddb".to_string(),
2917 binary_path: PathBuf::from("/usr/local/bin/red"),
2918 run_user: "reddb".to_string(),
2919 run_group: "reddb".to_string(),
2920 data_path: reddb_file::default_service_database_path(),
2921 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
2922 grpc_bind_addr: None,
2923 http_bind_addr: None,
2924 };
2925
2926 let unit = render_systemd_unit(&config);
2927 assert!(unit.contains("--bind 127.0.0.1:5050"));
2928 assert!(!unit.contains("--grpc-bind"));
2929 assert!(!unit.contains("--http-bind"));
2930 }
2931
2932 #[test]
2933 fn explicit_bind_collision_is_fatal() {
2934 let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
2935 let addr = held.local_addr().expect("held addr").to_string();
2936 let mut readiness = TransportReadiness::default();
2937
2938 let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();
2939
2940 assert!(error.contains("explicit http listener bind"));
2941 assert_eq!(readiness.active.len(), 0);
2942 assert_eq!(readiness.failed.len(), 1);
2943 assert!(readiness.failed[0].explicit);
2944 assert_eq!(readiness.failed[0].bind_addr, addr);
2945 }
2946
2947 fn no_auth_env_lock() -> &'static std::sync::Mutex<()> {
2954 static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
2955 LOCK.get_or_init(|| std::sync::Mutex::new(()))
2956 }
2957
2958 fn no_auth_test_config(no_auth: bool) -> ServerCommandConfig {
2959 ServerCommandConfig {
2960 path: None,
2961 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
2962 router_bind_explicit: false,
2963 grpc_bind_addr: None,
2964 grpc_bind_explicit: false,
2965 grpc_tls_bind_addr: None,
2966 grpc_tls_cert: None,
2967 grpc_tls_key: None,
2968 grpc_tls_client_ca: None,
2969 http_bind_addr: None,
2970 http_bind_explicit: false,
2971 http_tls_bind_addr: None,
2972 http_tls_cert: None,
2973 http_tls_key: None,
2974 http_tls_client_ca: None,
2975 wire_bind_addr: None,
2976 wire_bind_explicit: false,
2977 wire_tls_bind_addr: None,
2978 wire_tls_cert: None,
2979 wire_tls_key: None,
2980 pg_bind_addr: None,
2981 create_if_missing: true,
2982 read_only: false,
2983 role: "standalone".to_string(),
2984 primary_addr: None,
2985 storage_profile: StorageProfileSelection::embedded_single_file(),
2986 vault: true,
2989 no_auth,
2990 workers: None,
2991 telemetry: None,
2992 http_limits_cli: crate::server::HttpLimitsCliInput::default(),
2993 }
2994 }
2995
2996 #[test]
2997 fn no_auth_flag_disables_every_auth_knob_and_stamps_metadata() {
2998 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
2999 unsafe {
3004 std::env::set_var("REDDB_USERNAME", "admin");
3005 std::env::set_var("REDDB_PASSWORD", "hunter2");
3006 }
3007 let config = no_auth_test_config(true);
3008 let options = config.to_db_options().expect("to_db_options");
3009
3010 assert!(no_auth_active(&options), "metadata should be stamped");
3011 assert!(!options.auth.enabled, "auth.enabled must be forced off");
3012 assert!(
3013 !options.auth.require_auth,
3014 "require_auth must be forced off"
3015 );
3016 assert!(
3017 !options.auth.vault_enabled,
3018 "vault_enabled must be forced off (overrides --vault)"
3019 );
3020 assert_eq!(
3021 options.metadata.get(NO_AUTH_META).map(String::as_str),
3022 Some("true"),
3023 );
3024
3025 unsafe {
3027 std::env::remove_var("REDDB_USERNAME");
3028 std::env::remove_var("REDDB_PASSWORD");
3029 }
3030 }
3031
3032 #[test]
3033 fn default_behaviour_without_no_auth_flag_is_unchanged() {
3034 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3035 let config = no_auth_test_config(false);
3036 let options = config.to_db_options().expect("to_db_options");
3037
3038 assert!(
3039 !no_auth_active(&options),
3040 "default boot must not be marked no-auth"
3041 );
3042 assert!(
3043 options.metadata.get(NO_AUTH_META).is_none(),
3044 "metadata key must be absent when flag is off"
3045 );
3046 assert!(options.auth.vault_enabled);
3048 }
3049
3050 #[test]
3051 fn no_auth_active_blocks_bootstrap_from_env() {
3052 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3053 unsafe {
3058 std::env::set_var("REDDB_USERNAME", "admin");
3059 std::env::set_var("REDDB_PASSWORD", "hunter2");
3060 }
3061
3062 let options = no_auth_test_config(true)
3063 .to_db_options()
3064 .expect("to_db_options");
3065
3066 let auth_store = AuthStore::new(options.auth.clone());
3070 if !no_auth_active(&options) {
3071 auth_store.bootstrap_from_env();
3072 }
3073
3074 assert!(
3075 auth_store.needs_bootstrap(),
3076 "no admin user must be bootstrapped under --no-auth even with REDDB_USERNAME/PASSWORD set"
3077 );
3078
3079 unsafe {
3081 std::env::remove_var("REDDB_USERNAME");
3082 std::env::remove_var("REDDB_PASSWORD");
3083 }
3084 }
3085
3086 fn clear_preset_env() {
3093 unsafe {
3095 std::env::remove_var(PRESET_ENV);
3096 std::env::remove_var("REDDB_BOOTSTRAP_MANIFEST");
3097 std::env::remove_var("REDDB_USERNAME");
3098 std::env::remove_var("REDDB_PASSWORD");
3099 std::env::remove_var("REDDB_USERNAME_FILE");
3100 std::env::remove_var("REDDB_PASSWORD_FILE");
3101 }
3102 }
3103
3104 fn clear_backup_env() {
3105 unsafe {
3107 std::env::remove_var("REDDB_BACKUP_S3_ENDPOINT");
3108 std::env::remove_var("REDDB_BACKUP_S3_BUCKET");
3109 std::env::remove_var("REDDB_BACKUP_S3_PREFIX");
3110 std::env::remove_var("REDDB_BACKUP_S3_ACCESS_KEY_ID");
3111 std::env::remove_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY");
3112 std::env::remove_var("REDDB_BACKUP_S3_REGION");
3113 std::env::remove_var("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS");
3114 std::env::remove_var("REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS");
3115 std::env::remove_var("REDDB_BACKUP_PAUSE_ON_LAG_SECS");
3116 }
3117 }
3118
3119 fn fresh_runtime_and_store() -> (RedDBRuntime, Arc<AuthStore>) {
3120 let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
3121 let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3122 (runtime, auth_store)
3123 }
3124
3125 #[test]
3126 fn simple_preset_is_default_and_persists_state() {
3127 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3128 clear_preset_env();
3129
3130 let (runtime, auth_store) = fresh_runtime_and_store();
3131 apply_preset(&runtime, &auth_store).expect("simple preset applies cleanly");
3132
3133 assert!(
3135 auth_store.needs_bootstrap(),
3136 "simple preset must not create an admin"
3137 );
3138
3139 let store = runtime.db().store();
3141 let completed = store
3142 .get_config(BOOTSTRAP_COMPLETED_KEY)
3143 .expect("completed key persisted");
3144 assert!(matches!(
3145 completed,
3146 crate::storage::schema::Value::Boolean(true)
3147 ));
3148 let preset = store
3149 .get_config(BOOTSTRAP_PRESET_KEY)
3150 .expect("preset key persisted");
3151 match preset {
3152 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_SIMPLE),
3153 other => panic!("expected Text(simple), got {other:?}"),
3154 }
3155 assert!(
3156 store.get_config(BOOTSTRAP_FIRST_ADMIN_KEY).is_none(),
3157 "simple preset must not record a first admin"
3158 );
3159
3160 clear_preset_env();
3161 }
3162
3163 #[test]
3164 fn production_preset_creates_first_admin_with_allow_all_policy() {
3165 use crate::auth::policies::{EvalContext, ResourceRef};
3166 use crate::auth::UserId;
3167
3168 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3169 clear_preset_env();
3170 unsafe {
3172 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3173 std::env::set_var("REDDB_USERNAME", "ops");
3174 std::env::set_var("REDDB_PASSWORD", "hunter2");
3175 }
3176
3177 let (runtime, auth_store) = fresh_runtime_and_store();
3178 apply_preset(&runtime, &auth_store).expect("production preset applies cleanly");
3179
3180 assert!(
3182 !auth_store.needs_bootstrap(),
3183 "production preset must seal bootstrap"
3184 );
3185 let users = auth_store.list_users();
3186 assert_eq!(users.len(), 1);
3187 let admin = &users[0];
3188 assert_eq!(admin.username, "ops");
3189 assert!(
3190 admin.system_owned,
3191 "first admin must be system-owned to pass the managed-config gate"
3192 );
3193 assert!(
3194 admin.tenant_id.is_none(),
3195 "first admin must be platform-scoped (tenant=None)"
3196 );
3197
3198 let policy = auth_store
3200 .get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY)
3201 .expect("allow-all policy installed");
3202 assert!(!policy.statements.is_empty());
3203
3204 let actor = UserId::platform("ops");
3207 let ctx = EvalContext {
3208 principal_tenant: None,
3209 current_tenant: None,
3210 peer_ip: None,
3211 mfa_present: false,
3212 now_ms: 1_700_000_000_000,
3213 principal_is_admin_role: true,
3214 principal_is_system_owned: true,
3215 principal_is_platform_scoped: true,
3216 };
3217 let arbitrary_resource = ResourceRef::new("config", "red.config.audit.enabled");
3218 assert!(
3219 auth_store.check_policy_authz(&actor, "config:write", &arbitrary_resource, &ctx),
3220 "allow-all policy must grant arbitrary actions via the evaluator"
3221 );
3222
3223 let store = runtime.db().store();
3225 match store
3226 .get_config(BOOTSTRAP_FIRST_ADMIN_KEY)
3227 .expect("first_admin_id persisted")
3228 {
3229 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), "ops"),
3230 other => panic!("expected Text(ops), got {other:?}"),
3231 }
3232 match store.get_config(BOOTSTRAP_PRESET_KEY).unwrap() {
3233 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_PRODUCTION),
3234 other => panic!("expected Text(production), got {other:?}"),
3235 }
3236
3237 clear_preset_env();
3238 }
3239
3240 #[test]
3241 fn regulated_preset_enables_query_audit_infrastructure_without_rules() {
3242 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3243 clear_preset_env();
3244 unsafe {
3246 std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3247 }
3248
3249 let (runtime, auth_store) = fresh_runtime_and_store();
3250 apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3251
3252 assert!(runtime.query_audit().is_enabled());
3253 assert!(runtime.query_audit().rules().is_empty());
3254 assert!(
3255 runtime
3256 .db()
3257 .store()
3258 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3259 .is_some(),
3260 "regulated preset should create the query-audit stream"
3261 );
3262
3263 runtime
3264 .execute_query("CREATE TABLE docs (id INT)")
3265 .expect("create table");
3266 runtime
3267 .execute_query("INSERT INTO docs (id) VALUES (1)")
3268 .expect("insert");
3269 runtime.execute_query("SELECT * FROM docs").expect("select");
3270 let rows = runtime
3271 .db()
3272 .store()
3273 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3274 .expect("query audit collection")
3275 .query_all(|_| true);
3276 assert!(
3277 rows.is_empty(),
3278 "regulated preset must not globally audit every query"
3279 );
3280
3281 clear_preset_env();
3282 }
3283
3284 #[test]
3285 fn managed_backup_env_rejects_primary_replica_single_file_storage() {
3286 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3287 clear_backup_env();
3288 unsafe {
3290 std::env::set_var("REDDB_BACKUP_S3_ENDPOINT", "https://s3.example.test");
3291 std::env::set_var("REDDB_BACKUP_S3_BUCKET", "reddb");
3292 std::env::set_var("REDDB_BACKUP_S3_PREFIX", "clusters/prod");
3293 std::env::set_var("REDDB_BACKUP_S3_ACCESS_KEY_ID", "AK");
3294 std::env::set_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY", "SK");
3295 }
3296
3297 let mut config = no_auth_test_config(false);
3298 config.role = "primary".to_string();
3299 config.storage_profile = crate::storage::StorageDeployPreset::PrimaryReplicaDev.selection();
3300
3301 let err = config.to_db_options().unwrap_err();
3302 assert!(err.contains("managed backup"), "got: {err}");
3303 assert!(err.contains("operational-directory"), "got: {err}");
3304
3305 clear_backup_env();
3306 }
3307
3308 #[test]
3309 fn regulated_preset_installs_managed_evidence_guardrails_end_to_end() {
3310 use crate::auth::policies::{EvalContext, Policy, ResourceRef};
3311 use crate::auth::store::PrincipalRef;
3312 use crate::auth::{Role, UserId};
3313 use crate::runtime::mvcc::{clear_current_auth_identity, set_current_auth_identity};
3314 use crate::storage::schema::Value;
3315
3316 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3317 clear_preset_env();
3318 unsafe {
3320 std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3321 }
3322
3323 let options = no_auth_test_config(false)
3324 .to_db_options()
3325 .expect("regulated options");
3326 assert!(
3327 options.control_events.compliance_mode,
3328 "regulated preset must enable fail-closed control evidence before runtime boot"
3329 );
3330 assert!(
3331 options.query_audit.enabled && options.query_audit.rules.is_empty(),
3332 "regulated preset must enable query-audit infrastructure without global rules"
3333 );
3334
3335 let runtime = RedDBRuntime::with_options(options).expect("runtime");
3336 let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3337 apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3338 runtime.set_auth_store(Arc::clone(&auth_store));
3339
3340 assert!(runtime.control_events_require_persistence());
3341 assert!(runtime.query_audit().is_enabled());
3342 assert!(runtime.query_audit().rules().is_empty());
3343 assert!(auth_store
3344 .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
3345 .is_some());
3346
3347 let managed_policy = runtime
3348 .config_registry()
3349 .get_active(REGULATED_PROTECT_MANAGED_POLICY)
3350 .expect("regulated managed policy registry entry");
3351 assert!(managed_policy.managed);
3352 assert_eq!(managed_policy.resource_type, "policy");
3353 assert!(
3354 runtime
3355 .config_registry()
3356 .get_active(REGULATED_AUDIT_CONFIG_NAMESPACE)
3357 .expect("regulated audit config namespace")
3358 .managed
3359 );
3360
3361 let registry_rows = runtime
3362 .execute_query(&format!(
3363 "SELECT id, managed FROM red.registry WHERE id = '{}'",
3364 REGULATED_PROTECT_MANAGED_POLICY
3365 ))
3366 .expect("red.registry query");
3367 assert_eq!(registry_rows.result.records.len(), 1);
3368 assert_eq!(
3369 registry_rows.result.records[0].get("managed"),
3370 Some(&Value::Boolean(true))
3371 );
3372
3373 let managed_policy_rows = runtime
3374 .execute_query(&format!(
3375 "SELECT policy_id FROM red.managed_policies WHERE policy_id = '{}'",
3376 REGULATED_PROTECT_MANAGED_POLICY
3377 ))
3378 .expect("red.managed_policies query");
3379 assert_eq!(managed_policy_rows.result.records.len(), 1);
3380
3381 let capability_rows = runtime
3382 .execute_query(
3383 "SELECT action FROM red.control_capabilities WHERE action = 'evidence:export'",
3384 )
3385 .expect("red.control_capabilities query");
3386 assert_eq!(capability_rows.result.records.len(), 1);
3387
3388 auth_store
3389 .create_user("alice", "p", Role::Admin)
3390 .expect("create ordinary admin");
3391 let allow_all = Policy::from_json_str(
3392 r#"{
3393 "id": "alice-allow-all",
3394 "version": 1,
3395 "statements": [{
3396 "effect": "allow",
3397 "actions": ["*"],
3398 "resources": ["*"]
3399 }]
3400 }"#,
3401 )
3402 .expect("allow-all policy");
3403 auth_store.put_policy(allow_all).expect("install allow-all");
3404 auth_store
3405 .attach_policy(
3406 PrincipalRef::User(UserId::platform("alice")),
3407 "alice-allow-all",
3408 )
3409 .expect("attach allow-all");
3410 let ctx = EvalContext {
3411 principal_tenant: None,
3412 current_tenant: None,
3413 peer_ip: None,
3414 mfa_present: false,
3415 now_ms: 1_700_000_000_000,
3416 principal_is_admin_role: true,
3417 principal_is_system_owned: false,
3418 principal_is_platform_scoped: true,
3419 };
3420 assert!(
3421 auth_store.check_policy_authz(
3422 &UserId::platform("alice"),
3423 "policy:drop",
3424 &ResourceRef::new("policy", REGULATED_PROTECT_MANAGED_POLICY),
3425 &ctx,
3426 ),
3427 "ordinary allow-all policy should be broad enough that only the managed guardrail blocks"
3428 );
3429
3430 set_current_auth_identity("alice".to_string(), Role::Admin);
3431 let denied = runtime.execute_query(&format!(
3432 "DROP POLICY '{}'",
3433 REGULATED_PROTECT_MANAGED_POLICY
3434 ));
3435 clear_current_auth_identity();
3436 let err = denied.expect_err("managed policy guardrail must deny ordinary admin");
3437 assert!(
3438 err.to_string().contains("managed policy"),
3439 "error should name the managed guardrail: {err}"
3440 );
3441 assert!(
3442 auth_store
3443 .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
3444 .is_some(),
3445 "denied mutation must leave managed policy installed"
3446 );
3447
3448 let denied_events = runtime
3449 .execute_query(&format!(
3450 "SELECT action, resource, outcome FROM red.control_events \
3451 WHERE action = 'policy:drop' AND resource = 'policy:{}'",
3452 REGULATED_PROTECT_MANAGED_POLICY
3453 ))
3454 .expect("red.control_events denied policy drop");
3455 assert_eq!(denied_events.result.records.len(), 1);
3456 assert_eq!(
3457 denied_events.result.records[0].get("outcome"),
3458 Some(&Value::text("denied"))
3459 );
3460
3461 set_current_auth_identity("alice".to_string(), Role::Admin);
3462 let config_denied = runtime.execute_query("SET CONFIG red.config.audit.enabled = true");
3463 clear_current_auth_identity();
3464 let err = config_denied.expect_err("managed config guardrail must deny ordinary admin");
3465 assert!(
3466 err.to_string().contains("managed config"),
3467 "error should name the managed config guardrail: {err}"
3468 );
3469
3470 let denied_config_events = runtime
3471 .execute_query(
3472 "SELECT action, resource, outcome FROM red.control_events \
3473 WHERE action = 'config:write' AND resource = 'config:red.config.audit.enabled'",
3474 )
3475 .expect("red.control_events denied config write");
3476 assert_eq!(denied_config_events.result.records.len(), 1);
3477 assert_eq!(
3478 denied_config_events.result.records[0].get("outcome"),
3479 Some(&Value::text("denied"))
3480 );
3481
3482 runtime
3483 .execute_query("CREATE TABLE regulated_docs (id INT)")
3484 .expect("create user table");
3485 runtime
3486 .execute_query("SELECT * FROM regulated_docs")
3487 .expect("select user table");
3488 let audit_rows = runtime
3489 .db()
3490 .store()
3491 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3492 .expect("query audit collection")
3493 .query_all(|_| true);
3494 assert!(
3495 audit_rows.is_empty(),
3496 "regulated preset must not globally audit data-plane queries"
3497 );
3498
3499 clear_preset_env();
3500 }
3501
3502 #[test]
3503 fn bootstrap_manifest_installs_initial_users_policies_guardrails_and_config() {
3504 use crate::auth::policies::{EvalContext, ResourceRef};
3505 use crate::auth::UserId;
3506 use crate::storage::schema::Value;
3507
3508 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3509 clear_preset_env();
3510
3511 let manifest_dir = std::env::current_dir()
3512 .expect("current dir")
3513 .join(".red/tmp/bootstrap-manifest-tests");
3514 std::fs::create_dir_all(&manifest_dir).expect("create manifest test dir");
3515 let manifest_path = manifest_dir.join(format!(
3516 "reddb-bootstrap-manifest-{}-{}.json",
3517 std::process::id(),
3518 std::time::SystemTime::now()
3519 .duration_since(std::time::UNIX_EPOCH)
3520 .unwrap_or_default()
3521 .as_millis()
3522 ));
3523 std::fs::write(
3524 &manifest_path,
3525 r#"{
3526 "users": [
3527 {
3528 "username": "ops",
3529 "password": "hunter2",
3530 "role": "admin",
3531 "system_owned": true
3532 }
3533 ],
3534 "policies": [
3535 {
3536 "id": "bootstrap-registry-admin",
3537 "version": 1,
3538 "statements": [
3539 {
3540 "effect": "allow",
3541 "actions": ["red.registry:*", "policy:*", "config:write", "select"],
3542 "resources": ["registry:*", "policy:*", "config:*", "collection:docs"]
3543 }
3544 ]
3545 }
3546 ],
3547 "managed_policies": [
3548 {
3549 "id": "managed-deny-drop",
3550 "version": 1,
3551 "statements": [
3552 {
3553 "effect": "deny",
3554 "actions": ["policy:drop"],
3555 "resources": ["policy:managed-deny-drop"]
3556 }
3557 ],
3558 "required_resource": "policy:managed-deny-drop",
3559 "evidence": "full"
3560 }
3561 ],
3562 "attachments": [
3563 {"user": "ops", "policy": "bootstrap-registry-admin"}
3564 ],
3565 "managed_config_namespaces": [
3566 {
3567 "id": "red.ai",
3568 "required_action": "config:write",
3569 "required_resource": "config:red.ai.*",
3570 "evidence": "metadata"
3571 }
3572 ],
3573 "config": [
3574 {"key": "red.ai.default.provider", "value": "openai"},
3575 {
3576 "key": "red.ai.openai.default.secret_ref",
3577 "secret_ref": {"collection": "red.vault", "key": "openai"}
3578 }
3579 ],
3580 "actor": "ops"
3581 }"#,
3582 )
3583 .expect("write manifest");
3584 unsafe {
3586 std::env::set_var("REDDB_BOOTSTRAP_MANIFEST", &manifest_path);
3587 }
3588
3589 let (runtime, auth_store) = fresh_runtime_and_store();
3590 apply_preset(&runtime, &auth_store).expect("manifest applies cleanly");
3591
3592 let users = auth_store.list_users();
3593 assert_eq!(users.len(), 1);
3594 assert_eq!(users[0].username, "ops");
3595 assert!(users[0].system_owned);
3596
3597 let actor = UserId::platform("ops");
3598 let ctx = EvalContext {
3599 principal_tenant: None,
3600 current_tenant: None,
3601 peer_ip: None,
3602 mfa_present: false,
3603 now_ms: 1_700_000_000_000,
3604 principal_is_admin_role: true,
3605 principal_is_system_owned: true,
3606 principal_is_platform_scoped: true,
3607 };
3608 assert!(auth_store.check_policy_authz(
3610 &actor,
3611 "select",
3612 &ResourceRef::new("collection", "docs"),
3613 &ctx
3614 ));
3615
3616 let managed_policy = runtime
3617 .config_registry()
3618 .get_active("managed-deny-drop")
3619 .expect("managed policy registry entry");
3620 assert!(managed_policy.managed);
3621 assert_eq!(managed_policy.resource_type, "policy");
3622 let managed_config = runtime
3623 .config_registry()
3624 .get_active("red.ai")
3625 .expect("managed config namespace registry entry");
3626 assert!(managed_config.managed);
3627 assert_eq!(managed_config.resource_type, "config_namespace");
3628
3629 let store = runtime.db().store();
3630 match store
3631 .get_config("red.ai.default.provider")
3632 .expect("plain config persisted")
3633 {
3634 Value::Text(s) => assert_eq!(s.as_ref(), "openai"),
3635 other => panic!("expected provider text, got {other:?}"),
3636 }
3637 let Value::Json(bytes) = store
3638 .get_config("red.ai.openai.default.secret_ref")
3639 .expect("secret ref config persisted")
3640 else {
3641 panic!("secret ref must be stored as structured JSON");
3642 };
3643 let reference: crate::serde_json::Value =
3644 crate::serde_json::from_slice(&bytes).expect("secret ref json");
3645 assert_eq!(
3646 reference.get("type").and_then(|v| v.as_str()),
3647 Some("secret_ref")
3648 );
3649 assert!(
3650 !String::from_utf8_lossy(&bytes).contains("hunter2"),
3651 "manifest password must not leak into secret ref config"
3652 );
3653
3654 let completed = store
3655 .get_config(BOOTSTRAP_COMPLETED_KEY)
3656 .expect("bootstrap completion persisted");
3657 assert!(matches!(completed, Value::Boolean(true)));
3658 assert!(
3659 store
3660 .get_config("system.bootstrap.manifest.registry_entries")
3661 .is_some(),
3662 "managed registry entries must be persisted internally"
3663 );
3664
3665 std::fs::remove_file(&manifest_path).expect("remove manifest after first boot");
3666 let restored_registry = Arc::new(crate::auth::registry::ConfigRegistry::new());
3667 crate::cli::bootstrap_manifest::rehydrate_manifest_registry(&runtime, &restored_registry)
3668 .expect("registry rehydrates without manifest file");
3669 assert!(restored_registry.get_active("managed-deny-drop").is_some());
3670 assert!(restored_registry.get_active("red.ai").is_some());
3671
3672 let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3673 apply_preset(&runtime, &fresh).expect("re-run must not need manifest file");
3674 assert!(fresh.needs_bootstrap());
3675
3676 clear_preset_env();
3677 }
3678
3679 #[test]
3680 fn production_preset_refuses_to_start_without_password() {
3681 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3682 clear_preset_env();
3683 unsafe {
3685 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3686 std::env::set_var("REDDB_USERNAME", "ops");
3687 }
3688
3689 let (runtime, auth_store) = fresh_runtime_and_store();
3690 let err = apply_preset(&runtime, &auth_store).expect_err("must reject missing password");
3691 assert!(
3692 err.contains("REDDB_PASSWORD"),
3693 "error must name the missing env: {err}"
3694 );
3695
3696 assert!(auth_store.needs_bootstrap());
3698 assert!(runtime
3699 .db()
3700 .store()
3701 .get_config(BOOTSTRAP_COMPLETED_KEY)
3702 .is_none());
3703
3704 clear_preset_env();
3705 }
3706
3707 #[test]
3708 fn re_running_production_after_first_boot_is_a_silent_skip() {
3709 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3710 clear_preset_env();
3711 unsafe {
3713 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3714 std::env::set_var("REDDB_USERNAME", "ops");
3715 std::env::set_var("REDDB_PASSWORD", "hunter2");
3716 }
3717
3718 let (runtime, auth_store) = fresh_runtime_and_store();
3719 apply_preset(&runtime, &auth_store).expect("first apply");
3720 assert_eq!(auth_store.list_users().len(), 1);
3721
3722 let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3729 apply_preset(&runtime, &fresh).expect("re-run is silent-skip");
3730 assert!(
3731 fresh.needs_bootstrap(),
3732 "re-run must not create a second admin"
3733 );
3734 assert!(
3735 fresh.get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY).is_none(),
3736 "re-run must not re-install the allow-all policy on the fresh store"
3737 );
3738
3739 clear_preset_env();
3740 }
3741
3742 #[test]
3743 fn unrecognised_preset_value_is_rejected() {
3744 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3745 clear_preset_env();
3746 unsafe {
3748 std::env::set_var(PRESET_ENV, "weird");
3749 }
3750
3751 let (runtime, auth_store) = fresh_runtime_and_store();
3752 let err = apply_preset(&runtime, &auth_store).expect_err("must reject unknown preset");
3753 assert!(err.contains("weird"), "error must echo the value: {err}");
3754 assert!(auth_store.needs_bootstrap());
3755
3756 clear_preset_env();
3757 }
3758
3759 #[test]
3760 fn no_auth_short_circuits_preset_entirely() {
3761 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3762 clear_preset_env();
3763 unsafe {
3766 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3767 std::env::set_var("REDDB_USERNAME", "ops");
3768 std::env::set_var("REDDB_PASSWORD", "hunter2");
3769 }
3770
3771 let options = no_auth_test_config(true)
3772 .to_db_options()
3773 .expect("to_db_options");
3774 assert!(no_auth_active(&options));
3775
3776 let (runtime, auth_store) = fresh_runtime_and_store();
3779 if !no_auth_active(&options) {
3780 apply_preset(&runtime, &auth_store).expect("would apply preset");
3781 }
3782
3783 assert!(
3784 auth_store.needs_bootstrap(),
3785 "--no-auth must prevent any admin creation"
3786 );
3787 assert!(
3788 runtime
3789 .db()
3790 .store()
3791 .get_config(BOOTSTRAP_COMPLETED_KEY)
3792 .is_none(),
3793 "--no-auth must skip bootstrap-state persistence"
3794 );
3795
3796 clear_preset_env();
3797 }
3798
3799 #[test]
3800 fn implicit_bind_collision_degrades() {
3801 let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
3802 let addr = held.local_addr().expect("held addr").to_string();
3803 let mut readiness = TransportReadiness::default();
3804
3805 let listener =
3806 bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");
3807
3808 assert!(listener.is_none());
3809 assert_eq!(readiness.active.len(), 0);
3810 assert_eq!(readiness.failed.len(), 1);
3811 assert!(!readiness.failed[0].explicit);
3812 assert_eq!(readiness.failed[0].bind_addr, addr);
3813 }
3814}