Skip to main content

reddb_server/
service_cli.rs

1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, InProcessRouterConfig};
12use crate::storage::StorageProfileSelection;
13use crate::{
14    GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
15};
16
/// Default bind address for the router listener. This is the same
/// address that `ServerTransport::Wire.default_bind_addr()` returns.
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
18
/// Detect available CPU cores and suggest a worker thread count.
///
/// One core is reserved for the OS and the remaining cores become
/// workers, with a floor of one worker. If
/// `std::thread::available_parallelism` fails, a single CPU is assumed.
/// `stack_size` is a fixed 8 MiB default.
pub fn detect_runtime_config() -> RuntimeConfig {
    let cpus = thread::available_parallelism()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or(1);

    // Reserve 1 core for the OS, use the rest for workers (minimum 1).
    let suggested_workers = cpus.saturating_sub(1).max(1);

    RuntimeConfig {
        available_cpus: cpus,
        suggested_workers,
        stack_size: 8 * 1024 * 1024, // 8 MiB default
    }
}

/// CPU / thread sizing suggested by [`detect_runtime_config`].
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// CPUs reported by `std::thread::available_parallelism` (1 if unknown).
    pub available_cpus: usize,
    /// `available_cpus - 1`, but never less than 1.
    pub suggested_workers: usize,
    /// Suggested stack size in bytes (presumably for worker threads).
    pub stack_size: usize,
}
41
/// Network transport a server listener can speak.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    Grpc,
    Http,
    Wire,
}

impl ServerTransport {
    /// Short display label for this transport: `"gRPC"`, `"HTTP"` or `"wire"`.
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Wire => "wire",
            ServerTransport::Http => "HTTP",
            ServerTransport::Grpc => "gRPC",
        }
    }

    /// Default loopback (`127.0.0.1`) bind address for this transport.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Wire => "127.0.0.1:5050",
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Grpc => "127.0.0.1:5555",
        }
    }
}
66
/// Parsed configuration for the server command.
///
/// `to_db_options` converts it into [`RedDBOptions`]. The `*_bind_addr`
/// fields decide which transports `enabled_transports` reports.
#[derive(Debug, Clone)]
pub struct ServerCommandConfig {
    /// Database file path. When `None`, the options are built from
    /// `RedDBOptions::in_memory()` instead of `RedDBOptions::persistent`.
    pub path: Option<PathBuf>,
    /// Router bind address. When set, `enabled_transports` reports gRPC,
    /// HTTP and wire as enabled.
    pub router_bind_addr: Option<String>,
    /// Presumably `true` when `router_bind_addr` was supplied explicitly
    /// rather than defaulted. TODO confirm against the CLI parser.
    pub router_bind_explicit: bool,
    /// Plain gRPC bind address. When set, the gRPC transport is enabled.
    pub grpc_bind_addr: Option<String>,
    /// Presumably marks `grpc_bind_addr` as explicitly supplied. TODO confirm.
    pub grpc_bind_explicit: bool,
    /// TLS-encrypted gRPC bind address. Can run side-by-side with
    /// `grpc_bind_addr` (e.g. `:50051` plain + `:50052` TLS) or
    /// stand alone for TLS-only deploys. Defaults to `None`.
    pub grpc_tls_bind_addr: Option<String>,
    /// Path to PEM-encoded gRPC server certificate. Resolved through
    /// `REDDB_GRPC_TLS_CERT` (with `_FILE` companion for k8s secret
    /// mounts). When `None` and dev-mode is enabled
    /// (`RED_GRPC_TLS_DEV=1`) the server auto-generates a self-signed
    /// cert and prints its SHA-256 fingerprint to stderr.
    pub grpc_tls_cert: Option<PathBuf>,
    /// Path to PEM-encoded gRPC server private key. Same env-var
    /// pattern as `grpc_tls_cert`.
    pub grpc_tls_key: Option<PathBuf>,
    /// Optional path to a PEM bundle of trust anchors used to verify
    /// client certificates. When set, the gRPC listener requires
    /// every client to present a cert that chains to this CA — i.e.
    /// mutual TLS. When unset, one-way TLS only.
    pub grpc_tls_client_ca: Option<PathBuf>,
    /// Plain HTTP bind address. When set, the HTTP transport is enabled.
    pub http_bind_addr: Option<String>,
    /// Presumably marks `http_bind_addr` as explicitly supplied. TODO confirm.
    pub http_bind_explicit: bool,
    /// HTTPS bind address. When set, the HTTP server also serves a
    /// TLS-terminated listener on this addr. Plain HTTP and HTTPS can
    /// run side by side (e.g. 8080 plain + 8443 TLS).
    pub http_tls_bind_addr: Option<String>,
    /// PEM cert for HTTPS. Reads `REDDB_HTTP_TLS_CERT` / its `_FILE`
    /// companion when not set explicitly.
    pub http_tls_cert: Option<PathBuf>,
    /// PEM key for HTTPS. Reads `REDDB_HTTP_TLS_KEY` / its `_FILE`
    /// companion when not set explicitly.
    pub http_tls_key: Option<PathBuf>,
    /// Optional PEM CA bundle. When set, the HTTPS listener requires
    /// every client to present a cert that chains to a CA in this
    /// bundle (mTLS). When unset, plain server-side TLS only.
    pub http_tls_client_ca: Option<PathBuf>,
    /// Wire-protocol bind address. When set, the wire transport is enabled.
    pub wire_bind_addr: Option<String>,
    /// Presumably marks `wire_bind_addr` as explicitly supplied. TODO confirm.
    pub wire_bind_explicit: bool,
    /// TLS-encrypted wire protocol bind address
    pub wire_tls_bind_addr: Option<String>,
    /// Path to TLS cert PEM (if None + wire_tls_bind, auto-generate)
    pub wire_tls_cert: Option<PathBuf>,
    /// Path to TLS key PEM
    pub wire_tls_key: Option<PathBuf>,
    /// PostgreSQL wire protocol bind address (Phase 3.1 PG parity).
    /// When set the server accepts psql / JDBC / DBeaver clients on
    /// this port via the v3 protocol. Defaults to None (disabled).
    pub pg_bind_addr: Option<String>,
    /// Copied verbatim into `RedDBOptions::create_if_missing`.
    pub create_if_missing: bool,
    /// The `--readonly` flag. `to_db_options` ORs it with the
    /// `RED_READONLY` / `REDDB_READONLY` env and the persisted runtime
    /// read-only state.
    pub read_only: bool,
    /// Replication role. `"primary"` and `"replica"` are recognized; any
    /// other value means standalone.
    pub role: String,
    /// Primary address to follow when `role` is `"replica"`. When `None`,
    /// it defaults to `http://127.0.0.1:5555`.
    pub primary_addr: Option<String>,
    /// Requested storage profile; validated by `to_db_options`.
    pub storage_profile: StorageProfileSelection,
    /// The `--vault` flag. Sets `auth.vault_enabled`, but `no_auth`
    /// overrides it.
    pub vault: bool,
    /// Issue #663 — explicit `--no-auth` / `--dev` flag for local
    /// no-password mode. When `true`, the boot pipeline force-disables
    /// auth (`auth.enabled = false`, `auth.require_auth = false`,
    /// `auth.vault_enabled = false`) and skips
    /// `AuthStore::bootstrap_from_env`, so an explicit
    /// `REDDB_USERNAME` + `REDDB_PASSWORD` pair cannot accidentally
    /// turn anonymous access into a logged-in admin. An unmissable
    /// warning is emitted at startup. Default `false` — the existing
    /// implicit no-auth behaviour (just don't set the envs) is
    /// unchanged.
    pub no_auth: bool,
    /// Override worker thread count (None = auto-detect from CPUs)
    pub workers: Option<usize>,
    /// Telemetry config (Phase 6 logging). `None` falls back to the
    /// built-in default derived from `path` + stderr-only.
    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
    /// HTTP handler-pool knobs from the CLI layer (issue #574 slice 5).
    /// Carries flag and env values; red_config and built-in defaults
    /// are applied later by [`crate::server::http_limits::resolve_http_limits`]
    /// once the runtime is open.
    pub http_limits_cli: crate::server::HttpLimitsCliInput,
}
148
/// A transport listener that was recorded as active.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerState {
    /// Transport label as passed by the caller.
    pub transport: String,
    /// Address the listener was bound on.
    pub bind_addr: String,
    /// Caller-supplied flag, presumably "address was explicitly configured".
    pub explicit: bool,
}

/// A transport listener that was recorded as failed, with the reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerFailure {
    /// Transport label as passed by the caller.
    pub transport: String,
    /// Address the listener attempted to bind.
    pub bind_addr: String,
    /// Caller-supplied flag, presumably "address was explicitly configured".
    pub explicit: bool,
    /// Human-readable failure reason.
    pub reason: String,
}

/// Accumulated outcome of bringing up the transport listeners.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TransportReadiness {
    pub active: Vec<TransportListenerState>,
    pub failed: Vec<TransportListenerFailure>,
}

impl TransportReadiness {
    /// Append an entry to `self.active` for `transport` on `bind_addr`.
    fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
        let entry = TransportListenerState {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
        };
        self.active.push(entry);
    }

    /// Append an entry to `self.failed` for `transport` on `bind_addr`,
    /// keeping the owned `reason`.
    fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
        let entry = TransportListenerFailure {
            transport: transport.to_owned(),
            bind_addr: bind_addr.to_owned(),
            explicit,
            reason,
        };
        self.failed.push(entry);
    }
}
188
/// Settings describing a systemd service install of the server.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix (see [`Self::unit_path`]).
    pub service_name: String,
    pub binary_path: PathBuf,
    pub run_user: String,
    pub run_group: String,
    /// Database file path; its directory is [`Self::data_dir`].
    pub data_path: PathBuf,
    pub router_bind_addr: Option<String>,
    pub grpc_bind_addr: Option<String>,
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory containing `data_path`.
    ///
    /// Falls back to `.` when `data_path` has no usable parent. This covers
    /// a root path, an empty path, and a bare file name such as
    /// `reddb.rdb`. For a bare file name, `Path::parent` returns `Some("")`,
    /// an empty path that is not a valid directory.
    pub fn data_dir(&self) -> PathBuf {
        self.data_path
            .parent()
            .filter(|parent| !parent.as_os_str().is_empty())
            .map(PathBuf::from)
            .unwrap_or_else(|| PathBuf::from("."))
    }

    /// Unit file location: `/etc/systemd/system/<service_name>.service`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
213
214/// Build a sane default `TelemetryConfig` from a server path when the
215/// caller didn't set one explicitly. Writes rotating logs into the
216/// parent directory of the DB file (or `./logs` for in-memory /
217/// pathless runs). Level defaults to `info`, pretty stderr format.
218pub fn default_telemetry_for_path(
219    path: Option<&std::path::Path>,
220) -> crate::telemetry::TelemetryConfig {
221    let log_dir = match path {
222        Some(p) => p
223            .parent()
224            .map(|parent| parent.join("logs"))
225            .or_else(|| Some(std::path::PathBuf::from("./logs"))),
226        None => None, // in-memory — no file, stderr-only
227    };
228    crate::telemetry::TelemetryConfig {
229        log_dir,
230        file_prefix: "reddb.log".to_string(),
231        level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
232        format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
233            crate::telemetry::LogFormat::Pretty
234        } else {
235            crate::telemetry::LogFormat::Json
236        },
237        rotation_keep_days: 14,
238        service_name: "reddb",
239        // Implicit defaults — no CLI flag has claimed these values yet.
240        level_explicit: false,
241        format_explicit: false,
242        rotation_keep_days_explicit: false,
243        file_prefix_explicit: false,
244        log_dir_explicit: false,
245        log_file_disabled: false,
246    }
247}
248
/// Metadata key used to thread the parsed backup config from
/// `to_db_options` down to runner threads. The runner reads it back
/// (via `runner_backup_intervals`) to spawn the periodic checkpointer
/// and WAL-flush tasks. Threading through `metadata` avoids extending
/// `RedDBOptions` with a public field that has no meaning for
/// library consumers.
///
/// This key holds the checkpoint interval in seconds, as a decimal
/// string written by `apply_backup_config`. In this file,
/// `spawn_backup_tasks_if_configured` parses it back as `u64`.
const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
/// WAL-flush (archive) interval in seconds. It is stamped next to the
/// checkpoint interval, and both must parse for backup tasks to spawn.
const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
/// Backup backend kind. `apply_backup_config` always stamps `"s3"`.
const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
/// Issue #519 — threaded through `metadata` like the existing interval
/// values. `0` (default) means "feature disabled" and the runner skips
/// the lag-monitor wiring entirely.
const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";

/// Issue #663 — metadata key set by `to_db_options` when the operator
/// passes `--no-auth` / `--dev`. Read in `build_runtime_with_telemetry`
/// to (a) skip `AuthStore::bootstrap_from_env` (so a stray
/// `REDDB_USERNAME`/`REDDB_PASSWORD` cannot auto-create an admin) and
/// (b) emit the loud "auth disabled" warning. Threaded via `metadata`
/// — rather than a public `RedDBOptions` field — to keep the flag a
/// CLI/boot concern with no meaning for library consumers. The stamped
/// value is the string `"true"`; query it through `no_auth_active`.
pub(crate) const NO_AUTH_META: &str = "red.boot.no_auth";
271
272/// Returns `true` when `--no-auth` / `--dev` was active for this boot,
273/// i.e. when `to_db_options` stamped [`NO_AUTH_META`] on `options.metadata`.
274pub(crate) fn no_auth_active(options: &RedDBOptions) -> bool {
275    options
276        .metadata
277        .get(NO_AUTH_META)
278        .map(|v| v == "true")
279        .unwrap_or(false)
280}
281
/// Loud, unmissable warning printed when the operator opts into
/// anonymous access via `--no-auth` / `--dev`. Goes to stderr (always
/// visible at startup) **and** the tracing layer (captured by file
/// logs once telemetry is wired). The emitting call sites are
/// responsible for writing it to both; this constant only fixes the text.
const NO_AUTH_WARNING: &str =
    "⚠ auth disabled (--no-auth) — anonymous access, do NOT use in production";
288
impl ServerCommandConfig {
    /// Translate the parsed CLI configuration into [`RedDBOptions`].
    ///
    /// Besides copying CLI fields, it also consults the following:
    /// - the `RED_READONLY` / `REDDB_READONLY` env;
    /// - the persisted runtime read-only state under `path`;
    /// - `REDDB_COMPLIANCE_MODE` and the env named by `PRESET_ENV`;
    /// - the `REDDB_BACKUP_*` contract via `crate::backup_bootstrap::from_env`,
    ///   falling back to the legacy `RED_BACKEND` envs.
    ///
    /// # Errors
    ///
    /// Returns `Err` in these cases:
    /// - the storage profile fails validation (initially, or when
    ///   re-validated with `managed_backup` enabled);
    /// - `backup_bootstrap::from_env` rejects the `REDDB_BACKUP_*` config.
    ///   The message is then prefixed with `"backup bootstrap: "`.
    fn to_db_options(&self) -> Result<RedDBOptions, String> {
        let mut options = match &self.path {
            Some(path) => RedDBOptions::persistent(path),
            None => RedDBOptions::in_memory(),
        };

        // NOTE(review): `mode` is forced to `Persistent` even when `path`
        // is `None` and the options came from `RedDBOptions::in_memory()`.
        // Confirm this override is intended for pathless boots.
        options.mode = StorageMode::Persistent;
        options.create_if_missing = self.create_if_missing;
        // PLAN.md Phase 4.3 — read_only resolution priority:
        //   1. CLI flag (`--readonly`) — explicit operator intent.
        //   2. `RED_READONLY=true` env — orchestrator override.
        //   3. Persisted `<data>/.runtime-state.json` from a prior
        //      `POST /admin/readonly` — survives restart.
        //   4. Default `false`.
        // NOTE(review): the three sources are OR-ed, so any one that
        // reports read-only wins. A falsy `RED_READONLY` value cannot
        // clear a persisted read-only flag. `REDDB_READONLY` is consulted
        // only when `env_nonempty("RED_READONLY")` yields `None`.
        options.read_only = self.read_only
            || env_nonempty("RED_READONLY")
                .or_else(|| env_nonempty("REDDB_READONLY"))
                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
                .unwrap_or(false)
            || self.path.as_ref().is_some_and(|data_path| {
                crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
                    data_path,
                ))
                .unwrap_or(false)
            });

        // `role` selects the replication topology. "primary" and
        // "replica" are recognized (a replica follows `primary_addr`);
        // any other value means standalone.
        options.replication = match self.role.as_str() {
            "primary" => ReplicationConfig::primary(),
            "replica" => {
                let primary_addr = self
                    .primary_addr
                    .clone()
                    .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
                // Public-mutation rejection on replicas is enforced by
                // `WriteGate` at the runtime/RPC boundary (PLAN.md W1).
                // Leaving `options.read_only = false` keeps the pager
                // writable so the internal logical-WAL apply path can
                // ingest records from the primary; WriteGate ensures no
                // client request reaches storage.
                ReplicationConfig::replica(primary_addr)
            }
            _ => ReplicationConfig::standalone(),
        };
        options.storage_profile = self.storage_profile.validate()?;

        if self.vault {
            options.auth.vault_enabled = true;
        }

        // Issue #663 — `--no-auth` / `--dev` is the last word on auth
        // for this boot: force every auth knob off, regardless of any
        // env-derived config (`--vault`, `REDDB_USERNAME`/`PASSWORD`,
        // `REDDB_VAULT_KEY`, OAuth, cert) the operator may also have
        // set. We *also* stamp [`NO_AUTH_META`] so the auth-store
        // builder downstream knows to skip `bootstrap_from_env`
        // (which would otherwise auto-create an admin from
        // `REDDB_USERNAME`/`REDDB_PASSWORD` even with auth disabled,
        // a footgun for the local-dev workflow this flag exists to
        // support).
        if self.no_auth {
            options.auth.enabled = false;
            options.auth.require_auth = false;
            options.auth.vault_enabled = false;
            options
                .metadata
                .insert(NO_AUTH_META.to_string(), "true".to_string());
        }

        // Issue #652 — Control Event Ledger Compliance Mode.
        // `REDDB_COMPLIANCE_MODE=true|1|yes|on` makes the producer
        // slices (652b/c/d) fail-closed on ledger persistence
        // failures. Default `false` — log-and-continue on emit error.
        if let Some(raw) = env_nonempty("REDDB_COMPLIANCE_MODE") {
            options.control_events.compliance_mode = matches!(
                raw.to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            );
        }
        // The regulated preset always forces compliance mode on and
        // switches query auditing to the regulated profile, whatever
        // `REDDB_COMPLIANCE_MODE` said.
        if env_nonempty(PRESET_ENV).is_some_and(|s| s.trim() == PRESET_REGULATED) {
            options.control_events.compliance_mode = true;
            options.query_audit = crate::runtime::query_audit::QueryAuditConfig::regulated();
        }

        // Issue #517 — canonical `REDDB_BACKUP_*` contract takes
        // precedence. On Err, surface the partial-config message so
        // boot exits non-zero with a clear operator message. On
        // Ok(None), fall through to the legacy backend-from-env path.
        match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
            Err(msg) => {
                return Err(format!("backup bootstrap: {msg}"));
            }
            Ok(Some(cfg)) => {
                apply_backup_config(&mut options, &cfg);
            }
            Ok(None) => {
                configure_remote_backend_from_env(&mut options);
            }
        }

        // Any backup wiring turns on `managed_backup` and re-validates
        // the profile. Wiring means either a remote backend or the
        // backup intervals stamped by `apply_backup_config`.
        if options.remote_backend.is_some()
            || options
                .metadata
                .contains_key(BACKUP_INTERVAL_META_CHECKPOINT)
        {
            let mut selection = options.storage_profile;
            selection.managed_backup = true;
            options.storage_profile = selection.validate()?;
        }

        Ok(options)
    }

    /// Transports the server should start, always listed in the order
    /// gRPC, HTTP, wire. Each transport is enabled by its own bind
    /// address; `router_bind_addr` enables all three.
    pub fn enabled_transports(&self) -> Vec<ServerTransport> {
        let mut transports = Vec::with_capacity(3);
        if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
            transports.push(ServerTransport::Grpc);
        }
        if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
            transports.push(ServerTransport::Http);
        }
        if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
            transports.push(ServerTransport::Wire);
        }
        transports
    }
}
416
417/// Read an env var, treating empty / whitespace-only as `None`.
418/// Honors the `<NAME>_FILE` convention. Re-exports the shared
419/// `crate::utils::env_with_file_fallback` helper so call sites in
420/// this module can keep their short local name.
421fn env_nonempty(name: &str) -> Option<String> {
422    crate::utils::env_with_file_fallback(name)
423}
424
425fn env_truthy(name: &str) -> bool {
426    env_nonempty(name)
427        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
428        .unwrap_or(false)
429}
430
431/// Apply a parsed [`BackupConfig`] to `options`. Wires the S3
432/// backend via `with_remote_backend` + `with_atomic_remote_backend`
433/// when the `backend-s3` feature is on, stashes intervals + backend
434/// kind in `metadata` so the runner can spawn the periodic tasks,
435/// and emits the startup INFO log required by #517.
436fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
437    let endpoint_host = endpoint_host(&cfg.endpoint);
438
439    options.metadata.insert(
440        BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
441        cfg.checkpoint_interval_secs.to_string(),
442    );
443    options.metadata.insert(
444        BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
445        cfg.wal_flush_interval_secs.to_string(),
446    );
447    options
448        .metadata
449        .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
450    options.metadata.insert(
451        BACKUP_PAUSE_ON_LAG_META.to_string(),
452        cfg.pause_on_lag_secs.to_string(),
453    );
454
455    #[cfg(feature = "backend-s3")]
456    {
457        let s3_cfg = crate::storage::backend::S3Config {
458            endpoint: cfg.endpoint.clone(),
459            bucket: cfg.bucket.clone(),
460            key_prefix: cfg.prefix.clone(),
461            access_key: cfg.access_key_id.clone(),
462            secret_key: cfg.secret_access_key.clone(),
463            region: cfg.region.clone(),
464            path_style: true,
465        };
466        let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
467        options.remote_backend = Some(backend.clone());
468        options.remote_backend_atomic = Some(backend);
469        // Use the operator-supplied prefix as the snapshot key root.
470        // The existing helpers (`default_snapshot_prefix`,
471        // `default_wal_archive_prefix`) derive sub-prefixes from the
472        // parent of `remote_key`.
473        let trimmed = cfg.prefix.trim_end_matches('/');
474        options.remote_key = Some(reddb_file::remote_database_key(trimmed));
475
476        tracing::info!(
477            backend = "s3",
478            endpoint = %endpoint_host,
479            bucket = %cfg.bucket,
480            prefix = %cfg.prefix,
481            checkpoint_interval_secs = cfg.checkpoint_interval_secs,
482            wal_flush_interval_secs = cfg.wal_flush_interval_secs,
483            "backup backend configured from REDDB_BACKUP_* env"
484        );
485    }
486
487    #[cfg(not(feature = "backend-s3"))]
488    {
489        tracing::warn!(
490            backend = "s3",
491            endpoint = %endpoint_host,
492            bucket = %cfg.bucket,
493            prefix = %cfg.prefix,
494            "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
495             backend wiring skipped (archiver/checkpointer also disabled)"
496        );
497    }
498}
499
/// Extract the `host[:port]` part of an endpoint URL for log output.
///
/// The function removes three parts of the endpoint:
/// - the scheme (`https://`), if present;
/// - everything from the first `/`, `?` or `#` onwards;
/// - any `user:password@` userinfo, so credentials embedded in the
///   endpoint never reach the logs.
///
/// Inputs without a scheme (e.g. `minio:9000/x`) are handled the same
/// way. An empty input yields an empty string.
fn endpoint_host(endpoint: &str) -> &str {
    let after_scheme = endpoint
        .split_once("://")
        .map_or(endpoint, |(_, rest)| rest);
    // The authority ends at the first path, query, or fragment delimiter.
    let authority_end = after_scheme
        .find(['/', '?', '#'])
        .unwrap_or(after_scheme.len());
    let authority = &after_scheme[..authority_end];
    // Userinfo is everything up to the last '@' inside the authority.
    authority
        .rsplit_once('@')
        .map_or(authority, |(_, host)| host)
}
507
508/// If `options` carry backup-task intervals threaded in via
509/// [`apply_backup_config`], spawn periodic checkpointer + WAL-flush
510/// tasks against `runtime`. Returns a `BackupTasksHandle` that
511/// stops the tasks when dropped; runners keep it alive for the
512/// server lifetime.
513fn spawn_backup_tasks_if_configured(
514    options: &RedDBOptions,
515    runtime: &RedDBRuntime,
516) -> Option<BackupTasksHandle> {
517    let checkpoint_secs: u64 = options
518        .metadata
519        .get(BACKUP_INTERVAL_META_CHECKPOINT)?
520        .parse()
521        .ok()?;
522    let wal_secs: u64 = options
523        .metadata
524        .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
525        .parse()
526        .ok()?;
527    // Issue #519 — opt-in graceful read-only when remote archive lag
528    // exceeds the threshold. `0` (default) keeps legacy behaviour.
529    let pause_on_lag_secs: u64 = options
530        .metadata
531        .get(BACKUP_PAUSE_ON_LAG_META)
532        .and_then(|raw| raw.parse().ok())
533        .unwrap_or(0);
534    options.remote_backend.as_ref()?;
535
536    let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
537
538    // Stamp the gate with the threshold + a "now" baseline so the
539    // first auto-pause can only fire after `pause_on_lag_secs` of
540    // archive silence. The poller below re-evaluates on the same
541    // clock the archive-task wrapper uses.
542    if pause_on_lag_secs > 0 {
543        let now_ms = std::time::SystemTime::now()
544            .duration_since(std::time::UNIX_EPOCH)
545            .map(|d| d.as_millis() as u64)
546            .unwrap_or(0);
547        runtime
548            .write_gate()
549            .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
550        tracing::info!(
551            pause_on_lag_secs,
552            "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
553        );
554    }
555
556    let checkpoint_handle = {
557        let stop = Arc::clone(&stop);
558        let runtime = runtime.clone();
559        let interval = Duration::from_secs(checkpoint_secs);
560        thread::Builder::new()
561            .name("red-checkpointer".into())
562            .spawn(move || {
563                periodic_loop(stop, interval, move || {
564                    if let Err(err) = runtime.checkpoint() {
565                        tracing::warn!(error = %err, "periodic checkpoint failed");
566                    }
567                })
568            })
569            .ok()
570    };
571
572    let archiver_handle = {
573        let stop = Arc::clone(&stop);
574        let runtime = runtime.clone();
575        let interval = Duration::from_secs(wal_secs);
576        let lag_enabled = pause_on_lag_secs > 0;
577        thread::Builder::new()
578            .name("red-wal-archiver".into())
579            .spawn(move || {
580                periodic_loop(stop, interval, move || match runtime.trigger_backup() {
581                    Ok(_) if lag_enabled => {
582                        let now_ms = std::time::SystemTime::now()
583                            .duration_since(std::time::UNIX_EPOCH)
584                            .map(|d| d.as_millis() as u64)
585                            .unwrap_or(0);
586                        runtime.write_gate().record_archive_success(now_ms);
587                        // Same-tick re-evaluation: catching up while
588                        // already auto-paused must auto-resume without
589                        // waiting for the poller's cadence.
590                        runtime.write_gate().evaluate_archive_lag(now_ms);
591                    }
592                    Ok(_) => {}
593                    Err(err) => {
594                        tracing::warn!(error = %err, "periodic WAL archive/backup failed");
595                    }
596                })
597            })
598            .ok()
599    };
600
601    // Issue #519 — lag poller. Wakes on a short cadence so a frozen
602    // archiver (the worst case) still flips the gate within ~5s of
603    // crossing the threshold, instead of waiting up to a full
604    // `wal_secs` for the next archive attempt that may never come.
605    let lag_monitor_handle = if pause_on_lag_secs > 0 {
606        let stop = Arc::clone(&stop);
607        let runtime = runtime.clone();
608        // 5s is short enough that the threshold is honoured tightly
609        // and long enough that the atomic loads stay invisible at the
610        // process level.
611        let interval = Duration::from_secs(5);
612        thread::Builder::new()
613            .name("red-archive-lag-monitor".into())
614            .spawn(move || {
615                periodic_loop(stop, interval, move || {
616                    let now_ms = std::time::SystemTime::now()
617                        .duration_since(std::time::UNIX_EPOCH)
618                        .map(|d| d.as_millis() as u64)
619                        .unwrap_or(0);
620                    let was_paused = runtime.write_gate().is_auto_paused();
621                    let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
622                    if now_paused && !was_paused {
623                        tracing::warn!(
624                            pause_on_lag_secs,
625                            last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
626                            "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
627                        );
628                    } else if !now_paused && was_paused {
629                        tracing::info!(
630                            "WAL archive caught up — exiting graceful read-only mode (issue #519)"
631                        );
632                    }
633                })
634            })
635            .ok()
636    } else {
637        None
638    };
639
640    tracing::info!(
641        checkpoint_interval_secs = checkpoint_secs,
642        wal_flush_interval_secs = wal_secs,
643        "backup tasks spawned (checkpointer + WAL archiver)"
644    );
645
646    Some(BackupTasksHandle {
647        stop,
648        _checkpoint_handle: checkpoint_handle,
649        _archiver_handle: archiver_handle,
650        _lag_monitor_handle: lag_monitor_handle,
651    })
652}
653
/// Shutdown handle for the periodic backup threads spawned by
/// `spawn_backup_tasks_if_configured`. These are the checkpointer, the
/// WAL archiver and, when enabled, the archive-lag monitor.
///
/// Dropping the handle sets the shared stop flag. Each `periodic_loop`
/// sees it after finishing its current 1s sleep (and any tick that sleep
/// triggers), then exits. The threads are not joined, so dropping does
/// not block.
pub struct BackupTasksHandle {
    /// Stop flag shared with every spawned loop.
    stop: Arc<std::sync::atomic::AtomicBool>,
    /// `None` when the checkpointer thread could not be spawned.
    _checkpoint_handle: Option<thread::JoinHandle<()>>,
    /// `None` when the WAL-archiver thread could not be spawned.
    _archiver_handle: Option<thread::JoinHandle<()>>,
    /// Issue #519 — periodic archive-lag poller, only spawned when
    /// `REDDB_BACKUP_PAUSE_ON_LAG_SECS > 0`.
    _lag_monitor_handle: Option<thread::JoinHandle<()>>,
}

impl Drop for BackupTasksHandle {
    fn drop(&mut self) {
        // Release pairs with the Acquire load in `periodic_loop`.
        self.stop.store(true, std::sync::atomic::Ordering::Release);
    }
}
670
/// Run `tick` about once per `interval` until `stop` is set.
///
/// The loop sleeps in fixed 1s steps and checks `stop` before each step,
/// so shutdown stays responsive even for long intervals such as a 1h
/// checkpoint. `tick` fires once the accumulated sleep time reaches
/// `interval`; the counter then resets. The tick's own runtime is not
/// counted. The first tick happens no earlier than `interval` after start.
fn periodic_loop<F: FnMut()>(
    stop: Arc<std::sync::atomic::AtomicBool>,
    interval: Duration,
    mut tick: F,
) {
    const WAKE: Duration = Duration::from_secs(1);
    let mut since_last_tick = Duration::ZERO;
    loop {
        if stop.load(std::sync::atomic::Ordering::Acquire) {
            break;
        }
        thread::sleep(WAKE);
        since_last_tick += WAKE;
        if since_last_tick < interval {
            continue;
        }
        tick();
        since_last_tick = Duration::ZERO;
    }
}
689
690fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
691    // PLAN.md (cloud-agnostic) — prefer the new spelling
692    // `RED_BACKEND`; accept the legacy `REDDB_REMOTE_BACKEND` for
693    // existing dev installs. `none` (default) means standalone — no
694    // remote backend, valid for development and on-prem without
695    // remote.
696    let backend = env_nonempty("RED_BACKEND")
697        .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
698        .unwrap_or_else(|| "none".to_string())
699        .to_ascii_lowercase();
700
701    match backend.as_str() {
702        // Universal S3-compatible — covers AWS, R2, MinIO, Ceph,
703        // GCS-interop, B2, DO Spaces, Wasabi, Garage, SeaweedFS,
704        // IDrive, Storj. The `path_style` toggle in S3Config picks
705        // the right addressing for self-hosted vs hosted.
706        "s3" | "minio" | "r2" => {
707            #[cfg(feature = "backend-s3")]
708            {
709                if let Some(config) = s3_config_from_env() {
710                    let remote_key = env_nonempty("RED_REMOTE_KEY")
711                        .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
712                        .unwrap_or_else(|| reddb_file::remote_database_key("clusters/dev"));
713                    let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
714                    options.remote_backend = Some(backend.clone());
715                    options.remote_backend_atomic = Some(backend);
716                    options.remote_key = Some(remote_key);
717                }
718            }
719            #[cfg(not(feature = "backend-s3"))]
720            {
721                tracing::warn!(
722                    backend = %backend,
723                    "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
724                );
725            }
726        }
727        // Filesystem backend (NFS/EFS/SMB/local-disk). PLAN.md spec
728        // calls it `fs`; legacy code shipped it as `local`. Both
729        // names map to LocalBackend, with the remote_key derived
730        // from `RED_FS_PATH` + a per-database suffix when provided.
731        "fs" | "local" => {
732            let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
733            let remote_key = match (
734                base_path,
735                env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
736            ) {
737                (Some(base), Some(rel)) => Some(format!(
738                    "{}/{}",
739                    base.trim_end_matches('/'),
740                    rel.trim_start_matches('/')
741                )),
742                (Some(base), None) => Some(reddb_file::remote_database_key(&format!(
743                    "{}/clusters/dev",
744                    base.trim_end_matches('/')
745                ))),
746                (None, Some(rel)) => Some(rel),
747                (None, None) => None,
748            };
749            if let Some(key) = remote_key {
750                let backend = Arc::new(crate::storage::backend::LocalBackend);
751                options.remote_backend = Some(backend.clone());
752                options.remote_backend_atomic = Some(backend);
753                options.remote_key = Some(key);
754            }
755        }
756        // Generic HTTP backend (PLAN.md Phase 2.3). Maximum
757        // portability: any service exposing PUT/GET/DELETE serves
758        // as a backup target. Optional auth via *_FILE secret
759        // path keeps the token out of the env.
760        "http" => {
761            let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
762                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
763            {
764                Some(u) => u,
765                None => {
766                    tracing::warn!(
767                        "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
768                    );
769                    return;
770                }
771            };
772            let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
773                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
774                .unwrap_or_default();
775            let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
776                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
777            {
778                std::fs::read_to_string(&path)
779                    .ok()
780                    .map(|s| s.trim().to_string())
781                    .filter(|s| !s.is_empty())
782            } else {
783                env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
784                    .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
785            };
786
787            let mut config =
788                crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
789            if let Some(auth) = auth_header {
790                config = config.with_auth_header(auth);
791            }
792            let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
793                || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
794                || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
795            config = config.with_conditional_writes(conditional_writes);
796            // Always populate the snapshot-transport handle. When the
797            // operator confirmed CAS support, also populate the atomic
798            // handle via AtomicHttpBackend — without that flag,
799            // LeaseStore must remain unreachable on this backend.
800            if conditional_writes {
801                match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
802                    Ok(atomic) => {
803                        let atomic_arc = Arc::new(atomic);
804                        options.remote_backend = Some(atomic_arc.clone());
805                        options.remote_backend_atomic = Some(atomic_arc);
806                    }
807                    Err(err) => {
808                        tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
809                        options.remote_backend =
810                            Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
811                    }
812                }
813            } else {
814                options.remote_backend =
815                    Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
816            }
817            options.remote_key = env_nonempty("RED_REMOTE_KEY")
818                .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
819                .or_else(|| Some(reddb_file::remote_database_key("clusters/dev")));
820        }
821        // `none` is the explicit standalone — no remote, no backup
822        // pipeline. Boot never blocks on network reachability.
823        "none" | "" => {}
824        other => {
825            tracing::warn!(
826                backend = %other,
827                "unknown RED_BACKEND value — supported: s3 | fs | http | none"
828            );
829        }
830    }
831}
832
/// Look up an S3 setting by `suffix`.
///
/// The current spelling `RED_S3_<suffix>` (PLAN.md spec) is tried first;
/// only if it yields nothing is the legacy `REDDB_S3_<suffix>` spelling,
/// kept for existing dev installs, consulted. Both lookups go through
/// `env_nonempty` (presumably treating empty values as unset).
///
/// NOTE(review): nothing here logs when the legacy spelling supplied the
/// value — confirm whether callers are expected to emit a deprecation hint.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    let current_name = format!("RED_S3_{suffix}");
    if let Some(found) = env_nonempty(&current_name) {
        return Some(found);
    }
    let legacy_name = format!("REDDB_S3_{suffix}");
    env_nonempty(&legacy_name)
}
842
/// Resolve an S3 secret (e.g. `ACCESS_KEY`, `SECRET_KEY`) for `suffix`.
///
/// A `*_FILE` companion (`RED_S3_<suffix>_FILE`, then the legacy
/// `REDDB_S3_<suffix>_FILE`) takes precedence, matching the Kubernetes /
/// Docker Secrets / Vault Agent convention of mounting secrets as files.
/// When one is set, the named file is read and its trimmed contents are
/// returned. If that file cannot be read, or is blank, the result is
/// `None` — the inline variable is *not* used as a fallback in that case.
/// Only when no `_FILE` variable is set does this defer to `env_s3(suffix)`.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let secret_file = env_nonempty(&format!("RED_S3_{suffix}_FILE"))
        .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}_FILE")));

    match secret_file {
        None => env_s3(suffix),
        Some(path) => match std::fs::read_to_string(&path) {
            Err(_) => None,
            Ok(contents) => {
                let trimmed = contents.trim();
                if trimmed.is_empty() {
                    None
                } else {
                    Some(trimmed.to_string())
                }
            }
        },
    }
}
860
/// Build an S3 backend configuration from the `RED_S3_*` / `REDDB_S3_*`
/// environment.
///
/// Required settings are `ENDPOINT`, `BUCKET`, `ACCESS_KEY` and
/// `SECRET_KEY`; the two keys may come from `*_FILE` companions. If any
/// required setting is missing, the result is `None`.
///
/// Optional settings:
/// * `REGION` — defaults to `us-east-1`.
/// * `KEY_PREFIX`, falling back to `PREFIX` — defaults to the empty string.
/// * `PATH_STYLE` — `1`/`true`/`yes`/`on` (case-insensitive) enables it and
///   any other value disables it. It defaults to enabled when unset.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    let endpoint = env_s3("ENDPOINT")?;
    let bucket = env_s3("BUCKET")?;
    let access_key = env_s3_secret("ACCESS_KEY")?;
    let secret_key = env_s3_secret("SECRET_KEY")?;

    let region = match env_s3("REGION") {
        Some(explicit) => explicit,
        None => String::from("us-east-1"),
    };
    let key_prefix = match env_s3("KEY_PREFIX").or_else(|| env_s3("PREFIX")) {
        Some(prefix) => prefix,
        None => String::new(),
    };
    let path_style = match env_s3("PATH_STYLE") {
        None => true,
        Some(raw) => {
            let lowered = raw.to_ascii_lowercase();
            matches!(lowered.as_str(), "1" | "true" | "yes" | "on")
        }
    };

    Some(crate::storage::backend::S3Config {
        endpoint,
        bucket,
        key_prefix,
        access_key,
        secret_key,
        region,
        path_style,
    })
}
884
885pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
886    let data_dir = config.data_dir();
887    let exec_start = render_systemd_exec_start(config);
888    format!(
889        "[Unit]\n\
890Description=RedDB unified database service\n\
891After=network-online.target\n\
892Wants=network-online.target\n\
893\n\
894[Service]\n\
895Type=simple\n\
896User={user}\n\
897Group={group}\n\
898WorkingDirectory={workdir}\n\
899ExecStart={exec_start}\n\
900Restart=always\n\
901RestartSec=2\n\
902LimitSTACK=16M\n\
903NoNewPrivileges=true\n\
904PrivateTmp=true\n\
905ProtectSystem=strict\n\
906ProtectHome=true\n\
907ProtectControlGroups=true\n\
908ProtectKernelTunables=true\n\
909ProtectKernelModules=true\n\
910RestrictNamespaces=true\n\
911LockPersonality=true\n\
912MemoryDenyWriteExecute=true\n\
913ReadWritePaths={workdir}\n\
914\n\
915[Install]\n\
916WantedBy=multi-user.target\n",
917        user = config.run_user,
918        group = config.run_group,
919        workdir = data_dir.display(),
920        exec_start = exec_start,
921    )
922}
923
924/// Install a systemd unit + start the service.
925///
926/// Linux-only. The helper shells out to `systemctl`, `useradd`,
927/// `groupadd`, `install`, `getent`, and `id` — none of which exist on
928/// Windows or macOS. The Windows/macOS branch returns a hard error so
929/// callers (the CLI) surface a clear message instead of a confusing
930/// syscall failure. A proper Windows-service equivalent (sc.exe /
931/// NSSM) is a Phase 3.6 follow-up.
932#[cfg(target_os = "linux")]
933pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
934    ensure_root()?;
935    ensure_command_available("systemctl")?;
936    ensure_command_available("getent")?;
937    ensure_command_available("groupadd")?;
938    ensure_command_available("useradd")?;
939    ensure_command_available("install")?;
940    ensure_executable(&config.binary_path)?;
941
942    if !command_success("getent", ["group", config.run_group.as_str()])? {
943        run_command("groupadd", ["--system", config.run_group.as_str()])?;
944    }
945
946    if !command_success("id", ["-u", config.run_user.as_str()])? {
947        let data_dir = config.data_dir();
948        run_command(
949            "useradd",
950            [
951                "--system",
952                "--gid",
953                config.run_group.as_str(),
954                "--home-dir",
955                data_dir.to_string_lossy().as_ref(),
956                "--shell",
957                "/usr/sbin/nologin",
958                config.run_user.as_str(),
959            ],
960        )?;
961    }
962
963    let data_dir = config.data_dir();
964    run_command(
965        "install",
966        [
967            "-d",
968            "-o",
969            config.run_user.as_str(),
970            "-g",
971            config.run_group.as_str(),
972            "-m",
973            "0750",
974            data_dir.to_string_lossy().as_ref(),
975        ],
976    )?;
977
978    std::fs::write(config.unit_path(), render_systemd_unit(config))
979        .map_err(|err| format!("failed to write systemd unit: {err}"))?;
980
981    run_command("systemctl", ["daemon-reload"])?;
982    run_command(
983        "systemctl",
984        [
985            "enable",
986            "--now",
987            format!("{}.service", config.service_name).as_str(),
988        ],
989    )?;
990
991    Ok(())
992}
993
/// Stand-in for `install_systemd_service` on platforms without systemd.
///
/// The signature is the same as the Linux version, so callers compile on
/// every platform. At runtime it always returns an error that points the
/// operator at the native service manager. Native integration (sc.exe +
/// NSSM on Windows, launchd on macOS) is a Phase 3.6 follow-up.
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    let message = "systemd install is Linux-only — use sc.exe (Windows) or \
         launchd (macOS) to install the service manually using the \
         unit printed by `red service print-unit`";
    Err(String::from(message))
}
1005
/// Fail unless the uid reported by `id -u` is `0`.
///
/// Errors are returned in three cases:
/// * `id` cannot be spawned;
/// * `id` exits unsuccessfully;
/// * the trimmed uid is anything other than `"0"`.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let id_output = match Command::new("id").arg("-u").output() {
        Ok(captured) => captured,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !id_output.status.success() {
        return Err("failed to determine current uid".to_string());
    }
    match String::from_utf8_lossy(&id_output.stdout).trim() {
        "0" => Ok(()),
        _ => Err("run this command as root (sudo)".to_string()),
    }
}
1021
/// Fail unless `command` resolves on this process's `PATH`.
///
/// The check runs `sh -c 'command -v "$1"' sh <command>`:
/// * No `-l`. A login shell sources `/etc/profile` / `~/.profile` and can
///   see a different `PATH` than the one `Command::new(command)` later uses
///   to spawn the tool. The check could then pass while the real call fails.
/// * The name is passed as a positional parameter (`$1`) instead of being
///   spliced into the script text. Shell metacharacters in `command` are
///   therefore never interpreted; previously `"x || true"` passed the check.
///
/// # Errors
/// Returns an error if `sh` cannot be spawned or `command` is not found.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    let status = Command::new("sh")
        .args(["-c", r#"command -v "$1" >/dev/null 2>&1"#, "sh", command])
        .status()
        .map_err(|err| format!("failed to check command '{command}': {err}"))?;
    if status.success() {
        Ok(())
    } else {
        Err(format!("required command not found: {command}"))
    }
}
1034
/// Fail unless `path` names an existing regular file with at least one
/// execute bit set (owner, group, or other).
///
/// `std::fs::metadata` follows symlinks, so a symlink to an executable is
/// accepted. Directories are rejected explicitly. They normally carry
/// execute (search) bits, so a mode check alone would wrongly accept e.g.
/// `/usr/bin` as the service binary.
///
/// This function is only compiled on Linux, which is always `unix`, so the
/// Unix permission API is used directly. A non-Unix branch would be dead.
///
/// # Errors
/// Returns an error if `path` does not exist, is not a regular file, or has
/// no execute bit.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    use std::os::unix::fs::PermissionsExt;

    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    if metadata.permissions().mode() & 0o111 == 0 {
        return Err(format!("binary is not executable: {}", path.display()));
    }
    Ok(())
}
1054
/// Run `program` with `args` and report whether it exited successfully.
///
/// Uses `Command::status`, so the child inherits this process's stdio.
/// `Ok(false)` means the program ran but did not exit successfully.
/// `Err` means it could not be spawned at all.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    let mut invocation = Command::new(program);
    invocation.args(args);
    match invocation.status() {
        Ok(exit) => Ok(exit.success()),
        Err(err) => Err(format!("failed to run {program}: {err}")),
    }
}
1063
/// Run `program` with `args`, capturing its output, and fail unless it exits
/// successfully.
///
/// On failure the error is `"<program> failed: <detail>"`. `detail` is the
/// first of these that is available:
/// 1. the trimmed stderr, if non-empty;
/// 2. the trimmed stdout, if non-empty;
/// 3. the exit status as rendered by `ExitStatus`'s `Display`
///    (e.g. `exit status: 1`, or `signal: ...` when the child was killed).
///
/// # Errors
/// Also returns `"failed to run <program>: <io error>"` if the program
/// cannot be spawned.
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
    let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
    let detail = if !stderr.is_empty() {
        stderr
    } else if !stdout.is_empty() {
        stdout
    } else {
        // `ExitStatus`'s Display already reads "exit status: N" (or
        // "signal: ..." for a killed child). The old `"exit status {}"`
        // prefix doubled it ("exit status exit status: 1") and mislabelled
        // signal deaths as exit statuses.
        output.status.to_string()
    };
    Err(format!("{program} failed: {detail}"))
}
1085
1086pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1087    let has_any = config.router_bind_addr.is_some()
1088        || config.grpc_bind_addr.is_some()
1089        || config.http_bind_addr.is_some()
1090        || config.wire_bind_addr.is_some()
1091        || config.pg_bind_addr.is_some();
1092    if !has_any {
1093        return Err("at least one server bind address must be configured".into());
1094    }
1095    let thread_name = if config.router_bind_addr.is_some() {
1096        "red-server-router"
1097    } else {
1098        match (
1099            config.grpc_bind_addr.is_some(),
1100            config.http_bind_addr.is_some(),
1101        ) {
1102            (true, true) => "red-server-dual",
1103            (true, false) => "red-server-grpc",
1104            (false, true) => "red-server-http",
1105            (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1106            (false, false) => "red-server-pg-wire",
1107        }
1108    };
1109
1110    let handle = thread::Builder::new()
1111        .name(thread_name.into())
1112        .stack_size(8 * 1024 * 1024)
1113        .spawn(move || run_configured_servers(config))
1114        .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1115
1116    match handle.join() {
1117        Ok(result) => result,
1118        Err(_) => Err("server thread panicked".to_string()),
1119    }
1120}
1121
1122fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1123    let mut parts = vec![
1124        config.binary_path.display().to_string(),
1125        "server".to_string(),
1126        "--path".to_string(),
1127        config.data_path.display().to_string(),
1128    ];
1129
1130    if let Some(bind_addr) = &config.router_bind_addr {
1131        parts.push("--bind".to_string());
1132        parts.push(bind_addr.clone());
1133    } else if let Some(bind_addr) = &config.grpc_bind_addr {
1134        parts.push("--grpc-bind".to_string());
1135        parts.push(bind_addr.clone());
1136    }
1137    if let Some(bind_addr) = &config.http_bind_addr {
1138        parts.push("--http-bind".to_string());
1139        parts.push(bind_addr.clone());
1140    }
1141
1142    parts.join(" ")
1143}
1144
/// Report whether something is accepting TCP connections at `target`.
///
/// `target` is resolved with `ToSocketAddrs` (e.g. `"127.0.0.1:5050"` or
/// `"host:port"`); if resolution fails the answer is `false`. The resolved
/// addresses are tried in order, each with its own `timeout`. The first
/// successful connect returns `true` and the probe connection is dropped
/// immediately.
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    let candidates: Vec<SocketAddr> = match target.to_socket_addrs() {
        Err(_) => return false,
        Ok(resolved) => resolved.collect(),
    };

    for candidate in &candidates {
        if TcpStream::connect_timeout(candidate, timeout).is_ok() {
            return true;
        }
    }
    false
}
1155
1156#[inline(never)]
1157fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1158    // Phase 6 logging is initialised inside each runner once the
1159    // runtime is open — see `build_runtime_and_auth_store`. Going
1160    // after DB open lets us read `red.logging.*` config keys out of
1161    // the persistent red_config store and merge them with the CLI
1162    // flags (flag > red_config > built-in default).
1163    if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1164        return run_routed_server(config, router_bind_addr);
1165    }
1166
1167    match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1168        (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1169            run_dual_server(config, grpc_bind_addr, http_bind_addr)
1170        }
1171        (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1172        (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1173        (None, None) => {
1174            if let Some(wire_addr) = config.wire_bind_addr.clone() {
1175                run_wire_only_server(config, wire_addr)
1176            } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1177                run_pg_only_server(config, pg_addr)
1178            } else {
1179                Err("at least one server bind address must be configured".to_string())
1180            }
1181        }
1182    }
1183}
1184
1185/// Bind a TCP listener for a transport at startup and record the
1186/// outcome in the shared [`TransportReadiness`] state.
1187///
1188/// The split between *explicit* and *implicit/default* binds is the
1189/// contract from issue #545:
1190///
1191/// * `explicit == true` — the operator named this transport on the
1192///   CLI / env / config. A failed bind is fatal: this returns `Err`
1193///   and the boot path must propagate the error so the process exits
1194///   non-zero with the recorded `reason`.
1195/// * `explicit == false` — this is a default listener the server
1196///   would have spun up regardless. A failed bind degrades: this
1197///   returns `Ok(None)` (no listener) but the failure is still
1198///   recorded in `readiness.failed`, so the server keeps running and
1199///   the next `/health` probe enumerates the degraded listener.
1200///
1201/// On success the bound listener lands in `readiness.active`.
1202pub fn bind_listener_for_startup(
1203    readiness: &mut TransportReadiness,
1204    transport: &str,
1205    bind_addr: &str,
1206    explicit: bool,
1207) -> Result<Option<TcpListener>, String> {
1208    match TcpListener::bind(bind_addr) {
1209        Ok(listener) => {
1210            readiness.active(transport, bind_addr, explicit);
1211            Ok(Some(listener))
1212        }
1213        Err(err) => {
1214            let reason = format!("{transport} listener bind {bind_addr}: {err}");
1215            readiness.failed(transport, bind_addr, explicit, reason.clone());
1216            if explicit {
1217                tracing::error!(
1218                    transport,
1219                    bind = %bind_addr,
1220                    error = %err,
1221                    "fatal explicit bind failure"
1222                );
1223                Err(format!("explicit {reason}"))
1224            } else {
1225                tracing::warn!(
1226                    transport,
1227                    bind = %bind_addr,
1228                    error = %err,
1229                    "non-fatal implicit bind failure; listener degraded"
1230                );
1231                Ok(None)
1232            }
1233        }
1234    }
1235}
1236
1237/// Wire SIGTERM and SIGINT to `RedDBRuntime::graceful_shutdown`.
1238///
1239/// PLAN.md Phase 1.1 — orchestrators (K8s preStop, Fly autostop, ECS
1240/// drain, systemd, plain `docker stop`) all rely on SIGTERM with a
1241/// grace window. SIGKILL after that grace window is the OS's
1242/// fallback; we are responsible for finishing in time.
1243///
1244/// Spawns a tokio task on the caller's runtime that:
1245///   1. Awaits the first of SIGTERM / SIGINT.
1246///   2. Calls `runtime.graceful_shutdown(backup_on_shutdown)`. The
1247///      runtime moves to `Stopped` on its own; this just runs the
1248///      flush + checkpoint pipeline and (optionally) a final backup.
1249///   3. Calls `std::process::exit(0)` so the orchestrator sees a
1250///      clean exit code.
1251///
1252/// `RED_BACKUP_ON_SHUTDOWN` (default `true` if a remote backend is
1253/// configured) toggles step 3's backup branch. The flush + checkpoint
1254/// always run.
1255///
1256/// Idempotent across signal storms — `graceful_shutdown` returns the
1257/// cached report on second call, but we exit on the first one
1258/// regardless, so the second SIGTERM never reaches the handler.
1259async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
1260    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
1261        .ok()
1262        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1263        .unwrap_or(true);
1264
1265    #[cfg(unix)]
1266    {
1267        use tokio::signal::unix::{signal, SignalKind};
1268
1269        let mut sigterm = match signal(SignalKind::terminate()) {
1270            Ok(s) => s,
1271            Err(err) => {
1272                tracing::warn!(
1273                    error = %err,
1274                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
1275                );
1276                return;
1277            }
1278        };
1279        let mut sigint = match signal(SignalKind::interrupt()) {
1280            Ok(s) => s,
1281            Err(err) => {
1282                tracing::warn!(error = %err, "could not install SIGINT handler");
1283                return;
1284            }
1285        };
1286        // PLAN.md Phase 6.4 — SIGHUP triggers a reload of secrets from
1287        // their `_FILE` companions without restarting the process.
1288        // Useful for credential rotation pipelines (kubectl create
1289        // secret + kubectl rollout restart, but for systemd / Nomad /
1290        // bare-metal where rolling the process is heavier).
1291        let mut sighup = match signal(SignalKind::hangup()) {
1292            Ok(s) => Some(s),
1293            Err(err) => {
1294                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
1295                None
1296            }
1297        };
1298
1299        let reload_runtime = runtime.clone();
1300        tokio::spawn(async move {
1301            loop {
1302                let signal_name = match &mut sighup {
1303                    Some(hup) => tokio::select! {
1304                        _ = sigterm.recv() => "SIGTERM",
1305                        _ = sigint.recv() => "SIGINT",
1306                        _ = hup.recv() => "SIGHUP",
1307                    },
1308                    None => tokio::select! {
1309                        _ = sigterm.recv() => "SIGTERM",
1310                        _ = sigint.recv() => "SIGINT",
1311                    },
1312                };
1313
1314                if signal_name == "SIGHUP" {
1315                    handle_sighup_reload(&reload_runtime);
1316                    continue; // stay alive; SIGHUP isn't a shutdown
1317                }
1318
1319                tracing::info!(
1320                    signal = signal_name,
1321                    "lifecycle signal received; shutting down"
1322                );
1323                match runtime.graceful_shutdown(backup_on_shutdown) {
1324                    Ok(report) => {
1325                        tracing::info!(
1326                            duration_ms = report.duration_ms,
1327                            flushed_wal = report.flushed_wal,
1328                            final_checkpoint = report.final_checkpoint,
1329                            backup_uploaded = report.backup_uploaded,
1330                            "graceful shutdown complete"
1331                        );
1332                    }
1333                    Err(err) => {
1334                        tracing::error!(error = %err, "graceful shutdown failed");
1335                        // Issue #205 — graceful shutdown returning Err
1336                        // means the runtime is exiting without a clean
1337                        // flush/checkpoint. Operator-grade event so the
1338                        // operator notices the dirty exit even when the
1339                        // process restarts before they read tracing logs.
1340                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
1341                            reason: format!("graceful shutdown failed: {err}"),
1342                        }
1343                        .emit_global();
1344                    }
1345                }
1346                std::process::exit(0);
1347            }
1348        });
1349    }
1350
1351    #[cfg(not(unix))]
1352    {
1353        tokio::spawn(async move {
1354            let interrupted = tokio::signal::ctrl_c().await;
1355            if let Err(err) = interrupted {
1356                tracing::warn!(error = %err, "could not install Ctrl+C handler");
1357                return;
1358            }
1359
1360            tracing::info!(
1361                signal = "Ctrl+C",
1362                "lifecycle signal received; shutting down"
1363            );
1364            match runtime.graceful_shutdown(backup_on_shutdown) {
1365                Ok(report) => {
1366                    tracing::info!(
1367                        duration_ms = report.duration_ms,
1368                        flushed_wal = report.flushed_wal,
1369                        final_checkpoint = report.final_checkpoint,
1370                        backup_uploaded = report.backup_uploaded,
1371                        "graceful shutdown complete"
1372                    );
1373                }
1374                Err(err) => {
1375                    tracing::error!(error = %err, "graceful shutdown failed");
1376                }
1377            }
1378            std::process::exit(0);
1379        });
1380    }
1381}
1382
1383/// PLAN.md Phase 6.4 — re-read secrets from `*_FILE` companion env
1384/// vars. Today this only refreshes the audit log + records the
1385/// reload event; the runtime modules that hold cached secret
1386/// material (S3 backend credentials, admin token, JWT keys) read
1387/// the env on each request so the next call after SIGHUP picks up
1388/// the new file contents automatically. A future extension can
1389/// punch through to the LeaseStore / AuthStore for in-memory
1390/// caches that don't re-read on each call.
1391fn handle_sighup_reload(runtime: &RedDBRuntime) {
1392    let now_ms = std::time::SystemTime::now()
1393        .duration_since(std::time::UNIX_EPOCH)
1394        .map(|d| d.as_millis() as u64)
1395        .unwrap_or(0);
1396    tracing::info!(
1397        target: "reddb::secrets",
1398        ts_unix_ms = now_ms,
1399        "SIGHUP received; secrets will be re-read from *_FILE on next access"
1400    );
1401    // Routed through AuditFieldEscaper (ADR 0010 / issue #177) so
1402    // every emission goes through the typed-field guard. The
1403    // arguments here are static, but using the typed entry point
1404    // keeps the discipline uniform across call sites.
1405    use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1406    runtime.audit_log().record_event(
1407        AuditEvent::builder("config/sighup_reload")
1408            .source(AuditAuthSource::System)
1409            .resource("secrets")
1410            .outcome(Outcome::Success)
1411            .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1412            .build(),
1413    );
1414}
1415
1416#[inline(never)]
1417fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
1418    let workers = config.workers;
1419    let cli_telemetry = config.telemetry.clone();
1420    let db_options = config.to_db_options()?;
1421    let rt_config = detect_runtime_config();
1422    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1423    let (runtime, auth_store, _telemetry_guard) =
1424        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1425    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1426
1427    spawn_admin_metrics_listeners(&runtime, &auth_store);
1428
1429    // Issue #933: collapse the loopback proxy. All three transports are
1430    // served from one in-process acceptor that shares the single tokio
1431    // runtime (ADR 0035) — no internal HTTP/gRPC/wire listeners, no
1432    // `copy_bidirectional` hop. We build the handler objects here and hand
1433    // them to the demux, which classifies each connection and dispatches.
1434    let http_server = build_http_server(
1435        runtime.clone(),
1436        auth_store.clone(),
1437        router_bind_addr.clone(),
1438    );
1439    let http_server = apply_http_limits(http_server, &config, &runtime);
1440
1441    let grpc_server = RedDBGrpcServer::with_options(
1442        runtime.clone(),
1443        GrpcServerOptions {
1444            bind_addr: router_bind_addr.clone(),
1445            tls: None,
1446        },
1447        auth_store,
1448    );
1449
1450    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1451        .enable_all()
1452        .worker_threads(worker_threads)
1453        .thread_stack_size(rt_config.stack_size)
1454        .build()
1455        .map_err(|err| format!("tokio runtime: {err}"))?;
1456
1457    let signal_runtime = runtime.clone();
1458    let wire_runtime = Arc::new(runtime);
1459    tokio_runtime.block_on(async move {
1460        spawn_lifecycle_signal_handler(signal_runtime).await;
1461        tracing::info!(
1462            bind = %router_bind_addr,
1463            cpus = rt_config.available_cpus,
1464            workers = worker_threads,
1465            "router bootstrapping"
1466        );
1467        serve_tcp_router(InProcessRouterConfig {
1468            bind_addr: router_bind_addr,
1469            http_server,
1470            grpc_server,
1471            wire_runtime,
1472        })
1473        .await
1474        .map_err(|err| err.to_string())
1475    })
1476}
1477
1478/// Spawn RedWire listeners (plaintext + TLS) as background tokio tasks.
1479async fn spawn_wire_listeners(
1480    config: &ServerCommandConfig,
1481    runtime: &RedDBRuntime,
1482    readiness: &mut TransportReadiness,
1483) -> Result<(), String> {
1484    // Plaintext RedWire — TCP or Unix socket
1485    if let Some(wire_addr) = config.wire_bind_addr.clone() {
1486        let wire_rt = Arc::new(runtime.clone());
1487        // Address starting with `unix://` or an absolute filesystem path
1488        // switches to Unix domain sockets.
1489        #[cfg(unix)]
1490        {
1491            if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1492                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1493                tokio::spawn(async move {
1494                    if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1495                        &wire_addr, wire_rt,
1496                    )
1497                    .await
1498                    {
1499                        tracing::error!(err = %e, "redwire unix listener error");
1500                    }
1501                });
1502                return Ok(());
1503            }
1504        }
1505        match tokio::net::TcpListener::bind(&wire_addr).await {
1506            Ok(listener) => {
1507                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1508                tokio::spawn(async move {
1509                    if let Err(e) =
1510                        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1511                            .await
1512                    {
1513                        tracing::error!(err = %e, "redwire listener error");
1514                    }
1515                });
1516            }
1517            Err(err) => {
1518                let reason = format!("wire listener bind {wire_addr}: {err}");
1519                readiness.failed(
1520                    "wire",
1521                    &wire_addr,
1522                    config.wire_bind_explicit,
1523                    reason.clone(),
1524                );
1525                if config.wire_bind_explicit {
1526                    tracing::error!(
1527                        transport = "wire",
1528                        bind = %wire_addr,
1529                        error = %err,
1530                        "fatal explicit bind failure"
1531                    );
1532                    return Err(format!("explicit {reason}"));
1533                }
1534                tracing::warn!(
1535                    transport = "wire",
1536                    bind = %wire_addr,
1537                    error = %err,
1538                    "non-fatal implicit bind failure; listener degraded"
1539                );
1540            }
1541        }
1542    }
1543
1544    // RedWire over TLS
1545    if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1546        let tls_config = resolve_wire_tls_config(config);
1547        match tls_config {
1548            Ok(tls_cfg) => {
1549                let wire_rt = Arc::new(runtime.clone());
1550                tokio::spawn(async move {
1551                    if let Err(e) =
1552                        crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1553                            .await
1554                    {
1555                        tracing::error!(err = %e, "redwire+tls listener error");
1556                    }
1557                });
1558            }
1559            Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1560        }
1561    }
1562    Ok(())
1563}
1564
1565/// Spawn the PostgreSQL wire-protocol listener (Phase 3.1 PG parity).
1566///
1567/// Only runs when `--pg-bind` is supplied. Uses the v3 protocol so
1568/// psql, JDBC drivers, DBeaver, etc. can connect directly. Runs
1569/// alongside the native wire listener; the two transports do not
1570/// share a port.
1571fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1572    if let Some(pg_addr) = config.pg_bind_addr.clone() {
1573        let rt = Arc::new(runtime.clone());
1574        tokio::spawn(async move {
1575            let cfg = crate::wire::PgWireConfig {
1576                bind_addr: pg_addr,
1577                ..Default::default()
1578            };
1579            if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1580                tracing::error!(err = %e, "pg wire listener error");
1581            }
1582        });
1583    }
1584}
1585
1586/// Resolve gRPC TLS material into PEM bytes.
1587///
1588/// Lookup order, in priority:
1589///   1. Explicit `config.grpc_tls_cert` / `config.grpc_tls_key` (paths
1590///      passed via CLI/env). Both must be present together.
1591///   2. `RED_GRPC_TLS_DEV=1` — auto-generate a self-signed cert next
1592///      to the data dir. Refuses to run without the env flag so an
1593///      operator can't accidentally ship a dev cert in prod.
1594///
1595/// `client_ca` is loaded when `config.grpc_tls_client_ca` is set,
1596/// turning the listener into a mutual-TLS endpoint that requires
1597/// every client to present a chain that anchors at one of the CAs
1598/// in the bundle.
1599fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1600    use crate::utils::secret_file::expand_file_env;
1601
1602    // Best-effort *_FILE expansion for every TLS env knob. Errors here
1603    // surface as warnings; the fallback path (explicit cert paths) will
1604    // cover the common case.
1605    for var in [
1606        "REDDB_GRPC_TLS_CERT",
1607        "REDDB_GRPC_TLS_KEY",
1608        "REDDB_GRPC_TLS_CLIENT_CA",
1609    ] {
1610        if let Err(err) = expand_file_env(var) {
1611            tracing::warn!(
1612                target: "reddb::secrets",
1613                env = %var,
1614                err = %err,
1615                "could not expand *_FILE companion for gRPC TLS"
1616            );
1617        }
1618    }
1619
1620    let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1621        (Some(cert), Some(key)) => {
1622            let cert_pem = std::fs::read(cert)
1623                .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1624            let key_pem =
1625                std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1626            (cert_pem, key_pem)
1627        }
1628        _ => {
1629            // No explicit material → only proceed when dev-mode is on.
1630            let dev = std::env::var("RED_GRPC_TLS_DEV")
1631                .ok()
1632                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1633                .unwrap_or(false);
1634            if !dev {
1635                return Err("gRPC TLS configured but no cert/key supplied — set \
1636                     REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1637                     RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1638                    .to_string());
1639            }
1640            let dir = config
1641                .path
1642                .as_ref()
1643                .and_then(|p| p.parent())
1644                .map(PathBuf::from)
1645                .unwrap_or_else(|| PathBuf::from("."));
1646            let (cert_pem_str, key_pem_str) =
1647                crate::wire::tls::generate_self_signed_cert("localhost")
1648                    .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1649
1650            // Constant-time-friendly fingerprint to stderr so the
1651            // operator can pin a client trust store. We log via
1652            // `tracing::warn!` so it stands out next to ordinary
1653            // listener-online events.
1654            let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1655            tracing::warn!(
1656                target: "reddb::security",
1657                transport = "grpc",
1658                cert_sha256 = %fp,
1659                "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1660                 DO NOT use in production"
1661            );
1662            // Persist so that restarts reuse the same identity.
1663            let cert_path = dir.join("grpc-tls-cert.pem");
1664            let key_path = dir.join("grpc-tls-key.pem");
1665            if !cert_path.exists() || !key_path.exists() {
1666                let _ = std::fs::create_dir_all(&dir);
1667                std::fs::write(&cert_path, cert_pem_str.as_bytes())
1668                    .map_err(|e| format!("write grpc dev cert: {e}"))?;
1669                std::fs::write(&key_path, key_pem_str.as_bytes())
1670                    .map_err(|e| format!("write grpc dev key: {e}"))?;
1671                #[cfg(unix)]
1672                {
1673                    use std::os::unix::fs::PermissionsExt;
1674                    let _ =
1675                        std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1676                }
1677            }
1678            (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1679        }
1680    };
1681
1682    let client_ca_pem = match &config.grpc_tls_client_ca {
1683        Some(path) => Some(
1684            std::fs::read(path)
1685                .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1686        ),
1687        None => None,
1688    };
1689
1690    Ok(crate::GrpcTlsOptions {
1691        cert_pem,
1692        key_pem,
1693        client_ca_pem,
1694    })
1695}
1696
1697/// Spawn a TLS-terminated gRPC listener when `grpc_tls_bind_addr` is
1698/// configured. Logs and continues on TLS-config errors so the plain
1699/// listener stays up; this matches the wire-listener pattern.
1700fn spawn_grpc_tls_listener_if_configured(
1701    config: &ServerCommandConfig,
1702    runtime: RedDBRuntime,
1703    auth_store: Arc<AuthStore>,
1704) {
1705    let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1706        return;
1707    };
1708    let tls_opts = match resolve_grpc_tls_options(config) {
1709        Ok(opts) => opts,
1710        Err(err) => {
1711            tracing::error!(
1712                target: "reddb::security",
1713                transport = "grpc",
1714                err = %err,
1715                "gRPC TLS config error; TLS listener will not start"
1716            );
1717            return;
1718        }
1719    };
1720    tokio::spawn(async move {
1721        let server = RedDBGrpcServer::with_options(
1722            runtime,
1723            GrpcServerOptions {
1724                bind_addr: tls_bind.clone(),
1725                tls: Some(tls_opts),
1726            },
1727            auth_store,
1728        );
1729        tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1730        if let Err(err) = server.serve().await {
1731            tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1732        }
1733    });
1734}
1735
1736/// Hex-encoded SHA-256 of a PEM blob, used for cert-pin operator log
1737/// lines. Constant-time hash; no token contents leave this fn.
1738fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1739    use sha2::{Digest, Sha256};
1740    let mut h = Sha256::new();
1741    h.update(pem);
1742    let d = h.finalize();
1743    let mut buf = String::with_capacity(64);
1744    for b in d.iter() {
1745        buf.push_str(&format!("{b:02x}"));
1746    }
1747    buf
1748}
1749
1750/// Resolve TLS config: use provided cert/key or auto-generate.
1751fn resolve_wire_tls_config(
1752    config: &ServerCommandConfig,
1753) -> Result<crate::wire::WireTlsConfig, String> {
1754    match (&config.wire_tls_cert, &config.wire_tls_key) {
1755        (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1756            cert_path: cert.clone(),
1757            key_path: key.clone(),
1758        }),
1759        _ => {
1760            // Auto-generate self-signed cert
1761            let dir = config
1762                .path
1763                .as_ref()
1764                .and_then(|p| p.parent())
1765                .map(PathBuf::from)
1766                .unwrap_or_else(|| PathBuf::from("."));
1767            crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1768        }
1769    }
1770}
1771
1772#[inline(never)]
1773fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1774    let rt_config = detect_runtime_config();
1775    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1776    let cli_telemetry = config.telemetry.clone();
1777    let db_options = config.to_db_options()?;
1778    let mut transport_readiness = TransportReadiness::default();
1779
1780    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1781        .enable_all()
1782        .worker_threads(workers)
1783        .thread_stack_size(rt_config.stack_size)
1784        .build()
1785        .map_err(|err| format!("tokio runtime: {err}"))?;
1786
1787    // Guard lives on the outer thread's stack so it outlives the
1788    // tokio runtime — dropping it only after the listener returns
1789    // flushes the file log writer.
1790    let (runtime, _auth_store, _telemetry_guard) =
1791        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1792    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1793    let signal_runtime = runtime.clone();
1794    tokio_runtime.block_on(async move {
1795        spawn_lifecycle_signal_handler(signal_runtime).await;
1796        spawn_pg_listener(&config, &runtime);
1797        let wire_rt = Arc::new(runtime);
1798        let listener = tokio::net::TcpListener::bind(&wire_addr)
1799            .await
1800            .map_err(|err| {
1801                let reason = format!("wire listener bind {wire_addr}: {err}");
1802                transport_readiness.failed(
1803                    "wire",
1804                    &wire_addr,
1805                    config.wire_bind_explicit,
1806                    reason.clone(),
1807                );
1808                if config.wire_bind_explicit {
1809                    format!("explicit {reason}")
1810                } else {
1811                    reason
1812                }
1813            })?;
1814        transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1815        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1816            .await
1817            .map_err(|e| e.to_string())
1818    })
1819}
1820
1821#[inline(never)]
1822fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1823    let rt_config = detect_runtime_config();
1824    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1825    let cli_telemetry = config.telemetry.clone();
1826    let db_options = config.to_db_options()?;
1827
1828    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1829        .enable_all()
1830        .worker_threads(workers)
1831        .thread_stack_size(rt_config.stack_size)
1832        .build()
1833        .map_err(|err| format!("tokio runtime: {err}"))?;
1834
1835    let (runtime, _auth_store, _telemetry_guard) =
1836        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1837    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1838    let signal_runtime = runtime.clone();
1839    tokio_runtime.block_on(async move {
1840        spawn_lifecycle_signal_handler(signal_runtime).await;
1841        let cfg = crate::wire::PgWireConfig {
1842            bind_addr: pg_addr,
1843            ..Default::default()
1844        };
1845        crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1846            .await
1847            .map_err(|e| e.to_string())
1848    })
1849}
1850
1851#[inline(never)]
1852fn build_runtime_and_auth_store(
1853    db_options: RedDBOptions,
1854    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1855) -> Result<
1856    (
1857        RedDBRuntime,
1858        Arc<AuthStore>,
1859        Option<crate::telemetry::TelemetryGuard>,
1860    ),
1861    String,
1862> {
1863    // Return the TelemetryGuard so server runners can bind it for
1864    // their full lifetime. Dropping the guard tears down the
1865    // non-blocking log writer thread and, because that writer is
1866    // built with `.lossy(true)`, any subsequent log event routed to
1867    // the file sink is silently dropped — so callers MUST keep the
1868    // returned `Option<TelemetryGuard>` alive until shutdown.
1869    build_runtime_with_telemetry(db_options, cli_telemetry)
1870}
1871
1872/// Open the runtime, initialise structured logging from merged CLI +
1873/// `red_config` settings, and return a guard the caller must keep
1874/// alive for the server lifetime (drop flushes pending log writes).
1875///
1876/// Merge priority: CLI flag (explicit `Some`) beats `red.logging.*`
1877/// in red_config, beats the built-in default. A CLI-flag value of
1878/// `None` / empty means "inherit from config or default" — never
1879/// "disable". The one exception is `--no-log-file` which forces
1880/// `log_dir = None` regardless of config.
1881pub(crate) fn build_runtime_with_telemetry(
1882    db_options: RedDBOptions,
1883    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1884) -> Result<
1885    (
1886        RedDBRuntime,
1887        Arc<AuthStore>,
1888        Option<crate::telemetry::TelemetryGuard>,
1889    ),
1890    String,
1891> {
1892    let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1893        // Issue #205 — runtime construction failure is the canonical
1894        // StartupFailed phase. The audit sink isn't installed yet
1895        // (it would have been registered inside `with_options`), so
1896        // the emit falls through to tracing+eprintln only — operator
1897        // still sees it on stderr.
1898        let msg = err.to_string();
1899        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1900            phase: "runtime_construction".to_string(),
1901            error: msg.clone(),
1902        }
1903        .emit_global();
1904        msg
1905    })?;
1906
1907    // PLAN.md Phase 5 / W6 — opt into serverless writer-lease fencing
1908    // when `RED_LEASE_REQUIRED=true`. Failure here aborts boot: the
1909    // operator asked for a fence; running unfenced would silently
1910    // expose split-brain risk.
1911    crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1912        let msg = err.to_string();
1913        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1914            phase: "lease_loop".to_string(),
1915            error: msg.clone(),
1916        }
1917        .emit_global();
1918        msg
1919    })?;
1920
1921    // #213 — edge-triggered disk-space watchdog. Watches the data
1922    // directory; falls back to polling when fanotify is unavailable
1923    // (non-Linux or unprivileged container).
1924    if let Some(data_path) = db_options.data_path.as_deref() {
1925        let watch_dir = data_path.parent().unwrap_or(data_path);
1926        crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1927    }
1928
1929    // #214 — inotify config hot-reload watcher. Watches the config file
1930    // (REDDB_CONFIG_FILE or /etc/reddb/config.json) for changes and
1931    // applies hot-reloadable keys without restart.
1932    {
1933        let config_path = crate::runtime::config_overlay::config_file_path();
1934        let store = runtime.db().store();
1935        crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1936    }
1937
1938    // Phase 6 logging: merge red_config overrides onto the CLI-built
1939    // telemetry config, then install the global subscriber.
1940    let merged = merge_telemetry_with_config(
1941        cli_telemetry
1942            .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1943        &runtime,
1944    );
1945    let telemetry_guard = crate::telemetry::init(merged);
1946
1947    let no_auth = no_auth_active(&db_options);
1948    let auth_store =
1949        if db_options.auth.vault_enabled {
1950            let pager =
1951                runtime.db().store().pager().cloned().ok_or_else(|| {
1952                    "vault requires a paged database (persistent mode)".to_string()
1953                })?;
1954            let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1955                .map_err(|err| err.to_string())?;
1956            Arc::new(store)
1957        } else {
1958            Arc::new(AuthStore::new(db_options.auth.clone()))
1959        };
1960    auth_store.configure_control_events(
1961        runtime.control_event_ledger(),
1962        runtime.control_event_config(),
1963    );
1964    // Issue #663 — when `--no-auth` is active, deliberately skip the
1965    // preset machinery. Otherwise a stray `REDDB_USERNAME`+`REDDB_PASSWORD`
1966    // pair in the operator's environment would silently create an admin
1967    // user, defeating the whole point of opting into anonymous mode.
1968    if no_auth {
1969        eprintln!("{NO_AUTH_WARNING}");
1970        tracing::warn!("{NO_AUTH_WARNING}");
1971    } else {
1972        apply_preset(&runtime, &auth_store)?;
1973        maybe_apply_policy_break_glass(&auth_store);
1974    }
1975
1976    // Background session purge (every 5 minutes)
1977    {
1978        let store = Arc::clone(&auth_store);
1979        std::thread::Builder::new()
1980            .name("reddb-session-purge".into())
1981            .spawn(move || loop {
1982                std::thread::sleep(std::time::Duration::from_secs(300));
1983                store.purge_expired_sessions();
1984            })
1985            .ok();
1986    }
1987
1988    Ok((runtime, auth_store, telemetry_guard))
1989}
1990
1991/// Honour `REDDB_POLICY_BREAK_GLASS=1` at boot — see issue #713 and
1992/// the [`crate::auth::self_lock_guard`] module. Anything other than
1993/// `1`/`true`/`yes` (case-insensitive) is treated as not set.
1994fn maybe_apply_policy_break_glass(auth_store: &Arc<AuthStore>) {
1995    use crate::auth::self_lock_guard::BREAK_GLASS_ENV;
1996
1997    let enabled = std::env::var(BREAK_GLASS_ENV)
1998        .ok()
1999        .map(|v| {
2000            let trimmed = v.trim().to_ascii_lowercase();
2001            matches!(trimmed.as_str(), "1" | "true" | "yes")
2002        })
2003        .unwrap_or(false);
2004    if !enabled {
2005        return;
2006    }
2007    let now = crate::utils::now_unix_millis() as u128;
2008    match auth_store.apply_policy_break_glass(now) {
2009        Ok(()) => {
2010            tracing::warn!(env = BREAK_GLASS_ENV, "policy break-glass recovery applied");
2011        }
2012        Err(err) => {
2013            tracing::error!(env = BREAK_GLASS_ENV, %err, "policy break-glass recovery failed");
2014        }
2015    }
2016}
2017
/// Reserved config keys describing first-boot bootstrap state (issue #650).
/// Presence of [`BOOTSTRAP_COMPLETED_KEY`] is the idempotency hinge: when
/// it is set, [`apply_preset`] skips preset application on subsequent
/// boots (it only rehydrates the manifest registry and logs), so a
/// container restart with the same env is a no-op.
pub(crate) const BOOTSTRAP_COMPLETED_KEY: &str = "system.bootstrap.completed";
/// Config key recording which preset performed the bootstrap: the preset
/// name, or `manifest` when a bootstrap manifest was applied.
pub(crate) const BOOTSTRAP_PRESET_KEY: &str = "system.bootstrap.preset";
/// Config key recording the first admin's id. It is written only when the
/// bootstrap produced one (the `production` preset or a manifest).
pub(crate) const BOOTSTRAP_FIRST_ADMIN_KEY: &str = "system.bootstrap.first_admin_id";

/// Env var selecting the bootstrap preset. Default = `simple`.
pub(crate) const PRESET_ENV: &str = "REDDB_PRESET";
/// Default preset: creates no admin; only records bootstrap state.
pub(crate) const PRESET_SIMPLE: &str = "simple";
/// Preset that creates the first admin from `REDDB_USERNAME` /
/// `REDDB_PASSWORD` and attaches [`FIRST_ADMIN_ALLOW_ALL_POLICY`] to it.
pub(crate) const PRESET_PRODUCTION: &str = "production";
/// Preset that enables query-audit infrastructure, installs
/// [`REGULATED_PROTECT_MANAGED_POLICY`] and registers managed registry
/// entries for it and the protected config namespaces.
pub(crate) const PRESET_REGULATED: &str = "regulated";

/// Policy id installed by the `production` preset and attached to the
/// first admin. Grants `"*"` on `"*"` so the admin has policy-derived
/// broad authority (acceptance #3) — not an authorization bypass.
pub(crate) const FIRST_ADMIN_ALLOW_ALL_POLICY: &str = "system.bootstrap.first-admin-allow-all";
/// Deny policy installed by the `regulated` preset. It denies
/// `policy:put/drop/attach/detach` on itself and `config:write` under the
/// three protected config namespaces below.
pub(crate) const REGULATED_PROTECT_MANAGED_POLICY: &str = "system.regulated.protect-managed";
/// Config namespace whose keys (`config:<namespace>.*`) the regulated
/// guardrail policy protects from writes.
pub(crate) const REGULATED_AUDIT_CONFIG_NAMESPACE: &str = "red.config.audit";
/// Config namespace whose keys (`config:<namespace>.*`) the regulated
/// guardrail policy protects from writes.
pub(crate) const REGULATED_EVIDENCE_CONFIG_NAMESPACE: &str = "red.config.evidence";
/// Config namespace whose keys (`config:<namespace>.*`) the regulated
/// guardrail policy protects from writes.
pub(crate) const REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE: &str = "red.config.query_audit";
2040
2041/// Apply the bootstrap preset selected by `REDDB_PRESET` (default
2042/// `simple`). Idempotent — if `system.bootstrap.completed` is already
2043/// set, this is a one-line `tracing::info!` no-op and the server proceeds
2044/// (issue #650 acceptance #5).
2045///
2046/// Caller must have already short-circuited the `--no-auth` / `--dev`
2047/// path (issue #663): when that flag is set, the preset must be skipped
2048/// entirely — no admin created, no bootstrap state written.
2049pub(crate) fn apply_preset(
2050    runtime: &RedDBRuntime,
2051    auth_store: &Arc<AuthStore>,
2052) -> Result<(), String> {
2053    let store = runtime.db().store();
2054
2055    if store.get_config(BOOTSTRAP_COMPLETED_KEY).is_some() {
2056        crate::cli::bootstrap_manifest::rehydrate_manifest_registry(
2057            runtime,
2058            &runtime.config_registry(),
2059        )?;
2060        tracing::info!("bootstrap state present, skipping preset application");
2061        return Ok(());
2062    }
2063
2064    // `_FILE` companion expansion for k8s secret mounts. Errors here
2065    // (e.g. both `REDDB_PASSWORD` and `REDDB_PASSWORD_FILE` set) are
2066    // operator misconfigs and should fail the boot loudly.
2067    for var in ["REDDB_USERNAME", "REDDB_PASSWORD"] {
2068        crate::utils::expand_file_env(var).map_err(|err| format!("expand {var}_FILE: {err}"))?;
2069    }
2070
2071    let preset = std::env::var(PRESET_ENV)
2072        .ok()
2073        .map(|s| s.trim().to_string())
2074        .filter(|s| !s.is_empty())
2075        .unwrap_or_else(|| PRESET_SIMPLE.to_string());
2076
2077    if let Ok(path) = std::env::var(crate::cli::bootstrap_manifest::MANIFEST_ENV) {
2078        let path = path.trim();
2079        if !path.is_empty() {
2080            let first_admin_id = crate::cli::bootstrap_manifest::apply_manifest_file(
2081                runtime,
2082                auth_store,
2083                &runtime.config_registry(),
2084                std::path::Path::new(path),
2085            )?;
2086            persist_bootstrap_state(runtime, "manifest", Some(&first_admin_id));
2087            tracing::info!("bootstrap manifest applied");
2088            return Ok(());
2089        }
2090    }
2091
2092    let first_admin_id = match preset.as_str() {
2093        PRESET_SIMPLE => {
2094            // `simple` is the default low-friction preset. Auth knobs
2095            // remain whatever the CLI/env set; we only persist the
2096            // bootstrap state so subsequent boots are idempotent.
2097            None
2098        }
2099        PRESET_PRODUCTION => Some(apply_production_preset(auth_store)?),
2100        PRESET_REGULATED => {
2101            apply_regulated_preset(runtime, auth_store)?;
2102            None
2103        }
2104        other => {
2105            return Err(format!(
2106                "REDDB_PRESET={other:?} is not recognised (expected `simple`, `production`, or `regulated`)"
2107            ));
2108        }
2109    };
2110
2111    persist_bootstrap_state(runtime, &preset, first_admin_id.as_deref());
2112    tracing::info!(preset = %preset, "bootstrap preset applied");
2113    Ok(())
2114}
2115
2116fn apply_production_preset(auth_store: &Arc<AuthStore>) -> Result<String, String> {
2117    use crate::auth::store::PrincipalRef;
2118    use crate::auth::{policies::Policy, UserId};
2119
2120    let username = std::env::var("REDDB_USERNAME")
2121        .ok()
2122        .filter(|s| !s.is_empty())
2123        .ok_or_else(|| {
2124            "REDDB_PRESET=production requires REDDB_USERNAME (or REDDB_USERNAME_FILE)".to_string()
2125        })?;
2126    let password = std::env::var("REDDB_PASSWORD")
2127        .ok()
2128        .filter(|s| !s.is_empty())
2129        .ok_or_else(|| {
2130            "REDDB_PRESET=production requires REDDB_PASSWORD (or REDDB_PASSWORD_FILE)".to_string()
2131        })?;
2132
2133    // (1) Create the first admin as system-owned + platform-scoped so
2134    // #649's `ManagedConfigGate` accepts them on managed-config writes.
2135    let result = auth_store
2136        .bootstrap_system_admin(&username, &password)
2137        .map_err(|err| format!("bootstrap first admin: {err}"))?;
2138    let first_admin = UserId::platform(result.user.username.clone());
2139
2140    // (2) Install the allow-all policy as an ordinary policy row.
2141    let policy = Policy::from_json_str(&format!(
2142        r#"{{
2143            "id": "{id}",
2144            "version": 1,
2145            "statements": [{{
2146                "effect": "allow",
2147                "actions": ["*"],
2148                "resources": ["*"]
2149            }}]
2150        }}"#,
2151        id = FIRST_ADMIN_ALLOW_ALL_POLICY
2152    ))
2153    .map_err(|err| format!("compile allow-all policy: {err}"))?;
2154    auth_store
2155        .put_policy(policy)
2156        .map_err(|err| format!("install allow-all policy: {err}"))?;
2157
2158    // (3) Attach the policy to the first admin.
2159    auth_store
2160        .attach_policy(
2161            PrincipalRef::User(first_admin.clone()),
2162            FIRST_ADMIN_ALLOW_ALL_POLICY,
2163        )
2164        .map_err(|err| format!("attach allow-all policy: {err}"))?;
2165
2166    Ok(first_admin.to_string())
2167}
2168
2169fn apply_regulated_preset(
2170    runtime: &RedDBRuntime,
2171    auth_store: &Arc<AuthStore>,
2172) -> Result<(), String> {
2173    use crate::auth::policies::Policy;
2174    use crate::auth::registry::EvidenceRequirement;
2175
2176    runtime.query_audit().enable_infrastructure();
2177
2178    let policy = Policy::from_json_str(&format!(
2179        r#"{{
2180            "id": "{id}",
2181            "version": 1,
2182            "statements": [
2183                {{
2184                    "effect": "deny",
2185                    "actions": ["policy:put", "policy:drop", "policy:attach", "policy:detach"],
2186                    "resources": ["policy:{id}"]
2187                }},
2188                {{
2189                    "effect": "deny",
2190                    "actions": ["config:write"],
2191                    "resources": [
2192                        "config:{audit}.*",
2193                        "config:{evidence}.*",
2194                        "config:{query_audit}.*"
2195                    ]
2196                }}
2197            ]
2198        }}"#,
2199        id = REGULATED_PROTECT_MANAGED_POLICY,
2200        audit = REGULATED_AUDIT_CONFIG_NAMESPACE,
2201        evidence = REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2202        query_audit = REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2203    ))
2204    .map_err(|err| format!("compile regulated guardrail policy: {err}"))?;
2205    auth_store
2206        .put_policy(policy)
2207        .map_err(|err| format!("install regulated guardrail policy: {err}"))?;
2208
2209    let now_ms = crate::utils::now_unix_millis() as u128;
2210    let entries = vec![
2211        regulated_registry_entry(
2212            REGULATED_PROTECT_MANAGED_POLICY,
2213            crate::auth::managed_policy::RESOURCE_TYPE_POLICY,
2214            "iam_policy",
2215            "policy:*",
2216            &format!("policy:{REGULATED_PROTECT_MANAGED_POLICY}"),
2217            EvidenceRequirement::Metadata,
2218            now_ms,
2219        ),
2220        regulated_registry_entry(
2221            REGULATED_AUDIT_CONFIG_NAMESPACE,
2222            crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2223            "config_namespace",
2224            "config:write",
2225            &format!("config:{REGULATED_AUDIT_CONFIG_NAMESPACE}.*"),
2226            EvidenceRequirement::Metadata,
2227            now_ms,
2228        ),
2229        regulated_registry_entry(
2230            REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2231            crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2232            "config_namespace",
2233            "config:write",
2234            &format!("config:{REGULATED_EVIDENCE_CONFIG_NAMESPACE}.*"),
2235            EvidenceRequirement::Metadata,
2236            now_ms,
2237        ),
2238        regulated_registry_entry(
2239            REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2240            crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2241            "config_namespace",
2242            "config:write",
2243            &format!("config:{REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE}.*"),
2244            EvidenceRequirement::Metadata,
2245            now_ms,
2246        ),
2247    ];
2248
2249    for entry in entries.iter().cloned() {
2250        runtime
2251            .config_registry()
2252            .restore_bootstrap_entry(entry)
2253            .map_err(|err| format!("install regulated registry entry: {err}"))?;
2254    }
2255    crate::cli::bootstrap_manifest::persist_registry_state(runtime, &entries)?;
2256    Ok(())
2257}
2258
2259fn regulated_registry_entry(
2260    id: &str,
2261    resource_type: &str,
2262    schema: &str,
2263    required_action: &str,
2264    required_resource: &str,
2265    evidence_requirement: crate::auth::registry::EvidenceRequirement,
2266    updated_at_ms: u128,
2267) -> crate::auth::registry::ConfigRegistryEntry {
2268    crate::auth::registry::ConfigRegistryEntry {
2269        id: id.to_string(),
2270        version: 1,
2271        resource_type: resource_type.to_string(),
2272        schema: schema.to_string(),
2273        mutability: crate::auth::registry::Mutability::Immutable,
2274        sensitivity: crate::auth::registry::Sensitivity::Internal,
2275        managed: true,
2276        required_action: required_action.to_string(),
2277        required_resource: required_resource.to_string(),
2278        evidence_requirement,
2279        updated_by: "system:regulated-preset".to_string(),
2280        updated_at_ms,
2281    }
2282}
2283
2284fn persist_bootstrap_state(runtime: &RedDBRuntime, preset: &str, first_admin_id: Option<&str>) {
2285    let store = runtime.db().store();
2286    let mut tree = crate::serde_json::Map::new();
2287    tree.insert(
2288        BOOTSTRAP_COMPLETED_KEY.to_string(),
2289        crate::serde_json::Value::Bool(true),
2290    );
2291    tree.insert(
2292        BOOTSTRAP_PRESET_KEY.to_string(),
2293        crate::serde_json::Value::String(preset.to_string()),
2294    );
2295    if let Some(id) = first_admin_id {
2296        tree.insert(
2297            BOOTSTRAP_FIRST_ADMIN_KEY.to_string(),
2298            crate::serde_json::Value::String(id.to_string()),
2299        );
2300    }
2301    let json = crate::serde_json::Value::Object(tree);
2302    store.set_config_tree("", &json);
2303}
2304
2305/// Read `red.logging.*` keys from the persistent config store and
2306/// merge them into the CLI-built `TelemetryConfig`. Merge priority:
2307/// explicit CLI flag > red_config > built-in default.
2308///
2309/// The "was a flag passed" signal comes from the `*_explicit` bools
2310/// on `TelemetryConfig`, populated by the CLI parser. This replaces
2311/// an earlier equality-to-default heuristic that silently dropped
2312/// config whenever the CLI-derived default diverged from
2313/// `TelemetryConfig::default()` (e.g. path-derived `log_dir`,
2314/// non-TTY `format`) and that silently overrode `--no-log-file`.
2315fn merge_telemetry_with_config(
2316    mut cli: crate::telemetry::TelemetryConfig,
2317    runtime: &RedDBRuntime,
2318) -> crate::telemetry::TelemetryConfig {
2319    use crate::storage::schema::Value;
2320
2321    let store = runtime.db().store();
2322
2323    if !cli.level_explicit {
2324        if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
2325            cli.level_filter = v.to_string();
2326        }
2327    }
2328    if !cli.format_explicit {
2329        if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
2330            if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
2331                cli.format = parsed;
2332            }
2333        }
2334    }
2335    if !cli.rotation_keep_days_explicit {
2336        match store.get_config("red.logging.keep_days") {
2337            Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
2338                cli.rotation_keep_days = n as u16
2339            }
2340            Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
2341                cli.rotation_keep_days = n as u16
2342            }
2343            Some(Value::Text(v)) => {
2344                if let Ok(n) = v.parse::<u16>() {
2345                    cli.rotation_keep_days = n;
2346                }
2347            }
2348            _ => {}
2349        }
2350    }
2351    if !cli.file_prefix_explicit {
2352        if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
2353            if !v.is_empty() {
2354                cli.file_prefix = v.to_string();
2355            }
2356        }
2357    }
2358    // --no-log-file is a kill-switch: config cannot resurrect the
2359    // file sink. Explicit --log-dir also wins.
2360    if !cli.log_dir_explicit && !cli.log_file_disabled {
2361        if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
2362            if !v.is_empty() {
2363                cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
2364            }
2365        }
2366    }
2367
2368    cli
2369}
2370
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};

    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
    }

    /// Store `value` as a string under `key` in the runtime's config tree.
    fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
        runtime
            .db()
            .store()
            .set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
    }

    fn cli_base() -> TelemetryConfig {
        // Emulate default_telemetry_for_path(Some(path)) on a non-TTY host:
        // log_dir = Some(...), format = Json. Nothing marked explicit.
        TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/var/log/reddb"))
        );
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let mut cli = cli_base();
        cli.log_dir = Some(std::path::PathBuf::from("/custom/dir"));
        cli.log_dir_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/custom/dir"))
        );
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let mut cli = cli_base();
        cli.log_dir = None;
        cli.log_file_disabled = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert!(
            merged.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        // On non-TTY, default_telemetry_for_path yields format=Json even
        // though TelemetryConfig::default() is Pretty. The old equality
        // check silently dropped config here.
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.format", "pretty");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.format", "pretty");
        let mut cli = cli_base();
        cli.format = LogFormat::Json;
        cli.format_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.format, LogFormat::Json);
    }

    #[test]
    fn config_level_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.level", "debug");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.level_filter, "debug");
    }

    #[test]
    fn explicit_level_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.level", "debug");
        let mut cli = cli_base();
        cli.level_filter = "warn".to_string();
        cli.level_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.level_filter, "warn");
    }

    #[test]
    fn config_keep_days_text_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.keep_days", "13");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.rotation_keep_days, 13);
    }

    #[test]
    fn explicit_keep_days_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.keep_days", "13");
        let mut cli = cli_base();
        cli.rotation_keep_days = 5;
        cli.rotation_keep_days_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.rotation_keep_days, 5);
    }

    #[test]
    fn config_file_prefix_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.file_prefix", "reddb-custom");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.file_prefix, "reddb-custom");
    }
}
2458
2459#[inline(never)]
2460fn build_http_server(
2461    runtime: RedDBRuntime,
2462    auth_store: Arc<AuthStore>,
2463    bind_addr: String,
2464) -> RedDBServer {
2465    build_http_server_with_transport_readiness(
2466        runtime,
2467        auth_store,
2468        bind_addr,
2469        TransportReadiness::default(),
2470    )
2471}
2472
2473/// Apply the resolved HTTP limits to a freshly-built `RedDBServer`.
2474///
2475/// Centralised here so every `run_*` path goes through the same
2476/// resolver and the structured startup log line carries the same
2477/// `http_limits.*` fields regardless of transport combination.
2478fn apply_http_limits(
2479    server: RedDBServer,
2480    config: &ServerCommandConfig,
2481    runtime: &RedDBRuntime,
2482) -> RedDBServer {
2483    let store = runtime.db().store();
2484    let resolved =
2485        crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2486            .get_config(key)
2487        {
2488            Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2489            Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2490            Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2491            _ => None,
2492        });
2493    tracing::info!(
2494        target: "reddb::http_limits",
2495        max_handlers = resolved.max_handlers,
2496        handler_timeout_ms = resolved.handler_timeout_ms,
2497        retry_after_secs = resolved.retry_after_secs,
2498        max_inflight_per_principal = resolved.max_inflight_per_principal,
2499        "http_limits resolved"
2500    );
2501    server.with_http_limits(resolved)
2502}
2503
2504#[inline(never)]
2505fn build_http_server_with_transport_readiness(
2506    runtime: RedDBRuntime,
2507    auth_store: Arc<AuthStore>,
2508    bind_addr: String,
2509    transport_readiness: TransportReadiness,
2510) -> RedDBServer {
2511    RedDBServer::with_options(
2512        runtime,
2513        ServerOptions {
2514            bind_addr,
2515            transport_readiness,
2516            ..ServerOptions::default()
2517        },
2518    )
2519    .with_auth(auth_store)
2520}
2521
2522/// PLAN.md Phase 6.2 — build a listener that only serves
2523/// `/admin/*` + `/metrics` + `/health/*`. Defaults to `127.0.0.1`
2524/// when the env var has no host (loopback-only by default per spec).
2525#[inline(never)]
2526fn build_admin_only_server(
2527    runtime: RedDBRuntime,
2528    auth_store: Arc<AuthStore>,
2529    bind_addr: String,
2530) -> RedDBServer {
2531    RedDBServer::with_options(
2532        runtime,
2533        ServerOptions {
2534            bind_addr,
2535            surface: crate::server::ServerSurface::AdminOnly,
2536            ..ServerOptions::default()
2537        },
2538    )
2539    .with_auth(auth_store)
2540}
2541
2542/// PLAN.md Phase 6.2 — build a listener that only serves `/metrics`
2543/// + `/health/*`. Suitable for Prometheus scrape ports that may be
2544///   exposed wider than the admin port.
2545#[inline(never)]
2546fn build_metrics_only_server(
2547    runtime: RedDBRuntime,
2548    auth_store: Arc<AuthStore>,
2549    bind_addr: String,
2550) -> RedDBServer {
2551    RedDBServer::with_options(
2552        runtime,
2553        ServerOptions {
2554            bind_addr,
2555            surface: crate::server::ServerSurface::MetricsOnly,
2556            ..ServerOptions::default()
2557        },
2558    )
2559    .with_auth(auth_store)
2560}
2561
2562/// Spawn dedicated admin / metrics listeners when the operator set
2563/// `RED_ADMIN_BIND` / `RED_METRICS_BIND`. Both are optional; when
2564/// unset the existing listener keeps serving everything (back-compat).
2565fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2566    if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2567        let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2568        let _ = server.serve_in_background();
2569        tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2570    }
2571    if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2572        let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2573        let _ = server.serve_in_background();
2574        tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2575    }
2576}
2577
2578#[inline(never)]
2579fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2580    let cli_telemetry = config.telemetry.clone();
2581    let mut transport_readiness = TransportReadiness::default();
2582    let Some(listener) = bind_listener_for_startup(
2583        &mut transport_readiness,
2584        "http",
2585        &bind_addr,
2586        config.http_bind_explicit,
2587    )?
2588    else {
2589        return Err(format!(
2590            "no HTTP listener started; implicit bind {} failed",
2591            bind_addr
2592        ));
2593    };
2594    let db_options = config.to_db_options()?;
2595    let (runtime, auth_store, _telemetry_guard) =
2596        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2597    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2598    spawn_admin_metrics_listeners(&runtime, &auth_store);
2599    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2600    let server = build_http_server_with_transport_readiness(
2601        runtime.clone(),
2602        auth_store,
2603        bind_addr.clone(),
2604        transport_readiness,
2605    );
2606    let server = apply_http_limits(server, &config, &runtime);
2607    tracing::info!(transport = "http", bind = %bind_addr, "listener online");
2608    server.serve_on(listener).map_err(|err| err.to_string())
2609}
2610
2611/// PLAN.md HTTP TLS — when `http_tls_bind_addr` is set, spawn a
2612/// rustls-terminated listener alongside the plain HTTP server. Cert
2613/// + key paths come from CLI flags or `REDDB_HTTP_TLS_*` env vars; if
2614///   both are absent and `RED_HTTP_TLS_DEV=1` is set, a self-signed cert
2615///   is auto-generated next to the data directory (refused otherwise).
2616fn spawn_http_tls_listener(
2617    config: &ServerCommandConfig,
2618    runtime: &RedDBRuntime,
2619    auth_store: &Arc<AuthStore>,
2620) -> Result<(), String> {
2621    let Some(addr) = config.http_tls_bind_addr.clone() else {
2622        return Ok(());
2623    };
2624
2625    let tls_config = resolve_http_tls_config(config)?;
2626    let server_config = crate::server::tls::build_server_config(&tls_config)
2627        .map_err(|err| format!("HTTP TLS: {err}"))?;
2628
2629    let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
2630    let server = apply_http_limits(server, config, runtime);
2631    let _handle = server.serve_tls_in_background(server_config);
2632    tracing::info!(
2633        transport = "https",
2634        bind = %addr,
2635        mtls = %tls_config.client_ca_path.is_some(),
2636        "TLS listener online"
2637    );
2638    Ok(())
2639}
2640
2641/// Resolve the HTTP TLS config from CLI / env / dev defaults.
2642fn resolve_http_tls_config(
2643    config: &ServerCommandConfig,
2644) -> Result<crate::server::tls::HttpTlsConfig, String> {
2645    match (&config.http_tls_cert, &config.http_tls_key) {
2646        (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
2647            cert_path: cert.clone(),
2648            key_path: key.clone(),
2649            client_ca_path: config.http_tls_client_ca.clone(),
2650        }),
2651        (None, None) => {
2652            // Dev-mode auto-generate next to the data directory.
2653            let dir = config
2654                .path
2655                .as_ref()
2656                .and_then(|p| p.parent().map(std::path::PathBuf::from))
2657                .unwrap_or_else(|| std::path::PathBuf::from("."));
2658            let auto = crate::server::tls::auto_generate_dev_cert(&dir)
2659                .map_err(|err| format!("HTTP TLS dev: {err}"))?;
2660            Ok(crate::server::tls::HttpTlsConfig {
2661                cert_path: auto.cert_path,
2662                key_path: auto.key_path,
2663                client_ca_path: config.http_tls_client_ca.clone(),
2664            })
2665        }
2666        _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
2667    }
2668}
2669
2670#[inline(never)]
2671fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2672    let workers = config.workers;
2673    let cli_telemetry = config.telemetry.clone();
2674    let db_options = config.to_db_options()?;
2675    let rt_config = detect_runtime_config();
2676    let mut transport_readiness = TransportReadiness::default();
2677    let Some(grpc_listener) = bind_listener_for_startup(
2678        &mut transport_readiness,
2679        "grpc",
2680        &bind_addr,
2681        config.grpc_bind_explicit,
2682    )?
2683    else {
2684        return Err(format!(
2685            "no gRPC listener started; implicit bind {} failed",
2686            bind_addr
2687        ));
2688    };
2689
2690    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2691
2692    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2693        .enable_all()
2694        .worker_threads(worker_threads)
2695        .thread_stack_size(rt_config.stack_size)
2696        .build()
2697        .map_err(|err| format!("tokio runtime: {err}"))?;
2698
2699    // Guard lives on the outer stack so it outlives the tokio runtime.
2700    let (runtime, auth_store, _telemetry_guard) =
2701        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2702    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2703    let signal_runtime = runtime.clone();
2704    tokio_runtime.block_on(async move {
2705        spawn_lifecycle_signal_handler(signal_runtime).await;
2706        // Start wire protocol listeners (plaintext + TLS)
2707        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2708
2709        // Start PostgreSQL wire listener when --pg-bind is configured.
2710        spawn_pg_listener(&config, &runtime);
2711
2712        // Optional TLS gRPC listener. When `grpc_tls_bind_addr` is set
2713        // it spawns a separate listener so plaintext + TLS can run
2714        // side-by-side (50051 plain + 50052 TLS, etc.).
2715        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2716
2717        let server = RedDBGrpcServer::with_options(
2718            runtime,
2719            GrpcServerOptions {
2720                bind_addr: bind_addr.clone(),
2721                tls: None,
2722            },
2723            auth_store,
2724        );
2725
2726        tracing::info!(
2727            transport = "grpc",
2728            bind = %bind_addr,
2729            cpus = rt_config.available_cpus,
2730            workers = worker_threads,
2731            "listener online"
2732        );
2733        server
2734            .serve_on(grpc_listener)
2735            .await
2736            .map_err(|err| err.to_string())
2737    })
2738}
2739
2740#[inline(never)]
2741fn run_dual_server(
2742    config: ServerCommandConfig,
2743    grpc_bind_addr: String,
2744    http_bind_addr: String,
2745) -> Result<(), String> {
2746    let workers = config.workers;
2747    let cli_telemetry = config.telemetry.clone();
2748    let db_options = config.to_db_options()?;
2749    let rt_config = detect_runtime_config();
2750    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2751    let mut transport_readiness = TransportReadiness::default();
2752    let http_listener = bind_listener_for_startup(
2753        &mut transport_readiness,
2754        "http",
2755        &http_bind_addr,
2756        config.http_bind_explicit,
2757    )?;
2758    let grpc_listener = bind_listener_for_startup(
2759        &mut transport_readiness,
2760        "grpc",
2761        &grpc_bind_addr,
2762        config.grpc_bind_explicit,
2763    )?;
2764    if http_listener.is_none() && grpc_listener.is_none() {
2765        return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2766    }
2767    let (runtime, auth_store, _telemetry_guard) =
2768        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2769    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2770
2771    spawn_admin_metrics_listeners(&runtime, &auth_store);
2772    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2773
2774    let http_handle = if let Some(listener) = http_listener {
2775        let http_server = build_http_server_with_transport_readiness(
2776            runtime.clone(),
2777            auth_store.clone(),
2778            http_bind_addr.clone(),
2779            transport_readiness.clone(),
2780        );
2781        let http_server = apply_http_limits(http_server, &config, &runtime);
2782        Some(http_server.serve_in_background_on(listener))
2783    } else {
2784        None
2785    };
2786
2787    thread::sleep(Duration::from_millis(150));
2788    if let Some(handle) = http_handle.as_ref() {
2789        if handle.is_finished() {
2790            let handle = http_handle.unwrap();
2791            return match handle.join() {
2792                Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2793                Ok(Err(err)) => Err(err.to_string()),
2794                Err(_) => Err("HTTP server thread panicked".to_string()),
2795            };
2796        }
2797    }
2798    if grpc_listener.is_none() {
2799        let Some(handle) = http_handle else {
2800            return Err("no listener started".to_string());
2801        };
2802        return match handle.join() {
2803            Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2804            Ok(Err(err)) => Err(err.to_string()),
2805            Err(_) => Err("HTTP server thread panicked".to_string()),
2806        };
2807    }
2808    let grpc_listener = grpc_listener.expect("checked above");
2809
2810    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2811        .enable_all()
2812        .worker_threads(worker_threads)
2813        .thread_stack_size(rt_config.stack_size)
2814        .build()
2815        .map_err(|err| format!("tokio runtime: {err}"))?;
2816
2817    let signal_runtime = runtime.clone();
2818    tokio_runtime.block_on(async move {
2819        spawn_lifecycle_signal_handler(signal_runtime).await;
2820        // Start wire protocol listeners (plaintext + TLS)
2821        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2822
2823        // Start PostgreSQL wire listener when --pg-bind is configured.
2824        spawn_pg_listener(&config, &runtime);
2825
2826        // Optional TLS gRPC listener — runs alongside the plaintext one.
2827        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2828
2829        let server = RedDBGrpcServer::with_options(
2830            runtime,
2831            GrpcServerOptions {
2832                bind_addr: grpc_bind_addr.clone(),
2833                tls: None,
2834            },
2835            auth_store,
2836        );
2837
2838        tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2839        tracing::info!(
2840            transport = "grpc",
2841            bind = %grpc_bind_addr,
2842            cpus = rt_config.available_cpus,
2843            workers = worker_threads,
2844            "listener online"
2845        );
2846        server
2847            .serve_on(grpc_listener)
2848            .await
2849            .map_err(|err| err.to_string())
2850    })
2851}
2852
2853#[cfg(test)]
2854mod tests {
2855    use super::*;
2856
2857    #[test]
2858    fn render_systemd_unit_contains_expected_execstart() {
2859        let config = SystemdServiceConfig {
2860            service_name: "reddb".to_string(),
2861            binary_path: PathBuf::from("/usr/local/bin/red"),
2862            run_user: "reddb".to_string(),
2863            run_group: "reddb".to_string(),
2864            data_path: reddb_file::default_service_database_path(),
2865            router_bind_addr: None,
2866            grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2867            http_bind_addr: None,
2868        };
2869
2870        let unit = render_systemd_unit(&config);
2871        assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
2872        assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
2873    }
2874
2875    #[test]
2876    fn systemd_service_config_derives_paths() {
2877        let config = SystemdServiceConfig {
2878            service_name: "reddb-api".to_string(),
2879            binary_path: PathBuf::from("/usr/local/bin/red"),
2880            run_user: "reddb".to_string(),
2881            run_group: "reddb".to_string(),
2882            data_path: PathBuf::from("/srv/reddb/live/data.rdb"),
2883            router_bind_addr: None,
2884            grpc_bind_addr: None,
2885            http_bind_addr: Some("127.0.0.1:5055".to_string()),
2886        };
2887
2888        assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
2889        assert_eq!(
2890            config.unit_path(),
2891            PathBuf::from("/etc/systemd/system/reddb-api.service")
2892        );
2893    }
2894
2895    #[test]
2896    fn render_systemd_unit_supports_dual_transport() {
2897        let config = SystemdServiceConfig {
2898            service_name: "reddb".to_string(),
2899            binary_path: PathBuf::from("/usr/local/bin/red"),
2900            run_user: "reddb".to_string(),
2901            run_group: "reddb".to_string(),
2902            data_path: reddb_file::default_service_database_path(),
2903            router_bind_addr: None,
2904            grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2905            http_bind_addr: Some("0.0.0.0:5055".to_string()),
2906        };
2907
2908        let unit = render_systemd_unit(&config);
2909        assert!(unit.contains("--grpc-bind 0.0.0.0:5555"));
2910        assert!(unit.contains("--http-bind 0.0.0.0:5055"));
2911    }
2912
2913    #[test]
2914    fn render_systemd_unit_supports_router_mode() {
2915        let config = SystemdServiceConfig {
2916            service_name: "reddb".to_string(),
2917            binary_path: PathBuf::from("/usr/local/bin/red"),
2918            run_user: "reddb".to_string(),
2919            run_group: "reddb".to_string(),
2920            data_path: reddb_file::default_service_database_path(),
2921            router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
2922            grpc_bind_addr: None,
2923            http_bind_addr: None,
2924        };
2925
2926        let unit = render_systemd_unit(&config);
2927        assert!(unit.contains("--bind 127.0.0.1:5050"));
2928        assert!(!unit.contains("--grpc-bind"));
2929        assert!(!unit.contains("--http-bind"));
2930    }
2931
2932    #[test]
2933    fn explicit_bind_collision_is_fatal() {
2934        let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
2935        let addr = held.local_addr().expect("held addr").to_string();
2936        let mut readiness = TransportReadiness::default();
2937
2938        let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();
2939
2940        assert!(error.contains("explicit http listener bind"));
2941        assert_eq!(readiness.active.len(), 0);
2942        assert_eq!(readiness.failed.len(), 1);
2943        assert!(readiness.failed[0].explicit);
2944        assert_eq!(readiness.failed[0].bind_addr, addr);
2945    }
2946
2947    // ---------- Issue #663 — `--no-auth` / `--dev` ----------
2948
2949    // Env access in tests is process-global; serialise the two
2950    // `--no-auth` tests so the REDDB_USERNAME / REDDB_PASSWORD pair
2951    // one of them sets cannot leak into the other under cargo's
2952    // default parallel runner.
2953    fn no_auth_env_lock() -> &'static std::sync::Mutex<()> {
2954        static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
2955        LOCK.get_or_init(|| std::sync::Mutex::new(()))
2956    }
2957
2958    fn no_auth_test_config(no_auth: bool) -> ServerCommandConfig {
2959        ServerCommandConfig {
2960            path: None,
2961            router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
2962            router_bind_explicit: false,
2963            grpc_bind_addr: None,
2964            grpc_bind_explicit: false,
2965            grpc_tls_bind_addr: None,
2966            grpc_tls_cert: None,
2967            grpc_tls_key: None,
2968            grpc_tls_client_ca: None,
2969            http_bind_addr: None,
2970            http_bind_explicit: false,
2971            http_tls_bind_addr: None,
2972            http_tls_cert: None,
2973            http_tls_key: None,
2974            http_tls_client_ca: None,
2975            wire_bind_addr: None,
2976            wire_bind_explicit: false,
2977            wire_tls_bind_addr: None,
2978            wire_tls_cert: None,
2979            wire_tls_key: None,
2980            pg_bind_addr: None,
2981            create_if_missing: true,
2982            read_only: false,
2983            role: "standalone".to_string(),
2984            primary_addr: None,
2985            storage_profile: StorageProfileSelection::embedded_single_file(),
2986            // Operator-set `--vault`: `--no-auth` must override this
2987            // alongside REDDB_USERNAME/PASSWORD.
2988            vault: true,
2989            no_auth,
2990            workers: None,
2991            telemetry: None,
2992            http_limits_cli: crate::server::HttpLimitsCliInput::default(),
2993        }
2994    }
2995
2996    #[test]
2997    fn no_auth_flag_disables_every_auth_knob_and_stamps_metadata() {
2998        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
2999        // Pre-existing env that *would* turn auth on if `--no-auth`
3000        // weren't the last word. The acceptance criterion is that
3001        // the flag wins over env.
3002        // SAFETY: serialised by `no_auth_env_lock` above.
3003        unsafe {
3004            std::env::set_var("REDDB_USERNAME", "admin");
3005            std::env::set_var("REDDB_PASSWORD", "hunter2");
3006        }
3007        let config = no_auth_test_config(true);
3008        let options = config.to_db_options().expect("to_db_options");
3009
3010        assert!(no_auth_active(&options), "metadata should be stamped");
3011        assert!(!options.auth.enabled, "auth.enabled must be forced off");
3012        assert!(
3013            !options.auth.require_auth,
3014            "require_auth must be forced off"
3015        );
3016        assert!(
3017            !options.auth.vault_enabled,
3018            "vault_enabled must be forced off (overrides --vault)"
3019        );
3020        assert_eq!(
3021            options.metadata.get(NO_AUTH_META).map(String::as_str),
3022            Some("true"),
3023        );
3024
3025        // SAFETY: serialised by `no_auth_env_lock` above.
3026        unsafe {
3027            std::env::remove_var("REDDB_USERNAME");
3028            std::env::remove_var("REDDB_PASSWORD");
3029        }
3030    }
3031
3032    #[test]
3033    fn default_behaviour_without_no_auth_flag_is_unchanged() {
3034        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3035        let config = no_auth_test_config(false);
3036        let options = config.to_db_options().expect("to_db_options");
3037
3038        assert!(
3039            !no_auth_active(&options),
3040            "default boot must not be marked no-auth"
3041        );
3042        assert!(
3043            options.metadata.get(NO_AUTH_META).is_none(),
3044            "metadata key must be absent when flag is off"
3045        );
3046        // `--vault` should still take effect when `--no-auth` is not set.
3047        assert!(options.auth.vault_enabled);
3048    }
3049
3050    #[test]
3051    fn no_auth_active_blocks_bootstrap_from_env() {
3052        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3053        // SAFETY: serialised by `no_auth_env_lock` above. The pair
3054        // would normally cause `AuthStore::bootstrap_from_env` to
3055        // create an admin; the boot pipeline must suppress that call
3056        // whenever `no_auth_active` is true.
3057        unsafe {
3058            std::env::set_var("REDDB_USERNAME", "admin");
3059            std::env::set_var("REDDB_PASSWORD", "hunter2");
3060        }
3061
3062        let options = no_auth_test_config(true)
3063            .to_db_options()
3064            .expect("to_db_options");
3065
3066        // Mirror the exact branch in `build_runtime_with_telemetry`:
3067        // build a non-vault AuthStore from `options.auth`, then call
3068        // `bootstrap_from_env` *only* when the no-auth gate is off.
3069        let auth_store = AuthStore::new(options.auth.clone());
3070        if !no_auth_active(&options) {
3071            auth_store.bootstrap_from_env();
3072        }
3073
3074        assert!(
3075            auth_store.needs_bootstrap(),
3076            "no admin user must be bootstrapped under --no-auth even with REDDB_USERNAME/PASSWORD set"
3077        );
3078
3079        // SAFETY: serialised by `no_auth_env_lock` above.
3080        unsafe {
3081            std::env::remove_var("REDDB_USERNAME");
3082            std::env::remove_var("REDDB_PASSWORD");
3083        }
3084    }
3085
3086    // ---------- Issue #650 — bootstrap presets ----------
3087
3088    // Preset tests mutate process-global env (`REDDB_PRESET`,
3089    // `REDDB_USERNAME`, `REDDB_PASSWORD`) and the global tracing
3090    // subscriber. Share the no_auth lock so they don't race with each
3091    // other or with the --no-auth tests above.
3092    fn clear_preset_env() {
3093        // SAFETY: callers hold `no_auth_env_lock()`.
3094        unsafe {
3095            std::env::remove_var(PRESET_ENV);
3096            std::env::remove_var("REDDB_BOOTSTRAP_MANIFEST");
3097            std::env::remove_var("REDDB_USERNAME");
3098            std::env::remove_var("REDDB_PASSWORD");
3099            std::env::remove_var("REDDB_USERNAME_FILE");
3100            std::env::remove_var("REDDB_PASSWORD_FILE");
3101        }
3102    }
3103
3104    fn clear_backup_env() {
3105        // SAFETY: callers hold `no_auth_env_lock()`.
3106        unsafe {
3107            std::env::remove_var("REDDB_BACKUP_S3_ENDPOINT");
3108            std::env::remove_var("REDDB_BACKUP_S3_BUCKET");
3109            std::env::remove_var("REDDB_BACKUP_S3_PREFIX");
3110            std::env::remove_var("REDDB_BACKUP_S3_ACCESS_KEY_ID");
3111            std::env::remove_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY");
3112            std::env::remove_var("REDDB_BACKUP_S3_REGION");
3113            std::env::remove_var("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS");
3114            std::env::remove_var("REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS");
3115            std::env::remove_var("REDDB_BACKUP_PAUSE_ON_LAG_SECS");
3116        }
3117    }
3118
3119    fn fresh_runtime_and_store() -> (RedDBRuntime, Arc<AuthStore>) {
3120        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
3121        let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3122        (runtime, auth_store)
3123    }
3124
3125    #[test]
3126    fn simple_preset_is_default_and_persists_state() {
3127        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3128        clear_preset_env();
3129
3130        let (runtime, auth_store) = fresh_runtime_and_store();
3131        apply_preset(&runtime, &auth_store).expect("simple preset applies cleanly");
3132
3133        // No admin was created — `simple` is anonymous-friendly.
3134        assert!(
3135            auth_store.needs_bootstrap(),
3136            "simple preset must not create an admin"
3137        );
3138
3139        // Bootstrap state persisted so the next boot is a no-op.
3140        let store = runtime.db().store();
3141        let completed = store
3142            .get_config(BOOTSTRAP_COMPLETED_KEY)
3143            .expect("completed key persisted");
3144        assert!(matches!(
3145            completed,
3146            crate::storage::schema::Value::Boolean(true)
3147        ));
3148        let preset = store
3149            .get_config(BOOTSTRAP_PRESET_KEY)
3150            .expect("preset key persisted");
3151        match preset {
3152            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_SIMPLE),
3153            other => panic!("expected Text(simple), got {other:?}"),
3154        }
3155        assert!(
3156            store.get_config(BOOTSTRAP_FIRST_ADMIN_KEY).is_none(),
3157            "simple preset must not record a first admin"
3158        );
3159
3160        clear_preset_env();
3161    }
3162
3163    #[test]
3164    fn production_preset_creates_first_admin_with_allow_all_policy() {
3165        use crate::auth::policies::{EvalContext, ResourceRef};
3166        use crate::auth::UserId;
3167
3168        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3169        clear_preset_env();
3170        // SAFETY: env serialised by `no_auth_env_lock`.
3171        unsafe {
3172            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3173            std::env::set_var("REDDB_USERNAME", "ops");
3174            std::env::set_var("REDDB_PASSWORD", "hunter2");
3175        }
3176
3177        let (runtime, auth_store) = fresh_runtime_and_store();
3178        apply_preset(&runtime, &auth_store).expect("production preset applies cleanly");
3179
3180        // Admin exists and the auth store is sealed.
3181        assert!(
3182            !auth_store.needs_bootstrap(),
3183            "production preset must seal bootstrap"
3184        );
3185        let users = auth_store.list_users();
3186        assert_eq!(users.len(), 1);
3187        let admin = &users[0];
3188        assert_eq!(admin.username, "ops");
3189        assert!(
3190            admin.system_owned,
3191            "first admin must be system-owned to pass the managed-config gate"
3192        );
3193        assert!(
3194            admin.tenant_id.is_none(),
3195            "first admin must be platform-scoped (tenant=None)"
3196        );
3197
3198        // Allow-all policy was installed and attached to the first admin.
3199        let policy = auth_store
3200            .get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY)
3201            .expect("allow-all policy installed");
3202        assert!(!policy.statements.is_empty());
3203
3204        // Verify policy-derived authority via the policy evaluator —
3205        // not a bypass. Any action on any resource must Allow.
3206        let actor = UserId::platform("ops");
3207        let ctx = EvalContext {
3208            principal_tenant: None,
3209            current_tenant: None,
3210            peer_ip: None,
3211            mfa_present: false,
3212            now_ms: 1_700_000_000_000,
3213            principal_is_admin_role: true,
3214            principal_is_system_owned: true,
3215            principal_is_platform_scoped: true,
3216        };
3217        let arbitrary_resource = ResourceRef::new("config", "red.config.audit.enabled");
3218        assert!(
3219            auth_store.check_policy_authz(&actor, "config:write", &arbitrary_resource, &ctx),
3220            "allow-all policy must grant arbitrary actions via the evaluator"
3221        );
3222
3223        // Persisted state records the first admin id.
3224        let store = runtime.db().store();
3225        match store
3226            .get_config(BOOTSTRAP_FIRST_ADMIN_KEY)
3227            .expect("first_admin_id persisted")
3228        {
3229            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), "ops"),
3230            other => panic!("expected Text(ops), got {other:?}"),
3231        }
3232        match store.get_config(BOOTSTRAP_PRESET_KEY).unwrap() {
3233            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_PRODUCTION),
3234            other => panic!("expected Text(production), got {other:?}"),
3235        }
3236
3237        clear_preset_env();
3238    }
3239
3240    #[test]
3241    fn regulated_preset_enables_query_audit_infrastructure_without_rules() {
3242        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3243        clear_preset_env();
3244        // SAFETY: env serialised by `no_auth_env_lock`.
3245        unsafe {
3246            std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3247        }
3248
3249        let (runtime, auth_store) = fresh_runtime_and_store();
3250        apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3251
3252        assert!(runtime.query_audit().is_enabled());
3253        assert!(runtime.query_audit().rules().is_empty());
3254        assert!(
3255            runtime
3256                .db()
3257                .store()
3258                .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3259                .is_some(),
3260            "regulated preset should create the query-audit stream"
3261        );
3262
3263        runtime
3264            .execute_query("CREATE TABLE docs (id INT)")
3265            .expect("create table");
3266        runtime
3267            .execute_query("INSERT INTO docs (id) VALUES (1)")
3268            .expect("insert");
3269        runtime.execute_query("SELECT * FROM docs").expect("select");
3270        let rows = runtime
3271            .db()
3272            .store()
3273            .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3274            .expect("query audit collection")
3275            .query_all(|_| true);
3276        assert!(
3277            rows.is_empty(),
3278            "regulated preset must not globally audit every query"
3279        );
3280
3281        clear_preset_env();
3282    }
3283
3284    #[test]
3285    fn managed_backup_env_rejects_primary_replica_single_file_storage() {
3286        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3287        clear_backup_env();
3288        // SAFETY: env serialised by `no_auth_env_lock`.
3289        unsafe {
3290            std::env::set_var("REDDB_BACKUP_S3_ENDPOINT", "https://s3.example.test");
3291            std::env::set_var("REDDB_BACKUP_S3_BUCKET", "reddb");
3292            std::env::set_var("REDDB_BACKUP_S3_PREFIX", "clusters/prod");
3293            std::env::set_var("REDDB_BACKUP_S3_ACCESS_KEY_ID", "AK");
3294            std::env::set_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY", "SK");
3295        }
3296
3297        let mut config = no_auth_test_config(false);
3298        config.role = "primary".to_string();
3299        config.storage_profile = crate::storage::StorageDeployPreset::PrimaryReplicaDev.selection();
3300
3301        let err = config.to_db_options().unwrap_err();
3302        assert!(err.contains("managed backup"), "got: {err}");
3303        assert!(err.contains("operational-directory"), "got: {err}");
3304
3305        clear_backup_env();
3306    }
3307
    // End-to-end check of the `regulated` preset. It covers four things:
    // - the options-level switches (`control_events.compliance_mode`, and
    //   query audit enabled with no rules);
    // - the managed policy and managed audit-config registry entries, plus
    //   their `red.registry` / `red.managed_policies` /
    //   `red.control_capabilities` views;
    // - denial of an ordinary admin whose own allow-all policy would
    //   otherwise permit the mutation, with a `denied` row in
    //   `red.control_events`;
    // - no global auditing of user-table queries.
    #[test]
    fn regulated_preset_installs_managed_evidence_guardrails_end_to_end() {
        use crate::auth::policies::{EvalContext, Policy, ResourceRef};
        use crate::auth::store::PrincipalRef;
        use crate::auth::{Role, UserId};
        use crate::runtime::mvcc::{clear_current_auth_identity, set_current_auth_identity};
        use crate::storage::schema::Value;

        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
        clear_preset_env();
        // SAFETY: env serialised by `no_auth_env_lock`.
        unsafe {
            std::env::set_var(PRESET_ENV, PRESET_REGULATED);
        }

        // Phase 1: the preset must already shape `RedDBOptions`, i.e. before
        // any runtime exists.
        let options = no_auth_test_config(false)
            .to_db_options()
            .expect("regulated options");
        assert!(
            options.control_events.compliance_mode,
            "regulated preset must enable fail-closed control evidence before runtime boot"
        );
        assert!(
            options.query_audit.enabled && options.query_audit.rules.is_empty(),
            "regulated preset must enable query-audit infrastructure without global rules"
        );

        // Phase 2: boot a runtime from those options, apply the preset to a
        // fresh auth store, then hand that same store to the runtime.
        let runtime = RedDBRuntime::with_options(options).expect("runtime");
        let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
        apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
        runtime.set_auth_store(Arc::clone(&auth_store));

        assert!(runtime.control_events_require_persistence());
        assert!(runtime.query_audit().is_enabled());
        assert!(runtime.query_audit().rules().is_empty());
        assert!(auth_store
            .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
            .is_some());

        // The protecting policy and the audit config namespace are both
        // registered as `managed` entries in the config registry.
        let managed_policy = runtime
            .config_registry()
            .get_active(REGULATED_PROTECT_MANAGED_POLICY)
            .expect("regulated managed policy registry entry");
        assert!(managed_policy.managed);
        assert_eq!(managed_policy.resource_type, "policy");
        assert!(
            runtime
                .config_registry()
                .get_active(REGULATED_AUDIT_CONFIG_NAMESPACE)
                .expect("regulated audit config namespace")
                .managed
        );

        // The same registry state is visible through the SQL system tables.
        let registry_rows = runtime
            .execute_query(&format!(
                "SELECT id, managed FROM red.registry WHERE id = '{}'",
                REGULATED_PROTECT_MANAGED_POLICY
            ))
            .expect("red.registry query");
        assert_eq!(registry_rows.result.records.len(), 1);
        assert_eq!(
            registry_rows.result.records[0].get("managed"),
            Some(&Value::Boolean(true))
        );

        let managed_policy_rows = runtime
            .execute_query(&format!(
                "SELECT policy_id FROM red.managed_policies WHERE policy_id = '{}'",
                REGULATED_PROTECT_MANAGED_POLICY
            ))
            .expect("red.managed_policies query");
        assert_eq!(managed_policy_rows.result.records.len(), 1);

        let capability_rows = runtime
            .execute_query(
                "SELECT action FROM red.control_capabilities WHERE action = 'evidence:export'",
            )
            .expect("red.control_capabilities query");
        assert_eq!(capability_rows.result.records.len(), 1);

        // Phase 3: create an ordinary admin `alice` and attach an allow-all
        // policy, so any denial below must come from the managed guardrail
        // rather than a missing grant.
        auth_store
            .create_user("alice", "p", Role::Admin)
            .expect("create ordinary admin");
        let allow_all = Policy::from_json_str(
            r#"{
                "id": "alice-allow-all",
                "version": 1,
                "statements": [{
                    "effect": "allow",
                    "actions": ["*"],
                    "resources": ["*"]
                }]
            }"#,
        )
        .expect("allow-all policy");
        auth_store.put_policy(allow_all).expect("install allow-all");
        auth_store
            .attach_policy(
                PrincipalRef::User(UserId::platform("alice")),
                "alice-allow-all",
            )
            .expect("attach allow-all");
        // Sanity check: the policy evaluator alone would allow the drop for
        // a non-system-owned, platform-scoped admin.
        let ctx = EvalContext {
            principal_tenant: None,
            current_tenant: None,
            peer_ip: None,
            mfa_present: false,
            now_ms: 1_700_000_000_000,
            principal_is_admin_role: true,
            principal_is_system_owned: false,
            principal_is_platform_scoped: true,
        };
        assert!(
            auth_store.check_policy_authz(
                &UserId::platform("alice"),
                "policy:drop",
                &ResourceRef::new("policy", REGULATED_PROTECT_MANAGED_POLICY),
                &ctx,
            ),
            "ordinary allow-all policy should be broad enough that only the managed guardrail blocks"
        );

        // Attempt the drop as `alice`. The identity is cleared right after
        // the statement, before the result is inspected, so the later
        // queries run without it.
        set_current_auth_identity("alice".to_string(), Role::Admin);
        let denied = runtime.execute_query(&format!(
            "DROP POLICY '{}'",
            REGULATED_PROTECT_MANAGED_POLICY
        ));
        clear_current_auth_identity();
        let err = denied.expect_err("managed policy guardrail must deny ordinary admin");
        assert!(
            err.to_string().contains("managed policy"),
            "error should name the managed guardrail: {err}"
        );
        assert!(
            auth_store
                .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
                .is_some(),
            "denied mutation must leave managed policy installed"
        );

        // The denial itself is recorded as exactly one control event.
        let denied_events = runtime
            .execute_query(&format!(
                "SELECT action, resource, outcome FROM red.control_events \
                 WHERE action = 'policy:drop' AND resource = 'policy:{}'",
                REGULATED_PROTECT_MANAGED_POLICY
            ))
            .expect("red.control_events denied policy drop");
        assert_eq!(denied_events.result.records.len(), 1);
        assert_eq!(
            denied_events.result.records[0].get("outcome"),
            Some(&Value::text("denied"))
        );

        // Same pattern for the managed config namespace: a config write by
        // `alice` is denied and recorded.
        set_current_auth_identity("alice".to_string(), Role::Admin);
        let config_denied = runtime.execute_query("SET CONFIG red.config.audit.enabled = true");
        clear_current_auth_identity();
        let err = config_denied.expect_err("managed config guardrail must deny ordinary admin");
        assert!(
            err.to_string().contains("managed config"),
            "error should name the managed config guardrail: {err}"
        );

        let denied_config_events = runtime
            .execute_query(
                "SELECT action, resource, outcome FROM red.control_events \
                 WHERE action = 'config:write' AND resource = 'config:red.config.audit.enabled'",
            )
            .expect("red.control_events denied config write");
        assert_eq!(denied_config_events.result.records.len(), 1);
        assert_eq!(
            denied_config_events.result.records[0].get("outcome"),
            Some(&Value::text("denied"))
        );

        // Phase 4: a CREATE TABLE and a SELECT on a user table leave the
        // query-audit collection empty, because no audit rules were installed.
        runtime
            .execute_query("CREATE TABLE regulated_docs (id INT)")
            .expect("create user table");
        runtime
            .execute_query("SELECT * FROM regulated_docs")
            .expect("select user table");
        let audit_rows = runtime
            .db()
            .store()
            .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
            .expect("query audit collection")
            .query_all(|_| true);
        assert!(
            audit_rows.is_empty(),
            "regulated preset must not globally audit data-plane queries"
        );

        clear_preset_env();
    }
3501
    // Bootstrap-manifest end-to-end test.
    //
    // First it writes a JSON manifest to disk and points
    // `REDDB_BOOTSTRAP_MANIFEST` at it. It then checks that `apply_preset`
    // installs the manifest's user, the policy and its attachment, the
    // managed policy, the managed config namespace, and both plain and
    // secret-ref config.
    //
    // Finally it deletes the file and confirms two things: the managed
    // registry rehydrates from persisted state, and re-running
    // `apply_preset` does not need the manifest.
    #[test]
    fn bootstrap_manifest_installs_initial_users_policies_guardrails_and_config() {
        use crate::auth::policies::{EvalContext, ResourceRef};
        use crate::auth::UserId;
        use crate::storage::schema::Value;

        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
        clear_preset_env();

        // The manifest goes under `<cwd>/.red/tmp/...`. Its name embeds the
        // process id and a millisecond timestamp so repeated runs do not
        // reuse a file.
        let manifest_dir = std::env::current_dir()
            .expect("current dir")
            .join(".red/tmp/bootstrap-manifest-tests");
        std::fs::create_dir_all(&manifest_dir).expect("create manifest test dir");
        let manifest_path = manifest_dir.join(format!(
            "reddb-bootstrap-manifest-{}-{}.json",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis()
        ));
        std::fs::write(
            &manifest_path,
            r#"{
                "users": [
                    {
                        "username": "ops",
                        "password": "hunter2",
                        "role": "admin",
                        "system_owned": true
                    }
                ],
                "policies": [
                    {
                        "id": "bootstrap-registry-admin",
                        "version": 1,
                        "statements": [
                            {
                                "effect": "allow",
                                "actions": ["red.registry:*", "policy:*", "config:write", "select"],
                                "resources": ["registry:*", "policy:*", "config:*", "collection:docs"]
                            }
                        ]
                    }
                ],
                "managed_policies": [
                    {
                        "id": "managed-deny-drop",
                        "version": 1,
                        "statements": [
                            {
                                "effect": "deny",
                                "actions": ["policy:drop"],
                                "resources": ["policy:managed-deny-drop"]
                            }
                        ],
                        "required_resource": "policy:managed-deny-drop",
                        "evidence": "full"
                    }
                ],
                "attachments": [
                    {"user": "ops", "policy": "bootstrap-registry-admin"}
                ],
                "managed_config_namespaces": [
                    {
                        "id": "red.ai",
                        "required_action": "config:write",
                        "required_resource": "config:red.ai.*",
                        "evidence": "metadata"
                    }
                ],
                "config": [
                    {"key": "red.ai.default.provider", "value": "openai"},
                    {
                        "key": "red.ai.openai.default.secret_ref",
                        "secret_ref": {"collection": "red.vault", "key": "openai"}
                    }
                ],
                "actor": "ops"
            }"#,
        )
        .expect("write manifest");
        // SAFETY: env serialised by `no_auth_env_lock`.
        unsafe {
            std::env::set_var("REDDB_BOOTSTRAP_MANIFEST", &manifest_path);
        }

        let (runtime, auth_store) = fresh_runtime_and_store();
        apply_preset(&runtime, &auth_store).expect("manifest applies cleanly");

        // Exactly the manifest's one (system-owned) user exists.
        let users = auth_store.list_users();
        assert_eq!(users.len(), 1);
        assert_eq!(users[0].username, "ops");
        assert!(users[0].system_owned);

        // The attached manifest policy grants `select` on `collection:docs`
        // through the evaluator.
        let actor = UserId::platform("ops");
        let ctx = EvalContext {
            principal_tenant: None,
            current_tenant: None,
            peer_ip: None,
            mfa_present: false,
            now_ms: 1_700_000_000_000,
            principal_is_admin_role: true,
            principal_is_system_owned: true,
            principal_is_platform_scoped: true,
        };
        // Manifest fixture pins a canonical data-plane read action.
        assert!(auth_store.check_policy_authz(
            &actor,
            "select",
            &ResourceRef::new("collection", "docs"),
            &ctx
        ));

        // Managed policy and managed config namespace are registered as
        // managed entries with their respective resource types.
        let managed_policy = runtime
            .config_registry()
            .get_active("managed-deny-drop")
            .expect("managed policy registry entry");
        assert!(managed_policy.managed);
        assert_eq!(managed_policy.resource_type, "policy");
        let managed_config = runtime
            .config_registry()
            .get_active("red.ai")
            .expect("managed config namespace registry entry");
        assert!(managed_config.managed);
        assert_eq!(managed_config.resource_type, "config_namespace");

        // A plain config value is stored as text.
        let store = runtime.db().store();
        match store
            .get_config("red.ai.default.provider")
            .expect("plain config persisted")
        {
            Value::Text(s) => assert_eq!(s.as_ref(), "openai"),
            other => panic!("expected provider text, got {other:?}"),
        }
        // A secret reference is stored as structured JSON tagged
        // `"type": "secret_ref"`. The manifest user's password must not
        // appear anywhere in it.
        let Value::Json(bytes) = store
            .get_config("red.ai.openai.default.secret_ref")
            .expect("secret ref config persisted")
        else {
            panic!("secret ref must be stored as structured JSON");
        };
        let reference: crate::serde_json::Value =
            crate::serde_json::from_slice(&bytes).expect("secret ref json");
        assert_eq!(
            reference.get("type").and_then(|v| v.as_str()),
            Some("secret_ref")
        );
        assert!(
            !String::from_utf8_lossy(&bytes).contains("hunter2"),
            "manifest password must not leak into secret ref config"
        );

        let completed = store
            .get_config(BOOTSTRAP_COMPLETED_KEY)
            .expect("bootstrap completion persisted");
        assert!(matches!(completed, Value::Boolean(true)));
        assert!(
            store
                .get_config("system.bootstrap.manifest.registry_entries")
                .is_some(),
            "managed registry entries must be persisted internally"
        );

        // Remove the manifest so the steps below can only succeed from
        // persisted state. First, a brand-new registry is rehydrated from
        // the runtime.
        std::fs::remove_file(&manifest_path).expect("remove manifest after first boot");
        let restored_registry = Arc::new(crate::auth::registry::ConfigRegistry::new());
        crate::cli::bootstrap_manifest::rehydrate_manifest_registry(&runtime, &restored_registry)
            .expect("registry rehydrates without manifest file");
        assert!(restored_registry.get_active("managed-deny-drop").is_some());
        assert!(restored_registry.get_active("red.ai").is_some());

        // A second apply against a fresh store succeeds without the file and
        // creates no users (the store still needs bootstrap).
        let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
        apply_preset(&runtime, &fresh).expect("re-run must not need manifest file");
        assert!(fresh.needs_bootstrap());

        clear_preset_env();
    }
3678
3679    #[test]
3680    fn production_preset_refuses_to_start_without_password() {
3681        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3682        clear_preset_env();
3683        // SAFETY: env serialised by `no_auth_env_lock`.
3684        unsafe {
3685            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3686            std::env::set_var("REDDB_USERNAME", "ops");
3687        }
3688
3689        let (runtime, auth_store) = fresh_runtime_and_store();
3690        let err = apply_preset(&runtime, &auth_store).expect_err("must reject missing password");
3691        assert!(
3692            err.contains("REDDB_PASSWORD"),
3693            "error must name the missing env: {err}"
3694        );
3695
3696        // Nothing was persisted; nothing was created.
3697        assert!(auth_store.needs_bootstrap());
3698        assert!(runtime
3699            .db()
3700            .store()
3701            .get_config(BOOTSTRAP_COMPLETED_KEY)
3702            .is_none());
3703
3704        clear_preset_env();
3705    }
3706
3707    #[test]
3708    fn re_running_production_after_first_boot_is_a_silent_skip() {
3709        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3710        clear_preset_env();
3711        // SAFETY: env serialised by `no_auth_env_lock`.
3712        unsafe {
3713            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3714            std::env::set_var("REDDB_USERNAME", "ops");
3715            std::env::set_var("REDDB_PASSWORD", "hunter2");
3716        }
3717
3718        let (runtime, auth_store) = fresh_runtime_and_store();
3719        apply_preset(&runtime, &auth_store).expect("first apply");
3720        assert_eq!(auth_store.list_users().len(), 1);
3721
3722        // Second invocation: same runtime/store, same env. Must be a
3723        // no-op — no error, no duplicate admin, no duplicate policy.
3724        // We do NOT reuse `auth_store` because production sealed it; the
3725        // idempotency hinge is the persisted config key, not the auth
3726        // store's in-memory seal. Build a fresh `AuthStore` as a restart
3727        // would and confirm `apply_preset` is a silent skip.
3728        let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3729        apply_preset(&runtime, &fresh).expect("re-run is silent-skip");
3730        assert!(
3731            fresh.needs_bootstrap(),
3732            "re-run must not create a second admin"
3733        );
3734        assert!(
3735            fresh.get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY).is_none(),
3736            "re-run must not re-install the allow-all policy on the fresh store"
3737        );
3738
3739        clear_preset_env();
3740    }
3741
3742    #[test]
3743    fn unrecognised_preset_value_is_rejected() {
3744        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3745        clear_preset_env();
3746        // SAFETY: env serialised by `no_auth_env_lock`.
3747        unsafe {
3748            std::env::set_var(PRESET_ENV, "weird");
3749        }
3750
3751        let (runtime, auth_store) = fresh_runtime_and_store();
3752        let err = apply_preset(&runtime, &auth_store).expect_err("must reject unknown preset");
3753        assert!(err.contains("weird"), "error must echo the value: {err}");
3754        assert!(auth_store.needs_bootstrap());
3755
3756        clear_preset_env();
3757    }
3758
3759    #[test]
3760    fn no_auth_short_circuits_preset_entirely() {
3761        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3762        clear_preset_env();
3763        // SAFETY: env serialised by `no_auth_env_lock`. Production creds
3764        // are set but `--no-auth` must win — no admin, no bootstrap state.
3765        unsafe {
3766            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3767            std::env::set_var("REDDB_USERNAME", "ops");
3768            std::env::set_var("REDDB_PASSWORD", "hunter2");
3769        }
3770
3771        let options = no_auth_test_config(true)
3772            .to_db_options()
3773            .expect("to_db_options");
3774        assert!(no_auth_active(&options));
3775
3776        // Mirror `build_runtime_with_telemetry`: when no_auth_active,
3777        // `apply_preset` is never called.
3778        let (runtime, auth_store) = fresh_runtime_and_store();
3779        if !no_auth_active(&options) {
3780            apply_preset(&runtime, &auth_store).expect("would apply preset");
3781        }
3782
3783        assert!(
3784            auth_store.needs_bootstrap(),
3785            "--no-auth must prevent any admin creation"
3786        );
3787        assert!(
3788            runtime
3789                .db()
3790                .store()
3791                .get_config(BOOTSTRAP_COMPLETED_KEY)
3792                .is_none(),
3793            "--no-auth must skip bootstrap-state persistence"
3794        );
3795
3796        clear_preset_env();
3797    }
3798
3799    #[test]
3800    fn implicit_bind_collision_degrades() {
3801        let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
3802        let addr = held.local_addr().expect("held addr").to_string();
3803        let mut readiness = TransportReadiness::default();
3804
3805        let listener =
3806            bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");
3807
3808        assert!(listener.is_none());
3809        assert_eq!(readiness.active.len(), 0);
3810        assert_eq!(readiness.failed.len(), 1);
3811        assert!(!readiness.failed[0].explicit);
3812        assert_eq!(readiness.failed[0].bind_addr, addr);
3813    }
3814}