ff_server/config.rs
1use ff_core::partition::PartitionConfig;
2use ff_core::types::LaneId;
3use ff_engine::EngineConfig;
4use std::time::Duration;
5
6/// Server configuration, loaded from environment variables.
7pub struct ServerConfig {
8 /// Valkey host. Default: `"localhost"`.
9 pub host: String,
10 /// Valkey port. Default: `6379`.
11 pub port: u16,
12 /// Enable TLS for Valkey connections.
13 pub tls: bool,
14 /// Enable Valkey cluster mode.
15 pub cluster: bool,
16 /// Partition counts (execution/flow/budget/quota).
17 pub partition_config: PartitionConfig,
18 /// Lanes to manage. Default: `["default"]`.
19 pub lanes: Vec<LaneId>,
20 /// Listen address for the API surface. Default: `"0.0.0.0:9090"`.
21 pub listen_addr: String,
22 /// Scanner intervals and engine config.
23 pub engine_config: EngineConfig,
24 /// Skip library loading (for tests where TestCluster already loaded it).
25 pub skip_library_load: bool,
26 /// Allowed CORS origins. `["*"]` means permissive (all origins).
27 pub cors_origins: Vec<String>,
28 /// Shared-secret API token. If set, all requests except GET /healthz must
29 /// include `Authorization: Bearer <token>`. If unset, auth is disabled.
30 pub api_token: Option<String>,
31 /// Hex-encoded secret used to sign waitpoint HMAC tokens (RFC-004
32 /// §Waitpoint Security). Required on boot; the server refuses to start
33 /// without it so multi-tenant signal authentication is never silently
34 /// disabled. Recommended length: 64 hex chars (32 bytes).
35 pub waitpoint_hmac_secret: String,
36 /// Grace window during which tokens signed by the previous kid remain
37 /// accepted after rotation. Tokens already in flight survive operator
38 /// rotation; operators tighten this for sensitive tenants. Default 24h.
39 pub waitpoint_hmac_grace_ms: u64,
40 /// Maximum concurrent stream-op callers (`read_attempt_stream` +
41 /// `tail_attempt_stream` combined). Each caller holds one semaphore
42 /// permit for the duration of its Valkey round-trip(s); contention
43 /// surfaces as HTTP 429 at the REST boundary.
44 ///
45 /// Shared bound for both read and tail because both run on the same
46 /// dedicated `tail_client` (see `Server.tail_client`) — a big
47 /// 10_000-frame XRANGE reply can head-of-line the mux just as badly
48 /// as a long `XREAD BLOCK`, so they should share fairness accounting.
49 ///
50 /// Default `64`. Set below the server's request-concurrency budget
51 /// so stream ops cannot starve other routes. Env var:
52 /// `FF_MAX_CONCURRENT_STREAM_OPS` (preferred) or legacy
53 /// `FF_MAX_CONCURRENT_TAIL` (accepted during the R4 rename; both
54 /// valid for at least one release).
55 pub max_concurrent_stream_ops: u32,
56}
57
58impl ServerConfig {
59 /// Load configuration from environment variables.
60 ///
61 /// The table below enumerates every variable this function reads. It is
62 /// the canonical rustdoc mirror of the identical table in the top-level
63 /// `README.md`. `docs/DEPLOYMENT.md` references these names.
64 ///
65 /// **Maintenance contract:** every env var key this function consumes —
66 /// whether via a direct `std::env::var(...)` call or through the
67 /// `env_or` / `env_bool` / `env_u16` / `env_u16_positive` / `env_u64` /
68 /// `env_u32_positive` helpers — MUST have a row here. When you add,
69 /// rename, or remove an env var, update this table in the same commit.
70 /// There is no compile-time check — reviewers enforce it. Legacy
71 /// aliases accepted during a rename window (e.g. `FF_MAX_CONCURRENT_TAIL`)
72 /// should be listed alongside their preferred name.
73 ///
74 /// | Variable | Default | Description |
75 /// |----------|---------|-------------|
76 /// | `FF_WAITPOINT_HMAC_SECRET` | *required* | Hex-encoded HMAC signing secret for waitpoint tokens (RFC-004 §Waitpoint Security). Even-length hex; 64 chars (32 bytes) recommended. Boot fails without it. |
77 /// | `FF_HOST` | `localhost` | Valkey host |
78 /// | `FF_PORT` | `6379` | Valkey port |
79 /// | `FF_TLS` | `false` | Enable TLS for Valkey (`1` or `true`) |
80 /// | `FF_CLUSTER` | `false` | Enable Valkey cluster mode (`1` or `true`) |
81 /// | `FF_LISTEN_ADDR` | `0.0.0.0:9090` | API listen address |
82 /// | `FF_LANES` | `default` | Comma-separated lane names; at least one non-empty lane required |
83 /// | `FF_FLOW_PARTITIONS` | `256` | Flow partition count — authoritative; under RFC-011 hash-tag co-location, exec keys also route here |
84 /// | `FF_BUDGET_PARTITIONS` | `32` | Budget partition count |
85 /// | `FF_QUOTA_PARTITIONS` | `32` | Quota partition count |
86 /// | `FF_CORS_ORIGINS` | `*` | Comma-separated CORS origins (`*` = permissive). Empty string is rejected; unset the var to get the default. |
87 /// | `FF_API_TOKEN` | *(none)* | Shared-secret Bearer token. If set, all non-`/healthz` requests require it. |
88 /// | `FF_WAITPOINT_HMAC_GRACE_MS` | `86400000` | Grace window (ms) during which tokens signed by the previous kid remain accepted after rotation. Default 24h. |
89 /// | `FF_MAX_CONCURRENT_STREAM_OPS` | `64` | Shared semaphore bound for `read_attempt_stream` + `tail_attempt_stream`. Legacy `FF_MAX_CONCURRENT_TAIL` is accepted as a fallback; if both are set, the new name wins. |
90 /// | `FF_MAX_CONCURRENT_TAIL` | *(legacy)* | Deprecated alias for `FF_MAX_CONCURRENT_STREAM_OPS`; accepted during the R4 rename window. |
91 /// | `FF_LEASE_EXPIRY_INTERVAL_MS` | `1500` | Lease-expiry scanner interval |
92 /// | `FF_DELAYED_PROMOTER_INTERVAL_MS` | `750` | Delayed-promoter scanner interval |
93 /// | `FF_INDEX_RECONCILER_INTERVAL_S` | `45` | Index reconciler interval |
94 /// | `FF_ATTEMPT_TIMEOUT_INTERVAL_S` | `2` | Attempt-timeout scanner interval |
95 /// | `FF_SUSPENSION_TIMEOUT_INTERVAL_S` | `2` | Suspension-timeout scanner interval |
96 /// | `FF_PENDING_WP_EXPIRY_INTERVAL_S` | `5` | Pending-waitpoint expiry scanner interval |
97 /// | `FF_RETENTION_TRIMMER_INTERVAL_S` | `60` | Retention-trimmer scanner interval |
98 /// | `FF_BUDGET_RESET_INTERVAL_S` | `15` | Budget-reset scanner interval |
99 /// | `FF_BUDGET_RECONCILER_INTERVAL_S` | `30` | Budget reconciler interval |
100 /// | `FF_QUOTA_RECONCILER_INTERVAL_S` | `30` | Quota reconciler interval |
101 /// | `FF_UNBLOCK_INTERVAL_S` | `5` | Unblock scanner interval |
102 /// | `FF_DEPENDENCY_RECONCILER_INTERVAL_S` | `15` | DAG dependency reconciler interval (safety net behind push-based promotion) |
103 /// | `FF_FLOW_PROJECTOR_INTERVAL_S` | `15` | Flow projector scanner interval |
104 /// | `FF_EXECUTION_DEADLINE_INTERVAL_S` | `5` | Execution-deadline scanner interval |
105 /// | `FF_CANCEL_RECONCILER_INTERVAL_S` | `15` | Cancel reconciler scanner interval |
106 pub fn from_env() -> Result<Self, ConfigError> {
107 let host = env_or("FF_HOST", "localhost");
108 let port = env_u16("FF_PORT", 6379)?;
109 let tls = env_bool("FF_TLS");
110 let cluster = env_bool("FF_CLUSTER");
111 let listen_addr = env_or("FF_LISTEN_ADDR", "0.0.0.0:9090");
112 // FF_CORS_ORIGINS contract:
113 // unset → default "*" (permissive)
114 // "*" → permissive
115 // "a,b,c" → explicit allowlist
116 // "" → hard error. An empty explicit value almost always
117 // means "I tried to unset it" which a blank env var
118 // does not do. We refuse to guess and make the
119 // operator's intent explicit.
120 let cors_raw = std::env::var("FF_CORS_ORIGINS");
121 let cors_source = match &cors_raw {
122 Ok(s) if s.is_empty() => {
123 return Err(ConfigError::InvalidValue {
124 var: "FF_CORS_ORIGINS".to_owned(),
125 message: "FF_CORS_ORIGINS is set but empty; \
126 unset it to default to \"*\", or pass \"*\" explicitly, \
127 or pass a non-empty comma-separated origin list"
128 .to_owned(),
129 });
130 }
131 Ok(s) => s.clone(),
132 Err(_) => "*".to_owned(),
133 };
134 let cors_origins: Vec<String> = cors_source
135 .split(',')
136 .map(|s| s.trim().to_owned())
137 .filter(|s| !s.is_empty())
138 .collect();
139
140 let api_token = std::env::var("FF_API_TOKEN").ok().filter(|s| !s.is_empty());
141
142 // Waitpoint HMAC secret. Required on boot — refuse to start without
143 // it so multi-tenant signal authentication can never be silently
144 // disabled. Validate hex shape eagerly; empty strings and bad hex
145 // produce a configuration error, not a runtime crash later.
146 let waitpoint_hmac_secret = std::env::var("FF_WAITPOINT_HMAC_SECRET")
147 .map_err(|_| ConfigError::InvalidValue {
148 var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
149 message:
150 "required: hex-encoded HMAC signing secret for waitpoint tokens \
151 (RFC-004 §Waitpoint Security); suggested 64 hex chars (32 bytes)"
152 .to_owned(),
153 })?;
154 if waitpoint_hmac_secret.is_empty() {
155 return Err(ConfigError::InvalidValue {
156 var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
157 message: "must not be empty".to_owned(),
158 });
159 }
160 if waitpoint_hmac_secret.len() % 2 != 0
161 || !waitpoint_hmac_secret.chars().all(|c| c.is_ascii_hexdigit())
162 {
163 return Err(ConfigError::InvalidValue {
164 var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
165 message: "must be an even-length hex string (0-9a-fA-F)".to_owned(),
166 });
167 }
168 let waitpoint_hmac_grace_ms = env_u64("FF_WAITPOINT_HMAC_GRACE_MS", 86_400_000)?;
169 // Preferred env var: FF_MAX_CONCURRENT_STREAM_OPS. Legacy
170 // FF_MAX_CONCURRENT_TAIL is accepted for one release to avoid
171 // breaking existing deployments mid-rename (R4 unified the two
172 // stream-op clients on one permit pool). If both are set, the
173 // new name wins.
174 let max_concurrent_stream_ops = match std::env::var("FF_MAX_CONCURRENT_STREAM_OPS") {
175 Ok(_) => env_u32_positive("FF_MAX_CONCURRENT_STREAM_OPS", 64)?,
176 Err(_) => env_u32_positive("FF_MAX_CONCURRENT_TAIL", 64)?,
177 };
178
179 let lanes: Vec<LaneId> = env_or("FF_LANES", "default")
180 .split(',')
181 .map(|s| LaneId::new(s.trim()))
182 .filter(|l| !l.as_str().is_empty())
183 .collect();
184 if lanes.is_empty() {
185 return Err(ConfigError::InvalidValue {
186 var: "FF_LANES".to_owned(),
187 message: "at least one non-empty lane name is required".to_owned(),
188 });
189 }
190
191 let partition_config = PartitionConfig {
192 // RFC-011: num_execution_partitions retired; exec keys co-locate on
193 // {fp:N}. FF_FLOW_PARTITIONS is the canonical env var.
194 num_flow_partitions: env_u16_positive("FF_FLOW_PARTITIONS", 256)?,
195 num_budget_partitions: env_u16_positive("FF_BUDGET_PARTITIONS", 32)?,
196 num_quota_partitions: env_u16_positive("FF_QUOTA_PARTITIONS", 32)?,
197 };
198
199 let lease_expiry_interval =
200 Duration::from_millis(env_u64("FF_LEASE_EXPIRY_INTERVAL_MS", 1500)?);
201 let delayed_promoter_interval =
202 Duration::from_millis(env_u64("FF_DELAYED_PROMOTER_INTERVAL_MS", 750)?);
203 let index_reconciler_interval =
204 Duration::from_secs(env_u64("FF_INDEX_RECONCILER_INTERVAL_S", 45)?);
205 let attempt_timeout_interval =
206 Duration::from_secs(env_u64("FF_ATTEMPT_TIMEOUT_INTERVAL_S", 2)?);
207 let suspension_timeout_interval =
208 Duration::from_secs(env_u64("FF_SUSPENSION_TIMEOUT_INTERVAL_S", 2)?);
209 let pending_wp_expiry_interval =
210 Duration::from_secs(env_u64("FF_PENDING_WP_EXPIRY_INTERVAL_S", 5)?);
211 let retention_trimmer_interval =
212 Duration::from_secs(env_u64("FF_RETENTION_TRIMMER_INTERVAL_S", 60)?);
213 let budget_reset_interval =
214 Duration::from_secs(env_u64("FF_BUDGET_RESET_INTERVAL_S", 15)?);
215 let budget_reconciler_interval =
216 Duration::from_secs(env_u64("FF_BUDGET_RECONCILER_INTERVAL_S", 30)?);
217 let quota_reconciler_interval =
218 Duration::from_secs(env_u64("FF_QUOTA_RECONCILER_INTERVAL_S", 30)?);
219 let unblock_interval =
220 Duration::from_secs(env_u64("FF_UNBLOCK_INTERVAL_S", 5)?);
221 // Raised from 1s (pre-Batch-C) to 15s now that push-based DAG
222 // promotion is primary. The reconciler is a safety net post-
223 // completion-listener; see ff-engine docs on
224 // `dependency_reconciler_interval`.
225 let dependency_reconciler_interval =
226 Duration::from_secs(env_u64("FF_DEPENDENCY_RECONCILER_INTERVAL_S", 15)?);
227
228 let engine_config = EngineConfig {
229 partition_config,
230 lanes: lanes.clone(),
231 lease_expiry_interval,
232 delayed_promoter_interval,
233 index_reconciler_interval,
234 attempt_timeout_interval,
235 suspension_timeout_interval,
236 pending_wp_expiry_interval,
237 retention_trimmer_interval,
238 budget_reset_interval,
239 budget_reconciler_interval,
240 quota_reconciler_interval,
241 unblock_interval,
242 dependency_reconciler_interval,
243 flow_projector_interval: Duration::from_secs(
244 env_u64("FF_FLOW_PROJECTOR_INTERVAL_S", 15)?
245 ),
246 execution_deadline_interval: Duration::from_secs(
247 env_u64("FF_EXECUTION_DEADLINE_INTERVAL_S", 5)?
248 ),
249 cancel_reconciler_interval: Duration::from_secs(
250 env_u64("FF_CANCEL_RECONCILER_INTERVAL_S", 15)?
251 ),
252 edge_cancel_dispatcher_interval: Duration::from_secs(
253 env_u64("FF_EDGE_CANCEL_DISPATCHER_INTERVAL_S", 1)?
254 ),
255 edge_cancel_reconciler_interval: Duration::from_secs(
256 env_u64("FF_EDGE_CANCEL_RECONCILER_INTERVAL_S", 10)?
257 ),
258 // Issue #122: default is no-op. Multi-tenant deployments
259 // override this after ServerConfig construction.
260 scanner_filter: Default::default(),
261 };
262
263 Ok(Self {
264 host,
265 port,
266 tls,
267 cluster,
268 partition_config,
269 lanes,
270 listen_addr,
271 engine_config,
272 skip_library_load: false,
273 cors_origins,
274 api_token,
275 waitpoint_hmac_secret,
276 waitpoint_hmac_grace_ms,
277 max_concurrent_stream_ops,
278 })
279 }
280}
281
282impl Default for ServerConfig {
283 fn default() -> Self {
284 let lanes = vec![LaneId::new("default")];
285 let partition_config = PartitionConfig::default();
286 Self {
287 host: "localhost".into(),
288 port: 6379,
289 tls: false,
290 cluster: false,
291 partition_config,
292 lanes: lanes.clone(),
293 listen_addr: "0.0.0.0:9090".into(),
294 engine_config: EngineConfig {
295 partition_config,
296 lanes,
297 ..Default::default()
298 },
299 skip_library_load: false,
300 cors_origins: vec!["*".to_owned()],
301 api_token: None,
302 // Deterministic dev/test secret. Production deployments MUST
303 // override via FF_WAITPOINT_HMAC_SECRET (ServerConfig::from_env
304 // requires it), so this default only applies to unit tests and
305 // TestCluster fixtures that skip env validation.
306 waitpoint_hmac_secret:
307 "0000000000000000000000000000000000000000000000000000000000000000"
308 .to_owned(),
309 waitpoint_hmac_grace_ms: 86_400_000,
310 max_concurrent_stream_ops: 64,
311 }
312 }
313}
314
315/// Configuration error.
316#[derive(Debug, thiserror::Error)]
317pub enum ConfigError {
318 #[error("invalid value for {var}: {message}")]
319 InvalidValue { var: String, message: String },
320}
321
/// Returns the value of environment variable `key`, falling back to
/// `default` when the variable cannot be read (unset or not valid Unicode).
///
/// A variable that is set to the empty string is returned as-is; it does
/// not trigger the fallback.
fn env_or(key: &str, default: &str) -> String {
    match std::env::var(key) {
        Ok(value) => value,
        Err(_) => default.to_owned(),
    }
}
325
/// Interprets environment variable `key` as a boolean flag.
///
/// Yields `true` only when the value is exactly `"1"` or equals `"true"`
/// ignoring ASCII case. Any other value, or a variable that cannot be read,
/// yields `false`. The value is not trimmed before comparison.
fn env_bool(key: &str) -> bool {
    match std::env::var(key) {
        Ok(raw) => raw == "1" || raw.eq_ignore_ascii_case("true"),
        Err(_) => false,
    }
}
331
332fn env_u16(key: &str, default: u16) -> Result<u16, ConfigError> {
333 match std::env::var(key) {
334 Ok(v) => v.parse().map_err(|_| ConfigError::InvalidValue {
335 var: key.to_owned(),
336 message: format!("expected u16, got '{v}'"),
337 }),
338 Err(_) => Ok(default),
339 }
340}
341
342/// Like env_u16 but rejects 0 (for partition counts that are used as divisors).
343fn env_u16_positive(key: &str, default: u16) -> Result<u16, ConfigError> {
344 let val = env_u16(key, default)?;
345 if val == 0 {
346 return Err(ConfigError::InvalidValue {
347 var: key.to_owned(),
348 message: "must be > 0 (used as divisor in partition math)".to_owned(),
349 });
350 }
351 Ok(val)
352}
353
354fn env_u64(key: &str, default: u64) -> Result<u64, ConfigError> {
355 match std::env::var(key) {
356 Ok(v) => v.parse().map_err(|_| ConfigError::InvalidValue {
357 var: key.to_owned(),
358 message: format!("expected u64, got '{v}'"),
359 }),
360 Err(_) => Ok(default),
361 }
362}
363
364fn env_u32_positive(key: &str, default: u32) -> Result<u32, ConfigError> {
365 let val = match std::env::var(key) {
366 Ok(v) => v.parse::<u32>().map_err(|_| ConfigError::InvalidValue {
367 var: key.to_owned(),
368 message: format!("expected u32, got '{v}'"),
369 })?,
370 Err(_) => default,
371 };
372 if val == 0 {
373 return Err(ConfigError::InvalidValue {
374 var: key.to_owned(),
375 message: "must be > 0 (semaphore size)".to_owned(),
376 });
377 }
378 Ok(val)
379}