Skip to main content

reddb_server/
service_cli.rs

1use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13    GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Default listen address for the multiplexed protocol router.
///
/// Same address as `ServerTransport::Wire.default_bind_addr()`.
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";

/// Detect available CPU cores and suggest a worker thread count.
///
/// One core is left to the OS and the rest become workers, never fewer
/// than one. If the parallelism query fails the host is treated as a
/// single-core machine.
pub fn detect_runtime_config() -> RuntimeConfig {
    let available_cpus = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);

    // Reserve 1 core for the OS, use the rest for workers (minimum 1).
    let suggested_workers = available_cpus.saturating_sub(1).max(1);

    RuntimeConfig {
        available_cpus,
        suggested_workers,
        // 8 MiB — the same size `run_server_with_large_stack` gives the
        // server thread.
        stack_size: 8 * 1024 * 1024,
    }
}

/// Host-derived sizing hints produced by [`detect_runtime_config`].
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Logical CPUs reported by `std::thread::available_parallelism`
    /// (1 when the query fails).
    pub available_cpus: usize,
    /// Suggested worker count: `available_cpus - 1`, but at least 1.
    pub suggested_workers: usize,
    /// Suggested thread stack size, in bytes.
    pub stack_size: usize,
}
40
/// A client-facing protocol the server can listen on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    /// gRPC transport.
    Grpc,
    /// HTTP transport.
    Http,
    /// RedDB wire-protocol transport.
    Wire,
}

impl ServerTransport {
    /// Short human-readable label for this transport.
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Wire => "wire",
            ServerTransport::Http => "HTTP",
            ServerTransport::Grpc => "gRPC",
        }
    }

    /// Default loopback listen address for this transport; every
    /// transport uses its own port.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Wire => "127.0.0.1:5050",
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Grpc => "127.0.0.1:5555",
        }
    }
}
65
66#[derive(Debug, Clone)]
67pub struct ServerCommandConfig {
68    pub path: Option<PathBuf>,
69    pub router_bind_addr: Option<String>,
70    pub grpc_bind_addr: Option<String>,
71    /// TLS-encrypted gRPC bind address. Can run side-by-side with
72    /// `grpc_bind_addr` (e.g. `:50051` plain + `:50052` TLS) or
73    /// stand alone for TLS-only deploys. Defaults to `None`.
74    pub grpc_tls_bind_addr: Option<String>,
75    /// Path to PEM-encoded gRPC server certificate. Resolved through
76    /// `REDDB_GRPC_TLS_CERT` (with `_FILE` companion for k8s secret
77    /// mounts). When `None` and dev-mode is enabled
78    /// (`RED_GRPC_TLS_DEV=1`) the server auto-generates a self-signed
79    /// cert and prints its SHA-256 fingerprint to stderr.
80    pub grpc_tls_cert: Option<PathBuf>,
81    /// Path to PEM-encoded gRPC server private key. Same env-var
82    /// pattern as `grpc_tls_cert`.
83    pub grpc_tls_key: Option<PathBuf>,
84    /// Optional path to a PEM bundle of trust anchors used to verify
85    /// client certificates. When set, the gRPC listener requires
86    /// every client to present a cert that chains to this CA — i.e.
87    /// mutual TLS. When unset, one-way TLS only.
88    pub grpc_tls_client_ca: Option<PathBuf>,
89    pub http_bind_addr: Option<String>,
90    /// HTTPS bind address. When set, the HTTP server also serves a
91    /// TLS-terminated listener on this addr. Plain HTTP and HTTPS can
92    /// run side by side (e.g. 8080 plain + 8443 TLS).
93    pub http_tls_bind_addr: Option<String>,
94    /// PEM cert for HTTPS. Reads `REDDB_HTTP_TLS_CERT` / its `_FILE`
95    /// companion when not set explicitly.
96    pub http_tls_cert: Option<PathBuf>,
97    /// PEM key for HTTPS. Reads `REDDB_HTTP_TLS_KEY` / its `_FILE`
98    /// companion when not set explicitly.
99    pub http_tls_key: Option<PathBuf>,
100    /// Optional PEM CA bundle. When set, the HTTPS listener requires
101    /// every client to present a cert that chains to a CA in this
102    /// bundle (mTLS). When unset, plain server-side TLS only.
103    pub http_tls_client_ca: Option<PathBuf>,
104    pub wire_bind_addr: Option<String>,
105    /// TLS-encrypted wire protocol bind address
106    pub wire_tls_bind_addr: Option<String>,
107    /// Path to TLS cert PEM (if None + wire_tls_bind, auto-generate)
108    pub wire_tls_cert: Option<PathBuf>,
109    /// Path to TLS key PEM
110    pub wire_tls_key: Option<PathBuf>,
111    /// PostgreSQL wire protocol bind address (Phase 3.1 PG parity).
112    /// When set the server accepts psql / JDBC / DBeaver clients on
113    /// this port via the v3 protocol. Defaults to None (disabled).
114    pub pg_bind_addr: Option<String>,
115    pub create_if_missing: bool,
116    pub read_only: bool,
117    pub role: String,
118    pub primary_addr: Option<String>,
119    pub vault: bool,
120    /// Override worker thread count (None = auto-detect from CPUs)
121    pub workers: Option<usize>,
122    /// Telemetry config (Phase 6 logging). `None` falls back to the
123    /// built-in default derived from `path` + stderr-only.
124    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
125}
126
/// Inputs for rendering and installing the RedDB systemd unit.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    /// Unit name without the `.service` suffix.
    pub service_name: String,
    /// Server binary invoked by `ExecStart`.
    pub binary_path: PathBuf,
    /// System user the service runs as (created on install if missing).
    pub run_user: String,
    /// System group the service runs as (created on install if missing).
    pub run_group: String,
    /// Database file passed as `--path`; its directory is the unit's
    /// working directory (see [`SystemdServiceConfig::data_dir`]).
    pub data_path: PathBuf,
    /// Router address, passed as `--bind`.
    pub router_bind_addr: Option<String>,
    /// gRPC address, passed as `--grpc-bind` when no router is set.
    pub grpc_bind_addr: Option<String>,
    /// HTTP address, passed as `--http-bind`.
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory that holds `data_path`.
    ///
    /// Falls back to `.` when `data_path` has no usable parent. That
    /// covers both a root path (`/`, whose parent is `None`) and a bare
    /// file name such as `data.rdb`, whose `Path::parent()` is the
    /// *empty* path. An empty path would otherwise end up as an empty
    /// `WorkingDirectory=` and an empty `install -d` argument.
    pub fn data_dir(&self) -> PathBuf {
        self.data_path
            .parent()
            .filter(|parent| !parent.as_os_str().is_empty())
            .map_or_else(|| PathBuf::from("."), PathBuf::from)
    }

    /// Location of the unit file: `/etc/systemd/system/<service_name>.service`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
151
152/// Build a sane default `TelemetryConfig` from a server path when the
153/// caller didn't set one explicitly. Writes rotating logs into the
154/// parent directory of the DB file (or `./logs` for in-memory /
155/// pathless runs). Level defaults to `info`, pretty stderr format.
156pub fn default_telemetry_for_path(
157    path: Option<&std::path::Path>,
158) -> crate::telemetry::TelemetryConfig {
159    let log_dir = match path {
160        Some(p) => p
161            .parent()
162            .map(|parent| parent.join("logs"))
163            .or_else(|| Some(std::path::PathBuf::from("./logs"))),
164        None => None, // in-memory — no file, stderr-only
165    };
166    crate::telemetry::TelemetryConfig {
167        log_dir,
168        file_prefix: "reddb.log".to_string(),
169        level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
170        format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
171            crate::telemetry::LogFormat::Pretty
172        } else {
173            crate::telemetry::LogFormat::Json
174        },
175        rotation_keep_days: 14,
176        service_name: "reddb",
177        // Implicit defaults — no CLI flag has claimed these values yet.
178        level_explicit: false,
179        format_explicit: false,
180        rotation_keep_days_explicit: false,
181        file_prefix_explicit: false,
182        log_dir_explicit: false,
183        log_file_disabled: false,
184    }
185}
186
187impl ServerCommandConfig {
188    fn to_db_options(&self) -> RedDBOptions {
189        let mut options = match &self.path {
190            Some(path) => RedDBOptions::persistent(path),
191            None => RedDBOptions::in_memory(),
192        };
193
194        options.mode = StorageMode::Persistent;
195        options.create_if_missing = self.create_if_missing;
196        // PLAN.md Phase 4.3 — read_only resolution priority:
197        //   1. CLI flag (`--readonly`) — explicit operator intent.
198        //   2. `RED_READONLY=true` env — orchestrator override.
199        //   3. Persisted `<data>/.runtime-state.json` from a prior
200        //      `POST /admin/readonly` — survives restart.
201        //   4. Default `false`.
202        options.read_only = self.read_only
203            || env_nonempty("RED_READONLY")
204                .or_else(|| env_nonempty("REDDB_READONLY"))
205                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
206                .unwrap_or(false)
207            || self.path.as_ref().is_some_and(|data_path| {
208                crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
209                    data_path,
210                ))
211                .unwrap_or(false)
212            });
213
214        options.replication = match self.role.as_str() {
215            "primary" => ReplicationConfig::primary(),
216            "replica" => {
217                let primary_addr = self
218                    .primary_addr
219                    .clone()
220                    .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
221                // Public-mutation rejection on replicas is enforced by
222                // `WriteGate` at the runtime/RPC boundary (PLAN.md W1).
223                // Leaving `options.read_only = false` keeps the pager
224                // writable so the internal logical-WAL apply path can
225                // ingest records from the primary; WriteGate ensures no
226                // client request reaches storage.
227                ReplicationConfig::replica(primary_addr)
228            }
229            _ => ReplicationConfig::standalone(),
230        };
231
232        if self.vault {
233            options.auth.vault_enabled = true;
234        }
235
236        configure_remote_backend_from_env(&mut options);
237
238        options
239    }
240
241    pub fn enabled_transports(&self) -> Vec<ServerTransport> {
242        let mut transports = Vec::with_capacity(3);
243        if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
244            transports.push(ServerTransport::Grpc);
245        }
246        if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
247            transports.push(ServerTransport::Http);
248        }
249        if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
250            transports.push(ServerTransport::Wire);
251        }
252        transports
253    }
254}
255
256/// Read an env var, treating empty / whitespace-only as `None`.
257/// Honors the `<NAME>_FILE` convention. Re-exports the shared
258/// `crate::utils::env_with_file_fallback` helper so call sites in
259/// this module can keep their short local name.
260fn env_nonempty(name: &str) -> Option<String> {
261    crate::utils::env_with_file_fallback(name)
262}
263
264fn env_truthy(name: &str) -> bool {
265    env_nonempty(name)
266        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
267        .unwrap_or(false)
268}
269
270fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
271    // PLAN.md (cloud-agnostic) — prefer the new spelling
272    // `RED_BACKEND`; accept the legacy `REDDB_REMOTE_BACKEND` for
273    // existing dev installs. `none` (default) means standalone — no
274    // remote backend, valid for development and on-prem without
275    // remote.
276    let backend = env_nonempty("RED_BACKEND")
277        .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
278        .unwrap_or_else(|| "none".to_string())
279        .to_ascii_lowercase();
280
281    match backend.as_str() {
282        // Universal S3-compatible — covers AWS, R2, MinIO, Ceph,
283        // GCS-interop, B2, DO Spaces, Wasabi, Garage, SeaweedFS,
284        // IDrive, Storj. The `path_style` toggle in S3Config picks
285        // the right addressing for self-hosted vs hosted.
286        "s3" | "minio" | "r2" => {
287            #[cfg(feature = "backend-s3")]
288            {
289                if let Some(config) = s3_config_from_env() {
290                    let remote_key = env_nonempty("RED_REMOTE_KEY")
291                        .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
292                        .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
293                    let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
294                    options.remote_backend = Some(backend.clone());
295                    options.remote_backend_atomic = Some(backend);
296                    options.remote_key = Some(remote_key);
297                }
298            }
299            #[cfg(not(feature = "backend-s3"))]
300            {
301                tracing::warn!(
302                    backend = %backend,
303                    "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
304                );
305            }
306        }
307        // Filesystem backend (NFS/EFS/SMB/local-disk). PLAN.md spec
308        // calls it `fs`; legacy code shipped it as `local`. Both
309        // names map to LocalBackend, with the remote_key derived
310        // from `RED_FS_PATH` + a per-database suffix when provided.
311        "fs" | "local" => {
312            let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
313            let remote_key = match (
314                base_path,
315                env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
316            ) {
317                (Some(base), Some(rel)) => Some(format!(
318                    "{}/{}",
319                    base.trim_end_matches('/'),
320                    rel.trim_start_matches('/')
321                )),
322                (Some(base), None) => Some(format!(
323                    "{}/clusters/dev/data.rdb",
324                    base.trim_end_matches('/')
325                )),
326                (None, Some(rel)) => Some(rel),
327                (None, None) => None,
328            };
329            if let Some(key) = remote_key {
330                let backend = Arc::new(crate::storage::backend::LocalBackend);
331                options.remote_backend = Some(backend.clone());
332                options.remote_backend_atomic = Some(backend);
333                options.remote_key = Some(key);
334            }
335        }
336        // Generic HTTP backend (PLAN.md Phase 2.3). Maximum
337        // portability: any service exposing PUT/GET/DELETE serves
338        // as a backup target. Optional auth via *_FILE secret
339        // path keeps the token out of the env.
340        "http" => {
341            let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
342                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
343            {
344                Some(u) => u,
345                None => {
346                    tracing::warn!(
347                        "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
348                    );
349                    return;
350                }
351            };
352            let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
353                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
354                .unwrap_or_default();
355            let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
356                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
357            {
358                std::fs::read_to_string(&path)
359                    .ok()
360                    .map(|s| s.trim().to_string())
361                    .filter(|s| !s.is_empty())
362            } else {
363                env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
364                    .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
365            };
366
367            let mut config =
368                crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
369            if let Some(auth) = auth_header {
370                config = config.with_auth_header(auth);
371            }
372            let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
373                || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
374                || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
375            config = config.with_conditional_writes(conditional_writes);
376            // Always populate the snapshot-transport handle. When the
377            // operator confirmed CAS support, also populate the atomic
378            // handle via AtomicHttpBackend — without that flag,
379            // LeaseStore must remain unreachable on this backend.
380            if conditional_writes {
381                match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
382                    Ok(atomic) => {
383                        let atomic_arc = Arc::new(atomic);
384                        options.remote_backend = Some(atomic_arc.clone());
385                        options.remote_backend_atomic = Some(atomic_arc);
386                    }
387                    Err(err) => {
388                        tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
389                        options.remote_backend =
390                            Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
391                    }
392                }
393            } else {
394                options.remote_backend =
395                    Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
396            }
397            options.remote_key = env_nonempty("RED_REMOTE_KEY")
398                .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
399                .or_else(|| Some("clusters/dev/data.rdb".to_string()));
400        }
401        // `none` is the explicit standalone — no remote, no backup
402        // pipeline. Boot never blocks on network reachability.
403        "none" | "" => {}
404        other => {
405            tracing::warn!(
406                backend = %other,
407                "unknown RED_BACKEND value — supported: s3 | fs | http | none"
408            );
409        }
410    }
411}
412
/// Resolve an S3 setting from `RED_S3_<suffix>` (PLAN.md cloud-agnostic
/// spelling), falling back to the legacy `REDDB_S3_<suffix>` kept for
/// existing dev installs.
///
/// The new spelling wins when both are set. No warning is emitted when
/// the legacy spelling is the one used.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    env_nonempty(&format!("RED_S3_{suffix}"))
        .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}")))
}
422
/// Read an S3 secret from `RED_S3_<suffix>_FILE` (legacy
/// `REDDB_S3_<suffix>_FILE`) when set, otherwise from the inline
/// `RED_S3_<suffix>` / `REDDB_S3_<suffix>` value.
///
/// The `*_FILE` form (Kubernetes / Docker Secrets, HashiCorp Vault Agent,
/// sealed-secrets) wins, so an operator can override the inline value
/// without touching it. The file contents are trimmed. An unreadable or
/// empty file yields `None`, without falling back to the inline value, and
/// is logged as a warning. The path is logged, never the secret.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let file_key_red = format!("RED_S3_{suffix}_FILE");
    let file_key_legacy = format!("REDDB_S3_{suffix}_FILE");
    let Some(path) = env_nonempty(&file_key_red).or_else(|| env_nonempty(&file_key_legacy))
    else {
        return env_s3(suffix);
    };
    match std::fs::read_to_string(&path) {
        Ok(contents) => {
            let secret = contents.trim();
            if secret.is_empty() {
                tracing::warn!(setting = %suffix, path = %path, "S3 secret file is empty");
                None
            } else {
                Some(secret.to_string())
            }
        }
        Err(err) => {
            tracing::warn!(
                setting = %suffix,
                path = %path,
                error = %err,
                "could not read S3 secret file"
            );
            None
        }
    }
}
440
/// Assemble an `S3Config` from the `RED_S3_*` (or legacy `REDDB_S3_*`)
/// environment.
///
/// `ENDPOINT`, `BUCKET`, `ACCESS_KEY` and `SECRET_KEY` are mandatory;
/// `None` is returned as soon as one of them is missing. `REGION` defaults
/// to `us-east-1`. The key prefix comes from `KEY_PREFIX`, else `PREFIX`,
/// else empty. `PATH_STYLE` defaults to `true`; when set, only
/// 1/true/yes/on (any case) count as true.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    // Mandatory settings, resolved in this order.
    let endpoint = env_s3("ENDPOINT")?;
    let bucket = env_s3("BUCKET")?;
    let access_key = env_s3_secret("ACCESS_KEY")?;
    let secret_key = env_s3_secret("SECRET_KEY")?;

    // Optional settings with defaults.
    let region = match env_s3("REGION") {
        Some(explicit) => explicit,
        None => String::from("us-east-1"),
    };
    let key_prefix = match env_s3("KEY_PREFIX") {
        Some(explicit) => explicit,
        None => env_s3("PREFIX").unwrap_or_default(),
    };
    let path_style = match env_s3("PATH_STYLE") {
        Some(flag) => {
            let flag = flag.to_ascii_lowercase();
            matches!(flag.as_str(), "1" | "true" | "yes" | "on")
        }
        None => true,
    };

    Some(crate::storage::backend::S3Config {
        endpoint,
        bucket,
        key_prefix,
        access_key,
        secret_key,
        region,
        path_style,
    })
}
464
465pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
466    let data_dir = config.data_dir();
467    let exec_start = render_systemd_exec_start(config);
468    format!(
469        "[Unit]\n\
470Description=RedDB unified database service\n\
471After=network-online.target\n\
472Wants=network-online.target\n\
473\n\
474[Service]\n\
475Type=simple\n\
476User={user}\n\
477Group={group}\n\
478WorkingDirectory={workdir}\n\
479ExecStart={exec_start}\n\
480Restart=always\n\
481RestartSec=2\n\
482LimitSTACK=16M\n\
483NoNewPrivileges=true\n\
484PrivateTmp=true\n\
485ProtectSystem=strict\n\
486ProtectHome=true\n\
487ProtectControlGroups=true\n\
488ProtectKernelTunables=true\n\
489ProtectKernelModules=true\n\
490RestrictNamespaces=true\n\
491LockPersonality=true\n\
492MemoryDenyWriteExecute=true\n\
493ReadWritePaths={workdir}\n\
494\n\
495[Install]\n\
496WantedBy=multi-user.target\n",
497        user = config.run_user,
498        group = config.run_group,
499        workdir = data_dir.display(),
500        exec_start = exec_start,
501    )
502}
503
504/// Install a systemd unit + start the service.
505///
506/// Linux-only. The helper shells out to `systemctl`, `useradd`,
507/// `groupadd`, `install`, `getent`, and `id` — none of which exist on
508/// Windows or macOS. The Windows/macOS branch returns a hard error so
509/// callers (the CLI) surface a clear message instead of a confusing
510/// syscall failure. A proper Windows-service equivalent (sc.exe /
511/// NSSM) is a Phase 3.6 follow-up.
512#[cfg(target_os = "linux")]
513pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
514    ensure_root()?;
515    ensure_command_available("systemctl")?;
516    ensure_command_available("getent")?;
517    ensure_command_available("groupadd")?;
518    ensure_command_available("useradd")?;
519    ensure_command_available("install")?;
520    ensure_executable(&config.binary_path)?;
521
522    if !command_success("getent", ["group", config.run_group.as_str()])? {
523        run_command("groupadd", ["--system", config.run_group.as_str()])?;
524    }
525
526    if !command_success("id", ["-u", config.run_user.as_str()])? {
527        let data_dir = config.data_dir();
528        run_command(
529            "useradd",
530            [
531                "--system",
532                "--gid",
533                config.run_group.as_str(),
534                "--home-dir",
535                data_dir.to_string_lossy().as_ref(),
536                "--shell",
537                "/usr/sbin/nologin",
538                config.run_user.as_str(),
539            ],
540        )?;
541    }
542
543    let data_dir = config.data_dir();
544    run_command(
545        "install",
546        [
547            "-d",
548            "-o",
549            config.run_user.as_str(),
550            "-g",
551            config.run_group.as_str(),
552            "-m",
553            "0750",
554            data_dir.to_string_lossy().as_ref(),
555        ],
556    )?;
557
558    std::fs::write(config.unit_path(), render_systemd_unit(config))
559        .map_err(|err| format!("failed to write systemd unit: {err}"))?;
560
561    run_command("systemctl", ["daemon-reload"])?;
562    run_command(
563        "systemctl",
564        [
565            "enable",
566            "--now",
567            format!("{}.service", config.service_name).as_str(),
568        ],
569    )?;
570
571    Ok(())
572}
573
/// Non-Linux stand-in for the systemd installer.
///
/// Same signature as the Linux version, so callers compile on every
/// platform, but it always fails with guidance to install the service by
/// hand. Windows/macOS service-manager integration (sc.exe + NSSM on
/// Windows, launchd on macOS) is a Phase 3.6 follow-up.
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    const MESSAGE: &str = concat!(
        "systemd install is Linux-only — use sc.exe (Windows) or ",
        "launchd (macOS) to install the service manually using the ",
        "unit printed by `red service print-unit`"
    );
    Err(MESSAGE.to_owned())
}
585
/// Fail unless `id -u` reports uid `0`.
///
/// # Errors
/// `"failed to determine current uid…"` when `id` cannot be run or exits
/// non-zero; `"run this command as root (sudo)"` for any other uid.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let output = match Command::new("id").arg("-u").output() {
        Ok(output) => output,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !output.status.success() {
        return Err(String::from("failed to determine current uid"));
    }
    match String::from_utf8_lossy(&output.stdout).trim() {
        "0" => Ok(()),
        _ => Err(String::from("run this command as root (sudo)")),
    }
}
601
/// Fail unless `command` resolves the same way `Command::new(command)`
/// will resolve it when it is actually spawned.
///
/// A name containing `/` is checked as a path. A bare name is searched in
/// the directories of this process's `PATH`, or `/bin:/usr/bin` when
/// `PATH` is unset. A match must be a regular file with at least one
/// execute bit set.
///
/// The lookup is done in-process rather than via `sh -lc "command -v …"`.
/// A *login* shell can rebuild `PATH` from profile files, so it may report
/// a tool (e.g. `useradd` in `/usr/sbin`) that the later spawn, which uses
/// this process's `PATH`, cannot find. It also avoids interpolating the
/// name into a shell command string.
///
/// # Errors
/// `"required command not found: <command>"`.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    use std::os::unix::fs::PermissionsExt;
    use std::path::Path;

    let is_executable_file = |candidate: &Path| {
        std::fs::metadata(candidate)
            .map(|meta| meta.is_file() && meta.permissions().mode() & 0o111 != 0)
            .unwrap_or(false)
    };

    let found = if command.is_empty() {
        false
    } else if command.contains('/') {
        is_executable_file(Path::new(command))
    } else {
        let search_path = std::env::var_os("PATH").unwrap_or_else(|| "/bin:/usr/bin".into());
        std::env::split_paths(&search_path)
            .map(|dir| dir.join(command))
            .any(|candidate| is_executable_file(candidate.as_path()))
    };

    if found {
        Ok(())
    } else {
        Err(format!("required command not found: {command}"))
    }
}
614
/// Fail unless `path` is a regular file with at least one execute bit.
///
/// `std::fs::metadata` follows symlinks, so a symlink to an executable is
/// accepted. Directories are rejected explicitly: they normally carry
/// execute (search) bits and would otherwise pass the mode check.
///
/// # Errors
/// * `"binary not found '<path>': <io error>"` when the path can't be stat'ed;
/// * `"binary is not a file: <path>"` for directories and other non-files;
/// * `"binary is not executable: <path>"` when no execute bit is set.
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    use std::os::unix::fs::PermissionsExt;

    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    if metadata.permissions().mode() & 0o111 == 0 {
        return Err(format!("binary is not executable: {}", path.display()));
    }
    Ok(())
}
634
/// Run `program args…` as a yes/no probe.
///
/// Returns `Ok(true)` for a zero exit status and `Ok(false)` for any other
/// exit. `Err` means the program could not be spawned. stdin, stdout and
/// stderr are all discarded, so probes such as `getent group <name>` or
/// `id -u <user>` don't print their result (or "no such user" errors) into
/// the installer's output.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    use std::process::Stdio;

    Command::new(program)
        .args(args)
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()
        .map(|status| status.success())
        .map_err(|err| format!("failed to run {program}: {err}"))
}
643
/// Run `program args…` with captured output and succeed only on a zero
/// exit status.
///
/// # Errors
/// * `"failed to run <program>: <io error>"` when it cannot be spawned;
/// * `"<program> failed: <detail>"` otherwise. The detail is the trimmed
///   stderr, else the trimmed stdout, else the exit status itself (e.g.
///   `exit status: 1`, or the terminating signal).
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
    let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
    let detail = [stderr, stdout]
        .into_iter()
        .find(|text| !text.is_empty())
        // `ExitStatus`'s Display already reads "exit status: N" (or
        // "signal: …"); prefixing it again produced
        // "exit status exit status: N".
        .unwrap_or_else(|| output.status.to_string());
    Err(format!("{program} failed: {detail}"))
}
665
666pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
667    let has_any = config.router_bind_addr.is_some()
668        || config.grpc_bind_addr.is_some()
669        || config.http_bind_addr.is_some()
670        || config.wire_bind_addr.is_some();
671    if !has_any {
672        return Err("at least one server bind address must be configured".into());
673    }
674    let thread_name = if config.router_bind_addr.is_some() {
675        "red-server-router"
676    } else {
677        match (
678            config.grpc_bind_addr.is_some(),
679            config.http_bind_addr.is_some(),
680        ) {
681            (true, true) => "red-server-dual",
682            (true, false) => "red-server-grpc",
683            (false, true) => "red-server-http",
684            (false, false) => "red-server-wire",
685        }
686    };
687
688    let handle = thread::Builder::new()
689        .name(thread_name.into())
690        .stack_size(8 * 1024 * 1024)
691        .spawn(move || run_configured_servers(config))
692        .map_err(|err| format!("failed to spawn server thread: {err}"))?;
693
694    match handle.join() {
695        Ok(result) => result,
696        Err(_) => Err("server thread panicked".to_string()),
697    }
698}
699
700fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
701    let mut parts = vec![
702        config.binary_path.display().to_string(),
703        "server".to_string(),
704        "--path".to_string(),
705        config.data_path.display().to_string(),
706    ];
707
708    if let Some(bind_addr) = &config.router_bind_addr {
709        parts.push("--bind".to_string());
710        parts.push(bind_addr.clone());
711    } else if let Some(bind_addr) = &config.grpc_bind_addr {
712        parts.push("--grpc-bind".to_string());
713        parts.push(bind_addr.clone());
714    }
715    if let Some(bind_addr) = &config.http_bind_addr {
716        parts.push("--http-bind".to_string());
717        parts.push(bind_addr.clone());
718    }
719
720    parts.join(" ")
721}
722
/// Return `true` if a TCP connection to `target` (`host:port`) succeeds
/// within `timeout` for any address it resolves to.
///
/// A target that fails to resolve yields `false`. Resolved addresses are
/// tried in order and the probe stops at the first success; `timeout`
/// applies to each attempt separately.
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    let Ok(resolved) = target.to_socket_addrs() else {
        return false;
    };
    let candidates: Vec<SocketAddr> = resolved.collect();
    for candidate in &candidates {
        if TcpStream::connect_timeout(candidate, timeout).is_ok() {
            return true;
        }
    }
    false
}
733
734#[inline(never)]
735fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
736    // Phase 6 logging is initialised inside each runner once the
737    // runtime is open — see `build_runtime_and_auth_store`. Going
738    // after DB open lets us read `red.logging.*` config keys out of
739    // the persistent red_config store and merge them with the CLI
740    // flags (flag > red_config > built-in default).
741    if let Some(router_bind_addr) = config.router_bind_addr.clone() {
742        return run_routed_server(config, router_bind_addr);
743    }
744
745    match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
746        (Some(grpc_bind_addr), Some(http_bind_addr)) => {
747            run_dual_server(config, grpc_bind_addr, http_bind_addr)
748        }
749        (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
750        (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
751        (None, None) => {
752            // Wire-only mode
753            if let Some(wire_addr) = config.wire_bind_addr.clone() {
754                run_wire_only_server(config, wire_addr)
755            } else {
756                Err("at least one server bind address must be configured".to_string())
757            }
758        }
759    }
760}
761
762/// Wire SIGTERM and SIGINT to `RedDBRuntime::graceful_shutdown`.
763///
764/// PLAN.md Phase 1.1 — orchestrators (K8s preStop, Fly autostop, ECS
765/// drain, systemd, plain `docker stop`) all rely on SIGTERM with a
766/// grace window. SIGKILL after that grace window is the OS's
767/// fallback; we are responsible for finishing in time.
768///
769/// Spawns a tokio task on the caller's runtime that:
770///   1. Awaits the first of SIGTERM / SIGINT.
771///   2. Calls `runtime.graceful_shutdown(backup_on_shutdown)`. The
772///      runtime moves to `Stopped` on its own; this just runs the
773///      flush + checkpoint pipeline and (optionally) a final backup.
774///   3. Calls `std::process::exit(0)` so the orchestrator sees a
775///      clean exit code.
776///
777/// `RED_BACKUP_ON_SHUTDOWN` (default `true` if a remote backend is
778/// configured) toggles step 3's backup branch. The flush + checkpoint
779/// always run.
780///
781/// Idempotent across signal storms — `graceful_shutdown` returns the
782/// cached report on second call, but we exit on the first one
783/// regardless, so the second SIGTERM never reaches the handler.
784async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
785    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
786        .ok()
787        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
788        .unwrap_or(true);
789
790    #[cfg(unix)]
791    {
792        use tokio::signal::unix::{signal, SignalKind};
793
794        let mut sigterm = match signal(SignalKind::terminate()) {
795            Ok(s) => s,
796            Err(err) => {
797                tracing::warn!(
798                    error = %err,
799                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
800                );
801                return;
802            }
803        };
804        let mut sigint = match signal(SignalKind::interrupt()) {
805            Ok(s) => s,
806            Err(err) => {
807                tracing::warn!(error = %err, "could not install SIGINT handler");
808                return;
809            }
810        };
811        // PLAN.md Phase 6.4 — SIGHUP triggers a reload of secrets from
812        // their `_FILE` companions without restarting the process.
813        // Useful for credential rotation pipelines (kubectl create
814        // secret + kubectl rollout restart, but for systemd / Nomad /
815        // bare-metal where rolling the process is heavier).
816        let mut sighup = match signal(SignalKind::hangup()) {
817            Ok(s) => Some(s),
818            Err(err) => {
819                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
820                None
821            }
822        };
823
824        let reload_runtime = runtime.clone();
825        tokio::spawn(async move {
826            loop {
827                let signal_name = match &mut sighup {
828                    Some(hup) => tokio::select! {
829                        _ = sigterm.recv() => "SIGTERM",
830                        _ = sigint.recv() => "SIGINT",
831                        _ = hup.recv() => "SIGHUP",
832                    },
833                    None => tokio::select! {
834                        _ = sigterm.recv() => "SIGTERM",
835                        _ = sigint.recv() => "SIGINT",
836                    },
837                };
838
839                if signal_name == "SIGHUP" {
840                    handle_sighup_reload(&reload_runtime);
841                    continue; // stay alive; SIGHUP isn't a shutdown
842                }
843
844                tracing::info!(
845                    signal = signal_name,
846                    "lifecycle signal received; shutting down"
847                );
848                match runtime.graceful_shutdown(backup_on_shutdown) {
849                    Ok(report) => {
850                        tracing::info!(
851                            duration_ms = report.duration_ms,
852                            flushed_wal = report.flushed_wal,
853                            final_checkpoint = report.final_checkpoint,
854                            backup_uploaded = report.backup_uploaded,
855                            "graceful shutdown complete"
856                        );
857                    }
858                    Err(err) => {
859                        tracing::error!(error = %err, "graceful shutdown failed");
860                        // Issue #205 — graceful shutdown returning Err
861                        // means the runtime is exiting without a clean
862                        // flush/checkpoint. Operator-grade event so the
863                        // operator notices the dirty exit even when the
864                        // process restarts before they read tracing logs.
865                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
866                            reason: format!("graceful shutdown failed: {err}"),
867                        }
868                        .emit_global();
869                    }
870                }
871                std::process::exit(0);
872            }
873        });
874    }
875
876    #[cfg(not(unix))]
877    {
878        tokio::spawn(async move {
879            let interrupted = tokio::signal::ctrl_c().await;
880            if let Err(err) = interrupted {
881                tracing::warn!(error = %err, "could not install Ctrl+C handler");
882                return;
883            }
884
885            tracing::info!(
886                signal = "Ctrl+C",
887                "lifecycle signal received; shutting down"
888            );
889            match runtime.graceful_shutdown(backup_on_shutdown) {
890                Ok(report) => {
891                    tracing::info!(
892                        duration_ms = report.duration_ms,
893                        flushed_wal = report.flushed_wal,
894                        final_checkpoint = report.final_checkpoint,
895                        backup_uploaded = report.backup_uploaded,
896                        "graceful shutdown complete"
897                    );
898                }
899                Err(err) => {
900                    tracing::error!(error = %err, "graceful shutdown failed");
901                }
902            }
903            std::process::exit(0);
904        });
905    }
906}
907
908/// PLAN.md Phase 6.4 — re-read secrets from `*_FILE` companion env
909/// vars. Today this only refreshes the audit log + records the
910/// reload event; the runtime modules that hold cached secret
911/// material (S3 backend credentials, admin token, JWT keys) read
912/// the env on each request so the next call after SIGHUP picks up
913/// the new file contents automatically. A future extension can
914/// punch through to the LeaseStore / AuthStore for in-memory
915/// caches that don't re-read on each call.
916fn handle_sighup_reload(runtime: &RedDBRuntime) {
917    let now_ms = std::time::SystemTime::now()
918        .duration_since(std::time::UNIX_EPOCH)
919        .map(|d| d.as_millis() as u64)
920        .unwrap_or(0);
921    tracing::info!(
922        target: "reddb::secrets",
923        ts_unix_ms = now_ms,
924        "SIGHUP received; secrets will be re-read from *_FILE on next access"
925    );
926    // Routed through AuditFieldEscaper (ADR 0010 / issue #177) so
927    // every emission goes through the typed-field guard. The
928    // arguments here are static, but using the typed entry point
929    // keeps the discipline uniform across call sites.
930    use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
931    runtime.audit_log().record_event(
932        AuditEvent::builder("config/sighup_reload")
933            .source(AuditAuthSource::System)
934            .resource("secrets")
935            .outcome(Outcome::Success)
936            .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
937            .build(),
938    );
939}
940
/// Run the single-port "routed" server: one public TCP listener on
/// `router_bind_addr` forwards each connection to an internal gRPC,
/// HTTP or RedWire backend (via `serve_tcp_router`).
///
/// Startup order:
///   1. Open the runtime + auth store and keep the telemetry guard for
///      the whole function.
///   2. Start the admin/metrics listeners.
///   3. Start the HTTP backend on an ephemeral loopback port. Wait 100 ms
///      to catch a backend that dies immediately.
///   4. Build a multi-thread tokio runtime (`workers` or the CPU-based
///      suggestion). Inside it, install the lifecycle signal handler,
///      then start the gRPC and RedWire backends on ephemeral loopback
///      ports.
///   5. Serve the router until it returns.
///
/// Apart from `workers`, `telemetry` and `to_db_options()`, `config` is
/// not consulted here. NOTE(review): confirm that the PG / TLS listeners
/// are intentionally unavailable in routed mode.
///
/// # Errors
/// Runtime/auth-store construction, internal listener bind/inspect
/// failures, an HTTP backend that exits or panics within the first
/// 100 ms, tokio runtime construction, and router errors. Errors from
/// the gRPC and RedWire backend tasks are only logged.
#[inline(never)]
fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
    let workers = config.workers;
    let cli_telemetry = config.telemetry.clone();
    let db_options = config.to_db_options();
    let rt_config = detect_runtime_config();
    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
    // `_telemetry_guard` is bound (not `_`) so the log writer lives until
    // this function returns; it is declared before `tokio_runtime`, so it
    // is dropped after it.
    let (runtime, auth_store, _telemetry_guard) =
        build_runtime_and_auth_store(db_options, cli_telemetry)?;

    spawn_admin_metrics_listeners(&runtime, &auth_store);

    // Internal backends bind to 127.0.0.1:0 (OS-assigned port); only the
    // router address is meant to be public. NOTE(review): local
    // processes can still reach these ports directly.
    let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
        .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
    let http_backend = http_listener
        .local_addr()
        .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
    let http_server = build_http_server(
        runtime.clone(),
        auth_store.clone(),
        http_backend.to_string(),
    );
    let http_handle = http_server.serve_in_background_on(http_listener);

    // Heuristic startup probe: if the HTTP backend has already finished
    // after 100 ms it failed to start, so report that instead of routing
    // traffic to a dead backend.
    thread::sleep(Duration::from_millis(100));
    if http_handle.is_finished() {
        return match http_handle.join() {
            Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
            Ok(Err(err)) => Err(err.to_string()),
            Err(_) => Err("HTTP backend thread panicked".to_string()),
        };
    }

    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(worker_threads)
        .thread_stack_size(rt_config.stack_size)
        .build()
        .map_err(|err| format!("tokio runtime: {err}"))?;

    let signal_runtime = runtime.clone();
    tokio_runtime.block_on(async move {
        // Must run inside the tokio runtime: it uses `tokio::spawn`.
        spawn_lifecycle_signal_handler(signal_runtime).await;
        let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
            .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
        let grpc_backend = grpc_listener
            .local_addr()
            .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
        // Plaintext gRPC: TLS, if any, is not configured on this backend.
        let grpc_server = RedDBGrpcServer::with_options(
            runtime.clone(),
            GrpcServerOptions {
                bind_addr: grpc_backend.to_string(),
                tls: None,
            },
            auth_store,
        );
        tokio::spawn(async move {
            if let Err(err) = grpc_server.serve_on(grpc_listener).await {
                tracing::error!(err = %err, "gRPC backend error");
            }
        });

        let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .map_err(|err| format!("bind internal wire listener: {err}"))?;
        let wire_backend = wire_listener
            .local_addr()
            .map_err(|err| format!("inspect internal wire listener: {err}"))?;
        let wire_rt = Arc::new(runtime);
        tokio::spawn(async move {
            if let Err(err) =
                crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
                    .await
            {
                tracing::error!(err = %err, "redwire backend error");
            }
        });

        tracing::info!(
            bind = %router_bind_addr,
            cpus = rt_config.available_cpus,
            workers = worker_threads,
            "router bootstrapping"
        );
        // Blocks for the lifetime of the server; its result is the
        // function's result.
        serve_tcp_router(TcpProtocolRouterConfig {
            bind_addr: router_bind_addr,
            grpc_backend,
            http_backend,
            wire_backend,
        })
        .await
        .map_err(|err| err.to_string())
    })
}
1035
1036/// Spawn RedWire listeners (plaintext + TLS) as background tokio tasks.
1037fn spawn_wire_listeners(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1038    // Plaintext RedWire — TCP or Unix socket
1039    if let Some(wire_addr) = config.wire_bind_addr.clone() {
1040        let wire_rt = Arc::new(runtime.clone());
1041        tokio::spawn(async move {
1042            // Address starting with `unix://` or an absolute filesystem path
1043            // switches to Unix domain sockets.
1044            #[cfg(unix)]
1045            {
1046                if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1047                    if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1048                        &wire_addr, wire_rt,
1049                    )
1050                    .await
1051                    {
1052                        tracing::error!(err = %e, "redwire unix listener error");
1053                    }
1054                    return;
1055                }
1056            }
1057            let cfg = crate::wire::RedWireConfig {
1058                bind_addr: wire_addr,
1059                auth_store: None,
1060                oauth: None,
1061            };
1062            if let Err(e) = crate::wire::start_redwire_listener(cfg, wire_rt).await {
1063                tracing::error!(err = %e, "redwire listener error");
1064            }
1065        });
1066    }
1067
1068    // RedWire over TLS
1069    if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1070        let tls_config = resolve_wire_tls_config(config);
1071        match tls_config {
1072            Ok(tls_cfg) => {
1073                let wire_rt = Arc::new(runtime.clone());
1074                tokio::spawn(async move {
1075                    if let Err(e) =
1076                        crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1077                            .await
1078                    {
1079                        tracing::error!(err = %e, "redwire+tls listener error");
1080                    }
1081                });
1082            }
1083            Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1084        }
1085    }
1086}
1087
1088/// Spawn the PostgreSQL wire-protocol listener (Phase 3.1 PG parity).
1089///
1090/// Only runs when `--pg-bind` is supplied. Uses the v3 protocol so
1091/// psql, JDBC drivers, DBeaver, etc. can connect directly. Runs
1092/// alongside the native wire listener; the two transports do not
1093/// share a port.
1094fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1095    if let Some(pg_addr) = config.pg_bind_addr.clone() {
1096        let rt = Arc::new(runtime.clone());
1097        tokio::spawn(async move {
1098            let cfg = crate::wire::PgWireConfig {
1099                bind_addr: pg_addr,
1100                ..Default::default()
1101            };
1102            if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1103                tracing::error!(err = %e, "pg wire listener error");
1104            }
1105        });
1106    }
1107}
1108
1109/// Resolve gRPC TLS material into PEM bytes.
1110///
1111/// Lookup order, in priority:
1112///   1. Explicit `config.grpc_tls_cert` / `config.grpc_tls_key` (paths
1113///      passed via CLI/env). Both must be present together.
1114///   2. `RED_GRPC_TLS_DEV=1` — auto-generate a self-signed cert next
1115///      to the data dir. Refuses to run without the env flag so an
1116///      operator can't accidentally ship a dev cert in prod.
1117///
1118/// `client_ca` is loaded when `config.grpc_tls_client_ca` is set,
1119/// turning the listener into a mutual-TLS endpoint that requires
1120/// every client to present a chain that anchors at one of the CAs
1121/// in the bundle.
1122fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1123    use crate::utils::secret_file::expand_file_env;
1124
1125    // Best-effort *_FILE expansion for every TLS env knob. Errors here
1126    // surface as warnings; the fallback path (explicit cert paths) will
1127    // cover the common case.
1128    for var in [
1129        "REDDB_GRPC_TLS_CERT",
1130        "REDDB_GRPC_TLS_KEY",
1131        "REDDB_GRPC_TLS_CLIENT_CA",
1132    ] {
1133        if let Err(err) = expand_file_env(var) {
1134            tracing::warn!(
1135                target: "reddb::secrets",
1136                env = %var,
1137                err = %err,
1138                "could not expand *_FILE companion for gRPC TLS"
1139            );
1140        }
1141    }
1142
1143    let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1144        (Some(cert), Some(key)) => {
1145            let cert_pem = std::fs::read(cert)
1146                .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1147            let key_pem =
1148                std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1149            (cert_pem, key_pem)
1150        }
1151        _ => {
1152            // No explicit material → only proceed when dev-mode is on.
1153            let dev = std::env::var("RED_GRPC_TLS_DEV")
1154                .ok()
1155                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1156                .unwrap_or(false);
1157            if !dev {
1158                return Err("gRPC TLS configured but no cert/key supplied — set \
1159                     REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1160                     RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1161                    .to_string());
1162            }
1163            let dir = config
1164                .path
1165                .as_ref()
1166                .and_then(|p| p.parent())
1167                .map(PathBuf::from)
1168                .unwrap_or_else(|| PathBuf::from("."));
1169            let (cert_pem_str, key_pem_str) =
1170                crate::wire::tls::generate_self_signed_cert("localhost")
1171                    .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1172
1173            // Constant-time-friendly fingerprint to stderr so the
1174            // operator can pin a client trust store. We log via
1175            // `tracing::warn!` so it stands out next to ordinary
1176            // listener-online events.
1177            let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1178            tracing::warn!(
1179                target: "reddb::security",
1180                transport = "grpc",
1181                cert_sha256 = %fp,
1182                "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1183                 DO NOT use in production"
1184            );
1185            // Persist so that restarts reuse the same identity.
1186            let cert_path = dir.join("grpc-tls-cert.pem");
1187            let key_path = dir.join("grpc-tls-key.pem");
1188            if !cert_path.exists() || !key_path.exists() {
1189                let _ = std::fs::create_dir_all(&dir);
1190                std::fs::write(&cert_path, cert_pem_str.as_bytes())
1191                    .map_err(|e| format!("write grpc dev cert: {e}"))?;
1192                std::fs::write(&key_path, key_pem_str.as_bytes())
1193                    .map_err(|e| format!("write grpc dev key: {e}"))?;
1194                #[cfg(unix)]
1195                {
1196                    use std::os::unix::fs::PermissionsExt;
1197                    let _ =
1198                        std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1199                }
1200            }
1201            (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1202        }
1203    };
1204
1205    let client_ca_pem = match &config.grpc_tls_client_ca {
1206        Some(path) => Some(
1207            std::fs::read(path)
1208                .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1209        ),
1210        None => None,
1211    };
1212
1213    Ok(crate::GrpcTlsOptions {
1214        cert_pem,
1215        key_pem,
1216        client_ca_pem,
1217    })
1218}
1219
1220/// Spawn a TLS-terminated gRPC listener when `grpc_tls_bind_addr` is
1221/// configured. Logs and continues on TLS-config errors so the plain
1222/// listener stays up; this matches the wire-listener pattern.
1223fn spawn_grpc_tls_listener_if_configured(
1224    config: &ServerCommandConfig,
1225    runtime: RedDBRuntime,
1226    auth_store: Arc<AuthStore>,
1227) {
1228    let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1229        return;
1230    };
1231    let tls_opts = match resolve_grpc_tls_options(config) {
1232        Ok(opts) => opts,
1233        Err(err) => {
1234            tracing::error!(
1235                target: "reddb::security",
1236                transport = "grpc",
1237                err = %err,
1238                "gRPC TLS config error; TLS listener will not start"
1239            );
1240            return;
1241        }
1242    };
1243    tokio::spawn(async move {
1244        let server = RedDBGrpcServer::with_options(
1245            runtime,
1246            GrpcServerOptions {
1247                bind_addr: tls_bind.clone(),
1248                tls: Some(tls_opts),
1249            },
1250            auth_store,
1251        );
1252        tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1253        if let Err(err) = server.serve().await {
1254            tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1255        }
1256    });
1257}
1258
1259/// Hex-encoded SHA-256 of a PEM blob, used for cert-pin operator log
1260/// lines. Constant-time hash; no token contents leave this fn.
1261fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1262    use sha2::{Digest, Sha256};
1263    let mut h = Sha256::new();
1264    h.update(pem);
1265    let d = h.finalize();
1266    let mut buf = String::with_capacity(64);
1267    for b in d.iter() {
1268        buf.push_str(&format!("{b:02x}"));
1269    }
1270    buf
1271}
1272
1273/// Resolve TLS config: use provided cert/key or auto-generate.
1274fn resolve_wire_tls_config(
1275    config: &ServerCommandConfig,
1276) -> Result<crate::wire::WireTlsConfig, String> {
1277    match (&config.wire_tls_cert, &config.wire_tls_key) {
1278        (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1279            cert_path: cert.clone(),
1280            key_path: key.clone(),
1281        }),
1282        _ => {
1283            // Auto-generate self-signed cert
1284            let dir = config
1285                .path
1286                .as_ref()
1287                .and_then(|p| p.parent())
1288                .map(PathBuf::from)
1289                .unwrap_or_else(|| PathBuf::from("."));
1290            crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1291        }
1292    }
1293}
1294
1295#[inline(never)]
1296fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1297    let rt_config = detect_runtime_config();
1298    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1299    let cli_telemetry = config.telemetry.clone();
1300    let db_options = config.to_db_options();
1301
1302    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1303        .enable_all()
1304        .worker_threads(workers)
1305        .thread_stack_size(rt_config.stack_size)
1306        .build()
1307        .map_err(|err| format!("tokio runtime: {err}"))?;
1308
1309    // Guard lives on the outer thread's stack so it outlives the
1310    // tokio runtime — dropping it only after the listener returns
1311    // flushes the file log writer.
1312    let (runtime, _auth_store, _telemetry_guard) =
1313        build_runtime_and_auth_store(db_options, cli_telemetry)?;
1314    let signal_runtime = runtime.clone();
1315    tokio_runtime.block_on(async move {
1316        spawn_lifecycle_signal_handler(signal_runtime).await;
1317        let wire_rt = Arc::new(runtime);
1318        let cfg = crate::wire::RedWireConfig {
1319            bind_addr: wire_addr,
1320            auth_store: None,
1321            oauth: None,
1322        };
1323        crate::wire::start_redwire_listener(cfg, wire_rt)
1324            .await
1325            .map_err(|e| e.to_string())
1326    })
1327}
1328
1329#[inline(never)]
1330fn build_runtime_and_auth_store(
1331    db_options: RedDBOptions,
1332    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1333) -> Result<
1334    (
1335        RedDBRuntime,
1336        Arc<AuthStore>,
1337        Option<crate::telemetry::TelemetryGuard>,
1338    ),
1339    String,
1340> {
1341    // Return the TelemetryGuard so server runners can bind it for
1342    // their full lifetime. Dropping the guard tears down the
1343    // non-blocking log writer thread and, because that writer is
1344    // built with `.lossy(true)`, any subsequent log event routed to
1345    // the file sink is silently dropped — so callers MUST keep the
1346    // returned `Option<TelemetryGuard>` alive until shutdown.
1347    build_runtime_with_telemetry(db_options, cli_telemetry)
1348}
1349
1350/// Open the runtime, initialise structured logging from merged CLI +
1351/// `red_config` settings, and return a guard the caller must keep
1352/// alive for the server lifetime (drop flushes pending log writes).
1353///
1354/// Merge priority: CLI flag (explicit `Some`) beats `red.logging.*`
1355/// in red_config, beats the built-in default. A CLI-flag value of
1356/// `None` / empty means "inherit from config or default" — never
1357/// "disable". The one exception is `--no-log-file` which forces
1358/// `log_dir = None` regardless of config.
1359pub(crate) fn build_runtime_with_telemetry(
1360    db_options: RedDBOptions,
1361    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1362) -> Result<
1363    (
1364        RedDBRuntime,
1365        Arc<AuthStore>,
1366        Option<crate::telemetry::TelemetryGuard>,
1367    ),
1368    String,
1369> {
1370    let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1371        // Issue #205 — runtime construction failure is the canonical
1372        // StartupFailed phase. The audit sink isn't installed yet
1373        // (it would have been registered inside `with_options`), so
1374        // the emit falls through to tracing+eprintln only — operator
1375        // still sees it on stderr.
1376        let msg = err.to_string();
1377        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1378            phase: "runtime_construction".to_string(),
1379            error: msg.clone(),
1380        }
1381        .emit_global();
1382        msg
1383    })?;
1384
1385    // PLAN.md Phase 5 / W6 — opt into serverless writer-lease fencing
1386    // when `RED_LEASE_REQUIRED=true`. Failure here aborts boot: the
1387    // operator asked for a fence; running unfenced would silently
1388    // expose split-brain risk.
1389    crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1390        let msg = err.to_string();
1391        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1392            phase: "lease_loop".to_string(),
1393            error: msg.clone(),
1394        }
1395        .emit_global();
1396        msg
1397    })?;
1398
1399    // #213 — edge-triggered disk-space watchdog. Watches the data
1400    // directory; falls back to polling when fanotify is unavailable
1401    // (non-Linux or unprivileged container).
1402    if let Some(data_path) = db_options.data_path.as_deref() {
1403        let watch_dir = data_path.parent().unwrap_or(data_path);
1404        crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1405    }
1406
1407    // #214 — inotify config hot-reload watcher. Watches the config file
1408    // (REDDB_CONFIG_FILE or /etc/reddb/config.json) for changes and
1409    // applies hot-reloadable keys without restart.
1410    {
1411        let config_path = crate::runtime::config_overlay::config_file_path();
1412        let store = runtime.db().store();
1413        crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1414    }
1415
1416    // Phase 6 logging: merge red_config overrides onto the CLI-built
1417    // telemetry config, then install the global subscriber.
1418    let merged = merge_telemetry_with_config(
1419        cli_telemetry
1420            .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1421        &runtime,
1422    );
1423    let telemetry_guard = crate::telemetry::init(merged);
1424
1425    let auth_store =
1426        if db_options.auth.vault_enabled {
1427            let pager =
1428                runtime.db().store().pager().cloned().ok_or_else(|| {
1429                    "vault requires a paged database (persistent mode)".to_string()
1430                })?;
1431            let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1432                .map_err(|err| err.to_string())?;
1433            Arc::new(store)
1434        } else {
1435            Arc::new(AuthStore::new(db_options.auth.clone()))
1436        };
1437    auth_store.bootstrap_from_env();
1438
1439    // Background session purge (every 5 minutes)
1440    {
1441        let store = Arc::clone(&auth_store);
1442        std::thread::Builder::new()
1443            .name("reddb-session-purge".into())
1444            .spawn(move || loop {
1445                std::thread::sleep(std::time::Duration::from_secs(300));
1446                store.purge_expired_sessions();
1447            })
1448            .ok();
1449    }
1450
1451    Ok((runtime, auth_store, telemetry_guard))
1452}
1453
1454/// Read `red.logging.*` keys from the persistent config store and
1455/// merge them into the CLI-built `TelemetryConfig`. Merge priority:
1456/// explicit CLI flag > red_config > built-in default.
1457///
1458/// The "was a flag passed" signal comes from the `*_explicit` bools
1459/// on `TelemetryConfig`, populated by the CLI parser. This replaces
1460/// an earlier equality-to-default heuristic that silently dropped
1461/// config whenever the CLI-derived default diverged from
1462/// `TelemetryConfig::default()` (e.g. path-derived `log_dir`,
1463/// non-TTY `format`) and that silently overrode `--no-log-file`.
1464fn merge_telemetry_with_config(
1465    mut cli: crate::telemetry::TelemetryConfig,
1466    runtime: &RedDBRuntime,
1467) -> crate::telemetry::TelemetryConfig {
1468    use crate::storage::schema::Value;
1469
1470    let store = runtime.db().store();
1471
1472    if !cli.level_explicit {
1473        if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
1474            cli.level_filter = v.to_string();
1475        }
1476    }
1477    if !cli.format_explicit {
1478        if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
1479            if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
1480                cli.format = parsed;
1481            }
1482        }
1483    }
1484    if !cli.rotation_keep_days_explicit {
1485        match store.get_config("red.logging.keep_days") {
1486            Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
1487                cli.rotation_keep_days = n as u16
1488            }
1489            Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
1490                cli.rotation_keep_days = n as u16
1491            }
1492            Some(Value::Text(v)) => {
1493                if let Ok(n) = v.parse::<u16>() {
1494                    cli.rotation_keep_days = n;
1495                }
1496            }
1497            _ => {}
1498        }
1499    }
1500    if !cli.file_prefix_explicit {
1501        if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
1502            if !v.is_empty() {
1503                cli.file_prefix = v.to_string();
1504            }
1505        }
1506    }
1507    // --no-log-file is a kill-switch: config cannot resurrect the
1508    // file sink. Explicit --log-dir also wins.
1509    if !cli.log_dir_explicit && !cli.log_file_disabled {
1510        if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
1511            if !v.is_empty() {
1512                cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
1513            }
1514        }
1515    }
1516
1517    cli
1518}
1519
#[cfg(test)]
mod telemetry_merge_tests {
    // Precedence tests for `merge_telemetry_with_config`: a CLI value marked
    // explicit always wins; otherwise a `red.logging.*` config value is
    // promoted over the CLI default.
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};

    /// Fresh in-memory runtime for a single test.
    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
    }

    /// Store `value` as a JSON string under `key` in the runtime's config tree.
    fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
        runtime
            .db()
            .store()
            .set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
    }

    fn cli_base() -> TelemetryConfig {
        // Emulate default_telemetry_for_path(Some(path)) on a non-TTY host:
        // log_dir = Some(...), format = Json. Nothing marked explicit.
        TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/var/log/reddb"))
        );
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let mut cli = cli_base();
        cli.log_dir = Some(std::path::PathBuf::from("/custom/dir"));
        cli.log_dir_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/custom/dir"))
        );
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let mut cli = cli_base();
        cli.log_dir = None;
        cli.log_file_disabled = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert!(
            merged.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        // On non-TTY, default_telemetry_for_path yields format=Json even
        // though TelemetryConfig::default() is Pretty. The old equality
        // check silently dropped config here.
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.format", "pretty");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.format", "pretty");
        let mut cli = cli_base();
        cli.format = LogFormat::Json;
        cli.format_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.format, LogFormat::Json);
    }

    #[test]
    fn config_keep_days_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.keep_days", "9");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.rotation_keep_days, 9);
    }

    #[test]
    fn explicit_keep_days_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.keep_days", "9");
        let mut cli = cli_base();
        cli.rotation_keep_days = 3;
        cli.rotation_keep_days_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.rotation_keep_days, 3);
    }

    #[test]
    fn unparseable_keep_days_config_is_ignored() {
        // A text value that is not a u16 must leave the CLI value untouched.
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.keep_days", "forever");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.rotation_keep_days, cli_base().rotation_keep_days);
    }

    #[test]
    fn config_file_prefix_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.file_prefix", "reddb-node-a");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.file_prefix, "reddb-node-a");
    }

    #[test]
    fn explicit_file_prefix_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.file_prefix", "reddb-node-a");
        let mut cli = cli_base();
        cli.file_prefix = "custom".to_string();
        cli.file_prefix_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.file_prefix, "custom");
    }
}
1607
1608#[inline(never)]
1609fn build_http_server(
1610    runtime: RedDBRuntime,
1611    auth_store: Arc<AuthStore>,
1612    bind_addr: String,
1613) -> RedDBServer {
1614    RedDBServer::with_options(
1615        runtime,
1616        ServerOptions {
1617            bind_addr,
1618            ..ServerOptions::default()
1619        },
1620    )
1621    .with_auth(auth_store)
1622}
1623
1624/// PLAN.md Phase 6.2 — build a listener that only serves
1625/// `/admin/*` + `/metrics` + `/health/*`. Defaults to `127.0.0.1`
1626/// when the env var has no host (loopback-only by default per spec).
1627#[inline(never)]
1628fn build_admin_only_server(
1629    runtime: RedDBRuntime,
1630    auth_store: Arc<AuthStore>,
1631    bind_addr: String,
1632) -> RedDBServer {
1633    RedDBServer::with_options(
1634        runtime,
1635        ServerOptions {
1636            bind_addr,
1637            surface: crate::server::ServerSurface::AdminOnly,
1638            ..ServerOptions::default()
1639        },
1640    )
1641    .with_auth(auth_store)
1642}
1643
1644/// PLAN.md Phase 6.2 — build a listener that only serves `/metrics`
1645/// + `/health/*`. Suitable for Prometheus scrape ports that may be
1646///   exposed wider than the admin port.
1647#[inline(never)]
1648fn build_metrics_only_server(
1649    runtime: RedDBRuntime,
1650    auth_store: Arc<AuthStore>,
1651    bind_addr: String,
1652) -> RedDBServer {
1653    RedDBServer::with_options(
1654        runtime,
1655        ServerOptions {
1656            bind_addr,
1657            surface: crate::server::ServerSurface::MetricsOnly,
1658            ..ServerOptions::default()
1659        },
1660    )
1661    .with_auth(auth_store)
1662}
1663
1664/// Spawn dedicated admin / metrics listeners when the operator set
1665/// `RED_ADMIN_BIND` / `RED_METRICS_BIND`. Both are optional; when
1666/// unset the existing listener keeps serving everything (back-compat).
1667fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
1668    if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
1669        let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
1670        let _ = server.serve_in_background();
1671        tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
1672    }
1673    if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
1674        let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
1675        let _ = server.serve_in_background();
1676        tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
1677    }
1678}
1679
1680#[inline(never)]
1681fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
1682    let cli_telemetry = config.telemetry.clone();
1683    let (runtime, auth_store, _telemetry_guard) =
1684        build_runtime_and_auth_store(config.to_db_options(), cli_telemetry)?;
1685    spawn_admin_metrics_listeners(&runtime, &auth_store);
1686    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
1687    let server = build_http_server(runtime, auth_store, bind_addr.clone());
1688    tracing::info!(transport = "http", bind = %bind_addr, "listener online");
1689    server.serve().map_err(|err| err.to_string())
1690}
1691
1692/// PLAN.md HTTP TLS — when `http_tls_bind_addr` is set, spawn a
1693/// rustls-terminated listener alongside the plain HTTP server. Cert
1694/// + key paths come from CLI flags or `REDDB_HTTP_TLS_*` env vars; if
1695///   both are absent and `RED_HTTP_TLS_DEV=1` is set, a self-signed cert
1696///   is auto-generated next to the data directory (refused otherwise).
1697fn spawn_http_tls_listener(
1698    config: &ServerCommandConfig,
1699    runtime: &RedDBRuntime,
1700    auth_store: &Arc<AuthStore>,
1701) -> Result<(), String> {
1702    let Some(addr) = config.http_tls_bind_addr.clone() else {
1703        return Ok(());
1704    };
1705
1706    let tls_config = resolve_http_tls_config(config)?;
1707    let server_config = crate::server::tls::build_server_config(&tls_config)
1708        .map_err(|err| format!("HTTP TLS: {err}"))?;
1709
1710    let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
1711    let _handle = server.serve_tls_in_background(server_config);
1712    tracing::info!(
1713        transport = "https",
1714        bind = %addr,
1715        mtls = %tls_config.client_ca_path.is_some(),
1716        "TLS listener online"
1717    );
1718    Ok(())
1719}
1720
1721/// Resolve the HTTP TLS config from CLI / env / dev defaults.
1722fn resolve_http_tls_config(
1723    config: &ServerCommandConfig,
1724) -> Result<crate::server::tls::HttpTlsConfig, String> {
1725    match (&config.http_tls_cert, &config.http_tls_key) {
1726        (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
1727            cert_path: cert.clone(),
1728            key_path: key.clone(),
1729            client_ca_path: config.http_tls_client_ca.clone(),
1730        }),
1731        (None, None) => {
1732            // Dev-mode auto-generate next to the data directory.
1733            let dir = config
1734                .path
1735                .as_ref()
1736                .and_then(|p| p.parent().map(std::path::PathBuf::from))
1737                .unwrap_or_else(|| std::path::PathBuf::from("."));
1738            let auto = crate::server::tls::auto_generate_dev_cert(&dir)
1739                .map_err(|err| format!("HTTP TLS dev: {err}"))?;
1740            Ok(crate::server::tls::HttpTlsConfig {
1741                cert_path: auto.cert_path,
1742                key_path: auto.key_path,
1743                client_ca_path: config.http_tls_client_ca.clone(),
1744            })
1745        }
1746        _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
1747    }
1748}
1749
1750#[inline(never)]
1751fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
1752    let workers = config.workers;
1753    let cli_telemetry = config.telemetry.clone();
1754    let db_options = config.to_db_options();
1755    let rt_config = detect_runtime_config();
1756
1757    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1758
1759    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1760        .enable_all()
1761        .worker_threads(worker_threads)
1762        .thread_stack_size(rt_config.stack_size)
1763        .build()
1764        .map_err(|err| format!("tokio runtime: {err}"))?;
1765
1766    // Guard lives on the outer stack so it outlives the tokio runtime.
1767    let (runtime, auth_store, _telemetry_guard) =
1768        build_runtime_and_auth_store(db_options, cli_telemetry)?;
1769    let signal_runtime = runtime.clone();
1770    tokio_runtime.block_on(async move {
1771        spawn_lifecycle_signal_handler(signal_runtime).await;
1772        // Start wire protocol listeners (plaintext + TLS)
1773        spawn_wire_listeners(&config, &runtime);
1774
1775        // Start PostgreSQL wire listener when --pg-bind is configured.
1776        spawn_pg_listener(&config, &runtime);
1777
1778        // Optional TLS gRPC listener. When `grpc_tls_bind_addr` is set
1779        // it spawns a separate listener so plaintext + TLS can run
1780        // side-by-side (50051 plain + 50052 TLS, etc.).
1781        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
1782
1783        let server = RedDBGrpcServer::with_options(
1784            runtime,
1785            GrpcServerOptions {
1786                bind_addr: bind_addr.clone(),
1787                tls: None,
1788            },
1789            auth_store,
1790        );
1791
1792        tracing::info!(
1793            transport = "grpc",
1794            bind = %bind_addr,
1795            cpus = rt_config.available_cpus,
1796            workers = worker_threads,
1797            "listener online"
1798        );
1799        server.serve().await.map_err(|err| err.to_string())
1800    })
1801}
1802
1803#[inline(never)]
1804fn run_dual_server(
1805    config: ServerCommandConfig,
1806    grpc_bind_addr: String,
1807    http_bind_addr: String,
1808) -> Result<(), String> {
1809    let workers = config.workers;
1810    let wire_bind_addr = config.wire_bind_addr.clone();
1811    let cli_telemetry = config.telemetry.clone();
1812    let db_options = config.to_db_options();
1813    let rt_config = detect_runtime_config();
1814    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
1815    let (runtime, auth_store, _telemetry_guard) =
1816        build_runtime_and_auth_store(db_options, cli_telemetry)?;
1817
1818    spawn_admin_metrics_listeners(&runtime, &auth_store);
1819    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
1820
1821    let http_server =
1822        build_http_server(runtime.clone(), auth_store.clone(), http_bind_addr.clone());
1823    let http_handle = http_server.serve_in_background();
1824
1825    thread::sleep(Duration::from_millis(150));
1826    if http_handle.is_finished() {
1827        return match http_handle.join() {
1828            Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
1829            Ok(Err(err)) => Err(err.to_string()),
1830            Err(_) => Err("HTTP server thread panicked".to_string()),
1831        };
1832    }
1833
1834    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1835        .enable_all()
1836        .worker_threads(worker_threads)
1837        .thread_stack_size(rt_config.stack_size)
1838        .build()
1839        .map_err(|err| format!("tokio runtime: {err}"))?;
1840
1841    let signal_runtime = runtime.clone();
1842    tokio_runtime.block_on(async move {
1843        spawn_lifecycle_signal_handler(signal_runtime).await;
1844        // Start wire protocol listeners (plaintext + TLS)
1845        spawn_wire_listeners(&config, &runtime);
1846
1847        // Start PostgreSQL wire listener when --pg-bind is configured.
1848        spawn_pg_listener(&config, &runtime);
1849
1850        // Optional TLS gRPC listener — runs alongside the plaintext one.
1851        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
1852
1853        let server = RedDBGrpcServer::with_options(
1854            runtime,
1855            GrpcServerOptions {
1856                bind_addr: grpc_bind_addr.clone(),
1857                tls: None,
1858            },
1859            auth_store,
1860        );
1861
1862        tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
1863        tracing::info!(
1864            transport = "grpc",
1865            bind = %grpc_bind_addr,
1866            cpus = rt_config.available_cpus,
1867            workers = worker_threads,
1868            "listener online"
1869        );
1870        server.serve().await.map_err(|err| err.to_string())
1871    })
1872}
1873
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `SystemdServiceConfig` with the binary path and run
    /// user/group shared by every test. The caller chooses the service name,
    /// the data path and which bind addresses are set.
    fn unit_config(
        service_name: &str,
        data_path: &str,
        router_bind_addr: Option<&str>,
        grpc_bind_addr: Option<&str>,
        http_bind_addr: Option<&str>,
    ) -> SystemdServiceConfig {
        SystemdServiceConfig {
            service_name: service_name.to_string(),
            binary_path: PathBuf::from("/usr/local/bin/red"),
            run_user: "reddb".to_string(),
            run_group: "reddb".to_string(),
            data_path: PathBuf::from(data_path),
            router_bind_addr: router_bind_addr.map(str::to_string),
            grpc_bind_addr: grpc_bind_addr.map(str::to_string),
            http_bind_addr: http_bind_addr.map(str::to_string),
        }
    }

    #[test]
    fn render_systemd_unit_contains_expected_execstart() {
        let cfg = unit_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            None,
        );
        let rendered = render_systemd_unit(&cfg);
        let exec_start = "ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555";
        assert!(rendered.contains(exec_start));
        assert!(rendered.contains("ReadWritePaths=/var/lib/reddb"));
    }

    #[test]
    fn systemd_service_config_derives_paths() {
        let cfg = unit_config(
            "reddb-api",
            "/srv/reddb/live/data.rdb",
            None,
            None,
            Some("127.0.0.1:5055"),
        );
        assert_eq!(cfg.data_dir(), PathBuf::from("/srv/reddb/live"));
        assert_eq!(
            cfg.unit_path(),
            PathBuf::from("/etc/systemd/system/reddb-api.service")
        );
    }

    #[test]
    fn render_systemd_unit_supports_dual_transport() {
        let cfg = unit_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            Some("0.0.0.0:5055"),
        );
        let rendered = render_systemd_unit(&cfg);
        for flag in ["--grpc-bind 0.0.0.0:5555", "--http-bind 0.0.0.0:5055"] {
            assert!(rendered.contains(flag));
        }
    }

    #[test]
    fn render_systemd_unit_supports_router_mode() {
        let cfg = unit_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            Some(DEFAULT_ROUTER_BIND_ADDR),
            None,
            None,
        );
        let rendered = render_systemd_unit(&cfg);
        assert!(rendered.contains("--bind 127.0.0.1:5050"));
        for absent in ["--grpc-bind", "--http-bind"] {
            assert!(!rendered.contains(absent));
        }
    }
}