// ff_sdk/worker.rs

use std::collections::HashMap;
#[cfg(feature = "direct-valkey-claim")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use ferriskey::{Client, Value};
use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::PartitionConfig;
use ff_core::types::*;
use tokio::sync::Semaphore;

use crate::config::WorkerConfig;
use crate::task::ClaimedTask;
use crate::SdkError;

/// FlowFabric worker — connects to Valkey, claims executions, and provides
/// the worker-facing API.
///
/// # Admission control
///
/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
/// **bypasses the scheduler's admission controls**: it reads the eligible
/// ZSET directly and mints its own claim grant without consulting budget
/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
/// benchmarks, tests, and single-tenant development where the scheduler
/// hop is measurement noise, not for production.
///
/// For production deployments, consume scheduler-issued grants via
/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
/// budget breach, quota sliding-window, concurrency cap, and
/// capability-match checks before issuing grants.
///
/// # Usage
///
/// ```rust,ignore
/// use std::time::Duration;
///
/// use ff_core::backend::BackendConfig;
/// use ff_core::types::{LaneId, Namespace, WorkerId, WorkerInstanceId};
/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
///
/// let config = WorkerConfig {
///     backend: BackendConfig::valkey("localhost", 6379),
///     worker_id: WorkerId::new("w1"),
///     worker_instance_id: WorkerInstanceId::new("w1-i1"),
///     namespace: Namespace::new("default"),
///     lanes: vec![LaneId::new("main")],
///     capabilities: Vec::new(),
///     lease_ttl_ms: 30_000,
///     claim_poll_interval_ms: 1_000,
///     max_concurrent_tasks: 1,
/// };
/// let worker = FlowFabricWorker::connect(config).await?;
///
/// loop {
///     if let Some(task) = worker.claim_next().await? {
///         // Process task...
///         task.complete(Some(b"result".to_vec())).await?;
///     } else {
///         tokio::time::sleep(Duration::from_secs(1)).await;
///     }
/// }
/// ```
pub struct FlowFabricWorker {
    client: Client,
    config: WorkerConfig,
    partition_config: PartitionConfig,
    /// Sorted, deduplicated, comma-separated capabilities — computed once
    /// from `config.capabilities` at connect time. Passed as ARGV[9] to
    /// `ff_issue_claim_grant` on every claim. BTreeSet sorting is critical:
    /// Lua's `ff_issue_claim_grant` relies on a stable CSV form for
    /// reproducible logs and tests.
    #[cfg(feature = "direct-valkey-claim")]
    worker_capabilities_csv: String,
    /// 8-hex FNV-1a digest of `worker_capabilities_csv`. Used in
    /// per-mismatch logs so the 4KB CSV never echoes on every reject
    /// during an incident. The full CSV is logged once at connect time
    /// (INFO) for cross-reference. Mirrors
    /// `ff-scheduler::claim::worker_caps_digest`.
    #[cfg(feature = "direct-valkey-claim")]
    worker_capabilities_hash: String,
    #[cfg(feature = "direct-valkey-claim")]
    lane_index: AtomicUsize,
    /// Concurrency cap for in-flight tasks. Permits are acquired or
    /// transferred by [`claim_next`] (feature-gated),
    /// [`claim_from_grant`] (always available), and
    /// [`claim_from_reclaim_grant`], transferred to the returned
    /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
    /// Holds `max_concurrent_tasks` permits total.
    ///
    /// [`claim_next`]: FlowFabricWorker::claim_next
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
    concurrency_semaphore: Arc<Semaphore>,
    /// Rolling offset for chunked partition scans. Each poll advances the
    /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
    /// chunk)` polls every partition is covered. The initial value is
    /// derived from `worker_instance_id` so idle workers spread their
    /// scans across different partitions from the first poll onward.
    ///
    /// Overflow: on 64-bit targets `usize` is 64 bits — with the cursor
    /// advancing by `PARTITION_SCAN_CHUNK` per poll, wrap takes on the
    /// order of 2^59 polls (billions of years at any realistic rate).
    /// On 32-bit targets (wasm32, i686) `usize` is 32 bits and wraps
    /// after roughly 4 years at 1 poll/sec — acceptable: on wrap, the
    /// modulo preserves correctness because the sequence simply
    /// restarts a new cycle.
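    ///
    /// A minimal sketch of the per-poll window math (names mirror the
    /// loop in `claim_next`):
    ///
    /// ```rust,ignore
    /// let start = scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;
    /// // Partitions covered by this poll:
    /// let window = (0..chunk).map(|step| (start + step) % num_partitions);
    /// ```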
    #[cfg(feature = "direct-valkey-claim")]
    scan_cursor: AtomicUsize,
    /// The [`EngineBackend`] the Stage-1b trait forwarders route
    /// through.
    ///
    /// **RFC-012 Stage 1b.** Always populated:
    /// [`FlowFabricWorker::connect`] now wraps the worker's own
    /// `ferriskey::Client` in a `ValkeyBackend` via
    /// `ValkeyBackend::from_client_and_partitions`, and
    /// [`FlowFabricWorker::connect_with`] replaces that default with
    /// the caller-supplied `Arc<dyn EngineBackend>`. The
    /// [`FlowFabricWorker::backend`] accessor still returns
    /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
    /// narrows the return type once consumers have migrated.
    ///
    /// Hot paths (claim, deliver_signal, admin queries) still use the
    /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
    /// migrates them through this field, and Stage 1d removes the
    /// embedded client.
    backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
    /// Optional handle to the same underlying backend viewed as a
    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
    /// Populated by [`Self::connect`] from the bundled
    /// `ValkeyBackend` (which implements the trait); supplied by the
    /// caller on [`Self::connect_with`] as an explicit
    /// `Option<Arc<dyn CompletionBackend>>` — `None` means "this
    /// backend does not support push-based completion" (e.g. a future
    /// Postgres backend without LISTEN/NOTIFY, or a test mock). Cairn
    /// and other completion-subscription consumers reach this through
    /// [`Self::completion_backend`].
    completion_backend_handle:
        Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
}

/// Number of partitions scanned per `claim_next()` poll. Keeps idle Valkey
/// load at O(PARTITION_SCAN_CHUNK) per worker-second instead of
/// O(num_flow_partitions).
#[cfg(feature = "direct-valkey-claim")]
const PARTITION_SCAN_CHUNK: usize = 32;

impl FlowFabricWorker {
    /// Connect to Valkey and prepare the worker.
    ///
    /// Establishes the ferriskey connection. Does NOT load the FlowFabric
    /// library — that is the server's responsibility (ff-server calls
    /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
    /// the library is already loaded.
    pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
        if config.lanes.is_empty() {
            return Err(SdkError::Config {
                context: "worker_config".into(),
                field: None,
                message: "at least one lane is required".into(),
            });
        }

        // Build the ferriskey client from the nested `BackendConfig`.
        // Delegates to `ff_backend_valkey::build_client` so host/port +
        // TLS + cluster + `BackendTimeouts::request` +
        // `BackendRetry` wiring lives in exactly one place (pre-Stage
        // 1c this path had its own `ClientBuilder` chain that diverged
        // from the backend's shape; RFC-012 Stage 1c tranche 1
        // consolidates).
        let client = ff_backend_valkey::build_client(&config.backend).await?;

        // Verify connectivity
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::backend_context(e, "PING failed"))?;
        if pong != "PONG" {
            return Err(SdkError::Config {
                context: "worker_connect".into(),
                field: None,
                message: format!("unexpected PING response: {pong}"),
            });
        }

        // Guard against two worker processes sharing the same
        // `worker_instance_id`. Duplicate instances would clobber each
        // other's lease_current/active_index entries and double-claim
        // work. SET NX on a liveness key with 2× lease TTL; if the key
        // already exists, another process is live. The key auto-expires
        // if this process crashes without renewal, so a restart after a
        // hard crash waits at most 2× lease_ttl_ms for the ghost entry
        // to clear.
        //
        // Known limitations of this minimal scheme (documented for operators):
        //   1. **Startup-only, not runtime.** There is no heartbeat renewal
        //      path. After `2 × lease_ttl_ms` elapses the alive key expires
        //      naturally even while this worker is still running, and a
        //      second process with the same `worker_instance_id` launched
        //      later will successfully SET NX alongside the first. The check
        //      catches misconfiguration at boot; it does not fence duplicates
        //      that appear mid-lifetime. Production deployments should rely
        //      on the orchestrator (Kubernetes, systemd unit with
        //      `Restart=on-failure`, etc.) as the authoritative single-
        //      instance enforcer; this SET NX is belt-and-suspenders.
        //
        //   2. **Restart delay after a crash.** If a worker crashes
        //      ungracefully (SIGKILL, container OOM) and is restarted within
        //      `2 × lease_ttl_ms`, the alive key is still present and the
        //      new process exits with `SdkError::Config("duplicate
        //      worker_instance_id ...")`. Options for operators:
        //        - Wait `2 × lease_ttl_ms` (default 60s with the 30s TTL)
        //          before restarting.
        //        - Manually `DEL ff:worker:<instance_id>:alive` in Valkey to
        //          unblock the restart.
        //        - Use a fresh `worker_instance_id` for the restart (the
        //          orchestrator should already do this per-Pod).
        //
        //   3. **No graceful cleanup on shutdown.** There is no explicit
        //      `disconnect()` call that DELs the alive key. On clean
        //      `SIGTERM` the key lingers until its TTL expires. A follow-up
        //      can add `FlowFabricWorker::disconnect(self)` for callers that
        //      want to skip the restart-delay window.
        let alive_key = format!("ff:worker:{}:alive", config.worker_instance_id);
        let alive_ttl_ms = (config.lease_ttl_ms.saturating_mul(2)).max(1_000);
        let set_result: Option<String> = client
            .cmd("SET")
            .arg(&alive_key)
            .arg("1")
            .arg("NX")
            .arg("PX")
            .arg(alive_ttl_ms.to_string().as_str())
            .execute()
            .await
            .map_err(|e| crate::backend_context(e, "SET NX worker alive key"))?;
        if set_result.is_none() {
            return Err(SdkError::Config {
                context: "worker_connect".into(),
                field: Some("worker_instance_id".into()),
                message: format!(
                    "duplicate worker_instance_id '{}': another process already holds {alive_key}",
                    config.worker_instance_id
                ),
            });
        }

        // Read partition config from Valkey (set by ff-server on startup).
        // Falls back to defaults if key doesn't exist (e.g. SDK-only testing).
        let partition_config = read_partition_config(&client).await
            .unwrap_or_else(|e| {
                tracing::warn!(
                    error = %e,
                    "ff:config:partitions not found, using defaults"
                );
                PartitionConfig::default()
            });

        let max_tasks = config.max_concurrent_tasks.max(1);
        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));

        tracing::info!(
            worker_id = %config.worker_id,
            instance_id = %config.worker_instance_id,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            "FlowFabricWorker connected"
        );

        #[cfg(feature = "direct-valkey-claim")]
        let scan_cursor_init = scan_cursor_seed(
            config.worker_instance_id.as_str(),
            partition_config.num_flow_partitions.max(1) as usize,
        );

        // Sort + dedupe capabilities into a stable CSV. BTreeSet both sorts
        // and deduplicates in one pass; string joining happens once here.
        //
        // Ingress validation mirrors Scheduler::claim_for_worker (ff-scheduler):
        //   - `,` is the CSV delimiter; a token containing one would split
        //     mid-parse and could let a {"gpu"} worker appear to satisfy
        //     {"gpu,cuda"} (silent auth bypass).
        //   - Empty strings would produce leading/adjacent commas on the
        //     wire and inflate token count for no semantic reason.
        //   - Control / whitespace chars: `"gpu "` vs `"gpu"` or
        //     `"gpu\n"` vs `"gpu"` produce silent mismatches that are
        //     miserable to debug. Reject any control or whitespace
        //     character at ingress (printable multibyte UTF-8 stays
        //     allowed — see the loop below) so a typo fails loudly at
        //     connect instead of silently mis-routing forever.
        // Reject at boot so operator misconfig is loud, symmetric with the
        // scheduler path.
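        //
        // Illustrative outcomes under these rules (assumed examples, not
        // drawn from a test suite):
        //   ["gpu", "cuda-12"] -> accepted; CSV "cuda-12,gpu"
        //   ["gpu,cuda"]       -> rejected (embedded CSV delimiter)
        //   ["gpu "]           -> rejected (trailing whitespace)
        //   ["东京-gpu"]        -> accepted (printable multibyte UTF-8)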
        #[cfg(feature = "direct-valkey-claim")]
        for cap in &config.capabilities {
            if cap.is_empty() {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: "capability token must not be empty".into(),
                });
            }
            if cap.contains(',') {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability token may not contain ',' (CSV delimiter): {cap:?}"
                    ),
                });
            }
            // Reject control characters and whitespace. `char::is_control`
            // and `char::is_whitespace` are Unicode-aware, so this covers
            // ASCII controls (0x00-0x1F, 0x7F) and ASCII whitespace as
            // well as non-ASCII cases like U+00A0. Printable characters
            // above 0x7F are ALLOWED so i18n caps like "东京-gpu" can be
            // used. The CSV wire form is byte-safe for multibyte UTF-8
            // because `,` (0x2C) is a single byte and never appears
            // inside a multibyte sequence (all bytes of one are >= 0x80).
            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability token must not contain whitespace or control \
                         characters: {cap:?}"
                    ),
                });
            }
        }
        #[cfg(feature = "direct-valkey-claim")]
        let worker_capabilities_csv: String = {
            let set: std::collections::BTreeSet<&str> = config
                .capabilities
                .iter()
                .map(|s| s.as_str())
                .filter(|s| !s.is_empty())
                .collect();
            if set.len() > ff_core::policy::CAPS_MAX_TOKENS {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
                        ff_core::policy::CAPS_MAX_TOKENS,
                        set.len()
                    ),
                });
            }
            let csv = set.into_iter().collect::<Vec<_>>().join(",");
            if csv.len() > ff_core::policy::CAPS_MAX_BYTES {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
                        ff_core::policy::CAPS_MAX_BYTES,
                        csv.len()
                    ),
                });
            }
            csv
        };

        // Short stable digest of the sorted caps CSV, computed once so
        // per-mismatch logs carry a stable identifier instead of the 4KB
        // CSV. Shared helper — ff-scheduler uses the same one for its
        // own per-mismatch logs, so cross-component log lines are
        // diffable against each other.
        #[cfg(feature = "direct-valkey-claim")]
        let worker_capabilities_hash = ff_core::hash::fnv1a_xor8hex(&worker_capabilities_csv);

        // Full CSV logged once at connect so per-mismatch logs (which
        // carry only the 8-hex hash) can be cross-referenced by ops.
        #[cfg(feature = "direct-valkey-claim")]
        if !worker_capabilities_csv.is_empty() {
            tracing::info!(
                worker_instance_id = %config.worker_instance_id,
                worker_caps_hash = %worker_capabilities_hash,
                worker_caps = %worker_capabilities_csv,
                "worker connected with capabilities (full CSV — mismatch logs use hash only)"
            );
        }

        // Non-authoritative advertisement of caps for operator visibility
        // (CLI introspection, dashboards). The AUTHORITATIVE source for
        // scheduling decisions is ARGV[9] on each claim — Lua reads ONLY
        // that, never this string. Lossy here is correctness-safe.
        //
        // Storage: a single STRING key holding the sorted CSV. Rationale:
        //   * **Atomic overwrite.** `SET` is a single command — a concurrent
        //     reader can never observe a transient empty value (the prior
        //     DEL+SADD pair had that window).
        //   * **Crash cleanup without refresh loop.** The alive-key SET NX
        //     is startup-only (see §1 above); there's no periodic renew
        //     to piggy-back on, so a TTL on caps would independently
        //     expire mid-flight and hide a live worker's caps from ops
        //     tools. Instead we drop the TTL: each reconnect overwrites;
        //     a crashed worker leaves a stale CSV until a new process
        //     with the same worker_instance_id boots (which triggers
        //     `duplicate worker_instance_id` via the alive-key guard —
        //     the orchestrator allocates a new id, and operators can DEL
        //     the stale caps key if they care).
        //   * **Empty caps = DEL.** A restart from {gpu} to {} clears the
        //     advertisement rather than leaving stale data.
        // Cluster-safe advertisement: the per-worker caps STRING lives at
        // `ff:worker:{id}:caps` (lands on whatever slot CRC16 puts it on),
        // and the INSTANCE ID is SADD'd to the global workers-index SET
        // `ff:idx:workers` (single slot). The unblock scanner's cluster
        // enumeration uses SMEMBERS on the index + per-member GET on each
        // caps key, instead of `SCAN MATCH ff:worker:*:caps` (which in
        // cluster mode only scans the shard the SCAN lands on and misses
        // workers whose key hashes elsewhere). Pattern mirrors Batch A
        // `budget_policies_index` / `flow_index` / `deps_all_edges`:
        // operations stay atomic per command, the index is the
        // cluster-wide enumeration surface.
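        //
        // Scanner-side sketch of the enumeration this layout supports
        // (illustrative only — the real scanner lives engine-side):
        //
        //   SMEMBERS ff:idx:workers        -> {"w1-i1", "w2-i1", ...}
        //   GET ff:worker:w1-i1:caps       -> "cuda-12,gpu"
        //
        // This stays correct in cluster mode: the index SET is one key on
        // one slot, and only the per-worker GETs fan out across shards.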
        #[cfg(feature = "direct-valkey-claim")]
        {
            let caps_key = ff_core::keys::worker_caps_key(&config.worker_instance_id);
            let index_key = ff_core::keys::workers_index_key();
            let instance_id = config.worker_instance_id.to_string();
            if worker_capabilities_csv.is_empty() {
                // No caps advertised. DEL the per-worker caps string AND
                // SREM from the index so the scanner doesn't GET an empty
                // string for a worker that never declares caps.
                let _ = client
                    .cmd("DEL")
                    .arg(&caps_key)
                    .execute::<Option<i64>>()
                    .await;
                if let Err(e) = client
                    .cmd("SREM")
                    .arg(&index_key)
                    .arg(&instance_id)
                    .execute::<Option<i64>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
                        "SREM workers-index failed; continuing (non-authoritative)");
                }
            } else {
                // Atomic overwrite of the caps STRING (one command). Then
                // SADD to the index (idempotent — re-running connect for
                // the same id is a no-op at SADD level). The per-worker
                // caps key is written BEFORE the index SADD so that when
                // the scanner observes the id in the index, the caps key
                // is guaranteed to resolve to a non-stale CSV (the reverse
                // order would leak an index entry pointing at a stale or
                // empty caps key during a narrow window).
                if let Err(e) = client
                    .cmd("SET")
                    .arg(&caps_key)
                    .arg(&worker_capabilities_csv)
                    .execute::<Option<String>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %caps_key,
                        "SET worker caps advertisement failed; continuing");
                }
                if let Err(e) = client
                    .cmd("SADD")
                    .arg(&index_key)
                    .arg(&instance_id)
                    .execute::<Option<i64>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
                        "SADD workers-index failed; continuing");
                }
            }
        }

        // RFC-012 Stage 1b: wrap the dialed client in a
        // ValkeyBackend so `ClaimedTask`'s trait forwarders have
        // something to call. `from_client_and_partitions` reuses the
        // already-dialed client — no second connection.
        // Share the concrete `Arc<ValkeyBackend>` across the two
        // trait objects — one allocation, both accessors yield
        // identity-equivalent handles.
        let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
            ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
                client.clone(),
                partition_config,
            );
        let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
        let completion_backend_handle: Option<
            Arc<dyn ff_core::completion_backend::CompletionBackend>,
        > = Some(valkey_backend);

        Ok(Self {
            client,
            config,
            partition_config,
            #[cfg(feature = "direct-valkey-claim")]
            worker_capabilities_csv,
            #[cfg(feature = "direct-valkey-claim")]
            worker_capabilities_hash,
            #[cfg(feature = "direct-valkey-claim")]
            lane_index: AtomicUsize::new(0),
            concurrency_semaphore,
            #[cfg(feature = "direct-valkey-claim")]
            scan_cursor: AtomicUsize::new(scan_cursor_init),
            backend,
            completion_backend_handle,
        })
    }

    /// Store pre-built [`EngineBackend`] and (optional)
    /// [`CompletionBackend`] handles on the worker. Builds the worker
    /// via the legacy [`FlowFabricWorker::connect`] path first (so the
    /// embedded `ferriskey::Client` that the Stage 1b non-migrated hot
    /// paths still use is dialed), then replaces the default
    /// `ValkeyBackend` wrapper with the caller-supplied trait objects.
    ///
    /// The `completion` argument is explicit: in 0.3.3 this path
    /// accepted only `backend`, and `completion_backend()` silently
    /// returned `None`, because an `Arc<dyn EngineBackend>` cannot be
    /// upcast to `Arc<dyn CompletionBackend>` without losing
    /// trait-object identity. 0.3.4 lets the caller decide.
    ///
    /// - `Some(arc)` — caller supplies a completion backend.
    ///   [`Self::completion_backend`] returns `Some(clone)`.
    /// - `None` — this backend does not support push-based completion
    ///   (future Postgres backend without LISTEN/NOTIFY, test mocks).
    ///   [`Self::completion_backend`] returns `None`.
    ///
    /// When the underlying backend implements both traits (as
    /// `ValkeyBackend` does), pass the same `Arc` twice — the two
    /// trait-object views share one allocation:
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use ff_backend_valkey::ValkeyBackend;
    /// use ff_sdk::{FlowFabricWorker, WorkerConfig};
    ///
    /// # async fn doc(worker_config: WorkerConfig,
    /// #              backend_config: ff_backend_valkey::BackendConfig)
    /// #     -> Result<(), ff_sdk::SdkError> {
    /// // Valkey (completion supported):
    /// let valkey = Arc::new(ValkeyBackend::connect(backend_config).await?);
    /// let worker = FlowFabricWorker::connect_with(
    ///     worker_config,
    ///     valkey.clone(),
    ///     Some(valkey),
    /// ).await?;
    /// # Ok(()) }
    /// ```
    ///
    /// Backend without completion support:
    ///
    /// ```rust,ignore
    /// let worker = FlowFabricWorker::connect_with(
    ///     worker_config,
    ///     backend,
    ///     None,
    /// ).await?;
    /// ```
    ///
    /// **Stage 1b + Round-7 scope — what the injected backend covers
    /// today.** The injected backend currently covers these per-task
    /// `ClaimedTask` ops: `update_progress` / `resume_signals` /
    /// `delay_execution` / `move_to_waiting_children` / `complete` /
    /// `cancel` / `fail` / `create_pending_waitpoint` /
    /// `append_frame` / `report_usage`. A mock backend therefore sees
    /// that portion of the worker's per-task write surface. Lease
    /// renewal also routes through `backend.renew(&handle)`. Round-7
    /// (#135/#145) closed the four trait-shape gaps tracked by #117,
    /// but `suspend` still reaches the embedded `ferriskey::Client`
    /// directly via `ff_suspend_execution` — this is the deferred
    /// suspend per RFC-012 §R7.6.1, pending Stage 1d input-shape
    /// work. `claim_next` / `claim_from_grant` /
    /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
    /// are Stage 1c hot-path work. Stage 1d removes the embedded
    /// client entirely.
    ///
    /// Today's constructor is therefore NOT yet a drop-in way to swap
    /// in a non-Valkey backend — it requires a reachable Valkey node
    /// for `suspend` plus the remaining hot-path ops. Tests that
    /// exercise only the migrated per-task ops can run fully against
    /// a mock backend.
    ///
    /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
    /// [`CompletionBackend`]: ff_core::completion_backend::CompletionBackend
    pub async fn connect_with(
        config: WorkerConfig,
        backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
        completion: Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
    ) -> Result<Self, SdkError> {
        let mut worker = Self::connect(config).await?;
        worker.backend = backend;
        worker.completion_backend_handle = completion;
        Ok(worker)
    }

    /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
    /// ops through.
    ///
    /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
    /// the `Option` wrapper is retained for API stability with the
    /// Stage-1a shape. Stage 1c narrows the return type to
    /// `&Arc<dyn EngineBackend>`.
    pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
        Some(&self.backend)
    }

    /// Crate-internal direct borrow of the backend. The public
    /// [`Self::backend`] still returns `Option` for API stability
    /// (Stage 1b holdover). Snapshot trait-forwarders in
    /// [`crate::snapshot`] need an un-wrapped reference.
    pub(crate) fn backend_ref(
        &self,
    ) -> &Arc<dyn ff_core::engine_backend::EngineBackend> {
        &self.backend
    }

    /// Handle to the completion-event subscription backend, for
    /// consumers that need to observe execution completions (DAG
    /// reconcilers, tenant-isolated subscribers).
    ///
    /// Returns `Some` when the worker was built through
    /// [`Self::connect`] on the default `valkey-default` feature
    /// (the bundled `ValkeyBackend` implements
    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)),
    /// or via [`Self::connect_with`] with a `Some(..)` completion
    /// handle. Returns `None` when the caller passed `None` to
    /// [`Self::connect_with`] — i.e. the backend does not support
    /// push-based completion streams (future Postgres without
    /// LISTEN/NOTIFY, test mocks).
    ///
    /// The returned handle shares the same underlying allocation as
    /// [`Self::backend`]; calls through it (e.g.
    /// `subscribe_completions_filtered`) hit the same connection
    /// the worker itself uses.
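    ///
    /// ```rust,ignore
    /// // Sketch — the exact `subscribe_completions_filtered` signature
    /// // is assumed here; see ff_core::completion_backend for the real
    /// // trait surface.
    /// if let Some(completions) = worker.completion_backend() {
    ///     let mut stream = completions
    ///         .subscribe_completions_filtered(/* filter */)
    ///         .await?;
    ///     while let Some(event) = stream.next().await {
    ///         // react to a completed execution
    ///     }
    /// }
    /// ```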
    pub fn completion_backend(
        &self,
    ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
        self.completion_backend_handle.clone()
    }

    /// Crate-internal borrow of the underlying `ferriskey::Client`.
    ///
    /// **RFC-012 Stage 1c tranche-4 (#87):** downgraded from `pub` to
    /// `pub(crate)`. The public `FlowFabricWorker` surface no longer
    /// exposes the ferriskey type — consumers that previously reached
    /// in for `read_stream` / `tail_stream` should now use
    /// [`ClaimedTask::read_stream`](crate::ClaimedTask::read_stream) /
    /// [`ClaimedTask::tail_stream`](crate::ClaimedTask::tail_stream)
    /// (task-holders) or [`read_stream`](crate::read_stream) /
    /// [`tail_stream`](crate::tail_stream) through a `&dyn
    /// EngineBackend` (non-task callers).
    ///
    /// Retained for in-crate use by
    /// [`crate::worker::FlowFabricWorker`]'s raw HGET helpers, which
    /// are not yet migrated to the trait (separate tracking issue).
    pub(crate) fn client(&self) -> &Client {
        &self.client
    }

    /// Get the worker config.
    pub fn config(&self) -> &WorkerConfig {
        &self.config
    }

    /// Get the server-published partition config this worker bound to at
    /// `connect()`. Exposed so consumers that mint custom
    /// [`ExecutionId`]s (e.g. for `describe_execution` lookups on ids
    /// produced outside this worker) stay aligned with the server's
    /// `num_flow_partitions` — using `PartitionConfig::default()`
    /// assumes 256 partitions and silently misses data on deployments
    /// with any other value.
    pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
        &self.partition_config
    }

    /// Attempt to claim the next eligible execution.
    ///
    /// Phase 1 simplified claim flow:
    /// 1. Pick a lane (round-robin across configured lanes)
    /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
    /// 3. Claim the execution via `ff_claim_execution`
    /// 4. Read execution payload + tags
    /// 5. Return a [`ClaimedTask`] with auto lease renewal
    ///
    /// Gated behind the `direct-valkey-claim` feature — bypasses the
    /// scheduler's budget / quota / capability admission checks. Enable
    /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
    /// the scheduler hop would be measurement noise (benches) or when
    /// the test harness needs a deterministic worker-local path. Prefer
    /// the scheduler-routed HTTP claim path in production.
    ///
    /// # `None` semantics
    ///
    /// `Ok(None)` means **no work was found in the partition window this
    /// poll covered**, not "the cluster is idle". Each call scans a chunk
    /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
    /// `scan_cursor`; the cursor advances by that chunk size on every
    /// invocation, so a worker covers every partition exactly once every
    /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
    ///
    /// Callers should treat `None` as "poll again soon" (typically after
    /// `config.claim_poll_interval_ms`) rather than "sleep for a long
    /// time". Backing off too aggressively on `None` can starve workers
    /// when work lives on partitions outside the current window.
    ///
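    /// A minimal poll loop following that guidance (sketch; assumes
    /// `claim_poll_interval_ms` is a millisecond `u64`):
    ///
    /// ```rust,ignore
    /// loop {
    ///     match worker.claim_next().await? {
    ///         Some(task) => {
    ///             // run the task ...
    ///             task.complete(None).await?;
    ///         }
    ///         None => {
    ///             let ms = worker.config().claim_poll_interval_ms;
    ///             tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
    ///         }
    ///     }
    /// }
    /// ```
    ///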
    /// Returns `Err` on Valkey errors or script failures.
    #[cfg(feature = "direct-valkey-claim")]
    pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
        // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
        // try_acquire returns immediately — if no permits available, the worker
        // is at capacity and should not claim more work.
        let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(_) => return Ok(None), // At capacity — no claim attempted
        };

        let lane_id = self.next_lane();
        let now = TimestampMs::now();

        // Phase 1: We scan eligible executions directly by reading the eligible
        // ZSET across execution partitions. In production the scheduler
        // (ff-scheduler) would handle this. For Phase 1, the SDK does a
        // simplified inline claim.
        //
        // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
        // partitions starting at a rolling offset. This keeps idle Valkey
        // load at O(chunk) per worker-second instead of O(num_partitions),
        // and the worker-instance-seeded initial cursor spreads concurrent
        // workers across different partition windows.
        let num_partitions = self.partition_config.num_flow_partitions as usize;
        if num_partitions == 0 {
            return Ok(None);
        }
        let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
        let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;

        for step in 0..chunk {
            let partition_idx = ((start + step) % num_partitions) as u16;
            let partition = ff_core::partition::Partition {
                family: ff_core::partition::PartitionFamily::Execution,
                index: partition_idx,
            };
            let idx = IndexKeys::new(&partition);
            let eligible_key = idx.lane_eligible(&lane_id);

            // ZRANGEBYSCORE to get the highest-priority eligible execution.
            // Score format: -(priority * 1_000_000_000_000) + created_at_ms
            // ZRANGEBYSCORE with "-inf" "+inf" LIMIT 0 1 gives lowest score = highest priority.
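            //
            // Worked example: with created_at_ms = 1_700_000_000_000,
            //   priority 2 -> -(2 * 1e12) + 1.7e12 = -300_000_000_000
            //   priority 1 -> -(1 * 1e12) + 1.7e12 =  700_000_000_000
            // so the priority-2 execution sorts first (lower score wins).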
            let result: Value = self
                .client
                .cmd("ZRANGEBYSCORE")
                .arg(&eligible_key)
                .arg("-inf")
                .arg("+inf")
                .arg("LIMIT")
                .arg("0")
                .arg("1")
                .execute()
                .await
                .map_err(|e| crate::backend_context(e, "ZRANGEBYSCORE failed"))?;

            let execution_id_str = match extract_first_array_string(&result) {
                Some(s) => s,
                None => continue, // No eligible executions on this partition
            };

            let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
                SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "claim_execution_from_eligible_set".into(),
                    execution_id: None,
                    message: format!("bad execution_id in eligible set: {e}"),
                })
            })?;

            // Step 1: Issue claim grant
            let grant_result = self
                .issue_claim_grant(&execution_id, &lane_id, &partition, &idx)
                .await;

            match grant_result {
                Ok(()) => {}
                Err(SdkError::Engine(ref boxed))
                    if matches!(
                        **boxed,
                        crate::EngineError::Validation {
                            kind: crate::ValidationKind::CapabilityMismatch,
                            ..
                        }
                    ) =>
                {
                    let missing = match &**boxed {
                        crate::EngineError::Validation { detail, .. } => detail.clone(),
                        _ => unreachable!(),
                    };
                    // Block-on-mismatch (RFC-009 §7.5) — parity with
                    // ff-scheduler's Scheduler::claim_for_worker. Without
                    // this, the inline-direct-claim path would hot-loop
                    // on an unclaimable top-of-zset (every tick picks the
                    // same execution, wastes an FCALL, logs, releases,
                    // repeats). The scheduler-side unblock scanner
                    // promotes blocked_route executions back to eligible
                    // when a worker with matching caps registers.
                    tracing::info!(
                        execution_id = %execution_id,
                        worker_id = %self.config.worker_id,
                        worker_caps_hash = %self.worker_capabilities_hash,
                        missing = %missing,
                        "capability mismatch, blocking execution off eligible (SDK inline claim)"
                    );
                    self.block_route(&execution_id, &lane_id, &partition, &idx).await;
                    continue;
                }
                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        error = %e,
                        "claim grant failed (retryable), trying next partition"
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }

            // Step 2: Claim the execution
            match self
                .claim_execution(&execution_id, &lane_id, &partition, now)
                .await
            {
                Ok(mut task) => {
                    // Transfer concurrency permit to the task. When the task is
                    // completed/failed/cancelled/dropped the permit returns to
                    // the semaphore, allowing another claim.
                    task.set_concurrency_permit(permit);
                    return Ok(Some(task));
                }
                Err(SdkError::Engine(ref boxed))
                    if matches!(
                        **boxed,
                        crate::EngineError::Contention(
                            crate::ContentionKind::UseClaimResumedExecution
                        )
                    ) =>
                {
                    // Execution was resumed from suspension — attempt_interrupted.
                    // ff_claim_execution rejects this; use ff_claim_resumed_execution
                    // which reuses the existing attempt instead of creating a new one.
                    tracing::debug!(
                        execution_id = %execution_id,
                        "execution is resumed, using claim_resumed path"
                    );
                    match self
                        .claim_resumed_execution(&execution_id, &lane_id, &partition)
                        .await
                    {
                        Ok(mut task) => {
                            task.set_concurrency_permit(permit);
                            return Ok(Some(task));
                        }
                        Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
                            tracing::debug!(
                                execution_id = %execution_id,
                                error = %e2,
                                "claim_resumed failed (retryable), trying next partition"
                            );
                            continue;
                        }
                        Err(e2) => return Err(e2),
                    }
                }
                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        error = %e,
                        "claim execution failed (retryable), trying next partition"
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        }

        // No eligible work found on any partition
        Ok(None)
    }

    #[cfg(feature = "direct-valkey-claim")]
    async fn issue_claim_grant(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        idx: &IndexKeys,
    ) -> Result<(), SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);

        // KEYS (3): exec_core, claim_grant_key, eligible_zset
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.claim_grant(),
            idx.lane_eligible(lane_id),
        ];

        // ARGV (9): eid, worker_id, worker_instance_id, lane_id,
        //           capability_hash, grant_ttl_ms, route_snapshot_json,
        //           admission_summary, worker_capabilities_csv (sorted)
        let args: Vec<String> = vec![
            execution_id.to_string(),
            self.config.worker_id.to_string(),
            self.config.worker_instance_id.to_string(),
            lane_id.to_string(),
            String::new(), // capability_hash
            "5000".to_owned(), // grant_ttl_ms (5 seconds)
            String::new(), // route_snapshot_json
            String::new(), // admission_summary
            self.worker_capabilities_csv.clone(), // sorted CSV
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_issue_claim_grant", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::from)?;

        crate::task::parse_success_result(&raw, "ff_issue_claim_grant")
    }

    /// Move an execution from the lane's eligible ZSET into its
    /// blocked_route ZSET via `ff_block_execution_for_admission`. Called
    /// after a `CapabilityMismatch` reject — without this, the inline
    /// direct-claim path would re-pick the same top-of-zset every tick
    /// (same pattern the scheduler's block_candidate handles). The
    /// engine's unblock scanner periodically promotes blocked_route
    /// back to eligible once a worker with matching caps registers.
    ///
    /// Best-effort: transport or logical rejects (e.g. the execution
    /// already went terminal between pick and block) are logged and the
    /// outer loop simply `continue`s to the next partition. Parity with
    /// ff-scheduler::Scheduler::block_candidate.
    #[cfg(feature = "direct-valkey-claim")]
    async fn block_route(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        idx: &IndexKeys,
    ) {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let core_key = ctx.core();
        let eligible_key = idx.lane_eligible(lane_id);
        let blocked_key = idx.lane_blocked_route(lane_id);
        let eid_s = execution_id.to_string();
        let now_ms = TimestampMs::now().0.to_string();

        let keys: [&str; 3] = [&core_key, &eligible_key, &blocked_key];
        let argv: [&str; 4] = [
            &eid_s,
            "waiting_for_capable_worker",
            "no connected worker satisfies required_capabilities",
            &now_ms,
        ];

        match self
            .client
            .fcall::<Value>("ff_block_execution_for_admission", &keys, &argv)
            .await
        {
            Ok(v) => {
                // Parse Lua result so a logical reject (e.g. execution
                // went terminal mid-flight) is visible — same fix we
                // applied to ff-scheduler's block_candidate.
                if let Err(e) = crate::task::parse_success_result(&v, "ff_block_execution_for_admission") {
                    tracing::warn!(
                        execution_id = %execution_id,
                        error = %e,
                        "SDK block_route: Lua rejected; eligible ZSET unchanged, next poll \
                         will re-evaluate"
                    );
                }
            }
            Err(e) => {
                tracing::warn!(
                    execution_id = %execution_id,
                    error = %e,
                    "SDK block_route: transport failure; eligible ZSET unchanged"
                );
            }
        }
    }

    /// Low-level claim of a granted execution. Invokes
    /// `ff_claim_execution` and returns a `ClaimedTask` with auto
    /// lease renewal.
    ///
    /// Previously gated behind `direct-valkey-claim`; ungated so
    /// the public [`claim_from_grant`] entry point can reuse the
    /// same FCALL plumbing. The method stays private — external
    /// callers use `claim_from_grant`.
    ///
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    async fn claim_execution(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        _now: TimestampMs,
    ) -> Result<ClaimedTask, SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let idx = IndexKeys::new(partition);

        // Pre-read total_attempt_count from exec_core to derive next attempt index.
        // The Lua uses total_attempt_count as the new index and dynamically builds
        // the attempt key from the hash tag, so KEYS[6-8] are placeholders, but
        // we pass the correct index for documentation/debugging.
        let total_str: Option<String> = self.client
            .cmd("HGET")
            .arg(ctx.core())
            .arg("total_attempt_count")
            .execute()
            .await
            .unwrap_or(None);
        let next_idx = total_str
            .as_deref()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(0);
        let att_idx = AttemptIndex::new(next_idx);

        let lease_id = LeaseId::new().to_string();
        let attempt_id = AttemptId::new().to_string();
        let renew_before_ms = self.config.lease_ttl_ms * 2 / 3;
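        // e.g. with the 30_000 ms lease TTL from the usage example, this
        // renewal threshold works out to 20_000 ms.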

        // KEYS (14): must match lua/execution.lua ff_claim_execution positional order
        let keys: Vec<String> = vec![
            ctx.core(),                                    // 1  exec_core
            ctx.claim_grant(),                             // 2  claim_grant
            idx.lane_eligible(lane_id),                    // 3  eligible_zset
            idx.lease_expiry(),                            // 4  lease_expiry_zset
            idx.worker_leases(&self.config.worker_instance_id), // 5  worker_leases
            ctx.attempt_hash(att_idx),                     // 6  attempt_hash (placeholder)
            ctx.attempt_usage(att_idx),                    // 7  attempt_usage (placeholder)
            ctx.attempt_policy(att_idx),                   // 8  attempt_policy (placeholder)
            ctx.attempts(),                                // 9  attempts_zset
            ctx.lease_current(),                           // 10 lease_current
            ctx.lease_history(),                           // 11 lease_history
            idx.lane_active(lane_id),                      // 12 active_index
            idx.attempt_timeout(),                         // 13 attempt_timeout_zset
            idx.execution_deadline(),                      // 14 execution_deadline_zset
        ];

        // ARGV (12): must match lua/execution.lua ff_claim_execution positional order
        let args: Vec<String> = vec![
            execution_id.to_string(),                      // 1  execution_id
            self.config.worker_id.to_string(),             // 2  worker_id
            self.config.worker_instance_id.to_string(),    // 3  worker_instance_id
            lane_id.to_string(),                           // 4  lane
            String::new(),                                 // 5  capability_hash
            lease_id.clone(),                              // 6  lease_id
            self.config.lease_ttl_ms.to_string(),          // 7  lease_ttl_ms
            renew_before_ms.to_string(),                   // 8  renew_before_ms
            attempt_id.clone(),                            // 9  attempt_id
            "{}".to_owned(),                               // 10 attempt_policy_json
            String::new(),                                 // 11 attempt_timeout_ms
            String::new(),                                 // 12 execution_deadline_at
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_claim_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::from)?;

        // Parse claim result: {1, "OK", lease_id, lease_epoch, lease_expires_at,
        //                      attempt_id, attempt_index, attempt_type}
        let arr = match &raw {
            Value::Array(arr) => arr,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "expected Array".into(),
                }));
            }
        };

        let status_code = match arr.first() {
            Some(Ok(Value::Int(n))) => *n,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "bad status code".into(),
                }));
            }
        };

        if status_code != 1 {
            let err_field_str = |idx: usize| -> String {
                arr.get(idx)
                    .and_then(|v| match v {
                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                        Ok(Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default()
            };
            let error_code = {
                let s = err_field_str(1);
                if s.is_empty() { "unknown".to_owned() } else { s }
            };
            let detail = err_field_str(2);

            return Err(SdkError::from(
                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
                        fcall: "ff_claim_execution".into(),
                        execution_id: Some(execution_id.to_string()),
                        message: format!("unknown error: {error_code}"),
                    }),
            ));
        }

        // Extract fields from success response
        let field_str = |idx: usize| -> String {
            arr.get(idx + 2) // skip status_code and "OK"
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default()
        };

        // Lua returns: ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
        // Positions:       0         1       2           3            4              5
        let lease_id = LeaseId::parse(&field_str(0))
            .unwrap_or_else(|_| LeaseId::new());
        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
        // field_str(2) is expires_at — skip it (lease timing managed by renewal)
        let attempt_id = AttemptId::parse(&field_str(3))
            .unwrap_or_else(|_| AttemptId::new());
        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));

        // Read execution payload and metadata
        let (input_payload, execution_kind, tags) = self
            .read_execution_context(execution_id, partition)
            .await?;

        Ok(ClaimedTask::new(
            self.client.clone(),
            self.backend.clone(),
            self.partition_config,
            execution_id.clone(),
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            self.config.lease_ttl_ms,
            lane_id.clone(),
            self.config.worker_instance_id.clone(),
            input_payload,
            execution_kind,
            tags,
        ))
    }
1160
1161    /// Consume a [`ClaimGrant`] and claim the granted execution on
    /// this worker. This is the intended production entry point: pair
    /// it with [`ff_scheduler::Scheduler::claim_for_worker`] to flow
1164    /// scheduler-issued grants into the SDK without enabling the
1165    /// `direct-valkey-claim` feature (which bypasses budget/quota
1166    /// admission control).
1167    ///
1168    /// The worker's concurrency semaphore is checked BEFORE the FCALL
1169    /// so a saturated worker does not consume the grant: the grant
1170    /// stays valid for its remaining TTL and the caller can either
1171    /// release it back to the scheduler or retry after some other
1172    /// in-flight task completes.
1173    ///
1174    /// On success the returned [`ClaimedTask`] holds a concurrency
1175    /// permit that releases automatically on
1176    /// `complete`/`fail`/`cancel`/drop — same contract as
1177    /// `claim_next`.
1178    ///
1179    /// # Arguments
1180    ///
1181    /// * `lane` — the lane the grant was issued for. Must match what
1182    ///   was passed to `Scheduler::claim_for_worker`; the Lua FCALL
1183    ///   uses it to look up `lane_eligible`, `lane_active`, and the
1184    ///   `worker_leases` index slot.
1185    /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
1186    ///
1187    /// # Errors
1188    ///
1189    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1190    ///   permits all held. Retryable; the grant is untouched.
1191    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1192    ///   or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
1193    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
1194    ///   (wrapped in [`SdkError::Engine`]).
1195    /// * `ScriptError::CapabilityMismatch` — execution's required
1196    ///   capabilities not a subset of this worker's caps (wrapped in
1197    ///   [`SdkError::Engine`]). Surfaced post-grant if a race
1198    ///   between grant issuance and caps change allows it.
1199    /// * `ScriptError::Parse` — `ff_claim_execution` returned an
1200    ///   unexpected shape (wrapped in [`SdkError::Engine`]).
1201    /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
1202    ///   transport error during the FCALL or the
1203    ///   `read_execution_context` follow-up.
1204    ///
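    /// # Usage
    ///
    /// A minimal sketch of the grant-consumption path. The shape of the
    /// `scheduler.claim_for_worker(..)` call is a placeholder (see
    /// `ff-scheduler` for the real signature); only `claim_from_grant`
    /// is this type's API.
    ///
    /// ```rust,ignore
    /// // Hypothetical call shape returning Option<ClaimGrant>.
    /// if let Some(grant) = scheduler.claim_for_worker(request).await? {
    ///     match worker.claim_from_grant(lane.clone(), grant).await {
    ///         Ok(task) => {
    ///             // Process, then complete; the concurrency permit is
    ///             // released automatically.
    ///             task.complete(Some(b"done".to_vec())).await?;
    ///         }
    ///         Err(SdkError::WorkerAtCapacity) => {
    ///             // Grant untouched: release it back to the scheduler
    ///             // or retry after an in-flight task finishes.
    ///         }
    ///         Err(e) => return Err(e.into()),
    ///     }
    /// }
    /// ```
    ///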
1205    /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
1206    /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
1207    pub async fn claim_from_grant(
1208        &self,
1209        lane: LaneId,
1210        grant: ff_core::contracts::ClaimGrant,
1211    ) -> Result<ClaimedTask, SdkError> {
1212        // Semaphore check FIRST. If the worker is saturated we must
1213        // surface the condition to the caller without touching the
1214        // grant — silently returning Ok(None) (as claim_next does)
1215        // would drop a grant the scheduler has already committed work
1216        // to issuing, wasting the slot until its TTL elapses.
1217        let permit = self
1218            .concurrency_semaphore
1219            .clone()
1220            .try_acquire_owned()
1221            .map_err(|_| SdkError::WorkerAtCapacity)?;
1222
1223        let now = TimestampMs::now();
1224        let partition = grant.partition().map_err(|e| SdkError::Config {
1225            context: "claim_from_grant".to_owned(),
1226            field: Some("partition_key".to_owned()),
1227            message: e.to_string(),
1228        })?;
1229        let mut task = self
1230            .claim_execution(&grant.execution_id, &lane, &partition, now)
1231            .await?;
1232        task.set_concurrency_permit(permit);
1233        Ok(task)
1234    }
1235
    /// Scheduler-routed claim: POSTs to the server's
    /// `/v1/workers/{id}/claim` endpoint, then chains into
    /// [`Self::claim_from_grant`].
1239    ///
    /// Batch C item 2 PR-B. This is the production entry point:
    /// budget, quota, and capability admission all run server-side
    /// inside `ff_scheduler::Scheduler::claim_for_worker`, so callers
    /// never need to enable the `direct-valkey-claim` feature.
1244    ///
    /// Returns `Ok(None)` when the server reports no eligible execution
1246    /// (HTTP 204). Callers typically back off by
1247    /// `config.claim_poll_interval_ms` and try again, same cadence
1248    /// as the direct-claim path's `Ok(None)`.
1249    ///
    /// The `admin` client is the established HTTP surface
    /// (`FlowFabricAdminClient`), reused here so workers don't need a
    /// second reqwest client. Build it once at worker boot and pass it
    /// in by reference on every claim.
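    ///
    /// # Usage
    ///
    /// A sketch of the server-routed poll loop; the 60_000 ms grant TTL
    /// and the fixed backoff are illustrative values:
    ///
    /// ```rust,ignore
    /// let lane = LaneId::new("main");
    /// loop {
    ///     match worker.claim_via_server(&admin, &lane, 60_000).await? {
    ///         Some(task) => task.complete(Some(b"ok".to_vec())).await?,
    ///         None => {
    ///             // HTTP 204: nothing eligible right now; back off and
    ///             // re-poll on the configured cadence.
    ///             tokio::time::sleep(Duration::from_millis(1_000)).await;
    ///         }
    ///     }
    /// }
    /// ```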
1254    pub async fn claim_via_server(
1255        &self,
1256        admin: &crate::FlowFabricAdminClient,
1257        lane: &LaneId,
1258        grant_ttl_ms: u64,
1259    ) -> Result<Option<ClaimedTask>, SdkError> {
1260        let req = crate::admin::ClaimForWorkerRequest {
1261            worker_id: self.config.worker_id.to_string(),
1262            lane_id: lane.to_string(),
1263            worker_instance_id: self.config.worker_instance_id.to_string(),
1264            capabilities: self.config.capabilities.clone(),
1265            grant_ttl_ms,
1266        };
1267        let Some(resp) = admin.claim_for_worker(req).await? else {
1268            return Ok(None);
1269        };
1270        let grant = resp.into_grant()?;
1271        self.claim_from_grant(lane.clone(), grant).await.map(Some)
1272    }
1273
1274    /// Consume a [`ReclaimGrant`] and transition the granted
1275    /// `attempt_interrupted` execution into a `started` state on this
1276    /// worker. Symmetric partner to [`claim_from_grant`] for the
1277    /// resume path.
1278    ///
1279    /// The grant must have been issued to THIS worker (matching
1280    /// `worker_id` at grant time). A mismatch returns
1281    /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1282    /// atomically by `ff_claim_resumed_execution`; a second call with
1283    /// the same grant also returns `InvalidClaimGrant`.
1284    ///
1285    /// # Concurrency
1286    ///
1287    /// The worker's concurrency semaphore is checked BEFORE the FCALL
1288    /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1289    /// assume pre-existing capacity on this worker — a reclaim can
1290    /// land on a fresh worker instance that just came up after a
1291    /// crash/restart and is picking up a previously-interrupted
1292    /// execution. If the worker is saturated, the grant stays valid
1293    /// for its remaining TTL and the caller can release it or retry.
1294    ///
1295    /// On success the returned [`ClaimedTask`] holds a concurrency
1296    /// permit that releases automatically on
1297    /// `complete`/`fail`/`cancel`/drop.
1298    ///
1299    /// # Errors
1300    ///
1301    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1302    ///   permits all held. Retryable; the grant is untouched (no
1303    ///   FCALL was issued, so `ff_claim_resumed_execution` did not
1304    ///   atomically consume the grant key).
1305    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1306    ///   or `worker_id` mismatch.
1307    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1308    /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1309    ///   `attempt_interrupted`.
1310    /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1311    ///   not `runnable`.
1312    /// * `ScriptError::ExecutionNotFound` — core key missing.
1313    /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
1314    ///   transport.
1315    ///
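    /// # Usage
    ///
    /// A sketch of the resume path after a crash/restart. Where the
    /// grants come from (a hypothetical scheduler reclaim sweep here) is
    /// an assumption; only `claim_from_reclaim_grant` is this type's API.
    ///
    /// ```rust,ignore
    /// // Hypothetical: grants for attempt_interrupted executions that
    /// // were re-issued to this worker's id.
    /// for grant in reclaim_grants {
    ///     match worker.claim_from_reclaim_grant(grant).await {
    ///         Ok(task) => {
    ///             // Resume work on the interrupted attempt...
    ///             task.complete(Some(b"resumed".to_vec())).await?;
    ///         }
    ///         // Saturated: remaining grants stay valid for their TTL.
    ///         Err(SdkError::WorkerAtCapacity) => break,
    ///         Err(e) => return Err(e.into()),
    ///     }
    /// }
    /// ```
    ///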
1316    /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1317    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
1318    pub async fn claim_from_reclaim_grant(
1319        &self,
1320        grant: ff_core::contracts::ReclaimGrant,
1321    ) -> Result<ClaimedTask, SdkError> {
1322        // Semaphore check FIRST — same load-bearing ordering as
1323        // `claim_from_grant`. If the worker is saturated, surface
1324        // WorkerAtCapacity without firing the FCALL; the FCALL is an
        // atomic consume on the grant key, so calling it while saturated
        // would destroy the grant yet leave no permit to attach to the
        // returned `ClaimedTask`.
1328        let permit = self
1329            .concurrency_semaphore
1330            .clone()
1331            .try_acquire_owned()
1332            .map_err(|_| SdkError::WorkerAtCapacity)?;
1333
1334        // Grant carries partition + lane_id so no round-trip is needed
1335        // to resolve them before the FCALL.
1336        let partition = grant.partition().map_err(|e| SdkError::Config {
1337            context: "claim_from_reclaim_grant".to_owned(),
1338            field: Some("partition_key".to_owned()),
1339            message: e.to_string(),
1340        })?;
1341        let mut task = self
1342            .claim_resumed_execution(
1343                &grant.execution_id,
1344                &grant.lane_id,
1345                &partition,
1346            )
1347            .await?;
1348        task.set_concurrency_permit(permit);
1349        Ok(task)
1350    }
1351
1352    /// Low-level resume claim. Invokes `ff_claim_resumed_execution`
1353    /// and returns a `ClaimedTask` bound to the resumed attempt.
1354    ///
1355    /// Previously gated behind `direct-valkey-claim`; ungated so the
1356    /// public [`claim_from_reclaim_grant`] entry point can reuse it.
1357    /// The method stays private — external callers use
1358    /// `claim_from_reclaim_grant`.
1359    ///
1360    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
1361    async fn claim_resumed_execution(
1362        &self,
1363        execution_id: &ExecutionId,
1364        lane_id: &LaneId,
1365        partition: &ff_core::partition::Partition,
1366    ) -> Result<ClaimedTask, SdkError> {
1367        let ctx = ExecKeyContext::new(partition, execution_id);
1368        let idx = IndexKeys::new(partition);
1369
1370        // Pre-read current_attempt_index for the existing attempt hash key.
1371        // This is load-bearing: KEYS[6] must point to the real attempt hash.
1372        let att_idx_str: Option<String> = self.client
1373            .cmd("HGET")
1374            .arg(ctx.core())
1375            .arg("current_attempt_index")
1376            .execute()
1377            .await
1378            .map_err(|e| crate::backend_context(e, "read attempt_index"))?;
1379        let att_idx = AttemptIndex::new(
1380            att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
1381        );
1382
1383        let lease_id = LeaseId::new().to_string();
1384
1385        // KEYS (11): must match lua/signal.lua ff_claim_resumed_execution
1386        let keys: Vec<String> = vec![
1387            ctx.core(),                                             // 1  exec_core
1388            ctx.claim_grant(),                                      // 2  claim_grant
1389            idx.lane_eligible(lane_id),                             // 3  eligible_zset
1390            idx.lease_expiry(),                                     // 4  lease_expiry_zset
1391            idx.worker_leases(&self.config.worker_instance_id),     // 5  worker_leases
1392            ctx.attempt_hash(att_idx),                              // 6  existing_attempt_hash
1393            ctx.lease_current(),                                    // 7  lease_current
1394            ctx.lease_history(),                                    // 8  lease_history
1395            idx.lane_active(lane_id),                               // 9  active_index
1396            idx.attempt_timeout(),                                  // 10 attempt_timeout_zset
1397            idx.execution_deadline(),                               // 11 execution_deadline_zset
1398        ];
1399
1400        // ARGV (8): must match lua/signal.lua ff_claim_resumed_execution
1401        let args: Vec<String> = vec![
1402            execution_id.to_string(),                               // 1  execution_id
1403            self.config.worker_id.to_string(),                      // 2  worker_id
1404            self.config.worker_instance_id.to_string(),             // 3  worker_instance_id
1405            lane_id.to_string(),                                    // 4  lane
1406            String::new(),                                          // 5  capability_hash
1407            lease_id.clone(),                                       // 6  lease_id
1408            self.config.lease_ttl_ms.to_string(),                   // 7  lease_ttl_ms
1409            String::new(),                                          // 8  remaining_attempt_timeout_ms
1410        ];
1411
1412        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1413        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1414
1415        // TODO(#150): migrate when EngineBackend trait grows a
1416        // `claim_resumed_execution` method; for now this FCALL stays on
1417        // ff-sdk's direct client path.
1418        let raw: Value = self
1419            .client
1420            .fcall("ff_claim_resumed_execution", &key_refs, &arg_refs)
1421            .await
1422            .map_err(SdkError::from)?;
1423
1424        // Parse result — same format as ff_claim_execution:
1425        // {1, "OK", lease_id, lease_epoch, expires_at, attempt_id, attempt_index, attempt_type}
1426        let arr = match &raw {
1427            Value::Array(arr) => arr,
1428            _ => {
1429                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1430                    fcall: "ff_claim_resumed_execution".into(),
1431                    execution_id: Some(execution_id.to_string()),
1432                    message: "expected Array".into(),
1433                }));
1434            }
1435        };
1436
1437        let status_code = match arr.first() {
1438            Some(Ok(Value::Int(n))) => *n,
1439            _ => {
1440                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1441                    fcall: "ff_claim_resumed_execution".into(),
1442                    execution_id: Some(execution_id.to_string()),
1443                    message: "bad status code".into(),
1444                }));
1445            }
1446        };
1447
1448        if status_code != 1 {
1449            let err_field_str = |idx: usize| -> String {
1450                arr.get(idx)
1451                    .and_then(|v| match v {
1452                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1453                        Ok(Value::SimpleString(s)) => Some(s.clone()),
1454                        _ => None,
1455                    })
1456                    .unwrap_or_default()
1457            };
1458            let error_code = {
1459                let s = err_field_str(1);
1460                if s.is_empty() { "unknown".to_owned() } else { s }
1461            };
1462            let detail = err_field_str(2);
1463
1464            return Err(SdkError::from(
1465                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
1466                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
1467                        fcall: "ff_claim_resumed_execution".into(),
1468                        execution_id: Some(execution_id.to_string()),
1469                        message: format!("unknown error: {error_code}"),
1470                    }),
1471            ));
1472        }
1473
1474        let field_str = |idx: usize| -> String {
1475            arr.get(idx + 2)
1476                .and_then(|v| match v {
1477                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1478                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1479                    Ok(Value::Int(n)) => Some(n.to_string()),
1480                    _ => None,
1481                })
1482                .unwrap_or_default()
1483        };
1484
        let lease_id = LeaseId::parse(&field_str(0))
            .unwrap_or_else(|_| LeaseId::new());
        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
        // field_str(2) is expires_at; skipped (lease timing managed by renewal)
        let attempt_id = AttemptId::parse(&field_str(3))
            .unwrap_or_else(|_| AttemptId::new());
        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
1491
1492        let (input_payload, execution_kind, tags) = self
1493            .read_execution_context(execution_id, partition)
1494            .await?;
1495
1496        Ok(ClaimedTask::new(
1497            self.client.clone(),
1498            self.backend.clone(),
1499            self.partition_config,
1500            execution_id.clone(),
1501            attempt_index,
1502            attempt_id,
1503            lease_id,
1504            lease_epoch,
1505            self.config.lease_ttl_ms,
1506            lane_id.clone(),
1507            self.config.worker_instance_id.clone(),
1508            input_payload,
1509            execution_kind,
1510            tags,
1511        ))
1512    }
1513
1514    /// Read payload + execution_kind + tags from exec_core. Previously
1515    /// gated behind `direct-valkey-claim`; now shared by the
1516    /// feature-gated inline claim path and the public
1517    /// `claim_from_reclaim_grant` entry point.
1518    async fn read_execution_context(
1519        &self,
1520        execution_id: &ExecutionId,
1521        partition: &ff_core::partition::Partition,
1522    ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
1523        let ctx = ExecKeyContext::new(partition, execution_id);
1524
1525        // Read payload
1526        let payload: Option<String> = self
1527            .client
1528            .get(&ctx.payload())
1529            .await
1530            .map_err(|e| crate::backend_context(e, "GET payload failed"))?;
1531        let input_payload = payload.unwrap_or_default().into_bytes();
1532
1533        // Read execution_kind from core
1534        let kind: Option<String> = self
1535            .client
1536            .hget(&ctx.core(), "execution_kind")
1537            .await
1538            .map_err(|e| crate::backend_context(e, "HGET execution_kind failed"))?;
1539        let execution_kind = kind.unwrap_or_default();
1540
1541        // Read tags
1542        let tags: HashMap<String, String> = self
1543            .client
1544            .hgetall(&ctx.tags())
1545            .await
1546            .map_err(|e| crate::backend_context(e, "HGETALL tags"))?;
1547
1548        Ok((input_payload, execution_kind, tags))
1549    }
1550
1551    // ── Phase 3: Signal delivery ──
1552
1553    /// Deliver a signal to a suspended execution's waitpoint.
1554    ///
1555    /// The engine atomically records the signal, evaluates the resume condition,
1556    /// and optionally transitions the execution from `suspended` to `runnable`.
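    ///
    /// # Usage
    ///
    /// A sketch; how the [`Signal`](crate::task::Signal) value is built
    /// is elided (its fields mirror the ARGV assembled below: name,
    /// category, source, payload, idempotency key, waitpoint token):
    ///
    /// ```rust,ignore
    /// let signal: Signal = build_approval_signal(); // hypothetical helper
    /// let outcome = worker
    ///     .deliver_signal(&execution_id, &waitpoint_id, signal)
    ///     .await?;
    /// // `outcome` reports whether the resume condition was satisfied
    /// // and the execution transitioned back to `runnable`.
    /// ```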
1557    pub async fn deliver_signal(
1558        &self,
1559        execution_id: &ExecutionId,
1560        waitpoint_id: &WaitpointId,
1561        signal: crate::task::Signal,
1562    ) -> Result<crate::task::SignalOutcome, SdkError> {
1563        let partition = ff_core::partition::execution_partition(execution_id, &self.partition_config);
1564        let ctx = ExecKeyContext::new(&partition, execution_id);
1565        let idx = IndexKeys::new(&partition);
1566
1567        let signal_id = ff_core::types::SignalId::new();
1568        let now = TimestampMs::now();
1569
1570        // Pre-read lane_id from exec_core — the execution may be on any lane,
1571        // not necessarily one of this worker's configured lanes.
1572        let lane_str: Option<String> = self
1573            .client
1574            .hget(&ctx.core(), "lane_id")
1575            .await
1576            .map_err(|e| crate::backend_context(e, "HGET lane_id"))?;
1577        let lane_id = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1578
1579        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
1580        //            exec_signals_zset, signal_hash, signal_payload,
1581        //            idem_key, waitpoint_hash, suspension_current,
1582        //            eligible_zset, suspended_zset, delayed_zset,
1583        //            suspension_timeout_zset, hmac_secrets
1584        let idem_key = if let Some(ref ik) = signal.idempotency_key {
1585            ctx.signal_dedup(waitpoint_id, ik)
1586        } else {
1587            ctx.noop() // must share {p:N} hash tag for cluster mode
1588        };
1589        let keys: Vec<String> = vec![
1590            ctx.core(),                                    // 1
1591            ctx.waitpoint_condition(waitpoint_id),         // 2
1592            ctx.waitpoint_signals(waitpoint_id),           // 3
1593            ctx.exec_signals(),                            // 4
1594            ctx.signal(&signal_id),                        // 5
1595            ctx.signal_payload(&signal_id),                // 6
1596            idem_key,                                      // 7
1597            ctx.waitpoint(waitpoint_id),                   // 8
1598            ctx.suspension_current(),                      // 9
1599            idx.lane_eligible(&lane_id),                   // 10
1600            idx.lane_suspended(&lane_id),                  // 11
1601            idx.lane_delayed(&lane_id),                    // 12
1602            idx.suspension_timeout(),                      // 13
1603            idx.waitpoint_hmac_secrets(),                  // 14
1604        ];
1605
1606        let payload_str = signal
1607            .payload
1608            .as_ref()
1609            .map(|p| String::from_utf8_lossy(p).into_owned())
1610            .unwrap_or_default();
1611
1612        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
1613        //            signal_category, source_type, source_identity,
1614        //            payload, payload_encoding, idempotency_key,
1615        //            correlation_id, target_scope, created_at,
1616        //            dedup_ttl_ms, resume_delay_ms, signal_maxlen,
1617        //            max_signals_per_execution, waitpoint_token
1618        let args: Vec<String> = vec![
1619            signal_id.to_string(),                           // 1
1620            execution_id.to_string(),                        // 2
1621            waitpoint_id.to_string(),                        // 3
1622            signal.signal_name,                              // 4
1623            signal.signal_category,                          // 5
1624            signal.source_type,                              // 6
1625            signal.source_identity,                          // 7
1626            payload_str,                                     // 8
1627            "json".to_owned(),                               // 9 payload_encoding
1628            signal.idempotency_key.unwrap_or_default(),      // 10
1629            String::new(),                                   // 11 correlation_id
1630            "waitpoint".to_owned(),                          // 12 target_scope
1631            now.to_string(),                                 // 13 created_at
1632            "86400000".to_owned(),                           // 14 dedup_ttl_ms
1633            "0".to_owned(),                                  // 15 resume_delay_ms
1634            "1000".to_owned(),                               // 16 signal_maxlen
1635            "10000".to_owned(),                              // 17 max_signals
1636            // WIRE BOUNDARY — raw token must reach Lua unredacted. Do NOT
1637            // use ToString/Display (those are redacted for log safety);
1638            // .as_str() is the explicit opt-in that gets the secret bytes.
1639            signal.waitpoint_token.as_str().to_owned(),      // 18 waitpoint_token
1640        ];
1641
1642        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1643        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1644
1645        // TODO(#150): migrate when EngineBackend trait grows a
1646        // `deliver_signal` method; for now this FCALL stays on
1647        // ff-sdk's direct client path.
1648        let raw: Value = self
1649            .client
1650            .fcall("ff_deliver_signal", &key_refs, &arg_refs)
1651            .await
1652            .map_err(SdkError::from)?;
1653
1654        crate::task::parse_signal_result(&raw)
1655    }
1656
1657    #[cfg(feature = "direct-valkey-claim")]
1658    fn next_lane(&self) -> LaneId {
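        // Round-robin across configured lanes; Relaxed suffices since the
        // counter only needs to advance, not order with other memory.
        // Note: the modulo panics if `config.lanes` is empty, so config
        // validation must guarantee at least one lane.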
1659        let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
1660        self.config.lanes[idx].clone()
1661    }
1662}
1663
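/// True when a failed direct claim is safe to retry on the next poll:
/// the error classifies as `Retryable` (transient) or `Informational`
/// (benign); anything else should surface to the caller.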
1664#[cfg(feature = "direct-valkey-claim")]
1665fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
1666    use ff_core::error::ErrorClass;
1667    matches!(
1668        ff_script::engine_error_ext::class(err),
1669        ErrorClass::Retryable | ErrorClass::Informational
1670    )
1671}
1672
1673/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
1674/// instance id with FNV-1a to place distinct worker processes on different
/// partition windows from their first poll. A fixed zero seed would be fine
/// for a single worker; the hashing is what spreads work across workers in
/// multi-worker deployments.
1677#[cfg(feature = "direct-valkey-claim")]
1678fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
1679    if num_partitions == 0 {
1680        return 0;
1681    }
1682    (ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize) % num_partitions
1683}
1684
1685#[cfg(feature = "direct-valkey-claim")]
1686fn extract_first_array_string(value: &Value) -> Option<String> {
1687    match value {
1688        Value::Array(arr) if !arr.is_empty() => match &arr[0] {
1689            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1690            Ok(Value::SimpleString(s)) => Some(s.clone()),
1691            _ => None,
1692        },
1693        _ => None,
1694    }
1695}
1696
1697/// Read partition config from Valkey's `ff:config:partitions` hash.
1698/// Returns Err if the key doesn't exist or can't be read.
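///
/// A sketch of seeding the hash at bootstrap with the `cmd` builder used
/// elsewhere in this module; the field names are the ones parsed below,
/// while the counts and the `i64` reply binding are illustrative:
///
/// ```rust,ignore
/// let _added: i64 = client
///     .cmd("HSET")
///     .arg("ff:config:partitions")
///     .arg("num_flow_partitions").arg("256")
///     .arg("num_budget_partitions").arg("32")
///     .arg("num_quota_partitions").arg("32")
///     .execute()
///     .await?;
/// ```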
1699async fn read_partition_config(client: &Client) -> Result<PartitionConfig, SdkError> {
1700    let key = ff_core::keys::global_config_partitions();
1701    let fields: HashMap<String, String> = client
1702        .hgetall(&key)
1703        .await
1704        .map_err(|e| crate::backend_context(e, format!("HGETALL {key}")))?;
1705
1706    if fields.is_empty() {
1707        return Err(SdkError::Config {
1708            context: "read_partition_config".into(),
1709            field: None,
1710            message: "ff:config:partitions not found in Valkey".into(),
1711        });
1712    }
1713
1714    let parse = |field: &str, default: u16| -> u16 {
1715        fields
1716            .get(field)
1717            .and_then(|v| v.parse().ok())
1718            .filter(|&n: &u16| n > 0)
1719            .unwrap_or(default)
1720    };
1721
1722    Ok(PartitionConfig {
1723        num_flow_partitions: parse("num_flow_partitions", 256),
1724        num_budget_partitions: parse("num_budget_partitions", 32),
1725        num_quota_partitions: parse("num_quota_partitions", 32),
1726    })
1727}
1728
1729#[cfg(test)]
1730mod completion_accessor_type_tests {
1731    //! Type-level compile check that
1732    //! [`FlowFabricWorker::completion_backend`] returns an
1733    //! `Option<Arc<dyn CompletionBackend>>`. No Valkey required —
1734    //! the assertion is at the function-pointer type level and the
1735    //! #[test] body exists solely so the compiler elaborates it.
1736    use super::FlowFabricWorker;
1737    use ff_core::completion_backend::CompletionBackend;
1738    use std::sync::Arc;
1739
1740    #[test]
1741    fn completion_backend_accessor_signature() {
1742        // If this line compiles, the public accessor returns the
1743        // advertised type. The function is never called (no live
1744        // worker), so no I/O happens.
1745        let _f: fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>> =
1746            FlowFabricWorker::completion_backend;
1747    }
1748}
1749
1750#[cfg(all(test, feature = "direct-valkey-claim"))]
1751mod scan_cursor_tests {
1752    use super::scan_cursor_seed;
1753
1754    #[test]
1755    fn stable_for_same_input() {
1756        assert_eq!(scan_cursor_seed("w1", 256), scan_cursor_seed("w1", 256));
1757    }
1758
1759    #[test]
1760    fn distinct_for_different_ids() {
1761        assert_ne!(scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
1762    }
1763
1764    #[test]
1765    fn bounded_by_partition_count() {
1766        for i in 0..100 {
1767            assert!(scan_cursor_seed(&format!("w{i}"), 256) < 256);
1768        }
1769    }
1770
1771    #[test]
1772    fn zero_partitions_returns_zero() {
1773        assert_eq!(scan_cursor_seed("w1", 0), 0);
1774    }
1775}