Skip to main content

reddb_server/
service_cli.rs

1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13    GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Default bind address for the TCP protocol router.
///
/// Note: this is the same address as the wire transport's default
/// (`ServerTransport::Wire.default_bind_addr()`).
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
/// Detect available CPU cores and suggest a worker thread count.
///
/// One core is reserved for the OS and the remainder is suggested as the
/// worker count, never fewer than one. If the parallelism query fails a
/// single CPU is assumed.
pub fn detect_runtime_config() -> RuntimeConfig {
    let cpus = thread::available_parallelism().map_or(1, |n| n.get());

    // Reserve 1 core for the OS, use the rest for workers (minimum 1).
    let suggested_workers = cpus.saturating_sub(1).max(1);

    RuntimeConfig {
        available_cpus: cpus,
        suggested_workers,
        stack_size: 8 * 1024 * 1024, // 8 MiB default
    }
}

/// Result of [`detect_runtime_config`].
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// CPUs reported by `std::thread::available_parallelism` (1 on error).
    pub available_cpus: usize,
    /// `available_cpus - 1`, clamped to at least 1.
    pub suggested_workers: usize,
    /// Suggested per-thread stack size in bytes (8 MiB); presumably
    /// applied to worker threads — confirm at the use site.
    pub stack_size: usize,
}
40
/// Client-facing transports the server can expose.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    /// gRPC transport.
    Grpc,
    /// HTTP transport.
    Http,
    /// Native wire-protocol transport.
    Wire,
}

impl ServerTransport {
    /// Default loopback bind address for this transport.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Wire => "127.0.0.1:5050",
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Grpc => "127.0.0.1:5555",
        }
    }

    /// Short display label for this transport.
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Wire => "wire",
            ServerTransport::Http => "HTTP",
            ServerTransport::Grpc => "gRPC",
        }
    }
}
65
/// Parsed `server` command configuration (CLI flags plus env fallbacks).
///
/// Converted into `RedDBOptions` by `to_db_options` and inspected by
/// `enabled_transports` to decide which transports are enabled.
#[derive(Debug, Clone)]
pub struct ServerCommandConfig {
    /// Database file path. `Some` opens persistent storage at this path;
    /// `None` starts from `RedDBOptions::in_memory()`.
    pub path: Option<PathBuf>,
    /// Bind address of the multiplexing protocol router. When set,
    /// `enabled_transports` reports gRPC, HTTP and wire as enabled.
    pub router_bind_addr: Option<String>,
    /// Whether `router_bind_addr` was given explicitly — presumably as
    /// opposed to a built-in default; confirm at the parse site.
    pub router_bind_explicit: bool,
    /// Dedicated gRPC bind address; enables the gRPC transport.
    pub grpc_bind_addr: Option<String>,
    /// Whether `grpc_bind_addr` was given explicitly (same caveat as
    /// `router_bind_explicit`).
    pub grpc_bind_explicit: bool,
    /// TLS-encrypted gRPC bind address. Can run side-by-side with
    /// `grpc_bind_addr` (e.g. `:50051` plain + `:50052` TLS) or
    /// stand alone for TLS-only deploys. Defaults to `None`.
    pub grpc_tls_bind_addr: Option<String>,
    /// Path to PEM-encoded gRPC server certificate. Resolved through
    /// `REDDB_GRPC_TLS_CERT` (with `_FILE` companion for k8s secret
    /// mounts). When `None` and dev-mode is enabled
    /// (`RED_GRPC_TLS_DEV=1`) the server auto-generates a self-signed
    /// cert and prints its SHA-256 fingerprint to stderr.
    pub grpc_tls_cert: Option<PathBuf>,
    /// Path to PEM-encoded gRPC server private key. Same env-var
    /// pattern as `grpc_tls_cert`.
    pub grpc_tls_key: Option<PathBuf>,
    /// Optional path to a PEM bundle of trust anchors used to verify
    /// client certificates. When set, the gRPC listener requires
    /// every client to present a cert that chains to this CA — i.e.
    /// mutual TLS. When unset, one-way TLS only.
    pub grpc_tls_client_ca: Option<PathBuf>,
    /// Dedicated plain-HTTP bind address; enables the HTTP transport.
    pub http_bind_addr: Option<String>,
    /// Whether `http_bind_addr` was given explicitly (same caveat as
    /// `router_bind_explicit`).
    pub http_bind_explicit: bool,
    /// HTTPS bind address. When set, the HTTP server also serves a
    /// TLS-terminated listener on this addr. Plain HTTP and HTTPS can
    /// run side by side (e.g. 8080 plain + 8443 TLS).
    pub http_tls_bind_addr: Option<String>,
    /// PEM cert for HTTPS. Reads `REDDB_HTTP_TLS_CERT` / its `_FILE`
    /// companion when not set explicitly.
    pub http_tls_cert: Option<PathBuf>,
    /// PEM key for HTTPS. Reads `REDDB_HTTP_TLS_KEY` / its `_FILE`
    /// companion when not set explicitly.
    pub http_tls_key: Option<PathBuf>,
    /// Optional PEM CA bundle. When set, the HTTPS listener requires
    /// every client to present a cert that chains to a CA in this
    /// bundle (mTLS). When unset, plain server-side TLS only.
    pub http_tls_client_ca: Option<PathBuf>,
    /// Dedicated wire-protocol bind address; enables the wire transport.
    pub wire_bind_addr: Option<String>,
    /// Whether `wire_bind_addr` was given explicitly (same caveat as
    /// `router_bind_explicit`).
    pub wire_bind_explicit: bool,
    /// TLS-encrypted wire protocol bind address.
    pub wire_tls_bind_addr: Option<String>,
    /// Path to the wire TLS certificate PEM. When `None` while
    /// `wire_tls_bind_addr` is set, a certificate is auto-generated.
    pub wire_tls_cert: Option<PathBuf>,
    /// Path to the wire TLS private-key PEM.
    pub wire_tls_key: Option<PathBuf>,
    /// PostgreSQL wire protocol bind address (Phase 3.1 PG parity).
    /// When set the server accepts psql / JDBC / DBeaver clients on
    /// this port via the v3 protocol. Defaults to None (disabled).
    pub pg_bind_addr: Option<String>,
    /// Copied verbatim to `RedDBOptions::create_if_missing`.
    pub create_if_missing: bool,
    /// `--readonly` CLI flag. `to_db_options` ORs it with
    /// `RED_READONLY` / `REDDB_READONLY` and the persisted runtime
    /// read-only state, so any one source enables read-only mode.
    pub read_only: bool,
    /// Replication role: `"primary"` or `"replica"`; any other value
    /// runs standalone.
    pub role: String,
    /// Primary endpoint used when `role == "replica"`; falls back to
    /// `http://127.0.0.1:5555` when `None`.
    pub primary_addr: Option<String>,
    /// Sets `auth.vault_enabled` (overridden back to `false` by `no_auth`).
    pub vault: bool,
    /// Issue #663 — explicit `--no-auth` / `--dev` flag for local
    /// no-password mode. When `true`, the boot pipeline force-disables
    /// auth (`auth.enabled = false`, `auth.require_auth = false`,
    /// `auth.vault_enabled = false`) and skips
    /// `AuthStore::bootstrap_from_env`, so an explicit
    /// `REDDB_USERNAME` + `REDDB_PASSWORD` pair cannot accidentally
    /// turn anonymous access into a logged-in admin. An unmissable
    /// warning is emitted at startup. Default `false` — the existing
    /// implicit no-auth behaviour (just don't set the envs) is
    /// unchanged.
    pub no_auth: bool,
    /// Override worker thread count (None = auto-detect from CPUs)
    pub workers: Option<usize>,
    /// Telemetry config (Phase 6 logging). `None` falls back to the
    /// built-in default derived from `path` + stderr-only.
    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
    /// HTTP handler-pool knobs from the CLI layer (issue #574 slice 5).
    /// Carries flag and env values; red_config and built-in defaults
    /// are applied later by [`crate::server::http_limits::resolve_http_limits`]
    /// once the runtime is open.
    pub http_limits_cli: crate::server::HttpLimitsCliInput,
}
146
/// A transport listener recorded as active.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerState {
    /// Transport label.
    pub transport: String,
    /// Address the listener uses.
    pub bind_addr: String,
    /// Whether the address was configured explicitly — presumably mirrors
    /// the matching `*_bind_explicit` flag; confirm at the call sites.
    pub explicit: bool,
}

/// A transport listener recorded as failed, with the reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerFailure {
    /// Transport label.
    pub transport: String,
    /// Address the listener attempted to use.
    pub bind_addr: String,
    /// Whether the address was configured explicitly (see
    /// [`TransportListenerState::explicit`]).
    pub explicit: bool,
    /// Human-readable failure description.
    pub reason: String,
}

/// Accumulated per-transport listener outcomes.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TransportReadiness {
    /// Listeners recorded as active, in insertion order.
    pub active: Vec<TransportListenerState>,
    /// Listeners recorded as failed, in insertion order.
    pub failed: Vec<TransportListenerFailure>,
}

impl TransportReadiness {
    /// Append an entry to `active`.
    fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
        let entry = TransportListenerState {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
        };
        self.active.push(entry);
    }

    /// Append an entry, together with its `reason`, to `failed`.
    fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
        let entry = TransportListenerFailure {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
            reason,
        };
        self.failed.push(entry);
    }
}
186
/// Configuration for a systemd service unit for the server.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name; the unit file is `<service_name>.service`.
    pub service_name: String,
    /// Path to the server binary.
    pub binary_path: PathBuf,
    /// Service user.
    pub run_user: String,
    /// Service group.
    pub run_group: String,
    /// Database file path; its directory is reported by [`Self::data_dir`].
    pub data_path: PathBuf,
    /// Optional router bind address.
    pub router_bind_addr: Option<String>,
    /// Optional gRPC bind address.
    pub grpc_bind_addr: Option<String>,
    /// Optional HTTP bind address.
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory containing `data_path`.
    ///
    /// Falls back to `.` when `data_path` has no parent (e.g. `/`) and
    /// also when the parent is empty: `Path::parent` returns `Some("")`
    /// for a bare file name such as `data.rdb`, and an empty path is not
    /// a usable directory.
    pub fn data_dir(&self) -> PathBuf {
        self.data_path
            .parent()
            .filter(|parent| !parent.as_os_str().is_empty())
            .map(PathBuf::from)
            .unwrap_or_else(|| PathBuf::from("."))
    }

    /// Unit file location: `/etc/systemd/system/<service_name>.service`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
211
212/// Build a sane default `TelemetryConfig` from a server path when the
213/// caller didn't set one explicitly. Writes rotating logs into the
214/// parent directory of the DB file (or `./logs` for in-memory /
215/// pathless runs). Level defaults to `info`, pretty stderr format.
216pub fn default_telemetry_for_path(
217    path: Option<&std::path::Path>,
218) -> crate::telemetry::TelemetryConfig {
219    let log_dir = match path {
220        Some(p) => p
221            .parent()
222            .map(|parent| parent.join("logs"))
223            .or_else(|| Some(std::path::PathBuf::from("./logs"))),
224        None => None, // in-memory — no file, stderr-only
225    };
226    crate::telemetry::TelemetryConfig {
227        log_dir,
228        file_prefix: "reddb.log".to_string(),
229        level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
230        format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
231            crate::telemetry::LogFormat::Pretty
232        } else {
233            crate::telemetry::LogFormat::Json
234        },
235        rotation_keep_days: 14,
236        service_name: "reddb",
237        // Implicit defaults — no CLI flag has claimed these values yet.
238        level_explicit: false,
239        format_explicit: false,
240        rotation_keep_days_explicit: false,
241        file_prefix_explicit: false,
242        log_dir_explicit: false,
243        log_file_disabled: false,
244    }
245}
246
// Metadata keys used to thread the parsed backup config from
// `to_db_options` (via `apply_backup_config`) down to the runner threads.
// `spawn_backup_tasks_if_configured` reads them back to spawn the
// periodic checkpointer and WAL-flush tasks (the earlier note here also
// named a `runner_backup_intervals` reader — NOTE(review): not visible in
// this part of the module; confirm it still exists). Threading through
// `metadata` avoids extending `RedDBOptions` with a public field that has
// no meaning for library consumers.

/// Checkpoint interval in seconds, stored via `to_string()` and parsed
/// back as `u64`. If missing or unparsable, no backup tasks are spawned.
const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
/// WAL archive/flush interval in seconds, stored via `to_string()` and
/// parsed back as `u64`. If missing or unparsable, no backup tasks are
/// spawned.
const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
/// Backup backend kind; `apply_backup_config` currently always writes `"s3"`.
const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
/// Issue #519 — threaded through `metadata` like the existing interval
/// values. `0` (also the fallback when the key is missing or unparsable)
/// means "feature disabled" and the runner skips the lag-monitor wiring
/// entirely.
const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";
260
261/// Issue #663 — metadata key set by `to_db_options` when the operator
262/// passes `--no-auth` / `--dev`. Read in `build_runtime_with_telemetry`
263/// to (a) skip `AuthStore::bootstrap_from_env` (so a stray
264/// `REDDB_USERNAME`/`REDDB_PASSWORD` cannot auto-create an admin) and
265/// (b) emit the loud "auth disabled" warning. Threaded via `metadata`
266/// — rather than a public `RedDBOptions` field — to keep the flag a
267/// CLI/boot concern with no meaning for library consumers.
268pub(crate) const NO_AUTH_META: &str = "red.boot.no_auth";
269
270/// Returns `true` when `--no-auth` / `--dev` was active for this boot,
271/// i.e. when `to_db_options` stamped [`NO_AUTH_META`] on `options.metadata`.
272pub(crate) fn no_auth_active(options: &RedDBOptions) -> bool {
273    options
274        .metadata
275        .get(NO_AUTH_META)
276        .map(|v| v == "true")
277        .unwrap_or(false)
278}
279
280/// Loud, unmissable warning printed when the operator opts into
281/// anonymous access via `--no-auth` / `--dev`. Goes to stderr (always
282/// visible at startup) **and** the tracing layer (captured by file
283/// logs once telemetry is wired).
284const NO_AUTH_WARNING: &str =
285    "⚠ auth disabled (--no-auth) — anonymous access, do NOT use in production";
286
287impl ServerCommandConfig {
288    fn to_db_options(&self) -> Result<RedDBOptions, String> {
289        let mut options = match &self.path {
290            Some(path) => RedDBOptions::persistent(path),
291            None => RedDBOptions::in_memory(),
292        };
293
294        options.mode = StorageMode::Persistent;
295        options.create_if_missing = self.create_if_missing;
296        // PLAN.md Phase 4.3 — read_only resolution priority:
297        //   1. CLI flag (`--readonly`) — explicit operator intent.
298        //   2. `RED_READONLY=true` env — orchestrator override.
299        //   3. Persisted `<data>/.runtime-state.json` from a prior
300        //      `POST /admin/readonly` — survives restart.
301        //   4. Default `false`.
302        options.read_only = self.read_only
303            || env_nonempty("RED_READONLY")
304                .or_else(|| env_nonempty("REDDB_READONLY"))
305                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
306                .unwrap_or(false)
307            || self.path.as_ref().is_some_and(|data_path| {
308                crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
309                    data_path,
310                ))
311                .unwrap_or(false)
312            });
313
314        options.replication = match self.role.as_str() {
315            "primary" => ReplicationConfig::primary(),
316            "replica" => {
317                let primary_addr = self
318                    .primary_addr
319                    .clone()
320                    .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
321                // Public-mutation rejection on replicas is enforced by
322                // `WriteGate` at the runtime/RPC boundary (PLAN.md W1).
323                // Leaving `options.read_only = false` keeps the pager
324                // writable so the internal logical-WAL apply path can
325                // ingest records from the primary; WriteGate ensures no
326                // client request reaches storage.
327                ReplicationConfig::replica(primary_addr)
328            }
329            _ => ReplicationConfig::standalone(),
330        };
331
332        if self.vault {
333            options.auth.vault_enabled = true;
334        }
335
336        // Issue #663 — `--no-auth` / `--dev` is the last word on auth
337        // for this boot: force every auth knob off, regardless of any
338        // env-derived config (`--vault`, `REDDB_USERNAME`/`PASSWORD`,
339        // `REDDB_VAULT_KEY`, OAuth, cert) the operator may also have
340        // set. We *also* stamp [`NO_AUTH_META`] so the auth-store
341        // builder downstream knows to skip `bootstrap_from_env`
342        // (which would otherwise auto-create an admin from
343        // `REDDB_USERNAME`/`REDDB_PASSWORD` even with auth disabled,
344        // a footgun for the local-dev workflow this flag exists to
345        // support).
346        if self.no_auth {
347            options.auth.enabled = false;
348            options.auth.require_auth = false;
349            options.auth.vault_enabled = false;
350            options
351                .metadata
352                .insert(NO_AUTH_META.to_string(), "true".to_string());
353        }
354
355        // Issue #652 — Control Event Ledger Compliance Mode.
356        // `REDDB_COMPLIANCE_MODE=true|1|yes|on` makes the producer
357        // slices (652b/c/d) fail-closed on ledger persistence
358        // failures. Default `false` — log-and-continue on emit error.
359        if let Some(raw) = env_nonempty("REDDB_COMPLIANCE_MODE") {
360            options.control_events.compliance_mode = matches!(
361                raw.to_ascii_lowercase().as_str(),
362                "1" | "true" | "yes" | "on"
363            );
364        }
365        if env_nonempty(PRESET_ENV).is_some_and(|s| s.trim() == PRESET_REGULATED) {
366            options.control_events.compliance_mode = true;
367            options.query_audit = crate::runtime::query_audit::QueryAuditConfig::regulated();
368        }
369
370        // Issue #517 — canonical `REDDB_BACKUP_*` contract takes
371        // precedence. On Err, surface the partial-config message so
372        // boot exits non-zero with a clear operator message. On
373        // Ok(None), fall through to the legacy backend-from-env path.
374        match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
375            Err(msg) => {
376                return Err(format!("backup bootstrap: {msg}"));
377            }
378            Ok(Some(cfg)) => {
379                apply_backup_config(&mut options, &cfg);
380            }
381            Ok(None) => {
382                configure_remote_backend_from_env(&mut options);
383            }
384        }
385
386        Ok(options)
387    }
388
389    pub fn enabled_transports(&self) -> Vec<ServerTransport> {
390        let mut transports = Vec::with_capacity(3);
391        if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
392            transports.push(ServerTransport::Grpc);
393        }
394        if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
395            transports.push(ServerTransport::Http);
396        }
397        if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
398            transports.push(ServerTransport::Wire);
399        }
400        transports
401    }
402}
403
404/// Read an env var, treating empty / whitespace-only as `None`.
405/// Honors the `<NAME>_FILE` convention. Re-exports the shared
406/// `crate::utils::env_with_file_fallback` helper so call sites in
407/// this module can keep their short local name.
408fn env_nonempty(name: &str) -> Option<String> {
409    crate::utils::env_with_file_fallback(name)
410}
411
412fn env_truthy(name: &str) -> bool {
413    env_nonempty(name)
414        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
415        .unwrap_or(false)
416}
417
418/// Apply a parsed [`BackupConfig`] to `options`. Wires the S3
419/// backend via `with_remote_backend` + `with_atomic_remote_backend`
420/// when the `backend-s3` feature is on, stashes intervals + backend
421/// kind in `metadata` so the runner can spawn the periodic tasks,
422/// and emits the startup INFO log required by #517.
423fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
424    let endpoint_host = endpoint_host(&cfg.endpoint);
425
426    options.metadata.insert(
427        BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
428        cfg.checkpoint_interval_secs.to_string(),
429    );
430    options.metadata.insert(
431        BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
432        cfg.wal_flush_interval_secs.to_string(),
433    );
434    options
435        .metadata
436        .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
437    options.metadata.insert(
438        BACKUP_PAUSE_ON_LAG_META.to_string(),
439        cfg.pause_on_lag_secs.to_string(),
440    );
441
442    #[cfg(feature = "backend-s3")]
443    {
444        let s3_cfg = crate::storage::backend::S3Config {
445            endpoint: cfg.endpoint.clone(),
446            bucket: cfg.bucket.clone(),
447            key_prefix: cfg.prefix.clone(),
448            access_key: cfg.access_key_id.clone(),
449            secret_key: cfg.secret_access_key.clone(),
450            region: cfg.region.clone(),
451            path_style: true,
452        };
453        let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
454        options.remote_backend = Some(backend.clone());
455        options.remote_backend_atomic = Some(backend);
456        // Use the operator-supplied prefix as the snapshot key root.
457        // The existing helpers (`default_snapshot_prefix`,
458        // `default_wal_archive_prefix`) derive sub-prefixes from the
459        // parent of `remote_key`.
460        let trimmed = cfg.prefix.trim_end_matches('/');
461        options.remote_key = Some(format!("{}/data.rdb", trimmed));
462
463        tracing::info!(
464            backend = "s3",
465            endpoint = %endpoint_host,
466            bucket = %cfg.bucket,
467            prefix = %cfg.prefix,
468            checkpoint_interval_secs = cfg.checkpoint_interval_secs,
469            wal_flush_interval_secs = cfg.wal_flush_interval_secs,
470            "backup backend configured from REDDB_BACKUP_* env"
471        );
472    }
473
474    #[cfg(not(feature = "backend-s3"))]
475    {
476        tracing::warn!(
477            backend = "s3",
478            endpoint = %endpoint_host,
479            bucket = %cfg.bucket,
480            prefix = %cfg.prefix,
481            "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
482             backend wiring skipped (archiver/checkpointer also disabled)"
483        );
484    }
485}
486
/// Extract the `host[:port]` part of an endpoint URL for logging.
///
/// Processing steps:
/// 1. Strip an optional `scheme://` prefix.
/// 2. Drop everything from the first `/`, `?` or `#` onward.
/// 3. Drop any `userinfo@` component, so credentials embedded as
///    `https://key:secret@host` never reach the logs.
///
/// Inputs without a scheme are handled the same way
/// (`minio:9000` → `minio:9000`).
fn endpoint_host(endpoint: &str) -> &str {
    let after_scheme = endpoint
        .split_once("://")
        .map_or(endpoint, |(_, rest)| rest);
    // `split` always yields at least one item, so the fallback is never hit.
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(after_scheme);
    // Userinfo ends at the last '@' of the authority.
    authority
        .rsplit_once('@')
        .map_or(authority, |(_, host)| host)
}
494
495/// If `options` carry backup-task intervals threaded in via
496/// [`apply_backup_config`], spawn periodic checkpointer + WAL-flush
497/// tasks against `runtime`. Returns a `BackupTasksHandle` that
498/// stops the tasks when dropped; runners keep it alive for the
499/// server lifetime.
500fn spawn_backup_tasks_if_configured(
501    options: &RedDBOptions,
502    runtime: &RedDBRuntime,
503) -> Option<BackupTasksHandle> {
504    let checkpoint_secs: u64 = options
505        .metadata
506        .get(BACKUP_INTERVAL_META_CHECKPOINT)?
507        .parse()
508        .ok()?;
509    let wal_secs: u64 = options
510        .metadata
511        .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
512        .parse()
513        .ok()?;
514    // Issue #519 — opt-in graceful read-only when remote archive lag
515    // exceeds the threshold. `0` (default) keeps legacy behaviour.
516    let pause_on_lag_secs: u64 = options
517        .metadata
518        .get(BACKUP_PAUSE_ON_LAG_META)
519        .and_then(|raw| raw.parse().ok())
520        .unwrap_or(0);
521    options.remote_backend.as_ref()?;
522
523    let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
524
525    // Stamp the gate with the threshold + a "now" baseline so the
526    // first auto-pause can only fire after `pause_on_lag_secs` of
527    // archive silence. The poller below re-evaluates on the same
528    // clock the archive-task wrapper uses.
529    if pause_on_lag_secs > 0 {
530        let now_ms = std::time::SystemTime::now()
531            .duration_since(std::time::UNIX_EPOCH)
532            .map(|d| d.as_millis() as u64)
533            .unwrap_or(0);
534        runtime
535            .write_gate()
536            .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
537        tracing::info!(
538            pause_on_lag_secs,
539            "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
540        );
541    }
542
543    let checkpoint_handle = {
544        let stop = Arc::clone(&stop);
545        let runtime = runtime.clone();
546        let interval = Duration::from_secs(checkpoint_secs);
547        thread::Builder::new()
548            .name("red-checkpointer".into())
549            .spawn(move || {
550                periodic_loop(stop, interval, move || {
551                    if let Err(err) = runtime.checkpoint() {
552                        tracing::warn!(error = %err, "periodic checkpoint failed");
553                    }
554                })
555            })
556            .ok()
557    };
558
559    let archiver_handle = {
560        let stop = Arc::clone(&stop);
561        let runtime = runtime.clone();
562        let interval = Duration::from_secs(wal_secs);
563        let lag_enabled = pause_on_lag_secs > 0;
564        thread::Builder::new()
565            .name("red-wal-archiver".into())
566            .spawn(move || {
567                periodic_loop(stop, interval, move || match runtime.trigger_backup() {
568                    Ok(_) if lag_enabled => {
569                        let now_ms = std::time::SystemTime::now()
570                            .duration_since(std::time::UNIX_EPOCH)
571                            .map(|d| d.as_millis() as u64)
572                            .unwrap_or(0);
573                        runtime.write_gate().record_archive_success(now_ms);
574                        // Same-tick re-evaluation: catching up while
575                        // already auto-paused must auto-resume without
576                        // waiting for the poller's cadence.
577                        runtime.write_gate().evaluate_archive_lag(now_ms);
578                    }
579                    Ok(_) => {}
580                    Err(err) => {
581                        tracing::warn!(error = %err, "periodic WAL archive/backup failed");
582                    }
583                })
584            })
585            .ok()
586    };
587
588    // Issue #519 — lag poller. Wakes on a short cadence so a frozen
589    // archiver (the worst case) still flips the gate within ~5s of
590    // crossing the threshold, instead of waiting up to a full
591    // `wal_secs` for the next archive attempt that may never come.
592    let lag_monitor_handle = if pause_on_lag_secs > 0 {
593        let stop = Arc::clone(&stop);
594        let runtime = runtime.clone();
595        // 5s is short enough that the threshold is honoured tightly
596        // and long enough that the atomic loads stay invisible at the
597        // process level.
598        let interval = Duration::from_secs(5);
599        thread::Builder::new()
600            .name("red-archive-lag-monitor".into())
601            .spawn(move || {
602                periodic_loop(stop, interval, move || {
603                    let now_ms = std::time::SystemTime::now()
604                        .duration_since(std::time::UNIX_EPOCH)
605                        .map(|d| d.as_millis() as u64)
606                        .unwrap_or(0);
607                    let was_paused = runtime.write_gate().is_auto_paused();
608                    let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
609                    if now_paused && !was_paused {
610                        tracing::warn!(
611                            pause_on_lag_secs,
612                            last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
613                            "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
614                        );
615                    } else if !now_paused && was_paused {
616                        tracing::info!(
617                            "WAL archive caught up — exiting graceful read-only mode (issue #519)"
618                        );
619                    }
620                })
621            })
622            .ok()
623    } else {
624        None
625    };
626
627    tracing::info!(
628        checkpoint_interval_secs = checkpoint_secs,
629        wal_flush_interval_secs = wal_secs,
630        "backup tasks spawned (checkpointer + WAL archiver)"
631    );
632
633    Some(BackupTasksHandle {
634        stop,
635        _checkpoint_handle: checkpoint_handle,
636        _archiver_handle: archiver_handle,
637        _lag_monitor_handle: lag_monitor_handle,
638    })
639}
640
/// Shutdown handle for the periodic backup threads: the checkpointer, the
/// WAL archiver and, when enabled, the archive-lag monitor.
///
/// Dropping it sets the shared stop flag; each loop exits on its next
/// (roughly once-per-second) wake-up. The threads are not joined, so an
/// in-flight checkpoint or archive run finishes on its own.
pub struct BackupTasksHandle {
    /// Shared stop flag polled by every loop.
    stop: Arc<std::sync::atomic::AtomicBool>,
    /// Checkpointer thread (`None` if it failed to spawn).
    _checkpoint_handle: Option<thread::JoinHandle<()>>,
    /// WAL archiver thread (`None` if it failed to spawn).
    _archiver_handle: Option<thread::JoinHandle<()>>,
    /// Issue #519 — periodic archive-lag poller, only spawned when
    /// `REDDB_BACKUP_PAUSE_ON_LAG_SECS > 0`.
    _lag_monitor_handle: Option<thread::JoinHandle<()>>,
}

impl Drop for BackupTasksHandle {
    fn drop(&mut self) {
        // Release pairs with the Acquire load performed by the loops.
        self.stop.store(true, std::sync::atomic::Ordering::Release);
    }
}
657
/// Run `tick` roughly every `interval` until `stop` is set.
///
/// The thread wakes once per second, so a stop request is noticed within
/// about a second even for long intervals (e.g. a 1h checkpoint).
/// `interval` is measured as accumulated sleep time: time spent inside
/// `tick` is not counted, and an interval shorter than one second ticks
/// once per wake-up.
///
/// `stop` is re-checked right after every sleep, so no new tick (e.g. a
/// checkpoint) starts once shutdown has been signalled.
fn periodic_loop<F: FnMut()>(
    stop: Arc<std::sync::atomic::AtomicBool>,
    interval: Duration,
    mut tick: F,
) {
    let wake = Duration::from_secs(1);
    let stopped = || stop.load(std::sync::atomic::Ordering::Acquire);
    let mut elapsed = Duration::ZERO;
    while !stopped() {
        thread::sleep(wake);
        // Shutdown may have been requested while we slept.
        if stopped() {
            break;
        }
        elapsed += wake;
        if elapsed >= interval {
            tick();
            elapsed = Duration::ZERO;
        }
    }
}
676
677fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
678    // PLAN.md (cloud-agnostic) — prefer the new spelling
679    // `RED_BACKEND`; accept the legacy `REDDB_REMOTE_BACKEND` for
680    // existing dev installs. `none` (default) means standalone — no
681    // remote backend, valid for development and on-prem without
682    // remote.
683    let backend = env_nonempty("RED_BACKEND")
684        .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
685        .unwrap_or_else(|| "none".to_string())
686        .to_ascii_lowercase();
687
688    match backend.as_str() {
689        // Universal S3-compatible — covers AWS, R2, MinIO, Ceph,
690        // GCS-interop, B2, DO Spaces, Wasabi, Garage, SeaweedFS,
691        // IDrive, Storj. The `path_style` toggle in S3Config picks
692        // the right addressing for self-hosted vs hosted.
693        "s3" | "minio" | "r2" => {
694            #[cfg(feature = "backend-s3")]
695            {
696                if let Some(config) = s3_config_from_env() {
697                    let remote_key = env_nonempty("RED_REMOTE_KEY")
698                        .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
699                        .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
700                    let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
701                    options.remote_backend = Some(backend.clone());
702                    options.remote_backend_atomic = Some(backend);
703                    options.remote_key = Some(remote_key);
704                }
705            }
706            #[cfg(not(feature = "backend-s3"))]
707            {
708                tracing::warn!(
709                    backend = %backend,
710                    "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
711                );
712            }
713        }
714        // Filesystem backend (NFS/EFS/SMB/local-disk). PLAN.md spec
715        // calls it `fs`; legacy code shipped it as `local`. Both
716        // names map to LocalBackend, with the remote_key derived
717        // from `RED_FS_PATH` + a per-database suffix when provided.
718        "fs" | "local" => {
719            let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
720            let remote_key = match (
721                base_path,
722                env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
723            ) {
724                (Some(base), Some(rel)) => Some(format!(
725                    "{}/{}",
726                    base.trim_end_matches('/'),
727                    rel.trim_start_matches('/')
728                )),
729                (Some(base), None) => Some(format!(
730                    "{}/clusters/dev/data.rdb",
731                    base.trim_end_matches('/')
732                )),
733                (None, Some(rel)) => Some(rel),
734                (None, None) => None,
735            };
736            if let Some(key) = remote_key {
737                let backend = Arc::new(crate::storage::backend::LocalBackend);
738                options.remote_backend = Some(backend.clone());
739                options.remote_backend_atomic = Some(backend);
740                options.remote_key = Some(key);
741            }
742        }
743        // Generic HTTP backend (PLAN.md Phase 2.3). Maximum
744        // portability: any service exposing PUT/GET/DELETE serves
745        // as a backup target. Optional auth via *_FILE secret
746        // path keeps the token out of the env.
747        "http" => {
748            let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
749                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
750            {
751                Some(u) => u,
752                None => {
753                    tracing::warn!(
754                        "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
755                    );
756                    return;
757                }
758            };
759            let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
760                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
761                .unwrap_or_default();
762            let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
763                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
764            {
765                std::fs::read_to_string(&path)
766                    .ok()
767                    .map(|s| s.trim().to_string())
768                    .filter(|s| !s.is_empty())
769            } else {
770                env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
771                    .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
772            };
773
774            let mut config =
775                crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
776            if let Some(auth) = auth_header {
777                config = config.with_auth_header(auth);
778            }
779            let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
780                || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
781                || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
782            config = config.with_conditional_writes(conditional_writes);
783            // Always populate the snapshot-transport handle. When the
784            // operator confirmed CAS support, also populate the atomic
785            // handle via AtomicHttpBackend — without that flag,
786            // LeaseStore must remain unreachable on this backend.
787            if conditional_writes {
788                match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
789                    Ok(atomic) => {
790                        let atomic_arc = Arc::new(atomic);
791                        options.remote_backend = Some(atomic_arc.clone());
792                        options.remote_backend_atomic = Some(atomic_arc);
793                    }
794                    Err(err) => {
795                        tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
796                        options.remote_backend =
797                            Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
798                    }
799                }
800            } else {
801                options.remote_backend =
802                    Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
803            }
804            options.remote_key = env_nonempty("RED_REMOTE_KEY")
805                .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
806                .or_else(|| Some("clusters/dev/data.rdb".to_string()));
807        }
808        // `none` is the explicit standalone — no remote, no backup
809        // pipeline. Boot never blocks on network reachability.
810        "none" | "" => {}
811        other => {
812            tracing::warn!(
813                backend = %other,
814                "unknown RED_BACKEND value — supported: s3 | fs | http | none"
815            );
816        }
817    }
818}
819
/// Look up `RED_S3_{suffix}` (the cloud-agnostic spelling from the
/// PLAN.md spec), falling back to the legacy `REDDB_S3_{suffix}` name
/// kept for existing dev installs. The new spelling wins when both are
/// present. Both lookups go through `env_nonempty`; the legacy name is
/// only formatted and read when the new one yields nothing. No
/// deprecation warning is emitted here for the legacy spelling.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    let preferred_key = format!("RED_S3_{suffix}");
    match env_nonempty(&preferred_key) {
        Some(value) => Some(value),
        None => env_nonempty(&format!("REDDB_S3_{suffix}")),
    }
}
829
/// Resolve an S3 secret (e.g. `ACCESS_KEY`, `SECRET_KEY`) from env.
///
/// Lookup order:
/// 1. A file path in `RED_S3_{suffix}_FILE`, or the legacy
///    `REDDB_S3_{suffix}_FILE`. This follows the PLAN.md convention,
///    which is compatible with Kubernetes / Docker Secrets, HashiCorp
///    Vault Agent and sealed-secrets. When a path is set the file wins
///    outright: its trimmed contents are returned and the inline
///    variable is NOT consulted. An operator can therefore override an
///    inline value without unsetting it.
/// 2. Otherwise the inline value via [`env_s3`].
///
/// Returns `None` when a `_FILE` path is set but the file is unreadable
/// or empty/whitespace-only. In both cases a warning naming the setting
/// and the path is logged (never the contents), so a backend that ends
/// up disabled is diagnosable.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let file_key_red = format!("RED_S3_{suffix}_FILE");
    let file_key_legacy = format!("REDDB_S3_{suffix}_FILE");
    if let Some(path) = env_nonempty(&file_key_red).or_else(|| env_nonempty(&file_key_legacy)) {
        // The file deliberately wins even when it is unusable: falling
        // back to the inline value would silently resurrect a credential
        // the operator meant to replace.
        return match std::fs::read_to_string(&path) {
            Ok(contents) => {
                let secret = contents.trim();
                if secret.is_empty() {
                    tracing::warn!(
                        setting = suffix,
                        path = %path,
                        "S3 secret file is empty; secret treated as unset"
                    );
                    None
                } else {
                    Some(secret.to_string())
                }
            }
            Err(err) => {
                tracing::warn!(
                    setting = suffix,
                    path = %path,
                    error = %err,
                    "could not read S3 secret file; secret treated as unset"
                );
                None
            }
        };
    }
    env_s3(suffix)
}
847
/// Assemble an `S3Config` from the `RED_S3_*` / `REDDB_S3_*` env family.
///
/// Required: `ENDPOINT`, `BUCKET`, `ACCESS_KEY`, `SECRET_KEY`. If any
/// of these is missing, the result is `None`. The two keys are resolved
/// through [`env_s3_secret`], so they may come from `*_FILE` paths.
///
/// Optional:
/// * `REGION` defaults to `us-east-1`.
/// * `KEY_PREFIX`, with `PREFIX` accepted as an alias, defaults to an
///   empty string.
/// * `PATH_STYLE` defaults to `true` when unset. When set, only
///   `1`/`true`/`yes`/`on` (case-insensitive) keep it on.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    // Required settings, checked in this order; the first missing one
    // short-circuits the whole config.
    let endpoint = env_s3("ENDPOINT")?;
    let bucket = env_s3("BUCKET")?;
    let access_key = env_s3_secret("ACCESS_KEY")?;
    let secret_key = env_s3_secret("SECRET_KEY")?;

    let region = match env_s3("REGION") {
        Some(explicit) => explicit,
        None => String::from("us-east-1"),
    };
    let key_prefix = env_s3("KEY_PREFIX")
        .or_else(|| env_s3("PREFIX"))
        .unwrap_or_default();
    let path_style = match env_s3("PATH_STYLE") {
        None => true,
        Some(raw) => {
            let lowered = raw.to_ascii_lowercase();
            matches!(lowered.as_str(), "1" | "true" | "yes" | "on")
        }
    };

    Some(crate::storage::backend::S3Config {
        endpoint,
        bucket,
        key_prefix,
        access_key,
        secret_key,
        region,
        path_style,
    })
}
871
872pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
873    let data_dir = config.data_dir();
874    let exec_start = render_systemd_exec_start(config);
875    format!(
876        "[Unit]\n\
877Description=RedDB unified database service\n\
878After=network-online.target\n\
879Wants=network-online.target\n\
880\n\
881[Service]\n\
882Type=simple\n\
883User={user}\n\
884Group={group}\n\
885WorkingDirectory={workdir}\n\
886ExecStart={exec_start}\n\
887Restart=always\n\
888RestartSec=2\n\
889LimitSTACK=16M\n\
890NoNewPrivileges=true\n\
891PrivateTmp=true\n\
892ProtectSystem=strict\n\
893ProtectHome=true\n\
894ProtectControlGroups=true\n\
895ProtectKernelTunables=true\n\
896ProtectKernelModules=true\n\
897RestrictNamespaces=true\n\
898LockPersonality=true\n\
899MemoryDenyWriteExecute=true\n\
900ReadWritePaths={workdir}\n\
901\n\
902[Install]\n\
903WantedBy=multi-user.target\n",
904        user = config.run_user,
905        group = config.run_group,
906        workdir = data_dir.display(),
907        exec_start = exec_start,
908    )
909}
910
911/// Install a systemd unit + start the service.
912///
913/// Linux-only. The helper shells out to `systemctl`, `useradd`,
914/// `groupadd`, `install`, `getent`, and `id` — none of which exist on
915/// Windows or macOS. The Windows/macOS branch returns a hard error so
916/// callers (the CLI) surface a clear message instead of a confusing
917/// syscall failure. A proper Windows-service equivalent (sc.exe /
918/// NSSM) is a Phase 3.6 follow-up.
919#[cfg(target_os = "linux")]
920pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
921    ensure_root()?;
922    ensure_command_available("systemctl")?;
923    ensure_command_available("getent")?;
924    ensure_command_available("groupadd")?;
925    ensure_command_available("useradd")?;
926    ensure_command_available("install")?;
927    ensure_executable(&config.binary_path)?;
928
929    if !command_success("getent", ["group", config.run_group.as_str()])? {
930        run_command("groupadd", ["--system", config.run_group.as_str()])?;
931    }
932
933    if !command_success("id", ["-u", config.run_user.as_str()])? {
934        let data_dir = config.data_dir();
935        run_command(
936            "useradd",
937            [
938                "--system",
939                "--gid",
940                config.run_group.as_str(),
941                "--home-dir",
942                data_dir.to_string_lossy().as_ref(),
943                "--shell",
944                "/usr/sbin/nologin",
945                config.run_user.as_str(),
946            ],
947        )?;
948    }
949
950    let data_dir = config.data_dir();
951    run_command(
952        "install",
953        [
954            "-d",
955            "-o",
956            config.run_user.as_str(),
957            "-g",
958            config.run_group.as_str(),
959            "-m",
960            "0750",
961            data_dir.to_string_lossy().as_ref(),
962        ],
963    )?;
964
965    std::fs::write(config.unit_path(), render_systemd_unit(config))
966        .map_err(|err| format!("failed to write systemd unit: {err}"))?;
967
968    run_command("systemctl", ["daemon-reload"])?;
969    run_command(
970        "systemctl",
971        [
972            "enable",
973            "--now",
974            format!("{}.service", config.service_name).as_str(),
975        ],
976    )?;
977
978    Ok(())
979}
980
/// Stub for every non-Linux target. systemd only exists on Linux, so
/// this always returns `Err` with manual-install guidance. The signature
/// matches the Linux version so callers compile on every platform.
/// Native service-manager support (sc.exe + NSSM on Windows, launchd on
/// macOS) is a Phase 3.6 follow-up.
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    const GUIDANCE: &str = concat!(
        "systemd install is Linux-only — use sc.exe (Windows) or ",
        "launchd (macOS) to install the service manually using the ",
        "unit printed by `red service print-unit`",
    );
    Err(GUIDANCE.to_owned())
}
992
/// Fail unless the current process runs as root.
///
/// The uid is obtained by running `id -u` and comparing its trimmed
/// stdout with `"0"`.
///
/// # Errors
/// * `failed to determine current uid...` if `id` cannot run or exits
///   non-zero.
/// * `run this command as root (sudo)` for any non-zero uid.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let output = match Command::new("id").arg("-u").output() {
        Ok(output) => output,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !output.status.success() {
        return Err(String::from("failed to determine current uid"));
    }
    match String::from_utf8_lossy(&output.stdout).trim() {
        "0" => Ok(()),
        _ => Err(String::from("run this command as root (sudo)")),
    }
}
1008
/// Check that `command` resolves on `PATH`.
///
/// Runs `sh -c 'command -v "$1"' sh <command>`. The name is passed as a
/// positional parameter instead of being spliced into the script text,
/// so shell metacharacters in it (`;`, `||`, spaces...) cannot change
/// what the shell executes.
///
/// A plain, non-login shell is used so the lookup sees the same `PATH`
/// that `Command::new(command)` later uses to spawn the tool. A login
/// shell (`-l`) sources profile scripts, which may extend `PATH` or
/// print to the terminal, so the check could pass for a tool the real
/// spawn cannot find.
///
/// # Errors
/// * `failed to check command '<command>': ...` if `sh` cannot start.
/// * `required command not found: <command>` if the lookup fails.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    let status = Command::new("sh")
        // `$0` = "sh", `$1` = the command name (never parsed as script).
        .args(["-c", "command -v \"$1\" >/dev/null 2>&1", "sh", command])
        .status()
        .map_err(|err| format!("failed to check command '{command}': {err}"))?;
    if status.success() {
        Ok(())
    } else {
        Err(format!("required command not found: {command}"))
    }
}
1021
/// Verify that `path` points at a regular file with at least one
/// execute permission bit set.
///
/// `std::fs::metadata` follows symlinks, so a symlink to an executable
/// passes.
///
/// # Errors
/// * `binary not found '<path>': ...` if the path cannot be stat'ed.
/// * `binary is not a file: <path>` for directories and other
///   non-regular files. Directories carry execute (search) bits, so the
///   file-type check must come before the mode check.
/// * `binary is not executable: <path>` if no `x` bit is set.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    use std::os::unix::fs::PermissionsExt;

    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    if metadata.permissions().mode() & 0o111 == 0 {
        return Err(format!("binary is not executable: {}", path.display()));
    }
    Ok(())
}
1041
/// Run `program` with `args` and report whether it exited successfully.
///
/// Meant for probes such as `getent group <name>` and `id -u <name>`,
/// where only the exit status matters. stdin, stdout and stderr are
/// attached to the null device. A negative probe (e.g.
/// `id: 'reddb': no such user`) therefore does not print what looks like
/// an error during a normal install.
///
/// # Errors
/// `failed to run <program>: ...` if the process cannot be spawned. A
/// non-zero exit is `Ok(false)`, not an error.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    Command::new(program)
        .args(args)
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status()
        .map(|status| status.success())
        .map_err(|err| format!("failed to run {program}: {err}"))
}
1050
/// Run `program` with `args`, capturing its output, and fail unless it
/// exits successfully.
///
/// # Errors
/// * `failed to run <program>: ...` if the process cannot be spawned.
/// * `<program> failed: <detail>` on an unsuccessful exit. `<detail>` is
///   the trimmed stderr; if that is empty, the trimmed stdout; if both
///   are empty, the exit status as rendered by `ExitStatus`'s `Display`
///   (e.g. `exit status: 2`).
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    // `ExitStatus`'s Display already reads "exit status: N" (or
    // "signal: N ..."), so it is used verbatim. Adding our own
    // "exit status " prefix would produce "exit status exit status: N".
    let detail = [output.stderr.as_slice(), output.stdout.as_slice()]
        .iter()
        .map(|bytes| String::from_utf8_lossy(bytes).trim().to_string())
        .find(|text| !text.is_empty())
        .unwrap_or_else(|| output.status.to_string());
    Err(format!("{program} failed: {detail}"))
}
1072
1073pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1074    let has_any = config.router_bind_addr.is_some()
1075        || config.grpc_bind_addr.is_some()
1076        || config.http_bind_addr.is_some()
1077        || config.wire_bind_addr.is_some()
1078        || config.pg_bind_addr.is_some();
1079    if !has_any {
1080        return Err("at least one server bind address must be configured".into());
1081    }
1082    let thread_name = if config.router_bind_addr.is_some() {
1083        "red-server-router"
1084    } else {
1085        match (
1086            config.grpc_bind_addr.is_some(),
1087            config.http_bind_addr.is_some(),
1088        ) {
1089            (true, true) => "red-server-dual",
1090            (true, false) => "red-server-grpc",
1091            (false, true) => "red-server-http",
1092            (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1093            (false, false) => "red-server-pg-wire",
1094        }
1095    };
1096
1097    let handle = thread::Builder::new()
1098        .name(thread_name.into())
1099        .stack_size(8 * 1024 * 1024)
1100        .spawn(move || run_configured_servers(config))
1101        .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1102
1103    match handle.join() {
1104        Ok(result) => result,
1105        Err(_) => Err("server thread panicked".to_string()),
1106    }
1107}
1108
1109fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1110    let mut parts = vec![
1111        config.binary_path.display().to_string(),
1112        "server".to_string(),
1113        "--path".to_string(),
1114        config.data_path.display().to_string(),
1115    ];
1116
1117    if let Some(bind_addr) = &config.router_bind_addr {
1118        parts.push("--bind".to_string());
1119        parts.push(bind_addr.clone());
1120    } else if let Some(bind_addr) = &config.grpc_bind_addr {
1121        parts.push("--grpc-bind".to_string());
1122        parts.push(bind_addr.clone());
1123    }
1124    if let Some(bind_addr) = &config.http_bind_addr {
1125        parts.push("--http-bind".to_string());
1126        parts.push(bind_addr.clone());
1127    }
1128
1129    parts.join(" ")
1130}
1131
/// Return `true` if a TCP connection to `target` succeeds within
/// `timeout` for at least one of the addresses it resolves to.
///
/// `target` is anything `&str: ToSocketAddrs` accepts (e.g.
/// `"127.0.0.1:5050"`, `"localhost:5050"`); a resolution failure yields
/// `false`. Addresses are tried in resolution order, and probing stops
/// at the first successful connect. The connection is dropped
/// immediately. `timeout` applies per address.
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    match target.to_socket_addrs() {
        Err(_) => false,
        Ok(resolved) => {
            let candidates: Vec<SocketAddr> = resolved.collect();
            for candidate in &candidates {
                if TcpStream::connect_timeout(candidate, timeout).is_ok() {
                    return true;
                }
            }
            false
        }
    }
}
1142
1143#[inline(never)]
1144fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1145    // Phase 6 logging is initialised inside each runner once the
1146    // runtime is open — see `build_runtime_and_auth_store`. Going
1147    // after DB open lets us read `red.logging.*` config keys out of
1148    // the persistent red_config store and merge them with the CLI
1149    // flags (flag > red_config > built-in default).
1150    if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1151        return run_routed_server(config, router_bind_addr);
1152    }
1153
1154    match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1155        (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1156            run_dual_server(config, grpc_bind_addr, http_bind_addr)
1157        }
1158        (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1159        (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1160        (None, None) => {
1161            if let Some(wire_addr) = config.wire_bind_addr.clone() {
1162                run_wire_only_server(config, wire_addr)
1163            } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1164                run_pg_only_server(config, pg_addr)
1165            } else {
1166                Err("at least one server bind address must be configured".to_string())
1167            }
1168        }
1169    }
1170}
1171
1172/// Bind a TCP listener for a transport at startup and record the
1173/// outcome in the shared [`TransportReadiness`] state.
1174///
1175/// The split between *explicit* and *implicit/default* binds is the
1176/// contract from issue #545:
1177///
1178/// * `explicit == true` — the operator named this transport on the
1179///   CLI / env / config. A failed bind is fatal: this returns `Err`
1180///   and the boot path must propagate the error so the process exits
1181///   non-zero with the recorded `reason`.
1182/// * `explicit == false` — this is a default listener the server
1183///   would have spun up regardless. A failed bind degrades: this
1184///   returns `Ok(None)` (no listener) but the failure is still
1185///   recorded in `readiness.failed`, so the server keeps running and
1186///   the next `/health` probe enumerates the degraded listener.
1187///
1188/// On success the bound listener lands in `readiness.active`.
1189pub fn bind_listener_for_startup(
1190    readiness: &mut TransportReadiness,
1191    transport: &str,
1192    bind_addr: &str,
1193    explicit: bool,
1194) -> Result<Option<TcpListener>, String> {
1195    match TcpListener::bind(bind_addr) {
1196        Ok(listener) => {
1197            readiness.active(transport, bind_addr, explicit);
1198            Ok(Some(listener))
1199        }
1200        Err(err) => {
1201            let reason = format!("{transport} listener bind {bind_addr}: {err}");
1202            readiness.failed(transport, bind_addr, explicit, reason.clone());
1203            if explicit {
1204                tracing::error!(
1205                    transport,
1206                    bind = %bind_addr,
1207                    error = %err,
1208                    "fatal explicit bind failure"
1209                );
1210                Err(format!("explicit {reason}"))
1211            } else {
1212                tracing::warn!(
1213                    transport,
1214                    bind = %bind_addr,
1215                    error = %err,
1216                    "non-fatal implicit bind failure; listener degraded"
1217                );
1218                Ok(None)
1219            }
1220        }
1221    }
1222}
1223
/// Wire SIGTERM and SIGINT to `RedDBRuntime::graceful_shutdown`.
///
/// PLAN.md Phase 1.1: orchestrators (K8s preStop, Fly autostop, ECS
/// drain, systemd, plain `docker stop`) all rely on SIGTERM with a
/// grace window. SIGKILL after that grace window is the OS's
/// fallback; we are responsible for finishing in time.
///
/// Spawns a tokio task on the caller's runtime and returns right away;
/// nothing is awaited in this function itself. The spawned task:
///   1. Waits for SIGTERM or SIGINT. On Unix, SIGHUP is also handled: it
///      calls `handle_sighup_reload` and the task keeps waiting.
///   2. Calls `runtime.graceful_shutdown(backup_on_shutdown)` and logs
///      either the returned report or the error.
///   3. Calls `std::process::exit(0)`.
///
/// `RED_BACKUP_ON_SHUTDOWN` controls the `backup_on_shutdown` flag
/// passed in step 2:
/// * Unset: the flag is `true`.
/// * Set: only `1`/`true`/`yes`/`on` (case-insensitive) keep it `true`;
///   any other value makes it `false`.
///
/// Whether a backup actually happens when no remote backend is
/// configured is decided inside `graceful_shutdown` and is not visible
/// here.
///
/// On Unix, if the SIGTERM or SIGINT handler cannot be installed, a
/// warning is logged and no task is spawned at all. A failed SIGHUP
/// install only disables signal-driven secret reload.
///
/// Repeated signals are harmless: the task exits the process after the
/// first shutdown signal, so a second SIGTERM is never handled.
///
/// NOTE(review): the exit code is 0 even when `graceful_shutdown`
/// returns `Err`, so orchestrators cannot tell a dirty exit from a clean
/// one by status alone. On Unix that case is surfaced through the
/// `ShutdownForced` operator event instead. The non-Unix (Ctrl+C) branch
/// logs the error but does not emit that event; confirm whether the
/// asymmetry is intended.
async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
    // Unset => true; set => only the truthy spellings keep it enabled.
    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
        .ok()
        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
        .unwrap_or(true);

    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
                );
                return;
            }
        };
        let mut sigint = match signal(SignalKind::interrupt()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGINT handler");
                return;
            }
        };
        // PLAN.md Phase 6.4 — SIGHUP triggers a reload of secrets from
        // their `_FILE` companions without restarting the process.
        // Useful for credential rotation pipelines (kubectl create
        // secret + kubectl rollout restart, but for systemd / Nomad /
        // bare-metal where rolling the process is heavier).
        let mut sighup = match signal(SignalKind::hangup()) {
            Ok(s) => Some(s),
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
                None
            }
        };

        let reload_runtime = runtime.clone();
        tokio::spawn(async move {
            loop {
                // Two select! shapes because the SIGHUP stream is optional.
                let signal_name = match &mut sighup {
                    Some(hup) => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                        _ = hup.recv() => "SIGHUP",
                    },
                    None => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                    },
                };

                if signal_name == "SIGHUP" {
                    handle_sighup_reload(&reload_runtime);
                    continue; // stay alive; SIGHUP isn't a shutdown
                }

                tracing::info!(
                    signal = signal_name,
                    "lifecycle signal received; shutting down"
                );
                match runtime.graceful_shutdown(backup_on_shutdown) {
                    Ok(report) => {
                        tracing::info!(
                            duration_ms = report.duration_ms,
                            flushed_wal = report.flushed_wal,
                            final_checkpoint = report.final_checkpoint,
                            backup_uploaded = report.backup_uploaded,
                            "graceful shutdown complete"
                        );
                    }
                    Err(err) => {
                        tracing::error!(error = %err, "graceful shutdown failed");
                        // Issue #205 — graceful shutdown returning Err
                        // means the runtime is exiting without a clean
                        // flush/checkpoint. Operator-grade event so the
                        // operator notices the dirty exit even when the
                        // process restarts before they read tracing logs.
                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
                            reason: format!("graceful shutdown failed: {err}"),
                        }
                        .emit_global();
                    }
                }
                // Exit 0 on success and on failure alike (see NOTE in the
                // function docs).
                std::process::exit(0);
            }
        });
    }

    #[cfg(not(unix))]
    {
        // Only Ctrl+C is available here; there is no SIGHUP reload path.
        tokio::spawn(async move {
            let interrupted = tokio::signal::ctrl_c().await;
            if let Err(err) = interrupted {
                tracing::warn!(error = %err, "could not install Ctrl+C handler");
                return;
            }

            tracing::info!(
                signal = "Ctrl+C",
                "lifecycle signal received; shutting down"
            );
            match runtime.graceful_shutdown(backup_on_shutdown) {
                Ok(report) => {
                    tracing::info!(
                        duration_ms = report.duration_ms,
                        flushed_wal = report.flushed_wal,
                        final_checkpoint = report.final_checkpoint,
                        backup_uploaded = report.backup_uploaded,
                        "graceful shutdown complete"
                    );
                }
                Err(err) => {
                    tracing::error!(error = %err, "graceful shutdown failed");
                }
            }
            std::process::exit(0);
        });
    }
}
1369
1370/// PLAN.md Phase 6.4 — re-read secrets from `*_FILE` companion env
1371/// vars. Today this only refreshes the audit log + records the
1372/// reload event; the runtime modules that hold cached secret
1373/// material (S3 backend credentials, admin token, JWT keys) read
1374/// the env on each request so the next call after SIGHUP picks up
1375/// the new file contents automatically. A future extension can
1376/// punch through to the LeaseStore / AuthStore for in-memory
1377/// caches that don't re-read on each call.
1378fn handle_sighup_reload(runtime: &RedDBRuntime) {
1379    let now_ms = std::time::SystemTime::now()
1380        .duration_since(std::time::UNIX_EPOCH)
1381        .map(|d| d.as_millis() as u64)
1382        .unwrap_or(0);
1383    tracing::info!(
1384        target: "reddb::secrets",
1385        ts_unix_ms = now_ms,
1386        "SIGHUP received; secrets will be re-read from *_FILE on next access"
1387    );
1388    // Routed through AuditFieldEscaper (ADR 0010 / issue #177) so
1389    // every emission goes through the typed-field guard. The
1390    // arguments here are static, but using the typed entry point
1391    // keeps the discipline uniform across call sites.
1392    use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1393    runtime.audit_log().record_event(
1394        AuditEvent::builder("config/sighup_reload")
1395            .source(AuditAuthSource::System)
1396            .resource("secrets")
1397            .outcome(Outcome::Success)
1398            .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1399            .build(),
1400    );
1401}
1402
/// Serve every protocol behind the single public `router_bind_addr`.
///
/// Sequence:
/// 1. Open the runtime and auth store, and start any configured backup
///    tasks. Their handle is held in `_backup_tasks` for the life of
///    this call. Then start the admin/metrics listeners.
/// 2. Start the HTTP server on a background thread, bound to an
///    ephemeral `127.0.0.1` port.
/// 3. Build a multi-threaded tokio runtime. It uses `config.workers`
///    worker threads, or the CPU-derived suggestion from
///    `detect_runtime_config`, with that config's stack size. Inside
///    it:
///    * install the lifecycle signal handler;
///    * start the gRPC and RedWire backends on their own ephemeral
///      loopback ports;
///    * serve the TCP protocol router, which is given the three
///      backend addresses to dispatch connections to.
///
/// Returns when the router stops. Returns `Err` earlier if any bind or
/// setup step fails, or if the HTTP backend exits within its ~100 ms
/// startup window. gRPC/RedWire backend failures after startup are only
/// logged; they do not stop the router.
#[inline(never)]
fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
    let workers = config.workers;
    let cli_telemetry = config.telemetry.clone();
    let db_options = config.to_db_options()?;
    let rt_config = detect_runtime_config();
    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
    // `_telemetry_guard` / `_backup_tasks` are named (not `_`) so they
    // stay alive until this function returns.
    let (runtime, auth_store, _telemetry_guard) =
        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);

    spawn_admin_metrics_listeners(&runtime, &auth_store);

    // Internal backends listen on ephemeral loopback ports; only the
    // router address is public.
    let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
        .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
    let http_backend = http_listener
        .local_addr()
        .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
    let http_server = build_http_server(
        runtime.clone(),
        auth_store.clone(),
        http_backend.to_string(),
    );
    let http_server = apply_http_limits(http_server, &config, &runtime);
    let http_handle = http_server.serve_in_background_on(http_listener);

    // Crude fail-fast check: give the HTTP thread 100 ms to die before
    // committing to the router. A slower failure is not detected here.
    thread::sleep(Duration::from_millis(100));
    if http_handle.is_finished() {
        return match http_handle.join() {
            Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
            Ok(Err(err)) => Err(err.to_string()),
            Err(_) => Err("HTTP backend thread panicked".to_string()),
        };
    }

    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(worker_threads)
        .thread_stack_size(rt_config.stack_size)
        .build()
        .map_err(|err| format!("tokio runtime: {err}"))?;

    let signal_runtime = runtime.clone();
    tokio_runtime.block_on(async move {
        // Must run inside the tokio runtime: it spawns the signal task.
        spawn_lifecycle_signal_handler(signal_runtime).await;
        let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
            .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
        let grpc_backend = grpc_listener
            .local_addr()
            .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
        // Loopback-only backend, configured here without TLS.
        let grpc_server = RedDBGrpcServer::with_options(
            runtime.clone(),
            GrpcServerOptions {
                bind_addr: grpc_backend.to_string(),
                tls: None,
            },
            auth_store,
        );
        tokio::spawn(async move {
            if let Err(err) = grpc_server.serve_on(grpc_listener).await {
                tracing::error!(err = %err, "gRPC backend error");
            }
        });

        let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .map_err(|err| format!("bind internal wire listener: {err}"))?;
        let wire_backend = wire_listener
            .local_addr()
            .map_err(|err| format!("inspect internal wire listener: {err}"))?;
        let wire_rt = Arc::new(runtime);
        tokio::spawn(async move {
            if let Err(err) =
                crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
                    .await
            {
                tracing::error!(err = %err, "redwire backend error");
            }
        });

        tracing::info!(
            bind = %router_bind_addr,
            cpus = rt_config.available_cpus,
            workers = worker_threads,
            "router bootstrapping"
        );
        // The router's result becomes this function's result.
        serve_tcp_router(TcpProtocolRouterConfig {
            bind_addr: router_bind_addr,
            grpc_backend,
            http_backend,
            wire_backend,
        })
        .await
        .map_err(|err| err.to_string())
    })
}
1499
1500/// Spawn RedWire listeners (plaintext + TLS) as background tokio tasks.
1501async fn spawn_wire_listeners(
1502    config: &ServerCommandConfig,
1503    runtime: &RedDBRuntime,
1504    readiness: &mut TransportReadiness,
1505) -> Result<(), String> {
1506    // Plaintext RedWire — TCP or Unix socket
1507    if let Some(wire_addr) = config.wire_bind_addr.clone() {
1508        let wire_rt = Arc::new(runtime.clone());
1509        // Address starting with `unix://` or an absolute filesystem path
1510        // switches to Unix domain sockets.
1511        #[cfg(unix)]
1512        {
1513            if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1514                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1515                tokio::spawn(async move {
1516                    if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1517                        &wire_addr, wire_rt,
1518                    )
1519                    .await
1520                    {
1521                        tracing::error!(err = %e, "redwire unix listener error");
1522                    }
1523                });
1524                return Ok(());
1525            }
1526        }
1527        match tokio::net::TcpListener::bind(&wire_addr).await {
1528            Ok(listener) => {
1529                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1530                tokio::spawn(async move {
1531                    if let Err(e) =
1532                        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1533                            .await
1534                    {
1535                        tracing::error!(err = %e, "redwire listener error");
1536                    }
1537                });
1538            }
1539            Err(err) => {
1540                let reason = format!("wire listener bind {wire_addr}: {err}");
1541                readiness.failed(
1542                    "wire",
1543                    &wire_addr,
1544                    config.wire_bind_explicit,
1545                    reason.clone(),
1546                );
1547                if config.wire_bind_explicit {
1548                    tracing::error!(
1549                        transport = "wire",
1550                        bind = %wire_addr,
1551                        error = %err,
1552                        "fatal explicit bind failure"
1553                    );
1554                    return Err(format!("explicit {reason}"));
1555                }
1556                tracing::warn!(
1557                    transport = "wire",
1558                    bind = %wire_addr,
1559                    error = %err,
1560                    "non-fatal implicit bind failure; listener degraded"
1561                );
1562            }
1563        }
1564    }
1565
1566    // RedWire over TLS
1567    if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1568        let tls_config = resolve_wire_tls_config(config);
1569        match tls_config {
1570            Ok(tls_cfg) => {
1571                let wire_rt = Arc::new(runtime.clone());
1572                tokio::spawn(async move {
1573                    if let Err(e) =
1574                        crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1575                            .await
1576                    {
1577                        tracing::error!(err = %e, "redwire+tls listener error");
1578                    }
1579                });
1580            }
1581            Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1582        }
1583    }
1584    Ok(())
1585}
1586
1587/// Spawn the PostgreSQL wire-protocol listener (Phase 3.1 PG parity).
1588///
1589/// Only runs when `--pg-bind` is supplied. Uses the v3 protocol so
1590/// psql, JDBC drivers, DBeaver, etc. can connect directly. Runs
1591/// alongside the native wire listener; the two transports do not
1592/// share a port.
1593fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1594    if let Some(pg_addr) = config.pg_bind_addr.clone() {
1595        let rt = Arc::new(runtime.clone());
1596        tokio::spawn(async move {
1597            let cfg = crate::wire::PgWireConfig {
1598                bind_addr: pg_addr,
1599                ..Default::default()
1600            };
1601            if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1602                tracing::error!(err = %e, "pg wire listener error");
1603            }
1604        });
1605    }
1606}
1607
1608/// Resolve gRPC TLS material into PEM bytes.
1609///
1610/// Lookup order, in priority:
1611///   1. Explicit `config.grpc_tls_cert` / `config.grpc_tls_key` (paths
1612///      passed via CLI/env). Both must be present together.
1613///   2. `RED_GRPC_TLS_DEV=1` — auto-generate a self-signed cert next
1614///      to the data dir. Refuses to run without the env flag so an
1615///      operator can't accidentally ship a dev cert in prod.
1616///
1617/// `client_ca` is loaded when `config.grpc_tls_client_ca` is set,
1618/// turning the listener into a mutual-TLS endpoint that requires
1619/// every client to present a chain that anchors at one of the CAs
1620/// in the bundle.
1621fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1622    use crate::utils::secret_file::expand_file_env;
1623
1624    // Best-effort *_FILE expansion for every TLS env knob. Errors here
1625    // surface as warnings; the fallback path (explicit cert paths) will
1626    // cover the common case.
1627    for var in [
1628        "REDDB_GRPC_TLS_CERT",
1629        "REDDB_GRPC_TLS_KEY",
1630        "REDDB_GRPC_TLS_CLIENT_CA",
1631    ] {
1632        if let Err(err) = expand_file_env(var) {
1633            tracing::warn!(
1634                target: "reddb::secrets",
1635                env = %var,
1636                err = %err,
1637                "could not expand *_FILE companion for gRPC TLS"
1638            );
1639        }
1640    }
1641
1642    let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1643        (Some(cert), Some(key)) => {
1644            let cert_pem = std::fs::read(cert)
1645                .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1646            let key_pem =
1647                std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1648            (cert_pem, key_pem)
1649        }
1650        _ => {
1651            // No explicit material → only proceed when dev-mode is on.
1652            let dev = std::env::var("RED_GRPC_TLS_DEV")
1653                .ok()
1654                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1655                .unwrap_or(false);
1656            if !dev {
1657                return Err("gRPC TLS configured but no cert/key supplied — set \
1658                     REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1659                     RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1660                    .to_string());
1661            }
1662            let dir = config
1663                .path
1664                .as_ref()
1665                .and_then(|p| p.parent())
1666                .map(PathBuf::from)
1667                .unwrap_or_else(|| PathBuf::from("."));
1668            let (cert_pem_str, key_pem_str) =
1669                crate::wire::tls::generate_self_signed_cert("localhost")
1670                    .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1671
1672            // Constant-time-friendly fingerprint to stderr so the
1673            // operator can pin a client trust store. We log via
1674            // `tracing::warn!` so it stands out next to ordinary
1675            // listener-online events.
1676            let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1677            tracing::warn!(
1678                target: "reddb::security",
1679                transport = "grpc",
1680                cert_sha256 = %fp,
1681                "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1682                 DO NOT use in production"
1683            );
1684            // Persist so that restarts reuse the same identity.
1685            let cert_path = dir.join("grpc-tls-cert.pem");
1686            let key_path = dir.join("grpc-tls-key.pem");
1687            if !cert_path.exists() || !key_path.exists() {
1688                let _ = std::fs::create_dir_all(&dir);
1689                std::fs::write(&cert_path, cert_pem_str.as_bytes())
1690                    .map_err(|e| format!("write grpc dev cert: {e}"))?;
1691                std::fs::write(&key_path, key_pem_str.as_bytes())
1692                    .map_err(|e| format!("write grpc dev key: {e}"))?;
1693                #[cfg(unix)]
1694                {
1695                    use std::os::unix::fs::PermissionsExt;
1696                    let _ =
1697                        std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1698                }
1699            }
1700            (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1701        }
1702    };
1703
1704    let client_ca_pem = match &config.grpc_tls_client_ca {
1705        Some(path) => Some(
1706            std::fs::read(path)
1707                .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1708        ),
1709        None => None,
1710    };
1711
1712    Ok(crate::GrpcTlsOptions {
1713        cert_pem,
1714        key_pem,
1715        client_ca_pem,
1716    })
1717}
1718
1719/// Spawn a TLS-terminated gRPC listener when `grpc_tls_bind_addr` is
1720/// configured. Logs and continues on TLS-config errors so the plain
1721/// listener stays up; this matches the wire-listener pattern.
1722fn spawn_grpc_tls_listener_if_configured(
1723    config: &ServerCommandConfig,
1724    runtime: RedDBRuntime,
1725    auth_store: Arc<AuthStore>,
1726) {
1727    let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1728        return;
1729    };
1730    let tls_opts = match resolve_grpc_tls_options(config) {
1731        Ok(opts) => opts,
1732        Err(err) => {
1733            tracing::error!(
1734                target: "reddb::security",
1735                transport = "grpc",
1736                err = %err,
1737                "gRPC TLS config error; TLS listener will not start"
1738            );
1739            return;
1740        }
1741    };
1742    tokio::spawn(async move {
1743        let server = RedDBGrpcServer::with_options(
1744            runtime,
1745            GrpcServerOptions {
1746                bind_addr: tls_bind.clone(),
1747                tls: Some(tls_opts),
1748            },
1749            auth_store,
1750        );
1751        tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1752        if let Err(err) = server.serve().await {
1753            tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1754        }
1755    });
1756}
1757
1758/// Hex-encoded SHA-256 of a PEM blob, used for cert-pin operator log
1759/// lines. Constant-time hash; no token contents leave this fn.
1760fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1761    use sha2::{Digest, Sha256};
1762    let mut h = Sha256::new();
1763    h.update(pem);
1764    let d = h.finalize();
1765    let mut buf = String::with_capacity(64);
1766    for b in d.iter() {
1767        buf.push_str(&format!("{b:02x}"));
1768    }
1769    buf
1770}
1771
1772/// Resolve TLS config: use provided cert/key or auto-generate.
1773fn resolve_wire_tls_config(
1774    config: &ServerCommandConfig,
1775) -> Result<crate::wire::WireTlsConfig, String> {
1776    match (&config.wire_tls_cert, &config.wire_tls_key) {
1777        (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1778            cert_path: cert.clone(),
1779            key_path: key.clone(),
1780        }),
1781        _ => {
1782            // Auto-generate self-signed cert
1783            let dir = config
1784                .path
1785                .as_ref()
1786                .and_then(|p| p.parent())
1787                .map(PathBuf::from)
1788                .unwrap_or_else(|| PathBuf::from("."));
1789            crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1790        }
1791    }
1792}
1793
1794#[inline(never)]
1795fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1796    let rt_config = detect_runtime_config();
1797    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1798    let cli_telemetry = config.telemetry.clone();
1799    let db_options = config.to_db_options()?;
1800    let mut transport_readiness = TransportReadiness::default();
1801
1802    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1803        .enable_all()
1804        .worker_threads(workers)
1805        .thread_stack_size(rt_config.stack_size)
1806        .build()
1807        .map_err(|err| format!("tokio runtime: {err}"))?;
1808
1809    // Guard lives on the outer thread's stack so it outlives the
1810    // tokio runtime — dropping it only after the listener returns
1811    // flushes the file log writer.
1812    let (runtime, _auth_store, _telemetry_guard) =
1813        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1814    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1815    let signal_runtime = runtime.clone();
1816    tokio_runtime.block_on(async move {
1817        spawn_lifecycle_signal_handler(signal_runtime).await;
1818        spawn_pg_listener(&config, &runtime);
1819        let wire_rt = Arc::new(runtime);
1820        let listener = tokio::net::TcpListener::bind(&wire_addr)
1821            .await
1822            .map_err(|err| {
1823                let reason = format!("wire listener bind {wire_addr}: {err}");
1824                transport_readiness.failed(
1825                    "wire",
1826                    &wire_addr,
1827                    config.wire_bind_explicit,
1828                    reason.clone(),
1829                );
1830                if config.wire_bind_explicit {
1831                    format!("explicit {reason}")
1832                } else {
1833                    reason
1834                }
1835            })?;
1836        transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1837        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1838            .await
1839            .map_err(|e| e.to_string())
1840    })
1841}
1842
1843#[inline(never)]
1844fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1845    let rt_config = detect_runtime_config();
1846    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1847    let cli_telemetry = config.telemetry.clone();
1848    let db_options = config.to_db_options()?;
1849
1850    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1851        .enable_all()
1852        .worker_threads(workers)
1853        .thread_stack_size(rt_config.stack_size)
1854        .build()
1855        .map_err(|err| format!("tokio runtime: {err}"))?;
1856
1857    let (runtime, _auth_store, _telemetry_guard) =
1858        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1859    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1860    let signal_runtime = runtime.clone();
1861    tokio_runtime.block_on(async move {
1862        spawn_lifecycle_signal_handler(signal_runtime).await;
1863        let cfg = crate::wire::PgWireConfig {
1864            bind_addr: pg_addr,
1865            ..Default::default()
1866        };
1867        crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1868            .await
1869            .map_err(|e| e.to_string())
1870    })
1871}
1872
1873#[inline(never)]
1874fn build_runtime_and_auth_store(
1875    db_options: RedDBOptions,
1876    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1877) -> Result<
1878    (
1879        RedDBRuntime,
1880        Arc<AuthStore>,
1881        Option<crate::telemetry::TelemetryGuard>,
1882    ),
1883    String,
1884> {
1885    // Return the TelemetryGuard so server runners can bind it for
1886    // their full lifetime. Dropping the guard tears down the
1887    // non-blocking log writer thread and, because that writer is
1888    // built with `.lossy(true)`, any subsequent log event routed to
1889    // the file sink is silently dropped — so callers MUST keep the
1890    // returned `Option<TelemetryGuard>` alive until shutdown.
1891    build_runtime_with_telemetry(db_options, cli_telemetry)
1892}
1893
1894/// Open the runtime, initialise structured logging from merged CLI +
1895/// `red_config` settings, and return a guard the caller must keep
1896/// alive for the server lifetime (drop flushes pending log writes).
1897///
1898/// Merge priority: CLI flag (explicit `Some`) beats `red.logging.*`
1899/// in red_config, beats the built-in default. A CLI-flag value of
1900/// `None` / empty means "inherit from config or default" — never
1901/// "disable". The one exception is `--no-log-file` which forces
1902/// `log_dir = None` regardless of config.
1903pub(crate) fn build_runtime_with_telemetry(
1904    db_options: RedDBOptions,
1905    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1906) -> Result<
1907    (
1908        RedDBRuntime,
1909        Arc<AuthStore>,
1910        Option<crate::telemetry::TelemetryGuard>,
1911    ),
1912    String,
1913> {
1914    let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1915        // Issue #205 — runtime construction failure is the canonical
1916        // StartupFailed phase. The audit sink isn't installed yet
1917        // (it would have been registered inside `with_options`), so
1918        // the emit falls through to tracing+eprintln only — operator
1919        // still sees it on stderr.
1920        let msg = err.to_string();
1921        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1922            phase: "runtime_construction".to_string(),
1923            error: msg.clone(),
1924        }
1925        .emit_global();
1926        msg
1927    })?;
1928
1929    // PLAN.md Phase 5 / W6 — opt into serverless writer-lease fencing
1930    // when `RED_LEASE_REQUIRED=true`. Failure here aborts boot: the
1931    // operator asked for a fence; running unfenced would silently
1932    // expose split-brain risk.
1933    crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1934        let msg = err.to_string();
1935        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1936            phase: "lease_loop".to_string(),
1937            error: msg.clone(),
1938        }
1939        .emit_global();
1940        msg
1941    })?;
1942
1943    // #213 — edge-triggered disk-space watchdog. Watches the data
1944    // directory; falls back to polling when fanotify is unavailable
1945    // (non-Linux or unprivileged container).
1946    if let Some(data_path) = db_options.data_path.as_deref() {
1947        let watch_dir = data_path.parent().unwrap_or(data_path);
1948        crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1949    }
1950
1951    // #214 — inotify config hot-reload watcher. Watches the config file
1952    // (REDDB_CONFIG_FILE or /etc/reddb/config.json) for changes and
1953    // applies hot-reloadable keys without restart.
1954    {
1955        let config_path = crate::runtime::config_overlay::config_file_path();
1956        let store = runtime.db().store();
1957        crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1958    }
1959
1960    // Phase 6 logging: merge red_config overrides onto the CLI-built
1961    // telemetry config, then install the global subscriber.
1962    let merged = merge_telemetry_with_config(
1963        cli_telemetry
1964            .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1965        &runtime,
1966    );
1967    let telemetry_guard = crate::telemetry::init(merged);
1968
1969    let no_auth = no_auth_active(&db_options);
1970    let auth_store =
1971        if db_options.auth.vault_enabled {
1972            let pager =
1973                runtime.db().store().pager().cloned().ok_or_else(|| {
1974                    "vault requires a paged database (persistent mode)".to_string()
1975                })?;
1976            let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1977                .map_err(|err| err.to_string())?;
1978            Arc::new(store)
1979        } else {
1980            Arc::new(AuthStore::new(db_options.auth.clone()))
1981        };
1982    auth_store.configure_control_events(
1983        runtime.control_event_ledger(),
1984        runtime.control_event_config(),
1985    );
1986    // Issue #663 — when `--no-auth` is active, deliberately skip the
1987    // preset machinery. Otherwise a stray `REDDB_USERNAME`+`REDDB_PASSWORD`
1988    // pair in the operator's environment would silently create an admin
1989    // user, defeating the whole point of opting into anonymous mode.
1990    if no_auth {
1991        eprintln!("{NO_AUTH_WARNING}");
1992        tracing::warn!("{NO_AUTH_WARNING}");
1993    } else {
1994        apply_preset(&runtime, &auth_store)?;
1995        maybe_apply_policy_break_glass(&auth_store);
1996    }
1997
1998    // Background session purge (every 5 minutes)
1999    {
2000        let store = Arc::clone(&auth_store);
2001        std::thread::Builder::new()
2002            .name("reddb-session-purge".into())
2003            .spawn(move || loop {
2004                std::thread::sleep(std::time::Duration::from_secs(300));
2005                store.purge_expired_sessions();
2006            })
2007            .ok();
2008    }
2009
2010    Ok((runtime, auth_store, telemetry_guard))
2011}
2012
2013/// Honour `REDDB_POLICY_BREAK_GLASS=1` at boot — see issue #713 and
2014/// the [`crate::auth::self_lock_guard`] module. Anything other than
2015/// `1`/`true`/`yes` (case-insensitive) is treated as not set.
2016fn maybe_apply_policy_break_glass(auth_store: &Arc<AuthStore>) {
2017    use crate::auth::self_lock_guard::BREAK_GLASS_ENV;
2018
2019    let enabled = std::env::var(BREAK_GLASS_ENV)
2020        .ok()
2021        .map(|v| {
2022            let trimmed = v.trim().to_ascii_lowercase();
2023            matches!(trimmed.as_str(), "1" | "true" | "yes")
2024        })
2025        .unwrap_or(false);
2026    if !enabled {
2027        return;
2028    }
2029    let now = crate::utils::now_unix_millis() as u128;
2030    match auth_store.apply_policy_break_glass(now) {
2031        Ok(()) => {
2032            tracing::warn!(env = BREAK_GLASS_ENV, "policy break-glass recovery applied");
2033        }
2034        Err(err) => {
2035            tracing::error!(env = BREAK_GLASS_ENV, %err, "policy break-glass recovery failed");
2036        }
2037    }
2038}
2039
/// Reserved config keys describing first-boot bootstrap state (issue #650).
/// Presence of [`BOOTSTRAP_COMPLETED_KEY`] is the idempotency hinge: when
/// it is set, [`apply_preset`] skips preset application on subsequent
/// boots. It only rehydrates the manifest registry and logs at `info`, so a
/// container restart with the same env does not re-run the preset.
pub(crate) const BOOTSTRAP_COMPLETED_KEY: &str = "system.bootstrap.completed";
/// Config key recording which preset (or `"manifest"`) bootstrapped the
/// database — presumably written by `persist_bootstrap_state`; confirm there.
pub(crate) const BOOTSTRAP_PRESET_KEY: &str = "system.bootstrap.preset";
/// Config key recording the first admin created during bootstrap, if any —
/// presumably written by `persist_bootstrap_state`; confirm there.
pub(crate) const BOOTSTRAP_FIRST_ADMIN_KEY: &str = "system.bootstrap.first_admin_id";

/// Env var selecting the bootstrap preset. Default = `simple`.
pub(crate) const PRESET_ENV: &str = "REDDB_PRESET";
/// `simple` preset: only records bootstrap state; auth settings stay as
/// configured by the CLI/env.
pub(crate) const PRESET_SIMPLE: &str = "simple";
/// `production` preset: creates the first admin from `REDDB_USERNAME` /
/// `REDDB_PASSWORD` and attaches [`FIRST_ADMIN_ALLOW_ALL_POLICY`] to it.
pub(crate) const PRESET_PRODUCTION: &str = "production";
/// `regulated` preset: enables query-audit infrastructure and installs
/// guardrails protecting the audit / evidence / query-audit config
/// namespaces.
pub(crate) const PRESET_REGULATED: &str = "regulated";

/// Policy id installed by the `production` preset and attached to the
/// first admin. Grants `"*"` on `"*"` so the admin has policy-derived
/// broad authority (acceptance #3) — not an authorization bypass.
pub(crate) const FIRST_ADMIN_ALLOW_ALL_POLICY: &str = "system.bootstrap.first-admin-allow-all";
/// Id of the deny-policy installed by the `regulated` preset. It denies
/// policy mutations on itself and `config:write` on the three namespaces
/// below.
pub(crate) const REGULATED_PROTECT_MANAGED_POLICY: &str = "system.regulated.protect-managed";
/// Audit config namespace protected by the `regulated` preset.
pub(crate) const REGULATED_AUDIT_CONFIG_NAMESPACE: &str = "red.config.audit";
/// Evidence config namespace protected by the `regulated` preset.
pub(crate) const REGULATED_EVIDENCE_CONFIG_NAMESPACE: &str = "red.config.evidence";
/// Query-audit config namespace protected by the `regulated` preset.
pub(crate) const REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE: &str = "red.config.query_audit";
2062
2063/// Apply the bootstrap preset selected by `REDDB_PRESET` (default
2064/// `simple`). Idempotent — if `system.bootstrap.completed` is already
2065/// set, this is a one-line `tracing::info!` no-op and the server proceeds
2066/// (issue #650 acceptance #5).
2067///
2068/// Caller must have already short-circuited the `--no-auth` / `--dev`
2069/// path (issue #663): when that flag is set, the preset must be skipped
2070/// entirely — no admin created, no bootstrap state written.
2071pub(crate) fn apply_preset(
2072    runtime: &RedDBRuntime,
2073    auth_store: &Arc<AuthStore>,
2074) -> Result<(), String> {
2075    let store = runtime.db().store();
2076
2077    if store.get_config(BOOTSTRAP_COMPLETED_KEY).is_some() {
2078        crate::cli::bootstrap_manifest::rehydrate_manifest_registry(
2079            runtime,
2080            &runtime.config_registry(),
2081        )?;
2082        tracing::info!("bootstrap state present, skipping preset application");
2083        return Ok(());
2084    }
2085
2086    // `_FILE` companion expansion for k8s secret mounts. Errors here
2087    // (e.g. both `REDDB_PASSWORD` and `REDDB_PASSWORD_FILE` set) are
2088    // operator misconfigs and should fail the boot loudly.
2089    for var in ["REDDB_USERNAME", "REDDB_PASSWORD"] {
2090        crate::utils::expand_file_env(var).map_err(|err| format!("expand {var}_FILE: {err}"))?;
2091    }
2092
2093    let preset = std::env::var(PRESET_ENV)
2094        .ok()
2095        .map(|s| s.trim().to_string())
2096        .filter(|s| !s.is_empty())
2097        .unwrap_or_else(|| PRESET_SIMPLE.to_string());
2098
2099    if let Ok(path) = std::env::var(crate::cli::bootstrap_manifest::MANIFEST_ENV) {
2100        let path = path.trim();
2101        if !path.is_empty() {
2102            let first_admin_id = crate::cli::bootstrap_manifest::apply_manifest_file(
2103                runtime,
2104                auth_store,
2105                &runtime.config_registry(),
2106                std::path::Path::new(path),
2107            )?;
2108            persist_bootstrap_state(runtime, "manifest", Some(&first_admin_id));
2109            tracing::info!("bootstrap manifest applied");
2110            return Ok(());
2111        }
2112    }
2113
2114    let first_admin_id = match preset.as_str() {
2115        PRESET_SIMPLE => {
2116            // `simple` is the default low-friction preset. Auth knobs
2117            // remain whatever the CLI/env set; we only persist the
2118            // bootstrap state so subsequent boots are idempotent.
2119            None
2120        }
2121        PRESET_PRODUCTION => Some(apply_production_preset(auth_store)?),
2122        PRESET_REGULATED => {
2123            apply_regulated_preset(runtime, auth_store)?;
2124            None
2125        }
2126        other => {
2127            return Err(format!(
2128                "REDDB_PRESET={other:?} is not recognised (expected `simple`, `production`, or `regulated`)"
2129            ));
2130        }
2131    };
2132
2133    persist_bootstrap_state(runtime, &preset, first_admin_id.as_deref());
2134    tracing::info!(preset = %preset, "bootstrap preset applied");
2135    Ok(())
2136}
2137
2138fn apply_production_preset(auth_store: &Arc<AuthStore>) -> Result<String, String> {
2139    use crate::auth::store::PrincipalRef;
2140    use crate::auth::{policies::Policy, UserId};
2141
2142    let username = std::env::var("REDDB_USERNAME")
2143        .ok()
2144        .filter(|s| !s.is_empty())
2145        .ok_or_else(|| {
2146            "REDDB_PRESET=production requires REDDB_USERNAME (or REDDB_USERNAME_FILE)".to_string()
2147        })?;
2148    let password = std::env::var("REDDB_PASSWORD")
2149        .ok()
2150        .filter(|s| !s.is_empty())
2151        .ok_or_else(|| {
2152            "REDDB_PRESET=production requires REDDB_PASSWORD (or REDDB_PASSWORD_FILE)".to_string()
2153        })?;
2154
2155    // (1) Create the first admin as system-owned + platform-scoped so
2156    // #649's `ManagedConfigGate` accepts them on managed-config writes.
2157    let result = auth_store
2158        .bootstrap_system_admin(&username, &password)
2159        .map_err(|err| format!("bootstrap first admin: {err}"))?;
2160    let first_admin = UserId::platform(result.user.username.clone());
2161
2162    // (2) Install the allow-all policy as an ordinary policy row.
2163    let policy = Policy::from_json_str(&format!(
2164        r#"{{
2165            "id": "{id}",
2166            "version": 1,
2167            "statements": [{{
2168                "effect": "allow",
2169                "actions": ["*"],
2170                "resources": ["*"]
2171            }}]
2172        }}"#,
2173        id = FIRST_ADMIN_ALLOW_ALL_POLICY
2174    ))
2175    .map_err(|err| format!("compile allow-all policy: {err}"))?;
2176    auth_store
2177        .put_policy(policy)
2178        .map_err(|err| format!("install allow-all policy: {err}"))?;
2179
2180    // (3) Attach the policy to the first admin.
2181    auth_store
2182        .attach_policy(
2183            PrincipalRef::User(first_admin.clone()),
2184            FIRST_ADMIN_ALLOW_ALL_POLICY,
2185        )
2186        .map_err(|err| format!("attach allow-all policy: {err}"))?;
2187
2188    Ok(first_admin.to_string())
2189}
2190
2191fn apply_regulated_preset(
2192    runtime: &RedDBRuntime,
2193    auth_store: &Arc<AuthStore>,
2194) -> Result<(), String> {
2195    use crate::auth::policies::Policy;
2196    use crate::auth::registry::EvidenceRequirement;
2197
2198    runtime.query_audit().enable_infrastructure();
2199
2200    let policy = Policy::from_json_str(&format!(
2201        r#"{{
2202            "id": "{id}",
2203            "version": 1,
2204            "statements": [
2205                {{
2206                    "effect": "deny",
2207                    "actions": ["policy:put", "policy:drop", "policy:attach", "policy:detach"],
2208                    "resources": ["policy:{id}"]
2209                }},
2210                {{
2211                    "effect": "deny",
2212                    "actions": ["config:write"],
2213                    "resources": [
2214                        "config:{audit}.*",
2215                        "config:{evidence}.*",
2216                        "config:{query_audit}.*"
2217                    ]
2218                }}
2219            ]
2220        }}"#,
2221        id = REGULATED_PROTECT_MANAGED_POLICY,
2222        audit = REGULATED_AUDIT_CONFIG_NAMESPACE,
2223        evidence = REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2224        query_audit = REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2225    ))
2226    .map_err(|err| format!("compile regulated guardrail policy: {err}"))?;
2227    auth_store
2228        .put_policy(policy)
2229        .map_err(|err| format!("install regulated guardrail policy: {err}"))?;
2230
2231    let now_ms = crate::utils::now_unix_millis() as u128;
2232    let entries = vec![
2233        regulated_registry_entry(
2234            REGULATED_PROTECT_MANAGED_POLICY,
2235            crate::auth::managed_policy::RESOURCE_TYPE_POLICY,
2236            "iam_policy",
2237            "policy:*",
2238            &format!("policy:{REGULATED_PROTECT_MANAGED_POLICY}"),
2239            EvidenceRequirement::Metadata,
2240            now_ms,
2241        ),
2242        regulated_registry_entry(
2243            REGULATED_AUDIT_CONFIG_NAMESPACE,
2244            crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2245            "config_namespace",
2246            "config:write",
2247            &format!("config:{REGULATED_AUDIT_CONFIG_NAMESPACE}.*"),
2248            EvidenceRequirement::Metadata,
2249            now_ms,
2250        ),
2251        regulated_registry_entry(
2252            REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2253            crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2254            "config_namespace",
2255            "config:write",
2256            &format!("config:{REGULATED_EVIDENCE_CONFIG_NAMESPACE}.*"),
2257            EvidenceRequirement::Metadata,
2258            now_ms,
2259        ),
2260        regulated_registry_entry(
2261            REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2262            crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2263            "config_namespace",
2264            "config:write",
2265            &format!("config:{REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE}.*"),
2266            EvidenceRequirement::Metadata,
2267            now_ms,
2268        ),
2269    ];
2270
2271    for entry in entries.iter().cloned() {
2272        runtime
2273            .config_registry()
2274            .restore_bootstrap_entry(entry)
2275            .map_err(|err| format!("install regulated registry entry: {err}"))?;
2276    }
2277    crate::cli::bootstrap_manifest::persist_registry_state(runtime, &entries)?;
2278    Ok(())
2279}
2280
2281fn regulated_registry_entry(
2282    id: &str,
2283    resource_type: &str,
2284    schema: &str,
2285    required_action: &str,
2286    required_resource: &str,
2287    evidence_requirement: crate::auth::registry::EvidenceRequirement,
2288    updated_at_ms: u128,
2289) -> crate::auth::registry::ConfigRegistryEntry {
2290    crate::auth::registry::ConfigRegistryEntry {
2291        id: id.to_string(),
2292        version: 1,
2293        resource_type: resource_type.to_string(),
2294        schema: schema.to_string(),
2295        mutability: crate::auth::registry::Mutability::Immutable,
2296        sensitivity: crate::auth::registry::Sensitivity::Internal,
2297        managed: true,
2298        required_action: required_action.to_string(),
2299        required_resource: required_resource.to_string(),
2300        evidence_requirement,
2301        updated_by: "system:regulated-preset".to_string(),
2302        updated_at_ms,
2303    }
2304}
2305
2306fn persist_bootstrap_state(runtime: &RedDBRuntime, preset: &str, first_admin_id: Option<&str>) {
2307    let store = runtime.db().store();
2308    let mut tree = crate::serde_json::Map::new();
2309    tree.insert(
2310        BOOTSTRAP_COMPLETED_KEY.to_string(),
2311        crate::serde_json::Value::Bool(true),
2312    );
2313    tree.insert(
2314        BOOTSTRAP_PRESET_KEY.to_string(),
2315        crate::serde_json::Value::String(preset.to_string()),
2316    );
2317    if let Some(id) = first_admin_id {
2318        tree.insert(
2319            BOOTSTRAP_FIRST_ADMIN_KEY.to_string(),
2320            crate::serde_json::Value::String(id.to_string()),
2321        );
2322    }
2323    let json = crate::serde_json::Value::Object(tree);
2324    store.set_config_tree("", &json);
2325}
2326
2327/// Read `red.logging.*` keys from the persistent config store and
2328/// merge them into the CLI-built `TelemetryConfig`. Merge priority:
2329/// explicit CLI flag > red_config > built-in default.
2330///
2331/// The "was a flag passed" signal comes from the `*_explicit` bools
2332/// on `TelemetryConfig`, populated by the CLI parser. This replaces
2333/// an earlier equality-to-default heuristic that silently dropped
2334/// config whenever the CLI-derived default diverged from
2335/// `TelemetryConfig::default()` (e.g. path-derived `log_dir`,
2336/// non-TTY `format`) and that silently overrode `--no-log-file`.
2337fn merge_telemetry_with_config(
2338    mut cli: crate::telemetry::TelemetryConfig,
2339    runtime: &RedDBRuntime,
2340) -> crate::telemetry::TelemetryConfig {
2341    use crate::storage::schema::Value;
2342
2343    let store = runtime.db().store();
2344
2345    if !cli.level_explicit {
2346        if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
2347            cli.level_filter = v.to_string();
2348        }
2349    }
2350    if !cli.format_explicit {
2351        if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
2352            if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
2353                cli.format = parsed;
2354            }
2355        }
2356    }
2357    if !cli.rotation_keep_days_explicit {
2358        match store.get_config("red.logging.keep_days") {
2359            Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
2360                cli.rotation_keep_days = n as u16
2361            }
2362            Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
2363                cli.rotation_keep_days = n as u16
2364            }
2365            Some(Value::Text(v)) => {
2366                if let Ok(n) = v.parse::<u16>() {
2367                    cli.rotation_keep_days = n;
2368                }
2369            }
2370            _ => {}
2371        }
2372    }
2373    if !cli.file_prefix_explicit {
2374        if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
2375            if !v.is_empty() {
2376                cli.file_prefix = v.to_string();
2377            }
2378        }
2379    }
2380    // --no-log-file is a kill-switch: config cannot resurrect the
2381    // file sink. Explicit --log-dir also wins.
2382    if !cli.log_dir_explicit && !cli.log_file_disabled {
2383        if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
2384            if !v.is_empty() {
2385                cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
2386            }
2387        }
2388    }
2389
2390    cli
2391}
2392
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};
    use std::path::{Path, PathBuf};

    /// Fresh in-memory runtime so every test starts from an empty config store.
    fn in_memory_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
    }

    /// Store `value` as a JSON string under `key`.
    fn put_text_config(runtime: &RedDBRuntime, key: &str, value: &str) {
        let json = crate::serde_json::Value::String(value.to_owned());
        runtime.db().store().set_config_tree(key, &json);
    }

    /// CLI-side telemetry as `default_telemetry_for_path(Some(path))` would
    /// produce it on a non-TTY host: a path-derived `log_dir`, `Json`
    /// format, and no `*_explicit` flag set.
    fn non_tty_cli() -> TelemetryConfig {
        TelemetryConfig {
            log_dir: Some(PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let runtime = in_memory_runtime();
        put_text_config(&runtime, "red.logging.dir", "/var/log/reddb");

        let merged = merge_telemetry_with_config(non_tty_cli(), &runtime);

        assert_eq!(merged.log_dir.as_deref(), Some(Path::new("/var/log/reddb")));
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let runtime = in_memory_runtime();
        put_text_config(&runtime, "red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: Some(PathBuf::from("/custom/dir")),
            log_dir_explicit: true,
            ..non_tty_cli()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert_eq!(merged.log_dir.as_deref(), Some(Path::new("/custom/dir")));
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let runtime = in_memory_runtime();
        put_text_config(&runtime, "red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: None,
            log_file_disabled: true,
            ..non_tty_cli()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert!(merged.log_dir.is_none(), "--no-log-file must veto config dir");
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        // The non-TTY CLI default is Json while TelemetryConfig::default()
        // is Pretty; comparing against the latter used to drop config here.
        let runtime = in_memory_runtime();
        put_text_config(&runtime, "red.logging.format", "pretty");

        let merged = merge_telemetry_with_config(non_tty_cli(), &runtime);

        assert_eq!(merged.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let runtime = in_memory_runtime();
        put_text_config(&runtime, "red.logging.format", "pretty");
        let cli = TelemetryConfig {
            format: LogFormat::Json,
            format_explicit: true,
            ..non_tty_cli()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert_eq!(merged.format, LogFormat::Json);
    }
}
2480
2481#[inline(never)]
2482fn build_http_server(
2483    runtime: RedDBRuntime,
2484    auth_store: Arc<AuthStore>,
2485    bind_addr: String,
2486) -> RedDBServer {
2487    build_http_server_with_transport_readiness(
2488        runtime,
2489        auth_store,
2490        bind_addr,
2491        TransportReadiness::default(),
2492    )
2493}
2494
2495/// Apply the resolved HTTP limits to a freshly-built `RedDBServer`.
2496///
2497/// Centralised here so every `run_*` path goes through the same
2498/// resolver and the structured startup log line carries the same
2499/// `http_limits.*` fields regardless of transport combination.
2500fn apply_http_limits(
2501    server: RedDBServer,
2502    config: &ServerCommandConfig,
2503    runtime: &RedDBRuntime,
2504) -> RedDBServer {
2505    let store = runtime.db().store();
2506    let resolved =
2507        crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2508            .get_config(key)
2509        {
2510            Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2511            Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2512            Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2513            _ => None,
2514        });
2515    tracing::info!(
2516        target: "reddb::http_limits",
2517        max_handlers = resolved.max_handlers,
2518        handler_timeout_ms = resolved.handler_timeout_ms,
2519        retry_after_secs = resolved.retry_after_secs,
2520        "http_limits resolved"
2521    );
2522    server.with_http_limits(resolved)
2523}
2524
2525#[inline(never)]
2526fn build_http_server_with_transport_readiness(
2527    runtime: RedDBRuntime,
2528    auth_store: Arc<AuthStore>,
2529    bind_addr: String,
2530    transport_readiness: TransportReadiness,
2531) -> RedDBServer {
2532    RedDBServer::with_options(
2533        runtime,
2534        ServerOptions {
2535            bind_addr,
2536            transport_readiness,
2537            ..ServerOptions::default()
2538        },
2539    )
2540    .with_auth(auth_store)
2541}
2542
2543/// PLAN.md Phase 6.2 — build a listener that only serves
2544/// `/admin/*` + `/metrics` + `/health/*`. Defaults to `127.0.0.1`
2545/// when the env var has no host (loopback-only by default per spec).
2546#[inline(never)]
2547fn build_admin_only_server(
2548    runtime: RedDBRuntime,
2549    auth_store: Arc<AuthStore>,
2550    bind_addr: String,
2551) -> RedDBServer {
2552    RedDBServer::with_options(
2553        runtime,
2554        ServerOptions {
2555            bind_addr,
2556            surface: crate::server::ServerSurface::AdminOnly,
2557            ..ServerOptions::default()
2558        },
2559    )
2560    .with_auth(auth_store)
2561}
2562
2563/// PLAN.md Phase 6.2 — build a listener that only serves `/metrics`
2564/// + `/health/*`. Suitable for Prometheus scrape ports that may be
2565///   exposed wider than the admin port.
2566#[inline(never)]
2567fn build_metrics_only_server(
2568    runtime: RedDBRuntime,
2569    auth_store: Arc<AuthStore>,
2570    bind_addr: String,
2571) -> RedDBServer {
2572    RedDBServer::with_options(
2573        runtime,
2574        ServerOptions {
2575            bind_addr,
2576            surface: crate::server::ServerSurface::MetricsOnly,
2577            ..ServerOptions::default()
2578        },
2579    )
2580    .with_auth(auth_store)
2581}
2582
2583/// Spawn dedicated admin / metrics listeners when the operator set
2584/// `RED_ADMIN_BIND` / `RED_METRICS_BIND`. Both are optional; when
2585/// unset the existing listener keeps serving everything (back-compat).
2586fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2587    if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2588        let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2589        let _ = server.serve_in_background();
2590        tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2591    }
2592    if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2593        let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2594        let _ = server.serve_in_background();
2595        tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2596    }
2597}
2598
/// Run RedDB with HTTP as the only primary transport.
///
/// Startup order, as written below:
/// 1. Bind the HTTP socket first and record the outcome in a
///    `TransportReadiness`. An `Err` from `bind_listener_for_startup` is
///    propagated; a `None` (the implicit bind failed) becomes an error.
/// 2. Convert the CLI config to DB options, then build the runtime, auth
///    store and telemetry, and start any configured backup tasks.
/// 3. Start the optional side listeners: admin / metrics
///    (`RED_ADMIN_BIND` / `RED_METRICS_BIND`), and HTTPS when
///    `http_tls_bind_addr` is set.
/// 4. Serve HTTP on the pre-bound listener from this thread.
///
/// `_telemetry_guard` and `_backup_tasks` are named bindings, so they stay
/// alive until this function returns.
#[inline(never)]
fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
    let cli_telemetry = config.telemetry.clone();
    let mut transport_readiness = TransportReadiness::default();
    // Bind before touching the database so a port problem fails fast.
    let Some(listener) = bind_listener_for_startup(
        &mut transport_readiness,
        "http",
        &bind_addr,
        config.http_bind_explicit,
    )?
    else {
        return Err(format!(
            "no HTTP listener started; implicit bind {} failed",
            bind_addr
        ));
    };
    let db_options = config.to_db_options()?;
    let (runtime, auth_store, _telemetry_guard) =
        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
    spawn_admin_metrics_listeners(&runtime, &auth_store);
    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
    // The readiness record (including this HTTP bind) travels with the server.
    let server = build_http_server_with_transport_readiness(
        runtime.clone(),
        auth_store,
        bind_addr.clone(),
        transport_readiness,
    );
    let server = apply_http_limits(server, &config, &runtime);
    tracing::info!(transport = "http", bind = %bind_addr, "listener online");
    // NOTE(review): presumably blocks until the server stops; confirm
    // against `RedDBServer::serve_on`.
    server.serve_on(listener).map_err(|err| err.to_string())
}
2631
2632/// PLAN.md HTTP TLS — when `http_tls_bind_addr` is set, spawn a
2633/// rustls-terminated listener alongside the plain HTTP server. Cert
2634/// + key paths come from CLI flags or `REDDB_HTTP_TLS_*` env vars; if
2635///   both are absent and `RED_HTTP_TLS_DEV=1` is set, a self-signed cert
2636///   is auto-generated next to the data directory (refused otherwise).
2637fn spawn_http_tls_listener(
2638    config: &ServerCommandConfig,
2639    runtime: &RedDBRuntime,
2640    auth_store: &Arc<AuthStore>,
2641) -> Result<(), String> {
2642    let Some(addr) = config.http_tls_bind_addr.clone() else {
2643        return Ok(());
2644    };
2645
2646    let tls_config = resolve_http_tls_config(config)?;
2647    let server_config = crate::server::tls::build_server_config(&tls_config)
2648        .map_err(|err| format!("HTTP TLS: {err}"))?;
2649
2650    let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
2651    let server = apply_http_limits(server, config, runtime);
2652    let _handle = server.serve_tls_in_background(server_config);
2653    tracing::info!(
2654        transport = "https",
2655        bind = %addr,
2656        mtls = %tls_config.client_ca_path.is_some(),
2657        "TLS listener online"
2658    );
2659    Ok(())
2660}
2661
2662/// Resolve the HTTP TLS config from CLI / env / dev defaults.
2663fn resolve_http_tls_config(
2664    config: &ServerCommandConfig,
2665) -> Result<crate::server::tls::HttpTlsConfig, String> {
2666    match (&config.http_tls_cert, &config.http_tls_key) {
2667        (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
2668            cert_path: cert.clone(),
2669            key_path: key.clone(),
2670            client_ca_path: config.http_tls_client_ca.clone(),
2671        }),
2672        (None, None) => {
2673            // Dev-mode auto-generate next to the data directory.
2674            let dir = config
2675                .path
2676                .as_ref()
2677                .and_then(|p| p.parent().map(std::path::PathBuf::from))
2678                .unwrap_or_else(|| std::path::PathBuf::from("."));
2679            let auto = crate::server::tls::auto_generate_dev_cert(&dir)
2680                .map_err(|err| format!("HTTP TLS dev: {err}"))?;
2681            Ok(crate::server::tls::HttpTlsConfig {
2682                cert_path: auto.cert_path,
2683                key_path: auto.key_path,
2684                client_ca_path: config.http_tls_client_ca.clone(),
2685            })
2686        }
2687        _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
2688    }
2689}
2690
2691#[inline(never)]
2692fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2693    let workers = config.workers;
2694    let cli_telemetry = config.telemetry.clone();
2695    let db_options = config.to_db_options()?;
2696    let rt_config = detect_runtime_config();
2697    let mut transport_readiness = TransportReadiness::default();
2698    let Some(grpc_listener) = bind_listener_for_startup(
2699        &mut transport_readiness,
2700        "grpc",
2701        &bind_addr,
2702        config.grpc_bind_explicit,
2703    )?
2704    else {
2705        return Err(format!(
2706            "no gRPC listener started; implicit bind {} failed",
2707            bind_addr
2708        ));
2709    };
2710
2711    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2712
2713    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2714        .enable_all()
2715        .worker_threads(worker_threads)
2716        .thread_stack_size(rt_config.stack_size)
2717        .build()
2718        .map_err(|err| format!("tokio runtime: {err}"))?;
2719
2720    // Guard lives on the outer stack so it outlives the tokio runtime.
2721    let (runtime, auth_store, _telemetry_guard) =
2722        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2723    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2724    let signal_runtime = runtime.clone();
2725    tokio_runtime.block_on(async move {
2726        spawn_lifecycle_signal_handler(signal_runtime).await;
2727        // Start wire protocol listeners (plaintext + TLS)
2728        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2729
2730        // Start PostgreSQL wire listener when --pg-bind is configured.
2731        spawn_pg_listener(&config, &runtime);
2732
2733        // Optional TLS gRPC listener. When `grpc_tls_bind_addr` is set
2734        // it spawns a separate listener so plaintext + TLS can run
2735        // side-by-side (50051 plain + 50052 TLS, etc.).
2736        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2737
2738        let server = RedDBGrpcServer::with_options(
2739            runtime,
2740            GrpcServerOptions {
2741                bind_addr: bind_addr.clone(),
2742                tls: None,
2743            },
2744            auth_store,
2745        );
2746
2747        tracing::info!(
2748            transport = "grpc",
2749            bind = %bind_addr,
2750            cpus = rt_config.available_cpus,
2751            workers = worker_threads,
2752            "listener online"
2753        );
2754        server
2755            .serve_on(grpc_listener)
2756            .await
2757            .map_err(|err| err.to_string())
2758    })
2759}
2760
2761#[inline(never)]
2762fn run_dual_server(
2763    config: ServerCommandConfig,
2764    grpc_bind_addr: String,
2765    http_bind_addr: String,
2766) -> Result<(), String> {
2767    let workers = config.workers;
2768    let cli_telemetry = config.telemetry.clone();
2769    let db_options = config.to_db_options()?;
2770    let rt_config = detect_runtime_config();
2771    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2772    let mut transport_readiness = TransportReadiness::default();
2773    let http_listener = bind_listener_for_startup(
2774        &mut transport_readiness,
2775        "http",
2776        &http_bind_addr,
2777        config.http_bind_explicit,
2778    )?;
2779    let grpc_listener = bind_listener_for_startup(
2780        &mut transport_readiness,
2781        "grpc",
2782        &grpc_bind_addr,
2783        config.grpc_bind_explicit,
2784    )?;
2785    if http_listener.is_none() && grpc_listener.is_none() {
2786        return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2787    }
2788    let (runtime, auth_store, _telemetry_guard) =
2789        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2790    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2791
2792    spawn_admin_metrics_listeners(&runtime, &auth_store);
2793    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2794
2795    let http_handle = if let Some(listener) = http_listener {
2796        let http_server = build_http_server_with_transport_readiness(
2797            runtime.clone(),
2798            auth_store.clone(),
2799            http_bind_addr.clone(),
2800            transport_readiness.clone(),
2801        );
2802        let http_server = apply_http_limits(http_server, &config, &runtime);
2803        Some(http_server.serve_in_background_on(listener))
2804    } else {
2805        None
2806    };
2807
2808    thread::sleep(Duration::from_millis(150));
2809    if let Some(handle) = http_handle.as_ref() {
2810        if handle.is_finished() {
2811            let handle = http_handle.unwrap();
2812            return match handle.join() {
2813                Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2814                Ok(Err(err)) => Err(err.to_string()),
2815                Err(_) => Err("HTTP server thread panicked".to_string()),
2816            };
2817        }
2818    }
2819    if grpc_listener.is_none() {
2820        let Some(handle) = http_handle else {
2821            return Err("no listener started".to_string());
2822        };
2823        return match handle.join() {
2824            Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2825            Ok(Err(err)) => Err(err.to_string()),
2826            Err(_) => Err("HTTP server thread panicked".to_string()),
2827        };
2828    }
2829    let grpc_listener = grpc_listener.expect("checked above");
2830
2831    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2832        .enable_all()
2833        .worker_threads(worker_threads)
2834        .thread_stack_size(rt_config.stack_size)
2835        .build()
2836        .map_err(|err| format!("tokio runtime: {err}"))?;
2837
2838    let signal_runtime = runtime.clone();
2839    tokio_runtime.block_on(async move {
2840        spawn_lifecycle_signal_handler(signal_runtime).await;
2841        // Start wire protocol listeners (plaintext + TLS)
2842        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2843
2844        // Start PostgreSQL wire listener when --pg-bind is configured.
2845        spawn_pg_listener(&config, &runtime);
2846
2847        // Optional TLS gRPC listener — runs alongside the plaintext one.
2848        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2849
2850        let server = RedDBGrpcServer::with_options(
2851            runtime,
2852            GrpcServerOptions {
2853                bind_addr: grpc_bind_addr.clone(),
2854                tls: None,
2855            },
2856            auth_store,
2857        );
2858
2859        tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2860        tracing::info!(
2861            transport = "grpc",
2862            bind = %grpc_bind_addr,
2863            cpus = rt_config.available_cpus,
2864            workers = worker_threads,
2865            "listener online"
2866        );
2867        server
2868            .serve_on(grpc_listener)
2869            .await
2870            .map_err(|err| err.to_string())
2871    })
2872}
2873
2874#[cfg(test)]
2875mod tests {
2876    use super::*;
2877
2878    #[test]
2879    fn render_systemd_unit_contains_expected_execstart() {
2880        let config = SystemdServiceConfig {
2881            service_name: "reddb".to_string(),
2882            binary_path: PathBuf::from("/usr/local/bin/red"),
2883            run_user: "reddb".to_string(),
2884            run_group: "reddb".to_string(),
2885            data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
2886            router_bind_addr: None,
2887            grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2888            http_bind_addr: None,
2889        };
2890
2891        let unit = render_systemd_unit(&config);
2892        assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
2893        assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
2894    }
2895
2896    #[test]
2897    fn systemd_service_config_derives_paths() {
2898        let config = SystemdServiceConfig {
2899            service_name: "reddb-api".to_string(),
2900            binary_path: PathBuf::from("/usr/local/bin/red"),
2901            run_user: "reddb".to_string(),
2902            run_group: "reddb".to_string(),
2903            data_path: PathBuf::from("/srv/reddb/live/data.rdb"),
2904            router_bind_addr: None,
2905            grpc_bind_addr: None,
2906            http_bind_addr: Some("127.0.0.1:5055".to_string()),
2907        };
2908
2909        assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
2910        assert_eq!(
2911            config.unit_path(),
2912            PathBuf::from("/etc/systemd/system/reddb-api.service")
2913        );
2914    }
2915
2916    #[test]
2917    fn render_systemd_unit_supports_dual_transport() {
2918        let config = SystemdServiceConfig {
2919            service_name: "reddb".to_string(),
2920            binary_path: PathBuf::from("/usr/local/bin/red"),
2921            run_user: "reddb".to_string(),
2922            run_group: "reddb".to_string(),
2923            data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
2924            router_bind_addr: None,
2925            grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2926            http_bind_addr: Some("0.0.0.0:5055".to_string()),
2927        };
2928
2929        let unit = render_systemd_unit(&config);
2930        assert!(unit.contains("--grpc-bind 0.0.0.0:5555"));
2931        assert!(unit.contains("--http-bind 0.0.0.0:5055"));
2932    }
2933
2934    #[test]
2935    fn render_systemd_unit_supports_router_mode() {
2936        let config = SystemdServiceConfig {
2937            service_name: "reddb".to_string(),
2938            binary_path: PathBuf::from("/usr/local/bin/red"),
2939            run_user: "reddb".to_string(),
2940            run_group: "reddb".to_string(),
2941            data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
2942            router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
2943            grpc_bind_addr: None,
2944            http_bind_addr: None,
2945        };
2946
2947        let unit = render_systemd_unit(&config);
2948        assert!(unit.contains("--bind 127.0.0.1:5050"));
2949        assert!(!unit.contains("--grpc-bind"));
2950        assert!(!unit.contains("--http-bind"));
2951    }
2952
2953    #[test]
2954    fn explicit_bind_collision_is_fatal() {
2955        let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
2956        let addr = held.local_addr().expect("held addr").to_string();
2957        let mut readiness = TransportReadiness::default();
2958
2959        let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();
2960
2961        assert!(error.contains("explicit http listener bind"));
2962        assert_eq!(readiness.active.len(), 0);
2963        assert_eq!(readiness.failed.len(), 1);
2964        assert!(readiness.failed[0].explicit);
2965        assert_eq!(readiness.failed[0].bind_addr, addr);
2966    }
2967
2968    // ---------- Issue #663 — `--no-auth` / `--dev` ----------
2969
2970    // Env access in tests is process-global; serialise the two
2971    // `--no-auth` tests so the REDDB_USERNAME / REDDB_PASSWORD pair
2972    // one of them sets cannot leak into the other under cargo's
2973    // default parallel runner.
2974    fn no_auth_env_lock() -> &'static std::sync::Mutex<()> {
2975        static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
2976        LOCK.get_or_init(|| std::sync::Mutex::new(()))
2977    }
2978
2979    fn no_auth_test_config(no_auth: bool) -> ServerCommandConfig {
2980        ServerCommandConfig {
2981            path: None,
2982            router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
2983            router_bind_explicit: false,
2984            grpc_bind_addr: None,
2985            grpc_bind_explicit: false,
2986            grpc_tls_bind_addr: None,
2987            grpc_tls_cert: None,
2988            grpc_tls_key: None,
2989            grpc_tls_client_ca: None,
2990            http_bind_addr: None,
2991            http_bind_explicit: false,
2992            http_tls_bind_addr: None,
2993            http_tls_cert: None,
2994            http_tls_key: None,
2995            http_tls_client_ca: None,
2996            wire_bind_addr: None,
2997            wire_bind_explicit: false,
2998            wire_tls_bind_addr: None,
2999            wire_tls_cert: None,
3000            wire_tls_key: None,
3001            pg_bind_addr: None,
3002            create_if_missing: true,
3003            read_only: false,
3004            role: "standalone".to_string(),
3005            primary_addr: None,
3006            // Operator-set `--vault`: `--no-auth` must override this
3007            // alongside REDDB_USERNAME/PASSWORD.
3008            vault: true,
3009            no_auth,
3010            workers: None,
3011            telemetry: None,
3012            http_limits_cli: crate::server::HttpLimitsCliInput::default(),
3013        }
3014    }
3015
3016    #[test]
3017    fn no_auth_flag_disables_every_auth_knob_and_stamps_metadata() {
3018        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3019        // Pre-existing env that *would* turn auth on if `--no-auth`
3020        // weren't the last word. The acceptance criterion is that
3021        // the flag wins over env.
3022        // SAFETY: serialised by `no_auth_env_lock` above.
3023        unsafe {
3024            std::env::set_var("REDDB_USERNAME", "admin");
3025            std::env::set_var("REDDB_PASSWORD", "hunter2");
3026        }
3027        let config = no_auth_test_config(true);
3028        let options = config.to_db_options().expect("to_db_options");
3029
3030        assert!(no_auth_active(&options), "metadata should be stamped");
3031        assert!(!options.auth.enabled, "auth.enabled must be forced off");
3032        assert!(
3033            !options.auth.require_auth,
3034            "require_auth must be forced off"
3035        );
3036        assert!(
3037            !options.auth.vault_enabled,
3038            "vault_enabled must be forced off (overrides --vault)"
3039        );
3040        assert_eq!(
3041            options.metadata.get(NO_AUTH_META).map(String::as_str),
3042            Some("true"),
3043        );
3044
3045        // SAFETY: serialised by `no_auth_env_lock` above.
3046        unsafe {
3047            std::env::remove_var("REDDB_USERNAME");
3048            std::env::remove_var("REDDB_PASSWORD");
3049        }
3050    }
3051
3052    #[test]
3053    fn default_behaviour_without_no_auth_flag_is_unchanged() {
3054        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3055        let config = no_auth_test_config(false);
3056        let options = config.to_db_options().expect("to_db_options");
3057
3058        assert!(
3059            !no_auth_active(&options),
3060            "default boot must not be marked no-auth"
3061        );
3062        assert!(
3063            options.metadata.get(NO_AUTH_META).is_none(),
3064            "metadata key must be absent when flag is off"
3065        );
3066        // `--vault` should still take effect when `--no-auth` is not set.
3067        assert!(options.auth.vault_enabled);
3068    }
3069
3070    #[test]
3071    fn no_auth_active_blocks_bootstrap_from_env() {
3072        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3073        // SAFETY: serialised by `no_auth_env_lock` above. The pair
3074        // would normally cause `AuthStore::bootstrap_from_env` to
3075        // create an admin; the boot pipeline must suppress that call
3076        // whenever `no_auth_active` is true.
3077        unsafe {
3078            std::env::set_var("REDDB_USERNAME", "admin");
3079            std::env::set_var("REDDB_PASSWORD", "hunter2");
3080        }
3081
3082        let options = no_auth_test_config(true)
3083            .to_db_options()
3084            .expect("to_db_options");
3085
3086        // Mirror the exact branch in `build_runtime_with_telemetry`:
3087        // build a non-vault AuthStore from `options.auth`, then call
3088        // `bootstrap_from_env` *only* when the no-auth gate is off.
3089        let auth_store = AuthStore::new(options.auth.clone());
3090        if !no_auth_active(&options) {
3091            auth_store.bootstrap_from_env();
3092        }
3093
3094        assert!(
3095            auth_store.needs_bootstrap(),
3096            "no admin user must be bootstrapped under --no-auth even with REDDB_USERNAME/PASSWORD set"
3097        );
3098
3099        // SAFETY: serialised by `no_auth_env_lock` above.
3100        unsafe {
3101            std::env::remove_var("REDDB_USERNAME");
3102            std::env::remove_var("REDDB_PASSWORD");
3103        }
3104    }
3105
3106    // ---------- Issue #650 — bootstrap presets ----------
3107
3108    // Preset tests mutate process-global env (`REDDB_PRESET`,
3109    // `REDDB_USERNAME`, `REDDB_PASSWORD`) and the global tracing
3110    // subscriber. Share the no_auth lock so they don't race with each
3111    // other or with the --no-auth tests above.
3112    fn clear_preset_env() {
3113        // SAFETY: callers hold `no_auth_env_lock()`.
3114        unsafe {
3115            std::env::remove_var(PRESET_ENV);
3116            std::env::remove_var("REDDB_BOOTSTRAP_MANIFEST");
3117            std::env::remove_var("REDDB_USERNAME");
3118            std::env::remove_var("REDDB_PASSWORD");
3119            std::env::remove_var("REDDB_USERNAME_FILE");
3120            std::env::remove_var("REDDB_PASSWORD_FILE");
3121        }
3122    }
3123
3124    fn fresh_runtime_and_store() -> (RedDBRuntime, Arc<AuthStore>) {
3125        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
3126        let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3127        (runtime, auth_store)
3128    }
3129
3130    #[test]
3131    fn simple_preset_is_default_and_persists_state() {
3132        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3133        clear_preset_env();
3134
3135        let (runtime, auth_store) = fresh_runtime_and_store();
3136        apply_preset(&runtime, &auth_store).expect("simple preset applies cleanly");
3137
3138        // No admin was created — `simple` is anonymous-friendly.
3139        assert!(
3140            auth_store.needs_bootstrap(),
3141            "simple preset must not create an admin"
3142        );
3143
3144        // Bootstrap state persisted so the next boot is a no-op.
3145        let store = runtime.db().store();
3146        let completed = store
3147            .get_config(BOOTSTRAP_COMPLETED_KEY)
3148            .expect("completed key persisted");
3149        assert!(matches!(
3150            completed,
3151            crate::storage::schema::Value::Boolean(true)
3152        ));
3153        let preset = store
3154            .get_config(BOOTSTRAP_PRESET_KEY)
3155            .expect("preset key persisted");
3156        match preset {
3157            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_SIMPLE),
3158            other => panic!("expected Text(simple), got {other:?}"),
3159        }
3160        assert!(
3161            store.get_config(BOOTSTRAP_FIRST_ADMIN_KEY).is_none(),
3162            "simple preset must not record a first admin"
3163        );
3164
3165        clear_preset_env();
3166    }
3167
3168    #[test]
3169    fn production_preset_creates_first_admin_with_allow_all_policy() {
3170        use crate::auth::policies::{EvalContext, ResourceRef};
3171        use crate::auth::UserId;
3172
3173        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3174        clear_preset_env();
3175        // SAFETY: env serialised by `no_auth_env_lock`.
3176        unsafe {
3177            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3178            std::env::set_var("REDDB_USERNAME", "ops");
3179            std::env::set_var("REDDB_PASSWORD", "hunter2");
3180        }
3181
3182        let (runtime, auth_store) = fresh_runtime_and_store();
3183        apply_preset(&runtime, &auth_store).expect("production preset applies cleanly");
3184
3185        // Admin exists and the auth store is sealed.
3186        assert!(
3187            !auth_store.needs_bootstrap(),
3188            "production preset must seal bootstrap"
3189        );
3190        let users = auth_store.list_users();
3191        assert_eq!(users.len(), 1);
3192        let admin = &users[0];
3193        assert_eq!(admin.username, "ops");
3194        assert!(
3195            admin.system_owned,
3196            "first admin must be system-owned to pass the managed-config gate"
3197        );
3198        assert!(
3199            admin.tenant_id.is_none(),
3200            "first admin must be platform-scoped (tenant=None)"
3201        );
3202
3203        // Allow-all policy was installed and attached to the first admin.
3204        let policy = auth_store
3205            .get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY)
3206            .expect("allow-all policy installed");
3207        assert!(!policy.statements.is_empty());
3208
3209        // Verify policy-derived authority via the policy evaluator —
3210        // not a bypass. Any action on any resource must Allow.
3211        let actor = UserId::platform("ops");
3212        let ctx = EvalContext {
3213            principal_tenant: None,
3214            current_tenant: None,
3215            peer_ip: None,
3216            mfa_present: false,
3217            now_ms: 1_700_000_000_000,
3218            principal_is_admin_role: true,
3219            principal_is_system_owned: true,
3220            principal_is_platform_scoped: true,
3221        };
3222        let arbitrary_resource = ResourceRef::new("config", "red.config.audit.enabled");
3223        assert!(
3224            auth_store.check_policy_authz(&actor, "config:write", &arbitrary_resource, &ctx),
3225            "allow-all policy must grant arbitrary actions via the evaluator"
3226        );
3227
3228        // Persisted state records the first admin id.
3229        let store = runtime.db().store();
3230        match store
3231            .get_config(BOOTSTRAP_FIRST_ADMIN_KEY)
3232            .expect("first_admin_id persisted")
3233        {
3234            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), "ops"),
3235            other => panic!("expected Text(ops), got {other:?}"),
3236        }
3237        match store.get_config(BOOTSTRAP_PRESET_KEY).unwrap() {
3238            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_PRODUCTION),
3239            other => panic!("expected Text(production), got {other:?}"),
3240        }
3241
3242        clear_preset_env();
3243    }
3244
3245    #[test]
3246    fn regulated_preset_enables_query_audit_infrastructure_without_rules() {
3247        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3248        clear_preset_env();
3249        // SAFETY: env serialised by `no_auth_env_lock`.
3250        unsafe {
3251            std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3252        }
3253
3254        let (runtime, auth_store) = fresh_runtime_and_store();
3255        apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3256
3257        assert!(runtime.query_audit().is_enabled());
3258        assert!(runtime.query_audit().rules().is_empty());
3259        assert!(
3260            runtime
3261                .db()
3262                .store()
3263                .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3264                .is_some(),
3265            "regulated preset should create the query-audit stream"
3266        );
3267
3268        runtime
3269            .execute_query("CREATE TABLE docs (id INT)")
3270            .expect("create table");
3271        runtime
3272            .execute_query("INSERT INTO docs (id) VALUES (1)")
3273            .expect("insert");
3274        runtime.execute_query("SELECT * FROM docs").expect("select");
3275        let rows = runtime
3276            .db()
3277            .store()
3278            .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3279            .expect("query audit collection")
3280            .query_all(|_| true);
3281        assert!(
3282            rows.is_empty(),
3283            "regulated preset must not globally audit every query"
3284        );
3285
3286        clear_preset_env();
3287    }
3288
    /// End-to-end check of the `regulated` preset. It covers:
    /// - options-level compliance and query-audit settings;
    /// - the managed guardrail policy and audit config namespace in the
    ///   config registry;
    /// - their visibility through the `red.*` system views;
    /// - denial of an ordinary admin, with the denials recorded in
    ///   `red.control_events`;
    /// - the absence of global data-plane auditing.
    #[test]
    fn regulated_preset_installs_managed_evidence_guardrails_end_to_end() {
        use crate::auth::policies::{EvalContext, Policy, ResourceRef};
        use crate::auth::store::PrincipalRef;
        use crate::auth::{Role, UserId};
        use crate::runtime::mvcc::{clear_current_auth_identity, set_current_auth_identity};
        use crate::storage::schema::Value;

        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
        clear_preset_env();
        // SAFETY: env serialised by `no_auth_env_lock`.
        unsafe {
            std::env::set_var(PRESET_ENV, PRESET_REGULATED);
        }

        // Phase 1: the preset is already visible in `to_db_options`, i.e.
        // before any runtime is constructed.
        let options = no_auth_test_config(false)
            .to_db_options()
            .expect("regulated options");
        assert!(
            options.control_events.compliance_mode,
            "regulated preset must enable fail-closed control evidence before runtime boot"
        );
        assert!(
            options.query_audit.enabled && options.query_audit.rules.is_empty(),
            "regulated preset must enable query-audit infrastructure without global rules"
        );

        // Phase 2: boot a runtime from those options, apply the preset, then
        // attach the auth store to the runtime.
        let runtime = RedDBRuntime::with_options(options).expect("runtime");
        let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
        apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
        runtime.set_auth_store(Arc::clone(&auth_store));

        assert!(runtime.control_events_require_persistence());
        assert!(runtime.query_audit().is_enabled());
        assert!(runtime.query_audit().rules().is_empty());
        assert!(auth_store
            .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
            .is_some());

        // The guardrail policy and the audit config namespace are both
        // registered as managed entries.
        let managed_policy = runtime
            .config_registry()
            .get_active(REGULATED_PROTECT_MANAGED_POLICY)
            .expect("regulated managed policy registry entry");
        assert!(managed_policy.managed);
        assert_eq!(managed_policy.resource_type, "policy");
        assert!(
            runtime
                .config_registry()
                .get_active(REGULATED_AUDIT_CONFIG_NAMESPACE)
                .expect("regulated audit config namespace")
                .managed
        );

        // Phase 3: the same state is exposed through the `red.*` system views.
        let registry_rows = runtime
            .execute_query(&format!(
                "SELECT id, managed FROM red.registry WHERE id = '{}'",
                REGULATED_PROTECT_MANAGED_POLICY
            ))
            .expect("red.registry query");
        assert_eq!(registry_rows.result.records.len(), 1);
        assert_eq!(
            registry_rows.result.records[0].get("managed"),
            Some(&Value::Boolean(true))
        );

        let managed_policy_rows = runtime
            .execute_query(&format!(
                "SELECT policy_id FROM red.managed_policies WHERE policy_id = '{}'",
                REGULATED_PROTECT_MANAGED_POLICY
            ))
            .expect("red.managed_policies query");
        assert_eq!(managed_policy_rows.result.records.len(), 1);

        let capability_rows = runtime
            .execute_query(
                "SELECT action FROM red.control_capabilities WHERE action = 'evidence:export'",
            )
            .expect("red.control_capabilities query");
        assert_eq!(capability_rows.result.records.len(), 1);

        // Phase 4: create an ordinary (non-system-owned) admin, "alice", and
        // attach an allow-all policy. The evaluator alone then allows
        // `policy:drop` on the guardrail, so any denial later must come from
        // the managed-policy guardrail rather than from missing permissions.
        auth_store
            .create_user("alice", "p", Role::Admin)
            .expect("create ordinary admin");
        let allow_all = Policy::from_json_str(
            r#"{
                "id": "alice-allow-all",
                "version": 1,
                "statements": [{
                    "effect": "allow",
                    "actions": ["*"],
                    "resources": ["*"]
                }]
            }"#,
        )
        .expect("allow-all policy");
        auth_store.put_policy(allow_all).expect("install allow-all");
        auth_store
            .attach_policy(
                PrincipalRef::User(UserId::platform("alice")),
                "alice-allow-all",
            )
            .expect("attach allow-all");
        let ctx = EvalContext {
            principal_tenant: None,
            current_tenant: None,
            peer_ip: None,
            mfa_present: false,
            now_ms: 1_700_000_000_000,
            principal_is_admin_role: true,
            principal_is_system_owned: false,
            principal_is_platform_scoped: true,
        };
        assert!(
            auth_store.check_policy_authz(
                &UserId::platform("alice"),
                "policy:drop",
                &ResourceRef::new("policy", REGULATED_PROTECT_MANAGED_POLICY),
                &ctx,
            ),
            "ordinary allow-all policy should be broad enough that only the managed guardrail blocks"
        );

        // Phase 5: run the DROP as alice. The identity is cleared before the
        // result is inspected, so a failing expectation below does not leave
        // it set for later statements.
        set_current_auth_identity("alice".to_string(), Role::Admin);
        let denied = runtime.execute_query(&format!(
            "DROP POLICY '{}'",
            REGULATED_PROTECT_MANAGED_POLICY
        ));
        clear_current_auth_identity();
        let err = denied.expect_err("managed policy guardrail must deny ordinary admin");
        assert!(
            err.to_string().contains("managed policy"),
            "error should name the managed guardrail: {err}"
        );
        assert!(
            auth_store
                .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
                .is_some(),
            "denied mutation must leave managed policy installed"
        );

        // The denial itself is recorded as exactly one control event.
        let denied_events = runtime
            .execute_query(&format!(
                "SELECT action, resource, outcome FROM red.control_events \
                 WHERE action = 'policy:drop' AND resource = 'policy:{}'",
                REGULATED_PROTECT_MANAGED_POLICY
            ))
            .expect("red.control_events denied policy drop");
        assert_eq!(denied_events.result.records.len(), 1);
        assert_eq!(
            denied_events.result.records[0].get("outcome"),
            Some(&Value::text("denied"))
        );

        // Phase 6: the same pattern for a write into the managed audit config
        // namespace.
        set_current_auth_identity("alice".to_string(), Role::Admin);
        let config_denied = runtime.execute_query("SET CONFIG red.config.audit.enabled = true");
        clear_current_auth_identity();
        let err = config_denied.expect_err("managed config guardrail must deny ordinary admin");
        assert!(
            err.to_string().contains("managed config"),
            "error should name the managed config guardrail: {err}"
        );

        let denied_config_events = runtime
            .execute_query(
                "SELECT action, resource, outcome FROM red.control_events \
                 WHERE action = 'config:write' AND resource = 'config:red.config.audit.enabled'",
            )
            .expect("red.control_events denied config write");
        assert_eq!(denied_config_events.result.records.len(), 1);
        assert_eq!(
            denied_config_events.result.records[0].get("outcome"),
            Some(&Value::text("denied"))
        );

        // Phase 7: ordinary data-plane statements (run here without an auth
        // identity set) must not show up in the query-audit stream.
        runtime
            .execute_query("CREATE TABLE regulated_docs (id INT)")
            .expect("create user table");
        runtime
            .execute_query("SELECT * FROM regulated_docs")
            .expect("select user table");
        let audit_rows = runtime
            .db()
            .store()
            .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
            .expect("query audit collection")
            .query_all(|_| true);
        assert!(
            audit_rows.is_empty(),
            "regulated preset must not globally audit data-plane queries"
        );

        clear_preset_env();
    }
3482
    /// A bootstrap manifest named by `REDDB_BOOTSTRAP_MANIFEST` installs:
    /// - users, policies and attachments;
    /// - managed policies and managed config namespaces;
    /// - plain and secret-ref config values.
    ///
    /// The managed registry entries must survive deletion of the manifest
    /// file. A later `apply_preset` must not need the file at all.
    #[test]
    fn bootstrap_manifest_installs_initial_users_policies_guardrails_and_config() {
        use crate::auth::policies::{EvalContext, ResourceRef};
        use crate::auth::UserId;
        use crate::storage::schema::Value;

        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
        clear_preset_env();

        // Per-run file name (pid + wall-clock millis) in the OS temp dir.
        // NOTE(review): the file is only removed further down on the happy
        // path. If an earlier assertion panics, the manifest (which holds a
        // plaintext test password) stays behind in the temp dir. Consider an
        // RAII cleanup guard.
        let manifest_path = std::env::temp_dir().join(format!(
            "reddb-bootstrap-manifest-{}-{}.json",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis()
        ));
        // Manifest fixture: one system-owned admin "ops", a regular policy
        // attached to it, a managed deny-drop policy, a managed `red.ai`
        // config namespace, and two config entries (one plain value, one
        // vault secret reference).
        std::fs::write(
            &manifest_path,
            r#"{
                "users": [
                    {
                        "username": "ops",
                        "password": "hunter2",
                        "role": "admin",
                        "system_owned": true
                    }
                ],
                "policies": [
                    {
                        "id": "bootstrap-registry-admin",
                        "version": 1,
                        "statements": [
                            {
                                "effect": "allow",
                                "actions": ["red.registry:*", "policy:*", "config:write", "app:read"],
                                "resources": ["registry:*", "policy:*", "config:*", "collection:docs"]
                            }
                        ]
                    }
                ],
                "managed_policies": [
                    {
                        "id": "managed-deny-drop",
                        "version": 1,
                        "statements": [
                            {
                                "effect": "deny",
                                "actions": ["policy:drop"],
                                "resources": ["policy:managed-deny-drop"]
                            }
                        ],
                        "required_resource": "policy:managed-deny-drop",
                        "evidence": "full"
                    }
                ],
                "attachments": [
                    {"user": "ops", "policy": "bootstrap-registry-admin"}
                ],
                "managed_config_namespaces": [
                    {
                        "id": "red.ai",
                        "required_action": "config:write",
                        "required_resource": "config:red.ai.*",
                        "evidence": "metadata"
                    }
                ],
                "config": [
                    {"key": "red.ai.default.provider", "value": "openai"},
                    {
                        "key": "red.ai.openai.default.secret_ref",
                        "secret_ref": {"collection": "red.vault", "key": "openai"}
                    }
                ],
                "actor": "ops"
            }"#,
        )
        .expect("write manifest");
        // SAFETY: env serialised by `no_auth_env_lock`.
        unsafe {
            std::env::set_var("REDDB_BOOTSTRAP_MANIFEST", &manifest_path);
        }

        // First boot: `apply_preset` picks up the manifest via the env var.
        let (runtime, auth_store) = fresh_runtime_and_store();
        apply_preset(&runtime, &auth_store).expect("manifest applies cleanly");

        // Exactly the manifest's single user exists, flagged system-owned.
        let users = auth_store.list_users();
        assert_eq!(users.len(), 1);
        assert_eq!(users[0].username, "ops");
        assert!(users[0].system_owned);

        // The attached policy grants `app:read` on `collection:docs` through
        // the policy evaluator.
        let actor = UserId::platform("ops");
        let ctx = EvalContext {
            principal_tenant: None,
            current_tenant: None,
            peer_ip: None,
            mfa_present: false,
            now_ms: 1_700_000_000_000,
            principal_is_admin_role: true,
            principal_is_system_owned: true,
            principal_is_platform_scoped: true,
        };
        assert!(auth_store.check_policy_authz(
            &actor,
            "app:read",
            &ResourceRef::new("collection", "docs"),
            &ctx
        ));

        // Both managed entries are active in the runtime's config registry,
        // with the expected resource types.
        let managed_policy = runtime
            .config_registry()
            .get_active("managed-deny-drop")
            .expect("managed policy registry entry");
        assert!(managed_policy.managed);
        assert_eq!(managed_policy.resource_type, "policy");
        let managed_config = runtime
            .config_registry()
            .get_active("red.ai")
            .expect("managed config namespace registry entry");
        assert!(managed_config.managed);
        assert_eq!(managed_config.resource_type, "config_namespace");

        // Config values: the plain entry is stored as text; the secret
        // reference is stored as JSON tagged `"type": "secret_ref"` and must
        // not contain the manifest password.
        let store = runtime.db().store();
        match store
            .get_config("red.ai.default.provider")
            .expect("plain config persisted")
        {
            Value::Text(s) => assert_eq!(s.as_ref(), "openai"),
            other => panic!("expected provider text, got {other:?}"),
        }
        let Value::Json(bytes) = store
            .get_config("red.ai.openai.default.secret_ref")
            .expect("secret ref config persisted")
        else {
            panic!("secret ref must be stored as structured JSON");
        };
        let reference: crate::serde_json::Value =
            crate::serde_json::from_slice(&bytes).expect("secret ref json");
        assert_eq!(
            reference.get("type").and_then(|v| v.as_str()),
            Some("secret_ref")
        );
        assert!(
            !String::from_utf8_lossy(&bytes).contains("hunter2"),
            "manifest password must not leak into secret ref config"
        );

        // Bootstrap completion and the managed registry entries are
        // persisted in config.
        let completed = store
            .get_config(BOOTSTRAP_COMPLETED_KEY)
            .expect("bootstrap completion persisted");
        assert!(matches!(completed, Value::Boolean(true)));
        assert!(
            store
                .get_config("system.bootstrap.manifest.registry_entries")
                .is_some(),
            "managed registry entries must be persisted internally"
        );

        // Simulated restart: delete the manifest, then rehydrate a brand-new
        // ConfigRegistry from the runtime's persisted state alone.
        std::fs::remove_file(&manifest_path).expect("remove manifest after first boot");
        let restored_registry = Arc::new(crate::auth::registry::ConfigRegistry::new());
        crate::cli::bootstrap_manifest::rehydrate_manifest_registry(&runtime, &restored_registry)
            .expect("registry rehydrates without manifest file");
        assert!(restored_registry.get_active("managed-deny-drop").is_some());
        assert!(restored_registry.get_active("red.ai").is_some());

        // Re-running `apply_preset` with a fresh AuthStore succeeds even
        // though the manifest file is gone, and creates no user.
        // (Presumably this is the persisted completion flag short-circuiting
        // bootstrap; TODO confirm in `apply_preset`.)
        let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
        apply_preset(&runtime, &fresh).expect("re-run must not need manifest file");
        assert!(fresh.needs_bootstrap());

        clear_preset_env();
    }
3654
3655    #[test]
3656    fn production_preset_refuses_to_start_without_password() {
3657        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3658        clear_preset_env();
3659        // SAFETY: env serialised by `no_auth_env_lock`.
3660        unsafe {
3661            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3662            std::env::set_var("REDDB_USERNAME", "ops");
3663        }
3664
3665        let (runtime, auth_store) = fresh_runtime_and_store();
3666        let err = apply_preset(&runtime, &auth_store).expect_err("must reject missing password");
3667        assert!(
3668            err.contains("REDDB_PASSWORD"),
3669            "error must name the missing env: {err}"
3670        );
3671
3672        // Nothing was persisted; nothing was created.
3673        assert!(auth_store.needs_bootstrap());
3674        assert!(runtime
3675            .db()
3676            .store()
3677            .get_config(BOOTSTRAP_COMPLETED_KEY)
3678            .is_none());
3679
3680        clear_preset_env();
3681    }
3682
3683    #[test]
3684    fn re_running_production_after_first_boot_is_a_silent_skip() {
3685        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3686        clear_preset_env();
3687        // SAFETY: env serialised by `no_auth_env_lock`.
3688        unsafe {
3689            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3690            std::env::set_var("REDDB_USERNAME", "ops");
3691            std::env::set_var("REDDB_PASSWORD", "hunter2");
3692        }
3693
3694        let (runtime, auth_store) = fresh_runtime_and_store();
3695        apply_preset(&runtime, &auth_store).expect("first apply");
3696        assert_eq!(auth_store.list_users().len(), 1);
3697
3698        // Second invocation: same runtime/store, same env. Must be a
3699        // no-op — no error, no duplicate admin, no duplicate policy.
3700        // We do NOT reuse `auth_store` because production sealed it; the
3701        // idempotency hinge is the persisted config key, not the auth
3702        // store's in-memory seal. Build a fresh `AuthStore` as a restart
3703        // would and confirm `apply_preset` is a silent skip.
3704        let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3705        apply_preset(&runtime, &fresh).expect("re-run is silent-skip");
3706        assert!(
3707            fresh.needs_bootstrap(),
3708            "re-run must not create a second admin"
3709        );
3710        assert!(
3711            fresh.get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY).is_none(),
3712            "re-run must not re-install the allow-all policy on the fresh store"
3713        );
3714
3715        clear_preset_env();
3716    }
3717
3718    #[test]
3719    fn unrecognised_preset_value_is_rejected() {
3720        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3721        clear_preset_env();
3722        // SAFETY: env serialised by `no_auth_env_lock`.
3723        unsafe {
3724            std::env::set_var(PRESET_ENV, "weird");
3725        }
3726
3727        let (runtime, auth_store) = fresh_runtime_and_store();
3728        let err = apply_preset(&runtime, &auth_store).expect_err("must reject unknown preset");
3729        assert!(err.contains("weird"), "error must echo the value: {err}");
3730        assert!(auth_store.needs_bootstrap());
3731
3732        clear_preset_env();
3733    }
3734
3735    #[test]
3736    fn no_auth_short_circuits_preset_entirely() {
3737        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3738        clear_preset_env();
3739        // SAFETY: env serialised by `no_auth_env_lock`. Production creds
3740        // are set but `--no-auth` must win — no admin, no bootstrap state.
3741        unsafe {
3742            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3743            std::env::set_var("REDDB_USERNAME", "ops");
3744            std::env::set_var("REDDB_PASSWORD", "hunter2");
3745        }
3746
3747        let options = no_auth_test_config(true)
3748            .to_db_options()
3749            .expect("to_db_options");
3750        assert!(no_auth_active(&options));
3751
3752        // Mirror `build_runtime_with_telemetry`: when no_auth_active,
3753        // `apply_preset` is never called.
3754        let (runtime, auth_store) = fresh_runtime_and_store();
3755        if !no_auth_active(&options) {
3756            apply_preset(&runtime, &auth_store).expect("would apply preset");
3757        }
3758
3759        assert!(
3760            auth_store.needs_bootstrap(),
3761            "--no-auth must prevent any admin creation"
3762        );
3763        assert!(
3764            runtime
3765                .db()
3766                .store()
3767                .get_config(BOOTSTRAP_COMPLETED_KEY)
3768                .is_none(),
3769            "--no-auth must skip bootstrap-state persistence"
3770        );
3771
3772        clear_preset_env();
3773    }
3774
3775    #[test]
3776    fn implicit_bind_collision_degrades() {
3777        let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
3778        let addr = held.local_addr().expect("held addr").to_string();
3779        let mut readiness = TransportReadiness::default();
3780
3781        let listener =
3782            bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");
3783
3784        assert!(listener.is_none());
3785        assert_eq!(readiness.active.len(), 0);
3786        assert_eq!(readiness.failed.len(), 1);
3787        assert!(!readiness.failed[0].explicit);
3788        assert_eq!(readiness.failed[0].bind_addr, addr);
3789    }
3790}