Skip to main content

reddb_server/
service_cli.rs

1use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
2use std::path::PathBuf;
3use std::process::Command;
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use crate::auth::store::AuthStore;
9use crate::replication::ReplicationConfig;
10use crate::runtime::RedDBRuntime;
11use crate::service_router::{serve_tcp_router, TcpProtocolRouterConfig};
12use crate::{
13    GrpcServerOptions, RedDBGrpcServer, RedDBOptions, RedDBServer, ServerOptions, StorageMode,
14};
15
/// Default listen address for the TCP protocol router. It uses the same
/// port as the wire transport's default bind address (`127.0.0.1:5050`).
pub const DEFAULT_ROUTER_BIND_ADDR: &str = "127.0.0.1:5050";
17
/// Detect available CPU cores and suggest a worker thread count.
///
/// One core is left for the OS and the rest become workers, with a floor
/// of one worker. When the platform cannot report its parallelism, a
/// single CPU is assumed.
pub fn detect_runtime_config() -> RuntimeConfig {
    let cpus = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);

    // Reserve 1 core for the OS, use the rest for workers (minimum 1).
    let suggested_workers = cpus.saturating_sub(1).max(1);

    RuntimeConfig {
        available_cpus: cpus,
        suggested_workers,
        // 8 MiB. NOTE(review): the systemd unit rendered in this module
        // sets `LimitSTACK=16M`; confirm which figure is intended.
        stack_size: 8 * 1024 * 1024,
    }
}

/// Host-derived thread sizing produced by [`detect_runtime_config`].
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Logical CPUs from `std::thread::available_parallelism` (1 if unknown).
    pub available_cpus: usize,
    /// `available_cpus - 1`, never below 1.
    pub suggested_workers: usize,
    /// Stack size in bytes; presumably applied to worker threads —
    /// confirm at the use site.
    pub stack_size: usize,
}
40
/// Client-facing protocols the server can listen on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTransport {
    Grpc,
    Http,
    Wire,
}

impl ServerTransport {
    /// Display label for the transport (`gRPC`, `HTTP` or `wire`).
    pub const fn as_str(self) -> &'static str {
        match self {
            ServerTransport::Grpc => "gRPC",
            ServerTransport::Http => "HTTP",
            ServerTransport::Wire => "wire",
        }
    }

    /// Loopback address used when no explicit bind address is supplied.
    pub const fn default_bind_addr(self) -> &'static str {
        match self {
            ServerTransport::Grpc => "127.0.0.1:5555",
            ServerTransport::Http => "127.0.0.1:5055",
            ServerTransport::Wire => "127.0.0.1:5050",
        }
    }
}
65
66#[derive(Debug, Clone)]
67pub struct ServerCommandConfig {
68    pub path: Option<PathBuf>,
69    pub router_bind_addr: Option<String>,
70    pub router_bind_explicit: bool,
71    pub grpc_bind_addr: Option<String>,
72    pub grpc_bind_explicit: bool,
73    /// TLS-encrypted gRPC bind address. Can run side-by-side with
74    /// `grpc_bind_addr` (e.g. `:50051` plain + `:50052` TLS) or
75    /// stand alone for TLS-only deploys. Defaults to `None`.
76    pub grpc_tls_bind_addr: Option<String>,
77    /// Path to PEM-encoded gRPC server certificate. Resolved through
78    /// `REDDB_GRPC_TLS_CERT` (with `_FILE` companion for k8s secret
79    /// mounts). When `None` and dev-mode is enabled
80    /// (`RED_GRPC_TLS_DEV=1`) the server auto-generates a self-signed
81    /// cert and prints its SHA-256 fingerprint to stderr.
82    pub grpc_tls_cert: Option<PathBuf>,
83    /// Path to PEM-encoded gRPC server private key. Same env-var
84    /// pattern as `grpc_tls_cert`.
85    pub grpc_tls_key: Option<PathBuf>,
86    /// Optional path to a PEM bundle of trust anchors used to verify
87    /// client certificates. When set, the gRPC listener requires
88    /// every client to present a cert that chains to this CA — i.e.
89    /// mutual TLS. When unset, one-way TLS only.
90    pub grpc_tls_client_ca: Option<PathBuf>,
91    pub http_bind_addr: Option<String>,
92    pub http_bind_explicit: bool,
93    /// HTTPS bind address. When set, the HTTP server also serves a
94    /// TLS-terminated listener on this addr. Plain HTTP and HTTPS can
95    /// run side by side (e.g. 8080 plain + 8443 TLS).
96    pub http_tls_bind_addr: Option<String>,
97    /// PEM cert for HTTPS. Reads `REDDB_HTTP_TLS_CERT` / its `_FILE`
98    /// companion when not set explicitly.
99    pub http_tls_cert: Option<PathBuf>,
100    /// PEM key for HTTPS. Reads `REDDB_HTTP_TLS_KEY` / its `_FILE`
101    /// companion when not set explicitly.
102    pub http_tls_key: Option<PathBuf>,
103    /// Optional PEM CA bundle. When set, the HTTPS listener requires
104    /// every client to present a cert that chains to a CA in this
105    /// bundle (mTLS). When unset, plain server-side TLS only.
106    pub http_tls_client_ca: Option<PathBuf>,
107    pub wire_bind_addr: Option<String>,
108    pub wire_bind_explicit: bool,
109    /// TLS-encrypted wire protocol bind address
110    pub wire_tls_bind_addr: Option<String>,
111    /// Path to TLS cert PEM (if None + wire_tls_bind, auto-generate)
112    pub wire_tls_cert: Option<PathBuf>,
113    /// Path to TLS key PEM
114    pub wire_tls_key: Option<PathBuf>,
115    /// PostgreSQL wire protocol bind address (Phase 3.1 PG parity).
116    /// When set the server accepts psql / JDBC / DBeaver clients on
117    /// this port via the v3 protocol. Defaults to None (disabled).
118    pub pg_bind_addr: Option<String>,
119    pub create_if_missing: bool,
120    pub read_only: bool,
121    pub role: String,
122    pub primary_addr: Option<String>,
123    pub vault: bool,
124    /// Override worker thread count (None = auto-detect from CPUs)
125    pub workers: Option<usize>,
126    /// Telemetry config (Phase 6 logging). `None` falls back to the
127    /// built-in default derived from `path` + stderr-only.
128    pub telemetry: Option<crate::telemetry::TelemetryConfig>,
129    /// HTTP handler-pool knobs from the CLI layer (issue #574 slice 5).
130    /// Carries flag and env values; red_config and built-in defaults
131    /// are applied later by [`crate::server::http_limits::resolve_http_limits`]
132    /// once the runtime is open.
133    pub http_limits_cli: crate::server::HttpLimitsCliInput,
134}
135
/// A transport listener recorded as active.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerState {
    /// Transport label.
    pub transport: String,
    /// Address the listener is bound to.
    pub bind_addr: String,
    /// Whether the address was explicitly configured (vs. a default).
    pub explicit: bool,
}

/// A transport listener that could not be brought up, plus the reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransportListenerFailure {
    /// Transport label.
    pub transport: String,
    /// Address that was attempted.
    pub bind_addr: String,
    /// Whether the address was explicitly configured (vs. a default).
    pub explicit: bool,
    /// Human-readable failure reason.
    pub reason: String,
}

/// Aggregated listener outcomes, in the order they were recorded.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TransportReadiness {
    pub active: Vec<TransportListenerState>,
    pub failed: Vec<TransportListenerFailure>,
}

impl TransportReadiness {
    /// Append an entry to `active`.
    fn active(&mut self, transport: &str, bind_addr: &str, explicit: bool) {
        let state = TransportListenerState {
            transport: String::from(transport),
            bind_addr: String::from(bind_addr),
            explicit,
        };
        self.active.push(state);
    }

    /// Append an entry to `failed`.
    fn failed(&mut self, transport: &str, bind_addr: &str, explicit: bool, reason: String) {
        let failure = TransportListenerFailure {
            transport: transport.to_owned(),
            bind_addr: bind_addr.to_owned(),
            explicit,
            reason,
        };
        self.failed.push(failure);
    }
}
175
/// Inputs for rendering and installing the RedDB systemd unit.
#[derive(Debug, Clone)]
pub struct SystemdServiceConfig {
    pub service_name: String,
    pub binary_path: PathBuf,
    pub run_user: String,
    pub run_group: String,
    pub data_path: PathBuf,
    pub router_bind_addr: Option<String>,
    pub grpc_bind_addr: Option<String>,
    pub http_bind_addr: Option<String>,
}

impl SystemdServiceConfig {
    /// Directory containing the database file.
    ///
    /// Falls back to `.` whenever `data_path` has no usable parent. That
    /// covers two cases:
    /// - a root or empty path, where `parent()` is `None`;
    /// - a bare file name such as `data.rdb`, where `parent()` is `Some("")`.
    ///
    /// Without the second case, the empty path would leak into
    /// `WorkingDirectory=` / `ReadWritePaths=` and the installer's
    /// `useradd --home-dir` / `install -d` arguments.
    pub fn data_dir(&self) -> PathBuf {
        self.data_path
            .parent()
            .filter(|parent| !parent.as_os_str().is_empty())
            .map(PathBuf::from)
            .unwrap_or_else(|| PathBuf::from("."))
    }

    /// Location of the unit file: `/etc/systemd/system/<service_name>.service`.
    pub fn unit_path(&self) -> PathBuf {
        PathBuf::from(format!("/etc/systemd/system/{}.service", self.service_name))
    }
}
200
201/// Build a sane default `TelemetryConfig` from a server path when the
202/// caller didn't set one explicitly. Writes rotating logs into the
203/// parent directory of the DB file (or `./logs` for in-memory /
204/// pathless runs). Level defaults to `info`, pretty stderr format.
205pub fn default_telemetry_for_path(
206    path: Option<&std::path::Path>,
207) -> crate::telemetry::TelemetryConfig {
208    let log_dir = match path {
209        Some(p) => p
210            .parent()
211            .map(|parent| parent.join("logs"))
212            .or_else(|| Some(std::path::PathBuf::from("./logs"))),
213        None => None, // in-memory — no file, stderr-only
214    };
215    crate::telemetry::TelemetryConfig {
216        log_dir,
217        file_prefix: "reddb.log".to_string(),
218        level_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
219        format: if std::io::IsTerminal::is_terminal(&std::io::stderr()) {
220            crate::telemetry::LogFormat::Pretty
221        } else {
222            crate::telemetry::LogFormat::Json
223        },
224        rotation_keep_days: 14,
225        service_name: "reddb",
226        // Implicit defaults — no CLI flag has claimed these values yet.
227        level_explicit: false,
228        format_explicit: false,
229        rotation_keep_days_explicit: false,
230        file_prefix_explicit: false,
231        log_dir_explicit: false,
232        log_file_disabled: false,
233    }
234}
235
// `RedDBOptions::metadata` keys that carry the parsed `REDDB_BACKUP_*`
// configuration from `to_db_options` (via `apply_backup_config`) to the
// server runner. `spawn_backup_tasks_if_configured` reads the two
// interval keys back to start the periodic checkpointer and WAL-archive
// threads. Routing the values through `metadata` avoids adding a public
// `RedDBOptions` field that would mean nothing to library consumers.

/// Checkpoint interval in seconds; written with `to_string()` and parsed back as `u64`.
const BACKUP_INTERVAL_META_CHECKPOINT: &str = "red.boot.backup.checkpoint_interval_secs";
/// WAL flush/archive interval in seconds; written with `to_string()` and parsed back as `u64`.
const BACKUP_INTERVAL_META_WAL_FLUSH: &str = "red.boot.backup.wal_flush_interval_secs";
/// Backup backend kind (`apply_backup_config` writes `"s3"`).
const BACKUP_KIND_META: &str = "red.boot.backup.backend_kind";
245
246impl ServerCommandConfig {
247    fn to_db_options(&self) -> Result<RedDBOptions, String> {
248        let mut options = match &self.path {
249            Some(path) => RedDBOptions::persistent(path),
250            None => RedDBOptions::in_memory(),
251        };
252
253        options.mode = StorageMode::Persistent;
254        options.create_if_missing = self.create_if_missing;
255        // PLAN.md Phase 4.3 — read_only resolution priority:
256        //   1. CLI flag (`--readonly`) — explicit operator intent.
257        //   2. `RED_READONLY=true` env — orchestrator override.
258        //   3. Persisted `<data>/.runtime-state.json` from a prior
259        //      `POST /admin/readonly` — survives restart.
260        //   4. Default `false`.
261        options.read_only = self.read_only
262            || env_nonempty("RED_READONLY")
263                .or_else(|| env_nonempty("REDDB_READONLY"))
264                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
265                .unwrap_or(false)
266            || self.path.as_ref().is_some_and(|data_path| {
267                crate::server::handlers_admin::load_runtime_readonly(std::path::Path::new(
268                    data_path,
269                ))
270                .unwrap_or(false)
271            });
272
273        options.replication = match self.role.as_str() {
274            "primary" => ReplicationConfig::primary(),
275            "replica" => {
276                let primary_addr = self
277                    .primary_addr
278                    .clone()
279                    .unwrap_or_else(|| "http://127.0.0.1:5555".to_string());
280                // Public-mutation rejection on replicas is enforced by
281                // `WriteGate` at the runtime/RPC boundary (PLAN.md W1).
282                // Leaving `options.read_only = false` keeps the pager
283                // writable so the internal logical-WAL apply path can
284                // ingest records from the primary; WriteGate ensures no
285                // client request reaches storage.
286                ReplicationConfig::replica(primary_addr)
287            }
288            _ => ReplicationConfig::standalone(),
289        };
290
291        if self.vault {
292            options.auth.vault_enabled = true;
293        }
294
295        // Issue #517 — canonical `REDDB_BACKUP_*` contract takes
296        // precedence. On Err, surface the partial-config message so
297        // boot exits non-zero with a clear operator message. On
298        // Ok(None), fall through to the legacy backend-from-env path.
299        match crate::backup_bootstrap::from_env(|k| std::env::var(k).ok()) {
300            Err(msg) => {
301                return Err(format!("backup bootstrap: {msg}"));
302            }
303            Ok(Some(cfg)) => {
304                apply_backup_config(&mut options, &cfg);
305            }
306            Ok(None) => {
307                configure_remote_backend_from_env(&mut options);
308            }
309        }
310
311        Ok(options)
312    }
313
314    pub fn enabled_transports(&self) -> Vec<ServerTransport> {
315        let mut transports = Vec::with_capacity(3);
316        if self.router_bind_addr.is_some() || self.grpc_bind_addr.is_some() {
317            transports.push(ServerTransport::Grpc);
318        }
319        if self.router_bind_addr.is_some() || self.http_bind_addr.is_some() {
320            transports.push(ServerTransport::Http);
321        }
322        if self.router_bind_addr.is_some() || self.wire_bind_addr.is_some() {
323            transports.push(ServerTransport::Wire);
324        }
325        transports
326    }
327}
328
329/// Read an env var, treating empty / whitespace-only as `None`.
330/// Honors the `<NAME>_FILE` convention. Re-exports the shared
331/// `crate::utils::env_with_file_fallback` helper so call sites in
332/// this module can keep their short local name.
333fn env_nonempty(name: &str) -> Option<String> {
334    crate::utils::env_with_file_fallback(name)
335}
336
337fn env_truthy(name: &str) -> bool {
338    env_nonempty(name)
339        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
340        .unwrap_or(false)
341}
342
343/// Apply a parsed [`BackupConfig`] to `options`. Wires the S3
344/// backend via `with_remote_backend` + `with_atomic_remote_backend`
345/// when the `backend-s3` feature is on, stashes intervals + backend
346/// kind in `metadata` so the runner can spawn the periodic tasks,
347/// and emits the startup INFO log required by #517.
348fn apply_backup_config(options: &mut RedDBOptions, cfg: &crate::backup_bootstrap::BackupConfig) {
349    let endpoint_host = endpoint_host(&cfg.endpoint);
350
351    options.metadata.insert(
352        BACKUP_INTERVAL_META_CHECKPOINT.to_string(),
353        cfg.checkpoint_interval_secs.to_string(),
354    );
355    options.metadata.insert(
356        BACKUP_INTERVAL_META_WAL_FLUSH.to_string(),
357        cfg.wal_flush_interval_secs.to_string(),
358    );
359    options
360        .metadata
361        .insert(BACKUP_KIND_META.to_string(), "s3".to_string());
362
363    #[cfg(feature = "backend-s3")]
364    {
365        let s3_cfg = crate::storage::backend::S3Config {
366            endpoint: cfg.endpoint.clone(),
367            bucket: cfg.bucket.clone(),
368            key_prefix: cfg.prefix.clone(),
369            access_key: cfg.access_key_id.clone(),
370            secret_key: cfg.secret_access_key.clone(),
371            region: cfg.region.clone(),
372            path_style: true,
373        };
374        let backend = Arc::new(crate::storage::backend::S3Backend::new(s3_cfg));
375        options.remote_backend = Some(backend.clone());
376        options.remote_backend_atomic = Some(backend);
377        // Use the operator-supplied prefix as the snapshot key root.
378        // The existing helpers (`default_snapshot_prefix`,
379        // `default_wal_archive_prefix`) derive sub-prefixes from the
380        // parent of `remote_key`.
381        let trimmed = cfg.prefix.trim_end_matches('/');
382        options.remote_key = Some(format!("{}/data.rdb", trimmed));
383
384        tracing::info!(
385            backend = "s3",
386            endpoint = %endpoint_host,
387            bucket = %cfg.bucket,
388            prefix = %cfg.prefix,
389            checkpoint_interval_secs = cfg.checkpoint_interval_secs,
390            wal_flush_interval_secs = cfg.wal_flush_interval_secs,
391            "backup backend configured from REDDB_BACKUP_* env"
392        );
393    }
394
395    #[cfg(not(feature = "backend-s3"))]
396    {
397        tracing::warn!(
398            backend = "s3",
399            endpoint = %endpoint_host,
400            bucket = %cfg.bucket,
401            prefix = %cfg.prefix,
402            "REDDB_BACKUP_S3_* configured but binary built without `backend-s3` feature; \
403             backend wiring skipped (archiver/checkpointer also disabled)"
404        );
405    }
406}
407
/// Extract the `host[:port]` part of an endpoint URL for logging.
///
/// The following parts are removed:
/// - the scheme (`https://`);
/// - everything after the authority (path, `?query`, `#fragment`);
/// - any `user[:password]@` userinfo, so credentials embedded in the
///   endpoint never reach the logs.
///
/// Input without a scheme is treated as starting at the authority.
fn endpoint_host(endpoint: &str) -> &str {
    let after_scheme = endpoint
        .split_once("://")
        .map_or(endpoint, |(_, rest)| rest);
    // `find` returns a char-boundary byte index, so the slice cannot panic.
    let authority_end = after_scheme
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(after_scheme.len());
    let authority = &after_scheme[..authority_end];
    // `rsplit_once` keeps only what follows the last '@'.
    authority
        .rsplit_once('@')
        .map_or(authority, |(_, host)| host)
}
415
416/// If `options` carry backup-task intervals threaded in via
417/// [`apply_backup_config`], spawn periodic checkpointer + WAL-flush
418/// tasks against `runtime`. Returns a `BackupTasksHandle` that
419/// stops the tasks when dropped; runners keep it alive for the
420/// server lifetime.
421fn spawn_backup_tasks_if_configured(
422    options: &RedDBOptions,
423    runtime: &RedDBRuntime,
424) -> Option<BackupTasksHandle> {
425    let checkpoint_secs: u64 = options
426        .metadata
427        .get(BACKUP_INTERVAL_META_CHECKPOINT)?
428        .parse()
429        .ok()?;
430    let wal_secs: u64 = options
431        .metadata
432        .get(BACKUP_INTERVAL_META_WAL_FLUSH)?
433        .parse()
434        .ok()?;
435    if options.remote_backend.is_none() {
436        return None;
437    }
438
439    let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
440
441    let checkpoint_handle = {
442        let stop = Arc::clone(&stop);
443        let runtime = runtime.clone();
444        let interval = Duration::from_secs(checkpoint_secs);
445        thread::Builder::new()
446            .name("red-checkpointer".into())
447            .spawn(move || {
448                periodic_loop(stop, interval, move || {
449                    if let Err(err) = runtime.checkpoint() {
450                        tracing::warn!(error = %err, "periodic checkpoint failed");
451                    }
452                })
453            })
454            .ok()
455    };
456
457    let archiver_handle = {
458        let stop = Arc::clone(&stop);
459        let runtime = runtime.clone();
460        let interval = Duration::from_secs(wal_secs);
461        thread::Builder::new()
462            .name("red-wal-archiver".into())
463            .spawn(move || {
464                periodic_loop(stop, interval, move || {
465                    if let Err(err) = runtime.trigger_backup() {
466                        tracing::warn!(error = %err, "periodic WAL archive/backup failed");
467                    }
468                })
469            })
470            .ok()
471    };
472
473    tracing::info!(
474        checkpoint_interval_secs = checkpoint_secs,
475        wal_flush_interval_secs = wal_secs,
476        "backup tasks spawned (checkpointer + WAL archiver)"
477    );
478
479    Some(BackupTasksHandle {
480        stop,
481        _checkpoint_handle: checkpoint_handle,
482        _archiver_handle: archiver_handle,
483    })
484}
485
/// Owner of the periodic checkpointer / WAL-archiver threads.
///
/// Dropping it raises the shared stop flag; each `periodic_loop` sees the
/// flag on its next wake-up and returns. The threads are not joined —
/// their `JoinHandle`s are simply dropped along with this value.
pub struct BackupTasksHandle {
    stop: Arc<std::sync::atomic::AtomicBool>,
    _checkpoint_handle: Option<thread::JoinHandle<()>>,
    _archiver_handle: Option<thread::JoinHandle<()>>,
}

impl Drop for BackupTasksHandle {
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;
        // Release pairs with the Acquire loads in `periodic_loop`.
        let flag: &std::sync::atomic::AtomicBool = &self.stop;
        flag.store(true, Ordering::Release);
    }
}
499
/// Call `tick` roughly every `interval` until `stop` becomes `true`.
///
/// Sleeping:
/// - Sleeps are capped at one second, so a stop request is honored within
///   about a second even for long intervals (e.g. a 1h checkpoint).
/// - The final sleep is shortened to land on the interval boundary.
/// - A zero `interval` keeps a once-per-second cadence instead of spinning.
///
/// Stopping: the flag is re-checked after every sleep, so no new tick
/// starts once shutdown has been requested.
///
/// Drift: only sleep time counts toward the interval. Time spent inside
/// `tick` is not counted, so ticks drift by the tick's own duration.
fn periodic_loop<F: FnMut()>(
    stop: Arc<std::sync::atomic::AtomicBool>,
    interval: Duration,
    mut tick: F,
) {
    use std::sync::atomic::Ordering;

    // Upper bound on a single sleep, for shutdown responsiveness.
    let wake = Duration::from_secs(1);
    let mut elapsed = Duration::ZERO;
    while !stop.load(Ordering::Acquire) {
        let remaining = interval.saturating_sub(elapsed);
        // `remaining` is only zero when `interval` is zero.
        let nap = if remaining.is_zero() {
            wake
        } else {
            wake.min(remaining)
        };
        thread::sleep(nap);
        elapsed += nap;
        // Shutdown may have been requested while sleeping; don't start
        // another (possibly long) tick after that.
        if stop.load(Ordering::Acquire) {
            break;
        }
        if elapsed >= interval {
            tick();
            elapsed = Duration::ZERO;
        }
    }
}
518
519fn configure_remote_backend_from_env(options: &mut RedDBOptions) {
520    // PLAN.md (cloud-agnostic) — prefer the new spelling
521    // `RED_BACKEND`; accept the legacy `REDDB_REMOTE_BACKEND` for
522    // existing dev installs. `none` (default) means standalone — no
523    // remote backend, valid for development and on-prem without
524    // remote.
525    let backend = env_nonempty("RED_BACKEND")
526        .or_else(|| env_nonempty("REDDB_REMOTE_BACKEND"))
527        .unwrap_or_else(|| "none".to_string())
528        .to_ascii_lowercase();
529
530    match backend.as_str() {
531        // Universal S3-compatible — covers AWS, R2, MinIO, Ceph,
532        // GCS-interop, B2, DO Spaces, Wasabi, Garage, SeaweedFS,
533        // IDrive, Storj. The `path_style` toggle in S3Config picks
534        // the right addressing for self-hosted vs hosted.
535        "s3" | "minio" | "r2" => {
536            #[cfg(feature = "backend-s3")]
537            {
538                if let Some(config) = s3_config_from_env() {
539                    let remote_key = env_nonempty("RED_REMOTE_KEY")
540                        .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
541                        .unwrap_or_else(|| "clusters/dev/data.rdb".to_string());
542                    let backend = Arc::new(crate::storage::backend::S3Backend::new(config));
543                    options.remote_backend = Some(backend.clone());
544                    options.remote_backend_atomic = Some(backend);
545                    options.remote_key = Some(remote_key);
546                }
547            }
548            #[cfg(not(feature = "backend-s3"))]
549            {
550                tracing::warn!(
551                    backend = %backend,
552                    "RED_BACKEND={backend} requested but binary was built without `backend-s3` feature"
553                );
554            }
555        }
556        // Filesystem backend (NFS/EFS/SMB/local-disk). PLAN.md spec
557        // calls it `fs`; legacy code shipped it as `local`. Both
558        // names map to LocalBackend, with the remote_key derived
559        // from `RED_FS_PATH` + a per-database suffix when provided.
560        "fs" | "local" => {
561            let base_path = env_nonempty("RED_FS_PATH").or_else(|| env_nonempty("REDDB_FS_PATH"));
562            let remote_key = match (
563                base_path,
564                env_nonempty("RED_REMOTE_KEY").or_else(|| env_nonempty("REDDB_REMOTE_KEY")),
565            ) {
566                (Some(base), Some(rel)) => Some(format!(
567                    "{}/{}",
568                    base.trim_end_matches('/'),
569                    rel.trim_start_matches('/')
570                )),
571                (Some(base), None) => Some(format!(
572                    "{}/clusters/dev/data.rdb",
573                    base.trim_end_matches('/')
574                )),
575                (None, Some(rel)) => Some(rel),
576                (None, None) => None,
577            };
578            if let Some(key) = remote_key {
579                let backend = Arc::new(crate::storage::backend::LocalBackend);
580                options.remote_backend = Some(backend.clone());
581                options.remote_backend_atomic = Some(backend);
582                options.remote_key = Some(key);
583            }
584        }
585        // Generic HTTP backend (PLAN.md Phase 2.3). Maximum
586        // portability: any service exposing PUT/GET/DELETE serves
587        // as a backup target. Optional auth via *_FILE secret
588        // path keeps the token out of the env.
589        "http" => {
590            let base_url = match env_nonempty("RED_HTTP_BACKEND_URL")
591                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_URL"))
592            {
593                Some(u) => u,
594                None => {
595                    tracing::warn!(
596                        "RED_BACKEND=http requires RED_HTTP_BACKEND_URL — backend disabled"
597                    );
598                    return;
599                }
600            };
601            let prefix = env_nonempty("RED_HTTP_BACKEND_PREFIX")
602                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_PREFIX"))
603                .unwrap_or_default();
604            let auth_header = if let Some(path) = env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER_FILE")
605                .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER_FILE"))
606            {
607                std::fs::read_to_string(&path)
608                    .ok()
609                    .map(|s| s.trim().to_string())
610                    .filter(|s| !s.is_empty())
611            } else {
612                env_nonempty("RED_HTTP_BACKEND_AUTH_HEADER")
613                    .or_else(|| env_nonempty("REDDB_HTTP_BACKEND_AUTH_HEADER"))
614            };
615
616            let mut config =
617                crate::storage::backend::HttpBackendConfig::new(base_url).with_prefix(prefix);
618            if let Some(auth) = auth_header {
619                config = config.with_auth_header(auth);
620            }
621            let conditional_writes = env_truthy("RED_HTTP_CONDITIONAL_WRITES")
622                || env_truthy("RED_HTTP_BACKEND_CONDITIONAL_WRITES")
623                || env_truthy("REDDB_HTTP_BACKEND_CONDITIONAL_WRITES");
624            config = config.with_conditional_writes(conditional_writes);
625            // Always populate the snapshot-transport handle. When the
626            // operator confirmed CAS support, also populate the atomic
627            // handle via AtomicHttpBackend — without that flag,
628            // LeaseStore must remain unreachable on this backend.
629            if conditional_writes {
630                match crate::storage::backend::AtomicHttpBackend::try_new(config.clone()) {
631                    Ok(atomic) => {
632                        let atomic_arc = Arc::new(atomic);
633                        options.remote_backend = Some(atomic_arc.clone());
634                        options.remote_backend_atomic = Some(atomic_arc);
635                    }
636                    Err(err) => {
637                        tracing::warn!(error = %err, "AtomicHttpBackend init failed; falling back to plain HTTP (no CAS)");
638                        options.remote_backend =
639                            Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
640                    }
641                }
642            } else {
643                options.remote_backend =
644                    Some(Arc::new(crate::storage::backend::HttpBackend::new(config)));
645            }
646            options.remote_key = env_nonempty("RED_REMOTE_KEY")
647                .or_else(|| env_nonempty("REDDB_REMOTE_KEY"))
648                .or_else(|| Some("clusters/dev/data.rdb".to_string()));
649        }
650        // `none` is the explicit standalone — no remote, no backup
651        // pipeline. Boot never blocks on network reachability.
652        "none" | "" => {}
653        other => {
654            tracing::warn!(
655                backend = %other,
656                "unknown RED_BACKEND value — supported: s3 | fs | http | none"
657            );
658        }
659    }
660}
661
/// Resolve an S3 setting from the environment.
///
/// The cloud-agnostic `RED_S3_<SUFFIX>` spelling (PLAN.md spec) is
/// preferred. The legacy `REDDB_S3_<SUFFIX>` form, kept for existing dev
/// installs, is used only when the new one is unset or blank. This helper
/// does not log which spelling was used.
#[cfg(feature = "backend-s3")]
fn env_s3(suffix: &str) -> Option<String> {
    let preferred = format!("RED_S3_{suffix}");
    let legacy = format!("REDDB_S3_{suffix}");
    env_nonempty(&preferred).or_else(|| env_nonempty(&legacy))
}
671
/// Read an S3 secret from a `*_FILE` path or the literal env var.
///
/// The file form is compatible with Kubernetes / Docker Secrets,
/// HashiCorp Vault Agent and sealed-secrets. `RED_S3_<SUFFIX>_FILE` (or
/// legacy `REDDB_S3_<SUFFIX>_FILE`) wins over the inline value, so an
/// operator can override the inline env without touching it. The file
/// contents are trimmed; an empty file counts as unset.
///
/// If the file variable is set but the file cannot be read, the result
/// is `None` with no fallback to the inline value. A WARN is logged so the
/// reason the S3 backend ends up disabled is visible; only the path is
/// logged, never the secret.
#[cfg(feature = "backend-s3")]
fn env_s3_secret(suffix: &str) -> Option<String> {
    let file_var = env_nonempty(&format!("RED_S3_{suffix}_FILE"))
        .or_else(|| env_nonempty(&format!("REDDB_S3_{suffix}_FILE")));
    let Some(path) = file_var else {
        return env_s3(suffix);
    };
    match std::fs::read_to_string(&path) {
        Ok(contents) => Some(contents.trim().to_string()).filter(|s| !s.is_empty()),
        Err(err) => {
            tracing::warn!(
                setting = suffix,
                path = %path,
                error = %err,
                "failed to read S3 secret file"
            );
            None
        }
    }
}
689
/// Assemble an `S3Config` from `RED_S3_*` / `REDDB_S3_*` variables.
///
/// Required: `ENDPOINT`, `BUCKET`, `ACCESS_KEY` and `SECRET_KEY` (the two
/// keys may come from `*_FILE`). If any is missing the result is `None`.
/// Optional:
/// - `REGION` defaults to `us-east-1`;
/// - `KEY_PREFIX` (then `PREFIX`) defaults to empty;
/// - `PATH_STYLE` defaults to `true`. When it is set, only
///   `1`/`true`/`yes`/`on` count as true.
#[cfg(feature = "backend-s3")]
fn s3_config_from_env() -> Option<crate::storage::backend::S3Config> {
    // Required settings.
    let endpoint = env_s3("ENDPOINT")?;
    let bucket = env_s3("BUCKET")?;
    let access_key = env_s3_secret("ACCESS_KEY")?;
    let secret_key = env_s3_secret("SECRET_KEY")?;

    // Optional settings with defaults.
    let region = match env_s3("REGION") {
        Some(value) => value,
        None => String::from("us-east-1"),
    };
    let key_prefix = match env_s3("KEY_PREFIX").or_else(|| env_s3("PREFIX")) {
        Some(value) => value,
        None => String::new(),
    };
    let path_style = match env_s3("PATH_STYLE") {
        Some(raw) => matches!(raw.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"),
        None => true,
    };

    let config = crate::storage::backend::S3Config {
        endpoint,
        bucket,
        key_prefix,
        access_key,
        secret_key,
        region,
        path_style,
    };
    Some(config)
}
713
714pub fn render_systemd_unit(config: &SystemdServiceConfig) -> String {
715    let data_dir = config.data_dir();
716    let exec_start = render_systemd_exec_start(config);
717    format!(
718        "[Unit]\n\
719Description=RedDB unified database service\n\
720After=network-online.target\n\
721Wants=network-online.target\n\
722\n\
723[Service]\n\
724Type=simple\n\
725User={user}\n\
726Group={group}\n\
727WorkingDirectory={workdir}\n\
728ExecStart={exec_start}\n\
729Restart=always\n\
730RestartSec=2\n\
731LimitSTACK=16M\n\
732NoNewPrivileges=true\n\
733PrivateTmp=true\n\
734ProtectSystem=strict\n\
735ProtectHome=true\n\
736ProtectControlGroups=true\n\
737ProtectKernelTunables=true\n\
738ProtectKernelModules=true\n\
739RestrictNamespaces=true\n\
740LockPersonality=true\n\
741MemoryDenyWriteExecute=true\n\
742ReadWritePaths={workdir}\n\
743\n\
744[Install]\n\
745WantedBy=multi-user.target\n",
746        user = config.run_user,
747        group = config.run_group,
748        workdir = data_dir.display(),
749        exec_start = exec_start,
750    )
751}
752
753/// Install a systemd unit + start the service.
754///
755/// Linux-only. The helper shells out to `systemctl`, `useradd`,
756/// `groupadd`, `install`, `getent`, and `id` — none of which exist on
757/// Windows or macOS. The Windows/macOS branch returns a hard error so
758/// callers (the CLI) surface a clear message instead of a confusing
759/// syscall failure. A proper Windows-service equivalent (sc.exe /
760/// NSSM) is a Phase 3.6 follow-up.
761#[cfg(target_os = "linux")]
762pub fn install_systemd_service(config: &SystemdServiceConfig) -> Result<(), String> {
763    ensure_root()?;
764    ensure_command_available("systemctl")?;
765    ensure_command_available("getent")?;
766    ensure_command_available("groupadd")?;
767    ensure_command_available("useradd")?;
768    ensure_command_available("install")?;
769    ensure_executable(&config.binary_path)?;
770
771    if !command_success("getent", ["group", config.run_group.as_str()])? {
772        run_command("groupadd", ["--system", config.run_group.as_str()])?;
773    }
774
775    if !command_success("id", ["-u", config.run_user.as_str()])? {
776        let data_dir = config.data_dir();
777        run_command(
778            "useradd",
779            [
780                "--system",
781                "--gid",
782                config.run_group.as_str(),
783                "--home-dir",
784                data_dir.to_string_lossy().as_ref(),
785                "--shell",
786                "/usr/sbin/nologin",
787                config.run_user.as_str(),
788            ],
789        )?;
790    }
791
792    let data_dir = config.data_dir();
793    run_command(
794        "install",
795        [
796            "-d",
797            "-o",
798            config.run_user.as_str(),
799            "-g",
800            config.run_group.as_str(),
801            "-m",
802            "0750",
803            data_dir.to_string_lossy().as_ref(),
804        ],
805    )?;
806
807    std::fs::write(config.unit_path(), render_systemd_unit(config))
808        .map_err(|err| format!("failed to write systemd unit: {err}"))?;
809
810    run_command("systemctl", ["daemon-reload"])?;
811    run_command(
812        "systemctl",
813        [
814            "enable",
815            "--now",
816            format!("{}.service", config.service_name).as_str(),
817        ],
818    )?;
819
820    Ok(())
821}
822
/// Fallback for non-Linux targets, where systemd is not available.
///
/// The signature matches the Linux build so callers compile on every
/// platform. This version always returns an error telling the operator
/// to register the service by hand. Native integration (sc.exe + NSSM
/// on Windows, launchd on macOS) is a Phase 3.6 follow-up.
#[cfg(not(target_os = "linux"))]
pub fn install_systemd_service(_config: &SystemdServiceConfig) -> Result<(), String> {
    const UNSUPPORTED: &str = "systemd install is Linux-only — use sc.exe (Windows) or \
        launchd (macOS) to install the service manually using the \
        unit printed by `red service print-unit`";
    Err(String::from(UNSUPPORTED))
}
834
/// Fail unless the current process runs as uid 0.
///
/// Determines the uid by running `id -u` and comparing its trimmed
/// stdout with `"0"`.
#[cfg(target_os = "linux")]
fn ensure_root() -> Result<(), String> {
    let output = match Command::new("id").arg("-u").output() {
        Ok(output) => output,
        Err(err) => return Err(format!("failed to determine current uid: {err}")),
    };
    if !output.status.success() {
        return Err("failed to determine current uid".to_string());
    }
    let is_root = String::from_utf8_lossy(&output.stdout).trim() == "0";
    if is_root {
        Ok(())
    } else {
        Err("run this command as root (sudo)".to_string())
    }
}
850
/// Check that `command` resolves on the `PATH` of a login shell.
///
/// Runs `sh -lc 'command -v <command> >/dev/null 2>&1'`. `command` is
/// spliced into the shell script unquoted, so pass only fixed, trusted
/// command names; the visible call sites all use string literals.
#[cfg(target_os = "linux")]
fn ensure_command_available(command: &str) -> Result<(), String> {
    let script = format!("command -v {command} >/dev/null 2>&1");
    let status = Command::new("sh")
        .arg("-lc")
        .arg(&script)
        .status()
        .map_err(|err| format!("failed to check command '{command}': {err}"))?;
    if !status.success() {
        return Err(format!("required command not found: {command}"));
    }
    Ok(())
}
863
/// Verify that `path` names an existing regular file that is executable.
///
/// `std::fs::metadata` follows symlinks, so a symlink to an executable
/// is accepted. Directories are rejected even though they usually carry
/// execute bits, because systemd cannot exec a directory.
///
/// # Errors
///
/// * `binary not found '<path>': …` when the path cannot be stat'ed;
/// * `binary is not a file: <path>` when it is not a regular file;
/// * `binary is not executable: <path>` when no execute bit is set (Unix).
#[cfg(target_os = "linux")]
fn ensure_executable(path: &std::path::Path) -> Result<(), String> {
    let metadata = std::fs::metadata(path)
        .map_err(|err| format!("binary not found '{}': {err}", path.display()))?;
    // Previously only checked on non-Unix (dead code in a Linux-only fn),
    // so a directory such as `/` passed the execute-bit test below.
    if !metadata.is_file() {
        return Err(format!("binary is not a file: {}", path.display()));
    }
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        if metadata.permissions().mode() & 0o111 == 0 {
            return Err(format!("binary is not executable: {}", path.display()));
        }
    }
    Ok(())
}
883
/// Run `program` with `args` and report whether it exited successfully.
///
/// stdout/stderr are inherited from the current process. A non-zero exit
/// is `Ok(false)`; only failing to spawn the program yields `Err`.
#[cfg(target_os = "linux")]
fn command_success<const N: usize>(program: &str, args: [&str; N]) -> Result<bool, String> {
    let outcome = Command::new(program).args(args).status();
    match outcome {
        Ok(exit) => Ok(exit.success()),
        Err(err) => Err(format!("failed to run {program}: {err}")),
    }
}
892
/// Run `program` with `args`, capturing its output, and fail on non-zero exit.
///
/// # Errors
///
/// * `failed to run <program>: …` when the program cannot be spawned;
/// * `<program> failed: <detail>` on unsuccessful exit, where `detail` is
///   the trimmed stderr, else the trimmed stdout, else the exit status
///   as rendered by `ExitStatus`'s `Display` (e.g. `exit status: 2`).
#[cfg(target_os = "linux")]
fn run_command<const N: usize>(program: &str, args: [&str; N]) -> Result<(), String> {
    let output = Command::new(program)
        .args(args)
        .output()
        .map_err(|err| format!("failed to run {program}: {err}"))?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    let stdout = String::from_utf8_lossy(&output.stdout);
    let detail = [stderr.trim(), stdout.trim()]
        .into_iter()
        .find(|text| !text.is_empty())
        .map(str::to_string)
        // `ExitStatus` already prints "exit status: N" / "signal: …";
        // prefixing it again produced "exit status exit status: N".
        .unwrap_or_else(|| output.status.to_string());
    Err(format!("{program} failed: {detail}"))
}
914
915pub fn run_server_with_large_stack(config: ServerCommandConfig) -> Result<(), String> {
916    let has_any = config.router_bind_addr.is_some()
917        || config.grpc_bind_addr.is_some()
918        || config.http_bind_addr.is_some()
919        || config.wire_bind_addr.is_some()
920        || config.pg_bind_addr.is_some();
921    if !has_any {
922        return Err("at least one server bind address must be configured".into());
923    }
924    let thread_name = if config.router_bind_addr.is_some() {
925        "red-server-router"
926    } else {
927        match (
928            config.grpc_bind_addr.is_some(),
929            config.http_bind_addr.is_some(),
930        ) {
931            (true, true) => "red-server-dual",
932            (true, false) => "red-server-grpc",
933            (false, true) => "red-server-http",
934            (false, false) if config.wire_bind_addr.is_some() => "red-server-wire",
935            (false, false) => "red-server-pg-wire",
936        }
937    };
938
939    let handle = thread::Builder::new()
940        .name(thread_name.into())
941        .stack_size(8 * 1024 * 1024)
942        .spawn(move || run_configured_servers(config))
943        .map_err(|err| format!("failed to spawn server thread: {err}"))?;
944
945    match handle.join() {
946        Ok(result) => result,
947        Err(_) => Err("server thread panicked".to_string()),
948    }
949}
950
951fn render_systemd_exec_start(config: &SystemdServiceConfig) -> String {
952    let mut parts = vec![
953        config.binary_path.display().to_string(),
954        "server".to_string(),
955        "--path".to_string(),
956        config.data_path.display().to_string(),
957    ];
958
959    if let Some(bind_addr) = &config.router_bind_addr {
960        parts.push("--bind".to_string());
961        parts.push(bind_addr.clone());
962    } else if let Some(bind_addr) = &config.grpc_bind_addr {
963        parts.push("--grpc-bind".to_string());
964        parts.push(bind_addr.clone());
965    }
966    if let Some(bind_addr) = &config.http_bind_addr {
967        parts.push("--http-bind".to_string());
968        parts.push(bind_addr.clone());
969    }
970
971    parts.join(" ")
972}
973
/// Return `true` when something accepts a TCP connection at `target`.
///
/// `target` is resolved with `ToSocketAddrs` (so it must include a port).
/// Every resolved address is tried in order with `timeout`; the first
/// successful connect wins. Resolution failure counts as "not listening".
pub fn probe_listener(target: &str, timeout: Duration) -> bool {
    let Ok(resolved) = target.to_socket_addrs() else {
        return false;
    };
    let candidates: Vec<SocketAddr> = resolved.collect();
    for candidate in &candidates {
        if TcpStream::connect_timeout(candidate, timeout).is_ok() {
            return true;
        }
    }
    false
}
984
985#[inline(never)]
986fn run_configured_servers(config: ServerCommandConfig) -> Result<(), String> {
987    // Phase 6 logging is initialised inside each runner once the
988    // runtime is open — see `build_runtime_and_auth_store`. Going
989    // after DB open lets us read `red.logging.*` config keys out of
990    // the persistent red_config store and merge them with the CLI
991    // flags (flag > red_config > built-in default).
992    if let Some(router_bind_addr) = config.router_bind_addr.clone() {
993        return run_routed_server(config, router_bind_addr);
994    }
995
996    match (config.grpc_bind_addr.clone(), config.http_bind_addr.clone()) {
997        (Some(grpc_bind_addr), Some(http_bind_addr)) => {
998            run_dual_server(config, grpc_bind_addr, http_bind_addr)
999        }
1000        (Some(grpc_bind_addr), None) => run_grpc_server(config, grpc_bind_addr),
1001        (None, Some(http_bind_addr)) => run_http_server(config, http_bind_addr),
1002        (None, None) => {
1003            if let Some(wire_addr) = config.wire_bind_addr.clone() {
1004                run_wire_only_server(config, wire_addr)
1005            } else if let Some(pg_addr) = config.pg_bind_addr.clone() {
1006                run_pg_only_server(config, pg_addr)
1007            } else {
1008                Err("at least one server bind address must be configured".to_string())
1009            }
1010        }
1011    }
1012}
1013
1014/// Bind a TCP listener for a transport at startup and record the
1015/// outcome in the shared [`TransportReadiness`] state.
1016///
1017/// The split between *explicit* and *implicit/default* binds is the
1018/// contract from issue #545:
1019///
1020/// * `explicit == true` — the operator named this transport on the
1021///   CLI / env / config. A failed bind is fatal: this returns `Err`
1022///   and the boot path must propagate the error so the process exits
1023///   non-zero with the recorded `reason`.
1024/// * `explicit == false` — this is a default listener the server
1025///   would have spun up regardless. A failed bind degrades: this
1026///   returns `Ok(None)` (no listener) but the failure is still
1027///   recorded in `readiness.failed`, so the server keeps running and
1028///   the next `/health` probe enumerates the degraded listener.
1029///
1030/// On success the bound listener lands in `readiness.active`.
1031pub fn bind_listener_for_startup(
1032    readiness: &mut TransportReadiness,
1033    transport: &str,
1034    bind_addr: &str,
1035    explicit: bool,
1036) -> Result<Option<TcpListener>, String> {
1037    match TcpListener::bind(bind_addr) {
1038        Ok(listener) => {
1039            readiness.active(transport, bind_addr, explicit);
1040            Ok(Some(listener))
1041        }
1042        Err(err) => {
1043            let reason = format!("{transport} listener bind {bind_addr}: {err}");
1044            readiness.failed(transport, bind_addr, explicit, reason.clone());
1045            if explicit {
1046                tracing::error!(
1047                    transport,
1048                    bind = %bind_addr,
1049                    error = %err,
1050                    "fatal explicit bind failure"
1051                );
1052                Err(format!("explicit {reason}"))
1053            } else {
1054                tracing::warn!(
1055                    transport,
1056                    bind = %bind_addr,
1057                    error = %err,
1058                    "non-fatal implicit bind failure; listener degraded"
1059                );
1060                Ok(None)
1061            }
1062        }
1063    }
1064}
1065
1066/// Wire SIGTERM and SIGINT to `RedDBRuntime::graceful_shutdown`.
1067///
1068/// PLAN.md Phase 1.1 — orchestrators (K8s preStop, Fly autostop, ECS
1069/// drain, systemd, plain `docker stop`) all rely on SIGTERM with a
1070/// grace window. SIGKILL after that grace window is the OS's
1071/// fallback; we are responsible for finishing in time.
1072///
1073/// Spawns a tokio task on the caller's runtime that:
1074///   1. Awaits the first of SIGTERM / SIGINT.
1075///   2. Calls `runtime.graceful_shutdown(backup_on_shutdown)`. The
1076///      runtime moves to `Stopped` on its own; this just runs the
1077///      flush + checkpoint pipeline and (optionally) a final backup.
1078///   3. Calls `std::process::exit(0)` so the orchestrator sees a
1079///      clean exit code.
1080///
1081/// `RED_BACKUP_ON_SHUTDOWN` (default `true` if a remote backend is
1082/// configured) toggles step 3's backup branch. The flush + checkpoint
1083/// always run.
1084///
1085/// Idempotent across signal storms — `graceful_shutdown` returns the
1086/// cached report on second call, but we exit on the first one
1087/// regardless, so the second SIGTERM never reaches the handler.
1088async fn spawn_lifecycle_signal_handler(runtime: RedDBRuntime) {
1089    let backup_on_shutdown = std::env::var("RED_BACKUP_ON_SHUTDOWN")
1090        .ok()
1091        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1092        .unwrap_or(true);
1093
1094    #[cfg(unix)]
1095    {
1096        use tokio::signal::unix::{signal, SignalKind};
1097
1098        let mut sigterm = match signal(SignalKind::terminate()) {
1099            Ok(s) => s,
1100            Err(err) => {
1101                tracing::warn!(
1102                    error = %err,
1103                    "could not install SIGTERM handler; orchestrator graceful shutdown will fall back to SIGKILL"
1104                );
1105                return;
1106            }
1107        };
1108        let mut sigint = match signal(SignalKind::interrupt()) {
1109            Ok(s) => s,
1110            Err(err) => {
1111                tracing::warn!(error = %err, "could not install SIGINT handler");
1112                return;
1113            }
1114        };
1115        // PLAN.md Phase 6.4 — SIGHUP triggers a reload of secrets from
1116        // their `_FILE` companions without restarting the process.
1117        // Useful for credential rotation pipelines (kubectl create
1118        // secret + kubectl rollout restart, but for systemd / Nomad /
1119        // bare-metal where rolling the process is heavier).
1120        let mut sighup = match signal(SignalKind::hangup()) {
1121            Ok(s) => Some(s),
1122            Err(err) => {
1123                tracing::warn!(error = %err, "could not install SIGHUP handler; secret reload via signal disabled");
1124                None
1125            }
1126        };
1127
1128        let reload_runtime = runtime.clone();
1129        tokio::spawn(async move {
1130            loop {
1131                let signal_name = match &mut sighup {
1132                    Some(hup) => tokio::select! {
1133                        _ = sigterm.recv() => "SIGTERM",
1134                        _ = sigint.recv() => "SIGINT",
1135                        _ = hup.recv() => "SIGHUP",
1136                    },
1137                    None => tokio::select! {
1138                        _ = sigterm.recv() => "SIGTERM",
1139                        _ = sigint.recv() => "SIGINT",
1140                    },
1141                };
1142
1143                if signal_name == "SIGHUP" {
1144                    handle_sighup_reload(&reload_runtime);
1145                    continue; // stay alive; SIGHUP isn't a shutdown
1146                }
1147
1148                tracing::info!(
1149                    signal = signal_name,
1150                    "lifecycle signal received; shutting down"
1151                );
1152                match runtime.graceful_shutdown(backup_on_shutdown) {
1153                    Ok(report) => {
1154                        tracing::info!(
1155                            duration_ms = report.duration_ms,
1156                            flushed_wal = report.flushed_wal,
1157                            final_checkpoint = report.final_checkpoint,
1158                            backup_uploaded = report.backup_uploaded,
1159                            "graceful shutdown complete"
1160                        );
1161                    }
1162                    Err(err) => {
1163                        tracing::error!(error = %err, "graceful shutdown failed");
1164                        // Issue #205 — graceful shutdown returning Err
1165                        // means the runtime is exiting without a clean
1166                        // flush/checkpoint. Operator-grade event so the
1167                        // operator notices the dirty exit even when the
1168                        // process restarts before they read tracing logs.
1169                        crate::telemetry::operator_event::OperatorEvent::ShutdownForced {
1170                            reason: format!("graceful shutdown failed: {err}"),
1171                        }
1172                        .emit_global();
1173                    }
1174                }
1175                std::process::exit(0);
1176            }
1177        });
1178    }
1179
1180    #[cfg(not(unix))]
1181    {
1182        tokio::spawn(async move {
1183            let interrupted = tokio::signal::ctrl_c().await;
1184            if let Err(err) = interrupted {
1185                tracing::warn!(error = %err, "could not install Ctrl+C handler");
1186                return;
1187            }
1188
1189            tracing::info!(
1190                signal = "Ctrl+C",
1191                "lifecycle signal received; shutting down"
1192            );
1193            match runtime.graceful_shutdown(backup_on_shutdown) {
1194                Ok(report) => {
1195                    tracing::info!(
1196                        duration_ms = report.duration_ms,
1197                        flushed_wal = report.flushed_wal,
1198                        final_checkpoint = report.final_checkpoint,
1199                        backup_uploaded = report.backup_uploaded,
1200                        "graceful shutdown complete"
1201                    );
1202                }
1203                Err(err) => {
1204                    tracing::error!(error = %err, "graceful shutdown failed");
1205                }
1206            }
1207            std::process::exit(0);
1208        });
1209    }
1210}
1211
1212/// PLAN.md Phase 6.4 — re-read secrets from `*_FILE` companion env
1213/// vars. Today this only refreshes the audit log + records the
1214/// reload event; the runtime modules that hold cached secret
1215/// material (S3 backend credentials, admin token, JWT keys) read
1216/// the env on each request so the next call after SIGHUP picks up
1217/// the new file contents automatically. A future extension can
1218/// punch through to the LeaseStore / AuthStore for in-memory
1219/// caches that don't re-read on each call.
1220fn handle_sighup_reload(runtime: &RedDBRuntime) {
1221    let now_ms = std::time::SystemTime::now()
1222        .duration_since(std::time::UNIX_EPOCH)
1223        .map(|d| d.as_millis() as u64)
1224        .unwrap_or(0);
1225    tracing::info!(
1226        target: "reddb::secrets",
1227        ts_unix_ms = now_ms,
1228        "SIGHUP received; secrets will be re-read from *_FILE on next access"
1229    );
1230    // Routed through AuditFieldEscaper (ADR 0010 / issue #177) so
1231    // every emission goes through the typed-field guard. The
1232    // arguments here are static, but using the typed entry point
1233    // keeps the discipline uniform across call sites.
1234    use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
1235    runtime.audit_log().record_event(
1236        AuditEvent::builder("config/sighup_reload")
1237            .source(AuditAuthSource::System)
1238            .resource("secrets")
1239            .outcome(Outcome::Success)
1240            .field(AuditFieldEscaper::field("ts_unix_ms", now_ms))
1241            .build(),
1242    );
1243}
1244
/// Serve every transport behind a single public TCP router bound to
/// `router_bind_addr`.
///
/// The HTTP, gRPC and RedWire backends each bind an ephemeral loopback
/// port (`127.0.0.1:0`). `serve_tcp_router` accepts on the public address
/// and forwards to those backend addresses; the protocol selection
/// presumably lives in `service_router` (not visible here). The internal
/// gRPC backend runs with `tls: None`.
///
/// Blocks the calling thread on a multi-threaded tokio runtime until the
/// router future returns.
///
/// # Errors
///
/// Returns `Err` when:
/// * DB options or the runtime/auth store cannot be built;
/// * an internal backend listener cannot be bound or inspected;
/// * the HTTP backend thread has already finished ~100 ms after start;
/// * the tokio runtime cannot be built;
/// * `serve_tcp_router` fails.
#[inline(never)]
fn run_routed_server(config: ServerCommandConfig, router_bind_addr: String) -> Result<(), String> {
    let workers = config.workers;
    let cli_telemetry = config.telemetry.clone();
    let db_options = config.to_db_options()?;
    let rt_config = detect_runtime_config();
    // An explicit worker count wins over the CPU-based suggestion.
    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
    // `_telemetry_guard` and `_backup_tasks` are named bindings (not `_`),
    // so they are kept alive until this function returns.
    let (runtime, auth_store, _telemetry_guard) =
        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);

    spawn_admin_metrics_listeners(&runtime, &auth_store);

    // HTTP backend: loopback ephemeral port, served on its own thread.
    let http_listener = std::net::TcpListener::bind("127.0.0.1:0")
        .map_err(|err| format!("bind internal HTTP listener: {err}"))?;
    let http_backend = http_listener
        .local_addr()
        .map_err(|err| format!("inspect internal HTTP listener: {err}"))?;
    let http_server = build_http_server(
        runtime.clone(),
        auth_store.clone(),
        http_backend.to_string(),
    );
    let http_server = apply_http_limits(http_server, &config, &runtime);
    let http_handle = http_server.serve_in_background_on(http_listener);

    // Crude startup check: give the HTTP backend ~100 ms and fail fast if
    // its thread has already finished. Later failures are not caught here.
    thread::sleep(Duration::from_millis(100));
    if http_handle.is_finished() {
        return match http_handle.join() {
            Ok(Ok(())) => Err("HTTP backend exited unexpectedly".to_string()),
            Ok(Err(err)) => Err(err.to_string()),
            Err(_) => Err("HTTP backend thread panicked".to_string()),
        };
    }

    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(worker_threads)
        .thread_stack_size(rt_config.stack_size)
        .build()
        .map_err(|err| format!("tokio runtime: {err}"))?;

    let signal_runtime = runtime.clone();
    tokio_runtime.block_on(async move {
        // Install SIGTERM/SIGINT(/SIGHUP) handling before the async backends start.
        spawn_lifecycle_signal_handler(signal_runtime).await;
        // gRPC backend: loopback ephemeral port, plaintext (router-internal).
        let grpc_listener = std::net::TcpListener::bind("127.0.0.1:0")
            .map_err(|err| format!("bind internal gRPC listener: {err}"))?;
        let grpc_backend = grpc_listener
            .local_addr()
            .map_err(|err| format!("inspect internal gRPC listener: {err}"))?;
        let grpc_server = RedDBGrpcServer::with_options(
            runtime.clone(),
            GrpcServerOptions {
                bind_addr: grpc_backend.to_string(),
                tls: None,
            },
            auth_store,
        );
        tokio::spawn(async move {
            if let Err(err) = grpc_server.serve_on(grpc_listener).await {
                tracing::error!(err = %err, "gRPC backend error");
            }
        });

        // RedWire backend: loopback ephemeral port. `runtime` itself is
        // moved into it here; earlier users took clones.
        let wire_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .map_err(|err| format!("bind internal wire listener: {err}"))?;
        let wire_backend = wire_listener
            .local_addr()
            .map_err(|err| format!("inspect internal wire listener: {err}"))?;
        let wire_rt = Arc::new(runtime);
        tokio::spawn(async move {
            if let Err(err) =
                crate::wire::redwire::listener::start_redwire_listener_on(wire_listener, wire_rt)
                    .await
            {
                tracing::error!(err = %err, "redwire backend error");
            }
        });

        tracing::info!(
            bind = %router_bind_addr,
            cpus = rt_config.available_cpus,
            workers = worker_threads,
            "router bootstrapping"
        );
        // Public entry point; runs until the router stops or fails.
        serve_tcp_router(TcpProtocolRouterConfig {
            bind_addr: router_bind_addr,
            grpc_backend,
            http_backend,
            wire_backend,
        })
        .await
        .map_err(|err| err.to_string())
    })
}
1341
1342/// Spawn RedWire listeners (plaintext + TLS) as background tokio tasks.
1343async fn spawn_wire_listeners(
1344    config: &ServerCommandConfig,
1345    runtime: &RedDBRuntime,
1346    readiness: &mut TransportReadiness,
1347) -> Result<(), String> {
1348    // Plaintext RedWire — TCP or Unix socket
1349    if let Some(wire_addr) = config.wire_bind_addr.clone() {
1350        let wire_rt = Arc::new(runtime.clone());
1351        // Address starting with `unix://` or an absolute filesystem path
1352        // switches to Unix domain sockets.
1353        #[cfg(unix)]
1354        {
1355            if wire_addr.starts_with("unix://") || wire_addr.starts_with('/') {
1356                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1357                tokio::spawn(async move {
1358                    if let Err(e) = crate::wire::redwire::listener::start_redwire_unix_listener(
1359                        &wire_addr, wire_rt,
1360                    )
1361                    .await
1362                    {
1363                        tracing::error!(err = %e, "redwire unix listener error");
1364                    }
1365                });
1366                return Ok(());
1367            }
1368        }
1369        match tokio::net::TcpListener::bind(&wire_addr).await {
1370            Ok(listener) => {
1371                readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1372                tokio::spawn(async move {
1373                    if let Err(e) =
1374                        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1375                            .await
1376                    {
1377                        tracing::error!(err = %e, "redwire listener error");
1378                    }
1379                });
1380            }
1381            Err(err) => {
1382                let reason = format!("wire listener bind {wire_addr}: {err}");
1383                readiness.failed(
1384                    "wire",
1385                    &wire_addr,
1386                    config.wire_bind_explicit,
1387                    reason.clone(),
1388                );
1389                if config.wire_bind_explicit {
1390                    tracing::error!(
1391                        transport = "wire",
1392                        bind = %wire_addr,
1393                        error = %err,
1394                        "fatal explicit bind failure"
1395                    );
1396                    return Err(format!("explicit {reason}"));
1397                }
1398                tracing::warn!(
1399                    transport = "wire",
1400                    bind = %wire_addr,
1401                    error = %err,
1402                    "non-fatal implicit bind failure; listener degraded"
1403                );
1404            }
1405        }
1406    }
1407
1408    // RedWire over TLS
1409    if let Some(wire_tls_addr) = config.wire_tls_bind_addr.clone() {
1410        let tls_config = resolve_wire_tls_config(config);
1411        match tls_config {
1412            Ok(tls_cfg) => {
1413                let wire_rt = Arc::new(runtime.clone());
1414                tokio::spawn(async move {
1415                    if let Err(e) =
1416                        crate::wire::start_redwire_tls_listener(&wire_tls_addr, wire_rt, &tls_cfg)
1417                            .await
1418                    {
1419                        tracing::error!(err = %e, "redwire+tls listener error");
1420                    }
1421                });
1422            }
1423            Err(e) => tracing::error!(err = %e, "redwire TLS config error"),
1424        }
1425    }
1426    Ok(())
1427}
1428
1429/// Spawn the PostgreSQL wire-protocol listener (Phase 3.1 PG parity).
1430///
1431/// Only runs when `--pg-bind` is supplied. Uses the v3 protocol so
1432/// psql, JDBC drivers, DBeaver, etc. can connect directly. Runs
1433/// alongside the native wire listener; the two transports do not
1434/// share a port.
1435fn spawn_pg_listener(config: &ServerCommandConfig, runtime: &RedDBRuntime) {
1436    if let Some(pg_addr) = config.pg_bind_addr.clone() {
1437        let rt = Arc::new(runtime.clone());
1438        tokio::spawn(async move {
1439            let cfg = crate::wire::PgWireConfig {
1440                bind_addr: pg_addr,
1441                ..Default::default()
1442            };
1443            if let Err(e) = crate::wire::start_pg_wire_listener(cfg, rt).await {
1444                tracing::error!(err = %e, "pg wire listener error");
1445            }
1446        });
1447    }
1448}
1449
1450/// Resolve gRPC TLS material into PEM bytes.
1451///
1452/// Lookup order, in priority:
1453///   1. Explicit `config.grpc_tls_cert` / `config.grpc_tls_key` (paths
1454///      passed via CLI/env). Both must be present together.
1455///   2. `RED_GRPC_TLS_DEV=1` — auto-generate a self-signed cert next
1456///      to the data dir. Refuses to run without the env flag so an
1457///      operator can't accidentally ship a dev cert in prod.
1458///
1459/// `client_ca` is loaded when `config.grpc_tls_client_ca` is set,
1460/// turning the listener into a mutual-TLS endpoint that requires
1461/// every client to present a chain that anchors at one of the CAs
1462/// in the bundle.
1463fn resolve_grpc_tls_options(config: &ServerCommandConfig) -> Result<crate::GrpcTlsOptions, String> {
1464    use crate::utils::secret_file::expand_file_env;
1465
1466    // Best-effort *_FILE expansion for every TLS env knob. Errors here
1467    // surface as warnings; the fallback path (explicit cert paths) will
1468    // cover the common case.
1469    for var in [
1470        "REDDB_GRPC_TLS_CERT",
1471        "REDDB_GRPC_TLS_KEY",
1472        "REDDB_GRPC_TLS_CLIENT_CA",
1473    ] {
1474        if let Err(err) = expand_file_env(var) {
1475            tracing::warn!(
1476                target: "reddb::secrets",
1477                env = %var,
1478                err = %err,
1479                "could not expand *_FILE companion for gRPC TLS"
1480            );
1481        }
1482    }
1483
1484    let (cert_pem, key_pem) = match (&config.grpc_tls_cert, &config.grpc_tls_key) {
1485        (Some(cert), Some(key)) => {
1486            let cert_pem = std::fs::read(cert)
1487                .map_err(|e| format!("read grpc cert {}: {e}", cert.display()))?;
1488            let key_pem =
1489                std::fs::read(key).map_err(|e| format!("read grpc key {}: {e}", key.display()))?;
1490            (cert_pem, key_pem)
1491        }
1492        _ => {
1493            // No explicit material → only proceed when dev-mode is on.
1494            let dev = std::env::var("RED_GRPC_TLS_DEV")
1495                .ok()
1496                .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"))
1497                .unwrap_or(false);
1498            if !dev {
1499                return Err("gRPC TLS configured but no cert/key supplied — set \
1500                     REDDB_GRPC_TLS_CERT / REDDB_GRPC_TLS_KEY (or \
1501                     RED_GRPC_TLS_DEV=1 to auto-generate a self-signed cert)"
1502                    .to_string());
1503            }
1504            let dir = config
1505                .path
1506                .as_ref()
1507                .and_then(|p| p.parent())
1508                .map(PathBuf::from)
1509                .unwrap_or_else(|| PathBuf::from("."));
1510            let (cert_pem_str, key_pem_str) =
1511                crate::wire::tls::generate_self_signed_cert("localhost")
1512                    .map_err(|e| format!("auto-generate dev grpc cert: {e}"))?;
1513
1514            // Constant-time-friendly fingerprint to stderr so the
1515            // operator can pin a client trust store. We log via
1516            // `tracing::warn!` so it stands out next to ordinary
1517            // listener-online events.
1518            let fp = sha256_pem_fingerprint(cert_pem_str.as_bytes());
1519            tracing::warn!(
1520                target: "reddb::security",
1521                transport = "grpc",
1522                cert_sha256 = %fp,
1523                "RED_GRPC_TLS_DEV=1: using auto-generated self-signed cert; \
1524                 DO NOT use in production"
1525            );
1526            // Persist so that restarts reuse the same identity.
1527            let cert_path = dir.join("grpc-tls-cert.pem");
1528            let key_path = dir.join("grpc-tls-key.pem");
1529            if !cert_path.exists() || !key_path.exists() {
1530                let _ = std::fs::create_dir_all(&dir);
1531                std::fs::write(&cert_path, cert_pem_str.as_bytes())
1532                    .map_err(|e| format!("write grpc dev cert: {e}"))?;
1533                std::fs::write(&key_path, key_pem_str.as_bytes())
1534                    .map_err(|e| format!("write grpc dev key: {e}"))?;
1535                #[cfg(unix)]
1536                {
1537                    use std::os::unix::fs::PermissionsExt;
1538                    let _ =
1539                        std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600));
1540                }
1541            }
1542            (cert_pem_str.into_bytes(), key_pem_str.into_bytes())
1543        }
1544    };
1545
1546    let client_ca_pem = match &config.grpc_tls_client_ca {
1547        Some(path) => Some(
1548            std::fs::read(path)
1549                .map_err(|e| format!("read grpc client CA {}: {e}", path.display()))?,
1550        ),
1551        None => None,
1552    };
1553
1554    Ok(crate::GrpcTlsOptions {
1555        cert_pem,
1556        key_pem,
1557        client_ca_pem,
1558    })
1559}
1560
1561/// Spawn a TLS-terminated gRPC listener when `grpc_tls_bind_addr` is
1562/// configured. Logs and continues on TLS-config errors so the plain
1563/// listener stays up; this matches the wire-listener pattern.
1564fn spawn_grpc_tls_listener_if_configured(
1565    config: &ServerCommandConfig,
1566    runtime: RedDBRuntime,
1567    auth_store: Arc<AuthStore>,
1568) {
1569    let Some(tls_bind) = config.grpc_tls_bind_addr.clone() else {
1570        return;
1571    };
1572    let tls_opts = match resolve_grpc_tls_options(config) {
1573        Ok(opts) => opts,
1574        Err(err) => {
1575            tracing::error!(
1576                target: "reddb::security",
1577                transport = "grpc",
1578                err = %err,
1579                "gRPC TLS config error; TLS listener will not start"
1580            );
1581            return;
1582        }
1583    };
1584    tokio::spawn(async move {
1585        let server = RedDBGrpcServer::with_options(
1586            runtime,
1587            GrpcServerOptions {
1588                bind_addr: tls_bind.clone(),
1589                tls: Some(tls_opts),
1590            },
1591            auth_store,
1592        );
1593        tracing::info!(transport = "grpc+tls", bind = %tls_bind, "listener online");
1594        if let Err(err) = server.serve().await {
1595            tracing::error!(transport = "grpc+tls", err = %err, "gRPC TLS listener error");
1596        }
1597    });
1598}
1599
1600/// Hex-encoded SHA-256 of a PEM blob, used for cert-pin operator log
1601/// lines. Constant-time hash; no token contents leave this fn.
1602fn sha256_pem_fingerprint(pem: &[u8]) -> String {
1603    use sha2::{Digest, Sha256};
1604    let mut h = Sha256::new();
1605    h.update(pem);
1606    let d = h.finalize();
1607    let mut buf = String::with_capacity(64);
1608    for b in d.iter() {
1609        buf.push_str(&format!("{b:02x}"));
1610    }
1611    buf
1612}
1613
1614/// Resolve TLS config: use provided cert/key or auto-generate.
1615fn resolve_wire_tls_config(
1616    config: &ServerCommandConfig,
1617) -> Result<crate::wire::WireTlsConfig, String> {
1618    match (&config.wire_tls_cert, &config.wire_tls_key) {
1619        (Some(cert), Some(key)) => Ok(crate::wire::WireTlsConfig {
1620            cert_path: cert.clone(),
1621            key_path: key.clone(),
1622        }),
1623        _ => {
1624            // Auto-generate self-signed cert
1625            let dir = config
1626                .path
1627                .as_ref()
1628                .and_then(|p| p.parent())
1629                .map(PathBuf::from)
1630                .unwrap_or_else(|| PathBuf::from("."));
1631            crate::wire::tls::auto_generate_cert(&dir).map_err(|e| e.to_string())
1632        }
1633    }
1634}
1635
1636#[inline(never)]
1637fn run_wire_only_server(config: ServerCommandConfig, wire_addr: String) -> Result<(), String> {
1638    let rt_config = detect_runtime_config();
1639    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1640    let cli_telemetry = config.telemetry.clone();
1641    let db_options = config.to_db_options()?;
1642    let mut transport_readiness = TransportReadiness::default();
1643
1644    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1645        .enable_all()
1646        .worker_threads(workers)
1647        .thread_stack_size(rt_config.stack_size)
1648        .build()
1649        .map_err(|err| format!("tokio runtime: {err}"))?;
1650
1651    // Guard lives on the outer thread's stack so it outlives the
1652    // tokio runtime — dropping it only after the listener returns
1653    // flushes the file log writer.
1654    let (runtime, _auth_store, _telemetry_guard) =
1655        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1656    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1657    let signal_runtime = runtime.clone();
1658    tokio_runtime.block_on(async move {
1659        spawn_lifecycle_signal_handler(signal_runtime).await;
1660        spawn_pg_listener(&config, &runtime);
1661        let wire_rt = Arc::new(runtime);
1662        let listener = tokio::net::TcpListener::bind(&wire_addr)
1663            .await
1664            .map_err(|err| {
1665                let reason = format!("wire listener bind {wire_addr}: {err}");
1666                transport_readiness.failed(
1667                    "wire",
1668                    &wire_addr,
1669                    config.wire_bind_explicit,
1670                    reason.clone(),
1671                );
1672                if config.wire_bind_explicit {
1673                    format!("explicit {reason}")
1674                } else {
1675                    reason
1676                }
1677            })?;
1678        transport_readiness.active("wire", &wire_addr, config.wire_bind_explicit);
1679        crate::wire::redwire::listener::start_redwire_listener_on(listener, wire_rt)
1680            .await
1681            .map_err(|e| e.to_string())
1682    })
1683}
1684
1685#[inline(never)]
1686fn run_pg_only_server(config: ServerCommandConfig, pg_addr: String) -> Result<(), String> {
1687    let rt_config = detect_runtime_config();
1688    let workers = config.workers.unwrap_or(rt_config.suggested_workers);
1689    let cli_telemetry = config.telemetry.clone();
1690    let db_options = config.to_db_options()?;
1691
1692    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
1693        .enable_all()
1694        .worker_threads(workers)
1695        .thread_stack_size(rt_config.stack_size)
1696        .build()
1697        .map_err(|err| format!("tokio runtime: {err}"))?;
1698
1699    let (runtime, _auth_store, _telemetry_guard) =
1700        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
1701    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
1702    let signal_runtime = runtime.clone();
1703    tokio_runtime.block_on(async move {
1704        spawn_lifecycle_signal_handler(signal_runtime).await;
1705        let cfg = crate::wire::PgWireConfig {
1706            bind_addr: pg_addr,
1707            ..Default::default()
1708        };
1709        crate::wire::start_pg_wire_listener(cfg, Arc::new(runtime))
1710            .await
1711            .map_err(|e| e.to_string())
1712    })
1713}
1714
1715#[inline(never)]
1716fn build_runtime_and_auth_store(
1717    db_options: RedDBOptions,
1718    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1719) -> Result<
1720    (
1721        RedDBRuntime,
1722        Arc<AuthStore>,
1723        Option<crate::telemetry::TelemetryGuard>,
1724    ),
1725    String,
1726> {
1727    // Return the TelemetryGuard so server runners can bind it for
1728    // their full lifetime. Dropping the guard tears down the
1729    // non-blocking log writer thread and, because that writer is
1730    // built with `.lossy(true)`, any subsequent log event routed to
1731    // the file sink is silently dropped — so callers MUST keep the
1732    // returned `Option<TelemetryGuard>` alive until shutdown.
1733    build_runtime_with_telemetry(db_options, cli_telemetry)
1734}
1735
1736/// Open the runtime, initialise structured logging from merged CLI +
1737/// `red_config` settings, and return a guard the caller must keep
1738/// alive for the server lifetime (drop flushes pending log writes).
1739///
1740/// Merge priority: CLI flag (explicit `Some`) beats `red.logging.*`
1741/// in red_config, beats the built-in default. A CLI-flag value of
1742/// `None` / empty means "inherit from config or default" — never
1743/// "disable". The one exception is `--no-log-file` which forces
1744/// `log_dir = None` regardless of config.
1745pub(crate) fn build_runtime_with_telemetry(
1746    db_options: RedDBOptions,
1747    cli_telemetry: Option<crate::telemetry::TelemetryConfig>,
1748) -> Result<
1749    (
1750        RedDBRuntime,
1751        Arc<AuthStore>,
1752        Option<crate::telemetry::TelemetryGuard>,
1753    ),
1754    String,
1755> {
1756    let runtime = RedDBRuntime::with_options(db_options.clone()).map_err(|err| {
1757        // Issue #205 — runtime construction failure is the canonical
1758        // StartupFailed phase. The audit sink isn't installed yet
1759        // (it would have been registered inside `with_options`), so
1760        // the emit falls through to tracing+eprintln only — operator
1761        // still sees it on stderr.
1762        let msg = err.to_string();
1763        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1764            phase: "runtime_construction".to_string(),
1765            error: msg.clone(),
1766        }
1767        .emit_global();
1768        msg
1769    })?;
1770
1771    // PLAN.md Phase 5 / W6 — opt into serverless writer-lease fencing
1772    // when `RED_LEASE_REQUIRED=true`. Failure here aborts boot: the
1773    // operator asked for a fence; running unfenced would silently
1774    // expose split-brain risk.
1775    crate::runtime::lease_loop::start_lease_loop_if_required(&runtime).map_err(|err| {
1776        let msg = err.to_string();
1777        crate::telemetry::operator_event::OperatorEvent::StartupFailed {
1778            phase: "lease_loop".to_string(),
1779            error: msg.clone(),
1780        }
1781        .emit_global();
1782        msg
1783    })?;
1784
1785    // #213 — edge-triggered disk-space watchdog. Watches the data
1786    // directory; falls back to polling when fanotify is unavailable
1787    // (non-Linux or unprivileged container).
1788    if let Some(data_path) = db_options.data_path.as_deref() {
1789        let watch_dir = data_path.parent().unwrap_or(data_path);
1790        crate::runtime::disk_space_monitor::DiskSpaceMonitor::new(watch_dir, 90).spawn();
1791    }
1792
1793    // #214 — inotify config hot-reload watcher. Watches the config file
1794    // (REDDB_CONFIG_FILE or /etc/reddb/config.json) for changes and
1795    // applies hot-reloadable keys without restart.
1796    {
1797        let config_path = crate::runtime::config_overlay::config_file_path();
1798        let store = runtime.db().store();
1799        crate::runtime::config_watcher::ConfigWatcher::new(config_path, store).spawn();
1800    }
1801
1802    // Phase 6 logging: merge red_config overrides onto the CLI-built
1803    // telemetry config, then install the global subscriber.
1804    let merged = merge_telemetry_with_config(
1805        cli_telemetry
1806            .unwrap_or_else(|| default_telemetry_for_path(db_options.data_path.as_deref())),
1807        &runtime,
1808    );
1809    let telemetry_guard = crate::telemetry::init(merged);
1810
1811    let auth_store =
1812        if db_options.auth.vault_enabled {
1813            let pager =
1814                runtime.db().store().pager().cloned().ok_or_else(|| {
1815                    "vault requires a paged database (persistent mode)".to_string()
1816                })?;
1817            let store = AuthStore::with_vault(db_options.auth.clone(), pager, None)
1818                .map_err(|err| err.to_string())?;
1819            Arc::new(store)
1820        } else {
1821            Arc::new(AuthStore::new(db_options.auth.clone()))
1822        };
1823    auth_store.bootstrap_from_env();
1824
1825    // Background session purge (every 5 minutes)
1826    {
1827        let store = Arc::clone(&auth_store);
1828        std::thread::Builder::new()
1829            .name("reddb-session-purge".into())
1830            .spawn(move || loop {
1831                std::thread::sleep(std::time::Duration::from_secs(300));
1832                store.purge_expired_sessions();
1833            })
1834            .ok();
1835    }
1836
1837    Ok((runtime, auth_store, telemetry_guard))
1838}
1839
1840/// Read `red.logging.*` keys from the persistent config store and
1841/// merge them into the CLI-built `TelemetryConfig`. Merge priority:
1842/// explicit CLI flag > red_config > built-in default.
1843///
1844/// The "was a flag passed" signal comes from the `*_explicit` bools
1845/// on `TelemetryConfig`, populated by the CLI parser. This replaces
1846/// an earlier equality-to-default heuristic that silently dropped
1847/// config whenever the CLI-derived default diverged from
1848/// `TelemetryConfig::default()` (e.g. path-derived `log_dir`,
1849/// non-TTY `format`) and that silently overrode `--no-log-file`.
1850fn merge_telemetry_with_config(
1851    mut cli: crate::telemetry::TelemetryConfig,
1852    runtime: &RedDBRuntime,
1853) -> crate::telemetry::TelemetryConfig {
1854    use crate::storage::schema::Value;
1855
1856    let store = runtime.db().store();
1857
1858    if !cli.level_explicit {
1859        if let Some(Value::Text(v)) = store.get_config("red.logging.level") {
1860            cli.level_filter = v.to_string();
1861        }
1862    }
1863    if !cli.format_explicit {
1864        if let Some(Value::Text(v)) = store.get_config("red.logging.format") {
1865            if let Some(parsed) = crate::telemetry::LogFormat::parse(&v) {
1866                cli.format = parsed;
1867            }
1868        }
1869    }
1870    if !cli.rotation_keep_days_explicit {
1871        match store.get_config("red.logging.keep_days") {
1872            Some(Value::Integer(n)) if n >= 0 && n <= u16::MAX as i64 => {
1873                cli.rotation_keep_days = n as u16
1874            }
1875            Some(Value::UnsignedInteger(n)) if n <= u16::MAX as u64 => {
1876                cli.rotation_keep_days = n as u16
1877            }
1878            Some(Value::Text(v)) => {
1879                if let Ok(n) = v.parse::<u16>() {
1880                    cli.rotation_keep_days = n;
1881                }
1882            }
1883            _ => {}
1884        }
1885    }
1886    if !cli.file_prefix_explicit {
1887        if let Some(Value::Text(v)) = store.get_config("red.logging.file_prefix") {
1888            if !v.is_empty() {
1889                cli.file_prefix = v.to_string();
1890            }
1891        }
1892    }
1893    // --no-log-file is a kill-switch: config cannot resurrect the
1894    // file sink. Explicit --log-dir also wins.
1895    if !cli.log_dir_explicit && !cli.log_file_disabled {
1896        if let Some(Value::Text(v)) = store.get_config("red.logging.dir") {
1897            if !v.is_empty() {
1898                cli.log_dir = Some(std::path::PathBuf::from(v.as_ref()));
1899            }
1900        }
1901    }
1902
1903    cli
1904}
1905
/// Precedence tests for `merge_telemetry_with_config`: explicit CLI flag >
/// `red.logging.*` config > CLI-derived default, for every merged field.
#[cfg(test)]
mod telemetry_merge_tests {
    use super::*;
    use crate::telemetry::{LogFormat, TelemetryConfig};

    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime")
    }

    fn set_str(runtime: &RedDBRuntime, key: &str, value: &str) {
        runtime
            .db()
            .store()
            .set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
    }

    fn cli_base() -> TelemetryConfig {
        // Emulate default_telemetry_for_path(Some(path)) on a non-TTY host:
        // log_dir = Some(...), format = Json. Nothing marked explicit.
        TelemetryConfig {
            log_dir: Some(std::path::PathBuf::from("/tmp/reddb-default/logs")),
            format: LogFormat::Json,
            ..Default::default()
        }
    }

    #[test]
    fn config_log_dir_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/var/log/reddb"))
        );
    }

    #[test]
    fn explicit_log_dir_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let mut cli = cli_base();
        cli.log_dir = Some(std::path::PathBuf::from("/custom/dir"));
        cli.log_dir_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(
            merged.log_dir.as_deref(),
            Some(std::path::Path::new("/custom/dir"))
        );
    }

    #[test]
    fn no_log_file_beats_config_log_dir() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.dir", "/var/log/reddb");
        let mut cli = cli_base();
        cli.log_dir = None;
        cli.log_file_disabled = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert!(
            merged.log_dir.is_none(),
            "--no-log-file must veto config dir"
        );
    }

    #[test]
    fn config_format_promoted_on_non_tty_default() {
        // On non-TTY, default_telemetry_for_path yields format=Json even
        // though TelemetryConfig::default() is Pretty. The old equality
        // check silently dropped config here.
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.format", "pretty");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.format, LogFormat::Pretty);
    }

    #[test]
    fn explicit_format_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.format", "pretty");
        let mut cli = cli_base();
        cli.format = LogFormat::Json;
        cli.format_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.format, LogFormat::Json);
    }

    #[test]
    fn config_level_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.level", "debug");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.level_filter, "debug");
    }

    #[test]
    fn explicit_level_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.level", "debug");
        let mut cli = cli_base();
        cli.level_filter = "warn".to_string();
        cli.level_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.level_filter, "warn");
    }

    #[test]
    fn config_keep_days_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.keep_days", "14");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.rotation_keep_days, 14);
    }

    #[test]
    fn unparsable_config_keep_days_is_ignored() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.keep_days", "forever");
        let expected = cli_base().rotation_keep_days;
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.rotation_keep_days, expected);
    }

    #[test]
    fn explicit_keep_days_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.keep_days", "14");
        let mut cli = cli_base();
        cli.rotation_keep_days = 3;
        cli.rotation_keep_days_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.rotation_keep_days, 3);
    }

    #[test]
    fn config_file_prefix_promoted_when_flag_absent() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.file_prefix", "custom");
        let merged = merge_telemetry_with_config(cli_base(), &runtime);
        assert_eq!(merged.file_prefix, "custom");
    }

    #[test]
    fn explicit_file_prefix_wins_over_config() {
        let runtime = fresh_runtime();
        set_str(&runtime, "red.logging.file_prefix", "custom");
        let mut cli = cli_base();
        cli.file_prefix = "cli-prefix".to_string();
        cli.file_prefix_explicit = true;
        let merged = merge_telemetry_with_config(cli, &runtime);
        assert_eq!(merged.file_prefix, "cli-prefix");
    }
}
1993
1994#[inline(never)]
1995fn build_http_server(
1996    runtime: RedDBRuntime,
1997    auth_store: Arc<AuthStore>,
1998    bind_addr: String,
1999) -> RedDBServer {
2000    build_http_server_with_transport_readiness(
2001        runtime,
2002        auth_store,
2003        bind_addr,
2004        TransportReadiness::default(),
2005    )
2006}
2007
2008/// Apply the resolved HTTP limits to a freshly-built `RedDBServer`.
2009///
2010/// Centralised here so every `run_*` path goes through the same
2011/// resolver and the structured startup log line carries the same
2012/// `http_limits.*` fields regardless of transport combination.
2013fn apply_http_limits(
2014    server: RedDBServer,
2015    config: &ServerCommandConfig,
2016    runtime: &RedDBRuntime,
2017) -> RedDBServer {
2018    let store = runtime.db().store();
2019    let resolved =
2020        crate::server::http_limits::resolve_http_limits(&config.http_limits_cli, |key| match store
2021            .get_config(key)
2022        {
2023            Some(crate::storage::schema::Value::Text(v)) => Some(v.to_string()),
2024            Some(crate::storage::schema::Value::Integer(n)) if n >= 0 => Some(n.to_string()),
2025            Some(crate::storage::schema::Value::UnsignedInteger(n)) => Some(n.to_string()),
2026            _ => None,
2027        });
2028    tracing::info!(
2029        target: "reddb::http_limits",
2030        max_handlers = resolved.max_handlers,
2031        handler_timeout_ms = resolved.handler_timeout_ms,
2032        retry_after_secs = resolved.retry_after_secs,
2033        "http_limits resolved"
2034    );
2035    server.with_http_limits(resolved)
2036}
2037
2038#[inline(never)]
2039fn build_http_server_with_transport_readiness(
2040    runtime: RedDBRuntime,
2041    auth_store: Arc<AuthStore>,
2042    bind_addr: String,
2043    transport_readiness: TransportReadiness,
2044) -> RedDBServer {
2045    RedDBServer::with_options(
2046        runtime,
2047        ServerOptions {
2048            bind_addr,
2049            transport_readiness,
2050            ..ServerOptions::default()
2051        },
2052    )
2053    .with_auth(auth_store)
2054}
2055
2056/// PLAN.md Phase 6.2 — build a listener that only serves
2057/// `/admin/*` + `/metrics` + `/health/*`. Defaults to `127.0.0.1`
2058/// when the env var has no host (loopback-only by default per spec).
2059#[inline(never)]
2060fn build_admin_only_server(
2061    runtime: RedDBRuntime,
2062    auth_store: Arc<AuthStore>,
2063    bind_addr: String,
2064) -> RedDBServer {
2065    RedDBServer::with_options(
2066        runtime,
2067        ServerOptions {
2068            bind_addr,
2069            surface: crate::server::ServerSurface::AdminOnly,
2070            ..ServerOptions::default()
2071        },
2072    )
2073    .with_auth(auth_store)
2074}
2075
2076/// PLAN.md Phase 6.2 — build a listener that only serves `/metrics`
2077/// + `/health/*`. Suitable for Prometheus scrape ports that may be
2078///   exposed wider than the admin port.
2079#[inline(never)]
2080fn build_metrics_only_server(
2081    runtime: RedDBRuntime,
2082    auth_store: Arc<AuthStore>,
2083    bind_addr: String,
2084) -> RedDBServer {
2085    RedDBServer::with_options(
2086        runtime,
2087        ServerOptions {
2088            bind_addr,
2089            surface: crate::server::ServerSurface::MetricsOnly,
2090            ..ServerOptions::default()
2091        },
2092    )
2093    .with_auth(auth_store)
2094}
2095
2096/// Spawn dedicated admin / metrics listeners when the operator set
2097/// `RED_ADMIN_BIND` / `RED_METRICS_BIND`. Both are optional; when
2098/// unset the existing listener keeps serving everything (back-compat).
2099fn spawn_admin_metrics_listeners(runtime: &RedDBRuntime, auth_store: &Arc<AuthStore>) {
2100    if let Some(addr) = env_nonempty("RED_ADMIN_BIND") {
2101        let server = build_admin_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2102        let _ = server.serve_in_background();
2103        tracing::info!(transport = "http", surface = "admin", bind = %addr, "listener online");
2104    }
2105    if let Some(addr) = env_nonempty("RED_METRICS_BIND") {
2106        let server = build_metrics_only_server(runtime.clone(), auth_store.clone(), addr.clone());
2107        let _ = server.serve_in_background();
2108        tracing::info!(transport = "http", surface = "metrics", bind = %addr, "listener online");
2109    }
2110}
2111
2112#[inline(never)]
2113fn run_http_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2114    let cli_telemetry = config.telemetry.clone();
2115    let mut transport_readiness = TransportReadiness::default();
2116    let Some(listener) = bind_listener_for_startup(
2117        &mut transport_readiness,
2118        "http",
2119        &bind_addr,
2120        config.http_bind_explicit,
2121    )?
2122    else {
2123        return Err(format!(
2124            "no HTTP listener started; implicit bind {} failed",
2125            bind_addr
2126        ));
2127    };
2128    let db_options = config.to_db_options()?;
2129    let (runtime, auth_store, _telemetry_guard) =
2130        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2131    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2132    spawn_admin_metrics_listeners(&runtime, &auth_store);
2133    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2134    let server = build_http_server_with_transport_readiness(
2135        runtime.clone(),
2136        auth_store,
2137        bind_addr.clone(),
2138        transport_readiness,
2139    );
2140    let server = apply_http_limits(server, &config, &runtime);
2141    tracing::info!(transport = "http", bind = %bind_addr, "listener online");
2142    server.serve_on(listener).map_err(|err| err.to_string())
2143}
2144
2145/// PLAN.md HTTP TLS — when `http_tls_bind_addr` is set, spawn a
2146/// rustls-terminated listener alongside the plain HTTP server. Cert
2147/// + key paths come from CLI flags or `REDDB_HTTP_TLS_*` env vars; if
2148///   both are absent and `RED_HTTP_TLS_DEV=1` is set, a self-signed cert
2149///   is auto-generated next to the data directory (refused otherwise).
2150fn spawn_http_tls_listener(
2151    config: &ServerCommandConfig,
2152    runtime: &RedDBRuntime,
2153    auth_store: &Arc<AuthStore>,
2154) -> Result<(), String> {
2155    let Some(addr) = config.http_tls_bind_addr.clone() else {
2156        return Ok(());
2157    };
2158
2159    let tls_config = resolve_http_tls_config(config)?;
2160    let server_config = crate::server::tls::build_server_config(&tls_config)
2161        .map_err(|err| format!("HTTP TLS: {err}"))?;
2162
2163    let server = build_http_server(runtime.clone(), auth_store.clone(), addr.clone());
2164    let server = apply_http_limits(server, config, runtime);
2165    let _handle = server.serve_tls_in_background(server_config);
2166    tracing::info!(
2167        transport = "https",
2168        bind = %addr,
2169        mtls = %tls_config.client_ca_path.is_some(),
2170        "TLS listener online"
2171    );
2172    Ok(())
2173}
2174
2175/// Resolve the HTTP TLS config from CLI / env / dev defaults.
2176fn resolve_http_tls_config(
2177    config: &ServerCommandConfig,
2178) -> Result<crate::server::tls::HttpTlsConfig, String> {
2179    match (&config.http_tls_cert, &config.http_tls_key) {
2180        (Some(cert), Some(key)) => Ok(crate::server::tls::HttpTlsConfig {
2181            cert_path: cert.clone(),
2182            key_path: key.clone(),
2183            client_ca_path: config.http_tls_client_ca.clone(),
2184        }),
2185        (None, None) => {
2186            // Dev-mode auto-generate next to the data directory.
2187            let dir = config
2188                .path
2189                .as_ref()
2190                .and_then(|p| p.parent().map(std::path::PathBuf::from))
2191                .unwrap_or_else(|| std::path::PathBuf::from("."));
2192            let auto = crate::server::tls::auto_generate_dev_cert(&dir)
2193                .map_err(|err| format!("HTTP TLS dev: {err}"))?;
2194            Ok(crate::server::tls::HttpTlsConfig {
2195                cert_path: auto.cert_path,
2196                key_path: auto.key_path,
2197                client_ca_path: config.http_tls_client_ca.clone(),
2198            })
2199        }
2200        _ => Err("HTTP TLS requires both --http-tls-cert and --http-tls-key (or neither, with RED_HTTP_TLS_DEV=1)".to_string()),
2201    }
2202}
2203
2204#[inline(never)]
2205fn run_grpc_server(config: ServerCommandConfig, bind_addr: String) -> Result<(), String> {
2206    let workers = config.workers;
2207    let cli_telemetry = config.telemetry.clone();
2208    let db_options = config.to_db_options()?;
2209    let rt_config = detect_runtime_config();
2210    let mut transport_readiness = TransportReadiness::default();
2211    let Some(grpc_listener) = bind_listener_for_startup(
2212        &mut transport_readiness,
2213        "grpc",
2214        &bind_addr,
2215        config.grpc_bind_explicit,
2216    )?
2217    else {
2218        return Err(format!(
2219            "no gRPC listener started; implicit bind {} failed",
2220            bind_addr
2221        ));
2222    };
2223
2224    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2225
2226    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2227        .enable_all()
2228        .worker_threads(worker_threads)
2229        .thread_stack_size(rt_config.stack_size)
2230        .build()
2231        .map_err(|err| format!("tokio runtime: {err}"))?;
2232
2233    // Guard lives on the outer stack so it outlives the tokio runtime.
2234    let (runtime, auth_store, _telemetry_guard) =
2235        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2236    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2237    let signal_runtime = runtime.clone();
2238    tokio_runtime.block_on(async move {
2239        spawn_lifecycle_signal_handler(signal_runtime).await;
2240        // Start wire protocol listeners (plaintext + TLS)
2241        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2242
2243        // Start PostgreSQL wire listener when --pg-bind is configured.
2244        spawn_pg_listener(&config, &runtime);
2245
2246        // Optional TLS gRPC listener. When `grpc_tls_bind_addr` is set
2247        // it spawns a separate listener so plaintext + TLS can run
2248        // side-by-side (50051 plain + 50052 TLS, etc.).
2249        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2250
2251        let server = RedDBGrpcServer::with_options(
2252            runtime,
2253            GrpcServerOptions {
2254                bind_addr: bind_addr.clone(),
2255                tls: None,
2256            },
2257            auth_store,
2258        );
2259
2260        tracing::info!(
2261            transport = "grpc",
2262            bind = %bind_addr,
2263            cpus = rt_config.available_cpus,
2264            workers = worker_threads,
2265            "listener online"
2266        );
2267        server
2268            .serve_on(grpc_listener)
2269            .await
2270            .map_err(|err| err.to_string())
2271    })
2272}
2273
2274#[inline(never)]
2275fn run_dual_server(
2276    config: ServerCommandConfig,
2277    grpc_bind_addr: String,
2278    http_bind_addr: String,
2279) -> Result<(), String> {
2280    let workers = config.workers;
2281    let cli_telemetry = config.telemetry.clone();
2282    let db_options = config.to_db_options()?;
2283    let rt_config = detect_runtime_config();
2284    let worker_threads = workers.unwrap_or(rt_config.suggested_workers);
2285    let mut transport_readiness = TransportReadiness::default();
2286    let http_listener = bind_listener_for_startup(
2287        &mut transport_readiness,
2288        "http",
2289        &http_bind_addr,
2290        config.http_bind_explicit,
2291    )?;
2292    let grpc_listener = bind_listener_for_startup(
2293        &mut transport_readiness,
2294        "grpc",
2295        &grpc_bind_addr,
2296        config.grpc_bind_explicit,
2297    )?;
2298    if http_listener.is_none() && grpc_listener.is_none() {
2299        return Err("no listener started; implicit HTTP and gRPC binds failed".to_string());
2300    }
2301    let (runtime, auth_store, _telemetry_guard) =
2302        build_runtime_and_auth_store(db_options.clone(), cli_telemetry)?;
2303    let _backup_tasks = spawn_backup_tasks_if_configured(&db_options, &runtime);
2304
2305    spawn_admin_metrics_listeners(&runtime, &auth_store);
2306    spawn_http_tls_listener(&config, &runtime, &auth_store)?;
2307
2308    let http_handle = if let Some(listener) = http_listener {
2309        let http_server = build_http_server_with_transport_readiness(
2310            runtime.clone(),
2311            auth_store.clone(),
2312            http_bind_addr.clone(),
2313            transport_readiness.clone(),
2314        );
2315        let http_server = apply_http_limits(http_server, &config, &runtime);
2316        Some(http_server.serve_in_background_on(listener))
2317    } else {
2318        None
2319    };
2320
2321    thread::sleep(Duration::from_millis(150));
2322    if let Some(handle) = http_handle.as_ref() {
2323        if handle.is_finished() {
2324            let handle = http_handle.unwrap();
2325            return match handle.join() {
2326                Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2327                Ok(Err(err)) => Err(err.to_string()),
2328                Err(_) => Err("HTTP server thread panicked".to_string()),
2329            };
2330        }
2331    }
2332    if grpc_listener.is_none() {
2333        let Some(handle) = http_handle else {
2334            return Err("no listener started".to_string());
2335        };
2336        return match handle.join() {
2337            Ok(Ok(())) => Err("HTTP server exited unexpectedly".to_string()),
2338            Ok(Err(err)) => Err(err.to_string()),
2339            Err(_) => Err("HTTP server thread panicked".to_string()),
2340        };
2341    }
2342    let grpc_listener = grpc_listener.expect("checked above");
2343
2344    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
2345        .enable_all()
2346        .worker_threads(worker_threads)
2347        .thread_stack_size(rt_config.stack_size)
2348        .build()
2349        .map_err(|err| format!("tokio runtime: {err}"))?;
2350
2351    let signal_runtime = runtime.clone();
2352    tokio_runtime.block_on(async move {
2353        spawn_lifecycle_signal_handler(signal_runtime).await;
2354        // Start wire protocol listeners (plaintext + TLS)
2355        spawn_wire_listeners(&config, &runtime, &mut transport_readiness).await?;
2356
2357        // Start PostgreSQL wire listener when --pg-bind is configured.
2358        spawn_pg_listener(&config, &runtime);
2359
2360        // Optional TLS gRPC listener — runs alongside the plaintext one.
2361        spawn_grpc_tls_listener_if_configured(&config, runtime.clone(), auth_store.clone());
2362
2363        let server = RedDBGrpcServer::with_options(
2364            runtime,
2365            GrpcServerOptions {
2366                bind_addr: grpc_bind_addr.clone(),
2367                tls: None,
2368            },
2369            auth_store,
2370        );
2371
2372        tracing::info!(transport = "http", bind = %http_bind_addr, "listener online");
2373        tracing::info!(
2374            transport = "grpc",
2375            bind = %grpc_bind_addr,
2376            cpus = rt_config.available_cpus,
2377            workers = worker_threads,
2378            "listener online"
2379        );
2380        server
2381            .serve_on(grpc_listener)
2382            .await
2383            .map_err(|err| err.to_string())
2384    })
2385}
2386
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `SystemdServiceConfig` for the `/usr/local/bin/red` binary running as
    /// `reddb:reddb`. Only the fields that vary between tests are parameters.
    fn systemd_config(
        service_name: &str,
        data_path: &str,
        router_bind_addr: Option<&str>,
        grpc_bind_addr: Option<&str>,
        http_bind_addr: Option<&str>,
    ) -> SystemdServiceConfig {
        SystemdServiceConfig {
            service_name: service_name.to_string(),
            binary_path: PathBuf::from("/usr/local/bin/red"),
            run_user: "reddb".to_string(),
            run_group: "reddb".to_string(),
            data_path: PathBuf::from(data_path),
            router_bind_addr: router_bind_addr.map(str::to_string),
            grpc_bind_addr: grpc_bind_addr.map(str::to_string),
            http_bind_addr: http_bind_addr.map(str::to_string),
        }
    }

    /// Occupy an ephemeral loopback port.
    ///
    /// Returns the holding listener together with its address. The caller must keep
    /// the listener alive for as long as the port should stay taken.
    fn hold_loopback_port() -> (TcpListener, String) {
        let holder = TcpListener::bind("127.0.0.1:0").expect("hold test port");
        let addr = holder.local_addr().expect("held addr").to_string();
        (holder, addr)
    }

    /// Check that `readiness` has no active listener and exactly one failed bind for
    /// `addr`, flagged with the expected explicitness.
    fn assert_single_failed_bind(readiness: &TransportReadiness, addr: &str, explicit: bool) {
        assert_eq!(readiness.active.len(), 0);
        assert_eq!(readiness.failed.len(), 1);
        let failure = &readiness.failed[0];
        assert_eq!(failure.explicit, explicit);
        assert_eq!(failure.bind_addr, addr);
    }

    #[test]
    fn render_systemd_unit_contains_expected_execstart() {
        let config = systemd_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            None,
        );

        let rendered = render_systemd_unit(&config);
        assert!(rendered.contains(
            "ExecStart=/usr/local/bin/red server --path /var/lib/reddb/data.rdb --grpc-bind 0.0.0.0:5555"
        ));
        assert!(rendered.contains("ReadWritePaths=/var/lib/reddb"));
    }

    #[test]
    fn systemd_service_config_derives_paths() {
        let config = systemd_config(
            "reddb-api",
            "/srv/reddb/live/data.rdb",
            None,
            None,
            Some("127.0.0.1:5055"),
        );

        assert_eq!(config.data_dir(), PathBuf::from("/srv/reddb/live"));
        assert_eq!(
            config.unit_path(),
            PathBuf::from("/etc/systemd/system/reddb-api.service")
        );
    }

    #[test]
    fn render_systemd_unit_supports_dual_transport() {
        let config = systemd_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            None,
            Some("0.0.0.0:5555"),
            Some("0.0.0.0:5055"),
        );

        let rendered = render_systemd_unit(&config);
        assert!(rendered.contains("--grpc-bind 0.0.0.0:5555"));
        assert!(rendered.contains("--http-bind 0.0.0.0:5055"));
    }

    #[test]
    fn render_systemd_unit_supports_router_mode() {
        let config = systemd_config(
            "reddb",
            "/var/lib/reddb/data.rdb",
            Some(DEFAULT_ROUTER_BIND_ADDR),
            None,
            None,
        );

        let rendered = render_systemd_unit(&config);
        assert!(rendered.contains("--bind 127.0.0.1:5050"));
        assert!(!rendered.contains("--grpc-bind"));
        assert!(!rendered.contains("--http-bind"));
    }

    #[test]
    fn explicit_bind_collision_is_fatal() {
        // Bind to `_holder` rather than `_` so the port stays occupied until the test ends.
        let (_holder, addr) = hold_loopback_port();
        let mut readiness = TransportReadiness::default();

        let message = bind_listener_for_startup(&mut readiness, "http", &addr, true).unwrap_err();

        assert!(message.contains("explicit http listener bind"));
        assert_single_failed_bind(&readiness, &addr, true);
    }

    #[test]
    fn implicit_bind_collision_degrades() {
        // Bind to `_holder` rather than `_` so the port stays occupied until the test ends.
        let (_holder, addr) = hold_loopback_port();
        let mut readiness = TransportReadiness::default();

        let outcome =
            bind_listener_for_startup(&mut readiness, "http", &addr, false).expect("nonfatal");

        assert!(outcome.is_none());
        assert_single_failed_bind(&readiness, &addr, false);
    }
}