1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, InProcessRouterConfig};
12use crate::storage::StorageProfileSelection;
13use crate::{
14 GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
15};
16
/// Default router bind address: loopback on port 5050.
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
18
/// Inspects the host and derives baseline runtime sizing.
///
/// The CPU count comes from [`thread::available_parallelism`], falling back
/// to `1` when it cannot be determined. One CPU is held back from the
/// suggested worker count, but at least one worker is always suggested.
pub fn detect_runtime_config() -> RuntimeConfig {
    let available_cpus = match thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    };
    let suggested_workers = std::cmp::max(available_cpus.saturating_sub(1), 1);

    RuntimeConfig {
        available_cpus,
        suggested_workers,
        // 8 * 1024 * 1024.
        stack_size: 8 << 20,
    }
}

/// Host-derived runtime sizing produced by [`detect_runtime_config`].
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Parallelism reported for the host (`1` when it could not be queried).
    pub available_cpus: usize,
    /// `available_cpus - 1`, never below `1`.
    pub suggested_workers: usize,
    /// Fixed at 8 * 1024 * 1024 (presumably a thread stack size in bytes —
    /// confirm against the callers that consume it).
    pub stack_size: usize,
}
41
/// Network transports a server process can expose.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    Grpc,
    Http,
    Wire,
}

impl ServerTransport {
    /// Display label for this transport (`"gRPC"`, `"HTTP"` or `"wire"`).
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Wire => "wire",
            ServerTransport::Http => "HTTP",
            ServerTransport::Grpc => "gRPC",
        }
    }

    /// Loopback `host:port` used when no bind address was supplied for this
    /// transport.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Wire => "127.0.0.1:5050",
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Grpc => "127.0.0.1:5555",
        }
    }
}
66
/// Configuration consumed by the server entry points in this file
/// (`run_server_with_large_stack`, `run_configured_servers`, `to_db_options`).
#[derive(Debug, Clone)]
pub struct ServerCommandConfig {
    /// Database path; `None` makes `to_db_options` start from
    /// `RedDBOptions::in_memory()`.
    pub path: Option<PathBuf>,
    /// When set, `run_configured_servers` dispatches to the routed server and
    /// `enabled_transports` reports all three transports.
    pub router_bind_addr: Option<String>,
    // The `*_bind_explicit` flags presumably record whether the matching
    // address was given explicitly rather than defaulted — confirm against the
    // CLI parser; they are not read in the visible part of this file.
    pub router_bind_explicit: bool,
    /// Plain gRPC listener address.
    pub grpc_bind_addr: Option<String>,
    pub grpc_bind_explicit: bool,
    // gRPC TLS listener settings (address, certificate, key, client CA).
    pub grpc_tls_bind_addr: Option<String>,
    pub grpc_tls_cert: Option<PathBuf>,
    pub grpc_tls_key: Option<PathBuf>,
    pub grpc_tls_client_ca: Option<PathBuf>,
    /// Plain HTTP listener address.
    pub http_bind_addr: Option<String>,
    pub http_bind_explicit: bool,
    // HTTP TLS listener settings (address, certificate, key, client CA).
    pub http_tls_bind_addr: Option<String>,
    pub http_tls_cert: Option<PathBuf>,
    pub http_tls_key: Option<PathBuf>,
    pub http_tls_client_ca: Option<PathBuf>,
    /// Wire-protocol listener address.
    pub wire_bind_addr: Option<String>,
    pub wire_bind_explicit: bool,
    // Wire TLS listener settings (address, certificate, key).
    pub wire_tls_bind_addr: Option<String>,
    pub wire_tls_cert: Option<PathBuf>,
    pub wire_tls_key: Option<PathBuf>,
    /// Listener address handed to `run_pg_only_server` when no other
    /// transport is configured.
    pub pg_bind_addr: Option<String>,
    /// Copied verbatim into `RedDBOptions::create_if_missing`.
    pub create_if_missing: bool,
    /// Requests read-only mode; `to_db_options` also turns it on from the
    /// `RED_READONLY` / `REDDB_READONLY` environment or persisted state.
    pub read_only: bool,
    /// `"primary"` or `"replica"`; any other value selects standalone
    /// replication in `to_db_options`.
    pub role: String,
    /// Primary address for replicas; `to_db_options` falls back to
    /// `http://127.0.0.1:5555` when absent.
    pub primary_addr: Option<String>,
    /// Validated by `to_db_options` before being stored in the options.
    pub storage_profile: StorageProfileSelection,
    /// Sets `auth.vault_enabled` (unless `no_auth` overrides it).
    pub vault: bool,
    /// Disables auth and vault and records `NO_AUTH_META` in the metadata.
    pub no_auth: bool,
    // The remaining fields are not read in the visible part of this file.
    pub workers: Option<usize>,
    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
    pub http_limits_cli: crate::server::HttpLimitsCliInput,
    pub ui: bool,
    pub ui_dir: Option<PathBuf>,
}
157
/// A listener that was bound successfully.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerState {
    pub transport: String,
    pub bind_addr: String,
    pub explicit: bool,
}

/// A listener that could not be bound, together with the reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerFailure {
    pub transport: String,
    pub bind_addr: String,
    pub explicit: bool,
    pub reason: String,
}

/// Accumulated outcome of listener binding: successes and failures, each in
/// the order they were recorded.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TransportReadiness {
    pub active: Vec<TransportListenerState>,
    pub failed: Vec<TransportListenerFailure>,
}

impl TransportReadiness {
    /// Appends a successfully bound listener to `self.active`.
    fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
        let entry = TransportListenerState {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
        };
        self.active.push(entry);
    }

    /// Appends a failed bind attempt, with its reason, to `self.failed`.
    fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
        let entry = TransportListenerFailure {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
            reason,
        };
        self.failed.push(entry);
    }
}
197
/// Inputs for rendering and installing the systemd unit.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix.
    pub service_name: String,
    /// Executable placed first on the `ExecStart=` line.
    pub binary_path: PathBuf,
    /// Value of the unit's `User=` setting.
    pub run_user: String,
    /// Value of the unit's `Group=` setting.
    pub run_group: String,
    /// Database file passed to the server via `--path`.
    pub data_path: PathBuf,
    pub router_bind_addr: Option<String>,
    pub grpc_bind_addr: Option<String>,
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory that contains `data_path`.
    ///
    /// Falls back to `"."` when `data_path` has no parent (for example `/`)
    /// **or** when the parent is the empty path. The latter happens for a bare
    /// relative file name such as `data.rdb`, where `Path::parent` returns
    /// `Some("")`; returning that verbatim would render an empty
    /// `WorkingDirectory=` in the unit.
    pub fn data_dir(&self) -> PathBuf {
        match self.data_path.parent() {
            Some(parent) if !parent.as_os_str().is_empty() => parent.to_path_buf(),
            _ => PathBuf::from("."),
        }
    }

    /// Location of the unit file: `/etc/systemd/system/<service_name>.service`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
222
223pub fn default_telemetry_for_path(
228 path: Option<&std::path::Path>,
229) -> crate::telemetry::TelemetryConfig {
230 let log_dir = match path {
231 Some(p) => p
232 .parent()
233 .map(|parent| parent.join("logs"))
234 .or_else(|| Some(std::path::PathBuf::from("./logs"))),
235 None => None, };
237 crate::telemetry::TelemetryConfig {
238 log_dir,
239 file_prefix: "reddb.log".to_string(),
240 level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
241 format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
242 crate::telemetry::LogFormat::Pretty
243 } else {
244 crate::telemetry::LogFormat::Json
245 },
246 rotation_keep_days: 14,
247 service_name: "reddb",
248 level_explicit: false,
250 format_explicit: false,
251 rotation_keep_days_explicit: false,
252 file_prefix_explicit: false,
253 log_dir_explicit: false,
254 log_file_disabled: false,
255 }
256}
257
// Metadata keys written by `apply_backup_config` and read back by
// `spawn_backup_tasks_if_configured`.

/// Checkpoint interval, in seconds, stored as a decimal string.
const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
/// WAL flush interval, in seconds, stored as a decimal string.
const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
/// Backend kind label (`apply_backup_config` always records `"s3"`).
const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
/// Archive-lag pause threshold in seconds; a missing or unparsable value is
/// treated as `0`, which leaves the pause disabled.
const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";

/// Metadata key set to `"true"` when the server is started with auth disabled.
pub(crate) const NO_AUTH_META: &str = "red.boot.no_auth";
280
281pub(crate) fn no_auth_active(options: &RedDBOptions) -> bool {
284 options
285 .metadata
286 .get(NO_AUTH_META)
287 .map(|v| v == "true")
288 .unwrap_or(false)
289}
290
/// Operator-facing warning text for running with `--no-auth`.
const NO_AUTH_WARNING: &str =
    "⚠ auth disabled (--no-auth) — anonymous access, do NOT use in production";
297
298impl ServerCommandConfig {
299 fn to_db_options(&self) -> Result<RedDBOptions, String> {
300 let mut options = match &self.path {
301 Some(path) => RedDBOptions::persistent(path),
302 None => RedDBOptions::in_memory(),
303 };
304
305 options.mode = StorageMode::Persistent;
306 options.create_if_missing = self.create_if_missing;
307 options.read_only = self.read_only
314 || env_nonempty("RED_READONLY")
315 .or_else(|| env_nonempty("REDDB_READONLY"))
316 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
317 .unwrap_or(false)
318 || self.path.as_ref().is_some_and(|data_path| {
319 crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
320 data_path,
321 ))
322 .unwrap_or(false)
323 });
324
325 options.replication = match self.role.as_str() {
326 "primary" => ReplicationConfig::primary(),
327 "replica" => {
328 let primary_addr = self
329 .primary_addr
330 .clone()
331 .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
332 ReplicationConfig::replica(primary_addr)
339 }
340 _ => ReplicationConfig::standalone(),
341 };
342 options.storage_profile = self.storage_profile.validate()?;
343
344 if self.vault {
345 options.auth.vault_enabled = true;
346 }
347
348 if self.no_auth {
359 options.auth.enabled = false;
360 options.auth.require_auth = false;
361 options.auth.vault_enabled = false;
362 options
363 .metadata
364 .insert(NO_AUTH_META.to_string(), "true".to_string());
365 }
366
367 if let Some(raw) = env_nonempty("REDDB_COMPLIANCE_MODE") {
372 options.control_events.compliance_mode = matches!(
373 raw.to_ascii_lowercase().as_str(),
374 "1" | "true" | "yes" | "on"
375 );
376 }
377 if env_nonempty(PRESET_ENV).is_some_and(|s| s.trim() == PRESET_REGULATED) {
378 options.control_events.compliance_mode = true;
379 options.query_audit = crate::runtime::query_audit::QueryAuditConfig::regulated();
380 }
381
382 match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
387 Err(msg) => {
388 return Err(format!("backup bootstrap: {msg}"));
389 }
390 Ok(Some(cfg)) => {
391 apply_backup_config(&mut options, &cfg);
392 }
393 Ok(None) => {
394 configure_remote_backend_from_env(&mut options);
395 }
396 }
397
398 if options.remote_backend.is_some()
399 || options
400 .metadata
401 .contains_key(BACKUP_INTERVAL_META_CHECKPOINT)
402 {
403 let mut selection = options.storage_profile;
404 selection.managed_backup = true;
405 options.storage_profile = selection.validate()?;
406 }
407
408 Ok(options)
409 }
410
411 pub fn enabled_transports(&self) -> Vec<ServerTransport> {
412 let mut transports = Vec::with_capacity(3);
413 if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
414 transports.push(ServerTransport::Grpc);
415 }
416 if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
417 transports.push(ServerTransport::Http);
418 }
419 if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
420 transports.push(ServerTransport::Wire);
421 }
422 transports
423 }
424}
425
/// Looks up configuration value `name` by delegating to
/// `crate::utils::env_with_file_fallback`.
// NOTE(review): the helper name suggests an environment variable with a
// file-based fallback, and this wrapper's name suggests empty values are
// dropped — confirm both against the helper's implementation.
fn env_nonempty(name: &str) -> Option<String> {
    crate::utils::env_with_file_fallback(name)
}
433
434fn env_truthy(name: &str) -> bool {
435 env_nonempty(name)
436 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
437 .unwrap_or(false)
438}
439
440fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
446 let endpoint_host = endpoint_host(&cfg.endpoint);
447
448 options.metadata.insert(
449 BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
450 cfg.checkpoint_interval_secs.to_string(),
451 );
452 options.metadata.insert(
453 BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
454 cfg.wal_flush_interval_secs.to_string(),
455 );
456 options
457 .metadata
458 .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
459 options.metadata.insert(
460 BACKUP_PAUSE_ON_LAG_META.to_string(),
461 cfg.pause_on_lag_secs.to_string(),
462 );
463
464 #[cfg(feature = "backend-s3")]
465 {
466 let s3_cfg = crate::storage::backend::S3Config {
467 endpoint: cfg.endpoint.clone(),
468 bucket: cfg.bucket.clone(),
469 key_prefix: cfg.prefix.clone(),
470 access_key: cfg.access_key_id.clone(),
471 secret_key: cfg.secret_access_key.clone(),
472 region: cfg.region.clone(),
473 path_style: true,
474 };
475 let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
476 options.remote_backend = Some(backend.clone());
477 options.remote_backend_atomic = Some(backend);
478 let trimmed = cfg.prefix.trim_end_matches('/');
483 options.remote_key = Some(reddb_file::remote_database_key(trimmed));
484
485 tracing::info!(
486 backend = "s3",
487 endpoint = %endpoint_host,
488 bucket = %cfg.bucket,
489 prefix = %cfg.prefix,
490 checkpoint_interval_secs = cfg.checkpoint_interval_secs,
491 wal_flush_interval_secs = cfg.wal_flush_interval_secs,
492 "backup backend configured from REDDB_BACKUP_* env"
493 );
494 }
495
496 #[cfg(not(feature = "backend-s3"))]
497 {
498 tracing::warn!(
499 backend = "s3",
500 endpoint = %endpoint_host,
501 bucket = %cfg.bucket,
502 prefix = %cfg.prefix,
503 "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
504 backend wiring skipped (archiver/checkpointer also disabled)"
505 );
506 }
507}
508
/// Extracts the `host[:port]` part of an endpoint for logging.
///
/// Strips, in order:
/// 1. an optional `scheme://` prefix,
/// 2. everything from the first `/`, `?` or `#` (path, query, fragment),
/// 3. any `userinfo@` prefix, so credentials embedded in the URL are never
///    written to the logs.
///
/// Inputs without a scheme or path are returned unchanged.
fn endpoint_host(endpoint: &str) -> &str {
    let without_scheme = endpoint
        .split_once("://")
        .map_or(endpoint, |(_, rest)| rest);

    // The authority ends at the first path, query or fragment delimiter.
    let authority_end = without_scheme
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(without_scheme.len());
    let authority = &without_scheme[..authority_end];

    // Userinfo may itself contain '@' in malformed input, so split on the
    // last one: the host is whatever follows it.
    authority
        .rsplit_once('@')
        .map_or(authority, |(_, host)| host)
}
516
517fn spawn_backup_tasks_if_configured(
523 options: &RedDBOptions,
524 runtime: &RedDBRuntime,
525) -> Option<BackupTasksHandle> {
526 let checkpoint_secs: u64 = options
527 .metadata
528 .get(BACKUP_INTERVAL_META_CHECKPOINT)?
529 .parse()
530 .ok()?;
531 let wal_secs: u64 = options
532 .metadata
533 .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
534 .parse()
535 .ok()?;
536 let pause_on_lag_secs: u64 = options
539 .metadata
540 .get(BACKUP_PAUSE_ON_LAG_META)
541 .and_then(|raw| raw.parse().ok())
542 .unwrap_or(0);
543 options.remote_backend.as_ref()?;
544
545 let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
546
547 if pause_on_lag_secs > 0 {
552 let now_ms = std::time::SystemTime::now()
553 .duration_since(std::time::UNIX_EPOCH)
554 .map(|d| d.as_millis() as u64)
555 .unwrap_or(0);
556 runtime
557 .write_gate()
558 .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
559 tracing::info!(
560 pause_on_lag_secs,
561 "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
562 );
563 }
564
565 let checkpoint_handle = {
566 let stop = Arc::clone(&stop);
567 let runtime = runtime.clone();
568 let interval = Duration::from_secs(checkpoint_secs);
569 thread::Builder::new()
570 .name("red-checkpointer".into())
571 .spawn(move || {
572 periodic_loop(stop, interval, move || {
573 if let Err(err) = runtime.checkpoint() {
574 tracing::warn!(error = %err, "periodic checkpoint failed");
575 }
576 })
577 })
578 .ok()
579 };
580
581 let archiver_handle = {
582 let stop = Arc::clone(&stop);
583 let runtime = runtime.clone();
584 let interval = Duration::from_secs(wal_secs);
585 let lag_enabled = pause_on_lag_secs > 0;
586 thread::Builder::new()
587 .name("red-wal-archiver".into())
588 .spawn(move || {
589 periodic_loop(stop, interval, move || match runtime.trigger_backup() {
590 Ok(_) if lag_enabled => {
591 let now_ms = std::time::SystemTime::now()
592 .duration_since(std::time::UNIX_EPOCH)
593 .map(|d| d.as_millis() as u64)
594 .unwrap_or(0);
595 runtime.write_gate().record_archive_success(now_ms);
596 runtime.write_gate().evaluate_archive_lag(now_ms);
600 }
601 Ok(_) => {}
602 Err(err) => {
603 tracing::warn!(error = %err, "periodic WAL archive/backup failed");
604 }
605 })
606 })
607 .ok()
608 };
609
610 let lag_monitor_handle = if pause_on_lag_secs > 0 {
615 let stop = Arc::clone(&stop);
616 let runtime = runtime.clone();
617 let interval = Duration::from_secs(5);
621 thread::Builder::new()
622 .name("red-archive-lag-monitor".into())
623 .spawn(move || {
624 periodic_loop(stop, interval, move || {
625 let now_ms = std::time::SystemTime::now()
626 .duration_since(std::time::UNIX_EPOCH)
627 .map(|d| d.as_millis() as u64)
628 .unwrap_or(0);
629 let was_paused = runtime.write_gate().is_auto_paused();
630 let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
631 if now_paused && !was_paused {
632 tracing::warn!(
633 pause_on_lag_secs,
634 last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
635 "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
636 );
637 } else if !now_paused && was_paused {
638 tracing::info!(
639 "WAL archive caught up — exiting graceful read-only mode (issue #519)"
640 );
641 }
642 })
643 })
644 .ok()
645 } else {
646 None
647 };
648
649 tracing::info!(
650 checkpoint_interval_secs = checkpoint_secs,
651 wal_flush_interval_secs = wal_secs,
652 "backup tasks spawned (checkpointer + WAL archiver)"
653 );
654
655 Some(BackupTasksHandle {
656 stop,
657 _checkpoint_handle: checkpoint_handle,
658 _archiver_handle: archiver_handle,
659 _lag_monitor_handle: lag_monitor_handle,
660 })
661}
662
/// Owner of the background backup threads.
///
/// Dropping the handle sets the shared stop flag. The threads are not
/// joined; each one exits the next time its loop observes the flag.
pub struct BackupTasksHandle {
    /// Stop flag shared with every spawned loop.
    stop: Arc<std::sync::atomic::AtomicBool>,
    // The join handles are stored but never joined.
    _checkpoint_handle: Option<thread::JoinHandle<()>>,
    _archiver_handle: Option<thread::JoinHandle<()>>,
    _lag_monitor_handle: Option<thread::JoinHandle<()>>,
}

impl Drop for BackupTasksHandle {
    /// Signals all backup loops to stop.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        self.stop.store(true, Ordering::Release);
    }
}
679
/// Runs `tick` roughly every `interval` until `stop` becomes `true`.
///
/// The loop sleeps in one-second slices so a stop request is noticed within
/// about a second, whatever the interval. The flag is checked once per slice,
/// before sleeping; if it is already set on entry, `tick` never runs.
fn periodic_loop<F: FnMut()>(
    stop: Arc<std::sync::atomic::AtomicBool>,
    interval: Duration,
    mut tick: F,
) {
    use std::sync::atomic::Ordering;

    const WAKE: Duration = Duration::from_secs(1);

    let mut waited = Duration::ZERO;
    loop {
        if stop.load(Ordering::Acquire) {
            return;
        }
        thread::sleep(WAKE);
        waited += WAKE;
        if waited < interval {
            continue;
        }
        tick();
        waited = Duration::ZERO;
    }
}
698
699fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
700 let backend = env_nonempty("RED_BACKEND")
706 .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
707 .unwrap_or_else(|| "none".to_string())
708 .to_ascii_lowercase();
709
710 match backend.as_str() {
711 "s3" | "minio" | "r2" => {
716 #[cfg(feature = "backend-s3")]
717 {
718 if let Some(config) = s3_config_from_env() {
719 let remote_key = env_nonempty("RED_REMOTE_KEY")
720 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
721 .unwrap_or_else(|| reddb_file::remote_database_key("clusters/dev"));
722 let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
723 options.remote_backend = Some(backend.clone());
724 options.remote_backend_atomic = Some(backend);
725 options.remote_key = Some(remote_key);
726 }
727 }
728 #[cfg(not(feature = "backend-s3"))]
729 {
730 tracing::warn!(
731 backend = %backend,
732 "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
733 );
734 }
735 }
736 "fs" | "local" => {
741 let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
742 let remote_key = match (
743 base_path,
744 env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
745 ) {
746 (Some(base), Some(rel)) => Some(format!(
747 "{}/{}",
748 base.trim_end_matches('/'),
749 rel.trim_start_matches('/')
750 )),
751 (Some(base), None) => Some(reddb_file::remote_database_key(&format!(
752 "{}/clusters/dev",
753 base.trim_end_matches('/')
754 ))),
755 (None, Some(rel)) => Some(rel),
756 (None, None) => None,
757 };
758 if let Some(key) = remote_key {
759 let backend = Arc::new(crate::storage::backend::LocalBackend);
760 options.remote_backend = Some(backend.clone());
761 options.remote_backend_atomic = Some(backend);
762 options.remote_key = Some(key);
763 }
764 }
765 "http" => {
770 let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
771 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
772 {
773 Some(u) => u,
774 None => {
775 tracing::warn!(
776 "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
777 );
778 return;
779 }
780 };
781 let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
782 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
783 .unwrap_or_default();
784 let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
785 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
786 {
787 std::fs::read_to_string(&path)
788 .ok()
789 .map(|s| s.trim().to_string())
790 .filter(|s| !s.is_empty())
791 } else {
792 env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
793 .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
794 };
795
796 let mut config =
797 crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
798 if let Some(auth) = auth_header {
799 config = config.with_auth_header(auth);
800 }
801 let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
802 || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
803 || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
804 config = config.with_conditional_writes(conditional_writes);
805 if conditional_writes {
810 match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
811 Ok(atomic) => {
812 let atomic_arc = Arc::new(atomic);
813 options.remote_backend = Some(atomic_arc.clone());
814 options.remote_backend_atomic = Some(atomic_arc);
815 }
816 Err(err) => {
817 tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
818 options.remote_backend =
819 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
820 }
821 }
822 } else {
823 options.remote_backend =
824 Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
825 }
826 options.remote_key = env_nonempty("RED_REMOTE_KEY")
827 .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
828 .or_else(|| Some(reddb_file::remote_database_key("clusters/dev")));
829 }
830 "none" | "" => {}
833 other => {
834 tracing::warn!(
835 backend = %other,
836 "unknown RED_BACKEND value — supported: s3 | fs | http | none"
837 );
838 }
839 }
840}
841
/// Resolves the S3 setting `suffix`, preferring `RED_S3_<suffix>` and falling
/// back to the legacy `REDDB_S3_<suffix>`.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    ["RED_S3_", "REDDB_S3_"]
        .iter()
        .find_map(|prefix| env_nonempty(&format!("{prefix}{suffix}")))
}
851
/// Resolves an S3 secret, with support for `*_FILE` indirection.
///
/// When `RED_S3_<suffix>_FILE` (or the legacy `REDDB_S3_<suffix>_FILE`) is
/// set, the secret is the trimmed content of that file. An unreadable or
/// blank file yields `None`, with no fallback to the plain variable. Without
/// a `*_FILE` setting the value comes from `env_s3(suffix)`.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let secret_file = env_nonempty(&format!("RED_S3_{suffix}_FILE"))
        .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}_FILE")));

    match secret_file {
        Some(path) => {
            let contents = std::fs::read_to_string(&path).ok()?;
            let secret = contents.trim();
            if secret.is_empty() {
                None
            } else {
                Some(secret.to_string())
            }
        }
        None => env_s3(suffix),
    }
}
869
/// Assembles an `S3Config` from the environment.
///
/// Endpoint, bucket, access key and secret key are mandatory; if any one is
/// missing the result is `None`. Region defaults to `us-east-1`, the key
/// prefix (`KEY_PREFIX`, then `PREFIX`) to empty, and path-style addressing
/// to enabled unless `PATH_STYLE` is set to a non-truthy value.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    // Field initialisers run top to bottom, so the mandatory settings are
    // resolved first and a missing one returns before the optional lookups.
    Some(crate::storage::backend::S3Config {
        endpoint: env_s3("ENDPOINT")?,
        bucket: env_s3("BUCKET")?,
        access_key: env_s3_secret("ACCESS_KEY")?,
        secret_key: env_s3_secret("SECRET_KEY")?,
        region: env_s3("REGION").unwrap_or_else(|| "us-east-1".to_string()),
        key_prefix: env_s3("KEY_PREFIX")
            .or_else(|| env_s3("PREFIX"))
            .unwrap_or_default(),
        path_style: env_s3("PATH_STYLE").map_or(true, |v| {
            matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on")
        }),
    })
}
893
894pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
895 let data_dir = config.data_dir();
896 let exec_start = render_systemd_exec_start(config);
897 format!(
898 "[Unit]\n\
899Description=RedDB unified database service\n\
900After=network-online.target\n\
901Wants=network-online.target\n\
902\n\
903[Service]\n\
904Type=simple\n\
905User={user}\n\
906Group={group}\n\
907WorkingDirectory={workdir}\n\
908ExecStart={exec_start}\n\
909Restart=always\n\
910RestartSec=2\n\
911LimitSTACK=16M\n\
912NoNewPrivileges=true\n\
913PrivateTmp=true\n\
914ProtectSystem=strict\n\
915ProtectHome=true\n\
916ProtectControlGroups=true\n\
917ProtectKernelTunables=true\n\
918ProtectKernelModules=true\n\
919RestrictNamespaces=true\n\
920LockPersonality=true\n\
921MemoryDenyWriteExecute=true\n\
922ReadWritePaths={workdir}\n\
923\n\
924[Install]\n\
925WantedBy=multi-user.target\n",
926 user = config.run_user,
927 group = config.run_group,
928 workdir = data_dir.display(),
929 exec_start = exec_start,
930 )
931}
932
933#[cfg(target_os = "linux")]
942pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
943 ensure_root()?;
944 ensure_command_available("systemctl")?;
945 ensure_command_available("getent")?;
946 ensure_command_available("groupadd")?;
947 ensure_command_available("useradd")?;
948 ensure_command_available("install")?;
949 ensure_executable(&config.binary_path)?;
950
951 if !command_success("getent", ["group", config.run_group.as_str()])? {
952 run_command("groupadd", ["--system", config.run_group.as_str()])?;
953 }
954
955 if !command_success("id", ["-u", config.run_user.as_str()])? {
956 let data_dir = config.data_dir();
957 run_command(
958 "useradd",
959 [
960 "--system",
961 "--gid",
962 config.run_group.as_str(),
963 "--home-dir",
964 data_dir.to_string_lossy().as_ref(),
965 "--shell",
966 "/usr/sbin/nologin",
967 config.run_user.as_str(),
968 ],
969 )?;
970 }
971
972 let data_dir = config.data_dir();
973 run_command(
974 "install",
975 [
976 "-d",
977 "-o",
978 config.run_user.as_str(),
979 "-g",
980 config.run_group.as_str(),
981 "-m",
982 "0750",
983 data_dir.to_string_lossy().as_ref(),
984 ],
985 )?;
986
987 std::fs::write(config.unit_path(), render_systemd_unit(config))
988 .map_err(|err| format!("failed to write systemd unit: {err}"))?;
989
990 run_command("systemctl", ["daemon-reload"])?;
991 run_command(
992 "systemctl",
993 [
994 "enable",
995 "--now",
996 format!("{}.service", config.service_name).as_str(),
997 ],
998 )?;
999
1000 Ok(())
1001}
1002
/// Stub for non-Linux targets: systemd installation is unavailable there, so
/// this always returns an error pointing at the manual alternatives.
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    Err("systemd install is Linux-only — use sc.exe (Windows) or \
         launchd (macOS) to install the service manually using the \
         unit printed by `red service print-unit`"
        .to_string())
}
1014
/// Checks that the process runs as root by asking `id -u`.
///
/// # Errors
///
/// Fails when `id` cannot be run, exits unsuccessfully, or reports a uid
/// other than `0`.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let output = match Command::new("id").arg("-u").output() {
        Ok(output) => output,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !output.status.success() {
        return Err("failed to determine current uid".to_string());
    }

    match String::from_utf8_lossy(&output.stdout).trim() {
        "0" => Ok(()),
        _ => Err("run this command as root (sudo)".to_string()),
    }
}
1030
/// Verifies that `command` resolves in a login shell (`sh -lc 'command -v …'`).
///
/// `command` is interpolated into the shell line unquoted, so callers must
/// pass only trusted, plain command names.
///
/// # Errors
///
/// Fails when the shell cannot be started or the command is not found.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    let probe = format!("command -v {command} >/dev/null 2>&1");
    let found = Command::new("sh")
        .arg("-lc")
        .arg(&probe)
        .status()
        .map_err(|err| format!("failed to check command '{command}': {err}"))?
        .success();

    if !found {
        return Err(format!("required command not found: {command}"));
    }
    Ok(())
}
1043
/// Verifies that `path` exists and can be executed.
///
/// On Unix at least one execute permission bit must be set; elsewhere the
/// path only has to be a regular file.
///
/// # Errors
///
/// Fails when the metadata cannot be read or the check above does not hold.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    let metadata = match std::fs::metadata(path) {
        Ok(metadata) => metadata,
        Err(err) => return Err(format!("binary not found '{}': {err}", path.display())),
    };

    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;

        // Any of the user/group/other execute bits is enough.
        let any_exec_bit = metadata.permissions().mode() & 0o111 != 0;
        if !any_exec_bit {
            return Err(format!("binary is not executable: {}", path.display()));
        }
    }
    #[cfg(not(unix))]
    {
        if !metadata.is_file() {
            return Err(format!("binary is not a file: {}", path.display()));
        }
    }

    Ok(())
}
1063
/// Runs `program` with `args` and reports whether it exited successfully.
///
/// # Errors
///
/// Fails only when the program cannot be started; a non-zero exit is
/// `Ok(false)`.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    match Command::new(program).args(args).status() {
        Ok(status) => Ok(status.success()),
        Err(err) => Err(format!("failed to run {program}: {err}")),
    }
}
1072
/// Runs `program` with `args`, capturing its output, and requires success.
///
/// # Errors
///
/// Fails when the program cannot be started or exits unsuccessfully. The
/// failure detail is the trimmed stderr, else the trimmed stdout, else the
/// exit status itself. `ExitStatus`'s `Display` already reads
/// "exit status: N", so it is used as-is rather than given a second
/// "exit status" prefix.
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let trimmed = |bytes: &[u8]| String::from_utf8_lossy(bytes).trim().to_string();
    let stderr = trimmed(&output.stderr);
    let detail = if !stderr.is_empty() {
        stderr
    } else {
        let stdout = trimmed(&output.stdout);
        if !stdout.is_empty() {
            stdout
        } else {
            output.status.to_string()
        }
    };
    Err(format!("{program} failed: {detail}"))
}
1094
1095pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1096 let has_any = config.router_bind_addr.is_some()
1097 || config.grpc_bind_addr.is_some()
1098 || config.http_bind_addr.is_some()
1099 || config.wire_bind_addr.is_some()
1100 || config.pg_bind_addr.is_some();
1101 if !has_any {
1102 return Err("at least one server bind address must be configured".into());
1103 }
1104 let thread_name = if config.router_bind_addr.is_some() {
1105 "red-server-router"
1106 } else {
1107 match (
1108 config.grpc_bind_addr.is_some(),
1109 config.http_bind_addr.is_some(),
1110 ) {
1111 (true, true) => "red-server-dual",
1112 (true, false) => "red-server-grpc",
1113 (false, true) => "red-server-http",
1114 (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1115 (false, false) => "red-server-pg-wire",
1116 }
1117 };
1118
1119 let handle = thread::Builder::new()
1120 .name(thread_name.into())
1121 .stack_size(8 * 1024 * 1024)
1122 .spawn(move || run_configured_servers(config))
1123 .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1124
1125 match handle.join() {
1126 Ok(result) => result,
1127 Err(_) => Err("server thread panicked".to_string()),
1128 }
1129}
1130
1131fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1132 let mut parts = vec![
1133 config.binary_path.display().to_string(),
1134 "server".to_string(),
1135 "--path".to_string(),
1136 config.data_path.display().to_string(),
1137 ];
1138
1139 if let Some(bind_addr) = &config.router_bind_addr {
1140 parts.push("--bind".to_string());
1141 parts.push(bind_addr.clone());
1142 } else if let Some(bind_addr) = &config.grpc_bind_addr {
1143 parts.push("--grpc-bind".to_string());
1144 parts.push(bind_addr.clone());
1145 }
1146 if let Some(bind_addr) = &config.http_bind_addr {
1147 parts.push("--http-bind".to_string());
1148 parts.push(bind_addr.clone());
1149 }
1150
1151 parts.join(" ")
1152}
1153
/// Returns `true` when a TCP connection to `target` succeeds within
/// `timeout`.
///
/// `target` is resolved to socket addresses first. A resolution failure
/// yields `false`; otherwise each address is tried in turn, with `timeout`
/// applied per attempt, until one connects.
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    target
        .to_socket_addrs()
        .map(|resolved| resolved.collect::<Vec<SocketAddr>>())
        .map_or(false, |candidates| {
            candidates
                .iter()
                .any(|candidate| TcpStream::connect_timeout(candidate, timeout).is_ok())
        })
}
1164
1165#[inline(never)]
1166fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1167 if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1173 return run_routed_server(config, router_bind_addr);
1174 }
1175
1176 match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1177 (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1178 run_dual_server(config, grpc_bind_addr, http_bind_addr)
1179 }
1180 (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1181 (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1182 (None, None) => {
1183 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1184 run_wire_only_server(config, wire_addr)
1185 } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1186 run_pg_only_server(config, pg_addr)
1187 } else {
1188 Err("at least one server bind address must be configured".to_string())
1189 }
1190 }
1191 }
1192}
1193
1194pub fn bind_listener_for_startup(
1212 readiness: &mut TransportReadiness,
1213 transport: &str,
1214 bind_addr: &str,
1215 explicit: bool,
1216) -> Result<Option<TcpListener>, String> {
1217 match TcpListener::bind(bind_addr) {
1218 Ok(listener) => {
1219 readiness.active(transport, bind_addr, explicit);
1220 Ok(Some(listener))
1221 }
1222 Err(err) => {
1223 let reason = format!("{transport} listener bind {bind_addr}: {err}");
1224 readiness.failed(transport, bind_addr, explicit, reason.clone());
1225 if explicit {
1226 tracing::error!(
1227 transport,
1228 bind = %bind_addr,
1229 error = %err,
1230 "fatal explicit bind failure"
1231 );
1232 Err(format!("explicit {reason}"))
1233 } else {
1234 tracing::warn!(
1235 transport,
1236 bind = %bind_addr,
1237 error = %err,
1238 "non-fatal implicit bind failure; listener degraded"
1239 );
1240 Ok(None)
1241 }
1242 }
1243 }
1244}
1245
1246async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
1269 let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
1270 .ok()
1271 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1272 .unwrap_or(true);
1273
1274 #[cfg(unix)]
1275 {
1276 use tokio::signal::unix::{signal, SignalKind};
1277
1278 let mut sigterm = match signal(SignalKind::terminate()) {
1279 Ok(s) => s,
1280 Err(err) => {
1281 tracing::warn!(
1282 error = %err,
1283 "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
1284 );
1285 return;
1286 }
1287 };
1288 let mut sigint = match signal(SignalKind::interrupt()) {
1289 Ok(s) => s,
1290 Err(err) => {
1291 tracing::warn!(error = %err, "could not install SIGINT handler");
1292 return;
1293 }
1294 };
1295 let mut sighup = match signal(SignalKind::hangup()) {
1301 Ok(s) => Some(s),
1302 Err(err) => {
1303 tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
1304 None
1305 }
1306 };
1307
1308 let reload_runtime = runtime.clone();
1309 tokio::spawn(async move {
1310 loop {
1311 let signal_name = match &mut sighup {
1312 Some(hup) => tokio::select! {
1313 _ = sigterm.recv() => "SIGTERM",
1314 _ = sigint.recv() => "SIGINT",
1315 _ = hup.recv() => "SIGHUP",
1316 },
1317 None => tokio::select! {
1318 _ = sigterm.recv() => "SIGTERM",
1319 _ = sigint.recv() => "SIGINT",
1320 },
1321 };
1322
1323 if signal_name == "SIGHUP" {
1324 handle_sighup_reload(&reload_runtime);
1325 continue; }
1327
1328 tracing::info!(
1329 signal = signal_name,
1330 "lifecycle signal received; shutting down"
1331 );
1332 match runtime.graceful_shutdown(backup_on_shutdown) {
1333 Ok(report) => {
1334 tracing::info!(
1335 duration_ms = report.duration_ms,
1336 flushed_wal = report.flushed_wal,
1337 final_checkpoint = report.final_checkpoint,
1338 backup_uploaded = report.backup_uploaded,
1339 "graceful shutdown complete"
1340 );
1341 }
1342 Err(err) => {
1343 tracing::error!(error = %err, "graceful shutdown failed");
1344 crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
1350 reason: format!("graceful shutdown failed: {err}"),
1351 }
1352 .emit_global();
1353 }
1354 }
1355 std::process::exit(0);
1356 }
1357 });
1358 }
1359
1360 #[cfg(not(unix))]
1361 {
1362 tokio::spawn(async move {
1363 let interrupted = tokio::signal::ctrl_c().await;
1364 if let Err(err) = interrupted {
1365 tracing::warn!(error = %err, "could not install Ctrl+C handler");
1366 return;
1367 }
1368
1369 tracing::info!(
1370 signal = "Ctrl+C",
1371 "lifecycle signal received; shutting down"
1372 );
1373 match runtime.graceful_shutdown(backup_on_shutdown) {
1374 Ok(report) => {
1375 tracing::info!(
1376 duration_ms = report.duration_ms,
1377 flushed_wal = report.flushed_wal,
1378 final_checkpoint = report.final_checkpoint,
1379 backup_uploaded = report.backup_uploaded,
1380 "graceful shutdown complete"
1381 );
1382 }
1383 Err(err) => {
1384 tracing::error!(error = %err, "graceful shutdown failed");
1385 }
1386 }
1387 std::process::exit(0);
1388 });
1389 }
1390}
1391
1392fn handle_sighup_reload(runtime: &RedDBRuntime) {
1401 let now_ms = std::time::SystemTime::now()
1402 .duration_since(std::time::UNIX_EPOCH)
1403 .map(|d| d.as_millis() as u64)
1404 .unwrap_or(0);
1405 tracing::info!(
1406 target: "reddb::secrets",
1407 ts_unix_ms = now_ms,
1408 "SIGHUP received; secrets will be re-read from *_FILE on next access"
1409 );
1410 use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1415 runtime.audit_log().record_event(
1416 AuditEvent::builder("config/sighup_reload")
1417 .source(AuditAuthSource::System)
1418 .resource("secrets")
1419 .outcome(Outcome::Success)
1420 .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1421 .build(),
1422 );
1423}
1424
1425#[inline(never)]
1426fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
1427 let workers = config.workers;
1428 let cli_telemetry = config.telemetry.clone();
1429 let db_options = config.to_db_options()?;
1430 let rt_config = detect_runtime_config();
1431 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1432 let (runtime, auth_store, _telemetry_guard) =
1433 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1434 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1435
1436 spawn_admin_metrics_listeners(&runtime, &auth_store);
1437
1438 let http_server = build_http_server(
1444 runtime.clone(),
1445 auth_store.clone(),
1446 router_bind_addr.clone(),
1447 );
1448 let http_server = apply_http_limits(http_server, &config, &runtime);
1449 let http_server = apply_ui_bundle(http_server, &config)?;
1450
1451 let grpc_server = RedDBGrpcServer::with_options(
1452 runtime.clone(),
1453 GrpcServerOptions {
1454 bind_addr: router_bind_addr.clone(),
1455 tls: None,
1456 },
1457 auth_store,
1458 );
1459
1460 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1461 .enable_all()
1462 .worker_threads(worker_threads)
1463 .thread_stack_size(rt_config.stack_size)
1464 .build()
1465 .map_err(|err| format!("tokio runtime: {err}"))?;
1466
1467 let signal_runtime = runtime.clone();
1468 let wire_runtime = Arc::new(runtime);
1469 tokio_runtime.block_on(async move {
1470 spawn_lifecycle_signal_handler(signal_runtime).await;
1471 tracing::info!(
1472 bind = %router_bind_addr,
1473 cpus = rt_config.available_cpus,
1474 workers = worker_threads,
1475 "router bootstrapping"
1476 );
1477 serve_tcp_router(InProcessRouterConfig {
1478 bind_addr: router_bind_addr,
1479 http_server,
1480 grpc_server,
1481 wire_runtime,
1482 })
1483 .await
1484 .map_err(|err| err.to_string())
1485 })
1486}
1487
1488async fn spawn_wire_listeners(
1490 config: &ServerCommandConfig,
1491 runtime: &RedDBRuntime,
1492 readiness: &mut TransportReadiness,
1493) -> Result<(), String> {
1494 if let Some(wire_addr) = config.wire_bind_addr.clone() {
1496 let wire_rt = Arc::new(runtime.clone());
1497 #[cfg(unix)]
1500 {
1501 if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1502 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1503 tokio::spawn(async move {
1504 if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1505 &wire_addr, wire_rt,
1506 )
1507 .await
1508 {
1509 tracing::error!(err = %e, "redwire unix listener error");
1510 }
1511 });
1512 return Ok(());
1513 }
1514 }
1515 match tokio::net::TcpListener::bind(&wire_addr).await {
1516 Ok(listener) => {
1517 readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1518 tokio::spawn(async move {
1519 if let Err(e) =
1520 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1521 .await
1522 {
1523 tracing::error!(err = %e, "redwire listener error");
1524 }
1525 });
1526 }
1527 Err(err) => {
1528 let reason = format!("wire listener bind {wire_addr}: {err}");
1529 readiness.failed(
1530 "wire",
1531 &wire_addr,
1532 config.wire_bind_explicit,
1533 reason.clone(),
1534 );
1535 if config.wire_bind_explicit {
1536 tracing::error!(
1537 transport = "wire",
1538 bind = %wire_addr,
1539 error = %err,
1540 "fatal explicit bind failure"
1541 );
1542 return Err(format!("explicit {reason}"));
1543 }
1544 tracing::warn!(
1545 transport = "wire",
1546 bind = %wire_addr,
1547 error = %err,
1548 "non-fatal implicit bind failure; listener degraded"
1549 );
1550 }
1551 }
1552 }
1553
1554 if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1556 let tls_config = resolve_wire_tls_config(config);
1557 match tls_config {
1558 Ok(tls_cfg) => {
1559 let wire_rt = Arc::new(runtime.clone());
1560 tokio::spawn(async move {
1561 if let Err(e) =
1562 crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1563 .await
1564 {
1565 tracing::error!(err = %e, "redwire+tls listener error");
1566 }
1567 });
1568 }
1569 Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1570 }
1571 }
1572 Ok(())
1573}
1574
1575fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1582 if let Some(pg_addr) = config.pg_bind_addr.clone() {
1583 let rt = Arc::new(runtime.clone());
1584 tokio::spawn(async move {
1585 let cfg = crate::wire::PgWireConfig {
1586 bind_addr: pg_addr,
1587 ..Default::default()
1588 };
1589 if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1590 tracing::error!(err = %e, "pg wire listener error");
1591 }
1592 });
1593 }
1594}
1595
1596fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1610 use crate::utils::secret_file::expand_file_env;
1611
1612 for var in [
1616 "REDDB_GRPC_TLS_CERT",
1617 "REDDB_GRPC_TLS_KEY",
1618 "REDDB_GRPC_TLS_CLIENT_CA",
1619 ] {
1620 if let Err(err) = expand_file_env(var) {
1621 tracing::warn!(
1622 target: "reddb::secrets",
1623 env = %var,
1624 err = %err,
1625 "could not expand *_FILE companion for gRPC TLS"
1626 );
1627 }
1628 }
1629
1630 let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1631 (Some(cert), Some(key)) => {
1632 let cert_pem = std::fs::read(cert)
1633 .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1634 let key_pem =
1635 std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1636 (cert_pem, key_pem)
1637 }
1638 _ => {
1639 let dev = std::env::var("RED_GRPC_TLS_DEV")
1641 .ok()
1642 .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1643 .unwrap_or(false);
1644 if !dev {
1645 return Err("gRPC TLS configured but no cert/key supplied — set \
1646 REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1647 RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1648 .to_string());
1649 }
1650 let dir = config
1651 .path
1652 .as_ref()
1653 .and_then(|p| p.parent())
1654 .map(PathBuf::from)
1655 .unwrap_or_else(|| PathBuf::from("."));
1656 let (cert_pem_str, key_pem_str) =
1657 crate::wire::tls::generate_self_signed_cert("localhost")
1658 .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1659
1660 let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1665 tracing::warn!(
1666 target: "reddb::security",
1667 transport = "grpc",
1668 cert_sha256 = %fp,
1669 "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1670 DO NOT use in production"
1671 );
1672 let cert_path = dir.join("grpc-tls-cert.pem");
1674 let key_path = dir.join("grpc-tls-key.pem");
1675 if !cert_path.exists() || !key_path.exists() {
1676 let _ = std::fs::create_dir_all(&dir);
1677 std::fs::write(&cert_path, cert_pem_str.as_bytes())
1678 .map_err(|e| format!("write grpc dev cert: {e}"))?;
1679 std::fs::write(&key_path, key_pem_str.as_bytes())
1680 .map_err(|e| format!("write grpc dev key: {e}"))?;
1681 #[cfg(unix)]
1682 {
1683 use std::os::unix::fs::PermissionsExt;
1684 let _ =
1685 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1686 }
1687 }
1688 (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1689 }
1690 };
1691
1692 let client_ca_pem = match &config.grpc_tls_client_ca {
1693 Some(path) => Some(
1694 std::fs::read(path)
1695 .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1696 ),
1697 None => None,
1698 };
1699
1700 Ok(crate::GrpcTlsOptions {
1701 cert_pem,
1702 key_pem,
1703 client_ca_pem,
1704 })
1705}
1706
1707fn spawn_grpc_tls_listener_if_configured(
1711 config: &ServerCommandConfig,
1712 runtime: RedDBRuntime,
1713 auth_store: Arc<AuthStore>,
1714) {
1715 let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1716 return;
1717 };
1718 let tls_opts = match resolve_grpc_tls_options(config) {
1719 Ok(opts) => opts,
1720 Err(err) => {
1721 tracing::error!(
1722 target: "reddb::security",
1723 transport = "grpc",
1724 err = %err,
1725 "gRPC TLS config error; TLS listener will not start"
1726 );
1727 return;
1728 }
1729 };
1730 tokio::spawn(async move {
1731 let server = RedDBGrpcServer::with_options(
1732 runtime,
1733 GrpcServerOptions {
1734 bind_addr: tls_bind.clone(),
1735 tls: Some(tls_opts),
1736 },
1737 auth_store,
1738 );
1739 tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1740 if let Err(err) = server.serve().await {
1741 tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1742 }
1743 });
1744}
1745
1746fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1749 use sha2::{Digest, Sha256};
1750 let mut h = Sha256::new();
1751 h.update(pem);
1752 let d = h.finalize();
1753 let mut buf = String::with_capacity(64);
1754 for b in d.iter() {
1755 buf.push_str(&format!("{b:02x}"));
1756 }
1757 buf
1758}
1759
1760fn resolve_wire_tls_config(
1762 config: &ServerCommandConfig,
1763) -> Result<crate::wire::WireTlsConfig, String> {
1764 match (&config.wire_tls_cert, &config.wire_tls_key) {
1765 (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1766 cert_path: cert.clone(),
1767 key_path: key.clone(),
1768 }),
1769 _ => {
1770 let dir = config
1772 .path
1773 .as_ref()
1774 .and_then(|p| p.parent())
1775 .map(PathBuf::from)
1776 .unwrap_or_else(|| PathBuf::from("."));
1777 crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1778 }
1779 }
1780}
1781
1782#[inline(never)]
1783fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1784 let rt_config = detect_runtime_config();
1785 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1786 let cli_telemetry = config.telemetry.clone();
1787 let db_options = config.to_db_options()?;
1788 let mut transport_readiness = TransportReadiness::default();
1789
1790 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1791 .enable_all()
1792 .worker_threads(workers)
1793 .thread_stack_size(rt_config.stack_size)
1794 .build()
1795 .map_err(|err| format!("tokio runtime: {err}"))?;
1796
1797 let (runtime, _auth_store, _telemetry_guard) =
1801 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1802 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1803 let signal_runtime = runtime.clone();
1804 tokio_runtime.block_on(async move {
1805 spawn_lifecycle_signal_handler(signal_runtime).await;
1806 spawn_pg_listener(&config, &runtime);
1807 let wire_rt = Arc::new(runtime);
1808 let listener = tokio::net::TcpListener::bind(&wire_addr)
1809 .await
1810 .map_err(|err| {
1811 let reason = format!("wire listener bind {wire_addr}: {err}");
1812 transport_readiness.failed(
1813 "wire",
1814 &wire_addr,
1815 config.wire_bind_explicit,
1816 reason.clone(),
1817 );
1818 if config.wire_bind_explicit {
1819 format!("explicit {reason}")
1820 } else {
1821 reason
1822 }
1823 })?;
1824 transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1825 crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1826 .await
1827 .map_err(|e| e.to_string())
1828 })
1829}
1830
1831#[inline(never)]
1832fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1833 let rt_config = detect_runtime_config();
1834 let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1835 let cli_telemetry = config.telemetry.clone();
1836 let db_options = config.to_db_options()?;
1837
1838 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1839 .enable_all()
1840 .worker_threads(workers)
1841 .thread_stack_size(rt_config.stack_size)
1842 .build()
1843 .map_err(|err| format!("tokio runtime: {err}"))?;
1844
1845 let (runtime, _auth_store, _telemetry_guard) =
1846 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1847 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1848 let signal_runtime = runtime.clone();
1849 tokio_runtime.block_on(async move {
1850 spawn_lifecycle_signal_handler(signal_runtime).await;
1851 let cfg = crate::wire::PgWireConfig {
1852 bind_addr: pg_addr,
1853 ..Default::default()
1854 };
1855 crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1856 .await
1857 .map_err(|e| e.to_string())
1858 })
1859}
1860
/// Builds the database runtime, the auth store and the optional telemetry
/// guard for a server entry point.
///
/// Delegates directly to `build_runtime_with_telemetry` with the same
/// arguments and returns its result unchanged.
#[inline(never)]
fn build_runtime_and_auth_store(
    db_options: RedDBOptions,
    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
) -> Result<
    (
        RedDBRuntime,
        Arc<AuthStore>,
        Option<crate::telemetry::TelemetryGuard>,
    ),
    String,
> {
    build_runtime_with_telemetry(db_options, cli_telemetry)
}
1881
1882pub(crate) fn build_runtime_with_telemetry(
1892 db_options: RedDBOptions,
1893 cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1894) -> Result<
1895 (
1896 RedDBRuntime,
1897 Arc<AuthStore>,
1898 Option<crate::telemetry::TelemetryGuard>,
1899 ),
1900 String,
1901> {
1902 let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1903 let msg = err.to_string();
1909 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1910 phase: "runtime_construction".to_string(),
1911 error: msg.clone(),
1912 }
1913 .emit_global();
1914 msg
1915 })?;
1916
1917 crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1922 let msg = err.to_string();
1923 crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1924 phase: "lease_loop".to_string(),
1925 error: msg.clone(),
1926 }
1927 .emit_global();
1928 msg
1929 })?;
1930
1931 if let Some(data_path) = db_options.data_path.as_deref() {
1935 let watch_dir = data_path.parent().unwrap_or(data_path);
1936 crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1937 }
1938
1939 {
1943 let config_path = crate::runtime::config_overlay::config_file_path();
1944 let store = runtime.db().store();
1945 crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1946 }
1947
1948 let merged = merge_telemetry_with_config(
1951 cli_telemetry
1952 .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1953 &runtime,
1954 );
1955 let telemetry_guard = crate::telemetry::init(merged);
1956
1957 let no_auth = no_auth_active(&db_options);
1958 let auth_store =
1959 if db_options.auth.vault_enabled {
1960 let pager =
1961 runtime.db().store().pager().cloned().ok_or_else(|| {
1962 "vault requires a paged database (persistent mode)".to_string()
1963 })?;
1964 let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1965 .map_err(|err| err.to_string())?;
1966 Arc::new(store)
1967 } else {
1968 Arc::new(AuthStore::new(db_options.auth.clone()))
1969 };
1970 auth_store.configure_control_events(
1971 runtime.control_event_ledger(),
1972 runtime.control_event_config(),
1973 );
1974 if no_auth {
1979 eprintln!("{NO_AUTH_WARNING}");
1980 tracing::warn!("{NO_AUTH_WARNING}");
1981 } else {
1982 apply_preset(&runtime, &auth_store)?;
1983 maybe_apply_policy_break_glass(&auth_store);
1984 }
1985
1986 {
1988 let store = Arc::clone(&auth_store);
1989 std::thread::Builder::new()
1990 .name("reddb-session-purge".into())
1991 .spawn(move || loop {
1992 std::thread::sleep(std::time::Duration::from_secs(300));
1993 store.purge_expired_sessions();
1994 })
1995 .ok();
1996 }
1997
1998 Ok((runtime, auth_store, telemetry_guard))
1999}
2000
2001fn maybe_apply_policy_break_glass(auth_store: &Arc<AuthStore>) {
2005 use crate::auth::self_lock_guard::BREAK_GLASS_ENV;
2006
2007 let enabled = std::env::var(BREAK_GLASS_ENV)
2008 .ok()
2009 .map(|v| {
2010 let trimmed = v.trim().to_ascii_lowercase();
2011 matches!(trimmed.as_str(), "1" | "true" | "yes")
2012 })
2013 .unwrap_or(false);
2014 if !enabled {
2015 return;
2016 }
2017 let now = crate::utils::now_unix_millis() as u128;
2018 match auth_store.apply_policy_break_glass(now) {
2019 Ok(()) => {
2020 tracing::warn!(env = BREAK_GLASS_ENV, "policy break-glass recovery applied");
2021 }
2022 Err(err) => {
2023 tracing::error!(env = BREAK_GLASS_ENV, %err, "policy break-glass recovery failed");
2024 }
2025 }
2026}
2027
/// Config key that `apply_preset` checks to detect a finished bootstrap;
/// `persist_bootstrap_state` stores `true` under it.
pub(crate) const BOOTSTRAP_COMPLETED_KEY: &str = "system.bootstrap.completed";
/// Config key under which `persist_bootstrap_state` records the preset name.
pub(crate) const BOOTSTRAP_PRESET_KEY: &str = "system.bootstrap.preset";
/// Config key under which `persist_bootstrap_state` records the first admin
/// id, when one was created.
pub(crate) const BOOTSTRAP_FIRST_ADMIN_KEY: &str = "system.bootstrap.first_admin_id";

/// Environment variable that selects the bootstrap preset.
pub(crate) const PRESET_ENV: &str = "REDDB_PRESET";
/// Preset name used when `REDDB_PRESET` is unset or blank.
pub(crate) const PRESET_SIMPLE: &str = "simple";
/// Preset name handled by `apply_production_preset`.
pub(crate) const PRESET_PRODUCTION: &str = "production";
/// Preset name handled by `apply_regulated_preset`.
pub(crate) const PRESET_REGULATED: &str = "regulated";

/// Id of the allow-all policy that `apply_production_preset` installs and
/// attaches to the first admin.
pub(crate) const FIRST_ADMIN_ALLOW_ALL_POLICY: &str = "system.bootstrap.first-admin-allow-all";
/// Id of the guardrail policy installed by `apply_regulated_preset`.
pub(crate) const REGULATED_PROTECT_MANAGED_POLICY: &str = "system.regulated.protect-managed";
/// Config namespace that the regulated guardrail policy denies writes to.
pub(crate) const REGULATED_AUDIT_CONFIG_NAMESPACE: &str = "red.config.audit";
/// Config namespace that the regulated guardrail policy denies writes to.
pub(crate) const REGULATED_EVIDENCE_CONFIG_NAMESPACE: &str = "red.config.evidence";
/// Config namespace that the regulated guardrail policy denies writes to.
pub(crate) const REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE: &str = "red.config.query_audit";
2050
2051pub(crate) fn apply_preset(
2060 runtime: &RedDBRuntime,
2061 auth_store: &Arc<AuthStore>,
2062) -> Result<(), String> {
2063 let store = runtime.db().store();
2064
2065 if store.get_config(BOOTSTRAP_COMPLETED_KEY).is_some() {
2066 crate::cli::bootstrap_manifest::rehydrate_manifest_registry(
2067 runtime,
2068 &runtime.config_registry(),
2069 )?;
2070 tracing::info!("bootstrap state present, skipping preset application");
2071 return Ok(());
2072 }
2073
2074 for var in ["REDDB_USERNAME", "REDDB_PASSWORD"] {
2078 crate::utils::expand_file_env(var).map_err(|err| format!("expand {var}_FILE: {err}"))?;
2079 }
2080
2081 let preset = std::env::var(PRESET_ENV)
2082 .ok()
2083 .map(|s| s.trim().to_string())
2084 .filter(|s| !s.is_empty())
2085 .unwrap_or_else(|| PRESET_SIMPLE.to_string());
2086
2087 if let Ok(path) = std::env::var(crate::cli::bootstrap_manifest::MANIFEST_ENV) {
2088 let path = path.trim();
2089 if !path.is_empty() {
2090 let first_admin_id = crate::cli::bootstrap_manifest::apply_manifest_file(
2091 runtime,
2092 auth_store,
2093 &runtime.config_registry(),
2094 std::path::Path::new(path),
2095 )?;
2096 persist_bootstrap_state(runtime, "manifest", Some(&first_admin_id));
2097 tracing::info!("bootstrap manifest applied");
2098 return Ok(());
2099 }
2100 }
2101
2102 let first_admin_id = match preset.as_str() {
2103 PRESET_SIMPLE => {
2104 None
2108 }
2109 PRESET_PRODUCTION => Some(apply_production_preset(auth_store)?),
2110 PRESET_REGULATED => {
2111 apply_regulated_preset(runtime, auth_store)?;
2112 None
2113 }
2114 other => {
2115 return Err(format!(
2116 "REDDB_PRESET={other:?} is not recognised (expected `simple`, `production`, or `regulated`)"
2117 ));
2118 }
2119 };
2120
2121 persist_bootstrap_state(runtime, &preset, first_admin_id.as_deref());
2122 tracing::info!(preset = %preset, "bootstrap preset applied");
2123 Ok(())
2124}
2125
2126fn apply_production_preset(auth_store: &Arc<AuthStore>) -> Result<String, String> {
2127 use crate::auth::store::PrincipalRef;
2128 use crate::auth::{policies::Policy, UserId};
2129
2130 let username = std::env::var("REDDB_USERNAME")
2131 .ok()
2132 .filter(|s| !s.is_empty())
2133 .ok_or_else(|| {
2134 "REDDB_PRESET=production requires REDDB_USERNAME (or REDDB_USERNAME_FILE)".to_string()
2135 })?;
2136 let password = std::env::var("REDDB_PASSWORD")
2137 .ok()
2138 .filter(|s| !s.is_empty())
2139 .ok_or_else(|| {
2140 "REDDB_PRESET=production requires REDDB_PASSWORD (or REDDB_PASSWORD_FILE)".to_string()
2141 })?;
2142
2143 let result = auth_store
2146 .bootstrap_system_admin(&username, &password)
2147 .map_err(|err| format!("bootstrap first admin: {err}"))?;
2148 let first_admin = UserId::platform(result.user.username.clone());
2149
2150 let policy = Policy::from_json_str(&format!(
2152 r#"{{
2153 "id": "{id}",
2154 "version": 1,
2155 "statements": [{{
2156 "effect": "allow",
2157 "actions": ["*"],
2158 "resources": ["*"]
2159 }}]
2160 }}"#,
2161 id = FIRST_ADMIN_ALLOW_ALL_POLICY
2162 ))
2163 .map_err(|err| format!("compile allow-all policy: {err}"))?;
2164 auth_store
2165 .put_policy(policy)
2166 .map_err(|err| format!("install allow-all policy: {err}"))?;
2167
2168 auth_store
2170 .attach_policy(
2171 PrincipalRef::User(first_admin.clone()),
2172 FIRST_ADMIN_ALLOW_ALL_POLICY,
2173 )
2174 .map_err(|err| format!("attach allow-all policy: {err}"))?;
2175
2176 Ok(first_admin.to_string())
2177}
2178
2179fn apply_regulated_preset(
2180 runtime: &RedDBRuntime,
2181 auth_store: &Arc<AuthStore>,
2182) -> Result<(), String> {
2183 use crate::auth::policies::Policy;
2184 use crate::auth::registry::EvidenceRequirement;
2185
2186 runtime.query_audit().enable_infrastructure();
2187
2188 let policy = Policy::from_json_str(&format!(
2189 r#"{{
2190 "id": "{id}",
2191 "version": 1,
2192 "statements": [
2193 {{
2194 "effect": "deny",
2195 "actions": ["policy:put", "policy:drop", "policy:attach", "policy:detach"],
2196 "resources": ["policy:{id}"]
2197 }},
2198 {{
2199 "effect": "deny",
2200 "actions": ["config:write"],
2201 "resources": [
2202 "config:{audit}.*",
2203 "config:{evidence}.*",
2204 "config:{query_audit}.*"
2205 ]
2206 }}
2207 ]
2208 }}"#,
2209 id = REGULATED_PROTECT_MANAGED_POLICY,
2210 audit = REGULATED_AUDIT_CONFIG_NAMESPACE,
2211 evidence = REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2212 query_audit = REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2213 ))
2214 .map_err(|err| format!("compile regulated guardrail policy: {err}"))?;
2215 auth_store
2216 .put_policy(policy)
2217 .map_err(|err| format!("install regulated guardrail policy: {err}"))?;
2218
2219 let now_ms = crate::utils::now_unix_millis() as u128;
2220 let entries = vec![
2221 regulated_registry_entry(
2222 REGULATED_PROTECT_MANAGED_POLICY,
2223 crate::auth::managed_policy::RESOURCE_TYPE_POLICY,
2224 "iam_policy",
2225 "policy:*",
2226 &format!("policy:{REGULATED_PROTECT_MANAGED_POLICY}"),
2227 EvidenceRequirement::Metadata,
2228 now_ms,
2229 ),
2230 regulated_registry_entry(
2231 REGULATED_AUDIT_CONFIG_NAMESPACE,
2232 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2233 "config_namespace",
2234 "config:write",
2235 &format!("config:{REGULATED_AUDIT_CONFIG_NAMESPACE}.*"),
2236 EvidenceRequirement::Metadata,
2237 now_ms,
2238 ),
2239 regulated_registry_entry(
2240 REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2241 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2242 "config_namespace",
2243 "config:write",
2244 &format!("config:{REGULATED_EVIDENCE_CONFIG_NAMESPACE}.*"),
2245 EvidenceRequirement::Metadata,
2246 now_ms,
2247 ),
2248 regulated_registry_entry(
2249 REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2250 crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2251 "config_namespace",
2252 "config:write",
2253 &format!("config:{REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE}.*"),
2254 EvidenceRequirement::Metadata,
2255 now_ms,
2256 ),
2257 ];
2258
2259 for entry in entries.iter().cloned() {
2260 runtime
2261 .config_registry()
2262 .restore_bootstrap_entry(entry)
2263 .map_err(|err| format!("install regulated registry entry: {err}"))?;
2264 }
2265 crate::cli::bootstrap_manifest::persist_registry_state(runtime, &entries)?;
2266 Ok(())
2267}
2268
2269fn regulated_registry_entry(
2270 id: &str,
2271 resource_type: &str,
2272 schema: &str,
2273 required_action: &str,
2274 required_resource: &str,
2275 evidence_requirement: crate::auth::registry::EvidenceRequirement,
2276 updated_at_ms: u128,
2277) -> crate::auth::registry::ConfigRegistryEntry {
2278 crate::auth::registry::ConfigRegistryEntry {
2279 id: id.to_string(),
2280 version: 1,
2281 resource_type: resource_type.to_string(),
2282 schema: schema.to_string(),
2283 mutability: crate::auth::registry::Mutability::Immutable,
2284 sensitivity: crate::auth::registry::Sensitivity::Internal,
2285 managed: true,
2286 required_action: required_action.to_string(),
2287 required_resource: required_resource.to_string(),
2288 evidence_requirement,
2289 updated_by: "system:regulated-preset".to_string(),
2290 updated_at_ms,
2291 }
2292}
2293
2294fn persist_bootstrap_state(runtime: &RedDBRuntime, preset: &str, first_admin_id: Option<&str>) {
2295 let store = runtime.db().store();
2296 let mut tree = crate::serde_json::Map::new();
2297 tree.insert(
2298 BOOTSTRAP_COMPLETED_KEY.to_string(),
2299 crate::serde_json::Value::Bool(true),
2300 );
2301 tree.insert(
2302 BOOTSTRAP_PRESET_KEY.to_string(),
2303 crate::serde_json::Value::String(preset.to_string()),
2304 );
2305 if let Some(id) = first_admin_id {
2306 tree.insert(
2307 BOOTSTRAP_FIRST_ADMIN_KEY.to_string(),
2308 crate::serde_json::Value::String(id.to_string()),
2309 );
2310 }
2311 let json = crate::serde_json::Value::Object(tree);
2312 store.set_config_tree("", &json);
2313}
2314
2315fn merge_telemetry_with_config(
2326 mut cli: crate::telemetry::TelemetryConfig,
2327 runtime: &RedDBRuntime,
2328) -> crate::telemetry::TelemetryConfig {
2329 use crate::storage::schema::Value;
2330
2331 let store = runtime.db().store();
2332
2333 if !cli.level_explicit {
2334 if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
2335 cli.level_filter = v.to_string();
2336 }
2337 }
2338 if !cli.format_explicit {
2339 if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
2340 if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
2341 cli.format = parsed;
2342 }
2343 }
2344 }
2345 if !cli.rotation_keep_days_explicit {
2346 match store.get_config("red.logging.keep_days") {
2347 Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
2348 cli.rotation_keep_days = n as u16
2349 }
2350 Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
2351 cli.rotation_keep_days = n as u16
2352 }
2353 Some(Value::Text(v)) => {
2354 if let Ok(n) = v.parse::<u16>() {
2355 cli.rotation_keep_days = n;
2356 }
2357 }
2358 _ => {}
2359 }
2360 }
2361 if !cli.file_prefix_explicit {
2362 if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
2363 if !v.is_empty() {
2364 cli.file_prefix = v.to_string();
2365 }
2366 }
2367 }
2368 if !cli.log_dir_explicit && !cli.log_file_disabled {
2371 if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
2372 if !v.is_empty() {
2373 cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
2374 }
2375 }
2376 }
2377
2378 cli
2379}
2380
// Tests for the precedence rules of `merge_telemetry_with_config`: explicit
// CLI values win, stored config fills the gaps.
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};

    // Builds an in-memory runtime for each test.
    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
    }

    // Stores `value` as a JSON string under `key` in the runtime's config.
    fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
        runtime
            .db()
            .store()
            .set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
    }

    // Baseline CLI config: a log directory and JSON format, with every other
    // field left at its default.
    fn cli_base() -> TelemetryConfig {
        TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    // The stored directory replaces the baseline one when the CLI directory
    // is not marked explicit.
    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/var/log/reddb"))
        );
    }

    // An explicit CLI directory is kept even though config holds another one.
    #[test]
    fn explicit_log_dir_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let mut cli = cli_base();
        cli.log_dir = Some(std::path::PathBuf::from("/custom/dir"));
        cli.log_dir_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/custom/dir"))
        );
    }

    // With file logging disabled, the stored directory must not be applied.
    #[test]
    fn no_log_file_beats_config_log_dir() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let mut cli = cli_base();
        cli.log_dir = None;
        cli.log_file_disabled = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert!(
            merged.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    // The stored format replaces the baseline JSON format when the CLI format
    // is not marked explicit.
    #[test]
    fn config_format_promoted_on_non_tty_default() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.format", "pretty");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.format, LogFormat::Pretty);
    }

    // An explicit CLI format is kept even though config holds another one.
    #[test]
    fn explicit_format_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.format", "pretty");
        let mut cli = cli_base();
        cli.format = LogFormat::Json;
        cli.format_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.format, LogFormat::Json);
    }
}
2468
/// Builds the HTTP server for `bind_addr` with a default (empty)
/// `TransportReadiness`.
///
/// Convenience wrapper around `build_http_server_with_transport_readiness`
/// for callers that do not track per-transport readiness.
#[inline(never)]
fn build_http_server(
    runtime: RedDBRuntime,
    auth_store: Arc<AuthStore>,
    bind_addr: String,
) -> RedDBServer {
    build_http_server_with_transport_readiness(
        runtime,
        auth_store,
        bind_addr,
        TransportReadiness::default(),
    )
}
2482
2483fn apply_http_limits(
2489 server: RedDBServer,
2490 config: &ServerCommandConfig,
2491 runtime: &RedDBRuntime,
2492) -> RedDBServer {
2493 let store = runtime.db().store();
2494 let resolved =
2495 crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2496 .get_config(key)
2497 {
2498 Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2499 Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2500 Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2501 _ => None,
2502 });
2503 tracing::info!(
2504 target: "reddb::http_limits",
2505 max_handlers = resolved.max_handlers,
2506 handler_timeout_ms = resolved.handler_timeout_ms,
2507 retry_after_secs = resolved.retry_after_secs,
2508 max_inflight_per_principal = resolved.max_inflight_per_principal,
2509 "http_limits resolved"
2510 );
2511 server.with_http_limits(resolved)
2512}
2513
2514fn apply_ui_bundle(
2524 server: RedDBServer,
2525 config: &ServerCommandConfig,
2526) -> Result<RedDBServer, String> {
2527 if !config.ui {
2528 return Ok(server);
2529 }
2530 let ui_dir = match &config.ui_dir {
2531 Some(dir) => dir.clone(),
2532 None => {
2533 let cache_root = crate::server::ui_bundle_resolver::reddb_user_cache_root()
2534 .unwrap_or_else(|_| std::env::temp_dir().join("reddb"));
2535 crate::server::ui_bundle_resolver::resolve_ui_bundle(
2536 &cache_root,
2537 &crate::server::ui_bundle_resolver::HttpFetcher,
2538 )
2539 .map_err(|err| format!("resolve red-ui bundle for --ui: {err}"))?
2540 }
2541 };
2542 tracing::info!(target: "reddb::ui", dir = %ui_dir.display(), "serving red-ui bundle on HTTP surface");
2543 Ok(server.with_ui_dir(ui_dir))
2544}
2545
2546#[inline(never)]
2547fn build_http_server_with_transport_readiness(
2548 runtime: RedDBRuntime,
2549 auth_store: Arc<AuthStore>,
2550 bind_addr: String,
2551 transport_readiness: TransportReadiness,
2552) -> RedDBServer {
2553 RedDBServer::with_options(
2554 runtime,
2555 ServerOptions {
2556 bind_addr,
2557 transport_readiness,
2558 ..ServerOptions::default()
2559 },
2560 )
2561 .with_auth(auth_store)
2562}
2563
2564#[inline(never)]
2568fn build_admin_only_server(
2569 runtime: RedDBRuntime,
2570 auth_store: Arc<AuthStore>,
2571 bind_addr: String,
2572) -> RedDBServer {
2573 RedDBServer::with_options(
2574 runtime,
2575 ServerOptions {
2576 bind_addr,
2577 surface: crate::server::ServerSurface::AdminOnly,
2578 ..ServerOptions::default()
2579 },
2580 )
2581 .with_auth(auth_store)
2582}
2583
2584#[inline(never)]
2588fn build_metrics_only_server(
2589 runtime: RedDBRuntime,
2590 auth_store: Arc<AuthStore>,
2591 bind_addr: String,
2592) -> RedDBServer {
2593 RedDBServer::with_options(
2594 runtime,
2595 ServerOptions {
2596 bind_addr,
2597 surface: crate::server::ServerSurface::MetricsOnly,
2598 ..ServerOptions::default()
2599 },
2600 )
2601 .with_auth(auth_store)
2602}
2603
2604fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2608 if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2609 let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2610 let _ = server.serve_in_background();
2611 tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2612 }
2613 if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2614 let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2615 let _ = server.serve_in_background();
2616 tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2617 }
2618}
2619
2620#[inline(never)]
2621fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2622 let cli_telemetry = config.telemetry.clone();
2623 let mut transport_readiness = TransportReadiness::default();
2624 let Some(listener) = bind_listener_for_startup(
2625 &mut transport_readiness,
2626 "http",
2627 &bind_addr,
2628 config.http_bind_explicit,
2629 )?
2630 else {
2631 return Err(format!(
2632 "no HTTP listener started; implicit bind {} failed",
2633 bind_addr
2634 ));
2635 };
2636 let db_options = config.to_db_options()?;
2637 let (runtime, auth_store, _telemetry_guard) =
2638 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2639 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2640 spawn_admin_metrics_listeners(&runtime, &auth_store);
2641 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2642 let server = build_http_server_with_transport_readiness(
2643 runtime.clone(),
2644 auth_store,
2645 bind_addr.clone(),
2646 transport_readiness,
2647 );
2648 let server = apply_http_limits(server, &config, &runtime);
2649 let server = apply_ui_bundle(server, &config)?;
2650 tracing::info!(transport = "http", bind = %bind_addr, "listener online");
2651 server.serve_on(listener).map_err(|err| err.to_string())
2652}
2653
2654fn spawn_http_tls_listener(
2660 config: &ServerCommandConfig,
2661 runtime: &RedDBRuntime,
2662 auth_store: &Arc<AuthStore>,
2663) -> Result<(), String> {
2664 let Some(addr) = config.http_tls_bind_addr.clone() else {
2665 return Ok(());
2666 };
2667
2668 let tls_config = resolve_http_tls_config(config)?;
2669 let server_config = crate::server::tls::build_server_config(&tls_config)
2670 .map_err(|err| format!("HTTP TLS: {err}"))?;
2671
2672 let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
2673 let server = apply_http_limits(server, config, runtime);
2674 let _handle = server.serve_tls_in_background(server_config);
2675 tracing::info!(
2676 transport = "https",
2677 bind = %addr,
2678 mtls = %tls_config.client_ca_path.is_some(),
2679 "TLS listener online"
2680 );
2681 Ok(())
2682}
2683
2684fn resolve_http_tls_config(
2686 config: &ServerCommandConfig,
2687) -> Result<crate::server::tls::HttpTlsConfig, String> {
2688 match (&config.http_tls_cert, &config.http_tls_key) {
2689 (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
2690 cert_path: cert.clone(),
2691 key_path: key.clone(),
2692 client_ca_path: config.http_tls_client_ca.clone(),
2693 }),
2694 (None, None) => {
2695 let dir = config
2697 .path
2698 .as_ref()
2699 .and_then(|p| p.parent().map(std::path::PathBuf::from))
2700 .unwrap_or_else(|| std::path::PathBuf::from("."));
2701 let auto = crate::server::tls::auto_generate_dev_cert(&dir)
2702 .map_err(|err| format!("HTTP TLS dev: {err}"))?;
2703 Ok(crate::server::tls::HttpTlsConfig {
2704 cert_path: auto.cert_path,
2705 key_path: auto.key_path,
2706 client_ca_path: config.http_tls_client_ca.clone(),
2707 })
2708 }
2709 _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
2710 }
2711}
2712
2713#[inline(never)]
2714fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2715 let workers = config.workers;
2716 let cli_telemetry = config.telemetry.clone();
2717 let db_options = config.to_db_options()?;
2718 let rt_config = detect_runtime_config();
2719 let mut transport_readiness = TransportReadiness::default();
2720 let Some(grpc_listener) = bind_listener_for_startup(
2721 &mut transport_readiness,
2722 "grpc",
2723 &bind_addr,
2724 config.grpc_bind_explicit,
2725 )?
2726 else {
2727 return Err(format!(
2728 "no gRPC listener started; implicit bind {} failed",
2729 bind_addr
2730 ));
2731 };
2732
2733 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2734
2735 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2736 .enable_all()
2737 .worker_threads(worker_threads)
2738 .thread_stack_size(rt_config.stack_size)
2739 .build()
2740 .map_err(|err| format!("tokio runtime: {err}"))?;
2741
2742 let (runtime, auth_store, _telemetry_guard) =
2744 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2745 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2746 let signal_runtime = runtime.clone();
2747 tokio_runtime.block_on(async move {
2748 spawn_lifecycle_signal_handler(signal_runtime).await;
2749 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2751
2752 spawn_pg_listener(&config, &runtime);
2754
2755 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2759
2760 let server = RedDBGrpcServer::with_options(
2761 runtime,
2762 GrpcServerOptions {
2763 bind_addr: bind_addr.clone(),
2764 tls: None,
2765 },
2766 auth_store,
2767 );
2768
2769 tracing::info!(
2770 transport = "grpc",
2771 bind = %bind_addr,
2772 cpus = rt_config.available_cpus,
2773 workers = worker_threads,
2774 "listener online"
2775 );
2776 server
2777 .serve_on(grpc_listener)
2778 .await
2779 .map_err(|err| err.to_string())
2780 })
2781}
2782
2783#[inline(never)]
2784fn run_dual_server(
2785 config: ServerCommandConfig,
2786 grpc_bind_addr: String,
2787 http_bind_addr: String,
2788) -> Result<(), String> {
2789 let workers = config.workers;
2790 let cli_telemetry = config.telemetry.clone();
2791 let db_options = config.to_db_options()?;
2792 let rt_config = detect_runtime_config();
2793 let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2794 let mut transport_readiness = TransportReadiness::default();
2795 let http_listener = bind_listener_for_startup(
2796 &mut transport_readiness,
2797 "http",
2798 &http_bind_addr,
2799 config.http_bind_explicit,
2800 )?;
2801 let grpc_listener = bind_listener_for_startup(
2802 &mut transport_readiness,
2803 "grpc",
2804 &grpc_bind_addr,
2805 config.grpc_bind_explicit,
2806 )?;
2807 if http_listener.is_none() && grpc_listener.is_none() {
2808 return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2809 }
2810 let (runtime, auth_store, _telemetry_guard) =
2811 build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2812 let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2813
2814 spawn_admin_metrics_listeners(&runtime, &auth_store);
2815 spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2816
2817 let http_handle = if let Some(listener) = http_listener {
2818 let http_server = build_http_server_with_transport_readiness(
2819 runtime.clone(),
2820 auth_store.clone(),
2821 http_bind_addr.clone(),
2822 transport_readiness.clone(),
2823 );
2824 let http_server = apply_http_limits(http_server, &config, &runtime);
2825 let http_server = apply_ui_bundle(http_server, &config)?;
2826 Some(http_server.serve_in_background_on(listener))
2827 } else {
2828 None
2829 };
2830
2831 thread::sleep(Duration::from_millis(150));
2832 if let Some(handle) = http_handle.as_ref() {
2833 if handle.is_finished() {
2834 let handle = http_handle.unwrap();
2835 return match handle.join() {
2836 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2837 Ok(Err(err)) => Err(err.to_string()),
2838 Err(_) => Err("HTTP server thread panicked".to_string()),
2839 };
2840 }
2841 }
2842 if grpc_listener.is_none() {
2843 let Some(handle) = http_handle else {
2844 return Err("no listener started".to_string());
2845 };
2846 return match handle.join() {
2847 Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2848 Ok(Err(err)) => Err(err.to_string()),
2849 Err(_) => Err("HTTP server thread panicked".to_string()),
2850 };
2851 }
2852 let grpc_listener = grpc_listener.expect("checked above");
2853
2854 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2855 .enable_all()
2856 .worker_threads(worker_threads)
2857 .thread_stack_size(rt_config.stack_size)
2858 .build()
2859 .map_err(|err| format!("tokio runtime: {err}"))?;
2860
2861 let signal_runtime = runtime.clone();
2862 tokio_runtime.block_on(async move {
2863 spawn_lifecycle_signal_handler(signal_runtime).await;
2864 spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2866
2867 spawn_pg_listener(&config, &runtime);
2869
2870 spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2872
2873 let server = RedDBGrpcServer::with_options(
2874 runtime,
2875 GrpcServerOptions {
2876 bind_addr: grpc_bind_addr.clone(),
2877 tls: None,
2878 },
2879 auth_store,
2880 );
2881
2882 tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2883 tracing::info!(
2884 transport = "grpc",
2885 bind = %grpc_bind_addr,
2886 cpus = rt_config.available_cpus,
2887 workers = worker_threads,
2888 "listener online"
2889 );
2890 server
2891 .serve_on(grpc_listener)
2892 .await
2893 .map_err(|err| err.to_string())
2894 })
2895}
2896
2897#[cfg(test)]
2898mod tests {
2899 use super::*;
2900
2901 #[test]
2902 fn render_systemd_unit_contains_expected_execstart() {
2903 let config = SystemdServiceConfig {
2904 service_name: "reddb".to_string(),
2905 binary_path: PathBuf::from("/usr/local/bin/red"),
2906 run_user: "reddb".to_string(),
2907 run_group: "reddb".to_string(),
2908 data_path: reddb_file::default_service_database_path(),
2909 router_bind_addr: None,
2910 grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2911 http_bind_addr: None,
2912 };
2913
2914 let unit = render_systemd_unit(&config);
2915 assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
2916 assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
2917 }
2918
2919 #[test]
2920 fn systemd_service_config_derives_paths() {
2921 let config = SystemdServiceConfig {
2922 service_name: "reddb-api".to_string(),
2923 binary_path: PathBuf::from("/usr/local/bin/red"),
2924 run_user: "reddb".to_string(),
2925 run_group: "reddb".to_string(),
2926 data_path: PathBuf::from("/srv/reddb/live/data.rdb"),
2927 router_bind_addr: None,
2928 grpc_bind_addr: None,
2929 http_bind_addr: Some("127.0.0.1:5055".to_string()),
2930 };
2931
2932 assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
2933 assert_eq!(
2934 config.unit_path(),
2935 PathBuf::from("/etc/systemd/system/reddb-api.service")
2936 );
2937 }
2938
2939 #[test]
2940 fn render_systemd_unit_supports_dual_transport() {
2941 let config = SystemdServiceConfig {
2942 service_name: "reddb".to_string(),
2943 binary_path: PathBuf::from("/usr/local/bin/red"),
2944 run_user: "reddb".to_string(),
2945 run_group: "reddb".to_string(),
2946 data_path: reddb_file::default_service_database_path(),
2947 router_bind_addr: None,
2948 grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2949 http_bind_addr: Some("0.0.0.0:5055".to_string()),
2950 };
2951
2952 let unit = render_systemd_unit(&config);
2953 assert!(unit.contains("--grpc-bind 0.0.0.0:5555"));
2954 assert!(unit.contains("--http-bind 0.0.0.0:5055"));
2955 }
2956
2957 #[test]
2958 fn render_systemd_unit_supports_router_mode() {
2959 let config = SystemdServiceConfig {
2960 service_name: "reddb".to_string(),
2961 binary_path: PathBuf::from("/usr/local/bin/red"),
2962 run_user: "reddb".to_string(),
2963 run_group: "reddb".to_string(),
2964 data_path: reddb_file::default_service_database_path(),
2965 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
2966 grpc_bind_addr: None,
2967 http_bind_addr: None,
2968 };
2969
2970 let unit = render_systemd_unit(&config);
2971 assert!(unit.contains("--bind 127.0.0.1:5050"));
2972 assert!(!unit.contains("--grpc-bind"));
2973 assert!(!unit.contains("--http-bind"));
2974 }
2975
2976 #[test]
2977 fn explicit_bind_collision_is_fatal() {
2978 let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
2979 let addr = held.local_addr().expect("held addr").to_string();
2980 let mut readiness = TransportReadiness::default();
2981
2982 let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();
2983
2984 assert!(error.contains("explicit http listener bind"));
2985 assert_eq!(readiness.active.len(), 0);
2986 assert_eq!(readiness.failed.len(), 1);
2987 assert!(readiness.failed[0].explicit);
2988 assert_eq!(readiness.failed[0].bind_addr, addr);
2989 }
2990
2991 fn no_auth_env_lock() -> &'static std::sync::Mutex<()> {
2998 static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
2999 LOCK.get_or_init(|| std::sync::Mutex::new(()))
3000 }
3001
3002 fn no_auth_test_config(no_auth: bool) -> ServerCommandConfig {
3003 ServerCommandConfig {
3004 path: None,
3005 router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
3006 router_bind_explicit: false,
3007 grpc_bind_addr: None,
3008 grpc_bind_explicit: false,
3009 grpc_tls_bind_addr: None,
3010 grpc_tls_cert: None,
3011 grpc_tls_key: None,
3012 grpc_tls_client_ca: None,
3013 http_bind_addr: None,
3014 http_bind_explicit: false,
3015 http_tls_bind_addr: None,
3016 http_tls_cert: None,
3017 http_tls_key: None,
3018 http_tls_client_ca: None,
3019 wire_bind_addr: None,
3020 wire_bind_explicit: false,
3021 wire_tls_bind_addr: None,
3022 wire_tls_cert: None,
3023 wire_tls_key: None,
3024 pg_bind_addr: None,
3025 create_if_missing: true,
3026 read_only: false,
3027 role: "standalone".to_string(),
3028 primary_addr: None,
3029 storage_profile: StorageProfileSelection::embedded_single_file(),
3030 vault: true,
3033 no_auth,
3034 workers: None,
3035 telemetry: None,
3036 http_limits_cli: crate::server::HttpLimitsCliInput::default(),
3037 ui: false,
3038 ui_dir: None,
3039 }
3040 }
3041
3042 #[test]
3043 fn no_auth_flag_disables_every_auth_knob_and_stamps_metadata() {
3044 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3045 unsafe {
3050 std::env::set_var("REDDB_USERNAME", "admin");
3051 std::env::set_var("REDDB_PASSWORD", "hunter2");
3052 }
3053 let config = no_auth_test_config(true);
3054 let options = config.to_db_options().expect("to_db_options");
3055
3056 assert!(no_auth_active(&options), "metadata should be stamped");
3057 assert!(!options.auth.enabled, "auth.enabled must be forced off");
3058 assert!(
3059 !options.auth.require_auth,
3060 "require_auth must be forced off"
3061 );
3062 assert!(
3063 !options.auth.vault_enabled,
3064 "vault_enabled must be forced off (overrides --vault)"
3065 );
3066 assert_eq!(
3067 options.metadata.get(NO_AUTH_META).map(String::as_str),
3068 Some("true"),
3069 );
3070
3071 unsafe {
3073 std::env::remove_var("REDDB_USERNAME");
3074 std::env::remove_var("REDDB_PASSWORD");
3075 }
3076 }
3077
3078 #[test]
3079 fn default_behaviour_without_no_auth_flag_is_unchanged() {
3080 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3081 let config = no_auth_test_config(false);
3082 let options = config.to_db_options().expect("to_db_options");
3083
3084 assert!(
3085 !no_auth_active(&options),
3086 "default boot must not be marked no-auth"
3087 );
3088 assert!(
3089 options.metadata.get(NO_AUTH_META).is_none(),
3090 "metadata key must be absent when flag is off"
3091 );
3092 assert!(options.auth.vault_enabled);
3094 }
3095
3096 #[test]
3097 fn no_auth_active_blocks_bootstrap_from_env() {
3098 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3099 unsafe {
3104 std::env::set_var("REDDB_USERNAME", "admin");
3105 std::env::set_var("REDDB_PASSWORD", "hunter2");
3106 }
3107
3108 let options = no_auth_test_config(true)
3109 .to_db_options()
3110 .expect("to_db_options");
3111
3112 let auth_store = AuthStore::new(options.auth.clone());
3116 if !no_auth_active(&options) {
3117 auth_store.bootstrap_from_env();
3118 }
3119
3120 assert!(
3121 auth_store.needs_bootstrap(),
3122 "no admin user must be bootstrapped under --no-auth even with REDDB_USERNAME/PASSWORD set"
3123 );
3124
3125 unsafe {
3127 std::env::remove_var("REDDB_USERNAME");
3128 std::env::remove_var("REDDB_PASSWORD");
3129 }
3130 }
3131
3132 fn clear_preset_env() {
3139 unsafe {
3141 std::env::remove_var(PRESET_ENV);
3142 std::env::remove_var("REDDB_BOOTSTRAP_MANIFEST");
3143 std::env::remove_var("REDDB_USERNAME");
3144 std::env::remove_var("REDDB_PASSWORD");
3145 std::env::remove_var("REDDB_USERNAME_FILE");
3146 std::env::remove_var("REDDB_PASSWORD_FILE");
3147 }
3148 }
3149
3150 fn clear_backup_env() {
3151 unsafe {
3153 std::env::remove_var("REDDB_BACKUP_S3_ENDPOINT");
3154 std::env::remove_var("REDDB_BACKUP_S3_BUCKET");
3155 std::env::remove_var("REDDB_BACKUP_S3_PREFIX");
3156 std::env::remove_var("REDDB_BACKUP_S3_ACCESS_KEY_ID");
3157 std::env::remove_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY");
3158 std::env::remove_var("REDDB_BACKUP_S3_REGION");
3159 std::env::remove_var("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS");
3160 std::env::remove_var("REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS");
3161 std::env::remove_var("REDDB_BACKUP_PAUSE_ON_LAG_SECS");
3162 }
3163 }
3164
3165 fn fresh_runtime_and_store() -> (RedDBRuntime, Arc<AuthStore>) {
3166 let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
3167 let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3168 (runtime, auth_store)
3169 }
3170
3171 #[test]
3172 fn simple_preset_is_default_and_persists_state() {
3173 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3174 clear_preset_env();
3175
3176 let (runtime, auth_store) = fresh_runtime_and_store();
3177 apply_preset(&runtime, &auth_store).expect("simple preset applies cleanly");
3178
3179 assert!(
3181 auth_store.needs_bootstrap(),
3182 "simple preset must not create an admin"
3183 );
3184
3185 let store = runtime.db().store();
3187 let completed = store
3188 .get_config(BOOTSTRAP_COMPLETED_KEY)
3189 .expect("completed key persisted");
3190 assert!(matches!(
3191 completed,
3192 crate::storage::schema::Value::Boolean(true)
3193 ));
3194 let preset = store
3195 .get_config(BOOTSTRAP_PRESET_KEY)
3196 .expect("preset key persisted");
3197 match preset {
3198 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_SIMPLE),
3199 other => panic!("expected Text(simple), got {other:?}"),
3200 }
3201 assert!(
3202 store.get_config(BOOTSTRAP_FIRST_ADMIN_KEY).is_none(),
3203 "simple preset must not record a first admin"
3204 );
3205
3206 clear_preset_env();
3207 }
3208
3209 #[test]
3210 fn production_preset_creates_first_admin_with_allow_all_policy() {
3211 use crate::auth::policies::{EvalContext, ResourceRef};
3212 use crate::auth::UserId;
3213
3214 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3215 clear_preset_env();
3216 unsafe {
3218 std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3219 std::env::set_var("REDDB_USERNAME", "ops");
3220 std::env::set_var("REDDB_PASSWORD", "hunter2");
3221 }
3222
3223 let (runtime, auth_store) = fresh_runtime_and_store();
3224 apply_preset(&runtime, &auth_store).expect("production preset applies cleanly");
3225
3226 assert!(
3228 !auth_store.needs_bootstrap(),
3229 "production preset must seal bootstrap"
3230 );
3231 let users = auth_store.list_users();
3232 assert_eq!(users.len(), 1);
3233 let admin = &users[0];
3234 assert_eq!(admin.username, "ops");
3235 assert!(
3236 admin.system_owned,
3237 "first admin must be system-owned to pass the managed-config gate"
3238 );
3239 assert!(
3240 admin.tenant_id.is_none(),
3241 "first admin must be platform-scoped (tenant=None)"
3242 );
3243
3244 let policy = auth_store
3246 .get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY)
3247 .expect("allow-all policy installed");
3248 assert!(!policy.statements.is_empty());
3249
3250 let actor = UserId::platform("ops");
3253 let ctx = EvalContext {
3254 principal_tenant: None,
3255 current_tenant: None,
3256 peer_ip: None,
3257 mfa_present: false,
3258 now_ms: 1_700_000_000_000,
3259 principal_is_admin_role: true,
3260 principal_is_system_owned: true,
3261 principal_is_platform_scoped: true,
3262 };
3263 let arbitrary_resource = ResourceRef::new("config", "red.config.audit.enabled");
3264 assert!(
3265 auth_store.check_policy_authz(&actor, "config:write", &arbitrary_resource, &ctx),
3266 "allow-all policy must grant arbitrary actions via the evaluator"
3267 );
3268
3269 let store = runtime.db().store();
3271 match store
3272 .get_config(BOOTSTRAP_FIRST_ADMIN_KEY)
3273 .expect("first_admin_id persisted")
3274 {
3275 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), "ops"),
3276 other => panic!("expected Text(ops), got {other:?}"),
3277 }
3278 match store.get_config(BOOTSTRAP_PRESET_KEY).unwrap() {
3279 crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_PRODUCTION),
3280 other => panic!("expected Text(production), got {other:?}"),
3281 }
3282
3283 clear_preset_env();
3284 }
3285
3286 #[test]
3287 fn regulated_preset_enables_query_audit_infrastructure_without_rules() {
3288 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3289 clear_preset_env();
3290 unsafe {
3292 std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3293 }
3294
3295 let (runtime, auth_store) = fresh_runtime_and_store();
3296 apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3297
3298 assert!(runtime.query_audit().is_enabled());
3299 assert!(runtime.query_audit().rules().is_empty());
3300 assert!(
3301 runtime
3302 .db()
3303 .store()
3304 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3305 .is_some(),
3306 "regulated preset should create the query-audit stream"
3307 );
3308
3309 runtime
3310 .execute_query("CREATE TABLE docs (id INT)")
3311 .expect("create table");
3312 runtime
3313 .execute_query("INSERT INTO docs (id) VALUES (1)")
3314 .expect("insert");
3315 runtime.execute_query("SELECT * FROM docs").expect("select");
3316 let rows = runtime
3317 .db()
3318 .store()
3319 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3320 .expect("query audit collection")
3321 .query_all(|_| true);
3322 assert!(
3323 rows.is_empty(),
3324 "regulated preset must not globally audit every query"
3325 );
3326
3327 clear_preset_env();
3328 }
3329
3330 #[test]
3331 fn managed_backup_env_rejects_primary_replica_single_file_storage() {
3332 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3333 clear_backup_env();
3334 unsafe {
3336 std::env::set_var("REDDB_BACKUP_S3_ENDPOINT", "https://s3.example.test");
3337 std::env::set_var("REDDB_BACKUP_S3_BUCKET", "reddb");
3338 std::env::set_var("REDDB_BACKUP_S3_PREFIX", "clusters/prod");
3339 std::env::set_var("REDDB_BACKUP_S3_ACCESS_KEY_ID", "AK");
3340 std::env::set_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY", "SK");
3341 }
3342
3343 let mut config = no_auth_test_config(false);
3344 config.role = "primary".to_string();
3345 config.storage_profile = crate::storage::StorageDeployPreset::PrimaryReplicaDev.selection();
3346
3347 let err = config.to_db_options().unwrap_err();
3348 assert!(err.contains("managed backup"), "got: {err}");
3349 assert!(err.contains("operational-directory"), "got: {err}");
3350
3351 clear_backup_env();
3352 }
3353
3354 #[test]
3355 fn regulated_preset_installs_managed_evidence_guardrails_end_to_end() {
3356 use crate::auth::policies::{EvalContext, Policy, ResourceRef};
3357 use crate::auth::store::PrincipalRef;
3358 use crate::auth::{Role, UserId};
3359 use crate::runtime::mvcc::{clear_current_auth_identity, set_current_auth_identity};
3360 use crate::storage::schema::Value;
3361
3362 let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3363 clear_preset_env();
3364 unsafe {
3366 std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3367 }
3368
3369 let options = no_auth_test_config(false)
3370 .to_db_options()
3371 .expect("regulated options");
3372 assert!(
3373 options.control_events.compliance_mode,
3374 "regulated preset must enable fail-closed control evidence before runtime boot"
3375 );
3376 assert!(
3377 options.query_audit.enabled && options.query_audit.rules.is_empty(),
3378 "regulated preset must enable query-audit infrastructure without global rules"
3379 );
3380
3381 let runtime = RedDBRuntime::with_options(options).expect("runtime");
3382 let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3383 apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3384 runtime.set_auth_store(Arc::clone(&auth_store));
3385
3386 assert!(runtime.control_events_require_persistence());
3387 assert!(runtime.query_audit().is_enabled());
3388 assert!(runtime.query_audit().rules().is_empty());
3389 assert!(auth_store
3390 .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
3391 .is_some());
3392
3393 let managed_policy = runtime
3394 .config_registry()
3395 .get_active(REGULATED_PROTECT_MANAGED_POLICY)
3396 .expect("regulated managed policy registry entry");
3397 assert!(managed_policy.managed);
3398 assert_eq!(managed_policy.resource_type, "policy");
3399 assert!(
3400 runtime
3401 .config_registry()
3402 .get_active(REGULATED_AUDIT_CONFIG_NAMESPACE)
3403 .expect("regulated audit config namespace")
3404 .managed
3405 );
3406
3407 let registry_rows = runtime
3408 .execute_query(&format!(
3409 "SELECT id, managed FROM red.registry WHERE id = '{}'",
3410 REGULATED_PROTECT_MANAGED_POLICY
3411 ))
3412 .expect("red.registry query");
3413 assert_eq!(registry_rows.result.records.len(), 1);
3414 assert_eq!(
3415 registry_rows.result.records[0].get("managed"),
3416 Some(&Value::Boolean(true))
3417 );
3418
3419 let managed_policy_rows = runtime
3420 .execute_query(&format!(
3421 "SELECT policy_id FROM red.managed_policies WHERE policy_id = '{}'",
3422 REGULATED_PROTECT_MANAGED_POLICY
3423 ))
3424 .expect("red.managed_policies query");
3425 assert_eq!(managed_policy_rows.result.records.len(), 1);
3426
3427 let capability_rows = runtime
3428 .execute_query(
3429 "SELECT action FROM red.control_capabilities WHERE action = 'evidence:export'",
3430 )
3431 .expect("red.control_capabilities query");
3432 assert_eq!(capability_rows.result.records.len(), 1);
3433
3434 auth_store
3435 .create_user("alice", "p", Role::Admin)
3436 .expect("create ordinary admin");
3437 let allow_all = Policy::from_json_str(
3438 r#"{
3439 "id": "alice-allow-all",
3440 "version": 1,
3441 "statements": [{
3442 "effect": "allow",
3443 "actions": ["*"],
3444 "resources": ["*"]
3445 }]
3446 }"#,
3447 )
3448 .expect("allow-all policy");
3449 auth_store.put_policy(allow_all).expect("install allow-all");
3450 auth_store
3451 .attach_policy(
3452 PrincipalRef::User(UserId::platform("alice")),
3453 "alice-allow-all",
3454 )
3455 .expect("attach allow-all");
3456 let ctx = EvalContext {
3457 principal_tenant: None,
3458 current_tenant: None,
3459 peer_ip: None,
3460 mfa_present: false,
3461 now_ms: 1_700_000_000_000,
3462 principal_is_admin_role: true,
3463 principal_is_system_owned: false,
3464 principal_is_platform_scoped: true,
3465 };
3466 assert!(
3467 auth_store.check_policy_authz(
3468 &UserId::platform("alice"),
3469 "policy:drop",
3470 &ResourceRef::new("policy", REGULATED_PROTECT_MANAGED_POLICY),
3471 &ctx,
3472 ),
3473 "ordinary allow-all policy should be broad enough that only the managed guardrail blocks"
3474 );
3475
3476 set_current_auth_identity("alice".to_string(), Role::Admin);
3477 let denied = runtime.execute_query(&format!(
3478 "DROP POLICY '{}'",
3479 REGULATED_PROTECT_MANAGED_POLICY
3480 ));
3481 clear_current_auth_identity();
3482 let err = denied.expect_err("managed policy guardrail must deny ordinary admin");
3483 assert!(
3484 err.to_string().contains("managed policy"),
3485 "error should name the managed guardrail: {err}"
3486 );
3487 assert!(
3488 auth_store
3489 .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
3490 .is_some(),
3491 "denied mutation must leave managed policy installed"
3492 );
3493
3494 let denied_events = runtime
3495 .execute_query(&format!(
3496 "SELECT action, resource, outcome FROM red.control_events \
3497 WHERE action = 'policy:drop' AND resource = 'policy:{}'",
3498 REGULATED_PROTECT_MANAGED_POLICY
3499 ))
3500 .expect("red.control_events denied policy drop");
3501 assert_eq!(denied_events.result.records.len(), 1);
3502 assert_eq!(
3503 denied_events.result.records[0].get("outcome"),
3504 Some(&Value::text("denied"))
3505 );
3506
3507 set_current_auth_identity("alice".to_string(), Role::Admin);
3508 let config_denied = runtime.execute_query("SET CONFIG red.config.audit.enabled = true");
3509 clear_current_auth_identity();
3510 let err = config_denied.expect_err("managed config guardrail must deny ordinary admin");
3511 assert!(
3512 err.to_string().contains("managed config"),
3513 "error should name the managed config guardrail: {err}"
3514 );
3515
3516 let denied_config_events = runtime
3517 .execute_query(
3518 "SELECT action, resource, outcome FROM red.control_events \
3519 WHERE action = 'config:write' AND resource = 'config:red.config.audit.enabled'",
3520 )
3521 .expect("red.control_events denied config write");
3522 assert_eq!(denied_config_events.result.records.len(), 1);
3523 assert_eq!(
3524 denied_config_events.result.records[0].get("outcome"),
3525 Some(&Value::text("denied"))
3526 );
3527
3528 runtime
3529 .execute_query("CREATE TABLE regulated_docs (id INT)")
3530 .expect("create user table");
3531 runtime
3532 .execute_query("SELECT * FROM regulated_docs")
3533 .expect("select user table");
3534 let audit_rows = runtime
3535 .db()
3536 .store()
3537 .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3538 .expect("query audit collection")
3539 .query_all(|_| true);
3540 assert!(
3541 audit_rows.is_empty(),
3542 "regulated preset must not globally audit data-plane queries"
3543 );
3544
3545 clear_preset_env();
3546 }
3547
/// Boots a fresh runtime from a bootstrap manifest file and verifies that the
/// declared user, policy attachment, managed registry entries and config keys
/// are installed, that completion is persisted, and that a second run no longer
/// needs the manifest file on disk.
#[test]
fn bootstrap_manifest_installs_initial_users_policies_guardrails_and_config() {
    use crate::auth::policies::{EvalContext, ResourceRef};
    use crate::auth::UserId;
    use crate::storage::schema::Value;

    // Keep the guard alive for the whole test; a poisoned lock is recovered
    // instead of propagating the earlier panic.
    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();

    let scratch_dir = std::env::current_dir()
        .expect("current dir")
        .join(".red/tmp/bootstrap-manifest-tests");
    std::fs::create_dir_all(&scratch_dir).expect("create manifest test dir");

    // File name is built from the process id and the wall-clock millis
    // (falling back to zero if the clock reads before the Unix epoch).
    let pid = std::process::id();
    let stamp_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    let manifest_path =
        scratch_dir.join(format!("reddb-bootstrap-manifest-{pid}-{stamp_ms}.json"));

    std::fs::write(
        &manifest_path,
        r#"{
            "users": [
                {
                    "username": "ops",
                    "password": "hunter2",
                    "role": "admin",
                    "system_owned": true
                }
            ],
            "policies": [
                {
                    "id": "bootstrap-registry-admin",
                    "version": 1,
                    "statements": [
                        {
                            "effect": "allow",
                            "actions": ["red.registry:*", "policy:*", "config:write", "select"],
                            "resources": ["registry:*", "policy:*", "config:*", "collection:docs"]
                        }
                    ]
                }
            ],
            "managed_policies": [
                {
                    "id": "managed-deny-drop",
                    "version": 1,
                    "statements": [
                        {
                            "effect": "deny",
                            "actions": ["policy:drop"],
                            "resources": ["policy:managed-deny-drop"]
                        }
                    ],
                    "required_resource": "policy:managed-deny-drop",
                    "evidence": "full"
                }
            ],
            "attachments": [
                {"user": "ops", "policy": "bootstrap-registry-admin"}
            ],
            "managed_config_namespaces": [
                {
                    "id": "red.ai",
                    "required_action": "config:write",
                    "required_resource": "config:red.ai.*",
                    "evidence": "metadata"
                }
            ],
            "config": [
                {"key": "red.ai.default.provider", "value": "openai"},
                {
                    "key": "red.ai.openai.default.secret_ref",
                    "secret_ref": {"collection": "red.vault", "key": "openai"}
                }
            ],
            "actor": "ops"
        }"#,
    )
    .expect("write manifest");

    // SAFETY: mutating the process environment is only sound while no other
    // thread reads or writes it. This test holds the `no_auth_env_lock` guard
    // for its whole body — NOTE(review): confirm every env-touching test
    // takes the same lock.
    unsafe {
        std::env::set_var("REDDB_BOOTSTRAP_MANIFEST", &manifest_path);
    }

    let (runtime, auth_store) = fresh_runtime_and_store();
    apply_preset(&runtime, &auth_store).expect("manifest applies cleanly");

    // Exactly one user exists: the manifest's `ops`, flagged system-owned.
    let installed_users = auth_store.list_users();
    assert_eq!(installed_users.len(), 1);
    let ops_user = &installed_users[0];
    assert_eq!(ops_user.username, "ops");
    assert!(ops_user.system_owned);

    // The attached manifest policy must let `ops` select from `docs`.
    let ops = UserId::platform("ops");
    let eval_ctx = EvalContext {
        now_ms: 1_700_000_000_000,
        peer_ip: None,
        mfa_present: false,
        principal_tenant: None,
        current_tenant: None,
        principal_is_admin_role: true,
        principal_is_system_owned: true,
        principal_is_platform_scoped: true,
    };
    let docs = ResourceRef::new("collection", "docs");
    let select_allowed = auth_store.check_policy_authz(&ops, "select", &docs, &eval_ctx);
    assert!(select_allowed);

    // Both managed entries are active in the runtime's config registry.
    let policy_entry = runtime
        .config_registry()
        .get_active("managed-deny-drop")
        .expect("managed policy registry entry");
    assert!(policy_entry.managed);
    assert_eq!(policy_entry.resource_type, "policy");
    let namespace_entry = runtime
        .config_registry()
        .get_active("red.ai")
        .expect("managed config namespace registry entry");
    assert!(namespace_entry.managed);
    assert_eq!(namespace_entry.resource_type, "config_namespace");

    // Plain config values are stored as text.
    let store = runtime.db().store();
    let provider = store
        .get_config("red.ai.default.provider")
        .expect("plain config persisted");
    match provider {
        Value::Text(s) => assert_eq!(s.as_ref(), "openai"),
        other => panic!("expected provider text, got {other:?}"),
    }

    // Secret references are stored as JSON and must not carry the password.
    let secret_ref_bytes = match store
        .get_config("red.ai.openai.default.secret_ref")
        .expect("secret ref config persisted")
    {
        Value::Json(raw) => raw,
        _ => panic!("secret ref must be stored as structured JSON"),
    };
    let secret_ref: crate::serde_json::Value =
        crate::serde_json::from_slice(&secret_ref_bytes).expect("secret ref json");
    let ref_kind = secret_ref.get("type").and_then(|v| v.as_str());
    assert_eq!(ref_kind, Some("secret_ref"));
    let rendered = String::from_utf8_lossy(&secret_ref_bytes);
    assert!(
        !rendered.contains("hunter2"),
        "manifest password must not leak into secret ref config"
    );

    // Bootstrap completion and the registry snapshot are both persisted.
    let bootstrap_done = store
        .get_config(BOOTSTRAP_COMPLETED_KEY)
        .expect("bootstrap completion persisted");
    assert!(matches!(bootstrap_done, Value::Boolean(true)));
    let persisted_entries = store.get_config("system.bootstrap.manifest.registry_entries");
    assert!(
        persisted_entries.is_some(),
        "managed registry entries must be persisted internally"
    );

    // With the manifest file deleted, a new registry can still be rehydrated
    // from the runtime.
    std::fs::remove_file(&manifest_path).expect("remove manifest after first boot");
    let rehydrated = Arc::new(crate::auth::registry::ConfigRegistry::new());
    crate::cli::bootstrap_manifest::rehydrate_manifest_registry(&runtime, &rehydrated)
        .expect("registry rehydrates without manifest file");
    assert!(rehydrated.get_active("managed-deny-drop").is_some());
    assert!(rehydrated.get_active("red.ai").is_some());

    // Re-applying against a new auth store succeeds and leaves it untouched.
    let second_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
    apply_preset(&runtime, &second_store).expect("re-run must not need manifest file");
    assert!(second_store.needs_bootstrap());

    clear_preset_env();
}
3724
/// The production preset with a username but no password must fail, name the
/// missing variable, and leave both the auth store and the bootstrap marker
/// untouched.
#[test]
fn production_preset_refuses_to_start_without_password() {
    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();

    // SAFETY: mutating the process environment is only sound while no other
    // thread reads or writes it. This test holds the `no_auth_env_lock` guard
    // for its whole body — NOTE(review): confirm every env-touching test
    // takes the same lock.
    unsafe {
        std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
        std::env::set_var("REDDB_USERNAME", "ops");
    }

    let (runtime, auth_store) = fresh_runtime_and_store();
    let rejection =
        apply_preset(&runtime, &auth_store).expect_err("must reject missing password");
    let names_missing_env = rejection.contains("REDDB_PASSWORD");
    assert!(
        names_missing_env,
        "error must name the missing env: {}",
        rejection
    );

    // Nothing may have been created or recorded by the failed attempt.
    assert!(auth_store.needs_bootstrap());
    let bootstrap_recorded = runtime
        .db()
        .store()
        .get_config(BOOTSTRAP_COMPLETED_KEY)
        .is_some();
    assert!(!bootstrap_recorded);

    clear_preset_env();
}
3752
/// After one successful production bootstrap, applying the preset again with
/// a brand-new auth store must succeed without creating a user or installing
/// the first-admin policy on that store.
#[test]
fn re_running_production_after_first_boot_is_a_silent_skip() {
    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();

    // SAFETY: mutating the process environment is only sound while no other
    // thread reads or writes it. This test holds the `no_auth_env_lock` guard
    // for its whole body — NOTE(review): confirm every env-touching test
    // takes the same lock.
    unsafe {
        std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
        std::env::set_var("REDDB_USERNAME", "ops");
        std::env::set_var("REDDB_PASSWORD", "hunter2");
    }

    // First boot: the preset creates exactly one user.
    let (runtime, first_store) = fresh_runtime_and_store();
    apply_preset(&runtime, &first_store).expect("first apply");
    let users_after_first_boot = first_store.list_users().len();
    assert_eq!(users_after_first_boot, 1);

    // Second boot: same runtime, empty auth store.
    let second_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
    apply_preset(&runtime, &second_store).expect("re-run is silent-skip");
    let still_unbootstrapped = second_store.needs_bootstrap();
    assert!(still_unbootstrapped, "re-run must not create a second admin");
    let reinstalled_policy = second_store.get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY);
    assert!(
        reinstalled_policy.is_none(),
        "re-run must not re-install the allow-all policy on the fresh store"
    );

    clear_preset_env();
}
3787
/// An unknown preset name must be rejected with an error that echoes the
/// offending value, and must not bootstrap the auth store.
#[test]
fn unrecognised_preset_value_is_rejected() {
    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();

    // SAFETY: mutating the process environment is only sound while no other
    // thread reads or writes it. This test holds the `no_auth_env_lock` guard
    // for its whole body — NOTE(review): confirm every env-touching test
    // takes the same lock.
    unsafe {
        std::env::set_var(PRESET_ENV, "weird");
    }

    let (runtime, auth_store) = fresh_runtime_and_store();
    let rejection = apply_preset(&runtime, &auth_store).expect_err("must reject unknown preset");
    let echoes_value = rejection.contains("weird");
    assert!(echoes_value, "error must echo the value: {}", rejection);
    assert!(auth_store.needs_bootstrap());

    clear_preset_env();
}
3804
/// With no-auth active, the caller-side guard must skip `apply_preset` even
/// though a complete production preset is present in the environment, so no
/// user is created and no bootstrap marker is written.
#[test]
fn no_auth_short_circuits_preset_entirely() {
    let _env_guard = no_auth_env_lock()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    clear_preset_env();

    // SAFETY: mutating the process environment is only sound while no other
    // thread reads or writes it. This test holds the `no_auth_env_lock` guard
    // for its whole body — NOTE(review): confirm every env-touching test
    // takes the same lock.
    unsafe {
        std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
        std::env::set_var("REDDB_USERNAME", "ops");
        std::env::set_var("REDDB_PASSWORD", "hunter2");
    }

    let no_auth_config = no_auth_test_config(true);
    let options = no_auth_config.to_db_options().expect("to_db_options");
    assert!(no_auth_active(&options));

    let (runtime, auth_store) = fresh_runtime_and_store();
    // The preset is applied only when no-auth is off; here it is on, so this
    // branch is expected to be skipped.
    let preset_allowed = !no_auth_active(&options);
    if preset_allowed {
        apply_preset(&runtime, &auth_store).expect("would apply preset");
    }

    let still_unbootstrapped = auth_store.needs_bootstrap();
    assert!(
        still_unbootstrapped,
        "--no-auth must prevent any admin creation"
    );
    let bootstrap_recorded = runtime
        .db()
        .store()
        .get_config(BOOTSTRAP_COMPLETED_KEY)
        .is_some();
    assert!(
        !bootstrap_recorded,
        "--no-auth must skip bootstrap-state persistence"
    );

    clear_preset_env();
}
3844
/// Binding a non-explicit transport to an address that is already held must
/// return `Ok(None)` and record a single non-explicit failure rather than
/// erroring out.
#[test]
fn implicit_bind_collision_degrades() {
    // Occupy an OS-assigned loopback port and keep it held for the whole test.
    let occupied = TcpListener::bind("127.0.0.1:0").expect("hold test port");
    let addr = occupied.local_addr().expect("held addr").to_string();

    let mut readiness = TransportReadiness::default();
    let bound =
        bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");
    assert!(bound.is_none());

    // No transport became active; exactly one failure was recorded.
    assert_eq!(readiness.active.len(), 0);
    assert_eq!(readiness.failed.len(), 1);
    let failure = &readiness.failed[0];
    assert!(!failure.explicit);
    assert_eq!(failure.bind_addr, addr);
}
3860}