Skip to main content

reddb_server/
service_cli.rs

1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13    GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Default bind address for the protocol router listener (loopback, port 5050).
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
18/// Detect available CPU cores and suggest worker thread count.
19pub fn detect_runtime_config() -> RuntimeConfig {
20    let cpus = thread::available_parallelism()
21        .map(|n| n.get())
22        .unwrap_or(1);
23
24    // Reserve 1 core for OS, use the rest for workers (minimum 1)
25    let suggested_workers = cpus.saturating_sub(1).max(1);
26
27    RuntimeConfig {
28        available_cpus: cpus,
29        suggested_workers,
30        stack_size: 8 * 1024 * 1024, // 16MB default
31    }
32}
33
/// Result of [`detect_runtime_config`]: what the host offers and what the
/// server should use by default.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Number of CPUs reported by the standard library (1 when unknown).
    pub available_cpus: usize,
    /// Recommended worker thread count — all CPUs but one, never below 1.
    pub suggested_workers: usize,
    /// Thread stack size in bytes.
    pub stack_size: usize,
}
40
/// Network transports the server can expose.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    Grpc,
    Http,
    Wire,
}

impl ServerTransport {
    /// Human-readable transport label.
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Wire => "wire",
            ServerTransport::Http => "HTTP",
            ServerTransport::Grpc => "gRPC",
        }
    }

    /// Loopback address this transport binds to by default.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Wire => "127.0.0.1:5050",
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Grpc => "127.0.0.1:5555",
        }
    }
}
65
66#[derive(Debug, Clone)]
67pub struct ServerCommandConfig {
68    pub path: Option<PathBuf>,
69    pub router_bind_addr: Option<String>,
70    pub router_bind_explicit: bool,
71    pub grpc_bind_addr: Option<String>,
72    pub grpc_bind_explicit: bool,
73    /// TLS-encrypted gRPC bind address. Can run side-by-side with
74    /// `grpc_bind_addr` (e.g. `:50051` plain + `:50052` TLS) or
75    /// stand alone for TLS-only deploys. Defaults to `None`.
76    pub grpc_tls_bind_addr: Option<String>,
77    /// Path to PEM-encoded gRPC server certificate. Resolved through
78    /// `REDDB_GRPC_TLS_CERT` (with `_FILE` companion for k8s secret
79    /// mounts). When `None` and dev-mode is enabled
80    /// (`RED_GRPC_TLS_DEV=1`) the server auto-generates a self-signed
81    /// cert and prints its SHA-256 fingerprint to stderr.
82    pub grpc_tls_cert: Option<PathBuf>,
83    /// Path to PEM-encoded gRPC server private key. Same env-var
84    /// pattern as `grpc_tls_cert`.
85    pub grpc_tls_key: Option<PathBuf>,
86    /// Optional path to a PEM bundle of trust anchors used to verify
87    /// client certificates. When set, the gRPC listener requires
88    /// every client to present a cert that chains to this CA — i.e.
89    /// mutual TLS. When unset, one-way TLS only.
90    pub grpc_tls_client_ca: Option<PathBuf>,
91    pub http_bind_addr: Option<String>,
92    pub http_bind_explicit: bool,
93    /// HTTPS bind address. When set, the HTTP server also serves a
94    /// TLS-terminated listener on this addr. Plain HTTP and HTTPS can
95    /// run side by side (e.g. 8080 plain + 8443 TLS).
96    pub http_tls_bind_addr: Option<String>,
97    /// PEM cert for HTTPS. Reads `REDDB_HTTP_TLS_CERT` / its `_FILE`
98    /// companion when not set explicitly.
99    pub http_tls_cert: Option<PathBuf>,
100    /// PEM key for HTTPS. Reads `REDDB_HTTP_TLS_KEY` / its `_FILE`
101    /// companion when not set explicitly.
102    pub http_tls_key: Option<PathBuf>,
103    /// Optional PEM CA bundle. When set, the HTTPS listener requires
104    /// every client to present a cert that chains to a CA in this
105    /// bundle (mTLS). When unset, plain server-side TLS only.
106    pub http_tls_client_ca: Option<PathBuf>,
107    pub wire_bind_addr: Option<String>,
108    pub wire_bind_explicit: bool,
109    /// TLS-encrypted wire protocol bind address
110    pub wire_tls_bind_addr: Option<String>,
111    /// Path to TLS cert PEM (if None + wire_tls_bind, auto-generate)
112    pub wire_tls_cert: Option<PathBuf>,
113    /// Path to TLS key PEM
114    pub wire_tls_key: Option<PathBuf>,
115    /// PostgreSQL wire protocol bind address (Phase 3.1 PG parity).
116    /// When set the server accepts psql / JDBC / DBeaver clients on
117    /// this port via the v3 protocol. Defaults to None (disabled).
118    pub pg_bind_addr: Option<String>,
119    pub create_if_missing: bool,
120    pub read_only: bool,
121    pub role: String,
122    pub primary_addr: Option<String>,
123    pub vault: bool,
124    /// Override worker thread count (None = auto-detect from CPUs)
125    pub workers: Option<usize>,
126    /// Telemetry config (Phase 6 logging). `None` falls back to the
127    /// built-in default derived from `path` + stderr-only.
128    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
129}
130
/// A listener that bound successfully during startup.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerState {
    /// Transport label the listener was registered under.
    pub transport: String,
    /// Address the listener was asked to bind.
    pub bind_addr: String,
    /// Whether the bind was flagged as explicit by the caller.
    pub explicit: bool,
}
137
/// A listener that failed to bind during startup, with the reason recorded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerFailure {
    /// Transport label the listener was registered under.
    pub transport: String,
    /// Address the listener was asked to bind.
    pub bind_addr: String,
    /// Whether the bind was flagged as explicit by the caller.
    pub explicit: bool,
    /// Human-readable description of the failure.
    pub reason: String,
}
145
146#[derive(Debug, Clone, Default, PartialEq, Eq)]
147pub struct TransportReadiness {
148    pub active: Vec<TransportListenerState>,
149    pub failed: Vec<TransportListenerFailure>,
150}
151
152impl TransportReadiness {
153    fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
154        self.active.push(TransportListenerState {
155            transport: transport.to_string(),
156            bind_addr: bind_addr.to_string(),
157            explicit,
158        });
159    }
160
161    fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
162        self.failed.push(TransportListenerFailure {
163            transport: transport.to_string(),
164            bind_addr: bind_addr.to_string(),
165            explicit,
166            reason,
167        });
168    }
169}
170
/// Inputs for rendering and installing the systemd unit.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix.
    pub service_name: String,
    /// Server binary placed at the start of `ExecStart`.
    pub binary_path: PathBuf,
    /// Account the unit runs as (`User=`).
    pub run_user: String,
    /// Group the unit runs as (`Group=`).
    pub run_group: String,
    /// Database file passed to the server via `--path`.
    pub data_path: PathBuf,
    /// Router bind address, rendered as `--bind` when present.
    pub router_bind_addr: Option<String>,
    /// gRPC bind address, rendered as `--grpc-bind` when present.
    pub grpc_bind_addr: Option<String>,
    /// HTTP bind address, rendered as `--http-bind` when present.
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory that contains `data_path`.
    ///
    /// Returns `"."` when `data_path` has no usable parent. That covers both
    /// a root path (`parent()` is `None`) and a bare file name such as
    /// `data.rdb`, for which `Path::parent` yields `Some("")`. An empty path
    /// would otherwise leak into `WorkingDirectory=` and `install -d`.
    pub fn data_dir(&self) -> PathBuf {
        match self.data_path.parent() {
            Some(parent) if !parent.as_os_str().is_empty() => parent.to_path_buf(),
            _ => PathBuf::from("."),
        }
    }

    /// Location of the unit file under `/etc/systemd/system`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
195
196/// Build a sane default `TelemetryConfig` from a server path when the
197/// caller didn't set one explicitly. Writes rotating logs into the
198/// parent directory of the DB file (or `./logs` for in-memory /
199/// pathless runs). Level defaults to `info`, pretty stderr format.
200pub fn default_telemetry_for_path(
201    path: Option<&std::path::Path>,
202) -> crate::telemetry::TelemetryConfig {
203    let log_dir = match path {
204        Some(p) => p
205            .parent()
206            .map(|parent| parent.join("logs"))
207            .or_else(|| Some(std::path::PathBuf::from("./logs"))),
208        None => None, // in-memory — no file, stderr-only
209    };
210    crate::telemetry::TelemetryConfig {
211        log_dir,
212        file_prefix: "reddb.log".to_string(),
213        level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
214        format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
215            crate::telemetry::LogFormat::Pretty
216        } else {
217            crate::telemetry::LogFormat::Json
218        },
219        rotation_keep_days: 14,
220        service_name: "reddb",
221        // Implicit defaults — no CLI flag has claimed these values yet.
222        level_explicit: false,
223        format_explicit: false,
224        rotation_keep_days_explicit: false,
225        file_prefix_explicit: false,
226        log_dir_explicit: false,
227        log_file_disabled: false,
228    }
229}
230
231impl ServerCommandConfig {
232    fn to_db_options(&self) -> RedDBOptions {
233        let mut options = match &self.path {
234            Some(path) => RedDBOptions::persistent(path),
235            None => RedDBOptions::in_memory(),
236        };
237
238        options.mode = StorageMode::Persistent;
239        options.create_if_missing = self.create_if_missing;
240        // PLAN.md Phase 4.3 — read_only resolution priority:
241        //   1. CLI flag (`--readonly`) — explicit operator intent.
242        //   2. `RED_READONLY=true` env — orchestrator override.
243        //   3. Persisted `<data>/.runtime-state.json` from a prior
244        //      `POST /admin/readonly` — survives restart.
245        //   4. Default `false`.
246        options.read_only = self.read_only
247            || env_nonempty("RED_READONLY")
248                .or_else(|| env_nonempty("REDDB_READONLY"))
249                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
250                .unwrap_or(false)
251            || self.path.as_ref().is_some_and(|data_path| {
252                crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
253                    data_path,
254                ))
255                .unwrap_or(false)
256            });
257
258        options.replication = match self.role.as_str() {
259            "primary" => ReplicationConfig::primary(),
260            "replica" => {
261                let primary_addr = self
262                    .primary_addr
263                    .clone()
264                    .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
265                // Public-mutation rejection on replicas is enforced by
266                // `WriteGate` at the runtime/RPC boundary (PLAN.md W1).
267                // Leaving `options.read_only = false` keeps the pager
268                // writable so the internal logical-WAL apply path can
269                // ingest records from the primary; WriteGate ensures no
270                // client request reaches storage.
271                ReplicationConfig::replica(primary_addr)
272            }
273            _ => ReplicationConfig::standalone(),
274        };
275
276        if self.vault {
277            options.auth.vault_enabled = true;
278        }
279
280        configure_remote_backend_from_env(&mut options);
281
282        options
283    }
284
285    pub fn enabled_transports(&self) -> Vec<ServerTransport> {
286        let mut transports = Vec::with_capacity(3);
287        if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
288            transports.push(ServerTransport::Grpc);
289        }
290        if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
291            transports.push(ServerTransport::Http);
292        }
293        if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
294            transports.push(ServerTransport::Wire);
295        }
296        transports
297    }
298}
299
300/// Read an env var, treating empty / whitespace-only as `None`.
301/// Honors the `<NAME>_FILE` convention. Re-exports the shared
302/// `crate::utils::env_with_file_fallback` helper so call sites in
303/// this module can keep their short local name.
304fn env_nonempty(name: &str) -> Option<String> {
305    crate::utils::env_with_file_fallback(name)
306}
307
308fn env_truthy(name: &str) -> bool {
309    env_nonempty(name)
310        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
311        .unwrap_or(false)
312}
313
314fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
315    // PLAN.md (cloud-agnostic) — prefer the new spelling
316    // `RED_BACKEND`; accept the legacy `REDDB_REMOTE_BACKEND` for
317    // existing dev installs. `none` (default) means standalone — no
318    // remote backend, valid for development and on-prem without
319    // remote.
320    let backend = env_nonempty("RED_BACKEND")
321        .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
322        .unwrap_or_else(|| "none".to_string())
323        .to_ascii_lowercase();
324
325    match backend.as_str() {
326        // Universal S3-compatible — covers AWS, R2, MinIO, Ceph,
327        // GCS-interop, B2, DO Spaces, Wasabi, Garage, SeaweedFS,
328        // IDrive, Storj. The `path_style` toggle in S3Config picks
329        // the right addressing for self-hosted vs hosted.
330        "s3" | "minio" | "r2" => {
331            #[cfg(feature = "backend-s3")]
332            {
333                if let Some(config) = s3_config_from_env() {
334                    let remote_key = env_nonempty("RED_REMOTE_KEY")
335                        .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
336                        .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
337                    let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
338                    options.remote_backend = Some(backend.clone());
339                    options.remote_backend_atomic = Some(backend);
340                    options.remote_key = Some(remote_key);
341                }
342            }
343            #[cfg(not(feature = "backend-s3"))]
344            {
345                tracing::warn!(
346                    backend = %backend,
347                    "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
348                );
349            }
350        }
351        // Filesystem backend (NFS/EFS/SMB/local-disk). PLAN.md spec
352        // calls it `fs`; legacy code shipped it as `local`. Both
353        // names map to LocalBackend, with the remote_key derived
354        // from `RED_FS_PATH` + a per-database suffix when provided.
355        "fs" | "local" => {
356            let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
357            let remote_key = match (
358                base_path,
359                env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
360            ) {
361                (Some(base), Some(rel)) => Some(format!(
362                    "{}/{}",
363                    base.trim_end_matches('/'),
364                    rel.trim_start_matches('/')
365                )),
366                (Some(base), None) => Some(format!(
367                    "{}/clusters/dev/data.rdb",
368                    base.trim_end_matches('/')
369                )),
370                (None, Some(rel)) => Some(rel),
371                (None, None) => None,
372            };
373            if let Some(key) = remote_key {
374                let backend = Arc::new(crate::storage::backend::LocalBackend);
375                options.remote_backend = Some(backend.clone());
376                options.remote_backend_atomic = Some(backend);
377                options.remote_key = Some(key);
378            }
379        }
380        // Generic HTTP backend (PLAN.md Phase 2.3). Maximum
381        // portability: any service exposing PUT/GET/DELETE serves
382        // as a backup target. Optional auth via *_FILE secret
383        // path keeps the token out of the env.
384        "http" => {
385            let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
386                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
387            {
388                Some(u) => u,
389                None => {
390                    tracing::warn!(
391                        "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
392                    );
393                    return;
394                }
395            };
396            let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
397                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
398                .unwrap_or_default();
399            let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
400                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
401            {
402                std::fs::read_to_string(&path)
403                    .ok()
404                    .map(|s| s.trim().to_string())
405                    .filter(|s| !s.is_empty())
406            } else {
407                env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
408                    .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
409            };
410
411            let mut config =
412                crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
413            if let Some(auth) = auth_header {
414                config = config.with_auth_header(auth);
415            }
416            let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
417                || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
418                || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
419            config = config.with_conditional_writes(conditional_writes);
420            // Always populate the snapshot-transport handle. When the
421            // operator confirmed CAS support, also populate the atomic
422            // handle via AtomicHttpBackend — without that flag,
423            // LeaseStore must remain unreachable on this backend.
424            if conditional_writes {
425                match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
426                    Ok(atomic) => {
427                        let atomic_arc = Arc::new(atomic);
428                        options.remote_backend = Some(atomic_arc.clone());
429                        options.remote_backend_atomic = Some(atomic_arc);
430                    }
431                    Err(err) => {
432                        tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
433                        options.remote_backend =
434                            Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
435                    }
436                }
437            } else {
438                options.remote_backend =
439                    Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
440            }
441            options.remote_key = env_nonempty("RED_REMOTE_KEY")
442                .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
443                .or_else(|| Some("clusters/dev/data.rdb".to_string()));
444        }
445        // `none` is the explicit standalone — no remote, no backup
446        // pipeline. Boot never blocks on network reachability.
447        "none" | "" => {}
448        other => {
449            tracing::warn!(
450                backend = %other,
451                "unknown RED_BACKEND value — supported: s3 | fs | http | none"
452            );
453        }
454    }
455}
456
/// Look up an S3 setting by suffix, trying the cloud-agnostic
/// `RED_S3_<suffix>` spelling first and the legacy `REDDB_S3_<suffix>`
/// spelling second. The first one that resolves wins.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    match env_nonempty(&format!("RED_S3_{suffix}")) {
        Some(value) => Some(value),
        None => env_nonempty(&format!("REDDB_S3_{suffix}")),
    }
}
466
/// Resolve an S3 secret either from a file named by `RED_S3_<suffix>_FILE`
/// (legacy: `REDDB_S3_<suffix>_FILE`) or from the inline variable (PLAN.md
/// spec — compatible with Kubernetes / Docker Secrets, HashiCorp Vault
/// Agent, sealed-secrets).
///
/// The `_FILE` variant wins. When it is set, the trimmed file content is
/// returned, and an unreadable or blank file yields `None` without falling
/// back to the inline variable.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let secret_file = ["RED_S3", "REDDB_S3"]
        .iter()
        .find_map(|family| env_nonempty(&format!("{family}_{suffix}_FILE")));

    match secret_file {
        Some(path) => {
            let contents = std::fs::read_to_string(&path).ok()?;
            let secret = contents.trim();
            if secret.is_empty() {
                None
            } else {
                Some(secret.to_string())
            }
        }
        None => env_s3(suffix),
    }
}
484
/// Assemble an `S3Config` from the environment.
///
/// Endpoint, bucket, access key and secret key are mandatory; a missing one
/// yields `None`. The region defaults to `us-east-1`, the key prefix
/// (`KEY_PREFIX`, then `PREFIX`) to empty, and path-style addressing to
/// `true`.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    // Mandatory settings, checked in this order. `?` stops at the first gap.
    let endpoint = env_s3("ENDPOINT")?;
    let bucket = env_s3("BUCKET")?;
    let access_key = env_s3_secret("ACCESS_KEY")?;
    let secret_key = env_s3_secret("SECRET_KEY")?;

    let region = match env_s3("REGION") {
        Some(region) => region,
        None => "us-east-1".to_string(),
    };
    let key_prefix = match env_s3("KEY_PREFIX").or_else(|| env_s3("PREFIX")) {
        Some(prefix) => prefix,
        None => String::new(),
    };
    let path_style = match env_s3("PATH_STYLE") {
        Some(raw) => matches!(
            raw.to_ascii_lowercase().as_str(),
            "1" | "true" | "yes" | "on"
        ),
        None => true,
    };

    Some(crate::storage::backend::S3Config {
        endpoint,
        bucket,
        key_prefix,
        access_key,
        secret_key,
        region,
        path_style,
    })
}
508
509pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
510    let data_dir = config.data_dir();
511    let exec_start = render_systemd_exec_start(config);
512    format!(
513        "[Unit]\n\
514Description=RedDB unified database service\n\
515After=network-online.target\n\
516Wants=network-online.target\n\
517\n\
518[Service]\n\
519Type=simple\n\
520User={user}\n\
521Group={group}\n\
522WorkingDirectory={workdir}\n\
523ExecStart={exec_start}\n\
524Restart=always\n\
525RestartSec=2\n\
526LimitSTACK=16M\n\
527NoNewPrivileges=true\n\
528PrivateTmp=true\n\
529ProtectSystem=strict\n\
530ProtectHome=true\n\
531ProtectControlGroups=true\n\
532ProtectKernelTunables=true\n\
533ProtectKernelModules=true\n\
534RestrictNamespaces=true\n\
535LockPersonality=true\n\
536MemoryDenyWriteExecute=true\n\
537ReadWritePaths={workdir}\n\
538\n\
539[Install]\n\
540WantedBy=multi-user.target\n",
541        user = config.run_user,
542        group = config.run_group,
543        workdir = data_dir.display(),
544        exec_start = exec_start,
545    )
546}
547
548/// Install a systemd unit + start the service.
549///
550/// Linux-only. The helper shells out to `systemctl`, `useradd`,
551/// `groupadd`, `install`, `getent`, and `id` — none of which exist on
552/// Windows or macOS. The Windows/macOS branch returns a hard error so
553/// callers (the CLI) surface a clear message instead of a confusing
554/// syscall failure. A proper Windows-service equivalent (sc.exe /
555/// NSSM) is a Phase 3.6 follow-up.
556#[cfg(target_os = "linux")]
557pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
558    ensure_root()?;
559    ensure_command_available("systemctl")?;
560    ensure_command_available("getent")?;
561    ensure_command_available("groupadd")?;
562    ensure_command_available("useradd")?;
563    ensure_command_available("install")?;
564    ensure_executable(&config.binary_path)?;
565
566    if !command_success("getent", ["group", config.run_group.as_str()])? {
567        run_command("groupadd", ["--system", config.run_group.as_str()])?;
568    }
569
570    if !command_success("id", ["-u", config.run_user.as_str()])? {
571        let data_dir = config.data_dir();
572        run_command(
573            "useradd",
574            [
575                "--system",
576                "--gid",
577                config.run_group.as_str(),
578                "--home-dir",
579                data_dir.to_string_lossy().as_ref(),
580                "--shell",
581                "/usr/sbin/nologin",
582                config.run_user.as_str(),
583            ],
584        )?;
585    }
586
587    let data_dir = config.data_dir();
588    run_command(
589        "install",
590        [
591            "-d",
592            "-o",
593            config.run_user.as_str(),
594            "-g",
595            config.run_group.as_str(),
596            "-m",
597            "0750",
598            data_dir.to_string_lossy().as_ref(),
599        ],
600    )?;
601
602    std::fs::write(config.unit_path(), render_systemd_unit(config))
603        .map_err(|err| format!("failed to write systemd unit: {err}"))?;
604
605    run_command("systemctl", ["daemon-reload"])?;
606    run_command(
607        "systemctl",
608        [
609            "enable",
610            "--now",
611            format!("{}.service", config.service_name).as_str(),
612        ],
613    )?;
614
615    Ok(())
616}
617
/// Non-Linux stand-in for the systemd installer. systemd is Linux-specific,
/// so this keeps the same signature (callers compile everywhere) and always
/// returns an error. Windows/macOS service-manager integration is a
/// Phase 3.6 follow-up (sc.exe + NSSM on Windows, launchd on macOS).
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    const UNSUPPORTED: &str = concat!(
        "systemd install is Linux-only — use sc.exe (Windows) or ",
        "launchd (macOS) to install the service manually using the ",
        "unit printed by `red service print-unit`",
    );
    Err(String::from(UNSUPPORTED))
}
629
/// Fail unless the current process runs as uid 0, as reported by `id -u`.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let id_output = match Command::new("id").arg("-u").output() {
        Ok(output) => output,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !id_output.status.success() {
        return Err("failed to determine current uid".to_string());
    }

    let reported_uid = String::from_utf8_lossy(&id_output.stdout);
    match reported_uid.trim() {
        "0" => Ok(()),
        _ => Err("run this command as root (sudo)".to_string()),
    }
}
645
/// Check that `command` resolves in a login shell (`sh -lc 'command -v …'`).
///
/// `command` is interpolated into the shell script unescaped, so callers
/// must only pass trusted, literal command names.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    let probe_script = format!("command -v {command} >/dev/null 2>&1");
    let found = Command::new("sh")
        .arg("-lc")
        .arg(&probe_script)
        .status()
        .map_err(|err| format!("failed to check command '{command}': {err}"))?
        .success();

    if !found {
        return Err(format!("required command not found: {command}"));
    }
    Ok(())
}
658
/// Verify that `path` points at an existing regular file with at least one
/// execute bit set.
///
/// # Errors
///
/// * `binary not found …` — the path cannot be stat'ed.
/// * `binary is not a file …` — the path is a directory or other non-file.
///   Directories carry search (`x`) bits, so the permission check alone
///   would accept them.
/// * `binary is not executable …` — no execute bit is set.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    // This function only exists on Linux, so the Unix permission API is
    // always available and no non-Unix fallback is needed.
    use std::os::unix::fs::PermissionsExt;

    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    if metadata.permissions().mode() & 0o111 == 0 {
        return Err(format!("binary is not executable: {}", path.display()));
    }
    Ok(())
}
678
/// Run `program` with `args` and report whether it exited successfully.
/// An `Err` means the program could not be launched at all.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    match Command::new(program).args(args).status() {
        Ok(status) => Ok(status.success()),
        Err(err) => Err(format!("failed to run {program}: {err}")),
    }
}
687
/// Run `program` with `args`, capturing its output, and turn a non-zero exit
/// into an error.
///
/// # Errors
///
/// * `failed to run <program>: …` — the process could not be launched.
/// * `<program> failed: <detail>` — the process exited unsuccessfully.
///   `<detail>` is the trimmed stderr, else the trimmed stdout, else the
///   exit status itself.
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    let stdout = String::from_utf8_lossy(&output.stdout);
    let detail = match (stderr.trim(), stdout.trim()) {
        ("", "") => {
            // `ExitStatus`'s `Display` already renders "exit status: N" (or
            // "signal: N"), so adding another "exit status" prefix would
            // print it twice.
            output.status.to_string()
        }
        ("", out) => out.to_string(),
        (err, _) => err.to_string(),
    };
    Err(format!("{program} failed: {detail}"))
}
709
710pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
711    let has_any = config.router_bind_addr.is_some()
712        || config.grpc_bind_addr.is_some()
713        || config.http_bind_addr.is_some()
714        || config.wire_bind_addr.is_some()
715        || config.pg_bind_addr.is_some();
716    if !has_any {
717        return Err("at least one server bind address must be configured".into());
718    }
719    let thread_name = if config.router_bind_addr.is_some() {
720        "red-server-router"
721    } else {
722        match (
723            config.grpc_bind_addr.is_some(),
724            config.http_bind_addr.is_some(),
725        ) {
726            (true, true) => "red-server-dual",
727            (true, false) => "red-server-grpc",
728            (false, true) => "red-server-http",
729            (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
730            (false, false) => "red-server-pg-wire",
731        }
732    };
733
734    let handle = thread::Builder::new()
735        .name(thread_name.into())
736        .stack_size(8 * 1024 * 1024)
737        .spawn(move || run_configured_servers(config))
738        .map_err(|err| format!("failed to spawn server thread: {err}"))?;
739
740    match handle.join() {
741        Ok(result) => result,
742        Err(_) => Err("server thread panicked".to_string()),
743    }
744}
745
746fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
747    let mut parts = vec![
748        config.binary_path.display().to_string(),
749        "server".to_string(),
750        "--path".to_string(),
751        config.data_path.display().to_string(),
752    ];
753
754    if let Some(bind_addr) = &config.router_bind_addr {
755        parts.push("--bind".to_string());
756        parts.push(bind_addr.clone());
757    } else if let Some(bind_addr) = &config.grpc_bind_addr {
758        parts.push("--grpc-bind".to_string());
759        parts.push(bind_addr.clone());
760    }
761    if let Some(bind_addr) = &config.http_bind_addr {
762        parts.push("--http-bind".to_string());
763        parts.push(bind_addr.clone());
764    }
765
766    parts.join(" ")
767}
768
/// Report whether something accepts TCP connections at `target`.
///
/// `target` is resolved to socket addresses, and each is tried with the
/// given per-address `timeout` until one connects. Resolution failures and
/// unreachable addresses both yield `false`.
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    match target.to_socket_addrs() {
        Ok(mut candidates) => {
            candidates.any(|candidate| TcpStream::connect_timeout(&candidate, timeout).is_ok())
        }
        Err(_) => false,
    }
}
779
780#[inline(never)]
781fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
782    // Phase 6 logging is initialised inside each runner once the
783    // runtime is open — see `build_runtime_and_auth_store`. Going
784    // after DB open lets us read `red.logging.*` config keys out of
785    // the persistent red_config store and merge them with the CLI
786    // flags (flag > red_config > built-in default).
787    if let Some(router_bind_addr) = config.router_bind_addr.clone() {
788        return run_routed_server(config, router_bind_addr);
789    }
790
791    match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
792        (Some(grpc_bind_addr), Some(http_bind_addr)) => {
793            run_dual_server(config, grpc_bind_addr, http_bind_addr)
794        }
795        (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
796        (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
797        (None, None) => {
798            if let Some(wire_addr) = config.wire_bind_addr.clone() {
799                run_wire_only_server(config, wire_addr)
800            } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
801                run_pg_only_server(config, pg_addr)
802            } else {
803                Err("at least one server bind address must be configured".to_string())
804            }
805        }
806    }
807}
808
809fn bind_listener_for_startup(
810    readiness: &mut TransportReadiness,
811    transport: &str,
812    bind_addr: &str,
813    explicit: bool,
814) -> Result<Option<TcpListener>, String> {
815    match TcpListener::bind(bind_addr) {
816        Ok(listener) => {
817            readiness.active(transport, bind_addr, explicit);
818            Ok(Some(listener))
819        }
820        Err(err) => {
821            let reason = format!("{transport} listener bind {bind_addr}: {err}");
822            readiness.failed(transport, bind_addr, explicit, reason.clone());
823            if explicit {
824                tracing::error!(
825                    transport,
826                    bind = %bind_addr,
827                    error = %err,
828                    "fatal explicit bind failure"
829                );
830                Err(format!("explicit {reason}"))
831            } else {
832                tracing::warn!(
833                    transport,
834                    bind = %bind_addr,
835                    error = %err,
836                    "non-fatal implicit bind failure; listener degraded"
837                );
838                Ok(None)
839            }
840        }
841    }
842}
843
844/// Wire SIGTERM and SIGINT to `RedDBRuntime::graceful_shutdown`.
845///
846/// PLAN.md Phase 1.1 — orchestrators (K8s preStop, Fly autostop, ECS
847/// drain, systemd, plain `docker stop`) all rely on SIGTERM with a
848/// grace window. SIGKILL after that grace window is the OS's
849/// fallback; we are responsible for finishing in time.
850///
851/// Spawns a tokio task on the caller's runtime that:
852///   1. Awaits the first of SIGTERM / SIGINT.
853///   2. Calls `runtime.graceful_shutdown(backup_on_shutdown)`. The
854///      runtime moves to `Stopped` on its own; this just runs the
855///      flush + checkpoint pipeline and (optionally) a final backup.
856///   3. Calls `std::process::exit(0)` so the orchestrator sees a
857///      clean exit code.
858///
859/// `RED_BACKUP_ON_SHUTDOWN` (default `true` if a remote backend is
860/// configured) toggles step 3's backup branch. The flush + checkpoint
861/// always run.
862///
863/// Idempotent across signal storms — `graceful_shutdown` returns the
864/// cached report on second call, but we exit on the first one
865/// regardless, so the second SIGTERM never reaches the handler.
866async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
867    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
868        .ok()
869        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
870        .unwrap_or(true);
871
872    #[cfg(unix)]
873    {
874        use tokio::signal::unix::{signal, SignalKind};
875
876        let mut sigterm = match signal(SignalKind::terminate()) {
877            Ok(s) => s,
878            Err(err) => {
879                tracing::warn!(
880                    error = %err,
881                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
882                );
883                return;
884            }
885        };
886        let mut sigint = match signal(SignalKind::interrupt()) {
887            Ok(s) => s,
888            Err(err) => {
889                tracing::warn!(error = %err, "could not install SIGINT handler");
890                return;
891            }
892        };
893        // PLAN.md Phase 6.4 — SIGHUP triggers a reload of secrets from
894        // their `_FILE` companions without restarting the process.
895        // Useful for credential rotation pipelines (kubectl create
896        // secret + kubectl rollout restart, but for systemd / Nomad /
897        // bare-metal where rolling the process is heavier).
898        let mut sighup = match signal(SignalKind::hangup()) {
899            Ok(s) => Some(s),
900            Err(err) => {
901                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
902                None
903            }
904        };
905
906        let reload_runtime = runtime.clone();
907        tokio::spawn(async move {
908            loop {
909                let signal_name = match &mut sighup {
910                    Some(hup) => tokio::select! {
911                        _ = sigterm.recv() => "SIGTERM",
912                        _ = sigint.recv() => "SIGINT",
913                        _ = hup.recv() => "SIGHUP",
914                    },
915                    None => tokio::select! {
916                        _ = sigterm.recv() => "SIGTERM",
917                        _ = sigint.recv() => "SIGINT",
918                    },
919                };
920
921                if signal_name == "SIGHUP" {
922                    handle_sighup_reload(&reload_runtime);
923                    continue; // stay alive; SIGHUP isn't a shutdown
924                }
925
926                tracing::info!(
927                    signal = signal_name,
928                    "lifecycle signal received; shutting down"
929                );
930                match runtime.graceful_shutdown(backup_on_shutdown) {
931                    Ok(report) => {
932                        tracing::info!(
933                            duration_ms = report.duration_ms,
934                            flushed_wal = report.flushed_wal,
935                            final_checkpoint = report.final_checkpoint,
936                            backup_uploaded = report.backup_uploaded,
937                            "graceful shutdown complete"
938                        );
939                    }
940                    Err(err) => {
941                        tracing::error!(error = %err, "graceful shutdown failed");
942                        // Issue #205 — graceful shutdown returning Err
943                        // means the runtime is exiting without a clean
944                        // flush/checkpoint. Operator-grade event so the
945                        // operator notices the dirty exit even when the
946                        // process restarts before they read tracing logs.
947                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
948                            reason: format!("graceful shutdown failed: {err}"),
949                        }
950                        .emit_global();
951                    }
952                }
953                std::process::exit(0);
954            }
955        });
956    }
957
958    #[cfg(not(unix))]
959    {
960        tokio::spawn(async move {
961            let interrupted = tokio::signal::ctrl_c().await;
962            if let Err(err) = interrupted {
963                tracing::warn!(error = %err, "could not install Ctrl+C handler");
964                return;
965            }
966
967            tracing::info!(
968                signal = "Ctrl+C",
969                "lifecycle signal received; shutting down"
970            );
971            match runtime.graceful_shutdown(backup_on_shutdown) {
972                Ok(report) => {
973                    tracing::info!(
974                        duration_ms = report.duration_ms,
975                        flushed_wal = report.flushed_wal,
976                        final_checkpoint = report.final_checkpoint,
977                        backup_uploaded = report.backup_uploaded,
978                        "graceful shutdown complete"
979                    );
980                }
981                Err(err) => {
982                    tracing::error!(error = %err, "graceful shutdown failed");
983                }
984            }
985            std::process::exit(0);
986        });
987    }
988}
989
990/// PLAN.md Phase 6.4 — re-read secrets from `*_FILE` companion env
991/// vars. Today this only refreshes the audit log + records the
992/// reload event; the runtime modules that hold cached secret
993/// material (S3 backend credentials, admin token, JWT keys) read
994/// the env on each request so the next call after SIGHUP picks up
995/// the new file contents automatically. A future extension can
996/// punch through to the LeaseStore / AuthStore for in-memory
997/// caches that don't re-read on each call.
998fn handle_sighup_reload(runtime: &RedDBRuntime) {
999    let now_ms = std::time::SystemTime::now()
1000        .duration_since(std::time::UNIX_EPOCH)
1001        .map(|d| d.as_millis() as u64)
1002        .unwrap_or(0);
1003    tracing::info!(
1004        target: "reddb::secrets",
1005        ts_unix_ms = now_ms,
1006        "SIGHUP received; secrets will be re-read from *_FILE on next access"
1007    );
1008    // Routed through AuditFieldEscaper (ADR 0010 / issue #177) so
1009    // every emission goes through the typed-field guard. The
1010    // arguments here are static, but using the typed entry point
1011    // keeps the discipline uniform across call sites.
1012    use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1013    runtime.audit_log().record_event(
1014        AuditEvent::builder("config/sighup_reload")
1015            .source(AuditAuthSource::System)
1016            .resource("secrets")
1017            .outcome(Outcome::Success)
1018            .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1019            .build(),
1020    );
1021}
1022
1023#[inline(never)]
1024fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
1025    let workers = config.workers;
1026    let cli_telemetry = config.telemetry.clone();
1027    let db_options = config.to_db_options();
1028    let rt_config = detect_runtime_config();
1029    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1030    let (runtime, auth_store, _telemetry_guard) =
1031        build_runtime_and_auth_store(db_options, cli_telemetry)?;
1032
1033    spawn_admin_metrics_listeners(&runtime, &auth_store);
1034
1035    let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
1036        .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
1037    let http_backend = http_listener
1038        .local_addr()
1039        .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
1040    let http_server = build_http_server(
1041        runtime.clone(),
1042        auth_store.clone(),
1043        http_backend.to_string(),
1044    );
1045    let http_handle = http_server.serve_in_background_on(http_listener);
1046
1047    thread::sleep(Duration::from_millis(100));
1048    if http_handle.is_finished() {
1049        return match http_handle.join() {
1050            Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
1051            Ok(Err(err)) => Err(err.to_string()),
1052            Err(_) => Err("HTTP backend thread panicked".to_string()),
1053        };
1054    }
1055
1056    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1057        .enable_all()
1058        .worker_threads(worker_threads)
1059        .thread_stack_size(rt_config.stack_size)
1060        .build()
1061        .map_err(|err| format!("tokio runtime: {err}"))?;
1062
1063    let signal_runtime = runtime.clone();
1064    tokio_runtime.block_on(async move {
1065        spawn_lifecycle_signal_handler(signal_runtime).await;
1066        let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
1067            .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
1068        let grpc_backend = grpc_listener
1069            .local_addr()
1070            .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
1071        let grpc_server = RedDBGrpcServer::with_options(
1072            runtime.clone(),
1073            GrpcServerOptions {
1074                bind_addr: grpc_backend.to_string(),
1075                tls: None,
1076            },
1077            auth_store,
1078        );
1079        tokio::spawn(async move {
1080            if let Err(err) = grpc_server.serve_on(grpc_listener).await {
1081                tracing::error!(err = %err, "gRPC backend error");
1082            }
1083        });
1084
1085        let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
1086            .await
1087            .map_err(|err| format!("bind internal wire listener: {err}"))?;
1088        let wire_backend = wire_listener
1089            .local_addr()
1090            .map_err(|err| format!("inspect internal wire listener: {err}"))?;
1091        let wire_rt = Arc::new(runtime);
1092        tokio::spawn(async move {
1093            if let Err(err) =
1094                crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
1095                    .await
1096            {
1097                tracing::error!(err = %err, "redwire backend error");
1098            }
1099        });
1100
1101        tracing::info!(
1102            bind = %router_bind_addr,
1103            cpus = rt_config.available_cpus,
1104            workers = worker_threads,
1105            "router bootstrapping"
1106        );
1107        serve_tcp_router(TcpProtocolRouterConfig {
1108            bind_addr: router_bind_addr,
1109            grpc_backend,
1110            http_backend,
1111            wire_backend,
1112        })
1113        .await
1114        .map_err(|err| err.to_string())
1115    })
1116}
1117
1118/// Spawn RedWire listeners (plaintext + TLS) as background tokio tasks.
1119async fn spawn_wire_listeners(
1120    config: &ServerCommandConfig,
1121    runtime: &RedDBRuntime,
1122    readiness: &mut TransportReadiness,
1123) -> Result<(), String> {
1124    // Plaintext RedWire — TCP or Unix socket
1125    if let Some(wire_addr) = config.wire_bind_addr.clone() {
1126        let wire_rt = Arc::new(runtime.clone());
1127        // Address starting with `unix://` or an absolute filesystem path
1128        // switches to Unix domain sockets.
1129        #[cfg(unix)]
1130        {
1131            if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1132                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1133                tokio::spawn(async move {
1134                    if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1135                        &wire_addr, wire_rt,
1136                    )
1137                    .await
1138                    {
1139                        tracing::error!(err = %e, "redwire unix listener error");
1140                    }
1141                });
1142                return Ok(());
1143            }
1144        }
1145        match tokio::net::TcpListener::bind(&wire_addr).await {
1146            Ok(listener) => {
1147                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1148                tokio::spawn(async move {
1149                    if let Err(e) =
1150                        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1151                            .await
1152                    {
1153                        tracing::error!(err = %e, "redwire listener error");
1154                    }
1155                });
1156            }
1157            Err(err) => {
1158                let reason = format!("wire listener bind {wire_addr}: {err}");
1159                readiness.failed(
1160                    "wire",
1161                    &wire_addr,
1162                    config.wire_bind_explicit,
1163                    reason.clone(),
1164                );
1165                if config.wire_bind_explicit {
1166                    tracing::error!(
1167                        transport = "wire",
1168                        bind = %wire_addr,
1169                        error = %err,
1170                        "fatal explicit bind failure"
1171                    );
1172                    return Err(format!("explicit {reason}"));
1173                }
1174                tracing::warn!(
1175                    transport = "wire",
1176                    bind = %wire_addr,
1177                    error = %err,
1178                    "non-fatal implicit bind failure; listener degraded"
1179                );
1180            }
1181        }
1182    }
1183
1184    // RedWire over TLS
1185    if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1186        let tls_config = resolve_wire_tls_config(config);
1187        match tls_config {
1188            Ok(tls_cfg) => {
1189                let wire_rt = Arc::new(runtime.clone());
1190                tokio::spawn(async move {
1191                    if let Err(e) =
1192                        crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1193                            .await
1194                    {
1195                        tracing::error!(err = %e, "redwire+tls listener error");
1196                    }
1197                });
1198            }
1199            Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1200        }
1201    }
1202    Ok(())
1203}
1204
1205/// Spawn the PostgreSQL wire-protocol listener (Phase 3.1 PG parity).
1206///
1207/// Only runs when `--pg-bind` is supplied. Uses the v3 protocol so
1208/// psql, JDBC drivers, DBeaver, etc. can connect directly. Runs
1209/// alongside the native wire listener; the two transports do not
1210/// share a port.
1211fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1212    if let Some(pg_addr) = config.pg_bind_addr.clone() {
1213        let rt = Arc::new(runtime.clone());
1214        tokio::spawn(async move {
1215            let cfg = crate::wire::PgWireConfig {
1216                bind_addr: pg_addr,
1217                ..Default::default()
1218            };
1219            if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1220                tracing::error!(err = %e, "pg wire listener error");
1221            }
1222        });
1223    }
1224}
1225
1226/// Resolve gRPC TLS material into PEM bytes.
1227///
1228/// Lookup order, in priority:
1229///   1. Explicit `config.grpc_tls_cert` / `config.grpc_tls_key` (paths
1230///      passed via CLI/env). Both must be present together.
1231///   2. `RED_GRPC_TLS_DEV=1` — auto-generate a self-signed cert next
1232///      to the data dir. Refuses to run without the env flag so an
1233///      operator can't accidentally ship a dev cert in prod.
1234///
1235/// `client_ca` is loaded when `config.grpc_tls_client_ca` is set,
1236/// turning the listener into a mutual-TLS endpoint that requires
1237/// every client to present a chain that anchors at one of the CAs
1238/// in the bundle.
1239fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1240    use crate::utils::secret_file::expand_file_env;
1241
1242    // Best-effort *_FILE expansion for every TLS env knob. Errors here
1243    // surface as warnings; the fallback path (explicit cert paths) will
1244    // cover the common case.
1245    for var in [
1246        "REDDB_GRPC_TLS_CERT",
1247        "REDDB_GRPC_TLS_KEY",
1248        "REDDB_GRPC_TLS_CLIENT_CA",
1249    ] {
1250        if let Err(err) = expand_file_env(var) {
1251            tracing::warn!(
1252                target: "reddb::secrets",
1253                env = %var,
1254                err = %err,
1255                "could not expand *_FILE companion for gRPC TLS"
1256            );
1257        }
1258    }
1259
1260    let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1261        (Some(cert), Some(key)) => {
1262            let cert_pem = std::fs::read(cert)
1263                .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1264            let key_pem =
1265                std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1266            (cert_pem, key_pem)
1267        }
1268        _ => {
1269            // No explicit material → only proceed when dev-mode is on.
1270            let dev = std::env::var("RED_GRPC_TLS_DEV")
1271                .ok()
1272                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1273                .unwrap_or(false);
1274            if !dev {
1275                return Err("gRPC TLS configured but no cert/key supplied — set \
1276                     REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1277                     RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1278                    .to_string());
1279            }
1280            let dir = config
1281                .path
1282                .as_ref()
1283                .and_then(|p| p.parent())
1284                .map(PathBuf::from)
1285                .unwrap_or_else(|| PathBuf::from("."));
1286            let (cert_pem_str, key_pem_str) =
1287                crate::wire::tls::generate_self_signed_cert("localhost")
1288                    .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1289
1290            // Constant-time-friendly fingerprint to stderr so the
1291            // operator can pin a client trust store. We log via
1292            // `tracing::warn!` so it stands out next to ordinary
1293            // listener-online events.
1294            let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1295            tracing::warn!(
1296                target: "reddb::security",
1297                transport = "grpc",
1298                cert_sha256 = %fp,
1299                "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1300                 DO NOT use in production"
1301            );
1302            // Persist so that restarts reuse the same identity.
1303            let cert_path = dir.join("grpc-tls-cert.pem");
1304            let key_path = dir.join("grpc-tls-key.pem");
1305            if !cert_path.exists() || !key_path.exists() {
1306                let _ = std::fs::create_dir_all(&dir);
1307                std::fs::write(&cert_path, cert_pem_str.as_bytes())
1308                    .map_err(|e| format!("write grpc dev cert: {e}"))?;
1309                std::fs::write(&key_path, key_pem_str.as_bytes())
1310                    .map_err(|e| format!("write grpc dev key: {e}"))?;
1311                #[cfg(unix)]
1312                {
1313                    use std::os::unix::fs::PermissionsExt;
1314                    let _ =
1315                        std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1316                }
1317            }
1318            (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1319        }
1320    };
1321
1322    let client_ca_pem = match &config.grpc_tls_client_ca {
1323        Some(path) => Some(
1324            std::fs::read(path)
1325                .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1326        ),
1327        None => None,
1328    };
1329
1330    Ok(crate::GrpcTlsOptions {
1331        cert_pem,
1332        key_pem,
1333        client_ca_pem,
1334    })
1335}
1336
1337/// Spawn a TLS-terminated gRPC listener when `grpc_tls_bind_addr` is
1338/// configured. Logs and continues on TLS-config errors so the plain
1339/// listener stays up; this matches the wire-listener pattern.
1340fn spawn_grpc_tls_listener_if_configured(
1341    config: &ServerCommandConfig,
1342    runtime: RedDBRuntime,
1343    auth_store: Arc<AuthStore>,
1344) {
1345    let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1346        return;
1347    };
1348    let tls_opts = match resolve_grpc_tls_options(config) {
1349        Ok(opts) => opts,
1350        Err(err) => {
1351            tracing::error!(
1352                target: "reddb::security",
1353                transport = "grpc",
1354                err = %err,
1355                "gRPC TLS config error; TLS listener will not start"
1356            );
1357            return;
1358        }
1359    };
1360    tokio::spawn(async move {
1361        let server = RedDBGrpcServer::with_options(
1362            runtime,
1363            GrpcServerOptions {
1364                bind_addr: tls_bind.clone(),
1365                tls: Some(tls_opts),
1366            },
1367            auth_store,
1368        );
1369        tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1370        if let Err(err) = server.serve().await {
1371            tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1372        }
1373    });
1374}
1375
1376/// Hex-encoded SHA-256 of a PEM blob, used for cert-pin operator log
1377/// lines. Constant-time hash; no token contents leave this fn.
1378fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1379    use sha2::{Digest, Sha256};
1380    let mut h = Sha256::new();
1381    h.update(pem);
1382    let d = h.finalize();
1383    let mut buf = String::with_capacity(64);
1384    for b in d.iter() {
1385        buf.push_str(&format!("{b:02x}"));
1386    }
1387    buf
1388}
1389
1390/// Resolve TLS config: use provided cert/key or auto-generate.
1391fn resolve_wire_tls_config(
1392    config: &ServerCommandConfig,
1393) -> Result<crate::wire::WireTlsConfig, String> {
1394    match (&config.wire_tls_cert, &config.wire_tls_key) {
1395        (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1396            cert_path: cert.clone(),
1397            key_path: key.clone(),
1398        }),
1399        _ => {
1400            // Auto-generate self-signed cert
1401            let dir = config
1402                .path
1403                .as_ref()
1404                .and_then(|p| p.parent())
1405                .map(PathBuf::from)
1406                .unwrap_or_else(|| PathBuf::from("."));
1407            crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1408        }
1409    }
1410}
1411
1412#[inline(never)]
1413fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1414    let rt_config = detect_runtime_config();
1415    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1416    let cli_telemetry = config.telemetry.clone();
1417    let db_options = config.to_db_options();
1418    let mut transport_readiness = TransportReadiness::default();
1419
1420    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1421        .enable_all()
1422        .worker_threads(workers)
1423        .thread_stack_size(rt_config.stack_size)
1424        .build()
1425        .map_err(|err| format!("tokio runtime: {err}"))?;
1426
1427    // Guard lives on the outer thread's stack so it outlives the
1428    // tokio runtime — dropping it only after the listener returns
1429    // flushes the file log writer.
1430    let (runtime, _auth_store, _telemetry_guard) =
1431        build_runtime_and_auth_store(db_options, cli_telemetry)?;
1432    let signal_runtime = runtime.clone();
1433    tokio_runtime.block_on(async move {
1434        spawn_lifecycle_signal_handler(signal_runtime).await;
1435        spawn_pg_listener(&config, &runtime);
1436        let wire_rt = Arc::new(runtime);
1437        let listener = tokio::net::TcpListener::bind(&wire_addr)
1438            .await
1439            .map_err(|err| {
1440                let reason = format!("wire listener bind {wire_addr}: {err}");
1441                transport_readiness.failed(
1442                    "wire",
1443                    &wire_addr,
1444                    config.wire_bind_explicit,
1445                    reason.clone(),
1446                );
1447                if config.wire_bind_explicit {
1448                    format!("explicit {reason}")
1449                } else {
1450                    reason
1451                }
1452            })?;
1453        transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1454        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1455            .await
1456            .map_err(|e| e.to_string())
1457    })
1458}
1459
1460#[inline(never)]
1461fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1462    let rt_config = detect_runtime_config();
1463    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1464    let cli_telemetry = config.telemetry.clone();
1465    let db_options = config.to_db_options();
1466
1467    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1468        .enable_all()
1469        .worker_threads(workers)
1470        .thread_stack_size(rt_config.stack_size)
1471        .build()
1472        .map_err(|err| format!("tokio runtime: {err}"))?;
1473
1474    let (runtime, _auth_store, _telemetry_guard) =
1475        build_runtime_and_auth_store(db_options, cli_telemetry)?;
1476    let signal_runtime = runtime.clone();
1477    tokio_runtime.block_on(async move {
1478        spawn_lifecycle_signal_handler(signal_runtime).await;
1479        let cfg = crate::wire::PgWireConfig {
1480            bind_addr: pg_addr,
1481            ..Default::default()
1482        };
1483        crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1484            .await
1485            .map_err(|e| e.to_string())
1486    })
1487}
1488
1489#[inline(never)]
1490fn build_runtime_and_auth_store(
1491    db_options: RedDBOptions,
1492    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1493) -> Result<
1494    (
1495        RedDBRuntime,
1496        Arc<AuthStore>,
1497        Option<crate::telemetry::TelemetryGuard>,
1498    ),
1499    String,
1500> {
1501    // Return the TelemetryGuard so server runners can bind it for
1502    // their full lifetime. Dropping the guard tears down the
1503    // non-blocking log writer thread and, because that writer is
1504    // built with `.lossy(true)`, any subsequent log event routed to
1505    // the file sink is silently dropped — so callers MUST keep the
1506    // returned `Option<TelemetryGuard>` alive until shutdown.
1507    build_runtime_with_telemetry(db_options, cli_telemetry)
1508}
1509
1510/// Open the runtime, initialise structured logging from merged CLI +
1511/// `red_config` settings, and return a guard the caller must keep
1512/// alive for the server lifetime (drop flushes pending log writes).
1513///
1514/// Merge priority: CLI flag (explicit `Some`) beats `red.logging.*`
1515/// in red_config, beats the built-in default. A CLI-flag value of
1516/// `None` / empty means "inherit from config or default" — never
1517/// "disable". The one exception is `--no-log-file` which forces
1518/// `log_dir = None` regardless of config.
1519pub(crate) fn build_runtime_with_telemetry(
1520    db_options: RedDBOptions,
1521    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1522) -> Result<
1523    (
1524        RedDBRuntime,
1525        Arc<AuthStore>,
1526        Option<crate::telemetry::TelemetryGuard>,
1527    ),
1528    String,
1529> {
1530    let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1531        // Issue #205 — runtime construction failure is the canonical
1532        // StartupFailed phase. The audit sink isn't installed yet
1533        // (it would have been registered inside `with_options`), so
1534        // the emit falls through to tracing+eprintln only — operator
1535        // still sees it on stderr.
1536        let msg = err.to_string();
1537        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1538            phase: "runtime_construction".to_string(),
1539            error: msg.clone(),
1540        }
1541        .emit_global();
1542        msg
1543    })?;
1544
1545    // PLAN.md Phase 5 / W6 — opt into serverless writer-lease fencing
1546    // when `RED_LEASE_REQUIRED=true`. Failure here aborts boot: the
1547    // operator asked for a fence; running unfenced would silently
1548    // expose split-brain risk.
1549    crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1550        let msg = err.to_string();
1551        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1552            phase: "lease_loop".to_string(),
1553            error: msg.clone(),
1554        }
1555        .emit_global();
1556        msg
1557    })?;
1558
1559    // #213 — edge-triggered disk-space watchdog. Watches the data
1560    // directory; falls back to polling when fanotify is unavailable
1561    // (non-Linux or unprivileged container).
1562    if let Some(data_path) = db_options.data_path.as_deref() {
1563        let watch_dir = data_path.parent().unwrap_or(data_path);
1564        crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1565    }
1566
1567    // #214 — inotify config hot-reload watcher. Watches the config file
1568    // (REDDB_CONFIG_FILE or /etc/reddb/config.json) for changes and
1569    // applies hot-reloadable keys without restart.
1570    {
1571        let config_path = crate::runtime::config_overlay::config_file_path();
1572        let store = runtime.db().store();
1573        crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1574    }
1575
1576    // Phase 6 logging: merge red_config overrides onto the CLI-built
1577    // telemetry config, then install the global subscriber.
1578    let merged = merge_telemetry_with_config(
1579        cli_telemetry
1580            .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1581        &runtime,
1582    );
1583    let telemetry_guard = crate::telemetry::init(merged);
1584
1585    let auth_store =
1586        if db_options.auth.vault_enabled {
1587            let pager =
1588                runtime.db().store().pager().cloned().ok_or_else(|| {
1589                    "vault requires a paged database (persistent mode)".to_string()
1590                })?;
1591            let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1592                .map_err(|err| err.to_string())?;
1593            Arc::new(store)
1594        } else {
1595            Arc::new(AuthStore::new(db_options.auth.clone()))
1596        };
1597    auth_store.bootstrap_from_env();
1598
1599    // Background session purge (every 5 minutes)
1600    {
1601        let store = Arc::clone(&auth_store);
1602        std::thread::Builder::new()
1603            .name("reddb-session-purge".into())
1604            .spawn(move || loop {
1605                std::thread::sleep(std::time::Duration::from_secs(300));
1606                store.purge_expired_sessions();
1607            })
1608            .ok();
1609    }
1610
1611    Ok((runtime, auth_store, telemetry_guard))
1612}
1613
1614/// Read `red.logging.*` keys from the persistent config store and
1615/// merge them into the CLI-built `TelemetryConfig`. Merge priority:
1616/// explicit CLI flag > red_config > built-in default.
1617///
1618/// The "was a flag passed" signal comes from the `*_explicit` bools
1619/// on `TelemetryConfig`, populated by the CLI parser. This replaces
1620/// an earlier equality-to-default heuristic that silently dropped
1621/// config whenever the CLI-derived default diverged from
1622/// `TelemetryConfig::default()` (e.g. path-derived `log_dir`,
1623/// non-TTY `format`) and that silently overrode `--no-log-file`.
1624fn merge_telemetry_with_config(
1625    mut cli: crate::telemetry::TelemetryConfig,
1626    runtime: &RedDBRuntime,
1627) -> crate::telemetry::TelemetryConfig {
1628    use crate::storage::schema::Value;
1629
1630    let store = runtime.db().store();
1631
1632    if !cli.level_explicit {
1633        if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
1634            cli.level_filter = v.to_string();
1635        }
1636    }
1637    if !cli.format_explicit {
1638        if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
1639            if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
1640                cli.format = parsed;
1641            }
1642        }
1643    }
1644    if !cli.rotation_keep_days_explicit {
1645        match store.get_config("red.logging.keep_days") {
1646            Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
1647                cli.rotation_keep_days = n as u16
1648            }
1649            Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
1650                cli.rotation_keep_days = n as u16
1651            }
1652            Some(Value::Text(v)) => {
1653                if let Ok(n) = v.parse::<u16>() {
1654                    cli.rotation_keep_days = n;
1655                }
1656            }
1657            _ => {}
1658        }
1659    }
1660    if !cli.file_prefix_explicit {
1661        if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
1662            if !v.is_empty() {
1663                cli.file_prefix = v.to_string();
1664            }
1665        }
1666    }
1667    // --no-log-file is a kill-switch: config cannot resurrect the
1668    // file sink. Explicit --log-dir also wins.
1669    if !cli.log_dir_explicit && !cli.log_file_disabled {
1670        if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
1671            if !v.is_empty() {
1672                cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
1673            }
1674        }
1675    }
1676
1677    cli
1678}
1679
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};

    /// In-memory runtime with an empty config store.
    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
    }

    /// Write a string-valued config key into the runtime's store.
    fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
        let json_value = crate::serde_json::Value::String(value.to_string());
        runtime.db().store().set_config_tree(key, &json_value);
    }

    /// CLI config as `default_telemetry_for_path(Some(path))` would
    /// produce on a non-TTY host: `log_dir = Some(...)`,
    /// `format = Json`, nothing marked explicit.
    fn cli_base() -> TelemetryConfig {
        TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    /// Store one config key in a fresh runtime and merge `cli`
    /// against it.
    fn merge_against(key: &str, value: &str, cli: TelemetryConfig) -> TelemetryConfig {
        let runtime = fresh_runtime();
        set_str(&runtime, key, value);
        merge_telemetry_with_config(cli, &runtime)
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let merged = merge_against("red.logging.dir", "/var/log/reddb", cli_base());
        let expected = std::path::Path::new("/var/log/reddb");
        assert_eq!(merged.log_dir.as_deref(), Some(expected));
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let cli = TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/custom/dir")),
            log_dir_explicit: true,
            ..cli_base()
        };
        let merged = merge_against("red.logging.dir", "/var/log/reddb", cli);
        let expected = std::path::Path::new("/custom/dir");
        assert_eq!(merged.log_dir.as_deref(), Some(expected));
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let cli = TelemetryConfig {
            log_dir: None,
            log_file_disabled: true,
            ..cli_base()
        };
        let merged = merge_against("red.logging.dir", "/var/log/reddb", cli);
        assert!(
            merged.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        // The non-TTY default is format=Json while
        // TelemetryConfig::default() is Pretty; the old equality check
        // silently dropped the config value in this situation.
        let merged = merge_against("red.logging.format", "pretty", cli_base());
        assert_eq!(merged.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let cli = TelemetryConfig {
            format: LogFormat::Json,
            format_explicit: true,
            ..cli_base()
        };
        let merged = merge_against("red.logging.format", "pretty", cli);
        assert_eq!(merged.format, LogFormat::Json);
    }
}
1767
1768#[inline(never)]
1769fn build_http_server(
1770    runtime: RedDBRuntime,
1771    auth_store: Arc<AuthStore>,
1772    bind_addr: String,
1773) -> RedDBServer {
1774    build_http_server_with_transport_readiness(
1775        runtime,
1776        auth_store,
1777        bind_addr,
1778        TransportReadiness::default(),
1779    )
1780}
1781
1782#[inline(never)]
1783fn build_http_server_with_transport_readiness(
1784    runtime: RedDBRuntime,
1785    auth_store: Arc<AuthStore>,
1786    bind_addr: String,
1787    transport_readiness: TransportReadiness,
1788) -> RedDBServer {
1789    RedDBServer::with_options(
1790        runtime,
1791        ServerOptions {
1792            bind_addr,
1793            transport_readiness,
1794            ..ServerOptions::default()
1795        },
1796    )
1797    .with_auth(auth_store)
1798}
1799
1800/// PLAN.md Phase 6.2 — build a listener that only serves
1801/// `/admin/*` + `/metrics` + `/health/*`. Defaults to `127.0.0.1`
1802/// when the env var has no host (loopback-only by default per spec).
1803#[inline(never)]
1804fn build_admin_only_server(
1805    runtime: RedDBRuntime,
1806    auth_store: Arc<AuthStore>,
1807    bind_addr: String,
1808) -> RedDBServer {
1809    RedDBServer::with_options(
1810        runtime,
1811        ServerOptions {
1812            bind_addr,
1813            surface: crate::server::ServerSurface::AdminOnly,
1814            ..ServerOptions::default()
1815        },
1816    )
1817    .with_auth(auth_store)
1818}
1819
1820/// PLAN.md Phase 6.2 — build a listener that only serves `/metrics`
1821/// + `/health/*`. Suitable for Prometheus scrape ports that may be
1822///   exposed wider than the admin port.
1823#[inline(never)]
1824fn build_metrics_only_server(
1825    runtime: RedDBRuntime,
1826    auth_store: Arc<AuthStore>,
1827    bind_addr: String,
1828) -> RedDBServer {
1829    RedDBServer::with_options(
1830        runtime,
1831        ServerOptions {
1832            bind_addr,
1833            surface: crate::server::ServerSurface::MetricsOnly,
1834            ..ServerOptions::default()
1835        },
1836    )
1837    .with_auth(auth_store)
1838}
1839
1840/// Spawn dedicated admin / metrics listeners when the operator set
1841/// `RED_ADMIN_BIND` / `RED_METRICS_BIND`. Both are optional; when
1842/// unset the existing listener keeps serving everything (back-compat).
1843fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
1844    if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
1845        let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
1846        let _ = server.serve_in_background();
1847        tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
1848    }
1849    if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
1850        let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
1851        let _ = server.serve_in_background();
1852        tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
1853    }
1854}
1855
1856#[inline(never)]
1857fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
1858    let cli_telemetry = config.telemetry.clone();
1859    let mut transport_readiness = TransportReadiness::default();
1860    let Some(listener) = bind_listener_for_startup(
1861        &mut transport_readiness,
1862        "http",
1863        &bind_addr,
1864        config.http_bind_explicit,
1865    )?
1866    else {
1867        return Err(format!(
1868            "no HTTP listener started; implicit bind {} failed",
1869            bind_addr
1870        ));
1871    };
1872    let (runtime, auth_store, _telemetry_guard) =
1873        build_runtime_and_auth_store(config.to_db_options(), cli_telemetry)?;
1874    spawn_admin_metrics_listeners(&runtime, &auth_store);
1875    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
1876    let server = build_http_server_with_transport_readiness(
1877        runtime,
1878        auth_store,
1879        bind_addr.clone(),
1880        transport_readiness,
1881    );
1882    tracing::info!(transport = "http", bind = %bind_addr, "listener online");
1883    server.serve_on(listener).map_err(|err| err.to_string())
1884}
1885
1886/// PLAN.md HTTP TLS — when `http_tls_bind_addr` is set, spawn a
1887/// rustls-terminated listener alongside the plain HTTP server. Cert
1888/// + key paths come from CLI flags or `REDDB_HTTP_TLS_*` env vars; if
1889///   both are absent and `RED_HTTP_TLS_DEV=1` is set, a self-signed cert
1890///   is auto-generated next to the data directory (refused otherwise).
1891fn spawn_http_tls_listener(
1892    config: &ServerCommandConfig,
1893    runtime: &RedDBRuntime,
1894    auth_store: &Arc<AuthStore>,
1895) -> Result<(), String> {
1896    let Some(addr) = config.http_tls_bind_addr.clone() else {
1897        return Ok(());
1898    };
1899
1900    let tls_config = resolve_http_tls_config(config)?;
1901    let server_config = crate::server::tls::build_server_config(&tls_config)
1902        .map_err(|err| format!("HTTP TLS: {err}"))?;
1903
1904    let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
1905    let _handle = server.serve_tls_in_background(server_config);
1906    tracing::info!(
1907        transport = "https",
1908        bind = %addr,
1909        mtls = %tls_config.client_ca_path.is_some(),
1910        "TLS listener online"
1911    );
1912    Ok(())
1913}
1914
1915/// Resolve the HTTP TLS config from CLI / env / dev defaults.
1916fn resolve_http_tls_config(
1917    config: &ServerCommandConfig,
1918) -> Result<crate::server::tls::HttpTlsConfig, String> {
1919    match (&config.http_tls_cert, &config.http_tls_key) {
1920        (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
1921            cert_path: cert.clone(),
1922            key_path: key.clone(),
1923            client_ca_path: config.http_tls_client_ca.clone(),
1924        }),
1925        (None, None) => {
1926            // Dev-mode auto-generate next to the data directory.
1927            let dir = config
1928                .path
1929                .as_ref()
1930                .and_then(|p| p.parent().map(std::path::PathBuf::from))
1931                .unwrap_or_else(|| std::path::PathBuf::from("."));
1932            let auto = crate::server::tls::auto_generate_dev_cert(&dir)
1933                .map_err(|err| format!("HTTP TLS dev: {err}"))?;
1934            Ok(crate::server::tls::HttpTlsConfig {
1935                cert_path: auto.cert_path,
1936                key_path: auto.key_path,
1937                client_ca_path: config.http_tls_client_ca.clone(),
1938            })
1939        }
1940        _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
1941    }
1942}
1943
1944#[inline(never)]
1945fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
1946    let workers = config.workers;
1947    let cli_telemetry = config.telemetry.clone();
1948    let db_options = config.to_db_options();
1949    let rt_config = detect_runtime_config();
1950    let mut transport_readiness = TransportReadiness::default();
1951    let Some(grpc_listener) = bind_listener_for_startup(
1952        &mut transport_readiness,
1953        "grpc",
1954        &bind_addr,
1955        config.grpc_bind_explicit,
1956    )?
1957    else {
1958        return Err(format!(
1959            "no gRPC listener started; implicit bind {} failed",
1960            bind_addr
1961        ));
1962    };
1963
1964    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1965
1966    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1967        .enable_all()
1968        .worker_threads(worker_threads)
1969        .thread_stack_size(rt_config.stack_size)
1970        .build()
1971        .map_err(|err| format!("tokio runtime: {err}"))?;
1972
1973    // Guard lives on the outer stack so it outlives the tokio runtime.
1974    let (runtime, auth_store, _telemetry_guard) =
1975        build_runtime_and_auth_store(db_options, cli_telemetry)?;
1976    let signal_runtime = runtime.clone();
1977    tokio_runtime.block_on(async move {
1978        spawn_lifecycle_signal_handler(signal_runtime).await;
1979        // Start wire protocol listeners (plaintext + TLS)
1980        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
1981
1982        // Start PostgreSQL wire listener when --pg-bind is configured.
1983        spawn_pg_listener(&config, &runtime);
1984
1985        // Optional TLS gRPC listener. When `grpc_tls_bind_addr` is set
1986        // it spawns a separate listener so plaintext + TLS can run
1987        // side-by-side (50051 plain + 50052 TLS, etc.).
1988        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
1989
1990        let server = RedDBGrpcServer::with_options(
1991            runtime,
1992            GrpcServerOptions {
1993                bind_addr: bind_addr.clone(),
1994                tls: None,
1995            },
1996            auth_store,
1997        );
1998
1999        tracing::info!(
2000            transport = "grpc",
2001            bind = %bind_addr,
2002            cpus = rt_config.available_cpus,
2003            workers = worker_threads,
2004            "listener online"
2005        );
2006        server
2007            .serve_on(grpc_listener)
2008            .await
2009            .map_err(|err| err.to_string())
2010    })
2011}
2012
2013#[inline(never)]
2014fn run_dual_server(
2015    config: ServerCommandConfig,
2016    grpc_bind_addr: String,
2017    http_bind_addr: String,
2018) -> Result<(), String> {
2019    let workers = config.workers;
2020    let cli_telemetry = config.telemetry.clone();
2021    let db_options = config.to_db_options();
2022    let rt_config = detect_runtime_config();
2023    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2024    let mut transport_readiness = TransportReadiness::default();
2025    let http_listener = bind_listener_for_startup(
2026        &mut transport_readiness,
2027        "http",
2028        &http_bind_addr,
2029        config.http_bind_explicit,
2030    )?;
2031    let grpc_listener = bind_listener_for_startup(
2032        &mut transport_readiness,
2033        "grpc",
2034        &grpc_bind_addr,
2035        config.grpc_bind_explicit,
2036    )?;
2037    if http_listener.is_none() && grpc_listener.is_none() {
2038        return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2039    }
2040    let (runtime, auth_store, _telemetry_guard) =
2041        build_runtime_and_auth_store(db_options, cli_telemetry)?;
2042
2043    spawn_admin_metrics_listeners(&runtime, &auth_store);
2044    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2045
2046    let http_handle = if let Some(listener) = http_listener {
2047        let http_server = build_http_server_with_transport_readiness(
2048            runtime.clone(),
2049            auth_store.clone(),
2050            http_bind_addr.clone(),
2051            transport_readiness.clone(),
2052        );
2053        Some(http_server.serve_in_background_on(listener))
2054    } else {
2055        None
2056    };
2057
2058    thread::sleep(Duration::from_millis(150));
2059    if let Some(handle) = http_handle.as_ref() {
2060        if handle.is_finished() {
2061            let handle = http_handle.unwrap();
2062            return match handle.join() {
2063                Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2064                Ok(Err(err)) => Err(err.to_string()),
2065                Err(_) => Err("HTTP server thread panicked".to_string()),
2066            };
2067        }
2068    }
2069    if grpc_listener.is_none() {
2070        let Some(handle) = http_handle else {
2071            return Err("no listener started".to_string());
2072        };
2073        return match handle.join() {
2074            Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2075            Ok(Err(err)) => Err(err.to_string()),
2076            Err(_) => Err("HTTP server thread panicked".to_string()),
2077        };
2078    }
2079    let grpc_listener = grpc_listener.expect("checked above");
2080
2081    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2082        .enable_all()
2083        .worker_threads(worker_threads)
2084        .thread_stack_size(rt_config.stack_size)
2085        .build()
2086        .map_err(|err| format!("tokio runtime: {err}"))?;
2087
2088    let signal_runtime = runtime.clone();
2089    tokio_runtime.block_on(async move {
2090        spawn_lifecycle_signal_handler(signal_runtime).await;
2091        // Start wire protocol listeners (plaintext + TLS)
2092        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2093
2094        // Start PostgreSQL wire listener when --pg-bind is configured.
2095        spawn_pg_listener(&config, &runtime);
2096
2097        // Optional TLS gRPC listener — runs alongside the plaintext one.
2098        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2099
2100        let server = RedDBGrpcServer::with_options(
2101            runtime,
2102            GrpcServerOptions {
2103                bind_addr: grpc_bind_addr.clone(),
2104                tls: None,
2105            },
2106            auth_store,
2107        );
2108
2109        tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2110        tracing::info!(
2111            transport = "grpc",
2112            bind = %grpc_bind_addr,
2113            cpus = rt_config.available_cpus,
2114            workers = worker_threads,
2115            "listener online"
2116        );
2117        server
2118            .serve_on(grpc_listener)
2119            .await
2120            .map_err(|err| err.to_string())
2121    })
2122}
2123
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline systemd config shared by the rendering tests: the
    /// `reddb` service with no bind addresses set. Tests override the
    /// fields they care about with struct-update syntax.
    fn base_unit_config() -> SystemdServiceConfig {
        SystemdServiceConfig {
            service_name: "reddb".to_string(),
            binary_path: PathBuf::from("/usr/local/bin/red"),
            run_user: "reddb".to_string(),
            run_group: "reddb".to_string(),
            data_path: PathBuf::from("/var/lib/reddb/data.rdb"),
            router_bind_addr: None,
            grpc_bind_addr: None,
            http_bind_addr: None,
        }
    }

    /// Occupy an ephemeral loopback port. The listener must stay alive
    /// for as long as the collision is needed.
    fn hold_port() -> (TcpListener, String) {
        let held = TcpListener::bind("127.0.0.1:0").expect("hold test port");
        let addr = held.local_addr().expect("held addr").to_string();
        (held, addr)
    }

    #[test]
    fn render_systemd_unit_contains_expected_execstart() {
        let config = SystemdServiceConfig {
            grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
            ..base_unit_config()
        };

        let rendered = render_systemd_unit(&config);
        assert!(rendered.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
        assert!(rendered.contains("ReadWritePaths=/var/lib/reddb"));
    }

    #[test]
    fn systemd_service_config_derives_paths() {
        let config = SystemdServiceConfig {
            service_name: "reddb-api".to_string(),
            data_path: PathBuf::from("/srv/reddb/live/data.rdb"),
            http_bind_addr: Some("127.0.0.1:5055".to_string()),
            ..base_unit_config()
        };

        assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
        assert_eq!(
            config.unit_path(),
            PathBuf::from("/etc/systemd/system/reddb-api.service")
        );
    }

    #[test]
    fn render_systemd_unit_supports_dual_transport() {
        let config = SystemdServiceConfig {
            grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
            http_bind_addr: Some("0.0.0.0:5055".to_string()),
            ..base_unit_config()
        };

        let rendered = render_systemd_unit(&config);
        assert!(rendered.contains("--grpc-bind 0.0.0.0:5555"));
        assert!(rendered.contains("--http-bind 0.0.0.0:5055"));
    }

    #[test]
    fn render_systemd_unit_supports_router_mode() {
        let config = SystemdServiceConfig {
            router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
            ..base_unit_config()
        };

        let rendered = render_systemd_unit(&config);
        assert!(rendered.contains("--bind 127.0.0.1:5050"));
        assert!(!rendered.contains("--grpc-bind"));
        assert!(!rendered.contains("--http-bind"));
    }

    #[test]
    fn explicit_bind_collision_is_fatal() {
        let (_held, addr) = hold_port();
        let mut readiness = TransportReadiness::default();

        // An explicit bind onto an occupied port must surface as an error.
        let error = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();

        assert!(error.contains("explicit http listener bind"));
        assert_eq!(readiness.active.len(), 0);
        assert_eq!(readiness.failed.len(), 1);
        let failure = &readiness.failed[0];
        assert!(failure.explicit);
        assert_eq!(failure.bind_addr, addr);
    }

    #[test]
    fn implicit_bind_collision_degrades() {
        let (_held, addr) = hold_port();
        let mut readiness = TransportReadiness::default();

        // An implicit bind onto an occupied port is recorded but is
        // not fatal: the call succeeds with no listener.
        let listener =
            bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");

        assert!(listener.is_none());
        assert_eq!(readiness.active.len(), 0);
        assert_eq!(readiness.failed.len(), 1);
        let failure = &readiness.failed[0];
        assert!(!failure.explicit);
        assert_eq!(failure.bind_addr, addr);
    }
}