// ff_server/config.rs
1use ff_core::partition::PartitionConfig;
2use ff_core::types::LaneId;
3use ff_engine::EngineConfig;
4use std::time::Duration;
5
6/// Server configuration, loaded from environment variables.
7pub struct ServerConfig {
8 /// Valkey host. Default: `"localhost"`.
9 pub host: String,
10 /// Valkey port. Default: `6379`.
11 pub port: u16,
12 /// Enable TLS for Valkey connections.
13 pub tls: bool,
14 /// Enable Valkey cluster mode.
15 pub cluster: bool,
16 /// Partition counts (execution/flow/budget/quota).
17 pub partition_config: PartitionConfig,
18 /// Lanes to manage. Default: `["default"]`.
19 pub lanes: Vec<LaneId>,
20 /// Listen address for the API surface. Default: `"0.0.0.0:9090"`.
21 pub listen_addr: String,
22 /// Scanner intervals and engine config.
23 pub engine_config: EngineConfig,
24 /// Skip library loading (for tests where TestCluster already loaded it).
25 pub skip_library_load: bool,
26 /// Allowed CORS origins. `["*"]` means permissive (all origins).
27 pub cors_origins: Vec<String>,
28 /// Shared-secret API token. If set, all requests except GET /healthz must
29 /// include `Authorization: Bearer <token>`. If unset, auth is disabled.
30 pub api_token: Option<String>,
31 /// Hex-encoded secret used to sign waitpoint HMAC tokens (RFC-004
32 /// §Waitpoint Security). Required on boot; the server refuses to start
33 /// without it so multi-tenant signal authentication is never silently
34 /// disabled. Recommended length: 64 hex chars (32 bytes).
35 pub waitpoint_hmac_secret: String,
36 /// Grace window during which tokens signed by the previous kid remain
37 /// accepted after rotation. Tokens already in flight survive operator
38 /// rotation; operators tighten this for sensitive tenants. Default 24h.
39 pub waitpoint_hmac_grace_ms: u64,
40 /// Maximum concurrent stream-op callers (`read_attempt_stream` +
41 /// `tail_attempt_stream` combined). Each caller holds one semaphore
42 /// permit for the duration of its Valkey round-trip(s); contention
43 /// surfaces as HTTP 429 at the REST boundary.
44 ///
45 /// Shared bound for both read and tail because both run on the same
46 /// dedicated `tail_client` (see `Server.tail_client`) — a big
47 /// 10_000-frame XRANGE reply can head-of-line the mux just as badly
48 /// as a long `XREAD BLOCK`, so they should share fairness accounting.
49 ///
50 /// Default `64`. Set below the server's request-concurrency budget
51 /// so stream ops cannot starve other routes. Env var:
52 /// `FF_MAX_CONCURRENT_STREAM_OPS` (preferred) or legacy
53 /// `FF_MAX_CONCURRENT_TAIL` (accepted during the R4 rename; both
54 /// valid for at least one release).
55 pub max_concurrent_stream_ops: u32,
56}
57
58impl ServerConfig {
59 /// Load configuration from environment variables.
60 ///
61 /// The table below enumerates every variable this function reads. It is
62 /// the canonical rustdoc mirror of the identical table in the top-level
63 /// `README.md`. `docs/DEPLOYMENT.md` references these names.
64 ///
65 /// **Maintenance contract:** every env var key this function consumes —
66 /// whether via a direct `std::env::var(...)` call or through the
67 /// `env_or` / `env_bool` / `env_u16` / `env_u16_positive` / `env_u64` /
68 /// `env_u32_positive` helpers — MUST have a row here. When you add,
69 /// rename, or remove an env var, update this table in the same commit.
70 /// There is no compile-time check — reviewers enforce it. Legacy
71 /// aliases accepted during a rename window (e.g. `FF_MAX_CONCURRENT_TAIL`)
72 /// should be listed alongside their preferred name.
73 ///
74 /// | Variable | Default | Description |
75 /// |----------|---------|-------------|
76 /// | `FF_WAITPOINT_HMAC_SECRET` | *required* | Hex-encoded HMAC signing secret for waitpoint tokens (RFC-004 §Waitpoint Security). Even-length hex; 64 chars (32 bytes) recommended. Boot fails without it. |
77 /// | `FF_HOST` | `localhost` | Valkey host |
78 /// | `FF_PORT` | `6379` | Valkey port |
79 /// | `FF_TLS` | `false` | Enable TLS for Valkey (`1` or `true`) |
80 /// | `FF_CLUSTER` | `false` | Enable Valkey cluster mode (`1` or `true`) |
81 /// | `FF_LISTEN_ADDR` | `0.0.0.0:9090` | API listen address |
82 /// | `FF_LANES` | `default` | Comma-separated lane names; at least one non-empty lane required |
83 /// | `FF_FLOW_PARTITIONS` | `256` | Flow partition count — authoritative; under RFC-011 hash-tag co-location, exec keys also route here |
84 /// | `FF_BUDGET_PARTITIONS` | `32` | Budget partition count |
85 /// | `FF_QUOTA_PARTITIONS` | `32` | Quota partition count |
86 /// | `FF_CORS_ORIGINS` | `*` | Comma-separated CORS origins (`*` = permissive). Empty string is rejected; unset the var to get the default. |
87 /// | `FF_API_TOKEN` | *(none)* | Shared-secret Bearer token. If set, all non-`/healthz` requests require it. |
88 /// | `FF_WAITPOINT_HMAC_GRACE_MS` | `86400000` | Grace window (ms) during which tokens signed by the previous kid remain accepted after rotation. Default 24h. |
89 /// | `FF_MAX_CONCURRENT_STREAM_OPS` | `64` | Shared semaphore bound for `read_attempt_stream` + `tail_attempt_stream`. Legacy `FF_MAX_CONCURRENT_TAIL` is accepted as a fallback; if both are set, the new name wins. |
90 /// | `FF_MAX_CONCURRENT_TAIL` | *(legacy)* | Deprecated alias for `FF_MAX_CONCURRENT_STREAM_OPS`; accepted during the R4 rename window. |
91 /// | `FF_LEASE_EXPIRY_INTERVAL_MS` | `1500` | Lease-expiry scanner interval |
92 /// | `FF_DELAYED_PROMOTER_INTERVAL_MS` | `750` | Delayed-promoter scanner interval |
93 /// | `FF_INDEX_RECONCILER_INTERVAL_S` | `45` | Index reconciler interval |
94 /// | `FF_ATTEMPT_TIMEOUT_INTERVAL_S` | `2` | Attempt-timeout scanner interval |
95 /// | `FF_SUSPENSION_TIMEOUT_INTERVAL_S` | `2` | Suspension-timeout scanner interval |
96 /// | `FF_PENDING_WP_EXPIRY_INTERVAL_S` | `5` | Pending-waitpoint expiry scanner interval |
97 /// | `FF_RETENTION_TRIMMER_INTERVAL_S` | `60` | Retention-trimmer scanner interval |
98 /// | `FF_BUDGET_RESET_INTERVAL_S` | `15` | Budget-reset scanner interval |
99 /// | `FF_BUDGET_RECONCILER_INTERVAL_S` | `30` | Budget reconciler interval |
100 /// | `FF_QUOTA_RECONCILER_INTERVAL_S` | `30` | Quota reconciler interval |
101 /// | `FF_UNBLOCK_INTERVAL_S` | `5` | Unblock scanner interval |
102 /// | `FF_DEPENDENCY_RECONCILER_INTERVAL_S` | `15` | DAG dependency reconciler interval (safety net behind push-based promotion) |
103 /// | `FF_FLOW_PROJECTOR_INTERVAL_S` | `15` | Flow projector scanner interval |
104 /// | `FF_EXECUTION_DEADLINE_INTERVAL_S` | `5` | Execution-deadline scanner interval |
105 /// | `FF_CANCEL_RECONCILER_INTERVAL_S` | `15` | Cancel reconciler scanner interval |
106 pub fn from_env() -> Result<Self, ConfigError> {
107 let host = env_or("FF_HOST", "localhost");
108 let port = env_u16("FF_PORT", 6379)?;
109 let tls = env_bool("FF_TLS");
110 let cluster = env_bool("FF_CLUSTER");
111 let listen_addr = env_or("FF_LISTEN_ADDR", "0.0.0.0:9090");
112 // FF_CORS_ORIGINS contract:
113 // unset → default "*" (permissive)
114 // "*" → permissive
115 // "a,b,c" → explicit allowlist
116 // "" → hard error. An empty explicit value almost always
117 // means "I tried to unset it" which a blank env var
118 // does not do. We refuse to guess and make the
119 // operator's intent explicit.
120 let cors_raw = std::env::var("FF_CORS_ORIGINS");
121 let cors_source = match &cors_raw {
122 Ok(s) if s.is_empty() => {
123 return Err(ConfigError::InvalidValue {
124 var: "FF_CORS_ORIGINS".to_owned(),
125 message: "FF_CORS_ORIGINS is set but empty; \
126 unset it to default to \"*\", or pass \"*\" explicitly, \
127 or pass a non-empty comma-separated origin list"
128 .to_owned(),
129 });
130 }
131 Ok(s) => s.clone(),
132 Err(_) => "*".to_owned(),
133 };
134 let cors_origins: Vec<String> = cors_source
135 .split(',')
136 .map(|s| s.trim().to_owned())
137 .filter(|s| !s.is_empty())
138 .collect();
139
140 let api_token = std::env::var("FF_API_TOKEN").ok().filter(|s| !s.is_empty());
141
142 // Waitpoint HMAC secret. Required on boot — refuse to start without
143 // it so multi-tenant signal authentication can never be silently
144 // disabled. Validate hex shape eagerly; empty strings and bad hex
145 // produce a configuration error, not a runtime crash later.
146 let waitpoint_hmac_secret = std::env::var("FF_WAITPOINT_HMAC_SECRET")
147 .map_err(|_| ConfigError::InvalidValue {
148 var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
149 message:
150 "required: hex-encoded HMAC signing secret for waitpoint tokens \
151 (RFC-004 §Waitpoint Security); suggested 64 hex chars (32 bytes)"
152 .to_owned(),
153 })?;
154 if waitpoint_hmac_secret.is_empty() {
155 return Err(ConfigError::InvalidValue {
156 var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
157 message: "must not be empty".to_owned(),
158 });
159 }
160 if waitpoint_hmac_secret.len() % 2 != 0
161 || !waitpoint_hmac_secret.chars().all(|c| c.is_ascii_hexdigit())
162 {
163 return Err(ConfigError::InvalidValue {
164 var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
165 message: "must be an even-length hex string (0-9a-fA-F)".to_owned(),
166 });
167 }
168 let waitpoint_hmac_grace_ms = env_u64("FF_WAITPOINT_HMAC_GRACE_MS", 86_400_000)?;
169 // Preferred env var: FF_MAX_CONCURRENT_STREAM_OPS. Legacy
170 // FF_MAX_CONCURRENT_TAIL is accepted for one release to avoid
171 // breaking existing deployments mid-rename (R4 unified the two
172 // stream-op clients on one permit pool). If both are set, the
173 // new name wins.
174 let max_concurrent_stream_ops = match std::env::var("FF_MAX_CONCURRENT_STREAM_OPS") {
175 Ok(_) => env_u32_positive("FF_MAX_CONCURRENT_STREAM_OPS", 64)?,
176 Err(_) => env_u32_positive("FF_MAX_CONCURRENT_TAIL", 64)?,
177 };
178
179 let lanes: Vec<LaneId> = env_or("FF_LANES", "default")
180 .split(',')
181 .map(|s| LaneId::new(s.trim()))
182 .filter(|l| !l.as_str().is_empty())
183 .collect();
184 if lanes.is_empty() {
185 return Err(ConfigError::InvalidValue {
186 var: "FF_LANES".to_owned(),
187 message: "at least one non-empty lane name is required".to_owned(),
188 });
189 }
190
191 let partition_config = PartitionConfig {
192 // RFC-011: num_execution_partitions retired; exec keys co-locate on
193 // {fp:N}. FF_FLOW_PARTITIONS is the canonical env var.
194 num_flow_partitions: env_u16_positive("FF_FLOW_PARTITIONS", 256)?,
195 num_budget_partitions: env_u16_positive("FF_BUDGET_PARTITIONS", 32)?,
196 num_quota_partitions: env_u16_positive("FF_QUOTA_PARTITIONS", 32)?,
197 };
198
199 let lease_expiry_interval =
200 Duration::from_millis(env_u64("FF_LEASE_EXPIRY_INTERVAL_MS", 1500)?);
201 let delayed_promoter_interval =
202 Duration::from_millis(env_u64("FF_DELAYED_PROMOTER_INTERVAL_MS", 750)?);
203 let index_reconciler_interval =
204 Duration::from_secs(env_u64("FF_INDEX_RECONCILER_INTERVAL_S", 45)?);
205 let attempt_timeout_interval =
206 Duration::from_secs(env_u64("FF_ATTEMPT_TIMEOUT_INTERVAL_S", 2)?);
207 let suspension_timeout_interval =
208 Duration::from_secs(env_u64("FF_SUSPENSION_TIMEOUT_INTERVAL_S", 2)?);
209 let pending_wp_expiry_interval =
210 Duration::from_secs(env_u64("FF_PENDING_WP_EXPIRY_INTERVAL_S", 5)?);
211 let retention_trimmer_interval =
212 Duration::from_secs(env_u64("FF_RETENTION_TRIMMER_INTERVAL_S", 60)?);
213 let budget_reset_interval =
214 Duration::from_secs(env_u64("FF_BUDGET_RESET_INTERVAL_S", 15)?);
215 let budget_reconciler_interval =
216 Duration::from_secs(env_u64("FF_BUDGET_RECONCILER_INTERVAL_S", 30)?);
217 let quota_reconciler_interval =
218 Duration::from_secs(env_u64("FF_QUOTA_RECONCILER_INTERVAL_S", 30)?);
219 let unblock_interval =
220 Duration::from_secs(env_u64("FF_UNBLOCK_INTERVAL_S", 5)?);
221 // Raised from 1s (pre-Batch-C) to 15s now that push-based DAG
222 // promotion is primary. The reconciler is a safety net post-
223 // completion-listener; see ff-engine docs on
224 // `dependency_reconciler_interval`.
225 let dependency_reconciler_interval =
226 Duration::from_secs(env_u64("FF_DEPENDENCY_RECONCILER_INTERVAL_S", 15)?);
227
228 let engine_config = EngineConfig {
229 partition_config,
230 lanes: lanes.clone(),
231 lease_expiry_interval,
232 delayed_promoter_interval,
233 index_reconciler_interval,
234 attempt_timeout_interval,
235 suspension_timeout_interval,
236 pending_wp_expiry_interval,
237 retention_trimmer_interval,
238 budget_reset_interval,
239 budget_reconciler_interval,
240 quota_reconciler_interval,
241 unblock_interval,
242 dependency_reconciler_interval,
243 flow_projector_interval: Duration::from_secs(
244 env_u64("FF_FLOW_PROJECTOR_INTERVAL_S", 15)?
245 ),
246 execution_deadline_interval: Duration::from_secs(
247 env_u64("FF_EXECUTION_DEADLINE_INTERVAL_S", 5)?
248 ),
249 cancel_reconciler_interval: Duration::from_secs(
250 env_u64("FF_CANCEL_RECONCILER_INTERVAL_S", 15)?
251 ),
252 // Issue #122: default is no-op. Multi-tenant deployments
253 // override this after ServerConfig construction.
254 scanner_filter: Default::default(),
255 };
256
257 Ok(Self {
258 host,
259 port,
260 tls,
261 cluster,
262 partition_config,
263 lanes,
264 listen_addr,
265 engine_config,
266 skip_library_load: false,
267 cors_origins,
268 api_token,
269 waitpoint_hmac_secret,
270 waitpoint_hmac_grace_ms,
271 max_concurrent_stream_ops,
272 })
273 }
274}
275
276impl Default for ServerConfig {
277 fn default() -> Self {
278 let lanes = vec![LaneId::new("default")];
279 let partition_config = PartitionConfig::default();
280 Self {
281 host: "localhost".into(),
282 port: 6379,
283 tls: false,
284 cluster: false,
285 partition_config,
286 lanes: lanes.clone(),
287 listen_addr: "0.0.0.0:9090".into(),
288 engine_config: EngineConfig {
289 partition_config,
290 lanes,
291 ..Default::default()
292 },
293 skip_library_load: false,
294 cors_origins: vec!["*".to_owned()],
295 api_token: None,
296 // Deterministic dev/test secret. Production deployments MUST
297 // override via FF_WAITPOINT_HMAC_SECRET (ServerConfig::from_env
298 // requires it), so this default only applies to unit tests and
299 // TestCluster fixtures that skip env validation.
300 waitpoint_hmac_secret:
301 "0000000000000000000000000000000000000000000000000000000000000000"
302 .to_owned(),
303 waitpoint_hmac_grace_ms: 86_400_000,
304 max_concurrent_stream_ops: 64,
305 }
306 }
307}
308
309/// Configuration error.
310#[derive(Debug, thiserror::Error)]
311pub enum ConfigError {
312 #[error("invalid value for {var}: {message}")]
313 InvalidValue { var: String, message: String },
314}
315
/// Returns the value of environment variable `key`, or an owned copy of
/// `default` when the variable cannot be read (unset or not valid Unicode).
///
/// A variable that is set to the empty string is returned as-is.
fn env_or(key: &str, default: &str) -> String {
    match std::env::var(key) {
        Ok(value) => value,
        Err(_) => default.to_owned(),
    }
}
319
/// Reads a boolean flag from environment variable `key`.
///
/// Returns `true` only when the variable is set and its value — after
/// trimming surrounding whitespace — is `1` or `true` (ASCII
/// case-insensitive). An unset or unreadable variable, and any other value,
/// yields `false`.
///
/// Whitespace is trimmed because values injected from env files or mounted
/// secrets often carry a trailing newline; without trimming, `"true\n"`
/// would silently turn the flag off.
fn env_bool(key: &str) -> bool {
    std::env::var(key)
        .map(|raw| {
            let flag = raw.trim();
            flag == "1" || flag.eq_ignore_ascii_case("true")
        })
        .unwrap_or(false)
}
325
326fn env_u16(key: &str, default: u16) -> Result<u16, ConfigError> {
327 match std::env::var(key) {
328 Ok(v) => v.parse().map_err(|_| ConfigError::InvalidValue {
329 var: key.to_owned(),
330 message: format!("expected u16, got '{v}'"),
331 }),
332 Err(_) => Ok(default),
333 }
334}
335
336/// Like env_u16 but rejects 0 (for partition counts that are used as divisors).
337fn env_u16_positive(key: &str, default: u16) -> Result<u16, ConfigError> {
338 let val = env_u16(key, default)?;
339 if val == 0 {
340 return Err(ConfigError::InvalidValue {
341 var: key.to_owned(),
342 message: "must be > 0 (used as divisor in partition math)".to_owned(),
343 });
344 }
345 Ok(val)
346}
347
348fn env_u64(key: &str, default: u64) -> Result<u64, ConfigError> {
349 match std::env::var(key) {
350 Ok(v) => v.parse().map_err(|_| ConfigError::InvalidValue {
351 var: key.to_owned(),
352 message: format!("expected u64, got '{v}'"),
353 }),
354 Err(_) => Ok(default),
355 }
356}
357
358fn env_u32_positive(key: &str, default: u32) -> Result<u32, ConfigError> {
359 let val = match std::env::var(key) {
360 Ok(v) => v.parse::<u32>().map_err(|_| ConfigError::InvalidValue {
361 var: key.to_owned(),
362 message: format!("expected u32, got '{v}'"),
363 })?,
364 Err(_) => default,
365 };
366 if val == 0 {
367 return Err(ConfigError::InvalidValue {
368 var: key.to_owned(),
369 message: "must be > 0 (semaphore size)".to_owned(),
370 });
371 }
372 Ok(val)
373}