Skip to main content

reddb_server/
service_cli.rs

1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, InProcessRouterConfig};
12use crate::storage::StorageProfileSelection;
13use crate::{
14    GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
15};
16
/// Default bind address for the router listener (`router_bind_addr`).
/// Same address as `ServerTransport::Wire.default_bind_addr()`.
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
18
/// Detect available CPU cores and suggest a worker thread count.
///
/// One core is reserved for the OS and the remainder is suggested as
/// workers, never fewer than one. If the platform cannot report its
/// parallelism, a single CPU is assumed.
pub fn detect_runtime_config() -> RuntimeConfig {
    let available_cpus = thread::available_parallelism().map_or(1, |n| n.get());

    // Reserve 1 core for the OS, use the rest for workers (minimum 1).
    let suggested_workers = available_cpus.saturating_sub(1).max(1);

    RuntimeConfig {
        available_cpus,
        suggested_workers,
        // 8 MiB default stack size (8 * 1024 * 1024 bytes).
        stack_size: 8 * 1024 * 1024,
    }
}

/// CPU-derived runtime sizing suggestions produced by
/// [`detect_runtime_config`].
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Parallelism reported by `std::thread::available_parallelism`,
    /// or 1 when it is unavailable.
    pub available_cpus: usize,
    /// Suggested worker count: `available_cpus - 1`, never below 1.
    pub suggested_workers: usize,
    /// Suggested thread stack size, in bytes.
    pub stack_size: usize,
}
41
/// Network transports the server can expose to clients.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    /// gRPC listener.
    Grpc,
    /// HTTP listener.
    Http,
    /// Native wire-protocol listener.
    Wire,
}

impl ServerTransport {
    /// Human-readable label for this transport.
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Http => "HTTP",
            ServerTransport::Grpc => "gRPC",
            ServerTransport::Wire => "wire",
        }
    }

    /// Loopback `host:port` this transport binds to by default.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Grpc => "127.0.0.1:5555",
            ServerTransport::Wire => "127.0.0.1:5050",
        }
    }
}
66
/// Parsed `red server` configuration: database location, per-transport
/// listener addresses and TLS material, replication role, auth, backup
/// and telemetry knobs, and UI options. `to_db_options` turns it into the
/// `RedDBOptions` used to open the database.
#[derive(Debug, Clone)]
pub struct ServerCommandConfig {
    /// Database file path. `None` opens an in-memory database via
    /// `RedDBOptions::in_memory()`.
    pub path: Option<PathBuf>,
    /// Router bind address. When set, `enabled_transports` reports gRPC,
    /// HTTP and wire as enabled regardless of their own addresses.
    pub router_bind_addr: Option<String>,
    /// NOTE(review): presumably `true` when the router address was
    /// supplied explicitly rather than defaulted — confirm in the CLI
    /// parser.
    pub router_bind_explicit: bool,
    /// Plain gRPC bind address; enables the gRPC transport when set.
    pub grpc_bind_addr: Option<String>,
    /// NOTE(review): presumably `true` when the gRPC address was
    /// supplied explicitly rather than defaulted — confirm.
    pub grpc_bind_explicit: bool,
    /// TLS-encrypted gRPC bind address. Can run side-by-side with
    /// `grpc_bind_addr` (e.g. `:50051` plain + `:50052` TLS) or
    /// stand alone for TLS-only deploys. Defaults to `None`.
    pub grpc_tls_bind_addr: Option<String>,
    /// Path to PEM-encoded gRPC server certificate. Resolved through
    /// `REDDB_GRPC_TLS_CERT` (with `_FILE` companion for k8s secret
    /// mounts). When `None` and dev-mode is enabled
    /// (`RED_GRPC_TLS_DEV=1`) the server auto-generates a self-signed
    /// cert and prints its SHA-256 fingerprint to stderr.
    pub grpc_tls_cert: Option<PathBuf>,
    /// Path to PEM-encoded gRPC server private key. Same env-var
    /// pattern as `grpc_tls_cert`.
    pub grpc_tls_key: Option<PathBuf>,
    /// Optional path to a PEM bundle of trust anchors used to verify
    /// client certificates. When set, the gRPC listener requires
    /// every client to present a cert that chains to this CA — i.e.
    /// mutual TLS. When unset, one-way TLS only.
    pub grpc_tls_client_ca: Option<PathBuf>,
    /// Plain HTTP bind address; enables the HTTP transport when set.
    pub http_bind_addr: Option<String>,
    /// NOTE(review): presumably `true` when the HTTP address was
    /// supplied explicitly rather than defaulted — confirm.
    pub http_bind_explicit: bool,
    /// HTTPS bind address. When set, the HTTP server also serves a
    /// TLS-terminated listener on this addr. Plain HTTP and HTTPS can
    /// run side by side (e.g. 8080 plain + 8443 TLS).
    pub http_tls_bind_addr: Option<String>,
    /// PEM cert for HTTPS. Reads `REDDB_HTTP_TLS_CERT` / its `_FILE`
    /// companion when not set explicitly.
    pub http_tls_cert: Option<PathBuf>,
    /// PEM key for HTTPS. Reads `REDDB_HTTP_TLS_KEY` / its `_FILE`
    /// companion when not set explicitly.
    pub http_tls_key: Option<PathBuf>,
    /// Optional PEM CA bundle. When set, the HTTPS listener requires
    /// every client to present a cert that chains to a CA in this
    /// bundle (mTLS). When unset, plain server-side TLS only.
    pub http_tls_client_ca: Option<PathBuf>,
    /// Plain wire-protocol bind address; enables the wire transport
    /// when set.
    pub wire_bind_addr: Option<String>,
    /// NOTE(review): presumably `true` when the wire address was
    /// supplied explicitly rather than defaulted — confirm.
    pub wire_bind_explicit: bool,
    /// TLS-encrypted wire protocol bind address
    pub wire_tls_bind_addr: Option<String>,
    /// Path to TLS cert PEM (if None + wire_tls_bind, auto-generate)
    pub wire_tls_cert: Option<PathBuf>,
    /// Path to TLS key PEM
    pub wire_tls_key: Option<PathBuf>,
    /// PostgreSQL wire protocol bind address (Phase 3.1 PG parity).
    /// When set the server accepts psql / JDBC / DBeaver clients on
    /// this port via the v3 protocol. Defaults to None (disabled).
    pub pg_bind_addr: Option<String>,
    /// Copied into `RedDBOptions::create_if_missing`.
    pub create_if_missing: bool,
    /// `--readonly` CLI flag. OR-ed with `RED_READONLY` /
    /// `REDDB_READONLY` and the persisted runtime read-only state into
    /// `RedDBOptions::read_only`.
    pub read_only: bool,
    /// Replication role: `"primary"` or `"replica"`; any other value
    /// runs standalone.
    pub role: String,
    /// Primary a replica follows. When unset, a replica falls back to
    /// `http://127.0.0.1:5555`. Ignored for other roles.
    pub primary_addr: Option<String>,
    /// Storage profile selection. `to_db_options` validates it, and
    /// switches on `managed_backup` when a backup backend or backup
    /// intervals are configured.
    pub storage_profile: StorageProfileSelection,
    /// `true` enables `auth.vault_enabled` (forced back off by
    /// `no_auth`).
    pub vault: bool,
    /// Issue #663 — explicit `--no-auth` / `--dev` flag for local
    /// no-password mode. When `true`, the boot pipeline force-disables
    /// auth (`auth.enabled = false`, `auth.require_auth = false`,
    /// `auth.vault_enabled = false`) and skips
    /// `AuthStore::bootstrap_from_env`, so an explicit
    /// `REDDB_USERNAME` + `REDDB_PASSWORD` pair cannot accidentally
    /// turn anonymous access into a logged-in admin. An unmissable
    /// warning is emitted at startup. Default `false` — the existing
    /// implicit no-auth behaviour (just don't set the envs) is
    /// unchanged.
    pub no_auth: bool,
    /// Override worker thread count (None = auto-detect from CPUs)
    pub workers: Option<usize>,
    /// Telemetry config (Phase 6 logging). `None` falls back to the
    /// built-in default derived from `path` + stderr-only.
    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
    /// HTTP handler-pool knobs from the CLI layer (issue #574 slice 5).
    /// Carries flag and env values; red_config and built-in defaults
    /// are applied later by [`crate::server::http_limits::resolve_http_limits`]
    /// once the runtime is open.
    pub http_limits_cli: crate::server::HttpLimitsCliInput,
    /// `red server --ui` (issue #1047, ADR 0051). When `true`, the HTTP
    /// listener also serves the pinned red-ui bundle as static assets.
    /// The bundle directory is resolved at boot from the local cache
    /// (downloading on first use, ADR 0050) unless `ui_dir` overrides it.
    pub ui: bool,
    /// Explicit bundle directory for `--ui` (`--ui-dir <DIR>`). When set,
    /// it is served as-is with no download — the seam tests and an
    /// air-gapped deploy use this. `None` resolves the pinned bundle.
    pub ui_dir: Option<PathBuf>,
}
157
/// A transport listener that came up successfully.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerState {
    /// Transport label.
    pub transport: String,
    /// Bind address recorded for the listener.
    pub bind_addr: String,
    /// NOTE(review): presumably whether the address was explicitly
    /// configured rather than defaulted — confirm at the call sites.
    pub explicit: bool,
}

/// A transport listener that failed to come up, with the reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerFailure {
    /// Transport label.
    pub transport: String,
    /// Bind address the listener attempted to use.
    pub bind_addr: String,
    /// Same meaning as [`TransportListenerState::explicit`].
    pub explicit: bool,
    /// Human-readable failure description.
    pub reason: String,
}

/// Startup report: which transport listeners are up and which failed.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TransportReadiness {
    /// Listeners that started, in the order they were recorded.
    pub active: Vec<TransportListenerState>,
    /// Listeners that failed, in the order they were recorded.
    pub failed: Vec<TransportListenerFailure>,
}

impl TransportReadiness {
    /// Append a successfully started listener to `self.active`.
    fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
        let entry = TransportListenerState {
            transport: transport.to_owned(),
            bind_addr: bind_addr.to_owned(),
            explicit,
        };
        self.active.push(entry);
    }

    /// Append a failed listener, with its `reason`, to `self.failed`.
    fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
        let entry = TransportListenerFailure {
            transport: transport.to_owned(),
            bind_addr: bind_addr.to_owned(),
            explicit,
            reason,
        };
        self.failed.push(entry);
    }
}
197
/// Inputs for installing the server as a systemd service.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix.
    pub service_name: String,
    /// Server binary path.
    pub binary_path: PathBuf,
    /// User the service runs as.
    pub run_user: String,
    /// Group the service runs as.
    pub run_group: String,
    /// Database file path; its parent directory is [`Self::data_dir`].
    pub data_path: PathBuf,
    /// Optional router bind address.
    pub router_bind_addr: Option<String>,
    /// Optional gRPC bind address.
    pub grpc_bind_addr: Option<String>,
    /// Optional HTTP bind address.
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory that contains `data_path`.
    ///
    /// Falls back to `.` when `data_path` has no usable parent: either
    /// none at all (e.g. `/`), or the empty parent that `Path::parent`
    /// reports for a bare file name such as `data.rdb`.
    pub fn data_dir(&self) -> PathBuf {
        self.data_path
            .parent()
            // `Path::new("data.rdb").parent()` is `Some("")`; an empty
            // directory is never a valid location, so treat it as `.`.
            .filter(|dir| !dir.as_os_str().is_empty())
            .map(PathBuf::from)
            .unwrap_or_else(|| PathBuf::from("."))
    }

    /// Location of the unit file: `/etc/systemd/system/<service_name>.service`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
222
223/// Build a sane default `TelemetryConfig` from a server path when the
224/// caller didn't set one explicitly. Writes rotating logs into the
225/// parent directory of the DB file (or `./logs` for in-memory /
226/// pathless runs). Level defaults to `info`, pretty stderr format.
227pub fn default_telemetry_for_path(
228    path: Option<&std::path::Path>,
229) -> crate::telemetry::TelemetryConfig {
230    let log_dir = match path {
231        Some(p) => p
232            .parent()
233            .map(|parent| parent.join("logs"))
234            .or_else(|| Some(std::path::PathBuf::from("./logs"))),
235        None => None, // in-memory — no file, stderr-only
236    };
237    crate::telemetry::TelemetryConfig {
238        log_dir,
239        file_prefix: "reddb.log".to_string(),
240        level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
241        format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
242            crate::telemetry::LogFormat::Pretty
243        } else {
244            crate::telemetry::LogFormat::Json
245        },
246        rotation_keep_days: 14,
247        service_name: "reddb",
248        // Implicit defaults — no CLI flag has claimed these values yet.
249        level_explicit: false,
250        format_explicit: false,
251        rotation_keep_days_explicit: false,
252        file_prefix_explicit: false,
253        log_dir_explicit: false,
254        log_file_disabled: false,
255    }
256}
257
/// Metadata key used to thread the parsed backup config from
/// `to_db_options` down to runner threads. The runner reads it back
/// (see `spawn_backup_tasks_if_configured`) to spawn the periodic
/// checkpointer and WAL-flush tasks. Threading through `metadata` avoids
/// extending `RedDBOptions` with a public field that has no meaning for
/// library consumers.
///
/// This key holds the checkpoint interval in seconds as a decimal
/// string (written by `apply_backup_config`). Its presence also makes
/// `to_db_options` switch on `managed_backup` in the storage profile.
const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
/// WAL flush/archive interval in seconds, as a decimal string. Written
/// alongside [`BACKUP_INTERVAL_META_CHECKPOINT`] by `apply_backup_config`.
const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
/// Backup backend kind; `apply_backup_config` stores `"s3"`.
const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
/// Issue #519 — threaded through `metadata` like the existing interval
/// values. `0` (default) means "feature disabled" and the runner skips
/// the lag-monitor wiring entirely.
const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";
271
/// Issue #663 — metadata key set by `to_db_options` when the operator
/// passes `--no-auth` / `--dev`. Read in `build_runtime_with_telemetry`
/// to (a) skip `AuthStore::bootstrap_from_env` (so a stray
/// `REDDB_USERNAME`/`REDDB_PASSWORD` cannot auto-create an admin) and
/// (b) emit the loud "auth disabled" warning. Threaded via `metadata`
/// — rather than a public `RedDBOptions` field — to keep the flag a
/// CLI/boot concern with no meaning for library consumers.
///
/// The only value ever written is the string `"true"`. `no_auth_active`
/// treats a missing key or any other value as "auth not disabled".
pub(crate) const NO_AUTH_META: &str = "red.boot.no_auth";
280
281/// Returns `true` when `--no-auth` / `--dev` was active for this boot,
282/// i.e. when `to_db_options` stamped [`NO_AUTH_META`] on `options.metadata`.
283pub(crate) fn no_auth_active(options: &RedDBOptions) -> bool {
284    options
285        .metadata
286        .get(NO_AUTH_META)
287        .map(|v| v == "true")
288        .unwrap_or(false)
289}
290
/// Loud, unmissable warning printed when the operator opts into
/// anonymous access via `--no-auth` / `--dev`. Goes to stderr (always
/// visible at startup) **and** the tracing layer (captured by file
/// logs once telemetry is wired).
///
/// NOTE(review): the emission site is outside this section; the
/// stderr + tracing destinations above describe the intended call site
/// and should be confirmed there.
const NO_AUTH_WARNING: &str =
    "⚠ auth disabled (--no-auth) — anonymous access, do NOT use in production";
297
298impl ServerCommandConfig {
299    fn to_db_options(&self) -> Result<RedDBOptions, String> {
300        let mut options = match &self.path {
301            Some(path) => RedDBOptions::persistent(path),
302            None => RedDBOptions::in_memory(),
303        };
304
305        options.mode = StorageMode::Persistent;
306        options.create_if_missing = self.create_if_missing;
307        // PLAN.md Phase 4.3 — read_only resolution priority:
308        //   1. CLI flag (`--readonly`) — explicit operator intent.
309        //   2. `RED_READONLY=true` env — orchestrator override.
310        //   3. Persisted `<data>/.runtime-state.json` from a prior
311        //      `POST /admin/readonly` — survives restart.
312        //   4. Default `false`.
313        options.read_only = self.read_only
314            || env_nonempty("RED_READONLY")
315                .or_else(|| env_nonempty("REDDB_READONLY"))
316                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
317                .unwrap_or(false)
318            || self.path.as_ref().is_some_and(|data_path| {
319                crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
320                    data_path,
321                ))
322                .unwrap_or(false)
323            });
324
325        options.replication = match self.role.as_str() {
326            "primary" => ReplicationConfig::primary(),
327            "replica" => {
328                let primary_addr = self
329                    .primary_addr
330                    .clone()
331                    .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
332                // Public-mutation rejection on replicas is enforced by
333                // `WriteGate` at the runtime/RPC boundary (PLAN.md W1).
334                // Leaving `options.read_only = false` keeps the pager
335                // writable so the internal logical-WAL apply path can
336                // ingest records from the primary; WriteGate ensures no
337                // client request reaches storage.
338                ReplicationConfig::replica(primary_addr)
339            }
340            _ => ReplicationConfig::standalone(),
341        };
342        options.storage_profile = self.storage_profile.validate()?;
343
344        if self.vault {
345            options.auth.vault_enabled = true;
346        }
347
348        // Issue #663 — `--no-auth` / `--dev` is the last word on auth
349        // for this boot: force every auth knob off, regardless of any
350        // env-derived config (`--vault`, `REDDB_USERNAME`/`PASSWORD`,
351        // `REDDB_VAULT_KEY`, OAuth, cert) the operator may also have
352        // set. We *also* stamp [`NO_AUTH_META`] so the auth-store
353        // builder downstream knows to skip `bootstrap_from_env`
354        // (which would otherwise auto-create an admin from
355        // `REDDB_USERNAME`/`REDDB_PASSWORD` even with auth disabled,
356        // a footgun for the local-dev workflow this flag exists to
357        // support).
358        if self.no_auth {
359            options.auth.enabled = false;
360            options.auth.require_auth = false;
361            options.auth.vault_enabled = false;
362            options
363                .metadata
364                .insert(NO_AUTH_META.to_string(), "true".to_string());
365        }
366
367        // Issue #652 — Control Event Ledger Compliance Mode.
368        // `REDDB_COMPLIANCE_MODE=true|1|yes|on` makes the producer
369        // slices (652b/c/d) fail-closed on ledger persistence
370        // failures. Default `false` — log-and-continue on emit error.
371        if let Some(raw) = env_nonempty("REDDB_COMPLIANCE_MODE") {
372            options.control_events.compliance_mode = matches!(
373                raw.to_ascii_lowercase().as_str(),
374                "1" | "true" | "yes" | "on"
375            );
376        }
377        if env_nonempty(PRESET_ENV).is_some_and(|s| s.trim() == PRESET_REGULATED) {
378            options.control_events.compliance_mode = true;
379            options.query_audit = crate::runtime::query_audit::QueryAuditConfig::regulated();
380        }
381
382        // Issue #517 — canonical `REDDB_BACKUP_*` contract takes
383        // precedence. On Err, surface the partial-config message so
384        // boot exits non-zero with a clear operator message. On
385        // Ok(None), fall through to the legacy backend-from-env path.
386        match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
387            Err(msg) => {
388                return Err(format!("backup bootstrap: {msg}"));
389            }
390            Ok(Some(cfg)) => {
391                apply_backup_config(&mut options, &cfg);
392            }
393            Ok(None) => {
394                configure_remote_backend_from_env(&mut options);
395            }
396        }
397
398        if options.remote_backend.is_some()
399            || options
400                .metadata
401                .contains_key(BACKUP_INTERVAL_META_CHECKPOINT)
402        {
403            let mut selection = options.storage_profile;
404            selection.managed_backup = true;
405            options.storage_profile = selection.validate()?;
406        }
407
408        Ok(options)
409    }
410
411    pub fn enabled_transports(&self) -> Vec<ServerTransport> {
412        let mut transports = Vec::with_capacity(3);
413        if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
414            transports.push(ServerTransport::Grpc);
415        }
416        if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
417            transports.push(ServerTransport::Http);
418        }
419        if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
420            transports.push(ServerTransport::Wire);
421        }
422        transports
423    }
424}
425
426/// Read an env var, treating empty / whitespace-only as `None`.
427/// Honors the `<NAME>_FILE` convention. Re-exports the shared
428/// `crate::utils::env_with_file_fallback` helper so call sites in
429/// this module can keep their short local name.
430fn env_nonempty(name: &str) -> Option<String> {
431    crate::utils::env_with_file_fallback(name)
432}
433
434fn env_truthy(name: &str) -> bool {
435    env_nonempty(name)
436        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
437        .unwrap_or(false)
438}
439
440/// Apply a parsed [`BackupConfig`] to `options`. Wires the S3
441/// backend via `with_remote_backend` + `with_atomic_remote_backend`
442/// when the `backend-s3` feature is on, stashes intervals + backend
443/// kind in `metadata` so the runner can spawn the periodic tasks,
444/// and emits the startup INFO log required by #517.
445fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
446    let endpoint_host = endpoint_host(&cfg.endpoint);
447
448    options.metadata.insert(
449        BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
450        cfg.checkpoint_interval_secs.to_string(),
451    );
452    options.metadata.insert(
453        BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
454        cfg.wal_flush_interval_secs.to_string(),
455    );
456    options
457        .metadata
458        .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
459    options.metadata.insert(
460        BACKUP_PAUSE_ON_LAG_META.to_string(),
461        cfg.pause_on_lag_secs.to_string(),
462    );
463
464    #[cfg(feature = "backend-s3")]
465    {
466        let s3_cfg = crate::storage::backend::S3Config {
467            endpoint: cfg.endpoint.clone(),
468            bucket: cfg.bucket.clone(),
469            key_prefix: cfg.prefix.clone(),
470            access_key: cfg.access_key_id.clone(),
471            secret_key: cfg.secret_access_key.clone(),
472            region: cfg.region.clone(),
473            path_style: true,
474        };
475        let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
476        options.remote_backend = Some(backend.clone());
477        options.remote_backend_atomic = Some(backend);
478        // Use the operator-supplied prefix as the snapshot key root.
479        // The existing helpers (`default_snapshot_prefix`,
480        // `default_wal_archive_prefix`) derive sub-prefixes from the
481        // parent of `remote_key`.
482        let trimmed = cfg.prefix.trim_end_matches('/');
483        options.remote_key = Some(reddb_file::remote_database_key(trimmed));
484
485        tracing::info!(
486            backend = "s3",
487            endpoint = %endpoint_host,
488            bucket = %cfg.bucket,
489            prefix = %cfg.prefix,
490            checkpoint_interval_secs = cfg.checkpoint_interval_secs,
491            wal_flush_interval_secs = cfg.wal_flush_interval_secs,
492            "backup backend configured from REDDB_BACKUP_* env"
493        );
494    }
495
496    #[cfg(not(feature = "backend-s3"))]
497    {
498        tracing::warn!(
499            backend = "s3",
500            endpoint = %endpoint_host,
501            bucket = %cfg.bucket,
502            prefix = %cfg.prefix,
503            "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
504             backend wiring skipped (archiver/checkpointer also disabled)"
505        );
506    }
507}
508
/// Extract the `host[:port]` part of an endpoint URL for logging.
///
/// Drops the scheme (anything up to `://`), everything from the first
/// `/`, `?` or `#` onwards, and any `user[:password]@` userinfo, so
/// credentials embedded in the endpoint never reach the logs. Input
/// without a scheme is treated as starting at the authority.
fn endpoint_host(endpoint: &str) -> &str {
    let after_scheme = endpoint
        .split_once("://")
        .map_or(endpoint, |(_, rest)| rest);
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(after_scheme);
    // `host[:port]` cannot contain `@`, so splitting on the last `@`
    // strips userinfo even when a raw `@` appears in the password.
    authority
        .rsplit_once('@')
        .map_or(authority, |(_, host)| host)
}
516
517/// If `options` carry backup-task intervals threaded in via
518/// [`apply_backup_config`], spawn periodic checkpointer + WAL-flush
519/// tasks against `runtime`. Returns a `BackupTasksHandle` that
520/// stops the tasks when dropped; runners keep it alive for the
521/// server lifetime.
522fn spawn_backup_tasks_if_configured(
523    options: &RedDBOptions,
524    runtime: &RedDBRuntime,
525) -> Option<BackupTasksHandle> {
526    let checkpoint_secs: u64 = options
527        .metadata
528        .get(BACKUP_INTERVAL_META_CHECKPOINT)?
529        .parse()
530        .ok()?;
531    let wal_secs: u64 = options
532        .metadata
533        .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
534        .parse()
535        .ok()?;
536    // Issue #519 — opt-in graceful read-only when remote archive lag
537    // exceeds the threshold. `0` (default) keeps legacy behaviour.
538    let pause_on_lag_secs: u64 = options
539        .metadata
540        .get(BACKUP_PAUSE_ON_LAG_META)
541        .and_then(|raw| raw.parse().ok())
542        .unwrap_or(0);
543    options.remote_backend.as_ref()?;
544
545    let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
546
547    // Stamp the gate with the threshold + a "now" baseline so the
548    // first auto-pause can only fire after `pause_on_lag_secs` of
549    // archive silence. The poller below re-evaluates on the same
550    // clock the archive-task wrapper uses.
551    if pause_on_lag_secs > 0 {
552        let now_ms = std::time::SystemTime::now()
553            .duration_since(std::time::UNIX_EPOCH)
554            .map(|d| d.as_millis() as u64)
555            .unwrap_or(0);
556        runtime
557            .write_gate()
558            .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
559        tracing::info!(
560            pause_on_lag_secs,
561            "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
562        );
563    }
564
565    let checkpoint_handle = {
566        let stop = Arc::clone(&stop);
567        let runtime = runtime.clone();
568        let interval = Duration::from_secs(checkpoint_secs);
569        thread::Builder::new()
570            .name("red-checkpointer".into())
571            .spawn(move || {
572                periodic_loop(stop, interval, move || {
573                    if let Err(err) = runtime.checkpoint() {
574                        tracing::warn!(error = %err, "periodic checkpoint failed");
575                    }
576                })
577            })
578            .ok()
579    };
580
581    let archiver_handle = {
582        let stop = Arc::clone(&stop);
583        let runtime = runtime.clone();
584        let interval = Duration::from_secs(wal_secs);
585        let lag_enabled = pause_on_lag_secs > 0;
586        thread::Builder::new()
587            .name("red-wal-archiver".into())
588            .spawn(move || {
589                periodic_loop(stop, interval, move || match runtime.trigger_backup() {
590                    Ok(_) if lag_enabled => {
591                        let now_ms = std::time::SystemTime::now()
592                            .duration_since(std::time::UNIX_EPOCH)
593                            .map(|d| d.as_millis() as u64)
594                            .unwrap_or(0);
595                        runtime.write_gate().record_archive_success(now_ms);
596                        // Same-tick re-evaluation: catching up while
597                        // already auto-paused must auto-resume without
598                        // waiting for the poller's cadence.
599                        runtime.write_gate().evaluate_archive_lag(now_ms);
600                    }
601                    Ok(_) => {}
602                    Err(err) => {
603                        tracing::warn!(error = %err, "periodic WAL archive/backup failed");
604                    }
605                })
606            })
607            .ok()
608    };
609
610    // Issue #519 — lag poller. Wakes on a short cadence so a frozen
611    // archiver (the worst case) still flips the gate within ~5s of
612    // crossing the threshold, instead of waiting up to a full
613    // `wal_secs` for the next archive attempt that may never come.
614    let lag_monitor_handle = if pause_on_lag_secs > 0 {
615        let stop = Arc::clone(&stop);
616        let runtime = runtime.clone();
617        // 5s is short enough that the threshold is honoured tightly
618        // and long enough that the atomic loads stay invisible at the
619        // process level.
620        let interval = Duration::from_secs(5);
621        thread::Builder::new()
622            .name("red-archive-lag-monitor".into())
623            .spawn(move || {
624                periodic_loop(stop, interval, move || {
625                    let now_ms = std::time::SystemTime::now()
626                        .duration_since(std::time::UNIX_EPOCH)
627                        .map(|d| d.as_millis() as u64)
628                        .unwrap_or(0);
629                    let was_paused = runtime.write_gate().is_auto_paused();
630                    let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
631                    if now_paused && !was_paused {
632                        tracing::warn!(
633                            pause_on_lag_secs,
634                            last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
635                            "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
636                        );
637                    } else if !now_paused && was_paused {
638                        tracing::info!(
639                            "WAL archive caught up — exiting graceful read-only mode (issue #519)"
640                        );
641                    }
642                })
643            })
644            .ok()
645    } else {
646        None
647    };
648
649    tracing::info!(
650        checkpoint_interval_secs = checkpoint_secs,
651        wal_flush_interval_secs = wal_secs,
652        "backup tasks spawned (checkpointer + WAL archiver)"
653    );
654
655    Some(BackupTasksHandle {
656        stop,
657        _checkpoint_handle: checkpoint_handle,
658        _archiver_handle: archiver_handle,
659        _lag_monitor_handle: lag_monitor_handle,
660    })
661}
662
/// Shutdown handle for the periodic backup threads: the checkpointer,
/// the WAL archiver and, when enabled, the archive-lag monitor.
///
/// Dropping it sets the shared stop flag, and each loop exits the next
/// time it checks that flag. The threads are not joined, so drop does not
/// wait for an in-flight checkpoint or archive to finish.
pub struct BackupTasksHandle {
    /// Shared stop flag observed by every backup loop.
    stop: Arc<std::sync::atomic::AtomicBool>,
    /// Periodic checkpointer thread (`None` if it failed to spawn).
    _checkpoint_handle: Option<thread::JoinHandle<()>>,
    /// Periodic WAL archiver thread (`None` if it failed to spawn).
    _archiver_handle: Option<thread::JoinHandle<()>>,
    /// Issue #519 — periodic archive-lag poller, only spawned when
    /// `REDDB_BACKUP_PAUSE_ON_LAG_SECS > 0`.
    _lag_monitor_handle: Option<thread::JoinHandle<()>>,
}

impl Drop for BackupTasksHandle {
    fn drop(&mut self) {
        // Release pairs with the Acquire load in the loops' stop check.
        self.stop.store(true, std::sync::atomic::Ordering::Release);
    }
}
679
/// Run `tick` every `interval` until `stop` is set.
///
/// The loop wakes once per second so shutdown stays responsive even for
/// long intervals (e.g. a 1h checkpoint). As a consequence:
/// * the effective period is rounded up to whole seconds;
/// * a zero `interval` ticks once per second;
/// * time spent inside `tick` is not counted.
///
/// `stop` is re-checked after every sleep, so no new tick starts once
/// shutdown has been requested.
fn periodic_loop<F: FnMut()>(
    stop: Arc<std::sync::atomic::AtomicBool>,
    interval: Duration,
    mut tick: F,
) {
    let wake = Duration::from_secs(1);
    let mut elapsed = Duration::ZERO;
    let stopped = || stop.load(std::sync::atomic::Ordering::Acquire);
    while !stopped() {
        thread::sleep(wake);
        // Shutdown may have been requested while we slept; don't start
        // a checkpoint/archive against a runtime that is going away.
        if stopped() {
            break;
        }
        elapsed += wake;
        if elapsed >= interval {
            tick();
            elapsed = Duration::ZERO;
        }
    }
}
698
699fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
700    // PLAN.md (cloud-agnostic) — prefer the new spelling
701    // `RED_BACKEND`; accept the legacy `REDDB_REMOTE_BACKEND` for
702    // existing dev installs. `none` (default) means standalone — no
703    // remote backend, valid for development and on-prem without
704    // remote.
705    let backend = env_nonempty("RED_BACKEND")
706        .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
707        .unwrap_or_else(|| "none".to_string())
708        .to_ascii_lowercase();
709
710    match backend.as_str() {
711        // Universal S3-compatible — covers AWS, R2, MinIO, Ceph,
712        // GCS-interop, B2, DO Spaces, Wasabi, Garage, SeaweedFS,
713        // IDrive, Storj. The `path_style` toggle in S3Config picks
714        // the right addressing for self-hosted vs hosted.
715        "s3" | "minio" | "r2" => {
716            #[cfg(feature = "backend-s3")]
717            {
718                if let Some(config) = s3_config_from_env() {
719                    let remote_key = env_nonempty("RED_REMOTE_KEY")
720                        .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
721                        .unwrap_or_else(|| reddb_file::remote_database_key("clusters/dev"));
722                    let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
723                    options.remote_backend = Some(backend.clone());
724                    options.remote_backend_atomic = Some(backend);
725                    options.remote_key = Some(remote_key);
726                }
727            }
728            #[cfg(not(feature = "backend-s3"))]
729            {
730                tracing::warn!(
731                    backend = %backend,
732                    "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
733                );
734            }
735        }
736        // Filesystem backend (NFS/EFS/SMB/local-disk). PLAN.md spec
737        // calls it `fs`; legacy code shipped it as `local`. Both
738        // names map to LocalBackend, with the remote_key derived
739        // from `RED_FS_PATH` + a per-database suffix when provided.
740        "fs" | "local" => {
741            let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
742            let remote_key = match (
743                base_path,
744                env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
745            ) {
746                (Some(base), Some(rel)) => Some(format!(
747                    "{}/{}",
748                    base.trim_end_matches('/'),
749                    rel.trim_start_matches('/')
750                )),
751                (Some(base), None) => Some(reddb_file::remote_database_key(&format!(
752                    "{}/clusters/dev",
753                    base.trim_end_matches('/')
754                ))),
755                (None, Some(rel)) => Some(rel),
756                (None, None) => None,
757            };
758            if let Some(key) = remote_key {
759                let backend = Arc::new(crate::storage::backend::LocalBackend);
760                options.remote_backend = Some(backend.clone());
761                options.remote_backend_atomic = Some(backend);
762                options.remote_key = Some(key);
763            }
764        }
765        // Generic HTTP backend (PLAN.md Phase 2.3). Maximum
766        // portability: any service exposing PUT/GET/DELETE serves
767        // as a backup target. Optional auth via *_FILE secret
768        // path keeps the token out of the env.
769        "http" => {
770            let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
771                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
772            {
773                Some(u) => u,
774                None => {
775                    tracing::warn!(
776                        "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
777                    );
778                    return;
779                }
780            };
781            let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
782                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
783                .unwrap_or_default();
784            let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
785                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
786            {
787                std::fs::read_to_string(&path)
788                    .ok()
789                    .map(|s| s.trim().to_string())
790                    .filter(|s| !s.is_empty())
791            } else {
792                env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
793                    .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
794            };
795
796            let mut config =
797                crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
798            if let Some(auth) = auth_header {
799                config = config.with_auth_header(auth);
800            }
801            let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
802                || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
803                || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
804            config = config.with_conditional_writes(conditional_writes);
805            // Always populate the snapshot-transport handle. When the
806            // operator confirmed CAS support, also populate the atomic
807            // handle via AtomicHttpBackend — without that flag,
808            // LeaseStore must remain unreachable on this backend.
809            if conditional_writes {
810                match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
811                    Ok(atomic) => {
812                        let atomic_arc = Arc::new(atomic);
813                        options.remote_backend = Some(atomic_arc.clone());
814                        options.remote_backend_atomic = Some(atomic_arc);
815                    }
816                    Err(err) => {
817                        tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
818                        options.remote_backend =
819                            Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
820                    }
821                }
822            } else {
823                options.remote_backend =
824                    Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
825            }
826            options.remote_key = env_nonempty("RED_REMOTE_KEY")
827                .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
828                .or_else(|| Some(reddb_file::remote_database_key("clusters/dev")));
829        }
830        // `none` is the explicit standalone — no remote, no backup
831        // pipeline. Boot never blocks on network reachability.
832        "none" | "" => {}
833        other => {
834            tracing::warn!(
835                backend = %other,
836                "unknown RED_BACKEND value — supported: s3 | fs | http | none"
837            );
838        }
839    }
840}
841
/// Look up an S3 setting by `suffix`, trying the cloud-agnostic
/// `RED_S3_<suffix>` spelling (PLAN.md spec) first and falling back to the
/// legacy `REDDB_S3_<suffix>` spelling kept for existing dev installs.
///
/// Returns `None` when neither variable yields a value via `env_nonempty`.
/// The legacy name is only consulted when the new one is absent.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    ["RED_S3_", "REDDB_S3_"]
        .iter()
        .find_map(|prefix| env_nonempty(&format!("{prefix}{suffix}")))
}
851
/// Read an S3 secret from a file named by a `*_FILE` env var, or from the
/// literal env var (PLAN.md spec — compatible with Kubernetes / Docker
/// Secrets, HashiCorp Vault Agent, sealed-secrets).
///
/// Resolution order:
/// 1. `RED_S3_<suffix>_FILE`, then `REDDB_S3_<suffix>_FILE`: when either is
///    set, the `_FILE` variant wins outright so an operator can override the
///    inline value without touching it. The file contents are trimmed. An
///    unreadable or empty file yields `None` (no fallback to the inline
///    value) and logs a warning, so a broken secret mount is visible instead
///    of silently leaving the backend unconfigured.
/// 2. Otherwise the inline value from `env_s3(suffix)`.
///
/// The secret value itself is never logged.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let file_key_red = format!("RED_S3_{suffix}_FILE");
    let file_key_legacy = format!("REDDB_S3_{suffix}_FILE");
    let path = match env_nonempty(&file_key_red).or_else(|| env_nonempty(&file_key_legacy)) {
        Some(path) => path,
        None => return env_s3(suffix),
    };

    match std::fs::read_to_string(&path) {
        Ok(contents) => {
            let secret = contents.trim();
            if secret.is_empty() {
                tracing::warn!(
                    secret = suffix,
                    path = %path,
                    "S3 secret file is empty; secret not configured"
                );
                None
            } else {
                Some(secret.to_string())
            }
        }
        Err(err) => {
            tracing::warn!(
                secret = suffix,
                path = %path,
                error = %err,
                "failed to read S3 secret file; secret not configured"
            );
            None
        }
    }
}
869
/// Assemble an S3 backend config from `RED_S3_*` / `REDDB_S3_*` env vars.
///
/// Required, in this order (the first missing one makes this return `None`):
/// `ENDPOINT`, `BUCKET`, `ACCESS_KEY`, `SECRET_KEY`. The two keys may also
/// come from `*_FILE` companions via `env_s3_secret`.
///
/// Optional settings:
/// - `REGION`: defaults to `us-east-1`.
/// - `KEY_PREFIX`, falling back to `PREFIX`: defaults to empty.
/// - `PATH_STYLE`: defaults to `true` when unset. When set, only
///   `1`/`true`/`yes`/`on` (case-insensitive) count as true.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    let endpoint = env_s3("ENDPOINT")?;
    let bucket = env_s3("BUCKET")?;
    let access_key = env_s3_secret("ACCESS_KEY")?;
    let secret_key = env_s3_secret("SECRET_KEY")?;

    let region = match env_s3("REGION") {
        Some(region) => region,
        None => String::from("us-east-1"),
    };
    let key_prefix = ["KEY_PREFIX", "PREFIX"]
        .iter()
        .find_map(|name| env_s3(name))
        .unwrap_or_default();
    let path_style = match env_s3("PATH_STYLE") {
        Some(raw) => matches!(raw.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"),
        None => true,
    };

    Some(crate::storage::backend::S3Config {
        endpoint,
        bucket,
        key_prefix,
        access_key,
        secret_key,
        region,
        path_style,
    })
}
893
894pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
895    let data_dir = config.data_dir();
896    let exec_start = render_systemd_exec_start(config);
897    format!(
898        "[Unit]\n\
899Description=RedDB unified database service\n\
900After=network-online.target\n\
901Wants=network-online.target\n\
902\n\
903[Service]\n\
904Type=simple\n\
905User={user}\n\
906Group={group}\n\
907WorkingDirectory={workdir}\n\
908ExecStart={exec_start}\n\
909Restart=always\n\
910RestartSec=2\n\
911LimitSTACK=16M\n\
912NoNewPrivileges=true\n\
913PrivateTmp=true\n\
914ProtectSystem=strict\n\
915ProtectHome=true\n\
916ProtectControlGroups=true\n\
917ProtectKernelTunables=true\n\
918ProtectKernelModules=true\n\
919RestrictNamespaces=true\n\
920LockPersonality=true\n\
921MemoryDenyWriteExecute=true\n\
922ReadWritePaths={workdir}\n\
923\n\
924[Install]\n\
925WantedBy=multi-user.target\n",
926        user = config.run_user,
927        group = config.run_group,
928        workdir = data_dir.display(),
929        exec_start = exec_start,
930    )
931}
932
933/// Install a systemd unit + start the service.
934///
935/// Linux-only. The helper shells out to `systemctl`, `useradd`,
936/// `groupadd`, `install`, `getent`, and `id` — none of which exist on
937/// Windows or macOS. The Windows/macOS branch returns a hard error so
938/// callers (the CLI) surface a clear message instead of a confusing
939/// syscall failure. A proper Windows-service equivalent (sc.exe /
940/// NSSM) is a Phase 3.6 follow-up.
941#[cfg(target_os = "linux")]
942pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
943    ensure_root()?;
944    ensure_command_available("systemctl")?;
945    ensure_command_available("getent")?;
946    ensure_command_available("groupadd")?;
947    ensure_command_available("useradd")?;
948    ensure_command_available("install")?;
949    ensure_executable(&config.binary_path)?;
950
951    if !command_success("getent", ["group", config.run_group.as_str()])? {
952        run_command("groupadd", ["--system", config.run_group.as_str()])?;
953    }
954
955    if !command_success("id", ["-u", config.run_user.as_str()])? {
956        let data_dir = config.data_dir();
957        run_command(
958            "useradd",
959            [
960                "--system",
961                "--gid",
962                config.run_group.as_str(),
963                "--home-dir",
964                data_dir.to_string_lossy().as_ref(),
965                "--shell",
966                "/usr/sbin/nologin",
967                config.run_user.as_str(),
968            ],
969        )?;
970    }
971
972    let data_dir = config.data_dir();
973    run_command(
974        "install",
975        [
976            "-d",
977            "-o",
978            config.run_user.as_str(),
979            "-g",
980            config.run_group.as_str(),
981            "-m",
982            "0750",
983            data_dir.to_string_lossy().as_ref(),
984        ],
985    )?;
986
987    std::fs::write(config.unit_path(), render_systemd_unit(config))
988        .map_err(|err| format!("failed to write systemd unit: {err}"))?;
989
990    run_command("systemctl", ["daemon-reload"])?;
991    run_command(
992        "systemctl",
993        [
994            "enable",
995            "--now",
996            format!("{}.service", config.service_name).as_str(),
997        ],
998    )?;
999
1000    Ok(())
1001}
1002
/// Stub of `install_systemd_service` for platforms without systemd.
///
/// It has the same signature as the Linux version so callers compile
/// everywhere, but always returns an error pointing at the manual route.
/// Native service-manager integration (sc.exe + NSSM on Windows, launchd on
/// macOS) is a Phase 3.6 follow-up.
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    let message = concat!(
        "systemd install is Linux-only — use sc.exe (Windows) or ",
        "launchd (macOS) to install the service manually using the ",
        "unit printed by `red service print-unit`"
    );
    Err(String::from(message))
}
1014
/// Succeed only when the effective uid reported by `id -u` is `0`.
///
/// Errors when `id` cannot be spawned, exits unsuccessfully, or reports a
/// non-zero uid.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let probe = Command::new("id")
        .arg("-u")
        .output()
        .map_err(|err| format!("failed to determine current uid: {err}"))?;
    if !probe.status.success() {
        return Err("failed to determine current uid".to_string());
    }
    match String::from_utf8_lossy(&probe.stdout).trim() {
        "0" => Ok(()),
        _ => Err("run this command as root (sudo)".to_string()),
    }
}
1030
/// Verify that `command` can be executed by `Command::new(command)`.
///
/// The lookup is done in-process, mirroring the PATH search a later spawn
/// relies on:
/// - a name containing `/` is checked directly as a path;
/// - otherwise each entry of this process's `PATH` is searched in order.
///
/// The previous implementation ran `sh -lc "command -v <name>"`. That
/// approach interpolated the name into a shell string and sourced login
/// profiles, which can rewrite `PATH`. As a result the check could succeed
/// for a tool that the subsequent `Command::new` call could not find.
///
/// A candidate counts only if it is a regular file with at least one execute
/// bit set.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    /// True when `candidate` is a regular file with any execute bit set.
    fn is_executable_file(candidate: &std::path::Path) -> bool {
        use std::os::unix::fs::PermissionsExt;
        std::fs::metadata(candidate)
            .map(|meta| meta.is_file() && meta.permissions().mode() & 0o111 != 0)
            .unwrap_or(false)
    }

    let found = if command.is_empty() {
        false
    } else if command.contains('/') {
        is_executable_file(std::path::Path::new(command))
    } else {
        // Conventional default search path when PATH is unset.
        let search_path = std::env::var_os("PATH")
            .unwrap_or_else(|| std::ffi::OsString::from("/bin:/usr/bin"));
        std::env::split_paths(&search_path).any(|dir| is_executable_file(&dir.join(command)))
    };

    if found {
        Ok(())
    } else {
        Err(format!("required command not found: {command}"))
    }
}
1043
/// Check that `path` names an existing regular file with an execute bit set.
///
/// # Errors
/// - `binary not found '<path>': <io error>`: metadata cannot be read.
/// - `binary is not a file: <path>`: the path is a directory or other
///   non-regular file. Previously a directory passed, because directories
///   normally carry execute bits.
/// - `binary is not executable: <path>`: no user/group/other execute bit.
///
/// Only compiled for Linux, so the Unix permission check is always
/// available. The former `cfg(not(unix))` branch was dead code.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    use std::os::unix::fs::PermissionsExt;

    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    if metadata.permissions().mode() & 0o111 == 0 {
        return Err(format!("binary is not executable: {}", path.display()));
    }
    Ok(())
}
1063
/// Run `program` with `args` and report whether it exited successfully.
///
/// `Err` is returned only when the process cannot be spawned. A non-zero
/// exit is `Ok(false)`.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    let exit = Command::new(program)
        .args(args)
        .status()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    Ok(exit.success())
}
1072
/// Run `program` with `args`, turning any failure into a descriptive error.
///
/// On a non-zero exit the error is `"<program> failed: <detail>"`. The
/// detail is the trimmed stderr if non-empty, else the trimmed stdout if
/// non-empty, else the exit status. A spawn failure yields
/// `"failed to run <program>: <io error>"`.
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let captured = [&output.stderr, &output.stdout]
        .into_iter()
        .map(|bytes| String::from_utf8_lossy(bytes).trim().to_string())
        .find(|text| !text.is_empty());
    let detail = captured.unwrap_or_else(|| format!("exit status {}", output.status));
    Err(format!("{program} failed: {detail}"))
}
1094
1095pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1096    let has_any = config.router_bind_addr.is_some()
1097        || config.grpc_bind_addr.is_some()
1098        || config.http_bind_addr.is_some()
1099        || config.wire_bind_addr.is_some()
1100        || config.pg_bind_addr.is_some();
1101    if !has_any {
1102        return Err("at least one server bind address must be configured".into());
1103    }
1104    let thread_name = if config.router_bind_addr.is_some() {
1105        "red-server-router"
1106    } else {
1107        match (
1108            config.grpc_bind_addr.is_some(),
1109            config.http_bind_addr.is_some(),
1110        ) {
1111            (true, true) => "red-server-dual",
1112            (true, false) => "red-server-grpc",
1113            (false, true) => "red-server-http",
1114            (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1115            (false, false) => "red-server-pg-wire",
1116        }
1117    };
1118
1119    let handle = thread::Builder::new()
1120        .name(thread_name.into())
1121        .stack_size(8 * 1024 * 1024)
1122        .spawn(move || run_configured_servers(config))
1123        .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1124
1125    match handle.join() {
1126        Ok(result) => result,
1127        Err(_) => Err("server thread panicked".to_string()),
1128    }
1129}
1130
1131fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1132    let mut parts = vec![
1133        config.binary_path.display().to_string(),
1134        "server".to_string(),
1135        "--path".to_string(),
1136        config.data_path.display().to_string(),
1137    ];
1138
1139    if let Some(bind_addr) = &config.router_bind_addr {
1140        parts.push("--bind".to_string());
1141        parts.push(bind_addr.clone());
1142    } else if let Some(bind_addr) = &config.grpc_bind_addr {
1143        parts.push("--grpc-bind".to_string());
1144        parts.push(bind_addr.clone());
1145    }
1146    if let Some(bind_addr) = &config.http_bind_addr {
1147        parts.push("--http-bind".to_string());
1148        parts.push(bind_addr.clone());
1149    }
1150
1151    parts.join(" ")
1152}
1153
/// Return `true` if any address that `target` resolves to accepts a TCP
/// connection within `timeout`.
///
/// Resolution failures count as "not listening". Addresses are tried in
/// resolution order, and the probe stops at the first successful connect.
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    match target.to_socket_addrs() {
        Ok(mut candidates) => {
            candidates.any(|candidate| TcpStream::connect_timeout(&candidate, timeout).is_ok())
        }
        Err(_) => false,
    }
}
1164
1165#[inline(never)]
1166fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1167    // Phase 6 logging is initialised inside each runner once the
1168    // runtime is open — see `build_runtime_and_auth_store`. Going
1169    // after DB open lets us read `red.logging.*` config keys out of
1170    // the persistent red_config store and merge them with the CLI
1171    // flags (flag > red_config > built-in default).
1172    if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1173        return run_routed_server(config, router_bind_addr);
1174    }
1175
1176    match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1177        (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1178            run_dual_server(config, grpc_bind_addr, http_bind_addr)
1179        }
1180        (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1181        (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1182        (None, None) => {
1183            if let Some(wire_addr) = config.wire_bind_addr.clone() {
1184                run_wire_only_server(config, wire_addr)
1185            } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1186                run_pg_only_server(config, pg_addr)
1187            } else {
1188                Err("at least one server bind address must be configured".to_string())
1189            }
1190        }
1191    }
1192}
1193
1194/// Bind a TCP listener for a transport at startup and record the
1195/// outcome in the shared [`TransportReadiness`] state.
1196///
1197/// The split between *explicit* and *implicit/default* binds is the
1198/// contract from issue #545:
1199///
1200/// * `explicit == true` — the operator named this transport on the
1201///   CLI / env / config. A failed bind is fatal: this returns `Err`
1202///   and the boot path must propagate the error so the process exits
1203///   non-zero with the recorded `reason`.
1204/// * `explicit == false` — this is a default listener the server
1205///   would have spun up regardless. A failed bind degrades: this
1206///   returns `Ok(None)` (no listener) but the failure is still
1207///   recorded in `readiness.failed`, so the server keeps running and
1208///   the next `/health` probe enumerates the degraded listener.
1209///
1210/// On success the bound listener lands in `readiness.active`.
1211pub fn bind_listener_for_startup(
1212    readiness: &mut TransportReadiness,
1213    transport: &str,
1214    bind_addr: &str,
1215    explicit: bool,
1216) -> Result<Option<TcpListener>, String> {
1217    match TcpListener::bind(bind_addr) {
1218        Ok(listener) => {
1219            readiness.active(transport, bind_addr, explicit);
1220            Ok(Some(listener))
1221        }
1222        Err(err) => {
1223            let reason = format!("{transport} listener bind {bind_addr}: {err}");
1224            readiness.failed(transport, bind_addr, explicit, reason.clone());
1225            if explicit {
1226                tracing::error!(
1227                    transport,
1228                    bind = %bind_addr,
1229                    error = %err,
1230                    "fatal explicit bind failure"
1231                );
1232                Err(format!("explicit {reason}"))
1233            } else {
1234                tracing::warn!(
1235                    transport,
1236                    bind = %bind_addr,
1237                    error = %err,
1238                    "non-fatal implicit bind failure; listener degraded"
1239                );
1240                Ok(None)
1241            }
1242        }
1243    }
1244}
1245
1246/// Wire SIGTERM and SIGINT to `RedDBRuntime::graceful_shutdown`.
1247///
1248/// PLAN.md Phase 1.1 — orchestrators (K8s preStop, Fly autostop, ECS
1249/// drain, systemd, plain `docker stop`) all rely on SIGTERM with a
1250/// grace window. SIGKILL after that grace window is the OS's
1251/// fallback; we are responsible for finishing in time.
1252///
1253/// Spawns a tokio task on the caller's runtime that:
1254///   1. Awaits the first of SIGTERM / SIGINT.
1255///   2. Calls `runtime.graceful_shutdown(backup_on_shutdown)`. The
1256///      runtime moves to `Stopped` on its own; this just runs the
1257///      flush + checkpoint pipeline and (optionally) a final backup.
1258///   3. Calls `std::process::exit(0)` so the orchestrator sees a
1259///      clean exit code.
1260///
1261/// `RED_BACKUP_ON_SHUTDOWN` (default `true` if a remote backend is
1262/// configured) toggles step 3's backup branch. The flush + checkpoint
1263/// always run.
1264///
1265/// Idempotent across signal storms — `graceful_shutdown` returns the
1266/// cached report on second call, but we exit on the first one
1267/// regardless, so the second SIGTERM never reaches the handler.
1268async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
1269    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
1270        .ok()
1271        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1272        .unwrap_or(true);
1273
1274    #[cfg(unix)]
1275    {
1276        use tokio::signal::unix::{signal, SignalKind};
1277
1278        let mut sigterm = match signal(SignalKind::terminate()) {
1279            Ok(s) => s,
1280            Err(err) => {
1281                tracing::warn!(
1282                    error = %err,
1283                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
1284                );
1285                return;
1286            }
1287        };
1288        let mut sigint = match signal(SignalKind::interrupt()) {
1289            Ok(s) => s,
1290            Err(err) => {
1291                tracing::warn!(error = %err, "could not install SIGINT handler");
1292                return;
1293            }
1294        };
1295        // PLAN.md Phase 6.4 — SIGHUP triggers a reload of secrets from
1296        // their `_FILE` companions without restarting the process.
1297        // Useful for credential rotation pipelines (kubectl create
1298        // secret + kubectl rollout restart, but for systemd / Nomad /
1299        // bare-metal where rolling the process is heavier).
1300        let mut sighup = match signal(SignalKind::hangup()) {
1301            Ok(s) => Some(s),
1302            Err(err) => {
1303                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
1304                None
1305            }
1306        };
1307
1308        let reload_runtime = runtime.clone();
1309        tokio::spawn(async move {
1310            loop {
1311                let signal_name = match &mut sighup {
1312                    Some(hup) => tokio::select! {
1313                        _ = sigterm.recv() => "SIGTERM",
1314                        _ = sigint.recv() => "SIGINT",
1315                        _ = hup.recv() => "SIGHUP",
1316                    },
1317                    None => tokio::select! {
1318                        _ = sigterm.recv() => "SIGTERM",
1319                        _ = sigint.recv() => "SIGINT",
1320                    },
1321                };
1322
1323                if signal_name == "SIGHUP" {
1324                    handle_sighup_reload(&reload_runtime);
1325                    continue; // stay alive; SIGHUP isn't a shutdown
1326                }
1327
1328                tracing::info!(
1329                    signal = signal_name,
1330                    "lifecycle signal received; shutting down"
1331                );
1332                match runtime.graceful_shutdown(backup_on_shutdown) {
1333                    Ok(report) => {
1334                        tracing::info!(
1335                            duration_ms = report.duration_ms,
1336                            flushed_wal = report.flushed_wal,
1337                            final_checkpoint = report.final_checkpoint,
1338                            backup_uploaded = report.backup_uploaded,
1339                            "graceful shutdown complete"
1340                        );
1341                    }
1342                    Err(err) => {
1343                        tracing::error!(error = %err, "graceful shutdown failed");
1344                        // Issue #205 — graceful shutdown returning Err
1345                        // means the runtime is exiting without a clean
1346                        // flush/checkpoint. Operator-grade event so the
1347                        // operator notices the dirty exit even when the
1348                        // process restarts before they read tracing logs.
1349                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
1350                            reason: format!("graceful shutdown failed: {err}"),
1351                        }
1352                        .emit_global();
1353                    }
1354                }
1355                std::process::exit(0);
1356            }
1357        });
1358    }
1359
1360    #[cfg(not(unix))]
1361    {
1362        tokio::spawn(async move {
1363            let interrupted = tokio::signal::ctrl_c().await;
1364            if let Err(err) = interrupted {
1365                tracing::warn!(error = %err, "could not install Ctrl+C handler");
1366                return;
1367            }
1368
1369            tracing::info!(
1370                signal = "Ctrl+C",
1371                "lifecycle signal received; shutting down"
1372            );
1373            match runtime.graceful_shutdown(backup_on_shutdown) {
1374                Ok(report) => {
1375                    tracing::info!(
1376                        duration_ms = report.duration_ms,
1377                        flushed_wal = report.flushed_wal,
1378                        final_checkpoint = report.final_checkpoint,
1379                        backup_uploaded = report.backup_uploaded,
1380                        "graceful shutdown complete"
1381                    );
1382                }
1383                Err(err) => {
1384                    tracing::error!(error = %err, "graceful shutdown failed");
1385                }
1386            }
1387            std::process::exit(0);
1388        });
1389    }
1390}
1391
1392/// PLAN.md Phase 6.4 — re-read secrets from `*_FILE` companion env
1393/// vars. Today this only refreshes the audit log + records the
1394/// reload event; the runtime modules that hold cached secret
1395/// material (S3 backend credentials, admin token, JWT keys) read
1396/// the env on each request so the next call after SIGHUP picks up
1397/// the new file contents automatically. A future extension can
1398/// punch through to the LeaseStore / AuthStore for in-memory
1399/// caches that don't re-read on each call.
1400fn handle_sighup_reload(runtime: &RedDBRuntime) {
1401    let now_ms = std::time::SystemTime::now()
1402        .duration_since(std::time::UNIX_EPOCH)
1403        .map(|d| d.as_millis() as u64)
1404        .unwrap_or(0);
1405    tracing::info!(
1406        target: "reddb::secrets",
1407        ts_unix_ms = now_ms,
1408        "SIGHUP received; secrets will be re-read from *_FILE on next access"
1409    );
1410    // Routed through AuditFieldEscaper (ADR 0010 / issue #177) so
1411    // every emission goes through the typed-field guard. The
1412    // arguments here are static, but using the typed entry point
1413    // keeps the discipline uniform across call sites.
1414    use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1415    runtime.audit_log().record_event(
1416        AuditEvent::builder("config/sighup_reload")
1417            .source(AuditAuthSource::System)
1418            .resource("secrets")
1419            .outcome(Outcome::Success)
1420            .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1421            .build(),
1422    );
1423}
1424
1425#[inline(never)]
1426fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
1427    let workers = config.workers;
1428    let cli_telemetry = config.telemetry.clone();
1429    let db_options = config.to_db_options()?;
1430    let rt_config = detect_runtime_config();
1431    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1432    let (runtime, auth_store, _telemetry_guard) =
1433        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1434    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1435
1436    spawn_admin_metrics_listeners(&runtime, &auth_store);
1437
1438    // Issue #933: collapse the loopback proxy. All three transports are
1439    // served from one in-process acceptor that shares the single tokio
1440    // runtime (ADR 0035) — no internal HTTP/gRPC/wire listeners, no
1441    // `copy_bidirectional` hop. We build the handler objects here and hand
1442    // them to the demux, which classifies each connection and dispatches.
1443    let http_server = build_http_server(
1444        runtime.clone(),
1445        auth_store.clone(),
1446        router_bind_addr.clone(),
1447    );
1448    let http_server = apply_http_limits(http_server, &config, &runtime);
1449    let http_server = apply_ui_bundle(http_server, &config)?;
1450
1451    let grpc_server = RedDBGrpcServer::with_options(
1452        runtime.clone(),
1453        GrpcServerOptions {
1454            bind_addr: router_bind_addr.clone(),
1455            tls: None,
1456        },
1457        auth_store,
1458    );
1459
1460    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1461        .enable_all()
1462        .worker_threads(worker_threads)
1463        .thread_stack_size(rt_config.stack_size)
1464        .build()
1465        .map_err(|err| format!("tokio runtime: {err}"))?;
1466
1467    let signal_runtime = runtime.clone();
1468    let wire_runtime = Arc::new(runtime);
1469    tokio_runtime.block_on(async move {
1470        spawn_lifecycle_signal_handler(signal_runtime).await;
1471        tracing::info!(
1472            bind = %router_bind_addr,
1473            cpus = rt_config.available_cpus,
1474            workers = worker_threads,
1475            "router bootstrapping"
1476        );
1477        serve_tcp_router(InProcessRouterConfig {
1478            bind_addr: router_bind_addr,
1479            http_server,
1480            grpc_server,
1481            wire_runtime,
1482        })
1483        .await
1484        .map_err(|err| err.to_string())
1485    })
1486}
1487
1488/// Spawn RedWire listeners (plaintext + TLS) as background tokio tasks.
1489async fn spawn_wire_listeners(
1490    config: &ServerCommandConfig,
1491    runtime: &RedDBRuntime,
1492    readiness: &mut TransportReadiness,
1493) -> Result<(), String> {
1494    // Plaintext RedWire — TCP or Unix socket
1495    if let Some(wire_addr) = config.wire_bind_addr.clone() {
1496        let wire_rt = Arc::new(runtime.clone());
1497        // Address starting with `unix://` or an absolute filesystem path
1498        // switches to Unix domain sockets.
1499        #[cfg(unix)]
1500        {
1501            if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1502                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1503                tokio::spawn(async move {
1504                    if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1505                        &wire_addr, wire_rt,
1506                    )
1507                    .await
1508                    {
1509                        tracing::error!(err = %e, "redwire unix listener error");
1510                    }
1511                });
1512                return Ok(());
1513            }
1514        }
1515        match tokio::net::TcpListener::bind(&wire_addr).await {
1516            Ok(listener) => {
1517                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1518                tokio::spawn(async move {
1519                    if let Err(e) =
1520                        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1521                            .await
1522                    {
1523                        tracing::error!(err = %e, "redwire listener error");
1524                    }
1525                });
1526            }
1527            Err(err) => {
1528                let reason = format!("wire listener bind {wire_addr}: {err}");
1529                readiness.failed(
1530                    "wire",
1531                    &wire_addr,
1532                    config.wire_bind_explicit,
1533                    reason.clone(),
1534                );
1535                if config.wire_bind_explicit {
1536                    tracing::error!(
1537                        transport = "wire",
1538                        bind = %wire_addr,
1539                        error = %err,
1540                        "fatal explicit bind failure"
1541                    );
1542                    return Err(format!("explicit {reason}"));
1543                }
1544                tracing::warn!(
1545                    transport = "wire",
1546                    bind = %wire_addr,
1547                    error = %err,
1548                    "non-fatal implicit bind failure; listener degraded"
1549                );
1550            }
1551        }
1552    }
1553
1554    // RedWire over TLS
1555    if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1556        let tls_config = resolve_wire_tls_config(config);
1557        match tls_config {
1558            Ok(tls_cfg) => {
1559                let wire_rt = Arc::new(runtime.clone());
1560                tokio::spawn(async move {
1561                    if let Err(e) =
1562                        crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1563                            .await
1564                    {
1565                        tracing::error!(err = %e, "redwire+tls listener error");
1566                    }
1567                });
1568            }
1569            Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1570        }
1571    }
1572    Ok(())
1573}
1574
1575/// Spawn the PostgreSQL wire-protocol listener (Phase 3.1 PG parity).
1576///
1577/// Only runs when `--pg-bind` is supplied. Uses the v3 protocol so
1578/// psql, JDBC drivers, DBeaver, etc. can connect directly. Runs
1579/// alongside the native wire listener; the two transports do not
1580/// share a port.
1581fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1582    if let Some(pg_addr) = config.pg_bind_addr.clone() {
1583        let rt = Arc::new(runtime.clone());
1584        tokio::spawn(async move {
1585            let cfg = crate::wire::PgWireConfig {
1586                bind_addr: pg_addr,
1587                ..Default::default()
1588            };
1589            if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1590                tracing::error!(err = %e, "pg wire listener error");
1591            }
1592        });
1593    }
1594}
1595
1596/// Resolve gRPC TLS material into PEM bytes.
1597///
1598/// Lookup order, in priority:
1599///   1. Explicit `config.grpc_tls_cert` / `config.grpc_tls_key` (paths
1600///      passed via CLI/env). Both must be present together.
1601///   2. `RED_GRPC_TLS_DEV=1` — auto-generate a self-signed cert next
1602///      to the data dir. Refuses to run without the env flag so an
1603///      operator can't accidentally ship a dev cert in prod.
1604///
1605/// `client_ca` is loaded when `config.grpc_tls_client_ca` is set,
1606/// turning the listener into a mutual-TLS endpoint that requires
1607/// every client to present a chain that anchors at one of the CAs
1608/// in the bundle.
1609fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1610    use crate::utils::secret_file::expand_file_env;
1611
1612    // Best-effort *_FILE expansion for every TLS env knob. Errors here
1613    // surface as warnings; the fallback path (explicit cert paths) will
1614    // cover the common case.
1615    for var in [
1616        "REDDB_GRPC_TLS_CERT",
1617        "REDDB_GRPC_TLS_KEY",
1618        "REDDB_GRPC_TLS_CLIENT_CA",
1619    ] {
1620        if let Err(err) = expand_file_env(var) {
1621            tracing::warn!(
1622                target: "reddb::secrets",
1623                env = %var,
1624                err = %err,
1625                "could not expand *_FILE companion for gRPC TLS"
1626            );
1627        }
1628    }
1629
1630    let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1631        (Some(cert), Some(key)) => {
1632            let cert_pem = std::fs::read(cert)
1633                .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1634            let key_pem =
1635                std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1636            (cert_pem, key_pem)
1637        }
1638        _ => {
1639            // No explicit material → only proceed when dev-mode is on.
1640            let dev = std::env::var("RED_GRPC_TLS_DEV")
1641                .ok()
1642                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1643                .unwrap_or(false);
1644            if !dev {
1645                return Err("gRPC TLS configured but no cert/key supplied — set \
1646                     REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1647                     RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1648                    .to_string());
1649            }
1650            let dir = config
1651                .path
1652                .as_ref()
1653                .and_then(|p| p.parent())
1654                .map(PathBuf::from)
1655                .unwrap_or_else(|| PathBuf::from("."));
1656            let (cert_pem_str, key_pem_str) =
1657                crate::wire::tls::generate_self_signed_cert("localhost")
1658                    .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1659
1660            // Constant-time-friendly fingerprint to stderr so the
1661            // operator can pin a client trust store. We log via
1662            // `tracing::warn!` so it stands out next to ordinary
1663            // listener-online events.
1664            let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1665            tracing::warn!(
1666                target: "reddb::security",
1667                transport = "grpc",
1668                cert_sha256 = %fp,
1669                "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1670                 DO NOT use in production"
1671            );
1672            // Persist so that restarts reuse the same identity.
1673            let cert_path = dir.join("grpc-tls-cert.pem");
1674            let key_path = dir.join("grpc-tls-key.pem");
1675            if !cert_path.exists() || !key_path.exists() {
1676                let _ = std::fs::create_dir_all(&dir);
1677                std::fs::write(&cert_path, cert_pem_str.as_bytes())
1678                    .map_err(|e| format!("write grpc dev cert: {e}"))?;
1679                std::fs::write(&key_path, key_pem_str.as_bytes())
1680                    .map_err(|e| format!("write grpc dev key: {e}"))?;
1681                #[cfg(unix)]
1682                {
1683                    use std::os::unix::fs::PermissionsExt;
1684                    let _ =
1685                        std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1686                }
1687            }
1688            (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1689        }
1690    };
1691
1692    let client_ca_pem = match &config.grpc_tls_client_ca {
1693        Some(path) => Some(
1694            std::fs::read(path)
1695                .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1696        ),
1697        None => None,
1698    };
1699
1700    Ok(crate::GrpcTlsOptions {
1701        cert_pem,
1702        key_pem,
1703        client_ca_pem,
1704    })
1705}
1706
1707/// Spawn a TLS-terminated gRPC listener when `grpc_tls_bind_addr` is
1708/// configured. Logs and continues on TLS-config errors so the plain
1709/// listener stays up; this matches the wire-listener pattern.
1710fn spawn_grpc_tls_listener_if_configured(
1711    config: &ServerCommandConfig,
1712    runtime: RedDBRuntime,
1713    auth_store: Arc<AuthStore>,
1714) {
1715    let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1716        return;
1717    };
1718    let tls_opts = match resolve_grpc_tls_options(config) {
1719        Ok(opts) => opts,
1720        Err(err) => {
1721            tracing::error!(
1722                target: "reddb::security",
1723                transport = "grpc",
1724                err = %err,
1725                "gRPC TLS config error; TLS listener will not start"
1726            );
1727            return;
1728        }
1729    };
1730    tokio::spawn(async move {
1731        let server = RedDBGrpcServer::with_options(
1732            runtime,
1733            GrpcServerOptions {
1734                bind_addr: tls_bind.clone(),
1735                tls: Some(tls_opts),
1736            },
1737            auth_store,
1738        );
1739        tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1740        if let Err(err) = server.serve().await {
1741            tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1742        }
1743    });
1744}
1745
1746/// Hex-encoded SHA-256 of a PEM blob, used for cert-pin operator log
1747/// lines. Constant-time hash; no token contents leave this fn.
1748fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1749    use sha2::{Digest, Sha256};
1750    let mut h = Sha256::new();
1751    h.update(pem);
1752    let d = h.finalize();
1753    let mut buf = String::with_capacity(64);
1754    for b in d.iter() {
1755        buf.push_str(&format!("{b:02x}"));
1756    }
1757    buf
1758}
1759
1760/// Resolve TLS config: use provided cert/key or auto-generate.
1761fn resolve_wire_tls_config(
1762    config: &ServerCommandConfig,
1763) -> Result<crate::wire::WireTlsConfig, String> {
1764    match (&config.wire_tls_cert, &config.wire_tls_key) {
1765        (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1766            cert_path: cert.clone(),
1767            key_path: key.clone(),
1768        }),
1769        _ => {
1770            // Auto-generate self-signed cert
1771            let dir = config
1772                .path
1773                .as_ref()
1774                .and_then(|p| p.parent())
1775                .map(PathBuf::from)
1776                .unwrap_or_else(|| PathBuf::from("."));
1777            crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1778        }
1779    }
1780}
1781
1782#[inline(never)]
1783fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1784    let rt_config = detect_runtime_config();
1785    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1786    let cli_telemetry = config.telemetry.clone();
1787    let db_options = config.to_db_options()?;
1788    let mut transport_readiness = TransportReadiness::default();
1789
1790    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1791        .enable_all()
1792        .worker_threads(workers)
1793        .thread_stack_size(rt_config.stack_size)
1794        .build()
1795        .map_err(|err| format!("tokio runtime: {err}"))?;
1796
1797    // Guard lives on the outer thread's stack so it outlives the
1798    // tokio runtime — dropping it only after the listener returns
1799    // flushes the file log writer.
1800    let (runtime, _auth_store, _telemetry_guard) =
1801        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1802    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1803    let signal_runtime = runtime.clone();
1804    tokio_runtime.block_on(async move {
1805        spawn_lifecycle_signal_handler(signal_runtime).await;
1806        spawn_pg_listener(&config, &runtime);
1807        let wire_rt = Arc::new(runtime);
1808        let listener = tokio::net::TcpListener::bind(&wire_addr)
1809            .await
1810            .map_err(|err| {
1811                let reason = format!("wire listener bind {wire_addr}: {err}");
1812                transport_readiness.failed(
1813                    "wire",
1814                    &wire_addr,
1815                    config.wire_bind_explicit,
1816                    reason.clone(),
1817                );
1818                if config.wire_bind_explicit {
1819                    format!("explicit {reason}")
1820                } else {
1821                    reason
1822                }
1823            })?;
1824        transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1825        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1826            .await
1827            .map_err(|e| e.to_string())
1828    })
1829}
1830
1831#[inline(never)]
1832fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1833    let rt_config = detect_runtime_config();
1834    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1835    let cli_telemetry = config.telemetry.clone();
1836    let db_options = config.to_db_options()?;
1837
1838    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1839        .enable_all()
1840        .worker_threads(workers)
1841        .thread_stack_size(rt_config.stack_size)
1842        .build()
1843        .map_err(|err| format!("tokio runtime: {err}"))?;
1844
1845    let (runtime, _auth_store, _telemetry_guard) =
1846        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1847    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1848    let signal_runtime = runtime.clone();
1849    tokio_runtime.block_on(async move {
1850        spawn_lifecycle_signal_handler(signal_runtime).await;
1851        let cfg = crate::wire::PgWireConfig {
1852            bind_addr: pg_addr,
1853            ..Default::default()
1854        };
1855        crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1856            .await
1857            .map_err(|e| e.to_string())
1858    })
1859}
1860
1861#[inline(never)]
1862fn build_runtime_and_auth_store(
1863    db_options: RedDBOptions,
1864    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1865) -> Result<
1866    (
1867        RedDBRuntime,
1868        Arc<AuthStore>,
1869        Option<crate::telemetry::TelemetryGuard>,
1870    ),
1871    String,
1872> {
1873    // Return the TelemetryGuard so server runners can bind it for
1874    // their full lifetime. Dropping the guard tears down the
1875    // non-blocking log writer thread and, because that writer is
1876    // built with `.lossy(true)`, any subsequent log event routed to
1877    // the file sink is silently dropped — so callers MUST keep the
1878    // returned `Option<TelemetryGuard>` alive until shutdown.
1879    build_runtime_with_telemetry(db_options, cli_telemetry)
1880}
1881
1882/// Open the runtime, initialise structured logging from merged CLI +
1883/// `red_config` settings, and return a guard the caller must keep
1884/// alive for the server lifetime (drop flushes pending log writes).
1885///
1886/// Merge priority: CLI flag (explicit `Some`) beats `red.logging.*`
1887/// in red_config, beats the built-in default. A CLI-flag value of
1888/// `None` / empty means "inherit from config or default" — never
1889/// "disable". The one exception is `--no-log-file` which forces
1890/// `log_dir = None` regardless of config.
1891pub(crate) fn build_runtime_with_telemetry(
1892    db_options: RedDBOptions,
1893    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1894) -> Result<
1895    (
1896        RedDBRuntime,
1897        Arc<AuthStore>,
1898        Option<crate::telemetry::TelemetryGuard>,
1899    ),
1900    String,
1901> {
1902    let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1903        // Issue #205 — runtime construction failure is the canonical
1904        // StartupFailed phase. The audit sink isn't installed yet
1905        // (it would have been registered inside `with_options`), so
1906        // the emit falls through to tracing+eprintln only — operator
1907        // still sees it on stderr.
1908        let msg = err.to_string();
1909        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1910            phase: "runtime_construction".to_string(),
1911            error: msg.clone(),
1912        }
1913        .emit_global();
1914        msg
1915    })?;
1916
1917    // PLAN.md Phase 5 / W6 — opt into serverless writer-lease fencing
1918    // when `RED_LEASE_REQUIRED=true`. Failure here aborts boot: the
1919    // operator asked for a fence; running unfenced would silently
1920    // expose split-brain risk.
1921    crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1922        let msg = err.to_string();
1923        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1924            phase: "lease_loop".to_string(),
1925            error: msg.clone(),
1926        }
1927        .emit_global();
1928        msg
1929    })?;
1930
1931    // #213 — edge-triggered disk-space watchdog. Watches the data
1932    // directory; falls back to polling when fanotify is unavailable
1933    // (non-Linux or unprivileged container).
1934    if let Some(data_path) = db_options.data_path.as_deref() {
1935        let watch_dir = data_path.parent().unwrap_or(data_path);
1936        crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1937    }
1938
1939    // #214 — inotify config hot-reload watcher. Watches the config file
1940    // (REDDB_CONFIG_FILE or /etc/reddb/config.json) for changes and
1941    // applies hot-reloadable keys without restart.
1942    {
1943        let config_path = crate::runtime::config_overlay::config_file_path();
1944        let store = runtime.db().store();
1945        crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1946    }
1947
1948    // Phase 6 logging: merge red_config overrides onto the CLI-built
1949    // telemetry config, then install the global subscriber.
1950    let merged = merge_telemetry_with_config(
1951        cli_telemetry
1952            .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1953        &runtime,
1954    );
1955    let telemetry_guard = crate::telemetry::init(merged);
1956
1957    let no_auth = no_auth_active(&db_options);
1958    let auth_store =
1959        if db_options.auth.vault_enabled {
1960            let pager =
1961                runtime.db().store().pager().cloned().ok_or_else(|| {
1962                    "vault requires a paged database (persistent mode)".to_string()
1963                })?;
1964            let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1965                .map_err(|err| err.to_string())?;
1966            Arc::new(store)
1967        } else {
1968            Arc::new(AuthStore::new(db_options.auth.clone()))
1969        };
1970    auth_store.configure_control_events(
1971        runtime.control_event_ledger(),
1972        runtime.control_event_config(),
1973    );
1974    // Issue #663 — when `--no-auth` is active, deliberately skip the
1975    // preset machinery. Otherwise a stray `REDDB_USERNAME`+`REDDB_PASSWORD`
1976    // pair in the operator's environment would silently create an admin
1977    // user, defeating the whole point of opting into anonymous mode.
1978    if no_auth {
1979        eprintln!("{NO_AUTH_WARNING}");
1980        tracing::warn!("{NO_AUTH_WARNING}");
1981    } else {
1982        apply_preset(&runtime, &auth_store)?;
1983        maybe_apply_policy_break_glass(&auth_store);
1984    }
1985
1986    // Background session purge (every 5 minutes)
1987    {
1988        let store = Arc::clone(&auth_store);
1989        std::thread::Builder::new()
1990            .name("reddb-session-purge".into())
1991            .spawn(move || loop {
1992                std::thread::sleep(std::time::Duration::from_secs(300));
1993                store.purge_expired_sessions();
1994            })
1995            .ok();
1996    }
1997
1998    Ok((runtime, auth_store, telemetry_guard))
1999}
2000
2001/// Honour `REDDB_POLICY_BREAK_GLASS=1` at boot — see issue #713 and
2002/// the [`crate::auth::self_lock_guard`] module. Anything other than
2003/// `1`/`true`/`yes` (case-insensitive) is treated as not set.
2004fn maybe_apply_policy_break_glass(auth_store: &Arc<AuthStore>) {
2005    use crate::auth::self_lock_guard::BREAK_GLASS_ENV;
2006
2007    let enabled = std::env::var(BREAK_GLASS_ENV)
2008        .ok()
2009        .map(|v| {
2010            let trimmed = v.trim().to_ascii_lowercase();
2011            matches!(trimmed.as_str(), "1" | "true" | "yes")
2012        })
2013        .unwrap_or(false);
2014    if !enabled {
2015        return;
2016    }
2017    let now = crate::utils::now_unix_millis() as u128;
2018    match auth_store.apply_policy_break_glass(now) {
2019        Ok(()) => {
2020            tracing::warn!(env = BREAK_GLASS_ENV, "policy break-glass recovery applied");
2021        }
2022        Err(err) => {
2023            tracing::error!(env = BREAK_GLASS_ENV, %err, "policy break-glass recovery failed");
2024        }
2025    }
2026}
2027
// Reserved config keys describing first-boot bootstrap state (issue #650).

/// Marker key whose presence means first-boot bootstrap has already run.
///
/// This is the idempotency hinge. When the key is set, [`apply_preset`]
/// does not apply a preset again; it only rehydrates the bootstrap-manifest
/// registry and logs. A container restart with the same env therefore
/// re-runs no preset.
pub(crate) const BOOTSTRAP_COMPLETED_KEY: &str = "system.bootstrap.completed";
/// Reserved key for the name of the preset (or `manifest`) that
/// bootstrapped this database. NOTE(review): presumably written by
/// `persist_bootstrap_state`; confirm there.
pub(crate) const BOOTSTRAP_PRESET_KEY: &str = "system.bootstrap.preset";
/// Reserved key for the id of the first admin created during bootstrap,
/// when the preset or manifest created one. NOTE(review): presumably
/// written by `persist_bootstrap_state`; confirm there.
pub(crate) const BOOTSTRAP_FIRST_ADMIN_KEY: &str = "system.bootstrap.first_admin_id";

/// Env var selecting the bootstrap preset. An unset or blank value means
/// [`PRESET_SIMPLE`]; an unrecognised value fails boot.
pub(crate) const PRESET_ENV: &str = "REDDB_PRESET";
/// Default, low-friction preset. Auth settings stay as the CLI/env
/// configured them; only bootstrap state is recorded.
pub(crate) const PRESET_SIMPLE: &str = "simple";
/// Preset that creates a system-owned first admin from `REDDB_USERNAME` /
/// `REDDB_PASSWORD` and attaches [`FIRST_ADMIN_ALLOW_ALL_POLICY`] to it.
pub(crate) const PRESET_PRODUCTION: &str = "production";
/// Preset that enables query-audit infrastructure and installs
/// [`REGULATED_PROTECT_MANAGED_POLICY`]. It also registers the guarded
/// policy and config namespaces as immutable, managed registry entries.
pub(crate) const PRESET_REGULATED: &str = "regulated";

/// Policy id installed by the `production` preset and attached to the
/// first admin. Grants `"*"` on `"*"` so the admin has policy-derived
/// broad authority (acceptance #3) — not an authorization bypass.
pub(crate) const FIRST_ADMIN_ALLOW_ALL_POLICY: &str = "system.bootstrap.first-admin-allow-all";
/// Id of the guardrail policy installed by the `regulated` preset. It
/// denies putting, dropping, attaching, or detaching itself. It also
/// denies `config:write` on the three regulated config namespaces.
pub(crate) const REGULATED_PROTECT_MANAGED_POLICY: &str = "system.regulated.protect-managed";
/// Config namespace write-protected by the `regulated` preset (audit).
pub(crate) const REGULATED_AUDIT_CONFIG_NAMESPACE: &str = "red.config.audit";
/// Config namespace write-protected by the `regulated` preset (evidence).
pub(crate) const REGULATED_EVIDENCE_CONFIG_NAMESPACE: &str = "red.config.evidence";
/// Config namespace write-protected by the `regulated` preset (query audit).
pub(crate) const REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE: &str = "red.config.query_audit";
2050
2051/// Apply the bootstrap preset selected by `REDDB_PRESET` (default
2052/// `simple`). Idempotent — if `system.bootstrap.completed` is already
2053/// set, this is a one-line `tracing::info!` no-op and the server proceeds
2054/// (issue #650 acceptance #5).
2055///
2056/// Caller must have already short-circuited the `--no-auth` / `--dev`
2057/// path (issue #663): when that flag is set, the preset must be skipped
2058/// entirely — no admin created, no bootstrap state written.
2059pub(crate) fn apply_preset(
2060    runtime: &RedDBRuntime,
2061    auth_store: &Arc<AuthStore>,
2062) -> Result<(), String> {
2063    let store = runtime.db().store();
2064
2065    if store.get_config(BOOTSTRAP_COMPLETED_KEY).is_some() {
2066        crate::cli::bootstrap_manifest::rehydrate_manifest_registry(
2067            runtime,
2068            &runtime.config_registry(),
2069        )?;
2070        tracing::info!("bootstrap state present, skipping preset application");
2071        return Ok(());
2072    }
2073
2074    // `_FILE` companion expansion for k8s secret mounts. Errors here
2075    // (e.g. both `REDDB_PASSWORD` and `REDDB_PASSWORD_FILE` set) are
2076    // operator misconfigs and should fail the boot loudly.
2077    for var in ["REDDB_USERNAME", "REDDB_PASSWORD"] {
2078        crate::utils::expand_file_env(var).map_err(|err| format!("expand {var}_FILE: {err}"))?;
2079    }
2080
2081    let preset = std::env::var(PRESET_ENV)
2082        .ok()
2083        .map(|s| s.trim().to_string())
2084        .filter(|s| !s.is_empty())
2085        .unwrap_or_else(|| PRESET_SIMPLE.to_string());
2086
2087    if let Ok(path) = std::env::var(crate::cli::bootstrap_manifest::MANIFEST_ENV) {
2088        let path = path.trim();
2089        if !path.is_empty() {
2090            let first_admin_id = crate::cli::bootstrap_manifest::apply_manifest_file(
2091                runtime,
2092                auth_store,
2093                &runtime.config_registry(),
2094                std::path::Path::new(path),
2095            )?;
2096            persist_bootstrap_state(runtime, "manifest", Some(&first_admin_id));
2097            tracing::info!("bootstrap manifest applied");
2098            return Ok(());
2099        }
2100    }
2101
2102    let first_admin_id = match preset.as_str() {
2103        PRESET_SIMPLE => {
2104            // `simple` is the default low-friction preset. Auth knobs
2105            // remain whatever the CLI/env set; we only persist the
2106            // bootstrap state so subsequent boots are idempotent.
2107            None
2108        }
2109        PRESET_PRODUCTION => Some(apply_production_preset(auth_store)?),
2110        PRESET_REGULATED => {
2111            apply_regulated_preset(runtime, auth_store)?;
2112            None
2113        }
2114        other => {
2115            return Err(format!(
2116                "REDDB_PRESET={other:?} is not recognised (expected `simple`, `production`, or `regulated`)"
2117            ));
2118        }
2119    };
2120
2121    persist_bootstrap_state(runtime, &preset, first_admin_id.as_deref());
2122    tracing::info!(preset = %preset, "bootstrap preset applied");
2123    Ok(())
2124}
2125
2126fn apply_production_preset(auth_store: &Arc<AuthStore>) -> Result<String, String> {
2127    use crate::auth::store::PrincipalRef;
2128    use crate::auth::{policies::Policy, UserId};
2129
2130    let username = std::env::var("REDDB_USERNAME")
2131        .ok()
2132        .filter(|s| !s.is_empty())
2133        .ok_or_else(|| {
2134            "REDDB_PRESET=production requires REDDB_USERNAME (or REDDB_USERNAME_FILE)".to_string()
2135        })?;
2136    let password = std::env::var("REDDB_PASSWORD")
2137        .ok()
2138        .filter(|s| !s.is_empty())
2139        .ok_or_else(|| {
2140            "REDDB_PRESET=production requires REDDB_PASSWORD (or REDDB_PASSWORD_FILE)".to_string()
2141        })?;
2142
2143    // (1) Create the first admin as system-owned + platform-scoped so
2144    // #649's `ManagedConfigGate` accepts them on managed-config writes.
2145    let result = auth_store
2146        .bootstrap_system_admin(&username, &password)
2147        .map_err(|err| format!("bootstrap first admin: {err}"))?;
2148    let first_admin = UserId::platform(result.user.username.clone());
2149
2150    // (2) Install the allow-all policy as an ordinary policy row.
2151    let policy = Policy::from_json_str(&format!(
2152        r#"{{
2153            "id": "{id}",
2154            "version": 1,
2155            "statements": [{{
2156                "effect": "allow",
2157                "actions": ["*"],
2158                "resources": ["*"]
2159            }}]
2160        }}"#,
2161        id = FIRST_ADMIN_ALLOW_ALL_POLICY
2162    ))
2163    .map_err(|err| format!("compile allow-all policy: {err}"))?;
2164    auth_store
2165        .put_policy(policy)
2166        .map_err(|err| format!("install allow-all policy: {err}"))?;
2167
2168    // (3) Attach the policy to the first admin.
2169    auth_store
2170        .attach_policy(
2171            PrincipalRef::User(first_admin.clone()),
2172            FIRST_ADMIN_ALLOW_ALL_POLICY,
2173        )
2174        .map_err(|err| format!("attach allow-all policy: {err}"))?;
2175
2176    Ok(first_admin.to_string())
2177}
2178
2179fn apply_regulated_preset(
2180    runtime: &RedDBRuntime,
2181    auth_store: &Arc<AuthStore>,
2182) -> Result<(), String> {
2183    use crate::auth::policies::Policy;
2184    use crate::auth::registry::EvidenceRequirement;
2185
2186    runtime.query_audit().enable_infrastructure();
2187
2188    let policy = Policy::from_json_str(&format!(
2189        r#"{{
2190            "id": "{id}",
2191            "version": 1,
2192            "statements": [
2193                {{
2194                    "effect": "deny",
2195                    "actions": ["policy:put", "policy:drop", "policy:attach", "policy:detach"],
2196                    "resources": ["policy:{id}"]
2197                }},
2198                {{
2199                    "effect": "deny",
2200                    "actions": ["config:write"],
2201                    "resources": [
2202                        "config:{audit}.*",
2203                        "config:{evidence}.*",
2204                        "config:{query_audit}.*"
2205                    ]
2206                }}
2207            ]
2208        }}"#,
2209        id = REGULATED_PROTECT_MANAGED_POLICY,
2210        audit = REGULATED_AUDIT_CONFIG_NAMESPACE,
2211        evidence = REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2212        query_audit = REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2213    ))
2214    .map_err(|err| format!("compile regulated guardrail policy: {err}"))?;
2215    auth_store
2216        .put_policy(policy)
2217        .map_err(|err| format!("install regulated guardrail policy: {err}"))?;
2218
2219    let now_ms = crate::utils::now_unix_millis() as u128;
2220    let entries = vec![
2221        regulated_registry_entry(
2222            REGULATED_PROTECT_MANAGED_POLICY,
2223            crate::auth::managed_policy::RESOURCE_TYPE_POLICY,
2224            "iam_policy",
2225            "policy:*",
2226            &format!("policy:{REGULATED_PROTECT_MANAGED_POLICY}"),
2227            EvidenceRequirement::Metadata,
2228            now_ms,
2229        ),
2230        regulated_registry_entry(
2231            REGULATED_AUDIT_CONFIG_NAMESPACE,
2232            crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2233            "config_namespace",
2234            "config:write",
2235            &format!("config:{REGULATED_AUDIT_CONFIG_NAMESPACE}.*"),
2236            EvidenceRequirement::Metadata,
2237            now_ms,
2238        ),
2239        regulated_registry_entry(
2240            REGULATED_EVIDENCE_CONFIG_NAMESPACE,
2241            crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2242            "config_namespace",
2243            "config:write",
2244            &format!("config:{REGULATED_EVIDENCE_CONFIG_NAMESPACE}.*"),
2245            EvidenceRequirement::Metadata,
2246            now_ms,
2247        ),
2248        regulated_registry_entry(
2249            REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE,
2250            crate::auth::managed_config::RESOURCE_TYPE_CONFIG_NAMESPACE,
2251            "config_namespace",
2252            "config:write",
2253            &format!("config:{REGULATED_QUERY_AUDIT_CONFIG_NAMESPACE}.*"),
2254            EvidenceRequirement::Metadata,
2255            now_ms,
2256        ),
2257    ];
2258
2259    for entry in entries.iter().cloned() {
2260        runtime
2261            .config_registry()
2262            .restore_bootstrap_entry(entry)
2263            .map_err(|err| format!("install regulated registry entry: {err}"))?;
2264    }
2265    crate::cli::bootstrap_manifest::persist_registry_state(runtime, &entries)?;
2266    Ok(())
2267}
2268
2269fn regulated_registry_entry(
2270    id: &str,
2271    resource_type: &str,
2272    schema: &str,
2273    required_action: &str,
2274    required_resource: &str,
2275    evidence_requirement: crate::auth::registry::EvidenceRequirement,
2276    updated_at_ms: u128,
2277) -> crate::auth::registry::ConfigRegistryEntry {
2278    crate::auth::registry::ConfigRegistryEntry {
2279        id: id.to_string(),
2280        version: 1,
2281        resource_type: resource_type.to_string(),
2282        schema: schema.to_string(),
2283        mutability: crate::auth::registry::Mutability::Immutable,
2284        sensitivity: crate::auth::registry::Sensitivity::Internal,
2285        managed: true,
2286        required_action: required_action.to_string(),
2287        required_resource: required_resource.to_string(),
2288        evidence_requirement,
2289        updated_by: "system:regulated-preset".to_string(),
2290        updated_at_ms,
2291    }
2292}
2293
2294fn persist_bootstrap_state(runtime: &RedDBRuntime, preset: &str, first_admin_id: Option<&str>) {
2295    let store = runtime.db().store();
2296    let mut tree = crate::serde_json::Map::new();
2297    tree.insert(
2298        BOOTSTRAP_COMPLETED_KEY.to_string(),
2299        crate::serde_json::Value::Bool(true),
2300    );
2301    tree.insert(
2302        BOOTSTRAP_PRESET_KEY.to_string(),
2303        crate::serde_json::Value::String(preset.to_string()),
2304    );
2305    if let Some(id) = first_admin_id {
2306        tree.insert(
2307            BOOTSTRAP_FIRST_ADMIN_KEY.to_string(),
2308            crate::serde_json::Value::String(id.to_string()),
2309        );
2310    }
2311    let json = crate::serde_json::Value::Object(tree);
2312    store.set_config_tree("", &json);
2313}
2314
2315/// Read `red.logging.*` keys from the persistent config store and
2316/// merge them into the CLI-built `TelemetryConfig`. Merge priority:
2317/// explicit CLI flag > red_config > built-in default.
2318///
2319/// The "was a flag passed" signal comes from the `*_explicit` bools
2320/// on `TelemetryConfig`, populated by the CLI parser. This replaces
2321/// an earlier equality-to-default heuristic that silently dropped
2322/// config whenever the CLI-derived default diverged from
2323/// `TelemetryConfig::default()` (e.g. path-derived `log_dir`,
2324/// non-TTY `format`) and that silently overrode `--no-log-file`.
2325fn merge_telemetry_with_config(
2326    mut cli: crate::telemetry::TelemetryConfig,
2327    runtime: &RedDBRuntime,
2328) -> crate::telemetry::TelemetryConfig {
2329    use crate::storage::schema::Value;
2330
2331    let store = runtime.db().store();
2332
2333    if !cli.level_explicit {
2334        if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
2335            cli.level_filter = v.to_string();
2336        }
2337    }
2338    if !cli.format_explicit {
2339        if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
2340            if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
2341                cli.format = parsed;
2342            }
2343        }
2344    }
2345    if !cli.rotation_keep_days_explicit {
2346        match store.get_config("red.logging.keep_days") {
2347            Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
2348                cli.rotation_keep_days = n as u16
2349            }
2350            Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
2351                cli.rotation_keep_days = n as u16
2352            }
2353            Some(Value::Text(v)) => {
2354                if let Ok(n) = v.parse::<u16>() {
2355                    cli.rotation_keep_days = n;
2356                }
2357            }
2358            _ => {}
2359        }
2360    }
2361    if !cli.file_prefix_explicit {
2362        if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
2363            if !v.is_empty() {
2364                cli.file_prefix = v.to_string();
2365            }
2366        }
2367    }
2368    // --no-log-file is a kill-switch: config cannot resurrect the
2369    // file sink. Explicit --log-dir also wins.
2370    if !cli.log_dir_explicit && !cli.log_file_disabled {
2371        if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
2372            if !v.is_empty() {
2373                cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
2374            }
2375        }
2376    }
2377
2378    cli
2379}
2380
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};

    /// New in-memory runtime, so each test starts from an empty config store.
    fn in_memory_runtime() -> RedDBRuntime {
        let options = RedDBOptions::in_memory();
        RedDBRuntime::with_options(options).expect("runtime")
    }

    /// Store `value` as a JSON string under `key` in the runtime's config.
    fn put_text_config(runtime: &RedDBRuntime, key: &str, value: &str) {
        let json = crate::serde_json::Value::String(value.to_string());
        runtime.db().store().set_config_tree(key, &json);
    }

    /// Emulates `default_telemetry_for_path(Some(path))` on a non-TTY host:
    /// a concrete `log_dir`, JSON output, and no `*_explicit` flag set.
    fn non_tty_cli_defaults() -> TelemetryConfig {
        TelemetryConfig {
            format: LogFormat::Json,
            log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
            ..Default::default()
        }
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let runtime = in_memory_runtime();
        put_text_config(&runtime, "red.logging.dir", "/var/log/reddb");

        let merged = merge_telemetry_with_config(non_tty_cli_defaults(), &runtime);

        let expected = std::path::Path::new("/var/log/reddb");
        assert_eq!(merged.log_dir.as_deref(), Some(expected));
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let runtime = in_memory_runtime();
        put_text_config(&runtime, "red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/custom/dir")),
            log_dir_explicit: true,
            ..non_tty_cli_defaults()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        let expected = std::path::Path::new("/custom/dir");
        assert_eq!(merged.log_dir.as_deref(), Some(expected));
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let runtime = in_memory_runtime();
        put_text_config(&runtime, "red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: None,
            log_file_disabled: true,
            ..non_tty_cli_defaults()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert!(
            merged.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        // On non-TTY the CLI default is Json, while
        // `TelemetryConfig::default()` is Pretty. The old equality-to-default
        // check silently dropped config in exactly this situation.
        let runtime = in_memory_runtime();
        put_text_config(&runtime, "red.logging.format", "pretty");

        let merged = merge_telemetry_with_config(non_tty_cli_defaults(), &runtime);

        assert_eq!(merged.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let runtime = in_memory_runtime();
        put_text_config(&runtime, "red.logging.format", "pretty");
        let cli = TelemetryConfig {
            format: LogFormat::Json,
            format_explicit: true,
            ..non_tty_cli_defaults()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert_eq!(merged.format, LogFormat::Json);
    }
}
2468
2469#[inline(never)]
2470fn build_http_server(
2471    runtime: RedDBRuntime,
2472    auth_store: Arc<AuthStore>,
2473    bind_addr: String,
2474) -> RedDBServer {
2475    build_http_server_with_transport_readiness(
2476        runtime,
2477        auth_store,
2478        bind_addr,
2479        TransportReadiness::default(),
2480    )
2481}
2482
2483/// Apply the resolved HTTP limits to a freshly-built `RedDBServer`.
2484///
2485/// Centralised here so every `run_*` path goes through the same
2486/// resolver and the structured startup log line carries the same
2487/// `http_limits.*` fields regardless of transport combination.
2488fn apply_http_limits(
2489    server: RedDBServer,
2490    config: &ServerCommandConfig,
2491    runtime: &RedDBRuntime,
2492) -> RedDBServer {
2493    let store = runtime.db().store();
2494    let resolved =
2495        crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2496            .get_config(key)
2497        {
2498            Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2499            Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2500            Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2501            _ => None,
2502        });
2503    tracing::info!(
2504        target: "reddb::http_limits",
2505        max_handlers = resolved.max_handlers,
2506        handler_timeout_ms = resolved.handler_timeout_ms,
2507        retry_after_secs = resolved.retry_after_secs,
2508        max_inflight_per_principal = resolved.max_inflight_per_principal,
2509        "http_limits resolved"
2510    );
2511    server.with_http_limits(resolved)
2512}
2513
2514/// `red server --ui` (issue #1047, ADR 0051): attach the resolved red-ui
2515/// bundle directory to an HTTP server so it also serves the bundle as
2516/// static assets. No-op when `--ui` is off.
2517///
2518/// An explicit `--ui-dir <DIR>` is served as-is (no download); otherwise
2519/// the pinned bundle is resolved from the local cache, downloading on
2520/// first use (ADR 0050). A resolution failure is fatal: the operator
2521/// explicitly asked for the UI, so silently serving API-only would be
2522/// surprising and would mask an offline/checksum problem.
2523fn apply_ui_bundle(
2524    server: RedDBServer,
2525    config: &ServerCommandConfig,
2526) -> Result<RedDBServer, String> {
2527    if !config.ui {
2528        return Ok(server);
2529    }
2530    let ui_dir = match &config.ui_dir {
2531        Some(dir) => dir.clone(),
2532        None => {
2533            let cache_root = crate::server::ui_bundle_resolver::reddb_user_cache_root()
2534                .unwrap_or_else(|_| std::env::temp_dir().join("reddb"));
2535            crate::server::ui_bundle_resolver::resolve_ui_bundle(
2536                &cache_root,
2537                &crate::server::ui_bundle_resolver::HttpFetcher,
2538            )
2539            .map_err(|err| format!("resolve red-ui bundle for --ui: {err}"))?
2540        }
2541    };
2542    tracing::info!(target: "reddb::ui", dir = %ui_dir.display(), "serving red-ui bundle on HTTP surface");
2543    Ok(server.with_ui_dir(ui_dir))
2544}
2545
2546#[inline(never)]
2547fn build_http_server_with_transport_readiness(
2548    runtime: RedDBRuntime,
2549    auth_store: Arc<AuthStore>,
2550    bind_addr: String,
2551    transport_readiness: TransportReadiness,
2552) -> RedDBServer {
2553    RedDBServer::with_options(
2554        runtime,
2555        ServerOptions {
2556            bind_addr,
2557            transport_readiness,
2558            ..ServerOptions::default()
2559        },
2560    )
2561    .with_auth(auth_store)
2562}
2563
2564/// PLAN.md Phase 6.2 — build a listener that only serves
2565/// `/admin/*` + `/metrics` + `/health/*`. Defaults to `127.0.0.1`
2566/// when the env var has no host (loopback-only by default per spec).
2567#[inline(never)]
2568fn build_admin_only_server(
2569    runtime: RedDBRuntime,
2570    auth_store: Arc<AuthStore>,
2571    bind_addr: String,
2572) -> RedDBServer {
2573    RedDBServer::with_options(
2574        runtime,
2575        ServerOptions {
2576            bind_addr,
2577            surface: crate::server::ServerSurface::AdminOnly,
2578            ..ServerOptions::default()
2579        },
2580    )
2581    .with_auth(auth_store)
2582}
2583
2584/// PLAN.md Phase 6.2 — build a listener that only serves `/metrics`
2585/// + `/health/*`. Suitable for Prometheus scrape ports that may be
2586///   exposed wider than the admin port.
2587#[inline(never)]
2588fn build_metrics_only_server(
2589    runtime: RedDBRuntime,
2590    auth_store: Arc<AuthStore>,
2591    bind_addr: String,
2592) -> RedDBServer {
2593    RedDBServer::with_options(
2594        runtime,
2595        ServerOptions {
2596            bind_addr,
2597            surface: crate::server::ServerSurface::MetricsOnly,
2598            ..ServerOptions::default()
2599        },
2600    )
2601    .with_auth(auth_store)
2602}
2603
2604/// Spawn dedicated admin / metrics listeners when the operator set
2605/// `RED_ADMIN_BIND` / `RED_METRICS_BIND`. Both are optional; when
2606/// unset the existing listener keeps serving everything (back-compat).
2607fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2608    if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2609        let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2610        let _ = server.serve_in_background();
2611        tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2612    }
2613    if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2614        let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2615        let _ = server.serve_in_background();
2616        tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2617    }
2618}
2619
/// Run the HTTP transport as the only primary listener.
///
/// Order of operations:
/// 1. Bind the HTTP listener before the database options are converted and
///    before the runtime is built. An implicit bind that fails
///    (`bind_listener_for_startup` yields `Ok(None)`) is turned into an
///    error, because there is no other primary transport to fall back to.
///    Explicit bind failures already come back as `Err` from
///    `bind_listener_for_startup`.
/// 2. Build the runtime and auth store. `_telemetry_guard` and
///    `_backup_tasks` are named bindings, so they stay alive until this
///    function returns, i.e. for as long as the server runs.
/// 3. Start the optional side listeners:
///    - admin/metrics surfaces (`RED_ADMIN_BIND` / `RED_METRICS_BIND`);
///    - the HTTPS listener.
/// 4. Apply HTTP limits and the optional `--ui` bundle, then serve on the
///    pre-bound listener, blocking until the server exits.
///
/// Unlike the gRPC paths, this function starts no wire, PostgreSQL or gRPC
/// TLS listeners.
///
/// # Errors
/// Returns `Err` when:
/// - a bind fails, or the implicit bind is unavailable;
/// - option conversion fails;
/// - runtime/auth setup fails;
/// - the HTTPS listener or UI bundle setup fails;
/// - serving fails.
#[inline(never)]
fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
    let cli_telemetry = config.telemetry.clone();
    let mut transport_readiness = TransportReadiness::default();
    // Bind first so a port conflict is reported before any runtime setup.
    let Some(listener) = bind_listener_for_startup(
        &mut transport_readiness,
        "http",
        &bind_addr,
        config.http_bind_explicit,
    )?
    else {
        return Err(format!(
            "no HTTP listener started; implicit bind {} failed",
            bind_addr
        ));
    };
    let db_options = config.to_db_options()?;
    let (runtime, auth_store, _telemetry_guard) =
        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
    spawn_admin_metrics_listeners(&runtime, &auth_store);
    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
    let server = build_http_server_with_transport_readiness(
        runtime.clone(),
        auth_store,
        bind_addr.clone(),
        transport_readiness,
    );
    let server = apply_http_limits(server, &config, &runtime);
    let server = apply_ui_bundle(server, &config)?;
    tracing::info!(transport = "http", bind = %bind_addr, "listener online");
    // Blocks for the lifetime of the server.
    server.serve_on(listener).map_err(|err| err.to_string())
}
2653
2654/// PLAN.md HTTP TLS — when `http_tls_bind_addr` is set, spawn a
2655/// rustls-terminated listener alongside the plain HTTP server. Cert
2656/// + key paths come from CLI flags or `REDDB_HTTP_TLS_*` env vars; if
2657///   both are absent and `RED_HTTP_TLS_DEV=1` is set, a self-signed cert
2658///   is auto-generated next to the data directory (refused otherwise).
2659fn spawn_http_tls_listener(
2660    config: &ServerCommandConfig,
2661    runtime: &RedDBRuntime,
2662    auth_store: &Arc<AuthStore>,
2663) -> Result<(), String> {
2664    let Some(addr) = config.http_tls_bind_addr.clone() else {
2665        return Ok(());
2666    };
2667
2668    let tls_config = resolve_http_tls_config(config)?;
2669    let server_config = crate::server::tls::build_server_config(&tls_config)
2670        .map_err(|err| format!("HTTP TLS: {err}"))?;
2671
2672    let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
2673    let server = apply_http_limits(server, config, runtime);
2674    let _handle = server.serve_tls_in_background(server_config);
2675    tracing::info!(
2676        transport = "https",
2677        bind = %addr,
2678        mtls = %tls_config.client_ca_path.is_some(),
2679        "TLS listener online"
2680    );
2681    Ok(())
2682}
2683
2684/// Resolve the HTTP TLS config from CLI / env / dev defaults.
2685fn resolve_http_tls_config(
2686    config: &ServerCommandConfig,
2687) -> Result<crate::server::tls::HttpTlsConfig, String> {
2688    match (&config.http_tls_cert, &config.http_tls_key) {
2689        (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
2690            cert_path: cert.clone(),
2691            key_path: key.clone(),
2692            client_ca_path: config.http_tls_client_ca.clone(),
2693        }),
2694        (None, None) => {
2695            // Dev-mode auto-generate next to the data directory.
2696            let dir = config
2697                .path
2698                .as_ref()
2699                .and_then(|p| p.parent().map(std::path::PathBuf::from))
2700                .unwrap_or_else(|| std::path::PathBuf::from("."));
2701            let auto = crate::server::tls::auto_generate_dev_cert(&dir)
2702                .map_err(|err| format!("HTTP TLS dev: {err}"))?;
2703            Ok(crate::server::tls::HttpTlsConfig {
2704                cert_path: auto.cert_path,
2705                key_path: auto.key_path,
2706                client_ca_path: config.http_tls_client_ca.clone(),
2707            })
2708        }
2709        _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
2710    }
2711}
2712
2713#[inline(never)]
2714fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2715    let workers = config.workers;
2716    let cli_telemetry = config.telemetry.clone();
2717    let db_options = config.to_db_options()?;
2718    let rt_config = detect_runtime_config();
2719    let mut transport_readiness = TransportReadiness::default();
2720    let Some(grpc_listener) = bind_listener_for_startup(
2721        &mut transport_readiness,
2722        "grpc",
2723        &bind_addr,
2724        config.grpc_bind_explicit,
2725    )?
2726    else {
2727        return Err(format!(
2728            "no gRPC listener started; implicit bind {} failed",
2729            bind_addr
2730        ));
2731    };
2732
2733    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2734
2735    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2736        .enable_all()
2737        .worker_threads(worker_threads)
2738        .thread_stack_size(rt_config.stack_size)
2739        .build()
2740        .map_err(|err| format!("tokio runtime: {err}"))?;
2741
2742    // Guard lives on the outer stack so it outlives the tokio runtime.
2743    let (runtime, auth_store, _telemetry_guard) =
2744        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2745    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2746    let signal_runtime = runtime.clone();
2747    tokio_runtime.block_on(async move {
2748        spawn_lifecycle_signal_handler(signal_runtime).await;
2749        // Start wire protocol listeners (plaintext + TLS)
2750        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2751
2752        // Start PostgreSQL wire listener when --pg-bind is configured.
2753        spawn_pg_listener(&config, &runtime);
2754
2755        // Optional TLS gRPC listener. When `grpc_tls_bind_addr` is set
2756        // it spawns a separate listener so plaintext + TLS can run
2757        // side-by-side (50051 plain + 50052 TLS, etc.).
2758        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2759
2760        let server = RedDBGrpcServer::with_options(
2761            runtime,
2762            GrpcServerOptions {
2763                bind_addr: bind_addr.clone(),
2764                tls: None,
2765            },
2766            auth_store,
2767        );
2768
2769        tracing::info!(
2770            transport = "grpc",
2771            bind = %bind_addr,
2772            cpus = rt_config.available_cpus,
2773            workers = worker_threads,
2774            "listener online"
2775        );
2776        server
2777            .serve_on(grpc_listener)
2778            .await
2779            .map_err(|err| err.to_string())
2780    })
2781}
2782
2783#[inline(never)]
2784fn run_dual_server(
2785    config: ServerCommandConfig,
2786    grpc_bind_addr: String,
2787    http_bind_addr: String,
2788) -> Result<(), String> {
2789    let workers = config.workers;
2790    let cli_telemetry = config.telemetry.clone();
2791    let db_options = config.to_db_options()?;
2792    let rt_config = detect_runtime_config();
2793    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2794    let mut transport_readiness = TransportReadiness::default();
2795    let http_listener = bind_listener_for_startup(
2796        &mut transport_readiness,
2797        "http",
2798        &http_bind_addr,
2799        config.http_bind_explicit,
2800    )?;
2801    let grpc_listener = bind_listener_for_startup(
2802        &mut transport_readiness,
2803        "grpc",
2804        &grpc_bind_addr,
2805        config.grpc_bind_explicit,
2806    )?;
2807    if http_listener.is_none() && grpc_listener.is_none() {
2808        return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2809    }
2810    let (runtime, auth_store, _telemetry_guard) =
2811        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2812    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2813
2814    spawn_admin_metrics_listeners(&runtime, &auth_store);
2815    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2816
2817    let http_handle = if let Some(listener) = http_listener {
2818        let http_server = build_http_server_with_transport_readiness(
2819            runtime.clone(),
2820            auth_store.clone(),
2821            http_bind_addr.clone(),
2822            transport_readiness.clone(),
2823        );
2824        let http_server = apply_http_limits(http_server, &config, &runtime);
2825        let http_server = apply_ui_bundle(http_server, &config)?;
2826        Some(http_server.serve_in_background_on(listener))
2827    } else {
2828        None
2829    };
2830
2831    thread::sleep(Duration::from_millis(150));
2832    if let Some(handle) = http_handle.as_ref() {
2833        if handle.is_finished() {
2834            let handle = http_handle.unwrap();
2835            return match handle.join() {
2836                Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2837                Ok(Err(err)) => Err(err.to_string()),
2838                Err(_) => Err("HTTP server thread panicked".to_string()),
2839            };
2840        }
2841    }
2842    if grpc_listener.is_none() {
2843        let Some(handle) = http_handle else {
2844            return Err("no listener started".to_string());
2845        };
2846        return match handle.join() {
2847            Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2848            Ok(Err(err)) => Err(err.to_string()),
2849            Err(_) => Err("HTTP server thread panicked".to_string()),
2850        };
2851    }
2852    let grpc_listener = grpc_listener.expect("checked above");
2853
2854    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2855        .enable_all()
2856        .worker_threads(worker_threads)
2857        .thread_stack_size(rt_config.stack_size)
2858        .build()
2859        .map_err(|err| format!("tokio runtime: {err}"))?;
2860
2861    let signal_runtime = runtime.clone();
2862    tokio_runtime.block_on(async move {
2863        spawn_lifecycle_signal_handler(signal_runtime).await;
2864        // Start wire protocol listeners (plaintext + TLS)
2865        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2866
2867        // Start PostgreSQL wire listener when --pg-bind is configured.
2868        spawn_pg_listener(&config, &runtime);
2869
2870        // Optional TLS gRPC listener — runs alongside the plaintext one.
2871        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2872
2873        let server = RedDBGrpcServer::with_options(
2874            runtime,
2875            GrpcServerOptions {
2876                bind_addr: grpc_bind_addr.clone(),
2877                tls: None,
2878            },
2879            auth_store,
2880        );
2881
2882        tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2883        tracing::info!(
2884            transport = "grpc",
2885            bind = %grpc_bind_addr,
2886            cpus = rt_config.available_cpus,
2887            workers = worker_threads,
2888            "listener online"
2889        );
2890        server
2891            .serve_on(grpc_listener)
2892            .await
2893            .map_err(|err| err.to_string())
2894    })
2895}
2896
2897#[cfg(test)]
2898mod tests {
2899    use super::*;
2900
2901    #[test]
2902    fn render_systemd_unit_contains_expected_execstart() {
2903        let config = SystemdServiceConfig {
2904            service_name: "reddb".to_string(),
2905            binary_path: PathBuf::from("/usr/local/bin/red"),
2906            run_user: "reddb".to_string(),
2907            run_group: "reddb".to_string(),
2908            data_path: reddb_file::default_service_database_path(),
2909            router_bind_addr: None,
2910            grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2911            http_bind_addr: None,
2912        };
2913
2914        let unit = render_systemd_unit(&config);
2915        assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
2916        assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
2917    }
2918
2919    #[test]
2920    fn systemd_service_config_derives_paths() {
2921        let config = SystemdServiceConfig {
2922            service_name: "reddb-api".to_string(),
2923            binary_path: PathBuf::from("/usr/local/bin/red"),
2924            run_user: "reddb".to_string(),
2925            run_group: "reddb".to_string(),
2926            data_path: PathBuf::from("/srv/reddb/live/data.rdb"),
2927            router_bind_addr: None,
2928            grpc_bind_addr: None,
2929            http_bind_addr: Some("127.0.0.1:5055".to_string()),
2930        };
2931
2932        assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
2933        assert_eq!(
2934            config.unit_path(),
2935            PathBuf::from("/etc/systemd/system/reddb-api.service")
2936        );
2937    }
2938
2939    #[test]
2940    fn render_systemd_unit_supports_dual_transport() {
2941        let config = SystemdServiceConfig {
2942            service_name: "reddb".to_string(),
2943            binary_path: PathBuf::from("/usr/local/bin/red"),
2944            run_user: "reddb".to_string(),
2945            run_group: "reddb".to_string(),
2946            data_path: reddb_file::default_service_database_path(),
2947            router_bind_addr: None,
2948            grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
2949            http_bind_addr: Some("0.0.0.0:5055".to_string()),
2950        };
2951
2952        let unit = render_systemd_unit(&config);
2953        assert!(unit.contains("--grpc-bind 0.0.0.0:5555"));
2954        assert!(unit.contains("--http-bind 0.0.0.0:5055"));
2955    }
2956
2957    #[test]
2958    fn render_systemd_unit_supports_router_mode() {
2959        let config = SystemdServiceConfig {
2960            service_name: "reddb".to_string(),
2961            binary_path: PathBuf::from("/usr/local/bin/red"),
2962            run_user: "reddb".to_string(),
2963            run_group: "reddb".to_string(),
2964            data_path: reddb_file::default_service_database_path(),
2965            router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
2966            grpc_bind_addr: None,
2967            http_bind_addr: None,
2968        };
2969
2970        let unit = render_systemd_unit(&config);
2971        assert!(unit.contains("--bind 127.0.0.1:5050"));
2972        assert!(!unit.contains("--grpc-bind"));
2973        assert!(!unit.contains("--http-bind"));
2974    }
2975
2976    #[test]
2977    fn explicit_bind_collision_is_fatal() {
2978        let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
2979        let addr = held.local_addr().expect("held addr").to_string();
2980        let mut readiness = TransportReadiness::default();
2981
2982        let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();
2983
2984        assert!(error.contains("explicit http listener bind"));
2985        assert_eq!(readiness.active.len(), 0);
2986        assert_eq!(readiness.failed.len(), 1);
2987        assert!(readiness.failed[0].explicit);
2988        assert_eq!(readiness.failed[0].bind_addr, addr);
2989    }
2990
2991    // ---------- Issue #663 — `--no-auth` / `--dev` ----------
2992
2993    // Env access in tests is process-global; serialise the two
2994    // `--no-auth` tests so the REDDB_USERNAME / REDDB_PASSWORD pair
2995    // one of them sets cannot leak into the other under cargo's
2996    // default parallel runner.
2997    fn no_auth_env_lock() -> &'static std::sync::Mutex<()> {
2998        static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
2999        LOCK.get_or_init(|| std::sync::Mutex::new(()))
3000    }
3001
3002    fn no_auth_test_config(no_auth: bool) -> ServerCommandConfig {
3003        ServerCommandConfig {
3004            path: None,
3005            router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
3006            router_bind_explicit: false,
3007            grpc_bind_addr: None,
3008            grpc_bind_explicit: false,
3009            grpc_tls_bind_addr: None,
3010            grpc_tls_cert: None,
3011            grpc_tls_key: None,
3012            grpc_tls_client_ca: None,
3013            http_bind_addr: None,
3014            http_bind_explicit: false,
3015            http_tls_bind_addr: None,
3016            http_tls_cert: None,
3017            http_tls_key: None,
3018            http_tls_client_ca: None,
3019            wire_bind_addr: None,
3020            wire_bind_explicit: false,
3021            wire_tls_bind_addr: None,
3022            wire_tls_cert: None,
3023            wire_tls_key: None,
3024            pg_bind_addr: None,
3025            create_if_missing: true,
3026            read_only: false,
3027            role: "standalone".to_string(),
3028            primary_addr: None,
3029            storage_profile: StorageProfileSelection::embedded_single_file(),
3030            // Operator-set `--vault`: `--no-auth` must override this
3031            // alongside REDDB_USERNAME/PASSWORD.
3032            vault: true,
3033            no_auth,
3034            workers: None,
3035            telemetry: None,
3036            http_limits_cli: crate::server::HttpLimitsCliInput::default(),
3037            ui: false,
3038            ui_dir: None,
3039        }
3040    }
3041
3042    #[test]
3043    fn no_auth_flag_disables_every_auth_knob_and_stamps_metadata() {
3044        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3045        // Pre-existing env that *would* turn auth on if `--no-auth`
3046        // weren't the last word. The acceptance criterion is that
3047        // the flag wins over env.
3048        // SAFETY: serialised by `no_auth_env_lock` above.
3049        unsafe {
3050            std::env::set_var("REDDB_USERNAME", "admin");
3051            std::env::set_var("REDDB_PASSWORD", "hunter2");
3052        }
3053        let config = no_auth_test_config(true);
3054        let options = config.to_db_options().expect("to_db_options");
3055
3056        assert!(no_auth_active(&options), "metadata should be stamped");
3057        assert!(!options.auth.enabled, "auth.enabled must be forced off");
3058        assert!(
3059            !options.auth.require_auth,
3060            "require_auth must be forced off"
3061        );
3062        assert!(
3063            !options.auth.vault_enabled,
3064            "vault_enabled must be forced off (overrides --vault)"
3065        );
3066        assert_eq!(
3067            options.metadata.get(NO_AUTH_META).map(String::as_str),
3068            Some("true"),
3069        );
3070
3071        // SAFETY: serialised by `no_auth_env_lock` above.
3072        unsafe {
3073            std::env::remove_var("REDDB_USERNAME");
3074            std::env::remove_var("REDDB_PASSWORD");
3075        }
3076    }
3077
3078    #[test]
3079    fn default_behaviour_without_no_auth_flag_is_unchanged() {
3080        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3081        let config = no_auth_test_config(false);
3082        let options = config.to_db_options().expect("to_db_options");
3083
3084        assert!(
3085            !no_auth_active(&options),
3086            "default boot must not be marked no-auth"
3087        );
3088        assert!(
3089            options.metadata.get(NO_AUTH_META).is_none(),
3090            "metadata key must be absent when flag is off"
3091        );
3092        // `--vault` should still take effect when `--no-auth` is not set.
3093        assert!(options.auth.vault_enabled);
3094    }
3095
3096    #[test]
3097    fn no_auth_active_blocks_bootstrap_from_env() {
3098        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3099        // SAFETY: serialised by `no_auth_env_lock` above. The pair
3100        // would normally cause `AuthStore::bootstrap_from_env` to
3101        // create an admin; the boot pipeline must suppress that call
3102        // whenever `no_auth_active` is true.
3103        unsafe {
3104            std::env::set_var("REDDB_USERNAME", "admin");
3105            std::env::set_var("REDDB_PASSWORD", "hunter2");
3106        }
3107
3108        let options = no_auth_test_config(true)
3109            .to_db_options()
3110            .expect("to_db_options");
3111
3112        // Mirror the exact branch in `build_runtime_with_telemetry`:
3113        // build a non-vault AuthStore from `options.auth`, then call
3114        // `bootstrap_from_env` *only* when the no-auth gate is off.
3115        let auth_store = AuthStore::new(options.auth.clone());
3116        if !no_auth_active(&options) {
3117            auth_store.bootstrap_from_env();
3118        }
3119
3120        assert!(
3121            auth_store.needs_bootstrap(),
3122            "no admin user must be bootstrapped under --no-auth even with REDDB_USERNAME/PASSWORD set"
3123        );
3124
3125        // SAFETY: serialised by `no_auth_env_lock` above.
3126        unsafe {
3127            std::env::remove_var("REDDB_USERNAME");
3128            std::env::remove_var("REDDB_PASSWORD");
3129        }
3130    }
3131
3132    // ---------- Issue #650 — bootstrap presets ----------
3133
    // Preset and managed-backup tests mutate process-global env
    // (`REDDB_PRESET`, `REDDB_BOOTSTRAP_MANIFEST`, `REDDB_USERNAME`,
    // `REDDB_PASSWORD`, the `REDDB_BACKUP_*` family) and the global tracing
    // subscriber. They share `no_auth_env_lock()` so they don't race with
    // each other or with the `--no-auth` tests above.
3138    fn clear_preset_env() {
3139        // SAFETY: callers hold `no_auth_env_lock()`.
3140        unsafe {
3141            std::env::remove_var(PRESET_ENV);
3142            std::env::remove_var("REDDB_BOOTSTRAP_MANIFEST");
3143            std::env::remove_var("REDDB_USERNAME");
3144            std::env::remove_var("REDDB_PASSWORD");
3145            std::env::remove_var("REDDB_USERNAME_FILE");
3146            std::env::remove_var("REDDB_PASSWORD_FILE");
3147        }
3148    }
3149
3150    fn clear_backup_env() {
3151        // SAFETY: callers hold `no_auth_env_lock()`.
3152        unsafe {
3153            std::env::remove_var("REDDB_BACKUP_S3_ENDPOINT");
3154            std::env::remove_var("REDDB_BACKUP_S3_BUCKET");
3155            std::env::remove_var("REDDB_BACKUP_S3_PREFIX");
3156            std::env::remove_var("REDDB_BACKUP_S3_ACCESS_KEY_ID");
3157            std::env::remove_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY");
3158            std::env::remove_var("REDDB_BACKUP_S3_REGION");
3159            std::env::remove_var("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS");
3160            std::env::remove_var("REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS");
3161            std::env::remove_var("REDDB_BACKUP_PAUSE_ON_LAG_SECS");
3162        }
3163    }
3164
3165    fn fresh_runtime_and_store() -> (RedDBRuntime, Arc<AuthStore>) {
3166        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
3167        let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3168        (runtime, auth_store)
3169    }
3170
3171    #[test]
3172    fn simple_preset_is_default_and_persists_state() {
3173        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3174        clear_preset_env();
3175
3176        let (runtime, auth_store) = fresh_runtime_and_store();
3177        apply_preset(&runtime, &auth_store).expect("simple preset applies cleanly");
3178
3179        // No admin was created — `simple` is anonymous-friendly.
3180        assert!(
3181            auth_store.needs_bootstrap(),
3182            "simple preset must not create an admin"
3183        );
3184
3185        // Bootstrap state persisted so the next boot is a no-op.
3186        let store = runtime.db().store();
3187        let completed = store
3188            .get_config(BOOTSTRAP_COMPLETED_KEY)
3189            .expect("completed key persisted");
3190        assert!(matches!(
3191            completed,
3192            crate::storage::schema::Value::Boolean(true)
3193        ));
3194        let preset = store
3195            .get_config(BOOTSTRAP_PRESET_KEY)
3196            .expect("preset key persisted");
3197        match preset {
3198            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_SIMPLE),
3199            other => panic!("expected Text(simple), got {other:?}"),
3200        }
3201        assert!(
3202            store.get_config(BOOTSTRAP_FIRST_ADMIN_KEY).is_none(),
3203            "simple preset must not record a first admin"
3204        );
3205
3206        clear_preset_env();
3207    }
3208
3209    #[test]
3210    fn production_preset_creates_first_admin_with_allow_all_policy() {
3211        use crate::auth::policies::{EvalContext, ResourceRef};
3212        use crate::auth::UserId;
3213
3214        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3215        clear_preset_env();
3216        // SAFETY: env serialised by `no_auth_env_lock`.
3217        unsafe {
3218            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3219            std::env::set_var("REDDB_USERNAME", "ops");
3220            std::env::set_var("REDDB_PASSWORD", "hunter2");
3221        }
3222
3223        let (runtime, auth_store) = fresh_runtime_and_store();
3224        apply_preset(&runtime, &auth_store).expect("production preset applies cleanly");
3225
3226        // Admin exists and the auth store is sealed.
3227        assert!(
3228            !auth_store.needs_bootstrap(),
3229            "production preset must seal bootstrap"
3230        );
3231        let users = auth_store.list_users();
3232        assert_eq!(users.len(), 1);
3233        let admin = &users[0];
3234        assert_eq!(admin.username, "ops");
3235        assert!(
3236            admin.system_owned,
3237            "first admin must be system-owned to pass the managed-config gate"
3238        );
3239        assert!(
3240            admin.tenant_id.is_none(),
3241            "first admin must be platform-scoped (tenant=None)"
3242        );
3243
3244        // Allow-all policy was installed and attached to the first admin.
3245        let policy = auth_store
3246            .get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY)
3247            .expect("allow-all policy installed");
3248        assert!(!policy.statements.is_empty());
3249
3250        // Verify policy-derived authority via the policy evaluator —
3251        // not a bypass. Any action on any resource must Allow.
3252        let actor = UserId::platform("ops");
3253        let ctx = EvalContext {
3254            principal_tenant: None,
3255            current_tenant: None,
3256            peer_ip: None,
3257            mfa_present: false,
3258            now_ms: 1_700_000_000_000,
3259            principal_is_admin_role: true,
3260            principal_is_system_owned: true,
3261            principal_is_platform_scoped: true,
3262        };
3263        let arbitrary_resource = ResourceRef::new("config", "red.config.audit.enabled");
3264        assert!(
3265            auth_store.check_policy_authz(&actor, "config:write", &arbitrary_resource, &ctx),
3266            "allow-all policy must grant arbitrary actions via the evaluator"
3267        );
3268
3269        // Persisted state records the first admin id.
3270        let store = runtime.db().store();
3271        match store
3272            .get_config(BOOTSTRAP_FIRST_ADMIN_KEY)
3273            .expect("first_admin_id persisted")
3274        {
3275            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), "ops"),
3276            other => panic!("expected Text(ops), got {other:?}"),
3277        }
3278        match store.get_config(BOOTSTRAP_PRESET_KEY).unwrap() {
3279            crate::storage::schema::Value::Text(s) => assert_eq!(s.as_ref(), PRESET_PRODUCTION),
3280            other => panic!("expected Text(production), got {other:?}"),
3281        }
3282
3283        clear_preset_env();
3284    }
3285
3286    #[test]
3287    fn regulated_preset_enables_query_audit_infrastructure_without_rules() {
3288        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3289        clear_preset_env();
3290        // SAFETY: env serialised by `no_auth_env_lock`.
3291        unsafe {
3292            std::env::set_var(PRESET_ENV, PRESET_REGULATED);
3293        }
3294
3295        let (runtime, auth_store) = fresh_runtime_and_store();
3296        apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
3297
3298        assert!(runtime.query_audit().is_enabled());
3299        assert!(runtime.query_audit().rules().is_empty());
3300        assert!(
3301            runtime
3302                .db()
3303                .store()
3304                .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3305                .is_some(),
3306            "regulated preset should create the query-audit stream"
3307        );
3308
3309        runtime
3310            .execute_query("CREATE TABLE docs (id INT)")
3311            .expect("create table");
3312        runtime
3313            .execute_query("INSERT INTO docs (id) VALUES (1)")
3314            .expect("insert");
3315        runtime.execute_query("SELECT * FROM docs").expect("select");
3316        let rows = runtime
3317            .db()
3318            .store()
3319            .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
3320            .expect("query audit collection")
3321            .query_all(|_| true);
3322        assert!(
3323            rows.is_empty(),
3324            "regulated preset must not globally audit every query"
3325        );
3326
3327        clear_preset_env();
3328    }
3329
3330    #[test]
3331    fn managed_backup_env_rejects_primary_replica_single_file_storage() {
3332        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3333        clear_backup_env();
3334        // SAFETY: env serialised by `no_auth_env_lock`.
3335        unsafe {
3336            std::env::set_var("REDDB_BACKUP_S3_ENDPOINT", "https://s3.example.test");
3337            std::env::set_var("REDDB_BACKUP_S3_BUCKET", "reddb");
3338            std::env::set_var("REDDB_BACKUP_S3_PREFIX", "clusters/prod");
3339            std::env::set_var("REDDB_BACKUP_S3_ACCESS_KEY_ID", "AK");
3340            std::env::set_var("REDDB_BACKUP_S3_SECRET_ACCESS_KEY", "SK");
3341        }
3342
3343        let mut config = no_auth_test_config(false);
3344        config.role = "primary".to_string();
3345        config.storage_profile = crate::storage::StorageDeployPreset::PrimaryReplicaDev.selection();
3346
3347        let err = config.to_db_options().unwrap_err();
3348        assert!(err.contains("managed backup"), "got: {err}");
3349        assert!(err.contains("operational-directory"), "got: {err}");
3350
3351        clear_backup_env();
3352    }
3353
    /// End-to-end check of the `regulated` preset, in five phases:
    ///
    /// 1. Options derived with `REDDB_PRESET=regulated` enable compliance-mode
    ///    control events, plus query audit with no rules.
    /// 2. After `apply_preset`, the managed policy and the managed audit-config
    ///    namespace are visible through `config_registry()`, `red.registry`,
    ///    `red.managed_policies` and `red.control_capabilities`.
    /// 3. An ordinary admin holding an allow-all policy is still denied
    ///    `DROP POLICY` on the managed policy and `SET CONFIG` in the managed
    ///    namespace.
    /// 4. Each denial is recorded in `red.control_events` with outcome
    ///    `denied`.
    /// 5. Data-plane queries are not written to the query-audit collection.
    #[test]
    fn regulated_preset_installs_managed_evidence_guardrails_end_to_end() {
        use crate::auth::policies::{EvalContext, Policy, ResourceRef};
        use crate::auth::store::PrincipalRef;
        use crate::auth::{Role, UserId};
        use crate::runtime::mvcc::{clear_current_auth_identity, set_current_auth_identity};
        use crate::storage::schema::Value;

        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
        clear_preset_env();
        // SAFETY: env serialised by `no_auth_env_lock`.
        unsafe {
            std::env::set_var(PRESET_ENV, PRESET_REGULATED);
        }

        // Phase 1: option derivation alone (before any runtime exists)
        // must already reflect the regulated preset.
        let options = no_auth_test_config(false)
            .to_db_options()
            .expect("regulated options");
        assert!(
            options.control_events.compliance_mode,
            "regulated preset must enable fail-closed control evidence before runtime boot"
        );
        assert!(
            options.query_audit.enabled && options.query_audit.rules.is_empty(),
            "regulated preset must enable query-audit infrastructure without global rules"
        );

        // Phase 2: boot a runtime from those options, apply the preset to a
        // fresh default AuthStore, then hand the store to the runtime.
        let runtime = RedDBRuntime::with_options(options).expect("runtime");
        let auth_store = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
        apply_preset(&runtime, &auth_store).expect("regulated preset applies cleanly");
        runtime.set_auth_store(Arc::clone(&auth_store));

        assert!(runtime.control_events_require_persistence());
        assert!(runtime.query_audit().is_enabled());
        assert!(runtime.query_audit().rules().is_empty());
        assert!(auth_store
            .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
            .is_some());

        // The guardrail policy and the audit-config namespace are both
        // registered as managed entries.
        let managed_policy = runtime
            .config_registry()
            .get_active(REGULATED_PROTECT_MANAGED_POLICY)
            .expect("regulated managed policy registry entry");
        assert!(managed_policy.managed);
        assert_eq!(managed_policy.resource_type, "policy");
        assert!(
            runtime
                .config_registry()
                .get_active(REGULATED_AUDIT_CONFIG_NAMESPACE)
                .expect("regulated audit config namespace")
                .managed
        );

        // The same state is observable through the `red.*` system views.
        let registry_rows = runtime
            .execute_query(&format!(
                "SELECT id, managed FROM red.registry WHERE id = '{}'",
                REGULATED_PROTECT_MANAGED_POLICY
            ))
            .expect("red.registry query");
        assert_eq!(registry_rows.result.records.len(), 1);
        assert_eq!(
            registry_rows.result.records[0].get("managed"),
            Some(&Value::Boolean(true))
        );

        let managed_policy_rows = runtime
            .execute_query(&format!(
                "SELECT policy_id FROM red.managed_policies WHERE policy_id = '{}'",
                REGULATED_PROTECT_MANAGED_POLICY
            ))
            .expect("red.managed_policies query");
        assert_eq!(managed_policy_rows.result.records.len(), 1);

        let capability_rows = runtime
            .execute_query(
                "SELECT action FROM red.control_capabilities WHERE action = 'evidence:export'",
            )
            .expect("red.control_capabilities query");
        assert_eq!(capability_rows.result.records.len(), 1);

        // Phase 3: set up an ordinary (non-system-owned) admin "alice" with
        // an allow-all policy. The evaluator alone grants `policy:drop`, so
        // any denial below must come from the managed guardrail.
        auth_store
            .create_user("alice", "p", Role::Admin)
            .expect("create ordinary admin");
        let allow_all = Policy::from_json_str(
            r#"{
                "id": "alice-allow-all",
                "version": 1,
                "statements": [{
                    "effect": "allow",
                    "actions": ["*"],
                    "resources": ["*"]
                }]
            }"#,
        )
        .expect("allow-all policy");
        auth_store.put_policy(allow_all).expect("install allow-all");
        auth_store
            .attach_policy(
                PrincipalRef::User(UserId::platform("alice")),
                "alice-allow-all",
            )
            .expect("attach allow-all");
        let ctx = EvalContext {
            principal_tenant: None,
            current_tenant: None,
            peer_ip: None,
            mfa_present: false,
            now_ms: 1_700_000_000_000,
            principal_is_admin_role: true,
            principal_is_system_owned: false,
            principal_is_platform_scoped: true,
        };
        assert!(
            auth_store.check_policy_authz(
                &UserId::platform("alice"),
                "policy:drop",
                &ResourceRef::new("policy", REGULATED_PROTECT_MANAGED_POLICY),
                &ctx,
            ),
            "ordinary allow-all policy should be broad enough that only the managed guardrail blocks"
        );

        // The current auth identity is set only around the single statement
        // under test. It is cleared right after the call returns, before any
        // assertion can panic.
        set_current_auth_identity("alice".to_string(), Role::Admin);
        let denied = runtime.execute_query(&format!(
            "DROP POLICY '{}'",
            REGULATED_PROTECT_MANAGED_POLICY
        ));
        clear_current_auth_identity();
        let err = denied.expect_err("managed policy guardrail must deny ordinary admin");
        assert!(
            err.to_string().contains("managed policy"),
            "error should name the managed guardrail: {err}"
        );
        assert!(
            auth_store
                .get_policy(REGULATED_PROTECT_MANAGED_POLICY)
                .is_some(),
            "denied mutation must leave managed policy installed"
        );

        // Phase 4: the denial is recorded as exactly one control event.
        let denied_events = runtime
            .execute_query(&format!(
                "SELECT action, resource, outcome FROM red.control_events \
                 WHERE action = 'policy:drop' AND resource = 'policy:{}'",
                REGULATED_PROTECT_MANAGED_POLICY
            ))
            .expect("red.control_events denied policy drop");
        assert_eq!(denied_events.result.records.len(), 1);
        assert_eq!(
            denied_events.result.records[0].get("outcome"),
            Some(&Value::text("denied"))
        );

        // Same pattern for a write into the managed config namespace.
        set_current_auth_identity("alice".to_string(), Role::Admin);
        let config_denied = runtime.execute_query("SET CONFIG red.config.audit.enabled = true");
        clear_current_auth_identity();
        let err = config_denied.expect_err("managed config guardrail must deny ordinary admin");
        assert!(
            err.to_string().contains("managed config"),
            "error should name the managed config guardrail: {err}"
        );

        let denied_config_events = runtime
            .execute_query(
                "SELECT action, resource, outcome FROM red.control_events \
                 WHERE action = 'config:write' AND resource = 'config:red.config.audit.enabled'",
            )
            .expect("red.control_events denied config write");
        assert_eq!(denied_config_events.result.records.len(), 1);
        assert_eq!(
            denied_config_events.result.records[0].get("outcome"),
            Some(&Value::text("denied"))
        );

        // Phase 5: data-plane traffic (run with no identity set) leaves the
        // query-audit collection empty.
        runtime
            .execute_query("CREATE TABLE regulated_docs (id INT)")
            .expect("create user table");
        runtime
            .execute_query("SELECT * FROM regulated_docs")
            .expect("select user table");
        let audit_rows = runtime
            .db()
            .store()
            .get_collection(crate::runtime::query_audit::QUERY_AUDIT_COLLECTION)
            .expect("query audit collection")
            .query_all(|_| true);
        assert!(
            audit_rows.is_empty(),
            "regulated preset must not globally audit data-plane queries"
        );

        // NOTE(review): only reached when every assertion passes. The next
        // preset test calls `clear_preset_env()` at its start, but non-preset
        // tests do not.
        clear_preset_env();
    }
3547
    /// A bootstrap manifest named by `REDDB_BOOTSTRAP_MANIFEST` is applied by
    /// `apply_preset`. The test checks that:
    ///
    /// - users, policies, managed policies, attachments, managed config
    ///   namespaces and config values are all installed;
    /// - secret-ref config is stored as structured JSON that does not contain
    ///   the manifest password;
    /// - after the manifest file is deleted, the managed registry can be
    ///   rehydrated from persisted state;
    /// - a re-run of `apply_preset` against a fresh `AuthStore` creates
    ///   nothing.
    #[test]
    fn bootstrap_manifest_installs_initial_users_policies_guardrails_and_config() {
        use crate::auth::policies::{EvalContext, ResourceRef};
        use crate::auth::UserId;
        use crate::storage::schema::Value;

        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
        clear_preset_env();

        // The manifest is written under `<cwd>/.red/tmp/...`. Its file name
        // includes the process id and a millisecond timestamp to avoid
        // collisions between runs.
        // NOTE(review): if an assertion before `remove_file` below panics,
        // the file (and the directory) are left in the working tree. Consider
        // a drop guard or `std::env::temp_dir()`.
        let manifest_dir = std::env::current_dir()
            .expect("current dir")
            .join(".red/tmp/bootstrap-manifest-tests");
        std::fs::create_dir_all(&manifest_dir).expect("create manifest test dir");
        let manifest_path = manifest_dir.join(format!(
            "reddb-bootstrap-manifest-{}-{}.json",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis()
        ));
        // Fixture: one system-owned admin "ops", a regular policy attached to
        // it, a managed deny policy, a managed `red.ai` config namespace, and
        // one plain plus one secret-ref config value.
        std::fs::write(
            &manifest_path,
            r#"{
                "users": [
                    {
                        "username": "ops",
                        "password": "hunter2",
                        "role": "admin",
                        "system_owned": true
                    }
                ],
                "policies": [
                    {
                        "id": "bootstrap-registry-admin",
                        "version": 1,
                        "statements": [
                            {
                                "effect": "allow",
                                "actions": ["red.registry:*", "policy:*", "config:write", "select"],
                                "resources": ["registry:*", "policy:*", "config:*", "collection:docs"]
                            }
                        ]
                    }
                ],
                "managed_policies": [
                    {
                        "id": "managed-deny-drop",
                        "version": 1,
                        "statements": [
                            {
                                "effect": "deny",
                                "actions": ["policy:drop"],
                                "resources": ["policy:managed-deny-drop"]
                            }
                        ],
                        "required_resource": "policy:managed-deny-drop",
                        "evidence": "full"
                    }
                ],
                "attachments": [
                    {"user": "ops", "policy": "bootstrap-registry-admin"}
                ],
                "managed_config_namespaces": [
                    {
                        "id": "red.ai",
                        "required_action": "config:write",
                        "required_resource": "config:red.ai.*",
                        "evidence": "metadata"
                    }
                ],
                "config": [
                    {"key": "red.ai.default.provider", "value": "openai"},
                    {
                        "key": "red.ai.openai.default.secret_ref",
                        "secret_ref": {"collection": "red.vault", "key": "openai"}
                    }
                ],
                "actor": "ops"
            }"#,
        )
        .expect("write manifest");
        // SAFETY: env serialised by `no_auth_env_lock`.
        unsafe {
            std::env::set_var("REDDB_BOOTSTRAP_MANIFEST", &manifest_path);
        }

        let (runtime, auth_store) = fresh_runtime_and_store();
        apply_preset(&runtime, &auth_store).expect("manifest applies cleanly");

        // Users: exactly the manifest's single system-owned admin.
        let users = auth_store.list_users();
        assert_eq!(users.len(), 1);
        assert_eq!(users[0].username, "ops");
        assert!(users[0].system_owned);

        // The attached policy grants `select` on `collection:docs` through
        // the evaluator.
        let actor = UserId::platform("ops");
        let ctx = EvalContext {
            principal_tenant: None,
            current_tenant: None,
            peer_ip: None,
            mfa_present: false,
            now_ms: 1_700_000_000_000,
            principal_is_admin_role: true,
            principal_is_system_owned: true,
            principal_is_platform_scoped: true,
        };
        // Manifest fixture pins a canonical data-plane read action.
        assert!(auth_store.check_policy_authz(
            &actor,
            "select",
            &ResourceRef::new("collection", "docs"),
            &ctx
        ));

        // Managed entries are registered with their resource types.
        let managed_policy = runtime
            .config_registry()
            .get_active("managed-deny-drop")
            .expect("managed policy registry entry");
        assert!(managed_policy.managed);
        assert_eq!(managed_policy.resource_type, "policy");
        let managed_config = runtime
            .config_registry()
            .get_active("red.ai")
            .expect("managed config namespace registry entry");
        assert!(managed_config.managed);
        assert_eq!(managed_config.resource_type, "config_namespace");

        // Plain config is stored as text. The secret ref is stored as JSON
        // tagged `"type": "secret_ref"` and must not contain the password.
        let store = runtime.db().store();
        match store
            .get_config("red.ai.default.provider")
            .expect("plain config persisted")
        {
            Value::Text(s) => assert_eq!(s.as_ref(), "openai"),
            other => panic!("expected provider text, got {other:?}"),
        }
        let Value::Json(bytes) = store
            .get_config("red.ai.openai.default.secret_ref")
            .expect("secret ref config persisted")
        else {
            panic!("secret ref must be stored as structured JSON");
        };
        let reference: crate::serde_json::Value =
            crate::serde_json::from_slice(&bytes).expect("secret ref json");
        assert_eq!(
            reference.get("type").and_then(|v| v.as_str()),
            Some("secret_ref")
        );
        assert!(
            !String::from_utf8_lossy(&bytes).contains("hunter2"),
            "manifest password must not leak into secret ref config"
        );

        // Bootstrap completion and the managed registry entries are
        // persisted internally.
        let completed = store
            .get_config(BOOTSTRAP_COMPLETED_KEY)
            .expect("bootstrap completion persisted");
        assert!(matches!(completed, Value::Boolean(true)));
        assert!(
            store
                .get_config("system.bootstrap.manifest.registry_entries")
                .is_some(),
            "managed registry entries must be persisted internally"
        );

        // With the manifest file gone, a new empty registry is rehydrated
        // from persisted state. Re-applying the preset against a fresh
        // AuthStore neither needs the file nor creates any users.
        std::fs::remove_file(&manifest_path).expect("remove manifest after first boot");
        let restored_registry = Arc::new(crate::auth::registry::ConfigRegistry::new());
        crate::cli::bootstrap_manifest::rehydrate_manifest_registry(&runtime, &restored_registry)
            .expect("registry rehydrates without manifest file");
        assert!(restored_registry.get_active("managed-deny-drop").is_some());
        assert!(restored_registry.get_active("red.ai").is_some());

        let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
        apply_preset(&runtime, &fresh).expect("re-run must not need manifest file");
        assert!(fresh.needs_bootstrap());

        clear_preset_env();
    }
3724
3725    #[test]
3726    fn production_preset_refuses_to_start_without_password() {
3727        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3728        clear_preset_env();
3729        // SAFETY: env serialised by `no_auth_env_lock`.
3730        unsafe {
3731            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3732            std::env::set_var("REDDB_USERNAME", "ops");
3733        }
3734
3735        let (runtime, auth_store) = fresh_runtime_and_store();
3736        let err = apply_preset(&runtime, &auth_store).expect_err("must reject missing password");
3737        assert!(
3738            err.contains("REDDB_PASSWORD"),
3739            "error must name the missing env: {err}"
3740        );
3741
3742        // Nothing was persisted; nothing was created.
3743        assert!(auth_store.needs_bootstrap());
3744        assert!(runtime
3745            .db()
3746            .store()
3747            .get_config(BOOTSTRAP_COMPLETED_KEY)
3748            .is_none());
3749
3750        clear_preset_env();
3751    }
3752
3753    #[test]
3754    fn re_running_production_after_first_boot_is_a_silent_skip() {
3755        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3756        clear_preset_env();
3757        // SAFETY: env serialised by `no_auth_env_lock`.
3758        unsafe {
3759            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3760            std::env::set_var("REDDB_USERNAME", "ops");
3761            std::env::set_var("REDDB_PASSWORD", "hunter2");
3762        }
3763
3764        let (runtime, auth_store) = fresh_runtime_and_store();
3765        apply_preset(&runtime, &auth_store).expect("first apply");
3766        assert_eq!(auth_store.list_users().len(), 1);
3767
3768        // Second invocation: same runtime/store, same env. Must be a
3769        // no-op — no error, no duplicate admin, no duplicate policy.
3770        // We do NOT reuse `auth_store` because production sealed it; the
3771        // idempotency hinge is the persisted config key, not the auth
3772        // store's in-memory seal. Build a fresh `AuthStore` as a restart
3773        // would and confirm `apply_preset` is a silent skip.
3774        let fresh = Arc::new(AuthStore::new(crate::auth::AuthConfig::default()));
3775        apply_preset(&runtime, &fresh).expect("re-run is silent-skip");
3776        assert!(
3777            fresh.needs_bootstrap(),
3778            "re-run must not create a second admin"
3779        );
3780        assert!(
3781            fresh.get_policy(FIRST_ADMIN_ALLOW_ALL_POLICY).is_none(),
3782            "re-run must not re-install the allow-all policy on the fresh store"
3783        );
3784
3785        clear_preset_env();
3786    }
3787
3788    #[test]
3789    fn unrecognised_preset_value_is_rejected() {
3790        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3791        clear_preset_env();
3792        // SAFETY: env serialised by `no_auth_env_lock`.
3793        unsafe {
3794            std::env::set_var(PRESET_ENV, "weird");
3795        }
3796
3797        let (runtime, auth_store) = fresh_runtime_and_store();
3798        let err = apply_preset(&runtime, &auth_store).expect_err("must reject unknown preset");
3799        assert!(err.contains("weird"), "error must echo the value: {err}");
3800        assert!(auth_store.needs_bootstrap());
3801
3802        clear_preset_env();
3803    }
3804
3805    #[test]
3806    fn no_auth_short_circuits_preset_entirely() {
3807        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
3808        clear_preset_env();
3809        // SAFETY: env serialised by `no_auth_env_lock`. Production creds
3810        // are set but `--no-auth` must win — no admin, no bootstrap state.
3811        unsafe {
3812            std::env::set_var(PRESET_ENV, PRESET_PRODUCTION);
3813            std::env::set_var("REDDB_USERNAME", "ops");
3814            std::env::set_var("REDDB_PASSWORD", "hunter2");
3815        }
3816
3817        let options = no_auth_test_config(true)
3818            .to_db_options()
3819            .expect("to_db_options");
3820        assert!(no_auth_active(&options));
3821
3822        // Mirror `build_runtime_with_telemetry`: when no_auth_active,
3823        // `apply_preset` is never called.
3824        let (runtime, auth_store) = fresh_runtime_and_store();
3825        if !no_auth_active(&options) {
3826            apply_preset(&runtime, &auth_store).expect("would apply preset");
3827        }
3828
3829        assert!(
3830            auth_store.needs_bootstrap(),
3831            "--no-auth must prevent any admin creation"
3832        );
3833        assert!(
3834            runtime
3835                .db()
3836                .store()
3837                .get_config(BOOTSTRAP_COMPLETED_KEY)
3838                .is_none(),
3839            "--no-auth must skip bootstrap-state persistence"
3840        );
3841
3842        clear_preset_env();
3843    }
3844
3845    #[test]
3846    fn implicit_bind_collision_degrades() {
3847        let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
3848        let addr = held.local_addr().expect("held addr").to_string();
3849        let mut readiness = TransportReadiness::default();
3850
3851        let listener =
3852            bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");
3853
3854        assert!(listener.is_none());
3855        assert_eq!(readiness.active.len(), 0);
3856        assert_eq!(readiness.failed.len(), 1);
3857        assert!(!readiness.failed[0].explicit);
3858        assert_eq!(readiness.failed[0].bind_addr, addr);
3859    }
3860}