ff_sdk/worker.rs
1use std::collections::HashMap;
2#[cfg(feature = "direct-valkey-claim")]
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5
6// RFC-023 Phase 1a (§4.4 item 10b): ferriskey imports scoped behind
7// `valkey-default` so the module compiles under
8// `--no-default-features, features = ["sqlite"]`.
9#[cfg(feature = "valkey-default")]
10use ferriskey::Client;
11use ff_core::partition::PartitionConfig;
12use ff_core::types::*;
13use tokio::sync::Semaphore;
14
15use crate::config::WorkerConfig;
16use crate::task::ClaimedTask;
17use crate::SdkError;
18
19/// FlowFabric worker — connects to Valkey, claims executions, and provides
20/// the worker-facing API.
21///
22/// # Admission control
23///
24/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
25/// **bypasses the scheduler's admission controls**: it reads the eligible
26/// ZSET directly and mints its own claim grant without consulting budget
27/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
28/// benchmarks, tests, and single-tenant development where the scheduler
29/// hop is measurement noise, not for production.
30///
31/// For production deployments, consume scheduler-issued grants via
32/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
33/// budget breach, quota sliding-window, concurrency cap, and
34/// capability-match checks before issuing grants.
35///
36/// # Usage
37///
38/// ```rust,ignore
39/// use ff_core::backend::BackendConfig;
40/// use ff_core::types::{LaneId, Namespace, WorkerId, WorkerInstanceId};
41/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
42///
43/// let config = WorkerConfig {
44/// backend: BackendConfig::valkey("localhost", 6379),
45/// worker_id: WorkerId::new("w1"),
46/// worker_instance_id: WorkerInstanceId::new("w1-i1"),
47/// namespace: Namespace::new("default"),
48/// lanes: vec![LaneId::new("main")],
49/// capabilities: Vec::new(),
50/// lease_ttl_ms: 30_000,
51/// claim_poll_interval_ms: 1_000,
52/// max_concurrent_tasks: 1,
53/// partition_config: None,
54/// };
55/// let worker = FlowFabricWorker::connect(config).await?;
56///
57/// loop {
58/// if let Some(task) = worker.claim_next().await? {
59/// // Process task...
60/// task.complete(Some(b"result".to_vec())).await?;
61/// } else {
62/// tokio::time::sleep(Duration::from_secs(1)).await;
63/// }
64/// }
65/// ```
66pub struct FlowFabricWorker {
67 /// RFC-023 Phase 1a (§4.4 item 10c): `ferriskey::Client` is
68 /// `valkey-default`-gated **and** `Option` wrapped. Under
69 /// sqlite-only features the field is absent. Under
70 /// `valkey-default` the field is `Some(client)` when the worker
71 /// was built via [`Self::connect`] (the Valkey-bundled entry
72 /// point that dials), and `None` when it was built via
73 /// [`Self::connect_with`] (the backend-agnostic entry point per
74 /// §4.4 item 10e — no ferriskey round-trip). Claim/signal hot
75 /// paths (all `valkey-default`-gated per §4.4 item 10f) expect
76 /// `Some`; a claim call against a `connect_with`-built worker
77 /// panics with a clear message until the backend-agnostic SDK
78 /// worker-loop RFC lands (tracked in §8).
79 #[cfg(feature = "valkey-default")]
80 #[allow(dead_code)]
81 client: Option<Client>,
82 config: WorkerConfig,
83 partition_config: PartitionConfig,
84 /// 8-hex FNV-1a digest of the sorted capabilities CSV. Used in
85 /// per-mismatch logs so the 4KB CSV never echoes on every reject
86 /// during an incident. For [`Self::connect`]-built workers, the
87 /// full CSV is additionally logged once at connect-time WARN via
88 /// `valkey_preamble::run` for cross-reference; [`Self::connect_with`]-
89 /// built workers compute the hash without the companion CSV log.
90 /// Mirrors `ff-scheduler::claim::worker_caps_digest`.
91 #[cfg(feature = "direct-valkey-claim")]
92 worker_capabilities_hash: String,
93 #[cfg(feature = "direct-valkey-claim")]
94 lane_index: AtomicUsize,
95 /// Concurrency cap for in-flight tasks. Permits are acquired or
96 /// transferred by [`claim_next`] (feature-gated),
97 /// [`claim_from_grant`] (always available), and
98 /// [`claim_from_reclaim_grant`], transferred to the returned
99 /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
100 /// Holds `max_concurrent_tasks` permits total.
101 ///
102 /// [`claim_next`]: FlowFabricWorker::claim_next
103 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
104 /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
105 #[cfg_attr(not(feature = "valkey-default"), allow(dead_code))]
106 concurrency_semaphore: Arc<Semaphore>,
107 /// Rolling offset for chunked partition scans. Each poll advances the
108 /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
109 /// chunk)` polls every partition is covered. The initial value is
110 /// derived from `worker_instance_id` so idle workers spread their
111 /// scans across different partitions from the first poll onward.
112 ///
113 /// Overflow: on 64-bit targets `usize` is `u64` — overflow after
114 /// ~2^64 polls (billions of years at any realistic rate). On 32-bit
115 /// targets (wasm32, i686) `usize` is `u32` and wraps after ~4 years
116 /// at 1 poll/sec — acceptable; on wrap, the modulo preserves
117 /// correctness because the sequence simply restarts a new cycle.
118 #[cfg(feature = "direct-valkey-claim")]
119 scan_cursor: AtomicUsize,
120 /// The [`EngineBackend`] the Stage-1b trait forwarders route
121 /// through.
122 ///
123 /// **RFC-012 Stage 1b.** Always populated:
124 /// [`FlowFabricWorker::connect`] now wraps the worker's own
125 /// `ferriskey::Client` in a `ValkeyBackend` via
126 /// `ValkeyBackend::from_client_and_partitions`, and
127 /// [`FlowFabricWorker::connect_with`] replaces that default with
128 /// the caller-supplied `Arc<dyn EngineBackend>`. The
129 /// [`FlowFabricWorker::backend`] accessor still returns
130 /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
131 /// narrows the return type once consumers have migrated.
132 ///
133 /// Hot paths (claim, deliver_signal, admin queries) still use the
134 /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
135 /// migrates them through this field, and Stage 1d removes the
136 /// embedded client.
137 backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
138 /// Optional handle to the same underlying backend viewed as a
139 /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
140 /// Populated by [`Self::connect`] from the bundled
141 /// `ValkeyBackend` (which implements the trait); supplied by the
142 /// caller on [`Self::connect_with`] as an explicit
143 /// `Option<Arc<dyn CompletionBackend>>` — `None` means "this
144 /// backend does not support push-based completion" (e.g. a future
145 /// Postgres backend without LISTEN/NOTIFY, or a test mock). Cairn
146 /// and other completion-subscription consumers reach this through
147 /// [`Self::completion_backend`].
148 completion_backend_handle:
149 Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
150}
151
/// How many execution partitions a single `claim_next()` poll visits.
///
/// Bounding each poll keeps an idle worker's Valkey load at
/// O(PARTITION_SCAN_CHUNK) per worker-second rather than
/// O(num_flow_partitions); the rolling `scan_cursor` moves on to the
/// next window of partitions on the following poll.
#[cfg(feature = "direct-valkey-claim")]
const PARTITION_SCAN_CHUNK: usize = 32;
157
158impl FlowFabricWorker {
159 /// RFC-023 Phase 1a helper: borrow the embedded `ferriskey::Client`,
160 /// panicking with a clear message if the worker was built via
161 /// [`Self::connect_with`] (which does not dial Valkey). Every
162 /// Valkey-specific claim/signal method funnels through this accessor
163 /// so the panic site is uniform and traceable.
164 ///
165 /// A backend-agnostic claim/signal API is deferred per §8 breaking-
166 /// change disclosure; until that lands, valkey-default consumers
167 /// must use [`Self::connect`] if they intend to drive the Valkey
168 /// claim/signal loop.
169 #[cfg(feature = "valkey-default")]
170 #[inline]
171 #[allow(dead_code)]
172 fn valkey_client(&self) -> &Client {
173 self.client.as_ref().expect(
174 "FlowFabricWorker was built via connect_with (no Valkey dial) \
175 but a Valkey-specific claim/signal method was invoked. \
176 Use FlowFabricWorker::connect to dial Valkey, or drive the \
177 backend directly through the trait surface via .backend().",
178 )
179 }
180}
181
182impl FlowFabricWorker {
183 /// Connect to Valkey and prepare the worker.
184 ///
185 /// Establishes the ferriskey connection. Does NOT load the FlowFabric
186 /// library — that is the server's responsibility (ff-server calls
187 /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
188 /// the library is already loaded.
189 ///
190 /// # Smoke / dev scripts: rotate `WorkerInstanceId`
191 ///
192 /// The SDK writes a SET-NX liveness sentinel keyed on the worker's
193 /// `WorkerInstanceId`. When a smoke / dev script reuses the same
194 /// `WorkerInstanceId` across restarts, subsequent runs trap behind
195 /// the prior run's SET-NX until the liveness key's TTL (≈ 2× the
196 /// configured lease TTL) expires — the worker appears stuck and
197 /// claims nothing. Iterative scripts should synthesise a fresh
198 /// `WorkerInstanceId` per process (e.g. `WorkerInstanceId::new()`
199 /// or embed a UUID/timestamp) rather than hard-coding a stable
200 /// value. Production workers that cleanly shut down release the
201 /// key; only crashed / kill -9'd processes hit this trap.
202 ///
203 /// **RFC-023 Phase 1a (§4.4 item 10d, v0.12.0):** this Valkey-bundled
204 /// entry point is `#[cfg(feature = "valkey-default")]`-gated.
205 /// Consumers on the sqlite-only feature set must use
206 /// [`FlowFabricWorker::connect_with`] and drive the backend directly
207 /// through the trait surface.
208 #[cfg(feature = "valkey-default")]
209 pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
210 if config.lanes.is_empty() {
211 return Err(SdkError::Config {
212 context: "worker_config".into(),
213 field: None,
214 message: "at least one lane is required".into(),
215 });
216 }
217
218 // Build the ferriskey client from the nested `BackendConfig`.
219 // Delegates to `ff_backend_valkey::build_client` so host/port +
220 // TLS + cluster + `BackendTimeouts::request` + `BackendRetry`
221 // wiring lives in exactly one place (RFC-012 Stage 1c tranche 1).
222 let client = ff_backend_valkey::build_client(&config.backend).await?;
223
224 // v0.12 PR-6: the Valkey-specific preamble (PING, alive-key
225 // SET-NX, `ff:config:partitions` HGETALL, caps ingress +
226 // sorted-dedup CSV, caps STRING + workers-index writes) lives
227 // in `crate::valkey_preamble`. Extracted byte-for-byte from
228 // the pre-PR-6 inline body; the write order is observable
229 // from scheduler-side reads (unblock scanner:
230 // SMEMBERS ff:idx:workers → GET ff:worker:{id}:caps) so
231 // preservation is load-bearing. See `valkey_preamble::run`.
232 let crate::valkey_preamble::PreambleOutput {
233 partition_config,
234 #[cfg(feature = "direct-valkey-claim")]
235 capabilities_csv: _worker_capabilities_csv,
236 #[cfg(feature = "direct-valkey-claim")]
237 capabilities_hash: worker_capabilities_hash,
238 } = crate::valkey_preamble::run(&client, &config).await?;
239
240 let max_tasks = config.max_concurrent_tasks.max(1);
241 let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
242
243 tracing::info!(
244 worker_id = %config.worker_id,
245 instance_id = %config.worker_instance_id,
246 lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
247 "FlowFabricWorker connected"
248 );
249
250 #[cfg(feature = "direct-valkey-claim")]
251 let scan_cursor_init = scan_cursor_seed(
252 config.worker_instance_id.as_str(),
253 partition_config.num_flow_partitions.max(1) as usize,
254 );
255
256 // RFC-012 Stage 1b: wrap the dialed client in a ValkeyBackend
257 // so `ClaimedTask`'s trait forwarders have something to call.
258 // `from_client_and_partitions` reuses the already-dialed client
259 // — no second connection. Share the concrete
260 // `Arc<ValkeyBackend>` across the two trait objects — one
261 // allocation, both accessors yield identity-equivalent handles.
262 let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
263 ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
264 client.clone(),
265 partition_config,
266 );
267 let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
268 let completion_backend_handle: Option<
269 Arc<dyn ff_core::completion_backend::CompletionBackend>,
270 > = Some(valkey_backend);
271
272 Ok(Self {
273 client: Some(client),
274 config,
275 partition_config,
276 #[cfg(feature = "direct-valkey-claim")]
277 worker_capabilities_hash,
278 #[cfg(feature = "direct-valkey-claim")]
279 lane_index: AtomicUsize::new(0),
280 concurrency_semaphore,
281 #[cfg(feature = "direct-valkey-claim")]
282 scan_cursor: AtomicUsize::new(scan_cursor_init),
283 backend,
284 completion_backend_handle,
285 })
286 }
287
288 /// Store pre-built [`EngineBackend`] and (optional)
289 /// [`CompletionBackend`] handles on the worker. Builds the worker
290 /// via the legacy [`FlowFabricWorker::connect`] path first (so the
291 /// embedded `ferriskey::Client` that the Stage 1b non-migrated hot
292 /// paths still use is dialed), then replaces the default
293 /// `ValkeyBackend` wrapper with the caller-supplied trait objects.
294 ///
295 /// The `completion` argument is explicit: 0.3.3 previously accepted
296 /// only `backend` and `completion_backend()` silently returned
297 /// `None` on this path because `Arc<dyn EngineBackend>` cannot be
298 /// upcast to `Arc<dyn CompletionBackend>` without loss of
299 /// trait-object identity. 0.3.4 lets the caller decide.
300 ///
301 /// - `Some(arc)` — caller supplies a completion backend.
302 /// [`Self::completion_backend`] returns `Some(clone)`.
303 /// - `None` — this backend does not support push-based completion
304 /// (future Postgres backend without LISTEN/NOTIFY, test mocks).
305 /// [`Self::completion_backend`] returns `None`.
306 ///
307 /// When the underlying backend implements both traits (as
308 /// `ValkeyBackend` does), pass the same `Arc` twice — the two
309 /// trait-object views share one allocation:
310 ///
311 /// ```rust,ignore
312 /// use std::sync::Arc;
313 /// use ff_backend_valkey::ValkeyBackend;
314 /// use ff_sdk::{FlowFabricWorker, WorkerConfig};
315 ///
316 /// # async fn doc(worker_config: WorkerConfig,
317 /// # backend_config: ff_backend_valkey::BackendConfig)
318 /// # -> Result<(), ff_sdk::SdkError> {
319 /// // Valkey (completion supported):
320 /// let valkey = Arc::new(ValkeyBackend::connect(backend_config).await?);
321 /// let worker = FlowFabricWorker::connect_with(
322 /// worker_config,
323 /// valkey.clone(),
324 /// Some(valkey),
325 /// ).await?;
326 /// # Ok(()) }
327 /// ```
328 ///
329 /// Backend without completion support:
330 ///
331 /// ```rust,ignore
332 /// let worker = FlowFabricWorker::connect_with(
333 /// worker_config,
334 /// backend,
335 /// None,
336 /// ).await?;
337 /// ```
338 ///
339 /// **Stage 1b + Round-7 scope — what the injected backend covers
340 /// today.** The injected backend currently covers these per-task
341 /// `ClaimedTask` ops: `update_progress` / `resume_signals` /
342 /// `delay_execution` / `move_to_waiting_children` / `complete` /
343 /// `cancel` / `fail` / `create_pending_waitpoint` /
344 /// `append_frame` / `report_usage`. A mock backend therefore sees
345 /// that portion of the worker's per-task write surface. Lease
346 /// renewal also routes through `backend.renew(&handle)`. Round-7
347 /// (#135/#145) closed the four trait-shape gaps tracked by #117,
348 /// but `suspend` still reaches the embedded `ferriskey::Client`
349 /// directly via `ff_suspend_execution` — this is the deferred
350 /// suspend per RFC-012 §R7.6.1, pending Stage 1d input-shape
351 /// work. `claim_next` / `claim_from_grant` /
352 /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
353 /// are Stage 1c hot-path work. Stage 1d removes the embedded
354 /// client entirely.
355 ///
356 /// Today's constructor is therefore NOT yet a drop-in way to swap
357 /// in a non-Valkey backend — it requires a reachable Valkey node
358 /// for `suspend` plus the remaining hot-path ops. Tests that
359 /// exercise only the migrated per-task ops can run fully against
360 /// a mock backend.
361 ///
362 /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
363 /// [`CompletionBackend`]: ff_core::completion_backend::CompletionBackend
364 pub async fn connect_with(
365 config: WorkerConfig,
366 backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
367 completion: Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
368 ) -> Result<Self, SdkError> {
369 // RFC-023 Phase 1a (§4.4 item 10e, v0.12.0): direct
370 // backend-agnostic construction. No `Self::connect` preamble,
371 // no ferriskey round-trips (no PING, no alive-key SET-NX, no
372 // `ff:config:partitions` HGETALL). Callers needing a
373 // non-default `PartitionConfig` under non-Valkey backends set
374 // [`WorkerConfig::partition_config`] (v0.12 PR-6 closed the
375 // original follow-up flagged here).
376 //
377 // Lane-empty validation (the one check `connect` did before
378 // any Valkey work) is hoisted here so every entry point
379 // refuses an empty lane list.
380 if config.lanes.is_empty() {
381 return Err(SdkError::Config {
382 context: "worker_config".into(),
383 field: None,
384 message: "at least one lane is required".into(),
385 });
386 }
387
388 let max_tasks = config.max_concurrent_tasks.max(1);
389 let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
390 // v0.12 PR-6: honor the optional `WorkerConfig::partition_config`
391 // override. `None` keeps the pre-PR-6 default shape (256 / 32 /
392 // 32); `Some(cfg)` lets non-Valkey deployments with a custom
393 // `num_flow_partitions` bind correctly instead of silently
394 // missing data via the wrong partition index.
395 let partition_config = config.partition_config.unwrap_or_default();
396
397 #[cfg(feature = "direct-valkey-claim")]
398 let scan_cursor_init = scan_cursor_seed(
399 config.worker_instance_id.as_str(),
400 partition_config.num_flow_partitions.max(1) as usize,
401 );
402
403 // Build the capabilities CSV / hash inline for the
404 // direct-valkey-claim path. The validation mirrors `connect`'s
405 // ingress so the two entry points refuse the same malformed
406 // tokens.
407 #[cfg(feature = "direct-valkey-claim")]
408 for cap in &config.capabilities {
409 if cap.is_empty() {
410 return Err(SdkError::Config {
411 context: "worker_config".into(),
412 field: Some("capabilities".into()),
413 message: "capability token must not be empty".into(),
414 });
415 }
416 if cap.contains(',') {
417 return Err(SdkError::Config {
418 context: "worker_config".into(),
419 field: Some("capabilities".into()),
420 message: format!(
421 "capability token may not contain ',' (CSV delimiter): {cap:?}"
422 ),
423 });
424 }
425 if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
426 return Err(SdkError::Config {
427 context: "worker_config".into(),
428 field: Some("capabilities".into()),
429 message: format!(
430 "capability token must not contain whitespace or control \
431 characters: {cap:?}"
432 ),
433 });
434 }
435 }
436 #[cfg(feature = "direct-valkey-claim")]
437 let worker_capabilities_hash = {
438 let set: std::collections::BTreeSet<&str> = config
439 .capabilities
440 .iter()
441 .map(|s| s.as_str())
442 .filter(|s| !s.is_empty())
443 .collect();
444 let csv: String = set.into_iter().collect::<Vec<_>>().join(",");
445 ff_core::hash::fnv1a_xor8hex(&csv)
446 };
447
448 Ok(Self {
449 #[cfg(feature = "valkey-default")]
450 client: None,
451 config,
452 partition_config,
453 #[cfg(feature = "direct-valkey-claim")]
454 worker_capabilities_hash,
455 #[cfg(feature = "direct-valkey-claim")]
456 lane_index: AtomicUsize::new(0),
457 concurrency_semaphore,
458 #[cfg(feature = "direct-valkey-claim")]
459 scan_cursor: AtomicUsize::new(scan_cursor_init),
460 backend,
461 completion_backend_handle: completion,
462 })
463 }
464
465 /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
466 /// ops through.
467 ///
468 /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
469 /// the `Option` wrapper is retained for API stability with the
470 /// Stage-1a shape. Stage 1c narrows the return type to
471 /// `&Arc<dyn EngineBackend>`.
472 pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
473 Some(&self.backend)
474 }
475
476 /// Crate-internal direct borrow of the backend. The public
477 /// [`Self::backend`] still returns `Option` for API stability
478 /// (Stage 1b holdover). Snapshot trait-forwarders in
479 /// [`crate::snapshot`] need an un-wrapped reference.
480 #[cfg_attr(not(feature = "valkey-default"), allow(dead_code))]
481 pub(crate) fn backend_ref(
482 &self,
483 ) -> &Arc<dyn ff_core::engine_backend::EngineBackend> {
484 &self.backend
485 }
486
487 /// Handle to the completion-event subscription backend, for
488 /// consumers that need to observe execution completions (DAG
489 /// reconcilers, tenant-isolated subscribers).
490 ///
491 /// Returns `Some` when the worker was built through
492 /// [`Self::connect`] on the default `valkey-default` feature
493 /// (the bundled `ValkeyBackend` implements
494 /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)),
495 /// or via [`Self::connect_with`] with a `Some(..)` completion
496 /// handle. Returns `None` when the caller passed `None` to
497 /// [`Self::connect_with`] — i.e. the backend does not support
498 /// push-based completion streams (future Postgres without
499 /// LISTEN/NOTIFY, test mocks).
500 ///
501 /// The returned handle shares the same underlying allocation as
502 /// [`Self::backend`]; calls through it (e.g.
503 /// `subscribe_completions_filtered`) hit the same connection
504 /// the worker itself uses.
505 pub fn completion_backend(
506 &self,
507 ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
508 self.completion_backend_handle.clone()
509 }
510
511 /// Get the worker config.
512 pub fn config(&self) -> &WorkerConfig {
513 &self.config
514 }
515
516 /// Get the server-published partition config this worker bound to at
517 /// `connect()`. Exposed so consumers that mint custom
518 /// [`ExecutionId`]s (e.g. for `describe_execution` lookups on ids
519 /// produced outside this worker) stay aligned with the server's
520 /// `num_flow_partitions` — using `PartitionConfig::default()`
521 /// assumes 256 partitions and silently misses data on deployments
522 /// with any other value.
523 pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
524 &self.partition_config
525 }
526
527 /// Attempt to claim the next eligible execution.
528 ///
529 /// Phase 1 simplified claim flow:
530 /// 1. Pick a lane (round-robin across configured lanes)
531 /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
532 /// 3. Claim the execution via `ff_claim_execution`
533 /// 4. Read execution payload + tags
534 /// 5. Return a [`ClaimedTask`] with auto lease renewal
535 ///
536 /// Gated behind the `direct-valkey-claim` feature — bypasses the
537 /// scheduler's budget / quota / capability admission checks. Enable
538 /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
539 /// the scheduler hop would be measurement noise (benches) or when
540 /// the test harness needs a deterministic worker-local path. Prefer
541 /// the scheduler-routed HTTP claim path in production.
542 ///
543 /// # `None` semantics
544 ///
545 /// `Ok(None)` means **no work was found in the partition window this
546 /// poll covered**, not "the cluster is idle". Each call scans a chunk
547 /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
548 /// `scan_cursor`; the cursor advances by that chunk size on every
549 /// invocation, so a worker covers every partition exactly once every
550 /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
551 ///
552 /// Callers should treat `None` as "poll again soon" (typically after
553 /// `config.claim_poll_interval_ms`) rather than "sleep for a long
554 /// time". Backing off too aggressively on `None` can starve workers
555 /// when work lives on partitions outside the current window.
556 ///
557 /// Returns `Err` on Valkey errors or script failures.
558 #[cfg(feature = "direct-valkey-claim")]
559 pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
560 // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
561 // try_acquire returns immediately — if no permits available, the worker
562 // is at capacity and should not claim more work.
563 let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
564 Ok(p) => p,
565 Err(_) => return Ok(None), // At capacity — no claim attempted
566 };
567
568 let lane_id = self.next_lane();
569 let now = TimestampMs::now();
570
571 // Phase 1: We scan eligible executions directly by reading the eligible
572 // ZSET across execution partitions. In production the scheduler
573 // (ff-scheduler) would handle this. For Phase 1, the SDK does a
574 // simplified inline claim.
575 //
576 // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
577 // partitions starting at a rolling offset. This keeps idle Valkey
578 // load at O(chunk) per worker-second instead of O(num_partitions),
579 // and the worker-instance-seeded initial cursor spreads concurrent
580 // workers across different partition windows.
581 let num_partitions = self.partition_config.num_flow_partitions as usize;
582 if num_partitions == 0 {
583 return Ok(None);
584 }
585 let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
586 let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;
587
588 // Hoist the sorted/deduped capability set out of the loop — the
589 // per-partition iteration reused a fresh BTreeSet on every
590 // step pre-fix, adding O(n log n) alloc/sort to the scanner
591 // hot path on every tick. Computed once per `claim_next` call.
592 let worker_capabilities: std::collections::BTreeSet<String> = self
593 .config
594 .capabilities
595 .iter()
596 .cloned()
597 .collect();
598
599 for step in 0..chunk {
600 let partition_idx = ((start + step) % num_partitions) as u16;
601 let partition = ff_core::partition::Partition {
602 family: ff_core::partition::PartitionFamily::Execution,
603 index: partition_idx,
604 };
605
606 // v0.12 PR-5: trait-routed scanner primitive. Replaces the
607 // pre-PR-5 `ZRANGEBYSCORE` inline; the Valkey backend fires
608 // the identical command byte-for-byte (see
609 // `ff_backend_valkey::scan_eligible_executions_impl`).
610 let scan_args = ff_core::contracts::ScanEligibleArgs::new(
611 lane_id.clone(),
612 partition,
613 1,
614 );
615 let candidates = self.backend.scan_eligible_executions(scan_args).await?;
616 let execution_id = match candidates.into_iter().next() {
617 Some(id) => id,
618 None => continue, // No eligible executions on this partition
619 };
620
621 // Step 1: Issue claim grant (v0.12 PR-5: trait-routed).
622 let grant_args = ff_core::contracts::IssueClaimGrantArgs::new(
623 execution_id.clone(),
624 lane_id.clone(),
625 self.config.worker_id.clone(),
626 self.config.worker_instance_id.clone(),
627 partition,
628 worker_capabilities.clone(),
629 5_000, // grant_ttl_ms
630 now,
631 );
632 let grant_result = self.backend.issue_claim_grant(grant_args).await;
633
634 match grant_result {
635 Ok(_) => {}
636 Err(ref boxed)
637 if matches!(
638 boxed,
639 crate::EngineError::Validation {
640 kind: crate::ValidationKind::CapabilityMismatch,
641 ..
642 }
643 ) =>
644 {
645 let missing = match boxed {
646 crate::EngineError::Validation { detail, .. } => detail.clone(),
647 _ => unreachable!(),
648 };
649 // Block-on-mismatch (RFC-009 §7.5) — parity with
650 // ff-scheduler's Scheduler::claim_for_worker. Without
651 // this, the inline-direct-claim path would hot-loop
652 // on an unclaimable top-of-zset (every tick picks the
653 // same execution, wastes an FCALL, logs, releases,
654 // repeats). The scheduler-side unblock scanner
655 // promotes blocked_route executions back to eligible
656 // when a worker with matching caps registers.
657 tracing::info!(
658 execution_id = %execution_id,
659 worker_id = %self.config.worker_id,
660 worker_caps_hash = %self.worker_capabilities_hash,
661 missing = %missing,
662 "capability mismatch, blocking execution off eligible (SDK inline claim)"
663 );
664 // v0.12 PR-5: trait-routed block_route. Swallow
665 // typed outcomes + transport faults (best-effort
666 // semantic preserved from pre-PR-5 behaviour —
667 // see `BlockRouteOutcome::LuaRejected` rustdoc).
668 let block_args = ff_core::contracts::BlockRouteArgs::new(
669 execution_id.clone(),
670 lane_id.clone(),
671 partition,
672 "waiting_for_capable_worker".to_owned(),
673 "no connected worker satisfies required_capabilities".to_owned(),
674 now,
675 );
676 match self.backend.block_route(block_args).await {
677 Ok(ff_core::contracts::BlockRouteOutcome::Blocked { .. }) => {}
678 Ok(ff_core::contracts::BlockRouteOutcome::LuaRejected { message }) => {
679 tracing::warn!(
680 execution_id = %execution_id,
681 error = %message,
682 "SDK block_route: Lua rejected; eligible ZSET unchanged, next \
683 poll will re-evaluate"
684 );
685 }
686 Ok(_) => {}
687 Err(e) => {
688 tracing::warn!(
689 execution_id = %execution_id,
690 error = %e,
691 "SDK block_route: transport failure; eligible ZSET unchanged"
692 );
693 }
694 }
695 continue;
696 }
697 Err(ref e) if is_retryable_claim_error(e) => {
698 tracing::debug!(
699 execution_id = %execution_id,
700 error = %e,
701 "claim grant failed (retryable), trying next partition"
702 );
703 continue;
704 }
705 Err(e) => return Err(SdkError::from(e)),
706 }
707
708 // Step 2: Claim the execution
709 match self
710 .claim_execution(&execution_id, &lane_id, &partition, now)
711 .await
712 {
713 Ok(mut task) => {
714 // Transfer concurrency permit to the task. When the task is
715 // completed/failed/cancelled/dropped the permit returns to
716 // the semaphore, allowing another claim.
717 task.set_concurrency_permit(permit);
718 return Ok(Some(task));
719 }
720 Err(SdkError::Engine(ref boxed))
721 if matches!(
722 **boxed,
723 crate::EngineError::Contention(
724 crate::ContentionKind::UseClaimResumedExecution
725 )
726 ) =>
727 {
728 // Execution was resumed from suspension — attempt_interrupted.
729 // ff_claim_execution rejects this; use ff_claim_resumed_execution
730 // which reuses the existing attempt instead of creating a new one.
731 tracing::debug!(
732 execution_id = %execution_id,
733 "execution is resumed, using claim_resumed path"
734 );
735 match self
736 .claim_resumed_execution(&execution_id, &lane_id, &partition)
737 .await
738 {
739 Ok(mut task) => {
740 task.set_concurrency_permit(permit);
741 return Ok(Some(task));
742 }
743 Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
744 tracing::debug!(
745 execution_id = %execution_id,
746 error = %e2,
747 "claim_resumed failed (retryable), trying next partition"
748 );
749 continue;
750 }
751 Err(e2) => return Err(e2),
752 }
753 }
754 Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
755 tracing::debug!(
756 execution_id = %execution_id,
757 error = %e,
758 "claim execution failed (retryable), trying next partition"
759 );
760 continue;
761 }
762 Err(e) => return Err(e),
763 }
764 }
765
766 // No eligible work found on any partition
767 Ok(None)
768 }
769
770 /// Low-level claim of a granted execution. Routes through
771 /// [`EngineBackend::claim_execution`](ff_core::engine_backend::EngineBackend::claim_execution)
772 /// — the trait-level grant-consumer method landed in v0.12 PR-4 —
773 /// and returns a `ClaimedTask` with auto lease renewal.
774 ///
775 /// Pre-PR-4 the SDK fired the `ff_claim_execution` FCALL directly
776 /// against `valkey_client()` and hand-parsed the Lua response shape.
777 /// The body now collapses to `backend.claim_execution(args)` +
778 /// `backend.read_execution_context(...)` + `ClaimedTask::new(...)`;
779 /// the Valkey-specific FCALL plumbing lives behind the trait in
780 /// `ff_backend_valkey::claim_execution_impl`.
781 ///
782 /// As of v0.12 PR-5.5 this helper + `claim_from_grant` are no
783 /// longer `valkey-default`-gated: the backend mints the `Handle`
784 /// at claim time and `ClaimedTask::new` caches it, so no
785 /// Valkey-specific handle synthesis is required on this path. The
786 /// `EngineBackend::claim_execution` default impl returns
787 /// `Err(Unavailable)` on PG/SQLite today (grant-consumer surface
788 /// is Valkey-only until the PG/SQLite grant-consumer RFC lands);
789 /// the compile surface is fully agnostic.
790 ///
791 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
792 async fn claim_execution(
793 &self,
794 execution_id: &ExecutionId,
795 lane_id: &LaneId,
796 partition: &ff_core::partition::Partition,
797 now: TimestampMs,
798 ) -> Result<ClaimedTask, SdkError> {
799 // v0.12 PR-5.5 retry-path fix — pre-read the **total attempt
800 // counter**, not the current-attempt pointer. The fresh-claim
801 // path mints a new attempt row whose index is the counter's
802 // current value (the backend's Lua 5920 / PG `ff_claim_execution`
803 // / SQLite `claim_impl` all consult this same counter to compute
804 // `next_att_idx`). Pre-PR-5.5 this call went inline as `HGET
805 // {exec}:core total_attempt_count` on the Valkey client; the
806 // first PR-5.5 landing mistakenly routed it to
807 // `read_current_attempt_index`, which returns the *pointer* at
808 // the previously-leased attempt — on a retry-of-a-retry that
809 // still named the terminal-failed prior attempt and the new
810 // claim collided with it. `read_total_attempt_count` wraps
811 // the same byte-for-byte HGET on Valkey and provides the JSONB
812 // / json_extract equivalents on PG / SQLite. See
813 // `EngineBackend::read_total_attempt_count` rustdoc for the
814 // pointer-vs-counter distinction.
815 let att_idx = self
816 .backend
817 .read_total_attempt_count(execution_id)
818 .await
819 .map_err(|e| SdkError::Engine(Box::new(e)))?;
820
821 let args = ff_core::contracts::ClaimExecutionArgs::new(
822 execution_id.clone(),
823 self.config.worker_id.clone(),
824 self.config.worker_instance_id.clone(),
825 lane_id.clone(),
826 LeaseId::new(),
827 self.config.lease_ttl_ms,
828 AttemptId::new(),
829 att_idx,
830 "{}".to_owned(),
831 None,
832 None,
833 now,
834 );
835
836 // `ClaimExecutionResult` is `#[non_exhaustive]` (v0.12 PR-4)
837 // so let-binding requires an explicit wildcard arm even though
838 // `Claimed` is the only variant today. A future additive variant
839 // surfaces here as a typed `ScriptError::Parse` — loud, routed
840 // through the existing SDK error path, and never silently
841 // dropped.
842 let claimed = match self.backend.claim_execution(args).await? {
843 ff_core::contracts::ClaimExecutionResult::Claimed(c) => c,
844 other => {
845 return Err(SdkError::from(ff_script::error::ScriptError::Parse {
846 fcall: "ff_claim_execution".into(),
847 execution_id: Some(execution_id.to_string()),
848 message: format!(
849 "unexpected ClaimExecutionResult variant: {other:?}"
850 ),
851 }));
852 }
853 };
854
855 // Read execution payload and metadata
856 let (input_payload, execution_kind, tags) = self
857 .read_execution_context(execution_id, partition)
858 .await?;
859
860 Ok(ClaimedTask::new(
861 self.backend.clone(),
862 self.partition_config,
863 claimed.handle,
864 execution_id.clone(),
865 claimed.attempt_index,
866 claimed.attempt_id,
867 claimed.lease_id,
868 claimed.lease_epoch,
869 self.config.lease_ttl_ms,
870 lane_id.clone(),
871 self.config.worker_instance_id.clone(),
872 input_payload,
873 execution_kind,
874 tags,
875 ))
876 }
877
878 /// Consume a [`ClaimGrant`] and claim the granted execution on
879 /// this worker. The intended production entry point: pair with
880 /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
881 /// scheduler-issued grants into the SDK without enabling the
882 /// `direct-valkey-claim` feature (which bypasses budget/quota
883 /// admission control).
884 ///
885 /// The worker's concurrency semaphore is checked BEFORE the FCALL
886 /// so a saturated worker does not consume the grant: the grant
887 /// stays valid for its remaining TTL and the caller can either
888 /// release it back to the scheduler or retry after some other
889 /// in-flight task completes.
890 ///
891 /// On success the returned [`ClaimedTask`] holds a concurrency
892 /// permit that releases automatically on
893 /// `complete`/`fail`/`cancel`/drop — same contract as
894 /// `claim_next`.
895 ///
896 /// # Arguments
897 ///
898 /// * `lane` — the lane the grant was issued for. Must match what
899 /// was passed to `Scheduler::claim_for_worker`; the Lua FCALL
900 /// uses it to look up `lane_eligible`, `lane_active`, and the
901 /// `worker_leases` index slot.
902 /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
903 ///
904 /// # Errors
905 ///
906 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
907 /// permits all held. Retryable; the grant is untouched.
908 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
909 /// or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
910 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
911 /// (wrapped in [`SdkError::Engine`]).
912 /// * `ScriptError::CapabilityMismatch` — execution's required
913 /// capabilities not a subset of this worker's caps (wrapped in
914 /// [`SdkError::Engine`]). Surfaced post-grant if a race
915 /// between grant issuance and caps change allows it.
916 /// * `ScriptError::Parse` — `ff_claim_execution` returned an
917 /// unexpected shape (wrapped in [`SdkError::Engine`]).
918 /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
919 /// transport error during the FCALL or the
920 /// `read_execution_context` follow-up.
921 ///
922 /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
923 /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
924 ///
925 /// # Backend coverage (v0.12 PR-5.5)
926 ///
927 /// Method ungated across backends. The Valkey backend handles the
928 /// grant fully. Postgres + SQLite backends return
929 /// [`EngineError::Unavailable`](ff_core::engine_error::EngineError::Unavailable)
930 /// from [`EngineBackend::claim_execution`] today — grants on those
931 /// backends flow through the scheduler-routed [`claim_via_server`]
932 /// path (the PG/SQLite scheduler lives outside the `EngineBackend`
933 /// trait in this release). See
934 /// `project_claim_from_grant_pg_sqlite_gap.md` for motivation and
935 /// planned follow-up.
936 ///
937 /// [`claim_via_server`]: FlowFabricWorker::claim_via_server
938 /// [`EngineBackend::claim_execution`]: ff_core::engine_backend::EngineBackend::claim_execution
939 pub async fn claim_from_grant(
940 &self,
941 lane: LaneId,
942 grant: ff_core::contracts::ClaimGrant,
943 ) -> Result<ClaimedTask, SdkError> {
944 // Semaphore check FIRST. If the worker is saturated we must
945 // surface the condition to the caller without touching the
946 // grant — silently returning Ok(None) (as claim_next does)
947 // would drop a grant the scheduler has already committed work
948 // to issuing, wasting the slot until its TTL elapses.
949 let permit = self
950 .concurrency_semaphore
951 .clone()
952 .try_acquire_owned()
953 .map_err(|_| SdkError::WorkerAtCapacity)?;
954
955 let now = TimestampMs::now();
956 let partition = grant.partition().map_err(|e| SdkError::Config {
957 context: "claim_from_grant".to_owned(),
958 field: Some("partition_key".to_owned()),
959 message: e.to_string(),
960 })?;
961 let mut task = match self
962 .claim_execution(&grant.execution_id, &lane, &partition, now)
963 .await
964 {
965 Ok(task) => task,
966 Err(SdkError::Engine(ref boxed))
967 if matches!(
968 **boxed,
969 crate::EngineError::Contention(
970 crate::ContentionKind::UseClaimResumedExecution
971 )
972 ) =>
973 {
974 // Execution was resumed from suspension — attempt_interrupted.
975 // ff_claim_execution rejects this; use ff_claim_resumed_execution
976 // which reuses the existing attempt instead of creating a new one.
977 // Mirrors the fallback inside `claim_next` so HTTP-routed callers
978 // (`claim_via_server` → `claim_from_grant`) get the same transparent
979 // re-claim behavior.
980 tracing::debug!(
981 execution_id = %grant.execution_id,
982 "execution is resumed, using claim_resumed path"
983 );
984 self.claim_resumed_execution(&grant.execution_id, &lane, &partition)
985 .await?
986 }
987 Err(e) => return Err(e),
988 };
989 task.set_concurrency_permit(permit);
990 Ok(task)
991 }
992
993 /// Scheduler-routed claim: POST the server's
994 /// `/v1/workers/{id}/claim`, then chain to
995 /// [`Self::claim_from_grant`].
996 ///
997 /// Batch C item 2 PR-B. This is the production entry point —
998 /// budget + quota + capability admission run server-side inside
999 /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
1000 /// enable the `direct-valkey-claim` feature.
1001 ///
1002 /// Returns `Ok(None)` when the server says no eligible execution
1003 /// (HTTP 204). Callers typically back off by
1004 /// `config.claim_poll_interval_ms` and try again, same cadence
1005 /// as the direct-claim path's `Ok(None)`.
1006 ///
1007 /// The `admin` client is the established HTTP surface
1008 /// (`FlowFabricAdminClient`) reused here so workers don't keep a
1009 /// second reqwest client around. Build once at worker boot and
1010 /// hand in by reference on every claim.
1011 pub async fn claim_via_server(
1012 &self,
1013 admin: &crate::FlowFabricAdminClient,
1014 lane: &LaneId,
1015 grant_ttl_ms: u64,
1016 ) -> Result<Option<ClaimedTask>, SdkError> {
1017 let req = crate::admin::ClaimForWorkerRequest {
1018 worker_id: self.config.worker_id.to_string(),
1019 lane_id: lane.to_string(),
1020 worker_instance_id: self.config.worker_instance_id.to_string(),
1021 capabilities: self.config.capabilities.clone(),
1022 grant_ttl_ms,
1023 };
1024 let Some(resp) = admin.claim_for_worker(req).await? else {
1025 return Ok(None);
1026 };
1027 let grant = resp.into_grant()?;
1028 self.claim_from_grant(lane.clone(), grant).await.map(Some)
1029 }
1030
1031 /// Consume a [`ResumeGrant`] and transition the granted
1032 /// `attempt_interrupted` execution into a `started` state on this
1033 /// worker. Symmetric partner to [`claim_from_grant`] for the
1034 /// resume path.
1035 ///
1036 /// **Renamed from `claim_from_reclaim_grant` (RFC-024 PR-B+C).**
1037 /// The new `claim_from_reclaim_grant` method lands with PR-G and
1038 /// dispatches to [`EngineBackend::reclaim_execution`] for the
1039 /// lease-reclaim path (distinct semantic, distinct grant type).
1040 ///
1041 /// The grant must have been issued to THIS worker (matching
1042 /// `worker_id` at grant time). A mismatch returns
1043 /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1044 /// atomically by `ff_claim_resumed_execution`; a second call with
1045 /// the same grant also returns `InvalidClaimGrant`.
1046 ///
1047 /// # Concurrency
1048 ///
1049 /// The worker's concurrency semaphore is checked BEFORE the FCALL
1050 /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1051 /// assume pre-existing capacity on this worker — a reclaim can
1052 /// land on a fresh worker instance that just came up after a
1053 /// crash/restart and is picking up a previously-interrupted
1054 /// execution. If the worker is saturated, the grant stays valid
1055 /// for its remaining TTL and the caller can release it or retry.
1056 ///
1057 /// On success the returned [`ClaimedTask`] holds a concurrency
1058 /// permit that releases automatically on
1059 /// `complete`/`fail`/`cancel`/drop.
1060 ///
1061 /// # Errors
1062 ///
1063 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1064 /// permits all held. Retryable; the grant is untouched (no
1065 /// FCALL was issued, so `ff_claim_resumed_execution` did not
1066 /// atomically consume the grant key).
1067 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1068 /// or `worker_id` mismatch.
1069 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1070 /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1071 /// `attempt_interrupted`.
1072 /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1073 /// not `runnable`.
1074 /// * `ScriptError::ExecutionNotFound` — core key missing.
1075 /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
1076 /// transport.
1077 ///
1078 /// [`ResumeGrant`]: ff_core::contracts::ResumeGrant
1079 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
1080 pub async fn claim_from_resume_grant(
1081 &self,
1082 grant: ff_core::contracts::ResumeGrant,
1083 ) -> Result<ClaimedTask, SdkError> {
1084 // Semaphore check FIRST — same load-bearing ordering as
1085 // `claim_from_grant`. If the worker is saturated, surface
1086 // WorkerAtCapacity without firing the FCALL; the FCALL is an
1087 // atomic consume on the grant key, so calling it past-
1088 // saturation would destroy the grant while leaving no
1089 // permit to attach to the returned `ClaimedTask`.
1090 let permit = self
1091 .concurrency_semaphore
1092 .clone()
1093 .try_acquire_owned()
1094 .map_err(|_| SdkError::WorkerAtCapacity)?;
1095
1096 // Grant carries partition + lane_id so no round-trip is needed
1097 // to resolve them before the FCALL.
1098 let partition = grant.partition().map_err(|e| SdkError::Config {
1099 context: "claim_from_resume_grant".to_owned(),
1100 field: Some("partition_key".to_owned()),
1101 message: e.to_string(),
1102 })?;
1103 let mut task = self
1104 .claim_resumed_execution(
1105 &grant.execution_id,
1106 &grant.lane_id,
1107 &partition,
1108 )
1109 .await?;
1110 task.set_concurrency_permit(permit);
1111 Ok(task)
1112 }
1113
1114 /// Consume a [`ReclaimGrant`] to mint a fresh attempt for a
1115 /// lease-expired / lease-revoked execution (RFC-024 §3.4).
1116 ///
1117 /// Backend-agnostic. Routes through
1118 /// [`EngineBackend::reclaim_execution`] on whatever backend the
1119 /// worker was connected with (Valkey via `connect` / `connect_with`,
1120 /// Postgres / SQLite via `connect_with`). Distinct from
1121 /// [`claim_from_resume_grant`]: reclaim creates a NEW attempt row
1122 /// and bumps `lease_reclaim_count` (`HandleKind::Reclaimed`), while
1123 /// resume re-uses the existing attempt under
1124 /// `ff_claim_resumed_execution` (`HandleKind::Resumed`).
1125 ///
1126 /// # Return shape
1127 ///
1128 /// Returns the raw [`ReclaimExecutionOutcome`] so consumers match
1129 /// on the four outcomes (`Claimed(Handle)`, `NotReclaimable`,
1130 /// `ReclaimCapExceeded`, `GrantNotFound`) and decide their
1131 /// dispatch. The `Claimed` variant carries a [`Handle`] whose
1132 /// `kind` is [`HandleKind::Reclaimed`]; downstream ops (complete,
1133 /// fail, renew, append_frame, …) take the handle directly via the
1134 /// `EngineBackend` trait.
1135 ///
1136 /// This contrasts with [`claim_from_resume_grant`], which wraps
1137 /// the handle in a [`ClaimedTask`] with a concurrency-permit and
1138 /// auto-lease-renewal loop. Those affordances are
1139 /// `valkey-default`-gated today (they depend on the bundled
1140 /// `ferriskey::Client` + `lease_ttl_ms` renewal timer). The
1141 /// reclaim surface is intentionally narrower so it compiles under
1142 /// `--no-default-features, features = ["sqlite"]` and consumers
1143 /// can drive the reclaim flow on any backend.
1144 ///
1145 /// # Feature compatibility
1146 ///
1147 /// No cfg-gate. Compiles + runs under every feature set ff-sdk
1148 /// supports (including sqlite-only). Verified by the compile-time
1149 /// type assertion
1150 /// `worker_claim_from_reclaim_grant_is_backend_agnostic_at_type_level`
1151 /// in `crates/ff-sdk/tests/rfc024_sdk.rs`, which pins the method's
1152 /// full signature under the default feature set and is paralleled
1153 /// by `sqlite_only_compile_surface_tests` in this file for the
1154 /// `--no-default-features, features = ["sqlite"]` compile anchor on
1155 /// the rest of the backend-agnostic surface.
1156 ///
1157 /// # worker_capabilities
1158 ///
1159 /// `ReclaimExecutionArgs::worker_capabilities` is NOT part of the
1160 /// Lua FCALL (the reclaim Lua validates grant consumption via
1161 /// `grant.worker_id == args.worker_id` only — see
1162 /// `crates/ff-script/src/flowfabric.lua:3088` and RFC-024 §4.4).
1163 /// Capability matching happens at grant-issuance time (see
1164 /// [`FlowFabricAdminClient::issue_reclaim_grant`]
1165 /// — in the `ff-sdk::admin` module, `valkey-default`-gated).
1166 ///
1167 /// # Errors
1168 ///
1169 /// * [`SdkError::Engine`] — the backend's `reclaim_execution`
1170 /// returned an [`EngineError`] (transport fault, validation
1171 /// failure, `Unavailable` from a backend that does not
1172 /// implement RFC-024 — currently only pre-RFC out-of-tree
1173 /// backends).
1174 ///
1175 /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1176 /// [`ReclaimExecutionOutcome`]: ff_core::contracts::ReclaimExecutionOutcome
1177 /// [`Handle`]: ff_core::backend::Handle
1178 /// [`HandleKind::Reclaimed`]: ff_core::backend::HandleKind::Reclaimed
1179 /// [`EngineBackend::reclaim_execution`]: ff_core::engine_backend::EngineBackend::reclaim_execution
1180 /// [`EngineError`]: ff_core::engine_error::EngineError
1181 /// [`claim_from_resume_grant`]: FlowFabricWorker::claim_from_resume_grant
1182 /// [`FlowFabricAdminClient::issue_reclaim_grant`]: crate::admin::FlowFabricAdminClient::issue_reclaim_grant
1183 pub async fn claim_from_reclaim_grant(
1184 &self,
1185 grant: ff_core::contracts::ReclaimGrant,
1186 args: ff_core::contracts::ReclaimExecutionArgs,
1187 ) -> Result<ff_core::contracts::ReclaimExecutionOutcome, SdkError> {
1188 // `ReclaimGrant` is accepted as a parameter so the call-site
1189 // shape matches RFC-024 §3.4 (consumer receives a grant from
1190 // `issue_reclaim_grant` and feeds it + args into
1191 // `claim_from_reclaim_grant`). The grant metadata is
1192 // already embedded in the backend's server-side store
1193 // (Valkey `claim_grant` hash, PG/SQLite `ff_claim_grant`
1194 // table) keyed by (execution_id, grant_key) — the trait
1195 // method looks it up from `args.execution_id`.
1196 //
1197 // The overlap between `ReclaimGrant` and
1198 // `ReclaimExecutionArgs` is (execution_id, lane_id);
1199 // mismatched grant/args is a consumer-side bug (grant-for-A
1200 // + args-for-B), and silently forwarding lets the SDK act
1201 // on the args execution. Reject the mismatch up front so
1202 // the misuse surfaces at the SDK boundary instead of
1203 // succeeding against the wrong execution. The grant's
1204 // `expires_at_ms` is validated against wall-clock now so
1205 // an already-expired grant is rejected without a backend
1206 // round-trip (the backend also enforces expiry, but the
1207 // SDK-side check gives a crisper error and preserves the
1208 // reclaim-budget / lease slot).
1209 // `TimestampMs` is an `i64` (Unix-epoch ms); cast to `u64` for
1210 // comparison against `ReclaimGrant::expires_at_ms`. Clamp
1211 // negatives to 0 so a pre-epoch system clock (vanishingly
1212 // unlikely, but representable) doesn't wrap to a future u64.
1213 let now_ms: u64 = ff_core::types::TimestampMs::now().0.max(0) as u64;
1214 validate_reclaim_grant_against_args(&grant, &args, now_ms)?;
1215 self.backend
1216 .reclaim_execution(args)
1217 .await
1218 .map_err(|e| SdkError::Engine(Box::new(e)))
1219 }
1220
1221 /// Low-level resume claim. Forwards through
1222 /// [`EngineBackend::claim_resumed_execution`](ff_core::engine_backend::EngineBackend::claim_resumed_execution)
1223 /// — the trait-level trigger surface landed in issue #150 — and
1224 /// returns a [`ClaimedTask`] bound to the resumed attempt.
1225 ///
1226 /// The method stays private; external callers use
1227 /// [`claim_from_resume_grant`].
1228 ///
1229 /// [`claim_from_resume_grant`]: FlowFabricWorker::claim_from_resume_grant
1230 async fn claim_resumed_execution(
1231 &self,
1232 execution_id: &ExecutionId,
1233 lane_id: &LaneId,
1234 partition: &ff_core::partition::Partition,
1235 ) -> Result<ClaimedTask, SdkError> {
1236 // v0.12 PR-3 — pre-read current_attempt_index via the trait
1237 // rather than an inline `HGET` on the Valkey client. Load-bearing:
1238 // the backend's KEYS[6] (Valkey) / `ff_attempt` PK tuple (PG/SQLite)
1239 // must target the real existing attempt hash/row, and the backend
1240 // takes the index verbatim from
1241 // `ClaimResumedExecutionArgs::current_attempt_index`.
1242 let att_idx = self
1243 .backend
1244 .read_current_attempt_index(execution_id)
1245 .await
1246 .map_err(|e| SdkError::Engine(Box::new(e)))?;
1247
1248 let args = ff_core::contracts::ClaimResumedExecutionArgs {
1249 execution_id: execution_id.clone(),
1250 worker_id: self.config.worker_id.clone(),
1251 worker_instance_id: self.config.worker_instance_id.clone(),
1252 lane_id: lane_id.clone(),
1253 lease_id: LeaseId::new(),
1254 lease_ttl_ms: self.config.lease_ttl_ms,
1255 current_attempt_index: att_idx,
1256 remaining_attempt_timeout_ms: None,
1257 now: TimestampMs::now(),
1258 };
1259
1260 let ff_core::contracts::ClaimResumedExecutionResult::Claimed(claimed) =
1261 self.backend.claim_resumed_execution(args).await?;
1262
1263 let (input_payload, execution_kind, tags) = self
1264 .read_execution_context(execution_id, partition)
1265 .await?;
1266
1267 Ok(ClaimedTask::new(
1268 self.backend.clone(),
1269 self.partition_config,
1270 claimed.handle,
1271 execution_id.clone(),
1272 claimed.attempt_index,
1273 claimed.attempt_id,
1274 claimed.lease_id,
1275 claimed.lease_epoch,
1276 self.config.lease_ttl_ms,
1277 lane_id.clone(),
1278 self.config.worker_instance_id.clone(),
1279 input_payload,
1280 execution_kind,
1281 tags,
1282 ))
1283 }
1284
1285 /// Read payload + execution_kind + tags from exec_core.
1286 ///
1287 /// As of v0.12 PR-1 this forwards through
1288 /// [`EngineBackend::read_execution_context`](ff_core::engine_backend::EngineBackend::read_execution_context)
1289 /// rather than issuing direct GET/HGET/HGETALL against Valkey. The
1290 /// outer `valkey-default` gate + `(&ExecutionId, &Partition)`
1291 /// signature are preserved; hot-path decoupling (ungating this
1292 /// helper + its call sites in `claim_execution` and
1293 /// `claim_resumed_execution`) is PR-4/PR-5 scope per the v0.12
1294 /// agnostic-SDK plan.
1295 async fn read_execution_context(
1296 &self,
1297 execution_id: &ExecutionId,
1298 _partition: &ff_core::partition::Partition,
1299 ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
1300 let ctx = self.backend.read_execution_context(execution_id).await?;
1301 Ok((ctx.input_payload, ctx.execution_kind, ctx.tags))
1302 }
1303
1304 // ── Phase 3: Signal delivery ──
1305
1306 /// Deliver a signal to a suspended execution's waitpoint.
1307 ///
1308 /// The engine atomically records the signal, evaluates the resume condition,
1309 /// and optionally transitions the execution from `suspended` to `runnable`.
1310 ///
1311 /// Forwards through
1312 /// [`EngineBackend::deliver_signal`](ff_core::engine_backend::EngineBackend::deliver_signal)
1313 /// — the trait-level trigger surface landed in issue #150.
1314 ///
1315 /// Backend-agnostic as of v0.12 PR-3. Compiles + runs under every
1316 /// feature set ff-sdk supports (including
1317 /// `--no-default-features --features sqlite`); pinned by
1318 /// `sqlite_only_compile_surface_tests::deliver_signal_addressable_under_sqlite_only`.
1319 pub async fn deliver_signal(
1320 &self,
1321 execution_id: &ExecutionId,
1322 waitpoint_id: &WaitpointId,
1323 signal: crate::task::Signal,
1324 ) -> Result<crate::task::SignalOutcome, SdkError> {
1325 let args = ff_core::contracts::DeliverSignalArgs {
1326 execution_id: execution_id.clone(),
1327 waitpoint_id: waitpoint_id.clone(),
1328 signal_id: ff_core::types::SignalId::new(),
1329 signal_name: signal.signal_name,
1330 signal_category: signal.signal_category,
1331 source_type: signal.source_type,
1332 source_identity: signal.source_identity,
1333 payload: signal.payload,
1334 payload_encoding: Some("json".to_owned()),
1335 correlation_id: None,
1336 idempotency_key: signal.idempotency_key,
1337 target_scope: "waitpoint".to_owned(),
1338 created_at: Some(TimestampMs::now()),
1339 dedup_ttl_ms: None,
1340 resume_delay_ms: None,
1341 max_signals_per_execution: None,
1342 signal_maxlen: None,
1343 waitpoint_token: signal.waitpoint_token,
1344 now: TimestampMs::now(),
1345 };
1346
1347 let result = self.backend.deliver_signal(args).await?;
1348 Ok(match result {
1349 ff_core::contracts::DeliverSignalResult::Accepted { signal_id, effect } => {
1350 if effect == "resume_condition_satisfied" {
1351 crate::task::SignalOutcome::TriggeredResume { signal_id }
1352 } else {
1353 crate::task::SignalOutcome::Accepted { signal_id, effect }
1354 }
1355 }
1356 ff_core::contracts::DeliverSignalResult::Duplicate { existing_signal_id } => {
1357 crate::task::SignalOutcome::Duplicate {
1358 existing_signal_id: existing_signal_id.to_string(),
1359 }
1360 }
1361 })
1362 }
1363
1364 #[cfg(feature = "direct-valkey-claim")]
1365 fn next_lane(&self) -> LaneId {
1366 let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
1367 self.config.lanes[idx].clone()
1368 }
1369}
1370
/// Decide whether a claim-path engine error should be skipped rather than
/// surfaced. When this returns `true`, `claim_next` logs the error and moves
/// on to the next partition.
///
/// Returns `true` when `ff_script` classifies the error as
/// `ErrorClass::Retryable` or `ErrorClass::Informational`.
#[cfg(feature = "direct-valkey-claim")]
fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
    use ff_core::error::ErrorClass;

    let class = ff_script::engine_error_ext::class(err);
    matches!(class, ErrorClass::Retryable | ErrorClass::Informational)
}
1379
/// Initial offset for [`FlowFabricWorker::scan_cursor`].
///
/// Hashes the worker instance id with FNV-1a, so distinct worker processes
/// start scanning different partition windows from their very first poll.
/// A fixed seed of zero would be fine for a single-worker cluster; hashing
/// spreads the initial scans when several workers run.
///
/// Returns 0 when `num_partitions` is 0. Otherwise the result is in
/// `0..num_partitions`.
#[cfg(feature = "direct-valkey-claim")]
fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
    if num_partitions == 0 {
        return 0;
    }
    let hash = ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes());
    // Reduce in u64 space *before* narrowing. On 32-bit targets,
    // `hash as usize` would keep only the low 32 bits of the hash. The
    // remainder is `< num_partitions`, which itself fits in `usize`, so the
    // final cast is lossless. On 64-bit targets the result is unchanged.
    (hash % num_partitions as u64) as usize
}
1391
1392/// Cross-check the [`ReclaimGrant`] handed back by
1393/// `issue_reclaim_grant` against the [`ReclaimExecutionArgs`] the
1394/// consumer is about to dispatch. Catches the grant-for-A + args-for-B
1395/// misuse at the SDK boundary before a backend round-trip (PR #407
1396/// review F1).
1397///
1398/// The overlap between the two types is `(execution_id, lane_id)`;
1399/// `partition_key` / `grant_key` live only on the grant and are
1400/// verified server-side. The grant's `expires_at_ms` is also
1401/// validated against `now_ms` so an expired grant fails fast without
1402/// burning a backend call (the backend enforces expiry too, but the
1403/// SDK-side check gives a crisper, typed error).
1404///
1405/// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1406/// [`ReclaimExecutionArgs`]: ff_core::contracts::ReclaimExecutionArgs
1407fn validate_reclaim_grant_against_args(
1408 grant: &ff_core::contracts::ReclaimGrant,
1409 args: &ff_core::contracts::ReclaimExecutionArgs,
1410 now_ms: u64,
1411) -> Result<(), SdkError> {
1412 if grant.execution_id != args.execution_id {
1413 return Err(SdkError::Config {
1414 context: "claim_from_reclaim_grant".to_owned(),
1415 field: Some("execution_id".to_owned()),
1416 message: format!(
1417 "grant.execution_id ({}) does not match args.execution_id ({})",
1418 grant.execution_id, args.execution_id
1419 ),
1420 });
1421 }
1422 if grant.lane_id != args.lane_id {
1423 return Err(SdkError::Config {
1424 context: "claim_from_reclaim_grant".to_owned(),
1425 field: Some("lane_id".to_owned()),
1426 message: format!(
1427 "grant.lane_id ({}) does not match args.lane_id ({})",
1428 grant.lane_id.as_str(),
1429 args.lane_id.as_str()
1430 ),
1431 });
1432 }
1433 if grant.expires_at_ms <= now_ms {
1434 return Err(SdkError::Config {
1435 context: "claim_from_reclaim_grant".to_owned(),
1436 field: Some("expires_at_ms".to_owned()),
1437 message: format!(
1438 "grant expired: expires_at_ms={} now_ms={}",
1439 grant.expires_at_ms, now_ms
1440 ),
1441 });
1442 }
1443 Ok(())
1444}
1445
#[cfg(test)]
mod reclaim_grant_validation_tests {
    //! Unit tests for `validate_reclaim_grant_against_args`, the SDK-side
    //! cross-check that catches a grant/args mismatch (PR #407 review F1).
    //! No Valkey or backend is required: the helper is pure.
    use super::validate_reclaim_grant_against_args;
    use crate::SdkError;
    use ff_core::contracts::{ReclaimExecutionArgs, ReclaimGrant};
    use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
    use ff_core::types::{
        AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseId, WorkerId, WorkerInstanceId,
    };

    const EXEC_A: &str = "{fp:7}:00000000-0000-4000-8000-000000000001";
    const EXEC_B: &str = "{fp:7}:00000000-0000-4000-8000-000000000002";

    fn exec(s: &str) -> ExecutionId {
        ExecutionId::parse(s).expect("valid execution id")
    }

    fn grant_for(execution_id: ExecutionId, lane: &str, expires_at_ms: u64) -> ReclaimGrant {
        ReclaimGrant::new(
            execution_id,
            PartitionKey::from(&Partition { family: PartitionFamily::Flow, index: 7 }),
            "reclaim:grant:abc".to_owned(),
            expires_at_ms,
            LaneId::new(lane),
        )
    }

    fn args_for(execution_id: ExecutionId, lane: &str) -> ReclaimExecutionArgs {
        ReclaimExecutionArgs::new(
            execution_id,
            WorkerId::new("w1"),
            WorkerInstanceId::new("w1-i1"),
            LaneId::new(lane),
            None,
            LeaseId::new(),
            30_000,
            AttemptId::new(),
            "{}".to_owned(),
            None,
            WorkerInstanceId::new("w1-i0"),
            AttemptIndex::new(0),
        )
    }

    /// Assert that `result` failed with an `SdkError::Config` blaming
    /// `expected_field`.
    fn assert_config_field(result: Result<(), SdkError>, expected_field: &str) {
        match result {
            Err(SdkError::Config { field, .. }) => {
                assert_eq!(field.as_deref(), Some(expected_field))
            }
            Err(other) => panic!("expected Config error, got {other:?}"),
            Ok(()) => panic!("expected `{expected_field}` validation to fail, got Ok"),
        }
    }

    #[test]
    fn accepts_matching_grant_and_args() {
        let g = grant_for(exec(EXEC_A), "main", 2_000);
        let a = args_for(exec(EXEC_A), "main");
        assert!(validate_reclaim_grant_against_args(&g, &a, 1_000).is_ok());
    }

    #[test]
    fn accepts_grant_expiring_one_ms_after_now() {
        // Boundary: the grant is still live at `expires_at_ms - 1`.
        let g = grant_for(exec(EXEC_A), "main", 1_001);
        let a = args_for(exec(EXEC_A), "main");
        assert!(validate_reclaim_grant_against_args(&g, &a, 1_000).is_ok());
    }

    #[test]
    fn rejects_mismatched_execution_id() {
        let g = grant_for(exec(EXEC_A), "main", 2_000);
        let a = args_for(exec(EXEC_B), "main");
        let result = validate_reclaim_grant_against_args(&g, &a, 1_000);
        assert_config_field(result, "execution_id");
    }

    #[test]
    fn rejects_mismatched_lane_id() {
        let g = grant_for(exec(EXEC_A), "main", 2_000);
        let a = args_for(exec(EXEC_A), "other");
        let result = validate_reclaim_grant_against_args(&g, &a, 1_000);
        assert_config_field(result, "lane_id");
    }

    #[test]
    fn rejects_expired_grant() {
        // The expiry check is inclusive (`expires_at_ms <= now_ms`). A grant
        // expiring exactly at `now_ms` is treated as expired, so the SDK
        // never forwards a grant that the server's TTL enforcement could
        // reject in the same instant.
        let g = grant_for(exec(EXEC_A), "main", 1_000);
        let a = args_for(exec(EXEC_A), "main");
        let at_expiry = validate_reclaim_grant_against_args(&g, &a, 1_000);
        assert_config_field(at_expiry, "expires_at_ms");

        // Also rejected once well past expiry.
        let past_expiry = validate_reclaim_grant_against_args(&g, &a, 5_000);
        assert_config_field(past_expiry, "expires_at_ms");
    }
}
1546
1547#[cfg(test)]
1548mod completion_accessor_type_tests {
1549 //! Type-level compile check that
1550 //! [`FlowFabricWorker::completion_backend`] returns an
1551 //! `Option<Arc<dyn CompletionBackend>>`. No Valkey required —
1552 //! the assertion is at the function-pointer type level and the
1553 //! #[test] body exists solely so the compiler elaborates it.
1554 use super::FlowFabricWorker;
1555 use ff_core::completion_backend::CompletionBackend;
1556 use std::sync::Arc;
1557
1558 #[test]
1559 fn completion_backend_accessor_signature() {
1560 // If this line compiles, the public accessor returns the
1561 // advertised type. The function is never called (no live
1562 // worker), so no I/O happens.
1563 let _f: fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>> =
1564 FlowFabricWorker::completion_backend;
1565 }
1566}
1567
1568/// RFC-023 Phase 1a §4.4 item 10g compile-only anchor. Parallels
1569/// `completion_accessor_type_tests` but fires under the sqlite-only
1570/// feature set: proves `FlowFabricWorker::connect_with` is callable
1571/// and returns the advertised type under `--no-default-features,
1572/// features = ["sqlite"]`. The matching `§9` CI cell (`cargo check
1573/// -p ff-sdk --no-default-features --features sqlite`) executes this
1574/// compile check mechanically so future PRs that accidentally reach
1575/// outside the `valkey-default` gate fail the build.
1576#[cfg(all(test, not(feature = "valkey-default")))]
1577mod sqlite_only_compile_surface_tests {
1578 use super::FlowFabricWorker;
1579 use ff_core::completion_backend::CompletionBackend;
1580 use ff_core::engine_backend::EngineBackend;
1581 use std::sync::Arc;
1582
1583 #[test]
1584 fn addressable_surface_under_sqlite_only() {
1585 // Type-level proof that the backend-agnostic accessors are
1586 // reachable without the `valkey-default` feature. None of
1587 // these are called; the assignment targets pin the signature.
1588 let _a: fn(
1589 &FlowFabricWorker,
1590 ) -> Option<&Arc<dyn EngineBackend>> = FlowFabricWorker::backend;
1591 let _b: fn(
1592 &FlowFabricWorker,
1593 ) -> Option<Arc<dyn CompletionBackend>> =
1594 FlowFabricWorker::completion_backend;
1595 let _c: fn(&FlowFabricWorker) -> &crate::config::WorkerConfig =
1596 FlowFabricWorker::config;
1597 let _d: fn(&FlowFabricWorker) -> &ff_core::partition::PartitionConfig =
1598 FlowFabricWorker::partition_config;
1599 }
1600
1601 /// v0.12 PR-1 compile anchor — the new
1602 /// [`EngineBackend::read_execution_context`] trait method MUST be
1603 /// addressable under `--no-default-features --features sqlite`. A
1604 /// direct fn-pointer cast is awkward under `#[async_trait]`
1605 /// (lifetime-generic fn items don't coerce to `fn` pointers), so
1606 /// we take the next-best compile proof: exercise a generic that
1607 /// names the method through the trait bound. A bodyless call would
1608 /// require a concrete backend; the generic keeps the test pure
1609 /// compile-time.
1610 #[test]
1611 fn read_execution_context_addressable_under_sqlite_only() {
1612 use ff_core::contracts::ExecutionContext;
1613 use ff_core::engine_error::EngineError;
1614 use ff_core::types::ExecutionId;
1615
1616 #[allow(dead_code)]
1617 async fn _pin<B: EngineBackend + ?Sized>(
1618 b: &B,
1619 id: &ExecutionId,
1620 ) -> Result<ExecutionContext, EngineError> {
1621 b.read_execution_context(id).await
1622 }
1623 }
1624
1625 /// v0.12 PR-2 compile anchor — `ClaimedTask` as a type MUST be
1626 /// addressable under `--no-default-features --features sqlite`
1627 /// (the `task` module is no longer `valkey-default`-gated at the
1628 /// module level). As of v0.12 PR-5.5 the `impl ClaimedTask { ... }`
1629 /// block is likewise ungated: the backend mints the `Handle` at
1630 /// claim time and `cloned_handle` just clones the cached field, so
1631 /// no Valkey-specific synthesis is required. This anchor pins the
1632 /// struct path + its field types through a generic-over-T wrapper
1633 /// so a compile-time lookup of `ClaimedTask` is exercised
1634 /// mechanically under the sqlite-only feature set.
1635 #[test]
1636 fn claimed_task_type_addressable_under_sqlite_only() {
1637 use crate::task::ClaimedTask;
1638
1639 #[allow(dead_code)]
1640 fn _pin<T>(_: std::marker::PhantomData<T>) -> std::marker::PhantomData<ClaimedTask> {
1641 std::marker::PhantomData
1642 }
1643 }
1644
1645 /// v0.12 PR-6 compile anchor — ungating `admin` + `snapshot`
1646 /// modules at module level must not re-introduce ferriskey
1647 /// symbols under `--no-default-features --features sqlite`.
1648 /// Pins that [`FlowFabricAdminClient::new`] and a representative
1649 /// snapshot forwarder are addressable without the
1650 /// `valkey-default` feature.
1651 #[test]
1652 fn admin_and_snapshot_addressable_under_sqlite_only() {
1653 use crate::admin::FlowFabricAdminClient;
1654 use crate::SdkError;
1655 use ff_core::contracts::ExecutionSnapshot;
1656 use ff_core::types::ExecutionId;
1657
1658 // `FlowFabricAdminClient::new` is `fn(impl Into<String>)` —
1659 // can't fn-pointer-coerce directly. A no-op call with a
1660 // concrete `&str` pins the method addressably under the
1661 // sqlite-only feature set (the error return-type is the
1662 // same `SdkError` the rest of the module returns).
1663 #[allow(dead_code)]
1664 fn _pin_admin_new() -> Result<FlowFabricAdminClient, SdkError> {
1665 FlowFabricAdminClient::new("http://anchor")
1666 }
1667
1668 // Snapshot method is `async`, so coerce via the same
1669 // trait-bound pattern as `read_execution_context` above.
1670 #[allow(dead_code)]
1671 async fn _pin_describe(
1672 w: &FlowFabricWorker,
1673 id: &ExecutionId,
1674 ) -> Result<Option<ExecutionSnapshot>, SdkError> {
1675 w.describe_execution(id).await
1676 }
1677 }
1678
1679 /// v0.12 PR-3 compile anchor — the new
1680 /// [`EngineBackend::read_current_attempt_index`] trait method MUST
1681 /// be addressable under `--no-default-features --features sqlite`.
1682 /// Mirrors the PR-1 `read_execution_context` anchor: takes a
1683 /// generic over the trait bound so `#[async_trait]` lifetime
1684 /// elision doesn't block an `fn`-pointer coercion.
1685 #[test]
1686 fn read_current_attempt_index_addressable_under_sqlite_only() {
1687 use ff_core::engine_error::EngineError;
1688 use ff_core::types::{AttemptIndex, ExecutionId};
1689
1690 #[allow(dead_code)]
1691 async fn _pin<B: EngineBackend + ?Sized>(
1692 b: &B,
1693 id: &ExecutionId,
1694 ) -> Result<AttemptIndex, EngineError> {
1695 b.read_current_attempt_index(id).await
1696 }
1697 }
1698
1699 /// v0.12 PR-5.5 retry-path-fix compile anchor — the new
1700 /// [`EngineBackend::read_total_attempt_count`] trait method MUST
1701 /// be addressable under `--no-default-features --features sqlite`.
1702 /// Mirrors the PR-3 `read_current_attempt_index` anchor.
1703 #[test]
1704 fn read_total_attempt_count_addressable_under_sqlite_only() {
1705 use ff_core::engine_error::EngineError;
1706 use ff_core::types::{AttemptIndex, ExecutionId};
1707
1708 #[allow(dead_code)]
1709 async fn _pin<B: EngineBackend + ?Sized>(
1710 b: &B,
1711 id: &ExecutionId,
1712 ) -> Result<AttemptIndex, EngineError> {
1713 b.read_total_attempt_count(id).await
1714 }
1715 }
1716
1717 /// v0.12 PR-4 compile anchor — the new
1718 /// [`EngineBackend::claim_execution`] trait method MUST be
1719 /// addressable under `--no-default-features --features sqlite`.
1720 /// Mirrors the PR-3 `read_current_attempt_index` anchor: takes a
1721 /// generic over the trait bound so `#[async_trait]` lifetime
1722 /// elision doesn't block a plain fn-pointer coercion.
1723 ///
1724 /// The method has an `Err(Unavailable)` default impl; PG + SQLite
1725 /// backends don't override it today (grants are Valkey-only until
1726 /// the PG/SQLite grant-consumer RFC lands). The anchor pins the
1727 /// trait-surface signature — not a runtime call — so the compile
1728 /// check passes cleanly on every feature set.
1729 #[test]
1730 fn claim_execution_addressable_under_sqlite_only() {
1731 use ff_core::contracts::{ClaimExecutionArgs, ClaimExecutionResult};
1732 use ff_core::engine_error::EngineError;
1733
1734 #[allow(dead_code)]
1735 async fn _pin<B: EngineBackend + ?Sized>(
1736 b: &B,
1737 args: ClaimExecutionArgs,
1738 ) -> Result<ClaimExecutionResult, EngineError> {
1739 b.claim_execution(args).await
1740 }
1741 }
1742
1743 /// v0.12 PR-5 compile anchor — the three new scanner-primitive
1744 /// trait methods (`scan_eligible_executions`, `issue_claim_grant`,
1745 /// `block_route`) MUST be addressable under
1746 /// `--no-default-features --features sqlite`. Each has an
1747 /// `Err(Unavailable)` default impl; PG + SQLite backends don't
1748 /// override (the scheduler-routed `claim_for_worker` path is the
1749 /// supported PG/SQLite entry point). The anchor pins the trait-
1750 /// surface signatures — not runtime calls — so the compile check
1751 /// passes cleanly on every feature set.
1752 #[test]
1753 fn scan_eligible_executions_addressable_under_sqlite_only() {
1754 use ff_core::contracts::ScanEligibleArgs;
1755 use ff_core::engine_error::EngineError;
1756 use ff_core::types::ExecutionId;
1757
1758 #[allow(dead_code)]
1759 async fn _pin<B: EngineBackend + ?Sized>(
1760 b: &B,
1761 args: ScanEligibleArgs,
1762 ) -> Result<Vec<ExecutionId>, EngineError> {
1763 b.scan_eligible_executions(args).await
1764 }
1765 }
1766
1767 #[test]
1768 fn issue_claim_grant_addressable_under_sqlite_only() {
1769 use ff_core::contracts::{IssueClaimGrantArgs, IssueClaimGrantOutcome};
1770 use ff_core::engine_error::EngineError;
1771
1772 #[allow(dead_code)]
1773 async fn _pin<B: EngineBackend + ?Sized>(
1774 b: &B,
1775 args: IssueClaimGrantArgs,
1776 ) -> Result<IssueClaimGrantOutcome, EngineError> {
1777 b.issue_claim_grant(args).await
1778 }
1779 }
1780
1781 #[test]
1782 fn block_route_addressable_under_sqlite_only() {
1783 use ff_core::contracts::{BlockRouteArgs, BlockRouteOutcome};
1784 use ff_core::engine_error::EngineError;
1785
1786 #[allow(dead_code)]
1787 async fn _pin<B: EngineBackend + ?Sized>(
1788 b: &B,
1789 args: BlockRouteArgs,
1790 ) -> Result<BlockRouteOutcome, EngineError> {
1791 b.block_route(args).await
1792 }
1793 }
1794
1795 /// v0.12 PR-3 compile anchor — `FlowFabricWorker::deliver_signal`
1796 /// MUST be addressable under `--no-default-features --features sqlite`
1797 /// (ungated in PR-3 — the body is pure
1798 /// `self.backend.deliver_signal(...)` trait dispatch).
1799 #[test]
1800 fn deliver_signal_addressable_under_sqlite_only() {
1801 use crate::task::{Signal, SignalOutcome};
1802 use crate::SdkError;
1803 use ff_core::types::{ExecutionId, WaitpointId};
1804 use std::future::Future;
1805 use std::pin::Pin;
1806
1807 // `deliver_signal` is `async fn`, so its item signature bakes
1808 // in a hidden lifetime and opaque return future. Take an
1809 // `fn`-pointer to an explicit wrapper that names the return
1810 // type through the trait — same shape as the PR-1 anchor
1811 // (`read_execution_context_addressable_under_sqlite_only`).
1812 #[allow(dead_code)]
1813 fn _pin<'a>(
1814 w: &'a FlowFabricWorker,
1815 id: &'a ExecutionId,
1816 wp: &'a WaitpointId,
1817 s: Signal,
1818 ) -> Pin<Box<dyn Future<Output = Result<SignalOutcome, SdkError>> + Send + 'a>> {
1819 Box::pin(w.deliver_signal(id, wp, s))
1820 }
1821 }
1822
1823 /// v0.12 PR-5.5 compile anchor — `claim_from_grant` MUST be
1824 /// addressable under `--no-default-features --features sqlite`
1825 /// (ungated in PR-5.5). PG / SQLite return `EngineError::Unavailable`
1826 /// from the underlying trait method today, so the call path is
1827 /// compile-reachable but runtime-unavailable. The anchor pins the
1828 /// signature; a future PR wiring real PG/SQLite grant-consumer
1829 /// bodies flips the runtime behaviour without touching this test.
1830 #[test]
1831 fn claim_from_grant_addressable_under_sqlite_only() {
1832 use crate::task::ClaimedTask;
1833 use crate::SdkError;
1834 use ff_core::contracts::ClaimGrant;
1835 use ff_core::types::LaneId;
1836 use std::future::Future;
1837 use std::pin::Pin;
1838
1839 #[allow(dead_code)]
1840 fn _pin<'a>(
1841 w: &'a FlowFabricWorker,
1842 lane: LaneId,
1843 grant: ClaimGrant,
1844 ) -> Pin<Box<dyn Future<Output = Result<ClaimedTask, SdkError>> + Send + 'a>> {
1845 Box::pin(w.claim_from_grant(lane, grant))
1846 }
1847 }
1848
1849 /// v0.12 PR-5.5 compile anchor — `claim_via_server` MUST be
1850 /// addressable under `--no-default-features --features sqlite`.
1851 /// The scheduler-routed path is the supported PG/SQLite claim
1852 /// entry point per `project_claim_from_grant_pg_sqlite_gap.md`;
1853 /// pinning the signature here prevents a future PR from
1854 /// accidentally re-gating it behind `valkey-default`.
1855 #[test]
1856 fn claim_via_server_addressable_under_sqlite_only() {
1857 use crate::admin::FlowFabricAdminClient;
1858 use crate::task::ClaimedTask;
1859 use crate::SdkError;
1860 use ff_core::types::LaneId;
1861 use std::future::Future;
1862 use std::pin::Pin;
1863
1864 #[allow(dead_code)]
1865 fn _pin<'a>(
1866 w: &'a FlowFabricWorker,
1867 admin: &'a FlowFabricAdminClient,
1868 lane: &'a LaneId,
1869 grant_ttl_ms: u64,
1870 ) -> Pin<Box<dyn Future<Output = Result<Option<ClaimedTask>, SdkError>> + Send + 'a>>
1871 {
1872 Box::pin(w.claim_via_server(admin, lane, grant_ttl_ms))
1873 }
1874 }
1875
1876 /// v0.12 PR-5.5 compile anchor — `ClaimedTask::{complete, fail,
1877 /// cancel}` MUST be addressable under `--no-default-features
1878 /// --features sqlite`. The `impl ClaimedTask` block is now
1879 /// module-level ungated (PR-5.5); the terminal ops route through
1880 /// `EngineBackend::{complete, fail, cancel}` which are core trait
1881 /// methods (no `streaming` / `suspension` / `budget` gate).
1882 #[test]
1883 fn claimed_task_terminal_ops_addressable_under_sqlite_only() {
1884 use crate::task::{ClaimedTask, FailOutcome};
1885 use crate::SdkError;
1886 use std::future::Future;
1887 use std::pin::Pin;
1888
1889 #[allow(dead_code)]
1890 fn _pin_complete(
1891 t: ClaimedTask,
1892 payload: Option<Vec<u8>>,
1893 ) -> Pin<Box<dyn Future<Output = Result<(), SdkError>> + Send>> {
1894 Box::pin(t.complete(payload))
1895 }
1896
1897 #[allow(dead_code)]
1898 fn _pin_fail<'a>(
1899 t: ClaimedTask,
1900 reason: &'a str,
1901 error_category: &'a str,
1902 ) -> Pin<Box<dyn Future<Output = Result<FailOutcome, SdkError>> + Send + 'a>> {
1903 Box::pin(t.fail(reason, error_category))
1904 }
1905
1906 #[allow(dead_code)]
1907 fn _pin_cancel<'a>(
1908 t: ClaimedTask,
1909 reason: &'a str,
1910 ) -> Pin<Box<dyn Future<Output = Result<(), SdkError>> + Send + 'a>> {
1911 Box::pin(t.cancel(reason))
1912 }
1913 }
1914}
1915
1916#[cfg(all(test, feature = "direct-valkey-claim"))]
1917mod scan_cursor_tests {
1918 use super::scan_cursor_seed;
1919
1920 #[test]
1921 fn stable_for_same_input() {
1922 assert_eq!(scan_cursor_seed("w1", 256), scan_cursor_seed("w1", 256));
1923 }
1924
1925 #[test]
1926 fn distinct_for_different_ids() {
1927 assert_ne!(scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
1928 }
1929
1930 #[test]
1931 fn bounded_by_partition_count() {
1932 for i in 0..100 {
1933 assert!(scan_cursor_seed(&format!("w{i}"), 256) < 256);
1934 }
1935 }
1936
1937 #[test]
1938 fn zero_partitions_returns_zero() {
1939 assert_eq!(scan_cursor_seed("w1", 0), 0);
1940 }
1941}