ff_server/server.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use ferriskey::{Client, ClientBuilder, Value};
6use tokio::sync::Mutex as AsyncMutex;
7use tokio::task::JoinSet;
8use ff_core::contracts::{
9    AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
10    CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
11    CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
12    CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
13    ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
14    DeliverSignalArgs, DeliverSignalResult, ExecutionInfo, ExecutionSummary,
15    ListExecutionsResult, PendingWaitpointInfo, ReplayExecutionResult,
16    ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
17    RevokeLeaseResult,
18    RotateWaitpointHmacSecretArgs,
19    StageDependencyEdgeArgs, StageDependencyEdgeResult,
20};
21use ff_core::keys::{
22    self, usage_dedup_key, BudgetKeyContext, ExecKeyContext, FlowIndexKeys, FlowKeyContext,
23    IndexKeys, QuotaKeyContext,
24};
25use ff_core::partition::{
26    budget_partition, execution_partition, flow_partition, quota_partition, Partition,
27    PartitionConfig, PartitionFamily,
28};
29use ff_core::state::{PublicState, StateVector};
30use ff_core::types::*;
31use ff_engine::Engine;
32use ff_script::retry::is_retryable_kind;
33
34use crate::config::ServerConfig;
35
36/// Upper bound on `member_execution_ids` returned in the
37/// [`CancelFlowResult::Cancelled`] response when the flow was already in a
38/// terminal state (idempotent retry). The first (non-idempotent) cancel call
39/// returns the full list; retries only need a sample.
40const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;
41
42/// Re-export of the budget dimension cap.
43///
44/// Defined as the single source of truth in `ff_script::functions::budget` so
45/// the typed FCALL wrappers and the REST boundary cannot silently drift
46/// (PR #106 review). The limit exists to cap FCALL ARGV allocation: both
47/// `create_budget` and `report_usage` build argv whose length is linear in
48/// `dimensions.len()`, so an untrusted caller could otherwise request an
49/// unbounded `Vec` allocation (CodeQL `rust/uncontrolled-allocation-size`,
50/// issue #104).
51pub(crate) use ff_script::functions::budget::MAX_BUDGET_DIMENSIONS;
52
53/// Validate `create_budget` dimension inputs before building the FCALL argv.
54///
55/// Rejects:
56///   * more than [`MAX_BUDGET_DIMENSIONS`] dimensions (prevents unbounded
57///     `Vec::with_capacity` allocation on attacker-controlled length);
58///   * parallel-array length mismatches between `dimensions`, `hard_limits`,
59///     and `soft_limits` — these are positional inputs the Lua side indexes
60///     by `i = 1..dim_count`, so a mismatch silently corrupts limits rather
61///     than raising.
62fn validate_create_budget_dimensions(
63    dimensions: &[String],
64    hard_limits: &[u64],
65    soft_limits: &[u64],
66) -> Result<(), ServerError> {
67    let dim_count = dimensions.len();
68    if dim_count > MAX_BUDGET_DIMENSIONS {
69        return Err(ServerError::InvalidInput(format!(
70            "too_many_dimensions: limit={}, got={}",
71            MAX_BUDGET_DIMENSIONS, dim_count
72        )));
73    }
74    if hard_limits.len() != dim_count {
75        return Err(ServerError::InvalidInput(format!(
76            "dimension_limit_array_mismatch: dimensions={} hard_limits={}",
77            dim_count,
78            hard_limits.len()
79        )));
80    }
81    if soft_limits.len() != dim_count {
82        return Err(ServerError::InvalidInput(format!(
83            "dimension_limit_array_mismatch: dimensions={} soft_limits={}",
84            dim_count,
85            soft_limits.len()
86        )));
87    }
88    Ok(())
89}
90
91/// Validate `report_usage` dimension inputs before building the FCALL argv.
92///
93/// Same class of defense as [`validate_create_budget_dimensions`]: caps
94/// argv length and enforces the `dimensions`/`deltas` parallel-array
95/// invariant the Lua side relies on.
96fn validate_report_usage_dimensions(
97    dimensions: &[String],
98    deltas: &[u64],
99) -> Result<(), ServerError> {
100    let dim_count = dimensions.len();
101    if dim_count > MAX_BUDGET_DIMENSIONS {
102        return Err(ServerError::InvalidInput(format!(
103            "too_many_dimensions: limit={}, got={}",
104            MAX_BUDGET_DIMENSIONS, dim_count
105        )));
106    }
107    if deltas.len() != dim_count {
108        return Err(ServerError::InvalidInput(format!(
109            "dimension_delta_array_mismatch: dimensions={} deltas={}",
110            dim_count,
111            deltas.len()
112        )));
113    }
114    Ok(())
115}
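
// Illustrative unit-test sketch (not part of the existing suite): exercises the
// two rejection classes the validators above enforce — the dimension cap and
// the parallel-array length invariant.
#[cfg(test)]
mod dimension_validation_sketch {
    use super::*;

    #[test]
    fn rejects_mismatched_parallel_arrays() {
        let dims = vec!["tokens".to_owned(), "cost_usd".to_owned()];
        let hard = vec![1_000, 50];
        let soft = vec![800]; // one entry short of `dims` — positional inputs must align
        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
        assert!(matches!(err, ServerError::InvalidInput(_)));
    }

    #[test]
    fn rejects_dimension_counts_over_the_cap() {
        let dims = vec!["d".to_owned(); MAX_BUDGET_DIMENSIONS + 1];
        let deltas = vec![1u64; MAX_BUDGET_DIMENSIONS + 1];
        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
        assert!(matches!(err, ServerError::InvalidInput(_)));
    }
}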
116
117/// FlowFabric server — connects everything together.
118///
119/// Manages the Valkey connection, Lua library loading, background scanners,
120/// and provides a minimal API for Phase 1.
121pub struct Server {
122    client: Client,
123    /// Dedicated Valkey connection used EXCLUSIVELY for stream-op calls:
124    /// `xread_block` tails AND `ff_read_attempt_stream` range reads.
125    /// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
126    /// processes commands FIFO on it.
127    ///
128    /// Two head-of-line risks motivate the split from the main client:
129    ///
130    /// * **Blocking**: `XREAD BLOCK 30_000` holds the read side until a
131    ///   new entry arrives or `block_ms` elapses.
132    /// * **Large replies**: `XRANGE … COUNT 10_000` with ~64 KB per
133    ///   frame returns a multi-MB reply serialized on one connection.
134    ///
135    /// Sharing either load with the main client would starve every other
136    /// FCALL (create_execution, claim, rotate_waitpoint_secret,
137    /// budget/quota, admin endpoints) AND every engine scanner.
138    ///
139    /// Kept separate from `client` and from the `Engine` scanner client so
140    /// tail latency cannot couple to foreground API latency or background
141    /// scanner cadence. See RFC-006 Impl Notes for the cascading-failure
142    /// rationale.
143    tail_client: Client,
144    /// Bounds concurrent stream-op calls server-wide — read AND tail
145    /// combined. Each caller acquires one permit for the duration of its
146    /// Valkey round-trip(s); contention surfaces as HTTP 429 at the REST
147    /// boundary, not as a silent queue on the stream connection. Default
148    /// size is `FF_MAX_CONCURRENT_STREAM_OPS` (64; legacy env
149    /// `FF_MAX_CONCURRENT_TAIL` accepted for one release).
150    ///
151    /// Read and tail share the same pool deliberately: they share the
152    /// `tail_client`, so fairness accounting must be unified or a flood
153    /// of one can starve the other. The semaphore is also `close()`d on
154    /// shutdown so no new stream ops can start while existing ones drain
155    /// (see `Server::shutdown`).
156    stream_semaphore: Arc<tokio::sync::Semaphore>,
157    /// Serializes `XREAD BLOCK` calls against `tail_client`.
158    ///
159    /// `ferriskey::Client` is a pipelined multiplexed connection — Valkey
160    /// processes commands FIFO on one socket. `XREAD BLOCK` holds the
161    /// connection's read side for the full `block_ms`, so two parallel
162    /// BLOCKs sent down the same mux serialize: the second waits for the
163    /// first to return before its own BLOCK even begins at the server.
164    /// Meanwhile ferriskey's per-call `request_timeout` (auto-extended to
165    /// `block_ms + 500ms`) starts at future-poll on the CLIENT side, so
166    /// the second call's timeout fires before its turn at the server —
167    /// spurious `timed_out` errors under concurrent tail load.
168    ///
169    /// Explicit serialization around `xread_block` removes the
170    /// silent-failure mode: concurrent tails queue on this Mutex (inside
171    /// an already-acquired semaphore permit), then dispatch one at a
172    /// time with their full `block_ms` budget intact. The semaphore
173    /// ceiling (`max_concurrent_stream_ops`) effectively becomes queue
174    /// depth; throughput on the tail client is 1 BLOCK at a time.
175    ///
176    /// V2 upgrade: a pool of N dedicated `ferriskey::Client` connections
177    /// replacing the single `tail_client` + this Mutex. Deferred; the
178    /// Mutex here is correct v1 behavior.
179    ///
180    /// XRANGE reads (`read_attempt_stream`) are NOT gated by this Mutex —
181    /// XRANGE is non-blocking at the server, so pipelined XRANGEs on one
182    /// mux complete in microseconds each and don't trigger the same
183    /// client-side timeout race. Keeping reads unserialized preserves
184    /// read throughput. (A usage sketch of the permit-then-lock ordering follows this struct.)
185    xread_block_lock: Arc<tokio::sync::Mutex<()>>,
186    /// Server-wide Semaphore(1) gating admin rotate calls. Legitimate
187    /// operators rotate ~monthly and can afford to serialize; concurrent
188    /// rotate requests are an attack or misbehaving script. Holding the
189    /// permit also guards against interleaved partial rotations on the
190    /// Server side of the per-partition locks, surfacing contention as
191    /// HTTP 429 instead of silently queueing and blowing past the 120s
192    /// HTTP timeout. See `rotate_waitpoint_secret` handler.
193    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
194    engine: Engine,
195    config: ServerConfig,
196    /// Long-lived scheduler instance. Held on the server (not rebuilt per
197    /// claim call) so its rotation cursor can advance across calls — a
198    /// fresh-per-call scheduler would reset the cursor on every tick,
199    /// defeating the fairness property (RFC-009 §scan rotation).
200    scheduler: Arc<ff_scheduler::Scheduler>,
201    /// Background tasks spawned by async handlers (e.g. cancel_flow member
202    /// dispatch). Drained on shutdown with a bounded timeout.
203    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
204    /// PR-94: observability registry. Always present; the no-op shim
205    /// takes zero runtime cost when the `observability` feature is
206    /// off, and the real OTEL-backed registry is passed in via
207    /// [`Server::start_with_metrics`] when on. Same `Arc` is shared
208    /// with [`Engine::start_with_metrics`] and
209    /// [`ff_scheduler::Scheduler::with_metrics`] so a single scrape
210    /// sees everything the process produces.
211    metrics: Arc<ff_observability::Metrics>,
212}
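
// Condensed sketch (illustrative only — the real tail handlers live further down
// this file) of the ordering the `stream_semaphore` and `xread_block_lock` field
// docs above describe for a blocking tail. The helper name, the hard-coded 64
// (the documented default pool size), and the error mapping are assumptions for
// the example, not an API of this module.
#[allow(dead_code)]
async fn tail_ordering_sketch(server: &Server) -> Result<(), ServerError> {
    // 1. Bound server-wide stream-op concurrency. Exhaustion (or a semaphore
    //    closed during shutdown) surfaces as an explicit retryable error rather
    //    than a silent queue on the stream connection.
    let _permit = server
        .stream_semaphore
        .try_acquire()
        .map_err(|_| ServerError::ConcurrencyLimitExceeded("stream_ops", 64))?;

    // 2. Only one XREAD BLOCK may be in flight on `tail_client` at a time, so
    //    each call keeps its full `block_ms` budget instead of burning it while
    //    queued behind another tail on the same mux.
    let _guard = server.xread_block_lock.lock().await;

    // 3. ... dispatch the XREAD BLOCK on `server.tail_client` here ...
    Ok(())
}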
213
214/// Server error type.
215#[derive(Debug, thiserror::Error)]
216pub enum ServerError {
217    /// Valkey connection or command error (preserves ErrorKind for caller inspection).
218    #[error("valkey: {0}")]
219    Valkey(#[from] ferriskey::Error),
220    /// Valkey error with additional context (preserves ErrorKind via #[source]).
221    #[error("valkey ({context}): {source}")]
222    ValkeyContext {
223        #[source]
224        source: ferriskey::Error,
225        context: String,
226    },
227    #[error("config: {0}")]
228    Config(#[from] crate::config::ConfigError),
229    #[error("library load: {0}")]
230    LibraryLoad(#[from] ff_script::loader::LoadError),
231    #[error("partition mismatch: {0}")]
232    PartitionMismatch(String),
233    #[error("not found: {0}")]
234    NotFound(String),
235    #[error("invalid input: {0}")]
236    InvalidInput(String),
237    #[error("operation failed: {0}")]
238    OperationFailed(String),
239    #[error("script: {0}")]
240    Script(String),
241    /// Server-wide concurrency limit reached on a labelled pool. Surfaces
242    /// as HTTP 429 at the REST boundary so load balancers and clients can
243    /// retry with backoff. The `source` label ("stream_ops", "admin_rotate",
244    /// etc.) identifies WHICH pool is exhausted so operators aren't
245    /// misled by a single "tail unavailable" string when the real fault
246    /// is rotation contention.
247    ///
248    /// Fields: (source_label, max_permits).
249    #[error("too many concurrent {0} calls (max: {1})")]
250    ConcurrencyLimitExceeded(&'static str, u32),
251    /// Detected Valkey version is below the RFC-011 §13 minimum. The engine
252    /// depends on Valkey Functions (stabilized in 7.2), RESP3 (7.2+), and
253    /// hash-tag routing; older versions do not implement the required
254    /// primitives. Operator action: upgrade Valkey.
255    #[error(
256        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
257    )]
258    ValkeyVersionTooLow {
259        detected: String,
260        required: String,
261    },
262}
263
264impl ServerError {
265    /// Returns the underlying ferriskey ErrorKind, if this error carries one.
266    /// Covers direct Valkey variants and library-load failures that bubble a
267    /// `ferriskey::Error` through `LoadError::Valkey`.
268    pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
269        match self {
270            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
271            Self::LibraryLoad(e) => e.valkey_kind(),
272            _ => None,
273        }
274    }
275
276    /// Whether this error is safely retryable by a caller. Semantics match
277    /// `ScriptError::class() == Retryable` for Lua errors plus a kind-aware
278    /// check for transport/library-load failures. Business-logic rejections
279    /// (NotFound, InvalidInput, OperationFailed, Script, Config, PartitionMismatch)
280    /// return false — those won't change on retry.
281    pub fn is_retryable(&self) -> bool {
282        match self {
283            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
284                is_retryable_kind(e.kind())
285            }
286            Self::LibraryLoad(load_err) => load_err
287                .valkey_kind()
288                .map(is_retryable_kind)
289                .unwrap_or(false),
290            Self::Config(_)
291            | Self::PartitionMismatch(_)
292            | Self::NotFound(_)
293            | Self::InvalidInput(_)
294            | Self::OperationFailed(_)
295            | Self::Script(_) => false,
296            // Back off and retry — the bound is a server-side permit pool,
297            // so the retry will succeed once a permit frees up. Applies
298            // equally to stream ops, admin rotate, etc.
299            Self::ConcurrencyLimitExceeded(_, _) => true,
300            // Operator must upgrade Valkey; a retry at the caller won't help.
301            Self::ValkeyVersionTooLow { .. } => false,
302        }
303    }
304}
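
// Illustrative caller-side sketch (not an API of this crate): a bounded retry
// wrapper driven by `ServerError::is_retryable`, as discussed above. The
// attempt budget and backoff base are arbitrary example values.
#[allow(dead_code)]
async fn create_execution_with_retry(
    server: &Server,
    args: &CreateExecutionArgs,
) -> Result<CreateExecutionResult, ServerError> {
    const MAX_ATTEMPTS: u32 = 3;
    let mut attempt = 0u32;
    loop {
        match server.create_execution(args).await {
            Ok(result) => return Ok(result),
            Err(err) if err.is_retryable() && attempt + 1 < MAX_ATTEMPTS => {
                attempt += 1;
                // Exponential backoff: 100ms, 200ms, ... before the next try.
                tokio::time::sleep(Duration::from_millis(100u64 << (attempt - 1))).await;
            }
            // Business-logic rejections (NotFound, InvalidInput, ...) and an
            // exhausted attempt budget propagate unchanged.
            Err(err) => return Err(err),
        }
    }
}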
305
306impl Server {
307    /// Start the FlowFabric server.
308    ///
309    /// Boot sequence:
310    /// 1. Connect to Valkey
311    /// 2. Validate or create partition config key
312    /// 3. Load the FlowFabric Lua library
313    /// 4. Start engine (14 background scanners)
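    ///
    /// # Example
    ///
    /// Minimal boot sketch. `load_config()` is a hypothetical stand-in for
    /// however the binary builds its `ServerConfig`:
    ///
    /// ```ignore
    /// let config: ServerConfig = load_config()?;
    /// let server = Server::start(config).await?;
    /// tracing::info!(
    ///     flow_partitions = server.partition_config().num_flow_partitions,
    ///     "FlowFabric server booted"
    /// );
    /// ```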
314    pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
315        Self::start_with_metrics(config, Arc::new(ff_observability::Metrics::new())).await
316    }
317
318    /// PR-94: boot the server with a shared observability registry.
319    ///
320    /// Scanner cycle + scheduler metrics record into this registry;
321    /// `main.rs` threads the same handle into the router so `/metrics`
322    /// exposes what the engine produces. The no-arg [`Server::start`]
323    /// forwards here with a fresh `Metrics::new()` — under the default
324    /// build that's the shim, under `observability` it's a real
325    /// registry not shared with any HTTP route (useful for tests
326    /// exercising the engine in isolation).
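    ///
    /// # Example
    ///
    /// Sketch of sharing one registry between the server and whatever serves
    /// `/metrics` (the exporter wiring itself is out of scope here):
    ///
    /// ```ignore
    /// let metrics = Arc::new(ff_observability::Metrics::new());
    /// let server = Server::start_with_metrics(config, metrics.clone()).await?;
    /// // `metrics` (== `server.metrics()`) is the handle a /metrics route scrapes.
    /// ```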
327    pub async fn start_with_metrics(
328        config: ServerConfig,
329        metrics: Arc<ff_observability::Metrics>,
330    ) -> Result<Self, ServerError> {
331        // Step 1: Connect to Valkey via ClientBuilder
332        tracing::info!(
333            host = %config.host, port = config.port,
334            tls = config.tls, cluster = config.cluster,
335            "connecting to Valkey"
336        );
337        let mut builder = ClientBuilder::new()
338            .host(&config.host, config.port)
339            .connect_timeout(Duration::from_secs(10))
340            .request_timeout(Duration::from_millis(5000));
341        if config.tls {
342            builder = builder.tls();
343        }
344        if config.cluster {
345            builder = builder.cluster();
346        }
347        let client = builder
348            .build()
349            .await
350            .map_err(|e| ServerError::ValkeyContext { source: e, context: "connect".into() })?;
351
352        // Verify connectivity
353        let pong: String = client
354            .cmd("PING")
355            .execute()
356            .await
357            .map_err(|e| ServerError::ValkeyContext { source: e, context: "PING".into() })?;
358        if pong != "PONG" {
359            return Err(ServerError::OperationFailed(format!(
360                "unexpected PING response: {pong}"
361            )));
362        }
363        tracing::info!("Valkey connection established");
364
365        // Step 1b: Verify Valkey version meets the RFC-011 §13 minimum (8.0).
366        // Tolerates a rolling upgrade via a 60s exponential-backoff budget
367        // per RFC-011 §9.17 — transient INFO errors during a node restart
368        // don't trip the check until the whole budget is exhausted.
369        verify_valkey_version(&client).await?;
370
371        // Step 2: Validate or create partition config
372        validate_or_create_partition_config(&client, &config.partition_config).await?;
373
374        // Step 2b: Install waitpoint HMAC secret into every execution partition
375        // (RFC-004 §Waitpoint Security). Fail-fast: if any partition fails,
376        // the server refuses to start — a partial install would silently
377        // reject signal deliveries on half the partitions.
378        initialize_waitpoint_hmac_secret(
379            &client,
380            &config.partition_config,
381            &config.waitpoint_hmac_secret,
382        )
383        .await?;
384
385        // Step 3: Load the Lua library (skippable for tests where the fixture is already loaded)
386        if !config.skip_library_load {
387            tracing::info!("loading flowfabric Lua library");
388            ff_script::loader::ensure_library(&client)
389                .await
390                .map_err(ServerError::LibraryLoad)?;
391        } else {
392            tracing::info!("skipping library load (skip_library_load=true)");
393        }
394
395        // Step 4: Start engine with scanners
396        // Build a fresh EngineConfig rather than cloning (EngineConfig doesn't derive Clone).
397        let engine_cfg = ff_engine::EngineConfig {
398            partition_config: config.partition_config,
399            lanes: config.lanes.clone(),
400            lease_expiry_interval: config.engine_config.lease_expiry_interval,
401            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
402            index_reconciler_interval: config.engine_config.index_reconciler_interval,
403            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
404            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
405            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
406            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
407            budget_reset_interval: config.engine_config.budget_reset_interval,
408            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
409            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
410            unblock_interval: config.engine_config.unblock_interval,
411            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
412            flow_projector_interval: config.engine_config.flow_projector_interval,
413            execution_deadline_interval: config.engine_config.execution_deadline_interval,
414            cancel_reconciler_interval: config.engine_config.cancel_reconciler_interval,
415            scanner_filter: config.engine_config.scanner_filter.clone(),
416        };
417        // Engine scanners keep running on the MAIN `client` mux — NOT on
418        // `tail_client`. Scanner cadence is foreground-latency-coupled by
419        // design (an incident that blocks all FCALLs should also visibly
420        // block scanners), and keeping scanners off the tail client means a
421        // long-poll tail can never starve lease-expiry, retention-trim,
422        // etc. Do not change this without revisiting RFC-006 Impl Notes.
423        // Build the Valkey completion backend (issue #90) and subscribe.
424        // This replaces the pre-#90 `CompletionListenerConfig` path:
425        // the wire subscription now lives in ff-backend-valkey, the
426        // engine just consumes the resulting stream.
427        let mut valkey_conn = ff_core::backend::ValkeyConnection::new(
428            config.host.clone(),
429            config.port,
430        );
431        valkey_conn.tls = config.tls;
432        valkey_conn.cluster = config.cluster;
433        let completion_backend = ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
434            client.clone(),
435            config.partition_config,
436            valkey_conn,
437        );
438        let completion_stream = <ff_backend_valkey::ValkeyBackend as ff_core::completion_backend::CompletionBackend>::subscribe_completions(&completion_backend)
439            .await
440            .map_err(|e| ServerError::OperationFailed(format!(
441                "subscribe_completions: {e}"
442            )))?;
443
444        let engine = Engine::start_with_completions(
445            engine_cfg,
446            client.clone(),
447            metrics.clone(),
448            completion_stream,
449        );
450
451        // Dedicated tail client. Built AFTER library load + HMAC install
452        // because those steps use the main client; tail client only runs
453        // XREAD/XREAD BLOCK + HMGET on stream_meta, so it never needs the
454        // library loaded — but we build it on the same host/port/TLS
455        // options so network reachability is identical.
456        tracing::info!("opening dedicated tail connection");
457        let mut tail_builder = ClientBuilder::new()
458            .host(&config.host, config.port)
459            .connect_timeout(Duration::from_secs(10))
460            // `request_timeout` is ignored for XREAD BLOCK (ferriskey
461            // auto-extends to `block_ms + 500ms` for blocking commands),
462            // but is used for the companion HMGET — 5s matches main.
463            .request_timeout(Duration::from_millis(5000));
464        if config.tls {
465            tail_builder = tail_builder.tls();
466        }
467        if config.cluster {
468            tail_builder = tail_builder.cluster();
469        }
470        let tail_client = tail_builder
471            .build()
472            .await
473            .map_err(|e| ServerError::ValkeyContext {
474                source: e,
475                context: "connect (tail)".into(),
476            })?;
477        let tail_pong: String = tail_client
478            .cmd("PING")
479            .execute()
480            .await
481            .map_err(|e| ServerError::ValkeyContext {
482                source: e,
483                context: "PING (tail)".into(),
484            })?;
485        if tail_pong != "PONG" {
486            return Err(ServerError::OperationFailed(format!(
487                "tail client unexpected PING response: {tail_pong}"
488            )));
489        }
490
491        let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(
492            config.max_concurrent_stream_ops as usize,
493        ));
494        let xread_block_lock = Arc::new(tokio::sync::Mutex::new(()));
495        tracing::info!(
496            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
497            "stream-op client ready (read + tail share the semaphore; \
498             tails additionally serialize via xread_block_lock)"
499        );
500
501        // Admin surface warning. /v1/admin/* endpoints (waitpoint HMAC
502        // rotation, etc.) are only protected by the global Bearer
503        // middleware in api.rs — which is only installed when
504        // config.api_token is set. Without FF_API_TOKEN, a public
505        // deployment exposes secret rotation to anyone that can reach
506        // the listen_addr. Warn loudly so operators can't miss it; we
507        // don't fail-start because single-tenant dev uses this path.
508        if config.api_token.is_none() {
509            tracing::warn!(
510                listen_addr = %config.listen_addr,
511                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
512                 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
513                 FF_API_TOKEN for any deployment reachable from untrusted \
514                 networks."
515            );
516            // Explicit callout for the credential-bearing read endpoints.
517            // Auditors grep for these on a per-endpoint basis; lumping
518            // into the admin warning alone hides the fact that
519            // /pending-waitpoints returns HMAC tokens and /result
520            // returns completion payload bytes.
521            tracing::warn!(
522                listen_addr = %config.listen_addr,
523                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
524                 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
525                 and GET /v1/executions/{{id}}/result returns raw completion payload \
526                 bytes (may contain PII). Both are UNAUTHENTICATED in this \
527                 configuration."
528            );
529        }
530
531        // Partition counts — post-RFC-011 there are three physical families.
532        // Execution keys co-locate with their parent flow's partition (§2 of
533        // the RFC), so `num_flow_partitions` governs both exec and flow
534        // routing; no separate `num_execution_partitions` count exists.
535        tracing::info!(
536            flow_partitions = config.partition_config.num_flow_partitions,
537            budget_partitions = config.partition_config.num_budget_partitions,
538            quota_partitions = config.partition_config.num_quota_partitions,
539            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
540            listen_addr = %config.listen_addr,
541            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
542            config.partition_config.num_flow_partitions,
543            config.partition_config.num_budget_partitions,
544            config.partition_config.num_quota_partitions,
545        );
546
547        let scheduler = Arc::new(ff_scheduler::Scheduler::with_metrics(
548            client.clone(),
549            config.partition_config,
550            metrics.clone(),
551        ));
552
553        Ok(Self {
554            client,
555            tail_client,
556            stream_semaphore,
557            xread_block_lock,
558            // Single-permit semaphore: only one rotate-waitpoint-secret can
559            // be mid-flight server-wide. Attackers or misbehaving scripts
560            // firing parallel rotations fail fast with 429 instead of
561            // queueing HTTP handlers.
562            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
563            engine,
564            config,
565            scheduler,
566            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
567            metrics,
568        })
569    }
570
571    /// PR-94: access the shared observability registry.
572    pub fn metrics(&self) -> &Arc<ff_observability::Metrics> {
573        &self.metrics
574    }
575
576    /// Get a reference to the ferriskey client.
577    pub fn client(&self) -> &Client {
578        &self.client
579    }
580
581    /// Execute an FCALL with automatic Lua library reload on "function not loaded".
582    ///
583    /// After a Valkey failover the new primary may not have the Lua library
584    /// loaded (replication lag or cold replica). This wrapper detects that
585    /// condition, reloads the library via `ff_script::loader::ensure_library`,
586    /// and retries the FCALL once.
587    async fn fcall_with_reload(
588        &self,
589        function: &str,
590        keys: &[&str],
591        args: &[&str],
592    ) -> Result<Value, ServerError> {
593        fcall_with_reload_on_client(&self.client, function, keys, args).await
594    }
595
596    /// Get the server config.
597    pub fn config(&self) -> &ServerConfig {
598        &self.config
599    }
600
601    /// Get the partition config.
602    pub fn partition_config(&self) -> &PartitionConfig {
603        &self.config.partition_config
604    }
605
606    // ── Minimal Phase 1 API ──
607
608    /// Create a new execution.
609    ///
610    /// Uses raw FCALL — will migrate to typed ff-script wrappers in Step 1.2.
611    pub async fn create_execution(
612        &self,
613        args: &CreateExecutionArgs,
614    ) -> Result<CreateExecutionResult, ServerError> {
615        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
616        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
617        let idx = IndexKeys::new(&partition);
618
619        let lane = &args.lane_id;
620        let tag = partition.hash_tag();
621        let idem_key = match &args.idempotency_key {
622            Some(k) if !k.is_empty() => {
623                keys::idempotency_key(&tag, args.namespace.as_str(), k)
624            }
625            _ => ctx.noop(),
626        };
627
628        let delay_str = args
629            .delay_until
630            .map(|d| d.0.to_string())
631            .unwrap_or_default();
632        let is_delayed = !delay_str.is_empty();
633
634        // KEYS (8) must match lua/execution.lua ff_create_execution positional order:
635        //   [1] exec_core, [2] payload, [3] policy, [4] tags,
636        //   [5] scheduling_zset (eligible OR delayed — ONE key),
637        //   [6] idem_key, [7] execution_deadline, [8] all_executions
638        let scheduling_zset = if is_delayed {
639            idx.lane_delayed(lane)
640        } else {
641            idx.lane_eligible(lane)
642        };
643
644        let fcall_keys: Vec<String> = vec![
645            ctx.core(),                  // 1
646            ctx.payload(),               // 2
647            ctx.policy(),                // 3
648            ctx.tags(),                  // 4
649            scheduling_zset,             // 5
650            idem_key,                    // 6
651            idx.execution_deadline(),    // 7
652            idx.all_executions(),        // 8
653        ];
654
655        let tags_json = serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".to_owned());
656
657        // ARGV (13) must match lua/execution.lua ff_create_execution positional order:
658        //   [1] execution_id, [2] namespace, [3] lane_id, [4] execution_kind,
659        //   [5] priority, [6] creator_identity, [7] policy_json,
660        //   [8] input_payload, [9] delay_until, [10] dedup_ttl_ms,
661        //   [11] tags_json, [12] execution_deadline_at, [13] partition_id
662        let fcall_args: Vec<String> = vec![
663            args.execution_id.to_string(),           // 1
664            args.namespace.to_string(),              // 2
665            args.lane_id.to_string(),                // 3
666            args.execution_kind.clone(),             // 4
667            args.priority.to_string(),               // 5
668            args.creator_identity.clone(),           // 6
669            args.policy.as_ref()
670                .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_owned()))
671                .unwrap_or_else(|| "{}".to_owned()), // 7
672            String::from_utf8_lossy(&args.input_payload).into_owned(), // 8
673            delay_str,                               // 9
674            args.idempotency_key.as_ref()
675                .map(|_| "86400000".to_string())
676                .unwrap_or_default(),                // 10 dedup_ttl_ms
677            tags_json,                               // 11
678            args.execution_deadline_at
679                .map(|d| d.to_string())
680                .unwrap_or_default(),                // 12 execution_deadline_at
681            args.partition_id.to_string(),           // 13
682        ];
683
684        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
685        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
686
687        let raw: Value = self
688            .fcall_with_reload("ff_create_execution", &key_refs, &arg_refs)
689            .await?;
690
691        parse_create_result(&raw, &args.execution_id)
692    }
693
694    /// Cancel an execution.
695    pub async fn cancel_execution(
696        &self,
697        args: &CancelExecutionArgs,
698    ) -> Result<CancelExecutionResult, ServerError> {
699        let raw = self
700            .fcall_cancel_execution_with_reload(args)
701            .await?;
702        parse_cancel_result(&raw, &args.execution_id)
703    }
704
705    /// Build KEYS/ARGV for `ff_cancel_execution` and invoke via the server's
706    /// reload-capable FCALL. Shared by the inline method and background
707    /// cancel_flow dispatch via [`Self::fcall_cancel_execution_with_reload`].
708    async fn fcall_cancel_execution_with_reload(
709        &self,
710        args: &CancelExecutionArgs,
711    ) -> Result<Value, ServerError> {
712        let (keys, argv) = build_cancel_execution_fcall(
713            &self.client,
714            &self.config.partition_config,
715            args,
716        )
717        .await?;
718        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
719        let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
720        self.fcall_with_reload("ff_cancel_execution", &key_refs, &arg_refs).await
721    }
722
723    /// Get the public state of an execution.
724    ///
725    /// Reads `public_state` from the exec_core hash. Returns the parsed
726    /// PublicState enum. If the execution is not found, returns an error.
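    ///
    /// # Example
    ///
    /// Illustrative caller (assumes `server` and `execution_id` are in scope
    /// and that [`PublicState`] derives `Debug`):
    ///
    /// ```ignore
    /// match server.get_execution_state(&execution_id).await {
    ///     Ok(state) => tracing::info!(?state, %execution_id, "current state"),
    ///     Err(ServerError::NotFound(msg)) => tracing::warn!(%msg, "unknown execution"),
    ///     Err(other) => return Err(other),
    /// }
    /// ```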
727    pub async fn get_execution_state(
728        &self,
729        execution_id: &ExecutionId,
730    ) -> Result<PublicState, ServerError> {
731        let partition = execution_partition(execution_id, &self.config.partition_config);
732        let ctx = ExecKeyContext::new(&partition, execution_id);
733
734        let state_str: Option<String> = self
735            .client
736            .hget(&ctx.core(), "public_state")
737            .await
738            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET public_state".into() })?;
739
740        match state_str {
741            Some(s) => {
742                let quoted = format!("\"{s}\"");
743                serde_json::from_str(&quoted).map_err(|e| {
744                    ServerError::Script(format!(
745                        "invalid public_state '{s}' for {execution_id}: {e}"
746                    ))
747                })
748            }
749            None => Err(ServerError::NotFound(format!(
750                "execution not found: {execution_id}"
751            ))),
752        }
753    }
754
755    /// Read the raw result payload written by `ff_complete_execution`.
756    ///
757    /// The Lua side stores the payload at `ctx.result()` via plain `SET`.
758    /// No FCALL — this is a direct GET; returns `Ok(None)` when the
759    /// execution is missing, not yet complete, or (in a future
760    /// retention-policy world) when the result was trimmed.
761    ///
762    /// # Contract vs `get_execution_state`
763    ///
764    /// `get_execution_state` is the authoritative completion signal. If
765    /// a caller observes `state == completed` but `get_execution_result`
766    /// returns `None`, the result bytes are unavailable — not a caller
767    /// bug and not a server bug, just the retention policy trimming the
768    /// blob. V1 sets no retention, so callers on v1 can treat
769    /// `state == completed` + `Ok(None)` as a server bug.
770    ///
771    /// # Ordering
772    ///
773    /// Callers MUST wait for `state == completed` before calling this
774    /// method; polls issued before the state transition may hit a
775    /// narrow window where the completion Lua has written
776    /// `public_state = completed` but the `result` key SET is still
777    /// on-wire. The current Lua `ff_complete_execution` writes both in
778    /// the same atomic script, so the window is effectively zero for
779    /// direct callers — but retries via `ff_replay_execution` open it
780    /// briefly.
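    ///
    /// # Example
    ///
    /// Illustrative caller that has already observed `state == completed` via
    /// [`Server::get_execution_state`] (see the ordering note above):
    ///
    /// ```ignore
    /// match server.get_execution_result(&execution_id).await? {
    ///     Some(bytes) => tracing::info!(len = bytes.len(), "result payload fetched"),
    ///     // On v1 (no retention trimming) this combination indicates a server bug.
    ///     None => tracing::error!(%execution_id, "completed but result payload missing"),
    /// }
    /// ```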
781    pub async fn get_execution_result(
782        &self,
783        execution_id: &ExecutionId,
784    ) -> Result<Option<Vec<u8>>, ServerError> {
785        let partition = execution_partition(execution_id, &self.config.partition_config);
786        let ctx = ExecKeyContext::new(&partition, execution_id);
787
788        // Binary-safe read. Decoding into `String` would reject any
789        // non-UTF-8 payload (bincode-encoded f32 vectors, compressed
790        // artifacts, arbitrary binary stages) with a ferriskey decode
791        // error that surfaces as HTTP 500. The endpoint response is
792        // already `application/octet-stream`; `Vec<u8>` has a
793        // ferriskey-specialized `FromValue` impl that preserves raw
794        // bytes without UTF-8 validation (see ferriskey/src/value.rs:
795        // `from_byte_vec`). Nil → None via the blanket
796        // `Option<T: FromValue>` wrapper.
797        let payload: Option<Vec<u8>> = self
798            .client
799            .cmd("GET")
800            .arg(ctx.result())
801            .execute()
802            .await
803            .map_err(|e| ServerError::ValkeyContext {
804                source: e,
805                context: "GET exec result".into(),
806            })?;
807        Ok(payload)
808    }
809
810    /// List the active (`pending` or `active`) waitpoints for an execution.
811    ///
812    /// Returns one [`PendingWaitpointInfo`] per open waitpoint, including the
813    /// HMAC-SHA1 `waitpoint_token` needed to deliver authenticated signals.
814    /// `closed` waitpoints are elided — callers looking at history should
815    /// read the stream or lease history instead.
816    ///
817    /// Read plan: SSCAN `ctx.waitpoints()` with `COUNT 100` (bounded
818    /// page size, matching the unblock / flow-projector / budget-
819    /// reconciler convention) to enumerate waitpoint IDs, then TWO
820    /// pipelines:
821    ///
822    /// * Pass 1 — single round-trip containing, per waitpoint, one
823    ///   HMGET over the documented field set + one HGET for
824    ///   `total_matchers` on the condition hash.
825    /// * Pass 2 (conditional) — single round-trip containing an
826    ///   HMGET per waitpoint with `total_matchers > 0` to read the
827    ///   `matcher:N:name` fields.
828    ///
829    /// No FCALL — this is a read-only view built from already-
830    /// persisted state, so skipping Lua keeps the Valkey single-
831    /// writer path uncontended. HMGET (vs HGETALL) bounds the per-
832    /// waitpoint read to the documented field set and defends
833    /// against a poisoned waitpoint hash with unbounded extra fields
834    /// accumulating response memory.
835    ///
836    /// # Empty result semantics (TOCTOU)
837    ///
838    /// An empty `Vec` is returned in three cases:
839    ///
840    /// 1. The execution exists but has never suspended.
841    /// 2. All existing waitpoints are `closed` (already resolved).
842    /// 3. A narrow teardown race: `SSCAN` read the waitpoint set after
843    ///    a concurrent `ff_close_waitpoint` or execution-cleanup script
844    ///    deleted the waitpoint hashes but before it SREM'd the set
845    ///    members. Each HMGET returns all-None and we skip.
846    ///
847    /// Callers that get an unexpected empty list should cross-check
848    /// execution state (`get_execution_state`) to distinguish "pipeline
849    /// moved past suspended" from "nothing to review yet".
850    ///
851    /// A waitpoint hash that's present but missing its `waitpoint_token`
852    /// field is similarly elided and a server-side WARN is emitted —
853    /// this indicates storage corruption (a write that half-populated
854    /// the hash) and operators should investigate.
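    ///
    /// # Example
    ///
    /// Illustrative caller (assumes `server` and `execution_id` are in scope).
    /// `waitpoint_token` is a bearer credential for signal delivery — log
    /// waitpoint IDs, never tokens:
    ///
    /// ```ignore
    /// let open = server.list_pending_waitpoints(&execution_id).await?;
    /// if open.is_empty() {
    ///     // Cross-check `get_execution_state` to distinguish "already resolved"
    ///     // from "never suspended" (see the TOCTOU note above).
    /// }
    /// for wp in &open {
    ///     tracing::info!(
    ///         waitpoint_id = %wp.waitpoint_id,
    ///         signals = ?wp.required_signal_names,
    ///         "open waitpoint awaiting signal delivery"
    ///     );
    /// }
    /// ```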
855    pub async fn list_pending_waitpoints(
856        &self,
857        execution_id: &ExecutionId,
858    ) -> Result<Vec<PendingWaitpointInfo>, ServerError> {
859        let partition = execution_partition(execution_id, &self.config.partition_config);
860        let ctx = ExecKeyContext::new(&partition, execution_id);
861
862        let core_exists: bool = self
863            .client
864            .cmd("EXISTS")
865            .arg(ctx.core())
866            .execute()
867            .await
868            .map_err(|e| ServerError::ValkeyContext {
869                source: e,
870                context: "EXISTS exec_core (pending waitpoints)".into(),
871            })?;
872        if !core_exists {
873            return Err(ServerError::NotFound(format!(
874                "execution not found: {execution_id}"
875            )));
876        }
877
878        // SSCAN the waitpoints set in bounded pages. SMEMBERS would
879        // load the entire set in one reply — fine for today's
880        // single-waitpoint executions, but the codebase convention
881        // (budget_reconciler, flow_projector, unblock scanner) is SSCAN
882        // COUNT 100 so response size per round-trip is bounded as the
883        // set grows. Match the precedent instead of carving a new
884        // pattern here.
885        const WAITPOINTS_SSCAN_COUNT: usize = 100;
886        let waitpoints_key = ctx.waitpoints();
887        let mut wp_ids_raw: Vec<String> = Vec::new();
888        let mut cursor: String = "0".to_owned();
889        loop {
890            let reply: (String, Vec<String>) = self
891                .client
892                .cmd("SSCAN")
893                .arg(&waitpoints_key)
894                .arg(&cursor)
895                .arg("COUNT")
896                .arg(WAITPOINTS_SSCAN_COUNT.to_string().as_str())
897                .execute()
898                .await
899                .map_err(|e| ServerError::ValkeyContext {
900                    source: e,
901                    context: "SSCAN waitpoints".into(),
902                })?;
903            cursor = reply.0;
904            wp_ids_raw.extend(reply.1);
905            if cursor == "0" {
906                break;
907            }
908        }
909
910        // SSCAN may observe a member more than once across iterations —
911        // that is documented Valkey behavior, not a bug (see
912        // https://valkey.io/commands/sscan/). Dedup in-place before
913        // building the typed id list so the HTTP response and the
914        // downstream pipelined HMGETs don't duplicate work. Sort +
915        // dedup keeps this O(n log n) without a BTreeSet allocation;
916        // typical n is 1 (the single waitpoint on a suspended exec).
917        wp_ids_raw.sort_unstable();
918        wp_ids_raw.dedup();
919
920        if wp_ids_raw.is_empty() {
921            return Ok(Vec::new());
922        }
923
924        // Parse + filter out malformed waitpoint_ids up-front so the
925        // downstream pipeline indexes stay aligned to the Vec of
926        // typed ids.
927        let mut wp_ids: Vec<WaitpointId> = Vec::with_capacity(wp_ids_raw.len());
928        for raw in &wp_ids_raw {
929            match WaitpointId::parse(raw) {
930                Ok(id) => wp_ids.push(id),
931                Err(e) => tracing::warn!(
932                    raw_id = %raw,
933                    error = %e,
934                    execution_id = %execution_id,
935                    "list_pending_waitpoints: skipping unparseable waitpoint_id"
936                ),
937            }
938        }
939        if wp_ids.is_empty() {
940            return Ok(Vec::new());
941        }
942
943        // Bounded HMGET field set — these are the six hash fields that
944        // surface in `PendingWaitpointInfo`. Fixed-size indexing into
945        // the response below tracks this order.
946        const WP_FIELDS: [&str; 6] = [
947            "state",
948            "waitpoint_key",
949            "waitpoint_token",
950            "created_at",
951            "activated_at",
952            "expires_at",
953        ];
954
955        // Pass 1 pipeline: waitpoint HMGET + condition HGET
956        // total_matchers, one of each per waitpoint, in a SINGLE
957        // round-trip. Sequential HMGETs were an N-round-trip latency
958        // floor on flows with fan-out waitpoints.
959        let mut pass1 = self.client.pipeline();
960        let mut wp_slots = Vec::with_capacity(wp_ids.len());
961        let mut cond_slots = Vec::with_capacity(wp_ids.len());
962        for wp_id in &wp_ids {
963            let mut cmd = pass1.cmd::<Vec<Option<String>>>("HMGET");
964            cmd = cmd.arg(ctx.waitpoint(wp_id));
965            for f in WP_FIELDS {
966                cmd = cmd.arg(f);
967            }
968            wp_slots.push(cmd.finish());
969
970            cond_slots.push(
971                pass1
972                    .cmd::<Option<String>>("HGET")
973                    .arg(ctx.waitpoint_condition(wp_id))
974                    .arg("total_matchers")
975                    .finish(),
976            );
977        }
978        pass1
979            .execute()
980            .await
981            .map_err(|e| ServerError::ValkeyContext {
982                source: e,
983                context: "pipeline HMGET waitpoints + HGET total_matchers".into(),
984            })?;
985
986        // Collect pass-1 results + queue pass-2 HMGETs for condition
987        // matcher names on waitpoints that are actionable (state in
988        // pending/active, non-empty token, total_matchers > 0). PipeSlot
989        // values are owning, so iterate all three ordered zip'd iters
990        // and consume them together.
991        struct Kept {
992            wp_id: WaitpointId,
993            wp_fields: Vec<Option<String>>,
994            total_matchers: usize,
995        }
996        let mut kept: Vec<Kept> = Vec::with_capacity(wp_ids.len());
997        for ((wp_id, wp_slot), cond_slot) in wp_ids
998            .iter()
999            .zip(wp_slots)
1000            .zip(cond_slots)
1001        {
1002            let wp_fields: Vec<Option<String>> =
1003                wp_slot.value().map_err(|e| ServerError::ValkeyContext {
1004                    source: e,
1005                    context: format!("pipeline slot HMGET waitpoint {wp_id}"),
1006                })?;
1007
1008            // Waitpoint hash may have been GC'd between the SSCAN and
1009            // this HMGET (rotation / cleanup race). Skip silently.
1010            if wp_fields.iter().all(Option::is_none) {
1011                // Still consume the cond_slot to free its buffer.
1012                let _ = cond_slot.value();
1013                continue;
1014            }
1015            let state_ref = wp_fields
1016                .first()
1017                .and_then(|v| v.as_deref())
1018                .unwrap_or("");
1019            if state_ref != "pending" && state_ref != "active" {
1020                let _ = cond_slot.value();
1021                continue;
1022            }
1023            let token_ref = wp_fields
1024                .get(2)
1025                .and_then(|v| v.as_deref())
1026                .unwrap_or("");
1027            if token_ref.is_empty() {
1028                let _ = cond_slot.value();
1029                tracing::warn!(
1030                    waitpoint_id = %wp_id,
1031                    execution_id = %execution_id,
1032                    waitpoint_hash_key = %ctx.waitpoint(wp_id),
1033                    state = %state_ref,
1034                    "list_pending_waitpoints: waitpoint hash present but waitpoint_token \
1035                     field is empty — likely storage corruption (half-populated write, \
1036                     operator edit, or interrupted script). Skipping this entry in the \
1037                     response. HGETALL the waitpoint_hash_key to inspect."
1038                );
1039                continue;
1040            }
1041
1042            let total_matchers = cond_slot
1043                .value()
1044                .map_err(|e| ServerError::ValkeyContext {
1045                    source: e,
1046                    context: format!("pipeline slot HGET total_matchers {wp_id}"),
1047                })?
1048                .and_then(|s| s.parse::<usize>().ok())
1049                .unwrap_or(0);
1050
1051            kept.push(Kept {
1052                wp_id: wp_id.clone(),
1053                wp_fields,
1054                total_matchers,
1055            });
1056        }
1057
1058        if kept.is_empty() {
1059            return Ok(Vec::new());
1060        }
1061
1062        // Pass 2 pipeline: matcher-name HMGETs for every kept waitpoint
1063        // with total_matchers > 0. Single round-trip regardless of
1064        // per-waitpoint matcher count. Waitpoints with total_matchers==0
1065        // (wildcard) skip the HMGET entirely.
1066        let mut pass2 = self.client.pipeline();
1067        let mut matcher_slots: Vec<Option<_>> = Vec::with_capacity(kept.len());
1068        let mut pass2_needed = false;
1069        for k in &kept {
1070            if k.total_matchers == 0 {
1071                matcher_slots.push(None);
1072                continue;
1073            }
1074            pass2_needed = true;
1075            let mut cmd = pass2.cmd::<Vec<Option<String>>>("HMGET");
1076            cmd = cmd.arg(ctx.waitpoint_condition(&k.wp_id));
1077            for i in 0..k.total_matchers {
1078                cmd = cmd.arg(format!("matcher:{i}:name"));
1079            }
1080            matcher_slots.push(Some(cmd.finish()));
1081        }
1082        if pass2_needed {
1083            pass2.execute().await.map_err(|e| ServerError::ValkeyContext {
1084                source: e,
1085                context: "pipeline HMGET wp_condition matchers".into(),
1086            })?;
1087        }
1088
1089        let parse_ts = |raw: &str| -> Option<TimestampMs> {
1090            if raw.is_empty() {
1091                None
1092            } else {
1093                raw.parse::<i64>().ok().map(TimestampMs)
1094            }
1095        };
1096
1097        let mut out: Vec<PendingWaitpointInfo> = Vec::with_capacity(kept.len());
1098        for (k, slot) in kept.into_iter().zip(matcher_slots) {
1099            let get = |i: usize| -> &str {
1100                k.wp_fields.get(i).and_then(|v| v.as_deref()).unwrap_or("")
1101            };
1102
1103            // Collect matcher names, eliding the empty-name wildcard
1104            // sentinel (see lua/helpers.lua initialize_condition).
1105            let required_signal_names: Vec<String> = match slot {
1106                None => Vec::new(),
1107                Some(s) => {
1108                    let vals: Vec<Option<String>> =
1109                        s.value().map_err(|e| ServerError::ValkeyContext {
1110                            source: e,
1111                            context: format!(
1112                                "pipeline slot HMGET wp_condition matchers {}",
1113                                k.wp_id
1114                            ),
1115                        })?;
1116                    vals.into_iter()
1117                        .flatten()
1118                        .filter(|name| !name.is_empty())
1119                        .collect()
1120                }
1121            };
1122
1123            out.push(PendingWaitpointInfo {
1124                waitpoint_id: k.wp_id,
1125                waitpoint_key: get(1).to_owned(),
1126                state: get(0).to_owned(),
1127                waitpoint_token: WaitpointToken(get(2).to_owned()),
1128                required_signal_names,
1129                created_at: parse_ts(get(3)).unwrap_or(TimestampMs(0)),
1130                activated_at: parse_ts(get(4)),
1131                expires_at: parse_ts(get(5)),
1132            });
1133        }
1134
1135        Ok(out)
1136    }
1137
1138    // ── Budget / Quota API ──
1139
1140    /// Create a new budget policy.
1141    pub async fn create_budget(
1142        &self,
1143        args: &CreateBudgetArgs,
1144    ) -> Result<CreateBudgetResult, ServerError> {
1145        // Cap ARGV before allocation — see MAX_BUDGET_DIMENSIONS (#104).
1146        validate_create_budget_dimensions(
1147            &args.dimensions,
1148            &args.hard_limits,
1149            &args.soft_limits,
1150        )?;
1151        let partition = budget_partition(&args.budget_id, &self.config.partition_config);
1152        let bctx = BudgetKeyContext::new(&partition, &args.budget_id);
1153        let resets_key = keys::budget_resets_key(bctx.hash_tag());
1154        let policies_index = keys::budget_policies_index(bctx.hash_tag());
1155
1156        // KEYS (5): budget_def, budget_limits, budget_usage, budget_resets_zset,
1157        //           budget_policies_index
1158        let fcall_keys: Vec<String> = vec![
1159            bctx.definition(),
1160            bctx.limits(),
1161            bctx.usage(),
1162            resets_key,
1163            policies_index,
1164        ];
1165
1166        // ARGV (variable): budget_id, scope_type, scope_id, enforcement_mode,
1167        //   on_hard_limit, on_soft_limit, reset_interval_ms, now_ms,
1168        //   dimension_count, dim_1..dim_N, hard_1..hard_N, soft_1..soft_N
1169        let dim_count = args.dimensions.len();
1170        let mut fcall_args: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
1171        fcall_args.push(args.budget_id.to_string());
1172        fcall_args.push(args.scope_type.clone());
1173        fcall_args.push(args.scope_id.clone());
1174        fcall_args.push(args.enforcement_mode.clone());
1175        fcall_args.push(args.on_hard_limit.clone());
1176        fcall_args.push(args.on_soft_limit.clone());
1177        fcall_args.push(args.reset_interval_ms.to_string());
1178        fcall_args.push(args.now.to_string());
1179        fcall_args.push(dim_count.to_string());
1180        for dim in &args.dimensions {
1181            fcall_args.push(dim.clone());
1182        }
1183        for hard in &args.hard_limits {
1184            fcall_args.push(hard.to_string());
1185        }
1186        for soft in &args.soft_limits {
1187            fcall_args.push(soft.to_string());
1188        }
1189
1190        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1191        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1192
1193        let raw: Value = self
1194            .fcall_with_reload("ff_create_budget", &key_refs, &arg_refs)
1195            .await?;
1196
1197        parse_budget_create_result(&raw, &args.budget_id)
1198    }
1199
1200    /// Create a new quota/rate-limit policy.
1201    pub async fn create_quota_policy(
1202        &self,
1203        args: &CreateQuotaPolicyArgs,
1204    ) -> Result<CreateQuotaPolicyResult, ServerError> {
1205        let partition = quota_partition(&args.quota_policy_id, &self.config.partition_config);
1206        let qctx = QuotaKeyContext::new(&partition, &args.quota_policy_id);
1207
1208        // KEYS (5): quota_def, quota_window_zset, quota_concurrency_counter,
1209        //           admitted_set, quota_policies_index
1210        let fcall_keys: Vec<String> = vec![
1211            qctx.definition(),
1212            qctx.window("requests_per_window"),
1213            qctx.concurrency(),
1214            qctx.admitted_set(),
1215            keys::quota_policies_index(qctx.hash_tag()),
1216        ];
1217
1218        // ARGV (5): quota_policy_id, window_seconds, max_requests_per_window,
1219        //           max_concurrent, now_ms
1220        let fcall_args: Vec<String> = vec![
1221            args.quota_policy_id.to_string(),
1222            args.window_seconds.to_string(),
1223            args.max_requests_per_window.to_string(),
1224            args.max_concurrent.to_string(),
1225            args.now.to_string(),
1226        ];
1227
1228        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1229        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1230
1231        let raw: Value = self
1232            .fcall_with_reload("ff_create_quota_policy", &key_refs, &arg_refs)
1233            .await?;
1234
1235        parse_quota_create_result(&raw, &args.quota_policy_id)
1236    }
1237
1238    /// Read-only budget status for operator visibility.
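            ///
            /// Illustrative sketch of reading the returned [`BudgetStatus`] (not
            /// compiled here; `server`, `budget_id`, and the `"llm_tokens"` dimension
            /// name are placeholders):
            ///
            /// ```ignore
            /// let status = server.get_budget_status(&budget_id).await?;
            /// if let (Some(used), Some(limit)) = (
            ///     status.usage.get("llm_tokens"),
            ///     status.hard_limits.get("llm_tokens"),
            /// ) {
            ///     println!("llm_tokens usage: {used}/{limit}");
            /// }
            /// ```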
1239    pub async fn get_budget_status(
1240        &self,
1241        budget_id: &BudgetId,
1242    ) -> Result<BudgetStatus, ServerError> {
1243        let partition = budget_partition(budget_id, &self.config.partition_config);
1244        let bctx = BudgetKeyContext::new(&partition, budget_id);
1245
1246        // Read budget definition
1247        let def: HashMap<String, String> = self
1248            .client
1249            .hgetall(&bctx.definition())
1250            .await
1251            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_def".into() })?;
1252
1253        if def.is_empty() {
1254            return Err(ServerError::NotFound(format!(
1255                "budget not found: {budget_id}"
1256            )));
1257        }
1258
1259        // Read usage
1260        let usage_raw: HashMap<String, String> = self
1261            .client
1262            .hgetall(&bctx.usage())
1263            .await
1264            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_usage".into() })?;
1265        let usage: HashMap<String, u64> = usage_raw
1266            .into_iter()
1267            .filter(|(k, _)| k != "_init")
1268            .map(|(k, v)| (k, v.parse().unwrap_or(0)))
1269            .collect();
1270
1271        // Read limits
1272        let limits_raw: HashMap<String, String> = self
1273            .client
1274            .hgetall(&bctx.limits())
1275            .await
1276            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_limits".into() })?;
1277        let mut hard_limits = HashMap::new();
1278        let mut soft_limits = HashMap::new();
1279        for (k, v) in &limits_raw {
1280            if let Some(dim) = k.strip_prefix("hard:") {
1281                hard_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
1282            } else if let Some(dim) = k.strip_prefix("soft:") {
1283                soft_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
1284            }
1285        }
1286
1287        let non_empty = |s: Option<&String>| -> Option<String> {
1288            s.filter(|v| !v.is_empty()).cloned()
1289        };
1290
1291        Ok(BudgetStatus {
1292            budget_id: budget_id.to_string(),
1293            scope_type: def.get("scope_type").cloned().unwrap_or_default(),
1294            scope_id: def.get("scope_id").cloned().unwrap_or_default(),
1295            enforcement_mode: def.get("enforcement_mode").cloned().unwrap_or_default(),
1296            usage,
1297            hard_limits,
1298            soft_limits,
1299            breach_count: def
1300                .get("breach_count")
1301                .and_then(|v| v.parse().ok())
1302                .unwrap_or(0),
1303            soft_breach_count: def
1304                .get("soft_breach_count")
1305                .and_then(|v| v.parse().ok())
1306                .unwrap_or(0),
1307            last_breach_at: non_empty(def.get("last_breach_at")),
1308            last_breach_dim: non_empty(def.get("last_breach_dim")),
1309            next_reset_at: non_empty(def.get("next_reset_at")),
1310            created_at: non_empty(def.get("created_at")),
1311        })
1312    }
1313
1314    /// Report usage against a budget and check limits.
1315    pub async fn report_usage(
1316        &self,
1317        budget_id: &BudgetId,
1318        args: &ReportUsageArgs,
1319    ) -> Result<ReportUsageResult, ServerError> {
1320        // Cap ARGV before allocation — see MAX_BUDGET_DIMENSIONS (#104).
1321        validate_report_usage_dimensions(&args.dimensions, &args.deltas)?;
1322        let partition = budget_partition(budget_id, &self.config.partition_config);
1323        let bctx = BudgetKeyContext::new(&partition, budget_id);
1324
1325        // KEYS (3): budget_usage, budget_limits, budget_def
1326        let fcall_keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];
1327
1328        // ARGV: dim_count, dim_1..dim_N, delta_1..delta_N, now_ms, [dedup_key]
1329        let dim_count = args.dimensions.len();
1330        let mut fcall_args: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
1331        fcall_args.push(dim_count.to_string());
1332        for dim in &args.dimensions {
1333            fcall_args.push(dim.clone());
1334        }
1335        for delta in &args.deltas {
1336            fcall_args.push(delta.to_string());
1337        }
1338        fcall_args.push(args.now.to_string());
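            // The dedup slot is always appended: an empty string when the caller
            // supplied no dedup key, otherwise the key namespaced through
            // usage_dedup_key so the Lua side can deduplicate repeated reports.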
1339        let dedup_key_val = args
1340            .dedup_key
1341            .as_ref()
1342            .filter(|k| !k.is_empty())
1343            .map(|k| usage_dedup_key(bctx.hash_tag(), k))
1344            .unwrap_or_default();
1345        fcall_args.push(dedup_key_val);
1346
1347        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1348        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1349
1350        let raw: Value = self
1351            .fcall_with_reload("ff_report_usage_and_check", &key_refs, &arg_refs)
1352            .await?;
1353
1354        parse_report_usage_result(&raw)
1355    }
1356
1357    /// Reset a budget's usage counters and schedule the next reset.
1358    pub async fn reset_budget(
1359        &self,
1360        budget_id: &BudgetId,
1361    ) -> Result<ResetBudgetResult, ServerError> {
1362        let partition = budget_partition(budget_id, &self.config.partition_config);
1363        let bctx = BudgetKeyContext::new(&partition, budget_id);
1364        let resets_key = keys::budget_resets_key(bctx.hash_tag());
1365
1366        // KEYS (3): budget_def, budget_usage, budget_resets_zset
1367        let fcall_keys: Vec<String> = vec![bctx.definition(), bctx.usage(), resets_key];
1368
1369        // ARGV (2): budget_id, now_ms
1370        let now = TimestampMs::now();
1371        let fcall_args: Vec<String> = vec![budget_id.to_string(), now.to_string()];
1372
1373        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1374        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1375
1376        let raw: Value = self
1377            .fcall_with_reload("ff_reset_budget", &key_refs, &arg_refs)
1378            .await?;
1379
1380        parse_reset_budget_result(&raw)
1381    }
1382
1383    // ── Flow API ──
1384
1385    /// Create a new flow container.
1386    pub async fn create_flow(
1387        &self,
1388        args: &CreateFlowArgs,
1389    ) -> Result<CreateFlowResult, ServerError> {
1390        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1391        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1392        let fidx = FlowIndexKeys::new(&partition);
1393
1394        // KEYS (3): flow_core, members_set, flow_index
1395        let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];
1396
1397        // ARGV (4): flow_id, flow_kind, namespace, now_ms
1398        let fcall_args: Vec<String> = vec![
1399            args.flow_id.to_string(),
1400            args.flow_kind.clone(),
1401            args.namespace.to_string(),
1402            args.now.to_string(),
1403        ];
1404
1405        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1406        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1407
1408        let raw: Value = self
1409            .fcall_with_reload("ff_create_flow", &key_refs, &arg_refs)
1410            .await?;
1411
1412        parse_create_flow_result(&raw, &args.flow_id)
1413    }
1414
1415    /// Add an execution to a flow.
1416    ///
1417    /// # Atomic single-FCALL commit (RFC-011 §7.3)
1418    ///
1419    /// Post-RFC-011 phase-3, exec_core co-locates with flow_core under
1420    /// hash-tag routing (both hash to `{fp:N}` via the exec id's
1421    /// embedded partition). A single atomic FCALL writes:
1422    ///
1423    ///   - `members_set` SADD (flow membership)
1424    ///   - `exec_core.flow_id` HSET (back-pointer)
1425    ///   - `flow_index` SADD (self-heal)
1426    ///   - `flow_core` HINCRBY node_count / graph_revision +
1427    ///     HSET last_mutation_at
1428    ///
1429    /// All four writes commit atomically or none do (Valkey scripting
1430    /// contract: validates-before-writing in the Lua body means
1431    /// `flow_not_found` / `flow_already_terminal` early-returns fire
1432    /// BEFORE any `redis.call()` mutation, and a mid-body error after
1433    /// writes is not expected because all writes are on the same slot).
1434    ///
1435        /// The pre-RFC-011 two-phase contract (membership FCALL on `{fp:N}`
            /// plus separate HSET on `{p:N}`), its orphan window, and the
            /// reconciliation-scanner plan (issue #21, now superseded) are all
            /// retired.
1436    ///
1437    /// # Consumer contract
1438    ///
1439    /// The caller's `args.execution_id` **must** be co-located with
1440    /// `args.flow_id`'s partition — i.e. minted via
1441    /// `ExecutionId::for_flow(&args.flow_id, config)`. Passing a
1442    /// `solo`-minted id (or any exec id hashing to a different
1443    /// `{fp:N}` than the flow's) will fail at the Valkey level with
1444    /// `CROSSSLOT` on a clustered deploy.
1445    ///
1446    /// Callers with a flow context in scope always use `for_flow`;
1447    /// this is the only supported mint path for flow-member execs
1448    /// post-RFC-011. Test fixtures that pre-date the co-location
1449    /// contract use `TestCluster::new_execution_id_on_partition` to
1450    /// pin to a specific hash-tag index for `fcall_create_flow`-style
1451    /// helpers that hard-code their flow partition.
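            ///
            /// # Example (illustrative sketch)
            ///
            /// Not compiled here: `server`, `flow_id`, and `config` are placeholders,
            /// and the `AddExecutionToFlowArgs` construction is limited to the fields
            /// this method reads.
            ///
            /// ```ignore
            /// // Mint the member id from the flow id so both hash to the same {fp:N}.
            /// let execution_id = ExecutionId::for_flow(&flow_id, config);
            /// let args = AddExecutionToFlowArgs {
            ///     flow_id,
            ///     execution_id,
            ///     now: TimestampMs::now(),
            /// };
            /// server.add_execution_to_flow(&args).await?;
            /// ```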
1452    pub async fn add_execution_to_flow(
1453        &self,
1454        args: &AddExecutionToFlowArgs,
1455    ) -> Result<AddExecutionToFlowResult, ServerError> {
1456        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1457        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1458        let fidx = FlowIndexKeys::new(&partition);
1459
1460        // exec_core co-locates with flow_core under RFC-011 §7.3 —
1461        // same `{fp:N}` hash-tag, same slot, part of the same atomic
1462        // FCALL below.
1463        let exec_partition =
1464            execution_partition(&args.execution_id, &self.config.partition_config);
1465        let ectx = ExecKeyContext::new(&exec_partition, &args.execution_id);
1466
1467        // Pre-flight: exec_partition must match flow_partition under
1468        // RFC-011 §7.3 co-location contract. If the caller hands us a
1469        // `solo`-minted exec whose hash-tag ≠ flow_partition, the FCALL
1470        // would fail with raw `CROSSSLOT` on a clustered deploy — a
1471        // typed `ServerError::PartitionMismatch` is a clearer signal
1472        // that the consumer-contract invariant was violated at mint
1473        // time (caller should have used `ExecutionId::for_flow(...)`).
1474        // See the Consumer contract section of this method's rustdoc.
1475        if exec_partition.index != partition.index {
1476            return Err(ServerError::PartitionMismatch(format!(
1477                "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
1478                 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
1479                 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
1480                exec_p = exec_partition.index,
1481                flow_p = partition.index,
1482            )));
1483        }
1484
1485        // KEYS (4): flow_core, members_set, flow_index, exec_core
1486        let fcall_keys: Vec<String> = vec![
1487            fctx.core(),
1488            fctx.members(),
1489            fidx.flow_index(),
1490            ectx.core(),
1491        ];
1492
1493        // ARGV (3): flow_id, execution_id, now_ms
1494        let fcall_args: Vec<String> = vec![
1495            args.flow_id.to_string(),
1496            args.execution_id.to_string(),
1497            args.now.to_string(),
1498        ];
1499
1500        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1501        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1502
1503        let raw: Value = self
1504            .fcall_with_reload("ff_add_execution_to_flow", &key_refs, &arg_refs)
1505            .await?;
1506
1507        parse_add_execution_to_flow_result(&raw)
1508    }
1509
1510    /// Cancel a flow.
1511    ///
1512    /// Flips `public_flow_state` to `cancelled` atomically via
1513    /// `ff_cancel_flow` on `{fp:N}`. For `cancel_all` policy, member
1514    /// executions must be cancelled cross-partition; this dispatch runs in
1515    /// the background and the call returns [`CancelFlowResult::CancellationScheduled`]
1516    /// immediately. For all other policies (or flows with no members), or
1517    /// when the flow was already in a terminal state (idempotent retry),
1518    /// the call returns [`CancelFlowResult::Cancelled`].
1519    ///
1520    /// Clients that need synchronous completion can call [`Self::cancel_flow_wait`].
1521    ///
1522    /// # Backpressure
1523    ///
1524    /// Each call that hits the async dispatch path spawns a new task into
1525    /// the shared background `JoinSet`. Rapid repeated calls against the
1526    /// same flow will spawn *multiple* overlapping dispatch tasks. This is
1527    /// not a correctness issue — each member cancel is idempotent and
1528    /// terminal flows short-circuit via [`ParsedCancelFlow::AlreadyTerminal`]
1529    /// — but heavy burst callers should either use `?wait=true` (serialises
1530    /// the dispatch on the HTTP thread, giving natural backpressure) or
1531    /// implement client-side deduplication on `flow_id`. The `JoinSet` is
1532    /// drained with a 15s timeout on [`Self::shutdown`], so very long
1533    /// dispatch tails may be aborted during graceful shutdown.
1534    ///
1535    /// # Orphan-member semantics on shutdown abort
1536    ///
1537    /// If shutdown fires `JoinSet::abort_all()` after its drain timeout
1538    /// while a dispatch loop is mid-iteration, the already-issued
1539    /// `ff_cancel_execution` FCALLs (atomic Lua) complete cleanly with
1540    /// `terminal_outcome = cancelled` and the caller-supplied reason. The
1541    /// members not yet visited are abandoned mid-loop. They remain in
1542    /// whichever state they were in (active/eligible/suspended) until the
1543    /// natural lifecycle scanners reach them: active leases expire
1544    /// (`lease_expiry`) and attempt-timeout them to `expired`, suspended
1545    /// members time out to `skipped`, eligible ones sit until retention
1546    /// trim. So no orphan state — but the terminal_outcome for the
1547    /// abandoned members will be `expired`/`skipped` rather than
1548    /// `cancelled`, and the operator-supplied `reason` is lost for them.
1549    /// Audit tooling that requires reason fidelity across shutdowns should
1550    /// use `?wait=true`.
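            ///
            /// # Example (illustrative sketch)
            ///
            /// Not compiled here: `server` and `args` are placeholders, with `args` a
            /// pre-built [`CancelFlowArgs`].
            ///
            /// ```ignore
            /// // Fire-and-forget: a cancel_all flow with members comes back as
            /// // CancellationScheduled while member dispatch runs in the background.
            /// let scheduled = server.cancel_flow(&args).await?;
            ///
            /// // Synchronous: returns only once every member is terminal, or
            /// // PartiallyCancelled if any member cancel faulted.
            /// let finished = server.cancel_flow_wait(&args).await?;
            /// ```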
1551    pub async fn cancel_flow(
1552        &self,
1553        args: &CancelFlowArgs,
1554    ) -> Result<CancelFlowResult, ServerError> {
1555        self.cancel_flow_inner(args, false).await
1556    }
1557
1558    /// Cancel a flow and wait for all member cancellations to complete
1559    /// inline. Slower than [`Self::cancel_flow`] for large flows, but
1560    /// guarantees every member is in a terminal state on return.
1561    pub async fn cancel_flow_wait(
1562        &self,
1563        args: &CancelFlowArgs,
1564    ) -> Result<CancelFlowResult, ServerError> {
1565        self.cancel_flow_inner(args, true).await
1566    }
1567
1568    async fn cancel_flow_inner(
1569        &self,
1570        args: &CancelFlowArgs,
1571        wait: bool,
1572    ) -> Result<CancelFlowResult, ServerError> {
1573        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1574        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1575        let fidx = FlowIndexKeys::new(&partition);
1576
1577        // Grace window before the reconciler may pick up this flow.
1578        // Kept short because the in-process dispatch is fast and bounded;
1579        // long enough to cover transport round-trips under load without
1580        // the reconciler fighting the live dispatch.
1581        const CANCEL_RECONCILER_GRACE_MS: u64 = 30_000;
1582
1583        // KEYS (5): flow_core, members_set, flow_index, pending_cancels, cancel_backlog
1584        let fcall_keys: Vec<String> = vec![
1585            fctx.core(),
1586            fctx.members(),
1587            fidx.flow_index(),
1588            fctx.pending_cancels(),
1589            fidx.cancel_backlog(),
1590        ];
1591
1592        // ARGV (5): flow_id, reason, cancellation_policy, now_ms, grace_ms
1593        let fcall_args: Vec<String> = vec![
1594            args.flow_id.to_string(),
1595            args.reason.clone(),
1596            args.cancellation_policy.clone(),
1597            args.now.to_string(),
1598            CANCEL_RECONCILER_GRACE_MS.to_string(),
1599        ];
1600
1601        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1602        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1603
1604        let raw: Value = self
1605            .fcall_with_reload("ff_cancel_flow", &key_refs, &arg_refs)
1606            .await?;
1607
1608        let (policy, members) = match parse_cancel_flow_raw(&raw)? {
1609            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
1610                (policy, member_execution_ids)
1611            }
1612            // Idempotent retry: flow was already cancelled/completed/failed.
1613            // Return Cancelled with the *stored* policy and member list so
1614            // observability tooling gets the real historical state rather
1615            // than echoing the caller's retry intent. One HMGET + SMEMBERS
1616            // on the idempotent path — both on {fp:N}, same slot.
1617            ParsedCancelFlow::AlreadyTerminal => {
1618                let flow_meta: Vec<Option<String>> = self
1619                    .client
1620                    .cmd("HMGET")
1621                    .arg(fctx.core())
1622                    .arg("cancellation_policy")
1623                    .arg("cancel_reason")
1624                    .execute()
1625                    .await
1626                    .map_err(|e| ServerError::ValkeyContext {
1627                        source: e,
1628                        context: "HMGET flow_core cancellation_policy,cancel_reason".into(),
1629                    })?;
1630                let stored_policy = flow_meta
1631                    .first()
1632                    .and_then(|v| v.as_ref())
1633                    .filter(|s| !s.is_empty())
1634                    .cloned();
1635                let stored_reason = flow_meta
1636                    .get(1)
1637                    .and_then(|v| v.as_ref())
1638                    .filter(|s| !s.is_empty())
1639                    .cloned();
1640                let all_members: Vec<String> = self
1641                    .client
1642                    .cmd("SMEMBERS")
1643                    .arg(fctx.members())
1644                    .execute()
1645                    .await
1646                    .map_err(|e| ServerError::ValkeyContext {
1647                        source: e,
1648                        context: "SMEMBERS flow members (already terminal)".into(),
1649                    })?;
1650                // Cap the returned list to avoid pathological bandwidth on
1651                // idempotent retries for flows with 10k+ members. Clients
1652                // already received the authoritative member list on the
1653                // first (non-idempotent) call; subsequent retries just need
1654                // enough to confirm the operation and trigger per-member
1655                // polling if desired.
1656                let total_members = all_members.len();
1657                let stored_members: Vec<String> = all_members
1658                    .into_iter()
1659                    .take(ALREADY_TERMINAL_MEMBER_CAP)
1660                    .collect();
1661                tracing::debug!(
1662                    flow_id = %args.flow_id,
1663                    stored_policy = stored_policy.as_deref().unwrap_or(""),
1664                    stored_reason = stored_reason.as_deref().unwrap_or(""),
1665                    total_members,
1666                    returned_members = stored_members.len(),
1667                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
1668                );
1669                return Ok(CancelFlowResult::Cancelled {
1670                    // Fall back to caller's policy only if the stored field
1671                    // is missing (flows cancelled by older Lua that did not
1672                    // persist cancellation_policy).
1673                    cancellation_policy: stored_policy
1674                        .unwrap_or_else(|| args.cancellation_policy.clone()),
1675                    member_execution_ids: stored_members,
1676                });
1677            }
1678        };
1679        let needs_dispatch = policy == "cancel_all" && !members.is_empty();
1680
1681        if !needs_dispatch {
1682            return Ok(CancelFlowResult::Cancelled {
1683                cancellation_policy: policy,
1684                member_execution_ids: members,
1685            });
1686        }
1687
1688        let pending_cancels_key = fctx.pending_cancels();
1689        let cancel_backlog_key = fidx.cancel_backlog();
1690
1691        if wait {
1692            // Synchronous dispatch — cancel every member inline before returning.
1693            // Collect per-member failures so the caller sees a
1694            // PartiallyCancelled outcome instead of a false-positive
1695            // Cancelled when any member cancel faulted (ghost member,
1696            // transport exhaustion, Lua reject). The cancel-backlog
1697            // reconciler still retries the unacked members; surfacing
1698            // the partial state lets operator tooling alert without
1699            // polling per-member state.
1700            let mut failed: Vec<String> = Vec::new();
1701            for eid_str in &members {
1702                match cancel_member_execution(
1703                    &self.client,
1704                    &self.config.partition_config,
1705                    eid_str,
1706                    &args.reason,
1707                    args.now,
1708                )
1709                .await
1710                {
1711                    Ok(()) => {
1712                        ack_cancel_member(
1713                            &self.client,
1714                            &pending_cancels_key,
1715                            &cancel_backlog_key,
1716                            eid_str,
1717                            &args.flow_id.to_string(),
1718                        )
1719                        .await;
1720                    }
1721                    Err(e) => {
1722                        // If the member was already terminal (execution_not_active /
1723                        // execution_not_found), treat this as ack-worthy success —
1724                        // the member is effectively "cancelled" from the flow's
1725                        // perspective and shouldn't be surfaced as a partial
1726                        // failure. Mirrors cancel_reconciler::cancel_member which
1727                        // acks on the same codes to avoid backlog poisoning.
1728                        if is_terminal_ack_error(&e) {
1729                            ack_cancel_member(
1730                                &self.client,
1731                                &pending_cancels_key,
1732                                &cancel_backlog_key,
1733                                eid_str,
1734                                &args.flow_id.to_string(),
1735                            )
1736                            .await;
1737                            continue;
1738                        }
1739                        tracing::warn!(
1740                            execution_id = %eid_str,
1741                            error = %e,
1742                            "cancel_flow(wait): individual execution cancel failed \
1743                             (transport/contract fault; reconciler will retry if transient)"
1744                        );
1745                        failed.push(eid_str.clone());
1746                    }
1747                }
1748            }
1749            if failed.is_empty() {
1750                return Ok(CancelFlowResult::Cancelled {
1751                    cancellation_policy: policy,
1752                    member_execution_ids: members,
1753                });
1754            }
1755            return Ok(CancelFlowResult::PartiallyCancelled {
1756                cancellation_policy: policy,
1757                member_execution_ids: members,
1758                failed_member_execution_ids: failed,
1759            });
1760        }
1761
1762        // Asynchronous dispatch — spawn into the shared JoinSet so Server::shutdown
1763        // can wait for pending cancellations (bounded by a shutdown timeout).
1764        let client = self.client.clone();
1765        let partition_config = self.config.partition_config;
1766        let reason = args.reason.clone();
1767        let now = args.now;
1768        let dispatch_members = members.clone();
1769        let flow_id = args.flow_id.clone();
1770        // Every async cancel_flow contends on this lock, but the critical
1771        // section is tiny: try_join_next drain + spawn. Drain is amortized
1772        // O(1) — each completed task is reaped exactly once across all
1773        // callers, and spawn is synchronous. At realistic cancel rates the
1774        // lock hold time is microseconds and does not bottleneck handlers.
1775        let mut guard = self.background_tasks.lock().await;
1776
1777        // Reap completed background dispatches before spawning the next.
1778        // Without this sweep, JoinSet accumulates Ok(()) results for every
1779        // async cancel ever issued — a memory leak in long-running servers
1780        // that would otherwise only drain on Server::shutdown. Surface any
1781        // panicked/aborted dispatches via tracing so silent failures in
1782        // cancel_member_execution are visible in logs.
1783        while let Some(joined) = guard.try_join_next() {
1784            if let Err(e) = joined {
1785                tracing::warn!(
1786                    error = %e,
1787                    "cancel_flow: background dispatch task panicked or was aborted"
1788                );
1789            }
1790        }
1791
1792        let pending_key_owned = pending_cancels_key.clone();
1793        let backlog_key_owned = cancel_backlog_key.clone();
1794        let flow_id_str = args.flow_id.to_string();
1795
1796        guard.spawn(async move {
1797            // Bounded parallel dispatch via futures::stream::buffer_unordered.
1798            // Sequential cancel at ~2ms/FCALL is ~2s per 1000 members, so a
1799            // large flow can blow past the 15s shutdown abort window.
1800            // Bounding at CONCURRENCY keeps Valkey load
1801            // predictable while still cutting wall-clock dispatch time by
1802            // ~CONCURRENCY× for large member sets.
1803            use futures::stream::StreamExt;
1804            const CONCURRENCY: usize = 16;
1805
1806            let member_count = dispatch_members.len();
1807            let flow_id_for_log = flow_id.clone();
1808            futures::stream::iter(dispatch_members)
1809                .map(|eid_str| {
1810                    let client = client.clone();
1811                    let reason = reason.clone();
1812                    let flow_id = flow_id.clone();
1813                    let pending = pending_key_owned.clone();
1814                    let backlog = backlog_key_owned.clone();
1815                    let flow_id_str = flow_id_str.clone();
1816                    async move {
1817                        match cancel_member_execution(
1818                            &client,
1819                            &partition_config,
1820                            &eid_str,
1821                            &reason,
1822                            now,
1823                        )
1824                        .await
1825                        {
1826                            Ok(()) => {
1827                                ack_cancel_member(
1828                                    &client,
1829                                    &pending,
1830                                    &backlog,
1831                                    &eid_str,
1832                                    &flow_id_str,
1833                                )
1834                                .await;
1835                            }
1836                            Err(e) => {
1837                                if is_terminal_ack_error(&e) {
1838                                    ack_cancel_member(
1839                                        &client,
1840                                        &pending,
1841                                        &backlog,
1842                                        &eid_str,
1843                                        &flow_id_str,
1844                                    )
1845                                    .await;
1846                                } else {
1847                                    tracing::warn!(
1848                                        flow_id = %flow_id,
1849                                        execution_id = %eid_str,
1850                                        error = %e,
1851                                        "cancel_flow(async): individual execution cancel failed \
1852                                         (transport/contract fault; reconciler will retry if transient)"
1853                                    );
1854                                }
1855                            }
1856                        }
1857                    }
1858                })
1859                .buffer_unordered(CONCURRENCY)
1860                .for_each(|()| async {})
1861                .await;
1862
1863            tracing::debug!(
1864                flow_id = %flow_id_for_log,
1865                member_count,
1866                concurrency = CONCURRENCY,
1867                "cancel_flow: background member dispatch complete"
1868            );
1869        });
1870        drop(guard);
1871
1872        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
1873        Ok(CancelFlowResult::CancellationScheduled {
1874            cancellation_policy: policy,
1875            member_count,
1876            member_execution_ids: members,
1877        })
1878    }
1879
1880    /// Stage a dependency edge between two executions in a flow.
1881    ///
1882    /// Runs on the flow partition {fp:N}.
1883    /// KEYS (6), ARGV (8) — matches lua/flow.lua ff_stage_dependency_edge.
1884    pub async fn stage_dependency_edge(
1885        &self,
1886        args: &StageDependencyEdgeArgs,
1887    ) -> Result<StageDependencyEdgeResult, ServerError> {
1888        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1889        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1890
1891        // KEYS (6): flow_core, members_set, edge_hash, out_adj_set, in_adj_set, grant_hash
1892        let fcall_keys: Vec<String> = vec![
1893            fctx.core(),
1894            fctx.members(),
1895            fctx.edge(&args.edge_id),
1896            fctx.outgoing(&args.upstream_execution_id),
1897            fctx.incoming(&args.downstream_execution_id),
1898            fctx.grant(&args.edge_id.to_string()),
1899        ];
1900
1901        // ARGV (8): flow_id, edge_id, upstream_eid, downstream_eid,
1902        //           dependency_kind, data_passing_ref, expected_graph_revision, now_ms
1903        let fcall_args: Vec<String> = vec![
1904            args.flow_id.to_string(),
1905            args.edge_id.to_string(),
1906            args.upstream_execution_id.to_string(),
1907            args.downstream_execution_id.to_string(),
1908            args.dependency_kind.clone(),
1909            args.data_passing_ref.clone().unwrap_or_default(),
1910            args.expected_graph_revision.to_string(),
1911            args.now.to_string(),
1912        ];
1913
1914        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1915        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1916
1917        let raw: Value = self
1918            .fcall_with_reload("ff_stage_dependency_edge", &key_refs, &arg_refs)
1919            .await?;
1920
1921        parse_stage_dependency_edge_result(&raw)
1922    }
1923
1924    /// Apply a staged dependency edge to the child execution.
1925    ///
1926    /// Runs on the child execution partition {p:N}.
1927    /// KEYS (7), ARGV (7) — matches lua/flow.lua ff_apply_dependency_to_child.
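            ///
            /// Illustrative sketch of the staged-then-applied sequence (not compiled
            /// here; `server`, `stage_args`, and `apply_args` are placeholders): an
            /// edge is typically staged on the flow partition first, then applied on
            /// the downstream execution's partition.
            ///
            /// ```ignore
            /// let staged = server.stage_dependency_edge(&stage_args).await?;
            /// let applied = server.apply_dependency_to_child(&apply_args).await?;
            /// ```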
1928    pub async fn apply_dependency_to_child(
1929        &self,
1930        args: &ApplyDependencyToChildArgs,
1931    ) -> Result<ApplyDependencyToChildResult, ServerError> {
1932        let partition = execution_partition(
1933            &args.downstream_execution_id,
1934            &self.config.partition_config,
1935        );
1936        let ctx = ExecKeyContext::new(&partition, &args.downstream_execution_id);
1937        let idx = IndexKeys::new(&partition);
1938
1939        // Pre-read lane_id for index keys
1940        let lane_str: Option<String> = self
1941            .client
1942            .hget(&ctx.core(), "lane_id")
1943            .await
1944            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1945        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1946
1947        // KEYS (7): exec_core, deps_meta, unresolved_set, dep_hash,
1948        //           eligible_zset, blocked_deps_zset, deps_all_edges
1949        let fcall_keys: Vec<String> = vec![
1950            ctx.core(),
1951            ctx.deps_meta(),
1952            ctx.deps_unresolved(),
1953            ctx.dep_edge(&args.edge_id),
1954            idx.lane_eligible(&lane),
1955            idx.lane_blocked_dependencies(&lane),
1956            ctx.deps_all_edges(),
1957        ];
1958
1959        // ARGV (7): flow_id, edge_id, upstream_eid, graph_revision,
1960        //           dependency_kind, data_passing_ref, now_ms
1961        let fcall_args: Vec<String> = vec![
1962            args.flow_id.to_string(),
1963            args.edge_id.to_string(),
1964            args.upstream_execution_id.to_string(),
1965            args.graph_revision.to_string(),
1966            args.dependency_kind.clone(),
1967            args.data_passing_ref.clone().unwrap_or_default(),
1968            args.now.to_string(),
1969        ];
1970
1971        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1972        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1973
1974        let raw: Value = self
1975            .fcall_with_reload("ff_apply_dependency_to_child", &key_refs, &arg_refs)
1976            .await?;
1977
1978        parse_apply_dependency_result(&raw)
1979    }
1980
1981    // ── Execution operations API ──
1982
1983    /// Deliver a signal to a suspended (or pending-waitpoint) execution.
1984    ///
1985    /// Pre-reads exec_core for waitpoint/suspension fields needed for KEYS.
1986        /// KEYS (14), ARGV (18) — matches lua/signal.lua ff_deliver_signal.
1987    pub async fn deliver_signal(
1988        &self,
1989        args: &DeliverSignalArgs,
1990    ) -> Result<DeliverSignalResult, ServerError> {
1991        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
1992        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
1993        let idx = IndexKeys::new(&partition);
1994
1995        // Pre-read lane_id for index keys
1996        let lane_str: Option<String> = self
1997            .client
1998            .hget(&ctx.core(), "lane_id")
1999            .await
2000            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
2001        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2002
2003        let wp_id = &args.waitpoint_id;
2004        let sig_id = &args.signal_id;
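            // The idempotency-key slot in KEYS is always filled: with the
            // signal-dedup key when the caller supplied a non-empty
            // idempotency_key, otherwise with the context's no-op key, keeping
            // the FCALL's KEYS arity fixed at 14.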
2005        let idem_key = args
2006            .idempotency_key
2007            .as_ref()
2008            .filter(|k| !k.is_empty())
2009            .map(|k| ctx.signal_dedup(wp_id, k))
2010            .unwrap_or_else(|| ctx.noop());
2011
2012        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
2013        //            exec_signals_zset, signal_hash, signal_payload,
2014        //            idem_key, waitpoint_hash, suspension_current,
2015        //            eligible_zset, suspended_zset, delayed_zset,
2016        //            suspension_timeout_zset, hmac_secrets
2017        let fcall_keys: Vec<String> = vec![
2018            ctx.core(),                       // 1
2019            ctx.waitpoint_condition(wp_id),    // 2
2020            ctx.waitpoint_signals(wp_id),      // 3
2021            ctx.exec_signals(),                // 4
2022            ctx.signal(sig_id),                // 5
2023            ctx.signal_payload(sig_id),        // 6
2024            idem_key,                          // 7
2025            ctx.waitpoint(wp_id),              // 8
2026            ctx.suspension_current(),          // 9
2027            idx.lane_eligible(&lane),          // 10
2028            idx.lane_suspended(&lane),         // 11
2029            idx.lane_delayed(&lane),           // 12
2030            idx.suspension_timeout(),          // 13
2031            idx.waitpoint_hmac_secrets(),      // 14
2032        ];
2033
2034        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
2035        //            signal_category, source_type, source_identity,
2036        //            payload, payload_encoding, idempotency_key,
2037        //            correlation_id, target_scope, created_at,
2038        //            dedup_ttl_ms, resume_delay_ms, signal_maxlen,
2039        //            max_signals_per_execution, waitpoint_token
2040        let fcall_args: Vec<String> = vec![
2041            args.signal_id.to_string(),                          // 1
2042            args.execution_id.to_string(),                       // 2
2043            args.waitpoint_id.to_string(),                       // 3
2044            args.signal_name.clone(),                            // 4
2045            args.signal_category.clone(),                        // 5
2046            args.source_type.clone(),                            // 6
2047            args.source_identity.clone(),                        // 7
2048            args.payload.as_ref()
2049                .map(|p| String::from_utf8_lossy(p).into_owned())
2050                .unwrap_or_default(),                            // 8
2051            args.payload_encoding
2052                .clone()
2053                .unwrap_or_else(|| "json".to_owned()),           // 9
2054            args.idempotency_key
2055                .clone()
2056                .unwrap_or_default(),                            // 10
2057            args.correlation_id
2058                .clone()
2059                .unwrap_or_default(),                            // 11
2060            args.target_scope.clone(),                           // 12
2061            args.created_at
2062                .map(|ts| ts.to_string())
2063                .unwrap_or_else(|| args.now.to_string()),        // 13
2064            args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), // 14
2065            args.resume_delay_ms.unwrap_or(0).to_string(),       // 15
2066            args.signal_maxlen.unwrap_or(1000).to_string(),      // 16
2067            args.max_signals_per_execution
2068                .unwrap_or(10_000)
2069                .to_string(),                                    // 17
2070            // WIRE BOUNDARY — raw token to Lua. Display is redacted
2071            // for log safety; use .as_str() at the wire crossing.
2072            args.waitpoint_token.as_str().to_owned(),            // 18
2073        ];
2074
2075        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2076        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2077
2078        let raw: Value = self
2079            .fcall_with_reload("ff_deliver_signal", &key_refs, &arg_refs)
2080            .await?;
2081
2082        parse_deliver_signal_result(&raw, &args.signal_id)
2083    }
2084
2085    /// Change an execution's priority.
2086    ///
2087    /// KEYS (2), ARGV (2) — matches lua/scheduling.lua ff_change_priority.
2088    pub async fn change_priority(
2089        &self,
2090        execution_id: &ExecutionId,
2091        new_priority: i32,
2092    ) -> Result<ChangePriorityResult, ServerError> {
2093        let partition = execution_partition(execution_id, &self.config.partition_config);
2094        let ctx = ExecKeyContext::new(&partition, execution_id);
2095        let idx = IndexKeys::new(&partition);
2096
2097        // Read lane_id for eligible_zset key
2098        let lane_str: Option<String> = self
2099            .client
2100            .hget(&ctx.core(), "lane_id")
2101            .await
2102            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
2103        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2104
2105        // KEYS (2): exec_core, eligible_zset
2106        let fcall_keys: Vec<String> = vec![ctx.core(), idx.lane_eligible(&lane)];
2107
2108        // ARGV (2): execution_id, new_priority
2109        let fcall_args: Vec<String> = vec![
2110            execution_id.to_string(),
2111            new_priority.to_string(),
2112        ];
2113
2114        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2115        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2116
2117        let raw: Value = self
2118            .fcall_with_reload("ff_change_priority", &key_refs, &arg_refs)
2119            .await?;
2120
2121        parse_change_priority_result(&raw, execution_id)
2122    }
2123
2124    /// Scheduler-routed claim entry point (Batch C item 2 PR-B).
2125    ///
2126    /// Delegates to [`ff_scheduler::Scheduler::claim_for_worker`] which
2127    /// runs budget + quota + capability admission before issuing the
2128    /// grant. Returns `Ok(None)` when no eligible execution exists on
2129    /// the lane at this scan cycle. The worker's subsequent
2130    /// `claim_from_grant(lane, grant)` mints the lease.
2131    ///
2132    /// Keeping the claim-grant mint inside the server (rather than the
2133    /// worker) means capability CSV validation, budget/quota breach
2134    /// checks, and lane routing run in one place for every tenant
2135    /// worker — the same invariants as the `direct-valkey-claim` path
2136    /// enforces inline, but gated at a single server choke point.
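            ///
            /// Illustrative sketch of the two-step claim described above (not compiled
            /// here): `server`, `worker`, the identifiers, and the 30_000 ms grant TTL
            /// are placeholders, and `claim_from_grant` is the worker-side call named
            /// above rather than a method on this type.
            ///
            /// ```ignore
            /// if let Some(grant) = server
            ///     .claim_for_worker(&lane, &worker_id, &instance_id, &capabilities, 30_000)
            ///     .await?
            /// {
            ///     // Worker side: mint the lease from the grant.
            ///     let lease = worker.claim_from_grant(lane, grant);
            /// }
            /// ```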
2137    pub async fn claim_for_worker(
2138        &self,
2139        lane: &LaneId,
2140        worker_id: &WorkerId,
2141        worker_instance_id: &WorkerInstanceId,
2142        worker_capabilities: &std::collections::BTreeSet<String>,
2143        grant_ttl_ms: u64,
2144    ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
2145        self.scheduler
2146            .claim_for_worker(
2147                lane,
2148                worker_id,
2149                worker_instance_id,
2150                worker_capabilities,
2151                grant_ttl_ms,
2152            )
2153            .await
2154            .map_err(|e| match e {
2155                ff_scheduler::SchedulerError::Valkey(inner) => {
2156                    ServerError::Valkey(inner)
2157                }
2158                ff_scheduler::SchedulerError::ValkeyContext { source, context } => {
2159                    ServerError::ValkeyContext { source, context }
2160                }
2161                ff_scheduler::SchedulerError::Config(msg) => {
2162                    ServerError::InvalidInput(msg)
2163                }
2164            })
2165    }
2166
2167    /// Revoke an active lease (operator-initiated).
2168    pub async fn revoke_lease(
2169        &self,
2170        execution_id: &ExecutionId,
2171    ) -> Result<RevokeLeaseResult, ServerError> {
2172        let partition = execution_partition(execution_id, &self.config.partition_config);
2173        let ctx = ExecKeyContext::new(&partition, execution_id);
2174        let idx = IndexKeys::new(&partition);
2175
2176        // Pre-read worker_instance_id for worker_leases key
2177        let wiid_str: Option<String> = self
2178            .client
2179            .hget(&ctx.core(), "current_worker_instance_id")
2180            .await
2181            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET current_worker_instance_id".into() })?;
2182        let wiid = match wiid_str {
2183            Some(ref s) if !s.is_empty() => WorkerInstanceId::new(s),
2184            _ => {
2185                return Err(ServerError::NotFound(format!(
2186                    "no active lease for execution {execution_id} (no current_worker_instance_id)"
2187                )));
2188            }
2189        };
2190
2191        // KEYS (5): exec_core, lease_current, lease_history, lease_expiry_zset, worker_leases
2192        let fcall_keys: Vec<String> = vec![
2193            ctx.core(),
2194            ctx.lease_current(),
2195            ctx.lease_history(),
2196            idx.lease_expiry(),
2197            idx.worker_leases(&wiid),
2198        ];
2199
2200        // ARGV (3): execution_id, expected_lease_id (empty = skip check), revoke_reason
2201        let fcall_args: Vec<String> = vec![
2202            execution_id.to_string(),
2203            String::new(), // no expected_lease_id check
2204            "operator_revoke".to_owned(),
2205        ];
2206
2207        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2208        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2209
2210        let raw: Value = self
2211            .fcall_with_reload("ff_revoke_lease", &key_refs, &arg_refs)
2212            .await?;
2213
2214        parse_revoke_lease_result(&raw)
2215    }
2216
2217    /// Get full execution info via HGETALL on exec_core.
2218    pub async fn get_execution(
2219        &self,
2220        execution_id: &ExecutionId,
2221    ) -> Result<ExecutionInfo, ServerError> {
2222        let partition = execution_partition(execution_id, &self.config.partition_config);
2223        let ctx = ExecKeyContext::new(&partition, execution_id);
2224
2225        let fields: HashMap<String, String> = self
2226            .client
2227            .hgetall(&ctx.core())
2228            .await
2229            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL exec_core".into() })?;
2230
2231        if fields.is_empty() {
2232            return Err(ServerError::NotFound(format!(
2233                "execution not found: {execution_id}"
2234            )));
2235        }
2236
2237        let parse_enum = |field: &str| -> String {
2238            fields.get(field).cloned().unwrap_or_default()
2239        };
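            // exec_core stores these enum fields as bare strings; wrapping the raw
            // value in JSON quotes lets serde_json deserialize it into the typed
            // field.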
2240        fn deserialize<T: serde::de::DeserializeOwned>(field: &str, raw: &str) -> Result<T, ServerError> {
2241            let quoted = format!("\"{raw}\"");
2242            serde_json::from_str(&quoted).map_err(|e| {
2243                ServerError::Script(format!("invalid {field} '{raw}': {e}"))
2244            })
2245        }
2246
2247        let lp_str = parse_enum("lifecycle_phase");
2248        let os_str = parse_enum("ownership_state");
2249        let es_str = parse_enum("eligibility_state");
2250        let br_str = parse_enum("blocking_reason");
2251        let to_str = parse_enum("terminal_outcome");
2252        let as_str = parse_enum("attempt_state");
2253        let ps_str = parse_enum("public_state");
2254
2255        let state_vector = StateVector {
2256            lifecycle_phase: deserialize("lifecycle_phase", &lp_str)?,
2257            ownership_state: deserialize("ownership_state", &os_str)?,
2258            eligibility_state: deserialize("eligibility_state", &es_str)?,
2259            blocking_reason: deserialize("blocking_reason", &br_str)?,
2260            terminal_outcome: deserialize("terminal_outcome", &to_str)?,
2261            attempt_state: deserialize("attempt_state", &as_str)?,
2262            public_state: deserialize("public_state", &ps_str)?,
2263        };
2264
2265        // Reader invariant (RFC-011 §7.3): `flow_id` on exec_core is
2266        // stamped atomically with membership in `add_execution_to_flow`'s
2267        // single FCALL. Empty iff the exec has no flow affinity (never
2268        // called `add_execution_to_flow` — solo execs) — NOT "orphaned
2269        // mid-two-phase" (that failure mode is retired). Filter on empty
2270        // to surface "no flow affinity" distinctly from "flow X".
2271        let flow_id_val = fields.get("flow_id").filter(|s| !s.is_empty()).cloned();
2272
2273        // started_at / completed_at come from the exec_core hash —
2274        // lua/execution.lua writes them alongside the rest of the state
2275        // vector. `created_at` is required; the other two are optional
2276        // (pre-claim / pre-terminal reads) and elided when empty so the
2277        // wire payload for in-flight executions stays the shape existing
2278        // callers expect.
2279        let started_at_opt = fields
2280            .get("started_at")
2281            .filter(|s| !s.is_empty())
2282            .cloned();
2283        let completed_at_opt = fields
2284            .get("completed_at")
2285            .filter(|s| !s.is_empty())
2286            .cloned();
2287
2288        Ok(ExecutionInfo {
2289            execution_id: execution_id.clone(),
2290            namespace: parse_enum("namespace"),
2291            lane_id: parse_enum("lane_id"),
2292            priority: fields
2293                .get("priority")
2294                .and_then(|v| v.parse().ok())
2295                .unwrap_or(0),
2296            execution_kind: parse_enum("execution_kind"),
2297            state_vector,
2298            public_state: deserialize("public_state", &ps_str)?,
2299            created_at: parse_enum("created_at"),
2300            started_at: started_at_opt,
2301            completed_at: completed_at_opt,
2302            current_attempt_index: fields
2303                .get("current_attempt_index")
2304                .and_then(|v| v.parse().ok())
2305                .unwrap_or(0),
2306            flow_id: flow_id_val,
2307            blocking_detail: parse_enum("blocking_detail"),
2308        })
2309    }
2310
2311    /// List executions from a partition's index ZSET.
2312    ///
2313    /// No FCALL — direct ZRANGE + pipelined HMGET reads.
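            ///
            /// Illustrative sketch (not compiled here; `server` and the literal
            /// arguments are placeholders):
            ///
            /// ```ignore
            /// // First page of eligible executions on the default lane of partition 0.
            /// let page = server
            ///     .list_executions(0, &LaneId::new("default".to_owned()), "eligible", 0, 100)
            ///     .await?;
            /// for exec in &page.executions {
            ///     println!("{} {}", exec.execution_id, exec.public_state);
            /// }
            /// ```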
2314    pub async fn list_executions(
2315        &self,
2316        partition_id: u16,
2317        lane: &LaneId,
2318        state_filter: &str,
2319        offset: u64,
2320        limit: u64,
2321    ) -> Result<ListExecutionsResult, ServerError> {
2322        let partition = ff_core::partition::Partition {
2323            family: ff_core::partition::PartitionFamily::Execution,
2324            index: partition_id,
2325        };
2326        let idx = IndexKeys::new(&partition);
2327
2328        let zset_key = match state_filter {
2329            "eligible" => idx.lane_eligible(lane),
2330            "delayed" => idx.lane_delayed(lane),
2331            "terminal" => idx.lane_terminal(lane),
2332            "suspended" => idx.lane_suspended(lane),
2333            "active" => idx.lane_active(lane),
2334            other => {
2335                return Err(ServerError::InvalidInput(format!(
2336                    "invalid state_filter: {other}. Use: eligible, delayed, terminal, suspended, active"
2337                )));
2338            }
2339        };
2340
2341        // ZRANGE key -inf +inf BYSCORE LIMIT offset count
2342        let eids: Vec<String> = self
2343            .client
2344            .cmd("ZRANGE")
2345            .arg(&zset_key)
2346            .arg("-inf")
2347            .arg("+inf")
2348            .arg("BYSCORE")
2349            .arg("LIMIT")
2350            .arg(offset)
2351            .arg(limit)
2352            .execute()
2353            .await
2354            .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("ZRANGE {zset_key}") })?;
2355
2356        if eids.is_empty() {
2357            return Ok(ListExecutionsResult {
2358                executions: vec![],
2359                total_returned: 0,
2360            });
2361        }
2362
2363        // Parse execution IDs, warning on corrupt ZSET members
2364        let mut parsed = Vec::with_capacity(eids.len());
2365        for eid_str in &eids {
2366            match ExecutionId::parse(eid_str) {
2367                Ok(id) => parsed.push(id),
2368                Err(e) => {
2369                    tracing::warn!(
2370                        raw_id = %eid_str,
2371                        error = %e,
2372                        zset = %zset_key,
2373                        "list_executions: ZSET member failed to parse as ExecutionId (data corruption?)"
2374                    );
2375                }
2376            }
2377        }
2378
2379        if parsed.is_empty() {
2380            return Ok(ListExecutionsResult {
2381                executions: vec![],
2382                total_returned: 0,
2383            });
2384        }
2385
2386        // Pipeline all HMGETs into a single round-trip
2387        let mut pipe = self.client.pipeline();
2388        let mut slots = Vec::with_capacity(parsed.len());
2389        for eid in &parsed {
2390            let ep = execution_partition(eid, &self.config.partition_config);
2391            let ctx = ExecKeyContext::new(&ep, eid);
2392            let slot = pipe
2393                .cmd::<Vec<Option<String>>>("HMGET")
2394                .arg(ctx.core())
2395                .arg("namespace")
2396                .arg("lane_id")
2397                .arg("execution_kind")
2398                .arg("public_state")
2399                .arg("priority")
2400                .arg("created_at")
2401                .finish();
2402            slots.push(slot);
2403        }
2404
2405        pipe.execute()
2406            .await
2407            .map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline HMGET".into() })?;
2408
2409        let mut summaries = Vec::with_capacity(parsed.len());
2410        for (eid, slot) in parsed.into_iter().zip(slots) {
2411            let fields: Vec<Option<String>> = slot.value()
2412                .map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline slot".into() })?;
2413
2414            let field = |i: usize| -> String {
2415                fields
2416                    .get(i)
2417                    .and_then(|v| v.as_ref())
2418                    .cloned()
2419                    .unwrap_or_default()
2420            };
2421
2422            summaries.push(ExecutionSummary {
2423                execution_id: eid,
2424                namespace: field(0),
2425                lane_id: field(1),
2426                execution_kind: field(2),
2427                public_state: field(3),
2428                priority: field(4).parse().unwrap_or(0),
2429                created_at: field(5),
2430            });
2431        }
2432
2433        let total = summaries.len();
2434        Ok(ListExecutionsResult {
2435            executions: summaries,
2436            total_returned: total,
2437        })
2438    }
2439
2440    /// Replay a terminal execution.
2441    ///
2442    /// Pre-reads exec_core for flow_id and dep edges (variable KEYS).
2443    /// KEYS: 4 base, +3+N for a skipped flow member; ARGV: 2 base, +N — matches lua/flow.lua ff_replay_execution.
2444    pub async fn replay_execution(
2445        &self,
2446        execution_id: &ExecutionId,
2447    ) -> Result<ReplayExecutionResult, ServerError> {
2448        let partition = execution_partition(execution_id, &self.config.partition_config);
2449        let ctx = ExecKeyContext::new(&partition, execution_id);
2450        let idx = IndexKeys::new(&partition);
2451
2452        // Pre-read lane_id, flow_id, terminal_outcome.
2453        //
2454        // Reader invariant (RFC-011 §7.3): `flow_id` on exec_core is
2455        // stamped atomically with membership by `add_execution_to_flow`'s
2456        // single FCALL. Empty iff the exec has no flow affinity
2457        // (solo-path create_execution — never added to a flow). The
2458        // `is_skipped_flow_member` branch below gates on
2459        // `!flow_id_str.is_empty()`, so solo execs correctly fall back
2460        // to the non-flow-member replay path. See
2461        // `add_execution_to_flow` rustdoc for the atomic-commit
2462        // invariant.
2463        let dyn_fields: Vec<Option<String>> = self
2464            .client
2465            .cmd("HMGET")
2466            .arg(ctx.core())
2467            .arg("lane_id")
2468            .arg("flow_id")
2469            .arg("terminal_outcome")
2470            .execute()
2471            .await
2472            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET replay pre-read".into() })?;
2473        let lane = LaneId::new(
2474            dyn_fields
2475                .first()
2476                .and_then(|v| v.as_ref())
2477                .cloned()
2478                .unwrap_or_else(|| "default".to_owned()),
2479        );
2480        let flow_id_str = dyn_fields
2481            .get(1)
2482            .and_then(|v| v.as_ref())
2483            .cloned()
2484            .unwrap_or_default();
2485        let terminal_outcome = dyn_fields
2486            .get(2)
2487            .and_then(|v| v.as_ref())
2488            .cloned()
2489            .unwrap_or_default();
2490
2491        let is_skipped_flow_member = terminal_outcome == "skipped" && !flow_id_str.is_empty();
2492
2493        // Base KEYS (4): exec_core, terminal_zset, eligible_zset, lease_history
2494        let mut fcall_keys: Vec<String> = vec![
2495            ctx.core(),
2496            idx.lane_terminal(&lane),
2497            idx.lane_eligible(&lane),
2498            ctx.lease_history(),
2499        ];
2500
2501        // Base ARGV (2): execution_id, now_ms
2502        let now = TimestampMs::now();
2503        let mut fcall_args: Vec<String> = vec![execution_id.to_string(), now.to_string()];
2504
2505        if is_skipped_flow_member {
2506            // Read ALL inbound edge IDs from the flow partition's adjacency set.
2507            // Cannot use deps:unresolved because impossible edges were SREM'd
2508            // by ff_resolve_dependency. The flow's in:<eid> set has all edges.
2509            let flow_id = FlowId::parse(&flow_id_str)
2510                .map_err(|e| ServerError::Script(format!("bad flow_id: {e}")))?;
2511            let flow_part =
2512                flow_partition(&flow_id, &self.config.partition_config);
2513            let flow_ctx = FlowKeyContext::new(&flow_part, &flow_id);
2514            let edge_ids: Vec<String> = self
2515                .client
2516                .cmd("SMEMBERS")
2517                .arg(flow_ctx.incoming(execution_id))
2518                .execute()
2519                .await
2520                .map_err(|e| ServerError::ValkeyContext { source: e, context: "SMEMBERS replay edges".into() })?;
2521
2522            // Extended KEYS: blocked_deps_zset, deps_meta, deps_unresolved, dep_edge_0..N
2523            fcall_keys.push(idx.lane_blocked_dependencies(&lane)); // 5
2524            fcall_keys.push(ctx.deps_meta()); // 6
2525            fcall_keys.push(ctx.deps_unresolved()); // 7
2526            for eid_str in &edge_ids {
2527                let edge_id = EdgeId::parse(eid_str)
2528                    .unwrap_or_else(|_| EdgeId::new());
2529                fcall_keys.push(ctx.dep_edge(&edge_id)); // 8..8+N
2530                fcall_args.push(eid_str.clone()); // 3..3+N
2531            }
2532        }
2533
2534        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2535        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2536
2537        let raw: Value = self
2538            .fcall_with_reload("ff_replay_execution", &key_refs, &arg_refs)
2539            .await?;
2540
2541        parse_replay_result(&raw)
2542    }
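
    // For reference, the full FCALL layout the skipped-flow-member branch above
    // builds (N = number of inbound edges read via SMEMBERS):
    //
    //   KEYS 1..4    exec_core, lane_terminal, lane_eligible, lease_history
    //   KEYS 5..7    lane_blocked_dependencies, deps_meta, deps_unresolved
    //   KEYS 8..8+N  dep_edge(<edge_id>), one per inbound edge
    //   ARGV 1..2    execution_id, now_ms
    //   ARGV 3..3+N  the raw edge ids, positionally matching KEYS 8..8+N
    //
    // Solo (non-flow-member) replays stop at KEYS 1..4 / ARGV 1..2.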
2543
2544    /// Read frames from an attempt's stream (XRANGE wrapper) plus terminal
2545    /// markers (`closed_at`, `closed_reason`) so consumers can stop polling
2546    /// when the producer finalizes.
2547    ///
2548    /// `from_id` and `to_id` accept XRANGE special markers: `"-"` for
2549    /// earliest, `"+"` for latest. `count_limit` MUST be `>= 1` —
2550    /// `0` returns a `ServerError::InvalidInput` (matches the REST boundary
2551    /// and the Lua-side reject).
2552    ///
2553    /// Cluster-safe: the attempt's `{p:N}` partition is derived from the
2554    /// execution id, so all KEYS share the same slot.
2555    pub async fn read_attempt_stream(
2556        &self,
2557        execution_id: &ExecutionId,
2558        attempt_index: AttemptIndex,
2559        from_id: &str,
2560        to_id: &str,
2561        count_limit: u64,
2562    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2563        use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
2564
2565        if count_limit == 0 {
2566            return Err(ServerError::InvalidInput(
2567                "count_limit must be >= 1".to_owned(),
2568            ));
2569        }
2570
2571        // Share the same semaphore as tail. A large XRANGE reply (10_000
2572        // frames × ~64KB) is just as capable of head-of-line-blocking the
2573        // tail_client mux as a long BLOCK — fairness accounting must be
2574        // unified. Non-blocking acquire → 429 on contention.
2575        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2576            Ok(p) => p,
2577            Err(tokio::sync::TryAcquireError::NoPermits) => {
2578                return Err(ServerError::ConcurrencyLimitExceeded(
2579                    "stream_ops",
2580                    self.config.max_concurrent_stream_ops,
2581                ));
2582            }
2583            Err(tokio::sync::TryAcquireError::Closed) => {
2584                return Err(ServerError::OperationFailed(
2585                    "stream semaphore closed (server shutting down)".into(),
2586                ));
2587            }
2588        };
2589
2590        let args = ReadFramesArgs {
2591            execution_id: execution_id.clone(),
2592            attempt_index,
2593            from_id: from_id.to_owned(),
2594            to_id: to_id.to_owned(),
2595            count_limit,
2596        };
2597
2598        let partition = execution_partition(execution_id, &self.config.partition_config);
2599        let ctx = ExecKeyContext::new(&partition, execution_id);
2600        let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };
2601
2602        // Route on the dedicated stream client, same as tail. A 10_000-
2603        // frame XRANGE reply on the main mux would stall every other
2604        // FCALL behind reply serialization.
2605        let result = ff_script::functions::stream::ff_read_attempt_stream(
2606            &self.tail_client, &keys, &args,
2607        )
2608        .await
2609        .map_err(script_error_to_server);
2610
2611        drop(permit);
2612
2613        match result? {
2614            ReadFramesResult::Frames(f) => Ok(f),
2615        }
2616    }
2617
2618    /// Tail a live attempt's stream (XREAD BLOCK wrapper). Returns frames
2619    /// plus the terminal signal so a polling consumer can exit when the
2620    /// producer closes the stream.
2621    ///
2622    /// `last_id` is exclusive — XREAD returns entries with id > last_id.
2623    /// Pass `"0-0"` to read from the beginning.
2624    ///
2625    /// `block_ms == 0` → non-blocking peek (returns immediately).
2626    /// `block_ms > 0`  → blocks up to that many ms. Empty `frames` +
2627    /// `closed_at=None` → timeout, no new data, still open.
2628    ///
2629    /// `count_limit` MUST be `>= 1`; `0` returns `InvalidInput`.
2630    ///
2631    /// Implemented as a direct XREAD command (not FCALL) because blocking
2632    /// commands are rejected inside Valkey Functions. The terminal
2633    /// markers come from a companion HMGET on `stream_meta` — see
2634    /// `ff_script::stream_tail` module docs.
2635    pub async fn tail_attempt_stream(
2636        &self,
2637        execution_id: &ExecutionId,
2638        attempt_index: AttemptIndex,
2639        last_id: &str,
2640        block_ms: u64,
2641        count_limit: u64,
2642    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2643        if count_limit == 0 {
2644            return Err(ServerError::InvalidInput(
2645                "count_limit must be >= 1".to_owned(),
2646            ));
2647        }
2648
2649        // Non-blocking permit acquisition on the shared stream_semaphore
2650        // (read + tail split the same pool). If the server is already at
2651        // the `max_concurrent_stream_ops` ceiling, return `TailUnavailable`
2652        // (→ 429) rather than queueing — a queued tail holds the caller's
2653        // HTTP request open with no upper bound, which is exactly the
2654        // resource-exhaustion pattern this limit exists to prevent.
2655        // Clients retry with backoff on 429.
2656        //
2657        // Worst-case permit hold: if the producer closes the stream via
2658        // HSET on stream_meta (not an XADD), the XREAD BLOCK won't wake
2659        // until `block_ms` elapses — so a permit can be held for up to
2660        // the caller's block_ms even though the terminal signal is
2661        // ready. This is a v1-accepted limitation; RFC-006 §Terminal-
2662        // signal timing under active tail documents it and sketches the
2663        // v2 sentinel-XADD upgrade path.
2664        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2665            Ok(p) => p,
2666            Err(tokio::sync::TryAcquireError::NoPermits) => {
2667                return Err(ServerError::ConcurrencyLimitExceeded(
2668                    "stream_ops",
2669                    self.config.max_concurrent_stream_ops,
2670                ));
2671            }
2672            Err(tokio::sync::TryAcquireError::Closed) => {
2673                return Err(ServerError::OperationFailed(
2674                    "stream semaphore closed (server shutting down)".into(),
2675                ));
2676            }
2677        };
2678
2679        let partition = execution_partition(execution_id, &self.config.partition_config);
2680        let ctx = ExecKeyContext::new(&partition, execution_id);
2681        let stream_key = ctx.stream(attempt_index);
2682        let stream_meta_key = ctx.stream_meta(attempt_index);
2683
2684        // Acquire the XREAD BLOCK serializer AFTER the stream semaphore.
2685        // Nesting order matters: the semaphore is the user-visible
2686        // ceiling (surfaces as 429), the Mutex is an internal fairness
2687        // gate. Holding the permit while waiting on the Mutex means the
2688        // ceiling still bounds queue depth. See the field docstring for
2689        // the full rationale (ferriskey pipeline FIFO + client-side
2690        // per-call timeout race).
2691        let _xread_guard = self.xread_block_lock.lock().await;
2692
2693        let result = ff_script::stream_tail::xread_block(
2694            &self.tail_client,
2695            &stream_key,
2696            &stream_meta_key,
2697            last_id,
2698            block_ms,
2699            count_limit,
2700        )
2701        .await
2702        .map_err(script_error_to_server);
2703
2704        drop(_xread_guard);
2705        drop(permit);
2706        result
2707    }
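
    // A minimal consumer-side sketch of the polling contract documented above.
    // `StreamFrames` is only assumed to expose `frames` and `closed_at` (as the
    // rustdoc implies); the per-frame `id` field and the `handle` callback are
    // illustrative, not real contract items.
    //
    // ```ignore
    // let mut last_id = "0-0".to_owned();
    // loop {
    //     let batch = server
    //         .tail_attempt_stream(&execution_id, attempt_index, &last_id, 5_000, 256)
    //         .await?;
    //     for frame in &batch.frames {
    //         last_id = frame.id.clone(); // hypothetical field
    //         handle(frame);
    //     }
    //     if batch.closed_at.is_some() {
    //         break; // producer finalized the stream; stop polling
    //     }
    //     // empty frames with closed_at == None means the block_ms timeout elapsed; poll again
    // }
    // ```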
2708
2709    /// Graceful shutdown — rejects new stream ops, drains background handler
2710    /// tasks (e.g. cancel_flow member dispatch) with a bounded timeout, then
2711    /// stops the engine's scanners.
2712    ///
2713    /// Shutdown order is chosen so in-flight stream ops (read/tail) drain
2714    /// cleanly without new arrivals piling up:
2715    ///
2716    /// 1. `stream_semaphore.close()` — new read/tail attempts fail fast
2717    ///    with `ServerError::OperationFailed("stream semaphore closed …")`
2718    ///    which the REST layer surfaces as a 500 with `retryable=false`
2719    ///    (ops tooling that treats shutdown-time errors as 503-class responses
2720    ///    may still wait + retry; the body clearly names the shutdown reason).
2721    /// 2. Drain handler-spawned background tasks with a 15s ceiling.
2722    /// 3. `engine.shutdown()` stops scanners.
2723    ///
2724    /// Existing in-flight tails finish on their natural `block_ms`
2725    /// boundary (up to ~30s); the `tail_client` is dropped when `Server`
2726    /// is dropped after this function returns. We do NOT wait for tails
2727    /// to drain explicitly — the semaphore-close + natural-timeout
2728    /// combination bounds shutdown to roughly `block_ms + 15s` in the
2729    /// worst case. Callers observing a dropped connection retry against
2730    /// whatever replacement is coming up.
2731    pub async fn shutdown(self) {
2732        tracing::info!("shutting down FlowFabric server");
2733
2734        // Step 1: Close the stream semaphore FIRST so any in-flight
2735        // read/tail calls that are between `try_acquire` and their
2736        // Valkey command still hold a valid permit, but no NEW stream
2737        // op can start. `Semaphore::close()` is idempotent.
2738        self.stream_semaphore.close();
2739        tracing::info!(
2740            "stream semaphore closed; no new read/tail attempts will be accepted"
2741        );
2742
2743        // Step 2: Drain handler-spawned background tasks with the same
2744        // ceiling as Engine::shutdown. If dispatch is still running at
2745        // the deadline, drop the JoinSet to abort remaining tasks.
2746        let drain_timeout = Duration::from_secs(15);
2747        let background = self.background_tasks.clone();
2748        let drain = async move {
2749            let mut guard = background.lock().await;
2750            while guard.join_next().await.is_some() {}
2751        };
2752        match tokio::time::timeout(drain_timeout, drain).await {
2753            Ok(()) => {}
2754            Err(_) => {
2755                tracing::warn!(
2756                    timeout_s = drain_timeout.as_secs(),
2757                    "shutdown: background tasks did not finish in time, aborting"
2758                );
2759                self.background_tasks.lock().await.abort_all();
2760            }
2761        }
2762
2763        self.engine.shutdown().await;
2764        tracing::info!("FlowFabric server shutdown complete");
2765    }
2766}
2767
2768// ── Valkey version check (RFC-011 §13) ──
2769
2770/// Minimum Valkey version the engine requires (see RFC-011 §13). 7.2 is the
2771/// release where Valkey Functions and RESP3 stabilized — the primitives the
2772/// co-location design and typed FCALL wrappers actually depend on.
2773const REQUIRED_VALKEY_MAJOR: u32 = 7;
2774const REQUIRED_VALKEY_MINOR: u32 = 2;
2775
2776/// Upper bound on the rolling-upgrade retry window (RFC-011 §9.17). A Valkey
2777/// node cycling through SIGTERM → restart typically completes in well under
2778/// 60s; this budget is generous without letting a truly-stuck cluster hang
2779/// boot indefinitely.
2780const VERSION_CHECK_RETRY_BUDGET: Duration = Duration::from_secs(60);
2781
2782/// Verify the connected Valkey reports a version ≥ 7.2.
2783///
2784/// Per RFC-011 §9.17, during a rolling upgrade the node we happen to connect
2785/// to may temporarily be pre-upgrade while others are post-upgrade. The check
2786/// tolerates this by retrying the whole verification (including low-version
2787/// responses) with exponential backoff, capped at a 60s budget.
2788///
2789/// **Retries on:**
2790/// - Low-version responses (below `(REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR)`)
2791///   — may resolve as the rolling upgrade progresses onto the connected node.
2792/// - Retryable ferriskey transport errors — connection refused,
2793///   `BusyLoadingError`, `ClusterDown`, etc., classified via
2794///   `ff_script::retry::is_retryable_kind`.
2795/// - Missing/unparsable version field — treated as transient (fresh-boot
2796///   server may not have the INFO fields populated yet). Reads
2797///   `valkey_version` when present (authoritative on Valkey 8.0+), falls
2798///   back to `redis_version` for Valkey 7.x.
2799///
2800/// **Does NOT retry on:**
2801/// - Non-retryable transport errors (auth failures, permission denied,
2802///   invalid client config) — these are operator misconfiguration, not
2803///   transient cluster state; fast-fail preserves a clear signal.
2804///
2805/// On budget exhaustion, returns the last observed error.
2806async fn verify_valkey_version(client: &Client) -> Result<(), ServerError> {
2807    let deadline = tokio::time::Instant::now() + VERSION_CHECK_RETRY_BUDGET;
2808    let mut backoff = Duration::from_millis(200);
2809    loop {
2810        let (should_retry, err_for_budget_exhaust, log_detail): (bool, ServerError, String) =
2811            match query_valkey_version(client).await {
2812                Ok((detected_major, detected_minor))
2813                    if (detected_major, detected_minor)
2814                        >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR) =>
2815                {
2816                    tracing::info!(
2817                        detected_major,
2818                        detected_minor,
2819                        required_major = REQUIRED_VALKEY_MAJOR,
2820                        required_minor = REQUIRED_VALKEY_MINOR,
2821                        "Valkey version accepted"
2822                    );
2823                    return Ok(());
2824                }
2825                Ok((detected_major, detected_minor)) => (
2826                    // Low version — may be a rolling-upgrade stale node.
2827                    // Retry within budget; after exhaustion, the cluster
2828                    // is misconfigured and fast-fail is the correct signal.
2829                    true,
2830                    ServerError::ValkeyVersionTooLow {
2831                        detected: format!("{detected_major}.{detected_minor}"),
2832                        required: format!("{REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"),
2833                    },
2834                    format!(
2835                        "detected={detected_major}.{detected_minor} < required={REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"
2836                    ),
2837                ),
2838                Err(e) => {
2839                    // Only retry if the underlying Valkey error is retryable
2840                    // by kind. Auth / permission / invalid-config should fast-
2841                    // fail so operators see the true root cause immediately,
2842                    // not a 60s hang followed by a generic "transient" error.
2843                    let retryable = e
2844                        .valkey_kind()
2845                        .map(ff_script::retry::is_retryable_kind)
2846                        // Non-Valkey errors (parse, missing field, operation
2847                        // failures) are treated as transient — a fresh-boot
2848                        // Valkey may not have redis_version populated yet.
2849                        .unwrap_or(true);
2850                    let detail = e.to_string();
2851                    (retryable, e, detail)
2852                }
2853            };
2854
2855        if !should_retry {
2856            return Err(err_for_budget_exhaust);
2857        }
2858        if tokio::time::Instant::now() >= deadline {
2859            return Err(err_for_budget_exhaust);
2860        }
2861        tracing::warn!(
2862            backoff_ms = backoff.as_millis() as u64,
2863            detail = %log_detail,
2864            "valkey version check transient failure; retrying"
2865        );
2866        tokio::time::sleep(backoff).await;
2867        backoff = (backoff * 2).min(Duration::from_secs(5));
2868    }
2869}
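
// Concretely, the sleep schedule above is 200ms, 400ms, 800ms, 1.6s, 3.2s and
// then 5s per retry once capped; roughly fifteen attempts fit inside the 60s
// budget, ignoring per-attempt INFO latency.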
2870
2871/// Run `INFO server` and extract the `(major, minor)` components of the
2872/// Valkey version.
2873///
2874/// Returns `Err` on transport errors, missing field, or unparsable version.
2875/// Handles three response shapes:
2876///
2877/// - **Standalone:** single string body; parse directly.
2878/// - **Cluster (RESP3 map):** `INFO` returns a map keyed by node address;
2879///   every node runs the same version in a healthy deployment, so we pick
2880///   one entry and parse it. Divergent versions during a rolling upgrade
2881///   are handled by the outer retry loop.
2882/// - **Empty / unexpected:** surfaces as `OperationFailed` with context.
2883async fn query_valkey_version(client: &Client) -> Result<(u32, u32), ServerError> {
2884    let raw: Value = client
2885        .cmd("INFO")
2886        .arg("server")
2887        .execute()
2888        .await
2889        .map_err(|e| ServerError::ValkeyContext {
2890            source: e,
2891            context: "INFO server".into(),
2892        })?;
2893    let bodies = extract_info_bodies(&raw)?;
2894    // Cluster: return the minimum (major, minor) across all nodes so a stale
2895    // pre-upgrade replica cannot hide behind an already-upgraded primary.
2896    // Standalone: exactly one body. The outer retry loop tolerates rolling
2897    // upgrades — a briefly-low minimum gets retried; a persistently-low one
2898    // exits with the structured floor error.
2899    let mut min_version: Option<(u32, u32)> = None;
2900    for body in &bodies {
2901        let version = parse_valkey_version(body)?;
2902        min_version = Some(match min_version {
2903            None => version,
2904            Some(existing) => existing.min(version),
2905        });
2906    }
2907    min_version.ok_or_else(|| {
2908        ServerError::OperationFailed(
2909            "valkey version check: cluster INFO returned no node bodies".into(),
2910        )
2911    })
2912}
2913
2914/// Normalize an `INFO server` response to one string body per node.
2915///
2916/// Standalone returns a single body. Cluster (RESP3 map keyed by node address)
2917/// returns every node's body — the caller must consider all of them to reject
2918/// a mixed-version cluster where one stale node is below the floor.
2919fn extract_info_bodies(raw: &Value) -> Result<Vec<String>, ServerError> {
2920    match raw {
2921        Value::BulkString(bytes) => Ok(vec![String::from_utf8_lossy(bytes).into_owned()]),
2922        Value::VerbatimString { text, .. } => Ok(vec![text.clone()]),
2923        Value::SimpleString(s) => Ok(vec![s.clone()]),
2924        Value::Map(entries) => {
2925            if entries.is_empty() {
2926                return Err(ServerError::OperationFailed(
2927                    "valkey version check: cluster INFO returned empty map".into(),
2928                ));
2929            }
2930            let mut out = Vec::with_capacity(entries.len());
2931            for (_, body) in entries {
2932                out.extend(extract_info_bodies(body)?);
2933            }
2934            Ok(out)
2935        }
2936        other => Err(ServerError::OperationFailed(format!(
2937            "valkey version check: unexpected INFO shape: {other:?}"
2938        ))),
2939    }
2940}
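
// Schematically, the two shapes handled above (addresses and versions are
// illustrative):
//
//   standalone:  BulkString("... valkey_version:8.0.1 ...")           -> 1 body
//   cluster:     Map{ "10.0.0.1:6379" -> BulkString("... 8.0.1 ..."),
//                     "10.0.0.2:6379" -> BulkString("... 7.2.5 ...") } -> 1 body per node
//
// Returning every node's body lets query_valkey_version take the minimum
// (major, minor), so a stale pre-upgrade node cannot hide behind an upgraded one.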
2941
2942/// Extract the `(major, minor)` components of the Valkey version from an
2943/// `INFO server` response body. Pure parser — pulled out of
2944/// [`query_valkey_version`] so it is unit-testable without a live Valkey.
2945///
2946/// **Prefers `valkey_version:`** (introduced in Valkey 8.0+; this is the real
2947/// server version on 8.x/9.x, which pin `redis_version:7.2.4` for
2948/// Redis-client compatibility).
2949///
2950/// **Falls back to `redis_version:` only when the body carries an affirmative
2951/// `server_name:valkey` field.** `server_name:` was introduced in Valkey 7.2,
2952/// which is our floor — so every floor-compliant Valkey deployment carries
2953/// the marker. Redis does not emit `server_name:valkey`, which is how we
2954/// reject a Redis backend that would otherwise look identical to Valkey 7.x
2955/// at the `redis_version:` level.
2956fn parse_valkey_version(info: &str) -> Result<(u32, u32), ServerError> {
2957    let extract_major_minor = |line: &str| -> Result<(u32, u32), ServerError> {
2958        let trimmed = line.trim();
2959        let mut parts = trimmed.split('.');
2960        let major_str = parts.next().unwrap_or("").trim();
2961        if major_str.is_empty() {
2962            return Err(ServerError::OperationFailed(format!(
2963                "valkey version check: empty version field in '{trimmed}'"
2964            )));
2965        }
2966        let major = major_str.parse::<u32>().map_err(|_| {
2967            ServerError::OperationFailed(format!(
2968                "valkey version check: non-numeric major in '{trimmed}'"
2969            ))
2970        })?;
2971        // Minor is required — a bare major ("7") cannot be compared against
2972        // the (major, minor) floor reliably. Valkey always reports
2973        // major.minor.patch for INFO, so missing minor is a real parse error.
2974        let minor_str = parts.next().unwrap_or("").trim();
2975        if minor_str.is_empty() {
2976            return Err(ServerError::OperationFailed(format!(
2977                "valkey version check: missing minor component in '{trimmed}'"
2978            )));
2979        }
2980        let minor = minor_str.parse::<u32>().map_err(|_| {
2981            ServerError::OperationFailed(format!(
2982                "valkey version check: non-numeric minor in '{trimmed}'"
2983            ))
2984        })?;
2985        Ok((major, minor))
2986    };
2987    // Prefer valkey_version (authoritative on Valkey 8.0+).
2988    if let Some(valkey_line) = info
2989        .lines()
2990        .find_map(|line| line.strip_prefix("valkey_version:"))
2991    {
2992        return extract_major_minor(valkey_line);
2993    }
2994    // No valkey_version — could be Valkey 7.2 (which doesn't emit the field)
2995    // or could be Redis. Require an affirmative server_name:valkey marker
2996    // before falling back to redis_version. This rejects Redis backends,
2997    // which don't emit server_name:valkey.
2998    let server_is_valkey = info
2999        .lines()
3000        .map(str::trim)
3001        .any(|line| line.eq_ignore_ascii_case("server_name:valkey"));
3002    if !server_is_valkey {
3003        return Err(ServerError::OperationFailed(
3004            "valkey version check: INFO missing valkey_version and server_name:valkey marker \
3005             (unsupported backend — FlowFabric requires Valkey >= 7.2; Redis is not supported)"
3006                .into(),
3007        ));
3008    }
3009    // Valkey 7.x fallback. 8.x+ pins redis_version:7.2.4 for Redis-client
3010    // compat, so reading it there would under-report — but 8.x+ is handled
3011    // by the valkey_version branch above, so we only reach here on 7.x.
3012    if let Some(redis_line) = info
3013        .lines()
3014        .find_map(|line| line.strip_prefix("redis_version:"))
3015    {
3016        return extract_major_minor(redis_line);
3017    }
3018    Err(ServerError::OperationFailed(
3019        "valkey version check: INFO has server_name:valkey but no redis_version or valkey_version field"
3020            .into(),
3021    ))
3022}
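
// Illustrative inputs for `parse_valkey_version` (a sketch, not a shipped
// doctest; version strings are made up):
//
// ```ignore
// // Valkey 8.x+ reports its real version under valkey_version.
// assert_eq!(parse_valkey_version("valkey_version:8.0.1").unwrap(), (8, 0));
// // Valkey 7.x lacks valkey_version but carries the server_name marker.
// assert_eq!(
//     parse_valkey_version("server_name:valkey\r\nredis_version:7.2.5").unwrap(),
//     (7, 2),
// );
// // A bare redis_version with no Valkey marker is rejected as a Redis backend.
// assert!(parse_valkey_version("redis_version:7.2.4").is_err());
// ```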
3023
3024// ── Partition config validation ──
3025
3026/// Validate or create the `ff:config:partitions` key on first boot.
3027///
3028/// If the key exists, its values must match the server's config.
3029/// If it doesn't exist, create it (first boot).
3030async fn validate_or_create_partition_config(
3031    client: &Client,
3032    config: &PartitionConfig,
3033) -> Result<(), ServerError> {
3034    let key = keys::global_config_partitions();
3035
3036    let existing: HashMap<String, String> = client
3037        .hgetall(&key)
3038        .await
3039        .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;
3040
3041    if existing.is_empty() {
3042        // First boot — create the config
3043        tracing::info!("first boot: creating {key}");
3044        client
3045            .hset(&key, "num_flow_partitions", &config.num_flow_partitions.to_string())
3046            .await
3047            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_flow_partitions".into() })?;
3048        client
3049            .hset(&key, "num_budget_partitions", &config.num_budget_partitions.to_string())
3050            .await
3051            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_budget_partitions".into() })?;
3052        client
3053            .hset(&key, "num_quota_partitions", &config.num_quota_partitions.to_string())
3054            .await
3055            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_quota_partitions".into() })?;
3056        return Ok(());
3057    }
3058
3059    // Validate existing config matches
3060    let check = |field: &str, expected: u16| -> Result<(), ServerError> {
3061        let stored: u16 = existing
3062            .get(field)
3063            .and_then(|v| v.parse().ok())
3064            .unwrap_or(0);
3065        if stored != expected {
3066            return Err(ServerError::PartitionMismatch(format!(
3067                "{field}: stored={stored}, config={expected}. \
3068                 Partition counts are fixed at deployment time. \
3069                 Either fix your config or migrate the data."
3070            )));
3071        }
3072        Ok(())
3073    };
3074
3075    check("num_flow_partitions", config.num_flow_partitions)?;
3076    check("num_budget_partitions", config.num_budget_partitions)?;
3077    check("num_quota_partitions", config.num_quota_partitions)?;
3078
3079    tracing::info!("partition config validated against stored {key}");
3080    Ok(())
3081}
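
// After a successful first boot the stored hash holds exactly the three counts
// checked above (values come from the server's PartitionConfig):
//
//   ff:config:partitions
//     num_flow_partitions   = "<config.num_flow_partitions>"
//     num_budget_partitions = "<config.num_budget_partitions>"
//     num_quota_partitions  = "<config.num_quota_partitions>"
//
// Any later boot whose config disagrees fails with `PartitionMismatch`.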
3082
3083// ── Waitpoint HMAC secret bootstrap (RFC-004 §Waitpoint Security) ──
3084
3085/// Stable initial kid written on first boot. Rotation promotes to k2, k3, ...
3086/// The kid is stored alongside the secret in every partition's hash so each
3087/// FCALL can self-identify which secret produced a given token.
3088const WAITPOINT_HMAC_INITIAL_KID: &str = "k1";
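
// Sketch of the per-partition secrets hash this module writes and probes. Field
// names come from the init/rotate paths below; the exact post-rotation contents
// are owned by the `ff_rotate_waitpoint_hmac_secret` FCALL, so treat the k2 row
// as illustrative:
//
//   <IndexKeys::waitpoint_hmac_secrets() key>
//     current_kid = "k1"            // first boot; rotation promotes to "k2", ...
//     secret:k1   = "<hex secret>"
//     secret:k2   = "<hex secret>"  // appears once a rotation installs kid "k2"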
3089
3090/// Per-partition outcome of the HMAC bootstrap step. Collected across the
3091/// parallel scan so we can emit aggregated logs once at the end.
3092enum PartitionBootOutcome {
3093    /// Partition already had a matching (kid, secret) pair.
3094    Match,
3095    /// Stored secret diverges from env — likely operator rotation; kept.
3096    Mismatch,
3097    /// Torn write (current_kid present, secret:<kid> missing); repaired.
3098    Repaired,
3099    /// Fresh partition; atomically installed env secret under kid=k1.
3100    Installed,
3101}
3102
3103/// Bounded in-flight concurrency for the startup fan-out. Large enough to
3104/// turn a 256-partition install from ~15s sequential into ~1s on cross-AZ
3105/// Valkey, small enough to leave a cold cluster breathing room for other
3106/// Server::start work (library load, engine scanner spawn).
3107const BOOT_INIT_CONCURRENCY: usize = 16;
3108
3109async fn init_one_partition(
3110    client: &Client,
3111    partition: Partition,
3112    secret_hex: &str,
3113) -> Result<PartitionBootOutcome, ServerError> {
3114    let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();
3115
3116    // Probe for an existing install. Fast path (fresh partition): HGET
3117    // returns nil and we fall through to the atomic install below. Slow
3118    // path: secret:<stored_kid> is then HGET'd once we know the kid name.
3119    // (A previous version used a 2-field HMGET that included a fake
3120    // `secret:probe` placeholder — vestigial from an abandoned
3121    // optimization attempt, and confusing to read. Collapsed back to a
3122    // single-field HGET.)
3123    let stored_kid: Option<String> = client
3124        .cmd("HGET")
3125        .arg(&key)
3126        .arg("current_kid")
3127        .execute()
3128        .await
3129        .map_err(|e| ServerError::ValkeyContext {
3130            source: e,
3131            context: format!("HGET {key} current_kid (init probe)"),
3132        })?;
3133
3134    if let Some(stored_kid) = stored_kid {
3135        // We didn't know the stored kid up front, so now HGET the real
3136        // secret:<stored_kid> field. Two round-trips in the slow path; the
3137        // fast path (fresh partition) stays at one.
3138        let field = format!("secret:{stored_kid}");
3139        let stored_secret: Option<String> = client
3140            .hget(&key, &field)
3141            .await
3142            .map_err(|e| ServerError::ValkeyContext {
3143                source: e,
3144                context: format!("HGET {key} secret:<kid> (init check)"),
3145            })?;
3146        if stored_secret.is_none() {
3147            // Torn write from a previous boot: current_kid present but
3148            // secret:<kid> missing. Without repair, mint returns
3149            // "hmac_secret_not_initialized" on that partition forever.
3150            // Repair in place with env secret. Not rotation — rotation
3151            // always writes the secret first.
3152            client
3153                .hset(&key, &field, secret_hex)
3154                .await
3155                .map_err(|e| ServerError::ValkeyContext {
3156                    source: e,
3157                    context: format!("HSET {key} secret:<kid> (repair torn write)"),
3158                })?;
3159            return Ok(PartitionBootOutcome::Repaired);
3160        }
3161        if stored_secret.as_deref() != Some(secret_hex) {
3162            return Ok(PartitionBootOutcome::Mismatch);
3163        }
3164        return Ok(PartitionBootOutcome::Match);
3165    }
3166
3167    // Fresh partition — install current_kid + secret:<kid> atomically in
3168    // one HSET. Multi-field HSET is single-command atomic, so a crash
3169    // can't leave current_kid without its secret.
3170    let secret_field = format!("secret:{WAITPOINT_HMAC_INITIAL_KID}");
3171    let _: i64 = client
3172        .cmd("HSET")
3173        .arg(&key)
3174        .arg("current_kid")
3175        .arg(WAITPOINT_HMAC_INITIAL_KID)
3176        .arg(&secret_field)
3177        .arg(secret_hex)
3178        .execute()
3179        .await
3180        .map_err(|e| ServerError::ValkeyContext {
3181            source: e,
3182            context: format!("HSET {key} (init waitpoint HMAC atomic)"),
3183        })?;
3184    Ok(PartitionBootOutcome::Installed)
3185}
3186
3187/// Install the waitpoint HMAC secret on every execution partition.
3188///
3189/// Parallelized fan-out with bounded in-flight concurrency
3190/// (`BOOT_INIT_CONCURRENCY`) so 256-partition boots finish in ~1s instead
3191/// of ~15s sequential — the prior sequential loop was tight on K8s
3192/// `initialDelaySeconds=30` defaults, especially cross-AZ. Fail-fast:
3193/// the first per-partition error aborts boot.
3194///
3195/// Outcomes aggregate into mismatch/repaired counts (logged once at end)
3196/// so operators see a single loud warning per fault class instead of 256
3197/// per-partition lines.
3198async fn initialize_waitpoint_hmac_secret(
3199    client: &Client,
3200    partition_config: &PartitionConfig,
3201    secret_hex: &str,
3202) -> Result<(), ServerError> {
3203    use futures::stream::{FuturesUnordered, StreamExt};
3204
3205    let n = partition_config.num_flow_partitions;
3206    tracing::info!(
3207        partitions = n,
3208        concurrency = BOOT_INIT_CONCURRENCY,
3209        "installing waitpoint HMAC secret across {n} execution partitions"
3210    );
3211
3212    let mut mismatch_count: u16 = 0;
3213    let mut repaired_count: u16 = 0;
3214    let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
3215    let mut next_index: u16 = 0;
3216
3217    loop {
3218        while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
3219            let partition = Partition {
3220                family: PartitionFamily::Execution,
3221                index: next_index,
3222            };
3223            let client = client.clone();
3224            let secret_hex = secret_hex.to_owned();
3225            pending.push(async move {
3226                init_one_partition(&client, partition, &secret_hex).await
3227            });
3228            next_index += 1;
3229        }
3230        match pending.next().await {
3231            Some(res) => match res? {
3232                PartitionBootOutcome::Match | PartitionBootOutcome::Installed => {}
3233                PartitionBootOutcome::Mismatch => mismatch_count += 1,
3234                PartitionBootOutcome::Repaired => repaired_count += 1,
3235            },
3236            None => break,
3237        }
3238    }
3239
3240    if repaired_count > 0 {
3241        tracing::warn!(
3242            repaired_partitions = repaired_count,
3243            total_partitions = n,
3244            "repaired {repaired_count} partitions with torn waitpoint HMAC writes \
3245             (current_kid present but secret:<kid> missing, likely crash during prior boot)"
3246        );
3247    }
3248
3249    if mismatch_count > 0 {
3250        tracing::warn!(
3251            mismatched_partitions = mismatch_count,
3252            total_partitions = n,
3253            "stored/env secret mismatch on {mismatch_count} partitions — \
3254             env FF_WAITPOINT_HMAC_SECRET ignored in favor of stored values; \
3255             run POST /v1/admin/rotate-waitpoint-secret to sync"
3256        );
3257    }
3258
3259    tracing::info!(partitions = n, "waitpoint HMAC secret install complete");
3260    Ok(())
3261}
3262
3263/// Result of a waitpoint HMAC secret rotation across all execution partitions.
3264#[derive(Debug, Clone, serde::Serialize)]
3265pub struct RotateWaitpointSecretResult {
3266    /// Count of partitions that accepted the rotation.
3267    pub rotated: u16,
3268    /// Partition indices that failed — operator should investigate (Valkey
3269    /// outage, auth failure, cluster split). Rotation is idempotent, so a
3270    /// re-run after the underlying fault clears converges to the correct
3271    /// state.
3272    pub failed: Vec<u16>,
3273    /// New kid installed as current.
3274    pub new_kid: String,
3275}
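
// With the serde derive above, this struct serializes as, e.g. (numbers
// illustrative): {"rotated": 254, "failed": [3, 17], "new_kid": "k2"}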
3276
3277impl Server {
3278    /// Rotate the waitpoint HMAC secret. Promotes the current kid to previous
3279    /// (accepted within `FF_WAITPOINT_HMAC_GRACE_MS`), installs `new_secret_hex`
3280    /// as the new current kid. Idempotent: re-running with the same `new_kid`
3281    /// and `new_secret_hex` converges partitions to the same state.
3282    ///
3283    /// Returns a structured result so operators can see which partitions failed.
3284    /// HTTP layer returns 200 if any partition succeeded, 500 only if all fail.
3285    pub async fn rotate_waitpoint_secret(
3286        &self,
3287        new_kid: &str,
3288        new_secret_hex: &str,
3289    ) -> Result<RotateWaitpointSecretResult, ServerError> {
3290        if new_kid.is_empty() || new_kid.contains(':') {
3291            return Err(ServerError::OperationFailed(
3292                "new_kid must be non-empty and must not contain ':'".into(),
3293            ));
3294        }
3295        if new_secret_hex.is_empty()
3296            || !new_secret_hex.len().is_multiple_of(2)
3297            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
3298        {
3299            return Err(ServerError::OperationFailed(
3300                "new_secret_hex must be a non-empty even-length hex string".into(),
3301            ));
3302        }
3303
3304        // Single-writer gate. Concurrent rotates against the SAME operator
3305        // token are an attack pattern (or a retry-loop bug); legitimate
3306        // operators rotate monthly and can afford to serialize. Contention
3307        // returns ConcurrencyLimitExceeded("admin_rotate", 1) (→ HTTP 429
3308        // with a labelled error body) rather than queueing the HTTP
3309        // handler past the 120s endpoint timeout. Permit is held for
3310        // the full partition fan-out.
3311        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
3312            Ok(p) => p,
3313            Err(tokio::sync::TryAcquireError::NoPermits) => {
3314                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
3315            }
3316            Err(tokio::sync::TryAcquireError::Closed) => {
3317                return Err(ServerError::OperationFailed(
3318                    "admin rotate semaphore closed (server shutting down)".into(),
3319                ));
3320            }
3321        };
3322
3323        let n = self.config.partition_config.num_flow_partitions;
3324        // "now" is derived inside the FCALL from `redis.call("TIME")`
3325        // (consistency with validate_waitpoint_token and flow scanners);
3326        // grace_ms is a duration — safe to carry from config.
3327        let grace_ms = self.config.waitpoint_hmac_grace_ms;
3328
3329        // Parallelize the rotation fan-out with the same bounded
3330        // concurrency as boot init (BOOT_INIT_CONCURRENCY = 16). A 256-
3331        // partition sequential rotation takes ~7.7s at 30ms cross-AZ RTT,
3332        // uncomfortably close to the 120s HTTP endpoint timeout under
3333        // contention. Atomicity per partition now lives inside the
3334        // `ff_rotate_waitpoint_hmac_secret` FCALL (FCALL is atomic per
3335        // shard); parallelism across DIFFERENT partitions is safe. The
3336        // outer `admin_rotate_semaphore(1)` bounds server-wide concurrent
3337        // rotations, so this fan-out only affects a single in-flight
3338        // rotate call at a time.
3339        use futures::stream::{FuturesUnordered, StreamExt};
3340
3341        let mut rotated = 0u16;
3342        let mut failed = Vec::new();
3343        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
3344        let mut next_index: u16 = 0;
3345
3346        loop {
3347            while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
3348                let partition = Partition {
3349                    family: PartitionFamily::Execution,
3350                    index: next_index,
3351                };
3352                let idx = next_index;
3353                // Clone only what the per-partition future needs. The
3354                // new_kid / new_secret_hex args outlive the loop, so borrowing
3355                // them would work, but owning the strings keeps each `async
3356                // move` future self-contained rather than tied to those borrows.
3357                let new_kid_owned = new_kid.to_owned();
3358                let new_secret_owned = new_secret_hex.to_owned();
3359                let partition_owned = partition;
3360                let fut = async move {
3361                    let outcome = self
3362                        .rotate_single_partition(
3363                            &partition_owned,
3364                            &new_kid_owned,
3365                            &new_secret_owned,
3366                            grace_ms,
3367                        )
3368                        .await;
3369                    (idx, partition_owned, outcome)
3370                };
3371                pending.push(fut);
3372                next_index += 1;
3373            }
3374            match pending.next().await {
3375                Some((idx, partition, outcome)) => match outcome {
3376                    Ok(()) => {
3377                        rotated += 1;
3378                        // Per-partition event → DEBUG (not INFO). Rationale:
3379                        // one rotate endpoint call produces 256 partition-level
3380                        // events, which would blow up paid aggregator budgets
3381                        // (Datadog/Splunk) at no operational value. The single
3382                        // aggregated audit event below is the compliance
3383                        // artifact. Failures stay at ERROR with per-partition
3384                        // detail — that's where operators need it.
3385                        tracing::debug!(
3386                            partition = %partition,
3387                            new_kid = %new_kid,
3388                            "waitpoint_hmac_rotated"
3389                        );
3390                    }
3391                    Err(e) => {
3392                        // Failures stay at ERROR (target=audit) per-partition —
3393                        // operators need the partition index + error to debug
3394                        // Valkey/config faults. Low cardinality in practice.
3395                        tracing::error!(
3396                            target: "audit",
3397                            partition = %partition,
3398                            err = %e,
3399                            "waitpoint_hmac_rotation_failed"
3400                        );
3401                        failed.push(idx);
3402                    }
3403                },
3404                None => break,
3405            }
3406        }
3407
3408        // Single aggregated audit event for the whole rotation. This is
3409        // the load-bearing compliance artifact — operators alert on
3410        // target="audit" at INFO level and this is the stable schema.
3411        tracing::info!(
3412            target: "audit",
3413            new_kid = %new_kid,
3414            total_partitions = n,
3415            rotated,
3416            failed_count = failed.len(),
3417            "waitpoint_hmac_rotation_complete"
3418        );
3419
3420        Ok(RotateWaitpointSecretResult {
3421            rotated,
3422            failed,
3423            new_kid: new_kid.to_owned(),
3424        })
3425    }
3426
3427    /// Rotate on a single partition by dispatching the
3428    /// `ff_rotate_waitpoint_hmac_secret` FCALL. FCALL is atomic per shard,
3429    /// so no external SETNX lock is needed — the script itself IS the
3430    /// atomicity boundary. Single source of truth (Lua); the Rust
3431    /// implementation that previously lived here was an exact duplicate
3432    /// and has been removed.
3433    async fn rotate_single_partition(
3434        &self,
3435        partition: &Partition,
3436        new_kid: &str,
3437        new_secret_hex: &str,
3438        grace_ms: u64,
3439    ) -> Result<(), ServerError> {
3440        let idx = IndexKeys::new(partition);
3441        let args = RotateWaitpointHmacSecretArgs {
3442            new_kid: new_kid.to_owned(),
3443            new_secret_hex: new_secret_hex.to_owned(),
3444            grace_ms,
3445        };
3446        let outcome = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
3447            &self.client,
3448            &idx,
3449            &args,
3450        )
3451        .await
3452        .map_err(|e| match e {
3453            // Same kid + different secret. Map to the same 409-style
3454            // error the old Rust path returned so HTTP callers keep the
3455            // current surface.
3456            ff_script::ScriptError::RotationConflict(kid) => {
3457                ServerError::OperationFailed(format!(
3458                    "rotation conflict: kid {kid} already installed with a \
3459                     different secret. Either use a fresh kid or restore the \
3460                     original secret for this kid before retrying."
3461                ))
3462            }
3463            ff_script::ScriptError::Valkey(v) => ServerError::ValkeyContext {
3464                source: v,
3465                context: format!("FCALL ff_rotate_waitpoint_hmac_secret partition={partition}"),
3466            },
3467            other => ServerError::OperationFailed(format!(
3468                "rotation failed on partition {partition}: {other}"
3469            )),
3470        })?;
3471        // Either outcome is a successful write from the operator's POV.
3472        // Rotated → new install; Noop → idempotent replay.
3473        let _ = outcome;
3474        Ok(())
3475    }
3476}
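
// A hedged sketch of how an admin handler is expected to consume the rotation
// result (handler shape and variable names are hypothetical; the
// 200-if-any-succeeded / 500-if-all-fail rule comes from the rustdoc on
// `rotate_waitpoint_secret`):
//
// ```ignore
// let res = server.rotate_waitpoint_secret("k2", &new_secret_hex).await?;
// if res.rotated == 0 {
//     // every partition failed: surface a 500 so the operator re-runs
// } else if !res.failed.is_empty() {
//     // partial success: 200, but the listed partition indices need a re-run
// }
// ```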
3477
3478// ── FCALL result parsing ──
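
// All parsers below consume the same Lua reply convention:
//
//   {1, "OK", ...extras}                          success
//   {1, "DUPLICATE" | "ALREADY_SATISFIED", ...}   idempotent replay
//   {0, "<error_code>"}                           script-level failure
//
// The first element is the integer status; the second is a sub-status or error
// code carried as a bulk/simple string.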
3479
3480fn parse_create_result(
3481    raw: &Value,
3482    execution_id: &ExecutionId,
3483) -> Result<CreateExecutionResult, ServerError> {
3484    let arr = match raw {
3485        Value::Array(arr) => arr,
3486        _ => return Err(ServerError::Script("ff_create_execution: expected Array".into())),
3487    };
3488
3489    let status = match arr.first() {
3490        Some(Ok(Value::Int(n))) => *n,
3491        _ => return Err(ServerError::Script("ff_create_execution: bad status code".into())),
3492    };
3493
3494    if status == 1 {
3495        // Check sub-status: OK or DUPLICATE
3496        let sub = arr
3497            .get(1)
3498            .and_then(|v| match v {
3499                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3500                Ok(Value::SimpleString(s)) => Some(s.clone()),
3501                _ => None,
3502            })
3503            .unwrap_or_default();
3504
3505        if sub == "DUPLICATE" {
3506            Ok(CreateExecutionResult::Duplicate {
3507                execution_id: execution_id.clone(),
3508            })
3509        } else {
3510            Ok(CreateExecutionResult::Created {
3511                execution_id: execution_id.clone(),
3512                public_state: PublicState::Waiting,
3513            })
3514        }
3515    } else {
3516        let error_code = arr
3517            .get(1)
3518            .and_then(|v| match v {
3519                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3520                Ok(Value::SimpleString(s)) => Some(s.clone()),
3521                _ => None,
3522            })
3523            .unwrap_or_else(|| "unknown".to_owned());
3524        Err(ServerError::OperationFailed(format!(
3525            "ff_create_execution failed: {error_code}"
3526        )))
3527    }
3528}
3529
3530fn parse_cancel_result(
3531    raw: &Value,
3532    execution_id: &ExecutionId,
3533) -> Result<CancelExecutionResult, ServerError> {
3534    let arr = match raw {
3535        Value::Array(arr) => arr,
3536        _ => return Err(ServerError::Script("ff_cancel_execution: expected Array".into())),
3537    };
3538
3539    let status = match arr.first() {
3540        Some(Ok(Value::Int(n))) => *n,
3541        _ => return Err(ServerError::Script("ff_cancel_execution: bad status code".into())),
3542    };
3543
3544    if status == 1 {
3545        Ok(CancelExecutionResult::Cancelled {
3546            execution_id: execution_id.clone(),
3547            public_state: PublicState::Cancelled,
3548        })
3549    } else {
3550        let error_code = arr
3551            .get(1)
3552            .and_then(|v| match v {
3553                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3554                Ok(Value::SimpleString(s)) => Some(s.clone()),
3555                _ => None,
3556            })
3557            .unwrap_or_else(|| "unknown".to_owned());
3558        Err(ServerError::OperationFailed(format!(
3559            "ff_cancel_execution failed: {error_code}"
3560        )))
3561    }
3562}
3563
3564fn parse_budget_create_result(
3565    raw: &Value,
3566    budget_id: &BudgetId,
3567) -> Result<CreateBudgetResult, ServerError> {
3568    let arr = match raw {
3569        Value::Array(arr) => arr,
3570        _ => return Err(ServerError::Script("ff_create_budget: expected Array".into())),
3571    };
3572
3573    let status = match arr.first() {
3574        Some(Ok(Value::Int(n))) => *n,
3575        _ => return Err(ServerError::Script("ff_create_budget: bad status code".into())),
3576    };
3577
3578    if status == 1 {
3579        let sub = arr
3580            .get(1)
3581            .and_then(|v| match v {
3582                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3583                Ok(Value::SimpleString(s)) => Some(s.clone()),
3584                _ => None,
3585            })
3586            .unwrap_or_default();
3587
3588        if sub == "ALREADY_SATISFIED" {
3589            Ok(CreateBudgetResult::AlreadySatisfied {
3590                budget_id: budget_id.clone(),
3591            })
3592        } else {
3593            Ok(CreateBudgetResult::Created {
3594                budget_id: budget_id.clone(),
3595            })
3596        }
3597    } else {
3598        let error_code = arr
3599            .get(1)
3600            .and_then(|v| match v {
3601                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3602                Ok(Value::SimpleString(s)) => Some(s.clone()),
3603                _ => None,
3604            })
3605            .unwrap_or_else(|| "unknown".to_owned());
3606        Err(ServerError::OperationFailed(format!(
3607            "ff_create_budget failed: {error_code}"
3608        )))
3609    }
3610}
3611
3612fn parse_quota_create_result(
3613    raw: &Value,
3614    quota_policy_id: &QuotaPolicyId,
3615) -> Result<CreateQuotaPolicyResult, ServerError> {
3616    let arr = match raw {
3617        Value::Array(arr) => arr,
3618        _ => return Err(ServerError::Script("ff_create_quota_policy: expected Array".into())),
3619    };
3620
3621    let status = match arr.first() {
3622        Some(Ok(Value::Int(n))) => *n,
3623        _ => return Err(ServerError::Script("ff_create_quota_policy: bad status code".into())),
3624    };
3625
3626    if status == 1 {
3627        let sub = arr
3628            .get(1)
3629            .and_then(|v| match v {
3630                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3631                Ok(Value::SimpleString(s)) => Some(s.clone()),
3632                _ => None,
3633            })
3634            .unwrap_or_default();
3635
3636        if sub == "ALREADY_SATISFIED" {
3637            Ok(CreateQuotaPolicyResult::AlreadySatisfied {
3638                quota_policy_id: quota_policy_id.clone(),
3639            })
3640        } else {
3641            Ok(CreateQuotaPolicyResult::Created {
3642                quota_policy_id: quota_policy_id.clone(),
3643            })
3644        }
3645    } else {
3646        let error_code = arr
3647            .get(1)
3648            .and_then(|v| match v {
3649                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3650                Ok(Value::SimpleString(s)) => Some(s.clone()),
3651                _ => None,
3652            })
3653            .unwrap_or_else(|| "unknown".to_owned());
3654        Err(ServerError::OperationFailed(format!(
3655            "ff_create_quota_policy failed: {error_code}"
3656        )))
3657    }
3658}
3659
3660// ── Flow FCALL result parsing ──
3661
3662fn parse_create_flow_result(
3663    raw: &Value,
3664    flow_id: &FlowId,
3665) -> Result<CreateFlowResult, ServerError> {
3666    let arr = match raw {
3667        Value::Array(arr) => arr,
3668        _ => return Err(ServerError::Script("ff_create_flow: expected Array".into())),
3669    };
3670    let status = match arr.first() {
3671        Some(Ok(Value::Int(n))) => *n,
3672        _ => return Err(ServerError::Script("ff_create_flow: bad status code".into())),
3673    };
3674    if status == 1 {
3675        let sub = fcall_field_str(arr, 1);
3676        if sub == "ALREADY_SATISFIED" {
3677            Ok(CreateFlowResult::AlreadySatisfied {
3678                flow_id: flow_id.clone(),
3679            })
3680        } else {
3681            Ok(CreateFlowResult::Created {
3682                flow_id: flow_id.clone(),
3683            })
3684        }
3685    } else {
3686        let error_code = fcall_field_str(arr, 1);
3687        Err(ServerError::OperationFailed(format!(
3688            "ff_create_flow failed: {error_code}"
3689        )))
3690    }
3691}
3692
3693fn parse_add_execution_to_flow_result(
3694    raw: &Value,
3695) -> Result<AddExecutionToFlowResult, ServerError> {
3696    let arr = match raw {
3697        Value::Array(arr) => arr,
3698        _ => {
3699            return Err(ServerError::Script(
3700                "ff_add_execution_to_flow: expected Array".into(),
3701            ))
3702        }
3703    };
3704    let status = match arr.first() {
3705        Some(Ok(Value::Int(n))) => *n,
3706        _ => {
3707            return Err(ServerError::Script(
3708                "ff_add_execution_to_flow: bad status code".into(),
3709            ))
3710        }
3711    };
3712    if status == 1 {
3713        let sub = fcall_field_str(arr, 1);
3714        let eid_str = fcall_field_str(arr, 2);
3715        let nc_str = fcall_field_str(arr, 3);
3716        let eid = ExecutionId::parse(&eid_str)
3717            .map_err(|e| ServerError::Script(format!("bad execution_id: {e}")))?;
3718        let nc: u32 = nc_str.parse().unwrap_or(0);
3719        if sub == "ALREADY_SATISFIED" {
3720            Ok(AddExecutionToFlowResult::AlreadyMember {
3721                execution_id: eid,
3722                node_count: nc,
3723            })
3724        } else {
3725            Ok(AddExecutionToFlowResult::Added {
3726                execution_id: eid,
3727                new_node_count: nc,
3728            })
3729        }
3730    } else {
3731        let error_code = fcall_field_str(arr, 1);
3732        Err(ServerError::OperationFailed(format!(
3733            "ff_add_execution_to_flow failed: {error_code}"
3734        )))
3735    }
3736}
3737
3738/// Outcome of parsing a raw `ff_cancel_flow` FCALL response.
3739///
3740/// Keeps `AlreadyTerminal` distinct from other script errors so the caller
3741/// can treat cancel on an already-cancelled/completed/failed flow as
3742/// idempotent success instead of surfacing a 400 to the client.
3743enum ParsedCancelFlow {
3744    Cancelled {
3745        policy: String,
3746        member_execution_ids: Vec<String>,
3747    },
3748    AlreadyTerminal,
3749}
3750
3751/// Parse the raw `ff_cancel_flow` FCALL response.
3752///
3753/// Returns [`ParsedCancelFlow::Cancelled`] on success, [`ParsedCancelFlow::AlreadyTerminal`]
3754/// when the flow was already in a terminal state (idempotent retry), or a
3755/// [`ServerError`] for any other failure.
3756fn parse_cancel_flow_raw(raw: &Value) -> Result<ParsedCancelFlow, ServerError> {
3757    let arr = match raw {
3758        Value::Array(arr) => arr,
3759        _ => return Err(ServerError::Script("ff_cancel_flow: expected Array".into())),
3760    };
3761    let status = match arr.first() {
3762        Some(Ok(Value::Int(n))) => *n,
3763        _ => return Err(ServerError::Script("ff_cancel_flow: bad status code".into())),
3764    };
3765    if status != 1 {
3766        let error_code = fcall_field_str(arr, 1);
3767        if error_code == "flow_already_terminal" {
3768            return Ok(ParsedCancelFlow::AlreadyTerminal);
3769        }
3770        return Err(ServerError::OperationFailed(format!(
3771            "ff_cancel_flow failed: {error_code}"
3772        )));
3773    }
3774    // {1, "OK", cancellation_policy, member1, member2, ...}
3775    let policy = fcall_field_str(arr, 2);
3776    // Iterate to arr.len() rather than breaking on the first empty string —
3777    // safer against malformed Lua responses and clearer than a sentinel loop.
3778    let mut members = Vec::with_capacity(arr.len().saturating_sub(3));
3779    for i in 3..arr.len() {
3780        members.push(fcall_field_str(arr, i));
3781    }
3782    Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids: members })
3783}
3784
3785fn parse_stage_dependency_edge_result(
3786    raw: &Value,
3787) -> Result<StageDependencyEdgeResult, ServerError> {
3788    let arr = match raw {
3789        Value::Array(arr) => arr,
3790        _ => return Err(ServerError::Script("ff_stage_dependency_edge: expected Array".into())),
3791    };
3792    let status = match arr.first() {
3793        Some(Ok(Value::Int(n))) => *n,
3794        _ => return Err(ServerError::Script("ff_stage_dependency_edge: bad status code".into())),
3795    };
3796    if status == 1 {
3797        let edge_id_str = fcall_field_str(arr, 2);
3798        let rev_str = fcall_field_str(arr, 3);
3799        let edge_id = EdgeId::parse(&edge_id_str)
3800            .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
3801        let rev: u64 = rev_str.parse().unwrap_or(0);
3802        Ok(StageDependencyEdgeResult::Staged {
3803            edge_id,
3804            new_graph_revision: rev,
3805        })
3806    } else {
3807        let error_code = fcall_field_str(arr, 1);
3808        Err(ServerError::OperationFailed(format!(
3809            "ff_stage_dependency_edge failed: {error_code}"
3810        )))
3811    }
3812}
3813
3814fn parse_apply_dependency_result(
3815    raw: &Value,
3816) -> Result<ApplyDependencyToChildResult, ServerError> {
3817    let arr = match raw {
3818        Value::Array(arr) => arr,
3819        _ => return Err(ServerError::Script("ff_apply_dependency_to_child: expected Array".into())),
3820    };
3821    let status = match arr.first() {
3822        Some(Ok(Value::Int(n))) => *n,
3823        _ => return Err(ServerError::Script("ff_apply_dependency_to_child: bad status code".into())),
3824    };
3825    if status == 1 {
3826        let sub = fcall_field_str(arr, 1);
3827        if sub == "ALREADY_APPLIED" || sub == "already_applied" {
3828            Ok(ApplyDependencyToChildResult::AlreadyApplied)
3829        } else {
3830            // OK status — field at index 2 is unsatisfied count
3831            let count_str = fcall_field_str(arr, 2);
3832            let count: u32 = count_str.parse().unwrap_or(0);
3833            Ok(ApplyDependencyToChildResult::Applied {
3834                unsatisfied_count: count,
3835            })
3836        }
3837    } else {
3838        let error_code = fcall_field_str(arr, 1);
3839        Err(ServerError::OperationFailed(format!(
3840            "ff_apply_dependency_to_child failed: {error_code}"
3841        )))
3842    }
3843}
3844
3845fn parse_deliver_signal_result(
3846    raw: &Value,
3847    signal_id: &SignalId,
3848) -> Result<DeliverSignalResult, ServerError> {
3849    let arr = match raw {
3850        Value::Array(arr) => arr,
3851        _ => return Err(ServerError::Script("ff_deliver_signal: expected Array".into())),
3852    };
3853    let status = match arr.first() {
3854        Some(Ok(Value::Int(n))) => *n,
3855        _ => return Err(ServerError::Script("ff_deliver_signal: bad status code".into())),
3856    };
3857    if status == 1 {
3858        let sub = fcall_field_str(arr, 1);
3859        if sub == "DUPLICATE" {
3860            // ok_duplicate(existing_signal_id) → {1, "DUPLICATE", existing_signal_id}
3861            let existing_str = fcall_field_str(arr, 2);
3862            let existing_id = SignalId::parse(&existing_str).unwrap_or_else(|_| signal_id.clone());
3863            Ok(DeliverSignalResult::Duplicate {
3864                existing_signal_id: existing_id,
3865            })
3866        } else {
3867            // ok(signal_id, effect) → {1, "OK", signal_id, effect}
3868            let effect = fcall_field_str(arr, 3);
3869            Ok(DeliverSignalResult::Accepted {
3870                signal_id: signal_id.clone(),
3871                effect,
3872            })
3873        }
3874    } else {
3875        let error_code = fcall_field_str(arr, 1);
3876        Err(ServerError::OperationFailed(format!(
3877            "ff_deliver_signal failed: {error_code}"
3878        )))
3879    }
3880}
3881
3882fn parse_change_priority_result(
3883    raw: &Value,
3884    execution_id: &ExecutionId,
3885) -> Result<ChangePriorityResult, ServerError> {
3886    let arr = match raw {
3887        Value::Array(arr) => arr,
3888        _ => return Err(ServerError::Script("ff_change_priority: expected Array".into())),
3889    };
3890    let status = match arr.first() {
3891        Some(Ok(Value::Int(n))) => *n,
3892        _ => return Err(ServerError::Script("ff_change_priority: bad status code".into())),
3893    };
3894    if status == 1 {
3895        Ok(ChangePriorityResult::Changed {
3896            execution_id: execution_id.clone(),
3897        })
3898    } else {
3899        let error_code = fcall_field_str(arr, 1);
3900        Err(ServerError::OperationFailed(format!(
3901            "ff_change_priority failed: {error_code}"
3902        )))
3903    }
3904}
3905
3906fn parse_replay_result(raw: &Value) -> Result<ReplayExecutionResult, ServerError> {
3907    let arr = match raw {
3908        Value::Array(arr) => arr,
3909        _ => return Err(ServerError::Script("ff_replay_execution: expected Array".into())),
3910    };
3911    let status = match arr.first() {
3912        Some(Ok(Value::Int(n))) => *n,
3913        _ => return Err(ServerError::Script("ff_replay_execution: bad status code".into())),
3914    };
3915    if status == 1 {
3916        // ok("0") → normal replay (Waiting); ok(N > 0) → flow member with N unsatisfied dependencies (WaitingChildren)
3917        let unsatisfied = fcall_field_str(arr, 2);
3918        let ps = if unsatisfied == "0" {
3919            PublicState::Waiting
3920        } else {
3921            PublicState::WaitingChildren
3922        };
3923        Ok(ReplayExecutionResult::Replayed { public_state: ps })
3924    } else {
3925        let error_code = fcall_field_str(arr, 1);
3926        Err(ServerError::OperationFailed(format!(
3927            "ff_replay_execution failed: {error_code}"
3928        )))
3929    }
3930}
3931
3933/// Convert a `ScriptError` into a `ServerError` preserving `ferriskey::ErrorKind`
3934/// for transport-level variants. Business-logic variants keep their code as
3935/// `ServerError::Script(String)` so HTTP clients see a stable message.
3936///
3937/// Why this exists: before R2, the stream handlers did
3938/// `ScriptError → format!() → ServerError::Script(String)`, which erased
3939/// the ErrorKind and made `ServerError::is_retryable()` always return
3940/// false. Retry-capable clients (cairn-fabric) would therefore not retry a
3941/// genuinely transient error like `IoError`.
3942fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
3943    match e {
3944        ff_script::error::ScriptError::Valkey(valkey_err) => ServerError::ValkeyContext {
3945            source: valkey_err,
3946            context: "stream FCALL transport".into(),
3947        },
3948        other => ServerError::Script(other.to_string()),
3949    }
3950}
3951
/// Extract a string from an FCALL result array at the given index.
/// Returns an empty string when the element is missing, an `Err`, or neither a
/// string nor an integer value.
3952fn fcall_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
3953    match arr.get(index) {
3954        Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
3955        Some(Ok(Value::SimpleString(s))) => s.clone(),
3956        Some(Ok(Value::Int(n))) => n.to_string(),
3957        _ => String::new(),
3958    }
3959}
3960
3961/// Parse ff_report_usage_and_check result.
3962/// Standard format: {1, "OK"}, {1, "SOFT_BREACH", dim, current, limit},
3963///                  {1, "HARD_BREACH", dim, current, limit}, {1, "ALREADY_APPLIED"}
3964fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, ServerError> {
3965    let arr = match raw {
3966        Value::Array(arr) => arr,
3967        _ => return Err(ServerError::Script("ff_report_usage_and_check: expected Array".into())),
3968    };
3969    let status_code = match arr.first() {
3970        Some(Ok(Value::Int(n))) => *n,
3971        _ => {
3972            return Err(ServerError::Script(
3973                "ff_report_usage_and_check: expected Int status code".into(),
3974            ))
3975        }
3976    };
3977    if status_code != 1 {
3978        let error_code = fcall_field_str(arr, 1);
3979        return Err(ServerError::OperationFailed(format!(
3980            "ff_report_usage_and_check failed: {error_code}"
3981        )));
3982    }
3983    let sub_status = fcall_field_str(arr, 1);
3984    match sub_status.as_str() {
3985        "OK" => Ok(ReportUsageResult::Ok),
3986        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
3987        "SOFT_BREACH" => {
3988            let dim = fcall_field_str(arr, 2);
3989            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3990            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3991            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
3992        }
3993        "HARD_BREACH" => {
3994            let dim = fcall_field_str(arr, 2);
3995            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3996            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3997            Ok(ReportUsageResult::HardBreach {
3998                dimension: dim,
3999                current_usage: current,
4000                hard_limit: limit,
4001            })
4002        }
4003        _ => Err(ServerError::OperationFailed(format!(
4004            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
4005        ))),
4006    }
4007}
4008
4009fn parse_revoke_lease_result(raw: &Value) -> Result<RevokeLeaseResult, ServerError> {
4010    let arr = match raw {
4011        Value::Array(arr) => arr,
4012        _ => return Err(ServerError::Script("ff_revoke_lease: expected Array".into())),
4013    };
4014    let status = match arr.first() {
4015        Some(Ok(Value::Int(n))) => *n,
4016        _ => return Err(ServerError::Script("ff_revoke_lease: bad status code".into())),
4017    };
4018    if status == 1 {
4019        let sub = fcall_field_str(arr, 1);
4020        if sub == "ALREADY_SATISFIED" {
4021            let reason = fcall_field_str(arr, 2);
4022            Ok(RevokeLeaseResult::AlreadySatisfied { reason })
4023        } else {
4024            let lid = fcall_field_str(arr, 2);
4025            let epoch = fcall_field_str(arr, 3);
4026            Ok(RevokeLeaseResult::Revoked {
4027                lease_id: lid,
4028                lease_epoch: epoch,
4029            })
4030        }
4031    } else {
4032        let error_code = fcall_field_str(arr, 1);
4033        Err(ServerError::OperationFailed(format!(
4034            "ff_revoke_lease failed: {error_code}"
4035        )))
4036    }
4037}
4038
4039/// Detect Valkey errors indicating the Lua function library is not loaded.
4040///
4041/// After a failover, the new primary may not have the library if replication
4042/// was incomplete. Valkey returns `ERR Function not loaded` for FCALL calls
4043/// targeting missing functions.
4044fn is_function_not_loaded(e: &ferriskey::Error) -> bool {
4045    if matches!(e.kind(), ferriskey::ErrorKind::NoScriptError) {
4046        return true;
4047    }
4048    e.detail()
4049        .map(|d| {
4050            d.contains("Function not loaded")
4051                || d.contains("No matching function")
4052                || d.contains("function not found")
4053        })
4054        .unwrap_or(false)
4055        || e.to_string().contains("Function not loaded")
4056}
4057
4058/// Free-function form of [`Server::fcall_with_reload`] — callable from
4059/// background tasks that own a cloned `Client` but no `&Server`.
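/// The reload path retries the FCALL exactly once after re-loading the
/// library; if the retried call fails again, the error is surfaced as-is
/// rather than looping.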
4060async fn fcall_with_reload_on_client(
4061    client: &Client,
4062    function: &str,
4063    keys: &[&str],
4064    args: &[&str],
4065) -> Result<Value, ServerError> {
4066    match client.fcall(function, keys, args).await {
4067        Ok(v) => Ok(v),
4068        Err(e) if is_function_not_loaded(&e) => {
4069            tracing::warn!(function, "Lua library not found on server, reloading");
4070            ff_script::loader::ensure_library(client)
4071                .await
4072                .map_err(ServerError::LibraryLoad)?;
4073            client
4074                .fcall(function, keys, args)
4075                .await
4076                .map_err(ServerError::Valkey)
4077        }
4078        Err(e) => Err(ServerError::Valkey(e)),
4079    }
4080}
4081
4082/// Build the `ff_cancel_execution` KEYS (21) and ARGV (5) by pre-reading
4083/// dynamic fields from `exec_core`. Shared by [`Server::cancel_execution`]
4084/// and the async cancel_flow member-dispatch path.
4085async fn build_cancel_execution_fcall(
4086    client: &Client,
4087    partition_config: &PartitionConfig,
4088    args: &CancelExecutionArgs,
4089) -> Result<(Vec<String>, Vec<String>), ServerError> {
4090    let partition = execution_partition(&args.execution_id, partition_config);
4091    let ctx = ExecKeyContext::new(&partition, &args.execution_id);
4092    let idx = IndexKeys::new(&partition);
4093
4094    let lane_str: Option<String> = client
4095        .hget(&ctx.core(), "lane_id")
4096        .await
4097        .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
4098    let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
4099
4100    let dyn_fields: Vec<Option<String>> = client
4101        .cmd("HMGET")
4102        .arg(ctx.core())
4103        .arg("current_attempt_index")
4104        .arg("current_waitpoint_id")
4105        .arg("current_worker_instance_id")
4106        .execute()
4107        .await
4108        .map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET cancel pre-read".into() })?;
4109
4110    let att_idx_val = dyn_fields.first()
4111        .and_then(|v| v.as_ref())
4112        .and_then(|s| s.parse::<u32>().ok())
4113        .unwrap_or(0);
4114    let att_idx = AttemptIndex::new(att_idx_val);
4115    let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4116    let wp_id = if wp_id_str.is_empty() {
4117        WaitpointId::new()
4118    } else {
4119        WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
4120    };
4121    let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4122    let wiid = WorkerInstanceId::new(&wiid_str);
4123
4124    let keys: Vec<String> = vec![
4125        ctx.core(),                              // 1
4126        ctx.attempt_hash(att_idx),               // 2
4127        ctx.stream_meta(att_idx),                // 3
4128        ctx.lease_current(),                     // 4
4129        ctx.lease_history(),                     // 5
4130        idx.lease_expiry(),                      // 6
4131        idx.worker_leases(&wiid),                // 7
4132        ctx.suspension_current(),                // 8
4133        ctx.waitpoint(&wp_id),                   // 9
4134        ctx.waitpoint_condition(&wp_id),         // 10
4135        idx.suspension_timeout(),                // 11
4136        idx.lane_terminal(&lane),                // 12
4137        idx.attempt_timeout(),                   // 13
4138        idx.execution_deadline(),                // 14
4139        idx.lane_eligible(&lane),                // 15
4140        idx.lane_delayed(&lane),                 // 16
4141        idx.lane_blocked_dependencies(&lane),    // 17
4142        idx.lane_blocked_budget(&lane),          // 18
4143        idx.lane_blocked_quota(&lane),           // 19
4144        idx.lane_blocked_route(&lane),           // 20
4145        idx.lane_blocked_operator(&lane),        // 21
4146    ];
4147    let argv: Vec<String> = vec![
4148        args.execution_id.to_string(),
4149        args.reason.clone(),
4150        args.source.to_string(),
4151        args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
4152        args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
4153    ];
4154    Ok((keys, argv))
4155}
4156
4157/// Backoff schedule for transient Valkey errors during async cancel_flow
4158/// dispatch. The array length is the total attempt count (the initial attempt
4159/// plus retries); the last entry is never slept on, because that attempt's failure is returned.
4160const CANCEL_MEMBER_RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2_000];
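// With this schedule a persistently failing member is attempted at roughly
// t = 0 ms, 100 ms, and 600 ms before its last error is returned to the caller.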
4161
4162/// Reach into a `ServerError` for a `ferriskey::ErrorKind` when one is
4163/// available. Matches the variants that can carry a transport-layer kind:
4164/// direct `Valkey`, `ValkeyContext`, and `LibraryLoad` (which itself wraps
4165/// a `ferriskey::Error`).
4166fn extract_valkey_kind(e: &ServerError) -> Option<ferriskey::ErrorKind> {
4167    match e {
4168        ServerError::Valkey(err) | ServerError::ValkeyContext { source: err, .. } => {
4169            Some(err.kind())
4170        }
4171        ServerError::LibraryLoad(load_err) => load_err.valkey_kind(),
4172        _ => None,
4173    }
4174}
4175
4186/// Acknowledge that a member cancel has committed. Fires
4187/// `ff_ack_cancel_member` on `{fp:N}` to SREM the execution from the
4188/// flow's `pending_cancels` set and, if empty, ZREM the flow from the
4189/// partition-level `cancel_backlog`. Best-effort — failures are logged
4190/// but not propagated, since the reconciler will catch anything that
4191/// stays behind on its next pass.
4192async fn ack_cancel_member(
4193    client: &Client,
4194    pending_cancels_key: &str,
4195    cancel_backlog_key: &str,
4196    eid_str: &str,
4197    flow_id: &str,
4198) {
4199    let keys = [pending_cancels_key, cancel_backlog_key];
4200    let args_v = [eid_str, flow_id];
4201    let res: Result<Value, _> =
4202        client.fcall("ff_ack_cancel_member", &keys, &args_v).await;
4203    if let Err(e) = res {
4204        tracing::warn!(
4205            flow_id = %flow_id,
4206            execution_id = %eid_str,
4207            error = %e,
4208            "ff_ack_cancel_member failed; reconciler will drain on next pass"
4209        );
4210    }
4211}
4212
4213/// Returns true if a cancel_member failure reflects an already-terminal
4214/// (or never-existed) execution, which from the flow-cancel perspective
4215/// is ack-worthy success rather than a partial failure. The Lua
4216/// `ff_cancel_execution` function emits `execution_not_active` when the
4217/// member is already in a terminal phase, and `execution_not_found` when
4218/// the key is gone. Both codes arrive here wrapped in
4219/// `ServerError::OperationFailed("ff_cancel_execution failed: <code>")`
4220/// via `parse_cancel_result`.
4221fn is_terminal_ack_error(err: &ServerError) -> bool {
4222    match err {
4223        ServerError::OperationFailed(msg) => {
4224            msg.contains("execution_not_active") || msg.contains("execution_not_found")
4225        }
4226        _ => false,
4227    }
4228}
4229
/// Cancel a single member execution from a cancel_flow dispatch context.
/// Parses the flow-member EID string, builds the FCALL via the shared helper,
/// and executes with the same reload-on-failover semantics as the inline path.
///
/// Wrapped in a bounded retry loop (see [`CANCEL_MEMBER_RETRY_DELAYS_MS`]) so
/// that transient Valkey errors mid-dispatch (failover, `TryAgain`,
/// `ClusterDown`, `IoError`, `FatalSendError`) do not silently leak
/// non-cancelled members. `FatalReceiveError` and non-retryable kinds bubble
/// up immediately — those either indicate the Lua ran server-side anyway or a
/// permanent mismatch that retries cannot fix.
4230async fn cancel_member_execution(
4231    client: &Client,
4232    partition_config: &PartitionConfig,
4233    eid_str: &str,
4234    reason: &str,
4235    now: TimestampMs,
4236) -> Result<(), ServerError> {
4237    let execution_id = ExecutionId::parse(eid_str)
4238        .map_err(|e| ServerError::InvalidInput(format!("bad execution_id '{eid_str}': {e}")))?;
4239    let args = CancelExecutionArgs {
4240        execution_id: execution_id.clone(),
4241        reason: reason.to_owned(),
4242        source: CancelSource::OperatorOverride,
4243        lease_id: None,
4244        lease_epoch: None,
4245        attempt_id: None,
4246        now,
4247    };
4248
4249    let attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
4250    for (attempt_idx, delay_ms) in CANCEL_MEMBER_RETRY_DELAYS_MS.iter().enumerate() {
4251        let is_last = attempt_idx + 1 == attempts;
4252        match try_cancel_member_once(client, partition_config, &args).await {
4253            Ok(()) => return Ok(()),
4254            Err(e) => {
4255                // Only retry transport-layer transients; business-logic
4256                // errors (Script / OperationFailed / NotFound / InvalidInput)
4257                // won't change on retry.
4258                let retryable = extract_valkey_kind(&e)
4259                    .map(ff_script::retry::is_retryable_kind)
4260                    .unwrap_or(false);
4261                if !retryable || is_last {
4262                    return Err(e);
4263                }
4264                tracing::debug!(
4265                    execution_id = %execution_id,
4266                    attempt = attempt_idx + 1,
4267                    delay_ms = *delay_ms,
4268                    error = %e,
4269                    "cancel_member_execution: transient error, retrying"
4270                );
4271                tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
4272            }
4273        }
4274    }
4275    // Unreachable: the loop above either returns Ok, returns Err on the
4276    // last attempt, or returns Err on a non-retryable error. Keep a
4277    // defensive fallback for future edits to the retry structure.
4278    Err(ServerError::OperationFailed(format!(
4279        "cancel_member_execution: retries exhausted for {execution_id}"
4280    )))
4281}
4282
4283/// Single cancel attempt — pre-read + FCALL + parse. Factored out so the
4284/// retry loop in [`cancel_member_execution`] can invoke it cleanly.
4285async fn try_cancel_member_once(
4286    client: &Client,
4287    partition_config: &PartitionConfig,
4288    args: &CancelExecutionArgs,
4289) -> Result<(), ServerError> {
4290    let (keys, argv) = build_cancel_execution_fcall(client, partition_config, args).await?;
4291    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
4292    let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
4293    let raw =
4294        fcall_with_reload_on_client(client, "ff_cancel_execution", &key_refs, &arg_refs).await?;
4295    parse_cancel_result(&raw, &args.execution_id).map(|_| ())
4296}
4297
4298fn parse_reset_budget_result(raw: &Value) -> Result<ResetBudgetResult, ServerError> {
4299    let arr = match raw {
4300        Value::Array(arr) => arr,
4301        _ => return Err(ServerError::Script("ff_reset_budget: expected Array".into())),
4302    };
4303    let status = match arr.first() {
4304        Some(Ok(Value::Int(n))) => *n,
4305        _ => return Err(ServerError::Script("ff_reset_budget: bad status code".into())),
4306    };
4307    if status == 1 {
4308        let next_str = fcall_field_str(arr, 2);
4309        let next_ms: i64 = next_str.parse().unwrap_or(0);
4310        Ok(ResetBudgetResult::Reset {
4311            next_reset_at: TimestampMs::from_millis(next_ms),
4312        })
4313    } else {
4314        let error_code = fcall_field_str(arr, 1);
4315        Err(ServerError::OperationFailed(format!(
4316            "ff_reset_budget failed: {error_code}"
4317        )))
4318    }
4319}
4320
4321#[cfg(test)]
4322mod tests {
4323    use super::*;
4324    use ferriskey::ErrorKind;
4325
4326    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
4327        ferriskey::Error::from((kind, "synthetic"))
4328    }
4329
4330    // ── Budget dimension-cap validation (issue #104) ──
4331
4332    #[test]
4333    fn create_budget_rejects_over_cap_dimension_count() {
4334        let n = MAX_BUDGET_DIMENSIONS + 1;
4335        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4336        let hard = vec![1u64; n];
4337        let soft = vec![0u64; n];
4338        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
4339        match err {
4340            ServerError::InvalidInput(msg) => {
4341                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
4342                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
4343                assert!(msg.contains(&format!("got={n}")), "got: {msg}");
4344            }
4345            other => panic!("expected InvalidInput, got {other:?}"),
4346        }
4347    }
4348
4349    #[test]
4350    fn create_budget_accepts_exactly_cap_dimensions() {
4351        let n = MAX_BUDGET_DIMENSIONS;
4352        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4353        let hard = vec![1u64; n];
4354        let soft = vec![0u64; n];
4355        assert!(validate_create_budget_dimensions(&dims, &hard, &soft).is_ok());
4356    }
4357
4358    #[test]
4359    fn create_budget_rejects_hard_limit_length_mismatch() {
4360        let dims = vec!["a".to_string(), "b".to_string()];
4361        let hard = vec![1u64]; // too short
4362        let soft = vec![0u64, 0u64];
4363        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
4364        match err {
4365            ServerError::InvalidInput(msg) => {
4366                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
4367                assert!(msg.contains("hard_limits=1"), "got: {msg}");
4368                assert!(msg.contains("dimensions=2"), "got: {msg}");
4369            }
4370            other => panic!("expected InvalidInput, got {other:?}"),
4371        }
4372    }
4373
4374    #[test]
4375    fn create_budget_rejects_soft_limit_length_mismatch() {
4376        let dims = vec!["a".to_string(), "b".to_string()];
4377        let hard = vec![1u64, 2u64];
4378        let soft = vec![0u64, 0u64, 0u64]; // too long
4379        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
4380        match err {
4381            ServerError::InvalidInput(msg) => {
4382                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
4383                assert!(msg.contains("soft_limits=3"), "got: {msg}");
4384            }
4385            other => panic!("expected InvalidInput, got {other:?}"),
4386        }
4387    }
4388
4389    #[test]
4390    fn report_usage_rejects_over_cap_dimension_count() {
4391        let n = MAX_BUDGET_DIMENSIONS + 1;
4392        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4393        let deltas = vec![1u64; n];
4394        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
4395        match err {
4396            ServerError::InvalidInput(msg) => {
4397                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
4398                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
4399            }
4400            other => panic!("expected InvalidInput, got {other:?}"),
4401        }
4402    }
4403
4404    #[test]
4405    fn report_usage_accepts_exactly_cap_dimensions() {
4406        let n = MAX_BUDGET_DIMENSIONS;
4407        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
4408        let deltas = vec![1u64; n];
4409        assert!(validate_report_usage_dimensions(&dims, &deltas).is_ok());
4410    }
4411
4412    #[test]
4413    fn report_usage_rejects_delta_length_mismatch() {
4414        let dims = vec!["a".to_string(), "b".to_string(), "c".to_string()];
4415        let deltas = vec![1u64, 2u64]; // too short
4416        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
4417        match err {
4418            ServerError::InvalidInput(msg) => {
4419                assert!(msg.contains("dimension_delta_array_mismatch"), "got: {msg}");
4420                assert!(msg.contains("dimensions=3"), "got: {msg}");
4421                assert!(msg.contains("deltas=2"), "got: {msg}");
4422            }
4423            other => panic!("expected InvalidInput, got {other:?}"),
4424        }
4425    }
4426
4427    #[test]
4428    fn report_usage_accepts_empty_dimensions() {
4429        // Edge case: zero-dimension report_usage is a no-op that should pass
4430        // validation (Lua handles dim_count=0 correctly).
4431        assert!(validate_report_usage_dimensions(&[], &[]).is_ok());
4432    }
4433
4434    #[test]
4435    fn is_retryable_valkey_variant_uses_kind_table() {
4436        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
4437        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
4438        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
4439        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());
4440        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());
4441
4442        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
4443        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
4444        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
4445        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
4446        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Ask)).is_retryable());
4447        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
4448    }
4449
4450    #[test]
4451    fn is_retryable_valkey_context_uses_kind_table() {
4452        let err = ServerError::ValkeyContext {
4453            source: mk_fk_err(ErrorKind::IoError),
4454            context: "HGET test".into(),
4455        };
4456        assert!(err.is_retryable());
4457
4458        let err = ServerError::ValkeyContext {
4459            source: mk_fk_err(ErrorKind::AuthenticationFailed),
4460            context: "auth".into(),
4461        };
4462        assert!(!err.is_retryable());
4463    }
4464
4465    #[test]
4466    fn is_retryable_library_load_delegates_to_inner_kind() {
4467        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
4468            mk_fk_err(ErrorKind::IoError),
4469        ));
4470        assert!(err.is_retryable());
4471
4472        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
4473            mk_fk_err(ErrorKind::AuthenticationFailed),
4474        ));
4475        assert!(!err.is_retryable());
4476
4477        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
4478            expected: "1".into(),
4479            got: "2".into(),
4480        });
4481        assert!(!err.is_retryable());
4482    }
4483
4484    #[test]
4485    fn is_retryable_business_logic_variants_are_false() {
4486        assert!(!ServerError::NotFound("x".into()).is_retryable());
4487        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
4488        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
4489        assert!(!ServerError::Script("x".into()).is_retryable());
4490        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
4491    }
4492
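    // Sketch: is_terminal_ack_error keys off the exact message shape produced
    // by parse_cancel_result ("ff_cancel_execution failed: <code>"), and only
    // for the OperationFailed variant.
    #[test]
    fn terminal_ack_error_detection_matches_cancel_error_codes() {
        let terminal = ServerError::OperationFailed(
            "ff_cancel_execution failed: execution_not_active".into(),
        );
        assert!(is_terminal_ack_error(&terminal));

        let gone = ServerError::OperationFailed(
            "ff_cancel_execution failed: execution_not_found".into(),
        );
        assert!(is_terminal_ack_error(&gone));

        // Any other error code, or a non-OperationFailed variant, is not
        // ack-worthy.
        let other =
            ServerError::OperationFailed("ff_cancel_execution failed: some_other_code".into());
        assert!(!is_terminal_ack_error(&other));
        assert!(!is_terminal_ack_error(&ServerError::Script(
            "execution_not_found".into()
        )));
    }
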
4493    #[test]
4494    fn valkey_kind_delegates_through_library_load() {
4495        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
4496            mk_fk_err(ErrorKind::ClusterDown),
4497        ));
4498        assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));
4499
4500        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
4501            expected: "1".into(),
4502            got: "2".into(),
4503        });
4504        assert_eq!(err.valkey_kind(), None);
4505    }
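
    // Sketch: extract_valkey_kind feeds the cancel_member retry loop, so it
    // must surface the transport kind through the context wrapper and return
    // None for business-logic variants.
    #[test]
    fn extract_valkey_kind_reads_through_context_wrapper() {
        let err = ServerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::TryAgain),
            context: "HMGET cancel pre-read".into(),
        };
        assert_eq!(extract_valkey_kind(&err), Some(ErrorKind::TryAgain));
        assert_eq!(extract_valkey_kind(&ServerError::Script("x".into())), None);
    }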
4506
4507    // ── Valkey version check (RFC-011 §13) ──
4508
4509    #[test]
4510    fn parse_valkey_version_prefers_valkey_version_over_redis_version() {
4511        // Valkey 8.x+ pins redis_version to 7.2.4 for Redis-client compat
4512        // and exposes the real version in valkey_version. Parser must use
4513        // valkey_version when both are present.
4514        let info = "\
4515# Server\r\n\
4516redis_version:7.2.4\r\n\
4517valkey_version:9.0.3\r\n\
4518server_mode:cluster\r\n\
4519os:Linux\r\n";
4520        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
4521    }
4522
4523    #[test]
4524    fn parse_valkey_version_real_valkey_8_cluster_body() {
4525        // Actual INFO server response observed on valkey/valkey:latest in
4526        // cluster mode (CI matrix): redis_version compat-pinned to 7.2.4,
4527        // valkey_version authoritative at 9.0.3.
4528        let info = "\
4529# Server\r\n\
4530redis_version:7.2.4\r\n\
4531server_name:valkey\r\n\
4532valkey_version:9.0.3\r\n\
4533valkey_release_stage:ga\r\n\
4534redis_git_sha1:00000000\r\n\
4535server_mode:cluster\r\n";
4536        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
4537    }
4538
4539    #[test]
4540    fn parse_valkey_version_falls_back_to_redis_version_on_valkey_7() {
4541        // Valkey 7.x doesn't emit valkey_version, but does emit
4542        // server_name:valkey; parser falls back to redis_version.
4543        let info = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nfoo:bar\r\n";
4544        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
4545    }
4546
4547    #[test]
4548    fn parse_valkey_version_rejects_redis_backend() {
4549        // Real Redis emits redis_version: but not server_name:valkey and
4550        // not valkey_version:. Parser must reject this affirmatively so an
4551        // operator pointing ff-server at a Redis instance fails loud instead
4552        // of silently running against an unsupported backend.
4553        let info = "\
4554# Server\r\n\
4555redis_version:7.4.0\r\n\
4556redis_mode:standalone\r\n\
4557os:Linux\r\n";
4558        let err = parse_valkey_version(info).unwrap_err();
4559        assert!(matches!(err, ServerError::OperationFailed(_)));
4560        let msg = err.to_string();
4561        assert!(
4562            msg.contains("Redis is not supported") && msg.contains("server_name:valkey"),
4563            "expected Redis-rejection message, got: {msg}"
4564        );
4565    }
4566
4567    #[test]
4568    fn parse_valkey_version_accepts_valkey_7_marker_case_insensitively() {
4569        // INFO keys and values are conventionally lowercase, but be defensive about case.
4570        let info = "redis_version:7.2.0\r\nSERVER_NAME:Valkey\r\n";
4571        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
4572    }
4573
4574    #[test]
4575    fn parse_valkey_version_errors_when_no_version_field() {
4576        let info = "# Server\r\nfoo:bar\r\n";
4577        let err = parse_valkey_version(info).unwrap_err();
4578        assert!(matches!(err, ServerError::OperationFailed(_)));
4579        assert!(
4580            err.to_string().contains("missing"),
4581            "expected 'missing' in message, got: {err}"
4582        );
4583    }
4584
4585    #[test]
4586    fn parse_valkey_version_errors_on_non_numeric_major() {
4587        let info = "valkey_version:invalid.x.y\n";
4588        let err = parse_valkey_version(info).unwrap_err();
4589        assert!(matches!(err, ServerError::OperationFailed(_)));
4590        assert!(err.to_string().contains("non-numeric major"));
4591    }
4592
4593    #[test]
4594    fn parse_valkey_version_errors_on_non_numeric_minor() {
4595        let info = "valkey_version:7.x.0\n";
4596        let err = parse_valkey_version(info).unwrap_err();
4597        assert!(matches!(err, ServerError::OperationFailed(_)));
4598        assert!(err.to_string().contains("non-numeric minor"));
4599    }
4600
4601    #[test]
4602    fn parse_valkey_version_errors_on_missing_minor() {
4603        // Bare major (no dot) — cannot be compared against a (major, minor)
4604        // floor. Flag as a real parse error.
4605        let info = "valkey_version:7\n";
4606        let err = parse_valkey_version(info).unwrap_err();
4607        assert!(matches!(err, ServerError::OperationFailed(_)));
4608        assert!(err.to_string().contains("missing minor"));
4609    }
4610
4611    #[test]
4612    fn extract_info_bodies_unwraps_cluster_map_all_entries() {
4613        // Simulates cluster-mode INFO response: map of node_addr → body.
4614        // extract_info_bodies must return EVERY node's body so a stale
4615        // pre-upgrade replica cannot hide behind an upgraded primary.
4616        let body_a = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
4617        let body_b = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:8.0.0\r\n";
4618        let map = Value::Map(vec![
4619            (
4620                Value::SimpleString("127.0.0.1:7000".to_string()),
4621                Value::VerbatimString {
4622                    format: ferriskey::value::VerbatimFormat::Text,
4623                    text: body_a.to_string(),
4624                },
4625            ),
4626            (
4627                Value::SimpleString("127.0.0.1:7001".to_string()),
4628                Value::VerbatimString {
4629                    format: ferriskey::value::VerbatimFormat::Text,
4630                    text: body_b.to_string(),
4631                },
4632            ),
4633        ]);
4634        let bodies = extract_info_bodies(&map).unwrap();
4635        assert_eq!(bodies.len(), 2);
4636        assert_eq!(bodies[0], body_a);
4637        assert_eq!(bodies[1], body_b);
4638    }
4639
4640    #[test]
4641    fn extract_info_bodies_handles_simple_string() {
4642        let body_text = "redis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
4643        let v = Value::SimpleString(body_text.to_string());
4644        let bodies = extract_info_bodies(&v).unwrap();
4645        assert_eq!(bodies, vec![body_text.to_string()]);
4646    }
4647
4648    #[test]
4649    fn extract_info_bodies_rejects_empty_cluster_map() {
4650        let map = Value::Map(vec![]);
4651        let err = extract_info_bodies(&map).unwrap_err();
4652        assert!(matches!(err, ServerError::OperationFailed(_)));
4653        assert!(err.to_string().contains("empty map"));
4654    }
4655
4656    /// End-to-end composition test for the cluster-min fix (issue #84):
4657    /// `extract_info_bodies` → `parse_valkey_version` per node → min-reduce →
4658    /// floor comparison. A mixed-version cluster where one node is 7.1.0 must
4659    /// fail the gate, even if another node is already on 8.0.0 and that
4660    /// node's entry appears first in the map.
4661    #[test]
4662    fn parse_valkey_version_min_across_cluster_map_picks_lowest() {
4663        // node1 appears first and is above the floor. Pre-fix behavior
4664        // (first-entry only) would accept. The min across all three nodes is
4665        // (7, 1), below the (7, 2) floor, so the gate must reject.
4666        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
4667        let body_node2 = "# Server\r\nredis_version:7.1.0\r\nserver_name:valkey\r\n";
4668        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
4669        let map = Value::Map(vec![
4670            (
4671                Value::SimpleString("node1:6379".to_string()),
4672                Value::VerbatimString {
4673                    format: ferriskey::value::VerbatimFormat::Text,
4674                    text: body_node1.to_string(),
4675                },
4676            ),
4677            (
4678                Value::SimpleString("node2:6379".to_string()),
4679                Value::VerbatimString {
4680                    format: ferriskey::value::VerbatimFormat::Text,
4681                    text: body_node2.to_string(),
4682                },
4683            ),
4684            (
4685                Value::SimpleString("node3:6379".to_string()),
4686                Value::VerbatimString {
4687                    format: ferriskey::value::VerbatimFormat::Text,
4688                    text: body_node3.to_string(),
4689                },
4690            ),
4691        ]);
4692
4693        let bodies = extract_info_bodies(&map).unwrap();
4694        let min = bodies
4695            .iter()
4696            .map(|b| parse_valkey_version(b).unwrap())
4697            .min()
4698            .unwrap();
4699
4700        assert_eq!(min, (7, 1), "min across cluster must be the lowest node");
4701        assert!(
4702            min < (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
4703            "mixed-version cluster with 7.1.0 node must fail the (7,2) gate"
4704        );
4705    }
4706
4707    /// Companion to `parse_valkey_version_min_across_cluster_map_picks_lowest`:
4708    /// when every node is at or above the floor, the min-reduce + gate
4709    /// composition accepts.
4710    #[test]
4711    fn parse_valkey_version_all_nodes_at_or_above_floor_accepts() {
4712        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
4713        let body_node2 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
4714        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:9.0.3\r\n";
4715        let map = Value::Map(vec![
4716            (
4717                Value::SimpleString("node1:6379".to_string()),
4718                Value::VerbatimString {
4719                    format: ferriskey::value::VerbatimFormat::Text,
4720                    text: body_node1.to_string(),
4721                },
4722            ),
4723            (
4724                Value::SimpleString("node2:6379".to_string()),
4725                Value::VerbatimString {
4726                    format: ferriskey::value::VerbatimFormat::Text,
4727                    text: body_node2.to_string(),
4728                },
4729            ),
4730            (
4731                Value::SimpleString("node3:6379".to_string()),
4732                Value::VerbatimString {
4733                    format: ferriskey::value::VerbatimFormat::Text,
4734                    text: body_node3.to_string(),
4735                },
4736            ),
4737        ]);
4738
4739        let bodies = extract_info_bodies(&map).unwrap();
4740        let min = bodies
4741            .iter()
4742            .map(|b| parse_valkey_version(b).unwrap())
4743            .min()
4744            .unwrap();
4745
4746        assert_eq!(min, (7, 2), "min across cluster is the lowest node (7.2)");
4747        assert!(
4748            min >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
4749            "all-above-floor cluster must pass the gate"
4750        );
4751    }
4752
4753    #[test]
4754    fn valkey_version_too_low_is_not_retryable() {
4755        let err = ServerError::ValkeyVersionTooLow {
4756            detected: "7.0".into(),
4757            required: "7.2".into(),
4758        };
4759        assert!(!err.is_retryable());
4760        assert_eq!(err.valkey_kind(), None);
4761    }
4762
4763    #[test]
4764    fn valkey_version_too_low_error_message_includes_both_versions() {
4765        let err = ServerError::ValkeyVersionTooLow {
4766            detected: "7.0".into(),
4767            required: "7.2".into(),
4768        };
4769        let msg = err.to_string();
4770        assert!(msg.contains("7.0"), "detected version in message: {msg}");
4771        assert!(msg.contains("7.2"), "required version in message: {msg}");
4772        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
4773    }
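
    // ── FCALL reply parsing sketches ──
    //
    // Minimal sketches of the reply parsers above. They assume replies can be
    // built with the same Ok-wrapped Value::Array element shape the parsers
    // match on; adjust the constructors if ferriskey's builders differ.

    #[test]
    fn fcall_field_str_extracts_strings_ints_and_defaults_to_empty() {
        let arr: Vec<Result<Value, ferriskey::Error>> = vec![
            Ok(Value::Int(1)),
            Ok(Value::SimpleString("OK".to_string())),
            Ok(Value::BulkString(b"cp-1".to_vec())),
        ];
        assert_eq!(fcall_field_str(&arr, 0), "1");
        assert_eq!(fcall_field_str(&arr, 1), "OK");
        assert_eq!(fcall_field_str(&arr, 2), "cp-1");
        // Out-of-range indexes fall back to "".
        assert_eq!(fcall_field_str(&arr, 9), "");
    }

    #[test]
    fn report_usage_soft_breach_reply_parses_numeric_fields() {
        // {1, "SOFT_BREACH", dim, current, limit} → SoftBreach with numeric
        // fields parsed from their string form.
        let raw = Value::Array(vec![
            Ok(Value::Int(1)),
            Ok(Value::SimpleString("SOFT_BREACH".to_string())),
            Ok(Value::SimpleString("tokens".to_string())),
            Ok(Value::SimpleString("120".to_string())),
            Ok(Value::SimpleString("100".to_string())),
        ]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
                assert_eq!(dimension, "tokens");
                assert_eq!(current_usage, 120);
                assert_eq!(soft_limit, 100);
            }
            _ => panic!("expected SoftBreach"),
        }
    }

    #[test]
    fn cancel_flow_already_terminal_maps_to_idempotent_variant() {
        // {0, "flow_already_terminal"} must map to the idempotent variant
        // rather than an error (see parse_cancel_flow_raw).
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(Value::SimpleString("flow_already_terminal".to_string())),
        ]);
        assert!(matches!(
            parse_cancel_flow_raw(&raw),
            Ok(ParsedCancelFlow::AlreadyTerminal)
        ));
    }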
4774}