ff_sdk/worker.rs

use std::collections::HashMap;
#[cfg(feature = "direct-valkey-claim")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use ferriskey::{Client, ClientBuilder, Value};
use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::PartitionConfig;
use ff_core::types::*;
use tokio::sync::Semaphore;

use crate::config::WorkerConfig;
use crate::task::ClaimedTask;
use crate::SdkError;

/// FlowFabric worker — connects to Valkey, claims executions, and provides
/// the worker-facing API.
///
/// # Admission control
///
/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
/// **bypasses the scheduler's admission controls**: it reads the eligible
/// ZSET directly and mints its own claim grant without consulting budget
/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
/// benchmarks, tests, and single-tenant development where the scheduler
/// hop is measurement noise, not for production.
///
/// For production deployments, consume scheduler-issued grants via
/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
/// budget breach, quota sliding-window, concurrency cap, and
/// capability-match checks before issuing grants.
///
/// # Usage
///
/// ```rust,ignore
/// use std::time::Duration;
/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
///
/// let config = WorkerConfig::new("localhost", 6379, "w1", "w1-i1", "default", "main");
/// let worker = FlowFabricWorker::connect(config).await?;
///
/// loop {
///     if let Some(task) = worker.claim_next().await? {
///         // Process task...
///         task.complete(Some(b"result".to_vec())).await?;
///     } else {
///         tokio::time::sleep(Duration::from_secs(1)).await;
///     }
/// }
/// ```
pub struct FlowFabricWorker {
    client: Client,
    config: WorkerConfig,
    partition_config: PartitionConfig,
    /// Sorted, deduplicated, comma-separated capabilities — computed once
    /// from `config.capabilities` at connect time. Passed as ARGV[9] to
    /// `ff_issue_claim_grant` on every claim. BTreeSet sorting is critical:
    /// Lua's `ff_issue_claim_grant` relies on a stable CSV form for
    /// reproducible logs and tests.
    #[cfg(feature = "direct-valkey-claim")]
    worker_capabilities_csv: String,
    /// 8-hex FNV-1a digest of `worker_capabilities_csv`. Used in
    /// per-mismatch logs so the 4KB CSV never echoes on every reject
    /// during an incident. Full CSV logged once at connect-time INFO for
    /// cross-reference. Mirrors `ff-scheduler::claim::worker_caps_digest`.
    #[cfg(feature = "direct-valkey-claim")]
    worker_capabilities_hash: String,
    #[cfg(feature = "direct-valkey-claim")]
    lane_index: AtomicUsize,
    /// Concurrency cap for in-flight tasks. Permits are acquired or
    /// transferred by [`claim_next`] (feature-gated),
    /// [`claim_from_grant`] (always available), and
    /// [`claim_from_reclaim_grant`], transferred to the returned
    /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
    /// Holds `max_concurrent_tasks` permits total.
    ///
    /// [`claim_next`]: FlowFabricWorker::claim_next
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
    concurrency_semaphore: Arc<Semaphore>,
    /// Rolling offset for chunked partition scans. Each poll advances the
    /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
    /// chunk)` polls every partition is covered. The initial value is
    /// derived from `worker_instance_id` so idle workers spread their
    /// scans across different partitions from the first poll onward.
    ///
    /// Overflow: on 64-bit targets `usize` is 64 bits wide, so the cursor
    /// wraps only after ~2^64/32 polls (billions of years at any realistic
    /// rate). On 32-bit targets (wasm32, i686) `usize` is 32 bits wide and
    /// wraps after ~2^32/32 polls (~4 years at 1 poll/sec) — acceptable;
    /// on wrap, the modulo preserves correctness because the sequence
    /// simply restarts a new cycle.
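    ///
    /// A minimal sketch of the cursor arithmetic (condensed from
    /// `claim_next` below, not a verbatim excerpt):
    ///
    /// ```rust,ignore
    /// let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
    /// // fetch_add wraps on overflow, so the modulo stays well-defined.
    /// let start = scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;
    /// for step in 0..chunk {
    ///     let partition_idx = (start + step) % num_partitions;
    ///     // ... read the lane's eligible ZSET on partition_idx ...
    /// }
    /// ```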
    #[cfg(feature = "direct-valkey-claim")]
    scan_cursor: AtomicUsize,
    /// The [`EngineBackend`] the Stage-1b trait forwarders route
    /// through.
    ///
    /// **RFC-012 Stage 1b.** Always populated:
    /// [`FlowFabricWorker::connect`] now wraps the worker's own
    /// `ferriskey::Client` in a `ValkeyBackend` via
    /// `ValkeyBackend::from_client_and_partitions`, and
    /// [`FlowFabricWorker::connect_with`] replaces that default with
    /// the caller-supplied `Arc<dyn EngineBackend>`. The
    /// [`FlowFabricWorker::backend`] accessor still returns
    /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
    /// narrows the return type once consumers have migrated.
    ///
    /// Hot paths (claim, deliver_signal, admin queries) still use the
    /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
    /// migrates them through this field, and Stage 1d removes the
    /// embedded client.
    backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
    /// Optional upcast to the same underlying backend viewed as a
    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
    /// Populated by [`Self::connect`] from the bundled
    /// `ValkeyBackend` (which implements the trait); cleared by
    /// [`Self::connect_with`] because a caller-supplied
    /// `Arc<dyn EngineBackend>` cannot be re-upcast. Cairn and other
    /// completion-subscription consumers reach this through
    /// [`Self::completion_backend`].
    completion_backend_handle:
        Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
}

/// Number of partitions scanned per `claim_next()` poll. Keeps idle Valkey
/// load at O(PARTITION_SCAN_CHUNK) per worker-second instead of
/// O(num_flow_partitions).
#[cfg(feature = "direct-valkey-claim")]
const PARTITION_SCAN_CHUNK: usize = 32;

impl FlowFabricWorker {
    /// Connect to Valkey and prepare the worker.
    ///
    /// Establishes the ferriskey connection. Does NOT load the FlowFabric
    /// library — that is the server's responsibility (ff-server calls
    /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
    /// the library is already loaded.
    pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
        if config.lanes.is_empty() {
            return Err(SdkError::Config {
                context: "worker_config".into(),
                field: None,
                message: "at least one lane is required".into(),
            });
        }

        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_secs(5));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder.build()
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "failed to connect".into() })?;

        // Verify connectivity
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "PING failed".into() })?;
        if pong != "PONG" {
            return Err(SdkError::Config {
                context: "worker_connect".into(),
                field: None,
                message: format!("unexpected PING response: {pong}"),
            });
        }

        // Guard against two worker processes sharing the same
        // `worker_instance_id`. A duplicate instance would clobber each
        // other's lease_current/active_index entries and double-claim work.
        // SET NX on a liveness key with 2× lease TTL; if the key already
        // exists another process is live. The key auto-expires if this
        // process crashes without renewal, so a restart after a hard crash
        // just waits at most 2× lease_ttl_ms for the ghost entry to clear.
        //
        // Known limitations of this minimal scheme (documented for operators):
        //   1. **Startup-only, not runtime.** There is no heartbeat renewal
        //      path. After `2 × lease_ttl_ms` elapses the alive key expires
        //      naturally even while this worker is still running, and a
        //      second process with the same `worker_instance_id` launched
        //      later will successfully SET NX alongside the first. The check
        //      catches misconfiguration at boot; it does not fence duplicates
        //      that appear mid-lifetime. Production deployments should rely
        //      on the orchestrator (Kubernetes, systemd unit with
        //      `Restart=on-failure`, etc.) as the authoritative single-
        //      instance enforcer; this SET NX is belt-and-suspenders.
        //
        //   2. **Restart delay after a crash.** If a worker crashes
        //      ungracefully (SIGKILL, container OOM) and is restarted within
        //      `2 × lease_ttl_ms`, the alive key is still present and the
        //      new process exits with `SdkError::Config("duplicate
        //      worker_instance_id ...")`. Options for operators:
        //        - Wait `2 × lease_ttl_ms` (default 60s with the 30s TTL)
        //          before restarting.
        //        - Manually `DEL ff:worker:<instance_id>:alive` in Valkey to
        //          unblock the restart.
        //        - Use a fresh `worker_instance_id` for the restart (the
        //          orchestrator should already do this per-Pod).
        //
        //   3. **No graceful cleanup on shutdown.** There is no explicit
        //      `disconnect()` call that DELs the alive key. On clean
        //      `SIGTERM` the key lingers until its TTL expires. A follow-up
        //      can add `FlowFabricWorker::disconnect(self)` for callers that
        //      want to skip the restart-delay window.
        let alive_key = format!("ff:worker:{}:alive", config.worker_instance_id);
        let alive_ttl_ms = (config.lease_ttl_ms.saturating_mul(2)).max(1_000);
        let set_result: Option<String> = client
            .cmd("SET")
            .arg(&alive_key)
            .arg("1")
            .arg("NX")
            .arg("PX")
            .arg(alive_ttl_ms.to_string().as_str())
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext {
                source: e,
                context: "SET NX worker alive key".into(),
            })?;
        if set_result.is_none() {
            return Err(SdkError::Config {
                context: "worker_connect".into(),
                field: Some("worker_instance_id".into()),
                message: format!(
                    "duplicate worker_instance_id '{}': another process already holds {alive_key}",
                    config.worker_instance_id
                ),
            });
        }

        // Read partition config from Valkey (set by ff-server on startup).
        // Falls back to defaults if key doesn't exist (e.g. SDK-only testing).
        let partition_config = read_partition_config(&client).await
            .unwrap_or_else(|e| {
                tracing::warn!(
                    error = %e,
                    "ff:config:partitions not found, using defaults"
                );
                PartitionConfig::default()
            });

        let max_tasks = config.max_concurrent_tasks.max(1);
        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));

        tracing::info!(
            worker_id = %config.worker_id,
            instance_id = %config.worker_instance_id,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            "FlowFabricWorker connected"
        );

        #[cfg(feature = "direct-valkey-claim")]
        let scan_cursor_init = scan_cursor_seed(
            config.worker_instance_id.as_str(),
            partition_config.num_flow_partitions.max(1) as usize,
        );

        // Sort + dedupe capabilities into a stable CSV. BTreeSet both sorts
        // and deduplicates in one pass; string joining happens once here.
        //
        // Ingress validation mirrors Scheduler::claim_for_worker (ff-scheduler):
        //   - `,` is the CSV delimiter; a token containing one would split
        //     mid-parse and could let a {"gpu"} worker appear to satisfy
        //     {"gpu,cuda"} (silent auth bypass).
        //   - Empty strings would produce leading/adjacent commas on the
        //     wire and inflate token count for no semantic reason.
        //   - Control / whitespace chars: `"gpu "` vs `"gpu"` or
        //     `"gpu\n"` vs `"gpu"` produce silent mismatches that are
        //     miserable to debug. Reject control characters and whitespace
        //     at ingress so a typo fails loudly at connect instead of
        //     silently mis-routing forever.
        // Reject at boot so operator misconfig is loud, symmetric with the
        // scheduler path.
        #[cfg(feature = "direct-valkey-claim")]
        for cap in &config.capabilities {
            if cap.is_empty() {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: "capability token must not be empty".into(),
                });
            }
            if cap.contains(',') {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability token may not contain ',' (CSV delimiter): {cap:?}"
                    ),
                });
            }
            // Reject control characters and whitespace, ASCII and Unicode
            // alike (`char::is_control` / `char::is_whitespace` cover
            // 0x00-0x1F, 0x7F, space, tab, LF, CR, and friends). Printable
            // characters above 0x7F are ALLOWED so i18n caps like
            // "东京-gpu" can be used. The CSV wire form is byte-safe for
            // multibyte UTF-8 because `,` is always a single byte and
            // never part of a multibyte continuation (only 0x80-0xBF are
            // continuations, ',' is 0x2C).
            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability token must not contain whitespace or control \
                         characters: {cap:?}"
                    ),
                });
            }
        }
        #[cfg(feature = "direct-valkey-claim")]
        let worker_capabilities_csv: String = {
            let set: std::collections::BTreeSet<&str> = config
                .capabilities
                .iter()
                .map(|s| s.as_str())
                .filter(|s| !s.is_empty())
                .collect();
            if set.len() > ff_core::policy::CAPS_MAX_TOKENS {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
                        ff_core::policy::CAPS_MAX_TOKENS,
                        set.len()
                    ),
                });
            }
            let csv = set.into_iter().collect::<Vec<_>>().join(",");
            if csv.len() > ff_core::policy::CAPS_MAX_BYTES {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
                        ff_core::policy::CAPS_MAX_BYTES,
                        csv.len()
                    ),
                });
            }
            csv
        };

        // Short stable digest of the sorted caps CSV, computed once so
        // per-mismatch logs carry a stable identifier instead of the 4KB
        // CSV. Shared helper — ff-scheduler uses the same one for its
        // own per-mismatch logs, so cross-component log lines are
        // diffable against each other.
        #[cfg(feature = "direct-valkey-claim")]
        let worker_capabilities_hash = ff_core::hash::fnv1a_xor8hex(&worker_capabilities_csv);

        // Full CSV logged once at connect so per-mismatch logs (which
        // carry only the 8-hex hash) can be cross-referenced by ops.
        #[cfg(feature = "direct-valkey-claim")]
        if !worker_capabilities_csv.is_empty() {
            tracing::info!(
                worker_instance_id = %config.worker_instance_id,
                worker_caps_hash = %worker_capabilities_hash,
                worker_caps = %worker_capabilities_csv,
                "worker connected with capabilities (full CSV — mismatch logs use hash only)"
            );
        }

        // Non-authoritative advertisement of caps for operator visibility
        // (CLI introspection, dashboards). The AUTHORITATIVE source for
        // scheduling decisions is ARGV[9] on each claim — Lua reads ONLY
        // that, never this string. A lossy advertisement here is
        // correctness-safe.
        //
        // Storage: a single STRING key holding the sorted CSV. Rationale:
        //   * **Atomic overwrite.** `SET` is a single command — a concurrent
        //     reader can never observe a transient empty value (the prior
        //     DEL+SADD pair had that window).
        //   * **Crash cleanup without a refresh loop.** The alive-key SET NX
        //     is startup-only (see §1 above); there's no periodic renew
        //     to piggy-back on, so a TTL on caps would independently
        //     expire mid-flight and hide a live worker's caps from ops
        //     tools. Instead we drop the TTL: each reconnect overwrites;
        //     a crashed worker leaves a stale CSV until a new process
        //     with the same worker_instance_id boots (which triggers
        //     `duplicate worker_instance_id` via the alive-key guard
        //     anyway — the orchestrator allocates a new id, and operators
        //     can DEL the stale caps key if they care).
        //   * **Empty caps = DEL.** A restart from {gpu} to {} clears the
        //     advertisement rather than leaving stale data.
        // Cluster-safe advertisement: the per-worker caps STRING lives at
        // `ff:worker:{id}:caps` (lands on whatever slot CRC16 puts it on),
        // and the INSTANCE ID is SADD'd to the global workers-index SET
        // `ff:idx:workers` (single slot). The unblock scanner's cluster
        // enumeration uses SMEMBERS on the index + a per-member GET on each
        // caps key, instead of `SCAN MATCH ff:worker:*:caps` (which in
        // cluster mode only scans the shard the SCAN lands on and misses
        // workers whose key hashes elsewhere). Pattern mirrors Batch A
        // `budget_policies_index` / `flow_index` / `deps_all_edges`:
        // operations stay atomic per command, the index is the
        // cluster-wide enumeration surface.
        #[cfg(feature = "direct-valkey-claim")]
        {
            let caps_key = ff_core::keys::worker_caps_key(&config.worker_instance_id);
            let index_key = ff_core::keys::workers_index_key();
            let instance_id = config.worker_instance_id.to_string();
            if worker_capabilities_csv.is_empty() {
                // No caps advertised. DEL the per-worker caps string AND
                // SREM from the index so the scanner doesn't GET an empty
                // string for a worker that never declares caps.
                let _ = client
                    .cmd("DEL")
                    .arg(&caps_key)
                    .execute::<Option<i64>>()
                    .await;
                if let Err(e) = client
                    .cmd("SREM")
                    .arg(&index_key)
                    .arg(&instance_id)
                    .execute::<Option<i64>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
                        "SREM workers-index failed; continuing (non-authoritative)");
                }
            } else {
                // Atomic overwrite of the caps STRING (one command). Then
                // SADD to the index (idempotent — re-running connect for
                // the same id is a no-op at SADD level). The per-worker
                // caps key is written BEFORE the index SADD so that when
                // the scanner observes the id in the index, the caps key
                // is guaranteed to resolve to a non-stale CSV (the reverse
                // order would leak an index entry pointing at a stale or
                // empty caps key during a narrow window).
                if let Err(e) = client
                    .cmd("SET")
                    .arg(&caps_key)
                    .arg(&worker_capabilities_csv)
                    .execute::<Option<String>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %caps_key,
                        "SET worker caps advertisement failed; continuing");
                }
                if let Err(e) = client
                    .cmd("SADD")
                    .arg(&index_key)
                    .arg(&instance_id)
                    .execute::<Option<i64>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
                        "SADD workers-index failed; continuing");
                }
            }
        }

        // RFC-012 Stage 1b: wrap the dialed client in a
        // ValkeyBackend so `ClaimedTask`'s trait forwarders have
        // something to call. `from_client_and_partitions` reuses the
        // already-dialed client — no second connection.
        // Share the concrete `Arc<ValkeyBackend>` across the two
        // trait objects — one allocation, both accessors yield
        // identity-equivalent handles.
        let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
            ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
                client.clone(),
                partition_config,
            );
        let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
        let completion_backend_handle: Option<
            Arc<dyn ff_core::completion_backend::CompletionBackend>,
        > = Some(valkey_backend);

        Ok(Self {
            client,
            config,
            partition_config,
            #[cfg(feature = "direct-valkey-claim")]
            worker_capabilities_csv,
            #[cfg(feature = "direct-valkey-claim")]
            worker_capabilities_hash,
            #[cfg(feature = "direct-valkey-claim")]
            lane_index: AtomicUsize::new(0),
            concurrency_semaphore,
            #[cfg(feature = "direct-valkey-claim")]
            scan_cursor: AtomicUsize::new(scan_cursor_init),
            backend,
            completion_backend_handle,
        })
    }

    /// Store a pre-built [`EngineBackend`] on the worker. Builds
    /// the worker via the legacy [`FlowFabricWorker::connect`] path
    /// first (so the embedded `ferriskey::Client` that the Stage 1b
    /// non-migrated hot paths still use is dialed), then replaces
    /// the default `ValkeyBackend` wrapper with the caller-supplied
    /// `Arc<dyn EngineBackend>`.
    ///
    /// **Stage 1b scope — what the injected backend covers today.**
    /// After this PR, `ClaimedTask`'s 8 migrated ops
    /// (`renew_lease` / `update_progress` / `resume_signals` /
    /// `delay_execution` / `move_to_waiting_children` /
    /// `complete` / `cancel` / `fail`) route through the injected
    /// backend. That's the first material use of `connect_with`: a
    /// mock backend now genuinely sees the worker's per-task
    /// write-surface calls. The 4 trait-shape-deferred ops
    /// (`create_pending_waitpoint`, `append_frame`, `suspend`,
    /// `report_usage`) still reach the embedded
    /// `ferriskey::Client` directly until issue #117's trait
    /// amendment lands; `claim_next` / `claim_from_grant` /
    /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
    /// are Stage 1c hot-path work. Stage 1d removes the embedded
    /// client entirely.
    ///
    /// Today's constructor is therefore NOT yet a drop-in way to swap
    /// in a non-Valkey backend — it requires a reachable Valkey node
    /// for the 4 deferred + hot-path ops. Tests that exercise only
    /// the 8 migrated ops can run fully against a mock backend.
    ///
    /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
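    ///
    /// A minimal sketch (`MockBackend` is a hypothetical
    /// [`EngineBackend`] implementation; any impl works):
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use ff_core::engine_backend::EngineBackend;
    ///
    /// let backend: Arc<dyn EngineBackend> = Arc::new(MockBackend::default());
    /// let worker = FlowFabricWorker::connect_with(config, backend).await?;
    /// // The caller-supplied backend owns the surface now:
    /// assert!(worker.completion_backend().is_none());
    /// ```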
    pub async fn connect_with(
        config: WorkerConfig,
        backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
    ) -> Result<Self, SdkError> {
        let mut worker = Self::connect(config).await?;
        worker.backend = backend;
        // The caller-supplied trait object cannot be upcast to
        // `CompletionBackend` without loss of identity; clear the
        // default `ValkeyBackend`-derived handle so
        // `completion_backend()` reports `None`, reflecting that
        // the caller-supplied backend owns the surface now.
        worker.completion_backend_handle = None;
        Ok(worker)
    }

    /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
    /// ops through.
    ///
    /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
    /// the `Option` wrapper is retained for API stability with the
    /// Stage-1a shape. Stage 1c narrows the return type to
    /// `&Arc<dyn EngineBackend>`.
    pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
        Some(&self.backend)
    }

    /// Handle to the completion-event subscription backend, for
    /// consumers that need to observe execution completions (DAG
    /// reconcilers, tenant-isolated subscribers).
    ///
    /// Returns `Some` when the worker was built through
    /// [`Self::connect`] on the default `valkey-default` feature
    /// (the bundled `ValkeyBackend` implements
    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)).
    /// Returns `None` when the worker was built via
    /// [`Self::connect_with`] (the caller-supplied
    /// `Arc<dyn EngineBackend>` cannot be re-upcast to
    /// `CompletionBackend`), or for hypothetical future backends
    /// that don't support push-based completion streams.
    ///
    /// The returned handle shares the same underlying allocation as
    /// [`Self::backend`]; calls through it (e.g.
    /// `subscribe_completions_filtered`) hit the same connection
    /// the worker itself uses.
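    ///
    /// A minimal sketch (`subscribe_completions_filtered` is named above;
    /// its exact signature lives on the `CompletionBackend` trait):
    ///
    /// ```rust,ignore
    /// if let Some(completions) = worker.completion_backend() {
    ///     // Same underlying connection as `worker.backend()`.
    ///     let mut stream = completions
    ///         .subscribe_completions_filtered(/* tenant / flow filter */)
    ///         .await?;
    ///     while let Some(event) = stream.next().await {
    ///         // Reconcile DAG children, notify subscribers, etc.
    ///     }
    /// }
    /// ```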
    pub fn completion_backend(
        &self,
    ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
        self.completion_backend_handle.clone()
    }

    /// Get a reference to the underlying ferriskey client.
    pub fn client(&self) -> &Client {
        &self.client
    }

    /// Get the worker config.
    pub fn config(&self) -> &WorkerConfig {
        &self.config
    }

    /// Get the server-published partition config this worker bound to at
    /// `connect()`. Callers need this when computing partition hash-tags
    /// for direct-client reads (e.g. ff-sdk::read_stream) to stay aligned
    /// with the server's `num_flow_partitions` — using
    /// `PartitionConfig::default()` assumes 256 partitions and silently
    /// misses data on deployments with any other value.
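    ///
    /// A minimal sketch (`partition_for_execution` is a hypothetical
    /// stand-in; the real hashing lives in `ff_core::partition`):
    ///
    /// ```rust,ignore
    /// let pc = worker.partition_config();
    /// // Always derive partition indices from the server-published count,
    /// // never from PartitionConfig::default().
    /// let partition = partition_for_execution(&execution_id, pc.num_flow_partitions);
    /// ```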
    pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
        &self.partition_config
    }

    /// Attempt to claim the next eligible execution.
    ///
    /// Phase 1 simplified claim flow:
    /// 1. Pick a lane (round-robin across configured lanes)
    /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
    /// 3. Claim the execution via `ff_claim_execution`
    /// 4. Read execution payload + tags
    /// 5. Return a [`ClaimedTask`] with auto lease renewal
    ///
    /// Gated behind the `direct-valkey-claim` feature — bypasses the
    /// scheduler's budget / quota / capability admission checks. Enable
    /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
    /// the scheduler hop would be measurement noise (benches) or when
    /// the test harness needs a deterministic worker-local path. Prefer
    /// the scheduler-routed HTTP claim path in production.
    ///
    /// # `None` semantics
    ///
    /// `Ok(None)` means **no work was found in the partition window this
    /// poll covered**, not "the cluster is idle". Each call scans a chunk
    /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
    /// `scan_cursor`; the cursor advances by that chunk size on every
    /// invocation, so a worker covers every partition at least once every
    /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
    ///
    /// Callers should treat `None` as "poll again soon" (typically after
    /// `config.claim_poll_interval_ms`) rather than "sleep for a long
    /// time". Backing off too aggressively on `None` can starve workers
    /// when work lives on partitions outside the current window.
    ///
    /// Returns `Err` on Valkey errors or script failures.
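    ///
    /// A minimal polling sketch (assumes the `claim_poll_interval_ms`
    /// config field named above and a caller-defined `handle`):
    ///
    /// ```rust,ignore
    /// loop {
    ///     match worker.claim_next().await? {
    ///         Some(task) => handle(task).await?,
    ///         // None means this partition window was empty, not "cluster
    ///         // idle"; poll again soon so other windows get scanned.
    ///         None => tokio::time::sleep(Duration::from_millis(
    ///             worker.config().claim_poll_interval_ms,
    ///         )).await,
    ///     }
    /// }
    /// ```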
    #[cfg(feature = "direct-valkey-claim")]
    pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
        // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
        // try_acquire returns immediately — if no permits are available, the
        // worker is at capacity and should not claim more work.
        let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(_) => return Ok(None), // At capacity — no claim attempted
        };

        let lane_id = self.next_lane();
        let now = TimestampMs::now();

        // Phase 1: we scan eligible executions directly by reading the eligible
        // ZSET across execution partitions. In production the scheduler
        // (ff-scheduler) would handle this. For Phase 1, the SDK does a
        // simplified inline claim.
        //
        // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
        // partitions starting at a rolling offset. This keeps idle Valkey
        // load at O(chunk) per worker-second instead of O(num_partitions),
        // and the worker-instance-seeded initial cursor spreads concurrent
        // workers across different partition windows.
        let num_partitions = self.partition_config.num_flow_partitions as usize;
        if num_partitions == 0 {
            return Ok(None);
        }
        let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
        let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;

        for step in 0..chunk {
            let partition_idx = ((start + step) % num_partitions) as u16;
            let partition = ff_core::partition::Partition {
                family: ff_core::partition::PartitionFamily::Execution,
                index: partition_idx,
            };
            let idx = IndexKeys::new(&partition);
            let eligible_key = idx.lane_eligible(&lane_id);

            // ZRANGEBYSCORE to get the highest-priority eligible execution.
            // Score format: -(priority * 1_000_000_000_000) + created_at_ms.
            // ZRANGEBYSCORE with "-inf" "+inf" LIMIT 0 1 gives lowest score = highest priority.
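            // Worked example with illustrative numbers: priority 3 created at
            // t = 1_700_000_000_000 ms scores -(3 * 1e12) + 1.7e12 = -1.3e12;
            // priority 2 at the same instant scores -0.3e12, so the
            // priority-3 entry has the lower score and is returned first.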
            let result: Value = self
                .client
                .cmd("ZRANGEBYSCORE")
                .arg(&eligible_key)
                .arg("-inf")
                .arg("+inf")
                .arg("LIMIT")
                .arg("0")
                .arg("1")
                .execute()
                .await
                .map_err(|e| SdkError::ValkeyContext { source: e, context: "ZRANGEBYSCORE failed".into() })?;

            let execution_id_str = match extract_first_array_string(&result) {
                Some(s) => s,
                None => continue, // No eligible executions on this partition
            };

            let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
                SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "claim_execution_from_eligible_set".into(),
                    execution_id: None,
                    message: format!("bad execution_id in eligible set: {e}"),
                })
            })?;

            // Step 1: Issue claim grant
            let grant_result = self
                .issue_claim_grant(&execution_id, &lane_id, &partition, &idx)
                .await;

            match grant_result {
                Ok(()) => {}
                Err(SdkError::Engine(ref boxed))
                    if matches!(
                        **boxed,
                        crate::EngineError::Validation {
                            kind: crate::ValidationKind::CapabilityMismatch,
                            ..
                        }
                    ) =>
                {
                    let missing = match &**boxed {
                        crate::EngineError::Validation { detail, .. } => detail.clone(),
                        _ => unreachable!(),
                    };
                    // Block-on-mismatch (RFC-009 §7.5) — parity with
                    // ff-scheduler's Scheduler::claim_for_worker. Without
                    // this, the inline-direct-claim path would hot-loop
                    // on an unclaimable top-of-zset (every tick picks the
                    // same execution, wastes an FCALL, logs, releases,
                    // repeats). The scheduler-side unblock scanner
                    // promotes blocked_route executions back to eligible
                    // when a worker with matching caps registers.
                    tracing::info!(
                        execution_id = %execution_id,
                        worker_id = %self.config.worker_id,
                        worker_caps_hash = %self.worker_capabilities_hash,
                        missing = %missing,
                        "capability mismatch, blocking execution off eligible (SDK inline claim)"
                    );
                    self.block_route(&execution_id, &lane_id, &partition, &idx).await;
                    continue;
                }
                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        error = %e,
                        "claim grant failed (retryable), trying next partition"
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }

            // Step 2: Claim the execution
            match self
                .claim_execution(&execution_id, &lane_id, &partition, now)
                .await
            {
                Ok(mut task) => {
                    // Transfer the concurrency permit to the task. When the task
                    // is completed/failed/cancelled/dropped the permit returns to
                    // the semaphore, allowing another claim.
                    task.set_concurrency_permit(permit);
                    return Ok(Some(task));
                }
                Err(SdkError::Engine(ref boxed))
                    if matches!(
                        **boxed,
                        crate::EngineError::Contention(
                            crate::ContentionKind::UseClaimResumedExecution
                        )
                    ) =>
                {
                    // Execution was resumed from suspension — attempt_interrupted.
                    // ff_claim_execution rejects this; use ff_claim_resumed_execution,
                    // which reuses the existing attempt instead of creating a new one.
                    tracing::debug!(
                        execution_id = %execution_id,
                        "execution is resumed, using claim_resumed path"
                    );
                    match self
                        .claim_resumed_execution(&execution_id, &lane_id, &partition)
                        .await
                    {
                        Ok(mut task) => {
                            task.set_concurrency_permit(permit);
                            return Ok(Some(task));
                        }
                        Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
                            tracing::debug!(
                                execution_id = %execution_id,
                                error = %e2,
                                "claim_resumed failed (retryable), trying next partition"
                            );
                            continue;
                        }
                        Err(e2) => return Err(e2),
                    }
                }
                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        error = %e,
                        "claim execution failed (retryable), trying next partition"
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        }

        // No eligible work found on any scanned partition
        Ok(None)
    }

    #[cfg(feature = "direct-valkey-claim")]
    async fn issue_claim_grant(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        idx: &IndexKeys,
    ) -> Result<(), SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);

        // KEYS (3): exec_core, claim_grant_key, eligible_zset
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.claim_grant(),
            idx.lane_eligible(lane_id),
        ];

        // ARGV (9): eid, worker_id, worker_instance_id, lane_id,
        //           capability_hash, grant_ttl_ms, route_snapshot_json,
        //           admission_summary, worker_capabilities_csv (sorted)
        let args: Vec<String> = vec![
            execution_id.to_string(),
            self.config.worker_id.to_string(),
            self.config.worker_instance_id.to_string(),
            lane_id.to_string(),
            String::new(),                        // capability_hash
            "5000".to_owned(),                    // grant_ttl_ms (5 seconds)
            String::new(),                        // route_snapshot_json
            String::new(),                        // admission_summary
            self.worker_capabilities_csv.clone(), // sorted CSV
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_issue_claim_grant", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        crate::task::parse_success_result(&raw, "ff_issue_claim_grant")
    }

    /// Move an execution from the lane's eligible ZSET into its
    /// blocked_route ZSET via `ff_block_execution_for_admission`. Called
    /// after a `CapabilityMismatch` reject — without this, the inline
    /// direct-claim path would re-pick the same top-of-zset every tick
    /// (the same pattern the scheduler's block_candidate handles). The
    /// engine's unblock scanner periodically promotes blocked_route
    /// back to eligible once a worker with matching caps registers.
    ///
    /// Best-effort: transport or logical rejects (e.g. the execution
    /// already went terminal between pick and block) are logged and the
    /// outer loop simply `continue`s to the next partition. Parity with
    /// ff-scheduler::Scheduler::block_candidate.
    #[cfg(feature = "direct-valkey-claim")]
    async fn block_route(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        idx: &IndexKeys,
    ) {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let core_key = ctx.core();
        let eligible_key = idx.lane_eligible(lane_id);
        let blocked_key = idx.lane_blocked_route(lane_id);
        let eid_s = execution_id.to_string();
        let now_ms = TimestampMs::now().0.to_string();

        let keys: [&str; 3] = [&core_key, &eligible_key, &blocked_key];
        let argv: [&str; 4] = [
            &eid_s,
            "waiting_for_capable_worker",
            "no connected worker satisfies required_capabilities",
            &now_ms,
        ];

        match self
            .client
            .fcall::<Value>("ff_block_execution_for_admission", &keys, &argv)
            .await
        {
            Ok(v) => {
                // Parse the Lua result so a logical reject (e.g. execution
                // went terminal mid-flight) is visible — same fix we
                // applied to ff-scheduler's block_candidate.
                if let Err(e) = crate::task::parse_success_result(&v, "ff_block_execution_for_admission") {
                    tracing::warn!(
                        execution_id = %execution_id,
                        error = %e,
                        "SDK block_route: Lua rejected; eligible ZSET unchanged, next poll \
                         will re-evaluate"
                    );
                }
            }
            Err(e) => {
                tracing::warn!(
                    execution_id = %execution_id,
                    error = %e,
                    "SDK block_route: transport failure; eligible ZSET unchanged"
                );
            }
        }
    }

    /// Low-level claim of a granted execution. Invokes
    /// `ff_claim_execution` and returns a `ClaimedTask` with auto
    /// lease renewal.
    ///
    /// Previously gated behind `direct-valkey-claim`; ungated so
    /// the public [`claim_from_grant`] entry point can reuse the
    /// same FCALL plumbing. The method stays private — external
    /// callers use `claim_from_grant`.
    ///
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    async fn claim_execution(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        _now: TimestampMs,
    ) -> Result<ClaimedTask, SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let idx = IndexKeys::new(partition);

        // Pre-read total_attempt_count from exec_core to derive next attempt index.
        // The Lua uses total_attempt_count as the new index and dynamically builds
        // the attempt key from the hash tag, so KEYS[6-8] are placeholders, but
        // we pass the correct index for documentation/debugging.
        let total_str: Option<String> = self.client
            .cmd("HGET")
            .arg(ctx.core())
            .arg("total_attempt_count")
            .execute()
            .await
            .unwrap_or(None);
        let next_idx = total_str
            .as_deref()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(0);
        let att_idx = AttemptIndex::new(next_idx);

        let lease_id = LeaseId::new().to_string();
        let attempt_id = AttemptId::new().to_string();
        let renew_before_ms = self.config.lease_ttl_ms * 2 / 3;

        // KEYS (14): must match lua/execution.lua ff_claim_execution positional order
        let keys: Vec<String> = vec![
            ctx.core(),                                    // 1  exec_core
            ctx.claim_grant(),                             // 2  claim_grant
            idx.lane_eligible(lane_id),                    // 3  eligible_zset
            idx.lease_expiry(),                            // 4  lease_expiry_zset
            idx.worker_leases(&self.config.worker_instance_id), // 5  worker_leases
            ctx.attempt_hash(att_idx),                     // 6  attempt_hash (placeholder)
            ctx.attempt_usage(att_idx),                    // 7  attempt_usage (placeholder)
            ctx.attempt_policy(att_idx),                   // 8  attempt_policy (placeholder)
            ctx.attempts(),                                // 9  attempts_zset
            ctx.lease_current(),                           // 10 lease_current
            ctx.lease_history(),                           // 11 lease_history
            idx.lane_active(lane_id),                      // 12 active_index
            idx.attempt_timeout(),                         // 13 attempt_timeout_zset
            idx.execution_deadline(),                      // 14 execution_deadline_zset
        ];

        // ARGV (12): must match lua/execution.lua ff_claim_execution positional order
        let args: Vec<String> = vec![
            execution_id.to_string(),                      // 1  execution_id
            self.config.worker_id.to_string(),             // 2  worker_id
            self.config.worker_instance_id.to_string(),    // 3  worker_instance_id
            lane_id.to_string(),                           // 4  lane
            String::new(),                                 // 5  capability_hash
            lease_id.clone(),                              // 6  lease_id
            self.config.lease_ttl_ms.to_string(),          // 7  lease_ttl_ms
            renew_before_ms.to_string(),                   // 8  renew_before_ms
            attempt_id.clone(),                            // 9  attempt_id
            "{}".to_owned(),                               // 10 attempt_policy_json
            String::new(),                                 // 11 attempt_timeout_ms
            String::new(),                                 // 12 execution_deadline_at
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_claim_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        // Parse claim result: {1, "OK", lease_id, lease_epoch, lease_expires_at,
        //                      attempt_id, attempt_index, attempt_type}
        let arr = match &raw {
            Value::Array(arr) => arr,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "expected Array".into(),
                }));
            }
        };

        let status_code = match arr.first() {
            Some(Ok(Value::Int(n))) => *n,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "bad status code".into(),
                }));
            }
        };

        if status_code != 1 {
            let err_field_str = |idx: usize| -> String {
                arr.get(idx)
                    .and_then(|v| match v {
                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                        Ok(Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default()
            };
            let error_code = {
                let s = err_field_str(1);
                if s.is_empty() { "unknown".to_owned() } else { s }
            };
            let detail = err_field_str(2);

            return Err(SdkError::from(
                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
                        fcall: "ff_claim_execution".into(),
                        execution_id: Some(execution_id.to_string()),
                        message: format!("unknown error: {error_code}"),
                    }),
            ));
        }

        // Extract fields from success response
        let field_str = |idx: usize| -> String {
            arr.get(idx + 2) // skip status_code and "OK"
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default()
        };

        // Lua returns: ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
        // Positions:       0         1       2           3            4              5
        let lease_id = LeaseId::parse(&field_str(0))
            .unwrap_or_else(|_| LeaseId::new());
        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
        // field_str(2) is expires_at — skip it (lease timing managed by renewal)
        let attempt_id = AttemptId::parse(&field_str(3))
            .unwrap_or_else(|_| AttemptId::new());
        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));

        // Read execution payload and metadata
        let (input_payload, execution_kind, tags) = self
            .read_execution_context(execution_id, partition)
            .await?;

        Ok(ClaimedTask::new(
            self.client.clone(),
            self.backend.clone(),
            self.partition_config,
            execution_id.clone(),
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            self.config.lease_ttl_ms,
            lane_id.clone(),
            self.config.worker_instance_id.clone(),
            input_payload,
            execution_kind,
            tags,
        ))
    }

    /// Consume a [`ClaimGrant`] and claim the granted execution on
    /// this worker. The intended production entry point: pair with
    /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
    /// scheduler-issued grants into the SDK without enabling the
    /// `direct-valkey-claim` feature (which bypasses budget/quota
    /// admission control).
    ///
    /// The worker's concurrency semaphore is checked BEFORE the FCALL
    /// so a saturated worker does not consume the grant: the grant
    /// stays valid for its remaining TTL and the caller can either
    /// release it back to the scheduler or retry after some other
    /// in-flight task completes.
    ///
    /// On success the returned [`ClaimedTask`] holds a concurrency
    /// permit that releases automatically on
    /// `complete`/`fail`/`cancel`/drop — same contract as
    /// `claim_next`.
    ///
    /// # Arguments
    ///
    /// * `lane` — the lane the grant was issued for. Must match what
    ///   was passed to `Scheduler::claim_for_worker`; the Lua FCALL
    ///   uses it to look up `lane_eligible`, `lane_active`, and the
    ///   `worker_leases` index slot.
    /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
    ///
    /// # Errors
    ///
    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
    ///   permits all held. Retryable; the grant is untouched.
    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
    ///   or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
    ///   (wrapped in [`SdkError::Engine`]).
    /// * `ScriptError::CapabilityMismatch` — execution's required
    ///   capabilities not a subset of this worker's caps (wrapped in
    ///   [`SdkError::Engine`]). Surfaced post-grant if a race
    ///   between grant issuance and caps change allows it.
    /// * `ScriptError::Parse` — `ff_claim_execution` returned an
    ///   unexpected shape (wrapped in [`SdkError::Engine`]).
    /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
    ///   transport error during the FCALL or the
    ///   `read_execution_context` follow-up.
    ///
    /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
    /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
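    ///
    /// A minimal sketch of the consumption side (`handle` and
    /// `requeue_grant` are caller-defined placeholders; obtaining the
    /// grant from the scheduler is shown in [`Self::claim_via_server`]):
    ///
    /// ```rust,ignore
    /// match worker.claim_from_grant(lane.clone(), grant).await {
    ///     Ok(task) => handle(task).await?,
    ///     // Saturated: the grant was NOT consumed. Retry after an
    ///     // in-flight task completes, or release it back to the scheduler.
    ///     Err(SdkError::WorkerAtCapacity) => requeue_grant(),
    ///     Err(e) => return Err(e),
    /// }
    /// ```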
    pub async fn claim_from_grant(
        &self,
        lane: LaneId,
        grant: ff_core::contracts::ClaimGrant,
    ) -> Result<ClaimedTask, SdkError> {
        // Semaphore check FIRST. If the worker is saturated we must
        // surface the condition to the caller without touching the
        // grant — silently returning Ok(None) (as claim_next does)
        // would drop a grant the scheduler has already committed work
        // to issuing, wasting the slot until its TTL elapses.
        let permit = self
            .concurrency_semaphore
            .clone()
            .try_acquire_owned()
            .map_err(|_| SdkError::WorkerAtCapacity)?;

        let now = TimestampMs::now();
        let partition = grant.partition().map_err(|e| SdkError::Config {
            context: "claim_from_grant".to_owned(),
            field: Some("partition_key".to_owned()),
            message: e.to_string(),
        })?;
        let mut task = self
            .claim_execution(&grant.execution_id, &lane, &partition, now)
            .await?;
        task.set_concurrency_permit(permit);
        Ok(task)
    }

    /// Scheduler-routed claim: POSTs to the server's
    /// `/v1/workers/{id}/claim` endpoint, then chains to
    /// [`Self::claim_from_grant`].
    ///
    /// Batch C item 2 PR-B. This is the production entry point —
    /// budget, quota, and capability admission run server-side inside
    /// `ff_scheduler::Scheduler::claim_for_worker`, so callers do not
    /// need to enable the `direct-valkey-claim` feature.
    ///
    /// Returns `Ok(None)` when the server reports no eligible
    /// execution (HTTP 204). Callers typically back off by
    /// `config.claim_poll_interval_ms` and try again, at the same
    /// cadence as the direct-claim path's `Ok(None)`.
    ///
    /// The `admin` client is the established HTTP surface
    /// (`FlowFabricAdminClient`), reused here so workers don't keep a
    /// second reqwest client around. Build it once at worker boot and
    /// pass it in by reference on every claim.
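    ///
    /// # Example
    ///
    /// A schematic polling loop. The admin-client construction is
    /// elided, and the 5s grant TTL / 500ms back-off are illustrative
    /// values, not defaults:
    ///
    /// ```rust,ignore
    /// loop {
    ///     match worker.claim_via_server(&admin, &lane, 5_000).await? {
    ///         Some(task) => {
    ///             // Process task...
    ///             task.complete(None).await?;
    ///         }
    ///         // HTTP 204: nothing eligible; back off and re-poll.
    ///         None => tokio::time::sleep(Duration::from_millis(500)).await,
    ///     }
    /// }
    /// ```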
    pub async fn claim_via_server(
        &self,
        admin: &crate::FlowFabricAdminClient,
        lane: &LaneId,
        grant_ttl_ms: u64,
    ) -> Result<Option<ClaimedTask>, SdkError> {
        let req = crate::admin::ClaimForWorkerRequest {
            worker_id: self.config.worker_id.to_string(),
            lane_id: lane.to_string(),
            worker_instance_id: self.config.worker_instance_id.to_string(),
            capabilities: self.config.capabilities.clone(),
            grant_ttl_ms,
        };
        let Some(resp) = admin.claim_for_worker(req).await? else {
            return Ok(None);
        };
        let grant = resp.into_grant()?;
        self.claim_from_grant(lane.clone(), grant).await.map(Some)
    }

    /// Consume a [`ReclaimGrant`] and transition the granted
    /// `attempt_interrupted` execution into a `started` state on this
    /// worker. Symmetric partner to [`claim_from_grant`] for the
    /// resume path.
    ///
    /// The grant must have been issued to THIS worker (matching
    /// `worker_id` at grant time). A mismatch returns
    /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
    /// atomically by `ff_claim_resumed_execution`; a second call with
    /// the same grant also returns `InvalidClaimGrant`.
    ///
    /// # Concurrency
    ///
    /// The worker's concurrency semaphore is checked BEFORE the FCALL
    /// (same contract as [`claim_from_grant`]). Reclaim does NOT
    /// assume pre-existing capacity on this worker — a reclaim can
    /// land on a fresh worker instance that just came up after a
    /// crash/restart and is picking up a previously-interrupted
    /// execution. If the worker is saturated, the grant stays valid
    /// for its remaining TTL and the caller can release it or retry.
    ///
    /// On success the returned [`ClaimedTask`] holds a concurrency
    /// permit that releases automatically on
    /// `complete`/`fail`/`cancel`/drop.
    ///
    /// # Errors
    ///
    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
    ///   permits all held. Retryable; the grant is untouched (no
    ///   FCALL was issued, so `ff_claim_resumed_execution` did not
    ///   atomically consume the grant key).
    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
    ///   or `worker_id` mismatch.
    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
    /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
    ///   `attempt_interrupted`.
    /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
    ///   not `runnable`.
    /// * `ScriptError::ExecutionNotFound` — core key missing.
    /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
    ///   transport.
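    ///
    /// # Example
    ///
    /// A sketch of the resume path. How the [`ReclaimGrant`] reaches
    /// this worker (scheduler push, reclaim poll, ...) is
    /// deployment-specific and elided:
    ///
    /// ```rust,ignore
    /// // `grant` targets an attempt_interrupted execution and was issued
    /// // to THIS worker; lane and partition travel inside the grant.
    /// match worker.claim_from_reclaim_grant(grant).await {
    ///     Ok(task) => {
    ///         // Resume the interrupted work, then settle as usual.
    ///         task.complete(None).await?;
    ///     }
    ///     Err(ff_sdk::SdkError::WorkerAtCapacity) => {
    ///         // No FCALL fired, so the grant survives for its TTL.
    ///     }
    ///     Err(e) => return Err(e.into()),
    /// }
    /// ```
    ///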
    /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    pub async fn claim_from_reclaim_grant(
        &self,
        grant: ff_core::contracts::ReclaimGrant,
    ) -> Result<ClaimedTask, SdkError> {
        // Semaphore check FIRST — same load-bearing ordering as
        // `claim_from_grant`. If the worker is saturated, surface
        // WorkerAtCapacity without firing the FCALL; the FCALL is an
        // atomic consume on the grant key, so calling it past
        // saturation would destroy the grant while leaving no
        // permit to attach to the returned `ClaimedTask`.
        let permit = self
            .concurrency_semaphore
            .clone()
            .try_acquire_owned()
            .map_err(|_| SdkError::WorkerAtCapacity)?;

        // Grant carries partition + lane_id so no round-trip is needed
        // to resolve them before the FCALL.
        let partition = grant.partition().map_err(|e| SdkError::Config {
            context: "claim_from_reclaim_grant".to_owned(),
            field: Some("partition_key".to_owned()),
            message: e.to_string(),
        })?;
        let mut task = self
            .claim_resumed_execution(
                &grant.execution_id,
                &grant.lane_id,
                &partition,
            )
            .await?;
        task.set_concurrency_permit(permit);
        Ok(task)
    }

    /// Low-level resume claim. Invokes `ff_claim_resumed_execution`
    /// and returns a `ClaimedTask` bound to the resumed attempt.
    ///
    /// Previously gated behind `direct-valkey-claim`; ungated so the
    /// public [`claim_from_reclaim_grant`] entry point can reuse it.
    /// The method stays private — external callers use
    /// `claim_from_reclaim_grant`.
    ///
    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
    async fn claim_resumed_execution(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
    ) -> Result<ClaimedTask, SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let idx = IndexKeys::new(partition);

        // Pre-read current_attempt_index for the existing attempt hash key.
        // This is load-bearing: KEYS[6] must point to the real attempt hash.
        let att_idx_str: Option<String> = self.client
            .cmd("HGET")
            .arg(ctx.core())
            .arg("current_attempt_index")
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "read attempt_index".into() })?;
        let att_idx = AttemptIndex::new(
            att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
        );

        let lease_id = LeaseId::new().to_string();

        // KEYS (11): must match lua/signal.lua ff_claim_resumed_execution
        let keys: Vec<String> = vec![
            ctx.core(),                                             // 1  exec_core
            ctx.claim_grant(),                                      // 2  claim_grant
            idx.lane_eligible(lane_id),                             // 3  eligible_zset
            idx.lease_expiry(),                                     // 4  lease_expiry_zset
            idx.worker_leases(&self.config.worker_instance_id),     // 5  worker_leases
            ctx.attempt_hash(att_idx),                              // 6  existing_attempt_hash
            ctx.lease_current(),                                    // 7  lease_current
            ctx.lease_history(),                                    // 8  lease_history
            idx.lane_active(lane_id),                               // 9  active_index
            idx.attempt_timeout(),                                  // 10 attempt_timeout_zset
            idx.execution_deadline(),                               // 11 execution_deadline_zset
        ];

        // ARGV (8): must match lua/signal.lua ff_claim_resumed_execution
        let args: Vec<String> = vec![
            execution_id.to_string(),                               // 1  execution_id
            self.config.worker_id.to_string(),                      // 2  worker_id
            self.config.worker_instance_id.to_string(),             // 3  worker_instance_id
            lane_id.to_string(),                                    // 4  lane
            String::new(),                                          // 5  capability_hash
            lease_id.clone(),                                       // 6  lease_id
            self.config.lease_ttl_ms.to_string(),                   // 7  lease_ttl_ms
            String::new(),                                          // 8  remaining_attempt_timeout_ms
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_claim_resumed_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        // Parse result — same format as ff_claim_execution:
        // {1, "OK", lease_id, lease_epoch, expires_at, attempt_id, attempt_index, attempt_type}
        let arr = match &raw {
            Value::Array(arr) => arr,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_resumed_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "expected Array".into(),
                }));
            }
        };

        let status_code = match arr.first() {
            Some(Ok(Value::Int(n))) => *n,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_resumed_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "bad status code".into(),
                }));
            }
        };

        if status_code != 1 {
            let err_field_str = |idx: usize| -> String {
                arr.get(idx)
                    .and_then(|v| match v {
                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                        Ok(Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default()
            };
            let error_code = {
                let s = err_field_str(1);
                if s.is_empty() { "unknown".to_owned() } else { s }
            };
            let detail = err_field_str(2);

            return Err(SdkError::from(
                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
                        fcall: "ff_claim_resumed_execution".into(),
                        execution_id: Some(execution_id.to_string()),
                        message: format!("unknown error: {error_code}"),
                    }),
            ));
        }

        // Success payload starts after {1, "OK"}, so logical field i
        // lives at arr[i + 2]: 0=lease_id, 1=lease_epoch, 2=expires_at,
        // 3=attempt_id, 4=attempt_index.
        let field_str = |idx: usize| -> String {
            arr.get(idx + 2)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default()
        };

        let lease_id = LeaseId::parse(&field_str(0))
            .unwrap_or_else(|_| LeaseId::new());
        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
        let attempt_id = AttemptId::parse(&field_str(3))
            .unwrap_or_else(|_| AttemptId::new());
        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));

        let (input_payload, execution_kind, tags) = self
            .read_execution_context(execution_id, partition)
            .await?;

        Ok(ClaimedTask::new(
            self.client.clone(),
            self.backend.clone(),
            self.partition_config,
            execution_id.clone(),
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            self.config.lease_ttl_ms,
            lane_id.clone(),
            self.config.worker_instance_id.clone(),
            input_payload,
            execution_kind,
            tags,
        ))
    }

    /// Read payload, execution_kind, and tags for an execution (from
    /// the payload key, the exec_core hash, and the tags hash,
    /// respectively). Previously gated behind `direct-valkey-claim`;
    /// now shared by the feature-gated inline claim path and the public
    /// `claim_from_reclaim_grant` entry point.
    async fn read_execution_context(
        &self,
        execution_id: &ExecutionId,
        partition: &ff_core::partition::Partition,
    ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);

        // Read payload
        let payload: Option<String> = self
            .client
            .get(&ctx.payload())
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "GET payload".into() })?;
        let input_payload = payload.unwrap_or_default().into_bytes();

        // Read execution_kind from core
        let kind: Option<String> = self
            .client
            .hget(&ctx.core(), "execution_kind")
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET execution_kind".into() })?;
        let execution_kind = kind.unwrap_or_default();

        // Read tags
        let tags: HashMap<String, String> = self
            .client
            .hgetall(&ctx.tags())
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGETALL tags".into() })?;

        Ok((input_payload, execution_kind, tags))
    }

    // ── Phase 3: Signal delivery ──

    /// Deliver a signal to a suspended execution's waitpoint.
    ///
    /// The engine atomically records the signal, evaluates the resume condition,
    /// and optionally transitions the execution from `suspended` to `runnable`.
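    ///
    /// # Example
    ///
    /// A sketch only: the struct-literal construction of
    /// `crate::task::Signal` below assumes public fields and a
    /// previously-obtained waitpoint token; check the `Signal` type for
    /// the real construction path:
    ///
    /// ```rust,ignore
    /// let signal = Signal {
    ///     signal_name: "approval".to_owned(),
    ///     signal_category: "human".to_owned(),
    ///     source_type: "api".to_owned(),
    ///     source_identity: "reviewer-7".to_owned(),
    ///     payload: Some(br#"{"approved":true}"#.to_vec()),
    ///     idempotency_key: Some("approval-123".to_owned()),
    ///     waitpoint_token: token, // issued when the execution suspended
    /// };
    /// let outcome = worker.deliver_signal(&execution_id, &waitpoint_id, signal).await?;
    /// // `outcome` reports whether the resume condition was satisfied and
    /// // the execution moved from `suspended` back to `runnable`.
    /// ```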
    pub async fn deliver_signal(
        &self,
        execution_id: &ExecutionId,
        waitpoint_id: &WaitpointId,
        signal: crate::task::Signal,
    ) -> Result<crate::task::SignalOutcome, SdkError> {
        let partition = ff_core::partition::execution_partition(execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let idx = IndexKeys::new(&partition);

        let signal_id = ff_core::types::SignalId::new();
        let now = TimestampMs::now();

        // Pre-read lane_id from exec_core — the execution may be on any lane,
        // not necessarily one of this worker's configured lanes.
        let lane_str: Option<String> = self
            .client
            .hget(&ctx.core(), "lane_id")
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
        let lane_id = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));

        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
        //            exec_signals_zset, signal_hash, signal_payload,
        //            idem_key, waitpoint_hash, suspension_current,
        //            eligible_zset, suspended_zset, delayed_zset,
        //            suspension_timeout_zset, hmac_secrets
        let idem_key = if let Some(ref ik) = signal.idempotency_key {
            ctx.signal_dedup(waitpoint_id, ik)
        } else {
            ctx.noop() // must share {p:N} hash tag for cluster mode
        };
        let keys: Vec<String> = vec![
            ctx.core(),                                    // 1
            ctx.waitpoint_condition(waitpoint_id),         // 2
            ctx.waitpoint_signals(waitpoint_id),           // 3
            ctx.exec_signals(),                            // 4
            ctx.signal(&signal_id),                        // 5
            ctx.signal_payload(&signal_id),                // 6
            idem_key,                                      // 7
            ctx.waitpoint(waitpoint_id),                   // 8
            ctx.suspension_current(),                      // 9
            idx.lane_eligible(&lane_id),                   // 10
            idx.lane_suspended(&lane_id),                  // 11
            idx.lane_delayed(&lane_id),                    // 12
            idx.suspension_timeout(),                      // 13
            idx.waitpoint_hmac_secrets(),                  // 14
        ];

        let payload_str = signal
            .payload
            .as_ref()
            .map(|p| String::from_utf8_lossy(p).into_owned())
            .unwrap_or_default();

        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
        //            signal_category, source_type, source_identity,
        //            payload, payload_encoding, idempotency_key,
        //            correlation_id, target_scope, created_at,
        //            dedup_ttl_ms, resume_delay_ms, signal_maxlen,
        //            max_signals_per_execution, waitpoint_token
        let args: Vec<String> = vec![
            signal_id.to_string(),                           // 1
            execution_id.to_string(),                        // 2
            waitpoint_id.to_string(),                        // 3
            signal.signal_name,                              // 4
            signal.signal_category,                          // 5
            signal.source_type,                              // 6
            signal.source_identity,                          // 7
            payload_str,                                     // 8
            "json".to_owned(),                               // 9 payload_encoding
            signal.idempotency_key.unwrap_or_default(),      // 10
            String::new(),                                   // 11 correlation_id
            "waitpoint".to_owned(),                          // 12 target_scope
            now.to_string(),                                 // 13 created_at
            "86400000".to_owned(),                           // 14 dedup_ttl_ms
            "0".to_owned(),                                  // 15 resume_delay_ms
            "1000".to_owned(),                               // 16 signal_maxlen
            "10000".to_owned(),                              // 17 max_signals_per_execution
            // WIRE BOUNDARY — raw token must reach Lua unredacted. Do NOT
            // use ToString/Display (those are redacted for log safety);
            // .as_str() is the explicit opt-in that gets the secret bytes.
            signal.waitpoint_token.as_str().to_owned(),      // 18 waitpoint_token
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_deliver_signal", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        crate::task::parse_signal_result(&raw)
    }

    #[cfg(feature = "direct-valkey-claim")]
    fn next_lane(&self) -> LaneId {
        let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
        self.config.lanes[idx].clone()
    }
}

#[cfg(feature = "direct-valkey-claim")]
fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
    use ff_core::error::ErrorClass;
    matches!(
        ff_script::engine_error_ext::class(err),
        ErrorClass::Retryable | ErrorClass::Informational
    )
}

/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
/// instance id with FNV-1a to place distinct worker processes on different
/// partition windows from their first poll. A fixed zero seed would be fine
/// for a single-worker cluster; hashing spreads work across workers in
/// multi-worker deployments.
#[cfg(feature = "direct-valkey-claim")]
fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
    if num_partitions == 0 {
        return 0;
    }
    (ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize) % num_partitions
}

#[cfg(feature = "direct-valkey-claim")]
fn extract_first_array_string(value: &Value) -> Option<String> {
    match value {
        Value::Array(arr) if !arr.is_empty() => match &arr[0] {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        },
        _ => None,
    }
}

/// Read partition config from Valkey's `ff:config:partitions` hash.
/// Returns Err if the key doesn't exist or can't be read.
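///
/// Each field is parsed independently and falls back to a per-field
/// default (256/32/32) when missing or non-positive. A sketch of seeding
/// the hash, reusing the `cmd` builder pattern seen elsewhere in this
/// module (field names match what is read below):
///
/// ```rust,ignore
/// let _fields_set: i64 = client.cmd("HSET")
///     .arg("ff:config:partitions")
///     .arg("num_flow_partitions").arg("256")
///     .arg("num_budget_partitions").arg("32")
///     .arg("num_quota_partitions").arg("32")
///     .execute()
///     .await?;
/// ```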
async fn read_partition_config(client: &Client) -> Result<PartitionConfig, SdkError> {
    let key = ff_core::keys::global_config_partitions();
    let fields: HashMap<String, String> = client
        .hgetall(&key)
        .await
        .map_err(|e| SdkError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;

    if fields.is_empty() {
        return Err(SdkError::Config {
            context: "read_partition_config".into(),
            field: None,
            message: "ff:config:partitions not found in Valkey".into(),
        });
    }

    let parse = |field: &str, default: u16| -> u16 {
        fields
            .get(field)
            .and_then(|v| v.parse().ok())
            .filter(|&n: &u16| n > 0)
            .unwrap_or(default)
    };

    Ok(PartitionConfig {
        num_flow_partitions: parse("num_flow_partitions", 256),
        num_budget_partitions: parse("num_budget_partitions", 32),
        num_quota_partitions: parse("num_quota_partitions", 32),
    })
}

#[cfg(test)]
mod completion_accessor_type_tests {
    //! Type-level compile check that
    //! [`FlowFabricWorker::completion_backend`] returns an
    //! `Option<Arc<dyn CompletionBackend>>`. No Valkey required —
    //! the assertion is at the function-pointer type level and the
    //! #[test] body exists solely so the compiler elaborates it.
    use super::FlowFabricWorker;
    use ff_core::completion_backend::CompletionBackend;
    use std::sync::Arc;

    #[test]
    fn completion_backend_accessor_signature() {
        // If this line compiles, the public accessor returns the
        // advertised type. The function is never called (no live
        // worker), so no I/O happens.
        let _f: fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>> =
            FlowFabricWorker::completion_backend;
    }
}

#[cfg(all(test, feature = "direct-valkey-claim"))]
mod scan_cursor_tests {
    use super::scan_cursor_seed;

    #[test]
    fn stable_for_same_input() {
        assert_eq!(scan_cursor_seed("w1", 256), scan_cursor_seed("w1", 256));
    }

    #[test]
    fn distinct_for_different_ids() {
        assert_ne!(scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
    }

    #[test]
    fn bounded_by_partition_count() {
        for i in 0..100 {
            assert!(scan_cursor_seed(&format!("w{i}"), 256) < 256);
        }
    }

    #[test]
    fn zero_partitions_returns_zero() {
        assert_eq!(scan_cursor_seed("w1", 0), 0);
    }
}