ff_sdk/worker.rs

use std::collections::HashMap;
#[cfg(feature = "direct-valkey-claim")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use ferriskey::{Client, ClientBuilder, Value};
use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::PartitionConfig;
use ff_core::types::*;
use tokio::sync::Semaphore;

use crate::config::WorkerConfig;
use crate::task::ClaimedTask;
use crate::SdkError;

/// FlowFabric worker — connects to Valkey, claims executions, and provides
/// the worker-facing API.
///
/// # Admission control
///
/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
/// **bypasses the scheduler's admission controls**: it reads the eligible
/// ZSET directly and mints its own claim grant without consulting budget
/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
/// benchmarks, tests, and single-tenant development where the scheduler
/// hop is measurement noise, not for production.
///
/// For production deployments, consume scheduler-issued grants via
/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
/// budget breach, quota sliding-window, concurrency cap, and
/// capability-match checks before issuing grants.
///
/// # Usage
///
/// ```rust,ignore
/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
///
/// let config = WorkerConfig::new("localhost", 6379, "w1", "w1-i1", "default", "main");
/// let worker = FlowFabricWorker::connect(config).await?;
///
/// loop {
///     if let Some(task) = worker.claim_next().await? {
///         // Process task...
///         task.complete(Some(b"result".to_vec())).await?;
///     } else {
///         tokio::time::sleep(Duration::from_secs(1)).await;
///     }
/// }
/// ```
pub struct FlowFabricWorker {
    client: Client,
    config: WorkerConfig,
    partition_config: PartitionConfig,
    /// Sorted, deduplicated, comma-separated capabilities — computed once
    /// from `config.capabilities` at connect time. Passed as ARGV[9] to
    /// `ff_issue_claim_grant` on every claim. BTreeSet sorting is critical:
    /// Lua's `ff_issue_claim_grant` relies on a stable CSV form for
    /// reproducible logs and tests.
    #[cfg(feature = "direct-valkey-claim")]
    worker_capabilities_csv: String,
    /// 8-hex FNV-1a digest of `worker_capabilities_csv`. Used in
    /// per-mismatch logs so the 4KB CSV never echoes on every reject
    /// during an incident. Full CSV logged once at connect-time INFO for
    /// cross-reference. Mirrors `ff-scheduler::claim::worker_caps_digest`.
    #[cfg(feature = "direct-valkey-claim")]
    worker_capabilities_hash: String,
    #[cfg(feature = "direct-valkey-claim")]
    lane_index: AtomicUsize,
    /// Concurrency cap for in-flight tasks. Permits are acquired or
    /// transferred by [`claim_next`] (feature-gated),
    /// [`claim_from_grant`] (always available), and
    /// [`claim_from_reclaim_grant`], transferred to the returned
    /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
    /// Holds `max_concurrent_tasks` permits total.
    ///
    /// [`claim_next`]: FlowFabricWorker::claim_next
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
    concurrency_semaphore: Arc<Semaphore>,
    /// Rolling offset for chunked partition scans. Each poll advances the
    /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
    /// chunk)` polls every partition is covered. The initial value is
    /// derived from `worker_instance_id` so idle workers spread their
    /// scans across different partitions from the first poll onward.
    ///
    /// Overflow: on 64-bit targets `usize` is `u64` — overflow after
    /// ~2^64 polls (billions of years at any realistic rate). On 32-bit
    /// targets (wasm32, i686) `usize` is `u32` and wraps after ~4 years
    /// at 1 poll/sec — acceptable; on wrap, the modulo preserves
    /// correctness because the sequence simply restarts a new cycle.
    #[cfg(feature = "direct-valkey-claim")]
    scan_cursor: AtomicUsize,
    /// The [`EngineBackend`] the Stage-1b trait forwarders route
    /// through.
    ///
    /// **RFC-012 Stage 1b.** Always populated:
    /// [`FlowFabricWorker::connect`] now wraps the worker's own
    /// `ferriskey::Client` in a `ValkeyBackend` via
    /// `ValkeyBackend::from_client_and_partitions`, and
    /// [`FlowFabricWorker::connect_with`] replaces that default with
    /// the caller-supplied `Arc<dyn EngineBackend>`. The
    /// [`FlowFabricWorker::backend`] accessor still returns
    /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
    /// narrows the return type once consumers have migrated.
    ///
    /// Hot paths (claim, deliver_signal, admin queries) still use the
    /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
    /// migrates them through this field, and Stage 1d removes the
    /// embedded client.
    backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
}

/// Number of partitions scanned per `claim_next()` poll. Keeps idle Valkey
/// load at O(PARTITION_SCAN_CHUNK) per worker-second instead of
/// O(num_flow_partitions).
#[cfg(feature = "direct-valkey-claim")]
const PARTITION_SCAN_CHUNK: usize = 32;

impl FlowFabricWorker {
    /// Connect to Valkey and prepare the worker.
    ///
    /// Establishes the ferriskey connection. Does NOT load the FlowFabric
    /// library — that is the server's responsibility (ff-server calls
    /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
    /// the library is already loaded.
    pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
        if config.lanes.is_empty() {
            return Err(SdkError::Config {
                context: "worker_config".into(),
                field: None,
                message: "at least one lane is required".into(),
            });
        }

        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder.build()
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "failed to connect".into() })?;

        // Verify connectivity
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "PING failed".into() })?;
        if pong != "PONG" {
            return Err(SdkError::Config {
                context: "worker_connect".into(),
                field: None,
                message: format!("unexpected PING response: {pong}"),
            });
        }

        // Guard against two worker processes sharing the same
        // `worker_instance_id`. A duplicate instance would clobber each
        // other's lease_current/active_index entries and double-claim work.
        // SET NX on a liveness key with 2× lease TTL; if the key already
        // exists another process is live. The key auto-expires if this
        // process crashes without renewal, so a restart after a hard crash
        // just waits at most 2× lease_ttl_ms for the ghost entry to clear.
        //
        // Known limitations of this minimal scheme (documented for operators):
        //   1. **Startup-only, not runtime.** There is no heartbeat renewal
        //      path. After `2 × lease_ttl_ms` elapses the alive key expires
        //      naturally even while this worker is still running, and a
        //      second process with the same `worker_instance_id` launched
        //      later will successfully SET NX alongside the first. The check
        //      catches misconfiguration at boot; it does not fence duplicates
        //      that appear mid-lifetime. Production deployments should rely
        //      on the orchestrator (Kubernetes, systemd unit with
        //      `Restart=on-failure`, etc.) as the authoritative single-
        //      instance enforcer; this SET NX is belt-and-suspenders.
        //
        //   2. **Restart delay after a crash.** If a worker crashes
        //      ungracefully (SIGKILL, container OOM) and is restarted within
        //      `2 × lease_ttl_ms`, the alive key is still present and the
        //      new process exits with `SdkError::Config("duplicate
        //      worker_instance_id ...")`. Options for operators:
        //        - Wait `2 × lease_ttl_ms` (default 60s with the 30s TTL)
        //          before restarting.
        //        - Manually `DEL ff:worker:<instance_id>:alive` in Valkey to
        //          unblock the restart.
        //        - Use a fresh `worker_instance_id` for the restart (the
        //          orchestrator should already do this per-Pod).
        //
        //   3. **No graceful cleanup on shutdown.** There is no explicit
        //      `disconnect()` call that DELs the alive key. On clean
        //      `SIGTERM` the key lingers until its TTL expires. A follow-up
        //      can add `FlowFabricWorker::disconnect(self)` for callers that
        //      want to skip the restart-delay window.
        let alive_key = format!("ff:worker:{}:alive", config.worker_instance_id);
        let alive_ttl_ms = (config.lease_ttl_ms.saturating_mul(2)).max(1_000);
        let set_result: Option<String> = client
            .cmd("SET")
            .arg(&alive_key)
            .arg("1")
            .arg("NX")
            .arg("PX")
            .arg(alive_ttl_ms.to_string().as_str())
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext {
                source: e,
                context: "SET NX worker alive key".into(),
            })?;
        if set_result.is_none() {
            return Err(SdkError::Config {
                context: "worker_connect".into(),
                field: Some("worker_instance_id".into()),
                message: format!(
                    "duplicate worker_instance_id '{}': another process already holds {alive_key}",
                    config.worker_instance_id
                ),
            });
        }

        // Read partition config from Valkey (set by ff-server on startup).
        // Falls back to defaults if key doesn't exist (e.g. SDK-only testing).
        let partition_config = read_partition_config(&client).await
            .unwrap_or_else(|e| {
                tracing::warn!(
                    error = %e,
                    "ff:config:partitions not found, using defaults"
                );
                PartitionConfig::default()
            });

        let max_tasks = config.max_concurrent_tasks.max(1);
        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));

        tracing::info!(
            worker_id = %config.worker_id,
            instance_id = %config.worker_instance_id,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            "FlowFabricWorker connected"
        );

        #[cfg(feature = "direct-valkey-claim")]
        let scan_cursor_init = scan_cursor_seed(
            config.worker_instance_id.as_str(),
            partition_config.num_flow_partitions.max(1) as usize,
        );

        // Sort + dedupe capabilities into a stable CSV. BTreeSet both sorts
        // and deduplicates in one pass; string joining happens once here.
        //
        // Ingress validation mirrors Scheduler::claim_for_worker (ff-scheduler):
        //   - `,` is the CSV delimiter; a token containing one would split
        //     mid-parse and could let a {"gpu"} worker appear to satisfy
        //     {"gpu,cuda"} (silent auth bypass).
        //   - Empty strings would produce leading/adjacent commas on the
        //     wire and inflate token count for no semantic reason.
        //   - Non-printable / whitespace chars: `"gpu "` vs `"gpu"` or
        //     `"gpu\n"` vs `"gpu"` produce silent mismatches that are
        //     miserable to debug. Reject tokens containing whitespace or
        //     control characters at ingress (non-ASCII printable text is
        //     allowed; see the check below) so a typo fails loudly at
        //     connect instead of silently mis-routing forever.
        // Reject at boot so operator misconfig is loud, symmetric with the
        // scheduler path.
        #[cfg(feature = "direct-valkey-claim")]
        for cap in &config.capabilities {
            if cap.is_empty() {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: "capability token must not be empty".into(),
                });
            }
            if cap.contains(',') {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability token may not contain ',' (CSV delimiter): {cap:?}"
                    ),
                });
            }
            // Reject ASCII control bytes (0x00-0x1F, 0x7F) and any ASCII
            // whitespace (space, tab, LF, CR, FF, VT). UTF-8 printable
            // characters above 0x7F are ALLOWED so i18n caps like
            // "东京-gpu" can be used. The CSV wire form is byte-safe for
            // multibyte UTF-8 because `,` is always a single byte and
            // never part of a multibyte continuation (only 0x80-0xBF are
            // continuations, ',' is 0x2C).
            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability token must not contain whitespace or control \
                         characters: {cap:?}"
                    ),
                });
            }
        }
        #[cfg(feature = "direct-valkey-claim")]
        let worker_capabilities_csv: String = {
            let set: std::collections::BTreeSet<&str> = config
                .capabilities
                .iter()
                .map(|s| s.as_str())
                .filter(|s| !s.is_empty())
                .collect();
            if set.len() > ff_core::policy::CAPS_MAX_TOKENS {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
                        ff_core::policy::CAPS_MAX_TOKENS,
                        set.len()
                    ),
                });
            }
            let csv = set.into_iter().collect::<Vec<_>>().join(",");
            if csv.len() > ff_core::policy::CAPS_MAX_BYTES {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
                        ff_core::policy::CAPS_MAX_BYTES,
                        csv.len()
                    ),
                });
            }
            csv
        };

        // Short stable digest of the sorted caps CSV, computed once so
        // per-mismatch logs carry a stable identifier instead of the 4KB
        // CSV. Shared helper — ff-scheduler uses the same one for its
        // own per-mismatch logs, so cross-component log lines are
        // diffable against each other.
        #[cfg(feature = "direct-valkey-claim")]
        let worker_capabilities_hash = ff_core::hash::fnv1a_xor8hex(&worker_capabilities_csv);

        // Full CSV logged once at connect so per-mismatch logs (which
        // carry only the 8-hex hash) can be cross-referenced by ops.
        #[cfg(feature = "direct-valkey-claim")]
        if !worker_capabilities_csv.is_empty() {
            tracing::info!(
                worker_instance_id = %config.worker_instance_id,
                worker_caps_hash = %worker_capabilities_hash,
                worker_caps = %worker_capabilities_csv,
                "worker connected with capabilities (full CSV — mismatch logs use hash only)"
            );
        }

        // Non-authoritative advertisement of caps for operator visibility
        // (CLI introspection, dashboards). The AUTHORITATIVE source for
        // scheduling decisions is ARGV[9] on each claim — Lua reads ONLY
        // that, never this string. Lossy here is correctness-safe.
        //
        // Storage: a single STRING key holding the sorted CSV. Rationale:
        //   * **Atomic overwrite.** `SET` is a single command — a concurrent
        //     reader can never observe a transient empty value (the prior
        //     DEL+SADD pair had that window).
        //   * **Crash cleanup without refresh loop.** The alive-key SET NX
        //     is startup-only (see §1 above); there's no periodic renew
        //     to piggy-back on, so a TTL on caps would independently
        //     expire mid-flight and hide a live worker's caps from ops
        //     tools. Instead we drop the TTL: each reconnect overwrites;
        //     a crashed worker leaves a stale CSV until a new process
        //     with the same worker_instance_id boots (which triggers
        //     `duplicate worker_instance_id` via alive-key guard anyway —
        //     the orchestrator allocates a new id, and operators can DEL
        //     the stale caps key if they care).
        //   * **Empty caps = DEL.** A restart from {gpu} to {} clears the
        //     advertisement rather than leaving stale data.
        // Cluster-safe advertisement: the per-worker caps STRING lives at
        // `ff:worker:{id}:caps` (lands on whatever slot CRC16 puts it on),
        // and the INSTANCE ID is SADD'd to the global workers-index SET
        // `ff:idx:workers` (single slot). The unblock scanner's cluster
        // enumeration uses SMEMBERS on the index + per-member GET on each
        // caps key, instead of `SCAN MATCH ff:worker:*:caps` (which in
        // cluster mode only scans the shard the SCAN lands on and misses
        // workers whose key hashes elsewhere). Pattern mirrors Batch A
        // `budget_policies_index` / `flow_index` / `deps_all_edges`:
        // operations stay atomic per command, the index is the
        // cluster-wide enumeration surface.
        #[cfg(feature = "direct-valkey-claim")]
        {
            let caps_key = ff_core::keys::worker_caps_key(&config.worker_instance_id);
            let index_key = ff_core::keys::workers_index_key();
            let instance_id = config.worker_instance_id.to_string();
            if worker_capabilities_csv.is_empty() {
                // No caps advertised. DEL the per-worker caps string AND
                // SREM from the index so the scanner doesn't GET an empty
                // string for a worker that never declares caps.
                let _ = client
                    .cmd("DEL")
                    .arg(&caps_key)
                    .execute::<Option<i64>>()
                    .await;
                if let Err(e) = client
                    .cmd("SREM")
                    .arg(&index_key)
                    .arg(&instance_id)
                    .execute::<Option<i64>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
                        "SREM workers-index failed; continuing (non-authoritative)");
                }
            } else {
                // Atomic overwrite of the caps STRING (one-command). Then
                // SADD to the index (idempotent — re-running connect for
                // the same id is a no-op at SADD level). The per-worker
                // caps key is written BEFORE the index SADD so that when
                // the scanner observes the id in the index, the caps key
                // is guaranteed to resolve to a non-stale CSV (the reverse
                // order would leak an index entry pointing at a stale or
                // empty caps key during a narrow window).
                if let Err(e) = client
                    .cmd("SET")
                    .arg(&caps_key)
                    .arg(&worker_capabilities_csv)
                    .execute::<Option<String>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %caps_key,
                        "SET worker caps advertisement failed; continuing");
                }
                if let Err(e) = client
                    .cmd("SADD")
                    .arg(&index_key)
                    .arg(&instance_id)
                    .execute::<Option<i64>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
                        "SADD workers-index failed; continuing");
                }
            }
        }

        // RFC-012 Stage 1b: wrap the dialed client in a
        // ValkeyBackend so `ClaimedTask`'s trait forwarders have
        // something to call. `from_client_and_partitions` reuses the
        // already-dialed client — no second connection.
        let backend = ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
            client.clone(),
            partition_config,
        );

        Ok(Self {
            client,
            config,
            partition_config,
            #[cfg(feature = "direct-valkey-claim")]
            worker_capabilities_csv,
            #[cfg(feature = "direct-valkey-claim")]
            worker_capabilities_hash,
            #[cfg(feature = "direct-valkey-claim")]
            lane_index: AtomicUsize::new(0),
            concurrency_semaphore,
            #[cfg(feature = "direct-valkey-claim")]
            scan_cursor: AtomicUsize::new(scan_cursor_init),
            backend,
        })
    }

    /// Store a pre-built [`EngineBackend`] on the worker. Builds
    /// the worker via the legacy [`FlowFabricWorker::connect`] path
    /// first (so the embedded `ferriskey::Client` that the Stage 1b
    /// non-migrated hot paths still use is dialed), then replaces
    /// the default `ValkeyBackend` wrapper with the caller-supplied
    /// `Arc<dyn EngineBackend>`.
    ///
    /// **Stage 1b scope — what the injected backend covers today.**
    /// After this PR, `ClaimedTask`'s 8 migrated ops
    /// (`renew_lease` / `update_progress` / `resume_signals` /
    /// `delay_execution` / `move_to_waiting_children` /
    /// `complete` / `cancel` / `fail`) route through the injected
    /// backend. That's the first material use of `connect_with`: a
    /// mock backend now genuinely sees the worker's per-task
    /// write-surface calls. The 4 trait-shape-deferred ops
    /// (`create_pending_waitpoint`, `append_frame`, `suspend`,
    /// `report_usage`) still reach the embedded
    /// `ferriskey::Client` directly until issue #117's trait
    /// amendment lands; `claim_next` / `claim_from_grant` /
    /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
    /// are Stage 1c hot-path work. Stage 1d removes the embedded
    /// client entirely.
    ///
    /// Today's constructor is therefore NOT yet a drop-in way to swap
    /// in a non-Valkey backend — it requires a reachable Valkey node
    /// for the 4 deferred + hot-path ops. Tests that exercise only
    /// the 8 migrated ops can run fully against a mock backend.
    ///
    /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
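    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a test-local `MockBackend` type that
    /// implements `EngineBackend` (not provided by this crate):
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use ff_sdk::{FlowFabricWorker, WorkerConfig};
    ///
    /// let config = WorkerConfig::new("localhost", 6379, "w1", "w1-i1", "default", "main");
    /// // A Valkey node must still be reachable: connect_with dials it for
    /// // the non-migrated hot paths, then swaps in the injected backend.
    /// let backend: Arc<dyn ff_core::engine_backend::EngineBackend> =
    ///     Arc::new(MockBackend::default());
    /// let worker = FlowFabricWorker::connect_with(config, backend).await?;
    /// ```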
    pub async fn connect_with(
        config: WorkerConfig,
        backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
    ) -> Result<Self, SdkError> {
        let mut worker = Self::connect(config).await?;
        worker.backend = backend;
        Ok(worker)
    }

    /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
    /// ops through.
    ///
    /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
    /// the `Option` wrapper is retained for API stability with the
    /// Stage-1a shape. Stage 1c narrows the return type to
    /// `&Arc<dyn EngineBackend>`.
    pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
        Some(&self.backend)
    }

    /// Get a reference to the underlying ferriskey client.
    pub fn client(&self) -> &Client {
        &self.client
    }

    /// Get the worker config.
    pub fn config(&self) -> &WorkerConfig {
        &self.config
    }

    /// Get the server-published partition config this worker bound to at
    /// `connect()`. Callers need this when computing partition hash-tags
    /// for direct-client reads (e.g. ff-sdk::read_stream) to stay aligned
    /// with the server's `num_flow_partitions` — using
    /// `PartitionConfig::default()` assumes 256 partitions and silently
    /// misses data on deployments with any other value.
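    ///
    /// A small sketch of the intended use:
    ///
    /// ```rust,ignore
    /// // Size partition scans from the server-published value rather than
    /// // assuming PartitionConfig::default()'s 256 partitions.
    /// let num_flow_partitions = worker.partition_config().num_flow_partitions;
    /// ```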
    pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
        &self.partition_config
    }

    /// Attempt to claim the next eligible execution.
    ///
    /// Phase 1 simplified claim flow:
    /// 1. Pick a lane (round-robin across configured lanes)
    /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
    /// 3. Claim the execution via `ff_claim_execution`
    /// 4. Read execution payload + tags
    /// 5. Return a [`ClaimedTask`] with auto lease renewal
    ///
    /// Gated behind the `direct-valkey-claim` feature — bypasses the
    /// scheduler's budget / quota / capability admission checks. Enable
    /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
    /// the scheduler hop would be measurement noise (benches) or when
    /// the test harness needs a deterministic worker-local path. Prefer
    /// the scheduler-routed HTTP claim path in production.
    ///
    /// # `None` semantics
    ///
    /// `Ok(None)` means **no work was found in the partition window this
    /// poll covered**, not "the cluster is idle". Each call scans a chunk
    /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
    /// `scan_cursor`; the cursor advances by that chunk size on every
    /// invocation, so a worker covers every partition exactly once every
    /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
    ///
    /// Callers should treat `None` as "poll again soon" (typically after
    /// `config.claim_poll_interval_ms`) rather than "sleep for a long
    /// time". Backing off too aggressively on `None` can starve workers
    /// when work lives on partitions outside the current window.
    ///
    /// Returns `Err` on Valkey errors or script failures.
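    ///
    /// A minimal polling sketch; `claim_poll_interval_ms` comes from this
    /// worker's own config:
    ///
    /// ```rust,ignore
    /// loop {
    ///     match worker.claim_next().await? {
    ///         Some(task) => {
    ///             // Process, then finish; the concurrency permit is released
    ///             // when the task completes, fails, is cancelled, or drops.
    ///             task.complete(Some(b"result".to_vec())).await?;
    ///         }
    ///         None => {
    ///             // No work in this partition window: poll again soon rather
    ///             // than backing off for a long time.
    ///             let wait = worker.config().claim_poll_interval_ms;
    ///             tokio::time::sleep(Duration::from_millis(wait)).await;
    ///         }
    ///     }
    /// }
    /// ```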
    #[cfg(feature = "direct-valkey-claim")]
    pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
        // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
        // try_acquire returns immediately — if no permits available, the worker
        // is at capacity and should not claim more work.
        let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(_) => return Ok(None), // At capacity — no claim attempted
        };

        let lane_id = self.next_lane();
        let now = TimestampMs::now();

        // Phase 1: We scan eligible executions directly by reading the eligible
        // ZSET across execution partitions. In production the scheduler
        // (ff-scheduler) would handle this. For Phase 1, the SDK does a
        // simplified inline claim.
        //
        // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
        // partitions starting at a rolling offset. This keeps idle Valkey
        // load at O(chunk) per worker-second instead of O(num_partitions),
        // and the worker-instance-seeded initial cursor spreads concurrent
        // workers across different partition windows.
        let num_partitions = self.partition_config.num_flow_partitions as usize;
        if num_partitions == 0 {
            return Ok(None);
        }
        let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
        let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;

        for step in 0..chunk {
            let partition_idx = ((start + step) % num_partitions) as u16;
            let partition = ff_core::partition::Partition {
                family: ff_core::partition::PartitionFamily::Execution,
                index: partition_idx,
            };
            let idx = IndexKeys::new(&partition);
            let eligible_key = idx.lane_eligible(&lane_id);

            // ZRANGEBYSCORE to get the highest-priority eligible execution.
            // Score format: -(priority * 1_000_000_000_000) + created_at_ms
            // ZRANGEBYSCORE with "-inf" "+inf" LIMIT 0 1 gives lowest score = highest priority.
            let result: Value = self
                .client
                .cmd("ZRANGEBYSCORE")
                .arg(&eligible_key)
                .arg("-inf")
                .arg("+inf")
                .arg("LIMIT")
                .arg("0")
                .arg("1")
                .execute()
                .await
                .map_err(|e| SdkError::ValkeyContext { source: e, context: "ZRANGEBYSCORE failed".into() })?;

            let execution_id_str = match extract_first_array_string(&result) {
                Some(s) => s,
                None => continue, // No eligible executions on this partition
            };

            let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
                SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "claim_execution_from_eligible_set".into(),
                    execution_id: None,
                    message: format!("bad execution_id in eligible set: {e}"),
                })
            })?;

            // Step 1: Issue claim grant
            let grant_result = self
                .issue_claim_grant(&execution_id, &lane_id, &partition, &idx)
                .await;

            match grant_result {
                Ok(()) => {}
                Err(SdkError::Engine(ref boxed))
                    if matches!(
                        **boxed,
                        crate::EngineError::Validation {
                            kind: crate::ValidationKind::CapabilityMismatch,
                            ..
                        }
                    ) =>
                {
                    let missing = match &**boxed {
                        crate::EngineError::Validation { detail, .. } => detail.clone(),
                        _ => unreachable!(),
                    };
                    // Block-on-mismatch (RFC-009 §7.5) — parity with
                    // ff-scheduler's Scheduler::claim_for_worker. Without
                    // this, the inline-direct-claim path would hot-loop
                    // on an unclaimable top-of-zset (every tick picks the
                    // same execution, wastes an FCALL, logs, releases,
                    // repeats). The scheduler-side unblock scanner
                    // promotes blocked_route executions back to eligible
                    // when a worker with matching caps registers.
                    tracing::info!(
                        execution_id = %execution_id,
                        worker_id = %self.config.worker_id,
                        worker_caps_hash = %self.worker_capabilities_hash,
                        missing = %missing,
                        "capability mismatch, blocking execution off eligible (SDK inline claim)"
                    );
                    self.block_route(&execution_id, &lane_id, &partition, &idx).await;
                    continue;
                }
                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        error = %e,
                        "claim grant failed (retryable), trying next partition"
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }

            // Step 2: Claim the execution
            match self
                .claim_execution(&execution_id, &lane_id, &partition, now)
                .await
            {
                Ok(mut task) => {
                    // Transfer concurrency permit to the task. When the task is
                    // completed/failed/cancelled/dropped the permit returns to
                    // the semaphore, allowing another claim.
                    task.set_concurrency_permit(permit);
                    return Ok(Some(task));
                }
                Err(SdkError::Engine(ref boxed))
                    if matches!(
                        **boxed,
                        crate::EngineError::Contention(
                            crate::ContentionKind::UseClaimResumedExecution
                        )
                    ) =>
                {
                    // Execution was resumed from suspension — attempt_interrupted.
                    // ff_claim_execution rejects this; use ff_claim_resumed_execution
                    // which reuses the existing attempt instead of creating a new one.
                    tracing::debug!(
                        execution_id = %execution_id,
                        "execution is resumed, using claim_resumed path"
                    );
                    match self
                        .claim_resumed_execution(&execution_id, &lane_id, &partition)
                        .await
                    {
                        Ok(mut task) => {
                            task.set_concurrency_permit(permit);
                            return Ok(Some(task));
                        }
                        Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
                            tracing::debug!(
                                execution_id = %execution_id,
                                error = %e2,
                                "claim_resumed failed (retryable), trying next partition"
                            );
                            continue;
                        }
                        Err(e2) => return Err(e2),
                    }
                }
                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        error = %e,
                        "claim execution failed (retryable), trying next partition"
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        }

        // No eligible work found on any partition
        Ok(None)
    }

    #[cfg(feature = "direct-valkey-claim")]
    async fn issue_claim_grant(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        idx: &IndexKeys,
    ) -> Result<(), SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);

        // KEYS (3): exec_core, claim_grant_key, eligible_zset
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.claim_grant(),
            idx.lane_eligible(lane_id),
        ];

        // ARGV (9): eid, worker_id, worker_instance_id, lane_id,
        //           capability_hash, grant_ttl_ms, route_snapshot_json,
        //           admission_summary, worker_capabilities_csv (sorted)
        let args: Vec<String> = vec![
            execution_id.to_string(),
            self.config.worker_id.to_string(),
            self.config.worker_instance_id.to_string(),
            lane_id.to_string(),
            String::new(), // capability_hash
            "5000".to_owned(), // grant_ttl_ms (5 seconds)
            String::new(), // route_snapshot_json
            String::new(), // admission_summary
            self.worker_capabilities_csv.clone(), // sorted CSV
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_issue_claim_grant", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        crate::task::parse_success_result(&raw, "ff_issue_claim_grant")
    }

    /// Move an execution from the lane's eligible ZSET into its
    /// blocked_route ZSET via `ff_block_execution_for_admission`. Called
    /// after a `CapabilityMismatch` reject — without this, the inline
    /// direct-claim path would re-pick the same top-of-zset every tick
    /// (same pattern the scheduler's block_candidate handles). The
    /// engine's unblock scanner periodically promotes blocked_route
    /// back to eligible once a worker with matching caps registers.
    ///
    /// Best-effort: transport or logical rejects (e.g. the execution
    /// already went terminal between pick and block) are logged and the
    /// outer loop simply `continue`s to the next partition. Parity with
    /// ff-scheduler::Scheduler::block_candidate.
    #[cfg(feature = "direct-valkey-claim")]
    async fn block_route(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        idx: &IndexKeys,
    ) {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let core_key = ctx.core();
        let eligible_key = idx.lane_eligible(lane_id);
        let blocked_key = idx.lane_blocked_route(lane_id);
        let eid_s = execution_id.to_string();
        let now_ms = TimestampMs::now().0.to_string();

        let keys: [&str; 3] = [&core_key, &eligible_key, &blocked_key];
        let argv: [&str; 4] = [
            &eid_s,
            "waiting_for_capable_worker",
            "no connected worker satisfies required_capabilities",
            &now_ms,
        ];

        match self
            .client
            .fcall::<Value>("ff_block_execution_for_admission", &keys, &argv)
            .await
        {
            Ok(v) => {
                // Parse Lua result so a logical reject (e.g. execution
                // went terminal mid-flight) is visible — same fix we
                // applied to ff-scheduler's block_candidate.
                if let Err(e) = crate::task::parse_success_result(&v, "ff_block_execution_for_admission") {
                    tracing::warn!(
                        execution_id = %execution_id,
                        error = %e,
                        "SDK block_route: Lua rejected; eligible ZSET unchanged, next poll \
                         will re-evaluate"
                    );
                }
            }
            Err(e) => {
                tracing::warn!(
                    execution_id = %execution_id,
                    error = %e,
                    "SDK block_route: transport failure; eligible ZSET unchanged"
                );
            }
        }
    }

    /// Low-level claim of a granted execution. Invokes
    /// `ff_claim_execution` and returns a `ClaimedTask` with auto
    /// lease renewal.
    ///
    /// Previously gated behind `direct-valkey-claim`; ungated so
    /// the public [`claim_from_grant`] entry point can reuse the
    /// same FCALL plumbing. The method stays private — external
    /// callers use `claim_from_grant`.
    ///
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    async fn claim_execution(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        _now: TimestampMs,
    ) -> Result<ClaimedTask, SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let idx = IndexKeys::new(partition);

        // Pre-read total_attempt_count from exec_core to derive next attempt index.
        // The Lua uses total_attempt_count as the new index and dynamically builds
        // the attempt key from the hash tag, so KEYS[6-8] are placeholders, but
        // we pass the correct index for documentation/debugging.
        let total_str: Option<String> = self.client
            .cmd("HGET")
            .arg(ctx.core())
            .arg("total_attempt_count")
            .execute()
            .await
            .unwrap_or(None);
        let next_idx = total_str
            .as_deref()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(0);
        let att_idx = AttemptIndex::new(next_idx);

        let lease_id = LeaseId::new().to_string();
        let attempt_id = AttemptId::new().to_string();
        let renew_before_ms = self.config.lease_ttl_ms * 2 / 3;

        // KEYS (14): must match lua/execution.lua ff_claim_execution positional order
        let keys: Vec<String> = vec![
            ctx.core(),                                    // 1  exec_core
            ctx.claim_grant(),                             // 2  claim_grant
            idx.lane_eligible(lane_id),                    // 3  eligible_zset
            idx.lease_expiry(),                            // 4  lease_expiry_zset
            idx.worker_leases(&self.config.worker_instance_id), // 5  worker_leases
            ctx.attempt_hash(att_idx),                     // 6  attempt_hash (placeholder)
            ctx.attempt_usage(att_idx),                    // 7  attempt_usage (placeholder)
            ctx.attempt_policy(att_idx),                   // 8  attempt_policy (placeholder)
            ctx.attempts(),                                // 9  attempts_zset
            ctx.lease_current(),                           // 10 lease_current
            ctx.lease_history(),                           // 11 lease_history
            idx.lane_active(lane_id),                      // 12 active_index
            idx.attempt_timeout(),                         // 13 attempt_timeout_zset
            idx.execution_deadline(),                      // 14 execution_deadline_zset
        ];

        // ARGV (12): must match lua/execution.lua ff_claim_execution positional order
        let args: Vec<String> = vec![
            execution_id.to_string(),                      // 1  execution_id
            self.config.worker_id.to_string(),             // 2  worker_id
            self.config.worker_instance_id.to_string(),    // 3  worker_instance_id
            lane_id.to_string(),                           // 4  lane
            String::new(),                                 // 5  capability_hash
            lease_id.clone(),                              // 6  lease_id
            self.config.lease_ttl_ms.to_string(),          // 7  lease_ttl_ms
            renew_before_ms.to_string(),                   // 8  renew_before_ms
            attempt_id.clone(),                            // 9  attempt_id
            "{}".to_owned(),                               // 10 attempt_policy_json
            String::new(),                                 // 11 attempt_timeout_ms
            String::new(),                                 // 12 execution_deadline_at
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_claim_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        // Parse claim result: {1, "OK", lease_id, lease_epoch, attempt_index,
        //                      attempt_id, attempt_type, lease_expires_at}
        let arr = match &raw {
            Value::Array(arr) => arr,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "expected Array".into(),
                }));
            }
        };

        let status_code = match arr.first() {
            Some(Ok(Value::Int(n))) => *n,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "bad status code".into(),
                }));
            }
        };

        if status_code != 1 {
            let err_field_str = |idx: usize| -> String {
                arr.get(idx)
                    .and_then(|v| match v {
                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                        Ok(Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default()
            };
            let error_code = {
                let s = err_field_str(1);
                if s.is_empty() { "unknown".to_owned() } else { s }
            };
            let detail = err_field_str(2);

            return Err(SdkError::from(
                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
                        fcall: "ff_claim_execution".into(),
                        execution_id: Some(execution_id.to_string()),
                        message: format!("unknown error: {error_code}"),
                    }),
            ));
        }

        // Extract fields from success response
        let field_str = |idx: usize| -> String {
            arr.get(idx + 2) // skip status_code and "OK"
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default()
        };

        // Lua returns: ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
        // Positions:       0         1       2           3            4              5
        let lease_id = LeaseId::parse(&field_str(0))
            .unwrap_or_else(|_| LeaseId::new());
        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
        // field_str(2) is expires_at — skip it (lease timing managed by renewal)
        let attempt_id = AttemptId::parse(&field_str(3))
            .unwrap_or_else(|_| AttemptId::new());
        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));

        // Read execution payload and metadata
        let (input_payload, execution_kind, tags) = self
            .read_execution_context(execution_id, partition)
            .await?;

        Ok(ClaimedTask::new(
            self.client.clone(),
            self.backend.clone(),
            self.partition_config,
            execution_id.clone(),
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            self.config.lease_ttl_ms,
            lane_id.clone(),
            self.config.worker_instance_id.clone(),
            input_payload,
            execution_kind,
            tags,
        ))
    }

    /// Consume a [`ClaimGrant`] and claim the granted execution on
    /// this worker. The intended production entry point: pair with
    /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
    /// scheduler-issued grants into the SDK without enabling the
    /// `direct-valkey-claim` feature (which bypasses budget/quota
    /// admission control).
    ///
    /// The worker's concurrency semaphore is checked BEFORE the FCALL
    /// so a saturated worker does not consume the grant: the grant
    /// stays valid for its remaining TTL and the caller can either
    /// release it back to the scheduler or retry after some other
    /// in-flight task completes.
    ///
    /// On success the returned [`ClaimedTask`] holds a concurrency
    /// permit that releases automatically on
    /// `complete`/`fail`/`cancel`/drop — same contract as
    /// `claim_next`.
    ///
    /// # Arguments
    ///
    /// * `lane` — the lane the grant was issued for. Must match what
    ///   was passed to `Scheduler::claim_for_worker`; the Lua FCALL
    ///   uses it to look up `lane_eligible`, `lane_active`, and the
    ///   `worker_leases` index slot.
    /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
    ///
    /// # Errors
    ///
    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
    ///   permits all held. Retryable; the grant is untouched.
    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
    ///   or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
    ///   (wrapped in [`SdkError::Engine`]).
    /// * `ScriptError::CapabilityMismatch` — execution's required
    ///   capabilities not a subset of this worker's caps (wrapped in
    ///   [`SdkError::Engine`]). Surfaced post-grant if a race
    ///   between grant issuance and caps change allows it.
    /// * `ScriptError::Parse` — `ff_claim_execution` returned an
    ///   unexpected shape (wrapped in [`SdkError::Engine`]).
    /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
    ///   transport error during the FCALL or the
    ///   `read_execution_context` follow-up.
    ///
    /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
    /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
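    ///
    /// # Example
    ///
    /// A sketch of the scheduler-paired flow; `scheduler` is an
    /// illustrative `ff_scheduler::Scheduler` handle held by the caller,
    /// and the `claim_for_worker` arguments are elided:
    ///
    /// ```rust,ignore
    /// // 1. The scheduler runs admission and issues a grant for this worker.
    /// let grant = scheduler
    ///     .claim_for_worker(/* worker_id, lane, capabilities, ttl, ... */)
    ///     .await?;
    ///
    /// // 2. The worker consumes the grant and starts the execution.
    /// match worker.claim_from_grant(lane.clone(), grant).await {
    ///     Ok(task) => { /* run the task, then task.complete(..) */ }
    ///     Err(ff_sdk::SdkError::WorkerAtCapacity) => {
    ///         // Grant untouched; retry after an in-flight task finishes,
    ///         // or hand the grant back to the scheduler.
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```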
    pub async fn claim_from_grant(
        &self,
        lane: LaneId,
        grant: ff_core::contracts::ClaimGrant,
    ) -> Result<ClaimedTask, SdkError> {
        // Semaphore check FIRST. If the worker is saturated we must
        // surface the condition to the caller without touching the
        // grant — silently returning Ok(None) (as claim_next does)
        // would drop a grant the scheduler has already committed work
        // to issuing, wasting the slot until its TTL elapses.
        let permit = self
            .concurrency_semaphore
            .clone()
            .try_acquire_owned()
            .map_err(|_| SdkError::WorkerAtCapacity)?;

        let now = TimestampMs::now();
        let partition = grant.partition().map_err(|e| SdkError::Config {
            context: "claim_from_grant".to_owned(),
            field: Some("partition_key".to_owned()),
            message: e.to_string(),
        })?;
        let mut task = self
            .claim_execution(&grant.execution_id, &lane, &partition, now)
            .await?;
        task.set_concurrency_permit(permit);
        Ok(task)
    }

    /// Scheduler-routed claim: POST the server's
    /// `/v1/workers/{id}/claim`, then chain to
    /// [`Self::claim_from_grant`].
    ///
    /// Batch C item 2 PR-B. This is the production entry point —
    /// budget + quota + capability admission run server-side inside
    /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
    /// enable the `direct-valkey-claim` feature.
    ///
    /// Returns `Ok(None)` when the server says no eligible execution
    /// (HTTP 204). Callers typically back off by
    /// `config.claim_poll_interval_ms` and try again, same cadence
    /// as the direct-claim path's `Ok(None)`.
    ///
    /// The `admin` client is the established HTTP surface
    /// (`FlowFabricAdminClient`) reused here so workers don't keep a
    /// second reqwest client around. Build once at worker boot and
    /// hand in by reference on every claim.
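    ///
    /// A minimal sketch; the admin-client construction is elided and the
    /// 5_000 ms grant TTL is illustrative:
    ///
    /// ```rust,ignore
    /// let admin = /* FlowFabricAdminClient built once at worker boot */;
    /// loop {
    ///     match worker.claim_via_server(&admin, &lane, 5_000).await? {
    ///         Some(task) => {
    ///             // Run the task...
    ///             task.complete(Some(b"result".to_vec())).await?;
    ///         }
    ///         None => {
    ///             // HTTP 204: no eligible execution right now.
    ///             let wait = worker.config().claim_poll_interval_ms;
    ///             tokio::time::sleep(Duration::from_millis(wait)).await;
    ///         }
    ///     }
    /// }
    /// ```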
    pub async fn claim_via_server(
        &self,
        admin: &crate::FlowFabricAdminClient,
        lane: &LaneId,
        grant_ttl_ms: u64,
    ) -> Result<Option<ClaimedTask>, SdkError> {
        let req = crate::admin::ClaimForWorkerRequest {
            worker_id: self.config.worker_id.to_string(),
            lane_id: lane.to_string(),
            worker_instance_id: self.config.worker_instance_id.to_string(),
            capabilities: self.config.capabilities.clone(),
            grant_ttl_ms,
        };
        let Some(resp) = admin.claim_for_worker(req).await? else {
            return Ok(None);
        };
        let grant = resp.into_grant()?;
        self.claim_from_grant(lane.clone(), grant).await.map(Some)
    }
1151
1152    /// Consume a [`ReclaimGrant`] and transition the granted
1153    /// `attempt_interrupted` execution into a `started` state on this
1154    /// worker. Symmetric partner to [`claim_from_grant`] for the
1155    /// resume path.
1156    ///
1157    /// The grant must have been issued to THIS worker (matching
1158    /// `worker_id` at grant time). A mismatch returns
1159    /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1160    /// atomically by `ff_claim_resumed_execution`; a second call with
1161    /// the same grant also returns `InvalidClaimGrant`.
1162    ///
1163    /// # Concurrency
1164    ///
1165    /// The worker's concurrency semaphore is checked BEFORE the FCALL
1166    /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1167    /// assume pre-existing capacity on this worker — a reclaim can
1168    /// land on a fresh worker instance that just came up after a
1169    /// crash/restart and is picking up a previously-interrupted
1170    /// execution. If the worker is saturated, the grant stays valid
1171    /// for its remaining TTL and the caller can release it or retry.
1172    ///
1173    /// On success the returned [`ClaimedTask`] holds a concurrency
1174    /// permit that releases automatically on
1175    /// `complete`/`fail`/`cancel`/drop.
1176    ///
1177    /// # Errors
1178    ///
1179    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1180    ///   permits all held. Retryable; the grant is untouched (no
1181    ///   FCALL was issued, so `ff_claim_resumed_execution` did not
1182    ///   atomically consume the grant key).
1183    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1184    ///   or `worker_id` mismatch.
1185    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1186    /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1187    ///   `attempt_interrupted`.
1188    /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1189    ///   not `runnable`.
1190    /// * `ScriptError::ExecutionNotFound` — core key missing.
1191    /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
1192    ///   transport.
1193    ///
1194    /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1195    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
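    ///
    /// # Example
    ///
    /// A sketch of the resume path. The `grant` is assumed to be a
    /// [`ReclaimGrant`] routed to this worker by whatever component drives
    /// recovery; the retry policy on capacity is up to the caller.
    ///
    /// ```rust,ignore
    /// match worker.claim_from_reclaim_grant(grant).await {
    ///     Ok(task) => {
    ///         // Resume the interrupted attempt.
    ///         task.complete(Some(b"resumed".to_vec())).await?;
    ///     }
    ///     Err(ff_sdk::SdkError::WorkerAtCapacity) => {
    ///         // No FCALL was issued, so the grant is still valid for its
    ///         // remaining TTL; retry once a permit frees up.
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```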
1196    pub async fn claim_from_reclaim_grant(
1197        &self,
1198        grant: ff_core::contracts::ReclaimGrant,
1199    ) -> Result<ClaimedTask, SdkError> {
1200        // Semaphore check FIRST — same load-bearing ordering as
1201        // `claim_from_grant`. If the worker is saturated, surface
1202        // WorkerAtCapacity without firing the FCALL; the FCALL is an
1203        // atomic consume on the grant key, so calling it while
1204        // saturated would destroy the grant and leave no permit to
1205        // attach to the returned `ClaimedTask`.
1206        let permit = self
1207            .concurrency_semaphore
1208            .clone()
1209            .try_acquire_owned()
1210            .map_err(|_| SdkError::WorkerAtCapacity)?;
1211
1212        // Grant carries partition + lane_id so no round-trip is needed
1213        // to resolve them before the FCALL.
1214        let partition = grant.partition().map_err(|e| SdkError::Config {
1215            context: "claim_from_reclaim_grant".to_owned(),
1216            field: Some("partition_key".to_owned()),
1217            message: e.to_string(),
1218        })?;
1219        let mut task = self
1220            .claim_resumed_execution(
1221                &grant.execution_id,
1222                &grant.lane_id,
1223                &partition,
1224            )
1225            .await?;
1226        task.set_concurrency_permit(permit);
1227        Ok(task)
1228    }
1229
1230    /// Low-level resume claim. Invokes `ff_claim_resumed_execution`
1231    /// and returns a `ClaimedTask` bound to the resumed attempt.
1232    ///
1233    /// Previously gated behind `direct-valkey-claim`; ungated so the
1234    /// public [`claim_from_reclaim_grant`] entry point can reuse it.
1235    /// The method stays private — external callers use
1236    /// `claim_from_reclaim_grant`.
1237    ///
1238    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
1239    async fn claim_resumed_execution(
1240        &self,
1241        execution_id: &ExecutionId,
1242        lane_id: &LaneId,
1243        partition: &ff_core::partition::Partition,
1244    ) -> Result<ClaimedTask, SdkError> {
1245        let ctx = ExecKeyContext::new(partition, execution_id);
1246        let idx = IndexKeys::new(partition);
1247
1248        // Pre-read current_attempt_index for the existing attempt hash key.
1249        // This is load-bearing: KEYS[6] must point to the real attempt hash.
1250        let att_idx_str: Option<String> = self.client
1251            .cmd("HGET")
1252            .arg(ctx.core())
1253            .arg("current_attempt_index")
1254            .execute()
1255            .await
1256            .map_err(|e| SdkError::ValkeyContext { source: e, context: "read attempt_index".into() })?;
1257        let att_idx = AttemptIndex::new(
1258            att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
1259        );
1260
1261        let lease_id = LeaseId::new().to_string();
1262
1263        // KEYS (11): must match lua/signal.lua ff_claim_resumed_execution
1264        let keys: Vec<String> = vec![
1265            ctx.core(),                                             // 1  exec_core
1266            ctx.claim_grant(),                                      // 2  claim_grant
1267            idx.lane_eligible(lane_id),                             // 3  eligible_zset
1268            idx.lease_expiry(),                                     // 4  lease_expiry_zset
1269            idx.worker_leases(&self.config.worker_instance_id),     // 5  worker_leases
1270            ctx.attempt_hash(att_idx),                              // 6  existing_attempt_hash
1271            ctx.lease_current(),                                    // 7  lease_current
1272            ctx.lease_history(),                                    // 8  lease_history
1273            idx.lane_active(lane_id),                               // 9  active_index
1274            idx.attempt_timeout(),                                  // 10 attempt_timeout_zset
1275            idx.execution_deadline(),                               // 11 execution_deadline_zset
1276        ];
1277
1278        // ARGV (8): must match lua/signal.lua ff_claim_resumed_execution
1279        let args: Vec<String> = vec![
1280            execution_id.to_string(),                               // 1  execution_id
1281            self.config.worker_id.to_string(),                      // 2  worker_id
1282            self.config.worker_instance_id.to_string(),             // 3  worker_instance_id
1283            lane_id.to_string(),                                    // 4  lane
1284            String::new(),                                          // 5  capability_hash
1285            lease_id.clone(),                                       // 6  lease_id
1286            self.config.lease_ttl_ms.to_string(),                   // 7  lease_ttl_ms
1287            String::new(),                                          // 8  remaining_attempt_timeout_ms
1288        ];
1289
1290        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1291        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1292
1293        let raw: Value = self
1294            .client
1295            .fcall("ff_claim_resumed_execution", &key_refs, &arg_refs)
1296            .await
1297            .map_err(SdkError::Valkey)?;
1298
1299        // Parse result — same format as ff_claim_execution:
1300        // {1, "OK", lease_id, lease_epoch, expires_at, attempt_id, attempt_index, attempt_type}
1301        let arr = match &raw {
1302            Value::Array(arr) => arr,
1303            _ => {
1304                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1305                    fcall: "ff_claim_resumed_execution".into(),
1306                    execution_id: Some(execution_id.to_string()),
1307                    message: "expected Array".into(),
1308                }));
1309            }
1310        };
1311
1312        let status_code = match arr.first() {
1313            Some(Ok(Value::Int(n))) => *n,
1314            _ => {
1315                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1316                    fcall: "ff_claim_resumed_execution".into(),
1317                    execution_id: Some(execution_id.to_string()),
1318                    message: "bad status code".into(),
1319                }));
1320            }
1321        };
1322
1323        if status_code != 1 {
1324            let err_field_str = |idx: usize| -> String {
1325                arr.get(idx)
1326                    .and_then(|v| match v {
1327                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1328                        Ok(Value::SimpleString(s)) => Some(s.clone()),
1329                        _ => None,
1330                    })
1331                    .unwrap_or_default()
1332            };
1333            let error_code = {
1334                let s = err_field_str(1);
1335                if s.is_empty() { "unknown".to_owned() } else { s }
1336            };
1337            let detail = err_field_str(2);
1338
1339            return Err(SdkError::from(
1340                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
1341                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
1342                        fcall: "ff_claim_resumed_execution".into(),
1343                        execution_id: Some(execution_id.to_string()),
1344                        message: format!("unknown error: {error_code}"),
1345                    }),
1346            ));
1347        }
1348
1349        let field_str = |idx: usize| -> String {
1350            arr.get(idx + 2)
1351                .and_then(|v| match v {
1352                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1353                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1354                    Ok(Value::Int(n)) => Some(n.to_string()),
1355                    _ => None,
1356                })
1357                .unwrap_or_default()
1358        };
1359
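        // Success-path fields are parsed defensively: each falls back to a
        // default (or a freshly minted id) rather than failing the claim on
        // a malformed reply field.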
1360        let lease_id = LeaseId::parse(&field_str(0))
1361            .unwrap_or_else(|_| LeaseId::new());
1362        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
1363        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
1364        let attempt_id = AttemptId::parse(&field_str(3))
1365            .unwrap_or_else(|_| AttemptId::new());
1366
1367        let (input_payload, execution_kind, tags) = self
1368            .read_execution_context(execution_id, partition)
1369            .await?;
1370
1371        Ok(ClaimedTask::new(
1372            self.client.clone(),
1373            self.backend.clone(),
1374            self.partition_config,
1375            execution_id.clone(),
1376            attempt_index,
1377            attempt_id,
1378            lease_id,
1379            lease_epoch,
1380            self.config.lease_ttl_ms,
1381            lane_id.clone(),
1382            self.config.worker_instance_id.clone(),
1383            input_payload,
1384            execution_kind,
1385            tags,
1386        ))
1387    }
1388
1389    /// Read the input payload, `execution_kind`, and tags for an execution
1390    /// (from its payload key, exec_core hash, and tags hash). Previously
1391    /// gated behind `direct-valkey-claim`; now shared by the feature-gated
1392    /// inline claim path and the public `claim_from_reclaim_grant` entry point.
1393    async fn read_execution_context(
1394        &self,
1395        execution_id: &ExecutionId,
1396        partition: &ff_core::partition::Partition,
1397    ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
1398        let ctx = ExecKeyContext::new(partition, execution_id);
1399
1400        // Read payload
1401        let payload: Option<String> = self
1402            .client
1403            .get(&ctx.payload())
1404            .await
1405            .map_err(|e| SdkError::ValkeyContext { source: e, context: "GET payload failed".into() })?;
1406        let input_payload = payload.unwrap_or_default().into_bytes();
1407
1408        // Read execution_kind from core
1409        let kind: Option<String> = self
1410            .client
1411            .hget(&ctx.core(), "execution_kind")
1412            .await
1413            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET execution_kind failed".into() })?;
1414        let execution_kind = kind.unwrap_or_default();
1415
1416        // Read tags
1417        let tags: HashMap<String, String> = self
1418            .client
1419            .hgetall(&ctx.tags())
1420            .await
1421            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGETALL tags".into() })?;
1422
1423        Ok((input_payload, execution_kind, tags))
1424    }
1425
1426    // ── Phase 3: Signal delivery ──
1427
1428    /// Deliver a signal to a suspended execution's waitpoint.
1429    ///
1430    /// The engine atomically records the signal, evaluates the resume condition,
1431    /// and optionally transitions the execution from `suspended` to `runnable`.
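    ///
    /// # Example
    ///
    /// A minimal sketch; building the [`Signal`](crate::task::Signal) value
    /// (name, category, optional payload/idempotency key, waitpoint token)
    /// is assumed to have happened already, and the ids are placeholders.
    ///
    /// ```rust,ignore
    /// let outcome = worker
    ///     .deliver_signal(&execution_id, &waitpoint_id, signal)
    ///     .await?;
    /// // Inspect `outcome` (a `SignalOutcome`) to learn whether the resume
    /// // condition was satisfied and the execution became runnable.
    /// ```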
1432    pub async fn deliver_signal(
1433        &self,
1434        execution_id: &ExecutionId,
1435        waitpoint_id: &WaitpointId,
1436        signal: crate::task::Signal,
1437    ) -> Result<crate::task::SignalOutcome, SdkError> {
1438        let partition = ff_core::partition::execution_partition(execution_id, &self.partition_config);
1439        let ctx = ExecKeyContext::new(&partition, execution_id);
1440        let idx = IndexKeys::new(&partition);
1441
1442        let signal_id = ff_core::types::SignalId::new();
1443        let now = TimestampMs::now();
1444
1445        // Pre-read lane_id from exec_core — the execution may be on any lane,
1446        // not necessarily one of this worker's configured lanes.
1447        let lane_str: Option<String> = self
1448            .client
1449            .hget(&ctx.core(), "lane_id")
1450            .await
1451            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1452        let lane_id = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1453
1454        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
1455        //            exec_signals_zset, signal_hash, signal_payload,
1456        //            idem_key, waitpoint_hash, suspension_current,
1457        //            eligible_zset, suspended_zset, delayed_zset,
1458        //            suspension_timeout_zset, hmac_secrets
1459        let idem_key = if let Some(ref ik) = signal.idempotency_key {
1460            ctx.signal_dedup(waitpoint_id, ik)
1461        } else {
1462            ctx.noop() // must share {p:N} hash tag for cluster mode
1463        };
1464        let keys: Vec<String> = vec![
1465            ctx.core(),                                    // 1
1466            ctx.waitpoint_condition(waitpoint_id),         // 2
1467            ctx.waitpoint_signals(waitpoint_id),           // 3
1468            ctx.exec_signals(),                            // 4
1469            ctx.signal(&signal_id),                        // 5
1470            ctx.signal_payload(&signal_id),                // 6
1471            idem_key,                                      // 7
1472            ctx.waitpoint(waitpoint_id),                   // 8
1473            ctx.suspension_current(),                      // 9
1474            idx.lane_eligible(&lane_id),                   // 10
1475            idx.lane_suspended(&lane_id),                  // 11
1476            idx.lane_delayed(&lane_id),                    // 12
1477            idx.suspension_timeout(),                      // 13
1478            idx.waitpoint_hmac_secrets(),                  // 14
1479        ];
1480
1481        let payload_str = signal
1482            .payload
1483            .as_ref()
1484            .map(|p| String::from_utf8_lossy(p).into_owned())
1485            .unwrap_or_default();
1486
1487        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
1488        //            signal_category, source_type, source_identity,
1489        //            payload, payload_encoding, idempotency_key,
1490        //            correlation_id, target_scope, created_at,
1491        //            dedup_ttl_ms, resume_delay_ms, signal_maxlen,
1492        //            max_signals_per_execution, waitpoint_token
1493        let args: Vec<String> = vec![
1494            signal_id.to_string(),                           // 1
1495            execution_id.to_string(),                        // 2
1496            waitpoint_id.to_string(),                        // 3
1497            signal.signal_name,                              // 4
1498            signal.signal_category,                          // 5
1499            signal.source_type,                              // 6
1500            signal.source_identity,                          // 7
1501            payload_str,                                     // 8
1502            "json".to_owned(),                               // 9 payload_encoding
1503            signal.idempotency_key.unwrap_or_default(),      // 10
1504            String::new(),                                   // 11 correlation_id
1505            "waitpoint".to_owned(),                          // 12 target_scope
1506            now.to_string(),                                 // 13 created_at
1507            "86400000".to_owned(),                           // 14 dedup_ttl_ms (24 h)
1508            "0".to_owned(),                                  // 15 resume_delay_ms
1509            "1000".to_owned(),                               // 16 signal_maxlen
1510            "10000".to_owned(),                              // 17 max_signals
1511            // WIRE BOUNDARY — raw token must reach Lua unredacted. Do NOT
1512            // use ToString/Display (those are redacted for log safety);
1513            // .as_str() is the explicit opt-in that gets the secret bytes.
1514            signal.waitpoint_token.as_str().to_owned(),      // 18 waitpoint_token
1515        ];
1516
1517        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1518        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1519
1520        let raw: Value = self
1521            .client
1522            .fcall("ff_deliver_signal", &key_refs, &arg_refs)
1523            .await
1524            .map_err(SdkError::Valkey)?;
1525
1526        crate::task::parse_signal_result(&raw)
1527    }
1528
1529    #[cfg(feature = "direct-valkey-claim")]
1530    fn next_lane(&self) -> LaneId {
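        // Round-robin over the worker's configured lanes; relies on
        // `config.lanes` being non-empty.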
1531        let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
1532        self.config.lanes[idx].clone()
1533    }
1534}
1535
1536#[cfg(feature = "direct-valkey-claim")]
1537fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
1538    use ff_core::error::ErrorClass;
1539    matches!(
1540        ff_script::engine_error_ext::class(err),
1541        ErrorClass::Retryable | ErrorClass::Informational
1542    )
1543}
1544
1545/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
1546/// instance id with FNV-1a to place distinct worker processes on different
1547/// partition windows from their first poll. A zero seed is fine for a
1548/// single worker; the hashing spreads work across multi-worker deployments.
1549#[cfg(feature = "direct-valkey-claim")]
1550fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
1551    if num_partitions == 0 {
1552        return 0;
1553    }
1554    (ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize) % num_partitions
1555}
1556
1557#[cfg(feature = "direct-valkey-claim")]
1558fn extract_first_array_string(value: &Value) -> Option<String> {
1559    match value {
1560        Value::Array(arr) if !arr.is_empty() => match &arr[0] {
1561            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1562            Ok(Value::SimpleString(s)) => Some(s.clone()),
1563            _ => None,
1564        },
1565        _ => None,
1566    }
1567}
1568
1569/// Read partition config from Valkey's `ff:config:partitions` hash.
1570/// Returns Err if the key doesn't exist or can't be read.
1571async fn read_partition_config(client: &Client) -> Result<PartitionConfig, SdkError> {
1572    let key = ff_core::keys::global_config_partitions();
1573    let fields: HashMap<String, String> = client
1574        .hgetall(&key)
1575        .await
1576        .map_err(|e| SdkError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;
1577
1578    if fields.is_empty() {
1579        return Err(SdkError::Config {
1580            context: "read_partition_config".into(),
1581            field: None,
1582            message: "ff:config:partitions not found in Valkey".into(),
1583        });
1584    }
1585
1586    let parse = |field: &str, default: u16| -> u16 {
1587        fields
1588            .get(field)
1589            .and_then(|v| v.parse().ok())
1590            .filter(|&n: &u16| n > 0)
1591            .unwrap_or(default)
1592    };
1593
1594    Ok(PartitionConfig {
1595        num_flow_partitions: parse("num_flow_partitions", 256),
1596        num_budget_partitions: parse("num_budget_partitions", 32),
1597        num_quota_partitions: parse("num_quota_partitions", 32),
1598    })
1599}
1600
1601#[cfg(all(test, feature = "direct-valkey-claim"))]
1602mod scan_cursor_tests {
1603    use super::scan_cursor_seed;
1604
1605    #[test]
1606    fn stable_for_same_input() {
1607        assert_eq!(scan_cursor_seed("w1", 256), scan_cursor_seed("w1", 256));
1608    }
1609
1610    #[test]
1611    fn distinct_for_different_ids() {
1612        assert_ne!(scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
1613    }
1614
1615    #[test]
1616    fn bounded_by_partition_count() {
1617        for i in 0..100 {
1618            assert!(scan_cursor_seed(&format!("w{i}"), 256) < 256);
1619        }
1620    }
1621
1622    #[test]
1623    fn zero_partitions_returns_zero() {
1624        assert_eq!(scan_cursor_seed("w1", 0), 0);
1625    }
1626}