ff_server/server.rs

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use ferriskey::{Client, ClientBuilder, Value};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinSet;
use ff_core::contracts::{
    AddExecutionToFlowArgs, AddExecutionToFlowResult, ApplyDependencyToChildArgs,
    ApplyDependencyToChildResult, BudgetStatus, CancelExecutionArgs, CancelExecutionResult,
    CancelFlowArgs, CancelFlowResult, ChangePriorityResult, CreateBudgetArgs,
    CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult, CreateFlowArgs,
    CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult, DeliverSignalArgs,
    DeliverSignalResult, ExecutionInfo, ListExecutionsPage, PendingWaitpointInfo,
    ReplayExecutionResult, ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
    RevokeLeaseResult, RotateWaitpointHmacSecretArgs, StageDependencyEdgeArgs,
    StageDependencyEdgeResult,
};
use ff_core::keys::{
    self, usage_dedup_key, BudgetKeyContext, ExecKeyContext, FlowIndexKeys, FlowKeyContext,
    IndexKeys, QuotaKeyContext,
};
use ff_core::partition::{
    budget_partition, execution_partition, flow_partition, quota_partition, Partition,
    PartitionConfig, PartitionFamily,
};
use ff_core::state::{PublicState, StateVector};
use ff_core::types::*;
use ff_engine::Engine;
use ff_script::retry::is_retryable_kind;

use crate::config::ServerConfig;

/// Upper bound on `member_execution_ids` returned in the
/// [`CancelFlowResult::Cancelled`] response when the flow was already in a
/// terminal state (idempotent retry). The first (non-idempotent) cancel call
/// returns the full list; retries only need a sample.
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;

/// Re-export of the budget dimension cap.
///
/// Defined as the single source of truth in `ff_script::functions::budget` so
/// the typed FCALL wrappers and the REST boundary cannot silently drift
/// (PR #106 review). The limit exists to cap FCALL ARGV allocation: both
/// `create_budget` and `report_usage` build argv whose length is linear in
/// `dimensions.len()`, so an untrusted caller could otherwise request an
/// unbounded `Vec` allocation (CodeQL `rust/uncontrolled-allocation-size`,
/// issue #104).
pub(crate) use ff_script::functions::budget::MAX_BUDGET_DIMENSIONS;

/// Validate `create_budget` dimension inputs before building the FCALL argv.
///
/// Rejects:
///   * more than [`MAX_BUDGET_DIMENSIONS`] dimensions (prevents unbounded
///     `Vec::with_capacity` allocation on attacker-controlled length);
///   * parallel-array length mismatches between `dimensions`, `hard_limits`,
///     and `soft_limits` — these are positional inputs the Lua side indexes
///     by `i = 1..dim_count`, so a mismatch silently corrupts limits rather
///     than raising.
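///
/// A minimal sketch of the rejection behavior (hypothetical values; the same
/// parallel-array rule also backs [`validate_report_usage_dimensions`]):
///
/// ```ignore
/// let dims = vec!["tokens".to_owned(), "requests".to_owned()];
/// let hard = vec![1_000u64, 50];
/// let soft = vec![800u64]; // one entry short of `dims`: rejected up front
/// assert!(validate_create_budget_dimensions(&dims, &hard, &soft).is_err());
/// assert!(validate_create_budget_dimensions(&dims, &hard, &[800, 40]).is_ok());
/// ```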
fn validate_create_budget_dimensions(
    dimensions: &[String],
    hard_limits: &[u64],
    soft_limits: &[u64],
) -> Result<(), ServerError> {
    let dim_count = dimensions.len();
    if dim_count > MAX_BUDGET_DIMENSIONS {
        return Err(ServerError::InvalidInput(format!(
            "too_many_dimensions: limit={}, got={}",
            MAX_BUDGET_DIMENSIONS, dim_count
        )));
    }
    if hard_limits.len() != dim_count {
        return Err(ServerError::InvalidInput(format!(
            "dimension_limit_array_mismatch: dimensions={} hard_limits={}",
            dim_count,
            hard_limits.len()
        )));
    }
    if soft_limits.len() != dim_count {
        return Err(ServerError::InvalidInput(format!(
            "dimension_limit_array_mismatch: dimensions={} soft_limits={}",
            dim_count,
            soft_limits.len()
        )));
    }
    Ok(())
}

/// Validate `report_usage` dimension inputs before building the FCALL argv.
///
/// Same class of defense as [`validate_create_budget_dimensions`]: caps
/// argv length and enforces the `dimensions`/`deltas` parallel-array
/// invariant the Lua side relies on.
fn validate_report_usage_dimensions(
    dimensions: &[String],
    deltas: &[u64],
) -> Result<(), ServerError> {
    let dim_count = dimensions.len();
    if dim_count > MAX_BUDGET_DIMENSIONS {
        return Err(ServerError::InvalidInput(format!(
            "too_many_dimensions: limit={}, got={}",
            MAX_BUDGET_DIMENSIONS, dim_count
        )));
    }
    if deltas.len() != dim_count {
        return Err(ServerError::InvalidInput(format!(
            "dimension_delta_array_mismatch: dimensions={} deltas={}",
            dim_count,
            deltas.len()
        )));
    }
    Ok(())
}

/// FlowFabric server — connects everything together.
///
/// Manages the Valkey connection, Lua library loading, background scanners,
/// and provides a minimal API for Phase 1.
pub struct Server {
    client: Client,
    /// Dedicated Valkey connection used EXCLUSIVELY for stream-op calls:
    /// `xread_block` tails AND `ff_read_attempt_stream` range reads.
    /// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
    /// processes commands FIFO on it.
    ///
    /// Two head-of-line risks motivate the split from the main client:
    ///
    /// * **Blocking**: `XREAD BLOCK 30_000` holds the read side until a
    ///   new entry arrives or `block_ms` elapses.
    /// * **Large replies**: `XRANGE … COUNT 10_000` with ~64 KB per
    ///   frame returns a multi-MB reply serialized on one connection.
    ///
    /// Sharing either load with the main client would starve every other
    /// FCALL (create_execution, claim, rotate_waitpoint_secret,
    /// budget/quota, admin endpoints) AND every engine scanner.
    ///
    /// Kept separate from `client` and from the `Engine` scanner client so
    /// tail latency cannot couple to foreground API latency or background
    /// scanner cadence. See RFC-006 Impl Notes for the cascading-failure
    /// rationale.
    tail_client: Client,
    /// Bounds concurrent stream-op calls server-wide — read AND tail
    /// combined. Each caller acquires one permit for the duration of its
    /// Valkey round-trip(s); contention surfaces as HTTP 429 at the REST
    /// boundary, not as a silent queue on the stream connection. Default
    /// size is `FF_MAX_CONCURRENT_STREAM_OPS` (64; legacy env
    /// `FF_MAX_CONCURRENT_TAIL` accepted for one release).
    ///
    /// Read and tail share the same pool deliberately: they share the
    /// `tail_client`, so fairness accounting must be unified or a flood
    /// of one can starve the other. The semaphore is also `close()`d on
    /// shutdown so no new stream ops can start while existing ones drain
    /// (see `Server::shutdown`).
    stream_semaphore: Arc<tokio::sync::Semaphore>,
    /// Serializes `XREAD BLOCK` calls against `tail_client`.
    ///
    /// `ferriskey::Client` is a pipelined multiplexed connection — Valkey
    /// processes commands FIFO on one socket. `XREAD BLOCK` holds the
    /// connection's read side for the full `block_ms`, so two parallel
    /// BLOCKs sent down the same mux serialize: the second waits for the
    /// first to return before its own BLOCK even begins at the server.
    /// Meanwhile ferriskey's per-call `request_timeout` (auto-extended to
    /// `block_ms + 500ms`) starts at future-poll on the CLIENT side, so
    /// the second call's timeout fires before its turn at the server —
    /// spurious `timed_out` errors under concurrent tail load.
    ///
    /// Explicit serialization around `xread_block` removes the
    /// silent-failure mode: concurrent tails queue on this Mutex (inside
    /// an already-acquired semaphore permit), then dispatch one at a
    /// time with their full `block_ms` budget intact. The semaphore
    /// ceiling (`max_concurrent_stream_ops`) effectively becomes queue
    /// depth; throughput on the tail client is 1 BLOCK at a time.
    ///
    /// V2 upgrade: a pool of N dedicated `ferriskey::Client` connections
    /// replacing the single `tail_client` + this Mutex. Deferred; the
    /// Mutex here is correct v1 behavior.
    ///
    /// XRANGE reads (`read_attempt_stream`) are NOT gated by this Mutex —
    /// XRANGE is non-blocking at the server, so pipelined XRANGEs on one
    /// mux complete in microseconds each and don't trigger the same
    /// client-side timeout race. Keeping reads unserialized preserves
    /// read throughput.
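    ///
    /// A sketch of the acquisition order a tail call follows (names match
    /// the fields above; the `xread_block` signature and the permit-error
    /// mapping are illustrative, not the exact handler code):
    ///
    /// ```ignore
    /// // Permit first: bounds total stream ops, surfaces 429 on exhaustion.
    /// let _permit = self.stream_semaphore.try_acquire()
    ///     .map_err(|_| ServerError::ConcurrencyLimitExceeded("stream_ops", 64))?;
    /// // Then the Mutex: at most one XREAD BLOCK in flight on tail_client,
    /// // so each call keeps its full block_ms budget at the server.
    /// let _guard = self.xread_block_lock.lock().await;
    /// let entries = self.tail_client.xread_block(&stream_key, &cursor, 30_000).await?;
    /// ```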
    xread_block_lock: Arc<tokio::sync::Mutex<()>>,
    /// Server-wide Semaphore(1) gating admin rotate calls. Legitimate
    /// operators rotate ~monthly and can afford to serialize; concurrent
    /// rotate requests are an attack or misbehaving script. Holding the
    /// permit also guards against interleaved partial rotations on the
    /// Server side of the per-partition locks, surfacing contention as
    /// HTTP 429 instead of silently queueing and blowing past the 120s
    /// HTTP timeout. See `rotate_waitpoint_secret` handler.
    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
    engine: Engine,
    config: ServerConfig,
    /// Long-lived scheduler instance. Held on the server (not rebuilt per
    /// claim call) so its rotation cursor can advance across calls — a
    /// fresh-per-call scheduler would reset the cursor on every tick,
    /// defeating the fairness property (RFC-009 §scan rotation).
    scheduler: Arc<ff_scheduler::Scheduler>,
    /// Background tasks spawned by async handlers (e.g. cancel_flow member
    /// dispatch). Drained on shutdown with a bounded timeout.
    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
    /// PR-94: observability registry. Always present; the no-op shim
    /// takes zero runtime cost when the `observability` feature is
    /// off, and the real OTEL-backed registry is passed in via
    /// [`Server::start_with_metrics`] when on. Same `Arc` is shared
    /// with [`Engine::start_with_metrics`] and
    /// [`ff_scheduler::Scheduler::with_metrics`] so a single scrape
    /// sees everything the process produces.
    metrics: Arc<ff_observability::Metrics>,
}

/// Server error type.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Backend transport error. Previously wrapped `ferriskey::Error`
    /// directly (#88); now carries a backend-agnostic
    /// [`ff_core::BackendError`] so consumers match on
    /// [`ff_core::BackendErrorKind`] instead of ferriskey's native
    /// taxonomy. The ferriskey → [`ff_core::BackendError`] mapping
    /// lives in `ff_backend_valkey::backend_error_from_ferriskey`.
    #[error("backend: {0}")]
    Backend(#[from] ff_core::BackendError),
    /// Backend error with additional context. Previously
    /// `ValkeyContext { source: ferriskey::Error }` (#88).
    #[error("backend ({context}): {source}")]
    BackendContext {
        #[source]
        source: ff_core::BackendError,
        context: String,
    },
    #[error("config: {0}")]
    Config(#[from] crate::config::ConfigError),
    #[error("library load: {0}")]
    LibraryLoad(#[from] ff_script::loader::LoadError),
    #[error("partition mismatch: {0}")]
    PartitionMismatch(String),
    #[error("not found: {0}")]
    NotFound(String),
    #[error("invalid input: {0}")]
    InvalidInput(String),
    #[error("operation failed: {0}")]
    OperationFailed(String),
    #[error("script: {0}")]
    Script(String),
    /// Server-wide concurrency limit reached on a labelled pool. Surfaces
    /// as HTTP 429 at the REST boundary so load balancers and clients can
    /// retry with backoff. The `source` label ("stream_ops", "admin_rotate",
    /// etc.) identifies WHICH pool is exhausted so operators aren't
    /// misled by a single "tail unavailable" string when the real fault
    /// is rotation contention.
    ///
    /// Fields: (source_label, max_permits).
    #[error("too many concurrent {0} calls (max: {1})")]
    ConcurrencyLimitExceeded(&'static str, u32),
    /// Detected Valkey version is below the RFC-011 §13 minimum. The engine
    /// depends on Valkey Functions (stabilized in 7.2), RESP3 (7.2+), and
    /// hash-tag routing; older versions do not implement the required
    /// primitives. Operator action: upgrade Valkey.
    #[error(
        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
    )]
    ValkeyVersionTooLow {
        detected: String,
        required: String,
    },
}

/// Lift a native `ferriskey::Error` into [`ServerError::Backend`] via
/// [`ff_backend_valkey::backend_error_from_ferriskey`] (#88). Keeps
/// `?`-propagation ergonomic at ferriskey call sites while the
/// public variant stays backend-agnostic.
impl From<ferriskey::Error> for ServerError {
    fn from(err: ferriskey::Error) -> Self {
        Self::Backend(ff_backend_valkey::backend_error_from_ferriskey(&err))
    }
}

/// Build a [`ServerError::BackendContext`] from a native
/// `ferriskey::Error` and a call-site label (#88).
pub(crate) fn backend_context(
    err: ferriskey::Error,
    context: impl Into<String>,
) -> ServerError {
    ServerError::BackendContext {
        source: ff_backend_valkey::backend_error_from_ferriskey(&err),
        context: context.into(),
    }
}

impl ServerError {
    /// Returns the classified [`ff_core::BackendErrorKind`] if this
    /// error carries a backend transport fault. Covers direct
    /// Backend variants and library-load failures.
    ///
    /// Renamed from `valkey_kind` in #88 — the previous return type
    /// `Option<ferriskey::ErrorKind>` leaked ferriskey into every
    /// consumer doing retry/error classification.
    pub fn backend_kind(&self) -> Option<ff_core::BackendErrorKind> {
        match self {
            Self::Backend(be) | Self::BackendContext { source: be, .. } => Some(be.kind()),
            Self::LibraryLoad(e) => e
                .valkey_kind()
                .map(ff_backend_valkey::classify_ferriskey_kind),
            _ => None,
        }
    }

    /// Whether this error is safely retryable by a caller. For
    /// backend transport variants, delegates to
    /// [`ff_core::BackendErrorKind::is_retryable`]. Business-logic
    /// rejections (NotFound, InvalidInput, OperationFailed, Script,
    /// Config, PartitionMismatch) return false — those won't change
    /// on retry.
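    ///
    /// A minimal caller-side sketch (attempt cap and backoff values are
    /// illustrative; real callers should jitter):
    ///
    /// ```ignore
    /// let mut attempts = 0u32;
    /// let result = loop {
    ///     match server.create_execution(&args).await {
    ///         Err(e) if e.is_retryable() && attempts < 3 => {
    ///             attempts += 1;
    ///             tokio::time::sleep(Duration::from_millis(100 << attempts)).await;
    ///         }
    ///         other => break other,
    ///     }
    /// };
    /// ```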
    pub fn is_retryable(&self) -> bool {
        match self {
            Self::Backend(be) | Self::BackendContext { source: be, .. } => {
                be.kind().is_retryable()
            }
            Self::LibraryLoad(load_err) => load_err
                .valkey_kind()
                .map(is_retryable_kind)
                .unwrap_or(false),
            Self::Config(_)
            | Self::PartitionMismatch(_)
            | Self::NotFound(_)
            | Self::InvalidInput(_)
            | Self::OperationFailed(_)
            | Self::Script(_) => false,
            // Back off and retry — the bound is a server-side permit pool,
            // so the retry will succeed once a permit frees up. Applies
            // equally to stream ops, admin rotate, etc.
            Self::ConcurrencyLimitExceeded(_, _) => true,
            // Operator must upgrade Valkey; a retry at the caller won't help.
            Self::ValkeyVersionTooLow { .. } => false,
        }
    }
}

impl Server {
    /// Start the FlowFabric server.
    ///
    /// Boot sequence:
    /// 1. Connect to Valkey
    /// 2. Validate or create partition config key
    /// 3. Load the FlowFabric Lua library
    /// 4. Start engine (14 background scanners)
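    ///
    /// A minimal boot sketch (`ServerConfig` construction elided; the
    /// `from_env` constructor shown is hypothetical):
    ///
    /// ```ignore
    /// let config = ServerConfig::from_env()?; // hypothetical helper
    /// let server = Server::start(config).await?;
    /// let state = server.get_execution_state(&execution_id).await?;
    /// ```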
    pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
        Self::start_with_metrics(config, Arc::new(ff_observability::Metrics::new())).await
    }

    /// PR-94: boot the server with a shared observability registry.
    ///
    /// Scanner cycle + scheduler metrics record into this registry;
    /// `main.rs` threads the same handle into the router so `/metrics`
    /// exposes what the engine produces. The no-arg [`Server::start`]
    /// forwards here with a fresh `Metrics::new()` — under the default
    /// build that's the shim, under `observability` it's a real
    /// registry not shared with any HTTP route (useful for tests
    /// exercising the engine in isolation).
    pub async fn start_with_metrics(
        config: ServerConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Self, ServerError> {
        // Step 1: Connect to Valkey via ClientBuilder
        tracing::info!(
            host = %config.host, port = config.port,
            tls = config.tls, cluster = config.cluster,
            "connecting to Valkey"
        );
        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder
            .build()
            .await
            .map_err(|e| crate::server::backend_context(e, "connect"))?;

        // Verify connectivity
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "PING"))?;
        if pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "unexpected PING response: {pong}"
            )));
        }
        tracing::info!("Valkey connection established");

        // Step 1b: Verify Valkey version meets the RFC-011 §13 minimum (8.0).
        // Tolerates a rolling upgrade via a 60s exponential-backoff budget
        // per RFC-011 §9.17 — transient INFO errors during a node restart
        // don't trip the check until the whole budget is exhausted.
        verify_valkey_version(&client).await?;

        // Step 2: Validate or create partition config
        validate_or_create_partition_config(&client, &config.partition_config).await?;

        // Step 2b: Install waitpoint HMAC secret into every execution partition
        // (RFC-004 §Waitpoint Security). Fail-fast: if any partition fails,
        // the server refuses to start — a partial install would silently
        // reject signal deliveries on half the partitions.
        initialize_waitpoint_hmac_secret(
            &client,
            &config.partition_config,
            &config.waitpoint_hmac_secret,
        )
        .await?;

        // Step 3: Load Lua library (skippable for tests where fixture already loaded)
        if !config.skip_library_load {
            tracing::info!("loading flowfabric Lua library");
            ff_script::loader::ensure_library(&client)
                .await
                .map_err(ServerError::LibraryLoad)?;
        } else {
            tracing::info!("skipping library load (skip_library_load=true)");
        }

        // Step 4: Start engine with scanners
        // Build a fresh EngineConfig rather than cloning (EngineConfig doesn't derive Clone).
        let engine_cfg = ff_engine::EngineConfig {
            partition_config: config.partition_config,
            lanes: config.lanes.clone(),
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
            index_reconciler_interval: config.engine_config.index_reconciler_interval,
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
            budget_reset_interval: config.engine_config.budget_reset_interval,
            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
            unblock_interval: config.engine_config.unblock_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            flow_projector_interval: config.engine_config.flow_projector_interval,
            execution_deadline_interval: config.engine_config.execution_deadline_interval,
            cancel_reconciler_interval: config.engine_config.cancel_reconciler_interval,
            scanner_filter: config.engine_config.scanner_filter.clone(),
        };
        // Engine scanners keep running on the MAIN `client` mux — NOT on
        // `tail_client`. Scanner cadence is foreground-latency-coupled by
        // design (an incident that blocks all FCALLs should also visibly
        // block scanners), and keeping scanners off the tail client means a
        // long-poll tail can never starve lease-expiry, retention-trim,
        // etc. Do not change this without revisiting RFC-006 Impl Notes.
        // Build the Valkey completion backend (issue #90) and subscribe.
        // This replaces the pre-#90 `CompletionListenerConfig` path:
        // the wire subscription now lives in ff-backend-valkey, the
        // engine just consumes the resulting stream.
        let mut valkey_conn = ff_core::backend::ValkeyConnection::new(
            config.host.clone(),
            config.port,
        );
        valkey_conn.tls = config.tls;
        valkey_conn.cluster = config.cluster;
        let completion_backend = ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
            client.clone(),
            config.partition_config,
            valkey_conn,
        );
        let completion_stream = <ff_backend_valkey::ValkeyBackend as ff_core::completion_backend::CompletionBackend>::subscribe_completions(&completion_backend)
            .await
            .map_err(|e| ServerError::OperationFailed(format!(
                "subscribe_completions: {e}"
            )))?;

        let engine = Engine::start_with_completions(
            engine_cfg,
            client.clone(),
            metrics.clone(),
            completion_stream,
        );

        // Dedicated tail client. Built AFTER library load + HMAC install
        // because those steps use the main client; tail client only runs
        // XREAD/XREAD BLOCK + HMGET on stream_meta, so it never needs the
        // library loaded — but we build it on the same host/port/TLS
        // options so network reachability is identical.
        tracing::info!("opening dedicated tail connection");
        let mut tail_builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            // `request_timeout` is ignored for XREAD BLOCK (ferriskey
            // auto-extends to `block_ms + 500ms` for blocking commands),
            // but is used for the companion HMGET — 5s matches main.
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            tail_builder = tail_builder.tls();
        }
        if config.cluster {
            tail_builder = tail_builder.cluster();
        }
        let tail_client = tail_builder
            .build()
            .await
            .map_err(|e| crate::server::backend_context(e, "connect (tail)"))?;
        let tail_pong: String = tail_client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "PING (tail)"))?;
        if tail_pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "tail client unexpected PING response: {tail_pong}"
            )));
        }

        let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(
            config.max_concurrent_stream_ops as usize,
        ));
        let xread_block_lock = Arc::new(tokio::sync::Mutex::new(()));
        tracing::info!(
            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
            "stream-op client ready (read + tail share the semaphore; \
             tails additionally serialize via xread_block_lock)"
        );

        // Admin surface warning. /v1/admin/* endpoints (waitpoint HMAC
        // rotation, etc.) are only protected by the global Bearer
        // middleware in api.rs — which is only installed when
        // config.api_token is set. Without FF_API_TOKEN, a public
        // deployment exposes secret rotation to anyone that can reach
        // the listen_addr. Warn loudly so operators can't miss it; we
        // don't fail-start because single-tenant dev uses this path.
        if config.api_token.is_none() {
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
                 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
                 FF_API_TOKEN for any deployment reachable from untrusted \
                 networks."
            );
            // Explicit callout for the credential-bearing read endpoints.
            // Auditors grep for these on a per-endpoint basis; lumping
            // into the admin warning alone hides the fact that
            // /pending-waitpoints returns HMAC tokens and /result
            // returns completion payload bytes.
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
                 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
                 and GET /v1/executions/{{id}}/result returns raw completion payload \
                 bytes (may contain PII). Both are UNAUTHENTICATED in this \
                 configuration."
            );
        }

        // Partition counts — post-RFC-011 there are three physical families.
        // Execution keys co-locate with their parent flow's partition (§2 of
        // the RFC), so `num_flow_partitions` governs both exec and flow
        // routing; no separate `num_execution_partitions` count exists.
        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            listen_addr = %config.listen_addr,
            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
            config.partition_config.num_flow_partitions,
            config.partition_config.num_budget_partitions,
            config.partition_config.num_quota_partitions,
        );

        let scheduler = Arc::new(ff_scheduler::Scheduler::with_metrics(
            client.clone(),
            config.partition_config,
            metrics.clone(),
        ));

        Ok(Self {
            client,
            tail_client,
            stream_semaphore,
            xread_block_lock,
            // Single-permit semaphore: only one rotate-waitpoint-secret can
            // be mid-flight server-wide. Attackers or misbehaving scripts
            // firing parallel rotations fail fast with 429 instead of
            // queueing HTTP handlers.
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            engine,
            config,
            scheduler,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
            metrics,
        })
    }

    /// PR-94: access the shared observability registry.
    pub fn metrics(&self) -> &Arc<ff_observability::Metrics> {
        &self.metrics
    }

    /// Get a reference to the ferriskey client.
    pub fn client(&self) -> &Client {
        &self.client
    }

    /// Execute an FCALL with automatic Lua library reload on "function not loaded".
    ///
    /// After a Valkey failover the new primary may not have the Lua library
    /// loaded (replication lag or cold replica). This wrapper detects that
    /// condition, reloads the library via `ff_script::loader::ensure_library`,
    /// and retries the FCALL once.
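    ///
    /// A sketch of the shape this delegates to (the real logic lives in
    /// `fcall_with_reload_on_client`; the `fcall` call and the
    /// "function not loaded" detection shown here are illustrative):
    ///
    /// ```ignore
    /// match client.fcall(function, keys, args).await {
    ///     Err(e) if looks_like_function_not_loaded(&e) => {
    ///         ff_script::loader::ensure_library(&client).await?;
    ///         client.fcall(function, keys, args).await // retried exactly once
    ///     }
    ///     other => other,
    /// }
    /// ```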
    async fn fcall_with_reload(
        &self,
        function: &str,
        keys: &[&str],
        args: &[&str],
    ) -> Result<Value, ServerError> {
        fcall_with_reload_on_client(&self.client, function, keys, args).await
    }

    /// Get the server config.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }

    /// Get the partition config.
    pub fn partition_config(&self) -> &PartitionConfig {
        &self.config.partition_config
    }

    // ── Minimal Phase 1 API ──

    /// Create a new execution.
    ///
    /// Uses raw FCALL — will migrate to typed ff-script wrappers in Step 1.2.
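    ///
    /// A minimal call sketch (field values are illustrative, and the
    /// `..Default::default()` assumes `CreateExecutionArgs: Default` for
    /// the optional fields):
    ///
    /// ```ignore
    /// let result = server.create_execution(&CreateExecutionArgs {
    ///     execution_id,
    ///     namespace,
    ///     lane_id,
    ///     execution_kind: "etl.batch".to_owned(),
    ///     priority: 5,
    ///     input_payload: br#"{"rows": 100}"#.to_vec(),
    ///     idempotency_key: Some("req-42".to_owned()),
    ///     ..Default::default() // delay_until, tags, policy, deadline, ...
    /// }).await?;
    /// ```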
    pub async fn create_execution(
        &self,
        args: &CreateExecutionArgs,
    ) -> Result<CreateExecutionResult, ServerError> {
        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
        let idx = IndexKeys::new(&partition);

        let lane = &args.lane_id;
        let tag = partition.hash_tag();
        let idem_key = match &args.idempotency_key {
            Some(k) if !k.is_empty() => {
                keys::idempotency_key(&tag, args.namespace.as_str(), k)
            }
            _ => ctx.noop(),
        };

        let delay_str = args
            .delay_until
            .map(|d| d.0.to_string())
            .unwrap_or_default();
        let is_delayed = !delay_str.is_empty();

        // KEYS (8) must match lua/execution.lua ff_create_execution positional order:
        //   [1] exec_core, [2] payload, [3] policy, [4] tags,
        //   [5] scheduling_zset (eligible OR delayed — ONE key),
        //   [6] idem_key, [7] execution_deadline, [8] all_executions
        let scheduling_zset = if is_delayed {
            idx.lane_delayed(lane)
        } else {
            idx.lane_eligible(lane)
        };

        let fcall_keys: Vec<String> = vec![
            ctx.core(),                  // 1
            ctx.payload(),               // 2
            ctx.policy(),                // 3
            ctx.tags(),                  // 4
            scheduling_zset,             // 5
            idem_key,                    // 6
            idx.execution_deadline(),    // 7
            idx.all_executions(),        // 8
        ];

        let tags_json = serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".to_owned());

        // ARGV (13) must match lua/execution.lua ff_create_execution positional order:
        //   [1] execution_id, [2] namespace, [3] lane_id, [4] execution_kind,
        //   [5] priority, [6] creator_identity, [7] policy_json,
        //   [8] input_payload, [9] delay_until, [10] dedup_ttl_ms,
        //   [11] tags_json, [12] execution_deadline_at, [13] partition_id
        let fcall_args: Vec<String> = vec![
            args.execution_id.to_string(),           // 1
            args.namespace.to_string(),              // 2
            args.lane_id.to_string(),                // 3
            args.execution_kind.clone(),             // 4
            args.priority.to_string(),               // 5
            args.creator_identity.clone(),           // 6
            args.policy.as_ref()
                .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_owned()))
                .unwrap_or_else(|| "{}".to_owned()), // 7
            String::from_utf8_lossy(&args.input_payload).into_owned(), // 8
            delay_str,                               // 9
            args.idempotency_key.as_ref()
                .map(|_| "86400000".to_string())
                .unwrap_or_default(),                // 10 dedup_ttl_ms
            tags_json,                               // 11
            args.execution_deadline_at
                .map(|d| d.to_string())
                .unwrap_or_default(),                // 12 execution_deadline_at
            args.partition_id.to_string(),           // 13
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_execution", &key_refs, &arg_refs)
            .await?;

        parse_create_result(&raw, &args.execution_id)
    }

    /// Cancel an execution.
    pub async fn cancel_execution(
        &self,
        args: &CancelExecutionArgs,
    ) -> Result<CancelExecutionResult, ServerError> {
        let raw = self
            .fcall_cancel_execution_with_reload(args)
            .await?;
        parse_cancel_result(&raw, &args.execution_id)
    }

    /// Build KEYS/ARGV for `ff_cancel_execution` and invoke via the server's
    /// reload-capable FCALL. Shared by the inline method and background
    /// cancel_flow dispatch via [`Self::fcall_cancel_execution_with_reload`].
    async fn fcall_cancel_execution_with_reload(
        &self,
        args: &CancelExecutionArgs,
    ) -> Result<Value, ServerError> {
        let (keys, argv) = build_cancel_execution_fcall(
            &self.client,
            &self.config.partition_config,
            args,
        )
        .await?;
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
        self.fcall_with_reload("ff_cancel_execution", &key_refs, &arg_refs).await
    }

    /// Get the public state of an execution.
    ///
    /// Reads `public_state` from the exec_core hash. Returns the parsed
    /// PublicState enum. If the execution is not found, returns an error.
    pub async fn get_execution_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<PublicState, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        let state_str: Option<String> = self
            .client
            .hget(&ctx.core(), "public_state")
            .await
            .map_err(|e| crate::server::backend_context(e, "HGET public_state"))?;

        match state_str {
            Some(s) => {
                let quoted = format!("\"{s}\"");
                serde_json::from_str(&quoted).map_err(|e| {
                    ServerError::Script(format!(
                        "invalid public_state '{s}' for {execution_id}: {e}"
                    ))
                })
            }
            None => Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            ))),
        }
    }

    /// Read the raw result payload written by `ff_complete_execution`.
    ///
    /// The Lua side stores the payload at `ctx.result()` via plain `SET`.
    /// No FCALL — this is a direct GET; returns `Ok(None)` when the
    /// execution is missing, not yet complete, or (in a future
    /// retention-policy world) when the result was trimmed.
    ///
    /// # Contract vs `get_execution_state`
    ///
    /// `get_execution_state` is the authoritative completion signal. If
    /// a caller observes `state == completed` but `get_execution_result`
    /// returns `None`, the result bytes are unavailable — not a caller
    /// bug and not a server bug, just the retention policy trimming the
    /// blob. V1 sets no retention, so callers on v1 can treat
    /// `state == completed` + `Ok(None)` as a server bug.
    ///
    /// # Ordering
    ///
    /// Callers MUST wait for `state == completed` before calling this
    /// method; polls issued before the state transition may hit a
    /// narrow window where the completion Lua has written
    /// `public_state = completed` but the `result` key SET is still
    /// on-wire. The current Lua `ff_complete_execution` writes both in
    /// the same atomic script, so the window is effectively zero for
    /// direct callers — but retries via `ff_replay_execution` open it
    /// briefly.
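    ///
    /// A polling sketch honoring that ordering (interval is illustrative;
    /// the `PublicState::Completed` variant name is assumed from the
    /// `"completed"` wire string):
    ///
    /// ```ignore
    /// loop {
    ///     if server.get_execution_state(&id).await? == PublicState::Completed {
    ///         break;
    ///     }
    ///     tokio::time::sleep(Duration::from_millis(250)).await;
    /// }
    /// // On v1 (no retention) a None here after `completed` is a server bug.
    /// let bytes = server.get_execution_result(&id).await?;
    /// ```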
    pub async fn get_execution_result(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Option<Vec<u8>>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        // Binary-safe read. Decoding into `String` would reject any
        // non-UTF-8 payload (bincode-encoded f32 vectors, compressed
        // artifacts, arbitrary binary stages) with a ferriskey decode
        // error that surfaces as HTTP 500. The endpoint response is
        // already `application/octet-stream`; `Vec<u8>` has a
        // ferriskey-specialized `FromValue` impl that preserves raw
        // bytes without UTF-8 validation (see ferriskey/src/value.rs:
        // `from_byte_vec`). Nil → None via the blanket
        // `Option<T: FromValue>` wrapper.
        let payload: Option<Vec<u8>> = self
            .client
            .cmd("GET")
            .arg(ctx.result())
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "GET exec result"))?;
        Ok(payload)
    }

    /// List the active (`pending` or `active`) waitpoints for an execution.
    ///
    /// Returns one [`PendingWaitpointInfo`] per open waitpoint, including the
    /// HMAC-SHA1 `waitpoint_token` needed to deliver authenticated signals.
    /// `closed` waitpoints are elided — callers looking at history should
    /// read the stream or lease history instead.
    ///
    /// Read plan: SSCAN `ctx.waitpoints()` with `COUNT 100` (bounded
    /// page size, matching the unblock / flow-projector / budget-
    /// reconciler convention) to enumerate waitpoint IDs, then TWO
    /// pipelines:
    ///
    /// * Pass 1 — single round-trip containing, per waitpoint, one
    ///   HMGET over the documented field set + one HGET for
    ///   `total_matchers` on the condition hash.
    /// * Pass 2 (conditional) — single round-trip containing an
    ///   HMGET per waitpoint with `total_matchers > 0` to read the
    ///   `matcher:N:name` fields.
    ///
    /// No FCALL — this is a read-only view built from already-
    /// persisted state, so skipping Lua keeps the Valkey single-
    /// writer path uncontended. HMGET (vs HGETALL) bounds the per-
    /// waitpoint read to the documented field set and defends
    /// against a poisoned waitpoint hash with unbounded extra fields
    /// accumulating response memory.
    ///
    /// # Empty result semantics (TOCTOU)
    ///
    /// An empty `Vec` is returned in three cases:
    ///
    /// 1. The execution exists but has never suspended.
    /// 2. All existing waitpoints are `closed` (already resolved).
    /// 3. A narrow teardown race: `SSCAN` read the waitpoint set after
    ///    a concurrent `ff_close_waitpoint` or execution-cleanup script
    ///    deleted the waitpoint hashes but before it SREM'd the set
    ///    members. Each HMGET returns all-None and we skip.
    ///
    /// Callers that get an unexpected empty list should cross-check
    /// execution state (`get_execution_state`) to distinguish "pipeline
    /// moved past suspended" from "nothing to review yet".
    ///
    /// A waitpoint hash that's present but missing its `waitpoint_token`
    /// field is similarly elided and a server-side WARN is emitted —
    /// this indicates storage corruption (a write that half-populated
    /// the hash) and operators should investigate.
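    ///
    /// A consumer sketch for the TOCTOU note above (logging is
    /// illustrative; token handling depends on the caller's signal path):
    ///
    /// ```ignore
    /// let open = server.list_pending_waitpoints(&id).await?;
    /// if open.is_empty() {
    ///     // Distinguish "pipeline moved past suspended" from "nothing yet".
    ///     let state = server.get_execution_state(&id).await?;
    ///     tracing::debug!(?state, "no open waitpoints");
    /// }
    /// for wp in &open {
    ///     // wp.waitpoint_token is the bearer credential for deliver-signal.
    ///     println!("{} awaiting {:?}", wp.waitpoint_id, wp.required_signal_names);
    /// }
    /// ```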
    pub async fn list_pending_waitpoints(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Vec<PendingWaitpointInfo>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        let core_exists: bool = self
            .client
            .cmd("EXISTS")
            .arg(ctx.core())
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "EXISTS exec_core (pending waitpoints)"))?;
        if !core_exists {
            return Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            )));
        }

        // SSCAN the waitpoints set in bounded pages. SMEMBERS would
        // load the entire set in one reply — fine for today's
        // single-waitpoint executions, but the codebase convention
        // (budget_reconciler, flow_projector, unblock scanner) is SSCAN
        // COUNT 100 so response size per round-trip is bounded as the
        // set grows. Match the precedent instead of carving a new
        // pattern here.
        const WAITPOINTS_SSCAN_COUNT: usize = 100;
        let waitpoints_key = ctx.waitpoints();
        let mut wp_ids_raw: Vec<String> = Vec::new();
        let mut cursor: String = "0".to_owned();
        loop {
            let reply: (String, Vec<String>) = self
                .client
                .cmd("SSCAN")
                .arg(&waitpoints_key)
                .arg(&cursor)
                .arg("COUNT")
                .arg(WAITPOINTS_SSCAN_COUNT.to_string().as_str())
                .execute()
                .await
                .map_err(|e| crate::server::backend_context(e, "SSCAN waitpoints"))?;
            cursor = reply.0;
            wp_ids_raw.extend(reply.1);
            if cursor == "0" {
                break;
            }
        }

        // SSCAN may observe a member more than once across iterations —
        // that is documented Valkey behavior, not a bug (see
        // https://valkey.io/commands/sscan/). Dedup in-place before
        // building the typed id list so the HTTP response and the
        // downstream pipelined HMGETs don't duplicate work. Sort +
        // dedup keeps this O(n log n) without a BTreeSet allocation;
        // typical n is 1 (the single waitpoint on a suspended exec).
        wp_ids_raw.sort_unstable();
        wp_ids_raw.dedup();

        if wp_ids_raw.is_empty() {
            return Ok(Vec::new());
        }

        // Parse + filter out malformed waitpoint_ids up-front so the
        // downstream pipeline indexes stay aligned to the Vec of
        // typed ids.
        let mut wp_ids: Vec<WaitpointId> = Vec::with_capacity(wp_ids_raw.len());
        for raw in &wp_ids_raw {
            match WaitpointId::parse(raw) {
                Ok(id) => wp_ids.push(id),
                Err(e) => tracing::warn!(
                    raw_id = %raw,
                    error = %e,
                    execution_id = %execution_id,
                    "list_pending_waitpoints: skipping unparseable waitpoint_id"
                ),
            }
        }
        if wp_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Bounded HMGET field set — these are the six hash fields that
        // surface in `PendingWaitpointInfo`. Fixed-size indexing into
        // the response below tracks this order.
        const WP_FIELDS: [&str; 6] = [
            "state",
            "waitpoint_key",
            "waitpoint_token",
            "created_at",
            "activated_at",
            "expires_at",
        ];

        // Pass 1 pipeline: waitpoint HMGET + condition HGET
        // total_matchers, one of each per waitpoint, in a SINGLE
        // round-trip. Sequential HMGETs were an N-round-trip latency
        // floor on flows with fan-out waitpoints.
        let mut pass1 = self.client.pipeline();
        let mut wp_slots = Vec::with_capacity(wp_ids.len());
        let mut cond_slots = Vec::with_capacity(wp_ids.len());
        for wp_id in &wp_ids {
            let mut cmd = pass1.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint(wp_id));
            for f in WP_FIELDS {
                cmd = cmd.arg(f);
            }
            wp_slots.push(cmd.finish());

            cond_slots.push(
                pass1
                    .cmd::<Option<String>>("HGET")
                    .arg(ctx.waitpoint_condition(wp_id))
                    .arg("total_matchers")
                    .finish(),
            );
        }
        pass1
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "pipeline HMGET waitpoints + HGET total_matchers"))?;

        // Collect pass-1 results + queue pass-2 HMGETs for condition
        // matcher names on waitpoints that are actionable (state in
        // pending/active, non-empty token, total_matchers > 0). PipeSlot
        // values are owning, so iterate all three ordered zip'd iters
        // and consume them together.
        struct Kept {
            wp_id: WaitpointId,
            wp_fields: Vec<Option<String>>,
            total_matchers: usize,
        }
        let mut kept: Vec<Kept> = Vec::with_capacity(wp_ids.len());
        for ((wp_id, wp_slot), cond_slot) in wp_ids
            .iter()
            .zip(wp_slots)
            .zip(cond_slots)
        {
            let wp_fields: Vec<Option<String>> =
                wp_slot.value().map_err(|e| crate::server::backend_context(e, format!("pipeline slot HMGET waitpoint {wp_id}")))?;

            // Waitpoint hash may have been GC'd between the SSCAN and
            // this HMGET (rotation / cleanup race). Skip silently.
            if wp_fields.iter().all(Option::is_none) {
                // Still consume the cond_slot to free its buffer.
                let _ = cond_slot.value();
                continue;
            }
            let state_ref = wp_fields
                .first()
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if state_ref != "pending" && state_ref != "active" {
                let _ = cond_slot.value();
                continue;
            }
            let token_ref = wp_fields
                .get(2)
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if token_ref.is_empty() {
                let _ = cond_slot.value();
                tracing::warn!(
                    waitpoint_id = %wp_id,
                    execution_id = %execution_id,
                    waitpoint_hash_key = %ctx.waitpoint(wp_id),
                    state = %state_ref,
                    "list_pending_waitpoints: waitpoint hash present but waitpoint_token \
                     field is empty — likely storage corruption (half-populated write, \
                     operator edit, or interrupted script). Skipping this entry in the \
                     response. HGETALL the waitpoint_hash_key to inspect."
                );
                continue;
            }

            let total_matchers = cond_slot
                .value()
                .map_err(|e| crate::server::backend_context(e, format!("pipeline slot HGET total_matchers {wp_id}")))?
                .and_then(|s| s.parse::<usize>().ok())
                .unwrap_or(0);

            kept.push(Kept {
                wp_id: wp_id.clone(),
                wp_fields,
                total_matchers,
            });
        }

        if kept.is_empty() {
            return Ok(Vec::new());
        }

        // Pass 2 pipeline: matcher-name HMGETs for every kept waitpoint
        // with total_matchers > 0. Single round-trip regardless of
        // per-waitpoint matcher count. Waitpoints with total_matchers==0
        // (wildcard) skip the HMGET entirely.
        let mut pass2 = self.client.pipeline();
        let mut matcher_slots: Vec<Option<_>> = Vec::with_capacity(kept.len());
        let mut pass2_needed = false;
        for k in &kept {
            if k.total_matchers == 0 {
                matcher_slots.push(None);
                continue;
            }
            pass2_needed = true;
            let mut cmd = pass2.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint_condition(&k.wp_id));
            for i in 0..k.total_matchers {
                cmd = cmd.arg(format!("matcher:{i}:name"));
            }
            matcher_slots.push(Some(cmd.finish()));
        }
        if pass2_needed {
            pass2.execute().await.map_err(|e| crate::server::backend_context(e, "pipeline HMGET wp_condition matchers"))?;
        }

        let parse_ts = |raw: &str| -> Option<TimestampMs> {
            if raw.is_empty() {
                None
            } else {
                raw.parse::<i64>().ok().map(TimestampMs)
            }
        };

        let mut out: Vec<PendingWaitpointInfo> = Vec::with_capacity(kept.len());
        for (k, slot) in kept.into_iter().zip(matcher_slots) {
            let get = |i: usize| -> &str {
                k.wp_fields.get(i).and_then(|v| v.as_deref()).unwrap_or("")
            };

            // Collect matcher names, eliding the empty-name wildcard
            // sentinel (see lua/helpers.lua initialize_condition).
            let required_signal_names: Vec<String> = match slot {
                None => Vec::new(),
                Some(s) => {
                    let vals: Vec<Option<String>> =
                        s.value().map_err(|e| crate::server::backend_context(e, format!(
                                "pipeline slot HMGET wp_condition matchers {}",
                                k.wp_id
                            )))?;
                    vals.into_iter()
                        .flatten()
                        .filter(|name| !name.is_empty())
                        .collect()
                }
            };

            out.push(PendingWaitpointInfo {
                waitpoint_id: k.wp_id,
                waitpoint_key: get(1).to_owned(),
                state: get(0).to_owned(),
                waitpoint_token: WaitpointToken(get(2).to_owned()),
                required_signal_names,
                created_at: parse_ts(get(3)).unwrap_or(TimestampMs(0)),
                activated_at: parse_ts(get(4)),
                expires_at: parse_ts(get(5)),
            });
        }

        Ok(out)
    }

    // ── Budget / Quota API ──

    /// Create a new budget policy.
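    ///
    /// A minimal call sketch (dimension names, limits, and the `now`
    /// timestamp type are illustrative):
    ///
    /// ```ignore
    /// let result = server.create_budget(&CreateBudgetArgs {
    ///     budget_id,
    ///     scope_type: "namespace".to_owned(),
    ///     scope_id: "team-a".to_owned(),
    ///     enforcement_mode: "hard".to_owned(),
    ///     on_hard_limit: "reject".to_owned(),
    ///     on_soft_limit: "warn".to_owned(),
    ///     reset_interval_ms: 86_400_000, // daily reset
    ///     now: TimestampMs(now_ms),
    ///     dimensions: vec!["tokens".to_owned()],
    ///     hard_limits: vec![1_000_000],
    ///     soft_limits: vec![800_000],
    /// }).await?;
    /// ```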
    pub async fn create_budget(
        &self,
        args: &CreateBudgetArgs,
    ) -> Result<CreateBudgetResult, ServerError> {
        // Cap ARGV before allocation — see MAX_BUDGET_DIMENSIONS (#104).
        validate_create_budget_dimensions(
            &args.dimensions,
            &args.hard_limits,
            &args.soft_limits,
        )?;
        let partition = budget_partition(&args.budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, &args.budget_id);
        let resets_key = keys::budget_resets_key(bctx.hash_tag());
        let policies_index = keys::budget_policies_index(bctx.hash_tag());

        // KEYS (5): budget_def, budget_limits, budget_usage, budget_resets_zset,
        //           budget_policies_index
        let fcall_keys: Vec<String> = vec![
            bctx.definition(),
            bctx.limits(),
            bctx.usage(),
            resets_key,
            policies_index,
        ];

        // ARGV (variable): budget_id, scope_type, scope_id, enforcement_mode,
        //   on_hard_limit, on_soft_limit, reset_interval_ms, now_ms,
        //   dimension_count, dim_1..dim_N, hard_1..hard_N, soft_1..soft_N
        let dim_count = args.dimensions.len();
        let mut fcall_args: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
        fcall_args.push(args.budget_id.to_string());
        fcall_args.push(args.scope_type.clone());
        fcall_args.push(args.scope_id.clone());
        fcall_args.push(args.enforcement_mode.clone());
        fcall_args.push(args.on_hard_limit.clone());
        fcall_args.push(args.on_soft_limit.clone());
        fcall_args.push(args.reset_interval_ms.to_string());
        fcall_args.push(args.now.to_string());
        fcall_args.push(dim_count.to_string());
        for dim in &args.dimensions {
            fcall_args.push(dim.clone());
        }
        for hard in &args.hard_limits {
            fcall_args.push(hard.to_string());
        }
        for soft in &args.soft_limits {
            fcall_args.push(soft.to_string());
        }

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_budget", &key_refs, &arg_refs)
            .await?;

        parse_budget_create_result(&raw, &args.budget_id)
    }

    /// Create a new quota/rate-limit policy.
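    ///
    /// A minimal call sketch (window and limit values are illustrative,
    /// as is the `now` timestamp type):
    ///
    /// ```ignore
    /// let result = server.create_quota_policy(&CreateQuotaPolicyArgs {
    ///     quota_policy_id,
    ///     window_seconds: 60,
    ///     max_requests_per_window: 1_000,
    ///     max_concurrent: 32,
    ///     now: TimestampMs(now_ms),
    /// }).await?;
    /// ```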
1206    pub async fn create_quota_policy(
1207        &self,
1208        args: &CreateQuotaPolicyArgs,
1209    ) -> Result<CreateQuotaPolicyResult, ServerError> {
1210        let partition = quota_partition(&args.quota_policy_id, &self.config.partition_config);
1211        let qctx = QuotaKeyContext::new(&partition, &args.quota_policy_id);
1212
1213        // KEYS (5): quota_def, quota_window_zset, quota_concurrency_counter,
1214        //           admitted_set, quota_policies_index
1215        let fcall_keys: Vec<String> = vec![
1216            qctx.definition(),
1217            qctx.window("requests_per_window"),
1218            qctx.concurrency(),
1219            qctx.admitted_set(),
1220            keys::quota_policies_index(qctx.hash_tag()),
1221        ];
1222
1223        // ARGV (5): quota_policy_id, window_seconds, max_requests_per_window,
1224        //           max_concurrent, now_ms
1225        let fcall_args: Vec<String> = vec![
1226            args.quota_policy_id.to_string(),
1227            args.window_seconds.to_string(),
1228            args.max_requests_per_window.to_string(),
1229            args.max_concurrent.to_string(),
1230            args.now.to_string(),
1231        ];
1232
1233        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1234        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1235
1236        let raw: Value = self
1237            .fcall_with_reload("ff_create_quota_policy", &key_refs, &arg_refs)
1238            .await?;
1239
1240        parse_quota_create_result(&raw, &args.quota_policy_id)
1241    }
1242
1243    /// Read-only budget status for operator visibility.
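    ///
    /// Sketch of an operator-side headroom check over the returned maps
    /// (shapes as constructed below):
    ///
    /// ```ignore
    /// let status = server.get_budget_status(&budget_id).await?;
    /// for (dim, used) in &status.usage {
    ///     if let Some(hard) = status.hard_limits.get(dim) {
    ///         let headroom = hard.saturating_sub(*used);
    ///         println!("{dim}: {used}/{hard} ({headroom} remaining)");
    ///     }
    /// }
    /// ```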
1244    pub async fn get_budget_status(
1245        &self,
1246        budget_id: &BudgetId,
1247    ) -> Result<BudgetStatus, ServerError> {
1248        let partition = budget_partition(budget_id, &self.config.partition_config);
1249        let bctx = BudgetKeyContext::new(&partition, budget_id);
1250
1251        // Read budget definition
1252        let def: HashMap<String, String> = self
1253            .client
1254            .hgetall(&bctx.definition())
1255            .await
1256            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_def"))?;
1257
1258        if def.is_empty() {
1259            return Err(ServerError::NotFound(format!(
1260                "budget not found: {budget_id}"
1261            )));
1262        }
1263
1264        // Read usage
1265        let usage_raw: HashMap<String, String> = self
1266            .client
1267            .hgetall(&bctx.usage())
1268            .await
1269            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_usage"))?;
1270        let usage: HashMap<String, u64> = usage_raw
1271            .into_iter()
1272            .filter(|(k, _)| k != "_init")
1273            .map(|(k, v)| (k, v.parse().unwrap_or(0)))
1274            .collect();
1275
1276        // Read limits
1277        let limits_raw: HashMap<String, String> = self
1278            .client
1279            .hgetall(&bctx.limits())
1280            .await
1281            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_limits"))?;
1282        let mut hard_limits = HashMap::new();
1283        let mut soft_limits = HashMap::new();
1284        for (k, v) in &limits_raw {
1285            if let Some(dim) = k.strip_prefix("hard:") {
1286                hard_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
1287            } else if let Some(dim) = k.strip_prefix("soft:") {
1288                soft_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
1289            }
1290        }
1291
1292        let non_empty = |s: Option<&String>| -> Option<String> {
1293            s.filter(|v| !v.is_empty()).cloned()
1294        };
1295
1296        Ok(BudgetStatus {
1297            budget_id: budget_id.to_string(),
1298            scope_type: def.get("scope_type").cloned().unwrap_or_default(),
1299            scope_id: def.get("scope_id").cloned().unwrap_or_default(),
1300            enforcement_mode: def.get("enforcement_mode").cloned().unwrap_or_default(),
1301            usage,
1302            hard_limits,
1303            soft_limits,
1304            breach_count: def
1305                .get("breach_count")
1306                .and_then(|v| v.parse().ok())
1307                .unwrap_or(0),
1308            soft_breach_count: def
1309                .get("soft_breach_count")
1310                .and_then(|v| v.parse().ok())
1311                .unwrap_or(0),
1312            last_breach_at: non_empty(def.get("last_breach_at")),
1313            last_breach_dim: non_empty(def.get("last_breach_dim")),
1314            next_reset_at: non_empty(def.get("next_reset_at")),
1315            created_at: non_empty(def.get("created_at")),
1316        })
1317    }
1318
1319    /// Report usage against a budget and check limits.
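    ///
    /// Sketch: report two dimensions with a dedup key so a retried
    /// report is not double-counted (`attempt_id` is a hypothetical
    /// caller-side value; field names mirror the ARGV layout below):
    ///
    /// ```ignore
    /// let args = ReportUsageArgs {
    ///     dimensions: vec!["cpu_ms".into(), "tokens".into()],
    ///     deltas: vec![1_250, 4_096], // parallel to `dimensions`
    ///     dedup_key: Some(format!("usage:{attempt_id}")),
    ///     now: TimestampMs::now(),
    /// };
    /// let outcome = server.report_usage(&budget_id, &args).await?;
    /// ```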
1320    pub async fn report_usage(
1321        &self,
1322        budget_id: &BudgetId,
1323        args: &ReportUsageArgs,
1324    ) -> Result<ReportUsageResult, ServerError> {
1325        // Cap ARGV before allocation — see MAX_BUDGET_DIMENSIONS (#104).
1326        validate_report_usage_dimensions(&args.dimensions, &args.deltas)?;
1327        let partition = budget_partition(budget_id, &self.config.partition_config);
1328        let bctx = BudgetKeyContext::new(&partition, budget_id);
1329
1330        // KEYS (3): budget_usage, budget_limits, budget_def
1331        let fcall_keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];
1332
1333        // ARGV: dim_count, dim_1..dim_N, delta_1..delta_N, now_ms, [dedup_key]
1334        let dim_count = args.dimensions.len();
1335        let mut fcall_args: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
1336        fcall_args.push(dim_count.to_string());
1337        for dim in &args.dimensions {
1338            fcall_args.push(dim.clone());
1339        }
1340        for delta in &args.deltas {
1341            fcall_args.push(delta.to_string());
1342        }
1343        fcall_args.push(args.now.to_string());
1344        let dedup_key_val = args
1345            .dedup_key
1346            .as_ref()
1347            .filter(|k| !k.is_empty())
1348            .map(|k| usage_dedup_key(bctx.hash_tag(), k))
1349            .unwrap_or_default();
1350        fcall_args.push(dedup_key_val);
1351
1352        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1353        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1354
1355        let raw: Value = self
1356            .fcall_with_reload("ff_report_usage_and_check", &key_refs, &arg_refs)
1357            .await?;
1358
1359        parse_report_usage_result(&raw)
1360    }
1361
1362    /// Reset a budget's usage counters and schedule the next reset.
1363    pub async fn reset_budget(
1364        &self,
1365        budget_id: &BudgetId,
1366    ) -> Result<ResetBudgetResult, ServerError> {
1367        let partition = budget_partition(budget_id, &self.config.partition_config);
1368        let bctx = BudgetKeyContext::new(&partition, budget_id);
1369        let resets_key = keys::budget_resets_key(bctx.hash_tag());
1370
1371        // KEYS (3): budget_def, budget_usage, budget_resets_zset
1372        let fcall_keys: Vec<String> = vec![bctx.definition(), bctx.usage(), resets_key];
1373
1374        // ARGV (2): budget_id, now_ms
1375        let now = TimestampMs::now();
1376        let fcall_args: Vec<String> = vec![budget_id.to_string(), now.to_string()];
1377
1378        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1379        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1380
1381        let raw: Value = self
1382            .fcall_with_reload("ff_reset_budget", &key_refs, &arg_refs)
1383            .await?;
1384
1385        parse_reset_budget_result(&raw)
1386    }
1387
1388    // ── Flow API ──
1389
1390    /// Create a new flow container.
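    ///
    /// A minimal call sketch (the `flow_kind` literal is illustrative;
    /// `flow_id` minting is out of scope here):
    ///
    /// ```ignore
    /// let args = CreateFlowArgs {
    ///     flow_id,                     // pre-minted FlowId
    ///     flow_kind: "dag".to_owned(), // illustrative kind
    ///     namespace,
    ///     now: TimestampMs::now(),
    /// };
    /// server.create_flow(&args).await?;
    /// ```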
1391    pub async fn create_flow(
1392        &self,
1393        args: &CreateFlowArgs,
1394    ) -> Result<CreateFlowResult, ServerError> {
1395        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1396        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1397        let fidx = FlowIndexKeys::new(&partition);
1398
1399        // KEYS (3): flow_core, members_set, flow_index
1400        let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];
1401
1402        // ARGV (4): flow_id, flow_kind, namespace, now_ms
1403        let fcall_args: Vec<String> = vec![
1404            args.flow_id.to_string(),
1405            args.flow_kind.clone(),
1406            args.namespace.to_string(),
1407            args.now.to_string(),
1408        ];
1409
1410        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1411        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1412
1413        let raw: Value = self
1414            .fcall_with_reload("ff_create_flow", &key_refs, &arg_refs)
1415            .await?;
1416
1417        parse_create_flow_result(&raw, &args.flow_id)
1418    }
1419
1420    /// Add an execution to a flow.
1421    ///
1422    /// # Atomic single-FCALL commit (RFC-011 §7.3)
1423    ///
1424    /// Post-RFC-011 phase-3, exec_core co-locates with flow_core under
1425    /// hash-tag routing (both hash to `{fp:N}` via the exec id's
1426    /// embedded partition). A single atomic FCALL writes:
1427    ///
1428    ///   - `members_set` SADD (flow membership)
1429    ///   - `exec_core.flow_id` HSET (back-pointer)
1430    ///   - `flow_index` SADD (self-heal)
1431    ///   - `flow_core` HINCRBY node_count / graph_revision +
1432    ///     HSET last_mutation_at
1433    ///
1434    /// All four writes commit atomically or none do (Valkey scripting
1435    /// contract: validates-before-writing in the Lua body means
1436    /// `flow_not_found` / `flow_already_terminal` early-returns fire
1437    /// BEFORE any `redis.call()` mutation, and a mid-body error after
1438    /// writes is not expected because all writes are on the same slot).
1439    ///
    /// The pre-RFC-011 two-phase contract (membership FCALL on `{fp:N}`
    /// plus a separate HSET on `{p:N}`), its orphan window, and the
    /// reconciliation-scanner plan (issue #21, now superseded) are all
    /// retired.
1441    ///
1442    /// # Consumer contract
1443    ///
1444    /// The caller's `args.execution_id` **must** be co-located with
1445    /// `args.flow_id`'s partition — i.e. minted via
1446    /// `ExecutionId::for_flow(&args.flow_id, config)`. Passing a
1447    /// `solo`-minted id (or any exec id hashing to a different
1448    /// `{fp:N}` than the flow's) will fail at the Valkey level with
1449    /// `CROSSSLOT` on a clustered deploy.
1450    ///
1451    /// Callers with a flow context in scope always use `for_flow`;
1452    /// this is the only supported mint path for flow-member execs
1453    /// post-RFC-011. Test fixtures that pre-date the co-location
1454    /// contract use `TestCluster::new_execution_id_on_partition` to
1455    /// pin to a specific hash-tag index for `fcall_create_flow`-style
1456    /// helpers that hard-code their flow partition.
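    ///
    /// Sketch of the supported mint-then-add sequence (a sketch of the
    /// contract above, not a verbatim test):
    ///
    /// ```ignore
    /// // Mint co-located with the flow so both hash to the same {fp:N}.
    /// let execution_id = ExecutionId::for_flow(&flow_id, config);
    /// let args = AddExecutionToFlowArgs {
    ///     flow_id: flow_id.clone(),
    ///     execution_id,
    ///     now: TimestampMs::now(),
    /// };
    /// server.add_execution_to_flow(&args).await?;
    /// ```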
1457    pub async fn add_execution_to_flow(
1458        &self,
1459        args: &AddExecutionToFlowArgs,
1460    ) -> Result<AddExecutionToFlowResult, ServerError> {
1461        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1462        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1463        let fidx = FlowIndexKeys::new(&partition);
1464
1465        // exec_core co-locates with flow_core under RFC-011 §7.3 —
1466        // same `{fp:N}` hash-tag, same slot, part of the same atomic
1467        // FCALL below.
1468        let exec_partition =
1469            execution_partition(&args.execution_id, &self.config.partition_config);
1470        let ectx = ExecKeyContext::new(&exec_partition, &args.execution_id);
1471
1472        // Pre-flight: exec_partition must match flow_partition under
1473        // RFC-011 §7.3 co-location contract. If the caller hands us a
1474        // `solo`-minted exec whose hash-tag ≠ flow_partition, the FCALL
1475        // would fail with raw `CROSSSLOT` on a clustered deploy — a
1476        // typed `ServerError::PartitionMismatch` is a clearer signal
1477        // that the consumer-contract invariant was violated at mint
1478        // time (caller should have used `ExecutionId::for_flow(...)`).
1479        // See the Consumer contract section of this method's rustdoc.
1480        if exec_partition.index != partition.index {
1481            return Err(ServerError::PartitionMismatch(format!(
1482                "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
1483                 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
1484                 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
1485                exec_p = exec_partition.index,
1486                flow_p = partition.index,
1487            )));
1488        }
1489
1490        // KEYS (4): flow_core, members_set, flow_index, exec_core
1491        let fcall_keys: Vec<String> = vec![
1492            fctx.core(),
1493            fctx.members(),
1494            fidx.flow_index(),
1495            ectx.core(),
1496        ];
1497
1498        // ARGV (3): flow_id, execution_id, now_ms
1499        let fcall_args: Vec<String> = vec![
1500            args.flow_id.to_string(),
1501            args.execution_id.to_string(),
1502            args.now.to_string(),
1503        ];
1504
1505        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1506        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1507
1508        let raw: Value = self
1509            .fcall_with_reload("ff_add_execution_to_flow", &key_refs, &arg_refs)
1510            .await?;
1511
1512        parse_add_execution_to_flow_result(&raw)
1513    }
1514
1515    /// Cancel a flow.
1516    ///
1517    /// Flips `public_flow_state` to `cancelled` atomically via
1518    /// `ff_cancel_flow` on `{fp:N}`. For `cancel_all` policy, member
1519    /// executions must be cancelled cross-partition; this dispatch runs in
1520    /// the background and the call returns [`CancelFlowResult::CancellationScheduled`]
1521    /// immediately. For all other policies (or flows with no members), or
1522    /// when the flow was already in a terminal state (idempotent retry),
1523    /// the call returns [`CancelFlowResult::Cancelled`].
1524    ///
1525    /// Clients that need synchronous completion can call [`Self::cancel_flow_wait`].
1526    ///
1527    /// # Backpressure
1528    ///
1529    /// Each call that hits the async dispatch path spawns a new task into
1530    /// the shared background `JoinSet`. Rapid repeated calls against the
1531    /// same flow will spawn *multiple* overlapping dispatch tasks. This is
1532    /// not a correctness issue — each member cancel is idempotent and
1533    /// terminal flows short-circuit via [`ParsedCancelFlow::AlreadyTerminal`]
1534    /// — but heavy burst callers should either use `?wait=true` (serializes
1535    /// the dispatch on the HTTP thread, giving natural backpressure) or
1536    /// implement client-side deduplication on `flow_id`. The `JoinSet` is
1537    /// drained with a 15s timeout on [`Self::shutdown`], so very long
1538    /// dispatch tails may be aborted during graceful shutdown.
1539    ///
1540    /// # Orphan-member semantics on shutdown abort
1541    ///
1542    /// If shutdown fires `JoinSet::abort_all()` after its drain timeout
1543    /// while a dispatch loop is mid-iteration, the already-issued
1544    /// `ff_cancel_execution` FCALLs (atomic Lua) complete cleanly with
1545    /// `terminal_outcome = cancelled` and the caller-supplied reason. The
1546    /// members not yet visited are abandoned mid-loop. They remain in
1547    /// whichever state they were in (active/eligible/suspended) until the
1548    /// natural lifecycle scanners reach them: active leases expire
1549    /// (`lease_expiry`) and attempt-timeout them to `expired`, suspended
1550    /// members time out to `skipped`, eligible ones sit until retention
1551    /// trim. So no orphan state — but the terminal_outcome for the
1552    /// abandoned members will be `expired`/`skipped` rather than
1553    /// `cancelled`, and the operator-supplied `reason` is lost for them.
1554    /// Audit tooling that requires reason fidelity across shutdowns should
1555    /// use `?wait=true`.
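    ///
    /// Sketch of handling the outcomes this path returns (variant
    /// fields as constructed in `cancel_flow_inner` below):
    ///
    /// ```ignore
    /// match server.cancel_flow(&args).await? {
    ///     CancelFlowResult::Cancelled { .. } => {
    ///         // Inline completion: no members, non-cancel_all policy,
    ///         // or idempotent retry of an already-terminal flow.
    ///     }
    ///     CancelFlowResult::CancellationScheduled { member_count, .. } => {
    ///         tracing::info!(member_count, "cancel dispatched in background");
    ///     }
    ///     _ => { /* PartiallyCancelled arises only on the wait=true path */ }
    /// }
    /// ```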
1556    pub async fn cancel_flow(
1557        &self,
1558        args: &CancelFlowArgs,
1559    ) -> Result<CancelFlowResult, ServerError> {
1560        self.cancel_flow_inner(args, false).await
1561    }
1562
1563    /// Cancel a flow and wait for all member cancellations to complete
1564    /// inline. Slower than [`Self::cancel_flow`] for large flows, but
1565    /// guarantees every member is in a terminal state on return.
1566    pub async fn cancel_flow_wait(
1567        &self,
1568        args: &CancelFlowArgs,
1569    ) -> Result<CancelFlowResult, ServerError> {
1570        self.cancel_flow_inner(args, true).await
1571    }
1572
1573    async fn cancel_flow_inner(
1574        &self,
1575        args: &CancelFlowArgs,
1576        wait: bool,
1577    ) -> Result<CancelFlowResult, ServerError> {
1578        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1579        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1580        let fidx = FlowIndexKeys::new(&partition);
1581
1582        // Grace window before the reconciler may pick up this flow.
1583        // Kept short because the in-process dispatch is fast and bounded;
1584        // long enough to cover transport round-trips under load without
1585        // the reconciler fighting the live dispatch.
1586        const CANCEL_RECONCILER_GRACE_MS: u64 = 30_000;
1587
1588        // KEYS (5): flow_core, members_set, flow_index, pending_cancels, cancel_backlog
1589        let fcall_keys: Vec<String> = vec![
1590            fctx.core(),
1591            fctx.members(),
1592            fidx.flow_index(),
1593            fctx.pending_cancels(),
1594            fidx.cancel_backlog(),
1595        ];
1596
1597        // ARGV (5): flow_id, reason, cancellation_policy, now_ms, grace_ms
1598        let fcall_args: Vec<String> = vec![
1599            args.flow_id.to_string(),
1600            args.reason.clone(),
1601            args.cancellation_policy.clone(),
1602            args.now.to_string(),
1603            CANCEL_RECONCILER_GRACE_MS.to_string(),
1604        ];
1605
1606        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1607        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1608
1609        let raw: Value = self
1610            .fcall_with_reload("ff_cancel_flow", &key_refs, &arg_refs)
1611            .await?;
1612
1613        let (policy, members) = match parse_cancel_flow_raw(&raw)? {
1614            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
1615                (policy, member_execution_ids)
1616            }
1617            // Idempotent retry: flow was already cancelled/completed/failed.
1618            // Return Cancelled with the *stored* policy and member list so
1619            // observability tooling gets the real historical state rather
1620            // than echoing the caller's retry intent. One HMGET + SMEMBERS
1621            // on the idempotent path — both on {fp:N}, same slot.
1622            ParsedCancelFlow::AlreadyTerminal => {
1623                let flow_meta: Vec<Option<String>> = self
1624                    .client
1625                    .cmd("HMGET")
1626                    .arg(fctx.core())
1627                    .arg("cancellation_policy")
1628                    .arg("cancel_reason")
1629                    .execute()
1630                    .await
1631                    .map_err(|e| crate::server::backend_context(e, "HMGET flow_core cancellation_policy,cancel_reason"))?;
1632                let stored_policy = flow_meta
1633                    .first()
1634                    .and_then(|v| v.as_ref())
1635                    .filter(|s| !s.is_empty())
1636                    .cloned();
1637                let stored_reason = flow_meta
1638                    .get(1)
1639                    .and_then(|v| v.as_ref())
1640                    .filter(|s| !s.is_empty())
1641                    .cloned();
1642                let all_members: Vec<String> = self
1643                    .client
1644                    .cmd("SMEMBERS")
1645                    .arg(fctx.members())
1646                    .execute()
1647                    .await
1648                    .map_err(|e| crate::server::backend_context(e, "SMEMBERS flow members (already terminal)"))?;
1649                // Cap the returned list to avoid pathological bandwidth on
1650                // idempotent retries for flows with 10k+ members. Clients
1651                // already received the authoritative member list on the
1652                // first (non-idempotent) call; subsequent retries just need
1653                // enough to confirm the operation and trigger per-member
1654                // polling if desired.
1655                let total_members = all_members.len();
1656                let stored_members: Vec<String> = all_members
1657                    .into_iter()
1658                    .take(ALREADY_TERMINAL_MEMBER_CAP)
1659                    .collect();
1660                tracing::debug!(
1661                    flow_id = %args.flow_id,
1662                    stored_policy = stored_policy.as_deref().unwrap_or(""),
1663                    stored_reason = stored_reason.as_deref().unwrap_or(""),
1664                    total_members,
1665                    returned_members = stored_members.len(),
1666                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
1667                );
1668                return Ok(CancelFlowResult::Cancelled {
1669                    // Fall back to caller's policy only if the stored field
1670                    // is missing (flows cancelled by older Lua that did not
1671                    // persist cancellation_policy).
1672                    cancellation_policy: stored_policy
1673                        .unwrap_or_else(|| args.cancellation_policy.clone()),
1674                    member_execution_ids: stored_members,
1675                });
1676            }
1677        };
1678        let needs_dispatch = policy == "cancel_all" && !members.is_empty();
1679
1680        if !needs_dispatch {
1681            return Ok(CancelFlowResult::Cancelled {
1682                cancellation_policy: policy,
1683                member_execution_ids: members,
1684            });
1685        }
1686
1687        let pending_cancels_key = fctx.pending_cancels();
1688        let cancel_backlog_key = fidx.cancel_backlog();
1689
1690        if wait {
1691            // Synchronous dispatch — cancel every member inline before returning.
1692            // Collect per-member failures so the caller sees a
1693            // PartiallyCancelled outcome instead of a false-positive
1694            // Cancelled when any member cancel faulted (ghost member,
1695            // transport exhaustion, Lua reject). The cancel-backlog
1696            // reconciler still retries the unacked members; surfacing
1697            // the partial state lets operator tooling alert without
1698            // polling per-member state.
1699            let mut failed: Vec<String> = Vec::new();
1700            for eid_str in &members {
1701                match cancel_member_execution(
1702                    &self.client,
1703                    &self.config.partition_config,
1704                    eid_str,
1705                    &args.reason,
1706                    args.now,
1707                )
1708                .await
1709                {
1710                    Ok(()) => {
1711                        ack_cancel_member(
1712                            &self.client,
1713                            &pending_cancels_key,
1714                            &cancel_backlog_key,
1715                            eid_str,
1716                            &args.flow_id.to_string(),
1717                        )
1718                        .await;
1719                    }
1720                    Err(e) => {
1721                        // If the member was already terminal (execution_not_active /
1722                        // execution_not_found), treat this as ack-worthy success —
1723                        // the member is effectively "cancelled" from the flow's
1724                        // perspective and shouldn't be surfaced as a partial
1725                        // failure. Mirrors cancel_reconciler::cancel_member which
1726                        // acks on the same codes to avoid backlog poisoning.
1727                        if is_terminal_ack_error(&e) {
1728                            ack_cancel_member(
1729                                &self.client,
1730                                &pending_cancels_key,
1731                                &cancel_backlog_key,
1732                                eid_str,
1733                                &args.flow_id.to_string(),
1734                            )
1735                            .await;
1736                            continue;
1737                        }
1738                        tracing::warn!(
1739                            execution_id = %eid_str,
1740                            error = %e,
1741                            "cancel_flow(wait): individual execution cancel failed \
1742                             (transport/contract fault; reconciler will retry if transient)"
1743                        );
1744                        failed.push(eid_str.clone());
1745                    }
1746                }
1747            }
1748            if failed.is_empty() {
1749                return Ok(CancelFlowResult::Cancelled {
1750                    cancellation_policy: policy,
1751                    member_execution_ids: members,
1752                });
1753            }
1754            return Ok(CancelFlowResult::PartiallyCancelled {
1755                cancellation_policy: policy,
1756                member_execution_ids: members,
1757                failed_member_execution_ids: failed,
1758            });
1759        }
1760
1761        // Asynchronous dispatch — spawn into the shared JoinSet so Server::shutdown
1762        // can wait for pending cancellations (bounded by a shutdown timeout).
1763        let client = self.client.clone();
1764        let partition_config = self.config.partition_config;
1765        let reason = args.reason.clone();
1766        let now = args.now;
1767        let dispatch_members = members.clone();
1768        let flow_id = args.flow_id.clone();
1769        // Every async cancel_flow contends on this lock, but the critical
1770        // section is tiny: try_join_next drain + spawn. Drain is amortized
1771        // O(1) — each completed task is reaped exactly once across all
1772        // callers, and spawn is synchronous. At realistic cancel rates the
1773        // lock hold time is microseconds and does not bottleneck handlers.
1774        let mut guard = self.background_tasks.lock().await;
1775
1776        // Reap completed background dispatches before spawning the next.
1777        // Without this sweep, JoinSet accumulates Ok(()) results for every
1778        // async cancel ever issued — a memory leak in long-running servers
1779        // that would otherwise only drain on Server::shutdown. Surface any
1780        // panicked/aborted dispatches via tracing so silent failures in
1781        // cancel_member_execution are visible in logs.
1782        while let Some(joined) = guard.try_join_next() {
1783            if let Err(e) = joined {
1784                tracing::warn!(
1785                    error = %e,
1786                    "cancel_flow: background dispatch task panicked or was aborted"
1787                );
1788            }
1789        }
1790
1791        let pending_key_owned = pending_cancels_key.clone();
1792        let backlog_key_owned = cancel_backlog_key.clone();
1793        let flow_id_str = args.flow_id.to_string();
1794
1795        guard.spawn(async move {
1796            // Bounded parallel dispatch via futures::stream::buffer_unordered.
1797            // Sequential cancel at ~2ms/FCALL is ~2s for a 1000-member flow,
1798            // but ~20s for a 10k-member flow, past the 15s shutdown drain
1799            // window. Bounding at CONCURRENCY keeps Valkey load predictable
1800            // while still cutting wall-clock dispatch time by ~CONCURRENCY×
1801            // for large member sets.
1802            use futures::stream::StreamExt;
1803            const CONCURRENCY: usize = 16;
1804
1805            let member_count = dispatch_members.len();
1806            let flow_id_for_log = flow_id.clone();
1807            futures::stream::iter(dispatch_members)
1808                .map(|eid_str| {
1809                    let client = client.clone();
1810                    let reason = reason.clone();
1811                    let flow_id = flow_id.clone();
1812                    let pending = pending_key_owned.clone();
1813                    let backlog = backlog_key_owned.clone();
1814                    let flow_id_str = flow_id_str.clone();
1815                    async move {
1816                        match cancel_member_execution(
1817                            &client,
1818                            &partition_config,
1819                            &eid_str,
1820                            &reason,
1821                            now,
1822                        )
1823                        .await
1824                        {
1825                            Ok(()) => {
1826                                ack_cancel_member(
1827                                    &client,
1828                                    &pending,
1829                                    &backlog,
1830                                    &eid_str,
1831                                    &flow_id_str,
1832                                )
1833                                .await;
1834                            }
1835                            Err(e) => {
1836                                if is_terminal_ack_error(&e) {
1837                                    ack_cancel_member(
1838                                        &client,
1839                                        &pending,
1840                                        &backlog,
1841                                        &eid_str,
1842                                        &flow_id_str,
1843                                    )
1844                                    .await;
1845                                } else {
1846                                    tracing::warn!(
1847                                        flow_id = %flow_id,
1848                                        execution_id = %eid_str,
1849                                        error = %e,
1850                                        "cancel_flow(async): individual execution cancel failed \
1851                                         (transport/contract fault; reconciler will retry if transient)"
1852                                    );
1853                                }
1854                            }
1855                        }
1856                    }
1857                })
1858                .buffer_unordered(CONCURRENCY)
1859                .for_each(|()| async {})
1860                .await;
1861
1862            tracing::debug!(
1863                flow_id = %flow_id_for_log,
1864                member_count,
1865                concurrency = CONCURRENCY,
1866                "cancel_flow: background member dispatch complete"
1867            );
1868        });
1869        drop(guard);
1870
1871        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
1872        Ok(CancelFlowResult::CancellationScheduled {
1873            cancellation_policy: policy,
1874            member_count,
1875            member_execution_ids: members,
1876        })
1877    }
1878
1879    /// Stage a dependency edge between two executions in a flow.
1880    ///
1881    /// Runs on the flow partition {fp:N}.
1882    /// KEYS (6), ARGV (8) — matches lua/flow.lua ff_stage_dependency_edge.
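    ///
    /// Stage/apply is a two-step, two-partition protocol: this call
    /// writes the edge on the flow's `{fp:N}`, then
    /// [`Self::apply_dependency_to_child`] materializes it on the child's
    /// `{p:N}`. A sketch with illustrative values:
    ///
    /// ```ignore
    /// let stage_args = StageDependencyEdgeArgs {
    ///     flow_id: flow_id.clone(),
    ///     edge_id: EdgeId::new(),
    ///     upstream_execution_id: upstream.clone(),
    ///     downstream_execution_id: downstream.clone(),
    ///     dependency_kind: "completion".to_owned(), // illustrative kind
    ///     data_passing_ref: None,
    ///     expected_graph_revision: 1, // from the flow's current revision
    ///     now: TimestampMs::now(),
    /// };
    /// server.stage_dependency_edge(&stage_args).await?;
    /// // Second FCALL, on the downstream execution's {p:N} partition
    /// // (`apply_args` mirrors the fields of ApplyDependencyToChildArgs):
    /// server.apply_dependency_to_child(&apply_args).await?;
    /// ```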
1883    pub async fn stage_dependency_edge(
1884        &self,
1885        args: &StageDependencyEdgeArgs,
1886    ) -> Result<StageDependencyEdgeResult, ServerError> {
1887        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1888        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1889
1890        // KEYS (6): flow_core, members_set, edge_hash, out_adj_set, in_adj_set, grant_hash
1891        let fcall_keys: Vec<String> = vec![
1892            fctx.core(),
1893            fctx.members(),
1894            fctx.edge(&args.edge_id),
1895            fctx.outgoing(&args.upstream_execution_id),
1896            fctx.incoming(&args.downstream_execution_id),
1897            fctx.grant(&args.edge_id.to_string()),
1898        ];
1899
1900        // ARGV (8): flow_id, edge_id, upstream_eid, downstream_eid,
1901        //           dependency_kind, data_passing_ref, expected_graph_revision, now_ms
1902        let fcall_args: Vec<String> = vec![
1903            args.flow_id.to_string(),
1904            args.edge_id.to_string(),
1905            args.upstream_execution_id.to_string(),
1906            args.downstream_execution_id.to_string(),
1907            args.dependency_kind.clone(),
1908            args.data_passing_ref.clone().unwrap_or_default(),
1909            args.expected_graph_revision.to_string(),
1910            args.now.to_string(),
1911        ];
1912
1913        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1914        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1915
1916        let raw: Value = self
1917            .fcall_with_reload("ff_stage_dependency_edge", &key_refs, &arg_refs)
1918            .await?;
1919
1920        parse_stage_dependency_edge_result(&raw)
1921    }
1922
1923    /// Apply a staged dependency edge to the child execution.
1924    ///
1925    /// Runs on the child execution partition {p:N}.
1926    /// KEYS (7), ARGV (7) — matches lua/flow.lua ff_apply_dependency_to_child.
1927    pub async fn apply_dependency_to_child(
1928        &self,
1929        args: &ApplyDependencyToChildArgs,
1930    ) -> Result<ApplyDependencyToChildResult, ServerError> {
1931        let partition = execution_partition(
1932            &args.downstream_execution_id,
1933            &self.config.partition_config,
1934        );
1935        let ctx = ExecKeyContext::new(&partition, &args.downstream_execution_id);
1936        let idx = IndexKeys::new(&partition);
1937
1938        // Pre-read lane_id for index keys
1939        let lane_str: Option<String> = self
1940            .client
1941            .hget(&ctx.core(), "lane_id")
1942            .await
1943            .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
1944        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1945
1946        // KEYS (7): exec_core, deps_meta, unresolved_set, dep_hash,
1947        //           eligible_zset, blocked_deps_zset, deps_all_edges
1948        let fcall_keys: Vec<String> = vec![
1949            ctx.core(),
1950            ctx.deps_meta(),
1951            ctx.deps_unresolved(),
1952            ctx.dep_edge(&args.edge_id),
1953            idx.lane_eligible(&lane),
1954            idx.lane_blocked_dependencies(&lane),
1955            ctx.deps_all_edges(),
1956        ];
1957
1958        // ARGV (7): flow_id, edge_id, upstream_eid, graph_revision,
1959        //           dependency_kind, data_passing_ref, now_ms
1960        let fcall_args: Vec<String> = vec![
1961            args.flow_id.to_string(),
1962            args.edge_id.to_string(),
1963            args.upstream_execution_id.to_string(),
1964            args.graph_revision.to_string(),
1965            args.dependency_kind.clone(),
1966            args.data_passing_ref.clone().unwrap_or_default(),
1967            args.now.to_string(),
1968        ];
1969
1970        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1971        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1972
1973        let raw: Value = self
1974            .fcall_with_reload("ff_apply_dependency_to_child", &key_refs, &arg_refs)
1975            .await?;
1976
1977        parse_apply_dependency_result(&raw)
1978    }
1979
1980    // ── Execution operations API ──
1981
1982    /// Deliver a signal to a suspended (or pending-waitpoint) execution.
1983    ///
1984    /// Pre-reads exec_core for the lane id needed to build index KEYS.
1985    /// KEYS (14), ARGV (18) — matches lua/signal.lua ff_deliver_signal.
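    ///
    /// Sketch: deliver an approval signal with an idempotency key so
    /// client retries dedupe server-side. Only the spelled-out fields
    /// are grounded in the ARGV layout below; the literals are
    /// illustrative, and `..Default::default()` assumes a `Default`
    /// impl filling the remaining `Option` fields with `None`:
    ///
    /// ```ignore
    /// let args = DeliverSignalArgs {
    ///     signal_id,
    ///     execution_id,
    ///     waitpoint_id,
    ///     signal_name: "approval".to_owned(),
    ///     signal_category: "human".to_owned(),
    ///     source_type: "operator".to_owned(),
    ///     source_identity: "ops@example.com".to_owned(),
    ///     payload: Some(br#"{"approved":true}"#.to_vec()),
    ///     idempotency_key: Some("approve-req-7".to_owned()),
    ///     target_scope: "waitpoint".to_owned(),
    ///     waitpoint_token,
    ///     now: TimestampMs::now(),
    ///     ..Default::default()
    /// };
    /// server.deliver_signal(&args).await?;
    /// ```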
1986    pub async fn deliver_signal(
1987        &self,
1988        args: &DeliverSignalArgs,
1989    ) -> Result<DeliverSignalResult, ServerError> {
1990        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
1991        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
1992        let idx = IndexKeys::new(&partition);
1993
1994        // Pre-read lane_id for index keys
1995        let lane_str: Option<String> = self
1996            .client
1997            .hget(&ctx.core(), "lane_id")
1998            .await
1999            .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
2000        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2001
2002        let wp_id = &args.waitpoint_id;
2003        let sig_id = &args.signal_id;
2004        let idem_key = args
2005            .idempotency_key
2006            .as_ref()
2007            .filter(|k| !k.is_empty())
2008            .map(|k| ctx.signal_dedup(wp_id, k))
2009            .unwrap_or_else(|| ctx.noop());
2010
2011        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
2012        //            exec_signals_zset, signal_hash, signal_payload,
2013        //            idem_key, waitpoint_hash, suspension_current,
2014        //            eligible_zset, suspended_zset, delayed_zset,
2015        //            suspension_timeout_zset, hmac_secrets
2016        let fcall_keys: Vec<String> = vec![
2017            ctx.core(),                       // 1
2018            ctx.waitpoint_condition(wp_id),    // 2
2019            ctx.waitpoint_signals(wp_id),      // 3
2020            ctx.exec_signals(),                // 4
2021            ctx.signal(sig_id),                // 5
2022            ctx.signal_payload(sig_id),        // 6
2023            idem_key,                          // 7
2024            ctx.waitpoint(wp_id),              // 8
2025            ctx.suspension_current(),          // 9
2026            idx.lane_eligible(&lane),          // 10
2027            idx.lane_suspended(&lane),         // 11
2028            idx.lane_delayed(&lane),           // 12
2029            idx.suspension_timeout(),          // 13
2030            idx.waitpoint_hmac_secrets(),      // 14
2031        ];
2032
2033        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
2034        //            signal_category, source_type, source_identity,
2035        //            payload, payload_encoding, idempotency_key,
2036        //            correlation_id, target_scope, created_at,
2037        //            dedup_ttl_ms, resume_delay_ms, signal_maxlen,
2038        //            max_signals_per_execution, waitpoint_token
2039        let fcall_args: Vec<String> = vec![
2040            args.signal_id.to_string(),                          // 1
2041            args.execution_id.to_string(),                       // 2
2042            args.waitpoint_id.to_string(),                       // 3
2043            args.signal_name.clone(),                            // 4
2044            args.signal_category.clone(),                        // 5
2045            args.source_type.clone(),                            // 6
2046            args.source_identity.clone(),                        // 7
2047            args.payload.as_ref()
2048                .map(|p| String::from_utf8_lossy(p).into_owned())
2049                .unwrap_or_default(),                            // 8
2050            args.payload_encoding
2051                .clone()
2052                .unwrap_or_else(|| "json".to_owned()),           // 9
2053            args.idempotency_key
2054                .clone()
2055                .unwrap_or_default(),                            // 10
2056            args.correlation_id
2057                .clone()
2058                .unwrap_or_default(),                            // 11
2059            args.target_scope.clone(),                           // 12
2060            args.created_at
2061                .map(|ts| ts.to_string())
2062                .unwrap_or_else(|| args.now.to_string()),        // 13
2063            args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), // 14
2064            args.resume_delay_ms.unwrap_or(0).to_string(),       // 15
2065            args.signal_maxlen.unwrap_or(1000).to_string(),      // 16
2066            args.max_signals_per_execution
2067                .unwrap_or(10_000)
2068                .to_string(),                                    // 17
2069            // WIRE BOUNDARY — raw token to Lua. Display is redacted
2070            // for log safety; use .as_str() at the wire crossing.
2071            args.waitpoint_token.as_str().to_owned(),            // 18
2072        ];
2073
2074        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2075        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2076
2077        let raw: Value = self
2078            .fcall_with_reload("ff_deliver_signal", &key_refs, &arg_refs)
2079            .await?;
2080
2081        parse_deliver_signal_result(&raw, &args.signal_id)
2082    }
2083
2084    /// Change an execution's priority.
2085    ///
2086    /// KEYS (2), ARGV (2) — matches lua/scheduling.lua ff_change_priority.
2087    pub async fn change_priority(
2088        &self,
2089        execution_id: &ExecutionId,
2090        new_priority: i32,
2091    ) -> Result<ChangePriorityResult, ServerError> {
2092        let partition = execution_partition(execution_id, &self.config.partition_config);
2093        let ctx = ExecKeyContext::new(&partition, execution_id);
2094        let idx = IndexKeys::new(&partition);
2095
2096        // Read lane_id for eligible_zset key
2097        let lane_str: Option<String> = self
2098            .client
2099            .hget(&ctx.core(), "lane_id")
2100            .await
2101            .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
2102        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2103
2104        // KEYS (2): exec_core, eligible_zset
2105        let fcall_keys: Vec<String> = vec![ctx.core(), idx.lane_eligible(&lane)];
2106
2107        // ARGV (2): execution_id, new_priority
2108        let fcall_args: Vec<String> = vec![
2109            execution_id.to_string(),
2110            new_priority.to_string(),
2111        ];
2112
2113        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2114        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2115
2116        let raw: Value = self
2117            .fcall_with_reload("ff_change_priority", &key_refs, &arg_refs)
2118            .await?;
2119
2120        parse_change_priority_result(&raw, execution_id)
2121    }
2122
2123    /// Scheduler-routed claim entry point (Batch C item 2 PR-B).
2124    ///
2125    /// Delegates to [`ff_scheduler::Scheduler::claim_for_worker`] which
2126    /// runs budget + quota + capability admission before issuing the
2127    /// grant. Returns `Ok(None)` when no eligible execution exists on
2128    /// the lane at this scan cycle. The worker's subsequent
2129    /// `claim_from_grant(lane, grant)` mints the lease.
2130    ///
2131    /// Keeping the claim-grant mint inside the server (rather than the
2132    /// worker) means capability CSV validation, budget/quota breach
2133    /// checks, and lane routing run in one place for every tenant
2134    /// worker — the same invariants as the `direct-valkey-claim` path
2135    /// enforces inline, but gated at a single server choke point.
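    ///
    /// Sketch of the worker-side loop this entry point serves
    /// (`claim_from_grant` is the follow-up named above; the poll
    /// interval is illustrative):
    ///
    /// ```ignore
    /// let caps: BTreeSet<String> = ["gpu".to_owned()].into();
    /// loop {
    ///     match server
    ///         .claim_for_worker(&lane, &worker_id, &instance_id, &caps, 30_000)
    ///         .await?
    ///     {
    ///         Some(grant) => {
    ///             let lease = worker.claim_from_grant(&lane, grant).await?;
    ///             // ...run the attempt under `lease`...
    ///         }
    ///         None => tokio::time::sleep(Duration::from_millis(250)).await,
    ///     }
    /// }
    /// ```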
2136    pub async fn claim_for_worker(
2137        &self,
2138        lane: &LaneId,
2139        worker_id: &WorkerId,
2140        worker_instance_id: &WorkerInstanceId,
2141        worker_capabilities: &std::collections::BTreeSet<String>,
2142        grant_ttl_ms: u64,
2143    ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
2144        self.scheduler
2145            .claim_for_worker(
2146                lane,
2147                worker_id,
2148                worker_instance_id,
2149                worker_capabilities,
2150                grant_ttl_ms,
2151            )
2152            .await
2153            .map_err(|e| match e {
2154                ff_scheduler::SchedulerError::Valkey(inner) => ServerError::from(inner),
2155                ff_scheduler::SchedulerError::ValkeyContext { source, context } => {
2156                    crate::server::backend_context(source, context)
2157                }
2158                ff_scheduler::SchedulerError::Config(msg) => ServerError::InvalidInput(msg),
2159            })
2160    }
2161
2162    /// Revoke an active lease (operator-initiated).
2163    pub async fn revoke_lease(
2164        &self,
2165        execution_id: &ExecutionId,
2166    ) -> Result<RevokeLeaseResult, ServerError> {
2167        let partition = execution_partition(execution_id, &self.config.partition_config);
2168        let ctx = ExecKeyContext::new(&partition, execution_id);
2169        let idx = IndexKeys::new(&partition);
2170
2171        // Pre-read worker_instance_id for worker_leases key
2172        let wiid_str: Option<String> = self
2173            .client
2174            .hget(&ctx.core(), "current_worker_instance_id")
2175            .await
2176            .map_err(|e| crate::server::backend_context(e, "HGET worker_instance_id"))?;
2177        let wiid = match wiid_str {
2178            Some(ref s) if !s.is_empty() => WorkerInstanceId::new(s),
2179            _ => {
2180                return Err(ServerError::NotFound(format!(
2181                    "no active lease for execution {execution_id} (no current_worker_instance_id)"
2182                )));
2183            }
2184        };
2185
2186        // KEYS (5): exec_core, lease_current, lease_history, lease_expiry_zset, worker_leases
2187        let fcall_keys: Vec<String> = vec![
2188            ctx.core(),
2189            ctx.lease_current(),
2190            ctx.lease_history(),
2191            idx.lease_expiry(),
2192            idx.worker_leases(&wiid),
2193        ];
2194
2195        // ARGV (3): execution_id, expected_lease_id (empty = skip check), revoke_reason
2196        let fcall_args: Vec<String> = vec![
2197            execution_id.to_string(),
2198            String::new(), // no expected_lease_id check
2199            "operator_revoke".to_owned(),
2200        ];
2201
2202        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2203        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2204
2205        let raw: Value = self
2206            .fcall_with_reload("ff_revoke_lease", &key_refs, &arg_refs)
2207            .await?;
2208
2209        parse_revoke_lease_result(&raw)
2210    }
2211
2212    /// Get full execution info via HGETALL on exec_core.
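    ///
    /// Sketch of a terminal-state check over the returned info (field
    /// shapes as constructed below):
    ///
    /// ```ignore
    /// let info = server.get_execution(&execution_id).await?;
    /// if info.completed_at.is_some() {
    ///     println!(
    ///         "{} finished as {:?} after {} attempt(s)",
    ///         info.execution_id,
    ///         info.state_vector.terminal_outcome,
    ///         info.current_attempt_index + 1,
    ///     );
    /// }
    /// ```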
2213    pub async fn get_execution(
2214        &self,
2215        execution_id: &ExecutionId,
2216    ) -> Result<ExecutionInfo, ServerError> {
2217        let partition = execution_partition(execution_id, &self.config.partition_config);
2218        let ctx = ExecKeyContext::new(&partition, execution_id);
2219
2220        let fields: HashMap<String, String> = self
2221            .client
2222            .hgetall(&ctx.core())
2223            .await
2224            .map_err(|e| crate::server::backend_context(e, "HGETALL exec_core"))?;
2225
2226        if fields.is_empty() {
2227            return Err(ServerError::NotFound(format!(
2228                "execution not found: {execution_id}"
2229            )));
2230        }
2231
        // Raw field fetch: returns the stored string ("" when absent);
        // enum-typed fields are JSON-deserialized via `deserialize` below.
        let parse_enum = |field: &str| -> String {
            fields.get(field).cloned().unwrap_or_default()
        };
        // exec_core stores unit enum variants as bare strings; serde's
        // JSON form for a unit variant is a quoted string, so quote the
        // raw value before parsing.
        fn deserialize<T: serde::de::DeserializeOwned>(field: &str, raw: &str) -> Result<T, ServerError> {
            let quoted = format!("\"{raw}\"");
            serde_json::from_str(&quoted).map_err(|e| {
                ServerError::Script(format!("invalid {field} '{raw}': {e}"))
            })
        }
2241
2242        let lp_str = parse_enum("lifecycle_phase");
2243        let os_str = parse_enum("ownership_state");
2244        let es_str = parse_enum("eligibility_state");
2245        let br_str = parse_enum("blocking_reason");
2246        let to_str = parse_enum("terminal_outcome");
2247        let as_str = parse_enum("attempt_state");
2248        let ps_str = parse_enum("public_state");
2249
2250        let state_vector = StateVector {
2251            lifecycle_phase: deserialize("lifecycle_phase", &lp_str)?,
2252            ownership_state: deserialize("ownership_state", &os_str)?,
2253            eligibility_state: deserialize("eligibility_state", &es_str)?,
2254            blocking_reason: deserialize("blocking_reason", &br_str)?,
2255            terminal_outcome: deserialize("terminal_outcome", &to_str)?,
2256            attempt_state: deserialize("attempt_state", &as_str)?,
2257            public_state: deserialize("public_state", &ps_str)?,
2258        };
2259
2260        // Reader invariant (RFC-011 §7.3): `flow_id` on exec_core is
2261        // stamped atomically with membership in `add_execution_to_flow`'s
2262        // single FCALL. Empty iff the exec has no flow affinity (never
2263        // called `add_execution_to_flow` — solo execs) — NOT "orphaned
2264        // mid-two-phase" (that failure mode is retired). Filter on empty
2265        // to surface "no flow affinity" distinctly from "flow X".
2266        let flow_id_val = fields.get("flow_id").filter(|s| !s.is_empty()).cloned();
2267
2268        // started_at / completed_at come from the exec_core hash —
2269        // lua/execution.lua writes them alongside the rest of the state
2270        // vector. `created_at` is required; the other two are optional
2271        // (pre-claim / pre-terminal reads) and elided when empty so the
2272        // wire payload for in-flight executions stays the shape existing
2273        // callers expect.
2274        let started_at_opt = fields
2275            .get("started_at")
2276            .filter(|s| !s.is_empty())
2277            .cloned();
2278        let completed_at_opt = fields
2279            .get("completed_at")
2280            .filter(|s| !s.is_empty())
2281            .cloned();
2282
2283        Ok(ExecutionInfo {
2284            execution_id: execution_id.clone(),
2285            namespace: parse_enum("namespace"),
2286            lane_id: parse_enum("lane_id"),
2287            priority: fields
2288                .get("priority")
2289                .and_then(|v| v.parse().ok())
2290                .unwrap_or(0),
2291            execution_kind: parse_enum("execution_kind"),
2292            state_vector,
2293            public_state: deserialize("public_state", &ps_str)?,
2294            created_at: parse_enum("created_at"),
2295            started_at: started_at_opt,
2296            completed_at: completed_at_opt,
2297            current_attempt_index: fields
2298                .get("current_attempt_index")
2299                .and_then(|v| v.parse().ok())
2300                .unwrap_or(0),
2301            flow_id: flow_id_val,
2302            blocking_detail: parse_enum("blocking_detail"),
2303        })
2304    }
2305
2306    /// Partition-scoped forward-only cursor listing of executions.
2307    ///
2308    /// Parity-wrapper around the Valkey body of
2309    /// [`ff_core::engine_backend::EngineBackend::list_executions`].
2310    /// Issue #182 replaced the previous offset + lane + state-filter
2311    /// shape with this cursor-based API (per owner adjudication:
2312    /// cursor-everywhere, HTTP surface unreleased). Reads
2313    /// `ff:idx:{p:N}:all_executions`, sorts lexicographically on
    /// `ExecutionId`, filters `> cursor`, and trims to `limit`
    /// (server-capped at 1000 entries per page).
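    ///
    /// Sketch of draining a partition with the cursor (the
    /// `executions` / `next_cursor` accessors on the page are
    /// assumptions about `ListExecutionsPage`'s shape):
    ///
    /// ```ignore
    /// let mut cursor: Option<ExecutionId> = None;
    /// loop {
    ///     let page = server
    ///         .list_executions_page(partition_id, cursor.take(), 500)
    ///         .await?;
    ///     for id in &page.executions {
    ///         // ...process `id`...
    ///     }
    ///     match page.next_cursor {
    ///         Some(next) => cursor = Some(next),
    ///         None => break,
    ///     }
    /// }
    /// ```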
2315    pub async fn list_executions_page(
2316        &self,
2317        partition_id: u16,
2318        cursor: Option<ExecutionId>,
2319        limit: usize,
2320    ) -> Result<ListExecutionsPage, ServerError> {
2321        if limit == 0 {
2322            return Ok(ListExecutionsPage::new(Vec::new(), None));
2323        }
2324        let partition = ff_core::partition::Partition {
2325            family: ff_core::partition::PartitionFamily::Execution,
2326            index: partition_id,
2327        };
2328        let idx = IndexKeys::new(&partition);
2329        let all_key = idx.all_executions();
2330
2331        let raw_members: Vec<String> = self
2332            .client
2333            .cmd("SMEMBERS")
2334            .arg(&all_key)
2335            .execute()
2336            .await
2337            .map_err(|e| crate::server::backend_context(e, format!("SMEMBERS {all_key}")))?;
2338
2339        if raw_members.is_empty() {
2340            return Ok(ListExecutionsPage::new(Vec::new(), None));
2341        }
2342
2343        let mut parsed: Vec<ExecutionId> = Vec::with_capacity(raw_members.len());
2344        for raw in &raw_members {
2345            match ExecutionId::parse(raw) {
2346                Ok(id) => parsed.push(id),
2347                Err(e) => {
2348                    tracing::warn!(
2349                        raw_id = %raw,
2350                        error = %e,
2351                        set = %all_key,
2352                        "list_executions_page: SMEMBERS member failed to parse as ExecutionId \
2353                         (data corruption?)"
2354                    );
2355                }
2356            }
2357        }
2358        parsed.sort_by(|a, b| a.as_str().cmp(b.as_str()));
2359
2360        let filtered: Vec<ExecutionId> = if let Some(c) = cursor.as_ref() {
2361            let cs = c.as_str();
2362            parsed.into_iter().filter(|e| e.as_str() > cs).collect()
2363        } else {
2364            parsed
2365        };
2366
        // Server-side page cap: bounds response size regardless of the
        // caller-supplied `limit` (documented in the rustdoc above).
        let effective_limit = limit.min(1000);
2368        let has_more = filtered.len() > effective_limit;
2369        let page: Vec<ExecutionId> = filtered.into_iter().take(effective_limit).collect();
2370        let next_cursor = if has_more { page.last().cloned() } else { None };
2371        Ok(ListExecutionsPage::new(page, next_cursor))
2372    }
2373
2374    /// Replay a terminal execution.
2375    ///
2376    /// Pre-reads exec_core for flow_id and dep edges (variable KEYS).
2377    /// KEYS (4+N), ARGV (2+N) — matches lua/flow.lua ff_replay_execution.
2378    pub async fn replay_execution(
2379        &self,
2380        execution_id: &ExecutionId,
2381    ) -> Result<ReplayExecutionResult, ServerError> {
2382        let partition = execution_partition(execution_id, &self.config.partition_config);
2383        let ctx = ExecKeyContext::new(&partition, execution_id);
2384        let idx = IndexKeys::new(&partition);
2385
2386        // Pre-read lane_id, flow_id, terminal_outcome.
2387        //
2388        // Reader invariant (RFC-011 §7.3): `flow_id` on exec_core is
2389        // stamped atomically with membership by `add_execution_to_flow`'s
2390        // single FCALL. Empty iff the exec has no flow affinity
2391        // (solo-path create_execution — never added to a flow). The
2392        // `is_skipped_flow_member` branch below gates on
2393        // `!flow_id_str.is_empty()`, so solo execs correctly fall back
2394        // to the non-flow-member replay path. See
2395        // `add_execution_to_flow` rustdoc for the atomic-commit
2396        // invariant.
2397        let dyn_fields: Vec<Option<String>> = self
2398            .client
2399            .cmd("HMGET")
2400            .arg(ctx.core())
2401            .arg("lane_id")
2402            .arg("flow_id")
2403            .arg("terminal_outcome")
2404            .execute()
2405            .await
2406            .map_err(|e| crate::server::backend_context(e, "HMGET replay pre-read"))?;
2407        let lane = LaneId::new(
2408            dyn_fields
2409                .first()
2410                .and_then(|v| v.as_ref())
2411                .cloned()
2412                .unwrap_or_else(|| "default".to_owned()),
2413        );
2414        let flow_id_str = dyn_fields
2415            .get(1)
2416            .and_then(|v| v.as_ref())
2417            .cloned()
2418            .unwrap_or_default();
2419        let terminal_outcome = dyn_fields
2420            .get(2)
2421            .and_then(|v| v.as_ref())
2422            .cloned()
2423            .unwrap_or_default();
2424
2425        let is_skipped_flow_member = terminal_outcome == "skipped" && !flow_id_str.is_empty();
2426
2427        // Base KEYS (4): exec_core, terminal_zset, eligible_zset, lease_history
2428        let mut fcall_keys: Vec<String> = vec![
2429            ctx.core(),
2430            idx.lane_terminal(&lane),
2431            idx.lane_eligible(&lane),
2432            ctx.lease_history(),
2433        ];
2434
2435        // Base ARGV (2): execution_id, now_ms
2436        let now = TimestampMs::now();
2437        let mut fcall_args: Vec<String> = vec![execution_id.to_string(), now.to_string()];
2438
2439        if is_skipped_flow_member {
2440            // Read ALL inbound edge IDs from the flow partition's adjacency set.
2441            // Cannot use deps:unresolved because impossible edges were SREM'd
2442            // by ff_resolve_dependency. The flow's in:<eid> set has all edges.
2443            let flow_id = FlowId::parse(&flow_id_str)
2444                .map_err(|e| ServerError::Script(format!("bad flow_id: {e}")))?;
2445            let flow_part =
2446                flow_partition(&flow_id, &self.config.partition_config);
2447            let flow_ctx = FlowKeyContext::new(&flow_part, &flow_id);
2448            let edge_ids: Vec<String> = self
2449                .client
2450                .cmd("SMEMBERS")
2451                .arg(flow_ctx.incoming(execution_id))
2452                .execute()
2453                .await
2454                .map_err(|e| crate::server::backend_context(e, "SMEMBERS replay edges"))?;
2455
2456            // Extended KEYS: blocked_deps_zset, deps_meta, deps_unresolved, dep_edge_0..N
2457            fcall_keys.push(idx.lane_blocked_dependencies(&lane)); // 5
2458            fcall_keys.push(ctx.deps_meta()); // 6
2459            fcall_keys.push(ctx.deps_unresolved()); // 7
2460            for eid_str in &edge_ids {
2461                let edge_id = EdgeId::parse(eid_str)
2462                    .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
2463                fcall_keys.push(ctx.dep_edge(&edge_id)); // 8..8+N
2464                fcall_args.push(eid_str.clone()); // 3..3+N
2465            }
2466        }
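
        // Assembled shape for a skipped flow member with two inbound edges
        // (hypothetical ids E1, E2):
        //   KEYS = [exec_core, lane_terminal, lane_eligible, lease_history,
        //           lane_blocked_deps, deps_meta, deps_unresolved,
        //           dep_edge(E1), dep_edge(E2)]        (7 + N)
        //   ARGV = [execution_id, now_ms, E1, E2]      (2 + N)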
2467
2468        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2469        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2470
2471        let raw: Value = self
2472            .fcall_with_reload("ff_replay_execution", &key_refs, &arg_refs)
2473            .await?;
2474
2475        parse_replay_result(&raw)
2476    }
2477
2478    /// Read frames from an attempt's stream (XRANGE wrapper) plus terminal
2479    /// markers (`closed_at`, `closed_reason`) so consumers can stop polling
2480    /// when the producer finalizes.
2481    ///
2482    /// `from_id` and `to_id` accept XRANGE special markers: `"-"` for
2483    /// earliest, `"+"` for latest. `count_limit` MUST be `>= 1` —
2484    /// `0` returns a `ServerError::InvalidInput` (matches the REST boundary
2485    /// and the Lua-side reject).
2486    ///
2487    /// Cluster-safe: the attempt's `{p:N}` partition is derived from the
2488    /// execution id, so all KEYS share the same slot.
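    ///
    /// Minimal one-shot fetch (editor's sketch; the `server` and `exec_id`
    /// handles are hypothetical):
    ///
    /// ```ignore
    /// // "-" / "+" are XRANGE's earliest/latest markers.
    /// let all = server
    ///     .read_attempt_stream(&exec_id, attempt, "-", "+", 10_000)
    ///     .await?;
    /// ```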
2489    pub async fn read_attempt_stream(
2490        &self,
2491        execution_id: &ExecutionId,
2492        attempt_index: AttemptIndex,
2493        from_id: &str,
2494        to_id: &str,
2495        count_limit: u64,
2496    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2497        use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
2498
2499        if count_limit == 0 {
2500            return Err(ServerError::InvalidInput(
2501                "count_limit must be >= 1".to_owned(),
2502            ));
2503        }
2504
2505        // Share the same semaphore as tail. A large XRANGE reply (10_000
2506        // frames × ~64KB) is just as capable of head-of-line-blocking the
2507        // tail_client mux as a long BLOCK — fairness accounting must be
2508        // unified. Non-blocking acquire → 429 on contention.
2509        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2510            Ok(p) => p,
2511            Err(tokio::sync::TryAcquireError::NoPermits) => {
2512                return Err(ServerError::ConcurrencyLimitExceeded(
2513                    "stream_ops",
2514                    self.config.max_concurrent_stream_ops,
2515                ));
2516            }
2517            Err(tokio::sync::TryAcquireError::Closed) => {
2518                return Err(ServerError::OperationFailed(
2519                    "stream semaphore closed (server shutting down)".into(),
2520                ));
2521            }
2522        };
2523
2524        let args = ReadFramesArgs {
2525            execution_id: execution_id.clone(),
2526            attempt_index,
2527            from_id: from_id.to_owned(),
2528            to_id: to_id.to_owned(),
2529            count_limit,
2530        };
2531
2532        let partition = execution_partition(execution_id, &self.config.partition_config);
2533        let ctx = ExecKeyContext::new(&partition, execution_id);
2534        let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };
2535
2536        // Route on the dedicated stream client, same as tail. A 10_000-
2537        // frame XRANGE reply on the main mux would stall every other
2538        // FCALL behind reply serialization.
2539        let result = ff_script::functions::stream::ff_read_attempt_stream(
2540            &self.tail_client, &keys, &args,
2541        )
2542        .await
2543        .map_err(script_error_to_server);
2544
2545        drop(permit);
2546
2547        match result? {
2548            ReadFramesResult::Frames(f) => Ok(f),
2549        }
2550    }
2551
2552    /// Tail a live attempt's stream (XREAD BLOCK wrapper). Returns frames
2553    /// plus the terminal signal so a polling consumer can exit when the
2554    /// producer closes the stream.
2555    ///
2556    /// `last_id` is exclusive — XREAD returns entries with id > last_id.
2557    /// Pass `"0-0"` to read from the beginning.
2558    ///
2559    /// `block_ms == 0` → non-blocking peek (returns immediately).
2560    /// `block_ms > 0`  → blocks up to that many ms. Empty `frames` +
2561    /// `closed_at=None` → timeout, no new data, still open.
2562    ///
2563    /// `count_limit` MUST be `>= 1`; `0` returns `InvalidInput`.
2564    ///
2565    /// Implemented as a direct XREAD command (not FCALL) because blocking
2566    /// commands are rejected inside Valkey Functions. The terminal
2567    /// markers come from a companion HMGET on `stream_meta` — see
2568    /// `ff_script::stream_tail` module docs.
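    ///
    /// Minimal polling sketch (editor's illustration; assumes `StreamFrames`
    /// exposes the `frames` list with per-frame stream ids and the `closed_at`
    /// marker described above — check `ff_core::contracts` for exact names):
    ///
    /// ```ignore
    /// let mut last_id = "0-0".to_owned();
    /// loop {
    ///     let page = server
    ///         .tail_attempt_stream(&exec_id, attempt, &last_id, 25_000, 500)
    ///         .await?;
    ///     for frame in &page.frames {
    ///         last_id = frame.id.clone(); // advance the exclusive cursor
    ///     }
    ///     if page.closed_at.is_some() {
    ///         break; // producer finalized, stop polling
    ///     }
    /// }
    /// ```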
2569    pub async fn tail_attempt_stream(
2570        &self,
2571        execution_id: &ExecutionId,
2572        attempt_index: AttemptIndex,
2573        last_id: &str,
2574        block_ms: u64,
2575        count_limit: u64,
2576    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2577        if count_limit == 0 {
2578            return Err(ServerError::InvalidInput(
2579                "count_limit must be >= 1".to_owned(),
2580            ));
2581        }
2582
2583        // Non-blocking permit acquisition on the shared stream_semaphore
2584        // (read + tail split the same pool). If the server is already at
2585        // the `max_concurrent_stream_ops` ceiling, return `TailUnavailable`
2586        // (→ 429) rather than queueing — a queued tail holds the caller's
2587        // HTTP request open with no upper bound, which is exactly the
2588        // resource-exhaustion pattern this limit exists to prevent.
2589        // Clients retry with backoff on 429.
2590        //
2591        // Worst-case permit hold: if the producer closes the stream via
2592        // HSET on stream_meta (not an XADD), the XREAD BLOCK won't wake
2593        // until `block_ms` elapses — so a permit can be held for up to
2594        // the caller's block_ms even though the terminal signal is
2595        // ready. This is a v1-accepted limitation; RFC-006 §Terminal-
2596        // signal timing under active tail documents it and sketches the
2597        // v2 sentinel-XADD upgrade path.
2598        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2599            Ok(p) => p,
2600            Err(tokio::sync::TryAcquireError::NoPermits) => {
2601                return Err(ServerError::ConcurrencyLimitExceeded(
2602                    "stream_ops",
2603                    self.config.max_concurrent_stream_ops,
2604                ));
2605            }
2606            Err(tokio::sync::TryAcquireError::Closed) => {
2607                return Err(ServerError::OperationFailed(
2608                    "stream semaphore closed (server shutting down)".into(),
2609                ));
2610            }
2611        };
2612
2613        let partition = execution_partition(execution_id, &self.config.partition_config);
2614        let ctx = ExecKeyContext::new(&partition, execution_id);
2615        let stream_key = ctx.stream(attempt_index);
2616        let stream_meta_key = ctx.stream_meta(attempt_index);
2617
2618        // Acquire the XREAD BLOCK serializer AFTER the stream semaphore.
2619        // Nesting order matters: the semaphore is the user-visible
2620        // ceiling (surfaces as 429), the Mutex is an internal fairness
2621        // gate. Holding the permit while waiting on the Mutex means the
2622        // ceiling still bounds queue depth. See the field docstring for
2623        // the full rationale (ferriskey pipeline FIFO + client-side
2624        // per-call timeout race).
2625        let _xread_guard = self.xread_block_lock.lock().await;
2626
2627        let result = ff_script::stream_tail::xread_block(
2628            &self.tail_client,
2629            &stream_key,
2630            &stream_meta_key,
2631            last_id,
2632            block_ms,
2633            count_limit,
2634        )
2635        .await
2636        .map_err(script_error_to_server);
2637
2638        drop(_xread_guard);
2639        drop(permit);
2640        result
2641    }
2642
2643    /// Graceful shutdown — stops scanners, drains background handler tasks
2644    /// (e.g. cancel_flow member dispatch) with a bounded timeout, then waits
2645    /// for scanners to finish.
2646    ///
2647    /// Shutdown order is chosen so in-flight stream ops (read/tail) drain
2648    /// cleanly without new arrivals piling up:
2649    ///
2650    /// 1. `stream_semaphore.close()` — new read/tail attempts fail fast
2651    ///    with `ServerError::OperationFailed("stream semaphore closed …")`
2652    ///    which the REST layer surfaces as a 500 with `retryable=false`
2653    ///    (ops tooling may choose to wait + retry on 503-class responses;
2654    ///    the body clearly names the shutdown reason).
2655    /// 2. Drain handler-spawned background tasks with a 15s ceiling.
2656    /// 3. `engine.shutdown()` stops scanners.
2657    ///
2658    /// Existing in-flight tails finish on their natural `block_ms`
2659    /// boundary (up to ~30s); the `tail_client` is dropped when `Server`
2660    /// is dropped after this function returns. We do NOT wait for tails
2661    /// to drain explicitly — the semaphore-close + natural-timeout
2662    /// combination bounds shutdown to roughly `block_ms + 15s` in the
2663    /// worst case. Callers observing a dropped connection retry against
2664    /// whatever replacement is coming up.
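    ///
    /// Worked bound: with a 30s client `block_ms`, shutdown completes in
    /// roughly ≤ 45s (30s natural tail timeout + 15s background-drain ceiling).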
2665    pub async fn shutdown(self) {
2666        tracing::info!("shutting down FlowFabric server");
2667
2668        // Step 1: Close the stream semaphore FIRST so any in-flight
2669        // read/tail calls that are between `try_acquire` and their
2670        // Valkey command still hold a valid permit, but no NEW stream
2671        // op can start. `Semaphore::close()` is idempotent.
2672        self.stream_semaphore.close();
2673        tracing::info!(
2674            "stream semaphore closed; no new read/tail attempts will be accepted"
2675        );
2676
2677        // Step 2: Drain handler-spawned background tasks with the same
2678        // ceiling as Engine::shutdown. If dispatch is still running at
2679        // the deadline, drop the JoinSet to abort remaining tasks.
2680        let drain_timeout = Duration::from_secs(15);
2681        let background = self.background_tasks.clone();
2682        let drain = async move {
2683            let mut guard = background.lock().await;
2684            while guard.join_next().await.is_some() {}
2685        };
2686        match tokio::time::timeout(drain_timeout, drain).await {
2687            Ok(()) => {}
2688            Err(_) => {
2689                tracing::warn!(
2690                    timeout_s = drain_timeout.as_secs(),
2691                    "shutdown: background tasks did not finish in time, aborting"
2692                );
2693                self.background_tasks.lock().await.abort_all();
2694            }
2695        }
2696
2697        self.engine.shutdown().await;
2698        tracing::info!("FlowFabric server shutdown complete");
2699    }
2700}
2701
2702// ── Valkey version check (RFC-011 §13) ──
2703
2704/// Minimum Valkey version the engine requires (see RFC-011 §13). 7.2 is the
2705/// release where Valkey Functions and RESP3 stabilized — the primitives the
2706/// co-location design and typed FCALL wrappers actually depend on.
2707const REQUIRED_VALKEY_MAJOR: u32 = 7;
2708const REQUIRED_VALKEY_MINOR: u32 = 2;
2709
2710/// Upper bound on the rolling-upgrade retry window (RFC-011 §9.17). A Valkey
2711/// node cycling through SIGTERM → restart typically completes in well under
2712/// 60s; this budget is generous without letting a truly-stuck cluster hang
2713/// boot indefinitely.
2714const VERSION_CHECK_RETRY_BUDGET: Duration = Duration::from_secs(60);
2715
2716/// Verify the connected Valkey reports a version ≥ 7.2.
2717///
2718/// Per RFC-011 §9.17, during a rolling upgrade the node we happen to connect
2719/// to may temporarily be pre-upgrade while others are post-upgrade. The check
2720/// tolerates this by retrying the whole verification (including low-version
2721/// responses) with exponential backoff, capped at a 60s budget.
2722///
2723/// **Retries on:**
2724/// - Low-version responses (below `(REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR)`)
2725///   — may resolve as the rolling upgrade progresses onto the connected node.
2726/// - Retryable ferriskey transport errors — connection refused,
2727///   `BusyLoadingError`, `ClusterDown`, etc., classified via
2728///   `ff_script::retry::is_retryable_kind`.
2729/// - Missing/unparsable version field — treated as transient (fresh-boot
2730///   server may not have the INFO fields populated yet). Reads
2731///   `valkey_version` when present (authoritative on Valkey 8.0+), falls
2732///   back to `redis_version` for Valkey 7.x.
2733///
2734/// **Does NOT retry on:**
2735/// - Non-retryable transport errors (auth failures, permission denied,
2736///   invalid client config) — these are operator misconfiguration, not
2737///   transient cluster state; fast-fail preserves a clear signal.
2738///
2739/// On budget exhaustion, returns the last observed error.
2740async fn verify_valkey_version(client: &Client) -> Result<(), ServerError> {
2741    let deadline = tokio::time::Instant::now() + VERSION_CHECK_RETRY_BUDGET;
2742    let mut backoff = Duration::from_millis(200);
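    // Effective schedule: 200ms, 400ms, 800ms, 1.6s, 3.2s, then 5s at the cap;
    // roughly 16 attempts before the 60s budget runs out.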
2743    loop {
2744        let (should_retry, err_for_budget_exhaust, log_detail): (bool, ServerError, String) =
2745            match query_valkey_version(client).await {
2746                Ok((detected_major, detected_minor))
2747                    if (detected_major, detected_minor)
2748                        >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR) =>
2749                {
2750                    tracing::info!(
2751                        detected_major,
2752                        detected_minor,
2753                        required_major = REQUIRED_VALKEY_MAJOR,
2754                        required_minor = REQUIRED_VALKEY_MINOR,
2755                        "Valkey version accepted"
2756                    );
2757                    return Ok(());
2758                }
2759                Ok((detected_major, detected_minor)) => (
2760                    // Low version — may be a rolling-upgrade stale node.
2761                    // Retry within budget; after exhaustion, the cluster
2762                    // is misconfigured and fast-fail is the correct signal.
2763                    true,
2764                    ServerError::ValkeyVersionTooLow {
2765                        detected: format!("{detected_major}.{detected_minor}"),
2766                        required: format!("{REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"),
2767                    },
2768                    format!(
2769                        "detected={detected_major}.{detected_minor} < required={REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"
2770                    ),
2771                ),
2772                Err(e) => {
2773                    // Only retry if the underlying Valkey error is retryable
2774                    // by kind. Auth / permission / invalid-config should fast-
2775                    // fail so operators see the true root cause immediately,
2776                    // not a 60s hang followed by a generic "transient" error.
2777                    let retryable = e
2778                        .backend_kind()
2779                        .map(|k| k.is_retryable())
2780                        // Non-backend errors (parse, missing field, operation
2781                        // failures) are treated as transient — a fresh-boot
2782                        // Valkey may not have redis_version populated yet.
2783                        .unwrap_or(true);
2784                    let detail = e.to_string();
2785                    (retryable, e, detail)
2786                }
2787            };
2788
2789        if !should_retry {
2790            return Err(err_for_budget_exhaust);
2791        }
2792        if tokio::time::Instant::now() >= deadline {
2793            return Err(err_for_budget_exhaust);
2794        }
2795        tracing::warn!(
2796            backoff_ms = backoff.as_millis() as u64,
2797            detail = %log_detail,
2798            "valkey version check transient failure; retrying"
2799        );
2800        tokio::time::sleep(backoff).await;
2801        backoff = (backoff * 2).min(Duration::from_secs(5));
2802    }
2803}
2804
2805/// Run `INFO server` and extract the `(major, minor)` components of the
2806/// Valkey version.
2807///
2808/// Returns `Err` on transport errors, missing field, or unparsable version.
2809/// Handles three response shapes:
2810///
2811/// - **Standalone:** single string body; parse directly.
2812/// - **Cluster (RESP3 map):** `INFO` returns a map keyed by node address;
2813///   every node runs the same version in a healthy deployment, so we pick
2814///   one entry and parse it. Divergent versions during a rolling upgrade
2815///   are handled by the outer retry loop.
2816/// - **Empty / unexpected:** surfaces as `OperationFailed` with context.
2817async fn query_valkey_version(client: &Client) -> Result<(u32, u32), ServerError> {
2818    let raw: Value = client
2819        .cmd("INFO")
2820        .arg("server")
2821        .execute()
2822        .await
2823        .map_err(|e| crate::server::backend_context(e, "INFO server"))?;
2824    let bodies = extract_info_bodies(&raw)?;
2825    // Cluster: return the minimum (major, minor) across all nodes so a stale
2826    // pre-upgrade replica cannot hide behind an already-upgraded primary.
2827    // Standalone: exactly one body. The outer retry loop tolerates rolling
2828    // upgrades — a briefly-low minimum gets retried; a persistently-low one
2829    // exits with the structured floor error.
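    // e.g. nodes mid-upgrade at 8.0 and 7.2 → min (7, 2): still at the floor.
    // Nodes at 8.0 and 6.2 → min (6, 2): rejected, then retried by the caller.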
2830    let mut min_version: Option<(u32, u32)> = None;
2831    for body in &bodies {
2832        let version = parse_valkey_version(body)?;
2833        min_version = Some(match min_version {
2834            None => version,
2835            Some(existing) => existing.min(version),
2836        });
2837    }
2838    min_version.ok_or_else(|| {
2839        ServerError::OperationFailed(
2840            "valkey version check: cluster INFO returned no node bodies".into(),
2841        )
2842    })
2843}
2844
2845/// Normalize an `INFO server` response to one string body per node.
2846///
2847/// Standalone returns a single body. Cluster (RESP3 map keyed by node address)
2848/// returns every node's body — the caller must consider all of them to reject
2849/// a mixed-version cluster where one stale node is below the floor.
2850fn extract_info_bodies(raw: &Value) -> Result<Vec<String>, ServerError> {
2851    match raw {
2852        Value::BulkString(bytes) => Ok(vec![String::from_utf8_lossy(bytes).into_owned()]),
2853        Value::VerbatimString { text, .. } => Ok(vec![text.clone()]),
2854        Value::SimpleString(s) => Ok(vec![s.clone()]),
2855        Value::Map(entries) => {
2856            if entries.is_empty() {
2857                return Err(ServerError::OperationFailed(
2858                    "valkey version check: cluster INFO returned empty map".into(),
2859                ));
2860            }
2861            let mut out = Vec::with_capacity(entries.len());
2862            for (_, body) in entries {
2863                out.extend(extract_info_bodies(body)?);
2864            }
2865            Ok(out)
2866        }
2867        other => Err(ServerError::OperationFailed(format!(
2868            "valkey version check: unexpected INFO shape: {other:?}"
2869        ))),
2870    }
2871}
2872
2873/// Extract the `(major, minor)` components of the Valkey version from an
2874/// `INFO server` response body. Pure parser — pulled out of
2875/// [`query_valkey_version`] so it is unit-testable without a live Valkey.
2876///
2877/// **Prefers `valkey_version:`** (introduced in Valkey 8.0+; this is the real
2878/// server version on 8.x/9.x, which pin `redis_version:7.2.4` for
2879/// Redis-client compatibility).
2880///
2881/// **Falls back to `redis_version:` only when the body carries an affirmative
2882/// `server_name:valkey` field.** `server_name:` was introduced in Valkey 7.2,
2883/// which is our floor — so every floor-compliant Valkey deployment carries
2884/// the marker. Redis does not emit `server_name:valkey`, which is how we
2885/// reject a Redis backend that would otherwise look identical to Valkey 7.x
2886/// at the `redis_version:` level.
2887fn parse_valkey_version(info: &str) -> Result<(u32, u32), ServerError> {
2888    let extract_major_minor = |line: &str| -> Result<(u32, u32), ServerError> {
2889        let trimmed = line.trim();
2890        let mut parts = trimmed.split('.');
2891        let major_str = parts.next().unwrap_or("").trim();
2892        if major_str.is_empty() {
2893            return Err(ServerError::OperationFailed(format!(
2894                "valkey version check: empty version field in '{trimmed}'"
2895            )));
2896        }
2897        let major = major_str.parse::<u32>().map_err(|_| {
2898            ServerError::OperationFailed(format!(
2899                "valkey version check: non-numeric major in '{trimmed}'"
2900            ))
2901        })?;
2902        // Minor is required — a bare major ("7") cannot be compared against
2903        // the (major, minor) floor reliably. Valkey always reports
2904        // major.minor.patch for INFO, so missing minor is a real parse error.
2905        let minor_str = parts.next().unwrap_or("").trim();
2906        if minor_str.is_empty() {
2907            return Err(ServerError::OperationFailed(format!(
2908                "valkey version check: missing minor component in '{trimmed}'"
2909            )));
2910        }
2911        let minor = minor_str.parse::<u32>().map_err(|_| {
2912            ServerError::OperationFailed(format!(
2913                "valkey version check: non-numeric minor in '{trimmed}'"
2914            ))
2915        })?;
2916        Ok((major, minor))
2917    };
2918    // Prefer valkey_version (authoritative on Valkey 8.0+).
2919    if let Some(valkey_line) = info
2920        .lines()
2921        .find_map(|line| line.strip_prefix("valkey_version:"))
2922    {
2923        return extract_major_minor(valkey_line);
2924    }
2925    // No valkey_version — could be Valkey 7.2 (which doesn't emit the field)
2926    // or could be Redis. Require an affirmative server_name:valkey marker
2927    // before falling back to redis_version. This rejects Redis backends,
2928    // which don't emit server_name:valkey.
2929    let server_is_valkey = info
2930        .lines()
2931        .map(str::trim)
2932        .any(|line| line.eq_ignore_ascii_case("server_name:valkey"));
2933    if !server_is_valkey {
2934        return Err(ServerError::OperationFailed(
2935            "valkey version check: INFO missing valkey_version and server_name:valkey marker \
2936             (unsupported backend — FlowFabric requires Valkey >= 7.2; Redis is not supported)"
2937                .into(),
2938        ));
2939    }
2940    // Valkey 7.x fallback. 8.x+ pins redis_version:7.2.4 for Redis-client
2941    // compat, so reading it there would under-report — but 8.x+ is handled
2942    // by the valkey_version branch above, so we only reach here on 7.x.
2943    if let Some(redis_line) = info
2944        .lines()
2945        .find_map(|line| line.strip_prefix("redis_version:"))
2946    {
2947        return extract_major_minor(redis_line);
2948    }
2949    Err(ServerError::OperationFailed(
2950        "valkey version check: INFO has server_name:valkey but no redis_version or valkey_version field"
2951            .into(),
2952    ))
2953}
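
// Editor's sketch: `parse_valkey_version` is a pure parser, so the three
// interesting INFO shapes can be pinned without a live Valkey. The fixture
// bodies below are hand-written, not captured from a real server.
#[cfg(test)]
mod parse_valkey_version_tests {
    use super::*;

    #[test]
    fn prefers_valkey_version_over_pinned_redis_version() {
        // Valkey 8.x pins redis_version:7.2.4; valkey_version must win.
        let info = "server_name:valkey\r\nredis_version:7.2.4\r\nvalkey_version:8.0.1\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (8, 0));
    }

    #[test]
    fn falls_back_to_redis_version_on_valkey_7x() {
        let info = "server_name:valkey\r\nredis_version:7.2.6\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
    }

    #[test]
    fn rejects_redis_without_the_valkey_marker() {
        // Plain Redis never emits server_name:valkey.
        let info = "redis_version:7.2.4\r\n";
        assert!(parse_valkey_version(info).is_err());
    }
}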
2954
2955// ── Partition config validation ──
2956
2957/// Validate or create the `ff:config:partitions` key on first boot.
2958///
2959/// If the key exists, its values must match the server's config.
2960/// If it doesn't exist, create it (first boot).
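///
/// Stored shape (HGETALL of the key; counts illustrative):
///   num_flow_partitions   → "64"
///   num_budget_partitions → "16"
///   num_quota_partitions  → "16"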
2961async fn validate_or_create_partition_config(
2962    client: &Client,
2963    config: &PartitionConfig,
2964) -> Result<(), ServerError> {
2965    let key = keys::global_config_partitions();
2966
2967    let existing: HashMap<String, String> = client
2968        .hgetall(&key)
2969        .await
2970        .map_err(|e| crate::server::backend_context(e, format!("HGETALL {key}")))?;
2971
    if existing.is_empty() {
        // First boot — create the config with a single multi-field HSET.
        // Multi-field HSET is single-command atomic (same pattern as the
        // waitpoint HMAC install below), so a crash mid-boot cannot strand
        // a partial config that the next boot's mismatch check rejects.
        tracing::info!("first boot: creating {key}");
        let _: i64 = client
            .cmd("HSET")
            .arg(&key)
            .arg("num_flow_partitions")
            .arg(&config.num_flow_partitions.to_string())
            .arg("num_budget_partitions")
            .arg(&config.num_budget_partitions.to_string())
            .arg("num_quota_partitions")
            .arg(&config.num_quota_partitions.to_string())
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, format!("HSET {key} (init partition config)")))?;
        return Ok(());
    }
2989
    // Validate the existing config matches. A field that is missing or fails
    // to parse is reported as "<missing>" rather than masquerading as stored=0.
    let check = |field: &str, expected: u16| -> Result<(), ServerError> {
        let stored: Option<u16> = existing.get(field).and_then(|v| v.parse().ok());
        if stored != Some(expected) {
            let stored_desc = stored
                .map_or_else(|| "<missing>".to_owned(), |v| v.to_string());
            return Err(ServerError::PartitionMismatch(format!(
                "{field}: stored={stored_desc}, config={expected}. \
                 Partition counts are fixed at deployment time. \
                 Either fix your config or migrate the data."
            )));
        }
        Ok(())
    };
3005
3006    check("num_flow_partitions", config.num_flow_partitions)?;
3007    check("num_budget_partitions", config.num_budget_partitions)?;
3008    check("num_quota_partitions", config.num_quota_partitions)?;
3009
3010    tracing::info!("partition config validated against stored {key}");
3011    Ok(())
3012}
3013
3014// ── Waitpoint HMAC secret bootstrap (RFC-004 §Waitpoint Security) ──
3015
3016/// Stable initial kid written on first boot. Rotation promotes to k2, k3, ...
3017/// The kid is stored alongside the secret in every partition's hash so each
3018/// FCALL can self-identify which secret produced a given token.
3019const WAITPOINT_HMAC_INITIAL_KID: &str = "k1";
3020
3021/// Per-partition outcome of the HMAC bootstrap step. Collected across the
3022/// parallel scan so we can emit aggregated logs once at the end.
3023enum PartitionBootOutcome {
3024    /// Partition already had a matching (kid, secret) pair.
3025    Match,
3026    /// Stored secret diverges from env — likely operator rotation; kept.
3027    Mismatch,
3028    /// Torn write (current_kid present, secret:<kid> missing); repaired.
3029    Repaired,
3030    /// Fresh partition; atomically installed env secret under kid=k1.
3031    Installed,
3032}
3033
3034/// Bounded in-flight concurrency for the startup fan-out. Large enough to
3035/// turn a 256-partition install from ~15s sequential into ~1s on cross-AZ
3036/// Valkey, small enough to leave a cold cluster breathing room for other
3037/// Server::start work (library load, engine scanner spawn).
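///
/// Worked arithmetic (assuming ~60ms per-partition round trip): 256 × 60ms
/// ≈ 15.4s sequential vs ⌈256 / 16⌉ × 60ms ≈ 1s with 16 in flight.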
3038const BOOT_INIT_CONCURRENCY: usize = 16;
3039
3040async fn init_one_partition(
3041    client: &Client,
3042    partition: Partition,
3043    secret_hex: &str,
3044) -> Result<PartitionBootOutcome, ServerError> {
3045    let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();
3046
3047    // Probe for an existing install. Fast path (fresh partition): HGET
3048    // returns nil and we fall through to the atomic install below. Slow
3049    // path: secret:<stored_kid> is then HGET'd once we know the kid name.
3050    // (A previous version used a 2-field HMGET that included a fake
3051    // `secret:probe` placeholder — vestigial from an abandoned
3052    // optimization attempt, and confusing to read. Collapsed back to a
3053    // single-field HGET.)
3054    let stored_kid: Option<String> = client
3055        .cmd("HGET")
3056        .arg(&key)
3057        .arg("current_kid")
3058        .execute()
3059        .await
3060        .map_err(|e| crate::server::backend_context(e, format!("HGET {key} current_kid (init probe)")))?;
3061
3062    if let Some(stored_kid) = stored_kid {
3063        // We didn't know the stored kid up front, so now HGET the real
3064        // secret:<stored_kid> field. Two round-trips in the slow path; the
3065        // fast path (fresh partition) stays at one.
3066        let field = format!("secret:{stored_kid}");
3067        let stored_secret: Option<String> = client
3068            .hget(&key, &field)
3069            .await
3070            .map_err(|e| crate::server::backend_context(e, format!("HGET {key} secret:<kid> (init check)")))?;
3071        if stored_secret.is_none() {
3072            // Torn write from a previous boot: current_kid present but
3073            // secret:<kid> missing. Without repair, mint returns
3074            // "hmac_secret_not_initialized" on that partition forever.
3075            // Repair in place with env secret. Not rotation — rotation
3076            // always writes the secret first.
3077            client
3078                .hset(&key, &field, secret_hex)
3079                .await
3080                .map_err(|e| crate::server::backend_context(e, format!("HSET {key} secret:<kid> (repair torn write)")))?;
3081            return Ok(PartitionBootOutcome::Repaired);
3082        }
3083        if stored_secret.as_deref() != Some(secret_hex) {
3084            return Ok(PartitionBootOutcome::Mismatch);
3085        }
3086        return Ok(PartitionBootOutcome::Match);
3087    }
3088
3089    // Fresh partition — install current_kid + secret:<kid> atomically in
3090    // one HSET. Multi-field HSET is single-command atomic, so a crash
3091    // can't leave current_kid without its secret.
3092    let secret_field = format!("secret:{WAITPOINT_HMAC_INITIAL_KID}");
3093    let _: i64 = client
3094        .cmd("HSET")
3095        .arg(&key)
3096        .arg("current_kid")
3097        .arg(WAITPOINT_HMAC_INITIAL_KID)
3098        .arg(&secret_field)
3099        .arg(secret_hex)
3100        .execute()
3101        .await
3102        .map_err(|e| crate::server::backend_context(e, format!("HSET {key} (init waitpoint HMAC atomic)")))?;
3103    Ok(PartitionBootOutcome::Installed)
3104}
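
// Illustrative post-install hash contents (field → value; the key comes from
// IndexKeys::waitpoint_hmac_secrets, the secret value is hypothetical):
//   current_kid → "k1"
//   secret:k1   → "9f86d081884c7d65…"
// Rotation installs secret:k2 and repoints current_kid; the previous-kid
// grace bookkeeping lives in the ff_rotate_waitpoint_hmac_secret Lua function.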
3105
3106/// Install the waitpoint HMAC secret on every execution partition.
3107///
3108/// Parallelized fan-out with bounded in-flight concurrency
3109/// (`BOOT_INIT_CONCURRENCY`) so 256-partition boots finish in ~1s instead
3110/// of ~15s sequential — the prior sequential loop was tight on K8s
3111/// `initialDelaySeconds=30` defaults, especially cross-AZ. Fail-fast:
3112/// the first per-partition error aborts boot.
3113///
3114/// Outcomes aggregate into mismatch/repaired counts (logged once at end)
3115/// so operators see a single loud warning per fault class instead of 256
3116/// per-partition lines.
3117async fn initialize_waitpoint_hmac_secret(
3118    client: &Client,
3119    partition_config: &PartitionConfig,
3120    secret_hex: &str,
3121) -> Result<(), ServerError> {
3122    use futures::stream::{FuturesUnordered, StreamExt};
3123
3124    let n = partition_config.num_flow_partitions;
3125    tracing::info!(
3126        partitions = n,
3127        concurrency = BOOT_INIT_CONCURRENCY,
3128        "installing waitpoint HMAC secret across {n} execution partitions"
3129    );
3130
3131    let mut mismatch_count: u16 = 0;
3132    let mut repaired_count: u16 = 0;
3133    let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
3134    let mut next_index: u16 = 0;
3135
3136    loop {
3137        while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
3138            let partition = Partition {
3139                family: PartitionFamily::Execution,
3140                index: next_index,
3141            };
3142            let client = client.clone();
3143            let secret_hex = secret_hex.to_owned();
3144            pending.push(async move {
3145                init_one_partition(&client, partition, &secret_hex).await
3146            });
3147            next_index += 1;
3148        }
3149        match pending.next().await {
3150            Some(res) => match res? {
3151                PartitionBootOutcome::Match | PartitionBootOutcome::Installed => {}
3152                PartitionBootOutcome::Mismatch => mismatch_count += 1,
3153                PartitionBootOutcome::Repaired => repaired_count += 1,
3154            },
3155            None => break,
3156        }
3157    }
3158
3159    if repaired_count > 0 {
3160        tracing::warn!(
3161            repaired_partitions = repaired_count,
3162            total_partitions = n,
3163            "repaired {repaired_count} partitions with torn waitpoint HMAC writes \
3164             (current_kid present but secret:<kid> missing, likely crash during prior boot)"
3165        );
3166    }
3167
3168    if mismatch_count > 0 {
3169        tracing::warn!(
3170            mismatched_partitions = mismatch_count,
3171            total_partitions = n,
3172            "stored/env secret mismatch on {mismatch_count} partitions — \
3173             env FF_WAITPOINT_HMAC_SECRET ignored in favor of stored values; \
3174             run POST /v1/admin/rotate-waitpoint-secret to sync"
3175        );
3176    }
3177
3178    tracing::info!(partitions = n, "waitpoint HMAC secret install complete");
3179    Ok(())
3180}
3181
3182/// Result of a waitpoint HMAC secret rotation across all execution partitions.
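///
/// Serialized shape (illustrative values):
/// `{"rotated": 254, "failed": [7, 63], "new_kid": "k2"}`; `failed` holds
/// partition indices, so `rotated + failed.len()` equals the partition count.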
3183#[derive(Debug, Clone, serde::Serialize)]
3184pub struct RotateWaitpointSecretResult {
3185    /// Count of partitions that accepted the rotation.
3186    pub rotated: u16,
3187    /// Partition indices that failed — operator should investigate (Valkey
3188    /// outage, auth failure, cluster split). Rotation is idempotent, so a
3189    /// re-run after the underlying fault clears converges to the correct
3190    /// state.
3191    pub failed: Vec<u16>,
3192    /// New kid installed as current.
3193    pub new_kid: String,
3194}
3195
3196impl Server {
3197    /// Rotate the waitpoint HMAC secret. Promotes the current kid to previous
3198    /// (accepted within `FF_WAITPOINT_HMAC_GRACE_MS`), installs `new_secret_hex`
3199    /// as the new current kid. Idempotent: re-running with the same `new_kid`
3200    /// and `new_secret_hex` converges partitions to the same state.
3201    ///
3202    /// Returns a structured result so operators can see which partitions failed.
3203    /// HTTP layer returns 200 if any partition succeeded, 500 only if all fail.
3204    pub async fn rotate_waitpoint_secret(
3205        &self,
3206        new_kid: &str,
3207        new_secret_hex: &str,
3208    ) -> Result<RotateWaitpointSecretResult, ServerError> {
3209        if new_kid.is_empty() || new_kid.contains(':') {
3210            return Err(ServerError::OperationFailed(
3211                "new_kid must be non-empty and must not contain ':'".into(),
3212            ));
3213        }
3214        if new_secret_hex.is_empty()
3215            || !new_secret_hex.len().is_multiple_of(2)
3216            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
3217        {
3218            return Err(ServerError::OperationFailed(
3219                "new_secret_hex must be a non-empty even-length hex string".into(),
3220            ));
3221        }
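        // (For reference: a 32-byte secret is 64 hex chars,
        // e.g. `openssl rand -hex 32`.)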
3222
3223        // Single-writer gate. Concurrent rotates against the SAME operator
3224        // token are an attack pattern (or a retry-loop bug); legitimate
3225        // operators rotate monthly and can afford to serialize. Contention
3226        // returns ConcurrencyLimitExceeded("admin_rotate", 1) (→ HTTP 429
3227        // with a labelled error body) rather than queueing the HTTP
3228        // handler past the 120s endpoint timeout. Permit is held for
3229        // the full partition fan-out.
3230        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
3231            Ok(p) => p,
3232            Err(tokio::sync::TryAcquireError::NoPermits) => {
3233                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
3234            }
3235            Err(tokio::sync::TryAcquireError::Closed) => {
3236                return Err(ServerError::OperationFailed(
3237                    "admin rotate semaphore closed (server shutting down)".into(),
3238                ));
3239            }
3240        };
3241
3242        let n = self.config.partition_config.num_flow_partitions;
3243        // "now" is derived inside the FCALL from `redis.call("TIME")`
3244        // (consistency with validate_waitpoint_token and flow scanners);
3245        // grace_ms is a duration — safe to carry from config.
3246        let grace_ms = self.config.waitpoint_hmac_grace_ms;
3247
3248        // Parallelize the rotation fan-out with the same bounded
3249        // concurrency as boot init (BOOT_INIT_CONCURRENCY = 16). A 256-
3250        // partition sequential rotation takes ~7.7s at 30ms cross-AZ RTT,
3251        // uncomfortably close to the 120s HTTP endpoint timeout under
3252        // contention. Atomicity per partition now lives inside the
3253        // `ff_rotate_waitpoint_hmac_secret` FCALL (FCALL is atomic per
3254        // shard); parallelism across DIFFERENT partitions is safe. The
3255        // outer `admin_rotate_semaphore(1)` bounds server-wide concurrent
3256        // rotations, so this fan-out only affects a single in-flight
3257        // rotate call at a time.
3258        use futures::stream::{FuturesUnordered, StreamExt};
3259
3260        let mut rotated = 0u16;
3261        let mut failed = Vec::new();
3262        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
3263        let mut next_index: u16 = 0;
3264
3265        loop {
3266            while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
3267                let partition = Partition {
3268                    family: PartitionFamily::Execution,
3269                    index: next_index,
3270                };
3271                let idx = next_index;
3272                // Own the strings so each queued future carries its own
3273                // copy. The futures still borrow `self`, so they are not
3274                // 'static (FuturesUnordered imposes no such bound); owned
3275                // captures just keep the async move blocks self-contained.
3276                let new_kid_owned = new_kid.to_owned();
3277                let new_secret_owned = new_secret_hex.to_owned();
3278                let partition_owned = partition;
3279                let fut = async move {
3280                    let outcome = self
3281                        .rotate_single_partition(
3282                            &partition_owned,
3283                            &new_kid_owned,
3284                            &new_secret_owned,
3285                            grace_ms,
3286                        )
3287                        .await;
3288                    (idx, partition_owned, outcome)
3289                };
3290                pending.push(fut);
3291                next_index += 1;
3292            }
3293            match pending.next().await {
3294                Some((idx, partition, outcome)) => match outcome {
3295                    Ok(()) => {
3296                        rotated += 1;
3297                        // Per-partition event → DEBUG (not INFO). Rationale:
3298                        // one rotate endpoint call produces 256 partition-level
3299                        // events, which would blow up paid aggregator budgets
3300                        // (Datadog/Splunk) at no operational value. The single
3301                        // aggregated audit event below is the compliance
3302                        // artifact. Failures stay at ERROR with per-partition
3303                        // detail — that's where operators need it.
3304                        tracing::debug!(
3305                            partition = %partition,
3306                            new_kid = %new_kid,
3307                            "waitpoint_hmac_rotated"
3308                        );
3309                    }
3310                    Err(e) => {
3311                        // Failures stay at ERROR (target=audit) per-partition —
3312                        // operators need the partition index + error to debug
3313                        // Valkey/config faults. Low cardinality in practice.
3314                        tracing::error!(
3315                            target: "audit",
3316                            partition = %partition,
3317                            err = %e,
3318                            "waitpoint_hmac_rotation_failed"
3319                        );
3320                        failed.push(idx);
3321                    }
3322                },
3323                None => break,
3324            }
3325        }
3326
3327        // Single aggregated audit event for the whole rotation. This is
3328        // the load-bearing compliance artifact — operators alert on
3329        // target="audit" at INFO level and this is the stable schema.
3330        tracing::info!(
3331            target: "audit",
3332            new_kid = %new_kid,
3333            total_partitions = n,
3334            rotated,
3335            failed_count = failed.len(),
3336            "waitpoint_hmac_rotation_complete"
3337        );
3338
3339        Ok(RotateWaitpointSecretResult {
3340            rotated,
3341            failed,
3342            new_kid: new_kid.to_owned(),
3343        })
3344    }
3345
3346    /// Rotate on a single partition by dispatching the
3347    /// `ff_rotate_waitpoint_hmac_secret` FCALL. FCALL is atomic per shard,
3348    /// so no external SETNX lock is needed — the script itself IS the
3349    /// atomicity boundary. Single source of truth (Lua); the Rust
3350    /// implementation that previously lived here was an exact duplicate
3351    /// and has been removed.
3352    async fn rotate_single_partition(
3353        &self,
3354        partition: &Partition,
3355        new_kid: &str,
3356        new_secret_hex: &str,
3357        grace_ms: u64,
3358    ) -> Result<(), ServerError> {
3359        let idx = IndexKeys::new(partition);
3360        let args = RotateWaitpointHmacSecretArgs {
3361            new_kid: new_kid.to_owned(),
3362            new_secret_hex: new_secret_hex.to_owned(),
3363            grace_ms,
3364        };
3365        let outcome = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
3366            &self.client,
3367            &idx,
3368            &args,
3369        )
3370        .await
3371        .map_err(|e| match e {
3372            // Same kid + different secret. Map to the same 409-style
3373            // error the old Rust path returned so HTTP callers keep the
3374            // current surface.
3375            ff_script::ScriptError::RotationConflict(kid) => {
3376                ServerError::OperationFailed(format!(
3377                    "rotation conflict: kid {kid} already installed with a \
3378                     different secret. Either use a fresh kid or restore the \
3379                     original secret for this kid before retrying."
3380                ))
3381            }
3382            ff_script::ScriptError::Valkey(v) => crate::server::backend_context(
3383                v,
3384                format!("FCALL ff_rotate_waitpoint_hmac_secret partition={partition}"),
3385            ),
3386            other => ServerError::OperationFailed(format!(
3387                "rotation failed on partition {partition}: {other}"
3388            )),
3389        })?;
3390        // Either outcome is a successful write from the operator's POV.
3391        // Rotated → new install; Noop → idempotent replay.
3392        let _ = outcome;
3393        Ok(())
3394    }
3395}
3396
3397// ── FCALL result parsing ──
3398
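// Shared reply envelope (mirrored from the Lua side): a success reply is
// {1, "OK" | "DUPLICATE" | "ALREADY_SATISFIED", ...extra fields}; a failure
// is {0, "<error_code>"}. Each parser below decodes one function's variant
// of that envelope.
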
3399fn parse_create_result(
3400    raw: &Value,
3401    execution_id: &ExecutionId,
3402) -> Result<CreateExecutionResult, ServerError> {
3403    let arr = match raw {
3404        Value::Array(arr) => arr,
3405        _ => return Err(ServerError::Script("ff_create_execution: expected Array".into())),
3406    };
3407
3408    let status = match arr.first() {
3409        Some(Ok(Value::Int(n))) => *n,
3410        _ => return Err(ServerError::Script("ff_create_execution: bad status code".into())),
3411    };
3412
3413    if status == 1 {
3414        // Check sub-status: OK or DUPLICATE
3415        let sub = arr
3416            .get(1)
3417            .and_then(|v| match v {
3418                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3419                Ok(Value::SimpleString(s)) => Some(s.clone()),
3420                _ => None,
3421            })
3422            .unwrap_or_default();
3423
3424        if sub == "DUPLICATE" {
3425            Ok(CreateExecutionResult::Duplicate {
3426                execution_id: execution_id.clone(),
3427            })
3428        } else {
3429            Ok(CreateExecutionResult::Created {
3430                execution_id: execution_id.clone(),
3431                public_state: PublicState::Waiting,
3432            })
3433        }
3434    } else {
3435        let error_code = arr
3436            .get(1)
3437            .and_then(|v| match v {
3438                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3439                Ok(Value::SimpleString(s)) => Some(s.clone()),
3440                _ => None,
3441            })
3442            .unwrap_or_else(|| "unknown".to_owned());
3443        Err(ServerError::OperationFailed(format!(
3444            "ff_create_execution failed: {error_code}"
3445        )))
3446    }
3447}
3448
3449fn parse_cancel_result(
3450    raw: &Value,
3451    execution_id: &ExecutionId,
3452) -> Result<CancelExecutionResult, ServerError> {
3453    let arr = match raw {
3454        Value::Array(arr) => arr,
3455        _ => return Err(ServerError::Script("ff_cancel_execution: expected Array".into())),
3456    };
3457
3458    let status = match arr.first() {
3459        Some(Ok(Value::Int(n))) => *n,
3460        _ => return Err(ServerError::Script("ff_cancel_execution: bad status code".into())),
3461    };
3462
3463    if status == 1 {
3464        Ok(CancelExecutionResult::Cancelled {
3465            execution_id: execution_id.clone(),
3466            public_state: PublicState::Cancelled,
3467        })
3468    } else {
3469        let error_code = arr
3470            .get(1)
3471            .and_then(|v| match v {
3472                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3473                Ok(Value::SimpleString(s)) => Some(s.clone()),
3474                _ => None,
3475            })
3476            .unwrap_or_else(|| "unknown".to_owned());
3477        Err(ServerError::OperationFailed(format!(
3478            "ff_cancel_execution failed: {error_code}"
3479        )))
3480    }
3481}
3482
3483fn parse_budget_create_result(
3484    raw: &Value,
3485    budget_id: &BudgetId,
3486) -> Result<CreateBudgetResult, ServerError> {
3487    let arr = match raw {
3488        Value::Array(arr) => arr,
3489        _ => return Err(ServerError::Script("ff_create_budget: expected Array".into())),
3490    };
3491
3492    let status = match arr.first() {
3493        Some(Ok(Value::Int(n))) => *n,
3494        _ => return Err(ServerError::Script("ff_create_budget: bad status code".into())),
3495    };
3496
3497    if status == 1 {
3498        let sub = arr
3499            .get(1)
3500            .and_then(|v| match v {
3501                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3502                Ok(Value::SimpleString(s)) => Some(s.clone()),
3503                _ => None,
3504            })
3505            .unwrap_or_default();
3506
3507        if sub == "ALREADY_SATISFIED" {
3508            Ok(CreateBudgetResult::AlreadySatisfied {
3509                budget_id: budget_id.clone(),
3510            })
3511        } else {
3512            Ok(CreateBudgetResult::Created {
3513                budget_id: budget_id.clone(),
3514            })
3515        }
3516    } else {
3517        let error_code = arr
3518            .get(1)
3519            .and_then(|v| match v {
3520                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3521                Ok(Value::SimpleString(s)) => Some(s.clone()),
3522                _ => None,
3523            })
3524            .unwrap_or_else(|| "unknown".to_owned());
3525        Err(ServerError::OperationFailed(format!(
3526            "ff_create_budget failed: {error_code}"
3527        )))
3528    }
3529}
3530
3531fn parse_quota_create_result(
3532    raw: &Value,
3533    quota_policy_id: &QuotaPolicyId,
3534) -> Result<CreateQuotaPolicyResult, ServerError> {
3535    let arr = match raw {
3536        Value::Array(arr) => arr,
3537        _ => return Err(ServerError::Script("ff_create_quota_policy: expected Array".into())),
3538    };
3539
3540    let status = match arr.first() {
3541        Some(Ok(Value::Int(n))) => *n,
3542        _ => return Err(ServerError::Script("ff_create_quota_policy: bad status code".into())),
3543    };
3544
3545    if status == 1 {
3546        let sub = arr
3547            .get(1)
3548            .and_then(|v| match v {
3549                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3550                Ok(Value::SimpleString(s)) => Some(s.clone()),
3551                _ => None,
3552            })
3553            .unwrap_or_default();
3554
3555        if sub == "ALREADY_SATISFIED" {
3556            Ok(CreateQuotaPolicyResult::AlreadySatisfied {
3557                quota_policy_id: quota_policy_id.clone(),
3558            })
3559        } else {
3560            Ok(CreateQuotaPolicyResult::Created {
3561                quota_policy_id: quota_policy_id.clone(),
3562            })
3563        }
3564    } else {
3565        let error_code = arr
3566            .get(1)
3567            .and_then(|v| match v {
3568                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3569                Ok(Value::SimpleString(s)) => Some(s.clone()),
3570                _ => None,
3571            })
3572            .unwrap_or_else(|| "unknown".to_owned());
3573        Err(ServerError::OperationFailed(format!(
3574            "ff_create_quota_policy failed: {error_code}"
3575        )))
3576    }
3577}
3578
3579// ── Flow FCALL result parsing ──
3580
3581fn parse_create_flow_result(
3582    raw: &Value,
3583    flow_id: &FlowId,
3584) -> Result<CreateFlowResult, ServerError> {
3585    let arr = match raw {
3586        Value::Array(arr) => arr,
3587        _ => return Err(ServerError::Script("ff_create_flow: expected Array".into())),
3588    };
3589    let status = match arr.first() {
3590        Some(Ok(Value::Int(n))) => *n,
3591        _ => return Err(ServerError::Script("ff_create_flow: bad status code".into())),
3592    };
3593    if status == 1 {
3594        let sub = fcall_field_str(arr, 1);
3595        if sub == "ALREADY_SATISFIED" {
3596            Ok(CreateFlowResult::AlreadySatisfied {
3597                flow_id: flow_id.clone(),
3598            })
3599        } else {
3600            Ok(CreateFlowResult::Created {
3601                flow_id: flow_id.clone(),
3602            })
3603        }
3604    } else {
3605        let error_code = fcall_field_str(arr, 1);
3606        Err(ServerError::OperationFailed(format!(
3607            "ff_create_flow failed: {error_code}"
3608        )))
3609    }
3610}
3611
3612fn parse_add_execution_to_flow_result(
3613    raw: &Value,
3614) -> Result<AddExecutionToFlowResult, ServerError> {
3615    let arr = match raw {
3616        Value::Array(arr) => arr,
3617        _ => {
3618            return Err(ServerError::Script(
3619                "ff_add_execution_to_flow: expected Array".into(),
3620            ))
3621        }
3622    };
3623    let status = match arr.first() {
3624        Some(Ok(Value::Int(n))) => *n,
3625        _ => {
3626            return Err(ServerError::Script(
3627                "ff_add_execution_to_flow: bad status code".into(),
3628            ))
3629        }
3630    };
3631    if status == 1 {
3632        let sub = fcall_field_str(arr, 1);
3633        let eid_str = fcall_field_str(arr, 2);
3634        let nc_str = fcall_field_str(arr, 3);
3635        let eid = ExecutionId::parse(&eid_str)
3636            .map_err(|e| ServerError::Script(format!("bad execution_id: {e}")))?;
3637        let nc: u32 = nc_str.parse().unwrap_or(0);
3638        if sub == "ALREADY_SATISFIED" {
3639            Ok(AddExecutionToFlowResult::AlreadyMember {
3640                execution_id: eid,
3641                node_count: nc,
3642            })
3643        } else {
3644            Ok(AddExecutionToFlowResult::Added {
3645                execution_id: eid,
3646                new_node_count: nc,
3647            })
3648        }
3649    } else {
3650        let error_code = fcall_field_str(arr, 1);
3651        Err(ServerError::OperationFailed(format!(
3652            "ff_add_execution_to_flow failed: {error_code}"
3653        )))
3654    }
3655}
3656
3657/// Outcome of parsing a raw `ff_cancel_flow` FCALL response.
3658///
3659/// Keeps `AlreadyTerminal` distinct from other script errors so the caller
3660/// can treat cancel on an already-cancelled/completed/failed flow as
3661/// idempotent success instead of surfacing a 400 to the client.
3662enum ParsedCancelFlow {
3663    Cancelled {
3664        policy: String,
3665        member_execution_ids: Vec<String>,
3666    },
3667    AlreadyTerminal,
3668}
3669
3670/// Parse the raw `ff_cancel_flow` FCALL response.
3671///
3672/// Returns [`ParsedCancelFlow::Cancelled`] on success, [`ParsedCancelFlow::AlreadyTerminal`]
3673/// when the flow was already in a terminal state (idempotent retry), or a
3674/// [`ServerError`] for any other failure.
3675fn parse_cancel_flow_raw(raw: &Value) -> Result<ParsedCancelFlow, ServerError> {
3676    let arr = match raw {
3677        Value::Array(arr) => arr,
3678        _ => return Err(ServerError::Script("ff_cancel_flow: expected Array".into())),
3679    };
3680    let status = match arr.first() {
3681        Some(Ok(Value::Int(n))) => *n,
3682        _ => return Err(ServerError::Script("ff_cancel_flow: bad status code".into())),
3683    };
3684    if status != 1 {
3685        let error_code = fcall_field_str(arr, 1);
3686        if error_code == "flow_already_terminal" {
3687            return Ok(ParsedCancelFlow::AlreadyTerminal);
3688        }
3689        return Err(ServerError::OperationFailed(format!(
3690            "ff_cancel_flow failed: {error_code}"
3691        )));
3692    }
3693    // {1, "OK", cancellation_policy, member1, member2, ...}
3694    let policy = fcall_field_str(arr, 2);
3695    // Iterate to arr.len() rather than breaking on the first empty string —
3696    // safer against malformed Lua responses and clearer than a sentinel loop.
3697    let mut members = Vec::with_capacity(arr.len().saturating_sub(3));
3698    for i in 3..arr.len() {
3699        members.push(fcall_field_str(arr, i));
3700    }
3701    Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids: members })
3702}
3703
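/// Expected reply shape, as read by this parser (illustrative; the
/// sub-status at index 1 is not inspected here):
///   success: `{1, sub, edge_id, new_graph_revision}`
///   failure: `{0, error_code}`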
3704fn parse_stage_dependency_edge_result(
3705    raw: &Value,
3706) -> Result<StageDependencyEdgeResult, ServerError> {
3707    let arr = match raw {
3708        Value::Array(arr) => arr,
3709        _ => return Err(ServerError::Script("ff_stage_dependency_edge: expected Array".into())),
3710    };
3711    let status = match arr.first() {
3712        Some(Ok(Value::Int(n))) => *n,
3713        _ => return Err(ServerError::Script("ff_stage_dependency_edge: bad status code".into())),
3714    };
3715    if status == 1 {
3716        let edge_id_str = fcall_field_str(arr, 2);
3717        let rev_str = fcall_field_str(arr, 3);
3718        let edge_id = EdgeId::parse(&edge_id_str)
3719            .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
3720        let rev: u64 = rev_str.parse().unwrap_or(0);
3721        Ok(StageDependencyEdgeResult::Staged {
3722            edge_id,
3723            new_graph_revision: rev,
3724        })
3725    } else {
3726        let error_code = fcall_field_str(arr, 1);
3727        Err(ServerError::OperationFailed(format!(
3728            "ff_stage_dependency_edge failed: {error_code}"
3729        )))
3730    }
3731}
3732
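/// Expected reply shape, as read by this parser (illustrative):
///   applied:         `{1, "OK", unsatisfied_count}`
///   already applied: `{1, "ALREADY_APPLIED"}`
///   failure:         `{0, error_code}`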
3733fn parse_apply_dependency_result(
3734    raw: &Value,
3735) -> Result<ApplyDependencyToChildResult, ServerError> {
3736    let arr = match raw {
3737        Value::Array(arr) => arr,
3738        _ => return Err(ServerError::Script("ff_apply_dependency_to_child: expected Array".into())),
3739    };
3740    let status = match arr.first() {
3741        Some(Ok(Value::Int(n))) => *n,
3742        _ => return Err(ServerError::Script("ff_apply_dependency_to_child: bad status code".into())),
3743    };
3744    if status == 1 {
3745        let sub = fcall_field_str(arr, 1);
3746        if sub == "ALREADY_APPLIED" || sub == "already_applied" {
3747            Ok(ApplyDependencyToChildResult::AlreadyApplied)
3748        } else {
3749            // OK status — field at index 2 is unsatisfied count
3750            let count_str = fcall_field_str(arr, 2);
3751            let count: u32 = count_str.parse().unwrap_or(0);
3752            Ok(ApplyDependencyToChildResult::Applied {
3753                unsatisfied_count: count,
3754            })
3755        }
3756    } else {
3757        let error_code = fcall_field_str(arr, 1);
3758        Err(ServerError::OperationFailed(format!(
3759            "ff_apply_dependency_to_child failed: {error_code}"
3760        )))
3761    }
3762}
3763
3764fn parse_deliver_signal_result(
3765    raw: &Value,
3766    signal_id: &SignalId,
3767) -> Result<DeliverSignalResult, ServerError> {
3768    let arr = match raw {
3769        Value::Array(arr) => arr,
3770        _ => return Err(ServerError::Script("ff_deliver_signal: expected Array".into())),
3771    };
3772    let status = match arr.first() {
3773        Some(Ok(Value::Int(n))) => *n,
3774        _ => return Err(ServerError::Script("ff_deliver_signal: bad status code".into())),
3775    };
3776    if status == 1 {
3777        let sub = fcall_field_str(arr, 1);
3778        if sub == "DUPLICATE" {
3779            // ok_duplicate(existing_signal_id) → {1, "DUPLICATE", existing_signal_id}
3780            let existing_str = fcall_field_str(arr, 2);
3781            let existing_id = SignalId::parse(&existing_str).unwrap_or_else(|_| signal_id.clone());
3782            Ok(DeliverSignalResult::Duplicate {
3783                existing_signal_id: existing_id,
3784            })
3785        } else {
3786            // ok(signal_id, effect) → {1, "OK", signal_id, effect}
3787            let effect = fcall_field_str(arr, 3);
3788            Ok(DeliverSignalResult::Accepted {
3789                signal_id: signal_id.clone(),
3790                effect,
3791            })
3792        }
3793    } else {
3794        let error_code = fcall_field_str(arr, 1);
3795        Err(ServerError::OperationFailed(format!(
3796            "ff_deliver_signal failed: {error_code}"
3797        )))
3798    }
3799}
3800
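/// Expected reply shape, as read by this parser (illustrative; success
/// carries no payload fields that this parser reads):
///   success: `{1, ...}`
///   failure: `{0, error_code}`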
3801fn parse_change_priority_result(
3802    raw: &Value,
3803    execution_id: &ExecutionId,
3804) -> Result<ChangePriorityResult, ServerError> {
3805    let arr = match raw {
3806        Value::Array(arr) => arr,
3807        _ => return Err(ServerError::Script("ff_change_priority: expected Array".into())),
3808    };
3809    let status = match arr.first() {
3810        Some(Ok(Value::Int(n))) => *n,
3811        _ => return Err(ServerError::Script("ff_change_priority: bad status code".into())),
3812    };
3813    if status == 1 {
3814        Ok(ChangePriorityResult::Changed {
3815            execution_id: execution_id.clone(),
3816        })
3817    } else {
3818        let error_code = fcall_field_str(arr, 1);
3819        Err(ServerError::OperationFailed(format!(
3820            "ff_change_priority failed: {error_code}"
3821        )))
3822    }
3823}
3824
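/// Expected reply shape, as read by this parser (illustrative): success is
/// `{1, sub, unsatisfied_count}`, where `"0"` maps to
/// [`PublicState::Waiting`] and anything else to
/// [`PublicState::WaitingChildren`]; failure is `{0, error_code}`.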
3825fn parse_replay_result(raw: &Value) -> Result<ReplayExecutionResult, ServerError> {
3826    let arr = match raw {
3827        Value::Array(arr) => arr,
3828        _ => return Err(ServerError::Script("ff_replay_execution: expected Array".into())),
3829    };
3830    let status = match arr.first() {
3831        Some(Ok(Value::Int(n))) => *n,
3832        _ => return Err(ServerError::Script("ff_replay_execution: bad status code".into())),
3833    };
3834    if status == 1 {
3835        // ok("0") for normal replay, ok(N) for skipped flow member
3836        let unsatisfied = fcall_field_str(arr, 2);
3837        let ps = if unsatisfied == "0" {
3838            PublicState::Waiting
3839        } else {
3840            PublicState::WaitingChildren
3841        };
3842        Ok(ReplayExecutionResult::Replayed { public_state: ps })
3843    } else {
3844        let error_code = fcall_field_str(arr, 1);
3845        Err(ServerError::OperationFailed(format!(
3846            "ff_replay_execution failed: {error_code}"
3847        )))
3848    }
3849}
3850
/// Convert a `ScriptError` into a `ServerError`, preserving `ferriskey::ErrorKind`
3853/// for transport-level variants. Business-logic variants keep their code as
3854/// `ServerError::Script(String)` so HTTP clients see a stable message.
3855///
3856/// Why this exists: before R2, the stream handlers did
3857/// `ScriptError → format!() → ServerError::Script(String)`, which erased
3858/// the ErrorKind and made `ServerError::is_retryable()` always return
/// false. Retry-capable clients (cairn-fabric) would not retry a genuinely
/// transient error like `IoError`.
3861fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
3862    match e {
3863        ff_script::error::ScriptError::Valkey(valkey_err) => {
3864            crate::server::backend_context(valkey_err, "stream FCALL transport")
3865        }
3866        other => ServerError::Script(other.to_string()),
3867    }
3868}
3869
/// Extract a string from an FCALL result array at the given index.
///
/// Returns an empty string when the index is out of range or the value is
/// not a bulk string, simple string, or integer.
fn fcall_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
3871    match arr.get(index) {
3872        Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
3873        Some(Ok(Value::SimpleString(s))) => s.clone(),
3874        Some(Ok(Value::Int(n))) => n.to_string(),
3875        _ => String::new(),
3876    }
3877}
3878
/// Parse the `ff_report_usage_and_check` result.
///
/// Standard format: `{1, "OK"}`, `{1, "SOFT_BREACH", dim, current, limit}`,
/// `{1, "HARD_BREACH", dim, current, limit}`, or `{1, "ALREADY_APPLIED"}`.
3882fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, ServerError> {
3883    let arr = match raw {
3884        Value::Array(arr) => arr,
3885        _ => return Err(ServerError::Script("ff_report_usage_and_check: expected Array".into())),
3886    };
3887    let status_code = match arr.first() {
3888        Some(Ok(Value::Int(n))) => *n,
3889        _ => {
3890            return Err(ServerError::Script(
3891                "ff_report_usage_and_check: expected Int status code".into(),
3892            ))
3893        }
3894    };
3895    if status_code != 1 {
3896        let error_code = fcall_field_str(arr, 1);
3897        return Err(ServerError::OperationFailed(format!(
3898            "ff_report_usage_and_check failed: {error_code}"
3899        )));
3900    }
3901    let sub_status = fcall_field_str(arr, 1);
3902    match sub_status.as_str() {
3903        "OK" => Ok(ReportUsageResult::Ok),
3904        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
3905        "SOFT_BREACH" => {
3906            let dim = fcall_field_str(arr, 2);
3907            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3908            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
            Ok(ReportUsageResult::SoftBreach {
                dimension: dim,
                current_usage: current,
                soft_limit: limit,
            })
3910        }
3911        "HARD_BREACH" => {
3912            let dim = fcall_field_str(arr, 2);
3913            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3914            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3915            Ok(ReportUsageResult::HardBreach {
3916                dimension: dim,
3917                current_usage: current,
3918                hard_limit: limit,
3919            })
3920        }
3921        _ => Err(ServerError::OperationFailed(format!(
3922            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
3923        ))),
3924    }
3925}
3926
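/// Expected reply shape, as read by this parser (illustrative):
///   revoked:           `{1, "OK", lease_id, lease_epoch}`
///   already satisfied: `{1, "ALREADY_SATISFIED", reason}`
///   failure:           `{0, error_code}`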
3927fn parse_revoke_lease_result(raw: &Value) -> Result<RevokeLeaseResult, ServerError> {
3928    let arr = match raw {
3929        Value::Array(arr) => arr,
3930        _ => return Err(ServerError::Script("ff_revoke_lease: expected Array".into())),
3931    };
3932    let status = match arr.first() {
3933        Some(Ok(Value::Int(n))) => *n,
3934        _ => return Err(ServerError::Script("ff_revoke_lease: bad status code".into())),
3935    };
3936    if status == 1 {
3937        let sub = fcall_field_str(arr, 1);
3938        if sub == "ALREADY_SATISFIED" {
3939            let reason = fcall_field_str(arr, 2);
3940            Ok(RevokeLeaseResult::AlreadySatisfied { reason })
3941        } else {
3942            let lid = fcall_field_str(arr, 2);
3943            let epoch = fcall_field_str(arr, 3);
3944            Ok(RevokeLeaseResult::Revoked {
3945                lease_id: lid,
3946                lease_epoch: epoch,
3947            })
3948        }
3949    } else {
3950        let error_code = fcall_field_str(arr, 1);
3951        Err(ServerError::OperationFailed(format!(
3952            "ff_revoke_lease failed: {error_code}"
3953        )))
3954    }
3955}
3956
3957/// Detect Valkey errors indicating the Lua function library is not loaded.
3958///
3959/// After a failover, the new primary may not have the library if replication
3960/// was incomplete. Valkey returns `ERR Function not loaded` for FCALL calls
3961/// targeting missing functions.
3962fn is_function_not_loaded(e: &ferriskey::Error) -> bool {
3963    if matches!(e.kind(), ferriskey::ErrorKind::NoScriptError) {
3964        return true;
3965    }
3966    e.detail()
3967        .map(|d| {
3968            d.contains("Function not loaded")
3969                || d.contains("No matching function")
3970                || d.contains("function not found")
3971        })
3972        .unwrap_or(false)
3973        || e.to_string().contains("Function not loaded")
3974}
3975
3976/// Free-function form of [`Server::fcall_with_reload`] — callable from
3977/// background tasks that own a cloned `Client` but no `&Server`.
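///
/// Illustrative call, mirroring [`try_cancel_member_once`] (the binding
/// names here are hypothetical, not part of this crate's API):
/// ```ignore
/// let raw = fcall_with_reload_on_client(&client, "ff_cancel_execution", &key_refs, &arg_refs)
///     .await?;
/// let result = parse_cancel_result(&raw, &args.execution_id)?;
/// ```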
3978async fn fcall_with_reload_on_client(
3979    client: &Client,
3980    function: &str,
3981    keys: &[&str],
3982    args: &[&str],
3983) -> Result<Value, ServerError> {
3984    match client.fcall(function, keys, args).await {
3985        Ok(v) => Ok(v),
3986        Err(e) if is_function_not_loaded(&e) => {
3987            tracing::warn!(function, "Lua library not found on server, reloading");
3988            ff_script::loader::ensure_library(client)
3989                .await
3990                .map_err(ServerError::LibraryLoad)?;
3991            client
3992                .fcall(function, keys, args)
3993                .await
3994                .map_err(ServerError::from)
3995        }
3996        Err(e) => Err(ServerError::from(e)),
3997    }
3998}
3999
4000/// Build the `ff_cancel_execution` KEYS (21) and ARGV (5) by pre-reading
4001/// dynamic fields from `exec_core`. Shared by [`Server::cancel_execution`]
4002/// and the async cancel_flow member-dispatch path.
4003async fn build_cancel_execution_fcall(
4004    client: &Client,
4005    partition_config: &PartitionConfig,
4006    args: &CancelExecutionArgs,
4007) -> Result<(Vec<String>, Vec<String>), ServerError> {
4008    let partition = execution_partition(&args.execution_id, partition_config);
4009    let ctx = ExecKeyContext::new(&partition, &args.execution_id);
4010    let idx = IndexKeys::new(&partition);
4011
4012    let lane_str: Option<String> = client
4013        .hget(&ctx.core(), "lane_id")
4014        .await
4015        .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
4016    let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
4017
4018    let dyn_fields: Vec<Option<String>> = client
4019        .cmd("HMGET")
4020        .arg(ctx.core())
4021        .arg("current_attempt_index")
4022        .arg("current_waitpoint_id")
4023        .arg("current_worker_instance_id")
4024        .execute()
4025        .await
4026        .map_err(|e| crate::server::backend_context(e, "HMGET cancel pre-read"))?;
4027
4028    let att_idx_val = dyn_fields.first()
4029        .and_then(|v| v.as_ref())
4030        .and_then(|s| s.parse::<u32>().ok())
4031        .unwrap_or(0);
4032    let att_idx = AttemptIndex::new(att_idx_val);
4033    let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4034    let wp_id = if wp_id_str.is_empty() {
4035        WaitpointId::new()
4036    } else {
4037        WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
4038    };
4039    let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4040    let wiid = WorkerInstanceId::new(&wiid_str);
4041
4042    let keys: Vec<String> = vec![
4043        ctx.core(),                              // 1
4044        ctx.attempt_hash(att_idx),               // 2
4045        ctx.stream_meta(att_idx),                // 3
4046        ctx.lease_current(),                     // 4
4047        ctx.lease_history(),                     // 5
4048        idx.lease_expiry(),                      // 6
4049        idx.worker_leases(&wiid),                // 7
4050        ctx.suspension_current(),                // 8
4051        ctx.waitpoint(&wp_id),                   // 9
4052        ctx.waitpoint_condition(&wp_id),         // 10
4053        idx.suspension_timeout(),                // 11
4054        idx.lane_terminal(&lane),                // 12
4055        idx.attempt_timeout(),                   // 13
4056        idx.execution_deadline(),                // 14
4057        idx.lane_eligible(&lane),                // 15
4058        idx.lane_delayed(&lane),                 // 16
4059        idx.lane_blocked_dependencies(&lane),    // 17
4060        idx.lane_blocked_budget(&lane),          // 18
4061        idx.lane_blocked_quota(&lane),           // 19
4062        idx.lane_blocked_route(&lane),           // 20
4063        idx.lane_blocked_operator(&lane),        // 21
4064    ];
4065    let argv: Vec<String> = vec![
4066        args.execution_id.to_string(),
4067        args.reason.clone(),
4068        args.source.to_string(),
4069        args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
4070        args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
4071    ];
4072    Ok((keys, argv))
4073}
4074
4075/// Backoff schedule for transient Valkey errors during async cancel_flow
4076/// dispatch. Length = retry-attempt count (including the initial attempt).
4077/// The last entry is not slept on because it's the final attempt.
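/// Concretely: attempt 1, sleep 100 ms, attempt 2, sleep 500 ms, attempt 3,
/// give up; the final `2_000` entry only pads the attempt count.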
4078const CANCEL_MEMBER_RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2_000];
4079
4080/// Reach into a `ServerError` for a transport-layer retryability hint.
4081/// Matches the variants that can carry a backend transport fault:
4082/// direct `Backend`, `BackendContext`, and `LibraryLoad` (which
4083/// wraps a ferriskey error internally).
4084///
4085/// Renamed from `extract_valkey_kind` in #88 — the previous return
4086/// type `Option<ferriskey::ErrorKind>` leaked ferriskey into the
4087/// server's internal retry logic. Callers now get a backend-agnostic
4088/// [`BackendErrorKind`] and dispatch via
4089/// [`BackendErrorKind::is_retryable`].
4090fn extract_backend_kind(e: &ServerError) -> Option<ff_core::BackendErrorKind> {
4091    e.backend_kind()
4092}
4093
4104/// Acknowledge that a member cancel has committed. Fires
4105/// `ff_ack_cancel_member` on `{fp:N}` to SREM the execution from the
4106/// flow's `pending_cancels` set and, if empty, ZREM the flow from the
4107/// partition-level `cancel_backlog`. Best-effort — failures are logged
4108/// but not propagated, since the reconciler will catch anything that
4109/// stays behind on its next pass.
4110async fn ack_cancel_member(
4111    client: &Client,
4112    pending_cancels_key: &str,
4113    cancel_backlog_key: &str,
4114    eid_str: &str,
4115    flow_id: &str,
4116) {
4117    let keys = [pending_cancels_key, cancel_backlog_key];
4118    let args_v = [eid_str, flow_id];
    let res: Result<Value, _> =
        client.fcall("ff_ack_cancel_member", &keys, &args_v).await;
    if let Err(e) = res {
4122        tracing::warn!(
4123            flow_id = %flow_id,
4124            execution_id = %eid_str,
4125            error = %e,
4126            "ff_ack_cancel_member failed; reconciler will drain on next pass"
4127        );
4128    }
4129}
4130
4131/// Returns true if a cancel_member failure reflects an already-terminal
4132/// (or never-existed) execution, which from the flow-cancel perspective
4133/// is ack-worthy success rather than a partial failure. The Lua
4134/// `ff_cancel_execution` function emits `execution_not_active` when the
4135/// member is already in a terminal phase, and `execution_not_found` when
4136/// the key is gone. Both codes arrive here wrapped in
4137/// `ServerError::OperationFailed("ff_cancel_execution failed: <code>")`
4138/// via `parse_cancel_result`.
4139fn is_terminal_ack_error(err: &ServerError) -> bool {
4140    match err {
4141        ServerError::OperationFailed(msg) => {
4142            msg.contains("execution_not_active") || msg.contains("execution_not_found")
4143        }
4144        _ => false,
4145    }
4146}
4147
/// Cancel a single member execution from a cancel_flow dispatch context.
/// Parses the flow-member EID string, builds the FCALL via the shared helper,
/// and executes with the same reload-on-failover semantics as the inline path.
///
/// Wrapped in a bounded retry loop (see [`CANCEL_MEMBER_RETRY_DELAYS_MS`]) so
/// that transient Valkey errors mid-dispatch (failover, `TryAgain`,
/// `ClusterDown`, `IoError`, `FatalSendError`) do not silently leak
/// non-cancelled members. `FatalReceiveError` and non-retryable kinds bubble
/// up immediately — those either indicate the Lua ran server-side anyway or a
/// permanent mismatch that retries cannot fix.
async fn cancel_member_execution(
4149    client: &Client,
4150    partition_config: &PartitionConfig,
4151    eid_str: &str,
4152    reason: &str,
4153    now: TimestampMs,
4154) -> Result<(), ServerError> {
4155    let execution_id = ExecutionId::parse(eid_str)
4156        .map_err(|e| ServerError::InvalidInput(format!("bad execution_id '{eid_str}': {e}")))?;
4157    let args = CancelExecutionArgs {
4158        execution_id: execution_id.clone(),
4159        reason: reason.to_owned(),
4160        source: CancelSource::OperatorOverride,
4161        lease_id: None,
4162        lease_epoch: None,
4163        attempt_id: None,
4164        now,
4165    };
4166
4167    let attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
4168    for (attempt_idx, delay_ms) in CANCEL_MEMBER_RETRY_DELAYS_MS.iter().enumerate() {
4169        let is_last = attempt_idx + 1 == attempts;
4170        match try_cancel_member_once(client, partition_config, &args).await {
4171            Ok(()) => return Ok(()),
4172            Err(e) => {
4173                // Only retry transport-layer transients; business-logic
4174                // errors (Script / OperationFailed / NotFound / InvalidInput)
4175                // won't change on retry.
4176                let retryable = extract_backend_kind(&e)
4177                    .map(|k| k.is_retryable())
4178                    .unwrap_or(false);
4179                if !retryable || is_last {
4180                    return Err(e);
4181                }
4182                tracing::debug!(
4183                    execution_id = %execution_id,
4184                    attempt = attempt_idx + 1,
4185                    delay_ms = *delay_ms,
4186                    error = %e,
4187                    "cancel_member_execution: transient error, retrying"
4188                );
4189                tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
4190            }
4191        }
4192    }
4193    // Unreachable: the loop above either returns Ok, returns Err on the
4194    // last attempt, or returns Err on a non-retryable error. Keep a
4195    // defensive fallback for future edits to the retry structure.
4196    Err(ServerError::OperationFailed(format!(
4197        "cancel_member_execution: retries exhausted for {execution_id}"
4198    )))
4199}
4200
4201/// Single cancel attempt — pre-read + FCALL + parse. Factored out so the
4202/// retry loop in [`cancel_member_execution`] can invoke it cleanly.
4203async fn try_cancel_member_once(
4204    client: &Client,
4205    partition_config: &PartitionConfig,
4206    args: &CancelExecutionArgs,
4207) -> Result<(), ServerError> {
4208    let (keys, argv) = build_cancel_execution_fcall(client, partition_config, args).await?;
4209    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
4210    let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
4211    let raw =
4212        fcall_with_reload_on_client(client, "ff_cancel_execution", &key_refs, &arg_refs).await?;
4213    parse_cancel_result(&raw, &args.execution_id).map(|_| ())
4214}
4215
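/// Expected reply shape, as read by this parser (illustrative):
///   success: `{1, sub, next_reset_at_ms}`
///   failure: `{0, error_code}`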
4216fn parse_reset_budget_result(raw: &Value) -> Result<ResetBudgetResult, ServerError> {
4217    let arr = match raw {
4218        Value::Array(arr) => arr,
4219        _ => return Err(ServerError::Script("ff_reset_budget: expected Array".into())),
4220    };
4221    let status = match arr.first() {
4222        Some(Ok(Value::Int(n))) => *n,
4223        _ => return Err(ServerError::Script("ff_reset_budget: bad status code".into())),
4224    };
4225    if status == 1 {
4226        let next_str = fcall_field_str(arr, 2);
4227        let next_ms: i64 = next_str.parse().unwrap_or(0);
4228        Ok(ResetBudgetResult::Reset {
4229            next_reset_at: TimestampMs::from_millis(next_ms),
4230        })
4231    } else {
4232        let error_code = fcall_field_str(arr, 1);
4233        Err(ServerError::OperationFailed(format!(
4234            "ff_reset_budget failed: {error_code}"
4235        )))
4236    }
4237}
4238
4239#[cfg(test)]
4240mod tests {
4241    use super::*;
4242    use ferriskey::ErrorKind;
4243
4244    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
4245        ferriskey::Error::from((kind, "synthetic"))
4246    }
4247
4248    // ── Budget dimension-cap validation (issue #104) ──
4249
4250    #[test]
4251    fn create_budget_rejects_over_cap_dimension_count() {
4252        let n = MAX_BUDGET_DIMENSIONS + 1;
4253        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4254        let hard = vec![1u64; n];
4255        let soft = vec![0u64; n];
4256        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
4257        match err {
4258            ServerError::InvalidInput(msg) => {
4259                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
4260                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
4261                assert!(msg.contains(&format!("got={n}")), "got: {msg}");
4262            }
4263            other => panic!("expected InvalidInput, got {other:?}"),
4264        }
4265    }
4266
4267    #[test]
4268    fn create_budget_accepts_exactly_cap_dimensions() {
4269        let n = MAX_BUDGET_DIMENSIONS;
4270        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4271        let hard = vec![1u64; n];
4272        let soft = vec![0u64; n];
4273        assert!(validate_create_budget_dimensions(&dims, &hard, &soft).is_ok());
4274    }
4275
4276    #[test]
4277    fn create_budget_rejects_hard_limit_length_mismatch() {
4278        let dims = vec!["a".to_string(), "b".to_string()];
4279        let hard = vec![1u64]; // too short
4280        let soft = vec![0u64, 0u64];
4281        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
4282        match err {
4283            ServerError::InvalidInput(msg) => {
4284                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
4285                assert!(msg.contains("hard_limits=1"), "got: {msg}");
4286                assert!(msg.contains("dimensions=2"), "got: {msg}");
4287            }
4288            other => panic!("expected InvalidInput, got {other:?}"),
4289        }
4290    }
4291
4292    #[test]
4293    fn create_budget_rejects_soft_limit_length_mismatch() {
4294        let dims = vec!["a".to_string(), "b".to_string()];
4295        let hard = vec![1u64, 2u64];
4296        let soft = vec![0u64, 0u64, 0u64]; // too long
4297        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
4298        match err {
4299            ServerError::InvalidInput(msg) => {
4300                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
4301                assert!(msg.contains("soft_limits=3"), "got: {msg}");
4302            }
4303            other => panic!("expected InvalidInput, got {other:?}"),
4304        }
4305    }
4306
4307    #[test]
4308    fn report_usage_rejects_over_cap_dimension_count() {
4309        let n = MAX_BUDGET_DIMENSIONS + 1;
4310        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4311        let deltas = vec![1u64; n];
4312        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
4313        match err {
4314            ServerError::InvalidInput(msg) => {
4315                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
4316                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
4317            }
4318            other => panic!("expected InvalidInput, got {other:?}"),
4319        }
4320    }
4321
4322    #[test]
4323    fn report_usage_accepts_exactly_cap_dimensions() {
4324        let n = MAX_BUDGET_DIMENSIONS;
4325        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4326        let deltas = vec![1u64; n];
4327        assert!(validate_report_usage_dimensions(&dims, &deltas).is_ok());
4328    }
4329
4330    #[test]
4331    fn report_usage_rejects_delta_length_mismatch() {
4332        let dims = vec!["a".to_string(), "b".to_string(), "c".to_string()];
4333        let deltas = vec![1u64, 2u64]; // too short
4334        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
4335        match err {
4336            ServerError::InvalidInput(msg) => {
4337                assert!(msg.contains("dimension_delta_array_mismatch"), "got: {msg}");
4338                assert!(msg.contains("dimensions=3"), "got: {msg}");
4339                assert!(msg.contains("deltas=2"), "got: {msg}");
4340            }
4341            other => panic!("expected InvalidInput, got {other:?}"),
4342        }
4343    }
4344
4345    #[test]
4346    fn report_usage_accepts_empty_dimensions() {
4347        // Edge case: zero-dimension report_usage is a no-op that should pass
4348        // validation (Lua handles dim_count=0 correctly).
4349        assert!(validate_report_usage_dimensions(&[], &[]).is_ok());
4350    }
4351
4352    #[test]
4353    fn is_retryable_backend_variant_uses_kind_table() {
4354        // Transport-bucketed: retryable.
4355        assert!(ServerError::from(mk_fk_err(ErrorKind::IoError)).is_retryable());
4356        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
4357        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
4358        // Cluster-bucketed (Moved / Ask / TryAgain / ClusterDown): retryable
4359        // after topology settles — the #88 BackendErrorKind classifier
4360        // treats these as transient cluster-churn, a semantic refinement
4361        // over the previous ff-script retry table.
4362        assert!(ServerError::from(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
4363        assert!(ServerError::from(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());
4364        assert!(ServerError::from(mk_fk_err(ErrorKind::Moved)).is_retryable());
4365        assert!(ServerError::from(mk_fk_err(ErrorKind::Ask)).is_retryable());
4366        // BusyLoading: retryable.
4367        assert!(ServerError::from(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());
4368
4369        // Auth / Protocol / ScriptNotLoaded: terminal.
4370        assert!(!ServerError::from(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
4371        assert!(!ServerError::from(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
4372        assert!(!ServerError::from(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
4373    }
4374
4375    #[test]
4376    fn is_retryable_backend_context_uses_kind_table() {
4377        let err = crate::server::backend_context(mk_fk_err(ErrorKind::IoError), "HGET test");
4378        assert!(err.is_retryable());
4379
4380        let err =
4381            crate::server::backend_context(mk_fk_err(ErrorKind::AuthenticationFailed), "auth");
4382        assert!(!err.is_retryable());
4383    }
4384
4385    #[test]
4386    fn is_retryable_library_load_delegates_to_inner_kind() {
4387        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
4388            mk_fk_err(ErrorKind::IoError),
4389        ));
4390        assert!(err.is_retryable());
4391
4392        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
4393            mk_fk_err(ErrorKind::AuthenticationFailed),
4394        ));
4395        assert!(!err.is_retryable());
4396
4397        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
4398            expected: "1".into(),
4399            got: "2".into(),
4400        });
4401        assert!(!err.is_retryable());
4402    }
4403
4404    #[test]
4405    fn is_retryable_business_logic_variants_are_false() {
4406        assert!(!ServerError::NotFound("x".into()).is_retryable());
4407        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
4408        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
4409        assert!(!ServerError::Script("x".into()).is_retryable());
4410        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
4411    }
4412
4413    #[test]
4414    fn backend_kind_delegates_through_library_load() {
4415        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
4416            mk_fk_err(ErrorKind::ClusterDown),
4417        ));
4418        assert_eq!(err.backend_kind(), Some(ff_core::BackendErrorKind::Cluster));
4419
4420        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
4421            expected: "1".into(),
4422            got: "2".into(),
4423        });
4424        assert_eq!(err.backend_kind(), None);
4425    }
4426
4427    // ── Valkey version check (RFC-011 §13) ──
4428
4429    #[test]
4430    fn parse_valkey_version_prefers_valkey_version_over_redis_version() {
4431        // Valkey 8.x+ pins redis_version to 7.2.4 for Redis-client compat
4432        // and exposes the real version in valkey_version. Parser must use
4433        // valkey_version when both are present.
4434        let info = "\
4435# Server\r\n\
4436redis_version:7.2.4\r\n\
4437valkey_version:9.0.3\r\n\
4438server_mode:cluster\r\n\
4439os:Linux\r\n";
4440        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
4441    }
4442
4443    #[test]
4444    fn parse_valkey_version_real_valkey_8_cluster_body() {
4445        // Actual INFO server response observed on valkey/valkey:latest in
4446        // cluster mode (CI matrix): redis_version compat-pinned to 7.2.4,
4447        // valkey_version authoritative at 9.0.3.
4448        let info = "\
4449# Server\r\n\
4450redis_version:7.2.4\r\n\
4451server_name:valkey\r\n\
4452valkey_version:9.0.3\r\n\
4453valkey_release_stage:ga\r\n\
4454redis_git_sha1:00000000\r\n\
4455server_mode:cluster\r\n";
4456        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
4457    }
4458
4459    #[test]
4460    fn parse_valkey_version_falls_back_to_redis_version_on_valkey_7() {
4461        // Valkey 7.x doesn't emit valkey_version, but does emit
4462        // server_name:valkey; parser falls back to redis_version.
4463        let info = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nfoo:bar\r\n";
4464        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
4465    }
4466
4467    #[test]
4468    fn parse_valkey_version_rejects_redis_backend() {
4469        // Real Redis emits redis_version: but not server_name:valkey and
4470        // not valkey_version:. Parser must reject this affirmatively so an
4471        // operator pointing ff-server at a Redis instance fails loud instead
4472        // of silently running against an unsupported backend.
4473        let info = "\
4474# Server\r\n\
4475redis_version:7.4.0\r\n\
4476redis_mode:standalone\r\n\
4477os:Linux\r\n";
4478        let err = parse_valkey_version(info).unwrap_err();
4479        assert!(matches!(err, ServerError::OperationFailed(_)));
4480        let msg = err.to_string();
4481        assert!(
4482            msg.contains("Redis is not supported") && msg.contains("server_name:valkey"),
4483            "expected Redis-rejection message, got: {msg}"
4484        );
4485    }
4486
4487    #[test]
4488    fn parse_valkey_version_accepts_valkey_7_marker_case_insensitively() {
4489        // INFO values are conventionally lowercase but be defensive.
4490        let info = "redis_version:7.2.0\r\nSERVER_NAME:Valkey\r\n";
4491        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
4492    }
4493
4494    #[test]
4495    fn parse_valkey_version_errors_when_no_version_field() {
4496        let info = "# Server\r\nfoo:bar\r\n";
4497        let err = parse_valkey_version(info).unwrap_err();
4498        assert!(matches!(err, ServerError::OperationFailed(_)));
4499        assert!(
4500            err.to_string().contains("missing"),
4501            "expected 'missing' in message, got: {err}"
4502        );
4503    }
4504
4505    #[test]
4506    fn parse_valkey_version_errors_on_non_numeric_major() {
4507        let info = "valkey_version:invalid.x.y\n";
4508        let err = parse_valkey_version(info).unwrap_err();
4509        assert!(matches!(err, ServerError::OperationFailed(_)));
4510        assert!(err.to_string().contains("non-numeric major"));
4511    }
4512
4513    #[test]
4514    fn parse_valkey_version_errors_on_non_numeric_minor() {
4515        let info = "valkey_version:7.x.0\n";
4516        let err = parse_valkey_version(info).unwrap_err();
4517        assert!(matches!(err, ServerError::OperationFailed(_)));
4518        assert!(err.to_string().contains("non-numeric minor"));
4519    }
4520
4521    #[test]
4522    fn parse_valkey_version_errors_on_missing_minor() {
4523        // Bare major (no dot) — cannot be compared against a (major, minor)
4524        // floor. Flag as a real parse error.
4525        let info = "valkey_version:7\n";
4526        let err = parse_valkey_version(info).unwrap_err();
4527        assert!(matches!(err, ServerError::OperationFailed(_)));
4528        assert!(err.to_string().contains("missing minor"));
4529    }
4530
4531    #[test]
4532    fn extract_info_bodies_unwraps_cluster_map_all_entries() {
4533        // Simulates cluster-mode INFO response: map of node_addr → body.
4534        // extract_info_bodies must return EVERY node's body so a stale
4535        // pre-upgrade replica cannot hide behind an upgraded primary.
4536        let body_a = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
4537        let body_b = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:8.0.0\r\n";
4538        let map = Value::Map(vec![
4539            (
4540                Value::SimpleString("127.0.0.1:7000".to_string()),
4541                Value::VerbatimString {
4542                    format: ferriskey::value::VerbatimFormat::Text,
4543                    text: body_a.to_string(),
4544                },
4545            ),
4546            (
4547                Value::SimpleString("127.0.0.1:7001".to_string()),
4548                Value::VerbatimString {
4549                    format: ferriskey::value::VerbatimFormat::Text,
4550                    text: body_b.to_string(),
4551                },
4552            ),
4553        ]);
4554        let bodies = extract_info_bodies(&map).unwrap();
4555        assert_eq!(bodies.len(), 2);
4556        assert_eq!(bodies[0], body_a);
4557        assert_eq!(bodies[1], body_b);
4558    }
4559
4560    #[test]
4561    fn extract_info_bodies_handles_simple_string() {
4562        let body_text = "redis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
4563        let v = Value::SimpleString(body_text.to_string());
4564        let bodies = extract_info_bodies(&v).unwrap();
4565        assert_eq!(bodies, vec![body_text.to_string()]);
4566    }
4567
4568    #[test]
4569    fn extract_info_bodies_rejects_empty_cluster_map() {
4570        let map = Value::Map(vec![]);
4571        let err = extract_info_bodies(&map).unwrap_err();
4572        assert!(matches!(err, ServerError::OperationFailed(_)));
4573        assert!(err.to_string().contains("empty map"));
4574    }
4575
4576    /// End-to-end composition test for the cluster-min fix (issue #84):
4577    /// `extract_info_bodies` → `parse_valkey_version` per node → min-reduce →
4578    /// floor comparison. A mixed-version cluster where one node is 7.1.0 must
4579    /// fail the gate, even if another node is already on 8.0.0 and that
4580    /// node's entry appears first in the map.
4581    #[test]
4582    fn parse_valkey_version_min_across_cluster_map_picks_lowest() {
4583        // node1 appears first and is above the floor. Pre-fix behavior
4584        // (first-entry only) would accept. The min across all three nodes is
4585        // (7, 1), below the (7, 2) floor, so the gate must reject.
4586        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
4587        let body_node2 = "# Server\r\nredis_version:7.1.0\r\nserver_name:valkey\r\n";
4588        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
4589        let map = Value::Map(vec![
4590            (
4591                Value::SimpleString("node1:6379".to_string()),
4592                Value::VerbatimString {
4593                    format: ferriskey::value::VerbatimFormat::Text,
4594                    text: body_node1.to_string(),
4595                },
4596            ),
4597            (
4598                Value::SimpleString("node2:6379".to_string()),
4599                Value::VerbatimString {
4600                    format: ferriskey::value::VerbatimFormat::Text,
4601                    text: body_node2.to_string(),
4602                },
4603            ),
4604            (
4605                Value::SimpleString("node3:6379".to_string()),
4606                Value::VerbatimString {
4607                    format: ferriskey::value::VerbatimFormat::Text,
4608                    text: body_node3.to_string(),
4609                },
4610            ),
4611        ]);
4612
4613        let bodies = extract_info_bodies(&map).unwrap();
4614        let min = bodies
4615            .iter()
4616            .map(|b| parse_valkey_version(b).unwrap())
4617            .min()
4618            .unwrap();
4619
4620        assert_eq!(min, (7, 1), "min across cluster must be the lowest node");
4621        assert!(
4622            min < (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
4623            "mixed-version cluster with 7.1.0 node must fail the (7,2) gate"
4624        );
4625    }
4626
4627    /// Companion to `parse_valkey_version_min_across_cluster_map_picks_lowest`:
4628    /// when every node is at or above the floor, the min-reduce + gate
4629    /// composition accepts.
4630    #[test]
4631    fn parse_valkey_version_all_nodes_at_or_above_floor_accepts() {
4632        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
4633        let body_node2 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
4634        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:9.0.3\r\n";
4635        let map = Value::Map(vec![
4636            (
4637                Value::SimpleString("node1:6379".to_string()),
4638                Value::VerbatimString {
4639                    format: ferriskey::value::VerbatimFormat::Text,
4640                    text: body_node1.to_string(),
4641                },
4642            ),
4643            (
4644                Value::SimpleString("node2:6379".to_string()),
4645                Value::VerbatimString {
4646                    format: ferriskey::value::VerbatimFormat::Text,
4647                    text: body_node2.to_string(),
4648                },
4649            ),
4650            (
4651                Value::SimpleString("node3:6379".to_string()),
4652                Value::VerbatimString {
4653                    format: ferriskey::value::VerbatimFormat::Text,
4654                    text: body_node3.to_string(),
4655                },
4656            ),
4657        ]);
4658
4659        let bodies = extract_info_bodies(&map).unwrap();
4660        let min = bodies
4661            .iter()
4662            .map(|b| parse_valkey_version(b).unwrap())
4663            .min()
4664            .unwrap();
4665
4666        assert_eq!(min, (7, 2), "min across cluster is the lowest node (7.2)");
4667        assert!(
4668            min >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
4669            "all-above-floor cluster must pass the gate"
4670        );
4671    }
4672
4673    #[test]
4674    fn valkey_version_too_low_is_not_retryable() {
4675        let err = ServerError::ValkeyVersionTooLow {
4676            detected: "7.0".into(),
4677            required: "7.2".into(),
4678        };
4679        assert!(!err.is_retryable());
4680        assert_eq!(err.backend_kind(), None);
4681    }
4682
4683    #[test]
4684    fn valkey_version_too_low_error_message_includes_both_versions() {
4685        let err = ServerError::ValkeyVersionTooLow {
4686            detected: "7.0".into(),
4687            required: "7.2".into(),
4688        };
4689        let msg = err.to_string();
4690        assert!(msg.contains("7.0"), "detected version in message: {msg}");
4691        assert!(msg.contains("7.2"), "required version in message: {msg}");
4692        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
4693    }
4694}