Skip to main content

reddb_server/
service_cli.rs

1use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13    GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Default bind address for the multiplexing router listener.
///
/// Note: this is the same address as `ServerTransport::Wire`'s default
/// bind address.
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
/// Detect available CPU cores and suggest a worker thread count.
///
/// Falls back to a single CPU when `available_parallelism` cannot be
/// queried. One core is reserved for the OS, but at least one worker is
/// always suggested.
pub fn detect_runtime_config() -> RuntimeConfig {
    let cpus = thread::available_parallelism().map_or(1, |n| n.get());

    // Reserve 1 core for the OS; never suggest fewer than 1 worker.
    let suggested_workers = cpus.saturating_sub(1).max(1);

    RuntimeConfig {
        available_cpus: cpus,
        suggested_workers,
        // 8 MiB — the same size `run_server_with_large_stack` requests
        // for its server thread.
        stack_size: 8 * 1024 * 1024,
    }
}

/// CPU / thread sizing hints produced by [`detect_runtime_config`].
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Number of CPUs reported by the OS (1 when unknown).
    pub available_cpus: usize,
    /// Suggested worker thread count (`available_cpus - 1`, minimum 1).
    pub suggested_workers: usize,
    /// Suggested thread stack size in bytes.
    pub stack_size: usize,
}
40
/// Network transports the server can expose.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    /// gRPC API listener.
    Grpc,
    /// HTTP API listener.
    Http,
    /// Wire-protocol listener.
    Wire,
}

impl ServerTransport {
    /// Human-readable label used in logs and CLI output.
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Wire => "wire",
            ServerTransport::Http => "HTTP",
            ServerTransport::Grpc => "gRPC",
        }
    }

    /// Loopback address this transport binds to when none is configured.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Wire => "127.0.0.1:5050",
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Grpc => "127.0.0.1:5555",
        }
    }
}
65
66#[derive(Debug, Clone)]
67pub struct ServerCommandConfig {
68    pub path: Option<PathBuf>,
69    pub router_bind_addr: Option<String>,
70    pub grpc_bind_addr: Option<String>,
71    /// TLS-encrypted gRPC bind address. Can run side-by-side with
72    /// `grpc_bind_addr` (e.g. `:50051` plain + `:50052` TLS) or
73    /// stand alone for TLS-only deploys. Defaults to `None`.
74    pub grpc_tls_bind_addr: Option<String>,
75    /// Path to PEM-encoded gRPC server certificate. Resolved through
76    /// `REDDB_GRPC_TLS_CERT` (with `_FILE` companion for k8s secret
77    /// mounts). When `None` and dev-mode is enabled
78    /// (`RED_GRPC_TLS_DEV=1`) the server auto-generates a self-signed
79    /// cert and prints its SHA-256 fingerprint to stderr.
80    pub grpc_tls_cert: Option<PathBuf>,
81    /// Path to PEM-encoded gRPC server private key. Same env-var
82    /// pattern as `grpc_tls_cert`.
83    pub grpc_tls_key: Option<PathBuf>,
84    /// Optional path to a PEM bundle of trust anchors used to verify
85    /// client certificates. When set, the gRPC listener requires
86    /// every client to present a cert that chains to this CA — i.e.
87    /// mutual TLS. When unset, one-way TLS only.
88    pub grpc_tls_client_ca: Option<PathBuf>,
89    pub http_bind_addr: Option<String>,
90    /// HTTPS bind address. When set, the HTTP server also serves a
91    /// TLS-terminated listener on this addr. Plain HTTP and HTTPS can
92    /// run side by side (e.g. 8080 plain + 8443 TLS).
93    pub http_tls_bind_addr: Option<String>,
94    /// PEM cert for HTTPS. Reads `REDDB_HTTP_TLS_CERT` / its `_FILE`
95    /// companion when not set explicitly.
96    pub http_tls_cert: Option<PathBuf>,
97    /// PEM key for HTTPS. Reads `REDDB_HTTP_TLS_KEY` / its `_FILE`
98    /// companion when not set explicitly.
99    pub http_tls_key: Option<PathBuf>,
100    /// Optional PEM CA bundle. When set, the HTTPS listener requires
101    /// every client to present a cert that chains to a CA in this
102    /// bundle (mTLS). When unset, plain server-side TLS only.
103    pub http_tls_client_ca: Option<PathBuf>,
104    pub wire_bind_addr: Option<String>,
105    /// TLS-encrypted wire protocol bind address
106    pub wire_tls_bind_addr: Option<String>,
107    /// Path to TLS cert PEM (if None + wire_tls_bind, auto-generate)
108    pub wire_tls_cert: Option<PathBuf>,
109    /// Path to TLS key PEM
110    pub wire_tls_key: Option<PathBuf>,
111    /// PostgreSQL wire protocol bind address (Phase 3.1 PG parity).
112    /// When set the server accepts psql / JDBC / DBeaver clients on
113    /// this port via the v3 protocol. Defaults to None (disabled).
114    pub pg_bind_addr: Option<String>,
115    pub create_if_missing: bool,
116    pub read_only: bool,
117    pub role: String,
118    pub primary_addr: Option<String>,
119    pub vault: bool,
120    /// Override worker thread count (None = auto-detect from CPUs)
121    pub workers: Option<usize>,
122    /// Telemetry config (Phase 6 logging). `None` falls back to the
123    /// built-in default derived from `path` + stderr-only.
124    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
125}
126
/// Inputs for generating and installing a systemd unit for the server.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix.
    pub service_name: String,
    /// Server binary placed on the unit's `ExecStart=` line.
    pub binary_path: PathBuf,
    /// System user the service runs as.
    pub run_user: String,
    /// System group the service runs as.
    pub run_group: String,
    /// Database file path; its parent directory becomes the data dir.
    pub data_path: PathBuf,
    /// Router bind address (`--bind`), preferred over `grpc_bind_addr`.
    pub router_bind_addr: Option<String>,
    /// gRPC bind address (`--grpc-bind`), used only without a router bind.
    pub grpc_bind_addr: Option<String>,
    /// HTTP bind address (`--http-bind`).
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory that holds the database file.
    ///
    /// Returns `.` when `data_path` has no usable parent. That covers
    /// both a filesystem root (no parent at all) and a bare file name
    /// such as `data.rdb`, whose `Path::parent()` is the *empty* path.
    /// An empty path would otherwise render as an empty
    /// `WorkingDirectory=`.
    pub fn data_dir(&self) -> PathBuf {
        match self.data_path.parent() {
            Some(parent) if !parent.as_os_str().is_empty() => parent.to_path_buf(),
            _ => PathBuf::from("."),
        }
    }

    /// Location of the generated unit file under `/etc/systemd/system`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
151
152/// Build a sane default `TelemetryConfig` from a server path when the
153/// caller didn't set one explicitly. Writes rotating logs into the
154/// parent directory of the DB file (or `./logs` for in-memory /
155/// pathless runs). Level defaults to `info`, pretty stderr format.
156pub fn default_telemetry_for_path(
157    path: Option<&std::path::Path>,
158) -> crate::telemetry::TelemetryConfig {
159    let log_dir = match path {
160        Some(p) => p
161            .parent()
162            .map(|parent| parent.join("logs"))
163            .or_else(|| Some(std::path::PathBuf::from("./logs"))),
164        None => None, // in-memory — no file, stderr-only
165    };
166    crate::telemetry::TelemetryConfig {
167        log_dir,
168        file_prefix: "reddb.log".to_string(),
169        level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
170        format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
171            crate::telemetry::LogFormat::Pretty
172        } else {
173            crate::telemetry::LogFormat::Json
174        },
175        rotation_keep_days: 14,
176        service_name: "reddb",
177        // Implicit defaults — no CLI flag has claimed these values yet.
178        level_explicit: false,
179        format_explicit: false,
180        rotation_keep_days_explicit: false,
181        file_prefix_explicit: false,
182        log_dir_explicit: false,
183        log_file_disabled: false,
184    }
185}
186
187impl ServerCommandConfig {
188    fn to_db_options(&self) -> RedDBOptions {
189        let mut options = match &self.path {
190            Some(path) => RedDBOptions::persistent(path),
191            None => RedDBOptions::in_memory(),
192        };
193
194        options.mode = StorageMode::Persistent;
195        options.create_if_missing = self.create_if_missing;
196        // PLAN.md Phase 4.3 — read_only resolution priority:
197        //   1. CLI flag (`--readonly`) — explicit operator intent.
198        //   2. `RED_READONLY=true` env — orchestrator override.
199        //   3. Persisted `<data>/.runtime-state.json` from a prior
200        //      `POST /admin/readonly` — survives restart.
201        //   4. Default `false`.
202        options.read_only = self.read_only
203            || env_nonempty("RED_READONLY")
204                .or_else(|| env_nonempty("REDDB_READONLY"))
205                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
206                .unwrap_or(false)
207            || self.path.as_ref().is_some_and(|data_path| {
208                crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
209                    data_path,
210                ))
211                .unwrap_or(false)
212            });
213
214        options.replication = match self.role.as_str() {
215            "primary" => ReplicationConfig::primary(),
216            "replica" => {
217                let primary_addr = self
218                    .primary_addr
219                    .clone()
220                    .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
221                // Public-mutation rejection on replicas is enforced by
222                // `WriteGate` at the runtime/RPC boundary (PLAN.md W1).
223                // Leaving `options.read_only = false` keeps the pager
224                // writable so the internal logical-WAL apply path can
225                // ingest records from the primary; WriteGate ensures no
226                // client request reaches storage.
227                ReplicationConfig::replica(primary_addr)
228            }
229            _ => ReplicationConfig::standalone(),
230        };
231
232        if self.vault {
233            options.auth.vault_enabled = true;
234        }
235
236        configure_remote_backend_from_env(&mut options);
237
238        options
239    }
240
241    pub fn enabled_transports(&self) -> Vec<ServerTransport> {
242        let mut transports = Vec::with_capacity(3);
243        if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
244            transports.push(ServerTransport::Grpc);
245        }
246        if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
247            transports.push(ServerTransport::Http);
248        }
249        if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
250            transports.push(ServerTransport::Wire);
251        }
252        transports
253    }
254}
255
256/// Read an env var, treating empty / whitespace-only as `None`.
257/// Honors the `<NAME>_FILE` convention. Re-exports the shared
258/// `crate::utils::env_with_file_fallback` helper so call sites in
259/// this module can keep their short local name.
260fn env_nonempty(name: &str) -> Option<String> {
261    crate::utils::env_with_file_fallback(name)
262}
263
264fn env_truthy(name: &str) -> bool {
265    env_nonempty(name)
266        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
267        .unwrap_or(false)
268}
269
270fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
271    // PLAN.md (cloud-agnostic) — prefer the new spelling
272    // `RED_BACKEND`; accept the legacy `REDDB_REMOTE_BACKEND` for
273    // existing dev installs. `none` (default) means standalone — no
274    // remote backend, valid for development and on-prem without
275    // remote.
276    let backend = env_nonempty("RED_BACKEND")
277        .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
278        .unwrap_or_else(|| "none".to_string())
279        .to_ascii_lowercase();
280
281    match backend.as_str() {
282        // Universal S3-compatible — covers AWS, R2, MinIO, Ceph,
283        // GCS-interop, B2, DO Spaces, Wasabi, Garage, SeaweedFS,
284        // IDrive, Storj. The `path_style` toggle in S3Config picks
285        // the right addressing for self-hosted vs hosted.
286        "s3" | "minio" | "r2" => {
287            #[cfg(feature = "backend-s3")]
288            {
289                if let Some(config) = s3_config_from_env() {
290                    let remote_key = env_nonempty("RED_REMOTE_KEY")
291                        .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
292                        .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
293                    let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
294                    options.remote_backend = Some(backend.clone());
295                    options.remote_backend_atomic = Some(backend);
296                    options.remote_key = Some(remote_key);
297                }
298            }
299            #[cfg(not(feature = "backend-s3"))]
300            {
301                tracing::warn!(
302                    backend = %backend,
303                    "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
304                );
305            }
306        }
307        // Filesystem backend (NFS/EFS/SMB/local-disk). PLAN.md spec
308        // calls it `fs`; legacy code shipped it as `local`. Both
309        // names map to LocalBackend, with the remote_key derived
310        // from `RED_FS_PATH` + a per-database suffix when provided.
311        "fs" | "local" => {
312            let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
313            let remote_key = match (
314                base_path,
315                env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
316            ) {
317                (Some(base), Some(rel)) => Some(format!(
318                    "{}/{}",
319                    base.trim_end_matches('/'),
320                    rel.trim_start_matches('/')
321                )),
322                (Some(base), None) => Some(format!(
323                    "{}/clusters/dev/data.rdb",
324                    base.trim_end_matches('/')
325                )),
326                (None, Some(rel)) => Some(rel),
327                (None, None) => None,
328            };
329            if let Some(key) = remote_key {
330                let backend = Arc::new(crate::storage::backend::LocalBackend);
331                options.remote_backend = Some(backend.clone());
332                options.remote_backend_atomic = Some(backend);
333                options.remote_key = Some(key);
334            }
335        }
336        // Generic HTTP backend (PLAN.md Phase 2.3). Maximum
337        // portability: any service exposing PUT/GET/DELETE serves
338        // as a backup target. Optional auth via *_FILE secret
339        // path keeps the token out of the env.
340        "http" => {
341            let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
342                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
343            {
344                Some(u) => u,
345                None => {
346                    tracing::warn!(
347                        "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
348                    );
349                    return;
350                }
351            };
352            let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
353                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
354                .unwrap_or_default();
355            let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
356                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
357            {
358                std::fs::read_to_string(&path)
359                    .ok()
360                    .map(|s| s.trim().to_string())
361                    .filter(|s| !s.is_empty())
362            } else {
363                env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
364                    .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
365            };
366
367            let mut config =
368                crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
369            if let Some(auth) = auth_header {
370                config = config.with_auth_header(auth);
371            }
372            let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
373                || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
374                || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
375            config = config.with_conditional_writes(conditional_writes);
376            // Always populate the snapshot-transport handle. When the
377            // operator confirmed CAS support, also populate the atomic
378            // handle via AtomicHttpBackend — without that flag,
379            // LeaseStore must remain unreachable on this backend.
380            if conditional_writes {
381                match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
382                    Ok(atomic) => {
383                        let atomic_arc = Arc::new(atomic);
384                        options.remote_backend = Some(atomic_arc.clone());
385                        options.remote_backend_atomic = Some(atomic_arc);
386                    }
387                    Err(err) => {
388                        tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
389                        options.remote_backend =
390                            Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
391                    }
392                }
393            } else {
394                options.remote_backend =
395                    Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
396            }
397            options.remote_key = env_nonempty("RED_REMOTE_KEY")
398                .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
399                .or_else(|| Some("clusters/dev/data.rdb".to_string()));
400        }
401        // `none` is the explicit standalone — no remote, no backup
402        // pipeline. Boot never blocks on network reachability.
403        "none" | "" => {}
404        other => {
405            tracing::warn!(
406                backend = %other,
407                "unknown RED_BACKEND value — supported: s3 | fs | http | none"
408            );
409        }
410    }
411}
412
/// Resolve an S3 setting from env, accepting both the cloud-agnostic
/// `RED_S3_*` family (PLAN.md spec) and the legacy `REDDB_S3_*` form
/// kept for existing dev installs.
///
/// The new spelling wins when both are set. The legacy spelling is read
/// silently; no deprecation warning is emitted here or by the callers.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    env_nonempty(&format!("RED_S3_{suffix}"))
        .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}")))
}
422
/// Read a secret S3 value from either the literal env var or a file
/// path supplied via `*_FILE` (PLAN.md spec — compatible with
/// Kubernetes / Docker Secrets, HashiCorp Vault Agent, sealed-secrets).
///
/// The `_FILE` variant wins, so an operator can set it to override the
/// inline value without touching the inline env. When a `_FILE` path is
/// set but unreadable, a warning is logged (the secret itself is never
/// logged) and `None` is returned. There is no fallback to the inline
/// value. An empty secret file also yields `None`.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let file_key_red = format!("RED_S3_{suffix}_FILE");
    let file_key_legacy = format!("REDDB_S3_{suffix}_FILE");
    if let Some(path) = env_nonempty(&file_key_red).or_else(|| env_nonempty(&file_key_legacy)) {
        return match std::fs::read_to_string(&path) {
            Ok(contents) => Some(contents.trim().to_string()).filter(|s| !s.is_empty()),
            Err(err) => {
                tracing::warn!(
                    path = %path,
                    error = %err,
                    "could not read RED_S3_{suffix}_FILE secret file"
                );
                None
            }
        };
    }
    env_s3(suffix)
}
440
/// Assemble an `S3Config` from `RED_S3_*` / `REDDB_S3_*` env vars.
///
/// Returns `None` when any of the endpoint, bucket, access key or secret
/// key is missing. Optional settings default to region `us-east-1`, an
/// empty key prefix (`KEY_PREFIX`, alias `PREFIX`), and path-style
/// addressing on.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    // Required settings: bail out on the first one that is missing.
    let endpoint = env_s3("ENDPOINT")?;
    let bucket = env_s3("BUCKET")?;
    let access_key = env_s3_secret("ACCESS_KEY")?;
    let secret_key = env_s3_secret("SECRET_KEY")?;

    // Optional settings with defaults.
    let region = match env_s3("REGION") {
        Some(value) => value,
        None => "us-east-1".to_string(),
    };
    let key_prefix = match env_s3("KEY_PREFIX") {
        Some(value) => value,
        None => env_s3("PREFIX").unwrap_or_default(),
    };
    let path_style = match env_s3("PATH_STYLE") {
        Some(raw) => matches!(raw.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"),
        None => true,
    };

    Some(crate::storage::backend::S3Config {
        endpoint,
        bucket,
        region,
        key_prefix,
        path_style,
        access_key,
        secret_key,
    })
}
464
465pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
466    let data_dir = config.data_dir();
467    let exec_start = render_systemd_exec_start(config);
468    format!(
469        "[Unit]\n\
470Description=RedDB unified database service\n\
471After=network-online.target\n\
472Wants=network-online.target\n\
473\n\
474[Service]\n\
475Type=simple\n\
476User={user}\n\
477Group={group}\n\
478WorkingDirectory={workdir}\n\
479ExecStart={exec_start}\n\
480Restart=always\n\
481RestartSec=2\n\
482LimitSTACK=16M\n\
483NoNewPrivileges=true\n\
484PrivateTmp=true\n\
485ProtectSystem=strict\n\
486ProtectHome=true\n\
487ProtectControlGroups=true\n\
488ProtectKernelTunables=true\n\
489ProtectKernelModules=true\n\
490RestrictNamespaces=true\n\
491LockPersonality=true\n\
492MemoryDenyWriteExecute=true\n\
493ReadWritePaths={workdir}\n\
494\n\
495[Install]\n\
496WantedBy=multi-user.target\n",
497        user = config.run_user,
498        group = config.run_group,
499        workdir = data_dir.display(),
500        exec_start = exec_start,
501    )
502}
503
504/// Install a systemd unit + start the service.
505///
506/// Linux-only. The helper shells out to `systemctl`, `useradd`,
507/// `groupadd`, `install`, `getent`, and `id` — none of which exist on
508/// Windows or macOS. The Windows/macOS branch returns a hard error so
509/// callers (the CLI) surface a clear message instead of a confusing
510/// syscall failure. A proper Windows-service equivalent (sc.exe /
511/// NSSM) is a Phase 3.6 follow-up.
512#[cfg(target_os = "linux")]
513pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
514    ensure_root()?;
515    ensure_command_available("systemctl")?;
516    ensure_command_available("getent")?;
517    ensure_command_available("groupadd")?;
518    ensure_command_available("useradd")?;
519    ensure_command_available("install")?;
520    ensure_executable(&config.binary_path)?;
521
522    if !command_success("getent", ["group", config.run_group.as_str()])? {
523        run_command("groupadd", ["--system", config.run_group.as_str()])?;
524    }
525
526    if !command_success("id", ["-u", config.run_user.as_str()])? {
527        let data_dir = config.data_dir();
528        run_command(
529            "useradd",
530            [
531                "--system",
532                "--gid",
533                config.run_group.as_str(),
534                "--home-dir",
535                data_dir.to_string_lossy().as_ref(),
536                "--shell",
537                "/usr/sbin/nologin",
538                config.run_user.as_str(),
539            ],
540        )?;
541    }
542
543    let data_dir = config.data_dir();
544    run_command(
545        "install",
546        [
547            "-d",
548            "-o",
549            config.run_user.as_str(),
550            "-g",
551            config.run_group.as_str(),
552            "-m",
553            "0750",
554            data_dir.to_string_lossy().as_ref(),
555        ],
556    )?;
557
558    std::fs::write(config.unit_path(), render_systemd_unit(config))
559        .map_err(|err| format!("failed to write systemd unit: {err}"))?;
560
561    run_command("systemctl", ["daemon-reload"])?;
562    run_command(
563        "systemctl",
564        [
565            "enable",
566            "--now",
567            format!("{}.service", config.service_name).as_str(),
568        ],
569    )?;
570
571    Ok(())
572}
573
/// Non-Linux stand-in for the systemd installer.
///
/// systemd only exists on Linux. The signature matches the Linux version
/// so callers compile on every platform, and the call always fails with
/// an explanatory message. Windows/macOS service-manager integration is
/// a Phase 3.6 follow-up (sc.exe + NSSM on Windows, launchd on macOS).
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    const UNSUPPORTED: &str = concat!(
        "systemd install is Linux-only — use sc.exe (Windows) or ",
        "launchd (macOS) to install the service manually using the ",
        "unit printed by `red service print-unit`",
    );
    Err(String::from(UNSUPPORTED))
}
585
/// Fail unless the current process runs as uid 0, as reported by `id -u`.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let output = match Command::new("id").arg("-u").output() {
        Ok(output) => output,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !output.status.success() {
        return Err("failed to determine current uid".to_string());
    }
    match String::from_utf8_lossy(&output.stdout).trim() {
        "0" => Ok(()),
        _ => Err("run this command as root (sudo)".to_string()),
    }
}
601
/// Fail unless `command` resolves to an executable regular file.
///
/// A name containing `/` is checked as a path. Otherwise every entry of
/// this process's `PATH` is searched, which mirrors the lookup
/// `Command::new` relies on for the later `run_command` calls. The
/// previous `sh -lc "command -v …"` approach had two problems. It
/// interpolated `command` into a shell string, so metacharacters would
/// execute. It also consulted a *login* shell's PATH, which can differ
/// from the one `Command::new` uses.
///
/// # Errors
/// `required command not found: <command>` when nothing executable is
/// found.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    use std::os::unix::fs::PermissionsExt;

    let is_executable = |candidate: &std::path::Path| {
        std::fs::metadata(candidate)
            .map(|meta| meta.is_file() && meta.permissions().mode() & 0o111 != 0)
            .unwrap_or(false)
    };

    let found = if command.contains('/') {
        is_executable(std::path::Path::new(command))
    } else if command.is_empty() {
        false
    } else {
        std::env::var_os("PATH").is_some_and(|paths| {
            std::env::split_paths(&paths).any(|dir| is_executable(&dir.join(command)))
        })
    };

    if found {
        Ok(())
    } else {
        Err(format!("required command not found: {command}"))
    }
}
614
/// Verify that `path` names an existing regular file with at least one
/// execute permission bit set.
///
/// # Errors
/// * `binary not found '<path>': <io error>` when the path cannot be
///   stat'ed.
/// * `binary is not a file: <path>` for directories and other non-files.
///   A directory's `x` bits mean "searchable", so without this check it
///   would pass the permission test.
/// * `binary is not executable: <path>` when no execute bit is set.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    use std::os::unix::fs::PermissionsExt;

    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    if metadata.permissions().mode() & 0o111 == 0 {
        return Err(format!("binary is not executable: {}", path.display()));
    }
    Ok(())
}
634
/// Run `program` with `args` and report whether it exited successfully.
///
/// Spawn failures (e.g. program not found) become `Err`; a non-zero exit
/// is `Ok(false)`.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    match Command::new(program).args(args).status() {
        Ok(exit) => Ok(exit.success()),
        Err(spawn_err) => Err(format!("failed to run {program}: {spawn_err}")),
    }
}
643
/// Run `program` with `args`, turning a failed exit into a descriptive
/// error.
///
/// The error detail prefers trimmed stderr, then trimmed stdout, and
/// finally the exit status when both streams are empty.
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
        let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
        let detail = match (stderr.is_empty(), stdout.is_empty()) {
            (false, _) => stderr,
            (true, false) => stdout,
            (true, true) => format!("exit status {}", output.status),
        };
        return Err(format!("{program} failed: {detail}"));
    }
    Ok(())
}
665
666pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
667    let has_any = config.router_bind_addr.is_some()
668        || config.grpc_bind_addr.is_some()
669        || config.http_bind_addr.is_some()
670        || config.wire_bind_addr.is_some()
671        || config.pg_bind_addr.is_some();
672    if !has_any {
673        return Err("at least one server bind address must be configured".into());
674    }
675    let thread_name = if config.router_bind_addr.is_some() {
676        "red-server-router"
677    } else {
678        match (
679            config.grpc_bind_addr.is_some(),
680            config.http_bind_addr.is_some(),
681        ) {
682            (true, true) => "red-server-dual",
683            (true, false) => "red-server-grpc",
684            (false, true) => "red-server-http",
685            (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
686            (false, false) => "red-server-pg-wire",
687        }
688    };
689
690    let handle = thread::Builder::new()
691        .name(thread_name.into())
692        .stack_size(8 * 1024 * 1024)
693        .spawn(move || run_configured_servers(config))
694        .map_err(|err| format!("failed to spawn server thread: {err}"))?;
695
696    match handle.join() {
697        Ok(result) => result,
698        Err(_) => Err("server thread panicked".to_string()),
699    }
700}
701
702fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
703    let mut parts = vec![
704        config.binary_path.display().to_string(),
705        "server".to_string(),
706        "--path".to_string(),
707        config.data_path.display().to_string(),
708    ];
709
710    if let Some(bind_addr) = &config.router_bind_addr {
711        parts.push("--bind".to_string());
712        parts.push(bind_addr.clone());
713    } else if let Some(bind_addr) = &config.grpc_bind_addr {
714        parts.push("--grpc-bind".to_string());
715        parts.push(bind_addr.clone());
716    }
717    if let Some(bind_addr) = &config.http_bind_addr {
718        parts.push("--http-bind".to_string());
719        parts.push(bind_addr.clone());
720    }
721
722    parts.join(" ")
723}
724
/// Return `true` if any address `target` resolves to accepts a TCP
/// connection within `timeout`.
///
/// Resolution failures yield `false`. Resolved addresses are tried in
/// order, stopping at the first successful connect.
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    target.to_socket_addrs().is_ok_and(|mut candidates| {
        candidates.any(|addr: SocketAddr| TcpStream::connect_timeout(&addr, timeout).is_ok())
    })
}
735
736#[inline(never)]
737fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
738    // Phase 6 logging is initialised inside each runner once the
739    // runtime is open — see `build_runtime_and_auth_store`. Going
740    // after DB open lets us read `red.logging.*` config keys out of
741    // the persistent red_config store and merge them with the CLI
742    // flags (flag > red_config > built-in default).
743    if let Some(router_bind_addr) = config.router_bind_addr.clone() {
744        return run_routed_server(config, router_bind_addr);
745    }
746
747    match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
748        (Some(grpc_bind_addr), Some(http_bind_addr)) => {
749            run_dual_server(config, grpc_bind_addr, http_bind_addr)
750        }
751        (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
752        (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
753        (None, None) => {
754            if let Some(wire_addr) = config.wire_bind_addr.clone() {
755                run_wire_only_server(config, wire_addr)
756            } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
757                run_pg_only_server(config, pg_addr)
758            } else {
759                Err("at least one server bind address must be configured".to_string())
760            }
761        }
762    }
763}
764
765/// Wire SIGTERM and SIGINT to `RedDBRuntime::graceful_shutdown`.
766///
767/// PLAN.md Phase 1.1 — orchestrators (K8s preStop, Fly autostop, ECS
768/// drain, systemd, plain `docker stop`) all rely on SIGTERM with a
769/// grace window. SIGKILL after that grace window is the OS's
770/// fallback; we are responsible for finishing in time.
771///
772/// Spawns a tokio task on the caller's runtime that:
773///   1. Awaits the first of SIGTERM / SIGINT.
774///   2. Calls `runtime.graceful_shutdown(backup_on_shutdown)`. The
775///      runtime moves to `Stopped` on its own; this just runs the
776///      flush + checkpoint pipeline and (optionally) a final backup.
777///   3. Calls `std::process::exit(0)` so the orchestrator sees a
778///      clean exit code.
779///
/// `RED_BACKUP_ON_SHUTDOWN` (default `true` if a remote backend is
/// configured) toggles step 2's backup branch. The flush + checkpoint
/// always run.
783///
784/// Idempotent across signal storms — `graceful_shutdown` returns the
785/// cached report on second call, but we exit on the first one
786/// regardless, so the second SIGTERM never reaches the handler.
787async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
788    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
789        .ok()
790        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
791        .unwrap_or(true);
792
793    #[cfg(unix)]
794    {
795        use tokio::signal::unix::{signal, SignalKind};
796
797        let mut sigterm = match signal(SignalKind::terminate()) {
798            Ok(s) => s,
799            Err(err) => {
800                tracing::warn!(
801                    error = %err,
802                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
803                );
804                return;
805            }
806        };
807        let mut sigint = match signal(SignalKind::interrupt()) {
808            Ok(s) => s,
809            Err(err) => {
810                tracing::warn!(error = %err, "could not install SIGINT handler");
811                return;
812            }
813        };
814        // PLAN.md Phase 6.4 — SIGHUP triggers a reload of secrets from
815        // their `_FILE` companions without restarting the process.
816        // Useful for credential rotation pipelines (kubectl create
817        // secret + kubectl rollout restart, but for systemd / Nomad /
818        // bare-metal where rolling the process is heavier).
819        let mut sighup = match signal(SignalKind::hangup()) {
820            Ok(s) => Some(s),
821            Err(err) => {
822                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
823                None
824            }
825        };
826
827        let reload_runtime = runtime.clone();
828        tokio::spawn(async move {
829            loop {
830                let signal_name = match &mut sighup {
831                    Some(hup) => tokio::select! {
832                        _ = sigterm.recv() => "SIGTERM",
833                        _ = sigint.recv() => "SIGINT",
834                        _ = hup.recv() => "SIGHUP",
835                    },
836                    None => tokio::select! {
837                        _ = sigterm.recv() => "SIGTERM",
838                        _ = sigint.recv() => "SIGINT",
839                    },
840                };
841
842                if signal_name == "SIGHUP" {
843                    handle_sighup_reload(&reload_runtime);
844                    continue; // stay alive; SIGHUP isn't a shutdown
845                }
846
847                tracing::info!(
848                    signal = signal_name,
849                    "lifecycle signal received; shutting down"
850                );
851                match runtime.graceful_shutdown(backup_on_shutdown) {
852                    Ok(report) => {
853                        tracing::info!(
854                            duration_ms = report.duration_ms,
855                            flushed_wal = report.flushed_wal,
856                            final_checkpoint = report.final_checkpoint,
857                            backup_uploaded = report.backup_uploaded,
858                            "graceful shutdown complete"
859                        );
860                    }
861                    Err(err) => {
862                        tracing::error!(error = %err, "graceful shutdown failed");
863                        // Issue #205 — graceful shutdown returning Err
864                        // means the runtime is exiting without a clean
865                        // flush/checkpoint. Operator-grade event so the
866                        // operator notices the dirty exit even when the
867                        // process restarts before they read tracing logs.
868                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
869                            reason: format!("graceful shutdown failed: {err}"),
870                        }
871                        .emit_global();
872                    }
873                }
874                std::process::exit(0);
875            }
876        });
877    }
878
879    #[cfg(not(unix))]
880    {
881        tokio::spawn(async move {
882            let interrupted = tokio::signal::ctrl_c().await;
883            if let Err(err) = interrupted {
884                tracing::warn!(error = %err, "could not install Ctrl+C handler");
885                return;
886            }
887
888            tracing::info!(
889                signal = "Ctrl+C",
890                "lifecycle signal received; shutting down"
891            );
892            match runtime.graceful_shutdown(backup_on_shutdown) {
893                Ok(report) => {
894                    tracing::info!(
895                        duration_ms = report.duration_ms,
896                        flushed_wal = report.flushed_wal,
897                        final_checkpoint = report.final_checkpoint,
898                        backup_uploaded = report.backup_uploaded,
899                        "graceful shutdown complete"
900                    );
901                }
902                Err(err) => {
903                    tracing::error!(error = %err, "graceful shutdown failed");
904                }
905            }
906            std::process::exit(0);
907        });
908    }
909}
910
911/// PLAN.md Phase 6.4 — re-read secrets from `*_FILE` companion env
912/// vars. Today this only refreshes the audit log + records the
913/// reload event; the runtime modules that hold cached secret
914/// material (S3 backend credentials, admin token, JWT keys) read
915/// the env on each request so the next call after SIGHUP picks up
916/// the new file contents automatically. A future extension can
917/// punch through to the LeaseStore / AuthStore for in-memory
918/// caches that don't re-read on each call.
919fn handle_sighup_reload(runtime: &RedDBRuntime) {
920    let now_ms = std::time::SystemTime::now()
921        .duration_since(std::time::UNIX_EPOCH)
922        .map(|d| d.as_millis() as u64)
923        .unwrap_or(0);
924    tracing::info!(
925        target: "reddb::secrets",
926        ts_unix_ms = now_ms,
927        "SIGHUP received; secrets will be re-read from *_FILE on next access"
928    );
929    // Routed through AuditFieldEscaper (ADR 0010 / issue #177) so
930    // every emission goes through the typed-field guard. The
931    // arguments here are static, but using the typed entry point
932    // keeps the discipline uniform across call sites.
933    use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
934    runtime.audit_log().record_event(
935        AuditEvent::builder("config/sighup_reload")
936            .source(AuditAuthSource::System)
937            .resource("secrets")
938            .outcome(Outcome::Success)
939            .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
940            .build(),
941    );
942}
943
944#[inline(never)]
945fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
946    let workers = config.workers;
947    let cli_telemetry = config.telemetry.clone();
948    let db_options = config.to_db_options();
949    let rt_config = detect_runtime_config();
950    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
951    let (runtime, auth_store, _telemetry_guard) =
952        build_runtime_and_auth_store(db_options, cli_telemetry)?;
953
954    spawn_admin_metrics_listeners(&runtime, &auth_store);
955
956    let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
957        .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
958    let http_backend = http_listener
959        .local_addr()
960        .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
961    let http_server = build_http_server(
962        runtime.clone(),
963        auth_store.clone(),
964        http_backend.to_string(),
965    );
966    let http_handle = http_server.serve_in_background_on(http_listener);
967
968    thread::sleep(Duration::from_millis(100));
969    if http_handle.is_finished() {
970        return match http_handle.join() {
971            Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
972            Ok(Err(err)) => Err(err.to_string()),
973            Err(_) => Err("HTTP backend thread panicked".to_string()),
974        };
975    }
976
977    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
978        .enable_all()
979        .worker_threads(worker_threads)
980        .thread_stack_size(rt_config.stack_size)
981        .build()
982        .map_err(|err| format!("tokio runtime: {err}"))?;
983
984    let signal_runtime = runtime.clone();
985    tokio_runtime.block_on(async move {
986        spawn_lifecycle_signal_handler(signal_runtime).await;
987        let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
988            .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
989        let grpc_backend = grpc_listener
990            .local_addr()
991            .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
992        let grpc_server = RedDBGrpcServer::with_options(
993            runtime.clone(),
994            GrpcServerOptions {
995                bind_addr: grpc_backend.to_string(),
996                tls: None,
997            },
998            auth_store,
999        );
1000        tokio::spawn(async move {
1001            if let Err(err) = grpc_server.serve_on(grpc_listener).await {
1002                tracing::error!(err = %err, "gRPC backend error");
1003            }
1004        });
1005
1006        let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
1007            .await
1008            .map_err(|err| format!("bind internal wire listener: {err}"))?;
1009        let wire_backend = wire_listener
1010            .local_addr()
1011            .map_err(|err| format!("inspect internal wire listener: {err}"))?;
1012        let wire_rt = Arc::new(runtime);
1013        tokio::spawn(async move {
1014            if let Err(err) =
1015                crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
1016                    .await
1017            {
1018                tracing::error!(err = %err, "redwire backend error");
1019            }
1020        });
1021
1022        tracing::info!(
1023            bind = %router_bind_addr,
1024            cpus = rt_config.available_cpus,
1025            workers = worker_threads,
1026            "router bootstrapping"
1027        );
1028        serve_tcp_router(TcpProtocolRouterConfig {
1029            bind_addr: router_bind_addr,
1030            grpc_backend,
1031            http_backend,
1032            wire_backend,
1033        })
1034        .await
1035        .map_err(|err| err.to_string())
1036    })
1037}
1038
1039/// Spawn RedWire listeners (plaintext + TLS) as background tokio tasks.
1040fn spawn_wire_listeners(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1041    // Plaintext RedWire — TCP or Unix socket
1042    if let Some(wire_addr) = config.wire_bind_addr.clone() {
1043        let wire_rt = Arc::new(runtime.clone());
1044        tokio::spawn(async move {
1045            // Address starting with `unix://` or an absolute filesystem path
1046            // switches to Unix domain sockets.
1047            #[cfg(unix)]
1048            {
1049                if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1050                    if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1051                        &wire_addr, wire_rt,
1052                    )
1053                    .await
1054                    {
1055                        tracing::error!(err = %e, "redwire unix listener error");
1056                    }
1057                    return;
1058                }
1059            }
1060            let cfg = crate::wire::RedWireConfig {
1061                bind_addr: wire_addr,
1062                auth_store: None,
1063                oauth: None,
1064            };
1065            if let Err(e) = crate::wire::start_redwire_listener(cfg, wire_rt).await {
1066                tracing::error!(err = %e, "redwire listener error");
1067            }
1068        });
1069    }
1070
1071    // RedWire over TLS
1072    if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1073        let tls_config = resolve_wire_tls_config(config);
1074        match tls_config {
1075            Ok(tls_cfg) => {
1076                let wire_rt = Arc::new(runtime.clone());
1077                tokio::spawn(async move {
1078                    if let Err(e) =
1079                        crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1080                            .await
1081                    {
1082                        tracing::error!(err = %e, "redwire+tls listener error");
1083                    }
1084                });
1085            }
1086            Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1087        }
1088    }
1089}
1090
1091/// Spawn the PostgreSQL wire-protocol listener (Phase 3.1 PG parity).
1092///
1093/// Only runs when `--pg-bind` is supplied. Uses the v3 protocol so
1094/// psql, JDBC drivers, DBeaver, etc. can connect directly. Runs
1095/// alongside the native wire listener; the two transports do not
1096/// share a port.
1097fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1098    if let Some(pg_addr) = config.pg_bind_addr.clone() {
1099        let rt = Arc::new(runtime.clone());
1100        tokio::spawn(async move {
1101            let cfg = crate::wire::PgWireConfig {
1102                bind_addr: pg_addr,
1103                ..Default::default()
1104            };
1105            if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1106                tracing::error!(err = %e, "pg wire listener error");
1107            }
1108        });
1109    }
1110}
1111
1112/// Resolve gRPC TLS material into PEM bytes.
1113///
1114/// Lookup order, in priority:
1115///   1. Explicit `config.grpc_tls_cert` / `config.grpc_tls_key` (paths
1116///      passed via CLI/env). Both must be present together.
1117///   2. `RED_GRPC_TLS_DEV=1` — auto-generate a self-signed cert next
1118///      to the data dir. Refuses to run without the env flag so an
1119///      operator can't accidentally ship a dev cert in prod.
1120///
1121/// `client_ca` is loaded when `config.grpc_tls_client_ca` is set,
1122/// turning the listener into a mutual-TLS endpoint that requires
1123/// every client to present a chain that anchors at one of the CAs
1124/// in the bundle.
1125fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1126    use crate::utils::secret_file::expand_file_env;
1127
1128    // Best-effort *_FILE expansion for every TLS env knob. Errors here
1129    // surface as warnings; the fallback path (explicit cert paths) will
1130    // cover the common case.
1131    for var in [
1132        "REDDB_GRPC_TLS_CERT",
1133        "REDDB_GRPC_TLS_KEY",
1134        "REDDB_GRPC_TLS_CLIENT_CA",
1135    ] {
1136        if let Err(err) = expand_file_env(var) {
1137            tracing::warn!(
1138                target: "reddb::secrets",
1139                env = %var,
1140                err = %err,
1141                "could not expand *_FILE companion for gRPC TLS"
1142            );
1143        }
1144    }
1145
1146    let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1147        (Some(cert), Some(key)) => {
1148            let cert_pem = std::fs::read(cert)
1149                .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1150            let key_pem =
1151                std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1152            (cert_pem, key_pem)
1153        }
1154        _ => {
1155            // No explicit material → only proceed when dev-mode is on.
1156            let dev = std::env::var("RED_GRPC_TLS_DEV")
1157                .ok()
1158                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1159                .unwrap_or(false);
1160            if !dev {
1161                return Err("gRPC TLS configured but no cert/key supplied — set \
1162                     REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1163                     RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1164                    .to_string());
1165            }
1166            let dir = config
1167                .path
1168                .as_ref()
1169                .and_then(|p| p.parent())
1170                .map(PathBuf::from)
1171                .unwrap_or_else(|| PathBuf::from("."));
1172            let (cert_pem_str, key_pem_str) =
1173                crate::wire::tls::generate_self_signed_cert("localhost")
1174                    .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1175
1176            // Constant-time-friendly fingerprint to stderr so the
1177            // operator can pin a client trust store. We log via
1178            // `tracing::warn!` so it stands out next to ordinary
1179            // listener-online events.
1180            let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1181            tracing::warn!(
1182                target: "reddb::security",
1183                transport = "grpc",
1184                cert_sha256 = %fp,
1185                "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1186                 DO NOT use in production"
1187            );
1188            // Persist so that restarts reuse the same identity.
1189            let cert_path = dir.join("grpc-tls-cert.pem");
1190            let key_path = dir.join("grpc-tls-key.pem");
1191            if !cert_path.exists() || !key_path.exists() {
1192                let _ = std::fs::create_dir_all(&dir);
1193                std::fs::write(&cert_path, cert_pem_str.as_bytes())
1194                    .map_err(|e| format!("write grpc dev cert: {e}"))?;
1195                std::fs::write(&key_path, key_pem_str.as_bytes())
1196                    .map_err(|e| format!("write grpc dev key: {e}"))?;
1197                #[cfg(unix)]
1198                {
1199                    use std::os::unix::fs::PermissionsExt;
1200                    let _ =
1201                        std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1202                }
1203            }
1204            (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1205        }
1206    };
1207
1208    let client_ca_pem = match &config.grpc_tls_client_ca {
1209        Some(path) => Some(
1210            std::fs::read(path)
1211                .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1212        ),
1213        None => None,
1214    };
1215
1216    Ok(crate::GrpcTlsOptions {
1217        cert_pem,
1218        key_pem,
1219        client_ca_pem,
1220    })
1221}
1222
1223/// Spawn a TLS-terminated gRPC listener when `grpc_tls_bind_addr` is
1224/// configured. Logs and continues on TLS-config errors so the plain
1225/// listener stays up; this matches the wire-listener pattern.
1226fn spawn_grpc_tls_listener_if_configured(
1227    config: &ServerCommandConfig,
1228    runtime: RedDBRuntime,
1229    auth_store: Arc<AuthStore>,
1230) {
1231    let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1232        return;
1233    };
1234    let tls_opts = match resolve_grpc_tls_options(config) {
1235        Ok(opts) => opts,
1236        Err(err) => {
1237            tracing::error!(
1238                target: "reddb::security",
1239                transport = "grpc",
1240                err = %err,
1241                "gRPC TLS config error; TLS listener will not start"
1242            );
1243            return;
1244        }
1245    };
1246    tokio::spawn(async move {
1247        let server = RedDBGrpcServer::with_options(
1248            runtime,
1249            GrpcServerOptions {
1250                bind_addr: tls_bind.clone(),
1251                tls: Some(tls_opts),
1252            },
1253            auth_store,
1254        );
1255        tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1256        if let Err(err) = server.serve().await {
1257            tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1258        }
1259    });
1260}
1261
1262/// Hex-encoded SHA-256 of a PEM blob, used for cert-pin operator log
1263/// lines. Constant-time hash; no token contents leave this fn.
1264fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1265    use sha2::{Digest, Sha256};
1266    let mut h = Sha256::new();
1267    h.update(pem);
1268    let d = h.finalize();
1269    let mut buf = String::with_capacity(64);
1270    for b in d.iter() {
1271        buf.push_str(&format!("{b:02x}"));
1272    }
1273    buf
1274}
1275
1276/// Resolve TLS config: use provided cert/key or auto-generate.
1277fn resolve_wire_tls_config(
1278    config: &ServerCommandConfig,
1279) -> Result<crate::wire::WireTlsConfig, String> {
1280    match (&config.wire_tls_cert, &config.wire_tls_key) {
1281        (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1282            cert_path: cert.clone(),
1283            key_path: key.clone(),
1284        }),
1285        _ => {
1286            // Auto-generate self-signed cert
1287            let dir = config
1288                .path
1289                .as_ref()
1290                .and_then(|p| p.parent())
1291                .map(PathBuf::from)
1292                .unwrap_or_else(|| PathBuf::from("."));
1293            crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1294        }
1295    }
1296}
1297
1298#[inline(never)]
1299fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1300    let rt_config = detect_runtime_config();
1301    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1302    let cli_telemetry = config.telemetry.clone();
1303    let db_options = config.to_db_options();
1304
1305    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1306        .enable_all()
1307        .worker_threads(workers)
1308        .thread_stack_size(rt_config.stack_size)
1309        .build()
1310        .map_err(|err| format!("tokio runtime: {err}"))?;
1311
1312    // Guard lives on the outer thread's stack so it outlives the
1313    // tokio runtime — dropping it only after the listener returns
1314    // flushes the file log writer.
1315    let (runtime, _auth_store, _telemetry_guard) =
1316        build_runtime_and_auth_store(db_options, cli_telemetry)?;
1317    let signal_runtime = runtime.clone();
1318    tokio_runtime.block_on(async move {
1319        spawn_lifecycle_signal_handler(signal_runtime).await;
1320        spawn_pg_listener(&config, &runtime);
1321        let wire_rt = Arc::new(runtime);
1322        let cfg = crate::wire::RedWireConfig {
1323            bind_addr: wire_addr,
1324            auth_store: None,
1325            oauth: None,
1326        };
1327        crate::wire::start_redwire_listener(cfg, wire_rt)
1328            .await
1329            .map_err(|e| e.to_string())
1330    })
1331}
1332
1333#[inline(never)]
1334fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1335    let rt_config = detect_runtime_config();
1336    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1337    let cli_telemetry = config.telemetry.clone();
1338    let db_options = config.to_db_options();
1339
1340    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1341        .enable_all()
1342        .worker_threads(workers)
1343        .thread_stack_size(rt_config.stack_size)
1344        .build()
1345        .map_err(|err| format!("tokio runtime: {err}"))?;
1346
1347    let (runtime, _auth_store, _telemetry_guard) =
1348        build_runtime_and_auth_store(db_options, cli_telemetry)?;
1349    let signal_runtime = runtime.clone();
1350    tokio_runtime.block_on(async move {
1351        spawn_lifecycle_signal_handler(signal_runtime).await;
1352        let cfg = crate::wire::PgWireConfig {
1353            bind_addr: pg_addr,
1354            ..Default::default()
1355        };
1356        crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1357            .await
1358            .map_err(|e| e.to_string())
1359    })
1360}
1361
1362#[inline(never)]
1363fn build_runtime_and_auth_store(
1364    db_options: RedDBOptions,
1365    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1366) -> Result<
1367    (
1368        RedDBRuntime,
1369        Arc<AuthStore>,
1370        Option<crate::telemetry::TelemetryGuard>,
1371    ),
1372    String,
1373> {
1374    // Return the TelemetryGuard so server runners can bind it for
1375    // their full lifetime. Dropping the guard tears down the
1376    // non-blocking log writer thread and, because that writer is
1377    // built with `.lossy(true)`, any subsequent log event routed to
1378    // the file sink is silently dropped — so callers MUST keep the
1379    // returned `Option<TelemetryGuard>` alive until shutdown.
1380    build_runtime_with_telemetry(db_options, cli_telemetry)
1381}
1382
1383/// Open the runtime, initialise structured logging from merged CLI +
1384/// `red_config` settings, and return a guard the caller must keep
1385/// alive for the server lifetime (drop flushes pending log writes).
1386///
1387/// Merge priority: CLI flag (explicit `Some`) beats `red.logging.*`
1388/// in red_config, beats the built-in default. A CLI-flag value of
1389/// `None` / empty means "inherit from config or default" — never
1390/// "disable". The one exception is `--no-log-file` which forces
1391/// `log_dir = None` regardless of config.
1392pub(crate) fn build_runtime_with_telemetry(
1393    db_options: RedDBOptions,
1394    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1395) -> Result<
1396    (
1397        RedDBRuntime,
1398        Arc<AuthStore>,
1399        Option<crate::telemetry::TelemetryGuard>,
1400    ),
1401    String,
1402> {
1403    let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1404        // Issue #205 — runtime construction failure is the canonical
1405        // StartupFailed phase. The audit sink isn't installed yet
1406        // (it would have been registered inside `with_options`), so
1407        // the emit falls through to tracing+eprintln only — operator
1408        // still sees it on stderr.
1409        let msg = err.to_string();
1410        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1411            phase: "runtime_construction".to_string(),
1412            error: msg.clone(),
1413        }
1414        .emit_global();
1415        msg
1416    })?;
1417
1418    // PLAN.md Phase 5 / W6 — opt into serverless writer-lease fencing
1419    // when `RED_LEASE_REQUIRED=true`. Failure here aborts boot: the
1420    // operator asked for a fence; running unfenced would silently
1421    // expose split-brain risk.
1422    crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1423        let msg = err.to_string();
1424        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1425            phase: "lease_loop".to_string(),
1426            error: msg.clone(),
1427        }
1428        .emit_global();
1429        msg
1430    })?;
1431
1432    // #213 — edge-triggered disk-space watchdog. Watches the data
1433    // directory; falls back to polling when fanotify is unavailable
1434    // (non-Linux or unprivileged container).
1435    if let Some(data_path) = db_options.data_path.as_deref() {
1436        let watch_dir = data_path.parent().unwrap_or(data_path);
1437        crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1438    }
1439
1440    // #214 — inotify config hot-reload watcher. Watches the config file
1441    // (REDDB_CONFIG_FILE or /etc/reddb/config.json) for changes and
1442    // applies hot-reloadable keys without restart.
1443    {
1444        let config_path = crate::runtime::config_overlay::config_file_path();
1445        let store = runtime.db().store();
1446        crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1447    }
1448
1449    // Phase 6 logging: merge red_config overrides onto the CLI-built
1450    // telemetry config, then install the global subscriber.
1451    let merged = merge_telemetry_with_config(
1452        cli_telemetry
1453            .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1454        &runtime,
1455    );
1456    let telemetry_guard = crate::telemetry::init(merged);
1457
1458    let auth_store =
1459        if db_options.auth.vault_enabled {
1460            let pager =
1461                runtime.db().store().pager().cloned().ok_or_else(|| {
1462                    "vault requires a paged database (persistent mode)".to_string()
1463                })?;
1464            let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1465                .map_err(|err| err.to_string())?;
1466            Arc::new(store)
1467        } else {
1468            Arc::new(AuthStore::new(db_options.auth.clone()))
1469        };
1470    auth_store.bootstrap_from_env();
1471
1472    // Background session purge (every 5 minutes)
1473    {
1474        let store = Arc::clone(&auth_store);
1475        std::thread::Builder::new()
1476            .name("reddb-session-purge".into())
1477            .spawn(move || loop {
1478                std::thread::sleep(std::time::Duration::from_secs(300));
1479                store.purge_expired_sessions();
1480            })
1481            .ok();
1482    }
1483
1484    Ok((runtime, auth_store, telemetry_guard))
1485}
1486
1487/// Read `red.logging.*` keys from the persistent config store and
1488/// merge them into the CLI-built `TelemetryConfig`. Merge priority:
1489/// explicit CLI flag > red_config > built-in default.
1490///
1491/// The "was a flag passed" signal comes from the `*_explicit` bools
1492/// on `TelemetryConfig`, populated by the CLI parser. This replaces
1493/// an earlier equality-to-default heuristic that silently dropped
1494/// config whenever the CLI-derived default diverged from
1495/// `TelemetryConfig::default()` (e.g. path-derived `log_dir`,
1496/// non-TTY `format`) and that silently overrode `--no-log-file`.
1497fn merge_telemetry_with_config(
1498    mut cli: crate::telemetry::TelemetryConfig,
1499    runtime: &RedDBRuntime,
1500) -> crate::telemetry::TelemetryConfig {
1501    use crate::storage::schema::Value;
1502
1503    let store = runtime.db().store();
1504
1505    if !cli.level_explicit {
1506        if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
1507            cli.level_filter = v.to_string();
1508        }
1509    }
1510    if !cli.format_explicit {
1511        if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
1512            if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
1513                cli.format = parsed;
1514            }
1515        }
1516    }
1517    if !cli.rotation_keep_days_explicit {
1518        match store.get_config("red.logging.keep_days") {
1519            Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
1520                cli.rotation_keep_days = n as u16
1521            }
1522            Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
1523                cli.rotation_keep_days = n as u16
1524            }
1525            Some(Value::Text(v)) => {
1526                if let Ok(n) = v.parse::<u16>() {
1527                    cli.rotation_keep_days = n;
1528                }
1529            }
1530            _ => {}
1531        }
1532    }
1533    if !cli.file_prefix_explicit {
1534        if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
1535            if !v.is_empty() {
1536                cli.file_prefix = v.to_string();
1537            }
1538        }
1539    }
1540    // --no-log-file is a kill-switch: config cannot resurrect the
1541    // file sink. Explicit --log-dir also wins.
1542    if !cli.log_dir_explicit && !cli.log_file_disabled {
1543        if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
1544            if !v.is_empty() {
1545                cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
1546            }
1547        }
1548    }
1549
1550    cli
1551}
1552
#[cfg(test)]
mod telemetry_merge_tests {
    //! Priority checks for `merge_telemetry_with_config`: explicit CLI
    //! flags beat `red.logging.*` config, config beats CLI-derived
    //! defaults, and `--no-log-file` vetoes any configured log directory.

    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};
    use std::path::Path;

    /// Fresh in-memory runtime whose config store holds `key` set to the
    /// JSON string `value`.
    fn runtime_with(key: &str, value: &str) -> RedDBRuntime {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
        let json = crate::serde_json::Value::String(value.to_string());
        runtime.db().store().set_config_tree(key, &json);
        runtime
    }

    /// CLI config shaped like `default_telemetry_for_path(Some(path))` on a
    /// non-TTY host: a path-derived `log_dir`, JSON output, and no
    /// `*_explicit` flag set.
    fn cli_base() -> TelemetryConfig {
        TelemetryConfig {
            format: LogFormat::Json,
            log_dir: Some(PathBuf::from("/tmp/reddb-default/logs")),
            ..Default::default()
        }
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let runtime = runtime_with("red.logging.dir", "/var/log/reddb");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.log_dir.as_deref(), Some(Path::new("/var/log/reddb")));
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let runtime = runtime_with("red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: Some(PathBuf::from("/custom/dir")),
            log_dir_explicit: true,
            ..cli_base()
        };
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.log_dir.as_deref(), Some(Path::new("/custom/dir")));
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let runtime = runtime_with("red.logging.dir", "/var/log/reddb");
        let cli = TelemetryConfig {
            log_dir: None,
            log_file_disabled: true,
            ..cli_base()
        };
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert!(merged.log_dir.is_none(), "--no-log-file must veto config dir");
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        // `cli_base()` uses Json while `TelemetryConfig::default()` is
        // Pretty; the old equality-to-default check dropped config here.
        let runtime = runtime_with("red.logging.format", "pretty");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let runtime = runtime_with("red.logging.format", "pretty");
        let cli = TelemetryConfig {
            format: LogFormat::Json,
            format_explicit: true,
            ..cli_base()
        };
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.format, LogFormat::Json);
    }
}
1640
1641#[inline(never)]
1642fn build_http_server(
1643    runtime: RedDBRuntime,
1644    auth_store: Arc<AuthStore>,
1645    bind_addr: String,
1646) -> RedDBServer {
1647    RedDBServer::with_options(
1648        runtime,
1649        ServerOptions {
1650            bind_addr,
1651            ..ServerOptions::default()
1652        },
1653    )
1654    .with_auth(auth_store)
1655}
1656
1657/// PLAN.md Phase 6.2 — build a listener that only serves
1658/// `/admin/*` + `/metrics` + `/health/*`. Defaults to `127.0.0.1`
1659/// when the env var has no host (loopback-only by default per spec).
1660#[inline(never)]
1661fn build_admin_only_server(
1662    runtime: RedDBRuntime,
1663    auth_store: Arc<AuthStore>,
1664    bind_addr: String,
1665) -> RedDBServer {
1666    RedDBServer::with_options(
1667        runtime,
1668        ServerOptions {
1669            bind_addr,
1670            surface: crate::server::ServerSurface::AdminOnly,
1671            ..ServerOptions::default()
1672        },
1673    )
1674    .with_auth(auth_store)
1675}
1676
1677/// PLAN.md Phase 6.2 — build a listener that only serves `/metrics`
1678/// + `/health/*`. Suitable for Prometheus scrape ports that may be
1679///   exposed wider than the admin port.
1680#[inline(never)]
1681fn build_metrics_only_server(
1682    runtime: RedDBRuntime,
1683    auth_store: Arc<AuthStore>,
1684    bind_addr: String,
1685) -> RedDBServer {
1686    RedDBServer::with_options(
1687        runtime,
1688        ServerOptions {
1689            bind_addr,
1690            surface: crate::server::ServerSurface::MetricsOnly,
1691            ..ServerOptions::default()
1692        },
1693    )
1694    .with_auth(auth_store)
1695}
1696
1697/// Spawn dedicated admin / metrics listeners when the operator set
1698/// `RED_ADMIN_BIND` / `RED_METRICS_BIND`. Both are optional; when
1699/// unset the existing listener keeps serving everything (back-compat).
1700fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
1701    if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
1702        let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
1703        let _ = server.serve_in_background();
1704        tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
1705    }
1706    if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
1707        let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
1708        let _ = server.serve_in_background();
1709        tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
1710    }
1711}
1712
1713#[inline(never)]
1714fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
1715    let cli_telemetry = config.telemetry.clone();
1716    let (runtime, auth_store, _telemetry_guard) =
1717        build_runtime_and_auth_store(config.to_db_options(), cli_telemetry)?;
1718    spawn_admin_metrics_listeners(&runtime, &auth_store);
1719    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
1720    let server = build_http_server(runtime, auth_store, bind_addr.clone());
1721    tracing::info!(transport = "http", bind = %bind_addr, "listener online");
1722    server.serve().map_err(|err| err.to_string())
1723}
1724
1725/// PLAN.md HTTP TLS — when `http_tls_bind_addr` is set, spawn a
1726/// rustls-terminated listener alongside the plain HTTP server. Cert
1727/// + key paths come from CLI flags or `REDDB_HTTP_TLS_*` env vars; if
1728///   both are absent and `RED_HTTP_TLS_DEV=1` is set, a self-signed cert
1729///   is auto-generated next to the data directory (refused otherwise).
1730fn spawn_http_tls_listener(
1731    config: &ServerCommandConfig,
1732    runtime: &RedDBRuntime,
1733    auth_store: &Arc<AuthStore>,
1734) -> Result<(), String> {
1735    let Some(addr) = config.http_tls_bind_addr.clone() else {
1736        return Ok(());
1737    };
1738
1739    let tls_config = resolve_http_tls_config(config)?;
1740    let server_config = crate::server::tls::build_server_config(&tls_config)
1741        .map_err(|err| format!("HTTP TLS: {err}"))?;
1742
1743    let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
1744    let _handle = server.serve_tls_in_background(server_config);
1745    tracing::info!(
1746        transport = "https",
1747        bind = %addr,
1748        mtls = %tls_config.client_ca_path.is_some(),
1749        "TLS listener online"
1750    );
1751    Ok(())
1752}
1753
1754/// Resolve the HTTP TLS config from CLI / env / dev defaults.
1755fn resolve_http_tls_config(
1756    config: &ServerCommandConfig,
1757) -> Result<crate::server::tls::HttpTlsConfig, String> {
1758    match (&config.http_tls_cert, &config.http_tls_key) {
1759        (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
1760            cert_path: cert.clone(),
1761            key_path: key.clone(),
1762            client_ca_path: config.http_tls_client_ca.clone(),
1763        }),
1764        (None, None) => {
1765            // Dev-mode auto-generate next to the data directory.
1766            let dir = config
1767                .path
1768                .as_ref()
1769                .and_then(|p| p.parent().map(std::path::PathBuf::from))
1770                .unwrap_or_else(|| std::path::PathBuf::from("."));
1771            let auto = crate::server::tls::auto_generate_dev_cert(&dir)
1772                .map_err(|err| format!("HTTP TLS dev: {err}"))?;
1773            Ok(crate::server::tls::HttpTlsConfig {
1774                cert_path: auto.cert_path,
1775                key_path: auto.key_path,
1776                client_ca_path: config.http_tls_client_ca.clone(),
1777            })
1778        }
1779        _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
1780    }
1781}
1782
1783#[inline(never)]
1784fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
1785    let workers = config.workers;
1786    let cli_telemetry = config.telemetry.clone();
1787    let db_options = config.to_db_options();
1788    let rt_config = detect_runtime_config();
1789
1790    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1791
1792    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1793        .enable_all()
1794        .worker_threads(worker_threads)
1795        .thread_stack_size(rt_config.stack_size)
1796        .build()
1797        .map_err(|err| format!("tokio runtime: {err}"))?;
1798
1799    // Guard lives on the outer stack so it outlives the tokio runtime.
1800    let (runtime, auth_store, _telemetry_guard) =
1801        build_runtime_and_auth_store(db_options, cli_telemetry)?;
1802    let signal_runtime = runtime.clone();
1803    tokio_runtime.block_on(async move {
1804        spawn_lifecycle_signal_handler(signal_runtime).await;
1805        // Start wire protocol listeners (plaintext + TLS)
1806        spawn_wire_listeners(&config, &runtime);
1807
1808        // Start PostgreSQL wire listener when --pg-bind is configured.
1809        spawn_pg_listener(&config, &runtime);
1810
1811        // Optional TLS gRPC listener. When `grpc_tls_bind_addr` is set
1812        // it spawns a separate listener so plaintext + TLS can run
1813        // side-by-side (50051 plain + 50052 TLS, etc.).
1814        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
1815
1816        let server = RedDBGrpcServer::with_options(
1817            runtime,
1818            GrpcServerOptions {
1819                bind_addr: bind_addr.clone(),
1820                tls: None,
1821            },
1822            auth_store,
1823        );
1824
1825        tracing::info!(
1826            transport = "grpc",
1827            bind = %bind_addr,
1828            cpus = rt_config.available_cpus,
1829            workers = worker_threads,
1830            "listener online"
1831        );
1832        server.serve().await.map_err(|err| err.to_string())
1833    })
1834}
1835
1836#[inline(never)]
1837fn run_dual_server(
1838    config: ServerCommandConfig,
1839    grpc_bind_addr: String,
1840    http_bind_addr: String,
1841) -> Result<(), String> {
1842    let workers = config.workers;
1843    let wire_bind_addr = config.wire_bind_addr.clone();
1844    let cli_telemetry = config.telemetry.clone();
1845    let db_options = config.to_db_options();
1846    let rt_config = detect_runtime_config();
1847    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1848    let (runtime, auth_store, _telemetry_guard) =
1849        build_runtime_and_auth_store(db_options, cli_telemetry)?;
1850
1851    spawn_admin_metrics_listeners(&runtime, &auth_store);
1852    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
1853
1854    let http_server =
1855        build_http_server(runtime.clone(), auth_store.clone(), http_bind_addr.clone());
1856    let http_handle = http_server.serve_in_background();
1857
1858    thread::sleep(Duration::from_millis(150));
1859    if http_handle.is_finished() {
1860        return match http_handle.join() {
1861            Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
1862            Ok(Err(err)) => Err(err.to_string()),
1863            Err(_) => Err("HTTP server thread panicked".to_string()),
1864        };
1865    }
1866
1867    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1868        .enable_all()
1869        .worker_threads(worker_threads)
1870        .thread_stack_size(rt_config.stack_size)
1871        .build()
1872        .map_err(|err| format!("tokio runtime: {err}"))?;
1873
1874    let signal_runtime = runtime.clone();
1875    tokio_runtime.block_on(async move {
1876        spawn_lifecycle_signal_handler(signal_runtime).await;
1877        // Start wire protocol listeners (plaintext + TLS)
1878        spawn_wire_listeners(&config, &runtime);
1879
1880        // Start PostgreSQL wire listener when --pg-bind is configured.
1881        spawn_pg_listener(&config, &runtime);
1882
1883        // Optional TLS gRPC listener — runs alongside the plaintext one.
1884        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
1885
1886        let server = RedDBGrpcServer::with_options(
1887            runtime,
1888            GrpcServerOptions {
1889                bind_addr: grpc_bind_addr.clone(),
1890                tls: None,
1891            },
1892            auth_store,
1893        );
1894
1895        tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
1896        tracing::info!(
1897            transport = "grpc",
1898            bind = %grpc_bind_addr,
1899            cpus = rt_config.available_cpus,
1900            workers = worker_threads,
1901            "listener online"
1902        );
1903        server.serve().await.map_err(|err| err.to_string())
1904    })
1905}
1906
#[cfg(test)]
mod tests {
    //! `render_systemd_unit` / `SystemdServiceConfig` rendering and path
    //! derivation checks.

    use super::*;

    /// Shared unit config: `red` binary under `/usr/local/bin`, running as
    /// `reddb:reddb`, with every bind address unset so each test opts into
    /// exactly the transports it exercises.
    fn base_unit_config(service_name: &str, data_path: &str) -> SystemdServiceConfig {
        SystemdServiceConfig {
            service_name: service_name.to_string(),
            binary_path: PathBuf::from("/usr/local/bin/red"),
            run_user: "reddb".to_string(),
            run_group: "reddb".to_string(),
            data_path: PathBuf::from(data_path),
            router_bind_addr: None,
            grpc_bind_addr: None,
            http_bind_addr: None,
        }
    }

    #[test]
    fn render_systemd_unit_contains_expected_execstart() {
        let config = SystemdServiceConfig {
            grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
            ..base_unit_config("reddb", "/var/lib/reddb/data.rdb")
        };

        let unit = render_systemd_unit(&config);
        assert!(unit.contains("ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"));
        assert!(unit.contains("ReadWritePaths=/var/lib/reddb"));
    }

    #[test]
    fn systemd_service_config_derives_paths() {
        let config = SystemdServiceConfig {
            http_bind_addr: Some("127.0.0.1:5055".to_string()),
            ..base_unit_config("reddb-api", "/srv/reddb/live/data.rdb")
        };

        assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
        assert_eq!(
            config.unit_path(),
            PathBuf::from("/etc/systemd/system/reddb-api.service")
        );
    }

    #[test]
    fn render_systemd_unit_supports_dual_transport() {
        let config = SystemdServiceConfig {
            grpc_bind_addr: Some("0.0.0.0:5555".to_string()),
            http_bind_addr: Some("0.0.0.0:5055".to_string()),
            ..base_unit_config("reddb", "/var/lib/reddb/data.rdb")
        };

        let unit = render_systemd_unit(&config);
        assert!(unit.contains("--grpc-bind 0.0.0.0:5555"));
        assert!(unit.contains("--http-bind 0.0.0.0:5055"));
    }

    #[test]
    fn render_systemd_unit_supports_router_mode() {
        let config = SystemdServiceConfig {
            router_bind_addr: Some(DEFAULT_ROUTER_BIND_ADDR.to_string()),
            ..base_unit_config("reddb", "/var/lib/reddb/data.rdb")
        };

        let unit = render_systemd_unit(&config);
        assert!(unit.contains("--bind 127.0.0.1:5050"));
        assert!(!unit.contains("--grpc-bind"));
        assert!(!unit.contains("--http-bind"));
    }
}