// ff_server/server.rs

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use ferriskey::{Client, ClientBuilder, Value};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinSet;
use ff_core::contracts::{
    AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
    CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
    CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
    CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
    ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
    DeliverSignalArgs, DeliverSignalResult, ExecutionInfo, ExecutionSummary,
    ListExecutionsResult, PendingWaitpointInfo, ReplayExecutionResult,
    ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
    RevokeLeaseResult,
    StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
use ff_core::keys::{
    self, usage_dedup_key, BudgetKeyContext, ExecKeyContext, FlowIndexKeys, FlowKeyContext,
    IndexKeys, QuotaKeyContext,
};
use ff_core::partition::{
    budget_partition, execution_partition, flow_partition, quota_partition, Partition,
    PartitionConfig, PartitionFamily,
};
use ff_core::state::{PublicState, StateVector};
use ff_core::types::*;
use ff_engine::Engine;
use ff_script::retry::is_retryable_kind;

use crate::config::ServerConfig;

/// Upper bound on `member_execution_ids` returned in the
/// [`CancelFlowResult::Cancelled`] response when the flow was already in a
/// terminal state (idempotent retry). The first (non-idempotent) cancel call
/// returns the full list; retries only need a sample.
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;

/// FlowFabric server — connects everything together.
///
/// Manages the Valkey connection, Lua library loading, background scanners,
/// and provides a minimal API for Phase 1.
pub struct Server {
    client: Client,
    /// Dedicated Valkey connection used EXCLUSIVELY for stream-op calls:
    /// `xread_block` tails AND `ff_read_attempt_stream` range reads.
    /// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
    /// processes commands FIFO on it.
    ///
    /// Two head-of-line risks motivate the split from the main client:
    ///
    /// * **Blocking**: `XREAD BLOCK 30_000` holds the read side until a
    ///   new entry arrives or `block_ms` elapses.
    /// * **Large replies**: `XRANGE … COUNT 10_000` with ~64 KB per
    ///   frame returns a multi-MB reply serialized on one connection.
    ///
    /// Sharing either load with the main client would starve every other
    /// FCALL (create_execution, claim, rotate_waitpoint_secret,
    /// budget/quota, admin endpoints) AND every engine scanner.
    ///
    /// Kept separate from `client` and from the `Engine` scanner client so
    /// tail latency cannot couple to foreground API latency or background
    /// scanner cadence. See RFC-006 Impl Notes for the cascading-failure
    /// rationale.
    tail_client: Client,
    /// Bounds concurrent stream-op calls server-wide — read AND tail
    /// combined. Each caller acquires one permit for the duration of its
    /// Valkey round-trip(s); contention surfaces as HTTP 429 at the REST
    /// boundary, not as a silent queue on the stream connection. Default
    /// size is `FF_MAX_CONCURRENT_STREAM_OPS` (64; legacy env
    /// `FF_MAX_CONCURRENT_TAIL` accepted for one release).
    ///
    /// Read and tail share the same pool deliberately: they share the
    /// `tail_client`, so fairness accounting must be unified or a flood
    /// of one can starve the other. The semaphore is also `close()`d on
    /// shutdown so no new stream ops can start while existing ones drain
    /// (see `Server::shutdown`).
    stream_semaphore: Arc<tokio::sync::Semaphore>,
    /// Serializes `XREAD BLOCK` calls against `tail_client`.
    ///
    /// `ferriskey::Client` is a pipelined multiplexed connection — Valkey
    /// processes commands FIFO on one socket. `XREAD BLOCK` holds the
    /// connection's read side for the full `block_ms`, so two parallel
    /// BLOCKs sent down the same mux serialize: the second waits for the
    /// first to return before its own BLOCK even begins at the server.
    /// Meanwhile ferriskey's per-call `request_timeout` (auto-extended to
    /// `block_ms + 500ms`) starts at future-poll on the CLIENT side, so
    /// the second call's timeout fires before its turn at the server —
    /// spurious `timed_out` errors under concurrent tail load.
    ///
    /// Explicit serialization around `xread_block` removes the
    /// silent-failure mode: concurrent tails queue on this Mutex (inside
    /// an already-acquired semaphore permit), then dispatch one at a
    /// time with their full `block_ms` budget intact. The semaphore
    /// ceiling (`max_concurrent_stream_ops`) effectively becomes queue
    /// depth; throughput on the tail client is 1 BLOCK at a time.
    ///
    /// V2 upgrade: a pool of N dedicated `ferriskey::Client` connections
    /// replacing the single `tail_client` + this Mutex. Deferred; the
    /// Mutex here is correct v1 behavior.
    ///
    /// XRANGE reads (`read_attempt_stream`) are NOT gated by this Mutex —
    /// XRANGE is non-blocking at the server, so pipelined XRANGEs on one
    /// mux complete in microseconds each and don't trigger the same
    /// client-side timeout race. Keeping reads unserialized preserves
    /// read throughput.
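    ///
    /// Acquire-order sketch for a tail call (`tail_xread_block` is a
    /// stand-in name for the real tail helper; shown only to illustrate
    /// the permit → lock → BLOCK order):
    ///
    /// ```ignore
    /// // 1. Server-wide stream-op bound — exhaustion surfaces as HTTP 429.
    /// let _permit = self.stream_semaphore.clone().try_acquire_owned()
    ///     .map_err(|_| ServerError::ConcurrencyLimitExceeded("stream_ops", 64))?;
    /// // 2. Serialize BLOCKing reads on the single tail connection.
    /// let _guard = self.xread_block_lock.lock().await;
    /// // 3. Only now issue XREAD BLOCK, with its full block_ms budget intact.
    /// let entries = tail_xread_block(&self.tail_client, block_ms).await?;
    /// ```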
    xread_block_lock: Arc<tokio::sync::Mutex<()>>,
    /// Server-wide Semaphore(1) gating admin rotate calls. Legitimate
    /// operators rotate ~monthly and can afford to serialize; concurrent
    /// rotate requests are an attack or misbehaving script. Holding the
    /// permit also guards against interleaved partial rotations on the
    /// Server side of the per-partition locks, surfacing contention as
    /// HTTP 429 instead of silently queueing and blowing past the 120s
    /// HTTP timeout. See `rotate_waitpoint_secret` handler.
    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
    engine: Engine,
    config: ServerConfig,
    /// Background tasks spawned by async handlers (e.g. cancel_flow member
    /// dispatch). Drained on shutdown with a bounded timeout.
    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
}

/// Server error type.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Valkey connection or command error (preserves ErrorKind for caller inspection).
    #[error("valkey: {0}")]
    Valkey(#[from] ferriskey::Error),
    /// Valkey error with additional context (preserves ErrorKind via #[source]).
    #[error("valkey ({context}): {source}")]
    ValkeyContext {
        #[source]
        source: ferriskey::Error,
        context: String,
    },
    #[error("config: {0}")]
    Config(#[from] crate::config::ConfigError),
    #[error("library load: {0}")]
    LibraryLoad(#[from] ff_script::loader::LoadError),
    #[error("partition mismatch: {0}")]
    PartitionMismatch(String),
    #[error("not found: {0}")]
    NotFound(String),
    #[error("invalid input: {0}")]
    InvalidInput(String),
    #[error("operation failed: {0}")]
    OperationFailed(String),
    #[error("script: {0}")]
    Script(String),
    /// Server-wide concurrency limit reached on a labelled pool. Surfaces
    /// as HTTP 429 at the REST boundary so load balancers and clients can
    /// retry with backoff. The `source` label ("stream_ops", "admin_rotate",
    /// etc.) identifies WHICH pool is exhausted so operators aren't
    /// misled by a single "tail unavailable" string when the real fault
    /// is rotation contention.
    ///
    /// Fields: (source_label, max_permits).
    #[error("too many concurrent {0} calls (max: {1})")]
    ConcurrencyLimitExceeded(&'static str, u32),
    /// Detected Valkey version is below the RFC-011 §13 minimum. The hash-slot
    /// co-location design and Valkey Functions API behavior the engine depends
    /// on were introduced/stabilized in 8.0; older versions silently behave
    /// differently on some cluster edge cases. Operator action: upgrade Valkey.
    #[error(
        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
    )]
    ValkeyVersionTooLow {
        detected: String,
        required: String,
    },
}

impl ServerError {
    /// Returns the underlying ferriskey ErrorKind, if this error carries one.
    /// Covers direct Valkey variants and library-load failures that bubble a
    /// `ferriskey::Error` through `LoadError::Valkey`.
    pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
        match self {
            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
            Self::LibraryLoad(e) => e.valkey_kind(),
            _ => None,
        }
    }

    /// Whether this error is safely retryable by a caller. Semantics match
    /// `ScriptError::class() == Retryable` for Lua errors plus a kind-aware
    /// check for transport/library-load failures. Business-logic rejections
    /// (NotFound, InvalidInput, OperationFailed, Script, Config, PartitionMismatch)
    /// return false — those won't change on retry.
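    ///
    /// Caller-side retry sketch (backoff values are illustrative, not a
    /// prescribed policy):
    ///
    /// ```ignore
    /// let mut delay = std::time::Duration::from_millis(50);
    /// let result = loop {
    ///     match server.create_execution(&args).await {
    ///         Ok(res) => break Ok(res),
    ///         // Transport hiccup or exhausted permit pool: back off, retry.
    ///         Err(e) if e.is_retryable() => {
    ///             tokio::time::sleep(delay).await;
    ///             delay = delay.saturating_mul(2);
    ///         }
    ///         // Business-logic rejection: retrying won't change the outcome.
    ///         Err(e) => break Err(e),
    ///     }
    /// };
    /// ```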
    pub fn is_retryable(&self) -> bool {
        match self {
            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
                is_retryable_kind(e.kind())
            }
            Self::LibraryLoad(load_err) => load_err
                .valkey_kind()
                .map(is_retryable_kind)
                .unwrap_or(false),
            Self::Config(_)
            | Self::PartitionMismatch(_)
            | Self::NotFound(_)
            | Self::InvalidInput(_)
            | Self::OperationFailed(_)
            | Self::Script(_) => false,
            // Back off and retry — the bound is a server-side permit pool,
            // so the retry will succeed once a permit frees up. Applies
            // equally to stream ops, admin rotate, etc.
            Self::ConcurrencyLimitExceeded(_, _) => true,
            // Operator must upgrade Valkey; a retry at the caller won't help.
            Self::ValkeyVersionTooLow { .. } => false,
        }
    }
}

impl Server {
    /// Start the FlowFabric server.
    ///
    /// Boot sequence:
    /// 1. Connect to Valkey
    /// 2. Validate or create partition config key
    /// 3. Load the FlowFabric Lua library
    /// 4. Start engine (14 background scanners)
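    ///
    /// Typical call, assuming a populated [`ServerConfig`] named `config`
    /// (sketch only; error handling elided):
    ///
    /// ```ignore
    /// let server = Server::start(config).await?;
    /// // Boot has connected, checked the Valkey version, validated the
    /// // partition config, installed the waitpoint HMAC secret, loaded the
    /// // Lua library, and started the background scanners.
    /// let state = server.get_execution_state(&execution_id).await?;
    /// ```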
    pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
        // Step 1: Connect to Valkey via ClientBuilder
        tracing::info!(
            host = %config.host, port = config.port,
            tls = config.tls, cluster = config.cluster,
            "connecting to Valkey"
        );
        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder
            .build()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "connect".into() })?;

        // Verify connectivity
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "PING".into() })?;
        if pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "unexpected PING response: {pong}"
            )));
        }
        tracing::info!("Valkey connection established");

        // Step 1b: Verify Valkey version meets the RFC-011 §13 minimum (8.0).
        // Tolerates a rolling upgrade via a 60s exponential-backoff budget
        // per RFC-011 §9.17 — transient INFO errors during a node restart
        // don't trip the check until the whole budget is exhausted.
        verify_valkey_version(&client).await?;

        // Step 2: Validate or create partition config
        validate_or_create_partition_config(&client, &config.partition_config).await?;

        // Step 2b: Install waitpoint HMAC secret into every execution partition
        // (RFC-004 §Waitpoint Security). Fail-fast: if any partition fails,
        // the server refuses to start — a partial install would silently
        // reject signal deliveries on half the partitions.
        initialize_waitpoint_hmac_secret(
            &client,
            &config.partition_config,
            &config.waitpoint_hmac_secret,
        )
        .await?;

        // Step 3: Load Lua library (skippable for tests where fixture already loaded)
        if !config.skip_library_load {
            tracing::info!("loading flowfabric Lua library");
            ff_script::loader::ensure_library(&client)
                .await
                .map_err(ServerError::LibraryLoad)?;
        } else {
            tracing::info!("skipping library load (skip_library_load=true)");
        }

        // Step 4: Start engine with scanners
        // Build a fresh EngineConfig rather than cloning (EngineConfig doesn't derive Clone).
        let engine_cfg = ff_engine::EngineConfig {
            partition_config: config.partition_config,
            lanes: config.lanes.clone(),
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
            index_reconciler_interval: config.engine_config.index_reconciler_interval,
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
            budget_reset_interval: config.engine_config.budget_reset_interval,
            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
            unblock_interval: config.engine_config.unblock_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            // Push-based DAG promotion (Batch C item 6). Enabled by
            // default with the server's Valkey endpoint; the engine
            // spawns a dedicated RESP3 client for SUBSCRIBE.
            completion_listener: Some(ff_engine::CompletionListenerConfig {
                addresses: vec![(config.host.clone(), config.port)],
                tls: config.tls,
                cluster: config.cluster,
            }),
            flow_projector_interval: config.engine_config.flow_projector_interval,
            execution_deadline_interval: config.engine_config.execution_deadline_interval,
        };
        // Engine scanners keep running on the MAIN `client` mux — NOT on
        // `tail_client`. Scanner cadence is foreground-latency-coupled by
        // design (an incident that blocks all FCALLs should also visibly
        // block scanners), and keeping scanners off the tail client means a
        // long-poll tail can never starve lease-expiry, retention-trim,
        // etc. Do not change this without revisiting RFC-006 Impl Notes.
        let engine = Engine::start(engine_cfg, client.clone());

        // Dedicated tail client. Built AFTER library load + HMAC install
        // because those steps use the main client; tail client only runs
        // XREAD/XREAD BLOCK + HMGET on stream_meta, so it never needs the
        // library loaded — but we build it on the same host/port/TLS
        // options so network reachability is identical.
        tracing::info!("opening dedicated tail connection");
        let mut tail_builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            // `request_timeout` is ignored for XREAD BLOCK (ferriskey
            // auto-extends to `block_ms + 500ms` for blocking commands),
            // but is used for the companion HMGET — 5s matches main.
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            tail_builder = tail_builder.tls();
        }
        if config.cluster {
            tail_builder = tail_builder.cluster();
        }
        let tail_client = tail_builder
            .build()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "connect (tail)".into(),
            })?;
        let tail_pong: String = tail_client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "PING (tail)".into(),
            })?;
        if tail_pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "tail client unexpected PING response: {tail_pong}"
            )));
        }

        let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(
            config.max_concurrent_stream_ops as usize,
        ));
        let xread_block_lock = Arc::new(tokio::sync::Mutex::new(()));
        tracing::info!(
            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
            "stream-op client ready (read + tail share the semaphore; \
             tails additionally serialize via xread_block_lock)"
        );

        // Admin surface warning. /v1/admin/* endpoints (waitpoint HMAC
        // rotation, etc.) are only protected by the global Bearer
        // middleware in api.rs — which is only installed when
        // config.api_token is set. Without FF_API_TOKEN, a public
        // deployment exposes secret rotation to anyone that can reach
        // the listen_addr. Warn loudly so operators can't miss it; we
        // don't fail-start because single-tenant dev uses this path.
        if config.api_token.is_none() {
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
                 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
                 FF_API_TOKEN for any deployment reachable from untrusted \
                 networks."
            );
            // Explicit callout for the credential-bearing read endpoints.
            // Auditors grep for these on a per-endpoint basis; lumping
            // into the admin warning alone hides the fact that
            // /pending-waitpoints returns HMAC tokens and /result
            // returns completion payload bytes.
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
                 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
                 and GET /v1/executions/{{id}}/result returns raw completion payload \
                 bytes (may contain PII). Both are UNAUTHENTICATED in this \
                 configuration."
            );
        }

        // Partition counts — post-RFC-011 there are three physical families.
        // Execution keys co-locate with their parent flow's partition (§2 of
        // the RFC), so `num_flow_partitions` governs both exec and flow
        // routing; no separate `num_execution_partitions` count exists.
        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            listen_addr = %config.listen_addr,
            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
            config.partition_config.num_flow_partitions,
            config.partition_config.num_budget_partitions,
            config.partition_config.num_quota_partitions,
        );

        Ok(Self {
            client,
            tail_client,
            stream_semaphore,
            xread_block_lock,
            // Single-permit semaphore: only one rotate-waitpoint-secret can
            // be mid-flight server-wide. Attackers or misbehaving scripts
            // firing parallel rotations fail fast with 429 instead of
            // queueing HTTP handlers.
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            engine,
            config,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
        })
    }

    /// Get a reference to the ferriskey client.
    pub fn client(&self) -> &Client {
        &self.client
    }

    /// Execute an FCALL with automatic Lua library reload on "function not loaded".
    ///
    /// After a Valkey failover the new primary may not have the Lua library
    /// loaded (replication lag or cold replica). This wrapper detects that
    /// condition, reloads the library via `ff_script::loader::ensure_library`,
    /// and retries the FCALL once.
    async fn fcall_with_reload(
        &self,
        function: &str,
        keys: &[&str],
        args: &[&str],
    ) -> Result<Value, ServerError> {
        fcall_with_reload_on_client(&self.client, function, keys, args).await
    }

    /// Get the server config.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }

    /// Get the partition config.
    pub fn partition_config(&self) -> &PartitionConfig {
        &self.config.partition_config
    }

    // ── Minimal Phase 1 API ──

    /// Create a new execution.
    ///
    /// Uses raw FCALL — will migrate to typed ff-script wrappers in Step 1.2.
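    ///
    /// Dedup sketch (`base_args` is a hypothetical pre-populated
    /// [`CreateExecutionArgs`]):
    ///
    /// ```ignore
    /// let args = CreateExecutionArgs {
    ///     idempotency_key: Some("order-1234-charge".to_owned()),
    ///     ..base_args
    /// };
    /// let first = server.create_execution(&args).await?;
    /// // A retry with the same namespace + idempotency_key hits the idem_key
    /// // written by the Lua script (24h dedup TTL below) and is deduplicated
    /// // server-side instead of creating a second execution.
    /// let retried = server.create_execution(&args).await?;
    /// ```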
    pub async fn create_execution(
        &self,
        args: &CreateExecutionArgs,
    ) -> Result<CreateExecutionResult, ServerError> {
        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
        let idx = IndexKeys::new(&partition);

        let lane = &args.lane_id;
        let tag = partition.hash_tag();
        let idem_key = match &args.idempotency_key {
            Some(k) if !k.is_empty() => {
                keys::idempotency_key(&tag, args.namespace.as_str(), k)
            }
            _ => ctx.noop(),
        };

        let delay_str = args
            .delay_until
            .map(|d| d.0.to_string())
            .unwrap_or_default();
        let is_delayed = !delay_str.is_empty();

        // KEYS (8) must match lua/execution.lua ff_create_execution positional order:
        //   [1] exec_core, [2] payload, [3] policy, [4] tags,
        //   [5] scheduling_zset (eligible OR delayed — ONE key),
        //   [6] idem_key, [7] execution_deadline, [8] all_executions
        let scheduling_zset = if is_delayed {
            idx.lane_delayed(lane)
        } else {
            idx.lane_eligible(lane)
        };

        let fcall_keys: Vec<String> = vec![
            ctx.core(),                  // 1
            ctx.payload(),               // 2
            ctx.policy(),                // 3
            ctx.tags(),                  // 4
            scheduling_zset,             // 5
            idem_key,                    // 6
            idx.execution_deadline(),    // 7
            idx.all_executions(),        // 8
        ];

        let tags_json = serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".to_owned());

        // ARGV (13) must match lua/execution.lua ff_create_execution positional order:
        //   [1] execution_id, [2] namespace, [3] lane_id, [4] execution_kind,
        //   [5] priority, [6] creator_identity, [7] policy_json,
        //   [8] input_payload, [9] delay_until, [10] dedup_ttl_ms,
        //   [11] tags_json, [12] execution_deadline_at, [13] partition_id
        let fcall_args: Vec<String> = vec![
            args.execution_id.to_string(),           // 1
            args.namespace.to_string(),              // 2
            args.lane_id.to_string(),                // 3
            args.execution_kind.clone(),             // 4
            args.priority.to_string(),               // 5
            args.creator_identity.clone(),           // 6
            args.policy.as_ref()
                .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_owned()))
                .unwrap_or_else(|| "{}".to_owned()), // 7
            String::from_utf8_lossy(&args.input_payload).into_owned(), // 8
            delay_str,                               // 9
            args.idempotency_key.as_ref()
                .map(|_| "86400000".to_string())
                .unwrap_or_default(),                // 10 dedup_ttl_ms
            tags_json,                               // 11
            args.execution_deadline_at
                .map(|d| d.to_string())
                .unwrap_or_default(),                // 12 execution_deadline_at
            args.partition_id.to_string(),           // 13
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_execution", &key_refs, &arg_refs)
            .await?;

        parse_create_result(&raw, &args.execution_id)
    }

    /// Cancel an execution.
    pub async fn cancel_execution(
        &self,
        args: &CancelExecutionArgs,
    ) -> Result<CancelExecutionResult, ServerError> {
        let raw = self
            .fcall_cancel_execution_with_reload(args)
            .await?;
        parse_cancel_result(&raw, &args.execution_id)
    }
    /// Build KEYS/ARGV for `ff_cancel_execution` and invoke it via the
    /// server's reload-capable FCALL. Shared by the inline
    /// [`Self::cancel_execution`] path and the background cancel_flow
    /// member dispatch.
    async fn fcall_cancel_execution_with_reload(
        &self,
        args: &CancelExecutionArgs,
    ) -> Result<Value, ServerError> {
        let (keys, argv) = build_cancel_execution_fcall(
            &self.client,
            &self.config.partition_config,
            args,
        )
        .await?;
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
        self.fcall_with_reload("ff_cancel_execution", &key_refs, &arg_refs).await
    }

    /// Get the public state of an execution.
    ///
    /// Reads `public_state` from the exec_core hash. Returns the parsed
    /// PublicState enum. If the execution is not found, returns an error.
    pub async fn get_execution_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<PublicState, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        let state_str: Option<String> = self
            .client
            .hget(&ctx.core(), "public_state")
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET public_state".into() })?;

        match state_str {
            Some(s) => {
                let quoted = format!("\"{s}\"");
                serde_json::from_str(&quoted).map_err(|e| {
                    ServerError::Script(format!(
                        "invalid public_state '{s}' for {execution_id}: {e}"
                    ))
                })
            }
            None => Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            ))),
        }
    }

    /// Read the raw result payload written by `ff_complete_execution`.
    ///
    /// The Lua side stores the payload at `ctx.result()` via plain `SET`.
    /// No FCALL — this is a direct GET; returns `Ok(None)` when the
    /// execution is missing, not yet complete, or (in a future
    /// retention-policy world) when the result was trimmed.
    ///
    /// # Contract vs `get_execution_state`
    ///
    /// `get_execution_state` is the authoritative completion signal. If
    /// a caller observes `state == completed` but `get_execution_result`
    /// returns `None`, the result bytes are unavailable — not a caller
    /// bug and not a server bug, just the retention policy trimming the
    /// blob. V1 sets no retention, so callers on v1 can treat
    /// `state == completed` + `Ok(None)` as a server bug.
    ///
    /// # Ordering
    ///
    /// Callers MUST wait for `state == completed` before calling this
    /// method; polls issued before the state transition may hit a
    /// narrow window where the completion Lua has written
    /// `public_state = completed` but the `result` key SET is still
    /// on-wire. The current Lua `ff_complete_execution` writes both in
    /// the same atomic script, so the window is effectively zero for
    /// direct callers — but retries via `ff_replay_execution` open it
    /// briefly.
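    ///
    /// Sketch of the contract-respecting call order (`PublicState::Completed`
    /// is an assumed variant name here, used only for illustration):
    ///
    /// ```ignore
    /// // 1. Wait for the authoritative completion signal first.
    /// loop {
    ///     if matches!(server.get_execution_state(&exec_id).await?, PublicState::Completed) {
    ///         break;
    ///     }
    ///     tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    /// }
    /// // 2. Only now fetch the result bytes. On v1 (no retention policy),
    /// //    None after completion indicates a server bug; with a retention
    /// //    policy it means the blob was trimmed.
    /// let result: Option<Vec<u8>> = server.get_execution_result(&exec_id).await?;
    /// ```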
    pub async fn get_execution_result(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Option<Vec<u8>>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        // Binary-safe read. Decoding into `String` would reject any
        // non-UTF-8 payload (bincode-encoded f32 vectors, compressed
        // artifacts, arbitrary binary stages) with a ferriskey decode
        // error that surfaces as HTTP 500. The endpoint response is
        // already `application/octet-stream`; `Vec<u8>` has a
        // ferriskey-specialized `FromValue` impl that preserves raw
        // bytes without UTF-8 validation (see ferriskey/src/value.rs:
        // `from_byte_vec`). Nil → None via the blanket
        // `Option<T: FromValue>` wrapper.
        let payload: Option<Vec<u8>> = self
            .client
            .cmd("GET")
            .arg(ctx.result())
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "GET exec result".into(),
            })?;
        Ok(payload)
    }

    /// List the active (`pending` or `active`) waitpoints for an execution.
    ///
    /// Returns one [`PendingWaitpointInfo`] per open waitpoint, including the
    /// HMAC-SHA1 `waitpoint_token` needed to deliver authenticated signals.
    /// `closed` waitpoints are elided — callers looking at history should
    /// read the stream or lease history instead.
    ///
    /// Read plan: SSCAN `ctx.waitpoints()` with `COUNT 100` (bounded
    /// page size, matching the unblock / flow-projector / budget-
    /// reconciler convention) to enumerate waitpoint IDs, then TWO
    /// pipelines:
    ///
    /// * Pass 1 — single round-trip containing, per waitpoint, one
    ///   HMGET over the documented field set + one HGET for
    ///   `total_matchers` on the condition hash.
    /// * Pass 2 (conditional) — single round-trip containing an
    ///   HMGET per waitpoint with `total_matchers > 0` to read the
    ///   `matcher:N:name` fields.
    ///
    /// No FCALL — this is a read-only view built from already-
    /// persisted state, so skipping Lua keeps the Valkey single-
    /// writer path uncontended. HMGET (vs HGETALL) bounds the per-
    /// waitpoint read to the documented field set and defends
    /// against a poisoned waitpoint hash with unbounded extra fields
    /// accumulating response memory.
    ///
    /// # Empty result semantics (TOCTOU)
    ///
    /// An empty `Vec` is returned in three cases:
    ///
    /// 1. The execution exists but has never suspended.
    /// 2. All existing waitpoints are `closed` (already resolved).
    /// 3. A narrow teardown race: `SSCAN` read the waitpoint set after
    ///    a concurrent `ff_close_waitpoint` or execution-cleanup script
    ///    deleted the waitpoint hashes but before it SREM'd the set
    ///    members. Each HMGET returns all-None and we skip.
    ///
    /// Callers that get an unexpected empty list should cross-check
    /// execution state (`get_execution_state`) to distinguish "pipeline
    /// moved past suspended" from "nothing to review yet".
    ///
    /// A waitpoint hash that's present but missing its `waitpoint_token`
    /// field is similarly elided and a server-side WARN is emitted —
    /// this indicates storage corruption (a write that half-populated
    /// the hash) and operators should investigate.
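    ///
    /// Cross-check sketch for the empty-list cases above
    /// (`PublicState::Suspended` is an assumed variant name, shown only
    /// to illustrate the disambiguation):
    ///
    /// ```ignore
    /// let wps = server.list_pending_waitpoints(&exec_id).await?;
    /// if wps.is_empty() {
    ///     match server.get_execution_state(&exec_id).await? {
    ///         // Still suspended: possibly the teardown race — re-poll shortly.
    ///         PublicState::Suspended => tokio::time::sleep(poll_delay).await,
    ///         // Any other state: the pipeline moved past suspended.
    ///         _ => {}
    ///     }
    /// }
    /// ```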
    pub async fn list_pending_waitpoints(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Vec<PendingWaitpointInfo>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        let core_exists: bool = self
            .client
            .cmd("EXISTS")
            .arg(ctx.core())
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "EXISTS exec_core (pending waitpoints)".into(),
            })?;
        if !core_exists {
            return Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            )));
        }

        // SSCAN the waitpoints set in bounded pages. SMEMBERS would
        // load the entire set in one reply — fine for today's
        // single-waitpoint executions, but the codebase convention
        // (budget_reconciler, flow_projector, unblock scanner) is SSCAN
        // COUNT 100 so response size per round-trip is bounded as the
        // set grows. Match the precedent instead of carving a new
        // pattern here.
        const WAITPOINTS_SSCAN_COUNT: usize = 100;
        let waitpoints_key = ctx.waitpoints();
        let mut wp_ids_raw: Vec<String> = Vec::new();
        let mut cursor: String = "0".to_owned();
        loop {
            let reply: (String, Vec<String>) = self
                .client
                .cmd("SSCAN")
                .arg(&waitpoints_key)
                .arg(&cursor)
                .arg("COUNT")
                .arg(WAITPOINTS_SSCAN_COUNT.to_string().as_str())
                .execute()
                .await
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: "SSCAN waitpoints".into(),
                })?;
            cursor = reply.0;
            wp_ids_raw.extend(reply.1);
            if cursor == "0" {
                break;
            }
        }

        // SSCAN may observe a member more than once across iterations —
        // that is documented Valkey behavior, not a bug (see
        // https://valkey.io/commands/sscan/). Dedup in-place before
        // building the typed id list so the HTTP response and the
        // downstream pipelined HMGETs don't duplicate work. Sort +
        // dedup keeps this O(n log n) without a BTreeSet allocation;
        // typical n is 1 (the single waitpoint on a suspended exec).
        wp_ids_raw.sort_unstable();
        wp_ids_raw.dedup();

        if wp_ids_raw.is_empty() {
            return Ok(Vec::new());
        }

        // Parse + filter out malformed waitpoint_ids up-front so the
        // downstream pipeline indexes stay aligned to the Vec of
        // typed ids.
        let mut wp_ids: Vec<WaitpointId> = Vec::with_capacity(wp_ids_raw.len());
        for raw in &wp_ids_raw {
            match WaitpointId::parse(raw) {
                Ok(id) => wp_ids.push(id),
                Err(e) => tracing::warn!(
                    raw_id = %raw,
                    error = %e,
                    execution_id = %execution_id,
                    "list_pending_waitpoints: skipping unparseable waitpoint_id"
                ),
            }
        }
        if wp_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Bounded HMGET field set — these are the six hash fields that
        // surface in `PendingWaitpointInfo`. Fixed-size indexing into
        // the response below tracks this order.
        const WP_FIELDS: [&str; 6] = [
            "state",
            "waitpoint_key",
            "waitpoint_token",
            "created_at",
            "activated_at",
            "expires_at",
        ];

        // Pass 1 pipeline: waitpoint HMGET + condition HGET
        // total_matchers, one of each per waitpoint, in a SINGLE
        // round-trip. Sequential HMGETs were an N-round-trip latency
        // floor on flows with fan-out waitpoints.
        let mut pass1 = self.client.pipeline();
        let mut wp_slots = Vec::with_capacity(wp_ids.len());
        let mut cond_slots = Vec::with_capacity(wp_ids.len());
        for wp_id in &wp_ids {
            let mut cmd = pass1.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint(wp_id));
            for f in WP_FIELDS {
                cmd = cmd.arg(f);
            }
            wp_slots.push(cmd.finish());

            cond_slots.push(
                pass1
                    .cmd::<Option<String>>("HGET")
                    .arg(ctx.waitpoint_condition(wp_id))
                    .arg("total_matchers")
                    .finish(),
            );
        }
        pass1
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "pipeline HMGET waitpoints + HGET total_matchers".into(),
            })?;

        // Collect pass-1 results + queue pass-2 HMGETs for condition
        // matcher names on waitpoints that are actionable (state in
        // pending/active, non-empty token, total_matchers > 0). PipeSlot
        // values are owning, so iterate all three ordered zip'd iters
        // and consume them together.
        struct Kept {
            wp_id: WaitpointId,
            wp_fields: Vec<Option<String>>,
            total_matchers: usize,
        }
        let mut kept: Vec<Kept> = Vec::with_capacity(wp_ids.len());
        for ((wp_id, wp_slot), cond_slot) in wp_ids
            .iter()
            .zip(wp_slots)
            .zip(cond_slots)
        {
            let wp_fields: Vec<Option<String>> =
                wp_slot.value().map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("pipeline slot HMGET waitpoint {wp_id}"),
                })?;

            // Waitpoint hash may have been GC'd between the SSCAN and
            // this HMGET (rotation / cleanup race). Skip silently.
            if wp_fields.iter().all(Option::is_none) {
                // Still consume the cond_slot to free its buffer.
                let _ = cond_slot.value();
                continue;
            }
            let state_ref = wp_fields
                .first()
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if state_ref != "pending" && state_ref != "active" {
                let _ = cond_slot.value();
                continue;
            }
            let token_ref = wp_fields
                .get(2)
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if token_ref.is_empty() {
                let _ = cond_slot.value();
                tracing::warn!(
                    waitpoint_id = %wp_id,
                    execution_id = %execution_id,
                    waitpoint_hash_key = %ctx.waitpoint(wp_id),
                    state = %state_ref,
                    "list_pending_waitpoints: waitpoint hash present but waitpoint_token \
                     field is empty — likely storage corruption (half-populated write, \
                     operator edit, or interrupted script). Skipping this entry in the \
                     response. HGETALL the waitpoint_hash_key to inspect."
                );
                continue;
            }

            let total_matchers = cond_slot
                .value()
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("pipeline slot HGET total_matchers {wp_id}"),
                })?
                .and_then(|s| s.parse::<usize>().ok())
                .unwrap_or(0);

            kept.push(Kept {
                wp_id: wp_id.clone(),
                wp_fields,
                total_matchers,
            });
        }

        if kept.is_empty() {
            return Ok(Vec::new());
        }

        // Pass 2 pipeline: matcher-name HMGETs for every kept waitpoint
        // with total_matchers > 0. Single round-trip regardless of
        // per-waitpoint matcher count. Waitpoints with total_matchers==0
        // (wildcard) skip the HMGET entirely.
        let mut pass2 = self.client.pipeline();
        let mut matcher_slots: Vec<Option<_>> = Vec::with_capacity(kept.len());
        let mut pass2_needed = false;
        for k in &kept {
            if k.total_matchers == 0 {
                matcher_slots.push(None);
                continue;
            }
            pass2_needed = true;
            let mut cmd = pass2.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint_condition(&k.wp_id));
            for i in 0..k.total_matchers {
                cmd = cmd.arg(format!("matcher:{i}:name"));
            }
            matcher_slots.push(Some(cmd.finish()));
        }
        if pass2_needed {
            pass2.execute().await.map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "pipeline HMGET wp_condition matchers".into(),
            })?;
        }

        let parse_ts = |raw: &str| -> Option<TimestampMs> {
            if raw.is_empty() {
                None
            } else {
                raw.parse::<i64>().ok().map(TimestampMs)
            }
        };

        let mut out: Vec<PendingWaitpointInfo> = Vec::with_capacity(kept.len());
        for (k, slot) in kept.into_iter().zip(matcher_slots) {
            let get = |i: usize| -> &str {
                k.wp_fields.get(i).and_then(|v| v.as_deref()).unwrap_or("")
            };

            // Collect matcher names, eliding the empty-name wildcard
            // sentinel (see lua/helpers.lua initialize_condition).
            let required_signal_names: Vec<String> = match slot {
                None => Vec::new(),
                Some(s) => {
                    let vals: Vec<Option<String>> =
                        s.value().map_err(|e| ServerError::ValkeyContext {
                            source: e,
                            context: format!(
                                "pipeline slot HMGET wp_condition matchers {}",
                                k.wp_id
                            ),
                        })?;
                    vals.into_iter()
                        .flatten()
                        .filter(|name| !name.is_empty())
                        .collect()
                }
            };

            out.push(PendingWaitpointInfo {
                waitpoint_id: k.wp_id,
                waitpoint_key: get(1).to_owned(),
                state: get(0).to_owned(),
                waitpoint_token: WaitpointToken(get(2).to_owned()),
                required_signal_names,
                created_at: parse_ts(get(3)).unwrap_or(TimestampMs(0)),
                activated_at: parse_ts(get(4)),
                expires_at: parse_ts(get(5)),
            });
        }

        Ok(out)
    }

    // ── Budget / Quota API ──

    /// Create a new budget policy.
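    ///
    /// The ARGV layout below treats `dimensions`, `hard_limits`, and
    /// `soft_limits` as parallel arrays: index `i` in each refers to the
    /// same dimension. Sketch (`base_budget_args` is a hypothetical
    /// pre-populated [`CreateBudgetArgs`]; values are placeholders):
    ///
    /// ```ignore
    /// let args = CreateBudgetArgs {
    ///     dimensions: vec!["tokens".into(), "usd_cents".into()],
    ///     hard_limits: vec![1_000_000, 5_000], // aligned with dimensions[i]
    ///     soft_limits: vec![800_000, 4_000],
    ///     ..base_budget_args
    /// };
    /// let created = server.create_budget(&args).await?;
    /// ```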
    pub async fn create_budget(
        &self,
        args: &CreateBudgetArgs,
    ) -> Result<CreateBudgetResult, ServerError> {
        let partition = budget_partition(&args.budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, &args.budget_id);
        let resets_key = keys::budget_resets_key(bctx.hash_tag());
        let policies_index = keys::budget_policies_index(bctx.hash_tag());

        // KEYS (5): budget_def, budget_limits, budget_usage, budget_resets_zset,
        //           budget_policies_index
        let fcall_keys: Vec<String> = vec![
            bctx.definition(),
            bctx.limits(),
            bctx.usage(),
            resets_key,
            policies_index,
        ];

        // ARGV (variable): budget_id, scope_type, scope_id, enforcement_mode,
        //   on_hard_limit, on_soft_limit, reset_interval_ms, now_ms,
        //   dimension_count, dim_1..dim_N, hard_1..hard_N, soft_1..soft_N
        let dim_count = args.dimensions.len();
        let mut fcall_args: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
        fcall_args.push(args.budget_id.to_string());
        fcall_args.push(args.scope_type.clone());
        fcall_args.push(args.scope_id.clone());
        fcall_args.push(args.enforcement_mode.clone());
        fcall_args.push(args.on_hard_limit.clone());
        fcall_args.push(args.on_soft_limit.clone());
        fcall_args.push(args.reset_interval_ms.to_string());
        fcall_args.push(args.now.to_string());
        fcall_args.push(dim_count.to_string());
        for dim in &args.dimensions {
            fcall_args.push(dim.clone());
        }
        for hard in &args.hard_limits {
            fcall_args.push(hard.to_string());
        }
        for soft in &args.soft_limits {
            fcall_args.push(soft.to_string());
        }

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_budget", &key_refs, &arg_refs)
            .await?;

        parse_budget_create_result(&raw, &args.budget_id)
    }

    /// Create a new quota/rate-limit policy.
    pub async fn create_quota_policy(
        &self,
        args: &CreateQuotaPolicyArgs,
    ) -> Result<CreateQuotaPolicyResult, ServerError> {
        let partition = quota_partition(&args.quota_policy_id, &self.config.partition_config);
        let qctx = QuotaKeyContext::new(&partition, &args.quota_policy_id);

        // KEYS (5): quota_def, quota_window_zset, quota_concurrency_counter,
        //           admitted_set, quota_policies_index
        let fcall_keys: Vec<String> = vec![
            qctx.definition(),
            qctx.window("requests_per_window"),
            qctx.concurrency(),
            qctx.admitted_set(),
            keys::quota_policies_index(qctx.hash_tag()),
        ];

        // ARGV (5): quota_policy_id, window_seconds, max_requests_per_window,
        //           max_concurrent, now_ms
        let fcall_args: Vec<String> = vec![
            args.quota_policy_id.to_string(),
            args.window_seconds.to_string(),
            args.max_requests_per_window.to_string(),
            args.max_concurrent.to_string(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_quota_policy", &key_refs, &arg_refs)
            .await?;

        parse_quota_create_result(&raw, &args.quota_policy_id)
    }

    /// Read-only budget status for operator visibility.
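    ///
    /// Operator readout sketch (field names from [`BudgetStatus`]):
    ///
    /// ```ignore
    /// let status = server.get_budget_status(&budget_id).await?;
    /// for (dim, used) in &status.usage {
    ///     let hard = status.hard_limits.get(dim).copied().unwrap_or(0);
    ///     println!("{dim}: {used}/{hard}");
    /// }
    /// println!("hard breaches so far: {}", status.breach_count);
    /// ```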
    pub async fn get_budget_status(
        &self,
        budget_id: &BudgetId,
    ) -> Result<BudgetStatus, ServerError> {
        let partition = budget_partition(budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);

        // Read budget definition
        let def: HashMap<String, String> = self
            .client
            .hgetall(&bctx.definition())
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_def".into() })?;

        if def.is_empty() {
            return Err(ServerError::NotFound(format!(
                "budget not found: {budget_id}"
            )));
        }

        // Read usage
        let usage_raw: HashMap<String, String> = self
            .client
            .hgetall(&bctx.usage())
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_usage".into() })?;
        let usage: HashMap<String, u64> = usage_raw
            .into_iter()
            .filter(|(k, _)| k != "_init")
            .map(|(k, v)| (k, v.parse().unwrap_or(0)))
            .collect();

        // Read limits
        let limits_raw: HashMap<String, String> = self
            .client
            .hgetall(&bctx.limits())
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_limits".into() })?;
        let mut hard_limits = HashMap::new();
        let mut soft_limits = HashMap::new();
        for (k, v) in &limits_raw {
            if let Some(dim) = k.strip_prefix("hard:") {
                hard_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
            } else if let Some(dim) = k.strip_prefix("soft:") {
                soft_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
            }
        }

        let non_empty = |s: Option<&String>| -> Option<String> {
            s.filter(|v| !v.is_empty()).cloned()
        };

        Ok(BudgetStatus {
            budget_id: budget_id.to_string(),
            scope_type: def.get("scope_type").cloned().unwrap_or_default(),
            scope_id: def.get("scope_id").cloned().unwrap_or_default(),
            enforcement_mode: def.get("enforcement_mode").cloned().unwrap_or_default(),
            usage,
            hard_limits,
            soft_limits,
            breach_count: def
                .get("breach_count")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            soft_breach_count: def
                .get("soft_breach_count")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            last_breach_at: non_empty(def.get("last_breach_at")),
            last_breach_dim: non_empty(def.get("last_breach_dim")),
            next_reset_at: non_empty(def.get("next_reset_at")),
            created_at: non_empty(def.get("created_at")),
        })
    }

    /// Report usage against a budget and check limits.
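    ///
    /// Sketch of an idempotent report (`base_usage_args` is a
    /// hypothetical pre-populated [`ReportUsageArgs`]; values are
    /// placeholders — the dedup path grounds on `usage_dedup_key` below):
    ///
    /// ```ignore
    /// let args = ReportUsageArgs {
    ///     dimensions: vec!["tokens".into()],
    ///     deltas: vec![1500],
    ///     // Same dedup_key on a retry is absorbed server-side instead of
    ///     // double-counting the usage delta.
    ///     dedup_key: Some("attempt-42:tokens".into()),
    ///     ..base_usage_args
    /// };
    /// let result = server.report_usage(&budget_id, &args).await?;
    /// ```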
1171    pub async fn report_usage(
1172        &self,
1173        budget_id: &BudgetId,
1174        args: &ReportUsageArgs,
1175    ) -> Result<ReportUsageResult, ServerError> {
1176        let partition = budget_partition(budget_id, &self.config.partition_config);
1177        let bctx = BudgetKeyContext::new(&partition, budget_id);
1178
1179        // KEYS (3): budget_usage, budget_limits, budget_def
1180        let fcall_keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];
1181
1182        // ARGV: dim_count, dim_1..dim_N, delta_1..delta_N, now_ms, [dedup_key]
1183        let dim_count = args.dimensions.len();
1184        let mut fcall_args: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
1185        fcall_args.push(dim_count.to_string());
1186        for dim in &args.dimensions {
1187            fcall_args.push(dim.clone());
1188        }
1189        for delta in &args.deltas {
1190            fcall_args.push(delta.to_string());
1191        }
1192        fcall_args.push(args.now.to_string());
1193        let dedup_key_val = args
1194            .dedup_key
1195            .as_ref()
1196            .filter(|k| !k.is_empty())
1197            .map(|k| usage_dedup_key(bctx.hash_tag(), k))
1198            .unwrap_or_default();
1199        fcall_args.push(dedup_key_val);
1200
1201        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1202        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1203
1204        let raw: Value = self
1205            .fcall_with_reload("ff_report_usage_and_check", &key_refs, &arg_refs)
1206            .await?;
1207
1208        parse_report_usage_result(&raw)
1209    }
1210
1211    /// Reset a budget's usage counters and schedule the next reset.
1212    pub async fn reset_budget(
1213        &self,
1214        budget_id: &BudgetId,
1215    ) -> Result<ResetBudgetResult, ServerError> {
1216        let partition = budget_partition(budget_id, &self.config.partition_config);
1217        let bctx = BudgetKeyContext::new(&partition, budget_id);
1218        let resets_key = keys::budget_resets_key(bctx.hash_tag());
1219
1220        // KEYS (3): budget_def, budget_usage, budget_resets_zset
1221        let fcall_keys: Vec<String> = vec![bctx.definition(), bctx.usage(), resets_key];
1222
1223        // ARGV (2): budget_id, now_ms
1224        let now = TimestampMs::now();
1225        let fcall_args: Vec<String> = vec![budget_id.to_string(), now.to_string()];
1226
1227        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1228        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1229
1230        let raw: Value = self
1231            .fcall_with_reload("ff_reset_budget", &key_refs, &arg_refs)
1232            .await?;
1233
1234        parse_reset_budget_result(&raw)
1235    }
1236
1237    // ── Flow API ──
1238
1239    /// Create a new flow container.
1240    pub async fn create_flow(
1241        &self,
1242        args: &CreateFlowArgs,
1243    ) -> Result<CreateFlowResult, ServerError> {
1244        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1245        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1246        let fidx = FlowIndexKeys::new(&partition);
1247
1248        // KEYS (3): flow_core, members_set, flow_index
1249        let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];
1250
1251        // ARGV (4): flow_id, flow_kind, namespace, now_ms
1252        let fcall_args: Vec<String> = vec![
1253            args.flow_id.to_string(),
1254            args.flow_kind.clone(),
1255            args.namespace.to_string(),
1256            args.now.to_string(),
1257        ];
1258
1259        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1260        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1261
1262        let raw: Value = self
1263            .fcall_with_reload("ff_create_flow", &key_refs, &arg_refs)
1264            .await?;
1265
1266        parse_create_flow_result(&raw, &args.flow_id)
1267    }
1268
1269    /// Add an execution to a flow.
1270    ///
1271    /// # Atomic single-FCALL commit (RFC-011 §7.3)
1272    ///
1273    /// Post-RFC-011 phase-3, exec_core co-locates with flow_core under
1274    /// hash-tag routing (both hash to `{fp:N}` via the exec id's
1275    /// embedded partition). A single atomic FCALL writes:
1276    ///
1277    ///   - `members_set` SADD (flow membership)
1278    ///   - `exec_core.flow_id` HSET (back-pointer)
1279    ///   - `flow_index` SADD (self-heal)
1280    ///   - `flow_core` HINCRBY node_count / graph_revision +
1281    ///     HSET last_mutation_at
1282    ///
1283    /// All four writes commit atomically or none do (Valkey scripting
1284    /// contract: validates-before-writing in the Lua body means
1285    /// `flow_not_found` / `flow_already_terminal` early-returns fire
1286    /// BEFORE any `redis.call()` mutation, and a mid-body error after
1287    /// writes is not expected because all writes are on the same slot).
1288    ///
1289    /// The pre-RFC-011 two-phase contract (membership FCALL on `{fp:N}`
        /// plus a separate HSET on `{p:N}`), its orphan window, and the
        /// reconciliation-scanner plan (issue #21, now superseded) are all retired.
1290    ///
1291    /// # Consumer contract
1292    ///
1293    /// The caller's `args.execution_id` **must** be co-located with
1294    /// `args.flow_id`'s partition — i.e. minted via
1295    /// `ExecutionId::for_flow(&args.flow_id, config)`. Passing a
1296    /// `solo`-minted id (or any exec id hashing to a different
1297    /// `{fp:N}` than the flow's) will fail at the Valkey level with
1298    /// `CROSSSLOT` on a clustered deploy.
1299    ///
1300    /// Callers with a flow context in scope always use `for_flow`;
1301    /// this is the only supported mint path for flow-member execs
1302    /// post-RFC-011. Test fixtures that pre-date the co-location
1303    /// contract use `TestCluster::new_execution_id_on_partition` to
1304    /// pin to a specific hash-tag index for `fcall_create_flow`-style
1305    /// helpers that hard-code their flow partition.
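    ///
    /// A minimal mint-then-add sketch (hypothetical `flow_id` / `config`
    /// bindings; assumes the `for_flow` mint path required above):
    ///
    /// ```ignore
    /// // Mint the exec id against the flow so both hash to the same {fp:N}.
    /// let execution_id = ExecutionId::for_flow(&flow_id, config);
    /// server.add_execution_to_flow(&AddExecutionToFlowArgs {
    ///     flow_id: flow_id.clone(),
    ///     execution_id,
    ///     now: TimestampMs::now(),
    /// }).await?;
    /// ```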
1306    pub async fn add_execution_to_flow(
1307        &self,
1308        args: &AddExecutionToFlowArgs,
1309    ) -> Result<AddExecutionToFlowResult, ServerError> {
1310        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1311        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1312        let fidx = FlowIndexKeys::new(&partition);
1313
1314        // exec_core co-locates with flow_core under RFC-011 §7.3 —
1315        // same `{fp:N}` hash-tag, same slot, part of the same atomic
1316        // FCALL below.
1317        let exec_partition =
1318            execution_partition(&args.execution_id, &self.config.partition_config);
1319        let ectx = ExecKeyContext::new(&exec_partition, &args.execution_id);
1320
1321        // Pre-flight: exec_partition must match flow_partition under
1322        // RFC-011 §7.3 co-location contract. If the caller hands us a
1323        // `solo`-minted exec whose hash-tag ≠ flow_partition, the FCALL
1324        // would fail with raw `CROSSSLOT` on a clustered deploy — a
1325        // typed `ServerError::PartitionMismatch` is a clearer signal
1326        // that the consumer-contract invariant was violated at mint
1327        // time (caller should have used `ExecutionId::for_flow(...)`).
1328        // See the Consumer contract section of this method's rustdoc.
1329        if exec_partition.index != partition.index {
1330            return Err(ServerError::PartitionMismatch(format!(
1331                "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
1332                 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
1333                 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
1334                exec_p = exec_partition.index,
1335                flow_p = partition.index,
1336            )));
1337        }
1338
1339        // KEYS (4): flow_core, members_set, flow_index, exec_core
1340        let fcall_keys: Vec<String> = vec![
1341            fctx.core(),
1342            fctx.members(),
1343            fidx.flow_index(),
1344            ectx.core(),
1345        ];
1346
1347        // ARGV (3): flow_id, execution_id, now_ms
1348        let fcall_args: Vec<String> = vec![
1349            args.flow_id.to_string(),
1350            args.execution_id.to_string(),
1351            args.now.to_string(),
1352        ];
1353
1354        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1355        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1356
1357        let raw: Value = self
1358            .fcall_with_reload("ff_add_execution_to_flow", &key_refs, &arg_refs)
1359            .await?;
1360
1361        parse_add_execution_to_flow_result(&raw)
1362    }
1363
1364    /// Cancel a flow.
1365    ///
1366    /// Flips `public_flow_state` to `cancelled` atomically via
1367    /// `ff_cancel_flow` on `{fp:N}`. For `cancel_all` policy, member
1368    /// executions must be cancelled cross-partition; this dispatch runs in
1369    /// the background and the call returns [`CancelFlowResult::CancellationScheduled`]
1370    /// immediately. For all other policies (or flows with no members), or
1371    /// when the flow was already in a terminal state (idempotent retry),
1372    /// the call returns [`CancelFlowResult::Cancelled`].
1373    ///
1374    /// Clients that need synchronous completion can call [`Self::cancel_flow_wait`].
1375    ///
1376    /// # Backpressure
1377    ///
1378    /// Each call that hits the async dispatch path spawns a new task into
1379    /// the shared background `JoinSet`. Rapid repeated calls against the
1380    /// same flow will spawn *multiple* overlapping dispatch tasks. This is
1381    /// not a correctness issue — each member cancel is idempotent and
1382    /// terminal flows short-circuit via [`ParsedCancelFlow::AlreadyTerminal`]
1383    /// — but heavy burst callers should either use `?wait=true` (serialises
1384    /// the dispatch on the HTTP thread, giving natural backpressure) or
1385    /// implement client-side deduplication on `flow_id`. The `JoinSet` is
1386    /// drained with a 15s timeout on [`Self::shutdown`], so very long
1387    /// dispatch tails may be aborted during graceful shutdown.
1388    ///
1389    /// # Orphan-member semantics on shutdown abort
1390    ///
1391    /// If shutdown fires `JoinSet::abort_all()` after its drain timeout
1392    /// while a dispatch loop is mid-iteration, the already-issued
1393    /// `ff_cancel_execution` FCALLs (atomic Lua) complete cleanly with
1394    /// `terminal_outcome = cancelled` and the caller-supplied reason. The
1395    /// members not yet visited are abandoned mid-loop. They remain in
1396    /// whichever state they were in (active/eligible/suspended) until the
1397    /// natural lifecycle scanners reach them: active leases expire
1398    /// (`lease_expiry`) and attempt-timeout them to `expired`, suspended
1399    /// members time out to `skipped`, eligible ones sit until retention
1400    /// trim. So no orphan state — but the terminal_outcome for the
1401    /// abandoned members will be `expired`/`skipped` rather than
1402    /// `cancelled`, and the operator-supplied `reason` is lost for them.
1403    /// Audit tooling that requires reason fidelity across shutdowns should
1404    /// use `?wait=true`.
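    ///
    /// A sketch of the two call shapes (hypothetical `args`; the result
    /// variants are the ones documented above):
    ///
    /// ```ignore
    /// // Fire-and-forget: member cancels dispatch in the background.
    /// match server.cancel_flow(&args).await? {
    ///     CancelFlowResult::CancellationScheduled { member_count, .. } => {
    ///         tracing::info!(member_count, "cancellation dispatched");
    ///     }
    ///     CancelFlowResult::Cancelled { .. } => { /* nothing to dispatch */ }
    /// }
    ///
    /// // Reason fidelity across shutdowns: cancel members inline instead.
    /// let done = server.cancel_flow_wait(&args).await?;
    /// ```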
1405    pub async fn cancel_flow(
1406        &self,
1407        args: &CancelFlowArgs,
1408    ) -> Result<CancelFlowResult, ServerError> {
1409        self.cancel_flow_inner(args, false).await
1410    }
1411
1412    /// Cancel a flow and wait for all member cancellations to complete
1413    /// inline. Slower than [`Self::cancel_flow`] for large flows, but
1414    /// guarantees every member is in a terminal state on return.
1415    pub async fn cancel_flow_wait(
1416        &self,
1417        args: &CancelFlowArgs,
1418    ) -> Result<CancelFlowResult, ServerError> {
1419        self.cancel_flow_inner(args, true).await
1420    }
1421
1422    async fn cancel_flow_inner(
1423        &self,
1424        args: &CancelFlowArgs,
1425        wait: bool,
1426    ) -> Result<CancelFlowResult, ServerError> {
1427        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1428        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1429        let fidx = FlowIndexKeys::new(&partition);
1430
1431        // KEYS (3): flow_core, members_set, flow_index
1432        let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];
1433
1434        // ARGV (4): flow_id, reason, cancellation_policy, now_ms
1435        let fcall_args: Vec<String> = vec![
1436            args.flow_id.to_string(),
1437            args.reason.clone(),
1438            args.cancellation_policy.clone(),
1439            args.now.to_string(),
1440        ];
1441
1442        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1443        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1444
1445        let raw: Value = self
1446            .fcall_with_reload("ff_cancel_flow", &key_refs, &arg_refs)
1447            .await?;
1448
1449        let (policy, members) = match parse_cancel_flow_raw(&raw)? {
1450            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
1451                (policy, member_execution_ids)
1452            }
1453            // Idempotent retry: flow was already cancelled/completed/failed.
1454            // Return Cancelled with the *stored* policy and member list so
1455            // observability tooling gets the real historical state rather
1456            // than echoing the caller's retry intent. One HMGET + SMEMBERS
1457            // on the idempotent path — both on {fp:N}, same slot.
1458            ParsedCancelFlow::AlreadyTerminal => {
1459                let flow_meta: Vec<Option<String>> = self
1460                    .client
1461                    .cmd("HMGET")
1462                    .arg(fctx.core())
1463                    .arg("cancellation_policy")
1464                    .arg("cancel_reason")
1465                    .execute()
1466                    .await
1467                    .map_err(|e| ServerError::ValkeyContext {
1468                        source: e,
1469                        context: "HMGET flow_core cancellation_policy,cancel_reason".into(),
1470                    })?;
1471                let stored_policy = flow_meta
1472                    .first()
1473                    .and_then(|v| v.as_ref())
1474                    .filter(|s| !s.is_empty())
1475                    .cloned();
1476                let stored_reason = flow_meta
1477                    .get(1)
1478                    .and_then(|v| v.as_ref())
1479                    .filter(|s| !s.is_empty())
1480                    .cloned();
1481                let all_members: Vec<String> = self
1482                    .client
1483                    .cmd("SMEMBERS")
1484                    .arg(fctx.members())
1485                    .execute()
1486                    .await
1487                    .map_err(|e| ServerError::ValkeyContext {
1488                        source: e,
1489                        context: "SMEMBERS flow members (already terminal)".into(),
1490                    })?;
1491                // Cap the returned list to avoid pathological bandwidth on
1492                // idempotent retries for flows with 10k+ members. Clients
1493                // already received the authoritative member list on the
1494                // first (non-idempotent) call; subsequent retries just need
1495                // enough to confirm the operation and trigger per-member
1496                // polling if desired.
1497                let total_members = all_members.len();
1498                let stored_members: Vec<String> = all_members
1499                    .into_iter()
1500                    .take(ALREADY_TERMINAL_MEMBER_CAP)
1501                    .collect();
1502                tracing::debug!(
1503                    flow_id = %args.flow_id,
1504                    stored_policy = stored_policy.as_deref().unwrap_or(""),
1505                    stored_reason = stored_reason.as_deref().unwrap_or(""),
1506                    total_members,
1507                    returned_members = stored_members.len(),
1508                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
1509                );
1510                return Ok(CancelFlowResult::Cancelled {
1511                    // Fall back to caller's policy only if the stored field
1512                    // is missing (flows cancelled by older Lua that did not
1513                    // persist cancellation_policy).
1514                    cancellation_policy: stored_policy
1515                        .unwrap_or_else(|| args.cancellation_policy.clone()),
1516                    member_execution_ids: stored_members,
1517                });
1518            }
1519        };
1520        let needs_dispatch = policy == "cancel_all" && !members.is_empty();
1521
1522        if !needs_dispatch {
1523            return Ok(CancelFlowResult::Cancelled {
1524                cancellation_policy: policy,
1525                member_execution_ids: members,
1526            });
1527        }
1528
1529        if wait {
1530            // Synchronous dispatch — cancel every member inline before returning.
1531            for eid_str in &members {
1532                if let Err(e) = cancel_member_execution(
1533                    &self.client,
1534                    &self.config.partition_config,
1535                    eid_str,
1536                    &args.reason,
1537                    args.now,
1538                )
1539                .await
1540                {
1541                    tracing::warn!(
1542                        execution_id = %eid_str,
1543                        error = %e,
1544                        "cancel_flow(wait): individual execution cancel failed (may be terminal)"
1545                    );
1546                }
1547            }
1548            return Ok(CancelFlowResult::Cancelled {
1549                cancellation_policy: policy,
1550                member_execution_ids: members,
1551            });
1552        }
1553
1554        // Asynchronous dispatch — spawn into the shared JoinSet so Server::shutdown
1555        // can wait for pending cancellations (bounded by a shutdown timeout).
1556        let client = self.client.clone();
1557        let partition_config = self.config.partition_config;
1558        let reason = args.reason.clone();
1559        let now = args.now;
1560        let dispatch_members = members.clone();
1561        let flow_id = args.flow_id.clone();
1562        // Every async cancel_flow contends on this lock, but the critical
1563        // section is tiny: try_join_next drain + spawn. Drain is amortized
1564        // O(1) — each completed task is reaped exactly once across all
1565        // callers, and spawn is synchronous. At realistic cancel rates the
1566        // lock hold time is microseconds and does not bottleneck handlers.
1567        let mut guard = self.background_tasks.lock().await;
1568
1569        // Reap completed background dispatches before spawning the next.
1570        // Without this sweep, JoinSet accumulates Ok(()) results for every
1571        // async cancel ever issued — a memory leak in long-running servers
1572        // that would otherwise only drain on Server::shutdown. Surface any
1573        // panicked/aborted dispatches via tracing so silent failures in
1574        // cancel_member_execution are visible in logs.
1575        while let Some(joined) = guard.try_join_next() {
1576            if let Err(e) = joined {
1577                tracing::warn!(
1578                    error = %e,
1579                    "cancel_flow: background dispatch task panicked or was aborted"
1580                );
1581            }
1582        }
1583
1584        guard.spawn(async move {
1585            // Bounded parallel dispatch via futures::stream::buffer_unordered.
1586            // Sequential cancel of a 1000-member flow at ~2ms/FCALL is ~2s —
1587            // too long to finish inside a 15s shutdown abort window for
1588            // large flows. Bounding at CONCURRENCY keeps Valkey load
1589            // predictable while still cutting wall-clock dispatch time by
1590            // ~CONCURRENCY× for large member sets.
1591            use futures::stream::StreamExt;
1592            const CONCURRENCY: usize = 16;
1593
1594            let member_count = dispatch_members.len();
1595            let flow_id_for_log = flow_id.clone();
1596            futures::stream::iter(dispatch_members)
1597                .map(|eid_str| {
1598                    let client = client.clone();
1599                    let reason = reason.clone();
1600                    let flow_id = flow_id.clone();
1601                    async move {
1602                        if let Err(e) = cancel_member_execution(
1603                            &client,
1604                            &partition_config,
1605                            &eid_str,
1606                            &reason,
1607                            now,
1608                        )
1609                        .await
1610                        {
1611                            tracing::warn!(
1612                                flow_id = %flow_id,
1613                                execution_id = %eid_str,
1614                                error = %e,
1615                                "cancel_flow(async): individual execution cancel failed (may be terminal)"
1616                            );
1617                        }
1618                    }
1619                })
1620                .buffer_unordered(CONCURRENCY)
1621                .for_each(|()| async {})
1622                .await;
1623
1624            tracing::debug!(
1625                flow_id = %flow_id_for_log,
1626                member_count,
1627                concurrency = CONCURRENCY,
1628                "cancel_flow: background member dispatch complete"
1629            );
1630        });
1631        drop(guard);
1632
1633        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
1634        Ok(CancelFlowResult::CancellationScheduled {
1635            cancellation_policy: policy,
1636            member_count,
1637            member_execution_ids: members,
1638        })
1639    }
1640
1641    /// Stage a dependency edge between two executions in a flow.
1642    ///
1643    /// Runs on the flow partition {fp:N}.
1644    /// KEYS (6), ARGV (8) — matches lua/flow.lua ff_stage_dependency_edge.
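    ///
    /// Sketch of the two-step edge protocol (hypothetical `stage_args` /
    /// `apply_args` bindings):
    ///
    /// ```ignore
    /// // Step 1: stage on the flow partition {fp:N}.
    /// let staged = server.stage_dependency_edge(&stage_args).await?;
    /// // Step 2: materialize on the child's execution partition {p:N}.
    /// let applied = server.apply_dependency_to_child(&apply_args).await?;
    /// ```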
1645    pub async fn stage_dependency_edge(
1646        &self,
1647        args: &StageDependencyEdgeArgs,
1648    ) -> Result<StageDependencyEdgeResult, ServerError> {
1649        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1650        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1651
1652        // KEYS (6): flow_core, members_set, edge_hash, out_adj_set, in_adj_set, grant_hash
1653        let fcall_keys: Vec<String> = vec![
1654            fctx.core(),
1655            fctx.members(),
1656            fctx.edge(&args.edge_id),
1657            fctx.outgoing(&args.upstream_execution_id),
1658            fctx.incoming(&args.downstream_execution_id),
1659            fctx.grant(&args.edge_id.to_string()),
1660        ];
1661
1662        // ARGV (8): flow_id, edge_id, upstream_eid, downstream_eid,
1663        //           dependency_kind, data_passing_ref, expected_graph_revision, now_ms
1664        let fcall_args: Vec<String> = vec![
1665            args.flow_id.to_string(),
1666            args.edge_id.to_string(),
1667            args.upstream_execution_id.to_string(),
1668            args.downstream_execution_id.to_string(),
1669            args.dependency_kind.clone(),
1670            args.data_passing_ref.clone().unwrap_or_default(),
1671            args.expected_graph_revision.to_string(),
1672            args.now.to_string(),
1673        ];
1674
1675        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1676        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1677
1678        let raw: Value = self
1679            .fcall_with_reload("ff_stage_dependency_edge", &key_refs, &arg_refs)
1680            .await?;
1681
1682        parse_stage_dependency_edge_result(&raw)
1683    }
1684
1685    /// Apply a staged dependency edge to the child execution.
1686    ///
1687    /// Runs on the child execution partition {p:N}.
1688    /// KEYS (7), ARGV (7) — matches lua/flow.lua ff_apply_dependency_to_child.
1689    pub async fn apply_dependency_to_child(
1690        &self,
1691        args: &ApplyDependencyToChildArgs,
1692    ) -> Result<ApplyDependencyToChildResult, ServerError> {
1693        let partition = execution_partition(
1694            &args.downstream_execution_id,
1695            &self.config.partition_config,
1696        );
1697        let ctx = ExecKeyContext::new(&partition, &args.downstream_execution_id);
1698        let idx = IndexKeys::new(&partition);
1699
1700        // Pre-read lane_id for index keys
1701        let lane_str: Option<String> = self
1702            .client
1703            .hget(&ctx.core(), "lane_id")
1704            .await
1705            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1706        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1707
1708        // KEYS (7): exec_core, deps_meta, unresolved_set, dep_hash,
1709        //           eligible_zset, blocked_deps_zset, deps_all_edges
1710        let fcall_keys: Vec<String> = vec![
1711            ctx.core(),
1712            ctx.deps_meta(),
1713            ctx.deps_unresolved(),
1714            ctx.dep_edge(&args.edge_id),
1715            idx.lane_eligible(&lane),
1716            idx.lane_blocked_dependencies(&lane),
1717            ctx.deps_all_edges(),
1718        ];
1719
1720        // ARGV (7): flow_id, edge_id, upstream_eid, graph_revision,
1721        //           dependency_kind, data_passing_ref, now_ms
1722        let fcall_args: Vec<String> = vec![
1723            args.flow_id.to_string(),
1724            args.edge_id.to_string(),
1725            args.upstream_execution_id.to_string(),
1726            args.graph_revision.to_string(),
1727            args.dependency_kind.clone(),
1728            args.data_passing_ref.clone().unwrap_or_default(),
1729            args.now.to_string(),
1730        ];
1731
1732        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1733        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1734
1735        let raw: Value = self
1736            .fcall_with_reload("ff_apply_dependency_to_child", &key_refs, &arg_refs)
1737            .await?;
1738
1739        parse_apply_dependency_result(&raw)
1740    }
1741
1742    // ── Execution operations API ──
1743
1744    /// Deliver a signal to a suspended (or pending-waitpoint) execution.
1745    ///
1746    /// Pre-reads exec_core's `lane_id` to derive the per-lane index keys.
1747    /// KEYS (14), ARGV (18) — matches lua/signal.lua ff_deliver_signal.
1748    pub async fn deliver_signal(
1749        &self,
1750        args: &DeliverSignalArgs,
1751    ) -> Result<DeliverSignalResult, ServerError> {
1752        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
1753        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
1754        let idx = IndexKeys::new(&partition);
1755
1756        // Pre-read lane_id for index keys
1757        let lane_str: Option<String> = self
1758            .client
1759            .hget(&ctx.core(), "lane_id")
1760            .await
1761            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1762        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1763
1764        let wp_id = &args.waitpoint_id;
1765        let sig_id = &args.signal_id;
1766        let idem_key = args
1767            .idempotency_key
1768            .as_ref()
1769            .filter(|k| !k.is_empty())
1770            .map(|k| ctx.signal_dedup(wp_id, k))
1771            .unwrap_or_else(|| ctx.noop());
1772
1773        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
1774        //            exec_signals_zset, signal_hash, signal_payload,
1775        //            idem_key, waitpoint_hash, suspension_current,
1776        //            eligible_zset, suspended_zset, delayed_zset,
1777        //            suspension_timeout_zset, hmac_secrets
1778        let fcall_keys: Vec<String> = vec![
1779            ctx.core(),                       // 1
1780            ctx.waitpoint_condition(wp_id),    // 2
1781            ctx.waitpoint_signals(wp_id),      // 3
1782            ctx.exec_signals(),                // 4
1783            ctx.signal(sig_id),                // 5
1784            ctx.signal_payload(sig_id),        // 6
1785            idem_key,                          // 7
1786            ctx.waitpoint(wp_id),              // 8
1787            ctx.suspension_current(),          // 9
1788            idx.lane_eligible(&lane),          // 10
1789            idx.lane_suspended(&lane),         // 11
1790            idx.lane_delayed(&lane),           // 12
1791            idx.suspension_timeout(),          // 13
1792            idx.waitpoint_hmac_secrets(),      // 14
1793        ];
1794
1795        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
1796        //            signal_category, source_type, source_identity,
1797        //            payload, payload_encoding, idempotency_key,
1798        //            correlation_id, target_scope, created_at,
1799        //            dedup_ttl_ms, resume_delay_ms, signal_maxlen,
1800        //            max_signals_per_execution, waitpoint_token
1801        let fcall_args: Vec<String> = vec![
1802            args.signal_id.to_string(),                          // 1
1803            args.execution_id.to_string(),                       // 2
1804            args.waitpoint_id.to_string(),                       // 3
1805            args.signal_name.clone(),                            // 4
1806            args.signal_category.clone(),                        // 5
1807            args.source_type.clone(),                            // 6
1808            args.source_identity.clone(),                        // 7
1809            args.payload.as_ref()
1810                .map(|p| String::from_utf8_lossy(p).into_owned())
1811                .unwrap_or_default(),                            // 8
1812            args.payload_encoding
1813                .clone()
1814                .unwrap_or_else(|| "json".to_owned()),           // 9
1815            args.idempotency_key
1816                .clone()
1817                .unwrap_or_default(),                            // 10
1818            args.correlation_id
1819                .clone()
1820                .unwrap_or_default(),                            // 11
1821            args.target_scope.clone(),                           // 12
1822            args.created_at
1823                .map(|ts| ts.to_string())
1824                .unwrap_or_else(|| args.now.to_string()),        // 13
1825            args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), // 14
1826            args.resume_delay_ms.unwrap_or(0).to_string(),       // 15
1827            args.signal_maxlen.unwrap_or(1000).to_string(),      // 16
1828            args.max_signals_per_execution
1829                .unwrap_or(10_000)
1830                .to_string(),                                    // 17
1831            // WIRE BOUNDARY — raw token to Lua. Display is redacted
1832            // for log safety; use .as_str() at the wire crossing.
1833            args.waitpoint_token.as_str().to_owned(),            // 18
1834        ];
1835
1836        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1837        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1838
1839        let raw: Value = self
1840            .fcall_with_reload("ff_deliver_signal", &key_refs, &arg_refs)
1841            .await?;
1842
1843        parse_deliver_signal_result(&raw, &args.signal_id)
1844    }
1845
1846    /// Change an execution's priority.
1847    ///
1848    /// KEYS (2), ARGV (2) — matches lua/scheduling.lua ff_change_priority.
1849    pub async fn change_priority(
1850        &self,
1851        execution_id: &ExecutionId,
1852        new_priority: i32,
1853    ) -> Result<ChangePriorityResult, ServerError> {
1854        let partition = execution_partition(execution_id, &self.config.partition_config);
1855        let ctx = ExecKeyContext::new(&partition, execution_id);
1856        let idx = IndexKeys::new(&partition);
1857
1858        // Read lane_id for eligible_zset key
1859        let lane_str: Option<String> = self
1860            .client
1861            .hget(&ctx.core(), "lane_id")
1862            .await
1863            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1864        let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1865
1866        // KEYS (2): exec_core, eligible_zset
1867        let fcall_keys: Vec<String> = vec![ctx.core(), idx.lane_eligible(&lane)];
1868
1869        // ARGV (2): execution_id, new_priority
1870        let fcall_args: Vec<String> = vec![
1871            execution_id.to_string(),
1872            new_priority.to_string(),
1873        ];
1874
1875        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1876        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1877
1878        let raw: Value = self
1879            .fcall_with_reload("ff_change_priority", &key_refs, &arg_refs)
1880            .await?;
1881
1882        parse_change_priority_result(&raw, execution_id)
1883    }
1884
1885    /// Scheduler-routed claim entry point (Batch C item 2 PR-B).
1886    ///
1887    /// Delegates to [`ff_scheduler::Scheduler::claim_for_worker`] which
1888    /// runs budget + quota + capability admission before issuing the
1889    /// grant. Returns `Ok(None)` when no eligible execution exists on
1890    /// the lane at this scan cycle. The worker's subsequent
1891    /// `claim_from_grant(lane, grant)` mints the lease.
1892    ///
1893    /// Keeping the claim-grant mint inside the server (rather than the
1894    /// worker) means capability CSV validation, budget/quota breach
1895    /// checks, and lane routing run in one place for every tenant
1896    /// worker — the same invariants the `direct-valkey-claim` path
1897    /// enforces inline, but gated at a single server choke point.
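    ///
    /// A worker-loop sketch (hypothetical `worker` handle and identifier
    /// bindings; `claim_from_grant` is the worker-side call named above):
    ///
    /// ```ignore
    /// if let Some(grant) = server
    ///     .claim_for_worker(&lane, &worker_id, &instance_id, &capabilities, 30_000)
    ///     .await?
    /// {
    ///     // Worker side: mint the lease from the admission-checked grant.
    ///     let lease = worker.claim_from_grant(&lane, &grant).await?;
    /// }
    /// ```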
1898    pub async fn claim_for_worker(
1899        &self,
1900        lane: &LaneId,
1901        worker_id: &WorkerId,
1902        worker_instance_id: &WorkerInstanceId,
1903        worker_capabilities: &std::collections::BTreeSet<String>,
1904        grant_ttl_ms: u64,
1905    ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
1906        let scheduler = ff_scheduler::Scheduler::new(
1907            self.client.clone(),
1908            self.config.partition_config,
1909        );
1910        scheduler
1911            .claim_for_worker(
1912                lane,
1913                worker_id,
1914                worker_instance_id,
1915                worker_capabilities,
1916                grant_ttl_ms,
1917            )
1918            .await
1919            .map_err(|e| match e {
1920                ff_scheduler::SchedulerError::Valkey(inner) => {
1921                    ServerError::Valkey(inner)
1922                }
1923                ff_scheduler::SchedulerError::ValkeyContext { source, context } => {
1924                    ServerError::ValkeyContext { source, context }
1925                }
1926                ff_scheduler::SchedulerError::Config(msg) => {
1927                    ServerError::InvalidInput(msg)
1928                }
1929            })
1930    }
1931
1932    /// Revoke an active lease (operator-initiated).
1933    pub async fn revoke_lease(
1934        &self,
1935        execution_id: &ExecutionId,
1936    ) -> Result<RevokeLeaseResult, ServerError> {
1937        let partition = execution_partition(execution_id, &self.config.partition_config);
1938        let ctx = ExecKeyContext::new(&partition, execution_id);
1939        let idx = IndexKeys::new(&partition);
1940
1941        // Pre-read worker_instance_id for worker_leases key
1942        let wiid_str: Option<String> = self
1943            .client
1944            .hget(&ctx.core(), "current_worker_instance_id")
1945            .await
1946            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET worker_instance_id".into() })?;
1947        let wiid = match wiid_str {
1948            Some(ref s) if !s.is_empty() => WorkerInstanceId::new(s),
1949            _ => {
1950                return Err(ServerError::NotFound(format!(
1951                    "no active lease for execution {execution_id} (no current_worker_instance_id)"
1952                )));
1953            }
1954        };
1955
1956        // KEYS (5): exec_core, lease_current, lease_history, lease_expiry_zset, worker_leases
1957        let fcall_keys: Vec<String> = vec![
1958            ctx.core(),
1959            ctx.lease_current(),
1960            ctx.lease_history(),
1961            idx.lease_expiry(),
1962            idx.worker_leases(&wiid),
1963        ];
1964
1965        // ARGV (3): execution_id, expected_lease_id (empty = skip check), revoke_reason
1966        let fcall_args: Vec<String> = vec![
1967            execution_id.to_string(),
1968            String::new(), // no expected_lease_id check
1969            "operator_revoke".to_owned(),
1970        ];
1971
1972        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1973        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1974
1975        let raw: Value = self
1976            .fcall_with_reload("ff_revoke_lease", &key_refs, &arg_refs)
1977            .await?;
1978
1979        parse_revoke_lease_result(&raw)
1980    }
1981
1982    /// Get full execution info via HGETALL on exec_core.
1983    pub async fn get_execution(
1984        &self,
1985        execution_id: &ExecutionId,
1986    ) -> Result<ExecutionInfo, ServerError> {
1987        let partition = execution_partition(execution_id, &self.config.partition_config);
1988        let ctx = ExecKeyContext::new(&partition, execution_id);
1989
1990        let fields: HashMap<String, String> = self
1991            .client
1992            .hgetall(&ctx.core())
1993            .await
1994            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL exec_core".into() })?;
1995
1996        if fields.is_empty() {
1997            return Err(ServerError::NotFound(format!(
1998                "execution not found: {execution_id}"
1999            )));
2000        }
2001
2002        let parse_enum = |field: &str| -> String {
2003            fields.get(field).cloned().unwrap_or_default()
2004        };
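        // exec_core stores enum values as bare strings; serde_json expects a
        // JSON value, so wrap the raw field in quotes before deserializing
        // into the typed state-vector enums below.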
2005        fn deserialize<T: serde::de::DeserializeOwned>(field: &str, raw: &str) -> Result<T, ServerError> {
2006            let quoted = format!("\"{raw}\"");
2007            serde_json::from_str(&quoted).map_err(|e| {
2008                ServerError::Script(format!("invalid {field} '{raw}': {e}"))
2009            })
2010        }
2011
2012        let lp_str = parse_enum("lifecycle_phase");
2013        let os_str = parse_enum("ownership_state");
2014        let es_str = parse_enum("eligibility_state");
2015        let br_str = parse_enum("blocking_reason");
2016        let to_str = parse_enum("terminal_outcome");
2017        let as_str = parse_enum("attempt_state");
2018        let ps_str = parse_enum("public_state");
2019
2020        let state_vector = StateVector {
2021            lifecycle_phase: deserialize("lifecycle_phase", &lp_str)?,
2022            ownership_state: deserialize("ownership_state", &os_str)?,
2023            eligibility_state: deserialize("eligibility_state", &es_str)?,
2024            blocking_reason: deserialize("blocking_reason", &br_str)?,
2025            terminal_outcome: deserialize("terminal_outcome", &to_str)?,
2026            attempt_state: deserialize("attempt_state", &as_str)?,
2027            public_state: deserialize("public_state", &ps_str)?,
2028        };
2029
2030        // Reader invariant (RFC-011 §7.3): `flow_id` on exec_core is
2031        // stamped atomically with membership in `add_execution_to_flow`'s
2032        // single FCALL. Empty iff the exec has no flow affinity (never
2033        // called `add_execution_to_flow` — solo execs) — NOT "orphaned
2034        // mid-two-phase" (that failure mode is retired). Filter on empty
2035        // to surface "no flow affinity" distinctly from "flow X".
2036        let flow_id_val = fields.get("flow_id").filter(|s| !s.is_empty()).cloned();
2037
2038        // started_at / completed_at come from the exec_core hash —
2039        // lua/execution.lua writes them alongside the rest of the state
2040        // vector. `created_at` is required; the other two are optional
2041        // (pre-claim / pre-terminal reads) and elided when empty so the
2042        // wire payload for in-flight executions keeps the shape existing
2043        // callers expect.
2044        let started_at_opt = fields
2045            .get("started_at")
2046            .filter(|s| !s.is_empty())
2047            .cloned();
2048        let completed_at_opt = fields
2049            .get("completed_at")
2050            .filter(|s| !s.is_empty())
2051            .cloned();
2052
2053        Ok(ExecutionInfo {
2054            execution_id: execution_id.clone(),
2055            namespace: parse_enum("namespace"),
2056            lane_id: parse_enum("lane_id"),
2057            priority: fields
2058                .get("priority")
2059                .and_then(|v| v.parse().ok())
2060                .unwrap_or(0),
2061            execution_kind: parse_enum("execution_kind"),
2062            state_vector,
2063            public_state: deserialize("public_state", &ps_str)?,
2064            created_at: parse_enum("created_at"),
2065            started_at: started_at_opt,
2066            completed_at: completed_at_opt,
2067            current_attempt_index: fields
2068                .get("current_attempt_index")
2069                .and_then(|v| v.parse().ok())
2070                .unwrap_or(0),
2071            flow_id: flow_id_val,
2072            blocking_detail: parse_enum("blocking_detail"),
2073        })
2074    }
2075
2076    /// List executions from a partition's index ZSET.
2077    ///
2078    /// No FCALL — direct ZRANGE + pipelined HMGET reads.
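    ///
    /// Usage sketch (hypothetical partition/lane values; `state_filter`
    /// must be one of the five values validated below):
    ///
    /// ```ignore
    /// let page = server
    ///     .list_executions(3, &LaneId::new("default"), "eligible", 0, 100)
    ///     .await?;
    /// for exec in &page.executions {
    ///     println!("{} {}", exec.execution_id, exec.public_state);
    /// }
    /// ```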
2079    pub async fn list_executions(
2080        &self,
2081        partition_id: u16,
2082        lane: &LaneId,
2083        state_filter: &str,
2084        offset: u64,
2085        limit: u64,
2086    ) -> Result<ListExecutionsResult, ServerError> {
2087        let partition = Partition {
2088            family: PartitionFamily::Execution,
2089            index: partition_id,
2090        };
2091        let idx = IndexKeys::new(&partition);
2092
2093        let zset_key = match state_filter {
2094            "eligible" => idx.lane_eligible(lane),
2095            "delayed" => idx.lane_delayed(lane),
2096            "terminal" => idx.lane_terminal(lane),
2097            "suspended" => idx.lane_suspended(lane),
2098            "active" => idx.lane_active(lane),
2099            other => {
2100                return Err(ServerError::InvalidInput(format!(
2101                    "invalid state_filter: {other}. Use: eligible, delayed, terminal, suspended, active"
2102                )));
2103            }
2104        };
2105
2106        // ZRANGE key -inf +inf BYSCORE LIMIT offset count
2107        let eids: Vec<String> = self
2108            .client
2109            .cmd("ZRANGE")
2110            .arg(&zset_key)
2111            .arg("-inf")
2112            .arg("+inf")
2113            .arg("BYSCORE")
2114            .arg("LIMIT")
2115            .arg(offset)
2116            .arg(limit)
2117            .execute()
2118            .await
2119            .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("ZRANGE {zset_key}") })?;
2120
2121        if eids.is_empty() {
2122            return Ok(ListExecutionsResult {
2123                executions: vec![],
2124                total_returned: 0,
2125            });
2126        }
2127
2128        // Parse execution IDs, warning on corrupt ZSET members
2129        let mut parsed = Vec::with_capacity(eids.len());
2130        for eid_str in &eids {
2131            match ExecutionId::parse(eid_str) {
2132                Ok(id) => parsed.push(id),
2133                Err(e) => {
2134                    tracing::warn!(
2135                        raw_id = %eid_str,
2136                        error = %e,
2137                        zset = %zset_key,
2138                        "list_executions: ZSET member failed to parse as ExecutionId (data corruption?)"
2139                    );
2140                }
2141            }
2142        }
2143
2144        if parsed.is_empty() {
2145            return Ok(ListExecutionsResult {
2146                executions: vec![],
2147                total_returned: 0,
2148            });
2149        }
2150
2151        // Pipeline all HMGETs into a single round-trip
2152        let mut pipe = self.client.pipeline();
2153        let mut slots = Vec::with_capacity(parsed.len());
2154        for eid in &parsed {
2155            let ep = execution_partition(eid, &self.config.partition_config);
2156            let ctx = ExecKeyContext::new(&ep, eid);
2157            let slot = pipe
2158                .cmd::<Vec<Option<String>>>("HMGET")
2159                .arg(ctx.core())
2160                .arg("namespace")
2161                .arg("lane_id")
2162                .arg("execution_kind")
2163                .arg("public_state")
2164                .arg("priority")
2165                .arg("created_at")
2166                .finish();
2167            slots.push(slot);
2168        }
2169
2170        pipe.execute()
2171            .await
2172            .map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline HMGET".into() })?;
2173
2174        let mut summaries = Vec::with_capacity(parsed.len());
2175        for (eid, slot) in parsed.into_iter().zip(slots) {
2176            let fields: Vec<Option<String>> = slot.value()
2177                .map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline slot".into() })?;
2178
2179            let field = |i: usize| -> String {
2180                fields
2181                    .get(i)
2182                    .and_then(|v| v.as_ref())
2183                    .cloned()
2184                    .unwrap_or_default()
2185            };
2186
2187            summaries.push(ExecutionSummary {
2188                execution_id: eid,
2189                namespace: field(0),
2190                lane_id: field(1),
2191                execution_kind: field(2),
2192                public_state: field(3),
2193                priority: field(4).parse().unwrap_or(0),
2194                created_at: field(5),
2195            });
2196        }
2197
2198        let total = summaries.len();
2199        Ok(ListExecutionsResult {
2200            executions: summaries,
2201            total_returned: total,
2202        })
2203    }
2204
2205    /// Replay a terminal execution.
2206    ///
2207    /// Pre-reads exec_core (lane_id, flow_id, terminal_outcome) and, for
        /// skipped flow members, the inbound edge ids (variable KEYS).
2208    /// KEYS (4, or 7+N for skipped flow members), ARGV (2+N) — matches
        /// lua/flow.lua ff_replay_execution.
2209    pub async fn replay_execution(
2210        &self,
2211        execution_id: &ExecutionId,
2212    ) -> Result<ReplayExecutionResult, ServerError> {
2213        let partition = execution_partition(execution_id, &self.config.partition_config);
2214        let ctx = ExecKeyContext::new(&partition, execution_id);
2215        let idx = IndexKeys::new(&partition);
2216
2217        // Pre-read lane_id, flow_id, terminal_outcome.
2218        //
2219        // Reader invariant (RFC-011 §7.3): `flow_id` on exec_core is
2220        // stamped atomically with membership by `add_execution_to_flow`'s
2221        // single FCALL. Empty iff the exec has no flow affinity
2222        // (solo-path create_execution — never added to a flow). The
2223        // `is_skipped_flow_member` branch below gates on
2224        // `!flow_id_str.is_empty()`, so solo execs correctly fall back
2225        // to the non-flow-member replay path. See
2226        // `add_execution_to_flow` rustdoc for the atomic-commit
2227        // invariant.
2228        let dyn_fields: Vec<Option<String>> = self
2229            .client
2230            .cmd("HMGET")
2231            .arg(ctx.core())
2232            .arg("lane_id")
2233            .arg("flow_id")
2234            .arg("terminal_outcome")
2235            .execute()
2236            .await
2237            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET replay pre-read".into() })?;
2238        let lane = LaneId::new(
2239            dyn_fields
2240                .first()
2241                .and_then(|v| v.as_ref())
2242                .cloned()
2243                .unwrap_or_else(|| "default".to_owned()),
2244        );
2245        let flow_id_str = dyn_fields
2246            .get(1)
2247            .and_then(|v| v.as_ref())
2248            .cloned()
2249            .unwrap_or_default();
2250        let terminal_outcome = dyn_fields
2251            .get(2)
2252            .and_then(|v| v.as_ref())
2253            .cloned()
2254            .unwrap_or_default();
2255
2256        let is_skipped_flow_member = terminal_outcome == "skipped" && !flow_id_str.is_empty();
2257
2258        // Base KEYS (4): exec_core, terminal_zset, eligible_zset, lease_history
2259        let mut fcall_keys: Vec<String> = vec![
2260            ctx.core(),
2261            idx.lane_terminal(&lane),
2262            idx.lane_eligible(&lane),
2263            ctx.lease_history(),
2264        ];
2265
2266        // Base ARGV (2): execution_id, now_ms
2267        let now = TimestampMs::now();
2268        let mut fcall_args: Vec<String> = vec![execution_id.to_string(), now.to_string()];
2269
2270        if is_skipped_flow_member {
2271            // Read ALL inbound edge IDs from the flow partition's adjacency set.
2272            // Cannot use deps:unresolved because impossible edges were SREM'd
2273            // by ff_resolve_dependency. The flow's in:<eid> set has all edges.
2274            let flow_id = FlowId::parse(&flow_id_str)
2275                .map_err(|e| ServerError::Script(format!("bad flow_id: {e}")))?;
2276            let flow_part =
2277                flow_partition(&flow_id, &self.config.partition_config);
2278            let flow_ctx = FlowKeyContext::new(&flow_part, &flow_id);
2279            let edge_ids: Vec<String> = self
2280                .client
2281                .cmd("SMEMBERS")
2282                .arg(flow_ctx.incoming(execution_id))
2283                .execute()
2284                .await
2285                .map_err(|e| ServerError::ValkeyContext { source: e, context: "SMEMBERS replay edges".into() })?;
2286
2287            // Extended KEYS: blocked_deps_zset, deps_meta, deps_unresolved, dep_edge_0..N
2288            fcall_keys.push(idx.lane_blocked_dependencies(&lane)); // 5
2289            fcall_keys.push(ctx.deps_meta()); // 6
2290            fcall_keys.push(ctx.deps_unresolved()); // 7
2291            for eid_str in &edge_ids {
2292                // Skip unparseable members rather than minting a fresh
2293                // EdgeId whose dep_edge key cannot exist.
2294                let Ok(edge_id) = EdgeId::parse(eid_str) else { continue };
2295                fcall_keys.push(ctx.dep_edge(&edge_id)); // 8..8+N
2296                fcall_args.push(eid_str.clone()); // 3..3+N
            }
2297        }
2298
2299        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2300        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2301
2302        let raw: Value = self
2303            .fcall_with_reload("ff_replay_execution", &key_refs, &arg_refs)
2304            .await?;
2305
2306        parse_replay_result(&raw)
2307    }
2308
2309    /// Read frames from an attempt's stream (XRANGE wrapper) plus terminal
2310    /// markers (`closed_at`, `closed_reason`) so consumers can stop polling
2311    /// when the producer finalizes.
2312    ///
2313    /// `from_id` and `to_id` accept XRANGE special markers: `"-"` for
2314    /// earliest, `"+"` for latest. `count_limit` MUST be `>= 1` —
2315    /// `0` returns a `ServerError::InvalidInput` (matches the REST boundary
2316    /// and the Lua-side reject).
2317    ///
2318    /// Cluster-safe: the attempt's `{p:N}` partition is derived from the
2319    /// execution id, so all KEYS share the same slot.
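    ///
    /// A one-shot read sketch (hypothetical execution id and attempt
    /// index bindings):
    ///
    /// ```ignore
    /// let frames = server
    ///     .read_attempt_stream(&execution_id, attempt_index, "-", "+", 1_000)
    ///     .await?;
    /// // The closed_at/closed_reason markers documented above tell the
    /// // consumer whether another poll is worthwhile.
    /// ```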
2320    pub async fn read_attempt_stream(
2321        &self,
2322        execution_id: &ExecutionId,
2323        attempt_index: AttemptIndex,
2324        from_id: &str,
2325        to_id: &str,
2326        count_limit: u64,
2327    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2328        use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
2329
2330        if count_limit == 0 {
2331            return Err(ServerError::InvalidInput(
2332                "count_limit must be >= 1".to_owned(),
2333            ));
2334        }
2335
2336        // Share the same semaphore as tail. A large XRANGE reply (10_000
2337        // frames × ~64KB) is just as capable of head-of-line-blocking the
2338        // tail_client mux as a long BLOCK — fairness accounting must be
2339        // unified. Non-blocking acquire → 429 on contention.
2340        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2341            Ok(p) => p,
2342            Err(tokio::sync::TryAcquireError::NoPermits) => {
2343                return Err(ServerError::ConcurrencyLimitExceeded(
2344                    "stream_ops",
2345                    self.config.max_concurrent_stream_ops,
2346                ));
2347            }
2348            Err(tokio::sync::TryAcquireError::Closed) => {
2349                return Err(ServerError::OperationFailed(
2350                    "stream semaphore closed (server shutting down)".into(),
2351                ));
2352            }
2353        };
2354
2355        let args = ReadFramesArgs {
2356            execution_id: execution_id.clone(),
2357            attempt_index,
2358            from_id: from_id.to_owned(),
2359            to_id: to_id.to_owned(),
2360            count_limit,
2361        };
2362
2363        let partition = execution_partition(execution_id, &self.config.partition_config);
2364        let ctx = ExecKeyContext::new(&partition, execution_id);
2365        let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };
2366
2367        // Route on the dedicated stream client, same as tail. A 10_000-
2368        // frame XRANGE reply on the main mux would stall every other
2369        // FCALL behind reply serialization.
2370        let result = ff_script::functions::stream::ff_read_attempt_stream(
2371            &self.tail_client, &keys, &args,
2372        )
2373        .await
2374        .map_err(script_error_to_server);
2375
2376        drop(permit);
2377
2378        match result? {
2379            ReadFramesResult::Frames(f) => Ok(f),
2380        }
2381    }
2382
2383    /// Tail a live attempt's stream (XREAD BLOCK wrapper). Returns frames
2384    /// plus the terminal signal so a polling consumer can exit when the
2385    /// producer closes the stream.
2386    ///
2387    /// `last_id` is exclusive — XREAD returns entries with id > last_id.
2388    /// Pass `"0-0"` to read from the beginning.
2389    ///
2390    /// `block_ms == 0` → non-blocking peek (returns immediately).
2391    /// `block_ms > 0`  → blocks up to that many ms. Empty `frames` +
2392    /// `closed_at=None` → timeout, no new data, still open.
2393    ///
2394    /// `count_limit` MUST be `>= 1`; `0` returns `InvalidInput`.
2395    ///
2396    /// Implemented as a direct XREAD command (not FCALL) because blocking
2397    /// commands are rejected inside Valkey Functions. The terminal
2398    /// markers come from a companion HMGET on `stream_meta` — see
2399    /// `ff_script::stream_tail` module docs.
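    ///
    /// A polling-consumer sketch (hypothetical bindings; assumes the
    /// returned `StreamFrames` exposes its `frames` (each carrying its
    /// stream `id`) and the `closed_at` terminal marker described above):
    ///
    /// ```ignore
    /// let mut last_id = "0-0".to_owned();
    /// loop {
    ///     let batch = server
    ///         .tail_attempt_stream(&execution_id, attempt_index, &last_id, 30_000, 1_000)
    ///         .await?;
    ///     for frame in &batch.frames {
    ///         last_id = frame.id.clone(); // advance the exclusive cursor
    ///     }
    ///     if batch.closed_at.is_some() {
    ///         break; // producer finalized the stream
    ///     }
    /// }
    /// ```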
2400    pub async fn tail_attempt_stream(
2401        &self,
2402        execution_id: &ExecutionId,
2403        attempt_index: AttemptIndex,
2404        last_id: &str,
2405        block_ms: u64,
2406        count_limit: u64,
2407    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2408        if count_limit == 0 {
2409            return Err(ServerError::InvalidInput(
2410                "count_limit must be >= 1".to_owned(),
2411            ));
2412        }
2413
2414        // Non-blocking permit acquisition on the shared stream_semaphore
2415        // (read + tail split the same pool). If the server is already at
2416        // the `max_concurrent_stream_ops` ceiling, return `TailUnavailable`
2417        // (→ 429) rather than queueing — a queued tail holds the caller's
2418        // HTTP request open with no upper bound, which is exactly the
2419        // resource-exhaustion pattern this limit exists to prevent.
2420        // Clients retry with backoff on 429.
2421        //
2422        // Worst-case permit hold: if the producer closes the stream via
2423        // HSET on stream_meta (not an XADD), the XREAD BLOCK won't wake
2424        // until `block_ms` elapses — so a permit can be held for up to
2425        // the caller's block_ms even though the terminal signal is
2426        // ready. This is a v1-accepted limitation; RFC-006 §Terminal-
2427        // signal timing under active tail documents it and sketches the
2428        // v2 sentinel-XADD upgrade path.
2429        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2430            Ok(p) => p,
2431            Err(tokio::sync::TryAcquireError::NoPermits) => {
2432                return Err(ServerError::ConcurrencyLimitExceeded(
2433                    "stream_ops",
2434                    self.config.max_concurrent_stream_ops,
2435                ));
2436            }
2437            Err(tokio::sync::TryAcquireError::Closed) => {
2438                return Err(ServerError::OperationFailed(
2439                    "stream semaphore closed (server shutting down)".into(),
2440                ));
2441            }
2442        };
2443
2444        let partition = execution_partition(execution_id, &self.config.partition_config);
2445        let ctx = ExecKeyContext::new(&partition, execution_id);
2446        let stream_key = ctx.stream(attempt_index);
2447        let stream_meta_key = ctx.stream_meta(attempt_index);
2448
2449        // Acquire the XREAD BLOCK serializer AFTER the stream semaphore.
2450        // Nesting order matters: the semaphore is the user-visible
2451        // ceiling (surfaces as 429), the Mutex is an internal fairness
2452        // gate. Holding the permit while waiting on the Mutex means the
2453        // ceiling still bounds queue depth. See the field docstring for
2454        // the full rationale (ferriskey pipeline FIFO + client-side
2455        // per-call timeout race).
2456        let _xread_guard = self.xread_block_lock.lock().await;
2457
2458        let result = ff_script::stream_tail::xread_block(
2459            &self.tail_client,
2460            &stream_key,
2461            &stream_meta_key,
2462            last_id,
2463            block_ms,
2464            count_limit,
2465        )
2466        .await
2467        .map_err(script_error_to_server);
2468
2469        drop(_xread_guard);
2470        drop(permit);
2471        result
2472    }
2473
2474    /// Graceful shutdown — stops scanners, drains background handler tasks
2475    /// (e.g. cancel_flow member dispatch) with a bounded timeout, then waits
2476    /// for scanners to finish.
2477    ///
2478    /// Shutdown order is chosen so in-flight stream ops (read/tail) drain
2479    /// cleanly without new arrivals piling up:
2480    ///
2481    /// 1. `stream_semaphore.close()` — new read/tail attempts fail fast
2482    ///    with `ServerError::OperationFailed("stream semaphore closed …")`,
2483    ///    which the REST layer surfaces as a 500 with `retryable=false`.
2484    ///    The body names the shutdown reason, so ops tooling that waits
2485    ///    and retries on 503-class responses can treat it the same way.
2486    /// 2. Drain handler-spawned background tasks with a 15s ceiling.
2487    /// 3. `engine.shutdown()` stops scanners.
2488    ///
2489    /// Existing in-flight tails finish on their natural `block_ms`
2490    /// boundary (up to ~30s); the `tail_client` is dropped when `Server`
2491    /// is dropped after this function returns. We do NOT wait for tails
2492    /// to drain explicitly — the semaphore-close + natural-timeout
2493    /// combination bounds shutdown to roughly `block_ms + 15s` in the
2494    /// worst case. Callers observing a dropped connection retry against
2495    /// whatever replacement is coming up.
2496    pub async fn shutdown(self) {
2497        tracing::info!("shutting down FlowFabric server");
2498
2499        // Step 1: Close the stream semaphore FIRST so any in-flight
2500        // read/tail calls that are between `try_acquire` and their
2501        // Valkey command still hold a valid permit, but no NEW stream
2502        // op can start. `Semaphore::close()` is idempotent.
2503        self.stream_semaphore.close();
2504        tracing::info!(
2505            "stream semaphore closed; no new read/tail attempts will be accepted"
2506        );
2507
2508        // Step 2: Drain handler-spawned background tasks with the same
2509        // ceiling as Engine::shutdown. If dispatch is still running at
2510        // the deadline, drop the JoinSet to abort remaining tasks.
2511        let drain_timeout = Duration::from_secs(15);
2512        let background = self.background_tasks.clone();
2513        let drain = async move {
2514            let mut guard = background.lock().await;
2515            while guard.join_next().await.is_some() {}
2516        };
2517        match tokio::time::timeout(drain_timeout, drain).await {
2518            Ok(()) => {}
2519            Err(_) => {
2520                tracing::warn!(
2521                    timeout_s = drain_timeout.as_secs(),
2522                    "shutdown: background tasks did not finish in time, aborting"
2523                );
2524                self.background_tasks.lock().await.abort_all();
2525            }
2526        }
2527
2528        self.engine.shutdown().await;
2529        tracing::info!("FlowFabric server shutdown complete");
2530    }
2531}
2532
2533// ── Valkey version check (RFC-011 §13) ──
2534
2535/// Minimum Valkey version the engine requires (see RFC-011 §13). 8.0 ships the
2536/// hash-slot and Functions behavior the co-location design relies on.
2537const REQUIRED_VALKEY_MAJOR: u32 = 8;
2538
2539/// Upper bound on the rolling-upgrade retry window (RFC-011 §9.17). A Valkey
2540/// node cycling through SIGTERM → restart typically completes in well under
2541/// 60s; this budget is generous without letting a truly-stuck cluster hang
2542/// boot indefinitely.
2543const VERSION_CHECK_RETRY_BUDGET: Duration = Duration::from_secs(60);
2544
2545/// Verify the connected Valkey reports a server version ≥ 8.0.
2546///
2547/// Per RFC-011 §9.17, during a rolling upgrade the node we happen to connect
2548/// to may temporarily be pre-upgrade while others are post-upgrade. The check
2549/// tolerates this by retrying the whole verification (including low-version
2550/// responses) with exponential backoff, capped at a 60s budget.
2551///
2552/// **Retries on:**
2553/// - Low-version responses (`detected_major < REQUIRED_VALKEY_MAJOR`) — may
2554///   resolve as the rolling upgrade progresses onto the connected node.
2555/// - Retryable ferriskey transport errors — connection refused,
2556///   `BusyLoadingError`, `ClusterDown`, etc., classified via
2557///   `ff_script::retry::is_retryable_kind`.
2558/// - Missing/unparsable version field — treated as transient (fresh-boot
2559///   server may not have the INFO fields populated yet). Reads
2560///   `valkey_version` when present (authoritative on Valkey 8.0+), falls
2561///   back to `redis_version` for Valkey 7.x.
2562///
2563/// **Does NOT retry on:**
2564/// - Non-retryable transport errors (auth failures, permission denied,
2565///   invalid client config) — these are operator misconfiguration, not
2566///   transient cluster state; fast-fail preserves a clear signal.
2567///
2568/// On budget exhaustion, returns the last observed error.
2569async fn verify_valkey_version(client: &Client) -> Result<(), ServerError> {
2570    let deadline = tokio::time::Instant::now() + VERSION_CHECK_RETRY_BUDGET;
2571    let mut backoff = Duration::from_millis(200);
2572    loop {
2573        let (should_retry, err_for_budget_exhaust, log_detail): (bool, ServerError, String) =
2574            match query_valkey_version(client).await {
2575                Ok(detected_major) if detected_major >= REQUIRED_VALKEY_MAJOR => {
2576                    tracing::info!(
2577                        detected_major,
2578                        required = REQUIRED_VALKEY_MAJOR,
2579                        "Valkey version accepted"
2580                    );
2581                    return Ok(());
2582                }
2583                Ok(detected_major) => (
2584                    // Low version — may be a rolling-upgrade stale node.
2585                    // Retry within budget; after exhaustion, the cluster
2586                    // is misconfigured and fast-fail is the correct signal.
2587                    true,
2588                    ServerError::ValkeyVersionTooLow {
2589                        detected: detected_major.to_string(),
2590                        required: format!("{REQUIRED_VALKEY_MAJOR}.0"),
2591                    },
2592                    format!("detected_major={detected_major} < required={REQUIRED_VALKEY_MAJOR}"),
2593                ),
2594                Err(e) => {
2595                    // Only retry if the underlying Valkey error is retryable
2596                    // by kind. Auth / permission / invalid-config should fast-
2597                    // fail so operators see the true root cause immediately,
2598                    // not a 60s hang followed by a generic "transient" error.
2599                    let retryable = e
2600                        .valkey_kind()
2601                        .map(ff_script::retry::is_retryable_kind)
2602                        // Non-Valkey errors (parse, missing field, operation
2603                        // failures) are treated as transient — a fresh-boot
2604                        // Valkey may not have redis_version populated yet.
2605                        .unwrap_or(true);
2606                    let detail = e.to_string();
2607                    (retryable, e, detail)
2608                }
2609            };
2610
2611        if !should_retry {
2612            return Err(err_for_budget_exhaust);
2613        }
2614        if tokio::time::Instant::now() >= deadline {
2615            return Err(err_for_budget_exhaust);
2616        }
2617        tracing::warn!(
2618            backoff_ms = backoff.as_millis() as u64,
2619            detail = %log_detail,
2620            "valkey version check transient failure; retrying"
2621        );
2622        tokio::time::sleep(backoff).await;
2623        backoff = (backoff * 2).min(Duration::from_secs(5));
2624    }
2625}
2626
2627/// Run `INFO server` and extract the major component of the Valkey version.
2628///
2629/// Returns `Err` on transport errors, missing field, or unparsable version.
2630/// Handles three response shapes:
2631///
2632/// - **Standalone:** single string body; parse directly.
2633/// - **Cluster (RESP3 map):** `INFO` returns a map keyed by node address;
2634///   every node runs the same version in a healthy deployment, so we pick
2635///   one entry and parse it. Divergent versions during a rolling upgrade
2636///   are handled by the outer retry loop.
2637/// - **Empty / unexpected:** surfaces as `OperationFailed` with context.
2638async fn query_valkey_version(client: &Client) -> Result<u32, ServerError> {
2639    let raw: Value = client
2640        .cmd("INFO")
2641        .arg("server")
2642        .execute()
2643        .await
2644        .map_err(|e| ServerError::ValkeyContext {
2645            source: e,
2646            context: "INFO server".into(),
2647        })?;
2648    let info_body = extract_info_body(&raw)?;
2649    parse_valkey_major_version(&info_body)
2650}
2651
2652/// Normalize an `INFO server` response to a single string body.
2653///
2654/// Standalone returns the body directly. Cluster returns a map of
2655/// `node_addr → body`; we pick any entry (they should all report the same
2656/// version in a stable cluster; divergent versions during rolling upgrade
2657/// are reconciled by the outer retry loop).
2658fn extract_info_body(raw: &Value) -> Result<String, ServerError> {
2659    match raw {
2660        Value::BulkString(bytes) => Ok(String::from_utf8_lossy(bytes).into_owned()),
2661        Value::VerbatimString { text, .. } => Ok(text.clone()),
2662        Value::SimpleString(s) => Ok(s.clone()),
2663        Value::Map(entries) => {
2664            // Pick the first entry's value. For cluster-wide INFO the map is
2665            // node_addr → body; every body carries the same version. If a
2666            // node is mid-upgrade, the outer retry loop handles divergence.
2667            let (_, body) = entries.first().ok_or_else(|| {
2668                ServerError::OperationFailed(
2669                    "valkey version check: cluster INFO returned empty map".into(),
2670                )
2671            })?;
2672            extract_info_body(body)
2673        }
2674        other => Err(ServerError::OperationFailed(format!(
2675            "valkey version check: unexpected INFO shape: {other:?}"
2676        ))),
2677    }
2678}
2679
2680/// Extract the major component of the Valkey version from an `INFO server`
2681/// response body. Pure parser — pulled out of [`query_valkey_version`] so it
2682/// is unit-testable without a live Valkey.
2683///
2684/// Prefers the `valkey_version:` field introduced in Valkey 8.0+. Falls back
2685/// to `redis_version:` for Valkey 7.x compat. **Note:** Valkey 8.x/9.x still
2686/// emit `redis_version:7.2.4` for Redis-client compatibility; the real
2687/// server version is in `valkey_version:`, so always check that field first.
2688fn parse_valkey_major_version(info: &str) -> Result<u32, ServerError> {
2689    let extract_major = |line: &str| -> Result<u32, ServerError> {
2690        let trimmed = line.trim();
2691        let major_str = trimmed.split('.').next().unwrap_or("").trim();
2692        if major_str.is_empty() {
2693            return Err(ServerError::OperationFailed(format!(
2694                "valkey version check: empty version field in '{trimmed}'"
2695            )));
2696        }
2697        major_str.parse::<u32>().map_err(|_| {
2698            ServerError::OperationFailed(format!(
2699                "valkey version check: non-numeric major in '{trimmed}'"
2700            ))
2701        })
2702    };
2703    // Prefer valkey_version (authoritative on Valkey 8.0+).
2704    if let Some(valkey_line) = info
2705        .lines()
2706        .find_map(|line| line.strip_prefix("valkey_version:"))
2707    {
2708        return extract_major(valkey_line);
2709    }
2710    // Fall back to redis_version (Valkey 7.x only — 8.x+ pins this to 7.2.4
2711    // for Redis-client compat, so reading it there would under-report).
2712    if let Some(redis_line) = info
2713        .lines()
2714        .find_map(|line| line.strip_prefix("redis_version:"))
2715    {
2716        return extract_major(redis_line);
2717    }
2718    Err(ServerError::OperationFailed(
2719        "valkey version check: INFO missing valkey_version and redis_version".into(),
2720    ))
2721}
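
// The two INFO parsers above are deliberately dependency-free, so they can be
// pinned with hand-written fixtures and no live Valkey. Sketch tests — the
// INFO bodies below are illustrative, not captured server output.
#[cfg(test)]
mod valkey_version_parsing_tests {
    use super::*;

    #[test]
    fn prefers_valkey_version_over_pinned_redis_version() {
        // Valkey 8.x still emits redis_version:7.2.4 for Redis-client
        // compat; valkey_version is authoritative and must win.
        let info = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:8.1.0\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 8);
    }

    #[test]
    fn falls_back_to_redis_version_on_valkey_7() {
        let info = "# Server\r\nredis_version:7.2.4\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 7);
    }

    #[test]
    fn missing_both_fields_is_an_error() {
        assert!(parse_valkey_major_version("# Server\r\n").is_err());
    }

    #[test]
    fn simple_string_info_body_passes_through() {
        let raw = Value::SimpleString("valkey_version:8.0.0\r\n".to_owned());
        assert_eq!(extract_info_body(&raw).unwrap(), "valkey_version:8.0.0\r\n");
    }
}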
2722
2723// ── Partition config validation ──
2724
2725/// Validate or create the `ff:config:partitions` key on first boot.
2726///
2727/// If the key exists, its values must match the server's config.
2728/// If it doesn't exist, create it (first boot).
2729async fn validate_or_create_partition_config(
2730    client: &Client,
2731    config: &PartitionConfig,
2732) -> Result<(), ServerError> {
2733    let key = keys::global_config_partitions();
2734
2735    let existing: HashMap<String, String> = client
2736        .hgetall(&key)
2737        .await
2738        .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;
2739
2740    if existing.is_empty() {
2741        // First boot — create all three fields in one atomic multi-field HSET (no torn config on crash)
2742        tracing::info!("first boot: creating {key}");
2743        let _: i64 = client
2744            .cmd("HSET")
2745            .arg(&key)
2746            .arg("num_flow_partitions")
2747            .arg(&config.num_flow_partitions.to_string())
2748            .arg("num_budget_partitions")
2749            .arg(&config.num_budget_partitions.to_string())
2750            .arg("num_quota_partitions")
2751            .arg(&config.num_quota_partitions.to_string())
2752            .execute()
2753            .await
2754            .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("HSET {key} (create partition config)") })?;
2755        return Ok(());
2756    }
2757
2758    // Validate existing config matches
2759    let check = |field: &str, expected: u16| -> Result<(), ServerError> {
2760        let stored: u16 = existing
2761            .get(field)
2762            .and_then(|v| v.parse().ok())
2763            .unwrap_or(0);
2764        if stored != expected {
2765            return Err(ServerError::PartitionMismatch(format!(
2766                "{field}: stored={stored}, config={expected}. \
2767                 Partition counts are fixed at deployment time. \
2768                 Either fix your config or migrate the data."
2769            )));
2770        }
2771        Ok(())
2772    };
2773
2774    check("num_flow_partitions", config.num_flow_partitions)?;
2775    check("num_budget_partitions", config.num_budget_partitions)?;
2776    check("num_quota_partitions", config.num_quota_partitions)?;
2777
2778    tracing::info!("partition config validated against stored {key}");
2779    Ok(())
2780}
2781
2782// ── Waitpoint HMAC secret bootstrap (RFC-004 §Waitpoint Security) ──
2783
2784/// Stable initial kid written on first boot. Rotation promotes to k2, k3, ...
2785/// The kid is stored alongside the secret in every partition's hash so each
2786/// FCALL can self-identify which secret produced a given token.
2787const WAITPOINT_HMAC_INITIAL_KID: &str = "k1";
2788
2789/// Per-partition outcome of the HMAC bootstrap step. Collected across the
2790/// parallel scan so we can emit aggregated logs once at the end.
2791enum PartitionBootOutcome {
2792    /// Partition already had a matching (kid, secret) pair.
2793    Match,
2794    /// Stored secret diverges from env — likely operator rotation; kept.
2795    Mismatch,
2796    /// Torn write (current_kid present, secret:<kid> missing); repaired.
2797    Repaired,
2798    /// Fresh partition; atomically installed env secret under kid=k1.
2799    Installed,
2800}
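
// Probe → outcome decision table for init_one_partition below (current_kid
// and secret:<kid> are the hash fields probed; "env" is FF_WAITPOINT_HMAC_SECRET):
//
//   current_kid   secret:<current_kid>   outcome
//   nil           —                      Installed (one atomic multi-field HSET)
//   "kN"          == env secret          Match     (no write)
//   "kN"          != env secret          Mismatch  (stored value kept)
//   "kN"          nil                    Repaired  (torn write; HSET env secret)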
2801
2802/// Bounded in-flight concurrency for the startup fan-out. Large enough to
2803/// turn a 256-partition install from ~15s sequential into ~1s on cross-AZ
2804/// Valkey, small enough to leave a cold cluster breathing room for other
2805/// Server::start work (library load, engine scanner spawn).
2806const BOOT_INIT_CONCURRENCY: usize = 16;
2807
2808async fn init_one_partition(
2809    client: &Client,
2810    partition: Partition,
2811    secret_hex: &str,
2812) -> Result<PartitionBootOutcome, ServerError> {
2813    let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();
2814
2815    // Probe for an existing install. Fast path (fresh partition): HGET
2816    // returns nil and we fall through to the atomic install below. Slow
2817    // path: secret:<stored_kid> is then HGET'd once we know the kid name.
2818    // (A previous version used a 2-field HMGET that included a fake
2819    // `secret:probe` placeholder — vestigial from an abandoned
2820    // optimization attempt, and confusing to read. Collapsed back to a
2821    // single-field HGET.)
2822    let stored_kid: Option<String> = client
2823        .cmd("HGET")
2824        .arg(&key)
2825        .arg("current_kid")
2826        .execute()
2827        .await
2828        .map_err(|e| ServerError::ValkeyContext {
2829            source: e,
2830            context: format!("HGET {key} current_kid (init probe)"),
2831        })?;
2832
2833    if let Some(stored_kid) = stored_kid {
2834        // We didn't know the stored kid up front, so now HGET the real
2835        // secret:<stored_kid> field. Two round-trips in the slow path; the
2836        // fast path (fresh partition) stays at one.
2837        let field = format!("secret:{stored_kid}");
2838        let stored_secret: Option<String> = client
2839            .hget(&key, &field)
2840            .await
2841            .map_err(|e| ServerError::ValkeyContext {
2842                source: e,
2843                context: format!("HGET {key} secret:<kid> (init check)"),
2844            })?;
2845        if stored_secret.is_none() {
2846            // Torn write from a previous boot: current_kid present but
2847            // secret:<kid> missing. Without repair, mint returns
2848            // "hmac_secret_not_initialized" on that partition forever.
2849            // Repair in place with env secret. Not rotation — rotation
2850            // always writes the secret first.
2851            client
2852                .hset(&key, &field, secret_hex)
2853                .await
2854                .map_err(|e| ServerError::ValkeyContext {
2855                    source: e,
2856                    context: format!("HSET {key} secret:<kid> (repair torn write)"),
2857                })?;
2858            return Ok(PartitionBootOutcome::Repaired);
2859        }
2860        if stored_secret.as_deref() != Some(secret_hex) {
2861            return Ok(PartitionBootOutcome::Mismatch);
2862        }
2863        return Ok(PartitionBootOutcome::Match);
2864    }
2865
2866    // Fresh partition — install current_kid + secret:<kid> atomically in
2867    // one HSET. Multi-field HSET is single-command atomic, so a crash
2868    // can't leave current_kid without its secret.
2869    let secret_field = format!("secret:{WAITPOINT_HMAC_INITIAL_KID}");
2870    let _: i64 = client
2871        .cmd("HSET")
2872        .arg(&key)
2873        .arg("current_kid")
2874        .arg(WAITPOINT_HMAC_INITIAL_KID)
2875        .arg(&secret_field)
2876        .arg(secret_hex)
2877        .execute()
2878        .await
2879        .map_err(|e| ServerError::ValkeyContext {
2880            source: e,
2881            context: format!("HSET {key} (init waitpoint HMAC atomic)"),
2882        })?;
2883    Ok(PartitionBootOutcome::Installed)
2884}
2885
2886/// Install the waitpoint HMAC secret on every execution partition.
2887///
2888/// Parallelized fan-out with bounded in-flight concurrency
2889/// (`BOOT_INIT_CONCURRENCY`) so 256-partition boots finish in ~1s instead
2890/// of ~15s sequential — the prior sequential loop was tight on K8s
2891/// `initialDelaySeconds=30` defaults, especially cross-AZ. Fail-fast:
2892/// the first per-partition error aborts boot.
2893///
2894/// Outcomes aggregate into mismatch/repaired counts (logged once at end)
2895/// so operators see a single loud warning per fault class instead of 256
2896/// per-partition lines.
2897async fn initialize_waitpoint_hmac_secret(
2898    client: &Client,
2899    partition_config: &PartitionConfig,
2900    secret_hex: &str,
2901) -> Result<(), ServerError> {
2902    use futures::stream::{FuturesUnordered, StreamExt};
2903
2904    let n = partition_config.num_flow_partitions;
2905    tracing::info!(
2906        partitions = n,
2907        concurrency = BOOT_INIT_CONCURRENCY,
2908        "installing waitpoint HMAC secret across {n} execution partitions"
2909    );
2910
2911    let mut mismatch_count: u16 = 0;
2912    let mut repaired_count: u16 = 0;
2913    let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
2914    let mut next_index: u16 = 0;
2915
2916    loop {
2917        while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
2918            let partition = Partition {
2919                family: PartitionFamily::Execution,
2920                index: next_index,
2921            };
2922            let client = client.clone();
2923            let secret_hex = secret_hex.to_owned();
2924            pending.push(async move {
2925                init_one_partition(&client, partition, &secret_hex).await
2926            });
2927            next_index += 1;
2928        }
2929        match pending.next().await {
2930            Some(res) => match res? {
2931                PartitionBootOutcome::Match | PartitionBootOutcome::Installed => {}
2932                PartitionBootOutcome::Mismatch => mismatch_count += 1,
2933                PartitionBootOutcome::Repaired => repaired_count += 1,
2934            },
2935            None => break,
2936        }
2937    }
2938
2939    if repaired_count > 0 {
2940        tracing::warn!(
2941            repaired_partitions = repaired_count,
2942            total_partitions = n,
2943            "repaired {repaired_count} partitions with torn waitpoint HMAC writes \
2944             (current_kid present but secret:<kid> missing, likely crash during prior boot)"
2945        );
2946    }
2947
2948    if mismatch_count > 0 {
2949        tracing::warn!(
2950            mismatched_partitions = mismatch_count,
2951            total_partitions = n,
2952            "stored/env secret mismatch on {mismatch_count} partitions — \
2953             env FF_WAITPOINT_HMAC_SECRET ignored in favor of stored values; \
2954             run POST /v1/admin/rotate-waitpoint-secret to sync"
2955        );
2956    }
2957
2958    tracing::info!(partitions = n, "waitpoint HMAC secret install complete");
2959    Ok(())
2960}
2961
2962/// Result of a waitpoint HMAC secret rotation across all execution partitions.
2963#[derive(Debug, Clone, serde::Serialize)]
2964pub struct RotateWaitpointSecretResult {
2965    /// Count of partitions that accepted the rotation.
2966    pub rotated: u16,
2967    /// Partition indices that failed — operator should investigate (Valkey
2968    /// outage, auth failure, cluster split). Rotation is idempotent, so a
2969    /// re-run after the underlying fault clears converges to the correct
2970    /// state.
2971    pub failed: Vec<u16>,
2972    /// Partition indices where another rotation was already in progress
2973    /// (SETNX lock held). These will be rotated by the concurrent request;
2974    /// NOT a fault the operator should retry and NOT counted in `failed`.
2975    /// Surfaced separately so dashboards distinguish "retry needed" from
2976    /// "ignore, in-flight".
2977    #[serde(default)]
2978    pub in_progress: Vec<u16>,
2979    /// New kid installed as current.
2980    pub new_kid: String,
2981}
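
// Illustrative REST body serialized from this struct (invented values; the
// field names and order follow the serde derive above):
//
//   {"rotated":255,"failed":[],"in_progress":[7],"new_kid":"k2"}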
2982
2983/// Per-partition rotation step outcome. `InProgress` means the SETNX lock
2984/// was held by a concurrent rotation — not an error; the concurrent call
2985/// will finish the work.
2986enum RotationStepOutcome {
2987    Rotated,
2988    InProgress,
2989}
2990
2991impl Server {
2992    /// Rotate the waitpoint HMAC secret. Promotes the current kid to previous
2993    /// (accepted within `FF_WAITPOINT_HMAC_GRACE_MS`), installs `new_secret_hex`
2994    /// as the new current kid. Idempotent: re-running with the same `new_kid`
2995    /// and `new_secret_hex` converges partitions to the same state.
2996    ///
2997    /// Returns a structured result so operators can see which partitions failed.
2998    /// HTTP layer returns 200 if any partition succeeded, 500 only if all fail.
2999    pub async fn rotate_waitpoint_secret(
3000        &self,
3001        new_kid: &str,
3002        new_secret_hex: &str,
3003    ) -> Result<RotateWaitpointSecretResult, ServerError> {
3004        if new_kid.is_empty() || new_kid.contains(':') {
3005            return Err(ServerError::OperationFailed(
3006                "new_kid must be non-empty and must not contain ':'".into(),
3007            ));
3008        }
3009        if new_secret_hex.is_empty()
3010            || !new_secret_hex.len().is_multiple_of(2)
3011            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
3012        {
3013            return Err(ServerError::OperationFailed(
3014                "new_secret_hex must be a non-empty even-length hex string".into(),
3015            ));
3016        }
3017
3018        // Single-writer gate. Concurrent rotates against the SAME operator
3019        // token are an attack pattern (or a retry-loop bug); legitimate
3020        // operators rotate monthly and can afford to serialize. Contention
3021        // returns ConcurrencyLimitExceeded("admin_rotate", 1) (→ HTTP 429
3022        // with a labelled error body) rather than queueing the HTTP
3023        // handler past the 120s endpoint timeout. Permit is held for
3024        // the full partition fan-out.
3025        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
3026            Ok(p) => p,
3027            Err(tokio::sync::TryAcquireError::NoPermits) => {
3028                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
3029            }
3030            Err(tokio::sync::TryAcquireError::Closed) => {
3031                return Err(ServerError::OperationFailed(
3032                    "admin rotate semaphore closed (server shutting down)".into(),
3033                ));
3034            }
3035        };
3036
3037        let n = self.config.partition_config.num_flow_partitions;
3038        let grace_ms = self.config.waitpoint_hmac_grace_ms;
3039        let now_ms = std::time::SystemTime::now()
3040            .duration_since(std::time::UNIX_EPOCH)
3041            .expect("system clock before unix epoch")
3042            .as_millis() as i64;
3043        let previous_expires_at = now_ms + grace_ms as i64;
3044
3045        // Parallelize the rotation fan-out with the same bounded
3046        // concurrency as boot init (BOOT_INIT_CONCURRENCY = 16). A 256-
3047        // partition sequential rotation takes ~7.7s at 30ms cross-AZ RTT,
3048        // uncomfortably close to the 120s HTTP endpoint timeout under
3049        // contention. The per-partition SETNX lock in
3050        // `rotate_single_partition` prevents interleaved writes against
3051        // the same partition; parallelism across DIFFERENT partitions is
3052        // safe. The outer `admin_rotate_semaphore(1)` already bounds
3053        // server-wide concurrent rotations, so this fan-out only affects
3054        // a single in-flight rotate call at a time.
3055        use futures::stream::{FuturesUnordered, StreamExt};
3056
3057        let mut rotated = 0u16;
3058        let mut failed = Vec::new();
3059        let mut in_progress: Vec<u16> = Vec::new();
3060        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
3061        let mut next_index: u16 = 0;
3062
3063        loop {
3064            while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
3065                let partition = Partition {
3066                    family: PartitionFamily::Execution,
3067                    index: next_index,
3068                };
3069                let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();
3070                let idx = next_index;
3071                // Own the strings captured by each per-partition future.
3072                // FuturesUnordered imposes no 'static bound, and the
3073                // `new_kid` / `new_secret_hex` borrows from the enclosing
3074                // call would outlive the drain loop anyway; owning simply
3075                // keeps each future self-contained with obvious lifetimes.
3076                let new_kid_owned = new_kid.to_owned();
3077                let new_secret_owned = new_secret_hex.to_owned();
3078                // The future below still borrows `&self`; that's fine —
3079                // every future completes inside this method.
3080                let fut = async move {
3081                    let outcome = self
3082                        .rotate_single_partition(
3083                            &key,
3084                            &new_kid_owned,
3085                            &new_secret_owned,
3086                            previous_expires_at,
3087                        )
3088                        .await;
3089                    (idx, partition, outcome)
3090                };
3091                pending.push(fut);
3092                next_index += 1;
3093            }
3094            match pending.next().await {
3095                Some((idx, partition, outcome)) => match outcome {
3096                    Ok(RotationStepOutcome::Rotated) => {
3097                        rotated += 1;
3098                        // Per-partition event → DEBUG (not INFO). Rationale:
3099                        // one rotate endpoint call produces 256 partition-level
3100                        // events, which would blow up paid aggregator budgets
3101                        // (Datadog/Splunk) at no operational value. The single
3102                        // aggregated audit event below is the compliance
3103                        // artifact. Failures stay at ERROR with per-partition
3104                        // detail — that's where operators need it.
3105                        tracing::debug!(
3106                            partition = %partition,
3107                            new_kid = %new_kid,
3108                            "waitpoint_hmac_rotated"
3109                        );
3110                    }
3111                    Ok(RotationStepOutcome::InProgress) => {
3112                        // Another rotation is mid-flight on this partition.
3113                        // Idempotent + per-partition locked; NOT a failure
3114                        // the operator needs to retry. Kept out of `failed`
3115                        // so that list stays actionable.
3116                        in_progress.push(idx);
3117                        tracing::debug!(
3118                            partition = %partition,
3119                            new_kid = %new_kid,
3120                            "waitpoint_hmac_rotation_in_progress_skipped"
3121                        );
3122                    }
3123                    Err(e) => {
3124                        // Failures stay at ERROR (target=audit) per-partition —
3125                        // operators need the partition index + error to debug
3126                        // Valkey/config faults. Low cardinality in practice.
3127                        tracing::error!(
3128                            target: "audit",
3129                            partition = %partition,
3130                            err = %e,
3131                            "waitpoint_hmac_rotation_failed"
3132                        );
3133                        failed.push(idx);
3134                    }
3135                },
3136                None => break,
3137            }
3138        }
3139
3140        // Single aggregated audit event for the whole rotation. This is
3141        // the load-bearing compliance artifact — operators alert on
3142        // target="audit" at INFO level and this is the stable schema.
3143        tracing::info!(
3144            target: "audit",
3145            new_kid = %new_kid,
3146            total_partitions = n,
3147            rotated,
3148            failed_count = failed.len(),
3149            in_progress_count = in_progress.len(),
3150            "waitpoint_hmac_rotation_complete"
3151        );
3152
3153        Ok(RotateWaitpointSecretResult {
3154            rotated,
3155            failed,
3156            in_progress,
3157            new_kid: new_kid.to_owned(),
3158        })
3159    }
3160
3161    /// Rotate on a single partition. Reads current_kid, promotes it to
3162    /// previous_kid with `previous_expires_at`, installs new_kid + new secret.
3163    ///
3164    /// Serialized per-partition via a SETNX lock (`ff:sec:rotate_lock:{p:N}`)
3165    /// with a 10s TTL. Concurrent operator-triggered rotations against the
3166    /// same partition would otherwise interleave the HGET/HDEL/HSET sequence
3167    /// and lose writes — the loser's secret would end up partially stored
3168    /// and its tokens would stop validating. The lock key shares the
3169    /// partition hash tag so it stays in-slot under cluster mode.
3170    ///
3171    /// A single-partition rotation completes in a few ms on local Valkey,
3172    /// so the 10s TTL is a generous safety margin.
3173    async fn rotate_single_partition(
3174        &self,
3175        key: &str,
3176        new_kid: &str,
3177        new_secret_hex: &str,
3178        previous_expires_at: i64,
3179    ) -> Result<RotationStepOutcome, ServerError> {
3180        // Derive the lock key from the secrets key: keep the `{p:N}` hash tag
3181        // so the lock lives in the same cluster slot as the hash we're
3182        // mutating. `ff:sec:{p:N}:waitpoint_hmac` → `ff:sec:{p:N}:rotate_lock`.
3183        let lock_key = rotate_lock_key_for(key);
3184        const LOCK_TTL_MS: u64 = 10_000;
3185        let acquired: Option<String> = self
3186            .client
3187            .cmd("SET")
3188            .arg(&lock_key)
3189            .arg("1")
3190            .arg("NX")
3191            .arg("PX")
3192            .arg(LOCK_TTL_MS.to_string().as_str())
3193            .execute()
3194            .await
3195            .map_err(|e| ServerError::ValkeyContext {
3196                source: e,
3197                context: format!("SET NX {lock_key}"),
3198            })?;
3199        if acquired.is_none() {
3200            // Concurrent rotation holds the lock. Return InProgress (not
3201            // Err) — the concurrent caller will finish the work, the
3202            // outer loop buckets this partition into `in_progress`, and
3203            // the operator does not get a spurious "failed" count.
3204            return Ok(RotationStepOutcome::InProgress);
3205        }
3206
3207        // Best-effort DEL of the lock on all exit paths. Valkey auto-expires
3208        // via PX TTL anyway, so a dropped future only costs the remaining
3209        // lock window, not correctness.
3210        let result = self
3211            .rotate_single_partition_locked(key, new_kid, new_secret_hex, previous_expires_at)
3212            .await;
3213        let _: Option<i64> = self
3214            .client
3215            .cmd("DEL")
3216            .arg(&lock_key)
3217            .execute()
3218            .await
3219            .ok()
3220            .flatten();
3221        result.map(|()| RotationStepOutcome::Rotated)
3222    }
3223
3224    async fn rotate_single_partition_locked(
3225        &self,
3226        key: &str,
3227        new_kid: &str,
3228        new_secret_hex: &str,
3229        previous_expires_at: i64,
3230    ) -> Result<(), ServerError> {
3231        let current_kid: Option<String> = self
3232            .client
3233            .hget(key, "current_kid")
3234            .await
3235            .map_err(|e| ServerError::ValkeyContext {
3236                source: e,
3237                context: format!("HGET {key} current_kid"),
3238            })?;
3239
3240        if current_kid.as_deref() == Some(new_kid) {
3241            // Already rotated to this kid. Retry-safety rule (RFC-004):
3242            // - same kid + same secret → no-op (operator retry after HTTP
3243            //   timeout or transient 5xx is SAFE).
3244            // - same kid + DIFFERENT secret → REJECT with RotationConflict.
3245            //   Silently overwriting would invalidate every token minted
3246            //   under the stored secret since the first rotation to this
3247            //   kid, an unbounded amount of in-flight state.
3248            let field = format!("secret:{new_kid}");
3249            let stored_secret: Option<String> = self
3250                .client
3251                .hget(key, &field)
3252                .await
3253                .map_err(|e| ServerError::ValkeyContext {
3254                    source: e,
3255                    context: format!("HGET {key} secret:<kid> (idempotency check)"),
3256                })?;
3257            match stored_secret.as_deref() {
3258                Some(s) if s == new_secret_hex => {
3259                    // Exact replay — idempotent no-op.
3260                    return Ok(());
3261                }
3262                Some(_) => {
3263                    // Same kid, different secret bytes. Almost certainly
3264                    // an operator mistake (typed the wrong hex, forgot a
3265                    // prior rotation, or two simultaneous rotations racing
3266                    // with distinct material). Refuse with a conflict
3267                    // error — the HTTP layer maps it to 409 Conflict so
3268                    // the caller knows this isn't a retry opportunity; a
3269                    // blind retry can never succeed.
3270                    return Err(ServerError::OperationFailed(format!(
3271                        "rotation conflict: kid {new_kid} already installed with a \
3272                         different secret. Either use a fresh kid or restore the \
3273                         original secret for this kid before retrying."
3274                    )));
3275                }
3276                None => {
3277                    // Torn write: current_kid=new_kid but secret:<new_kid>
3278                    // missing. Repair with the presented secret. Matches
3279                    // boot-init repair semantics.
3280                    self.client
3281                        .hset(key, &field, new_secret_hex)
3282                        .await
3283                        .map_err(|e| ServerError::ValkeyContext {
3284                            source: e,
3285                            context: format!("HSET {key} secret:<kid> (torn-write repair)"),
3286                        })?;
3287                    return Ok(());
3288                }
3289            }
3290        }
3291
3292        // Orphan GC: scan the entire expires_at:* namespace and HDEL both
3293        // expires_at:<kid> and secret:<kid> for every kid whose grace has
3294        // elapsed. Multi-kid validation (helpers.lua) accepts ANY kid
3295        // present with a future expires_at, so GC by recency (just
3296        // previous_kid) would leak historical kids forever AND — worse —
3297        // restricting validation to a two-slot window would violate the
3298        // grace contract on rapid rotation (A→B→C inside 24h kept A
3299        // accepted under the prior model only via previous_kid, which B
3300        // now claims).
3301        //
3302        // HGETALL the whole hash once (bounded: one entry per distinct
3303        // kid ever installed, plus scalar fields). Build the reaper set
3304        // in-memory, then one HDEL. Cheap.
3305        let full_hash: std::collections::HashMap<String, String> = self
3306            .client
3307            .hgetall(key)
3308            .await
3309            .map_err(|e| ServerError::ValkeyContext {
3310                source: e,
3311                context: format!("HGETALL {key} (orphan scan)"),
3312            })?;
3313        let now_ms = std::time::SystemTime::now()
3314            .duration_since(std::time::UNIX_EPOCH)
3315            .expect("system clock before unix epoch")
3316            .as_millis() as i64;
3317        let mut expired_kids: Vec<String> = Vec::new();
3318        for (field, value) in &full_hash {
3319            if let Some(kid) = field.strip_prefix("expires_at:") {
3320                let exp = value.parse::<i64>().unwrap_or(0);
3321                if exp <= 0 || exp <= now_ms {
3322                    expired_kids.push(kid.to_owned());
3323                }
3324            }
3325        }
3326        if !expired_kids.is_empty() {
3327            let mut hdel = self.client.cmd("HDEL").arg(key);
3328            for kid in &expired_kids {
3329                hdel = hdel.arg(format!("expires_at:{kid}")).arg(format!("secret:{kid}"));
3330            }
3331            let _: i64 = hdel.execute().await.map_err(|e| ServerError::ValkeyContext {
3332                source: e,
3333                context: format!("HDEL {key} expired kids (orphan GC)"),
3334            })?;
3335        }
3336
3337        // Promote current → previous: stamp expires_at:<current_kid> with
3338        // the new previous_expires_at. This is the per-kid grace window
3339        // that validate_waitpoint_token reads. previous_kid +
3340        // previous_expires_at scalars are kept in lockstep for audit log
3341        // observability (aggregated rotation event shape) and for any
3342        // downgrade path that still reads the two-slot scheme.
3343        //
3344        // INVARIANT: expires_at:<new_kid> is NEVER written — current_kid
3345        // has no expiry (always valid). It only gets an expires_at entry
3346        // when a FUTURE rotation promotes it out of current.
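        //
        // Illustrative hash shape after rotating k1 → k2 (values invented;
        // field names are exactly the ones this function reads and writes):
        //
        //   current_kid         = "k2"           (no expires_at:k2 — see invariant)
        //   secret:k2           = <new hex>
        //   previous_kid        = "k1"
        //   previous_expires_at = <now + grace>
        //   expires_at:k1       = <now + grace>
        //   secret:k1           = <old hex>      (GC'd once the grace elapses)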
3347        let mut hset = self.client.cmd("HSET").arg(key);
3348        let prev_expires_str = previous_expires_at.to_string();
3349        let cur_owned;
3350        let outgoing_expires_field;
3351        if let Some(cur) = current_kid {
3352            cur_owned = cur;
3353            outgoing_expires_field = format!("expires_at:{cur_owned}");
3354            hset = hset
3355                .arg("previous_kid")
3356                .arg(cur_owned.as_str())
3357                .arg("previous_expires_at")
3358                .arg(prev_expires_str.as_str())
3359                .arg(outgoing_expires_field.as_str())
3360                .arg(prev_expires_str.as_str());
3361        }
3362        let secret_field = format!("secret:{new_kid}");
3363        let _: i64 = hset
3364            .arg("current_kid")
3365            .arg(new_kid)
3366            .arg(&secret_field)
3367            .arg(new_secret_hex)
3368            .execute()
3369            .await
3370            .map_err(|e| ServerError::ValkeyContext {
3371                source: e,
3372                context: format!("HSET {key} (rotate atomic)"),
3373            })?;
3374        Ok(())
3375    }
3376}
3377
3378/// Derive a per-partition rotation lock key that shares the hash tag of the
3379/// secrets key (so lock and hash live in the same cluster slot).
3380/// `ff:sec:{p:N}:waitpoint_hmac` → `ff:sec:{p:N}:rotate_lock`.
3381fn rotate_lock_key_for(secrets_key: &str) -> String {
3382    match secrets_key.rsplit_once(':') {
3383        Some((prefix, _last)) => format!("{prefix}:rotate_lock"),
3384        None => format!("{secrets_key}:rotate_lock"),
3385    }
3386}
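
// Pin the hash-tag-preserving derivation above with fixture keys (sketch
// tests; the literal key text mirrors the documented shape rather than
// being generated by ff_core::keys).
#[cfg(test)]
mod rotate_lock_key_tests {
    use super::*;

    #[test]
    fn keeps_partition_hash_tag_in_slot() {
        assert_eq!(
            rotate_lock_key_for("ff:sec:{p:7}:waitpoint_hmac"),
            "ff:sec:{p:7}:rotate_lock"
        );
    }

    #[test]
    fn key_without_separator_gets_plain_suffix() {
        assert_eq!(
            rotate_lock_key_for("waitpoint_hmac"),
            "waitpoint_hmac:rotate_lock"
        );
    }
}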
3387
3388// ── FCALL result parsing ──
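
// Reply convention shared by every parser below (the authoritative shape
// lives in the Lua function library): each FCALL returns a flat array whose
// first element is an integer status. status == 1 means success — element 1
// then carries an optional sub-status ("OK", "DUPLICATE",
// "ALREADY_SATISFIED", …) and payload fields follow. Any other status means
// failure, with element 1 holding the error code string.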
3389
3390fn parse_create_result(
3391    raw: &Value,
3392    execution_id: &ExecutionId,
3393) -> Result<CreateExecutionResult, ServerError> {
3394    let arr = match raw {
3395        Value::Array(arr) => arr,
3396        _ => return Err(ServerError::Script("ff_create_execution: expected Array".into())),
3397    };
3398
3399    let status = match arr.first() {
3400        Some(Ok(Value::Int(n))) => *n,
3401        _ => return Err(ServerError::Script("ff_create_execution: bad status code".into())),
3402    };
3403
3404    if status == 1 {
3405        // Check sub-status: OK or DUPLICATE
3406        let sub = arr
3407            .get(1)
3408            .and_then(|v| match v {
3409                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3410                Ok(Value::SimpleString(s)) => Some(s.clone()),
3411                _ => None,
3412            })
3413            .unwrap_or_default();
3414
3415        if sub == "DUPLICATE" {
3416            Ok(CreateExecutionResult::Duplicate {
3417                execution_id: execution_id.clone(),
3418            })
3419        } else {
3420            Ok(CreateExecutionResult::Created {
3421                execution_id: execution_id.clone(),
3422                public_state: PublicState::Waiting,
3423            })
3424        }
3425    } else {
3426        let error_code = arr
3427            .get(1)
3428            .and_then(|v| match v {
3429                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3430                Ok(Value::SimpleString(s)) => Some(s.clone()),
3431                _ => None,
3432            })
3433            .unwrap_or_else(|| "unknown".to_owned());
3434        Err(ServerError::OperationFailed(format!(
3435            "ff_create_execution failed: {error_code}"
3436        )))
3437    }
3438}
3439
3440fn parse_cancel_result(
3441    raw: &Value,
3442    execution_id: &ExecutionId,
3443) -> Result<CancelExecutionResult, ServerError> {
3444    let arr = match raw {
3445        Value::Array(arr) => arr,
3446        _ => return Err(ServerError::Script("ff_cancel_execution: expected Array".into())),
3447    };
3448
3449    let status = match arr.first() {
3450        Some(Ok(Value::Int(n))) => *n,
3451        _ => return Err(ServerError::Script("ff_cancel_execution: bad status code".into())),
3452    };
3453
3454    if status == 1 {
3455        Ok(CancelExecutionResult::Cancelled {
3456            execution_id: execution_id.clone(),
3457            public_state: PublicState::Cancelled,
3458        })
3459    } else {
3460        let error_code = arr
3461            .get(1)
3462            .and_then(|v| match v {
3463                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3464                Ok(Value::SimpleString(s)) => Some(s.clone()),
3465                _ => None,
3466            })
3467            .unwrap_or_else(|| "unknown".to_owned());
3468        Err(ServerError::OperationFailed(format!(
3469            "ff_cancel_execution failed: {error_code}"
3470        )))
3471    }
3472}
3473
3474fn parse_budget_create_result(
3475    raw: &Value,
3476    budget_id: &BudgetId,
3477) -> Result<CreateBudgetResult, ServerError> {
3478    let arr = match raw {
3479        Value::Array(arr) => arr,
3480        _ => return Err(ServerError::Script("ff_create_budget: expected Array".into())),
3481    };
3482
3483    let status = match arr.first() {
3484        Some(Ok(Value::Int(n))) => *n,
3485        _ => return Err(ServerError::Script("ff_create_budget: bad status code".into())),
3486    };
3487
3488    if status == 1 {
3489        let sub = arr
3490            .get(1)
3491            .and_then(|v| match v {
3492                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3493                Ok(Value::SimpleString(s)) => Some(s.clone()),
3494                _ => None,
3495            })
3496            .unwrap_or_default();
3497
3498        if sub == "ALREADY_SATISFIED" {
3499            Ok(CreateBudgetResult::AlreadySatisfied {
3500                budget_id: budget_id.clone(),
3501            })
3502        } else {
3503            Ok(CreateBudgetResult::Created {
3504                budget_id: budget_id.clone(),
3505            })
3506        }
3507    } else {
3508        let error_code = arr
3509            .get(1)
3510            .and_then(|v| match v {
3511                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3512                Ok(Value::SimpleString(s)) => Some(s.clone()),
3513                _ => None,
3514            })
3515            .unwrap_or_else(|| "unknown".to_owned());
3516        Err(ServerError::OperationFailed(format!(
3517            "ff_create_budget failed: {error_code}"
3518        )))
3519    }
3520}
3521
3522fn parse_quota_create_result(
3523    raw: &Value,
3524    quota_policy_id: &QuotaPolicyId,
3525) -> Result<CreateQuotaPolicyResult, ServerError> {
3526    let arr = match raw {
3527        Value::Array(arr) => arr,
3528        _ => return Err(ServerError::Script("ff_create_quota_policy: expected Array".into())),
3529    };
3530
3531    let status = match arr.first() {
3532        Some(Ok(Value::Int(n))) => *n,
3533        _ => return Err(ServerError::Script("ff_create_quota_policy: bad status code".into())),
3534    };
3535
3536    if status == 1 {
3537        let sub = arr
3538            .get(1)
3539            .and_then(|v| match v {
3540                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3541                Ok(Value::SimpleString(s)) => Some(s.clone()),
3542                _ => None,
3543            })
3544            .unwrap_or_default();
3545
3546        if sub == "ALREADY_SATISFIED" {
3547            Ok(CreateQuotaPolicyResult::AlreadySatisfied {
3548                quota_policy_id: quota_policy_id.clone(),
3549            })
3550        } else {
3551            Ok(CreateQuotaPolicyResult::Created {
3552                quota_policy_id: quota_policy_id.clone(),
3553            })
3554        }
3555    } else {
3556        let error_code = arr
3557            .get(1)
3558            .and_then(|v| match v {
3559                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3560                Ok(Value::SimpleString(s)) => Some(s.clone()),
3561                _ => None,
3562            })
3563            .unwrap_or_else(|| "unknown".to_owned());
3564        Err(ServerError::OperationFailed(format!(
3565            "ff_create_quota_policy failed: {error_code}"
3566        )))
3567    }
3568}
3569
3570// ── Flow FCALL result parsing ──
3571
3572fn parse_create_flow_result(
3573    raw: &Value,
3574    flow_id: &FlowId,
3575) -> Result<CreateFlowResult, ServerError> {
3576    let arr = match raw {
3577        Value::Array(arr) => arr,
3578        _ => return Err(ServerError::Script("ff_create_flow: expected Array".into())),
3579    };
3580    let status = match arr.first() {
3581        Some(Ok(Value::Int(n))) => *n,
3582        _ => return Err(ServerError::Script("ff_create_flow: bad status code".into())),
3583    };
3584    if status == 1 {
3585        let sub = fcall_field_str(arr, 1);
3586        if sub == "ALREADY_SATISFIED" {
3587            Ok(CreateFlowResult::AlreadySatisfied {
3588                flow_id: flow_id.clone(),
3589            })
3590        } else {
3591            Ok(CreateFlowResult::Created {
3592                flow_id: flow_id.clone(),
3593            })
3594        }
3595    } else {
3596        let error_code = fcall_field_str(arr, 1);
3597        Err(ServerError::OperationFailed(format!(
3598            "ff_create_flow failed: {error_code}"
3599        )))
3600    }
3601}
3602
3603fn parse_add_execution_to_flow_result(
3604    raw: &Value,
3605) -> Result<AddExecutionToFlowResult, ServerError> {
3606    let arr = match raw {
3607        Value::Array(arr) => arr,
3608        _ => {
3609            return Err(ServerError::Script(
3610                "ff_add_execution_to_flow: expected Array".into(),
3611            ))
3612        }
3613    };
3614    let status = match arr.first() {
3615        Some(Ok(Value::Int(n))) => *n,
3616        _ => {
3617            return Err(ServerError::Script(
3618                "ff_add_execution_to_flow: bad status code".into(),
3619            ))
3620        }
3621    };
    if status == 1 {
        let sub = fcall_field_str(arr, 1);
        let eid_str = fcall_field_str(arr, 2);
        let nc_str = fcall_field_str(arr, 3);
        let eid = ExecutionId::parse(&eid_str)
            .map_err(|e| ServerError::Script(format!("bad execution_id: {e}")))?;
        let nc: u32 = nc_str.parse().unwrap_or(0);
        if sub == "ALREADY_SATISFIED" {
            Ok(AddExecutionToFlowResult::AlreadyMember {
                execution_id: eid,
                node_count: nc,
            })
        } else {
            Ok(AddExecutionToFlowResult::Added {
                execution_id: eid,
                new_node_count: nc,
            })
        }
    } else {
        let error_code = fcall_field_str(arr, 1);
        Err(ServerError::OperationFailed(format!(
            "ff_add_execution_to_flow failed: {error_code}"
        )))
    }
}

/// Outcome of parsing a raw `ff_cancel_flow` FCALL response.
///
/// Keeps `AlreadyTerminal` distinct from other script errors so the caller
/// can treat cancel on an already-cancelled/completed/failed flow as
/// idempotent success instead of surfacing a 400 to the client.
enum ParsedCancelFlow {
    Cancelled {
        policy: String,
        member_execution_ids: Vec<String>,
    },
    AlreadyTerminal,
}

/// Parse the raw `ff_cancel_flow` FCALL response.
///
/// Returns [`ParsedCancelFlow::Cancelled`] on success, [`ParsedCancelFlow::AlreadyTerminal`]
/// when the flow was already in a terminal state (idempotent retry), or a
/// [`ServerError`] for any other failure.
fn parse_cancel_flow_raw(raw: &Value) -> Result<ParsedCancelFlow, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_cancel_flow: expected Array".into())),
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_cancel_flow: bad status code".into())),
    };
    if status != 1 {
        let error_code = fcall_field_str(arr, 1);
        if error_code == "flow_already_terminal" {
            return Ok(ParsedCancelFlow::AlreadyTerminal);
        }
        return Err(ServerError::OperationFailed(format!(
            "ff_cancel_flow failed: {error_code}"
        )));
    }
    // {1, "OK", cancellation_policy, member1, member2, ...}
    let policy = fcall_field_str(arr, 2);
    // Iterate to arr.len() rather than breaking on the first empty string —
    // safer against malformed Lua responses and clearer than a sentinel loop.
    let mut members = Vec::with_capacity(arr.len().saturating_sub(3));
    for i in 3..arr.len() {
        members.push(fcall_field_str(arr, i));
    }
    Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids: members })
}

fn parse_stage_dependency_edge_result(
    raw: &Value,
) -> Result<StageDependencyEdgeResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_stage_dependency_edge: expected Array".into())),
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_stage_dependency_edge: bad status code".into())),
    };
    if status == 1 {
        let edge_id_str = fcall_field_str(arr, 2);
        let rev_str = fcall_field_str(arr, 3);
        let edge_id = EdgeId::parse(&edge_id_str)
            .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
        let rev: u64 = rev_str.parse().unwrap_or(0);
        Ok(StageDependencyEdgeResult::Staged {
            edge_id,
            new_graph_revision: rev,
        })
    } else {
        let error_code = fcall_field_str(arr, 1);
        Err(ServerError::OperationFailed(format!(
            "ff_stage_dependency_edge failed: {error_code}"
        )))
    }
}

fn parse_apply_dependency_result(
    raw: &Value,
) -> Result<ApplyDependencyToChildResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_apply_dependency_to_child: expected Array".into())),
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_apply_dependency_to_child: bad status code".into())),
    };
    if status == 1 {
        let sub = fcall_field_str(arr, 1);
        if sub == "ALREADY_APPLIED" || sub == "already_applied" {
            Ok(ApplyDependencyToChildResult::AlreadyApplied)
        } else {
            // OK status — field at index 2 is unsatisfied count
            let count_str = fcall_field_str(arr, 2);
            let count: u32 = count_str.parse().unwrap_or(0);
            Ok(ApplyDependencyToChildResult::Applied {
                unsatisfied_count: count,
            })
        }
    } else {
        let error_code = fcall_field_str(arr, 1);
        Err(ServerError::OperationFailed(format!(
            "ff_apply_dependency_to_child failed: {error_code}"
        )))
    }
}

fn parse_deliver_signal_result(
    raw: &Value,
    signal_id: &SignalId,
) -> Result<DeliverSignalResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_deliver_signal: expected Array".into())),
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_deliver_signal: bad status code".into())),
    };
    if status == 1 {
        let sub = fcall_field_str(arr, 1);
        if sub == "DUPLICATE" {
            // ok_duplicate(existing_signal_id) → {1, "DUPLICATE", existing_signal_id}
            let existing_str = fcall_field_str(arr, 2);
            let existing_id = SignalId::parse(&existing_str).unwrap_or_else(|_| signal_id.clone());
            Ok(DeliverSignalResult::Duplicate {
                existing_signal_id: existing_id,
            })
        } else {
            // ok(signal_id, effect) → {1, "OK", signal_id, effect}
            let effect = fcall_field_str(arr, 3);
            Ok(DeliverSignalResult::Accepted {
                signal_id: signal_id.clone(),
                effect,
            })
        }
    } else {
        let error_code = fcall_field_str(arr, 1);
        Err(ServerError::OperationFailed(format!(
            "ff_deliver_signal failed: {error_code}"
        )))
    }
}

fn parse_change_priority_result(
    raw: &Value,
    execution_id: &ExecutionId,
) -> Result<ChangePriorityResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_change_priority: expected Array".into())),
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_change_priority: bad status code".into())),
    };
    if status == 1 {
        Ok(ChangePriorityResult::Changed {
            execution_id: execution_id.clone(),
        })
    } else {
        let error_code = fcall_field_str(arr, 1);
        Err(ServerError::OperationFailed(format!(
            "ff_change_priority failed: {error_code}"
        )))
    }
}

fn parse_replay_result(raw: &Value) -> Result<ReplayExecutionResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_replay_execution: expected Array".into())),
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_replay_execution: bad status code".into())),
    };
    if status == 1 {
        // ok("0") for normal replay, ok(N) for skipped flow member
        let unsatisfied = fcall_field_str(arr, 2);
        let ps = if unsatisfied == "0" {
            PublicState::Waiting
        } else {
            PublicState::WaitingChildren
        };
        Ok(ReplayExecutionResult::Replayed { public_state: ps })
    } else {
        let error_code = fcall_field_str(arr, 1);
        Err(ServerError::OperationFailed(format!(
            "ff_replay_execution failed: {error_code}"
        )))
    }
}

/// Convert a `ScriptError` into a `ServerError`, preserving `ferriskey::ErrorKind`
/// for transport-level variants. Business-logic variants keep their code as
/// `ServerError::Script(String)` so HTTP clients see a stable message.
///
/// Why this exists: before R2, the stream handlers did
/// `ScriptError → format!() → ServerError::Script(String)`, which erased
/// the ErrorKind and made `ServerError::is_retryable()` always return
/// false. Retry-capable clients (cairn-fabric) would not retry a legitimately
/// transient error like `IoError`.
fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
    match e {
        ff_script::error::ScriptError::Valkey(valkey_err) => ServerError::ValkeyContext {
            source: valkey_err,
            context: "stream FCALL transport".into(),
        },
        other => ServerError::Script(other.to_string()),
    }
}

/// Extract a string from an FCALL result array at the given index.
/// `BulkString`, `SimpleString`, and `Int` values are coerced to `String`;
/// missing or unexpected entries yield an empty string.
fn fcall_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
    match arr.get(index) {
        Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
        Some(Ok(Value::SimpleString(s))) => s.clone(),
        Some(Ok(Value::Int(n))) => n.to_string(),
        _ => String::new(),
    }
}

/// Parse the raw `ff_report_usage_and_check` FCALL response.
/// Standard format: {1, "OK"}, {1, "SOFT_BREACH", dim, current, limit},
///                  {1, "HARD_BREACH", dim, current, limit}, {1, "ALREADY_APPLIED"}
fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_report_usage_and_check: expected Array".into())),
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(ServerError::Script(
                "ff_report_usage_and_check: expected Int status code".into(),
            ))
        }
    };
    if status_code != 1 {
        let error_code = fcall_field_str(arr, 1);
        return Err(ServerError::OperationFailed(format!(
            "ff_report_usage_and_check failed: {error_code}"
        )));
    }
    let sub_status = fcall_field_str(arr, 1);
    match sub_status.as_str() {
        "OK" => Ok(ReportUsageResult::Ok),
        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
        "SOFT_BREACH" => {
            let dim = fcall_field_str(arr, 2);
            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
            Ok(ReportUsageResult::SoftBreach {
                dimension: dim,
                current_usage: current,
                soft_limit: limit,
            })
        }
        "HARD_BREACH" => {
            let dim = fcall_field_str(arr, 2);
            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
            Ok(ReportUsageResult::HardBreach {
                dimension: dim,
                current_usage: current,
                hard_limit: limit,
            })
        }
        _ => Err(ServerError::OperationFailed(format!(
            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
        ))),
    }
}

fn parse_revoke_lease_result(raw: &Value) -> Result<RevokeLeaseResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_revoke_lease: expected Array".into())),
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_revoke_lease: bad status code".into())),
    };
    if status == 1 {
        let sub = fcall_field_str(arr, 1);
        if sub == "ALREADY_SATISFIED" {
            let reason = fcall_field_str(arr, 2);
            Ok(RevokeLeaseResult::AlreadySatisfied { reason })
        } else {
            let lid = fcall_field_str(arr, 2);
            let epoch = fcall_field_str(arr, 3);
            Ok(RevokeLeaseResult::Revoked {
                lease_id: lid,
                lease_epoch: epoch,
            })
        }
    } else {
        let error_code = fcall_field_str(arr, 1);
        Err(ServerError::OperationFailed(format!(
            "ff_revoke_lease failed: {error_code}"
        )))
    }
}

/// Detect Valkey errors indicating the Lua function library is not loaded.
///
/// After a failover, the new primary may not have the library if replication
/// was incomplete. Valkey returns `ERR Function not loaded` for FCALL calls
/// targeting missing functions.
fn is_function_not_loaded(e: &ferriskey::Error) -> bool {
    if matches!(e.kind(), ferriskey::ErrorKind::NoScriptError) {
        return true;
    }
    e.detail()
        .map(|d| {
            d.contains("Function not loaded")
                || d.contains("No matching function")
                || d.contains("function not found")
        })
        .unwrap_or(false)
        || e.to_string().contains("Function not loaded")
}

/// Free-function form of [`Server::fcall_with_reload`] — callable from
/// background tasks that own a cloned `Client` but no `&Server`.
async fn fcall_with_reload_on_client(
    client: &Client,
    function: &str,
    keys: &[&str],
    args: &[&str],
) -> Result<Value, ServerError> {
    match client.fcall(function, keys, args).await {
        Ok(v) => Ok(v),
        Err(e) if is_function_not_loaded(&e) => {
            tracing::warn!(function, "Lua library not found on server, reloading");
            ff_script::loader::ensure_library(client)
                .await
                .map_err(ServerError::LibraryLoad)?;
            client
                .fcall(function, keys, args)
                .await
                .map_err(ServerError::Valkey)
        }
        Err(e) => Err(ServerError::Valkey(e)),
    }
}

/// Build the `ff_cancel_execution` KEYS (21) and ARGV (5) by pre-reading
/// dynamic fields from `exec_core`. Shared by [`Server::cancel_execution`]
/// and the async cancel_flow member-dispatch path.
async fn build_cancel_execution_fcall(
    client: &Client,
    partition_config: &PartitionConfig,
    args: &CancelExecutionArgs,
) -> Result<(Vec<String>, Vec<String>), ServerError> {
    let partition = execution_partition(&args.execution_id, partition_config);
    let ctx = ExecKeyContext::new(&partition, &args.execution_id);
    let idx = IndexKeys::new(&partition);

    let lane_str: Option<String> = client
        .hget(&ctx.core(), "lane_id")
        .await
        .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
    let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));

    let dyn_fields: Vec<Option<String>> = client
        .cmd("HMGET")
        .arg(ctx.core())
        .arg("current_attempt_index")
        .arg("current_waitpoint_id")
        .arg("current_worker_instance_id")
        .execute()
        .await
        .map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET cancel pre-read".into() })?;

    let att_idx_val = dyn_fields.first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse::<u32>().ok())
        .unwrap_or(0);
    let att_idx = AttemptIndex::new(att_idx_val);
    let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
    let wp_id = if wp_id_str.is_empty() {
        WaitpointId::new()
    } else {
        WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
    };
    let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
    let wiid = WorkerInstanceId::new(&wiid_str);

    let keys: Vec<String> = vec![
        ctx.core(),                              // 1
        ctx.attempt_hash(att_idx),               // 2
        ctx.stream_meta(att_idx),                // 3
        ctx.lease_current(),                     // 4
        ctx.lease_history(),                     // 5
        idx.lease_expiry(),                      // 6
        idx.worker_leases(&wiid),                // 7
        ctx.suspension_current(),                // 8
        ctx.waitpoint(&wp_id),                   // 9
        ctx.waitpoint_condition(&wp_id),         // 10
        idx.suspension_timeout(),                // 11
        idx.lane_terminal(&lane),                // 12
        idx.attempt_timeout(),                   // 13
        idx.execution_deadline(),                // 14
        idx.lane_eligible(&lane),                // 15
        idx.lane_delayed(&lane),                 // 16
        idx.lane_blocked_dependencies(&lane),    // 17
        idx.lane_blocked_budget(&lane),          // 18
        idx.lane_blocked_quota(&lane),           // 19
        idx.lane_blocked_route(&lane),           // 20
        idx.lane_blocked_operator(&lane),        // 21
    ];
    let argv: Vec<String> = vec![
        args.execution_id.to_string(),
        args.reason.clone(),
        args.source.to_string(),
        args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
        args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
    ];
    Ok((keys, argv))
}

/// Backoff schedule for transient Valkey errors during async cancel_flow
/// dispatch. Length = retry-attempt count (including the initial attempt).
/// The last entry is not slept on because it's the final attempt.
const CANCEL_MEMBER_RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2_000];

/// Reach into a `ServerError` for a `ferriskey::ErrorKind` when one is
/// available. Matches the variants that can carry a transport-layer kind:
/// direct `Valkey`, `ValkeyContext`, and `LibraryLoad` (which itself wraps
/// a `ferriskey::Error`).
fn extract_valkey_kind(e: &ServerError) -> Option<ferriskey::ErrorKind> {
    match e {
        ServerError::Valkey(err) | ServerError::ValkeyContext { source: err, .. } => {
            Some(err.kind())
        }
        ServerError::LibraryLoad(load_err) => load_err.valkey_kind(),
        _ => None,
    }
}

/// Cancel a single member execution from a cancel_flow dispatch context.
/// Parses the flow-member EID string, builds the FCALL via the shared helper,
/// and executes with the same reload-on-failover semantics as the inline path.
///
/// Wrapped in a bounded retry loop (see [`CANCEL_MEMBER_RETRY_DELAYS_MS`]) so
/// that transient Valkey errors mid-dispatch (failover, `TryAgain`,
/// `ClusterDown`, `IoError`, `FatalSendError`) do not silently leak
/// non-cancelled members. `FatalReceiveError` and non-retryable kinds bubble
/// up immediately — those either indicate the Lua ran server-side anyway or a
/// permanent mismatch that retries cannot fix.
async fn cancel_member_execution(
    client: &Client,
    partition_config: &PartitionConfig,
    eid_str: &str,
    reason: &str,
    now: TimestampMs,
) -> Result<(), ServerError> {
    let execution_id = ExecutionId::parse(eid_str)
        .map_err(|e| ServerError::InvalidInput(format!("bad execution_id '{eid_str}': {e}")))?;
    let args = CancelExecutionArgs {
        execution_id: execution_id.clone(),
        reason: reason.to_owned(),
        source: CancelSource::OperatorOverride,
        lease_id: None,
        lease_epoch: None,
        attempt_id: None,
        now,
    };

    let attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
    for (attempt_idx, delay_ms) in CANCEL_MEMBER_RETRY_DELAYS_MS.iter().enumerate() {
        let is_last = attempt_idx + 1 == attempts;
        match try_cancel_member_once(client, partition_config, &args).await {
            Ok(()) => return Ok(()),
            Err(e) => {
                // Only retry transport-layer transients; business-logic
                // errors (Script / OperationFailed / NotFound / InvalidInput)
                // won't change on retry.
                let retryable = extract_valkey_kind(&e)
                    .map(ff_script::retry::is_retryable_kind)
                    .unwrap_or(false);
                if !retryable || is_last {
                    return Err(e);
                }
                tracing::debug!(
                    execution_id = %execution_id,
                    attempt = attempt_idx + 1,
                    delay_ms = *delay_ms,
                    error = %e,
                    "cancel_member_execution: transient error, retrying"
                );
                tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
            }
        }
    }
    // Unreachable: the loop above either returns Ok, returns Err on the
    // last attempt, or returns Err on a non-retryable error. Keep a
    // defensive fallback for future edits to the retry structure.
    Err(ServerError::OperationFailed(format!(
        "cancel_member_execution: retries exhausted for {execution_id}"
    )))
}

/// Single cancel attempt — pre-read + FCALL + parse. Factored out so the
/// retry loop in [`cancel_member_execution`] can invoke it cleanly.
async fn try_cancel_member_once(
    client: &Client,
    partition_config: &PartitionConfig,
    args: &CancelExecutionArgs,
) -> Result<(), ServerError> {
    let (keys, argv) = build_cancel_execution_fcall(client, partition_config, args).await?;
    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
    let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
    let raw =
        fcall_with_reload_on_client(client, "ff_cancel_execution", &key_refs, &arg_refs).await?;
    parse_cancel_result(&raw, &args.execution_id).map(|_| ())
}

fn parse_reset_budget_result(raw: &Value) -> Result<ResetBudgetResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_reset_budget: expected Array".into())),
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_reset_budget: bad status code".into())),
    };
    if status == 1 {
        let next_str = fcall_field_str(arr, 2);
        let next_ms: i64 = next_str.parse().unwrap_or(0);
        Ok(ResetBudgetResult::Reset {
            next_reset_at: TimestampMs::from_millis(next_ms),
        })
    } else {
        let error_code = fcall_field_str(arr, 1);
        Err(ServerError::OperationFailed(format!(
            "ff_reset_budget failed: {error_code}"
        )))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;

    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn is_retryable_valkey_variant_uses_kind_table() {
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());

        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Ask)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
    }

    #[test]
    fn is_retryable_valkey_context_uses_kind_table() {
        let err = ServerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::IoError),
            context: "HGET test".into(),
        };
        assert!(err.is_retryable());

        let err = ServerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::AuthenticationFailed),
            context: "auth".into(),
        };
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_library_load_delegates_to_inner_kind() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::IoError),
        ));
        assert!(err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::AuthenticationFailed),
        ));
        assert!(!err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert!(!err.is_retryable());
    }
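
    // Hedged check of the failover-reload detector: a NoScriptError kind is
    // treated as "library missing" regardless of detail text, while an
    // unrelated transient kind is not (the synthetic detail from `mk_fk_err`
    // contains none of the matched phrases).
    #[test]
    fn function_not_loaded_matches_noscript_kind_only() {
        assert!(is_function_not_loaded(&mk_fk_err(ErrorKind::NoScriptError)));
        assert!(!is_function_not_loaded(&mk_fk_err(ErrorKind::IoError)));
    }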

    #[test]
    fn is_retryable_business_logic_variants_are_false() {
        assert!(!ServerError::NotFound("x".into()).is_retryable());
        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
        assert!(!ServerError::Script("x".into()).is_retryable());
        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
    }
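
    // Hedged check of the R2 mapping documented on `script_error_to_server`:
    // a transport-level `ScriptError::Valkey` keeps its ErrorKind, and thus
    // its retryability, after conversion to `ServerError`.
    #[test]
    fn script_error_to_server_preserves_transport_kind() {
        let mapped = script_error_to_server(ff_script::error::ScriptError::Valkey(
            mk_fk_err(ErrorKind::IoError),
        ));
        assert_eq!(extract_valkey_kind(&mapped), Some(ErrorKind::IoError));
        assert!(mapped.is_retryable());
    }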

    #[test]
    fn valkey_kind_delegates_through_library_load() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::ClusterDown),
        ));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert_eq!(err.valkey_kind(), None);
    }
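
    // Companion to the test above, for the free helper used by the
    // cancel_member_execution retry loop: `extract_valkey_kind` must see
    // through the ValkeyContext wrapper and return None for business-logic
    // variants. A minimal sketch with synthetic errors.
    #[test]
    fn extract_valkey_kind_sees_through_context_wrapper() {
        let err = ServerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::TryAgain),
            context: "HGET test".into(),
        };
        assert_eq!(extract_valkey_kind(&err), Some(ErrorKind::TryAgain));
        assert_eq!(extract_valkey_kind(&ServerError::Script("x".into())), None);
    }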

    // ── Valkey version check (RFC-011 §13) ──

    #[test]
    fn parse_valkey_major_prefers_valkey_version_over_redis_version() {
        // Valkey 8.x+ pins redis_version to 7.2.4 for Redis-client compat
        // and exposes the real version in valkey_version. Parser must use
        // valkey_version when both are present.
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
valkey_version:9.0.3\r\n\
server_mode:cluster\r\n\
os:Linux\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 9);
    }

    #[test]
    fn parse_valkey_major_real_valkey_9_cluster_body() {
        // Actual INFO server response observed on valkey/valkey:latest in
        // cluster mode (CI matrix): redis_version compat-pinned to 7.2.4,
        // valkey_version authoritative at 9.0.3.
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
server_name:valkey\r\n\
valkey_version:9.0.3\r\n\
valkey_release_stage:ga\r\n\
redis_git_sha1:00000000\r\n\
server_mode:cluster\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 9);
    }

    #[test]
    fn parse_valkey_major_falls_back_to_redis_version_on_valkey_7() {
        // Valkey 7.x doesn't emit valkey_version; parser falls back.
        let info = "# Server\r\nredis_version:7.2.4\r\nfoo:bar\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 7);
    }

    #[test]
    fn parse_valkey_major_errors_when_no_version_field() {
        let info = "# Server\r\nfoo:bar\r\n";
        let err = parse_valkey_major_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(
            err.to_string().contains("missing"),
            "expected 'missing' in message, got: {err}"
        );
    }

    #[test]
    fn parse_valkey_major_errors_on_non_numeric() {
        let info = "valkey_version:invalid.x.y\n";
        let err = parse_valkey_major_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("non-numeric major"));
    }

    #[test]
    fn extract_info_body_unwraps_cluster_map() {
        // Simulates cluster-mode INFO response: map of node_addr → body.
        // extract_info_body picks any entry's body.
        let body_text = "\
# Server\r\n\
redis_version:7.2.4\r\n\
valkey_version:9.0.3\r\n";
        let node_entry = (
            Value::SimpleString("127.0.0.1:7000".to_string()),
            Value::VerbatimString {
                format: ferriskey::value::VerbatimFormat::Text,
                text: body_text.to_string(),
            },
        );
        let map = Value::Map(vec![node_entry]);
        let body = extract_info_body(&map).unwrap();
        assert_eq!(body, body_text);
        assert_eq!(parse_valkey_major_version(&body).unwrap(), 9);
    }

    #[test]
    fn extract_info_body_handles_simple_string() {
        let body_text = "redis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
        let v = Value::SimpleString(body_text.to_string());
        let body = extract_info_body(&v).unwrap();
        assert_eq!(body, body_text);
    }

    #[test]
    fn valkey_version_too_low_is_not_retryable() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7".into(),
            required: "8.0".into(),
        };
        assert!(!err.is_retryable());
        assert_eq!(err.valkey_kind(), None);
    }

    #[test]
    fn valkey_version_too_low_error_message_includes_both_versions() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7".into(),
            required: "8.0".into(),
        };
        let msg = err.to_string();
        assert!(msg.contains("7"), "detected version in message: {msg}");
        assert!(msg.contains("8.0"), "required version in message: {msg}");
        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
    }
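
    // ── FCALL parser shape checks ──
    //
    // Hedged round-trips for the reply shapes documented on the parsers
    // above, built from synthetic `Value` arrays rather than live FCALL
    // replies. All field values ("cancel-policy", "exec-1", "cpu_ms", the
    // numeric limits) are illustrative, not captured from a running server.

    // Assumption-bearing helper: treats `Value::BulkString` as owning a
    // `Vec<u8>` payload, matching how `fcall_field_str` reads it above.
    fn bulk(s: &str) -> Result<Value, ferriskey::Error> {
        Ok(Value::BulkString(s.as_bytes().to_vec()))
    }

    #[test]
    fn fcall_field_str_coerces_common_variants() {
        let arr = vec![
            Ok(Value::Int(1)),
            Ok(Value::SimpleString("OK".to_string())),
            bulk("detail"),
        ];
        assert_eq!(fcall_field_str(&arr, 0), "1");
        assert_eq!(fcall_field_str(&arr, 1), "OK");
        assert_eq!(fcall_field_str(&arr, 2), "detail");
        // Out-of-range indices fall back to the empty string.
        assert_eq!(fcall_field_str(&arr, 9), "");
    }

    #[test]
    fn parse_cancel_flow_members_follow_policy_field() {
        // {1, "OK", cancellation_policy, member1, member2, ...}
        let raw = Value::Array(vec![
            Ok(Value::Int(1)),
            bulk("OK"),
            bulk("cancel-policy"),
            bulk("exec-1"),
            bulk("exec-2"),
        ]);
        match parse_cancel_flow_raw(&raw) {
            Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids }) => {
                assert_eq!(policy, "cancel-policy");
                assert_eq!(member_execution_ids, vec!["exec-1", "exec-2"]);
            }
            _ => panic!("expected Cancelled"),
        }
    }

    #[test]
    fn parse_cancel_flow_terminal_flow_maps_to_already_terminal() {
        let raw = Value::Array(vec![Ok(Value::Int(0)), bulk("flow_already_terminal")]);
        assert!(matches!(
            parse_cancel_flow_raw(&raw),
            Ok(ParsedCancelFlow::AlreadyTerminal)
        ));
    }

    #[test]
    fn parse_report_usage_soft_breach_carries_dimension_and_limits() {
        // {1, "SOFT_BREACH", dim, current, limit} per the format comment
        // on parse_report_usage_result.
        let raw = Value::Array(vec![
            Ok(Value::Int(1)),
            bulk("SOFT_BREACH"),
            bulk("cpu_ms"),
            bulk("900"),
            bulk("800"),
        ]);
        match parse_report_usage_result(&raw) {
            Ok(ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit }) => {
                assert_eq!(dimension, "cpu_ms");
                assert_eq!(current_usage, 900);
                assert_eq!(soft_limit, 800);
            }
            _ => panic!("expected SoftBreach"),
        }
    }

    #[test]
    fn parse_replay_maps_unsatisfied_count_to_public_state() {
        let waiting = Value::Array(vec![Ok(Value::Int(1)), bulk("OK"), bulk("0")]);
        let blocked = Value::Array(vec![Ok(Value::Int(1)), bulk("OK"), bulk("2")]);
        assert!(matches!(
            parse_replay_result(&waiting),
            Ok(ReplayExecutionResult::Replayed { public_state: PublicState::Waiting })
        ));
        assert!(matches!(
            parse_replay_result(&blocked),
            Ok(ReplayExecutionResult::Replayed { public_state: PublicState::WaitingChildren })
        ));
    }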
}