Skip to main content

ff_sdk/
worker.rs

1use std::collections::HashMap;
2#[cfg(feature = "direct-valkey-claim")]
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5
6use ferriskey::{Client, Value};
7use ff_core::keys::{ExecKeyContext, IndexKeys};
8use ff_core::partition::PartitionConfig;
9use ff_core::types::*;
10use tokio::sync::Semaphore;
11
12use crate::config::WorkerConfig;
13use crate::task::ClaimedTask;
14use crate::SdkError;
15
16/// FlowFabric worker — connects to Valkey, claims executions, and provides
17/// the worker-facing API.
18///
19/// # Admission control
20///
21/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
22/// **bypasses the scheduler's admission controls**: it reads the eligible
23/// ZSET directly and mints its own claim grant without consulting budget
24/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
25/// benchmarks, tests, and single-tenant development where the scheduler
26/// hop is measurement noise, not for production.
27///
28/// For production deployments, consume scheduler-issued grants via
29/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
30/// budget breach, quota sliding-window, concurrency cap, and
31/// capability-match checks before issuing grants.
32///
33/// # Usage
34///
35/// ```rust,ignore
36/// use ff_core::backend::BackendConfig;
37/// use ff_core::types::{LaneId, Namespace, WorkerId, WorkerInstanceId};
38/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
39///
40/// let config = WorkerConfig {
41///     backend: BackendConfig::valkey("localhost", 6379),
42///     worker_id: WorkerId::new("w1"),
43///     worker_instance_id: WorkerInstanceId::new("w1-i1"),
44///     namespace: Namespace::new("default"),
45///     lanes: vec![LaneId::new("main")],
46///     capabilities: Vec::new(),
47///     lease_ttl_ms: 30_000,
48///     claim_poll_interval_ms: 1_000,
49///     max_concurrent_tasks: 1,
50/// };
51/// let worker = FlowFabricWorker::connect(config).await?;
52///
53/// loop {
54///     if let Some(task) = worker.claim_next().await? {
55///         // Process task...
56///         task.complete(Some(b"result".to_vec())).await?;
57///     } else {
58///         tokio::time::sleep(Duration::from_secs(1)).await;
59///     }
60/// }
61/// ```
62pub struct FlowFabricWorker {
63    client: Client,
64    config: WorkerConfig,
65    partition_config: PartitionConfig,
66    /// Sorted, deduplicated, comma-separated capabilities — computed once
67    /// from `config.capabilities` at connect time. Passed as ARGV[9] to
68    /// `ff_issue_claim_grant` on every claim. BTreeSet sorting is critical:
69    /// Lua's `ff_issue_claim_grant` relies on a stable CSV form for
70    /// reproducible logs and tests.
71    #[cfg(feature = "direct-valkey-claim")]
72    worker_capabilities_csv: String,
73    /// 8-hex FNV-1a digest of `worker_capabilities_csv`. Used in
74    /// per-mismatch logs so the 4KB CSV never echoes on every reject
75    /// during an incident. Full CSV logged once at connect-time WARN for
76    /// cross-reference. Mirrors `ff-scheduler::claim::worker_caps_digest`.
77    #[cfg(feature = "direct-valkey-claim")]
78    worker_capabilities_hash: String,
79    #[cfg(feature = "direct-valkey-claim")]
80    lane_index: AtomicUsize,
81    /// Concurrency cap for in-flight tasks. Permits are acquired or
82    /// transferred by [`claim_next`] (feature-gated),
83    /// [`claim_from_grant`] (always available), and
84    /// [`claim_from_reclaim_grant`], transferred to the returned
85    /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
86    /// Holds `max_concurrent_tasks` permits total.
87    ///
88    /// [`claim_next`]: FlowFabricWorker::claim_next
89    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
90    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
91    concurrency_semaphore: Arc<Semaphore>,
92    /// Rolling offset for chunked partition scans. Each poll advances the
93    /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
94    /// chunk)` polls every partition is covered. The initial value is
95    /// derived from `worker_instance_id` so idle workers spread their
96    /// scans across different partitions from the first poll onward.
97    ///
98    /// Overflow: on 64-bit targets `usize` is `u64` — overflow after
99    /// ~2^64 polls (billions of years at any realistic rate). On 32-bit
100    /// targets (wasm32, i686) `usize` is `u32` and wraps after ~4 years
101    /// at 1 poll/sec — acceptable; on wrap, the modulo preserves
102    /// correctness because the sequence simply restarts a new cycle.
103    #[cfg(feature = "direct-valkey-claim")]
104    scan_cursor: AtomicUsize,
105    /// The [`EngineBackend`] the Stage-1b trait forwarders route
106    /// through.
107    ///
108    /// **RFC-012 Stage 1b.** Always populated:
109    /// [`FlowFabricWorker::connect`] now wraps the worker's own
110    /// `ferriskey::Client` in a `ValkeyBackend` via
111    /// `ValkeyBackend::from_client_and_partitions`, and
112    /// [`FlowFabricWorker::connect_with`] replaces that default with
113    /// the caller-supplied `Arc<dyn EngineBackend>`. The
114    /// [`FlowFabricWorker::backend`] accessor still returns
115    /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
116    /// narrows the return type once consumers have migrated.
117    ///
118    /// Hot paths (claim, deliver_signal, admin queries) still use the
119    /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
120    /// migrates them through this field, and Stage 1d removes the
121    /// embedded client.
122    backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
123    /// Optional handle to the same underlying backend viewed as a
124    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
125    /// Populated by [`Self::connect`] from the bundled
126    /// `ValkeyBackend` (which implements the trait); supplied by the
127    /// caller on [`Self::connect_with`] as an explicit
128    /// `Option<Arc<dyn CompletionBackend>>` — `None` means "this
129    /// backend does not support push-based completion" (e.g. a future
130    /// Postgres backend without LISTEN/NOTIFY, or a test mock). Cairn
131    /// and other completion-subscription consumers reach this through
132    /// [`Self::completion_backend`].
133    completion_backend_handle:
134        Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
135}
136
137/// Number of partitions scanned per `claim_next()` poll. Keeps idle Valkey
138/// load at O(PARTITION_SCAN_CHUNK) per worker-second instead of
139/// O(num_flow_partitions).
140#[cfg(feature = "direct-valkey-claim")]
141const PARTITION_SCAN_CHUNK: usize = 32;
142
143impl FlowFabricWorker {
144    /// Connect to Valkey and prepare the worker.
145    ///
146    /// Establishes the ferriskey connection. Does NOT load the FlowFabric
147    /// library — that is the server's responsibility (ff-server calls
148    /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
149    /// the library is already loaded.
150    ///
151    /// # Smoke / dev scripts: rotate `WorkerInstanceId`
152    ///
153    /// The SDK writes a SET-NX liveness sentinel keyed on the worker's
154    /// `WorkerInstanceId`. When a smoke / dev script reuses the same
155    /// `WorkerInstanceId` across restarts, subsequent runs trap behind
156    /// the prior run's SET-NX until the liveness key's TTL (≈ 2× the
157    /// configured lease TTL) expires — the worker appears stuck and
158    /// claims nothing. Iterative scripts should synthesise a fresh
159    /// `WorkerInstanceId` per process (e.g. `WorkerInstanceId::new()`
160    /// or embed a UUID/timestamp) rather than hard-coding a stable
161    /// value. Production workers that cleanly shut down release the
162    /// key; only crashed / kill -9'd processes hit this trap.
163    pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
164        if config.lanes.is_empty() {
165            return Err(SdkError::Config {
166                context: "worker_config".into(),
167                field: None,
168                message: "at least one lane is required".into(),
169            });
170        }
171
172        // Build the ferriskey client from the nested `BackendConfig`.
173        // Delegates to `ff_backend_valkey::build_client` so host/port +
174        // TLS + cluster + `BackendTimeouts::request` +
175        // `BackendRetry` wiring lives in exactly one place (pre-Stage
176        // 1c this path had its own `ClientBuilder` chain that diverged
177        // from the backend's shape; RFC-012 Stage 1c tranche 1
178        // consolidates).
179        let client = ff_backend_valkey::build_client(&config.backend).await?;
180
181        // Verify connectivity
182        let pong: String = client
183            .cmd("PING")
184            .execute()
185            .await
186            .map_err(|e| crate::backend_context(e, "PING failed"))?;
187        if pong != "PONG" {
188            return Err(SdkError::Config {
189                context: "worker_connect".into(),
190                field: None,
191                message: format!("unexpected PING response: {pong}"),
192            });
193        }
194
195        // Guard against two worker processes sharing the same
196        // `worker_instance_id`. A duplicate instance would clobber each
197        // other's lease_current/active_index entries and double-claim work.
198        // SET NX on a liveness key with 2× lease TTL; if the key already
199        // exists another process is live. The key auto-expires if this
200        // process crashes without renewal, so a restart after a hard crash
201        // just waits at most 2× lease_ttl_ms for the ghost entry to clear.
202        //
203        // Known limitations of this minimal scheme (documented for operators):
204        //   1. **Startup-only, not runtime.** There is no heartbeat renewal
205        //      path. After `2 × lease_ttl_ms` elapses the alive key expires
206        //      naturally even while this worker is still running, and a
207        //      second process with the same `worker_instance_id` launched
208        //      later will successfully SET NX alongside the first. The check
209        //      catches misconfiguration at boot; it does not fence duplicates
210        //      that appear mid-lifetime. Production deployments should rely
211        //      on the orchestrator (Kubernetes, systemd unit with
212        //      `Restart=on-failure`, etc.) as the authoritative single-
213        //      instance enforcer; this SET NX is belt-and-suspenders.
214        //
215        //   2. **Restart delay after a crash.** If a worker crashes
216        //      ungracefully (SIGKILL, container OOM) and is restarted within
217        //      `2 × lease_ttl_ms`, the alive key is still present and the
218        //      new process exits with `SdkError::Config("duplicate
219        //      worker_instance_id ...")`. Options for operators:
220        //        - Wait `2 × lease_ttl_ms` (default 60s with the 30s TTL)
221        //          before restarting.
222        //        - Manually `DEL ff:worker:<instance_id>:alive` in Valkey to
223        //          unblock the restart.
224        //        - Use a fresh `worker_instance_id` for the restart (the
225        //          orchestrator should already do this per-Pod).
226        //
227        //   3. **No graceful cleanup on shutdown.** There is no explicit
228        //      `disconnect()` call that DELs the alive key. On clean
229        //      `SIGTERM` the key lingers until its TTL expires. A follow-up
230        //      can add `FlowFabricWorker::disconnect(self)` for callers that
231        //      want to skip the restart-delay window.
232        let alive_key = format!("ff:worker:{}:alive", config.worker_instance_id);
233        let alive_ttl_ms = (config.lease_ttl_ms.saturating_mul(2)).max(1_000);
234        let set_result: Option<String> = client
235            .cmd("SET")
236            .arg(&alive_key)
237            .arg("1")
238            .arg("NX")
239            .arg("PX")
240            .arg(alive_ttl_ms.to_string().as_str())
241            .execute()
242            .await
243            .map_err(|e| crate::backend_context(e, "SET NX worker alive key"))?;
244        if set_result.is_none() {
245            return Err(SdkError::Config {
246                context: "worker_connect".into(),
247                field: Some("worker_instance_id".into()),
248                message: format!(
249                    "duplicate worker_instance_id '{}': another process already holds {alive_key}",
250                    config.worker_instance_id
251                ),
252            });
253        }
254
255        // Read partition config from Valkey (set by ff-server on startup).
256        // Falls back to defaults if key doesn't exist (e.g. SDK-only testing).
257        let partition_config = read_partition_config(&client).await
258            .unwrap_or_else(|e| {
259                tracing::warn!(
260                    error = %e,
261                    "ff:config:partitions not found, using defaults"
262                );
263                PartitionConfig::default()
264            });
265
266        let max_tasks = config.max_concurrent_tasks.max(1);
267        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
268
269        tracing::info!(
270            worker_id = %config.worker_id,
271            instance_id = %config.worker_instance_id,
272            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
273            "FlowFabricWorker connected"
274        );
275
276        #[cfg(feature = "direct-valkey-claim")]
277        let scan_cursor_init = scan_cursor_seed(
278            config.worker_instance_id.as_str(),
279            partition_config.num_flow_partitions.max(1) as usize,
280        );
281
282        // Sort + dedupe capabilities into a stable CSV. BTreeSet both sorts
283        // and deduplicates in one pass; string joining happens once here.
284        //
285        // Ingress validation mirrors Scheduler::claim_for_worker (ff-scheduler):
286        //   - `,` is the CSV delimiter; a token containing one would split
287        //     mid-parse and could let a {"gpu"} worker appear to satisfy
288        //     {"gpu,cuda"} (silent auth bypass).
289        //   - Empty strings would produce leading/adjacent commas on the
290        //     wire and inflate token count for no semantic reason.
291        //   - Non-printable / whitespace chars: `"gpu "` vs `"gpu"` or
292        //     `"gpu\n"` vs `"gpu"` produce silent mismatches that are
293        //     miserable to debug. Reject anything outside printable ASCII
294        //     excluding space (`'!'..='~'`) at ingress so a typo fails
295        //     loudly at connect instead of silently mis-routing forever.
296        // Reject at boot so operator misconfig is loud, symmetric with the
297        // scheduler path.
298        #[cfg(feature = "direct-valkey-claim")]
299        for cap in &config.capabilities {
300            if cap.is_empty() {
301                return Err(SdkError::Config {
302                    context: "worker_config".into(),
303                    field: Some("capabilities".into()),
304                    message: "capability token must not be empty".into(),
305                });
306            }
307            if cap.contains(',') {
308                return Err(SdkError::Config {
309                    context: "worker_config".into(),
310                    field: Some("capabilities".into()),
311                    message: format!(
312                        "capability token may not contain ',' (CSV delimiter): {cap:?}"
313                    ),
314                });
315            }
316            // Reject ASCII control bytes (0x00-0x1F, 0x7F) and any ASCII
317            // whitespace (space, tab, LF, CR, FF, VT). UTF-8 printable
318            // characters above 0x7F are ALLOWED so i18n caps like
319            // "东京-gpu" can be used. The CSV wire form is byte-safe for
320            // multibyte UTF-8 because `,` is always a single byte and
321            // never part of a multibyte continuation (only 0x80-0xBF are
322            // continuations, ',' is 0x2C).
323            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
324                return Err(SdkError::Config {
325                    context: "worker_config".into(),
326                    field: Some("capabilities".into()),
327                    message: format!(
328                        "capability token must not contain whitespace or control \
329                         characters: {cap:?}"
330                    ),
331                });
332            }
333        }
334        #[cfg(feature = "direct-valkey-claim")]
335        let worker_capabilities_csv: String = {
336            let set: std::collections::BTreeSet<&str> = config
337                .capabilities
338                .iter()
339                .map(|s| s.as_str())
340                .filter(|s| !s.is_empty())
341                .collect();
342            if set.len() > ff_core::policy::CAPS_MAX_TOKENS {
343                return Err(SdkError::Config {
344                    context: "worker_config".into(),
345                    field: Some("capabilities".into()),
346                    message: format!(
347                        "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
348                        ff_core::policy::CAPS_MAX_TOKENS,
349                        set.len()
350                    ),
351                });
352            }
353            let csv = set.into_iter().collect::<Vec<_>>().join(",");
354            if csv.len() > ff_core::policy::CAPS_MAX_BYTES {
355                return Err(SdkError::Config {
356                    context: "worker_config".into(),
357                    field: Some("capabilities".into()),
358                    message: format!(
359                        "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
360                        ff_core::policy::CAPS_MAX_BYTES,
361                        csv.len()
362                    ),
363                });
364            }
365            csv
366        };
367
368        // Short stable digest of the sorted caps CSV, computed once so
369        // per-mismatch logs carry a stable identifier instead of the 4KB
370        // CSV. Shared helper — ff-scheduler uses the same one for its
371        // own per-mismatch logs, so cross-component log lines are
372        // diffable against each other.
373        #[cfg(feature = "direct-valkey-claim")]
374        let worker_capabilities_hash = ff_core::hash::fnv1a_xor8hex(&worker_capabilities_csv);
375
376        // Full CSV logged once at connect so per-mismatch logs (which
377        // carry only the 8-hex hash) can be cross-referenced by ops.
378        #[cfg(feature = "direct-valkey-claim")]
379        if !worker_capabilities_csv.is_empty() {
380            tracing::info!(
381                worker_instance_id = %config.worker_instance_id,
382                worker_caps_hash = %worker_capabilities_hash,
383                worker_caps = %worker_capabilities_csv,
384                "worker connected with capabilities (full CSV — mismatch logs use hash only)"
385            );
386        }
387
388        // Non-authoritative advertisement of caps for operator visibility
389        // (CLI introspection, dashboards). The AUTHORITATIVE source for
390        // scheduling decisions is ARGV[9] on each claim — Lua reads ONLY
391        // that, never this string. Lossy here is correctness-safe.
392        //
393        // Storage: a single STRING key holding the sorted CSV. Rationale:
394        //   * **Atomic overwrite.** `SET` is a single command — a concurrent
395        //     reader can never observe a transient empty value (the prior
396        //     DEL+SADD pair had that window).
397        //   * **Crash cleanup without refresh loop.** The alive-key SET NX
398        //     is startup-only (see §1 above); there's no periodic renew
399        //     to piggy-back on, so a TTL on caps would independently
400        //     expire mid-flight and hide a live worker's caps from ops
401        //     tools. Instead we drop the TTL: each reconnect overwrites;
402        //     a crashed worker leaves a stale CSV until a new process
403        //     with the same worker_instance_id boots (which triggers
404        //     `duplicate worker_instance_id` via alive-key guard anyway —
405        //     the orchestrator allocates a new id, and operators can DEL
406        //     the stale caps key if they care).
407        //   * **Empty caps = DEL.** A restart from {gpu} to {} clears the
408        //     advertisement rather than leaving stale data.
409        // Cluster-safe advertisement: the per-worker caps STRING lives at
410        // `ff:worker:{id}:caps` (lands on whatever slot CRC16 puts it on),
411        // and the INSTANCE ID is SADD'd to the global workers-index SET
412        // `ff:idx:workers` (single slot). The unblock scanner's cluster
413        // enumeration uses SMEMBERS on the index + per-member GET on each
414        // caps key, instead of `SCAN MATCH ff:worker:*:caps` (which in
415        // cluster mode only scans the shard the SCAN lands on and misses
416        // workers whose key hashes elsewhere). Pattern mirrors Batch A
417        // `budget_policies_index` / `flow_index` / `deps_all_edges`:
418        // operations stay atomic per command, the index is the
419        // cluster-wide enumeration surface.
420        #[cfg(feature = "direct-valkey-claim")]
421        {
422            let caps_key = ff_core::keys::worker_caps_key(&config.worker_instance_id);
423            let index_key = ff_core::keys::workers_index_key();
424            let instance_id = config.worker_instance_id.to_string();
425            if worker_capabilities_csv.is_empty() {
426                // No caps advertised. DEL the per-worker caps string AND
427                // SREM from the index so the scanner doesn't GET an empty
428                // string for a worker that never declares caps.
429                let _ = client
430                    .cmd("DEL")
431                    .arg(&caps_key)
432                    .execute::<Option<i64>>()
433                    .await;
434                if let Err(e) = client
435                    .cmd("SREM")
436                    .arg(&index_key)
437                    .arg(&instance_id)
438                    .execute::<Option<i64>>()
439                    .await
440                {
441                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
442                        "SREM workers-index failed; continuing (non-authoritative)");
443                }
444            } else {
445                // Atomic overwrite of the caps STRING (one-command). Then
446                // SADD to the index (idempotent — re-running connect for
447                // the same id is a no-op at SADD level). The per-worker
448                // caps key is written BEFORE the index SADD so that when
449                // the scanner observes the id in the index, the caps key
450                // is guaranteed to resolve to a non-stale CSV (the reverse
451                // order would leak an index entry pointing at a stale or
452                // empty caps key during a narrow window).
453                if let Err(e) = client
454                    .cmd("SET")
455                    .arg(&caps_key)
456                    .arg(&worker_capabilities_csv)
457                    .execute::<Option<String>>()
458                    .await
459                {
460                    tracing::warn!(error = %e, key = %caps_key,
461                        "SET worker caps advertisement failed; continuing");
462                }
463                if let Err(e) = client
464                    .cmd("SADD")
465                    .arg(&index_key)
466                    .arg(&instance_id)
467                    .execute::<Option<i64>>()
468                    .await
469                {
470                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
471                        "SADD workers-index failed; continuing");
472                }
473            }
474        }
475
476        // RFC-012 Stage 1b: wrap the dialed client in a
477        // ValkeyBackend so `ClaimedTask`'s trait forwarders have
478        // something to call. `from_client_and_partitions` reuses the
479        // already-dialed client — no second connection.
480        // Share the concrete `Arc<ValkeyBackend>` across the two
481        // trait objects — one allocation, both accessors yield
482        // identity-equivalent handles.
483        let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
484            ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
485                client.clone(),
486                partition_config,
487            );
488        let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
489        let completion_backend_handle: Option<
490            Arc<dyn ff_core::completion_backend::CompletionBackend>,
491        > = Some(valkey_backend);
492
493        Ok(Self {
494            client,
495            config,
496            partition_config,
497            #[cfg(feature = "direct-valkey-claim")]
498            worker_capabilities_csv,
499            #[cfg(feature = "direct-valkey-claim")]
500            worker_capabilities_hash,
501            #[cfg(feature = "direct-valkey-claim")]
502            lane_index: AtomicUsize::new(0),
503            concurrency_semaphore,
504            #[cfg(feature = "direct-valkey-claim")]
505            scan_cursor: AtomicUsize::new(scan_cursor_init),
506            backend,
507            completion_backend_handle,
508        })
509    }
510
511    /// Store pre-built [`EngineBackend`] and (optional)
512    /// [`CompletionBackend`] handles on the worker. Builds the worker
513    /// via the legacy [`FlowFabricWorker::connect`] path first (so the
514    /// embedded `ferriskey::Client` that the Stage 1b non-migrated hot
515    /// paths still use is dialed), then replaces the default
516    /// `ValkeyBackend` wrapper with the caller-supplied trait objects.
517    ///
518    /// The `completion` argument is explicit: 0.3.3 previously accepted
519    /// only `backend` and `completion_backend()` silently returned
520    /// `None` on this path because `Arc<dyn EngineBackend>` cannot be
521    /// upcast to `Arc<dyn CompletionBackend>` without loss of
522    /// trait-object identity. 0.3.4 lets the caller decide.
523    ///
524    /// - `Some(arc)` — caller supplies a completion backend.
525    ///   [`Self::completion_backend`] returns `Some(clone)`.
526    /// - `None` — this backend does not support push-based completion
527    ///   (future Postgres backend without LISTEN/NOTIFY, test mocks).
528    ///   [`Self::completion_backend`] returns `None`.
529    ///
530    /// When the underlying backend implements both traits (as
531    /// `ValkeyBackend` does), pass the same `Arc` twice — the two
532    /// trait-object views share one allocation:
533    ///
534    /// ```rust,ignore
535    /// use std::sync::Arc;
536    /// use ff_backend_valkey::ValkeyBackend;
537    /// use ff_sdk::{FlowFabricWorker, WorkerConfig};
538    ///
539    /// # async fn doc(worker_config: WorkerConfig,
540    /// #              backend_config: ff_backend_valkey::BackendConfig)
541    /// #     -> Result<(), ff_sdk::SdkError> {
542    /// // Valkey (completion supported):
543    /// let valkey = Arc::new(ValkeyBackend::connect(backend_config).await?);
544    /// let worker = FlowFabricWorker::connect_with(
545    ///     worker_config,
546    ///     valkey.clone(),
547    ///     Some(valkey),
548    /// ).await?;
549    /// # Ok(()) }
550    /// ```
551    ///
552    /// Backend without completion support:
553    ///
554    /// ```rust,ignore
555    /// let worker = FlowFabricWorker::connect_with(
556    ///     worker_config,
557    ///     backend,
558    ///     None,
559    /// ).await?;
560    /// ```
561    ///
562    /// **Stage 1b + Round-7 scope — what the injected backend covers
563    /// today.** The injected backend currently covers these per-task
564    /// `ClaimedTask` ops: `update_progress` / `resume_signals` /
565    /// `delay_execution` / `move_to_waiting_children` / `complete` /
566    /// `cancel` / `fail` / `create_pending_waitpoint` /
567    /// `append_frame` / `report_usage`. A mock backend therefore sees
568    /// that portion of the worker's per-task write surface. Lease
569    /// renewal also routes through `backend.renew(&handle)`. Round-7
570    /// (#135/#145) closed the four trait-shape gaps tracked by #117,
571    /// but `suspend` still reaches the embedded `ferriskey::Client`
572    /// directly via `ff_suspend_execution` — this is the deferred
573    /// suspend per RFC-012 §R7.6.1, pending Stage 1d input-shape
574    /// work. `claim_next` / `claim_from_grant` /
575    /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
576    /// are Stage 1c hot-path work. Stage 1d removes the embedded
577    /// client entirely.
578    ///
579    /// Today's constructor is therefore NOT yet a drop-in way to swap
580    /// in a non-Valkey backend — it requires a reachable Valkey node
581    /// for `suspend` plus the remaining hot-path ops. Tests that
582    /// exercise only the migrated per-task ops can run fully against
583    /// a mock backend.
584    ///
585    /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
586    /// [`CompletionBackend`]: ff_core::completion_backend::CompletionBackend
587    pub async fn connect_with(
588        config: WorkerConfig,
589        backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
590        completion: Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
591    ) -> Result<Self, SdkError> {
592        let mut worker = Self::connect(config).await?;
593        worker.backend = backend;
594        worker.completion_backend_handle = completion;
595        Ok(worker)
596    }
597
598    /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
599    /// ops through.
600    ///
601    /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
602    /// the `Option` wrapper is retained for API stability with the
603    /// Stage-1a shape. Stage 1c narrows the return type to
604    /// `&Arc<dyn EngineBackend>`.
605    pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
606        Some(&self.backend)
607    }
608
609    /// Crate-internal direct borrow of the backend. The public
610    /// [`Self::backend`] still returns `Option` for API stability
611    /// (Stage 1b holdover). Snapshot trait-forwarders in
612    /// [`crate::snapshot`] need an un-wrapped reference.
613    pub(crate) fn backend_ref(
614        &self,
615    ) -> &Arc<dyn ff_core::engine_backend::EngineBackend> {
616        &self.backend
617    }
618
619    /// Handle to the completion-event subscription backend, for
620    /// consumers that need to observe execution completions (DAG
621    /// reconcilers, tenant-isolated subscribers).
622    ///
623    /// Returns `Some` when the worker was built through
624    /// [`Self::connect`] on the default `valkey-default` feature
625    /// (the bundled `ValkeyBackend` implements
626    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)),
627    /// or via [`Self::connect_with`] with a `Some(..)` completion
628    /// handle. Returns `None` when the caller passed `None` to
629    /// [`Self::connect_with`] — i.e. the backend does not support
630    /// push-based completion streams (future Postgres without
631    /// LISTEN/NOTIFY, test mocks).
632    ///
633    /// The returned handle shares the same underlying allocation as
634    /// [`Self::backend`]; calls through it (e.g.
635    /// `subscribe_completions_filtered`) hit the same connection
636    /// the worker itself uses.
637    pub fn completion_backend(
638        &self,
639    ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
640        self.completion_backend_handle.clone()
641    }
642
643    /// Get the worker config.
644    pub fn config(&self) -> &WorkerConfig {
645        &self.config
646    }
647
648    /// Get the server-published partition config this worker bound to at
649    /// `connect()`. Exposed so consumers that mint custom
650    /// [`ExecutionId`]s (e.g. for `describe_execution` lookups on ids
651    /// produced outside this worker) stay aligned with the server's
652    /// `num_flow_partitions` — using `PartitionConfig::default()`
653    /// assumes 256 partitions and silently misses data on deployments
654    /// with any other value.
655    pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
656        &self.partition_config
657    }
658
659    /// Attempt to claim the next eligible execution.
660    ///
661    /// Phase 1 simplified claim flow:
662    /// 1. Pick a lane (round-robin across configured lanes)
663    /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
664    /// 3. Claim the execution via `ff_claim_execution`
665    /// 4. Read execution payload + tags
666    /// 5. Return a [`ClaimedTask`] with auto lease renewal
667    ///
668    /// Gated behind the `direct-valkey-claim` feature — bypasses the
669    /// scheduler's budget / quota / capability admission checks. Enable
670    /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
671    /// the scheduler hop would be measurement noise (benches) or when
672    /// the test harness needs a deterministic worker-local path. Prefer
673    /// the scheduler-routed HTTP claim path in production.
674    ///
675    /// # `None` semantics
676    ///
677    /// `Ok(None)` means **no work was found in the partition window this
678    /// poll covered**, not "the cluster is idle". Each call scans a chunk
679    /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
680    /// `scan_cursor`; the cursor advances by that chunk size on every
681    /// invocation, so a worker covers every partition exactly once every
682    /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
683    ///
684    /// Callers should treat `None` as "poll again soon" (typically after
685    /// `config.claim_poll_interval_ms`) rather than "sleep for a long
686    /// time". Backing off too aggressively on `None` can starve workers
687    /// when work lives on partitions outside the current window.
688    ///
689    /// Returns `Err` on Valkey errors or script failures.
690    #[cfg(feature = "direct-valkey-claim")]
691    pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
692        // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
693        // try_acquire returns immediately — if no permits available, the worker
694        // is at capacity and should not claim more work.
695        let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
696            Ok(p) => p,
697            Err(_) => return Ok(None), // At capacity — no claim attempted
698        };
699
700        let lane_id = self.next_lane();
701        let now = TimestampMs::now();
702
703        // Phase 1: We scan eligible executions directly by reading the eligible
704        // ZSET across execution partitions. In production the scheduler
705        // (ff-scheduler) would handle this. For Phase 1, the SDK does a
706        // simplified inline claim.
707        //
708        // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
709        // partitions starting at a rolling offset. This keeps idle Valkey
710        // load at O(chunk) per worker-second instead of O(num_partitions),
711        // and the worker-instance-seeded initial cursor spreads concurrent
712        // workers across different partition windows.
713        let num_partitions = self.partition_config.num_flow_partitions as usize;
714        if num_partitions == 0 {
715            return Ok(None);
716        }
717        let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
718        let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;
719
720        for step in 0..chunk {
721            let partition_idx = ((start + step) % num_partitions) as u16;
722            let partition = ff_core::partition::Partition {
723                family: ff_core::partition::PartitionFamily::Execution,
724                index: partition_idx,
725            };
726            let idx = IndexKeys::new(&partition);
727            let eligible_key = idx.lane_eligible(&lane_id);
728
729            // ZRANGEBYSCORE to get the highest-priority eligible execution.
730            // Score format: -(priority * 1_000_000_000_000) + created_at_ms
731            // ZRANGEBYSCORE with "-inf" "+inf" LIMIT 0 1 gives lowest score = highest priority.
732            let result: Value = self
733                .client
734                .cmd("ZRANGEBYSCORE")
735                .arg(&eligible_key)
736                .arg("-inf")
737                .arg("+inf")
738                .arg("LIMIT")
739                .arg("0")
740                .arg("1")
741                .execute()
742                .await
743                .map_err(|e| crate::backend_context(e, "ZRANGEBYSCORE failed"))?;
744
745            let execution_id_str = match extract_first_array_string(&result) {
746                Some(s) => s,
747                None => continue, // No eligible executions on this partition
748            };
749
750            let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
751                SdkError::from(ff_script::error::ScriptError::Parse {
752                    fcall: "claim_execution_from_eligible_set".into(),
753                    execution_id: None,
754                    message: format!("bad execution_id in eligible set: {e}"),
755                })
756            })?;
757
758            // Step 1: Issue claim grant
759            let grant_result = self
760                .issue_claim_grant(&execution_id, &lane_id, &partition, &idx)
761                .await;
762
763            match grant_result {
764                Ok(()) => {}
765                Err(SdkError::Engine(ref boxed))
766                    if matches!(
767                        **boxed,
768                        crate::EngineError::Validation {
769                            kind: crate::ValidationKind::CapabilityMismatch,
770                            ..
771                        }
772                    ) =>
773                {
774                    let missing = match &**boxed {
775                        crate::EngineError::Validation { detail, .. } => detail.clone(),
776                        _ => unreachable!(),
777                    };
778                    // Block-on-mismatch (RFC-009 §7.5) — parity with
779                    // ff-scheduler's Scheduler::claim_for_worker. Without
780                    // this, the inline-direct-claim path would hot-loop
781                    // on an unclaimable top-of-zset (every tick picks the
782                    // same execution, wastes an FCALL, logs, releases,
783                    // repeats). The scheduler-side unblock scanner
784                    // promotes blocked_route executions back to eligible
785                    // when a worker with matching caps registers.
786                    tracing::info!(
787                        execution_id = %execution_id,
788                        worker_id = %self.config.worker_id,
789                        worker_caps_hash = %self.worker_capabilities_hash,
790                        missing = %missing,
791                        "capability mismatch, blocking execution off eligible (SDK inline claim)"
792                    );
793                    self.block_route(&execution_id, &lane_id, &partition, &idx).await;
794                    continue;
795                }
796                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
797                    tracing::debug!(
798                        execution_id = %execution_id,
799                        error = %e,
800                        "claim grant failed (retryable), trying next partition"
801                    );
802                    continue;
803                }
804                Err(e) => return Err(e),
805            }
806
807            // Step 2: Claim the execution
808            match self
809                .claim_execution(&execution_id, &lane_id, &partition, now)
810                .await
811            {
812                Ok(mut task) => {
813                    // Transfer concurrency permit to the task. When the task is
814                    // completed/failed/cancelled/dropped the permit returns to
815                    // the semaphore, allowing another claim.
816                    task.set_concurrency_permit(permit);
817                    return Ok(Some(task));
818                }
819                Err(SdkError::Engine(ref boxed))
820                    if matches!(
821                        **boxed,
822                        crate::EngineError::Contention(
823                            crate::ContentionKind::UseClaimResumedExecution
824                        )
825                    ) =>
826                {
827                    // Execution was resumed from suspension — attempt_interrupted.
828                    // ff_claim_execution rejects this; use ff_claim_resumed_execution
829                    // which reuses the existing attempt instead of creating a new one.
830                    tracing::debug!(
831                        execution_id = %execution_id,
832                        "execution is resumed, using claim_resumed path"
833                    );
834                    match self
835                        .claim_resumed_execution(&execution_id, &lane_id, &partition)
836                        .await
837                    {
838                        Ok(mut task) => {
839                            task.set_concurrency_permit(permit);
840                            return Ok(Some(task));
841                        }
842                        Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
843                            tracing::debug!(
844                                execution_id = %execution_id,
845                                error = %e2,
846                                "claim_resumed failed (retryable), trying next partition"
847                            );
848                            continue;
849                        }
850                        Err(e2) => return Err(e2),
851                    }
852                }
853                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
854                    tracing::debug!(
855                        execution_id = %execution_id,
856                        error = %e,
857                        "claim execution failed (retryable), trying next partition"
858                    );
859                    continue;
860                }
861                Err(e) => return Err(e),
862            }
863        }
864
865        // No eligible work found on any partition
866        Ok(None)
867    }
868
869    #[cfg(feature = "direct-valkey-claim")]
870    async fn issue_claim_grant(
871        &self,
872        execution_id: &ExecutionId,
873        lane_id: &LaneId,
874        partition: &ff_core::partition::Partition,
875        idx: &IndexKeys,
876    ) -> Result<(), SdkError> {
877        let ctx = ExecKeyContext::new(partition, execution_id);
878
879        // KEYS (3): exec_core, claim_grant_key, eligible_zset
880        let keys: Vec<String> = vec![
881            ctx.core(),
882            ctx.claim_grant(),
883            idx.lane_eligible(lane_id),
884        ];
885
886        // ARGV (9): eid, worker_id, worker_instance_id, lane_id,
887        //           capability_hash, grant_ttl_ms, route_snapshot_json,
888        //           admission_summary, worker_capabilities_csv (sorted)
889        let args: Vec<String> = vec![
890            execution_id.to_string(),
891            self.config.worker_id.to_string(),
892            self.config.worker_instance_id.to_string(),
893            lane_id.to_string(),
894            String::new(), // capability_hash
895            "5000".to_owned(), // grant_ttl_ms (5 seconds)
896            String::new(), // route_snapshot_json
897            String::new(), // admission_summary
898            self.worker_capabilities_csv.clone(), // sorted CSV
899        ];
900
901        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
902        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
903
904        let raw: Value = self
905            .client
906            .fcall("ff_issue_claim_grant", &key_refs, &arg_refs)
907            .await
908            .map_err(SdkError::from)?;
909
910        crate::task::parse_success_result(&raw, "ff_issue_claim_grant")
911    }
912
913    /// Move an execution from the lane's eligible ZSET into its
914    /// blocked_route ZSET via `ff_block_execution_for_admission`. Called
915    /// after a `CapabilityMismatch` reject — without this, the inline
916    /// direct-claim path would re-pick the same top-of-zset every tick
917    /// (same pattern the scheduler's block_candidate handles). The
918    /// engine's unblock scanner periodically promotes blocked_route
919    /// back to eligible once a worker with matching caps registers.
920    ///
921    /// Best-effort: transport or logical rejects (e.g. the execution
922    /// already went terminal between pick and block) are logged and the
923    /// outer loop simply `continue`s to the next partition. Parity with
924    /// ff-scheduler::Scheduler::block_candidate.
925    #[cfg(feature = "direct-valkey-claim")]
926    async fn block_route(
927        &self,
928        execution_id: &ExecutionId,
929        lane_id: &LaneId,
930        partition: &ff_core::partition::Partition,
931        idx: &IndexKeys,
932    ) {
933        let ctx = ExecKeyContext::new(partition, execution_id);
934        let core_key = ctx.core();
935        let eligible_key = idx.lane_eligible(lane_id);
936        let blocked_key = idx.lane_blocked_route(lane_id);
937        let eid_s = execution_id.to_string();
938        let now_ms = TimestampMs::now().0.to_string();
939
940        let keys: [&str; 3] = [&core_key, &eligible_key, &blocked_key];
941        let argv: [&str; 4] = [
942            &eid_s,
943            "waiting_for_capable_worker",
944            "no connected worker satisfies required_capabilities",
945            &now_ms,
946        ];
947
948        match self
949            .client
950            .fcall::<Value>("ff_block_execution_for_admission", &keys, &argv)
951            .await
952        {
953            Ok(v) => {
954                // Parse Lua result so a logical reject (e.g. execution
955                // went terminal mid-flight) is visible — same fix we
956                // applied to ff-scheduler's block_candidate.
957                if let Err(e) = crate::task::parse_success_result(&v, "ff_block_execution_for_admission") {
958                    tracing::warn!(
959                        execution_id = %execution_id,
960                        error = %e,
961                        "SDK block_route: Lua rejected; eligible ZSET unchanged, next poll \
962                         will re-evaluate"
963                    );
964                }
965            }
966            Err(e) => {
967                tracing::warn!(
968                    execution_id = %execution_id,
969                    error = %e,
970                    "SDK block_route: transport failure; eligible ZSET unchanged"
971                );
972            }
973        }
974    }
975
976    /// Low-level claim of a granted execution. Invokes
977    /// `ff_claim_execution` and returns a `ClaimedTask` with auto
978    /// lease renewal.
979    ///
980    /// Previously gated behind `direct-valkey-claim`; ungated so
981    /// the public [`claim_from_grant`] entry point can reuse the
982    /// same FCALL plumbing. The method stays private — external
983    /// callers use `claim_from_grant`.
984    ///
985    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
986    async fn claim_execution(
987        &self,
988        execution_id: &ExecutionId,
989        lane_id: &LaneId,
990        partition: &ff_core::partition::Partition,
991        _now: TimestampMs,
992    ) -> Result<ClaimedTask, SdkError> {
993        let ctx = ExecKeyContext::new(partition, execution_id);
994        let idx = IndexKeys::new(partition);
995
996        // Pre-read total_attempt_count from exec_core to derive next attempt index.
997        // The Lua uses total_attempt_count as the new index and dynamically builds
998        // the attempt key from the hash tag, so KEYS[6-8] are placeholders, but
999        // we pass the correct index for documentation/debugging.
1000        let total_str: Option<String> = self.client
1001            .cmd("HGET")
1002            .arg(ctx.core())
1003            .arg("total_attempt_count")
1004            .execute()
1005            .await
1006            .unwrap_or(None);
1007        let next_idx = total_str
1008            .as_deref()
1009            .and_then(|s| s.parse::<u32>().ok())
1010            .unwrap_or(0);
1011        let att_idx = AttemptIndex::new(next_idx);
1012
1013        let lease_id = LeaseId::new().to_string();
1014        let attempt_id = AttemptId::new().to_string();
1015        let renew_before_ms = self.config.lease_ttl_ms * 2 / 3;
1016
1017        // KEYS (14): must match lua/execution.lua ff_claim_execution positional order
1018        let keys: Vec<String> = vec![
1019            ctx.core(),                                    // 1  exec_core
1020            ctx.claim_grant(),                             // 2  claim_grant
1021            idx.lane_eligible(lane_id),                    // 3  eligible_zset
1022            idx.lease_expiry(),                            // 4  lease_expiry_zset
1023            idx.worker_leases(&self.config.worker_instance_id), // 5  worker_leases
1024            ctx.attempt_hash(att_idx),                     // 6  attempt_hash (placeholder)
1025            ctx.attempt_usage(att_idx),                    // 7  attempt_usage (placeholder)
1026            ctx.attempt_policy(att_idx),                   // 8  attempt_policy (placeholder)
1027            ctx.attempts(),                                // 9  attempts_zset
1028            ctx.lease_current(),                           // 10 lease_current
1029            ctx.lease_history(),                           // 11 lease_history
1030            idx.lane_active(lane_id),                      // 12 active_index
1031            idx.attempt_timeout(),                         // 13 attempt_timeout_zset
1032            idx.execution_deadline(),                      // 14 execution_deadline_zset
1033        ];
1034
1035        // ARGV (12): must match lua/execution.lua ff_claim_execution positional order
1036        let args: Vec<String> = vec![
1037            execution_id.to_string(),                      // 1  execution_id
1038            self.config.worker_id.to_string(),             // 2  worker_id
1039            self.config.worker_instance_id.to_string(),    // 3  worker_instance_id
1040            lane_id.to_string(),                           // 4  lane
1041            String::new(),                                 // 5  capability_hash
1042            lease_id.clone(),                              // 6  lease_id
1043            self.config.lease_ttl_ms.to_string(),          // 7  lease_ttl_ms
1044            renew_before_ms.to_string(),                   // 8  renew_before_ms
1045            attempt_id.clone(),                            // 9  attempt_id
1046            "{}".to_owned(),                               // 10 attempt_policy_json
1047            String::new(),                                 // 11 attempt_timeout_ms
1048            String::new(),                                 // 12 execution_deadline_at
1049        ];
1050
1051        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1052        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1053
1054        let raw: Value = self
1055            .client
1056            .fcall("ff_claim_execution", &key_refs, &arg_refs)
1057            .await
1058            .map_err(SdkError::from)?;
1059
1060        // Parse claim result: {1, "OK", lease_id, lease_epoch, attempt_index,
1061        //                      attempt_id, attempt_type, lease_expires_at}
1062        let arr = match &raw {
1063            Value::Array(arr) => arr,
1064            _ => {
1065                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1066                    fcall: "ff_claim_execution".into(),
1067                    execution_id: Some(execution_id.to_string()),
1068                    message: "expected Array".into(),
1069                }));
1070            }
1071        };
1072
1073        let status_code = match arr.first() {
1074            Some(Ok(Value::Int(n))) => *n,
1075            _ => {
1076                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1077                    fcall: "ff_claim_execution".into(),
1078                    execution_id: Some(execution_id.to_string()),
1079                    message: "bad status code".into(),
1080                }));
1081            }
1082        };
1083
1084        if status_code != 1 {
1085            let err_field_str = |idx: usize| -> String {
1086                arr.get(idx)
1087                    .and_then(|v| match v {
1088                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1089                        Ok(Value::SimpleString(s)) => Some(s.clone()),
1090                        _ => None,
1091                    })
1092                    .unwrap_or_default()
1093            };
1094            let error_code = {
1095                let s = err_field_str(1);
1096                if s.is_empty() { "unknown".to_owned() } else { s }
1097            };
1098            let detail = err_field_str(2);
1099
1100            return Err(SdkError::from(
1101                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
1102                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
1103                        fcall: "ff_claim_execution".into(),
1104                        execution_id: Some(execution_id.to_string()),
1105                        message: format!("unknown error: {error_code}"),
1106                    }),
1107            ));
1108        }
1109
1110        // Extract fields from success response
1111        let field_str = |idx: usize| -> String {
1112            arr.get(idx + 2) // skip status_code and "OK"
1113                .and_then(|v| match v {
1114                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1115                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1116                    Ok(Value::Int(n)) => Some(n.to_string()),
1117                    _ => None,
1118                })
1119                .unwrap_or_default()
1120        };
1121
1122        // Lua returns: ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
1123        // Positions:       0         1       2           3            4              5
1124        let lease_id = LeaseId::parse(&field_str(0))
1125            .unwrap_or_else(|_| LeaseId::new());
1126        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
1127        // field_str(2) is expires_at — skip it (lease timing managed by renewal)
1128        let attempt_id = AttemptId::parse(&field_str(3))
1129            .unwrap_or_else(|_| AttemptId::new());
1130        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
1131
1132        // Read execution payload and metadata
1133        let (input_payload, execution_kind, tags) = self
1134            .read_execution_context(execution_id, partition)
1135            .await?;
1136
1137        Ok(ClaimedTask::new(
1138            self.client.clone(),
1139            self.backend.clone(),
1140            self.partition_config,
1141            execution_id.clone(),
1142            attempt_index,
1143            attempt_id,
1144            lease_id,
1145            lease_epoch,
1146            self.config.lease_ttl_ms,
1147            lane_id.clone(),
1148            self.config.worker_instance_id.clone(),
1149            input_payload,
1150            execution_kind,
1151            tags,
1152        ))
1153    }
1154
1155    /// Consume a [`ClaimGrant`] and claim the granted execution on
1156    /// this worker. The intended production entry point: pair with
1157    /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
1158    /// scheduler-issued grants into the SDK without enabling the
1159    /// `direct-valkey-claim` feature (which bypasses budget/quota
1160    /// admission control).
1161    ///
1162    /// The worker's concurrency semaphore is checked BEFORE the FCALL
1163    /// so a saturated worker does not consume the grant: the grant
1164    /// stays valid for its remaining TTL and the caller can either
1165    /// release it back to the scheduler or retry after some other
1166    /// in-flight task completes.
1167    ///
1168    /// On success the returned [`ClaimedTask`] holds a concurrency
1169    /// permit that releases automatically on
1170    /// `complete`/`fail`/`cancel`/drop — same contract as
1171    /// `claim_next`.
1172    ///
1173    /// # Arguments
1174    ///
1175    /// * `lane` — the lane the grant was issued for. Must match what
1176    ///   was passed to `Scheduler::claim_for_worker`; the Lua FCALL
1177    ///   uses it to look up `lane_eligible`, `lane_active`, and the
1178    ///   `worker_leases` index slot.
1179    /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
1180    ///
1181    /// # Errors
1182    ///
1183    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1184    ///   permits all held. Retryable; the grant is untouched.
1185    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1186    ///   or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
1187    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
1188    ///   (wrapped in [`SdkError::Engine`]).
1189    /// * `ScriptError::CapabilityMismatch` — execution's required
1190    ///   capabilities not a subset of this worker's caps (wrapped in
1191    ///   [`SdkError::Engine`]). Surfaced post-grant only when a
1192    ///   capabilities change races with grant issuance.
1193    /// * `ScriptError::Parse` — `ff_claim_execution` returned an
1194    ///   unexpected shape (wrapped in [`SdkError::Engine`]).
1195    /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
1196    ///   transport error during the FCALL or the
1197    ///   `read_execution_context` follow-up.
1198    ///
1199    /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
1200    /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
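    ///
    /// # Example
    ///
    /// A minimal sketch of the hand-off, assuming a grant has already been
    /// obtained from the scheduler; how it reaches this process (in-process
    /// call, channel, or HTTP via [`Self::claim_via_server`]) is up to the
    /// deployment and not prescribed here.
    ///
    /// ```rust,ignore
    /// let lane = LaneId::new("main");
    /// // `grant` is a ff_core::contracts::ClaimGrant issued for this worker.
    /// match worker.claim_from_grant(lane, grant).await {
    ///     Ok(task) => {
    ///         // Process, then complete; the concurrency permit releases here.
    ///         task.complete(Some(b"result".to_vec())).await?;
    ///     }
    ///     Err(SdkError::WorkerAtCapacity) => {
    ///         // No FCALL was issued, so the grant is untouched. Release it
    ///         // back to the scheduler or retry once an in-flight task finishes.
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```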
1201    pub async fn claim_from_grant(
1202        &self,
1203        lane: LaneId,
1204        grant: ff_core::contracts::ClaimGrant,
1205    ) -> Result<ClaimedTask, SdkError> {
1206        // Semaphore check FIRST. If the worker is saturated we must
1207        // surface the condition to the caller without touching the
1208        // grant — silently returning Ok(None) (as claim_next does)
1209        // would drop a grant the scheduler has already committed work
1210        // to issuing, wasting the slot until its TTL elapses.
1211        let permit = self
1212            .concurrency_semaphore
1213            .clone()
1214            .try_acquire_owned()
1215            .map_err(|_| SdkError::WorkerAtCapacity)?;
1216
1217        let now = TimestampMs::now();
1218        let partition = grant.partition().map_err(|e| SdkError::Config {
1219            context: "claim_from_grant".to_owned(),
1220            field: Some("partition_key".to_owned()),
1221            message: e.to_string(),
1222        })?;
1223        let mut task = match self
1224            .claim_execution(&grant.execution_id, &lane, &partition, now)
1225            .await
1226        {
1227            Ok(task) => task,
1228            Err(SdkError::Engine(ref boxed))
1229                if matches!(
1230                    **boxed,
1231                    crate::EngineError::Contention(
1232                        crate::ContentionKind::UseClaimResumedExecution
1233                    )
1234                ) =>
1235            {
1236                // Execution was resumed from suspension — attempt_interrupted.
1237                // ff_claim_execution rejects this; use ff_claim_resumed_execution
1238                // which reuses the existing attempt instead of creating a new one.
1239                // Mirrors the fallback inside `claim_next` so HTTP-routed callers
1240                // (`claim_via_server` → `claim_from_grant`) get the same transparent
1241                // re-claim behavior.
1242                tracing::debug!(
1243                    execution_id = %grant.execution_id,
1244                    "execution is resumed, using claim_resumed path"
1245                );
1246                self.claim_resumed_execution(&grant.execution_id, &lane, &partition)
1247                    .await?
1248            }
1249            Err(e) => return Err(e),
1250        };
1251        task.set_concurrency_permit(permit);
1252        Ok(task)
1253    }
1254
1255    /// Scheduler-routed claim: POST the server's
1256    /// `/v1/workers/{id}/claim`, then chain to
1257    /// [`Self::claim_from_grant`].
1258    ///
1259    /// Batch C item 2 PR-B. This is the production entry point —
1260    /// budget + quota + capability admission run server-side inside
1261    /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
1262    /// enable the `direct-valkey-claim` feature.
1263    ///
1264    /// Returns `Ok(None)` when the server says no eligible execution
1265    /// (HTTP 204). Callers typically back off by
1266    /// `config.claim_poll_interval_ms` and try again, same cadence
1267    /// as the direct-claim path's `Ok(None)`.
1268    ///
1269    /// The `admin` client is the established HTTP surface
1270    /// (`FlowFabricAdminClient`) reused here so workers don't keep a
1271    /// second reqwest client around. Build once at worker boot and
1272    /// hand in by reference on every claim.
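    ///
    /// # Example
    ///
    /// A minimal sketch of the polling loop; `worker` and `admin` are assumed
    /// to be already constructed, and the 10_000 ms grant TTL and 1 s backoff
    /// are illustrative values, not recommendations.
    ///
    /// ```rust,ignore
    /// let lane = LaneId::new("main");
    /// loop {
    ///     match worker.claim_via_server(&admin, &lane, 10_000).await? {
    ///         Some(task) => {
    ///             // Process task...
    ///             task.complete(Some(b"result".to_vec())).await?;
    ///         }
    ///         // HTTP 204: nothing eligible right now. Back off (roughly
    ///         // config.claim_poll_interval_ms) and poll again.
    ///         None => tokio::time::sleep(Duration::from_secs(1)).await,
    ///     }
    /// }
    /// ```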
1273    pub async fn claim_via_server(
1274        &self,
1275        admin: &crate::FlowFabricAdminClient,
1276        lane: &LaneId,
1277        grant_ttl_ms: u64,
1278    ) -> Result<Option<ClaimedTask>, SdkError> {
1279        let req = crate::admin::ClaimForWorkerRequest {
1280            worker_id: self.config.worker_id.to_string(),
1281            lane_id: lane.to_string(),
1282            worker_instance_id: self.config.worker_instance_id.to_string(),
1283            capabilities: self.config.capabilities.clone(),
1284            grant_ttl_ms,
1285        };
1286        let Some(resp) = admin.claim_for_worker(req).await? else {
1287            return Ok(None);
1288        };
1289        let grant = resp.into_grant()?;
1290        self.claim_from_grant(lane.clone(), grant).await.map(Some)
1291    }
1292
1293    /// Consume a [`ReclaimGrant`] and transition the granted
1294    /// `attempt_interrupted` execution into a `started` state on this
1295    /// worker. Symmetric partner to [`claim_from_grant`] for the
1296    /// resume path.
1297    ///
1298    /// The grant must have been issued to THIS worker (matching
1299    /// `worker_id` at grant time). A mismatch returns
1300    /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1301    /// atomically by `ff_claim_resumed_execution`; a second call with
1302    /// the same grant also returns `InvalidClaimGrant`.
1303    ///
1304    /// # Concurrency
1305    ///
1306    /// The worker's concurrency semaphore is checked BEFORE the FCALL
1307    /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1308    /// assume pre-existing capacity on this worker — a reclaim can
1309    /// land on a fresh worker instance that just came up after a
1310    /// crash/restart and is picking up a previously-interrupted
1311    /// execution. If the worker is saturated, the grant stays valid
1312    /// for its remaining TTL and the caller can release it or retry.
1313    ///
1314    /// On success the returned [`ClaimedTask`] holds a concurrency
1315    /// permit that releases automatically on
1316    /// `complete`/`fail`/`cancel`/drop.
1317    ///
1318    /// # Errors
1319    ///
1320    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1321    ///   permits all held. Retryable; the grant is untouched (no
1322    ///   FCALL was issued, so `ff_claim_resumed_execution` did not
1323    ///   atomically consume the grant key).
1324    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1325    ///   or `worker_id` mismatch.
1326    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1327    /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1328    ///   `attempt_interrupted`.
1329    /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1330    ///   not `runnable`.
1331    /// * `ScriptError::ExecutionNotFound` — core key missing.
1332    /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
1333    ///   transport.
1334    ///
1335    /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1336    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
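    ///
    /// # Example
    ///
    /// A minimal sketch of the resume path; the `reclaim_grants` receiver is a
    /// hypothetical channel standing in for however reclaim grants are routed
    /// to this worker in your deployment.
    ///
    /// ```rust,ignore
    /// while let Some(grant) = reclaim_grants.recv().await {
    ///     match worker.claim_from_reclaim_grant(grant).await {
    ///         Ok(task) => {
    ///             // The interrupted attempt is reused; finish its work.
    ///             task.complete(Some(b"resumed".to_vec())).await?;
    ///         }
    ///         Err(SdkError::WorkerAtCapacity) => {
    ///             // Grant not consumed; it stays valid for its remaining TTL.
    ///         }
    ///         Err(e) => return Err(e),
    ///     }
    /// }
    /// ```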
1337    pub async fn claim_from_reclaim_grant(
1338        &self,
1339        grant: ff_core::contracts::ReclaimGrant,
1340    ) -> Result<ClaimedTask, SdkError> {
1341        // Semaphore check FIRST — same load-bearing ordering as
1342        // `claim_from_grant`. If the worker is saturated, surface
1343        // WorkerAtCapacity without firing the FCALL; the FCALL is an
1344        // atomic consume on the grant key, so firing it while
1345        // saturated would destroy the grant yet leave no
1346        // permit to attach to the returned `ClaimedTask`.
1347        let permit = self
1348            .concurrency_semaphore
1349            .clone()
1350            .try_acquire_owned()
1351            .map_err(|_| SdkError::WorkerAtCapacity)?;
1352
1353        // Grant carries partition + lane_id so no round-trip is needed
1354        // to resolve them before the FCALL.
1355        let partition = grant.partition().map_err(|e| SdkError::Config {
1356            context: "claim_from_reclaim_grant".to_owned(),
1357            field: Some("partition_key".to_owned()),
1358            message: e.to_string(),
1359        })?;
1360        let mut task = self
1361            .claim_resumed_execution(
1362                &grant.execution_id,
1363                &grant.lane_id,
1364                &partition,
1365            )
1366            .await?;
1367        task.set_concurrency_permit(permit);
1368        Ok(task)
1369    }
1370
1371    /// Low-level resume claim. Forwards through
1372    /// [`EngineBackend::claim_resumed_execution`](ff_core::engine_backend::EngineBackend::claim_resumed_execution)
1373    /// — the trait-level trigger surface landed in issue #150 — and
1374    /// returns a [`ClaimedTask`] bound to the resumed attempt.
1375    ///
1376    /// The method stays private; external callers use
1377    /// [`claim_from_reclaim_grant`].
1378    ///
1379    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
1380    async fn claim_resumed_execution(
1381        &self,
1382        execution_id: &ExecutionId,
1383        lane_id: &LaneId,
1384        partition: &ff_core::partition::Partition,
1385    ) -> Result<ClaimedTask, SdkError> {
1386        let ctx = ExecKeyContext::new(partition, execution_id);
1387
1388        // Pre-read current_attempt_index for the existing attempt hash key.
1389        // This is load-bearing: the backend's KEYS[6] must point to the
1390        // real attempt hash, and the backend takes the index verbatim
1391        // from `ClaimResumedExecutionArgs::current_attempt_index`.
1392        let att_idx_str: Option<String> = self.client
1393            .cmd("HGET")
1394            .arg(ctx.core())
1395            .arg("current_attempt_index")
1396            .execute()
1397            .await
1398            .map_err(|e| crate::backend_context(e, "read attempt_index"))?;
1399        let att_idx = AttemptIndex::new(
1400            att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
1401        );
1402
1403        let args = ff_core::contracts::ClaimResumedExecutionArgs {
1404            execution_id: execution_id.clone(),
1405            worker_id: self.config.worker_id.clone(),
1406            worker_instance_id: self.config.worker_instance_id.clone(),
1407            lane_id: lane_id.clone(),
1408            lease_id: LeaseId::new(),
1409            lease_ttl_ms: self.config.lease_ttl_ms,
1410            current_attempt_index: att_idx,
1411            remaining_attempt_timeout_ms: None,
1412            now: TimestampMs::now(),
1413        };
1414
1415        let ff_core::contracts::ClaimResumedExecutionResult::Claimed(claimed) =
1416            self.backend.claim_resumed_execution(args).await?;
1417
1418        let (input_payload, execution_kind, tags) = self
1419            .read_execution_context(execution_id, partition)
1420            .await?;
1421
1422        Ok(ClaimedTask::new(
1423            self.client.clone(),
1424            self.backend.clone(),
1425            self.partition_config,
1426            execution_id.clone(),
1427            claimed.attempt_index,
1428            claimed.attempt_id,
1429            claimed.lease_id,
1430            claimed.lease_epoch,
1431            self.config.lease_ttl_ms,
1432            lane_id.clone(),
1433            self.config.worker_instance_id.clone(),
1434            input_payload,
1435            execution_kind,
1436            tags,
1437        ))
1438    }
1439
1440    /// Read payload + execution_kind + tags for an execution (its payload,
1441    /// core, and tags keys). Previously gated behind `direct-valkey-claim`;
1442    /// now shared by the feature-gated inline claim, the grant-based claims,
1443    /// and the resume path.
1444    async fn read_execution_context(
1445        &self,
1446        execution_id: &ExecutionId,
1447        partition: &ff_core::partition::Partition,
1448    ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
1449        let ctx = ExecKeyContext::new(partition, execution_id);
1450
1451        // Read payload
1452        let payload: Option<String> = self
1453            .client
1454            .get(&ctx.payload())
1455            .await
1456            .map_err(|e| crate::backend_context(e, "GET payload failed"))?;
1457        let input_payload = payload.unwrap_or_default().into_bytes();
1458
1459        // Read execution_kind from core
1460        let kind: Option<String> = self
1461            .client
1462            .hget(&ctx.core(), "execution_kind")
1463            .await
1464            .map_err(|e| crate::backend_context(e, "HGET execution_kind failed"))?;
1465        let execution_kind = kind.unwrap_or_default();
1466
1467        // Read tags
1468        let tags: HashMap<String, String> = self
1469            .client
1470            .hgetall(&ctx.tags())
1471            .await
1472            .map_err(|e| crate::backend_context(e, "HGETALL tags"))?;
1473
1474        Ok((input_payload, execution_kind, tags))
1475    }
1476
1477    // ── Phase 3: Signal delivery ──
1478
1479    /// Deliver a signal to a suspended execution's waitpoint.
1480    ///
1481    /// The engine atomically records the signal, evaluates the resume condition,
1482    /// and optionally transitions the execution from `suspended` to `runnable`.
1483    ///
1484    /// Forwards through
1485    /// [`EngineBackend::deliver_signal`](ff_core::engine_backend::EngineBackend::deliver_signal)
1486    /// — the trait-level trigger surface landed in issue #150.
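    ///
    /// # Example
    ///
    /// A minimal sketch; the `Signal` construction is elided because its field
    /// values depend on the waitpoint contract, `execution_id` / `waitpoint_id`
    /// are assumed to be known to the caller, and the `ff_sdk::task` import
    /// path is an assumption about how the types are re-exported.
    ///
    /// ```rust,ignore
    /// use ff_sdk::task::{Signal, SignalOutcome};
    ///
    /// let signal: Signal = todo!(); // fill the Signal fields per your waitpoint contract
    /// match worker.deliver_signal(&execution_id, &waitpoint_id, signal).await? {
    ///     SignalOutcome::TriggeredResume { signal_id } => {
    ///         // Resume condition satisfied: the execution moves back to runnable.
    ///     }
    ///     SignalOutcome::Accepted { signal_id, effect } => {
    ///         // Recorded, but the waitpoint's resume condition is not yet met.
    ///     }
    ///     SignalOutcome::Duplicate { existing_signal_id } => {
    ///         // Idempotency-key hit: the engine kept the original signal.
    ///     }
    /// }
    /// ```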
1487    pub async fn deliver_signal(
1488        &self,
1489        execution_id: &ExecutionId,
1490        waitpoint_id: &WaitpointId,
1491        signal: crate::task::Signal,
1492    ) -> Result<crate::task::SignalOutcome, SdkError> {
1493        let args = ff_core::contracts::DeliverSignalArgs {
1494            execution_id: execution_id.clone(),
1495            waitpoint_id: waitpoint_id.clone(),
1496            signal_id: ff_core::types::SignalId::new(),
1497            signal_name: signal.signal_name,
1498            signal_category: signal.signal_category,
1499            source_type: signal.source_type,
1500            source_identity: signal.source_identity,
1501            payload: signal.payload,
1502            payload_encoding: Some("json".to_owned()),
1503            correlation_id: None,
1504            idempotency_key: signal.idempotency_key,
1505            target_scope: "waitpoint".to_owned(),
1506            created_at: Some(TimestampMs::now()),
1507            dedup_ttl_ms: None,
1508            resume_delay_ms: None,
1509            max_signals_per_execution: None,
1510            signal_maxlen: None,
1511            waitpoint_token: signal.waitpoint_token,
1512            now: TimestampMs::now(),
1513        };
1514
1515        let result = self.backend.deliver_signal(args).await?;
1516        Ok(match result {
1517            ff_core::contracts::DeliverSignalResult::Accepted { signal_id, effect } => {
1518                if effect == "resume_condition_satisfied" {
1519                    crate::task::SignalOutcome::TriggeredResume { signal_id }
1520                } else {
1521                    crate::task::SignalOutcome::Accepted { signal_id, effect }
1522                }
1523            }
1524            ff_core::contracts::DeliverSignalResult::Duplicate { existing_signal_id } => {
1525                crate::task::SignalOutcome::Duplicate {
1526                    existing_signal_id: existing_signal_id.to_string(),
1527                }
1528            }
1529        })
1530    }
1531
1532    #[cfg(feature = "direct-valkey-claim")]
1533    fn next_lane(&self) -> LaneId {
1534        let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
1535        self.config.lanes[idx].clone()
1536    }
1537}
1538
1539#[cfg(feature = "direct-valkey-claim")]
1540fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
1541    use ff_core::error::ErrorClass;
1542    matches!(
1543        ff_script::engine_error_ext::class(err),
1544        ErrorClass::Retryable | ErrorClass::Informational
1545    )
1546}
1547
1548/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
1549/// instance id with FNV-1a to place distinct worker processes on different
1550/// partition windows from their first poll. A seed of zero would be fine for
1551/// single-worker clusters; hashing the id spreads work in multi-worker deployments.
1552#[cfg(feature = "direct-valkey-claim")]
1553fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
1554    if num_partitions == 0 {
1555        return 0;
1556    }
1557    (ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize) % num_partitions
1558}
1559
1560#[cfg(feature = "direct-valkey-claim")]
1561fn extract_first_array_string(value: &Value) -> Option<String> {
1562    match value {
1563        Value::Array(arr) if !arr.is_empty() => match &arr[0] {
1564            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1565            Ok(Value::SimpleString(s)) => Some(s.clone()),
1566            _ => None,
1567        },
1568        _ => None,
1569    }
1570}
1571
1572/// Read partition config from Valkey's `ff:config:partitions` hash.
1573/// Returns Err if the key doesn't exist or can't be read.
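///
/// Illustrative hash layout (field names match what this function reads; the
/// values shown are the defaults it falls back to when a field is missing or
/// non-positive):
///
/// ```text
/// HGETALL ff:config:partitions
/// 1) "num_flow_partitions"     2) "256"
/// 3) "num_budget_partitions"   4) "32"
/// 5) "num_quota_partitions"    6) "32"
/// ```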
1574async fn read_partition_config(client: &Client) -> Result<PartitionConfig, SdkError> {
1575    let key = ff_core::keys::global_config_partitions();
1576    let fields: HashMap<String, String> = client
1577        .hgetall(&key)
1578        .await
1579        .map_err(|e| crate::backend_context(e, format!("HGETALL {key}")))?;
1580
1581    if fields.is_empty() {
1582        return Err(SdkError::Config {
1583            context: "read_partition_config".into(),
1584            field: None,
1585            message: "ff:config:partitions not found in Valkey".into(),
1586        });
1587    }
1588
1589    let parse = |field: &str, default: u16| -> u16 {
1590        fields
1591            .get(field)
1592            .and_then(|v| v.parse().ok())
1593            .filter(|&n: &u16| n > 0)
1594            .unwrap_or(default)
1595    };
1596
1597    Ok(PartitionConfig {
1598        num_flow_partitions: parse("num_flow_partitions", 256),
1599        num_budget_partitions: parse("num_budget_partitions", 32),
1600        num_quota_partitions: parse("num_quota_partitions", 32),
1601    })
1602}
1603
1604#[cfg(test)]
1605mod completion_accessor_type_tests {
1606    //! Type-level compile check that
1607    //! [`FlowFabricWorker::completion_backend`] returns an
1608    //! `Option<Arc<dyn CompletionBackend>>`. No Valkey required —
1609    //! the assertion is at the function-pointer type level and the
1610    //! #[test] body exists solely so the compiler elaborates it.
1611    use super::FlowFabricWorker;
1612    use ff_core::completion_backend::CompletionBackend;
1613    use std::sync::Arc;
1614
1615    #[test]
1616    fn completion_backend_accessor_signature() {
1617        // If this line compiles, the public accessor returns the
1618        // advertised type. The function is never called (no live
1619        // worker), so no I/O happens.
1620        let _f: fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>> =
1621            FlowFabricWorker::completion_backend;
1622    }
1623}
1624
1625#[cfg(all(test, feature = "direct-valkey-claim"))]
1626mod scan_cursor_tests {
1627    use super::scan_cursor_seed;
1628
1629    #[test]
1630    fn stable_for_same_input() {
1631        assert_eq!(scan_cursor_seed("w1", 256), scan_cursor_seed("w1", 256));
1632    }
1633
1634    #[test]
1635    fn distinct_for_different_ids() {
1636        assert_ne!(scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
1637    }
1638
1639    #[test]
1640    fn bounded_by_partition_count() {
1641        for i in 0..100 {
1642            assert!(scan_cursor_seed(&format!("w{i}"), 256) < 256);
1643        }
1644    }
1645
1646    #[test]
1647    fn zero_partitions_returns_zero() {
1648        assert_eq!(scan_cursor_seed("w1", 0), 0);
1649    }
1650}