Skip to main content

ff_server/
config.rs

1use ff_core::partition::PartitionConfig;
2use ff_core::types::LaneId;
3use ff_engine::EngineConfig;
4use std::time::Duration;
5
/// Server configuration, loaded from environment variables.
///
/// Construct it with [`ServerConfig::from_env`], which validates every
/// variable it reads, or with `ServerConfig::default()`, which uses fixed
/// values (including a deterministic waitpoint HMAC secret) and performs no
/// environment lookups.
pub struct ServerConfig {
    /// Valkey host. Default: `"localhost"`.
    pub host: String,
    /// Valkey port. Default: `6379`.
    pub port: u16,
    /// Enable TLS for Valkey connections.
    pub tls: bool,
    /// Enable Valkey cluster mode.
    pub cluster: bool,
    /// Partition counts (flow/budget/quota). There is no separate execution
    /// partition count: under RFC-011, execution keys co-locate on the flow
    /// partitions.
    pub partition_config: PartitionConfig,
    /// Lanes to manage. Default: `["default"]`.
    pub lanes: Vec<LaneId>,
    /// Listen address for the API surface. Default: `"0.0.0.0:9090"`.
    pub listen_addr: String,
    /// Scanner intervals and engine config. Carries its own copies of
    /// `partition_config` and `lanes`.
    pub engine_config: EngineConfig,
    /// Skip library loading (for tests where TestCluster already loaded it).
    pub skip_library_load: bool,
    /// Allowed CORS origins. `["*"]` means permissive (all origins).
    pub cors_origins: Vec<String>,
    /// Shared-secret API token. If set, all requests except GET /healthz must
    /// include `Authorization: Bearer <token>`. If unset, auth is disabled.
    pub api_token: Option<String>,
    /// Hex-encoded secret used to sign waitpoint HMAC tokens (RFC-004
    /// §Waitpoint Security). Required on boot; the server refuses to start
    /// without it so multi-tenant signal authentication is never silently
    /// disabled. Recommended length: 64 hex chars (32 bytes).
    pub waitpoint_hmac_secret: String,
    /// Grace window, in milliseconds, during which tokens signed by the
    /// previous kid remain accepted after rotation. Tokens already in flight
    /// survive operator rotation; operators tighten this for sensitive
    /// tenants. Default `86_400_000` (24h).
    pub waitpoint_hmac_grace_ms: u64,
    /// Maximum concurrent stream-op callers (`read_attempt_stream` +
    /// `tail_attempt_stream` combined). Each caller holds one semaphore
    /// permit for the duration of its Valkey round-trip(s); contention
    /// surfaces as HTTP 429 at the REST boundary.
    ///
    /// Shared bound for both read and tail because both run on the same
    /// dedicated `tail_client` (see `Server.tail_client`) — a big
    /// 10_000-frame XRANGE reply can head-of-line the mux just as badly
    /// as a long `XREAD BLOCK`, so they should share fairness accounting.
    ///
    /// Default `64`. Set below the server's request-concurrency budget
    /// so stream ops cannot starve other routes. Env var:
    /// `FF_MAX_CONCURRENT_STREAM_OPS` (preferred) or legacy
    /// `FF_MAX_CONCURRENT_TAIL` (accepted during the R4 rename; both
    /// valid for at least one release).
    pub max_concurrent_stream_ops: u32,
}
57
58impl ServerConfig {
59    /// Load configuration from environment variables.
60    ///
61    /// The table below enumerates every variable this function reads. It is
62    /// the canonical rustdoc mirror of the identical table in the top-level
63    /// `README.md`. `docs/DEPLOYMENT.md` references these names.
64    ///
65    /// **Maintenance contract:** every env var key this function consumes —
66    /// whether via a direct `std::env::var(...)` call or through the
67    /// `env_or` / `env_bool` / `env_u16` / `env_u16_positive` / `env_u64` /
68    /// `env_u32_positive` helpers — MUST have a row here. When you add,
69    /// rename, or remove an env var, update this table in the same commit.
70    /// There is no compile-time check — reviewers enforce it. Legacy
71    /// aliases accepted during a rename window (e.g. `FF_MAX_CONCURRENT_TAIL`)
72    /// should be listed alongside their preferred name.
73    ///
74    /// | Variable | Default | Description |
75    /// |----------|---------|-------------|
76    /// | `FF_WAITPOINT_HMAC_SECRET` | *required* | Hex-encoded HMAC signing secret for waitpoint tokens (RFC-004 §Waitpoint Security). Even-length hex; 64 chars (32 bytes) recommended. Boot fails without it. |
77    /// | `FF_HOST` | `localhost` | Valkey host |
78    /// | `FF_PORT` | `6379` | Valkey port |
79    /// | `FF_TLS` | `false` | Enable TLS for Valkey (`1` or `true`) |
80    /// | `FF_CLUSTER` | `false` | Enable Valkey cluster mode (`1` or `true`) |
81    /// | `FF_LISTEN_ADDR` | `0.0.0.0:9090` | API listen address |
82    /// | `FF_LANES` | `default` | Comma-separated lane names; at least one non-empty lane required |
83    /// | `FF_FLOW_PARTITIONS` | `256` | Flow partition count — authoritative; under RFC-011 hash-tag co-location, exec keys also route here |
84    /// | `FF_BUDGET_PARTITIONS` | `32` | Budget partition count |
85    /// | `FF_QUOTA_PARTITIONS` | `32` | Quota partition count |
86    /// | `FF_CORS_ORIGINS` | `*` | Comma-separated CORS origins (`*` = permissive). Empty string is rejected; unset the var to get the default. |
87    /// | `FF_API_TOKEN` | *(none)* | Shared-secret Bearer token. If set, all non-`/healthz` requests require it. |
88    /// | `FF_WAITPOINT_HMAC_GRACE_MS` | `86400000` | Grace window (ms) during which tokens signed by the previous kid remain accepted after rotation. Default 24h. |
89    /// | `FF_MAX_CONCURRENT_STREAM_OPS` | `64` | Shared semaphore bound for `read_attempt_stream` + `tail_attempt_stream`. Legacy `FF_MAX_CONCURRENT_TAIL` is accepted as a fallback; if both are set, the new name wins. |
90    /// | `FF_MAX_CONCURRENT_TAIL` | *(legacy)* | Deprecated alias for `FF_MAX_CONCURRENT_STREAM_OPS`; accepted during the R4 rename window. |
91    /// | `FF_LEASE_EXPIRY_INTERVAL_MS` | `1500` | Lease-expiry scanner interval |
92    /// | `FF_DELAYED_PROMOTER_INTERVAL_MS` | `750` | Delayed-promoter scanner interval |
93    /// | `FF_INDEX_RECONCILER_INTERVAL_S` | `45` | Index reconciler interval |
94    /// | `FF_ATTEMPT_TIMEOUT_INTERVAL_S` | `2` | Attempt-timeout scanner interval |
95    /// | `FF_SUSPENSION_TIMEOUT_INTERVAL_S` | `2` | Suspension-timeout scanner interval |
96    /// | `FF_PENDING_WP_EXPIRY_INTERVAL_S` | `5` | Pending-waitpoint expiry scanner interval |
97    /// | `FF_RETENTION_TRIMMER_INTERVAL_S` | `60` | Retention-trimmer scanner interval |
98    /// | `FF_BUDGET_RESET_INTERVAL_S` | `15` | Budget-reset scanner interval |
99    /// | `FF_BUDGET_RECONCILER_INTERVAL_S` | `30` | Budget reconciler interval |
100    /// | `FF_QUOTA_RECONCILER_INTERVAL_S` | `30` | Quota reconciler interval |
101    /// | `FF_UNBLOCK_INTERVAL_S` | `5` | Unblock scanner interval |
102    /// | `FF_DEPENDENCY_RECONCILER_INTERVAL_S` | `15` | DAG dependency reconciler interval (safety net behind push-based promotion) |
103    /// | `FF_FLOW_PROJECTOR_INTERVAL_S` | `15` | Flow projector scanner interval |
104    /// | `FF_EXECUTION_DEADLINE_INTERVAL_S` | `5` | Execution-deadline scanner interval |
105    /// | `FF_CANCEL_RECONCILER_INTERVAL_S` | `15` | Cancel reconciler scanner interval |
106    pub fn from_env() -> Result<Self, ConfigError> {
107        let host = env_or("FF_HOST", "localhost");
108        let port = env_u16("FF_PORT", 6379)?;
109        let tls = env_bool("FF_TLS");
110        let cluster = env_bool("FF_CLUSTER");
111        let listen_addr = env_or("FF_LISTEN_ADDR", "0.0.0.0:9090");
112        // FF_CORS_ORIGINS contract:
113        //   unset      → default "*" (permissive)
114        //   "*"        → permissive
115        //   "a,b,c"    → explicit allowlist
116        //   ""         → hard error. An empty explicit value almost always
117        //                means "I tried to unset it" which a blank env var
118        //                does not do. We refuse to guess and make the
119        //                operator's intent explicit.
120        let cors_raw = std::env::var("FF_CORS_ORIGINS");
121        let cors_source = match &cors_raw {
122            Ok(s) if s.is_empty() => {
123                return Err(ConfigError::InvalidValue {
124                    var: "FF_CORS_ORIGINS".to_owned(),
125                    message: "FF_CORS_ORIGINS is set but empty; \
126                              unset it to default to \"*\", or pass \"*\" explicitly, \
127                              or pass a non-empty comma-separated origin list"
128                        .to_owned(),
129                });
130            }
131            Ok(s) => s.clone(),
132            Err(_) => "*".to_owned(),
133        };
134        let cors_origins: Vec<String> = cors_source
135            .split(',')
136            .map(|s| s.trim().to_owned())
137            .filter(|s| !s.is_empty())
138            .collect();
139
140        let api_token = std::env::var("FF_API_TOKEN").ok().filter(|s| !s.is_empty());
141
142        // Waitpoint HMAC secret. Required on boot — refuse to start without
143        // it so multi-tenant signal authentication can never be silently
144        // disabled. Validate hex shape eagerly; empty strings and bad hex
145        // produce a configuration error, not a runtime crash later.
146        let waitpoint_hmac_secret = std::env::var("FF_WAITPOINT_HMAC_SECRET")
147            .map_err(|_| ConfigError::InvalidValue {
148                var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
149                message:
150                    "required: hex-encoded HMAC signing secret for waitpoint tokens \
151                     (RFC-004 §Waitpoint Security); suggested 64 hex chars (32 bytes)"
152                        .to_owned(),
153            })?;
154        if waitpoint_hmac_secret.is_empty() {
155            return Err(ConfigError::InvalidValue {
156                var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
157                message: "must not be empty".to_owned(),
158            });
159        }
160        if waitpoint_hmac_secret.len() % 2 != 0
161            || !waitpoint_hmac_secret.chars().all(|c| c.is_ascii_hexdigit())
162        {
163            return Err(ConfigError::InvalidValue {
164                var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
165                message: "must be an even-length hex string (0-9a-fA-F)".to_owned(),
166            });
167        }
168        let waitpoint_hmac_grace_ms = env_u64("FF_WAITPOINT_HMAC_GRACE_MS", 86_400_000)?;
169        // Preferred env var: FF_MAX_CONCURRENT_STREAM_OPS. Legacy
170        // FF_MAX_CONCURRENT_TAIL is accepted for one release to avoid
171        // breaking existing deployments mid-rename (R4 unified the two
172        // stream-op clients on one permit pool). If both are set, the
173        // new name wins.
174        let max_concurrent_stream_ops = match std::env::var("FF_MAX_CONCURRENT_STREAM_OPS") {
175            Ok(_) => env_u32_positive("FF_MAX_CONCURRENT_STREAM_OPS", 64)?,
176            Err(_) => env_u32_positive("FF_MAX_CONCURRENT_TAIL", 64)?,
177        };
178
179        let lanes: Vec<LaneId> = env_or("FF_LANES", "default")
180            .split(',')
181            .map(|s| LaneId::new(s.trim()))
182            .filter(|l| !l.as_str().is_empty())
183            .collect();
184        if lanes.is_empty() {
185            return Err(ConfigError::InvalidValue {
186                var: "FF_LANES".to_owned(),
187                message: "at least one non-empty lane name is required".to_owned(),
188            });
189        }
190
191        let partition_config = PartitionConfig {
192            // RFC-011: num_execution_partitions retired; exec keys co-locate on
193            // {fp:N}. FF_FLOW_PARTITIONS is the canonical env var.
194            num_flow_partitions: env_u16_positive("FF_FLOW_PARTITIONS", 256)?,
195            num_budget_partitions: env_u16_positive("FF_BUDGET_PARTITIONS", 32)?,
196            num_quota_partitions: env_u16_positive("FF_QUOTA_PARTITIONS", 32)?,
197        };
198
199        let lease_expiry_interval =
200            Duration::from_millis(env_u64("FF_LEASE_EXPIRY_INTERVAL_MS", 1500)?);
201        let delayed_promoter_interval =
202            Duration::from_millis(env_u64("FF_DELAYED_PROMOTER_INTERVAL_MS", 750)?);
203        let index_reconciler_interval =
204            Duration::from_secs(env_u64("FF_INDEX_RECONCILER_INTERVAL_S", 45)?);
205        let attempt_timeout_interval =
206            Duration::from_secs(env_u64("FF_ATTEMPT_TIMEOUT_INTERVAL_S", 2)?);
207        let suspension_timeout_interval =
208            Duration::from_secs(env_u64("FF_SUSPENSION_TIMEOUT_INTERVAL_S", 2)?);
209        let pending_wp_expiry_interval =
210            Duration::from_secs(env_u64("FF_PENDING_WP_EXPIRY_INTERVAL_S", 5)?);
211        let retention_trimmer_interval =
212            Duration::from_secs(env_u64("FF_RETENTION_TRIMMER_INTERVAL_S", 60)?);
213        let budget_reset_interval =
214            Duration::from_secs(env_u64("FF_BUDGET_RESET_INTERVAL_S", 15)?);
215        let budget_reconciler_interval =
216            Duration::from_secs(env_u64("FF_BUDGET_RECONCILER_INTERVAL_S", 30)?);
217        let quota_reconciler_interval =
218            Duration::from_secs(env_u64("FF_QUOTA_RECONCILER_INTERVAL_S", 30)?);
219        let unblock_interval =
220            Duration::from_secs(env_u64("FF_UNBLOCK_INTERVAL_S", 5)?);
221        // Raised from 1s (pre-Batch-C) to 15s now that push-based DAG
222        // promotion is primary. The reconciler is a safety net post-
223        // completion-listener; see ff-engine docs on
224        // `dependency_reconciler_interval`.
225        let dependency_reconciler_interval =
226            Duration::from_secs(env_u64("FF_DEPENDENCY_RECONCILER_INTERVAL_S", 15)?);
227
228        let engine_config = EngineConfig {
229            partition_config,
230            lanes: lanes.clone(),
231            lease_expiry_interval,
232            delayed_promoter_interval,
233            index_reconciler_interval,
234            attempt_timeout_interval,
235            suspension_timeout_interval,
236            pending_wp_expiry_interval,
237            retention_trimmer_interval,
238            budget_reset_interval,
239            budget_reconciler_interval,
240            quota_reconciler_interval,
241            unblock_interval,
242            dependency_reconciler_interval,
243            flow_projector_interval: Duration::from_secs(
244                env_u64("FF_FLOW_PROJECTOR_INTERVAL_S", 15)?
245            ),
246            execution_deadline_interval: Duration::from_secs(
247                env_u64("FF_EXECUTION_DEADLINE_INTERVAL_S", 5)?
248            ),
249            cancel_reconciler_interval: Duration::from_secs(
250                env_u64("FF_CANCEL_RECONCILER_INTERVAL_S", 15)?
251            ),
252            // Issue #122: default is no-op. Multi-tenant deployments
253            // override this after ServerConfig construction.
254            scanner_filter: Default::default(),
255        };
256
257        Ok(Self {
258            host,
259            port,
260            tls,
261            cluster,
262            partition_config,
263            lanes,
264            listen_addr,
265            engine_config,
266            skip_library_load: false,
267            cors_origins,
268            api_token,
269            waitpoint_hmac_secret,
270            waitpoint_hmac_grace_ms,
271            max_concurrent_stream_ops,
272        })
273    }
274}
275
276impl Default for ServerConfig {
277    fn default() -> Self {
278        let lanes = vec![LaneId::new("default")];
279        let partition_config = PartitionConfig::default();
280        Self {
281            host: "localhost".into(),
282            port: 6379,
283            tls: false,
284            cluster: false,
285            partition_config,
286            lanes: lanes.clone(),
287            listen_addr: "0.0.0.0:9090".into(),
288            engine_config: EngineConfig {
289                partition_config,
290                lanes,
291                ..Default::default()
292            },
293            skip_library_load: false,
294            cors_origins: vec!["*".to_owned()],
295            api_token: None,
296            // Deterministic dev/test secret. Production deployments MUST
297            // override via FF_WAITPOINT_HMAC_SECRET (ServerConfig::from_env
298            // requires it), so this default only applies to unit tests and
299            // TestCluster fixtures that skip env validation.
300            waitpoint_hmac_secret:
301                "0000000000000000000000000000000000000000000000000000000000000000"
302                    .to_owned(),
303            waitpoint_hmac_grace_ms: 86_400_000,
304            max_concurrent_stream_ops: 64,
305        }
306    }
307}
308
/// Configuration error.
///
/// Returned by `ServerConfig::from_env` and the `env_*` helpers in this file
/// when an environment variable cannot be used.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
    /// An environment variable held an unacceptable value, or a required
    /// variable was missing. `var` is the variable's name and `message`
    /// explains what was wrong; both appear in the rendered error text.
    #[error("invalid value for {var}: {message}")]
    InvalidValue { var: String, message: String },
}
315
/// Returns the value of the environment variable `key`, or an owned copy of
/// `default` when the variable cannot be read (unset, or not valid Unicode).
///
/// A variable that is set to the empty string is returned as-is, not
/// replaced by `default`.
fn env_or(key: &str, default: &str) -> String {
    match std::env::var(key) {
        Ok(value) => value,
        Err(_) => default.to_owned(),
    }
}
319
/// Reads the environment variable `key` as a boolean flag.
///
/// Returns `true` only when the value is exactly `"1"` or equals `"true"`
/// ignoring ASCII case. Every other value — and a variable that cannot be
/// read (unset, or not valid Unicode) — yields `false`. The value is not
/// trimmed, so surrounding whitespace makes it `false`.
fn env_bool(key: &str) -> bool {
    match std::env::var(key) {
        Ok(raw) => raw == "1" || raw.eq_ignore_ascii_case("true"),
        Err(_) => false,
    }
}
325
326fn env_u16(key: &str, default: u16) -> Result<u16, ConfigError> {
327    match std::env::var(key) {
328        Ok(v) => v.parse().map_err(|_| ConfigError::InvalidValue {
329            var: key.to_owned(),
330            message: format!("expected u16, got '{v}'"),
331        }),
332        Err(_) => Ok(default),
333    }
334}
335
336/// Like env_u16 but rejects 0 (for partition counts that are used as divisors).
337fn env_u16_positive(key: &str, default: u16) -> Result<u16, ConfigError> {
338    let val = env_u16(key, default)?;
339    if val == 0 {
340        return Err(ConfigError::InvalidValue {
341            var: key.to_owned(),
342            message: "must be > 0 (used as divisor in partition math)".to_owned(),
343        });
344    }
345    Ok(val)
346}
347
348fn env_u64(key: &str, default: u64) -> Result<u64, ConfigError> {
349    match std::env::var(key) {
350        Ok(v) => v.parse().map_err(|_| ConfigError::InvalidValue {
351            var: key.to_owned(),
352            message: format!("expected u64, got '{v}'"),
353        }),
354        Err(_) => Ok(default),
355    }
356}
357
358fn env_u32_positive(key: &str, default: u32) -> Result<u32, ConfigError> {
359    let val = match std::env::var(key) {
360        Ok(v) => v.parse::<u32>().map_err(|_| ConfigError::InvalidValue {
361            var: key.to_owned(),
362            message: format!("expected u32, got '{v}'"),
363        })?,
364        Err(_) => default,
365    };
366    if val == 0 {
367        return Err(ConfigError::InvalidValue {
368            var: key.to_owned(),
369            message: "must be > 0 (semaphore size)".to_owned(),
370        });
371    }
372    Ok(val)
373}