Skip to main content

ff_server/
config.rs

1use ff_core::partition::PartitionConfig;
2use ff_core::types::LaneId;
3use ff_engine::EngineConfig;
4use std::time::Duration;
5
/// Server configuration, loaded from environment variables.
///
/// Built by [`ServerConfig::from_env`] for real deployments; the `Default`
/// impl supplies fixed values intended for unit tests and fixtures.
pub struct ServerConfig {
    /// Valkey host. Default: `"localhost"`. Env var: `FF_HOST`.
    pub host: String,
    /// Valkey port. Default: `6379`. Env var: `FF_PORT`.
    pub port: u16,
    /// Enable TLS for Valkey connections. Env var: `FF_TLS` (`1` or
    /// case-insensitive `true`; anything else, including unset, is `false`).
    pub tls: bool,
    /// Enable Valkey cluster mode. Env var: `FF_CLUSTER` (same parsing as
    /// `FF_TLS`).
    pub cluster: bool,
    /// Partition counts (flow/budget/quota). `from_env` rejects a count of
    /// `0` for any of them. There is no separate execution partition count:
    /// `from_env` populates only the flow, budget and quota counts.
    pub partition_config: PartitionConfig,
    /// Lanes to manage. Default: `["default"]`. Env var: `FF_LANES`
    /// (comma-separated; blank entries are dropped and `from_env` requires
    /// at least one non-empty name).
    pub lanes: Vec<LaneId>,
    /// Listen address for the API surface. Default: `"0.0.0.0:9090"`.
    /// Env var: `FF_LISTEN_ADDR`.
    pub listen_addr: String,
    /// Scanner intervals and engine config. `from_env` copies
    /// `partition_config` and `lanes` into it and leaves its
    /// `completion_listener` as `None`.
    pub engine_config: EngineConfig,
    /// Skip library loading (for tests where TestCluster already loaded it).
    /// `from_env` always sets this to `false`.
    pub skip_library_load: bool,
    /// Allowed CORS origins. `["*"]` means permissive (all origins).
    /// Env var: `FF_CORS_ORIGINS` (comma-separated).
    pub cors_origins: Vec<String>,
    /// Shared-secret API token. If set, all requests except GET /healthz must
    /// include `Authorization: Bearer <token>`. If unset, auth is disabled.
    /// `from_env` treats an empty `FF_API_TOKEN` the same as an unset one.
    pub api_token: Option<String>,
    /// Hex-encoded secret used to sign waitpoint HMAC tokens (RFC-004
    /// §Waitpoint Security). Required on boot; the server refuses to start
    /// without it so multi-tenant signal authentication is never silently
    /// disabled. Recommended length: 64 hex chars (32 bytes).
    ///
    /// Env var: `FF_WAITPOINT_HMAC_SECRET`. `from_env` requires a non-empty,
    /// even-length string of ASCII hex digits; it does not enforce the
    /// recommended length.
    pub waitpoint_hmac_secret: String,
    /// Grace window during which tokens signed by the previous kid remain
    /// accepted after rotation. Tokens already in flight survive operator
    /// rotation; operators tighten this for sensitive tenants. Default 24h
    /// (`86_400_000` ms). Env var: `FF_WAITPOINT_HMAC_GRACE_MS`.
    pub waitpoint_hmac_grace_ms: u64,
    /// Maximum concurrent stream-op callers (`read_attempt_stream` +
    /// `tail_attempt_stream` combined). Each caller holds one semaphore
    /// permit for the duration of its Valkey round-trip(s); contention
    /// surfaces as HTTP 429 at the REST boundary.
    ///
    /// Shared bound for both read and tail because both run on the same
    /// dedicated `tail_client` (see `Server.tail_client`) — a big
    /// 10_000-frame XRANGE reply can head-of-line the mux just as badly
    /// as a long `XREAD BLOCK`, so they should share fairness accounting.
    ///
    /// Default `64`. Set below the server's request-concurrency budget
    /// so stream ops cannot starve other routes. Env var:
    /// `FF_MAX_CONCURRENT_STREAM_OPS` (preferred) or legacy
    /// `FF_MAX_CONCURRENT_TAIL` (accepted during the R4 rename; both
    /// valid for at least one release). `from_env` rejects `0`.
    pub max_concurrent_stream_ops: u32,
}
57
58impl ServerConfig {
59    /// Load configuration from environment variables.
60    ///
61    /// | Variable | Default | Description |
62    /// |----------|---------|-------------|
63    /// | `FF_HOST` | `localhost` | Valkey host |
64    /// | `FF_PORT` | `6379` | Valkey port |
65    /// | `FF_TLS` | `false` | Enable TLS (`1` or `true`) |
66    /// | `FF_CLUSTER` | `false` | Enable cluster mode (`1` or `true`) |
67    /// | `FF_LISTEN_ADDR` | `0.0.0.0:9090` | API listen address |
68    /// | `FF_LANES` | `default` | Comma-separated lane names |
69    /// | `FF_FLOW_PARTITIONS` | `256` | Flow partition count — authoritative; under RFC-011 hash-tag co-location, exec keys also route here |
70    /// | `FF_BUDGET_PARTITIONS` | `32` | Budget partition count |
71    /// | `FF_QUOTA_PARTITIONS` | `32` | Quota partition count |
72    /// | `FF_CORS_ORIGINS` | `*` | Comma-separated CORS origins (`*` = permissive) |
73    /// | `FF_API_TOKEN` | *(none)* | Shared-secret Bearer token. If set, all non-healthz requests require it. |
74    /// | `FF_LEASE_EXPIRY_INTERVAL_MS` | `1500` | Lease expiry scanner interval |
75    /// | `FF_DELAYED_PROMOTER_INTERVAL_MS` | `750` | Delayed promoter interval |
76    /// | `FF_INDEX_RECONCILER_INTERVAL_S` | `45` | Index reconciler interval |
77    pub fn from_env() -> Result<Self, ConfigError> {
78        let host = env_or("FF_HOST", "localhost");
79        let port = env_u16("FF_PORT", 6379)?;
80        let tls = env_bool("FF_TLS");
81        let cluster = env_bool("FF_CLUSTER");
82        let listen_addr = env_or("FF_LISTEN_ADDR", "0.0.0.0:9090");
83        // FF_CORS_ORIGINS contract:
84        //   unset      → default "*" (permissive)
85        //   "*"        → permissive
86        //   "a,b,c"    → explicit allowlist
87        //   ""         → hard error. An empty explicit value almost always
88        //                means "I tried to unset it" which a blank env var
89        //                does not do. We refuse to guess and make the
90        //                operator's intent explicit.
91        let cors_raw = std::env::var("FF_CORS_ORIGINS");
92        let cors_source = match &cors_raw {
93            Ok(s) if s.is_empty() => {
94                return Err(ConfigError::InvalidValue {
95                    var: "FF_CORS_ORIGINS".to_owned(),
96                    message: "FF_CORS_ORIGINS is set but empty; \
97                              unset it to default to \"*\", or pass \"*\" explicitly, \
98                              or pass a non-empty comma-separated origin list"
99                        .to_owned(),
100                });
101            }
102            Ok(s) => s.clone(),
103            Err(_) => "*".to_owned(),
104        };
105        let cors_origins: Vec<String> = cors_source
106            .split(',')
107            .map(|s| s.trim().to_owned())
108            .filter(|s| !s.is_empty())
109            .collect();
110
111        let api_token = std::env::var("FF_API_TOKEN").ok().filter(|s| !s.is_empty());
112
113        // Waitpoint HMAC secret. Required on boot — refuse to start without
114        // it so multi-tenant signal authentication can never be silently
115        // disabled. Validate hex shape eagerly; empty strings and bad hex
116        // produce a configuration error, not a runtime crash later.
117        let waitpoint_hmac_secret = std::env::var("FF_WAITPOINT_HMAC_SECRET")
118            .map_err(|_| ConfigError::InvalidValue {
119                var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
120                message:
121                    "required: hex-encoded HMAC signing secret for waitpoint tokens \
122                     (RFC-004 §Waitpoint Security); suggested 64 hex chars (32 bytes)"
123                        .to_owned(),
124            })?;
125        if waitpoint_hmac_secret.is_empty() {
126            return Err(ConfigError::InvalidValue {
127                var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
128                message: "must not be empty".to_owned(),
129            });
130        }
131        if waitpoint_hmac_secret.len() % 2 != 0
132            || !waitpoint_hmac_secret.chars().all(|c| c.is_ascii_hexdigit())
133        {
134            return Err(ConfigError::InvalidValue {
135                var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
136                message: "must be an even-length hex string (0-9a-fA-F)".to_owned(),
137            });
138        }
139        let waitpoint_hmac_grace_ms = env_u64("FF_WAITPOINT_HMAC_GRACE_MS", 86_400_000)?;
140        // Preferred env var: FF_MAX_CONCURRENT_STREAM_OPS. Legacy
141        // FF_MAX_CONCURRENT_TAIL is accepted for one release to avoid
142        // breaking existing deployments mid-rename (R4 unified the two
143        // stream-op clients on one permit pool). If both are set, the
144        // new name wins.
145        let max_concurrent_stream_ops = match std::env::var("FF_MAX_CONCURRENT_STREAM_OPS") {
146            Ok(_) => env_u32_positive("FF_MAX_CONCURRENT_STREAM_OPS", 64)?,
147            Err(_) => env_u32_positive("FF_MAX_CONCURRENT_TAIL", 64)?,
148        };
149
150        let lanes: Vec<LaneId> = env_or("FF_LANES", "default")
151            .split(',')
152            .map(|s| LaneId::new(s.trim()))
153            .filter(|l| !l.as_str().is_empty())
154            .collect();
155        if lanes.is_empty() {
156            return Err(ConfigError::InvalidValue {
157                var: "FF_LANES".to_owned(),
158                message: "at least one non-empty lane name is required".to_owned(),
159            });
160        }
161
162        let partition_config = PartitionConfig {
163            // RFC-011: num_execution_partitions retired; exec keys co-locate on
164            // {fp:N}. FF_FLOW_PARTITIONS is the canonical env var.
165            num_flow_partitions: env_u16_positive("FF_FLOW_PARTITIONS", 256)?,
166            num_budget_partitions: env_u16_positive("FF_BUDGET_PARTITIONS", 32)?,
167            num_quota_partitions: env_u16_positive("FF_QUOTA_PARTITIONS", 32)?,
168        };
169
170        let lease_expiry_interval =
171            Duration::from_millis(env_u64("FF_LEASE_EXPIRY_INTERVAL_MS", 1500)?);
172        let delayed_promoter_interval =
173            Duration::from_millis(env_u64("FF_DELAYED_PROMOTER_INTERVAL_MS", 750)?);
174        let index_reconciler_interval =
175            Duration::from_secs(env_u64("FF_INDEX_RECONCILER_INTERVAL_S", 45)?);
176        let attempt_timeout_interval =
177            Duration::from_secs(env_u64("FF_ATTEMPT_TIMEOUT_INTERVAL_S", 2)?);
178        let suspension_timeout_interval =
179            Duration::from_secs(env_u64("FF_SUSPENSION_TIMEOUT_INTERVAL_S", 2)?);
180        let pending_wp_expiry_interval =
181            Duration::from_secs(env_u64("FF_PENDING_WP_EXPIRY_INTERVAL_S", 5)?);
182        let retention_trimmer_interval =
183            Duration::from_secs(env_u64("FF_RETENTION_TRIMMER_INTERVAL_S", 60)?);
184        let budget_reset_interval =
185            Duration::from_secs(env_u64("FF_BUDGET_RESET_INTERVAL_S", 15)?);
186        let budget_reconciler_interval =
187            Duration::from_secs(env_u64("FF_BUDGET_RECONCILER_INTERVAL_S", 30)?);
188        let quota_reconciler_interval =
189            Duration::from_secs(env_u64("FF_QUOTA_RECONCILER_INTERVAL_S", 30)?);
190        let unblock_interval =
191            Duration::from_secs(env_u64("FF_UNBLOCK_INTERVAL_S", 5)?);
192        // Raised from 1s (pre-Batch-C) to 15s now that push-based DAG
193        // promotion is primary. The reconciler is a safety net post-
194        // completion-listener; see ff-engine docs on
195        // `dependency_reconciler_interval`.
196        let dependency_reconciler_interval =
197            Duration::from_secs(env_u64("FF_DEPENDENCY_RECONCILER_INTERVAL_S", 15)?);
198
199        let engine_config = EngineConfig {
200            partition_config,
201            lanes: lanes.clone(),
202            lease_expiry_interval,
203            delayed_promoter_interval,
204            index_reconciler_interval,
205            attempt_timeout_interval,
206            suspension_timeout_interval,
207            pending_wp_expiry_interval,
208            retention_trimmer_interval,
209            budget_reset_interval,
210            budget_reconciler_interval,
211            quota_reconciler_interval,
212            unblock_interval,
213            dependency_reconciler_interval,
214            // Listener is owned by `Server::start`, which has the
215            // Valkey endpoint info. Left None here; populated when
216            // the ServerConfig gets consumed by the server.
217            completion_listener: None,
218            flow_projector_interval: Duration::from_secs(
219                env_u64("FF_FLOW_PROJECTOR_INTERVAL_S", 15)?
220            ),
221            execution_deadline_interval: Duration::from_secs(
222                env_u64("FF_EXECUTION_DEADLINE_INTERVAL_S", 5)?
223            ),
224        };
225
226        Ok(Self {
227            host,
228            port,
229            tls,
230            cluster,
231            partition_config,
232            lanes,
233            listen_addr,
234            engine_config,
235            skip_library_load: false,
236            cors_origins,
237            api_token,
238            waitpoint_hmac_secret,
239            waitpoint_hmac_grace_ms,
240            max_concurrent_stream_ops,
241        })
242    }
243}
244
245impl Default for ServerConfig {
246    fn default() -> Self {
247        let lanes = vec![LaneId::new("default")];
248        let partition_config = PartitionConfig::default();
249        Self {
250            host: "localhost".into(),
251            port: 6379,
252            tls: false,
253            cluster: false,
254            partition_config,
255            lanes: lanes.clone(),
256            listen_addr: "0.0.0.0:9090".into(),
257            engine_config: EngineConfig {
258                partition_config,
259                lanes,
260                ..Default::default()
261            },
262            skip_library_load: false,
263            cors_origins: vec!["*".to_owned()],
264            api_token: None,
265            // Deterministic dev/test secret. Production deployments MUST
266            // override via FF_WAITPOINT_HMAC_SECRET (ServerConfig::from_env
267            // requires it), so this default only applies to unit tests and
268            // TestCluster fixtures that skip env validation.
269            waitpoint_hmac_secret:
270                "0000000000000000000000000000000000000000000000000000000000000000"
271                    .to_owned(),
272            waitpoint_hmac_grace_ms: 86_400_000,
273            max_concurrent_stream_ops: 64,
274        }
275    }
276}
277
/// Configuration error.
///
/// Returned by [`ServerConfig::from_env`] and the private `env_*` helpers
/// when an environment variable cannot be turned into a usable setting.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
    /// The variable named by `var` was rejected; `message` explains why.
    ///
    /// Covers values that fail to parse or validate, and also a required
    /// variable that is missing entirely (e.g. `FF_WAITPOINT_HMAC_SECRET`).
    /// Displayed as `invalid value for {var}: {message}`.
    #[error("invalid value for {var}: {message}")]
    InvalidValue { var: String, message: String },
}
284
/// Returns the value of environment variable `key`, or `default` when the
/// variable cannot be read (unset or not valid Unicode).
///
/// A variable that is set to the empty string is returned as `""`; it does
/// not fall back to `default`.
fn env_or(key: &str, default: &str) -> String {
    match std::env::var(key) {
        Ok(value) => value,
        Err(_) => default.to_owned(),
    }
}
288
/// Reads environment variable `key` as a boolean flag.
///
/// Returns `true` only when the value is exactly `"1"` or equals `"true"`
/// ignoring ASCII case. Every other value — and a variable that cannot be
/// read — yields `false`.
fn env_bool(key: &str) -> bool {
    matches!(
        std::env::var(key),
        Ok(raw) if raw == "1" || raw.eq_ignore_ascii_case("true")
    )
}
294
295fn env_u16(key: &str, default: u16) -> Result<u16, ConfigError> {
296    match std::env::var(key) {
297        Ok(v) => v.parse().map_err(|_| ConfigError::InvalidValue {
298            var: key.to_owned(),
299            message: format!("expected u16, got '{v}'"),
300        }),
301        Err(_) => Ok(default),
302    }
303}
304
305/// Like env_u16 but rejects 0 (for partition counts that are used as divisors).
306fn env_u16_positive(key: &str, default: u16) -> Result<u16, ConfigError> {
307    let val = env_u16(key, default)?;
308    if val == 0 {
309        return Err(ConfigError::InvalidValue {
310            var: key.to_owned(),
311            message: "must be > 0 (used as divisor in partition math)".to_owned(),
312        });
313    }
314    Ok(val)
315}
316
317fn env_u64(key: &str, default: u64) -> Result<u64, ConfigError> {
318    match std::env::var(key) {
319        Ok(v) => v.parse().map_err(|_| ConfigError::InvalidValue {
320            var: key.to_owned(),
321            message: format!("expected u64, got '{v}'"),
322        }),
323        Err(_) => Ok(default),
324    }
325}
326
327fn env_u32_positive(key: &str, default: u32) -> Result<u32, ConfigError> {
328    let val = match std::env::var(key) {
329        Ok(v) => v.parse::<u32>().map_err(|_| ConfigError::InvalidValue {
330            var: key.to_owned(),
331            message: format!("expected u32, got '{v}'"),
332        })?,
333        Err(_) => default,
334    };
335    if val == 0 {
336        return Err(ConfigError::InvalidValue {
337            var: key.to_owned(),
338            message: "must be > 0 (semaphore size)".to_owned(),
339        });
340    }
341    Ok(val)
342}