ff_server/server.rs

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use ferriskey::{Client, ClientBuilder, Value};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinSet;
use ff_core::contracts::{
    AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
    CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
    CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
    CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
    ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
    DeliverSignalArgs, DeliverSignalResult, ExecutionInfo, ExecutionSummary,
    ListExecutionsResult, PendingWaitpointInfo, ReplayExecutionResult,
    ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
    RevokeLeaseResult,
    RotateWaitpointHmacSecretArgs,
    StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
use ff_core::keys::{
    self, usage_dedup_key, BudgetKeyContext, ExecKeyContext, FlowIndexKeys, FlowKeyContext,
    IndexKeys, QuotaKeyContext,
};
use ff_core::partition::{
    budget_partition, execution_partition, flow_partition, quota_partition, Partition,
    PartitionConfig, PartitionFamily,
};
use ff_core::state::{PublicState, StateVector};
use ff_core::types::*;
use ff_engine::Engine;
use ff_script::retry::is_retryable_kind;

use crate::config::ServerConfig;

/// Upper bound on `member_execution_ids` returned in the
/// [`CancelFlowResult::Cancelled`] response when the flow was already in a
/// terminal state (idempotent retry). The first (non-idempotent) cancel call
/// returns the full list; retries only need a sample.
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;

/// FlowFabric server — connects everything together.
///
/// Manages the Valkey connection, Lua library loading, background scanners,
/// and provides a minimal API for Phase 1.
pub struct Server {
    client: Client,
    /// Dedicated Valkey connection used EXCLUSIVELY for stream-op calls:
    /// `xread_block` tails AND `ff_read_attempt_stream` range reads.
    /// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
    /// processes commands FIFO on it.
    ///
    /// Two head-of-line risks motivate the split from the main client:
    ///
    /// * **Blocking**: `XREAD BLOCK 30_000` holds the read side until a
    ///   new entry arrives or `block_ms` elapses.
    /// * **Large replies**: `XRANGE … COUNT 10_000` with ~64 KB per
    ///   frame returns a multi-MB reply serialized on one connection.
    ///
    /// Sharing either load with the main client would starve every other
    /// FCALL (create_execution, claim, rotate_waitpoint_secret,
    /// budget/quota, admin endpoints) AND every engine scanner.
    ///
    /// Kept separate from `client` and from the `Engine` scanner client so
    /// tail latency cannot couple to foreground API latency or background
    /// scanner cadence. See RFC-006 Impl Notes for the cascading-failure
    /// rationale.
    tail_client: Client,
    /// Bounds concurrent stream-op calls server-wide — read AND tail
    /// combined. Each caller acquires one permit for the duration of its
    /// Valkey round-trip(s); contention surfaces as HTTP 429 at the REST
    /// boundary, not as a silent queue on the stream connection. Default
    /// size is `FF_MAX_CONCURRENT_STREAM_OPS` (64; legacy env
    /// `FF_MAX_CONCURRENT_TAIL` accepted for one release).
    ///
    /// Read and tail share the same pool deliberately: they share the
    /// `tail_client`, so fairness accounting must be unified or a flood
    /// of one can starve the other. The semaphore is also `close()`d on
    /// shutdown so no new stream ops can start while existing ones drain
    /// (see `Server::shutdown`).
    stream_semaphore: Arc<tokio::sync::Semaphore>,
    /// Serializes `XREAD BLOCK` calls against `tail_client`.
    ///
    /// `ferriskey::Client` is a pipelined multiplexed connection — Valkey
    /// processes commands FIFO on one socket. `XREAD BLOCK` holds the
    /// connection's read side for the full `block_ms`, so two parallel
    /// BLOCKs sent down the same mux serialize: the second waits for the
    /// first to return before its own BLOCK even begins at the server.
    /// Meanwhile ferriskey's per-call `request_timeout` (auto-extended to
    /// `block_ms + 500ms`) starts at future-poll on the CLIENT side, so
    /// the second call's timeout fires before its turn at the server —
    /// spurious `timed_out` errors under concurrent tail load.
    ///
    /// Explicit serialization around `xread_block` removes the
    /// silent-failure mode: concurrent tails queue on this Mutex (inside
    /// an already-acquired semaphore permit), then dispatch one at a
    /// time with their full `block_ms` budget intact. The semaphore
    /// ceiling (`max_concurrent_stream_ops`) effectively becomes queue
    /// depth; throughput on the tail client is 1 BLOCK at a time.
    ///
    /// V2 upgrade: a pool of N dedicated `ferriskey::Client` connections
    /// replacing the single `tail_client` + this Mutex. Deferred; the
    /// Mutex here is correct v1 behavior.
    ///
    /// XRANGE reads (`read_attempt_stream`) are NOT gated by this Mutex —
    /// XRANGE is non-blocking at the server, so pipelined XRANGEs on one
    /// mux complete in microseconds each and don't trigger the same
    /// client-side timeout race. Keeping reads unserialized preserves
    /// read throughput.
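    ///
    /// A sketch of the intended acquisition order for one tail call
    /// (hedged pseudocode; the real handler lives at the HTTP layer and
    /// is not part of this section):
    ///
    /// ```ignore
    /// // 1. Bound server-wide stream-op concurrency; exhaustion -> 429.
    /// let _permit = self.stream_semaphore.try_acquire().map_err(|_| {
    ///     ServerError::ConcurrencyLimitExceeded("stream_ops", 64)
    /// })?;
    /// // 2. Serialize XREAD BLOCK dispatch on the single tail mux.
    /// let _guard = self.xread_block_lock.lock().await;
    /// // 3. Dispatch with the full block_ms budget intact.
    /// let entries = self.tail_client.xread_block(/* stream, id, block_ms */).await?;
    /// ```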
    xread_block_lock: Arc<tokio::sync::Mutex<()>>,
    /// Server-wide Semaphore(1) gating admin rotate calls. Legitimate
    /// operators rotate ~monthly and can afford to serialize; concurrent
    /// rotate requests are an attack or misbehaving script. Holding the
    /// permit also guards against interleaved partial rotations on the
    /// Server side of the per-partition locks, surfacing contention as
    /// HTTP 429 instead of silently queueing and blowing past the 120s
    /// HTTP timeout. See `rotate_waitpoint_secret` handler.
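    ///
    /// Fail-fast acquisition sketch (assumed handler shape; the real
    /// handler lives in api.rs, not this section):
    ///
    /// ```ignore
    /// let _permit = self.admin_rotate_semaphore.try_acquire().map_err(|_| {
    ///     ServerError::ConcurrencyLimitExceeded("admin_rotate", 1)
    /// })?;
    /// // rotation proceeds only while the single permit is held
    /// ```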
    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
    engine: Engine,
    config: ServerConfig,
    /// Background tasks spawned by async handlers (e.g. cancel_flow member
    /// dispatch). Drained on shutdown with a bounded timeout.
    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
}

/// Server error type.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Valkey connection or command error (preserves ErrorKind for caller inspection).
    #[error("valkey: {0}")]
    Valkey(#[from] ferriskey::Error),
    /// Valkey error with additional context (preserves ErrorKind via #[source]).
    #[error("valkey ({context}): {source}")]
    ValkeyContext {
        #[source]
        source: ferriskey::Error,
        context: String,
    },
    #[error("config: {0}")]
    Config(#[from] crate::config::ConfigError),
    #[error("library load: {0}")]
    LibraryLoad(#[from] ff_script::loader::LoadError),
    #[error("partition mismatch: {0}")]
    PartitionMismatch(String),
    #[error("not found: {0}")]
    NotFound(String),
    #[error("invalid input: {0}")]
    InvalidInput(String),
    #[error("operation failed: {0}")]
    OperationFailed(String),
    #[error("script: {0}")]
    Script(String),
    /// Server-wide concurrency limit reached on a labelled pool. Surfaces
    /// as HTTP 429 at the REST boundary so load balancers and clients can
    /// retry with backoff. The `source` label ("stream_ops", "admin_rotate",
    /// etc.) identifies WHICH pool is exhausted so operators aren't
    /// misled by a single "tail unavailable" string when the real fault
    /// is rotation contention.
    ///
    /// Fields: (source_label, max_permits).
    #[error("too many concurrent {0} calls (max: {1})")]
    ConcurrencyLimitExceeded(&'static str, u32),
    /// Detected Valkey version is below the RFC-011 §13 minimum. The hash-slot
    /// co-location design and Valkey Functions API behavior the engine depends
    /// on were introduced/stabilized in 8.0; older versions silently behave
    /// differently on some cluster edge cases. Operator action: upgrade Valkey.
    #[error(
        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
    )]
    ValkeyVersionTooLow {
        detected: String,
        required: String,
    },
}

impl ServerError {
    /// Returns the underlying ferriskey ErrorKind, if this error carries one.
    /// Covers direct Valkey variants and library-load failures that bubble a
    /// `ferriskey::Error` through `LoadError::Valkey`.
    pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
        match self {
            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
            Self::LibraryLoad(e) => e.valkey_kind(),
            _ => None,
        }
    }

    /// Whether this error is safely retryable by a caller. Semantics match
    /// `ScriptError::class() == Retryable` for Lua errors plus a kind-aware
    /// check for transport/library-load failures. Business-logic rejections
    /// (NotFound, InvalidInput, OperationFailed, Script, Config, PartitionMismatch)
    /// return false — those won't change on retry.
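    ///
    /// A caller-side retry sketch under these semantics (backoff values
    /// are illustrative, not prescribed by the server):
    ///
    /// ```ignore
    /// let mut delay = Duration::from_millis(50);
    /// let result = loop {
    ///     match server.create_execution(&args).await {
    ///         Err(e) if e.is_retryable() => {
    ///             tokio::time::sleep(delay).await;
    ///             delay = (delay * 2).min(Duration::from_secs(5));
    ///         }
    ///         other => break other,
    ///     }
    /// };
    /// ```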
    pub fn is_retryable(&self) -> bool {
        match self {
            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
                is_retryable_kind(e.kind())
            }
            Self::LibraryLoad(load_err) => load_err
                .valkey_kind()
                .map(is_retryable_kind)
                .unwrap_or(false),
            Self::Config(_)
            | Self::PartitionMismatch(_)
            | Self::NotFound(_)
            | Self::InvalidInput(_)
            | Self::OperationFailed(_)
            | Self::Script(_) => false,
            // Back off and retry — the bound is a server-side permit pool,
            // so the retry will succeed once a permit frees up. Applies
            // equally to stream ops, admin rotate, etc.
            Self::ConcurrencyLimitExceeded(_, _) => true,
            // Operator must upgrade Valkey; a retry at the caller won't help.
            Self::ValkeyVersionTooLow { .. } => false,
        }
    }
}

impl Server {
    /// Start the FlowFabric server.
    ///
    /// Boot sequence:
    /// 1. Connect to Valkey
    /// 2. Validate or create partition config key
    /// 3. Load the FlowFabric Lua library
    /// 4. Start engine (14 background scanners)
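    ///
    /// Minimal boot sketch (`ServerConfig` construction elided;
    /// `from_env` is a hypothetical constructor used for illustration):
    ///
    /// ```ignore
    /// let config = ServerConfig::from_env()?;
    /// let server = Server::start(config).await?;
    /// // connected, version-checked, library loaded, scanners running
    /// ```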
    pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
        // Step 1: Connect to Valkey via ClientBuilder
        tracing::info!(
            host = %config.host, port = config.port,
            tls = config.tls, cluster = config.cluster,
            "connecting to Valkey"
        );
        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder
            .build()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "connect".into() })?;

        // Verify connectivity
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "PING".into() })?;
        if pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "unexpected PING response: {pong}"
            )));
        }
        tracing::info!("Valkey connection established");

        // Step 1b: Verify Valkey version meets the RFC-011 §13 minimum (8.0).
        // Tolerates a rolling upgrade via a 60s exponential-backoff budget
        // per RFC-011 §9.17 — transient INFO errors during a node restart
        // don't trip the check until the whole budget is exhausted.
        verify_valkey_version(&client).await?;

        // Step 2: Validate or create partition config
        validate_or_create_partition_config(&client, &config.partition_config).await?;

        // Step 2b: Install waitpoint HMAC secret into every execution partition
        // (RFC-004 §Waitpoint Security). Fail-fast: if any partition fails,
        // the server refuses to start — a partial install would silently
        // reject signal deliveries on half the partitions.
        initialize_waitpoint_hmac_secret(
            &client,
            &config.partition_config,
            &config.waitpoint_hmac_secret,
        )
        .await?;

        // Step 3: Load Lua library (skippable for tests where fixture already loaded)
        if !config.skip_library_load {
            tracing::info!("loading flowfabric Lua library");
            ff_script::loader::ensure_library(&client)
                .await
                .map_err(ServerError::LibraryLoad)?;
        } else {
            tracing::info!("skipping library load (skip_library_load=true)");
        }

        // Step 4: Start engine with scanners
        // Build a fresh EngineConfig rather than cloning (EngineConfig doesn't derive Clone).
        let engine_cfg = ff_engine::EngineConfig {
            partition_config: config.partition_config,
            lanes: config.lanes.clone(),
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
            index_reconciler_interval: config.engine_config.index_reconciler_interval,
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
            budget_reset_interval: config.engine_config.budget_reset_interval,
            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
            unblock_interval: config.engine_config.unblock_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            // Push-based DAG promotion (Batch C item 6). Enabled by
            // default with the server's Valkey endpoint; the engine
            // spawns a dedicated RESP3 client for SUBSCRIBE.
            completion_listener: Some(ff_engine::CompletionListenerConfig {
                addresses: vec![(config.host.clone(), config.port)],
                tls: config.tls,
                cluster: config.cluster,
            }),
            flow_projector_interval: config.engine_config.flow_projector_interval,
            execution_deadline_interval: config.engine_config.execution_deadline_interval,
        };
        // Engine scanners keep running on the MAIN `client` mux — NOT on
        // `tail_client`. Scanner cadence is foreground-latency-coupled by
        // design (an incident that blocks all FCALLs should also visibly
        // block scanners), and keeping scanners off the tail client means a
        // long-poll tail can never starve lease-expiry, retention-trim,
        // etc. Do not change this without revisiting RFC-006 Impl Notes.
        let engine = Engine::start(engine_cfg, client.clone());

        // Dedicated tail client. Built AFTER library load + HMAC install
        // because those steps use the main client; tail client only runs
        // XREAD/XREAD BLOCK + HMGET on stream_meta, so it never needs the
        // library loaded — but we build it on the same host/port/TLS
        // options so network reachability is identical.
        tracing::info!("opening dedicated tail connection");
        let mut tail_builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            // `request_timeout` is ignored for XREAD BLOCK (ferriskey
            // auto-extends to `block_ms + 500ms` for blocking commands),
            // but is used for the companion HMGET — 5s matches main.
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            tail_builder = tail_builder.tls();
        }
        if config.cluster {
            tail_builder = tail_builder.cluster();
        }
        let tail_client = tail_builder
            .build()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "connect (tail)".into(),
            })?;
        let tail_pong: String = tail_client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "PING (tail)".into(),
            })?;
        if tail_pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "tail client unexpected PING response: {tail_pong}"
            )));
        }

        let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(
            config.max_concurrent_stream_ops as usize,
        ));
        let xread_block_lock = Arc::new(tokio::sync::Mutex::new(()));
        tracing::info!(
            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
            "stream-op client ready (read + tail share the semaphore; \
             tails additionally serialize via xread_block_lock)"
        );

        // Admin surface warning. /v1/admin/* endpoints (waitpoint HMAC
        // rotation, etc.) are only protected by the global Bearer
        // middleware in api.rs — which is only installed when
        // config.api_token is set. Without FF_API_TOKEN, a public
        // deployment exposes secret rotation to anyone that can reach
        // the listen_addr. Warn loudly so operators can't miss it; we
        // don't fail-start because single-tenant dev uses this path.
        if config.api_token.is_none() {
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
                 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
                 FF_API_TOKEN for any deployment reachable from untrusted \
                 networks."
            );
            // Explicit callout for the credential-bearing read endpoints.
            // Auditors grep for these on a per-endpoint basis; lumping
            // them into the admin warning alone hides the fact that
            // /pending-waitpoints returns HMAC tokens and /result
            // returns completion payload bytes.
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
                 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
                 and GET /v1/executions/{{id}}/result returns raw completion payload \
                 bytes (may contain PII). Both are UNAUTHENTICATED in this \
                 configuration."
            );
        }

        // Partition counts — post-RFC-011 there are three physical families.
        // Execution keys co-locate with their parent flow's partition (§2 of
        // the RFC), so `num_flow_partitions` governs both exec and flow
        // routing; no separate `num_execution_partitions` count exists.
        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            listen_addr = %config.listen_addr,
            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
            config.partition_config.num_flow_partitions,
            config.partition_config.num_budget_partitions,
            config.partition_config.num_quota_partitions,
        );

        Ok(Self {
            client,
            tail_client,
            stream_semaphore,
            xread_block_lock,
            // Single-permit semaphore: only one rotate-waitpoint-secret can
            // be mid-flight server-wide. Attackers or misbehaving scripts
            // firing parallel rotations fail fast with 429 instead of
            // queueing HTTP handlers.
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            engine,
            config,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
        })
    }

    /// Get a reference to the ferriskey client.
    pub fn client(&self) -> &Client {
        &self.client
    }

    /// Execute an FCALL with automatic Lua library reload on "function not loaded".
    ///
    /// After a Valkey failover the new primary may not have the Lua library
    /// loaded (replication lag or cold replica). This wrapper detects that
    /// condition, reloads the library via `ff_script::loader::ensure_library`,
    /// and retries the FCALL once.
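    ///
    /// The retry shape, sketched (the real logic lives in
    /// `fcall_with_reload_on_client`, not shown in this section;
    /// `do_fcall` and `is_function_not_loaded` are hypothetical names
    /// used only for illustration):
    ///
    /// ```ignore
    /// match do_fcall(client, function, keys, args).await {
    ///     Err(e) if is_function_not_loaded(&e) => {
    ///         ff_script::loader::ensure_library(client).await?;
    ///         do_fcall(client, function, keys, args).await // retry once
    ///     }
    ///     other => other,
    /// }
    /// ```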
    async fn fcall_with_reload(
        &self,
        function: &str,
        keys: &[&str],
        args: &[&str],
    ) -> Result<Value, ServerError> {
        fcall_with_reload_on_client(&self.client, function, keys, args).await
    }

    /// Get the server config.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }

    /// Get the partition config.
    pub fn partition_config(&self) -> &PartitionConfig {
        &self.config.partition_config
    }

    // ── Minimal Phase 1 API ──

    /// Create a new execution.
    ///
    /// Uses raw FCALL — will migrate to typed ff-script wrappers in Step 1.2.
    pub async fn create_execution(
        &self,
        args: &CreateExecutionArgs,
    ) -> Result<CreateExecutionResult, ServerError> {
        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
        let idx = IndexKeys::new(&partition);

        let lane = &args.lane_id;
        let tag = partition.hash_tag();
        let idem_key = match &args.idempotency_key {
            Some(k) if !k.is_empty() => {
                keys::idempotency_key(&tag, args.namespace.as_str(), k)
            }
            _ => ctx.noop(),
        };

        let delay_str = args
            .delay_until
            .map(|d| d.0.to_string())
            .unwrap_or_default();
        let is_delayed = !delay_str.is_empty();

        // KEYS (8) must match lua/execution.lua ff_create_execution positional order:
        //   [1] exec_core, [2] payload, [3] policy, [4] tags,
        //   [5] scheduling_zset (eligible OR delayed — ONE key),
        //   [6] idem_key, [7] execution_deadline, [8] all_executions
        let scheduling_zset = if is_delayed {
            idx.lane_delayed(lane)
        } else {
            idx.lane_eligible(lane)
        };

        let fcall_keys: Vec<String> = vec![
            ctx.core(),                  // 1
            ctx.payload(),               // 2
            ctx.policy(),                // 3
            ctx.tags(),                  // 4
            scheduling_zset,             // 5
            idem_key,                    // 6
            idx.execution_deadline(),    // 7
            idx.all_executions(),        // 8
        ];

        let tags_json = serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".to_owned());

        // ARGV (13) must match lua/execution.lua ff_create_execution positional order:
        //   [1] execution_id, [2] namespace, [3] lane_id, [4] execution_kind,
        //   [5] priority, [6] creator_identity, [7] policy_json,
        //   [8] input_payload, [9] delay_until, [10] dedup_ttl_ms,
        //   [11] tags_json, [12] execution_deadline_at, [13] partition_id
        let fcall_args: Vec<String> = vec![
            args.execution_id.to_string(),           // 1
            args.namespace.to_string(),              // 2
            args.lane_id.to_string(),                // 3
            args.execution_kind.clone(),             // 4
            args.priority.to_string(),               // 5
            args.creator_identity.clone(),           // 6
            args.policy.as_ref()
                .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_owned()))
                .unwrap_or_else(|| "{}".to_owned()), // 7
            String::from_utf8_lossy(&args.input_payload).into_owned(), // 8
            delay_str,                               // 9
            args.idempotency_key.as_ref()
                .map(|_| "86400000".to_string())
                .unwrap_or_default(),                // 10 dedup_ttl_ms
            tags_json,                               // 11
            args.execution_deadline_at
                .map(|d| d.to_string())
                .unwrap_or_default(),                // 12 execution_deadline_at
            args.partition_id.to_string(),           // 13
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_execution", &key_refs, &arg_refs)
            .await?;

        parse_create_result(&raw, &args.execution_id)
    }

    /// Cancel an execution.
    pub async fn cancel_execution(
        &self,
        args: &CancelExecutionArgs,
    ) -> Result<CancelExecutionResult, ServerError> {
        let raw = self
            .fcall_cancel_execution_with_reload(args)
            .await?;
        parse_cancel_result(&raw, &args.execution_id)
    }

    /// Build KEYS/ARGV for `ff_cancel_execution` and invoke via the server's
    /// reload-capable FCALL. Shared by the inline method and background
    /// cancel_flow dispatch via [`Self::fcall_cancel_execution_with_reload`].
    async fn fcall_cancel_execution_with_reload(
        &self,
        args: &CancelExecutionArgs,
    ) -> Result<Value, ServerError> {
        let (keys, argv) = build_cancel_execution_fcall(
            &self.client,
            &self.config.partition_config,
            args,
        )
        .await?;
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
        self.fcall_with_reload("ff_cancel_execution", &key_refs, &arg_refs).await
    }

    /// Get the public state of an execution.
    ///
    /// Reads `public_state` from the exec_core hash. Returns the parsed
    /// PublicState enum. If the execution is not found, returns an error.
    pub async fn get_execution_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<PublicState, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        let state_str: Option<String> = self
            .client
            .hget(&ctx.core(), "public_state")
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET public_state".into() })?;

        match state_str {
            Some(s) => {
                let quoted = format!("\"{s}\"");
                serde_json::from_str(&quoted).map_err(|e| {
                    ServerError::Script(format!(
                        "invalid public_state '{s}' for {execution_id}: {e}"
                    ))
                })
            }
            None => Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            ))),
        }
    }

    /// Read the raw result payload written by `ff_complete_execution`.
    ///
    /// The Lua side stores the payload at `ctx.result()` via plain `SET`.
    /// No FCALL — this is a direct GET; returns `Ok(None)` when the
    /// execution is missing, not yet complete, or (in a future
    /// retention-policy world) when the result was trimmed.
    ///
    /// # Contract vs `get_execution_state`
    ///
    /// `get_execution_state` is the authoritative completion signal. If
    /// a caller observes `state == completed` but `get_execution_result`
    /// returns `None`, the result bytes are unavailable — not a caller
    /// bug and not a server bug, just the retention policy trimming the
    /// blob. V1 sets no retention, so callers on v1 can treat
    /// `state == completed` + `Ok(None)` as a server bug.
    ///
    /// # Ordering
    ///
    /// Callers MUST wait for `state == completed` before calling this
    /// method; polls issued before the state transition may hit a
    /// narrow window where the completion Lua has written
    /// `public_state = completed` but the `result` key SET is still
    /// on-wire. The current Lua `ff_complete_execution` writes both in
    /// the same atomic script, so the window is effectively zero for
    /// direct callers — but retries via `ff_replay_execution` open it
    /// briefly.
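    ///
    /// Poll-then-fetch sketch honoring the ordering contract (the
    /// `PublicState::Completed` variant name is assumed from the
    /// `"completed"` wire value; the interval is illustrative):
    ///
    /// ```ignore
    /// while server.get_execution_state(&execution_id).await? != PublicState::Completed {
    ///     tokio::time::sleep(Duration::from_millis(100)).await;
    /// }
    /// let bytes = server.get_execution_result(&execution_id).await?;
    /// ```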
    pub async fn get_execution_result(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Option<Vec<u8>>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        // Binary-safe read. Decoding into `String` would reject any
        // non-UTF-8 payload (bincode-encoded f32 vectors, compressed
        // artifacts, arbitrary binary stages) with a ferriskey decode
        // error that surfaces as HTTP 500. The endpoint response is
        // already `application/octet-stream`; `Vec<u8>` has a
        // ferriskey-specialized `FromValue` impl that preserves raw
        // bytes without UTF-8 validation (see ferriskey/src/value.rs:
        // `from_byte_vec`). Nil → None via the blanket
        // `Option<T: FromValue>` wrapper.
        let payload: Option<Vec<u8>> = self
            .client
            .cmd("GET")
            .arg(ctx.result())
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "GET exec result".into(),
            })?;
        Ok(payload)
    }

    /// List the active (`pending` or `active`) waitpoints for an execution.
    ///
    /// Returns one [`PendingWaitpointInfo`] per open waitpoint, including the
    /// HMAC-SHA1 `waitpoint_token` needed to deliver authenticated signals.
    /// `closed` waitpoints are elided — callers looking at history should
    /// read the stream or lease history instead.
    ///
    /// Read plan: SSCAN `ctx.waitpoints()` with `COUNT 100` (bounded
    /// page size, matching the unblock / flow-projector / budget-
    /// reconciler convention) to enumerate waitpoint IDs, then TWO
    /// pipelines:
    ///
    /// * Pass 1 — single round-trip containing, per waitpoint, one
    ///   HMGET over the documented field set + one HGET for
    ///   `total_matchers` on the condition hash.
    /// * Pass 2 (conditional) — single round-trip containing an
    ///   HMGET per waitpoint with `total_matchers > 0` to read the
    ///   `matcher:N:name` fields.
    ///
    /// No FCALL — this is a read-only view built from already-
    /// persisted state, so skipping Lua keeps the Valkey single-
    /// writer path uncontended. HMGET (vs HGETALL) bounds the per-
    /// waitpoint read to the documented field set and defends
    /// against a poisoned waitpoint hash with unbounded extra fields
    /// accumulating response memory.
    ///
    /// # Empty result semantics (TOCTOU)
    ///
    /// An empty `Vec` is returned in three cases:
    ///
    /// 1. The execution exists but has never suspended.
    /// 2. All existing waitpoints are `closed` (already resolved).
    /// 3. A narrow teardown race: `SSCAN` read the waitpoint set after
    ///    a concurrent `ff_close_waitpoint` or execution-cleanup script
    ///    deleted the waitpoint hashes but before it SREM'd the set
    ///    members. Each HMGET returns all-None and we skip.
    ///
    /// Callers that get an unexpected empty list should cross-check
    /// execution state (`get_execution_state`) to distinguish "pipeline
    /// moved past suspended" from "nothing to review yet".
    ///
    /// A waitpoint hash that's present but missing its `waitpoint_token`
    /// field is similarly elided and a server-side WARN is emitted —
    /// this indicates storage corruption (a write that half-populated
    /// the hash) and operators should investigate.
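    ///
    /// Consumer sketch, including the cross-check recommended above for
    /// an unexpectedly empty result:
    ///
    /// ```ignore
    /// let wps = server.list_pending_waitpoints(&execution_id).await?;
    /// if wps.is_empty() {
    ///     // distinguish "moved past suspended" from "nothing to review yet"
    ///     let state = server.get_execution_state(&execution_id).await?;
    ///     tracing::debug!(?state, "no open waitpoints");
    /// }
    /// for wp in &wps {
    ///     // wp.waitpoint_token is the bearer credential for signal delivery
    ///     println!("{} awaits {:?}", wp.waitpoint_id, wp.required_signal_names);
    /// }
    /// ```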
    pub async fn list_pending_waitpoints(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Vec<PendingWaitpointInfo>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        let core_exists: bool = self
            .client
            .cmd("EXISTS")
            .arg(ctx.core())
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "EXISTS exec_core (pending waitpoints)".into(),
            })?;
        if !core_exists {
            return Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            )));
        }

        // SSCAN the waitpoints set in bounded pages. SMEMBERS would
        // load the entire set in one reply — fine for today's
        // single-waitpoint executions, but the codebase convention
        // (budget_reconciler, flow_projector, unblock scanner) is SSCAN
        // COUNT 100 so response size per round-trip is bounded as the
        // set grows. Match the precedent instead of carving a new
        // pattern here.
        const WAITPOINTS_SSCAN_COUNT: usize = 100;
        let waitpoints_key = ctx.waitpoints();
        let mut wp_ids_raw: Vec<String> = Vec::new();
        let mut cursor: String = "0".to_owned();
        loop {
            let reply: (String, Vec<String>) = self
                .client
                .cmd("SSCAN")
                .arg(&waitpoints_key)
                .arg(&cursor)
                .arg("COUNT")
                .arg(WAITPOINTS_SSCAN_COUNT.to_string().as_str())
                .execute()
                .await
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: "SSCAN waitpoints".into(),
                })?;
            cursor = reply.0;
            wp_ids_raw.extend(reply.1);
            if cursor == "0" {
                break;
            }
        }

        // SSCAN may observe a member more than once across iterations —
        // that is documented Valkey behavior, not a bug (see
        // https://valkey.io/commands/sscan/). Dedup in-place before
        // building the typed id list so the HTTP response and the
        // downstream pipelined HMGETs don't duplicate work. Sort +
        // dedup keeps this O(n log n) without a BTreeSet allocation;
        // typical n is 1 (the single waitpoint on a suspended exec).
        wp_ids_raw.sort_unstable();
        wp_ids_raw.dedup();

        if wp_ids_raw.is_empty() {
            return Ok(Vec::new());
        }

        // Parse + filter out malformed waitpoint_ids up-front so the
        // downstream pipeline indexes stay aligned to the Vec of
        // typed ids.
        let mut wp_ids: Vec<WaitpointId> = Vec::with_capacity(wp_ids_raw.len());
        for raw in &wp_ids_raw {
            match WaitpointId::parse(raw) {
                Ok(id) => wp_ids.push(id),
                Err(e) => tracing::warn!(
                    raw_id = %raw,
                    error = %e,
                    execution_id = %execution_id,
                    "list_pending_waitpoints: skipping unparseable waitpoint_id"
                ),
            }
        }
        if wp_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Bounded HMGET field set — these are the six hash fields that
        // surface in `PendingWaitpointInfo`. Fixed-size indexing into
        // the response below tracks this order.
        const WP_FIELDS: [&str; 6] = [
            "state",
            "waitpoint_key",
            "waitpoint_token",
            "created_at",
            "activated_at",
            "expires_at",
        ];

        // Pass 1 pipeline: waitpoint HMGET + condition HGET
        // total_matchers, one of each per waitpoint, in a SINGLE
        // round-trip. Sequential HMGETs were an N-round-trip latency
        // floor on flows with fan-out waitpoints.
        let mut pass1 = self.client.pipeline();
        let mut wp_slots = Vec::with_capacity(wp_ids.len());
        let mut cond_slots = Vec::with_capacity(wp_ids.len());
        for wp_id in &wp_ids {
            let mut cmd = pass1.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint(wp_id));
            for f in WP_FIELDS {
                cmd = cmd.arg(f);
            }
            wp_slots.push(cmd.finish());

            cond_slots.push(
                pass1
                    .cmd::<Option<String>>("HGET")
                    .arg(ctx.waitpoint_condition(wp_id))
                    .arg("total_matchers")
                    .finish(),
            );
        }
        pass1
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "pipeline HMGET waitpoints + HGET total_matchers".into(),
            })?;

        // Collect pass-1 results + queue pass-2 HMGETs for condition
        // matcher names on waitpoints that are actionable (state in
        // pending/active, non-empty token, total_matchers > 0). PipeSlot
        // values are owning, so iterate all three ordered zip'd iters
        // and consume them together.
        struct Kept {
            wp_id: WaitpointId,
            wp_fields: Vec<Option<String>>,
            total_matchers: usize,
        }
        let mut kept: Vec<Kept> = Vec::with_capacity(wp_ids.len());
        for ((wp_id, wp_slot), cond_slot) in wp_ids
            .iter()
            .zip(wp_slots)
            .zip(cond_slots)
        {
            let wp_fields: Vec<Option<String>> =
                wp_slot.value().map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("pipeline slot HMGET waitpoint {wp_id}"),
                })?;

            // Waitpoint hash may have been GC'd between the SSCAN and
            // this HMGET (rotation / cleanup race). Skip silently.
            if wp_fields.iter().all(Option::is_none) {
                // Still consume the cond_slot to free its buffer.
                let _ = cond_slot.value();
                continue;
            }
            let state_ref = wp_fields
                .first()
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if state_ref != "pending" && state_ref != "active" {
                let _ = cond_slot.value();
                continue;
            }
            let token_ref = wp_fields
                .get(2)
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if token_ref.is_empty() {
                let _ = cond_slot.value();
                tracing::warn!(
                    waitpoint_id = %wp_id,
                    execution_id = %execution_id,
                    waitpoint_hash_key = %ctx.waitpoint(wp_id),
                    state = %state_ref,
                    "list_pending_waitpoints: waitpoint hash present but waitpoint_token \
                     field is empty — likely storage corruption (half-populated write, \
                     operator edit, or interrupted script). Skipping this entry in the \
                     response. HGETALL the waitpoint_hash_key to inspect."
                );
                continue;
            }

            let total_matchers = cond_slot
                .value()
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("pipeline slot HGET total_matchers {wp_id}"),
                })?
                .and_then(|s| s.parse::<usize>().ok())
                .unwrap_or(0);

            kept.push(Kept {
                wp_id: wp_id.clone(),
                wp_fields,
                total_matchers,
            });
        }

        if kept.is_empty() {
            return Ok(Vec::new());
        }

        // Pass 2 pipeline: matcher-name HMGETs for every kept waitpoint
        // with total_matchers > 0. Single round-trip regardless of
        // per-waitpoint matcher count. Waitpoints with total_matchers==0
        // (wildcard) skip the HMGET entirely.
        let mut pass2 = self.client.pipeline();
        let mut matcher_slots: Vec<Option<_>> = Vec::with_capacity(kept.len());
        let mut pass2_needed = false;
        for k in &kept {
            if k.total_matchers == 0 {
                matcher_slots.push(None);
                continue;
            }
            pass2_needed = true;
            let mut cmd = pass2.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint_condition(&k.wp_id));
            for i in 0..k.total_matchers {
                cmd = cmd.arg(format!("matcher:{i}:name"));
            }
            matcher_slots.push(Some(cmd.finish()));
        }
        if pass2_needed {
            pass2.execute().await.map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "pipeline HMGET wp_condition matchers".into(),
            })?;
        }

        let parse_ts = |raw: &str| -> Option<TimestampMs> {
            if raw.is_empty() {
                None
            } else {
                raw.parse::<i64>().ok().map(TimestampMs)
            }
        };

        let mut out: Vec<PendingWaitpointInfo> = Vec::with_capacity(kept.len());
        for (k, slot) in kept.into_iter().zip(matcher_slots) {
            let get = |i: usize| -> &str {
                k.wp_fields.get(i).and_then(|v| v.as_deref()).unwrap_or("")
            };

            // Collect matcher names, eliding the empty-name wildcard
            // sentinel (see lua/helpers.lua initialize_condition).
            let required_signal_names: Vec<String> = match slot {
                None => Vec::new(),
                Some(s) => {
                    let vals: Vec<Option<String>> =
                        s.value().map_err(|e| ServerError::ValkeyContext {
                            source: e,
                            context: format!(
                                "pipeline slot HMGET wp_condition matchers {}",
                                k.wp_id
                            ),
                        })?;
                    vals.into_iter()
                        .flatten()
                        .filter(|name| !name.is_empty())
                        .collect()
                }
            };

            out.push(PendingWaitpointInfo {
                waitpoint_id: k.wp_id,
                waitpoint_key: get(1).to_owned(),
                state: get(0).to_owned(),
                waitpoint_token: WaitpointToken(get(2).to_owned()),
                required_signal_names,
                created_at: parse_ts(get(3)).unwrap_or(TimestampMs(0)),
                activated_at: parse_ts(get(4)),
                expires_at: parse_ts(get(5)),
            });
        }

        Ok(out)
    }

    // ── Budget / Quota API ──

    /// Create a new budget policy.
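    ///
    /// Hedged construction sketch: field names mirror the ARGV assembly
    /// below, but the string values and numeric types are illustrative
    /// assumptions, not a schema:
    ///
    /// ```ignore
    /// let args = CreateBudgetArgs {
    ///     budget_id,
    ///     scope_type: "namespace".into(),
    ///     scope_id: "prod".into(),
    ///     enforcement_mode: "hard".into(),
    ///     on_hard_limit: "reject".into(),
    ///     on_soft_limit: "warn".into(),
    ///     reset_interval_ms: 86_400_000, // daily reset
    ///     now: TimestampMs::now(),
    ///     dimensions: vec!["tokens".into()], // parallel to the two limit vecs
    ///     hard_limits: vec![1_000_000],
    ///     soft_limits: vec![800_000],
    /// };
    /// let created = server.create_budget(&args).await?;
    /// ```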
    pub async fn create_budget(
        &self,
        args: &CreateBudgetArgs,
    ) -> Result<CreateBudgetResult, ServerError> {
        let partition = budget_partition(&args.budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, &args.budget_id);
        let resets_key = keys::budget_resets_key(bctx.hash_tag());
        let policies_index = keys::budget_policies_index(bctx.hash_tag());

        // KEYS (5): budget_def, budget_limits, budget_usage, budget_resets_zset,
        //           budget_policies_index
        let fcall_keys: Vec<String> = vec![
            bctx.definition(),
            bctx.limits(),
            bctx.usage(),
            resets_key,
            policies_index,
        ];

        // ARGV (variable): budget_id, scope_type, scope_id, enforcement_mode,
        //   on_hard_limit, on_soft_limit, reset_interval_ms, now_ms,
        //   dimension_count, dim_1..dim_N, hard_1..hard_N, soft_1..soft_N
        let dim_count = args.dimensions.len();
        let mut fcall_args: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
        fcall_args.push(args.budget_id.to_string());
        fcall_args.push(args.scope_type.clone());
        fcall_args.push(args.scope_id.clone());
        fcall_args.push(args.enforcement_mode.clone());
        fcall_args.push(args.on_hard_limit.clone());
        fcall_args.push(args.on_soft_limit.clone());
        fcall_args.push(args.reset_interval_ms.to_string());
        fcall_args.push(args.now.to_string());
        fcall_args.push(dim_count.to_string());
        for dim in &args.dimensions {
            fcall_args.push(dim.clone());
        }
        for hard in &args.hard_limits {
            fcall_args.push(hard.to_string());
        }
        for soft in &args.soft_limits {
            fcall_args.push(soft.to_string());
        }

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_budget", &key_refs, &arg_refs)
            .await?;

        parse_budget_create_result(&raw, &args.budget_id)
    }

    /// Create a new quota/rate-limit policy.
    pub async fn create_quota_policy(
        &self,
        args: &CreateQuotaPolicyArgs,
    ) -> Result<CreateQuotaPolicyResult, ServerError> {
        let partition = quota_partition(&args.quota_policy_id, &self.config.partition_config);
        let qctx = QuotaKeyContext::new(&partition, &args.quota_policy_id);

        // KEYS (5): quota_def, quota_window_zset, quota_concurrency_counter,
        //           admitted_set, quota_policies_index
        let fcall_keys: Vec<String> = vec![
            qctx.definition(),
            qctx.window("requests_per_window"),
            qctx.concurrency(),
            qctx.admitted_set(),
            keys::quota_policies_index(qctx.hash_tag()),
        ];

        // ARGV (5): quota_policy_id, window_seconds, max_requests_per_window,
        //           max_concurrent, now_ms
        let fcall_args: Vec<String> = vec![
            args.quota_policy_id.to_string(),
            args.window_seconds.to_string(),
            args.max_requests_per_window.to_string(),
            args.max_concurrent.to_string(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_quota_policy", &key_refs, &arg_refs)
            .await?;

        parse_quota_create_result(&raw, &args.quota_policy_id)
    }

    /// Read-only budget status for operator visibility.
    pub async fn get_budget_status(
        &self,
        budget_id: &BudgetId,
    ) -> Result<BudgetStatus, ServerError> {
        let partition = budget_partition(budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);

        // Read budget definition
        let def: HashMap<String, String> = self
            .client
            .hgetall(&bctx.definition())
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_def".into() })?;

        if def.is_empty() {
            return Err(ServerError::NotFound(format!(
                "budget not found: {budget_id}"
            )));
        }

        // Read usage
        let usage_raw: HashMap<String, String> = self
            .client
            .hgetall(&bctx.usage())
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_usage".into() })?;
        let usage: HashMap<String, u64> = usage_raw
            .into_iter()
            .filter(|(k, _)| k != "_init")
            .map(|(k, v)| (k, v.parse().unwrap_or(0)))
            .collect();

        // Read limits
        let limits_raw: HashMap<String, String> = self
            .client
            .hgetall(&bctx.limits())
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_limits".into() })?;
        let mut hard_limits = HashMap::new();
        let mut soft_limits = HashMap::new();
        for (k, v) in &limits_raw {
            if let Some(dim) = k.strip_prefix("hard:") {
                hard_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
            } else if let Some(dim) = k.strip_prefix("soft:") {
                soft_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
            }
        }

        let non_empty = |s: Option<&String>| -> Option<String> {
            s.filter(|v| !v.is_empty()).cloned()
        };

        Ok(BudgetStatus {
            budget_id: budget_id.to_string(),
            scope_type: def.get("scope_type").cloned().unwrap_or_default(),
            scope_id: def.get("scope_id").cloned().unwrap_or_default(),
            enforcement_mode: def.get("enforcement_mode").cloned().unwrap_or_default(),
            usage,
            hard_limits,
            soft_limits,
            breach_count: def
                .get("breach_count")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            soft_breach_count: def
                .get("soft_breach_count")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            last_breach_at: non_empty(def.get("last_breach_at")),
            last_breach_dim: non_empty(def.get("last_breach_dim")),
            next_reset_at: non_empty(def.get("next_reset_at")),
            created_at: non_empty(def.get("created_at")),
        })
    }

    /// Report usage against a budget and check limits.
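    ///
    /// Usage sketch (field names taken from the ARGV assembly below;
    /// the dedup key demonstrates at-most-once accounting on retries):
    ///
    /// ```ignore
    /// let args = ReportUsageArgs {
    ///     dimensions: vec!["tokens".into()],
    ///     deltas: vec![1_500],
    ///     now: TimestampMs::now(),
    ///     dedup_key: Some("req-abc123".into()), // safe to re-send on retry
    /// };
    /// let result = server.report_usage(&budget_id, &args).await?;
    /// ```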
    pub async fn report_usage(
        &self,
        budget_id: &BudgetId,
        args: &ReportUsageArgs,
    ) -> Result<ReportUsageResult, ServerError> {
        let partition = budget_partition(budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);

        // KEYS (3): budget_usage, budget_limits, budget_def
        let fcall_keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];

        // ARGV: dim_count, dim_1..dim_N, delta_1..delta_N, now_ms, [dedup_key]
        let dim_count = args.dimensions.len();
        let mut fcall_args: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
        fcall_args.push(dim_count.to_string());
        for dim in &args.dimensions {
            fcall_args.push(dim.clone());
        }
        for delta in &args.deltas {
            fcall_args.push(delta.to_string());
        }
        fcall_args.push(args.now.to_string());
        let dedup_key_val = args
            .dedup_key
            .as_ref()
            .filter(|k| !k.is_empty())
            .map(|k| usage_dedup_key(bctx.hash_tag(), k))
            .unwrap_or_default();
        fcall_args.push(dedup_key_val);

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_report_usage_and_check", &key_refs, &arg_refs)
            .await?;

        parse_report_usage_result(&raw)
    }

    /// Reset a budget's usage counters and schedule the next reset.
    pub async fn reset_budget(
        &self,
        budget_id: &BudgetId,
    ) -> Result<ResetBudgetResult, ServerError> {
        let partition = budget_partition(budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);
        let resets_key = keys::budget_resets_key(bctx.hash_tag());

        // KEYS (3): budget_def, budget_usage, budget_resets_zset
        let fcall_keys: Vec<String> = vec![bctx.definition(), bctx.usage(), resets_key];

        // ARGV (2): budget_id, now_ms
        let now = TimestampMs::now();
        let fcall_args: Vec<String> = vec![budget_id.to_string(), now.to_string()];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_reset_budget", &key_refs, &arg_refs)
            .await?;

        parse_reset_budget_result(&raw)
    }

    // ── Flow API ──

    /// Create a new flow container.
    pub async fn create_flow(
        &self,
        args: &CreateFlowArgs,
    ) -> Result<CreateFlowResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
        let fidx = FlowIndexKeys::new(&partition);

        // KEYS (3): flow_core, members_set, flow_index
        let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];

        // ARGV (4): flow_id, flow_kind, namespace, now_ms
        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.flow_kind.clone(),
            args.namespace.to_string(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_flow", &key_refs, &arg_refs)
            .await?;

        parse_create_flow_result(&raw, &args.flow_id)
    }

    /// Add an execution to a flow.
    ///
    /// # Atomic single-FCALL commit (RFC-011 §7.3)
    ///
    /// Post-RFC-011 phase-3, exec_core co-locates with flow_core under
    /// hash-tag routing (both hash to `{fp:N}` via the exec id's
    /// embedded partition). A single atomic FCALL writes:
    ///
    ///   - `members_set` SADD (flow membership)
    ///   - `exec_core.flow_id` HSET (back-pointer)
    ///   - `flow_index` SADD (self-heal)
    ///   - `flow_core` HINCRBY node_count / graph_revision +
    ///     HSET last_mutation_at
    ///
    /// All four writes commit atomically or none do (Valkey scripting
    /// contract: validates-before-writing in the Lua body means
    /// `flow_not_found` / `flow_already_terminal` early-returns fire
    /// BEFORE any `redis.call()` mutation, and a mid-body error after
    /// writes is not expected because all writes are on the same slot).
    ///
    /// The pre-RFC-011 two-phase contract (membership FCALL on `{fp:N}`
    /// plus a separate HSET on `{p:N}`), its orphan window, and the
    /// reconciliation-scanner plan (issue #21, now superseded) are all
    /// retired.
1291    ///
1292    /// # Consumer contract
1293    ///
1294    /// The caller's `args.execution_id` **must** be co-located with
1295    /// `args.flow_id`'s partition — i.e. minted via
1296    /// `ExecutionId::for_flow(&args.flow_id, config)`. Passing a
1297    /// `solo`-minted id (or any exec id hashing to a different
1298    /// `{fp:N}` than the flow's) will fail at the Valkey level with
1299    /// `CROSSSLOT` on a clustered deploy.
1300    ///
1301    /// Callers with a flow context in scope always use `for_flow`;
1302    /// this is the only supported mint path for flow-member execs
1303    /// post-RFC-011. Test fixtures that pre-date the co-location
1304    /// contract use `TestCluster::new_execution_id_on_partition` to
1305    /// pin to a specific hash-tag index for `fcall_create_flow`-style
1306    /// helpers that hard-code their flow partition.
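    ///
    /// A hedged mint-then-add sketch (field names are taken from this
    /// method's body and the struct may carry more fields than shown;
    /// `server`, `config`, and `flow_id` are assumed bindings):
    ///
    /// ```ignore
    /// // Mint on the flow's partition so exec and flow share {fp:N}.
    /// let execution_id = ExecutionId::for_flow(&flow_id, config);
    /// let args = AddExecutionToFlowArgs {
    ///     flow_id,
    ///     execution_id,
    ///     now: TimestampMs::now(),
    /// };
    /// server.add_execution_to_flow(&args).await?;
    /// ```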
1307    pub async fn add_execution_to_flow(
1308        &self,
1309        args: &AddExecutionToFlowArgs,
1310    ) -> Result<AddExecutionToFlowResult, ServerError> {
1311        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1312        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1313        let fidx = FlowIndexKeys::new(&partition);
1314
1315        // exec_core co-locates with flow_core under RFC-011 §7.3 —
1316        // same `{fp:N}` hash-tag, same slot, part of the same atomic
1317        // FCALL below.
1318        let exec_partition =
1319            execution_partition(&args.execution_id, &self.config.partition_config);
1320        let ectx = ExecKeyContext::new(&exec_partition, &args.execution_id);
1321
1322        // Pre-flight: exec_partition must match flow_partition under
1323        // RFC-011 §7.3 co-location contract. If the caller hands us a
1324        // `solo`-minted exec whose hash-tag ≠ flow_partition, the FCALL
1325        // would fail with raw `CROSSSLOT` on a clustered deploy — a
1326        // typed `ServerError::PartitionMismatch` is a clearer signal
1327        // that the consumer-contract invariant was violated at mint
1328        // time (caller should have used `ExecutionId::for_flow(...)`).
1329        // See the Consumer contract section of this method's rustdoc.
1330        if exec_partition.index != partition.index {
1331            return Err(ServerError::PartitionMismatch(format!(
1332                "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
1333                 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
1334                 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
1335                exec_p = exec_partition.index,
1336                flow_p = partition.index,
1337            )));
1338        }
1339
1340        // KEYS (4): flow_core, members_set, flow_index, exec_core
1341        let fcall_keys: Vec<String> = vec![
1342            fctx.core(),
1343            fctx.members(),
1344            fidx.flow_index(),
1345            ectx.core(),
1346        ];
1347
1348        // ARGV (3): flow_id, execution_id, now_ms
1349        let fcall_args: Vec<String> = vec![
1350            args.flow_id.to_string(),
1351            args.execution_id.to_string(),
1352            args.now.to_string(),
1353        ];
1354
1355        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1356        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1357
1358        let raw: Value = self
1359            .fcall_with_reload("ff_add_execution_to_flow", &key_refs, &arg_refs)
1360            .await?;
1361
1362        parse_add_execution_to_flow_result(&raw)
1363    }
1364
1365    /// Cancel a flow.
1366    ///
1367    /// Flips `public_flow_state` to `cancelled` atomically via
1368    /// `ff_cancel_flow` on `{fp:N}`. For `cancel_all` policy, member
1369    /// executions must be cancelled cross-partition; this dispatch runs in
1370    /// the background and the call returns [`CancelFlowResult::CancellationScheduled`]
1371    /// immediately. For all other policies (or flows with no members), or
1372    /// when the flow was already in a terminal state (idempotent retry),
1373    /// the call returns [`CancelFlowResult::Cancelled`].
1374    ///
1375    /// Clients that need synchronous completion can call [`Self::cancel_flow_wait`].
1376    ///
1377    /// # Backpressure
1378    ///
1379    /// Each call that hits the async dispatch path spawns a new task into
1380    /// the shared background `JoinSet`. Rapid repeated calls against the
1381    /// same flow will spawn *multiple* overlapping dispatch tasks. This is
1382    /// not a correctness issue — each member cancel is idempotent and
1383    /// terminal flows short-circuit via [`ParsedCancelFlow::AlreadyTerminal`]
1384    /// — but heavy burst callers should either use `?wait=true` (serialises
1385    /// the dispatch on the HTTP thread, giving natural backpressure) or
1386    /// implement client-side deduplication on `flow_id`. The `JoinSet` is
1387    /// drained with a 15s timeout on [`Self::shutdown`], so very long
1388    /// dispatch tails may be aborted during graceful shutdown.
1389    ///
1390    /// # Orphan-member semantics on shutdown abort
1391    ///
1392    /// If shutdown fires `JoinSet::abort_all()` after its drain timeout
1393    /// while a dispatch loop is mid-iteration, the already-issued
1394    /// `ff_cancel_execution` FCALLs (atomic Lua) complete cleanly with
1395    /// `terminal_outcome = cancelled` and the caller-supplied reason. The
1396    /// members not yet visited are abandoned mid-loop. They remain in
1397    /// whichever state they were in (active/eligible/suspended) until the
1398    /// natural lifecycle scanners reach them: active leases expire
1399    /// (`lease_expiry`) and attempt-timeout them to `expired`, suspended
1400    /// members time out to `skipped`, eligible ones sit until retention
1401    /// trim. So no orphan state — but the terminal_outcome for the
1402    /// abandoned members will be `expired`/`skipped` rather than
1403    /// `cancelled`, and the operator-supplied `reason` is lost for them.
1404    /// Audit tooling that requires reason fidelity across shutdowns should
1405    /// use `?wait=true`.
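    ///
    /// A hedged sketch contrasting the two entry points (`server` and
    /// `args` are assumed bindings):
    ///
    /// ```ignore
    /// // Fire-and-forget: member dispatch runs on the background JoinSet.
    /// let scheduled = server.cancel_flow(&args).await?;
    /// // Inline: every member is terminal before this returns.
    /// let done = server.cancel_flow_wait(&args).await?;
    /// ```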
1406    pub async fn cancel_flow(
1407        &self,
1408        args: &CancelFlowArgs,
1409    ) -> Result<CancelFlowResult, ServerError> {
1410        self.cancel_flow_inner(args, false).await
1411    }
1412
1413    /// Cancel a flow and wait for all member cancellations to complete
1414    /// inline. Slower than [`Self::cancel_flow`] for large flows, but
1415    /// guarantees every member is in a terminal state on return.
1416    pub async fn cancel_flow_wait(
1417        &self,
1418        args: &CancelFlowArgs,
1419    ) -> Result<CancelFlowResult, ServerError> {
1420        self.cancel_flow_inner(args, true).await
1421    }
1422
1423    async fn cancel_flow_inner(
1424        &self,
1425        args: &CancelFlowArgs,
1426        wait: bool,
1427    ) -> Result<CancelFlowResult, ServerError> {
1428        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1429        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1430        let fidx = FlowIndexKeys::new(&partition);
1431
1432        // KEYS (3): flow_core, members_set, flow_index
1433        let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];
1434
1435        // ARGV (4): flow_id, reason, cancellation_policy, now_ms
1436        let fcall_args: Vec<String> = vec![
1437            args.flow_id.to_string(),
1438            args.reason.clone(),
1439            args.cancellation_policy.clone(),
1440            args.now.to_string(),
1441        ];
1442
1443        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1444        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1445
1446        let raw: Value = self
1447            .fcall_with_reload("ff_cancel_flow", &key_refs, &arg_refs)
1448            .await?;
1449
1450        let (policy, members) = match parse_cancel_flow_raw(&raw)? {
1451            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
1452                (policy, member_execution_ids)
1453            }
1454            // Idempotent retry: flow was already cancelled/completed/failed.
1455            // Return Cancelled with the *stored* policy and member list so
1456            // observability tooling gets the real historical state rather
1457            // than echoing the caller's retry intent. One HMGET + SMEMBERS
1458            // on the idempotent path — both on {fp:N}, same slot.
1459            ParsedCancelFlow::AlreadyTerminal => {
1460                let flow_meta: Vec<Option<String>> = self
1461                    .client
1462                    .cmd("HMGET")
1463                    .arg(fctx.core())
1464                    .arg("cancellation_policy")
1465                    .arg("cancel_reason")
1466                    .execute()
1467                    .await
1468                    .map_err(|e| ServerError::ValkeyContext {
1469                        source: e,
1470                        context: "HMGET flow_core cancellation_policy,cancel_reason".into(),
1471                    })?;
1472                let stored_policy = flow_meta
1473                    .first()
1474                    .and_then(|v| v.as_ref())
1475                    .filter(|s| !s.is_empty())
1476                    .cloned();
1477                let stored_reason = flow_meta
1478                    .get(1)
1479                    .and_then(|v| v.as_ref())
1480                    .filter(|s| !s.is_empty())
1481                    .cloned();
1482                let all_members: Vec<String> = self
1483                    .client
1484                    .cmd("SMEMBERS")
1485                    .arg(fctx.members())
1486                    .execute()
1487                    .await
1488                    .map_err(|e| ServerError::ValkeyContext {
1489                        source: e,
1490                        context: "SMEMBERS flow members (already terminal)".into(),
1491                    })?;
1492                // Cap the returned list to avoid pathological bandwidth on
1493                // idempotent retries for flows with 10k+ members. Clients
1494                // already received the authoritative member list on the
1495                // first (non-idempotent) call; subsequent retries just need
1496                // enough to confirm the operation and trigger per-member
1497                // polling if desired.
1498                let total_members = all_members.len();
1499                let stored_members: Vec<String> = all_members
1500                    .into_iter()
1501                    .take(ALREADY_TERMINAL_MEMBER_CAP)
1502                    .collect();
1503                tracing::debug!(
1504                    flow_id = %args.flow_id,
1505                    stored_policy = stored_policy.as_deref().unwrap_or(""),
1506                    stored_reason = stored_reason.as_deref().unwrap_or(""),
1507                    total_members,
1508                    returned_members = stored_members.len(),
1509                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
1510                );
1511                return Ok(CancelFlowResult::Cancelled {
1512                    // Fall back to caller's policy only if the stored field
1513                    // is missing (flows cancelled by older Lua that did not
1514                    // persist cancellation_policy).
1515                    cancellation_policy: stored_policy
1516                        .unwrap_or_else(|| args.cancellation_policy.clone()),
1517                    member_execution_ids: stored_members,
1518                });
1519            }
1520        };
1521        let needs_dispatch = policy == "cancel_all" && !members.is_empty();
1522
1523        if !needs_dispatch {
1524            return Ok(CancelFlowResult::Cancelled {
1525                cancellation_policy: policy,
1526                member_execution_ids: members,
1527            });
1528        }
1529
1530        if wait {
1531            // Synchronous dispatch — cancel every member inline before returning.
1532            for eid_str in &members {
1533                if let Err(e) = cancel_member_execution(
1534                    &self.client,
1535                    &self.config.partition_config,
1536                    eid_str,
1537                    &args.reason,
1538                    args.now,
1539                )
1540                .await
1541                {
1542                    tracing::warn!(
1543                        execution_id = %eid_str,
1544                        error = %e,
1545                        "cancel_flow(wait): individual execution cancel failed (may be terminal)"
1546                    );
1547                }
1548            }
1549            return Ok(CancelFlowResult::Cancelled {
1550                cancellation_policy: policy,
1551                member_execution_ids: members,
1552            });
1553        }
1554
1555        // Asynchronous dispatch — spawn into the shared JoinSet so Server::shutdown
1556        // can wait for pending cancellations (bounded by a shutdown timeout).
1557        let client = self.client.clone();
1558        let partition_config = self.config.partition_config;
1559        let reason = args.reason.clone();
1560        let now = args.now;
1561        let dispatch_members = members.clone();
1562        let flow_id = args.flow_id.clone();
1563        // Every async cancel_flow contends on this lock, but the critical
1564        // section is tiny: try_join_next drain + spawn. Drain is amortized
1565        // O(1) — each completed task is reaped exactly once across all
1566        // callers, and spawn is synchronous. At realistic cancel rates the
1567        // lock hold time is microseconds and does not bottleneck handlers.
1568        let mut guard = self.background_tasks.lock().await;
1569
1570        // Reap completed background dispatches before spawning the next.
1571        // Without this sweep, JoinSet accumulates Ok(()) results for every
1572        // async cancel ever issued — a memory leak in long-running servers
1573        // that would otherwise only drain on Server::shutdown. Surface any
1574        // panicked/aborted dispatches via tracing so silent failures in
1575        // cancel_member_execution are visible in logs.
1576        while let Some(joined) = guard.try_join_next() {
1577            if let Err(e) = joined {
1578                tracing::warn!(
1579                    error = %e,
1580                    "cancel_flow: background dispatch task panicked or was aborted"
1581                );
1582            }
1583        }
1584
1585        guard.spawn(async move {
1586            // Bounded parallel dispatch via futures::stream::buffer_unordered.
            // Sequential cancel at ~2ms/FCALL is ~2s per 1000 members, so
            // a 10k-member flow would not drain inside the 15s shutdown
            // abort window. Bounding at CONCURRENCY keeps Valkey load
1590            // predictable while still cutting wall-clock dispatch time by
1591            // ~CONCURRENCY× for large member sets.
1592            use futures::stream::StreamExt;
1593            const CONCURRENCY: usize = 16;
1594
1595            let member_count = dispatch_members.len();
1596            let flow_id_for_log = flow_id.clone();
1597            futures::stream::iter(dispatch_members)
1598                .map(|eid_str| {
1599                    let client = client.clone();
1600                    let reason = reason.clone();
1601                    let flow_id = flow_id.clone();
1602                    async move {
1603                        if let Err(e) = cancel_member_execution(
1604                            &client,
1605                            &partition_config,
1606                            &eid_str,
1607                            &reason,
1608                            now,
1609                        )
1610                        .await
1611                        {
1612                            tracing::warn!(
1613                                flow_id = %flow_id,
1614                                execution_id = %eid_str,
1615                                error = %e,
1616                                "cancel_flow(async): individual execution cancel failed (may be terminal)"
1617                            );
1618                        }
1619                    }
1620                })
1621                .buffer_unordered(CONCURRENCY)
1622                .for_each(|()| async {})
1623                .await;
1624
1625            tracing::debug!(
1626                flow_id = %flow_id_for_log,
1627                member_count,
1628                concurrency = CONCURRENCY,
1629                "cancel_flow: background member dispatch complete"
1630            );
1631        });
1632        drop(guard);
1633
1634        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
1635        Ok(CancelFlowResult::CancellationScheduled {
1636            cancellation_policy: policy,
1637            member_count,
1638            member_execution_ids: members,
1639        })
1640    }
1641
1642    /// Stage a dependency edge between two executions in a flow.
1643    ///
1644    /// Runs on the flow partition {fp:N}.
1645    /// KEYS (6), ARGV (8) — matches lua/flow.lua ff_stage_dependency_edge.
1646    pub async fn stage_dependency_edge(
1647        &self,
1648        args: &StageDependencyEdgeArgs,
1649    ) -> Result<StageDependencyEdgeResult, ServerError> {
1650        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1651        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1652
1653        // KEYS (6): flow_core, members_set, edge_hash, out_adj_set, in_adj_set, grant_hash
1654        let fcall_keys: Vec<String> = vec![
1655            fctx.core(),
1656            fctx.members(),
1657            fctx.edge(&args.edge_id),
1658            fctx.outgoing(&args.upstream_execution_id),
1659            fctx.incoming(&args.downstream_execution_id),
1660            fctx.grant(&args.edge_id.to_string()),
1661        ];
1662
1663        // ARGV (8): flow_id, edge_id, upstream_eid, downstream_eid,
1664        //           dependency_kind, data_passing_ref, expected_graph_revision, now_ms
1665        let fcall_args: Vec<String> = vec![
1666            args.flow_id.to_string(),
1667            args.edge_id.to_string(),
1668            args.upstream_execution_id.to_string(),
1669            args.downstream_execution_id.to_string(),
1670            args.dependency_kind.clone(),
1671            args.data_passing_ref.clone().unwrap_or_default(),
1672            args.expected_graph_revision.to_string(),
1673            args.now.to_string(),
1674        ];
1675
1676        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1677        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1678
1679        let raw: Value = self
1680            .fcall_with_reload("ff_stage_dependency_edge", &key_refs, &arg_refs)
1681            .await?;
1682
1683        parse_stage_dependency_edge_result(&raw)
1684    }
1685
1686    /// Apply a staged dependency edge to the child execution.
1687    ///
1688    /// Runs on the child execution partition {p:N}.
1689    /// KEYS (7), ARGV (7) — matches lua/flow.lua ff_apply_dependency_to_child.
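    ///
    /// Together with [`Self::stage_dependency_edge`] this forms the
    /// two-step edge protocol: stage on the flow's partition, then apply
    /// on the child's. A hedged sketch (both args structs are assumed to
    /// be built by the caller):
    ///
    /// ```ignore
    /// server.stage_dependency_edge(&stage_args).await?;     // on {fp:N}
    /// server.apply_dependency_to_child(&apply_args).await?; // on {p:N}
    /// ```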
1690    pub async fn apply_dependency_to_child(
1691        &self,
1692        args: &ApplyDependencyToChildArgs,
1693    ) -> Result<ApplyDependencyToChildResult, ServerError> {
1694        let partition = execution_partition(
1695            &args.downstream_execution_id,
1696            &self.config.partition_config,
1697        );
1698        let ctx = ExecKeyContext::new(&partition, &args.downstream_execution_id);
1699        let idx = IndexKeys::new(&partition);
1700
1701        // Pre-read lane_id for index keys
1702        let lane_str: Option<String> = self
1703            .client
1704            .hget(&ctx.core(), "lane_id")
1705            .await
1706            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1707        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1708
1709        // KEYS (7): exec_core, deps_meta, unresolved_set, dep_hash,
1710        //           eligible_zset, blocked_deps_zset, deps_all_edges
1711        let fcall_keys: Vec<String> = vec![
1712            ctx.core(),
1713            ctx.deps_meta(),
1714            ctx.deps_unresolved(),
1715            ctx.dep_edge(&args.edge_id),
1716            idx.lane_eligible(&lane),
1717            idx.lane_blocked_dependencies(&lane),
1718            ctx.deps_all_edges(),
1719        ];
1720
1721        // ARGV (7): flow_id, edge_id, upstream_eid, graph_revision,
1722        //           dependency_kind, data_passing_ref, now_ms
1723        let fcall_args: Vec<String> = vec![
1724            args.flow_id.to_string(),
1725            args.edge_id.to_string(),
1726            args.upstream_execution_id.to_string(),
1727            args.graph_revision.to_string(),
1728            args.dependency_kind.clone(),
1729            args.data_passing_ref.clone().unwrap_or_default(),
1730            args.now.to_string(),
1731        ];
1732
1733        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1734        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1735
1736        let raw: Value = self
1737            .fcall_with_reload("ff_apply_dependency_to_child", &key_refs, &arg_refs)
1738            .await?;
1739
1740        parse_apply_dependency_result(&raw)
1741    }
1742
1743    // ── Execution operations API ──
1744
1745    /// Deliver a signal to a suspended (or pending-waitpoint) execution.
1746    ///
    /// Pre-reads exec_core for the lane id; the waitpoint/suspension KEYS
    /// are derived from the args.
    /// KEYS (14), ARGV (18) — matches lua/signal.lua ff_deliver_signal.
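    ///
    /// Optional knobs fall back server-side when `None`: `dedup_ttl_ms`
    /// to 24h, `resume_delay_ms` to 0, `signal_maxlen` to 1000, and
    /// `max_signals_per_execution` to 10_000 (see the ARGV build below).
    /// A hedged call sketch (`server` and `args` are assumed bindings):
    ///
    /// ```ignore
    /// let result = server.deliver_signal(&args).await?;
    /// ```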
1749    pub async fn deliver_signal(
1750        &self,
1751        args: &DeliverSignalArgs,
1752    ) -> Result<DeliverSignalResult, ServerError> {
1753        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
1754        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
1755        let idx = IndexKeys::new(&partition);
1756
1757        // Pre-read lane_id for index keys
1758        let lane_str: Option<String> = self
1759            .client
1760            .hget(&ctx.core(), "lane_id")
1761            .await
1762            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1763        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1764
1765        let wp_id = &args.waitpoint_id;
1766        let sig_id = &args.signal_id;
1767        let idem_key = args
1768            .idempotency_key
1769            .as_ref()
1770            .filter(|k| !k.is_empty())
1771            .map(|k| ctx.signal_dedup(wp_id, k))
1772            .unwrap_or_else(|| ctx.noop());
1773
1774        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
1775        //            exec_signals_zset, signal_hash, signal_payload,
1776        //            idem_key, waitpoint_hash, suspension_current,
1777        //            eligible_zset, suspended_zset, delayed_zset,
1778        //            suspension_timeout_zset, hmac_secrets
1779        let fcall_keys: Vec<String> = vec![
1780            ctx.core(),                       // 1
1781            ctx.waitpoint_condition(wp_id),    // 2
1782            ctx.waitpoint_signals(wp_id),      // 3
1783            ctx.exec_signals(),                // 4
1784            ctx.signal(sig_id),                // 5
1785            ctx.signal_payload(sig_id),        // 6
1786            idem_key,                          // 7
1787            ctx.waitpoint(wp_id),              // 8
1788            ctx.suspension_current(),          // 9
1789            idx.lane_eligible(&lane),          // 10
1790            idx.lane_suspended(&lane),         // 11
1791            idx.lane_delayed(&lane),           // 12
1792            idx.suspension_timeout(),          // 13
1793            idx.waitpoint_hmac_secrets(),      // 14
1794        ];
1795
1796        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
1797        //            signal_category, source_type, source_identity,
1798        //            payload, payload_encoding, idempotency_key,
1799        //            correlation_id, target_scope, created_at,
1800        //            dedup_ttl_ms, resume_delay_ms, signal_maxlen,
1801        //            max_signals_per_execution, waitpoint_token
1802        let fcall_args: Vec<String> = vec![
1803            args.signal_id.to_string(),                          // 1
1804            args.execution_id.to_string(),                       // 2
1805            args.waitpoint_id.to_string(),                       // 3
1806            args.signal_name.clone(),                            // 4
1807            args.signal_category.clone(),                        // 5
1808            args.source_type.clone(),                            // 6
1809            args.source_identity.clone(),                        // 7
1810            args.payload.as_ref()
1811                .map(|p| String::from_utf8_lossy(p).into_owned())
1812                .unwrap_or_default(),                            // 8
1813            args.payload_encoding
1814                .clone()
1815                .unwrap_or_else(|| "json".to_owned()),           // 9
1816            args.idempotency_key
1817                .clone()
1818                .unwrap_or_default(),                            // 10
1819            args.correlation_id
1820                .clone()
1821                .unwrap_or_default(),                            // 11
1822            args.target_scope.clone(),                           // 12
1823            args.created_at
1824                .map(|ts| ts.to_string())
1825                .unwrap_or_else(|| args.now.to_string()),        // 13
1826            args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), // 14
1827            args.resume_delay_ms.unwrap_or(0).to_string(),       // 15
1828            args.signal_maxlen.unwrap_or(1000).to_string(),      // 16
1829            args.max_signals_per_execution
1830                .unwrap_or(10_000)
1831                .to_string(),                                    // 17
1832            // WIRE BOUNDARY — raw token to Lua. Display is redacted
1833            // for log safety; use .as_str() at the wire crossing.
1834            args.waitpoint_token.as_str().to_owned(),            // 18
1835        ];
1836
1837        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1838        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1839
1840        let raw: Value = self
1841            .fcall_with_reload("ff_deliver_signal", &key_refs, &arg_refs)
1842            .await?;
1843
1844        parse_deliver_signal_result(&raw, &args.signal_id)
1845    }
1846
1847    /// Change an execution's priority.
1848    ///
1849    /// KEYS (2), ARGV (2) — matches lua/scheduling.lua ff_change_priority.
1850    pub async fn change_priority(
1851        &self,
1852        execution_id: &ExecutionId,
1853        new_priority: i32,
1854    ) -> Result<ChangePriorityResult, ServerError> {
1855        let partition = execution_partition(execution_id, &self.config.partition_config);
1856        let ctx = ExecKeyContext::new(&partition, execution_id);
1857        let idx = IndexKeys::new(&partition);
1858
1859        // Read lane_id for eligible_zset key
1860        let lane_str: Option<String> = self
1861            .client
1862            .hget(&ctx.core(), "lane_id")
1863            .await
1864            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1865        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1866
1867        // KEYS (2): exec_core, eligible_zset
1868        let fcall_keys: Vec<String> = vec![ctx.core(), idx.lane_eligible(&lane)];
1869
1870        // ARGV (2): execution_id, new_priority
1871        let fcall_args: Vec<String> = vec![
1872            execution_id.to_string(),
1873            new_priority.to_string(),
1874        ];
1875
1876        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1877        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1878
1879        let raw: Value = self
1880            .fcall_with_reload("ff_change_priority", &key_refs, &arg_refs)
1881            .await?;
1882
1883        parse_change_priority_result(&raw, execution_id)
1884    }
1885
1886    /// Scheduler-routed claim entry point (Batch C item 2 PR-B).
1887    ///
1888    /// Delegates to [`ff_scheduler::Scheduler::claim_for_worker`] which
1889    /// runs budget + quota + capability admission before issuing the
1890    /// grant. Returns `Ok(None)` when no eligible execution exists on
1891    /// the lane at this scan cycle. The worker's subsequent
1892    /// `claim_from_grant(lane, grant)` mints the lease.
1893    ///
1894    /// Keeping the claim-grant mint inside the server (rather than the
1895    /// worker) means capability CSV validation, budget/quota breach
1896    /// checks, and lane routing run in one place for every tenant
1897    /// worker — the same invariants as the `direct-valkey-claim` path
1898    /// enforces inline, but gated at a single server choke point.
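    ///
    /// A hedged worker-loop sketch; `claim_from_grant` is the worker-side
    /// counterpart referenced above, not defined in this file:
    ///
    /// ```ignore
    /// if let Some(grant) = server
    ///     .claim_for_worker(&lane, &worker_id, &instance_id, &caps, 30_000)
    ///     .await?
    /// {
    ///     // Worker mints its lease from the grant (assumed API).
    ///     let lease = worker.claim_from_grant(&lane, &grant).await?;
    /// }
    /// ```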
1899    pub async fn claim_for_worker(
1900        &self,
1901        lane: &LaneId,
1902        worker_id: &WorkerId,
1903        worker_instance_id: &WorkerInstanceId,
1904        worker_capabilities: &std::collections::BTreeSet<String>,
1905        grant_ttl_ms: u64,
1906    ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
1907        let scheduler = ff_scheduler::Scheduler::new(
1908            self.client.clone(),
1909            self.config.partition_config,
1910        );
1911        scheduler
1912            .claim_for_worker(
1913                lane,
1914                worker_id,
1915                worker_instance_id,
1916                worker_capabilities,
1917                grant_ttl_ms,
1918            )
1919            .await
1920            .map_err(|e| match e {
1921                ff_scheduler::SchedulerError::Valkey(inner) => {
1922                    ServerError::Valkey(inner)
1923                }
1924                ff_scheduler::SchedulerError::ValkeyContext { source, context } => {
1925                    ServerError::ValkeyContext { source, context }
1926                }
1927                ff_scheduler::SchedulerError::Config(msg) => {
1928                    ServerError::InvalidInput(msg)
1929                }
1930            })
1931    }
1932
1933    /// Revoke an active lease (operator-initiated).
1934    pub async fn revoke_lease(
1935        &self,
1936        execution_id: &ExecutionId,
1937    ) -> Result<RevokeLeaseResult, ServerError> {
1938        let partition = execution_partition(execution_id, &self.config.partition_config);
1939        let ctx = ExecKeyContext::new(&partition, execution_id);
1940        let idx = IndexKeys::new(&partition);
1941
1942        // Pre-read worker_instance_id for worker_leases key
1943        let wiid_str: Option<String> = self
1944            .client
1945            .hget(&ctx.core(), "current_worker_instance_id")
1946            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET current_worker_instance_id".into() })?;
1948        let wiid = match wiid_str {
1949            Some(ref s) if !s.is_empty() => WorkerInstanceId::new(s),
1950            _ => {
1951                return Err(ServerError::NotFound(format!(
1952                    "no active lease for execution {execution_id} (no current_worker_instance_id)"
1953                )));
1954            }
1955        };
1956
1957        // KEYS (5): exec_core, lease_current, lease_history, lease_expiry_zset, worker_leases
1958        let fcall_keys: Vec<String> = vec![
1959            ctx.core(),
1960            ctx.lease_current(),
1961            ctx.lease_history(),
1962            idx.lease_expiry(),
1963            idx.worker_leases(&wiid),
1964        ];
1965
1966        // ARGV (3): execution_id, expected_lease_id (empty = skip check), revoke_reason
1967        let fcall_args: Vec<String> = vec![
1968            execution_id.to_string(),
1969            String::new(), // no expected_lease_id check
1970            "operator_revoke".to_owned(),
1971        ];
1972
1973        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1974        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1975
1976        let raw: Value = self
1977            .fcall_with_reload("ff_revoke_lease", &key_refs, &arg_refs)
1978            .await?;
1979
1980        parse_revoke_lease_result(&raw)
1981    }
1982
1983    /// Get full execution info via HGETALL on exec_core.
1984    pub async fn get_execution(
1985        &self,
1986        execution_id: &ExecutionId,
1987    ) -> Result<ExecutionInfo, ServerError> {
1988        let partition = execution_partition(execution_id, &self.config.partition_config);
1989        let ctx = ExecKeyContext::new(&partition, execution_id);
1990
1991        let fields: HashMap<String, String> = self
1992            .client
1993            .hgetall(&ctx.core())
1994            .await
1995            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL exec_core".into() })?;
1996
1997        if fields.is_empty() {
1998            return Err(ServerError::NotFound(format!(
1999                "execution not found: {execution_id}"
2000            )));
2001        }
2002
2003        let parse_enum = |field: &str| -> String {
2004            fields.get(field).cloned().unwrap_or_default()
2005        };
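        // exec_core stores enum values as bare tokens (e.g. `running`);
        // wrapping in quotes turns the token into the JSON string
        // "running" so serde_json can deserialize it into the typed enum.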
2006        fn deserialize<T: serde::de::DeserializeOwned>(field: &str, raw: &str) -> Result<T, ServerError> {
2007            let quoted = format!("\"{raw}\"");
2008            serde_json::from_str(&quoted).map_err(|e| {
2009                ServerError::Script(format!("invalid {field} '{raw}': {e}"))
2010            })
2011        }
2012
2013        let lp_str = parse_enum("lifecycle_phase");
2014        let os_str = parse_enum("ownership_state");
2015        let es_str = parse_enum("eligibility_state");
2016        let br_str = parse_enum("blocking_reason");
2017        let to_str = parse_enum("terminal_outcome");
2018        let as_str = parse_enum("attempt_state");
2019        let ps_str = parse_enum("public_state");
2020
2021        let state_vector = StateVector {
2022            lifecycle_phase: deserialize("lifecycle_phase", &lp_str)?,
2023            ownership_state: deserialize("ownership_state", &os_str)?,
2024            eligibility_state: deserialize("eligibility_state", &es_str)?,
2025            blocking_reason: deserialize("blocking_reason", &br_str)?,
2026            terminal_outcome: deserialize("terminal_outcome", &to_str)?,
2027            attempt_state: deserialize("attempt_state", &as_str)?,
2028            public_state: deserialize("public_state", &ps_str)?,
2029        };
2030
2031        // Reader invariant (RFC-011 §7.3): `flow_id` on exec_core is
2032        // stamped atomically with membership in `add_execution_to_flow`'s
2033        // single FCALL. Empty iff the exec has no flow affinity (never
2034        // called `add_execution_to_flow` — solo execs) — NOT "orphaned
2035        // mid-two-phase" (that failure mode is retired). Filter on empty
2036        // to surface "no flow affinity" distinctly from "flow X".
2037        let flow_id_val = fields.get("flow_id").filter(|s| !s.is_empty()).cloned();
2038
2039        // started_at / completed_at come from the exec_core hash —
2040        // lua/execution.lua writes them alongside the rest of the state
2041        // vector. `created_at` is required; the other two are optional
2042        // (pre-claim / pre-terminal reads) and elided when empty so the
        // wire payload for in-flight executions keeps the shape
        // existing callers expect.
2045        let started_at_opt = fields
2046            .get("started_at")
2047            .filter(|s| !s.is_empty())
2048            .cloned();
2049        let completed_at_opt = fields
2050            .get("completed_at")
2051            .filter(|s| !s.is_empty())
2052            .cloned();
2053
2054        Ok(ExecutionInfo {
2055            execution_id: execution_id.clone(),
2056            namespace: parse_enum("namespace"),
2057            lane_id: parse_enum("lane_id"),
2058            priority: fields
2059                .get("priority")
2060                .and_then(|v| v.parse().ok())
2061                .unwrap_or(0),
2062            execution_kind: parse_enum("execution_kind"),
2063            state_vector,
2064            public_state: deserialize("public_state", &ps_str)?,
2065            created_at: parse_enum("created_at"),
2066            started_at: started_at_opt,
2067            completed_at: completed_at_opt,
2068            current_attempt_index: fields
2069                .get("current_attempt_index")
2070                .and_then(|v| v.parse().ok())
2071                .unwrap_or(0),
2072            flow_id: flow_id_val,
2073            blocking_detail: parse_enum("blocking_detail"),
2074        })
2075    }
2076
2077    /// List executions from a partition's index ZSET.
2078    ///
2079    /// No FCALL — direct ZRANGE + pipelined HMGET reads.
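    ///
    /// A hedged paging sketch (`server` and `lane` are assumed bindings):
    ///
    /// ```ignore
    /// let mut offset = 0u64;
    /// loop {
    ///     let page = server
    ///         .list_executions(0, &lane, "eligible", offset, 100)
    ///         .await?;
    ///     if page.executions.is_empty() { break; }
    ///     offset += page.total_returned as u64;
    /// }
    /// ```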
2080    pub async fn list_executions(
2081        &self,
2082        partition_id: u16,
2083        lane: &LaneId,
2084        state_filter: &str,
2085        offset: u64,
2086        limit: u64,
2087    ) -> Result<ListExecutionsResult, ServerError> {
2088        let partition = ff_core::partition::Partition {
2089            family: ff_core::partition::PartitionFamily::Execution,
2090            index: partition_id,
2091        };
2092        let idx = IndexKeys::new(&partition);
2093
2094        let zset_key = match state_filter {
2095            "eligible" => idx.lane_eligible(lane),
2096            "delayed" => idx.lane_delayed(lane),
2097            "terminal" => idx.lane_terminal(lane),
2098            "suspended" => idx.lane_suspended(lane),
2099            "active" => idx.lane_active(lane),
2100            other => {
2101                return Err(ServerError::InvalidInput(format!(
2102                    "invalid state_filter: {other}. Use: eligible, delayed, terminal, suspended, active"
2103                )));
2104            }
2105        };
2106
2107        // ZRANGE key -inf +inf BYSCORE LIMIT offset count
2108        let eids: Vec<String> = self
2109            .client
2110            .cmd("ZRANGE")
2111            .arg(&zset_key)
2112            .arg("-inf")
2113            .arg("+inf")
2114            .arg("BYSCORE")
2115            .arg("LIMIT")
2116            .arg(offset)
2117            .arg(limit)
2118            .execute()
2119            .await
2120            .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("ZRANGE {zset_key}") })?;
2121
2122        if eids.is_empty() {
2123            return Ok(ListExecutionsResult {
2124                executions: vec![],
2125                total_returned: 0,
2126            });
2127        }
2128
2129        // Parse execution IDs, warning on corrupt ZSET members
2130        let mut parsed = Vec::with_capacity(eids.len());
2131        for eid_str in &eids {
2132            match ExecutionId::parse(eid_str) {
2133                Ok(id) => parsed.push(id),
2134                Err(e) => {
2135                    tracing::warn!(
2136                        raw_id = %eid_str,
2137                        error = %e,
2138                        zset = %zset_key,
2139                        "list_executions: ZSET member failed to parse as ExecutionId (data corruption?)"
2140                    );
2141                }
2142            }
2143        }
2144
2145        if parsed.is_empty() {
2146            return Ok(ListExecutionsResult {
2147                executions: vec![],
2148                total_returned: 0,
2149            });
2150        }
2151
2152        // Pipeline all HMGETs into a single round-trip
2153        let mut pipe = self.client.pipeline();
2154        let mut slots = Vec::with_capacity(parsed.len());
2155        for eid in &parsed {
2156            let ep = execution_partition(eid, &self.config.partition_config);
2157            let ctx = ExecKeyContext::new(&ep, eid);
2158            let slot = pipe
2159                .cmd::<Vec<Option<String>>>("HMGET")
2160                .arg(ctx.core())
2161                .arg("namespace")
2162                .arg("lane_id")
2163                .arg("execution_kind")
2164                .arg("public_state")
2165                .arg("priority")
2166                .arg("created_at")
2167                .finish();
2168            slots.push(slot);
2169        }
2170
2171        pipe.execute()
2172            .await
2173            .map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline HMGET".into() })?;
2174
2175        let mut summaries = Vec::with_capacity(parsed.len());
2176        for (eid, slot) in parsed.into_iter().zip(slots) {
2177            let fields: Vec<Option<String>> = slot.value()
2178                .map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline slot".into() })?;
2179
2180            let field = |i: usize| -> String {
2181                fields
2182                    .get(i)
2183                    .and_then(|v| v.as_ref())
2184                    .cloned()
2185                    .unwrap_or_default()
2186            };
2187
2188            summaries.push(ExecutionSummary {
2189                execution_id: eid,
2190                namespace: field(0),
2191                lane_id: field(1),
2192                execution_kind: field(2),
2193                public_state: field(3),
2194                priority: field(4).parse().unwrap_or(0),
2195                created_at: field(5),
2196            });
2197        }
2198
2199        let total = summaries.len();
2200        Ok(ListExecutionsResult {
2201            executions: summaries,
2202            total_returned: total,
2203        })
2204    }
2205
2206    /// Replay a terminal execution.
2207    ///
2208    /// Pre-reads exec_core for flow_id and dep edges (variable KEYS).
    /// KEYS (4 base, 7+N for a skipped flow member), ARGV (2 base, 2+N) —
    /// matches lua/flow.lua ff_replay_execution.
2210    pub async fn replay_execution(
2211        &self,
2212        execution_id: &ExecutionId,
2213    ) -> Result<ReplayExecutionResult, ServerError> {
2214        let partition = execution_partition(execution_id, &self.config.partition_config);
2215        let ctx = ExecKeyContext::new(&partition, execution_id);
2216        let idx = IndexKeys::new(&partition);
2217
2218        // Pre-read lane_id, flow_id, terminal_outcome.
2219        //
2220        // Reader invariant (RFC-011 §7.3): `flow_id` on exec_core is
2221        // stamped atomically with membership by `add_execution_to_flow`'s
2222        // single FCALL. Empty iff the exec has no flow affinity
2223        // (solo-path create_execution — never added to a flow). The
2224        // `is_skipped_flow_member` branch below gates on
2225        // `!flow_id_str.is_empty()`, so solo execs correctly fall back
2226        // to the non-flow-member replay path. See
2227        // `add_execution_to_flow` rustdoc for the atomic-commit
2228        // invariant.
2229        let dyn_fields: Vec<Option<String>> = self
2230            .client
2231            .cmd("HMGET")
2232            .arg(ctx.core())
2233            .arg("lane_id")
2234            .arg("flow_id")
2235            .arg("terminal_outcome")
2236            .execute()
2237            .await
2238            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET replay pre-read".into() })?;
2239        let lane = LaneId::new(
2240            dyn_fields
2241                .first()
2242                .and_then(|v| v.as_ref())
2243                .cloned()
2244                .unwrap_or_else(|| "default".to_owned()),
2245        );
2246        let flow_id_str = dyn_fields
2247            .get(1)
2248            .and_then(|v| v.as_ref())
2249            .cloned()
2250            .unwrap_or_default();
2251        let terminal_outcome = dyn_fields
2252            .get(2)
2253            .and_then(|v| v.as_ref())
2254            .cloned()
2255            .unwrap_or_default();
2256
2257        let is_skipped_flow_member = terminal_outcome == "skipped" && !flow_id_str.is_empty();
2258
2259        // Base KEYS (4): exec_core, terminal_zset, eligible_zset, lease_history
2260        let mut fcall_keys: Vec<String> = vec![
2261            ctx.core(),
2262            idx.lane_terminal(&lane),
2263            idx.lane_eligible(&lane),
2264            ctx.lease_history(),
2265        ];
2266
2267        // Base ARGV (2): execution_id, now_ms
2268        let now = TimestampMs::now();
2269        let mut fcall_args: Vec<String> = vec![execution_id.to_string(), now.to_string()];
2270
2271        if is_skipped_flow_member {
2272            // Read ALL inbound edge IDs from the flow partition's adjacency set.
2273            // Cannot use deps:unresolved because impossible edges were SREM'd
2274            // by ff_resolve_dependency. The flow's in:<eid> set has all edges.
2275            let flow_id = FlowId::parse(&flow_id_str)
2276                .map_err(|e| ServerError::Script(format!("bad flow_id: {e}")))?;
2277            let flow_part =
2278                flow_partition(&flow_id, &self.config.partition_config);
2279            let flow_ctx = FlowKeyContext::new(&flow_part, &flow_id);
2280            let edge_ids: Vec<String> = self
2281                .client
2282                .cmd("SMEMBERS")
2283                .arg(flow_ctx.incoming(execution_id))
2284                .execute()
2285                .await
2286                .map_err(|e| ServerError::ValkeyContext { source: e, context: "SMEMBERS replay edges".into() })?;
2287
2288            // Extended KEYS: blocked_deps_zset, deps_meta, deps_unresolved, dep_edge_0..N
2289            fcall_keys.push(idx.lane_blocked_dependencies(&lane)); // 5
2290            fcall_keys.push(ctx.deps_meta()); // 6
2291            fcall_keys.push(ctx.deps_unresolved()); // 7
            for eid_str in &edge_ids {
                // A fabricated fallback EdgeId would key a nonexistent
                // edge and desync KEYS from ARGV; skip corrupt members.
                let Ok(edge_id) = EdgeId::parse(eid_str) else {
                    tracing::warn!(raw_id = %eid_str, "replay_execution: bad EdgeId in adjacency set");
                    continue;
                };
                fcall_keys.push(ctx.dep_edge(&edge_id)); // 8..8+N
                fcall_args.push(eid_str.clone()); // 3..3+N
            }
2298        }
2299
2300        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2301        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2302
2303        let raw: Value = self
2304            .fcall_with_reload("ff_replay_execution", &key_refs, &arg_refs)
2305            .await?;
2306
2307        parse_replay_result(&raw)
2308    }
2309
2310    /// Read frames from an attempt's stream (XRANGE wrapper) plus terminal
2311    /// markers (`closed_at`, `closed_reason`) so consumers can stop polling
2312    /// when the producer finalizes.
2313    ///
2314    /// `from_id` and `to_id` accept XRANGE special markers: `"-"` for
2315    /// earliest, `"+"` for latest. `count_limit` MUST be `>= 1` —
2316    /// `0` returns a `ServerError::InvalidInput` (matches the REST boundary
2317    /// and the Lua-side reject).
2318    ///
2319    /// Cluster-safe: the attempt's `{p:N}` partition is derived from the
2320    /// execution id, so all KEYS share the same slot.
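    ///
    /// A hedged polling sketch (`server`, `execution_id`, and `attempt`
    /// are assumed bindings; assumes `StreamFrames` exposes the
    /// `closed_at` terminal marker described above):
    ///
    /// ```ignore
    /// let frames = server
    ///     .read_attempt_stream(&execution_id, attempt, "-", "+", 1000)
    ///     .await?;
    /// if frames.closed_at.is_some() {
    ///     // Producer finalized; stop polling.
    /// }
    /// ```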
2321    pub async fn read_attempt_stream(
2322        &self,
2323        execution_id: &ExecutionId,
2324        attempt_index: AttemptIndex,
2325        from_id: &str,
2326        to_id: &str,
2327        count_limit: u64,
2328    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2329        use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
2330
2331        if count_limit == 0 {
2332            return Err(ServerError::InvalidInput(
2333                "count_limit must be >= 1".to_owned(),
2334            ));
2335        }
2336
2337        // Share the same semaphore as tail. A large XRANGE reply (10_000
2338        // frames × ~64KB) is just as capable of head-of-line-blocking the
2339        // tail_client mux as a long BLOCK — fairness accounting must be
2340        // unified. Non-blocking acquire → 429 on contention.
2341        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2342            Ok(p) => p,
2343            Err(tokio::sync::TryAcquireError::NoPermits) => {
2344                return Err(ServerError::ConcurrencyLimitExceeded(
2345                    "stream_ops",
2346                    self.config.max_concurrent_stream_ops,
2347                ));
2348            }
2349            Err(tokio::sync::TryAcquireError::Closed) => {
2350                return Err(ServerError::OperationFailed(
2351                    "stream semaphore closed (server shutting down)".into(),
2352                ));
2353            }
2354        };
2355
2356        let args = ReadFramesArgs {
2357            execution_id: execution_id.clone(),
2358            attempt_index,
2359            from_id: from_id.to_owned(),
2360            to_id: to_id.to_owned(),
2361            count_limit,
2362        };
2363
2364        let partition = execution_partition(execution_id, &self.config.partition_config);
2365        let ctx = ExecKeyContext::new(&partition, execution_id);
2366        let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };
2367
2368        // Route on the dedicated stream client, same as tail. A 10_000-
2369        // frame XRANGE reply on the main mux would stall every other
2370        // FCALL behind reply serialization.
2371        let result = ff_script::functions::stream::ff_read_attempt_stream(
2372            &self.tail_client, &keys, &args,
2373        )
2374        .await
2375        .map_err(script_error_to_server);
2376
2377        drop(permit);
2378
2379        match result? {
2380            ReadFramesResult::Frames(f) => Ok(f),
2381        }
2382    }
2383
2384    /// Tail a live attempt's stream (XREAD BLOCK wrapper). Returns frames
2385    /// plus the terminal signal so a polling consumer can exit when the
2386    /// producer closes the stream.
2387    ///
2388    /// `last_id` is exclusive — XREAD returns entries with id > last_id.
2389    /// Pass `"0-0"` to read from the beginning.
2390    ///
2391    /// `block_ms == 0` → non-blocking peek (returns immediately).
2392    /// `block_ms > 0`  → blocks up to that many ms. Empty `frames` +
2393    /// `closed_at=None` → timeout, no new data, still open.
2394    ///
2395    /// `count_limit` MUST be `>= 1`; `0` returns `InvalidInput`.
2396    ///
2397    /// Implemented as a direct XREAD command (not FCALL) because blocking
2398    /// commands are rejected inside Valkey Functions. The terminal
2399    /// markers come from a companion HMGET on `stream_meta` — see
2400    /// `ff_script::stream_tail` module docs.
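    ///
    /// A hedged tail-loop sketch (assumes each frame carries its stream
    /// id; `server`, `execution_id`, and `attempt` are assumed bindings):
    ///
    /// ```ignore
    /// let mut last_id = "0-0".to_owned();
    /// loop {
    ///     let batch = server
    ///         .tail_attempt_stream(&execution_id, attempt, &last_id, 30_000, 100)
    ///         .await?;
    ///     if let Some(id) = batch.frames.last().map(|f| f.id.clone()) {
    ///         last_id = id;
    ///     }
    ///     if batch.closed_at.is_some() { break; }
    /// }
    /// ```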
2401    pub async fn tail_attempt_stream(
2402        &self,
2403        execution_id: &ExecutionId,
2404        attempt_index: AttemptIndex,
2405        last_id: &str,
2406        block_ms: u64,
2407        count_limit: u64,
2408    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2409        if count_limit == 0 {
2410            return Err(ServerError::InvalidInput(
2411                "count_limit must be >= 1".to_owned(),
2412            ));
2413        }
2414
2415        // Non-blocking permit acquisition on the shared stream_semaphore
2416        // (read + tail split the same pool). If the server is already at
2417        // the `max_concurrent_stream_ops` ceiling, return `TailUnavailable`
2418        // (→ 429) rather than queueing — a queued tail holds the caller's
2419        // HTTP request open with no upper bound, which is exactly the
2420        // resource-exhaustion pattern this limit exists to prevent.
2421        // Clients retry with backoff on 429.
2422        //
2423        // Worst-case permit hold: if the producer closes the stream via
2424        // HSET on stream_meta (not an XADD), the XREAD BLOCK won't wake
2425        // until `block_ms` elapses — so a permit can be held for up to
2426        // the caller's block_ms even though the terminal signal is
2427        // ready. This is a v1-accepted limitation; RFC-006 §Terminal-
2428        // signal timing under active tail documents it and sketches the
2429        // v2 sentinel-XADD upgrade path.
2430        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2431            Ok(p) => p,
2432            Err(tokio::sync::TryAcquireError::NoPermits) => {
2433                return Err(ServerError::ConcurrencyLimitExceeded(
2434                    "stream_ops",
2435                    self.config.max_concurrent_stream_ops,
2436                ));
2437            }
2438            Err(tokio::sync::TryAcquireError::Closed) => {
2439                return Err(ServerError::OperationFailed(
2440                    "stream semaphore closed (server shutting down)".into(),
2441                ));
2442            }
2443        };
2444
2445        let partition = execution_partition(execution_id, &self.config.partition_config);
2446        let ctx = ExecKeyContext::new(&partition, execution_id);
2447        let stream_key = ctx.stream(attempt_index);
2448        let stream_meta_key = ctx.stream_meta(attempt_index);
2449
2450        // Acquire the XREAD BLOCK serializer AFTER the stream semaphore.
2451        // Nesting order matters: the semaphore is the user-visible
2452        // ceiling (surfaces as 429), the Mutex is an internal fairness
2453        // gate. Holding the permit while waiting on the Mutex means the
2454        // ceiling still bounds queue depth. See the field docstring for
2455        // the full rationale (ferriskey pipeline FIFO + client-side
2456        // per-call timeout race).
2457        let _xread_guard = self.xread_block_lock.lock().await;
2458
2459        let result = ff_script::stream_tail::xread_block(
2460            &self.tail_client,
2461            &stream_key,
2462            &stream_meta_key,
2463            last_id,
2464            block_ms,
2465            count_limit,
2466        )
2467        .await
2468        .map_err(script_error_to_server);
2469
2470        drop(_xread_guard);
2471        drop(permit);
2472        result
2473    }
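
    // Consumer sketch (illustrative, commented out — `frames` and `closed_at`
    // come from the docs above; per-frame field names like `id` are
    // assumptions, see ff_core::contracts::StreamFrames for the real shape):
    //
    //   let mut last_id = "0-0".to_owned();
    //   loop {
    //       let out = server
    //           .tail_attempt_stream(&eid, attempt, &last_id, 30_000, 512)
    //           .await?;
    //       if let Some(f) = out.frames.last() {
    //           last_id = f.id.clone(); // last_id is exclusive — no re-delivery
    //       }
    //       if out.closed_at.is_some() { break; } // producer closed the stream
    //   }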
2474
2475    /// Graceful shutdown — stops scanners, drains background handler tasks
2476    /// (e.g. cancel_flow member dispatch) with a bounded timeout, then waits
2477    /// for scanners to finish.
2478    ///
2479    /// Shutdown order is chosen so in-flight stream ops (read/tail) drain
2480    /// cleanly without new arrivals piling up:
2481    ///
2482    /// 1. `stream_semaphore.close()` — new read/tail attempts fail fast
2483    ///    with `ServerError::OperationFailed("stream semaphore closed …")`
2484    ///    which the REST layer surfaces as a 500 with `retryable=false`
2485///    (the response body names the shutdown reason, so ops tooling that
2486///    waits and retries 503-class responses can still special-case it).
2487    /// 2. Drain handler-spawned background tasks with a 15s ceiling.
2488    /// 3. `engine.shutdown()` stops scanners.
2489    ///
2490    /// Existing in-flight tails finish on their natural `block_ms`
2491    /// boundary (up to ~30s); the `tail_client` is dropped when `Server`
2492    /// is dropped after this function returns. We do NOT wait for tails
2493    /// to drain explicitly — the semaphore-close + natural-timeout
2494    /// combination bounds shutdown to roughly `block_ms + 15s` in the
2495    /// worst case. Callers observing a dropped connection retry against
2496    /// whatever replacement is coming up.
2497    pub async fn shutdown(self) {
2498        tracing::info!("shutting down FlowFabric server");
2499
2500        // Step 1: Close the stream semaphore FIRST so any in-flight
2501        // read/tail calls that are between `try_acquire` and their
2502        // Valkey command still hold a valid permit, but no NEW stream
2503        // op can start. `Semaphore::close()` is idempotent.
2504        self.stream_semaphore.close();
2505        tracing::info!(
2506            "stream semaphore closed; no new read/tail attempts will be accepted"
2507        );
2508
2509        // Step 2: Drain handler-spawned background tasks with the same
2510        // ceiling as Engine::shutdown. If dispatch is still running at
2511        // the deadline, drop the JoinSet to abort remaining tasks.
2512        let drain_timeout = Duration::from_secs(15);
2513        let background = self.background_tasks.clone();
2514        let drain = async move {
2515            let mut guard = background.lock().await;
2516            while guard.join_next().await.is_some() {}
2517        };
2518        match tokio::time::timeout(drain_timeout, drain).await {
2519            Ok(()) => {}
2520            Err(_) => {
2521                tracing::warn!(
2522                    timeout_s = drain_timeout.as_secs(),
2523                    "shutdown: background tasks did not finish in time, aborting"
2524                );
2525                self.background_tasks.lock().await.abort_all();
2526            }
2527        }
2528
2529        self.engine.shutdown().await;
2530        tracing::info!("FlowFabric server shutdown complete");
2531    }
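
    // Embedding sketch (hypothetical `main` wiring; `ServerConfig::from_env`
    // is an invented constructor for illustration):
    //
    //   let server = Server::start(ServerConfig::from_env()?).await?;
    //   tokio::signal::ctrl_c().await?; // or a SIGTERM listener
    //   server.shutdown().await;        // bounded by roughly block_ms + 15s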
2532}
2533
2534// ── Valkey version check (RFC-011 §13) ──
2535
2536/// Minimum Valkey version the engine requires (see RFC-011 §13). 8.0 ships the
2537/// hash-slot and Functions behavior the co-location design relies on.
2538const REQUIRED_VALKEY_MAJOR: u32 = 8;
2539
2540/// Upper bound on the rolling-upgrade retry window (RFC-011 §9.17). A Valkey
2541/// node cycling through SIGTERM → restart typically completes in well under
2542/// 60s; this budget is generous without letting a truly-stuck cluster hang
2543/// boot indefinitely.
2544const VERSION_CHECK_RETRY_BUDGET: Duration = Duration::from_secs(60);
2545
2546/// Verify the connected Valkey reports a server version ≥ 8.0 (read from `valkey_version`, with a `redis_version` fallback — see below).
2547///
2548/// Per RFC-011 §9.17, during a rolling upgrade the node we happen to connect
2549/// to may temporarily be pre-upgrade while others are post-upgrade. The check
2550/// tolerates this by retrying the whole verification (including low-version
2551/// responses) with exponential backoff, capped at a 60s budget.
2552///
2553/// **Retries on:**
2554/// - Low-version responses (`detected_major < REQUIRED_VALKEY_MAJOR`) — may
2555///   resolve as the rolling upgrade progresses onto the connected node.
2556/// - Retryable ferriskey transport errors — connection refused,
2557///   `BusyLoadingError`, `ClusterDown`, etc., classified via
2558///   `ff_script::retry::is_retryable_kind`.
2559/// - Missing/unparsable version field — treated as transient (fresh-boot
2560///   server may not have the INFO fields populated yet). Reads
2561///   `valkey_version` when present (authoritative on Valkey 8.0+), falls
2562///   back to `redis_version` for Valkey 7.x.
2563///
2564/// **Does NOT retry on:**
2565/// - Non-retryable transport errors (auth failures, permission denied,
2566///   invalid client config) — these are operator misconfiguration, not
2567///   transient cluster state; fast-fail preserves a clear signal.
2568///
2569/// On budget exhaustion, returns the last observed error.
2570async fn verify_valkey_version(client: &Client) -> Result<(), ServerError> {
2571    let deadline = tokio::time::Instant::now() + VERSION_CHECK_RETRY_BUDGET;
2572    let mut backoff = Duration::from_millis(200);
2573    loop {
2574        let (should_retry, err_for_budget_exhaust, log_detail): (bool, ServerError, String) =
2575            match query_valkey_version(client).await {
2576                Ok(detected_major) if detected_major >= REQUIRED_VALKEY_MAJOR => {
2577                    tracing::info!(
2578                        detected_major,
2579                        required = REQUIRED_VALKEY_MAJOR,
2580                        "Valkey version accepted"
2581                    );
2582                    return Ok(());
2583                }
2584                Ok(detected_major) => (
2585                    // Low version — may be a rolling-upgrade stale node.
2586                    // Retry within budget; after exhaustion, the cluster
2587                    // is misconfigured and fast-fail is the correct signal.
2588                    true,
2589                    ServerError::ValkeyVersionTooLow {
2590                        detected: detected_major.to_string(),
2591                        required: format!("{REQUIRED_VALKEY_MAJOR}.0"),
2592                    },
2593                    format!("detected_major={detected_major} < required={REQUIRED_VALKEY_MAJOR}"),
2594                ),
2595                Err(e) => {
2596                    // Only retry if the underlying Valkey error is retryable
2597                    // by kind. Auth / permission / invalid-config should fast-
2598                    // fail so operators see the true root cause immediately,
2599                    // not a 60s hang followed by a generic "transient" error.
2600                    let retryable = e
2601                        .valkey_kind()
2602                        .map(ff_script::retry::is_retryable_kind)
2603                        // Non-Valkey errors (parse, missing field, operation
2604                        // failures) are treated as transient — a fresh-boot
2605                        // Valkey may not have redis_version populated yet.
2606                        .unwrap_or(true);
2607                    let detail = e.to_string();
2608                    (retryable, e, detail)
2609                }
2610            };
2611
2612        if !should_retry {
2613            return Err(err_for_budget_exhaust);
2614        }
2615        if tokio::time::Instant::now() >= deadline {
2616            return Err(err_for_budget_exhaust);
2617        }
2618        tracing::warn!(
2619            backoff_ms = backoff.as_millis() as u64,
2620            detail = %log_detail,
2621            "valkey version check transient failure; retrying"
2622        );
2623        tokio::time::sleep(backoff).await;
2624        backoff = (backoff * 2).min(Duration::from_secs(5));
2625    }
2626}
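
// Illustrative worst-case cadence from the constants above (ignoring
// per-attempt RTT): sleeps of 200ms, 400ms, 800ms, 1.6s, 3.2s, then 5s
// steps — on the order of 16 attempts before the 60s budget expires.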
2627
2628/// Run `INFO server` and extract the major component of the Valkey version.
2629///
2630/// Returns `Err` on transport errors, missing field, or unparsable version.
2631/// Handles three response shapes:
2632///
2633/// - **Standalone:** single string body; parse directly.
2634/// - **Cluster (RESP3 map):** `INFO` returns a map keyed by node address;
2635///   every node runs the same version in a healthy deployment, so we pick
2636///   one entry and parse it. Divergent versions during a rolling upgrade
2637///   are handled by the outer retry loop.
2638/// - **Empty / unexpected:** surfaces as `OperationFailed` with context.
2639async fn query_valkey_version(client: &Client) -> Result<u32, ServerError> {
2640    let raw: Value = client
2641        .cmd("INFO")
2642        .arg("server")
2643        .execute()
2644        .await
2645        .map_err(|e| ServerError::ValkeyContext {
2646            source: e,
2647            context: "INFO server".into(),
2648        })?;
2649    let info_body = extract_info_body(&raw)?;
2650    parse_valkey_major_version(&info_body)
2651}
2652
2653/// Normalize an `INFO server` response to a single string body.
2654///
2655/// Standalone returns the body directly. Cluster returns a map of
2656/// `node_addr → body`; we pick any entry (they should all report the same
2657/// version in a stable cluster; divergent versions during rolling upgrade
2658/// are reconciled by the outer retry loop).
2659fn extract_info_body(raw: &Value) -> Result<String, ServerError> {
2660    match raw {
2661        Value::BulkString(bytes) => Ok(String::from_utf8_lossy(bytes).into_owned()),
2662        Value::VerbatimString { text, .. } => Ok(text.clone()),
2663        Value::SimpleString(s) => Ok(s.clone()),
2664        Value::Map(entries) => {
2665            // Pick the first entry's value. For cluster-wide INFO the map is
2666            // node_addr → body; every body carries the same version. If a
2667            // node is mid-upgrade, the outer retry loop handles divergence.
2668            let (_, body) = entries.first().ok_or_else(|| {
2669                ServerError::OperationFailed(
2670                    "valkey version check: cluster INFO returned empty map".into(),
2671                )
2672            })?;
2673            extract_info_body(body)
2674        }
2675        other => Err(ServerError::OperationFailed(format!(
2676            "valkey version check: unexpected INFO shape: {other:?}"
2677        ))),
2678    }
2679}
2680
2681/// Extract the major component of the Valkey version from an `INFO server`
2682/// response body. Pure parser — pulled out of [`query_valkey_version`] so it
2683/// is unit-testable without a live Valkey.
2684///
2685/// Prefers the `valkey_version:` field introduced in Valkey 8.0+. Falls back
2686/// to `redis_version:` for Valkey 7.x compat. **Note:** Valkey 8.x/9.x still
2687/// emit `redis_version:7.2.4` for Redis-client compatibility; the real
2688/// server version is in `valkey_version:`, so always check that field first.
2689fn parse_valkey_major_version(info: &str) -> Result<u32, ServerError> {
2690    let extract_major = |line: &str| -> Result<u32, ServerError> {
2691        let trimmed = line.trim();
2692        let major_str = trimmed.split('.').next().unwrap_or("").trim();
2693        if major_str.is_empty() {
2694            return Err(ServerError::OperationFailed(format!(
2695                "valkey version check: empty version field in '{trimmed}'"
2696            )));
2697        }
2698        major_str.parse::<u32>().map_err(|_| {
2699            ServerError::OperationFailed(format!(
2700                "valkey version check: non-numeric major in '{trimmed}'"
2701            ))
2702        })
2703    };
2704    // Prefer valkey_version (authoritative on Valkey 8.0+).
2705    if let Some(valkey_line) = info
2706        .lines()
2707        .find_map(|line| line.strip_prefix("valkey_version:"))
2708    {
2709        return extract_major(valkey_line);
2710    }
2711    // Fall back to redis_version (Valkey 7.x only — 8.x+ pins this to 7.2.4
2712    // for Redis-client compat, so reading it there would under-report).
2713    if let Some(redis_line) = info
2714        .lines()
2715        .find_map(|line| line.strip_prefix("redis_version:"))
2716    {
2717        return extract_major(redis_line);
2718    }
2719    Err(ServerError::OperationFailed(
2720        "valkey version check: INFO missing valkey_version and redis_version".into(),
2721    ))
2722}
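
// Unit-test sketch for the pure parser above — exactly the "testable
// without a live Valkey" coverage the docstring calls out. The INFO
// bodies are hand-written fixtures, not captured server output.
#[cfg(test)]
mod valkey_version_parse_tests {
    use super::*;

    #[test]
    fn prefers_valkey_version_over_compat_redis_version() {
        // Valkey 8.x pins redis_version to 7.2.4; valkey_version must win.
        let info = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:8.0.1\r\n";
        assert_eq!(parse_valkey_major_version(info).ok(), Some(8));
    }

    #[test]
    fn falls_back_to_redis_version_on_valkey_7x() {
        let info = "# Server\r\nredis_version:7.2.6\r\n";
        assert_eq!(parse_valkey_major_version(info).ok(), Some(7));
    }

    #[test]
    fn missing_both_fields_is_an_error() {
        assert!(parse_valkey_major_version("# Server\r\n").is_err());
    }
}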
2723
2724// ── Partition config validation ──
2725
2726/// Validate or create the `ff:config:partitions` key on first boot.
2727///
2728/// If the key exists, its values must match the server's config.
2729/// If it doesn't exist, create it (first boot).
2730async fn validate_or_create_partition_config(
2731    client: &Client,
2732    config: &PartitionConfig,
2733) -> Result<(), ServerError> {
2734    let key = keys::global_config_partitions();
2735
2736    let existing: HashMap<String, String> = client
2737        .hgetall(&key)
2738        .await
2739        .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;
2740
2741    if existing.is_empty() {
2742        // First boot — create all three fields in ONE multi-field HSET.
        // Multi-field HSET is single-command atomic (same rationale as the
        // waitpoint HMAC install below), so a crash mid-boot cannot leave a
        // partial config that the mismatch check would reject on next boot.
2743        tracing::info!("first boot: creating {key}");
        let flow = config.num_flow_partitions.to_string();
        let budget = config.num_budget_partitions.to_string();
        let quota = config.num_quota_partitions.to_string();
        let _: i64 = client
            .cmd("HSET")
            .arg(&key)
            .arg("num_flow_partitions")
            .arg(&flow)
            .arg("num_budget_partitions")
            .arg(&budget)
            .arg("num_quota_partitions")
            .arg(&quota)
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("HSET {key} (init partition config)") })?;
2756        return Ok(());
2757    }
2758
2759    // Validate existing config matches
2760    let check = |field: &str, expected: u16| -> Result<(), ServerError> {
2761        let stored: u16 = existing
2762            .get(field)
2763            .and_then(|v| v.parse().ok())
2764            .unwrap_or(0);
2765        if stored != expected {
2766            return Err(ServerError::PartitionMismatch(format!(
2767                "{field}: stored={stored}, config={expected}. \
2768                 Partition counts are fixed at deployment time. \
2769                 Either fix your config or migrate the data."
2770            )));
2771        }
2772        Ok(())
2773    };
2774
2775    check("num_flow_partitions", config.num_flow_partitions)?;
2776    check("num_budget_partitions", config.num_budget_partitions)?;
2777    check("num_quota_partitions", config.num_quota_partitions)?;
2778
2779    tracing::info!("partition config validated against stored {key}");
2780    Ok(())
2781}
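
// Resulting hash shape on a healthy deployment (counts illustrative):
//
//   HGETALL ff:config:partitions
//   1) "num_flow_partitions"    2) "16"
//   3) "num_budget_partitions"  4) "4"
//   5) "num_quota_partitions"   6) "4"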
2782
2783// ── Waitpoint HMAC secret bootstrap (RFC-004 §Waitpoint Security) ──
2784
2785/// Stable initial kid written on first boot. Rotation promotes to k2, k3, ...
2786/// The kid is stored alongside the secret in every partition's hash so each
2787/// FCALL can self-identify which secret produced a given token.
2788const WAITPOINT_HMAC_INITIAL_KID: &str = "k1";
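
// Illustrative per-partition hash after first boot plus one rotation to k2
// (field names from the code below; hex values invented):
//
//   current_kid -> "k2"
//   secret:k1   -> "9f3a…"  // previous kid, still accepted within
//                           // FF_WAITPOINT_HMAC_GRACE_MS of the rotation
//   secret:k2   -> "b71c…"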
2789
2790/// Per-partition outcome of the HMAC bootstrap step. Collected across the
2791/// parallel scan so we can emit aggregated logs once at the end.
2792enum PartitionBootOutcome {
2793    /// Partition already had a matching (kid, secret) pair.
2794    Match,
2795    /// Stored secret diverges from env — likely operator rotation; kept.
2796    Mismatch,
2797    /// Torn write (current_kid present, secret:<kid> missing); repaired.
2798    Repaired,
2799    /// Fresh partition; atomically installed env secret under kid=k1.
2800    Installed,
2801}
2802
2803/// Bounded in-flight concurrency for the startup fan-out. Large enough to
2804/// turn a 256-partition install from ~15s sequential into ~1s on cross-AZ
2805/// Valkey, small enough to leave a cold cluster breathing room for other
2806/// Server::start work (library load, engine scanner spawn).
2807const BOOT_INIT_CONCURRENCY: usize = 16;
2808
2809async fn init_one_partition(
2810    client: &Client,
2811    partition: Partition,
2812    secret_hex: &str,
2813) -> Result<PartitionBootOutcome, ServerError> {
2814    let key = IndexKeys::new(&partition).waitpoint_hmac_secrets();
2815
2816    // Probe for an existing install. Fast path (fresh partition): HGET
2817    // returns nil and we fall through to the atomic install below. Slow
2818    // path: secret:<stored_kid> is then HGET'd once we know the kid name.
2819    // (A previous version used a 2-field HMGET that included a fake
2820    // `secret:probe` placeholder — vestigial from an abandoned
2821    // optimization attempt, and confusing to read. Collapsed back to a
2822    // single-field HGET.)
2823    let stored_kid: Option<String> = client
2824        .cmd("HGET")
2825        .arg(&key)
2826        .arg("current_kid")
2827        .execute()
2828        .await
2829        .map_err(|e| ServerError::ValkeyContext {
2830            source: e,
2831            context: format!("HGET {key} current_kid (init probe)"),
2832        })?;
2833
2834    if let Some(stored_kid) = stored_kid {
2835        // We didn't know the stored kid up front, so now HGET the real
2836        // secret:<stored_kid> field. Two round-trips in the slow path; the
2837        // fast path (fresh partition) stays at one.
2838        let field = format!("secret:{stored_kid}");
2839        let stored_secret: Option<String> = client
2840            .hget(&key, &field)
2841            .await
2842            .map_err(|e| ServerError::ValkeyContext {
2843                source: e,
2844                context: format!("HGET {key} secret:<kid> (init check)"),
2845            })?;
2846        if stored_secret.is_none() {
2847            // Torn write from a previous boot: current_kid present but
2848            // secret:<kid> missing. Without repair, mint returns
2849            // "hmac_secret_not_initialized" on that partition forever.
2850            // Repair in place with env secret. Not rotation — rotation
2851            // always writes the secret first.
2852            client
2853                .hset(&key, &field, secret_hex)
2854                .await
2855                .map_err(|e| ServerError::ValkeyContext {
2856                    source: e,
2857                    context: format!("HSET {key} secret:<kid> (repair torn write)"),
2858                })?;
2859            return Ok(PartitionBootOutcome::Repaired);
2860        }
2861        if stored_secret.as_deref() != Some(secret_hex) {
2862            return Ok(PartitionBootOutcome::Mismatch);
2863        }
2864        return Ok(PartitionBootOutcome::Match);
2865    }
2866
2867    // Fresh partition — install current_kid + secret:<kid> atomically in
2868    // one HSET. Multi-field HSET is single-command atomic, so a crash
2869    // can't leave current_kid without its secret.
2870    let secret_field = format!("secret:{WAITPOINT_HMAC_INITIAL_KID}");
2871    let _: i64 = client
2872        .cmd("HSET")
2873        .arg(&key)
2874        .arg("current_kid")
2875        .arg(WAITPOINT_HMAC_INITIAL_KID)
2876        .arg(&secret_field)
2877        .arg(secret_hex)
2878        .execute()
2879        .await
2880        .map_err(|e| ServerError::ValkeyContext {
2881            source: e,
2882            context: format!("HSET {key} (init waitpoint HMAC atomic)"),
2883        })?;
2884    Ok(PartitionBootOutcome::Installed)
2885}
2886
2887/// Install the waitpoint HMAC secret on every execution partition.
2888///
2889/// Parallelized fan-out with bounded in-flight concurrency
2890/// (`BOOT_INIT_CONCURRENCY`) so 256-partition boots finish in ~1s instead
2891/// of ~15s sequential — the prior sequential loop was tight on K8s
2892/// `initialDelaySeconds=30` defaults, especially cross-AZ. Fail-fast:
2893/// the first per-partition error aborts boot.
2894///
2895/// Outcomes aggregate into mismatch/repaired counts (logged once at end)
2896/// so operators see a single loud warning per fault class instead of 256
2897/// per-partition lines.
2898async fn initialize_waitpoint_hmac_secret(
2899    client: &Client,
2900    partition_config: &PartitionConfig,
2901    secret_hex: &str,
2902) -> Result<(), ServerError> {
2903    use futures::stream::{FuturesUnordered, StreamExt};
2904
    // Executions co-locate with flows (there is no separate execution
    // partition count), so the execution family spans num_flow_partitions.
2905    let n = partition_config.num_flow_partitions;
2906    tracing::info!(
2907        partitions = n,
2908        concurrency = BOOT_INIT_CONCURRENCY,
2909        "installing waitpoint HMAC secret across {n} execution partitions"
2910    );
2911
2912    let mut mismatch_count: u16 = 0;
2913    let mut repaired_count: u16 = 0;
2914    let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
2915    let mut next_index: u16 = 0;
2916
2917    loop {
2918        while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
2919            let partition = Partition {
2920                family: PartitionFamily::Execution,
2921                index: next_index,
2922            };
2923            let client = client.clone();
2924            let secret_hex = secret_hex.to_owned();
2925            pending.push(async move {
2926                init_one_partition(&client, partition, &secret_hex).await
2927            });
2928            next_index += 1;
2929        }
2930        match pending.next().await {
2931            Some(res) => match res? {
2932                PartitionBootOutcome::Match | PartitionBootOutcome::Installed => {}
2933                PartitionBootOutcome::Mismatch => mismatch_count += 1,
2934                PartitionBootOutcome::Repaired => repaired_count += 1,
2935            },
2936            None => break,
2937        }
2938    }
2939
2940    if repaired_count > 0 {
2941        tracing::warn!(
2942            repaired_partitions = repaired_count,
2943            total_partitions = n,
2944            "repaired {repaired_count} partitions with torn waitpoint HMAC writes \
2945             (current_kid present but secret:<kid> missing, likely crash during prior boot)"
2946        );
2947    }
2948
2949    if mismatch_count > 0 {
2950        tracing::warn!(
2951            mismatched_partitions = mismatch_count,
2952            total_partitions = n,
2953            "stored/env secret mismatch on {mismatch_count} partitions — \
2954             env FF_WAITPOINT_HMAC_SECRET ignored in favor of stored values; \
2955             run POST /v1/admin/rotate-waitpoint-secret to sync"
2956        );
2957    }
2958
2959    tracing::info!(partitions = n, "waitpoint HMAC secret install complete");
2960    Ok(())
2961}
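
// Note: the refill loop above is a hand-rolled equivalent of
// `stream::iter(0..n).map(|i| init_one_partition(…)).buffer_unordered(BOOT_INIT_CONCURRENCY)`;
// it stays explicit so the fail-fast `res?` exit is visible at a glance.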
2962
2963/// Result of a waitpoint HMAC secret rotation across all execution partitions.
2964#[derive(Debug, Clone, serde::Serialize)]
2965pub struct RotateWaitpointSecretResult {
2966    /// Count of partitions that accepted the rotation.
2967    pub rotated: u16,
2968    /// Partition indices that failed — operator should investigate (Valkey
2969    /// outage, auth failure, cluster split). Rotation is idempotent, so a
2970    /// re-run after the underlying fault clears converges to the correct
2971    /// state.
2972    pub failed: Vec<u16>,
2973    /// New kid installed as current.
2974    pub new_kid: String,
2975}
2976
2977impl Server {
2978    /// Rotate the waitpoint HMAC secret. Promotes the current kid to previous
2979    /// (accepted within `FF_WAITPOINT_HMAC_GRACE_MS`), installs `new_secret_hex`
2980    /// as the new current kid. Idempotent: re-running with the same `new_kid`
2981    /// and `new_secret_hex` converges partitions to the same state.
2982    ///
2983    /// Returns a structured result so operators can see which partitions failed.
2984    /// HTTP layer returns 200 if any partition succeeded, 500 only if all fail.
2985    pub async fn rotate_waitpoint_secret(
2986        &self,
2987        new_kid: &str,
2988        new_secret_hex: &str,
2989    ) -> Result<RotateWaitpointSecretResult, ServerError> {
2990        if new_kid.is_empty() || new_kid.contains(':') {
2991            return Err(ServerError::OperationFailed(
2992                "new_kid must be non-empty and must not contain ':'".into(),
2993            ));
2994        }
2995        if new_secret_hex.is_empty()
2996            || !new_secret_hex.len().is_multiple_of(2)
2997            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
2998        {
2999            return Err(ServerError::OperationFailed(
3000                "new_secret_hex must be a non-empty even-length hex string".into(),
3001            ));
3002        }
3003
3004        // Single-writer gate. Concurrent rotates against the SAME operator
3005        // token are an attack pattern (or a retry-loop bug); legitimate
3006        // operators rotate monthly and can afford to serialize. Contention
3007        // returns ConcurrencyLimitExceeded("admin_rotate", 1) (→ HTTP 429
3008        // with a labelled error body) rather than queueing the HTTP
3009        // handler past the 120s endpoint timeout. Permit is held for
3010        // the full partition fan-out.
3011        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
3012            Ok(p) => p,
3013            Err(tokio::sync::TryAcquireError::NoPermits) => {
3014                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
3015            }
3016            Err(tokio::sync::TryAcquireError::Closed) => {
3017                return Err(ServerError::OperationFailed(
3018                    "admin rotate semaphore closed (server shutting down)".into(),
3019                ));
3020            }
3021        };
3022
3023        let n = self.config.partition_config.num_flow_partitions;
3024        // "now" is derived inside the FCALL from `redis.call("TIME")`
3025        // (consistency with validate_waitpoint_token and flow scanners);
3026        // grace_ms is a duration — safe to carry from config.
3027        let grace_ms = self.config.waitpoint_hmac_grace_ms;
3028
3029        // Parallelize the rotation fan-out with the same bounded
3030        // concurrency as boot init (BOOT_INIT_CONCURRENCY = 16). A 256-
3031        // partition sequential rotation takes ~7.7s at 30ms cross-AZ RTT,
3032        // uncomfortably close to the 120s HTTP endpoint timeout under
3033        // contention. Atomicity per partition now lives inside the
3034        // `ff_rotate_waitpoint_hmac_secret` FCALL (FCALL is atomic per
3035        // shard); parallelism across DIFFERENT partitions is safe. The
3036        // outer `admin_rotate_semaphore(1)` bounds server-wide concurrent
3037        // rotations, so this fan-out only affects a single in-flight
3038        // rotate call at a time.
3039        use futures::stream::{FuturesUnordered, StreamExt};
3040
3041        let mut rotated = 0u16;
3042        let mut failed = Vec::new();
3043        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
3044        let mut next_index: u16 = 0;
3045
3046        loop {
3047            while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
3048                let partition = Partition {
3049                    family: PartitionFamily::Execution,
3050                    index: next_index,
3051                };
3052                let idx = next_index;
3053                // Clone only what the per-partition future needs. These
3054                // futures borrow `self`, so `'static` is not the constraint
3055                // here; owning the strings simply keeps each per-partition
3056                // future self-contained instead of threading shared borrows
                // through the FuturesUnordered stream.
3057                let new_kid_owned = new_kid.to_owned();
3058                let new_secret_owned = new_secret_hex.to_owned();
3059                let partition_owned = partition;
3060                let fut = async move {
3061                    let outcome = self
3062                        .rotate_single_partition(
3063                            &partition_owned,
3064                            &new_kid_owned,
3065                            &new_secret_owned,
3066                            grace_ms,
3067                        )
3068                        .await;
3069                    (idx, partition_owned, outcome)
3070                };
3071                pending.push(fut);
3072                next_index += 1;
3073            }
3074            match pending.next().await {
3075                Some((idx, partition, outcome)) => match outcome {
3076                    Ok(()) => {
3077                        rotated += 1;
3078                        // Per-partition event → DEBUG (not INFO). Rationale:
3079                        // one rotate endpoint call produces 256 partition-level
3080                        // events, which would blow up paid aggregator budgets
3081                        // (Datadog/Splunk) at no operational value. The single
3082                        // aggregated audit event below is the compliance
3083                        // artifact. Failures stay at ERROR with per-partition
3084                        // detail — that's where operators need it.
3085                        tracing::debug!(
3086                            partition = %partition,
3087                            new_kid = %new_kid,
3088                            "waitpoint_hmac_rotated"
3089                        );
3090                    }
3091                    Err(e) => {
3092                        // Failures stay at ERROR (target=audit) per-partition —
3093                        // operators need the partition index + error to debug
3094                        // Valkey/config faults. Low cardinality in practice.
3095                        tracing::error!(
3096                            target: "audit",
3097                            partition = %partition,
3098                            err = %e,
3099                            "waitpoint_hmac_rotation_failed"
3100                        );
3101                        failed.push(idx);
3102                    }
3103                },
3104                None => break,
3105            }
3106        }
3107
3108        // Single aggregated audit event for the whole rotation. This is
3109        // the load-bearing compliance artifact — operators alert on
3110        // target="audit" at INFO level and this is the stable schema.
3111        tracing::info!(
3112            target: "audit",
3113            new_kid = %new_kid,
3114            total_partitions = n,
3115            rotated,
3116            failed_count = failed.len(),
3117            "waitpoint_hmac_rotation_complete"
3118        );
3119
3120        Ok(RotateWaitpointSecretResult {
3121            rotated,
3122            failed,
3123            new_kid: new_kid.to_owned(),
3124        })
3125    }
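
    // Operator-side sketch (endpoint path from the mismatch warning above;
    // the JSON field names are assumptions, not a documented schema):
    //
    //   POST /v1/admin/rotate-waitpoint-secret
    //   { "new_kid": "k2", "new_secret_hex": "<64 hex chars>" }
    //
    // 200 as long as any partition rotated; re-run with the same payload
    // once a fault clears — rotation is idempotent per the docstring above.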
3126
3127    /// Rotate on a single partition by dispatching the
3128    /// `ff_rotate_waitpoint_hmac_secret` FCALL. FCALL is atomic per shard,
3129    /// so no external SETNX lock is needed — the script itself IS the
3130    /// atomicity boundary. Single source of truth (Lua); the Rust
3131    /// implementation that previously lived here was an exact duplicate
3132    /// and has been removed.
3133    async fn rotate_single_partition(
3134        &self,
3135        partition: &Partition,
3136        new_kid: &str,
3137        new_secret_hex: &str,
3138        grace_ms: u64,
3139    ) -> Result<(), ServerError> {
3140        let idx = IndexKeys::new(partition);
3141        let args = RotateWaitpointHmacSecretArgs {
3142            new_kid: new_kid.to_owned(),
3143            new_secret_hex: new_secret_hex.to_owned(),
3144            grace_ms,
3145        };
3146        let outcome = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
3147            &self.client,
3148            &idx,
3149            &args,
3150        )
3151        .await
3152        .map_err(|e| match e {
3153            // Same kid + different secret. Map to the same 409-style
3154            // error the old Rust path returned so HTTP callers keep the
3155            // current surface.
3156            ff_script::ScriptError::RotationConflict(kid) => {
3157                ServerError::OperationFailed(format!(
3158                    "rotation conflict: kid {kid} already installed with a \
3159                     different secret. Either use a fresh kid or restore the \
3160                     original secret for this kid before retrying."
3161                ))
3162            }
3163            ff_script::ScriptError::Valkey(v) => ServerError::ValkeyContext {
3164                source: v,
3165                context: format!("FCALL ff_rotate_waitpoint_hmac_secret partition={partition}"),
3166            },
3167            other => ServerError::OperationFailed(format!(
3168                "rotation failed on partition {partition}: {other}"
3169            )),
3170        })?;
3171        // Either outcome is a successful write from the operator's POV.
3172        // Rotated → new install; Noop → idempotent replay.
3173        let _ = outcome;
3174        Ok(())
3175    }
3176}
3177
3178// ── FCALL result parsing ──
3179
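// Shared reply convention decoded by the parsers below: each ff_* FCALL
// returns a flat array — element 0 an Int status (1 = success), element 1
// a sub-status or error-code string, trailing elements positional payload.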
3180fn parse_create_result(
3181    raw: &Value,
3182    execution_id: &ExecutionId,
3183) -> Result<CreateExecutionResult, ServerError> {
3184    let arr = match raw {
3185        Value::Array(arr) => arr,
3186        _ => return Err(ServerError::Script("ff_create_execution: expected Array".into())),
3187    };
3188
3189    let status = match arr.first() {
3190        Some(Ok(Value::Int(n))) => *n,
3191        _ => return Err(ServerError::Script("ff_create_execution: bad status code".into())),
3192    };
3193
3194    if status == 1 {
3195        // Check sub-status: OK or DUPLICATE
3196        let sub = arr
3197            .get(1)
3198            .and_then(|v| match v {
3199                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3200                Ok(Value::SimpleString(s)) => Some(s.clone()),
3201                _ => None,
3202            })
3203            .unwrap_or_default();
3204
3205        if sub == "DUPLICATE" {
3206            Ok(CreateExecutionResult::Duplicate {
3207                execution_id: execution_id.clone(),
3208            })
3209        } else {
3210            Ok(CreateExecutionResult::Created {
3211                execution_id: execution_id.clone(),
3212                public_state: PublicState::Waiting,
3213            })
3214        }
3215    } else {
3216        let error_code = arr
3217            .get(1)
3218            .and_then(|v| match v {
3219                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3220                Ok(Value::SimpleString(s)) => Some(s.clone()),
3221                _ => None,
3222            })
3223            .unwrap_or_else(|| "unknown".to_owned());
3224        Err(ServerError::OperationFailed(format!(
3225            "ff_create_execution failed: {error_code}"
3226        )))
3227    }
3228}
3229
3230fn parse_cancel_result(
3231    raw: &Value,
3232    execution_id: &ExecutionId,
3233) -> Result<CancelExecutionResult, ServerError> {
3234    let arr = match raw {
3235        Value::Array(arr) => arr,
3236        _ => return Err(ServerError::Script("ff_cancel_execution: expected Array".into())),
3237    };
3238
3239    let status = match arr.first() {
3240        Some(Ok(Value::Int(n))) => *n,
3241        _ => return Err(ServerError::Script("ff_cancel_execution: bad status code".into())),
3242    };
3243
3244    if status == 1 {
3245        Ok(CancelExecutionResult::Cancelled {
3246            execution_id: execution_id.clone(),
3247            public_state: PublicState::Cancelled,
3248        })
3249    } else {
3250        let error_code = arr
3251            .get(1)
3252            .and_then(|v| match v {
3253                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3254                Ok(Value::SimpleString(s)) => Some(s.clone()),
3255                _ => None,
3256            })
3257            .unwrap_or_else(|| "unknown".to_owned());
3258        Err(ServerError::OperationFailed(format!(
3259            "ff_cancel_execution failed: {error_code}"
3260        )))
3261    }
3262}
3263
3264fn parse_budget_create_result(
3265    raw: &Value,
3266    budget_id: &BudgetId,
3267) -> Result<CreateBudgetResult, ServerError> {
3268    let arr = match raw {
3269        Value::Array(arr) => arr,
3270        _ => return Err(ServerError::Script("ff_create_budget: expected Array".into())),
3271    };
3272
3273    let status = match arr.first() {
3274        Some(Ok(Value::Int(n))) => *n,
3275        _ => return Err(ServerError::Script("ff_create_budget: bad status code".into())),
3276    };
3277
3278    if status == 1 {
3279        let sub = arr
3280            .get(1)
3281            .and_then(|v| match v {
3282                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3283                Ok(Value::SimpleString(s)) => Some(s.clone()),
3284                _ => None,
3285            })
3286            .unwrap_or_default();
3287
3288        if sub == "ALREADY_SATISFIED" {
3289            Ok(CreateBudgetResult::AlreadySatisfied {
3290                budget_id: budget_id.clone(),
3291            })
3292        } else {
3293            Ok(CreateBudgetResult::Created {
3294                budget_id: budget_id.clone(),
3295            })
3296        }
3297    } else {
3298        let error_code = arr
3299            .get(1)
3300            .and_then(|v| match v {
3301                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3302                Ok(Value::SimpleString(s)) => Some(s.clone()),
3303                _ => None,
3304            })
3305            .unwrap_or_else(|| "unknown".to_owned());
3306        Err(ServerError::OperationFailed(format!(
3307            "ff_create_budget failed: {error_code}"
3308        )))
3309    }
3310}
3311
3312fn parse_quota_create_result(
3313    raw: &Value,
3314    quota_policy_id: &QuotaPolicyId,
3315) -> Result<CreateQuotaPolicyResult, ServerError> {
3316    let arr = match raw {
3317        Value::Array(arr) => arr,
3318        _ => return Err(ServerError::Script("ff_create_quota_policy: expected Array".into())),
3319    };
3320
3321    let status = match arr.first() {
3322        Some(Ok(Value::Int(n))) => *n,
3323        _ => return Err(ServerError::Script("ff_create_quota_policy: bad status code".into())),
3324    };
3325
3326    if status == 1 {
3327        let sub = arr
3328            .get(1)
3329            .and_then(|v| match v {
3330                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3331                Ok(Value::SimpleString(s)) => Some(s.clone()),
3332                _ => None,
3333            })
3334            .unwrap_or_default();
3335
3336        if sub == "ALREADY_SATISFIED" {
3337            Ok(CreateQuotaPolicyResult::AlreadySatisfied {
3338                quota_policy_id: quota_policy_id.clone(),
3339            })
3340        } else {
3341            Ok(CreateQuotaPolicyResult::Created {
3342                quota_policy_id: quota_policy_id.clone(),
3343            })
3344        }
3345    } else {
3346        let error_code = arr
3347            .get(1)
3348            .and_then(|v| match v {
3349                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3350                Ok(Value::SimpleString(s)) => Some(s.clone()),
3351                _ => None,
3352            })
3353            .unwrap_or_else(|| "unknown".to_owned());
3354        Err(ServerError::OperationFailed(format!(
3355            "ff_create_quota_policy failed: {error_code}"
3356        )))
3357    }
3358}
3359
3360// ── Flow FCALL result parsing ──
3361
3362fn parse_create_flow_result(
3363    raw: &Value,
3364    flow_id: &FlowId,
3365) -> Result<CreateFlowResult, ServerError> {
3366    let arr = match raw {
3367        Value::Array(arr) => arr,
3368        _ => return Err(ServerError::Script("ff_create_flow: expected Array".into())),
3369    };
3370    let status = match arr.first() {
3371        Some(Ok(Value::Int(n))) => *n,
3372        _ => return Err(ServerError::Script("ff_create_flow: bad status code".into())),
3373    };
3374    if status == 1 {
3375        let sub = fcall_field_str(arr, 1);
3376        if sub == "ALREADY_SATISFIED" {
3377            Ok(CreateFlowResult::AlreadySatisfied {
3378                flow_id: flow_id.clone(),
3379            })
3380        } else {
3381            Ok(CreateFlowResult::Created {
3382                flow_id: flow_id.clone(),
3383            })
3384        }
3385    } else {
3386        let error_code = fcall_field_str(arr, 1);
3387        Err(ServerError::OperationFailed(format!(
3388            "ff_create_flow failed: {error_code}"
3389        )))
3390    }
3391}
3392
3393fn parse_add_execution_to_flow_result(
3394    raw: &Value,
3395) -> Result<AddExecutionToFlowResult, ServerError> {
3396    let arr = match raw {
3397        Value::Array(arr) => arr,
3398        _ => {
3399            return Err(ServerError::Script(
3400                "ff_add_execution_to_flow: expected Array".into(),
3401            ))
3402        }
3403    };
3404    let status = match arr.first() {
3405        Some(Ok(Value::Int(n))) => *n,
3406        _ => {
3407            return Err(ServerError::Script(
3408                "ff_add_execution_to_flow: bad status code".into(),
3409            ))
3410        }
3411    };
3412    if status == 1 {
3413        let sub = fcall_field_str(arr, 1);
3414        let eid_str = fcall_field_str(arr, 2);
3415        let nc_str = fcall_field_str(arr, 3);
3416        let eid = ExecutionId::parse(&eid_str)
3417            .map_err(|e| ServerError::Script(format!("bad execution_id: {e}")))?;
3418        let nc: u32 = nc_str.parse().unwrap_or(0);
3419        if sub == "ALREADY_SATISFIED" {
3420            Ok(AddExecutionToFlowResult::AlreadyMember {
3421                execution_id: eid,
3422                node_count: nc,
3423            })
3424        } else {
3425            Ok(AddExecutionToFlowResult::Added {
3426                execution_id: eid,
3427                new_node_count: nc,
3428            })
3429        }
3430    } else {
3431        let error_code = fcall_field_str(arr, 1);
3432        Err(ServerError::OperationFailed(format!(
3433            "ff_add_execution_to_flow failed: {error_code}"
3434        )))
3435    }
3436}
3437
3438/// Outcome of parsing a raw `ff_cancel_flow` FCALL response.
3439///
3440/// Keeps `AlreadyTerminal` distinct from other script errors so the caller
3441/// can treat cancel on an already-cancelled/completed/failed flow as
3442/// idempotent success instead of surfacing a 400 to the client.
3443enum ParsedCancelFlow {
3444    Cancelled {
3445        policy: String,
3446        member_execution_ids: Vec<String>,
3447    },
3448    AlreadyTerminal,
3449}
3450
3451/// Parse the raw `ff_cancel_flow` FCALL response.
3452///
3453/// Returns [`ParsedCancelFlow::Cancelled`] on success, [`ParsedCancelFlow::AlreadyTerminal`]
3454/// when the flow was already in a terminal state (idempotent retry), or a
3455/// [`ServerError`] for any other failure.
3456fn parse_cancel_flow_raw(raw: &Value) -> Result<ParsedCancelFlow, ServerError> {
3457    let arr = match raw {
3458        Value::Array(arr) => arr,
3459        _ => return Err(ServerError::Script("ff_cancel_flow: expected Array".into())),
3460    };
3461    let status = match arr.first() {
3462        Some(Ok(Value::Int(n))) => *n,
3463        _ => return Err(ServerError::Script("ff_cancel_flow: bad status code".into())),
3464    };
3465    if status != 1 {
3466        let error_code = fcall_field_str(arr, 1);
3467        if error_code == "flow_already_terminal" {
3468            return Ok(ParsedCancelFlow::AlreadyTerminal);
3469        }
3470        return Err(ServerError::OperationFailed(format!(
3471            "ff_cancel_flow failed: {error_code}"
3472        )));
3473    }
3474    // {1, "OK", cancellation_policy, member1, member2, ...}
3475    let policy = fcall_field_str(arr, 2);
3476    // Iterate to arr.len() rather than breaking on the first empty string —
3477    // safer against malformed Lua responses and clearer than a sentinel loop.
3478    let mut members = Vec::with_capacity(arr.len().saturating_sub(3));
3479    for i in 3..arr.len() {
3480        members.push(fcall_field_str(arr, i));
3481    }
3482    Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids: members })
3483}
3484
3485fn parse_stage_dependency_edge_result(
3486    raw: &Value,
3487) -> Result<StageDependencyEdgeResult, ServerError> {
3488    let arr = match raw {
3489        Value::Array(arr) => arr,
3490        _ => return Err(ServerError::Script("ff_stage_dependency_edge: expected Array".into())),
3491    };
3492    let status = match arr.first() {
3493        Some(Ok(Value::Int(n))) => *n,
3494        _ => return Err(ServerError::Script("ff_stage_dependency_edge: bad status code".into())),
3495    };
3496    if status == 1 {
3497        let edge_id_str = fcall_field_str(arr, 2);
3498        let rev_str = fcall_field_str(arr, 3);
3499        let edge_id = EdgeId::parse(&edge_id_str)
3500            .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
3501        let rev: u64 = rev_str.parse().unwrap_or(0);
3502        Ok(StageDependencyEdgeResult::Staged {
3503            edge_id,
3504            new_graph_revision: rev,
3505        })
3506    } else {
3507        let error_code = fcall_field_str(arr, 1);
3508        Err(ServerError::OperationFailed(format!(
3509            "ff_stage_dependency_edge failed: {error_code}"
3510        )))
3511    }
3512}
3513
3514fn parse_apply_dependency_result(
3515    raw: &Value,
3516) -> Result<ApplyDependencyToChildResult, ServerError> {
3517    let arr = match raw {
3518        Value::Array(arr) => arr,
3519        _ => return Err(ServerError::Script("ff_apply_dependency_to_child: expected Array".into())),
3520    };
3521    let status = match arr.first() {
3522        Some(Ok(Value::Int(n))) => *n,
3523        _ => return Err(ServerError::Script("ff_apply_dependency_to_child: bad status code".into())),
3524    };
3525    if status == 1 {
3526        let sub = fcall_field_str(arr, 1);
3527        if sub == "ALREADY_APPLIED" || sub == "already_applied" {
3528            Ok(ApplyDependencyToChildResult::AlreadyApplied)
3529        } else {
3530            // OK status — field at index 2 is unsatisfied count
3531            let count_str = fcall_field_str(arr, 2);
3532            let count: u32 = count_str.parse().unwrap_or(0);
3533            Ok(ApplyDependencyToChildResult::Applied {
3534                unsatisfied_count: count,
3535            })
3536        }
3537    } else {
3538        let error_code = fcall_field_str(arr, 1);
3539        Err(ServerError::OperationFailed(format!(
3540            "ff_apply_dependency_to_child failed: {error_code}"
3541        )))
3542    }
3543}
3544
3545fn parse_deliver_signal_result(
3546    raw: &Value,
3547    signal_id: &SignalId,
3548) -> Result<DeliverSignalResult, ServerError> {
3549    let arr = match raw {
3550        Value::Array(arr) => arr,
3551        _ => return Err(ServerError::Script("ff_deliver_signal: expected Array".into())),
3552    };
3553    let status = match arr.first() {
3554        Some(Ok(Value::Int(n))) => *n,
3555        _ => return Err(ServerError::Script("ff_deliver_signal: bad status code".into())),
3556    };
3557    if status == 1 {
3558        let sub = fcall_field_str(arr, 1);
3559        if sub == "DUPLICATE" {
3560            // ok_duplicate(existing_signal_id) → {1, "DUPLICATE", existing_signal_id}
3561            let existing_str = fcall_field_str(arr, 2);
3562            let existing_id = SignalId::parse(&existing_str).unwrap_or_else(|_| signal_id.clone());
3563            Ok(DeliverSignalResult::Duplicate {
3564                existing_signal_id: existing_id,
3565            })
3566        } else {
3567            // ok(signal_id, effect) → {1, "OK", signal_id, effect}
3568            let effect = fcall_field_str(arr, 3);
3569            Ok(DeliverSignalResult::Accepted {
3570                signal_id: signal_id.clone(),
3571                effect,
3572            })
3573        }
3574    } else {
3575        let error_code = fcall_field_str(arr, 1);
3576        Err(ServerError::OperationFailed(format!(
3577            "ff_deliver_signal failed: {error_code}"
3578        )))
3579    }
3580}
3581
3582fn parse_change_priority_result(
3583    raw: &Value,
3584    execution_id: &ExecutionId,
3585) -> Result<ChangePriorityResult, ServerError> {
3586    let arr = match raw {
3587        Value::Array(arr) => arr,
3588        _ => return Err(ServerError::Script("ff_change_priority: expected Array".into())),
3589    };
3590    let status = match arr.first() {
3591        Some(Ok(Value::Int(n))) => *n,
3592        _ => return Err(ServerError::Script("ff_change_priority: bad status code".into())),
3593    };
3594    if status == 1 {
3595        Ok(ChangePriorityResult::Changed {
3596            execution_id: execution_id.clone(),
3597        })
3598    } else {
3599        let error_code = fcall_field_str(arr, 1);
3600        Err(ServerError::OperationFailed(format!(
3601            "ff_change_priority failed: {error_code}"
3602        )))
3603    }
3604}
3605
3606fn parse_replay_result(raw: &Value) -> Result<ReplayExecutionResult, ServerError> {
3607    let arr = match raw {
3608        Value::Array(arr) => arr,
3609        _ => return Err(ServerError::Script("ff_replay_execution: expected Array".into())),
3610    };
3611    let status = match arr.first() {
3612        Some(Ok(Value::Int(n))) => *n,
3613        _ => return Err(ServerError::Script("ff_replay_execution: bad status code".into())),
3614    };
3615    if status == 1 {
3616        // ok("0") for normal replay, ok(N) for skipped flow member
3617        let unsatisfied = fcall_field_str(arr, 2);
3618        let ps = if unsatisfied == "0" {
3619            PublicState::Waiting
3620        } else {
3621            PublicState::WaitingChildren
3622        };
3623        Ok(ReplayExecutionResult::Replayed { public_state: ps })
3624    } else {
3625        let error_code = fcall_field_str(arr, 1);
3626        Err(ServerError::OperationFailed(format!(
3627            "ff_replay_execution failed: {error_code}"
3628        )))
3629    }
3630}
3631
3633/// Convert a `ScriptError` into a `ServerError` preserving `ferriskey::ErrorKind`
3634/// for transport-level variants. Business-logic variants keep their code as
3635/// `ServerError::Script(String)` so HTTP clients see a stable message.
3636///
3637/// Why this exists: before R2, the stream handlers did
3638/// `ScriptError → format!() → ServerError::Script(String)`, which erased
3639/// the ErrorKind and made `ServerError::is_retryable()` always return
3640/// false. Retry-capable clients (cairn-fabric) would not retry a legit
3641/// transient error like `IoError`.
3642fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
3643    match e {
3644        ff_script::error::ScriptError::Valkey(valkey_err) => ServerError::ValkeyContext {
3645            source: valkey_err,
3646            context: "stream FCALL transport".into(),
3647        },
3648        other => ServerError::Script(other.to_string()),
3649    }
3650}
3651
/// Extract a string from an FCALL result array at the given index.
/// Ints are stringified; missing or non-string fields yield "".
3652fn fcall_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
3653    match arr.get(index) {
3654        Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
3655        Some(Ok(Value::SimpleString(s))) => s.clone(),
3656        Some(Ok(Value::Int(n))) => n.to_string(),
3657        _ => String::new(),
3658    }
3659}
3660
3661/// Parse ff_report_usage_and_check result.
3662/// Standard format: {1, "OK"}, {1, "SOFT_BREACH", dim, current, limit},
3663///                  {1, "HARD_BREACH", dim, current, limit}, {1, "ALREADY_APPLIED"}
3664fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, ServerError> {
3665    let arr = match raw {
3666        Value::Array(arr) => arr,
3667        _ => return Err(ServerError::Script("ff_report_usage_and_check: expected Array".into())),
3668    };
3669    let status_code = match arr.first() {
3670        Some(Ok(Value::Int(n))) => *n,
3671        _ => {
3672            return Err(ServerError::Script(
3673                "ff_report_usage_and_check: expected Int status code".into(),
3674            ))
3675        }
3676    };
3677    if status_code != 1 {
3678        let error_code = fcall_field_str(arr, 1);
3679        return Err(ServerError::OperationFailed(format!(
3680            "ff_report_usage_and_check failed: {error_code}"
3681        )));
3682    }
3683    let sub_status = fcall_field_str(arr, 1);
3684    match sub_status.as_str() {
3685        "OK" => Ok(ReportUsageResult::Ok),
3686        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
3687        "SOFT_BREACH" => {
3688            let dim = fcall_field_str(arr, 2);
3689            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3690            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3691            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
3692        }
3693        "HARD_BREACH" => {
3694            let dim = fcall_field_str(arr, 2);
3695            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3696            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3697            Ok(ReportUsageResult::HardBreach {
3698                dimension: dim,
3699                current_usage: current,
3700                hard_limit: limit,
3701            })
3702        }
3703        _ => Err(ServerError::OperationFailed(format!(
3704            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
3705        ))),
3706    }
3707}
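
// Parser-level test sketch for the usage reply decoding above. Assumes
// `Value::Array` wraps `Vec<Result<Value, ferriskey::Error>>`, which is
// what the match arms in this file imply; the "cpu_ms" dimension and the
// "quota_missing" error code are invented fixture values.
#[cfg(test)]
mod report_usage_parse_tests {
    use super::*;

    fn reply(items: Vec<Value>) -> Value {
        Value::Array(items.into_iter().map(Ok).collect())
    }

    #[test]
    fn soft_breach_carries_dimension_and_counters() {
        let raw = reply(vec![
            Value::Int(1),
            Value::SimpleString("SOFT_BREACH".into()),
            Value::SimpleString("cpu_ms".into()),
            Value::SimpleString("900".into()),
            Value::SimpleString("1000".into()),
        ]);
        match parse_report_usage_result(&raw) {
            Ok(ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit }) => {
                assert_eq!(dimension, "cpu_ms");
                assert_eq!(current_usage, 900);
                assert_eq!(soft_limit, 1000);
            }
            _ => panic!("expected SoftBreach"),
        }
    }

    #[test]
    fn non_one_status_maps_to_operation_failed() {
        let raw = reply(vec![Value::Int(0), Value::SimpleString("quota_missing".into())]);
        assert!(parse_report_usage_result(&raw).is_err());
    }
}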
3708
3709fn parse_revoke_lease_result(raw: &Value) -> Result<RevokeLeaseResult, ServerError> {
3710    let arr = match raw {
3711        Value::Array(arr) => arr,
3712        _ => return Err(ServerError::Script("ff_revoke_lease: expected Array".into())),
3713    };
3714    let status = match arr.first() {
3715        Some(Ok(Value::Int(n))) => *n,
3716        _ => return Err(ServerError::Script("ff_revoke_lease: bad status code".into())),
3717    };
3718    if status == 1 {
3719        let sub = fcall_field_str(arr, 1);
3720        if sub == "ALREADY_SATISFIED" {
3721            let reason = fcall_field_str(arr, 2);
3722            Ok(RevokeLeaseResult::AlreadySatisfied { reason })
3723        } else {
3724            let lid = fcall_field_str(arr, 2);
3725            let epoch = fcall_field_str(arr, 3);
3726            Ok(RevokeLeaseResult::Revoked {
3727                lease_id: lid,
3728                lease_epoch: epoch,
3729            })
3730        }
3731    } else {
3732        let error_code = fcall_field_str(arr, 1);
3733        Err(ServerError::OperationFailed(format!(
3734            "ff_revoke_lease failed: {error_code}"
3735        )))
3736    }
3737}

/// Detect Valkey errors indicating the Lua function library is not loaded.
///
/// After a failover, the new primary may not have the library if replication
/// was incomplete. Valkey returns `ERR Function not loaded` for FCALL calls
/// targeting missing functions.
fn is_function_not_loaded(e: &ferriskey::Error) -> bool {
    if matches!(e.kind(), ferriskey::ErrorKind::NoScriptError) {
        return true;
    }
    e.detail()
        .map(|d| {
            d.contains("Function not loaded")
                || d.contains("No matching function")
                || d.contains("function not found")
        })
        .unwrap_or(false)
        || e.to_string().contains("Function not loaded")
}
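
// Sketch check for the detector above, reusing the synthetic-error
// constructor pattern from the tests module at the bottom of this file.
// Assumes the "synthetic" detail text contains none of the probed
// substrings, so only the error kind drives each assertion.
#[cfg(test)]
#[test]
fn function_not_loaded_detects_noscript_kind() {
    let e = ferriskey::Error::from((ferriskey::ErrorKind::NoScriptError, "synthetic"));
    assert!(is_function_not_loaded(&e));

    let e = ferriskey::Error::from((ferriskey::ErrorKind::ReadOnly, "synthetic"));
    assert!(!is_function_not_loaded(&e));
}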

/// Free-function form of [`Server::fcall_with_reload`] — callable from
/// background tasks that own a cloned `Client` but no `&Server`.
async fn fcall_with_reload_on_client(
    client: &Client,
    function: &str,
    keys: &[&str],
    args: &[&str],
) -> Result<Value, ServerError> {
    match client.fcall(function, keys, args).await {
        Ok(v) => Ok(v),
        Err(e) if is_function_not_loaded(&e) => {
            tracing::warn!(function, "Lua library not found on server, reloading");
            ff_script::loader::ensure_library(client)
                .await
                .map_err(ServerError::LibraryLoad)?;
            client
                .fcall(function, keys, args)
                .await
                .map_err(ServerError::Valkey)
        }
        Err(e) => Err(ServerError::Valkey(e)),
    }
}
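
// Usage sketch: how a background task holding a cloned `Client` would call
// the free function. `ff_example_probe`, its key, and its argument are
// hypothetical placeholders, not real FlowFabric Lua entry points; the point
// is that the reload-on-failover retry comes for free.
#[allow(dead_code)]
async fn example_background_fcall(client: &Client) -> Result<(), ServerError> {
    let raw = fcall_with_reload_on_client(
        client,
        "ff_example_probe",        // hypothetical Lua function name
        &["ff:{p0}:example"],      // hypothetical single key
        &["arg0"],
    )
    .await?;
    // Callers then hand `raw` to a function-specific parser, as the
    // parse_* helpers in this file do.
    let _ = raw;
    Ok(())
}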

/// Build the `ff_cancel_execution` KEYS (21) and ARGV (5) by pre-reading
/// dynamic fields from `exec_core`. Shared by [`Server::cancel_execution`]
/// and the async cancel_flow member-dispatch path.
async fn build_cancel_execution_fcall(
    client: &Client,
    partition_config: &PartitionConfig,
    args: &CancelExecutionArgs,
) -> Result<(Vec<String>, Vec<String>), ServerError> {
    let partition = execution_partition(&args.execution_id, partition_config);
    let ctx = ExecKeyContext::new(&partition, &args.execution_id);
    let idx = IndexKeys::new(&partition);

    let lane_str: Option<String> = client
        .hget(&ctx.core(), "lane_id")
        .await
        .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
    let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));

    let dyn_fields: Vec<Option<String>> = client
        .cmd("HMGET")
        .arg(ctx.core())
        .arg("current_attempt_index")
        .arg("current_waitpoint_id")
        .arg("current_worker_instance_id")
        .execute()
        .await
        .map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET cancel pre-read".into() })?;

    let att_idx_val = dyn_fields.first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse::<u32>().ok())
        .unwrap_or(0);
    let att_idx = AttemptIndex::new(att_idx_val);
    let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
    let wp_id = if wp_id_str.is_empty() {
        WaitpointId::new()
    } else {
        WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
    };
    let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
    let wiid = WorkerInstanceId::new(&wiid_str);

    let keys: Vec<String> = vec![
        ctx.core(),                              // 1
        ctx.attempt_hash(att_idx),               // 2
        ctx.stream_meta(att_idx),                // 3
        ctx.lease_current(),                     // 4
        ctx.lease_history(),                     // 5
        idx.lease_expiry(),                      // 6
        idx.worker_leases(&wiid),                // 7
        ctx.suspension_current(),                // 8
        ctx.waitpoint(&wp_id),                   // 9
        ctx.waitpoint_condition(&wp_id),         // 10
        idx.suspension_timeout(),                // 11
        idx.lane_terminal(&lane),                // 12
        idx.attempt_timeout(),                   // 13
        idx.execution_deadline(),                // 14
        idx.lane_eligible(&lane),                // 15
        idx.lane_delayed(&lane),                 // 16
        idx.lane_blocked_dependencies(&lane),    // 17
        idx.lane_blocked_budget(&lane),          // 18
        idx.lane_blocked_quota(&lane),           // 19
        idx.lane_blocked_route(&lane),           // 20
        idx.lane_blocked_operator(&lane),        // 21
    ];
    let argv: Vec<String> = vec![
        args.execution_id.to_string(),
        args.reason.clone(),
        args.source.to_string(),
        args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
        args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
    ];
    Ok((keys, argv))
}

/// Backoff schedule for transient Valkey errors during async cancel_flow
/// dispatch. The array length is the total attempt count (the initial
/// attempt plus retries); the final entry is never slept on because it
/// corresponds to the last attempt.
const CANCEL_MEMBER_RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2_000];
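
// Worked example of the schedule above: three attempts total. Two
// consecutive transient failures cost 100 ms + 500 ms = 600 ms of backoff;
// the 2_000 ms entry is never slept on because the third attempt is final.
#[cfg(test)]
#[test]
fn cancel_member_retry_schedule_shape() {
    let total_attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
    assert_eq!(total_attempts, 3);
    // Only the first len-1 entries are ever slept on.
    let slept: u64 = CANCEL_MEMBER_RETRY_DELAYS_MS[..total_attempts - 1].iter().sum();
    assert_eq!(slept, 600);
}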

/// Reach into a `ServerError` for a `ferriskey::ErrorKind` when one is
/// available. Matches the variants that can carry a transport-layer kind:
/// direct `Valkey`, `ValkeyContext`, and `LibraryLoad` (which itself wraps
/// a `ferriskey::Error`).
fn extract_valkey_kind(e: &ServerError) -> Option<ferriskey::ErrorKind> {
    match e {
        ServerError::Valkey(err) | ServerError::ValkeyContext { source: err, .. } => {
            Some(err.kind())
        }
        ServerError::LibraryLoad(load_err) => load_err.valkey_kind(),
        _ => None,
    }
}
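
// Sketch test for the kind-extraction helper, using the same synthetic
// error constructor as the tests module below.
#[cfg(test)]
#[test]
fn extract_valkey_kind_sees_through_context_wrapper() {
    let err = ServerError::ValkeyContext {
        source: ferriskey::Error::from((ferriskey::ErrorKind::TryAgain, "synthetic")),
        context: "HMGET cancel pre-read".into(),
    };
    assert_eq!(extract_valkey_kind(&err), Some(ferriskey::ErrorKind::TryAgain));

    // Business-logic variants carry no transport kind.
    assert_eq!(extract_valkey_kind(&ServerError::NotFound("x".into())), None);
}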

/// Cancel a single member execution from a cancel_flow dispatch context.
/// Parses the flow-member EID string, builds the FCALL via the shared helper,
/// and executes with the same reload-on-failover semantics as the inline path.
///
/// Wrapped in a bounded retry loop (see [`CANCEL_MEMBER_RETRY_DELAYS_MS`]) so
/// that transient Valkey errors mid-dispatch (failover, `TryAgain`,
/// `ClusterDown`, `IoError`, `FatalSendError`) do not silently leak
/// non-cancelled members. `FatalReceiveError` and non-retryable kinds bubble
/// up immediately — those either indicate the Lua ran server-side anyway or a
/// permanent mismatch that retries cannot fix.
async fn cancel_member_execution(
    client: &Client,
    partition_config: &PartitionConfig,
    eid_str: &str,
    reason: &str,
    now: TimestampMs,
) -> Result<(), ServerError> {
    let execution_id = ExecutionId::parse(eid_str)
        .map_err(|e| ServerError::InvalidInput(format!("bad execution_id '{eid_str}': {e}")))?;
    let args = CancelExecutionArgs {
        execution_id: execution_id.clone(),
        reason: reason.to_owned(),
        source: CancelSource::OperatorOverride,
        lease_id: None,
        lease_epoch: None,
        attempt_id: None,
        now,
    };

    let attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
    for (attempt_idx, delay_ms) in CANCEL_MEMBER_RETRY_DELAYS_MS.iter().enumerate() {
        let is_last = attempt_idx + 1 == attempts;
        match try_cancel_member_once(client, partition_config, &args).await {
            Ok(()) => return Ok(()),
            Err(e) => {
                // Only retry transport-layer transients; business-logic
                // errors (Script / OperationFailed / NotFound / InvalidInput)
                // won't change on retry.
                let retryable = extract_valkey_kind(&e)
                    .map(ff_script::retry::is_retryable_kind)
                    .unwrap_or(false);
                if !retryable || is_last {
                    return Err(e);
                }
                tracing::debug!(
                    execution_id = %execution_id,
                    attempt = attempt_idx + 1,
                    delay_ms = *delay_ms,
                    error = %e,
                    "cancel_member_execution: transient error, retrying"
                );
                tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
            }
        }
    }
    // Unreachable: the loop above either returns Ok, returns Err on the
    // last attempt, or returns Err on a non-retryable error. Keep a
    // defensive fallback for future edits to the retry structure.
    Err(ServerError::OperationFailed(format!(
        "cancel_member_execution: retries exhausted for {execution_id}"
    )))
}

/// Single cancel attempt — pre-read + FCALL + parse. Factored out so the
/// retry loop in [`cancel_member_execution`] can invoke it cleanly.
async fn try_cancel_member_once(
    client: &Client,
    partition_config: &PartitionConfig,
    args: &CancelExecutionArgs,
) -> Result<(), ServerError> {
    let (keys, argv) = build_cancel_execution_fcall(client, partition_config, args).await?;
    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
    let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
    let raw =
        fcall_with_reload_on_client(client, "ff_cancel_execution", &key_refs, &arg_refs).await?;
    parse_cancel_result(&raw, &args.execution_id).map(|_| ())
}

fn parse_reset_budget_result(raw: &Value) -> Result<ResetBudgetResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_reset_budget: expected Array".into())),
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_reset_budget: bad status code".into())),
    };
    if status == 1 {
        let next_str = fcall_field_str(arr, 2);
        let next_ms: i64 = next_str.parse().unwrap_or(0);
        Ok(ResetBudgetResult::Reset {
            next_reset_at: TimestampMs::from_millis(next_ms),
        })
    } else {
        let error_code = fcall_field_str(arr, 1);
        Err(ServerError::OperationFailed(format!(
            "ff_reset_budget failed: {error_code}"
        )))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;

    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn is_retryable_valkey_variant_uses_kind_table() {
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());

        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Ask)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
    }

    #[test]
    fn is_retryable_valkey_context_uses_kind_table() {
        let err = ServerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::IoError),
            context: "HGET test".into(),
        };
        assert!(err.is_retryable());

        let err = ServerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::AuthenticationFailed),
            context: "auth".into(),
        };
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_library_load_delegates_to_inner_kind() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::IoError),
        ));
        assert!(err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::AuthenticationFailed),
        ));
        assert!(!err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_business_logic_variants_are_false() {
        assert!(!ServerError::NotFound("x".into()).is_retryable());
        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
        assert!(!ServerError::Script("x".into()).is_retryable());
        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
    }

    #[test]
    fn valkey_kind_delegates_through_library_load() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::ClusterDown),
        ));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert_eq!(err.valkey_kind(), None);
    }

    // ── Valkey version check (RFC-011 §13) ──

    #[test]
    fn parse_valkey_major_prefers_valkey_version_over_redis_version() {
        // Valkey 8.x+ pins redis_version to 7.2.4 for Redis-client compat
        // and exposes the real version in valkey_version. Parser must use
        // valkey_version when both are present.
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
valkey_version:9.0.3\r\n\
server_mode:cluster\r\n\
os:Linux\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 9);
    }

    #[test]
    fn parse_valkey_major_real_valkey_8_cluster_body() {
        // Actual INFO server response observed on valkey/valkey:latest in
        // cluster mode (CI matrix): redis_version compat-pinned to 7.2.4,
        // valkey_version authoritative at 9.0.3.
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
server_name:valkey\r\n\
valkey_version:9.0.3\r\n\
valkey_release_stage:ga\r\n\
redis_git_sha1:00000000\r\n\
server_mode:cluster\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 9);
    }

    #[test]
    fn parse_valkey_major_falls_back_to_redis_version_on_valkey_7() {
        // Valkey 7.x doesn't emit valkey_version; parser falls back.
        let info = "# Server\r\nredis_version:7.2.4\r\nfoo:bar\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 7);
    }

    #[test]
    fn parse_valkey_major_errors_when_no_version_field() {
        let info = "# Server\r\nfoo:bar\r\n";
        let err = parse_valkey_major_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(
            err.to_string().contains("missing"),
            "expected 'missing' in message, got: {err}"
        );
    }

    #[test]
    fn parse_valkey_major_errors_on_non_numeric() {
        let info = "valkey_version:invalid.x.y\n";
        let err = parse_valkey_major_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("non-numeric major"));
    }

    #[test]
    fn extract_info_body_unwraps_cluster_map() {
        // Simulates cluster-mode INFO response: map of node_addr → body.
        // extract_info_body picks any entry's body.
        let body_text = "\
# Server\r\n\
redis_version:7.2.4\r\n\
valkey_version:9.0.3\r\n";
        let node_entry = (
            Value::SimpleString("127.0.0.1:7000".to_string()),
            Value::VerbatimString {
                format: ferriskey::value::VerbatimFormat::Text,
                text: body_text.to_string(),
            },
        );
        let map = Value::Map(vec![node_entry]);
        let body = extract_info_body(&map).unwrap();
        assert_eq!(body, body_text);
        assert_eq!(parse_valkey_major_version(&body).unwrap(), 9);
    }

    #[test]
    fn extract_info_body_handles_simple_string() {
        let body_text = "redis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
        let v = Value::SimpleString(body_text.to_string());
        let body = extract_info_body(&v).unwrap();
        assert_eq!(body, body_text);
    }

    #[test]
    fn valkey_version_too_low_is_not_retryable() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7".into(),
            required: "8.0".into(),
        };
        assert!(!err.is_retryable());
        assert_eq!(err.valkey_kind(), None);
    }

    #[test]
    fn valkey_version_too_low_error_message_includes_both_versions() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7".into(),
            required: "8.0".into(),
        };
        let msg = err.to_string();
        assert!(msg.contains("7"), "detected version in message: {msg}");
        assert!(msg.contains("8.0"), "required version in message: {msg}");
        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
    }
}