Skip to main content

reddb_server/
service_cli.rs

1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13    GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Default bind address for the protocol router: loopback, port 5050.
///
/// Same address as the default returned for `ServerTransport::Wire` by
/// `ServerTransport::default_bind_addr`.
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
18/// Detect available CPU cores and suggest worker thread count.
19pub fn detect_runtime_config() -> RuntimeConfig {
20    let cpus = thread::available_parallelism()
21        .map(|n| n.get())
22        .unwrap_or(1);
23
24    // Reserve 1 core for OS, use the rest for workers (minimum 1)
25    let suggested_workers = cpus.saturating_sub(1).max(1);
26
27    RuntimeConfig {
28        available_cpus: cpus,
29        suggested_workers,
30        stack_size: 8 * 1024 * 1024, // 16MB default
31    }
32}
33
/// Runtime sizing hints produced by [`detect_runtime_config`].
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// CPU count reported by `thread::available_parallelism`
    /// (1 when detection fails).
    pub available_cpus: usize,
    /// Suggested worker-thread count: `available_cpus - 1`, never
    /// below 1.
    pub suggested_workers: usize,
    /// Stack size in bytes. Presumably applied to worker threads —
    /// confirm against the code that consumes this struct.
    pub stack_size: usize,
}
40
/// The transports the server can expose. Each variant has a display
/// label (`as_str`) and a loopback default bind address
/// (`default_bind_addr`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    /// gRPC listener.
    Grpc,
    /// HTTP listener.
    Http,
    /// Wire-protocol listener.
    Wire,
}
47
48impl ServerTransport {
49    pub const fn as_str(self) -> &'static str {
50        match self {
51            Self::Grpc => "gRPC",
52            Self::Http => "HTTP",
53            Self::Wire => "wire",
54        }
55    }
56
57    pub const fn default_bind_addr(self) -> &'static str {
58        match self {
59            Self::Grpc => "127.0.0.1:5555",
60            Self::Http => "127.0.0.1:5055",
61            Self::Wire => "127.0.0.1:5050",
62        }
63    }
64}
65
/// Configuration for the server command: storage path, listener
/// addresses (plain and TLS), replication role, auth switches and
/// runtime knobs. Translated into `RedDBOptions` by `to_db_options`.
#[derive(Debug, Clone)]
pub struct ServerCommandConfig {
    /// Database path. `Some` selects persistent options for that path;
    /// `None` selects the in-memory options (see `to_db_options`).
    pub path: Option<PathBuf>,
    /// Bind address of the shared protocol router. When set,
    /// `enabled_transports` reports gRPC, HTTP and wire as enabled.
    pub router_bind_addr: Option<String>,
    /// Presumably `true` when `router_bind_addr` was supplied
    /// explicitly by the operator rather than defaulted — confirm
    /// against the CLI parser.
    pub router_bind_explicit: bool,
    /// Plain gRPC bind address. When set, gRPC is reported as enabled.
    pub grpc_bind_addr: Option<String>,
    /// Presumably `true` when `grpc_bind_addr` was supplied
    /// explicitly — confirm against the CLI parser.
    pub grpc_bind_explicit: bool,
    /// TLS-encrypted gRPC bind address. Can run side-by-side with
    /// `grpc_bind_addr` (e.g. `:50051` plain + `:50052` TLS) or
    /// stand alone for TLS-only deploys. Defaults to `None`.
    pub grpc_tls_bind_addr: Option<String>,
    /// Path to PEM-encoded gRPC server certificate. Resolved through
    /// `REDDB_GRPC_TLS_CERT` (with `_FILE` companion for k8s secret
    /// mounts). When `None` and dev-mode is enabled
    /// (`RED_GRPC_TLS_DEV=1`) the server auto-generates a self-signed
    /// cert and prints its SHA-256 fingerprint to stderr.
    pub grpc_tls_cert: Option<PathBuf>,
    /// Path to PEM-encoded gRPC server private key. Same env-var
    /// pattern as `grpc_tls_cert`.
    pub grpc_tls_key: Option<PathBuf>,
    /// Optional path to a PEM bundle of trust anchors used to verify
    /// client certificates. When set, the gRPC listener requires
    /// every client to present a cert that chains to this CA — i.e.
    /// mutual TLS. When unset, one-way TLS only.
    pub grpc_tls_client_ca: Option<PathBuf>,
    /// Plain HTTP bind address. When set, HTTP is reported as enabled.
    pub http_bind_addr: Option<String>,
    /// Presumably `true` when `http_bind_addr` was supplied
    /// explicitly — confirm against the CLI parser.
    pub http_bind_explicit: bool,
    /// HTTPS bind address. When set, the HTTP server also serves a
    /// TLS-terminated listener on this addr. Plain HTTP and HTTPS can
    /// run side by side (e.g. 8080 plain + 8443 TLS).
    pub http_tls_bind_addr: Option<String>,
    /// PEM cert for HTTPS. Reads `REDDB_HTTP_TLS_CERT` / its `_FILE`
    /// companion when not set explicitly.
    pub http_tls_cert: Option<PathBuf>,
    /// PEM key for HTTPS. Reads `REDDB_HTTP_TLS_KEY` / its `_FILE`
    /// companion when not set explicitly.
    pub http_tls_key: Option<PathBuf>,
    /// Optional PEM CA bundle. When set, the HTTPS listener requires
    /// every client to present a cert that chains to a CA in this
    /// bundle (mTLS). When unset, plain server-side TLS only.
    pub http_tls_client_ca: Option<PathBuf>,
    /// Plain wire-protocol bind address. When set, the wire transport
    /// is reported as enabled.
    pub wire_bind_addr: Option<String>,
    /// Presumably `true` when `wire_bind_addr` was supplied
    /// explicitly — confirm against the CLI parser.
    pub wire_bind_explicit: bool,
    /// TLS-encrypted wire protocol bind address
    pub wire_tls_bind_addr: Option<String>,
    /// Path to TLS cert PEM (if None + wire_tls_bind, auto-generate)
    pub wire_tls_cert: Option<PathBuf>,
    /// Path to TLS key PEM
    pub wire_tls_key: Option<PathBuf>,
    /// PostgreSQL wire protocol bind address (Phase 3.1 PG parity).
    /// When set the server accepts psql / JDBC / DBeaver clients on
    /// this port via the v3 protocol. Defaults to None (disabled).
    pub pg_bind_addr: Option<String>,
    /// Copied verbatim into `RedDBOptions::create_if_missing`.
    pub create_if_missing: bool,
    /// CLI read-only flag. `to_db_options` ORs it with the
    /// `RED_READONLY` / `REDDB_READONLY` env vars and the persisted
    /// runtime read-only state.
    pub read_only: bool,
    /// Replication role: `"primary"` or `"replica"`; any other value
    /// is treated as standalone.
    pub role: String,
    /// Address of the primary, used when `role` is `"replica"`.
    /// Falls back to `http://127.0.0.1:5555` when unset.
    pub primary_addr: Option<String>,
    /// When `true`, sets `auth.vault_enabled` on the DB options
    /// (`no_auth` turns it back off).
    pub vault: bool,
    /// Issue #663 — explicit `--no-auth` / `--dev` flag for local
    /// no-password mode. When `true`, the boot pipeline force-disables
    /// auth (`auth.enabled = false`, `auth.require_auth = false`,
    /// `auth.vault_enabled = false`) and skips
    /// `AuthStore::bootstrap_from_env`, so an explicit
    /// `REDDB_USERNAME` + `REDDB_PASSWORD` pair cannot accidentally
    /// turn anonymous access into a logged-in admin. An unmissable
    /// warning is emitted at startup. Default `false` — the existing
    /// implicit no-auth behaviour (just don't set the envs) is
    /// unchanged.
    pub no_auth: bool,
    /// Override worker thread count (None = auto-detect from CPUs)
    pub workers: Option<usize>,
    /// Telemetry config (Phase 6 logging). `None` falls back to the
    /// built-in default derived from `path` + stderr-only.
    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
    /// HTTP handler-pool knobs from the CLI layer (issue #574 slice 5).
    /// Carries flag and env values; red_config and built-in defaults
    /// are applied later by [`crate::server::http_limits::resolve_http_limits`]
    /// once the runtime is open.
    pub http_limits_cli: crate::server::HttpLimitsCliInput,
}
146
/// One entry of `TransportReadiness::active`: a transport listener
/// and the address it is associated with.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerState {
    /// Transport label, stored as an owned string.
    pub transport: String,
    /// Bind address of the listener.
    pub bind_addr: String,
    /// Presumably whether the address was requested explicitly by the
    /// operator rather than defaulted — confirm against callers.
    pub explicit: bool,
}
153
/// One entry of `TransportReadiness::failed`: a transport listener
/// together with the reason it was recorded as failed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerFailure {
    /// Transport label, stored as an owned string.
    pub transport: String,
    /// Bind address of the listener.
    pub bind_addr: String,
    /// Presumably whether the address was requested explicitly by the
    /// operator rather than defaulted — confirm against callers.
    pub explicit: bool,
    /// Free-form failure description supplied by the caller.
    pub reason: String,
}
161
/// Accumulated listener outcomes, split into active and failed
/// entries. `Default` yields two empty lists.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TransportReadiness {
    /// Listeners recorded as active, in insertion order.
    pub active: Vec<TransportListenerState>,
    /// Listeners recorded as failed, in insertion order.
    pub failed: Vec<TransportListenerFailure>,
}
167
168impl TransportReadiness {
169    fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
170        self.active.push(TransportListenerState {
171            transport: transport.to_string(),
172            bind_addr: bind_addr.to_string(),
173            explicit,
174        });
175    }
176
177    fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
178        self.failed.push(TransportListenerFailure {
179            transport: transport.to_string(),
180            bind_addr: bind_addr.to_string(),
181            explicit,
182            reason,
183        });
184    }
185}
186
/// Inputs describing a systemd service for the server.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Service name; `unit_path` places the unit at
    /// `/etc/systemd/system/<service_name>.service`.
    pub service_name: String,
    /// Path to the server binary.
    pub binary_path: PathBuf,
    /// User the service runs as.
    pub run_user: String,
    /// Group the service runs as.
    pub run_group: String,
    /// Path to the database file; `data_dir` returns its parent.
    pub data_path: PathBuf,
    /// Optional router bind address.
    pub router_bind_addr: Option<String>,
    /// Optional gRPC bind address.
    pub grpc_bind_addr: Option<String>,
    /// Optional HTTP bind address.
    pub http_bind_addr: Option<String>,
}
198
199impl SystemdServiceConfig {
200    pub fn data_dir(&self) -> PathBuf {
201        self.data_path
202            .parent()
203            .map(PathBuf::from)
204            .unwrap_or_else(|| PathBuf::from("."))
205    }
206
207    pub fn unit_path(&self) -> PathBuf {
208        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
209    }
210}
211
212/// Build a sane default `TelemetryConfig` from a server path when the
213/// caller didn't set one explicitly. Writes rotating logs into the
214/// parent directory of the DB file (or `./logs` for in-memory /
215/// pathless runs). Level defaults to `info`, pretty stderr format.
216pub fn default_telemetry_for_path(
217    path: Option<&std::path::Path>,
218) -> crate::telemetry::TelemetryConfig {
219    let log_dir = match path {
220        Some(p) => p
221            .parent()
222            .map(|parent| parent.join("logs"))
223            .or_else(|| Some(std::path::PathBuf::from("./logs"))),
224        None => None, // in-memory — no file, stderr-only
225    };
226    crate::telemetry::TelemetryConfig {
227        log_dir,
228        file_prefix: "reddb.log".to_string(),
229        level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
230        format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
231            crate::telemetry::LogFormat::Pretty
232        } else {
233            crate::telemetry::LogFormat::Json
234        },
235        rotation_keep_days: 14,
236        service_name: "reddb",
237        // Implicit defaults — no CLI flag has claimed these values yet.
238        level_explicit: false,
239        format_explicit: false,
240        rotation_keep_days_explicit: false,
241        file_prefix_explicit: false,
242        log_dir_explicit: false,
243        log_file_disabled: false,
244    }
245}
246
/// Metadata keys used to thread the parsed backup config from
/// `to_db_options` (via `apply_backup_config`) down to the runner.
/// `spawn_backup_tasks_if_configured` reads them back to spawn the
/// periodic checkpointer and WAL-archive tasks. Threading through
/// `metadata` avoids extending `RedDBOptions` with a public field that
/// has no meaning for library consumers.
///
/// Checkpoint interval, in seconds, stored as a decimal string.
const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
/// WAL flush / archive interval, in seconds, stored as a decimal string.
const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
/// Backend kind; `apply_backup_config` always stamps `"s3"`.
const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
/// Issue #519 — threaded through `metadata` like the existing interval
/// values. `0` (default) means "feature disabled" and the runner skips
/// the lag-monitor wiring entirely.
const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";
260
/// Issue #663 — metadata key set by `to_db_options` when the operator
/// passes `--no-auth` / `--dev`. Read in `build_runtime_with_telemetry`
/// to (a) skip `AuthStore::bootstrap_from_env` (so a stray
/// `REDDB_USERNAME`/`REDDB_PASSWORD` cannot auto-create an admin) and
/// (b) emit the loud "auth disabled" warning. Threaded via `metadata`
/// — rather than a public `RedDBOptions` field — to keep the flag a
/// CLI/boot concern with no meaning for library consumers.
///
/// The stamped value is the string `"true"`; `no_auth_active` treats
/// any other value, or a missing key, as "not active".
pub(crate) const NO_AUTH_META: &str = "red.boot.no_auth";
269
270/// Returns `true` when `--no-auth` / `--dev` was active for this boot,
271/// i.e. when `to_db_options` stamped [`NO_AUTH_META`] on `options.metadata`.
272pub(crate) fn no_auth_active(options: &RedDBOptions) -> bool {
273    options
274        .metadata
275        .get(NO_AUTH_META)
276        .map(|v| v == "true")
277        .unwrap_or(false)
278}
279
/// Loud, unmissable warning printed when the operator opts into
/// anonymous access via `--no-auth` / `--dev`. Goes to stderr (always
/// visible at startup) **and** the tracing layer (captured by file
/// logs once telemetry is wired).
///
/// This constant only holds the message text; the code that emits it
/// lives in the boot path, outside this definition.
const NO_AUTH_WARNING: &str =
    "⚠ auth disabled (--no-auth) — anonymous access, do NOT use in production";
286
287impl ServerCommandConfig {
288    fn to_db_options(&self) -> Result<RedDBOptions, String> {
289        let mut options = match &self.path {
290            Some(path) => RedDBOptions::persistent(path),
291            None => RedDBOptions::in_memory(),
292        };
293
294        options.mode = StorageMode::Persistent;
295        options.create_if_missing = self.create_if_missing;
296        // PLAN.md Phase 4.3 — read_only resolution priority:
297        //   1. CLI flag (`--readonly`) — explicit operator intent.
298        //   2. `RED_READONLY=true` env — orchestrator override.
299        //   3. Persisted `<data>/.runtime-state.json` from a prior
300        //      `POST /admin/readonly` — survives restart.
301        //   4. Default `false`.
302        options.read_only = self.read_only
303            || env_nonempty("RED_READONLY")
304                .or_else(|| env_nonempty("REDDB_READONLY"))
305                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
306                .unwrap_or(false)
307            || self.path.as_ref().is_some_and(|data_path| {
308                crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
309                    data_path,
310                ))
311                .unwrap_or(false)
312            });
313
314        options.replication = match self.role.as_str() {
315            "primary" => ReplicationConfig::primary(),
316            "replica" => {
317                let primary_addr = self
318                    .primary_addr
319                    .clone()
320                    .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
321                // Public-mutation rejection on replicas is enforced by
322                // `WriteGate` at the runtime/RPC boundary (PLAN.md W1).
323                // Leaving `options.read_only = false` keeps the pager
324                // writable so the internal logical-WAL apply path can
325                // ingest records from the primary; WriteGate ensures no
326                // client request reaches storage.
327                ReplicationConfig::replica(primary_addr)
328            }
329            _ => ReplicationConfig::standalone(),
330        };
331
332        if self.vault {
333            options.auth.vault_enabled = true;
334        }
335
336        // Issue #663 — `--no-auth` / `--dev` is the last word on auth
337        // for this boot: force every auth knob off, regardless of any
338        // env-derived config (`--vault`, `REDDB_USERNAME`/`PASSWORD`,
339        // `REDDB_VAULT_KEY`, OAuth, cert) the operator may also have
340        // set. We *also* stamp [`NO_AUTH_META`] so the auth-store
341        // builder downstream knows to skip `bootstrap_from_env`
342        // (which would otherwise auto-create an admin from
343        // `REDDB_USERNAME`/`REDDB_PASSWORD` even with auth disabled,
344        // a footgun for the local-dev workflow this flag exists to
345        // support).
346        if self.no_auth {
347            options.auth.enabled = false;
348            options.auth.require_auth = false;
349            options.auth.vault_enabled = false;
350            options
351                .metadata
352                .insert(NO_AUTH_META.to_string(), "true".to_string());
353        }
354
355        // Issue #517 — canonical `REDDB_BACKUP_*` contract takes
356        // precedence. On Err, surface the partial-config message so
357        // boot exits non-zero with a clear operator message. On
358        // Ok(None), fall through to the legacy backend-from-env path.
359        match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
360            Err(msg) => {
361                return Err(format!("backup bootstrap: {msg}"));
362            }
363            Ok(Some(cfg)) => {
364                apply_backup_config(&mut options, &cfg);
365            }
366            Ok(None) => {
367                configure_remote_backend_from_env(&mut options);
368            }
369        }
370
371        Ok(options)
372    }
373
374    pub fn enabled_transports(&self) -> Vec<ServerTransport> {
375        let mut transports = Vec::with_capacity(3);
376        if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
377            transports.push(ServerTransport::Grpc);
378        }
379        if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
380            transports.push(ServerTransport::Http);
381        }
382        if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
383            transports.push(ServerTransport::Wire);
384        }
385        transports
386    }
387}
388
/// Read an env var, treating empty / whitespace-only as `None`.
/// Honors the `<NAME>_FILE` convention.
///
/// Thin local alias: this function only forwards to the shared
/// `crate::utils::env_with_file_fallback` helper so call sites in this
/// module can keep a short name. The trimming and `_FILE` behaviour
/// described above is implemented by that helper, not here.
fn env_nonempty(name: &str) -> Option<String> {
    crate::utils::env_with_file_fallback(name)
}
396
397fn env_truthy(name: &str) -> bool {
398    env_nonempty(name)
399        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
400        .unwrap_or(false)
401}
402
403/// Apply a parsed [`BackupConfig`] to `options`. Wires the S3
404/// backend via `with_remote_backend` + `with_atomic_remote_backend`
405/// when the `backend-s3` feature is on, stashes intervals + backend
406/// kind in `metadata` so the runner can spawn the periodic tasks,
407/// and emits the startup INFO log required by #517.
408fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
409    let endpoint_host = endpoint_host(&cfg.endpoint);
410
411    options.metadata.insert(
412        BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
413        cfg.checkpoint_interval_secs.to_string(),
414    );
415    options.metadata.insert(
416        BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
417        cfg.wal_flush_interval_secs.to_string(),
418    );
419    options
420        .metadata
421        .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
422    options.metadata.insert(
423        BACKUP_PAUSE_ON_LAG_META.to_string(),
424        cfg.pause_on_lag_secs.to_string(),
425    );
426
427    #[cfg(feature = "backend-s3")]
428    {
429        let s3_cfg = crate::storage::backend::S3Config {
430            endpoint: cfg.endpoint.clone(),
431            bucket: cfg.bucket.clone(),
432            key_prefix: cfg.prefix.clone(),
433            access_key: cfg.access_key_id.clone(),
434            secret_key: cfg.secret_access_key.clone(),
435            region: cfg.region.clone(),
436            path_style: true,
437        };
438        let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
439        options.remote_backend = Some(backend.clone());
440        options.remote_backend_atomic = Some(backend);
441        // Use the operator-supplied prefix as the snapshot key root.
442        // The existing helpers (`default_snapshot_prefix`,
443        // `default_wal_archive_prefix`) derive sub-prefixes from the
444        // parent of `remote_key`.
445        let trimmed = cfg.prefix.trim_end_matches('/');
446        options.remote_key = Some(format!("{}/data.rdb", trimmed));
447
448        tracing::info!(
449            backend = "s3",
450            endpoint = %endpoint_host,
451            bucket = %cfg.bucket,
452            prefix = %cfg.prefix,
453            checkpoint_interval_secs = cfg.checkpoint_interval_secs,
454            wal_flush_interval_secs = cfg.wal_flush_interval_secs,
455            "backup backend configured from REDDB_BACKUP_* env"
456        );
457    }
458
459    #[cfg(not(feature = "backend-s3"))]
460    {
461        tracing::warn!(
462            backend = "s3",
463            endpoint = %endpoint_host,
464            bucket = %cfg.bucket,
465            prefix = %cfg.prefix,
466            "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
467             backend wiring skipped (archiver/checkpointer also disabled)"
468        );
469    }
470}
471
/// Extract the `host[:port]` part of an endpoint URL for logging.
///
/// Strips, in order:
/// 1. the scheme (everything up to and including the first `://`),
/// 2. everything from the first `/`, `?` or `#` onwards (path, query,
///    fragment),
/// 3. any `userinfo@` prefix, so credentials embedded in the endpoint
///    (`https://key:secret@host`) never reach the logs.
///
/// Inputs without a scheme are accepted as-is (`minio:9000` stays
/// `minio:9000`). The result borrows from `endpoint`.
fn endpoint_host(endpoint: &str) -> &str {
    let after_scheme = endpoint
        .split_once("://")
        .map_or(endpoint, |(_, rest)| rest);

    // The authority component ends at the first path, query or
    // fragment delimiter.
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(after_scheme);

    // Split on the last '@' so that everything before it is treated as
    // userinfo and dropped.
    authority
        .rsplit_once('@')
        .map_or(authority, |(_, host)| host)
}
479
480/// If `options` carry backup-task intervals threaded in via
481/// [`apply_backup_config`], spawn periodic checkpointer + WAL-flush
482/// tasks against `runtime`. Returns a `BackupTasksHandle` that
483/// stops the tasks when dropped; runners keep it alive for the
484/// server lifetime.
485fn spawn_backup_tasks_if_configured(
486    options: &RedDBOptions,
487    runtime: &RedDBRuntime,
488) -> Option<BackupTasksHandle> {
489    let checkpoint_secs: u64 = options
490        .metadata
491        .get(BACKUP_INTERVAL_META_CHECKPOINT)?
492        .parse()
493        .ok()?;
494    let wal_secs: u64 = options
495        .metadata
496        .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
497        .parse()
498        .ok()?;
499    // Issue #519 — opt-in graceful read-only when remote archive lag
500    // exceeds the threshold. `0` (default) keeps legacy behaviour.
501    let pause_on_lag_secs: u64 = options
502        .metadata
503        .get(BACKUP_PAUSE_ON_LAG_META)
504        .and_then(|raw| raw.parse().ok())
505        .unwrap_or(0);
506    options.remote_backend.as_ref()?;
507
508    let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
509
510    // Stamp the gate with the threshold + a "now" baseline so the
511    // first auto-pause can only fire after `pause_on_lag_secs` of
512    // archive silence. The poller below re-evaluates on the same
513    // clock the archive-task wrapper uses.
514    if pause_on_lag_secs > 0 {
515        let now_ms = std::time::SystemTime::now()
516            .duration_since(std::time::UNIX_EPOCH)
517            .map(|d| d.as_millis() as u64)
518            .unwrap_or(0);
519        runtime
520            .write_gate()
521            .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
522        tracing::info!(
523            pause_on_lag_secs,
524            "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
525        );
526    }
527
528    let checkpoint_handle = {
529        let stop = Arc::clone(&stop);
530        let runtime = runtime.clone();
531        let interval = Duration::from_secs(checkpoint_secs);
532        thread::Builder::new()
533            .name("red-checkpointer".into())
534            .spawn(move || {
535                periodic_loop(stop, interval, move || {
536                    if let Err(err) = runtime.checkpoint() {
537                        tracing::warn!(error = %err, "periodic checkpoint failed");
538                    }
539                })
540            })
541            .ok()
542    };
543
544    let archiver_handle = {
545        let stop = Arc::clone(&stop);
546        let runtime = runtime.clone();
547        let interval = Duration::from_secs(wal_secs);
548        let lag_enabled = pause_on_lag_secs > 0;
549        thread::Builder::new()
550            .name("red-wal-archiver".into())
551            .spawn(move || {
552                periodic_loop(stop, interval, move || match runtime.trigger_backup() {
553                    Ok(_) if lag_enabled => {
554                        let now_ms = std::time::SystemTime::now()
555                            .duration_since(std::time::UNIX_EPOCH)
556                            .map(|d| d.as_millis() as u64)
557                            .unwrap_or(0);
558                        runtime.write_gate().record_archive_success(now_ms);
559                        // Same-tick re-evaluation: catching up while
560                        // already auto-paused must auto-resume without
561                        // waiting for the poller's cadence.
562                        runtime.write_gate().evaluate_archive_lag(now_ms);
563                    }
564                    Ok(_) => {}
565                    Err(err) => {
566                        tracing::warn!(error = %err, "periodic WAL archive/backup failed");
567                    }
568                })
569            })
570            .ok()
571    };
572
573    // Issue #519 — lag poller. Wakes on a short cadence so a frozen
574    // archiver (the worst case) still flips the gate within ~5s of
575    // crossing the threshold, instead of waiting up to a full
576    // `wal_secs` for the next archive attempt that may never come.
577    let lag_monitor_handle = if pause_on_lag_secs > 0 {
578        let stop = Arc::clone(&stop);
579        let runtime = runtime.clone();
580        // 5s is short enough that the threshold is honoured tightly
581        // and long enough that the atomic loads stay invisible at the
582        // process level.
583        let interval = Duration::from_secs(5);
584        thread::Builder::new()
585            .name("red-archive-lag-monitor".into())
586            .spawn(move || {
587                periodic_loop(stop, interval, move || {
588                    let now_ms = std::time::SystemTime::now()
589                        .duration_since(std::time::UNIX_EPOCH)
590                        .map(|d| d.as_millis() as u64)
591                        .unwrap_or(0);
592                    let was_paused = runtime.write_gate().is_auto_paused();
593                    let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
594                    if now_paused && !was_paused {
595                        tracing::warn!(
596                            pause_on_lag_secs,
597                            last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
598                            "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
599                        );
600                    } else if !now_paused && was_paused {
601                        tracing::info!(
602                            "WAL archive caught up — exiting graceful read-only mode (issue #519)"
603                        );
604                    }
605                })
606            })
607            .ok()
608    } else {
609        None
610    };
611
612    tracing::info!(
613        checkpoint_interval_secs = checkpoint_secs,
614        wal_flush_interval_secs = wal_secs,
615        "backup tasks spawned (checkpointer + WAL archiver)"
616    );
617
618    Some(BackupTasksHandle {
619        stop,
620        _checkpoint_handle: checkpoint_handle,
621        _archiver_handle: archiver_handle,
622        _lag_monitor_handle: lag_monitor_handle,
623    })
624}
625
/// Shutdown handle for the periodic backup tasks: the checkpointer,
/// the WAL archiver and, when enabled, the archive-lag monitor.
///
/// Dropping the handle sets the shared `stop` flag, which every task
/// loop polls. The join handles are only held, never joined, so drop
/// does not wait for the threads to finish.
pub struct BackupTasksHandle {
    /// Shared stop flag polled by every task loop.
    stop: Arc<std::sync::atomic::AtomicBool>,
    /// Checkpointer thread; `None` when it could not be spawned.
    _checkpoint_handle: Option<thread::JoinHandle<()>>,
    /// WAL-archiver thread; `None` when it could not be spawned.
    _archiver_handle: Option<thread::JoinHandle<()>>,
    /// Issue #519 — periodic archive-lag poller, only spawned when
    /// `REDDB_BACKUP_PAUSE_ON_LAG_SECS > 0`.
    _lag_monitor_handle: Option<thread::JoinHandle<()>>,
}
636
637impl Drop for BackupTasksHandle {
638    fn drop(&mut self) {
639        self.stop.store(true, std::sync::atomic::Ordering::Release);
640    }
641}
642
/// Run `tick` roughly every `interval` until `stop` is set.
///
/// The loop sleeps in one-second steps so shutdown is responsive even
/// when the operator-configured interval is large (e.g. a 1h
/// checkpoint). Consequences of the one-second step:
///
/// * the effective interval is rounded up to whole seconds, and an
///   interval below one second ticks once per second;
/// * time spent inside `tick` is not counted towards the next interval.
///
/// `stop` is checked both before and after each sleep, so once
/// shutdown has been signalled no further tick is started — a tick
/// that is already running is not interrupted.
fn periodic_loop<F: FnMut()>(
    stop: Arc<std::sync::atomic::AtomicBool>,
    interval: Duration,
    mut tick: F,
) {
    use std::sync::atomic::Ordering;

    let wake = Duration::from_secs(1);
    let mut elapsed = Duration::ZERO;
    while !stop.load(Ordering::Acquire) {
        thread::sleep(wake);
        // Shutdown may have been requested while we slept; do not
        // start new work (checkpoint / backup) in that case.
        if stop.load(Ordering::Acquire) {
            break;
        }
        elapsed += wake;
        if elapsed >= interval {
            tick();
            elapsed = Duration::ZERO;
        }
    }
}
661
662fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
663    // PLAN.md (cloud-agnostic) — prefer the new spelling
664    // `RED_BACKEND`; accept the legacy `REDDB_REMOTE_BACKEND` for
665    // existing dev installs. `none` (default) means standalone — no
666    // remote backend, valid for development and on-prem without
667    // remote.
668    let backend = env_nonempty("RED_BACKEND")
669        .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
670        .unwrap_or_else(|| "none".to_string())
671        .to_ascii_lowercase();
672
673    match backend.as_str() {
674        // Universal S3-compatible — covers AWS, R2, MinIO, Ceph,
675        // GCS-interop, B2, DO Spaces, Wasabi, Garage, SeaweedFS,
676        // IDrive, Storj. The `path_style` toggle in S3Config picks
677        // the right addressing for self-hosted vs hosted.
678        "s3" | "minio" | "r2" => {
679            #[cfg(feature = "backend-s3")]
680            {
681                if let Some(config) = s3_config_from_env() {
682                    let remote_key = env_nonempty("RED_REMOTE_KEY")
683                        .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
684                        .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
685                    let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
686                    options.remote_backend = Some(backend.clone());
687                    options.remote_backend_atomic = Some(backend);
688                    options.remote_key = Some(remote_key);
689                }
690            }
691            #[cfg(not(feature = "backend-s3"))]
692            {
693                tracing::warn!(
694                    backend = %backend,
695                    "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
696                );
697            }
698        }
699        // Filesystem backend (NFS/EFS/SMB/local-disk). PLAN.md spec
700        // calls it `fs`; legacy code shipped it as `local`. Both
701        // names map to LocalBackend, with the remote_key derived
702        // from `RED_FS_PATH` + a per-database suffix when provided.
703        "fs" | "local" => {
704            let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
705            let remote_key = match (
706                base_path,
707                env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
708            ) {
709                (Some(base), Some(rel)) => Some(format!(
710                    "{}/{}",
711                    base.trim_end_matches('/'),
712                    rel.trim_start_matches('/')
713                )),
714                (Some(base), None) => Some(format!(
715                    "{}/clusters/dev/data.rdb",
716                    base.trim_end_matches('/')
717                )),
718                (None, Some(rel)) => Some(rel),
719                (None, None) => None,
720            };
721            if let Some(key) = remote_key {
722                let backend = Arc::new(crate::storage::backend::LocalBackend);
723                options.remote_backend = Some(backend.clone());
724                options.remote_backend_atomic = Some(backend);
725                options.remote_key = Some(key);
726            }
727        }
728        // Generic HTTP backend (PLAN.md Phase 2.3). Maximum
729        // portability: any service exposing PUT/GET/DELETE serves
730        // as a backup target. Optional auth via *_FILE secret
731        // path keeps the token out of the env.
732        "http" => {
733            let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
734                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
735            {
736                Some(u) => u,
737                None => {
738                    tracing::warn!(
739                        "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
740                    );
741                    return;
742                }
743            };
744            let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
745                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
746                .unwrap_or_default();
747            let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
748                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
749            {
750                std::fs::read_to_string(&path)
751                    .ok()
752                    .map(|s| s.trim().to_string())
753                    .filter(|s| !s.is_empty())
754            } else {
755                env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
756                    .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
757            };
758
759            let mut config =
760                crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
761            if let Some(auth) = auth_header {
762                config = config.with_auth_header(auth);
763            }
764            let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
765                || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
766                || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
767            config = config.with_conditional_writes(conditional_writes);
768            // Always populate the snapshot-transport handle. When the
769            // operator confirmed CAS support, also populate the atomic
770            // handle via AtomicHttpBackend — without that flag,
771            // LeaseStore must remain unreachable on this backend.
772            if conditional_writes {
773                match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
774                    Ok(atomic) => {
775                        let atomic_arc = Arc::new(atomic);
776                        options.remote_backend = Some(atomic_arc.clone());
777                        options.remote_backend_atomic = Some(atomic_arc);
778                    }
779                    Err(err) => {
780                        tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
781                        options.remote_backend =
782                            Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
783                    }
784                }
785            } else {
786                options.remote_backend =
787                    Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
788            }
789            options.remote_key = env_nonempty("RED_REMOTE_KEY")
790                .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
791                .or_else(|| Some("clusters/dev/data.rdb".to_string()));
792        }
793        // `none` is the explicit standalone — no remote, no backup
794        // pipeline. Boot never blocks on network reachability.
795        "none" | "" => {}
796        other => {
797            tracing::warn!(
798                backend = %other,
799                "unknown RED_BACKEND value — supported: s3 | fs | http | none"
800            );
801        }
802    }
803}
804
/// Look up an S3 setting by its suffix (e.g. `ENDPOINT`, `BUCKET`).
///
/// Two spellings are consulted, in order:
///
/// 1. `RED_S3_<suffix>` — the preferred, cloud-agnostic name.
/// 2. `REDDB_S3_<suffix>` — the legacy name kept for older installs.
///
/// The first spelling that `env_nonempty` reports a value for wins;
/// `None` is returned when neither yields one.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    let preferred = format!("RED_S3_{suffix}");
    if let Some(value) = env_nonempty(&preferred) {
        return Some(value);
    }
    let legacy = format!("REDDB_S3_{suffix}");
    env_nonempty(&legacy)
}
814
/// Read an S3 secret either from a file named by a `*_FILE` env var
/// or from the inline env var.
///
/// Lookup order for a given `suffix` (e.g. `ACCESS_KEY`):
///
/// 1. `RED_S3_<suffix>_FILE`, then `REDDB_S3_<suffix>_FILE` — a path
///    to a file holding the secret (the shape used by Kubernetes /
///    Docker secrets and similar mounts). The file contents are
///    trimmed; an empty file yields `None`.
/// 2. Otherwise the inline value via `env_s3(suffix)`.
///
/// The `_FILE` variant wins whenever it is set: if the file cannot
/// be read the secret is reported as unset (`None`) rather than
/// silently falling back to the inline value. In that case a warning
/// naming the path (never the secret) is logged so a wrong mount
/// path or permission problem is visible to the operator.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let file_key_red = format!("RED_S3_{suffix}_FILE");
    let file_key_legacy = format!("REDDB_S3_{suffix}_FILE");
    let secret_path = env_nonempty(&file_key_red).or_else(|| env_nonempty(&file_key_legacy));

    let path = match secret_path {
        Some(path) => path,
        // No file indirection configured: use the inline env value.
        None => return env_s3(suffix),
    };

    match std::fs::read_to_string(&path) {
        Ok(contents) => {
            let secret = contents.trim();
            if secret.is_empty() {
                None
            } else {
                Some(secret.to_string())
            }
        }
        Err(err) => {
            // Previously this failure was swallowed, leaving the
            // operator with a disabled backend and no hint why.
            tracing::warn!(
                path = %path,
                error = %err,
                "could not read S3 secret file; treating the secret as unset"
            );
            None
        }
    }
}
832
/// Assemble an `S3Config` from the `RED_S3_*` / `REDDB_S3_*` env family.
///
/// `ENDPOINT`, `BUCKET`, `ACCESS_KEY` and `SECRET_KEY` are mandatory:
/// if any of them is missing the function returns `None`. The two
/// credentials go through `env_s3_secret`, so their `*_FILE`
/// companions are honoured.
///
/// Optional settings and their fallbacks:
///
/// * `REGION` — defaults to `us-east-1`.
/// * `KEY_PREFIX`, then `PREFIX` — defaults to the empty string.
/// * `PATH_STYLE` — `1` / `true` / `yes` / `on` (case-insensitive)
///   enable it, any other value disables it; defaults to enabled
///   when the variable is absent.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    // Mandatory values first; bail out as soon as one is missing.
    let endpoint = env_s3("ENDPOINT")?;
    let bucket = env_s3("BUCKET")?;
    let access_key = env_s3_secret("ACCESS_KEY")?;
    let secret_key = env_s3_secret("SECRET_KEY")?;

    let region = match env_s3("REGION") {
        Some(configured) => configured,
        None => "us-east-1".to_string(),
    };
    let key_prefix = match env_s3("KEY_PREFIX") {
        Some(prefix) => prefix,
        None => env_s3("PREFIX").unwrap_or_default(),
    };
    let path_style = match env_s3("PATH_STYLE") {
        Some(raw) => matches!(
            raw.to_ascii_lowercase().as_str(),
            "1" | "true" | "yes" | "on"
        ),
        None => true,
    };

    let config = crate::storage::backend::S3Config {
        endpoint,
        bucket,
        key_prefix,
        access_key,
        secret_key,
        region,
        path_style,
    };
    Some(config)
}
856
857pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
858    let data_dir = config.data_dir();
859    let exec_start = render_systemd_exec_start(config);
860    format!(
861        "[Unit]\n\
862Description=RedDB unified database service\n\
863After=network-online.target\n\
864Wants=network-online.target\n\
865\n\
866[Service]\n\
867Type=simple\n\
868User={user}\n\
869Group={group}\n\
870WorkingDirectory={workdir}\n\
871ExecStart={exec_start}\n\
872Restart=always\n\
873RestartSec=2\n\
874LimitSTACK=16M\n\
875NoNewPrivileges=true\n\
876PrivateTmp=true\n\
877ProtectSystem=strict\n\
878ProtectHome=true\n\
879ProtectControlGroups=true\n\
880ProtectKernelTunables=true\n\
881ProtectKernelModules=true\n\
882RestrictNamespaces=true\n\
883LockPersonality=true\n\
884MemoryDenyWriteExecute=true\n\
885ReadWritePaths={workdir}\n\
886\n\
887[Install]\n\
888WantedBy=multi-user.target\n",
889        user = config.run_user,
890        group = config.run_group,
891        workdir = data_dir.display(),
892        exec_start = exec_start,
893    )
894}
895
896/// Install a systemd unit + start the service.
897///
898/// Linux-only. The helper shells out to `systemctl`, `useradd`,
899/// `groupadd`, `install`, `getent`, and `id` — none of which exist on
900/// Windows or macOS. The Windows/macOS branch returns a hard error so
901/// callers (the CLI) surface a clear message instead of a confusing
902/// syscall failure. A proper Windows-service equivalent (sc.exe /
903/// NSSM) is a Phase 3.6 follow-up.
904#[cfg(target_os = "linux")]
905pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
906    ensure_root()?;
907    ensure_command_available("systemctl")?;
908    ensure_command_available("getent")?;
909    ensure_command_available("groupadd")?;
910    ensure_command_available("useradd")?;
911    ensure_command_available("install")?;
912    ensure_executable(&config.binary_path)?;
913
914    if !command_success("getent", ["group", config.run_group.as_str()])? {
915        run_command("groupadd", ["--system", config.run_group.as_str()])?;
916    }
917
918    if !command_success("id", ["-u", config.run_user.as_str()])? {
919        let data_dir = config.data_dir();
920        run_command(
921            "useradd",
922            [
923                "--system",
924                "--gid",
925                config.run_group.as_str(),
926                "--home-dir",
927                data_dir.to_string_lossy().as_ref(),
928                "--shell",
929                "/usr/sbin/nologin",
930                config.run_user.as_str(),
931            ],
932        )?;
933    }
934
935    let data_dir = config.data_dir();
936    run_command(
937        "install",
938        [
939            "-d",
940            "-o",
941            config.run_user.as_str(),
942            "-g",
943            config.run_group.as_str(),
944            "-m",
945            "0750",
946            data_dir.to_string_lossy().as_ref(),
947        ],
948    )?;
949
950    std::fs::write(config.unit_path(), render_systemd_unit(config))
951        .map_err(|err| format!("failed to write systemd unit: {err}"))?;
952
953    run_command("systemctl", ["daemon-reload"])?;
954    run_command(
955        "systemctl",
956        [
957            "enable",
958            "--now",
959            format!("{}.service", config.service_name).as_str(),
960        ],
961    )?;
962
963    Ok(())
964}
965
/// Stand-in for platforms without systemd.
///
/// Shares the signature of the Linux implementation so callers build
/// on every target, but always fails with a message that points the
/// operator at the platform's own service manager.
///
/// # Errors
///
/// Always returns `Err`.
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    const UNSUPPORTED_PLATFORM: &str = concat!(
        "systemd install is Linux-only — use sc.exe (Windows) or ",
        "launchd (macOS) to install the service manually using the ",
        "unit printed by `red service print-unit`",
    );
    Err(String::from(UNSUPPORTED_PLATFORM))
}
977
/// Fail unless the current process runs as root.
///
/// The effective uid is obtained by running `id -u` and comparing its
/// trimmed output with `0`.
///
/// # Errors
///
/// * `id` could not be spawned or exited unsuccessfully.
/// * The reported uid is not `0`.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let id_output = match Command::new("id").arg("-u").output() {
        Ok(captured) => captured,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !id_output.status.success() {
        return Err("failed to determine current uid".to_string());
    }

    match String::from_utf8_lossy(&id_output.stdout).trim() {
        "0" => Ok(()),
        _ => Err("run this command as root (sudo)".to_string()),
    }
}
993
/// Check that `command` can be resolved by the shell.
///
/// Runs `command -v` inside a login `sh` (the login flag is kept so
/// the lookup sees the same `PATH` setup as before) and maps the exit
/// status to `Ok` / `Err`.
///
/// The name is handed to the shell as the positional parameter `$1`
/// instead of being spliced into the script text, so a value that
/// contains spaces or shell metacharacters is looked up literally and
/// can never be executed as shell code.
///
/// # Errors
///
/// * The shell itself could not be spawned.
/// * The shell reports that `command` does not resolve.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    let status = Command::new("sh")
        // argv after the script: `$0` = "sh", `$1` = the name to look up.
        .args(["-lc", "command -v \"$1\" >/dev/null 2>&1", "sh", command])
        .status()
        .map_err(|err| format!("failed to check command '{command}': {err}"))?;
    if status.success() {
        Ok(())
    } else {
        Err(format!("required command not found: {command}"))
    }
}
1006
/// Verify that `path` points at something systemd can execute.
///
/// The path must exist, be a regular file (symlinks are followed by
/// `std::fs::metadata`), and have at least one execute bit set.
/// Directories are rejected explicitly: they normally carry execute
/// (search) bits and would otherwise pass the permission test.
///
/// # Errors
///
/// * `binary not found …` — the path cannot be stat'ed.
/// * `binary is not a file …` — the path is a directory or other
///   non-regular file.
/// * `binary is not executable …` — no execute bit is set.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    // This function is Linux-only, so the Unix permission extension is
    // always available; no non-Unix fallback is needed.
    use std::os::unix::fs::PermissionsExt;

    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    // 0o111 = execute bit for user, group or other.
    if metadata.permissions().mode() & 0o111 == 0 {
        return Err(format!("binary is not executable: {}", path.display()));
    }
    Ok(())
}
1026
/// Run `program` with `args` and report whether it exited successfully.
///
/// Unlike `run_command`, a non-zero exit is not an error here: it is
/// returned as `Ok(false)`, which makes this suitable for existence
/// probes. The child's stdio is not captured.
///
/// # Errors
///
/// Returns `Err` only when the process could not be started.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    match Command::new(program).args(args).status() {
        Ok(exit) => Ok(exit.success()),
        Err(err) => Err(format!("failed to run {program}: {err}")),
    }
}
1035
/// Run `program` with `args`, treating any unsuccessful exit as an error.
///
/// Output is captured rather than inherited so it can be folded into
/// the error message.
///
/// # Errors
///
/// * `failed to run <program>: …` — the process could not be started.
/// * `<program> failed: <detail>` — the process exited unsuccessfully.
///   `<detail>` is the trimmed stderr if non-empty, otherwise the
///   trimmed stdout if non-empty, otherwise the exit status as
///   rendered by `ExitStatus` (e.g. `exit status: 3`).
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    let stdout = String::from_utf8_lossy(&output.stdout);
    let detail = if !stderr.trim().is_empty() {
        stderr.trim().to_string()
    } else if !stdout.trim().is_empty() {
        stdout.trim().to_string()
    } else {
        // `ExitStatus`'s Display already reads "exit status: N" (or
        // names the signal); prefixing it again would duplicate the
        // phrase in the message.
        output.status.to_string()
    };
    Err(format!("{program} failed: {detail}"))
}
1057
1058pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1059    let has_any = config.router_bind_addr.is_some()
1060        || config.grpc_bind_addr.is_some()
1061        || config.http_bind_addr.is_some()
1062        || config.wire_bind_addr.is_some()
1063        || config.pg_bind_addr.is_some();
1064    if !has_any {
1065        return Err("at least one server bind address must be configured".into());
1066    }
1067    let thread_name = if config.router_bind_addr.is_some() {
1068        "red-server-router"
1069    } else {
1070        match (
1071            config.grpc_bind_addr.is_some(),
1072            config.http_bind_addr.is_some(),
1073        ) {
1074            (true, true) => "red-server-dual",
1075            (true, false) => "red-server-grpc",
1076            (false, true) => "red-server-http",
1077            (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1078            (false, false) => "red-server-pg-wire",
1079        }
1080    };
1081
1082    let handle = thread::Builder::new()
1083        .name(thread_name.into())
1084        .stack_size(8 * 1024 * 1024)
1085        .spawn(move || run_configured_servers(config))
1086        .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1087
1088    match handle.join() {
1089        Ok(result) => result,
1090        Err(_) => Err("server thread panicked".to_string()),
1091    }
1092}
1093
1094fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1095    let mut parts = vec![
1096        config.binary_path.display().to_string(),
1097        "server".to_string(),
1098        "--path".to_string(),
1099        config.data_path.display().to_string(),
1100    ];
1101
1102    if let Some(bind_addr) = &config.router_bind_addr {
1103        parts.push("--bind".to_string());
1104        parts.push(bind_addr.clone());
1105    } else if let Some(bind_addr) = &config.grpc_bind_addr {
1106        parts.push("--grpc-bind".to_string());
1107        parts.push(bind_addr.clone());
1108    }
1109    if let Some(bind_addr) = &config.http_bind_addr {
1110        parts.push("--http-bind".to_string());
1111        parts.push(bind_addr.clone());
1112    }
1113
1114    parts.join(" ")
1115}
1116
/// Report whether something accepts TCP connections at `target`.
///
/// `target` is resolved with `ToSocketAddrs` (so `host:port` strings
/// work) and every resolved address is tried in turn, each with its
/// own `timeout`. The probe succeeds as soon as one connection is
/// established; the connection is dropped immediately.
///
/// Returns `false` when `target` cannot be resolved or no address
/// accepts the connection.
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    target
        .to_socket_addrs()
        .map(|mut resolved| {
            resolved
                .any(|address: SocketAddr| TcpStream::connect_timeout(&address, timeout).is_ok())
        })
        .unwrap_or(false)
}
1127
1128#[inline(never)]
1129fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1130    // Phase 6 logging is initialised inside each runner once the
1131    // runtime is open — see `build_runtime_and_auth_store`. Going
1132    // after DB open lets us read `red.logging.*` config keys out of
1133    // the persistent red_config store and merge them with the CLI
1134    // flags (flag > red_config > built-in default).
1135    if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1136        return run_routed_server(config, router_bind_addr);
1137    }
1138
1139    match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1140        (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1141            run_dual_server(config, grpc_bind_addr, http_bind_addr)
1142        }
1143        (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1144        (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1145        (None, None) => {
1146            if let Some(wire_addr) = config.wire_bind_addr.clone() {
1147                run_wire_only_server(config, wire_addr)
1148            } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1149                run_pg_only_server(config, pg_addr)
1150            } else {
1151                Err("at least one server bind address must be configured".to_string())
1152            }
1153        }
1154    }
1155}
1156
1157/// Bind a TCP listener for a transport at startup and record the
1158/// outcome in the shared [`TransportReadiness`] state.
1159///
1160/// The split between *explicit* and *implicit/default* binds is the
1161/// contract from issue #545:
1162///
1163/// * `explicit == true` — the operator named this transport on the
1164///   CLI / env / config. A failed bind is fatal: this returns `Err`
1165///   and the boot path must propagate the error so the process exits
1166///   non-zero with the recorded `reason`.
1167/// * `explicit == false` — this is a default listener the server
1168///   would have spun up regardless. A failed bind degrades: this
1169///   returns `Ok(None)` (no listener) but the failure is still
1170///   recorded in `readiness.failed`, so the server keeps running and
1171///   the next `/health` probe enumerates the degraded listener.
1172///
1173/// On success the bound listener lands in `readiness.active`.
1174pub fn bind_listener_for_startup(
1175    readiness: &mut TransportReadiness,
1176    transport: &str,
1177    bind_addr: &str,
1178    explicit: bool,
1179) -> Result<Option<TcpListener>, String> {
1180    match TcpListener::bind(bind_addr) {
1181        Ok(listener) => {
1182            readiness.active(transport, bind_addr, explicit);
1183            Ok(Some(listener))
1184        }
1185        Err(err) => {
1186            let reason = format!("{transport} listener bind {bind_addr}: {err}");
1187            readiness.failed(transport, bind_addr, explicit, reason.clone());
1188            if explicit {
1189                tracing::error!(
1190                    transport,
1191                    bind = %bind_addr,
1192                    error = %err,
1193                    "fatal explicit bind failure"
1194                );
1195                Err(format!("explicit {reason}"))
1196            } else {
1197                tracing::warn!(
1198                    transport,
1199                    bind = %bind_addr,
1200                    error = %err,
1201                    "non-fatal implicit bind failure; listener degraded"
1202                );
1203                Ok(None)
1204            }
1205        }
1206    }
1207}
1208
1209/// Wire SIGTERM and SIGINT to `RedDBRuntime::graceful_shutdown`.
1210///
1211/// PLAN.md Phase 1.1 — orchestrators (K8s preStop, Fly autostop, ECS
1212/// drain, systemd, plain `docker stop`) all rely on SIGTERM with a
1213/// grace window. SIGKILL after that grace window is the OS's
1214/// fallback; we are responsible for finishing in time.
1215///
1216/// Spawns a tokio task on the caller's runtime that:
1217///   1. Awaits the first of SIGTERM / SIGINT.
1218///   2. Calls `runtime.graceful_shutdown(backup_on_shutdown)`. The
1219///      runtime moves to `Stopped` on its own; this just runs the
1220///      flush + checkpoint pipeline and (optionally) a final backup.
1221///   3. Calls `std::process::exit(0)` so the orchestrator sees a
1222///      clean exit code.
1223///
1224/// `RED_BACKUP_ON_SHUTDOWN` (default `true` if a remote backend is
1225/// configured) toggles step 3's backup branch. The flush + checkpoint
1226/// always run.
1227///
1228/// Idempotent across signal storms — `graceful_shutdown` returns the
1229/// cached report on second call, but we exit on the first one
1230/// regardless, so the second SIGTERM never reaches the handler.
1231async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
1232    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
1233        .ok()
1234        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1235        .unwrap_or(true);
1236
1237    #[cfg(unix)]
1238    {
1239        use tokio::signal::unix::{signal, SignalKind};
1240
1241        let mut sigterm = match signal(SignalKind::terminate()) {
1242            Ok(s) => s,
1243            Err(err) => {
1244                tracing::warn!(
1245                    error = %err,
1246                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
1247                );
1248                return;
1249            }
1250        };
1251        let mut sigint = match signal(SignalKind::interrupt()) {
1252            Ok(s) => s,
1253            Err(err) => {
1254                tracing::warn!(error = %err, "could not install SIGINT handler");
1255                return;
1256            }
1257        };
1258        // PLAN.md Phase 6.4 — SIGHUP triggers a reload of secrets from
1259        // their `_FILE` companions without restarting the process.
1260        // Useful for credential rotation pipelines (kubectl create
1261        // secret + kubectl rollout restart, but for systemd / Nomad /
1262        // bare-metal where rolling the process is heavier).
1263        let mut sighup = match signal(SignalKind::hangup()) {
1264            Ok(s) => Some(s),
1265            Err(err) => {
1266                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
1267                None
1268            }
1269        };
1270
1271        let reload_runtime = runtime.clone();
1272        tokio::spawn(async move {
1273            loop {
1274                let signal_name = match &mut sighup {
1275                    Some(hup) => tokio::select! {
1276                        _ = sigterm.recv() => "SIGTERM",
1277                        _ = sigint.recv() => "SIGINT",
1278                        _ = hup.recv() => "SIGHUP",
1279                    },
1280                    None => tokio::select! {
1281                        _ = sigterm.recv() => "SIGTERM",
1282                        _ = sigint.recv() => "SIGINT",
1283                    },
1284                };
1285
1286                if signal_name == "SIGHUP" {
1287                    handle_sighup_reload(&reload_runtime);
1288                    continue; // stay alive; SIGHUP isn't a shutdown
1289                }
1290
1291                tracing::info!(
1292                    signal = signal_name,
1293                    "lifecycle signal received; shutting down"
1294                );
1295                match runtime.graceful_shutdown(backup_on_shutdown) {
1296                    Ok(report) => {
1297                        tracing::info!(
1298                            duration_ms = report.duration_ms,
1299                            flushed_wal = report.flushed_wal,
1300                            final_checkpoint = report.final_checkpoint,
1301                            backup_uploaded = report.backup_uploaded,
1302                            "graceful shutdown complete"
1303                        );
1304                    }
1305                    Err(err) => {
1306                        tracing::error!(error = %err, "graceful shutdown failed");
1307                        // Issue #205 — graceful shutdown returning Err
1308                        // means the runtime is exiting without a clean
1309                        // flush/checkpoint. Operator-grade event so the
1310                        // operator notices the dirty exit even when the
1311                        // process restarts before they read tracing logs.
1312                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
1313                            reason: format!("graceful shutdown failed: {err}"),
1314                        }
1315                        .emit_global();
1316                    }
1317                }
1318                std::process::exit(0);
1319            }
1320        });
1321    }
1322
1323    #[cfg(not(unix))]
1324    {
1325        tokio::spawn(async move {
1326            let interrupted = tokio::signal::ctrl_c().await;
1327            if let Err(err) = interrupted {
1328                tracing::warn!(error = %err, "could not install Ctrl+C handler");
1329                return;
1330            }
1331
1332            tracing::info!(
1333                signal = "Ctrl+C",
1334                "lifecycle signal received; shutting down"
1335            );
1336            match runtime.graceful_shutdown(backup_on_shutdown) {
1337                Ok(report) => {
1338                    tracing::info!(
1339                        duration_ms = report.duration_ms,
1340                        flushed_wal = report.flushed_wal,
1341                        final_checkpoint = report.final_checkpoint,
1342                        backup_uploaded = report.backup_uploaded,
1343                        "graceful shutdown complete"
1344                    );
1345                }
1346                Err(err) => {
1347                    tracing::error!(error = %err, "graceful shutdown failed");
1348                }
1349            }
1350            std::process::exit(0);
1351        });
1352    }
1353}
1354
1355/// PLAN.md Phase 6.4 — re-read secrets from `*_FILE` companion env
1356/// vars. Today this only refreshes the audit log + records the
1357/// reload event; the runtime modules that hold cached secret
1358/// material (S3 backend credentials, admin token, JWT keys) read
1359/// the env on each request so the next call after SIGHUP picks up
1360/// the new file contents automatically. A future extension can
1361/// punch through to the LeaseStore / AuthStore for in-memory
1362/// caches that don't re-read on each call.
1363fn handle_sighup_reload(runtime: &RedDBRuntime) {
1364    let now_ms = std::time::SystemTime::now()
1365        .duration_since(std::time::UNIX_EPOCH)
1366        .map(|d| d.as_millis() as u64)
1367        .unwrap_or(0);
1368    tracing::info!(
1369        target: "reddb::secrets",
1370        ts_unix_ms = now_ms,
1371        "SIGHUP received; secrets will be re-read from *_FILE on next access"
1372    );
1373    // Routed through AuditFieldEscaper (ADR 0010 / issue #177) so
1374    // every emission goes through the typed-field guard. The
1375    // arguments here are static, but using the typed entry point
1376    // keeps the discipline uniform across call sites.
1377    use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1378    runtime.audit_log().record_event(
1379        AuditEvent::builder("config/sighup_reload")
1380            .source(AuditAuthSource::System)
1381            .resource("secrets")
1382            .outcome(Outcome::Success)
1383            .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1384            .build(),
1385    );
1386}
1387
1388#[inline(never)]
1389fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
1390    let workers = config.workers;
1391    let cli_telemetry = config.telemetry.clone();
1392    let db_options = config.to_db_options()?;
1393    let rt_config = detect_runtime_config();
1394    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1395    let (runtime, auth_store, _telemetry_guard) =
1396        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1397    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1398
1399    spawn_admin_metrics_listeners(&runtime, &auth_store);
1400
1401    let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
1402        .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
1403    let http_backend = http_listener
1404        .local_addr()
1405        .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
1406    let http_server = build_http_server(
1407        runtime.clone(),
1408        auth_store.clone(),
1409        http_backend.to_string(),
1410    );
1411    let http_server = apply_http_limits(http_server, &config, &runtime);
1412    let http_handle = http_server.serve_in_background_on(http_listener);
1413
1414    thread::sleep(Duration::from_millis(100));
1415    if http_handle.is_finished() {
1416        return match http_handle.join() {
1417            Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
1418            Ok(Err(err)) => Err(err.to_string()),
1419            Err(_) => Err("HTTP backend thread panicked".to_string()),
1420        };
1421    }
1422
1423    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1424        .enable_all()
1425        .worker_threads(worker_threads)
1426        .thread_stack_size(rt_config.stack_size)
1427        .build()
1428        .map_err(|err| format!("tokio runtime: {err}"))?;
1429
1430    let signal_runtime = runtime.clone();
1431    tokio_runtime.block_on(async move {
1432        spawn_lifecycle_signal_handler(signal_runtime).await;
1433        let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
1434            .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
1435        let grpc_backend = grpc_listener
1436            .local_addr()
1437            .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
1438        let grpc_server = RedDBGrpcServer::with_options(
1439            runtime.clone(),
1440            GrpcServerOptions {
1441                bind_addr: grpc_backend.to_string(),
1442                tls: None,
1443            },
1444            auth_store,
1445        );
1446        tokio::spawn(async move {
1447            if let Err(err) = grpc_server.serve_on(grpc_listener).await {
1448                tracing::error!(err = %err, "gRPC backend error");
1449            }
1450        });
1451
1452        let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
1453            .await
1454            .map_err(|err| format!("bind internal wire listener: {err}"))?;
1455        let wire_backend = wire_listener
1456            .local_addr()
1457            .map_err(|err| format!("inspect internal wire listener: {err}"))?;
1458        let wire_rt = Arc::new(runtime);
1459        tokio::spawn(async move {
1460            if let Err(err) =
1461                crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
1462                    .await
1463            {
1464                tracing::error!(err = %err, "redwire backend error");
1465            }
1466        });
1467
1468        tracing::info!(
1469            bind = %router_bind_addr,
1470            cpus = rt_config.available_cpus,
1471            workers = worker_threads,
1472            "router bootstrapping"
1473        );
1474        serve_tcp_router(TcpProtocolRouterConfig {
1475            bind_addr: router_bind_addr,
1476            grpc_backend,
1477            http_backend,
1478            wire_backend,
1479        })
1480        .await
1481        .map_err(|err| err.to_string())
1482    })
1483}
1484
1485/// Spawn RedWire listeners (plaintext + TLS) as background tokio tasks.
1486async fn spawn_wire_listeners(
1487    config: &ServerCommandConfig,
1488    runtime: &RedDBRuntime,
1489    readiness: &mut TransportReadiness,
1490) -> Result<(), String> {
1491    // Plaintext RedWire — TCP or Unix socket
1492    if let Some(wire_addr) = config.wire_bind_addr.clone() {
1493        let wire_rt = Arc::new(runtime.clone());
1494        // Address starting with `unix://` or an absolute filesystem path
1495        // switches to Unix domain sockets.
1496        #[cfg(unix)]
1497        {
1498            if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1499                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1500                tokio::spawn(async move {
1501                    if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1502                        &wire_addr, wire_rt,
1503                    )
1504                    .await
1505                    {
1506                        tracing::error!(err = %e, "redwire unix listener error");
1507                    }
1508                });
1509                return Ok(());
1510            }
1511        }
1512        match tokio::net::TcpListener::bind(&wire_addr).await {
1513            Ok(listener) => {
1514                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1515                tokio::spawn(async move {
1516                    if let Err(e) =
1517                        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1518                            .await
1519                    {
1520                        tracing::error!(err = %e, "redwire listener error");
1521                    }
1522                });
1523            }
1524            Err(err) => {
1525                let reason = format!("wire listener bind {wire_addr}: {err}");
1526                readiness.failed(
1527                    "wire",
1528                    &wire_addr,
1529                    config.wire_bind_explicit,
1530                    reason.clone(),
1531                );
1532                if config.wire_bind_explicit {
1533                    tracing::error!(
1534                        transport = "wire",
1535                        bind = %wire_addr,
1536                        error = %err,
1537                        "fatal explicit bind failure"
1538                    );
1539                    return Err(format!("explicit {reason}"));
1540                }
1541                tracing::warn!(
1542                    transport = "wire",
1543                    bind = %wire_addr,
1544                    error = %err,
1545                    "non-fatal implicit bind failure; listener degraded"
1546                );
1547            }
1548        }
1549    }
1550
1551    // RedWire over TLS
1552    if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1553        let tls_config = resolve_wire_tls_config(config);
1554        match tls_config {
1555            Ok(tls_cfg) => {
1556                let wire_rt = Arc::new(runtime.clone());
1557                tokio::spawn(async move {
1558                    if let Err(e) =
1559                        crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1560                            .await
1561                    {
1562                        tracing::error!(err = %e, "redwire+tls listener error");
1563                    }
1564                });
1565            }
1566            Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1567        }
1568    }
1569    Ok(())
1570}
1571
1572/// Spawn the PostgreSQL wire-protocol listener (Phase 3.1 PG parity).
1573///
1574/// Only runs when `--pg-bind` is supplied. Uses the v3 protocol so
1575/// psql, JDBC drivers, DBeaver, etc. can connect directly. Runs
1576/// alongside the native wire listener; the two transports do not
1577/// share a port.
1578fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1579    if let Some(pg_addr) = config.pg_bind_addr.clone() {
1580        let rt = Arc::new(runtime.clone());
1581        tokio::spawn(async move {
1582            let cfg = crate::wire::PgWireConfig {
1583                bind_addr: pg_addr,
1584                ..Default::default()
1585            };
1586            if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1587                tracing::error!(err = %e, "pg wire listener error");
1588            }
1589        });
1590    }
1591}
1592
1593/// Resolve gRPC TLS material into PEM bytes.
1594///
1595/// Lookup order, in priority:
1596///   1. Explicit `config.grpc_tls_cert` / `config.grpc_tls_key` (paths
1597///      passed via CLI/env). Both must be present together.
1598///   2. `RED_GRPC_TLS_DEV=1` — auto-generate a self-signed cert next
1599///      to the data dir. Refuses to run without the env flag so an
1600///      operator can't accidentally ship a dev cert in prod.
1601///
1602/// `client_ca` is loaded when `config.grpc_tls_client_ca` is set,
1603/// turning the listener into a mutual-TLS endpoint that requires
1604/// every client to present a chain that anchors at one of the CAs
1605/// in the bundle.
1606fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1607    use crate::utils::secret_file::expand_file_env;
1608
1609    // Best-effort *_FILE expansion for every TLS env knob. Errors here
1610    // surface as warnings; the fallback path (explicit cert paths) will
1611    // cover the common case.
1612    for var in [
1613        "REDDB_GRPC_TLS_CERT",
1614        "REDDB_GRPC_TLS_KEY",
1615        "REDDB_GRPC_TLS_CLIENT_CA",
1616    ] {
1617        if let Err(err) = expand_file_env(var) {
1618            tracing::warn!(
1619                target: "reddb::secrets",
1620                env = %var,
1621                err = %err,
1622                "could not expand *_FILE companion for gRPC TLS"
1623            );
1624        }
1625    }
1626
1627    let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1628        (Some(cert), Some(key)) => {
1629            let cert_pem = std::fs::read(cert)
1630                .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1631            let key_pem =
1632                std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1633            (cert_pem, key_pem)
1634        }
1635        _ => {
1636            // No explicit material → only proceed when dev-mode is on.
1637            let dev = std::env::var("RED_GRPC_TLS_DEV")
1638                .ok()
1639                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1640                .unwrap_or(false);
1641            if !dev {
1642                return Err("gRPC TLS configured but no cert/key supplied — set \
1643                     REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1644                     RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1645                    .to_string());
1646            }
1647            let dir = config
1648                .path
1649                .as_ref()
1650                .and_then(|p| p.parent())
1651                .map(PathBuf::from)
1652                .unwrap_or_else(|| PathBuf::from("."));
1653            let (cert_pem_str, key_pem_str) =
1654                crate::wire::tls::generate_self_signed_cert("localhost")
1655                    .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1656
1657            // Constant-time-friendly fingerprint to stderr so the
1658            // operator can pin a client trust store. We log via
1659            // `tracing::warn!` so it stands out next to ordinary
1660            // listener-online events.
1661            let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1662            tracing::warn!(
1663                target: "reddb::security",
1664                transport = "grpc",
1665                cert_sha256 = %fp,
1666                "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1667                 DO NOT use in production"
1668            );
1669            // Persist so that restarts reuse the same identity.
1670            let cert_path = dir.join("grpc-tls-cert.pem");
1671            let key_path = dir.join("grpc-tls-key.pem");
1672            if !cert_path.exists() || !key_path.exists() {
1673                let _ = std::fs::create_dir_all(&dir);
1674                std::fs::write(&cert_path, cert_pem_str.as_bytes())
1675                    .map_err(|e| format!("write grpc dev cert: {e}"))?;
1676                std::fs::write(&key_path, key_pem_str.as_bytes())
1677                    .map_err(|e| format!("write grpc dev key: {e}"))?;
1678                #[cfg(unix)]
1679                {
1680                    use std::os::unix::fs::PermissionsExt;
1681                    let _ =
1682                        std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1683                }
1684            }
1685            (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1686        }
1687    };
1688
1689    let client_ca_pem = match &config.grpc_tls_client_ca {
1690        Some(path) => Some(
1691            std::fs::read(path)
1692                .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1693        ),
1694        None => None,
1695    };
1696
1697    Ok(crate::GrpcTlsOptions {
1698        cert_pem,
1699        key_pem,
1700        client_ca_pem,
1701    })
1702}
1703
1704/// Spawn a TLS-terminated gRPC listener when `grpc_tls_bind_addr` is
1705/// configured. Logs and continues on TLS-config errors so the plain
1706/// listener stays up; this matches the wire-listener pattern.
1707fn spawn_grpc_tls_listener_if_configured(
1708    config: &ServerCommandConfig,
1709    runtime: RedDBRuntime,
1710    auth_store: Arc<AuthStore>,
1711) {
1712    let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1713        return;
1714    };
1715    let tls_opts = match resolve_grpc_tls_options(config) {
1716        Ok(opts) => opts,
1717        Err(err) => {
1718            tracing::error!(
1719                target: "reddb::security",
1720                transport = "grpc",
1721                err = %err,
1722                "gRPC TLS config error; TLS listener will not start"
1723            );
1724            return;
1725        }
1726    };
1727    tokio::spawn(async move {
1728        let server = RedDBGrpcServer::with_options(
1729            runtime,
1730            GrpcServerOptions {
1731                bind_addr: tls_bind.clone(),
1732                tls: Some(tls_opts),
1733            },
1734            auth_store,
1735        );
1736        tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1737        if let Err(err) = server.serve().await {
1738            tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1739        }
1740    });
1741}
1742
1743/// Hex-encoded SHA-256 of a PEM blob, used for cert-pin operator log
1744/// lines. Constant-time hash; no token contents leave this fn.
1745fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1746    use sha2::{Digest, Sha256};
1747    let mut h = Sha256::new();
1748    h.update(pem);
1749    let d = h.finalize();
1750    let mut buf = String::with_capacity(64);
1751    for b in d.iter() {
1752        buf.push_str(&format!("{b:02x}"));
1753    }
1754    buf
1755}
1756
1757/// Resolve TLS config: use provided cert/key or auto-generate.
1758fn resolve_wire_tls_config(
1759    config: &ServerCommandConfig,
1760) -> Result<crate::wire::WireTlsConfig, String> {
1761    match (&config.wire_tls_cert, &config.wire_tls_key) {
1762        (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1763            cert_path: cert.clone(),
1764            key_path: key.clone(),
1765        }),
1766        _ => {
1767            // Auto-generate self-signed cert
1768            let dir = config
1769                .path
1770                .as_ref()
1771                .and_then(|p| p.parent())
1772                .map(PathBuf::from)
1773                .unwrap_or_else(|| PathBuf::from("."));
1774            crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1775        }
1776    }
1777}
1778
1779#[inline(never)]
1780fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1781    let rt_config = detect_runtime_config();
1782    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1783    let cli_telemetry = config.telemetry.clone();
1784    let db_options = config.to_db_options()?;
1785    let mut transport_readiness = TransportReadiness::default();
1786
1787    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1788        .enable_all()
1789        .worker_threads(workers)
1790        .thread_stack_size(rt_config.stack_size)
1791        .build()
1792        .map_err(|err| format!("tokio runtime: {err}"))?;
1793
1794    // Guard lives on the outer thread's stack so it outlives the
1795    // tokio runtime — dropping it only after the listener returns
1796    // flushes the file log writer.
1797    let (runtime, _auth_store, _telemetry_guard) =
1798        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1799    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1800    let signal_runtime = runtime.clone();
1801    tokio_runtime.block_on(async move {
1802        spawn_lifecycle_signal_handler(signal_runtime).await;
1803        spawn_pg_listener(&config, &runtime);
1804        let wire_rt = Arc::new(runtime);
1805        let listener = tokio::net::TcpListener::bind(&wire_addr)
1806            .await
1807            .map_err(|err| {
1808                let reason = format!("wire listener bind {wire_addr}: {err}");
1809                transport_readiness.failed(
1810                    "wire",
1811                    &wire_addr,
1812                    config.wire_bind_explicit,
1813                    reason.clone(),
1814                );
1815                if config.wire_bind_explicit {
1816                    format!("explicit {reason}")
1817                } else {
1818                    reason
1819                }
1820            })?;
1821        transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1822        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1823            .await
1824            .map_err(|e| e.to_string())
1825    })
1826}
1827
1828#[inline(never)]
1829fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1830    let rt_config = detect_runtime_config();
1831    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1832    let cli_telemetry = config.telemetry.clone();
1833    let db_options = config.to_db_options()?;
1834
1835    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1836        .enable_all()
1837        .worker_threads(workers)
1838        .thread_stack_size(rt_config.stack_size)
1839        .build()
1840        .map_err(|err| format!("tokio runtime: {err}"))?;
1841
1842    let (runtime, _auth_store, _telemetry_guard) =
1843        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1844    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1845    let signal_runtime = runtime.clone();
1846    tokio_runtime.block_on(async move {
1847        spawn_lifecycle_signal_handler(signal_runtime).await;
1848        let cfg = crate::wire::PgWireConfig {
1849            bind_addr: pg_addr,
1850            ..Default::default()
1851        };
1852        crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1853            .await
1854            .map_err(|e| e.to_string())
1855    })
1856}
1857
1858#[inline(never)]
1859fn build_runtime_and_auth_store(
1860    db_options: RedDBOptions,
1861    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1862) -> Result<
1863    (
1864        RedDBRuntime,
1865        Arc<AuthStore>,
1866        Option<crate::telemetry::TelemetryGuard>,
1867    ),
1868    String,
1869> {
1870    // Return the TelemetryGuard so server runners can bind it for
1871    // their full lifetime. Dropping the guard tears down the
1872    // non-blocking log writer thread and, because that writer is
1873    // built with `.lossy(true)`, any subsequent log event routed to
1874    // the file sink is silently dropped — so callers MUST keep the
1875    // returned `Option<TelemetryGuard>` alive until shutdown.
1876    build_runtime_with_telemetry(db_options, cli_telemetry)
1877}
1878
1879/// Open the runtime, initialise structured logging from merged CLI +
1880/// `red_config` settings, and return a guard the caller must keep
1881/// alive for the server lifetime (drop flushes pending log writes).
1882///
1883/// Merge priority: CLI flag (explicit `Some`) beats `red.logging.*`
1884/// in red_config, beats the built-in default. A CLI-flag value of
1885/// `None` / empty means "inherit from config or default" — never
1886/// "disable". The one exception is `--no-log-file` which forces
1887/// `log_dir = None` regardless of config.
1888pub(crate) fn build_runtime_with_telemetry(
1889    db_options: RedDBOptions,
1890    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1891) -> Result<
1892    (
1893        RedDBRuntime,
1894        Arc<AuthStore>,
1895        Option<crate::telemetry::TelemetryGuard>,
1896    ),
1897    String,
1898> {
1899    let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1900        // Issue #205 — runtime construction failure is the canonical
1901        // StartupFailed phase. The audit sink isn't installed yet
1902        // (it would have been registered inside `with_options`), so
1903        // the emit falls through to tracing+eprintln only — operator
1904        // still sees it on stderr.
1905        let msg = err.to_string();
1906        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1907            phase: "runtime_construction".to_string(),
1908            error: msg.clone(),
1909        }
1910        .emit_global();
1911        msg
1912    })?;
1913
1914    // PLAN.md Phase 5 / W6 — opt into serverless writer-lease fencing
1915    // when `RED_LEASE_REQUIRED=true`. Failure here aborts boot: the
1916    // operator asked for a fence; running unfenced would silently
1917    // expose split-brain risk.
1918    crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1919        let msg = err.to_string();
1920        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1921            phase: "lease_loop".to_string(),
1922            error: msg.clone(),
1923        }
1924        .emit_global();
1925        msg
1926    })?;
1927
1928    // #213 — edge-triggered disk-space watchdog. Watches the data
1929    // directory; falls back to polling when fanotify is unavailable
1930    // (non-Linux or unprivileged container).
1931    if let Some(data_path) = db_options.data_path.as_deref() {
1932        let watch_dir = data_path.parent().unwrap_or(data_path);
1933        crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1934    }
1935
1936    // #214 — inotify config hot-reload watcher. Watches the config file
1937    // (REDDB_CONFIG_FILE or /etc/reddb/config.json) for changes and
1938    // applies hot-reloadable keys without restart.
1939    {
1940        let config_path = crate::runtime::config_overlay::config_file_path();
1941        let store = runtime.db().store();
1942        crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1943    }
1944
1945    // Phase 6 logging: merge red_config overrides onto the CLI-built
1946    // telemetry config, then install the global subscriber.
1947    let merged = merge_telemetry_with_config(
1948        cli_telemetry
1949            .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1950        &runtime,
1951    );
1952    let telemetry_guard = crate::telemetry::init(merged);
1953
1954    let no_auth = no_auth_active(&db_options);
1955    let auth_store =
1956        if db_options.auth.vault_enabled {
1957            let pager =
1958                runtime.db().store().pager().cloned().ok_or_else(|| {
1959                    "vault requires a paged database (persistent mode)".to_string()
1960                })?;
1961            let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1962                .map_err(|err| err.to_string())?;
1963            Arc::new(store)
1964        } else {
1965            Arc::new(AuthStore::new(db_options.auth.clone()))
1966        };
1967    // Issue #663 — when `--no-auth` is active, deliberately skip
1968    // `bootstrap_from_env`. Otherwise a stray
1969    // `REDDB_USERNAME`+`REDDB_PASSWORD` pair in the operator's
1970    // environment would silently create an admin user, defeating the
1971    // whole point of opting into anonymous mode.
1972    if no_auth {
1973        eprintln!("{NO_AUTH_WARNING}");
1974        tracing::warn!("{NO_AUTH_WARNING}");
1975    } else {
1976        auth_store.bootstrap_from_env();
1977    }
1978
1979    // Background session purge (every 5 minutes)
1980    {
1981        let store = Arc::clone(&auth_store);
1982        std::thread::Builder::new()
1983            .name("reddb-session-purge".into())
1984            .spawn(move || loop {
1985                std::thread::sleep(std::time::Duration::from_secs(300));
1986                store.purge_expired_sessions();
1987            })
1988            .ok();
1989    }
1990
1991    Ok((runtime, auth_store, telemetry_guard))
1992}
1993
1994/// Read `red.logging.*` keys from the persistent config store and
1995/// merge them into the CLI-built `TelemetryConfig`. Merge priority:
1996/// explicit CLI flag > red_config > built-in default.
1997///
1998/// The "was a flag passed" signal comes from the `*_explicit` bools
1999/// on `TelemetryConfig`, populated by the CLI parser. This replaces
2000/// an earlier equality-to-default heuristic that silently dropped
2001/// config whenever the CLI-derived default diverged from
2002/// `TelemetryConfig::default()` (e.g. path-derived `log_dir`,
2003/// non-TTY `format`) and that silently overrode `--no-log-file`.
2004fn merge_telemetry_with_config(
2005    mut cli: crate::telemetry::TelemetryConfig,
2006    runtime: &RedDBRuntime,
2007) -> crate::telemetry::TelemetryConfig {
2008    use crate::storage::schema::Value;
2009
2010    let store = runtime.db().store();
2011
2012    if !cli.level_explicit {
2013        if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
2014            cli.level_filter = v.to_string();
2015        }
2016    }
2017    if !cli.format_explicit {
2018        if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
2019            if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
2020                cli.format = parsed;
2021            }
2022        }
2023    }
2024    if !cli.rotation_keep_days_explicit {
2025        match store.get_config("red.logging.keep_days") {
2026            Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
2027                cli.rotation_keep_days = n as u16
2028            }
2029            Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
2030                cli.rotation_keep_days = n as u16
2031            }
2032            Some(Value::Text(v)) => {
2033                if let Ok(n) = v.parse::<u16>() {
2034                    cli.rotation_keep_days = n;
2035                }
2036            }
2037            _ => {}
2038        }
2039    }
2040    if !cli.file_prefix_explicit {
2041        if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
2042            if !v.is_empty() {
2043                cli.file_prefix = v.to_string();
2044            }
2045        }
2046    }
2047    // --no-log-file is a kill-switch: config cannot resurrect the
2048    // file sink. Explicit --log-dir also wins.
2049    if !cli.log_dir_explicit && !cli.log_file_disabled {
2050        if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
2051            if !v.is_empty() {
2052                cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
2053            }
2054        }
2055    }
2056
2057    cli
2058}
2059
/// Tests for `merge_telemetry_with_config`: config values are promoted when
/// the matching CLI flag is absent, and explicit flags / `--no-log-file` win.
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};

    /// In-memory runtime with a single string config key already set.
    fn runtime_with(key: &str, value: &str) -> RedDBRuntime {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        runtime
            .db()
            .store()
            .set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
        runtime
    }

    /// Emulates `default_telemetry_for_path(Some(path))` on a non-TTY host:
    /// `log_dir = Some(...)`, `format = Json`, nothing marked explicit.
    fn non_tty_cli_defaults() -> TelemetryConfig {
        TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let runtime = runtime_with("red.logging.dir", "/var/log/reddb");

        let merged = merge_telemetry_with_config(non_tty_cli_defaults(), &runtime);

        let expected = std::path::Path::new("/var/log/reddb");
        assert_eq!(merged.log_dir.as_deref(), Some(expected));
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let runtime = runtime_with("red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/custom/dir")),
            log_dir_explicit: true,
            ..non_tty_cli_defaults()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        let expected = std::path::Path::new("/custom/dir");
        assert_eq!(merged.log_dir.as_deref(), Some(expected));
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let runtime = runtime_with("red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: None,
            log_file_disabled: true,
            ..non_tty_cli_defaults()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert!(
            merged.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        // On non-TTY, default_telemetry_for_path yields format=Json even
        // though TelemetryConfig::default() is Pretty. The old equality
        // check silently dropped config here.
        let runtime = runtime_with("red.logging.format", "pretty");

        let merged = merge_telemetry_with_config(non_tty_cli_defaults(), &runtime);

        assert_eq!(merged.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let runtime = runtime_with("red.logging.format", "pretty");
        let cli = TelemetryConfig {
            format: LogFormat::Json,
            format_explicit: true,
            ..non_tty_cli_defaults()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert_eq!(merged.format, LogFormat::Json);
    }
}
2147
2148#[inline(never)]
2149fn build_http_server(
2150    runtime: RedDBRuntime,
2151    auth_store: Arc<AuthStore>,
2152    bind_addr: String,
2153) -> RedDBServer {
2154    build_http_server_with_transport_readiness(
2155        runtime,
2156        auth_store,
2157        bind_addr,
2158        TransportReadiness::default(),
2159    )
2160}
2161
2162/// Apply the resolved HTTP limits to a freshly-built `RedDBServer`.
2163///
2164/// Centralised here so every `run_*` path goes through the same
2165/// resolver and the structured startup log line carries the same
2166/// `http_limits.*` fields regardless of transport combination.
2167fn apply_http_limits(
2168    server: RedDBServer,
2169    config: &ServerCommandConfig,
2170    runtime: &RedDBRuntime,
2171) -> RedDBServer {
2172    let store = runtime.db().store();
2173    let resolved =
2174        crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2175            .get_config(key)
2176        {
2177            Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2178            Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2179            Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2180            _ => None,
2181        });
2182    tracing::info!(
2183        target: "reddb::http_limits",
2184        max_handlers = resolved.max_handlers,
2185        handler_timeout_ms = resolved.handler_timeout_ms,
2186        retry_after_secs = resolved.retry_after_secs,
2187        "http_limits resolved"
2188    );
2189    server.with_http_limits(resolved)
2190}
2191
2192#[inline(never)]
2193fn build_http_server_with_transport_readiness(
2194    runtime: RedDBRuntime,
2195    auth_store: Arc<AuthStore>,
2196    bind_addr: String,
2197    transport_readiness: TransportReadiness,
2198) -> RedDBServer {
2199    RedDBServer::with_options(
2200        runtime,
2201        ServerOptions {
2202            bind_addr,
2203            transport_readiness,
2204            ..ServerOptions::default()
2205        },
2206    )
2207    .with_auth(auth_store)
2208}
2209
2210/// PLAN.md Phase 6.2 — build a listener that only serves
2211/// `/admin/*` + `/metrics` + `/health/*`. Defaults to `127.0.0.1`
2212/// when the env var has no host (loopback-only by default per spec).
2213#[inline(never)]
2214fn build_admin_only_server(
2215    runtime: RedDBRuntime,
2216    auth_store: Arc<AuthStore>,
2217    bind_addr: String,
2218) -> RedDBServer {
2219    RedDBServer::with_options(
2220        runtime,
2221        ServerOptions {
2222            bind_addr,
2223            surface: crate::server::ServerSurface::AdminOnly,
2224            ..ServerOptions::default()
2225        },
2226    )
2227    .with_auth(auth_store)
2228}
2229
2230/// PLAN.md Phase 6.2 — build a listener that only serves `/metrics`
2231/// + `/health/*`. Suitable for Prometheus scrape ports that may be
2232///   exposed wider than the admin port.
2233#[inline(never)]
2234fn build_metrics_only_server(
2235    runtime: RedDBRuntime,
2236    auth_store: Arc<AuthStore>,
2237    bind_addr: String,
2238) -> RedDBServer {
2239    RedDBServer::with_options(
2240        runtime,
2241        ServerOptions {
2242            bind_addr,
2243            surface: crate::server::ServerSurface::MetricsOnly,
2244            ..ServerOptions::default()
2245        },
2246    )
2247    .with_auth(auth_store)
2248}
2249
2250/// Spawn dedicated admin / metrics listeners when the operator set
2251/// `RED_ADMIN_BIND` / `RED_METRICS_BIND`. Both are optional; when
2252/// unset the existing listener keeps serving everything (back-compat).
2253fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2254    if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2255        let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2256        let _ = server.serve_in_background();
2257        tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2258    }
2259    if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2260        let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2261        let _ = server.serve_in_background();
2262        tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2263    }
2264}
2265
2266#[inline(never)]
2267fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2268    let cli_telemetry = config.telemetry.clone();
2269    let mut transport_readiness = TransportReadiness::default();
2270    let Some(listener) = bind_listener_for_startup(
2271        &mut transport_readiness,
2272        "http",
2273        &bind_addr,
2274        config.http_bind_explicit,
2275    )?
2276    else {
2277        return Err(format!(
2278            "no HTTP listener started; implicit bind {} failed",
2279            bind_addr
2280        ));
2281    };
2282    let db_options = config.to_db_options()?;
2283    let (runtime, auth_store, _telemetry_guard) =
2284        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2285    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2286    spawn_admin_metrics_listeners(&runtime, &auth_store);
2287    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2288    let server = build_http_server_with_transport_readiness(
2289        runtime.clone(),
2290        auth_store,
2291        bind_addr.clone(),
2292        transport_readiness,
2293    );
2294    let server = apply_http_limits(server, &config, &runtime);
2295    tracing::info!(transport = "http", bind = %bind_addr, "listener online");
2296    server.serve_on(listener).map_err(|err| err.to_string())
2297}
2298
2299/// PLAN.md HTTP TLS — when `http_tls_bind_addr` is set, spawn a
2300/// rustls-terminated listener alongside the plain HTTP server. Cert
2301/// + key paths come from CLI flags or `REDDB_HTTP_TLS_*` env vars; if
2302///   both are absent and `RED_HTTP_TLS_DEV=1` is set, a self-signed cert
2303///   is auto-generated next to the data directory (refused otherwise).
2304fn spawn_http_tls_listener(
2305    config: &ServerCommandConfig,
2306    runtime: &RedDBRuntime,
2307    auth_store: &Arc<AuthStore>,
2308) -> Result<(), String> {
2309    let Some(addr) = config.http_tls_bind_addr.clone() else {
2310        return Ok(());
2311    };
2312
2313    let tls_config = resolve_http_tls_config(config)?;
2314    let server_config = crate::server::tls::build_server_config(&tls_config)
2315        .map_err(|err| format!("HTTP TLS: {err}"))?;
2316
2317    let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
2318    let server = apply_http_limits(server, config, runtime);
2319    let _handle = server.serve_tls_in_background(server_config);
2320    tracing::info!(
2321        transport = "https",
2322        bind = %addr,
2323        mtls = %tls_config.client_ca_path.is_some(),
2324        "TLS listener online"
2325    );
2326    Ok(())
2327}
2328
2329/// Resolve the HTTP TLS config from CLI / env / dev defaults.
2330fn resolve_http_tls_config(
2331    config: &ServerCommandConfig,
2332) -> Result<crate::server::tls::HttpTlsConfig, String> {
2333    match (&config.http_tls_cert, &config.http_tls_key) {
2334        (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
2335            cert_path: cert.clone(),
2336            key_path: key.clone(),
2337            client_ca_path: config.http_tls_client_ca.clone(),
2338        }),
2339        (None, None) => {
2340            // Dev-mode auto-generate next to the data directory.
2341            let dir = config
2342                .path
2343                .as_ref()
2344                .and_then(|p| p.parent().map(std::path::PathBuf::from))
2345                .unwrap_or_else(|| std::path::PathBuf::from("."));
2346            let auto = crate::server::tls::auto_generate_dev_cert(&dir)
2347                .map_err(|err| format!("HTTP TLS dev: {err}"))?;
2348            Ok(crate::server::tls::HttpTlsConfig {
2349                cert_path: auto.cert_path,
2350                key_path: auto.key_path,
2351                client_ca_path: config.http_tls_client_ca.clone(),
2352            })
2353        }
2354        _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
2355    }
2356}
2357
2358#[inline(never)]
2359fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2360    let workers = config.workers;
2361    let cli_telemetry = config.telemetry.clone();
2362    let db_options = config.to_db_options()?;
2363    let rt_config = detect_runtime_config();
2364    let mut transport_readiness = TransportReadiness::default();
2365    let Some(grpc_listener) = bind_listener_for_startup(
2366        &mut transport_readiness,
2367        "grpc",
2368        &bind_addr,
2369        config.grpc_bind_explicit,
2370    )?
2371    else {
2372        return Err(format!(
2373            "no gRPC listener started; implicit bind {} failed",
2374            bind_addr
2375        ));
2376    };
2377
2378    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2379
2380    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2381        .enable_all()
2382        .worker_threads(worker_threads)
2383        .thread_stack_size(rt_config.stack_size)
2384        .build()
2385        .map_err(|err| format!("tokio runtime: {err}"))?;
2386
2387    // Guard lives on the outer stack so it outlives the tokio runtime.
2388    let (runtime, auth_store, _telemetry_guard) =
2389        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2390    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2391    let signal_runtime = runtime.clone();
2392    tokio_runtime.block_on(async move {
2393        spawn_lifecycle_signal_handler(signal_runtime).await;
2394        // Start wire protocol listeners (plaintext + TLS)
2395        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2396
2397        // Start PostgreSQL wire listener when --pg-bind is configured.
2398        spawn_pg_listener(&config, &runtime);
2399
2400        // Optional TLS gRPC listener. When `grpc_tls_bind_addr` is set
2401        // it spawns a separate listener so plaintext + TLS can run
2402        // side-by-side (50051 plain + 50052 TLS, etc.).
2403        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2404
2405        let server = RedDBGrpcServer::with_options(
2406            runtime,
2407            GrpcServerOptions {
2408                bind_addr: bind_addr.clone(),
2409                tls: None,
2410            },
2411            auth_store,
2412        );
2413
2414        tracing::info!(
2415            transport = "grpc",
2416            bind = %bind_addr,
2417            cpus = rt_config.available_cpus,
2418            workers = worker_threads,
2419            "listener online"
2420        );
2421        server
2422            .serve_on(grpc_listener)
2423            .await
2424            .map_err(|err| err.to_string())
2425    })
2426}
2427
2428#[inline(never)]
2429fn run_dual_server(
2430    config: ServerCommandConfig,
2431    grpc_bind_addr: String,
2432    http_bind_addr: String,
2433) -> Result<(), String> {
2434    let workers = config.workers;
2435    let cli_telemetry = config.telemetry.clone();
2436    let db_options = config.to_db_options()?;
2437    let rt_config = detect_runtime_config();
2438    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2439    let mut transport_readiness = TransportReadiness::default();
2440    let http_listener = bind_listener_for_startup(
2441        &mut transport_readiness,
2442        "http",
2443        &http_bind_addr,
2444        config.http_bind_explicit,
2445    )?;
2446    let grpc_listener = bind_listener_for_startup(
2447        &mut transport_readiness,
2448        "grpc",
2449        &grpc_bind_addr,
2450        config.grpc_bind_explicit,
2451    )?;
2452    if http_listener.is_none() && grpc_listener.is_none() {
2453        return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2454    }
2455    let (runtime, auth_store, _telemetry_guard) =
2456        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2457    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2458
2459    spawn_admin_metrics_listeners(&runtime, &auth_store);
2460    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2461
2462    let http_handle = if let Some(listener) = http_listener {
2463        let http_server = build_http_server_with_transport_readiness(
2464            runtime.clone(),
2465            auth_store.clone(),
2466            http_bind_addr.clone(),
2467            transport_readiness.clone(),
2468        );
2469        let http_server = apply_http_limits(http_server, &config, &runtime);
2470        Some(http_server.serve_in_background_on(listener))
2471    } else {
2472        None
2473    };
2474
2475    thread::sleep(Duration::from_millis(150));
2476    if let Some(handle) = http_handle.as_ref() {
2477        if handle.is_finished() {
2478            let handle = http_handle.unwrap();
2479            return match handle.join() {
2480                Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2481                Ok(Err(err)) => Err(err.to_string()),
2482                Err(_) => Err("HTTP server thread panicked".to_string()),
2483            };
2484        }
2485    }
2486    if grpc_listener.is_none() {
2487        let Some(handle) = http_handle else {
2488            return Err("no listener started".to_string());
2489        };
2490        return match handle.join() {
2491            Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2492            Ok(Err(err)) => Err(err.to_string()),
2493            Err(_) => Err("HTTP server thread panicked".to_string()),
2494        };
2495    }
2496    let grpc_listener = grpc_listener.expect("checked above");
2497
2498    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2499        .enable_all()
2500        .worker_threads(worker_threads)
2501        .thread_stack_size(rt_config.stack_size)
2502        .build()
2503        .map_err(|err| format!("tokio runtime: {err}"))?;
2504
2505    let signal_runtime = runtime.clone();
2506    tokio_runtime.block_on(async move {
2507        spawn_lifecycle_signal_handler(signal_runtime).await;
2508        // Start wire protocol listeners (plaintext + TLS)
2509        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2510
2511        // Start PostgreSQL wire listener when --pg-bind is configured.
2512        spawn_pg_listener(&config, &runtime);
2513
2514        // Optional TLS gRPC listener — runs alongside the plaintext one.
2515        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2516
2517        let server = RedDBGrpcServer::with_options(
2518            runtime,
2519            GrpcServerOptions {
2520                bind_addr: grpc_bind_addr.clone(),
2521                tls: None,
2522            },
2523            auth_store,
2524        );
2525
2526        tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2527        tracing::info!(
2528            transport = "grpc",
2529            bind = %grpc_bind_addr,
2530            cpus = rt_config.available_cpus,
2531            workers = worker_threads,
2532            "listener online"
2533        );
2534        server
2535            .serve_on(grpc_listener)
2536            .await
2537            .map_err(|err| err.to_string())
2538    })
2539}
2540
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `SystemdServiceConfig` fixture. The binary path, run user
    /// and run group are the same in every systemd test; only the service
    /// name, data path and bind addresses vary.
    fn systemd_config(
        service_name: &str,
        data_path: &str,
        router_bind_addr: Option<&str>,
        grpc_bind_addr: Option<&str>,
        http_bind_addr: Option<&str>,
    ) -> SystemdServiceConfig {
        SystemdServiceConfig {
            service_name: service_name.to_string(),
            binary_path: PathBuf::from("/usr/local/bin/red"),
            run_user: "reddb".to_string(),
            run_group: "reddb".to_string(),
            data_path: PathBuf::from(data_path),
            router_bind_addr: router_bind_addr.map(String::from),
            grpc_bind_addr: grpc_bind_addr.map(String::from),
            http_bind_addr: http_bind_addr.map(String::from),
        }
    }

    #[test]
    fn render_systemd_unit_contains_expected_execstart() {
        let config = systemd_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            None,
        );

        let unit = render_systemd_unit(&config);
        assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
        assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
    }

    #[test]
    fn systemd_service_config_derives_paths() {
        let config = systemd_config(
            "reddb-api",
            "/srv/reddb/live/data.rdb",
            None,
            None,
            Some("127.0.0.1:5055"),
        );

        assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
        assert_eq!(
            config.unit_path(),
            PathBuf::from("/etc/systemd/system/reddb-api.service")
        );
    }

    #[test]
    fn render_systemd_unit_supports_dual_transport() {
        let config = systemd_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            Some("0.0.0.0:5055"),
        );

        let unit = render_systemd_unit(&config);
        assert!(unit.contains("--grpc-bind 0.0.0.0:5555"));
        assert!(unit.contains("--http-bind 0.0.0.0:5055"));
    }

    #[test]
    fn render_systemd_unit_supports_router_mode() {
        let config = systemd_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            Some(DEFAULT_ROUTER_BIND_ADDR),
            None,
            None,
        );

        let unit = render_systemd_unit(&config);
        assert!(unit.contains("--bind 127.0.0.1:5050"));
        assert!(!unit.contains("--grpc-bind"));
        assert!(!unit.contains("--http-bind"));
    }

    #[test]
    fn explicit_bind_collision_is_fatal() {
        // Keep the port occupied so the second bind below collides.
        let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
        let addr = held.local_addr().expect("held addr").to_string();
        let mut readiness = TransportReadiness::default();

        let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();

        assert!(error.contains("explicit http listener bind"));
        assert_eq!(readiness.active.len(), 0);
        assert_eq!(readiness.failed.len(), 1);
        assert!(readiness.failed[0].explicit);
        assert_eq!(readiness.failed[0].bind_addr, addr);
    }

    // ---------- Issue #663 — `--no-auth` / `--dev` ----------

    // Env access in tests is process-global; serialise every `--no-auth`
    // test so the REDDB_USERNAME / REDDB_PASSWORD pair some of them set
    // cannot leak into the others under cargo's default parallel runner.
    fn no_auth_env_lock() -> &'static std::sync::Mutex<()> {
        static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
        LOCK.get_or_init(|| std::sync::Mutex::new(()))
    }

    /// Holds `no_auth_env_lock` and keeps `REDDB_USERNAME` /
    /// `REDDB_PASSWORD` set for as long as the value lives.
    ///
    /// Both variables are removed on drop — including during the unwind of
    /// a failed assertion — so a failing test cannot leak them into later
    /// tests. The lock is released only after the variables are removed.
    struct BootstrapEnvGuard {
        _lock: std::sync::MutexGuard<'static, ()>,
    }

    impl BootstrapEnvGuard {
        fn set(username: &str, password: &str) -> Self {
            // A poisoned lock only means an earlier test panicked; the
            // protected state is `()`, so recovering the guard is fine.
            let lock = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
            // SAFETY: serialised by `no_auth_env_lock`, held in `lock`.
            unsafe {
                std::env::set_var("REDDB_USERNAME", username);
                std::env::set_var("REDDB_PASSWORD", password);
            }
            Self { _lock: lock }
        }
    }

    impl Drop for BootstrapEnvGuard {
        fn drop(&mut self) {
            // SAFETY: `_lock` is still held here; struct fields are dropped
            // only after this method returns.
            unsafe {
                std::env::remove_var("REDDB_USERNAME");
                std::env::remove_var("REDDB_PASSWORD");
            }
        }
    }

    fn no_auth_test_config(no_auth: bool) -> ServerCommandConfig {
        ServerCommandConfig {
            path: None,
            router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
            router_bind_explicit: false,
            grpc_bind_addr: None,
            grpc_bind_explicit: false,
            grpc_tls_bind_addr: None,
            grpc_tls_cert: None,
            grpc_tls_key: None,
            grpc_tls_client_ca: None,
            http_bind_addr: None,
            http_bind_explicit: false,
            http_tls_bind_addr: None,
            http_tls_cert: None,
            http_tls_key: None,
            http_tls_client_ca: None,
            wire_bind_addr: None,
            wire_bind_explicit: false,
            wire_tls_bind_addr: None,
            wire_tls_cert: None,
            wire_tls_key: None,
            pg_bind_addr: None,
            create_if_missing: true,
            read_only: false,
            role: "standalone".to_string(),
            primary_addr: None,
            // Operator-set `--vault`: `--no-auth` must override this
            // alongside REDDB_USERNAME/PASSWORD.
            vault: true,
            no_auth,
            workers: None,
            telemetry: None,
            http_limits_cli: crate::server::HttpLimitsCliInput::default(),
        }
    }

    #[test]
    fn no_auth_flag_disables_every_auth_knob_and_stamps_metadata() {
        // Pre-existing env that *would* turn auth on if `--no-auth`
        // weren't the last word. The acceptance criterion is that
        // the flag wins over env.
        let _env = BootstrapEnvGuard::set("admin", "hunter2");
        let config = no_auth_test_config(true);
        let options = config.to_db_options().expect("to_db_options");

        assert!(no_auth_active(&options), "metadata should be stamped");
        assert!(!options.auth.enabled, "auth.enabled must be forced off");
        assert!(
            !options.auth.require_auth,
            "require_auth must be forced off"
        );
        assert!(
            !options.auth.vault_enabled,
            "vault_enabled must be forced off (overrides --vault)"
        );
        assert_eq!(
            options.metadata.get(NO_AUTH_META).map(String::as_str),
            Some("true"),
        );
    }

    #[test]
    fn default_behaviour_without_no_auth_flag_is_unchanged() {
        let _g = no_auth_env_lock().lock().unwrap_or_else(|e| e.into_inner());
        let config = no_auth_test_config(false);
        let options = config.to_db_options().expect("to_db_options");

        assert!(
            !no_auth_active(&options),
            "default boot must not be marked no-auth"
        );
        assert!(
            options.metadata.get(NO_AUTH_META).is_none(),
            "metadata key must be absent when flag is off"
        );
        // `--vault` should still take effect when `--no-auth` is not set.
        assert!(options.auth.vault_enabled);
    }

    #[test]
    fn no_auth_active_blocks_bootstrap_from_env() {
        // The pair would normally cause `AuthStore::bootstrap_from_env` to
        // create an admin; the boot pipeline must suppress that call
        // whenever `no_auth_active` is true.
        let _env = BootstrapEnvGuard::set("admin", "hunter2");

        let options = no_auth_test_config(true)
            .to_db_options()
            .expect("to_db_options");

        // Mirror the exact branch in `build_runtime_with_telemetry`:
        // build a non-vault AuthStore from `options.auth`, then call
        // `bootstrap_from_env` *only* when the no-auth gate is off.
        let auth_store = AuthStore::new(options.auth.clone());
        if !no_auth_active(&options) {
            auth_store.bootstrap_from_env();
        }

        assert!(
            auth_store.needs_bootstrap(),
            "no admin user must be bootstrapped under --no-auth even with REDDB_USERNAME/PASSWORD set"
        );
    }

    #[test]
    fn implicit_bind_collision_degrades() {
        // Keep the port occupied so the second bind below collides.
        let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
        let addr = held.local_addr().expect("held addr").to_string();
        let mut readiness = TransportReadiness::default();

        let listener =
            bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");

        assert!(listener.is_none());
        assert_eq!(readiness.active.len(), 0);
        assert_eq!(readiness.failed.len(), 1);
        assert!(!readiness.failed[0].explicit);
        assert_eq!(readiness.failed[0].bind_addr, addr);
    }
}