Skip to main content

reddb_server/
service_cli.rs

1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, InProcessRouterConfig};
12use crate::storage::StorageProfileSelection;
13use crate::{
14    GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
15};
16
17pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
18
19/// Detect available CPU cores and suggest worker thread count.
20pub fn detect_runtime_config() -> RuntimeConfig {
21    let cpus = thread::available_parallelism()
22        .map(|n| n.get())
23        .unwrap_or(1);
24
25    // Reserve 1 core for OS, use the rest for workers (minimum 1)
26    let suggested_workers = cpus.saturating_sub(1).max(1);
27
28    RuntimeConfig {
29        available_cpus: cpus,
30        suggested_workers,
31        stack_size: 8 * 1024 * 1024, // 16MB default
32    }
33}
34
35#[derive(Debug, Clone)]
36pub struct RuntimeConfig {
37    pub available_cpus: usize,
38    pub suggested_workers: usize,
39    pub stack_size: usize,
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
43pub enum ServerTransport {
44    Grpc,
45    Http,
46    Wire,
47}
48
49impl ServerTransport {
50    pub const fn as_str(self) -> &'static str {
51        match self {
52            Self::Grpc => "gRPC",
53            Self::Http => "HTTP",
54            Self::Wire => "wire",
55        }
56    }
57
58    pub const fn default_bind_addr(self) -> &'static str {
59        match self {
60            Self::Grpc => "127.0.0.1:55055",
61            Self::Http => "127.0.0.1:5000",
62            Self::Wire => "127.0.0.1:5050",
63        }
64    }
65}
66
67#[derive(Debug, Clone)]
68pub struct ServerCommandConfig {
69    pub path: Option<PathBuf>,
70    pub router_bind_addr: Option<String>,
71    pub router_bind_explicit: bool,
72    pub grpc_bind_addr: Option<String>,
73    pub grpc_bind_explicit: bool,
74    /// TLS-encrypted gRPC bind address. Can run side-by-side with
75    /// `grpc_bind_addr` (e.g. `:55055` plain + `:55555` TLS) or
76    /// stand alone for TLS-only deploys. Defaults to `None`.
77    pub grpc_tls_bind_addr: Option<String>,
78    /// Path to PEM-encoded gRPC server certificate. Resolved through
79    /// `REDDB_GRPC_TLS_CERT` (with `_FILE` companion for k8s secret
80    /// mounts). When `None` and dev-mode is enabled
81    /// (`RED_GRPC_TLS_DEV=1`) the server auto-generates a self-signed
82    /// cert and prints its SHA-256 fingerprint to stderr.
83    pub grpc_tls_cert: Option<PathBuf>,
84    /// Path to PEM-encoded gRPC server private key. Same env-var
85    /// pattern as `grpc_tls_cert`.
86    pub grpc_tls_key: Option<PathBuf>,
87    /// Optional path to a PEM bundle of trust anchors used to verify
88    /// client certificates. When set, the gRPC listener requires
89    /// every client to present a cert that chains to this CA — i.e.
90    /// mutual TLS. When unset, one-way TLS only.
91    pub grpc_tls_client_ca: Option<PathBuf>,
92    pub http_bind_addr: Option<String>,
93    pub http_bind_explicit: bool,
94    /// HTTPS bind address. When set, the HTTP server also serves a
95    /// TLS-terminated listener on this addr. Plain HTTP and HTTPS can
96    /// run side by side (e.g. 5000 plain + 55555 TLS).
97    pub http_tls_bind_addr: Option<String>,
98    /// PEM cert for HTTPS. Reads `REDDB_HTTP_TLS_CERT` / its `_FILE`
99    /// companion when not set explicitly.
100    pub http_tls_cert: Option<PathBuf>,
101    /// PEM key for HTTPS. Reads `REDDB_HTTP_TLS_KEY` / its `_FILE`
102    /// companion when not set explicitly.
103    pub http_tls_key: Option<PathBuf>,
104    /// Optional PEM CA bundle. When set, the HTTPS listener requires
105    /// every client to present a cert that chains to a CA in this
106    /// bundle (mTLS). When unset, plain server-side TLS only.
107    pub http_tls_client_ca: Option<PathBuf>,
108    pub wire_bind_addr: Option<String>,
109    pub wire_bind_explicit: bool,
110    /// TLS-encrypted wire protocol bind address
111    pub wire_tls_bind_addr: Option<String>,
112    /// Path to TLS cert PEM (if None + wire_tls_bind, auto-generate)
113    pub wire_tls_cert: Option<PathBuf>,
114    /// Path to TLS key PEM
115    pub wire_tls_key: Option<PathBuf>,
116    /// PostgreSQL wire protocol bind address (Phase 3.1 PG parity).
117    /// When set the server accepts psql / JDBC / DBeaver clients on
118    /// this port via the v3 protocol. Defaults to None (disabled).
119    pub pg_bind_addr: Option<String>,
120    pub create_if_missing: bool,
121    pub read_only: bool,
122    pub role: String,
123    pub primary_addr: Option<String>,
124    pub storage_profile: StorageProfileSelection,
125    /// Explicit auth enable switch from CLI/env. `--no-auth` remains
126    /// the final override and forces this off at option resolution time.
127    pub auth: bool,
128    /// Require authenticated requests. Implies `auth = true` when set.
129    pub require_auth: bool,
130    pub vault: bool,
131    /// Issue #663 — explicit `--no-auth` / `--dev` flag for local
132    /// no-password mode. When `true`, the boot pipeline force-disables
133    /// auth (`auth.enabled = false`, `auth.require_auth = false`,
134    /// `auth.vault_enabled = false`) and skips
135    /// `AuthStore::bootstrap_from_env`, so an explicit
136    /// `REDDB_USERNAME` + `REDDB_PASSWORD` pair cannot accidentally
137    /// turn anonymous access into a logged-in admin. An unmissable
138    /// warning is emitted at startup. Default `false` — the existing
139    /// implicit no-auth behaviour (just don't set the envs) is
140    /// unchanged.
141    pub no_auth: bool,
142    /// Override worker thread count (None = auto-detect from CPUs)
143    pub workers: Option<usize>,
144    /// Telemetry config (Phase 6 logging). `None` falls back to the
145    /// built-in default derived from `path` + stderr-only.
146    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
147    /// HTTP handler-pool knobs from the CLI layer (issue #574 slice 5).
148    /// Carries flag and env values; red_config and built-in defaults
149    /// are applied later by [`crate::server::http_limits::resolve_http_limits`]
150    /// once the runtime is open.
151    pub http_limits_cli: crate::server::HttpLimitsCliInput,
152    /// `red server --ui` (issue #1047, ADR 0051). When `true`, the HTTP
153    /// listener also serves the pinned red-ui bundle as static assets.
154    /// The bundle directory is resolved at boot from the local cache
155    /// (downloading on first use, ADR 0050) unless `ui_dir` overrides it.
156    pub ui: bool,
157    /// Explicit bundle directory for `--ui` (`--ui-dir <DIR>`). When set,
158    /// it is served as-is with no download — the seam tests and an
159    /// air-gapped deploy use this. `None` resolves the pinned bundle.
160    pub ui_dir: Option<PathBuf>,
161    /// First-boot bootstrap settings resolved from CLI flags and, when
162    /// unset here, from environment variables by the bootstrap layer.
163    pub bootstrap: BootstrapConfig,
164}
165
166#[derive(Debug, Clone, Default)]
167pub struct BootstrapConfig {
168    pub preset: Option<String>,
169    pub manifest: Option<PathBuf>,
170    pub admin_username: Option<String>,
171    pub admin_password: Option<String>,
172    pub cloud_head_admin: Option<String>,
173    pub cloud_head_admin_password: Option<String>,
174    pub customer_admin: Option<String>,
175    pub customer_admin_password: Option<String>,
176}
177
178#[derive(Debug, Clone, PartialEq, Eq)]
179pub struct TransportListenerState {
180    pub transport: String,
181    pub bind_addr: String,
182    pub explicit: bool,
183}
184
185#[derive(Debug, Clone, PartialEq, Eq)]
186pub struct TransportListenerFailure {
187    pub transport: String,
188    pub bind_addr: String,
189    pub explicit: bool,
190    pub reason: String,
191}
192
193#[derive(Debug, Clone, Default, PartialEq, Eq)]
194pub struct TransportReadiness {
195    pub active: Vec<TransportListenerState>,
196    pub failed: Vec<TransportListenerFailure>,
197}
198
199impl TransportReadiness {
200    fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
201        self.active.push(TransportListenerState {
202            transport: transport.to_string(),
203            bind_addr: bind_addr.to_string(),
204            explicit,
205        });
206    }
207
208    fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
209        self.failed.push(TransportListenerFailure {
210            transport: transport.to_string(),
211            bind_addr: bind_addr.to_string(),
212            explicit,
213            reason,
214        });
215    }
216}
217
218#[derive(Debug, Clone)]
219pub struct SystemdServiceConfig {
220    pub service_name: String,
221    pub binary_path: PathBuf,
222    pub run_user: String,
223    pub run_group: String,
224    pub data_path: PathBuf,
225    pub router_bind_addr: Option<String>,
226    pub grpc_bind_addr: Option<String>,
227    pub http_bind_addr: Option<String>,
228}
229
230impl SystemdServiceConfig {
231    pub fn data_dir(&self) -> PathBuf {
232        self.data_path
233            .parent()
234            .map(PathBuf::from)
235            .unwrap_or_else(|| PathBuf::from("."))
236    }
237
238    pub fn unit_path(&self) -> PathBuf {
239        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
240    }
241}
242
243/// Build a sane default `TelemetryConfig` from a server path when the
244/// caller didn't set one explicitly. Writes rotating logs into the
245/// parent directory of the DB file (or `./logs` for in-memory /
246/// pathless runs). Level defaults to `info`, pretty stderr format.
247pub fn default_telemetry_for_path(
248    path: Option<&std::path::Path>,
249) -> crate::telemetry::TelemetryConfig {
250    let log_dir = match path {
251        Some(p) => p
252            .parent()
253            .map(|parent| parent.join("logs"))
254            .or_else(|| Some(std::path::PathBuf::from("./logs"))),
255        None => None, // in-memory — no file, stderr-only
256    };
257    crate::telemetry::TelemetryConfig {
258        log_dir,
259        file_prefix: "reddb.log".to_string(),
260        level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
261        format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
262            crate::telemetry::LogFormat::Pretty
263        } else {
264            crate::telemetry::LogFormat::Json
265        },
266        rotation_keep_days: 14,
267        service_name: "reddb",
268        // Implicit defaults — no CLI flag has claimed these values yet.
269        level_explicit: false,
270        format_explicit: false,
271        rotation_keep_days_explicit: false,
272        file_prefix_explicit: false,
273        log_dir_explicit: false,
274        log_file_disabled: false,
275    }
276}
277
278/// Metadata key used to thread the parsed backup config from
279/// `to_db_options` down to runner threads. The runner reads it back
280/// (via `runner_backup_intervals`) to spawn the periodic checkpointer
281/// and WAL-flush tasks. Threading through `metadata` avoids extending
282/// `RedDBOptions` with a public field that has no meaning for
283/// library consumers.
284const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
285const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
286const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
287/// Issue #519 — threaded through `metadata` like the existing interval
288/// values. `0` (default) means "feature disabled" and the runner skips
289/// the lag-monitor wiring entirely.
290const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";
291
292/// Issue #663 — metadata key set by `to_db_options` when the operator
293/// passes `--no-auth` / `--dev`. Read in `build_runtime_with_telemetry`
294/// to (a) skip `AuthStore::bootstrap_from_env` (so a stray
295/// `REDDB_USERNAME`/`REDDB_PASSWORD` cannot auto-create an admin) and
296/// (b) emit the loud "auth disabled" warning. Threaded via `metadata`
297/// — rather than a public `RedDBOptions` field — to keep the flag a
298/// CLI/boot concern with no meaning for library consumers.
299pub(crate) const NO_AUTH_META: &str = "red.boot.no_auth";
300
301/// Returns `true` when `--no-auth` / `--dev` was active for this boot,
302/// i.e. when `to_db_options` stamped [`NO_AUTH_META`] on `options.metadata`.
303pub(crate) fn no_auth_active(options: &RedDBOptions) -> bool {
304    options
305        .metadata
306        .get(NO_AUTH_META)
307        .map(|v| v == "true")
308        .unwrap_or(false)
309}
310
311/// Loud, unmissable warning printed when the operator opts into
312/// anonymous access via `--no-auth` / `--dev`. Goes to stderr (always
313/// visible at startup) **and** the tracing layer (captured by file
314/// logs once telemetry is wired).
315const NO_AUTH_WARNING: &str =
316    "⚠ auth disabled (--no-auth) — anonymous access, do NOT use in production";
317
318impl ServerCommandConfig {
319    fn to_db_options(&self) -> Result<RedDBOptions, String> {
320        let mut options = match &self.path {
321            Some(path) => RedDBOptions::persistent(path),
322            None => RedDBOptions::in_memory(),
323        };
324
325        options.mode = StorageMode::Persistent;
326        options.create_if_missing = self.create_if_missing;
327        // PLAN.md Phase 4.3 — read_only resolution priority:
328        //   1. CLI flag (`--readonly`) — explicit operator intent.
329        //   2. `RED_READONLY=true` env — orchestrator override.
330        //   3. Persisted `<data>/.runtime-state.json` from a prior
331        //      `POST /admin/readonly` — survives restart.
332        //   4. Default `false`.
333        options.read_only = self.read_only
334            || env_nonempty("RED_READONLY")
335                .or_else(|| env_nonempty("REDDB_READONLY"))
336                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
337                .unwrap_or(false)
338            || self.path.as_ref().is_some_and(|data_path| {
339                crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
340                    data_path,
341                ))
342                .unwrap_or(false)
343            });
344
345        options.replication = match self.role.as_str() {
346            "primary" => ReplicationConfig::primary(),
347            "replica" => {
348                let primary_addr = self
349                    .primary_addr
350                    .clone()
351                    .unwrap_or_else(|| "http://127.0.0.1:55055".to_string());
352                // Public-mutation rejection on replicas is enforced by
353                // `WriteGate` at the runtime/RPC boundary (PLAN.md W1).
354                // Leaving `options.read_only = false` keeps the pager
355                // writable so the internal logical-WAL apply path can
356                // ingest records from the primary; WriteGate ensures no
357                // client request reaches storage.
358                ReplicationConfig::replica(primary_addr)
359            }
360            _ => ReplicationConfig::standalone(),
361        };
362        options.storage_profile = self.storage_profile.validate()?;
363
364        let no_auth = self.no_auth || env_truthy("REDDB_NO_AUTH") || env_truthy("REDDB_DEV");
365        let preset = self.bootstrap.resolved_preset();
366        let preset_requires_auth = matches!(preset.as_str(), PRESET_PRODUCTION | PRESET_CLOUD);
367
368        if self.auth || env_truthy("REDDB_AUTH") || preset_requires_auth {
369            options.auth.enabled = true;
370        }
371        if self.require_auth || env_truthy("REDDB_REQUIRE_AUTH") || preset_requires_auth {
372            options.auth.enabled = true;
373            options.auth.require_auth = true;
374        }
375        if self.vault || env_truthy("REDDB_VAULT") || preset_requires_auth {
376            options.auth.vault_enabled = true;
377        }
378
379        // Issue #663 — `--no-auth` / `--dev` is the last word on auth
380        // for this boot: force every auth knob off, regardless of any
381        // env-derived config (`--vault`, `REDDB_USERNAME`/`PASSWORD`,
382        // OAuth, cert) the operator may also have
383        // set. We *also* stamp [`NO_AUTH_META`] so the auth-store
384        // builder downstream knows to skip `bootstrap_from_env`
385        // (which would otherwise auto-create an admin from
386        // `REDDB_USERNAME`/`REDDB_PASSWORD` even with auth disabled,
387        // a footgun for the local-dev workflow this flag exists to
388        // support).
389        if no_auth {
390            options.auth.enabled = false;
391            options.auth.require_auth = false;
392            options.auth.vault_enabled = false;
393            options
394                .metadata
395                .insert(NO_AUTH_META.to_string(), "true".to_string());
396        }
397
398        // Issue #652 — Control Event Ledger Compliance Mode.
399        // `REDDB_COMPLIANCE_MODE=true|1|yes|on` makes the producer
400        // slices (652b/c/d) fail-closed on ledger persistence
401        // failures. Default `false` — log-and-continue on emit error.
402        if let Some(raw) = env_nonempty("REDDB_COMPLIANCE_MODE") {
403            options.control_events.compliance_mode = matches!(
404                raw.to_ascii_lowercase().as_str(),
405                "1" | "true" | "yes" | "on"
406            );
407        }
408        if preset == PRESET_REGULATED {
409            options.control_events.compliance_mode = true;
410            options.query_audit = crate::runtime::query_audit::QueryAuditConfig::regulated();
411        }
412
413        // Issue #517 — canonical `REDDB_BACKUP_*` contract takes
414        // precedence. On Err, surface the partial-config message so
415        // boot exits non-zero with a clear operator message. On
416        // Ok(None), fall through to the legacy backend-from-env path.
417        match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
418            Err(msg) => {
419                return Err(format!("backup bootstrap: {msg}"));
420            }
421            Ok(Some(cfg)) => {
422                apply_backup_config(&mut options, &cfg);
423            }
424            Ok(None) => {
425                configure_remote_backend_from_env(&mut options);
426            }
427        }
428
429        if options.remote_backend.is_some()
430            || options
431                .metadata
432                .contains_key(BACKUP_INTERVAL_META_CHECKPOINT)
433        {
434            let mut selection = options.storage_profile;
435            selection.managed_backup = true;
436            options.storage_profile = selection.validate()?;
437        }
438
439        Ok(options)
440    }
441
442    pub fn enabled_transports(&self) -> Vec<ServerTransport> {
443        let mut transports = Vec::with_capacity(3);
444        if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
445            transports.push(ServerTransport::Grpc);
446        }
447        if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
448            transports.push(ServerTransport::Http);
449        }
450        if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
451            transports.push(ServerTransport::Wire);
452        }
453        transports
454    }
455}
456
457/// Read an env var, treating empty / whitespace-only as `None`.
458/// Honors the `<NAME>_FILE` convention. Re-exports the shared
459/// `crate::utils::env_with_file_fallback` helper so call sites in
460/// this module can keep their short local name.
461fn env_nonempty(name: &str) -> Option<String> {
462    crate::utils::env_with_file_fallback(name)
463}
464
465fn env_truthy(name: &str) -> bool {
466    env_nonempty(name)
467        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
468        .unwrap_or(false)
469}
470
471impl BootstrapConfig {
472    fn resolved_preset(&self) -> String {
473        self.preset
474            .as_deref()
475            .map(str::trim)
476            .filter(|s| !s.is_empty())
477            .map(str::to_string)
478            .or_else(|| env_nonempty(BOOTSTRAP_PRESET_ENV).or_else(|| env_nonempty(PRESET_ENV)))
479            .unwrap_or_else(|| PRESET_SIMPLE.to_string())
480    }
481
482    fn selected_manifest(&self) -> Option<PathBuf> {
483        self.manifest.clone().or_else(|| {
484            env_nonempty(crate::cli::bootstrap_manifest::MANIFEST_ENV).map(PathBuf::from)
485        })
486    }
487
488    /// Classify the auth bootstrap this config would perform, for the
489    /// cluster bootstrap authority seam (ADR 0058). A manifest wins over
490    /// preset/credential env because it is the broadest authority input.
491    fn auth_bootstrap_input(&self) -> crate::cluster::AuthBootstrapInput {
492        use crate::cluster::AuthBootstrapInput;
493        if self.selected_manifest().is_some() {
494            return AuthBootstrapInput::Manifest;
495        }
496        let preset = self.resolved_preset();
497        let preset_creates_auth = matches!(
498            preset.as_str(),
499            PRESET_PRODUCTION | PRESET_CLOUD | PRESET_REGULATED
500        );
501        // Even the `simple` preset auto-creates an admin when
502        // `REDDB_USERNAME` + `REDDB_PASSWORD` are present (see
503        // `AuthStore::bootstrap_from_env`), so treat credential env as
504        // auth bootstrap input too.
505        let credentials_present =
506            self.production_username().is_some() && self.production_password().is_some();
507        if preset_creates_auth || credentials_present {
508            AuthBootstrapInput::Env
509        } else {
510            AuthBootstrapInput::None
511        }
512    }
513
514    fn explicit_value(value: &Option<String>) -> Option<String> {
515        value
516            .as_deref()
517            .map(str::trim)
518            .filter(|s| !s.is_empty())
519            .map(str::to_string)
520    }
521
522    fn value_or_env(value: &Option<String>, env: &str) -> Option<String> {
523        Self::explicit_value(value).or_else(|| env_nonempty(env))
524    }
525
526    fn production_username(&self) -> Option<String> {
527        Self::value_or_env(&self.admin_username, "REDDB_USERNAME")
528    }
529
530    fn production_password(&self) -> Option<String> {
531        Self::value_or_env(&self.admin_password, "REDDB_PASSWORD")
532    }
533
534    fn cloud_head_username(&self) -> Option<String> {
535        Self::explicit_value(&self.cloud_head_admin)
536            .or_else(|| Self::explicit_value(&self.admin_username))
537            .or_else(|| env_nonempty("REDDB_CLOUD_HEAD_ADMIN"))
538            .or_else(|| env_nonempty("REDDB_USERNAME"))
539    }
540
541    fn cloud_head_password(&self) -> Option<String> {
542        Self::explicit_value(&self.cloud_head_admin_password)
543            .or_else(|| Self::explicit_value(&self.admin_password))
544            .or_else(|| env_nonempty("REDDB_CLOUD_HEAD_ADMIN_PASSWORD"))
545            .or_else(|| env_nonempty("REDDB_PASSWORD"))
546    }
547
548    fn customer_username(&self) -> Option<String> {
549        Self::value_or_env(&self.customer_admin, "REDDB_CUSTOMER_ADMIN")
550    }
551
552    fn customer_password(&self) -> Option<String> {
553        Self::value_or_env(
554            &self.customer_admin_password,
555            "REDDB_CUSTOMER_ADMIN_PASSWORD",
556        )
557    }
558
559    fn secret_env_vars_to_expand(&self, preset: &str) -> Vec<&'static str> {
560        let mut vars = Vec::new();
561        match preset {
562            PRESET_PRODUCTION => {
563                if Self::explicit_value(&self.admin_username).is_none() {
564                    vars.push("REDDB_USERNAME");
565                }
566                if Self::explicit_value(&self.admin_password).is_none() {
567                    vars.push("REDDB_PASSWORD");
568                }
569            }
570            PRESET_CLOUD => {
571                if Self::explicit_value(&self.cloud_head_admin).is_none()
572                    && Self::explicit_value(&self.admin_username).is_none()
573                {
574                    vars.push("REDDB_USERNAME");
575                }
576                if Self::explicit_value(&self.cloud_head_admin_password).is_none()
577                    && Self::explicit_value(&self.admin_password).is_none()
578                {
579                    vars.push("REDDB_CLOUD_HEAD_ADMIN_PASSWORD");
580                    vars.push("REDDB_PASSWORD");
581                }
582                if Self::explicit_value(&self.customer_admin_password).is_none() {
583                    vars.push("REDDB_CUSTOMER_ADMIN_PASSWORD");
584                }
585            }
586            _ => {}
587        }
588        vars
589    }
590}
591
592/// Apply a parsed [`BackupConfig`] to `options`. Wires the S3
593/// backend via `with_remote_backend` + `with_atomic_remote_backend`
594/// when the `backend-s3` feature is on, stashes intervals + backend
595/// kind in `metadata` so the runner can spawn the periodic tasks,
596/// and emits the startup INFO log required by #517.
597fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
598    let endpoint_host = endpoint_host(&cfg.endpoint);
599
600    options.metadata.insert(
601        BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
602        cfg.checkpoint_interval_secs.to_string(),
603    );
604    options.metadata.insert(
605        BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
606        cfg.wal_flush_interval_secs.to_string(),
607    );
608    options
609        .metadata
610        .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
611    options.metadata.insert(
612        BACKUP_PAUSE_ON_LAG_META.to_string(),
613        cfg.pause_on_lag_secs.to_string(),
614    );
615
616    #[cfg(feature = "backend-s3")]
617    {
618        let s3_cfg = crate::storage::backend::S3Config {
619            endpoint: cfg.endpoint.clone(),
620            bucket: cfg.bucket.clone(),
621            key_prefix: cfg.prefix.clone(),
622            access_key: cfg.access_key_id.clone(),
623            secret_key: cfg.secret_access_key.clone(),
624            region: cfg.region.clone(),
625            path_style: true,
626        };
627        let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
628        options.remote_backend = Some(backend.clone());
629        options.remote_backend_atomic = Some(backend);
630        // Use the operator-supplied prefix as the snapshot key root.
631        // The existing helpers (`default_snapshot_prefix`,
632        // `default_wal_archive_prefix`) derive sub-prefixes from the
633        // parent of `remote_key`.
634        let trimmed = cfg.prefix.trim_end_matches('/');
635        options.remote_key = Some(reddb_file::remote_database_key(trimmed));
636
637        tracing::info!(
638            backend = "s3",
639            endpoint = %endpoint_host,
640            bucket = %cfg.bucket,
641            prefix = %cfg.prefix,
642            checkpoint_interval_secs = cfg.checkpoint_interval_secs,
643            wal_flush_interval_secs = cfg.wal_flush_interval_secs,
644            "backup backend configured from REDDB_BACKUP_* env"
645        );
646    }
647
648    #[cfg(not(feature = "backend-s3"))]
649    {
650        tracing::warn!(
651            backend = "s3",
652            endpoint = %endpoint_host,
653            bucket = %cfg.bucket,
654            prefix = %cfg.prefix,
655            "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
656             backend wiring skipped (archiver/checkpointer also disabled)"
657        );
658    }
659}
660
661fn endpoint_host(endpoint: &str) -> &str {
662    let after_scheme = endpoint
663        .split_once("://")
664        .map(|(_, r)| r)
665        .unwrap_or(endpoint);
666    after_scheme.split('/').next().unwrap_or(after_scheme)
667}
668
669/// If `options` carry backup-task intervals threaded in via
670/// [`apply_backup_config`], spawn periodic checkpointer + WAL-flush
671/// tasks against `runtime`. Returns a `BackupTasksHandle` that
672/// stops the tasks when dropped; runners keep it alive for the
673/// server lifetime.
674fn spawn_backup_tasks_if_configured(
675    options: &RedDBOptions,
676    runtime: &RedDBRuntime,
677) -> Option<BackupTasksHandle> {
678    let checkpoint_secs: u64 = options
679        .metadata
680        .get(BACKUP_INTERVAL_META_CHECKPOINT)?
681        .parse()
682        .ok()?;
683    let wal_secs: u64 = options
684        .metadata
685        .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
686        .parse()
687        .ok()?;
688    // Issue #519 — opt-in graceful read-only when remote archive lag
689    // exceeds the threshold. `0` (default) keeps legacy behaviour.
690    let pause_on_lag_secs: u64 = options
691        .metadata
692        .get(BACKUP_PAUSE_ON_LAG_META)
693        .and_then(|raw| raw.parse().ok())
694        .unwrap_or(0);
695    options.remote_backend.as_ref()?;
696
697    let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
698
699    // Stamp the gate with the threshold + a "now" baseline so the
700    // first auto-pause can only fire after `pause_on_lag_secs` of
701    // archive silence. The poller below re-evaluates on the same
702    // clock the archive-task wrapper uses.
703    if pause_on_lag_secs > 0 {
704        let now_ms = std::time::SystemTime::now()
705            .duration_since(std::time::UNIX_EPOCH)
706            .map(|d| d.as_millis() as u64)
707            .unwrap_or(0);
708        runtime
709            .write_gate()
710            .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
711        tracing::info!(
712            pause_on_lag_secs,
713            "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
714        );
715    }
716
717    let checkpoint_handle = {
718        let stop = Arc::clone(&stop);
719        let runtime = runtime.clone();
720        let interval = Duration::from_secs(checkpoint_secs);
721        thread::Builder::new()
722            .name("red-checkpointer".into())
723            .spawn(move || {
724                periodic_loop(stop, interval, move || {
725                    if let Err(err) = runtime.checkpoint() {
726                        tracing::warn!(error = %err, "periodic checkpoint failed");
727                    }
728                })
729            })
730            .ok()
731    };
732
733    let archiver_handle = {
734        let stop = Arc::clone(&stop);
735        let runtime = runtime.clone();
736        let interval = Duration::from_secs(wal_secs);
737        let lag_enabled = pause_on_lag_secs > 0;
738        thread::Builder::new()
739            .name("red-wal-archiver".into())
740            .spawn(move || {
741                periodic_loop(stop, interval, move || match runtime.trigger_backup() {
742                    Ok(_) if lag_enabled => {
743                        let now_ms = std::time::SystemTime::now()
744                            .duration_since(std::time::UNIX_EPOCH)
745                            .map(|d| d.as_millis() as u64)
746                            .unwrap_or(0);
747                        runtime.write_gate().record_archive_success(now_ms);
748                        // Same-tick re-evaluation: catching up while
749                        // already auto-paused must auto-resume without
750                        // waiting for the poller's cadence.
751                        runtime.write_gate().evaluate_archive_lag(now_ms);
752                    }
753                    Ok(_) => {}
754                    Err(err) => {
755                        tracing::warn!(error = %err, "periodic WAL archive/backup failed");
756                    }
757                })
758            })
759            .ok()
760    };
761
762    // Issue #519 — lag poller. Wakes on a short cadence so a frozen
763    // archiver (the worst case) still flips the gate within ~5s of
764    // crossing the threshold, instead of waiting up to a full
765    // `wal_secs` for the next archive attempt that may never come.
766    let lag_monitor_handle = if pause_on_lag_secs > 0 {
767        let stop = Arc::clone(&stop);
768        let runtime = runtime.clone();
769        // 5s is short enough that the threshold is honoured tightly
770        // and long enough that the atomic loads stay invisible at the
771        // process level.
772        let interval = Duration::from_secs(5);
773        thread::Builder::new()
774            .name("red-archive-lag-monitor".into())
775            .spawn(move || {
776                periodic_loop(stop, interval, move || {
777                    let now_ms = std::time::SystemTime::now()
778                        .duration_since(std::time::UNIX_EPOCH)
779                        .map(|d| d.as_millis() as u64)
780                        .unwrap_or(0);
781                    let was_paused = runtime.write_gate().is_auto_paused();
782                    let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
783                    if now_paused && !was_paused {
784                        tracing::warn!(
785                            pause_on_lag_secs,
786                            last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
787                            "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
788                        );
789                    } else if !now_paused && was_paused {
790                        tracing::info!(
791                            "WAL archive caught up — exiting graceful read-only mode (issue #519)"
792                        );
793                    }
794                })
795            })
796            .ok()
797    } else {
798        None
799    };
800
801    tracing::info!(
802        checkpoint_interval_secs = checkpoint_secs,
803        wal_flush_interval_secs = wal_secs,
804        "backup tasks spawned (checkpointer + WAL archiver)"
805    );
806
807    Some(BackupTasksHandle {
808        stop,
809        _checkpoint_handle: checkpoint_handle,
810        _archiver_handle: archiver_handle,
811        _lag_monitor_handle: lag_monitor_handle,
812    })
813}
814
815/// Shutdown handle for the periodic checkpointer + archiver tasks.
816/// Drop signals both loops to exit on their next wake.
817pub struct BackupTasksHandle {
818    stop: Arc<std::sync::atomic::AtomicBool>,
819    _checkpoint_handle: Option<thread::JoinHandle<()>>,
820    _archiver_handle: Option<thread::JoinHandle<()>>,
821    /// Issue #519 — periodic archive-lag poller, only spawned when
822    /// `REDDB_BACKUP_PAUSE_ON_LAG_SECS > 0`.
823    _lag_monitor_handle: Option<thread::JoinHandle<()>>,
824}
825
826impl Drop for BackupTasksHandle {
827    fn drop(&mut self) {
828        self.stop.store(true, std::sync::atomic::Ordering::Release);
829    }
830}
831
832fn periodic_loop<F: FnMut()>(
833    stop: Arc<std::sync::atomic::AtomicBool>,
834    interval: Duration,
835    mut tick: F,
836) {
837    // Wake on a short cadence so shutdown is responsive even when the
838    // operator-configured interval is large (e.g. 1h checkpoint).
839    let wake = Duration::from_secs(1);
840    let mut elapsed = Duration::ZERO;
841    while !stop.load(std::sync::atomic::Ordering::Acquire) {
842        thread::sleep(wake);
843        elapsed += wake;
844        if elapsed >= interval {
845            tick();
846            elapsed = Duration::ZERO;
847        }
848    }
849}
850
851fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
852    // PLAN.md (cloud-agnostic) — prefer the new spelling
853    // `RED_BACKEND`; accept the legacy `REDDB_REMOTE_BACKEND` for
854    // existing dev installs. `none` (default) means standalone — no
855    // remote backend, valid for development and on-prem without
856    // remote.
857    let backend = env_nonempty("RED_BACKEND")
858        .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
859        .unwrap_or_else(|| "none".to_string())
860        .to_ascii_lowercase();
861
862    match backend.as_str() {
863        // Universal S3-compatible — covers AWS, R2, MinIO, Ceph,
864        // GCS-interop, B2, DO Spaces, Wasabi, Garage, SeaweedFS,
865        // IDrive, Storj. The `path_style` toggle in S3Config picks
866        // the right addressing for self-hosted vs hosted.
867        "s3" | "minio" | "r2" => {
868            #[cfg(feature = "backend-s3")]
869            {
870                if let Some(config) = s3_config_from_env() {
871                    let remote_key = env_nonempty("RED_REMOTE_KEY")
872                        .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
873                        .unwrap_or_else(|| reddb_file::remote_database_key("clusters/dev"));
874                    let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
875                    options.remote_backend = Some(backend.clone());
876                    options.remote_backend_atomic = Some(backend);
877                    options.remote_key = Some(remote_key);
878                }
879            }
880            #[cfg(not(feature = "backend-s3"))]
881            {
882                tracing::warn!(
883                    backend = %backend,
884                    "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
885                );
886            }
887        }
888        // Filesystem backend (NFS/EFS/SMB/local-disk). PLAN.md spec
889        // calls it `fs`; legacy code shipped it as `local`. Both
890        // names map to LocalBackend, with the remote_key derived
891        // from `RED_FS_PATH` + a per-database suffix when provided.
892        "fs" | "local" => {
893            let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
894            let remote_key = match (
895                base_path,
896                env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
897            ) {
898                (Some(base), Some(rel)) => Some(format!(
899                    "{}/{}",
900                    base.trim_end_matches('/'),
901                    rel.trim_start_matches('/')
902                )),
903                (Some(base), None) => Some(reddb_file::remote_database_key(&format!(
904                    "{}/clusters/dev",
905                    base.trim_end_matches('/')
906                ))),
907                (None, Some(rel)) => Some(rel),
908                (None, None) => None,
909            };
910            if let Some(key) = remote_key {
911                let backend = Arc::new(crate::storage::backend::LocalBackend);
912                options.remote_backend = Some(backend.clone());
913                options.remote_backend_atomic = Some(backend);
914                options.remote_key = Some(key);
915            }
916        }
917        // Generic HTTP backend (PLAN.md Phase 2.3). Maximum
918        // portability: any service exposing PUT/GET/DELETE serves
919        // as a backup target. Optional auth via *_FILE secret
920        // path keeps the token out of the env.
921        "http" => {
922            let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
923                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
924            {
925                Some(u) => u,
926                None => {
927                    tracing::warn!(
928                        "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
929                    );
930                    return;
931                }
932            };
933            let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
934                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
935                .unwrap_or_default();
936            let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
937                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
938            {
939                std::fs::read_to_string(&path)
940                    .ok()
941                    .map(|s| s.trim().to_string())
942                    .filter(|s| !s.is_empty())
943            } else {
944                env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
945                    .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
946            };
947
948            let mut config =
949                crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
950            if let Some(auth) = auth_header {
951                config = config.with_auth_header(auth);
952            }
953            let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
954                || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
955                || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
956            config = config.with_conditional_writes(conditional_writes);
957            // Always populate the snapshot-transport handle. When the
958            // operator confirmed CAS support, also populate the atomic
959            // handle via AtomicHttpBackend — without that flag,
960            // LeaseStore must remain unreachable on this backend.
961            if conditional_writes {
962                match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
963                    Ok(atomic) => {
964                        let atomic_arc = Arc::new(atomic);
965                        options.remote_backend = Some(atomic_arc.clone());
966                        options.remote_backend_atomic = Some(atomic_arc);
967                    }
968                    Err(err) => {
969                        tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
970                        options.remote_backend =
971                            Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
972                    }
973                }
974            } else {
975                options.remote_backend =
976                    Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
977            }
978            options.remote_key = env_nonempty("RED_REMOTE_KEY")
979                .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
980                .or_else(|| Some(reddb_file::remote_database_key("clusters/dev")));
981        }
982        // `none` is the explicit standalone — no remote, no backup
983        // pipeline. Boot never blocks on network reachability.
984        "none" | "" => {}
985        other => {
986            tracing::warn!(
987                backend = %other,
988                "unknown RED_BACKEND value — supported: s3 | fs | http | none"
989            );
990        }
991    }
992}
993
994/// Resolve a value from env, accepting both the cloud-agnostic
995/// `RED_S3_*` family (PLAN.md spec) and the legacy `REDDB_S3_*` form
996/// kept for existing dev installs. The new spelling wins; the
997/// legacy spelling is read with a warning hint in callers' logs.
998#[cfg(feature = "backend-s3")]
999fn env_s3(suffix: &str) -> Option<String> {
1000    env_nonempty(&format!("RED_S3_{suffix}"))
1001        .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}")))
1002}
1003
1004/// Read a secret value from either the literal env var or a file
1005/// path supplied via `*_FILE` (PLAN.md spec — compatible with
1006/// Kubernetes / Docker Secrets, HashiCorp Vault Agent, sealed-
1007/// secrets). The `_FILE` variant wins so an operator can set it to
1008/// override the inline value without touching the inline env.
1009#[cfg(feature = "backend-s3")]
1010fn env_s3_secret(suffix: &str) -> Option<String> {
1011    let file_key_red = format!("RED_S3_{suffix}_FILE");
1012    let file_key_legacy = format!("REDDB_S3_{suffix}_FILE");
1013    if let Some(path) = env_nonempty(&file_key_red).or_else(|| env_nonempty(&file_key_legacy)) {
1014        return std::fs::read_to_string(&path)
1015            .ok()
1016            .map(|s| s.trim().to_string())
1017            .filter(|s| !s.is_empty());
1018    }
1019    env_s3(suffix)
1020}
1021
1022#[cfg(feature = "backend-s3")]
1023fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
1024    let endpoint = env_s3("ENDPOINT")?;
1025    let bucket = env_s3("BUCKET")?;
1026    let access_key = env_s3_secret("ACCESS_KEY")?;
1027    let secret_key = env_s3_secret("SECRET_KEY")?;
1028    let region = env_s3("REGION").unwrap_or_else(|| "us-east-1".to_string());
1029    let key_prefix = env_s3("KEY_PREFIX")
1030        .or_else(|| env_s3("PREFIX"))
1031        .unwrap_or_default();
1032    let path_style = env_s3("PATH_STYLE")
1033        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1034        .unwrap_or(true);
1035    Some(crate::storage::backend::S3Config {
1036        endpoint,
1037        bucket,
1038        key_prefix,
1039        access_key,
1040        secret_key,
1041        region,
1042        path_style,
1043    })
1044}
1045
1046pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
1047    let data_dir = config.data_dir();
1048    let exec_start = render_systemd_exec_start(config);
1049    format!(
1050        "[Unit]\n\
1051Description=RedDB unified database service\n\
1052After=network-online.target\n\
1053Wants=network-online.target\n\
1054\n\
1055[Service]\n\
1056Type=simple\n\
1057User={user}\n\
1058Group={group}\n\
1059WorkingDirectory={workdir}\n\
1060ExecStart={exec_start}\n\
1061Restart=always\n\
1062RestartSec=2\n\
1063LimitSTACK=16M\n\
1064NoNewPrivileges=true\n\
1065PrivateTmp=true\n\
1066ProtectSystem=strict\n\
1067ProtectHome=true\n\
1068ProtectControlGroups=true\n\
1069ProtectKernelTunables=true\n\
1070ProtectKernelModules=true\n\
1071RestrictNamespaces=true\n\
1072LockPersonality=true\n\
1073MemoryDenyWriteExecute=true\n\
1074ReadWritePaths={workdir}\n\
1075\n\
1076[Install]\n\
1077WantedBy=multi-user.target\n",
1078        user = config.run_user,
1079        group = config.run_group,
1080        workdir = data_dir.display(),
1081        exec_start = exec_start,
1082    )
1083}
1084
1085/// Install a systemd unit + start the service.
1086///
1087/// Linux-only. The helper shells out to `systemctl`, `useradd`,
1088/// `groupadd`, `install`, `getent`, and `id` — none of which exist on
1089/// Windows or macOS. The Windows/macOS branch returns a hard error so
1090/// callers (the CLI) surface a clear message instead of a confusing
1091/// syscall failure. A proper Windows-service equivalent (sc.exe /
1092/// NSSM) is a Phase 3.6 follow-up.
1093#[cfg(target_os = "linux")]
1094pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
1095    ensure_root()?;
1096    ensure_command_available("systemctl")?;
1097    ensure_command_available("getent")?;
1098    ensure_command_available("groupadd")?;
1099    ensure_command_available("useradd")?;
1100    ensure_command_available("install")?;
1101    ensure_executable(&config.binary_path)?;
1102
1103    if !command_success("getent", ["group", config.run_group.as_str()])? {
1104        run_command("groupadd", ["--system", config.run_group.as_str()])?;
1105    }
1106
1107    if !command_success("id", ["-u", config.run_user.as_str()])? {
1108        let data_dir = config.data_dir();
1109        run_command(
1110            "useradd",
1111            [
1112                "--system",
1113                "--gid",
1114                config.run_group.as_str(),
1115                "--home-dir",
1116                data_dir.to_string_lossy().as_ref(),
1117                "--shell",
1118                "/usr/sbin/nologin",
1119                config.run_user.as_str(),
1120            ],
1121        )?;
1122    }
1123
1124    let data_dir = config.data_dir();
1125    run_command(
1126        "install",
1127        [
1128            "-d",
1129            "-o",
1130            config.run_user.as_str(),
1131            "-g",
1132            config.run_group.as_str(),
1133            "-m",
1134            "0750",
1135            data_dir.to_string_lossy().as_ref(),
1136        ],
1137    )?;
1138
1139    std::fs::write(config.unit_path(), render_systemd_unit(config))
1140        .map_err(|err| format!("failed to write systemd unit: {err}"))?;
1141
1142    run_command("systemctl", ["daemon-reload"])?;
1143    run_command(
1144        "systemctl",
1145        [
1146            "enable",
1147            "--now",
1148            format!("{}.service", config.service_name).as_str(),
1149        ],
1150    )?;
1151
1152    Ok(())
1153}
1154
1155/// Non-Linux fallback — systemd is Linux-specific. Keep the signature
1156/// identical so callers compile on every platform; surface a clear
1157/// error at runtime. Windows/macOS service-manager integration is a
1158/// Phase 3.6 follow-up (sc.exe + NSSM on Windows, launchd on macOS).
1159#[cfg(not(target_os = "linux"))]
1160pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
1161    Err("systemd install is Linux-only — use sc.exe (Windows) or \
1162         launchd (macOS) to install the service manually using the \
1163         unit printed by `red service print-unit`"
1164        .to_string())
1165}
1166
1167#[cfg(target_os = "linux")]
1168fn ensure_root() -> Result<(), String> {
1169    let output = Command::new("id")
1170        .arg("-u")
1171        .output()
1172        .map_err(|err| format!("failed to determine current uid: {err}"))?;
1173    if !output.status.success() {
1174        return Err("failed to determine current uid".to_string());
1175    }
1176    let uid = String::from_utf8_lossy(&output.stdout);
1177    if uid.trim() != "0" {
1178        return Err("run this command as root (sudo)".to_string());
1179    }
1180    Ok(())
1181}
1182
1183#[cfg(target_os = "linux")]
1184fn ensure_command_available(command: &str) -> Result<(), String> {
1185    let status = Command::new("sh")
1186        .args(["-lc", &format!("command -v {command} >/dev/null 2>&1")])
1187        .status()
1188        .map_err(|err| format!("failed to check command '{command}': {err}"))?;
1189    if status.success() {
1190        Ok(())
1191    } else {
1192        Err(format!("required command not found: {command}"))
1193    }
1194}
1195
1196#[cfg(target_os = "linux")]
1197fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
1198    let metadata = std::fs::metadata(path)
1199        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
1200    #[cfg(unix)]
1201    {
1202        use std::os::unix::fs::PermissionsExt;
1203        if metadata.permissions().mode() & 0o111 == 0 {
1204            return Err(format!("binary is not executable: {}", path.display()));
1205        }
1206    }
1207    #[cfg(not(unix))]
1208    {
1209        if !metadata.is_file() {
1210            return Err(format!("binary is not a file: {}", path.display()));
1211        }
1212    }
1213    Ok(())
1214}
1215
1216#[cfg(target_os = "linux")]
1217fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
1218    Command::new(program)
1219        .args(args)
1220        .status()
1221        .map(|status| status.success())
1222        .map_err(|err| format!("failed to run {program}: {err}"))
1223}
1224
1225#[cfg(target_os = "linux")]
1226fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
1227    let output = Command::new(program)
1228        .args(args)
1229        .output()
1230        .map_err(|err| format!("failed to run {program}: {err}"))?;
1231    if output.status.success() {
1232        return Ok(());
1233    }
1234
1235    let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
1236    let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
1237    let detail = if !stderr.is_empty() {
1238        stderr
1239    } else if !stdout.is_empty() {
1240        stdout
1241    } else {
1242        format!("exit status {}", output.status)
1243    };
1244    Err(format!("{program} failed: {detail}"))
1245}
1246
1247pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1248    let has_any = config.router_bind_addr.is_some()
1249        || config.grpc_bind_addr.is_some()
1250        || config.http_bind_addr.is_some()
1251        || config.wire_bind_addr.is_some()
1252        || config.pg_bind_addr.is_some();
1253    if !has_any {
1254        return Err("at least one server bind address must be configured".into());
1255    }
1256    let thread_name = if config.router_bind_addr.is_some() {
1257        "red-server-router"
1258    } else {
1259        match (
1260            config.grpc_bind_addr.is_some(),
1261            config.http_bind_addr.is_some(),
1262        ) {
1263            (true, true) => "red-server-dual",
1264            (true, false) => "red-server-grpc",
1265            (false, true) => "red-server-http",
1266            (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1267            (false, false) => "red-server-pg-wire",
1268        }
1269    };
1270
1271    let handle = thread::Builder::new()
1272        .name(thread_name.into())
1273        .stack_size(8 * 1024 * 1024)
1274        .spawn(move || run_configured_servers(config))
1275        .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1276
1277    match handle.join() {
1278        Ok(result) => result,
1279        Err(_) => Err("server thread panicked".to_string()),
1280    }
1281}
1282
1283fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1284    let mut parts = vec![
1285        config.binary_path.display().to_string(),
1286        "server".to_string(),
1287        "--path".to_string(),
1288        config.data_path.display().to_string(),
1289    ];
1290
1291    if let Some(bind_addr) = &config.router_bind_addr {
1292        parts.push("--bind".to_string());
1293        parts.push(bind_addr.clone());
1294    } else if let Some(bind_addr) = &config.grpc_bind_addr {
1295        parts.push("--grpc-bind".to_string());
1296        parts.push(bind_addr.clone());
1297    }
1298    if let Some(bind_addr) = &config.http_bind_addr {
1299        parts.push("--http-bind".to_string());
1300        parts.push(bind_addr.clone());
1301    }
1302
1303    parts.join(" ")
1304}
1305
1306pub fn probe_listener(target: &str, timeout: Duration) -> bool {
1307    let addresses: Vec<SocketAddr> = match target.to_socket_addrs() {
1308        Ok(addresses) => addresses.collect(),
1309        Err(_) => return false,
1310    };
1311
1312    addresses
1313        .into_iter()
1314        .any(|address| TcpStream::connect_timeout(&address, timeout).is_ok())
1315}
1316
1317#[inline(never)]
1318fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1319    // Phase 6 logging is initialised inside each runner once the
1320    // runtime is open — see `build_runtime_and_auth_store`. Going
1321    // after DB open lets us read `red.logging.*` config keys out of
1322    // the persistent red_config store and merge them with the CLI
1323    // flags (flag > red_config > built-in default).
1324    if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1325        return run_routed_server(config, router_bind_addr);
1326    }
1327
1328    match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1329        (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1330            run_dual_server(config, grpc_bind_addr, http_bind_addr)
1331        }
1332        (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1333        (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1334        (None, None) => {
1335            if let Some(wire_addr) = config.wire_bind_addr.clone() {
1336                run_wire_only_server(config, wire_addr)
1337            } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1338                run_pg_only_server(config, pg_addr)
1339            } else {
1340                Err("at least one server bind address must be configured".to_string())
1341            }
1342        }
1343    }
1344}
1345
1346/// Bind a TCP listener for a transport at startup and record the
1347/// outcome in the shared [`TransportReadiness`] state.
1348///
1349/// The split between *explicit* and *implicit/default* binds is the
1350/// contract from issue #545:
1351///
1352/// * `explicit == true` — the operator named this transport on the
1353///   CLI / env / config. A failed bind is fatal: this returns `Err`
1354///   and the boot path must propagate the error so the process exits
1355///   non-zero with the recorded `reason`.
1356/// * `explicit == false` — this is a default listener the server
1357///   would have spun up regardless. A failed bind degrades: this
1358///   returns `Ok(None)` (no listener) but the failure is still
1359///   recorded in `readiness.failed`, so the server keeps running and
1360///   the next `/health` probe enumerates the degraded listener.
1361///
1362/// On success the bound listener lands in `readiness.active`.
1363pub fn bind_listener_for_startup(
1364    readiness: &mut TransportReadiness,
1365    transport: &str,
1366    bind_addr: &str,
1367    explicit: bool,
1368) -> Result<Option<TcpListener>, String> {
1369    match TcpListener::bind(bind_addr) {
1370        Ok(listener) => {
1371            readiness.active(transport, bind_addr, explicit);
1372            Ok(Some(listener))
1373        }
1374        Err(err) => {
1375            let reason = format!("{transport} listener bind {bind_addr}: {err}");
1376            readiness.failed(transport, bind_addr, explicit, reason.clone());
1377            if explicit {
1378                tracing::error!(
1379                    transport,
1380                    bind = %bind_addr,
1381                    error = %err,
1382                    "fatal explicit bind failure"
1383                );
1384                Err(format!("explicit {reason}"))
1385            } else {
1386                tracing::warn!(
1387                    transport,
1388                    bind = %bind_addr,
1389                    error = %err,
1390                    "non-fatal implicit bind failure; listener degraded"
1391                );
1392                Ok(None)
1393            }
1394        }
1395    }
1396}
1397
1398/// Wire SIGTERM and SIGINT to `RedDBRuntime::graceful_shutdown`.
1399///
1400/// PLAN.md Phase 1.1 — orchestrators (K8s preStop, Fly autostop, ECS
1401/// drain, systemd, plain `docker stop`) all rely on SIGTERM with a
1402/// grace window. SIGKILL after that grace window is the OS's
1403/// fallback; we are responsible for finishing in time.
1404///
1405/// Spawns a tokio task on the caller's runtime that:
1406///   1. Awaits the first of SIGTERM / SIGINT.
1407///   2. Calls `runtime.graceful_shutdown(backup_on_shutdown)`. The
1408///      runtime moves to `Stopped` on its own; this just runs the
1409///      flush + checkpoint pipeline and (optionally) a final backup.
1410///   3. Calls `std::process::exit(0)` so the orchestrator sees a
1411///      clean exit code.
1412///
1413/// `RED_BACKUP_ON_SHUTDOWN` (default `true` if a remote backend is
1414/// configured) toggles step 3's backup branch. The flush + checkpoint
1415/// always run.
1416///
1417/// Idempotent across signal storms — `graceful_shutdown` returns the
1418/// cached report on second call, but we exit on the first one
1419/// regardless, so the second SIGTERM never reaches the handler.
1420async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
1421    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
1422        .ok()
1423        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1424        .unwrap_or(true);
1425
1426    #[cfg(unix)]
1427    {
1428        use tokio::signal::unix::{signal, SignalKind};
1429
1430        let mut sigterm = match signal(SignalKind::terminate()) {
1431            Ok(s) => s,
1432            Err(err) => {
1433                tracing::warn!(
1434                    error = %err,
1435                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
1436                );
1437                return;
1438            }
1439        };
1440        let mut sigint = match signal(SignalKind::interrupt()) {
1441            Ok(s) => s,
1442            Err(err) => {
1443                tracing::warn!(error = %err, "could not install SIGINT handler");
1444                return;
1445            }
1446        };
1447        // PLAN.md Phase 6.4 — SIGHUP triggers a reload of secrets from
1448        // their `_FILE` companions without restarting the process.
1449        // Useful for credential rotation pipelines (kubectl create
1450        // secret + kubectl rollout restart, but for systemd / Nomad /
1451        // bare-metal where rolling the process is heavier).
1452        let mut sighup = match signal(SignalKind::hangup()) {
1453            Ok(s) => Some(s),
1454            Err(err) => {
1455                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
1456                None
1457            }
1458        };
1459
1460        let reload_runtime = runtime.clone();
1461        tokio::spawn(async move {
1462            loop {
1463                let signal_name = match &mut sighup {
1464                    Some(hup) => tokio::select! {
1465                        _ = sigterm.recv() => "SIGTERM",
1466                        _ = sigint.recv() => "SIGINT",
1467                        _ = hup.recv() => "SIGHUP",
1468                    },
1469                    None => tokio::select! {
1470                        _ = sigterm.recv() => "SIGTERM",
1471                        _ = sigint.recv() => "SIGINT",
1472                    },
1473                };
1474
1475                if signal_name == "SIGHUP" {
1476                    handle_sighup_reload(&reload_runtime);
1477                    continue; // stay alive; SIGHUP isn't a shutdown
1478                }
1479
1480                tracing::info!(
1481                    signal = signal_name,
1482                    "lifecycle signal received; shutting down"
1483                );
1484                match runtime.graceful_shutdown(backup_on_shutdown) {
1485                    Ok(report) => {
1486                        tracing::info!(
1487                            duration_ms = report.duration_ms,
1488                            flushed_wal = report.flushed_wal,
1489                            final_checkpoint = report.final_checkpoint,
1490                            backup_uploaded = report.backup_uploaded,
1491                            "graceful shutdown complete"
1492                        );
1493                    }
1494                    Err(err) => {
1495                        tracing::error!(error = %err, "graceful shutdown failed");
1496                        // Issue #205 — graceful shutdown returning Err
1497                        // means the runtime is exiting without a clean
1498                        // flush/checkpoint. Operator-grade event so the
1499                        // operator notices the dirty exit even when the
1500                        // process restarts before they read tracing logs.
1501                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
1502                            reason: format!("graceful shutdown failed: {err}"),
1503                        }
1504                        .emit_global();
1505                    }
1506                }
1507                std::process::exit(0);
1508            }
1509        });
1510    }
1511
1512    #[cfg(not(unix))]
1513    {
1514        tokio::spawn(async move {
1515            let interrupted = tokio::signal::ctrl_c().await;
1516            if let Err(err) = interrupted {
1517                tracing::warn!(error = %err, "could not install Ctrl+C handler");
1518                return;
1519            }
1520
1521            tracing::info!(
1522                signal = "Ctrl+C",
1523                "lifecycle signal received; shutting down"
1524            );
1525            match runtime.graceful_shutdown(backup_on_shutdown) {
1526                Ok(report) => {
1527                    tracing::info!(
1528                        duration_ms = report.duration_ms,
1529                        flushed_wal = report.flushed_wal,
1530                        final_checkpoint = report.final_checkpoint,
1531                        backup_uploaded = report.backup_uploaded,
1532                        "graceful shutdown complete"
1533                    );
1534                }
1535                Err(err) => {
1536                    tracing::error!(error = %err, "graceful shutdown failed");
1537                }
1538            }
1539            std::process::exit(0);
1540        });
1541    }
1542}
1543
1544/// PLAN.md Phase 6.4 — re-read secrets from `*_FILE` companion env
1545/// vars. Today this only refreshes the audit log + records the
1546/// reload event; the runtime modules that hold cached secret
1547/// material (S3 backend credentials, admin token, JWT keys) read
1548/// the env on each request so the next call after SIGHUP picks up
1549/// the new file contents automatically. A future extension can
1550/// punch through to the LeaseStore / AuthStore for in-memory
1551/// caches that don't re-read on each call.
1552fn handle_sighup_reload(runtime: &RedDBRuntime) {
1553    let now_ms = std::time::SystemTime::now()
1554        .duration_since(std::time::UNIX_EPOCH)
1555        .map(|d| d.as_millis() as u64)
1556        .unwrap_or(0);
1557    tracing::info!(
1558        target: "reddb::secrets",
1559        ts_unix_ms = now_ms,
1560        "SIGHUP received; secrets will be re-read from *_FILE on next access"
1561    );
1562    // Routed through AuditFieldEscaper (ADR 0010 / issue #177) so
1563    // every emission goes through the typed-field guard. The
1564    // arguments here are static, but using the typed entry point
1565    // keeps the discipline uniform across call sites.
1566    use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1567    runtime.audit_log().record_event(
1568        AuditEvent::builder("config/sighup_reload")
1569            .source(AuditAuthSource::System)
1570            .resource("secrets")
1571            .outcome(Outcome::Success)
1572            .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1573            .build(),
1574    );
1575}
1576
1577#[inline(never)]
1578fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
1579    let workers = config.workers;
1580    let db_options = config.to_db_options()?;
1581    let rt_config = detect_runtime_config();
1582    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1583    let (runtime, auth_store, _telemetry_guard) =
1584        build_runtime_and_auth_store(&config, db_options.clone())?;
1585    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1586
1587    spawn_admin_metrics_listeners(&runtime, &auth_store);
1588
1589    // Issue #933: collapse the loopback proxy. All three transports are
1590    // served from one in-process acceptor that shares the single tokio
1591    // runtime (ADR 0035) — no internal HTTP/gRPC/wire listeners, no
1592    // `copy_bidirectional` hop. We build the handler objects here and hand
1593    // them to the demux, which classifies each connection and dispatches.
1594    let http_server = build_http_server(
1595        runtime.clone(),
1596        auth_store.clone(),
1597        router_bind_addr.clone(),
1598    );
1599    let http_server = apply_http_limits(http_server, &config, &runtime);
1600    let http_server = apply_ui_bundle(http_server, &config)?;
1601
1602    let grpc_server = RedDBGrpcServer::with_options(
1603        runtime.clone(),
1604        GrpcServerOptions {
1605            bind_addr: router_bind_addr.clone(),
1606            tls: None,
1607        },
1608        auth_store,
1609    );
1610
1611    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1612        .enable_all()
1613        .worker_threads(worker_threads)
1614        .thread_stack_size(rt_config.stack_size)
1615        .build()
1616        .map_err(|err| format!("tokio runtime: {err}"))?;
1617
1618    let signal_runtime = runtime.clone();
1619    let wire_runtime = Arc::new(runtime);
1620    tokio_runtime.block_on(async move {
1621        spawn_lifecycle_signal_handler(signal_runtime).await;
1622        tracing::info!(
1623            bind = %router_bind_addr,
1624            cpus = rt_config.available_cpus,
1625            workers = worker_threads,
1626            "router bootstrapping"
1627        );
1628        serve_tcp_router(InProcessRouterConfig {
1629            bind_addr: router_bind_addr,
1630            http_server,
1631            grpc_server,
1632            wire_runtime,
1633        })
1634        .await
1635        .map_err(|err| err.to_string())
1636    })
1637}
1638
1639/// Spawn RedWire listeners (plaintext + TLS) as background tokio tasks.
1640async fn spawn_wire_listeners(
1641    config: &ServerCommandConfig,
1642    runtime: &RedDBRuntime,
1643    readiness: &mut TransportReadiness,
1644) -> Result<(), String> {
1645    // Plaintext RedWire — TCP or Unix socket
1646    if let Some(wire_addr) = config.wire_bind_addr.clone() {
1647        let wire_rt = Arc::new(runtime.clone());
1648        // Address starting with `unix://` or an absolute filesystem path
1649        // switches to Unix domain sockets.
1650        #[cfg(unix)]
1651        {
1652            if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1653                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1654                tokio::spawn(async move {
1655                    if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1656                        &wire_addr, wire_rt,
1657                    )
1658                    .await
1659                    {
1660                        tracing::error!(err = %e, "redwire unix listener error");
1661                    }
1662                });
1663                return Ok(());
1664            }
1665        }
1666        match tokio::net::TcpListener::bind(&wire_addr).await {
1667            Ok(listener) => {
1668                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1669                tokio::spawn(async move {
1670                    if let Err(e) =
1671                        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1672                            .await
1673                    {
1674                        tracing::error!(err = %e, "redwire listener error");
1675                    }
1676                });
1677            }
1678            Err(err) => {
1679                let reason = format!("wire listener bind {wire_addr}: {err}");
1680                readiness.failed(
1681                    "wire",
1682                    &wire_addr,
1683                    config.wire_bind_explicit,
1684                    reason.clone(),
1685                );
1686                if config.wire_bind_explicit {
1687                    tracing::error!(
1688                        transport = "wire",
1689                        bind = %wire_addr,
1690                        error = %err,
1691                        "fatal explicit bind failure"
1692                    );
1693                    return Err(format!("explicit {reason}"));
1694                }
1695                tracing::warn!(
1696                    transport = "wire",
1697                    bind = %wire_addr,
1698                    error = %err,
1699                    "non-fatal implicit bind failure; listener degraded"
1700                );
1701            }
1702        }
1703    }
1704
1705    // RedWire over TLS
1706    if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1707        let tls_config = resolve_wire_tls_config(config);
1708        match tls_config {
1709            Ok(tls_cfg) => {
1710                let wire_rt = Arc::new(runtime.clone());
1711                tokio::spawn(async move {
1712                    if let Err(e) =
1713                        crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1714                            .await
1715                    {
1716                        tracing::error!(err = %e, "redwire+tls listener error");
1717                    }
1718                });
1719            }
1720            Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1721        }
1722    }
1723    Ok(())
1724}
1725
1726/// Spawn the PostgreSQL wire-protocol listener (Phase 3.1 PG parity).
1727///
1728/// Only runs when `--pg-bind` is supplied. Uses the v3 protocol so
1729/// psql, JDBC drivers, DBeaver, etc. can connect directly. Runs
1730/// alongside the native wire listener; the two transports do not
1731/// share a port.
1732fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1733    if let Some(pg_addr) = config.pg_bind_addr.clone() {
1734        let rt = Arc::new(runtime.clone());
1735        tokio::spawn(async move {
1736            let cfg = crate::wire::PgWireConfig {
1737                bind_addr: pg_addr,
1738                ..Default::default()
1739            };
1740            if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1741                tracing::error!(err = %e, "pg wire listener error");
1742            }
1743        });
1744    }
1745}
1746
1747/// Resolve gRPC TLS material into PEM bytes.
1748///
1749/// Lookup order, in priority:
1750///   1. Explicit `config.grpc_tls_cert` / `config.grpc_tls_key` (paths
1751///      passed via CLI/env). Both must be present together.
1752///   2. `RED_GRPC_TLS_DEV=1` — auto-generate a self-signed cert next
1753///      to the data dir. Refuses to run without the env flag so an
1754///      operator can't accidentally ship a dev cert in prod.
1755///
1756/// `client_ca` is loaded when `config.grpc_tls_client_ca` is set,
1757/// turning the listener into a mutual-TLS endpoint that requires
1758/// every client to present a chain that anchors at one of the CAs
1759/// in the bundle.
1760fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1761    use crate::utils::secret_file::expand_file_env;
1762
1763    // Best-effort *_FILE expansion for every TLS env knob. Errors here
1764    // surface as warnings; the fallback path (explicit cert paths) will
1765    // cover the common case.
1766    for var in [
1767        "REDDB_GRPC_TLS_CERT",
1768        "REDDB_GRPC_TLS_KEY",
1769        "REDDB_GRPC_TLS_CLIENT_CA",
1770    ] {
1771        if let Err(err) = expand_file_env(var) {
1772            tracing::warn!(
1773                target: "reddb::secrets",
1774                env = %var,
1775                err = %err,
1776                "could not expand *_FILE companion for gRPC TLS"
1777            );
1778        }
1779    }
1780
1781    let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1782        (Some(cert), Some(key)) => {
1783            let cert_pem = std::fs::read(cert)
1784                .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1785            let key_pem =
1786                std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1787            (cert_pem, key_pem)
1788        }
1789        _ => {
1790            // No explicit material → only proceed when dev-mode is on.
1791            let dev = std::env::var("RED_GRPC_TLS_DEV")
1792                .ok()
1793                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1794                .unwrap_or(false);
1795            if !dev {
1796                return Err("gRPC TLS configured but no cert/key supplied — set \
1797                     REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1798                     RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1799                    .to_string());
1800            }
1801            let dir = config
1802                .path
1803                .as_ref()
1804                .and_then(|p| p.parent())
1805                .map(PathBuf::from)
1806                .unwrap_or_else(|| PathBuf::from("."));
1807            let (cert_pem_str, key_pem_str) =
1808                crate::wire::tls::generate_self_signed_cert("localhost")
1809                    .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1810
1811            // Constant-time-friendly fingerprint to stderr so the
1812            // operator can pin a client trust store. We log via
1813            // `tracing::warn!` so it stands out next to ordinary
1814            // listener-online events.
1815            let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1816            tracing::warn!(
1817                target: "reddb::security",
1818                transport = "grpc",
1819                cert_sha256 = %fp,
1820                "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1821                 DO NOT use in production"
1822            );
1823            // Persist so that restarts reuse the same identity.
1824            let cert_path = dir.join("grpc-tls-cert.pem");
1825            let key_path = dir.join("grpc-tls-key.pem");
1826            if !cert_path.exists() || !key_path.exists() {
1827                let _ = std::fs::create_dir_all(&dir);
1828                std::fs::write(&cert_path, cert_pem_str.as_bytes())
1829                    .map_err(|e| format!("write grpc dev cert: {e}"))?;
1830                std::fs::write(&key_path, key_pem_str.as_bytes())
1831                    .map_err(|e| format!("write grpc dev key: {e}"))?;
1832                #[cfg(unix)]
1833                {
1834                    use std::os::unix::fs::PermissionsExt;
1835                    let _ =
1836                        std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1837                }
1838            }
1839            (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1840        }
1841    };
1842
1843    let client_ca_pem = match &config.grpc_tls_client_ca {
1844        Some(path) => Some(
1845            std::fs::read(path)
1846                .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1847        ),
1848        None => None,
1849    };
1850
1851    Ok(crate::GrpcTlsOptions {
1852        cert_pem,
1853        key_pem,
1854        client_ca_pem,
1855    })
1856}
1857
1858/// Spawn a TLS-terminated gRPC listener when `grpc_tls_bind_addr` is
1859/// configured. Logs and continues on TLS-config errors so the plain
1860/// listener stays up; this matches the wire-listener pattern.
1861fn spawn_grpc_tls_listener_if_configured(
1862    config: &ServerCommandConfig,
1863    runtime: RedDBRuntime,
1864    auth_store: Arc<AuthStore>,
1865) {
1866    let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1867        return;
1868    };
1869    let tls_opts = match resolve_grpc_tls_options(config) {
1870        Ok(opts) => opts,
1871        Err(err) => {
1872            tracing::error!(
1873                target: "reddb::security",
1874                transport = "grpc",
1875                err = %err,
1876                "gRPC TLS config error; TLS listener will not start"
1877            );
1878            return;
1879        }
1880    };
1881    tokio::spawn(async move {
1882        let server = RedDBGrpcServer::with_options(
1883            runtime,
1884            GrpcServerOptions {
1885                bind_addr: tls_bind.clone(),
1886                tls: Some(tls_opts),
1887            },
1888            auth_store,
1889        );
1890        tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1891        if let Err(err) = server.serve().await {
1892            tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1893        }
1894    });
1895}
1896
1897/// Hex-encoded SHA-256 of a PEM blob, used for cert-pin operator log
1898/// lines. Constant-time hash; no token contents leave this fn.
1899fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1900    use sha2::{Digest, Sha256};
1901    let mut h = Sha256::new();
1902    h.update(pem);
1903    let d = h.finalize();
1904    let mut buf = String::with_capacity(64);
1905    for b in d.iter() {
1906        buf.push_str(&format!("{b:02x}"));
1907    }
1908    buf
1909}
1910
1911/// Resolve TLS config: use provided cert/key or auto-generate.
1912fn resolve_wire_tls_config(
1913    config: &ServerCommandConfig,
1914) -> Result<crate::wire::WireTlsConfig, String> {
1915    match (&config.wire_tls_cert, &config.wire_tls_key) {
1916        (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1917            cert_path: cert.clone(),
1918            key_path: key.clone(),
1919        }),
1920        _ => {
1921            // Auto-generate self-signed cert
1922            let dir = config
1923                .path
1924                .as_ref()
1925                .and_then(|p| p.parent())
1926                .map(PathBuf::from)
1927                .unwrap_or_else(|| PathBuf::from("."));
1928            crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1929        }
1930    }
1931}
1932
1933#[inline(never)]
1934fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1935    let rt_config = detect_runtime_config();
1936    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1937    let db_options = config.to_db_options()?;
1938    let mut transport_readiness = TransportReadiness::default();
1939
1940    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1941        .enable_all()
1942        .worker_threads(workers)
1943        .thread_stack_size(rt_config.stack_size)
1944        .build()
1945        .map_err(|err| format!("tokio runtime: {err}"))?;
1946
1947    // Guard lives on the outer thread's stack so it outlives the
1948    // tokio runtime — dropping it only after the listener returns
1949    // flushes the file log writer.
1950    let (runtime, _auth_store, _telemetry_guard) =
1951        build_runtime_and_auth_store(&config, db_options.clone())?;
1952    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1953    let signal_runtime = runtime.clone();
1954    tokio_runtime.block_on(async move {
1955        spawn_lifecycle_signal_handler(signal_runtime).await;
1956        spawn_pg_listener(&config, &runtime);
1957        let wire_rt = Arc::new(runtime);
1958        let listener = tokio::net::TcpListener::bind(&wire_addr)
1959            .await
1960            .map_err(|err| {
1961                let reason = format!("wire listener bind {wire_addr}: {err}");
1962                transport_readiness.failed(
1963                    "wire",
1964                    &wire_addr,
1965                    config.wire_bind_explicit,
1966                    reason.clone(),
1967                );
1968                if config.wire_bind_explicit {
1969                    format!("explicit {reason}")
1970                } else {
1971                    reason
1972                }
1973            })?;
1974        transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1975        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1976            .await
1977            .map_err(|e| e.to_string())
1978    })
1979}
1980
1981#[inline(never)]
1982fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1983    let rt_config = detect_runtime_config();
1984    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1985    let db_options = config.to_db_options()?;
1986
1987    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1988        .enable_all()
1989        .worker_threads(workers)
1990        .thread_stack_size(rt_config.stack_size)
1991        .build()
1992        .map_err(|err| format!("tokio runtime: {err}"))?;
1993
1994    let (runtime, _auth_store, _telemetry_guard) =
1995        build_runtime_and_auth_store(&config, db_options.clone())?;
1996    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1997    let signal_runtime = runtime.clone();
1998    tokio_runtime.block_on(async move {
1999        spawn_lifecycle_signal_handler(signal_runtime).await;
2000        let cfg = crate::wire::PgWireConfig {
2001            bind_addr: pg_addr,
2002            ..Default::default()
2003        };
2004        crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
2005            .await
2006            .map_err(|e| e.to_string())
2007    })
2008}
2009
2010#[inline(never)]
2011fn build_runtime_and_auth_store(
2012    config: &ServerCommandConfig,
2013    db_options: RedDBOptions,
2014) -> Result<
2015    (
2016        RedDBRuntime,
2017        Arc<AuthStore>,
2018        Option<crate::telemetry::TelemetryGuard>,
2019    ),
2020    String,
2021> {
2022    // Return the TelemetryGuard so server runners can bind it for
2023    // their full lifetime. Dropping the guard tears down the
2024    // non-blocking log writer thread and, because that writer is
2025    // built with `.lossy(true)`, any subsequent log event routed to
2026    // the file sink is silently dropped — so callers MUST keep the
2027    // returned `Option<TelemetryGuard>` alive until shutdown.
2028    build_runtime_with_bootstrap(db_options, config.telemetry.clone(), &config.bootstrap)
2029}
2030
2031/// Open the runtime, initialise structured logging from merged CLI +
2032/// `red_config` settings, and return a guard the caller must keep
2033/// alive for the server lifetime (drop flushes pending log writes).
2034///
2035/// Merge priority: CLI flag (explicit `Some`) beats `red.logging.*`
2036/// in red_config, beats the built-in default. A CLI-flag value of
2037/// `None` / empty means "inherit from config or default" — never
2038/// "disable". The one exception is `--no-log-file` which forces
2039/// `log_dir = None` regardless of config.
2040pub(crate) fn build_runtime_with_telemetry(
2041    db_options: RedDBOptions,
2042    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
2043) -> Result<
2044    (
2045        RedDBRuntime,
2046        Arc<AuthStore>,
2047        Option<crate::telemetry::TelemetryGuard>,
2048    ),
2049    String,
2050> {
2051    let bootstrap = BootstrapConfig::default();
2052    build_runtime_with_bootstrap(db_options, cli_telemetry, &bootstrap)
2053}
2054
2055fn build_runtime_with_bootstrap(
2056    db_options: RedDBOptions,
2057    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
2058    bootstrap: &BootstrapConfig,
2059) -> Result<
2060    (
2061        RedDBRuntime,
2062        Arc<AuthStore>,
2063        Option<crate::telemetry::TelemetryGuard>,
2064    ),
2065    String,
2066> {
2067    let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
2068        // Issue #205 — runtime construction failure is the canonical
2069        // StartupFailed phase. The audit sink isn't installed yet
2070        // (it would have been registered inside `with_options`), so
2071        // the emit falls through to tracing+eprintln only — operator
2072        // still sees it on stderr.
2073        let msg = err.to_string();
2074        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
2075            phase: "runtime_construction".to_string(),
2076            error: msg.clone(),
2077        }
2078        .emit_global();
2079        msg
2080    })?;
2081
2082    // PLAN.md Phase 5 / W6 — opt into serverless writer-lease fencing
2083    // when `RED_LEASE_REQUIRED=true`. Failure here aborts boot: the
2084    // operator asked for a fence; running unfenced would silently
2085    // expose split-brain risk.
2086    crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
2087        let msg = err.to_string();
2088        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
2089            phase: "lease_loop".to_string(),
2090            error: msg.clone(),
2091        }
2092        .emit_global();
2093        msg
2094    })?;
2095
2096    // #213 — edge-triggered disk-space watchdog. Watches the data
2097    // directory; falls back to polling when fanotify is unavailable
2098    // (non-Linux or unprivileged container).
2099    if let Some(data_path) = db_options.data_path.as_deref() {
2100        let watch_dir = data_path.parent().unwrap_or(data_path);
2101        crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
2102    }
2103
2104    // #214 — inotify config hot-reload watcher. Watches the config file
2105    // (REDDB_CONFIG_FILE or /etc/reddb/config.json) for changes and
2106    // applies hot-reloadable keys without restart.
2107    {
2108        let config_path = crate::runtime::config_overlay::config_file_path();
2109        let store = runtime.db().store();
2110        crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
2111    }
2112
2113    // Phase 6 logging: merge red_config overrides onto the CLI-built
2114    // telemetry config, then install the global subscriber.
2115    let merged = merge_telemetry_with_config(
2116        cli_telemetry
2117            .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
2118        &runtime,
2119    );
2120    let telemetry_guard = crate::telemetry::init(merged);
2121
2122    let no_auth = no_auth_active(&db_options);
2123    let auth_store =
2124        if db_options.auth.vault_enabled {
2125            let pager =
2126                runtime.db().store().pager().cloned().ok_or_else(|| {
2127                    "vault requires a paged database (persistent mode)".to_string()
2128                })?;
2129            let store = AuthStore::with_vault(db_options.auth.clone(), pager)
2130                .map_err(|err| err.to_string())?;
2131            Arc::new(store)
2132        } else {
2133            Arc::new(AuthStore::new(db_options.auth.clone()))
2134        };
2135    auth_store.configure_control_events(
2136        runtime.control_event_ledger(),
2137        runtime.control_event_config(),
2138    );
2139    // ADR 0058 — cluster bootstrap authority fail-closed seam. A
2140    // cluster-shaped boot may only run auth bootstrap when a concrete
2141    // reserved-range owner can be proven; none is implemented yet, so the
2142    // seam rejects credentialled cluster boots and allows only the explicit
2143    // `--no-auth` / `--dev` development carveout. Issue #663 — when
2144    // `--no-auth` is active, deliberately skip the preset machinery so a
2145    // stray `REDDB_USERNAME`+`REDDB_PASSWORD` pair cannot silently create an
2146    // admin, defeating the point of anonymous mode.
2147    //
2148    // Issue #1230 — the durable bootstrap completion marker
2149    // (`system.bootstrap.completed`) is the authority path's record that
2150    // first boot already produced global auth state. Observing it makes a
2151    // restart or duplicate bootstrap attempt idempotent: the seam returns
2152    // `AlreadyComplete` and the caller rehydrates read-only state only,
2153    // recreating no users, reissuing no vault certificate, and reapplying no
2154    // mutable config over operator changes.
2155    let bootstrap_already_completed = runtime
2156        .db()
2157        .store()
2158        .get_config(BOOTSTRAP_COMPLETED_KEY)
2159        .is_some();
2160    let disposition = crate::cluster::authorize_cluster_bootstrap(
2161        db_options.storage_profile.deploy_profile,
2162        no_auth,
2163        bootstrap.auth_bootstrap_input(),
2164        bootstrap_already_completed,
2165    )?;
2166    // Issue #1231 — wire cluster vault first boot through the bootstrap
2167    // authority. The authorized disposition selects the vault plan: the owner
2168    // path (`ProceedLocal`) opens the vault against the real cluster-global
2169    // auth store (`auth_store`, backed by the runtime pager) and consumes the
2170    // env/`_FILE` secret inputs to mint the certificate; a restart after
2171    // completion (`AlreadyComplete`) reopens and unseals that same store
2172    // without consuming secret inputs. A non-owner cluster boot already failed
2173    // closed above, so no scratch or per-member vault is minted.
2174    let vault_plan = crate::cluster::plan_vault_bootstrap(disposition);
2175    match disposition {
2176        crate::cluster::BootstrapDisposition::SkipDevBypass => {
2177            debug_assert_eq!(vault_plan, crate::cluster::VaultBootstrapPlan::SkipNoVault);
2178            eprintln!("{NO_AUTH_WARNING}");
2179            tracing::warn!("{NO_AUTH_WARNING}");
2180        }
2181        crate::cluster::BootstrapDisposition::AlreadyComplete => {
2182            // First boot is over. Rehydrate the read-only manifest registry
2183            // so config-derived state is observable, but create no users,
2184            // reissue no vault material, and reapply no mutable config
2185            // (issue #1230). `apply_preset_from_config` does exactly this
2186            // when the completion marker is present — matching the unseal-only
2187            // vault plan (no secret inputs consumed).
2188            debug_assert_eq!(
2189                vault_plan,
2190                crate::cluster::VaultBootstrapPlan::OpenClusterGlobalStore {
2191                    consume_secret_inputs: false,
2192                }
2193            );
2194            apply_preset_from_config(&runtime, &auth_store, bootstrap)?;
2195            tracing::info!("bootstrap already completed, unsealing existing cluster auth store");
2196        }
2197        crate::cluster::BootstrapDisposition::ProceedLocal => {
2198            // Owner first boot: open the vault against the real cluster-global
2199            // auth store and consume the secret inputs to mint the certificate.
2200            debug_assert_eq!(
2201                vault_plan,
2202                crate::cluster::VaultBootstrapPlan::OpenClusterGlobalStore {
2203                    consume_secret_inputs: true,
2204                }
2205            );
2206            apply_preset_from_config(&runtime, &auth_store, bootstrap)?;
2207            maybe_apply_policy_break_glass(&auth_store);
2208        }
2209    }
2210
2211    // Background session purge (every 5 minutes)
2212    {
2213        let store = Arc::clone(&auth_store);
2214        std::thread::Builder::new()
2215            .name("reddb-session-purge".into())
2216            .spawn(move || loop {
2217                std::thread::sleep(std::time::Duration::from_secs(300));
2218                store.purge_expired_sessions();
2219            })
2220            .ok();
2221    }
2222
2223    Ok((runtime, auth_store, telemetry_guard))
2224}
2225
2226/// Honour `REDDB_POLICY_BREAK_GLASS=1` at boot — see issue #713 and
2227/// the [`crate::auth::self_lock_guard`] module. Anything other than
2228/// `1`/`true`/`yes` (case-insensitive) is treated as not set.
2229fn maybe_apply_policy_break_glass(auth_store: &Arc<AuthStore>) {
2230    use crate::auth::self_lock_guard::BREAK_GLASS_ENV;
2231
2232    let enabled = std::env::var(BREAK_GLASS_ENV)
2233        .ok()
2234        .map(|v| {
2235            let trimmed = v.trim().to_ascii_lowercase();
2236            matches!(trimmed.as_str(), "1" | "true" | "yes")
2237        })
2238        .unwrap_or(false);
2239    if !enabled {
2240        return;
2241    }
2242    let now = crate::utils::now_unix_millis() as u128;
2243    match auth_store.apply_policy_break_glass(now) {
2244        Ok(()) => {
2245            tracing::warn!(env = BREAK_GLASS_ENV, "policy break-glass recovery applied");
2246        }
2247        Err(err) => {
2248            tracing::error!(env = BREAK_GLASS_ENV, %err, "policy break-glass recovery failed");
2249        }
2250    }
2251}
2252
2253/// Reserved config keys describing first-boot bootstrap state (issue #650).
2254/// Presence of [`BOOTSTRAP_COMPLETED_KEY`] is the idempotency hinge: when
2255/// it is set, [`apply_preset`] silently no-ops on subsequent boots so a
2256/// container restart with the same env is a no-op.
2257pub(crate) const BOOTSTRAP_COMPLETED_KEY: &str = "system.bootstrap.completed";
2258pub(crate) const BOOTSTRAP_PRESET_KEY: &str = "system.bootstrap.preset";
2259pub(crate) const BOOTSTRAP_FIRST_ADMIN_KEY: &str = "system.bootstrap.first_admin_id";
2260
2261/// Env vars selecting the bootstrap preset. `REDDB_BOOTSTRAP_PRESET`
2262/// is canonical; `REDDB_PRESET` remains accepted for compatibility.
2263pub(crate) const BOOTSTRAP_PRESET_ENV: &str = "REDDB_BOOTSTRAP_PRESET";
2264pub(crate) const PRESET_ENV: &str = "REDDB_PRESET";
2265pub(crate) const PRESET_SIMPLE: &str = "simple";
2266pub(crate) const PRESET_PRODUCTION: &str = "production";
2267pub(crate) const PRESET_REGULATED: &str = "regulated";
2268pub(crate) const PRESET_CLOUD: &str = "cloud";
2269
2270/// Policy id installed by the `production` preset and attached to the
2271/// first admin. Grants `"*"` on `"*"` so the admin has policy-derived
2272/// broad authority (acceptance #3) — not an authorization bypass.
2273pub(crate) const FIRST_ADMIN_ALLOW_ALL_POLICY: &str = "system.bootstrap.first-admin-allow-all";
2274pub(crate) const REGULATED_PROTECT_MANAGED_POLICY: &str = "system.regulated.protect-managed";
2275pub(crate) const CLOUD_PROTECT_MANAGED_POLICY: &str = "system.cloud.protect-managed";
2276pub(crate) const CLOUD_CONFIG_NAMESPACE: &str = "red.config.cloud";
2277pub(crate) const REGULATED_AUDIT_CONFIG_NAMESPACE: &str = "red.config.audit";
2278pub(crate) const REGULATED_EVIDENCE_CONFIG_NAMESPACE: &str = "red.config.evidence";
2279pub(crate) const REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE: &str = "red.config.query_audit";
2280
2281/// Apply the bootstrap preset selected by `REDDB_BOOTSTRAP_PRESET` (or
2282/// legacy `REDDB_PRESET`; default `simple`). Idempotent — if
2283/// `system.bootstrap.completed` is already set, this is a one-line
2284/// `tracing::info!` no-op and the server proceeds (issue #650
2285/// acceptance #5).
2286///
2287/// Caller must have already short-circuited the `--no-auth` / `--dev`
2288/// path (issue #663): when that flag is set, the preset must be skipped
2289/// entirely — no admin created, no bootstrap state written.
2290pub(crate) fn apply_preset(
2291    runtime: &RedDBRuntime,
2292    auth_store: &Arc<AuthStore>,
2293) -> Result<(), String> {
2294    let bootstrap = BootstrapConfig::default();
2295    apply_preset_from_config(runtime, auth_store, &bootstrap)
2296}
2297
2298fn apply_preset_from_config(
2299    runtime: &RedDBRuntime,
2300    auth_store: &Arc<AuthStore>,
2301    bootstrap: &BootstrapConfig,
2302) -> Result<(), String> {
2303    let store = runtime.db().store();
2304
2305    if store.get_config(BOOTSTRAP_COMPLETED_KEY).is_some() {
2306        crate::cli::bootstrap_manifest::rehydrate_manifest_registry(
2307            runtime,
2308            &runtime.config_registry(),
2309        )?;
2310        tracing::info!("bootstrap state present, skipping preset application");
2311        return Ok(());
2312    }
2313
2314    let preset = bootstrap.resolved_preset();
2315
2316    // `_FILE` companion expansion for k8s secret mounts. Only expand
2317    // vars that this selected preset may read from env; explicit CLI
2318    // credentials have already won and should not be blocked by an
2319    // unrelated env-file misconfiguration.
2320    for var in bootstrap.secret_env_vars_to_expand(&preset) {
2321        crate::utils::expand_file_env(var).map_err(|err| format!("expand {var}_FILE: {err}"))?;
2322    }
2323
2324    if let Some(path) = bootstrap.selected_manifest() {
2325        let first_admin_id = crate::cli::bootstrap_manifest::apply_manifest_file(
2326            runtime,
2327            auth_store,
2328            &runtime.config_registry(),
2329            &path,
2330        )?;
2331        persist_bootstrap_state(runtime, "manifest", Some(&first_admin_id));
2332        tracing::info!("bootstrap manifest applied");
2333        return Ok(());
2334    }
2335
2336    let first_admin_id = match preset.as_str() {
2337        PRESET_SIMPLE => {
2338            // `simple` is the default low-friction preset. Auth knobs
2339            // remain whatever the CLI/env set; we only persist the
2340            // bootstrap state so subsequent boots are idempotent.
2341            None
2342        }
2343        PRESET_PRODUCTION => Some(apply_production_preset_from_config(auth_store, bootstrap)?),
2344        PRESET_CLOUD => Some(apply_cloud_preset(runtime, auth_store, bootstrap)?),
2345        PRESET_REGULATED => {
2346            apply_regulated_preset(runtime, auth_store)?;
2347            None
2348        }
2349        other => {
2350            return Err(format!(
2351                "bootstrap preset {other:?} is not recognised (expected `simple`, `production`, `regulated`, or `cloud`)"
2352            ));
2353        }
2354    };
2355
2356    persist_bootstrap_state(runtime, &preset, first_admin_id.as_deref());
2357    tracing::info!(preset = %preset, "bootstrap preset applied");
2358    Ok(())
2359}
2360
2361fn apply_production_preset_from_config(
2362    auth_store: &Arc<AuthStore>,
2363    bootstrap: &BootstrapConfig,
2364) -> Result<String, String> {
2365    use crate::auth::store::PrincipalRef;
2366    use crate::auth::UserId;
2367
2368    let username = bootstrap.production_username().ok_or_else(|| {
2369        "production preset requires --bootstrap-admin or REDDB_USERNAME (or REDDB_USERNAME_FILE)"
2370            .to_string()
2371    })?;
2372    let password = bootstrap.production_password().ok_or_else(|| {
2373        "production preset requires --bootstrap-admin-password or REDDB_PASSWORD (or REDDB_PASSWORD_FILE)"
2374            .to_string()
2375    })?;
2376
2377    // (1) Create the first admin as an ordinary platform-scoped user.
2378    // Managed guardrails are policy-derived; bootstrap does not stamp a
2379    // parallel ownership flag.
2380    let result = auth_store
2381        .bootstrap(&username, &password)
2382        .map_err(|err| format!("bootstrap first admin: {err}"))?;
2383    if let Some(cert) = result.certificate.as_deref() {
2384        eprintln!("[reddb] CERTIFICATE: {}", cert);
2385        tracing::warn!(
2386            "vault certificate issued by REDDB_PRESET=production -- save it and update the runtime secret before restart"
2387        );
2388    }
2389    let first_admin = UserId::platform(result.user.username.clone());
2390
2391    // (2) Install the allow-all policy as an ordinary policy row.
2392    install_allow_all_policy(auth_store)?;
2393
2394    // (3) Attach the policy to the first admin.
2395    auth_store
2396        .attach_policy(
2397            PrincipalRef::User(first_admin.clone()),
2398            FIRST_ADMIN_ALLOW_ALL_POLICY,
2399        )
2400        .map_err(|err| format!("attach allow-all policy: {err}"))?;
2401
2402    Ok(first_admin.to_string())
2403}
2404
2405fn apply_cloud_preset(
2406    runtime: &RedDBRuntime,
2407    auth_store: &Arc<AuthStore>,
2408    bootstrap: &BootstrapConfig,
2409) -> Result<String, String> {
2410    use crate::auth::store::PrincipalRef;
2411    use crate::auth::{Role, UserId};
2412
2413    let head_username = bootstrap.cloud_head_username().ok_or_else(|| {
2414        "cloud preset requires --cloud-head-admin or REDDB_CLOUD_HEAD_ADMIN (or REDDB_USERNAME)"
2415            .to_string()
2416    })?;
2417    let head_password = bootstrap.cloud_head_password().ok_or_else(|| {
2418        "cloud preset requires --cloud-head-admin-password or REDDB_CLOUD_HEAD_ADMIN_PASSWORD (or REDDB_PASSWORD)"
2419            .to_string()
2420    })?;
2421    let customer_username = bootstrap.customer_username().ok_or_else(|| {
2422        "cloud preset requires --customer-admin or REDDB_CUSTOMER_ADMIN".to_string()
2423    })?;
2424    let customer_password = bootstrap.customer_password().ok_or_else(|| {
2425        "cloud preset requires --customer-admin-password or REDDB_CUSTOMER_ADMIN_PASSWORD"
2426            .to_string()
2427    })?;
2428    if head_username == customer_username {
2429        return Err("cloud preset requires distinct head and customer admin usernames".to_string());
2430    }
2431
2432    let head = auth_store
2433        .bootstrap_system_admin(&head_username, &head_password)
2434        .map_err(|err| format!("bootstrap cloud head admin: {err}"))?;
2435    let head_id = UserId::platform(head.user.username.clone());
2436
2437    auth_store
2438        .create_user(&customer_username, &customer_password, Role::Admin)
2439        .map_err(|err| format!("create cloud customer admin: {err}"))?;
2440    let customer_id = UserId::platform(customer_username.clone());
2441
2442    install_allow_all_policy(auth_store)?;
2443    for user_id in [&head_id, &customer_id] {
2444        auth_store
2445            .attach_policy(
2446                PrincipalRef::User(user_id.clone()),
2447                FIRST_ADMIN_ALLOW_ALL_POLICY,
2448            )
2449            .map_err(|err| format!("attach allow-all policy: {err}"))?;
2450    }
2451    install_cloud_guardrails(runtime, auth_store)?;
2452    auth_store
2453        .attach_policy(
2454            PrincipalRef::User(customer_id),
2455            CLOUD_PROTECT_MANAGED_POLICY,
2456        )
2457        .map_err(|err| format!("attach cloud guardrail policy: {err}"))?;
2458
2459    Ok(head_id.to_string())
2460}
2461
2462fn install_allow_all_policy(auth_store: &Arc<AuthStore>) -> Result<(), String> {
2463    use crate::auth::policies::Policy;
2464
2465    let policy = Policy::from_json_str(&format!(
2466        r#"{{
2467            "id": "{id}",
2468            "version": 1,
2469            "statements": [{{
2470                "effect": "allow",
2471                "actions": ["*"],
2472                "resources": ["*"]
2473            }}]
2474        }}"#,
2475        id = FIRST_ADMIN_ALLOW_ALL_POLICY
2476    ))
2477    .map_err(|err| format!("compile allow-all policy: {err}"))?;
2478    auth_store
2479        .put_policy(policy)
2480        .map_err(|err| format!("install allow-all policy: {err}"))
2481}
2482
2483fn install_cloud_guardrails(
2484    runtime: &RedDBRuntime,
2485    auth_store: &Arc<AuthStore>,
2486) -> Result<(), String> {
2487    use crate::auth::policies::Policy;
2488    use crate::auth::registry::EvidenceRequirement;
2489
2490    let policy = Policy::from_json_str(&format!(
2491        r#"{{
2492            "id": "{id}",
2493            "version": 1,
2494            "statements": [
2495                {{
2496                    "effect": "deny",
2497                    "actions": ["policy:put", "policy:drop", "policy:attach", "policy:detach"],
2498                    "resources": ["policy:{id}"]
2499                }},
2500                {{
2501                    "effect": "deny",
2502                    "actions": ["config:write"],
2503                    "resources": ["config:{cloud}.*"]
2504                }}
2505            ]
2506        }}"#,
2507        id = CLOUD_PROTECT_MANAGED_POLICY,
2508        cloud = CLOUD_CONFIG_NAMESPACE,
2509    ))
2510    .map_err(|err| format!("compile cloud guardrail policy: {err}"))?;
2511    auth_store
2512        .put_policy(policy)
2513        .map_err(|err| format!("install cloud guardrail policy: {err}"))?;
2514
2515    let now_ms = crate::utils::now_unix_millis() as u128;
2516    let entries = vec![
2517        regulated_registry_entry(
2518            CLOUD_PROTECT_MANAGED_POLICY,
2519            crate::auth::managed_policy::RESOURCE_TYPE_POLICY,
2520            "policy",
2521            "policy:*",
2522            &format!("policy:{CLOUD_PROTECT_MANAGED_POLICY}"),
2523            EvidenceRequirement::Metadata,
2524            now_ms,
2525        ),
2526        regulated_registry_entry(
2527            CLOUD_CONFIG_NAMESPACE,
2528            crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2529            "config_namespace",
2530            "config:write",
2531            &format!("config:{CLOUD_CONFIG_NAMESPACE}.*"),
2532            EvidenceRequirement::Metadata,
2533            now_ms,
2534        ),
2535    ];
2536    for entry in entries.iter().cloned() {
2537        runtime
2538            .config_registry()
2539            .restore_bootstrap_entry(entry)
2540            .map_err(|err| format!("install cloud registry entry: {err}"))?;
2541    }
2542    crate::cli::bootstrap_manifest::persist_registry_state(runtime, &entries)?;
2543    Ok(())
2544}
2545
2546fn apply_regulated_preset(
2547    runtime: &RedDBRuntime,
2548    auth_store: &Arc<AuthStore>,
2549) -> Result<(), String> {
2550    use crate::auth::policies::Policy;
2551    use crate::auth::registry::EvidenceRequirement;
2552    use crate::auth::store::{PrincipalRef, PUBLIC_IAM_GROUP};
2553
2554    runtime.query_audit().enable_infrastructure();
2555
2556    let policy = Policy::from_json_str(&format!(
2557        r#"{{
2558            "id": "{id}",
2559            "version": 1,
2560            "statements": [
2561                {{
2562                    "effect": "deny",
2563                    "actions": ["policy:put", "policy:drop", "policy:attach", "policy:detach"],
2564                    "resources": ["policy:{id}"]
2565                }},
2566                {{
2567                    "effect": "deny",
2568                    "actions": ["config:write"],
2569                    "resources": [
2570                        "config:{audit}.*",
2571                        "config:{evidence}.*",
2572                        "config:{query_audit}.*"
2573                    ]
2574                }}
2575            ]
2576        }}"#,
2577        id = REGULATED_PROTECT_MANAGED_POLICY,
2578        audit = REGULATED_AUDIT_CONFIG_NAMESPACE,
2579        evidence = REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2580        query_audit = REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2581    ))
2582    .map_err(|err| format!("compile regulated guardrail policy: {err}"))?;
2583    auth_store
2584        .put_policy(policy)
2585        .map_err(|err| format!("install regulated guardrail policy: {err}"))?;
2586    auth_store
2587        .attach_policy(
2588            PrincipalRef::Group(PUBLIC_IAM_GROUP.to_string()),
2589            REGULATED_PROTECT_MANAGED_POLICY,
2590        )
2591        .map_err(|err| format!("attach regulated guardrail policy: {err}"))?;
2592
2593    let now_ms = crate::utils::now_unix_millis() as u128;
2594    let entries = vec![
2595        regulated_registry_entry(
2596            REGULATED_PROTECT_MANAGED_POLICY,
2597            crate::auth::managed_policy::RESOURCE_TYPE_POLICY,
2598            "iam_policy",
2599            "policy:*",
2600            &format!("policy:{REGULATED_PROTECT_MANAGED_POLICY}"),
2601            EvidenceRequirement::Metadata,
2602            now_ms,
2603        ),
2604        regulated_registry_entry(
2605            REGULATED_AUDIT_CONFIG_NAMESPACE,
2606            crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2607            "config_namespace",
2608            "config:write",
2609            &format!("config:{REGULATED_AUDIT_CONFIG_NAMESPACE}.*"),
2610            EvidenceRequirement::Metadata,
2611            now_ms,
2612        ),
2613        regulated_registry_entry(
2614            REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2615            crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2616            "config_namespace",
2617            "config:write",
2618            &format!("config:{REGULATED_EVIDENCE_CONFIG_NAMESPACE}.*"),
2619            EvidenceRequirement::Metadata,
2620            now_ms,
2621        ),
2622        regulated_registry_entry(
2623            REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2624            crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2625            "config_namespace",
2626            "config:write",
2627            &format!("config:{REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE}.*"),
2628            EvidenceRequirement::Metadata,
2629            now_ms,
2630        ),
2631    ];
2632
2633    for entry in entries.iter().cloned() {
2634        runtime
2635            .config_registry()
2636            .restore_bootstrap_entry(entry)
2637            .map_err(|err| format!("install regulated registry entry: {err}"))?;
2638    }
2639    crate::cli::bootstrap_manifest::persist_registry_state(runtime, &entries)?;
2640    Ok(())
2641}
2642
2643fn regulated_registry_entry(
2644    id: &str,
2645    resource_type: &str,
2646    schema: &str,
2647    required_action: &str,
2648    required_resource: &str,
2649    evidence_requirement: crate::auth::registry::EvidenceRequirement,
2650    updated_at_ms: u128,
2651) -> crate::auth::registry::ConfigRegistryEntry {
2652    crate::auth::registry::ConfigRegistryEntry {
2653        id: id.to_string(),
2654        version: 1,
2655        resource_type: resource_type.to_string(),
2656        schema: schema.to_string(),
2657        mutability: crate::auth::registry::Mutability::Immutable,
2658        sensitivity: crate::auth::registry::Sensitivity::Internal,
2659        managed: true,
2660        required_action: required_action.to_string(),
2661        required_resource: required_resource.to_string(),
2662        evidence_requirement,
2663        updated_by: "system:regulated-preset".to_string(),
2664        updated_at_ms,
2665    }
2666}
2667
2668fn persist_bootstrap_state(runtime: &RedDBRuntime, preset: &str, first_admin_id: Option<&str>) {
2669    let store = runtime.db().store();
2670    let mut tree = crate::serde_json::Map::new();
2671    tree.insert(
2672        BOOTSTRAP_COMPLETED_KEY.to_string(),
2673        crate::serde_json::Value::Bool(true),
2674    );
2675    tree.insert(
2676        BOOTSTRAP_PRESET_KEY.to_string(),
2677        crate::serde_json::Value::String(preset.to_string()),
2678    );
2679    if let Some(id) = first_admin_id {
2680        tree.insert(
2681            BOOTSTRAP_FIRST_ADMIN_KEY.to_string(),
2682            crate::serde_json::Value::String(id.to_string()),
2683        );
2684    }
2685    let json = crate::serde_json::Value::Object(tree);
2686    store.set_config_tree("", &json);
2687}
2688
2689/// Read `red.logging.*` keys from the persistent config store and
2690/// merge them into the CLI-built `TelemetryConfig`. Merge priority:
2691/// explicit CLI flag > red_config > built-in default.
2692///
2693/// The "was a flag passed" signal comes from the `*_explicit` bools
2694/// on `TelemetryConfig`, populated by the CLI parser. This replaces
2695/// an earlier equality-to-default heuristic that silently dropped
2696/// config whenever the CLI-derived default diverged from
2697/// `TelemetryConfig::default()` (e.g. path-derived `log_dir`,
2698/// non-TTY `format`) and that silently overrode `--no-log-file`.
2699fn merge_telemetry_with_config(
2700    mut cli: crate::telemetry::TelemetryConfig,
2701    runtime: &RedDBRuntime,
2702) -> crate::telemetry::TelemetryConfig {
2703    use crate::storage::schema::Value;
2704
2705    let store = runtime.db().store();
2706
2707    if !cli.level_explicit {
2708        if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
2709            cli.level_filter = v.to_string();
2710        }
2711    }
2712    if !cli.format_explicit {
2713        if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
2714            if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
2715                cli.format = parsed;
2716            }
2717        }
2718    }
2719    if !cli.rotation_keep_days_explicit {
2720        match store.get_config("red.logging.keep_days") {
2721            Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
2722                cli.rotation_keep_days = n as u16
2723            }
2724            Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
2725                cli.rotation_keep_days = n as u16
2726            }
2727            Some(Value::Text(v)) => {
2728                if let Ok(n) = v.parse::<u16>() {
2729                    cli.rotation_keep_days = n;
2730                }
2731            }
2732            _ => {}
2733        }
2734    }
2735    if !cli.file_prefix_explicit {
2736        if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
2737            if !v.is_empty() {
2738                cli.file_prefix = v.to_string();
2739            }
2740        }
2741    }
2742    // --no-log-file is a kill-switch: config cannot resurrect the
2743    // file sink. Explicit --log-dir also wins.
2744    if !cli.log_dir_explicit && !cli.log_file_disabled {
2745        if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
2746            if !v.is_empty() {
2747                cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
2748            }
2749        }
2750    }
2751
2752    cli
2753}
2754
2755#[cfg(test)]
2756mod telemetry_merge_tests {
2757    use super::*;
2758    use crate::telemetry::{LogFormat, TelemetryConfig};
2759
2760    fn fresh_runtime() -> RedDBRuntime {
2761        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
2762    }
2763
2764    fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
2765        runtime
2766            .db()
2767            .store()
2768            .set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
2769    }
2770
2771    fn cli_base() -> TelemetryConfig {
2772        // Emulate default_telemetry_for_path(Some(path)) on a non-TTY host:
2773        // log_dir = Some(...), format = Json. Nothing marked explicit.
2774        TelemetryConfig {
2775            log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
2776            format: LogFormat::Json,
2777            ..Default::default()
2778        }
2779    }
2780
2781    #[test]
2782    fn config_log_dir_promoted_when_flag_absent() {
2783        let runtime = fresh_runtime();
2784        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
2785        let merged = merge_telemetry_with_config(cli_base(), &runtime);
2786        assert_eq!(
2787            merged.log_dir.as_deref(),
2788            Some(std::path::Path::new("/var/log/reddb"))
2789        );
2790    }
2791
2792    #[test]
2793    fn explicit_log_dir_wins_over_config() {
2794        let runtime = fresh_runtime();
2795        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
2796        let mut cli = cli_base();
2797        cli.log_dir = Some(std::path::PathBuf::from("/custom/dir"));
2798        cli.log_dir_explicit = true;
2799        let merged = merge_telemetry_with_config(cli, &runtime);
2800        assert_eq!(
2801            merged.log_dir.as_deref(),
2802            Some(std::path::Path::new("/custom/dir"))
2803        );
2804    }
2805
2806    #[test]
2807    fn no_log_file_beats_config_log_dir() {
2808        let runtime = fresh_runtime();
2809        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
2810        let mut cli = cli_base();
2811        cli.log_dir = None;
2812        cli.log_file_disabled = true;
2813        let merged = merge_telemetry_with_config(cli, &runtime);
2814        assert!(
2815            merged.log_dir.is_none(),
2816            "--no-log-file must veto config dir"
2817        );
2818    }
2819
2820    #[test]
2821    fn config_format_promoted_on_non_tty_default() {
2822        // On non-TTY, default_telemetry_for_path yields format=Json even
2823        // though TelemetryConfig::default() is Pretty. The old equality
2824        // check silently dropped config here.
2825        let runtime = fresh_runtime();
2826        set_str(&runtime, "red.logging.format", "pretty");
2827        let merged = merge_telemetry_with_config(cli_base(), &runtime);
2828        assert_eq!(merged.format, LogFormat::Pretty);
2829    }
2830
2831    #[test]
2832    fn explicit_format_wins_over_config() {
2833        let runtime = fresh_runtime();
2834        set_str(&runtime, "red.logging.format", "pretty");
2835        let mut cli = cli_base();
2836        cli.format = LogFormat::Json;
2837        cli.format_explicit = true;
2838        let merged = merge_telemetry_with_config(cli, &runtime);
2839        assert_eq!(merged.format, LogFormat::Json);
2840    }
2841}
2842
2843#[inline(never)]
2844fn build_http_server(
2845    runtime: RedDBRuntime,
2846    auth_store: Arc<AuthStore>,
2847    bind_addr: String,
2848) -> RedDBServer {
2849    build_http_server_with_transport_readiness(
2850        runtime,
2851        auth_store,
2852        bind_addr,
2853        TransportReadiness::default(),
2854    )
2855}
2856
2857/// Apply the resolved HTTP limits to a freshly-built `RedDBServer`.
2858///
2859/// Centralised here so every `run_*` path goes through the same
2860/// resolver and the structured startup log line carries the same
2861/// `http_limits.*` fields regardless of transport combination.
2862fn apply_http_limits(
2863    server: RedDBServer,
2864    config: &ServerCommandConfig,
2865    runtime: &RedDBRuntime,
2866) -> RedDBServer {
2867    let store = runtime.db().store();
2868    let resolved =
2869        crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2870            .get_config(key)
2871        {
2872            Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2873            Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2874            Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2875            _ => None,
2876        });
2877    tracing::info!(
2878        target: "reddb::http_limits",
2879        max_handlers = resolved.max_handlers,
2880        handler_timeout_ms = resolved.handler_timeout_ms,
2881        retry_after_secs = resolved.retry_after_secs,
2882        max_inflight_per_principal = resolved.max_inflight_per_principal,
2883        "http_limits resolved"
2884    );
2885    server.with_http_limits(resolved)
2886}
2887
2888/// `red server --ui` (issue #1047, ADR 0051): attach the resolved red-ui
2889/// bundle directory to an HTTP server so it also serves the bundle as
2890/// static assets. No-op when `--ui` is off.
2891///
2892/// An explicit `--ui-dir <DIR>` is served as-is (no download); otherwise
2893/// the pinned bundle is resolved from the local cache, downloading on
2894/// first use (ADR 0050). A resolution failure is fatal: the operator
2895/// explicitly asked for the UI, so silently serving API-only would be
2896/// surprising and would mask an offline/checksum problem.
2897fn apply_ui_bundle(
2898    server: RedDBServer,
2899    config: &ServerCommandConfig,
2900) -> Result<RedDBServer, String> {
2901    if !config.ui {
2902        return Ok(server);
2903    }
2904    let ui_dir = match &config.ui_dir {
2905        Some(dir) => dir.clone(),
2906        None => {
2907            let cache_root = crate::server::ui_bundle_resolver::reddb_user_cache_root()
2908                .unwrap_or_else(|_| std::env::temp_dir().join("reddb"));
2909            crate::server::ui_bundle_resolver::resolve_ui_bundle(
2910                &cache_root,
2911                &crate::server::ui_bundle_resolver::HttpFetcher,
2912            )
2913            .map_err(|err| format!("resolve red-ui bundle for --ui: {err}"))?
2914        }
2915    };
2916    tracing::info!(target: "reddb::ui", dir = %ui_dir.display(), "serving red-ui bundle on HTTP surface");
2917    Ok(server.with_ui_dir(ui_dir))
2918}
2919
2920#[inline(never)]
2921fn build_http_server_with_transport_readiness(
2922    runtime: RedDBRuntime,
2923    auth_store: Arc<AuthStore>,
2924    bind_addr: String,
2925    transport_readiness: TransportReadiness,
2926) -> RedDBServer {
2927    RedDBServer::with_options(
2928        runtime,
2929        ServerOptions {
2930            bind_addr,
2931            transport_readiness,
2932            ..ServerOptions::default()
2933        },
2934    )
2935    .with_auth(auth_store)
2936}
2937
2938/// PLAN.md Phase 6.2 — build a listener that only serves
2939/// `/admin/*` + `/metrics` + `/health/*`. Defaults to `127.0.0.1`
2940/// when the env var has no host (loopback-only by default per spec).
2941#[inline(never)]
2942fn build_admin_only_server(
2943    runtime: RedDBRuntime,
2944    auth_store: Arc<AuthStore>,
2945    bind_addr: String,
2946) -> RedDBServer {
2947    RedDBServer::with_options(
2948        runtime,
2949        ServerOptions {
2950            bind_addr,
2951            surface: crate::server::ServerSurface::AdminOnly,
2952            ..ServerOptions::default()
2953        },
2954    )
2955    .with_auth(auth_store)
2956}
2957
2958/// PLAN.md Phase 6.2 — build a listener that only serves `/metrics`
2959/// + `/health/*`. Suitable for Prometheus scrape ports that may be
2960///   exposed wider than the admin port.
2961#[inline(never)]
2962fn build_metrics_only_server(
2963    runtime: RedDBRuntime,
2964    auth_store: Arc<AuthStore>,
2965    bind_addr: String,
2966) -> RedDBServer {
2967    RedDBServer::with_options(
2968        runtime,
2969        ServerOptions {
2970            bind_addr,
2971            surface: crate::server::ServerSurface::MetricsOnly,
2972            ..ServerOptions::default()
2973        },
2974    )
2975    .with_auth(auth_store)
2976}
2977
2978/// Spawn dedicated admin / metrics listeners when the operator set
2979/// `RED_ADMIN_BIND` / `RED_METRICS_BIND`. Both are optional; when
2980/// unset the existing listener keeps serving everything (back-compat).
2981fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2982    if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2983        let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2984        let _ = server.serve_in_background();
2985        tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2986    }
2987    if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2988        let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2989        let _ = server.serve_in_background();
2990        tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2991    }
2992}
2993
2994#[inline(never)]
2995fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2996    let mut transport_readiness = TransportReadiness::default();
2997    let Some(listener) = bind_listener_for_startup(
2998        &mut transport_readiness,
2999        "http",
3000        &bind_addr,
3001        config.http_bind_explicit,
3002    )?
3003    else {
3004        return Err(format!(
3005            "no HTTP listener started; implicit bind {} failed",
3006            bind_addr
3007        ));
3008    };
3009    let db_options = config.to_db_options()?;
3010    let (runtime, auth_store, _telemetry_guard) =
3011        build_runtime_and_auth_store(&config, db_options.clone())?;
3012    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
3013    spawn_admin_metrics_listeners(&runtime, &auth_store);
3014    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
3015    let server = build_http_server_with_transport_readiness(
3016        runtime.clone(),
3017        auth_store,
3018        bind_addr.clone(),
3019        transport_readiness,
3020    );
3021    let server = apply_http_limits(server, &config, &runtime);
3022    let server = apply_ui_bundle(server, &config)?;
3023    tracing::info!(transport = "http", bind = %bind_addr, "listener online");
3024    server.serve_on(listener).map_err(|err| err.to_string())
3025}
3026
3027/// PLAN.md HTTP TLS — when `http_tls_bind_addr` is set, spawn a
3028/// rustls-terminated listener alongside the plain HTTP server. Cert
3029/// + key paths come from CLI flags or `REDDB_HTTP_TLS_*` env vars; if
3030///   both are absent and `RED_HTTP_TLS_DEV=1` is set, a self-signed cert
3031///   is auto-generated next to the data directory (refused otherwise).
3032fn spawn_http_tls_listener(
3033    config: &ServerCommandConfig,
3034    runtime: &RedDBRuntime,
3035    auth_store: &Arc<AuthStore>,
3036) -> Result<(), String> {
3037    let Some(addr) = config.http_tls_bind_addr.clone() else {
3038        return Ok(());
3039    };
3040
3041    let tls_config = resolve_http_tls_config(config)?;
3042    let server_config = crate::server::tls::build_server_config(&tls_config)
3043        .map_err(|err| format!("HTTP TLS: {err}"))?;
3044
3045    let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
3046    let server = apply_http_limits(server, config, runtime);
3047    let _handle = server.serve_tls_in_background(server_config);
3048    tracing::info!(
3049        transport = "https",
3050        bind = %addr,
3051        mtls = %tls_config.client_ca_path.is_some(),
3052        "TLS listener online"
3053    );
3054    Ok(())
3055}
3056
3057/// Resolve the HTTP TLS config from CLI / env / dev defaults.
3058fn resolve_http_tls_config(
3059    config: &ServerCommandConfig,
3060) -> Result<crate::server::tls::HttpTlsConfig, String> {
3061    match (&config.http_tls_cert, &config.http_tls_key) {
3062        (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
3063            cert_path: cert.clone(),
3064            key_path: key.clone(),
3065            client_ca_path: config.http_tls_client_ca.clone(),
3066        }),
3067        (None, None) => {
3068            // Dev-mode auto-generate next to the data directory.
3069            let dir = config
3070                .path
3071                .as_ref()
3072                .and_then(|p| p.parent().map(std::path::PathBuf::from))
3073                .unwrap_or_else(|| std::path::PathBuf::from("."));
3074            let auto = crate::server::tls::auto_generate_dev_cert(&dir)
3075                .map_err(|err| format!("HTTP TLS dev: {err}"))?;
3076            Ok(crate::server::tls::HttpTlsConfig {
3077                cert_path: auto.cert_path,
3078                key_path: auto.key_path,
3079                client_ca_path: config.http_tls_client_ca.clone(),
3080            })
3081        }
3082        _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
3083    }
3084}
3085
3086#[inline(never)]
3087fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
3088    let workers = config.workers;
3089    let db_options = config.to_db_options()?;
3090    let rt_config = detect_runtime_config();
3091    let mut transport_readiness = TransportReadiness::default();
3092    let Some(grpc_listener) = bind_listener_for_startup(
3093        &mut transport_readiness,
3094        "grpc",
3095        &bind_addr,
3096        config.grpc_bind_explicit,
3097    )?
3098    else {
3099        return Err(format!(
3100            "no gRPC listener started; implicit bind {} failed",
3101            bind_addr
3102        ));
3103    };
3104
3105    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
3106
3107    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
3108        .enable_all()
3109        .worker_threads(worker_threads)
3110        .thread_stack_size(rt_config.stack_size)
3111        .build()
3112        .map_err(|err| format!("tokio runtime: {err}"))?;
3113
3114    // Guard lives on the outer stack so it outlives the tokio runtime.
3115    let (runtime, auth_store, _telemetry_guard) =
3116        build_runtime_and_auth_store(&config, db_options.clone())?;
3117    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
3118    let signal_runtime = runtime.clone();
3119    tokio_runtime.block_on(async move {
3120        spawn_lifecycle_signal_handler(signal_runtime).await;
3121        // Start wire protocol listeners (plaintext + TLS)
3122        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
3123
3124        // Start PostgreSQL wire listener when --pg-bind is configured.
3125        spawn_pg_listener(&config, &runtime);
3126
3127        // Optional TLS gRPC listener. When `grpc_tls_bind_addr` is set
3128        // it spawns a separate listener so plaintext + TLS can run
3129        // side-by-side (55055 plain + 55555 TLS, etc.).
3130        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
3131
3132        let server = RedDBGrpcServer::with_options(
3133            runtime,
3134            GrpcServerOptions {
3135                bind_addr: bind_addr.clone(),
3136                tls: None,
3137            },
3138            auth_store,
3139        );
3140
3141        tracing::info!(
3142            transport = "grpc",
3143            bind = %bind_addr,
3144            cpus = rt_config.available_cpus,
3145            workers = worker_threads,
3146            "listener online"
3147        );
3148        server
3149            .serve_on(grpc_listener)
3150            .await
3151            .map_err(|err| err.to_string())
3152    })
3153}
3154
3155#[inline(never)]
3156fn run_dual_server(
3157    config: ServerCommandConfig,
3158    grpc_bind_addr: String,
3159    http_bind_addr: String,
3160) -> Result<(), String> {
3161    let workers = config.workers;
3162    let db_options = config.to_db_options()?;
3163    let rt_config = detect_runtime_config();
3164    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
3165    let mut transport_readiness = TransportReadiness::default();
3166    let http_listener = bind_listener_for_startup(
3167        &mut transport_readiness,
3168        "http",
3169        &http_bind_addr,
3170        config.http_bind_explicit,
3171    )?;
3172    let grpc_listener = bind_listener_for_startup(
3173        &mut transport_readiness,
3174        "grpc",
3175        &grpc_bind_addr,
3176        config.grpc_bind_explicit,
3177    )?;
3178    if http_listener.is_none() && grpc_listener.is_none() {
3179        return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
3180    }
3181    let (runtime, auth_store, _telemetry_guard) =
3182        build_runtime_and_auth_store(&config, db_options.clone())?;
3183    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
3184
3185    spawn_admin_metrics_listeners(&runtime, &auth_store);
3186    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
3187
3188    let http_handle = if let Some(listener) = http_listener {
3189        let http_server = build_http_server_with_transport_readiness(
3190            runtime.clone(),
3191            auth_store.clone(),
3192            http_bind_addr.clone(),
3193            transport_readiness.clone(),
3194        );
3195        let http_server = apply_http_limits(http_server, &config, &runtime);
3196        let http_server = apply_ui_bundle(http_server, &config)?;
3197        Some(http_server.serve_in_background_on(listener))
3198    } else {
3199        None
3200    };
3201
3202    thread::sleep(Duration::from_millis(150));
3203    if let Some(handle) = http_handle.as_ref() {
3204        if handle.is_finished() {
3205            let handle = http_handle.unwrap();
3206            return match handle.join() {
3207                Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
3208                Ok(Err(err)) => Err(err.to_string()),
3209                Err(_) => Err("HTTP server thread panicked".to_string()),
3210            };
3211        }
3212    }
3213    if grpc_listener.is_none() {
3214        let Some(handle) = http_handle else {
3215            return Err("no listener started".to_string());
3216        };
3217        return match handle.join() {
3218            Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
3219            Ok(Err(err)) => Err(err.to_string()),
3220            Err(_) => Err("HTTP server thread panicked".to_string()),
3221        };
3222    }
3223    let grpc_listener = grpc_listener.expect("checked above");
3224
3225    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
3226        .enable_all()
3227        .worker_threads(worker_threads)
3228        .thread_stack_size(rt_config.stack_size)
3229        .build()
3230        .map_err(|err| format!("tokio runtime: {err}"))?;
3231
3232    let signal_runtime = runtime.clone();
3233    tokio_runtime.block_on(async move {
3234        spawn_lifecycle_signal_handler(signal_runtime).await;
3235        // Start wire protocol listeners (plaintext + TLS)
3236        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
3237
3238        // Start PostgreSQL wire listener when --pg-bind is configured.
3239        spawn_pg_listener(&config, &runtime);
3240
3241        // Optional TLS gRPC listener — runs alongside the plaintext one.
3242        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
3243
3244        let server = RedDBGrpcServer::with_options(
3245            runtime,
3246            GrpcServerOptions {
3247                bind_addr: grpc_bind_addr.clone(),
3248                tls: None,
3249            },
3250            auth_store,
3251        );
3252
3253        tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
3254        tracing::info!(
3255            transport = "grpc",
3256            bind = %grpc_bind_addr,
3257            cpus = rt_config.available_cpus,
3258            workers = worker_threads,
3259            "listener online"
3260        );
3261        server
3262            .serve_on(grpc_listener)
3263            .await
3264            .map_err(|err| err.to_string())
3265    })
3266}
3267
3268#[cfg(test)]
3269mod tests {
3270    use super::*;
3271
3272    #[test]
3273    fn render_systemd_unit_contains_expected_execstart() {
3274        let config = SystemdServiceConfig {
3275            service_name: "reddb".to_string(),
3276            binary_path: PathBuf::from("/usr/local/bin/red"),
3277            run_user: "reddb".to_string(),
3278            run_group: "reddb".to_string(),
3279            data_path: reddb_file::default_service_database_path(),
3280            router_bind_addr: None,
3281            grpc_bind_addr: Some("0.0.0.0:55055".to_string()),
3282            http_bind_addr: None,
3283        };
3284
3285        let unit = render_systemd_unit(&config);
3286        assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:55055"));
3287        assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
3288    }
3289
3290    #[test]
3291    fn systemd_service_config_derives_paths() {
3292        let config = SystemdServiceConfig {
3293            service_name: "reddb-api".to_string(),
3294            binary_path: PathBuf::from("/usr/local/bin/red"),
3295            run_user: "reddb".to_string(),
3296            run_group: "reddb".to_string(),
3297            data_path: PathBuf::from("/srv/reddb/live/data.rdb"),
3298            router_bind_addr: None,
3299            grpc_bind_addr: None,
3300            http_bind_addr: Some("127.0.0.1:5000".to_string()),
3301        };
3302
3303        assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
3304        assert_eq!(
3305            config.unit_path(),
3306            PathBuf::from("/etc/systemd/system/reddb-api.service")
3307        );
3308    }
3309
3310    #[test]
3311    fn render_systemd_unit_supports_dual_transport() {
3312        let config = SystemdServiceConfig {
3313            service_name: "reddb".to_string(),
3314            binary_path: PathBuf::from("/usr/local/bin/red"),
3315            run_user: "reddb".to_string(),
3316            run_group: "reddb".to_string(),
3317            data_path: reddb_file::default_service_database_path(),
3318            router_bind_addr: None,
3319            grpc_bind_addr: Some("0.0.0.0:55055".to_string()),
3320            http_bind_addr: Some("0.0.0.0:5000".to_string()),
3321        };
3322
3323        let unit = render_systemd_unit(&config);
3324        assert!(unit.contains("--grpc-bind 0.0.0.0:55055"));
3325        assert!(unit.contains("--http-bind 0.0.0.0:5000"));
3326    }
3327
3328    #[test]
3329    fn render_systemd_unit_supports_router_mode() {
3330        let config = SystemdServiceConfig {
3331            service_name: "reddb".to_string(),
3332            binary_path: PathBuf::from("/usr/local/bin/red"),
3333            run_user: "reddb".to_string(),
3334            run_group: "reddb".to_string(),
3335            data_path: reddb_file::default_service_database_path(),
3336            router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
3337            grpc_bind_addr: None,
3338            http_bind_addr: None,
3339        };
3340
3341        let unit = render_systemd_unit(&config);
3342        assert!(unit.contains("--bind 127.0.0.1:5050"));
3343        assert!(!unit.contains("--grpc-bind"));
3344        assert!(!unit.contains("--http-bind"));
3345    }
3346
3347    #[test]
3348    fn explicit_bind_collision_is_fatal() {
3349        let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
3350        let addr = held.local_addr().expect("held addr").to_string();
3351        let mut readiness = TransportReadiness::default();
3352
3353        let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();
3354
3355        assert!(error.contains("explicit http listener bind"));
3356        assert_eq!(readiness.active.len(), 0);
3357        assert_eq!(readiness.failed.len(), 1);
3358        assert!(readiness.failed[0].explicit);
3359        assert_eq!(readiness.failed[0].bind_addr, addr);
3360    }
3361
3362    // ---------- Issue #663 — `--no-auth` / `--dev` ----------
3363
3364    // Env access in tests is process-global; serialise the two
3365    // `--no-auth` tests so the REDDB_USERNAME / REDDB_PASSWORD pair
3366    // one of them sets cannot leak into the other under cargo's
3367    // default parallel runner.
3368    fn no_auth_env_lock() -> &'static std::sync::Mutex<()> {
3369        static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
3370        LOCK.get_or_init(|| std::sync::Mutex::new(()))
3371    }
3372
3373    fn no_auth_test_config(no_auth: bool) -> ServerCommandConfig {
3374        ServerCommandConfig {
3375            path: None,
3376            router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
3377            router_bind_explicit: false,
3378            grpc_bind_addr: None,
3379            grpc_bind_explicit: false,
3380            grpc_tls_bind_addr: None,
3381            grpc_tls_cert: None,
3382            grpc_tls_key: None,
3383            grpc_tls_client_ca: None,
3384            http_bind_addr: None,
3385            http_bind_explicit: false,
3386            http_tls_bind_addr: None,
3387            http_tls_cert: None,
3388            http_tls_key: None,
3389            http_tls_client_ca: None,
3390            wire_bind_addr: None,
3391            wire_bind_explicit: false,
3392            wire_tls_bind_addr: None,
3393            wire_tls_cert: None,
3394            wire_tls_key: None,
3395            pg_bind_addr: None,
3396            create_if_missing: true,
3397            read_only: false,
3398            role: "standalone".to_string(),
3399            primary_addr: None,
3400            storage_profile: StorageProfileSelection::embedded_single_file(),
3401            auth: false,
3402            require_auth: false,
3403            // Operator-set `--vault`: `--no-auth` must override this
3404            // alongside REDDB_USERNAME/PASSWORD.
3405            vault: true,
3406            no_auth,
3407            workers: None,
3408            telemetry: None,
3409            http_limits_cli: crate::server::HttpLimitsCliInput::default(),
3410            ui: false,
3411            ui_dir: None,
3412            bootstrap: BootstrapConfig::default(),
3413        }
3414    }
3415
3416    #[test]
3417    fn no_auth_flag_disables_every_auth_knob_and_stamps_metadata() {
3418        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3419        // Pre-existing env that *would* turn auth on if `--no-auth`
3420        // weren't the last word. The acceptance criterion is that
3421        // the flag wins over env.
3422        // SAFETY: serialised by `no_auth_env_lock` above.
3423        unsafe {
3424            std::env::set_var("REDDB_USERNAME", "admin");
3425            std::env::set_var("REDDB_PASSWORD", "hunter2");
3426        }
3427        let config = no_auth_test_config(true);
3428        let options = config.to_db_options().expect("to_db_options");
3429
3430        assert!(no_auth_active(&options), "metadata should be stamped");
3431        assert!(!options.auth.enabled, "auth.enabled must be forced off");
3432        assert!(
3433            !options.auth.require_auth,
3434            "require_auth must be forced off"
3435        );
3436        assert!(
3437            !options.auth.vault_enabled,
3438            "vault_enabled must be forced off (overrides --vault)"
3439        );
3440        assert_eq!(
3441            options.metadata.get(NO_AUTH_META).map(String::as_str),
3442            Some("true"),
3443        );
3444
3445        // SAFETY: serialised by `no_auth_env_lock` above.
3446        unsafe {
3447            std::env::remove_var("REDDB_USERNAME");
3448            std::env::remove_var("REDDB_PASSWORD");
3449        }
3450    }
3451
3452    #[test]
3453    fn default_behaviour_without_no_auth_flag_is_unchanged() {
3454        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3455        let config = no_auth_test_config(false);
3456        let options = config.to_db_options().expect("to_db_options");
3457
3458        assert!(
3459            !no_auth_active(&options),
3460            "default boot must not be marked no-auth"
3461        );
3462        assert!(
3463            options.metadata.get(NO_AUTH_META).is_none(),
3464            "metadata key must be absent when flag is off"
3465        );
3466        // `--vault` should still take effect when `--no-auth` is not set.
3467        assert!(options.auth.vault_enabled);
3468    }
3469
3470    #[test]
3471    fn no_auth_active_blocks_bootstrap_from_env() {
3472        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3473        // SAFETY: serialised by `no_auth_env_lock` above. The pair
3474        // would normally cause `AuthStore::bootstrap_from_env` to
3475        // create an admin; the boot pipeline must suppress that call
3476        // whenever `no_auth_active` is true.
3477        unsafe {
3478            std::env::set_var("REDDB_USERNAME", "admin");
3479            std::env::set_var("REDDB_PASSWORD", "hunter2");
3480        }
3481
3482        let options = no_auth_test_config(true)
3483            .to_db_options()
3484            .expect("to_db_options");
3485
3486        // Mirror the exact branch in `build_runtime_with_telemetry`:
3487        // build a non-vault AuthStore from `options.auth`, then call
3488        // `bootstrap_from_env` *only* when the no-auth gate is off.
3489        let auth_store = AuthStore::new(options.auth.clone());
3490        if !no_auth_active(&options) {
3491            auth_store.bootstrap_from_env();
3492        }
3493
3494        assert!(
3495            auth_store.needs_bootstrap(),
3496            "no admin user must be bootstrapped under --no-auth even with REDDB_USERNAME/PASSWORD set"
3497        );
3498
3499        // SAFETY: serialised by `no_auth_env_lock` above.
3500        unsafe {
3501            std::env::remove_var("REDDB_USERNAME");
3502            std::env::remove_var("REDDB_PASSWORD");
3503        }
3504    }
3505
3506    // ---------- Issue #650 — bootstrap presets ----------
3507
3508    // Preset tests mutate process-global env (`REDDB_PRESET`,
3509    // `REDDB_USERNAME`, `REDDB_PASSWORD`) and the global tracing
3510    // subscriber. Share the no_auth lock so they don't race with each
3511    // other or with the --no-auth tests above.
3512    fn clear_preset_env() {
3513        // SAFETY: callers hold `no_auth_env_lock()`.
3514        unsafe {
3515            std::env::remove_var(BOOTSTRAP_PRESET_ENV);
3516            std::env::remove_var(PRESET_ENV);
3517            std::env::remove_var("REDDB_BOOTSTRAP_MANIFEST");
3518            std::env::remove_var("REDDB_AUTH");
3519            std::env::remove_var("REDDB_REQUIRE_AUTH");
3520            std::env::remove_var("REDDB_NO_AUTH");
3521            std::env::remove_var("REDDB_DEV");
3522            std::env::remove_var("REDDB_VAULT");
3523            std::env::remove_var("REDDB_USERNAME");
3524            std::env::remove_var("REDDB_PASSWORD");
3525            std::env::remove_var("REDDB_USERNAME_FILE");
3526            std::env::remove_var("REDDB_PASSWORD_FILE");
3527            std::env::remove_var("REDDB_CLOUD_HEAD_ADMIN");
3528            std::env::remove_var("REDDB_CLOUD_HEAD_ADMIN_PASSWORD");
3529            std::env::remove_var("REDDB_CLOUD_HEAD_ADMIN_PASSWORD_FILE");
3530            std::env::remove_var("REDDB_CUSTOMER_ADMIN");
3531            std::env::remove_var("REDDB_CUSTOMER_ADMIN_PASSWORD");
3532            std::env::remove_var("REDDB_CUSTOMER_ADMIN_PASSWORD_FILE");
3533        }
3534    }
3535
3536    fn clear_backup_env() {
3537        // SAFETY: callers hold `no_auth_env_lock()`.
3538        unsafe {
3539            std::env::remove_var("REDDB_BACKUP_S3_ENDPOINT");
3540            std::env::remove_var("REDDB_BACKUP_S3_BUCKET");
3541            std::env::remove_var("REDDB_BACKUP_S3_PREFIX");
3542            std::env::remove_var("REDDB_BACKUP_S3_ACCESS_KEY_ID");
3543            std::env::remove_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY");
3544            std::env::remove_var("REDDB_BACKUP_S3_REGION");
3545            std::env::remove_var("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS");
3546            std::env::remove_var("REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS");
3547            std::env::remove_var("REDDB_BACKUP_PAUSE_ON_LAG_SECS");
3548        }
3549    }
3550
3551    fn fresh_runtime_and_store() -> (RedDBRuntime, Arc<AuthStore>) {
3552        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
3553        let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3554        (runtime, auth_store)
3555    }
3556
3557    #[test]
3558    fn auth_env_knobs_enable_auth_require_auth_and_vault() {
3559        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3560        clear_preset_env();
3561        // SAFETY: env serialised by `no_auth_env_lock`.
3562        unsafe {
3563            std::env::set_var("REDDB_AUTH", "true");
3564            std::env::set_var("REDDB_REQUIRE_AUTH", "true");
3565            std::env::set_var("REDDB_VAULT", "true");
3566        }
3567
3568        let mut config = no_auth_test_config(false);
3569        config.vault = false;
3570        let options = config.to_db_options().expect("to_db_options");
3571
3572        assert!(options.auth.enabled);
3573        assert!(options.auth.require_auth);
3574        assert!(options.auth.vault_enabled);
3575
3576        clear_preset_env();
3577    }
3578
3579    #[test]
3580    fn production_and_cloud_presets_force_auth_require_auth_and_vault() {
3581        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3582        clear_preset_env();
3583
3584        for preset in [PRESET_PRODUCTION, PRESET_CLOUD] {
3585            let mut config = no_auth_test_config(false);
3586            config.vault = false;
3587            config.bootstrap.preset = Some(preset.to_string());
3588
3589            let options = config.to_db_options().expect("to_db_options");
3590            assert!(options.auth.enabled, "{preset} should enable auth");
3591            assert!(
3592                options.auth.require_auth,
3593                "{preset} should require authenticated requests"
3594            );
3595            assert!(options.auth.vault_enabled, "{preset} should enable vault");
3596            assert!(!no_auth_active(&options));
3597        }
3598
3599        clear_preset_env();
3600    }
3601
3602    #[test]
3603    fn no_auth_env_overrides_preset_forced_auth() {
3604        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3605        clear_preset_env();
3606        // SAFETY: env serialised by `no_auth_env_lock`.
3607        unsafe {
3608            std::env::set_var("REDDB_NO_AUTH", "true");
3609            std::env::set_var(BOOTSTRAP_PRESET_ENV, PRESET_CLOUD);
3610        }
3611
3612        let mut config = no_auth_test_config(false);
3613        config.auth = true;
3614        config.require_auth = true;
3615        let options = config.to_db_options().expect("to_db_options");
3616
3617        assert!(no_auth_active(&options));
3618        assert!(!options.auth.enabled);
3619        assert!(!options.auth.require_auth);
3620        assert!(!options.auth.vault_enabled);
3621
3622        clear_preset_env();
3623    }
3624
3625    #[test]
3626    fn simple_preset_is_default_and_persists_state() {
3627        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3628        clear_preset_env();
3629
3630        let (runtime, auth_store) = fresh_runtime_and_store();
3631        apply_preset(&runtime, &auth_store).expect("simple preset applies cleanly");
3632
3633        // No admin was created — `simple` is anonymous-friendly.
3634        assert!(
3635            auth_store.needs_bootstrap(),
3636            "simple preset must not create an admin"
3637        );
3638
3639        // Bootstrap state persisted so the next boot is a no-op.
3640        let store = runtime.db().store();
3641        let completed = store
3642            .get_config(BOOTSTRAP_COMPLETED_KEY)
3643            .expect("completed key persisted");
3644        assert!(matches!(
3645            completed,
3646            crate::storage::schema::Value::Boolean(true)
3647        ));
3648        let preset = store
3649            .get_config(BOOTSTRAP_PRESET_KEY)
3650            .expect("preset key persisted");
3651        match preset {
3652            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_SIMPLE),
3653            other => panic!("expected Text(simple), got {other:?}"),
3654        }
3655        assert!(
3656            store.get_config(BOOTSTRAP_FIRST_ADMIN_KEY).is_none(),
3657            "simple preset must not record a first admin"
3658        );
3659
3660        clear_preset_env();
3661    }
3662
3663    #[test]
3664    fn production_preset_creates_first_admin_with_allow_all_policy() {
3665        use crate::auth::policies::{EvalContext, ResourceRef};
3666        use crate::auth::UserId;
3667
3668        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3669        clear_preset_env();
3670        // SAFETY: env serialised by `no_auth_env_lock`.
3671        unsafe {
3672            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3673            std::env::set_var("REDDB_USERNAME", "ops");
3674            std::env::set_var("REDDB_PASSWORD", "hunter2");
3675        }
3676
3677        let (runtime, auth_store) = fresh_runtime_and_store();
3678        apply_preset(&runtime, &auth_store).expect("production preset applies cleanly");
3679
3680        // Admin exists and the auth store is sealed.
3681        assert!(
3682            !auth_store.needs_bootstrap(),
3683            "production preset must seal bootstrap"
3684        );
3685        let users = auth_store.list_users();
3686        assert_eq!(users.len(), 1);
3687        let admin = &users[0];
3688        assert_eq!(admin.username, "ops");
3689        assert!(
3690            admin.tenant_id.is_none(),
3691            "first admin must be platform-scoped (tenant=None)"
3692        );
3693
3694        // Allow-all policy was installed and attached to the first admin.
3695        let policy = auth_store
3696            .get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY)
3697            .expect("allow-all policy installed");
3698        assert!(!policy.statements.is_empty());
3699
3700        // Verify policy-derived authority via the policy evaluator —
3701        // not a bypass. Any action on any resource must Allow.
3702        let actor = UserId::platform("ops");
3703        let ctx = EvalContext {
3704            principal_tenant: None,
3705            current_tenant: None,
3706            peer_ip: None,
3707            mfa_present: false,
3708            now_ms: 1_700_000_000_000,
3709            principal_is_admin_role: true,
3710            principal_is_platform_scoped: true,
3711        };
3712        let arbitrary_resource = ResourceRef::new("config", "red.config.audit.enabled");
3713        assert!(
3714            auth_store.check_policy_authz(&actor, "config:write", &arbitrary_resource, &ctx),
3715            "allow-all policy must grant arbitrary actions via the evaluator"
3716        );
3717
3718        // Persisted state records the first admin id.
3719        let store = runtime.db().store();
3720        match store
3721            .get_config(BOOTSTRAP_FIRST_ADMIN_KEY)
3722            .expect("first_admin_id persisted")
3723        {
3724            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), "ops"),
3725            other => panic!("expected Text(ops), got {other:?}"),
3726        }
3727        match store.get_config(BOOTSTRAP_PRESET_KEY).unwrap() {
3728            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_PRODUCTION),
3729            other => panic!("expected Text(production), got {other:?}"),
3730        }
3731
3732        clear_preset_env();
3733    }
3734
3735    #[test]
3736    fn bootstrap_preset_env_takes_precedence_over_legacy_preset_env() {
3737        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3738        clear_preset_env();
3739        // SAFETY: env serialised by `no_auth_env_lock`.
3740        unsafe {
3741            std::env::set_var(BOOTSTRAP_PRESET_ENV, PRESET_REGULATED);
3742            std::env::set_var(PRESET_ENV, PRESET_SIMPLE);
3743        }
3744
3745        let options = no_auth_test_config(false)
3746            .to_db_options()
3747            .expect("regulated options");
3748        assert!(
3749            options.control_events.compliance_mode,
3750            "canonical REDDB_BOOTSTRAP_PRESET should win over REDDB_PRESET"
3751        );
3752        assert!(options.query_audit.enabled);
3753
3754        clear_preset_env();
3755    }
3756
3757    #[test]
3758    fn cloud_preset_creates_system_head_and_customer_admins() {
3759        use crate::auth::policies::{EvalContext, ResourceRef};
3760        use crate::auth::UserId;
3761
3762        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3763        clear_preset_env();
3764        // SAFETY: env serialised by `no_auth_env_lock`.
3765        unsafe {
3766            std::env::set_var(BOOTSTRAP_PRESET_ENV, PRESET_CLOUD);
3767            std::env::set_var("REDDB_CLOUD_HEAD_ADMIN", "head");
3768            std::env::set_var("REDDB_CLOUD_HEAD_ADMIN_PASSWORD", "head-pass");
3769            std::env::set_var("REDDB_CUSTOMER_ADMIN", "customer");
3770            std::env::set_var("REDDB_CUSTOMER_ADMIN_PASSWORD", "customer-pass");
3771        }
3772
3773        let (runtime, auth_store) = fresh_runtime_and_store();
3774        apply_preset(&runtime, &auth_store).expect("cloud preset applies cleanly");
3775
3776        let head = auth_store
3777            .get_user(None, "head")
3778            .expect("head admin should exist");
3779        assert_eq!(head.tenant_id, None);
3780        let customer = auth_store
3781            .get_user(None, "customer")
3782            .expect("customer admin should exist");
3783        assert_eq!(customer.tenant_id, None);
3784
3785        let ctx = EvalContext {
3786            principal_tenant: None,
3787            current_tenant: None,
3788            peer_ip: None,
3789            mfa_present: false,
3790            now_ms: 1_700_000_000_000,
3791            principal_is_admin_role: true,
3792            principal_is_platform_scoped: true,
3793        };
3794        assert!(auth_store.check_policy_authz(
3795            &UserId::platform("customer"),
3796            "config:write",
3797            &ResourceRef::new("config", "red.config.customer.enabled"),
3798            &ctx,
3799        ));
3800        assert!(
3801            auth_store.check_policy_authz(
3802                &UserId::platform("head"),
3803                "config:write",
3804                &ResourceRef::new("config", "red.config.cloud.enabled"),
3805                &ctx,
3806            ),
3807            "cloud head keeps allow-all authority over managed cloud config"
3808        );
3809        assert!(
3810            !auth_store.check_policy_authz(
3811                &UserId::platform("customer"),
3812                "config:write",
3813                &ResourceRef::new("config", "red.config.cloud.enabled"),
3814                &ctx,
3815            ),
3816            "cloud customer must be denied managed cloud config writes"
3817        );
3818        assert!(
3819            !auth_store.check_policy_authz(
3820                &UserId::platform("customer"),
3821                "policy:drop",
3822                &ResourceRef::new("policy", CLOUD_PROTECT_MANAGED_POLICY),
3823                &ctx,
3824            ),
3825            "cloud customer must be denied mutations of the managed guardrail policy"
3826        );
3827
3828        assert!(auth_store.get_user(None, "head").is_some());
3829        assert!(auth_store
3830            .get_policy(CLOUD_PROTECT_MANAGED_POLICY)
3831            .is_some());
3832        assert!(runtime
3833            .config_registry()
3834            .get_active(CLOUD_CONFIG_NAMESPACE)
3835            .is_some());
3836
3837        let store = runtime.db().store();
3838        match store
3839            .get_config(BOOTSTRAP_FIRST_ADMIN_KEY)
3840            .expect("head admin id persisted")
3841        {
3842            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), "head"),
3843            other => panic!("expected Text(head), got {other:?}"),
3844        }
3845        match store.get_config(BOOTSTRAP_PRESET_KEY).unwrap() {
3846            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_CLOUD),
3847            other => panic!("expected Text(cloud), got {other:?}"),
3848        }
3849
3850        clear_preset_env();
3851    }
3852
3853    #[test]
3854    fn cloud_preset_cli_admin_alias_wins_over_cloud_env_password() {
3855        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3856        clear_preset_env();
3857        // SAFETY: env serialised by `no_auth_env_lock`.
3858        unsafe {
3859            std::env::set_var("REDDB_CLOUD_HEAD_ADMIN_PASSWORD", "env-head-pass");
3860        }
3861
3862        let bootstrap = BootstrapConfig {
3863            preset: Some(PRESET_CLOUD.to_string()),
3864            admin_username: Some("head".to_string()),
3865            admin_password: Some("cli-head-pass".to_string()),
3866            customer_admin: Some("customer".to_string()),
3867            customer_admin_password: Some("customer-pass".to_string()),
3868            ..BootstrapConfig::default()
3869        };
3870        let (runtime, auth_store) = fresh_runtime_and_store();
3871        apply_preset_from_config(&runtime, &auth_store, &bootstrap)
3872            .expect("cloud preset applies cleanly");
3873
3874        auth_store
3875            .authenticate("head", "cli-head-pass")
3876            .expect("CLI alias password should win");
3877        assert!(
3878            auth_store.authenticate("head", "env-head-pass").is_err(),
3879            "cloud-specific env password must not beat CLI alias"
3880        );
3881
3882        clear_preset_env();
3883    }
3884
3885    #[test]
3886    fn regulated_preset_enables_query_audit_infrastructure_without_rules() {
3887        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3888        clear_preset_env();
3889        // SAFETY: env serialised by `no_auth_env_lock`.
3890        unsafe {
3891            std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3892        }
3893
3894        let (runtime, auth_store) = fresh_runtime_and_store();
3895        apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3896
3897        assert!(runtime.query_audit().is_enabled());
3898        assert!(runtime.query_audit().rules().is_empty());
3899        assert!(
3900            runtime
3901                .db()
3902                .store()
3903                .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3904                .is_some(),
3905            "regulated preset should create the query-audit stream"
3906        );
3907
3908        runtime
3909            .execute_query("CREATE TABLE docs (id INT)")
3910            .expect("create table");
3911        runtime
3912            .execute_query("INSERT INTO docs (id) VALUES (1)")
3913            .expect("insert");
3914        runtime.execute_query("SELECT * FROM docs").expect("select");
3915        let rows = runtime
3916            .db()
3917            .store()
3918            .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3919            .expect("query audit collection")
3920            .query_all(|_| true);
3921        assert!(
3922            rows.is_empty(),
3923            "regulated preset must not globally audit every query"
3924        );
3925
3926        clear_preset_env();
3927    }
3928
3929    #[test]
3930    fn managed_backup_env_rejects_primary_replica_single_file_storage() {
3931        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3932        clear_backup_env();
3933        // SAFETY: env serialised by `no_auth_env_lock`.
3934        unsafe {
3935            std::env::set_var("REDDB_BACKUP_S3_ENDPOINT", "https://s3.example.test");
3936            std::env::set_var("REDDB_BACKUP_S3_BUCKET", "reddb");
3937            std::env::set_var("REDDB_BACKUP_S3_PREFIX", "clusters/prod");
3938            std::env::set_var("REDDB_BACKUP_S3_ACCESS_KEY_ID", "AK");
3939            std::env::set_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY", "SK");
3940        }
3941
3942        let mut config = no_auth_test_config(false);
3943        config.role = "primary".to_string();
3944        config.storage_profile = crate::storage::StorageDeployPreset::PrimaryReplicaDev.selection();
3945
3946        let err = config.to_db_options().unwrap_err();
3947        assert!(err.contains("managed backup"), "got: {err}");
3948        assert!(err.contains("operational-directory"), "got: {err}");
3949
3950        clear_backup_env();
3951    }
3952
3953    #[test]
3954    fn regulated_preset_installs_managed_evidence_guardrails_end_to_end() {
3955        use crate::auth::policies::{EvalContext, Policy, ResourceRef};
3956        use crate::auth::store::PrincipalRef;
3957        use crate::auth::{Role, UserId};
3958        use crate::runtime::mvcc::{clear_current_auth_identity, set_current_auth_identity};
3959        use crate::storage::schema::Value;
3960
3961        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3962        clear_preset_env();
3963        // SAFETY: env serialised by `no_auth_env_lock`.
3964        unsafe {
3965            std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3966        }
3967
3968        let options = no_auth_test_config(false)
3969            .to_db_options()
3970            .expect("regulated options");
3971        assert!(
3972            options.control_events.compliance_mode,
3973            "regulated preset must enable fail-closed control evidence before runtime boot"
3974        );
3975        assert!(
3976            options.query_audit.enabled && options.query_audit.rules.is_empty(),
3977            "regulated preset must enable query-audit infrastructure without global rules"
3978        );
3979
3980        let runtime = RedDBRuntime::with_options(options).expect("runtime");
3981        let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3982        apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3983        runtime.set_auth_store(Arc::clone(&auth_store));
3984
3985        assert!(runtime.control_events_require_persistence());
3986        assert!(runtime.query_audit().is_enabled());
3987        assert!(runtime.query_audit().rules().is_empty());
3988        assert!(auth_store
3989            .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
3990            .is_some());
3991
3992        let managed_policy = runtime
3993            .config_registry()
3994            .get_active(REGULATED_PROTECT_MANAGED_POLICY)
3995            .expect("regulated managed policy registry entry");
3996        assert!(managed_policy.managed);
3997        assert_eq!(managed_policy.resource_type, "policy");
3998        assert!(
3999            runtime
4000                .config_registry()
4001                .get_active(REGULATED_AUDIT_CONFIG_NAMESPACE)
4002                .expect("regulated audit config namespace")
4003                .managed
4004        );
4005
4006        let registry_rows = runtime
4007            .execute_query(&format!(
4008                "SELECT id, managed FROM red.registry WHERE id = '{}'",
4009                REGULATED_PROTECT_MANAGED_POLICY
4010            ))
4011            .expect("red.registry query");
4012        assert_eq!(registry_rows.result.records.len(), 1);
4013        assert_eq!(
4014            registry_rows.result.records[0].get("managed"),
4015            Some(&Value::Boolean(true))
4016        );
4017
4018        let managed_policy_rows = runtime
4019            .execute_query(&format!(
4020                "SELECT policy_id FROM red.managed_policies WHERE policy_id = '{}'",
4021                REGULATED_PROTECT_MANAGED_POLICY
4022            ))
4023            .expect("red.managed_policies query");
4024        assert_eq!(managed_policy_rows.result.records.len(), 1);
4025
4026        let capability_rows = runtime
4027            .execute_query(
4028                "SELECT action FROM red.control_capabilities WHERE action = 'evidence:export'",
4029            )
4030            .expect("red.control_capabilities query");
4031        assert_eq!(capability_rows.result.records.len(), 1);
4032
4033        auth_store
4034            .create_user("alice", "p", Role::Admin)
4035            .expect("create ordinary admin");
4036        let allow_all = Policy::from_json_str(
4037            r#"{
4038                "id": "alice-allow-all",
4039                "version": 1,
4040                "statements": [{
4041                    "effect": "allow",
4042                    "actions": ["*"],
4043                    "resources": ["*"]
4044                }]
4045            }"#,
4046        )
4047        .expect("allow-all policy");
4048        auth_store.put_policy(allow_all).expect("install allow-all");
4049        auth_store
4050            .attach_policy(
4051                PrincipalRef::User(UserId::platform("alice")),
4052                "alice-allow-all",
4053            )
4054            .expect("attach allow-all");
4055        let ctx = EvalContext {
4056            principal_tenant: None,
4057            current_tenant: None,
4058            peer_ip: None,
4059            mfa_present: false,
4060            now_ms: 1_700_000_000_000,
4061            principal_is_admin_role: true,
4062            principal_is_platform_scoped: true,
4063        };
4064        assert!(
4065            !auth_store.check_policy_authz(
4066                &UserId::platform("alice"),
4067                "policy:drop",
4068                &ResourceRef::new("policy", REGULATED_PROTECT_MANAGED_POLICY),
4069                &ctx,
4070            ),
4071            "managed guardrail deny policy must be effective for ordinary admins"
4072        );
4073
4074        set_current_auth_identity("alice".to_string(), Role::Admin);
4075        let denied = runtime.execute_query(&format!(
4076            "DROP POLICY '{}'",
4077            REGULATED_PROTECT_MANAGED_POLICY
4078        ));
4079        clear_current_auth_identity();
4080        let err = denied.expect_err("managed policy guardrail must deny ordinary admin");
4081        assert!(
4082            err.to_string().contains("managed policy"),
4083            "error should name the managed guardrail: {err}"
4084        );
4085        assert!(
4086            auth_store
4087                .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
4088                .is_some(),
4089            "denied mutation must leave managed policy installed"
4090        );
4091
4092        let denied_events = runtime
4093            .execute_query(&format!(
4094                "SELECT action, resource, outcome FROM red.control_events \
4095                 WHERE action = 'policy:drop' AND resource = 'policy:{}'",
4096                REGULATED_PROTECT_MANAGED_POLICY
4097            ))
4098            .expect("red.control_events denied policy drop");
4099        assert_eq!(denied_events.result.records.len(), 1);
4100        assert_eq!(
4101            denied_events.result.records[0].get("outcome"),
4102            Some(&Value::text("denied"))
4103        );
4104
4105        set_current_auth_identity("alice".to_string(), Role::Admin);
4106        let config_denied = runtime.execute_query("SET CONFIG red.config.audit.enabled = true");
4107        clear_current_auth_identity();
4108        let err = config_denied.expect_err("managed config guardrail must deny ordinary admin");
4109        assert!(
4110            err.to_string().contains("managed config"),
4111            "error should name the managed config guardrail: {err}"
4112        );
4113
4114        let denied_config_events = runtime
4115            .execute_query(
4116                "SELECT action, resource, outcome FROM red.control_events \
4117                 WHERE action = 'config:write' AND resource = 'config:red.config.audit.enabled'",
4118            )
4119            .expect("red.control_events denied config write");
4120        assert_eq!(denied_config_events.result.records.len(), 1);
4121        assert_eq!(
4122            denied_config_events.result.records[0].get("outcome"),
4123            Some(&Value::text("denied"))
4124        );
4125
4126        runtime
4127            .execute_query("CREATE TABLE regulated_docs (id INT)")
4128            .expect("create user table");
4129        runtime
4130            .execute_query("SELECT * FROM regulated_docs")
4131            .expect("select user table");
4132        let audit_rows = runtime
4133            .db()
4134            .store()
4135            .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
4136            .expect("query audit collection")
4137            .query_all(|_| true);
4138        assert!(
4139            audit_rows.is_empty(),
4140            "regulated preset must not globally audit data-plane queries"
4141        );
4142
4143        clear_preset_env();
4144    }
4145
4146    #[test]
4147    fn bootstrap_manifest_installs_initial_users_policies_guardrails_and_config() {
4148        use crate::auth::policies::{EvalContext, ResourceRef};
4149        use crate::auth::UserId;
4150        use crate::storage::schema::Value;
4151
4152        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4153        clear_preset_env();
4154
4155        let manifest_dir = std::env::current_dir()
4156            .expect("current dir")
4157            .join(".red/tmp/bootstrap-manifest-tests");
4158        std::fs::create_dir_all(&manifest_dir).expect("create manifest test dir");
4159        let manifest_path = manifest_dir.join(format!(
4160            "reddb-bootstrap-manifest-{}-{}.json",
4161            std::process::id(),
4162            std::time::SystemTime::now()
4163                .duration_since(std::time::UNIX_EPOCH)
4164                .unwrap_or_default()
4165                .as_millis()
4166        ));
4167        std::fs::write(
4168            &manifest_path,
4169            r#"{
4170                "users": [
4171                    {
4172                        "username": "ops",
4173                        "password": "hunter2",
4174                        "role": "admin"
4175                    }
4176                ],
4177                "policies": [
4178                    {
4179                        "id": "bootstrap-registry-admin",
4180                        "version": 1,
4181                        "statements": [
4182                            {
4183                                "effect": "allow",
4184                                "actions": ["red.registry:*", "policy:*", "config:write", "select"],
4185                                "resources": ["registry:*", "policy:*", "config:*", "collection:docs"]
4186                            }
4187                        ]
4188                    }
4189                ],
4190                "managed_policies": [
4191                    {
4192                        "id": "managed-deny-drop",
4193                        "version": 1,
4194                        "statements": [
4195                            {
4196                                "effect": "deny",
4197                                "actions": ["policy:drop"],
4198                                "resources": ["policy:managed-deny-drop"]
4199                            }
4200                        ],
4201                        "required_resource": "policy:managed-deny-drop",
4202                        "evidence": "full"
4203                    }
4204                ],
4205                "attachments": [
4206                    {"user": "ops", "policy": "bootstrap-registry-admin"}
4207                ],
4208                "managed_config_namespaces": [
4209                    {
4210                        "id": "red.ai",
4211                        "required_action": "config:write",
4212                        "required_resource": "config:red.ai.*",
4213                        "evidence": "metadata"
4214                    }
4215                ],
4216                "config": [
4217                    {"key": "red.ai.default.provider", "value": "openai"},
4218                    {
4219                        "key": "red.ai.openai.default.secret_ref",
4220                        "secret_ref": {"collection": "red.vault", "key": "openai"}
4221                    }
4222                ],
4223                "actor": "ops"
4224            }"#,
4225        )
4226        .expect("write manifest");
4227        // SAFETY: env serialised by `no_auth_env_lock`.
4228        unsafe {
4229            std::env::set_var("REDDB_BOOTSTRAP_MANIFEST", &manifest_path);
4230        }
4231
4232        let (runtime, auth_store) = fresh_runtime_and_store();
4233        apply_preset(&runtime, &auth_store).expect("manifest applies cleanly");
4234
4235        let users = auth_store.list_users();
4236        assert_eq!(users.len(), 1);
4237        assert_eq!(users[0].username, "ops");
4238        assert!(users[0].tenant_id.is_none());
4239
4240        let actor = UserId::platform("ops");
4241        let ctx = EvalContext {
4242            principal_tenant: None,
4243            current_tenant: None,
4244            peer_ip: None,
4245            mfa_present: false,
4246            now_ms: 1_700_000_000_000,
4247            principal_is_admin_role: true,
4248            principal_is_platform_scoped: true,
4249        };
4250        // Manifest fixture pins a canonical data-plane read action.
4251        assert!(auth_store.check_policy_authz(
4252            &actor,
4253            "select",
4254            &ResourceRef::new("collection", "docs"),
4255            &ctx
4256        ));
4257
4258        let managed_policy = runtime
4259            .config_registry()
4260            .get_active("managed-deny-drop")
4261            .expect("managed policy registry entry");
4262        assert!(managed_policy.managed);
4263        assert_eq!(managed_policy.resource_type, "policy");
4264        let managed_config = runtime
4265            .config_registry()
4266            .get_active("red.ai")
4267            .expect("managed config namespace registry entry");
4268        assert!(managed_config.managed);
4269        assert_eq!(managed_config.resource_type, "config_namespace");
4270
4271        let store = runtime.db().store();
4272        match store
4273            .get_config("red.ai.default.provider")
4274            .expect("plain config persisted")
4275        {
4276            Value::Text(s) => assert_eq!(s.as_ref(), "openai"),
4277            other => panic!("expected provider text, got {other:?}"),
4278        }
4279        let Value::Json(bytes) = store
4280            .get_config("red.ai.openai.default.secret_ref")
4281            .expect("secret ref config persisted")
4282        else {
4283            panic!("secret ref must be stored as structured JSON");
4284        };
4285        let reference: crate::serde_json::Value =
4286            crate::serde_json::from_slice(&bytes).expect("secret ref json");
4287        assert_eq!(
4288            reference.get("type").and_then(|v| v.as_str()),
4289            Some("secret_ref")
4290        );
4291        assert!(
4292            !String::from_utf8_lossy(&bytes).contains("hunter2"),
4293            "manifest password must not leak into secret ref config"
4294        );
4295
4296        let completed = store
4297            .get_config(BOOTSTRAP_COMPLETED_KEY)
4298            .expect("bootstrap completion persisted");
4299        assert!(matches!(completed, Value::Boolean(true)));
4300        assert!(
4301            store
4302                .get_config("system.bootstrap.manifest.registry_entries")
4303                .is_some(),
4304            "managed registry entries must be persisted internally"
4305        );
4306
4307        std::fs::remove_file(&manifest_path).expect("remove manifest after first boot");
4308        let restored_registry = Arc::new(crate::auth::registry::ConfigRegistry::new());
4309        crate::cli::bootstrap_manifest::rehydrate_manifest_registry(&runtime, &restored_registry)
4310            .expect("registry rehydrates without manifest file");
4311        assert!(restored_registry.get_active("managed-deny-drop").is_some());
4312        assert!(restored_registry.get_active("red.ai").is_some());
4313
4314        let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
4315        apply_preset(&runtime, &fresh).expect("re-run must not need manifest file");
4316        assert!(fresh.needs_bootstrap());
4317
4318        clear_preset_env();
4319    }
4320
4321    #[test]
4322    fn production_preset_refuses_to_start_without_password() {
4323        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4324        clear_preset_env();
4325        // SAFETY: env serialised by `no_auth_env_lock`.
4326        unsafe {
4327            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
4328            std::env::set_var("REDDB_USERNAME", "ops");
4329        }
4330
4331        let (runtime, auth_store) = fresh_runtime_and_store();
4332        let err = apply_preset(&runtime, &auth_store).expect_err("must reject missing password");
4333        assert!(
4334            err.contains("REDDB_PASSWORD"),
4335            "error must name the missing env: {err}"
4336        );
4337
4338        // Nothing was persisted; nothing was created.
4339        assert!(auth_store.needs_bootstrap());
4340        assert!(runtime
4341            .db()
4342            .store()
4343            .get_config(BOOTSTRAP_COMPLETED_KEY)
4344            .is_none());
4345
4346        clear_preset_env();
4347    }
4348
4349    #[test]
4350    fn re_running_production_after_first_boot_is_a_silent_skip() {
4351        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4352        clear_preset_env();
4353        // SAFETY: env serialised by `no_auth_env_lock`.
4354        unsafe {
4355            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
4356            std::env::set_var("REDDB_USERNAME", "ops");
4357            std::env::set_var("REDDB_PASSWORD", "hunter2");
4358        }
4359
4360        let (runtime, auth_store) = fresh_runtime_and_store();
4361        apply_preset(&runtime, &auth_store).expect("first apply");
4362        assert_eq!(auth_store.list_users().len(), 1);
4363
4364        // Second invocation: same runtime/store, same env. Must be a
4365        // no-op — no error, no duplicate admin, no duplicate policy.
4366        // We do NOT reuse `auth_store` because production sealed it; the
4367        // idempotency hinge is the persisted config key, not the auth
4368        // store's in-memory seal. Build a fresh `AuthStore` as a restart
4369        // would and confirm `apply_preset` is a silent skip.
4370        let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
4371        apply_preset(&runtime, &fresh).expect("re-run is silent-skip");
4372        assert!(
4373            fresh.needs_bootstrap(),
4374            "re-run must not create a second admin"
4375        );
4376        assert!(
4377            fresh.get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY).is_none(),
4378            "re-run must not re-install the allow-all policy on the fresh store"
4379        );
4380
4381        clear_preset_env();
4382    }
4383
4384    #[test]
4385    fn unrecognised_preset_value_is_rejected() {
4386        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4387        clear_preset_env();
4388        // SAFETY: env serialised by `no_auth_env_lock`.
4389        unsafe {
4390            std::env::set_var(PRESET_ENV, "weird");
4391        }
4392
4393        let (runtime, auth_store) = fresh_runtime_and_store();
4394        let err = apply_preset(&runtime, &auth_store).expect_err("must reject unknown preset");
4395        assert!(err.contains("weird"), "error must echo the value: {err}");
4396        assert!(auth_store.needs_bootstrap());
4397
4398        clear_preset_env();
4399    }
4400
4401    #[test]
4402    fn no_auth_short_circuits_preset_entirely() {
4403        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
4404        clear_preset_env();
4405        // SAFETY: env serialised by `no_auth_env_lock`. Production creds
4406        // are set but `--no-auth` must win — no admin, no bootstrap state.
4407        unsafe {
4408            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
4409            std::env::set_var("REDDB_USERNAME", "ops");
4410            std::env::set_var("REDDB_PASSWORD", "hunter2");
4411        }
4412
4413        let options = no_auth_test_config(true)
4414            .to_db_options()
4415            .expect("to_db_options");
4416        assert!(no_auth_active(&options));
4417
4418        // Mirror `build_runtime_with_telemetry`: when no_auth_active,
4419        // `apply_preset` is never called.
4420        let (runtime, auth_store) = fresh_runtime_and_store();
4421        if !no_auth_active(&options) {
4422            apply_preset(&runtime, &auth_store).expect("would apply preset");
4423        }
4424
4425        assert!(
4426            auth_store.needs_bootstrap(),
4427            "--no-auth must prevent any admin creation"
4428        );
4429        assert!(
4430            runtime
4431                .db()
4432                .store()
4433                .get_config(BOOTSTRAP_COMPLETED_KEY)
4434                .is_none(),
4435            "--no-auth must skip bootstrap-state persistence"
4436        );
4437
4438        clear_preset_env();
4439    }
4440
4441    #[test]
4442    fn implicit_bind_collision_degrades() {
4443        let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
4444        let addr = held.local_addr().expect("held addr").to_string();
4445        let mut readiness = TransportReadiness::default();
4446
4447        let listener =
4448            bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");
4449
4450        assert!(listener.is_none());
4451        assert_eq!(readiness.active.len(), 0);
4452        assert_eq!(readiness.failed.len(), 1);
4453        assert!(!readiness.failed[0].explicit);
4454        assert_eq!(readiness.failed[0].bind_addr, addr);
4455    }
4456}