// reddb_server/service_cli.rs

1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13    GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Loopback address used as the default bind address for the protocol router.
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
/// Probe the host's parallelism and derive a suggested worker-thread count.
///
/// Falls back to a single CPU when the parallelism query fails. One core is
/// left for the operating system, but at least one worker is always
/// suggested.
pub fn detect_runtime_config() -> RuntimeConfig {
    let available_cpus = thread::available_parallelism().map_or(1, |n| n.get());

    // Reserve one core for the OS without ever dropping below one worker.
    let suggested_workers = if available_cpus > 1 {
        available_cpus - 1
    } else {
        1
    };

    RuntimeConfig {
        available_cpus,
        suggested_workers,
        stack_size: 8 * 1024 * 1024, // 8 MiB default
    }
}

/// Result of [`detect_runtime_config`].
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Parallelism reported by the standard library (`1` when unknown).
    pub available_cpus: usize,
    /// `available_cpus - 1`, clamped to a minimum of `1`.
    pub suggested_workers: usize,
    /// Suggested thread stack size in bytes (8 MiB).
    pub stack_size: usize,
}
40
/// Network transports the server can expose.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    Grpc,
    Http,
    Wire,
}

impl ServerTransport {
    /// Human-readable label for this transport.
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Wire => "wire",
            ServerTransport::Http => "HTTP",
            ServerTransport::Grpc => "gRPC",
        }
    }

    /// Loopback `host:port` this transport binds to by default.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Wire => "127.0.0.1:5050",
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Grpc => "127.0.0.1:5555",
        }
    }
}
65
66#[derive(Debug, Clone)]
67pub struct ServerCommandConfig {
68    pub path: Option<PathBuf>,
69    pub router_bind_addr: Option<String>,
70    pub router_bind_explicit: bool,
71    pub grpc_bind_addr: Option<String>,
72    pub grpc_bind_explicit: bool,
73    /// TLS-encrypted gRPC bind address. Can run side-by-side with
74    /// `grpc_bind_addr` (e.g. `:50051` plain + `:50052` TLS) or
75    /// stand alone for TLS-only deploys. Defaults to `None`.
76    pub grpc_tls_bind_addr: Option<String>,
77    /// Path to PEM-encoded gRPC server certificate. Resolved through
78    /// `REDDB_GRPC_TLS_CERT` (with `_FILE` companion for k8s secret
79    /// mounts). When `None` and dev-mode is enabled
80    /// (`RED_GRPC_TLS_DEV=1`) the server auto-generates a self-signed
81    /// cert and prints its SHA-256 fingerprint to stderr.
82    pub grpc_tls_cert: Option<PathBuf>,
83    /// Path to PEM-encoded gRPC server private key. Same env-var
84    /// pattern as `grpc_tls_cert`.
85    pub grpc_tls_key: Option<PathBuf>,
86    /// Optional path to a PEM bundle of trust anchors used to verify
87    /// client certificates. When set, the gRPC listener requires
88    /// every client to present a cert that chains to this CA — i.e.
89    /// mutual TLS. When unset, one-way TLS only.
90    pub grpc_tls_client_ca: Option<PathBuf>,
91    pub http_bind_addr: Option<String>,
92    pub http_bind_explicit: bool,
93    /// HTTPS bind address. When set, the HTTP server also serves a
94    /// TLS-terminated listener on this addr. Plain HTTP and HTTPS can
95    /// run side by side (e.g. 8080 plain + 8443 TLS).
96    pub http_tls_bind_addr: Option<String>,
97    /// PEM cert for HTTPS. Reads `REDDB_HTTP_TLS_CERT` / its `_FILE`
98    /// companion when not set explicitly.
99    pub http_tls_cert: Option<PathBuf>,
100    /// PEM key for HTTPS. Reads `REDDB_HTTP_TLS_KEY` / its `_FILE`
101    /// companion when not set explicitly.
102    pub http_tls_key: Option<PathBuf>,
103    /// Optional PEM CA bundle. When set, the HTTPS listener requires
104    /// every client to present a cert that chains to a CA in this
105    /// bundle (mTLS). When unset, plain server-side TLS only.
106    pub http_tls_client_ca: Option<PathBuf>,
107    pub wire_bind_addr: Option<String>,
108    pub wire_bind_explicit: bool,
109    /// TLS-encrypted wire protocol bind address
110    pub wire_tls_bind_addr: Option<String>,
111    /// Path to TLS cert PEM (if None + wire_tls_bind, auto-generate)
112    pub wire_tls_cert: Option<PathBuf>,
113    /// Path to TLS key PEM
114    pub wire_tls_key: Option<PathBuf>,
115    /// PostgreSQL wire protocol bind address (Phase 3.1 PG parity).
116    /// When set the server accepts psql / JDBC / DBeaver clients on
117    /// this port via the v3 protocol. Defaults to None (disabled).
118    pub pg_bind_addr: Option<String>,
119    pub create_if_missing: bool,
120    pub read_only: bool,
121    pub role: String,
122    pub primary_addr: Option<String>,
123    pub vault: bool,
124    /// Override worker thread count (None = auto-detect from CPUs)
125    pub workers: Option<usize>,
126    /// Telemetry config (Phase 6 logging). `None` falls back to the
127    /// built-in default derived from `path` + stderr-only.
128    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
129    /// HTTP handler-pool knobs from the CLI layer (issue #574 slice 5).
130    /// Carries flag and env values; red_config and built-in defaults
131    /// are applied later by [`crate::server::http_limits::resolve_http_limits`]
132    /// once the runtime is open.
133    pub http_limits_cli: crate::server::HttpLimitsCliInput,
134}
135
/// A listener that was recorded as active.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerState {
    pub transport: String,
    pub bind_addr: String,
    pub explicit: bool,
}

/// A listener that was recorded as failed, together with the reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerFailure {
    pub transport: String,
    pub bind_addr: String,
    pub explicit: bool,
    pub reason: String,
}

/// Accumulated per-transport outcome: active listeners and failed ones, each
/// kept in recording order.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TransportReadiness {
    pub active: Vec<TransportListenerState>,
    pub failed: Vec<TransportListenerFailure>,
}

impl TransportReadiness {
    /// Append an active-listener entry.
    fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
        let entry = TransportListenerState {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
        };
        self.active.push(entry);
    }

    /// Append a failed-listener entry carrying `reason`.
    fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
        let entry = TransportListenerFailure {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
            reason,
        };
        self.failed.push(entry);
    }
}
175
/// Inputs describing a systemd service for the server.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix.
    pub service_name: String,
    pub binary_path: PathBuf,
    pub run_user: String,
    pub run_group: String,
    /// Path of the database file; its parent is reported by [`Self::data_dir`].
    pub data_path: PathBuf,
    pub router_bind_addr: Option<String>,
    pub grpc_bind_addr: Option<String>,
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory containing `data_path`.
    ///
    /// Returns `.` when `data_path` has no usable parent. That covers both a
    /// root/empty path (`parent()` is `None`) and a bare relative file name
    /// such as `data.rdb`, for which `Path::parent` yields an *empty* path
    /// that would otherwise leak out as an unusable directory name.
    pub fn data_dir(&self) -> PathBuf {
        match self.data_path.parent() {
            Some(dir) if !dir.as_os_str().is_empty() => dir.to_path_buf(),
            _ => PathBuf::from("."),
        }
    }

    /// Location of the unit file: `/etc/systemd/system/<service_name>.service`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
200
201/// Build a sane default `TelemetryConfig` from a server path when the
202/// caller didn't set one explicitly. Writes rotating logs into the
203/// parent directory of the DB file (or `./logs` for in-memory /
204/// pathless runs). Level defaults to `info`, pretty stderr format.
205pub fn default_telemetry_for_path(
206    path: Option<&std::path::Path>,
207) -> crate::telemetry::TelemetryConfig {
208    let log_dir = match path {
209        Some(p) => p
210            .parent()
211            .map(|parent| parent.join("logs"))
212            .or_else(|| Some(std::path::PathBuf::from("./logs"))),
213        None => None, // in-memory — no file, stderr-only
214    };
215    crate::telemetry::TelemetryConfig {
216        log_dir,
217        file_prefix: "reddb.log".to_string(),
218        level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
219        format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
220            crate::telemetry::LogFormat::Pretty
221        } else {
222            crate::telemetry::LogFormat::Json
223        },
224        rotation_keep_days: 14,
225        service_name: "reddb",
226        // Implicit defaults — no CLI flag has claimed these values yet.
227        level_explicit: false,
228        format_explicit: false,
229        rotation_keep_days_explicit: false,
230        file_prefix_explicit: false,
231        log_dir_explicit: false,
232        log_file_disabled: false,
233    }
234}
235
/// Metadata keys that carry the parsed backup config from `to_db_options`
/// to the runner threads. The runner reads them back to start the periodic
/// checkpointer and WAL-flush tasks. Going through `metadata` avoids adding
/// a public field to `RedDBOptions` that would be meaningless to library
/// consumers.
const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
/// Issue #519 — passed through `metadata` in the same way as the interval
/// values. A value of `0` (the default) means the feature is off and the
/// runner does not wire up the lag monitor at all.
const BACKUP_PAUSE_ON_LAG_META: &str = "red.boot.backup.pause_on_lag_secs";
249
250impl ServerCommandConfig {
251    fn to_db_options(&self) -> Result<RedDBOptions, String> {
252        let mut options = match &self.path {
253            Some(path) => RedDBOptions::persistent(path),
254            None => RedDBOptions::in_memory(),
255        };
256
257        options.mode = StorageMode::Persistent;
258        options.create_if_missing = self.create_if_missing;
259        // PLAN.md Phase 4.3 — read_only resolution priority:
260        //   1. CLI flag (`--readonly`) — explicit operator intent.
261        //   2. `RED_READONLY=true` env — orchestrator override.
262        //   3. Persisted `<data>/.runtime-state.json` from a prior
263        //      `POST /admin/readonly` — survives restart.
264        //   4. Default `false`.
265        options.read_only = self.read_only
266            || env_nonempty("RED_READONLY")
267                .or_else(|| env_nonempty("REDDB_READONLY"))
268                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
269                .unwrap_or(false)
270            || self.path.as_ref().is_some_and(|data_path| {
271                crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
272                    data_path,
273                ))
274                .unwrap_or(false)
275            });
276
277        options.replication = match self.role.as_str() {
278            "primary" => ReplicationConfig::primary(),
279            "replica" => {
280                let primary_addr = self
281                    .primary_addr
282                    .clone()
283                    .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
284                // Public-mutation rejection on replicas is enforced by
285                // `WriteGate` at the runtime/RPC boundary (PLAN.md W1).
286                // Leaving `options.read_only = false` keeps the pager
287                // writable so the internal logical-WAL apply path can
288                // ingest records from the primary; WriteGate ensures no
289                // client request reaches storage.
290                ReplicationConfig::replica(primary_addr)
291            }
292            _ => ReplicationConfig::standalone(),
293        };
294
295        if self.vault {
296            options.auth.vault_enabled = true;
297        }
298
299        // Issue #517 — canonical `REDDB_BACKUP_*` contract takes
300        // precedence. On Err, surface the partial-config message so
301        // boot exits non-zero with a clear operator message. On
302        // Ok(None), fall through to the legacy backend-from-env path.
303        match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
304            Err(msg) => {
305                return Err(format!("backup bootstrap: {msg}"));
306            }
307            Ok(Some(cfg)) => {
308                apply_backup_config(&mut options, &cfg);
309            }
310            Ok(None) => {
311                configure_remote_backend_from_env(&mut options);
312            }
313        }
314
315        Ok(options)
316    }
317
318    pub fn enabled_transports(&self) -> Vec<ServerTransport> {
319        let mut transports = Vec::with_capacity(3);
320        if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
321            transports.push(ServerTransport::Grpc);
322        }
323        if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
324            transports.push(ServerTransport::Http);
325        }
326        if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
327            transports.push(ServerTransport::Wire);
328        }
329        transports
330    }
331}
332
333/// Read an env var, treating empty / whitespace-only as `None`.
334/// Honors the `<NAME>_FILE` convention. Re-exports the shared
335/// `crate::utils::env_with_file_fallback` helper so call sites in
336/// this module can keep their short local name.
337fn env_nonempty(name: &str) -> Option<String> {
338    crate::utils::env_with_file_fallback(name)
339}
340
341fn env_truthy(name: &str) -> bool {
342    env_nonempty(name)
343        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
344        .unwrap_or(false)
345}
346
347/// Apply a parsed [`BackupConfig`] to `options`. Wires the S3
348/// backend via `with_remote_backend` + `with_atomic_remote_backend`
349/// when the `backend-s3` feature is on, stashes intervals + backend
350/// kind in `metadata` so the runner can spawn the periodic tasks,
351/// and emits the startup INFO log required by #517.
352fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
353    let endpoint_host = endpoint_host(&cfg.endpoint);
354
355    options.metadata.insert(
356        BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
357        cfg.checkpoint_interval_secs.to_string(),
358    );
359    options.metadata.insert(
360        BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
361        cfg.wal_flush_interval_secs.to_string(),
362    );
363    options
364        .metadata
365        .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
366    options.metadata.insert(
367        BACKUP_PAUSE_ON_LAG_META.to_string(),
368        cfg.pause_on_lag_secs.to_string(),
369    );
370
371    #[cfg(feature = "backend-s3")]
372    {
373        let s3_cfg = crate::storage::backend::S3Config {
374            endpoint: cfg.endpoint.clone(),
375            bucket: cfg.bucket.clone(),
376            key_prefix: cfg.prefix.clone(),
377            access_key: cfg.access_key_id.clone(),
378            secret_key: cfg.secret_access_key.clone(),
379            region: cfg.region.clone(),
380            path_style: true,
381        };
382        let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
383        options.remote_backend = Some(backend.clone());
384        options.remote_backend_atomic = Some(backend);
385        // Use the operator-supplied prefix as the snapshot key root.
386        // The existing helpers (`default_snapshot_prefix`,
387        // `default_wal_archive_prefix`) derive sub-prefixes from the
388        // parent of `remote_key`.
389        let trimmed = cfg.prefix.trim_end_matches('/');
390        options.remote_key = Some(format!("{}/data.rdb", trimmed));
391
392        tracing::info!(
393            backend = "s3",
394            endpoint = %endpoint_host,
395            bucket = %cfg.bucket,
396            prefix = %cfg.prefix,
397            checkpoint_interval_secs = cfg.checkpoint_interval_secs,
398            wal_flush_interval_secs = cfg.wal_flush_interval_secs,
399            "backup backend configured from REDDB_BACKUP_* env"
400        );
401    }
402
403    #[cfg(not(feature = "backend-s3"))]
404    {
405        tracing::warn!(
406            backend = "s3",
407            endpoint = %endpoint_host,
408            bucket = %cfg.bucket,
409            prefix = %cfg.prefix,
410            "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
411             backend wiring skipped (archiver/checkpointer also disabled)"
412        );
413    }
414}
415
/// Extract the authority part of an endpoint: drop everything up to and
/// including the first `://` (if any), then cut at the first `/`.
fn endpoint_host(endpoint: &str) -> &str {
    let rest = match endpoint.find("://") {
        Some(idx) => &endpoint[idx + "://".len()..],
        None => endpoint,
    };
    match rest.find('/') {
        Some(slash) => &rest[..slash],
        None => rest,
    }
}
423
424/// If `options` carry backup-task intervals threaded in via
425/// [`apply_backup_config`], spawn periodic checkpointer + WAL-flush
426/// tasks against `runtime`. Returns a `BackupTasksHandle` that
427/// stops the tasks when dropped; runners keep it alive for the
428/// server lifetime.
429fn spawn_backup_tasks_if_configured(
430    options: &RedDBOptions,
431    runtime: &RedDBRuntime,
432) -> Option<BackupTasksHandle> {
433    let checkpoint_secs: u64 = options
434        .metadata
435        .get(BACKUP_INTERVAL_META_CHECKPOINT)?
436        .parse()
437        .ok()?;
438    let wal_secs: u64 = options
439        .metadata
440        .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
441        .parse()
442        .ok()?;
443    // Issue #519 — opt-in graceful read-only when remote archive lag
444    // exceeds the threshold. `0` (default) keeps legacy behaviour.
445    let pause_on_lag_secs: u64 = options
446        .metadata
447        .get(BACKUP_PAUSE_ON_LAG_META)
448        .and_then(|raw| raw.parse().ok())
449        .unwrap_or(0);
450    options.remote_backend.as_ref()?;
451
452    let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
453
454    // Stamp the gate with the threshold + a "now" baseline so the
455    // first auto-pause can only fire after `pause_on_lag_secs` of
456    // archive silence. The poller below re-evaluates on the same
457    // clock the archive-task wrapper uses.
458    if pause_on_lag_secs > 0 {
459        let now_ms = std::time::SystemTime::now()
460            .duration_since(std::time::UNIX_EPOCH)
461            .map(|d| d.as_millis() as u64)
462            .unwrap_or(0);
463        runtime
464            .write_gate()
465            .configure_archive_lag_pause(pause_on_lag_secs, now_ms);
466        tracing::info!(
467            pause_on_lag_secs,
468            "archive-lag pause enabled — engine will transition to read-only after threshold seconds of archiver silence"
469        );
470    }
471
472    let checkpoint_handle = {
473        let stop = Arc::clone(&stop);
474        let runtime = runtime.clone();
475        let interval = Duration::from_secs(checkpoint_secs);
476        thread::Builder::new()
477            .name("red-checkpointer".into())
478            .spawn(move || {
479                periodic_loop(stop, interval, move || {
480                    if let Err(err) = runtime.checkpoint() {
481                        tracing::warn!(error = %err, "periodic checkpoint failed");
482                    }
483                })
484            })
485            .ok()
486    };
487
488    let archiver_handle = {
489        let stop = Arc::clone(&stop);
490        let runtime = runtime.clone();
491        let interval = Duration::from_secs(wal_secs);
492        let lag_enabled = pause_on_lag_secs > 0;
493        thread::Builder::new()
494            .name("red-wal-archiver".into())
495            .spawn(move || {
496                periodic_loop(stop, interval, move || match runtime.trigger_backup() {
497                    Ok(_) if lag_enabled => {
498                        let now_ms = std::time::SystemTime::now()
499                            .duration_since(std::time::UNIX_EPOCH)
500                            .map(|d| d.as_millis() as u64)
501                            .unwrap_or(0);
502                        runtime.write_gate().record_archive_success(now_ms);
503                        // Same-tick re-evaluation: catching up while
504                        // already auto-paused must auto-resume without
505                        // waiting for the poller's cadence.
506                        runtime.write_gate().evaluate_archive_lag(now_ms);
507                    }
508                    Ok(_) => {}
509                    Err(err) => {
510                        tracing::warn!(error = %err, "periodic WAL archive/backup failed");
511                    }
512                })
513            })
514            .ok()
515    };
516
517    // Issue #519 — lag poller. Wakes on a short cadence so a frozen
518    // archiver (the worst case) still flips the gate within ~5s of
519    // crossing the threshold, instead of waiting up to a full
520    // `wal_secs` for the next archive attempt that may never come.
521    let lag_monitor_handle = if pause_on_lag_secs > 0 {
522        let stop = Arc::clone(&stop);
523        let runtime = runtime.clone();
524        // 5s is short enough that the threshold is honoured tightly
525        // and long enough that the atomic loads stay invisible at the
526        // process level.
527        let interval = Duration::from_secs(5);
528        thread::Builder::new()
529            .name("red-archive-lag-monitor".into())
530            .spawn(move || {
531                periodic_loop(stop, interval, move || {
532                    let now_ms = std::time::SystemTime::now()
533                        .duration_since(std::time::UNIX_EPOCH)
534                        .map(|d| d.as_millis() as u64)
535                        .unwrap_or(0);
536                    let was_paused = runtime.write_gate().is_auto_paused();
537                    let now_paused = runtime.write_gate().evaluate_archive_lag(now_ms);
538                    if now_paused && !was_paused {
539                        tracing::warn!(
540                            pause_on_lag_secs,
541                            last_archive_at_ms = runtime.write_gate().last_archive_at_ms(),
542                            "WAL archive lag exceeded threshold — entering graceful read-only mode (issue #519)"
543                        );
544                    } else if !now_paused && was_paused {
545                        tracing::info!(
546                            "WAL archive caught up — exiting graceful read-only mode (issue #519)"
547                        );
548                    }
549                })
550            })
551            .ok()
552    } else {
553        None
554    };
555
556    tracing::info!(
557        checkpoint_interval_secs = checkpoint_secs,
558        wal_flush_interval_secs = wal_secs,
559        "backup tasks spawned (checkpointer + WAL archiver)"
560    );
561
562    Some(BackupTasksHandle {
563        stop,
564        _checkpoint_handle: checkpoint_handle,
565        _archiver_handle: archiver_handle,
566        _lag_monitor_handle: lag_monitor_handle,
567    })
568}
569
/// Shutdown handle for the periodic backup threads (checkpointer, WAL
/// archiver and, when enabled, the archive-lag monitor).
///
/// Dropping the handle raises the shared stop flag; the threads are not
/// joined.
pub struct BackupTasksHandle {
    /// Stop flag shared with every spawned loop.
    stop: Arc<std::sync::atomic::AtomicBool>,
    _checkpoint_handle: Option<thread::JoinHandle<()>>,
    _archiver_handle: Option<thread::JoinHandle<()>>,
    /// Issue #519 — periodic archive-lag poller; present only when
    /// `REDDB_BACKUP_PAUSE_ON_LAG_SECS > 0`.
    _lag_monitor_handle: Option<thread::JoinHandle<()>>,
}

impl Drop for BackupTasksHandle {
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        self.stop.store(true, Ordering::Release);
    }
}
586
/// Call `tick` roughly every `interval` until `stop` becomes `true`.
///
/// The loop sleeps in one-second slices so shutdown stays responsive even
/// when the operator-configured interval is large (e.g. a 1h checkpoint);
/// consequently intervals are effectively rounded up to whole seconds. The
/// stop flag is checked both before and after each sleep, so no tick runs
/// once a stop request has been observed.
fn periodic_loop<F: FnMut()>(
    stop: Arc<std::sync::atomic::AtomicBool>,
    interval: Duration,
    mut tick: F,
) {
    use std::sync::atomic::Ordering;

    let wake = Duration::from_secs(1);
    let mut elapsed = Duration::ZERO;
    while !stop.load(Ordering::Acquire) {
        thread::sleep(wake);
        // Shutdown may have been requested while we slept; do not fire a
        // final tick against a runtime that is being torn down.
        if stop.load(Ordering::Acquire) {
            break;
        }
        elapsed += wake;
        if elapsed >= interval {
            tick();
            elapsed = Duration::ZERO;
        }
    }
}
605
606fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
607    // PLAN.md (cloud-agnostic) — prefer the new spelling
608    // `RED_BACKEND`; accept the legacy `REDDB_REMOTE_BACKEND` for
609    // existing dev installs. `none` (default) means standalone — no
610    // remote backend, valid for development and on-prem without
611    // remote.
612    let backend = env_nonempty("RED_BACKEND")
613        .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
614        .unwrap_or_else(|| "none".to_string())
615        .to_ascii_lowercase();
616
617    match backend.as_str() {
618        // Universal S3-compatible — covers AWS, R2, MinIO, Ceph,
619        // GCS-interop, B2, DO Spaces, Wasabi, Garage, SeaweedFS,
620        // IDrive, Storj. The `path_style` toggle in S3Config picks
621        // the right addressing for self-hosted vs hosted.
622        "s3" | "minio" | "r2" => {
623            #[cfg(feature = "backend-s3")]
624            {
625                if let Some(config) = s3_config_from_env() {
626                    let remote_key = env_nonempty("RED_REMOTE_KEY")
627                        .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
628                        .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
629                    let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
630                    options.remote_backend = Some(backend.clone());
631                    options.remote_backend_atomic = Some(backend);
632                    options.remote_key = Some(remote_key);
633                }
634            }
635            #[cfg(not(feature = "backend-s3"))]
636            {
637                tracing::warn!(
638                    backend = %backend,
639                    "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
640                );
641            }
642        }
643        // Filesystem backend (NFS/EFS/SMB/local-disk). PLAN.md spec
644        // calls it `fs`; legacy code shipped it as `local`. Both
645        // names map to LocalBackend, with the remote_key derived
646        // from `RED_FS_PATH` + a per-database suffix when provided.
647        "fs" | "local" => {
648            let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
649            let remote_key = match (
650                base_path,
651                env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
652            ) {
653                (Some(base), Some(rel)) => Some(format!(
654                    "{}/{}",
655                    base.trim_end_matches('/'),
656                    rel.trim_start_matches('/')
657                )),
658                (Some(base), None) => Some(format!(
659                    "{}/clusters/dev/data.rdb",
660                    base.trim_end_matches('/')
661                )),
662                (None, Some(rel)) => Some(rel),
663                (None, None) => None,
664            };
665            if let Some(key) = remote_key {
666                let backend = Arc::new(crate::storage::backend::LocalBackend);
667                options.remote_backend = Some(backend.clone());
668                options.remote_backend_atomic = Some(backend);
669                options.remote_key = Some(key);
670            }
671        }
672        // Generic HTTP backend (PLAN.md Phase 2.3). Maximum
673        // portability: any service exposing PUT/GET/DELETE serves
674        // as a backup target. Optional auth via *_FILE secret
675        // path keeps the token out of the env.
676        "http" => {
677            let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
678                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
679            {
680                Some(u) => u,
681                None => {
682                    tracing::warn!(
683                        "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
684                    );
685                    return;
686                }
687            };
688            let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
689                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
690                .unwrap_or_default();
691            let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
692                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
693            {
694                std::fs::read_to_string(&path)
695                    .ok()
696                    .map(|s| s.trim().to_string())
697                    .filter(|s| !s.is_empty())
698            } else {
699                env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
700                    .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
701            };
702
703            let mut config =
704                crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
705            if let Some(auth) = auth_header {
706                config = config.with_auth_header(auth);
707            }
708            let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
709                || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
710                || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
711            config = config.with_conditional_writes(conditional_writes);
712            // Always populate the snapshot-transport handle. When the
713            // operator confirmed CAS support, also populate the atomic
714            // handle via AtomicHttpBackend — without that flag,
715            // LeaseStore must remain unreachable on this backend.
716            if conditional_writes {
717                match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
718                    Ok(atomic) => {
719                        let atomic_arc = Arc::new(atomic);
720                        options.remote_backend = Some(atomic_arc.clone());
721                        options.remote_backend_atomic = Some(atomic_arc);
722                    }
723                    Err(err) => {
724                        tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
725                        options.remote_backend =
726                            Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
727                    }
728                }
729            } else {
730                options.remote_backend =
731                    Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
732            }
733            options.remote_key = env_nonempty("RED_REMOTE_KEY")
734                .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
735                .or_else(|| Some("clusters/dev/data.rdb".to_string()));
736        }
737        // `none` is the explicit standalone — no remote, no backup
738        // pipeline. Boot never blocks on network reachability.
739        "none" | "" => {}
740        other => {
741            tracing::warn!(
742                backend = %other,
743                "unknown RED_BACKEND value — supported: s3 | fs | http | none"
744            );
745        }
746    }
747}
748
/// Look up an S3 setting by `suffix`, trying the cloud-agnostic
/// `RED_S3_{suffix}` spelling first and the legacy `REDDB_S3_{suffix}`
/// spelling second. The first variable that `env_nonempty` resolves
/// wins; `None` when neither does.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    // Prefixes are listed in priority order; `find_map` stops at the
    // first hit, so the legacy name is only consulted as a fallback.
    ["RED_S3_", "REDDB_S3_"]
        .iter()
        .find_map(|prefix| env_nonempty(&format!("{prefix}{suffix}")))
}
758
/// Read an S3 secret, preferring a file referenced by
/// `RED_S3_{suffix}_FILE` (or the legacy `REDDB_S3_{suffix}_FILE`)
/// over the inline `RED_S3_{suffix}` / `REDDB_S3_{suffix}` value
/// (PLAN.md spec — compatible with Kubernetes / Docker Secrets,
/// HashiCorp Vault Agent, sealed-secrets).
///
/// When a `_FILE` variable is set it wins outright: the file contents
/// are trimmed and returned, and the inline variable is *not*
/// consulted even when the file is unreadable or blank (both yield
/// `None`). An unreadable file is logged as a warning so a wrong
/// mount path does not silently look like "no credentials configured".
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let file_path = env_nonempty(&format!("RED_S3_{suffix}_FILE"))
        .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}_FILE")));
    let path = match file_path {
        Some(path) => path,
        // No `_FILE` companion configured: use the inline value.
        None => return env_s3(suffix),
    };

    match std::fs::read_to_string(&path) {
        Ok(contents) => {
            let secret = contents.trim();
            if secret.is_empty() {
                None
            } else {
                Some(secret.to_string())
            }
        }
        Err(err) => {
            // Only the path and the I/O error are logged — never the
            // secret material itself.
            tracing::warn!(
                target: "reddb::secrets",
                path = %path,
                error = %err,
                "could not read S3 secret file; inline value is ignored because *_FILE is set"
            );
            None
        }
    }
}
776
/// Assemble an `S3Config` from the `RED_S3_*` / `REDDB_S3_*`
/// environment.
///
/// `ENDPOINT`, `BUCKET`, `ACCESS_KEY` and `SECRET_KEY` are required —
/// the function returns `None` as soon as one of them is missing.
/// `REGION` falls back to `us-east-1`, the key prefix (`KEY_PREFIX`,
/// then `PREFIX`) falls back to the empty string, and `PATH_STYLE`
/// defaults to `true` when unset (any set value other than
/// `1`/`true`/`yes`/`on`, case-insensitive, disables it).
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    // Required settings; the two credentials go through the
    // `*_FILE`-aware lookup.
    let endpoint = env_s3("ENDPOINT")?;
    let bucket = env_s3("BUCKET")?;
    let access_key = env_s3_secret("ACCESS_KEY")?;
    let secret_key = env_s3_secret("SECRET_KEY")?;

    // Optional settings with defaults.
    let region = match env_s3("REGION") {
        Some(region) => region,
        None => String::from("us-east-1"),
    };
    let key_prefix = match env_s3("KEY_PREFIX") {
        Some(prefix) => prefix,
        None => env_s3("PREFIX").unwrap_or_default(),
    };
    let path_style = match env_s3("PATH_STYLE") {
        Some(raw) => ["1", "true", "yes", "on"].contains(&raw.to_ascii_lowercase().as_str()),
        None => true,
    };

    Some(crate::storage::backend::S3Config {
        endpoint,
        bucket,
        key_prefix,
        access_key,
        secret_key,
        region,
        path_style,
    })
}
800
801pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
802    let data_dir = config.data_dir();
803    let exec_start = render_systemd_exec_start(config);
804    format!(
805        "[Unit]\n\
806Description=RedDB unified database service\n\
807After=network-online.target\n\
808Wants=network-online.target\n\
809\n\
810[Service]\n\
811Type=simple\n\
812User={user}\n\
813Group={group}\n\
814WorkingDirectory={workdir}\n\
815ExecStart={exec_start}\n\
816Restart=always\n\
817RestartSec=2\n\
818LimitSTACK=16M\n\
819NoNewPrivileges=true\n\
820PrivateTmp=true\n\
821ProtectSystem=strict\n\
822ProtectHome=true\n\
823ProtectControlGroups=true\n\
824ProtectKernelTunables=true\n\
825ProtectKernelModules=true\n\
826RestrictNamespaces=true\n\
827LockPersonality=true\n\
828MemoryDenyWriteExecute=true\n\
829ReadWritePaths={workdir}\n\
830\n\
831[Install]\n\
832WantedBy=multi-user.target\n",
833        user = config.run_user,
834        group = config.run_group,
835        workdir = data_dir.display(),
836        exec_start = exec_start,
837    )
838}
839
840/// Install a systemd unit + start the service.
841///
842/// Linux-only. The helper shells out to `systemctl`, `useradd`,
843/// `groupadd`, `install`, `getent`, and `id` — none of which exist on
844/// Windows or macOS. The Windows/macOS branch returns a hard error so
845/// callers (the CLI) surface a clear message instead of a confusing
846/// syscall failure. A proper Windows-service equivalent (sc.exe /
847/// NSSM) is a Phase 3.6 follow-up.
848#[cfg(target_os = "linux")]
849pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
850    ensure_root()?;
851    ensure_command_available("systemctl")?;
852    ensure_command_available("getent")?;
853    ensure_command_available("groupadd")?;
854    ensure_command_available("useradd")?;
855    ensure_command_available("install")?;
856    ensure_executable(&config.binary_path)?;
857
858    if !command_success("getent", ["group", config.run_group.as_str()])? {
859        run_command("groupadd", ["--system", config.run_group.as_str()])?;
860    }
861
862    if !command_success("id", ["-u", config.run_user.as_str()])? {
863        let data_dir = config.data_dir();
864        run_command(
865            "useradd",
866            [
867                "--system",
868                "--gid",
869                config.run_group.as_str(),
870                "--home-dir",
871                data_dir.to_string_lossy().as_ref(),
872                "--shell",
873                "/usr/sbin/nologin",
874                config.run_user.as_str(),
875            ],
876        )?;
877    }
878
879    let data_dir = config.data_dir();
880    run_command(
881        "install",
882        [
883            "-d",
884            "-o",
885            config.run_user.as_str(),
886            "-g",
887            config.run_group.as_str(),
888            "-m",
889            "0750",
890            data_dir.to_string_lossy().as_ref(),
891        ],
892    )?;
893
894    std::fs::write(config.unit_path(), render_systemd_unit(config))
895        .map_err(|err| format!("failed to write systemd unit: {err}"))?;
896
897    run_command("systemctl", ["daemon-reload"])?;
898    run_command(
899        "systemctl",
900        [
901            "enable",
902            "--now",
903            format!("{}.service", config.service_name).as_str(),
904        ],
905    )?;
906
907    Ok(())
908}
909
/// Non-Linux stand-in for the systemd installer.
///
/// systemd only exists on Linux, so this variant keeps the same
/// signature (callers compile on every platform) and always returns an
/// error pointing the operator at the platform's own service manager.
/// Windows/macOS service-manager integration is a Phase 3.6 follow-up
/// (sc.exe + NSSM on Windows, launchd on macOS).
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    const UNSUPPORTED: &str = concat!(
        "systemd install is Linux-only — use sc.exe (Windows) or ",
        "launchd (macOS) to install the service manually using the ",
        "unit printed by `red service print-unit`",
    );
    Err(String::from(UNSUPPORTED))
}
921
/// Verify the current process runs as root by asking `id -u`.
///
/// # Errors
///
/// Fails when `id` cannot be spawned or exits unsuccessfully, and when
/// the reported uid is anything other than `0`.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let id_output = match Command::new("id").arg("-u").output() {
        Ok(out) => out,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !id_output.status.success() {
        return Err(String::from("failed to determine current uid"));
    }

    // `id -u` prints the numeric uid followed by a newline.
    match String::from_utf8_lossy(&id_output.stdout).trim() {
        "0" => Ok(()),
        _ => Err(String::from("run this command as root (sudo)")),
    }
}
937
/// Check that `command` resolves on the login shell's `PATH`.
///
/// Runs `command -v` inside `sh -l`. The name is handed to the shell
/// as the positional parameter `$1` rather than spliced into the
/// script text, so shell metacharacters in `command` are treated as
/// part of the name instead of being executed.
///
/// # Errors
///
/// Fails when `sh` cannot be spawned or when the lookup exits
/// unsuccessfully (command not found).
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    let status = Command::new("sh")
        // argv after the script: `$0` = "sh", `$1` = the command name.
        .args(["-lc", "command -v \"$1\" >/dev/null 2>&1", "sh", command])
        .status()
        .map_err(|err| format!("failed to check command '{command}': {err}"))?;
    if status.success() {
        Ok(())
    } else {
        Err(format!("required command not found: {command}"))
    }
}
950
/// Verify that `path` names a regular file with at least one execute
/// permission bit set.
///
/// Symlinks are followed (`std::fs::metadata`). Directories are
/// rejected explicitly: they normally carry execute bits, so a
/// mode-only check would accept them.
///
/// # Errors
///
/// Fails when the path cannot be stat'ed, is not a regular file, or
/// has none of the `0o111` bits set.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    // The function is Linux-only, so the Unix permission extension is
    // always available here.
    use std::os::unix::fs::PermissionsExt;

    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    if metadata.permissions().mode() & 0o111 == 0 {
        return Err(format!("binary is not executable: {}", path.display()));
    }
    Ok(())
}
970
/// Run `program` with `args` and report whether it exited successfully.
///
/// A non-zero exit is a normal `Ok(false)`; only a failure to spawn the
/// process is an `Err`. The child inherits this process's standard
/// streams.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    match Command::new(program).args(args).status() {
        Ok(exit) => Ok(exit.success()),
        Err(err) => Err(format!("failed to run {program}: {err}")),
    }
}
979
/// Run `program` with `args`, capturing its output, and turn a
/// non-zero exit into a descriptive error.
///
/// # Errors
///
/// * `failed to run {program}: …` when the process cannot be spawned.
/// * `{program} failed: {detail}` when it exits unsuccessfully, where
///   `detail` is the trimmed stderr, else the trimmed stdout, else the
///   exit status itself.
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    let stdout = String::from_utf8_lossy(&output.stdout);
    let detail = [stderr.trim(), stdout.trim()]
        .into_iter()
        .find(|text| !text.is_empty())
        .map(str::to_owned)
        // `ExitStatus`'s Display already reads "exit status: N" (or
        // "signal: N"), so it is used verbatim rather than prefixed
        // with a second "exit status".
        .unwrap_or_else(|| output.status.to_string());
    Err(format!("{program} failed: {detail}"))
}
1001
1002pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
1003    let has_any = config.router_bind_addr.is_some()
1004        || config.grpc_bind_addr.is_some()
1005        || config.http_bind_addr.is_some()
1006        || config.wire_bind_addr.is_some()
1007        || config.pg_bind_addr.is_some();
1008    if !has_any {
1009        return Err("at least one server bind address must be configured".into());
1010    }
1011    let thread_name = if config.router_bind_addr.is_some() {
1012        "red-server-router"
1013    } else {
1014        match (
1015            config.grpc_bind_addr.is_some(),
1016            config.http_bind_addr.is_some(),
1017        ) {
1018            (true, true) => "red-server-dual",
1019            (true, false) => "red-server-grpc",
1020            (false, true) => "red-server-http",
1021            (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
1022            (false, false) => "red-server-pg-wire",
1023        }
1024    };
1025
1026    let handle = thread::Builder::new()
1027        .name(thread_name.into())
1028        .stack_size(8 * 1024 * 1024)
1029        .spawn(move || run_configured_servers(config))
1030        .map_err(|err| format!("failed to spawn server thread: {err}"))?;
1031
1032    match handle.join() {
1033        Ok(result) => result,
1034        Err(_) => Err("server thread panicked".to_string()),
1035    }
1036}
1037
1038fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
1039    let mut parts = vec![
1040        config.binary_path.display().to_string(),
1041        "server".to_string(),
1042        "--path".to_string(),
1043        config.data_path.display().to_string(),
1044    ];
1045
1046    if let Some(bind_addr) = &config.router_bind_addr {
1047        parts.push("--bind".to_string());
1048        parts.push(bind_addr.clone());
1049    } else if let Some(bind_addr) = &config.grpc_bind_addr {
1050        parts.push("--grpc-bind".to_string());
1051        parts.push(bind_addr.clone());
1052    }
1053    if let Some(bind_addr) = &config.http_bind_addr {
1054        parts.push("--http-bind".to_string());
1055        parts.push(bind_addr.clone());
1056    }
1057
1058    parts.join(" ")
1059}
1060
/// Report whether something accepts TCP connections at `target`.
///
/// `target` is resolved with `ToSocketAddrs`; each resolved address is
/// tried in turn with `timeout` as the per-address connect timeout.
/// Returns `true` on the first successful connect, and `false` when
/// resolution fails or no address accepts.
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    match target.to_socket_addrs() {
        Ok(mut candidates) => candidates
            .any(|candidate: SocketAddr| TcpStream::connect_timeout(&candidate, timeout).is_ok()),
        Err(_) => false,
    }
}
1071
1072#[inline(never)]
1073fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
1074    // Phase 6 logging is initialised inside each runner once the
1075    // runtime is open — see `build_runtime_and_auth_store`. Going
1076    // after DB open lets us read `red.logging.*` config keys out of
1077    // the persistent red_config store and merge them with the CLI
1078    // flags (flag > red_config > built-in default).
1079    if let Some(router_bind_addr) = config.router_bind_addr.clone() {
1080        return run_routed_server(config, router_bind_addr);
1081    }
1082
1083    match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
1084        (Some(grpc_bind_addr), Some(http_bind_addr)) => {
1085            run_dual_server(config, grpc_bind_addr, http_bind_addr)
1086        }
1087        (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1088        (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1089        (None, None) => {
1090            if let Some(wire_addr) = config.wire_bind_addr.clone() {
1091                run_wire_only_server(config, wire_addr)
1092            } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1093                run_pg_only_server(config, pg_addr)
1094            } else {
1095                Err("at least one server bind address must be configured".to_string())
1096            }
1097        }
1098    }
1099}
1100
1101/// Bind a TCP listener for a transport at startup and record the
1102/// outcome in the shared [`TransportReadiness`] state.
1103///
1104/// The split between *explicit* and *implicit/default* binds is the
1105/// contract from issue #545:
1106///
1107/// * `explicit == true` — the operator named this transport on the
1108///   CLI / env / config. A failed bind is fatal: this returns `Err`
1109///   and the boot path must propagate the error so the process exits
1110///   non-zero with the recorded `reason`.
1111/// * `explicit == false` — this is a default listener the server
1112///   would have spun up regardless. A failed bind degrades: this
1113///   returns `Ok(None)` (no listener) but the failure is still
1114///   recorded in `readiness.failed`, so the server keeps running and
1115///   the next `/health` probe enumerates the degraded listener.
1116///
1117/// On success the bound listener lands in `readiness.active`.
1118pub fn bind_listener_for_startup(
1119    readiness: &mut TransportReadiness,
1120    transport: &str,
1121    bind_addr: &str,
1122    explicit: bool,
1123) -> Result<Option<TcpListener>, String> {
1124    match TcpListener::bind(bind_addr) {
1125        Ok(listener) => {
1126            readiness.active(transport, bind_addr, explicit);
1127            Ok(Some(listener))
1128        }
1129        Err(err) => {
1130            let reason = format!("{transport} listener bind {bind_addr}: {err}");
1131            readiness.failed(transport, bind_addr, explicit, reason.clone());
1132            if explicit {
1133                tracing::error!(
1134                    transport,
1135                    bind = %bind_addr,
1136                    error = %err,
1137                    "fatal explicit bind failure"
1138                );
1139                Err(format!("explicit {reason}"))
1140            } else {
1141                tracing::warn!(
1142                    transport,
1143                    bind = %bind_addr,
1144                    error = %err,
1145                    "non-fatal implicit bind failure; listener degraded"
1146                );
1147                Ok(None)
1148            }
1149        }
1150    }
1151}
1152
/// Wire SIGTERM and SIGINT to `RedDBRuntime::graceful_shutdown`.
///
/// PLAN.md Phase 1.1 — orchestrators (K8s preStop, Fly autostop, ECS
/// drain, systemd, plain `docker stop`) all rely on SIGTERM with a
/// grace window. SIGKILL after that grace window is the OS's
/// fallback; we are responsible for finishing in time.
///
/// Spawns a tokio task on the caller's runtime that:
///   1. Awaits the first of SIGTERM / SIGINT (Ctrl+C on non-Unix
///      targets). On Unix, SIGHUP is awaited as well when its handler
///      could be installed; it calls `handle_sighup_reload` and the
///      task goes back to waiting.
///   2. Calls `runtime.graceful_shutdown(backup_on_shutdown)` and logs
///      the returned report (or the error).
///   3. Calls `std::process::exit(0)` — after a successful *and* after
///      a failed shutdown.
///
/// `RED_BACKUP_ON_SHUTDOWN` is the flag passed in step 2: `1`, `true`,
/// `yes` or `on` (case-insensitive) enable it, any other set value
/// disables it, and an unset variable defaults to `true`.
///
/// On Unix, if the SIGTERM or SIGINT handler cannot be installed the
/// function logs a warning and returns without spawning the task.
///
/// Idempotent across signal storms — `graceful_shutdown` returns the
/// cached report on second call, but we exit on the first one
/// regardless, so the second SIGTERM never reaches the handler.
async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
        .ok()
        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
        .unwrap_or(true);

    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
                );
                return;
            }
        };
        let mut sigint = match signal(SignalKind::interrupt()) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGINT handler");
                // NOTE(review): returning here also abandons the SIGTERM
                // stream installed above, so neither signal is handled.
                return;
            }
        };
        // PLAN.md Phase 6.4 — SIGHUP triggers a reload of secrets from
        // their `_FILE` companions without restarting the process.
        // Useful for credential rotation pipelines (kubectl create
        // secret + kubectl rollout restart, but for systemd / Nomad /
        // bare-metal where rolling the process is heavier).
        // Unlike SIGTERM/SIGINT, a failed SIGHUP install is tolerated:
        // the task still runs, just without the reload arm.
        let mut sighup = match signal(SignalKind::hangup()) {
            Ok(s) => Some(s),
            Err(err) => {
                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
                None
            }
        };

        // Separate clone for the reload path; `runtime` itself is used
        // for the shutdown call below.
        let reload_runtime = runtime.clone();
        tokio::spawn(async move {
            loop {
                // Wait for whichever signal arrives first; the SIGHUP
                // arm only exists when its stream was installed.
                let signal_name = match &mut sighup {
                    Some(hup) => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                        _ = hup.recv() => "SIGHUP",
                    },
                    None => tokio::select! {
                        _ = sigterm.recv() => "SIGTERM",
                        _ = sigint.recv() => "SIGINT",
                    },
                };

                if signal_name == "SIGHUP" {
                    handle_sighup_reload(&reload_runtime);
                    continue; // stay alive; SIGHUP isn't a shutdown
                }

                tracing::info!(
                    signal = signal_name,
                    "lifecycle signal received; shutting down"
                );
                match runtime.graceful_shutdown(backup_on_shutdown) {
                    Ok(report) => {
                        tracing::info!(
                            duration_ms = report.duration_ms,
                            flushed_wal = report.flushed_wal,
                            final_checkpoint = report.final_checkpoint,
                            backup_uploaded = report.backup_uploaded,
                            "graceful shutdown complete"
                        );
                    }
                    Err(err) => {
                        tracing::error!(error = %err, "graceful shutdown failed");
                        // Issue #205 — graceful shutdown returning Err
                        // means the runtime is exiting without a clean
                        // flush/checkpoint. Operator-grade event so the
                        // operator notices the dirty exit even when the
                        // process restarts before they read tracing logs.
                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
                            reason: format!("graceful shutdown failed: {err}"),
                        }
                        .emit_global();
                    }
                }
                // NOTE(review): exit code is 0 even when the shutdown
                // above failed — confirm that is intended.
                std::process::exit(0);
            }
        });
    }

    #[cfg(not(unix))]
    {
        tokio::spawn(async move {
            let interrupted = tokio::signal::ctrl_c().await;
            if let Err(err) = interrupted {
                tracing::warn!(error = %err, "could not install Ctrl+C handler");
                return;
            }

            tracing::info!(
                signal = "Ctrl+C",
                "lifecycle signal received; shutting down"
            );
            match runtime.graceful_shutdown(backup_on_shutdown) {
                Ok(report) => {
                    tracing::info!(
                        duration_ms = report.duration_ms,
                        flushed_wal = report.flushed_wal,
                        final_checkpoint = report.final_checkpoint,
                        backup_uploaded = report.backup_uploaded,
                        "graceful shutdown complete"
                    );
                }
                Err(err) => {
                    // NOTE(review): unlike the Unix branch, no
                    // `OperatorEvent::ShutdownForced` is emitted here.
                    tracing::error!(error = %err, "graceful shutdown failed");
                }
            }
            std::process::exit(0);
        });
    }
}
1298
1299/// PLAN.md Phase 6.4 — re-read secrets from `*_FILE` companion env
1300/// vars. Today this only refreshes the audit log + records the
1301/// reload event; the runtime modules that hold cached secret
1302/// material (S3 backend credentials, admin token, JWT keys) read
1303/// the env on each request so the next call after SIGHUP picks up
1304/// the new file contents automatically. A future extension can
1305/// punch through to the LeaseStore / AuthStore for in-memory
1306/// caches that don't re-read on each call.
1307fn handle_sighup_reload(runtime: &RedDBRuntime) {
1308    let now_ms = std::time::SystemTime::now()
1309        .duration_since(std::time::UNIX_EPOCH)
1310        .map(|d| d.as_millis() as u64)
1311        .unwrap_or(0);
1312    tracing::info!(
1313        target: "reddb::secrets",
1314        ts_unix_ms = now_ms,
1315        "SIGHUP received; secrets will be re-read from *_FILE on next access"
1316    );
1317    // Routed through AuditFieldEscaper (ADR 0010 / issue #177) so
1318    // every emission goes through the typed-field guard. The
1319    // arguments here are static, but using the typed entry point
1320    // keeps the discipline uniform across call sites.
1321    use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1322    runtime.audit_log().record_event(
1323        AuditEvent::builder("config/sighup_reload")
1324            .source(AuditAuthSource::System)
1325            .resource("secrets")
1326            .outcome(Outcome::Success)
1327            .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1328            .build(),
1329    );
1330}
1331
/// Run the single-port "routed" server: HTTP, gRPC and RedWire
/// backends each listen on an OS-assigned loopback port, and the TCP
/// protocol router bound to `router_bind_addr` is given those three
/// backend addresses.
///
/// Blocks the calling thread until `serve_tcp_router` returns.
///
/// # Errors
///
/// Returns an error when the DB options, runtime, an internal listener,
/// or the tokio runtime cannot be set up, when the HTTP backend thread
/// has already finished 100 ms after start, or when the router itself
/// fails.
#[inline(never)]
fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
    let workers = config.workers;
    let cli_telemetry = config.telemetry.clone();
    let db_options = config.to_db_options()?;
    let rt_config = detect_runtime_config();
    // An explicit worker count wins over the CPU-derived suggestion.
    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
    let (runtime, auth_store, _telemetry_guard) =
        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
    // Bound to a named local so the returned value lives until this
    // function returns.
    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);

    spawn_admin_metrics_listeners(&runtime, &auth_store);

    // Internal HTTP backend: port 0 lets the OS pick a free loopback
    // port, which is then read back for the router.
    let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
        .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
    let http_backend = http_listener
        .local_addr()
        .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
    let http_server = build_http_server(
        runtime.clone(),
        auth_store.clone(),
        http_backend.to_string(),
    );
    let http_server = apply_http_limits(http_server, &config, &runtime);
    let http_handle = http_server.serve_in_background_on(http_listener);

    // Give the HTTP backend a moment to fail fast; if its thread has
    // already finished, report why instead of starting the router.
    thread::sleep(Duration::from_millis(100));
    if http_handle.is_finished() {
        return match http_handle.join() {
            Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
            Ok(Err(err)) => Err(err.to_string()),
            Err(_) => Err("HTTP backend thread panicked".to_string()),
        };
    }

    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(worker_threads)
        .thread_stack_size(rt_config.stack_size)
        .build()
        .map_err(|err| format!("tokio runtime: {err}"))?;

    let signal_runtime = runtime.clone();
    tokio_runtime.block_on(async move {
        // Install the lifecycle signal handling first, before any
        // backend starts serving.
        spawn_lifecycle_signal_handler(signal_runtime).await;
        // Internal gRPC backend on an OS-assigned loopback port, no TLS.
        let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
            .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
        let grpc_backend = grpc_listener
            .local_addr()
            .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
        let grpc_server = RedDBGrpcServer::with_options(
            runtime.clone(),
            GrpcServerOptions {
                bind_addr: grpc_backend.to_string(),
                tls: None,
            },
            auth_store,
        );
        // Backend errors are logged from the task, not returned here.
        tokio::spawn(async move {
            if let Err(err) = grpc_server.serve_on(grpc_listener).await {
                tracing::error!(err = %err, "gRPC backend error");
            }
        });

        // Internal RedWire backend, likewise on an OS-assigned port.
        let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .map_err(|err| format!("bind internal wire listener: {err}"))?;
        let wire_backend = wire_listener
            .local_addr()
            .map_err(|err| format!("inspect internal wire listener: {err}"))?;
        // Last use of `runtime` in this function: it is moved into the
        // `Arc` handed to the wire listener.
        let wire_rt = Arc::new(runtime);
        tokio::spawn(async move {
            if let Err(err) =
                crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
                    .await
            {
                tracing::error!(err = %err, "redwire backend error");
            }
        });

        tracing::info!(
            bind = %router_bind_addr,
            cpus = rt_config.available_cpus,
            workers = worker_threads,
            "router bootstrapping"
        );
        // The router future is what keeps `block_on` — and so this
        // function — running.
        serve_tcp_router(TcpProtocolRouterConfig {
            bind_addr: router_bind_addr,
            grpc_backend,
            http_backend,
            wire_backend,
        })
        .await
        .map_err(|err| err.to_string())
    })
}
1428
1429/// Spawn RedWire listeners (plaintext + TLS) as background tokio tasks.
1430async fn spawn_wire_listeners(
1431    config: &ServerCommandConfig,
1432    runtime: &RedDBRuntime,
1433    readiness: &mut TransportReadiness,
1434) -> Result<(), String> {
1435    // Plaintext RedWire — TCP or Unix socket
1436    if let Some(wire_addr) = config.wire_bind_addr.clone() {
1437        let wire_rt = Arc::new(runtime.clone());
1438        // Address starting with `unix://` or an absolute filesystem path
1439        // switches to Unix domain sockets.
1440        #[cfg(unix)]
1441        {
1442            if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1443                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1444                tokio::spawn(async move {
1445                    if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1446                        &wire_addr, wire_rt,
1447                    )
1448                    .await
1449                    {
1450                        tracing::error!(err = %e, "redwire unix listener error");
1451                    }
1452                });
1453                return Ok(());
1454            }
1455        }
1456        match tokio::net::TcpListener::bind(&wire_addr).await {
1457            Ok(listener) => {
1458                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1459                tokio::spawn(async move {
1460                    if let Err(e) =
1461                        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1462                            .await
1463                    {
1464                        tracing::error!(err = %e, "redwire listener error");
1465                    }
1466                });
1467            }
1468            Err(err) => {
1469                let reason = format!("wire listener bind {wire_addr}: {err}");
1470                readiness.failed(
1471                    "wire",
1472                    &wire_addr,
1473                    config.wire_bind_explicit,
1474                    reason.clone(),
1475                );
1476                if config.wire_bind_explicit {
1477                    tracing::error!(
1478                        transport = "wire",
1479                        bind = %wire_addr,
1480                        error = %err,
1481                        "fatal explicit bind failure"
1482                    );
1483                    return Err(format!("explicit {reason}"));
1484                }
1485                tracing::warn!(
1486                    transport = "wire",
1487                    bind = %wire_addr,
1488                    error = %err,
1489                    "non-fatal implicit bind failure; listener degraded"
1490                );
1491            }
1492        }
1493    }
1494
1495    // RedWire over TLS
1496    if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1497        let tls_config = resolve_wire_tls_config(config);
1498        match tls_config {
1499            Ok(tls_cfg) => {
1500                let wire_rt = Arc::new(runtime.clone());
1501                tokio::spawn(async move {
1502                    if let Err(e) =
1503                        crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1504                            .await
1505                    {
1506                        tracing::error!(err = %e, "redwire+tls listener error");
1507                    }
1508                });
1509            }
1510            Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1511        }
1512    }
1513    Ok(())
1514}
1515
1516/// Spawn the PostgreSQL wire-protocol listener (Phase 3.1 PG parity).
1517///
1518/// Only runs when `--pg-bind` is supplied. Uses the v3 protocol so
1519/// psql, JDBC drivers, DBeaver, etc. can connect directly. Runs
1520/// alongside the native wire listener; the two transports do not
1521/// share a port.
1522fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1523    if let Some(pg_addr) = config.pg_bind_addr.clone() {
1524        let rt = Arc::new(runtime.clone());
1525        tokio::spawn(async move {
1526            let cfg = crate::wire::PgWireConfig {
1527                bind_addr: pg_addr,
1528                ..Default::default()
1529            };
1530            if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1531                tracing::error!(err = %e, "pg wire listener error");
1532            }
1533        });
1534    }
1535}
1536
1537/// Resolve gRPC TLS material into PEM bytes.
1538///
1539/// Lookup order, in priority:
1540///   1. Explicit `config.grpc_tls_cert` / `config.grpc_tls_key` (paths
1541///      passed via CLI/env). Both must be present together.
1542///   2. `RED_GRPC_TLS_DEV=1` — auto-generate a self-signed cert next
1543///      to the data dir. Refuses to run without the env flag so an
1544///      operator can't accidentally ship a dev cert in prod.
1545///
1546/// `client_ca` is loaded when `config.grpc_tls_client_ca` is set,
1547/// turning the listener into a mutual-TLS endpoint that requires
1548/// every client to present a chain that anchors at one of the CAs
1549/// in the bundle.
1550fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1551    use crate::utils::secret_file::expand_file_env;
1552
1553    // Best-effort *_FILE expansion for every TLS env knob. Errors here
1554    // surface as warnings; the fallback path (explicit cert paths) will
1555    // cover the common case.
1556    for var in [
1557        "REDDB_GRPC_TLS_CERT",
1558        "REDDB_GRPC_TLS_KEY",
1559        "REDDB_GRPC_TLS_CLIENT_CA",
1560    ] {
1561        if let Err(err) = expand_file_env(var) {
1562            tracing::warn!(
1563                target: "reddb::secrets",
1564                env = %var,
1565                err = %err,
1566                "could not expand *_FILE companion for gRPC TLS"
1567            );
1568        }
1569    }
1570
1571    let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1572        (Some(cert), Some(key)) => {
1573            let cert_pem = std::fs::read(cert)
1574                .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1575            let key_pem =
1576                std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1577            (cert_pem, key_pem)
1578        }
1579        _ => {
1580            // No explicit material → only proceed when dev-mode is on.
1581            let dev = std::env::var("RED_GRPC_TLS_DEV")
1582                .ok()
1583                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1584                .unwrap_or(false);
1585            if !dev {
1586                return Err("gRPC TLS configured but no cert/key supplied — set \
1587                     REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1588                     RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1589                    .to_string());
1590            }
1591            let dir = config
1592                .path
1593                .as_ref()
1594                .and_then(|p| p.parent())
1595                .map(PathBuf::from)
1596                .unwrap_or_else(|| PathBuf::from("."));
1597            let (cert_pem_str, key_pem_str) =
1598                crate::wire::tls::generate_self_signed_cert("localhost")
1599                    .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1600
1601            // Constant-time-friendly fingerprint to stderr so the
1602            // operator can pin a client trust store. We log via
1603            // `tracing::warn!` so it stands out next to ordinary
1604            // listener-online events.
1605            let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1606            tracing::warn!(
1607                target: "reddb::security",
1608                transport = "grpc",
1609                cert_sha256 = %fp,
1610                "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1611                 DO NOT use in production"
1612            );
1613            // Persist so that restarts reuse the same identity.
1614            let cert_path = dir.join("grpc-tls-cert.pem");
1615            let key_path = dir.join("grpc-tls-key.pem");
1616            if !cert_path.exists() || !key_path.exists() {
1617                let _ = std::fs::create_dir_all(&dir);
1618                std::fs::write(&cert_path, cert_pem_str.as_bytes())
1619                    .map_err(|e| format!("write grpc dev cert: {e}"))?;
1620                std::fs::write(&key_path, key_pem_str.as_bytes())
1621                    .map_err(|e| format!("write grpc dev key: {e}"))?;
1622                #[cfg(unix)]
1623                {
1624                    use std::os::unix::fs::PermissionsExt;
1625                    let _ =
1626                        std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1627                }
1628            }
1629            (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1630        }
1631    };
1632
1633    let client_ca_pem = match &config.grpc_tls_client_ca {
1634        Some(path) => Some(
1635            std::fs::read(path)
1636                .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1637        ),
1638        None => None,
1639    };
1640
1641    Ok(crate::GrpcTlsOptions {
1642        cert_pem,
1643        key_pem,
1644        client_ca_pem,
1645    })
1646}
1647
1648/// Spawn a TLS-terminated gRPC listener when `grpc_tls_bind_addr` is
1649/// configured. Logs and continues on TLS-config errors so the plain
1650/// listener stays up; this matches the wire-listener pattern.
1651fn spawn_grpc_tls_listener_if_configured(
1652    config: &ServerCommandConfig,
1653    runtime: RedDBRuntime,
1654    auth_store: Arc<AuthStore>,
1655) {
1656    let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1657        return;
1658    };
1659    let tls_opts = match resolve_grpc_tls_options(config) {
1660        Ok(opts) => opts,
1661        Err(err) => {
1662            tracing::error!(
1663                target: "reddb::security",
1664                transport = "grpc",
1665                err = %err,
1666                "gRPC TLS config error; TLS listener will not start"
1667            );
1668            return;
1669        }
1670    };
1671    tokio::spawn(async move {
1672        let server = RedDBGrpcServer::with_options(
1673            runtime,
1674            GrpcServerOptions {
1675                bind_addr: tls_bind.clone(),
1676                tls: Some(tls_opts),
1677            },
1678            auth_store,
1679        );
1680        tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1681        if let Err(err) = server.serve().await {
1682            tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1683        }
1684    });
1685}
1686
1687/// Hex-encoded SHA-256 of a PEM blob, used for cert-pin operator log
1688/// lines. Constant-time hash; no token contents leave this fn.
1689fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1690    use sha2::{Digest, Sha256};
1691    let mut h = Sha256::new();
1692    h.update(pem);
1693    let d = h.finalize();
1694    let mut buf = String::with_capacity(64);
1695    for b in d.iter() {
1696        buf.push_str(&format!("{b:02x}"));
1697    }
1698    buf
1699}
1700
1701/// Resolve TLS config: use provided cert/key or auto-generate.
1702fn resolve_wire_tls_config(
1703    config: &ServerCommandConfig,
1704) -> Result<crate::wire::WireTlsConfig, String> {
1705    match (&config.wire_tls_cert, &config.wire_tls_key) {
1706        (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1707            cert_path: cert.clone(),
1708            key_path: key.clone(),
1709        }),
1710        _ => {
1711            // Auto-generate self-signed cert
1712            let dir = config
1713                .path
1714                .as_ref()
1715                .and_then(|p| p.parent())
1716                .map(PathBuf::from)
1717                .unwrap_or_else(|| PathBuf::from("."));
1718            crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1719        }
1720    }
1721}
1722
1723#[inline(never)]
1724fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1725    let rt_config = detect_runtime_config();
1726    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1727    let cli_telemetry = config.telemetry.clone();
1728    let db_options = config.to_db_options()?;
1729    let mut transport_readiness = TransportReadiness::default();
1730
1731    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1732        .enable_all()
1733        .worker_threads(workers)
1734        .thread_stack_size(rt_config.stack_size)
1735        .build()
1736        .map_err(|err| format!("tokio runtime: {err}"))?;
1737
1738    // Guard lives on the outer thread's stack so it outlives the
1739    // tokio runtime — dropping it only after the listener returns
1740    // flushes the file log writer.
1741    let (runtime, _auth_store, _telemetry_guard) =
1742        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1743    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1744    let signal_runtime = runtime.clone();
1745    tokio_runtime.block_on(async move {
1746        spawn_lifecycle_signal_handler(signal_runtime).await;
1747        spawn_pg_listener(&config, &runtime);
1748        let wire_rt = Arc::new(runtime);
1749        let listener = tokio::net::TcpListener::bind(&wire_addr)
1750            .await
1751            .map_err(|err| {
1752                let reason = format!("wire listener bind {wire_addr}: {err}");
1753                transport_readiness.failed(
1754                    "wire",
1755                    &wire_addr,
1756                    config.wire_bind_explicit,
1757                    reason.clone(),
1758                );
1759                if config.wire_bind_explicit {
1760                    format!("explicit {reason}")
1761                } else {
1762                    reason
1763                }
1764            })?;
1765        transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1766        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1767            .await
1768            .map_err(|e| e.to_string())
1769    })
1770}
1771
1772#[inline(never)]
1773fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1774    let rt_config = detect_runtime_config();
1775    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1776    let cli_telemetry = config.telemetry.clone();
1777    let db_options = config.to_db_options()?;
1778
1779    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1780        .enable_all()
1781        .worker_threads(workers)
1782        .thread_stack_size(rt_config.stack_size)
1783        .build()
1784        .map_err(|err| format!("tokio runtime: {err}"))?;
1785
1786    let (runtime, _auth_store, _telemetry_guard) =
1787        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1788    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1789    let signal_runtime = runtime.clone();
1790    tokio_runtime.block_on(async move {
1791        spawn_lifecycle_signal_handler(signal_runtime).await;
1792        let cfg = crate::wire::PgWireConfig {
1793            bind_addr: pg_addr,
1794            ..Default::default()
1795        };
1796        crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1797            .await
1798            .map_err(|e| e.to_string())
1799    })
1800}
1801
1802#[inline(never)]
1803fn build_runtime_and_auth_store(
1804    db_options: RedDBOptions,
1805    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1806) -> Result<
1807    (
1808        RedDBRuntime,
1809        Arc<AuthStore>,
1810        Option<crate::telemetry::TelemetryGuard>,
1811    ),
1812    String,
1813> {
1814    // Return the TelemetryGuard so server runners can bind it for
1815    // their full lifetime. Dropping the guard tears down the
1816    // non-blocking log writer thread and, because that writer is
1817    // built with `.lossy(true)`, any subsequent log event routed to
1818    // the file sink is silently dropped — so callers MUST keep the
1819    // returned `Option<TelemetryGuard>` alive until shutdown.
1820    build_runtime_with_telemetry(db_options, cli_telemetry)
1821}
1822
1823/// Open the runtime, initialise structured logging from merged CLI +
1824/// `red_config` settings, and return a guard the caller must keep
1825/// alive for the server lifetime (drop flushes pending log writes).
1826///
1827/// Merge priority: CLI flag (explicit `Some`) beats `red.logging.*`
1828/// in red_config, beats the built-in default. A CLI-flag value of
1829/// `None` / empty means "inherit from config or default" — never
1830/// "disable". The one exception is `--no-log-file` which forces
1831/// `log_dir = None` regardless of config.
1832pub(crate) fn build_runtime_with_telemetry(
1833    db_options: RedDBOptions,
1834    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1835) -> Result<
1836    (
1837        RedDBRuntime,
1838        Arc<AuthStore>,
1839        Option<crate::telemetry::TelemetryGuard>,
1840    ),
1841    String,
1842> {
1843    let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1844        // Issue #205 — runtime construction failure is the canonical
1845        // StartupFailed phase. The audit sink isn't installed yet
1846        // (it would have been registered inside `with_options`), so
1847        // the emit falls through to tracing+eprintln only — operator
1848        // still sees it on stderr.
1849        let msg = err.to_string();
1850        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1851            phase: "runtime_construction".to_string(),
1852            error: msg.clone(),
1853        }
1854        .emit_global();
1855        msg
1856    })?;
1857
1858    // PLAN.md Phase 5 / W6 — opt into serverless writer-lease fencing
1859    // when `RED_LEASE_REQUIRED=true`. Failure here aborts boot: the
1860    // operator asked for a fence; running unfenced would silently
1861    // expose split-brain risk.
1862    crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1863        let msg = err.to_string();
1864        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1865            phase: "lease_loop".to_string(),
1866            error: msg.clone(),
1867        }
1868        .emit_global();
1869        msg
1870    })?;
1871
1872    // #213 — edge-triggered disk-space watchdog. Watches the data
1873    // directory; falls back to polling when fanotify is unavailable
1874    // (non-Linux or unprivileged container).
1875    if let Some(data_path) = db_options.data_path.as_deref() {
1876        let watch_dir = data_path.parent().unwrap_or(data_path);
1877        crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1878    }
1879
1880    // #214 — inotify config hot-reload watcher. Watches the config file
1881    // (REDDB_CONFIG_FILE or /etc/reddb/config.json) for changes and
1882    // applies hot-reloadable keys without restart.
1883    {
1884        let config_path = crate::runtime::config_overlay::config_file_path();
1885        let store = runtime.db().store();
1886        crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1887    }
1888
1889    // Phase 6 logging: merge red_config overrides onto the CLI-built
1890    // telemetry config, then install the global subscriber.
1891    let merged = merge_telemetry_with_config(
1892        cli_telemetry
1893            .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1894        &runtime,
1895    );
1896    let telemetry_guard = crate::telemetry::init(merged);
1897
1898    let auth_store =
1899        if db_options.auth.vault_enabled {
1900            let pager =
1901                runtime.db().store().pager().cloned().ok_or_else(|| {
1902                    "vault requires a paged database (persistent mode)".to_string()
1903                })?;
1904            let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1905                .map_err(|err| err.to_string())?;
1906            Arc::new(store)
1907        } else {
1908            Arc::new(AuthStore::new(db_options.auth.clone()))
1909        };
1910    auth_store.bootstrap_from_env();
1911
1912    // Background session purge (every 5 minutes)
1913    {
1914        let store = Arc::clone(&auth_store);
1915        std::thread::Builder::new()
1916            .name("reddb-session-purge".into())
1917            .spawn(move || loop {
1918                std::thread::sleep(std::time::Duration::from_secs(300));
1919                store.purge_expired_sessions();
1920            })
1921            .ok();
1922    }
1923
1924    Ok((runtime, auth_store, telemetry_guard))
1925}
1926
1927/// Read `red.logging.*` keys from the persistent config store and
1928/// merge them into the CLI-built `TelemetryConfig`. Merge priority:
1929/// explicit CLI flag > red_config > built-in default.
1930///
1931/// The "was a flag passed" signal comes from the `*_explicit` bools
1932/// on `TelemetryConfig`, populated by the CLI parser. This replaces
1933/// an earlier equality-to-default heuristic that silently dropped
1934/// config whenever the CLI-derived default diverged from
1935/// `TelemetryConfig::default()` (e.g. path-derived `log_dir`,
1936/// non-TTY `format`) and that silently overrode `--no-log-file`.
1937fn merge_telemetry_with_config(
1938    mut cli: crate::telemetry::TelemetryConfig,
1939    runtime: &RedDBRuntime,
1940) -> crate::telemetry::TelemetryConfig {
1941    use crate::storage::schema::Value;
1942
1943    let store = runtime.db().store();
1944
1945    if !cli.level_explicit {
1946        if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
1947            cli.level_filter = v.to_string();
1948        }
1949    }
1950    if !cli.format_explicit {
1951        if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
1952            if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
1953                cli.format = parsed;
1954            }
1955        }
1956    }
1957    if !cli.rotation_keep_days_explicit {
1958        match store.get_config("red.logging.keep_days") {
1959            Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
1960                cli.rotation_keep_days = n as u16
1961            }
1962            Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
1963                cli.rotation_keep_days = n as u16
1964            }
1965            Some(Value::Text(v)) => {
1966                if let Ok(n) = v.parse::<u16>() {
1967                    cli.rotation_keep_days = n;
1968                }
1969            }
1970            _ => {}
1971        }
1972    }
1973    if !cli.file_prefix_explicit {
1974        if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
1975            if !v.is_empty() {
1976                cli.file_prefix = v.to_string();
1977            }
1978        }
1979    }
1980    // --no-log-file is a kill-switch: config cannot resurrect the
1981    // file sink. Explicit --log-dir also wins.
1982    if !cli.log_dir_explicit && !cli.log_file_disabled {
1983        if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
1984            if !v.is_empty() {
1985                cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
1986            }
1987        }
1988    }
1989
1990    cli
1991}
1992
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};

    /// In-memory runtime whose config store already holds the string
    /// entry `key = value`.
    fn runtime_with_config(key: &str, value: &str) -> RedDBRuntime {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        let json_value = crate::serde_json::Value::String(value.to_string());
        runtime.db().store().set_config_tree(key, &json_value);
        runtime
    }

    /// CLI-side config as `default_telemetry_for_path(Some(path))` would
    /// produce on a non-TTY host: a path-derived `log_dir` and JSON
    /// format, with nothing marked explicit.
    fn cli_base() -> TelemetryConfig {
        let mut base = TelemetryConfig::default();
        base.log_dir = Some(std::path::PathBuf::from("/tmp/reddb-default/logs"));
        base.format = LogFormat::Json;
        base
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let runtime = runtime_with_config("red.logging.dir", "/var/log/reddb");

        let merged = merge_telemetry_with_config(cli_base(), &runtime);

        assert_eq!(
            merged.log_dir,
            Some(std::path::PathBuf::from("/var/log/reddb"))
        );
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let runtime = runtime_with_config("red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/custom/dir")),
            log_dir_explicit: true,
            ..cli_base()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert_eq!(
            merged.log_dir,
            Some(std::path::PathBuf::from("/custom/dir"))
        );
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let runtime = runtime_with_config("red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: None,
            log_file_disabled: true,
            ..cli_base()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert!(
            merged.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        // `cli_base()` carries format=Json (the non-TTY default) while
        // `TelemetryConfig::default()` is Pretty. The old
        // equality-to-default check silently dropped config in exactly
        // this situation.
        let runtime = runtime_with_config("red.logging.format", "pretty");

        let merged = merge_telemetry_with_config(cli_base(), &runtime);

        assert_eq!(merged.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let runtime = runtime_with_config("red.logging.format", "pretty");
        let cli = TelemetryConfig {
            format: LogFormat::Json,
            format_explicit: true,
            ..cli_base()
        };

        let merged = merge_telemetry_with_config(cli, &runtime);

        assert_eq!(merged.format, LogFormat::Json);
    }
}
2080
2081#[inline(never)]
2082fn build_http_server(
2083    runtime: RedDBRuntime,
2084    auth_store: Arc<AuthStore>,
2085    bind_addr: String,
2086) -> RedDBServer {
2087    build_http_server_with_transport_readiness(
2088        runtime,
2089        auth_store,
2090        bind_addr,
2091        TransportReadiness::default(),
2092    )
2093}
2094
2095/// Apply the resolved HTTP limits to a freshly-built `RedDBServer`.
2096///
2097/// Centralised here so every `run_*` path goes through the same
2098/// resolver and the structured startup log line carries the same
2099/// `http_limits.*` fields regardless of transport combination.
2100fn apply_http_limits(
2101    server: RedDBServer,
2102    config: &ServerCommandConfig,
2103    runtime: &RedDBRuntime,
2104) -> RedDBServer {
2105    let store = runtime.db().store();
2106    let resolved =
2107        crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2108            .get_config(key)
2109        {
2110            Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2111            Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2112            Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2113            _ => None,
2114        });
2115    tracing::info!(
2116        target: "reddb::http_limits",
2117        max_handlers = resolved.max_handlers,
2118        handler_timeout_ms = resolved.handler_timeout_ms,
2119        retry_after_secs = resolved.retry_after_secs,
2120        "http_limits resolved"
2121    );
2122    server.with_http_limits(resolved)
2123}
2124
2125#[inline(never)]
2126fn build_http_server_with_transport_readiness(
2127    runtime: RedDBRuntime,
2128    auth_store: Arc<AuthStore>,
2129    bind_addr: String,
2130    transport_readiness: TransportReadiness,
2131) -> RedDBServer {
2132    RedDBServer::with_options(
2133        runtime,
2134        ServerOptions {
2135            bind_addr,
2136            transport_readiness,
2137            ..ServerOptions::default()
2138        },
2139    )
2140    .with_auth(auth_store)
2141}
2142
2143/// PLAN.md Phase 6.2 — build a listener that only serves
2144/// `/admin/*` + `/metrics` + `/health/*`. Defaults to `127.0.0.1`
2145/// when the env var has no host (loopback-only by default per spec).
2146#[inline(never)]
2147fn build_admin_only_server(
2148    runtime: RedDBRuntime,
2149    auth_store: Arc<AuthStore>,
2150    bind_addr: String,
2151) -> RedDBServer {
2152    RedDBServer::with_options(
2153        runtime,
2154        ServerOptions {
2155            bind_addr,
2156            surface: crate::server::ServerSurface::AdminOnly,
2157            ..ServerOptions::default()
2158        },
2159    )
2160    .with_auth(auth_store)
2161}
2162
2163/// PLAN.md Phase 6.2 — build a listener that only serves `/metrics`
2164/// + `/health/*`. Suitable for Prometheus scrape ports that may be
2165///   exposed wider than the admin port.
2166#[inline(never)]
2167fn build_metrics_only_server(
2168    runtime: RedDBRuntime,
2169    auth_store: Arc<AuthStore>,
2170    bind_addr: String,
2171) -> RedDBServer {
2172    RedDBServer::with_options(
2173        runtime,
2174        ServerOptions {
2175            bind_addr,
2176            surface: crate::server::ServerSurface::MetricsOnly,
2177            ..ServerOptions::default()
2178        },
2179    )
2180    .with_auth(auth_store)
2181}
2182
2183/// Spawn dedicated admin / metrics listeners when the operator set
2184/// `RED_ADMIN_BIND` / `RED_METRICS_BIND`. Both are optional; when
2185/// unset the existing listener keeps serving everything (back-compat).
2186fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2187    if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2188        let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2189        let _ = server.serve_in_background();
2190        tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2191    }
2192    if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2193        let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2194        let _ = server.serve_in_background();
2195        tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2196    }
2197}
2198
2199#[inline(never)]
2200fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2201    let cli_telemetry = config.telemetry.clone();
2202    let mut transport_readiness = TransportReadiness::default();
2203    let Some(listener) = bind_listener_for_startup(
2204        &mut transport_readiness,
2205        "http",
2206        &bind_addr,
2207        config.http_bind_explicit,
2208    )?
2209    else {
2210        return Err(format!(
2211            "no HTTP listener started; implicit bind {} failed",
2212            bind_addr
2213        ));
2214    };
2215    let db_options = config.to_db_options()?;
2216    let (runtime, auth_store, _telemetry_guard) =
2217        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2218    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2219    spawn_admin_metrics_listeners(&runtime, &auth_store);
2220    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2221    let server = build_http_server_with_transport_readiness(
2222        runtime.clone(),
2223        auth_store,
2224        bind_addr.clone(),
2225        transport_readiness,
2226    );
2227    let server = apply_http_limits(server, &config, &runtime);
2228    tracing::info!(transport = "http", bind = %bind_addr, "listener online");
2229    server.serve_on(listener).map_err(|err| err.to_string())
2230}
2231
2232/// PLAN.md HTTP TLS — when `http_tls_bind_addr` is set, spawn a
2233/// rustls-terminated listener alongside the plain HTTP server. Cert
2234/// + key paths come from CLI flags or `REDDB_HTTP_TLS_*` env vars; if
2235///   both are absent and `RED_HTTP_TLS_DEV=1` is set, a self-signed cert
2236///   is auto-generated next to the data directory (refused otherwise).
2237fn spawn_http_tls_listener(
2238    config: &ServerCommandConfig,
2239    runtime: &RedDBRuntime,
2240    auth_store: &Arc<AuthStore>,
2241) -> Result<(), String> {
2242    let Some(addr) = config.http_tls_bind_addr.clone() else {
2243        return Ok(());
2244    };
2245
2246    let tls_config = resolve_http_tls_config(config)?;
2247    let server_config = crate::server::tls::build_server_config(&tls_config)
2248        .map_err(|err| format!("HTTP TLS: {err}"))?;
2249
2250    let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
2251    let server = apply_http_limits(server, config, runtime);
2252    let _handle = server.serve_tls_in_background(server_config);
2253    tracing::info!(
2254        transport = "https",
2255        bind = %addr,
2256        mtls = %tls_config.client_ca_path.is_some(),
2257        "TLS listener online"
2258    );
2259    Ok(())
2260}
2261
2262/// Resolve the HTTP TLS config from CLI / env / dev defaults.
2263fn resolve_http_tls_config(
2264    config: &ServerCommandConfig,
2265) -> Result<crate::server::tls::HttpTlsConfig, String> {
2266    match (&config.http_tls_cert, &config.http_tls_key) {
2267        (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
2268            cert_path: cert.clone(),
2269            key_path: key.clone(),
2270            client_ca_path: config.http_tls_client_ca.clone(),
2271        }),
2272        (None, None) => {
2273            // Dev-mode auto-generate next to the data directory.
2274            let dir = config
2275                .path
2276                .as_ref()
2277                .and_then(|p| p.parent().map(std::path::PathBuf::from))
2278                .unwrap_or_else(|| std::path::PathBuf::from("."));
2279            let auto = crate::server::tls::auto_generate_dev_cert(&dir)
2280                .map_err(|err| format!("HTTP TLS dev: {err}"))?;
2281            Ok(crate::server::tls::HttpTlsConfig {
2282                cert_path: auto.cert_path,
2283                key_path: auto.key_path,
2284                client_ca_path: config.http_tls_client_ca.clone(),
2285            })
2286        }
2287        _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
2288    }
2289}
2290
2291#[inline(never)]
2292fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2293    let workers = config.workers;
2294    let cli_telemetry = config.telemetry.clone();
2295    let db_options = config.to_db_options()?;
2296    let rt_config = detect_runtime_config();
2297    let mut transport_readiness = TransportReadiness::default();
2298    let Some(grpc_listener) = bind_listener_for_startup(
2299        &mut transport_readiness,
2300        "grpc",
2301        &bind_addr,
2302        config.grpc_bind_explicit,
2303    )?
2304    else {
2305        return Err(format!(
2306            "no gRPC listener started; implicit bind {} failed",
2307            bind_addr
2308        ));
2309    };
2310
2311    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2312
2313    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2314        .enable_all()
2315        .worker_threads(worker_threads)
2316        .thread_stack_size(rt_config.stack_size)
2317        .build()
2318        .map_err(|err| format!("tokio runtime: {err}"))?;
2319
2320    // Guard lives on the outer stack so it outlives the tokio runtime.
2321    let (runtime, auth_store, _telemetry_guard) =
2322        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2323    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2324    let signal_runtime = runtime.clone();
2325    tokio_runtime.block_on(async move {
2326        spawn_lifecycle_signal_handler(signal_runtime).await;
2327        // Start wire protocol listeners (plaintext + TLS)
2328        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2329
2330        // Start PostgreSQL wire listener when --pg-bind is configured.
2331        spawn_pg_listener(&config, &runtime);
2332
2333        // Optional TLS gRPC listener. When `grpc_tls_bind_addr` is set
2334        // it spawns a separate listener so plaintext + TLS can run
2335        // side-by-side (50051 plain + 50052 TLS, etc.).
2336        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2337
2338        let server = RedDBGrpcServer::with_options(
2339            runtime,
2340            GrpcServerOptions {
2341                bind_addr: bind_addr.clone(),
2342                tls: None,
2343            },
2344            auth_store,
2345        );
2346
2347        tracing::info!(
2348            transport = "grpc",
2349            bind = %bind_addr,
2350            cpus = rt_config.available_cpus,
2351            workers = worker_threads,
2352            "listener online"
2353        );
2354        server
2355            .serve_on(grpc_listener)
2356            .await
2357            .map_err(|err| err.to_string())
2358    })
2359}
2360
2361#[inline(never)]
2362fn run_dual_server(
2363    config: ServerCommandConfig,
2364    grpc_bind_addr: String,
2365    http_bind_addr: String,
2366) -> Result<(), String> {
2367    let workers = config.workers;
2368    let cli_telemetry = config.telemetry.clone();
2369    let db_options = config.to_db_options()?;
2370    let rt_config = detect_runtime_config();
2371    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2372    let mut transport_readiness = TransportReadiness::default();
2373    let http_listener = bind_listener_for_startup(
2374        &mut transport_readiness,
2375        "http",
2376        &http_bind_addr,
2377        config.http_bind_explicit,
2378    )?;
2379    let grpc_listener = bind_listener_for_startup(
2380        &mut transport_readiness,
2381        "grpc",
2382        &grpc_bind_addr,
2383        config.grpc_bind_explicit,
2384    )?;
2385    if http_listener.is_none() && grpc_listener.is_none() {
2386        return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2387    }
2388    let (runtime, auth_store, _telemetry_guard) =
2389        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2390    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2391
2392    spawn_admin_metrics_listeners(&runtime, &auth_store);
2393    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2394
2395    let http_handle = if let Some(listener) = http_listener {
2396        let http_server = build_http_server_with_transport_readiness(
2397            runtime.clone(),
2398            auth_store.clone(),
2399            http_bind_addr.clone(),
2400            transport_readiness.clone(),
2401        );
2402        let http_server = apply_http_limits(http_server, &config, &runtime);
2403        Some(http_server.serve_in_background_on(listener))
2404    } else {
2405        None
2406    };
2407
2408    thread::sleep(Duration::from_millis(150));
2409    if let Some(handle) = http_handle.as_ref() {
2410        if handle.is_finished() {
2411            let handle = http_handle.unwrap();
2412            return match handle.join() {
2413                Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2414                Ok(Err(err)) => Err(err.to_string()),
2415                Err(_) => Err("HTTP server thread panicked".to_string()),
2416            };
2417        }
2418    }
2419    if grpc_listener.is_none() {
2420        let Some(handle) = http_handle else {
2421            return Err("no listener started".to_string());
2422        };
2423        return match handle.join() {
2424            Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2425            Ok(Err(err)) => Err(err.to_string()),
2426            Err(_) => Err("HTTP server thread panicked".to_string()),
2427        };
2428    }
2429    let grpc_listener = grpc_listener.expect("checked above");
2430
2431    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2432        .enable_all()
2433        .worker_threads(worker_threads)
2434        .thread_stack_size(rt_config.stack_size)
2435        .build()
2436        .map_err(|err| format!("tokio runtime: {err}"))?;
2437
2438    let signal_runtime = runtime.clone();
2439    tokio_runtime.block_on(async move {
2440        spawn_lifecycle_signal_handler(signal_runtime).await;
2441        // Start wire protocol listeners (plaintext + TLS)
2442        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2443
2444        // Start PostgreSQL wire listener when --pg-bind is configured.
2445        spawn_pg_listener(&config, &runtime);
2446
2447        // Optional TLS gRPC listener — runs alongside the plaintext one.
2448        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2449
2450        let server = RedDBGrpcServer::with_options(
2451            runtime,
2452            GrpcServerOptions {
2453                bind_addr: grpc_bind_addr.clone(),
2454                tls: None,
2455            },
2456            auth_store,
2457        );
2458
2459        tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2460        tracing::info!(
2461            transport = "grpc",
2462            bind = %grpc_bind_addr,
2463            cpus = rt_config.available_cpus,
2464            workers = worker_threads,
2465            "listener online"
2466        );
2467        server
2468            .serve_on(grpc_listener)
2469            .await
2470            .map_err(|err| err.to_string())
2471    })
2472}
2473
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `SystemdServiceConfig` with the binary path, user and group
    /// shared by every systemd test; only the varying fields are parameters.
    fn systemd_config(
        service_name: &str,
        data_path: &str,
        router_bind_addr: Option<&str>,
        grpc_bind_addr: Option<&str>,
        http_bind_addr: Option<&str>,
    ) -> SystemdServiceConfig {
        SystemdServiceConfig {
            service_name: String::from(service_name),
            binary_path: PathBuf::from("/usr/local/bin/red"),
            run_user: String::from("reddb"),
            run_group: String::from("reddb"),
            data_path: PathBuf::from(data_path),
            router_bind_addr: router_bind_addr.map(String::from),
            grpc_bind_addr: grpc_bind_addr.map(String::from),
            http_bind_addr: http_bind_addr.map(String::from),
        }
    }

    /// Binds an ephemeral loopback port and returns the listener (which must
    /// stay alive to keep the port occupied) together with its address.
    fn occupied_addr() -> (TcpListener, String) {
        let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
        let addr = held.local_addr().expect("held addr").to_string();
        (held, addr)
    }

    #[test]
    fn render_systemd_unit_contains_expected_execstart() {
        let config = systemd_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            None,
        );

        let rendered = render_systemd_unit(&config);

        assert!(rendered.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
        assert!(rendered.contains("ReadWritePaths=/var/lib/reddb"));
    }

    #[test]
    fn systemd_service_config_derives_paths() {
        let config = systemd_config(
            "reddb-api",
            "/srv/reddb/live/data.rdb",
            None,
            None,
            Some("127.0.0.1:5055"),
        );

        assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
        assert_eq!(
            config.unit_path(),
            PathBuf::from("/etc/systemd/system/reddb-api.service")
        );
    }

    #[test]
    fn render_systemd_unit_supports_dual_transport() {
        let config = systemd_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            Some("0.0.0.0:5055"),
        );

        let rendered = render_systemd_unit(&config);

        assert!(rendered.contains("--grpc-bind 0.0.0.0:5555"));
        assert!(rendered.contains("--http-bind 0.0.0.0:5055"));
    }

    #[test]
    fn render_systemd_unit_supports_router_mode() {
        let config = systemd_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            Some(DEFAULT_ROUTER_BIND_ADDR),
            None,
            None,
        );

        let rendered = render_systemd_unit(&config);

        assert!(rendered.contains("--bind 127.0.0.1:5050"));
        assert!(!rendered.contains("--grpc-bind"));
        assert!(!rendered.contains("--http-bind"));
    }

    #[test]
    fn explicit_bind_collision_is_fatal() {
        // `_held` keeps the port occupied for the duration of the test.
        let (_held, addr) = occupied_addr();
        let mut readiness = TransportReadiness::default();

        let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();

        assert!(error.contains("explicit http listener bind"));
        assert_eq!(readiness.active.len(), 0);
        assert_eq!(readiness.failed.len(), 1);
        let failure = &readiness.failed[0];
        assert!(failure.explicit);
        assert_eq!(failure.bind_addr, addr);
    }

    #[test]
    fn implicit_bind_collision_degrades() {
        // `_held` keeps the port occupied for the duration of the test.
        let (_held, addr) = occupied_addr();
        let mut readiness = TransportReadiness::default();

        let listener =
            bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");

        assert!(listener.is_none());
        assert_eq!(readiness.active.len(), 0);
        assert_eq!(readiness.failed.len(), 1);
        let failure = &readiness.failed[0];
        assert!(!failure.explicit);
        assert_eq!(failure.bind_addr, addr);
    }
}