Skip to main content

ff_sdk/
worker.rs

1use std::collections::HashMap;
2#[cfg(feature = "direct-valkey-claim")]
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5
6// RFC-023 Phase 1a (§4.4 item 10b): ferriskey imports scoped behind
7// `valkey-default` so the module compiles under
8// `--no-default-features, features = ["sqlite"]`.
9#[cfg(feature = "valkey-default")]
10use ferriskey::Client;
11use ff_core::partition::PartitionConfig;
12use ff_core::types::*;
13use tokio::sync::Semaphore;
14
15use crate::config::WorkerConfig;
16use crate::task::ClaimedTask;
17use crate::SdkError;
18
19/// FlowFabric worker — connects to Valkey, claims executions, and provides
20/// the worker-facing API.
21///
22/// # Admission control
23///
24/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
25/// **bypasses the scheduler's admission controls**: it reads the eligible
26/// ZSET directly and mints its own claim grant without consulting budget
27/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
28/// benchmarks, tests, and single-tenant development where the scheduler
29/// hop is measurement noise, not for production.
30///
31/// For production deployments, consume scheduler-issued grants via
32/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
33/// budget breach, quota sliding-window, concurrency cap, and
34/// capability-match checks before issuing grants.
35///
36/// # Usage
37///
38/// ```rust,ignore
39/// use ff_core::backend::BackendConfig;
40/// use ff_core::types::{LaneId, Namespace, WorkerId, WorkerInstanceId};
41/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
42///
43/// let config = WorkerConfig {
44///     backend: Some(BackendConfig::valkey("localhost", 6379)),
45///     worker_id: WorkerId::new("w1"),
46///     worker_instance_id: WorkerInstanceId::new("w1-i1"),
47///     namespace: Namespace::new("default"),
48///     lanes: vec![LaneId::new("main")],
49///     capabilities: Vec::new(),
50///     lease_ttl_ms: 30_000,
51///     claim_poll_interval_ms: 1_000,
52///     max_concurrent_tasks: 1,
53///     partition_config: None,
54/// };
55/// let worker = FlowFabricWorker::connect(config).await?;
56///
57/// loop {
58///     if let Some(task) = worker.claim_next().await? {
59///         // Process task...
60///         task.complete(Some(b"result".to_vec())).await?;
61///     } else {
62///         tokio::time::sleep(Duration::from_secs(1)).await;
63///     }
64/// }
65/// ```
66pub struct FlowFabricWorker {
67    /// RFC-023 Phase 1a (§4.4 item 10c): `ferriskey::Client` is
68    /// `valkey-default`-gated **and** `Option` wrapped. Under
69    /// sqlite-only features the field is absent. Under
70    /// `valkey-default` the field is `Some(client)` when the worker
71    /// was built via [`Self::connect`] (the Valkey-bundled entry
72    /// point that dials), and `None` when it was built via
73    /// [`Self::connect_with`] (the backend-agnostic entry point per
74    /// §4.4 item 10e — no ferriskey round-trip). Claim/signal hot
75    /// paths (all `valkey-default`-gated per §4.4 item 10f) expect
76    /// `Some`; a claim call against a `connect_with`-built worker
77    /// panics with a clear message until the backend-agnostic SDK
78    /// worker-loop RFC lands (tracked in §8).
79    #[cfg(feature = "valkey-default")]
80    #[allow(dead_code)]
81    client: Option<Client>,
82    config: WorkerConfig,
83    partition_config: PartitionConfig,
84    /// 8-hex FNV-1a digest of the sorted capabilities CSV. Used in
85    /// per-mismatch logs so the 4KB CSV never echoes on every reject
86    /// during an incident. For [`Self::connect`]-built workers, the
87    /// full CSV is additionally logged once at connect-time WARN via
88    /// `valkey_preamble::run` for cross-reference; [`Self::connect_with`]-
89    /// built workers compute the hash without the companion CSV log.
90    /// Mirrors `ff-scheduler::claim::worker_caps_digest`.
91    #[cfg(feature = "direct-valkey-claim")]
92    worker_capabilities_hash: String,
93    #[cfg(feature = "direct-valkey-claim")]
94    lane_index: AtomicUsize,
95    /// Concurrency cap for in-flight tasks. Permits are acquired or
96    /// transferred by [`claim_next`] (feature-gated),
97    /// [`claim_from_grant`] (always available), and
98    /// [`claim_from_reclaim_grant`], transferred to the returned
99    /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
100    /// Holds `max_concurrent_tasks` permits total.
101    ///
102    /// [`claim_next`]: FlowFabricWorker::claim_next
103    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
104    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
105    #[cfg_attr(not(feature = "valkey-default"), allow(dead_code))]
106    concurrency_semaphore: Arc<Semaphore>,
107    /// Rolling offset for chunked partition scans. Each poll advances the
108    /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
109    /// chunk)` polls every partition is covered. The initial value is
110    /// derived from `worker_instance_id` so idle workers spread their
111    /// scans across different partitions from the first poll onward.
112    ///
113    /// Overflow: on 64-bit targets `usize` is `u64` — overflow after
114    /// ~2^64 polls (billions of years at any realistic rate). On 32-bit
115    /// targets (wasm32, i686) `usize` is `u32` and wraps after ~4 years
116    /// at 1 poll/sec — acceptable; on wrap, the modulo preserves
117    /// correctness because the sequence simply restarts a new cycle.
118    #[cfg(feature = "direct-valkey-claim")]
119    scan_cursor: AtomicUsize,
120    /// The [`EngineBackend`] the Stage-1b trait forwarders route
121    /// through.
122    ///
123    /// **RFC-012 Stage 1b.** Always populated:
124    /// [`FlowFabricWorker::connect`] now wraps the worker's own
125    /// `ferriskey::Client` in a `ValkeyBackend` via
126    /// `ValkeyBackend::from_client_and_partitions`, and
127    /// [`FlowFabricWorker::connect_with`] replaces that default with
128    /// the caller-supplied `Arc<dyn EngineBackend>`. The
129    /// [`FlowFabricWorker::backend`] accessor still returns
130    /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
131    /// narrows the return type once consumers have migrated.
132    ///
133    /// Hot paths (claim, deliver_signal, admin queries) still use the
134    /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
135    /// migrates them through this field, and Stage 1d removes the
136    /// embedded client.
137    backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
138    /// Optional handle to the same underlying backend viewed as a
139    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
140    /// Populated by [`Self::connect`] from the bundled
141    /// `ValkeyBackend` (which implements the trait); supplied by the
142    /// caller on [`Self::connect_with`] as an explicit
143    /// `Option<Arc<dyn CompletionBackend>>` — `None` means "this
144    /// backend does not support push-based completion" (e.g. a future
145    /// Postgres backend without LISTEN/NOTIFY, or a test mock). Cairn
146    /// and other completion-subscription consumers reach this through
147    /// [`Self::completion_backend`].
148    completion_backend_handle:
149        Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
150}
151
/// Upper bound on how many partitions a single `claim_next()` poll visits
/// (the poll clamps it to the configured partition count). Bounding the
/// window keeps an idle worker's Valkey load at O(PARTITION_SCAN_CHUNK)
/// per worker-second rather than O(num_flow_partitions).
#[cfg(feature = "direct-valkey-claim")]
const PARTITION_SCAN_CHUNK: usize = 32;
157
158impl FlowFabricWorker {
159    /// RFC-023 Phase 1a helper: borrow the embedded `ferriskey::Client`,
160    /// panicking with a clear message if the worker was built via
161    /// [`Self::connect_with`] (which does not dial Valkey). Every
162    /// Valkey-specific claim/signal method funnels through this accessor
163    /// so the panic site is uniform and traceable.
164    ///
165    /// A backend-agnostic claim/signal API is deferred per §8 breaking-
166    /// change disclosure; until that lands, valkey-default consumers
167    /// must use [`Self::connect`] if they intend to drive the Valkey
168    /// claim/signal loop.
169    #[cfg(feature = "valkey-default")]
170    #[inline]
171    #[allow(dead_code)]
172    fn valkey_client(&self) -> &Client {
173        self.client.as_ref().expect(
174            "FlowFabricWorker was built via connect_with (no Valkey dial) \
175             but a Valkey-specific claim/signal method was invoked. \
176             Use FlowFabricWorker::connect to dial Valkey, or drive the \
177             backend directly through the trait surface via .backend().",
178        )
179    }
180}
181
182impl FlowFabricWorker {
183    /// Connect to Valkey and prepare the worker.
184    ///
185    /// Establishes the ferriskey connection. Does NOT load the FlowFabric
186    /// library — that is the server's responsibility (ff-server calls
187    /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
188    /// the library is already loaded.
189    ///
190    /// # Smoke / dev scripts: rotate `WorkerInstanceId`
191    ///
192    /// The SDK writes a SET-NX liveness sentinel keyed on the worker's
193    /// `WorkerInstanceId`. When a smoke / dev script reuses the same
194    /// `WorkerInstanceId` across restarts, subsequent runs trap behind
195    /// the prior run's SET-NX until the liveness key's TTL (≈ 2× the
196    /// configured lease TTL) expires — the worker appears stuck and
197    /// claims nothing. Iterative scripts should synthesise a fresh
198    /// `WorkerInstanceId` per process (e.g. `WorkerInstanceId::new()`
199    /// or embed a UUID/timestamp) rather than hard-coding a stable
200    /// value. Production workers that cleanly shut down release the
201    /// key; only crashed / kill -9'd processes hit this trap.
202    ///
203    /// **RFC-023 Phase 1a (§4.4 item 10d, v0.12.0):** this Valkey-bundled
204    /// entry point is `#[cfg(feature = "valkey-default")]`-gated.
205    /// Consumers on the sqlite-only feature set must use
206    /// [`FlowFabricWorker::connect_with`] and drive the backend directly
207    /// through the trait surface.
208    #[cfg(feature = "valkey-default")]
209    pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
210        if config.lanes.is_empty() {
211            return Err(SdkError::Config {
212                context: "worker_config".into(),
213                field: None,
214                message: "at least one lane is required".into(),
215            });
216        }
217
218        // Build the ferriskey client from the nested `BackendConfig`.
219        // Delegates to `ff_backend_valkey::build_client` so host/port +
220        // TLS + cluster + `BackendTimeouts::request` + `BackendRetry`
221        // wiring lives in exactly one place (RFC-012 Stage 1c tranche 1).
222        //
223        // v0.13 ergonomics fix (feedback_sdk_reclaim_ergonomics
224        // Finding 2): `WorkerConfig.backend` is `Option<BackendConfig>`
225        // because `connect_with` ignores it. The URL-based `connect`
226        // path still requires a concrete `BackendConfig` to dial;
227        // reject `None` up front with a typed Config error instead of
228        // silently using a default that would hide a caller mistake.
229        let backend_config = config.backend.as_ref().ok_or_else(|| SdkError::Config {
230            context: "worker_config".into(),
231            field: Some("backend".into()),
232            message: "FlowFabricWorker::connect requires WorkerConfig.backend = \
233                Some(BackendConfig::...); use FlowFabricWorker::connect_with for \
234                the backend-agnostic path that takes a pre-built \
235                Arc<dyn EngineBackend>".into(),
236        })?;
237        let client = ff_backend_valkey::build_client(backend_config).await?;
238
239        // v0.12 PR-6: the Valkey-specific preamble (PING, alive-key
240        // SET-NX, `ff:config:partitions` HGETALL, caps ingress +
241        // sorted-dedup CSV, caps STRING + workers-index writes) lives
242        // in `crate::valkey_preamble`. Extracted byte-for-byte from
243        // the pre-PR-6 inline body; the write order is observable
244        // from scheduler-side reads (unblock scanner:
245        // SMEMBERS ff:idx:workers → GET ff:worker:{id}:caps) so
246        // preservation is load-bearing. See `valkey_preamble::run`.
247        let crate::valkey_preamble::PreambleOutput {
248            partition_config,
249            #[cfg(feature = "direct-valkey-claim")]
250            capabilities_csv: _worker_capabilities_csv,
251            #[cfg(feature = "direct-valkey-claim")]
252            capabilities_hash: worker_capabilities_hash,
253        } = crate::valkey_preamble::run(&client, &config).await?;
254
255        let max_tasks = config.max_concurrent_tasks.max(1);
256        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
257
258        tracing::info!(
259            worker_id = %config.worker_id,
260            instance_id = %config.worker_instance_id,
261            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
262            "FlowFabricWorker connected"
263        );
264
265        #[cfg(feature = "direct-valkey-claim")]
266        let scan_cursor_init = scan_cursor_seed(
267            config.worker_instance_id.as_str(),
268            partition_config.num_flow_partitions.max(1) as usize,
269        );
270
271        // RFC-012 Stage 1b: wrap the dialed client in a ValkeyBackend
272        // so `ClaimedTask`'s trait forwarders have something to call.
273        // `from_client_and_partitions` reuses the already-dialed client
274        // — no second connection. Share the concrete
275        // `Arc<ValkeyBackend>` across the two trait objects — one
276        // allocation, both accessors yield identity-equivalent handles.
277        let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
278            ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
279                client.clone(),
280                partition_config,
281            );
282        let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
283        let completion_backend_handle: Option<
284            Arc<dyn ff_core::completion_backend::CompletionBackend>,
285        > = Some(valkey_backend);
286
287        Ok(Self {
288            client: Some(client),
289            config,
290            partition_config,
291            #[cfg(feature = "direct-valkey-claim")]
292            worker_capabilities_hash,
293            #[cfg(feature = "direct-valkey-claim")]
294            lane_index: AtomicUsize::new(0),
295            concurrency_semaphore,
296            #[cfg(feature = "direct-valkey-claim")]
297            scan_cursor: AtomicUsize::new(scan_cursor_init),
298            backend,
299            completion_backend_handle,
300        })
301    }
302
303    /// Store pre-built [`EngineBackend`] and (optional)
304    /// [`CompletionBackend`] handles on the worker. Builds the worker
305    /// via the legacy [`FlowFabricWorker::connect`] path first (so the
306    /// embedded `ferriskey::Client` that the Stage 1b non-migrated hot
307    /// paths still use is dialed), then replaces the default
308    /// `ValkeyBackend` wrapper with the caller-supplied trait objects.
309    ///
310    /// The `completion` argument is explicit: 0.3.3 previously accepted
311    /// only `backend` and `completion_backend()` silently returned
312    /// `None` on this path because `Arc<dyn EngineBackend>` cannot be
313    /// upcast to `Arc<dyn CompletionBackend>` without loss of
314    /// trait-object identity. 0.3.4 lets the caller decide.
315    ///
316    /// - `Some(arc)` — caller supplies a completion backend.
317    ///   [`Self::completion_backend`] returns `Some(clone)`.
318    /// - `None` — this backend does not support push-based completion
319    ///   (future Postgres backend without LISTEN/NOTIFY, test mocks).
320    ///   [`Self::completion_backend`] returns `None`.
321    ///
322    /// When the underlying backend implements both traits (as
323    /// `ValkeyBackend` does), pass the same `Arc` twice — the two
324    /// trait-object views share one allocation:
325    ///
326    /// ```rust,ignore
327    /// use std::sync::Arc;
328    /// use ff_backend_valkey::ValkeyBackend;
329    /// use ff_sdk::{FlowFabricWorker, WorkerConfig};
330    ///
331    /// # async fn doc(worker_config: WorkerConfig,
332    /// #              backend_config: ff_backend_valkey::BackendConfig)
333    /// #     -> Result<(), ff_sdk::SdkError> {
334    /// // Valkey (completion supported):
335    /// let valkey = Arc::new(ValkeyBackend::connect(backend_config).await?);
336    /// let worker = FlowFabricWorker::connect_with(
337    ///     worker_config,
338    ///     valkey.clone(),
339    ///     Some(valkey),
340    /// ).await?;
341    /// # Ok(()) }
342    /// ```
343    ///
344    /// Backend without completion support:
345    ///
346    /// ```rust,ignore
347    /// let worker = FlowFabricWorker::connect_with(
348    ///     worker_config,
349    ///     backend,
350    ///     None,
351    /// ).await?;
352    /// ```
353    ///
354    /// **Stage 1b + Round-7 scope — what the injected backend covers
355    /// today.** The injected backend currently covers these per-task
356    /// `ClaimedTask` ops: `update_progress` / `resume_signals` /
357    /// `delay_execution` / `move_to_waiting_children` / `complete` /
358    /// `cancel` / `fail` / `create_pending_waitpoint` /
359    /// `append_frame` / `report_usage`. A mock backend therefore sees
360    /// that portion of the worker's per-task write surface. Lease
361    /// renewal also routes through `backend.renew(&handle)`. Round-7
362    /// (#135/#145) closed the four trait-shape gaps tracked by #117,
363    /// but `suspend` still reaches the embedded `ferriskey::Client`
364    /// directly via `ff_suspend_execution` — this is the deferred
365    /// suspend per RFC-012 §R7.6.1, pending Stage 1d input-shape
366    /// work. `claim_next` / `claim_from_grant` /
367    /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
368    /// are Stage 1c hot-path work. Stage 1d removes the embedded
369    /// client entirely.
370    ///
371    /// Today's constructor is therefore NOT yet a drop-in way to swap
372    /// in a non-Valkey backend — it requires a reachable Valkey node
373    /// for `suspend` plus the remaining hot-path ops. Tests that
374    /// exercise only the migrated per-task ops can run fully against
375    /// a mock backend.
376    ///
377    /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
378    /// [`CompletionBackend`]: ff_core::completion_backend::CompletionBackend
379    pub async fn connect_with(
380        config: WorkerConfig,
381        backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
382        completion: Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
383    ) -> Result<Self, SdkError> {
384        // RFC-023 Phase 1a (§4.4 item 10e, v0.12.0): direct
385        // backend-agnostic construction. No `Self::connect` preamble,
386        // no ferriskey round-trips (no PING, no alive-key SET-NX, no
387        // `ff:config:partitions` HGETALL). Callers needing a
388        // non-default `PartitionConfig` under non-Valkey backends set
389        // [`WorkerConfig::partition_config`] (v0.12 PR-6 closed the
390        // original follow-up flagged here).
391        //
392        // Lane-empty validation (the one check `connect` did before
393        // any Valkey work) is hoisted here so every entry point
394        // refuses an empty lane list.
395        if config.lanes.is_empty() {
396            return Err(SdkError::Config {
397                context: "worker_config".into(),
398                field: None,
399                message: "at least one lane is required".into(),
400            });
401        }
402
403        // v0.13 ergonomics fix (feedback_sdk_reclaim_ergonomics
404        // Finding 2): `WorkerConfig.backend` is ignored on this path —
405        // the caller-supplied `Arc<dyn EngineBackend>` is authoritative.
406        // If the caller left behind a `Some(..)` from an earlier
407        // `connect(..)` template, warn so the ambiguity is visible.
408        if config.backend.is_some() {
409            tracing::warn!(
410                worker_id = %config.worker_id,
411                instance_id = %config.worker_instance_id,
412                "WorkerConfig.backend is Some(..) but FlowFabricWorker::connect_with \
413                 was invoked — BackendConfig is ignored, the injected \
414                 Arc<dyn EngineBackend> is authoritative. Set WorkerConfig.backend = \
415                 None on the connect_with path to silence this warning."
416            );
417        }
418
419        let max_tasks = config.max_concurrent_tasks.max(1);
420        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
421        // v0.12 PR-6: honor the optional `WorkerConfig::partition_config`
422        // override. `None` keeps the pre-PR-6 default shape (256 / 32 /
423        // 32); `Some(cfg)` lets non-Valkey deployments with a custom
424        // `num_flow_partitions` bind correctly instead of silently
425        // missing data via the wrong partition index.
426        let partition_config = config.partition_config.unwrap_or_default();
427
428        #[cfg(feature = "direct-valkey-claim")]
429        let scan_cursor_init = scan_cursor_seed(
430            config.worker_instance_id.as_str(),
431            partition_config.num_flow_partitions.max(1) as usize,
432        );
433
434        // Build the capabilities CSV / hash inline for the
435        // direct-valkey-claim path. The validation mirrors `connect`'s
436        // ingress so the two entry points refuse the same malformed
437        // tokens.
438        #[cfg(feature = "direct-valkey-claim")]
439        for cap in &config.capabilities {
440            if cap.is_empty() {
441                return Err(SdkError::Config {
442                    context: "worker_config".into(),
443                    field: Some("capabilities".into()),
444                    message: "capability token must not be empty".into(),
445                });
446            }
447            if cap.contains(',') {
448                return Err(SdkError::Config {
449                    context: "worker_config".into(),
450                    field: Some("capabilities".into()),
451                    message: format!(
452                        "capability token may not contain ',' (CSV delimiter): {cap:?}"
453                    ),
454                });
455            }
456            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
457                return Err(SdkError::Config {
458                    context: "worker_config".into(),
459                    field: Some("capabilities".into()),
460                    message: format!(
461                        "capability token must not contain whitespace or control \
462                         characters: {cap:?}"
463                    ),
464                });
465            }
466        }
467        #[cfg(feature = "direct-valkey-claim")]
468        let worker_capabilities_hash = {
469            let set: std::collections::BTreeSet<&str> = config
470                .capabilities
471                .iter()
472                .map(|s| s.as_str())
473                .filter(|s| !s.is_empty())
474                .collect();
475            let csv: String = set.into_iter().collect::<Vec<_>>().join(",");
476            ff_core::hash::fnv1a_xor8hex(&csv)
477        };
478
479        Ok(Self {
480            #[cfg(feature = "valkey-default")]
481            client: None,
482            config,
483            partition_config,
484            #[cfg(feature = "direct-valkey-claim")]
485            worker_capabilities_hash,
486            #[cfg(feature = "direct-valkey-claim")]
487            lane_index: AtomicUsize::new(0),
488            concurrency_semaphore,
489            #[cfg(feature = "direct-valkey-claim")]
490            scan_cursor: AtomicUsize::new(scan_cursor_init),
491            backend,
492            completion_backend_handle: completion,
493        })
494    }
495
496    /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
497    /// ops through.
498    ///
499    /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
500    /// the `Option` wrapper is retained for API stability with the
501    /// Stage-1a shape. Stage 1c narrows the return type to
502    /// `&Arc<dyn EngineBackend>`.
503    pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
504        Some(&self.backend)
505    }
506
507    /// Crate-internal direct borrow of the backend. The public
508    /// [`Self::backend`] still returns `Option` for API stability
509    /// (Stage 1b holdover). Snapshot trait-forwarders in
510    /// [`crate::snapshot`] need an un-wrapped reference.
511    #[cfg_attr(not(feature = "valkey-default"), allow(dead_code))]
512    pub(crate) fn backend_ref(
513        &self,
514    ) -> &Arc<dyn ff_core::engine_backend::EngineBackend> {
515        &self.backend
516    }
517
518    /// Handle to the completion-event subscription backend, for
519    /// consumers that need to observe execution completions (DAG
520    /// reconcilers, tenant-isolated subscribers).
521    ///
522    /// Returns `Some` when the worker was built through
523    /// [`Self::connect`] on the default `valkey-default` feature
524    /// (the bundled `ValkeyBackend` implements
525    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)),
526    /// or via [`Self::connect_with`] with a `Some(..)` completion
527    /// handle. Returns `None` when the caller passed `None` to
528    /// [`Self::connect_with`] — i.e. the backend does not support
529    /// push-based completion streams (future Postgres without
530    /// LISTEN/NOTIFY, test mocks).
531    ///
532    /// The returned handle shares the same underlying allocation as
533    /// [`Self::backend`]; calls through it (e.g.
534    /// `subscribe_completions_filtered`) hit the same connection
535    /// the worker itself uses.
536    pub fn completion_backend(
537        &self,
538    ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
539        self.completion_backend_handle.clone()
540    }
541
542    /// Get the worker config.
543    pub fn config(&self) -> &WorkerConfig {
544        &self.config
545    }
546
547    /// Get the server-published partition config this worker bound to at
548    /// `connect()`. Exposed so consumers that mint custom
549    /// [`ExecutionId`]s (e.g. for `describe_execution` lookups on ids
550    /// produced outside this worker) stay aligned with the server's
551    /// `num_flow_partitions` — using `PartitionConfig::default()`
552    /// assumes 256 partitions and silently misses data on deployments
553    /// with any other value.
554    pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
555        &self.partition_config
556    }
557
558    /// Attempt to claim the next eligible execution.
559    ///
560    /// Phase 1 simplified claim flow:
561    /// 1. Pick a lane (round-robin across configured lanes)
562    /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
563    /// 3. Claim the execution via `ff_claim_execution`
564    /// 4. Read execution payload + tags
565    /// 5. Return a [`ClaimedTask`] with auto lease renewal
566    ///
567    /// Gated behind the `direct-valkey-claim` feature — bypasses the
568    /// scheduler's budget / quota / capability admission checks. Enable
569    /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
570    /// the scheduler hop would be measurement noise (benches) or when
571    /// the test harness needs a deterministic worker-local path. Prefer
572    /// the scheduler-routed HTTP claim path in production.
573    ///
574    /// # `None` semantics
575    ///
576    /// `Ok(None)` means **no work was found in the partition window this
577    /// poll covered**, not "the cluster is idle". Each call scans a chunk
578    /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
579    /// `scan_cursor`; the cursor advances by that chunk size on every
580    /// invocation, so a worker covers every partition exactly once every
581    /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
582    ///
583    /// Callers should treat `None` as "poll again soon" (typically after
584    /// `config.claim_poll_interval_ms`) rather than "sleep for a long
585    /// time". Backing off too aggressively on `None` can starve workers
586    /// when work lives on partitions outside the current window.
587    ///
588    /// Returns `Err` on Valkey errors or script failures.
589    #[cfg(feature = "direct-valkey-claim")]
590    pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
591        // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
592        // try_acquire returns immediately — if no permits available, the worker
593        // is at capacity and should not claim more work.
594        let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
595            Ok(p) => p,
596            Err(_) => return Ok(None), // At capacity — no claim attempted
597        };
598
599        let lane_id = self.next_lane();
600        let now = TimestampMs::now();
601
602        // Phase 1: We scan eligible executions directly by reading the eligible
603        // ZSET across execution partitions. In production the scheduler
604        // (ff-scheduler) would handle this. For Phase 1, the SDK does a
605        // simplified inline claim.
606        //
607        // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
608        // partitions starting at a rolling offset. This keeps idle Valkey
609        // load at O(chunk) per worker-second instead of O(num_partitions),
610        // and the worker-instance-seeded initial cursor spreads concurrent
611        // workers across different partition windows.
612        let num_partitions = self.partition_config.num_flow_partitions as usize;
613        if num_partitions == 0 {
614            return Ok(None);
615        }
616        let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
617        let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;
618
619        // Hoist the sorted/deduped capability set out of the loop — the
620        // per-partition iteration reused a fresh BTreeSet on every
621        // step pre-fix, adding O(n log n) alloc/sort to the scanner
622        // hot path on every tick. Computed once per `claim_next` call.
623        let worker_capabilities: std::collections::BTreeSet<String> = self
624            .config
625            .capabilities
626            .iter()
627            .cloned()
628            .collect();
629
630        for step in 0..chunk {
631            let partition_idx = ((start + step) % num_partitions) as u16;
632            let partition = ff_core::partition::Partition {
633                family: ff_core::partition::PartitionFamily::Execution,
634                index: partition_idx,
635            };
636
637            // v0.12 PR-5: trait-routed scanner primitive. Replaces the
638            // pre-PR-5 `ZRANGEBYSCORE` inline; the Valkey backend fires
639            // the identical command byte-for-byte (see
640            // `ff_backend_valkey::scan_eligible_executions_impl`).
641            let scan_args = ff_core::contracts::ScanEligibleArgs::new(
642                lane_id.clone(),
643                partition,
644                1,
645            );
646            let candidates = self.backend.scan_eligible_executions(scan_args).await?;
647            let execution_id = match candidates.into_iter().next() {
648                Some(id) => id,
649                None => continue, // No eligible executions on this partition
650            };
651
652            // Step 1: Issue claim grant (v0.12 PR-5: trait-routed).
653            let grant_args = ff_core::contracts::IssueClaimGrantArgs::new(
654                execution_id.clone(),
655                lane_id.clone(),
656                self.config.worker_id.clone(),
657                self.config.worker_instance_id.clone(),
658                partition,
659                worker_capabilities.clone(),
660                5_000, // grant_ttl_ms
661                now,
662            );
663            let grant_result = self.backend.issue_claim_grant(grant_args).await;
664
665            match grant_result {
666                Ok(_) => {}
667                Err(ref boxed)
668                    if matches!(
669                        boxed,
670                        crate::EngineError::Validation {
671                            kind: crate::ValidationKind::CapabilityMismatch,
672                            ..
673                        }
674                    ) =>
675                {
676                    let missing = match boxed {
677                        crate::EngineError::Validation { detail, .. } => detail.clone(),
678                        _ => unreachable!(),
679                    };
680                    // Block-on-mismatch (RFC-009 §7.5) — parity with
681                    // ff-scheduler's Scheduler::claim_for_worker. Without
682                    // this, the inline-direct-claim path would hot-loop
683                    // on an unclaimable top-of-zset (every tick picks the
684                    // same execution, wastes an FCALL, logs, releases,
685                    // repeats). The scheduler-side unblock scanner
686                    // promotes blocked_route executions back to eligible
687                    // when a worker with matching caps registers.
688                    tracing::info!(
689                        execution_id = %execution_id,
690                        worker_id = %self.config.worker_id,
691                        worker_caps_hash = %self.worker_capabilities_hash,
692                        missing = %missing,
693                        "capability mismatch, blocking execution off eligible (SDK inline claim)"
694                    );
695                    // v0.12 PR-5: trait-routed block_route. Swallow
696                    // typed outcomes + transport faults (best-effort
697                    // semantic preserved from pre-PR-5 behaviour —
698                    // see `BlockRouteOutcome::LuaRejected` rustdoc).
699                    let block_args = ff_core::contracts::BlockRouteArgs::new(
700                        execution_id.clone(),
701                        lane_id.clone(),
702                        partition,
703                        "waiting_for_capable_worker".to_owned(),
704                        "no connected worker satisfies required_capabilities".to_owned(),
705                        now,
706                    );
707                    match self.backend.block_route(block_args).await {
708                        Ok(ff_core::contracts::BlockRouteOutcome::Blocked { .. }) => {}
709                        Ok(ff_core::contracts::BlockRouteOutcome::LuaRejected { message }) => {
710                            tracing::warn!(
711                                execution_id = %execution_id,
712                                error = %message,
713                                "SDK block_route: Lua rejected; eligible ZSET unchanged, next \
714                                 poll will re-evaluate"
715                            );
716                        }
717                        Ok(_) => {}
718                        Err(e) => {
719                            tracing::warn!(
720                                execution_id = %execution_id,
721                                error = %e,
722                                "SDK block_route: transport failure; eligible ZSET unchanged"
723                            );
724                        }
725                    }
726                    continue;
727                }
728                Err(ref e) if is_retryable_claim_error(e) => {
729                    tracing::debug!(
730                        execution_id = %execution_id,
731                        error = %e,
732                        "claim grant failed (retryable), trying next partition"
733                    );
734                    continue;
735                }
736                Err(e) => return Err(SdkError::from(e)),
737            }
738
739            // Step 2: Claim the execution
740            match self
741                .claim_execution(&execution_id, &lane_id, &partition, now)
742                .await
743            {
744                Ok(mut task) => {
745                    // Transfer concurrency permit to the task. When the task is
746                    // completed/failed/cancelled/dropped the permit returns to
747                    // the semaphore, allowing another claim.
748                    task.set_concurrency_permit(permit);
749                    return Ok(Some(task));
750                }
751                Err(SdkError::Engine(ref boxed))
752                    if matches!(
753                        **boxed,
754                        crate::EngineError::Contention(
755                            crate::ContentionKind::UseClaimResumedExecution
756                        )
757                    ) =>
758                {
759                    // Execution was resumed from suspension — attempt_interrupted.
760                    // ff_claim_execution rejects this; use ff_claim_resumed_execution
761                    // which reuses the existing attempt instead of creating a new one.
762                    tracing::debug!(
763                        execution_id = %execution_id,
764                        "execution is resumed, using claim_resumed path"
765                    );
766                    match self
767                        .claim_resumed_execution(&execution_id, &lane_id, &partition)
768                        .await
769                    {
770                        Ok(mut task) => {
771                            task.set_concurrency_permit(permit);
772                            return Ok(Some(task));
773                        }
774                        Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
775                            tracing::debug!(
776                                execution_id = %execution_id,
777                                error = %e2,
778                                "claim_resumed failed (retryable), trying next partition"
779                            );
780                            continue;
781                        }
782                        Err(e2) => return Err(e2),
783                    }
784                }
785                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
786                    tracing::debug!(
787                        execution_id = %execution_id,
788                        error = %e,
789                        "claim execution failed (retryable), trying next partition"
790                    );
791                    continue;
792                }
793                Err(e) => return Err(e),
794            }
795        }
796
797        // No eligible work found on any partition
798        Ok(None)
799    }
800
801    /// Low-level claim of a granted execution. Routes through
802    /// [`EngineBackend::claim_execution`](ff_core::engine_backend::EngineBackend::claim_execution)
803    /// — the trait-level grant-consumer method landed in v0.12 PR-4 —
804    /// and returns a `ClaimedTask` with auto lease renewal.
805    ///
806    /// Pre-PR-4 the SDK fired the `ff_claim_execution` FCALL directly
807    /// against `valkey_client()` and hand-parsed the Lua response shape.
808    /// The body now collapses to `backend.claim_execution(args)` +
809    /// `backend.read_execution_context(...)` + `ClaimedTask::new(...)`;
810    /// the Valkey-specific FCALL plumbing lives behind the trait in
811    /// `ff_backend_valkey::claim_execution_impl`.
812    ///
813    /// As of v0.12 PR-5.5 this helper + `claim_from_grant` are no
814    /// longer `valkey-default`-gated: the backend mints the `Handle`
815    /// at claim time and `ClaimedTask::new` caches it, so no
816    /// Valkey-specific handle synthesis is required on this path. The
817    /// `EngineBackend::claim_execution` default impl returns
818    /// `Err(Unavailable)` on PG/SQLite today (grant-consumer surface
819    /// is Valkey-only until the PG/SQLite grant-consumer RFC lands);
820    /// the compile surface is fully agnostic.
821    ///
822    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
823    async fn claim_execution(
824        &self,
825        execution_id: &ExecutionId,
826        lane_id: &LaneId,
827        partition: &ff_core::partition::Partition,
828        now: TimestampMs,
829    ) -> Result<ClaimedTask, SdkError> {
830        // v0.12 PR-5.5 retry-path fix — pre-read the **total attempt
831        // counter**, not the current-attempt pointer. The fresh-claim
832        // path mints a new attempt row whose index is the counter's
833        // current value (the backend's Lua 5920 / PG `ff_claim_execution`
834        // / SQLite `claim_impl` all consult this same counter to compute
835        // `next_att_idx`). Pre-PR-5.5 this call went inline as `HGET
836        // {exec}:core total_attempt_count` on the Valkey client; the
837        // first PR-5.5 landing mistakenly routed it to
838        // `read_current_attempt_index`, which returns the *pointer* at
839        // the previously-leased attempt — on a retry-of-a-retry that
840        // still named the terminal-failed prior attempt and the new
841        // claim collided with it. `read_total_attempt_count` wraps
842        // the same byte-for-byte HGET on Valkey and provides the JSONB
843        // / json_extract equivalents on PG / SQLite. See
844        // `EngineBackend::read_total_attempt_count` rustdoc for the
845        // pointer-vs-counter distinction.
846        let att_idx = self
847            .backend
848            .read_total_attempt_count(execution_id)
849            .await
850            .map_err(|e| SdkError::Engine(Box::new(e)))?;
851
852        let args = ff_core::contracts::ClaimExecutionArgs::new(
853            execution_id.clone(),
854            self.config.worker_id.clone(),
855            self.config.worker_instance_id.clone(),
856            lane_id.clone(),
857            LeaseId::new(),
858            self.config.lease_ttl_ms,
859            AttemptId::new(),
860            att_idx,
861            "{}".to_owned(),
862            None,
863            None,
864            now,
865        );
866
867        // `ClaimExecutionResult` is `#[non_exhaustive]` (v0.12 PR-4)
868        // so let-binding requires an explicit wildcard arm even though
869        // `Claimed` is the only variant today. A future additive variant
870        // surfaces here as a typed `ScriptError::Parse` — loud, routed
871        // through the existing SDK error path, and never silently
872        // dropped.
873        let claimed = match self.backend.claim_execution(args).await? {
874            ff_core::contracts::ClaimExecutionResult::Claimed(c) => c,
875            other => {
876                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
877                    fcall: "ff_claim_execution".into(),
878                    execution_id: Some(execution_id.to_string()),
879                    message: format!(
880                        "unexpected ClaimExecutionResult variant: {other:?}"
881                    ),
882                }));
883            }
884        };
885
886        // Read execution payload and metadata
887        let (input_payload, execution_kind, tags) = self
888            .read_execution_context(execution_id, partition)
889            .await?;
890
891        Ok(ClaimedTask::new(
892            self.backend.clone(),
893            self.partition_config,
894            claimed.handle,
895            execution_id.clone(),
896            claimed.attempt_index,
897            claimed.attempt_id,
898            claimed.lease_id,
899            claimed.lease_epoch,
900            self.config.lease_ttl_ms,
901            lane_id.clone(),
902            self.config.worker_instance_id.clone(),
903            input_payload,
904            execution_kind,
905            tags,
906        ))
907    }
908
909    /// Consume a [`ClaimGrant`] and claim the granted execution on
910    /// this worker. The intended production entry point: pair with
911    /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
912    /// scheduler-issued grants into the SDK without enabling the
913    /// `direct-valkey-claim` feature (which bypasses budget/quota
914    /// admission control).
915    ///
916    /// The worker's concurrency semaphore is checked BEFORE the FCALL
917    /// so a saturated worker does not consume the grant: the grant
918    /// stays valid for its remaining TTL and the caller can either
919    /// release it back to the scheduler or retry after some other
920    /// in-flight task completes.
921    ///
922    /// On success the returned [`ClaimedTask`] holds a concurrency
923    /// permit that releases automatically on
924    /// `complete`/`fail`/`cancel`/drop — same contract as
925    /// `claim_next`.
926    ///
927    /// # Arguments
928    ///
929    /// * `lane` — the lane the grant was issued for. Must match what
930    ///   was passed to `Scheduler::claim_for_worker`; the Lua FCALL
931    ///   uses it to look up `lane_eligible`, `lane_active`, and the
932    ///   `worker_leases` index slot.
933    /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
934    ///
935    /// # Errors
936    ///
937    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
938    ///   permits all held. Retryable; the grant is untouched.
939    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
940    ///   or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
941    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
942    ///   (wrapped in [`SdkError::Engine`]).
943    /// * `ScriptError::CapabilityMismatch` — execution's required
944    ///   capabilities not a subset of this worker's caps (wrapped in
945    ///   [`SdkError::Engine`]). Surfaced post-grant if a race
946    ///   between grant issuance and caps change allows it.
947    /// * `ScriptError::Parse` — `ff_claim_execution` returned an
948    ///   unexpected shape (wrapped in [`SdkError::Engine`]).
949    /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
950    ///   transport error during the FCALL or the
951    ///   `read_execution_context` follow-up.
952    ///
953    /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
954    /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
955    ///
956    /// # Backend coverage (v0.12 PR-5.5)
957    ///
958    /// Method ungated across backends. The Valkey backend handles the
959    /// grant fully. Postgres + SQLite backends return
960    /// [`EngineError::Unavailable`](ff_core::engine_error::EngineError::Unavailable)
961    /// from [`EngineBackend::claim_execution`] today — grants on those
962    /// backends flow through the scheduler-routed [`claim_via_server`]
963    /// path (the PG/SQLite scheduler lives outside the `EngineBackend`
964    /// trait in this release). See
965    /// `project_claim_from_grant_pg_sqlite_gap.md` for motivation and
966    /// planned follow-up.
967    ///
968    /// [`claim_via_server`]: FlowFabricWorker::claim_via_server
969    /// [`EngineBackend::claim_execution`]: ff_core::engine_backend::EngineBackend::claim_execution
970    pub async fn claim_from_grant(
971        &self,
972        lane: LaneId,
973        grant: ff_core::contracts::ClaimGrant,
974    ) -> Result<ClaimedTask, SdkError> {
975        // Semaphore check FIRST. If the worker is saturated we must
976        // surface the condition to the caller without touching the
977        // grant — silently returning Ok(None) (as claim_next does)
978        // would drop a grant the scheduler has already committed work
979        // to issuing, wasting the slot until its TTL elapses.
980        let permit = self
981            .concurrency_semaphore
982            .clone()
983            .try_acquire_owned()
984            .map_err(|_| SdkError::WorkerAtCapacity)?;
985
986        let now = TimestampMs::now();
987        let partition = grant.partition().map_err(|e| SdkError::Config {
988            context: "claim_from_grant".to_owned(),
989            field: Some("partition_key".to_owned()),
990            message: e.to_string(),
991        })?;
992        let mut task = match self
993            .claim_execution(&grant.execution_id, &lane, &partition, now)
994            .await
995        {
996            Ok(task) => task,
997            Err(SdkError::Engine(ref boxed))
998                if matches!(
999                    **boxed,
1000                    crate::EngineError::Contention(
1001                        crate::ContentionKind::UseClaimResumedExecution
1002                    )
1003                ) =>
1004            {
1005                // Execution was resumed from suspension — attempt_interrupted.
1006                // ff_claim_execution rejects this; use ff_claim_resumed_execution
1007                // which reuses the existing attempt instead of creating a new one.
1008                // Mirrors the fallback inside `claim_next` so HTTP-routed callers
1009                // (`claim_via_server` → `claim_from_grant`) get the same transparent
1010                // re-claim behavior.
1011                tracing::debug!(
1012                    execution_id = %grant.execution_id,
1013                    "execution is resumed, using claim_resumed path"
1014                );
1015                self.claim_resumed_execution(&grant.execution_id, &lane, &partition)
1016                    .await?
1017            }
1018            Err(e) => return Err(e),
1019        };
1020        task.set_concurrency_permit(permit);
1021        Ok(task)
1022    }
1023
1024    /// Scheduler-routed claim: POST the server's
1025    /// `/v1/workers/{id}/claim`, then chain to
1026    /// [`Self::claim_from_grant`].
1027    ///
1028    /// Batch C item 2 PR-B. This is the production entry point —
1029    /// budget + quota + capability admission run server-side inside
1030    /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
1031    /// enable the `direct-valkey-claim` feature.
1032    ///
1033    /// Returns `Ok(None)` when the server says no eligible execution
1034    /// (HTTP 204). Callers typically back off by
1035    /// `config.claim_poll_interval_ms` and try again, same cadence
1036    /// as the direct-claim path's `Ok(None)`.
1037    ///
1038    /// The `admin` client is the established HTTP surface
1039    /// (`FlowFabricAdminClient`) reused here so workers don't keep a
1040    /// second reqwest client around. Build once at worker boot and
1041    /// hand in by reference on every claim.
1042    pub async fn claim_via_server(
1043        &self,
1044        admin: &crate::FlowFabricAdminClient,
1045        lane: &LaneId,
1046        grant_ttl_ms: u64,
1047    ) -> Result<Option<ClaimedTask>, SdkError> {
1048        let req = crate::admin::ClaimForWorkerRequest {
1049            worker_id: self.config.worker_id.to_string(),
1050            lane_id: lane.to_string(),
1051            worker_instance_id: self.config.worker_instance_id.to_string(),
1052            capabilities: self.config.capabilities.clone(),
1053            grant_ttl_ms,
1054        };
1055        let Some(resp) = admin.claim_for_worker(req).await? else {
1056            return Ok(None);
1057        };
1058        let grant = resp.into_grant()?;
1059        self.claim_from_grant(lane.clone(), grant).await.map(Some)
1060    }
1061
1062    /// Consume a [`ResumeGrant`] and transition the granted
1063    /// `attempt_interrupted` execution into a `started` state on this
1064    /// worker. Symmetric partner to [`claim_from_grant`] for the
1065    /// resume path.
1066    ///
1067    /// **Renamed from `claim_from_reclaim_grant` (RFC-024 PR-B+C).**
1068    /// The new `claim_from_reclaim_grant` method lands with PR-G and
1069    /// dispatches to [`EngineBackend::reclaim_execution`] for the
1070    /// lease-reclaim path (distinct semantic, distinct grant type).
1071    ///
1072    /// The grant must have been issued to THIS worker (matching
1073    /// `worker_id` at grant time). A mismatch returns
1074    /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1075    /// atomically by `ff_claim_resumed_execution`; a second call with
1076    /// the same grant also returns `InvalidClaimGrant`.
1077    ///
1078    /// # Concurrency
1079    ///
1080    /// The worker's concurrency semaphore is checked BEFORE the FCALL
1081    /// (same contract as [`claim_from_grant`]). Resume does NOT
1082    /// assume pre-existing capacity on this worker — a resume can
1083    /// land on a fresh worker instance that just came up after a
1084    /// crash/restart and is picking up a previously-interrupted
1085    /// execution. If the worker is saturated, the grant stays valid
1086    /// for its remaining TTL and the caller can release it or retry.
1087    ///
1088    /// On success the returned [`ClaimedTask`] holds a concurrency
1089    /// permit that releases automatically on
1090    /// `complete`/`fail`/`cancel`/drop.
1091    ///
1092    /// # Errors
1093    ///
1094    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1095    ///   permits all held. Retryable; the grant is untouched (no
1096    ///   FCALL was issued, so `ff_claim_resumed_execution` did not
1097    ///   atomically consume the grant key).
1098    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1099    ///   or `worker_id` mismatch.
1100    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1101    /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1102    ///   `attempt_interrupted`.
1103    /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1104    ///   not `runnable`.
1105    /// * `ScriptError::ExecutionNotFound` — core key missing.
1106    /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
1107    ///   transport.
1108    ///
1109    /// [`ResumeGrant`]: ff_core::contracts::ResumeGrant
1110    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
1111    pub async fn claim_from_resume_grant(
1112        &self,
1113        grant: ff_core::contracts::ResumeGrant,
1114    ) -> Result<ClaimedTask, SdkError> {
1115        // Semaphore check FIRST — same load-bearing ordering as
1116        // `claim_from_grant`. If the worker is saturated, surface
1117        // WorkerAtCapacity without firing the FCALL; the FCALL is an
1118        // atomic consume on the grant key, so calling it past-
1119        // saturation would destroy the grant while leaving no
1120        // permit to attach to the returned `ClaimedTask`.
1121        let permit = self
1122            .concurrency_semaphore
1123            .clone()
1124            .try_acquire_owned()
1125            .map_err(|_| SdkError::WorkerAtCapacity)?;
1126
1127        // Grant carries partition + lane_id so no round-trip is needed
1128        // to resolve them before the FCALL.
1129        let partition = grant.partition().map_err(|e| SdkError::Config {
1130            context: "claim_from_resume_grant".to_owned(),
1131            field: Some("partition_key".to_owned()),
1132            message: e.to_string(),
1133        })?;
1134        let mut task = self
1135            .claim_resumed_execution(
1136                &grant.execution_id,
1137                &grant.lane_id,
1138                &partition,
1139            )
1140            .await?;
1141        task.set_concurrency_permit(permit);
1142        Ok(task)
1143    }
1144
1145    /// Consume a [`ReclaimGrant`] to mint a fresh attempt for a
1146    /// lease-expired / lease-revoked execution (RFC-024 §3.4).
1147    ///
1148    /// Backend-agnostic. Routes through
1149    /// [`EngineBackend::reclaim_execution`] on whatever backend the
1150    /// worker was connected with (Valkey via `connect` / `connect_with`,
1151    /// Postgres / SQLite via `connect_with`). Distinct from
1152    /// [`claim_from_resume_grant`]: reclaim creates a NEW attempt row
1153    /// and bumps `lease_reclaim_count` (`HandleKind::Reclaimed`), while
1154    /// resume re-uses the existing attempt under
1155    /// `ff_claim_resumed_execution` (`HandleKind::Resumed`).
1156    ///
1157    /// # Return shape
1158    ///
1159    /// Returns the raw [`ReclaimExecutionOutcome`] so consumers match
1160    /// on the four outcomes (`Claimed(Handle)`, `NotReclaimable`,
1161    /// `ReclaimCapExceeded`, `GrantNotFound`) and decide their
1162    /// dispatch. The `Claimed` variant carries a [`Handle`] whose
1163    /// `kind` is [`HandleKind::Reclaimed`]; downstream ops (complete,
1164    /// fail, renew, append_frame, …) take the handle directly via the
1165    /// `EngineBackend` trait.
1166    ///
1167    /// This contrasts with [`claim_from_resume_grant`], which wraps
1168    /// the handle in a [`ClaimedTask`] with a concurrency-permit and
1169    /// auto-lease-renewal loop. Those affordances are
1170    /// `valkey-default`-gated today (they depend on the bundled
1171    /// `ferriskey::Client` + `lease_ttl_ms` renewal timer). The
1172    /// reclaim surface is intentionally narrower so it compiles under
1173    /// `--no-default-features, features = ["sqlite"]` and consumers
1174    /// can drive the reclaim flow on any backend.
1175    ///
1176    /// # Feature compatibility
1177    ///
1178    /// No cfg-gate. Compiles + runs under every feature set ff-sdk
1179    /// supports (including sqlite-only). Verified by the compile-time
1180    /// type assertion
1181    /// `worker_claim_from_reclaim_grant_is_backend_agnostic_at_type_level`
1182    /// in `crates/ff-sdk/tests/rfc024_sdk.rs`, which pins the method's
1183    /// full signature under the default feature set and is paralleled
1184    /// by `sqlite_only_compile_surface_tests` in this file for the
1185    /// `--no-default-features, features = ["sqlite"]` compile anchor on
1186    /// the rest of the backend-agnostic surface.
1187    ///
1188    /// # worker_capabilities
1189    ///
1190    /// `ReclaimExecutionArgs::worker_capabilities` is NOT part of the
1191    /// Lua FCALL (the reclaim Lua validates grant consumption via
1192    /// `grant.worker_id == args.worker_id` only — see
1193    /// `crates/ff-script/src/flowfabric.lua:3088` and RFC-024 §4.4).
1194    /// Capability matching happens at grant-issuance time (see
1195    /// [`FlowFabricAdminClient::issue_reclaim_grant`]
1196    /// — in the `ff-sdk::admin` module, `valkey-default`-gated).
1197    ///
1198    /// # Errors
1199    ///
1200    /// * [`SdkError::Engine`] — the backend's `reclaim_execution`
1201    ///   returned an [`EngineError`] (transport fault, validation
1202    ///   failure, `Unavailable` from a backend that does not
1203    ///   implement RFC-024 — currently only pre-RFC out-of-tree
1204    ///   backends).
1205    ///
1206    /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1207    /// [`ReclaimExecutionOutcome`]: ff_core::contracts::ReclaimExecutionOutcome
1208    /// [`Handle`]: ff_core::backend::Handle
1209    /// [`HandleKind::Reclaimed`]: ff_core::backend::HandleKind::Reclaimed
1210    /// [`EngineBackend::reclaim_execution`]: ff_core::engine_backend::EngineBackend::reclaim_execution
1211    /// [`EngineError`]: ff_core::engine_error::EngineError
1212    /// [`claim_from_resume_grant`]: FlowFabricWorker::claim_from_resume_grant
1213    /// [`FlowFabricAdminClient::issue_reclaim_grant`]: crate::admin::FlowFabricAdminClient::issue_reclaim_grant
1214    pub async fn claim_from_reclaim_grant(
1215        &self,
1216        grant: ff_core::contracts::ReclaimGrant,
1217        args: ff_core::contracts::ReclaimExecutionArgs,
1218    ) -> Result<ff_core::contracts::ReclaimExecutionOutcome, SdkError> {
1219        // `ReclaimGrant` is accepted as a parameter so the call-site
1220        // shape matches RFC-024 §3.4 (consumer receives a grant from
1221        // `issue_reclaim_grant` and feeds it + args into
1222        // `claim_from_reclaim_grant`). The grant metadata is
1223        // already embedded in the backend's server-side store
1224        // (Valkey `claim_grant` hash, PG/SQLite `ff_claim_grant`
1225        // table) keyed by (execution_id, grant_key) — the trait
1226        // method looks it up from `args.execution_id`.
1227        //
1228        // The overlap between `ReclaimGrant` and
1229        // `ReclaimExecutionArgs` is (execution_id, lane_id);
1230        // mismatched grant/args is a consumer-side bug (grant-for-A
1231        // + args-for-B), and silently forwarding lets the SDK act
1232        // on the args execution. Reject the mismatch up front so
1233        // the misuse surfaces at the SDK boundary instead of
1234        // succeeding against the wrong execution. The grant's
1235        // `expires_at_ms` is validated against wall-clock now so
1236        // an already-expired grant is rejected without a backend
1237        // round-trip (the backend also enforces expiry, but the
1238        // SDK-side check gives a crisper error and preserves the
1239        // reclaim-budget / lease slot).
1240        // `TimestampMs` is an `i64` (Unix-epoch ms); cast to `u64` for
1241        // comparison against `ReclaimGrant::expires_at_ms`. Clamp
1242        // negatives to 0 so a pre-epoch system clock (vanishingly
1243        // unlikely, but representable) doesn't wrap to a future u64.
1244        let now_ms: u64 = ff_core::types::TimestampMs::now().0.max(0) as u64;
1245        validate_reclaim_grant_against_args(&grant, &args, now_ms)?;
1246        self.backend
1247            .reclaim_execution(args)
1248            .await
1249            .map_err(|e| SdkError::Engine(Box::new(e)))
1250    }
1251
1252    /// Low-level resume claim. Forwards through
1253    /// [`EngineBackend::claim_resumed_execution`](ff_core::engine_backend::EngineBackend::claim_resumed_execution)
1254    /// — the trait-level trigger surface landed in issue #150 — and
1255    /// returns a [`ClaimedTask`] bound to the resumed attempt.
1256    ///
1257    /// The method stays private; external callers use
1258    /// [`claim_from_resume_grant`].
1259    ///
1260    /// [`claim_from_resume_grant`]: FlowFabricWorker::claim_from_resume_grant
1261    async fn claim_resumed_execution(
1262        &self,
1263        execution_id: &ExecutionId,
1264        lane_id: &LaneId,
1265        partition: &ff_core::partition::Partition,
1266    ) -> Result<ClaimedTask, SdkError> {
1267        // v0.12 PR-3 — pre-read current_attempt_index via the trait
1268        // rather than an inline `HGET` on the Valkey client. Load-bearing:
1269        // the backend's KEYS[6] (Valkey) / `ff_attempt` PK tuple (PG/SQLite)
1270        // must target the real existing attempt hash/row, and the backend
1271        // takes the index verbatim from
1272        // `ClaimResumedExecutionArgs::current_attempt_index`.
1273        let att_idx = self
1274            .backend
1275            .read_current_attempt_index(execution_id)
1276            .await
1277            .map_err(|e| SdkError::Engine(Box::new(e)))?;
1278
1279        let args = ff_core::contracts::ClaimResumedExecutionArgs {
1280            execution_id: execution_id.clone(),
1281            worker_id: self.config.worker_id.clone(),
1282            worker_instance_id: self.config.worker_instance_id.clone(),
1283            lane_id: lane_id.clone(),
1284            lease_id: LeaseId::new(),
1285            lease_ttl_ms: self.config.lease_ttl_ms,
1286            current_attempt_index: att_idx,
1287            remaining_attempt_timeout_ms: None,
1288            now: TimestampMs::now(),
1289        };
1290
1291        let ff_core::contracts::ClaimResumedExecutionResult::Claimed(claimed) =
1292            self.backend.claim_resumed_execution(args).await?;
1293
1294        let (input_payload, execution_kind, tags) = self
1295            .read_execution_context(execution_id, partition)
1296            .await?;
1297
1298        Ok(ClaimedTask::new(
1299            self.backend.clone(),
1300            self.partition_config,
1301            claimed.handle,
1302            execution_id.clone(),
1303            claimed.attempt_index,
1304            claimed.attempt_id,
1305            claimed.lease_id,
1306            claimed.lease_epoch,
1307            self.config.lease_ttl_ms,
1308            lane_id.clone(),
1309            self.config.worker_instance_id.clone(),
1310            input_payload,
1311            execution_kind,
1312            tags,
1313        ))
1314    }
1315
1316    /// Read payload + execution_kind + tags from exec_core.
1317    ///
1318    /// As of v0.12 PR-1 this forwards through
1319    /// [`EngineBackend::read_execution_context`](ff_core::engine_backend::EngineBackend::read_execution_context)
1320    /// rather than issuing direct GET/HGET/HGETALL against Valkey. The
1321    /// outer `valkey-default` gate + `(&ExecutionId, &Partition)`
1322    /// signature are preserved; hot-path decoupling (ungating this
1323    /// helper + its call sites in `claim_execution` and
1324    /// `claim_resumed_execution`) is PR-4/PR-5 scope per the v0.12
1325    /// agnostic-SDK plan.
1326    async fn read_execution_context(
1327        &self,
1328        execution_id: &ExecutionId,
1329        _partition: &ff_core::partition::Partition,
1330    ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
1331        let ctx = self.backend.read_execution_context(execution_id).await?;
1332        Ok((ctx.input_payload, ctx.execution_kind, ctx.tags))
1333    }
1334
1335    // ── Phase 3: Signal delivery ──
1336
1337    /// Deliver a signal to a suspended execution's waitpoint.
1338    ///
1339    /// The engine atomically records the signal, evaluates the resume condition,
1340    /// and optionally transitions the execution from `suspended` to `runnable`.
1341    ///
1342    /// Forwards through
1343    /// [`EngineBackend::deliver_signal`](ff_core::engine_backend::EngineBackend::deliver_signal)
1344    /// — the trait-level trigger surface landed in issue #150.
1345    ///
1346    /// Backend-agnostic as of v0.12 PR-3. Compiles + runs under every
1347    /// feature set ff-sdk supports (including
1348    /// `--no-default-features --features sqlite`); pinned by
1349    /// `sqlite_only_compile_surface_tests::deliver_signal_addressable_under_sqlite_only`.
1350    pub async fn deliver_signal(
1351        &self,
1352        execution_id: &ExecutionId,
1353        waitpoint_id: &WaitpointId,
1354        signal: crate::task::Signal,
1355    ) -> Result<crate::task::SignalOutcome, SdkError> {
1356        let args = ff_core::contracts::DeliverSignalArgs {
1357            execution_id: execution_id.clone(),
1358            waitpoint_id: waitpoint_id.clone(),
1359            signal_id: ff_core::types::SignalId::new(),
1360            signal_name: signal.signal_name,
1361            signal_category: signal.signal_category,
1362            source_type: signal.source_type,
1363            source_identity: signal.source_identity,
1364            payload: signal.payload,
1365            payload_encoding: Some("json".to_owned()),
1366            correlation_id: None,
1367            idempotency_key: signal.idempotency_key,
1368            target_scope: "waitpoint".to_owned(),
1369            created_at: Some(TimestampMs::now()),
1370            dedup_ttl_ms: None,
1371            resume_delay_ms: None,
1372            max_signals_per_execution: None,
1373            signal_maxlen: None,
1374            waitpoint_token: signal.waitpoint_token,
1375            now: TimestampMs::now(),
1376        };
1377
1378        let result = self.backend.deliver_signal(args).await?;
1379        Ok(match result {
1380            ff_core::contracts::DeliverSignalResult::Accepted { signal_id, effect } => {
1381                if effect == "resume_condition_satisfied" {
1382                    crate::task::SignalOutcome::TriggeredResume { signal_id }
1383                } else {
1384                    crate::task::SignalOutcome::Accepted { signal_id, effect }
1385                }
1386            }
1387            ff_core::contracts::DeliverSignalResult::Duplicate { existing_signal_id } => {
1388                crate::task::SignalOutcome::Duplicate {
1389                    existing_signal_id: existing_signal_id.to_string(),
1390                }
1391            }
1392        })
1393    }
1394
1395    #[cfg(feature = "direct-valkey-claim")]
1396    fn next_lane(&self) -> LaneId {
1397        let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
1398        self.config.lanes[idx].clone()
1399    }
1400}
1401
/// Report whether a claim-path engine error is worth retrying.
///
/// Classifies `err` via `ff_script::engine_error_ext::class` and returns
/// `true` for the `Retryable` and `Informational` classes; every other
/// class yields `false`.
#[cfg(feature = "direct-valkey-claim")]
fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
    use ff_core::error::ErrorClass;

    let class = ff_script::engine_error_ext::class(err);
    matches!(class, ErrorClass::Retryable | ErrorClass::Informational)
}
1410
/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
/// instance id with FNV-1a and reduces it modulo `num_partitions`, so
/// distinct worker processes start on different partition windows from
/// their first poll. A fixed seed of zero would be fine for single-worker
/// clusters; hashing is what spreads work in multi-worker deployments.
///
/// Returns `0` when `num_partitions` is `0` (nothing to scan, and it
/// avoids a remainder-by-zero panic). Otherwise the result is always
/// `< num_partitions`.
#[cfg(feature = "direct-valkey-claim")]
fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
    if num_partitions == 0 {
        return 0;
    }
    let hash = ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes());
    // Reduce in the u64 domain: casting the hash to `usize` first would
    // drop its upper 32 bits on 32-bit targets and make the seed depend
    // on pointer width. `usize -> u64` is lossless on supported targets,
    // and the remainder is `< num_partitions`, so it always fits `usize`.
    (hash % (num_partitions as u64)) as usize
}
1422
1423/// Cross-check the [`ReclaimGrant`] handed back by
1424/// `issue_reclaim_grant` against the [`ReclaimExecutionArgs`] the
1425/// consumer is about to dispatch. Catches the grant-for-A + args-for-B
1426/// misuse at the SDK boundary before a backend round-trip (PR #407
1427/// review F1).
1428///
1429/// The overlap between the two types is `(execution_id, lane_id)`;
1430/// `partition_key` / `grant_key` live only on the grant and are
1431/// verified server-side. The grant's `expires_at_ms` is also
1432/// validated against `now_ms` so an expired grant fails fast without
1433/// burning a backend call (the backend enforces expiry too, but the
1434/// SDK-side check gives a crisper, typed error).
1435///
1436/// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1437/// [`ReclaimExecutionArgs`]: ff_core::contracts::ReclaimExecutionArgs
1438fn validate_reclaim_grant_against_args(
1439    grant: &ff_core::contracts::ReclaimGrant,
1440    args: &ff_core::contracts::ReclaimExecutionArgs,
1441    now_ms: u64,
1442) -> Result<(), SdkError> {
1443    if grant.execution_id != args.execution_id {
1444        return Err(SdkError::Config {
1445            context: "claim_from_reclaim_grant".to_owned(),
1446            field: Some("execution_id".to_owned()),
1447            message: format!(
1448                "grant.execution_id ({}) does not match args.execution_id ({})",
1449                grant.execution_id, args.execution_id
1450            ),
1451        });
1452    }
1453    if grant.lane_id != args.lane_id {
1454        return Err(SdkError::Config {
1455            context: "claim_from_reclaim_grant".to_owned(),
1456            field: Some("lane_id".to_owned()),
1457            message: format!(
1458                "grant.lane_id ({}) does not match args.lane_id ({})",
1459                grant.lane_id.as_str(),
1460                args.lane_id.as_str()
1461            ),
1462        });
1463    }
1464    if grant.expires_at_ms <= now_ms {
1465        return Err(SdkError::Config {
1466            context: "claim_from_reclaim_grant".to_owned(),
1467            field: Some("expires_at_ms".to_owned()),
1468            message: format!(
1469                "grant expired: expires_at_ms={} now_ms={}",
1470                grant.expires_at_ms, now_ms
1471            ),
1472        });
1473    }
1474    Ok(())
1475}
1476
#[cfg(test)]
mod reclaim_grant_validation_tests {
    //! Unit tests for `validate_reclaim_grant_against_args`, the SDK-side
    //! cross-check that catches a grant/args mismatch (PR #407 review
    //! F1). The helper is pure, so no Valkey / backend is needed.
    use super::validate_reclaim_grant_against_args;
    use crate::SdkError;
    use ff_core::contracts::{ReclaimExecutionArgs, ReclaimGrant};
    use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
    use ff_core::types::{
        AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseId, WorkerId, WorkerInstanceId,
    };

    const EXEC_A: &str = "{fp:7}:00000000-0000-4000-8000-000000000001";
    const EXEC_B: &str = "{fp:7}:00000000-0000-4000-8000-000000000002";

    /// Parse one of the fixture ids above into an `ExecutionId`.
    fn exec(s: &str) -> ExecutionId {
        ExecutionId::parse(s).expect("valid execution id")
    }

    /// Build a grant for `execution_id` on `lane` expiring at `expires_at_ms`.
    fn grant_for(execution_id: ExecutionId, lane: &str, expires_at_ms: u64) -> ReclaimGrant {
        let partition = Partition { family: PartitionFamily::Flow, index: 7 };
        ReclaimGrant::new(
            execution_id,
            PartitionKey::from(&partition),
            "reclaim:grant:abc".to_owned(),
            expires_at_ms,
            LaneId::new(lane),
        )
    }

    /// Build reclaim args for `execution_id` on `lane` with fixed filler
    /// values for every other field.
    fn args_for(execution_id: ExecutionId, lane: &str) -> ReclaimExecutionArgs {
        ReclaimExecutionArgs::new(
            execution_id,
            WorkerId::new("w1"),
            WorkerInstanceId::new("w1-i1"),
            LaneId::new(lane),
            None,
            LeaseId::new(),
            30_000,
            AttemptId::new(),
            "{}".to_owned(),
            None,
            WorkerInstanceId::new("w1-i0"),
            AttemptIndex::new(0),
        )
    }

    /// Assert that `outcome` is an `SdkError::Config` rejection whose
    /// `field` equals `expected_field`. `why` is the message used when
    /// the validation unexpectedly succeeds.
    fn assert_config_rejection(outcome: Result<(), SdkError>, expected_field: &str, why: &str) {
        match outcome.expect_err(why) {
            SdkError::Config { field, .. } => assert_eq!(field.as_deref(), Some(expected_field)),
            other => panic!("expected Config error, got {other:?}"),
        }
    }

    #[test]
    fn accepts_matching_grant_and_args() {
        let grant = grant_for(exec(EXEC_A), "main", 2_000);
        let args = args_for(exec(EXEC_A), "main");
        assert!(validate_reclaim_grant_against_args(&grant, &args, 1_000).is_ok());
    }

    #[test]
    fn rejects_mismatched_execution_id() {
        let grant = grant_for(exec(EXEC_A), "main", 2_000);
        let args = args_for(exec(EXEC_B), "main");
        assert_config_rejection(
            validate_reclaim_grant_against_args(&grant, &args, 1_000),
            "execution_id",
            "expected mismatched execution_id to fail",
        );
    }

    #[test]
    fn rejects_mismatched_lane_id() {
        let grant = grant_for(exec(EXEC_A), "main", 2_000);
        let args = args_for(exec(EXEC_A), "other");
        assert_config_rejection(
            validate_reclaim_grant_against_args(&grant, &args, 1_000),
            "lane_id",
            "expected mismatched lane_id to fail",
        );
    }

    #[test]
    fn rejects_expired_grant() {
        let grant = grant_for(exec(EXEC_A), "main", 1_000);
        let args = args_for(exec(EXEC_A), "main");

        // expires_at_ms == now_ms is rejected (the check is an inclusive
        // `<=`) so the server's TTL enforcement window can't race us.
        assert_config_rejection(
            validate_reclaim_grant_against_args(&grant, &args, 1_000),
            "expires_at_ms",
            "expected expired grant to fail",
        );

        // Also rejected when already past expiry.
        assert_config_rejection(
            validate_reclaim_grant_against_args(&grant, &args, 5_000),
            "expires_at_ms",
            "expected past-expiry grant to fail",
        );
    }
}
1577
#[cfg(test)]
mod completion_accessor_type_tests {
    //! Compile-time check that [`FlowFabricWorker::completion_backend`]
    //! has the advertised signature, i.e. returns an
    //! `Option<Arc<dyn CompletionBackend>>`. No Valkey required: the
    //! check is a function-pointer coercion, and the `#[test]` body only
    //! exists so the compiler elaborates it.
    use super::FlowFabricWorker;
    use ff_core::completion_backend::CompletionBackend;
    use std::sync::Arc;

    /// The exact function-pointer type the public accessor must coerce to.
    type CompletionAccessor = fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>>;

    #[test]
    fn completion_backend_accessor_signature() {
        // Compiling this coercion is the whole test. The accessor is
        // never invoked (there is no live worker), so no I/O happens.
        let _pinned: CompletionAccessor = FlowFabricWorker::completion_backend;
    }
}
1598
/// RFC-023 Phase 1a §4.4 item 10g compile-only anchor. Parallels
/// `completion_accessor_type_tests` but fires under the sqlite-only
/// feature set: proves `FlowFabricWorker::connect_with` is callable
/// and returns the advertised type under `--no-default-features,
/// features = ["sqlite"]`. The matching `§9` CI cell (`cargo check
/// -p ff-sdk --no-default-features --features sqlite`) executes this
/// compile check mechanically so future PRs that accidentally reach
/// outside the `valkey-default` gate fail the build.
///
/// Every test in this module is a pure compile-time anchor: the bodies
/// only declare fn-pointer bindings or never-called wrapper functions,
/// so nothing is executed against a backend.
///
/// NOTE(review): none of the anchors below names
/// `FlowFabricWorker::connect_with` directly (only
/// `FlowFabricAdminClient::connect_with` is pinned) — confirm the
/// worker constructor is covered elsewhere or add an anchor for it.
///
/// NOTE(review): the module is gated on `cfg(test)`, which a plain
/// `cargo check` does not enable — confirm the CI cell builds test
/// targets (e.g. `--tests` / `--all-targets`, or `cargo test`).
#[cfg(all(test, not(feature = "valkey-default")))]
mod sqlite_only_compile_surface_tests {
    use super::FlowFabricWorker;
    use ff_core::completion_backend::CompletionBackend;
    use ff_core::engine_backend::EngineBackend;
    use std::sync::Arc;

    /// Pins the signatures of the synchronous, backend-agnostic
    /// accessors on [`FlowFabricWorker`] (`backend`,
    /// `completion_backend`, `config`, `partition_config`) via
    /// fn-pointer coercions.
    #[test]
    fn addressable_surface_under_sqlite_only() {
        // Type-level proof that the backend-agnostic accessors are
        // reachable without the `valkey-default` feature. None of
        // these are called; the assignment targets pin the signature.
        let _a: fn(
            &FlowFabricWorker,
        ) -> Option<&Arc<dyn EngineBackend>> = FlowFabricWorker::backend;
        let _b: fn(
            &FlowFabricWorker,
        ) -> Option<Arc<dyn CompletionBackend>> =
            FlowFabricWorker::completion_backend;
        let _c: fn(&FlowFabricWorker) -> &crate::config::WorkerConfig =
            FlowFabricWorker::config;
        let _d: fn(&FlowFabricWorker) -> &ff_core::partition::PartitionConfig =
            FlowFabricWorker::partition_config;
    }

    /// v0.12 PR-1 compile anchor — the new
    /// [`EngineBackend::read_execution_context`] trait method MUST be
    /// addressable under `--no-default-features --features sqlite`. A
    /// direct fn-pointer cast is awkward under `#[async_trait]`
    /// (lifetime-generic fn items don't coerce to `fn` pointers), so
    /// we take the next-best compile proof: exercise a generic that
    /// names the method through the trait bound. A bodyless call would
    /// require a concrete backend; the generic keeps the test pure
    /// compile-time.
    #[test]
    fn read_execution_context_addressable_under_sqlite_only() {
        use ff_core::contracts::ExecutionContext;
        use ff_core::engine_error::EngineError;
        use ff_core::types::ExecutionId;

        // Never called; type-checking the body is the proof.
        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            id: &ExecutionId,
        ) -> Result<ExecutionContext, EngineError> {
            b.read_execution_context(id).await
        }
    }

    /// v0.12 PR-2 compile anchor — `ClaimedTask` as a type MUST be
    /// addressable under `--no-default-features --features sqlite`
    /// (the `task` module is no longer `valkey-default`-gated at the
    /// module level). As of v0.12 PR-5.5 the `impl ClaimedTask { ... }`
    /// block is likewise ungated: the backend mints the `Handle` at
    /// claim time and `cloned_handle` just clones the cached field, so
    /// no Valkey-specific synthesis is required. This anchor pins the
    /// struct path + its field types through a generic-over-T wrapper
    /// so a compile-time lookup of `ClaimedTask` is exercised
    /// mechanically under the sqlite-only feature set.
    #[test]
    fn claimed_task_type_addressable_under_sqlite_only() {
        use crate::task::ClaimedTask;

        // Never called; naming `ClaimedTask` in the return type forces
        // the compiler to resolve the type.
        #[allow(dead_code)]
        fn _pin<T>(_: std::marker::PhantomData<T>) -> std::marker::PhantomData<ClaimedTask> {
            std::marker::PhantomData
        }
    }

    /// v0.12 PR-6 compile anchor — ungating `admin` + `snapshot`
    /// modules at module level must not re-introduce ferriskey
    /// symbols under `--no-default-features --features sqlite`.
    /// Pins that [`FlowFabricAdminClient::new`] and a representative
    /// snapshot forwarder are addressable without the
    /// `valkey-default` feature.
    #[test]
    fn admin_and_snapshot_addressable_under_sqlite_only() {
        use crate::admin::FlowFabricAdminClient;
        use crate::SdkError;
        use ff_core::contracts::ExecutionSnapshot;
        use ff_core::types::ExecutionId;

        // `FlowFabricAdminClient::new` is `fn(impl Into<String>)` —
        // can't fn-pointer-coerce directly. A wrapper that calls it
        // with a concrete `&str` (and is itself never invoked) pins
        // the method addressably under the sqlite-only feature set
        // (the error return-type is the same `SdkError` the rest of
        // the module returns).
        #[allow(dead_code)]
        fn _pin_admin_new() -> Result<FlowFabricAdminClient, SdkError> {
            FlowFabricAdminClient::new("http://anchor")
        }

        // Snapshot method is `async`, so it cannot be fn-pointer
        // coerced either; pin it through a never-called wrapper
        // `async fn` that spells out the expected return type.
        #[allow(dead_code)]
        async fn _pin_describe(
            w: &FlowFabricWorker,
            id: &ExecutionId,
        ) -> Result<Option<ExecutionSnapshot>, SdkError> {
            w.describe_execution(id).await
        }
    }

    /// v0.13 SC-10 compile anchor — the backend-agnostic admin facade
    /// (`FlowFabricAdminClient::connect_with` + embedded-transport
    /// methods) MUST be addressable under
    /// `--no-default-features --features sqlite`. Without this anchor,
    /// a regression that re-introduces a ferriskey symbol on the
    /// embedded admin path would only surface on downstream
    /// sqlite-only consumers.
    #[test]
    fn admin_facade_addressable_under_sqlite_only() {
        use crate::admin::{
            ClaimForWorkerRequest, ClaimForWorkerResponse, FlowFabricAdminClient,
            IssueReclaimGrantRequest, IssueReclaimGrantResponse, RotateWaitpointSecretRequest,
            RotateWaitpointSecretResponse,
        };
        use crate::SdkError;
        use std::sync::Arc;

        // `connect_with` is infallible; fn-pointer coerce pins the
        // signature under the sqlite-only feature set.
        let _ctor: fn(Arc<dyn EngineBackend>) -> FlowFabricAdminClient =
            FlowFabricAdminClient::connect_with;

        // The async methods cannot be coerced to plain `fn` pointers,
        // so each one is pinned via a never-called wrapper fn that
        // names its argument and return types.
        #[allow(dead_code)]
        async fn _pin_claim(
            c: &FlowFabricAdminClient,
            req: ClaimForWorkerRequest,
        ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
            c.claim_for_worker(req).await
        }
        #[allow(dead_code)]
        async fn _pin_reclaim(
            c: &FlowFabricAdminClient,
            id: &str,
            req: IssueReclaimGrantRequest,
        ) -> Result<IssueReclaimGrantResponse, SdkError> {
            c.issue_reclaim_grant(id, req).await
        }
        #[allow(dead_code)]
        async fn _pin_rotate(
            c: &FlowFabricAdminClient,
            req: RotateWaitpointSecretRequest,
        ) -> Result<RotateWaitpointSecretResponse, SdkError> {
            c.rotate_waitpoint_secret(req).await
        }
    }

    /// v0.12 PR-3 compile anchor — the new
    /// [`EngineBackend::read_current_attempt_index`] trait method MUST
    /// be addressable under `--no-default-features --features sqlite`.
    /// Mirrors the PR-1 `read_execution_context` anchor: takes a
    /// generic over the trait bound so `#[async_trait]` lifetime
    /// elision doesn't block an `fn`-pointer coercion.
    #[test]
    fn read_current_attempt_index_addressable_under_sqlite_only() {
        use ff_core::engine_error::EngineError;
        use ff_core::types::{AttemptIndex, ExecutionId};

        // Never called; type-checking the body is the proof.
        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            b.read_current_attempt_index(id).await
        }
    }

    /// v0.12 PR-5.5 retry-path-fix compile anchor — the new
    /// [`EngineBackend::read_total_attempt_count`] trait method MUST
    /// be addressable under `--no-default-features --features sqlite`.
    /// Mirrors the PR-3 `read_current_attempt_index` anchor.
    #[test]
    fn read_total_attempt_count_addressable_under_sqlite_only() {
        use ff_core::engine_error::EngineError;
        use ff_core::types::{AttemptIndex, ExecutionId};

        // Never called; type-checking the body is the proof.
        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            b.read_total_attempt_count(id).await
        }
    }

    /// v0.12 PR-4 compile anchor — the new
    /// [`EngineBackend::claim_execution`] trait method MUST be
    /// addressable under `--no-default-features --features sqlite`.
    /// Mirrors the PR-3 `read_current_attempt_index` anchor: takes a
    /// generic over the trait bound so `#[async_trait]` lifetime
    /// elision doesn't block a plain fn-pointer coercion.
    ///
    /// The method has an `Err(Unavailable)` default impl; PG + SQLite
    /// backends don't override it today (grants are Valkey-only until
    /// the PG/SQLite grant-consumer RFC lands). The anchor pins the
    /// trait-surface signature — not a runtime call — so the compile
    /// check passes cleanly on every feature set.
    #[test]
    fn claim_execution_addressable_under_sqlite_only() {
        use ff_core::contracts::{ClaimExecutionArgs, ClaimExecutionResult};
        use ff_core::engine_error::EngineError;

        // Never called; type-checking the body is the proof.
        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            args: ClaimExecutionArgs,
        ) -> Result<ClaimExecutionResult, EngineError> {
            b.claim_execution(args).await
        }
    }

    /// v0.12 PR-5 compile anchor — the three new scanner-primitive
    /// trait methods (`scan_eligible_executions`, `issue_claim_grant`,
    /// `block_route`) MUST be addressable under
    /// `--no-default-features --features sqlite`. Each has an
    /// `Err(Unavailable)` default impl; PG + SQLite backends don't
    /// override (the scheduler-routed `claim_for_worker` path is the
    /// supported PG/SQLite entry point). The anchor pins the trait-
    /// surface signatures — not runtime calls — so the compile check
    /// passes cleanly on every feature set.
    ///
    /// This test covers `scan_eligible_executions`; the two tests that
    /// follow cover `issue_claim_grant` and `block_route`.
    #[test]
    fn scan_eligible_executions_addressable_under_sqlite_only() {
        use ff_core::contracts::ScanEligibleArgs;
        use ff_core::engine_error::EngineError;
        use ff_core::types::ExecutionId;

        // Never called; type-checking the body is the proof.
        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            args: ScanEligibleArgs,
        ) -> Result<Vec<ExecutionId>, EngineError> {
            b.scan_eligible_executions(args).await
        }
    }

    /// v0.12 PR-5 compile anchor for
    /// [`EngineBackend::issue_claim_grant`]; same rationale and shape
    /// as `scan_eligible_executions_addressable_under_sqlite_only`.
    #[test]
    fn issue_claim_grant_addressable_under_sqlite_only() {
        use ff_core::contracts::{IssueClaimGrantArgs, IssueClaimGrantOutcome};
        use ff_core::engine_error::EngineError;

        // Never called; type-checking the body is the proof.
        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            args: IssueClaimGrantArgs,
        ) -> Result<IssueClaimGrantOutcome, EngineError> {
            b.issue_claim_grant(args).await
        }
    }

    /// v0.12 PR-5 compile anchor for [`EngineBackend::block_route`];
    /// same rationale and shape as
    /// `scan_eligible_executions_addressable_under_sqlite_only`.
    #[test]
    fn block_route_addressable_under_sqlite_only() {
        use ff_core::contracts::{BlockRouteArgs, BlockRouteOutcome};
        use ff_core::engine_error::EngineError;

        // Never called; type-checking the body is the proof.
        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            args: BlockRouteArgs,
        ) -> Result<BlockRouteOutcome, EngineError> {
            b.block_route(args).await
        }
    }

    /// v0.12 PR-3 compile anchor — `FlowFabricWorker::deliver_signal`
    /// MUST be addressable under `--no-default-features --features sqlite`
    /// (ungated in PR-3 — the body is pure
    /// `self.backend.deliver_signal(...)` trait dispatch).
    #[test]
    fn deliver_signal_addressable_under_sqlite_only() {
        use crate::task::{Signal, SignalOutcome};
        use crate::SdkError;
        use ff_core::types::{ExecutionId, WaitpointId};
        use std::future::Future;
        use std::pin::Pin;

        // `deliver_signal` is `async fn`, so its item signature bakes
        // in a hidden lifetime and opaque return future. Pin it
        // through an explicit, never-called wrapper that boxes the
        // future and spells out its output type; the `+ Send` bound
        // additionally requires the returned future to be `Send`.
        #[allow(dead_code)]
        fn _pin<'a>(
            w: &'a FlowFabricWorker,
            id: &'a ExecutionId,
            wp: &'a WaitpointId,
            s: Signal,
        ) -> Pin<Box<dyn Future<Output = Result<SignalOutcome, SdkError>> + Send + 'a>> {
            Box::pin(w.deliver_signal(id, wp, s))
        }
    }

    /// v0.12 PR-5.5 compile anchor — `claim_from_grant` MUST be
    /// addressable under `--no-default-features --features sqlite`
    /// (ungated in PR-5.5). PG / SQLite return `EngineError::Unavailable`
    /// from the underlying trait method today, so the call path is
    /// compile-reachable but runtime-unavailable. The anchor pins the
    /// signature; a future PR wiring real PG/SQLite grant-consumer
    /// bodies flips the runtime behaviour without touching this test.
    #[test]
    fn claim_from_grant_addressable_under_sqlite_only() {
        use crate::task::ClaimedTask;
        use crate::SdkError;
        use ff_core::contracts::ClaimGrant;
        use ff_core::types::LaneId;
        use std::future::Future;
        use std::pin::Pin;

        // Never called; the boxed return type pins the future's output
        // and requires it to be `Send`.
        #[allow(dead_code)]
        fn _pin<'a>(
            w: &'a FlowFabricWorker,
            lane: LaneId,
            grant: ClaimGrant,
        ) -> Pin<Box<dyn Future<Output = Result<ClaimedTask, SdkError>> + Send + 'a>> {
            Box::pin(w.claim_from_grant(lane, grant))
        }
    }

    /// v0.12 PR-5.5 compile anchor — `claim_via_server` MUST be
    /// addressable under `--no-default-features --features sqlite`.
    /// The scheduler-routed path is the supported PG/SQLite claim
    /// entry point per `project_claim_from_grant_pg_sqlite_gap.md`;
    /// pinning the signature here prevents a future PR from
    /// accidentally re-gating it behind `valkey-default`.
    #[test]
    fn claim_via_server_addressable_under_sqlite_only() {
        use crate::admin::FlowFabricAdminClient;
        use crate::task::ClaimedTask;
        use crate::SdkError;
        use ff_core::types::LaneId;
        use std::future::Future;
        use std::pin::Pin;

        // Never called; the boxed return type pins the future's output
        // and requires it to be `Send`.
        #[allow(dead_code)]
        fn _pin<'a>(
            w: &'a FlowFabricWorker,
            admin: &'a FlowFabricAdminClient,
            lane: &'a LaneId,
            grant_ttl_ms: u64,
        ) -> Pin<Box<dyn Future<Output = Result<Option<ClaimedTask>, SdkError>> + Send + 'a>>
        {
            Box::pin(w.claim_via_server(admin, lane, grant_ttl_ms))
        }
    }

    /// v0.12 PR-5.5 compile anchor — `ClaimedTask::{complete, fail,
    /// cancel}` MUST be addressable under `--no-default-features
    /// --features sqlite`. The `impl ClaimedTask` block is now
    /// module-level ungated (PR-5.5); the terminal ops route through
    /// `EngineBackend::{complete, fail, cancel}` which are core trait
    /// methods (no `streaming` / `suspension` / `budget` gate).
    #[test]
    fn claimed_task_terminal_ops_addressable_under_sqlite_only() {
        use crate::task::{ClaimedTask, FailOutcome};
        use crate::SdkError;
        use std::future::Future;
        use std::pin::Pin;

        // Each wrapper takes the task by value, matching how the
        // terminal ops are invoked below; none of them is ever called.
        #[allow(dead_code)]
        fn _pin_complete(
            t: ClaimedTask,
            payload: Option<Vec<u8>>,
        ) -> Pin<Box<dyn Future<Output = Result<(), SdkError>> + Send>> {
            Box::pin(t.complete(payload))
        }

        #[allow(dead_code)]
        fn _pin_fail<'a>(
            t: ClaimedTask,
            reason: &'a str,
            error_category: &'a str,
        ) -> Pin<Box<dyn Future<Output = Result<FailOutcome, SdkError>> + Send + 'a>> {
            Box::pin(t.fail(reason, error_category))
        }

        #[allow(dead_code)]
        fn _pin_cancel<'a>(
            t: ClaimedTask,
            reason: &'a str,
        ) -> Pin<Box<dyn Future<Output = Result<(), SdkError>> + Send + 'a>> {
            Box::pin(t.cancel(reason))
        }
    }
}
1994
#[cfg(all(test, feature = "direct-valkey-claim"))]
mod scan_cursor_tests {
    //! Unit tests for `scan_cursor_seed`: determinism, spread across
    //! worker ids, range bound, and the zero-partition edge case.
    use super::scan_cursor_seed;

    /// Partition count used by every non-degenerate case.
    const PARTITIONS: usize = 256;

    #[test]
    fn stable_for_same_input() {
        let first = scan_cursor_seed("w1", PARTITIONS);
        let second = scan_cursor_seed("w1", PARTITIONS);
        assert_eq!(first, second);
    }

    #[test]
    fn distinct_for_different_ids() {
        let seed_w1 = scan_cursor_seed("w1", PARTITIONS);
        let seed_w2 = scan_cursor_seed("w2", PARTITIONS);
        assert_ne!(seed_w1, seed_w2);
    }

    #[test]
    fn bounded_by_partition_count() {
        for worker in (0..100).map(|i| format!("w{i}")) {
            assert!(scan_cursor_seed(&worker, PARTITIONS) < PARTITIONS);
        }
    }

    #[test]
    fn zero_partitions_returns_zero() {
        // Degenerate config: no partitions means the seed is pinned to 0.
        assert_eq!(scan_cursor_seed("w1", 0), 0);
    }
}