Skip to main content

ff_sdk/
worker.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::Arc;
4
5// RFC-023 Phase 1a (§4.4 item 10b): ferriskey imports scoped behind
6// `valkey-default` so the module compiles under
7// `--no-default-features, features = ["sqlite"]`.
8#[cfg(feature = "valkey-default")]
9use ferriskey::Client;
10use ff_core::partition::PartitionConfig;
11use ff_core::types::*;
12use tokio::sync::Semaphore;
13
14use crate::config::WorkerConfig;
15use crate::task::ClaimedTask;
16use crate::SdkError;
17
18/// FlowFabric worker — connects to Valkey, claims executions, and provides
19/// the worker-facing API.
20///
21/// # Admission control
22///
23/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
24/// **bypasses the scheduler's admission controls**: it reads the eligible
25/// ZSET directly and mints its own claim grant without consulting budget
26/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
27/// benchmarks, tests, and single-tenant development where the scheduler
28/// hop is measurement noise, not for production.
29///
30/// For production deployments, consume scheduler-issued grants via
31/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
32/// budget breach, quota sliding-window, concurrency cap, and
33/// capability-match checks before issuing grants.
34///
35/// # Usage
36///
37/// ```rust,ignore
38/// use ff_core::backend::BackendConfig;
39/// use ff_core::types::{LaneId, Namespace, WorkerId, WorkerInstanceId};
40/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
41///
42/// let config = WorkerConfig {
43///     backend: Some(BackendConfig::valkey("localhost", 6379)),
44///     worker_id: WorkerId::new("w1"),
45///     worker_instance_id: WorkerInstanceId::new("w1-i1"),
46///     namespace: Namespace::new("default"),
47///     lanes: vec![LaneId::new("main")],
48///     capabilities: Vec::new(),
49///     lease_ttl_ms: 30_000,
50///     claim_poll_interval_ms: 1_000,
51///     max_concurrent_tasks: 1,
52///     partition_config: None,
53/// };
54/// let worker = FlowFabricWorker::connect(config).await?;
55///
56/// loop {
57///     if let Some(task) = worker.claim_next().await? {
58///         // Process task...
59///         task.complete(Some(b"result".to_vec())).await?;
60///     } else {
61///         tokio::time::sleep(Duration::from_secs(1)).await;
62///     }
63/// }
64/// ```
65pub struct FlowFabricWorker {
66    /// RFC-023 Phase 1a (§4.4 item 10c): `ferriskey::Client` is
67    /// `valkey-default`-gated **and** `Option` wrapped. Under
68    /// sqlite-only features the field is absent. Under
69    /// `valkey-default` the field is `Some(client)` when the worker
70    /// was built via [`Self::connect`] (the Valkey-bundled entry
71    /// point that dials), and `None` when it was built via
72    /// [`Self::connect_with`] (the backend-agnostic entry point per
73    /// §4.4 item 10e — no ferriskey round-trip). Claim/signal hot
74    /// paths (all `valkey-default`-gated per §4.4 item 10f) expect
75    /// `Some`; a claim call against a `connect_with`-built worker
76    /// panics with a clear message until the backend-agnostic SDK
77    /// worker-loop RFC lands (tracked in §8).
78    #[cfg(feature = "valkey-default")]
79    #[allow(dead_code)]
80    client: Option<Client>,
81    config: WorkerConfig,
82    partition_config: PartitionConfig,
83    /// 8-hex FNV-1a digest of the sorted capabilities CSV. Used in
84    /// per-mismatch logs so the 4KB CSV never echoes on every reject
85    /// during an incident. For [`Self::connect`]-built workers, the
86    /// full CSV is additionally logged once at connect-time WARN via
87    /// `valkey_preamble::run` for cross-reference; [`Self::connect_with`]-
88    /// built workers compute the hash without the companion CSV log.
89    /// Mirrors `ff-scheduler::claim::worker_caps_digest`.
90    worker_capabilities_hash: String,
91    lane_index: AtomicUsize,
92    /// Concurrency cap for in-flight tasks. Permits are acquired or
93    /// transferred by [`claim_next`] (feature-gated),
94    /// [`claim_from_grant`] (always available), and
95    /// [`claim_from_reclaim_grant`], transferred to the returned
96    /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
97    /// Holds `max_concurrent_tasks` permits total.
98    ///
99    /// [`claim_next`]: FlowFabricWorker::claim_next
100    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
101    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
102    #[cfg_attr(not(feature = "valkey-default"), allow(dead_code))]
103    concurrency_semaphore: Arc<Semaphore>,
104    /// Rolling offset for chunked partition scans. Each poll advances the
105    /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
106    /// chunk)` polls every partition is covered. The initial value is
107    /// derived from `worker_instance_id` so idle workers spread their
108    /// scans across different partitions from the first poll onward.
109    ///
110    /// Overflow: on 64-bit targets `usize` is `u64` — overflow after
111    /// ~2^64 polls (billions of years at any realistic rate). On 32-bit
112    /// targets (wasm32, i686) `usize` is `u32` and wraps after ~4 years
113    /// at 1 poll/sec — acceptable; on wrap, the modulo preserves
114    /// correctness because the sequence simply restarts a new cycle.
115    scan_cursor: AtomicUsize,
116    /// The [`EngineBackend`] the Stage-1b trait forwarders route
117    /// through.
118    ///
119    /// **RFC-012 Stage 1b.** Always populated:
120    /// [`FlowFabricWorker::connect`] now wraps the worker's own
121    /// `ferriskey::Client` in a `ValkeyBackend` via
122    /// `ValkeyBackend::from_client_and_partitions`, and
123    /// [`FlowFabricWorker::connect_with`] replaces that default with
124    /// the caller-supplied `Arc<dyn EngineBackend>`. The
125    /// [`FlowFabricWorker::backend`] accessor still returns
126    /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
127    /// narrows the return type once consumers have migrated.
128    ///
129    /// Hot paths (claim, deliver_signal, admin queries) still use the
130    /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
131    /// migrates them through this field, and Stage 1d removes the
132    /// embedded client.
133    backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
134    /// Optional handle to the same underlying backend viewed as a
135    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
136    /// Populated by [`Self::connect`] from the bundled
137    /// `ValkeyBackend` (which implements the trait); supplied by the
138    /// caller on [`Self::connect_with`] as an explicit
139    /// `Option<Arc<dyn CompletionBackend>>` — `None` means "this
140    /// backend does not support push-based completion" (e.g. a future
141    /// Postgres backend without LISTEN/NOTIFY, or a test mock). Cairn
142    /// and other completion-subscription consumers reach this through
143    /// [`Self::completion_backend`].
144    completion_backend_handle:
145        Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
146}
147
/// Upper bound on how many partitions one scheduler-bypass claim poll
/// visits. Applies to `claim_next_via_backend` and to `claim_next`, its
/// `direct-valkey-claim`-gated back-compat alias. Capping the window
/// keeps the backend load of an idle worker at
/// O(PARTITION_SCAN_CHUNK) per poll rather than
/// O(num_flow_partitions).
const PARTITION_SCAN_CHUNK: usize = 32;
154
155impl FlowFabricWorker {
156    /// RFC-023 Phase 1a helper: borrow the embedded `ferriskey::Client`,
157    /// panicking with a clear message if the worker was built via
158    /// [`Self::connect_with`] (which does not dial Valkey). Every
159    /// Valkey-specific claim/signal method funnels through this accessor
160    /// so the panic site is uniform and traceable.
161    ///
162    /// A backend-agnostic claim/signal API is deferred per §8 breaking-
163    /// change disclosure; until that lands, valkey-default consumers
164    /// must use [`Self::connect`] if they intend to drive the Valkey
165    /// claim/signal loop.
166    #[cfg(feature = "valkey-default")]
167    #[inline]
168    #[allow(dead_code)]
169    fn valkey_client(&self) -> &Client {
170        self.client.as_ref().expect(
171            "FlowFabricWorker was built via connect_with (no Valkey dial) \
172             but a Valkey-specific claim/signal method was invoked. \
173             Use FlowFabricWorker::connect to dial Valkey, or drive the \
174             backend directly through the trait surface via .backend().",
175        )
176    }
177}
178
179impl FlowFabricWorker {
180    /// Connect to Valkey and prepare the worker.
181    ///
182    /// Establishes the ferriskey connection. Does NOT load the FlowFabric
183    /// library — that is the server's responsibility (ff-server calls
184    /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
185    /// the library is already loaded.
186    ///
187    /// # Smoke / dev scripts: rotate `WorkerInstanceId`
188    ///
189    /// The SDK writes a SET-NX liveness sentinel keyed on the worker's
190    /// `WorkerInstanceId`. When a smoke / dev script reuses the same
191    /// `WorkerInstanceId` across restarts, subsequent runs trap behind
192    /// the prior run's SET-NX until the liveness key's TTL (≈ 2× the
193    /// configured lease TTL) expires — the worker appears stuck and
194    /// claims nothing. Iterative scripts should synthesise a fresh
195    /// `WorkerInstanceId` per process (e.g. `WorkerInstanceId::new()`
196    /// or embed a UUID/timestamp) rather than hard-coding a stable
197    /// value. Production workers that cleanly shut down release the
198    /// key; only crashed / kill -9'd processes hit this trap.
199    ///
200    /// **RFC-023 Phase 1a (§4.4 item 10d, v0.12.0):** this Valkey-bundled
201    /// entry point is `#[cfg(feature = "valkey-default")]`-gated.
202    /// Consumers on the sqlite-only feature set must use
203    /// [`FlowFabricWorker::connect_with`] and drive the backend directly
204    /// through the trait surface.
205    #[cfg(feature = "valkey-default")]
206    pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
207        if config.lanes.is_empty() {
208            return Err(SdkError::Config {
209                context: "worker_config".into(),
210                field: None,
211                message: "at least one lane is required".into(),
212            });
213        }
214
215        // Build the ferriskey client from the nested `BackendConfig`.
216        // Delegates to `ff_backend_valkey::build_client` so host/port +
217        // TLS + cluster + `BackendTimeouts::request` + `BackendRetry`
218        // wiring lives in exactly one place (RFC-012 Stage 1c tranche 1).
219        //
220        // v0.13 ergonomics fix (feedback_sdk_reclaim_ergonomics
221        // Finding 2): `WorkerConfig.backend` is `Option<BackendConfig>`
222        // because `connect_with` ignores it. The URL-based `connect`
223        // path still requires a concrete `BackendConfig` to dial;
224        // reject `None` up front with a typed Config error instead of
225        // silently using a default that would hide a caller mistake.
226        let backend_config = config.backend.as_ref().ok_or_else(|| SdkError::Config {
227            context: "worker_config".into(),
228            field: Some("backend".into()),
229            message: "FlowFabricWorker::connect requires WorkerConfig.backend = \
230                Some(BackendConfig::...); use FlowFabricWorker::connect_with for \
231                the backend-agnostic path that takes a pre-built \
232                Arc<dyn EngineBackend>".into(),
233        })?;
234        let client = ff_backend_valkey::build_client(backend_config).await?;
235
236        // v0.12 PR-6: the Valkey-specific preamble (PING, alive-key
237        // SET-NX, `ff:config:partitions` HGETALL, caps ingress +
238        // sorted-dedup CSV, caps HASH + workers-index writes) lives
239        // in `crate::valkey_preamble`. RFC-025 Phase 5 cutover: the
240        // caps write is now a namespaced HASH matching the
241        // `ff_register_worker` FCALL shape. The write order is
242        // observable from scheduler-side reads (unblock scanner:
243        // SMEMBERS ff:idx:{ns}:workers → HGET ff:worker:{ns}:{id}:caps
244        // caps_csv) so preservation is load-bearing. See
245        // `valkey_preamble::run`.
246        let crate::valkey_preamble::PreambleOutput {
247            partition_config,
248            capabilities_csv: _worker_capabilities_csv,
249            capabilities_hash: worker_capabilities_hash,
250        } = crate::valkey_preamble::run(&client, &config).await?;
251
252        let max_tasks = config.max_concurrent_tasks.max(1);
253        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
254
255        tracing::info!(
256            worker_id = %config.worker_id,
257            instance_id = %config.worker_instance_id,
258            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
259            "FlowFabricWorker connected"
260        );
261
262        let scan_cursor_init = scan_cursor_seed(
263            config.worker_instance_id.as_str(),
264            partition_config.num_flow_partitions.max(1) as usize,
265        );
266
267        // RFC-012 Stage 1b: wrap the dialed client in a ValkeyBackend
268        // so `ClaimedTask`'s trait forwarders have something to call.
269        // `from_client_and_partitions` reuses the already-dialed client
270        // — no second connection. Share the concrete
271        // `Arc<ValkeyBackend>` across the two trait objects — one
272        // allocation, both accessors yield identity-equivalent handles.
273        let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
274            ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
275                client.clone(),
276                partition_config,
277            );
278        let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
279        let completion_backend_handle: Option<
280            Arc<dyn ff_core::completion_backend::CompletionBackend>,
281        > = Some(valkey_backend);
282
283        Ok(Self {
284            client: Some(client),
285            config,
286            partition_config,
287            worker_capabilities_hash,
288            lane_index: AtomicUsize::new(0),
289            concurrency_semaphore,
290            scan_cursor: AtomicUsize::new(scan_cursor_init),
291            backend,
292            completion_backend_handle,
293        })
294    }
295
296    /// Store pre-built [`EngineBackend`] and (optional)
297    /// [`CompletionBackend`] handles on the worker. Builds the worker
298    /// via the legacy [`FlowFabricWorker::connect`] path first (so the
299    /// embedded `ferriskey::Client` that the Stage 1b non-migrated hot
300    /// paths still use is dialed), then replaces the default
301    /// `ValkeyBackend` wrapper with the caller-supplied trait objects.
302    ///
303    /// The `completion` argument is explicit: 0.3.3 previously accepted
304    /// only `backend` and `completion_backend()` silently returned
305    /// `None` on this path because `Arc<dyn EngineBackend>` cannot be
306    /// upcast to `Arc<dyn CompletionBackend>` without loss of
307    /// trait-object identity. 0.3.4 lets the caller decide.
308    ///
309    /// - `Some(arc)` — caller supplies a completion backend.
310    ///   [`Self::completion_backend`] returns `Some(clone)`.
311    /// - `None` — this backend does not support push-based completion
312    ///   (future Postgres backend without LISTEN/NOTIFY, test mocks).
313    ///   [`Self::completion_backend`] returns `None`.
314    ///
315    /// When the underlying backend implements both traits (as
316    /// `ValkeyBackend` does), pass the same `Arc` twice — the two
317    /// trait-object views share one allocation:
318    ///
319    /// ```rust,ignore
320    /// use std::sync::Arc;
321    /// use ff_backend_valkey::ValkeyBackend;
322    /// use ff_sdk::{FlowFabricWorker, WorkerConfig};
323    ///
324    /// # async fn doc(worker_config: WorkerConfig,
325    /// #              backend_config: ff_backend_valkey::BackendConfig)
326    /// #     -> Result<(), ff_sdk::SdkError> {
327    /// // Valkey (completion supported):
328    /// let valkey = Arc::new(ValkeyBackend::connect(backend_config).await?);
329    /// let worker = FlowFabricWorker::connect_with(
330    ///     worker_config,
331    ///     valkey.clone(),
332    ///     Some(valkey),
333    /// ).await?;
334    /// # Ok(()) }
335    /// ```
336    ///
337    /// Backend without completion support:
338    ///
339    /// ```rust,ignore
340    /// let worker = FlowFabricWorker::connect_with(
341    ///     worker_config,
342    ///     backend,
343    ///     None,
344    /// ).await?;
345    /// ```
346    ///
347    /// **Stage 1b + Round-7 scope — what the injected backend covers
348    /// today.** The injected backend currently covers these per-task
349    /// `ClaimedTask` ops: `update_progress` / `resume_signals` /
350    /// `delay_execution` / `move_to_waiting_children` / `complete` /
351    /// `cancel` / `fail` / `create_pending_waitpoint` /
352    /// `append_frame` / `report_usage`. A mock backend therefore sees
353    /// that portion of the worker's per-task write surface. Lease
354    /// renewal also routes through `backend.renew(&handle)`. Round-7
355    /// (#135/#145) closed the four trait-shape gaps tracked by #117,
356    /// but `suspend` still reaches the embedded `ferriskey::Client`
357    /// directly via `ff_suspend_execution` — this is the deferred
358    /// suspend per RFC-012 §R7.6.1, pending Stage 1d input-shape
359    /// work. `claim_next` / `claim_from_grant` /
360    /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
361    /// are Stage 1c hot-path work. Stage 1d removes the embedded
362    /// client entirely.
363    ///
364    /// Today's constructor is therefore NOT yet a drop-in way to swap
365    /// in a non-Valkey backend — it requires a reachable Valkey node
366    /// for `suspend` plus the remaining hot-path ops. Tests that
367    /// exercise only the migrated per-task ops can run fully against
368    /// a mock backend.
369    ///
370    /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
371    /// [`CompletionBackend`]: ff_core::completion_backend::CompletionBackend
372    pub async fn connect_with(
373        config: WorkerConfig,
374        backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
375        completion: Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
376    ) -> Result<Self, SdkError> {
377        // RFC-023 Phase 1a (§4.4 item 10e, v0.12.0): direct
378        // backend-agnostic construction. No `Self::connect` preamble,
379        // no ferriskey round-trips (no PING, no alive-key SET-NX, no
380        // `ff:config:partitions` HGETALL). Callers needing a
381        // non-default `PartitionConfig` under non-Valkey backends set
382        // [`WorkerConfig::partition_config`] (v0.12 PR-6 closed the
383        // original follow-up flagged here).
384        //
385        // Lane-empty validation (the one check `connect` did before
386        // any Valkey work) is hoisted here so every entry point
387        // refuses an empty lane list.
388        if config.lanes.is_empty() {
389            return Err(SdkError::Config {
390                context: "worker_config".into(),
391                field: None,
392                message: "at least one lane is required".into(),
393            });
394        }
395
396        // v0.13 ergonomics fix (feedback_sdk_reclaim_ergonomics
397        // Finding 2): `WorkerConfig.backend` is ignored on this path —
398        // the caller-supplied `Arc<dyn EngineBackend>` is authoritative.
399        // If the caller left behind a `Some(..)` from an earlier
400        // `connect(..)` template, warn so the ambiguity is visible.
401        if config.backend.is_some() {
402            tracing::warn!(
403                worker_id = %config.worker_id,
404                instance_id = %config.worker_instance_id,
405                "WorkerConfig.backend is Some(..) but FlowFabricWorker::connect_with \
406                 was invoked — BackendConfig is ignored, the injected \
407                 Arc<dyn EngineBackend> is authoritative. Set WorkerConfig.backend = \
408                 None on the connect_with path to silence this warning."
409            );
410        }
411
412        let max_tasks = config.max_concurrent_tasks.max(1);
413        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
414        // v0.12 PR-6: honor the optional `WorkerConfig::partition_config`
415        // override. `None` keeps the pre-PR-6 default shape (256 / 32 /
416        // 32); `Some(cfg)` lets non-Valkey deployments with a custom
417        // `num_flow_partitions` bind correctly instead of silently
418        // missing data via the wrong partition index.
419        let partition_config = config.partition_config.unwrap_or_default();
420
421        let scan_cursor_init = scan_cursor_seed(
422            config.worker_instance_id.as_str(),
423            partition_config.num_flow_partitions.max(1) as usize,
424        );
425
426        // Capability validation + CSV/hash compute mirrors `connect`'s
427        // preamble (valkey_preamble::run) so both entry points refuse
428        // the same malformed tokens and populate the same
429        // `worker_capabilities_hash` used by claim_next_via_backend's
430        // per-mismatch log.
431        for cap in &config.capabilities {
432            if cap.is_empty() {
433                return Err(SdkError::Config {
434                    context: "worker_config".into(),
435                    field: Some("capabilities".into()),
436                    message: "capability token must not be empty".into(),
437                });
438            }
439            if cap.contains(',') {
440                return Err(SdkError::Config {
441                    context: "worker_config".into(),
442                    field: Some("capabilities".into()),
443                    message: format!(
444                        "capability token may not contain ',' (CSV delimiter): {cap:?}"
445                    ),
446                });
447            }
448            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
449                return Err(SdkError::Config {
450                    context: "worker_config".into(),
451                    field: Some("capabilities".into()),
452                    message: format!(
453                        "capability token must not contain whitespace or control \
454                         characters: {cap:?}"
455                    ),
456                });
457            }
458        }
459        let worker_capabilities_hash = {
460            let set: std::collections::BTreeSet<&str> = config
461                .capabilities
462                .iter()
463                .map(|s| s.as_str())
464                .filter(|s| !s.is_empty())
465                .collect();
466            let csv: String = set.into_iter().collect::<Vec<_>>().join(",");
467            ff_core::hash::fnv1a_xor8hex(&csv)
468        };
469
470        Ok(Self {
471            #[cfg(feature = "valkey-default")]
472            client: None,
473            config,
474            partition_config,
475            worker_capabilities_hash,
476            lane_index: AtomicUsize::new(0),
477            concurrency_semaphore,
478            scan_cursor: AtomicUsize::new(scan_cursor_init),
479            backend,
480            completion_backend_handle: completion,
481        })
482    }
483
484    /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
485    /// ops through.
486    ///
487    /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
488    /// the `Option` wrapper is retained for API stability with the
489    /// Stage-1a shape. Stage 1c narrows the return type to
490    /// `&Arc<dyn EngineBackend>`.
491    pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
492        Some(&self.backend)
493    }
494
495    /// Crate-internal direct borrow of the backend. The public
496    /// [`Self::backend`] still returns `Option` for API stability
497    /// (Stage 1b holdover). Snapshot trait-forwarders in
498    /// [`crate::snapshot`] need an un-wrapped reference.
499    #[cfg_attr(not(feature = "valkey-default"), allow(dead_code))]
500    pub(crate) fn backend_ref(
501        &self,
502    ) -> &Arc<dyn ff_core::engine_backend::EngineBackend> {
503        &self.backend
504    }
505
506    /// Handle to the completion-event subscription backend, for
507    /// consumers that need to observe execution completions (DAG
508    /// reconcilers, tenant-isolated subscribers).
509    ///
510    /// Returns `Some` when the worker was built through
511    /// [`Self::connect`] on the default `valkey-default` feature
512    /// (the bundled `ValkeyBackend` implements
513    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)),
514    /// or via [`Self::connect_with`] with a `Some(..)` completion
515    /// handle. Returns `None` when the caller passed `None` to
516    /// [`Self::connect_with`] — i.e. the backend does not support
517    /// push-based completion streams (future Postgres without
518    /// LISTEN/NOTIFY, test mocks).
519    ///
520    /// The returned handle shares the same underlying allocation as
521    /// [`Self::backend`]; calls through it (e.g.
522    /// `subscribe_completions_filtered`) hit the same connection
523    /// the worker itself uses.
524    pub fn completion_backend(
525        &self,
526    ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
527        self.completion_backend_handle.clone()
528    }
529
530    /// Get the worker config.
531    pub fn config(&self) -> &WorkerConfig {
532        &self.config
533    }
534
535    /// Get the server-published partition config this worker bound to at
536    /// `connect()`. Exposed so consumers that mint custom
537    /// [`ExecutionId`]s (e.g. for `describe_execution` lookups on ids
538    /// produced outside this worker) stay aligned with the server's
539    /// `num_flow_partitions` — using `PartitionConfig::default()`
540    /// assumes 256 partitions and silently misses data on deployments
541    /// with any other value.
542    pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
543        &self.partition_config
544    }
545
546    /// Back-compat alias for [`Self::claim_next_via_backend`]. Stays
547    /// behind the `direct-valkey-claim` feature to preserve the prior
548    /// opt-in ergonomics; new consumers should call
549    /// [`Self::claim_next_via_backend`] directly (no feature flag
550    /// required) — v0.14 un-gating of the scheduler-bypass scanner.
551    #[cfg(feature = "direct-valkey-claim")]
552    pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
553        self.claim_next_via_backend().await
554    }
555
556    /// Attempt to claim the next eligible execution through the
557    /// [`EngineBackend`] trait — works on any backend with
558    /// `scan_eligible_executions` + `issue_claim_grant` + `claim_execution`
559    /// trait bodies (Valkey today; Postgres/SQLite when their
560    /// grant-consumer RFC lands).
561    ///
562    /// Simplified claim flow:
563    /// 1. Pick a lane (round-robin across configured lanes)
564    /// 2. Scan a [`PARTITION_SCAN_CHUNK`] window of partitions for
565    ///    eligible executions via `backend.scan_eligible_executions`
566    /// 3. Issue a claim grant via `backend.issue_claim_grant`
567    /// 4. Claim the execution via `backend.claim_execution`
568    /// 5. Return a [`ClaimedTask`] with auto lease renewal
569    ///
570    /// # Scheduler bypass — read before production use
571    ///
572    /// This path **bypasses the scheduler's budget / quota admission
573    /// checks**. Enable only when the scheduler hop is measurement
574    /// noise (benches, single-tenant dev, examples demonstrating the
575    /// claim primitive) or when the test harness wants a deterministic
576    /// worker-local path. For production, consume scheduler-issued
577    /// grants via [`Self::claim_from_grant`] — the scheduler enforces
578    /// budget breach, quota sliding-window, concurrency cap, and
579    /// capability-match checks before issuing grants.
580    ///
581    /// # `None` semantics
582    ///
583    /// `Ok(None)` means **no work was found in the partition window this
584    /// poll covered**, not "the cluster is idle". Each call scans a chunk
585    /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
586    /// `scan_cursor`; the cursor advances by that chunk size on every
587    /// invocation, so a worker covers every partition exactly once every
588    /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
589    ///
590    /// Callers should treat `None` as "poll again soon" (typically after
591    /// `config.claim_poll_interval_ms`) rather than "sleep for a long
592    /// time". Backing off too aggressively on `None` can starve workers
593    /// when work lives on partitions outside the current window.
594    ///
595    /// Returns `Err` on backend errors or script failures.
596    ///
597    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
598    pub async fn claim_next_via_backend(&self) -> Result<Option<ClaimedTask>, SdkError> {
599        // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
600        // try_acquire returns immediately — if no permits available, the worker
601        // is at capacity and should not claim more work.
602        let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
603            Ok(p) => p,
604            Err(_) => return Ok(None), // At capacity — no claim attempted
605        };
606
607        let lane_id = self.next_lane();
608        let now = TimestampMs::now();
609
610        // Phase 1: We scan eligible executions directly by reading the eligible
611        // ZSET across execution partitions. In production the scheduler
612        // (ff-scheduler) would handle this. For Phase 1, the SDK does a
613        // simplified inline claim.
614        //
615        // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
616        // partitions starting at a rolling offset. This keeps idle Valkey
617        // load at O(chunk) per worker-second instead of O(num_partitions),
618        // and the worker-instance-seeded initial cursor spreads concurrent
619        // workers across different partition windows.
620        let num_partitions = self.partition_config.num_flow_partitions as usize;
621        if num_partitions == 0 {
622            return Ok(None);
623        }
624        let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
625        let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;
626
627        // Hoist the sorted/deduped capability set out of the loop — the
628        // per-partition iteration reused a fresh BTreeSet on every
629        // step pre-fix, adding O(n log n) alloc/sort to the scanner
630        // hot path on every tick. Computed once per `claim_next` call.
631        let worker_capabilities: std::collections::BTreeSet<String> = self
632            .config
633            .capabilities
634            .iter()
635            .cloned()
636            .collect();
637
638        for step in 0..chunk {
639            let partition_idx = ((start + step) % num_partitions) as u16;
640            let partition = ff_core::partition::Partition {
641                family: ff_core::partition::PartitionFamily::Execution,
642                index: partition_idx,
643            };
644
645            // v0.12 PR-5: trait-routed scanner primitive. Replaces the
646            // pre-PR-5 `ZRANGEBYSCORE` inline; the Valkey backend fires
647            // the identical command byte-for-byte (see
648            // `ff_backend_valkey::scan_eligible_executions_impl`).
649            let scan_args = ff_core::contracts::ScanEligibleArgs::new(
650                lane_id.clone(),
651                partition,
652                1,
653            );
654            let candidates = self.backend.scan_eligible_executions(scan_args).await?;
655            let execution_id = match candidates.into_iter().next() {
656                Some(id) => id,
657                None => continue, // No eligible executions on this partition
658            };
659
660            // Step 1: Issue claim grant (v0.12 PR-5: trait-routed).
661            let grant_args = ff_core::contracts::IssueClaimGrantArgs::new(
662                execution_id.clone(),
663                lane_id.clone(),
664                self.config.worker_id.clone(),
665                self.config.worker_instance_id.clone(),
666                partition,
667                worker_capabilities.clone(),
668                5_000, // grant_ttl_ms
669                now,
670            );
671            let grant_result = self.backend.issue_claim_grant(grant_args).await;
672
673            match grant_result {
674                Ok(_) => {}
675                Err(ref boxed)
676                    if matches!(
677                        boxed,
678                        crate::EngineError::Validation {
679                            kind: crate::ValidationKind::CapabilityMismatch,
680                            ..
681                        }
682                    ) =>
683                {
684                    let missing = match boxed {
685                        crate::EngineError::Validation { detail, .. } => detail.clone(),
686                        _ => unreachable!(),
687                    };
688                    // Block-on-mismatch (RFC-009 §7.5) — parity with
689                    // ff-scheduler's Scheduler::claim_for_worker. Without
690                    // this, the inline-direct-claim path would hot-loop
691                    // on an unclaimable top-of-zset (every tick picks the
692                    // same execution, wastes an FCALL, logs, releases,
693                    // repeats). The scheduler-side unblock scanner
694                    // promotes blocked_route executions back to eligible
695                    // when a worker with matching caps registers.
696                    tracing::info!(
697                        execution_id = %execution_id,
698                        worker_id = %self.config.worker_id,
699                        worker_caps_hash = %self.worker_capabilities_hash,
700                        missing = %missing,
701                        "capability mismatch, blocking execution off eligible (SDK inline claim)"
702                    );
703                    // v0.12 PR-5: trait-routed block_route. Swallow
704                    // typed outcomes + transport faults (best-effort
705                    // semantic preserved from pre-PR-5 behaviour —
706                    // see `BlockRouteOutcome::LuaRejected` rustdoc).
707                    let block_args = ff_core::contracts::BlockRouteArgs::new(
708                        execution_id.clone(),
709                        lane_id.clone(),
710                        partition,
711                        "waiting_for_capable_worker".to_owned(),
712                        "no connected worker satisfies required_capabilities".to_owned(),
713                        now,
714                    );
715                    match self.backend.block_route(block_args).await {
716                        Ok(ff_core::contracts::BlockRouteOutcome::Blocked { .. }) => {}
717                        Ok(ff_core::contracts::BlockRouteOutcome::LuaRejected { message }) => {
718                            tracing::warn!(
719                                execution_id = %execution_id,
720                                error = %message,
721                                "SDK block_route: Lua rejected; eligible ZSET unchanged, next \
722                                 poll will re-evaluate"
723                            );
724                        }
725                        Ok(_) => {}
726                        Err(e) => {
727                            tracing::warn!(
728                                execution_id = %execution_id,
729                                error = %e,
730                                "SDK block_route: transport failure; eligible ZSET unchanged"
731                            );
732                        }
733                    }
734                    continue;
735                }
736                Err(ref e) if is_retryable_claim_error(e) => {
737                    tracing::debug!(
738                        execution_id = %execution_id,
739                        error = %e,
740                        "claim grant failed (retryable), trying next partition"
741                    );
742                    continue;
743                }
744                Err(e) => return Err(SdkError::from(e)),
745            }
746
747            // Step 2: Claim the execution
748            match self
749                .claim_execution(&execution_id, &lane_id, &partition, now)
750                .await
751            {
752                Ok(mut task) => {
753                    // Transfer concurrency permit to the task. When the task is
754                    // completed/failed/cancelled/dropped the permit returns to
755                    // the semaphore, allowing another claim.
756                    task.set_concurrency_permit(permit);
757                    return Ok(Some(task));
758                }
759                Err(SdkError::Engine(ref boxed))
760                    if matches!(
761                        **boxed,
762                        crate::EngineError::Contention(
763                            crate::ContentionKind::UseClaimResumedExecution
764                        )
765                    ) =>
766                {
767                    // Execution was resumed from suspension — attempt_interrupted.
768                    // ff_claim_execution rejects this; use ff_claim_resumed_execution
769                    // which reuses the existing attempt instead of creating a new one.
770                    tracing::debug!(
771                        execution_id = %execution_id,
772                        "execution is resumed, using claim_resumed path"
773                    );
774                    match self
775                        .claim_resumed_execution(&execution_id, &lane_id, &partition)
776                        .await
777                    {
778                        Ok(mut task) => {
779                            task.set_concurrency_permit(permit);
780                            return Ok(Some(task));
781                        }
782                        Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
783                            tracing::debug!(
784                                execution_id = %execution_id,
785                                error = %e2,
786                                "claim_resumed failed (retryable), trying next partition"
787                            );
788                            continue;
789                        }
790                        Err(e2) => return Err(e2),
791                    }
792                }
793                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
794                    tracing::debug!(
795                        execution_id = %execution_id,
796                        error = %e,
797                        "claim execution failed (retryable), trying next partition"
798                    );
799                    continue;
800                }
801                Err(e) => return Err(e),
802            }
803        }
804
805        // No eligible work found on any partition
806        Ok(None)
807    }
808
809    /// Low-level claim of a granted execution. Routes through
810    /// [`EngineBackend::claim_execution`](ff_core::engine_backend::EngineBackend::claim_execution)
811    /// — the trait-level grant-consumer method landed in v0.12 PR-4 —
812    /// and returns a `ClaimedTask` with auto lease renewal.
813    ///
814    /// Pre-PR-4 the SDK fired the `ff_claim_execution` FCALL directly
815    /// against `valkey_client()` and hand-parsed the Lua response shape.
816    /// The body now collapses to `backend.claim_execution(args)` +
817    /// `backend.read_execution_context(...)` + `ClaimedTask::new(...)`;
818    /// the Valkey-specific FCALL plumbing lives behind the trait in
819    /// `ff_backend_valkey::claim_execution_impl`.
820    ///
821    /// As of v0.12 PR-5.5 this helper + `claim_from_grant` are no
822    /// longer `valkey-default`-gated: the backend mints the `Handle`
823    /// at claim time and `ClaimedTask::new` caches it, so no
824    /// Valkey-specific handle synthesis is required on this path. The
825    /// `EngineBackend::claim_execution` default impl returns
826    /// `Err(Unavailable)` on PG/SQLite today (grant-consumer surface
827    /// is Valkey-only until the PG/SQLite grant-consumer RFC lands);
828    /// the compile surface is fully agnostic.
829    ///
830    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
831    async fn claim_execution(
832        &self,
833        execution_id: &ExecutionId,
834        lane_id: &LaneId,
835        partition: &ff_core::partition::Partition,
836        now: TimestampMs,
837    ) -> Result<ClaimedTask, SdkError> {
838        // v0.12 PR-5.5 retry-path fix — pre-read the **total attempt
839        // counter**, not the current-attempt pointer. The fresh-claim
840        // path mints a new attempt row whose index is the counter's
841        // current value (the backend's Lua 5920 / PG `ff_claim_execution`
842        // / SQLite `claim_impl` all consult this same counter to compute
843        // `next_att_idx`). Pre-PR-5.5 this call went inline as `HGET
844        // {exec}:core total_attempt_count` on the Valkey client; the
845        // first PR-5.5 landing mistakenly routed it to
846        // `read_current_attempt_index`, which returns the *pointer* at
847        // the previously-leased attempt — on a retry-of-a-retry that
848        // still named the terminal-failed prior attempt and the new
849        // claim collided with it. `read_total_attempt_count` wraps
850        // the same byte-for-byte HGET on Valkey and provides the JSONB
851        // / json_extract equivalents on PG / SQLite. See
852        // `EngineBackend::read_total_attempt_count` rustdoc for the
853        // pointer-vs-counter distinction.
854        let att_idx = self
855            .backend
856            .read_total_attempt_count(execution_id)
857            .await
858            .map_err(|e| SdkError::Engine(Box::new(e)))?;
859
860        let args = ff_core::contracts::ClaimExecutionArgs::new(
861            execution_id.clone(),
862            self.config.worker_id.clone(),
863            self.config.worker_instance_id.clone(),
864            lane_id.clone(),
865            LeaseId::new(),
866            self.config.lease_ttl_ms,
867            AttemptId::new(),
868            att_idx,
869            "{}".to_owned(),
870            None,
871            None,
872            now,
873        );
874
875        // `ClaimExecutionResult` is `#[non_exhaustive]` (v0.12 PR-4)
876        // so let-binding requires an explicit wildcard arm even though
877        // `Claimed` is the only variant today. A future additive variant
878        // surfaces here as a typed `ScriptError::Parse` — loud, routed
879        // through the existing SDK error path, and never silently
880        // dropped.
881        let claimed = match self.backend.claim_execution(args).await? {
882            ff_core::contracts::ClaimExecutionResult::Claimed(c) => c,
883            other => {
884                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
885                    fcall: "ff_claim_execution".into(),
886                    execution_id: Some(execution_id.to_string()),
887                    message: format!(
888                        "unexpected ClaimExecutionResult variant: {other:?}"
889                    ),
890                }));
891            }
892        };
893
894        // Read execution payload and metadata
895        let (input_payload, execution_kind, tags) = self
896            .read_execution_context(execution_id, partition)
897            .await?;
898
899        Ok(ClaimedTask::new(
900            self.backend.clone(),
901            self.partition_config,
902            claimed.handle,
903            execution_id.clone(),
904            claimed.attempt_index,
905            claimed.attempt_id,
906            claimed.lease_id,
907            claimed.lease_epoch,
908            self.config.lease_ttl_ms,
909            lane_id.clone(),
910            self.config.worker_instance_id.clone(),
911            input_payload,
912            execution_kind,
913            tags,
914        ))
915    }
916
917    /// Consume a [`ClaimGrant`] and claim the granted execution on
918    /// this worker. The intended production entry point: pair with
919    /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
920    /// scheduler-issued grants into the SDK without enabling the
921    /// `direct-valkey-claim` feature (which bypasses budget/quota
922    /// admission control).
923    ///
924    /// The worker's concurrency semaphore is checked BEFORE the FCALL
925    /// so a saturated worker does not consume the grant: the grant
926    /// stays valid for its remaining TTL and the caller can either
927    /// release it back to the scheduler or retry after some other
928    /// in-flight task completes.
929    ///
930    /// On success the returned [`ClaimedTask`] holds a concurrency
931    /// permit that releases automatically on
932    /// `complete`/`fail`/`cancel`/drop — same contract as
933    /// `claim_next`.
934    ///
935    /// # Arguments
936    ///
937    /// * `lane` — the lane the grant was issued for. Must match what
938    ///   was passed to `Scheduler::claim_for_worker`; the Lua FCALL
939    ///   uses it to look up `lane_eligible`, `lane_active`, and the
940    ///   `worker_leases` index slot.
941    /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
942    ///
943    /// # Errors
944    ///
945    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
946    ///   permits all held. Retryable; the grant is untouched.
947    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
948    ///   or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
949    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
950    ///   (wrapped in [`SdkError::Engine`]).
951    /// * `ScriptError::CapabilityMismatch` — execution's required
952    ///   capabilities not a subset of this worker's caps (wrapped in
953    ///   [`SdkError::Engine`]). Surfaced post-grant if a race
954    ///   between grant issuance and caps change allows it.
955    /// * `ScriptError::Parse` — `ff_claim_execution` returned an
956    ///   unexpected shape (wrapped in [`SdkError::Engine`]).
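    /// * [`SdkError::Config`] — `grant.partition()` failed; reported
    ///   with `field = "partition_key"`. Returned before any FCALL is
    ///   issued.
    /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
    ///   transport error during the FCALL or the
    ///   `read_execution_context` follow-up.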
960    ///
961    /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
962    /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
963    ///
964    /// # Backend coverage (v0.12 PR-5.5)
965    ///
966    /// Method ungated across backends. The Valkey backend handles the
967    /// grant fully. Postgres + SQLite backends return
968    /// [`EngineError::Unavailable`](ff_core::engine_error::EngineError::Unavailable)
969    /// from [`EngineBackend::claim_execution`] today — grants on those
970    /// backends flow through the scheduler-routed [`claim_via_server`]
971    /// path (the PG/SQLite scheduler lives outside the `EngineBackend`
972    /// trait in this release). See
973    /// `project_claim_from_grant_pg_sqlite_gap.md` for motivation and
974    /// planned follow-up.
975    ///
976    /// [`claim_via_server`]: FlowFabricWorker::claim_via_server
977    /// [`EngineBackend::claim_execution`]: ff_core::engine_backend::EngineBackend::claim_execution
978    pub async fn claim_from_grant(
979        &self,
980        lane: LaneId,
981        grant: ff_core::contracts::ClaimGrant,
982    ) -> Result<ClaimedTask, SdkError> {
983        // Semaphore check FIRST. If the worker is saturated we must
984        // surface the condition to the caller without touching the
985        // grant — silently returning Ok(None) (as claim_next does)
986        // would drop a grant the scheduler has already committed work
987        // to issuing, wasting the slot until its TTL elapses.
988        let permit = self
989            .concurrency_semaphore
990            .clone()
991            .try_acquire_owned()
992            .map_err(|_| SdkError::WorkerAtCapacity)?;
993
994        let now = TimestampMs::now();
995        let partition = grant.partition().map_err(|e| SdkError::Config {
996            context: "claim_from_grant".to_owned(),
997            field: Some("partition_key".to_owned()),
998            message: e.to_string(),
999        })?;
1000        let mut task = match self
1001            .claim_execution(&grant.execution_id, &lane, &partition, now)
1002            .await
1003        {
1004            Ok(task) => task,
1005            Err(SdkError::Engine(ref boxed))
1006                if matches!(
1007                    **boxed,
1008                    crate::EngineError::Contention(
1009                        crate::ContentionKind::UseClaimResumedExecution
1010                    )
1011                ) =>
1012            {
1013                // Execution was resumed from suspension — attempt_interrupted.
1014                // ff_claim_execution rejects this; use ff_claim_resumed_execution
1015                // which reuses the existing attempt instead of creating a new one.
1016                // Mirrors the fallback inside `claim_next` so HTTP-routed callers
1017                // (`claim_via_server` → `claim_from_grant`) get the same transparent
1018                // re-claim behavior.
1019                tracing::debug!(
1020                    execution_id = %grant.execution_id,
1021                    "execution is resumed, using claim_resumed path"
1022                );
1023                self.claim_resumed_execution(&grant.execution_id, &lane, &partition)
1024                    .await?
1025            }
1026            Err(e) => return Err(e),
1027        };
1028        task.set_concurrency_permit(permit);
1029        Ok(task)
1030    }
1031
1032    /// Scheduler-routed claim: POST the server's
1033    /// `/v1/workers/{id}/claim`, then chain to
1034    /// [`Self::claim_from_grant`].
1035    ///
1036    /// Batch C item 2 PR-B. This is the production entry point —
1037    /// budget + quota + capability admission run server-side inside
1038    /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
1039    /// enable the `direct-valkey-claim` feature.
1040    ///
1041    /// Returns `Ok(None)` when the server says no eligible execution
1042    /// (HTTP 204). Callers typically back off by
1043    /// `config.claim_poll_interval_ms` and try again, same cadence
1044    /// as the direct-claim path's `Ok(None)`.
1045    ///
1046    /// The `admin` client is the established HTTP surface
1047    /// (`FlowFabricAdminClient`) reused here so workers don't keep a
1048    /// second reqwest client around. Build once at worker boot and
1049    /// hand in by reference on every claim.
1050    pub async fn claim_via_server(
1051        &self,
1052        admin: &crate::FlowFabricAdminClient,
1053        lane: &LaneId,
1054        grant_ttl_ms: u64,
1055    ) -> Result<Option<ClaimedTask>, SdkError> {
1056        let req = crate::admin::ClaimForWorkerRequest {
1057            worker_id: self.config.worker_id.to_string(),
1058            lane_id: lane.to_string(),
1059            worker_instance_id: self.config.worker_instance_id.to_string(),
1060            capabilities: self.config.capabilities.clone(),
1061            grant_ttl_ms,
1062        };
1063        let Some(resp) = admin.claim_for_worker(req).await? else {
1064            return Ok(None);
1065        };
1066        let grant = resp.into_grant()?;
1067        self.claim_from_grant(lane.clone(), grant).await.map(Some)
1068    }
1069
1070    /// Consume a [`ResumeGrant`] and transition the granted
1071    /// `attempt_interrupted` execution into a `started` state on this
1072    /// worker. Symmetric partner to [`claim_from_grant`] for the
1073    /// resume path.
1074    ///
1075    /// **Renamed from `claim_from_reclaim_grant` (RFC-024 PR-B+C).**
1076    /// The new `claim_from_reclaim_grant` method lands with PR-G and
1077    /// dispatches to [`EngineBackend::reclaim_execution`] for the
1078    /// lease-reclaim path (distinct semantic, distinct grant type).
1079    ///
1080    /// The grant must have been issued to THIS worker (matching
1081    /// `worker_id` at grant time). A mismatch returns
1082    /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1083    /// atomically by `ff_claim_resumed_execution`; a second call with
1084    /// the same grant also returns `InvalidClaimGrant`.
1085    ///
1086    /// # Concurrency
1087    ///
1088    /// The worker's concurrency semaphore is checked BEFORE the FCALL
    /// (same contract as [`claim_from_grant`]). Resume does NOT
    /// assume pre-existing capacity on this worker — a resume can
    /// land on a fresh worker instance that just came up after a
1092    /// crash/restart and is picking up a previously-interrupted
1093    /// execution. If the worker is saturated, the grant stays valid
1094    /// for its remaining TTL and the caller can release it or retry.
1095    ///
1096    /// On success the returned [`ClaimedTask`] holds a concurrency
1097    /// permit that releases automatically on
1098    /// `complete`/`fail`/`cancel`/drop.
1099    ///
1100    /// # Errors
1101    ///
1102    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1103    ///   permits all held. Retryable; the grant is untouched (no
1104    ///   FCALL was issued, so `ff_claim_resumed_execution` did not
1105    ///   atomically consume the grant key).
1106    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1107    ///   or `worker_id` mismatch.
1108    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1109    /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1110    ///   `attempt_interrupted`.
1111    /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1112    ///   not `runnable`.
1113    /// * `ScriptError::ExecutionNotFound` — core key missing.
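    /// * [`SdkError::Config`] — `grant.partition()` failed; reported
    ///   with `field = "partition_key"`. Returned before any FCALL is
    ///   issued.
    /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
    ///   transport.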
1116    ///
1117    /// [`ResumeGrant`]: ff_core::contracts::ResumeGrant
1118    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
1119    pub async fn claim_from_resume_grant(
1120        &self,
1121        grant: ff_core::contracts::ResumeGrant,
1122    ) -> Result<ClaimedTask, SdkError> {
1123        // Semaphore check FIRST — same load-bearing ordering as
1124        // `claim_from_grant`. If the worker is saturated, surface
1125        // WorkerAtCapacity without firing the FCALL; the FCALL is an
1126        // atomic consume on the grant key, so calling it past-
1127        // saturation would destroy the grant while leaving no
1128        // permit to attach to the returned `ClaimedTask`.
1129        let permit = self
1130            .concurrency_semaphore
1131            .clone()
1132            .try_acquire_owned()
1133            .map_err(|_| SdkError::WorkerAtCapacity)?;
1134
1135        // Grant carries partition + lane_id so no round-trip is needed
1136        // to resolve them before the FCALL.
1137        let partition = grant.partition().map_err(|e| SdkError::Config {
1138            context: "claim_from_resume_grant".to_owned(),
1139            field: Some("partition_key".to_owned()),
1140            message: e.to_string(),
1141        })?;
1142        let mut task = self
1143            .claim_resumed_execution(
1144                &grant.execution_id,
1145                &grant.lane_id,
1146                &partition,
1147            )
1148            .await?;
1149        task.set_concurrency_permit(permit);
1150        Ok(task)
1151    }
1152
1153    /// Consume a [`ReclaimGrant`] to mint a fresh attempt for a
1154    /// lease-expired / lease-revoked execution (RFC-024 §3.4).
1155    ///
1156    /// Backend-agnostic. Routes through
1157    /// [`EngineBackend::reclaim_execution`] on whatever backend the
1158    /// worker was connected with (Valkey via `connect` / `connect_with`,
1159    /// Postgres / SQLite via `connect_with`). Distinct from
1160    /// [`claim_from_resume_grant`]: reclaim creates a NEW attempt row
1161    /// and bumps `lease_reclaim_count` (`HandleKind::Reclaimed`), while
1162    /// resume re-uses the existing attempt under
1163    /// `ff_claim_resumed_execution` (`HandleKind::Resumed`).
1164    ///
1165    /// # Return shape
1166    ///
1167    /// Returns the raw [`ReclaimExecutionOutcome`] so consumers match
1168    /// on the four outcomes (`Claimed(Handle)`, `NotReclaimable`,
1169    /// `ReclaimCapExceeded`, `GrantNotFound`) and decide their
1170    /// dispatch. The `Claimed` variant carries a [`Handle`] whose
1171    /// `kind` is [`HandleKind::Reclaimed`]; downstream ops (complete,
1172    /// fail, renew, append_frame, …) take the handle directly via the
1173    /// `EngineBackend` trait.
1174    ///
1175    /// This contrasts with [`claim_from_resume_grant`], which wraps
1176    /// the handle in a [`ClaimedTask`] with a concurrency-permit and
1177    /// auto-lease-renewal loop. Those affordances are
1178    /// `valkey-default`-gated today (they depend on the bundled
1179    /// `ferriskey::Client` + `lease_ttl_ms` renewal timer). The
1180    /// reclaim surface is intentionally narrower so it compiles under
1181    /// `--no-default-features, features = ["sqlite"]` and consumers
1182    /// can drive the reclaim flow on any backend.
1183    ///
1184    /// # Feature compatibility
1185    ///
1186    /// No cfg-gate. Compiles + runs under every feature set ff-sdk
1187    /// supports (including sqlite-only). Verified by the compile-time
1188    /// type assertion
1189    /// `worker_claim_from_reclaim_grant_is_backend_agnostic_at_type_level`
1190    /// in `crates/ff-sdk/tests/rfc024_sdk.rs`, which pins the method's
1191    /// full signature under the default feature set and is paralleled
1192    /// by `sqlite_only_compile_surface_tests` in this file for the
1193    /// `--no-default-features, features = ["sqlite"]` compile anchor on
1194    /// the rest of the backend-agnostic surface.
1195    ///
1196    /// # worker_capabilities
1197    ///
1198    /// `ReclaimExecutionArgs::worker_capabilities` is NOT part of the
1199    /// Lua FCALL (the reclaim Lua validates grant consumption via
1200    /// `grant.worker_id == args.worker_id` only — see
1201    /// `crates/ff-script/src/flowfabric.lua:3088` and RFC-024 §4.4).
1202    /// Capability matching happens at grant-issuance time (see
1203    /// [`FlowFabricAdminClient::issue_reclaim_grant`]
1204    /// — in the `ff-sdk::admin` module, `valkey-default`-gated).
1205    ///
1206    /// # Errors
1207    ///
1208    /// * [`SdkError::Engine`] — the backend's `reclaim_execution`
1209    ///   returned an [`EngineError`] (transport fault, validation
1210    ///   failure, `Unavailable` from a backend that does not
1211    ///   implement RFC-024 — currently only pre-RFC out-of-tree
1212    ///   backends).
1213    ///
1214    /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1215    /// [`ReclaimExecutionOutcome`]: ff_core::contracts::ReclaimExecutionOutcome
1216    /// [`Handle`]: ff_core::backend::Handle
1217    /// [`HandleKind::Reclaimed`]: ff_core::backend::HandleKind::Reclaimed
1218    /// [`EngineBackend::reclaim_execution`]: ff_core::engine_backend::EngineBackend::reclaim_execution
1219    /// [`EngineError`]: ff_core::engine_error::EngineError
1220    /// [`claim_from_resume_grant`]: FlowFabricWorker::claim_from_resume_grant
1221    /// [`FlowFabricAdminClient::issue_reclaim_grant`]: crate::admin::FlowFabricAdminClient::issue_reclaim_grant
1222    pub async fn claim_from_reclaim_grant(
1223        &self,
1224        grant: ff_core::contracts::ReclaimGrant,
1225        args: ff_core::contracts::ReclaimExecutionArgs,
1226    ) -> Result<ff_core::contracts::ReclaimExecutionOutcome, SdkError> {
1227        // `ReclaimGrant` is accepted as a parameter so the call-site
1228        // shape matches RFC-024 §3.4 (consumer receives a grant from
1229        // `issue_reclaim_grant` and feeds it + args into
1230        // `claim_from_reclaim_grant`). The grant metadata is
1231        // already embedded in the backend's server-side store
1232        // (Valkey `claim_grant` hash, PG/SQLite `ff_claim_grant`
1233        // table) keyed by (execution_id, grant_key) — the trait
1234        // method looks it up from `args.execution_id`.
1235        //
1236        // The overlap between `ReclaimGrant` and
1237        // `ReclaimExecutionArgs` is (execution_id, lane_id);
1238        // mismatched grant/args is a consumer-side bug (grant-for-A
1239        // + args-for-B), and silently forwarding lets the SDK act
1240        // on the args execution. Reject the mismatch up front so
1241        // the misuse surfaces at the SDK boundary instead of
1242        // succeeding against the wrong execution. The grant's
1243        // `expires_at_ms` is validated against wall-clock now so
1244        // an already-expired grant is rejected without a backend
1245        // round-trip (the backend also enforces expiry, but the
1246        // SDK-side check gives a crisper error and preserves the
1247        // reclaim-budget / lease slot).
1248        // `TimestampMs` is an `i64` (Unix-epoch ms); cast to `u64` for
1249        // comparison against `ReclaimGrant::expires_at_ms`. Clamp
1250        // negatives to 0 so a pre-epoch system clock (vanishingly
1251        // unlikely, but representable) doesn't wrap to a future u64.
1252        let now_ms: u64 = ff_core::types::TimestampMs::now().0.max(0) as u64;
1253        validate_reclaim_grant_against_args(&grant, &args, now_ms)?;
1254        self.backend
1255            .reclaim_execution(args)
1256            .await
1257            .map_err(|e| SdkError::Engine(Box::new(e)))
1258    }
1259
1260    /// Low-level resume claim. Forwards through
1261    /// [`EngineBackend::claim_resumed_execution`](ff_core::engine_backend::EngineBackend::claim_resumed_execution)
1262    /// — the trait-level trigger surface landed in issue #150 — and
1263    /// returns a [`ClaimedTask`] bound to the resumed attempt.
1264    ///
1265    /// The method stays private; external callers use
1266    /// [`claim_from_resume_grant`].
1267    ///
1268    /// [`claim_from_resume_grant`]: FlowFabricWorker::claim_from_resume_grant
1269    async fn claim_resumed_execution(
1270        &self,
1271        execution_id: &ExecutionId,
1272        lane_id: &LaneId,
1273        partition: &ff_core::partition::Partition,
1274    ) -> Result<ClaimedTask, SdkError> {
1275        // v0.12 PR-3 — pre-read current_attempt_index via the trait
1276        // rather than an inline `HGET` on the Valkey client. Load-bearing:
1277        // the backend's KEYS[6] (Valkey) / `ff_attempt` PK tuple (PG/SQLite)
1278        // must target the real existing attempt hash/row, and the backend
1279        // takes the index verbatim from
1280        // `ClaimResumedExecutionArgs::current_attempt_index`.
1281        let att_idx = self
1282            .backend
1283            .read_current_attempt_index(execution_id)
1284            .await
1285            .map_err(|e| SdkError::Engine(Box::new(e)))?;
1286
1287        let args = ff_core::contracts::ClaimResumedExecutionArgs {
1288            execution_id: execution_id.clone(),
1289            worker_id: self.config.worker_id.clone(),
1290            worker_instance_id: self.config.worker_instance_id.clone(),
1291            lane_id: lane_id.clone(),
1292            lease_id: LeaseId::new(),
1293            lease_ttl_ms: self.config.lease_ttl_ms,
1294            current_attempt_index: att_idx,
1295            remaining_attempt_timeout_ms: None,
1296            now: TimestampMs::now(),
1297        };
1298
1299        let ff_core::contracts::ClaimResumedExecutionResult::Claimed(claimed) =
1300            self.backend.claim_resumed_execution(args).await?;
1301
1302        let (input_payload, execution_kind, tags) = self
1303            .read_execution_context(execution_id, partition)
1304            .await?;
1305
1306        Ok(ClaimedTask::new(
1307            self.backend.clone(),
1308            self.partition_config,
1309            claimed.handle,
1310            execution_id.clone(),
1311            claimed.attempt_index,
1312            claimed.attempt_id,
1313            claimed.lease_id,
1314            claimed.lease_epoch,
1315            self.config.lease_ttl_ms,
1316            lane_id.clone(),
1317            self.config.worker_instance_id.clone(),
1318            input_payload,
1319            execution_kind,
1320            tags,
1321        ))
1322    }
1323
1324    /// Read payload + execution_kind + tags from exec_core.
1325    ///
1326    /// As of v0.12 PR-1 this forwards through
1327    /// [`EngineBackend::read_execution_context`](ff_core::engine_backend::EngineBackend::read_execution_context)
1328    /// rather than issuing direct GET/HGET/HGETALL against Valkey. The
1329    /// outer `valkey-default` gate + `(&ExecutionId, &Partition)`
1330    /// signature are preserved; hot-path decoupling (ungating this
1331    /// helper + its call sites in `claim_execution` and
1332    /// `claim_resumed_execution`) is PR-4/PR-5 scope per the v0.12
1333    /// agnostic-SDK plan.
1334    async fn read_execution_context(
1335        &self,
1336        execution_id: &ExecutionId,
1337        _partition: &ff_core::partition::Partition,
1338    ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
1339        let ctx = self.backend.read_execution_context(execution_id).await?;
1340        Ok((ctx.input_payload, ctx.execution_kind, ctx.tags))
1341    }
1342
1343    // ── Phase 3: Signal delivery ──
1344
1345    /// Deliver a signal to a suspended execution's waitpoint.
1346    ///
1347    /// The engine atomically records the signal, evaluates the resume condition,
1348    /// and optionally transitions the execution from `suspended` to `runnable`.
1349    ///
1350    /// Forwards through
1351    /// [`EngineBackend::deliver_signal`](ff_core::engine_backend::EngineBackend::deliver_signal)
1352    /// — the trait-level trigger surface landed in issue #150.
1353    ///
1354    /// Backend-agnostic as of v0.12 PR-3. Compiles + runs under every
1355    /// feature set ff-sdk supports (including
1356    /// `--no-default-features --features sqlite`); pinned by
1357    /// `sqlite_only_compile_surface_tests::deliver_signal_addressable_under_sqlite_only`.
1358    pub async fn deliver_signal(
1359        &self,
1360        execution_id: &ExecutionId,
1361        waitpoint_id: &WaitpointId,
1362        signal: crate::task::Signal,
1363    ) -> Result<crate::task::SignalOutcome, SdkError> {
1364        let args = ff_core::contracts::DeliverSignalArgs {
1365            execution_id: execution_id.clone(),
1366            waitpoint_id: waitpoint_id.clone(),
1367            signal_id: ff_core::types::SignalId::new(),
1368            signal_name: signal.signal_name,
1369            signal_category: signal.signal_category,
1370            source_type: signal.source_type,
1371            source_identity: signal.source_identity,
1372            payload: signal.payload,
1373            payload_encoding: Some("json".to_owned()),
1374            correlation_id: None,
1375            idempotency_key: signal.idempotency_key,
1376            target_scope: "waitpoint".to_owned(),
1377            created_at: Some(TimestampMs::now()),
1378            dedup_ttl_ms: None,
1379            resume_delay_ms: None,
1380            max_signals_per_execution: None,
1381            signal_maxlen: None,
1382            waitpoint_token: signal.waitpoint_token,
1383            now: TimestampMs::now(),
1384        };
1385
1386        let result = self.backend.deliver_signal(args).await?;
1387        Ok(match result {
1388            ff_core::contracts::DeliverSignalResult::Accepted { signal_id, effect } => {
1389                if effect == "resume_condition_satisfied" {
1390                    crate::task::SignalOutcome::TriggeredResume { signal_id }
1391                } else {
1392                    crate::task::SignalOutcome::Accepted { signal_id, effect }
1393                }
1394            }
1395            ff_core::contracts::DeliverSignalResult::Duplicate { existing_signal_id } => {
1396                crate::task::SignalOutcome::Duplicate {
1397                    existing_signal_id: existing_signal_id.to_string(),
1398                }
1399            }
1400        })
1401    }
1402
1403    fn next_lane(&self) -> LaneId {
1404        let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
1405        self.config.lanes[idx].clone()
1406    }
1407}
1408
1409fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
1410    use ff_core::error::ErrorClass;
1411    matches!(
1412        ff_script::engine_error_ext::class(err),
1413        ErrorClass::Retryable | ErrorClass::Informational
1414    )
1415}
1416
1417/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
1418/// instance id with FNV-1a to place distinct worker processes on different
1419/// partition windows from their first poll. Zero is valid for single-worker
1420/// clusters but spreads work in multi-worker deployments.
1421fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
1422    if num_partitions == 0 {
1423        return 0;
1424    }
1425    (ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize) % num_partitions
1426}
1427
1428/// Cross-check the [`ReclaimGrant`] handed back by
1429/// `issue_reclaim_grant` against the [`ReclaimExecutionArgs`] the
1430/// consumer is about to dispatch. Catches the grant-for-A + args-for-B
1431/// misuse at the SDK boundary before a backend round-trip (PR #407
1432/// review F1).
1433///
1434/// The overlap between the two types is `(execution_id, lane_id)`;
1435/// `partition_key` / `grant_key` live only on the grant and are
1436/// verified server-side. The grant's `expires_at_ms` is also
1437/// validated against `now_ms` so an expired grant fails fast without
1438/// burning a backend call (the backend enforces expiry too, but the
1439/// SDK-side check gives a crisper, typed error).
1440///
1441/// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1442/// [`ReclaimExecutionArgs`]: ff_core::contracts::ReclaimExecutionArgs
1443fn validate_reclaim_grant_against_args(
1444    grant: &ff_core::contracts::ReclaimGrant,
1445    args: &ff_core::contracts::ReclaimExecutionArgs,
1446    now_ms: u64,
1447) -> Result<(), SdkError> {
1448    if grant.execution_id != args.execution_id {
1449        return Err(SdkError::Config {
1450            context: "claim_from_reclaim_grant".to_owned(),
1451            field: Some("execution_id".to_owned()),
1452            message: format!(
1453                "grant.execution_id ({}) does not match args.execution_id ({})",
1454                grant.execution_id, args.execution_id
1455            ),
1456        });
1457    }
1458    if grant.lane_id != args.lane_id {
1459        return Err(SdkError::Config {
1460            context: "claim_from_reclaim_grant".to_owned(),
1461            field: Some("lane_id".to_owned()),
1462            message: format!(
1463                "grant.lane_id ({}) does not match args.lane_id ({})",
1464                grant.lane_id.as_str(),
1465                args.lane_id.as_str()
1466            ),
1467        });
1468    }
1469    if grant.expires_at_ms <= now_ms {
1470        return Err(SdkError::Config {
1471            context: "claim_from_reclaim_grant".to_owned(),
1472            field: Some("expires_at_ms".to_owned()),
1473            message: format!(
1474                "grant expired: expires_at_ms={} now_ms={}",
1475                grant.expires_at_ms, now_ms
1476            ),
1477        });
1478    }
1479    Ok(())
1480}
1481
#[cfg(test)]
mod reclaim_grant_validation_tests {
    //! Unit tests for `validate_reclaim_grant_against_args`, the SDK-side
    //! cross-check that rejects mismatched grant/args pairs (PR #407
    //! review F1). The helper is pure, so no Valkey / backend is needed.
    use super::validate_reclaim_grant_against_args;
    use crate::SdkError;
    use ff_core::contracts::{ReclaimExecutionArgs, ReclaimGrant};
    use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
    use ff_core::types::{
        AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseId, WorkerId, WorkerInstanceId,
    };

    const EXEC_A: &str = "{fp:7}:00000000-0000-4000-8000-000000000001";
    const EXEC_B: &str = "{fp:7}:00000000-0000-4000-8000-000000000002";

    /// Wall-clock "now" handed to the validator in most cases.
    const NOW_MS: u64 = 1_000;
    /// Expiry that is still in the future relative to `NOW_MS`.
    const LIVE_UNTIL_MS: u64 = 2_000;

    /// Parse a fixture execution id.
    fn exec(s: &str) -> ExecutionId {
        ExecutionId::parse(s).expect("valid execution id")
    }

    /// Build a grant for `execution_id` on `lane` that expires at
    /// `expires_at_ms`.
    fn grant_for(execution_id: ExecutionId, lane: &str, expires_at_ms: u64) -> ReclaimGrant {
        let partition = Partition { family: PartitionFamily::Flow, index: 7 };
        ReclaimGrant::new(
            execution_id,
            PartitionKey::from(&partition),
            "reclaim:grant:abc".to_owned(),
            expires_at_ms,
            LaneId::new(lane),
        )
    }

    /// Build reclaim args for `execution_id` on `lane`; the remaining
    /// fields are fixed filler values.
    fn args_for(execution_id: ExecutionId, lane: &str) -> ReclaimExecutionArgs {
        ReclaimExecutionArgs::new(
            execution_id,
            WorkerId::new("w1"),
            WorkerInstanceId::new("w1-i1"),
            LaneId::new(lane),
            None,
            LeaseId::new(),
            30_000,
            AttemptId::new(),
            "{}".to_owned(),
            None,
            WorkerInstanceId::new("w1-i0"),
            AttemptIndex::new(0),
        )
    }

    /// Assert that `err` is `SdkError::Config` blaming `expected_field`.
    #[track_caller]
    fn assert_config_field(err: SdkError, expected_field: &str) {
        match err {
            SdkError::Config { field, .. } => assert_eq!(field.as_deref(), Some(expected_field)),
            other => panic!("expected Config error, got {other:?}"),
        }
    }

    #[test]
    fn accepts_matching_grant_and_args() {
        let grant = grant_for(exec(EXEC_A), "main", LIVE_UNTIL_MS);
        let args = args_for(exec(EXEC_A), "main");
        assert!(validate_reclaim_grant_against_args(&grant, &args, NOW_MS).is_ok());
    }

    #[test]
    fn rejects_mismatched_execution_id() {
        let grant = grant_for(exec(EXEC_A), "main", LIVE_UNTIL_MS);
        let args = args_for(exec(EXEC_B), "main");
        let err = validate_reclaim_grant_against_args(&grant, &args, NOW_MS)
            .expect_err("expected mismatched execution_id to fail");
        assert_config_field(err, "execution_id");
    }

    #[test]
    fn rejects_mismatched_lane_id() {
        let grant = grant_for(exec(EXEC_A), "main", LIVE_UNTIL_MS);
        let args = args_for(exec(EXEC_A), "other");
        let err = validate_reclaim_grant_against_args(&grant, &args, NOW_MS)
            .expect_err("expected mismatched lane_id to fail");
        assert_config_field(err, "lane_id");
    }

    #[test]
    fn rejects_expired_grant() {
        // The check is inclusive (`expires_at_ms <= now_ms`), so a grant
        // expiring exactly at `now_ms` is rejected and the server's TTL
        // enforcement window can't race us.
        let grant = grant_for(exec(EXEC_A), "main", NOW_MS);
        let args = args_for(exec(EXEC_A), "main");
        let at_expiry = validate_reclaim_grant_against_args(&grant, &args, NOW_MS)
            .expect_err("expected expired grant to fail");
        assert_config_field(at_expiry, "expires_at_ms");

        // Also rejected when already past expiry.
        let past_expiry = validate_reclaim_grant_against_args(&grant, &args, 5_000)
            .expect_err("expected past-expiry grant to fail");
        assert_config_field(past_expiry, "expires_at_ms");
    }
}
1582
#[cfg(test)]
mod completion_accessor_type_tests {
    //! Compile-time check that
    //! [`FlowFabricWorker::completion_backend`] has the advertised
    //! signature, i.e. returns `Option<Arc<dyn CompletionBackend>>`.
    //! No Valkey required: the check lives entirely in a fn-pointer
    //! coercion, and the #[test] wrapper only exists so the compiler
    //! elaborates it.
    use super::FlowFabricWorker;
    use ff_core::completion_backend::CompletionBackend;
    use std::sync::Arc;

    /// The signature the public accessor is expected to have.
    type CompletionAccessor = fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>>;

    #[test]
    fn completion_backend_accessor_signature() {
        // Successful coercion proves the accessor's type. The pointer is
        // never invoked (there is no live worker), so no I/O happens.
        let _pinned: CompletionAccessor = FlowFabricWorker::completion_backend;
    }
}
1603
1604/// RFC-023 Phase 1a §4.4 item 10g compile-only anchor. Parallels
1605/// `completion_accessor_type_tests` but fires under the sqlite-only
1606/// feature set: proves `FlowFabricWorker::connect_with` is callable
1607/// and returns the advertised type under `--no-default-features,
1608/// features = ["sqlite"]`. The matching `§9` CI cell (`cargo check
1609/// -p ff-sdk --no-default-features --features sqlite`) executes this
1610/// compile check mechanically so future PRs that accidentally reach
1611/// outside the `valkey-default` gate fail the build.
///
/// Every test below is a compile-time pin only: the fn-pointer
/// assignments and the nested `_pin*` helpers never invoke the pinned
/// methods (the helpers are declared but not called, hence
/// `#[allow(dead_code)]`), so no backend is constructed and no I/O runs.
///
/// NOTE(review): nothing in this module currently names
/// `FlowFabricWorker::connect_with`; the worker accessors pinned directly
/// are `backend`, `completion_backend`, `config` and `partition_config`.
/// Confirm whether a `connect_with` anchor is still intended here.
1612#[cfg(all(test, not(feature = "valkey-default")))]
1613mod sqlite_only_compile_surface_tests {
1614    use super::FlowFabricWorker;
1615    use ff_core::completion_backend::CompletionBackend;
1616    use ff_core::engine_backend::EngineBackend;
1617    use std::sync::Arc;
1618
1619    #[test]
1620    fn addressable_surface_under_sqlite_only() {
1621        // Type-level proof that the backend-agnostic accessors are
1622        // reachable without the `valkey-default` feature. None of
1623        // these are called; the assignment targets pin the signature.
1624        let _a: fn(
1625            &FlowFabricWorker,
1626        ) -> Option<&Arc<dyn EngineBackend>> = FlowFabricWorker::backend;
1627        let _b: fn(
1628            &FlowFabricWorker,
1629        ) -> Option<Arc<dyn CompletionBackend>> =
1630            FlowFabricWorker::completion_backend;
1631        let _c: fn(&FlowFabricWorker) -> &crate::config::WorkerConfig =
1632            FlowFabricWorker::config;
1633        let _d: fn(&FlowFabricWorker) -> &ff_core::partition::PartitionConfig =
1634            FlowFabricWorker::partition_config;
1635    }
1636
1637    /// v0.12 PR-1 compile anchor — the new
1638    /// [`EngineBackend::read_execution_context`] trait method MUST be
1639    /// addressable under `--no-default-features --features sqlite`. A
1640    /// direct fn-pointer cast is awkward under `#[async_trait]`
1641    /// (lifetime-generic fn items don't coerce to `fn` pointers), so
1642    /// we take the next-best compile proof: exercise a generic that
1643    /// names the method through the trait bound. A bodyless call would
1644    /// require a concrete backend; the generic keeps the test pure
1645    /// compile-time.
1646    #[test]
1647    fn read_execution_context_addressable_under_sqlite_only() {
1648        use ff_core::contracts::ExecutionContext;
1649        use ff_core::engine_error::EngineError;
1650        use ff_core::types::ExecutionId;
1651
1652        #[allow(dead_code)]
1653        async fn _pin<B: EngineBackend + ?Sized>(
1654            b: &B,
1655            id: &ExecutionId,
1656        ) -> Result<ExecutionContext, EngineError> {
1657            b.read_execution_context(id).await
1658        }
1659    }
1660
1661    /// v0.12 PR-2 compile anchor — `ClaimedTask` as a type MUST be
1662    /// addressable under `--no-default-features --features sqlite`
1663    /// (the `task` module is no longer `valkey-default`-gated at the
1664    /// module level). As of v0.12 PR-5.5 the `impl ClaimedTask { ... }`
1665    /// block is likewise ungated: the backend mints the `Handle` at
1666    /// claim time and `cloned_handle` just clones the cached field, so
1667    /// no Valkey-specific synthesis is required. This anchor pins the
1668    /// struct path + its field types through a generic-over-T wrapper
1669    /// so a compile-time lookup of `ClaimedTask` is exercised
1670    /// mechanically under the sqlite-only feature set.
1671    #[test]
1672    fn claimed_task_type_addressable_under_sqlite_only() {
1673        use crate::task::ClaimedTask;
1674
        // `T` is left unconstrained; the actual pin is the return type,
        // which has to resolve `ClaimedTask` for the helper to compile.
1675        #[allow(dead_code)]
1676        fn _pin<T>(_: std::marker::PhantomData<T>) -> std::marker::PhantomData<ClaimedTask> {
1677            std::marker::PhantomData
1678        }
1679    }
1680
1681    /// v0.12 PR-6 compile anchor — ungating `admin` + `snapshot`
1682    /// modules at module level must not re-introduce ferriskey
1683    /// symbols under `--no-default-features --features sqlite`.
1684    /// Pins that [`FlowFabricAdminClient::new`] and a representative
1685    /// snapshot forwarder are addressable without the
1686    /// `valkey-default` feature.
1687    #[test]
1688    fn admin_and_snapshot_addressable_under_sqlite_only() {
1689        use crate::admin::FlowFabricAdminClient;
1690        use crate::SdkError;
1691        use ff_core::contracts::ExecutionSnapshot;
1692        use ff_core::types::ExecutionId;
1693
1694        // `FlowFabricAdminClient::new` is `fn(impl Into<String>)` —
1695        // can't fn-pointer-coerce directly. A no-op call with a
1696        // concrete `&str` pins the method addressably under the
1697        // sqlite-only feature set (the error return-type is the
1698        // same `SdkError` the rest of the module returns).
1699        #[allow(dead_code)]
1700        fn _pin_admin_new() -> Result<FlowFabricAdminClient, SdkError> {
1701            FlowFabricAdminClient::new("http://anchor")
1702        }
1703
1704        // Snapshot method is `async`, so pin it through an `async fn`
1705        // wrapper — the same idea as the `read_execution_context` anchor
        // above, minus the generic trait bound (the receiver here is the
        // concrete `FlowFabricWorker`).
1706        #[allow(dead_code)]
1707        async fn _pin_describe(
1708            w: &FlowFabricWorker,
1709            id: &ExecutionId,
1710        ) -> Result<Option<ExecutionSnapshot>, SdkError> {
1711            w.describe_execution(id).await
1712        }
1713    }
1714
1715    /// v0.13 SC-10 compile anchor — the backend-agnostic admin facade
1716    /// (`FlowFabricAdminClient::connect_with` + embedded-transport
1717    /// methods) MUST be addressable under
1718    /// `--no-default-features --features sqlite`. Without this anchor,
1719    /// a regression that re-introduces a ferriskey symbol on the
1720    /// embedded admin path would only surface on downstream
1721    /// sqlite-only consumers.
1722    #[test]
1723    fn admin_facade_addressable_under_sqlite_only() {
1724        use crate::admin::{
1725            ClaimForWorkerRequest, ClaimForWorkerResponse, FlowFabricAdminClient,
1726            IssueReclaimGrantRequest, IssueReclaimGrantResponse, RotateWaitpointSecretRequest,
1727            RotateWaitpointSecretResponse,
1728        };
1729        use crate::SdkError;
1730        use std::sync::Arc;
1731
1732        // `connect_with` is infallible; fn-pointer coerce pins the
1733        // signature under the sqlite-only feature set.
1734        let _ctor: fn(Arc<dyn EngineBackend>) -> FlowFabricAdminClient =
1735            FlowFabricAdminClient::connect_with;
1736
1737        // Each async method is generic over `&self` lifetimes under
1738        // `#[async_trait]`-style expansion, so pin via a wrapper fn.
1739        #[allow(dead_code)]
1740        async fn _pin_claim(
1741            c: &FlowFabricAdminClient,
1742            req: ClaimForWorkerRequest,
1743        ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
1744            c.claim_for_worker(req).await
1745        }
1746        #[allow(dead_code)]
1747        async fn _pin_reclaim(
1748            c: &FlowFabricAdminClient,
1749            id: &str,
1750            req: IssueReclaimGrantRequest,
1751        ) -> Result<IssueReclaimGrantResponse, SdkError> {
1752            c.issue_reclaim_grant(id, req).await
1753        }
1754        #[allow(dead_code)]
1755        async fn _pin_rotate(
1756            c: &FlowFabricAdminClient,
1757            req: RotateWaitpointSecretRequest,
1758        ) -> Result<RotateWaitpointSecretResponse, SdkError> {
1759            c.rotate_waitpoint_secret(req).await
1760        }
1761    }
1762
1763    /// v0.12 PR-3 compile anchor — the new
1764    /// [`EngineBackend::read_current_attempt_index`] trait method MUST
1765    /// be addressable under `--no-default-features --features sqlite`.
1766    /// Mirrors the PR-1 `read_execution_context` anchor: takes a
1767    /// generic over the trait bound so `#[async_trait]` lifetime
1768    /// elision doesn't block an `fn`-pointer coercion.
1769    #[test]
1770    fn read_current_attempt_index_addressable_under_sqlite_only() {
1771        use ff_core::engine_error::EngineError;
1772        use ff_core::types::{AttemptIndex, ExecutionId};
1773
1774        #[allow(dead_code)]
1775        async fn _pin<B: EngineBackend + ?Sized>(
1776            b: &B,
1777            id: &ExecutionId,
1778        ) -> Result<AttemptIndex, EngineError> {
1779            b.read_current_attempt_index(id).await
1780        }
1781    }
1782
1783    /// v0.12 PR-5.5 retry-path-fix compile anchor — the new
1784    /// [`EngineBackend::read_total_attempt_count`] trait method MUST
1785    /// be addressable under `--no-default-features --features sqlite`.
1786    /// Mirrors the PR-3 `read_current_attempt_index` anchor.
1787    #[test]
1788    fn read_total_attempt_count_addressable_under_sqlite_only() {
1789        use ff_core::engine_error::EngineError;
1790        use ff_core::types::{AttemptIndex, ExecutionId};
1791
1792        #[allow(dead_code)]
1793        async fn _pin<B: EngineBackend + ?Sized>(
1794            b: &B,
1795            id: &ExecutionId,
1796        ) -> Result<AttemptIndex, EngineError> {
1797            b.read_total_attempt_count(id).await
1798        }
1799    }
1800
1801    /// v0.12 PR-4 compile anchor — the new
1802    /// [`EngineBackend::claim_execution`] trait method MUST be
1803    /// addressable under `--no-default-features --features sqlite`.
1804    /// Mirrors the PR-3 `read_current_attempt_index` anchor: takes a
1805    /// generic over the trait bound so `#[async_trait]` lifetime
1806    /// elision doesn't block a plain fn-pointer coercion.
1807    ///
1808    /// The method has an `Err(Unavailable)` default impl; PG + SQLite
1809    /// backends don't override it today (grants are Valkey-only until
1810    /// the PG/SQLite grant-consumer RFC lands). The anchor pins the
1811    /// trait-surface signature — not a runtime call — so the compile
1812    /// check passes cleanly on every feature set.
1813    #[test]
1814    fn claim_execution_addressable_under_sqlite_only() {
1815        use ff_core::contracts::{ClaimExecutionArgs, ClaimExecutionResult};
1816        use ff_core::engine_error::EngineError;
1817
1818        #[allow(dead_code)]
1819        async fn _pin<B: EngineBackend + ?Sized>(
1820            b: &B,
1821            args: ClaimExecutionArgs,
1822        ) -> Result<ClaimExecutionResult, EngineError> {
1823            b.claim_execution(args).await
1824        }
1825    }
1826
1827    /// v0.12 PR-5 compile anchor — the three new scanner-primitive
1828    /// trait methods (`scan_eligible_executions`, `issue_claim_grant`,
1829    /// `block_route`) MUST be addressable under
1830    /// `--no-default-features --features sqlite`. Each has an
1831    /// `Err(Unavailable)` default impl; PG + SQLite backends don't
1832    /// override (the scheduler-routed `claim_for_worker` path is the
1833    /// supported PG/SQLite entry point). The anchor pins the trait-
1834    /// surface signatures — not runtime calls — so the compile check
1835    /// passes cleanly on every feature set.
1836    #[test]
1837    fn scan_eligible_executions_addressable_under_sqlite_only() {
1838        use ff_core::contracts::ScanEligibleArgs;
1839        use ff_core::engine_error::EngineError;
1840        use ff_core::types::ExecutionId;
1841
1842        #[allow(dead_code)]
1843        async fn _pin<B: EngineBackend + ?Sized>(
1844            b: &B,
1845            args: ScanEligibleArgs,
1846        ) -> Result<Vec<ExecutionId>, EngineError> {
1847            b.scan_eligible_executions(args).await
1848        }
1849    }
1850
    /// v0.12 PR-5 compile anchor (second of three) — pins
    /// `EngineBackend::issue_claim_grant`; the rationale is spelled out
    /// on `scan_eligible_executions_addressable_under_sqlite_only`.
1851    #[test]
1852    fn issue_claim_grant_addressable_under_sqlite_only() {
1853        use ff_core::contracts::{IssueClaimGrantArgs, IssueClaimGrantOutcome};
1854        use ff_core::engine_error::EngineError;
1855
1856        #[allow(dead_code)]
1857        async fn _pin<B: EngineBackend + ?Sized>(
1858            b: &B,
1859            args: IssueClaimGrantArgs,
1860        ) -> Result<IssueClaimGrantOutcome, EngineError> {
1861            b.issue_claim_grant(args).await
1862        }
1863    }
1864
    /// v0.12 PR-5 compile anchor (third of three) — pins
    /// `EngineBackend::block_route`; the rationale is spelled out on
    /// `scan_eligible_executions_addressable_under_sqlite_only`.
1865    #[test]
1866    fn block_route_addressable_under_sqlite_only() {
1867        use ff_core::contracts::{BlockRouteArgs, BlockRouteOutcome};
1868        use ff_core::engine_error::EngineError;
1869
1870        #[allow(dead_code)]
1871        async fn _pin<B: EngineBackend + ?Sized>(
1872            b: &B,
1873            args: BlockRouteArgs,
1874        ) -> Result<BlockRouteOutcome, EngineError> {
1875            b.block_route(args).await
1876        }
1877    }
1878
1879    /// v0.12 PR-3 compile anchor — `FlowFabricWorker::deliver_signal`
1880    /// MUST be addressable under `--no-default-features --features sqlite`
1881    /// (ungated in PR-3 — the body is pure
1882    /// `self.backend.deliver_signal(...)` trait dispatch).
1883    #[test]
1884    fn deliver_signal_addressable_under_sqlite_only() {
1885        use crate::task::{Signal, SignalOutcome};
1886        use crate::SdkError;
1887        use ff_core::types::{ExecutionId, WaitpointId};
1888        use std::future::Future;
1889        use std::pin::Pin;
1890
1891        // `deliver_signal` is `async fn`, so its item signature bakes
1892        // in a hidden lifetime and opaque return future. Pin it with
1893        // an explicit wrapper that boxes the future and spells out its
1894        // `Output` type; the `+ Send` bound additionally requires the
1895        // returned future to be `Send`. No `fn`-pointer is taken.
1896        #[allow(dead_code)]
1897        fn _pin<'a>(
1898            w: &'a FlowFabricWorker,
1899            id: &'a ExecutionId,
1900            wp: &'a WaitpointId,
1901            s: Signal,
1902        ) -> Pin<Box<dyn Future<Output = Result<SignalOutcome, SdkError>> + Send + 'a>> {
1903            Box::pin(w.deliver_signal(id, wp, s))
1904        }
1905    }
1906
1907    /// v0.12 PR-5.5 compile anchor — `claim_from_grant` MUST be
1908    /// addressable under `--no-default-features --features sqlite`
1909    /// (ungated in PR-5.5). PG / SQLite return `EngineError::Unavailable`
1910    /// from the underlying trait method today, so the call path is
1911    /// compile-reachable but runtime-unavailable. The anchor pins the
1912    /// signature; a future PR wiring real PG/SQLite grant-consumer
1913    /// bodies flips the runtime behaviour without touching this test.
1914    #[test]
1915    fn claim_from_grant_addressable_under_sqlite_only() {
1916        use crate::task::ClaimedTask;
1917        use crate::SdkError;
1918        use ff_core::contracts::ClaimGrant;
1919        use ff_core::types::LaneId;
1920        use std::future::Future;
1921        use std::pin::Pin;
1922
1923        #[allow(dead_code)]
1924        fn _pin<'a>(
1925            w: &'a FlowFabricWorker,
1926            lane: LaneId,
1927            grant: ClaimGrant,
1928        ) -> Pin<Box<dyn Future<Output = Result<ClaimedTask, SdkError>> + Send + 'a>> {
1929            Box::pin(w.claim_from_grant(lane, grant))
1930        }
1931    }
1932
1933    /// v0.12 PR-5.5 compile anchor — `claim_via_server` MUST be
1934    /// addressable under `--no-default-features --features sqlite`.
1935    /// The scheduler-routed path is the supported PG/SQLite claim
1936    /// entry point per `project_claim_from_grant_pg_sqlite_gap.md`;
1937    /// pinning the signature here prevents a future PR from
1938    /// accidentally re-gating it behind `valkey-default`.
1939    #[test]
1940    fn claim_via_server_addressable_under_sqlite_only() {
1941        use crate::admin::FlowFabricAdminClient;
1942        use crate::task::ClaimedTask;
1943        use crate::SdkError;
1944        use ff_core::types::LaneId;
1945        use std::future::Future;
1946        use std::pin::Pin;
1947
1948        #[allow(dead_code)]
1949        fn _pin<'a>(
1950            w: &'a FlowFabricWorker,
1951            admin: &'a FlowFabricAdminClient,
1952            lane: &'a LaneId,
1953            grant_ttl_ms: u64,
1954        ) -> Pin<Box<dyn Future<Output = Result<Option<ClaimedTask>, SdkError>> + Send + 'a>>
1955        {
1956            Box::pin(w.claim_via_server(admin, lane, grant_ttl_ms))
1957        }
1958    }
1959
1960    /// v0.12 PR-5.5 compile anchor — `ClaimedTask::{complete, fail,
1961    /// cancel}` MUST be addressable under `--no-default-features
1962    /// --features sqlite`. The `impl ClaimedTask` block is now
1963    /// module-level ungated (PR-5.5); the terminal ops route through
1964    /// `EngineBackend::{complete, fail, cancel}` which are core trait
1965    /// methods (no `streaming` / `suspension` / `budget` gate).
1966    #[test]
1967    fn claimed_task_terminal_ops_addressable_under_sqlite_only() {
1968        use crate::task::{ClaimedTask, FailOutcome};
1969        use crate::SdkError;
1970        use std::future::Future;
1971        use std::pin::Pin;
1972
        // Each wrapper takes the task by value and boxes the returned
        // future as `Send`. `_pin_complete` names no lifetime on the
        // trait object, so it additionally requires the `complete`
        // future to be `'static`; the other two are bounded by the
        // borrowed `&'a str` arguments.
1973        #[allow(dead_code)]
1974        fn _pin_complete(
1975            t: ClaimedTask,
1976            payload: Option<Vec<u8>>,
1977        ) -> Pin<Box<dyn Future<Output = Result<(), SdkError>> + Send>> {
1978            Box::pin(t.complete(payload))
1979        }
1980
1981        #[allow(dead_code)]
1982        fn _pin_fail<'a>(
1983            t: ClaimedTask,
1984            reason: &'a str,
1985            error_category: &'a str,
1986        ) -> Pin<Box<dyn Future<Output = Result<FailOutcome, SdkError>> + Send + 'a>> {
1987            Box::pin(t.fail(reason, error_category))
1988        }
1989
1990        #[allow(dead_code)]
1991        fn _pin_cancel<'a>(
1992            t: ClaimedTask,
1993            reason: &'a str,
1994        ) -> Pin<Box<dyn Future<Output = Result<(), SdkError>> + Send + 'a>> {
1995            Box::pin(t.cancel(reason))
1996        }
1997    }
1998}
1999
#[cfg(all(test, feature = "direct-valkey-claim"))]
mod scan_cursor_tests {
    //! Unit tests for `scan_cursor_seed`: determinism, spread across
    //! worker ids, range bound, and the zero-partition guard.
    use super::scan_cursor_seed;

    /// Partition count shared by the non-degenerate cases.
    const PARTITIONS: usize = 256;

    #[test]
    fn stable_for_same_input() {
        let first = scan_cursor_seed("w1", PARTITIONS);
        let second = scan_cursor_seed("w1", PARTITIONS);
        assert_eq!(first, second);
    }

    #[test]
    fn distinct_for_different_ids() {
        let seed_w1 = scan_cursor_seed("w1", PARTITIONS);
        let seed_w2 = scan_cursor_seed("w2", PARTITIONS);
        assert_ne!(seed_w1, seed_w2);
    }

    #[test]
    fn bounded_by_partition_count() {
        for worker in (0..100).map(|i| format!("w{i}")) {
            assert!(scan_cursor_seed(&worker, PARTITIONS) < PARTITIONS);
        }
    }

    #[test]
    fn zero_partitions_returns_zero() {
        assert_eq!(scan_cursor_seed("w1", 0), 0);
    }
}