ff_sdk/worker.rs
1use std::collections::HashMap;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::Arc;
4
5// RFC-023 Phase 1a (§4.4 item 10b): ferriskey imports scoped behind
6// `valkey-default` so the module compiles under
7// `--no-default-features, features = ["sqlite"]`.
8#[cfg(feature = "valkey-default")]
9use ferriskey::Client;
10use ff_core::partition::PartitionConfig;
11use ff_core::types::*;
12use tokio::sync::Semaphore;
13
14use crate::config::WorkerConfig;
15use crate::task::ClaimedTask;
16use crate::SdkError;
17
18/// FlowFabric worker — connects to Valkey, claims executions, and provides
19/// the worker-facing API.
20///
21/// # Admission control
22///
23/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
24/// **bypasses the scheduler's admission controls**: it reads the eligible
25/// ZSET directly and mints its own claim grant without consulting budget
26/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
27/// benchmarks, tests, and single-tenant development where the scheduler
28/// hop is measurement noise, not for production.
29///
30/// For production deployments, consume scheduler-issued grants via
31/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
32/// budget breach, quota sliding-window, concurrency cap, and
33/// capability-match checks before issuing grants.
34///
35/// # Usage
36///
37/// ```rust,ignore
38/// use ff_core::backend::BackendConfig;
39/// use ff_core::types::{LaneId, Namespace, WorkerId, WorkerInstanceId};
40/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
41///
42/// let config = WorkerConfig {
43/// backend: Some(BackendConfig::valkey("localhost", 6379)),
44/// worker_id: WorkerId::new("w1"),
45/// worker_instance_id: WorkerInstanceId::new("w1-i1"),
46/// namespace: Namespace::new("default"),
47/// lanes: vec![LaneId::new("main")],
48/// capabilities: Vec::new(),
49/// lease_ttl_ms: 30_000,
50/// claim_poll_interval_ms: 1_000,
51/// max_concurrent_tasks: 1,
52/// partition_config: None,
53/// };
54/// let worker = FlowFabricWorker::connect(config).await?;
55///
56/// loop {
57/// if let Some(task) = worker.claim_next().await? {
58/// // Process task...
59/// task.complete(Some(b"result".to_vec())).await?;
60/// } else {
61/// tokio::time::sleep(Duration::from_secs(1)).await;
62/// }
63/// }
64/// ```
65pub struct FlowFabricWorker {
66 /// RFC-023 Phase 1a (§4.4 item 10c): `ferriskey::Client` is
67 /// `valkey-default`-gated **and** `Option` wrapped. Under
68 /// sqlite-only features the field is absent. Under
69 /// `valkey-default` the field is `Some(client)` when the worker
70 /// was built via [`Self::connect`] (the Valkey-bundled entry
71 /// point that dials), and `None` when it was built via
72 /// [`Self::connect_with`] (the backend-agnostic entry point per
73 /// §4.4 item 10e — no ferriskey round-trip). Claim/signal hot
74 /// paths (all `valkey-default`-gated per §4.4 item 10f) expect
75 /// `Some`; a claim call against a `connect_with`-built worker
76 /// panics with a clear message until the backend-agnostic SDK
77 /// worker-loop RFC lands (tracked in §8).
78 #[cfg(feature = "valkey-default")]
79 #[allow(dead_code)]
80 client: Option<Client>,
81 config: WorkerConfig,
82 partition_config: PartitionConfig,
83 /// 8-hex FNV-1a digest of the sorted capabilities CSV. Used in
84 /// per-mismatch logs so the 4KB CSV never echoes on every reject
85 /// during an incident. For [`Self::connect`]-built workers, the
86 /// full CSV is additionally logged once at connect-time WARN via
87 /// `valkey_preamble::run` for cross-reference; [`Self::connect_with`]-
88 /// built workers compute the hash without the companion CSV log.
89 /// Mirrors `ff-scheduler::claim::worker_caps_digest`.
90 worker_capabilities_hash: String,
91 lane_index: AtomicUsize,
92 /// Concurrency cap for in-flight tasks. Permits are acquired or
93 /// transferred by [`claim_next`] (feature-gated),
94 /// [`claim_from_grant`] (always available), and
95 /// [`claim_from_reclaim_grant`], transferred to the returned
96 /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
97 /// Holds `max_concurrent_tasks` permits total.
98 ///
99 /// [`claim_next`]: FlowFabricWorker::claim_next
100 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
101 /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
102 #[cfg_attr(not(feature = "valkey-default"), allow(dead_code))]
103 concurrency_semaphore: Arc<Semaphore>,
104 /// Rolling offset for chunked partition scans. Each poll advances the
105 /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
106 /// chunk)` polls every partition is covered. The initial value is
107 /// derived from `worker_instance_id` so idle workers spread their
108 /// scans across different partitions from the first poll onward.
109 ///
110 /// Overflow: on 64-bit targets `usize` is `u64` — overflow after
111 /// ~2^64 polls (billions of years at any realistic rate). On 32-bit
112 /// targets (wasm32, i686) `usize` is `u32` and wraps after ~4 years
113 /// at 1 poll/sec — acceptable; on wrap, the modulo preserves
114 /// correctness because the sequence simply restarts a new cycle.
115 scan_cursor: AtomicUsize,
116 /// The [`EngineBackend`] the Stage-1b trait forwarders route
117 /// through.
118 ///
119 /// **RFC-012 Stage 1b.** Always populated:
120 /// [`FlowFabricWorker::connect`] now wraps the worker's own
121 /// `ferriskey::Client` in a `ValkeyBackend` via
122 /// `ValkeyBackend::from_client_and_partitions`, and
123 /// [`FlowFabricWorker::connect_with`] replaces that default with
124 /// the caller-supplied `Arc<dyn EngineBackend>`. The
125 /// [`FlowFabricWorker::backend`] accessor still returns
126 /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
127 /// narrows the return type once consumers have migrated.
128 ///
129 /// Hot paths (claim, deliver_signal, admin queries) still use the
130 /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
131 /// migrates them through this field, and Stage 1d removes the
132 /// embedded client.
133 backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
134 /// Optional handle to the same underlying backend viewed as a
135 /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
136 /// Populated by [`Self::connect`] from the bundled
137 /// `ValkeyBackend` (which implements the trait); supplied by the
138 /// caller on [`Self::connect_with`] as an explicit
139 /// `Option<Arc<dyn CompletionBackend>>` — `None` means "this
140 /// backend does not support push-based completion" (e.g. a future
141 /// Postgres backend without LISTEN/NOTIFY, or a test mock). Cairn
142 /// and other completion-subscription consumers reach this through
143 /// [`Self::completion_backend`].
144 completion_backend_handle:
145 Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
146}
147
/// Upper bound on how many execution partitions a single
/// scheduler-bypass claim poll inspects.
///
/// Used by `claim_next_via_backend` and, behind `direct-valkey-claim`,
/// by its back-compat alias `claim_next`. Capping the window keeps idle
/// backend load proportional to this constant per poll, rather than to
/// `num_flow_partitions`.
const PARTITION_SCAN_CHUNK: usize = 1 << 5;
154
155impl FlowFabricWorker {
156 /// RFC-023 Phase 1a helper: borrow the embedded `ferriskey::Client`,
157 /// panicking with a clear message if the worker was built via
158 /// [`Self::connect_with`] (which does not dial Valkey). Every
159 /// Valkey-specific claim/signal method funnels through this accessor
160 /// so the panic site is uniform and traceable.
161 ///
162 /// A backend-agnostic claim/signal API is deferred per §8 breaking-
163 /// change disclosure; until that lands, valkey-default consumers
164 /// must use [`Self::connect`] if they intend to drive the Valkey
165 /// claim/signal loop.
166 #[cfg(feature = "valkey-default")]
167 #[inline]
168 #[allow(dead_code)]
169 fn valkey_client(&self) -> &Client {
170 self.client.as_ref().expect(
171 "FlowFabricWorker was built via connect_with (no Valkey dial) \
172 but a Valkey-specific claim/signal method was invoked. \
173 Use FlowFabricWorker::connect to dial Valkey, or drive the \
174 backend directly through the trait surface via .backend().",
175 )
176 }
177}
178
179impl FlowFabricWorker {
180 /// Connect to Valkey and prepare the worker.
181 ///
182 /// Establishes the ferriskey connection. Does NOT load the FlowFabric
183 /// library — that is the server's responsibility (ff-server calls
184 /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
185 /// the library is already loaded.
186 ///
187 /// # Smoke / dev scripts: rotate `WorkerInstanceId`
188 ///
189 /// The SDK writes a SET-NX liveness sentinel keyed on the worker's
190 /// `WorkerInstanceId`. When a smoke / dev script reuses the same
191 /// `WorkerInstanceId` across restarts, subsequent runs trap behind
192 /// the prior run's SET-NX until the liveness key's TTL (≈ 2× the
193 /// configured lease TTL) expires — the worker appears stuck and
194 /// claims nothing. Iterative scripts should synthesise a fresh
195 /// `WorkerInstanceId` per process (e.g. `WorkerInstanceId::new()`
196 /// or embed a UUID/timestamp) rather than hard-coding a stable
197 /// value. Production workers that cleanly shut down release the
198 /// key; only crashed / kill -9'd processes hit this trap.
199 ///
200 /// **RFC-023 Phase 1a (§4.4 item 10d, v0.12.0):** this Valkey-bundled
201 /// entry point is `#[cfg(feature = "valkey-default")]`-gated.
202 /// Consumers on the sqlite-only feature set must use
203 /// [`FlowFabricWorker::connect_with`] and drive the backend directly
204 /// through the trait surface.
205 #[cfg(feature = "valkey-default")]
206 pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
207 if config.lanes.is_empty() {
208 return Err(SdkError::Config {
209 context: "worker_config".into(),
210 field: None,
211 message: "at least one lane is required".into(),
212 });
213 }
214
215 // Build the ferriskey client from the nested `BackendConfig`.
216 // Delegates to `ff_backend_valkey::build_client` so host/port +
217 // TLS + cluster + `BackendTimeouts::request` + `BackendRetry`
218 // wiring lives in exactly one place (RFC-012 Stage 1c tranche 1).
219 //
220 // v0.13 ergonomics fix (feedback_sdk_reclaim_ergonomics
221 // Finding 2): `WorkerConfig.backend` is `Option<BackendConfig>`
222 // because `connect_with` ignores it. The URL-based `connect`
223 // path still requires a concrete `BackendConfig` to dial;
224 // reject `None` up front with a typed Config error instead of
225 // silently using a default that would hide a caller mistake.
226 let backend_config = config.backend.as_ref().ok_or_else(|| SdkError::Config {
227 context: "worker_config".into(),
228 field: Some("backend".into()),
229 message: "FlowFabricWorker::connect requires WorkerConfig.backend = \
230 Some(BackendConfig::...); use FlowFabricWorker::connect_with for \
231 the backend-agnostic path that takes a pre-built \
232 Arc<dyn EngineBackend>".into(),
233 })?;
234 let client = ff_backend_valkey::build_client(backend_config).await?;
235
236 // v0.12 PR-6: the Valkey-specific preamble (PING, alive-key
237 // SET-NX, `ff:config:partitions` HGETALL, caps ingress +
238 // sorted-dedup CSV, caps HASH + workers-index writes) lives
239 // in `crate::valkey_preamble`. RFC-025 Phase 5 cutover: the
240 // caps write is now a namespaced HASH matching the
241 // `ff_register_worker` FCALL shape. The write order is
242 // observable from scheduler-side reads (unblock scanner:
243 // SMEMBERS ff:idx:{ns}:workers → HGET ff:worker:{ns}:{id}:caps
244 // caps_csv) so preservation is load-bearing. See
245 // `valkey_preamble::run`.
246 let crate::valkey_preamble::PreambleOutput {
247 partition_config,
248 capabilities_csv: _worker_capabilities_csv,
249 capabilities_hash: worker_capabilities_hash,
250 } = crate::valkey_preamble::run(&client, &config).await?;
251
252 let max_tasks = config.max_concurrent_tasks.max(1);
253 let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
254
255 tracing::info!(
256 worker_id = %config.worker_id,
257 instance_id = %config.worker_instance_id,
258 lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
259 "FlowFabricWorker connected"
260 );
261
262 let scan_cursor_init = scan_cursor_seed(
263 config.worker_instance_id.as_str(),
264 partition_config.num_flow_partitions.max(1) as usize,
265 );
266
267 // RFC-012 Stage 1b: wrap the dialed client in a ValkeyBackend
268 // so `ClaimedTask`'s trait forwarders have something to call.
269 // `from_client_and_partitions` reuses the already-dialed client
270 // — no second connection. Share the concrete
271 // `Arc<ValkeyBackend>` across the two trait objects — one
272 // allocation, both accessors yield identity-equivalent handles.
273 let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
274 ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
275 client.clone(),
276 partition_config,
277 );
278 let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
279 let completion_backend_handle: Option<
280 Arc<dyn ff_core::completion_backend::CompletionBackend>,
281 > = Some(valkey_backend);
282
283 Ok(Self {
284 client: Some(client),
285 config,
286 partition_config,
287 worker_capabilities_hash,
288 lane_index: AtomicUsize::new(0),
289 concurrency_semaphore,
290 scan_cursor: AtomicUsize::new(scan_cursor_init),
291 backend,
292 completion_backend_handle,
293 })
294 }
295
296 /// Store pre-built [`EngineBackend`] and (optional)
297 /// [`CompletionBackend`] handles on the worker. Builds the worker
298 /// via the legacy [`FlowFabricWorker::connect`] path first (so the
299 /// embedded `ferriskey::Client` that the Stage 1b non-migrated hot
300 /// paths still use is dialed), then replaces the default
301 /// `ValkeyBackend` wrapper with the caller-supplied trait objects.
302 ///
303 /// The `completion` argument is explicit: 0.3.3 previously accepted
304 /// only `backend` and `completion_backend()` silently returned
305 /// `None` on this path because `Arc<dyn EngineBackend>` cannot be
306 /// upcast to `Arc<dyn CompletionBackend>` without loss of
307 /// trait-object identity. 0.3.4 lets the caller decide.
308 ///
309 /// - `Some(arc)` — caller supplies a completion backend.
310 /// [`Self::completion_backend`] returns `Some(clone)`.
311 /// - `None` — this backend does not support push-based completion
312 /// (future Postgres backend without LISTEN/NOTIFY, test mocks).
313 /// [`Self::completion_backend`] returns `None`.
314 ///
315 /// When the underlying backend implements both traits (as
316 /// `ValkeyBackend` does), pass the same `Arc` twice — the two
317 /// trait-object views share one allocation:
318 ///
319 /// ```rust,ignore
320 /// use std::sync::Arc;
321 /// use ff_backend_valkey::ValkeyBackend;
322 /// use ff_sdk::{FlowFabricWorker, WorkerConfig};
323 ///
324 /// # async fn doc(worker_config: WorkerConfig,
325 /// # backend_config: ff_backend_valkey::BackendConfig)
326 /// # -> Result<(), ff_sdk::SdkError> {
327 /// // Valkey (completion supported):
328 /// let valkey = Arc::new(ValkeyBackend::connect(backend_config).await?);
329 /// let worker = FlowFabricWorker::connect_with(
330 /// worker_config,
331 /// valkey.clone(),
332 /// Some(valkey),
333 /// ).await?;
334 /// # Ok(()) }
335 /// ```
336 ///
337 /// Backend without completion support:
338 ///
339 /// ```rust,ignore
340 /// let worker = FlowFabricWorker::connect_with(
341 /// worker_config,
342 /// backend,
343 /// None,
344 /// ).await?;
345 /// ```
346 ///
347 /// **Stage 1b + Round-7 scope — what the injected backend covers
348 /// today.** The injected backend currently covers these per-task
349 /// `ClaimedTask` ops: `update_progress` / `resume_signals` /
350 /// `delay_execution` / `move_to_waiting_children` / `complete` /
351 /// `cancel` / `fail` / `create_pending_waitpoint` /
352 /// `append_frame` / `report_usage`. A mock backend therefore sees
353 /// that portion of the worker's per-task write surface. Lease
354 /// renewal also routes through `backend.renew(&handle)`. Round-7
355 /// (#135/#145) closed the four trait-shape gaps tracked by #117,
356 /// but `suspend` still reaches the embedded `ferriskey::Client`
357 /// directly via `ff_suspend_execution` — this is the deferred
358 /// suspend per RFC-012 §R7.6.1, pending Stage 1d input-shape
359 /// work. `claim_next` / `claim_from_grant` /
360 /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
361 /// are Stage 1c hot-path work. Stage 1d removes the embedded
362 /// client entirely.
363 ///
364 /// Today's constructor is therefore NOT yet a drop-in way to swap
365 /// in a non-Valkey backend — it requires a reachable Valkey node
366 /// for `suspend` plus the remaining hot-path ops. Tests that
367 /// exercise only the migrated per-task ops can run fully against
368 /// a mock backend.
369 ///
370 /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
371 /// [`CompletionBackend`]: ff_core::completion_backend::CompletionBackend
372 pub async fn connect_with(
373 config: WorkerConfig,
374 backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
375 completion: Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
376 ) -> Result<Self, SdkError> {
377 // RFC-023 Phase 1a (§4.4 item 10e, v0.12.0): direct
378 // backend-agnostic construction. No `Self::connect` preamble,
379 // no ferriskey round-trips (no PING, no alive-key SET-NX, no
380 // `ff:config:partitions` HGETALL). Callers needing a
381 // non-default `PartitionConfig` under non-Valkey backends set
382 // [`WorkerConfig::partition_config`] (v0.12 PR-6 closed the
383 // original follow-up flagged here).
384 //
385 // Lane-empty validation (the one check `connect` did before
386 // any Valkey work) is hoisted here so every entry point
387 // refuses an empty lane list.
388 if config.lanes.is_empty() {
389 return Err(SdkError::Config {
390 context: "worker_config".into(),
391 field: None,
392 message: "at least one lane is required".into(),
393 });
394 }
395
396 // v0.13 ergonomics fix (feedback_sdk_reclaim_ergonomics
397 // Finding 2): `WorkerConfig.backend` is ignored on this path —
398 // the caller-supplied `Arc<dyn EngineBackend>` is authoritative.
399 // If the caller left behind a `Some(..)` from an earlier
400 // `connect(..)` template, warn so the ambiguity is visible.
401 if config.backend.is_some() {
402 tracing::warn!(
403 worker_id = %config.worker_id,
404 instance_id = %config.worker_instance_id,
405 "WorkerConfig.backend is Some(..) but FlowFabricWorker::connect_with \
406 was invoked — BackendConfig is ignored, the injected \
407 Arc<dyn EngineBackend> is authoritative. Set WorkerConfig.backend = \
408 None on the connect_with path to silence this warning."
409 );
410 }
411
412 let max_tasks = config.max_concurrent_tasks.max(1);
413 let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
414 // v0.12 PR-6: honor the optional `WorkerConfig::partition_config`
415 // override. `None` keeps the pre-PR-6 default shape (256 / 32 /
416 // 32); `Some(cfg)` lets non-Valkey deployments with a custom
417 // `num_flow_partitions` bind correctly instead of silently
418 // missing data via the wrong partition index.
419 let partition_config = config.partition_config.unwrap_or_default();
420
421 let scan_cursor_init = scan_cursor_seed(
422 config.worker_instance_id.as_str(),
423 partition_config.num_flow_partitions.max(1) as usize,
424 );
425
426 // Capability validation + CSV/hash compute mirrors `connect`'s
427 // preamble (valkey_preamble::run) so both entry points refuse
428 // the same malformed tokens and populate the same
429 // `worker_capabilities_hash` used by claim_next_via_backend's
430 // per-mismatch log.
431 for cap in &config.capabilities {
432 if cap.is_empty() {
433 return Err(SdkError::Config {
434 context: "worker_config".into(),
435 field: Some("capabilities".into()),
436 message: "capability token must not be empty".into(),
437 });
438 }
439 if cap.contains(',') {
440 return Err(SdkError::Config {
441 context: "worker_config".into(),
442 field: Some("capabilities".into()),
443 message: format!(
444 "capability token may not contain ',' (CSV delimiter): {cap:?}"
445 ),
446 });
447 }
448 if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
449 return Err(SdkError::Config {
450 context: "worker_config".into(),
451 field: Some("capabilities".into()),
452 message: format!(
453 "capability token must not contain whitespace or control \
454 characters: {cap:?}"
455 ),
456 });
457 }
458 }
459 let worker_capabilities_hash = {
460 let set: std::collections::BTreeSet<&str> = config
461 .capabilities
462 .iter()
463 .map(|s| s.as_str())
464 .filter(|s| !s.is_empty())
465 .collect();
466 let csv: String = set.into_iter().collect::<Vec<_>>().join(",");
467 ff_core::hash::fnv1a_xor8hex(&csv)
468 };
469
470 Ok(Self {
471 #[cfg(feature = "valkey-default")]
472 client: None,
473 config,
474 partition_config,
475 worker_capabilities_hash,
476 lane_index: AtomicUsize::new(0),
477 concurrency_semaphore,
478 scan_cursor: AtomicUsize::new(scan_cursor_init),
479 backend,
480 completion_backend_handle: completion,
481 })
482 }
483
484 /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
485 /// ops through.
486 ///
487 /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
488 /// the `Option` wrapper is retained for API stability with the
489 /// Stage-1a shape. Stage 1c narrows the return type to
490 /// `&Arc<dyn EngineBackend>`.
491 pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
492 Some(&self.backend)
493 }
494
495 /// Crate-internal direct borrow of the backend. The public
496 /// [`Self::backend`] still returns `Option` for API stability
497 /// (Stage 1b holdover). Snapshot trait-forwarders in
498 /// [`crate::snapshot`] need an un-wrapped reference.
499 #[cfg_attr(not(feature = "valkey-default"), allow(dead_code))]
500 pub(crate) fn backend_ref(
501 &self,
502 ) -> &Arc<dyn ff_core::engine_backend::EngineBackend> {
503 &self.backend
504 }
505
506 /// Handle to the completion-event subscription backend, for
507 /// consumers that need to observe execution completions (DAG
508 /// reconcilers, tenant-isolated subscribers).
509 ///
510 /// Returns `Some` when the worker was built through
511 /// [`Self::connect`] on the default `valkey-default` feature
512 /// (the bundled `ValkeyBackend` implements
513 /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)),
514 /// or via [`Self::connect_with`] with a `Some(..)` completion
515 /// handle. Returns `None` when the caller passed `None` to
516 /// [`Self::connect_with`] — i.e. the backend does not support
517 /// push-based completion streams (future Postgres without
518 /// LISTEN/NOTIFY, test mocks).
519 ///
520 /// The returned handle shares the same underlying allocation as
521 /// [`Self::backend`]; calls through it (e.g.
522 /// `subscribe_completions_filtered`) hit the same connection
523 /// the worker itself uses.
524 pub fn completion_backend(
525 &self,
526 ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
527 self.completion_backend_handle.clone()
528 }
529
530 /// Get the worker config.
531 pub fn config(&self) -> &WorkerConfig {
532 &self.config
533 }
534
535 /// Get the server-published partition config this worker bound to at
536 /// `connect()`. Exposed so consumers that mint custom
537 /// [`ExecutionId`]s (e.g. for `describe_execution` lookups on ids
538 /// produced outside this worker) stay aligned with the server's
539 /// `num_flow_partitions` — using `PartitionConfig::default()`
540 /// assumes 256 partitions and silently misses data on deployments
541 /// with any other value.
542 pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
543 &self.partition_config
544 }
545
546 /// Back-compat alias for [`Self::claim_next_via_backend`]. Stays
547 /// behind the `direct-valkey-claim` feature to preserve the prior
548 /// opt-in ergonomics; new consumers should call
549 /// [`Self::claim_next_via_backend`] directly (no feature flag
550 /// required) — v0.14 un-gating of the scheduler-bypass scanner.
551 #[cfg(feature = "direct-valkey-claim")]
552 pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
553 self.claim_next_via_backend().await
554 }
555
556 /// Attempt to claim the next eligible execution through the
557 /// [`EngineBackend`] trait — works on any backend with
558 /// `scan_eligible_executions` + `issue_claim_grant` + `claim_execution`
559 /// trait bodies (Valkey today; Postgres/SQLite when their
560 /// grant-consumer RFC lands).
561 ///
562 /// Simplified claim flow:
563 /// 1. Pick a lane (round-robin across configured lanes)
564 /// 2. Scan a [`PARTITION_SCAN_CHUNK`] window of partitions for
565 /// eligible executions via `backend.scan_eligible_executions`
566 /// 3. Issue a claim grant via `backend.issue_claim_grant`
567 /// 4. Claim the execution via `backend.claim_execution`
568 /// 5. Return a [`ClaimedTask`] with auto lease renewal
569 ///
570 /// # Scheduler bypass — read before production use
571 ///
572 /// This path **bypasses the scheduler's budget / quota admission
573 /// checks**. Enable only when the scheduler hop is measurement
574 /// noise (benches, single-tenant dev, examples demonstrating the
575 /// claim primitive) or when the test harness wants a deterministic
576 /// worker-local path. For production, consume scheduler-issued
577 /// grants via [`Self::claim_from_grant`] — the scheduler enforces
578 /// budget breach, quota sliding-window, concurrency cap, and
579 /// capability-match checks before issuing grants.
580 ///
581 /// # `None` semantics
582 ///
583 /// `Ok(None)` means **no work was found in the partition window this
584 /// poll covered**, not "the cluster is idle". Each call scans a chunk
585 /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
586 /// `scan_cursor`; the cursor advances by that chunk size on every
587 /// invocation, so a worker covers every partition exactly once every
588 /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
589 ///
590 /// Callers should treat `None` as "poll again soon" (typically after
591 /// `config.claim_poll_interval_ms`) rather than "sleep for a long
592 /// time". Backing off too aggressively on `None` can starve workers
593 /// when work lives on partitions outside the current window.
594 ///
595 /// Returns `Err` on backend errors or script failures.
596 ///
597 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
598 pub async fn claim_next_via_backend(&self) -> Result<Option<ClaimedTask>, SdkError> {
599 // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
600 // try_acquire returns immediately — if no permits available, the worker
601 // is at capacity and should not claim more work.
602 let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
603 Ok(p) => p,
604 Err(_) => return Ok(None), // At capacity — no claim attempted
605 };
606
607 let lane_id = self.next_lane();
608 let now = TimestampMs::now();
609
610 // Phase 1: We scan eligible executions directly by reading the eligible
611 // ZSET across execution partitions. In production the scheduler
612 // (ff-scheduler) would handle this. For Phase 1, the SDK does a
613 // simplified inline claim.
614 //
615 // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
616 // partitions starting at a rolling offset. This keeps idle Valkey
617 // load at O(chunk) per worker-second instead of O(num_partitions),
618 // and the worker-instance-seeded initial cursor spreads concurrent
619 // workers across different partition windows.
620 let num_partitions = self.partition_config.num_flow_partitions as usize;
621 if num_partitions == 0 {
622 return Ok(None);
623 }
624 let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
625 let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;
626
627 // Hoist the sorted/deduped capability set out of the loop — the
628 // per-partition iteration reused a fresh BTreeSet on every
629 // step pre-fix, adding O(n log n) alloc/sort to the scanner
630 // hot path on every tick. Computed once per `claim_next` call.
631 let worker_capabilities: std::collections::BTreeSet<String> = self
632 .config
633 .capabilities
634 .iter()
635 .cloned()
636 .collect();
637
638 for step in 0..chunk {
639 let partition_idx = ((start + step) % num_partitions) as u16;
640 let partition = ff_core::partition::Partition {
641 family: ff_core::partition::PartitionFamily::Execution,
642 index: partition_idx,
643 };
644
645 // v0.12 PR-5: trait-routed scanner primitive. Replaces the
646 // pre-PR-5 `ZRANGEBYSCORE` inline; the Valkey backend fires
647 // the identical command byte-for-byte (see
648 // `ff_backend_valkey::scan_eligible_executions_impl`).
649 let scan_args = ff_core::contracts::ScanEligibleArgs::new(
650 lane_id.clone(),
651 partition,
652 1,
653 );
654 let candidates = self.backend.scan_eligible_executions(scan_args).await?;
655 let execution_id = match candidates.into_iter().next() {
656 Some(id) => id,
657 None => continue, // No eligible executions on this partition
658 };
659
660 // Step 1: Issue claim grant (v0.12 PR-5: trait-routed).
661 let grant_args = ff_core::contracts::IssueClaimGrantArgs::new(
662 execution_id.clone(),
663 lane_id.clone(),
664 self.config.worker_id.clone(),
665 self.config.worker_instance_id.clone(),
666 partition,
667 worker_capabilities.clone(),
668 5_000, // grant_ttl_ms
669 now,
670 );
671 let grant_result = self.backend.issue_claim_grant(grant_args).await;
672
673 match grant_result {
674 Ok(_) => {}
675 Err(ref boxed)
676 if matches!(
677 boxed,
678 crate::EngineError::Validation {
679 kind: crate::ValidationKind::CapabilityMismatch,
680 ..
681 }
682 ) =>
683 {
684 let missing = match boxed {
685 crate::EngineError::Validation { detail, .. } => detail.clone(),
686 _ => unreachable!(),
687 };
688 // Block-on-mismatch (RFC-009 §7.5) — parity with
689 // ff-scheduler's Scheduler::claim_for_worker. Without
690 // this, the inline-direct-claim path would hot-loop
691 // on an unclaimable top-of-zset (every tick picks the
692 // same execution, wastes an FCALL, logs, releases,
693 // repeats). The scheduler-side unblock scanner
694 // promotes blocked_route executions back to eligible
695 // when a worker with matching caps registers.
696 tracing::info!(
697 execution_id = %execution_id,
698 worker_id = %self.config.worker_id,
699 worker_caps_hash = %self.worker_capabilities_hash,
700 missing = %missing,
701 "capability mismatch, blocking execution off eligible (SDK inline claim)"
702 );
703 // v0.12 PR-5: trait-routed block_route. Swallow
704 // typed outcomes + transport faults (best-effort
705 // semantic preserved from pre-PR-5 behaviour —
706 // see `BlockRouteOutcome::LuaRejected` rustdoc).
707 let block_args = ff_core::contracts::BlockRouteArgs::new(
708 execution_id.clone(),
709 lane_id.clone(),
710 partition,
711 "waiting_for_capable_worker".to_owned(),
712 "no connected worker satisfies required_capabilities".to_owned(),
713 now,
714 );
715 match self.backend.block_route(block_args).await {
716 Ok(ff_core::contracts::BlockRouteOutcome::Blocked { .. }) => {}
717 Ok(ff_core::contracts::BlockRouteOutcome::LuaRejected { message }) => {
718 tracing::warn!(
719 execution_id = %execution_id,
720 error = %message,
721 "SDK block_route: Lua rejected; eligible ZSET unchanged, next \
722 poll will re-evaluate"
723 );
724 }
725 Ok(_) => {}
726 Err(e) => {
727 tracing::warn!(
728 execution_id = %execution_id,
729 error = %e,
730 "SDK block_route: transport failure; eligible ZSET unchanged"
731 );
732 }
733 }
734 continue;
735 }
736 Err(ref e) if is_retryable_claim_error(e) => {
737 tracing::debug!(
738 execution_id = %execution_id,
739 error = %e,
740 "claim grant failed (retryable), trying next partition"
741 );
742 continue;
743 }
744 Err(e) => return Err(SdkError::from(e)),
745 }
746
747 // Step 2: Claim the execution
748 match self
749 .claim_execution(&execution_id, &lane_id, &partition, now)
750 .await
751 {
752 Ok(mut task) => {
753 // Transfer concurrency permit to the task. When the task is
754 // completed/failed/cancelled/dropped the permit returns to
755 // the semaphore, allowing another claim.
756 task.set_concurrency_permit(permit);
757 return Ok(Some(task));
758 }
759 Err(SdkError::Engine(ref boxed))
760 if matches!(
761 **boxed,
762 crate::EngineError::Contention(
763 crate::ContentionKind::UseClaimResumedExecution
764 )
765 ) =>
766 {
767 // Execution was resumed from suspension — attempt_interrupted.
768 // ff_claim_execution rejects this; use ff_claim_resumed_execution
769 // which reuses the existing attempt instead of creating a new one.
770 tracing::debug!(
771 execution_id = %execution_id,
772 "execution is resumed, using claim_resumed path"
773 );
774 match self
775 .claim_resumed_execution(&execution_id, &lane_id, &partition)
776 .await
777 {
778 Ok(mut task) => {
779 task.set_concurrency_permit(permit);
780 return Ok(Some(task));
781 }
782 Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
783 tracing::debug!(
784 execution_id = %execution_id,
785 error = %e2,
786 "claim_resumed failed (retryable), trying next partition"
787 );
788 continue;
789 }
790 Err(e2) => return Err(e2),
791 }
792 }
793 Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
794 tracing::debug!(
795 execution_id = %execution_id,
796 error = %e,
797 "claim execution failed (retryable), trying next partition"
798 );
799 continue;
800 }
801 Err(e) => return Err(e),
802 }
803 }
804
805 // No eligible work found on any partition
806 Ok(None)
807 }
808
809 /// Low-level claim of a granted execution. Routes through
810 /// [`EngineBackend::claim_execution`](ff_core::engine_backend::EngineBackend::claim_execution)
811 /// — the trait-level grant-consumer method landed in v0.12 PR-4 —
812 /// and returns a `ClaimedTask` with auto lease renewal.
813 ///
814 /// Pre-PR-4 the SDK fired the `ff_claim_execution` FCALL directly
815 /// against `valkey_client()` and hand-parsed the Lua response shape.
816 /// The body now collapses to `backend.claim_execution(args)` +
817 /// `backend.read_execution_context(...)` + `ClaimedTask::new(...)`;
818 /// the Valkey-specific FCALL plumbing lives behind the trait in
819 /// `ff_backend_valkey::claim_execution_impl`.
820 ///
821 /// As of v0.12 PR-5.5 this helper + `claim_from_grant` are no
822 /// longer `valkey-default`-gated: the backend mints the `Handle`
823 /// at claim time and `ClaimedTask::new` caches it, so no
824 /// Valkey-specific handle synthesis is required on this path. The
825 /// `EngineBackend::claim_execution` default impl returns
826 /// `Err(Unavailable)` on PG/SQLite today (grant-consumer surface
827 /// is Valkey-only until the PG/SQLite grant-consumer RFC lands);
828 /// the compile surface is fully agnostic.
829 ///
830 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
831 async fn claim_execution(
832 &self,
833 execution_id: &ExecutionId,
834 lane_id: &LaneId,
835 partition: &ff_core::partition::Partition,
836 now: TimestampMs,
837 ) -> Result<ClaimedTask, SdkError> {
838 // v0.12 PR-5.5 retry-path fix — pre-read the **total attempt
839 // counter**, not the current-attempt pointer. The fresh-claim
840 // path mints a new attempt row whose index is the counter's
841 // current value (the backend's Lua 5920 / PG `ff_claim_execution`
842 // / SQLite `claim_impl` all consult this same counter to compute
843 // `next_att_idx`). Pre-PR-5.5 this call went inline as `HGET
844 // {exec}:core total_attempt_count` on the Valkey client; the
845 // first PR-5.5 landing mistakenly routed it to
846 // `read_current_attempt_index`, which returns the *pointer* at
847 // the previously-leased attempt — on a retry-of-a-retry that
848 // still named the terminal-failed prior attempt and the new
849 // claim collided with it. `read_total_attempt_count` wraps
850 // the same byte-for-byte HGET on Valkey and provides the JSONB
851 // / json_extract equivalents on PG / SQLite. See
852 // `EngineBackend::read_total_attempt_count` rustdoc for the
853 // pointer-vs-counter distinction.
854 let att_idx = self
855 .backend
856 .read_total_attempt_count(execution_id)
857 .await
858 .map_err(|e| SdkError::Engine(Box::new(e)))?;
859
860 let args = ff_core::contracts::ClaimExecutionArgs::new(
861 execution_id.clone(),
862 self.config.worker_id.clone(),
863 self.config.worker_instance_id.clone(),
864 lane_id.clone(),
865 LeaseId::new(),
866 self.config.lease_ttl_ms,
867 AttemptId::new(),
868 att_idx,
869 "{}".to_owned(),
870 None,
871 None,
872 now,
873 );
874
875 // `ClaimExecutionResult` is `#[non_exhaustive]` (v0.12 PR-4)
876 // so let-binding requires an explicit wildcard arm even though
877 // `Claimed` is the only variant today. A future additive variant
878 // surfaces here as a typed `ScriptError::Parse` — loud, routed
879 // through the existing SDK error path, and never silently
880 // dropped.
881 let claimed = match self.backend.claim_execution(args).await? {
882 ff_core::contracts::ClaimExecutionResult::Claimed(c) => c,
883 other => {
884 return Err(SdkError::from(ff_script::error::ScriptError::Parse {
885 fcall: "ff_claim_execution".into(),
886 execution_id: Some(execution_id.to_string()),
887 message: format!(
888 "unexpected ClaimExecutionResult variant: {other:?}"
889 ),
890 }));
891 }
892 };
893
894 // Read execution payload and metadata
895 let (input_payload, execution_kind, tags) = self
896 .read_execution_context(execution_id, partition)
897 .await?;
898
899 Ok(ClaimedTask::new(
900 self.backend.clone(),
901 self.partition_config,
902 claimed.handle,
903 execution_id.clone(),
904 claimed.attempt_index,
905 claimed.attempt_id,
906 claimed.lease_id,
907 claimed.lease_epoch,
908 self.config.lease_ttl_ms,
909 lane_id.clone(),
910 self.config.worker_instance_id.clone(),
911 input_payload,
912 execution_kind,
913 tags,
914 ))
915 }
916
917 /// Consume a [`ClaimGrant`] and claim the granted execution on
918 /// this worker. The intended production entry point: pair with
919 /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
920 /// scheduler-issued grants into the SDK without enabling the
921 /// `direct-valkey-claim` feature (which bypasses budget/quota
922 /// admission control).
923 ///
924 /// The worker's concurrency semaphore is checked BEFORE the FCALL
925 /// so a saturated worker does not consume the grant: the grant
926 /// stays valid for its remaining TTL and the caller can either
927 /// release it back to the scheduler or retry after some other
928 /// in-flight task completes.
929 ///
930 /// On success the returned [`ClaimedTask`] holds a concurrency
931 /// permit that releases automatically on
932 /// `complete`/`fail`/`cancel`/drop — same contract as
933 /// `claim_next`.
934 ///
935 /// # Arguments
936 ///
937 /// * `lane` — the lane the grant was issued for. Must match what
938 /// was passed to `Scheduler::claim_for_worker`; the Lua FCALL
939 /// uses it to look up `lane_eligible`, `lane_active`, and the
940 /// `worker_leases` index slot.
941 /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
942 ///
943 /// # Errors
944 ///
945 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
946 /// permits all held. Retryable; the grant is untouched.
947 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
948 /// or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
949 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
950 /// (wrapped in [`SdkError::Engine`]).
951 /// * `ScriptError::CapabilityMismatch` — execution's required
952 /// capabilities not a subset of this worker's caps (wrapped in
953 /// [`SdkError::Engine`]). Surfaced post-grant if a race
954 /// between grant issuance and caps change allows it.
955 /// * `ScriptError::Parse` — `ff_claim_execution` returned an
956 /// unexpected shape (wrapped in [`SdkError::Engine`]).
957 /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
958 /// transport error during the FCALL or the
959 /// `read_execution_context` follow-up.
960 ///
961 /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
962 /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
963 ///
964 /// # Backend coverage (v0.12 PR-5.5)
965 ///
966 /// Method ungated across backends. The Valkey backend handles the
967 /// grant fully. Postgres + SQLite backends return
968 /// [`EngineError::Unavailable`](ff_core::engine_error::EngineError::Unavailable)
969 /// from [`EngineBackend::claim_execution`] today — grants on those
970 /// backends flow through the scheduler-routed [`claim_via_server`]
971 /// path (the PG/SQLite scheduler lives outside the `EngineBackend`
972 /// trait in this release). See
973 /// `project_claim_from_grant_pg_sqlite_gap.md` for motivation and
974 /// planned follow-up.
975 ///
976 /// [`claim_via_server`]: FlowFabricWorker::claim_via_server
977 /// [`EngineBackend::claim_execution`]: ff_core::engine_backend::EngineBackend::claim_execution
978 pub async fn claim_from_grant(
979 &self,
980 lane: LaneId,
981 grant: ff_core::contracts::ClaimGrant,
982 ) -> Result<ClaimedTask, SdkError> {
983 // Semaphore check FIRST. If the worker is saturated we must
984 // surface the condition to the caller without touching the
985 // grant — silently returning Ok(None) (as claim_next does)
986 // would drop a grant the scheduler has already committed work
987 // to issuing, wasting the slot until its TTL elapses.
988 let permit = self
989 .concurrency_semaphore
990 .clone()
991 .try_acquire_owned()
992 .map_err(|_| SdkError::WorkerAtCapacity)?;
993
994 let now = TimestampMs::now();
995 let partition = grant.partition().map_err(|e| SdkError::Config {
996 context: "claim_from_grant".to_owned(),
997 field: Some("partition_key".to_owned()),
998 message: e.to_string(),
999 })?;
1000 let mut task = match self
1001 .claim_execution(&grant.execution_id, &lane, &partition, now)
1002 .await
1003 {
1004 Ok(task) => task,
1005 Err(SdkError::Engine(ref boxed))
1006 if matches!(
1007 **boxed,
1008 crate::EngineError::Contention(
1009 crate::ContentionKind::UseClaimResumedExecution
1010 )
1011 ) =>
1012 {
1013 // Execution was resumed from suspension — attempt_interrupted.
1014 // ff_claim_execution rejects this; use ff_claim_resumed_execution
1015 // which reuses the existing attempt instead of creating a new one.
1016 // Mirrors the fallback inside `claim_next` so HTTP-routed callers
1017 // (`claim_via_server` → `claim_from_grant`) get the same transparent
1018 // re-claim behavior.
1019 tracing::debug!(
1020 execution_id = %grant.execution_id,
1021 "execution is resumed, using claim_resumed path"
1022 );
1023 self.claim_resumed_execution(&grant.execution_id, &lane, &partition)
1024 .await?
1025 }
1026 Err(e) => return Err(e),
1027 };
1028 task.set_concurrency_permit(permit);
1029 Ok(task)
1030 }
1031
1032 /// Scheduler-routed claim: POST the server's
1033 /// `/v1/workers/{id}/claim`, then chain to
1034 /// [`Self::claim_from_grant`].
1035 ///
1036 /// Batch C item 2 PR-B. This is the production entry point —
1037 /// budget + quota + capability admission run server-side inside
1038 /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
1039 /// enable the `direct-valkey-claim` feature.
1040 ///
1041 /// Returns `Ok(None)` when the server says no eligible execution
1042 /// (HTTP 204). Callers typically back off by
1043 /// `config.claim_poll_interval_ms` and try again, same cadence
1044 /// as the direct-claim path's `Ok(None)`.
1045 ///
1046 /// The `admin` client is the established HTTP surface
1047 /// (`FlowFabricAdminClient`) reused here so workers don't keep a
1048 /// second reqwest client around. Build once at worker boot and
1049 /// hand in by reference on every claim.
1050 pub async fn claim_via_server(
1051 &self,
1052 admin: &crate::FlowFabricAdminClient,
1053 lane: &LaneId,
1054 grant_ttl_ms: u64,
1055 ) -> Result<Option<ClaimedTask>, SdkError> {
1056 let req = crate::admin::ClaimForWorkerRequest {
1057 worker_id: self.config.worker_id.to_string(),
1058 lane_id: lane.to_string(),
1059 worker_instance_id: self.config.worker_instance_id.to_string(),
1060 capabilities: self.config.capabilities.clone(),
1061 grant_ttl_ms,
1062 };
1063 let Some(resp) = admin.claim_for_worker(req).await? else {
1064 return Ok(None);
1065 };
1066 let grant = resp.into_grant()?;
1067 self.claim_from_grant(lane.clone(), grant).await.map(Some)
1068 }
1069
1070 /// Consume a [`ResumeGrant`] and transition the granted
1071 /// `attempt_interrupted` execution into a `started` state on this
1072 /// worker. Symmetric partner to [`claim_from_grant`] for the
1073 /// resume path.
1074 ///
1075 /// **Renamed from `claim_from_reclaim_grant` (RFC-024 PR-B+C).**
1076 /// The new `claim_from_reclaim_grant` method lands with PR-G and
1077 /// dispatches to [`EngineBackend::reclaim_execution`] for the
1078 /// lease-reclaim path (distinct semantic, distinct grant type).
1079 ///
1080 /// The grant must have been issued to THIS worker (matching
1081 /// `worker_id` at grant time). A mismatch returns
1082 /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1083 /// atomically by `ff_claim_resumed_execution`; a second call with
1084 /// the same grant also returns `InvalidClaimGrant`.
1085 ///
1086 /// # Concurrency
1087 ///
1088 /// The worker's concurrency semaphore is checked BEFORE the FCALL
1089 /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1090 /// assume pre-existing capacity on this worker — a reclaim can
1091 /// land on a fresh worker instance that just came up after a
1092 /// crash/restart and is picking up a previously-interrupted
1093 /// execution. If the worker is saturated, the grant stays valid
1094 /// for its remaining TTL and the caller can release it or retry.
1095 ///
1096 /// On success the returned [`ClaimedTask`] holds a concurrency
1097 /// permit that releases automatically on
1098 /// `complete`/`fail`/`cancel`/drop.
1099 ///
1100 /// # Errors
1101 ///
1102 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1103 /// permits all held. Retryable; the grant is untouched (no
1104 /// FCALL was issued, so `ff_claim_resumed_execution` did not
1105 /// atomically consume the grant key).
1106 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1107 /// or `worker_id` mismatch.
1108 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1109 /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1110 /// `attempt_interrupted`.
1111 /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1112 /// not `runnable`.
1113 /// * `ScriptError::ExecutionNotFound` — core key missing.
1114 /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
1115 /// transport.
1116 ///
1117 /// [`ResumeGrant`]: ff_core::contracts::ResumeGrant
1118 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
1119 pub async fn claim_from_resume_grant(
1120 &self,
1121 grant: ff_core::contracts::ResumeGrant,
1122 ) -> Result<ClaimedTask, SdkError> {
1123 // Semaphore check FIRST — same load-bearing ordering as
1124 // `claim_from_grant`. If the worker is saturated, surface
1125 // WorkerAtCapacity without firing the FCALL; the FCALL is an
1126 // atomic consume on the grant key, so calling it past-
1127 // saturation would destroy the grant while leaving no
1128 // permit to attach to the returned `ClaimedTask`.
1129 let permit = self
1130 .concurrency_semaphore
1131 .clone()
1132 .try_acquire_owned()
1133 .map_err(|_| SdkError::WorkerAtCapacity)?;
1134
1135 // Grant carries partition + lane_id so no round-trip is needed
1136 // to resolve them before the FCALL.
1137 let partition = grant.partition().map_err(|e| SdkError::Config {
1138 context: "claim_from_resume_grant".to_owned(),
1139 field: Some("partition_key".to_owned()),
1140 message: e.to_string(),
1141 })?;
1142 let mut task = self
1143 .claim_resumed_execution(
1144 &grant.execution_id,
1145 &grant.lane_id,
1146 &partition,
1147 )
1148 .await?;
1149 task.set_concurrency_permit(permit);
1150 Ok(task)
1151 }
1152
1153 /// Consume a [`ReclaimGrant`] to mint a fresh attempt for a
1154 /// lease-expired / lease-revoked execution (RFC-024 §3.4).
1155 ///
1156 /// Backend-agnostic. Routes through
1157 /// [`EngineBackend::reclaim_execution`] on whatever backend the
1158 /// worker was connected with (Valkey via `connect` / `connect_with`,
1159 /// Postgres / SQLite via `connect_with`). Distinct from
1160 /// [`claim_from_resume_grant`]: reclaim creates a NEW attempt row
1161 /// and bumps `lease_reclaim_count` (`HandleKind::Reclaimed`), while
1162 /// resume re-uses the existing attempt under
1163 /// `ff_claim_resumed_execution` (`HandleKind::Resumed`).
1164 ///
1165 /// # Return shape
1166 ///
1167 /// Returns the raw [`ReclaimExecutionOutcome`] so consumers match
1168 /// on the four outcomes (`Claimed(Handle)`, `NotReclaimable`,
1169 /// `ReclaimCapExceeded`, `GrantNotFound`) and decide their
1170 /// dispatch. The `Claimed` variant carries a [`Handle`] whose
1171 /// `kind` is [`HandleKind::Reclaimed`]; downstream ops (complete,
1172 /// fail, renew, append_frame, …) take the handle directly via the
1173 /// `EngineBackend` trait.
1174 ///
1175 /// This contrasts with [`claim_from_resume_grant`], which wraps
1176 /// the handle in a [`ClaimedTask`] with a concurrency-permit and
1177 /// auto-lease-renewal loop. Those affordances are
1178 /// `valkey-default`-gated today (they depend on the bundled
1179 /// `ferriskey::Client` + `lease_ttl_ms` renewal timer). The
1180 /// reclaim surface is intentionally narrower so it compiles under
1181 /// `--no-default-features, features = ["sqlite"]` and consumers
1182 /// can drive the reclaim flow on any backend.
1183 ///
1184 /// # Feature compatibility
1185 ///
1186 /// No cfg-gate. Compiles + runs under every feature set ff-sdk
1187 /// supports (including sqlite-only). Verified by the compile-time
1188 /// type assertion
1189 /// `worker_claim_from_reclaim_grant_is_backend_agnostic_at_type_level`
1190 /// in `crates/ff-sdk/tests/rfc024_sdk.rs`, which pins the method's
1191 /// full signature under the default feature set and is paralleled
1192 /// by `sqlite_only_compile_surface_tests` in this file for the
1193 /// `--no-default-features, features = ["sqlite"]` compile anchor on
1194 /// the rest of the backend-agnostic surface.
1195 ///
1196 /// # worker_capabilities
1197 ///
1198 /// `ReclaimExecutionArgs::worker_capabilities` is NOT part of the
1199 /// Lua FCALL (the reclaim Lua validates grant consumption via
1200 /// `grant.worker_id == args.worker_id` only — see
1201 /// `crates/ff-script/src/flowfabric.lua:3088` and RFC-024 §4.4).
1202 /// Capability matching happens at grant-issuance time (see
1203 /// [`FlowFabricAdminClient::issue_reclaim_grant`]
1204 /// — in the `ff-sdk::admin` module, `valkey-default`-gated).
1205 ///
1206 /// # Errors
1207 ///
1208 /// * [`SdkError::Engine`] — the backend's `reclaim_execution`
1209 /// returned an [`EngineError`] (transport fault, validation
1210 /// failure, `Unavailable` from a backend that does not
1211 /// implement RFC-024 — currently only pre-RFC out-of-tree
1212 /// backends).
1213 ///
1214 /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1215 /// [`ReclaimExecutionOutcome`]: ff_core::contracts::ReclaimExecutionOutcome
1216 /// [`Handle`]: ff_core::backend::Handle
1217 /// [`HandleKind::Reclaimed`]: ff_core::backend::HandleKind::Reclaimed
1218 /// [`EngineBackend::reclaim_execution`]: ff_core::engine_backend::EngineBackend::reclaim_execution
1219 /// [`EngineError`]: ff_core::engine_error::EngineError
1220 /// [`claim_from_resume_grant`]: FlowFabricWorker::claim_from_resume_grant
1221 /// [`FlowFabricAdminClient::issue_reclaim_grant`]: crate::admin::FlowFabricAdminClient::issue_reclaim_grant
1222 pub async fn claim_from_reclaim_grant(
1223 &self,
1224 grant: ff_core::contracts::ReclaimGrant,
1225 args: ff_core::contracts::ReclaimExecutionArgs,
1226 ) -> Result<ff_core::contracts::ReclaimExecutionOutcome, SdkError> {
1227 // `ReclaimGrant` is accepted as a parameter so the call-site
1228 // shape matches RFC-024 §3.4 (consumer receives a grant from
1229 // `issue_reclaim_grant` and feeds it + args into
1230 // `claim_from_reclaim_grant`). The grant metadata is
1231 // already embedded in the backend's server-side store
1232 // (Valkey `claim_grant` hash, PG/SQLite `ff_claim_grant`
1233 // table) keyed by (execution_id, grant_key) — the trait
1234 // method looks it up from `args.execution_id`.
1235 //
1236 // The overlap between `ReclaimGrant` and
1237 // `ReclaimExecutionArgs` is (execution_id, lane_id);
1238 // mismatched grant/args is a consumer-side bug (grant-for-A
1239 // + args-for-B), and silently forwarding lets the SDK act
1240 // on the args execution. Reject the mismatch up front so
1241 // the misuse surfaces at the SDK boundary instead of
1242 // succeeding against the wrong execution. The grant's
1243 // `expires_at_ms` is validated against wall-clock now so
1244 // an already-expired grant is rejected without a backend
1245 // round-trip (the backend also enforces expiry, but the
1246 // SDK-side check gives a crisper error and preserves the
1247 // reclaim-budget / lease slot).
1248 // `TimestampMs` is an `i64` (Unix-epoch ms); cast to `u64` for
1249 // comparison against `ReclaimGrant::expires_at_ms`. Clamp
1250 // negatives to 0 so a pre-epoch system clock (vanishingly
1251 // unlikely, but representable) doesn't wrap to a future u64.
1252 let now_ms: u64 = ff_core::types::TimestampMs::now().0.max(0) as u64;
1253 validate_reclaim_grant_against_args(&grant, &args, now_ms)?;
1254 self.backend
1255 .reclaim_execution(args)
1256 .await
1257 .map_err(|e| SdkError::Engine(Box::new(e)))
1258 }
1259
1260 /// Low-level resume claim. Forwards through
1261 /// [`EngineBackend::claim_resumed_execution`](ff_core::engine_backend::EngineBackend::claim_resumed_execution)
1262 /// — the trait-level trigger surface landed in issue #150 — and
1263 /// returns a [`ClaimedTask`] bound to the resumed attempt.
1264 ///
1265 /// The method stays private; external callers use
1266 /// [`claim_from_resume_grant`].
1267 ///
1268 /// [`claim_from_resume_grant`]: FlowFabricWorker::claim_from_resume_grant
1269 async fn claim_resumed_execution(
1270 &self,
1271 execution_id: &ExecutionId,
1272 lane_id: &LaneId,
1273 partition: &ff_core::partition::Partition,
1274 ) -> Result<ClaimedTask, SdkError> {
1275 // v0.12 PR-3 — pre-read current_attempt_index via the trait
1276 // rather than an inline `HGET` on the Valkey client. Load-bearing:
1277 // the backend's KEYS[6] (Valkey) / `ff_attempt` PK tuple (PG/SQLite)
1278 // must target the real existing attempt hash/row, and the backend
1279 // takes the index verbatim from
1280 // `ClaimResumedExecutionArgs::current_attempt_index`.
1281 let att_idx = self
1282 .backend
1283 .read_current_attempt_index(execution_id)
1284 .await
1285 .map_err(|e| SdkError::Engine(Box::new(e)))?;
1286
1287 let args = ff_core::contracts::ClaimResumedExecutionArgs {
1288 execution_id: execution_id.clone(),
1289 worker_id: self.config.worker_id.clone(),
1290 worker_instance_id: self.config.worker_instance_id.clone(),
1291 lane_id: lane_id.clone(),
1292 lease_id: LeaseId::new(),
1293 lease_ttl_ms: self.config.lease_ttl_ms,
1294 current_attempt_index: att_idx,
1295 remaining_attempt_timeout_ms: None,
1296 now: TimestampMs::now(),
1297 };
1298
1299 let ff_core::contracts::ClaimResumedExecutionResult::Claimed(claimed) =
1300 self.backend.claim_resumed_execution(args).await?;
1301
1302 let (input_payload, execution_kind, tags) = self
1303 .read_execution_context(execution_id, partition)
1304 .await?;
1305
1306 Ok(ClaimedTask::new(
1307 self.backend.clone(),
1308 self.partition_config,
1309 claimed.handle,
1310 execution_id.clone(),
1311 claimed.attempt_index,
1312 claimed.attempt_id,
1313 claimed.lease_id,
1314 claimed.lease_epoch,
1315 self.config.lease_ttl_ms,
1316 lane_id.clone(),
1317 self.config.worker_instance_id.clone(),
1318 input_payload,
1319 execution_kind,
1320 tags,
1321 ))
1322 }
1323
1324 /// Read payload + execution_kind + tags from exec_core.
1325 ///
1326 /// As of v0.12 PR-1 this forwards through
1327 /// [`EngineBackend::read_execution_context`](ff_core::engine_backend::EngineBackend::read_execution_context)
1328 /// rather than issuing direct GET/HGET/HGETALL against Valkey. The
1329 /// outer `valkey-default` gate + `(&ExecutionId, &Partition)`
1330 /// signature are preserved; hot-path decoupling (ungating this
1331 /// helper + its call sites in `claim_execution` and
1332 /// `claim_resumed_execution`) is PR-4/PR-5 scope per the v0.12
1333 /// agnostic-SDK plan.
1334 async fn read_execution_context(
1335 &self,
1336 execution_id: &ExecutionId,
1337 _partition: &ff_core::partition::Partition,
1338 ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
1339 let ctx = self.backend.read_execution_context(execution_id).await?;
1340 Ok((ctx.input_payload, ctx.execution_kind, ctx.tags))
1341 }
1342
1343 // ── Phase 3: Signal delivery ──
1344
1345 /// Deliver a signal to a suspended execution's waitpoint.
1346 ///
1347 /// The engine atomically records the signal, evaluates the resume condition,
1348 /// and optionally transitions the execution from `suspended` to `runnable`.
1349 ///
1350 /// Forwards through
1351 /// [`EngineBackend::deliver_signal`](ff_core::engine_backend::EngineBackend::deliver_signal)
1352 /// — the trait-level trigger surface landed in issue #150.
1353 ///
1354 /// Backend-agnostic as of v0.12 PR-3. Compiles + runs under every
1355 /// feature set ff-sdk supports (including
1356 /// `--no-default-features --features sqlite`); pinned by
1357 /// `sqlite_only_compile_surface_tests::deliver_signal_addressable_under_sqlite_only`.
1358 pub async fn deliver_signal(
1359 &self,
1360 execution_id: &ExecutionId,
1361 waitpoint_id: &WaitpointId,
1362 signal: crate::task::Signal,
1363 ) -> Result<crate::task::SignalOutcome, SdkError> {
1364 let args = ff_core::contracts::DeliverSignalArgs {
1365 execution_id: execution_id.clone(),
1366 waitpoint_id: waitpoint_id.clone(),
1367 signal_id: ff_core::types::SignalId::new(),
1368 signal_name: signal.signal_name,
1369 signal_category: signal.signal_category,
1370 source_type: signal.source_type,
1371 source_identity: signal.source_identity,
1372 payload: signal.payload,
1373 payload_encoding: Some("json".to_owned()),
1374 correlation_id: None,
1375 idempotency_key: signal.idempotency_key,
1376 target_scope: "waitpoint".to_owned(),
1377 created_at: Some(TimestampMs::now()),
1378 dedup_ttl_ms: None,
1379 resume_delay_ms: None,
1380 max_signals_per_execution: None,
1381 signal_maxlen: None,
1382 waitpoint_token: signal.waitpoint_token,
1383 now: TimestampMs::now(),
1384 };
1385
1386 let result = self.backend.deliver_signal(args).await?;
1387 Ok(match result {
1388 ff_core::contracts::DeliverSignalResult::Accepted { signal_id, effect } => {
1389 if effect == "resume_condition_satisfied" {
1390 crate::task::SignalOutcome::TriggeredResume { signal_id }
1391 } else {
1392 crate::task::SignalOutcome::Accepted { signal_id, effect }
1393 }
1394 }
1395 ff_core::contracts::DeliverSignalResult::Duplicate { existing_signal_id } => {
1396 crate::task::SignalOutcome::Duplicate {
1397 existing_signal_id: existing_signal_id.to_string(),
1398 }
1399 }
1400 })
1401 }
1402
1403 fn next_lane(&self) -> LaneId {
1404 let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
1405 self.config.lanes[idx].clone()
1406 }
1407}
1408
1409fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
1410 use ff_core::error::ErrorClass;
1411 matches!(
1412 ff_script::engine_error_ext::class(err),
1413 ErrorClass::Retryable | ErrorClass::Informational
1414 )
1415}
1416
1417/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
1418/// instance id with FNV-1a to place distinct worker processes on different
1419/// partition windows from their first poll. Zero is valid for single-worker
1420/// clusters but spreads work in multi-worker deployments.
1421fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
1422 if num_partitions == 0 {
1423 return 0;
1424 }
1425 (ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize) % num_partitions
1426}
1427
1428/// Cross-check the [`ReclaimGrant`] handed back by
1429/// `issue_reclaim_grant` against the [`ReclaimExecutionArgs`] the
1430/// consumer is about to dispatch. Catches the grant-for-A + args-for-B
1431/// misuse at the SDK boundary before a backend round-trip (PR #407
1432/// review F1).
1433///
1434/// The overlap between the two types is `(execution_id, lane_id)`;
1435/// `partition_key` / `grant_key` live only on the grant and are
1436/// verified server-side. The grant's `expires_at_ms` is also
1437/// validated against `now_ms` so an expired grant fails fast without
1438/// burning a backend call (the backend enforces expiry too, but the
1439/// SDK-side check gives a crisper, typed error).
1440///
1441/// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1442/// [`ReclaimExecutionArgs`]: ff_core::contracts::ReclaimExecutionArgs
1443fn validate_reclaim_grant_against_args(
1444 grant: &ff_core::contracts::ReclaimGrant,
1445 args: &ff_core::contracts::ReclaimExecutionArgs,
1446 now_ms: u64,
1447) -> Result<(), SdkError> {
1448 if grant.execution_id != args.execution_id {
1449 return Err(SdkError::Config {
1450 context: "claim_from_reclaim_grant".to_owned(),
1451 field: Some("execution_id".to_owned()),
1452 message: format!(
1453 "grant.execution_id ({}) does not match args.execution_id ({})",
1454 grant.execution_id, args.execution_id
1455 ),
1456 });
1457 }
1458 if grant.lane_id != args.lane_id {
1459 return Err(SdkError::Config {
1460 context: "claim_from_reclaim_grant".to_owned(),
1461 field: Some("lane_id".to_owned()),
1462 message: format!(
1463 "grant.lane_id ({}) does not match args.lane_id ({})",
1464 grant.lane_id.as_str(),
1465 args.lane_id.as_str()
1466 ),
1467 });
1468 }
1469 if grant.expires_at_ms <= now_ms {
1470 return Err(SdkError::Config {
1471 context: "claim_from_reclaim_grant".to_owned(),
1472 field: Some("expires_at_ms".to_owned()),
1473 message: format!(
1474 "grant expired: expires_at_ms={} now_ms={}",
1475 grant.expires_at_ms, now_ms
1476 ),
1477 });
1478 }
1479 Ok(())
1480}
1481
#[cfg(test)]
mod reclaim_grant_validation_tests {
    //! Unit tests for `validate_reclaim_grant_against_args` — the
    //! SDK-side cross-check that catches grant/args mismatch (PR #407
    //! review F1). No Valkey / backend required: the helper is pure.
    //!
    //! Rejections are asserted by the `field` reported on
    //! `SdkError::Config` (`execution_id`, `lane_id`, `expires_at_ms`)
    //! rather than by message text, so wording tweaks don't break the
    //! suite.
    use super::validate_reclaim_grant_against_args;
    use crate::SdkError;
    use ff_core::contracts::{ReclaimExecutionArgs, ReclaimGrant};
    use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
    use ff_core::types::{
        AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseId, WorkerId, WorkerInstanceId,
    };

    // Two execution ids sharing the same `{fp:7}` hash tag; they differ
    // only in the trailing UUID digit.
    const EXEC_A: &str = "{fp:7}:00000000-0000-4000-8000-000000000001";
    const EXEC_B: &str = "{fp:7}:00000000-0000-4000-8000-000000000002";

    fn exec(s: &str) -> ExecutionId {
        ExecutionId::parse(s).expect("valid execution id")
    }

    /// Builds a reclaim grant for `execution_id` on `lane` that expires
    /// at `expires_at_ms`; partition key and grant key are fixed fixtures.
    fn grant_for(execution_id: ExecutionId, lane: &str, expires_at_ms: u64) -> ReclaimGrant {
        ReclaimGrant::new(
            execution_id,
            PartitionKey::from(&Partition { family: PartitionFamily::Flow, index: 7 }),
            "reclaim:grant:abc".to_owned(),
            expires_at_ms,
            LaneId::new(lane),
        )
    }

    /// Builds reclaim args for `execution_id` on `lane`; every other
    /// field is a fixed fixture value.
    fn args_for(execution_id: ExecutionId, lane: &str) -> ReclaimExecutionArgs {
        ReclaimExecutionArgs::new(
            execution_id,
            WorkerId::new("w1"),
            WorkerInstanceId::new("w1-i1"),
            LaneId::new(lane),
            None,
            LeaseId::new(),
            30_000,
            AttemptId::new(),
            "{}".to_owned(),
            None,
            WorkerInstanceId::new("w1-i0"),
            AttemptIndex::new(0),
        )
    }

    /// Asserts that `err` is an `SdkError::Config` naming `expected` as
    /// the offending field; panics with the actual error otherwise.
    fn assert_config_field(err: SdkError, expected: &str) {
        match err {
            SdkError::Config { field, .. } => assert_eq!(field.as_deref(), Some(expected)),
            other => panic!("expected Config error, got {other:?}"),
        }
    }

    #[test]
    fn accepts_matching_grant_and_args() {
        let g = grant_for(exec(EXEC_A), "main", 2_000);
        let a = args_for(exec(EXEC_A), "main");
        assert!(validate_reclaim_grant_against_args(&g, &a, 1_000).is_ok());
    }

    #[test]
    fn accepts_grant_one_ms_before_expiry() {
        // Accepting side of the expiry boundary pinned by
        // `rejects_expired_grant`: one millisecond before
        // `expires_at_ms` the grant is still valid.
        let g = grant_for(exec(EXEC_A), "main", 1_001);
        let a = args_for(exec(EXEC_A), "main");
        assert!(validate_reclaim_grant_against_args(&g, &a, 1_000).is_ok());
    }

    #[test]
    fn rejects_mismatched_execution_id() {
        let g = grant_for(exec(EXEC_A), "main", 2_000);
        let a = args_for(exec(EXEC_B), "main");
        let err = validate_reclaim_grant_against_args(&g, &a, 1_000)
            .expect_err("expected mismatched execution_id to fail");
        assert_config_field(err, "execution_id");
    }

    #[test]
    fn rejects_mismatched_lane_id() {
        let g = grant_for(exec(EXEC_A), "main", 2_000);
        let a = args_for(exec(EXEC_A), "other");
        let err = validate_reclaim_grant_against_args(&g, &a, 1_000)
            .expect_err("expected mismatched lane_id to fail");
        assert_config_field(err, "lane_id");
    }

    #[test]
    fn rejects_expired_grant() {
        // expires_at_ms == now_ms is rejected: the check is the inclusive
        // `expires_at_ms <= now_ms`, so a grant on its final millisecond
        // is treated as already expired and the server's TTL enforcement
        // window can't race us.
        let g = grant_for(exec(EXEC_A), "main", 1_000);
        let a = args_for(exec(EXEC_A), "main");
        let err = validate_reclaim_grant_against_args(&g, &a, 1_000)
            .expect_err("expected expired grant to fail");
        assert_config_field(err, "expires_at_ms");

        // Also rejected when already past expiry.
        let err2 = validate_reclaim_grant_against_args(&g, &a, 5_000)
            .expect_err("expected past-expiry grant to fail");
        assert_config_field(err2, "expires_at_ms");
    }
}
1582
#[cfg(test)]
mod completion_accessor_type_tests {
    //! Compile-time signature check for
    //! [`FlowFabricWorker::completion_backend`]: the accessor must return
    //! `Option<Arc<dyn CompletionBackend>>`. Nothing is executed — the
    //! test only coerces the method to a named fn-pointer type, so no
    //! Valkey connection or live worker is needed.
    use super::FlowFabricWorker;
    use ff_core::completion_backend::CompletionBackend;
    use std::sync::Arc;

    /// The advertised shape of the public accessor.
    type CompletionAccessor = fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>>;

    #[test]
    fn completion_backend_accessor_signature() {
        // Coercion fails to compile if the accessor's signature drifts;
        // the pointer is never invoked.
        let _accessor: CompletionAccessor = FlowFabricWorker::completion_backend;
    }
}
1603
/// RFC-023 Phase 1a §4.4 item 10g compile-only anchor. Parallels
/// `completion_accessor_type_tests` but fires under the sqlite-only
/// feature set: pins that the backend-agnostic `FlowFabricWorker`
/// surface (accessors, `EngineBackend` trait methods, claim / signal
/// entry points, `ClaimedTask` terminal ops) and the admin facade are
/// addressable with the advertised types under `--no-default-features,
/// features = ["sqlite"]`, so future PRs that accidentally reach
/// outside the `valkey-default` gate fail the build.
///
/// Every test here is compile-time only: bodies coerce items to `fn`
/// pointers or define never-called wrapper fns, so no backend is
/// constructed and no I/O happens.
///
/// TODO(review): `FlowFabricWorker::connect_with` itself is not pinned
/// by any anchor below; add one if it must stay ungated.
///
/// NOTE(review): this module is `#[cfg(test)]`. A plain
/// `cargo check -p ff-sdk --no-default-features --features sqlite`
/// checks only the library target with `cfg(test)` off, so it skips
/// these anchors. The `§9` CI cell needs `--tests` / `--all-targets`
/// (or a `cargo test` run) to actually exercise them — confirm against
/// the CI config.
#[cfg(all(test, not(feature = "valkey-default")))]
mod sqlite_only_compile_surface_tests {
    use super::FlowFabricWorker;
    use ff_core::completion_backend::CompletionBackend;
    use ff_core::engine_backend::EngineBackend;
    use std::sync::Arc;

    #[test]
    fn addressable_surface_under_sqlite_only() {
        // Type-level proof that the backend-agnostic accessors are
        // reachable without the `valkey-default` feature. None of
        // these are called; the assignment targets pin the signature.
        let _a: fn(
            &FlowFabricWorker,
        ) -> Option<&Arc<dyn EngineBackend>> = FlowFabricWorker::backend;
        let _b: fn(
            &FlowFabricWorker,
        ) -> Option<Arc<dyn CompletionBackend>> =
            FlowFabricWorker::completion_backend;
        let _c: fn(&FlowFabricWorker) -> &crate::config::WorkerConfig =
            FlowFabricWorker::config;
        let _d: fn(&FlowFabricWorker) -> &ff_core::partition::PartitionConfig =
            FlowFabricWorker::partition_config;
    }

    /// v0.12 PR-1 compile anchor — the new
    /// [`EngineBackend::read_execution_context`] trait method MUST be
    /// addressable under `--no-default-features --features sqlite`. A
    /// direct fn-pointer cast is awkward under `#[async_trait]`
    /// (lifetime-generic fn items don't coerce to `fn` pointers), so
    /// we take the next-best compile proof: exercise a generic that
    /// names the method through the trait bound. A bodyless call would
    /// require a concrete backend; the generic keeps the test pure
    /// compile-time.
    #[test]
    fn read_execution_context_addressable_under_sqlite_only() {
        use ff_core::contracts::ExecutionContext;
        use ff_core::engine_error::EngineError;
        use ff_core::types::ExecutionId;

        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            id: &ExecutionId,
        ) -> Result<ExecutionContext, EngineError> {
            b.read_execution_context(id).await
        }
    }

    /// v0.12 PR-2 compile anchor — `ClaimedTask` as a type MUST be
    /// addressable under `--no-default-features --features sqlite`
    /// (the `task` module is no longer `valkey-default`-gated at the
    /// module level). As of v0.12 PR-5.5 the `impl ClaimedTask { ... }`
    /// block is likewise ungated: the backend mints the `Handle` at
    /// claim time and `cloned_handle` just clones the cached field, so
    /// no Valkey-specific synthesis is required. This anchor pins the
    /// struct path through a generic-over-T wrapper that names
    /// `ClaimedTask` in its return type, so the type lookup is
    /// exercised mechanically under the sqlite-only feature set.
    #[test]
    fn claimed_task_type_addressable_under_sqlite_only() {
        use crate::task::ClaimedTask;

        #[allow(dead_code)]
        fn _pin<T>(_: std::marker::PhantomData<T>) -> std::marker::PhantomData<ClaimedTask> {
            std::marker::PhantomData
        }
    }

    /// v0.12 PR-6 compile anchor — ungating `admin` + `snapshot`
    /// modules at module level must not re-introduce ferriskey
    /// symbols under `--no-default-features --features sqlite`.
    /// Pins that [`FlowFabricAdminClient::new`] and a representative
    /// snapshot forwarder are addressable without the
    /// `valkey-default` feature.
    #[test]
    fn admin_and_snapshot_addressable_under_sqlite_only() {
        use crate::admin::FlowFabricAdminClient;
        use crate::SdkError;
        use ff_core::contracts::ExecutionSnapshot;
        use ff_core::types::ExecutionId;

        // `FlowFabricAdminClient::new` is `fn(impl Into<String>)` —
        // can't fn-pointer-coerce directly. A never-invoked wrapper
        // calling it with a concrete `&str` pins the method under the
        // sqlite-only feature set (the error return-type is the same
        // `SdkError` the rest of the module returns).
        #[allow(dead_code)]
        fn _pin_admin_new() -> Result<FlowFabricAdminClient, SdkError> {
            FlowFabricAdminClient::new("http://anchor")
        }

        // The snapshot method is `async`, so it is pinned through a
        // concrete async wrapper fn (naming its output type) rather
        // than an fn-pointer coercion.
        #[allow(dead_code)]
        async fn _pin_describe(
            w: &FlowFabricWorker,
            id: &ExecutionId,
        ) -> Result<Option<ExecutionSnapshot>, SdkError> {
            w.describe_execution(id).await
        }
    }

    /// v0.13 SC-10 compile anchor — the backend-agnostic admin facade
    /// (`FlowFabricAdminClient::connect_with` + embedded-transport
    /// methods) MUST be addressable under
    /// `--no-default-features --features sqlite`. Without this anchor,
    /// a regression that re-introduces a ferriskey symbol on the
    /// embedded admin path would only surface on downstream
    /// sqlite-only consumers.
    #[test]
    fn admin_facade_addressable_under_sqlite_only() {
        use crate::admin::{
            ClaimForWorkerRequest, ClaimForWorkerResponse, FlowFabricAdminClient,
            IssueReclaimGrantRequest, IssueReclaimGrantResponse, RotateWaitpointSecretRequest,
            RotateWaitpointSecretResponse,
        };
        use crate::SdkError;

        // `connect_with` is infallible; fn-pointer coerce pins the
        // signature under the sqlite-only feature set. (`Arc` comes
        // from the module-level import.)
        let _ctor: fn(Arc<dyn EngineBackend>) -> FlowFabricAdminClient =
            FlowFabricAdminClient::connect_with;

        // Each of these methods is `async`, so its item type carries a
        // hidden `&self` lifetime and an opaque future; pin each one
        // via an async wrapper fn instead of an fn-pointer.
        #[allow(dead_code)]
        async fn _pin_claim(
            c: &FlowFabricAdminClient,
            req: ClaimForWorkerRequest,
        ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
            c.claim_for_worker(req).await
        }
        #[allow(dead_code)]
        async fn _pin_reclaim(
            c: &FlowFabricAdminClient,
            id: &str,
            req: IssueReclaimGrantRequest,
        ) -> Result<IssueReclaimGrantResponse, SdkError> {
            c.issue_reclaim_grant(id, req).await
        }
        #[allow(dead_code)]
        async fn _pin_rotate(
            c: &FlowFabricAdminClient,
            req: RotateWaitpointSecretRequest,
        ) -> Result<RotateWaitpointSecretResponse, SdkError> {
            c.rotate_waitpoint_secret(req).await
        }
    }

    /// v0.12 PR-3 compile anchor — the new
    /// [`EngineBackend::read_current_attempt_index`] trait method MUST
    /// be addressable under `--no-default-features --features sqlite`.
    /// Mirrors the PR-1 `read_execution_context` anchor: takes a
    /// generic over the trait bound so `#[async_trait]` lifetime
    /// elision doesn't block an `fn`-pointer coercion.
    #[test]
    fn read_current_attempt_index_addressable_under_sqlite_only() {
        use ff_core::engine_error::EngineError;
        use ff_core::types::{AttemptIndex, ExecutionId};

        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            b.read_current_attempt_index(id).await
        }
    }

    /// v0.12 PR-5.5 retry-path-fix compile anchor — the new
    /// [`EngineBackend::read_total_attempt_count`] trait method MUST
    /// be addressable under `--no-default-features --features sqlite`.
    /// Mirrors the PR-3 `read_current_attempt_index` anchor.
    #[test]
    fn read_total_attempt_count_addressable_under_sqlite_only() {
        use ff_core::engine_error::EngineError;
        use ff_core::types::{AttemptIndex, ExecutionId};

        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            b.read_total_attempt_count(id).await
        }
    }

    /// v0.12 PR-4 compile anchor — the new
    /// [`EngineBackend::claim_execution`] trait method MUST be
    /// addressable under `--no-default-features --features sqlite`.
    /// Mirrors the PR-3 `read_current_attempt_index` anchor: takes a
    /// generic over the trait bound so `#[async_trait]` lifetime
    /// elision doesn't block a plain fn-pointer coercion.
    ///
    /// The method has an `Err(Unavailable)` default impl; PG + SQLite
    /// backends don't override it today (grants are Valkey-only until
    /// the PG/SQLite grant-consumer RFC lands). The anchor pins the
    /// trait-surface signature — not a runtime call — so the compile
    /// check passes cleanly on every feature set.
    #[test]
    fn claim_execution_addressable_under_sqlite_only() {
        use ff_core::contracts::{ClaimExecutionArgs, ClaimExecutionResult};
        use ff_core::engine_error::EngineError;

        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            args: ClaimExecutionArgs,
        ) -> Result<ClaimExecutionResult, EngineError> {
            b.claim_execution(args).await
        }
    }

    /// v0.12 PR-5 compile anchor — the three new scanner-primitive
    /// trait methods (`scan_eligible_executions`, `issue_claim_grant`,
    /// `block_route`) MUST be addressable under
    /// `--no-default-features --features sqlite`. Each has an
    /// `Err(Unavailable)` default impl; PG + SQLite backends don't
    /// override (the scheduler-routed `claim_for_worker` path is the
    /// supported PG/SQLite entry point). The anchor pins the trait-
    /// surface signatures — not runtime calls — so the compile check
    /// passes cleanly on every feature set.
    ///
    /// This test covers `scan_eligible_executions`; the other two
    /// methods have sibling tests directly below.
    #[test]
    fn scan_eligible_executions_addressable_under_sqlite_only() {
        use ff_core::contracts::ScanEligibleArgs;
        use ff_core::engine_error::EngineError;
        use ff_core::types::ExecutionId;

        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            args: ScanEligibleArgs,
        ) -> Result<Vec<ExecutionId>, EngineError> {
            b.scan_eligible_executions(args).await
        }
    }

    /// v0.12 PR-5 compile anchor for `EngineBackend::issue_claim_grant`
    /// (scanner primitive; pinned through a trait-bound generic like
    /// `scan_eligible_executions_addressable_under_sqlite_only`).
    #[test]
    fn issue_claim_grant_addressable_under_sqlite_only() {
        use ff_core::contracts::{IssueClaimGrantArgs, IssueClaimGrantOutcome};
        use ff_core::engine_error::EngineError;

        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            args: IssueClaimGrantArgs,
        ) -> Result<IssueClaimGrantOutcome, EngineError> {
            b.issue_claim_grant(args).await
        }
    }

    /// v0.12 PR-5 compile anchor for `EngineBackend::block_route`
    /// (scanner primitive; pinned through a trait-bound generic like
    /// `scan_eligible_executions_addressable_under_sqlite_only`).
    #[test]
    fn block_route_addressable_under_sqlite_only() {
        use ff_core::contracts::{BlockRouteArgs, BlockRouteOutcome};
        use ff_core::engine_error::EngineError;

        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            args: BlockRouteArgs,
        ) -> Result<BlockRouteOutcome, EngineError> {
            b.block_route(args).await
        }
    }

    /// v0.12 PR-3 compile anchor — `FlowFabricWorker::deliver_signal`
    /// MUST be addressable under `--no-default-features --features sqlite`
    /// (ungated in PR-3 — the body is pure
    /// `self.backend.deliver_signal(...)` trait dispatch).
    #[test]
    fn deliver_signal_addressable_under_sqlite_only() {
        use crate::task::{Signal, SignalOutcome};
        use crate::SdkError;
        use ff_core::types::{ExecutionId, WaitpointId};
        use std::future::Future;
        use std::pin::Pin;

        // `deliver_signal` is `async fn`, so its item signature bakes
        // in a hidden lifetime and an opaque return future; it can't
        // be coerced to an `fn` pointer. Instead, a never-invoked
        // wrapper boxes the future as `dyn Future<Output = ...> + Send`,
        // which pins both the output type and that the future is `Send`.
        #[allow(dead_code)]
        fn _pin<'a>(
            w: &'a FlowFabricWorker,
            id: &'a ExecutionId,
            wp: &'a WaitpointId,
            s: Signal,
        ) -> Pin<Box<dyn Future<Output = Result<SignalOutcome, SdkError>> + Send + 'a>> {
            Box::pin(w.deliver_signal(id, wp, s))
        }
    }

    /// v0.12 PR-5.5 compile anchor — `claim_from_grant` MUST be
    /// addressable under `--no-default-features --features sqlite`
    /// (ungated in PR-5.5). PG / SQLite return `EngineError::Unavailable`
    /// from the underlying trait method today, so the call path is
    /// compile-reachable but runtime-unavailable. The anchor pins the
    /// signature (and that the returned future is `Send`); a future PR
    /// wiring real PG/SQLite grant-consumer bodies flips the runtime
    /// behaviour without touching this test.
    #[test]
    fn claim_from_grant_addressable_under_sqlite_only() {
        use crate::task::ClaimedTask;
        use crate::SdkError;
        use ff_core::contracts::ClaimGrant;
        use ff_core::types::LaneId;
        use std::future::Future;
        use std::pin::Pin;

        #[allow(dead_code)]
        fn _pin<'a>(
            w: &'a FlowFabricWorker,
            lane: LaneId,
            grant: ClaimGrant,
        ) -> Pin<Box<dyn Future<Output = Result<ClaimedTask, SdkError>> + Send + 'a>> {
            Box::pin(w.claim_from_grant(lane, grant))
        }
    }

    /// v0.12 PR-5.5 compile anchor — `claim_via_server` MUST be
    /// addressable under `--no-default-features --features sqlite`.
    /// The scheduler-routed path is the supported PG/SQLite claim
    /// entry point per `project_claim_from_grant_pg_sqlite_gap.md`;
    /// pinning the signature here prevents a future PR from
    /// accidentally re-gating it behind `valkey-default`.
    #[test]
    fn claim_via_server_addressable_under_sqlite_only() {
        use crate::admin::FlowFabricAdminClient;
        use crate::task::ClaimedTask;
        use crate::SdkError;
        use ff_core::types::LaneId;
        use std::future::Future;
        use std::pin::Pin;

        #[allow(dead_code)]
        fn _pin<'a>(
            w: &'a FlowFabricWorker,
            admin: &'a FlowFabricAdminClient,
            lane: &'a LaneId,
            grant_ttl_ms: u64,
        ) -> Pin<Box<dyn Future<Output = Result<Option<ClaimedTask>, SdkError>> + Send + 'a>>
        {
            Box::pin(w.claim_via_server(admin, lane, grant_ttl_ms))
        }
    }

    /// v0.12 PR-5.5 compile anchor — `ClaimedTask::{complete, fail,
    /// cancel}` MUST be addressable under `--no-default-features
    /// --features sqlite`. The `impl ClaimedTask` block is now
    /// module-level ungated (PR-5.5); the terminal ops route through
    /// `EngineBackend::{complete, fail, cancel}` which are core trait
    /// methods (no `streaming` / `suspension` / `budget` gate).
    #[test]
    fn claimed_task_terminal_ops_addressable_under_sqlite_only() {
        use crate::task::{ClaimedTask, FailOutcome};
        use crate::SdkError;
        use std::future::Future;
        use std::pin::Pin;

        // `complete` takes only owned arguments, so its boxed future
        // carries no borrowed lifetime.
        #[allow(dead_code)]
        fn _pin_complete(
            t: ClaimedTask,
            payload: Option<Vec<u8>>,
        ) -> Pin<Box<dyn Future<Output = Result<(), SdkError>> + Send>> {
            Box::pin(t.complete(payload))
        }

        #[allow(dead_code)]
        fn _pin_fail<'a>(
            t: ClaimedTask,
            reason: &'a str,
            error_category: &'a str,
        ) -> Pin<Box<dyn Future<Output = Result<FailOutcome, SdkError>> + Send + 'a>> {
            Box::pin(t.fail(reason, error_category))
        }

        #[allow(dead_code)]
        fn _pin_cancel<'a>(
            t: ClaimedTask,
            reason: &'a str,
        ) -> Pin<Box<dyn Future<Output = Result<(), SdkError>> + Send + 'a>> {
            Box::pin(t.cancel(reason))
        }
    }
}
1999
#[cfg(all(test, feature = "direct-valkey-claim"))]
mod scan_cursor_tests {
    //! Unit tests for `scan_cursor_seed` (only built with the
    //! `direct-valkey-claim` feature). They check that the seed is
    //! deterministic for a given worker id, differs between `w1` and
    //! `w2`, stays below the partition count, and is `0` when the
    //! partition count is `0`.
    use super::scan_cursor_seed;

    #[test]
    fn stable_for_same_input() {
        let first = scan_cursor_seed("w1", 256);
        let second = scan_cursor_seed("w1", 256);
        assert_eq!(first, second);
    }

    #[test]
    fn distinct_for_different_ids() {
        let seed_w1 = scan_cursor_seed("w1", 256);
        let seed_w2 = scan_cursor_seed("w2", 256);
        assert_ne!(seed_w1, seed_w2);
    }

    #[test]
    fn bounded_by_partition_count() {
        // Sample 100 worker ids (`w0`..`w99`) and require every seed to
        // fall inside the 256-partition range.
        for worker_id in (0..100).map(|n| format!("w{n}")) {
            assert!(scan_cursor_seed(&worker_id, 256) < 256);
        }
    }

    #[test]
    fn zero_partitions_returns_zero() {
        let seed = scan_cursor_seed("w1", 0);
        assert_eq!(seed, 0);
    }
}