
ff_server/server.rs

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use ferriskey::{Client, ClientBuilder, Value};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinSet;
use ff_core::contracts::{
    AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
    CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
    CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
    CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
    ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
    DeliverSignalArgs, DeliverSignalResult, ExecutionInfo, ExecutionSummary,
    ListExecutionsResult, PendingWaitpointInfo, ReplayExecutionResult,
    ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
    RevokeLeaseResult,
    RotateWaitpointHmacSecretArgs,
    StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
use ff_core::keys::{
    self, usage_dedup_key, BudgetKeyContext, ExecKeyContext, FlowIndexKeys, FlowKeyContext,
    IndexKeys, QuotaKeyContext,
};
use ff_core::partition::{
    budget_partition, execution_partition, flow_partition, quota_partition, Partition,
    PartitionConfig, PartitionFamily,
};
use ff_core::state::{PublicState, StateVector};
use ff_core::types::*;
use ff_engine::Engine;
use ff_script::retry::is_retryable_kind;

use crate::config::ServerConfig;

/// Upper bound on `member_execution_ids` returned in the
/// [`CancelFlowResult::Cancelled`] response when the flow was already in a
/// terminal state (idempotent retry). The first (non-idempotent) cancel call
/// returns the full list; retries only need a sample.
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;

/// Re-export of the budget dimension cap.
///
/// Defined as the single source of truth in `ff_script::functions::budget` so
/// the typed FCALL wrappers and the REST boundary cannot silently drift
/// (PR #106 review). The limit exists to cap FCALL ARGV allocation: both
/// `create_budget` and `report_usage` build argv whose length is linear in
/// `dimensions.len()`, so an untrusted caller could otherwise request an
/// unbounded `Vec` allocation (CodeQL `rust/uncontrolled-allocation-size`,
/// issue #104).
pub(crate) use ff_script::functions::budget::MAX_BUDGET_DIMENSIONS;

/// Validate `create_budget` dimension inputs before building the FCALL argv.
///
/// Rejects:
///   * more than [`MAX_BUDGET_DIMENSIONS`] dimensions (prevents unbounded
///     `Vec::with_capacity` allocation on attacker-controlled length);
///   * parallel-array length mismatches between `dimensions`, `hard_limits`,
///     and `soft_limits` — these are positional inputs the Lua side indexes
///     by `i = 1..dim_count`, so a mismatch silently corrupts limits rather
///     than raising.
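///
/// # Example (illustrative sketch)
///
/// The dimension names and limits below are made up; they only show the
/// parallel-array contract this function enforces.
///
/// ```ignore
/// let dims = vec!["tokens".to_string(), "requests".to_string(), "cost_usd".to_string()];
/// // Only two hard limits for three dimensions — positional inputs no longer
/// // line up, so this is rejected before any argv is allocated.
/// let hard: Vec<u64> = vec![1_000_000, 10_000];
/// let soft: Vec<u64> = vec![800_000, 8_000, 90];
/// assert!(validate_create_budget_dimensions(&dims, &hard, &soft).is_err());
///
/// // Matching lengths (and <= MAX_BUDGET_DIMENSIONS entries) pass.
/// let hard: Vec<u64> = vec![1_000_000, 10_000, 100];
/// assert!(validate_create_budget_dimensions(&dims, &hard, &soft).is_ok());
/// ```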
fn validate_create_budget_dimensions(
    dimensions: &[String],
    hard_limits: &[u64],
    soft_limits: &[u64],
) -> Result<(), ServerError> {
    let dim_count = dimensions.len();
    if dim_count > MAX_BUDGET_DIMENSIONS {
        return Err(ServerError::InvalidInput(format!(
            "too_many_dimensions: limit={}, got={}",
            MAX_BUDGET_DIMENSIONS, dim_count
        )));
    }
    if hard_limits.len() != dim_count {
        return Err(ServerError::InvalidInput(format!(
            "dimension_limit_array_mismatch: dimensions={} hard_limits={}",
            dim_count,
            hard_limits.len()
        )));
    }
    if soft_limits.len() != dim_count {
        return Err(ServerError::InvalidInput(format!(
            "dimension_limit_array_mismatch: dimensions={} soft_limits={}",
            dim_count,
            soft_limits.len()
        )));
    }
    Ok(())
}

/// Validate `report_usage` dimension inputs before building the FCALL argv.
///
/// Same class of defense as [`validate_create_budget_dimensions`]: caps
/// argv length and enforces the `dimensions`/`deltas` parallel-array
/// invariant the Lua side relies on.
fn validate_report_usage_dimensions(
    dimensions: &[String],
    deltas: &[u64],
) -> Result<(), ServerError> {
    let dim_count = dimensions.len();
    if dim_count > MAX_BUDGET_DIMENSIONS {
        return Err(ServerError::InvalidInput(format!(
            "too_many_dimensions: limit={}, got={}",
            MAX_BUDGET_DIMENSIONS, dim_count
        )));
    }
    if deltas.len() != dim_count {
        return Err(ServerError::InvalidInput(format!(
            "dimension_delta_array_mismatch: dimensions={} deltas={}",
            dim_count,
            deltas.len()
        )));
    }
    Ok(())
}

/// FlowFabric server — connects everything together.
///
/// Manages the Valkey connection, Lua library loading, background scanners,
/// and provides a minimal API for Phase 1.
pub struct Server {
    client: Client,
    /// Dedicated Valkey connection used EXCLUSIVELY for stream-op calls:
    /// `xread_block` tails AND `ff_read_attempt_stream` range reads.
    /// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
    /// processes commands FIFO on it.
    ///
    /// Two head-of-line risks motivate the split from the main client:
    ///
    /// * **Blocking**: `XREAD BLOCK 30_000` holds the read side until a
    ///   new entry arrives or `block_ms` elapses.
    /// * **Large replies**: `XRANGE … COUNT 10_000` with ~64 KB per
    ///   frame returns a multi-MB reply serialized on one connection.
    ///
    /// Sharing either load with the main client would starve every other
    /// FCALL (create_execution, claim, rotate_waitpoint_secret,
    /// budget/quota, admin endpoints) AND every engine scanner.
    ///
    /// Kept separate from `client` and from the `Engine` scanner client so
    /// tail latency cannot couple to foreground API latency or background
    /// scanner cadence. See RFC-006 Impl Notes for the cascading-failure
    /// rationale.
    tail_client: Client,
    /// Bounds concurrent stream-op calls server-wide — read AND tail
    /// combined. Each caller acquires one permit for the duration of its
    /// Valkey round-trip(s); contention surfaces as HTTP 429 at the REST
    /// boundary, not as a silent queue on the stream connection. Default
    /// size is `FF_MAX_CONCURRENT_STREAM_OPS` (64; legacy env
    /// `FF_MAX_CONCURRENT_TAIL` accepted for one release).
    ///
    /// Read and tail share the same pool deliberately: they share the
    /// `tail_client`, so fairness accounting must be unified or a flood
    /// of one can starve the other. The semaphore is also `close()`d on
    /// shutdown so no new stream ops can start while existing ones drain
    /// (see `Server::shutdown`).
    stream_semaphore: Arc<tokio::sync::Semaphore>,
    /// Serializes `XREAD BLOCK` calls against `tail_client`.
    ///
    /// `ferriskey::Client` is a pipelined multiplexed connection — Valkey
    /// processes commands FIFO on one socket. `XREAD BLOCK` holds the
    /// connection's read side for the full `block_ms`, so two parallel
    /// BLOCKs sent down the same mux serialize: the second waits for the
    /// first to return before its own BLOCK even begins at the server.
    /// Meanwhile ferriskey's per-call `request_timeout` (auto-extended to
    /// `block_ms + 500ms`) starts at future-poll on the CLIENT side, so
    /// the second call's timeout fires before its turn at the server —
    /// spurious `timed_out` errors under concurrent tail load.
    ///
    /// Explicit serialization around `xread_block` removes the
    /// silent-failure mode: concurrent tails queue on this Mutex (inside
    /// an already-acquired semaphore permit), then dispatch one at a
    /// time with their full `block_ms` budget intact. The semaphore
    /// ceiling (`max_concurrent_stream_ops`) effectively becomes queue
    /// depth; throughput on the tail client is 1 BLOCK at a time.
    ///
    /// V2 upgrade: a pool of N dedicated `ferriskey::Client` connections
    /// replacing the single `tail_client` + this Mutex. Deferred; the
    /// Mutex here is correct v1 behavior.
    ///
    /// XRANGE reads (`read_attempt_stream`) are NOT gated by this Mutex —
    /// XRANGE is non-blocking at the server, so pipelined XRANGEs on one
    /// mux complete in microseconds each and don't trigger the same
    /// client-side timeout race. Keeping reads unserialized preserves
    /// read throughput.
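    ///
    /// A rough sketch of the dispatch order the tail path is expected to
    /// follow; the handler shape, the `"stream_ops"` label, and the
    /// `xread_block` call signature are illustrative, not the literal
    /// handler code:
    ///
    /// ```ignore
    /// // 1. Permit first — over-limit callers are rejected here (HTTP 429).
    /// let _permit = Arc::clone(&server.stream_semaphore)
    ///     .try_acquire_owned()
    ///     .map_err(|_| ServerError::ConcurrencyLimitExceeded("stream_ops", max_ops))?;
    /// // 2. Then this lock — concurrent tails queue with their full block_ms
    /// //    budget intact instead of racing the client-side timeout.
    /// let _guard = server.xread_block_lock.lock().await;
    /// // 3. Only now issue the blocking read on the dedicated tail client.
    /// let entries = server.tail_client.xread_block(&stream_key, &last_id, block_ms).await?;
    /// ```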
    xread_block_lock: Arc<tokio::sync::Mutex<()>>,
    /// Server-wide Semaphore(1) gating admin rotate calls. Legitimate
    /// operators rotate ~monthly and can afford to serialize; concurrent
    /// rotate requests are an attack or misbehaving script. Holding the
    /// permit also guards against interleaved partial rotations on the
    /// Server side of the per-partition locks, surfacing contention as
    /// HTTP 429 instead of silently queueing and blowing past the 120s
    /// HTTP timeout. See `rotate_waitpoint_secret` handler.
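    ///
    /// Sketch of the fail-fast acquisition this field supports (illustrative,
    /// not the handler's literal code):
    ///
    /// ```ignore
    /// let _permit = server.admin_rotate_semaphore.try_acquire()
    ///     .map_err(|_| ServerError::ConcurrencyLimitExceeded("admin_rotate", 1))?;
    /// // ... perform the rotation while holding the single permit ...
    /// ```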
    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
    engine: Engine,
    config: ServerConfig,
    /// Long-lived scheduler instance. Held on the server (not rebuilt per
    /// claim call) so its rotation cursor can advance across calls — a
    /// fresh-per-call scheduler would reset the cursor on every tick,
    /// defeating the fairness property (RFC-009 §scan rotation).
    scheduler: Arc<ff_scheduler::Scheduler>,
    /// Background tasks spawned by async handlers (e.g. cancel_flow member
    /// dispatch). Drained on shutdown with a bounded timeout.
    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
    /// PR-94: observability registry. Always present; the no-op shim
    /// takes zero runtime cost when the `observability` feature is
    /// off, and the real OTEL-backed registry is passed in via
    /// [`Server::start_with_metrics`] when on. Same `Arc` is shared
    /// with [`Engine::start_with_metrics`] and
    /// [`ff_scheduler::Scheduler::with_metrics`] so a single scrape
    /// sees everything the process produces.
    metrics: Arc<ff_observability::Metrics>,
}

/// Server error type.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Backend transport error. Previously wrapped `ferriskey::Error`
    /// directly (#88); now carries a backend-agnostic
    /// [`ff_core::BackendError`] so consumers match on
    /// [`ff_core::BackendErrorKind`] instead of ferriskey's native
    /// taxonomy. The ferriskey → [`ff_core::BackendError`] mapping
    /// lives in `ff_backend_valkey::backend_error_from_ferriskey`.
    #[error("backend: {0}")]
    Backend(#[from] ff_core::BackendError),
    /// Backend error with additional context. Previously
    /// `ValkeyContext { source: ferriskey::Error }` (#88).
    #[error("backend ({context}): {source}")]
    BackendContext {
        #[source]
        source: ff_core::BackendError,
        context: String,
    },
    #[error("config: {0}")]
    Config(#[from] crate::config::ConfigError),
    #[error("library load: {0}")]
    LibraryLoad(#[from] ff_script::loader::LoadError),
    #[error("partition mismatch: {0}")]
    PartitionMismatch(String),
    #[error("not found: {0}")]
    NotFound(String),
    #[error("invalid input: {0}")]
    InvalidInput(String),
    #[error("operation failed: {0}")]
    OperationFailed(String),
    #[error("script: {0}")]
    Script(String),
    /// Server-wide concurrency limit reached on a labelled pool. Surfaces
    /// as HTTP 429 at the REST boundary so load balancers and clients can
    /// retry with backoff. The `source` label ("stream_ops", "admin_rotate",
    /// etc.) identifies WHICH pool is exhausted so operators aren't
    /// misled by a single "tail unavailable" string when the real fault
    /// is rotation contention.
    ///
    /// Fields: (source_label, max_permits).
    #[error("too many concurrent {0} calls (max: {1})")]
    ConcurrencyLimitExceeded(&'static str, u32),
    /// Detected Valkey version is below the RFC-011 §13 minimum. The engine
    /// depends on Valkey Functions (stabilized in 7.2), RESP3 (7.2+), and
    /// hash-tag routing; older versions do not implement the required
    /// primitives. Operator action: upgrade Valkey.
    #[error(
        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
    )]
    ValkeyVersionTooLow {
        detected: String,
        required: String,
    },
}

/// Lift a native `ferriskey::Error` into [`ServerError::Backend`] via
/// [`ff_backend_valkey::backend_error_from_ferriskey`] (#88). Keeps
/// `?`-propagation ergonomic at ferriskey call sites while the
/// public variant stays backend-agnostic.
impl From<ferriskey::Error> for ServerError {
    fn from(err: ferriskey::Error) -> Self {
        Self::Backend(ff_backend_valkey::backend_error_from_ferriskey(&err))
    }
}

/// Build a [`ServerError::BackendContext`] from a native
/// `ferriskey::Error` and a call-site label (#88).
pub(crate) fn backend_context(
    err: ferriskey::Error,
    context: impl Into<String>,
) -> ServerError {
    ServerError::BackendContext {
        source: ff_backend_valkey::backend_error_from_ferriskey(&err),
        context: context.into(),
    }
}

impl ServerError {
    /// Returns the classified [`ff_core::BackendErrorKind`] if this
    /// error carries a backend transport fault. Covers direct
    /// Backend variants and library-load failures.
    ///
    /// Renamed from `valkey_kind` in #88 — the previous return type
    /// `Option<ferriskey::ErrorKind>` leaked ferriskey into every
    /// consumer doing retry/error classification.
    pub fn backend_kind(&self) -> Option<ff_core::BackendErrorKind> {
        match self {
            Self::Backend(be) | Self::BackendContext { source: be, .. } => Some(be.kind()),
            Self::LibraryLoad(e) => e
                .valkey_kind()
                .map(ff_backend_valkey::classify_ferriskey_kind),
            _ => None,
        }
    }

    /// Whether this error is safely retryable by a caller. For
    /// backend transport variants, delegates to
    /// [`ff_core::BackendErrorKind::is_retryable`]. Business-logic
    /// rejections (NotFound, InvalidInput, OperationFailed, Script,
    /// Config, PartitionMismatch) return false — those won't change
    /// on retry.
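    ///
    /// Illustrative caller-side use; the backoff policy shown is an
    /// assumption, not something this crate prescribes:
    ///
    /// ```ignore
    /// match server.create_execution(&args).await {
    ///     Ok(res) => return Ok(res),
    ///     Err(e) if e.is_retryable() => {
    ///         // Transport fault or an exhausted permit pool — back off, retry.
    ///         tracing::warn!(kind = ?e.backend_kind(), "retryable server error");
    ///         tokio::time::sleep(Duration::from_millis(250)).await;
    ///     }
    ///     Err(e) => return Err(e), // business-logic rejection: do not retry
    /// }
    /// ```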
    pub fn is_retryable(&self) -> bool {
        match self {
            Self::Backend(be) | Self::BackendContext { source: be, .. } => {
                be.kind().is_retryable()
            }
            Self::LibraryLoad(load_err) => load_err
                .valkey_kind()
                .map(is_retryable_kind)
                .unwrap_or(false),
            Self::Config(_)
            | Self::PartitionMismatch(_)
            | Self::NotFound(_)
            | Self::InvalidInput(_)
            | Self::OperationFailed(_)
            | Self::Script(_) => false,
            // Back off and retry — the bound is a server-side permit pool,
            // so the retry will succeed once a permit frees up. Applies
            // equally to stream ops, admin rotate, etc.
            Self::ConcurrencyLimitExceeded(_, _) => true,
            // Operator must upgrade Valkey; a retry at the caller won't help.
            Self::ValkeyVersionTooLow { .. } => false,
        }
    }
}

impl Server {
    /// Start the FlowFabric server.
    ///
    /// Boot sequence:
    /// 1. Connect to Valkey
    /// 2. Validate or create partition config key
    /// 3. Load the FlowFabric Lua library
    /// 4. Start engine (14 background scanners)
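    ///
    /// # Example (boot sketch)
    ///
    /// `load_server_config` and `some_execution_id` are placeholders for
    /// however the deployment builds its config and ids:
    ///
    /// ```ignore
    /// let config: ServerConfig = load_server_config()?;
    /// let server = Server::start(config).await?;
    /// // Connected, partition config validated, Lua library loaded,
    /// // engine scanners running in the background.
    /// let state = server.get_execution_state(&some_execution_id).await?;
    /// ```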
    pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
        Self::start_with_metrics(config, Arc::new(ff_observability::Metrics::new())).await
    }

    /// PR-94: boot the server with a shared observability registry.
    ///
    /// Scanner cycle + scheduler metrics record into this registry;
    /// `main.rs` threads the same handle into the router so `/metrics`
    /// exposes what the engine produces. The no-arg [`Server::start`]
    /// forwards here with a fresh `Metrics::new()` — under the default
    /// build that's the shim, under `observability` it's a real
    /// registry not shared with any HTTP route (useful for tests
    /// exercising the engine in isolation).
    pub async fn start_with_metrics(
        config: ServerConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Self, ServerError> {
        // Step 1: Connect to Valkey via ClientBuilder
        tracing::info!(
            host = %config.host, port = config.port,
            tls = config.tls, cluster = config.cluster,
            "connecting to Valkey"
        );
        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder
            .build()
            .await
            .map_err(|e| crate::server::backend_context(e, "connect"))?;

        // Verify connectivity
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "PING"))?;
        if pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "unexpected PING response: {pong}"
            )));
        }
        tracing::info!("Valkey connection established");

        // Step 1b: Verify Valkey version meets the RFC-011 §13 minimum (8.0).
        // Tolerates a rolling upgrade via a 60s exponential-backoff budget
        // per RFC-011 §9.17 — transient INFO errors during a node restart
        // don't trip the check until the whole budget is exhausted.
        verify_valkey_version(&client).await?;

        // Step 2: Validate or create partition config
        validate_or_create_partition_config(&client, &config.partition_config).await?;

        // Step 2b: Install waitpoint HMAC secret into every execution partition
        // (RFC-004 §Waitpoint Security). Fail-fast: if any partition fails,
        // the server refuses to start — a partial install would silently
        // reject signal deliveries on half the partitions.
        initialize_waitpoint_hmac_secret(
            &client,
            &config.partition_config,
            &config.waitpoint_hmac_secret,
        )
        .await?;

        // Step 3: Load Lua library (skippable for tests where fixture already loaded)
        if !config.skip_library_load {
            tracing::info!("loading flowfabric Lua library");
            ff_script::loader::ensure_library(&client)
                .await
                .map_err(ServerError::LibraryLoad)?;
        } else {
            tracing::info!("skipping library load (skip_library_load=true)");
        }

        // Step 4: Start engine with scanners
        // Build a fresh EngineConfig rather than cloning (EngineConfig doesn't derive Clone).
        let engine_cfg = ff_engine::EngineConfig {
            partition_config: config.partition_config,
            lanes: config.lanes.clone(),
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
            index_reconciler_interval: config.engine_config.index_reconciler_interval,
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
            budget_reset_interval: config.engine_config.budget_reset_interval,
            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
            unblock_interval: config.engine_config.unblock_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            flow_projector_interval: config.engine_config.flow_projector_interval,
            execution_deadline_interval: config.engine_config.execution_deadline_interval,
            cancel_reconciler_interval: config.engine_config.cancel_reconciler_interval,
            scanner_filter: config.engine_config.scanner_filter.clone(),
        };
        // Engine scanners keep running on the MAIN `client` mux — NOT on
        // `tail_client`. Scanner cadence is foreground-latency-coupled by
        // design (an incident that blocks all FCALLs should also visibly
        // block scanners), and keeping scanners off the tail client means a
        // long-poll tail can never starve lease-expiry, retention-trim,
        // etc. Do not change this without revisiting RFC-006 Impl Notes.
        // Build the Valkey completion backend (issue #90) and subscribe.
        // This replaces the pre-#90 `CompletionListenerConfig` path:
        // the wire subscription now lives in ff-backend-valkey, the
        // engine just consumes the resulting stream.
        let mut valkey_conn = ff_core::backend::ValkeyConnection::new(
            config.host.clone(),
            config.port,
        );
        valkey_conn.tls = config.tls;
        valkey_conn.cluster = config.cluster;
        let completion_backend = ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
            client.clone(),
            config.partition_config,
            valkey_conn,
        );
        let completion_stream = <ff_backend_valkey::ValkeyBackend as ff_core::completion_backend::CompletionBackend>::subscribe_completions(&completion_backend)
            .await
            .map_err(|e| ServerError::OperationFailed(format!(
                "subscribe_completions: {e}"
            )))?;

        let engine = Engine::start_with_completions(
            engine_cfg,
            client.clone(),
            metrics.clone(),
            completion_stream,
        );

        // Dedicated tail client. Built AFTER library load + HMAC install
        // because those steps use the main client; tail client only runs
        // XREAD/XREAD BLOCK + HMGET on stream_meta, so it never needs the
        // library loaded — but we build it on the same host/port/TLS
        // options so network reachability is identical.
        tracing::info!("opening dedicated tail connection");
        let mut tail_builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            // `request_timeout` is ignored for XREAD BLOCK (ferriskey
            // auto-extends to `block_ms + 500ms` for blocking commands),
            // but is used for the companion HMGET — 5s matches main.
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            tail_builder = tail_builder.tls();
        }
        if config.cluster {
            tail_builder = tail_builder.cluster();
        }
        let tail_client = tail_builder
            .build()
            .await
            .map_err(|e| crate::server::backend_context(e, "connect (tail)"))?;
        let tail_pong: String = tail_client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "PING (tail)"))?;
        if tail_pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "tail client unexpected PING response: {tail_pong}"
            )));
        }

        let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(
            config.max_concurrent_stream_ops as usize,
        ));
        let xread_block_lock = Arc::new(tokio::sync::Mutex::new(()));
        tracing::info!(
            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
            "stream-op client ready (read + tail share the semaphore; \
             tails additionally serialize via xread_block_lock)"
        );

        // Admin surface warning. /v1/admin/* endpoints (waitpoint HMAC
        // rotation, etc.) are only protected by the global Bearer
        // middleware in api.rs — which is only installed when
        // config.api_token is set. Without FF_API_TOKEN, a public
        // deployment exposes secret rotation to anyone that can reach
        // the listen_addr. Warn loudly so operators can't miss it; we
        // don't fail-start because single-tenant dev uses this path.
        if config.api_token.is_none() {
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
                 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
                 FF_API_TOKEN for any deployment reachable from untrusted \
                 networks."
            );
            // Explicit callout for the credential-bearing read endpoints.
            // Auditors grep for these on a per-endpoint basis; lumping
            // into the admin warning alone hides the fact that
            // /pending-waitpoints returns HMAC tokens and /result
            // returns completion payload bytes.
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
                 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
                 and GET /v1/executions/{{id}}/result returns raw completion payload \
                 bytes (may contain PII). Both are UNAUTHENTICATED in this \
                 configuration."
            );
        }

        // Partition counts — post-RFC-011 there are three physical families.
        // Execution keys co-locate with their parent flow's partition (§2 of
        // the RFC), so `num_flow_partitions` governs both exec and flow
        // routing; no separate `num_execution_partitions` count exists.
        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            listen_addr = %config.listen_addr,
            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
            config.partition_config.num_flow_partitions,
            config.partition_config.num_budget_partitions,
            config.partition_config.num_quota_partitions,
        );

        let scheduler = Arc::new(ff_scheduler::Scheduler::with_metrics(
            client.clone(),
            config.partition_config,
            metrics.clone(),
        ));

        Ok(Self {
            client,
            tail_client,
            stream_semaphore,
            xread_block_lock,
            // Single-permit semaphore: only one rotate-waitpoint-secret can
            // be mid-flight server-wide. Attackers or misbehaving scripts
            // firing parallel rotations fail fast with 429 instead of
            // queueing HTTP handlers.
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            engine,
            config,
            scheduler,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
            metrics,
        })
    }

    /// PR-94: access the shared observability registry.
    pub fn metrics(&self) -> &Arc<ff_observability::Metrics> {
        &self.metrics
    }

    /// Get a reference to the ferriskey client.
    pub fn client(&self) -> &Client {
        &self.client
    }

    /// Execute an FCALL with automatic Lua library reload on "function not loaded".
    ///
    /// After a Valkey failover the new primary may not have the Lua library
    /// loaded (replication lag or cold replica). This wrapper detects that
    /// condition, reloads the library via `ff_script::loader::ensure_library`,
    /// and retries the FCALL once.
    async fn fcall_with_reload(
        &self,
        function: &str,
        keys: &[&str],
        args: &[&str],
    ) -> Result<Value, ServerError> {
        fcall_with_reload_on_client(&self.client, function, keys, args).await
    }

    /// Get the server config.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }

    /// Get the partition config.
    pub fn partition_config(&self) -> &PartitionConfig {
        &self.config.partition_config
    }

    // ── Minimal Phase 1 API ──

    /// Create a new execution.
    ///
    /// Uses raw FCALL — will migrate to typed ff-script wrappers in Step 1.2.
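    ///
    /// Illustrative call shape — `args` is built by the REST layer; the
    /// behavioral notes below restate what the KEYS/ARGV assembly in the
    /// body does:
    ///
    /// ```ignore
    /// // Two fields change how the create is routed:
    /// //   * delay_until: Some(ts) -> enqueued on the lane's *delayed* zset;
    /// //     None                  -> enqueued on the lane's *eligible* zset.
    /// //   * idempotency_key: Some(k) -> duplicate creates within the 24h
    /// //     dedup TTL resolve against the namespaced idempotency key.
    /// let result = server.create_execution(&args).await?;
    /// ```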
    pub async fn create_execution(
        &self,
        args: &CreateExecutionArgs,
    ) -> Result<CreateExecutionResult, ServerError> {
        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
        let idx = IndexKeys::new(&partition);

        let lane = &args.lane_id;
        let tag = partition.hash_tag();
        let idem_key = match &args.idempotency_key {
            Some(k) if !k.is_empty() => {
                keys::idempotency_key(&tag, args.namespace.as_str(), k)
            }
            _ => ctx.noop(),
        };

        let delay_str = args
            .delay_until
            .map(|d| d.0.to_string())
            .unwrap_or_default();
        let is_delayed = !delay_str.is_empty();

        // KEYS (8) must match lua/execution.lua ff_create_execution positional order:
        //   [1] exec_core, [2] payload, [3] policy, [4] tags,
        //   [5] scheduling_zset (eligible OR delayed — ONE key),
        //   [6] idem_key, [7] execution_deadline, [8] all_executions
        let scheduling_zset = if is_delayed {
            idx.lane_delayed(lane)
        } else {
            idx.lane_eligible(lane)
        };

        let fcall_keys: Vec<String> = vec![
            ctx.core(),                  // 1
            ctx.payload(),               // 2
            ctx.policy(),                // 3
            ctx.tags(),                  // 4
            scheduling_zset,             // 5
            idem_key,                    // 6
            idx.execution_deadline(),    // 7
            idx.all_executions(),        // 8
        ];

        let tags_json = serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".to_owned());

        // ARGV (13) must match lua/execution.lua ff_create_execution positional order:
        //   [1] execution_id, [2] namespace, [3] lane_id, [4] execution_kind,
        //   [5] priority, [6] creator_identity, [7] policy_json,
        //   [8] input_payload, [9] delay_until, [10] dedup_ttl_ms,
        //   [11] tags_json, [12] execution_deadline_at, [13] partition_id
        let fcall_args: Vec<String> = vec![
            args.execution_id.to_string(),           // 1
            args.namespace.to_string(),              // 2
            args.lane_id.to_string(),                // 3
            args.execution_kind.clone(),             // 4
            args.priority.to_string(),               // 5
            args.creator_identity.clone(),           // 6
            args.policy.as_ref()
                .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_owned()))
                .unwrap_or_else(|| "{}".to_owned()), // 7
            String::from_utf8_lossy(&args.input_payload).into_owned(), // 8
            delay_str,                               // 9
            args.idempotency_key.as_ref()
                .map(|_| "86400000".to_string())
                .unwrap_or_default(),                // 10 dedup_ttl_ms
            tags_json,                               // 11
            args.execution_deadline_at
                .map(|d| d.to_string())
                .unwrap_or_default(),                // 12 execution_deadline_at
            args.partition_id.to_string(),           // 13
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_execution", &key_refs, &arg_refs)
            .await?;

        parse_create_result(&raw, &args.execution_id)
    }

    /// Cancel an execution.
    pub async fn cancel_execution(
        &self,
        args: &CancelExecutionArgs,
    ) -> Result<CancelExecutionResult, ServerError> {
        let raw = self
            .fcall_cancel_execution_with_reload(args)
            .await?;
        parse_cancel_result(&raw, &args.execution_id)
    }

    /// Build KEYS/ARGV for `ff_cancel_execution` and invoke via the server's
    /// reload-capable FCALL. Shared by the inline method and background
    /// cancel_flow dispatch via [`Self::fcall_cancel_execution_with_reload`].
    async fn fcall_cancel_execution_with_reload(
        &self,
        args: &CancelExecutionArgs,
    ) -> Result<Value, ServerError> {
        let (keys, argv) = build_cancel_execution_fcall(
            &self.client,
            &self.config.partition_config,
            args,
        )
        .await?;
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
        self.fcall_with_reload("ff_cancel_execution", &key_refs, &arg_refs).await
    }

    /// Get the public state of an execution.
    ///
    /// Reads `public_state` from the exec_core hash. Returns the parsed
    /// PublicState enum. If the execution is not found, returns an error.
    pub async fn get_execution_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<PublicState, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        let state_str: Option<String> = self
            .client
            .hget(&ctx.core(), "public_state")
            .await
            .map_err(|e| crate::server::backend_context(e, "HGET public_state"))?;

        match state_str {
            Some(s) => {
                let quoted = format!("\"{s}\"");
                serde_json::from_str(&quoted).map_err(|e| {
                    ServerError::Script(format!(
                        "invalid public_state '{s}' for {execution_id}: {e}"
                    ))
                })
            }
            None => Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            ))),
        }
    }

    /// Read the raw result payload written by `ff_complete_execution`.
    ///
    /// The Lua side stores the payload at `ctx.result()` via plain `SET`.
    /// No FCALL — this is a direct GET; returns `Ok(None)` when the
    /// execution is missing, not yet complete, or (in a future
    /// retention-policy world) when the result was trimmed.
    ///
    /// # Contract vs `get_execution_state`
    ///
    /// `get_execution_state` is the authoritative completion signal. If
    /// a caller observes `state == completed` but `get_execution_result`
    /// returns `None`, the result bytes are unavailable — not a caller
    /// bug and not a server bug, just the retention policy trimming the
    /// blob. V1 sets no retention, so callers on v1 can treat
    /// `state == completed` + `Ok(None)` as a server bug.
    ///
    /// # Ordering
    ///
    /// Callers MUST wait for `state == completed` before calling this
    /// method; polls issued before the state transition may hit a
    /// narrow window where the completion Lua has written
    /// `public_state = completed` but the `result` key SET is still
    /// on-wire. The current Lua `ff_complete_execution` writes both in
    /// the same atomic script, so the window is effectively zero for
    /// direct callers — but retries via `ff_replay_execution` open it
    /// briefly.
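    ///
    /// Sketch of the poll-then-fetch pattern this contract implies; the
    /// polling loop and the `PublicState::Completed` spelling of the
    /// completed variant are illustrative:
    ///
    /// ```ignore
    /// loop {
    ///     match server.get_execution_state(&id).await? {
    ///         PublicState::Completed => break,
    ///         // terminal failure states should bail out instead of spinning
    ///         _ => tokio::time::sleep(Duration::from_millis(200)).await,
    ///     }
    /// }
    /// // Only after observing `completed` is a `None` here meaningful
    /// // (on v1, with no retention policy, it would indicate a server bug).
    /// let bytes: Option<Vec<u8>> = server.get_execution_result(&id).await?;
    /// ```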
    pub async fn get_execution_result(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Option<Vec<u8>>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        // Binary-safe read. Decoding into `String` would reject any
        // non-UTF-8 payload (bincode-encoded f32 vectors, compressed
        // artifacts, arbitrary binary stages) with a ferriskey decode
        // error that surfaces as HTTP 500. The endpoint response is
        // already `application/octet-stream`; `Vec<u8>` has a
        // ferriskey-specialized `FromValue` impl that preserves raw
        // bytes without UTF-8 validation (see ferriskey/src/value.rs:
        // `from_byte_vec`). Nil → None via the blanket
        // `Option<T: FromValue>` wrapper.
        let payload: Option<Vec<u8>> = self
            .client
            .cmd("GET")
            .arg(ctx.result())
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "GET exec result"))?;
        Ok(payload)
    }

    /// List the active (`pending` or `active`) waitpoints for an execution.
    ///
    /// Returns one [`PendingWaitpointInfo`] per open waitpoint, including the
    /// HMAC-SHA1 `waitpoint_token` needed to deliver authenticated signals.
    /// `closed` waitpoints are elided — callers looking at history should
    /// read the stream or lease history instead.
    ///
    /// Read plan: SSCAN `ctx.waitpoints()` with `COUNT 100` (bounded
    /// page size, matching the unblock / flow-projector / budget-
    /// reconciler convention) to enumerate waitpoint IDs, then TWO
    /// pipelines:
    ///
    /// * Pass 1 — single round-trip containing, per waitpoint, one
    ///   HMGET over the documented field set + one HGET for
    ///   `total_matchers` on the condition hash.
    /// * Pass 2 (conditional) — single round-trip containing an
    ///   HMGET per waitpoint with `total_matchers > 0` to read the
    ///   `matcher:N:name` fields.
    ///
    /// No FCALL — this is a read-only view built from already-
    /// persisted state, so skipping Lua keeps the Valkey single-
    /// writer path uncontended. HMGET (vs HGETALL) bounds the per-
    /// waitpoint read to the documented field set and defends
    /// against a poisoned waitpoint hash with unbounded extra fields
    /// accumulating response memory.
    ///
    /// # Empty result semantics (TOCTOU)
    ///
    /// An empty `Vec` is returned in three cases:
    ///
    /// 1. The execution exists but has never suspended.
    /// 2. All existing waitpoints are `closed` (already resolved).
    /// 3. A narrow teardown race: `SSCAN` read the waitpoint set after
    ///    a concurrent `ff_close_waitpoint` or execution-cleanup script
    ///    deleted the waitpoint hashes but before it SREM'd the set
    ///    members. Each HMGET returns all-None and we skip.
    ///
    /// Callers that get an unexpected empty list should cross-check
    /// execution state (`get_execution_state`) to distinguish "pipeline
    /// moved past suspended" from "nothing to review yet".
    ///
    /// A waitpoint hash that's present but missing its `waitpoint_token`
    /// field is similarly elided and a server-side WARN is emitted —
    /// this indicates storage corruption (a write that half-populated
    /// the hash) and operators should investigate.
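    ///
    /// Sketch of the cross-check described above (illustrative only):
    ///
    /// ```ignore
    /// let open = server.list_pending_waitpoints(&id).await?;
    /// if open.is_empty() {
    ///     // Distinguish "nothing suspended / already resolved" from the
    ///     // teardown race by consulting the authoritative state.
    ///     let state = server.get_execution_state(&id).await?;
    ///     tracing::debug!(?state, "no open waitpoints");
    /// } else {
    ///     for wp in &open {
    ///         // wp.waitpoint_token is the bearer credential for signal
    ///         // delivery; wp.required_signal_names is empty for wildcard
    ///         // waitpoints.
    ///     }
    /// }
    /// ```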
    pub async fn list_pending_waitpoints(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Vec<PendingWaitpointInfo>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        let core_exists: bool = self
            .client
            .cmd("EXISTS")
            .arg(ctx.core())
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "EXISTS exec_core (pending waitpoints)"))?;
        if !core_exists {
            return Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            )));
        }

        // SSCAN the waitpoints set in bounded pages. SMEMBERS would
        // load the entire set in one reply — fine for today's
        // single-waitpoint executions, but the codebase convention
        // (budget_reconciler, flow_projector, unblock scanner) is SSCAN
        // COUNT 100 so response size per round-trip is bounded as the
        // set grows. Match the precedent instead of carving a new
        // pattern here.
        const WAITPOINTS_SSCAN_COUNT: usize = 100;
        let waitpoints_key = ctx.waitpoints();
        let mut wp_ids_raw: Vec<String> = Vec::new();
        let mut cursor: String = "0".to_owned();
        loop {
            let reply: (String, Vec<String>) = self
                .client
                .cmd("SSCAN")
                .arg(&waitpoints_key)
                .arg(&cursor)
                .arg("COUNT")
                .arg(WAITPOINTS_SSCAN_COUNT.to_string().as_str())
                .execute()
                .await
                .map_err(|e| crate::server::backend_context(e, "SSCAN waitpoints"))?;
            cursor = reply.0;
            wp_ids_raw.extend(reply.1);
            if cursor == "0" {
                break;
            }
        }

        // SSCAN may observe a member more than once across iterations —
        // that is documented Valkey behavior, not a bug (see
        // https://valkey.io/commands/sscan/). Dedup in-place before
        // building the typed id list so the HTTP response and the
        // downstream pipelined HMGETs don't duplicate work. Sort +
        // dedup keeps this O(n log n) without a BTreeSet allocation;
        // typical n is 1 (the single waitpoint on a suspended exec).
        wp_ids_raw.sort_unstable();
        wp_ids_raw.dedup();

        if wp_ids_raw.is_empty() {
            return Ok(Vec::new());
        }

        // Parse + filter out malformed waitpoint_ids up-front so the
        // downstream pipeline indexes stay aligned to the Vec of
        // typed ids.
        let mut wp_ids: Vec<WaitpointId> = Vec::with_capacity(wp_ids_raw.len());
        for raw in &wp_ids_raw {
            match WaitpointId::parse(raw) {
                Ok(id) => wp_ids.push(id),
                Err(e) => tracing::warn!(
                    raw_id = %raw,
                    error = %e,
                    execution_id = %execution_id,
                    "list_pending_waitpoints: skipping unparseable waitpoint_id"
                ),
            }
        }
        if wp_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Bounded HMGET field set — these are the six hash fields that
        // surface in `PendingWaitpointInfo`. Fixed-size indexing into
        // the response below tracks this order.
        const WP_FIELDS: [&str; 6] = [
            "state",
            "waitpoint_key",
            "waitpoint_token",
            "created_at",
            "activated_at",
            "expires_at",
        ];

        // Pass 1 pipeline: waitpoint HMGET + condition HGET
        // total_matchers, one of each per waitpoint, in a SINGLE
        // round-trip. Sequential HMGETs were an N-round-trip latency
        // floor on flows with fan-out waitpoints.
        let mut pass1 = self.client.pipeline();
        let mut wp_slots = Vec::with_capacity(wp_ids.len());
        let mut cond_slots = Vec::with_capacity(wp_ids.len());
        for wp_id in &wp_ids {
            let mut cmd = pass1.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint(wp_id));
            for f in WP_FIELDS {
                cmd = cmd.arg(f);
            }
            wp_slots.push(cmd.finish());

            cond_slots.push(
                pass1
                    .cmd::<Option<String>>("HGET")
                    .arg(ctx.waitpoint_condition(wp_id))
                    .arg("total_matchers")
                    .finish(),
            );
        }
        pass1
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "pipeline HMGET waitpoints + HGET total_matchers"))?;

        // Collect pass-1 results + queue pass-2 HMGETs for condition
        // matcher names on waitpoints that are actionable (state in
        // pending/active, non-empty token, total_matchers > 0). PipeSlot
        // values are owning, so iterate all three ordered zip'd iters
        // and consume them together.
        struct Kept {
            wp_id: WaitpointId,
            wp_fields: Vec<Option<String>>,
            total_matchers: usize,
        }
        let mut kept: Vec<Kept> = Vec::with_capacity(wp_ids.len());
        for ((wp_id, wp_slot), cond_slot) in wp_ids
            .iter()
            .zip(wp_slots)
            .zip(cond_slots)
        {
            let wp_fields: Vec<Option<String>> =
                wp_slot.value().map_err(|e| crate::server::backend_context(e, format!("pipeline slot HMGET waitpoint {wp_id}")))?;

            // Waitpoint hash may have been GC'd between the SSCAN and
            // this HMGET (rotation / cleanup race). Skip silently.
            if wp_fields.iter().all(Option::is_none) {
                // Still consume the cond_slot to free its buffer.
                let _ = cond_slot.value();
                continue;
            }
            let state_ref = wp_fields
                .first()
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if state_ref != "pending" && state_ref != "active" {
                let _ = cond_slot.value();
                continue;
            }
            let token_ref = wp_fields
                .get(2)
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if token_ref.is_empty() {
                let _ = cond_slot.value();
                tracing::warn!(
                    waitpoint_id = %wp_id,
                    execution_id = %execution_id,
                    waitpoint_hash_key = %ctx.waitpoint(wp_id),
                    state = %state_ref,
                    "list_pending_waitpoints: waitpoint hash present but waitpoint_token \
                     field is empty — likely storage corruption (half-populated write, \
                     operator edit, or interrupted script). Skipping this entry in the \
                     response. HGETALL the waitpoint_hash_key to inspect."
                );
                continue;
            }

            let total_matchers = cond_slot
                .value()
                .map_err(|e| crate::server::backend_context(e, format!("pipeline slot HGET total_matchers {wp_id}")))?
                .and_then(|s| s.parse::<usize>().ok())
                .unwrap_or(0);

            kept.push(Kept {
                wp_id: wp_id.clone(),
                wp_fields,
                total_matchers,
            });
        }

        if kept.is_empty() {
            return Ok(Vec::new());
        }

        // Pass 2 pipeline: matcher-name HMGETs for every kept waitpoint
        // with total_matchers > 0. Single round-trip regardless of
        // per-waitpoint matcher count. Waitpoints with total_matchers==0
        // (wildcard) skip the HMGET entirely.
        let mut pass2 = self.client.pipeline();
        let mut matcher_slots: Vec<Option<_>> = Vec::with_capacity(kept.len());
        let mut pass2_needed = false;
        for k in &kept {
            if k.total_matchers == 0 {
                matcher_slots.push(None);
                continue;
            }
            pass2_needed = true;
            let mut cmd = pass2.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint_condition(&k.wp_id));
            for i in 0..k.total_matchers {
                cmd = cmd.arg(format!("matcher:{i}:name"));
            }
            matcher_slots.push(Some(cmd.finish()));
        }
        if pass2_needed {
            pass2.execute().await.map_err(|e| crate::server::backend_context(e, "pipeline HMGET wp_condition matchers"))?;
        }

        let parse_ts = |raw: &str| -> Option<TimestampMs> {
            if raw.is_empty() {
                None
            } else {
                raw.parse::<i64>().ok().map(TimestampMs)
            }
        };

        let mut out: Vec<PendingWaitpointInfo> = Vec::with_capacity(kept.len());
        for (k, slot) in kept.into_iter().zip(matcher_slots) {
            let get = |i: usize| -> &str {
                k.wp_fields.get(i).and_then(|v| v.as_deref()).unwrap_or("")
            };

            // Collect matcher names, eliding the empty-name wildcard
            // sentinel (see lua/helpers.lua initialize_condition).
            let required_signal_names: Vec<String> = match slot {
                None => Vec::new(),
                Some(s) => {
                    let vals: Vec<Option<String>> =
                        s.value().map_err(|e| crate::server::backend_context(e, format!(
                                "pipeline slot HMGET wp_condition matchers {}",
                                k.wp_id
                            )))?;
                    vals.into_iter()
                        .flatten()
                        .filter(|name| !name.is_empty())
                        .collect()
                }
            };

            out.push(PendingWaitpointInfo {
                waitpoint_id: k.wp_id,
                waitpoint_key: get(1).to_owned(),
                state: get(0).to_owned(),
                waitpoint_token: WaitpointToken(get(2).to_owned()),
                required_signal_names,
                created_at: parse_ts(get(3)).unwrap_or(TimestampMs(0)),
                activated_at: parse_ts(get(4)),
                expires_at: parse_ts(get(5)),
            });
        }

        Ok(out)
    }

    // ── Budget / Quota API ──

    /// Create a new budget policy.
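    ///
    /// Illustrative argument shape — the dimension names and limits are made
    /// up, and `rest` stands for the remaining fields (scope, enforcement
    /// mode, reset interval, …). The three arrays are positional and must be
    /// the same length; see [`validate_create_budget_dimensions`].
    ///
    /// ```ignore
    /// let args = CreateBudgetArgs {
    ///     dimensions:  vec!["tokens".to_owned(), "cost_usd".to_owned()],
    ///     hard_limits: vec![1_000_000, 500],
    ///     soft_limits: vec![800_000, 400],
    ///     ..rest
    /// };
    /// let created = server.create_budget(&args).await?;
    /// ```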
1146    pub async fn create_budget(
1147        &self,
1148        args: &CreateBudgetArgs,
1149    ) -> Result<CreateBudgetResult, ServerError> {
1150        // Cap ARGV before allocation — see MAX_BUDGET_DIMENSIONS (#104).
1151        validate_create_budget_dimensions(
1152            &args.dimensions,
1153            &args.hard_limits,
1154            &args.soft_limits,
1155        )?;
1156        let partition = budget_partition(&args.budget_id, &self.config.partition_config);
1157        let bctx = BudgetKeyContext::new(&partition, &args.budget_id);
1158        let resets_key = keys::budget_resets_key(bctx.hash_tag());
1159        let policies_index = keys::budget_policies_index(bctx.hash_tag());
1160
1161        // KEYS (5): budget_def, budget_limits, budget_usage, budget_resets_zset,
1162        //           budget_policies_index
1163        let fcall_keys: Vec<String> = vec![
1164            bctx.definition(),
1165            bctx.limits(),
1166            bctx.usage(),
1167            resets_key,
1168            policies_index,
1169        ];
1170
1171        // ARGV (variable): budget_id, scope_type, scope_id, enforcement_mode,
1172        //   on_hard_limit, on_soft_limit, reset_interval_ms, now_ms,
1173        //   dimension_count, dim_1..dim_N, hard_1..hard_N, soft_1..soft_N
1174        let dim_count = args.dimensions.len();
1175        let mut fcall_args: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
1176        fcall_args.push(args.budget_id.to_string());
1177        fcall_args.push(args.scope_type.clone());
1178        fcall_args.push(args.scope_id.clone());
1179        fcall_args.push(args.enforcement_mode.clone());
1180        fcall_args.push(args.on_hard_limit.clone());
1181        fcall_args.push(args.on_soft_limit.clone());
1182        fcall_args.push(args.reset_interval_ms.to_string());
1183        fcall_args.push(args.now.to_string());
1184        fcall_args.push(dim_count.to_string());
1185        for dim in &args.dimensions {
1186            fcall_args.push(dim.clone());
1187        }
1188        for hard in &args.hard_limits {
1189            fcall_args.push(hard.to_string());
1190        }
1191        for soft in &args.soft_limits {
1192            fcall_args.push(soft.to_string());
1193        }
1194
1195        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1196        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1197
1198        let raw: Value = self
1199            .fcall_with_reload("ff_create_budget", &key_refs, &arg_refs)
1200            .await?;
1201
1202        parse_budget_create_result(&raw, &args.budget_id)
1203    }
1204
1205    /// Create a new quota/rate-limit policy.
1206    pub async fn create_quota_policy(
1207        &self,
1208        args: &CreateQuotaPolicyArgs,
1209    ) -> Result<CreateQuotaPolicyResult, ServerError> {
1210        let partition = quota_partition(&args.quota_policy_id, &self.config.partition_config);
1211        let qctx = QuotaKeyContext::new(&partition, &args.quota_policy_id);
1212
1213        // KEYS (5): quota_def, quota_window_zset, quota_concurrency_counter,
1214        //           admitted_set, quota_policies_index
1215        let fcall_keys: Vec<String> = vec![
1216            qctx.definition(),
1217            qctx.window("requests_per_window"),
1218            qctx.concurrency(),
1219            qctx.admitted_set(),
1220            keys::quota_policies_index(qctx.hash_tag()),
1221        ];
1222
1223        // ARGV (5): quota_policy_id, window_seconds, max_requests_per_window,
1224        //           max_concurrent, now_ms
1225        let fcall_args: Vec<String> = vec![
1226            args.quota_policy_id.to_string(),
1227            args.window_seconds.to_string(),
1228            args.max_requests_per_window.to_string(),
1229            args.max_concurrent.to_string(),
1230            args.now.to_string(),
1231        ];
1232
1233        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1234        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1235
1236        let raw: Value = self
1237            .fcall_with_reload("ff_create_quota_policy", &key_refs, &arg_refs)
1238            .await?;
1239
1240        parse_quota_create_result(&raw, &args.quota_policy_id)
1241    }
1242
1243    /// Read-only budget status for operator visibility.
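    ///
    /// A minimal operator-side sketch (the `server` handle and the alerting
    /// shown are illustrative):
    ///
    /// ```ignore
    /// let status = server.get_budget_status(&budget_id).await?;
    /// for (dim, used) in &status.usage {
    ///     // hard_limits is keyed by the same dimension names as usage
    ///     if let Some(hard) = status.hard_limits.get(dim) {
    ///         if used >= hard {
    ///             println!("budget {} hit hard limit on {dim}: {used}/{hard}", status.budget_id);
    ///         }
    ///     }
    /// }
    /// ```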
1244    pub async fn get_budget_status(
1245        &self,
1246        budget_id: &BudgetId,
1247    ) -> Result<BudgetStatus, ServerError> {
1248        let partition = budget_partition(budget_id, &self.config.partition_config);
1249        let bctx = BudgetKeyContext::new(&partition, budget_id);
1250
1251        // Read budget definition
1252        let def: HashMap<String, String> = self
1253            .client
1254            .hgetall(&bctx.definition())
1255            .await
1256            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_def"))?;
1257
1258        if def.is_empty() {
1259            return Err(ServerError::NotFound(format!(
1260                "budget not found: {budget_id}"
1261            )));
1262        }
1263
1264        // Read usage
1265        let usage_raw: HashMap<String, String> = self
1266            .client
1267            .hgetall(&bctx.usage())
1268            .await
1269            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_usage"))?;
1270        let usage: HashMap<String, u64> = usage_raw
1271            .into_iter()
1272            .filter(|(k, _)| k != "_init")
1273            .map(|(k, v)| (k, v.parse().unwrap_or(0)))
1274            .collect();
1275
1276        // Read limits
1277        let limits_raw: HashMap<String, String> = self
1278            .client
1279            .hgetall(&bctx.limits())
1280            .await
1281            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_limits"))?;
1282        let mut hard_limits = HashMap::new();
1283        let mut soft_limits = HashMap::new();
1284        for (k, v) in &limits_raw {
1285            if let Some(dim) = k.strip_prefix("hard:") {
1286                hard_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
1287            } else if let Some(dim) = k.strip_prefix("soft:") {
1288                soft_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
1289            }
1290        }
1291
1292        let non_empty = |s: Option<&String>| -> Option<String> {
1293            s.filter(|v| !v.is_empty()).cloned()
1294        };
1295
1296        Ok(BudgetStatus {
1297            budget_id: budget_id.to_string(),
1298            scope_type: def.get("scope_type").cloned().unwrap_or_default(),
1299            scope_id: def.get("scope_id").cloned().unwrap_or_default(),
1300            enforcement_mode: def.get("enforcement_mode").cloned().unwrap_or_default(),
1301            usage,
1302            hard_limits,
1303            soft_limits,
1304            breach_count: def
1305                .get("breach_count")
1306                .and_then(|v| v.parse().ok())
1307                .unwrap_or(0),
1308            soft_breach_count: def
1309                .get("soft_breach_count")
1310                .and_then(|v| v.parse().ok())
1311                .unwrap_or(0),
1312            last_breach_at: non_empty(def.get("last_breach_at")),
1313            last_breach_dim: non_empty(def.get("last_breach_dim")),
1314            next_reset_at: non_empty(def.get("next_reset_at")),
1315            created_at: non_empty(def.get("created_at")),
1316        })
1317    }
1318
1319    /// Report usage against a budget and check limits.
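    ///
    /// A minimal sketch (args construction elided). Per the argv layout
    /// below, `dimensions[i]` pairs positionally with `deltas[i]`, and a
    /// non-empty `dedup_key` is intended to turn a retried report into a
    /// dedup hit rather than a double count:
    ///
    /// ```ignore
    /// let result = server.report_usage(&budget_id, &args).await?;
    /// ```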
1320    pub async fn report_usage(
1321        &self,
1322        budget_id: &BudgetId,
1323        args: &ReportUsageArgs,
1324    ) -> Result<ReportUsageResult, ServerError> {
1325        // Cap ARGV before allocation — see MAX_BUDGET_DIMENSIONS (#104).
1326        validate_report_usage_dimensions(&args.dimensions, &args.deltas)?;
1327        let partition = budget_partition(budget_id, &self.config.partition_config);
1328        let bctx = BudgetKeyContext::new(&partition, budget_id);
1329
1330        // KEYS (3): budget_usage, budget_limits, budget_def
1331        let fcall_keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];
1332
1333        // ARGV: dim_count, dim_1..dim_N, delta_1..delta_N, now_ms, dedup_key (empty = disabled)
1334        let dim_count = args.dimensions.len();
1335        let mut fcall_args: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
1336        fcall_args.push(dim_count.to_string());
1337        for dim in &args.dimensions {
1338            fcall_args.push(dim.clone());
1339        }
1340        for delta in &args.deltas {
1341            fcall_args.push(delta.to_string());
1342        }
1343        fcall_args.push(args.now.to_string());
1344        let dedup_key_val = args
1345            .dedup_key
1346            .as_ref()
1347            .filter(|k| !k.is_empty())
1348            .map(|k| usage_dedup_key(bctx.hash_tag(), k))
1349            .unwrap_or_default();
1350        fcall_args.push(dedup_key_val);
1351
1352        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1353        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1354
1355        let raw: Value = self
1356            .fcall_with_reload("ff_report_usage_and_check", &key_refs, &arg_refs)
1357            .await?;
1358
1359        parse_report_usage_result(&raw)
1360    }
1361
1362    /// Reset a budget's usage counters and schedule the next reset.
1363    pub async fn reset_budget(
1364        &self,
1365        budget_id: &BudgetId,
1366    ) -> Result<ResetBudgetResult, ServerError> {
1367        let partition = budget_partition(budget_id, &self.config.partition_config);
1368        let bctx = BudgetKeyContext::new(&partition, budget_id);
1369        let resets_key = keys::budget_resets_key(bctx.hash_tag());
1370
1371        // KEYS (3): budget_def, budget_usage, budget_resets_zset
1372        let fcall_keys: Vec<String> = vec![bctx.definition(), bctx.usage(), resets_key];
1373
1374        // ARGV (2): budget_id, now_ms
1375        let now = TimestampMs::now();
1376        let fcall_args: Vec<String> = vec![budget_id.to_string(), now.to_string()];
1377
1378        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1379        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1380
1381        let raw: Value = self
1382            .fcall_with_reload("ff_reset_budget", &key_refs, &arg_refs)
1383            .await?;
1384
1385        parse_reset_budget_result(&raw)
1386    }
1387
1388    // ── Flow API ──
1389
1390    /// Create a new flow container.
1391    pub async fn create_flow(
1392        &self,
1393        args: &CreateFlowArgs,
1394    ) -> Result<CreateFlowResult, ServerError> {
1395        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1396        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1397        let fidx = FlowIndexKeys::new(&partition);
1398
1399        // KEYS (3): flow_core, members_set, flow_index
1400        let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];
1401
1402        // ARGV (4): flow_id, flow_kind, namespace, now_ms
1403        let fcall_args: Vec<String> = vec![
1404            args.flow_id.to_string(),
1405            args.flow_kind.clone(),
1406            args.namespace.to_string(),
1407            args.now.to_string(),
1408        ];
1409
1410        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1411        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1412
1413        let raw: Value = self
1414            .fcall_with_reload("ff_create_flow", &key_refs, &arg_refs)
1415            .await?;
1416
1417        parse_create_flow_result(&raw, &args.flow_id)
1418    }
1419
1420    /// Add an execution to a flow.
1421    ///
1422    /// # Atomic single-FCALL commit (RFC-011 §7.3)
1423    ///
1424    /// Post-RFC-011 phase-3, exec_core co-locates with flow_core under
1425    /// hash-tag routing (both hash to `{fp:N}` via the exec id's
1426    /// embedded partition). A single atomic FCALL writes:
1427    ///
1428    ///   - `members_set` SADD (flow membership)
1429    ///   - `exec_core.flow_id` HSET (back-pointer)
1430    ///   - `flow_index` SADD (self-heal)
1431    ///   - `flow_core` HINCRBY node_count / graph_revision +
1432    ///     HSET last_mutation_at
1433    ///
1434    /// All four writes commit atomically or none do (Valkey scripting
1435    /// contract): the Lua body validates before writing, so the
1436    /// `flow_not_found` / `flow_already_terminal` early-returns fire
1437    /// before any `redis.call()` mutation, and a mid-body error after
1438    /// writes is not expected because all writes land on the same slot.
1439    ///
1440    /// The pre-RFC-011 two-phase contract (membership FCALL on `{fp:N}`
    /// plus a separate HSET on `{p:N}`), its orphan window, and the
    /// reconciliation-scanner plan (issue #21, now superseded) are all
    /// retired.
1441    ///
1442    /// # Consumer contract
1443    ///
1444    /// The caller's `args.execution_id` **must** be co-located with
1445    /// `args.flow_id`'s partition — i.e. minted via
1446    /// `ExecutionId::for_flow(&args.flow_id, config)`. Passing a
1447    /// `solo`-minted id (or any exec id hashing to a different
1448    /// `{fp:N}` than the flow's) will fail at the Valkey level with
1449    /// `CROSSSLOT` on a clustered deploy.
1450    ///
1451    /// Callers with a flow context in scope always use `for_flow`;
1452    /// this is the only supported mint path for flow-member execs
1453    /// post-RFC-011. Test fixtures that pre-date the co-location
1454    /// contract use `TestCluster::new_execution_id_on_partition` to
1455    /// pin to a specific hash-tag index for `fcall_create_flow`-style
1456    /// helpers that hard-code their flow partition.
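    ///
    /// A minimal sketch of the supported sequence (the exact `for_flow`
    /// signature and the `server` / `config` handles are assumptions):
    ///
    /// ```ignore
    /// // Mint the member's id against the flow so both hash to the same {fp:N}.
    /// let execution_id = ExecutionId::for_flow(&flow_id, &config);
    /// // ... build AddExecutionToFlowArgs with this id, then:
    /// let result = server.add_execution_to_flow(&args).await?;
    /// ```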
1457    pub async fn add_execution_to_flow(
1458        &self,
1459        args: &AddExecutionToFlowArgs,
1460    ) -> Result<AddExecutionToFlowResult, ServerError> {
1461        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1462        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1463        let fidx = FlowIndexKeys::new(&partition);
1464
1465        // exec_core co-locates with flow_core under RFC-011 §7.3 —
1466        // same `{fp:N}` hash-tag, same slot, part of the same atomic
1467        // FCALL below.
1468        let exec_partition =
1469            execution_partition(&args.execution_id, &self.config.partition_config);
1470        let ectx = ExecKeyContext::new(&exec_partition, &args.execution_id);
1471
1472        // Pre-flight: exec_partition must match flow_partition under
1473        // RFC-011 §7.3 co-location contract. If the caller hands us a
1474        // `solo`-minted exec whose hash-tag ≠ flow_partition, the FCALL
1475        // would fail with raw `CROSSSLOT` on a clustered deploy — a
1476        // typed `ServerError::PartitionMismatch` is a clearer signal
1477        // that the consumer-contract invariant was violated at mint
1478        // time (caller should have used `ExecutionId::for_flow(...)`).
1479        // See the Consumer contract section of this method's rustdoc.
1480        if exec_partition.index != partition.index {
1481            return Err(ServerError::PartitionMismatch(format!(
1482                "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
1483                 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
1484                 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
1485                exec_p = exec_partition.index,
1486                flow_p = partition.index,
1487            )));
1488        }
1489
1490        // KEYS (4): flow_core, members_set, flow_index, exec_core
1491        let fcall_keys: Vec<String> = vec![
1492            fctx.core(),
1493            fctx.members(),
1494            fidx.flow_index(),
1495            ectx.core(),
1496        ];
1497
1498        // ARGV (3): flow_id, execution_id, now_ms
1499        let fcall_args: Vec<String> = vec![
1500            args.flow_id.to_string(),
1501            args.execution_id.to_string(),
1502            args.now.to_string(),
1503        ];
1504
1505        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1506        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1507
1508        let raw: Value = self
1509            .fcall_with_reload("ff_add_execution_to_flow", &key_refs, &arg_refs)
1510            .await?;
1511
1512        parse_add_execution_to_flow_result(&raw)
1513    }
1514
1515    /// Cancel a flow.
1516    ///
1517    /// Flips `public_flow_state` to `cancelled` atomically via
1518    /// `ff_cancel_flow` on `{fp:N}`. For `cancel_all` policy, member
1519    /// executions must be cancelled cross-partition; this dispatch runs in
1520    /// the background and the call returns [`CancelFlowResult::CancellationScheduled`]
1521    /// immediately. For all other policies (or flows with no members), or
1522    /// when the flow was already in a terminal state (idempotent retry),
1523    /// the call returns [`CancelFlowResult::Cancelled`].
1524    ///
1525    /// Clients that need synchronous completion can call [`Self::cancel_flow_wait`].
1526    ///
1527    /// # Backpressure
1528    ///
1529    /// Each call that hits the async dispatch path spawns a new task into
1530    /// the shared background `JoinSet`. Rapid repeated calls against the
1531    /// same flow will spawn *multiple* overlapping dispatch tasks. This is
1532    /// not a correctness issue — each member cancel is idempotent and
1533    /// terminal flows short-circuit via [`ParsedCancelFlow::AlreadyTerminal`]
1534    /// — but heavy burst callers should either use `?wait=true` (serialises
1535    /// the dispatch on the HTTP thread, giving natural backpressure) or
1536    /// implement client-side deduplication on `flow_id`. The `JoinSet` is
1537    /// drained with a 15s timeout on [`Self::shutdown`], so very long
1538    /// dispatch tails may be aborted during graceful shutdown.
1539    ///
1540    /// # Orphan-member semantics on shutdown abort
1541    ///
1542    /// If shutdown fires `JoinSet::abort_all()` after its drain timeout
1543    /// while a dispatch loop is mid-iteration, the already-issued
1544    /// `ff_cancel_execution` FCALLs (atomic Lua) complete cleanly with
1545    /// `terminal_outcome = cancelled` and the caller-supplied reason. The
1546    /// members not yet visited are abandoned mid-loop. They remain in
1547    /// whichever state they were in (active/eligible/suspended) until the
1548    /// natural lifecycle scanners reach them: active leases expire
1549    /// (`lease_expiry`) and attempt-timeout them to `expired`, suspended
1550    /// members time out to `skipped`, eligible ones sit until retention
1551    /// trim. So no orphan state — but the terminal_outcome for the
1552    /// abandoned members will be `expired`/`skipped` rather than
1553    /// `cancelled`, and the operator-supplied `reason` is lost for them.
1554    /// Audit tooling that requires reason fidelity across shutdowns should
1555    /// use `?wait=true`.
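    ///
    /// A minimal handling sketch. `cancel_flow_wait` returns only the first
    /// two variants; the async `cancel_flow` may also return the third:
    ///
    /// ```ignore
    /// match server.cancel_flow_wait(&args).await? {
    ///     CancelFlowResult::Cancelled { member_execution_ids, .. } => {
    ///         // every member reached a terminal state inline
    ///     }
    ///     CancelFlowResult::PartiallyCancelled { failed_member_execution_ids, .. } => {
    ///         // the cancel-backlog reconciler keeps retrying these;
    ///         // alert operator tooling here
    ///     }
    ///     CancelFlowResult::CancellationScheduled { member_count, .. } => {
    ///         // async path only; member cancels continue in the background
    ///     }
    /// }
    /// ```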
1556    pub async fn cancel_flow(
1557        &self,
1558        args: &CancelFlowArgs,
1559    ) -> Result<CancelFlowResult, ServerError> {
1560        self.cancel_flow_inner(args, false).await
1561    }
1562
1563    /// Cancel a flow and wait for all member cancellations to complete
1564    /// inline. Slower than [`Self::cancel_flow`] for large flows, but
1565    /// guarantees every member is in a terminal state on return.
1566    pub async fn cancel_flow_wait(
1567        &self,
1568        args: &CancelFlowArgs,
1569    ) -> Result<CancelFlowResult, ServerError> {
1570        self.cancel_flow_inner(args, true).await
1571    }
1572
1573    async fn cancel_flow_inner(
1574        &self,
1575        args: &CancelFlowArgs,
1576        wait: bool,
1577    ) -> Result<CancelFlowResult, ServerError> {
1578        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1579        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1580        let fidx = FlowIndexKeys::new(&partition);
1581
1582        // Grace window before the reconciler may pick up this flow.
1583        // Kept short because the in-process dispatch is fast and bounded;
1584        // long enough to cover transport round-trips under load without
1585        // the reconciler fighting the live dispatch.
1586        const CANCEL_RECONCILER_GRACE_MS: u64 = 30_000;
1587
1588        // KEYS (5): flow_core, members_set, flow_index, pending_cancels, cancel_backlog
1589        let fcall_keys: Vec<String> = vec![
1590            fctx.core(),
1591            fctx.members(),
1592            fidx.flow_index(),
1593            fctx.pending_cancels(),
1594            fidx.cancel_backlog(),
1595        ];
1596
1597        // ARGV (5): flow_id, reason, cancellation_policy, now_ms, grace_ms
1598        let fcall_args: Vec<String> = vec![
1599            args.flow_id.to_string(),
1600            args.reason.clone(),
1601            args.cancellation_policy.clone(),
1602            args.now.to_string(),
1603            CANCEL_RECONCILER_GRACE_MS.to_string(),
1604        ];
1605
1606        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1607        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1608
1609        let raw: Value = self
1610            .fcall_with_reload("ff_cancel_flow", &key_refs, &arg_refs)
1611            .await?;
1612
1613        let (policy, members) = match parse_cancel_flow_raw(&raw)? {
1614            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
1615                (policy, member_execution_ids)
1616            }
1617            // Idempotent retry: flow was already cancelled/completed/failed.
1618            // Return Cancelled with the *stored* policy and member list so
1619            // observability tooling gets the real historical state rather
1620            // than echoing the caller's retry intent. One HMGET + SMEMBERS
1621            // on the idempotent path — both on {fp:N}, same slot.
1622            ParsedCancelFlow::AlreadyTerminal => {
1623                let flow_meta: Vec<Option<String>> = self
1624                    .client
1625                    .cmd("HMGET")
1626                    .arg(fctx.core())
1627                    .arg("cancellation_policy")
1628                    .arg("cancel_reason")
1629                    .execute()
1630                    .await
1631                    .map_err(|e| crate::server::backend_context(e, "HMGET flow_core cancellation_policy,cancel_reason"))?;
1632                let stored_policy = flow_meta
1633                    .first()
1634                    .and_then(|v| v.as_ref())
1635                    .filter(|s| !s.is_empty())
1636                    .cloned();
1637                let stored_reason = flow_meta
1638                    .get(1)
1639                    .and_then(|v| v.as_ref())
1640                    .filter(|s| !s.is_empty())
1641                    .cloned();
1642                let all_members: Vec<String> = self
1643                    .client
1644                    .cmd("SMEMBERS")
1645                    .arg(fctx.members())
1646                    .execute()
1647                    .await
1648                    .map_err(|e| crate::server::backend_context(e, "SMEMBERS flow members (already terminal)"))?;
1649                // Cap the returned list to avoid pathological bandwidth on
1650                // idempotent retries for flows with 10k+ members. Clients
1651                // already received the authoritative member list on the
1652                // first (non-idempotent) call; subsequent retries just need
1653                // enough to confirm the operation and trigger per-member
1654                // polling if desired.
1655                let total_members = all_members.len();
1656                let stored_members: Vec<String> = all_members
1657                    .into_iter()
1658                    .take(ALREADY_TERMINAL_MEMBER_CAP)
1659                    .collect();
1660                tracing::debug!(
1661                    flow_id = %args.flow_id,
1662                    stored_policy = stored_policy.as_deref().unwrap_or(""),
1663                    stored_reason = stored_reason.as_deref().unwrap_or(""),
1664                    total_members,
1665                    returned_members = stored_members.len(),
1666                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
1667                );
1668                return Ok(CancelFlowResult::Cancelled {
1669                    // Fall back to caller's policy only if the stored field
1670                    // is missing (flows cancelled by older Lua that did not
1671                    // persist cancellation_policy).
1672                    cancellation_policy: stored_policy
1673                        .unwrap_or_else(|| args.cancellation_policy.clone()),
1674                    member_execution_ids: stored_members,
1675                });
1676            }
1677        };
1678        let needs_dispatch = policy == "cancel_all" && !members.is_empty();
1679
1680        if !needs_dispatch {
1681            return Ok(CancelFlowResult::Cancelled {
1682                cancellation_policy: policy,
1683                member_execution_ids: members,
1684            });
1685        }
1686
1687        let pending_cancels_key = fctx.pending_cancels();
1688        let cancel_backlog_key = fidx.cancel_backlog();
1689
1690        if wait {
1691            // Synchronous dispatch — cancel every member inline before returning.
1692            // Collect per-member failures so the caller sees a
1693            // PartiallyCancelled outcome instead of a false-positive
1694            // Cancelled when any member cancel faulted (ghost member,
1695            // transport exhaustion, Lua reject). The cancel-backlog
1696            // reconciler still retries the unacked members; surfacing
1697            // the partial state lets operator tooling alert without
1698            // polling per-member state.
1699            let mut failed: Vec<String> = Vec::new();
1700            for eid_str in &members {
1701                match cancel_member_execution(
1702                    &self.client,
1703                    &self.config.partition_config,
1704                    eid_str,
1705                    &args.reason,
1706                    args.now,
1707                )
1708                .await
1709                {
1710                    Ok(()) => {
1711                        ack_cancel_member(
1712                            &self.client,
1713                            &pending_cancels_key,
1714                            &cancel_backlog_key,
1715                            eid_str,
1716                            &args.flow_id.to_string(),
1717                        )
1718                        .await;
1719                    }
1720                    Err(e) => {
1721                        // If the member was already terminal (execution_not_active /
1722                        // execution_not_found), treat this as ack-worthy success —
1723                        // the member is effectively "cancelled" from the flow's
1724                        // perspective and shouldn't be surfaced as a partial
1725                        // failure. Mirrors cancel_reconciler::cancel_member which
1726                        // acks on the same codes to avoid backlog poisoning.
1727                        if is_terminal_ack_error(&e) {
1728                            ack_cancel_member(
1729                                &self.client,
1730                                &pending_cancels_key,
1731                                &cancel_backlog_key,
1732                                eid_str,
1733                                &args.flow_id.to_string(),
1734                            )
1735                            .await;
1736                            continue;
1737                        }
1738                        tracing::warn!(
1739                            execution_id = %eid_str,
1740                            error = %e,
1741                            "cancel_flow(wait): individual execution cancel failed \
1742                             (transport/contract fault; reconciler will retry if transient)"
1743                        );
1744                        failed.push(eid_str.clone());
1745                    }
1746                }
1747            }
1748            if failed.is_empty() {
1749                return Ok(CancelFlowResult::Cancelled {
1750                    cancellation_policy: policy,
1751                    member_execution_ids: members,
1752                });
1753            }
1754            return Ok(CancelFlowResult::PartiallyCancelled {
1755                cancellation_policy: policy,
1756                member_execution_ids: members,
1757                failed_member_execution_ids: failed,
1758            });
1759        }
1760
1761        // Asynchronous dispatch — spawn into the shared JoinSet so Server::shutdown
1762        // can wait for pending cancellations (bounded by a shutdown timeout).
1763        let client = self.client.clone();
1764        let partition_config = self.config.partition_config;
1765        let reason = args.reason.clone();
1766        let now = args.now;
1767        let dispatch_members = members.clone();
1768        let flow_id = args.flow_id.clone();
1769        // Every async cancel_flow contends on this lock, but the critical
1770        // section is tiny: try_join_next drain + spawn. Drain is amortized
1771        // O(1) — each completed task is reaped exactly once across all
1772        // callers, and spawn is synchronous. At realistic cancel rates the
1773        // lock hold time is microseconds and does not bottleneck handlers.
1774        let mut guard = self.background_tasks.lock().await;
1775
1776        // Reap completed background dispatches before spawning the next.
1777        // Without this sweep, JoinSet accumulates Ok(()) results for every
1778        // async cancel ever issued — a memory leak in long-running servers
1779        // that would otherwise only drain on Server::shutdown. Surface any
1780        // panicked/aborted dispatches via tracing so silent failures in
1781        // cancel_member_execution are visible in logs.
1782        while let Some(joined) = guard.try_join_next() {
1783            if let Err(e) = joined {
1784                tracing::warn!(
1785                    error = %e,
1786                    "cancel_flow: background dispatch task panicked or was aborted"
1787                );
1788            }
1789        }
1790
1791        let pending_key_owned = pending_cancels_key.clone();
1792        let backlog_key_owned = cancel_backlog_key.clone();
1793        let flow_id_str = args.flow_id.to_string();
1794
1795        guard.spawn(async move {
1796            // Bounded parallel dispatch via futures::stream::buffer_unordered.
1797            // Sequential cancel at ~2ms/FCALL is ~2s for a 1000-member flow
1798            // and scales linearly, so much larger flows would not finish
1799            // inside the 15s shutdown drain window. Bounding at CONCURRENCY
1800            // keeps Valkey load predictable while still cutting wall-clock
1801            // dispatch time by ~CONCURRENCY× for large member sets.
1802            use futures::stream::StreamExt;
1803            const CONCURRENCY: usize = 16;
1804
1805            let member_count = dispatch_members.len();
1806            let flow_id_for_log = flow_id.clone();
1807            futures::stream::iter(dispatch_members)
1808                .map(|eid_str| {
1809                    let client = client.clone();
1810                    let reason = reason.clone();
1811                    let flow_id = flow_id.clone();
1812                    let pending = pending_key_owned.clone();
1813                    let backlog = backlog_key_owned.clone();
1814                    let flow_id_str = flow_id_str.clone();
1815                    async move {
1816                        match cancel_member_execution(
1817                            &client,
1818                            &partition_config,
1819                            &eid_str,
1820                            &reason,
1821                            now,
1822                        )
1823                        .await
1824                        {
1825                            Ok(()) => {
1826                                ack_cancel_member(
1827                                    &client,
1828                                    &pending,
1829                                    &backlog,
1830                                    &eid_str,
1831                                    &flow_id_str,
1832                                )
1833                                .await;
1834                            }
1835                            Err(e) => {
1836                                if is_terminal_ack_error(&e) {
1837                                    ack_cancel_member(
1838                                        &client,
1839                                        &pending,
1840                                        &backlog,
1841                                        &eid_str,
1842                                        &flow_id_str,
1843                                    )
1844                                    .await;
1845                                } else {
1846                                    tracing::warn!(
1847                                        flow_id = %flow_id,
1848                                        execution_id = %eid_str,
1849                                        error = %e,
1850                                        "cancel_flow(async): individual execution cancel failed \
1851                                         (transport/contract fault; reconciler will retry if transient)"
1852                                    );
1853                                }
1854                            }
1855                        }
1856                    }
1857                })
1858                .buffer_unordered(CONCURRENCY)
1859                .for_each(|()| async {})
1860                .await;
1861
1862            tracing::debug!(
1863                flow_id = %flow_id_for_log,
1864                member_count,
1865                concurrency = CONCURRENCY,
1866                "cancel_flow: background member dispatch complete"
1867            );
1868        });
1869        drop(guard);
1870
1871        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
1872        Ok(CancelFlowResult::CancellationScheduled {
1873            cancellation_policy: policy,
1874            member_count,
1875            member_execution_ids: members,
1876        })
1877    }
1878
1879    /// Stage a dependency edge between two executions in a flow.
1880    ///
1881    /// Runs on the flow partition {fp:N}.
1882    /// KEYS (6), ARGV (8) — matches lua/flow.lua ff_stage_dependency_edge.
1883    pub async fn stage_dependency_edge(
1884        &self,
1885        args: &StageDependencyEdgeArgs,
1886    ) -> Result<StageDependencyEdgeResult, ServerError> {
1887        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1888        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1889
1890        // KEYS (6): flow_core, members_set, edge_hash, out_adj_set, in_adj_set, grant_hash
1891        let fcall_keys: Vec<String> = vec![
1892            fctx.core(),
1893            fctx.members(),
1894            fctx.edge(&args.edge_id),
1895            fctx.outgoing(&args.upstream_execution_id),
1896            fctx.incoming(&args.downstream_execution_id),
1897            fctx.grant(&args.edge_id.to_string()),
1898        ];
1899
1900        // ARGV (8): flow_id, edge_id, upstream_eid, downstream_eid,
1901        //           dependency_kind, data_passing_ref, expected_graph_revision, now_ms
1902        let fcall_args: Vec<String> = vec![
1903            args.flow_id.to_string(),
1904            args.edge_id.to_string(),
1905            args.upstream_execution_id.to_string(),
1906            args.downstream_execution_id.to_string(),
1907            args.dependency_kind.clone(),
1908            args.data_passing_ref.clone().unwrap_or_default(),
1909            args.expected_graph_revision.to_string(),
1910            args.now.to_string(),
1911        ];
1912
1913        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1914        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1915
1916        let raw: Value = self
1917            .fcall_with_reload("ff_stage_dependency_edge", &key_refs, &arg_refs)
1918            .await?;
1919
1920        parse_stage_dependency_edge_result(&raw)
1921    }
1922
1923    /// Apply a staged dependency edge to the child execution.
1924    ///
1925    /// Runs on the child execution partition {p:N}.
1926    /// KEYS (7), ARGV (7) — matches lua/flow.lua ff_apply_dependency_to_child.
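    ///
    /// A minimal sketch of the two-step edge protocol (args construction
    /// elided):
    ///
    /// ```ignore
    /// // 1. Record the edge in the flow's adjacency sets on {fp:N}.
    /// server.stage_dependency_edge(&stage_args).await?;
    /// // 2. Materialise it on the downstream execution's partition {p:N}
    /// //    so its unresolved-dependency set and blocked index update.
    /// server.apply_dependency_to_child(&apply_args).await?;
    /// ```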
1927    pub async fn apply_dependency_to_child(
1928        &self,
1929        args: &ApplyDependencyToChildArgs,
1930    ) -> Result<ApplyDependencyToChildResult, ServerError> {
1931        let partition = execution_partition(
1932            &args.downstream_execution_id,
1933            &self.config.partition_config,
1934        );
1935        let ctx = ExecKeyContext::new(&partition, &args.downstream_execution_id);
1936        let idx = IndexKeys::new(&partition);
1937
1938        // Pre-read lane_id for index keys
1939        let lane_str: Option<String> = self
1940            .client
1941            .hget(&ctx.core(), "lane_id")
1942            .await
1943            .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
1944        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1945
1946        // KEYS (7): exec_core, deps_meta, unresolved_set, dep_hash,
1947        //           eligible_zset, blocked_deps_zset, deps_all_edges
1948        let fcall_keys: Vec<String> = vec![
1949            ctx.core(),
1950            ctx.deps_meta(),
1951            ctx.deps_unresolved(),
1952            ctx.dep_edge(&args.edge_id),
1953            idx.lane_eligible(&lane),
1954            idx.lane_blocked_dependencies(&lane),
1955            ctx.deps_all_edges(),
1956        ];
1957
1958        // ARGV (7): flow_id, edge_id, upstream_eid, graph_revision,
1959        //           dependency_kind, data_passing_ref, now_ms
1960        let fcall_args: Vec<String> = vec![
1961            args.flow_id.to_string(),
1962            args.edge_id.to_string(),
1963            args.upstream_execution_id.to_string(),
1964            args.graph_revision.to_string(),
1965            args.dependency_kind.clone(),
1966            args.data_passing_ref.clone().unwrap_or_default(),
1967            args.now.to_string(),
1968        ];
1969
1970        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1971        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1972
1973        let raw: Value = self
1974            .fcall_with_reload("ff_apply_dependency_to_child", &key_refs, &arg_refs)
1975            .await?;
1976
1977        parse_apply_dependency_result(&raw)
1978    }
1979
1980    // ── Execution operations API ──
1981
1982    /// Deliver a signal to a suspended (or pending-waitpoint) execution.
1983    ///
1984    /// Pre-reads exec_core for waitpoint/suspension fields needed for KEYS.
1985    /// KEYS (14), ARGV (18) — matches lua/signal.lua ff_deliver_signal.
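    ///
    /// A minimal delivery sketch (args construction elided; a non-empty
    /// `idempotency_key` routes duplicates through the per-waitpoint dedup
    /// key built below):
    ///
    /// ```ignore
    /// let result = server.deliver_signal(&args).await?;
    /// ```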
1986    pub async fn deliver_signal(
1987        &self,
1988        args: &DeliverSignalArgs,
1989    ) -> Result<DeliverSignalResult, ServerError> {
1990        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
1991        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
1992        let idx = IndexKeys::new(&partition);
1993
1994        // Pre-read lane_id for index keys
1995        let lane_str: Option<String> = self
1996            .client
1997            .hget(&ctx.core(), "lane_id")
1998            .await
1999            .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
2000        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2001
2002        let wp_id = &args.waitpoint_id;
2003        let sig_id = &args.signal_id;
2004        let idem_key = args
2005            .idempotency_key
2006            .as_ref()
2007            .filter(|k| !k.is_empty())
2008            .map(|k| ctx.signal_dedup(wp_id, k))
2009            .unwrap_or_else(|| ctx.noop());
2010
2011        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
2012        //            exec_signals_zset, signal_hash, signal_payload,
2013        //            idem_key, waitpoint_hash, suspension_current,
2014        //            eligible_zset, suspended_zset, delayed_zset,
2015        //            suspension_timeout_zset, hmac_secrets
2016        let fcall_keys: Vec<String> = vec![
2017            ctx.core(),                       // 1
2018            ctx.waitpoint_condition(wp_id),    // 2
2019            ctx.waitpoint_signals(wp_id),      // 3
2020            ctx.exec_signals(),                // 4
2021            ctx.signal(sig_id),                // 5
2022            ctx.signal_payload(sig_id),        // 6
2023            idem_key,                          // 7
2024            ctx.waitpoint(wp_id),              // 8
2025            ctx.suspension_current(),          // 9
2026            idx.lane_eligible(&lane),          // 10
2027            idx.lane_suspended(&lane),         // 11
2028            idx.lane_delayed(&lane),           // 12
2029            idx.suspension_timeout(),          // 13
2030            idx.waitpoint_hmac_secrets(),      // 14
2031        ];
2032
2033        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
2034        //            signal_category, source_type, source_identity,
2035        //            payload, payload_encoding, idempotency_key,
2036        //            correlation_id, target_scope, created_at,
2037        //            dedup_ttl_ms, resume_delay_ms, signal_maxlen,
2038        //            max_signals_per_execution, waitpoint_token
2039        let fcall_args: Vec<String> = vec![
2040            args.signal_id.to_string(),                          // 1
2041            args.execution_id.to_string(),                       // 2
2042            args.waitpoint_id.to_string(),                       // 3
2043            args.signal_name.clone(),                            // 4
2044            args.signal_category.clone(),                        // 5
2045            args.source_type.clone(),                            // 6
2046            args.source_identity.clone(),                        // 7
2047            args.payload.as_ref()
2048                .map(|p| String::from_utf8_lossy(p).into_owned())
2049                .unwrap_or_default(),                            // 8
2050            args.payload_encoding
2051                .clone()
2052                .unwrap_or_else(|| "json".to_owned()),           // 9
2053            args.idempotency_key
2054                .clone()
2055                .unwrap_or_default(),                            // 10
2056            args.correlation_id
2057                .clone()
2058                .unwrap_or_default(),                            // 11
2059            args.target_scope.clone(),                           // 12
2060            args.created_at
2061                .map(|ts| ts.to_string())
2062                .unwrap_or_else(|| args.now.to_string()),        // 13
2063            args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), // 14
2064            args.resume_delay_ms.unwrap_or(0).to_string(),       // 15
2065            args.signal_maxlen.unwrap_or(1000).to_string(),      // 16
2066            args.max_signals_per_execution
2067                .unwrap_or(10_000)
2068                .to_string(),                                    // 17
2069            // WIRE BOUNDARY — pass the raw token to Lua. The token's Display
2070            // impl is redacted for log safety; use .as_str() only at the wire crossing.
2071            args.waitpoint_token.as_str().to_owned(),            // 18
2072        ];
2073
2074        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2075        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2076
2077        let raw: Value = self
2078            .fcall_with_reload("ff_deliver_signal", &key_refs, &arg_refs)
2079            .await?;
2080
2081        parse_deliver_signal_result(&raw, &args.signal_id)
2082    }
2083
2084    /// Change an execution's priority.
2085    ///
2086    /// KEYS (2), ARGV (2) — matches lua/scheduling.lua ff_change_priority.
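    ///
    /// A minimal sketch (the numeric priority is illustrative; its ordering
    /// semantics are defined by the Lua side):
    ///
    /// ```ignore
    /// let result = server.change_priority(&execution_id, 10).await?;
    /// ```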
2087    pub async fn change_priority(
2088        &self,
2089        execution_id: &ExecutionId,
2090        new_priority: i32,
2091    ) -> Result<ChangePriorityResult, ServerError> {
2092        let partition = execution_partition(execution_id, &self.config.partition_config);
2093        let ctx = ExecKeyContext::new(&partition, execution_id);
2094        let idx = IndexKeys::new(&partition);
2095
2096        // Read lane_id for eligible_zset key
2097        let lane_str: Option<String> = self
2098            .client
2099            .hget(&ctx.core(), "lane_id")
2100            .await
2101            .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
2102        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2103
2104        // KEYS (2): exec_core, eligible_zset
2105        let fcall_keys: Vec<String> = vec![ctx.core(), idx.lane_eligible(&lane)];
2106
2107        // ARGV (2): execution_id, new_priority
2108        let fcall_args: Vec<String> = vec![
2109            execution_id.to_string(),
2110            new_priority.to_string(),
2111        ];
2112
2113        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2114        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2115
2116        let raw: Value = self
2117            .fcall_with_reload("ff_change_priority", &key_refs, &arg_refs)
2118            .await?;
2119
2120        parse_change_priority_result(&raw, execution_id)
2121    }
2122
2123    /// Scheduler-routed claim entry point (Batch C item 2 PR-B).
2124    ///
2125    /// Delegates to [`ff_scheduler::Scheduler::claim_for_worker`] which
2126    /// runs budget + quota + capability admission before issuing the
2127    /// grant. Returns `Ok(None)` when no eligible execution exists on
2128    /// the lane at this scan cycle. The worker's subsequent
2129    /// `claim_from_grant(lane, grant)` mints the lease.
2130    ///
2131    /// Keeping the claim-grant mint inside the server (rather than the
2132    /// worker) means capability CSV validation, budget/quota breach
2133    /// checks, and lane routing run in one place for every tenant
2134    /// worker — the same invariants as the `direct-valkey-claim` path
2135    /// enforces inline, but gated at a single server choke point.
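    ///
    /// A minimal worker-side polling sketch (lane, ids, and the 30s grant
    /// TTL are illustrative):
    ///
    /// ```ignore
    /// if let Some(grant) = server
    ///     .claim_for_worker(&lane, &worker_id, &worker_instance_id, &capabilities, 30_000)
    ///     .await?
    /// {
    ///     // hand the grant to the worker; it mints the lease via
    ///     // claim_from_grant(lane, grant)
    /// }
    /// ```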
2136    pub async fn claim_for_worker(
2137        &self,
2138        lane: &LaneId,
2139        worker_id: &WorkerId,
2140        worker_instance_id: &WorkerInstanceId,
2141        worker_capabilities: &std::collections::BTreeSet<String>,
2142        grant_ttl_ms: u64,
2143    ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
2144        self.scheduler
2145            .claim_for_worker(
2146                lane,
2147                worker_id,
2148                worker_instance_id,
2149                worker_capabilities,
2150                grant_ttl_ms,
2151            )
2152            .await
2153            .map_err(|e| match e {
2154                ff_scheduler::SchedulerError::Valkey(inner) => ServerError::from(inner),
2155                ff_scheduler::SchedulerError::ValkeyContext { source, context } => {
2156                    crate::server::backend_context(source, context)
2157                }
2158                ff_scheduler::SchedulerError::Config(msg) => ServerError::InvalidInput(msg),
2159            })
2160    }
2161
2162    /// Revoke an active lease (operator-initiated).
2163    pub async fn revoke_lease(
2164        &self,
2165        execution_id: &ExecutionId,
2166    ) -> Result<RevokeLeaseResult, ServerError> {
2167        let partition = execution_partition(execution_id, &self.config.partition_config);
2168        let ctx = ExecKeyContext::new(&partition, execution_id);
2169        let idx = IndexKeys::new(&partition);
2170
2171        // Pre-read worker_instance_id for worker_leases key
2172        let wiid_str: Option<String> = self
2173            .client
2174            .hget(&ctx.core(), "current_worker_instance_id")
2175            .await
2176            .map_err(|e| crate::server::backend_context(e, "HGET current_worker_instance_id"))?;
2177        let wiid = match wiid_str {
2178            Some(ref s) if !s.is_empty() => WorkerInstanceId::new(s),
2179            _ => {
2180                return Err(ServerError::NotFound(format!(
2181                    "no active lease for execution {execution_id} (no current_worker_instance_id)"
2182                )));
2183            }
2184        };
2185
2186        // KEYS (5): exec_core, lease_current, lease_history, lease_expiry_zset, worker_leases
2187        let fcall_keys: Vec<String> = vec![
2188            ctx.core(),
2189            ctx.lease_current(),
2190            ctx.lease_history(),
2191            idx.lease_expiry(),
2192            idx.worker_leases(&wiid),
2193        ];
2194
2195        // ARGV (3): execution_id, expected_lease_id (empty = skip check), revoke_reason
2196        let fcall_args: Vec<String> = vec![
2197            execution_id.to_string(),
2198            String::new(), // no expected_lease_id check
2199            "operator_revoke".to_owned(),
2200        ];
2201
2202        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2203        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2204
2205        let raw: Value = self
2206            .fcall_with_reload("ff_revoke_lease", &key_refs, &arg_refs)
2207            .await?;
2208
2209        parse_revoke_lease_result(&raw)
2210    }
2211
2212    /// Get full execution info via HGETALL on exec_core.
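    ///
    /// A minimal read sketch (an empty stored `flow_id` surfaces as `None`,
    /// i.e. a solo execution with no flow affinity):
    ///
    /// ```ignore
    /// let info = server.get_execution(&execution_id).await?;
    /// match &info.flow_id {
    ///     Some(flow) => println!("{} belongs to flow {flow}", info.execution_id),
    ///     None => println!("{} is a solo execution", info.execution_id),
    /// }
    /// ```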
2213    pub async fn get_execution(
2214        &self,
2215        execution_id: &ExecutionId,
2216    ) -> Result<ExecutionInfo, ServerError> {
2217        let partition = execution_partition(execution_id, &self.config.partition_config);
2218        let ctx = ExecKeyContext::new(&partition, execution_id);
2219
2220        let fields: HashMap<String, String> = self
2221            .client
2222            .hgetall(&ctx.core())
2223            .await
2224            .map_err(|e| crate::server::backend_context(e, "HGETALL exec_core"))?;
2225
2226        if fields.is_empty() {
2227            return Err(ServerError::NotFound(format!(
2228                "execution not found: {execution_id}"
2229            )));
2230        }
2231
2232        let parse_enum = |field: &str| -> String {
2233            fields.get(field).cloned().unwrap_or_default()
2234        };
2235        fn deserialize<T: serde::de::DeserializeOwned>(field: &str, raw: &str) -> Result<T, ServerError> {
2236            let quoted = format!("\"{raw}\"");
2237            serde_json::from_str(&quoted).map_err(|e| {
2238                ServerError::Script(format!("invalid {field} '{raw}': {e}"))
2239            })
2240        }
2241
2242        let lp_str = parse_enum("lifecycle_phase");
2243        let os_str = parse_enum("ownership_state");
2244        let es_str = parse_enum("eligibility_state");
2245        let br_str = parse_enum("blocking_reason");
2246        let to_str = parse_enum("terminal_outcome");
2247        let as_str = parse_enum("attempt_state");
2248        let ps_str = parse_enum("public_state");
2249
2250        let state_vector = StateVector {
2251            lifecycle_phase: deserialize("lifecycle_phase", &lp_str)?,
2252            ownership_state: deserialize("ownership_state", &os_str)?,
2253            eligibility_state: deserialize("eligibility_state", &es_str)?,
2254            blocking_reason: deserialize("blocking_reason", &br_str)?,
2255            terminal_outcome: deserialize("terminal_outcome", &to_str)?,
2256            attempt_state: deserialize("attempt_state", &as_str)?,
2257            public_state: deserialize("public_state", &ps_str)?,
2258        };
2259
2260        // Reader invariant (RFC-011 §7.3): `flow_id` on exec_core is
2261        // stamped atomically with membership in `add_execution_to_flow`'s
2262        // single FCALL. Empty iff the exec has no flow affinity (never
2263        // called `add_execution_to_flow` — solo execs) — NOT "orphaned
2264        // mid-two-phase" (that failure mode is retired). Filter on empty
2265        // to surface "no flow affinity" distinctly from "flow X".
2266        let flow_id_val = fields.get("flow_id").filter(|s| !s.is_empty()).cloned();
2267
2268        // started_at / completed_at come from the exec_core hash —
2269        // lua/execution.lua writes them alongside the rest of the state
2270        // vector. `created_at` is required; the other two are optional
2271        // (pre-claim / pre-terminal reads) and elided when empty so the
2272        // wire payload for in-flight executions stays the shape existing
2273        // callers expect.
2274        let started_at_opt = fields
2275            .get("started_at")
2276            .filter(|s| !s.is_empty())
2277            .cloned();
2278        let completed_at_opt = fields
2279            .get("completed_at")
2280            .filter(|s| !s.is_empty())
2281            .cloned();
2282
2283        Ok(ExecutionInfo {
2284            execution_id: execution_id.clone(),
2285            namespace: parse_enum("namespace"),
2286            lane_id: parse_enum("lane_id"),
2287            priority: fields
2288                .get("priority")
2289                .and_then(|v| v.parse().ok())
2290                .unwrap_or(0),
2291            execution_kind: parse_enum("execution_kind"),
2292            state_vector,
2293            public_state: deserialize("public_state", &ps_str)?,
2294            created_at: parse_enum("created_at"),
2295            started_at: started_at_opt,
2296            completed_at: completed_at_opt,
2297            current_attempt_index: fields
2298                .get("current_attempt_index")
2299                .and_then(|v| v.parse().ok())
2300                .unwrap_or(0),
2301            flow_id: flow_id_val,
2302            blocking_detail: parse_enum("blocking_detail"),
2303        })
2304    }
2305
2306    /// List executions from a partition's index ZSET.
2307    ///
2308    /// No FCALL — direct ZRANGE + pipelined HMGET reads.
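    ///
    /// A minimal paging sketch (partition 0 and the default lane are
    /// illustrative):
    ///
    /// ```ignore
    /// let page = server
    ///     .list_executions(0, &LaneId::new("default"), "eligible", 0, 100)
    ///     .await?;
    /// for exec in &page.executions {
    ///     println!("{} {}", exec.execution_id, exec.public_state);
    /// }
    /// ```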
2309    pub async fn list_executions(
2310        &self,
2311        partition_id: u16,
2312        lane: &LaneId,
2313        state_filter: &str,
2314        offset: u64,
2315        limit: u64,
2316    ) -> Result<ListExecutionsResult, ServerError> {
2317        let partition = ff_core::partition::Partition {
2318            family: ff_core::partition::PartitionFamily::Execution,
2319            index: partition_id,
2320        };
2321        let idx = IndexKeys::new(&partition);
2322
2323        let zset_key = match state_filter {
2324            "eligible" => idx.lane_eligible(lane),
2325            "delayed" => idx.lane_delayed(lane),
2326            "terminal" => idx.lane_terminal(lane),
2327            "suspended" => idx.lane_suspended(lane),
2328            "active" => idx.lane_active(lane),
2329            other => {
2330                return Err(ServerError::InvalidInput(format!(
2331                    "invalid state_filter: {other}. Use: eligible, delayed, terminal, suspended, active"
2332                )));
2333            }
2334        };
2335
2336        // ZRANGE key -inf +inf BYSCORE LIMIT offset count
2337        let eids: Vec<String> = self
2338            .client
2339            .cmd("ZRANGE")
2340            .arg(&zset_key)
2341            .arg("-inf")
2342            .arg("+inf")
2343            .arg("BYSCORE")
2344            .arg("LIMIT")
2345            .arg(offset)
2346            .arg(limit)
2347            .execute()
2348            .await
2349            .map_err(|e| crate::server::backend_context(e, format!("ZRANGE {zset_key}")))?;
2350
2351        if eids.is_empty() {
2352            return Ok(ListExecutionsResult {
2353                executions: vec![],
2354                total_returned: 0,
2355            });
2356        }
2357
2358        // Parse execution IDs, warning on corrupt ZSET members
2359        let mut parsed = Vec::with_capacity(eids.len());
2360        for eid_str in &eids {
2361            match ExecutionId::parse(eid_str) {
2362                Ok(id) => parsed.push(id),
2363                Err(e) => {
2364                    tracing::warn!(
2365                        raw_id = %eid_str,
2366                        error = %e,
2367                        zset = %zset_key,
2368                        "list_executions: ZSET member failed to parse as ExecutionId (data corruption?)"
2369                    );
2370                }
2371            }
2372        }
2373
2374        if parsed.is_empty() {
2375            return Ok(ListExecutionsResult {
2376                executions: vec![],
2377                total_returned: 0,
2378            });
2379        }
2380
2381        // Pipeline all HMGETs into a single round-trip
2382        let mut pipe = self.client.pipeline();
2383        let mut slots = Vec::with_capacity(parsed.len());
2384        for eid in &parsed {
2385            let ep = execution_partition(eid, &self.config.partition_config);
2386            let ctx = ExecKeyContext::new(&ep, eid);
2387            let slot = pipe
2388                .cmd::<Vec<Option<String>>>("HMGET")
2389                .arg(ctx.core())
2390                .arg("namespace")
2391                .arg("lane_id")
2392                .arg("execution_kind")
2393                .arg("public_state")
2394                .arg("priority")
2395                .arg("created_at")
2396                .finish();
2397            slots.push(slot);
2398        }
2399
2400        pipe.execute()
2401            .await
2402            .map_err(|e| crate::server::backend_context(e, "pipeline HMGET"))?;
2403
2404        let mut summaries = Vec::with_capacity(parsed.len());
2405        for (eid, slot) in parsed.into_iter().zip(slots) {
2406            let fields: Vec<Option<String>> = slot.value()
2407                .map_err(|e| crate::server::backend_context(e, "pipeline slot"))?;
2408
2409            let field = |i: usize| -> String {
2410                fields
2411                    .get(i)
2412                    .and_then(|v| v.as_ref())
2413                    .cloned()
2414                    .unwrap_or_default()
2415            };
2416
2417            summaries.push(ExecutionSummary {
2418                execution_id: eid,
2419                namespace: field(0),
2420                lane_id: field(1),
2421                execution_kind: field(2),
2422                public_state: field(3),
2423                priority: field(4).parse().unwrap_or(0),
2424                created_at: field(5),
2425            });
2426        }
2427
2428        let total = summaries.len();
2429        Ok(ListExecutionsResult {
2430            executions: summaries,
2431            total_returned: total,
2432        })
2433    }
2434
2435    /// Replay a terminal execution.
2436    ///
2437    /// Pre-reads exec_core for flow_id and dep edges (variable KEYS).
2438    /// KEYS (4+N), ARGV (2+N) — matches lua/flow.lua ff_replay_execution.
2439    pub async fn replay_execution(
2440        &self,
2441        execution_id: &ExecutionId,
2442    ) -> Result<ReplayExecutionResult, ServerError> {
2443        let partition = execution_partition(execution_id, &self.config.partition_config);
2444        let ctx = ExecKeyContext::new(&partition, execution_id);
2445        let idx = IndexKeys::new(&partition);
2446
2447        // Pre-read lane_id, flow_id, terminal_outcome.
2448        //
2449        // Reader invariant (RFC-011 §7.3): `flow_id` on exec_core is
2450        // stamped atomically with membership by `add_execution_to_flow`'s
2451        // single FCALL. Empty iff the exec has no flow affinity
2452        // (solo-path create_execution — never added to a flow). The
2453        // `is_skipped_flow_member` branch below gates on
2454        // `!flow_id_str.is_empty()`, so solo execs correctly fall back
2455        // to the non-flow-member replay path. See
2456        // `add_execution_to_flow` rustdoc for the atomic-commit
2457        // invariant.
2458        let dyn_fields: Vec<Option<String>> = self
2459            .client
2460            .cmd("HMGET")
2461            .arg(ctx.core())
2462            .arg("lane_id")
2463            .arg("flow_id")
2464            .arg("terminal_outcome")
2465            .execute()
2466            .await
2467            .map_err(|e| crate::server::backend_context(e, "HMGET replay pre-read"))?;
2468        let lane = LaneId::new(
2469            dyn_fields
2470                .first()
2471                .and_then(|v| v.as_ref())
2472                .cloned()
2473                .unwrap_or_else(|| "default".to_owned()),
2474        );
2475        let flow_id_str = dyn_fields
2476            .get(1)
2477            .and_then(|v| v.as_ref())
2478            .cloned()
2479            .unwrap_or_default();
2480        let terminal_outcome = dyn_fields
2481            .get(2)
2482            .and_then(|v| v.as_ref())
2483            .cloned()
2484            .unwrap_or_default();
2485
2486        let is_skipped_flow_member = terminal_outcome == "skipped" && !flow_id_str.is_empty();
2487
2488        // Base KEYS (4): exec_core, terminal_zset, eligible_zset, lease_history
2489        let mut fcall_keys: Vec<String> = vec![
2490            ctx.core(),
2491            idx.lane_terminal(&lane),
2492            idx.lane_eligible(&lane),
2493            ctx.lease_history(),
2494        ];
2495
2496        // Base ARGV (2): execution_id, now_ms
2497        let now = TimestampMs::now();
2498        let mut fcall_args: Vec<String> = vec![execution_id.to_string(), now.to_string()];
2499
2500        if is_skipped_flow_member {
2501            // Read ALL inbound edge IDs from the flow partition's adjacency set.
2502            // Cannot use deps:unresolved because impossible edges were SREM'd
2503            // by ff_resolve_dependency. The flow's in:<eid> set has all edges.
2504            let flow_id = FlowId::parse(&flow_id_str)
2505                .map_err(|e| ServerError::Script(format!("bad flow_id: {e}")))?;
2506            let flow_part =
2507                flow_partition(&flow_id, &self.config.partition_config);
2508            let flow_ctx = FlowKeyContext::new(&flow_part, &flow_id);
2509            let edge_ids: Vec<String> = self
2510                .client
2511                .cmd("SMEMBERS")
2512                .arg(flow_ctx.incoming(execution_id))
2513                .execute()
2514                .await
2515                .map_err(|e| crate::server::backend_context(e, "SMEMBERS replay edges"))?;
2516
2517            // Extended KEYS: blocked_deps_zset, deps_meta, deps_unresolved, dep_edge_0..N
2518            fcall_keys.push(idx.lane_blocked_dependencies(&lane)); // 5
2519            fcall_keys.push(ctx.deps_meta()); // 6
2520            fcall_keys.push(ctx.deps_unresolved()); // 7
2521            for eid_str in &edge_ids {
2522                let edge_id = EdgeId::parse(eid_str)
2523                    .map_err(|e| ServerError::Script(format!("bad edge_id in flow adjacency set: {e}")))?;
2524                fcall_keys.push(ctx.dep_edge(&edge_id)); // 8..8+N
2525                fcall_args.push(eid_str.clone()); // 3..3+N
2526            }
2527        }
2528
2529        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2530        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2531
2532        let raw: Value = self
2533            .fcall_with_reload("ff_replay_execution", &key_refs, &arg_refs)
2534            .await?;
2535
2536        parse_replay_result(&raw)
2537    }
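
    // Illustrative KEYS/ARGV layout for a skipped flow member with two
    // inbound edges e1 and e2 (derived from the assembly above):
    //   KEYS = [exec_core, lane_terminal, lane_eligible, lease_history,
    //           lane_blocked_dependencies, deps_meta, deps_unresolved,
    //           dep_edge(e1), dep_edge(e2)]
    //   ARGV = [execution_id, now_ms, e1, e2]
    // Any other execution stops at the first four KEYS and two ARGV entries.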
2538
2539    /// Read frames from an attempt's stream (XRANGE wrapper) plus terminal
2540    /// markers (`closed_at`, `closed_reason`) so consumers can stop polling
2541    /// when the producer finalizes.
2542    ///
2543    /// `from_id` and `to_id` accept XRANGE special markers: `"-"` for
2544    /// earliest, `"+"` for latest. `count_limit` MUST be `>= 1` —
2545    /// `0` returns a `ServerError::InvalidInput` (matches the REST boundary
2546    /// and the Lua-side reject).
2547    ///
2548    /// Cluster-safe: the attempt's `{p:N}` partition is derived from the
2549    /// execution id, so all KEYS share the same slot.
2550    pub async fn read_attempt_stream(
2551        &self,
2552        execution_id: &ExecutionId,
2553        attempt_index: AttemptIndex,
2554        from_id: &str,
2555        to_id: &str,
2556        count_limit: u64,
2557    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2558        use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
2559
2560        if count_limit == 0 {
2561            return Err(ServerError::InvalidInput(
2562                "count_limit must be >= 1".to_owned(),
2563            ));
2564        }
2565
2566        // Share the same semaphore as tail. A large XRANGE reply (10_000
2567        // frames × ~64KB) is just as capable of head-of-line-blocking the
2568        // tail_client mux as a long BLOCK — fairness accounting must be
2569        // unified. Non-blocking acquire → 429 on contention.
2570        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2571            Ok(p) => p,
2572            Err(tokio::sync::TryAcquireError::NoPermits) => {
2573                return Err(ServerError::ConcurrencyLimitExceeded(
2574                    "stream_ops",
2575                    self.config.max_concurrent_stream_ops,
2576                ));
2577            }
2578            Err(tokio::sync::TryAcquireError::Closed) => {
2579                return Err(ServerError::OperationFailed(
2580                    "stream semaphore closed (server shutting down)".into(),
2581                ));
2582            }
2583        };
2584
2585        let args = ReadFramesArgs {
2586            execution_id: execution_id.clone(),
2587            attempt_index,
2588            from_id: from_id.to_owned(),
2589            to_id: to_id.to_owned(),
2590            count_limit,
2591        };
2592
2593        let partition = execution_partition(execution_id, &self.config.partition_config);
2594        let ctx = ExecKeyContext::new(&partition, execution_id);
2595        let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };
2596
2597        // Route on the dedicated stream client, same as tail. A 10_000-
2598        // frame XRANGE reply on the main mux would stall every other
2599        // FCALL behind reply serialization.
2600        let result = ff_script::functions::stream::ff_read_attempt_stream(
2601            &self.tail_client, &keys, &args,
2602        )
2603        .await
2604        .map_err(script_error_to_server);
2605
2606        drop(permit);
2607
2608        match result? {
2609            ReadFramesResult::Frames(f) => Ok(f),
2610        }
2611    }
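
    // Minimal usage sketch (assumes a `server`, an `exec_id`, and an `attempt`
    // index are in scope; `closed_at` is the terminal marker documented above):
    //
    //     let page = server
    //         .read_attempt_stream(&exec_id, attempt, "-", "+", 100)
    //         .await?;
    //     // page.closed_at.is_some() => producer finalized; stop polling.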
2612
2613    /// Tail a live attempt's stream (XREAD BLOCK wrapper). Returns frames
2614    /// plus the terminal signal so a polling consumer can exit when the
2615    /// producer closes the stream.
2616    ///
2617    /// `last_id` is exclusive — XREAD returns entries with id > last_id.
2618    /// Pass `"0-0"` to read from the beginning.
2619    ///
2620    /// `block_ms == 0` → non-blocking peek (returns immediately).
2621    /// `block_ms > 0`  → blocks up to that many ms. Empty `frames` +
2622    /// `closed_at=None` → timeout, no new data, still open.
2623    ///
2624    /// `count_limit` MUST be `>= 1`; `0` returns `InvalidInput`.
2625    ///
2626    /// Implemented as a direct XREAD command (not FCALL) because blocking
2627    /// commands are rejected inside Valkey Functions. The terminal
2628    /// markers come from a companion HMGET on `stream_meta` — see
2629    /// `ff_script::stream_tail` module docs.
2630    pub async fn tail_attempt_stream(
2631        &self,
2632        execution_id: &ExecutionId,
2633        attempt_index: AttemptIndex,
2634        last_id: &str,
2635        block_ms: u64,
2636        count_limit: u64,
2637    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2638        if count_limit == 0 {
2639            return Err(ServerError::InvalidInput(
2640                "count_limit must be >= 1".to_owned(),
2641            ));
2642        }
2643
2644        // Non-blocking permit acquisition on the shared stream_semaphore
2645        // (read + tail split the same pool). If the server is already at
2646        // the `max_concurrent_stream_ops` ceiling, return `TailUnavailable`
2647        // (→ 429) rather than queueing — a queued tail holds the caller's
2648        // HTTP request open with no upper bound, which is exactly the
2649        // resource-exhaustion pattern this limit exists to prevent.
2650        // Clients retry with backoff on 429.
2651        //
2652        // Worst-case permit hold: if the producer closes the stream via
2653        // HSET on stream_meta (not an XADD), the XREAD BLOCK won't wake
2654        // until `block_ms` elapses — so a permit can be held for up to
2655        // the caller's block_ms even though the terminal signal is
2656        // ready. This is a v1-accepted limitation; RFC-006 §"Terminal-signal
2657        // timing under active tail" documents it and sketches the v2
2658        // sentinel-XADD upgrade path.
2659        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2660            Ok(p) => p,
2661            Err(tokio::sync::TryAcquireError::NoPermits) => {
2662                return Err(ServerError::ConcurrencyLimitExceeded(
2663                    "stream_ops",
2664                    self.config.max_concurrent_stream_ops,
2665                ));
2666            }
2667            Err(tokio::sync::TryAcquireError::Closed) => {
2668                return Err(ServerError::OperationFailed(
2669                    "stream semaphore closed (server shutting down)".into(),
2670                ));
2671            }
2672        };
2673
2674        let partition = execution_partition(execution_id, &self.config.partition_config);
2675        let ctx = ExecKeyContext::new(&partition, execution_id);
2676        let stream_key = ctx.stream(attempt_index);
2677        let stream_meta_key = ctx.stream_meta(attempt_index);
2678
2679        // Acquire the XREAD BLOCK serializer AFTER the stream semaphore.
2680        // Nesting order matters: the semaphore is the user-visible
2681        // ceiling (surfaces as 429), the Mutex is an internal fairness
2682        // gate. Holding the permit while waiting on the Mutex means the
2683        // ceiling still bounds queue depth. See the field docstring for
2684        // the full rationale (ferriskey pipeline FIFO + client-side
2685        // per-call timeout race).
2686        let _xread_guard = self.xread_block_lock.lock().await;
2687
2688        let result = ff_script::stream_tail::xread_block(
2689            &self.tail_client,
2690            &stream_key,
2691            &stream_meta_key,
2692            last_id,
2693            block_ms,
2694            count_limit,
2695        )
2696        .await
2697        .map_err(script_error_to_server);
2698
2699        drop(_xread_guard);
2700        drop(permit);
2701        result
2702    }
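
    // Polling-consumer sketch (assumes `server`, `exec_id`, and `attempt` are
    // in scope; advancing `last_id` to the newest frame id is elided):
    //
    //     let mut last_id = "0-0".to_owned();
    //     loop {
    //         let batch = server
    //             .tail_attempt_stream(&exec_id, attempt, &last_id, 5_000, 100)
    //             .await?;
    //         // ...process batch and advance last_id...
    //         if batch.closed_at.is_some() {
    //             break; // producer finalized the stream
    //         }
    //     }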
2703
2704    /// Graceful shutdown — stops scanners, drains background handler tasks
2705    /// (e.g. cancel_flow member dispatch) with a bounded timeout, then waits
2706    /// for scanners to finish.
2707    ///
2708    /// Shutdown order is chosen so in-flight stream ops (read/tail) drain
2709    /// cleanly without new arrivals piling up:
2710    ///
2711    /// 1. `stream_semaphore.close()` — new read/tail attempts fail fast
2712    ///    with `ServerError::OperationFailed("stream semaphore closed …")`
2713    ///    which the REST layer surfaces as a 500 with `retryable=false`
2714    ///    (ops tooling that chooses to wait and retry on 5xx responses will
2715    ///    find the shutdown reason named clearly in the body).
2716    /// 2. Drain handler-spawned background tasks with a 15s ceiling.
2717    /// 3. `engine.shutdown()` stops scanners.
2718    ///
2719    /// Existing in-flight tails finish on their natural `block_ms`
2720    /// boundary (up to ~30s); the `tail_client` is dropped when `Server`
2721    /// is dropped after this function returns. We do NOT wait for tails
2722    /// to drain explicitly — the semaphore-close + natural-timeout
2723    /// combination bounds shutdown to roughly `block_ms + 15s` in the
2724    /// worst case. Callers observing a dropped connection retry against
2725    /// whatever replacement is coming up.
2726    pub async fn shutdown(self) {
2727        tracing::info!("shutting down FlowFabric server");
2728
2729        // Step 1: Close the stream semaphore FIRST so any in-flight
2730        // read/tail calls that are between `try_acquire` and their
2731        // Valkey command still hold a valid permit, but no NEW stream
2732        // op can start. `Semaphore::close()` is idempotent.
2733        self.stream_semaphore.close();
2734        tracing::info!(
2735            "stream semaphore closed; no new read/tail attempts will be accepted"
2736        );
2737
2738        // Step 2: Drain handler-spawned background tasks with the same
2739        // ceiling as Engine::shutdown. If dispatch is still running at
2740        // the deadline, drop the JoinSet to abort remaining tasks.
2741        let drain_timeout = Duration::from_secs(15);
2742        let background = self.background_tasks.clone();
2743        let drain = async move {
2744            let mut guard = background.lock().await;
2745            while guard.join_next().await.is_some() {}
2746        };
2747        match tokio::time::timeout(drain_timeout, drain).await {
2748            Ok(()) => {}
2749            Err(_) => {
2750                tracing::warn!(
2751                    timeout_s = drain_timeout.as_secs(),
2752                    "shutdown: background tasks did not finish in time, aborting"
2753                );
2754                self.background_tasks.lock().await.abort_all();
2755            }
2756        }
2757
2758        self.engine.shutdown().await;
2759        tracing::info!("FlowFabric server shutdown complete");
2760    }
2761}
2762
2763// ── Valkey version check (RFC-011 §13) ──
2764
2765/// Minimum Valkey version the engine requires (see RFC-011 §13). 7.2 is the
2766/// earliest Valkey release line and already ships Valkey Functions and RESP3,
2767/// the primitives the co-location design and typed FCALL wrappers depend on.
2768const REQUIRED_VALKEY_MAJOR: u32 = 7;
2769const REQUIRED_VALKEY_MINOR: u32 = 2;
2770
2771/// Upper bound on the rolling-upgrade retry window (RFC-011 §9.17). A Valkey
2772/// node cycling through SIGTERM → restart typically completes in well under
2773/// 60s; this budget is generous without letting a truly-stuck cluster hang
2774/// boot indefinitely.
2775const VERSION_CHECK_RETRY_BUDGET: Duration = Duration::from_secs(60);
2776
2777/// Verify the connected Valkey reports a version ≥ 7.2.
2778///
2779/// Per RFC-011 §9.17, during a rolling upgrade the node we happen to connect
2780/// to may temporarily be pre-upgrade while others are post-upgrade. The check
2781/// tolerates this by retrying the whole verification (including low-version
2782/// responses) with exponential backoff, capped at a 60s budget.
2783///
2784/// **Retries on:**
2785/// - Low-version responses (below `(REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR)`)
2786///   — may resolve as the rolling upgrade progresses onto the connected node.
2787/// - Retryable ferriskey transport errors — connection refused,
2788///   `BusyLoadingError`, `ClusterDown`, etc., classified via
2789///   `ff_script::retry::is_retryable_kind`.
2790/// - Missing/unparsable version field — treated as transient (fresh-boot
2791///   server may not have the INFO fields populated yet). Reads
2792///   `valkey_version` when present (authoritative on Valkey 8.0+), falls
2793///   back to `redis_version` for Valkey 7.x.
2794///
2795/// **Does NOT retry on:**
2796/// - Non-retryable transport errors (auth failures, permission denied,
2797///   invalid client config) — these are operator misconfiguration, not
2798///   transient cluster state; fast-fail preserves a clear signal.
2799///
2800/// On budget exhaustion, returns the last observed error.
2801async fn verify_valkey_version(client: &Client) -> Result<(), ServerError> {
2802    let deadline = tokio::time::Instant::now() + VERSION_CHECK_RETRY_BUDGET;
2803    let mut backoff = Duration::from_millis(200);
2804    loop {
2805        let (should_retry, err_for_budget_exhaust, log_detail): (bool, ServerError, String) =
2806            match query_valkey_version(client).await {
2807                Ok((detected_major, detected_minor))
2808                    if (detected_major, detected_minor)
2809                        >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR) =>
2810                {
2811                    tracing::info!(
2812                        detected_major,
2813                        detected_minor,
2814                        required_major = REQUIRED_VALKEY_MAJOR,
2815                        required_minor = REQUIRED_VALKEY_MINOR,
2816                        "Valkey version accepted"
2817                    );
2818                    return Ok(());
2819                }
2820                Ok((detected_major, detected_minor)) => (
2821                    // Low version — may be a rolling-upgrade stale node.
2822                    // Retry within budget; after exhaustion, the cluster
2823                    // is misconfigured and fast-fail is the correct signal.
2824                    true,
2825                    ServerError::ValkeyVersionTooLow {
2826                        detected: format!("{detected_major}.{detected_minor}"),
2827                        required: format!("{REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"),
2828                    },
2829                    format!(
2830                        "detected={detected_major}.{detected_minor} < required={REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"
2831                    ),
2832                ),
2833                Err(e) => {
2834                    // Only retry if the underlying Valkey error is retryable
2835                    // by kind. Auth / permission / invalid-config should fast-
2836                    // fail so operators see the true root cause immediately,
2837                    // not a 60s hang followed by a generic "transient" error.
2838                    let retryable = e
2839                        .backend_kind()
2840                        .map(|k| k.is_retryable())
2841                        // Non-backend errors (parse, missing field, operation
2842                        // failures) are treated as transient — a fresh-boot
2843                        // Valkey may not have redis_version populated yet.
2844                        .unwrap_or(true);
2845                    let detail = e.to_string();
2846                    (retryable, e, detail)
2847                }
2848            };
2849
2850        if !should_retry {
2851            return Err(err_for_budget_exhaust);
2852        }
2853        if tokio::time::Instant::now() >= deadline {
2854            return Err(err_for_budget_exhaust);
2855        }
2856        tracing::warn!(
2857            backoff_ms = backoff.as_millis() as u64,
2858            detail = %log_detail,
2859            "valkey version check transient failure; retrying"
2860        );
2861        tokio::time::sleep(backoff).await;
2862        backoff = (backoff * 2).min(Duration::from_secs(5));
2863    }
2864}
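
// Worked backoff schedule (ignoring per-attempt query latency): 200ms, 400ms,
// 800ms, 1.6s, 3.2s, then capped at 5s. The first five sleeps consume ~6.2s of
// the 60s budget, leaving room for roughly ten more capped retries before the
// deadline check returns the last observed error.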
2865
2866/// Run `INFO server` and extract the `(major, minor)` components of the
2867/// Valkey version.
2868///
2869/// Returns `Err` on transport errors, missing field, or unparsable version.
2870/// Handles three response shapes:
2871///
2872/// - **Standalone:** single string body; parse directly.
2873/// - **Cluster (RESP3 map):** `INFO` returns a map keyed by node address;
2874///   every node runs the same version in a healthy deployment, so we pick
2875///   one entry and parse it. Divergent versions during a rolling upgrade
2876///   are handled by the outer retry loop.
2877/// - **Empty / unexpected:** surfaces as `OperationFailed` with context.
2878async fn query_valkey_version(client: &Client) -> Result<(u32, u32), ServerError> {
2879    let raw: Value = client
2880        .cmd("INFO")
2881        .arg("server")
2882        .execute()
2883        .await
2884        .map_err(|e| crate::server::backend_context(e, "INFO server"))?;
2885    let bodies = extract_info_bodies(&raw)?;
2886    // Cluster: return the minimum (major, minor) across all nodes so a stale
2887    // pre-upgrade replica cannot hide behind an already-upgraded primary.
2888    // Standalone: exactly one body. The outer retry loop tolerates rolling
2889    // upgrades — a briefly-low minimum gets retried; a persistently-low one
2890    // exits with the structured floor error.
2891    let mut min_version: Option<(u32, u32)> = None;
2892    for body in &bodies {
2893        let version = parse_valkey_version(body)?;
2894        min_version = Some(match min_version {
2895            None => version,
2896            Some(existing) => existing.min(version),
2897        });
2898    }
2899    min_version.ok_or_else(|| {
2900        ServerError::OperationFailed(
2901            "valkey version check: cluster INFO returned no node bodies".into(),
2902        )
2903    })
2904}
2905
2906/// Normalize an `INFO server` response to one string body per node.
2907///
2908/// Standalone returns a single body. Cluster (RESP3 map keyed by node address)
2909/// returns every node's body — the caller must consider all of them to reject
2910/// a mixed-version cluster where one stale node is below the floor.
2911fn extract_info_bodies(raw: &Value) -> Result<Vec<String>, ServerError> {
2912    match raw {
2913        Value::BulkString(bytes) => Ok(vec![String::from_utf8_lossy(bytes).into_owned()]),
2914        Value::VerbatimString { text, .. } => Ok(vec![text.clone()]),
2915        Value::SimpleString(s) => Ok(vec![s.clone()]),
2916        Value::Map(entries) => {
2917            if entries.is_empty() {
2918                return Err(ServerError::OperationFailed(
2919                    "valkey version check: cluster INFO returned empty map".into(),
2920                ));
2921            }
2922            let mut out = Vec::with_capacity(entries.len());
2923            for (_, body) in entries {
2924                out.extend(extract_info_bodies(body)?);
2925            }
2926            Ok(out)
2927        }
2928        other => Err(ServerError::OperationFailed(format!(
2929            "valkey version check: unexpected INFO shape: {other:?}"
2930        ))),
2931    }
2932}
2933
2934/// Extract the `(major, minor)` components of the Valkey version from an
2935/// `INFO server` response body. Pure parser — pulled out of
2936/// [`query_valkey_version`] so it is unit-testable without a live Valkey.
2937///
2938/// **Prefers `valkey_version:`** (introduced in Valkey 8.0+; this is the real
2939/// server version on 8.x/9.x, which pin `redis_version:7.2.4` for
2940/// Redis-client compatibility).
2941///
2942/// **Falls back to `redis_version:` only when the body carries an affirmative
2943/// `server_name:valkey` field.** `server_name:` was introduced in Valkey 7.2,
2944/// which is our floor — so every floor-compliant Valkey deployment carries
2945/// the marker. Redis does not emit `server_name:valkey`, which is how we
2946/// reject a Redis backend that would otherwise look identical to Valkey 7.x
2947/// at the `redis_version:` level.
2948fn parse_valkey_version(info: &str) -> Result<(u32, u32), ServerError> {
2949    let extract_major_minor = |line: &str| -> Result<(u32, u32), ServerError> {
2950        let trimmed = line.trim();
2951        let mut parts = trimmed.split('.');
2952        let major_str = parts.next().unwrap_or("").trim();
2953        if major_str.is_empty() {
2954            return Err(ServerError::OperationFailed(format!(
2955                "valkey version check: empty version field in '{trimmed}'"
2956            )));
2957        }
2958        let major = major_str.parse::<u32>().map_err(|_| {
2959            ServerError::OperationFailed(format!(
2960                "valkey version check: non-numeric major in '{trimmed}'"
2961            ))
2962        })?;
2963        // Minor is required — a bare major ("7") cannot be compared against
2964        // the (major, minor) floor reliably. Valkey always reports
2965        // major.minor.patch for INFO, so missing minor is a real parse error.
2966        let minor_str = parts.next().unwrap_or("").trim();
2967        if minor_str.is_empty() {
2968            return Err(ServerError::OperationFailed(format!(
2969                "valkey version check: missing minor component in '{trimmed}'"
2970            )));
2971        }
2972        let minor = minor_str.parse::<u32>().map_err(|_| {
2973            ServerError::OperationFailed(format!(
2974                "valkey version check: non-numeric minor in '{trimmed}'"
2975            ))
2976        })?;
2977        Ok((major, minor))
2978    };
2979    // Prefer valkey_version (authoritative on Valkey 8.0+).
2980    if let Some(valkey_line) = info
2981        .lines()
2982        .find_map(|line| line.strip_prefix("valkey_version:"))
2983    {
2984        return extract_major_minor(valkey_line);
2985    }
2986    // No valkey_version — could be Valkey 7.2 (which doesn't emit the field)
2987    // or could be Redis. Require an affirmative server_name:valkey marker
2988    // before falling back to redis_version. This rejects Redis backends,
2989    // which don't emit server_name:valkey.
2990    let server_is_valkey = info
2991        .lines()
2992        .map(str::trim)
2993        .any(|line| line.eq_ignore_ascii_case("server_name:valkey"));
2994    if !server_is_valkey {
2995        return Err(ServerError::OperationFailed(
2996            "valkey version check: INFO missing valkey_version and server_name:valkey marker \
2997             (unsupported backend — FlowFabric requires Valkey >= 7.2; Redis is not supported)"
2998                .into(),
2999        ));
3000    }
3001    // Valkey 7.x fallback. 8.x+ pins redis_version:7.2.4 for Redis-client
3002    // compat, so reading it there would under-report — but 8.x+ is handled
3003    // by the valkey_version branch above, so we only reach here on 7.x.
3004    if let Some(redis_line) = info
3005        .lines()
3006        .find_map(|line| line.strip_prefix("redis_version:"))
3007    {
3008        return extract_major_minor(redis_line);
3009    }
3010    Err(ServerError::OperationFailed(
3011        "valkey version check: INFO has server_name:valkey but no redis_version or valkey_version field"
3012            .into(),
3013    ))
3014}
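
// A minimal test sketch of the three cases the parser distinguishes. The INFO
// bodies are illustrative fragments, not captured server output.
#[cfg(test)]
mod parse_valkey_version_sketch {
    use super::parse_valkey_version;

    #[test]
    fn prefers_valkey_version_over_pinned_redis_version() {
        let body = "server_name:valkey\r\nvalkey_version:8.0.1\r\nredis_version:7.2.4\r\n";
        assert_eq!(parse_valkey_version(body).ok(), Some((8, 0)));
    }

    #[test]
    fn falls_back_to_redis_version_on_valkey_7x() {
        let body = "server_name:valkey\r\nredis_version:7.2.6\r\n";
        assert_eq!(parse_valkey_version(body).ok(), Some((7, 2)));
    }

    #[test]
    fn rejects_a_backend_without_the_valkey_marker() {
        let body = "redis_version:7.2.4\r\n";
        assert!(parse_valkey_version(body).is_err());
    }
}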
3015
3016// ── Partition config validation ──
3017
3018/// Validate or create the `ff:config:partitions` key on first boot.
3019///
3020/// If the key exists, its values must match the server's config.
3021/// If it doesn't exist, create it (first boot).
3022async fn validate_or_create_partition_config(
3023    client: &Client,
3024    config: &PartitionConfig,
3025) -> Result<(), ServerError> {
3026    let key = keys::global_config_partitions();
3027
3028    let existing: HashMap<String, String> = client
3029        .hgetall(&key)
3030        .await
3031        .map_err(|e| crate::server::backend_context(e, format!("HGETALL {key}")))?;
3032
3033    if existing.is_empty() {
3034        // First boot — create the config. A single multi-field HSET keeps
3035        // the write atomic, so a crash partway through cannot leave a
3036        // partial hash that the next boot's validation would reject as a
3037        // mismatch.
3038        tracing::info!("first boot: creating {key}");
3039        let _: i64 = client
3040            .cmd("HSET")
3041            .arg(&key)
3042            .arg("num_flow_partitions").arg(&config.num_flow_partitions.to_string())
3043            .arg("num_budget_partitions").arg(&config.num_budget_partitions.to_string())
3044            .arg("num_quota_partitions").arg(&config.num_quota_partitions.to_string())
3045            .execute()
3046            .await
3047            .map_err(|e| crate::server::backend_context(e, format!("HSET {key} (first-boot partition config)")))?;
3048        return Ok(());
3049    }
3050
3051    // Validate existing config matches
3052    let check = |field: &str, expected: u16| -> Result<(), ServerError> {
3053        let stored: u16 = existing
3054            .get(field)
3055            .and_then(|v| v.parse().ok())
3056            .unwrap_or(0);
3057        if stored != expected {
3058            return Err(ServerError::PartitionMismatch(format!(
3059                "{field}: stored={stored}, config={expected}. \
3060                 Partition counts are fixed at deployment time. \
3061                 Either fix your config or migrate the data."
3062            )));
3063        }
3064        Ok(())
3065    };
3066
3067    check("num_flow_partitions", config.num_flow_partitions)?;
3068    check("num_budget_partitions", config.num_budget_partitions)?;
3069    check("num_quota_partitions", config.num_quota_partitions)?;
3070
3071    tracing::info!("partition config validated against stored {key}");
3072    Ok(())
3073}
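
// Illustrative stored shape (field names from the function above; the counts
// are example values, not defaults):
//   HGETALL ff:config:partitions
//     num_flow_partitions   = "64"
//     num_budget_partitions = "16"
//     num_quota_partitions  = "16"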
3074
3075// ── Waitpoint HMAC secret bootstrap (RFC-004 §Waitpoint Security) ──
3076
3077/// Stable initial kid written on first boot. Rotation promotes to k2, k3, ...
3078/// The kid is stored alongside the secret in every partition's hash so each
3079/// FCALL can self-identify which secret produced a given token.
3080const WAITPOINT_HMAC_INITIAL_KID: &str = "k1";
3081
3082/// Per-partition outcome of the HMAC bootstrap step. Collected across the
3083/// parallel scan so we can emit aggregated logs once at the end.
3084enum PartitionBootOutcome {
3085    /// Partition already had a matching (kid, secret) pair.
3086    Match,
3087    /// Stored secret diverges from env — likely operator rotation; kept.
3088    Mismatch,
3089    /// Torn write (current_kid present, secret:<kid> missing); repaired.
3090    Repaired,
3091    /// Fresh partition; atomically installed env secret under kid=k1.
3092    Installed,
3093}
3094
3095/// Bounded in-flight concurrency for the startup fan-out. Large enough to
3096/// turn a 256-partition install from ~15s sequential into ~1s on cross-AZ
3097/// Valkey, small enough to leave a cold cluster breathing room for other
3098/// Server::start work (library load, engine scanner spawn).
3099const BOOT_INIT_CONCURRENCY: usize = 16;
3100
3101async fn init_one_partition(
3102    client: &Client,
3103    partition: Partition,
3104    secret_hex: &str,
3105) -> Result<PartitionBootOutcome, ServerError> {
3106    let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();
3107
3108    // Probe for an existing install. Fast path (fresh partition): HGET
3109    // returns nil and we fall through to the atomic install below. Slow
3110    // path: secret:<stored_kid> is then HGET'd once we know the kid name.
3111    // (A previous version used a 2-field HMGET that included a fake
3112    // `secret:probe` placeholder — vestigial from an abandoned
3113    // optimization attempt, and confusing to read. Collapsed back to a
3114    // single-field HGET.)
3115    let stored_kid: Option<String> = client
3116        .cmd("HGET")
3117        .arg(&key)
3118        .arg("current_kid")
3119        .execute()
3120        .await
3121        .map_err(|e| crate::server::backend_context(e, format!("HGET {key} current_kid (init probe)")))?;
3122
3123    if let Some(stored_kid) = stored_kid {
3124        // We didn't know the stored kid up front, so now HGET the real
3125        // secret:<stored_kid> field. Two round-trips in the slow path; the
3126        // fast path (fresh partition) stays at one.
3127        let field = format!("secret:{stored_kid}");
3128        let stored_secret: Option<String> = client
3129            .hget(&key, &field)
3130            .await
3131            .map_err(|e| crate::server::backend_context(e, format!("HGET {key} secret:<kid> (init check)")))?;
3132        if stored_secret.is_none() {
3133            // Torn write from a previous boot: current_kid present but
3134            // secret:<kid> missing. Without repair, mint returns
3135            // "hmac_secret_not_initialized" on that partition forever.
3136            // Repair in place with env secret. Not rotation — rotation
3137            // always writes the secret first.
3138            client
3139                .hset(&key, &field, secret_hex)
3140                .await
3141                .map_err(|e| crate::server::backend_context(e, format!("HSET {key} secret:<kid> (repair torn write)")))?;
3142            return Ok(PartitionBootOutcome::Repaired);
3143        }
3144        if stored_secret.as_deref() != Some(secret_hex) {
3145            return Ok(PartitionBootOutcome::Mismatch);
3146        }
3147        return Ok(PartitionBootOutcome::Match);
3148    }
3149
3150    // Fresh partition — install current_kid + secret:<kid> atomically in
3151    // one HSET. Multi-field HSET is single-command atomic, so a crash
3152    // can't leave current_kid without its secret.
3153    let secret_field = format!("secret:{WAITPOINT_HMAC_INITIAL_KID}");
3154    let _: i64 = client
3155        .cmd("HSET")
3156        .arg(&key)
3157        .arg("current_kid")
3158        .arg(WAITPOINT_HMAC_INITIAL_KID)
3159        .arg(&secret_field)
3160        .arg(secret_hex)
3161        .execute()
3162        .await
3163        .map_err(|e| crate::server::backend_context(e, format!("HSET {key} (init waitpoint HMAC atomic)")))?;
3164    Ok(PartitionBootOutcome::Installed)
3165}
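
// Illustrative per-partition hash after a fresh install (secret shortened):
//   current_kid = "k1"
//   secret:k1   = "9f2c…"
// The torn-write repair above only ever fills in a missing secret:<kid> field
// for the already-stored current_kid; it never changes which kid is current.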
3166
3167/// Install the waitpoint HMAC secret on every execution partition.
3168///
3169/// Parallelized fan-out with bounded in-flight concurrency
3170/// (`BOOT_INIT_CONCURRENCY`) so 256-partition boots finish in ~1s instead
3171/// of ~15s sequential — the prior sequential loop was tight on K8s
3172/// `initialDelaySeconds=30` defaults, especially cross-AZ. Fail-fast:
3173/// the first per-partition error aborts boot.
3174///
3175/// Outcomes aggregate into mismatch/repaired counts (logged once at end)
3176/// so operators see a single loud warning per fault class instead of 256
3177/// per-partition lines.
3178async fn initialize_waitpoint_hmac_secret(
3179    client: &Client,
3180    partition_config: &PartitionConfig,
3181    secret_hex: &str,
3182) -> Result<(), ServerError> {
3183    use futures::stream::{FuturesUnordered, StreamExt};
3184
3185    let n = partition_config.num_flow_partitions;
3186    tracing::info!(
3187        partitions = n,
3188        concurrency = BOOT_INIT_CONCURRENCY,
3189        "installing waitpoint HMAC secret across {n} execution partitions"
3190    );
3191
3192    let mut mismatch_count: u16 = 0;
3193    let mut repaired_count: u16 = 0;
3194    let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
3195    let mut next_index: u16 = 0;
3196
3197    loop {
3198        while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
3199            let partition = Partition {
3200                family: PartitionFamily::Execution,
3201                index: next_index,
3202            };
3203            let client = client.clone();
3204            let secret_hex = secret_hex.to_owned();
3205            pending.push(async move {
3206                init_one_partition(&client, partition, &secret_hex).await
3207            });
3208            next_index += 1;
3209        }
3210        match pending.next().await {
3211            Some(res) => match res? {
3212                PartitionBootOutcome::Match | PartitionBootOutcome::Installed => {}
3213                PartitionBootOutcome::Mismatch => mismatch_count += 1,
3214                PartitionBootOutcome::Repaired => repaired_count += 1,
3215            },
3216            None => break,
3217        }
3218    }
3219
3220    if repaired_count > 0 {
3221        tracing::warn!(
3222            repaired_partitions = repaired_count,
3223            total_partitions = n,
3224            "repaired {repaired_count} partitions with torn waitpoint HMAC writes \
3225             (current_kid present but secret:<kid> missing, likely crash during prior boot)"
3226        );
3227    }
3228
3229    if mismatch_count > 0 {
3230        tracing::warn!(
3231            mismatched_partitions = mismatch_count,
3232            total_partitions = n,
3233            "stored/env secret mismatch on {mismatch_count} partitions — \
3234             env FF_WAITPOINT_HMAC_SECRET ignored in favor of stored values; \
3235             run POST /v1/admin/rotate-waitpoint-secret to sync"
3236        );
3237    }
3238
3239    tracing::info!(partitions = n, "waitpoint HMAC secret install complete");
3240    Ok(())
3241}
3242
3243/// Result of a waitpoint HMAC secret rotation across all execution partitions.
3244#[derive(Debug, Clone, serde::Serialize)]
3245pub struct RotateWaitpointSecretResult {
3246    /// Count of partitions that accepted the rotation.
3247    pub rotated: u16,
3248    /// Partition indices that failed — operator should investigate (Valkey
3249    /// outage, auth failure, cluster split). Rotation is idempotent, so a
3250    /// re-run after the underlying fault clears converges to the correct
3251    /// state.
3252    pub failed: Vec<u16>,
3253    /// New kid installed as current.
3254    pub new_kid: String,
3255}
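
// Illustrative serialized response body (example values): a 256-partition
// rotation in which a single partition failed and should be re-run:
//   {"rotated": 255, "failed": [17], "new_kid": "k2"}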
3256
3257impl Server {
3258    /// Rotate the waitpoint HMAC secret. Promotes the current kid to previous
3259    /// (accepted within `FF_WAITPOINT_HMAC_GRACE_MS`), installs `new_secret_hex`
3260    /// as the new current kid. Idempotent: re-running with the same `new_kid`
3261    /// and `new_secret_hex` converges partitions to the same state.
3262    ///
3263    /// Returns a structured result so operators can see which partitions failed.
3264    /// HTTP layer returns 200 if any partition succeeded, 500 only if all fail.
3265    pub async fn rotate_waitpoint_secret(
3266        &self,
3267        new_kid: &str,
3268        new_secret_hex: &str,
3269    ) -> Result<RotateWaitpointSecretResult, ServerError> {
3270        if new_kid.is_empty() || new_kid.contains(':') {
3271            return Err(ServerError::OperationFailed(
3272                "new_kid must be non-empty and must not contain ':'".into(),
3273            ));
3274        }
3275        if new_secret_hex.is_empty()
3276            || !new_secret_hex.len().is_multiple_of(2)
3277            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
3278        {
3279            return Err(ServerError::OperationFailed(
3280                "new_secret_hex must be a non-empty even-length hex string".into(),
3281            ));
3282        }
3283
3284        // Single-writer gate. Concurrent rotates against the SAME operator
3285        // token are an attack pattern (or a retry-loop bug); legitimate
3286        // operators rotate monthly and can afford to serialize. Contention
3287        // returns ConcurrencyLimitExceeded("admin_rotate", 1) (→ HTTP 429
3288        // with a labelled error body) rather than queueing the HTTP
3289        // handler past the 120s endpoint timeout. Permit is held for
3290        // the full partition fan-out.
3291        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
3292            Ok(p) => p,
3293            Err(tokio::sync::TryAcquireError::NoPermits) => {
3294                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
3295            }
3296            Err(tokio::sync::TryAcquireError::Closed) => {
3297                return Err(ServerError::OperationFailed(
3298                    "admin rotate semaphore closed (server shutting down)".into(),
3299                ));
3300            }
3301        };
3302
3303        let n = self.config.partition_config.num_flow_partitions;
3304        // "now" is derived inside the FCALL from `redis.call("TIME")`
3305        // (consistency with validate_waitpoint_token and flow scanners);
3306        // grace_ms is a duration — safe to carry from config.
3307        let grace_ms = self.config.waitpoint_hmac_grace_ms;
3308
3309        // Parallelize the rotation fan-out with the same bounded
3310        // concurrency as boot init (BOOT_INIT_CONCURRENCY = 16). A 256-
3311        // partition sequential rotation takes ~7.7s at 30ms cross-AZ RTT,
3312        // uncomfortably close to the 120s HTTP endpoint timeout under
3313        // contention. Atomicity per partition now lives inside the
3314        // `ff_rotate_waitpoint_hmac_secret` FCALL (FCALL is atomic per
3315        // shard); parallelism across DIFFERENT partitions is safe. The
3316        // outer `admin_rotate_semaphore(1)` bounds server-wide concurrent
3317        // rotations, so this fan-out only affects a single in-flight
3318        // rotate call at a time.
3319        use futures::stream::{FuturesUnordered, StreamExt};
3320
3321        let mut rotated = 0u16;
3322        let mut failed = Vec::new();
3323        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
3324        let mut next_index: u16 = 0;
3325
3326        loop {
3327            while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
3328                let partition = Partition {
3329                    family: PartitionFamily::Execution,
3330                    index: next_index,
3331                };
3332                let idx = next_index;
3333                // Clone only what the per-partition future needs. These
3334                // futures are polled in place (never spawned), so 'static
3335                // isn't required; owning the strings just keeps each future
3336                // self-contained instead of borrowing the argument slices.
3337                let new_kid_owned = new_kid.to_owned();
3338                let new_secret_owned = new_secret_hex.to_owned();
3339                let partition_owned = partition;
3340                let fut = async move {
3341                    let outcome = self
3342                        .rotate_single_partition(
3343                            &partition_owned,
3344                            &new_kid_owned,
3345                            &new_secret_owned,
3346                            grace_ms,
3347                        )
3348                        .await;
3349                    (idx, partition_owned, outcome)
3350                };
3351                pending.push(fut);
3352                next_index += 1;
3353            }
3354            match pending.next().await {
3355                Some((idx, partition, outcome)) => match outcome {
3356                    Ok(()) => {
3357                        rotated += 1;
3358                        // Per-partition event → DEBUG (not INFO). Rationale:
3359                        // one rotate endpoint call produces 256 partition-level
3360                        // events, which would blow up paid aggregator budgets
3361                        // (Datadog/Splunk) at no operational value. The single
3362                        // aggregated audit event below is the compliance
3363                        // artifact. Failures stay at ERROR with per-partition
3364                        // detail — that's where operators need it.
3365                        tracing::debug!(
3366                            partition = %partition,
3367                            new_kid = %new_kid,
3368                            "waitpoint_hmac_rotated"
3369                        );
3370                    }
3371                    Err(e) => {
3372                        // Failures stay at ERROR (target=audit) per-partition —
3373                        // operators need the partition index + error to debug
3374                        // Valkey/config faults. Low cardinality in practice.
3375                        tracing::error!(
3376                            target: "audit",
3377                            partition = %partition,
3378                            err = %e,
3379                            "waitpoint_hmac_rotation_failed"
3380                        );
3381                        failed.push(idx);
3382                    }
3383                },
3384                None => break,
3385            }
3386        }
3387
3388        // Single aggregated audit event for the whole rotation. This is
3389        // the load-bearing compliance artifact — operators alert on
3390        // target="audit" at INFO level and this is the stable schema.
3391        tracing::info!(
3392            target: "audit",
3393            new_kid = %new_kid,
3394            total_partitions = n,
3395            rotated,
3396            failed_count = failed.len(),
3397            "waitpoint_hmac_rotation_complete"
3398        );
3399
3400        Ok(RotateWaitpointSecretResult {
3401            rotated,
3402            failed,
3403            new_kid: new_kid.to_owned(),
3404        })
3405    }
3406
3407    /// Rotate on a single partition by dispatching the
3408    /// `ff_rotate_waitpoint_hmac_secret` FCALL. FCALL is atomic per shard,
3409    /// so no external SETNX lock is needed — the script itself IS the
3410    /// atomicity boundary. Single source of truth (Lua); the Rust
3411    /// implementation that previously lived here was an exact duplicate
3412    /// and has been removed.
3413    async fn rotate_single_partition(
3414        &self,
3415        partition: &Partition,
3416        new_kid: &str,
3417        new_secret_hex: &str,
3418        grace_ms: u64,
3419    ) -> Result<(), ServerError> {
3420        let idx = IndexKeys::new(partition);
3421        let args = RotateWaitpointHmacSecretArgs {
3422            new_kid: new_kid.to_owned(),
3423            new_secret_hex: new_secret_hex.to_owned(),
3424            grace_ms,
3425        };
3426        let outcome = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
3427            &self.client,
3428            &idx,
3429            &args,
3430        )
3431        .await
3432        .map_err(|e| match e {
3433            // Same kid + different secret. Map to the same 409-style
3434            // error the old Rust path returned so HTTP callers keep the
3435            // current surface.
3436            ff_script::ScriptError::RotationConflict(kid) => {
3437                ServerError::OperationFailed(format!(
3438                    "rotation conflict: kid {kid} already installed with a \
3439                     different secret. Either use a fresh kid or restore the \
3440                     original secret for this kid before retrying."
3441                ))
3442            }
3443            ff_script::ScriptError::Valkey(v) => crate::server::backend_context(
3444                v,
3445                format!("FCALL ff_rotate_waitpoint_hmac_secret partition={partition}"),
3446            ),
3447            other => ServerError::OperationFailed(format!(
3448                "rotation failed on partition {partition}: {other}"
3449            )),
3450        })?;
3451        // Either outcome is a successful write from the operator's POV.
3452        // Rotated → new install; Noop → idempotent replay.
3453        let _ = outcome;
3454        Ok(())
3455    }
3456}
3457
3458// ── FCALL result parsing ──
3459
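// Shared reply convention for the parsers below (sub-statuses and error codes
// vary per function; shapes shown are illustrative):
//   success: {1, "OK"} or {1, "<sub_status>"}   e.g. {1, "DUPLICATE"}
//   failure: {0, "<error_code>"}
// Element 0 is an integer status; element 1 is a bulk or simple string.
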
3460fn parse_create_result(
3461    raw: &Value,
3462    execution_id: &ExecutionId,
3463) -> Result<CreateExecutionResult, ServerError> {
3464    let arr = match raw {
3465        Value::Array(arr) => arr,
3466        _ => return Err(ServerError::Script("ff_create_execution: expected Array".into())),
3467    };
3468
3469    let status = match arr.first() {
3470        Some(Ok(Value::Int(n))) => *n,
3471        _ => return Err(ServerError::Script("ff_create_execution: bad status code".into())),
3472    };
3473
3474    if status == 1 {
3475        // Check sub-status: OK or DUPLICATE
3476        let sub = arr
3477            .get(1)
3478            .and_then(|v| match v {
3479                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3480                Ok(Value::SimpleString(s)) => Some(s.clone()),
3481                _ => None,
3482            })
3483            .unwrap_or_default();
3484
3485        if sub == "DUPLICATE" {
3486            Ok(CreateExecutionResult::Duplicate {
3487                execution_id: execution_id.clone(),
3488            })
3489        } else {
3490            Ok(CreateExecutionResult::Created {
3491                execution_id: execution_id.clone(),
3492                public_state: PublicState::Waiting,
3493            })
3494        }
3495    } else {
3496        let error_code = arr
3497            .get(1)
3498            .and_then(|v| match v {
3499                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3500                Ok(Value::SimpleString(s)) => Some(s.clone()),
3501                _ => None,
3502            })
3503            .unwrap_or_else(|| "unknown".to_owned());
3504        Err(ServerError::OperationFailed(format!(
3505            "ff_create_execution failed: {error_code}"
3506        )))
3507    }
3508}
3509
3510fn parse_cancel_result(
3511    raw: &Value,
3512    execution_id: &ExecutionId,
3513) -> Result<CancelExecutionResult, ServerError> {
3514    let arr = match raw {
3515        Value::Array(arr) => arr,
3516        _ => return Err(ServerError::Script("ff_cancel_execution: expected Array".into())),
3517    };
3518
3519    let status = match arr.first() {
3520        Some(Ok(Value::Int(n))) => *n,
3521        _ => return Err(ServerError::Script("ff_cancel_execution: bad status code".into())),
3522    };
3523
3524    if status == 1 {
3525        Ok(CancelExecutionResult::Cancelled {
3526            execution_id: execution_id.clone(),
3527            public_state: PublicState::Cancelled,
3528        })
3529    } else {
3530        let error_code = arr
3531            .get(1)
3532            .and_then(|v| match v {
3533                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3534                Ok(Value::SimpleString(s)) => Some(s.clone()),
3535                _ => None,
3536            })
3537            .unwrap_or_else(|| "unknown".to_owned());
3538        Err(ServerError::OperationFailed(format!(
3539            "ff_cancel_execution failed: {error_code}"
3540        )))
3541    }
3542}
3543
3544fn parse_budget_create_result(
3545    raw: &Value,
3546    budget_id: &BudgetId,
3547) -> Result<CreateBudgetResult, ServerError> {
3548    let arr = match raw {
3549        Value::Array(arr) => arr,
3550        _ => return Err(ServerError::Script("ff_create_budget: expected Array".into())),
3551    };
3552
3553    let status = match arr.first() {
3554        Some(Ok(Value::Int(n))) => *n,
3555        _ => return Err(ServerError::Script("ff_create_budget: bad status code".into())),
3556    };
3557
3558    if status == 1 {
3559        let sub = arr
3560            .get(1)
3561            .and_then(|v| match v {
3562                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3563                Ok(Value::SimpleString(s)) => Some(s.clone()),
3564                _ => None,
3565            })
3566            .unwrap_or_default();
3567
3568        if sub == "ALREADY_SATISFIED" {
3569            Ok(CreateBudgetResult::AlreadySatisfied {
3570                budget_id: budget_id.clone(),
3571            })
3572        } else {
3573            Ok(CreateBudgetResult::Created {
3574                budget_id: budget_id.clone(),
3575            })
3576        }
3577    } else {
3578        let error_code = arr
3579            .get(1)
3580            .and_then(|v| match v {
3581                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3582                Ok(Value::SimpleString(s)) => Some(s.clone()),
3583                _ => None,
3584            })
3585            .unwrap_or_else(|| "unknown".to_owned());
3586        Err(ServerError::OperationFailed(format!(
3587            "ff_create_budget failed: {error_code}"
3588        )))
3589    }
3590}
3591
3592fn parse_quota_create_result(
3593    raw: &Value,
3594    quota_policy_id: &QuotaPolicyId,
3595) -> Result<CreateQuotaPolicyResult, ServerError> {
3596    let arr = match raw {
3597        Value::Array(arr) => arr,
3598        _ => return Err(ServerError::Script("ff_create_quota_policy: expected Array".into())),
3599    };
3600
3601    let status = match arr.first() {
3602        Some(Ok(Value::Int(n))) => *n,
3603        _ => return Err(ServerError::Script("ff_create_quota_policy: bad status code".into())),
3604    };
3605
3606    if status == 1 {
3607        let sub = arr
3608            .get(1)
3609            .and_then(|v| match v {
3610                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3611                Ok(Value::SimpleString(s)) => Some(s.clone()),
3612                _ => None,
3613            })
3614            .unwrap_or_default();
3615
3616        if sub == "ALREADY_SATISFIED" {
3617            Ok(CreateQuotaPolicyResult::AlreadySatisfied {
3618                quota_policy_id: quota_policy_id.clone(),
3619            })
3620        } else {
3621            Ok(CreateQuotaPolicyResult::Created {
3622                quota_policy_id: quota_policy_id.clone(),
3623            })
3624        }
3625    } else {
3626        let error_code = arr
3627            .get(1)
3628            .and_then(|v| match v {
3629                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3630                Ok(Value::SimpleString(s)) => Some(s.clone()),
3631                _ => None,
3632            })
3633            .unwrap_or_else(|| "unknown".to_owned());
3634        Err(ServerError::OperationFailed(format!(
3635            "ff_create_quota_policy failed: {error_code}"
3636        )))
3637    }
3638}
3639
3640// ── Flow FCALL result parsing ──
3641
3642fn parse_create_flow_result(
3643    raw: &Value,
3644    flow_id: &FlowId,
3645) -> Result<CreateFlowResult, ServerError> {
3646    let arr = match raw {
3647        Value::Array(arr) => arr,
3648        _ => return Err(ServerError::Script("ff_create_flow: expected Array".into())),
3649    };
3650    let status = match arr.first() {
3651        Some(Ok(Value::Int(n))) => *n,
3652        _ => return Err(ServerError::Script("ff_create_flow: bad status code".into())),
3653    };
3654    if status == 1 {
3655        let sub = fcall_field_str(arr, 1);
3656        if sub == "ALREADY_SATISFIED" {
3657            Ok(CreateFlowResult::AlreadySatisfied {
3658                flow_id: flow_id.clone(),
3659            })
3660        } else {
3661            Ok(CreateFlowResult::Created {
3662                flow_id: flow_id.clone(),
3663            })
3664        }
3665    } else {
3666        let error_code = fcall_field_str(arr, 1);
3667        Err(ServerError::OperationFailed(format!(
3668            "ff_create_flow failed: {error_code}"
3669        )))
3670    }
3671}
3672
3673fn parse_add_execution_to_flow_result(
3674    raw: &Value,
3675) -> Result<AddExecutionToFlowResult, ServerError> {
3676    let arr = match raw {
3677        Value::Array(arr) => arr,
3678        _ => {
3679            return Err(ServerError::Script(
3680                "ff_add_execution_to_flow: expected Array".into(),
3681            ))
3682        }
3683    };
3684    let status = match arr.first() {
3685        Some(Ok(Value::Int(n))) => *n,
3686        _ => {
3687            return Err(ServerError::Script(
3688                "ff_add_execution_to_flow: bad status code".into(),
3689            ))
3690        }
3691    };
3692    if status == 1 {
3693        let sub = fcall_field_str(arr, 1);
3694        let eid_str = fcall_field_str(arr, 2);
3695        let nc_str = fcall_field_str(arr, 3);
3696        let eid = ExecutionId::parse(&eid_str)
3697            .map_err(|e| ServerError::Script(format!("bad execution_id: {e}")))?;
3698        let nc: u32 = nc_str.parse().unwrap_or(0);
3699        if sub == "ALREADY_SATISFIED" {
3700            Ok(AddExecutionToFlowResult::AlreadyMember {
3701                execution_id: eid,
3702                node_count: nc,
3703            })
3704        } else {
3705            Ok(AddExecutionToFlowResult::Added {
3706                execution_id: eid,
3707                new_node_count: nc,
3708            })
3709        }
3710    } else {
3711        let error_code = fcall_field_str(arr, 1);
3712        Err(ServerError::OperationFailed(format!(
3713            "ff_add_execution_to_flow failed: {error_code}"
3714        )))
3715    }
3716}
3717
3718/// Outcome of parsing a raw `ff_cancel_flow` FCALL response.
3719///
3720/// Keeps `AlreadyTerminal` distinct from other script errors so the caller
3721/// can treat cancel on an already-cancelled/completed/failed flow as
3722/// idempotent success instead of surfacing a 400 to the client.
3723enum ParsedCancelFlow {
3724    Cancelled {
3725        policy: String,
3726        member_execution_ids: Vec<String>,
3727    },
3728    AlreadyTerminal,
3729}
3730
3731/// Parse the raw `ff_cancel_flow` FCALL response.
3732///
3733/// Returns [`ParsedCancelFlow::Cancelled`] on success, [`ParsedCancelFlow::AlreadyTerminal`]
3734/// when the flow was already in a terminal state (idempotent retry), or a
3735/// [`ServerError`] for any other failure.
3736fn parse_cancel_flow_raw(raw: &Value) -> Result<ParsedCancelFlow, ServerError> {
3737    let arr = match raw {
3738        Value::Array(arr) => arr,
3739        _ => return Err(ServerError::Script("ff_cancel_flow: expected Array".into())),
3740    };
3741    let status = match arr.first() {
3742        Some(Ok(Value::Int(n))) => *n,
3743        _ => return Err(ServerError::Script("ff_cancel_flow: bad status code".into())),
3744    };
3745    if status != 1 {
3746        let error_code = fcall_field_str(arr, 1);
3747        if error_code == "flow_already_terminal" {
3748            return Ok(ParsedCancelFlow::AlreadyTerminal);
3749        }
3750        return Err(ServerError::OperationFailed(format!(
3751            "ff_cancel_flow failed: {error_code}"
3752        )));
3753    }
3754    // {1, "OK", cancellation_policy, member1, member2, ...}
3755    let policy = fcall_field_str(arr, 2);
3756    // Iterate to arr.len() rather than breaking on the first empty string —
3757    // safer against malformed Lua responses and clearer than a sentinel loop.
3758    let mut members = Vec::with_capacity(arr.len().saturating_sub(3));
3759    for i in 3..arr.len() {
3760        members.push(fcall_field_str(arr, i));
3761    }
3762    Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids: members })
3763}
3764
3765fn parse_stage_dependency_edge_result(
3766    raw: &Value,
3767) -> Result<StageDependencyEdgeResult, ServerError> {
3768    let arr = match raw {
3769        Value::Array(arr) => arr,
3770        _ => return Err(ServerError::Script("ff_stage_dependency_edge: expected Array".into())),
3771    };
3772    let status = match arr.first() {
3773        Some(Ok(Value::Int(n))) => *n,
3774        _ => return Err(ServerError::Script("ff_stage_dependency_edge: bad status code".into())),
3775    };
3776    if status == 1 {
3777        let edge_id_str = fcall_field_str(arr, 2);
3778        let rev_str = fcall_field_str(arr, 3);
3779        let edge_id = EdgeId::parse(&edge_id_str)
3780            .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
3781        let rev: u64 = rev_str.parse().unwrap_or(0);
3782        Ok(StageDependencyEdgeResult::Staged {
3783            edge_id,
3784            new_graph_revision: rev,
3785        })
3786    } else {
3787        let error_code = fcall_field_str(arr, 1);
3788        Err(ServerError::OperationFailed(format!(
3789            "ff_stage_dependency_edge failed: {error_code}"
3790        )))
3791    }
3792}
3793
3794fn parse_apply_dependency_result(
3795    raw: &Value,
3796) -> Result<ApplyDependencyToChildResult, ServerError> {
3797    let arr = match raw {
3798        Value::Array(arr) => arr,
3799        _ => return Err(ServerError::Script("ff_apply_dependency_to_child: expected Array".into())),
3800    };
3801    let status = match arr.first() {
3802        Some(Ok(Value::Int(n))) => *n,
3803        _ => return Err(ServerError::Script("ff_apply_dependency_to_child: bad status code".into())),
3804    };
3805    if status == 1 {
3806        let sub = fcall_field_str(arr, 1);
3807        if sub == "ALREADY_APPLIED" || sub == "already_applied" {
3808            Ok(ApplyDependencyToChildResult::AlreadyApplied)
3809        } else {
3810            // OK status — field at index 2 is unsatisfied count
3811            let count_str = fcall_field_str(arr, 2);
3812            let count: u32 = count_str.parse().unwrap_or(0);
3813            Ok(ApplyDependencyToChildResult::Applied {
3814                unsatisfied_count: count,
3815            })
3816        }
3817    } else {
3818        let error_code = fcall_field_str(arr, 1);
3819        Err(ServerError::OperationFailed(format!(
3820            "ff_apply_dependency_to_child failed: {error_code}"
3821        )))
3822    }
3823}
3824
3825fn parse_deliver_signal_result(
3826    raw: &Value,
3827    signal_id: &SignalId,
3828) -> Result<DeliverSignalResult, ServerError> {
3829    let arr = match raw {
3830        Value::Array(arr) => arr,
3831        _ => return Err(ServerError::Script("ff_deliver_signal: expected Array".into())),
3832    };
3833    let status = match arr.first() {
3834        Some(Ok(Value::Int(n))) => *n,
3835        _ => return Err(ServerError::Script("ff_deliver_signal: bad status code".into())),
3836    };
3837    if status == 1 {
3838        let sub = fcall_field_str(arr, 1);
3839        if sub == "DUPLICATE" {
3840            // ok_duplicate(existing_signal_id) → {1, "DUPLICATE", existing_signal_id}
3841            let existing_str = fcall_field_str(arr, 2);
3842            let existing_id = SignalId::parse(&existing_str).unwrap_or_else(|_| signal_id.clone());
3843            Ok(DeliverSignalResult::Duplicate {
3844                existing_signal_id: existing_id,
3845            })
3846        } else {
3847            // ok(signal_id, effect) → {1, "OK", signal_id, effect}
3848            let effect = fcall_field_str(arr, 3);
3849            Ok(DeliverSignalResult::Accepted {
3850                signal_id: signal_id.clone(),
3851                effect,
3852            })
3853        }
3854    } else {
3855        let error_code = fcall_field_str(arr, 1);
3856        Err(ServerError::OperationFailed(format!(
3857            "ff_deliver_signal failed: {error_code}"
3858        )))
3859    }
3860}
3861
3862fn parse_change_priority_result(
3863    raw: &Value,
3864    execution_id: &ExecutionId,
3865) -> Result<ChangePriorityResult, ServerError> {
3866    let arr = match raw {
3867        Value::Array(arr) => arr,
3868        _ => return Err(ServerError::Script("ff_change_priority: expected Array".into())),
3869    };
3870    let status = match arr.first() {
3871        Some(Ok(Value::Int(n))) => *n,
3872        _ => return Err(ServerError::Script("ff_change_priority: bad status code".into())),
3873    };
3874    if status == 1 {
3875        Ok(ChangePriorityResult::Changed {
3876            execution_id: execution_id.clone(),
3877        })
3878    } else {
3879        let error_code = fcall_field_str(arr, 1);
3880        Err(ServerError::OperationFailed(format!(
3881            "ff_change_priority failed: {error_code}"
3882        )))
3883    }
3884}
3885
3886fn parse_replay_result(raw: &Value) -> Result<ReplayExecutionResult, ServerError> {
3887    let arr = match raw {
3888        Value::Array(arr) => arr,
3889        _ => return Err(ServerError::Script("ff_replay_execution: expected Array".into())),
3890    };
3891    let status = match arr.first() {
3892        Some(Ok(Value::Int(n))) => *n,
3893        _ => return Err(ServerError::Script("ff_replay_execution: bad status code".into())),
3894    };
3895    if status == 1 {
3896        // ok("0") for normal replay, ok(N) for skipped flow member
3897        let unsatisfied = fcall_field_str(arr, 2);
3898        let ps = if unsatisfied == "0" {
3899            PublicState::Waiting
3900        } else {
3901            PublicState::WaitingChildren
3902        };
3903        Ok(ReplayExecutionResult::Replayed { public_state: ps })
3904    } else {
3905        let error_code = fcall_field_str(arr, 1);
3906        Err(ServerError::OperationFailed(format!(
3907            "ff_replay_execution failed: {error_code}"
3908        )))
3909    }
3910}
3911
3913/// Convert a `ScriptError` into a `ServerError` preserving `ferriskey::ErrorKind`
3914/// for transport-level variants. Business-logic variants keep their code as
3915/// `ServerError::Script(String)` so HTTP clients see a stable message.
3916///
3917/// Why this exists: before R2, the stream handlers did
3918/// `ScriptError → format!() → ServerError::Script(String)`, which erased
3919/// the ErrorKind and made `ServerError::is_retryable()` always return
3920/// false. Retry-capable clients (cairn-fabric) would not retry a legit
3921/// transient error like `IoError`.
3922fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
3923    match e {
3924        ff_script::error::ScriptError::Valkey(valkey_err) => {
3925            crate::server::backend_context(valkey_err, "stream FCALL transport")
3926        }
3927        other => ServerError::Script(other.to_string()),
3928    }
3929}
3930
/// Extract a string from an FCALL result array at the given index.
3931fn fcall_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
3932    match arr.get(index) {
3933        Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
3934        Some(Ok(Value::SimpleString(s))) => s.clone(),
3935        Some(Ok(Value::Int(n))) => n.to_string(),
3936        _ => String::new(),
3937    }
3938}
3939
3940/// Parse ff_report_usage_and_check result.
3941/// Standard format: {1, "OK"}, {1, "SOFT_BREACH", dim, current, limit},
3942///                  {1, "HARD_BREACH", dim, current, limit}, {1, "ALREADY_APPLIED"}
3943fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, ServerError> {
3944    let arr = match raw {
3945        Value::Array(arr) => arr,
3946        _ => return Err(ServerError::Script("ff_report_usage_and_check: expected Array".into())),
3947    };
3948    let status_code = match arr.first() {
3949        Some(Ok(Value::Int(n))) => *n,
3950        _ => {
3951            return Err(ServerError::Script(
3952                "ff_report_usage_and_check: expected Int status code".into(),
3953            ))
3954        }
3955    };
3956    if status_code != 1 {
3957        let error_code = fcall_field_str(arr, 1);
3958        return Err(ServerError::OperationFailed(format!(
3959            "ff_report_usage_and_check failed: {error_code}"
3960        )));
3961    }
3962    let sub_status = fcall_field_str(arr, 1);
3963    match sub_status.as_str() {
3964        "OK" => Ok(ReportUsageResult::Ok),
3965        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
3966        "SOFT_BREACH" => {
3967            let dim = fcall_field_str(arr, 2);
3968            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3969            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3970            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
3971        }
3972        "HARD_BREACH" => {
3973            let dim = fcall_field_str(arr, 2);
3974            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3975            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3976            Ok(ReportUsageResult::HardBreach {
3977                dimension: dim,
3978                current_usage: current,
3979                hard_limit: limit,
3980            })
3981        }
3982        _ => Err(ServerError::OperationFailed(format!(
3983            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
3984        ))),
3985    }
3986}
3987
3988fn parse_revoke_lease_result(raw: &Value) -> Result<RevokeLeaseResult, ServerError> {
3989    let arr = match raw {
3990        Value::Array(arr) => arr,
3991        _ => return Err(ServerError::Script("ff_revoke_lease: expected Array".into())),
3992    };
3993    let status = match arr.first() {
3994        Some(Ok(Value::Int(n))) => *n,
3995        _ => return Err(ServerError::Script("ff_revoke_lease: bad status code".into())),
3996    };
3997    if status == 1 {
3998        let sub = fcall_field_str(arr, 1);
3999        if sub == "ALREADY_SATISFIED" {
4000            let reason = fcall_field_str(arr, 2);
4001            Ok(RevokeLeaseResult::AlreadySatisfied { reason })
4002        } else {
4003            let lid = fcall_field_str(arr, 2);
4004            let epoch = fcall_field_str(arr, 3);
4005            Ok(RevokeLeaseResult::Revoked {
4006                lease_id: lid,
4007                lease_epoch: epoch,
4008            })
4009        }
4010    } else {
4011        let error_code = fcall_field_str(arr, 1);
4012        Err(ServerError::OperationFailed(format!(
4013            "ff_revoke_lease failed: {error_code}"
4014        )))
4015    }
4016}
4017
4018/// Detect Valkey errors indicating the Lua function library is not loaded.
4019///
4020/// After a failover, the new primary may not have the library if replication
4021/// was incomplete. Valkey returns `ERR Function not loaded` for FCALL calls
4022/// targeting missing functions.
4023fn is_function_not_loaded(e: &ferriskey::Error) -> bool {
4024    if matches!(e.kind(), ferriskey::ErrorKind::NoScriptError) {
4025        return true;
4026    }
4027    e.detail()
4028        .map(|d| {
4029            d.contains("Function not loaded")
4030                || d.contains("No matching function")
4031                || d.contains("function not found")
4032        })
4033        .unwrap_or(false)
4034        || e.to_string().contains("Function not loaded")
4035}
4036
4037/// Free-function form of [`Server::fcall_with_reload`] — callable from
4038/// background tasks that own a cloned `Client` but no `&Server`.
4039async fn fcall_with_reload_on_client(
4040    client: &Client,
4041    function: &str,
4042    keys: &[&str],
4043    args: &[&str],
4044) -> Result<Value, ServerError> {
4045    match client.fcall(function, keys, args).await {
4046        Ok(v) => Ok(v),
4047        Err(e) if is_function_not_loaded(&e) => {
4048            tracing::warn!(function, "Lua library not found on server, reloading");
4049            ff_script::loader::ensure_library(client)
4050                .await
4051                .map_err(ServerError::LibraryLoad)?;
4052            client
4053                .fcall(function, keys, args)
4054                .await
4055                .map_err(ServerError::from)
4056        }
4057        Err(e) => Err(ServerError::from(e)),
4058    }
4059}
4060
4061/// Build the `ff_cancel_execution` KEYS (21) and ARGV (5) by pre-reading
4062/// dynamic fields from `exec_core`. Shared by [`Server::cancel_execution`]
4063/// and the async cancel_flow member-dispatch path.
4064async fn build_cancel_execution_fcall(
4065    client: &Client,
4066    partition_config: &PartitionConfig,
4067    args: &CancelExecutionArgs,
4068) -> Result<(Vec<String>, Vec<String>), ServerError> {
4069    let partition = execution_partition(&args.execution_id, partition_config);
4070    let ctx = ExecKeyContext::new(&partition, &args.execution_id);
4071    let idx = IndexKeys::new(&partition);
4072
4073    let lane_str: Option<String> = client
4074        .hget(&ctx.core(), "lane_id")
4075        .await
4076        .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
4077    let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
4078
4079    let dyn_fields: Vec<Option<String>> = client
4080        .cmd("HMGET")
4081        .arg(ctx.core())
4082        .arg("current_attempt_index")
4083        .arg("current_waitpoint_id")
4084        .arg("current_worker_instance_id")
4085        .execute()
4086        .await
4087        .map_err(|e| crate::server::backend_context(e, "HMGET cancel pre-read"))?;
4088
4089    let att_idx_val = dyn_fields.first()
4090        .and_then(|v| v.as_ref())
4091        .and_then(|s| s.parse::<u32>().ok())
4092        .unwrap_or(0);
4093    let att_idx = AttemptIndex::new(att_idx_val);
4094    let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4095    let wp_id = if wp_id_str.is_empty() {
4096        WaitpointId::new()
4097    } else {
4098        WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
4099    };
4100    let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4101    let wiid = WorkerInstanceId::new(&wiid_str);
4102
4103    let keys: Vec<String> = vec![
4104        ctx.core(),                              // 1
4105        ctx.attempt_hash(att_idx),               // 2
4106        ctx.stream_meta(att_idx),                // 3
4107        ctx.lease_current(),                     // 4
4108        ctx.lease_history(),                     // 5
4109        idx.lease_expiry(),                      // 6
4110        idx.worker_leases(&wiid),                // 7
4111        ctx.suspension_current(),                // 8
4112        ctx.waitpoint(&wp_id),                   // 9
4113        ctx.waitpoint_condition(&wp_id),         // 10
4114        idx.suspension_timeout(),                // 11
4115        idx.lane_terminal(&lane),                // 12
4116        idx.attempt_timeout(),                   // 13
4117        idx.execution_deadline(),                // 14
4118        idx.lane_eligible(&lane),                // 15
4119        idx.lane_delayed(&lane),                 // 16
4120        idx.lane_blocked_dependencies(&lane),    // 17
4121        idx.lane_blocked_budget(&lane),          // 18
4122        idx.lane_blocked_quota(&lane),           // 19
4123        idx.lane_blocked_route(&lane),           // 20
4124        idx.lane_blocked_operator(&lane),        // 21
4125    ];
4126    let argv: Vec<String> = vec![
4127        args.execution_id.to_string(),
4128        args.reason.clone(),
4129        args.source.to_string(),
4130        args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
4131        args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
4132    ];
4133    Ok((keys, argv))
4134}
4135
4136/// Backoff schedule for transient Valkey errors during async cancel_flow
4137/// dispatch. Length = retry-attempt count (including the initial attempt).
4138/// The last entry is not slept on because it's the final attempt.
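/// Worked example with this schedule: attempt 1 fails and sleeps 100 ms,
/// attempt 2 fails and sleeps 500 ms, attempt 3 fails and the error is
/// returned without sleeping the final 2_000 ms entry (see the retry loop in
/// [`cancel_member_execution`]).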
4139const CANCEL_MEMBER_RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2_000];
4140
4141/// Reach into a `ServerError` for a transport-layer retryability hint.
4142/// Matches the variants that can carry a backend transport fault:
4143/// direct `Backend`, `BackendContext`, and `LibraryLoad` (which
4144/// wraps a ferriskey error internally).
4145///
4146/// Renamed from `extract_valkey_kind` in #88 — the previous return
4147/// type `Option<ferriskey::ErrorKind>` leaked ferriskey into the
4148/// server's internal retry logic. Callers now get a backend-agnostic
4149/// [`BackendErrorKind`] and dispatch via
4150/// [`BackendErrorKind::is_retryable`].
4151fn extract_backend_kind(e: &ServerError) -> Option<ff_core::BackendErrorKind> {
4152    e.backend_kind()
4153}
4154
4165/// Acknowledge that a member cancel has committed. Fires
4166/// `ff_ack_cancel_member` on `{fp:N}` to SREM the execution from the
4167/// flow's `pending_cancels` set and, if empty, ZREM the flow from the
4168/// partition-level `cancel_backlog`. Best-effort — failures are logged
4169/// but not propagated, since the reconciler will catch anything that
4170/// stays behind on its next pass.
4171async fn ack_cancel_member(
4172    client: &Client,
4173    pending_cancels_key: &str,
4174    cancel_backlog_key: &str,
4175    eid_str: &str,
4176    flow_id: &str,
4177) {
4178    let keys = [pending_cancels_key, cancel_backlog_key];
4179    let args_v = [eid_str, flow_id];
4180    let res: Result<Value, _> =
4181        client.fcall("ff_ack_cancel_member", &keys, &args_v).await;
4182    if let Err(e) = res {
4183        tracing::warn!(
4184            flow_id = %flow_id,
4185            execution_id = %eid_str,
4186            error = %e,
4187            "ff_ack_cancel_member failed; reconciler will drain on next pass"
4188        );
4189    }
4190}
4191
4192/// Returns true if a cancel_member failure reflects an already-terminal
4193/// (or never-existed) execution, which from the flow-cancel perspective
4194/// is ack-worthy success rather than a partial failure. The Lua
4195/// `ff_cancel_execution` function emits `execution_not_active` when the
4196/// member is already in a terminal phase, and `execution_not_found` when
4197/// the key is gone. Both codes arrive here wrapped in
4198/// `ServerError::OperationFailed("ff_cancel_execution failed: <code>")`
4199/// via `parse_cancel_result`.
4200fn is_terminal_ack_error(err: &ServerError) -> bool {
4201    match err {
4202        ServerError::OperationFailed(msg) => {
4203            msg.contains("execution_not_active") || msg.contains("execution_not_found")
4204        }
4205        _ => false,
4206    }
4207}
4208
/// Cancel a single member execution from a cancel_flow dispatch context.
/// Parses the flow-member EID string, builds the FCALL via the shared helper,
/// and executes with the same reload-on-failover semantics as the inline path.
///
/// Wrapped in a bounded retry loop (see [`CANCEL_MEMBER_RETRY_DELAYS_MS`]) so
/// that transient Valkey errors mid-dispatch (failover, `TryAgain`,
/// `ClusterDown`, `IoError`, `FatalSendError`) do not silently leak
/// non-cancelled members. `FatalReceiveError` and non-retryable kinds bubble
/// up immediately — those either indicate the Lua ran server-side anyway or a
/// permanent mismatch that retries cannot fix.
4209async fn cancel_member_execution(
4210    client: &Client,
4211    partition_config: &PartitionConfig,
4212    eid_str: &str,
4213    reason: &str,
4214    now: TimestampMs,
4215) -> Result<(), ServerError> {
4216    let execution_id = ExecutionId::parse(eid_str)
4217        .map_err(|e| ServerError::InvalidInput(format!("bad execution_id '{eid_str}': {e}")))?;
4218    let args = CancelExecutionArgs {
4219        execution_id: execution_id.clone(),
4220        reason: reason.to_owned(),
4221        source: CancelSource::OperatorOverride,
4222        lease_id: None,
4223        lease_epoch: None,
4224        attempt_id: None,
4225        now,
4226    };
4227
4228    let attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
4229    for (attempt_idx, delay_ms) in CANCEL_MEMBER_RETRY_DELAYS_MS.iter().enumerate() {
4230        let is_last = attempt_idx + 1 == attempts;
4231        match try_cancel_member_once(client, partition_config, &args).await {
4232            Ok(()) => return Ok(()),
4233            Err(e) => {
4234                // Only retry transport-layer transients; business-logic
4235                // errors (Script / OperationFailed / NotFound / InvalidInput)
4236                // won't change on retry.
4237                let retryable = extract_backend_kind(&e)
4238                    .map(|k| k.is_retryable())
4239                    .unwrap_or(false);
4240                if !retryable || is_last {
4241                    return Err(e);
4242                }
4243                tracing::debug!(
4244                    execution_id = %execution_id,
4245                    attempt = attempt_idx + 1,
4246                    delay_ms = *delay_ms,
4247                    error = %e,
4248                    "cancel_member_execution: transient error, retrying"
4249                );
4250                tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
4251            }
4252        }
4253    }
4254    // Unreachable while CANCEL_MEMBER_RETRY_DELAYS_MS is non-empty: the loop
4255    // above either returns Ok, returns Err on the last attempt, or returns Err
4256    // on a non-retryable error. Kept as a defensive fallback for future edits.
4257    Err(ServerError::OperationFailed(format!(
4258        "cancel_member_execution: retries exhausted for {execution_id}"
4259    )))
4260}
4261
4262/// Single cancel attempt — pre-read + FCALL + parse. Factored out so the
4263/// retry loop in [`cancel_member_execution`] can invoke it cleanly.
4264async fn try_cancel_member_once(
4265    client: &Client,
4266    partition_config: &PartitionConfig,
4267    args: &CancelExecutionArgs,
4268) -> Result<(), ServerError> {
4269    let (keys, argv) = build_cancel_execution_fcall(client, partition_config, args).await?;
4270    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
4271    let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
4272    let raw =
4273        fcall_with_reload_on_client(client, "ff_cancel_execution", &key_refs, &arg_refs).await?;
4274    parse_cancel_result(&raw, &args.execution_id).map(|_| ())
4275}
4276
4277fn parse_reset_budget_result(raw: &Value) -> Result<ResetBudgetResult, ServerError> {
4278    let arr = match raw {
4279        Value::Array(arr) => arr,
4280        _ => return Err(ServerError::Script("ff_reset_budget: expected Array".into())),
4281    };
4282    let status = match arr.first() {
4283        Some(Ok(Value::Int(n))) => *n,
4284        _ => return Err(ServerError::Script("ff_reset_budget: bad status code".into())),
4285    };
4286    if status == 1 {
4287        let next_str = fcall_field_str(arr, 2);
4288        let next_ms: i64 = next_str.parse().unwrap_or(0);
4289        Ok(ResetBudgetResult::Reset {
4290            next_reset_at: TimestampMs::from_millis(next_ms),
4291        })
4292    } else {
4293        let error_code = fcall_field_str(arr, 1);
4294        Err(ServerError::OperationFailed(format!(
4295            "ff_reset_budget failed: {error_code}"
4296        )))
4297    }
4298}
4299
4300#[cfg(test)]
4301mod tests {
4302    use super::*;
4303    use ferriskey::ErrorKind;
4304
4305    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
4306        ferriskey::Error::from((kind, "synthetic"))
4307    }
4308
4309    // ── Budget dimension-cap validation (issue #104) ──
4310
4311    #[test]
4312    fn create_budget_rejects_over_cap_dimension_count() {
4313        let n = MAX_BUDGET_DIMENSIONS + 1;
4314        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4315        let hard = vec![1u64; n];
4316        let soft = vec![0u64; n];
4317        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
4318        match err {
4319            ServerError::InvalidInput(msg) => {
4320                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
4321                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
4322                assert!(msg.contains(&format!("got={n}")), "got: {msg}");
4323            }
4324            other => panic!("expected InvalidInput, got {other:?}"),
4325        }
4326    }
4327
4328    #[test]
4329    fn create_budget_accepts_exactly_cap_dimensions() {
4330        let n = MAX_BUDGET_DIMENSIONS;
4331        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4332        let hard = vec![1u64; n];
4333        let soft = vec![0u64; n];
4334        assert!(validate_create_budget_dimensions(&dims, &hard, &soft).is_ok());
4335    }
4336
4337    #[test]
4338    fn create_budget_rejects_hard_limit_length_mismatch() {
4339        let dims = vec!["a".to_string(), "b".to_string()];
4340        let hard = vec![1u64]; // too short
4341        let soft = vec![0u64, 0u64];
4342        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
4343        match err {
4344            ServerError::InvalidInput(msg) => {
4345                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
4346                assert!(msg.contains("hard_limits=1"), "got: {msg}");
4347                assert!(msg.contains("dimensions=2"), "got: {msg}");
4348            }
4349            other => panic!("expected InvalidInput, got {other:?}"),
4350        }
4351    }
4352
4353    #[test]
4354    fn create_budget_rejects_soft_limit_length_mismatch() {
4355        let dims = vec!["a".to_string(), "b".to_string()];
4356        let hard = vec![1u64, 2u64];
4357        let soft = vec![0u64, 0u64, 0u64]; // too long
4358        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
4359        match err {
4360            ServerError::InvalidInput(msg) => {
4361                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
4362                assert!(msg.contains("soft_limits=3"), "got: {msg}");
4363            }
4364            other => panic!("expected InvalidInput, got {other:?}"),
4365        }
4366    }
4367
4368    #[test]
4369    fn report_usage_rejects_over_cap_dimension_count() {
4370        let n = MAX_BUDGET_DIMENSIONS + 1;
4371        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4372        let deltas = vec![1u64; n];
4373        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
4374        match err {
4375            ServerError::InvalidInput(msg) => {
4376                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
4377                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
4378            }
4379            other => panic!("expected InvalidInput, got {other:?}"),
4380        }
4381    }
4382
4383    #[test]
4384    fn report_usage_accepts_exactly_cap_dimensions() {
4385        let n = MAX_BUDGET_DIMENSIONS;
4386        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4387        let deltas = vec![1u64; n];
4388        assert!(validate_report_usage_dimensions(&dims, &deltas).is_ok());
4389    }
4390
4391    #[test]
4392    fn report_usage_rejects_delta_length_mismatch() {
4393        let dims = vec!["a".to_string(), "b".to_string(), "c".to_string()];
4394        let deltas = vec![1u64, 2u64]; // too short
4395        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
4396        match err {
4397            ServerError::InvalidInput(msg) => {
4398                assert!(msg.contains("dimension_delta_array_mismatch"), "got: {msg}");
4399                assert!(msg.contains("dimensions=3"), "got: {msg}");
4400                assert!(msg.contains("deltas=2"), "got: {msg}");
4401            }
4402            other => panic!("expected InvalidInput, got {other:?}"),
4403        }
4404    }
4405
4406    #[test]
4407    fn report_usage_accepts_empty_dimensions() {
4408        // Edge case: zero-dimension report_usage is a no-op that should pass
4409        // validation (Lua handles dim_count=0 correctly).
4410        assert!(validate_report_usage_dimensions(&[], &[]).is_ok());
4411    }
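
    // ── FCALL result parsing (illustrative sketches) ──
    //
    // The reply arrays below are hand-built stand-ins for the reply shapes
    // documented on the parsers above, not captured server output; values
    // such as "tokens", 90, and 100 are arbitrary placeholders chosen for
    // the example.

    #[test]
    fn fcall_field_str_reads_strings_and_ints_and_defaults_to_empty() {
        let arr: Vec<Result<Value, ferriskey::Error>> = vec![
            Ok(Value::Int(1)),
            Ok(Value::SimpleString("OK".to_string())),
            Ok(Value::BulkString(b"42".to_vec())),
        ];
        assert_eq!(fcall_field_str(&arr, 0), "1");
        assert_eq!(fcall_field_str(&arr, 1), "OK");
        assert_eq!(fcall_field_str(&arr, 2), "42");
        // Out-of-range indices degrade to "" instead of panicking.
        assert_eq!(fcall_field_str(&arr, 9), "");
    }

    #[test]
    fn parse_report_usage_soft_breach_carries_dimension_and_limits() {
        // {1, "SOFT_BREACH", dim, current, limit} per the parser's doc comment.
        let raw = Value::Array(vec![
            Ok(Value::Int(1)),
            Ok(Value::SimpleString("SOFT_BREACH".to_string())),
            Ok(Value::BulkString(b"tokens".to_vec())),
            Ok(Value::BulkString(b"90".to_vec())),
            Ok(Value::BulkString(b"100".to_vec())),
        ]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
                assert_eq!(dimension, "tokens");
                assert_eq!(current_usage, 90);
                assert_eq!(soft_limit, 100);
            }
            _ => panic!("expected SoftBreach"),
        }

        // {1, "ALREADY_APPLIED"} is the idempotent-retry shape.
        let raw = Value::Array(vec![
            Ok(Value::Int(1)),
            Ok(Value::SimpleString("ALREADY_APPLIED".to_string())),
        ]);
        assert!(matches!(
            parse_report_usage_result(&raw),
            Ok(ReportUsageResult::AlreadyApplied)
        ));
    }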
4412
4413    #[test]
4414    fn is_retryable_backend_variant_uses_kind_table() {
4415        // Transport-bucketed: retryable.
4416        assert!(ServerError::from(mk_fk_err(ErrorKind::IoError)).is_retryable());
4417        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
4418        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
4419        // Cluster-bucketed (Moved / Ask / TryAgain / ClusterDown): retryable
4420        // after topology settles — the #88 BackendErrorKind classifier
4421        // treats these as transient cluster-churn, a semantic refinement
4422        // over the previous ff-script retry table.
4423        assert!(ServerError::from(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
4424        assert!(ServerError::from(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());
4425        assert!(ServerError::from(mk_fk_err(ErrorKind::Moved)).is_retryable());
4426        assert!(ServerError::from(mk_fk_err(ErrorKind::Ask)).is_retryable());
4427        // BusyLoading: retryable.
4428        assert!(ServerError::from(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());
4429
4430        // Auth / ScriptNotLoaded / ReadOnly: terminal.
4431        assert!(!ServerError::from(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
4432        assert!(!ServerError::from(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
4433        assert!(!ServerError::from(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
4434    }
4435
4436    #[test]
4437    fn is_retryable_backend_context_uses_kind_table() {
4438        let err = crate::server::backend_context(mk_fk_err(ErrorKind::IoError), "HGET test");
4439        assert!(err.is_retryable());
4440
4441        let err =
4442            crate::server::backend_context(mk_fk_err(ErrorKind::AuthenticationFailed), "auth");
4443        assert!(!err.is_retryable());
4444    }
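
    // Illustrative sketch of the failover "library missing" probe. The
    // synthetic errors from mk_fk_err carry the detail "synthetic", which
    // contains none of the probed phrases, so only the NoScriptError kind
    // trips the detector here.
    #[test]
    fn function_not_loaded_detection_keys_off_noscript_kind() {
        assert!(is_function_not_loaded(&mk_fk_err(ErrorKind::NoScriptError)));
        assert!(!is_function_not_loaded(&mk_fk_err(ErrorKind::IoError)));
    }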
4445
4446    #[test]
4447    fn is_retryable_library_load_delegates_to_inner_kind() {
4448        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
4449            mk_fk_err(ErrorKind::IoError),
4450        ));
4451        assert!(err.is_retryable());
4452
4453        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
4454            mk_fk_err(ErrorKind::AuthenticationFailed),
4455        ));
4456        assert!(!err.is_retryable());
4457
4458        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
4459            expected: "1".into(),
4460            got: "2".into(),
4461        });
4462        assert!(!err.is_retryable());
4463    }
4464
4465    #[test]
4466    fn is_retryable_business_logic_variants_are_false() {
4467        assert!(!ServerError::NotFound("x".into()).is_retryable());
4468        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
4469        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
4470        assert!(!ServerError::Script("x".into()).is_retryable());
4471        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
4472    }
4473
4474    #[test]
4475    fn backend_kind_delegates_through_library_load() {
4476        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
4477            mk_fk_err(ErrorKind::ClusterDown),
4478        ));
4479        assert_eq!(err.backend_kind(), Some(ff_core::BackendErrorKind::Cluster));
4480
4481        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
4482            expected: "1".into(),
4483            got: "2".into(),
4484        });
4485        assert_eq!(err.backend_kind(), None);
4486    }
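
    // ── Cancel-flow FCALL parsing (illustrative sketches) ──
    //
    // Hand-built reply arrays in the documented {status, sub, ...} shape;
    // the policy "cascade", the member ids "exec-1"/"exec-2", and the
    // "lease_mismatch" code are arbitrary placeholders, not real values.

    #[test]
    fn parse_cancel_flow_collects_policy_and_members() {
        // {1, "OK", cancellation_policy, member1, member2, ...}
        let raw = Value::Array(vec![
            Ok(Value::Int(1)),
            Ok(Value::SimpleString("OK".to_string())),
            Ok(Value::BulkString(b"cascade".to_vec())),
            Ok(Value::BulkString(b"exec-1".to_vec())),
            Ok(Value::BulkString(b"exec-2".to_vec())),
        ]);
        match parse_cancel_flow_raw(&raw).unwrap() {
            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
                assert_eq!(policy, "cascade");
                assert_eq!(member_execution_ids, vec!["exec-1", "exec-2"]);
            }
            ParsedCancelFlow::AlreadyTerminal => panic!("expected Cancelled"),
        }
    }

    #[test]
    fn parse_cancel_flow_maps_already_terminal_to_idempotent_success() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(Value::SimpleString("flow_already_terminal".to_string())),
        ]);
        assert!(matches!(
            parse_cancel_flow_raw(&raw),
            Ok(ParsedCancelFlow::AlreadyTerminal)
        ));
    }

    #[test]
    fn terminal_ack_error_detection_matches_known_codes() {
        // Codes arrive wrapped as "ff_cancel_execution failed: <code>" via
        // parse_cancel_result's error path.
        let already_done = ServerError::OperationFailed(
            "ff_cancel_execution failed: execution_not_active".to_string(),
        );
        assert!(is_terminal_ack_error(&already_done));
        let gone = ServerError::OperationFailed(
            "ff_cancel_execution failed: execution_not_found".to_string(),
        );
        assert!(is_terminal_ack_error(&gone));
        let other = ServerError::OperationFailed(
            "ff_cancel_execution failed: lease_mismatch".to_string(),
        );
        assert!(!is_terminal_ack_error(&other));
    }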
4487
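    // Dependency-apply and lease-revoke replies follow the same positional
    // convention; the count 3 and the ids "lease-1"/"7" below are arbitrary
    // example values.

    #[test]
    fn parse_apply_dependency_reads_unsatisfied_count() {
        // {1, "OK", unsatisfied_count} and {1, "ALREADY_APPLIED"}
        let raw = Value::Array(vec![
            Ok(Value::Int(1)),
            Ok(Value::SimpleString("OK".to_string())),
            Ok(Value::BulkString(b"3".to_vec())),
        ]);
        assert!(matches!(
            parse_apply_dependency_result(&raw),
            Ok(ApplyDependencyToChildResult::Applied { unsatisfied_count: 3 })
        ));

        let raw = Value::Array(vec![
            Ok(Value::Int(1)),
            Ok(Value::SimpleString("ALREADY_APPLIED".to_string())),
        ]);
        assert!(matches!(
            parse_apply_dependency_result(&raw),
            Ok(ApplyDependencyToChildResult::AlreadyApplied)
        ));
    }

    #[test]
    fn parse_revoke_lease_returns_lease_id_and_epoch_as_strings() {
        // {1, "OK", lease_id, lease_epoch}
        let raw = Value::Array(vec![
            Ok(Value::Int(1)),
            Ok(Value::SimpleString("OK".to_string())),
            Ok(Value::BulkString(b"lease-1".to_vec())),
            Ok(Value::BulkString(b"7".to_vec())),
        ]);
        match parse_revoke_lease_result(&raw).unwrap() {
            RevokeLeaseResult::Revoked { lease_id, lease_epoch } => {
                assert_eq!(lease_id, "lease-1");
                assert_eq!(lease_epoch, "7");
            }
            _ => panic!("expected Revoked"),
        }
    }
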
4488    // ── Valkey version check (RFC-011 §13) ──
4489
4490    #[test]
4491    fn parse_valkey_version_prefers_valkey_version_over_redis_version() {
4492        // Valkey 8.x+ pins redis_version to 7.2.4 for Redis-client compat
4493        // and exposes the real version in valkey_version. Parser must use
4494        // valkey_version when both are present.
4495        let info = "\
4496# Server\r\n\
4497redis_version:7.2.4\r\n\
4498valkey_version:9.0.3\r\n\
4499server_mode:cluster\r\n\
4500os:Linux\r\n";
4501        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
4502    }
4503
4504    #[test]
4505    fn parse_valkey_version_real_valkey_8_cluster_body() {
4506        // Actual INFO server response observed on valkey/valkey:latest in
4507        // cluster mode (CI matrix): redis_version compat-pinned to 7.2.4,
4508        // valkey_version authoritative at 9.0.3.
4509        let info = "\
4510# Server\r\n\
4511redis_version:7.2.4\r\n\
4512server_name:valkey\r\n\
4513valkey_version:9.0.3\r\n\
4514valkey_release_stage:ga\r\n\
4515redis_git_sha1:00000000\r\n\
4516server_mode:cluster\r\n";
4517        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
4518    }
4519
4520    #[test]
4521    fn parse_valkey_version_falls_back_to_redis_version_on_valkey_7() {
4522        // Valkey 7.x doesn't emit valkey_version, but does emit
4523        // server_name:valkey; parser falls back to redis_version.
4524        let info = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nfoo:bar\r\n";
4525        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
4526    }
4527
4528    #[test]
4529    fn parse_valkey_version_rejects_redis_backend() {
4530        // Real Redis emits redis_version: but not server_name:valkey and
4531        // not valkey_version:. Parser must reject this affirmatively so an
4532        // operator pointing ff-server at a Redis instance fails loud instead
4533        // of silently running against an unsupported backend.
4534        let info = "\
4535# Server\r\n\
4536redis_version:7.4.0\r\n\
4537redis_mode:standalone\r\n\
4538os:Linux\r\n";
4539        let err = parse_valkey_version(info).unwrap_err();
4540        assert!(matches!(err, ServerError::OperationFailed(_)));
4541        let msg = err.to_string();
4542        assert!(
4543            msg.contains("Redis is not supported") && msg.contains("server_name:valkey"),
4544            "expected Redis-rejection message, got: {msg}"
4545        );
4546    }
4547
4548    #[test]
4549    fn parse_valkey_version_accepts_valkey_7_marker_case_insensitively() {
4550        // INFO keys and values are conventionally lowercase; accept any casing.
4551        let info = "redis_version:7.2.0\r\nSERVER_NAME:Valkey\r\n";
4552        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
4553    }
4554
4555    #[test]
4556    fn parse_valkey_version_errors_when_no_version_field() {
4557        let info = "# Server\r\nfoo:bar\r\n";
4558        let err = parse_valkey_version(info).unwrap_err();
4559        assert!(matches!(err, ServerError::OperationFailed(_)));
4560        assert!(
4561            err.to_string().contains("missing"),
4562            "expected 'missing' in message, got: {err}"
4563        );
4564    }
4565
4566    #[test]
4567    fn parse_valkey_version_errors_on_non_numeric_major() {
4568        let info = "valkey_version:invalid.x.y\n";
4569        let err = parse_valkey_version(info).unwrap_err();
4570        assert!(matches!(err, ServerError::OperationFailed(_)));
4571        assert!(err.to_string().contains("non-numeric major"));
4572    }
4573
4574    #[test]
4575    fn parse_valkey_version_errors_on_non_numeric_minor() {
4576        let info = "valkey_version:7.x.0\n";
4577        let err = parse_valkey_version(info).unwrap_err();
4578        assert!(matches!(err, ServerError::OperationFailed(_)));
4579        assert!(err.to_string().contains("non-numeric minor"));
4580    }
4581
4582    #[test]
4583    fn parse_valkey_version_errors_on_missing_minor() {
4584        // Bare major (no dot) — cannot be compared against a (major, minor)
4585        // floor. Flag as a real parse error.
4586        let info = "valkey_version:7\n";
4587        let err = parse_valkey_version(info).unwrap_err();
4588        assert!(matches!(err, ServerError::OperationFailed(_)));
4589        assert!(err.to_string().contains("missing minor"));
4590    }
4591
4592    #[test]
4593    fn extract_info_bodies_unwraps_cluster_map_all_entries() {
4594        // Simulates cluster-mode INFO response: map of node_addr → body.
4595        // extract_info_bodies must return EVERY node's body so a stale
4596        // pre-upgrade replica cannot hide behind an upgraded primary.
4597        let body_a = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
4598        let body_b = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:8.0.0\r\n";
4599        let map = Value::Map(vec![
4600            (
4601                Value::SimpleString("127.0.0.1:7000".to_string()),
4602                Value::VerbatimString {
4603                    format: ferriskey::value::VerbatimFormat::Text,
4604                    text: body_a.to_string(),
4605                },
4606            ),
4607            (
4608                Value::SimpleString("127.0.0.1:7001".to_string()),
4609                Value::VerbatimString {
4610                    format: ferriskey::value::VerbatimFormat::Text,
4611                    text: body_b.to_string(),
4612                },
4613            ),
4614        ]);
4615        let bodies = extract_info_bodies(&map).unwrap();
4616        assert_eq!(bodies.len(), 2);
4617        assert_eq!(bodies[0], body_a);
4618        assert_eq!(bodies[1], body_b);
4619    }
4620
4621    #[test]
4622    fn extract_info_bodies_handles_simple_string() {
4623        let body_text = "redis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
4624        let v = Value::SimpleString(body_text.to_string());
4625        let bodies = extract_info_bodies(&v).unwrap();
4626        assert_eq!(bodies, vec![body_text.to_string()]);
4627    }
4628
4629    #[test]
4630    fn extract_info_bodies_rejects_empty_cluster_map() {
4631        let map = Value::Map(vec![]);
4632        let err = extract_info_bodies(&map).unwrap_err();
4633        assert!(matches!(err, ServerError::OperationFailed(_)));
4634        assert!(err.to_string().contains("empty map"));
4635    }
4636
4637    /// End-to-end composition test for the cluster-min fix (issue #84):
4638    /// `extract_info_bodies` → `parse_valkey_version` per node → min-reduce →
4639    /// floor comparison. A mixed-version cluster where one node is 7.1.0 must
4640    /// fail the gate, even if another node is already on 8.0.0 and that
4641    /// node's entry appears first in the map.
4642    #[test]
4643    fn parse_valkey_version_min_across_cluster_map_picks_lowest() {
4644        // node1 appears first and is above the floor. Pre-fix behavior
4645        // (first-entry only) would accept. The min across all three nodes is
4646        // (7, 1), below the (7, 2) floor, so the gate must reject.
4647        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
4648        let body_node2 = "# Server\r\nredis_version:7.1.0\r\nserver_name:valkey\r\n";
4649        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
4650        let map = Value::Map(vec![
4651            (
4652                Value::SimpleString("node1:6379".to_string()),
4653                Value::VerbatimString {
4654                    format: ferriskey::value::VerbatimFormat::Text,
4655                    text: body_node1.to_string(),
4656                },
4657            ),
4658            (
4659                Value::SimpleString("node2:6379".to_string()),
4660                Value::VerbatimString {
4661                    format: ferriskey::value::VerbatimFormat::Text,
4662                    text: body_node2.to_string(),
4663                },
4664            ),
4665            (
4666                Value::SimpleString("node3:6379".to_string()),
4667                Value::VerbatimString {
4668                    format: ferriskey::value::VerbatimFormat::Text,
4669                    text: body_node3.to_string(),
4670                },
4671            ),
4672        ]);
4673
4674        let bodies = extract_info_bodies(&map).unwrap();
4675        let min = bodies
4676            .iter()
4677            .map(|b| parse_valkey_version(b).unwrap())
4678            .min()
4679            .unwrap();
4680
4681        assert_eq!(min, (7, 1), "min across cluster must be the lowest node");
4682        assert!(
4683            min < (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
4684            "mixed-version cluster with 7.1.0 node must fail the (7,2) gate"
4685        );
4686    }
4687
4688    /// Companion to `parse_valkey_version_min_across_cluster_map_picks_lowest`:
4689    /// when every node is at or above the floor, the min-reduce + gate
4690    /// composition accepts.
4691    #[test]
4692    fn parse_valkey_version_all_nodes_at_or_above_floor_accepts() {
4693        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
4694        let body_node2 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
4695        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:9.0.3\r\n";
4696        let map = Value::Map(vec![
4697            (
4698                Value::SimpleString("node1:6379".to_string()),
4699                Value::VerbatimString {
4700                    format: ferriskey::value::VerbatimFormat::Text,
4701                    text: body_node1.to_string(),
4702                },
4703            ),
4704            (
4705                Value::SimpleString("node2:6379".to_string()),
4706                Value::VerbatimString {
4707                    format: ferriskey::value::VerbatimFormat::Text,
4708                    text: body_node2.to_string(),
4709                },
4710            ),
4711            (
4712                Value::SimpleString("node3:6379".to_string()),
4713                Value::VerbatimString {
4714                    format: ferriskey::value::VerbatimFormat::Text,
4715                    text: body_node3.to_string(),
4716                },
4717            ),
4718        ]);
4719
4720        let bodies = extract_info_bodies(&map).unwrap();
4721        let min = bodies
4722            .iter()
4723            .map(|b| parse_valkey_version(b).unwrap())
4724            .min()
4725            .unwrap();
4726
4727        assert_eq!(min, (7, 2), "min across cluster is the lowest node (7.2)");
4728        assert!(
4729            min >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
4730            "all-above-floor cluster must pass the gate"
4731        );
4732    }
4733
4734    #[test]
4735    fn valkey_version_too_low_is_not_retryable() {
4736        let err = ServerError::ValkeyVersionTooLow {
4737            detected: "7.0".into(),
4738            required: "7.2".into(),
4739        };
4740        assert!(!err.is_retryable());
4741        assert_eq!(err.backend_kind(), None);
4742    }
4743
4744    #[test]
4745    fn valkey_version_too_low_error_message_includes_both_versions() {
4746        let err = ServerError::ValkeyVersionTooLow {
4747            detected: "7.0".into(),
4748            required: "7.2".into(),
4749        };
4750        let msg = err.to_string();
4751        assert!(msg.contains("7.0"), "detected version in message: {msg}");
4752        assert!(msg.contains("7.2"), "required version in message: {msg}");
4753        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
4754    }
4755}