Skip to main content

osproxy_config/
lib.rs

1//! Typed configuration.
2//!
3//! Loads and fully validates configuration (file → environment → flags) before
4//! any socket opens, producing validated value objects the other crates consume
5//! (`docs/01` §6). Invalid config fails fast with a typed, actionable
6//! [`ConfigError`] naming the bad field. It contains no business logic, it only
7//! turns strings into validated values; mapping those to domain types (the
8//! crypto provider, the pipeline) is the binary's job. Hot-reloadable state
9//! (directives, placement) goes through `osproxy-control`, not here.
10//!
11//! # Example
12//!
13//! ```
14//! use osproxy_config::Config;
15//! // Defaults apply when nothing is set; a bad value is a typed error.
16//! let cfg = Config::resolve_for_test(&[("bind", "0.0.0.0:9000")]).unwrap();
17//! assert_eq!(cfg.bind.port(), 9000);
18//! assert!(cfg.require_tls_for_mutation, "enforced by default (NFR-S1)");
19//! assert!(Config::resolve_for_test(&[("bind", "not-an-addr")]).is_err());
20//! ```
21#![deny(missing_docs)]
22
23mod raw;
24mod resolve;
25
26use std::net::SocketAddr;
27
28use raw::Raw;
29
30/// The fully validated configuration the binary serves from. Every field is a
31/// ready-to-use value object; no further parsing or fallbacks happen downstream.
32#[derive(Clone, Debug, PartialEq, Eq)]
33pub struct Config {
34    /// The HTTP ingress bind address.
35    pub bind: SocketAddr,
36    /// The optional gRPC ingress bind address (off when `None`).
37    pub grpc_bind: Option<SocketAddr>,
38    /// The upstream OpenSearch base URL for the single configured cluster.
39    pub upstream: String,
40    /// The shared physical index the reference tenancy writes into.
41    pub index: String,
42    /// The `token -> principal` auth map; empty means permissive dev mode.
43    pub tokens: Vec<(String, String)>,
44    /// Whether a body-mutating request is refused over cleartext (NFR-S1). True
45    /// (enforce) unless `allow_cleartext_mutation` opts out.
46    pub require_tls_for_mutation: bool,
47    /// TLS termination settings, or `None` for cleartext ingress.
48    pub tls: Option<TlsConfig>,
49    /// Observability + control-plane settings.
50    pub observability: ObservabilityConfig,
51    /// Admin (`_cat`/`_cluster`/`_nodes`) pass-through policy, or `None` to reject.
52    pub admin_passthrough: Option<AdminPassthroughConfig>,
53    /// The shared HMAC key enabling scroll/PIT cursor affinity, or `None` (off).
54    pub cursor_affinity_key: Option<String>,
55    /// Tenant-agnostic passthrough policy, or `None` = pure tenancy mode (the
56    /// default). Used for a transparent proxy or to pass selected (e.g. not-yet-
57    /// onboarded) indices through verbatim while tenant-isolating the rest.
58    pub passthrough: Option<PassthroughConfig>,
59    /// Which client headers the proxy relays to the upstream when it forwards a
60    /// request (the verbatim passthrough/admin/cursor paths). Pass-all by default
61    /// (sidecar trust), minus the mandatory hop-by-hop/framing set.
62    pub header_forwarding: HeaderForwardingConfig,
63    /// Full-fidelity traffic capture to a Kafka topic, or `None` (off). Requires
64    /// the binary be built with the `capture` feature; a configured capture
65    /// on a binary without it is a loud startup error rather than a silent no-op.
66    pub capture: Option<CaptureConfig>,
67    /// Whether capture is on for every request before any directive (default
68    /// `false`). `false` = capture on demand: nothing is teed until a published
69    /// `capture` directive selects requests. `true` = always-capture (a dedicated
70    /// capture/migration proxy). Independent of the sink: it only decides *when*
71    /// to capture; the sink still needs the `capture` feature + config.
72    pub capture_default: bool,
73    /// Async fan-out write queue (`docs/04` §9), or `None` (off). Requires the
74    /// `fanout` feature; a configured fan-out on a binary without it is a
75    /// loud startup error rather than a silent no-op.
76    pub fanout: Option<FanoutConfig>,
77    /// etcd-backed distributed directive store (`docs/05` §3), or `None` to use
78    /// the in-memory store + admin publish endpoint. Requires the `etcd` feature;
79    /// a configured etcd on a binary without it is a loud startup error.
80    pub etcd: Option<EtcdConfig>,
81}
82
/// Connection settings for the etcd-backed directive store. Only plain strings
/// live here (no etcd client types), which keeps this crate free of an etcd
/// dependency.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct EtcdConfig {
    /// One or more etcd endpoints, written as `host:port` or as full URLs.
    pub endpoints: Vec<String>,
    /// The key under which the fleet directive set is both published and watched.
    pub directives_key: String,
}
92
93/// Async fan-out write queue settings: where resolved write ops are enqueued for
94/// a downstream applier (`docs/04` §9, ADR-010). Plain data (no broker types), so
95/// the config crate stays free of any Kafka client.
96#[derive(Clone, Debug, PartialEq, Eq)]
97pub struct FanoutConfig {
98    /// The Kafka bootstrap brokers (`host:port`), at least one.
99    pub brokers: Vec<String>,
100    /// The topic each op envelope is produced to.
101    pub topic: String,
102    /// TLS to the brokers, or `None` for a plaintext broker connection.
103    pub tls: Option<CaptureTlsConfig>,
104    /// How the document body is encoded in the envelope (default CBOR).
105    pub body_encoding: FanoutBodyEncoding,
106    /// Whether async is the deployment-default write mode (default `false`):
107    /// `false` = sync unless a request sends `X-Write-Mode: async`; `true` =
108    /// async unless a request sends `X-Write-Mode: sync`.
109    pub async_default: bool,
110    /// Whether `_delete_by_query` may be expanded into per-match deletes in async
111    /// mode (default `false`). It reads the match set and enqueues a delete each,
112    /// so it is opt-in (`docs/04` §9).
113    pub expand_delete_by_query: bool,
114}
115
/// Encoding used for the document body inside a fan-out op envelope.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum FanoutBodyEncoding {
    /// CBOR (RFC 8949): a compact, OpenSearch-native binary form. Chosen when
    /// nothing else is configured.
    #[default]
    Cbor,
    /// Plain JSON, kept verbatim: easy for a human to read when debugging the
    /// queue.
    Json,
}
125
126/// Full-fidelity traffic capture settings: where to send the captured exchange
127/// stream. This is plain data (no broker types), so the config crate stays
128/// independent of any Kafka client; the binary builds the producer from it.
129#[derive(Clone, Debug, PartialEq, Eq)]
130pub struct CaptureConfig {
131    /// The Kafka bootstrap brokers (`host:port`), at least one.
132    pub brokers: Vec<String>,
133    /// The topic each captured exchange envelope is produced to.
134    pub topic: String,
135    /// Whether to redact the `Authorization` header from the captured stream
136    /// (default `true`). The capture stream is privileged and carries bodies and
137    /// values verbatim, so credentials are stripped unless explicitly kept.
138    pub redact: bool,
139    /// TLS to the brokers, or `None` for a plaintext broker connection.
140    pub tls: Option<CaptureTlsConfig>,
141    /// The most records in flight (buffered + retrying) at once before a produce
142    /// is dropped, bounding memory. Higher = fewer drops under load, more memory.
143    pub max_inflight: usize,
144    /// Total send attempts per record before giving up. Higher = better delivery
145    /// odds across a transient broker blip, at the cost of more retry work.
146    pub max_attempts: u32,
147    /// The first retry backoff in milliseconds; it doubles after each failure.
148    pub backoff_ms: u64,
149    /// Directory for the durable on-disk spill buffer, or `None` for in-memory
150    /// best-effort. Set it for **at-least-once** capture that survives a restart:
151    /// records persist to a write-ahead log here and replay until acknowledged.
152    pub wal_dir: Option<String>,
153    /// Cap on undelivered bytes in the spill buffer before new records are dropped
154    /// (only meaningful with `wal_dir`). Bounds disk like `max_inflight` bounds memory.
155    pub wal_max_bytes: u64,
156}
157
/// Broker-side TLS for the capture connection, expressed as PEM file **paths**
/// that the binary reads. Setting `ca_path` pins that CA as the trust root;
/// additionally supplying a client certificate and key enables mTLS.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CaptureTlsConfig {
    /// PEM file of the CA the broker certificate must chain to (pinned trust).
    pub ca_path: String,
    /// PEM file holding the client certificate chain for mTLS, if any.
    pub client_cert_path: Option<String>,
    /// PEM file holding the client private key for mTLS, if any.
    pub client_key_path: Option<String>,
}
170
/// Ingress TLS termination, given as PEM file **paths**: the binary reads the
/// files, so no certificate material ever lives in configuration. Supplying
/// `client_ca_path` turns on required client-certificate authentication (mTLS).
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct TlsConfig {
    /// PEM file holding the server certificate chain.
    pub cert_path: String,
    /// PEM file holding the server private key.
    pub key_path: String,
    /// PEM file of the CA that client certificates must chain to; presence
    /// enables mTLS.
    pub client_ca_path: Option<String>,
}
183
184/// Observability and control-plane channel settings.
185#[derive(Clone, Debug, PartialEq, Eq)]
186pub struct ObservabilityConfig {
187    /// Whether to emit a structured JSON log line per request.
188    pub log_requests: bool,
189    /// The OTLP collector base URL for span export, or `None` (export off).
190    pub otlp_endpoint: Option<String>,
191    /// The `service.name` reported on exported spans.
192    pub service_name: String,
193    /// The baseline diagnostics verbosity applied before any directive.
194    pub diag_baseline: DiagBaseline,
195    /// The shared HMAC key verifying signed `X-Debug-Directive` headers, or `None`.
196    pub debug_directive_key: Option<String>,
197    /// The bearer token gating `POST/GET /admin/directives`, or `None` (disabled).
198    pub directive_admin_token: Option<String>,
199    /// Whether the pre-auth `/debug/explain` and `/debug/breakglass` surfaces are
200    /// served (default `true`). Set `false` in production so operational metadata
201    /// is not exposed unauthenticated; `/metrics` stays on regardless.
202    pub debug_endpoints: bool,
203    /// Whether directive-selected break-glass captures are also pushed off-instance
204    /// as structured JSON lines (default `false`), so a fleet aggregator can serve
205    /// them by `trace_id` rather than only the local per-instance ring (`docs/05`
206    /// §5). The fleet-coherent counterpart of the break-glass tape.
207    pub log_diagnostic_captures: bool,
208}
209
/// Policy for forwarding admin requests: which cluster answers them and which
/// path prefixes are allow-listed.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct AdminPassthroughConfig {
    /// Id of the cluster that admin requests are sent to.
    pub cluster: String,
    /// Allow-listed path prefixes, for example `/_cat/`.
    pub prefixes: Vec<String>,
    /// Base URL of the admin cluster; `None` defers to the tenancy's
    /// `cluster_endpoint` lookup.
    pub endpoint: Option<String>,
}
222
/// Controls which of the client's own headers travel to the upstream
/// (`forward_client_headers` / `forward_header_deny`). The proxy rebuilds every
/// upstream request, so this is where client headers are let through or not.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct HeaderForwardingConfig {
    /// Master switch for relaying client headers (default `true`). When `false`,
    /// only proxy-managed headers (content type, trace) are sent.
    pub enabled: bool,
    /// Additional header names to drop, matched case-insensitively, beyond the
    /// always-removed hop-by-hop/framing set. For instance `authorization` keeps
    /// the client credential away from the cluster. Empty unless configured
    /// (pass-all).
    pub deny: Vec<String>,
}

impl Default for HeaderForwardingConfig {
    /// Pass-all, the sidecar-trust default: forwarding on, nothing extra denied.
    fn default() -> Self {
        // The proxy normally sits next to its client, so that client's headers
        // (auth and vendor tracing included) should reach the cluster unless an
        // operator narrows them down.
        let deny = vec![];
        Self {
            deny,
            enabled: true,
        }
    }
}
248
/// Tenant-agnostic passthrough: requests that match are forwarded verbatim to a
/// single cluster, with no tenancy rewrite applied.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct PassthroughConfig {
    /// Id of the cluster that matching requests go to.
    pub cluster: String,
    /// Base URL of that cluster (pooled by the sink).
    pub endpoint: String,
    /// Logical-index prefixes forwarded verbatim. An empty list passes every
    /// request through (a whole-instance transparent proxy); a non-empty list
    /// tenant-isolates every index that matches none of them (fail-closed).
    pub index_prefixes: Vec<String>,
}
262
/// Diagnostics verbosity in force before any directive applies. Defined locally
/// so this crate does not depend on `osproxy-observe`; translating it into the
/// engine's own level is the binary's job.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum DiagBaseline {
    /// Nothing is exported until a directive selects the request.
    Off,
    /// Shapes, ids and field names only; this is the default level.
    #[default]
    Shape,
    /// Everything in `Shape`, plus timing for each stage.
    ShapeTiming,
    /// Everything in `Shape`, plus the shape of the rewrite diff.
    ShapeRewriteDiff,
}

impl DiagBaseline {
    /// Returns the canonical spelling of this level, as used both on the wire and
    /// in configuration.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        let name = match self {
            DiagBaseline::Shape => "shape",
            DiagBaseline::ShapeTiming => "shape-timing",
            DiagBaseline::ShapeRewriteDiff => "shape-rewrite-diff",
            DiagBaseline::Off => "off",
        };
        name
    }
}
290
/// A configuration failure: which setting was bad and why. `Display` is a single
/// actionable line for both an operator and an LLM (`docs/01` §6), of the form
/// ``config: `<field>`: <reason>``.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConfigError {
    /// Name of the offending setting.
    field: String,
    /// Human-readable explanation of why the value was rejected.
    reason: String,
}

impl ConfigError {
    /// An invalid value for a known `field`; `reason` explains what was wrong.
    #[must_use]
    pub fn invalid(field: impl Into<String>, reason: impl Into<String>) -> Self {
        Self {
            field: field.into(),
            reason: reason.into(),
        }
    }

    /// An unrecognized setting key (typo / unsupported option).
    #[must_use]
    pub fn unknown(field: impl Into<String>) -> Self {
        Self::invalid(field, "unknown setting")
    }

    /// The offending setting's name.
    #[must_use]
    pub fn field(&self) -> &str {
        &self.field
    }

    /// Why the setting was rejected: the text `Display` prints after the field
    /// name, available separately for callers that render errors themselves.
    #[must_use]
    pub fn reason(&self) -> &str {
        &self.reason
    }
}

impl std::fmt::Display for ConfigError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "config: `{}`: {}", self.field, self.reason)
    }
}

impl std::error::Error for ConfigError {}
332
333impl Config {
334    /// Loads and validates configuration from the process environment plus
335    /// `args` (CLI flags without the program name). A `--config <path>` flag, or
336    /// the `OSPROXY_CONFIG` env var, names a config file read as the lowest layer.
337    ///
338    /// # Errors
339    ///
340    /// Returns a [`ConfigError`] if a file/flag is malformed, a key is unknown, or
341    /// any value fails validation, before any socket is opened.
342    pub fn load<I: IntoIterator<Item = String>>(args: I) -> Result<Self, ConfigError> {
343        let (file_flag, flags) = extract_config_flag(args)?;
344        let file_path = file_flag.or_else(|| {
345            std::env::var("OSPROXY_CONFIG")
346                .ok()
347                .filter(|v| !v.is_empty())
348        });
349        let file = match &file_path {
350            Some(path) => {
351                let text = std::fs::read_to_string(path)
352                    .map_err(|e| ConfigError::invalid("config", format!("reading {path}: {e}")))?;
353                Raw::from_file(&text)?
354            }
355            None => Raw::default(),
356        };
357        let raw = Raw::layered(file, Raw::from_env(), Raw::from_flags(flags)?);
358        resolve::resolve(&raw)
359    }
360
361    /// Test-only: resolve a [`Config`] directly from an in-memory `(key, value)`
362    /// list (canonical keys), skipping the file/env/flag layering. Lets tests and
363    /// doc examples exercise validation without touching the process environment.
364    ///
365    /// # Errors
366    ///
367    /// Returns a [`ConfigError`] if any value fails validation.
368    pub fn resolve_for_test(pairs: &[(&str, &str)]) -> Result<Self, ConfigError> {
369        resolve::resolve(&Raw::from_pairs(pairs)?)
370    }
371}
372
373/// Splits a reserved `--config <path>` / `--config=<path>` flag out of the
374/// argument list, returning the path (if any) and the remaining flags.
375fn extract_config_flag<I: IntoIterator<Item = String>>(
376    args: I,
377) -> Result<(Option<String>, Vec<String>), ConfigError> {
378    let mut file = None;
379    let mut rest = Vec::new();
380    let mut args = args.into_iter();
381    while let Some(arg) = args.next() {
382        if arg == "--config" {
383            file = Some(
384                args.next()
385                    .ok_or_else(|| ConfigError::invalid("config", "--config needs a path"))?,
386            );
387        } else if let Some(path) = arg.strip_prefix("--config=") {
388            file = Some(path.to_owned());
389        } else {
390            rest.push(arg);
391        }
392    }
393    Ok((file, rest))
394}