Skip to main content

ff_sdk/
worker.rs

1use std::collections::HashMap;
2#[cfg(feature = "direct-valkey-claim")]
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5
6// RFC-023 Phase 1a (§4.4 item 10b): ferriskey imports scoped behind
7// `valkey-default` so the module compiles under
8// `--no-default-features, features = ["sqlite"]`.
9#[cfg(feature = "valkey-default")]
10use ferriskey::Client;
11use ff_core::partition::PartitionConfig;
12use ff_core::types::*;
13use tokio::sync::Semaphore;
14
15use crate::config::WorkerConfig;
16use crate::task::ClaimedTask;
17use crate::SdkError;
18
/// FlowFabric worker — holds the backend handles, claims executions, and
/// provides the worker-facing API.
///
/// # Admission control
///
/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
/// **bypasses the scheduler's admission controls**: it reads the eligible
/// ZSET directly and mints its own claim grant without consulting budget
/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
/// benchmarks, tests, and single-tenant development where the scheduler
/// hop is measurement noise, not for production.
///
/// For production deployments, consume scheduler-issued grants via
/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
/// budget breach, quota sliding-window, concurrency cap, and
/// capability-match checks before issuing grants.
///
/// # Usage
///
/// The loop below calls `claim_next()`, so it only compiles with the
/// `direct-valkey-claim` feature enabled.
///
/// ```rust,ignore
/// use std::time::Duration;
///
/// use ff_core::backend::BackendConfig;
/// use ff_core::types::{LaneId, Namespace, WorkerId, WorkerInstanceId};
/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
///
/// let config = WorkerConfig {
///     backend: BackendConfig::valkey("localhost", 6379),
///     worker_id: WorkerId::new("w1"),
///     worker_instance_id: WorkerInstanceId::new("w1-i1"),
///     namespace: Namespace::new("default"),
///     lanes: vec![LaneId::new("main")],
///     capabilities: Vec::new(),
///     lease_ttl_ms: 30_000,
///     claim_poll_interval_ms: 1_000,
///     max_concurrent_tasks: 1,
///     partition_config: None,
/// };
/// let worker = FlowFabricWorker::connect(config).await?;
///
/// loop {
///     if let Some(task) = worker.claim_next().await? {
///         // Process task...
///         task.complete(Some(b"result".to_vec())).await?;
///     } else {
///         tokio::time::sleep(Duration::from_secs(1)).await;
///     }
/// }
/// ```
pub struct FlowFabricWorker {
    /// RFC-023 Phase 1a (§4.4 item 10c): `ferriskey::Client` is
    /// `valkey-default`-gated **and** `Option` wrapped. Under
    /// sqlite-only features the field is absent. Under
    /// `valkey-default` the field is `Some(client)` when the worker
    /// was built via [`Self::connect`] (the Valkey-bundled entry
    /// point that dials), and `None` when it was built via
    /// [`Self::connect_with`] (the backend-agnostic entry point per
    /// §4.4 item 10e — no ferriskey round-trip). Claim/signal hot
    /// paths (all `valkey-default`-gated per §4.4 item 10f) expect
    /// `Some`; a claim call against a `connect_with`-built worker
    /// panics with a clear message until the backend-agnostic SDK
    /// worker-loop RFC lands (tracked in §8).
    // NOTE(review): the `dead_code` allow suggests some feature
    // combinations never read this field — confirm which ones.
    #[cfg(feature = "valkey-default")]
    #[allow(dead_code)]
    client: Option<Client>,
    /// Worker configuration exactly as passed to the constructor.
    config: WorkerConfig,
    /// Partition layout this worker is bound to. [`Self::connect`] takes
    /// it from the Valkey preamble output; [`Self::connect_with`] takes
    /// `WorkerConfig::partition_config` or falls back to the default.
    partition_config: PartitionConfig,
    /// 8-hex FNV-1a digest of the sorted capabilities CSV. Used in
    /// per-mismatch logs so the 4KB CSV never echoes on every reject
    /// during an incident. For [`Self::connect`]-built workers, the
    /// full CSV is additionally logged once at connect-time WARN via
    /// `valkey_preamble::run` for cross-reference; [`Self::connect_with`]-
    /// built workers compute the hash without the companion CSV log.
    /// Mirrors `ff-scheduler::claim::worker_caps_digest`.
    #[cfg(feature = "direct-valkey-claim")]
    worker_capabilities_hash: String,
    /// Lane cursor, initialised to 0 by both constructors.
    // NOTE(review): presumably the round-robin position advanced by
    // `next_lane` (called from `claim_next`) — that helper is not
    // visible here, confirm.
    #[cfg(feature = "direct-valkey-claim")]
    lane_index: AtomicUsize,
    /// Concurrency cap for in-flight tasks. Permits are acquired or
    /// transferred by [`claim_next`] (feature-gated),
    /// [`claim_from_grant`] (always available), and
    /// [`claim_from_reclaim_grant`], transferred to the returned
    /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
    /// Holds `max_concurrent_tasks` permits total (the constructors
    /// clamp the configured value to at least 1).
    ///
    /// [`claim_next`]: FlowFabricWorker::claim_next
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
    #[cfg_attr(not(feature = "valkey-default"), allow(dead_code))]
    concurrency_semaphore: Arc<Semaphore>,
    /// Rolling offset for chunked partition scans. Each poll advances the
    /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
    /// chunk)` polls every partition is covered. The initial value is
    /// derived from `worker_instance_id` so idle workers spread their
    /// scans across different partitions from the first poll onward.
    ///
    /// Overflow: on 64-bit targets `usize` is `u64` — overflow after
    /// ~2^64 polls (billions of years at any realistic rate). On 32-bit
    /// targets (wasm32, i686) `usize` is `u32` and wraps after ~4 years
    /// at 1 poll/sec — acceptable; on wrap, the modulo preserves
    /// correctness because the sequence simply restarts a new cycle.
    #[cfg(feature = "direct-valkey-claim")]
    scan_cursor: AtomicUsize,
    /// The [`EngineBackend`] the Stage-1b trait forwarders route
    /// through.
    ///
    /// **RFC-012 Stage 1b.** Always populated:
    /// [`FlowFabricWorker::connect`] wraps the worker's own
    /// `ferriskey::Client` in a `ValkeyBackend` via
    /// `ValkeyBackend::from_client_and_partitions`, and
    /// [`FlowFabricWorker::connect_with`] stores the caller-supplied
    /// `Arc<dyn EngineBackend>` directly (it never builds a default
    /// to replace). The [`FlowFabricWorker::backend`] accessor still
    /// returns `Option<&Arc<dyn EngineBackend>>` for API stability —
    /// Stage 1c narrows the return type once consumers have migrated.
    ///
    /// Hot paths (claim, deliver_signal, admin queries) still use the
    /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
    /// migrates them through this field, and Stage 1d removes the
    /// embedded client.
    ///
    /// NOTE(review): the `claim_next` body in this file already calls
    /// `scan_eligible_executions`, `issue_claim_grant` and
    /// `block_route` on this field, so the paragraph above looks
    /// partly out of date — confirm which hot paths still touch the
    /// embedded client.
    ///
    /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
    backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
    /// Optional handle to the same underlying backend viewed as a
    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
    /// Populated by [`Self::connect`] from the bundled
    /// `ValkeyBackend` (which implements the trait); supplied by the
    /// caller on [`Self::connect_with`] as an explicit
    /// `Option<Arc<dyn CompletionBackend>>` — `None` means "this
    /// backend does not support push-based completion" (e.g. a future
    /// Postgres backend without LISTEN/NOTIFY, or a test mock). Cairn
    /// and other completion-subscription consumers reach this through
    /// [`Self::completion_backend`].
    completion_backend_handle:
        Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
}
151
/// Upper bound on the number of partitions scanned per `claim_next()`
/// poll. Keeps idle Valkey load at O(PARTITION_SCAN_CHUNK) per
/// worker-second instead of O(num_flow_partitions). `claim_next` clamps
/// the effective chunk to `num_flow_partitions` when that is smaller.
#[cfg(feature = "direct-valkey-claim")]
const PARTITION_SCAN_CHUNK: usize = 32;
157
158impl FlowFabricWorker {
159    /// RFC-023 Phase 1a helper: borrow the embedded `ferriskey::Client`,
160    /// panicking with a clear message if the worker was built via
161    /// [`Self::connect_with`] (which does not dial Valkey). Every
162    /// Valkey-specific claim/signal method funnels through this accessor
163    /// so the panic site is uniform and traceable.
164    ///
165    /// A backend-agnostic claim/signal API is deferred per §8 breaking-
166    /// change disclosure; until that lands, valkey-default consumers
167    /// must use [`Self::connect`] if they intend to drive the Valkey
168    /// claim/signal loop.
169    #[cfg(feature = "valkey-default")]
170    #[inline]
171    #[allow(dead_code)]
172    fn valkey_client(&self) -> &Client {
173        self.client.as_ref().expect(
174            "FlowFabricWorker was built via connect_with (no Valkey dial) \
175             but a Valkey-specific claim/signal method was invoked. \
176             Use FlowFabricWorker::connect to dial Valkey, or drive the \
177             backend directly through the trait surface via .backend().",
178        )
179    }
180}
181
182impl FlowFabricWorker {
183    /// Connect to Valkey and prepare the worker.
184    ///
185    /// Establishes the ferriskey connection. Does NOT load the FlowFabric
186    /// library — that is the server's responsibility (ff-server calls
187    /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
188    /// the library is already loaded.
189    ///
190    /// # Smoke / dev scripts: rotate `WorkerInstanceId`
191    ///
192    /// The SDK writes a SET-NX liveness sentinel keyed on the worker's
193    /// `WorkerInstanceId`. When a smoke / dev script reuses the same
194    /// `WorkerInstanceId` across restarts, subsequent runs trap behind
195    /// the prior run's SET-NX until the liveness key's TTL (≈ 2× the
196    /// configured lease TTL) expires — the worker appears stuck and
197    /// claims nothing. Iterative scripts should synthesise a fresh
198    /// `WorkerInstanceId` per process (e.g. `WorkerInstanceId::new()`
199    /// or embed a UUID/timestamp) rather than hard-coding a stable
200    /// value. Production workers that cleanly shut down release the
201    /// key; only crashed / kill -9'd processes hit this trap.
202    ///
203    /// **RFC-023 Phase 1a (§4.4 item 10d, v0.12.0):** this Valkey-bundled
204    /// entry point is `#[cfg(feature = "valkey-default")]`-gated.
205    /// Consumers on the sqlite-only feature set must use
206    /// [`FlowFabricWorker::connect_with`] and drive the backend directly
207    /// through the trait surface.
208    #[cfg(feature = "valkey-default")]
209    pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
210        if config.lanes.is_empty() {
211            return Err(SdkError::Config {
212                context: "worker_config".into(),
213                field: None,
214                message: "at least one lane is required".into(),
215            });
216        }
217
218        // Build the ferriskey client from the nested `BackendConfig`.
219        // Delegates to `ff_backend_valkey::build_client` so host/port +
220        // TLS + cluster + `BackendTimeouts::request` + `BackendRetry`
221        // wiring lives in exactly one place (RFC-012 Stage 1c tranche 1).
222        let client = ff_backend_valkey::build_client(&config.backend).await?;
223
224        // v0.12 PR-6: the Valkey-specific preamble (PING, alive-key
225        // SET-NX, `ff:config:partitions` HGETALL, caps ingress +
226        // sorted-dedup CSV, caps STRING + workers-index writes) lives
227        // in `crate::valkey_preamble`. Extracted byte-for-byte from
228        // the pre-PR-6 inline body; the write order is observable
229        // from scheduler-side reads (unblock scanner:
230        // SMEMBERS ff:idx:workers → GET ff:worker:{id}:caps) so
231        // preservation is load-bearing. See `valkey_preamble::run`.
232        let crate::valkey_preamble::PreambleOutput {
233            partition_config,
234            #[cfg(feature = "direct-valkey-claim")]
235            capabilities_csv: _worker_capabilities_csv,
236            #[cfg(feature = "direct-valkey-claim")]
237            capabilities_hash: worker_capabilities_hash,
238        } = crate::valkey_preamble::run(&client, &config).await?;
239
240        let max_tasks = config.max_concurrent_tasks.max(1);
241        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
242
243        tracing::info!(
244            worker_id = %config.worker_id,
245            instance_id = %config.worker_instance_id,
246            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
247            "FlowFabricWorker connected"
248        );
249
250        #[cfg(feature = "direct-valkey-claim")]
251        let scan_cursor_init = scan_cursor_seed(
252            config.worker_instance_id.as_str(),
253            partition_config.num_flow_partitions.max(1) as usize,
254        );
255
256        // RFC-012 Stage 1b: wrap the dialed client in a ValkeyBackend
257        // so `ClaimedTask`'s trait forwarders have something to call.
258        // `from_client_and_partitions` reuses the already-dialed client
259        // — no second connection. Share the concrete
260        // `Arc<ValkeyBackend>` across the two trait objects — one
261        // allocation, both accessors yield identity-equivalent handles.
262        let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
263            ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
264                client.clone(),
265                partition_config,
266            );
267        let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
268        let completion_backend_handle: Option<
269            Arc<dyn ff_core::completion_backend::CompletionBackend>,
270        > = Some(valkey_backend);
271
272        Ok(Self {
273            client: Some(client),
274            config,
275            partition_config,
276            #[cfg(feature = "direct-valkey-claim")]
277            worker_capabilities_hash,
278            #[cfg(feature = "direct-valkey-claim")]
279            lane_index: AtomicUsize::new(0),
280            concurrency_semaphore,
281            #[cfg(feature = "direct-valkey-claim")]
282            scan_cursor: AtomicUsize::new(scan_cursor_init),
283            backend,
284            completion_backend_handle,
285        })
286    }
287
288    /// Store pre-built [`EngineBackend`] and (optional)
289    /// [`CompletionBackend`] handles on the worker. Builds the worker
290    /// via the legacy [`FlowFabricWorker::connect`] path first (so the
291    /// embedded `ferriskey::Client` that the Stage 1b non-migrated hot
292    /// paths still use is dialed), then replaces the default
293    /// `ValkeyBackend` wrapper with the caller-supplied trait objects.
294    ///
295    /// The `completion` argument is explicit: 0.3.3 previously accepted
296    /// only `backend` and `completion_backend()` silently returned
297    /// `None` on this path because `Arc<dyn EngineBackend>` cannot be
298    /// upcast to `Arc<dyn CompletionBackend>` without loss of
299    /// trait-object identity. 0.3.4 lets the caller decide.
300    ///
301    /// - `Some(arc)` — caller supplies a completion backend.
302    ///   [`Self::completion_backend`] returns `Some(clone)`.
303    /// - `None` — this backend does not support push-based completion
304    ///   (future Postgres backend without LISTEN/NOTIFY, test mocks).
305    ///   [`Self::completion_backend`] returns `None`.
306    ///
307    /// When the underlying backend implements both traits (as
308    /// `ValkeyBackend` does), pass the same `Arc` twice — the two
309    /// trait-object views share one allocation:
310    ///
311    /// ```rust,ignore
312    /// use std::sync::Arc;
313    /// use ff_backend_valkey::ValkeyBackend;
314    /// use ff_sdk::{FlowFabricWorker, WorkerConfig};
315    ///
316    /// # async fn doc(worker_config: WorkerConfig,
317    /// #              backend_config: ff_backend_valkey::BackendConfig)
318    /// #     -> Result<(), ff_sdk::SdkError> {
319    /// // Valkey (completion supported):
320    /// let valkey = Arc::new(ValkeyBackend::connect(backend_config).await?);
321    /// let worker = FlowFabricWorker::connect_with(
322    ///     worker_config,
323    ///     valkey.clone(),
324    ///     Some(valkey),
325    /// ).await?;
326    /// # Ok(()) }
327    /// ```
328    ///
329    /// Backend without completion support:
330    ///
331    /// ```rust,ignore
332    /// let worker = FlowFabricWorker::connect_with(
333    ///     worker_config,
334    ///     backend,
335    ///     None,
336    /// ).await?;
337    /// ```
338    ///
339    /// **Stage 1b + Round-7 scope — what the injected backend covers
340    /// today.** The injected backend currently covers these per-task
341    /// `ClaimedTask` ops: `update_progress` / `resume_signals` /
342    /// `delay_execution` / `move_to_waiting_children` / `complete` /
343    /// `cancel` / `fail` / `create_pending_waitpoint` /
344    /// `append_frame` / `report_usage`. A mock backend therefore sees
345    /// that portion of the worker's per-task write surface. Lease
346    /// renewal also routes through `backend.renew(&handle)`. Round-7
347    /// (#135/#145) closed the four trait-shape gaps tracked by #117,
348    /// but `suspend` still reaches the embedded `ferriskey::Client`
349    /// directly via `ff_suspend_execution` — this is the deferred
350    /// suspend per RFC-012 §R7.6.1, pending Stage 1d input-shape
351    /// work. `claim_next` / `claim_from_grant` /
352    /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
353    /// are Stage 1c hot-path work. Stage 1d removes the embedded
354    /// client entirely.
355    ///
356    /// Today's constructor is therefore NOT yet a drop-in way to swap
357    /// in a non-Valkey backend — it requires a reachable Valkey node
358    /// for `suspend` plus the remaining hot-path ops. Tests that
359    /// exercise only the migrated per-task ops can run fully against
360    /// a mock backend.
361    ///
362    /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
363    /// [`CompletionBackend`]: ff_core::completion_backend::CompletionBackend
364    pub async fn connect_with(
365        config: WorkerConfig,
366        backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
367        completion: Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
368    ) -> Result<Self, SdkError> {
369        // RFC-023 Phase 1a (§4.4 item 10e, v0.12.0): direct
370        // backend-agnostic construction. No `Self::connect` preamble,
371        // no ferriskey round-trips (no PING, no alive-key SET-NX, no
372        // `ff:config:partitions` HGETALL). Callers needing a
373        // non-default `PartitionConfig` under non-Valkey backends set
374        // [`WorkerConfig::partition_config`] (v0.12 PR-6 closed the
375        // original follow-up flagged here).
376        //
377        // Lane-empty validation (the one check `connect` did before
378        // any Valkey work) is hoisted here so every entry point
379        // refuses an empty lane list.
380        if config.lanes.is_empty() {
381            return Err(SdkError::Config {
382                context: "worker_config".into(),
383                field: None,
384                message: "at least one lane is required".into(),
385            });
386        }
387
388        let max_tasks = config.max_concurrent_tasks.max(1);
389        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
390        // v0.12 PR-6: honor the optional `WorkerConfig::partition_config`
391        // override. `None` keeps the pre-PR-6 default shape (256 / 32 /
392        // 32); `Some(cfg)` lets non-Valkey deployments with a custom
393        // `num_flow_partitions` bind correctly instead of silently
394        // missing data via the wrong partition index.
395        let partition_config = config.partition_config.unwrap_or_default();
396
397        #[cfg(feature = "direct-valkey-claim")]
398        let scan_cursor_init = scan_cursor_seed(
399            config.worker_instance_id.as_str(),
400            partition_config.num_flow_partitions.max(1) as usize,
401        );
402
403        // Build the capabilities CSV / hash inline for the
404        // direct-valkey-claim path. The validation mirrors `connect`'s
405        // ingress so the two entry points refuse the same malformed
406        // tokens.
407        #[cfg(feature = "direct-valkey-claim")]
408        for cap in &config.capabilities {
409            if cap.is_empty() {
410                return Err(SdkError::Config {
411                    context: "worker_config".into(),
412                    field: Some("capabilities".into()),
413                    message: "capability token must not be empty".into(),
414                });
415            }
416            if cap.contains(',') {
417                return Err(SdkError::Config {
418                    context: "worker_config".into(),
419                    field: Some("capabilities".into()),
420                    message: format!(
421                        "capability token may not contain ',' (CSV delimiter): {cap:?}"
422                    ),
423                });
424            }
425            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
426                return Err(SdkError::Config {
427                    context: "worker_config".into(),
428                    field: Some("capabilities".into()),
429                    message: format!(
430                        "capability token must not contain whitespace or control \
431                         characters: {cap:?}"
432                    ),
433                });
434            }
435        }
436        #[cfg(feature = "direct-valkey-claim")]
437        let worker_capabilities_hash = {
438            let set: std::collections::BTreeSet<&str> = config
439                .capabilities
440                .iter()
441                .map(|s| s.as_str())
442                .filter(|s| !s.is_empty())
443                .collect();
444            let csv: String = set.into_iter().collect::<Vec<_>>().join(",");
445            ff_core::hash::fnv1a_xor8hex(&csv)
446        };
447
448        Ok(Self {
449            #[cfg(feature = "valkey-default")]
450            client: None,
451            config,
452            partition_config,
453            #[cfg(feature = "direct-valkey-claim")]
454            worker_capabilities_hash,
455            #[cfg(feature = "direct-valkey-claim")]
456            lane_index: AtomicUsize::new(0),
457            concurrency_semaphore,
458            #[cfg(feature = "direct-valkey-claim")]
459            scan_cursor: AtomicUsize::new(scan_cursor_init),
460            backend,
461            completion_backend_handle: completion,
462        })
463    }
464
465    /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
466    /// ops through.
467    ///
468    /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
469    /// the `Option` wrapper is retained for API stability with the
470    /// Stage-1a shape. Stage 1c narrows the return type to
471    /// `&Arc<dyn EngineBackend>`.
472    pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
473        Some(&self.backend)
474    }
475
476    /// Crate-internal direct borrow of the backend. The public
477    /// [`Self::backend`] still returns `Option` for API stability
478    /// (Stage 1b holdover). Snapshot trait-forwarders in
479    /// [`crate::snapshot`] need an un-wrapped reference.
480    #[cfg_attr(not(feature = "valkey-default"), allow(dead_code))]
481    pub(crate) fn backend_ref(
482        &self,
483    ) -> &Arc<dyn ff_core::engine_backend::EngineBackend> {
484        &self.backend
485    }
486
487    /// Handle to the completion-event subscription backend, for
488    /// consumers that need to observe execution completions (DAG
489    /// reconcilers, tenant-isolated subscribers).
490    ///
491    /// Returns `Some` when the worker was built through
492    /// [`Self::connect`] on the default `valkey-default` feature
493    /// (the bundled `ValkeyBackend` implements
494    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)),
495    /// or via [`Self::connect_with`] with a `Some(..)` completion
496    /// handle. Returns `None` when the caller passed `None` to
497    /// [`Self::connect_with`] — i.e. the backend does not support
498    /// push-based completion streams (future Postgres without
499    /// LISTEN/NOTIFY, test mocks).
500    ///
501    /// The returned handle shares the same underlying allocation as
502    /// [`Self::backend`]; calls through it (e.g.
503    /// `subscribe_completions_filtered`) hit the same connection
504    /// the worker itself uses.
505    pub fn completion_backend(
506        &self,
507    ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
508        self.completion_backend_handle.clone()
509    }
510
511    /// Get the worker config.
512    pub fn config(&self) -> &WorkerConfig {
513        &self.config
514    }
515
516    /// Get the server-published partition config this worker bound to at
517    /// `connect()`. Exposed so consumers that mint custom
518    /// [`ExecutionId`]s (e.g. for `describe_execution` lookups on ids
519    /// produced outside this worker) stay aligned with the server's
520    /// `num_flow_partitions` — using `PartitionConfig::default()`
521    /// assumes 256 partitions and silently misses data on deployments
522    /// with any other value.
523    pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
524        &self.partition_config
525    }
526
527    /// Attempt to claim the next eligible execution.
528    ///
529    /// Phase 1 simplified claim flow:
530    /// 1. Pick a lane (round-robin across configured lanes)
531    /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
532    /// 3. Claim the execution via `ff_claim_execution`
533    /// 4. Read execution payload + tags
534    /// 5. Return a [`ClaimedTask`] with auto lease renewal
535    ///
536    /// Gated behind the `direct-valkey-claim` feature — bypasses the
537    /// scheduler's budget / quota / capability admission checks. Enable
538    /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
539    /// the scheduler hop would be measurement noise (benches) or when
540    /// the test harness needs a deterministic worker-local path. Prefer
541    /// the scheduler-routed HTTP claim path in production.
542    ///
543    /// # `None` semantics
544    ///
545    /// `Ok(None)` means **no work was found in the partition window this
546    /// poll covered**, not "the cluster is idle". Each call scans a chunk
547    /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
548    /// `scan_cursor`; the cursor advances by that chunk size on every
549    /// invocation, so a worker covers every partition exactly once every
550    /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
551    ///
552    /// Callers should treat `None` as "poll again soon" (typically after
553    /// `config.claim_poll_interval_ms`) rather than "sleep for a long
554    /// time". Backing off too aggressively on `None` can starve workers
555    /// when work lives on partitions outside the current window.
556    ///
557    /// Returns `Err` on Valkey errors or script failures.
558    #[cfg(feature = "direct-valkey-claim")]
559    pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
560        // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
561        // try_acquire returns immediately — if no permits available, the worker
562        // is at capacity and should not claim more work.
563        let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
564            Ok(p) => p,
565            Err(_) => return Ok(None), // At capacity — no claim attempted
566        };
567
568        let lane_id = self.next_lane();
569        let now = TimestampMs::now();
570
571        // Phase 1: We scan eligible executions directly by reading the eligible
572        // ZSET across execution partitions. In production the scheduler
573        // (ff-scheduler) would handle this. For Phase 1, the SDK does a
574        // simplified inline claim.
575        //
576        // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
577        // partitions starting at a rolling offset. This keeps idle Valkey
578        // load at O(chunk) per worker-second instead of O(num_partitions),
579        // and the worker-instance-seeded initial cursor spreads concurrent
580        // workers across different partition windows.
581        let num_partitions = self.partition_config.num_flow_partitions as usize;
582        if num_partitions == 0 {
583            return Ok(None);
584        }
585        let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
586        let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;
587
588        // Hoist the sorted/deduped capability set out of the loop — the
589        // per-partition iteration reused a fresh BTreeSet on every
590        // step pre-fix, adding O(n log n) alloc/sort to the scanner
591        // hot path on every tick. Computed once per `claim_next` call.
592        let worker_capabilities: std::collections::BTreeSet<String> = self
593            .config
594            .capabilities
595            .iter()
596            .cloned()
597            .collect();
598
599        for step in 0..chunk {
600            let partition_idx = ((start + step) % num_partitions) as u16;
601            let partition = ff_core::partition::Partition {
602                family: ff_core::partition::PartitionFamily::Execution,
603                index: partition_idx,
604            };
605
606            // v0.12 PR-5: trait-routed scanner primitive. Replaces the
607            // pre-PR-5 `ZRANGEBYSCORE` inline; the Valkey backend fires
608            // the identical command byte-for-byte (see
609            // `ff_backend_valkey::scan_eligible_executions_impl`).
610            let scan_args = ff_core::contracts::ScanEligibleArgs::new(
611                lane_id.clone(),
612                partition,
613                1,
614            );
615            let candidates = self.backend.scan_eligible_executions(scan_args).await?;
616            let execution_id = match candidates.into_iter().next() {
617                Some(id) => id,
618                None => continue, // No eligible executions on this partition
619            };
620
621            // Step 1: Issue claim grant (v0.12 PR-5: trait-routed).
622            let grant_args = ff_core::contracts::IssueClaimGrantArgs::new(
623                execution_id.clone(),
624                lane_id.clone(),
625                self.config.worker_id.clone(),
626                self.config.worker_instance_id.clone(),
627                partition,
628                worker_capabilities.clone(),
629                5_000, // grant_ttl_ms
630                now,
631            );
632            let grant_result = self.backend.issue_claim_grant(grant_args).await;
633
634            match grant_result {
635                Ok(_) => {}
636                Err(ref boxed)
637                    if matches!(
638                        boxed,
639                        crate::EngineError::Validation {
640                            kind: crate::ValidationKind::CapabilityMismatch,
641                            ..
642                        }
643                    ) =>
644                {
645                    let missing = match boxed {
646                        crate::EngineError::Validation { detail, .. } => detail.clone(),
647                        _ => unreachable!(),
648                    };
649                    // Block-on-mismatch (RFC-009 §7.5) — parity with
650                    // ff-scheduler's Scheduler::claim_for_worker. Without
651                    // this, the inline-direct-claim path would hot-loop
652                    // on an unclaimable top-of-zset (every tick picks the
653                    // same execution, wastes an FCALL, logs, releases,
654                    // repeats). The scheduler-side unblock scanner
655                    // promotes blocked_route executions back to eligible
656                    // when a worker with matching caps registers.
657                    tracing::info!(
658                        execution_id = %execution_id,
659                        worker_id = %self.config.worker_id,
660                        worker_caps_hash = %self.worker_capabilities_hash,
661                        missing = %missing,
662                        "capability mismatch, blocking execution off eligible (SDK inline claim)"
663                    );
664                    // v0.12 PR-5: trait-routed block_route. Swallow
665                    // typed outcomes + transport faults (best-effort
666                    // semantic preserved from pre-PR-5 behaviour —
667                    // see `BlockRouteOutcome::LuaRejected` rustdoc).
668                    let block_args = ff_core::contracts::BlockRouteArgs::new(
669                        execution_id.clone(),
670                        lane_id.clone(),
671                        partition,
672                        "waiting_for_capable_worker".to_owned(),
673                        "no connected worker satisfies required_capabilities".to_owned(),
674                        now,
675                    );
676                    match self.backend.block_route(block_args).await {
677                        Ok(ff_core::contracts::BlockRouteOutcome::Blocked { .. }) => {}
678                        Ok(ff_core::contracts::BlockRouteOutcome::LuaRejected { message }) => {
679                            tracing::warn!(
680                                execution_id = %execution_id,
681                                error = %message,
682                                "SDK block_route: Lua rejected; eligible ZSET unchanged, next \
683                                 poll will re-evaluate"
684                            );
685                        }
686                        Ok(_) => {}
687                        Err(e) => {
688                            tracing::warn!(
689                                execution_id = %execution_id,
690                                error = %e,
691                                "SDK block_route: transport failure; eligible ZSET unchanged"
692                            );
693                        }
694                    }
695                    continue;
696                }
697                Err(ref e) if is_retryable_claim_error(e) => {
698                    tracing::debug!(
699                        execution_id = %execution_id,
700                        error = %e,
701                        "claim grant failed (retryable), trying next partition"
702                    );
703                    continue;
704                }
705                Err(e) => return Err(SdkError::from(e)),
706            }
707
708            // Step 2: Claim the execution
709            match self
710                .claim_execution(&execution_id, &lane_id, &partition, now)
711                .await
712            {
713                Ok(mut task) => {
714                    // Transfer concurrency permit to the task. When the task is
715                    // completed/failed/cancelled/dropped the permit returns to
716                    // the semaphore, allowing another claim.
717                    task.set_concurrency_permit(permit);
718                    return Ok(Some(task));
719                }
720                Err(SdkError::Engine(ref boxed))
721                    if matches!(
722                        **boxed,
723                        crate::EngineError::Contention(
724                            crate::ContentionKind::UseClaimResumedExecution
725                        )
726                    ) =>
727                {
728                    // Execution was resumed from suspension — attempt_interrupted.
729                    // ff_claim_execution rejects this; use ff_claim_resumed_execution
730                    // which reuses the existing attempt instead of creating a new one.
731                    tracing::debug!(
732                        execution_id = %execution_id,
733                        "execution is resumed, using claim_resumed path"
734                    );
735                    match self
736                        .claim_resumed_execution(&execution_id, &lane_id, &partition)
737                        .await
738                    {
739                        Ok(mut task) => {
740                            task.set_concurrency_permit(permit);
741                            return Ok(Some(task));
742                        }
743                        Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
744                            tracing::debug!(
745                                execution_id = %execution_id,
746                                error = %e2,
747                                "claim_resumed failed (retryable), trying next partition"
748                            );
749                            continue;
750                        }
751                        Err(e2) => return Err(e2),
752                    }
753                }
754                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
755                    tracing::debug!(
756                        execution_id = %execution_id,
757                        error = %e,
758                        "claim execution failed (retryable), trying next partition"
759                    );
760                    continue;
761                }
762                Err(e) => return Err(e),
763            }
764        }
765
766        // No eligible work found on any partition
767        Ok(None)
768    }
769
770    /// Low-level claim of a granted execution. Routes through
771    /// [`EngineBackend::claim_execution`](ff_core::engine_backend::EngineBackend::claim_execution)
772    /// — the trait-level grant-consumer method landed in v0.12 PR-4 —
773    /// and returns a `ClaimedTask` with auto lease renewal.
774    ///
775    /// Pre-PR-4 the SDK fired the `ff_claim_execution` FCALL directly
776    /// against `valkey_client()` and hand-parsed the Lua response shape.
777    /// The body now collapses to `backend.claim_execution(args)` +
778    /// `backend.read_execution_context(...)` + `ClaimedTask::new(...)`;
779    /// the Valkey-specific FCALL plumbing lives behind the trait in
780    /// `ff_backend_valkey::claim_execution_impl`.
781    ///
782    /// As of v0.12 PR-5.5 this helper + `claim_from_grant` are no
783    /// longer `valkey-default`-gated: the backend mints the `Handle`
784    /// at claim time and `ClaimedTask::new` caches it, so no
785    /// Valkey-specific handle synthesis is required on this path. The
786    /// `EngineBackend::claim_execution` default impl returns
787    /// `Err(Unavailable)` on PG/SQLite today (grant-consumer surface
788    /// is Valkey-only until the PG/SQLite grant-consumer RFC lands);
789    /// the compile surface is fully agnostic.
790    ///
791    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
792    async fn claim_execution(
793        &self,
794        execution_id: &ExecutionId,
795        lane_id: &LaneId,
796        partition: &ff_core::partition::Partition,
797        now: TimestampMs,
798    ) -> Result<ClaimedTask, SdkError> {
799        // v0.12 PR-5.5 retry-path fix — pre-read the **total attempt
800        // counter**, not the current-attempt pointer. The fresh-claim
801        // path mints a new attempt row whose index is the counter's
802        // current value (the backend's Lua 5920 / PG `ff_claim_execution`
803        // / SQLite `claim_impl` all consult this same counter to compute
804        // `next_att_idx`). Pre-PR-5.5 this call went inline as `HGET
805        // {exec}:core total_attempt_count` on the Valkey client; the
806        // first PR-5.5 landing mistakenly routed it to
807        // `read_current_attempt_index`, which returns the *pointer* at
808        // the previously-leased attempt — on a retry-of-a-retry that
809        // still named the terminal-failed prior attempt and the new
810        // claim collided with it. `read_total_attempt_count` wraps
811        // the same byte-for-byte HGET on Valkey and provides the JSONB
812        // / json_extract equivalents on PG / SQLite. See
813        // `EngineBackend::read_total_attempt_count` rustdoc for the
814        // pointer-vs-counter distinction.
815        let att_idx = self
816            .backend
817            .read_total_attempt_count(execution_id)
818            .await
819            .map_err(|e| SdkError::Engine(Box::new(e)))?;
820
821        let args = ff_core::contracts::ClaimExecutionArgs::new(
822            execution_id.clone(),
823            self.config.worker_id.clone(),
824            self.config.worker_instance_id.clone(),
825            lane_id.clone(),
826            LeaseId::new(),
827            self.config.lease_ttl_ms,
828            AttemptId::new(),
829            att_idx,
830            "{}".to_owned(),
831            None,
832            None,
833            now,
834        );
835
836        // `ClaimExecutionResult` is `#[non_exhaustive]` (v0.12 PR-4)
837        // so let-binding requires an explicit wildcard arm even though
838        // `Claimed` is the only variant today. A future additive variant
839        // surfaces here as a typed `ScriptError::Parse` — loud, routed
840        // through the existing SDK error path, and never silently
841        // dropped.
842        let claimed = match self.backend.claim_execution(args).await? {
843            ff_core::contracts::ClaimExecutionResult::Claimed(c) => c,
844            other => {
845                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
846                    fcall: "ff_claim_execution".into(),
847                    execution_id: Some(execution_id.to_string()),
848                    message: format!(
849                        "unexpected ClaimExecutionResult variant: {other:?}"
850                    ),
851                }));
852            }
853        };
854
855        // Read execution payload and metadata
856        let (input_payload, execution_kind, tags) = self
857            .read_execution_context(execution_id, partition)
858            .await?;
859
860        Ok(ClaimedTask::new(
861            self.backend.clone(),
862            self.partition_config,
863            claimed.handle,
864            execution_id.clone(),
865            claimed.attempt_index,
866            claimed.attempt_id,
867            claimed.lease_id,
868            claimed.lease_epoch,
869            self.config.lease_ttl_ms,
870            lane_id.clone(),
871            self.config.worker_instance_id.clone(),
872            input_payload,
873            execution_kind,
874            tags,
875        ))
876    }
877
878    /// Consume a [`ClaimGrant`] and claim the granted execution on
879    /// this worker. The intended production entry point: pair with
880    /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
881    /// scheduler-issued grants into the SDK without enabling the
882    /// `direct-valkey-claim` feature (which bypasses budget/quota
883    /// admission control).
884    ///
885    /// The worker's concurrency semaphore is checked BEFORE the FCALL
886    /// so a saturated worker does not consume the grant: the grant
887    /// stays valid for its remaining TTL and the caller can either
888    /// release it back to the scheduler or retry after some other
889    /// in-flight task completes.
890    ///
891    /// On success the returned [`ClaimedTask`] holds a concurrency
892    /// permit that releases automatically on
893    /// `complete`/`fail`/`cancel`/drop — same contract as
894    /// `claim_next`.
895    ///
896    /// # Arguments
897    ///
898    /// * `lane` — the lane the grant was issued for. Must match what
899    ///   was passed to `Scheduler::claim_for_worker`; the Lua FCALL
900    ///   uses it to look up `lane_eligible`, `lane_active`, and the
901    ///   `worker_leases` index slot.
902    /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
903    ///
904    /// # Errors
905    ///
906    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
907    ///   permits all held. Retryable; the grant is untouched.
908    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
909    ///   or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
910    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
911    ///   (wrapped in [`SdkError::Engine`]).
912    /// * `ScriptError::CapabilityMismatch` — execution's required
913    ///   capabilities not a subset of this worker's caps (wrapped in
914    ///   [`SdkError::Engine`]). Surfaced post-grant if a race
915    ///   between grant issuance and caps change allows it.
916    /// * `ScriptError::Parse` — `ff_claim_execution` returned an
917    ///   unexpected shape (wrapped in [`SdkError::Engine`]).
918    /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
919    ///   transport error during the FCALL or the
920    ///   `read_execution_context` follow-up.
921    ///
922    /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
923    /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
924    ///
925    /// # Backend coverage (v0.12 PR-5.5)
926    ///
927    /// Method ungated across backends. The Valkey backend handles the
928    /// grant fully. Postgres + SQLite backends return
929    /// [`EngineError::Unavailable`](ff_core::engine_error::EngineError::Unavailable)
930    /// from [`EngineBackend::claim_execution`] today — grants on those
931    /// backends flow through the scheduler-routed [`claim_via_server`]
932    /// path (the PG/SQLite scheduler lives outside the `EngineBackend`
933    /// trait in this release). See
934    /// `project_claim_from_grant_pg_sqlite_gap.md` for motivation and
935    /// planned follow-up.
936    ///
937    /// [`claim_via_server`]: FlowFabricWorker::claim_via_server
938    /// [`EngineBackend::claim_execution`]: ff_core::engine_backend::EngineBackend::claim_execution
939    pub async fn claim_from_grant(
940        &self,
941        lane: LaneId,
942        grant: ff_core::contracts::ClaimGrant,
943    ) -> Result<ClaimedTask, SdkError> {
944        // Semaphore check FIRST. If the worker is saturated we must
945        // surface the condition to the caller without touching the
946        // grant — silently returning Ok(None) (as claim_next does)
947        // would drop a grant the scheduler has already committed work
948        // to issuing, wasting the slot until its TTL elapses.
949        let permit = self
950            .concurrency_semaphore
951            .clone()
952            .try_acquire_owned()
953            .map_err(|_| SdkError::WorkerAtCapacity)?;
954
955        let now = TimestampMs::now();
956        let partition = grant.partition().map_err(|e| SdkError::Config {
957            context: "claim_from_grant".to_owned(),
958            field: Some("partition_key".to_owned()),
959            message: e.to_string(),
960        })?;
961        let mut task = match self
962            .claim_execution(&grant.execution_id, &lane, &partition, now)
963            .await
964        {
965            Ok(task) => task,
966            Err(SdkError::Engine(ref boxed))
967                if matches!(
968                    **boxed,
969                    crate::EngineError::Contention(
970                        crate::ContentionKind::UseClaimResumedExecution
971                    )
972                ) =>
973            {
974                // Execution was resumed from suspension — attempt_interrupted.
975                // ff_claim_execution rejects this; use ff_claim_resumed_execution
976                // which reuses the existing attempt instead of creating a new one.
977                // Mirrors the fallback inside `claim_next` so HTTP-routed callers
978                // (`claim_via_server` → `claim_from_grant`) get the same transparent
979                // re-claim behavior.
980                tracing::debug!(
981                    execution_id = %grant.execution_id,
982                    "execution is resumed, using claim_resumed path"
983                );
984                self.claim_resumed_execution(&grant.execution_id, &lane, &partition)
985                    .await?
986            }
987            Err(e) => return Err(e),
988        };
989        task.set_concurrency_permit(permit);
990        Ok(task)
991    }
992
993    /// Scheduler-routed claim: POST the server's
994    /// `/v1/workers/{id}/claim`, then chain to
995    /// [`Self::claim_from_grant`].
996    ///
997    /// Batch C item 2 PR-B. This is the production entry point —
998    /// budget + quota + capability admission run server-side inside
999    /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
1000    /// enable the `direct-valkey-claim` feature.
1001    ///
1002    /// Returns `Ok(None)` when the server says no eligible execution
1003    /// (HTTP 204). Callers typically back off by
1004    /// `config.claim_poll_interval_ms` and try again, same cadence
1005    /// as the direct-claim path's `Ok(None)`.
1006    ///
1007    /// The `admin` client is the established HTTP surface
1008    /// (`FlowFabricAdminClient`) reused here so workers don't keep a
1009    /// second reqwest client around. Build once at worker boot and
1010    /// hand in by reference on every claim.
1011    pub async fn claim_via_server(
1012        &self,
1013        admin: &crate::FlowFabricAdminClient,
1014        lane: &LaneId,
1015        grant_ttl_ms: u64,
1016    ) -> Result<Option<ClaimedTask>, SdkError> {
1017        let req = crate::admin::ClaimForWorkerRequest {
1018            worker_id: self.config.worker_id.to_string(),
1019            lane_id: lane.to_string(),
1020            worker_instance_id: self.config.worker_instance_id.to_string(),
1021            capabilities: self.config.capabilities.clone(),
1022            grant_ttl_ms,
1023        };
1024        let Some(resp) = admin.claim_for_worker(req).await? else {
1025            return Ok(None);
1026        };
1027        let grant = resp.into_grant()?;
1028        self.claim_from_grant(lane.clone(), grant).await.map(Some)
1029    }
1030
1031    /// Consume a [`ResumeGrant`] and transition the granted
1032    /// `attempt_interrupted` execution into a `started` state on this
1033    /// worker. Symmetric partner to [`claim_from_grant`] for the
1034    /// resume path.
1035    ///
1036    /// **Renamed from `claim_from_reclaim_grant` (RFC-024 PR-B+C).**
1037    /// The new `claim_from_reclaim_grant` method lands with PR-G and
1038    /// dispatches to [`EngineBackend::reclaim_execution`] for the
1039    /// lease-reclaim path (distinct semantic, distinct grant type).
1040    ///
1041    /// The grant must have been issued to THIS worker (matching
1042    /// `worker_id` at grant time). A mismatch returns
1043    /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1044    /// atomically by `ff_claim_resumed_execution`; a second call with
1045    /// the same grant also returns `InvalidClaimGrant`.
1046    ///
1047    /// # Concurrency
1048    ///
1049    /// The worker's concurrency semaphore is checked BEFORE the FCALL
1050    /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1051    /// assume pre-existing capacity on this worker — a reclaim can
1052    /// land on a fresh worker instance that just came up after a
1053    /// crash/restart and is picking up a previously-interrupted
1054    /// execution. If the worker is saturated, the grant stays valid
1055    /// for its remaining TTL and the caller can release it or retry.
1056    ///
1057    /// On success the returned [`ClaimedTask`] holds a concurrency
1058    /// permit that releases automatically on
1059    /// `complete`/`fail`/`cancel`/drop.
1060    ///
1061    /// # Errors
1062    ///
1063    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1064    ///   permits all held. Retryable; the grant is untouched (no
1065    ///   FCALL was issued, so `ff_claim_resumed_execution` did not
1066    ///   atomically consume the grant key).
1067    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1068    ///   or `worker_id` mismatch.
1069    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1070    /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1071    ///   `attempt_interrupted`.
1072    /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1073    ///   not `runnable`.
1074    /// * `ScriptError::ExecutionNotFound` — core key missing.
1075    /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
1076    ///   transport.
1077    ///
1078    /// [`ResumeGrant`]: ff_core::contracts::ResumeGrant
1079    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
1080    pub async fn claim_from_resume_grant(
1081        &self,
1082        grant: ff_core::contracts::ResumeGrant,
1083    ) -> Result<ClaimedTask, SdkError> {
1084        // Semaphore check FIRST — same load-bearing ordering as
1085        // `claim_from_grant`. If the worker is saturated, surface
1086        // WorkerAtCapacity without firing the FCALL; the FCALL is an
1087        // atomic consume on the grant key, so calling it past-
1088        // saturation would destroy the grant while leaving no
1089        // permit to attach to the returned `ClaimedTask`.
1090        let permit = self
1091            .concurrency_semaphore
1092            .clone()
1093            .try_acquire_owned()
1094            .map_err(|_| SdkError::WorkerAtCapacity)?;
1095
1096        // Grant carries partition + lane_id so no round-trip is needed
1097        // to resolve them before the FCALL.
1098        let partition = grant.partition().map_err(|e| SdkError::Config {
1099            context: "claim_from_resume_grant".to_owned(),
1100            field: Some("partition_key".to_owned()),
1101            message: e.to_string(),
1102        })?;
1103        let mut task = self
1104            .claim_resumed_execution(
1105                &grant.execution_id,
1106                &grant.lane_id,
1107                &partition,
1108            )
1109            .await?;
1110        task.set_concurrency_permit(permit);
1111        Ok(task)
1112    }
1113
1114    /// Consume a [`ReclaimGrant`] to mint a fresh attempt for a
1115    /// lease-expired / lease-revoked execution (RFC-024 §3.4).
1116    ///
1117    /// Backend-agnostic. Routes through
1118    /// [`EngineBackend::reclaim_execution`] on whatever backend the
1119    /// worker was connected with (Valkey via `connect` / `connect_with`,
1120    /// Postgres / SQLite via `connect_with`). Distinct from
1121    /// [`claim_from_resume_grant`]: reclaim creates a NEW attempt row
1122    /// and bumps `lease_reclaim_count` (`HandleKind::Reclaimed`), while
1123    /// resume re-uses the existing attempt under
1124    /// `ff_claim_resumed_execution` (`HandleKind::Resumed`).
1125    ///
1126    /// # Return shape
1127    ///
1128    /// Returns the raw [`ReclaimExecutionOutcome`] so consumers match
1129    /// on the four outcomes (`Claimed(Handle)`, `NotReclaimable`,
1130    /// `ReclaimCapExceeded`, `GrantNotFound`) and decide their
1131    /// dispatch. The `Claimed` variant carries a [`Handle`] whose
1132    /// `kind` is [`HandleKind::Reclaimed`]; downstream ops (complete,
1133    /// fail, renew, append_frame, …) take the handle directly via the
1134    /// `EngineBackend` trait.
1135    ///
1136    /// This contrasts with [`claim_from_resume_grant`], which wraps
1137    /// the handle in a [`ClaimedTask`] with a concurrency-permit and
1138    /// auto-lease-renewal loop. Those affordances are
1139    /// `valkey-default`-gated today (they depend on the bundled
1140    /// `ferriskey::Client` + `lease_ttl_ms` renewal timer). The
1141    /// reclaim surface is intentionally narrower so it compiles under
1142    /// `--no-default-features, features = ["sqlite"]` and consumers
1143    /// can drive the reclaim flow on any backend.
1144    ///
1145    /// # Feature compatibility
1146    ///
1147    /// No cfg-gate. Compiles + runs under every feature set ff-sdk
1148    /// supports (including sqlite-only). Verified by the compile-time
1149    /// type assertion
1150    /// `worker_claim_from_reclaim_grant_is_backend_agnostic_at_type_level`
1151    /// in `crates/ff-sdk/tests/rfc024_sdk.rs`, which pins the method's
1152    /// full signature under the default feature set and is paralleled
1153    /// by `sqlite_only_compile_surface_tests` in this file for the
1154    /// `--no-default-features, features = ["sqlite"]` compile anchor on
1155    /// the rest of the backend-agnostic surface.
1156    ///
1157    /// # worker_capabilities
1158    ///
1159    /// `ReclaimExecutionArgs::worker_capabilities` is NOT part of the
1160    /// Lua FCALL (the reclaim Lua validates grant consumption via
1161    /// `grant.worker_id == args.worker_id` only — see
1162    /// `crates/ff-script/src/flowfabric.lua:3088` and RFC-024 §4.4).
1163    /// Capability matching happens at grant-issuance time (see
1164    /// [`FlowFabricAdminClient::issue_reclaim_grant`]
1165    /// — in the `ff-sdk::admin` module, `valkey-default`-gated).
1166    ///
1167    /// # Errors
1168    ///
1169    /// * [`SdkError::Engine`] — the backend's `reclaim_execution`
1170    ///   returned an [`EngineError`] (transport fault, validation
1171    ///   failure, `Unavailable` from a backend that does not
1172    ///   implement RFC-024 — currently only pre-RFC out-of-tree
1173    ///   backends).
1174    ///
1175    /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1176    /// [`ReclaimExecutionOutcome`]: ff_core::contracts::ReclaimExecutionOutcome
1177    /// [`Handle`]: ff_core::backend::Handle
1178    /// [`HandleKind::Reclaimed`]: ff_core::backend::HandleKind::Reclaimed
1179    /// [`EngineBackend::reclaim_execution`]: ff_core::engine_backend::EngineBackend::reclaim_execution
1180    /// [`EngineError`]: ff_core::engine_error::EngineError
1181    /// [`claim_from_resume_grant`]: FlowFabricWorker::claim_from_resume_grant
1182    /// [`FlowFabricAdminClient::issue_reclaim_grant`]: crate::admin::FlowFabricAdminClient::issue_reclaim_grant
1183    pub async fn claim_from_reclaim_grant(
1184        &self,
1185        grant: ff_core::contracts::ReclaimGrant,
1186        args: ff_core::contracts::ReclaimExecutionArgs,
1187    ) -> Result<ff_core::contracts::ReclaimExecutionOutcome, SdkError> {
1188        // `ReclaimGrant` is accepted as a parameter so the call-site
1189        // shape matches RFC-024 §3.4 (consumer receives a grant from
1190        // `issue_reclaim_grant` and feeds it + args into
1191        // `claim_from_reclaim_grant`). The grant metadata is
1192        // already embedded in the backend's server-side store
1193        // (Valkey `claim_grant` hash, PG/SQLite `ff_claim_grant`
1194        // table) keyed by (execution_id, grant_key) — the trait
1195        // method looks it up from `args.execution_id`.
1196        //
1197        // The overlap between `ReclaimGrant` and
1198        // `ReclaimExecutionArgs` is (execution_id, lane_id);
1199        // mismatched grant/args is a consumer-side bug (grant-for-A
1200        // + args-for-B), and silently forwarding lets the SDK act
1201        // on the args execution. Reject the mismatch up front so
1202        // the misuse surfaces at the SDK boundary instead of
1203        // succeeding against the wrong execution. The grant's
1204        // `expires_at_ms` is validated against wall-clock now so
1205        // an already-expired grant is rejected without a backend
1206        // round-trip (the backend also enforces expiry, but the
1207        // SDK-side check gives a crisper error and preserves the
1208        // reclaim-budget / lease slot).
1209        // `TimestampMs` is an `i64` (Unix-epoch ms); cast to `u64` for
1210        // comparison against `ReclaimGrant::expires_at_ms`. Clamp
1211        // negatives to 0 so a pre-epoch system clock (vanishingly
1212        // unlikely, but representable) doesn't wrap to a future u64.
1213        let now_ms: u64 = ff_core::types::TimestampMs::now().0.max(0) as u64;
1214        validate_reclaim_grant_against_args(&grant, &args, now_ms)?;
1215        self.backend
1216            .reclaim_execution(args)
1217            .await
1218            .map_err(|e| SdkError::Engine(Box::new(e)))
1219    }
1220
1221    /// Low-level resume claim. Forwards through
1222    /// [`EngineBackend::claim_resumed_execution`](ff_core::engine_backend::EngineBackend::claim_resumed_execution)
1223    /// — the trait-level trigger surface landed in issue #150 — and
1224    /// returns a [`ClaimedTask`] bound to the resumed attempt.
1225    ///
1226    /// The method stays private; external callers use
1227    /// [`claim_from_resume_grant`].
1228    ///
1229    /// [`claim_from_resume_grant`]: FlowFabricWorker::claim_from_resume_grant
1230    async fn claim_resumed_execution(
1231        &self,
1232        execution_id: &ExecutionId,
1233        lane_id: &LaneId,
1234        partition: &ff_core::partition::Partition,
1235    ) -> Result<ClaimedTask, SdkError> {
1236        // v0.12 PR-3 — pre-read current_attempt_index via the trait
1237        // rather than an inline `HGET` on the Valkey client. Load-bearing:
1238        // the backend's KEYS[6] (Valkey) / `ff_attempt` PK tuple (PG/SQLite)
1239        // must target the real existing attempt hash/row, and the backend
1240        // takes the index verbatim from
1241        // `ClaimResumedExecutionArgs::current_attempt_index`.
1242        let att_idx = self
1243            .backend
1244            .read_current_attempt_index(execution_id)
1245            .await
1246            .map_err(|e| SdkError::Engine(Box::new(e)))?;
1247
1248        let args = ff_core::contracts::ClaimResumedExecutionArgs {
1249            execution_id: execution_id.clone(),
1250            worker_id: self.config.worker_id.clone(),
1251            worker_instance_id: self.config.worker_instance_id.clone(),
1252            lane_id: lane_id.clone(),
1253            lease_id: LeaseId::new(),
1254            lease_ttl_ms: self.config.lease_ttl_ms,
1255            current_attempt_index: att_idx,
1256            remaining_attempt_timeout_ms: None,
1257            now: TimestampMs::now(),
1258        };
1259
1260        let ff_core::contracts::ClaimResumedExecutionResult::Claimed(claimed) =
1261            self.backend.claim_resumed_execution(args).await?;
1262
1263        let (input_payload, execution_kind, tags) = self
1264            .read_execution_context(execution_id, partition)
1265            .await?;
1266
1267        Ok(ClaimedTask::new(
1268            self.backend.clone(),
1269            self.partition_config,
1270            claimed.handle,
1271            execution_id.clone(),
1272            claimed.attempt_index,
1273            claimed.attempt_id,
1274            claimed.lease_id,
1275            claimed.lease_epoch,
1276            self.config.lease_ttl_ms,
1277            lane_id.clone(),
1278            self.config.worker_instance_id.clone(),
1279            input_payload,
1280            execution_kind,
1281            tags,
1282        ))
1283    }
1284
1285    /// Read payload + execution_kind + tags from exec_core.
1286    ///
1287    /// As of v0.12 PR-1 this forwards through
1288    /// [`EngineBackend::read_execution_context`](ff_core::engine_backend::EngineBackend::read_execution_context)
1289    /// rather than issuing direct GET/HGET/HGETALL against Valkey. The
1290    /// outer `valkey-default` gate + `(&ExecutionId, &Partition)`
1291    /// signature are preserved; hot-path decoupling (ungating this
1292    /// helper + its call sites in `claim_execution` and
1293    /// `claim_resumed_execution`) is PR-4/PR-5 scope per the v0.12
1294    /// agnostic-SDK plan.
1295    async fn read_execution_context(
1296        &self,
1297        execution_id: &ExecutionId,
1298        _partition: &ff_core::partition::Partition,
1299    ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
1300        let ctx = self.backend.read_execution_context(execution_id).await?;
1301        Ok((ctx.input_payload, ctx.execution_kind, ctx.tags))
1302    }
1303
1304    // ── Phase 3: Signal delivery ──
1305
1306    /// Deliver a signal to a suspended execution's waitpoint.
1307    ///
1308    /// The engine atomically records the signal, evaluates the resume condition,
1309    /// and optionally transitions the execution from `suspended` to `runnable`.
1310    ///
1311    /// Forwards through
1312    /// [`EngineBackend::deliver_signal`](ff_core::engine_backend::EngineBackend::deliver_signal)
1313    /// — the trait-level trigger surface landed in issue #150.
1314    ///
1315    /// Backend-agnostic as of v0.12 PR-3. Compiles + runs under every
1316    /// feature set ff-sdk supports (including
1317    /// `--no-default-features --features sqlite`); pinned by
1318    /// `sqlite_only_compile_surface_tests::deliver_signal_addressable_under_sqlite_only`.
1319    pub async fn deliver_signal(
1320        &self,
1321        execution_id: &ExecutionId,
1322        waitpoint_id: &WaitpointId,
1323        signal: crate::task::Signal,
1324    ) -> Result<crate::task::SignalOutcome, SdkError> {
1325        let args = ff_core::contracts::DeliverSignalArgs {
1326            execution_id: execution_id.clone(),
1327            waitpoint_id: waitpoint_id.clone(),
1328            signal_id: ff_core::types::SignalId::new(),
1329            signal_name: signal.signal_name,
1330            signal_category: signal.signal_category,
1331            source_type: signal.source_type,
1332            source_identity: signal.source_identity,
1333            payload: signal.payload,
1334            payload_encoding: Some("json".to_owned()),
1335            correlation_id: None,
1336            idempotency_key: signal.idempotency_key,
1337            target_scope: "waitpoint".to_owned(),
1338            created_at: Some(TimestampMs::now()),
1339            dedup_ttl_ms: None,
1340            resume_delay_ms: None,
1341            max_signals_per_execution: None,
1342            signal_maxlen: None,
1343            waitpoint_token: signal.waitpoint_token,
1344            now: TimestampMs::now(),
1345        };
1346
1347        let result = self.backend.deliver_signal(args).await?;
1348        Ok(match result {
1349            ff_core::contracts::DeliverSignalResult::Accepted { signal_id, effect } => {
1350                if effect == "resume_condition_satisfied" {
1351                    crate::task::SignalOutcome::TriggeredResume { signal_id }
1352                } else {
1353                    crate::task::SignalOutcome::Accepted { signal_id, effect }
1354                }
1355            }
1356            ff_core::contracts::DeliverSignalResult::Duplicate { existing_signal_id } => {
1357                crate::task::SignalOutcome::Duplicate {
1358                    existing_signal_id: existing_signal_id.to_string(),
1359                }
1360            }
1361        })
1362    }
1363
1364    #[cfg(feature = "direct-valkey-claim")]
1365    fn next_lane(&self) -> LaneId {
1366        let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
1367        self.config.lanes[idx].clone()
1368    }
1369}
1370
/// Returns `true` when `err` is classified by
/// `ff_script::engine_error_ext::class` as either `Retryable` or
/// `Informational`; every other class yields `false`.
#[cfg(feature = "direct-valkey-claim")]
fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
    use ff_core::error::ErrorClass;

    let class = ff_script::engine_error_ext::class(err);
    matches!(class, ErrorClass::Retryable | ErrorClass::Informational)
}
1379
/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
/// instance id with FNV-1a so that distinct worker processes start on
/// different partition windows from their first poll. A zero offset would
/// be fine for a single-worker cluster; the hashed offset is what spreads
/// work across partitions in multi-worker deployments.
///
/// Returns a value in `0..num_partitions`, or `0` when `num_partitions`
/// is `0`. The reduction happens in `u64` before narrowing, so the seed is
/// derived from the full 64-bit hash on every target width.
#[cfg(feature = "direct-valkey-claim")]
fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
    if num_partitions == 0 {
        return 0;
    }
    let hash = ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes());
    // Reduce in u64 first: casting the hash to `usize` up front would drop
    // its upper 32 bits on 32-bit targets. `usize -> u64` is lossless, and
    // the remainder is `< num_partitions`, so narrowing back cannot truncate.
    (hash % num_partitions as u64) as usize
}
1391
1392/// Cross-check the [`ReclaimGrant`] handed back by
1393/// `issue_reclaim_grant` against the [`ReclaimExecutionArgs`] the
1394/// consumer is about to dispatch. Catches the grant-for-A + args-for-B
1395/// misuse at the SDK boundary before a backend round-trip (PR #407
1396/// review F1).
1397///
1398/// The overlap between the two types is `(execution_id, lane_id)`;
1399/// `partition_key` / `grant_key` live only on the grant and are
1400/// verified server-side. The grant's `expires_at_ms` is also
1401/// validated against `now_ms` so an expired grant fails fast without
1402/// burning a backend call (the backend enforces expiry too, but the
1403/// SDK-side check gives a crisper, typed error).
1404///
1405/// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1406/// [`ReclaimExecutionArgs`]: ff_core::contracts::ReclaimExecutionArgs
1407fn validate_reclaim_grant_against_args(
1408    grant: &ff_core::contracts::ReclaimGrant,
1409    args: &ff_core::contracts::ReclaimExecutionArgs,
1410    now_ms: u64,
1411) -> Result<(), SdkError> {
1412    if grant.execution_id != args.execution_id {
1413        return Err(SdkError::Config {
1414            context: "claim_from_reclaim_grant".to_owned(),
1415            field: Some("execution_id".to_owned()),
1416            message: format!(
1417                "grant.execution_id ({}) does not match args.execution_id ({})",
1418                grant.execution_id, args.execution_id
1419            ),
1420        });
1421    }
1422    if grant.lane_id != args.lane_id {
1423        return Err(SdkError::Config {
1424            context: "claim_from_reclaim_grant".to_owned(),
1425            field: Some("lane_id".to_owned()),
1426            message: format!(
1427                "grant.lane_id ({}) does not match args.lane_id ({})",
1428                grant.lane_id.as_str(),
1429                args.lane_id.as_str()
1430            ),
1431        });
1432    }
1433    if grant.expires_at_ms <= now_ms {
1434        return Err(SdkError::Config {
1435            context: "claim_from_reclaim_grant".to_owned(),
1436            field: Some("expires_at_ms".to_owned()),
1437            message: format!(
1438                "grant expired: expires_at_ms={} now_ms={}",
1439                grant.expires_at_ms, now_ms
1440            ),
1441        });
1442    }
1443    Ok(())
1444}
1445
#[cfg(test)]
mod reclaim_grant_validation_tests {
    //! Unit tests for `validate_reclaim_grant_against_args`, the SDK-side
    //! cross-check that catches a grant/args mismatch (PR #407 review F1).
    //! The helper is pure, so no Valkey / backend is required.
    use super::validate_reclaim_grant_against_args;
    use crate::SdkError;
    use ff_core::contracts::{ReclaimExecutionArgs, ReclaimGrant};
    use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
    use ff_core::types::{
        AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseId, WorkerId, WorkerInstanceId,
    };

    const EXEC_A: &str = "{fp:7}:00000000-0000-4000-8000-000000000001";
    const EXEC_B: &str = "{fp:7}:00000000-0000-4000-8000-000000000002";

    /// Parse a fixture execution id.
    fn exec(raw: &str) -> ExecutionId {
        ExecutionId::parse(raw).expect("valid execution id")
    }

    /// Build a grant for `execution_id` on `lane` expiring at `expires_at_ms`.
    fn grant_for(execution_id: ExecutionId, lane: &str, expires_at_ms: u64) -> ReclaimGrant {
        let partition = Partition { family: PartitionFamily::Flow, index: 7 };
        ReclaimGrant::new(
            execution_id,
            PartitionKey::from(&partition),
            "reclaim:grant:abc".to_owned(),
            expires_at_ms,
            LaneId::new(lane),
        )
    }

    /// Build reclaim args for `execution_id` on `lane` with fixed worker
    /// identities and a fresh lease / attempt id.
    fn args_for(execution_id: ExecutionId, lane: &str) -> ReclaimExecutionArgs {
        ReclaimExecutionArgs::new(
            execution_id,
            WorkerId::new("w1"),
            WorkerInstanceId::new("w1-i1"),
            LaneId::new(lane),
            None,
            LeaseId::new(),
            30_000,
            AttemptId::new(),
            "{}".to_owned(),
            None,
            WorkerInstanceId::new("w1-i0"),
            AttemptIndex::new(0),
        )
    }

    /// Return the `field` carried by an `SdkError::Config`; any other
    /// variant fails the test.
    fn config_field(err: SdkError) -> Option<String> {
        match err {
            SdkError::Config { field, .. } => field,
            other => panic!("expected Config error, got {other:?}"),
        }
    }

    #[test]
    fn accepts_matching_grant_and_args() {
        let grant = grant_for(exec(EXEC_A), "main", 2_000);
        let args = args_for(exec(EXEC_A), "main");
        assert!(validate_reclaim_grant_against_args(&grant, &args, 1_000).is_ok());
    }

    #[test]
    fn rejects_mismatched_execution_id() {
        let grant = grant_for(exec(EXEC_A), "main", 2_000);
        let args = args_for(exec(EXEC_B), "main");
        let err = validate_reclaim_grant_against_args(&grant, &args, 1_000)
            .expect_err("expected mismatched execution_id to fail");
        assert_eq!(config_field(err).as_deref(), Some("execution_id"));
    }

    #[test]
    fn rejects_mismatched_lane_id() {
        let grant = grant_for(exec(EXEC_A), "main", 2_000);
        let args = args_for(exec(EXEC_A), "other");
        let err = validate_reclaim_grant_against_args(&grant, &args, 1_000)
            .expect_err("expected mismatched lane_id to fail");
        assert_eq!(config_field(err).as_deref(), Some("lane_id"));
    }

    #[test]
    fn rejects_expired_grant() {
        let grant = grant_for(exec(EXEC_A), "main", 1_000);
        let args = args_for(exec(EXEC_A), "main");

        // The expiry comparison is inclusive (`<=`): a grant whose
        // expires_at_ms equals now_ms is already rejected, so the
        // server's TTL enforcement window can't race us.
        let at_expiry = validate_reclaim_grant_against_args(&grant, &args, 1_000)
            .expect_err("expected expired grant to fail");
        assert_eq!(config_field(at_expiry).as_deref(), Some("expires_at_ms"));

        // Also rejected when already past expiry.
        let past_expiry = validate_reclaim_grant_against_args(&grant, &args, 5_000)
            .expect_err("expected past-expiry grant to fail");
        assert_eq!(config_field(past_expiry).as_deref(), Some("expires_at_ms"));
    }
}
1546
#[cfg(test)]
mod completion_accessor_type_tests {
    //! Compile-time check that
    //! [`FlowFabricWorker::completion_backend`] has the signature
    //! `fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>>`.
    //! No Valkey required: the `#[test]` body only coerces the method
    //! to a function pointer so the compiler has to type-check it.
    use super::FlowFabricWorker;
    use ff_core::completion_backend::CompletionBackend;
    use std::sync::Arc;

    #[test]
    fn completion_backend_accessor_signature() {
        /// The advertised shape of the public accessor.
        type Accessor = fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>>;

        // The coercion compiles only if the accessor returns the
        // advertised type. The pointer is never invoked (there is no
        // live worker), so no I/O happens.
        let _accessor: Accessor = FlowFabricWorker::completion_backend;
    }
}
1567
/// RFC-023 Phase 1a §4.4 item 10g compile-only anchors. Parallels
/// `completion_accessor_type_tests` but is compiled only when the
/// `valkey-default` feature is off, i.e. under `--no-default-features,
/// features = ["sqlite"]`. The matching `§9` CI cell (`cargo check
/// -p ff-sdk --no-default-features --features sqlite`) runs these
/// compile checks mechanically so future PRs that accidentally reach
/// outside the `valkey-default` gate fail the build.
///
/// The module is described as proving `FlowFabricWorker::connect_with`
/// is callable and returns the advertised type under that feature set.
/// NOTE(review): none of the anchors below names `connect_with`
/// directly — confirm where that one is pinned.
///
/// Every `#[test]` body here only declares helpers or coerces function
/// items; nothing is executed against a backend.
#[cfg(all(test, not(feature = "valkey-default")))]
mod sqlite_only_compile_surface_tests {
    use super::FlowFabricWorker;
    use crate::SdkError;
    use ff_core::completion_backend::CompletionBackend;
    use ff_core::engine_backend::EngineBackend;
    use ff_core::engine_error::EngineError;
    use ff_core::types::ExecutionId;
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::Arc;

    /// Boxed `Send` future used by the anchors that pin `async fn`
    /// methods: boxing forces the compiler to check the method's output
    /// type and that its future is `Send`.
    type SendFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

    #[test]
    fn addressable_surface_under_sqlite_only() {
        // Type-level proof that the backend-agnostic accessors are
        // reachable without the `valkey-default` feature. None of the
        // accessors is called; each annotated binding pins a signature.
        let _backend: fn(&FlowFabricWorker) -> Option<&Arc<dyn EngineBackend>> =
            FlowFabricWorker::backend;
        let _completion: fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>> =
            FlowFabricWorker::completion_backend;
        let _config: fn(&FlowFabricWorker) -> &crate::config::WorkerConfig =
            FlowFabricWorker::config;
        let _partitions: fn(&FlowFabricWorker) -> &ff_core::partition::PartitionConfig =
            FlowFabricWorker::partition_config;
    }

    /// v0.12 PR-1 compile anchor — the
    /// [`EngineBackend::read_execution_context`] trait method MUST be
    /// addressable under `--no-default-features --features sqlite`. A
    /// direct fn-pointer cast is awkward under `#[async_trait]`
    /// (lifetime-generic fn items don't coerce to `fn` pointers), so
    /// the anchor is a helper generic over the trait bound that calls
    /// the method. Calling it for real would need a concrete backend;
    /// the never-invoked generic keeps the test purely compile-time.
    #[test]
    fn read_execution_context_addressable_under_sqlite_only() {
        use ff_core::contracts::ExecutionContext;

        #[allow(dead_code)]
        async fn _pin<B>(
            backend: &B,
            execution_id: &ExecutionId,
        ) -> Result<ExecutionContext, EngineError>
        where
            B: EngineBackend + ?Sized,
        {
            backend.read_execution_context(execution_id).await
        }
    }

    /// v0.12 PR-2 compile anchor — `ClaimedTask` as a type MUST be
    /// addressable under `--no-default-features --features sqlite`
    /// (the `task` module is no longer `valkey-default`-gated at the
    /// module level). As of v0.12 PR-5.5 the `impl ClaimedTask { ... }`
    /// block is likewise ungated: the backend mints the `Handle` at
    /// claim time and `cloned_handle` just clones the cached field, so
    /// no Valkey-specific synthesis is required. The anchor names the
    /// struct inside a `PhantomData` return type so the path must
    /// resolve under the sqlite-only feature set.
    #[test]
    fn claimed_task_type_addressable_under_sqlite_only() {
        use crate::task::ClaimedTask;
        use std::marker::PhantomData;

        #[allow(dead_code)]
        fn _pin<T>(_: PhantomData<T>) -> PhantomData<ClaimedTask> {
            PhantomData
        }
    }

    /// v0.12 PR-6 compile anchor — ungating the `admin` + `snapshot`
    /// modules at module level must not re-introduce ferriskey
    /// symbols under `--no-default-features --features sqlite`.
    /// Pins that [`FlowFabricAdminClient::new`] and a representative
    /// snapshot forwarder are addressable without the
    /// `valkey-default` feature.
    #[test]
    fn admin_and_snapshot_addressable_under_sqlite_only() {
        use crate::admin::FlowFabricAdminClient;
        use ff_core::contracts::ExecutionSnapshot;

        // `FlowFabricAdminClient::new` is `fn(impl Into<String>)` and
        // so can't be coerced to a fn pointer directly. A never-invoked
        // wrapper calling it with a concrete `&str` pins the method
        // (the error type is the same `SdkError` the rest of the
        // module returns).
        #[allow(dead_code)]
        fn _pin_admin_new() -> Result<FlowFabricAdminClient, SdkError> {
            FlowFabricAdminClient::new("http://anchor")
        }

        // The snapshot method is `async`, so it is pinned through a
        // never-invoked `async fn` wrapper that spells out its return
        // type.
        #[allow(dead_code)]
        async fn _pin_describe(
            worker: &FlowFabricWorker,
            execution_id: &ExecutionId,
        ) -> Result<Option<ExecutionSnapshot>, SdkError> {
            worker.describe_execution(execution_id).await
        }
    }

    /// v0.12 PR-3 compile anchor — the
    /// [`EngineBackend::read_current_attempt_index`] trait method MUST
    /// be addressable under `--no-default-features --features sqlite`.
    /// Mirrors the PR-1 `read_execution_context` anchor: a helper
    /// generic over the trait bound, because `#[async_trait]` lifetime
    /// elision blocks an `fn`-pointer coercion.
    #[test]
    fn read_current_attempt_index_addressable_under_sqlite_only() {
        use ff_core::types::AttemptIndex;

        #[allow(dead_code)]
        async fn _pin<B>(
            backend: &B,
            execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError>
        where
            B: EngineBackend + ?Sized,
        {
            backend.read_current_attempt_index(execution_id).await
        }
    }

    /// v0.12 PR-5.5 retry-path-fix compile anchor — the
    /// [`EngineBackend::read_total_attempt_count`] trait method MUST
    /// be addressable under `--no-default-features --features sqlite`.
    /// Mirrors the PR-3 `read_current_attempt_index` anchor.
    #[test]
    fn read_total_attempt_count_addressable_under_sqlite_only() {
        use ff_core::types::AttemptIndex;

        #[allow(dead_code)]
        async fn _pin<B>(
            backend: &B,
            execution_id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError>
        where
            B: EngineBackend + ?Sized,
        {
            backend.read_total_attempt_count(execution_id).await
        }
    }

    /// v0.12 PR-4 compile anchor — the
    /// [`EngineBackend::claim_execution`] trait method MUST be
    /// addressable under `--no-default-features --features sqlite`.
    /// Mirrors the PR-3 `read_current_attempt_index` anchor: a helper
    /// generic over the trait bound, because `#[async_trait]` lifetime
    /// elision blocks a plain fn-pointer coercion.
    ///
    /// The method has an `Err(Unavailable)` default impl; PG + SQLite
    /// backends don't override it today (grants are Valkey-only until
    /// the PG/SQLite grant-consumer RFC lands). The anchor pins the
    /// trait-surface signature — not a runtime call — so the compile
    /// check passes cleanly on every feature set.
    #[test]
    fn claim_execution_addressable_under_sqlite_only() {
        use ff_core::contracts::{ClaimExecutionArgs, ClaimExecutionResult};

        #[allow(dead_code)]
        async fn _pin<B>(
            backend: &B,
            args: ClaimExecutionArgs,
        ) -> Result<ClaimExecutionResult, EngineError>
        where
            B: EngineBackend + ?Sized,
        {
            backend.claim_execution(args).await
        }
    }

    /// v0.12 PR-5 compile anchor — the three scanner-primitive trait
    /// methods (`scan_eligible_executions`, `issue_claim_grant`,
    /// `block_route`) MUST be addressable under
    /// `--no-default-features --features sqlite`; this test and the
    /// two that follow pin one method each. Each method has an
    /// `Err(Unavailable)` default impl; PG + SQLite backends don't
    /// override (the scheduler-routed `claim_for_worker` path is the
    /// supported PG/SQLite entry point). The anchors pin the trait-
    /// surface signatures — not runtime calls — so the compile check
    /// passes cleanly on every feature set.
    #[test]
    fn scan_eligible_executions_addressable_under_sqlite_only() {
        use ff_core::contracts::ScanEligibleArgs;

        #[allow(dead_code)]
        async fn _pin<B>(
            backend: &B,
            args: ScanEligibleArgs,
        ) -> Result<Vec<ExecutionId>, EngineError>
        where
            B: EngineBackend + ?Sized,
        {
            backend.scan_eligible_executions(args).await
        }
    }

    #[test]
    fn issue_claim_grant_addressable_under_sqlite_only() {
        use ff_core::contracts::{IssueClaimGrantArgs, IssueClaimGrantOutcome};

        #[allow(dead_code)]
        async fn _pin<B>(
            backend: &B,
            args: IssueClaimGrantArgs,
        ) -> Result<IssueClaimGrantOutcome, EngineError>
        where
            B: EngineBackend + ?Sized,
        {
            backend.issue_claim_grant(args).await
        }
    }

    #[test]
    fn block_route_addressable_under_sqlite_only() {
        use ff_core::contracts::{BlockRouteArgs, BlockRouteOutcome};

        #[allow(dead_code)]
        async fn _pin<B>(
            backend: &B,
            args: BlockRouteArgs,
        ) -> Result<BlockRouteOutcome, EngineError>
        where
            B: EngineBackend + ?Sized,
        {
            backend.block_route(args).await
        }
    }

    /// v0.12 PR-3 compile anchor — `FlowFabricWorker::deliver_signal`
    /// MUST be addressable under `--no-default-features --features sqlite`
    /// (ungated in PR-3 — the body is pure
    /// `self.backend.deliver_signal(...)` trait dispatch).
    #[test]
    fn deliver_signal_addressable_under_sqlite_only() {
        use crate::task::{Signal, SignalOutcome};
        use ff_core::types::WaitpointId;

        // `deliver_signal` is an `async fn`, so its item signature
        // carries a hidden lifetime and an opaque return future. The
        // anchor is a never-invoked wrapper that boxes that future
        // under an explicitly named output type, which also requires
        // the future to be `Send`.
        #[allow(dead_code)]
        fn _pin<'a>(
            worker: &'a FlowFabricWorker,
            execution_id: &'a ExecutionId,
            waitpoint_id: &'a WaitpointId,
            signal: Signal,
        ) -> SendFuture<'a, Result<SignalOutcome, SdkError>> {
            Box::pin(worker.deliver_signal(execution_id, waitpoint_id, signal))
        }
    }

    /// v0.12 PR-5.5 compile anchor — `claim_from_grant` MUST be
    /// addressable under `--no-default-features --features sqlite`
    /// (ungated in PR-5.5). PG / SQLite return `EngineError::Unavailable`
    /// from the underlying trait method today, so the call path is
    /// compile-reachable but runtime-unavailable. The anchor pins the
    /// signature; a future PR wiring real PG/SQLite grant-consumer
    /// bodies flips the runtime behaviour without touching this test.
    #[test]
    fn claim_from_grant_addressable_under_sqlite_only() {
        use crate::task::ClaimedTask;
        use ff_core::contracts::ClaimGrant;
        use ff_core::types::LaneId;

        #[allow(dead_code)]
        fn _pin<'a>(
            worker: &'a FlowFabricWorker,
            lane: LaneId,
            grant: ClaimGrant,
        ) -> SendFuture<'a, Result<ClaimedTask, SdkError>> {
            Box::pin(worker.claim_from_grant(lane, grant))
        }
    }

    /// v0.12 PR-5.5 compile anchor — `claim_via_server` MUST be
    /// addressable under `--no-default-features --features sqlite`.
    /// The scheduler-routed path is the supported PG/SQLite claim
    /// entry point per `project_claim_from_grant_pg_sqlite_gap.md`;
    /// pinning the signature here prevents a future PR from
    /// accidentally re-gating it behind `valkey-default`.
    #[test]
    fn claim_via_server_addressable_under_sqlite_only() {
        use crate::admin::FlowFabricAdminClient;
        use crate::task::ClaimedTask;
        use ff_core::types::LaneId;

        #[allow(dead_code)]
        fn _pin<'a>(
            worker: &'a FlowFabricWorker,
            admin: &'a FlowFabricAdminClient,
            lane: &'a LaneId,
            grant_ttl_ms: u64,
        ) -> SendFuture<'a, Result<Option<ClaimedTask>, SdkError>> {
            Box::pin(worker.claim_via_server(admin, lane, grant_ttl_ms))
        }
    }

    /// v0.12 PR-5.5 compile anchor — `ClaimedTask::{complete, fail,
    /// cancel}` MUST be addressable under `--no-default-features
    /// --features sqlite`. The `impl ClaimedTask` block is now
    /// module-level ungated (PR-5.5); the terminal ops route through
    /// `EngineBackend::{complete, fail, cancel}` which are core trait
    /// methods (no `streaming` / `suspension` / `budget` gate).
    #[test]
    fn claimed_task_terminal_ops_addressable_under_sqlite_only() {
        use crate::task::{ClaimedTask, FailOutcome};

        #[allow(dead_code)]
        fn _pin_complete(
            task: ClaimedTask,
            payload: Option<Vec<u8>>,
        ) -> SendFuture<'static, Result<(), SdkError>> {
            Box::pin(task.complete(payload))
        }

        #[allow(dead_code)]
        fn _pin_fail<'a>(
            task: ClaimedTask,
            reason: &'a str,
            error_category: &'a str,
        ) -> SendFuture<'a, Result<FailOutcome, SdkError>> {
            Box::pin(task.fail(reason, error_category))
        }

        #[allow(dead_code)]
        fn _pin_cancel<'a>(
            task: ClaimedTask,
            reason: &'a str,
        ) -> SendFuture<'a, Result<(), SdkError>> {
            Box::pin(task.cancel(reason))
        }
    }
}
1915
#[cfg(all(test, feature = "direct-valkey-claim"))]
mod scan_cursor_tests {
    //! Unit tests for `scan_cursor_seed`: determinism, spread across
    //! worker ids, range bound, and the zero-partition guard.
    use super::scan_cursor_seed;

    /// Partition count shared by the non-degenerate cases.
    const PARTITIONS: usize = 256;

    #[test]
    fn stable_for_same_input() {
        let first = scan_cursor_seed("w1", PARTITIONS);
        let second = scan_cursor_seed("w1", PARTITIONS);
        assert_eq!(first, second);
    }

    #[test]
    fn distinct_for_different_ids() {
        let seed_w1 = scan_cursor_seed("w1", PARTITIONS);
        let seed_w2 = scan_cursor_seed("w2", PARTITIONS);
        assert_ne!(seed_w1, seed_w2);
    }

    #[test]
    fn bounded_by_partition_count() {
        for worker in (0..100).map(|i| format!("w{i}")) {
            assert!(scan_cursor_seed(&worker, PARTITIONS) < PARTITIONS);
        }
    }

    #[test]
    fn zero_partitions_returns_zero() {
        let seed = scan_cursor_seed("w1", 0);
        assert_eq!(seed, 0);
    }
}
1941}