// ff_sdk/worker.rs

use std::collections::HashMap;
#[cfg(feature = "direct-valkey-claim")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use ferriskey::{Client, ClientBuilder, Value};
use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::PartitionConfig;
use ff_core::types::*;
use tokio::sync::Semaphore;

use crate::config::WorkerConfig;
use crate::task::ClaimedTask;
use crate::SdkError;

/// FlowFabric worker — connects to Valkey, claims executions, and provides
/// the worker-facing API.
///
/// # Admission control
///
/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
/// **bypasses the scheduler's admission controls**: it reads the eligible
/// ZSET directly and mints its own claim grant without consulting budget
/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
/// benchmarks, tests, and single-tenant development where the scheduler
/// hop is measurement noise, not for production.
///
/// For production deployments, consume scheduler-issued grants via
/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
/// budget breach, quota sliding-window, concurrency cap, and
/// capability-match checks before issuing grants.
///
/// # Usage
///
/// ```rust,ignore
/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
///
/// let config = WorkerConfig::new("localhost", 6379, "w1", "w1-i1", "default", "main");
/// let worker = FlowFabricWorker::connect(config).await?;
///
/// loop {
///     if let Some(task) = worker.claim_next().await? {
///         // Process task...
///         task.complete(Some(b"result".to_vec())).await?;
///     } else {
///         tokio::time::sleep(Duration::from_secs(1)).await;
///     }
/// }
/// ```
pub struct FlowFabricWorker {
    client: Client,
    config: WorkerConfig,
    partition_config: PartitionConfig,
    /// Sorted, deduplicated, comma-separated capabilities — computed once
    /// from `config.capabilities` at connect time. Passed as ARGV[9] to
    /// `ff_issue_claim_grant` on every claim. BTreeSet sorting is critical:
    /// Lua's `ff_issue_claim_grant` relies on a stable CSV form for
    /// reproducible logs and tests.
    #[cfg(feature = "direct-valkey-claim")]
    worker_capabilities_csv: String,
    /// 8-hex FNV-1a digest of `worker_capabilities_csv`. Used in
    /// per-mismatch logs so the 4KB CSV never echoes on every reject
    /// during an incident. Full CSV logged once at connect-time INFO for
    /// cross-reference. Mirrors `ff-scheduler::claim::worker_caps_digest`.
    #[cfg(feature = "direct-valkey-claim")]
    worker_capabilities_hash: String,
    #[cfg(feature = "direct-valkey-claim")]
    lane_index: AtomicUsize,
    /// Concurrency cap for in-flight tasks. Permits are acquired or
    /// transferred by [`claim_next`] (feature-gated),
    /// [`claim_from_grant`] (always available), and
    /// [`claim_from_reclaim_grant`], transferred to the returned
    /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
    /// Holds `max_concurrent_tasks` permits total.
    ///
    /// [`claim_next`]: FlowFabricWorker::claim_next
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
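    ///
    /// A minimal sketch of the capacity contract (the two-permit setting and
    /// the `drop` timing are illustrative, not prescriptive):
    ///
    /// ```rust,ignore
    /// // max_concurrent_tasks = 2: a third claim is refused until a task ends.
    /// let t1 = worker.claim_next().await?;            // takes permit 1
    /// let t2 = worker.claim_next().await?;            // takes permit 2
    /// assert!(worker.claim_next().await?.is_none());  // at capacity
    /// drop(t2);                                       // permit returns
    /// let t3 = worker.claim_next().await?;            // can claim again
    /// ```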
    concurrency_semaphore: Arc<Semaphore>,
    /// Rolling offset for chunked partition scans. Each poll advances the
    /// cursor by `PARTITION_SCAN_CHUNK`, so every partition is covered once
    /// per `ceil(num_partitions / chunk)` polls — with 256 partitions and
    /// the default chunk of 32, that is eight polls per full sweep. The
    /// initial value is derived from `worker_instance_id` so idle workers
    /// spread their scans across different partitions from the first poll
    /// onward.
    ///
    /// Overflow: on 64-bit targets `usize` is 64 bits wide — the cursor
    /// wraps only after ~2^59 polls at the default chunk of 32 (billions of
    /// years at any realistic rate). On 32-bit targets (wasm32, i686) it is
    /// 32 bits and wraps after ~4 years at 1 poll/sec — acceptable; on wrap,
    /// the modulo preserves correctness because the sequence simply restarts
    /// a new cycle.
    #[cfg(feature = "direct-valkey-claim")]
    scan_cursor: AtomicUsize,
    /// The [`EngineBackend`] the Stage-1b trait forwarders route
    /// through.
    ///
    /// **RFC-012 Stage 1b.** Always populated:
    /// [`FlowFabricWorker::connect`] now wraps the worker's own
    /// `ferriskey::Client` in a `ValkeyBackend` via
    /// `ValkeyBackend::from_client_and_partitions`, and
    /// [`FlowFabricWorker::connect_with`] replaces that default with
    /// the caller-supplied `Arc<dyn EngineBackend>`. The
    /// [`FlowFabricWorker::backend`] accessor still returns
    /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
    /// narrows the return type once consumers have migrated.
    ///
    /// Hot paths (claim, deliver_signal, admin queries) still use the
    /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
    /// migrates them through this field, and Stage 1d removes the
    /// embedded client.
    backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
    /// Optional handle to the same underlying backend viewed as a
    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
    /// Populated by [`Self::connect`] from the bundled
    /// `ValkeyBackend` (which implements the trait); supplied by the
    /// caller on [`Self::connect_with`] as an explicit
    /// `Option<Arc<dyn CompletionBackend>>` — `None` means "this
    /// backend does not support push-based completion" (e.g. a future
    /// Postgres backend without LISTEN/NOTIFY, or a test mock). Cairn
    /// and other completion-subscription consumers reach this through
    /// [`Self::completion_backend`].
    completion_backend_handle:
        Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
}

/// Number of partitions scanned per `claim_next()` poll. Keeps idle Valkey
/// load at O(PARTITION_SCAN_CHUNK) per worker-second instead of
/// O(num_flow_partitions).
#[cfg(feature = "direct-valkey-claim")]
const PARTITION_SCAN_CHUNK: usize = 32;

impl FlowFabricWorker {
    /// Connect to Valkey and prepare the worker.
    ///
    /// Establishes the ferriskey connection. Does NOT load the FlowFabric
    /// library — that is the server's responsibility (ff-server calls
    /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
    /// the library is already loaded.
    pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
        if config.lanes.is_empty() {
            return Err(SdkError::Config {
                context: "worker_config".into(),
                field: None,
                message: "at least one lane is required".into(),
            });
        }

        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder.build()
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "failed to connect".into() })?;

        // Verify connectivity
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "PING failed".into() })?;
        if pong != "PONG" {
            return Err(SdkError::Config {
                context: "worker_connect".into(),
                field: None,
                message: format!("unexpected PING response: {pong}"),
            });
        }

        // Guard against two worker processes sharing the same
        // `worker_instance_id`. A duplicate instance would clobber each
        // other's lease_current/active_index entries and double-claim work.
        // SET NX on a liveness key with 2× lease TTL; if the key already
        // exists another process is live. The key auto-expires if this
        // process crashes without renewal, so a restart after a hard crash
        // just waits at most 2× lease_ttl_ms for the ghost entry to clear.
        //
        // Known limitations of this minimal scheme (documented for operators):
        //   1. **Startup-only, not runtime.** There is no heartbeat renewal
        //      path. After `2 × lease_ttl_ms` elapses the alive key expires
        //      naturally even while this worker is still running, and a
        //      second process with the same `worker_instance_id` launched
        //      later will successfully SET NX alongside the first. The check
        //      catches misconfiguration at boot; it does not fence duplicates
        //      that appear mid-lifetime. Production deployments should rely
        //      on the orchestrator (Kubernetes, systemd unit with
        //      `Restart=on-failure`, etc.) as the authoritative single-
        //      instance enforcer; this SET NX is belt-and-suspenders.
        //
        //   2. **Restart delay after a crash.** If a worker crashes
        //      ungracefully (SIGKILL, container OOM) and is restarted within
        //      `2 × lease_ttl_ms`, the alive key is still present and the
        //      new process exits with `SdkError::Config("duplicate
        //      worker_instance_id ...")`. Options for operators:
        //        - Wait `2 × lease_ttl_ms` (default 60s with the 30s TTL)
        //          before restarting.
        //        - Manually `DEL ff:worker:<instance_id>:alive` in Valkey to
        //          unblock the restart.
        //        - Use a fresh `worker_instance_id` for the restart (the
        //          orchestrator should already do this per-Pod).
        //
        //   3. **No graceful cleanup on shutdown.** There is no explicit
        //      `disconnect()` call that DELs the alive key. On clean
        //      `SIGTERM` the key lingers until its TTL expires. A follow-up
        //      can add `FlowFabricWorker::disconnect(self)` for callers that
        //      want to skip the restart-delay window.
        let alive_key = format!("ff:worker:{}:alive", config.worker_instance_id);
        let alive_ttl_ms = (config.lease_ttl_ms.saturating_mul(2)).max(1_000);
        let set_result: Option<String> = client
            .cmd("SET")
            .arg(&alive_key)
            .arg("1")
            .arg("NX")
            .arg("PX")
            .arg(alive_ttl_ms.to_string().as_str())
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext {
                source: e,
                context: "SET NX worker alive key".into(),
            })?;
        if set_result.is_none() {
            return Err(SdkError::Config {
                context: "worker_connect".into(),
                field: Some("worker_instance_id".into()),
                message: format!(
                    "duplicate worker_instance_id '{}': another process already holds {alive_key}",
                    config.worker_instance_id
                ),
            });
        }

        // Read partition config from Valkey (set by ff-server on startup).
        // Falls back to defaults if key doesn't exist (e.g. SDK-only testing).
        let partition_config = read_partition_config(&client).await
            .unwrap_or_else(|e| {
                tracing::warn!(
                    error = %e,
                    "ff:config:partitions not found, using defaults"
                );
                PartitionConfig::default()
            });

        let max_tasks = config.max_concurrent_tasks.max(1);
        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));

        tracing::info!(
            worker_id = %config.worker_id,
            instance_id = %config.worker_instance_id,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            "FlowFabricWorker connected"
        );

        #[cfg(feature = "direct-valkey-claim")]
        let scan_cursor_init = scan_cursor_seed(
            config.worker_instance_id.as_str(),
            partition_config.num_flow_partitions.max(1) as usize,
        );

        // Sort + dedupe capabilities into a stable CSV. BTreeSet both sorts
        // and deduplicates in one pass; string joining happens once here.
        //
        // Ingress validation mirrors Scheduler::claim_for_worker (ff-scheduler):
        //   - `,` is the CSV delimiter; a token containing one would split
        //     mid-parse and could let a {"gpu"} worker appear to satisfy
        //     {"gpu,cuda"} (silent auth bypass).
        //   - Empty strings would produce leading/adjacent commas on the
        //     wire and inflate token count for no semantic reason.
        //   - Non-printable / whitespace chars: `"gpu "` vs `"gpu"` or
        //     `"gpu\n"` vs `"gpu"` produce silent mismatches that are
        //     miserable to debug. Reject whitespace and control characters
        //     at ingress so a typo fails loudly at connect instead of
        //     silently mis-routing forever (printable non-ASCII is allowed —
        //     see the check below).
        // Reject at boot so operator misconfig is loud, symmetric with the
        // scheduler path.
        #[cfg(feature = "direct-valkey-claim")]
        for cap in &config.capabilities {
            if cap.is_empty() {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: "capability token must not be empty".into(),
                });
            }
            if cap.contains(',') {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability token may not contain ',' (CSV delimiter): {cap:?}"
                    ),
                });
            }
            // Reject ASCII control bytes (0x00-0x1F, 0x7F) and any ASCII
            // whitespace (space, tab, LF, CR, FF, VT). UTF-8 printable
            // characters above 0x7F are ALLOWED so i18n caps like
            // "东京-gpu" can be used. The CSV wire form is byte-safe for
            // multibyte UTF-8 because `,` is always a single byte and
            // never part of a multibyte continuation (only 0x80-0xBF are
            // continuations, ',' is 0x2C).
            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability token must not contain whitespace or control \
                         characters: {cap:?}"
                    ),
                });
            }
        }
        #[cfg(feature = "direct-valkey-claim")]
        let worker_capabilities_csv: String = {
            let set: std::collections::BTreeSet<&str> = config
                .capabilities
                .iter()
                .map(|s| s.as_str())
                .filter(|s| !s.is_empty())
                .collect();
            if set.len() > ff_core::policy::CAPS_MAX_TOKENS {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
                        ff_core::policy::CAPS_MAX_TOKENS,
                        set.len()
                    ),
                });
            }
            let csv = set.into_iter().collect::<Vec<_>>().join(",");
            if csv.len() > ff_core::policy::CAPS_MAX_BYTES {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
                        ff_core::policy::CAPS_MAX_BYTES,
                        csv.len()
                    ),
                });
            }
            csv
        };

        // Short stable digest of the sorted caps CSV, computed once so
        // per-mismatch logs carry a stable identifier instead of the 4KB
        // CSV. Shared helper — ff-scheduler uses the same one for its
        // own per-mismatch logs, so cross-component log lines are
        // diffable against each other.
        #[cfg(feature = "direct-valkey-claim")]
        let worker_capabilities_hash = ff_core::hash::fnv1a_xor8hex(&worker_capabilities_csv);

        // Full CSV logged once at connect so per-mismatch logs (which
        // carry only the 8-hex hash) can be cross-referenced by ops.
        #[cfg(feature = "direct-valkey-claim")]
        if !worker_capabilities_csv.is_empty() {
            tracing::info!(
                worker_instance_id = %config.worker_instance_id,
                worker_caps_hash = %worker_capabilities_hash,
                worker_caps = %worker_capabilities_csv,
                "worker connected with capabilities (full CSV — mismatch logs use hash only)"
            );
        }

        // Non-authoritative advertisement of caps for operator visibility
        // (CLI introspection, dashboards). The AUTHORITATIVE source for
        // scheduling decisions is ARGV[9] on each claim — Lua reads ONLY
        // that, never this string. Lossy here is correctness-safe.
        //
        // Storage: a single STRING key holding the sorted CSV. Rationale:
        //   * **Atomic overwrite.** `SET` is a single command — a concurrent
        //     reader can never observe a transient empty value (the prior
        //     DEL+SADD pair had that window).
        //   * **Crash cleanup without refresh loop.** The alive-key SET NX
        //     is startup-only (see §1 above); there's no periodic renew
        //     to piggy-back on, so a TTL on caps would independently
        //     expire mid-flight and hide a live worker's caps from ops
        //     tools. Instead we drop the TTL: each reconnect overwrites;
        //     a crashed worker leaves a stale CSV until a new process
        //     with the same worker_instance_id boots (which triggers
        //     `duplicate worker_instance_id` via alive-key guard anyway —
        //     the orchestrator allocates a new id, and operators can DEL
        //     the stale caps key if they care).
        //   * **Empty caps = DEL.** A restart from {gpu} to {} clears the
        //     advertisement rather than leaving stale data.
        // Cluster-safe advertisement: the per-worker caps STRING lives at
        // `ff:worker:{id}:caps` (lands on whatever slot CRC16 puts it on),
        // and the INSTANCE ID is SADD'd to the global workers-index SET
        // `ff:idx:workers` (single slot). The unblock scanner's cluster
        // enumeration uses SMEMBERS on the index + per-member GET on each
        // caps key, instead of `SCAN MATCH ff:worker:*:caps` (which in
        // cluster mode only scans the shard the SCAN lands on and misses
        // workers whose key hashes elsewhere). Pattern mirrors Batch A
        // `budget_policies_index` / `flow_index` / `deps_all_edges`:
        // operations stay atomic per command; the index is the
        // cluster-wide enumeration surface (a sketch of the read side
        // follows this block).
        #[cfg(feature = "direct-valkey-claim")]
        {
            let caps_key = ff_core::keys::worker_caps_key(&config.worker_instance_id);
            let index_key = ff_core::keys::workers_index_key();
            let instance_id = config.worker_instance_id.to_string();
            if worker_capabilities_csv.is_empty() {
                // No caps advertised. DEL the per-worker caps string AND
                // SREM from the index so the scanner doesn't GET an empty
                // string for a worker that never declares caps.
                let _ = client
                    .cmd("DEL")
                    .arg(&caps_key)
                    .execute::<Option<i64>>()
                    .await;
                if let Err(e) = client
                    .cmd("SREM")
                    .arg(&index_key)
                    .arg(&instance_id)
                    .execute::<Option<i64>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
                        "SREM workers-index failed; continuing (non-authoritative)");
                }
            } else {
                // Atomic overwrite of the caps STRING (one-command). Then
                // SADD to the index (idempotent — re-running connect for
                // the same id is a no-op at SADD level). The per-worker
                // caps key is written BEFORE the index SADD so that when
                // the scanner observes the id in the index, the caps key
                // is guaranteed to resolve to a non-stale CSV (the reverse
                // order would leak an index entry pointing at a stale or
                // empty caps key during a narrow window).
                if let Err(e) = client
                    .cmd("SET")
                    .arg(&caps_key)
                    .arg(&worker_capabilities_csv)
                    .execute::<Option<String>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %caps_key,
                        "SET worker caps advertisement failed; continuing");
                }
                if let Err(e) = client
                    .cmd("SADD")
                    .arg(&index_key)
                    .arg(&instance_id)
                    .execute::<Option<i64>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
                        "SADD workers-index failed; continuing");
                }
            }
        }
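
        // Hedged sketch of the read side described above — how an unblock
        // scanner or an ops tool could enumerate advertised caps cluster-wide
        // via the index SET instead of SCAN. The commands and key helpers are
        // the ones used in this file; the `Vec<String>` SMEMBERS return type
        // and the helpers' exact argument types are assumptions here.
        //
        //     let ids: Vec<String> = client
        //         .cmd("SMEMBERS")
        //         .arg(&ff_core::keys::workers_index_key())
        //         .execute()
        //         .await?;
        //     for id in ids {
        //         let caps: Option<String> = client
        //             .cmd("GET")
        //             .arg(&ff_core::keys::worker_caps_key(&id))
        //             .execute()
        //             .await?;
        //         // `caps` is the sorted CSV written above, or None if the
        //         // worker's advertisement was removed between SMEMBERS and GET.
        //     }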

        // RFC-012 Stage 1b: wrap the dialed client in a
        // ValkeyBackend so `ClaimedTask`'s trait forwarders have
        // something to call. `from_client_and_partitions` reuses the
        // already-dialed client — no second connection.
        // Share the concrete `Arc<ValkeyBackend>` across the two
        // trait objects — one allocation, both accessors yield
        // identity-equivalent handles.
        let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
            ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
                client.clone(),
                partition_config,
            );
        let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
        let completion_backend_handle: Option<
            Arc<dyn ff_core::completion_backend::CompletionBackend>,
        > = Some(valkey_backend);

        Ok(Self {
            client,
            config,
            partition_config,
            #[cfg(feature = "direct-valkey-claim")]
            worker_capabilities_csv,
            #[cfg(feature = "direct-valkey-claim")]
            worker_capabilities_hash,
            #[cfg(feature = "direct-valkey-claim")]
            lane_index: AtomicUsize::new(0),
            concurrency_semaphore,
            #[cfg(feature = "direct-valkey-claim")]
            scan_cursor: AtomicUsize::new(scan_cursor_init),
            backend,
            completion_backend_handle,
        })
    }

    /// Store pre-built [`EngineBackend`] and (optional)
    /// [`CompletionBackend`] handles on the worker. Builds the worker
    /// via the legacy [`FlowFabricWorker::connect`] path first (so the
    /// embedded `ferriskey::Client` that the Stage 1b non-migrated hot
    /// paths still use is dialed), then replaces the default
    /// `ValkeyBackend` wrapper with the caller-supplied trait objects.
    ///
    /// The `completion` argument is explicit: 0.3.3 previously accepted
    /// only `backend` and `completion_backend()` silently returned
    /// `None` on this path because `Arc<dyn EngineBackend>` cannot be
    /// upcast to `Arc<dyn CompletionBackend>` without loss of
    /// trait-object identity. 0.3.4 lets the caller decide.
    ///
    /// - `Some(arc)` — caller supplies a completion backend.
    ///   [`Self::completion_backend`] returns `Some(clone)`.
    /// - `None` — this backend does not support push-based completion
    ///   (future Postgres backend without LISTEN/NOTIFY, test mocks).
    ///   [`Self::completion_backend`] returns `None`.
    ///
    /// When the underlying backend implements both traits (as
    /// `ValkeyBackend` does), pass the same `Arc` twice — the two
    /// trait-object views share one allocation:
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use ff_backend_valkey::ValkeyBackend;
    /// use ff_sdk::{FlowFabricWorker, WorkerConfig};
    ///
    /// # async fn doc(worker_config: WorkerConfig,
    /// #              backend_config: ff_backend_valkey::BackendConfig)
    /// #     -> Result<(), ff_sdk::SdkError> {
    /// // Valkey (completion supported):
    /// let valkey = Arc::new(ValkeyBackend::connect(backend_config).await?);
    /// let worker = FlowFabricWorker::connect_with(
    ///     worker_config,
    ///     valkey.clone(),
    ///     Some(valkey),
    /// ).await?;
    /// # Ok(()) }
    /// ```
    ///
    /// Backend without completion support:
    ///
    /// ```rust,ignore
    /// let worker = FlowFabricWorker::connect_with(
    ///     worker_config,
    ///     backend,
    ///     None,
    /// ).await?;
    /// ```
    ///
    /// **Stage 1b scope — what the injected backend covers today.**
    /// After this PR, `ClaimedTask`'s 8 migrated ops
    /// (`renew_lease` / `update_progress` / `resume_signals` /
    /// `delay_execution` / `move_to_waiting_children` /
    /// `complete` / `cancel` / `fail`) route through the injected
    /// backend. That's the first material use of `connect_with`: a
    /// mock backend now genuinely sees the worker's per-task
    /// write-surface calls. The 4 trait-shape-deferred ops
    /// (`create_pending_waitpoint`, `append_frame`, `suspend`,
    /// `report_usage`) still reach the embedded
    /// `ferriskey::Client` directly until issue #117's trait
    /// amendment lands; `claim_next` / `claim_from_grant` /
    /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
    /// are Stage 1c hot-path work. Stage 1d removes the embedded
    /// client entirely.
    ///
    /// Today's constructor is therefore NOT yet a drop-in way to swap
    /// in a non-Valkey backend — it requires a reachable Valkey node
    /// for the 4 deferred + hot-path ops. Tests that exercise only
    /// the 8 migrated ops can run fully against a mock backend.
    ///
    /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
    /// [`CompletionBackend`]: ff_core::completion_backend::CompletionBackend
    pub async fn connect_with(
        config: WorkerConfig,
        backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
        completion: Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
    ) -> Result<Self, SdkError> {
        let mut worker = Self::connect(config).await?;
        worker.backend = backend;
        worker.completion_backend_handle = completion;
        Ok(worker)
    }

    /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
    /// ops through.
    ///
    /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
    /// the `Option` wrapper is retained for API stability with the
    /// Stage-1a shape. Stage 1c narrows the return type to
    /// `&Arc<dyn EngineBackend>`.
    pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
        Some(&self.backend)
    }

    /// Handle to the completion-event subscription backend, for
    /// consumers that need to observe execution completions (DAG
    /// reconcilers, tenant-isolated subscribers).
    ///
    /// Returns `Some` when the worker was built through
    /// [`Self::connect`] on the default `valkey-default` feature
    /// (the bundled `ValkeyBackend` implements
    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)),
    /// or via [`Self::connect_with`] with a `Some(..)` completion
    /// handle. Returns `None` when the caller passed `None` to
    /// [`Self::connect_with`] — i.e. the backend does not support
    /// push-based completion streams (future Postgres without
    /// LISTEN/NOTIFY, test mocks).
    ///
    /// The returned handle shares the same underlying allocation as
    /// [`Self::backend`]; calls through it (e.g.
    /// `subscribe_completions_filtered`) hit the same connection
    /// the worker itself uses.
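    ///
    /// A minimal sketch of the two branches (the subscribe call is the one
    /// named above; its argument and return shapes are not shown in this
    /// file and are assumptions here):
    ///
    /// ```rust,ignore
    /// match worker.completion_backend() {
    ///     Some(completions) => {
    ///         // e.g. completions.subscribe_completions_filtered(..) to
    ///         // consume completion events for DAG reconciliation.
    ///     }
    ///     None => {
    ///         // Backend without push-based completion: fall back to polling.
    ///     }
    /// }
    /// ```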
    pub fn completion_backend(
        &self,
    ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
        self.completion_backend_handle.clone()
    }

    /// Get a reference to the underlying ferriskey client.
    pub fn client(&self) -> &Client {
        &self.client
    }

    /// Get the worker config.
    pub fn config(&self) -> &WorkerConfig {
        &self.config
    }

    /// Get the server-published partition config this worker bound to at
    /// `connect()`. Callers need this when computing partition hash-tags
    /// for direct-client reads (e.g. ff-sdk::read_stream) to stay aligned
    /// with the server's `num_flow_partitions` — using
    /// `PartitionConfig::default()` assumes 256 partitions and silently
    /// misses data on deployments with any other value.
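    ///
    /// Illustrative only — `my_flow_hash` below is a hypothetical stand-in
    /// for whatever hash the caller already uses to place flows; the point
    /// is taking the modulus from this config instead of the default:
    ///
    /// ```rust,ignore
    /// let cfg = worker.partition_config();
    /// let index = (my_flow_hash(&flow_id) % cfg.num_flow_partitions as u64) as u16;
    /// let partition = ff_core::partition::Partition {
    ///     family: ff_core::partition::PartitionFamily::Execution,
    ///     index,
    /// };
    /// ```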
    pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
        &self.partition_config
    }

    /// Attempt to claim the next eligible execution.
    ///
    /// Phase 1 simplified claim flow:
    /// 1. Pick a lane (round-robin across configured lanes)
    /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
    /// 3. Claim the execution via `ff_claim_execution`
    /// 4. Read execution payload + tags
    /// 5. Return a [`ClaimedTask`] with auto lease renewal
    ///
    /// Gated behind the `direct-valkey-claim` feature — bypasses the
    /// scheduler's budget / quota / capability admission checks. Enable
    /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
    /// the scheduler hop would be measurement noise (benches) or when
    /// the test harness needs a deterministic worker-local path. Prefer
    /// the scheduler-routed HTTP claim path in production.
    ///
    /// # `None` semantics
    ///
    /// `Ok(None)` means **no work was found in the partition window this
    /// poll covered**, not "the cluster is idle". Each call scans a chunk
    /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
    /// `scan_cursor`; the cursor advances by that chunk size on every
    /// invocation, so a worker covers every partition exactly once every
    /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
    ///
    /// Callers should treat `None` as "poll again soon" (typically after
    /// `config.claim_poll_interval_ms`) rather than "sleep for a long
    /// time". Backing off too aggressively on `None` can starve workers
    /// when work lives on partitions outside the current window.
    ///
    /// Returns `Err` on Valkey errors or script failures.
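    ///
    /// A minimal polling sketch, assuming `config.claim_poll_interval_ms`
    /// holds milliseconds and `handle` is the caller's own task handler:
    ///
    /// ```rust,ignore
    /// loop {
    ///     match worker.claim_next().await? {
    ///         Some(task) => handle(task).await?,
    ///         None => {
    ///             // "No work in this partition window" — poll again soon
    ///             // rather than backing off aggressively.
    ///             let wait = worker.config().claim_poll_interval_ms;
    ///             tokio::time::sleep(Duration::from_millis(wait)).await;
    ///         }
    ///     }
    /// }
    /// ```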
    #[cfg(feature = "direct-valkey-claim")]
    pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
        // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
        // try_acquire returns immediately — if no permits available, the worker
        // is at capacity and should not claim more work.
        let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(_) => return Ok(None), // At capacity — no claim attempted
        };

        let lane_id = self.next_lane();
        let now = TimestampMs::now();

        // Phase 1: We scan eligible executions directly by reading the eligible
        // ZSET across execution partitions. In production the scheduler
        // (ff-scheduler) would handle this. For Phase 1, the SDK does a
        // simplified inline claim.
        //
        // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
        // partitions starting at a rolling offset. This keeps idle Valkey
        // load at O(chunk) per worker-second instead of O(num_partitions),
        // and the worker-instance-seeded initial cursor spreads concurrent
        // workers across different partition windows.
        let num_partitions = self.partition_config.num_flow_partitions as usize;
        if num_partitions == 0 {
            return Ok(None);
        }
        let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
        let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;

        for step in 0..chunk {
            let partition_idx = ((start + step) % num_partitions) as u16;
            let partition = ff_core::partition::Partition {
                family: ff_core::partition::PartitionFamily::Execution,
                index: partition_idx,
            };
            let idx = IndexKeys::new(&partition);
            let eligible_key = idx.lane_eligible(&lane_id);

            // ZRANGEBYSCORE to get the highest-priority eligible execution.
            // Score format: -(priority * 1_000_000_000_000) + created_at_ms
            // ZRANGEBYSCORE with "-inf" "+inf" LIMIT 0 1 gives lowest score = highest priority.
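            // Worked example with illustrative numbers: priority 2 created at
            // t = 1_700_000_000_000 ms scores -(2 * 1e12) + 1.7e12 =
            // -300_000_000_000; priority 1 at the same instant scores
            // +700_000_000_000. The priority-2 execution sorts first because
            // its score is lower.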
            let result: Value = self
                .client
                .cmd("ZRANGEBYSCORE")
                .arg(&eligible_key)
                .arg("-inf")
                .arg("+inf")
                .arg("LIMIT")
                .arg("0")
                .arg("1")
                .execute()
                .await
                .map_err(|e| SdkError::ValkeyContext { source: e, context: "ZRANGEBYSCORE failed".into() })?;

            let execution_id_str = match extract_first_array_string(&result) {
                Some(s) => s,
                None => continue, // No eligible executions on this partition
            };

            let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
                SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "claim_execution_from_eligible_set".into(),
                    execution_id: None,
                    message: format!("bad execution_id in eligible set: {e}"),
                })
            })?;

            // Step 1: Issue claim grant
            let grant_result = self
                .issue_claim_grant(&execution_id, &lane_id, &partition, &idx)
                .await;

            match grant_result {
                Ok(()) => {}
                Err(SdkError::Engine(ref boxed))
                    if matches!(
                        **boxed,
                        crate::EngineError::Validation {
                            kind: crate::ValidationKind::CapabilityMismatch,
                            ..
                        }
                    ) =>
                {
                    let missing = match &**boxed {
                        crate::EngineError::Validation { detail, .. } => detail.clone(),
                        _ => unreachable!(),
                    };
                    // Block-on-mismatch (RFC-009 §7.5) — parity with
                    // ff-scheduler's Scheduler::claim_for_worker. Without
                    // this, the inline-direct-claim path would hot-loop
                    // on an unclaimable top-of-zset (every tick picks the
                    // same execution, wastes an FCALL, logs, releases,
                    // repeats). The scheduler-side unblock scanner
                    // promotes blocked_route executions back to eligible
                    // when a worker with matching caps registers.
                    tracing::info!(
                        execution_id = %execution_id,
                        worker_id = %self.config.worker_id,
                        worker_caps_hash = %self.worker_capabilities_hash,
                        missing = %missing,
                        "capability mismatch, blocking execution off eligible (SDK inline claim)"
                    );
                    self.block_route(&execution_id, &lane_id, &partition, &idx).await;
                    continue;
                }
                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        error = %e,
                        "claim grant failed (retryable), trying next partition"
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }

            // Step 2: Claim the execution
            match self
                .claim_execution(&execution_id, &lane_id, &partition, now)
                .await
            {
                Ok(mut task) => {
                    // Transfer concurrency permit to the task. When the task is
                    // completed/failed/cancelled/dropped the permit returns to
                    // the semaphore, allowing another claim.
                    task.set_concurrency_permit(permit);
                    return Ok(Some(task));
                }
                Err(SdkError::Engine(ref boxed))
                    if matches!(
                        **boxed,
                        crate::EngineError::Contention(
                            crate::ContentionKind::UseClaimResumedExecution
                        )
                    ) =>
                {
                    // Execution was resumed from suspension — attempt_interrupted.
                    // ff_claim_execution rejects this; use ff_claim_resumed_execution
                    // which reuses the existing attempt instead of creating a new one.
                    tracing::debug!(
                        execution_id = %execution_id,
                        "execution is resumed, using claim_resumed path"
                    );
                    match self
                        .claim_resumed_execution(&execution_id, &lane_id, &partition)
                        .await
                    {
                        Ok(mut task) => {
                            task.set_concurrency_permit(permit);
                            return Ok(Some(task));
                        }
                        Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
                            tracing::debug!(
                                execution_id = %execution_id,
                                error = %e2,
                                "claim_resumed failed (retryable), trying next partition"
                            );
                            continue;
                        }
                        Err(e2) => return Err(e2),
                    }
                }
                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        error = %e,
                        "claim execution failed (retryable), trying next partition"
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        }

        // No eligible work found on any partition
        Ok(None)
    }

    #[cfg(feature = "direct-valkey-claim")]
    async fn issue_claim_grant(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        idx: &IndexKeys,
    ) -> Result<(), SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);

        // KEYS (3): exec_core, claim_grant_key, eligible_zset
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.claim_grant(),
            idx.lane_eligible(lane_id),
        ];

        // ARGV (9): eid, worker_id, worker_instance_id, lane_id,
        //           capability_hash, grant_ttl_ms, route_snapshot_json,
        //           admission_summary, worker_capabilities_csv (sorted)
        let args: Vec<String> = vec![
            execution_id.to_string(),
            self.config.worker_id.to_string(),
            self.config.worker_instance_id.to_string(),
            lane_id.to_string(),
            String::new(), // capability_hash
            "5000".to_owned(), // grant_ttl_ms (5 seconds)
            String::new(), // route_snapshot_json
            String::new(), // admission_summary
            self.worker_capabilities_csv.clone(), // sorted CSV
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_issue_claim_grant", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        crate::task::parse_success_result(&raw, "ff_issue_claim_grant")
    }

    /// Move an execution from the lane's eligible ZSET into its
    /// blocked_route ZSET via `ff_block_execution_for_admission`. Called
    /// after a `CapabilityMismatch` reject — without this, the inline
    /// direct-claim path would re-pick the same top-of-zset every tick
    /// (same pattern the scheduler's block_candidate handles). The
    /// engine's unblock scanner periodically promotes blocked_route
    /// back to eligible once a worker with matching caps registers.
    ///
    /// Best-effort: transport or logical rejects (e.g. the execution
    /// already went terminal between pick and block) are logged and the
    /// outer loop simply `continue`s to the next partition. Parity with
    /// ff-scheduler::Scheduler::block_candidate.
    #[cfg(feature = "direct-valkey-claim")]
    async fn block_route(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        idx: &IndexKeys,
    ) {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let core_key = ctx.core();
        let eligible_key = idx.lane_eligible(lane_id);
        let blocked_key = idx.lane_blocked_route(lane_id);
        let eid_s = execution_id.to_string();
        let now_ms = TimestampMs::now().0.to_string();

        let keys: [&str; 3] = [&core_key, &eligible_key, &blocked_key];
        let argv: [&str; 4] = [
            &eid_s,
            "waiting_for_capable_worker",
            "no connected worker satisfies required_capabilities",
            &now_ms,
        ];

        match self
            .client
            .fcall::<Value>("ff_block_execution_for_admission", &keys, &argv)
            .await
        {
            Ok(v) => {
                // Parse Lua result so a logical reject (e.g. execution
                // went terminal mid-flight) is visible — same fix we
                // applied to ff-scheduler's block_candidate.
                if let Err(e) = crate::task::parse_success_result(&v, "ff_block_execution_for_admission") {
                    tracing::warn!(
                        execution_id = %execution_id,
                        error = %e,
                        "SDK block_route: Lua rejected; eligible ZSET unchanged, next poll \
                         will re-evaluate"
                    );
                }
            }
            Err(e) => {
                tracing::warn!(
                    execution_id = %execution_id,
                    error = %e,
                    "SDK block_route: transport failure; eligible ZSET unchanged"
                );
            }
        }
    }

    /// Low-level claim of a granted execution. Invokes
    /// `ff_claim_execution` and returns a `ClaimedTask` with auto
    /// lease renewal.
    ///
    /// Previously gated behind `direct-valkey-claim`; ungated so
    /// the public [`claim_from_grant`] entry point can reuse the
    /// same FCALL plumbing. The method stays private — external
    /// callers use `claim_from_grant`.
    ///
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    async fn claim_execution(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        _now: TimestampMs,
    ) -> Result<ClaimedTask, SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let idx = IndexKeys::new(partition);

        // Pre-read total_attempt_count from exec_core to derive next attempt index.
        // The Lua uses total_attempt_count as the new index and dynamically builds
        // the attempt key from the hash tag, so KEYS[6-8] are placeholders, but
        // we pass the correct index for documentation/debugging.
        let total_str: Option<String> = self.client
            .cmd("HGET")
            .arg(ctx.core())
            .arg("total_attempt_count")
            .execute()
            .await
            .unwrap_or(None);
        let next_idx = total_str
            .as_deref()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(0);
        let att_idx = AttemptIndex::new(next_idx);

        let lease_id = LeaseId::new().to_string();
        let attempt_id = AttemptId::new().to_string();
        let renew_before_ms = self.config.lease_ttl_ms * 2 / 3;

        // KEYS (14): must match lua/execution.lua ff_claim_execution positional order
        let keys: Vec<String> = vec![
            ctx.core(),                                    // 1  exec_core
            ctx.claim_grant(),                             // 2  claim_grant
            idx.lane_eligible(lane_id),                    // 3  eligible_zset
            idx.lease_expiry(),                            // 4  lease_expiry_zset
            idx.worker_leases(&self.config.worker_instance_id), // 5  worker_leases
            ctx.attempt_hash(att_idx),                     // 6  attempt_hash (placeholder)
            ctx.attempt_usage(att_idx),                    // 7  attempt_usage (placeholder)
            ctx.attempt_policy(att_idx),                   // 8  attempt_policy (placeholder)
            ctx.attempts(),                                // 9  attempts_zset
            ctx.lease_current(),                           // 10 lease_current
            ctx.lease_history(),                           // 11 lease_history
            idx.lane_active(lane_id),                      // 12 active_index
            idx.attempt_timeout(),                         // 13 attempt_timeout_zset
            idx.execution_deadline(),                      // 14 execution_deadline_zset
        ];

        // ARGV (12): must match lua/execution.lua ff_claim_execution positional order
        let args: Vec<String> = vec![
            execution_id.to_string(),                      // 1  execution_id
            self.config.worker_id.to_string(),             // 2  worker_id
            self.config.worker_instance_id.to_string(),    // 3  worker_instance_id
            lane_id.to_string(),                           // 4  lane
            String::new(),                                 // 5  capability_hash
            lease_id.clone(),                              // 6  lease_id
            self.config.lease_ttl_ms.to_string(),          // 7  lease_ttl_ms
            renew_before_ms.to_string(),                   // 8  renew_before_ms
            attempt_id.clone(),                            // 9  attempt_id
            "{}".to_owned(),                               // 10 attempt_policy_json
            String::new(),                                 // 11 attempt_timeout_ms
            String::new(),                                 // 12 execution_deadline_at
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_claim_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        // Parse claim result: {1, "OK", lease_id, lease_epoch, lease_expires_at,
        //                      attempt_id, attempt_index, attempt_type}
1038        let arr = match &raw {
1039            Value::Array(arr) => arr,
1040            _ => {
1041                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1042                    fcall: "ff_claim_execution".into(),
1043                    execution_id: Some(execution_id.to_string()),
1044                    message: "expected Array".into(),
1045                }));
1046            }
1047        };
1048
1049        let status_code = match arr.first() {
1050            Some(Ok(Value::Int(n))) => *n,
1051            _ => {
1052                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1053                    fcall: "ff_claim_execution".into(),
1054                    execution_id: Some(execution_id.to_string()),
1055                    message: "bad status code".into(),
1056                }));
1057            }
1058        };
1059
1060        if status_code != 1 {
1061            let err_field_str = |idx: usize| -> String {
1062                arr.get(idx)
1063                    .and_then(|v| match v {
1064                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1065                        Ok(Value::SimpleString(s)) => Some(s.clone()),
1066                        _ => None,
1067                    })
1068                    .unwrap_or_default()
1069            };
1070            let error_code = {
1071                let s = err_field_str(1);
1072                if s.is_empty() { "unknown".to_owned() } else { s }
1073            };
1074            let detail = err_field_str(2);
1075
1076            return Err(SdkError::from(
1077                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
1078                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
1079                        fcall: "ff_claim_execution".into(),
1080                        execution_id: Some(execution_id.to_string()),
1081                        message: format!("unknown error: {error_code}"),
1082                    }),
1083            ));
1084        }
1085
1086        // Extract fields from success response
1087        let field_str = |idx: usize| -> String {
1088            arr.get(idx + 2) // skip status_code and "OK"
1089                .and_then(|v| match v {
1090                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1091                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1092                    Ok(Value::Int(n)) => Some(n.to_string()),
1093                    _ => None,
1094                })
1095                .unwrap_or_default()
1096        };
1097
1098        // Lua returns: ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
1099        // Positions:       0         1       2           3            4              5
1100        let lease_id = LeaseId::parse(&field_str(0))
1101            .unwrap_or_else(|_| LeaseId::new());
1102        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
1103        // field_str(2) is expires_at — skip it (lease timing managed by renewal)
1104        let attempt_id = AttemptId::parse(&field_str(3))
1105            .unwrap_or_else(|_| AttemptId::new());
1106        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
1107
1108        // Read execution payload and metadata
1109        let (input_payload, execution_kind, tags) = self
1110            .read_execution_context(execution_id, partition)
1111            .await?;
1112
1113        Ok(ClaimedTask::new(
1114            self.client.clone(),
1115            self.backend.clone(),
1116            self.partition_config,
1117            execution_id.clone(),
1118            attempt_index,
1119            attempt_id,
1120            lease_id,
1121            lease_epoch,
1122            self.config.lease_ttl_ms,
1123            lane_id.clone(),
1124            self.config.worker_instance_id.clone(),
1125            input_payload,
1126            execution_kind,
1127            tags,
1128        ))
1129    }
1130
1131    /// Consume a [`ClaimGrant`] and claim the granted execution on
1132    /// this worker. The intended production entry point: pair with
1133    /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
1134    /// scheduler-issued grants into the SDK without enabling the
1135    /// `direct-valkey-claim` feature (which bypasses budget/quota
1136    /// admission control).
1137    ///
1138    /// The worker's concurrency semaphore is checked BEFORE the FCALL
1139    /// so a saturated worker does not consume the grant: the grant
1140    /// stays valid for its remaining TTL and the caller can either
1141    /// release it back to the scheduler or retry after some other
1142    /// in-flight task completes.
1143    ///
1144    /// On success the returned [`ClaimedTask`] holds a concurrency
1145    /// permit that releases automatically on
1146    /// `complete`/`fail`/`cancel`/drop — same contract as
1147    /// `claim_next`.
1148    ///
1149    /// # Arguments
1150    ///
1151    /// * `lane` — the lane the grant was issued for. Must match what
1152    ///   was passed to `Scheduler::claim_for_worker`; the Lua FCALL
1153    ///   uses it to look up `lane_eligible`, `lane_active`, and the
1154    ///   `worker_leases` index slot.
1155    /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
1156    ///
1157    /// # Errors
1158    ///
1159    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1160    ///   permits all held. Retryable; the grant is untouched.
1161    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1162    ///   or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
1163    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
1164    ///   (wrapped in [`SdkError::Engine`]).
1165    /// * `ScriptError::CapabilityMismatch` — execution's required
1166    ///   capabilities not a subset of this worker's caps (wrapped in
1167    ///   [`SdkError::Engine`]). Surfaced post-grant if a race
1168    ///   between grant issuance and caps change allows it.
1169    /// * `ScriptError::Parse` — `ff_claim_execution` returned an
1170    ///   unexpected shape (wrapped in [`SdkError::Engine`]).
1171    /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
1172    ///   transport error during the FCALL or the
1173    ///   `read_execution_context` follow-up.
1174    ///
1175    /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
1176    /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
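    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `ClaimGrant` has already reached this
    /// process (for instance via [`claim_via_server`] below; how the grant is
    /// delivered is not prescribed here):
    ///
    /// ```rust,ignore
    /// match worker.claim_from_grant(lane.clone(), grant).await {
    ///     Ok(task) => {
    ///         // The task holds a concurrency permit; it is released on
    ///         // complete/fail/cancel/drop.
    ///         task.complete(Some(b"result".to_vec())).await?;
    ///     }
    ///     Err(SdkError::WorkerAtCapacity) => {
    ///         // Grant untouched; retry once an in-flight task finishes,
    ///         // or release the grant back to the scheduler.
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```
    ///
    /// [`claim_via_server`]: FlowFabricWorker::claim_via_server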
1177    pub async fn claim_from_grant(
1178        &self,
1179        lane: LaneId,
1180        grant: ff_core::contracts::ClaimGrant,
1181    ) -> Result<ClaimedTask, SdkError> {
1182        // Semaphore check FIRST. If the worker is saturated we must
1183        // surface the condition to the caller without touching the
1184        // grant — silently returning Ok(None) (as claim_next does)
1185        // would drop a grant the scheduler has already committed work
1186        // to issuing, wasting the slot until its TTL elapses.
1187        let permit = self
1188            .concurrency_semaphore
1189            .clone()
1190            .try_acquire_owned()
1191            .map_err(|_| SdkError::WorkerAtCapacity)?;
1192
1193        let now = TimestampMs::now();
1194        let partition = grant.partition().map_err(|e| SdkError::Config {
1195            context: "claim_from_grant".to_owned(),
1196            field: Some("partition_key".to_owned()),
1197            message: e.to_string(),
1198        })?;
1199        let mut task = self
1200            .claim_execution(&grant.execution_id, &lane, &partition, now)
1201            .await?;
1202        task.set_concurrency_permit(permit);
1203        Ok(task)
1204    }
1205
1206    /// Scheduler-routed claim: POSTs to the server's
1207    /// `/v1/workers/{id}/claim` endpoint, then chains into
1208    /// [`Self::claim_from_grant`].
1209    ///
1210    /// Batch C item 2 PR-B. This is the production entry point —
1211    /// budget, quota, and capability admission run server-side inside
1212    /// `ff_scheduler::Scheduler::claim_for_worker`, so callers never
1213    /// need to enable the `direct-valkey-claim` feature.
1214    ///
1215    /// Returns `Ok(None)` when the server says no eligible execution
1216    /// (HTTP 204). Callers typically back off by
1217    /// `config.claim_poll_interval_ms` and try again, same cadence
1218    /// as the direct-claim path's `Ok(None)`.
1219    ///
1220    /// The `admin` client is the established HTTP surface
1221    /// (`FlowFabricAdminClient`) reused here so workers don't keep a
1222    /// second reqwest client around. Build once at worker boot and
1223    /// hand in by reference on every claim.
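    ///
    /// A minimal polling sketch; the admin-client construction, lane,
    /// grant TTL, and sleep interval below are illustrative, not prescribed:
    ///
    /// ```rust,ignore
    /// let admin: FlowFabricAdminClient = /* built once at worker boot */;
    /// loop {
    ///     match worker.claim_via_server(&admin, &lane, 30_000).await? {
    ///         Some(task) => {
    ///             // Process the execution, then settle the attempt.
    ///             task.complete(Some(b"result".to_vec())).await?;
    ///         }
    ///         None => {
    ///             // HTTP 204: nothing eligible right now; back off.
    ///             tokio::time::sleep(Duration::from_millis(500)).await;
    ///         }
    ///     }
    /// }
    /// ```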
1224    pub async fn claim_via_server(
1225        &self,
1226        admin: &crate::FlowFabricAdminClient,
1227        lane: &LaneId,
1228        grant_ttl_ms: u64,
1229    ) -> Result<Option<ClaimedTask>, SdkError> {
1230        let req = crate::admin::ClaimForWorkerRequest {
1231            worker_id: self.config.worker_id.to_string(),
1232            lane_id: lane.to_string(),
1233            worker_instance_id: self.config.worker_instance_id.to_string(),
1234            capabilities: self.config.capabilities.clone(),
1235            grant_ttl_ms,
1236        };
1237        let Some(resp) = admin.claim_for_worker(req).await? else {
1238            return Ok(None);
1239        };
1240        let grant = resp.into_grant()?;
1241        self.claim_from_grant(lane.clone(), grant).await.map(Some)
1242    }
1243
1244    /// Consume a [`ReclaimGrant`] and transition the granted
1245    /// `attempt_interrupted` execution into a `started` state on this
1246    /// worker. Symmetric partner to [`claim_from_grant`] for the
1247    /// resume path.
1248    ///
1249    /// The grant must have been issued to THIS worker (matching
1250    /// `worker_id` at grant time); a mismatch returns
1251    /// `ScriptError::InvalidClaimGrant` wrapped in [`SdkError::Engine`].
1252    /// The grant is consumed atomically by `ff_claim_resumed_execution`;
1253    /// a second call with the same grant also returns `InvalidClaimGrant`.
1254    ///
1255    /// # Concurrency
1256    ///
1257    /// The worker's concurrency semaphore is checked BEFORE the FCALL
1258    /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1259    /// assume pre-existing capacity on this worker — a reclaim can
1260    /// land on a fresh worker instance that just came up after a
1261    /// crash/restart and is picking up a previously-interrupted
1262    /// execution. If the worker is saturated, the grant stays valid
1263    /// for its remaining TTL and the caller can release it or retry.
1264    ///
1265    /// On success the returned [`ClaimedTask`] holds a concurrency
1266    /// permit that releases automatically on
1267    /// `complete`/`fail`/`cancel`/drop.
1268    ///
1269    /// # Errors
1270    ///
1271    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1272    ///   permits all held. Retryable; the grant is untouched (no
1273    ///   FCALL was issued, so `ff_claim_resumed_execution` did not
1274    ///   atomically consume the grant key).
1275    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1276    ///   or `worker_id` mismatch.
1277    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1278    /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1279    ///   `attempt_interrupted`.
1280    /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1281    ///   not `runnable`.
1282    /// * `ScriptError::ExecutionNotFound` — core key missing.
1283    /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
1284    ///   transport.
1285    ///
1286    /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1287    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
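    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `ReclaimGrant` for a previously
    /// interrupted execution was handed to this worker (how the grant
    /// reaches the worker is out of scope here):
    ///
    /// ```rust,ignore
    /// match worker.claim_from_reclaim_grant(grant).await {
    ///     Ok(task) => {
    ///         // Resume processing the interrupted attempt.
    ///         task.complete(Some(b"resumed".to_vec())).await?;
    ///     }
    ///     Err(SdkError::WorkerAtCapacity) => {
    ///         // No FCALL was issued, so the grant is still consumable
    ///         // once capacity frees up (within its TTL).
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```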
1288    pub async fn claim_from_reclaim_grant(
1289        &self,
1290        grant: ff_core::contracts::ReclaimGrant,
1291    ) -> Result<ClaimedTask, SdkError> {
1292        // Semaphore check FIRST — same load-bearing ordering as
1293        // `claim_from_grant`. If the worker is saturated, surface
1294        // WorkerAtCapacity without firing the FCALL; the FCALL is an
1295        // atomic consume on the grant key, so calling it while
1296        // saturated would spend the grant with no permit left to
1297        // attach to the returned `ClaimedTask`.
1298        let permit = self
1299            .concurrency_semaphore
1300            .clone()
1301            .try_acquire_owned()
1302            .map_err(|_| SdkError::WorkerAtCapacity)?;
1303
1304        // Grant carries partition + lane_id so no round-trip is needed
1305        // to resolve them before the FCALL.
1306        let partition = grant.partition().map_err(|e| SdkError::Config {
1307            context: "claim_from_reclaim_grant".to_owned(),
1308            field: Some("partition_key".to_owned()),
1309            message: e.to_string(),
1310        })?;
1311        let mut task = self
1312            .claim_resumed_execution(
1313                &grant.execution_id,
1314                &grant.lane_id,
1315                &partition,
1316            )
1317            .await?;
1318        task.set_concurrency_permit(permit);
1319        Ok(task)
1320    }
1321
1322    /// Low-level resume claim. Invokes `ff_claim_resumed_execution`
1323    /// and returns a `ClaimedTask` bound to the resumed attempt.
1324    ///
1325    /// Previously gated behind `direct-valkey-claim`; ungated so the
1326    /// public [`claim_from_reclaim_grant`] entry point can reuse it.
1327    /// The method stays private — external callers use
1328    /// `claim_from_reclaim_grant`.
1329    ///
1330    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
1331    async fn claim_resumed_execution(
1332        &self,
1333        execution_id: &ExecutionId,
1334        lane_id: &LaneId,
1335        partition: &ff_core::partition::Partition,
1336    ) -> Result<ClaimedTask, SdkError> {
1337        let ctx = ExecKeyContext::new(partition, execution_id);
1338        let idx = IndexKeys::new(partition);
1339
1340        // Pre-read current_attempt_index for the existing attempt hash key.
1341        // This is load-bearing: KEYS[6] must point to the real attempt hash.
1342        let att_idx_str: Option<String> = self.client
1343            .cmd("HGET")
1344            .arg(ctx.core())
1345            .arg("current_attempt_index")
1346            .execute()
1347            .await
1348            .map_err(|e| SdkError::ValkeyContext { source: e, context: "read attempt_index".into() })?;
1349        let att_idx = AttemptIndex::new(
1350            att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
1351        );
1352
1353        let lease_id = LeaseId::new().to_string();
1354
1355        // KEYS (11): must match lua/signal.lua ff_claim_resumed_execution
1356        let keys: Vec<String> = vec![
1357            ctx.core(),                                             // 1  exec_core
1358            ctx.claim_grant(),                                      // 2  claim_grant
1359            idx.lane_eligible(lane_id),                             // 3  eligible_zset
1360            idx.lease_expiry(),                                     // 4  lease_expiry_zset
1361            idx.worker_leases(&self.config.worker_instance_id),     // 5  worker_leases
1362            ctx.attempt_hash(att_idx),                              // 6  existing_attempt_hash
1363            ctx.lease_current(),                                    // 7  lease_current
1364            ctx.lease_history(),                                    // 8  lease_history
1365            idx.lane_active(lane_id),                               // 9  active_index
1366            idx.attempt_timeout(),                                  // 10 attempt_timeout_zset
1367            idx.execution_deadline(),                               // 11 execution_deadline_zset
1368        ];
1369
1370        // ARGV (8): must match lua/signal.lua ff_claim_resumed_execution
1371        let args: Vec<String> = vec![
1372            execution_id.to_string(),                               // 1  execution_id
1373            self.config.worker_id.to_string(),                      // 2  worker_id
1374            self.config.worker_instance_id.to_string(),             // 3  worker_instance_id
1375            lane_id.to_string(),                                    // 4  lane
1376            String::new(),                                          // 5  capability_hash
1377            lease_id.clone(),                                       // 6  lease_id
1378            self.config.lease_ttl_ms.to_string(),                   // 7  lease_ttl_ms
1379            String::new(),                                          // 8  remaining_attempt_timeout_ms
1380        ];
1381
1382        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1383        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1384
1385        let raw: Value = self
1386            .client
1387            .fcall("ff_claim_resumed_execution", &key_refs, &arg_refs)
1388            .await
1389            .map_err(SdkError::Valkey)?;
1390
1391        // Parse result — same format as ff_claim_execution:
1392        // {1, "OK", lease_id, lease_epoch, expires_at, attempt_id, attempt_index, attempt_type}
1393        let arr = match &raw {
1394            Value::Array(arr) => arr,
1395            _ => {
1396                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1397                    fcall: "ff_claim_resumed_execution".into(),
1398                    execution_id: Some(execution_id.to_string()),
1399                    message: "expected Array".into(),
1400                }));
1401            }
1402        };
1403
1404        let status_code = match arr.first() {
1405            Some(Ok(Value::Int(n))) => *n,
1406            _ => {
1407                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1408                    fcall: "ff_claim_resumed_execution".into(),
1409                    execution_id: Some(execution_id.to_string()),
1410                    message: "bad status code".into(),
1411                }));
1412            }
1413        };
1414
1415        if status_code != 1 {
1416            let err_field_str = |idx: usize| -> String {
1417                arr.get(idx)
1418                    .and_then(|v| match v {
1419                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1420                        Ok(Value::SimpleString(s)) => Some(s.clone()),
1421                        _ => None,
1422                    })
1423                    .unwrap_or_default()
1424            };
1425            let error_code = {
1426                let s = err_field_str(1);
1427                if s.is_empty() { "unknown".to_owned() } else { s }
1428            };
1429            let detail = err_field_str(2);
1430
1431            return Err(SdkError::from(
1432                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
1433                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
1434                        fcall: "ff_claim_resumed_execution".into(),
1435                        execution_id: Some(execution_id.to_string()),
1436                        message: format!("unknown error: {error_code}"),
1437                    }),
1438            ));
1439        }
1440
1441        let field_str = |idx: usize| -> String {
1442            arr.get(idx + 2)
1443                .and_then(|v| match v {
1444                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1445                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1446                    Ok(Value::Int(n)) => Some(n.to_string()),
1447                    _ => None,
1448                })
1449                .unwrap_or_default()
1450        };
1451
1452        let lease_id = LeaseId::parse(&field_str(0))
1453            .unwrap_or_else(|_| LeaseId::new());
1454        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
1455        let attempt_id = AttemptId::parse(&field_str(3))
1456            .unwrap_or_else(|_| AttemptId::new());
1457        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
1458
1459        let (input_payload, execution_kind, tags) = self
1460            .read_execution_context(execution_id, partition)
1461            .await?;
1462
1463        Ok(ClaimedTask::new(
1464            self.client.clone(),
1465            self.backend.clone(),
1466            self.partition_config,
1467            execution_id.clone(),
1468            attempt_index,
1469            attempt_id,
1470            lease_id,
1471            lease_epoch,
1472            self.config.lease_ttl_ms,
1473            lane_id.clone(),
1474            self.config.worker_instance_id.clone(),
1475            input_payload,
1476            execution_kind,
1477            tags,
1478        ))
1479    }
1480
1481    /// Read an execution's input payload, execution_kind, and tags (payload
1482    /// key, exec_core hash, and tags hash respectively). Previously gated
1483    /// behind `direct-valkey-claim`; now shared by the feature-gated inline
1484    /// claim path and the public `claim_from_reclaim_grant` entry point.
1485    async fn read_execution_context(
1486        &self,
1487        execution_id: &ExecutionId,
1488        partition: &ff_core::partition::Partition,
1489    ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
1490        let ctx = ExecKeyContext::new(partition, execution_id);
1491
1492        // Read payload
1493        let payload: Option<String> = self
1494            .client
1495            .get(&ctx.payload())
1496            .await
1497            .map_err(|e| SdkError::ValkeyContext { source: e, context: "GET payload failed".into() })?;
1498        let input_payload = payload.unwrap_or_default().into_bytes();
1499
1500        // Read execution_kind from core
1501        let kind: Option<String> = self
1502            .client
1503            .hget(&ctx.core(), "execution_kind")
1504            .await
1505            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET execution_kind failed".into() })?;
1506        let execution_kind = kind.unwrap_or_default();
1507
1508        // Read tags
1509        let tags: HashMap<String, String> = self
1510            .client
1511            .hgetall(&ctx.tags())
1512            .await
1513            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGETALL tags".into() })?;
1514
1515        Ok((input_payload, execution_kind, tags))
1516    }
1517
1518    // ── Phase 3: Signal delivery ──
1519
1520    /// Deliver a signal to a suspended execution's waitpoint.
1521    ///
1522    /// The engine atomically records the signal, evaluates the resume condition,
1523    /// and optionally transitions the execution from `suspended` to `runnable`.
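    ///
    /// A minimal sketch: the field values are illustrative, and the
    /// struct-literal form assumes the `Signal` fields read below are
    /// publicly constructible (use whatever constructor the crate exposes):
    ///
    /// ```rust,ignore
    /// let signal = Signal {
    ///     signal_name: "approval.granted".to_owned(),
    ///     signal_category: "external".to_owned(),
    ///     source_type: "api".to_owned(),
    ///     source_identity: "reviewer-7".to_owned(),
    ///     payload: Some(br#"{"approved":true}"#.to_vec()),
    ///     idempotency_key: Some("approval-42".to_owned()),
    ///     waitpoint_token,
    ///     /* ...any remaining fields... */
    /// };
    /// let outcome = worker.deliver_signal(&execution_id, &waitpoint_id, signal).await?;
    /// ```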
1524    pub async fn deliver_signal(
1525        &self,
1526        execution_id: &ExecutionId,
1527        waitpoint_id: &WaitpointId,
1528        signal: crate::task::Signal,
1529    ) -> Result<crate::task::SignalOutcome, SdkError> {
1530        let partition = ff_core::partition::execution_partition(execution_id, &self.partition_config);
1531        let ctx = ExecKeyContext::new(&partition, execution_id);
1532        let idx = IndexKeys::new(&partition);
1533
1534        let signal_id = ff_core::types::SignalId::new();
1535        let now = TimestampMs::now();
1536
1537        // Pre-read lane_id from exec_core — the execution may be on any lane,
1538        // not necessarily one of this worker's configured lanes.
1539        let lane_str: Option<String> = self
1540            .client
1541            .hget(&ctx.core(), "lane_id")
1542            .await
1543            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1544        let lane_id = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1545
1546        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
1547        //            exec_signals_zset, signal_hash, signal_payload,
1548        //            idem_key, waitpoint_hash, suspension_current,
1549        //            eligible_zset, suspended_zset, delayed_zset,
1550        //            suspension_timeout_zset, hmac_secrets
1551        let idem_key = if let Some(ref ik) = signal.idempotency_key {
1552            ctx.signal_dedup(waitpoint_id, ik)
1553        } else {
1554            ctx.noop() // must share {p:N} hash tag for cluster mode
1555        };
1556        let keys: Vec<String> = vec![
1557            ctx.core(),                                    // 1
1558            ctx.waitpoint_condition(waitpoint_id),         // 2
1559            ctx.waitpoint_signals(waitpoint_id),           // 3
1560            ctx.exec_signals(),                            // 4
1561            ctx.signal(&signal_id),                        // 5
1562            ctx.signal_payload(&signal_id),                // 6
1563            idem_key,                                      // 7
1564            ctx.waitpoint(waitpoint_id),                   // 8
1565            ctx.suspension_current(),                      // 9
1566            idx.lane_eligible(&lane_id),                   // 10
1567            idx.lane_suspended(&lane_id),                  // 11
1568            idx.lane_delayed(&lane_id),                    // 12
1569            idx.suspension_timeout(),                      // 13
1570            idx.waitpoint_hmac_secrets(),                  // 14
1571        ];
1572
1573        let payload_str = signal
1574            .payload
1575            .as_ref()
1576            .map(|p| String::from_utf8_lossy(p).into_owned())
1577            .unwrap_or_default();
1578
1579        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
1580        //            signal_category, source_type, source_identity,
1581        //            payload, payload_encoding, idempotency_key,
1582        //            correlation_id, target_scope, created_at,
1583        //            dedup_ttl_ms, resume_delay_ms, signal_maxlen,
1584        //            max_signals_per_execution, waitpoint_token
1585        let args: Vec<String> = vec![
1586            signal_id.to_string(),                           // 1
1587            execution_id.to_string(),                        // 2
1588            waitpoint_id.to_string(),                        // 3
1589            signal.signal_name,                              // 4
1590            signal.signal_category,                          // 5
1591            signal.source_type,                              // 6
1592            signal.source_identity,                          // 7
1593            payload_str,                                     // 8
1594            "json".to_owned(),                               // 9 payload_encoding
1595            signal.idempotency_key.unwrap_or_default(),      // 10
1596            String::new(),                                   // 11 correlation_id
1597            "waitpoint".to_owned(),                          // 12 target_scope
1598            now.to_string(),                                 // 13 created_at
1599            "86400000".to_owned(),                           // 14 dedup_ttl_ms
1600            "0".to_owned(),                                  // 15 resume_delay_ms
1601            "1000".to_owned(),                               // 16 signal_maxlen
1602            "10000".to_owned(),                              // 17 max_signals
1603            // WIRE BOUNDARY — raw token must reach Lua unredacted. Do NOT
1604            // use ToString/Display (those are redacted for log safety);
1605            // .as_str() is the explicit opt-in that gets the secret bytes.
1606            signal.waitpoint_token.as_str().to_owned(),      // 18 waitpoint_token
1607        ];
1608
1609        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1610        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1611
1612        let raw: Value = self
1613            .client
1614            .fcall("ff_deliver_signal", &key_refs, &arg_refs)
1615            .await
1616            .map_err(SdkError::Valkey)?;
1617
1618        crate::task::parse_signal_result(&raw)
1619    }
1620
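    /// Round-robin lane selector for the direct-claim path: each call
    /// advances the shared `lane_index` counter and wraps it over
    /// `config.lanes`. Relies on at least one configured lane; the modulo
    /// below would panic on an empty lane list.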
1621    #[cfg(feature = "direct-valkey-claim")]
1622    fn next_lane(&self) -> LaneId {
1623        let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
1624        self.config.lanes[idx].clone()
1625    }
1626}
1627
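/// True when a claim-path engine error is classified as `Retryable` or
/// `Informational`, i.e. a condition the direct-claim poll loop can back
/// off on and retry rather than surface. A usage sketch (the surrounding
/// loop shape is illustrative):
///
/// ```rust,ignore
/// if is_retryable_claim_error(&engine_err) {
///     // transient; back off and poll again
/// } else {
///     // non-retryable; surface the error to the caller
/// }
/// ```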
1628#[cfg(feature = "direct-valkey-claim")]
1629fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
1630    use ff_core::error::ErrorClass;
1631    matches!(
1632        ff_script::engine_error_ext::class(err),
1633        ErrorClass::Retryable | ErrorClass::Informational
1634    )
1635}
1636
1637/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
1638/// instance id with FNV-1a to place distinct worker processes on different
1639/// partition windows from their first poll. A fixed zero seed would be fine
1640/// for a single worker; the hash spreads work across multi-worker deployments.
1641#[cfg(feature = "direct-valkey-claim")]
1642fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
1643    if num_partitions == 0 {
1644        return 0;
1645    }
1646    (ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize) % num_partitions
1647}
1648
1649#[cfg(feature = "direct-valkey-claim")]
1650fn extract_first_array_string(value: &Value) -> Option<String> {
1651    match value {
1652        Value::Array(arr) if !arr.is_empty() => match &arr[0] {
1653            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1654            Ok(Value::SimpleString(s)) => Some(s.clone()),
1655            _ => None,
1656        },
1657        _ => None,
1658    }
1659}
1660
1661/// Read partition config from Valkey's `ff:config:partitions` hash.
1662/// Returns Err if the key doesn't exist or can't be read.
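///
/// The parser below expects the fields shown here, with 256/32/32 as the
/// fallback defaults. A sketch of seeding the hash by hand (the builder-style
/// `cmd` call mirrors the pattern used elsewhere in this file and is
/// illustrative only):
///
/// ```rust,ignore
/// client.cmd("HSET")
///     .arg("ff:config:partitions")
///     .arg("num_flow_partitions").arg("256")
///     .arg("num_budget_partitions").arg("32")
///     .arg("num_quota_partitions").arg("32")
///     .execute()
///     .await?;
/// ```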
1663async fn read_partition_config(client: &Client) -> Result<PartitionConfig, SdkError> {
1664    let key = ff_core::keys::global_config_partitions();
1665    let fields: HashMap<String, String> = client
1666        .hgetall(&key)
1667        .await
1668        .map_err(|e| SdkError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;
1669
1670    if fields.is_empty() {
1671        return Err(SdkError::Config {
1672            context: "read_partition_config".into(),
1673            field: None,
1674            message: "ff:config:partitions not found in Valkey".into(),
1675        });
1676    }
1677
1678    let parse = |field: &str, default: u16| -> u16 {
1679        fields
1680            .get(field)
1681            .and_then(|v| v.parse().ok())
1682            .filter(|&n: &u16| n > 0)
1683            .unwrap_or(default)
1684    };
1685
1686    Ok(PartitionConfig {
1687        num_flow_partitions: parse("num_flow_partitions", 256),
1688        num_budget_partitions: parse("num_budget_partitions", 32),
1689        num_quota_partitions: parse("num_quota_partitions", 32),
1690    })
1691}
1692
1693#[cfg(test)]
1694mod completion_accessor_type_tests {
1695    //! Type-level compile check that
1696    //! [`FlowFabricWorker::completion_backend`] returns an
1697    //! `Option<Arc<dyn CompletionBackend>>`. No Valkey required —
1698    //! the assertion lives in the function-pointer coercion and the
1699    //! #[test] body exists solely so that coercion gets type-checked.
1700    use super::FlowFabricWorker;
1701    use ff_core::completion_backend::CompletionBackend;
1702    use std::sync::Arc;
1703
1704    #[test]
1705    fn completion_backend_accessor_signature() {
1706        // If this line compiles, the public accessor returns the
1707        // advertised type. The function is never called (no live
1708        // worker), so no I/O happens.
1709        let _f: fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>> =
1710            FlowFabricWorker::completion_backend;
1711    }
1712}
1713
1714#[cfg(all(test, feature = "direct-valkey-claim"))]
1715mod scan_cursor_tests {
1716    use super::scan_cursor_seed;
1717
1718    #[test]
1719    fn stable_for_same_input() {
1720        assert_eq!(scan_cursor_seed("w1", 256), scan_cursor_seed("w1", 256));
1721    }
1722
1723    #[test]
1724    fn distinct_for_different_ids() {
1725        assert_ne!(scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
1726    }
1727
1728    #[test]
1729    fn bounded_by_partition_count() {
1730        for i in 0..100 {
1731            assert!(scan_cursor_seed(&format!("w{i}"), 256) < 256);
1732        }
1733    }
1734
1735    #[test]
1736    fn zero_partitions_returns_zero() {
1737        assert_eq!(scan_cursor_seed("w1", 0), 0);
1738    }
1739}