ff_sdk/worker.rs

1use std::collections::HashMap;
2#[cfg(feature = "direct-valkey-claim")]
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::Duration;
6
7use ferriskey::{Client, ClientBuilder, Value};
8use ff_core::keys::{ExecKeyContext, IndexKeys};
9use ff_core::partition::PartitionConfig;
10use ff_core::types::*;
11use tokio::sync::Semaphore;
12
13use crate::config::WorkerConfig;
14use crate::task::ClaimedTask;
15use crate::SdkError;
16
17/// FlowFabric worker — connects to Valkey, claims executions, and provides
18/// the worker-facing API.
19///
20/// # Admission control
21///
22/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
23/// **bypasses the scheduler's admission controls**: it reads the eligible
24/// ZSET directly and mints its own claim grant without consulting budget
25/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
26/// benchmarks, tests, and single-tenant development where the scheduler
27/// hop is measurement noise, not for production.
28///
29/// For production deployments, consume scheduler-issued grants via
30/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
31/// budget breach, quota sliding-window, concurrency cap, and
32/// capability-match checks before issuing grants.
33///
34/// # Usage
35///
36/// ```rust,ignore
/// // `claim_next()` requires the `direct-valkey-claim` feature.
/// use std::time::Duration;
/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
38///
39/// let config = WorkerConfig::new("localhost", 6379, "w1", "w1-i1", "default", "main");
40/// let worker = FlowFabricWorker::connect(config).await?;
41///
42/// loop {
43///     if let Some(task) = worker.claim_next().await? {
44///         // Process task...
45///         task.complete(Some(b"result".to_vec())).await?;
46///     } else {
47///         tokio::time::sleep(Duration::from_secs(1)).await;
48///     }
49/// }
50/// ```
51pub struct FlowFabricWorker {
52    client: Client,
53    config: WorkerConfig,
54    partition_config: PartitionConfig,
55    /// Sorted, deduplicated, comma-separated capabilities — computed once
56    /// from `config.capabilities` at connect time. Passed as ARGV[9] to
57    /// `ff_issue_claim_grant` on every claim. BTreeSet sorting is critical:
58    /// Lua's `ff_issue_claim_grant` relies on a stable CSV form for
59    /// reproducible logs and tests.
60    #[cfg(feature = "direct-valkey-claim")]
61    worker_capabilities_csv: String,
62    /// 8-hex FNV-1a digest of `worker_capabilities_csv`. Used in
63    /// per-mismatch logs so the 4KB CSV never echoes on every reject
64    /// during an incident. Full CSV logged once at connect-time WARN for
65    /// cross-reference. Mirrors `ff-scheduler::claim::worker_caps_digest`.
66    #[cfg(feature = "direct-valkey-claim")]
67    worker_capabilities_hash: String,
68    #[cfg(feature = "direct-valkey-claim")]
69    lane_index: AtomicUsize,
70    /// Concurrency cap for in-flight tasks. Permits are acquired or
71    /// transferred by [`claim_next`] (feature-gated),
72    /// [`claim_from_grant`] (always available), and
73    /// [`claim_from_reclaim_grant`], transferred to the returned
74    /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
75    /// Holds `max_concurrent_tasks` permits total.
76    ///
77    /// [`claim_next`]: FlowFabricWorker::claim_next
78    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
79    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
80    concurrency_semaphore: Arc<Semaphore>,
81    /// Rolling offset for chunked partition scans. Each poll advances the
82    /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
83    /// chunk)` polls every partition is covered. The initial value is
84    /// derived from `worker_instance_id` so idle workers spread their
85    /// scans across different partitions from the first poll onward.
86    ///
    /// Overflow: the cursor advances by at most `PARTITION_SCAN_CHUNK` (32)
    /// per poll. On 64-bit targets `usize` is 64 bits wide, so overflow takes
    /// on the order of 2^59 polls (billions of years at any realistic rate).
    /// On 32-bit targets (wasm32, i686) `usize` is 32 bits and wraps after
    /// roughly 4 years at 1 poll/sec — acceptable; on wrap, the modulo
    /// preserves correctness because the sequence simply restarts a new cycle.
92    #[cfg(feature = "direct-valkey-claim")]
93    scan_cursor: AtomicUsize,
94}
95
96/// Number of partitions scanned per `claim_next()` poll. Keeps idle Valkey
97/// load at O(PARTITION_SCAN_CHUNK) per worker-second instead of
98/// O(num_flow_partitions).
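///
/// With the default of 256 partitions (`PartitionConfig::default()`), a
/// worker therefore covers every partition within `ceil(256 / 32) = 8` polls.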
99#[cfg(feature = "direct-valkey-claim")]
100const PARTITION_SCAN_CHUNK: usize = 32;
101
102impl FlowFabricWorker {
103    /// Connect to Valkey and prepare the worker.
104    ///
105    /// Establishes the ferriskey connection. Does NOT load the FlowFabric
106    /// library — that is the server's responsibility (ff-server calls
107    /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
108    /// the library is already loaded.
109    pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
110        if config.lanes.is_empty() {
111            return Err(SdkError::Config("at least one lane is required".into()));
112        }
113
114        let mut builder = ClientBuilder::new()
115            .host(&config.host, config.port)
116            .connect_timeout(Duration::from_secs(10))
117            .request_timeout(Duration::from_millis(5000));
118        if config.tls {
119            builder = builder.tls();
120        }
121        if config.cluster {
122            builder = builder.cluster();
123        }
124        let client = builder.build()
125            .await
126            .map_err(|e| SdkError::ValkeyContext { source: e, context: "failed to connect".into() })?;
127
128        // Verify connectivity
129        let pong: String = client
130            .cmd("PING")
131            .execute()
132            .await
133            .map_err(|e| SdkError::ValkeyContext { source: e, context: "PING failed".into() })?;
134        if pong != "PONG" {
135            return Err(SdkError::Config(format!(
136                "unexpected PING response: {pong}"
137            )));
138        }
139
140        // Guard against two worker processes sharing the same
141        // `worker_instance_id`. A duplicate instance would clobber each
142        // other's lease_current/active_index entries and double-claim work.
143        // SET NX on a liveness key with 2× lease TTL; if the key already
144        // exists another process is live. The key auto-expires if this
145        // process crashes without renewal, so a restart after a hard crash
146        // just waits at most 2× lease_ttl_ms for the ghost entry to clear.
147        //
148        // Known limitations of this minimal scheme (documented for operators):
149        //   1. **Startup-only, not runtime.** There is no heartbeat renewal
150        //      path. After `2 × lease_ttl_ms` elapses the alive key expires
151        //      naturally even while this worker is still running, and a
152        //      second process with the same `worker_instance_id` launched
153        //      later will successfully SET NX alongside the first. The check
154        //      catches misconfiguration at boot; it does not fence duplicates
155        //      that appear mid-lifetime. Production deployments should rely
156        //      on the orchestrator (Kubernetes, systemd unit with
157        //      `Restart=on-failure`, etc.) as the authoritative single-
158        //      instance enforcer; this SET NX is belt-and-suspenders.
159        //
160        //   2. **Restart delay after a crash.** If a worker crashes
161        //      ungracefully (SIGKILL, container OOM) and is restarted within
162        //      `2 × lease_ttl_ms`, the alive key is still present and the
163        //      new process exits with `SdkError::Config("duplicate
164        //      worker_instance_id ...")`. Options for operators:
165        //        - Wait `2 × lease_ttl_ms` (default 60s with the 30s TTL)
166        //          before restarting.
167        //        - Manually `DEL ff:worker:<instance_id>:alive` in Valkey to
168        //          unblock the restart.
169        //        - Use a fresh `worker_instance_id` for the restart (the
170        //          orchestrator should already do this per-Pod).
171        //
172        //   3. **No graceful cleanup on shutdown.** There is no explicit
173        //      `disconnect()` call that DELs the alive key. On clean
174        //      `SIGTERM` the key lingers until its TTL expires. A follow-up
175        //      can add `FlowFabricWorker::disconnect(self)` for callers that
176        //      want to skip the restart-delay window.
177        let alive_key = format!("ff:worker:{}:alive", config.worker_instance_id);
178        let alive_ttl_ms = (config.lease_ttl_ms.saturating_mul(2)).max(1_000);
179        let set_result: Option<String> = client
180            .cmd("SET")
181            .arg(&alive_key)
182            .arg("1")
183            .arg("NX")
184            .arg("PX")
185            .arg(alive_ttl_ms.to_string().as_str())
186            .execute()
187            .await
188            .map_err(|e| SdkError::ValkeyContext {
189                source: e,
190                context: "SET NX worker alive key".into(),
191            })?;
192        if set_result.is_none() {
193            return Err(SdkError::Config(format!(
194                "duplicate worker_instance_id '{}': another process already holds {alive_key}",
195                config.worker_instance_id
196            )));
197        }
198
199        // Read partition config from Valkey (set by ff-server on startup).
200        // Falls back to defaults if key doesn't exist (e.g. SDK-only testing).
201        let partition_config = read_partition_config(&client).await
202            .unwrap_or_else(|e| {
203                tracing::warn!(
204                    error = %e,
205                    "ff:config:partitions not found, using defaults"
206                );
207                PartitionConfig::default()
208            });
209
210        let max_tasks = config.max_concurrent_tasks.max(1);
211        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
212
213        tracing::info!(
214            worker_id = %config.worker_id,
215            instance_id = %config.worker_instance_id,
216            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
217            "FlowFabricWorker connected"
218        );
219
220        #[cfg(feature = "direct-valkey-claim")]
221        let scan_cursor_init = scan_cursor_seed(
222            config.worker_instance_id.as_str(),
223            partition_config.num_flow_partitions.max(1) as usize,
224        );
225
226        // Sort + dedupe capabilities into a stable CSV. BTreeSet both sorts
227        // and deduplicates in one pass; string joining happens once here.
228        //
229        // Ingress validation mirrors Scheduler::claim_for_worker (ff-scheduler):
230        //   - `,` is the CSV delimiter; a token containing one would split
231        //     mid-parse and could let a {"gpu"} worker appear to satisfy
232        //     {"gpu,cuda"} (silent auth bypass).
233        //   - Empty strings would produce leading/adjacent commas on the
234        //     wire and inflate token count for no semantic reason.
235        //   - Non-printable / whitespace chars: `"gpu "` vs `"gpu"` or
236        //     `"gpu\n"` vs `"gpu"` produce silent mismatches that are
        //     miserable to debug. Reject tokens containing whitespace or
        //     control characters at ingress so a typo fails loudly at
        //     connect instead of silently mis-routing forever.
240        // Reject at boot so operator misconfig is loud, symmetric with the
241        // scheduler path.
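        //
        // For illustration (hypothetical input): capabilities declared as
        // ["gpu", "cuda", "gpu"] pass validation and collapse below to the
        // stable CSV "cuda,gpu".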
242        #[cfg(feature = "direct-valkey-claim")]
243        for cap in &config.capabilities {
244            if cap.is_empty() {
245                return Err(SdkError::Config(
246                    "capability token must not be empty".into(),
247                ));
248            }
249            if cap.contains(',') {
250                return Err(SdkError::Config(format!(
251                    "capability token may not contain ',' (CSV delimiter): {cap:?}"
252                )));
253            }
            // Reject control characters and whitespace (the check is
            // Unicode-aware: `is_control` / `is_whitespace`). Printable
256            // characters above 0x7F are ALLOWED so i18n caps like
257            // "东京-gpu" can be used. The CSV wire form is byte-safe for
258            // multibyte UTF-8 because `,` is always a single byte and
259            // never part of a multibyte continuation (only 0x80-0xBF are
260            // continuations, ',' is 0x2C).
261            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
262                return Err(SdkError::Config(format!(
263                    "capability token must not contain whitespace or control characters: {cap:?}"
264                )));
265            }
266        }
267        #[cfg(feature = "direct-valkey-claim")]
268        let worker_capabilities_csv: String = {
269            let set: std::collections::BTreeSet<&str> = config
270                .capabilities
271                .iter()
272                .map(|s| s.as_str())
273                .filter(|s| !s.is_empty())
274                .collect();
275            if set.len() > ff_core::policy::CAPS_MAX_TOKENS {
276                return Err(SdkError::Config(format!(
277                    "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
278                    ff_core::policy::CAPS_MAX_TOKENS,
279                    set.len()
280                )));
281            }
282            let csv = set.into_iter().collect::<Vec<_>>().join(",");
283            if csv.len() > ff_core::policy::CAPS_MAX_BYTES {
284                return Err(SdkError::Config(format!(
285                    "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
286                    ff_core::policy::CAPS_MAX_BYTES,
287                    csv.len()
288                )));
289            }
290            csv
291        };
292
293        // Short stable digest of the sorted caps CSV, computed once so
294        // per-mismatch logs carry a stable identifier instead of the 4KB
295        // CSV. Shared helper — ff-scheduler uses the same one for its
296        // own per-mismatch logs, so cross-component log lines are
297        // diffable against each other.
298        #[cfg(feature = "direct-valkey-claim")]
299        let worker_capabilities_hash = ff_core::hash::fnv1a_xor8hex(&worker_capabilities_csv);
300
301        // Full CSV logged once at connect so per-mismatch logs (which
302        // carry only the 8-hex hash) can be cross-referenced by ops.
303        #[cfg(feature = "direct-valkey-claim")]
304        if !worker_capabilities_csv.is_empty() {
305            tracing::info!(
306                worker_instance_id = %config.worker_instance_id,
307                worker_caps_hash = %worker_capabilities_hash,
308                worker_caps = %worker_capabilities_csv,
309                "worker connected with capabilities (full CSV — mismatch logs use hash only)"
310            );
311        }
312
313        // Non-authoritative advertisement of caps for operator visibility
314        // (CLI introspection, dashboards). The AUTHORITATIVE source for
315        // scheduling decisions is ARGV[9] on each claim — Lua reads ONLY
        // that, never this string. A stale or lost value here is correctness-safe.
317        //
318        // Storage: a single STRING key holding the sorted CSV. Rationale:
319        //   * **Atomic overwrite.** `SET` is a single command — a concurrent
320        //     reader can never observe a transient empty value (the prior
321        //     DEL+SADD pair had that window).
322        //   * **Crash cleanup without refresh loop.** The alive-key SET NX
323        //     is startup-only (see §1 above); there's no periodic renew
324        //     to piggy-back on, so a TTL on caps would independently
325        //     expire mid-flight and hide a live worker's caps from ops
        //     tools. Instead we set no TTL at all: each reconnect overwrites;
327        //     a crashed worker leaves a stale CSV until a new process
328        //     with the same worker_instance_id boots (which triggers
329        //     `duplicate worker_instance_id` via alive-key guard anyway —
330        //     the orchestrator allocates a new id, and operators can DEL
331        //     the stale caps key if they care).
332        //   * **Empty caps = DEL.** A restart from {gpu} to {} clears the
333        //     advertisement rather than leaving stale data.
334        // Cluster-safe advertisement: the per-worker caps STRING lives at
335        // `ff:worker:{id}:caps` (lands on whatever slot CRC16 puts it on),
336        // and the INSTANCE ID is SADD'd to the global workers-index SET
337        // `ff:idx:workers` (single slot). The unblock scanner's cluster
338        // enumeration uses SMEMBERS on the index + per-member GET on each
339        // caps key, instead of `SCAN MATCH ff:worker:*:caps` (which in
340        // cluster mode only scans the shard the SCAN lands on and misses
341        // workers whose key hashes elsewhere). Pattern mirrors Batch A
342        // `budget_policies_index` / `flow_index` / `deps_all_edges`:
343        // operations stay atomic per command, the index is the
344        // cluster-wide enumeration surface.
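        //
        // Illustrative scanner-side enumeration (it lives in the scheduler's
        // unblock scanner, not in this SDK; a sketch only, reusing the same
        // ferriskey call shapes as this file):
        //
        //     let ids: Vec<String> = client.cmd("SMEMBERS")
        //         .arg(&ff_core::keys::workers_index_key())
        //         .execute()
        //         .await?;
        //     for id in ids {
        //         let caps: Option<String> = client.cmd("GET")
        //             .arg(&ff_core::keys::worker_caps_key(&id))
        //             .execute()
        //             .await?;
        //         // compare the caps CSV against a blocked execution's required
        //         // capabilities before promoting it back to eligible
        //     }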
345        #[cfg(feature = "direct-valkey-claim")]
346        {
347            let caps_key = ff_core::keys::worker_caps_key(&config.worker_instance_id);
348            let index_key = ff_core::keys::workers_index_key();
349            let instance_id = config.worker_instance_id.to_string();
350            if worker_capabilities_csv.is_empty() {
351                // No caps advertised. DEL the per-worker caps string AND
352                // SREM from the index so the scanner doesn't GET an empty
353                // string for a worker that never declares caps.
354                let _ = client
355                    .cmd("DEL")
356                    .arg(&caps_key)
357                    .execute::<Option<i64>>()
358                    .await;
359                if let Err(e) = client
360                    .cmd("SREM")
361                    .arg(&index_key)
362                    .arg(&instance_id)
363                    .execute::<Option<i64>>()
364                    .await
365                {
366                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
367                        "SREM workers-index failed; continuing (non-authoritative)");
368                }
369            } else {
370                // Atomic overwrite of the caps STRING (one-command). Then
371                // SADD to the index (idempotent — re-running connect for
372                // the same id is a no-op at SADD level). The per-worker
373                // caps key is written BEFORE the index SADD so that when
374                // the scanner observes the id in the index, the caps key
375                // is guaranteed to resolve to a non-stale CSV (the reverse
376                // order would leak an index entry pointing at a stale or
377                // empty caps key during a narrow window).
378                if let Err(e) = client
379                    .cmd("SET")
380                    .arg(&caps_key)
381                    .arg(&worker_capabilities_csv)
382                    .execute::<Option<String>>()
383                    .await
384                {
385                    tracing::warn!(error = %e, key = %caps_key,
386                        "SET worker caps advertisement failed; continuing");
387                }
388                if let Err(e) = client
389                    .cmd("SADD")
390                    .arg(&index_key)
391                    .arg(&instance_id)
392                    .execute::<Option<i64>>()
393                    .await
394                {
395                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
396                        "SADD workers-index failed; continuing");
397                }
398            }
399        }
400
401        Ok(Self {
402            client,
403            config,
404            partition_config,
405            #[cfg(feature = "direct-valkey-claim")]
406            worker_capabilities_csv,
407            #[cfg(feature = "direct-valkey-claim")]
408            worker_capabilities_hash,
409            #[cfg(feature = "direct-valkey-claim")]
410            lane_index: AtomicUsize::new(0),
411            concurrency_semaphore,
412            #[cfg(feature = "direct-valkey-claim")]
413            scan_cursor: AtomicUsize::new(scan_cursor_init),
414        })
415    }
416
417    /// Get a reference to the underlying ferriskey client.
418    pub fn client(&self) -> &Client {
419        &self.client
420    }
421
422    /// Get the worker config.
423    pub fn config(&self) -> &WorkerConfig {
424        &self.config
425    }
426
427    /// Get the server-published partition config this worker bound to at
428    /// `connect()`. Callers need this when computing partition hash-tags
429    /// for direct-client reads (e.g. ff-sdk::read_stream) to stay aligned
430    /// with the server's `num_flow_partitions` — using
431    /// `PartitionConfig::default()` assumes 256 partitions and silently
432    /// misses data on deployments with any other value.
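    ///
    /// ```rust,ignore
    /// // Sketch: use the server-published count rather than the default.
    /// let pc = worker.partition_config();
    /// for index in 0..pc.num_flow_partitions {
    ///     // derive the partition hash-tag / stream keys for `index` here...
    /// }
    /// ```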
433    pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
434        &self.partition_config
435    }
436
437    /// Attempt to claim the next eligible execution.
438    ///
439    /// Phase 1 simplified claim flow:
440    /// 1. Pick a lane (round-robin across configured lanes)
441    /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
442    /// 3. Claim the execution via `ff_claim_execution`
443    /// 4. Read execution payload + tags
444    /// 5. Return a [`ClaimedTask`] with auto lease renewal
445    ///
446    /// Gated behind the `direct-valkey-claim` feature — bypasses the
447    /// scheduler's budget / quota / capability admission checks. Enable
448    /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
449    /// the scheduler hop would be measurement noise (benches) or when
450    /// the test harness needs a deterministic worker-local path. Prefer
451    /// the scheduler-routed HTTP claim path in production.
452    ///
453    /// # `None` semantics
454    ///
455    /// `Ok(None)` means **no work was found in the partition window this
456    /// poll covered**, not "the cluster is idle". Each call scans a chunk
457    /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
458    /// `scan_cursor`; the cursor advances by that chunk size on every
    /// invocation, so a worker covers every partition at least once every
460    /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
461    ///
462    /// Callers should treat `None` as "poll again soon" (typically after
463    /// `config.claim_poll_interval_ms`) rather than "sleep for a long
464    /// time". Backing off too aggressively on `None` can starve workers
465    /// when work lives on partitions outside the current window.
466    ///
467    /// Returns `Err` on Valkey errors or script failures.
468    #[cfg(feature = "direct-valkey-claim")]
469    pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
470        // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
471        // try_acquire returns immediately — if no permits available, the worker
472        // is at capacity and should not claim more work.
473        let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
474            Ok(p) => p,
475            Err(_) => return Ok(None), // At capacity — no claim attempted
476        };
477
478        let lane_id = self.next_lane();
479        let now = TimestampMs::now();
480
481        // Phase 1: We scan eligible executions directly by reading the eligible
482        // ZSET across execution partitions. In production the scheduler
483        // (ff-scheduler) would handle this. For Phase 1, the SDK does a
484        // simplified inline claim.
485        //
486        // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
487        // partitions starting at a rolling offset. This keeps idle Valkey
488        // load at O(chunk) per worker-second instead of O(num_partitions),
489        // and the worker-instance-seeded initial cursor spreads concurrent
490        // workers across different partition windows.
491        let num_partitions = self.partition_config.num_flow_partitions as usize;
492        if num_partitions == 0 {
493            return Ok(None);
494        }
495        let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
496        let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;
497
498        for step in 0..chunk {
499            let partition_idx = ((start + step) % num_partitions) as u16;
500            let partition = ff_core::partition::Partition {
501                family: ff_core::partition::PartitionFamily::Execution,
502                index: partition_idx,
503            };
504            let idx = IndexKeys::new(&partition);
505            let eligible_key = idx.lane_eligible(&lane_id);
506
507            // ZRANGEBYSCORE to get the highest-priority eligible execution.
508            // Score format: -(priority * 1_000_000_000_000) + created_at_ms
509            // ZRANGEBYSCORE with "-inf" "+inf" LIMIT 0 1 gives lowest score = highest priority.
510            let result: Value = self
511                .client
512                .cmd("ZRANGEBYSCORE")
513                .arg(&eligible_key)
514                .arg("-inf")
515                .arg("+inf")
516                .arg("LIMIT")
517                .arg("0")
518                .arg("1")
519                .execute()
520                .await
521                .map_err(|e| SdkError::ValkeyContext { source: e, context: "ZRANGEBYSCORE failed".into() })?;
522
523            let execution_id_str = match extract_first_array_string(&result) {
524                Some(s) => s,
525                None => continue, // No eligible executions on this partition
526            };
527
528            let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
529                SdkError::Script(ff_script::error::ScriptError::Parse(format!(
530                    "bad execution_id in eligible set: {e}"
531                )))
532            })?;
533
534            // Step 1: Issue claim grant
535            let grant_result = self
536                .issue_claim_grant(&execution_id, &lane_id, &partition, &idx)
537                .await;
538
539            match grant_result {
540                Ok(()) => {}
541                Err(SdkError::Script(ff_script::error::ScriptError::CapabilityMismatch(
542                    ref missing,
543                ))) => {
544                    // Block-on-mismatch (RFC-009 §7.5) — parity with
545                    // ff-scheduler's Scheduler::claim_for_worker. Without
546                    // this, the inline-direct-claim path would hot-loop
547                    // on an unclaimable top-of-zset (every tick picks the
548                    // same execution, wastes an FCALL, logs, releases,
549                    // repeats). The scheduler-side unblock scanner
550                    // promotes blocked_route executions back to eligible
551                    // when a worker with matching caps registers.
552                    tracing::info!(
553                        execution_id = %execution_id,
554                        worker_id = %self.config.worker_id,
555                        worker_caps_hash = %self.worker_capabilities_hash,
556                        missing = %missing,
557                        "capability mismatch, blocking execution off eligible (SDK inline claim)"
558                    );
559                    self.block_route(&execution_id, &lane_id, &partition, &idx).await;
560                    continue;
561                }
562                Err(SdkError::Script(ref e)) if is_retryable_claim_error(e) => {
563                    tracing::debug!(
564                        execution_id = %execution_id,
565                        error = %e,
566                        "claim grant failed (retryable), trying next partition"
567                    );
568                    continue;
569                }
570                Err(e) => return Err(e),
571            }
572
573            // Step 2: Claim the execution
574            match self
575                .claim_execution(&execution_id, &lane_id, &partition, now)
576                .await
577            {
578                Ok(mut task) => {
579                    // Transfer concurrency permit to the task. When the task is
580                    // completed/failed/cancelled/dropped the permit returns to
581                    // the semaphore, allowing another claim.
582                    task.set_concurrency_permit(permit);
583                    return Ok(Some(task));
584                }
585                Err(SdkError::Script(ff_script::error::ScriptError::UseClaimResumedExecution)) => {
586                    // Execution was resumed from suspension — attempt_interrupted.
587                    // ff_claim_execution rejects this; use ff_claim_resumed_execution
588                    // which reuses the existing attempt instead of creating a new one.
589                    tracing::debug!(
590                        execution_id = %execution_id,
591                        "execution is resumed, using claim_resumed path"
592                    );
593                    match self
594                        .claim_resumed_execution(&execution_id, &lane_id, &partition)
595                        .await
596                    {
597                        Ok(mut task) => {
598                            task.set_concurrency_permit(permit);
599                            return Ok(Some(task));
600                        }
601                        Err(SdkError::Script(ref e2)) if is_retryable_claim_error(e2) => {
602                            tracing::debug!(
603                                execution_id = %execution_id,
604                                error = %e2,
605                                "claim_resumed failed (retryable), trying next partition"
606                            );
607                            continue;
608                        }
609                        Err(e2) => return Err(e2),
610                    }
611                }
612                Err(SdkError::Script(ref e)) if is_retryable_claim_error(e) => {
613                    tracing::debug!(
614                        execution_id = %execution_id,
615                        error = %e,
616                        "claim execution failed (retryable), trying next partition"
617                    );
618                    continue;
619                }
620                Err(e) => return Err(e),
621            }
622        }
623
624        // No eligible work found on any partition
625        Ok(None)
626    }
627
628    #[cfg(feature = "direct-valkey-claim")]
629    async fn issue_claim_grant(
630        &self,
631        execution_id: &ExecutionId,
632        lane_id: &LaneId,
633        partition: &ff_core::partition::Partition,
634        idx: &IndexKeys,
635    ) -> Result<(), SdkError> {
636        let ctx = ExecKeyContext::new(partition, execution_id);
637
638        // KEYS (3): exec_core, claim_grant_key, eligible_zset
639        let keys: Vec<String> = vec![
640            ctx.core(),
641            ctx.claim_grant(),
642            idx.lane_eligible(lane_id),
643        ];
644
645        // ARGV (9): eid, worker_id, worker_instance_id, lane_id,
646        //           capability_hash, grant_ttl_ms, route_snapshot_json,
647        //           admission_summary, worker_capabilities_csv (sorted)
648        let args: Vec<String> = vec![
649            execution_id.to_string(),
650            self.config.worker_id.to_string(),
651            self.config.worker_instance_id.to_string(),
652            lane_id.to_string(),
653            String::new(), // capability_hash
654            "5000".to_owned(), // grant_ttl_ms (5 seconds)
655            String::new(), // route_snapshot_json
656            String::new(), // admission_summary
657            self.worker_capabilities_csv.clone(), // sorted CSV
658        ];
659
660        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
661        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
662
663        let raw: Value = self
664            .client
665            .fcall("ff_issue_claim_grant", &key_refs, &arg_refs)
666            .await
667            .map_err(SdkError::Valkey)?;
668
669        crate::task::parse_success_result(&raw, "ff_issue_claim_grant")
670    }
671
672    /// Move an execution from the lane's eligible ZSET into its
673    /// blocked_route ZSET via `ff_block_execution_for_admission`. Called
674    /// after a `CapabilityMismatch` reject — without this, the inline
675    /// direct-claim path would re-pick the same top-of-zset every tick
676    /// (same pattern the scheduler's block_candidate handles). The
677    /// engine's unblock scanner periodically promotes blocked_route
678    /// back to eligible once a worker with matching caps registers.
679    ///
680    /// Best-effort: transport or logical rejects (e.g. the execution
681    /// already went terminal between pick and block) are logged and the
682    /// outer loop simply `continue`s to the next partition. Parity with
683    /// ff-scheduler::Scheduler::block_candidate.
684    #[cfg(feature = "direct-valkey-claim")]
685    async fn block_route(
686        &self,
687        execution_id: &ExecutionId,
688        lane_id: &LaneId,
689        partition: &ff_core::partition::Partition,
690        idx: &IndexKeys,
691    ) {
692        let ctx = ExecKeyContext::new(partition, execution_id);
693        let core_key = ctx.core();
694        let eligible_key = idx.lane_eligible(lane_id);
695        let blocked_key = idx.lane_blocked_route(lane_id);
696        let eid_s = execution_id.to_string();
697        let now_ms = TimestampMs::now().0.to_string();
698
699        let keys: [&str; 3] = [&core_key, &eligible_key, &blocked_key];
700        let argv: [&str; 4] = [
701            &eid_s,
702            "waiting_for_capable_worker",
703            "no connected worker satisfies required_capabilities",
704            &now_ms,
705        ];
706
707        match self
708            .client
709            .fcall::<Value>("ff_block_execution_for_admission", &keys, &argv)
710            .await
711        {
712            Ok(v) => {
713                // Parse Lua result so a logical reject (e.g. execution
714                // went terminal mid-flight) is visible — same fix we
715                // applied to ff-scheduler's block_candidate.
716                if let Err(e) = crate::task::parse_success_result(&v, "ff_block_execution_for_admission") {
717                    tracing::warn!(
718                        execution_id = %execution_id,
719                        error = %e,
720                        "SDK block_route: Lua rejected; eligible ZSET unchanged, next poll \
721                         will re-evaluate"
722                    );
723                }
724            }
725            Err(e) => {
726                tracing::warn!(
727                    execution_id = %execution_id,
728                    error = %e,
729                    "SDK block_route: transport failure; eligible ZSET unchanged"
730                );
731            }
732        }
733    }
734
735    /// Low-level claim of a granted execution. Invokes
736    /// `ff_claim_execution` and returns a `ClaimedTask` with auto
737    /// lease renewal.
738    ///
739    /// Previously gated behind `direct-valkey-claim`; ungated so
740    /// the public [`claim_from_grant`] entry point can reuse the
741    /// same FCALL plumbing. The method stays private — external
742    /// callers use `claim_from_grant`.
743    ///
744    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
745    async fn claim_execution(
746        &self,
747        execution_id: &ExecutionId,
748        lane_id: &LaneId,
749        partition: &ff_core::partition::Partition,
750        _now: TimestampMs,
751    ) -> Result<ClaimedTask, SdkError> {
752        let ctx = ExecKeyContext::new(partition, execution_id);
753        let idx = IndexKeys::new(partition);
754
755        // Pre-read total_attempt_count from exec_core to derive next attempt index.
756        // The Lua uses total_attempt_count as the new index and dynamically builds
757        // the attempt key from the hash tag, so KEYS[6-8] are placeholders, but
758        // we pass the correct index for documentation/debugging.
759        let total_str: Option<String> = self.client
760            .cmd("HGET")
761            .arg(ctx.core())
762            .arg("total_attempt_count")
763            .execute()
764            .await
765            .unwrap_or(None);
766        let next_idx = total_str
767            .as_deref()
768            .and_then(|s| s.parse::<u32>().ok())
769            .unwrap_or(0);
770        let att_idx = AttemptIndex::new(next_idx);
771
772        let lease_id = LeaseId::new().to_string();
773        let attempt_id = AttemptId::new().to_string();
774        let renew_before_ms = self.config.lease_ttl_ms * 2 / 3;
775
776        // KEYS (14): must match lua/execution.lua ff_claim_execution positional order
777        let keys: Vec<String> = vec![
778            ctx.core(),                                    // 1  exec_core
779            ctx.claim_grant(),                             // 2  claim_grant
780            idx.lane_eligible(lane_id),                    // 3  eligible_zset
781            idx.lease_expiry(),                            // 4  lease_expiry_zset
782            idx.worker_leases(&self.config.worker_instance_id), // 5  worker_leases
783            ctx.attempt_hash(att_idx),                     // 6  attempt_hash (placeholder)
784            ctx.attempt_usage(att_idx),                    // 7  attempt_usage (placeholder)
785            ctx.attempt_policy(att_idx),                   // 8  attempt_policy (placeholder)
786            ctx.attempts(),                                // 9  attempts_zset
787            ctx.lease_current(),                           // 10 lease_current
788            ctx.lease_history(),                           // 11 lease_history
789            idx.lane_active(lane_id),                      // 12 active_index
790            idx.attempt_timeout(),                         // 13 attempt_timeout_zset
791            idx.execution_deadline(),                      // 14 execution_deadline_zset
792        ];
793
794        // ARGV (12): must match lua/execution.lua ff_claim_execution positional order
795        let args: Vec<String> = vec![
796            execution_id.to_string(),                      // 1  execution_id
797            self.config.worker_id.to_string(),             // 2  worker_id
798            self.config.worker_instance_id.to_string(),    // 3  worker_instance_id
799            lane_id.to_string(),                           // 4  lane
800            String::new(),                                 // 5  capability_hash
801            lease_id.clone(),                              // 6  lease_id
802            self.config.lease_ttl_ms.to_string(),          // 7  lease_ttl_ms
803            renew_before_ms.to_string(),                   // 8  renew_before_ms
804            attempt_id.clone(),                            // 9  attempt_id
805            "{}".to_owned(),                               // 10 attempt_policy_json
806            String::new(),                                 // 11 attempt_timeout_ms
807            String::new(),                                 // 12 execution_deadline_at
808        ];
809
810        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
811        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
812
813        let raw: Value = self
814            .client
815            .fcall("ff_claim_execution", &key_refs, &arg_refs)
816            .await
817            .map_err(SdkError::Valkey)?;
818
        // Parse claim result: {1, "OK", lease_id, lease_epoch, expires_at,
        //                      attempt_id, attempt_index, attempt_type}
821        let arr = match &raw {
822            Value::Array(arr) => arr,
823            _ => {
824                return Err(SdkError::Script(ff_script::error::ScriptError::Parse(
825                    "ff_claim_execution: expected Array".into(),
826                )));
827            }
828        };
829
830        let status_code = match arr.first() {
831            Some(Ok(Value::Int(n))) => *n,
832            _ => {
833                return Err(SdkError::Script(ff_script::error::ScriptError::Parse(
834                    "ff_claim_execution: bad status code".into(),
835                )));
836            }
837        };
838
839        if status_code != 1 {
840            let err_field_str = |idx: usize| -> String {
841                arr.get(idx)
842                    .and_then(|v| match v {
843                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
844                        Ok(Value::SimpleString(s)) => Some(s.clone()),
845                        _ => None,
846                    })
847                    .unwrap_or_default()
848            };
849            let error_code = {
850                let s = err_field_str(1);
851                if s.is_empty() { "unknown".to_owned() } else { s }
852            };
853            let detail = err_field_str(2);
854
855            return Err(SdkError::Script(
856                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
857                    .unwrap_or_else(|| {
858                        ff_script::error::ScriptError::Parse(format!(
859                            "ff_claim_execution: {error_code}"
860                        ))
861                    }),
862            ));
863        }
864
865        // Extract fields from success response
866        let field_str = |idx: usize| -> String {
867            arr.get(idx + 2) // skip status_code and "OK"
868                .and_then(|v| match v {
869                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
870                    Ok(Value::SimpleString(s)) => Some(s.clone()),
871                    Ok(Value::Int(n)) => Some(n.to_string()),
872                    _ => None,
873                })
874                .unwrap_or_default()
875        };
876
877        // Lua returns: ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
878        // Positions:       0         1       2           3            4              5
879        let lease_id = LeaseId::parse(&field_str(0))
880            .unwrap_or_else(|_| LeaseId::new());
881        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
882        // field_str(2) is expires_at — skip it (lease timing managed by renewal)
883        let attempt_id = AttemptId::parse(&field_str(3))
884            .unwrap_or_else(|_| AttemptId::new());
885        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
886
887        // Read execution payload and metadata
888        let (input_payload, execution_kind, tags) = self
889            .read_execution_context(execution_id, partition)
890            .await?;
891
892        Ok(ClaimedTask::new(
893            self.client.clone(),
894            self.partition_config,
895            execution_id.clone(),
896            attempt_index,
897            attempt_id,
898            lease_id,
899            lease_epoch,
900            self.config.lease_ttl_ms,
901            lane_id.clone(),
902            self.config.worker_instance_id.clone(),
903            input_payload,
904            execution_kind,
905            tags,
906        ))
907    }
908
909    /// Consume a [`ClaimGrant`] and claim the granted execution on
910    /// this worker. The intended production entry point: pair with
911    /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
912    /// scheduler-issued grants into the SDK without enabling the
913    /// `direct-valkey-claim` feature (which bypasses budget/quota
914    /// admission control).
915    ///
916    /// The worker's concurrency semaphore is checked BEFORE the FCALL
917    /// so a saturated worker does not consume the grant: the grant
918    /// stays valid for its remaining TTL and the caller can either
919    /// release it back to the scheduler or retry after some other
920    /// in-flight task completes.
921    ///
922    /// On success the returned [`ClaimedTask`] holds a concurrency
923    /// permit that releases automatically on
924    /// `complete`/`fail`/`cancel`/drop — same contract as
925    /// `claim_next`.
926    ///
927    /// # Arguments
928    ///
929    /// * `lane` — the lane the grant was issued for. Must match what
930    ///   was passed to `Scheduler::claim_for_worker`; the Lua FCALL
931    ///   uses it to look up `lane_eligible`, `lane_active`, and the
932    ///   `worker_leases` index slot.
933    /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
934    ///
935    /// # Errors
936    ///
937    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
938    ///   permits all held. Retryable; the grant is untouched.
939    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
940    ///   or `worker_id` mismatch (wrapped in [`SdkError::Script`]).
941    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
942    ///   (wrapped in [`SdkError::Script`]).
943    /// * `ScriptError::CapabilityMismatch` — execution's required
944    ///   capabilities not a subset of this worker's caps (wrapped in
945    ///   [`SdkError::Script`]). Surfaced post-grant if a race
946    ///   between grant issuance and caps change allows it.
947    /// * `ScriptError::Parse` — `ff_claim_execution` returned an
948    ///   unexpected shape (wrapped in [`SdkError::Script`]).
949    /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
950    ///   transport error during the FCALL or the
951    ///   `read_execution_context` follow-up.
952    ///
953    /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
954    /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
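    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the scheduler runs in-process and that its
    /// `claim_for_worker(...)` call returns an `Option<ClaimGrant>` (the
    /// exact signature lives in `ff-scheduler`, not in this crate):
    ///
    /// ```rust,ignore
    /// let lane = worker.config().lanes[0].clone();
    /// if let Some(grant) = scheduler.claim_for_worker(/* worker, lane, ttl */).await? {
    ///     match worker.claim_from_grant(lane, grant).await {
    ///         Ok(task) => {
    ///             // Process, then settle the attempt.
    ///             task.complete(Some(b"result".to_vec())).await?;
    ///         }
    ///         Err(SdkError::WorkerAtCapacity) => {
    ///             // The grant is untouched; retry once an in-flight task finishes.
    ///         }
    ///         Err(e) => return Err(e.into()),
    ///     }
    /// }
    /// ```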
955    pub async fn claim_from_grant(
956        &self,
957        lane: LaneId,
958        grant: ff_core::contracts::ClaimGrant,
959    ) -> Result<ClaimedTask, SdkError> {
960        // Semaphore check FIRST. If the worker is saturated we must
961        // surface the condition to the caller without touching the
962        // grant — silently returning Ok(None) (as claim_next does)
963        // would drop a grant the scheduler has already committed work
964        // to issuing, wasting the slot until its TTL elapses.
965        let permit = self
966            .concurrency_semaphore
967            .clone()
968            .try_acquire_owned()
969            .map_err(|_| SdkError::WorkerAtCapacity)?;
970
971        let now = TimestampMs::now();
972        let mut task = self
973            .claim_execution(&grant.execution_id, &lane, &grant.partition, now)
974            .await?;
975        task.set_concurrency_permit(permit);
976        Ok(task)
977    }
978
    /// Scheduler-routed claim: POST to the server's
    /// `/v1/workers/{id}/claim` endpoint, then chain to
981    /// [`Self::claim_from_grant`].
982    ///
983    /// Batch C item 2 PR-B. This is the production entry point —
984    /// budget + quota + capability admission run server-side inside
985    /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
986    /// enable the `direct-valkey-claim` feature.
987    ///
988    /// Returns `Ok(None)` when the server says no eligible execution
989    /// (HTTP 204). Callers typically back off by
990    /// `config.claim_poll_interval_ms` and try again, same cadence
991    /// as the direct-claim path's `Ok(None)`.
992    ///
993    /// The `admin` client is the established HTTP surface
994    /// (`FlowFabricAdminClient`) reused here so workers don't keep a
995    /// second reqwest client around. Build once at worker boot and
996    /// hand in by reference on every claim.
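    ///
    /// # Example
    ///
    /// A minimal polling sketch. How the `FlowFabricAdminClient` is built
    /// depends on the deployment; its construction below is elided:
    ///
    /// ```rust,ignore
    /// let admin: FlowFabricAdminClient = /* built once at worker boot */;
    /// let lane = worker.config().lanes[0].clone();
    /// loop {
    ///     match worker.claim_via_server(&admin, &lane, 5_000).await? {
    ///         Some(task) => {
    ///             // Process task...
    ///             task.complete(Some(b"result".to_vec())).await?;
    ///         }
    ///         None => {
    ///             // HTTP 204: no eligible work. Back off by the poll interval.
    ///             tokio::time::sleep(Duration::from_millis(
    ///                 worker.config().claim_poll_interval_ms,
    ///             ))
    ///             .await;
    ///         }
    ///     }
    /// }
    /// ```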
997    pub async fn claim_via_server(
998        &self,
999        admin: &crate::FlowFabricAdminClient,
1000        lane: &LaneId,
1001        grant_ttl_ms: u64,
1002    ) -> Result<Option<ClaimedTask>, SdkError> {
1003        let req = crate::admin::ClaimForWorkerRequest {
1004            worker_id: self.config.worker_id.to_string(),
1005            lane_id: lane.to_string(),
1006            worker_instance_id: self.config.worker_instance_id.to_string(),
1007            capabilities: self.config.capabilities.clone(),
1008            grant_ttl_ms,
1009        };
1010        let Some(resp) = admin.claim_for_worker(req).await? else {
1011            return Ok(None);
1012        };
1013        let grant = resp.into_grant()?;
1014        self.claim_from_grant(lane.clone(), grant).await.map(Some)
1015    }
1016
1017    /// Consume a [`ReclaimGrant`] and transition the granted
1018    /// `attempt_interrupted` execution into a `started` state on this
1019    /// worker. Symmetric partner to [`claim_from_grant`] for the
1020    /// resume path.
1021    ///
1022    /// The grant must have been issued to THIS worker (matching
1023    /// `worker_id` at grant time). A mismatch returns
1024    /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1025    /// atomically by `ff_claim_resumed_execution`; a second call with
1026    /// the same grant also returns `InvalidClaimGrant`.
1027    ///
1028    /// # Concurrency
1029    ///
1030    /// The worker's concurrency semaphore is checked BEFORE the FCALL
1031    /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1032    /// assume pre-existing capacity on this worker — a reclaim can
1033    /// land on a fresh worker instance that just came up after a
1034    /// crash/restart and is picking up a previously-interrupted
1035    /// execution. If the worker is saturated, the grant stays valid
1036    /// for its remaining TTL and the caller can release it or retry.
1037    ///
1038    /// On success the returned [`ClaimedTask`] holds a concurrency
1039    /// permit that releases automatically on
1040    /// `complete`/`fail`/`cancel`/drop.
1041    ///
1042    /// # Errors
1043    ///
1044    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1045    ///   permits all held. Retryable; the grant is untouched (no
1046    ///   FCALL was issued, so `ff_claim_resumed_execution` did not
1047    ///   atomically consume the grant key).
1048    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1049    ///   or `worker_id` mismatch.
1050    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1051    /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1052    ///   `attempt_interrupted`.
1053    /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1054    ///   not `runnable`.
1055    /// * `ScriptError::ExecutionNotFound` — core key missing.
1056    /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
1057    ///   transport.
1058    ///
1059    /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1060    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
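    ///
    /// # Example
    ///
    /// A minimal sketch; where the [`ReclaimGrant`] comes from (typically
    /// the scheduler's lease-expiry / resume path) is outside this crate:
    ///
    /// ```rust,ignore
    /// match worker.claim_from_reclaim_grant(reclaim_grant).await {
    ///     Ok(task) => {
    ///         // Resume the interrupted attempt, then settle it.
    ///         task.complete(Some(b"resumed".to_vec())).await?;
    ///     }
    ///     Err(SdkError::WorkerAtCapacity) => {
    ///         // No FCALL was issued; the grant stays valid for its TTL.
    ///     }
    ///     Err(e) => return Err(e.into()),
    /// }
    /// ```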
1061    pub async fn claim_from_reclaim_grant(
1062        &self,
1063        grant: ff_core::contracts::ReclaimGrant,
1064    ) -> Result<ClaimedTask, SdkError> {
1065        // Semaphore check FIRST — same load-bearing ordering as
1066        // `claim_from_grant`. If the worker is saturated, surface
1067        // WorkerAtCapacity without firing the FCALL; the FCALL is an
1068        // atomic consume on the grant key, so calling it past-
1069        // saturation would destroy the grant while leaving no
1070        // permit to attach to the returned `ClaimedTask`.
1071        let permit = self
1072            .concurrency_semaphore
1073            .clone()
1074            .try_acquire_owned()
1075            .map_err(|_| SdkError::WorkerAtCapacity)?;
1076
1077        // Grant carries partition + lane_id so no round-trip is needed
1078        // to resolve them before the FCALL.
1079        let mut task = self
1080            .claim_resumed_execution(
1081                &grant.execution_id,
1082                &grant.lane_id,
1083                &grant.partition,
1084            )
1085            .await?;
1086        task.set_concurrency_permit(permit);
1087        Ok(task)
1088    }
1089
1090    /// Low-level resume claim. Invokes `ff_claim_resumed_execution`
1091    /// and returns a `ClaimedTask` bound to the resumed attempt.
1092    ///
1093    /// Previously gated behind `direct-valkey-claim`; ungated so the
1094    /// public [`claim_from_reclaim_grant`] entry point can reuse it.
1095    /// The method stays private — external callers use
1096    /// `claim_from_reclaim_grant`.
1097    ///
1098    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
    async fn claim_resumed_execution(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
    ) -> Result<ClaimedTask, SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let idx = IndexKeys::new(partition);

        // Pre-read current_attempt_index for the existing attempt hash key.
        // This is load-bearing: KEYS[6] must point to the real attempt hash.
        let att_idx_str: Option<String> = self.client
            .cmd("HGET")
            .arg(ctx.core())
            .arg("current_attempt_index")
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "read attempt_index".into() })?;
        let att_idx = AttemptIndex::new(
            att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
        );

        let lease_id = LeaseId::new().to_string();

        // KEYS (11): must match lua/signal.lua ff_claim_resumed_execution
        let keys: Vec<String> = vec![
            ctx.core(),                                             // 1  exec_core
            ctx.claim_grant(),                                      // 2  claim_grant
            idx.lane_eligible(lane_id),                             // 3  eligible_zset
            idx.lease_expiry(),                                     // 4  lease_expiry_zset
            idx.worker_leases(&self.config.worker_instance_id),     // 5  worker_leases
            ctx.attempt_hash(att_idx),                              // 6  existing_attempt_hash
            ctx.lease_current(),                                    // 7  lease_current
            ctx.lease_history(),                                    // 8  lease_history
            idx.lane_active(lane_id),                               // 9  active_index
            idx.attempt_timeout(),                                  // 10 attempt_timeout_zset
            idx.execution_deadline(),                               // 11 execution_deadline_zset
        ];

        // ARGV (8): must match lua/signal.lua ff_claim_resumed_execution
        let args: Vec<String> = vec![
            execution_id.to_string(),                               // 1  execution_id
            self.config.worker_id.to_string(),                      // 2  worker_id
            self.config.worker_instance_id.to_string(),             // 3  worker_instance_id
            lane_id.to_string(),                                    // 4  lane
            String::new(),                                          // 5  capability_hash
            lease_id.clone(),                                       // 6  lease_id
            self.config.lease_ttl_ms.to_string(),                   // 7  lease_ttl_ms
            String::new(),                                          // 8  remaining_attempt_timeout_ms
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_claim_resumed_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        // Parse result — same format as ff_claim_execution:
        // {1, "OK", lease_id, lease_epoch, expires_at, attempt_id, attempt_index, attempt_type}
        let arr = match &raw {
            Value::Array(arr) => arr,
            _ => {
                return Err(SdkError::Script(ff_script::error::ScriptError::Parse(
                    "ff_claim_resumed_execution: expected Array".into(),
                )));
            }
        };

        let status_code = match arr.first() {
            Some(Ok(Value::Int(n))) => *n,
            _ => {
                return Err(SdkError::Script(ff_script::error::ScriptError::Parse(
                    "ff_claim_resumed_execution: bad status code".into(),
                )));
            }
        };

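        // Non-OK replies carry {status, error_code, detail}; the shape is
        // inferred from the field extraction below, and the concrete error
        // codes live in lua/signal.lua.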
        if status_code != 1 {
            let err_field_str = |idx: usize| -> String {
                arr.get(idx)
                    .and_then(|v| match v {
                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                        Ok(Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default()
            };
            let error_code = {
                let s = err_field_str(1);
                if s.is_empty() { "unknown".to_owned() } else { s }
            };
            let detail = err_field_str(2);

            return Err(SdkError::Script(
                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
                    .unwrap_or_else(|| {
                        ff_script::error::ScriptError::Parse(format!(
                            "ff_claim_resumed_execution: {error_code}"
                        ))
                    }),
            ));
        }

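        // Success reply: data fields start at arr[2], after the status code
        // and the "OK" marker, hence the `idx + 2` offset below.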
        let field_str = |idx: usize| -> String {
            arr.get(idx + 2)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default()
        };

        let lease_id = LeaseId::parse(&field_str(0))
            .unwrap_or_else(|_| LeaseId::new());
        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
        let attempt_id = AttemptId::parse(&field_str(3))
            .unwrap_or_else(|_| AttemptId::new());

        let (input_payload, execution_kind, tags) = self
            .read_execution_context(execution_id, partition)
            .await?;

        Ok(ClaimedTask::new(
            self.client.clone(),
            self.partition_config,
            execution_id.clone(),
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            self.config.lease_ttl_ms,
            lane_id.clone(),
            self.config.worker_instance_id.clone(),
            input_payload,
            execution_kind,
            tags,
        ))
    }

    /// Read payload, execution_kind, and tags for an execution. Previously
    /// gated behind `direct-valkey-claim`; now shared by the
    /// feature-gated inline claim path and the public
    /// `claim_from_reclaim_grant` entry point.
    async fn read_execution_context(
        &self,
        execution_id: &ExecutionId,
        partition: &ff_core::partition::Partition,
    ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);

        // Read payload
        let payload: Option<String> = self
            .client
            .get(&ctx.payload())
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "GET payload".into() })?;
        let input_payload = payload.unwrap_or_default().into_bytes();

        // Read execution_kind from core
        let kind: Option<String> = self
            .client
            .hget(&ctx.core(), "execution_kind")
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET execution_kind".into() })?;
        let execution_kind = kind.unwrap_or_default();

        // Read tags
        let tags: HashMap<String, String> = self
            .client
            .hgetall(&ctx.tags())
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGETALL tags".into() })?;

        Ok((input_payload, execution_kind, tags))
    }

    // ── Phase 3: Signal delivery ──

    /// Deliver a signal to a suspended execution's waitpoint.
    ///
    /// The engine atomically records the signal, evaluates the resume condition,
    /// and, if the condition is satisfied, transitions the execution from
    /// `suspended` to `runnable`.
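    ///
    /// # Usage sketch
    ///
    /// Illustrative only; `Signal` construction is elided here (see
    /// [`crate::task::Signal`] for the real shape), and the waitpoint token
    /// travels to Lua unredacted, as noted at the wire boundary in the body.
    ///
    /// ```rust,ignore
    /// let outcome = worker
    ///     .deliver_signal(&execution_id, &waitpoint_id, signal)
    ///     .await?;
    /// println!("signal outcome: {outcome:?}");
    /// ```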
    pub async fn deliver_signal(
        &self,
        execution_id: &ExecutionId,
        waitpoint_id: &WaitpointId,
        signal: crate::task::Signal,
    ) -> Result<crate::task::SignalOutcome, SdkError> {
        let partition = ff_core::partition::execution_partition(execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let idx = IndexKeys::new(&partition);

        let signal_id = ff_core::types::SignalId::new();
        let now = TimestampMs::now();

        // Pre-read lane_id from exec_core — the execution may be on any lane,
        // not necessarily one of this worker's configured lanes.
        let lane_str: Option<String> = self
            .client
            .hget(&ctx.core(), "lane_id")
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
        let lane_id = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));

        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
        //            exec_signals_zset, signal_hash, signal_payload,
        //            idem_key, waitpoint_hash, suspension_current,
        //            eligible_zset, suspended_zset, delayed_zset,
        //            suspension_timeout_zset, hmac_secrets
        let idem_key = if let Some(ref ik) = signal.idempotency_key {
            ctx.signal_dedup(waitpoint_id, ik)
        } else {
            ctx.noop() // must share {p:N} hash tag for cluster mode
        };
        let keys: Vec<String> = vec![
            ctx.core(),                                    // 1
            ctx.waitpoint_condition(waitpoint_id),         // 2
            ctx.waitpoint_signals(waitpoint_id),           // 3
            ctx.exec_signals(),                            // 4
            ctx.signal(&signal_id),                        // 5
            ctx.signal_payload(&signal_id),                // 6
            idem_key,                                      // 7
            ctx.waitpoint(waitpoint_id),                   // 8
            ctx.suspension_current(),                      // 9
            idx.lane_eligible(&lane_id),                   // 10
            idx.lane_suspended(&lane_id),                  // 11
            idx.lane_delayed(&lane_id),                    // 12
            idx.suspension_timeout(),                      // 13
            idx.waitpoint_hmac_secrets(),                  // 14
        ];

        let payload_str = signal
            .payload
            .as_ref()
            .map(|p| String::from_utf8_lossy(p).into_owned())
            .unwrap_or_default();

        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
        //            signal_category, source_type, source_identity,
        //            payload, payload_encoding, idempotency_key,
        //            correlation_id, target_scope, created_at,
        //            dedup_ttl_ms, resume_delay_ms, signal_maxlen,
        //            max_signals_per_execution, waitpoint_token
        let args: Vec<String> = vec![
            signal_id.to_string(),                           // 1
            execution_id.to_string(),                        // 2
            waitpoint_id.to_string(),                        // 3
            signal.signal_name,                              // 4
            signal.signal_category,                          // 5
            signal.source_type,                              // 6
            signal.source_identity,                          // 7
            payload_str,                                     // 8
            "json".to_owned(),                               // 9 payload_encoding
            signal.idempotency_key.unwrap_or_default(),      // 10
            String::new(),                                   // 11 correlation_id
            "waitpoint".to_owned(),                          // 12 target_scope
            now.to_string(),                                 // 13 created_at
            "86400000".to_owned(),                           // 14 dedup_ttl_ms
            "0".to_owned(),                                  // 15 resume_delay_ms
            "1000".to_owned(),                               // 16 signal_maxlen
            "10000".to_owned(),                              // 17 max_signals
            // WIRE BOUNDARY — raw token must reach Lua unredacted. Do NOT
            // use ToString/Display (those are redacted for log safety);
            // .as_str() is the explicit opt-in that gets the secret bytes.
            signal.waitpoint_token.as_str().to_owned(),      // 18 waitpoint_token
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_deliver_signal", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        crate::task::parse_signal_result(&raw)
    }

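    /// Round-robin lane selection for the direct claim loop. Wraps via modulo
    /// on `config.lanes.len()`, so it assumes at least one lane is configured.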
    #[cfg(feature = "direct-valkey-claim")]
    fn next_lane(&self) -> LaneId {
        let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
        self.config.lanes[idx].clone()
    }
}

#[cfg(feature = "direct-valkey-claim")]
fn is_retryable_claim_error(err: &ff_script::error::ScriptError) -> bool {
    use ff_core::error::ErrorClass;
    matches!(
        err.class(),
        ErrorClass::Retryable | ErrorClass::Informational
    )
}

/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
/// instance id with FNV-1a so distinct worker processes start their polls on
/// different partition windows. A constant zero seed would be fine for a
/// single-worker cluster; the hash spreads work in multi-worker deployments.
#[cfg(feature = "direct-valkey-claim")]
fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
    if num_partitions == 0 {
        return 0;
    }
    (ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize) % num_partitions
}

#[cfg(feature = "direct-valkey-claim")]
fn extract_first_array_string(value: &Value) -> Option<String> {
    match value {
        Value::Array(arr) if !arr.is_empty() => match &arr[0] {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        },
        _ => None,
    }
}

/// Read partition config from Valkey's `ff:config:partitions` hash.
/// Returns Err if the key doesn't exist or can't be read.
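/// Expected hash layout (field names mirror the parsing below; the values
/// shown are the fallback defaults used when a field is missing or invalid):
///
/// ```text
/// HSET ff:config:partitions \
///     num_flow_partitions 256 \
///     num_budget_partitions 32 \
///     num_quota_partitions 32
/// ```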
async fn read_partition_config(client: &Client) -> Result<PartitionConfig, SdkError> {
    let key = ff_core::keys::global_config_partitions();
    let fields: HashMap<String, String> = client
        .hgetall(&key)
        .await
        .map_err(|e| SdkError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;

    if fields.is_empty() {
        return Err(SdkError::Config(
            "ff:config:partitions not found in Valkey".into(),
        ));
    }

    let parse = |field: &str, default: u16| -> u16 {
        fields
            .get(field)
            .and_then(|v| v.parse().ok())
            .filter(|&n: &u16| n > 0)
            .unwrap_or(default)
    };

    Ok(PartitionConfig {
        num_flow_partitions: parse("num_flow_partitions", 256),
        num_budget_partitions: parse("num_budget_partitions", 32),
        num_quota_partitions: parse("num_quota_partitions", 32),
    })
}

#[cfg(all(test, feature = "direct-valkey-claim"))]
mod scan_cursor_tests {
    use super::scan_cursor_seed;

    #[test]
    fn stable_for_same_input() {
        assert_eq!(scan_cursor_seed("w1", 256), scan_cursor_seed("w1", 256));
    }

    #[test]
    fn distinct_for_different_ids() {
        assert_ne!(scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
    }

    #[test]
    fn bounded_by_partition_count() {
        for i in 0..100 {
            assert!(scan_cursor_seed(&format!("w{i}"), 256) < 256);
        }
    }

    #[test]
    fn zero_partitions_returns_zero() {
        assert_eq!(scan_cursor_seed("w1", 0), 0);
    }
}