osproxy_config/lib.rs
1//! Typed configuration.
2//!
3//! Loads and fully validates configuration (file → environment → flags) before
4//! any socket opens, producing validated value objects the other crates consume
5//! (`docs/01` §6). Invalid config fails fast with a typed, actionable
//! [`ConfigError`] naming the bad field. It contains no business logic: it only
//! turns strings into validated values; mapping those to domain types (the
//! crypto provider, the pipeline) is the binary's job. Hot-reloadable state
//! (directives, placement) goes through `osproxy-control`, not here.
10//!
11//! # Example
12//!
13//! ```
14//! use osproxy_config::Config;
15//! // Defaults apply when nothing is set; a bad value is a typed error.
16//! let cfg = Config::resolve_for_test(&[("bind", "0.0.0.0:9000")]).unwrap();
17//! assert_eq!(cfg.bind.port(), 9000);
18//! assert!(cfg.require_tls_for_mutation, "enforced by default (NFR-S1)");
19//! assert!(Config::resolve_for_test(&[("bind", "not-an-addr")]).is_err());
20//! ```
21#![deny(missing_docs)]
22
23mod raw;
24mod resolve;
25
26use std::net::SocketAddr;
27
28use raw::Raw;
29
30/// The fully validated configuration the binary serves from. Every field is a
31/// ready-to-use value object; no further parsing or fallbacks happen downstream.
32#[derive(Clone, Debug, PartialEq, Eq)]
33pub struct Config {
34 /// The HTTP ingress bind address.
35 pub bind: SocketAddr,
36 /// The optional gRPC ingress bind address (off when `None`).
37 pub grpc_bind: Option<SocketAddr>,
38 /// The upstream OpenSearch base URL for the single configured cluster.
39 pub upstream: String,
40 /// The shared physical index the reference tenancy writes into.
41 pub index: String,
42 /// The `token -> principal` auth map; empty means permissive dev mode.
43 pub tokens: Vec<(String, String)>,
44 /// Whether a body-mutating request is refused over cleartext (NFR-S1). True
45 /// (enforce) unless `allow_cleartext_mutation` opts out.
46 pub require_tls_for_mutation: bool,
47 /// TLS termination settings, or `None` for cleartext ingress.
48 pub tls: Option<TlsConfig>,
49 /// Observability + control-plane settings.
50 pub observability: ObservabilityConfig,
51 /// Admin (`_cat`/`_cluster`/`_nodes`) pass-through policy, or `None` to reject.
52 pub admin_passthrough: Option<AdminPassthroughConfig>,
53 /// The shared HMAC key enabling scroll/PIT cursor affinity, or `None` (off).
54 pub cursor_affinity_key: Option<String>,
55 /// Tenant-agnostic passthrough policy, or `None` = pure tenancy mode (the
56 /// default). Used for a transparent proxy or to pass selected (e.g. not-yet-
57 /// onboarded) indices through verbatim while tenant-isolating the rest.
58 pub passthrough: Option<PassthroughConfig>,
59 /// Which client headers the proxy relays to the upstream when it forwards a
60 /// request (the verbatim passthrough/admin/cursor paths). Pass-all by default
61 /// (sidecar trust), minus the mandatory hop-by-hop/framing set.
62 pub header_forwarding: HeaderForwardingConfig,
63 /// Full-fidelity traffic capture to a Kafka topic, or `None` (off). Requires
64 /// the binary be built with the `capture` feature; a configured capture
65 /// on a binary without it is a loud startup error rather than a silent no-op.
66 pub capture: Option<CaptureConfig>,
67 /// Whether capture is on for every request before any directive (default
68 /// `false`). `false` = capture on demand: nothing is teed until a published
69 /// `capture` directive selects requests. `true` = always-capture (a dedicated
70 /// capture/migration proxy). Independent of the sink: it only decides *when*
71 /// to capture; the sink still needs the `capture` feature + config.
72 pub capture_default: bool,
73 /// Async fan-out write queue (`docs/04` §9), or `None` (off). Requires the
74 /// `fanout` feature; a configured fan-out on a binary without it is a
75 /// loud startup error rather than a silent no-op.
76 pub fanout: Option<FanoutConfig>,
77 /// etcd-backed distributed directive store (`docs/05` §3), or `None` to use
78 /// the in-memory store + admin publish endpoint. Requires the `etcd` feature;
79 /// a configured etcd on a binary without it is a loud startup error.
80 pub etcd: Option<EtcdConfig>,
81}
82
/// Connection settings for the etcd-backed distributed directive store. It is
/// plain data with no etcd client types, which keeps the config crate free of
/// the etcd dependency.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EtcdConfig {
    /// etcd endpoints, each a `host:port` or a full URL; at least one.
    pub endpoints: Vec<String>,
    /// Key at which the fleet directive set is published and watched.
    pub directives_key: String,
}
92
93/// Async fan-out write queue settings: where resolved write ops are enqueued for
94/// a downstream applier (`docs/04` §9, ADR-010). Plain data (no broker types), so
95/// the config crate stays free of any Kafka client.
96#[derive(Clone, Debug, PartialEq, Eq)]
97pub struct FanoutConfig {
98 /// The Kafka bootstrap brokers (`host:port`), at least one.
99 pub brokers: Vec<String>,
100 /// The topic each op envelope is produced to.
101 pub topic: String,
102 /// TLS to the brokers, or `None` for a plaintext broker connection.
103 pub tls: Option<CaptureTlsConfig>,
104 /// How the document body is encoded in the envelope (default CBOR).
105 pub body_encoding: FanoutBodyEncoding,
106 /// Whether async is the deployment-default write mode (default `false`):
107 /// `false` = sync unless a request sends `X-Write-Mode: async`; `true` =
108 /// async unless a request sends `X-Write-Mode: sync`.
109 pub async_default: bool,
110 /// Whether `_delete_by_query` may be expanded into per-match deletes in async
111 /// mode (default `false`). It reads the match set and enqueues a delete each,
112 /// so it is opt-in (`docs/04` §9).
113 pub expand_delete_by_query: bool,
114}
115
/// How the document body of a fan-out op envelope is encoded on the wire.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum FanoutBodyEncoding {
    /// CBOR (RFC 8949), a compact binary form that is OpenSearch-native. This
    /// is the default.
    #[default]
    Cbor,
    /// Verbatim JSON, human-readable when debugging the queue.
    Json,
}
125
126/// Full-fidelity traffic capture settings: where to send the captured exchange
127/// stream. This is plain data (no broker types), so the config crate stays
128/// independent of any Kafka client; the binary builds the producer from it.
129#[derive(Clone, Debug, PartialEq, Eq)]
130pub struct CaptureConfig {
131 /// The Kafka bootstrap brokers (`host:port`), at least one.
132 pub brokers: Vec<String>,
133 /// The topic each captured exchange envelope is produced to.
134 pub topic: String,
135 /// Whether to redact the `Authorization` header from the captured stream
136 /// (default `true`). The capture stream is privileged and carries bodies and
137 /// values verbatim, so credentials are stripped unless explicitly kept.
138 pub redact: bool,
139 /// TLS to the brokers, or `None` for a plaintext broker connection.
140 pub tls: Option<CaptureTlsConfig>,
141 /// The most records in flight (buffered + retrying) at once before a produce
142 /// is dropped, bounding memory. Higher = fewer drops under load, more memory.
143 pub max_inflight: usize,
144 /// Total send attempts per record before giving up. Higher = better delivery
145 /// odds across a transient broker blip, at the cost of more retry work.
146 pub max_attempts: u32,
147 /// The first retry backoff in milliseconds; it doubles after each failure.
148 pub backoff_ms: u64,
149 /// Directory for the durable on-disk spill buffer, or `None` for in-memory
150 /// best-effort. Set it for **at-least-once** capture that survives a restart:
151 /// records persist to a write-ahead log here and replay until acknowledged.
152 pub wal_dir: Option<String>,
153 /// Cap on undelivered bytes in the spill buffer before new records are dropped
154 /// (only meaningful with `wal_dir`). Bounds disk like `max_inflight` bounds memory.
155 pub wal_max_bytes: u64,
156}
157
/// TLS settings for a broker connection, given as PEM file **paths** that the
/// binary reads. It is the type of both `CaptureConfig::tls` and
/// `FanoutConfig::tls`. `ca_path` pins the trusted CA; supplying a client
/// cert/key pair adds mTLS.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CaptureTlsConfig {
    /// Path to the CA PEM that the broker certificate must chain to (pinned
    /// trust).
    pub ca_path: String,
    /// Path to the client certificate chain PEM used for mTLS, or `None`.
    pub client_cert_path: Option<String>,
    /// Path to the client private key PEM used for mTLS, or `None`.
    pub client_key_path: Option<String>,
}
170
/// TLS termination settings, given as PEM file **paths**: the binary reads the
/// files, so the config itself never holds certificate material. Setting
/// `client_ca_path` makes mTLS required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TlsConfig {
    /// Path to the PEM holding the server certificate chain.
    pub cert_path: String,
    /// Path to the PEM holding the server private key.
    pub key_path: String,
    /// Path to the client-CA PEM that client certificates must chain to;
    /// setting it enables mTLS.
    pub client_ca_path: Option<String>,
}
183
184/// Observability and control-plane channel settings.
185#[derive(Clone, Debug, PartialEq, Eq)]
186pub struct ObservabilityConfig {
187 /// Whether to emit a structured JSON log line per request.
188 pub log_requests: bool,
189 /// The OTLP collector base URL for span export, or `None` (export off).
190 pub otlp_endpoint: Option<String>,
191 /// The `service.name` reported on exported spans.
192 pub service_name: String,
193 /// The baseline diagnostics verbosity applied before any directive.
194 pub diag_baseline: DiagBaseline,
195 /// The shared HMAC key verifying signed `X-Debug-Directive` headers, or `None`.
196 pub debug_directive_key: Option<String>,
197 /// The bearer token gating `POST/GET /admin/directives`, or `None` (disabled).
198 pub directive_admin_token: Option<String>,
199 /// Whether the pre-auth `/debug/explain` and `/debug/breakglass` surfaces are
200 /// served (default `true`). Set `false` in production so operational metadata
201 /// is not exposed unauthenticated; `/metrics` stays on regardless.
202 pub debug_endpoints: bool,
203 /// Whether directive-selected break-glass captures are also pushed off-instance
204 /// as structured JSON lines (default `false`), so a fleet aggregator can serve
205 /// them by `trace_id` rather than only the local per-instance ring (`docs/05`
206 /// §5). The fleet-coherent counterpart of the break-glass tape.
207 pub log_diagnostic_captures: bool,
208}
209
/// Admin pass-through policy: which cluster answers admin requests, and which
/// path prefixes are allow-listed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdminPassthroughConfig {
    /// Id of the cluster that admin requests are forwarded to.
    pub cluster: String,
    /// Allow-listed path prefixes, e.g. `/_cat/`.
    pub prefixes: Vec<String>,
    /// Base URL of the admin cluster; `None` resolves it through the tenancy's
    /// `cluster_endpoint` lookup.
    pub endpoint: Option<String>,
}
222
/// Header forwarding from client to upstream (`forward_client_headers` /
/// `forward_header_deny`). Because the proxy rebuilds the upstream request,
/// this is what decides which of the client's own headers ride along.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeaderForwardingConfig {
    /// Whether client headers are forwarded to the upstream at all (default
    /// `true`). With `false`, only the proxy-managed headers (content type,
    /// trace) are relayed.
    pub enabled: bool,
    /// Additional headers to drop (case-insensitive), beyond the mandatory
    /// hop-by-hop/framing set, e.g. `authorization` to keep the client
    /// credential off the cluster. Empty by default (pass-all).
    pub deny: Vec<String>,
}

impl Default for HeaderForwardingConfig {
    /// Forwarding on, with an empty deny list.
    fn default() -> Self {
        // Sidecar trust: the proxy usually runs next to the client, so the
        // client's own headers (auth and vendor tracing included) should reach
        // the cluster unless an operator restricts them. Hence pass-all.
        let deny = Vec::new();
        HeaderForwardingConfig {
            enabled: true,
            deny,
        }
    }
}
248
/// Tenant-agnostic passthrough: matching requests are forwarded verbatim to a
/// single cluster, with no tenancy rewrite.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PassthroughConfig {
    /// Id of the cluster that matching requests are forwarded to.
    pub cluster: String,
    /// Base URL of that cluster (the sink pools it).
    pub endpoint: String,
    /// Logical-index prefixes that pass through verbatim. An empty list means
    /// every request passes through (whole-instance transparent proxy); a
    /// non-empty list tenant-isolates every index that does not match
    /// (fail-closed).
    pub index_prefixes: Vec<String>,
}
262
/// Baseline diagnostics verbosity. The enum is local to the config crate so it
/// stays independent of `osproxy-observe`; the binary maps it to the engine's
/// level.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum DiagBaseline {
    /// Nothing is exported until a directive selects a request.
    Off,
    /// Only shapes, ids and field names. This is the default.
    #[default]
    Shape,
    /// Everything in `Shape`, plus per-stage timing.
    ShapeTiming,
    /// Everything in `Shape`, plus the shape of the rewrite diff.
    ShapeRewriteDiff,
}

impl DiagBaseline {
    /// Returns the canonical wire/config spelling of this level.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            DiagBaseline::Off => "off",
            DiagBaseline::Shape => "shape",
            DiagBaseline::ShapeTiming => "shape-timing",
            DiagBaseline::ShapeRewriteDiff => "shape-rewrite-diff",
        }
    }
}
290
/// A configuration failure, recording which setting was bad and why. Its
/// `Display` output is one actionable line, meant for an operator and an LLM
/// alike (`docs/01` §6).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConfigError {
    field: String,
    reason: String,
}

impl ConfigError {
    /// Builds the error for a known `field` whose value is invalid, with
    /// `reason` explaining what is wrong.
    #[must_use]
    pub fn invalid(field: impl Into<String>, reason: impl Into<String>) -> Self {
        let field = field.into();
        let reason = reason.into();
        Self { field, reason }
    }

    /// Builds the error for a setting key that is not recognized (a typo or an
    /// unsupported option). The reason is always `unknown setting`.
    #[must_use]
    pub fn unknown(field: impl Into<String>) -> Self {
        Self::invalid(field, "unknown setting")
    }

    /// Name of the setting that caused the failure.
    #[must_use]
    pub fn field(&self) -> &str {
        self.field.as_str()
    }
}

impl std::fmt::Display for ConfigError {
    /// Renders the error as a single ``config: `<field>`: <reason>`` line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { field, reason } = self;
        write!(f, "config: `{field}`: {reason}")
    }
}

impl std::error::Error for ConfigError {}
332
333impl Config {
334 /// Loads and validates configuration from the process environment plus
335 /// `args` (CLI flags without the program name). A `--config <path>` flag, or
336 /// the `OSPROXY_CONFIG` env var, names a config file read as the lowest layer.
337 ///
338 /// # Errors
339 ///
340 /// Returns a [`ConfigError`] if a file/flag is malformed, a key is unknown, or
341 /// any value fails validation, before any socket is opened.
342 pub fn load<I: IntoIterator<Item = String>>(args: I) -> Result<Self, ConfigError> {
343 let (file_flag, flags) = extract_config_flag(args)?;
344 let file_path = file_flag.or_else(|| {
345 std::env::var("OSPROXY_CONFIG")
346 .ok()
347 .filter(|v| !v.is_empty())
348 });
349 let file = match &file_path {
350 Some(path) => {
351 let text = std::fs::read_to_string(path)
352 .map_err(|e| ConfigError::invalid("config", format!("reading {path}: {e}")))?;
353 Raw::from_file(&text)?
354 }
355 None => Raw::default(),
356 };
357 let raw = Raw::layered(file, Raw::from_env(), Raw::from_flags(flags)?);
358 resolve::resolve(&raw)
359 }
360
361 /// Test-only: resolve a [`Config`] directly from an in-memory `(key, value)`
362 /// list (canonical keys), skipping the file/env/flag layering. Lets tests and
363 /// doc examples exercise validation without touching the process environment.
364 ///
365 /// # Errors
366 ///
367 /// Returns a [`ConfigError`] if any value fails validation.
368 pub fn resolve_for_test(pairs: &[(&str, &str)]) -> Result<Self, ConfigError> {
369 resolve::resolve(&Raw::from_pairs(pairs)?)
370 }
371}
372
373/// Splits a reserved `--config <path>` / `--config=<path>` flag out of the
374/// argument list, returning the path (if any) and the remaining flags.
375fn extract_config_flag<I: IntoIterator<Item = String>>(
376 args: I,
377) -> Result<(Option<String>, Vec<String>), ConfigError> {
378 let mut file = None;
379 let mut rest = Vec::new();
380 let mut args = args.into_iter();
381 while let Some(arg) = args.next() {
382 if arg == "--config" {
383 file = Some(
384 args.next()
385 .ok_or_else(|| ConfigError::invalid("config", "--config needs a path"))?,
386 );
387 } else if let Some(path) = arg.strip_prefix("--config=") {
388 file = Some(path.to_owned());
389 } else {
390 rest.push(arg);
391 }
392 }
393 Ok((file, rest))
394}