ff_server/server.rs

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use ferriskey::{Client, ClientBuilder, Value};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinSet;
use ff_core::contracts::{
    AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
    CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
    CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
    CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
    ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
    DeliverSignalArgs, DeliverSignalResult, ExecutionInfo,
    ListExecutionsPage, PendingWaitpointInfo, ReplayExecutionResult,
    ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
    RevokeLeaseResult,
    RotateWaitpointHmacSecretArgs,
    StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
use ff_core::keys::{
    self, usage_dedup_key, BudgetKeyContext, ExecKeyContext, FlowIndexKeys, FlowKeyContext,
    IndexKeys, QuotaKeyContext,
};
use ff_core::partition::{
    budget_partition, execution_partition, flow_partition, quota_partition, Partition,
    PartitionConfig, PartitionFamily,
};
use ff_core::state::{PublicState, StateVector};
use ff_core::types::*;
use ff_engine::Engine;
use ff_script::retry::is_retryable_kind;

use crate::config::ServerConfig;

/// Upper bound on `member_execution_ids` returned in the
/// [`CancelFlowResult::Cancelled`] response when the flow was already in a
/// terminal state (idempotent retry). The first (non-idempotent) cancel call
/// returns the full list; retries only need a sample.
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;

/// Re-export of the budget dimension cap.
///
/// Defined as the single source of truth in `ff_script::functions::budget` so
/// the typed FCALL wrappers and the REST boundary cannot silently drift
/// (PR #106 review). The limit exists to cap FCALL ARGV allocation: both
/// `create_budget` and `report_usage` build argv whose length is linear in
/// `dimensions.len()`, so an untrusted caller could otherwise request an
/// unbounded `Vec` allocation (CodeQL `rust/uncontrolled-allocation-size`,
/// issue #104).
pub(crate) use ff_script::functions::budget::MAX_BUDGET_DIMENSIONS;

/// Validate `create_budget` dimension inputs before building the FCALL argv.
///
/// Rejects:
///   * more than [`MAX_BUDGET_DIMENSIONS`] dimensions (prevents unbounded
///     `Vec::with_capacity` allocation on attacker-controlled length);
///   * parallel-array length mismatches between `dimensions`, `hard_limits`,
///     and `soft_limits` — these are positional inputs the Lua side indexes
///     by `i = 1..dim_count`, so a mismatch silently corrupts limits rather
///     than raising.
fn validate_create_budget_dimensions(
    dimensions: &[String],
    hard_limits: &[u64],
    soft_limits: &[u64],
) -> Result<(), ServerError> {
    let dim_count = dimensions.len();
    if dim_count > MAX_BUDGET_DIMENSIONS {
        return Err(ServerError::InvalidInput(format!(
            "too_many_dimensions: limit={}, got={}",
            MAX_BUDGET_DIMENSIONS, dim_count
        )));
    }
    if hard_limits.len() != dim_count {
        return Err(ServerError::InvalidInput(format!(
            "dimension_limit_array_mismatch: dimensions={} hard_limits={}",
            dim_count,
            hard_limits.len()
        )));
    }
    if soft_limits.len() != dim_count {
        return Err(ServerError::InvalidInput(format!(
            "dimension_limit_array_mismatch: dimensions={} soft_limits={}",
            dim_count,
            soft_limits.len()
        )));
    }
    Ok(())
}

/// Validate `report_usage` dimension inputs before building the FCALL argv.
///
/// Same class of defense as [`validate_create_budget_dimensions`]: caps
/// argv length and enforces the `dimensions`/`deltas` parallel-array
/// invariant the Lua side relies on.
fn validate_report_usage_dimensions(
    dimensions: &[String],
    deltas: &[u64],
) -> Result<(), ServerError> {
    let dim_count = dimensions.len();
    if dim_count > MAX_BUDGET_DIMENSIONS {
        return Err(ServerError::InvalidInput(format!(
            "too_many_dimensions: limit={}, got={}",
            MAX_BUDGET_DIMENSIONS, dim_count
        )));
    }
    if deltas.len() != dim_count {
        return Err(ServerError::InvalidInput(format!(
            "dimension_delta_array_mismatch: dimensions={} deltas={}",
            dim_count,
            deltas.len()
        )));
    }
    Ok(())
}

/// FlowFabric server — connects everything together.
///
/// Manages the Valkey connection, Lua library loading, background scanners,
/// and provides a minimal API for Phase 1.
pub struct Server {
    client: Client,
    /// Dedicated Valkey connection used EXCLUSIVELY for stream-op calls:
    /// `xread_block` tails AND `ff_read_attempt_stream` range reads.
    /// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
    /// processes commands FIFO on it.
    ///
    /// Two head-of-line risks motivate the split from the main client:
    ///
    /// * **Blocking**: `XREAD BLOCK 30_000` holds the read side until a
    ///   new entry arrives or `block_ms` elapses.
    /// * **Large replies**: `XRANGE … COUNT 10_000` with ~64 KB per
    ///   frame returns a multi-MB reply serialized on one connection.
    ///
    /// Sharing either load with the main client would starve every other
    /// FCALL (create_execution, claim, rotate_waitpoint_secret,
    /// budget/quota, admin endpoints) AND every engine scanner.
    ///
    /// Kept separate from `client` and from the `Engine` scanner client so
    /// tail latency cannot couple to foreground API latency or background
    /// scanner cadence. See RFC-006 Impl Notes for the cascading-failure
    /// rationale.
    tail_client: Client,
    /// Bounds concurrent stream-op calls server-wide — read AND tail
    /// combined. Each caller acquires one permit for the duration of its
    /// Valkey round-trip(s); contention surfaces as HTTP 429 at the REST
    /// boundary, not as a silent queue on the stream connection. Default
    /// size is `FF_MAX_CONCURRENT_STREAM_OPS` (64; legacy env
    /// `FF_MAX_CONCURRENT_TAIL` accepted for one release).
    ///
    /// Read and tail share the same pool deliberately: they share the
    /// `tail_client`, so fairness accounting must be unified or a flood
    /// of one can starve the other. The semaphore is also `close()`d on
    /// shutdown so no new stream ops can start while existing ones drain
    /// (see `Server::shutdown`).
    stream_semaphore: Arc<tokio::sync::Semaphore>,
    /// Serializes `XREAD BLOCK` calls against `tail_client`.
    ///
    /// `ferriskey::Client` is a pipelined multiplexed connection — Valkey
    /// processes commands FIFO on one socket. `XREAD BLOCK` holds the
    /// connection's read side for the full `block_ms`, so two parallel
    /// BLOCKs sent down the same mux serialize: the second waits for the
    /// first to return before its own BLOCK even begins at the server.
    /// Meanwhile ferriskey's per-call `request_timeout` (auto-extended to
    /// `block_ms + 500ms`) starts at future-poll on the CLIENT side, so
    /// the second call's timeout fires before its turn at the server —
    /// spurious `timed_out` errors under concurrent tail load.
    ///
    /// Explicit serialization around `xread_block` removes the
    /// silent-failure mode: concurrent tails queue on this Mutex (inside
    /// an already-acquired semaphore permit), then dispatch one at a
    /// time with their full `block_ms` budget intact. The semaphore
    /// ceiling (`max_concurrent_stream_ops`) effectively becomes queue
    /// depth; throughput on the tail client is 1 BLOCK at a time.
    ///
    /// V2 upgrade: a pool of N dedicated `ferriskey::Client` connections
    /// replacing the single `tail_client` + this Mutex. Deferred; the
    /// Mutex here is correct v1 behavior.
    ///
    /// XRANGE reads (`read_attempt_stream`) are NOT gated by this Mutex —
    /// XRANGE is non-blocking at the server, so pipelined XRANGEs on one
    /// mux complete in microseconds each and don't trigger the same
    /// client-side timeout race. Keeping reads unserialized preserves
    /// read throughput.
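    ///
    /// A minimal sketch of the intended acquisition order at a call site
    /// (hypothetical caller code; `xread_block`'s exact arguments and the
    /// error mapping are illustrative, not an API of this field):
    ///
    /// ```ignore
    /// // 1. Bound queue depth first: reject with 429 when the pool is
    /// //    exhausted rather than queueing silently.
    /// let _permit = server.stream_semaphore.try_acquire().map_err(|_| {
    ///     ServerError::ConcurrencyLimitExceeded(
    ///         "stream_ops",
    ///         server.config.max_concurrent_stream_ops,
    ///     )
    /// })?;
    /// // 2. Then serialize the BLOCK itself so each tail gets its full
    /// //    block_ms budget at the Valkey server.
    /// let _guard = server.xread_block_lock.lock().await;
    /// let entries = xread_block(&server.tail_client, /* stream, block_ms */).await?;
    /// ```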
    xread_block_lock: Arc<tokio::sync::Mutex<()>>,
    /// Server-wide Semaphore(1) gating admin rotate calls. Legitimate
    /// operators rotate ~monthly and can afford to serialize; concurrent
    /// rotate requests are an attack or misbehaving script. Holding the
    /// permit also guards against interleaved partial rotations on the
    /// Server side of the per-partition locks, surfacing contention as
    /// HTTP 429 instead of silently queueing and blowing past the 120s
    /// HTTP timeout. See `rotate_waitpoint_secret` handler.
    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
    engine: Engine,
    config: ServerConfig,
    /// Long-lived scheduler instance. Held on the server (not rebuilt per
    /// claim call) so its rotation cursor can advance across calls — a
    /// fresh-per-call scheduler would reset the cursor on every tick,
    /// defeating the fairness property (RFC-009 §scan rotation).
    scheduler: Arc<ff_scheduler::Scheduler>,
    /// Background tasks spawned by async handlers (e.g. cancel_flow member
    /// dispatch). Drained on shutdown with a bounded timeout.
    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
    /// PR-94: observability registry. Always present; the no-op shim
    /// takes zero runtime cost when the `observability` feature is
    /// off, and the real OTEL-backed registry is passed in via
    /// [`Server::start_with_metrics`] when on. Same `Arc` is shared
    /// with [`Engine::start_with_metrics`] and
    /// [`ff_scheduler::Scheduler::with_metrics`] so a single scrape
    /// sees everything the process produces.
    metrics: Arc<ff_observability::Metrics>,
}

/// Server error type.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Backend transport error. Previously wrapped `ferriskey::Error`
    /// directly (#88); now carries a backend-agnostic
    /// [`ff_core::BackendError`] so consumers match on
    /// [`ff_core::BackendErrorKind`] instead of ferriskey's native
    /// taxonomy. The ferriskey → [`ff_core::BackendError`] mapping
    /// lives in `ff_backend_valkey::backend_error_from_ferriskey`.
    #[error("backend: {0}")]
    Backend(#[from] ff_core::BackendError),
    /// Backend error with additional context. Previously
    /// `ValkeyContext { source: ferriskey::Error }` (#88).
    #[error("backend ({context}): {source}")]
    BackendContext {
        #[source]
        source: ff_core::BackendError,
        context: String,
    },
    #[error("config: {0}")]
    Config(#[from] crate::config::ConfigError),
    #[error("library load: {0}")]
    LibraryLoad(#[from] ff_script::loader::LoadError),
    #[error("partition mismatch: {0}")]
    PartitionMismatch(String),
    #[error("not found: {0}")]
    NotFound(String),
    #[error("invalid input: {0}")]
    InvalidInput(String),
    #[error("operation failed: {0}")]
    OperationFailed(String),
    #[error("script: {0}")]
    Script(String),
    /// Server-wide concurrency limit reached on a labelled pool. Surfaces
    /// as HTTP 429 at the REST boundary so load balancers and clients can
    /// retry with backoff. The `source` label ("stream_ops", "admin_rotate",
    /// etc.) identifies WHICH pool is exhausted so operators aren't
    /// misled by a single "tail unavailable" string when the real fault
    /// is rotation contention.
    ///
    /// Fields: (source_label, max_permits).
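    ///
    /// Sketch of the REST-boundary mapping (hypothetical handler code;
    /// the real mapping lives in `api.rs` and `map_other_errors` is an
    /// illustrative stand-in):
    ///
    /// ```ignore
    /// match err {
    ///     ServerError::ConcurrencyLimitExceeded(pool, max) => (
    ///         StatusCode::TOO_MANY_REQUESTS,
    ///         format!("{pool} pool exhausted (max {max}), retry with backoff"),
    ///     ),
    ///     other => map_other_errors(other),
    /// }
    /// ```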
255    #[error("too many concurrent {0} calls (max: {1})")]
256    ConcurrencyLimitExceeded(&'static str, u32),
257    /// Detected Valkey version is below the RFC-011 §13 minimum. The engine
258    /// depends on Valkey Functions (stabilized in 7.2), RESP3 (7.2+), and
259    /// hash-tag routing; older versions do not implement the required
260    /// primitives. Operator action: upgrade Valkey.
261    #[error(
262        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
263    )]
264    ValkeyVersionTooLow {
265        detected: String,
266        required: String,
267    },
268}
269
270/// Lift a native `ferriskey::Error` into [`ServerError::Backend`] via
271/// [`ff_backend_valkey::backend_error_from_ferriskey`] (#88). Keeps
272/// `?`-propagation ergonomic at ferriskey call sites while the
273/// public variant stays backend-agnostic.
274impl From<ferriskey::Error> for ServerError {
275    fn from(err: ferriskey::Error) -> Self {
276        Self::Backend(ff_backend_valkey::backend_error_from_ferriskey(&err))
277    }
278}
279
280/// Build a [`ServerError::BackendContext`] from a native
281/// `ferriskey::Error` and a call-site label (#88).
282pub(crate) fn backend_context(
283    err: ferriskey::Error,
284    context: impl Into<String>,
285) -> ServerError {
286    ServerError::BackendContext {
287        source: ff_backend_valkey::backend_error_from_ferriskey(&err),
288        context: context.into(),
289    }
290}
291
292impl ServerError {
293    /// Returns the classified [`ff_core::BackendErrorKind`] if this
294    /// error carries a backend transport fault. Covers direct
295    /// Backend variants and library-load failures.
296    ///
297    /// Renamed from `valkey_kind` in #88 — the previous return type
298    /// `Option<ferriskey::ErrorKind>` leaked ferriskey into every
299    /// consumer doing retry/error classification.
300    pub fn backend_kind(&self) -> Option<ff_core::BackendErrorKind> {
301        match self {
302            Self::Backend(be) | Self::BackendContext { source: be, .. } => Some(be.kind()),
303            Self::LibraryLoad(e) => e
304                .valkey_kind()
305                .map(ff_backend_valkey::classify_ferriskey_kind),
306            _ => None,
307        }
308    }
309
310    /// Whether this error is safely retryable by a caller. For
311    /// backend transport variants, delegates to
312    /// [`ff_core::BackendErrorKind::is_retryable`]. Business-logic
313    /// rejections (NotFound, InvalidInput, OperationFailed, Script,
314    /// Config, PartitionMismatch) return false — those won't change
315    /// on retry.
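    ///
    /// A minimal sketch of a caller-side retry loop built on this
    /// predicate (illustrative only; the backoff schedule is the
    /// caller's choice, not prescribed by this crate):
    ///
    /// ```ignore
    /// let mut delay = std::time::Duration::from_millis(50);
    /// let result = loop {
    ///     match server.create_execution(&args).await {
    ///         Ok(res) => break Ok(res),
    ///         Err(e) if e.is_retryable() => {
    ///             tokio::time::sleep(delay).await;
    ///             delay = delay.saturating_mul(2); // exponential backoff
    ///         }
    ///         // Business-logic rejection: retrying won't change it.
    ///         Err(e) => break Err(e),
    ///     }
    /// };
    /// ```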
    pub fn is_retryable(&self) -> bool {
        match self {
            Self::Backend(be) | Self::BackendContext { source: be, .. } => {
                be.kind().is_retryable()
            }
            Self::LibraryLoad(load_err) => load_err
                .valkey_kind()
                .map(is_retryable_kind)
                .unwrap_or(false),
            Self::Config(_)
            | Self::PartitionMismatch(_)
            | Self::NotFound(_)
            | Self::InvalidInput(_)
            | Self::OperationFailed(_)
            | Self::Script(_) => false,
            // Back off and retry — the bound is a server-side permit pool,
            // so the retry will succeed once a permit frees up. Applies
            // equally to stream ops, admin rotate, etc.
            Self::ConcurrencyLimitExceeded(_, _) => true,
            // Operator must upgrade Valkey; a retry at the caller won't help.
            Self::ValkeyVersionTooLow { .. } => false,
        }
    }
}

impl Server {
    /// Start the FlowFabric server.
    ///
    /// Boot sequence:
    /// 1. Connect to Valkey
    /// 2. Validate or create partition config key
    /// 3. Load the FlowFabric Lua library
    /// 4. Start engine (14 background scanners)
    pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
        Self::start_with_metrics(config, Arc::new(ff_observability::Metrics::new())).await
    }

    /// PR-94: boot the server with a shared observability registry.
    ///
    /// Scanner cycle + scheduler metrics record into this registry;
    /// `main.rs` threads the same handle into the router so `/metrics`
    /// exposes what the engine produces. The no-arg [`Server::start`]
    /// forwards here with a fresh `Metrics::new()` — under the default
    /// build that's the shim, under `observability` it's a real
    /// registry not shared with any HTTP route (useful for tests
    /// exercising the engine in isolation).
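    ///
    /// Sketch of the intended wiring (illustrative; `build_router` is a
    /// stand-in for the router construction in `main.rs`, not an API of
    /// this crate):
    ///
    /// ```ignore
    /// let metrics = Arc::new(ff_observability::Metrics::new());
    /// let server = Server::start_with_metrics(config, metrics.clone()).await?;
    /// // The same handle backs the /metrics route, so one scrape sees
    /// // server, engine, and scheduler series together.
    /// let router = build_router(server, metrics);
    /// ```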
    pub async fn start_with_metrics(
        config: ServerConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Self, ServerError> {
        // Step 1: Connect to Valkey via ClientBuilder
        tracing::info!(
            host = %config.host, port = config.port,
            tls = config.tls, cluster = config.cluster,
            "connecting to Valkey"
        );
        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder
            .build()
            .await
            .map_err(|e| crate::server::backend_context(e, "connect"))?;

        // Verify connectivity
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "PING"))?;
        if pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "unexpected PING response: {pong}"
            )));
        }
        tracing::info!("Valkey connection established");

        // Step 1b: Verify Valkey version meets the RFC-011 §13 minimum (8.0).
        // Tolerates a rolling upgrade via a 60s exponential-backoff budget
        // per RFC-011 §9.17 — transient INFO errors during a node restart
        // don't trip the check until the whole budget is exhausted.
        verify_valkey_version(&client).await?;

        // Step 2: Validate or create partition config
        validate_or_create_partition_config(&client, &config.partition_config).await?;

        // Step 2b: Install waitpoint HMAC secret into every execution partition
        // (RFC-004 §Waitpoint Security). Fail-fast: if any partition fails,
        // the server refuses to start — a partial install would silently
        // reject signal deliveries on half the partitions.
        initialize_waitpoint_hmac_secret(
            &client,
            &config.partition_config,
            &config.waitpoint_hmac_secret,
        )
        .await?;

        // Step 3: Load Lua library (skippable for tests where fixture already loaded)
        if !config.skip_library_load {
            tracing::info!("loading flowfabric Lua library");
            ff_script::loader::ensure_library(&client)
                .await
                .map_err(ServerError::LibraryLoad)?;
        } else {
            tracing::info!("skipping library load (skip_library_load=true)");
        }

        // Step 3b: Seed the global lanes registry (`ff:idx:lanes`) with
        // every lane from ServerConfig.lanes (issue #203).
        //
        // `EngineBackend::list_lanes` reads SMEMBERS from this global SET,
        // so without a boot-time seed a freshly-provisioned deployment
        // returns an empty lane list until the first execution is created
        // on each lane. The Lua `ff_create_execution` path also SADDs on
        // first-sight, which covers dynamic / post-boot lanes; this block
        // covers the pre-declared set. SADD is idempotent.
        //
        // NOTE: `ff:idx:lanes` is intentionally NOT hash-tagged per
        // partition — it is the single cross-partition global SET in the
        // system (see `ff_core::keys::lanes_index_key`).
        if !config.lanes.is_empty() {
            let lane_strs: Vec<&str> = config.lanes.iter().map(|l| l.as_str()).collect();
            let _: i64 = client
                .cmd("SADD")
                .arg(ff_core::keys::lanes_index_key().as_str())
                .arg(lane_strs.as_slice())
                .execute()
                .await
                .map_err(|e| crate::server::backend_context(e, "SADD ff:idx:lanes (seed)"))?;
            tracing::info!(
                lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
                "seeded lanes index (ff:idx:lanes)"
            );
        }

        // Step 4: Start engine with scanners
        // Build a fresh EngineConfig rather than cloning (EngineConfig doesn't derive Clone).
        let engine_cfg = ff_engine::EngineConfig {
            partition_config: config.partition_config,
            lanes: config.lanes.clone(),
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
            index_reconciler_interval: config.engine_config.index_reconciler_interval,
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
            budget_reset_interval: config.engine_config.budget_reset_interval,
            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
            unblock_interval: config.engine_config.unblock_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            flow_projector_interval: config.engine_config.flow_projector_interval,
            execution_deadline_interval: config.engine_config.execution_deadline_interval,
            cancel_reconciler_interval: config.engine_config.cancel_reconciler_interval,
            edge_cancel_dispatcher_interval: config.engine_config.edge_cancel_dispatcher_interval,
            edge_cancel_reconciler_interval: config.engine_config.edge_cancel_reconciler_interval,
            scanner_filter: config.engine_config.scanner_filter.clone(),
        };
        // Engine scanners keep running on the MAIN `client` mux — NOT on
        // `tail_client`. Scanner cadence is foreground-latency-coupled by
        // design (an incident that blocks all FCALLs should also visibly
        // block scanners), and keeping scanners off the tail client means a
        // long-poll tail can never starve lease-expiry, retention-trim,
        // etc. Do not change this without revisiting RFC-006 Impl Notes.
        // Build the Valkey completion backend (issue #90) and subscribe.
        // This replaces the pre-#90 `CompletionListenerConfig` path:
        // the wire subscription now lives in ff-backend-valkey, the
        // engine just consumes the resulting stream.
        let mut valkey_conn = ff_core::backend::ValkeyConnection::new(
            config.host.clone(),
            config.port,
        );
        valkey_conn.tls = config.tls;
        valkey_conn.cluster = config.cluster;
        let completion_backend = ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
            client.clone(),
            config.partition_config,
            valkey_conn,
        );
        let completion_stream = <ff_backend_valkey::ValkeyBackend as ff_core::completion_backend::CompletionBackend>::subscribe_completions(&completion_backend)
            .await
            .map_err(|e| ServerError::OperationFailed(format!(
                "subscribe_completions: {e}"
            )))?;

        let engine = Engine::start_with_completions(
            engine_cfg,
            client.clone(),
            metrics.clone(),
            completion_stream,
        );

        // Dedicated tail client. Built AFTER library load + HMAC install
        // because those steps use the main client; tail client only runs
        // XREAD/XREAD BLOCK + HMGET on stream_meta, so it never needs the
        // library loaded — but we build it on the same host/port/TLS
        // options so network reachability is identical.
        tracing::info!("opening dedicated tail connection");
        let mut tail_builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            // `request_timeout` is ignored for XREAD BLOCK (ferriskey
            // auto-extends to `block_ms + 500ms` for blocking commands),
            // but is used for the companion HMGET — 5s matches main.
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            tail_builder = tail_builder.tls();
        }
        if config.cluster {
            tail_builder = tail_builder.cluster();
        }
        let tail_client = tail_builder
            .build()
            .await
            .map_err(|e| crate::server::backend_context(e, "connect (tail)"))?;
        let tail_pong: String = tail_client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "PING (tail)"))?;
        if tail_pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "tail client unexpected PING response: {tail_pong}"
            )));
        }

        let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(
            config.max_concurrent_stream_ops as usize,
        ));
        let xread_block_lock = Arc::new(tokio::sync::Mutex::new(()));
        tracing::info!(
            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
            "stream-op client ready (read + tail share the semaphore; \
             tails additionally serialize via xread_block_lock)"
        );

        // Admin surface warning. /v1/admin/* endpoints (waitpoint HMAC
        // rotation, etc.) are only protected by the global Bearer
        // middleware in api.rs — which is only installed when
        // config.api_token is set. Without FF_API_TOKEN, a public
        // deployment exposes secret rotation to anyone that can reach
        // the listen_addr. Warn loudly so operators can't miss it; we
        // don't fail-start because single-tenant dev uses this path.
        if config.api_token.is_none() {
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
                 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
                 FF_API_TOKEN for any deployment reachable from untrusted \
                 networks."
            );
            // Explicit callout for the credential-bearing read endpoints.
            // Auditors grep for these on a per-endpoint basis; lumping
            // into the admin warning alone hides the fact that
            // /pending-waitpoints returns HMAC tokens and /result
            // returns completion payload bytes.
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
                 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
                 and GET /v1/executions/{{id}}/result returns raw completion payload \
                 bytes (may contain PII). Both are UNAUTHENTICATED in this \
                 configuration."
            );
        }

        // Partition counts — post-RFC-011 there are three physical families.
        // Execution keys co-locate with their parent flow's partition (§2 of
        // the RFC), so `num_flow_partitions` governs both exec and flow
        // routing; no separate `num_execution_partitions` count exists.
        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            listen_addr = %config.listen_addr,
            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
            config.partition_config.num_flow_partitions,
            config.partition_config.num_budget_partitions,
            config.partition_config.num_quota_partitions,
        );

        let scheduler = Arc::new(ff_scheduler::Scheduler::with_metrics(
            client.clone(),
            config.partition_config,
            metrics.clone(),
        ));

        Ok(Self {
            client,
            tail_client,
            stream_semaphore,
            xread_block_lock,
            // Single-permit semaphore: only one rotate-waitpoint-secret can
            // be mid-flight server-wide. Attackers or misbehaving scripts
            // firing parallel rotations fail fast with 429 instead of
            // queueing HTTP handlers.
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            engine,
            config,
            scheduler,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
            metrics,
        })
    }

    /// PR-94: access the shared observability registry.
    pub fn metrics(&self) -> &Arc<ff_observability::Metrics> {
        &self.metrics
    }

    /// Get a reference to the ferriskey client.
    pub fn client(&self) -> &Client {
        &self.client
    }

    /// Execute an FCALL with automatic Lua library reload on "function not loaded".
    ///
    /// After a Valkey failover the new primary may not have the Lua library
    /// loaded (replication lag or cold replica). This wrapper detects that
    /// condition, reloads the library via `ff_script::loader::ensure_library`,
    /// and retries the FCALL once.
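    ///
    /// Sketch of the detect-reload-retry shape (illustrative; the actual
    /// logic lives in `fcall_with_reload_on_client`, and `fcall` /
    /// `is_function_not_loaded` here are hypothetical helpers):
    ///
    /// ```ignore
    /// match fcall(client, function, keys, args).await {
    ///     Err(e) if is_function_not_loaded(&e) => {
    ///         ff_script::loader::ensure_library(client).await?;
    ///         fcall(client, function, keys, args).await // retry exactly once
    ///     }
    ///     other => other,
    /// }
    /// ```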
    async fn fcall_with_reload(
        &self,
        function: &str,
        keys: &[&str],
        args: &[&str],
    ) -> Result<Value, ServerError> {
        fcall_with_reload_on_client(&self.client, function, keys, args).await
    }

    /// Get the server config.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }

    /// Get the partition config.
    pub fn partition_config(&self) -> &PartitionConfig {
        &self.config.partition_config
    }

    // ── Minimal Phase 1 API ──

    /// Create a new execution.
    ///
    /// Uses raw FCALL — will migrate to typed ff-script wrappers in Step 1.2.
    pub async fn create_execution(
        &self,
        args: &CreateExecutionArgs,
    ) -> Result<CreateExecutionResult, ServerError> {
        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
        let idx = IndexKeys::new(&partition);

        let lane = &args.lane_id;
        let tag = partition.hash_tag();
        let idem_key = match &args.idempotency_key {
            Some(k) if !k.is_empty() => {
                keys::idempotency_key(&tag, args.namespace.as_str(), k)
            }
            _ => ctx.noop(),
        };

        let delay_str = args
            .delay_until
            .map(|d| d.0.to_string())
            .unwrap_or_default();
        let is_delayed = !delay_str.is_empty();

        // KEYS (8) must match lua/execution.lua ff_create_execution positional order:
        //   [1] exec_core, [2] payload, [3] policy, [4] tags,
        //   [5] scheduling_zset (eligible OR delayed — ONE key),
        //   [6] idem_key, [7] execution_deadline, [8] all_executions
        let scheduling_zset = if is_delayed {
            idx.lane_delayed(lane)
        } else {
            idx.lane_eligible(lane)
        };

        let fcall_keys: Vec<String> = vec![
            ctx.core(),                  // 1
            ctx.payload(),               // 2
            ctx.policy(),                // 3
            ctx.tags(),                  // 4
            scheduling_zset,             // 5
            idem_key,                    // 6
            idx.execution_deadline(),    // 7
            idx.all_executions(),        // 8
        ];

        let tags_json = serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".to_owned());

        // ARGV (13) must match lua/execution.lua ff_create_execution positional order:
        //   [1] execution_id, [2] namespace, [3] lane_id, [4] execution_kind,
        //   [5] priority, [6] creator_identity, [7] policy_json,
        //   [8] input_payload, [9] delay_until, [10] dedup_ttl_ms,
        //   [11] tags_json, [12] execution_deadline_at, [13] partition_id
        let fcall_args: Vec<String> = vec![
            args.execution_id.to_string(),           // 1
            args.namespace.to_string(),              // 2
            args.lane_id.to_string(),                // 3
            args.execution_kind.clone(),             // 4
            args.priority.to_string(),               // 5
            args.creator_identity.clone(),           // 6
            args.policy.as_ref()
                .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_owned()))
                .unwrap_or_else(|| "{}".to_owned()), // 7
            String::from_utf8_lossy(&args.input_payload).into_owned(), // 8
            delay_str,                               // 9
            args.idempotency_key.as_ref()
                .map(|_| "86400000".to_string())
                .unwrap_or_default(),                // 10 dedup_ttl_ms
            tags_json,                               // 11
            args.execution_deadline_at
                .map(|d| d.to_string())
                .unwrap_or_default(),                // 12 execution_deadline_at
            args.partition_id.to_string(),           // 13
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_execution", &key_refs, &arg_refs)
            .await?;

        parse_create_result(&raw, &args.execution_id)
    }

    /// Cancel an execution.
    pub async fn cancel_execution(
        &self,
        args: &CancelExecutionArgs,
    ) -> Result<CancelExecutionResult, ServerError> {
        let raw = self
            .fcall_cancel_execution_with_reload(args)
            .await?;
        parse_cancel_result(&raw, &args.execution_id)
    }

    /// Build KEYS/ARGV for `ff_cancel_execution` and invoke via the server's
    /// reload-capable FCALL. Shared by the inline method and background
    /// cancel_flow dispatch via [`Self::fcall_cancel_execution_with_reload`].
    async fn fcall_cancel_execution_with_reload(
        &self,
        args: &CancelExecutionArgs,
    ) -> Result<Value, ServerError> {
        let (keys, argv) = build_cancel_execution_fcall(
            &self.client,
            &self.config.partition_config,
            args,
        )
        .await?;
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
        self.fcall_with_reload("ff_cancel_execution", &key_refs, &arg_refs).await
    }

    /// Get the public state of an execution.
    ///
    /// Reads `public_state` from the exec_core hash. Returns the parsed
    /// PublicState enum. If the execution is not found, returns an error.
    pub async fn get_execution_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<PublicState, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        let state_str: Option<String> = self
            .client
            .hget(&ctx.core(), "public_state")
            .await
            .map_err(|e| crate::server::backend_context(e, "HGET public_state"))?;

        match state_str {
            Some(s) => {
                let quoted = format!("\"{s}\"");
                serde_json::from_str(&quoted).map_err(|e| {
                    ServerError::Script(format!(
                        "invalid public_state '{s}' for {execution_id}: {e}"
                    ))
                })
            }
            None => Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            ))),
        }
    }

    /// Read the raw result payload written by `ff_complete_execution`.
    ///
    /// The Lua side stores the payload at `ctx.result()` via plain `SET`.
    /// No FCALL — this is a direct GET; returns `Ok(None)` when the
    /// execution is missing, not yet complete, or (in a future
    /// retention-policy world) when the result was trimmed.
    ///
    /// # Contract vs `get_execution_state`
    ///
    /// `get_execution_state` is the authoritative completion signal. If
    /// a caller observes `state == completed` but `get_execution_result`
    /// returns `None`, the result bytes are unavailable — not a caller
    /// bug and not a server bug, just the retention policy trimming the
    /// blob. V1 sets no retention, so callers on v1 can treat
    /// `state == completed` + `Ok(None)` as a server bug.
    ///
    /// # Ordering
    ///
    /// Callers MUST wait for `state == completed` before calling this
    /// method; polls issued before the state transition may hit a
    /// narrow window where the completion Lua has written
    /// `public_state = completed` but the `result` key SET is still
    /// on-wire. The current Lua `ff_complete_execution` writes both in
    /// the same atomic script, so the window is effectively zero for
    /// direct callers — but retries via `ff_replay_execution` open it
    /// briefly.
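    ///
    /// A minimal sketch of the documented ordering (illustrative poll
    /// loop; the poll interval is the caller's choice, and a `Completed`
    /// variant on [`PublicState`] is assumed here):
    ///
    /// ```ignore
    /// // Wait for the authoritative completion signal first...
    /// while server.get_execution_state(&id).await? != PublicState::Completed {
    ///     tokio::time::sleep(Duration::from_millis(100)).await;
    /// }
    /// // ...then fetch the bytes (on v1, None here indicates a server bug).
    /// let payload = server.get_execution_result(&id).await?;
    /// ```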
    pub async fn get_execution_result(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Option<Vec<u8>>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        // Binary-safe read. Decoding into `String` would reject any
        // non-UTF-8 payload (bincode-encoded f32 vectors, compressed
        // artifacts, arbitrary binary stages) with a ferriskey decode
        // error that surfaces as HTTP 500. The endpoint response is
        // already `application/octet-stream`; `Vec<u8>` has a
        // ferriskey-specialized `FromValue` impl that preserves raw
        // bytes without UTF-8 validation (see ferriskey/src/value.rs:
        // `from_byte_vec`). Nil → None via the blanket
        // `Option<T: FromValue>` wrapper.
        let payload: Option<Vec<u8>> = self
            .client
            .cmd("GET")
            .arg(ctx.result())
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "GET exec result"))?;
        Ok(payload)
    }

    /// List the active (`pending` or `active`) waitpoints for an execution.
    ///
    /// Returns one [`PendingWaitpointInfo`] per open waitpoint, including the
    /// HMAC-SHA1 `waitpoint_token` needed to deliver authenticated signals.
    /// `closed` waitpoints are elided — callers looking at history should
    /// read the stream or lease history instead.
    ///
    /// Read plan: SSCAN `ctx.waitpoints()` with `COUNT 100` (bounded
    /// page size, matching the unblock / flow-projector /
    /// budget-reconciler convention) to enumerate waitpoint IDs, then
    /// TWO pipelines:
    ///
    /// * Pass 1 — single round-trip containing, per waitpoint, one
    ///   HMGET over the documented field set + one HGET for
    ///   `total_matchers` on the condition hash.
    /// * Pass 2 (conditional) — single round-trip containing an
    ///   HMGET per waitpoint with `total_matchers > 0` to read the
    ///   `matcher:N:name` fields.
    ///
    /// No FCALL — this is a read-only view built from already-persisted
    /// state, so skipping Lua keeps the Valkey single-writer path
    /// uncontended. HMGET (vs HGETALL) bounds the per-waitpoint read
    /// to the documented field set and defends against a poisoned
    /// waitpoint hash with unbounded extra fields accumulating
    /// response memory.
    ///
    /// # Empty result semantics (TOCTOU)
    ///
    /// An empty `Vec` is returned in three cases:
    ///
    /// 1. The execution exists but has never suspended.
    /// 2. All existing waitpoints are `closed` (already resolved).
    /// 3. A narrow teardown race: `SSCAN` read the waitpoint set after
    ///    a concurrent `ff_close_waitpoint` or execution-cleanup script
    ///    deleted the waitpoint hashes but before it SREM'd the set
    ///    members. Each HMGET returns all-None and we skip.
    ///
    /// Callers that get an unexpected empty list should cross-check
    /// execution state (`get_execution_state`) to distinguish "pipeline
    /// moved past suspended" from "nothing to review yet".
    ///
    /// A waitpoint hash that's present but missing its `waitpoint_token`
    /// field is similarly elided and a server-side WARN is emitted —
    /// this indicates storage corruption (a write that half-populated
    /// the hash) and operators should investigate.
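    ///
    /// Sketch of that cross-check (illustrative caller code; a
    /// `Suspended` variant on [`PublicState`] is assumed):
    ///
    /// ```ignore
    /// let wps = server.list_pending_waitpoints(&id).await?;
    /// if wps.is_empty() {
    ///     match server.get_execution_state(&id).await? {
    ///         // Still suspended: likely the teardown race; retry the list.
    ///         PublicState::Suspended => { /* re-poll */ }
    ///         // Anything else: the pipeline genuinely moved on.
    ///         _ => { /* nothing to review yet */ }
    ///     }
    /// }
    /// ```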
911    pub async fn list_pending_waitpoints(
912        &self,
913        execution_id: &ExecutionId,
914    ) -> Result<Vec<PendingWaitpointInfo>, ServerError> {
915        let partition = execution_partition(execution_id, &self.config.partition_config);
916        let ctx = ExecKeyContext::new(&partition, execution_id);
917
918        let core_exists: bool = self
919            .client
920            .cmd("EXISTS")
921            .arg(ctx.core())
922            .execute()
923            .await
924            .map_err(|e| crate::server::backend_context(e, "EXISTS exec_core (pending waitpoints)"))?;
925        if !core_exists {
926            return Err(ServerError::NotFound(format!(
927                "execution not found: {execution_id}"
928            )));
929        }
930
931        // SSCAN the waitpoints set in bounded pages. SMEMBERS would
932        // load the entire set in one reply — fine for today's
933        // single-waitpoint executions, but the codebase convention
934        // (budget_reconciler, flow_projector, unblock scanner) is SSCAN
935        // COUNT 100 so response size per round-trip is bounded as the
936        // set grows. Match the precedent instead of carving a new
937        // pattern here.
938        const WAITPOINTS_SSCAN_COUNT: usize = 100;
939        let waitpoints_key = ctx.waitpoints();
940        let mut wp_ids_raw: Vec<String> = Vec::new();
941        let mut cursor: String = "0".to_owned();
942        loop {
943            let reply: (String, Vec<String>) = self
944                .client
945                .cmd("SSCAN")
946                .arg(&waitpoints_key)
947                .arg(&cursor)
948                .arg("COUNT")
949                .arg(WAITPOINTS_SSCAN_COUNT.to_string().as_str())
950                .execute()
951                .await
952                .map_err(|e| crate::server::backend_context(e, "SSCAN waitpoints"))?;
953            cursor = reply.0;
954            wp_ids_raw.extend(reply.1);
955            if cursor == "0" {
956                break;
957            }
958        }
959
960        // SSCAN may observe a member more than once across iterations —
961        // that is documented Valkey behavior, not a bug (see
962        // https://valkey.io/commands/sscan/). Dedup in-place before
963        // building the typed id list so the HTTP response and the
964        // downstream pipelined HMGETs don't duplicate work. Sort +
965        // dedup keeps this O(n log n) without a BTreeSet allocation;
966        // typical n is 1 (the single waitpoint on a suspended exec).
967        wp_ids_raw.sort_unstable();
968        wp_ids_raw.dedup();
969
970        if wp_ids_raw.is_empty() {
971            return Ok(Vec::new());
972        }
973
974        // Parse + filter out malformed waitpoint_ids up-front so the
975        // downstream pipeline indexes stay aligned to the Vec of
976        // typed ids.
977        let mut wp_ids: Vec<WaitpointId> = Vec::with_capacity(wp_ids_raw.len());
978        for raw in &wp_ids_raw {
979            match WaitpointId::parse(raw) {
980                Ok(id) => wp_ids.push(id),
981                Err(e) => tracing::warn!(
982                    raw_id = %raw,
983                    error = %e,
984                    execution_id = %execution_id,
985                    "list_pending_waitpoints: skipping unparseable waitpoint_id"
986                ),
987            }
988        }
989        if wp_ids.is_empty() {
990            return Ok(Vec::new());
991        }
992
993        // Bounded HMGET field set — these are the six hash fields that
994        // surface in `PendingWaitpointInfo`. Fixed-size indexing into
995        // the response below tracks this order.
996        const WP_FIELDS: [&str; 6] = [
997            "state",
998            "waitpoint_key",
999            "waitpoint_token",
1000            "created_at",
1001            "activated_at",
1002            "expires_at",
1003        ];
1004
1005        // Pass 1 pipeline: waitpoint HMGET + condition HGET
1006        // total_matchers, one of each per waitpoint, in a SINGLE
1007        // round-trip. Sequential HMGETs were an N-round-trip latency
1008        // floor on flows with fan-out waitpoints.
1009        let mut pass1 = self.client.pipeline();
1010        let mut wp_slots = Vec::with_capacity(wp_ids.len());
1011        let mut cond_slots = Vec::with_capacity(wp_ids.len());
1012        for wp_id in &wp_ids {
1013            let mut cmd = pass1.cmd::<Vec<Option<String>>>("HMGET");
1014            cmd = cmd.arg(ctx.waitpoint(wp_id));
1015            for f in WP_FIELDS {
1016                cmd = cmd.arg(f);
1017            }
1018            wp_slots.push(cmd.finish());
1019
1020            cond_slots.push(
1021                pass1
1022                    .cmd::<Option<String>>("HGET")
1023                    .arg(ctx.waitpoint_condition(wp_id))
1024                    .arg("total_matchers")
1025                    .finish(),
1026            );
1027        }
1028        pass1
1029            .execute()
1030            .await
1031            .map_err(|e| crate::server::backend_context(e, "pipeline HMGET waitpoints + HGET total_matchers"))?;
1032
1033        // Collect pass-1 results + queue pass-2 HMGETs for condition
1034        // matcher names on waitpoints that are actionable (state in
1035        // pending/active, non-empty token, total_matchers > 0). PipeSlot
1036        // values are owning, so iterate all three ordered zip'd iters
1037        // and consume them together.
1038        struct Kept {
1039            wp_id: WaitpointId,
1040            wp_fields: Vec<Option<String>>,
1041            total_matchers: usize,
1042        }
1043        let mut kept: Vec<Kept> = Vec::with_capacity(wp_ids.len());
1044        for ((wp_id, wp_slot), cond_slot) in wp_ids
1045            .iter()
1046            .zip(wp_slots)
1047            .zip(cond_slots)
1048        {
1049            let wp_fields: Vec<Option<String>> =
1050                wp_slot.value().map_err(|e| crate::server::backend_context(e, format!("pipeline slot HMGET waitpoint {wp_id}")))?;
1051
1052            // Waitpoint hash may have been GC'd between the SSCAN and
1053            // this HMGET (rotation / cleanup race). Skip silently.
1054            if wp_fields.iter().all(Option::is_none) {
1055                // Still consume the cond_slot to free its buffer.
1056                let _ = cond_slot.value();
1057                continue;
1058            }
1059            let state_ref = wp_fields
1060                .first()
1061                .and_then(|v| v.as_deref())
1062                .unwrap_or("");
1063            if state_ref != "pending" && state_ref != "active" {
1064                let _ = cond_slot.value();
1065                continue;
1066            }
1067            let token_ref = wp_fields
1068                .get(2)
1069                .and_then(|v| v.as_deref())
1070                .unwrap_or("");
1071            if token_ref.is_empty() {
1072                let _ = cond_slot.value();
1073                tracing::warn!(
1074                    waitpoint_id = %wp_id,
1075                    execution_id = %execution_id,
1076                    waitpoint_hash_key = %ctx.waitpoint(wp_id),
1077                    state = %state_ref,
1078                    "list_pending_waitpoints: waitpoint hash present but waitpoint_token \
1079                     field is empty — likely storage corruption (half-populated write, \
1080                     operator edit, or interrupted script). Skipping this entry in the \
1081                     response. HGETALL the waitpoint_hash_key to inspect."
1082                );
1083                continue;
1084            }
1085
1086            let total_matchers = cond_slot
1087                .value()
1088                .map_err(|e| crate::server::backend_context(e, format!("pipeline slot HGET total_matchers {wp_id}")))?
1089                .and_then(|s| s.parse::<usize>().ok())
1090                .unwrap_or(0);
1091
1092            kept.push(Kept {
1093                wp_id: wp_id.clone(),
1094                wp_fields,
1095                total_matchers,
1096            });
1097        }
1098
1099        if kept.is_empty() {
1100            return Ok(Vec::new());
1101        }
1102
1103        // Pass 2 pipeline: matcher-name HMGETs for every kept waitpoint
1104        // with total_matchers > 0. Single round-trip regardless of
1105        // per-waitpoint matcher count. Waitpoints with total_matchers==0
1106        // (wildcard) skip the HMGET entirely.
1107        let mut pass2 = self.client.pipeline();
1108        let mut matcher_slots: Vec<Option<_>> = Vec::with_capacity(kept.len());
1109        let mut pass2_needed = false;
1110        for k in &kept {
1111            if k.total_matchers == 0 {
1112                matcher_slots.push(None);
1113                continue;
1114            }
1115            pass2_needed = true;
1116            let mut cmd = pass2.cmd::<Vec<Option<String>>>("HMGET");
1117            cmd = cmd.arg(ctx.waitpoint_condition(&k.wp_id));
1118            for i in 0..k.total_matchers {
1119                cmd = cmd.arg(format!("matcher:{i}:name"));
1120            }
1121            matcher_slots.push(Some(cmd.finish()));
1122        }
1123        if pass2_needed {
1124            pass2.execute().await.map_err(|e| crate::server::backend_context(e, "pipeline HMGET wp_condition matchers"))?;
1125        }
1126
1127        let parse_ts = |raw: &str| -> Option<TimestampMs> {
1128            if raw.is_empty() {
1129                None
1130            } else {
1131                raw.parse::<i64>().ok().map(TimestampMs)
1132            }
1133        };
1134
1135        let mut out: Vec<PendingWaitpointInfo> = Vec::with_capacity(kept.len());
1136        for (k, slot) in kept.into_iter().zip(matcher_slots) {
1137            let get = |i: usize| -> &str {
1138                k.wp_fields.get(i).and_then(|v| v.as_deref()).unwrap_or("")
1139            };
1140
1141            // Collect matcher names, eliding the empty-name wildcard
1142            // sentinel (see lua/helpers.lua initialize_condition).
1143            let required_signal_names: Vec<String> = match slot {
1144                None => Vec::new(),
1145                Some(s) => {
1146                    let vals: Vec<Option<String>> =
1147                        s.value().map_err(|e| crate::server::backend_context(e, format!(
1148                                "pipeline slot HMGET wp_condition matchers {}",
1149                                k.wp_id
1150                            )))?;
1151                    vals.into_iter()
1152                        .flatten()
1153                        .filter(|name| !name.is_empty())
1154                        .collect()
1155                }
1156            };
1157
1158            out.push(PendingWaitpointInfo {
1159                waitpoint_id: k.wp_id,
1160                waitpoint_key: get(1).to_owned(),
1161                state: get(0).to_owned(),
1162                waitpoint_token: WaitpointToken(get(2).to_owned()),
1163                required_signal_names,
1164                created_at: parse_ts(get(3)).unwrap_or(TimestampMs(0)),
1165                activated_at: parse_ts(get(4)),
1166                expires_at: parse_ts(get(5)),
1167            });
1168        }
1169
1170        Ok(out)
1171    }
1172
1173    // ── Budget / Quota API ──
1174
1175    /// Create a new budget policy.
    pub async fn create_budget(
        &self,
        args: &CreateBudgetArgs,
    ) -> Result<CreateBudgetResult, ServerError> {
        // Cap ARGV before allocation — see MAX_BUDGET_DIMENSIONS (#104).
        validate_create_budget_dimensions(
            &args.dimensions,
            &args.hard_limits,
            &args.soft_limits,
        )?;
        let partition = budget_partition(&args.budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, &args.budget_id);
        let resets_key = keys::budget_resets_key(bctx.hash_tag());
        let policies_index = keys::budget_policies_index(bctx.hash_tag());

        // KEYS (5): budget_def, budget_limits, budget_usage, budget_resets_zset,
        //           budget_policies_index
        let fcall_keys: Vec<String> = vec![
            bctx.definition(),
            bctx.limits(),
            bctx.usage(),
            resets_key,
            policies_index,
        ];

        // ARGV (variable): budget_id, scope_type, scope_id, enforcement_mode,
        //   on_hard_limit, on_soft_limit, reset_interval_ms, now_ms,
        //   dimension_count, dim_1..dim_N, hard_1..hard_N, soft_1..soft_N
        let dim_count = args.dimensions.len();
        let mut fcall_args: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
        fcall_args.push(args.budget_id.to_string());
        fcall_args.push(args.scope_type.clone());
        fcall_args.push(args.scope_id.clone());
        fcall_args.push(args.enforcement_mode.clone());
        fcall_args.push(args.on_hard_limit.clone());
        fcall_args.push(args.on_soft_limit.clone());
        fcall_args.push(args.reset_interval_ms.to_string());
        fcall_args.push(args.now.to_string());
        fcall_args.push(dim_count.to_string());
        for dim in &args.dimensions {
            fcall_args.push(dim.clone());
        }
        for hard in &args.hard_limits {
            fcall_args.push(hard.to_string());
        }
        for soft in &args.soft_limits {
            fcall_args.push(soft.to_string());
        }

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_budget", &key_refs, &arg_refs)
            .await?;

        parse_budget_create_result(&raw, &args.budget_id)
    }

    /// Create a new quota/rate-limit policy.
    pub async fn create_quota_policy(
        &self,
        args: &CreateQuotaPolicyArgs,
    ) -> Result<CreateQuotaPolicyResult, ServerError> {
        let partition = quota_partition(&args.quota_policy_id, &self.config.partition_config);
        let qctx = QuotaKeyContext::new(&partition, &args.quota_policy_id);

        // KEYS (5): quota_def, quota_window_zset, quota_concurrency_counter,
        //           admitted_set, quota_policies_index
        let fcall_keys: Vec<String> = vec![
            qctx.definition(),
            qctx.window("requests_per_window"),
            qctx.concurrency(),
            qctx.admitted_set(),
            keys::quota_policies_index(qctx.hash_tag()),
        ];

        // ARGV (5): quota_policy_id, window_seconds, max_requests_per_window,
        //           max_concurrent, now_ms
        let fcall_args: Vec<String> = vec![
            args.quota_policy_id.to_string(),
            args.window_seconds.to_string(),
            args.max_requests_per_window.to_string(),
            args.max_concurrent.to_string(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_quota_policy", &key_refs, &arg_refs)
            .await?;

        parse_quota_create_result(&raw, &args.quota_policy_id)
    }

    /// Read-only budget status for operator visibility.
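    ///
    /// A minimal operator-side sketch (illustrative only: `server`
    /// stands for this type). `usage` and `hard_limits` are keyed by
    /// dimension name, exactly as this method reads them back:
    ///
    /// ```ignore
    /// let status = server.get_budget_status(&budget_id).await?;
    /// for (dim, used) in &status.usage {
    ///     // A dimension with no hard limit is treated as unbounded here.
    ///     if let Some(limit) = status.hard_limits.get(dim) {
    ///         if used >= limit {
    ///             eprintln!("budget {} breached on {dim}: {used}/{limit}",
    ///                       status.budget_id);
    ///         }
    ///     }
    /// }
    /// ```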
    pub async fn get_budget_status(
        &self,
        budget_id: &BudgetId,
    ) -> Result<BudgetStatus, ServerError> {
        let partition = budget_partition(budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);

        // Read budget definition
        let def: HashMap<String, String> = self
            .client
            .hgetall(&bctx.definition())
            .await
            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_def"))?;

        if def.is_empty() {
            return Err(ServerError::NotFound(format!(
                "budget not found: {budget_id}"
            )));
        }

        // Read usage
        let usage_raw: HashMap<String, String> = self
            .client
            .hgetall(&bctx.usage())
            .await
            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_usage"))?;
        let usage: HashMap<String, u64> = usage_raw
            .into_iter()
            .filter(|(k, _)| k != "_init")
            .map(|(k, v)| (k, v.parse().unwrap_or(0)))
            .collect();

        // Read limits
        let limits_raw: HashMap<String, String> = self
            .client
            .hgetall(&bctx.limits())
            .await
            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_limits"))?;
        let mut hard_limits = HashMap::new();
        let mut soft_limits = HashMap::new();
        for (k, v) in &limits_raw {
            if let Some(dim) = k.strip_prefix("hard:") {
                hard_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
            } else if let Some(dim) = k.strip_prefix("soft:") {
                soft_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
            }
        }

        let non_empty = |s: Option<&String>| -> Option<String> {
            s.filter(|v| !v.is_empty()).cloned()
        };

        Ok(BudgetStatus {
            budget_id: budget_id.to_string(),
            scope_type: def.get("scope_type").cloned().unwrap_or_default(),
            scope_id: def.get("scope_id").cloned().unwrap_or_default(),
            enforcement_mode: def.get("enforcement_mode").cloned().unwrap_or_default(),
            usage,
            hard_limits,
            soft_limits,
            breach_count: def
                .get("breach_count")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            soft_breach_count: def
                .get("soft_breach_count")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            last_breach_at: non_empty(def.get("last_breach_at")),
            last_breach_dim: non_empty(def.get("last_breach_dim")),
            next_reset_at: non_empty(def.get("next_reset_at")),
            created_at: non_empty(def.get("created_at")),
        })
    }

    /// Report usage against a budget and check limits.
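    ///
    /// A retry-safety sketch using `dedup_key` (illustrative only:
    /// `server` stands for this type, the key format is caller-chosen,
    /// and the dedup semantics themselves live in the Lua body):
    ///
    /// ```ignore
    /// let args = ReportUsageArgs {
    ///     dimensions: vec!["tokens".to_owned()],
    ///     deltas: vec![1500],
    ///     now: TimestampMs::now(),
    ///     // Scope the key to the attempt so real re-runs still count.
    ///     dedup_key: Some(format!("{execution_id}:attempt:{attempt}")),
    /// };
    /// let first = server.report_usage(&budget_id, &args).await?;
    /// // Same key: intended to dedupe rather than double-count.
    /// let retry = server.report_usage(&budget_id, &args).await?;
    /// ```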
    pub async fn report_usage(
        &self,
        budget_id: &BudgetId,
        args: &ReportUsageArgs,
    ) -> Result<ReportUsageResult, ServerError> {
        // Cap ARGV before allocation — see MAX_BUDGET_DIMENSIONS (#104).
        validate_report_usage_dimensions(&args.dimensions, &args.deltas)?;
        let partition = budget_partition(budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);

        // KEYS (3): budget_usage, budget_limits, budget_def
        let fcall_keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];

        // ARGV: dim_count, dim_1..dim_N, delta_1..delta_N, now_ms, [dedup_key]
        let dim_count = args.dimensions.len();
        let mut fcall_args: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
        fcall_args.push(dim_count.to_string());
        for dim in &args.dimensions {
            fcall_args.push(dim.clone());
        }
        for delta in &args.deltas {
            fcall_args.push(delta.to_string());
        }
        fcall_args.push(args.now.to_string());
        // Empty string is the no-dedup sentinel: only a non-empty caller
        // key is namespaced via usage_dedup_key before hitting ARGV.
        let dedup_key_val = args
            .dedup_key
            .as_ref()
            .filter(|k| !k.is_empty())
            .map(|k| usage_dedup_key(bctx.hash_tag(), k))
            .unwrap_or_default();
        fcall_args.push(dedup_key_val);

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_report_usage_and_check", &key_refs, &arg_refs)
            .await?;

        parse_report_usage_result(&raw)
    }

    /// Reset a budget's usage counters and schedule the next reset.
    pub async fn reset_budget(
        &self,
        budget_id: &BudgetId,
    ) -> Result<ResetBudgetResult, ServerError> {
        let partition = budget_partition(budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);
        let resets_key = keys::budget_resets_key(bctx.hash_tag());

        // KEYS (3): budget_def, budget_usage, budget_resets_zset
        let fcall_keys: Vec<String> = vec![bctx.definition(), bctx.usage(), resets_key];

        // ARGV (2): budget_id, now_ms
        let now = TimestampMs::now();
        let fcall_args: Vec<String> = vec![budget_id.to_string(), now.to_string()];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_reset_budget", &key_refs, &arg_refs)
            .await?;

        parse_reset_budget_result(&raw)
    }

    // ── Flow API ──

    /// Create a new flow container.
    pub async fn create_flow(
        &self,
        args: &CreateFlowArgs,
    ) -> Result<CreateFlowResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
        let fidx = FlowIndexKeys::new(&partition);

        // KEYS (3): flow_core, members_set, flow_index
        let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];

        // ARGV (4): flow_id, flow_kind, namespace, now_ms
        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.flow_kind.clone(),
            args.namespace.to_string(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_flow", &key_refs, &arg_refs)
            .await?;

        parse_create_flow_result(&raw, &args.flow_id)
    }

    /// Add an execution to a flow.
    ///
    /// # Atomic single-FCALL commit (RFC-011 §7.3)
    ///
    /// Post-RFC-011 phase-3, exec_core co-locates with flow_core under
    /// hash-tag routing (both hash to `{fp:N}` via the exec id's
    /// embedded partition). A single atomic FCALL writes:
    ///
    ///   - `members_set` SADD (flow membership)
    ///   - `exec_core.flow_id` HSET (back-pointer)
    ///   - `flow_index` SADD (self-heal)
    ///   - `flow_core` HINCRBY node_count / graph_revision +
    ///     HSET last_mutation_at
    ///
    /// All four writes commit atomically or none do: the Lua body
    /// validates before writing, so the `flow_not_found` /
    /// `flow_already_terminal` early-returns fire BEFORE any
    /// `redis.call()` mutation, and a mid-body error after the first
    /// write is not expected because every write targets the same slot.
    ///
    /// The pre-RFC-011 two-phase contract (membership FCALL on `{fp:N}`
    /// plus a separate HSET on `{p:N}`), its orphan window, and the
    /// reconciliation-scanner plan (issue #21, now superseded) are all
    /// retired.
    ///
    /// # Consumer contract
    ///
    /// The caller's `args.execution_id` **must** be co-located with
    /// `args.flow_id`'s partition — i.e. minted via
    /// `ExecutionId::for_flow(&args.flow_id, config)`. Passing a
    /// `solo`-minted id (or any exec id hashing to a different
    /// `{fp:N}` than the flow's) will fail at the Valkey level with
    /// `CROSSSLOT` on a clustered deploy.
    ///
    /// Callers with a flow context in scope always use `for_flow`;
    /// this is the only supported mint path for flow-member execs
    /// post-RFC-011. Test fixtures that pre-date the co-location
    /// contract use `TestCluster::new_execution_id_on_partition` to
    /// pin to a specific hash-tag index for `fcall_create_flow`-style
    /// helpers that hard-code their flow partition.
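    ///
    /// A minimal sketch of the supported mint-then-add sequence
    /// (illustrative only: `server` stands for this type, and the
    /// exact argument passing of `ExecutionId::for_flow` may differ
    /// from what is shown):
    ///
    /// ```ignore
    /// // Mint FROM the flow id so both ids hash to the same {fp:N}.
    /// let execution_id = ExecutionId::for_flow(&flow_id, &config);
    /// let added = server
    ///     .add_execution_to_flow(&AddExecutionToFlowArgs {
    ///         flow_id,
    ///         execution_id,
    ///         now: TimestampMs::now(),
    ///     })
    ///     .await?;
    /// ```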
    pub async fn add_execution_to_flow(
        &self,
        args: &AddExecutionToFlowArgs,
    ) -> Result<AddExecutionToFlowResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
        let fidx = FlowIndexKeys::new(&partition);

        // exec_core co-locates with flow_core under RFC-011 §7.3 —
        // same `{fp:N}` hash-tag, same slot, part of the same atomic
        // FCALL below.
        let exec_partition =
            execution_partition(&args.execution_id, &self.config.partition_config);
        let ectx = ExecKeyContext::new(&exec_partition, &args.execution_id);

        // Pre-flight: exec_partition must match flow_partition under
        // RFC-011 §7.3 co-location contract. If the caller hands us a
        // `solo`-minted exec whose hash-tag ≠ flow_partition, the FCALL
        // would fail with raw `CROSSSLOT` on a clustered deploy — a
        // typed `ServerError::PartitionMismatch` is a clearer signal
        // that the consumer-contract invariant was violated at mint
        // time (caller should have used `ExecutionId::for_flow(...)`).
        // See the Consumer contract section of this method's rustdoc.
        if exec_partition.index != partition.index {
            return Err(ServerError::PartitionMismatch(format!(
                "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
                 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
                 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
                exec_p = exec_partition.index,
                flow_p = partition.index,
            )));
        }

        // KEYS (4): flow_core, members_set, flow_index, exec_core
        let fcall_keys: Vec<String> = vec![
            fctx.core(),
            fctx.members(),
            fidx.flow_index(),
            ectx.core(),
        ];

        // ARGV (3): flow_id, execution_id, now_ms
        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.execution_id.to_string(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_add_execution_to_flow", &key_refs, &arg_refs)
            .await?;

        parse_add_execution_to_flow_result(&raw)
    }

    /// Cancel a flow.
    ///
    /// Flips `public_flow_state` to `cancelled` atomically via
    /// `ff_cancel_flow` on `{fp:N}`. For `cancel_all` policy, member
    /// executions must be cancelled cross-partition; this dispatch runs in
    /// the background and the call returns [`CancelFlowResult::CancellationScheduled`]
    /// immediately. For all other policies (or flows with no members), or
    /// when the flow was already in a terminal state (idempotent retry),
    /// the call returns [`CancelFlowResult::Cancelled`].
    ///
    /// Clients that need synchronous completion can call [`Self::cancel_flow_wait`].
    ///
    /// # Backpressure
    ///
    /// Each call that hits the async dispatch path spawns a new task into
    /// the shared background `JoinSet`. Rapid repeated calls against the
    /// same flow will spawn *multiple* overlapping dispatch tasks. This is
    /// not a correctness issue — each member cancel is idempotent and
    /// terminal flows short-circuit via [`ParsedCancelFlow::AlreadyTerminal`]
    /// — but heavy burst callers should either use `?wait=true` (serialises
    /// the dispatch on the HTTP thread, giving natural backpressure) or
    /// implement client-side deduplication on `flow_id`. The `JoinSet` is
    /// drained with a 15s timeout on [`Self::shutdown`], so very long
    /// dispatch tails may be aborted during graceful shutdown.
    ///
    /// # Orphan-member semantics on shutdown abort
    ///
    /// If shutdown fires `JoinSet::abort_all()` after its drain timeout
    /// while a dispatch loop is mid-iteration, the already-issued
    /// `ff_cancel_execution` FCALLs (atomic Lua) complete cleanly with
    /// `terminal_outcome = cancelled` and the caller-supplied reason. The
    /// members not yet visited are abandoned mid-loop. They remain in
    /// whichever state they were in (active/eligible/suspended) until the
    /// natural lifecycle scanners reach them: active leases expire
    /// (`lease_expiry`) and attempt-timeout them to `expired`, suspended
    /// members time out to `skipped`, eligible ones sit until retention
    /// trim. So no orphan state — but the terminal_outcome for the
    /// abandoned members will be `expired`/`skipped` rather than
    /// `cancelled`, and the operator-supplied `reason` is lost for them.
    /// Audit tooling that requires reason fidelity across shutdowns should
    /// use `?wait=true`.
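    ///
    /// A minimal usage sketch of the two call shapes (illustrative only:
    /// `server` stands for this type, and the `CancelFlowArgs` field set
    /// mirrors the ARGV built below):
    ///
    /// ```ignore
    /// let args = CancelFlowArgs {
    ///     flow_id,
    ///     reason: "tenant offboarded".to_owned(),
    ///     cancellation_policy: "cancel_all".to_owned(),
    ///     now: TimestampMs::now(),
    /// };
    /// // Fire-and-forget: returns CancellationScheduled for a cancel_all
    /// // flow with members; member dispatch continues in the background.
    /// let scheduled = server.cancel_flow(&args).await?;
    /// // Synchronous: every member is terminal (or reported in
    /// // PartiallyCancelled) on return, at the cost of inline latency.
    /// let strict = server.cancel_flow_wait(&args).await?;
    /// ```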
    pub async fn cancel_flow(
        &self,
        args: &CancelFlowArgs,
    ) -> Result<CancelFlowResult, ServerError> {
        self.cancel_flow_inner(args, false).await
    }

    /// Cancel a flow and wait for all member cancellations to complete
    /// inline. Slower than [`Self::cancel_flow`] for large flows, but
    /// guarantees every member is in a terminal state on return.
    pub async fn cancel_flow_wait(
        &self,
        args: &CancelFlowArgs,
    ) -> Result<CancelFlowResult, ServerError> {
        self.cancel_flow_inner(args, true).await
    }

    async fn cancel_flow_inner(
        &self,
        args: &CancelFlowArgs,
        wait: bool,
    ) -> Result<CancelFlowResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
        let fidx = FlowIndexKeys::new(&partition);

        // Grace window before the reconciler may pick up this flow.
        // Kept short because the in-process dispatch is fast and bounded;
        // long enough to cover transport round-trips under load without
        // the reconciler fighting the live dispatch.
        const CANCEL_RECONCILER_GRACE_MS: u64 = 30_000;

        // KEYS (5): flow_core, members_set, flow_index, pending_cancels, cancel_backlog
        let fcall_keys: Vec<String> = vec![
            fctx.core(),
            fctx.members(),
            fidx.flow_index(),
            fctx.pending_cancels(),
            fidx.cancel_backlog(),
        ];

        // ARGV (5): flow_id, reason, cancellation_policy, now_ms, grace_ms
        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.reason.clone(),
            args.cancellation_policy.clone(),
            args.now.to_string(),
            CANCEL_RECONCILER_GRACE_MS.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_cancel_flow", &key_refs, &arg_refs)
            .await?;

        let (policy, members) = match parse_cancel_flow_raw(&raw)? {
            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
                (policy, member_execution_ids)
            }
            // Idempotent retry: flow was already cancelled/completed/failed.
            // Return Cancelled with the *stored* policy and member list so
            // observability tooling gets the real historical state rather
            // than echoing the caller's retry intent. One HMGET + SMEMBERS
            // on the idempotent path — both on {fp:N}, same slot.
            ParsedCancelFlow::AlreadyTerminal => {
                let flow_meta: Vec<Option<String>> = self
                    .client
                    .cmd("HMGET")
                    .arg(fctx.core())
                    .arg("cancellation_policy")
                    .arg("cancel_reason")
                    .execute()
                    .await
                    .map_err(|e| crate::server::backend_context(e, "HMGET flow_core cancellation_policy,cancel_reason"))?;
                let stored_policy = flow_meta
                    .first()
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let stored_reason = flow_meta
                    .get(1)
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let all_members: Vec<String> = self
                    .client
                    .cmd("SMEMBERS")
                    .arg(fctx.members())
                    .execute()
                    .await
                    .map_err(|e| crate::server::backend_context(e, "SMEMBERS flow members (already terminal)"))?;
                // Cap the returned list to avoid pathological bandwidth on
                // idempotent retries for flows with 10k+ members. Clients
                // already received the authoritative member list on the
                // first (non-idempotent) call; subsequent retries just need
                // enough to confirm the operation and trigger per-member
                // polling if desired.
                let total_members = all_members.len();
                let stored_members: Vec<String> = all_members
                    .into_iter()
                    .take(ALREADY_TERMINAL_MEMBER_CAP)
                    .collect();
                tracing::debug!(
                    flow_id = %args.flow_id,
                    stored_policy = stored_policy.as_deref().unwrap_or(""),
                    stored_reason = stored_reason.as_deref().unwrap_or(""),
                    total_members,
                    returned_members = stored_members.len(),
                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
                );
                return Ok(CancelFlowResult::Cancelled {
                    // Fall back to caller's policy only if the stored field
                    // is missing (flows cancelled by older Lua that did not
                    // persist cancellation_policy).
                    cancellation_policy: stored_policy
                        .unwrap_or_else(|| args.cancellation_policy.clone()),
                    member_execution_ids: stored_members,
                });
            }
        };
        let needs_dispatch = policy == "cancel_all" && !members.is_empty();

        if !needs_dispatch {
            return Ok(CancelFlowResult::Cancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
            });
        }

        let pending_cancels_key = fctx.pending_cancels();
        let cancel_backlog_key = fidx.cancel_backlog();

        if wait {
            // Synchronous dispatch — cancel every member inline before returning.
            // Collect per-member failures so the caller sees a
            // PartiallyCancelled outcome instead of a false-positive
            // Cancelled when any member cancel faulted (ghost member,
            // transport exhaustion, Lua reject). The cancel-backlog
            // reconciler still retries the unacked members; surfacing
            // the partial state lets operator tooling alert without
            // polling per-member state.
            let mut failed: Vec<String> = Vec::new();
            for eid_str in &members {
                match cancel_member_execution(
                    &self.client,
                    &self.config.partition_config,
                    eid_str,
                    &args.reason,
                    args.now,
                )
                .await
                {
                    Ok(()) => {
                        ack_cancel_member(
                            &self.client,
                            &pending_cancels_key,
                            &cancel_backlog_key,
                            eid_str,
                            &args.flow_id.to_string(),
                        )
                        .await;
                    }
                    Err(e) => {
                        // If the member was already terminal (execution_not_active /
                        // execution_not_found), treat this as ack-worthy success —
                        // the member is effectively "cancelled" from the flow's
                        // perspective and shouldn't be surfaced as a partial
                        // failure. Mirrors cancel_reconciler::cancel_member which
                        // acks on the same codes to avoid backlog poisoning.
                        if is_terminal_ack_error(&e) {
                            ack_cancel_member(
                                &self.client,
                                &pending_cancels_key,
                                &cancel_backlog_key,
                                eid_str,
                                &args.flow_id.to_string(),
                            )
                            .await;
                            continue;
                        }
                        tracing::warn!(
                            execution_id = %eid_str,
                            error = %e,
                            "cancel_flow(wait): individual execution cancel failed \
                             (transport/contract fault; reconciler will retry if transient)"
                        );
                        failed.push(eid_str.clone());
                    }
                }
            }
            if failed.is_empty() {
                return Ok(CancelFlowResult::Cancelled {
                    cancellation_policy: policy,
                    member_execution_ids: members,
                });
            }
            return Ok(CancelFlowResult::PartiallyCancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
                failed_member_execution_ids: failed,
            });
        }

        // Asynchronous dispatch — spawn into the shared JoinSet so Server::shutdown
        // can wait for pending cancellations (bounded by a shutdown timeout).
        let client = self.client.clone();
        let partition_config = self.config.partition_config;
        let reason = args.reason.clone();
        let now = args.now;
        let dispatch_members = members.clone();
        let flow_id = args.flow_id.clone();
        // Every async cancel_flow contends on this lock, but the critical
        // section is tiny: try_join_next drain + spawn. Drain is amortized
        // O(1) — each completed task is reaped exactly once across all
        // callers, and spawn is synchronous. At realistic cancel rates the
        // lock hold time is microseconds and does not bottleneck handlers.
        let mut guard = self.background_tasks.lock().await;

        // Reap completed background dispatches before spawning the next.
        // Without this sweep, JoinSet accumulates Ok(()) results for every
        // async cancel ever issued — a memory leak in long-running servers
        // that would otherwise only drain on Server::shutdown. Surface any
        // panicked/aborted dispatches via tracing so silent failures in
        // cancel_member_execution are visible in logs.
        while let Some(joined) = guard.try_join_next() {
            if let Err(e) = joined {
                tracing::warn!(
                    error = %e,
                    "cancel_flow: background dispatch task panicked or was aborted"
                );
            }
        }

        let pending_key_owned = pending_cancels_key.clone();
        let backlog_key_owned = cancel_backlog_key.clone();
        let flow_id_str = args.flow_id.to_string();

        guard.spawn(async move {
            // Bounded parallel dispatch via futures::stream::buffer_unordered.
            // Sequential cancels at ~2ms/FCALL cost ~2s per 1000 members, so
            // a 10k-member flow alone would blow the 15s shutdown drain
            // window. Bounding at CONCURRENCY keeps Valkey load predictable
            // while still cutting wall-clock dispatch time by ~CONCURRENCY×
            // for large member sets.
            use futures::stream::StreamExt;
            const CONCURRENCY: usize = 16;

            let member_count = dispatch_members.len();
            let flow_id_for_log = flow_id.clone();
            futures::stream::iter(dispatch_members)
                .map(|eid_str| {
                    let client = client.clone();
                    let reason = reason.clone();
                    let flow_id = flow_id.clone();
                    let pending = pending_key_owned.clone();
                    let backlog = backlog_key_owned.clone();
                    let flow_id_str = flow_id_str.clone();
                    async move {
                        match cancel_member_execution(
                            &client,
                            &partition_config,
                            &eid_str,
                            &reason,
                            now,
                        )
                        .await
                        {
                            Ok(()) => {
                                ack_cancel_member(
                                    &client,
                                    &pending,
                                    &backlog,
                                    &eid_str,
                                    &flow_id_str,
                                )
                                .await;
                            }
                            Err(e) => {
                                if is_terminal_ack_error(&e) {
                                    ack_cancel_member(
                                        &client,
                                        &pending,
                                        &backlog,
                                        &eid_str,
                                        &flow_id_str,
                                    )
                                    .await;
                                } else {
                                    tracing::warn!(
                                        flow_id = %flow_id,
                                        execution_id = %eid_str,
                                        error = %e,
                                        "cancel_flow(async): individual execution cancel failed \
                                         (transport/contract fault; reconciler will retry if transient)"
                                    );
                                }
                            }
                        }
                    }
                })
                .buffer_unordered(CONCURRENCY)
                .for_each(|()| async {})
                .await;

            tracing::debug!(
                flow_id = %flow_id_for_log,
                member_count,
                concurrency = CONCURRENCY,
                "cancel_flow: background member dispatch complete"
            );
        });
        drop(guard);

        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
        Ok(CancelFlowResult::CancellationScheduled {
            cancellation_policy: policy,
            member_count,
            member_execution_ids: members,
        })
    }

    /// Stage a dependency edge between two executions in a flow.
    ///
    /// Runs on the flow partition {fp:N}.
    /// KEYS (6), ARGV (8) — matches lua/flow.lua ff_stage_dependency_edge.
    pub async fn stage_dependency_edge(
        &self,
        args: &StageDependencyEdgeArgs,
    ) -> Result<StageDependencyEdgeResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);

        // KEYS (6): flow_core, members_set, edge_hash, out_adj_set, in_adj_set, grant_hash
        let fcall_keys: Vec<String> = vec![
            fctx.core(),
            fctx.members(),
            fctx.edge(&args.edge_id),
            fctx.outgoing(&args.upstream_execution_id),
            fctx.incoming(&args.downstream_execution_id),
            fctx.grant(&args.edge_id.to_string()),
        ];

        // ARGV (8): flow_id, edge_id, upstream_eid, downstream_eid,
        //           dependency_kind, data_passing_ref, expected_graph_revision, now_ms
        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.edge_id.to_string(),
            args.upstream_execution_id.to_string(),
            args.downstream_execution_id.to_string(),
            args.dependency_kind.clone(),
            args.data_passing_ref.clone().unwrap_or_default(),
            args.expected_graph_revision.to_string(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_stage_dependency_edge", &key_refs, &arg_refs)
            .await?;

        parse_stage_dependency_edge_result(&raw)
    }

    /// Apply a staged dependency edge to the child execution.
    ///
    /// Runs on the child execution partition {p:N}.
    /// KEYS (8), ARGV (7) — matches lua/flow.lua ff_apply_dependency_to_child.
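    ///
    /// A sketch of the two-step edge protocol this method completes
    /// (illustrative only: the argument structs mirror the ARGV lists
    /// and `server` stands for this type). The two FCALLs run on
    /// different partitions, so the pair is not atomic end-to-end:
    ///
    /// ```ignore
    /// // Step 1: record the edge in the flow graph on {fp:N}.
    /// let staged = server.stage_dependency_edge(&stage_args).await?;
    /// // Step 2: materialise it on the downstream child's {p:N} so the
    /// // child's unresolved-deps bookkeeping can gate its eligibility.
    /// let applied = server.apply_dependency_to_child(&apply_args).await?;
    /// ```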
    pub async fn apply_dependency_to_child(
        &self,
        args: &ApplyDependencyToChildArgs,
    ) -> Result<ApplyDependencyToChildResult, ServerError> {
        let partition = execution_partition(
            &args.downstream_execution_id,
            &self.config.partition_config,
        );
        let ctx = ExecKeyContext::new(&partition, &args.downstream_execution_id);
        let idx = IndexKeys::new(&partition);
        // NB: fully qualified because the local `flow_partition` binding
        // shadows the imported helper for the rest of this scope.
        let flow_partition = ff_core::partition::flow_partition(
            &args.flow_id,
            &self.config.partition_config,
        );
        let flow_ctx = FlowKeyContext::new(&flow_partition, &args.flow_id);

        // Pre-read lane_id for index keys
        let lane_str: Option<String> = self
            .client
            .hget(&ctx.core(), "lane_id")
            .await
            .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));

        // KEYS (8): exec_core, deps_meta, unresolved_set, dep_hash,
        //           eligible_zset, blocked_deps_zset, deps_all_edges, edgegroup
        let fcall_keys: Vec<String> = vec![
            ctx.core(),
            ctx.deps_meta(),
            ctx.deps_unresolved(),
            ctx.dep_edge(&args.edge_id),
            idx.lane_eligible(&lane),
            idx.lane_blocked_dependencies(&lane),
            ctx.deps_all_edges(),
            flow_ctx.edgegroup(&args.downstream_execution_id),
        ];

        // ARGV (7): flow_id, edge_id, upstream_eid, graph_revision,
        //           dependency_kind, data_passing_ref, now_ms
        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.edge_id.to_string(),
            args.upstream_execution_id.to_string(),
            args.graph_revision.to_string(),
            args.dependency_kind.clone(),
            args.data_passing_ref.clone().unwrap_or_default(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_apply_dependency_to_child", &key_refs, &arg_refs)
            .await?;

        parse_apply_dependency_result(&raw)
    }

    // ── Execution operations API ──

    /// Deliver a signal to a suspended (or pending-waitpoint) execution.
    ///
    /// Pre-reads exec_core for waitpoint/suspension fields needed for KEYS.
    /// KEYS (14), ARGV (18) — matches lua/signal.lua ff_deliver_signal.
    pub async fn deliver_signal(
        &self,
        args: &DeliverSignalArgs,
    ) -> Result<DeliverSignalResult, ServerError> {
        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
        let idx = IndexKeys::new(&partition);

        // Pre-read lane_id for index keys
        let lane_str: Option<String> = self
            .client
            .hget(&ctx.core(), "lane_id")
            .await
            .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));

        let wp_id = &args.waitpoint_id;
        let sig_id = &args.signal_id;
        let idem_key = args
            .idempotency_key
            .as_ref()
            .filter(|k| !k.is_empty())
            .map(|k| ctx.signal_dedup(wp_id, k))
            .unwrap_or_else(|| ctx.noop());

        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
        //            exec_signals_zset, signal_hash, signal_payload,
        //            idem_key, waitpoint_hash, suspension_current,
        //            eligible_zset, suspended_zset, delayed_zset,
        //            suspension_timeout_zset, hmac_secrets
        let fcall_keys: Vec<String> = vec![
            ctx.core(),                       // 1
            ctx.waitpoint_condition(wp_id),    // 2
            ctx.waitpoint_signals(wp_id),      // 3
            ctx.exec_signals(),                // 4
            ctx.signal(sig_id),                // 5
            ctx.signal_payload(sig_id),        // 6
            idem_key,                          // 7
            ctx.waitpoint(wp_id),              // 8
            ctx.suspension_current(),          // 9
            idx.lane_eligible(&lane),          // 10
            idx.lane_suspended(&lane),         // 11
            idx.lane_delayed(&lane),           // 12
            idx.suspension_timeout(),          // 13
            idx.waitpoint_hmac_secrets(),      // 14
        ];

        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
        //            signal_category, source_type, source_identity,
        //            payload, payload_encoding, idempotency_key,
        //            correlation_id, target_scope, created_at,
        //            dedup_ttl_ms, resume_delay_ms, signal_maxlen,
        //            max_signals_per_execution, waitpoint_token
        let fcall_args: Vec<String> = vec![
            args.signal_id.to_string(),                          // 1
            args.execution_id.to_string(),                       // 2
            args.waitpoint_id.to_string(),                       // 3
            args.signal_name.clone(),                            // 4
            args.signal_category.clone(),                        // 5
            args.source_type.clone(),                            // 6
            args.source_identity.clone(),                        // 7
            args.payload.as_ref()
                .map(|p| String::from_utf8_lossy(p).into_owned())
                .unwrap_or_default(),                            // 8
            args.payload_encoding
                .clone()
                .unwrap_or_else(|| "json".to_owned()),           // 9
            args.idempotency_key
                .clone()
                .unwrap_or_default(),                            // 10
            args.correlation_id
                .clone()
                .unwrap_or_default(),                            // 11
            args.target_scope.clone(),                           // 12
            args.created_at
                .map(|ts| ts.to_string())
                .unwrap_or_else(|| args.now.to_string()),        // 13
            args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), // 14
            args.resume_delay_ms.unwrap_or(0).to_string(),       // 15
            args.signal_maxlen.unwrap_or(1000).to_string(),      // 16
            args.max_signals_per_execution
                .unwrap_or(10_000)
                .to_string(),                                    // 17
            // WIRE BOUNDARY — raw token to Lua. Display is redacted
            // for log safety; use .as_str() at the wire crossing.
            args.waitpoint_token.as_str().to_owned(),            // 18
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_deliver_signal", &key_refs, &arg_refs)
            .await?;

        parse_deliver_signal_result(&raw, &args.signal_id)
    }

    /// Change an execution's priority.
    ///
    /// KEYS (2), ARGV (2) — matches lua/scheduling.lua ff_change_priority.
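    ///
    /// Sketch (illustrative only: `server` stands for this type):
    ///
    /// ```ignore
    /// // Re-rank an eligible execution within its lane's eligible zset.
    /// let result = server.change_priority(&execution_id, 100).await?;
    /// ```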
    pub async fn change_priority(
        &self,
        execution_id: &ExecutionId,
        new_priority: i32,
    ) -> Result<ChangePriorityResult, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let idx = IndexKeys::new(&partition);

        // Read lane_id for eligible_zset key
        let lane_str: Option<String> = self
            .client
            .hget(&ctx.core(), "lane_id")
            .await
            .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));

        // KEYS (2): exec_core, eligible_zset
        let fcall_keys: Vec<String> = vec![ctx.core(), idx.lane_eligible(&lane)];

        // ARGV (2): execution_id, new_priority
        let fcall_args: Vec<String> = vec![
            execution_id.to_string(),
            new_priority.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_change_priority", &key_refs, &arg_refs)
            .await?;

        parse_change_priority_result(&raw, execution_id)
    }

    /// Scheduler-routed claim entry point (Batch C item 2 PR-B).
    ///
    /// Delegates to [`ff_scheduler::Scheduler::claim_for_worker`] which
    /// runs budget + quota + capability admission before issuing the
    /// grant. Returns `Ok(None)` when no eligible execution exists on
    /// the lane at this scan cycle. The worker's subsequent
    /// `claim_from_grant(lane, grant)` mints the lease.
    ///
    /// Keeping the claim-grant mint inside the server (rather than the
    /// worker) means capability CSV validation, budget/quota breach
    /// checks, and lane routing run in one place for every tenant
    /// worker — the same invariants the `direct-valkey-claim` path
    /// enforces inline, but gated at a single server choke point.
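    ///
    /// A sketch of the intended worker-side flow (illustrative only: the
    /// capability set, TTL value, and the worker-side `claim_from_grant`
    /// call shape are assumptions layered on the contract above):
    ///
    /// ```ignore
    /// let caps: BTreeSet<String> =
    ///     ["gpu".to_owned(), "py311".to_owned()].into_iter().collect();
    /// // Admission (budget + quota + capability) runs server-side.
    /// if let Some(grant) = server
    ///     .claim_for_worker(&lane, &worker_id, &worker_instance_id, &caps, 30_000)
    ///     .await?
    /// {
    ///     // The grant is a short-lived reservation; the worker's
    ///     // follow-up call mints the actual lease from it.
    ///     let lease = worker.claim_from_grant(&lane, &grant).await?;
    /// }
    /// ```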
    pub async fn claim_for_worker(
        &self,
        lane: &LaneId,
        worker_id: &WorkerId,
        worker_instance_id: &WorkerInstanceId,
        worker_capabilities: &std::collections::BTreeSet<String>,
        grant_ttl_ms: u64,
    ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
        self.scheduler
            .claim_for_worker(
                lane,
                worker_id,
                worker_instance_id,
                worker_capabilities,
                grant_ttl_ms,
            )
            .await
            .map_err(|e| match e {
                ff_scheduler::SchedulerError::Valkey(inner) => ServerError::from(inner),
                ff_scheduler::SchedulerError::ValkeyContext { source, context } => {
                    crate::server::backend_context(source, context)
                }
                ff_scheduler::SchedulerError::Config(msg) => ServerError::InvalidInput(msg),
            })
    }

    /// Revoke an active lease (operator-initiated).
    pub async fn revoke_lease(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<RevokeLeaseResult, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let idx = IndexKeys::new(&partition);

        // Pre-read worker_instance_id for worker_leases key
        let wiid_str: Option<String> = self
            .client
            .hget(&ctx.core(), "current_worker_instance_id")
            .await
            .map_err(|e| crate::server::backend_context(e, "HGET current_worker_instance_id"))?;
        let wiid = match wiid_str {
            Some(ref s) if !s.is_empty() => WorkerInstanceId::new(s),
            _ => {
                return Err(ServerError::NotFound(format!(
                    "no active lease for execution {execution_id} (no current_worker_instance_id)"
                )));
            }
        };

        // KEYS (5): exec_core, lease_current, lease_history, lease_expiry_zset, worker_leases
        let fcall_keys: Vec<String> = vec![
            ctx.core(),
            ctx.lease_current(),
            ctx.lease_history(),
            idx.lease_expiry(),
            idx.worker_leases(&wiid),
        ];

        // ARGV (3): execution_id, expected_lease_id (empty = skip check), revoke_reason
        let fcall_args: Vec<String> = vec![
            execution_id.to_string(),
            String::new(), // no expected_lease_id check
            "operator_revoke".to_owned(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_revoke_lease", &key_refs, &arg_refs)
            .await?;

        parse_revoke_lease_result(&raw)
    }

    /// Get full execution info via HGETALL on exec_core.
    pub async fn get_execution(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionInfo, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        let fields: HashMap<String, String> = self
            .client
            .hgetall(&ctx.core())
            .await
            .map_err(|e| crate::server::backend_context(e, "HGETALL exec_core"))?;

        if fields.is_empty() {
            return Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            )));
        }

        let parse_enum = |field: &str| -> String {
            fields.get(field).cloned().unwrap_or_default()
        };
        // Wrap the bare hash value in quotes so serde sees a JSON string.
        fn deserialize<T: serde::de::DeserializeOwned>(field: &str, raw: &str) -> Result<T, ServerError> {
            let quoted = format!("\"{raw}\"");
            serde_json::from_str(&quoted).map_err(|e| {
                ServerError::Script(format!("invalid {field} '{raw}': {e}"))
            })
        }

        let lp_str = parse_enum("lifecycle_phase");
        let os_str = parse_enum("ownership_state");
        let es_str = parse_enum("eligibility_state");
        let br_str = parse_enum("blocking_reason");
        let to_str = parse_enum("terminal_outcome");
        let as_str = parse_enum("attempt_state");
        let ps_str = parse_enum("public_state");

        let state_vector = StateVector {
            lifecycle_phase: deserialize("lifecycle_phase", &lp_str)?,
            ownership_state: deserialize("ownership_state", &os_str)?,
            eligibility_state: deserialize("eligibility_state", &es_str)?,
            blocking_reason: deserialize("blocking_reason", &br_str)?,
            terminal_outcome: deserialize("terminal_outcome", &to_str)?,
            attempt_state: deserialize("attempt_state", &as_str)?,
            public_state: deserialize("public_state", &ps_str)?,
        };

        // Reader invariant (RFC-011 §7.3): `flow_id` on exec_core is
        // stamped atomically with membership in `add_execution_to_flow`'s
        // single FCALL. Empty iff the exec has no flow affinity (never
        // called `add_execution_to_flow` — solo execs) — NOT "orphaned
        // mid-two-phase" (that failure mode is retired). Filter on empty
        // to surface "no flow affinity" distinctly from "flow X".
        let flow_id_val = fields.get("flow_id").filter(|s| !s.is_empty()).cloned();

        // started_at / completed_at come from the exec_core hash —
        // lua/execution.lua writes them alongside the rest of the state
        // vector. `created_at` is required; the other two are optional
        // (pre-claim / pre-terminal reads) and elided when empty so the
        // wire payload for in-flight executions stays the shape existing
        // callers expect.
        let started_at_opt = fields
            .get("started_at")
            .filter(|s| !s.is_empty())
            .cloned();
        let completed_at_opt = fields
            .get("completed_at")
            .filter(|s| !s.is_empty())
            .cloned();

        Ok(ExecutionInfo {
            execution_id: execution_id.clone(),
            namespace: parse_enum("namespace"),
            lane_id: parse_enum("lane_id"),
            priority: fields
                .get("priority")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            execution_kind: parse_enum("execution_kind"),
            state_vector,
            public_state: deserialize("public_state", &ps_str)?,
            created_at: parse_enum("created_at"),
            started_at: started_at_opt,
            completed_at: completed_at_opt,
            current_attempt_index: fields
                .get("current_attempt_index")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            flow_id: flow_id_val,
            blocking_detail: parse_enum("blocking_detail"),
        })
    }

    /// Partition-scoped forward-only cursor listing of executions.
    ///
    /// Parity-wrapper around the Valkey body of
    /// [`ff_core::engine_backend::EngineBackend::list_executions`].
    /// Issue #182 replaced the previous offset + lane + state-filter
    /// shape with this cursor-based API (per owner adjudication:
    /// cursor-everywhere, HTTP surface unreleased). Reads
    /// `ff:idx:{p:N}:all_executions`, sorts lexicographically on
    /// `ExecutionId`, filters `> cursor`, and trims to `limit` (capped
    /// server-side at 1000).
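    ///
    /// A paging-loop sketch (illustrative only: `server` stands for
    /// this type, and the `ListExecutionsPage` field names are assumed
    /// from its constructor arguments):
    ///
    /// ```ignore
    /// let mut cursor: Option<ExecutionId> = None;
    /// loop {
    ///     let page = server.list_executions_page(7, cursor.take(), 500).await?;
    ///     for id in &page.executions {
    ///         // ... per-execution work ...
    ///     }
    ///     match page.next_cursor {
    ///         Some(next) => cursor = Some(next), // resume after this id
    ///         None => break,                     // listing complete
    ///     }
    /// }
    /// ```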
2351    pub async fn list_executions_page(
2352        &self,
2353        partition_id: u16,
2354        cursor: Option<ExecutionId>,
2355        limit: usize,
2356    ) -> Result<ListExecutionsPage, ServerError> {
2357        if limit == 0 {
2358            return Ok(ListExecutionsPage::new(Vec::new(), None));
2359        }
2360        let partition = ff_core::partition::Partition {
2361            family: ff_core::partition::PartitionFamily::Execution,
2362            index: partition_id,
2363        };
2364        let idx = IndexKeys::new(&partition);
2365        let all_key = idx.all_executions();
2366
2367        let raw_members: Vec<String> = self
2368            .client
2369            .cmd("SMEMBERS")
2370            .arg(&all_key)
2371            .execute()
2372            .await
2373            .map_err(|e| crate::server::backend_context(e, format!("SMEMBERS {all_key}")))?;
2374
2375        if raw_members.is_empty() {
2376            return Ok(ListExecutionsPage::new(Vec::new(), None));
2377        }
2378
2379        let mut parsed: Vec<ExecutionId> = Vec::with_capacity(raw_members.len());
2380        for raw in &raw_members {
2381            match ExecutionId::parse(raw) {
2382                Ok(id) => parsed.push(id),
2383                Err(e) => {
2384                    tracing::warn!(
2385                        raw_id = %raw,
2386                        error = %e,
2387                        set = %all_key,
2388                        "list_executions_page: SMEMBERS member failed to parse as ExecutionId \
2389                         (data corruption?)"
2390                    );
2391                }
2392            }
2393        }
2394        parsed.sort_by(|a, b| a.as_str().cmp(b.as_str()));
2395
2396        let filtered: Vec<ExecutionId> = if let Some(c) = cursor.as_ref() {
2397            let cs = c.as_str();
2398            parsed.into_iter().filter(|e| e.as_str() > cs).collect()
2399        } else {
2400            parsed
2401        };
2402
2403        let effective_limit = limit.min(1000);
2404        let has_more = filtered.len() > effective_limit;
2405        let page: Vec<ExecutionId> = filtered.into_iter().take(effective_limit).collect();
2406        let next_cursor = if has_more { page.last().cloned() } else { None };
2407        Ok(ListExecutionsPage::new(page, next_cursor))
2408    }
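    // Illustrative cursor walk (hypothetical caller; the `executions` /
    // `next_cursor` accessor names on `ListExecutionsPage` are assumptions,
    // since only `ListExecutionsPage::new(items, next_cursor)` is visible
    // in this file):
    //
    //     let mut cursor: Option<ExecutionId> = None;
    //     loop {
    //         let page = server.list_executions_page(0, cursor.take(), 500).await?;
    //         for id in &page.executions { /* handle id */ }
    //         match page.next_cursor {
    //             Some(c) => cursor = Some(c), // resume strictly after this id
    //             None => break,               // short page: no more members
    //         }
    //     }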
2409
2410    /// Replay a terminal execution.
2411    ///
2412    /// Pre-reads exec_core for flow_id and dep edges (variable KEYS).
2413    /// KEYS 4 (7+N for a skipped flow member), ARGV 2 (2+N) — matches lua/flow.lua ff_replay_execution.
2414    pub async fn replay_execution(
2415        &self,
2416        execution_id: &ExecutionId,
2417    ) -> Result<ReplayExecutionResult, ServerError> {
2418        let partition = execution_partition(execution_id, &self.config.partition_config);
2419        let ctx = ExecKeyContext::new(&partition, execution_id);
2420        let idx = IndexKeys::new(&partition);
2421
2422        // Pre-read lane_id, flow_id, terminal_outcome.
2423        //
2424        // Reader invariant (RFC-011 §7.3): `flow_id` on exec_core is
2425        // stamped atomically with membership by `add_execution_to_flow`'s
2426        // single FCALL. Empty iff the exec has no flow affinity
2427        // (solo-path create_execution — never added to a flow). The
2428        // `is_skipped_flow_member` branch below gates on
2429        // `!flow_id_str.is_empty()`, so solo execs correctly fall back
2430        // to the non-flow-member replay path. See
2431        // `add_execution_to_flow` rustdoc for the atomic-commit
2432        // invariant.
2433        let dyn_fields: Vec<Option<String>> = self
2434            .client
2435            .cmd("HMGET")
2436            .arg(ctx.core())
2437            .arg("lane_id")
2438            .arg("flow_id")
2439            .arg("terminal_outcome")
2440            .execute()
2441            .await
2442            .map_err(|e| crate::server::backend_context(e, "HMGET replay pre-read"))?;
2443        let lane = LaneId::new(
2444            dyn_fields
2445                .first()
2446                .and_then(|v| v.as_ref())
2447                .cloned()
2448                .unwrap_or_else(|| "default".to_owned()),
2449        );
2450        let flow_id_str = dyn_fields
2451            .get(1)
2452            .and_then(|v| v.as_ref())
2453            .cloned()
2454            .unwrap_or_default();
2455        let terminal_outcome = dyn_fields
2456            .get(2)
2457            .and_then(|v| v.as_ref())
2458            .cloned()
2459            .unwrap_or_default();
2460
2461        let is_skipped_flow_member = terminal_outcome == "skipped" && !flow_id_str.is_empty();
2462
2463        // Base KEYS (4): exec_core, terminal_zset, eligible_zset, lease_history
2464        let mut fcall_keys: Vec<String> = vec![
2465            ctx.core(),
2466            idx.lane_terminal(&lane),
2467            idx.lane_eligible(&lane),
2468            ctx.lease_history(),
2469        ];
2470
2471        // Base ARGV (2): execution_id, now_ms
2472        let now = TimestampMs::now();
2473        let mut fcall_args: Vec<String> = vec![execution_id.to_string(), now.to_string()];
2474
2475        if is_skipped_flow_member {
2476            // Read ALL inbound edge IDs from the flow partition's adjacency set.
2477            // Cannot use deps:unresolved because impossible edges were SREM'd
2478            // by ff_resolve_dependency. The flow's in:<eid> set has all edges.
2479            let flow_id = FlowId::parse(&flow_id_str)
2480                .map_err(|e| ServerError::Script(format!("bad flow_id: {e}")))?;
2481            let flow_part =
2482                flow_partition(&flow_id, &self.config.partition_config);
2483            let flow_ctx = FlowKeyContext::new(&flow_part, &flow_id);
2484            let edge_ids: Vec<String> = self
2485                .client
2486                .cmd("SMEMBERS")
2487                .arg(flow_ctx.incoming(execution_id))
2488                .execute()
2489                .await
2490                .map_err(|e| crate::server::backend_context(e, "SMEMBERS replay edges"))?;
2491
2492            // Extended KEYS: blocked_deps_zset, deps_meta, deps_unresolved, dep_edge_0..N
2493            fcall_keys.push(idx.lane_blocked_dependencies(&lane)); // 5
2494            fcall_keys.push(ctx.deps_meta()); // 6
2495            fcall_keys.push(ctx.deps_unresolved()); // 7
2496            for eid_str in &edge_ids {
2497                let edge_id = EdgeId::parse(eid_str)
2498                    .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
2499                fcall_keys.push(ctx.dep_edge(&edge_id)); // 8..8+N
2500                fcall_args.push(eid_str.clone()); // 3..3+N
2501            }
2502        }
2503
2504        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2505        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2506
2507        let raw: Value = self
2508            .fcall_with_reload("ff_replay_execution", &key_refs, &arg_refs)
2509            .await?;
2510
2511        parse_replay_result(&raw)
2512    }
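    // Resulting FCALL wire shape (illustrative; key order follows the pushes
    // above, and solo replays stop after the 4 base KEYS / 2 base ARGV):
    //
    //     FCALL ff_replay_execution <7+N>
    //       KEYS: exec_core, lane_terminal, lane_eligible, lease_history,
    //             lane_blocked_deps, deps_meta, deps_unresolved, dep_edge_1..N
    //       ARGV: execution_id, now_ms, edge_id_1..N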
2513
2514    /// Read frames from an attempt's stream (XRANGE wrapper) plus terminal
2515    /// markers (`closed_at`, `closed_reason`) so consumers can stop polling
2516    /// when the producer finalizes.
2517    ///
2518    /// `from_id` and `to_id` accept XRANGE special markers: `"-"` for
2519    /// earliest, `"+"` for latest. `count_limit` MUST be `>= 1` —
2520    /// `0` returns a `ServerError::InvalidInput` (matches the REST boundary
2521    /// and the Lua-side reject).
2522    ///
2523    /// Cluster-safe: the attempt's `{p:N}` partition is derived from the
2524    /// execution id, so all KEYS share the same slot.
2525    pub async fn read_attempt_stream(
2526        &self,
2527        execution_id: &ExecutionId,
2528        attempt_index: AttemptIndex,
2529        from_id: &str,
2530        to_id: &str,
2531        count_limit: u64,
2532    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2533        use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
2534
2535        if count_limit == 0 {
2536            return Err(ServerError::InvalidInput(
2537                "count_limit must be >= 1".to_owned(),
2538            ));
2539        }
2540
2541        // Share the same semaphore as tail. A large XRANGE reply (10_000
2542        // frames × ~64KB) is just as capable of head-of-line-blocking the
2543        // tail_client mux as a long BLOCK — fairness accounting must be
2544        // unified. Non-blocking acquire → 429 on contention.
2545        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2546            Ok(p) => p,
2547            Err(tokio::sync::TryAcquireError::NoPermits) => {
2548                return Err(ServerError::ConcurrencyLimitExceeded(
2549                    "stream_ops",
2550                    self.config.max_concurrent_stream_ops,
2551                ));
2552            }
2553            Err(tokio::sync::TryAcquireError::Closed) => {
2554                return Err(ServerError::OperationFailed(
2555                    "stream semaphore closed (server shutting down)".into(),
2556                ));
2557            }
2558        };
2559
2560        let args = ReadFramesArgs {
2561            execution_id: execution_id.clone(),
2562            attempt_index,
2563            from_id: from_id.to_owned(),
2564            to_id: to_id.to_owned(),
2565            count_limit,
2566        };
2567
2568        let partition = execution_partition(execution_id, &self.config.partition_config);
2569        let ctx = ExecKeyContext::new(&partition, execution_id);
2570        let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };
2571
2572        // Route on the dedicated stream client, same as tail. A 10_000-
2573        // frame XRANGE reply on the main mux would stall every other
2574        // FCALL behind reply serialization.
2575        let result = ff_script::functions::stream::ff_read_attempt_stream(
2576            &self.tail_client, &keys, &args,
2577        )
2578        .await
2579        .map_err(script_error_to_server);
2580
2581        drop(permit);
2582
2583        match result? {
2584            ReadFramesResult::Frames(f) => Ok(f),
2585        }
2586    }
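    // Illustrative one-shot read of a full attempt history (hypothetical
    // caller; "-" / "+" are the XRANGE earliest/latest markers documented
    // above, and count_limit must be >= 1):
    //
    //     let frames = server
    //         .read_attempt_stream(&exec_id, attempt, "-", "+", 10_000)
    //         .await?; // 429s under stream_ops contention; retry with backoff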
2587
2588    /// Tail a live attempt's stream (XREAD BLOCK wrapper). Returns frames
2589    /// plus the terminal signal so a polling consumer can exit when the
2590    /// producer closes the stream.
2591    ///
2592    /// `last_id` is exclusive — XREAD returns entries with id > last_id.
2593    /// Pass `"0-0"` to read from the beginning.
2594    ///
2595    /// `block_ms == 0` → non-blocking peek (returns immediately).
2596    /// `block_ms > 0`  → blocks up to that many ms. Empty `frames` +
2597    /// `closed_at=None` → timeout, no new data, still open.
2598    ///
2599    /// `count_limit` MUST be `>= 1`; `0` returns `InvalidInput`.
2600    ///
2601    /// Implemented as a direct XREAD command (not FCALL) because blocking
2602    /// commands are rejected inside Valkey Functions. The terminal
2603    /// markers come from a companion HMGET on `stream_meta` — see
2604    /// `ff_script::stream_tail` module docs.
2605    pub async fn tail_attempt_stream(
2606        &self,
2607        execution_id: &ExecutionId,
2608        attempt_index: AttemptIndex,
2609        last_id: &str,
2610        block_ms: u64,
2611        count_limit: u64,
2612    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2613        if count_limit == 0 {
2614            return Err(ServerError::InvalidInput(
2615                "count_limit must be >= 1".to_owned(),
2616            ));
2617        }
2618
2619        // Non-blocking permit acquisition on the shared stream_semaphore
2620        // (read + tail split the same pool). If the server is already at
2621        // the `max_concurrent_stream_ops` ceiling, return `ConcurrencyLimitExceeded`
2622        // (→ 429) rather than queueing — a queued tail holds the caller's
2623        // HTTP request open with no upper bound, which is exactly the
2624        // resource-exhaustion pattern this limit exists to prevent.
2625        // Clients retry with backoff on 429.
2626        //
2627        // Worst-case permit hold: if the producer closes the stream via
2628        // HSET on stream_meta (not an XADD), the XREAD BLOCK won't wake
2629        // until `block_ms` elapses — so a permit can be held for up to
2630        // the caller's block_ms even though the terminal signal is
2631        // ready. This is a v1-accepted limitation; RFC-006 §Terminal-
2632        // signal timing under active tail documents it and sketches the
2633        // v2 sentinel-XADD upgrade path.
2634        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2635            Ok(p) => p,
2636            Err(tokio::sync::TryAcquireError::NoPermits) => {
2637                return Err(ServerError::ConcurrencyLimitExceeded(
2638                    "stream_ops",
2639                    self.config.max_concurrent_stream_ops,
2640                ));
2641            }
2642            Err(tokio::sync::TryAcquireError::Closed) => {
2643                return Err(ServerError::OperationFailed(
2644                    "stream semaphore closed (server shutting down)".into(),
2645                ));
2646            }
2647        };
2648
2649        let partition = execution_partition(execution_id, &self.config.partition_config);
2650        let ctx = ExecKeyContext::new(&partition, execution_id);
2651        let stream_key = ctx.stream(attempt_index);
2652        let stream_meta_key = ctx.stream_meta(attempt_index);
2653
2654        // Acquire the XREAD BLOCK serializer AFTER the stream semaphore.
2655        // Nesting order matters: the semaphore is the user-visible
2656        // ceiling (surfaces as 429), the Mutex is an internal fairness
2657        // gate. Holding the permit while waiting on the Mutex means the
2658        // ceiling still bounds queue depth. See the field docstring for
2659        // the full rationale (ferriskey pipeline FIFO + client-side
2660        // per-call timeout race).
2661        let _xread_guard = self.xread_block_lock.lock().await;
2662
2663        let result = ff_script::stream_tail::xread_block(
2664            &self.tail_client,
2665            &stream_key,
2666            &stream_meta_key,
2667            last_id,
2668            block_ms,
2669            count_limit,
2670        )
2671        .await
2672        .map_err(script_error_to_server);
2673
2674        drop(_xread_guard);
2675        drop(permit);
2676        result
2677    }
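    // Illustrative polling consumer (hypothetical caller; the frame-iteration
    // and `closed_at` accessors on `StreamFrames` are assumptions, while the
    // exclusive-last_id and terminal-signal semantics come from the rustdoc
    // above):
    //
    //     let mut last_id = "0-0".to_owned();
    //     loop {
    //         let out = server
    //             .tail_attempt_stream(&exec_id, attempt, &last_id, 5_000, 100)
    //             .await?;
    //         for frame in &out.frames {
    //             last_id = frame.id.clone(); // advance the exclusive cursor
    //         }
    //         if out.closed_at.is_some() {
    //             break; // producer finalized; stop polling
    //         }
    //     }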
2678
2679    /// Graceful shutdown — stops scanners, drains background handler tasks
2680    /// (e.g. cancel_flow member dispatch) with a bounded timeout, then waits
2681    /// for scanners to finish.
2682    ///
2683    /// Shutdown order is chosen so in-flight stream ops (read/tail) drain
2684    /// cleanly without new arrivals piling up:
2685    ///
2686    /// 1. `stream_semaphore.close()` — new read/tail attempts fail fast
2687    ///    with `ServerError::OperationFailed("stream semaphore closed …")`
2688    ///    which the REST layer surfaces as a 500 with `retryable=false`.
2689    ///    The body names the shutdown reason, so ops tooling that treats
2690    ///    shutdowns as 503-class may still choose to wait and retry.
2691    /// 2. Drain handler-spawned background tasks with a 15s ceiling.
2692    /// 3. `engine.shutdown()` stops scanners.
2693    ///
2694    /// Existing in-flight tails finish on their natural `block_ms`
2695    /// boundary (up to ~30s); the `tail_client` is dropped when `Server`
2696    /// is dropped after this function returns. We do NOT wait for tails
2697    /// to drain explicitly — the semaphore-close + natural-timeout
2698    /// combination bounds shutdown to roughly `block_ms + 15s` in the
2699    /// worst case. Callers observing a dropped connection retry against
2700    /// whatever replacement is coming up.
2701    pub async fn shutdown(self) {
2702        tracing::info!("shutting down FlowFabric server");
2703
2704        // Step 1: Close the stream semaphore FIRST so any in-flight
2705        // read/tail calls that are between `try_acquire` and their
2706        // Valkey command still hold a valid permit, but no NEW stream
2707        // op can start. `Semaphore::close()` is idempotent.
2708        self.stream_semaphore.close();
2709        tracing::info!(
2710            "stream semaphore closed; no new read/tail attempts will be accepted"
2711        );
2712
2713        // Step 2: Drain handler-spawned background tasks with the same
2714        // ceiling as Engine::shutdown. If dispatch is still running at
2715        // the deadline, drop the JoinSet to abort remaining tasks.
2716        let drain_timeout = Duration::from_secs(15);
2717        let background = self.background_tasks.clone();
2718        let drain = async move {
2719            let mut guard = background.lock().await;
2720            while guard.join_next().await.is_some() {}
2721        };
2722        match tokio::time::timeout(drain_timeout, drain).await {
2723            Ok(()) => {}
2724            Err(_) => {
2725                tracing::warn!(
2726                    timeout_s = drain_timeout.as_secs(),
2727                    "shutdown: background tasks did not finish in time, aborting"
2728                );
2729                self.background_tasks.lock().await.abort_all();
2730            }
2731        }
2732
2733        self.engine.shutdown().await;
2734        tracing::info!("FlowFabric server shutdown complete");
2735    }
2736}
2737
2738// ── Valkey version check (RFC-011 §13) ──
2739
2740/// Minimum Valkey version the engine requires (see RFC-011 §13). 7.2 is the
2741/// release where Valkey Functions and RESP3 stabilized — the primitives the
2742/// co-location design and typed FCALL wrappers actually depend on.
2743const REQUIRED_VALKEY_MAJOR: u32 = 7;
2744const REQUIRED_VALKEY_MINOR: u32 = 2;
2745
2746/// Upper bound on the rolling-upgrade retry window (RFC-011 §9.17). A Valkey
2747/// node cycling through SIGTERM → restart typically completes in well under
2748/// 60s; this budget is generous without letting a truly-stuck cluster hang
2749/// boot indefinitely.
2750const VERSION_CHECK_RETRY_BUDGET: Duration = Duration::from_secs(60);
2751
2752/// Verify the connected Valkey reports a version ≥ 7.2.
2753///
2754/// Per RFC-011 §9.17, during a rolling upgrade the node we happen to connect
2755/// to may temporarily be pre-upgrade while others are post-upgrade. The check
2756/// tolerates this by retrying the whole verification (including low-version
2757/// responses) with exponential backoff, capped at a 60s budget.
2758///
2759/// **Retries on:**
2760/// - Low-version responses (below `(REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR)`)
2761///   — may resolve as the rolling upgrade progresses onto the connected node.
2762/// - Retryable ferriskey transport errors — connection refused,
2763///   `BusyLoadingError`, `ClusterDown`, etc., classified via
2764///   `ff_script::retry::is_retryable_kind`.
2765/// - Missing/unparsable version field — treated as transient (fresh-boot
2766///   server may not have the INFO fields populated yet). Reads
2767///   `valkey_version` when present (authoritative on Valkey 8.0+), falls
2768///   back to `redis_version` for Valkey 7.x.
2769///
2770/// **Does NOT retry on:**
2771/// - Non-retryable transport errors (auth failures, permission denied,
2772///   invalid client config) — these are operator misconfiguration, not
2773///   transient cluster state; fast-fail preserves a clear signal.
2774///
2775/// On budget exhaustion, returns the last observed error.
2776async fn verify_valkey_version(client: &Client) -> Result<(), ServerError> {
2777    let deadline = tokio::time::Instant::now() + VERSION_CHECK_RETRY_BUDGET;
2778    let mut backoff = Duration::from_millis(200);
2779    loop {
2780        let (should_retry, err_for_budget_exhaust, log_detail): (bool, ServerError, String) =
2781            match query_valkey_version(client).await {
2782                Ok((detected_major, detected_minor))
2783                    if (detected_major, detected_minor)
2784                        >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR) =>
2785                {
2786                    tracing::info!(
2787                        detected_major,
2788                        detected_minor,
2789                        required_major = REQUIRED_VALKEY_MAJOR,
2790                        required_minor = REQUIRED_VALKEY_MINOR,
2791                        "Valkey version accepted"
2792                    );
2793                    return Ok(());
2794                }
2795                Ok((detected_major, detected_minor)) => (
2796                    // Low version — may be a rolling-upgrade stale node.
2797                    // Retry within budget; after exhaustion, the cluster
2798                    // is misconfigured and fast-fail is the correct signal.
2799                    true,
2800                    ServerError::ValkeyVersionTooLow {
2801                        detected: format!("{detected_major}.{detected_minor}"),
2802                        required: format!("{REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"),
2803                    },
2804                    format!(
2805                        "detected={detected_major}.{detected_minor} < required={REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"
2806                    ),
2807                ),
2808                Err(e) => {
2809                    // Only retry if the underlying Valkey error is retryable
2810                    // by kind. Auth / permission / invalid-config should fast-
2811                    // fail so operators see the true root cause immediately,
2812                    // not a 60s hang followed by a generic "transient" error.
2813                    let retryable = e
2814                        .backend_kind()
2815                        .map(|k| k.is_retryable())
2816                        // Non-backend errors (parse, missing field, operation
2817                        // failures) are treated as transient — a fresh-boot
2818                        // Valkey may not have redis_version populated yet.
2819                        .unwrap_or(true);
2820                    let detail = e.to_string();
2821                    (retryable, e, detail)
2822                }
2823            };
2824
2825        if !should_retry {
2826            return Err(err_for_budget_exhaust);
2827        }
2828        if tokio::time::Instant::now() >= deadline {
2829            return Err(err_for_budget_exhaust);
2830        }
2831        tracing::warn!(
2832            backoff_ms = backoff.as_millis() as u64,
2833            detail = %log_detail,
2834            "valkey version check transient failure; retrying"
2835        );
2836        tokio::time::sleep(backoff).await;
2837        backoff = (backoff * 2).min(Duration::from_secs(5));
2838    }
2839}
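// Worked backoff schedule for the loop above (ignoring per-attempt INFO
// latency): retries fire at t ≈ 0.2s, 0.6s, 1.4s, 3.0s, 6.2s, 11.2s, then
// every 5s once the cap binds: roughly 16 attempts before the 60s budget trips.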
2840
2841/// Run `INFO server` and extract the `(major, minor)` components of the
2842/// Valkey version.
2843///
2844/// Returns `Err` on transport errors, missing field, or unparsable version.
2845/// Handles three response shapes:
2846///
2847/// - **Standalone:** single string body; parse directly.
2848/// - **Cluster (RESP3 map):** `INFO` returns a map keyed by node address;
2849///   every node runs the same version in a healthy deployment, so we pick
2850///   one entry and parse it. Divergent versions during a rolling upgrade
2851///   are handled by the outer retry loop.
2852/// - **Empty / unexpected:** surfaces as `OperationFailed` with context.
2853async fn query_valkey_version(client: &Client) -> Result<(u32, u32), ServerError> {
2854    let raw: Value = client
2855        .cmd("INFO")
2856        .arg("server")
2857        .execute()
2858        .await
2859        .map_err(|e| crate::server::backend_context(e, "INFO server"))?;
2860    let bodies = extract_info_bodies(&raw)?;
2861    // Cluster: return the minimum (major, minor) across all nodes so a stale
2862    // pre-upgrade replica cannot hide behind an already-upgraded primary.
2863    // Standalone: exactly one body. The outer retry loop tolerates rolling
2864    // upgrades — a briefly-low minimum gets retried; a persistently-low one
2865    // exits with the structured floor error.
2866    let mut min_version: Option<(u32, u32)> = None;
2867    for body in &bodies {
2868        let version = parse_valkey_version(body)?;
2869        min_version = Some(match min_version {
2870            None => version,
2871            Some(existing) => existing.min(version),
2872        });
2873    }
2874    min_version.ok_or_else(|| {
2875        ServerError::OperationFailed(
2876            "valkey version check: cluster INFO returned no node bodies".into(),
2877        )
2878    })
2879}
2880
2881/// Normalize an `INFO server` response to one string body per node.
2882///
2883/// Standalone returns a single body. Cluster (RESP3 map keyed by node address)
2884/// returns every node's body — the caller must consider all of them to reject
2885/// a mixed-version cluster where one stale node is below the floor.
2886fn extract_info_bodies(raw: &Value) -> Result<Vec<String>, ServerError> {
2887    match raw {
2888        Value::BulkString(bytes) => Ok(vec![String::from_utf8_lossy(bytes).into_owned()]),
2889        Value::VerbatimString { text, .. } => Ok(vec![text.clone()]),
2890        Value::SimpleString(s) => Ok(vec![s.clone()]),
2891        Value::Map(entries) => {
2892            if entries.is_empty() {
2893                return Err(ServerError::OperationFailed(
2894                    "valkey version check: cluster INFO returned empty map".into(),
2895                ));
2896            }
2897            let mut out = Vec::with_capacity(entries.len());
2898            for (_, body) in entries {
2899                out.extend(extract_info_bodies(body)?);
2900            }
2901            Ok(out)
2902        }
2903        other => Err(ServerError::OperationFailed(format!(
2904            "valkey version check: unexpected INFO shape: {other:?}"
2905        ))),
2906    }
2907}
2908
2909/// Extract the `(major, minor)` components of the Valkey version from an
2910/// `INFO server` response body. Pure parser — pulled out of
2911/// [`query_valkey_version`] so it is unit-testable without a live Valkey.
2912///
2913/// **Prefers `valkey_version:`** (introduced in Valkey 8.0+; this is the real
2914/// server version on 8.x/9.x, which pin `redis_version:7.2.4` for
2915/// Redis-client compatibility).
2916///
2917/// **Falls back to `redis_version:` only when the body carries an affirmative
2918/// `server_name:valkey` field.** `server_name:` was introduced in Valkey 7.2,
2919/// which is our floor — so every floor-compliant Valkey deployment carries
2920/// the marker. Redis does not emit `server_name:valkey`, which is how we
2921/// reject a Redis backend that would otherwise look identical to Valkey 7.x
2922/// at the `redis_version:` level.
2923fn parse_valkey_version(info: &str) -> Result<(u32, u32), ServerError> {
2924    let extract_major_minor = |line: &str| -> Result<(u32, u32), ServerError> {
2925        let trimmed = line.trim();
2926        let mut parts = trimmed.split('.');
2927        let major_str = parts.next().unwrap_or("").trim();
2928        if major_str.is_empty() {
2929            return Err(ServerError::OperationFailed(format!(
2930                "valkey version check: empty version field in '{trimmed}'"
2931            )));
2932        }
2933        let major = major_str.parse::<u32>().map_err(|_| {
2934            ServerError::OperationFailed(format!(
2935                "valkey version check: non-numeric major in '{trimmed}'"
2936            ))
2937        })?;
2938        // Minor is required — a bare major ("7") cannot be compared against
2939        // the (major, minor) floor reliably. Valkey always reports
2940        // major.minor.patch for INFO, so missing minor is a real parse error.
2941        let minor_str = parts.next().unwrap_or("").trim();
2942        if minor_str.is_empty() {
2943            return Err(ServerError::OperationFailed(format!(
2944                "valkey version check: missing minor component in '{trimmed}'"
2945            )));
2946        }
2947        let minor = minor_str.parse::<u32>().map_err(|_| {
2948            ServerError::OperationFailed(format!(
2949                "valkey version check: non-numeric minor in '{trimmed}'"
2950            ))
2951        })?;
2952        Ok((major, minor))
2953    };
2954    // Prefer valkey_version (authoritative on Valkey 8.0+).
2955    if let Some(valkey_line) = info
2956        .lines()
2957        .find_map(|line| line.strip_prefix("valkey_version:"))
2958    {
2959        return extract_major_minor(valkey_line);
2960    }
2961    // No valkey_version — could be Valkey 7.2 (which doesn't emit the field)
2962    // or could be Redis. Require an affirmative server_name:valkey marker
2963    // before falling back to redis_version. This rejects Redis backends,
2964    // which don't emit server_name:valkey.
2965    let server_is_valkey = info
2966        .lines()
2967        .map(str::trim)
2968        .any(|line| line.eq_ignore_ascii_case("server_name:valkey"));
2969    if !server_is_valkey {
2970        return Err(ServerError::OperationFailed(
2971            "valkey version check: INFO missing valkey_version and server_name:valkey marker \
2972             (unsupported backend — FlowFabric requires Valkey >= 7.2; Redis is not supported)"
2973                .into(),
2974        ));
2975    }
2976    // Valkey 7.x fallback. 8.x+ pins redis_version:7.2.4 for Redis-client
2977    // compat, so reading it there would under-report — but 8.x+ is handled
2978    // by the valkey_version branch above, so we only reach here on 7.x.
2979    if let Some(redis_line) = info
2980        .lines()
2981        .find_map(|line| line.strip_prefix("redis_version:"))
2982    {
2983        return extract_major_minor(redis_line);
2984    }
2985    Err(ServerError::OperationFailed(
2986        "valkey version check: INFO has server_name:valkey but no redis_version or valkey_version field"
2987            .into(),
2988    ))
2989}
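// Minimal behavioral sketch of the parser above (illustrative test module,
// not part of the original file; the INFO bodies are hand-written from the
// rustdoc's description of the three cases, and `.unwrap()` assumes
// `ServerError: Debug`):
#[cfg(test)]
mod parse_valkey_version_sketch {
    use super::*;

    #[test]
    fn prefers_valkey_version_over_pinned_redis_version() {
        // Valkey 8.x pins redis_version:7.2.4 for Redis-client compat;
        // the parser must report (8, 0), not (7, 2).
        let info = "redis_version:7.2.4\r\nvalkey_version:8.0.1\r\nserver_name:valkey\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (8, 0));
    }

    #[test]
    fn falls_back_to_redis_version_only_with_valkey_marker() {
        let valkey_7x = "redis_version:7.2.6\r\nserver_name:valkey\r\n";
        assert_eq!(parse_valkey_version(valkey_7x).unwrap(), (7, 2));
        // Plain Redis (no server_name:valkey marker) is rejected outright.
        assert!(parse_valkey_version("redis_version:7.2.4\r\n").is_err());
    }
}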
2990
2991// ── Partition config validation ──
2992
2993/// Validate or create the `ff:config:partitions` key on first boot.
2994///
2995/// If the key exists, its values must match the server's config.
2996/// If it doesn't exist, create it (first boot).
2997async fn validate_or_create_partition_config(
2998    client: &Client,
2999    config: &PartitionConfig,
3000) -> Result<(), ServerError> {
3001    let key = keys::global_config_partitions();
3002
3003    let existing: HashMap<String, String> = client
3004        .hgetall(&key)
3005        .await
3006        .map_err(|e| crate::server::backend_context(e, format!("HGETALL {key}")))?;
3007
3008    if existing.is_empty() {
3009        // First boot — create the config with one multi-field HSET
3010        // (single-command atomic, same rationale as the waitpoint HMAC
3011        // install below), so a crash cannot leave a partial config that a
3012        // later boot would reject as a mismatch.
3013        tracing::info!("first boot: creating {key}");
3014        let _: i64 = client
3015            .cmd("HSET")
3016            .arg(&key)
3017            .arg("num_flow_partitions").arg(config.num_flow_partitions.to_string())
3018            .arg("num_budget_partitions").arg(config.num_budget_partitions.to_string())
3019            .arg("num_quota_partitions").arg(config.num_quota_partitions.to_string())
3020            .execute()
3021            .await
3022            .map_err(|e| crate::server::backend_context(e, format!("HSET {key} (first-boot config)")))?;
3023        return Ok(());
3024    }
3025
3026    // Validate existing config matches
3027    let check = |field: &str, expected: u16| -> Result<(), ServerError> {
3028        let stored: u16 = existing
3029            .get(field)
3030            .and_then(|v| v.parse().ok())
3031            .unwrap_or(0); // absent/unparsable reads as 0 → guaranteed mismatch below
3032        if stored != expected {
3033            return Err(ServerError::PartitionMismatch(format!(
3034                "{field}: stored={stored}, config={expected}. \
3035                 Partition counts are fixed at deployment time. \
3036                 Either fix your config or migrate the data."
3037            )));
3038        }
3039        Ok(())
3040    };
3041
3042    check("num_flow_partitions", config.num_flow_partitions)?;
3043    check("num_budget_partitions", config.num_budget_partitions)?;
3044    check("num_quota_partitions", config.num_quota_partitions)?;
3045
3046    tracing::info!("partition config validated against stored {key}");
3047    Ok(())
3048}
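// Stored shape after first boot (illustrative values):
//
//     HGETALL ff:config:partitions
//     num_flow_partitions   "64"
//     num_budget_partitions "16"
//     num_quota_partitions  "16"
//
// Any later boot whose config disagrees fails with PartitionMismatch;
// partition counts are fixed at deployment time.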
3049
3050// ── Waitpoint HMAC secret bootstrap (RFC-004 §Waitpoint Security) ──
3051
3052/// Stable initial kid written on first boot. Rotation promotes to k2, k3, ...
3053/// The kid is stored alongside the secret in every partition's hash so each
3054/// FCALL can self-identify which secret produced a given token.
3055const WAITPOINT_HMAC_INITIAL_KID: &str = "k1";
3056
3057/// Per-partition outcome of the HMAC bootstrap step. Collected across the
3058/// parallel scan so we can emit aggregated logs once at the end.
3059enum PartitionBootOutcome {
3060    /// Partition already had a matching (kid, secret) pair.
3061    Match,
3062    /// Stored secret diverges from env — likely operator rotation; kept.
3063    Mismatch,
3064    /// Torn write (current_kid present, secret:<kid> missing); repaired.
3065    Repaired,
3066    /// Fresh partition; atomically installed env secret under kid=k1.
3067    Installed,
3068}
3069
3070/// Bounded in-flight concurrency for the startup fan-out. Large enough to
3071/// turn a 256-partition install from ~15s sequential into ~1s on cross-AZ
3072/// Valkey, small enough to leave a cold cluster breathing room for other
3073/// Server::start work (library load, engine scanner spawn).
3074const BOOT_INIT_CONCURRENCY: usize = 16;
3075
3076async fn init_one_partition(
3077    client: &Client,
3078    partition: Partition,
3079    secret_hex: &str,
3080) -> Result<PartitionBootOutcome, ServerError> {
3081    let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();
3082
3083    // Probe for an existing install. Fast path (fresh partition): HGET
3084    // returns nil and we fall through to the atomic install below. Slow
3085    // path: secret:<stored_kid> is then HGET'd once we know the kid name.
3086    // (A previous version used a 2-field HMGET that included a fake
3087    // `secret:probe` placeholder — vestigial from an abandoned
3088    // optimization attempt, and confusing to read. Collapsed back to a
3089    // single-field HGET.)
3090    let stored_kid: Option<String> = client
3091        .cmd("HGET")
3092        .arg(&key)
3093        .arg("current_kid")
3094        .execute()
3095        .await
3096        .map_err(|e| crate::server::backend_context(e, format!("HGET {key} current_kid (init probe)")))?;
3097
3098    if let Some(stored_kid) = stored_kid {
3099        // We didn't know the stored kid up front, so now HGET the real
3100        // secret:<stored_kid> field. Two round-trips in the slow path; the
3101        // fast path (fresh partition) stays at one.
3102        let field = format!("secret:{stored_kid}");
3103        let stored_secret: Option<String> = client
3104            .hget(&key, &field)
3105            .await
3106            .map_err(|e| crate::server::backend_context(e, format!("HGET {key} secret:<kid> (init check)")))?;
3107        if stored_secret.is_none() {
3108            // Torn write from a previous boot: current_kid present but
3109            // secret:<kid> missing. Without repair, mint returns
3110            // "hmac_secret_not_initialized" on that partition forever.
3111            // Repair in place with env secret. Not rotation — rotation
3112            // always writes the secret first.
3113            client
3114                .hset(&key, &field, secret_hex)
3115                .await
3116                .map_err(|e| crate::server::backend_context(e, format!("HSET {key} secret:<kid> (repair torn write)")))?;
3117            return Ok(PartitionBootOutcome::Repaired);
3118        }
3119        if stored_secret.as_deref() != Some(secret_hex) {
3120            return Ok(PartitionBootOutcome::Mismatch);
3121        }
3122        return Ok(PartitionBootOutcome::Match);
3123    }
3124
3125    // Fresh partition — install current_kid + secret:<kid> atomically in
3126    // one HSET. Multi-field HSET is single-command atomic, so a crash
3127    // can't leave current_kid without its secret.
3128    let secret_field = format!("secret:{WAITPOINT_HMAC_INITIAL_KID}");
3129    let _: i64 = client
3130        .cmd("HSET")
3131        .arg(&key)
3132        .arg("current_kid")
3133        .arg(WAITPOINT_HMAC_INITIAL_KID)
3134        .arg(&secret_field)
3135        .arg(secret_hex)
3136        .execute()
3137        .await
3138        .map_err(|e| crate::server::backend_context(e, format!("HSET {key} (init waitpoint HMAC atomic)")))?;
3139    Ok(PartitionBootOutcome::Installed)
3140}
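// Resulting hash layout (illustrative; the key name is sketched from the
// ff:idx:{p:N}: pattern used elsewhere in this file):
//
//     ff:idx:{p:N}:waitpoint_hmac_secrets
//       current_kid = "k1"
//       secret:k1   = <env secret hex>
//
// A torn write is the same hash with current_kid present but secret:<kid>
// missing; the repair branch above re-HSETs only the missing secret field.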
3141
3142/// Install the waitpoint HMAC secret on every execution partition.
3143///
3144/// Parallelized fan-out with bounded in-flight concurrency
3145/// (`BOOT_INIT_CONCURRENCY`) so 256-partition boots finish in ~1s instead
3146/// of ~15s sequential — the prior sequential loop was tight on K8s
3147/// `initialDelaySeconds=30` defaults, especially cross-AZ. Fail-fast:
3148/// the first per-partition error aborts boot.
3149///
3150/// Outcomes aggregate into mismatch/repaired counts (logged once at end)
3151/// so operators see a single loud warning per fault class instead of 256
3152/// per-partition lines.
3153async fn initialize_waitpoint_hmac_secret(
3154    client: &Client,
3155    partition_config: &PartitionConfig,
3156    secret_hex: &str,
3157) -> Result<(), ServerError> {
3158    use futures::stream::{FuturesUnordered, StreamExt};
3159
3160    let n = partition_config.num_flow_partitions; // also sizes the Execution family (see fan-out below)
3161    tracing::info!(
3162        partitions = n,
3163        concurrency = BOOT_INIT_CONCURRENCY,
3164        "installing waitpoint HMAC secret across {n} execution partitions"
3165    );
3166
3167    let mut mismatch_count: u16 = 0;
3168    let mut repaired_count: u16 = 0;
3169    let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
3170    let mut next_index: u16 = 0;
3171
3172    loop {
3173        while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
3174            let partition = Partition {
3175                family: PartitionFamily::Execution,
3176                index: next_index,
3177            };
3178            let client = client.clone();
3179            let secret_hex = secret_hex.to_owned();
3180            pending.push(async move {
3181                init_one_partition(&client, partition, &secret_hex).await
3182            });
3183            next_index += 1;
3184        }
3185        match pending.next().await {
3186            Some(res) => match res? {
3187                PartitionBootOutcome::Match | PartitionBootOutcome::Installed => {}
3188                PartitionBootOutcome::Mismatch => mismatch_count += 1,
3189                PartitionBootOutcome::Repaired => repaired_count += 1,
3190            },
3191            None => break,
3192        }
3193    }
3194
3195    if repaired_count > 0 {
3196        tracing::warn!(
3197            repaired_partitions = repaired_count,
3198            total_partitions = n,
3199            "repaired {repaired_count} partitions with torn waitpoint HMAC writes \
3200             (current_kid present but secret:<kid> missing, likely crash during prior boot)"
3201        );
3202    }
3203
3204    if mismatch_count > 0 {
3205        tracing::warn!(
3206            mismatched_partitions = mismatch_count,
3207            total_partitions = n,
3208            "stored/env secret mismatch on {mismatch_count} partitions — \
3209             env FF_WAITPOINT_HMAC_SECRET ignored in favor of stored values; \
3210             run POST /v1/admin/rotate-waitpoint-secret to sync"
3211        );
3212    }
3213
3214    tracing::info!(partitions = n, "waitpoint HMAC secret install complete");
3215    Ok(())
3216}
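// Design note: the refill loop above is hand-rolled bounded concurrency. With
// `futures` already in scope it is close to the sketch below (`exec_partition`
// is a hypothetical helper building the Execution-family Partition); the
// hand-rolled form was kept for its explicit fail-fast `res?` on first error:
//
//     futures::stream::iter(0..n)
//         .map(|i| init_one_partition(client, exec_partition(i), secret_hex))
//         .buffer_unordered(BOOT_INIT_CONCURRENCY)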
3217
3218/// Result of a waitpoint HMAC secret rotation across all execution partitions.
3219#[derive(Debug, Clone, serde::Serialize)]
3220pub struct RotateWaitpointSecretResult {
3221    /// Count of partitions that accepted the rotation.
3222    pub rotated: u16,
3223    /// Partition indices that failed — operator should investigate (Valkey
3224    /// outage, auth failure, cluster split). Rotation is idempotent, so a
3225    /// re-run after the underlying fault clears converges to the correct
3226    /// state.
3227    pub failed: Vec<u16>,
3228    /// New kid installed as current.
3229    pub new_kid: String,
3230}
3231
3232impl Server {
3233    /// Rotate the waitpoint HMAC secret. Promotes the current kid to previous
3234    /// (accepted within `FF_WAITPOINT_HMAC_GRACE_MS`), installs `new_secret_hex`
3235    /// as the new current kid. Idempotent: re-running with the same `new_kid`
3236    /// and `new_secret_hex` converges partitions to the same state.
3237    ///
3238    /// Returns a structured result so operators can see which partitions failed.
3239    /// HTTP layer returns 200 if any partition succeeded, 500 only if all fail.
3240    pub async fn rotate_waitpoint_secret(
3241        &self,
3242        new_kid: &str,
3243        new_secret_hex: &str,
3244    ) -> Result<RotateWaitpointSecretResult, ServerError> {
3245        if new_kid.is_empty() || new_kid.contains(':') {
3246            return Err(ServerError::OperationFailed(
3247                "new_kid must be non-empty and must not contain ':'".into(),
3248            ));
3249        }
3250        if new_secret_hex.is_empty()
3251            || !new_secret_hex.len().is_multiple_of(2)
3252            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
3253        {
3254            return Err(ServerError::OperationFailed(
3255                "new_secret_hex must be a non-empty even-length hex string".into(),
3256            ));
3257        }
3258
3259        // Single-writer gate. Concurrent rotates against the SAME operator
3260        // token are an attack pattern (or a retry-loop bug); legitimate
3261        // operators rotate monthly and can afford to serialize. Contention
3262        // returns ConcurrencyLimitExceeded("admin_rotate", 1) (→ HTTP 429
3263        // with a labelled error body) rather than queueing the HTTP
3264        // handler past the 120s endpoint timeout. Permit is held for
3265        // the full partition fan-out.
3266        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
3267            Ok(p) => p,
3268            Err(tokio::sync::TryAcquireError::NoPermits) => {
3269                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
3270            }
3271            Err(tokio::sync::TryAcquireError::Closed) => {
3272                return Err(ServerError::OperationFailed(
3273                    "admin rotate semaphore closed (server shutting down)".into(),
3274                ));
3275            }
3276        };
3277
3278        let n = self.config.partition_config.num_flow_partitions;
3279        // "now" is derived inside the FCALL from `redis.call("TIME")`
3280        // (consistency with validate_waitpoint_token and flow scanners);
3281        // grace_ms is a duration — safe to carry from config.
3282        let grace_ms = self.config.waitpoint_hmac_grace_ms;
3283
3284        // Parallelize the rotation fan-out with the same bounded
3285        // concurrency as boot init (BOOT_INIT_CONCURRENCY = 16). A 256-
3286        // partition sequential rotation takes ~7.7s at 30ms cross-AZ RTT,
3287        // uncomfortably close to the 120s HTTP endpoint timeout under
3288        // contention. Atomicity per partition now lives inside the
3289        // `ff_rotate_waitpoint_hmac_secret` FCALL (FCALL is atomic per
3290        // shard); parallelism across DIFFERENT partitions is safe. The
3291        // outer `admin_rotate_semaphore(1)` bounds server-wide concurrent
3292        // rotations, so this fan-out only affects a single in-flight
3293        // rotate call at a time.
3294        use futures::stream::{FuturesUnordered, StreamExt};
3295
3296        let mut rotated = 0u16;
3297        let mut failed = Vec::new();
3298        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
3299        let mut next_index: u16 = 0;
3300
3301        loop {
3302            while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
3303                let partition = Partition {
3304                    family: PartitionFamily::Execution,
3305                    index: next_index,
3306                };
3307                let idx = next_index;
3308                // Clone only what the per-partition future needs. Unlike
3309                // tokio::spawn, FuturesUnordered does not require 'static
3310                // futures (these already borrow `self`); owning the strings
3311                // simply keeps each future's captures self-contained.
3312                let new_kid_owned = new_kid.to_owned();
3313                let new_secret_owned = new_secret_hex.to_owned();
3314                let partition_owned = partition;
3315                let fut = async move {
3316                    let outcome = self
3317                        .rotate_single_partition(
3318                            &partition_owned,
3319                            &new_kid_owned,
3320                            &new_secret_owned,
3321                            grace_ms,
3322                        )
3323                        .await;
3324                    (idx, partition_owned, outcome)
3325                };
3326                pending.push(fut);
3327                next_index += 1;
3328            }
3329            match pending.next().await {
3330                Some((idx, partition, outcome)) => match outcome {
3331                    Ok(()) => {
3332                        rotated += 1;
3333                        // Per-partition event → DEBUG (not INFO). Rationale:
3334                        // one rotate endpoint call produces 256 partition-level
3335                        // events, which would blow up paid aggregator budgets
3336                        // (Datadog/Splunk) at no operational value. The single
3337                        // aggregated audit event below is the compliance
3338                        // artifact. Failures stay at ERROR with per-partition
3339                        // detail — that's where operators need it.
3340                        tracing::debug!(
3341                            partition = %partition,
3342                            new_kid = %new_kid,
3343                            "waitpoint_hmac_rotated"
3344                        );
3345                    }
3346                    Err(e) => {
3347                        // Failures stay at ERROR (target=audit) per-partition —
3348                        // operators need the partition index + error to debug
3349                        // Valkey/config faults. Low cardinality in practice.
3350                        tracing::error!(
3351                            target: "audit",
3352                            partition = %partition,
3353                            err = %e,
3354                            "waitpoint_hmac_rotation_failed"
3355                        );
3356                        failed.push(idx);
3357                    }
3358                },
3359                None => break,
3360            }
3361        }
3362
3363        // Single aggregated audit event for the whole rotation. This is
3364        // the load-bearing compliance artifact — operators alert on
3365        // target="audit" at INFO level and this is the stable schema.
3366        tracing::info!(
3367            target: "audit",
3368            new_kid = %new_kid,
3369            total_partitions = n,
3370            rotated,
3371            failed_count = failed.len(),
3372            "waitpoint_hmac_rotation_complete"
3373        );
3374
3375        Ok(RotateWaitpointSecretResult {
3376            rotated,
3377            failed,
3378            new_kid: new_kid.to_owned(),
3379        })
3380    }
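    // Illustrative operator flow (route name taken from the boot-path log
    // hint above; request-body field names are assumptions, while the
    // response shape follows RotateWaitpointSecretResult's Serialize):
    //
    //     POST /v1/admin/rotate-waitpoint-secret
    //     { "new_kid": "k2", "new_secret_hex": "9f8adb01" }
    //
    //     200 → { "rotated": 256, "failed": [], "new_kid": "k2" }
    //
    // Re-running the same body is a per-partition Noop (idempotent).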
3381
3382    /// Rotate on a single partition by dispatching the
3383    /// `ff_rotate_waitpoint_hmac_secret` FCALL. FCALL is atomic per shard,
3384    /// so no external SETNX lock is needed — the script itself IS the
3385    /// atomicity boundary. Single source of truth (Lua); the Rust
3386    /// implementation that previously lived here was an exact duplicate
3387    /// and has been removed.
3388    async fn rotate_single_partition(
3389        &self,
3390        partition: &Partition,
3391        new_kid: &str,
3392        new_secret_hex: &str,
3393        grace_ms: u64,
3394    ) -> Result<(), ServerError> {
3395        let idx = IndexKeys::new(partition);
3396        let args = RotateWaitpointHmacSecretArgs {
3397            new_kid: new_kid.to_owned(),
3398            new_secret_hex: new_secret_hex.to_owned(),
3399            grace_ms,
3400        };
3401        let outcome = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
3402            &self.client,
3403            &idx,
3404            &args,
3405        )
3406        .await
3407        .map_err(|e| match e {
3408            // Same kid + different secret. Map to the same 409-style
3409            // error the old Rust path returned so HTTP callers keep the
3410            // current surface.
3411            ff_script::ScriptError::RotationConflict(kid) => {
3412                ServerError::OperationFailed(format!(
3413                    "rotation conflict: kid {kid} already installed with a \
3414                     different secret. Either use a fresh kid or restore the \
3415                     original secret for this kid before retrying."
3416                ))
3417            }
3418            ff_script::ScriptError::Valkey(v) => crate::server::backend_context(
3419                v,
3420                format!("FCALL ff_rotate_waitpoint_hmac_secret partition={partition}"),
3421            ),
3422            other => ServerError::OperationFailed(format!(
3423                "rotation failed on partition {partition}: {other}"
3424            )),
3425        })?;
3426        // Either outcome is a successful write from the operator's POV.
3427        // Rotated → new install; Noop → idempotent replay.
3428        let _ = outcome;
3429        Ok(())
3430    }
3431}
3432
3433// ── FCALL result parsing ──
3434
3435fn parse_create_result(
3436    raw: &Value,
3437    execution_id: &ExecutionId,
3438) -> Result<CreateExecutionResult, ServerError> {
3439    let arr = match raw {
3440        Value::Array(arr) => arr,
3441        _ => return Err(ServerError::Script("ff_create_execution: expected Array".into())),
3442    };
3443
3444    let status = match arr.first() {
3445        Some(Ok(Value::Int(n))) => *n,
3446        _ => return Err(ServerError::Script("ff_create_execution: bad status code".into())),
3447    };
3448
3449    if status == 1 {
3450        // Check sub-status: OK or DUPLICATE
3451        let sub = arr
3452            .get(1)
3453            .and_then(|v| match v {
3454                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3455                Ok(Value::SimpleString(s)) => Some(s.clone()),
3456                _ => None,
3457            })
3458            .unwrap_or_default();
3459
3460        if sub == "DUPLICATE" {
3461            Ok(CreateExecutionResult::Duplicate {
3462                execution_id: execution_id.clone(),
3463            })
3464        } else {
3465            Ok(CreateExecutionResult::Created {
3466                execution_id: execution_id.clone(),
3467                public_state: PublicState::Waiting,
3468            })
3469        }
3470    } else {
3471        let error_code = arr
3472            .get(1)
3473            .and_then(|v| match v {
3474                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3475                Ok(Value::SimpleString(s)) => Some(s.clone()),
3476                _ => None,
3477            })
3478            .unwrap_or_else(|| "unknown".to_owned());
3479        Err(ServerError::OperationFailed(format!(
3480            "ff_create_execution failed: {error_code}"
3481        )))
3482    }
3483}
3484
3485fn parse_cancel_result(
3486    raw: &Value,
3487    execution_id: &ExecutionId,
3488) -> Result<CancelExecutionResult, ServerError> {
3489    let arr = match raw {
3490        Value::Array(arr) => arr,
3491        _ => return Err(ServerError::Script("ff_cancel_execution: expected Array".into())),
3492    };
3493
3494    let status = match arr.first() {
3495        Some(Ok(Value::Int(n))) => *n,
3496        _ => return Err(ServerError::Script("ff_cancel_execution: bad status code".into())),
3497    };
3498
3499    if status == 1 {
3500        Ok(CancelExecutionResult::Cancelled {
3501            execution_id: execution_id.clone(),
3502            public_state: PublicState::Cancelled,
3503        })
3504    } else {
3505        let error_code = arr
3506            .get(1)
3507            .and_then(|v| match v {
3508                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3509                Ok(Value::SimpleString(s)) => Some(s.clone()),
3510                _ => None,
3511            })
3512            .unwrap_or_else(|| "unknown".to_owned());
3513        Err(ServerError::OperationFailed(format!(
3514            "ff_cancel_execution failed: {error_code}"
3515        )))
3516    }
3517}
3518
3519fn parse_budget_create_result(
3520    raw: &Value,
3521    budget_id: &BudgetId,
3522) -> Result<CreateBudgetResult, ServerError> {
3523    let arr = match raw {
3524        Value::Array(arr) => arr,
3525        _ => return Err(ServerError::Script("ff_create_budget: expected Array".into())),
3526    };
3527
3528    let status = match arr.first() {
3529        Some(Ok(Value::Int(n))) => *n,
3530        _ => return Err(ServerError::Script("ff_create_budget: bad status code".into())),
3531    };
3532
3533    if status == 1 {
3534        let sub = arr
3535            .get(1)
3536            .and_then(|v| match v {
3537                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3538                Ok(Value::SimpleString(s)) => Some(s.clone()),
3539                _ => None,
3540            })
3541            .unwrap_or_default();
3542
3543        if sub == "ALREADY_SATISFIED" {
3544            Ok(CreateBudgetResult::AlreadySatisfied {
3545                budget_id: budget_id.clone(),
3546            })
3547        } else {
3548            Ok(CreateBudgetResult::Created {
3549                budget_id: budget_id.clone(),
3550            })
3551        }
3552    } else {
3553        let error_code = arr
3554            .get(1)
3555            .and_then(|v| match v {
3556                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3557                Ok(Value::SimpleString(s)) => Some(s.clone()),
3558                _ => None,
3559            })
3560            .unwrap_or_else(|| "unknown".to_owned());
3561        Err(ServerError::OperationFailed(format!(
3562            "ff_create_budget failed: {error_code}"
3563        )))
3564    }
3565}
3566
3567fn parse_quota_create_result(
3568    raw: &Value,
3569    quota_policy_id: &QuotaPolicyId,
3570) -> Result<CreateQuotaPolicyResult, ServerError> {
3571    let arr = match raw {
3572        Value::Array(arr) => arr,
3573        _ => return Err(ServerError::Script("ff_create_quota_policy: expected Array".into())),
3574    };
3575
3576    let status = match arr.first() {
3577        Some(Ok(Value::Int(n))) => *n,
3578        _ => return Err(ServerError::Script("ff_create_quota_policy: bad status code".into())),
3579    };
3580
3581    if status == 1 {
3582        let sub = arr
3583            .get(1)
3584            .and_then(|v| match v {
3585                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3586                Ok(Value::SimpleString(s)) => Some(s.clone()),
3587                _ => None,
3588            })
3589            .unwrap_or_default();
3590
3591        if sub == "ALREADY_SATISFIED" {
3592            Ok(CreateQuotaPolicyResult::AlreadySatisfied {
3593                quota_policy_id: quota_policy_id.clone(),
3594            })
3595        } else {
3596            Ok(CreateQuotaPolicyResult::Created {
3597                quota_policy_id: quota_policy_id.clone(),
3598            })
3599        }
3600    } else {
3601        let error_code = arr
3602            .get(1)
3603            .and_then(|v| match v {
3604                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3605                Ok(Value::SimpleString(s)) => Some(s.clone()),
3606                _ => None,
3607            })
3608            .unwrap_or_else(|| "unknown".to_owned());
3609        Err(ServerError::OperationFailed(format!(
3610            "ff_create_quota_policy failed: {error_code}"
3611        )))
3612    }
3613}
3614
3615// ── Flow FCALL result parsing ──
3616
3617fn parse_create_flow_result(
3618    raw: &Value,
3619    flow_id: &FlowId,
3620) -> Result<CreateFlowResult, ServerError> {
3621    let arr = match raw {
3622        Value::Array(arr) => arr,
3623        _ => return Err(ServerError::Script("ff_create_flow: expected Array".into())),
3624    };
3625    let status = match arr.first() {
3626        Some(Ok(Value::Int(n))) => *n,
3627        _ => return Err(ServerError::Script("ff_create_flow: bad status code".into())),
3628    };
3629    if status == 1 {
3630        let sub = fcall_field_str(arr, 1);
3631        if sub == "ALREADY_SATISFIED" {
3632            Ok(CreateFlowResult::AlreadySatisfied {
3633                flow_id: flow_id.clone(),
3634            })
3635        } else {
3636            Ok(CreateFlowResult::Created {
3637                flow_id: flow_id.clone(),
3638            })
3639        }
3640    } else {
3641        let error_code = fcall_field_str(arr, 1);
3642        Err(ServerError::OperationFailed(format!(
3643            "ff_create_flow failed: {error_code}"
3644        )))
3645    }
3646}
3647
3648fn parse_add_execution_to_flow_result(
3649    raw: &Value,
3650) -> Result<AddExecutionToFlowResult, ServerError> {
3651    let arr = match raw {
3652        Value::Array(arr) => arr,
3653        _ => {
3654            return Err(ServerError::Script(
3655                "ff_add_execution_to_flow: expected Array".into(),
3656            ))
3657        }
3658    };
3659    let status = match arr.first() {
3660        Some(Ok(Value::Int(n))) => *n,
3661        _ => {
3662            return Err(ServerError::Script(
3663                "ff_add_execution_to_flow: bad status code".into(),
3664            ))
3665        }
3666    };
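    // Expected success shape, inferred from the positions read below:
    // {1, <sub>, execution_id, node_count}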
3667    if status == 1 {
3668        let sub = fcall_field_str(arr, 1);
3669        let eid_str = fcall_field_str(arr, 2);
3670        let nc_str = fcall_field_str(arr, 3);
3671        let eid = ExecutionId::parse(&eid_str)
3672            .map_err(|e| ServerError::Script(format!("bad execution_id: {e}")))?;
3673        let nc: u32 = nc_str.parse().unwrap_or(0);
3674        if sub == "ALREADY_SATISFIED" {
3675            Ok(AddExecutionToFlowResult::AlreadyMember {
3676                execution_id: eid,
3677                node_count: nc,
3678            })
3679        } else {
3680            Ok(AddExecutionToFlowResult::Added {
3681                execution_id: eid,
3682                new_node_count: nc,
3683            })
3684        }
3685    } else {
3686        let error_code = fcall_field_str(arr, 1);
3687        Err(ServerError::OperationFailed(format!(
3688            "ff_add_execution_to_flow failed: {error_code}"
3689        )))
3690    }
3691}
3692
3693/// Outcome of parsing a raw `ff_cancel_flow` FCALL response.
3694///
3695/// Keeps `AlreadyTerminal` distinct from other script errors so the caller
3696/// can treat cancel on an already-cancelled/completed/failed flow as
3697/// idempotent success instead of surfacing a 400 to the client.
3698enum ParsedCancelFlow {
3699    Cancelled {
3700        policy: String,
3701        member_execution_ids: Vec<String>,
3702    },
3703    AlreadyTerminal,
3704}
3705
3706/// Parse the raw `ff_cancel_flow` FCALL response.
3707///
3708/// Returns [`ParsedCancelFlow::Cancelled`] on success, [`ParsedCancelFlow::AlreadyTerminal`]
3709/// when the flow was already in a terminal state (idempotent retry), or a
3710/// [`ServerError`] for any other failure.
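///
/// A minimal sketch of the success shape, using placeholder IDs and an
/// arbitrary policy value (both are opaque strings to this parser):
///
/// ```ignore
/// let raw = Value::Array(vec![
///     Ok(Value::Int(1)),                        // status
///     Ok(Value::SimpleString("OK".into())),     // sub-status
///     Ok(Value::SimpleString("drain".into())),  // cancellation_policy
///     Ok(Value::SimpleString("exec-a".into())), // member 1
///     Ok(Value::SimpleString("exec-b".into())), // member 2
/// ]);
/// assert!(matches!(
///     parse_cancel_flow_raw(&raw),
///     Ok(ParsedCancelFlow::Cancelled { .. })
/// ));
/// ```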
3711fn parse_cancel_flow_raw(raw: &Value) -> Result<ParsedCancelFlow, ServerError> {
3712    let arr = match raw {
3713        Value::Array(arr) => arr,
3714        _ => return Err(ServerError::Script("ff_cancel_flow: expected Array".into())),
3715    };
3716    let status = match arr.first() {
3717        Some(Ok(Value::Int(n))) => *n,
3718        _ => return Err(ServerError::Script("ff_cancel_flow: bad status code".into())),
3719    };
3720    if status != 1 {
3721        let error_code = fcall_field_str(arr, 1);
3722        if error_code == "flow_already_terminal" {
3723            return Ok(ParsedCancelFlow::AlreadyTerminal);
3724        }
3725        return Err(ServerError::OperationFailed(format!(
3726            "ff_cancel_flow failed: {error_code}"
3727        )));
3728    }
3729    // {1, "OK", cancellation_policy, member1, member2, ...}
3730    let policy = fcall_field_str(arr, 2);
3731    // Iterate to arr.len() rather than breaking on the first empty string —
3732    // safer against malformed Lua responses and clearer than a sentinel loop.
3733    let mut members = Vec::with_capacity(arr.len().saturating_sub(3));
3734    for i in 3..arr.len() {
3735        members.push(fcall_field_str(arr, i));
3736    }
3737    Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids: members })
3738}
3739
3740fn parse_stage_dependency_edge_result(
3741    raw: &Value,
3742) -> Result<StageDependencyEdgeResult, ServerError> {
3743    let arr = match raw {
3744        Value::Array(arr) => arr,
3745        _ => return Err(ServerError::Script("ff_stage_dependency_edge: expected Array".into())),
3746    };
3747    let status = match arr.first() {
3748        Some(Ok(Value::Int(n))) => *n,
3749        _ => return Err(ServerError::Script("ff_stage_dependency_edge: bad status code".into())),
3750    };
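    // Expected success shape, inferred from the positions read below:
    // {1, <sub>, edge_id, new_graph_revision}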
3751    if status == 1 {
3752        let edge_id_str = fcall_field_str(arr, 2);
3753        let rev_str = fcall_field_str(arr, 3);
3754        let edge_id = EdgeId::parse(&edge_id_str)
3755            .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
3756        let rev: u64 = rev_str.parse().unwrap_or(0);
3757        Ok(StageDependencyEdgeResult::Staged {
3758            edge_id,
3759            new_graph_revision: rev,
3760        })
3761    } else {
3762        let error_code = fcall_field_str(arr, 1);
3763        Err(ServerError::OperationFailed(format!(
3764            "ff_stage_dependency_edge failed: {error_code}"
3765        )))
3766    }
3767}
3768
3769fn parse_apply_dependency_result(
3770    raw: &Value,
3771) -> Result<ApplyDependencyToChildResult, ServerError> {
3772    let arr = match raw {
3773        Value::Array(arr) => arr,
3774        _ => return Err(ServerError::Script("ff_apply_dependency_to_child: expected Array".into())),
3775    };
3776    let status = match arr.first() {
3777        Some(Ok(Value::Int(n))) => *n,
3778        _ => return Err(ServerError::Script("ff_apply_dependency_to_child: bad status code".into())),
3779    };
3780    if status == 1 {
3781        let sub = fcall_field_str(arr, 1);
3782        if sub == "ALREADY_APPLIED" || sub == "already_applied" {
3783            Ok(ApplyDependencyToChildResult::AlreadyApplied)
3784        } else {
3785            // OK status — field at index 2 is unsatisfied count
3786            let count_str = fcall_field_str(arr, 2);
3787            let count: u32 = count_str.parse().unwrap_or(0);
3788            Ok(ApplyDependencyToChildResult::Applied {
3789                unsatisfied_count: count,
3790            })
3791        }
3792    } else {
3793        let error_code = fcall_field_str(arr, 1);
3794        Err(ServerError::OperationFailed(format!(
3795            "ff_apply_dependency_to_child failed: {error_code}"
3796        )))
3797    }
3798}
3799
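/// Parse the raw `ff_deliver_signal` FCALL response. Expected shapes, per the
/// Lua side: `{1, "DUPLICATE", existing_signal_id}` for an idempotent replay
/// and `{1, "OK", signal_id, effect}` for a first delivery.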
3800fn parse_deliver_signal_result(
3801    raw: &Value,
3802    signal_id: &SignalId,
3803) -> Result<DeliverSignalResult, ServerError> {
3804    let arr = match raw {
3805        Value::Array(arr) => arr,
3806        _ => return Err(ServerError::Script("ff_deliver_signal: expected Array".into())),
3807    };
3808    let status = match arr.first() {
3809        Some(Ok(Value::Int(n))) => *n,
3810        _ => return Err(ServerError::Script("ff_deliver_signal: bad status code".into())),
3811    };
3812    if status == 1 {
3813        let sub = fcall_field_str(arr, 1);
3814        if sub == "DUPLICATE" {
3815            // ok_duplicate(existing_signal_id) → {1, "DUPLICATE", existing_signal_id}
3816            let existing_str = fcall_field_str(arr, 2);
3817            let existing_id = SignalId::parse(&existing_str).unwrap_or_else(|_| signal_id.clone());
3818            Ok(DeliverSignalResult::Duplicate {
3819                existing_signal_id: existing_id,
3820            })
3821        } else {
3822            // ok(signal_id, effect) → {1, "OK", signal_id, effect}
3823            let effect = fcall_field_str(arr, 3);
3824            Ok(DeliverSignalResult::Accepted {
3825                signal_id: signal_id.clone(),
3826                effect,
3827            })
3828        }
3829    } else {
3830        let error_code = fcall_field_str(arr, 1);
3831        Err(ServerError::OperationFailed(format!(
3832            "ff_deliver_signal failed: {error_code}"
3833        )))
3834    }
3835}
3836
3837fn parse_change_priority_result(
3838    raw: &Value,
3839    execution_id: &ExecutionId,
3840) -> Result<ChangePriorityResult, ServerError> {
3841    let arr = match raw {
3842        Value::Array(arr) => arr,
3843        _ => return Err(ServerError::Script("ff_change_priority: expected Array".into())),
3844    };
3845    let status = match arr.first() {
3846        Some(Ok(Value::Int(n))) => *n,
3847        _ => return Err(ServerError::Script("ff_change_priority: bad status code".into())),
3848    };
3849    if status == 1 {
3850        Ok(ChangePriorityResult::Changed {
3851            execution_id: execution_id.clone(),
3852        })
3853    } else {
3854        let error_code = fcall_field_str(arr, 1);
3855        Err(ServerError::OperationFailed(format!(
3856            "ff_change_priority failed: {error_code}"
3857        )))
3858    }
3859}
3860
3861fn parse_replay_result(raw: &Value) -> Result<ReplayExecutionResult, ServerError> {
3862    let arr = match raw {
3863        Value::Array(arr) => arr,
3864        _ => return Err(ServerError::Script("ff_replay_execution: expected Array".into())),
3865    };
3866    let status = match arr.first() {
3867        Some(Ok(Value::Int(n))) => *n,
3868        _ => return Err(ServerError::Script("ff_replay_execution: bad status code".into())),
3869    };
3870    if status == 1 {
3871        // ok("0") for normal replay, ok(N) for skipped flow member
3872        let unsatisfied = fcall_field_str(arr, 2);
3873        let ps = if unsatisfied == "0" {
3874            PublicState::Waiting
3875        } else {
3876            PublicState::WaitingChildren
3877        };
3878        Ok(ReplayExecutionResult::Replayed { public_state: ps })
3879    } else {
3880        let error_code = fcall_field_str(arr, 1);
3881        Err(ServerError::OperationFailed(format!(
3882            "ff_replay_execution failed: {error_code}"
3883        )))
3884    }
3885}
3886
3888/// Convert a `ScriptError` into a `ServerError` preserving `ferriskey::ErrorKind`
3889/// for transport-level variants. Business-logic variants keep their code as
3890/// `ServerError::Script(String)` so HTTP clients see a stable message.
3891///
3892/// Why this exists: before R2, the stream handlers did
3893/// `ScriptError → format!() → ServerError::Script(String)`, which erased
3894/// the ErrorKind and made `ServerError::is_retryable()` always return
3895/// false. Retry-capable clients (cairn-fabric) would not retry a genuinely
3896/// transient error like `IoError`.
3897fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
3898    match e {
3899        ff_script::error::ScriptError::Valkey(valkey_err) => {
3900            crate::server::backend_context(valkey_err, "stream FCALL transport")
3901        }
3902        other => ServerError::Script(other.to_string()),
3903    }
3904}
3905
/// Extract a string from an FCALL result array at the given index.
/// Missing indices and non-scalar variants fall back to `""`; `Int` fields
/// are stringified so numeric positions can be `parse()`d uniformly.
3906fn fcall_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
3907    match arr.get(index) {
3908        Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
3909        Some(Ok(Value::SimpleString(s))) => s.clone(),
3910        Some(Ok(Value::Int(n))) => n.to_string(),
3911        _ => String::new(),
3912    }
3913}
3914
3915/// Parse ff_report_usage_and_check result.
3916/// Standard format: {1, "OK"}, {1, "SOFT_BREACH", dim, current, limit},
3917///                  {1, "HARD_BREACH", dim, current, limit}, {1, "ALREADY_APPLIED"}
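///
/// A minimal sketch of the soft-breach shape; the dimension name and numbers
/// are illustrative, and numeric fields parse whether they arrive as `Int`
/// or as strings (see `fcall_field_str`):
///
/// ```ignore
/// let raw = Value::Array(vec![
///     Ok(Value::Int(1)),
///     Ok(Value::SimpleString("SOFT_BREACH".into())),
///     Ok(Value::SimpleString("tokens".into())), // dimension
///     Ok(Value::Int(90)),                       // current usage
///     Ok(Value::Int(100)),                      // soft limit
/// ]);
/// assert!(matches!(
///     parse_report_usage_result(&raw),
///     Ok(ReportUsageResult::SoftBreach { .. })
/// ));
/// ```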
3918fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, ServerError> {
3919    let arr = match raw {
3920        Value::Array(arr) => arr,
3921        _ => return Err(ServerError::Script("ff_report_usage_and_check: expected Array".into())),
3922    };
3923    let status_code = match arr.first() {
3924        Some(Ok(Value::Int(n))) => *n,
3925        _ => {
3926            return Err(ServerError::Script(
3927                "ff_report_usage_and_check: expected Int status code".into(),
3928            ))
3929        }
3930    };
3931    if status_code != 1 {
3932        let error_code = fcall_field_str(arr, 1);
3933        return Err(ServerError::OperationFailed(format!(
3934            "ff_report_usage_and_check failed: {error_code}"
3935        )));
3936    }
3937    let sub_status = fcall_field_str(arr, 1);
3938    match sub_status.as_str() {
3939        "OK" => Ok(ReportUsageResult::Ok),
3940        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
3941        "SOFT_BREACH" => {
3942            let dim = fcall_field_str(arr, 2);
3943            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3944            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3945            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
3946        }
3947        "HARD_BREACH" => {
3948            let dim = fcall_field_str(arr, 2);
3949            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3950            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3951            Ok(ReportUsageResult::HardBreach {
3952                dimension: dim,
3953                current_usage: current,
3954                hard_limit: limit,
3955            })
3956        }
3957        _ => Err(ServerError::OperationFailed(format!(
3958            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
3959        ))),
3960    }
3961}
3962
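/// Parse the raw `ff_revoke_lease` FCALL response. Expected shapes, inferred
/// from the field positions read below: `{1, "ALREADY_SATISFIED", reason}`
/// and `{1, <sub>, lease_id, lease_epoch}` for a fresh revocation.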
3963fn parse_revoke_lease_result(raw: &Value) -> Result<RevokeLeaseResult, ServerError> {
3964    let arr = match raw {
3965        Value::Array(arr) => arr,
3966        _ => return Err(ServerError::Script("ff_revoke_lease: expected Array".into())),
3967    };
3968    let status = match arr.first() {
3969        Some(Ok(Value::Int(n))) => *n,
3970        _ => return Err(ServerError::Script("ff_revoke_lease: bad status code".into())),
3971    };
3972    if status == 1 {
3973        let sub = fcall_field_str(arr, 1);
3974        if sub == "ALREADY_SATISFIED" {
3975            let reason = fcall_field_str(arr, 2);
3976            Ok(RevokeLeaseResult::AlreadySatisfied { reason })
3977        } else {
3978            let lid = fcall_field_str(arr, 2);
3979            let epoch = fcall_field_str(arr, 3);
3980            Ok(RevokeLeaseResult::Revoked {
3981                lease_id: lid,
3982                lease_epoch: epoch,
3983            })
3984        }
3985    } else {
3986        let error_code = fcall_field_str(arr, 1);
3987        Err(ServerError::OperationFailed(format!(
3988            "ff_revoke_lease failed: {error_code}"
3989        )))
3990    }
3991}
3992
3993/// Detect Valkey errors indicating the Lua function library is not loaded.
3994///
3995/// After a failover, the new primary may not have the library if replication
3996/// was incomplete. Valkey returns `ERR Function not loaded` for FCALL calls
3997/// targeting missing functions.
3998fn is_function_not_loaded(e: &ferriskey::Error) -> bool {
3999    if matches!(e.kind(), ferriskey::ErrorKind::NoScriptError) {
4000        return true;
4001    }
4002    e.detail()
4003        .map(|d| {
4004            d.contains("Function not loaded")
4005                || d.contains("No matching function")
4006                || d.contains("function not found")
4007        })
4008        .unwrap_or(false)
4009        || e.to_string().contains("Function not loaded")
4010}
4011
4012/// Free-function form of [`Server::fcall_with_reload`] — callable from
4013/// background tasks that own a cloned `Client` but no `&Server`.
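///
/// Typical call shape, mirroring the member-cancel dispatch below:
///
/// ```ignore
/// let raw = fcall_with_reload_on_client(
///     &client, "ff_cancel_execution", &key_refs, &arg_refs,
/// ).await?;
/// ```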
4014async fn fcall_with_reload_on_client(
4015    client: &Client,
4016    function: &str,
4017    keys: &[&str],
4018    args: &[&str],
4019) -> Result<Value, ServerError> {
4020    match client.fcall(function, keys, args).await {
4021        Ok(v) => Ok(v),
4022        Err(e) if is_function_not_loaded(&e) => {
4023            tracing::warn!(function, "Lua library not found on server, reloading");
4024            ff_script::loader::ensure_library(client)
4025                .await
4026                .map_err(ServerError::LibraryLoad)?;
4027            client
4028                .fcall(function, keys, args)
4029                .await
4030                .map_err(ServerError::from)
4031        }
4032        Err(e) => Err(ServerError::from(e)),
4033    }
4034}
4035
4036/// Build the `ff_cancel_execution` KEYS (21) and ARGV (5) by pre-reading
4037/// dynamic fields from `exec_core`. Shared by [`Server::cancel_execution`]
4038/// and the async cancel_flow member-dispatch path.
4039async fn build_cancel_execution_fcall(
4040    client: &Client,
4041    partition_config: &PartitionConfig,
4042    args: &CancelExecutionArgs,
4043) -> Result<(Vec<String>, Vec<String>), ServerError> {
4044    let partition = execution_partition(&args.execution_id, partition_config);
4045    let ctx = ExecKeyContext::new(&partition, &args.execution_id);
4046    let idx = IndexKeys::new(&partition);
4047
4048    let lane_str: Option<String> = client
4049        .hget(&ctx.core(), "lane_id")
4050        .await
4051        .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
4052    let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
4053
4054    let dyn_fields: Vec<Option<String>> = client
4055        .cmd("HMGET")
4056        .arg(ctx.core())
4057        .arg("current_attempt_index")
4058        .arg("current_waitpoint_id")
4059        .arg("current_worker_instance_id")
4060        .execute()
4061        .await
4062        .map_err(|e| crate::server::backend_context(e, "HMGET cancel pre-read"))?;
4063
4064    let att_idx_val = dyn_fields.first()
4065        .and_then(|v| v.as_ref())
4066        .and_then(|s| s.parse::<u32>().ok())
4067        .unwrap_or(0);
4068    let att_idx = AttemptIndex::new(att_idx_val);
4069    let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4070    let wp_id = if wp_id_str.is_empty() {
4071        WaitpointId::new()
4072    } else {
4073        WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
4074    };
4075    let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4076    let wiid = WorkerInstanceId::new(&wiid_str);
4077
4078    let keys: Vec<String> = vec![
4079        ctx.core(),                              // 1
4080        ctx.attempt_hash(att_idx),               // 2
4081        ctx.stream_meta(att_idx),                // 3
4082        ctx.lease_current(),                     // 4
4083        ctx.lease_history(),                     // 5
4084        idx.lease_expiry(),                      // 6
4085        idx.worker_leases(&wiid),                // 7
4086        ctx.suspension_current(),                // 8
4087        ctx.waitpoint(&wp_id),                   // 9
4088        ctx.waitpoint_condition(&wp_id),         // 10
4089        idx.suspension_timeout(),                // 11
4090        idx.lane_terminal(&lane),                // 12
4091        idx.attempt_timeout(),                   // 13
4092        idx.execution_deadline(),                // 14
4093        idx.lane_eligible(&lane),                // 15
4094        idx.lane_delayed(&lane),                 // 16
4095        idx.lane_blocked_dependencies(&lane),    // 17
4096        idx.lane_blocked_budget(&lane),          // 18
4097        idx.lane_blocked_quota(&lane),           // 19
4098        idx.lane_blocked_route(&lane),           // 20
4099        idx.lane_blocked_operator(&lane),        // 21
4100    ];
4101    let argv: Vec<String> = vec![
4102        args.execution_id.to_string(),
4103        args.reason.clone(),
4104        args.source.to_string(),
4105        args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
4106        args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
4107    ];
4108    Ok((keys, argv))
4109}
4110
4111/// Backoff schedule for transient Valkey errors during async cancel_flow
4112/// dispatch. Length = retry-attempt count (including the initial attempt).
4113/// The last entry is not slept on because it's the final attempt.
4114const CANCEL_MEMBER_RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2_000];
4115
4116/// Reach into a `ServerError` for a transport-layer retryability hint.
4117/// Matches the variants that can carry a backend transport fault:
4118/// direct `Backend`, `BackendContext`, and `LibraryLoad` (which
4119/// wraps a ferriskey error internally).
4120///
4121/// Renamed from `extract_valkey_kind` in #88 — the previous return
4122/// type `Option<ferriskey::ErrorKind>` leaked ferriskey into the
4123/// server's internal retry logic. Callers now get a backend-agnostic
4124/// [`BackendErrorKind`] and dispatch via
4125/// [`BackendErrorKind::is_retryable`].
4126fn extract_backend_kind(e: &ServerError) -> Option<ff_core::BackendErrorKind> {
4127    e.backend_kind()
4128}
4129
4140/// Acknowledge that a member cancel has committed. Fires
4141/// `ff_ack_cancel_member` on `{fp:N}` to SREM the execution from the
4142/// flow's `pending_cancels` set and, if empty, ZREM the flow from the
4143/// partition-level `cancel_backlog`. Best-effort — failures are logged
4144/// but not propagated, since the reconciler will catch anything that
4145/// stays behind on its next pass.
4146async fn ack_cancel_member(
4147    client: &Client,
4148    pending_cancels_key: &str,
4149    cancel_backlog_key: &str,
4150    eid_str: &str,
4151    flow_id: &str,
4152) {
4153    let keys = [pending_cancels_key, cancel_backlog_key];
4154    let args_v = [eid_str, flow_id];
4155    let res: Result<Value, _> =
4156        client.fcall("ff_ack_cancel_member", &keys, &args_v).await;
4157    if let Err(e) = res {
4158        tracing::warn!(
4159            flow_id = %flow_id,
4160            execution_id = %eid_str,
4161            error = %e,
4162            "ff_ack_cancel_member failed; reconciler will drain on next pass"
4163        );
4164    }
4165}
4166
4167/// Returns true if a cancel_member failure reflects an already-terminal
4168/// (or never-existed) execution, which from the flow-cancel perspective
4169/// is ack-worthy success rather than a partial failure. The Lua
4170/// `ff_cancel_execution` function emits `execution_not_active` when the
4171/// member is already in a terminal phase, and `execution_not_found` when
4172/// the key is gone. Both codes arrive here wrapped in
4173/// `ServerError::OperationFailed("ff_cancel_execution failed: <code>")`
4174/// via `parse_cancel_result`.
4175fn is_terminal_ack_error(err: &ServerError) -> bool {
4176    match err {
4177        ServerError::OperationFailed(msg) => {
4178            msg.contains("execution_not_active") || msg.contains("execution_not_found")
4179        }
4180        _ => false,
4181    }
4182}
4183
/// Cancel a single member execution from a cancel_flow dispatch context.
/// Parses the flow-member EID string, builds the FCALL via the shared helper,
/// and executes with the same reload-on-failover semantics as the inline path.
///
/// Wrapped in a bounded retry loop (see [`CANCEL_MEMBER_RETRY_DELAYS_MS`]) so
/// that transient Valkey errors mid-dispatch (failover, `TryAgain`,
/// `ClusterDown`, `IoError`, `FatalSendError`) do not silently leak
/// non-cancelled members. `FatalReceiveError` and non-retryable kinds bubble
/// up immediately — those either indicate the Lua ran server-side anyway or a
/// permanent mismatch that retries cannot fix.
4184async fn cancel_member_execution(
4185    client: &Client,
4186    partition_config: &PartitionConfig,
4187    eid_str: &str,
4188    reason: &str,
4189    now: TimestampMs,
4190) -> Result<(), ServerError> {
4191    let execution_id = ExecutionId::parse(eid_str)
4192        .map_err(|e| ServerError::InvalidInput(format!("bad execution_id '{eid_str}': {e}")))?;
4193    let args = CancelExecutionArgs {
4194        execution_id: execution_id.clone(),
4195        reason: reason.to_owned(),
4196        source: CancelSource::OperatorOverride,
4197        lease_id: None,
4198        lease_epoch: None,
4199        attempt_id: None,
4200        now,
4201    };
4202
4203    let attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
4204    for (attempt_idx, delay_ms) in CANCEL_MEMBER_RETRY_DELAYS_MS.iter().enumerate() {
4205        let is_last = attempt_idx + 1 == attempts;
4206        match try_cancel_member_once(client, partition_config, &args).await {
4207            Ok(()) => return Ok(()),
4208            Err(e) => {
4209                // Only retry transport-layer transients; business-logic
4210                // errors (Script / OperationFailed / NotFound / InvalidInput)
4211                // won't change on retry.
4212                let retryable = extract_backend_kind(&e)
4213                    .map(|k| k.is_retryable())
4214                    .unwrap_or(false);
4215                if !retryable || is_last {
4216                    return Err(e);
4217                }
4218                tracing::debug!(
4219                    execution_id = %execution_id,
4220                    attempt = attempt_idx + 1,
4221                    delay_ms = *delay_ms,
4222                    error = %e,
4223                    "cancel_member_execution: transient error, retrying"
4224                );
4225                tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
4226            }
4227        }
4228    }
4229    // Unreachable: the loop above either returns Ok, returns Err on the
4230    // last attempt, or returns Err on a non-retryable error. Keep a
4231    // defensive fallback for future edits to the retry structure.
4232    Err(ServerError::OperationFailed(format!(
4233        "cancel_member_execution: retries exhausted for {execution_id}"
4234    )))
4235}
4236
4237/// Single cancel attempt — pre-read + FCALL + parse. Factored out so the
4238/// retry loop in [`cancel_member_execution`] can invoke it cleanly.
4239async fn try_cancel_member_once(
4240    client: &Client,
4241    partition_config: &PartitionConfig,
4242    args: &CancelExecutionArgs,
4243) -> Result<(), ServerError> {
4244    let (keys, argv) = build_cancel_execution_fcall(client, partition_config, args).await?;
4245    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
4246    let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
4247    let raw =
4248        fcall_with_reload_on_client(client, "ff_cancel_execution", &key_refs, &arg_refs).await?;
4249    parse_cancel_result(&raw, &args.execution_id).map(|_| ())
4250}
4251
4252fn parse_reset_budget_result(raw: &Value) -> Result<ResetBudgetResult, ServerError> {
4253    let arr = match raw {
4254        Value::Array(arr) => arr,
4255        _ => return Err(ServerError::Script("ff_reset_budget: expected Array".into())),
4256    };
4257    let status = match arr.first() {
4258        Some(Ok(Value::Int(n))) => *n,
4259        _ => return Err(ServerError::Script("ff_reset_budget: bad status code".into())),
4260    };
4261    if status == 1 {
4262        let next_str = fcall_field_str(arr, 2);
4263        let next_ms: i64 = next_str.parse().unwrap_or(0);
4264        Ok(ResetBudgetResult::Reset {
4265            next_reset_at: TimestampMs::from_millis(next_ms),
4266        })
4267    } else {
4268        let error_code = fcall_field_str(arr, 1);
4269        Err(ServerError::OperationFailed(format!(
4270            "ff_reset_budget failed: {error_code}"
4271        )))
4272    }
4273}
4274
4275#[cfg(test)]
4276mod tests {
4277    use super::*;
4278    use ferriskey::ErrorKind;
4279
4280    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
4281        ferriskey::Error::from((kind, "synthetic"))
4282    }
4283
4284    // ── Budget dimension-cap validation (issue #104) ──
4285
4286    #[test]
4287    fn create_budget_rejects_over_cap_dimension_count() {
4288        let n = MAX_BUDGET_DIMENSIONS + 1;
4289        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4290        let hard = vec![1u64; n];
4291        let soft = vec![0u64; n];
4292        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
4293        match err {
4294            ServerError::InvalidInput(msg) => {
4295                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
4296                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
4297                assert!(msg.contains(&format!("got={n}")), "got: {msg}");
4298            }
4299            other => panic!("expected InvalidInput, got {other:?}"),
4300        }
4301    }
4302
4303    #[test]
4304    fn create_budget_accepts_exactly_cap_dimensions() {
4305        let n = MAX_BUDGET_DIMENSIONS;
4306        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4307        let hard = vec![1u64; n];
4308        let soft = vec![0u64; n];
4309        assert!(validate_create_budget_dimensions(&dims, &hard, &soft).is_ok());
4310    }
4311
4312    #[test]
4313    fn create_budget_rejects_hard_limit_length_mismatch() {
4314        let dims = vec!["a".to_string(), "b".to_string()];
4315        let hard = vec![1u64]; // too short
4316        let soft = vec![0u64, 0u64];
4317        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
4318        match err {
4319            ServerError::InvalidInput(msg) => {
4320                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
4321                assert!(msg.contains("hard_limits=1"), "got: {msg}");
4322                assert!(msg.contains("dimensions=2"), "got: {msg}");
4323            }
4324            other => panic!("expected InvalidInput, got {other:?}"),
4325        }
4326    }
4327
4328    #[test]
4329    fn create_budget_rejects_soft_limit_length_mismatch() {
4330        let dims = vec!["a".to_string(), "b".to_string()];
4331        let hard = vec![1u64, 2u64];
4332        let soft = vec![0u64, 0u64, 0u64]; // too long
4333        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
4334        match err {
4335            ServerError::InvalidInput(msg) => {
4336                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
4337                assert!(msg.contains("soft_limits=3"), "got: {msg}");
4338            }
4339            other => panic!("expected InvalidInput, got {other:?}"),
4340        }
4341    }
4342
4343    #[test]
4344    fn report_usage_rejects_over_cap_dimension_count() {
4345        let n = MAX_BUDGET_DIMENSIONS + 1;
4346        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4347        let deltas = vec![1u64; n];
4348        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
4349        match err {
4350            ServerError::InvalidInput(msg) => {
4351                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
4352                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
4353            }
4354            other => panic!("expected InvalidInput, got {other:?}"),
4355        }
4356    }
4357
4358    #[test]
4359    fn report_usage_accepts_exactly_cap_dimensions() {
4360        let n = MAX_BUDGET_DIMENSIONS;
4361        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4362        let deltas = vec![1u64; n];
4363        assert!(validate_report_usage_dimensions(&dims, &deltas).is_ok());
4364    }
4365
4366    #[test]
4367    fn report_usage_rejects_delta_length_mismatch() {
4368        let dims = vec!["a".to_string(), "b".to_string(), "c".to_string()];
4369        let deltas = vec![1u64, 2u64]; // too short
4370        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
4371        match err {
4372            ServerError::InvalidInput(msg) => {
4373                assert!(msg.contains("dimension_delta_array_mismatch"), "got: {msg}");
4374                assert!(msg.contains("dimensions=3"), "got: {msg}");
4375                assert!(msg.contains("deltas=2"), "got: {msg}");
4376            }
4377            other => panic!("expected InvalidInput, got {other:?}"),
4378        }
4379    }
4380
4381    #[test]
4382    fn report_usage_accepts_empty_dimensions() {
4383        // Edge case: zero-dimension report_usage is a no-op that should pass
4384        // validation (Lua handles dim_count=0 correctly).
4385        assert!(validate_report_usage_dimensions(&[], &[]).is_ok());
4386    }
4387
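    // ── Error retryability classification (#88) ──
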
4388    #[test]
4389    fn is_retryable_backend_variant_uses_kind_table() {
4390        // Transport-bucketed: retryable.
4391        assert!(ServerError::from(mk_fk_err(ErrorKind::IoError)).is_retryable());
4392        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
4393        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
4394        // Cluster-bucketed (Moved / Ask / TryAgain / ClusterDown): retryable
4395        // after topology settles — the #88 BackendErrorKind classifier
4396        // treats these as transient cluster-churn, a semantic refinement
4397        // over the previous ff-script retry table.
4398        assert!(ServerError::from(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
4399        assert!(ServerError::from(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());
4400        assert!(ServerError::from(mk_fk_err(ErrorKind::Moved)).is_retryable());
4401        assert!(ServerError::from(mk_fk_err(ErrorKind::Ask)).is_retryable());
4402        // BusyLoading: retryable.
4403        assert!(ServerError::from(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());
4404
4405        // Auth / Protocol / ScriptNotLoaded: terminal.
4406        assert!(!ServerError::from(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
4407        assert!(!ServerError::from(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
4408        assert!(!ServerError::from(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
4409    }
4410
4411    #[test]
4412    fn is_retryable_backend_context_uses_kind_table() {
4413        let err = crate::server::backend_context(mk_fk_err(ErrorKind::IoError), "HGET test");
4414        assert!(err.is_retryable());
4415
4416        let err =
4417            crate::server::backend_context(mk_fk_err(ErrorKind::AuthenticationFailed), "auth");
4418        assert!(!err.is_retryable());
4419    }
4420
4421    #[test]
4422    fn is_retryable_library_load_delegates_to_inner_kind() {
4423        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
4424            mk_fk_err(ErrorKind::IoError),
4425        ));
4426        assert!(err.is_retryable());
4427
4428        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
4429            mk_fk_err(ErrorKind::AuthenticationFailed),
4430        ));
4431        assert!(!err.is_retryable());
4432
4433        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
4434            expected: "1".into(),
4435            got: "2".into(),
4436        });
4437        assert!(!err.is_retryable());
4438    }
4439
4440    #[test]
4441    fn is_retryable_business_logic_variants_are_false() {
4442        assert!(!ServerError::NotFound("x".into()).is_retryable());
4443        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
4444        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
4445        assert!(!ServerError::Script("x".into()).is_retryable());
4446        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
4447    }
4448
4449    #[test]
4450    fn backend_kind_delegates_through_library_load() {
4451        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
4452            mk_fk_err(ErrorKind::ClusterDown),
4453        ));
4454        assert_eq!(err.backend_kind(), Some(ff_core::BackendErrorKind::Cluster));
4455
4456        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
4457            expected: "1".into(),
4458            got: "2".into(),
4459        });
4460        assert_eq!(err.backend_kind(), None);
4461    }
4462
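    // ── FCALL reply parsing and cancel-dispatch helpers (sketch tests) ──
    //
    // Illustrative sketches: they construct synthetic replies with the same
    // `Value` constructors used elsewhere in this module and assert only
    // behavior that the parsers above already document.

    // Happy path of parse_cancel_flow_raw against the documented
    // {1, "OK", cancellation_policy, member...} shape. The policy and member
    // IDs are arbitrary placeholder strings; the parser treats both as opaque.
    #[test]
    fn parse_cancel_flow_raw_collects_members_after_policy() {
        let raw = Value::Array(vec![
            Ok(Value::Int(1)),
            Ok(Value::SimpleString("OK".to_string())),
            Ok(Value::SimpleString("drain".to_string())),
            Ok(Value::SimpleString("exec-a".to_string())),
            Ok(Value::SimpleString("exec-b".to_string())),
        ]);
        match parse_cancel_flow_raw(&raw).unwrap() {
            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
                assert_eq!(policy, "drain");
                assert_eq!(member_execution_ids, vec!["exec-a", "exec-b"]);
            }
            ParsedCancelFlow::AlreadyTerminal => panic!("expected Cancelled"),
        }
    }

    // The flow_already_terminal error code must map to the idempotent
    // AlreadyTerminal outcome rather than an error.
    #[test]
    fn parse_cancel_flow_raw_maps_already_terminal() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(Value::SimpleString("flow_already_terminal".to_string())),
        ]);
        assert!(matches!(
            parse_cancel_flow_raw(&raw).unwrap(),
            ParsedCancelFlow::AlreadyTerminal
        ));
    }

    // fcall_field_str: Int fields stringify, missing indices fall back to "".
    #[test]
    fn fcall_field_str_stringifies_ints_and_defaults_missing() {
        let arr: Vec<Result<Value, ferriskey::Error>> = vec![
            Ok(Value::Int(42)),
            Ok(Value::SimpleString("ok".to_string())),
        ];
        assert_eq!(fcall_field_str(&arr, 0), "42");
        assert_eq!(fcall_field_str(&arr, 1), "ok");
        assert_eq!(fcall_field_str(&arr, 9), "");
    }

    // Both terminal-member error codes from ff_cancel_execution count as
    // ack-worthy; other error variants do not.
    #[test]
    fn terminal_ack_error_matches_only_terminal_codes() {
        let active = ServerError::OperationFailed(
            "ff_cancel_execution failed: execution_not_active".to_string(),
        );
        let gone = ServerError::OperationFailed(
            "ff_cancel_execution failed: execution_not_found".to_string(),
        );
        let other = ServerError::Script("boom".to_string());
        assert!(is_terminal_ack_error(&active));
        assert!(is_terminal_ack_error(&gone));
        assert!(!is_terminal_ack_error(&other));
    }

    // The final backoff entry is never slept on (the loop returns on the last
    // attempt), so worst-case added latency is the sum of all but the last.
    #[test]
    fn cancel_member_retry_schedule_shape() {
        let n = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
        let slept: u64 = CANCEL_MEMBER_RETRY_DELAYS_MS[..n - 1].iter().sum();
        assert_eq!(slept, 100 + 500);
    }
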
4463    // ── Valkey version check (RFC-011 §13) ──
4464
4465    #[test]
4466    fn parse_valkey_version_prefers_valkey_version_over_redis_version() {
4467        // Valkey 8.x+ pins redis_version to 7.2.4 for Redis-client compat
4468        // and exposes the real version in valkey_version. Parser must use
4469        // valkey_version when both are present.
4470        let info = "\
4471# Server\r\n\
4472redis_version:7.2.4\r\n\
4473valkey_version:9.0.3\r\n\
4474server_mode:cluster\r\n\
4475os:Linux\r\n";
4476        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
4477    }
4478
4479    #[test]
4480    fn parse_valkey_version_real_valkey_8_cluster_body() {
4481        // Actual INFO server response observed on valkey/valkey:latest in
4482        // cluster mode (CI matrix): redis_version compat-pinned to 7.2.4,
4483        // valkey_version authoritative at 9.0.3.
4484        let info = "\
4485# Server\r\n\
4486redis_version:7.2.4\r\n\
4487server_name:valkey\r\n\
4488valkey_version:9.0.3\r\n\
4489valkey_release_stage:ga\r\n\
4490redis_git_sha1:00000000\r\n\
4491server_mode:cluster\r\n";
4492        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
4493    }
4494
4495    #[test]
4496    fn parse_valkey_version_falls_back_to_redis_version_on_valkey_7() {
4497        // Valkey 7.x doesn't emit valkey_version, but does emit
4498        // server_name:valkey; parser falls back to redis_version.
4499        let info = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nfoo:bar\r\n";
4500        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
4501    }
4502
4503    #[test]
4504    fn parse_valkey_version_rejects_redis_backend() {
4505        // Real Redis emits redis_version: but not server_name:valkey and
4506        // not valkey_version:. Parser must reject this affirmatively so an
4507        // operator pointing ff-server at a Redis instance fails loud instead
4508        // of silently running against an unsupported backend.
4509        let info = "\
4510# Server\r\n\
4511redis_version:7.4.0\r\n\
4512redis_mode:standalone\r\n\
4513os:Linux\r\n";
4514        let err = parse_valkey_version(info).unwrap_err();
4515        assert!(matches!(err, ServerError::OperationFailed(_)));
4516        let msg = err.to_string();
4517        assert!(
4518            msg.contains("Redis is not supported") && msg.contains("server_name:valkey"),
4519            "expected Redis-rejection message, got: {msg}"
4520        );
4521    }
4522
4523    #[test]
4524    fn parse_valkey_version_accepts_valkey_7_marker_case_insensitively() {
4525        // INFO keys and values are conventionally lowercase, but be defensive about casing.
4526        let info = "redis_version:7.2.0\r\nSERVER_NAME:Valkey\r\n";
4527        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
4528    }
4529
4530    #[test]
4531    fn parse_valkey_version_errors_when_no_version_field() {
4532        let info = "# Server\r\nfoo:bar\r\n";
4533        let err = parse_valkey_version(info).unwrap_err();
4534        assert!(matches!(err, ServerError::OperationFailed(_)));
4535        assert!(
4536            err.to_string().contains("missing"),
4537            "expected 'missing' in message, got: {err}"
4538        );
4539    }
4540
4541    #[test]
4542    fn parse_valkey_version_errors_on_non_numeric_major() {
4543        let info = "valkey_version:invalid.x.y\n";
4544        let err = parse_valkey_version(info).unwrap_err();
4545        assert!(matches!(err, ServerError::OperationFailed(_)));
4546        assert!(err.to_string().contains("non-numeric major"));
4547    }
4548
4549    #[test]
4550    fn parse_valkey_version_errors_on_non_numeric_minor() {
4551        let info = "valkey_version:7.x.0\n";
4552        let err = parse_valkey_version(info).unwrap_err();
4553        assert!(matches!(err, ServerError::OperationFailed(_)));
4554        assert!(err.to_string().contains("non-numeric minor"));
4555    }
4556
4557    #[test]
4558    fn parse_valkey_version_errors_on_missing_minor() {
4559        // Bare major (no dot) — cannot be compared against a (major, minor)
4560        // floor. Flag as a real parse error.
4561        let info = "valkey_version:7\n";
4562        let err = parse_valkey_version(info).unwrap_err();
4563        assert!(matches!(err, ServerError::OperationFailed(_)));
4564        assert!(err.to_string().contains("missing minor"));
4565    }
4566
4567    #[test]
4568    fn extract_info_bodies_unwraps_cluster_map_all_entries() {
4569        // Simulates cluster-mode INFO response: map of node_addr → body.
4570        // extract_info_bodies must return EVERY node's body so a stale
4571        // pre-upgrade replica cannot hide behind an upgraded primary.
4572        let body_a = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
4573        let body_b = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:8.0.0\r\n";
4574        let map = Value::Map(vec![
4575            (
4576                Value::SimpleString("127.0.0.1:7000".to_string()),
4577                Value::VerbatimString {
4578                    format: ferriskey::value::VerbatimFormat::Text,
4579                    text: body_a.to_string(),
4580                },
4581            ),
4582            (
4583                Value::SimpleString("127.0.0.1:7001".to_string()),
4584                Value::VerbatimString {
4585                    format: ferriskey::value::VerbatimFormat::Text,
4586                    text: body_b.to_string(),
4587                },
4588            ),
4589        ]);
4590        let bodies = extract_info_bodies(&map).unwrap();
4591        assert_eq!(bodies.len(), 2);
4592        assert_eq!(bodies[0], body_a);
4593        assert_eq!(bodies[1], body_b);
4594    }
4595
4596    #[test]
4597    fn extract_info_bodies_handles_simple_string() {
4598        let body_text = "redis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
4599        let v = Value::SimpleString(body_text.to_string());
4600        let bodies = extract_info_bodies(&v).unwrap();
4601        assert_eq!(bodies, vec![body_text.to_string()]);
4602    }
4603
4604    #[test]
4605    fn extract_info_bodies_rejects_empty_cluster_map() {
4606        let map = Value::Map(vec![]);
4607        let err = extract_info_bodies(&map).unwrap_err();
4608        assert!(matches!(err, ServerError::OperationFailed(_)));
4609        assert!(err.to_string().contains("empty map"));
4610    }
4611
4612    /// End-to-end composition test for the cluster-min fix (issue #84):
4613    /// `extract_info_bodies` → `parse_valkey_version` per node → min-reduce →
4614    /// floor comparison. A mixed-version cluster where one node is 7.1.0 must
4615    /// fail the gate, even if another node is already on 8.0.0 and that
4616    /// node's entry appears first in the map.
4617    #[test]
4618    fn parse_valkey_version_min_across_cluster_map_picks_lowest() {
4619        // node1 appears first and is above the floor. Pre-fix behavior
4620        // (first-entry only) would accept. The min across all three nodes is
4621        // (7, 1), below the (7, 2) floor, so the gate must reject.
4622        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
4623        let body_node2 = "# Server\r\nredis_version:7.1.0\r\nserver_name:valkey\r\n";
4624        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
4625        let map = Value::Map(vec![
4626            (
4627                Value::SimpleString("node1:6379".to_string()),
4628                Value::VerbatimString {
4629                    format: ferriskey::value::VerbatimFormat::Text,
4630                    text: body_node1.to_string(),
4631                },
4632            ),
4633            (
4634                Value::SimpleString("node2:6379".to_string()),
4635                Value::VerbatimString {
4636                    format: ferriskey::value::VerbatimFormat::Text,
4637                    text: body_node2.to_string(),
4638                },
4639            ),
4640            (
4641                Value::SimpleString("node3:6379".to_string()),
4642                Value::VerbatimString {
4643                    format: ferriskey::value::VerbatimFormat::Text,
4644                    text: body_node3.to_string(),
4645                },
4646            ),
4647        ]);
4648
4649        let bodies = extract_info_bodies(&map).unwrap();
4650        let min = bodies
4651            .iter()
4652            .map(|b| parse_valkey_version(b).unwrap())
4653            .min()
4654            .unwrap();
4655
4656        assert_eq!(min, (7, 1), "min across cluster must be the lowest node");
4657        assert!(
4658            min < (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
4659            "mixed-version cluster with 7.1.0 node must fail the (7,2) gate"
4660        );
4661    }
4662
4663    /// Companion to `parse_valkey_version_min_across_cluster_map_picks_lowest`:
4664    /// when every node is at or above the floor, the min-reduce + gate
4665    /// composition accepts.
4666    #[test]
4667    fn parse_valkey_version_all_nodes_at_or_above_floor_accepts() {
4668        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
4669        let body_node2 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
4670        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:9.0.3\r\n";
4671        let map = Value::Map(vec![
4672            (
4673                Value::SimpleString("node1:6379".to_string()),
4674                Value::VerbatimString {
4675                    format: ferriskey::value::VerbatimFormat::Text,
4676                    text: body_node1.to_string(),
4677                },
4678            ),
4679            (
4680                Value::SimpleString("node2:6379".to_string()),
4681                Value::VerbatimString {
4682                    format: ferriskey::value::VerbatimFormat::Text,
4683                    text: body_node2.to_string(),
4684                },
4685            ),
4686            (
4687                Value::SimpleString("node3:6379".to_string()),
4688                Value::VerbatimString {
4689                    format: ferriskey::value::VerbatimFormat::Text,
4690                    text: body_node3.to_string(),
4691                },
4692            ),
4693        ]);
4694
4695        let bodies = extract_info_bodies(&map).unwrap();
4696        let min = bodies
4697            .iter()
4698            .map(|b| parse_valkey_version(b).unwrap())
4699            .min()
4700            .unwrap();
4701
4702        assert_eq!(min, (7, 2), "min across cluster is the lowest node (7.2)");
4703        assert!(
4704            min >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
4705            "all-above-floor cluster must pass the gate"
4706        );
4707    }
4708
4709    #[test]
4710    fn valkey_version_too_low_is_not_retryable() {
4711        let err = ServerError::ValkeyVersionTooLow {
4712            detected: "7.0".into(),
4713            required: "7.2".into(),
4714        };
4715        assert!(!err.is_retryable());
4716        assert_eq!(err.backend_kind(), None);
4717    }
4718
4719    #[test]
4720    fn valkey_version_too_low_error_message_includes_both_versions() {
4721        let err = ServerError::ValkeyVersionTooLow {
4722            detected: "7.0".into(),
4723            required: "7.2".into(),
4724        };
4725        let msg = err.to_string();
4726        assert!(msg.contains("7.0"), "detected version in message: {msg}");
4727        assert!(msg.contains("7.2"), "required version in message: {msg}");
4728        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
4729    }
4730}