ff_server/config.rs
1use ff_core::partition::PartitionConfig;
2use ff_core::types::LaneId;
3use ff_engine::EngineConfig;
4use std::time::Duration;
5
/// Backend family selector (RFC-017 Stage A).
///
/// The default is [`BackendKind::Valkey`]. As of Stage E4 (v0.8.0) both
/// `Valkey` and `Postgres` are first-class and boot without a
/// dev-override; `BACKEND_STAGE_READY` remains in `ff-server::server`
/// as defence-in-depth for future backend additions.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[non_exhaustive]
pub enum BackendKind {
    /// Valkey / FCALL backend (production path through v0.7.x).
    #[default]
    Valkey,
    /// Postgres backend. First-class since v0.8.0 (RFC-017 Stage E4).
    Postgres,
}

impl BackendKind {
    /// Returns the stable, lowercase `&'static str` label for this
    /// backend family. The label matches the backend's
    /// `backend_label()` and is used for metrics dimensioning.
    pub fn as_str(&self) -> &'static str {
        match *self {
            BackendKind::Postgres => "postgres",
            BackendKind::Valkey => "valkey",
        }
    }
}
31
/// Postgres connection parameters (RFC-017 Wave 8 Stage E1), carried on
/// [`ServerConfig`] and meaningful when
/// `backend == BackendKind::Postgres`.
///
/// The Postgres surface lives in its own struct rather than as flat
/// fields on `ServerConfig`; the Valkey parameters follow the same
/// nested shape via [`ValkeyServerConfig`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct PostgresServerConfig {
    /// Connection URL in libpq / sqlx shape
    /// (`postgres://user:pass@host:port/db`). Read from
    /// `FF_POSTGRES_URL`; empty when the variable is unset.
    pub url: String,
    /// Maximum number of pool connections. Read from
    /// `FF_POSTGRES_POOL_SIZE`; default `10` (matches sqlx's
    /// out-of-box default and the
    /// [`ff_core::backend::PostgresConnection`] default).
    pub pool_size: u32,
}

impl Default for PostgresServerConfig {
    /// An empty URL with a pool size of `10`.
    fn default() -> Self {
        PostgresServerConfig {
            pool_size: 10,
            url: String::default(),
        }
    }
}
60
/// Valkey connection parameters (RFC-017 Wave 8 Stage E4, v0.8.0),
/// carried on [`ServerConfig`] and meaningful when
/// `backend == BackendKind::Valkey`.
///
/// Mirrors the [`PostgresServerConfig`] shape and replaces the flat
/// `host` / `port` / `tls` / `cluster` / `skip_library_load` fields
/// that were removed at v0.8.0 in favour of nesting.
///
/// **Not `#[non_exhaustive]`**, so downstream tests and consumers can
/// build the struct literal directly. Adding a field here is therefore
/// a v0.y.0 breaking bump; call sites that want insulation can write
/// `..ValkeyServerConfig::default()` in their literal.
#[derive(Clone, Debug)]
pub struct ValkeyServerConfig {
    /// Valkey host. Default: `"localhost"`.
    pub host: String,
    /// Valkey port. Default: `6379`.
    pub port: u16,
    /// Enable TLS for Valkey connections. Default: `false`.
    pub tls: bool,
    /// Enable Valkey cluster mode. Default: `false`.
    pub cluster: bool,
    /// Skip library loading (for tests where TestCluster already
    /// loaded it). Default: `false`.
    pub skip_library_load: bool,
}

impl Default for ValkeyServerConfig {
    /// Plain-text, non-cluster connection to `localhost:6379` with
    /// library loading enabled.
    fn default() -> Self {
        ValkeyServerConfig {
            host: String::from("localhost"),
            port: 6379,
            cluster: false,
            tls: false,
            skip_library_load: false,
        }
    }
}
97
/// Server configuration, loaded from environment variables.
///
/// **RFC-017 Stage E4 (v0.8.0):** the flat Valkey fields (`host`,
/// `port`, `tls`, `cluster`, `skip_library_load`) were removed. Use
/// [`ValkeyServerConfig`] on the `valkey` field instead.
// NOTE(review): unlike the nested config structs, this type derives
// neither `Debug` nor `Clone`. It carries `api_token` and
// `waitpoint_hmac_secret`, so this is presumably deliberate — confirm
// before adding derives.
pub struct ServerConfig {
    /// Partition counts (execution/flow/budget/quota).
    pub partition_config: PartitionConfig,
    /// Lanes to manage. Default: `["default"]`.
    pub lanes: Vec<LaneId>,
    /// Listen address for the API surface. Default: `"0.0.0.0:9090"`.
    pub listen_addr: String,
    /// Scanner intervals and engine config.
    pub engine_config: EngineConfig,
    /// Allowed CORS origins. `["*"]` means permissive (all origins).
    pub cors_origins: Vec<String>,
    /// Shared-secret API token. If set, all requests except GET /healthz must
    /// include `Authorization: Bearer <token>`. If unset, auth is disabled.
    pub api_token: Option<String>,
    /// Hex-encoded secret used to sign waitpoint HMAC tokens (RFC-004
    /// §Waitpoint Security). Required on boot; the server refuses to start
    /// without it so multi-tenant signal authentication is never silently
    /// disabled. Recommended length: 64 hex chars (32 bytes).
    pub waitpoint_hmac_secret: String,
    /// Grace window during which tokens signed by the previous kid remain
    /// accepted after rotation. Tokens already in flight survive operator
    /// rotation; operators tighten this for sensitive tenants. Default 24h.
    pub waitpoint_hmac_grace_ms: u64,
    /// Maximum concurrent stream-op callers (`read_attempt_stream` +
    /// `tail_attempt_stream` combined). Each caller holds one semaphore
    /// permit for the duration of its Valkey round-trip(s); contention
    /// surfaces as HTTP 429 at the REST boundary.
    ///
    /// Shared bound for both read and tail because both run on the same
    /// dedicated `tail_client` (see `Server.tail_client`) — a big
    /// 10_000-frame XRANGE reply can head-of-line the mux just as badly
    /// as a long `XREAD BLOCK`, so they should share fairness accounting.
    ///
    /// Default `64`. Set below the server's request-concurrency budget
    /// so stream ops cannot starve other routes. Env var:
    /// `FF_MAX_CONCURRENT_STREAM_OPS` (preferred) or legacy
    /// `FF_MAX_CONCURRENT_TAIL` (accepted during the R4 rename; both
    /// valid for at least one release).
    pub max_concurrent_stream_ops: u32,
    /// RFC-017 Stage A: which backend family to boot. Default
    /// [`BackendKind::Valkey`]. Both `BackendKind::Valkey` and
    /// `BackendKind::Postgres` are first-class as of Stage E4
    /// (v0.8.0); earlier stages rejected `BackendKind::Postgres` at
    /// startup per RFC-017 §9.0.
    pub backend: BackendKind,
    /// RFC-017 Stage E4 (v0.8.0): Valkey connection parameters.
    /// Meaningful only when `backend == BackendKind::Valkey`; the
    /// Postgres path ignores these fields.
    pub valkey: ValkeyServerConfig,
    /// RFC-017 Wave 8 Stage E1: Postgres connection parameters.
    /// Meaningful only when `backend == BackendKind::Postgres`; the
    /// Valkey path ignores these fields.
    pub postgres: PostgresServerConfig,
}
155
156impl ServerConfig {
157 /// RFC-017 Wave 8 Stage E1: build the
158 /// [`ff_core::backend::BackendConfig`] the Postgres backend's
159 /// `connect_with_metrics` expects, from the flat `postgres.url`
160 /// + `postgres.pool_size` fields on this struct.
161 pub fn postgres_config(&self) -> ff_core::backend::BackendConfig {
162 let mut cfg = ff_core::backend::BackendConfig::postgres(&self.postgres.url);
163 if let ff_core::backend::BackendConnection::Postgres(ref mut conn) = cfg.connection {
164 conn.max_connections = self.postgres.pool_size;
165 }
166 cfg
167 }
168}
169
170impl ServerConfig {
171 /// Load configuration from environment variables.
172 ///
173 /// The table below enumerates every variable this function reads. It is
174 /// the canonical rustdoc mirror of the identical table in the top-level
175 /// `README.md`. `docs/DEPLOYMENT.md` references these names.
176 ///
177 /// **Maintenance contract:** every env var key this function consumes —
178 /// whether via a direct `std::env::var(...)` call or through the
179 /// `env_or` / `env_bool` / `env_u16` / `env_u16_positive` / `env_u64` /
180 /// `env_u32_positive` helpers — MUST have a row here. When you add,
181 /// rename, or remove an env var, update this table in the same commit.
182 /// There is no compile-time check — reviewers enforce it. Legacy
183 /// aliases accepted during a rename window (e.g. `FF_MAX_CONCURRENT_TAIL`)
184 /// should be listed alongside their preferred name.
185 ///
186 /// | Variable | Default | Description |
187 /// |----------|---------|-------------|
188 /// | `FF_WAITPOINT_HMAC_SECRET` | *required* | Hex-encoded HMAC signing secret for waitpoint tokens (RFC-004 §Waitpoint Security). Even-length hex; 64 chars (32 bytes) recommended. Boot fails without it. |
189 /// | `FF_HOST` | `localhost` | Valkey host |
190 /// | `FF_PORT` | `6379` | Valkey port |
191 /// | `FF_TLS` | `false` | Enable TLS for Valkey (`1` or `true`) |
192 /// | `FF_CLUSTER` | `false` | Enable Valkey cluster mode (`1` or `true`) |
193 /// | `FF_LISTEN_ADDR` | `0.0.0.0:9090` | API listen address |
194 /// | `FF_LANES` | `default` | Comma-separated lane names; at least one non-empty lane required |
195 /// | `FF_FLOW_PARTITIONS` | `256` | Flow partition count — authoritative; under RFC-011 hash-tag co-location, exec keys also route here |
196 /// | `FF_BUDGET_PARTITIONS` | `32` | Budget partition count |
197 /// | `FF_QUOTA_PARTITIONS` | `32` | Quota partition count |
198 /// | `FF_CORS_ORIGINS` | `*` | Comma-separated CORS origins (`*` = permissive). Empty string is rejected; unset the var to get the default. |
199 /// | `FF_API_TOKEN` | *(none)* | Shared-secret Bearer token. If set, all non-`/healthz` requests require it. |
200 /// | `FF_WAITPOINT_HMAC_GRACE_MS` | `86400000` | Grace window (ms) during which tokens signed by the previous kid remain accepted after rotation. Default 24h. |
201 /// | `FF_MAX_CONCURRENT_STREAM_OPS` | `64` | Shared semaphore bound for `read_attempt_stream` + `tail_attempt_stream`. Legacy `FF_MAX_CONCURRENT_TAIL` is accepted as a fallback; if both are set, the new name wins. |
202 /// | `FF_MAX_CONCURRENT_TAIL` | *(legacy)* | Deprecated alias for `FF_MAX_CONCURRENT_STREAM_OPS`; accepted during the R4 rename window. |
203 /// | `FF_LEASE_EXPIRY_INTERVAL_MS` | `1500` | Lease-expiry scanner interval |
204 /// | `FF_DELAYED_PROMOTER_INTERVAL_MS` | `750` | Delayed-promoter scanner interval |
205 /// | `FF_INDEX_RECONCILER_INTERVAL_S` | `45` | Index reconciler interval |
206 /// | `FF_ATTEMPT_TIMEOUT_INTERVAL_S` | `2` | Attempt-timeout scanner interval |
207 /// | `FF_SUSPENSION_TIMEOUT_INTERVAL_S` | `2` | Suspension-timeout scanner interval |
208 /// | `FF_PENDING_WP_EXPIRY_INTERVAL_S` | `5` | Pending-waitpoint expiry scanner interval |
209 /// | `FF_RETENTION_TRIMMER_INTERVAL_S` | `60` | Retention-trimmer scanner interval |
210 /// | `FF_BUDGET_RESET_INTERVAL_S` | `15` | Budget-reset scanner interval |
211 /// | `FF_BUDGET_RECONCILER_INTERVAL_S` | `30` | Budget reconciler interval |
212 /// | `FF_QUOTA_RECONCILER_INTERVAL_S` | `30` | Quota reconciler interval |
213 /// | `FF_UNBLOCK_INTERVAL_S` | `5` | Unblock scanner interval |
214 /// | `FF_DEPENDENCY_RECONCILER_INTERVAL_S` | `15` | DAG dependency reconciler interval (safety net behind push-based promotion) |
215 /// | `FF_FLOW_PROJECTOR_INTERVAL_S` | `15` | Flow projector scanner interval |
216 /// | `FF_EXECUTION_DEADLINE_INTERVAL_S` | `5` | Execution-deadline scanner interval |
217 /// | `FF_CANCEL_RECONCILER_INTERVAL_S` | `15` | Cancel reconciler scanner interval |
218 /// | `FF_BACKEND` | `valkey` | Backend family — `valkey` or `postgres`. Both are first-class at v0.8.0 (RFC-017 Stage E4 flipped `BACKEND_STAGE_READY` to `&["valkey", "postgres"]`). |
219 /// | `FF_POSTGRES_URL` | *(empty)* | Postgres connection URL (libpq/sqlx shape, e.g. `postgres://user:pass@host:port/db`). Required when `FF_BACKEND=postgres`; ignored otherwise. |
220 /// | `FF_POSTGRES_POOL_SIZE` | `10` | Max Postgres pool connections; ignored on the Valkey path. |
221 pub fn from_env() -> Result<Self, ConfigError> {
222 let valkey = ValkeyServerConfig {
223 host: env_or("FF_HOST", "localhost"),
224 port: env_u16("FF_PORT", 6379)?,
225 tls: env_bool("FF_TLS"),
226 cluster: env_bool("FF_CLUSTER"),
227 skip_library_load: false,
228 };
229 let listen_addr = env_or("FF_LISTEN_ADDR", "0.0.0.0:9090");
230 // FF_CORS_ORIGINS contract:
231 // unset → default "*" (permissive)
232 // "*" → permissive
233 // "a,b,c" → explicit allowlist
234 // "" → hard error. An empty explicit value almost always
235 // means "I tried to unset it" which a blank env var
236 // does not do. We refuse to guess and make the
237 // operator's intent explicit.
238 let cors_raw = std::env::var("FF_CORS_ORIGINS");
239 let cors_source = match &cors_raw {
240 Ok(s) if s.is_empty() => {
241 return Err(ConfigError::InvalidValue {
242 var: "FF_CORS_ORIGINS".to_owned(),
243 message: "FF_CORS_ORIGINS is set but empty; \
244 unset it to default to \"*\", or pass \"*\" explicitly, \
245 or pass a non-empty comma-separated origin list"
246 .to_owned(),
247 });
248 }
249 Ok(s) => s.clone(),
250 Err(_) => "*".to_owned(),
251 };
252 let cors_origins: Vec<String> = cors_source
253 .split(',')
254 .map(|s| s.trim().to_owned())
255 .filter(|s| !s.is_empty())
256 .collect();
257
258 let api_token = std::env::var("FF_API_TOKEN").ok().filter(|s| !s.is_empty());
259
260 // Waitpoint HMAC secret. Required on boot — refuse to start without
261 // it so multi-tenant signal authentication can never be silently
262 // disabled. Validate hex shape eagerly; empty strings and bad hex
263 // produce a configuration error, not a runtime crash later.
264 let waitpoint_hmac_secret = std::env::var("FF_WAITPOINT_HMAC_SECRET")
265 .map_err(|_| ConfigError::InvalidValue {
266 var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
267 message:
268 "required: hex-encoded HMAC signing secret for waitpoint tokens \
269 (RFC-004 §Waitpoint Security); suggested 64 hex chars (32 bytes)"
270 .to_owned(),
271 })?;
272 if waitpoint_hmac_secret.is_empty() {
273 return Err(ConfigError::InvalidValue {
274 var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
275 message: "must not be empty".to_owned(),
276 });
277 }
278 if waitpoint_hmac_secret.len() % 2 != 0
279 || !waitpoint_hmac_secret.chars().all(|c| c.is_ascii_hexdigit())
280 {
281 return Err(ConfigError::InvalidValue {
282 var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
283 message: "must be an even-length hex string (0-9a-fA-F)".to_owned(),
284 });
285 }
286 let waitpoint_hmac_grace_ms = env_u64("FF_WAITPOINT_HMAC_GRACE_MS", 86_400_000)?;
287 // Preferred env var: FF_MAX_CONCURRENT_STREAM_OPS. Legacy
288 // FF_MAX_CONCURRENT_TAIL is accepted for one release to avoid
289 // breaking existing deployments mid-rename (R4 unified the two
290 // stream-op clients on one permit pool). If both are set, the
291 // new name wins.
292 let max_concurrent_stream_ops = match std::env::var("FF_MAX_CONCURRENT_STREAM_OPS") {
293 Ok(_) => env_u32_positive("FF_MAX_CONCURRENT_STREAM_OPS", 64)?,
294 Err(_) => env_u32_positive("FF_MAX_CONCURRENT_TAIL", 64)?,
295 };
296
297 let lanes: Vec<LaneId> = env_or("FF_LANES", "default")
298 .split(',')
299 .map(|s| LaneId::new(s.trim()))
300 .filter(|l| !l.as_str().is_empty())
301 .collect();
302 if lanes.is_empty() {
303 return Err(ConfigError::InvalidValue {
304 var: "FF_LANES".to_owned(),
305 message: "at least one non-empty lane name is required".to_owned(),
306 });
307 }
308
309 let partition_config = PartitionConfig {
310 // RFC-011: num_execution_partitions retired; exec keys co-locate on
311 // {fp:N}. FF_FLOW_PARTITIONS is the canonical env var.
312 num_flow_partitions: env_u16_positive("FF_FLOW_PARTITIONS", 256)?,
313 num_budget_partitions: env_u16_positive("FF_BUDGET_PARTITIONS", 32)?,
314 num_quota_partitions: env_u16_positive("FF_QUOTA_PARTITIONS", 32)?,
315 };
316
317 let lease_expiry_interval =
318 Duration::from_millis(env_u64("FF_LEASE_EXPIRY_INTERVAL_MS", 1500)?);
319 let delayed_promoter_interval =
320 Duration::from_millis(env_u64("FF_DELAYED_PROMOTER_INTERVAL_MS", 750)?);
321 let index_reconciler_interval =
322 Duration::from_secs(env_u64("FF_INDEX_RECONCILER_INTERVAL_S", 45)?);
323 let attempt_timeout_interval =
324 Duration::from_secs(env_u64("FF_ATTEMPT_TIMEOUT_INTERVAL_S", 2)?);
325 let suspension_timeout_interval =
326 Duration::from_secs(env_u64("FF_SUSPENSION_TIMEOUT_INTERVAL_S", 2)?);
327 let pending_wp_expiry_interval =
328 Duration::from_secs(env_u64("FF_PENDING_WP_EXPIRY_INTERVAL_S", 5)?);
329 let retention_trimmer_interval =
330 Duration::from_secs(env_u64("FF_RETENTION_TRIMMER_INTERVAL_S", 60)?);
331 let budget_reset_interval =
332 Duration::from_secs(env_u64("FF_BUDGET_RESET_INTERVAL_S", 15)?);
333 let budget_reconciler_interval =
334 Duration::from_secs(env_u64("FF_BUDGET_RECONCILER_INTERVAL_S", 30)?);
335 let quota_reconciler_interval =
336 Duration::from_secs(env_u64("FF_QUOTA_RECONCILER_INTERVAL_S", 30)?);
337 let unblock_interval =
338 Duration::from_secs(env_u64("FF_UNBLOCK_INTERVAL_S", 5)?);
339 // Raised from 1s (pre-Batch-C) to 15s now that push-based DAG
340 // promotion is primary. The reconciler is a safety net post-
341 // completion-listener; see ff-engine docs on
342 // `dependency_reconciler_interval`.
343 let dependency_reconciler_interval =
344 Duration::from_secs(env_u64("FF_DEPENDENCY_RECONCILER_INTERVAL_S", 15)?);
345
346 let engine_config = EngineConfig {
347 partition_config,
348 lanes: lanes.clone(),
349 lease_expiry_interval,
350 delayed_promoter_interval,
351 index_reconciler_interval,
352 attempt_timeout_interval,
353 suspension_timeout_interval,
354 pending_wp_expiry_interval,
355 retention_trimmer_interval,
356 budget_reset_interval,
357 budget_reconciler_interval,
358 quota_reconciler_interval,
359 unblock_interval,
360 dependency_reconciler_interval,
361 flow_projector_interval: Duration::from_secs(
362 env_u64("FF_FLOW_PROJECTOR_INTERVAL_S", 15)?
363 ),
364 execution_deadline_interval: Duration::from_secs(
365 env_u64("FF_EXECUTION_DEADLINE_INTERVAL_S", 5)?
366 ),
367 cancel_reconciler_interval: Duration::from_secs(
368 env_u64("FF_CANCEL_RECONCILER_INTERVAL_S", 15)?
369 ),
370 edge_cancel_dispatcher_interval: Duration::from_secs(
371 env_u64("FF_EDGE_CANCEL_DISPATCHER_INTERVAL_S", 1)?
372 ),
373 edge_cancel_reconciler_interval: Duration::from_secs(
374 env_u64("FF_EDGE_CANCEL_RECONCILER_INTERVAL_S", 10)?
375 ),
376 // Issue #122: default is no-op. Multi-tenant deployments
377 // override this after ServerConfig construction.
378 scanner_filter: Default::default(),
379 };
380
381 // RFC-017 Stage E4 (v0.8.0): `FF_BACKEND` selects the backend
382 // family at boot. Default `valkey`; both `valkey` and `postgres`
383 // are first-class. Unknown values are rejected eagerly so typos
384 // don't silently fall through to the default. FF_POSTGRES_URL +
385 // FF_POSTGRES_POOL_SIZE populate `postgres` when FF_BACKEND=postgres.
386 // Read regardless of backend selector so operators can preset the
387 // values; the Valkey path ignores them.
388 let postgres = PostgresServerConfig {
389 url: std::env::var("FF_POSTGRES_URL").unwrap_or_default(),
390 pool_size: env_u32_positive("FF_POSTGRES_POOL_SIZE", 10)?,
391 };
392
393 let backend = match std::env::var("FF_BACKEND") {
394 Ok(v) => match v.to_ascii_lowercase().as_str() {
395 "" | "valkey" => BackendKind::Valkey,
396 "postgres" => BackendKind::Postgres,
397 other => {
398 return Err(ConfigError::InvalidValue {
399 var: "FF_BACKEND".to_owned(),
400 message: format!(
401 "unknown backend '{other}': expected 'valkey' or 'postgres'"
402 ),
403 });
404 }
405 },
406 Err(_) => BackendKind::default(),
407 };
408
409 Ok(Self {
410 partition_config,
411 lanes,
412 listen_addr,
413 engine_config,
414 cors_origins,
415 api_token,
416 waitpoint_hmac_secret,
417 waitpoint_hmac_grace_ms,
418 max_concurrent_stream_ops,
419 backend,
420 valkey,
421 postgres,
422 })
423 }
424}
425
426impl Default for ServerConfig {
427 fn default() -> Self {
428 let lanes = vec![LaneId::new("default")];
429 let partition_config = PartitionConfig::default();
430 Self {
431 partition_config,
432 lanes: lanes.clone(),
433 listen_addr: "0.0.0.0:9090".into(),
434 engine_config: EngineConfig {
435 partition_config,
436 lanes,
437 ..Default::default()
438 },
439 cors_origins: vec!["*".to_owned()],
440 api_token: None,
441 // Deterministic dev/test secret. Production deployments MUST
442 // override via FF_WAITPOINT_HMAC_SECRET (ServerConfig::from_env
443 // requires it), so this default only applies to unit tests and
444 // TestCluster fixtures that skip env validation.
445 waitpoint_hmac_secret:
446 "0000000000000000000000000000000000000000000000000000000000000000"
447 .to_owned(),
448 waitpoint_hmac_grace_ms: 86_400_000,
449 max_concurrent_stream_ops: 64,
450 backend: BackendKind::default(),
451 valkey: ValkeyServerConfig::default(),
452 postgres: PostgresServerConfig::default(),
453 }
454 }
455}
456
/// Configuration error produced while loading [`ServerConfig`] from the
/// environment.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
    /// An environment variable was rejected: it failed to parse, failed
    /// validation, or (for `FF_WAITPOINT_HMAC_SECRET`) was required but
    /// missing. `var` is the variable name and `message` the
    /// human-readable reason; the error displays as
    /// `invalid value for {var}: {message}`.
    #[error("invalid value for {var}: {message}")]
    InvalidValue { var: String, message: String },
}
463
/// Reads the environment variable `key`, returning `default` (as an
/// owned `String`) whenever `std::env::var` fails — i.e. the variable is
/// unset or its value is not valid Unicode. A variable that is set to
/// an empty string is returned as-is.
fn env_or(key: &str, default: &str) -> String {
    match std::env::var(key) {
        Ok(value) => value,
        Err(_) => default.to_owned(),
    }
}
467
/// Reads the environment variable `key` as a boolean flag.
///
/// Returns `true` only when the value is exactly `"1"` or equals
/// `"true"` ignoring ASCII case. Every other outcome — unset, not
/// valid Unicode, or any other text — is `false`; no error is raised.
fn env_bool(key: &str) -> bool {
    matches!(
        std::env::var(key),
        Ok(ref flag) if flag == "1" || flag.eq_ignore_ascii_case("true")
    )
}
473
474fn env_u16(key: &str, default: u16) -> Result<u16, ConfigError> {
475 match std::env::var(key) {
476 Ok(v) => v.parse().map_err(|_| ConfigError::InvalidValue {
477 var: key.to_owned(),
478 message: format!("expected u16, got '{v}'"),
479 }),
480 Err(_) => Ok(default),
481 }
482}
483
484/// Like env_u16 but rejects 0 (for partition counts that are used as divisors).
485fn env_u16_positive(key: &str, default: u16) -> Result<u16, ConfigError> {
486 let val = env_u16(key, default)?;
487 if val == 0 {
488 return Err(ConfigError::InvalidValue {
489 var: key.to_owned(),
490 message: "must be > 0 (used as divisor in partition math)".to_owned(),
491 });
492 }
493 Ok(val)
494}
495
496fn env_u64(key: &str, default: u64) -> Result<u64, ConfigError> {
497 match std::env::var(key) {
498 Ok(v) => v.parse().map_err(|_| ConfigError::InvalidValue {
499 var: key.to_owned(),
500 message: format!("expected u64, got '{v}'"),
501 }),
502 Err(_) => Ok(default),
503 }
504}
505
506fn env_u32_positive(key: &str, default: u32) -> Result<u32, ConfigError> {
507 let val = match std::env::var(key) {
508 Ok(v) => v.parse::<u32>().map_err(|_| ConfigError::InvalidValue {
509 var: key.to_owned(),
510 message: format!("expected u32, got '{v}'"),
511 })?,
512 Err(_) => default,
513 };
514 if val == 0 {
515 return Err(ConfigError::InvalidValue {
516 var: key.to_owned(),
517 message: "must be > 0 (semaphore size)".to_owned(),
518 });
519 }
520 Ok(val)
521}