ff_sdk/worker.rs
1use std::collections::HashMap;
2#[cfg(feature = "direct-valkey-claim")]
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5
6// RFC-023 Phase 1a (§4.4 item 10b): ferriskey imports scoped behind
7// `valkey-default` so the module compiles under
8// `--no-default-features, features = ["sqlite"]`.
9#[cfg(feature = "valkey-default")]
10use ferriskey::Client;
11use ff_core::partition::PartitionConfig;
12use ff_core::types::*;
13use tokio::sync::Semaphore;
14
15use crate::config::WorkerConfig;
16use crate::task::ClaimedTask;
17use crate::SdkError;
18
19/// FlowFabric worker — connects to Valkey, claims executions, and provides
20/// the worker-facing API.
21///
22/// # Admission control
23///
24/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
25/// **bypasses the scheduler's admission controls**: it reads the eligible
26/// ZSET directly and mints its own claim grant without consulting budget
27/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
28/// benchmarks, tests, and single-tenant development where the scheduler
29/// hop is measurement noise, not for production.
30///
31/// For production deployments, consume scheduler-issued grants via
32/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
33/// budget breach, quota sliding-window, concurrency cap, and
34/// capability-match checks before issuing grants.
35///
36/// # Usage
37///
38/// ```rust,ignore
39/// use ff_core::backend::BackendConfig;
40/// use ff_core::types::{LaneId, Namespace, WorkerId, WorkerInstanceId};
41/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
42///
43/// let config = WorkerConfig {
44/// backend: Some(BackendConfig::valkey("localhost", 6379)),
45/// worker_id: WorkerId::new("w1"),
46/// worker_instance_id: WorkerInstanceId::new("w1-i1"),
47/// namespace: Namespace::new("default"),
48/// lanes: vec![LaneId::new("main")],
49/// capabilities: Vec::new(),
50/// lease_ttl_ms: 30_000,
51/// claim_poll_interval_ms: 1_000,
52/// max_concurrent_tasks: 1,
53/// partition_config: None,
54/// };
55/// let worker = FlowFabricWorker::connect(config).await?;
56///
57/// loop {
58/// if let Some(task) = worker.claim_next().await? {
59/// // Process task...
60/// task.complete(Some(b"result".to_vec())).await?;
61/// } else {
62/// tokio::time::sleep(Duration::from_secs(1)).await;
63/// }
64/// }
65/// ```
66pub struct FlowFabricWorker {
67 /// RFC-023 Phase 1a (§4.4 item 10c): `ferriskey::Client` is
68 /// `valkey-default`-gated **and** `Option` wrapped. Under
69 /// sqlite-only features the field is absent. Under
70 /// `valkey-default` the field is `Some(client)` when the worker
71 /// was built via [`Self::connect`] (the Valkey-bundled entry
72 /// point that dials), and `None` when it was built via
73 /// [`Self::connect_with`] (the backend-agnostic entry point per
74 /// §4.4 item 10e — no ferriskey round-trip). Claim/signal hot
75 /// paths (all `valkey-default`-gated per §4.4 item 10f) expect
76 /// `Some`; a claim call against a `connect_with`-built worker
77 /// panics with a clear message until the backend-agnostic SDK
78 /// worker-loop RFC lands (tracked in §8).
79 #[cfg(feature = "valkey-default")]
80 #[allow(dead_code)]
81 client: Option<Client>,
82 config: WorkerConfig,
83 partition_config: PartitionConfig,
84 /// 8-hex FNV-1a digest of the sorted capabilities CSV. Used in
85 /// per-mismatch logs so the 4KB CSV never echoes on every reject
86 /// during an incident. For [`Self::connect`]-built workers, the
87 /// full CSV is additionally logged once at connect-time WARN via
88 /// `valkey_preamble::run` for cross-reference; [`Self::connect_with`]-
89 /// built workers compute the hash without the companion CSV log.
90 /// Mirrors `ff-scheduler::claim::worker_caps_digest`.
91 #[cfg(feature = "direct-valkey-claim")]
92 worker_capabilities_hash: String,
93 #[cfg(feature = "direct-valkey-claim")]
94 lane_index: AtomicUsize,
95 /// Concurrency cap for in-flight tasks. Permits are acquired or
96 /// transferred by [`claim_next`] (feature-gated),
97 /// [`claim_from_grant`] (always available), and
98 /// [`claim_from_reclaim_grant`], transferred to the returned
99 /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
100 /// Holds `max_concurrent_tasks` permits total.
101 ///
102 /// [`claim_next`]: FlowFabricWorker::claim_next
103 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
104 /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
105 #[cfg_attr(not(feature = "valkey-default"), allow(dead_code))]
106 concurrency_semaphore: Arc<Semaphore>,
107 /// Rolling offset for chunked partition scans. Each poll advances the
108 /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
109 /// chunk)` polls every partition is covered. The initial value is
110 /// derived from `worker_instance_id` so idle workers spread their
111 /// scans across different partitions from the first poll onward.
112 ///
113 /// Overflow: on 64-bit targets `usize` is `u64` — overflow after
114 /// ~2^64 polls (billions of years at any realistic rate). On 32-bit
115 /// targets (wasm32, i686) `usize` is `u32` and wraps after ~4 years
116 /// at 1 poll/sec — acceptable; on wrap, the modulo preserves
117 /// correctness because the sequence simply restarts a new cycle.
118 #[cfg(feature = "direct-valkey-claim")]
119 scan_cursor: AtomicUsize,
120 /// The [`EngineBackend`] the Stage-1b trait forwarders route
121 /// through.
122 ///
123 /// **RFC-012 Stage 1b.** Always populated:
124 /// [`FlowFabricWorker::connect`] now wraps the worker's own
125 /// `ferriskey::Client` in a `ValkeyBackend` via
126 /// `ValkeyBackend::from_client_and_partitions`, and
127 /// [`FlowFabricWorker::connect_with`] replaces that default with
128 /// the caller-supplied `Arc<dyn EngineBackend>`. The
129 /// [`FlowFabricWorker::backend`] accessor still returns
130 /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
131 /// narrows the return type once consumers have migrated.
132 ///
133 /// Hot paths (claim, deliver_signal, admin queries) still use the
134 /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
135 /// migrates them through this field, and Stage 1d removes the
136 /// embedded client.
137 backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
138 /// Optional handle to the same underlying backend viewed as a
139 /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
140 /// Populated by [`Self::connect`] from the bundled
141 /// `ValkeyBackend` (which implements the trait); supplied by the
142 /// caller on [`Self::connect_with`] as an explicit
143 /// `Option<Arc<dyn CompletionBackend>>` — `None` means "this
144 /// backend does not support push-based completion" (e.g. a future
145 /// Postgres backend without LISTEN/NOTIFY, or a test mock). Cairn
146 /// and other completion-subscription consumers reach this through
147 /// [`Self::completion_backend`].
148 completion_backend_handle:
149 Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
150}
151
/// Maximum number of partitions scanned per `claim_next()` poll.
///
/// Bounds idle Valkey load to O(PARTITION_SCAN_CHUNK) scan round-trips per
/// poll, instead of O(num_flow_partitions). `claim_next` clamps this to the
/// configured partition count when that count is smaller.
#[cfg(feature = "direct-valkey-claim")]
const PARTITION_SCAN_CHUNK: usize = 32;
157
158impl FlowFabricWorker {
159 /// RFC-023 Phase 1a helper: borrow the embedded `ferriskey::Client`,
160 /// panicking with a clear message if the worker was built via
161 /// [`Self::connect_with`] (which does not dial Valkey). Every
162 /// Valkey-specific claim/signal method funnels through this accessor
163 /// so the panic site is uniform and traceable.
164 ///
165 /// A backend-agnostic claim/signal API is deferred per §8 breaking-
166 /// change disclosure; until that lands, valkey-default consumers
167 /// must use [`Self::connect`] if they intend to drive the Valkey
168 /// claim/signal loop.
169 #[cfg(feature = "valkey-default")]
170 #[inline]
171 #[allow(dead_code)]
172 fn valkey_client(&self) -> &Client {
173 self.client.as_ref().expect(
174 "FlowFabricWorker was built via connect_with (no Valkey dial) \
175 but a Valkey-specific claim/signal method was invoked. \
176 Use FlowFabricWorker::connect to dial Valkey, or drive the \
177 backend directly through the trait surface via .backend().",
178 )
179 }
180}
181
182impl FlowFabricWorker {
/// Connect to Valkey and prepare the worker.
///
/// Dials Valkey from `config.backend` and runs the Valkey-specific preamble
/// (`valkey_preamble::run`). It then wraps the dialed client in a single
/// `ValkeyBackend`, which serves as both the worker's
/// [`EngineBackend`](ff_core::engine_backend::EngineBackend) and its
/// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
///
/// Does NOT load the FlowFabric library — that is the server's
/// responsibility (ff-server calls `ff_script::loader::ensure_library()` on
/// startup). The SDK assumes the library is already loaded.
///
/// # Smoke / dev scripts: rotate `WorkerInstanceId`
///
/// The SDK writes a SET-NX liveness sentinel keyed on the worker's
/// `WorkerInstanceId`. When a smoke / dev script reuses the same
/// `WorkerInstanceId` across restarts, later runs are blocked by the prior
/// run's SET-NX until the liveness key's TTL (≈ 2× the configured lease
/// TTL) expires — the worker appears stuck and claims nothing. Iterative
/// scripts should synthesise a fresh `WorkerInstanceId` per process, for
/// example by embedding a UUID or timestamp in the value passed to
/// `WorkerInstanceId::new`, rather than hard-coding a stable value.
/// Production workers that cleanly shut down release the key; only
/// crashed / kill -9'd processes hit this trap.
///
/// **RFC-023 Phase 1a (§4.4 item 10d, v0.12.0):** this Valkey-bundled
/// entry point is `#[cfg(feature = "valkey-default")]`-gated.
/// Consumers on the sqlite-only feature set must use
/// [`FlowFabricWorker::connect_with`] and drive the backend directly
/// through the trait surface.
///
/// # Errors
///
/// - [`SdkError::Config`] if `config.lanes` is empty, or if
///   `config.backend` is `None`. Both checks run before any network I/O.
/// - Any error from `ff_backend_valkey::build_client` (the dial) or from
///   `valkey_preamble::run`, propagated with `?`.
#[cfg(feature = "valkey-default")]
pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
    if config.lanes.is_empty() {
        return Err(SdkError::Config {
            context: "worker_config".into(),
            field: None,
            message: "at least one lane is required".into(),
        });
    }

    // Build the ferriskey client from the nested `BackendConfig`.
    // Delegates to `ff_backend_valkey::build_client` so host/port +
    // TLS + cluster + `BackendTimeouts::request` + `BackendRetry`
    // wiring lives in exactly one place (RFC-012 Stage 1c tranche 1).
    //
    // v0.13 ergonomics fix (feedback_sdk_reclaim_ergonomics
    // Finding 2): `WorkerConfig.backend` is `Option<BackendConfig>`
    // because `connect_with` ignores it. This URL-based `connect`
    // path still requires a concrete `BackendConfig` to dial, so it
    // rejects `None` up front with a typed Config error instead of
    // silently using a default that would hide a caller mistake.
    let backend_config = config.backend.as_ref().ok_or_else(|| SdkError::Config {
        context: "worker_config".into(),
        field: Some("backend".into()),
        message: "FlowFabricWorker::connect requires WorkerConfig.backend = \
                  Some(BackendConfig::...); use FlowFabricWorker::connect_with for \
                  the backend-agnostic path that takes a pre-built \
                  Arc<dyn EngineBackend>"
            .into(),
    })?;
    let client = ff_backend_valkey::build_client(backend_config).await?;

    // v0.12 PR-6: the Valkey-specific preamble (PING, alive-key
    // SET-NX, `ff:config:partitions` HGETALL, caps ingress +
    // sorted-dedup CSV, caps STRING + workers-index writes) lives
    // in `crate::valkey_preamble`. It was extracted byte-for-byte from
    // the pre-PR-6 inline body. The write order is observable from
    // scheduler-side reads (unblock scanner:
    // SMEMBERS ff:idx:workers → GET ff:worker:{id}:caps), so
    // preserving it is load-bearing. See `valkey_preamble::run`.
    let crate::valkey_preamble::PreambleOutput {
        partition_config,
        #[cfg(feature = "direct-valkey-claim")]
            capabilities_csv: _worker_capabilities_csv,
        #[cfg(feature = "direct-valkey-claim")]
            capabilities_hash: worker_capabilities_hash,
    } = crate::valkey_preamble::run(&client, &config).await?;

    // A configured `0` is clamped to one permit so the worker can still
    // make progress.
    let max_tasks = config.max_concurrent_tasks.max(1);
    let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));

    tracing::info!(
        worker_id = %config.worker_id,
        instance_id = %config.worker_instance_id,
        lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
        "FlowFabricWorker connected"
    );

    #[cfg(feature = "direct-valkey-claim")]
    let scan_cursor_init = scan_cursor_seed(
        config.worker_instance_id.as_str(),
        partition_config.num_flow_partitions.max(1) as usize,
    );

    // RFC-012 Stage 1b: wrap the dialed client in a ValkeyBackend
    // so `ClaimedTask`'s trait forwarders have something to call.
    // `from_client_and_partitions` reuses the already-dialed client
    // — no second connection. The concrete `Arc<ValkeyBackend>` is
    // shared across the two trait objects: one allocation, and both
    // accessors yield identity-equivalent handles.
    let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
        ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
            client.clone(),
            partition_config,
        );
    let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
    let completion_backend_handle: Option<
        Arc<dyn ff_core::completion_backend::CompletionBackend>,
    > = Some(valkey_backend);

    Ok(Self {
        client: Some(client),
        config,
        partition_config,
        #[cfg(feature = "direct-valkey-claim")]
        worker_capabilities_hash,
        #[cfg(feature = "direct-valkey-claim")]
        lane_index: AtomicUsize::new(0),
        concurrency_semaphore,
        #[cfg(feature = "direct-valkey-claim")]
        scan_cursor: AtomicUsize::new(scan_cursor_init),
        backend,
        completion_backend_handle,
    })
}
302
303 /// Store pre-built [`EngineBackend`] and (optional)
304 /// [`CompletionBackend`] handles on the worker. Builds the worker
305 /// via the legacy [`FlowFabricWorker::connect`] path first (so the
306 /// embedded `ferriskey::Client` that the Stage 1b non-migrated hot
307 /// paths still use is dialed), then replaces the default
308 /// `ValkeyBackend` wrapper with the caller-supplied trait objects.
309 ///
310 /// The `completion` argument is explicit: 0.3.3 previously accepted
311 /// only `backend` and `completion_backend()` silently returned
312 /// `None` on this path because `Arc<dyn EngineBackend>` cannot be
313 /// upcast to `Arc<dyn CompletionBackend>` without loss of
314 /// trait-object identity. 0.3.4 lets the caller decide.
315 ///
316 /// - `Some(arc)` — caller supplies a completion backend.
317 /// [`Self::completion_backend`] returns `Some(clone)`.
318 /// - `None` — this backend does not support push-based completion
319 /// (future Postgres backend without LISTEN/NOTIFY, test mocks).
320 /// [`Self::completion_backend`] returns `None`.
321 ///
322 /// When the underlying backend implements both traits (as
323 /// `ValkeyBackend` does), pass the same `Arc` twice — the two
324 /// trait-object views share one allocation:
325 ///
326 /// ```rust,ignore
327 /// use std::sync::Arc;
328 /// use ff_backend_valkey::ValkeyBackend;
329 /// use ff_sdk::{FlowFabricWorker, WorkerConfig};
330 ///
331 /// # async fn doc(worker_config: WorkerConfig,
332 /// # backend_config: ff_backend_valkey::BackendConfig)
333 /// # -> Result<(), ff_sdk::SdkError> {
334 /// // Valkey (completion supported):
335 /// let valkey = Arc::new(ValkeyBackend::connect(backend_config).await?);
336 /// let worker = FlowFabricWorker::connect_with(
337 /// worker_config,
338 /// valkey.clone(),
339 /// Some(valkey),
340 /// ).await?;
341 /// # Ok(()) }
342 /// ```
343 ///
344 /// Backend without completion support:
345 ///
346 /// ```rust,ignore
347 /// let worker = FlowFabricWorker::connect_with(
348 /// worker_config,
349 /// backend,
350 /// None,
351 /// ).await?;
352 /// ```
353 ///
354 /// **Stage 1b + Round-7 scope — what the injected backend covers
355 /// today.** The injected backend currently covers these per-task
356 /// `ClaimedTask` ops: `update_progress` / `resume_signals` /
357 /// `delay_execution` / `move_to_waiting_children` / `complete` /
358 /// `cancel` / `fail` / `create_pending_waitpoint` /
359 /// `append_frame` / `report_usage`. A mock backend therefore sees
360 /// that portion of the worker's per-task write surface. Lease
361 /// renewal also routes through `backend.renew(&handle)`. Round-7
362 /// (#135/#145) closed the four trait-shape gaps tracked by #117,
363 /// but `suspend` still reaches the embedded `ferriskey::Client`
364 /// directly via `ff_suspend_execution` — this is the deferred
365 /// suspend per RFC-012 §R7.6.1, pending Stage 1d input-shape
366 /// work. `claim_next` / `claim_from_grant` /
367 /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
368 /// are Stage 1c hot-path work. Stage 1d removes the embedded
369 /// client entirely.
370 ///
371 /// Today's constructor is therefore NOT yet a drop-in way to swap
372 /// in a non-Valkey backend — it requires a reachable Valkey node
373 /// for `suspend` plus the remaining hot-path ops. Tests that
374 /// exercise only the migrated per-task ops can run fully against
375 /// a mock backend.
376 ///
377 /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
378 /// [`CompletionBackend`]: ff_core::completion_backend::CompletionBackend
379 pub async fn connect_with(
380 config: WorkerConfig,
381 backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
382 completion: Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
383 ) -> Result<Self, SdkError> {
384 // RFC-023 Phase 1a (§4.4 item 10e, v0.12.0): direct
385 // backend-agnostic construction. No `Self::connect` preamble,
386 // no ferriskey round-trips (no PING, no alive-key SET-NX, no
387 // `ff:config:partitions` HGETALL). Callers needing a
388 // non-default `PartitionConfig` under non-Valkey backends set
389 // [`WorkerConfig::partition_config`] (v0.12 PR-6 closed the
390 // original follow-up flagged here).
391 //
392 // Lane-empty validation (the one check `connect` did before
393 // any Valkey work) is hoisted here so every entry point
394 // refuses an empty lane list.
395 if config.lanes.is_empty() {
396 return Err(SdkError::Config {
397 context: "worker_config".into(),
398 field: None,
399 message: "at least one lane is required".into(),
400 });
401 }
402
403 // v0.13 ergonomics fix (feedback_sdk_reclaim_ergonomics
404 // Finding 2): `WorkerConfig.backend` is ignored on this path —
405 // the caller-supplied `Arc<dyn EngineBackend>` is authoritative.
406 // If the caller left behind a `Some(..)` from an earlier
407 // `connect(..)` template, warn so the ambiguity is visible.
408 if config.backend.is_some() {
409 tracing::warn!(
410 worker_id = %config.worker_id,
411 instance_id = %config.worker_instance_id,
412 "WorkerConfig.backend is Some(..) but FlowFabricWorker::connect_with \
413 was invoked — BackendConfig is ignored, the injected \
414 Arc<dyn EngineBackend> is authoritative. Set WorkerConfig.backend = \
415 None on the connect_with path to silence this warning."
416 );
417 }
418
419 let max_tasks = config.max_concurrent_tasks.max(1);
420 let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
421 // v0.12 PR-6: honor the optional `WorkerConfig::partition_config`
422 // override. `None` keeps the pre-PR-6 default shape (256 / 32 /
423 // 32); `Some(cfg)` lets non-Valkey deployments with a custom
424 // `num_flow_partitions` bind correctly instead of silently
425 // missing data via the wrong partition index.
426 let partition_config = config.partition_config.unwrap_or_default();
427
428 #[cfg(feature = "direct-valkey-claim")]
429 let scan_cursor_init = scan_cursor_seed(
430 config.worker_instance_id.as_str(),
431 partition_config.num_flow_partitions.max(1) as usize,
432 );
433
434 // Build the capabilities CSV / hash inline for the
435 // direct-valkey-claim path. The validation mirrors `connect`'s
436 // ingress so the two entry points refuse the same malformed
437 // tokens.
438 #[cfg(feature = "direct-valkey-claim")]
439 for cap in &config.capabilities {
440 if cap.is_empty() {
441 return Err(SdkError::Config {
442 context: "worker_config".into(),
443 field: Some("capabilities".into()),
444 message: "capability token must not be empty".into(),
445 });
446 }
447 if cap.contains(',') {
448 return Err(SdkError::Config {
449 context: "worker_config".into(),
450 field: Some("capabilities".into()),
451 message: format!(
452 "capability token may not contain ',' (CSV delimiter): {cap:?}"
453 ),
454 });
455 }
456 if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
457 return Err(SdkError::Config {
458 context: "worker_config".into(),
459 field: Some("capabilities".into()),
460 message: format!(
461 "capability token must not contain whitespace or control \
462 characters: {cap:?}"
463 ),
464 });
465 }
466 }
467 #[cfg(feature = "direct-valkey-claim")]
468 let worker_capabilities_hash = {
469 let set: std::collections::BTreeSet<&str> = config
470 .capabilities
471 .iter()
472 .map(|s| s.as_str())
473 .filter(|s| !s.is_empty())
474 .collect();
475 let csv: String = set.into_iter().collect::<Vec<_>>().join(",");
476 ff_core::hash::fnv1a_xor8hex(&csv)
477 };
478
479 Ok(Self {
480 #[cfg(feature = "valkey-default")]
481 client: None,
482 config,
483 partition_config,
484 #[cfg(feature = "direct-valkey-claim")]
485 worker_capabilities_hash,
486 #[cfg(feature = "direct-valkey-claim")]
487 lane_index: AtomicUsize::new(0),
488 concurrency_semaphore,
489 #[cfg(feature = "direct-valkey-claim")]
490 scan_cursor: AtomicUsize::new(scan_cursor_init),
491 backend,
492 completion_backend_handle: completion,
493 })
494 }
495
496 /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
497 /// ops through.
498 ///
499 /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
500 /// the `Option` wrapper is retained for API stability with the
501 /// Stage-1a shape. Stage 1c narrows the return type to
502 /// `&Arc<dyn EngineBackend>`.
503 pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
504 Some(&self.backend)
505 }
506
507 /// Crate-internal direct borrow of the backend. The public
508 /// [`Self::backend`] still returns `Option` for API stability
509 /// (Stage 1b holdover). Snapshot trait-forwarders in
510 /// [`crate::snapshot`] need an un-wrapped reference.
511 #[cfg_attr(not(feature = "valkey-default"), allow(dead_code))]
512 pub(crate) fn backend_ref(
513 &self,
514 ) -> &Arc<dyn ff_core::engine_backend::EngineBackend> {
515 &self.backend
516 }
517
518 /// Handle to the completion-event subscription backend, for
519 /// consumers that need to observe execution completions (DAG
520 /// reconcilers, tenant-isolated subscribers).
521 ///
522 /// Returns `Some` when the worker was built through
523 /// [`Self::connect`] on the default `valkey-default` feature
524 /// (the bundled `ValkeyBackend` implements
525 /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)),
526 /// or via [`Self::connect_with`] with a `Some(..)` completion
527 /// handle. Returns `None` when the caller passed `None` to
528 /// [`Self::connect_with`] — i.e. the backend does not support
529 /// push-based completion streams (future Postgres without
530 /// LISTEN/NOTIFY, test mocks).
531 ///
532 /// The returned handle shares the same underlying allocation as
533 /// [`Self::backend`]; calls through it (e.g.
534 /// `subscribe_completions_filtered`) hit the same connection
535 /// the worker itself uses.
536 pub fn completion_backend(
537 &self,
538 ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
539 self.completion_backend_handle.clone()
540 }
541
542 /// Get the worker config.
543 pub fn config(&self) -> &WorkerConfig {
544 &self.config
545 }
546
547 /// Get the server-published partition config this worker bound to at
548 /// `connect()`. Exposed so consumers that mint custom
549 /// [`ExecutionId`]s (e.g. for `describe_execution` lookups on ids
550 /// produced outside this worker) stay aligned with the server's
551 /// `num_flow_partitions` — using `PartitionConfig::default()`
552 /// assumes 256 partitions and silently misses data on deployments
553 /// with any other value.
554 pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
555 &self.partition_config
556 }
557
/// Attempt to claim the next eligible execution.
///
/// Inline (scheduler-bypassing) claim flow:
/// 1. Acquire a concurrency permit. If the worker is already at
///    `max_concurrent_tasks`, return `Ok(None)` without touching the
///    backend.
/// 2. Pick a lane (round-robin across configured lanes).
/// 3. For each partition in this poll's scan window, ask the backend for
///    the top eligible execution on that lane (`scan_eligible_executions`).
/// 4. Issue a claim grant for it (`issue_claim_grant`). On a capability
///    mismatch, the execution is moved off the eligible set via
///    `block_route` (best-effort) and the scan moves on.
/// 5. Claim the execution via `claim_execution`. If the backend reports
///    the execution was resumed from suspension, use
///    `claim_resumed_execution` instead.
/// 6. Return a [`ClaimedTask`] that owns the concurrency permit and
///    auto-renews its lease.
///
/// Gated behind the `direct-valkey-claim` feature — bypasses the
/// scheduler's budget / quota / capability admission checks. Enable
/// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
/// the scheduler hop would be measurement noise (benches) or when
/// the test harness needs a deterministic worker-local path. Prefer
/// the scheduler-routed HTTP claim path in production.
///
/// # `None` semantics
///
/// `Ok(None)` means **no work was claimed in the partition window this
/// poll covered** (or the worker is at capacity), not "the cluster is
/// idle".
///
/// Each call scans a chunk of `min(PARTITION_SCAN_CHUNK,
/// num_flow_partitions)` partitions, starting at the rolling
/// `scan_cursor`. The cursor advances by that chunk size on every
/// invocation, so a worker visits every partition at least once every
/// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls. The visit is
/// exactly once when the chunk size divides the partition count.
///
/// Callers should treat `None` as "poll again soon" (typically after
/// `config.claim_poll_interval_ms`) rather than "sleep for a long time".
/// Backing off too aggressively on `None` can starve workers when work
/// lives on partitions outside the current window.
///
/// # Errors
///
/// - Returns `Err` immediately if a partition scan fails.
/// - Returns `Err` if the grant or claim step fails with an error that
///   `is_retryable_claim_error` does not classify as retryable.
/// - Retryable grant/claim failures do not return `Err`; the scan skips
///   to the next partition instead.
#[cfg(feature = "direct-valkey-claim")]
pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
    // Enforce max_concurrent_tasks. `try_acquire_owned` returns
    // immediately, so a worker at capacity reports "no work" instead of
    // blocking. The owned permit is later moved into the ClaimedTask.
    let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
        Ok(p) => p,
        Err(_) => return Ok(None), // At capacity — no claim attempted
    };

    let lane_id = self.next_lane();

    // The SDK scans the eligible ZSET across execution partitions itself
    // instead of going through ff-scheduler.
    //
    // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
    // partitions starting at a rolling offset. This keeps idle Valkey
    // load at O(chunk) per poll instead of O(num_partitions). The
    // worker-instance-seeded initial cursor spreads concurrent workers
    // across different partition windows.
    let num_partitions = self.partition_config.num_flow_partitions as usize;
    if num_partitions == 0 {
        return Ok(None);
    }
    let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
    let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;

    // Sorted/deduped capability set, computed once per `claim_next` call
    // rather than once per partition step.
    let worker_capabilities: std::collections::BTreeSet<String> =
        self.config.capabilities.iter().cloned().collect();

    for step in 0..chunk {
        let partition_idx = ((start + step) % num_partitions) as u16;
        let partition = ff_core::partition::Partition {
            family: ff_core::partition::PartitionFamily::Execution,
            index: partition_idx,
        };

        // v0.12 PR-5: trait-routed scanner primitive. Replaces the
        // pre-PR-5 `ZRANGEBYSCORE` inline; the Valkey backend fires
        // the identical command byte-for-byte (see
        // `ff_backend_valkey::scan_eligible_executions_impl`).
        let scan_args =
            ff_core::contracts::ScanEligibleArgs::new(lane_id.clone(), partition, 1);
        let candidates = self.backend.scan_eligible_executions(scan_args).await?;
        let execution_id = match candidates.into_iter().next() {
            Some(id) => id,
            None => continue, // No eligible executions on this partition
        };

        // Take a fresh timestamp per candidate. A single poll can spend
        // several backend round-trips on each earlier partition, so the
        // grant (5 s TTL), block_route and claim calls below should
        // carry the time they are actually issued, not the start of the
        // scan window.
        let now = TimestampMs::now();

        // Step 1: Issue claim grant (v0.12 PR-5: trait-routed).
        let grant_args = ff_core::contracts::IssueClaimGrantArgs::new(
            execution_id.clone(),
            lane_id.clone(),
            self.config.worker_id.clone(),
            self.config.worker_instance_id.clone(),
            partition,
            worker_capabilities.clone(),
            5_000, // grant_ttl_ms
            now,
        );
        let grant_result = self.backend.issue_claim_grant(grant_args).await;

        match grant_result {
            Ok(_) => {}
            Err(crate::EngineError::Validation {
                kind: crate::ValidationKind::CapabilityMismatch,
                ref detail,
                ..
            }) => {
                // Block-on-mismatch (RFC-009 §7.5), for parity with
                // ff-scheduler's Scheduler::claim_for_worker. Without
                // this, the inline direct-claim path would hot-loop on
                // an unclaimable top-of-zset: every tick picks the same
                // execution, wastes an FCALL, logs, releases, and
                // repeats. The scheduler-side unblock scanner promotes
                // blocked_route executions back to eligible when a
                // worker with matching caps registers.
                tracing::info!(
                    execution_id = %execution_id,
                    worker_id = %self.config.worker_id,
                    worker_caps_hash = %self.worker_capabilities_hash,
                    missing = %detail,
                    "capability mismatch, blocking execution off eligible (SDK inline claim)"
                );
                // v0.12 PR-5: trait-routed block_route. Typed outcomes
                // and transport faults are logged and swallowed, keeping
                // the best-effort semantics of the pre-PR-5 behaviour
                // (see the `BlockRouteOutcome::LuaRejected` rustdoc).
                let block_args = ff_core::contracts::BlockRouteArgs::new(
                    execution_id.clone(),
                    lane_id.clone(),
                    partition,
                    "waiting_for_capable_worker".to_owned(),
                    "no connected worker satisfies required_capabilities".to_owned(),
                    now,
                );
                match self.backend.block_route(block_args).await {
                    Ok(ff_core::contracts::BlockRouteOutcome::Blocked { .. }) => {}
                    Ok(ff_core::contracts::BlockRouteOutcome::LuaRejected { message }) => {
                        tracing::warn!(
                            execution_id = %execution_id,
                            error = %message,
                            "SDK block_route: Lua rejected; eligible ZSET unchanged, next \
                             poll will re-evaluate"
                        );
                    }
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(
                            execution_id = %execution_id,
                            error = %e,
                            "SDK block_route: transport failure; eligible ZSET unchanged"
                        );
                    }
                }
                continue;
            }
            Err(ref e) if is_retryable_claim_error(e) => {
                tracing::debug!(
                    execution_id = %execution_id,
                    error = %e,
                    "claim grant failed (retryable), trying next partition"
                );
                continue;
            }
            Err(e) => return Err(SdkError::from(e)),
        }

        // Step 2: Claim the execution.
        match self
            .claim_execution(&execution_id, &lane_id, &partition, now)
            .await
        {
            Ok(mut task) => {
                // Transfer the concurrency permit to the task. When the
                // task is completed/failed/cancelled/dropped, the permit
                // returns to the semaphore, allowing another claim.
                task.set_concurrency_permit(permit);
                return Ok(Some(task));
            }
            Err(SdkError::Engine(ref boxed))
                if matches!(
                    **boxed,
                    crate::EngineError::Contention(
                        crate::ContentionKind::UseClaimResumedExecution
                    )
                ) =>
            {
                // Execution was resumed from suspension
                // (attempt_interrupted). ff_claim_execution rejects this,
                // so use ff_claim_resumed_execution, which reuses the
                // existing attempt instead of creating a new one.
                tracing::debug!(
                    execution_id = %execution_id,
                    "execution is resumed, using claim_resumed path"
                );
                match self
                    .claim_resumed_execution(&execution_id, &lane_id, &partition)
                    .await
                {
                    Ok(mut task) => {
                        task.set_concurrency_permit(permit);
                        return Ok(Some(task));
                    }
                    Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
                        tracing::debug!(
                            execution_id = %execution_id,
                            error = %e2,
                            "claim_resumed failed (retryable), trying next partition"
                        );
                        continue;
                    }
                    Err(e2) => return Err(e2),
                }
            }
            Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
                tracing::debug!(
                    execution_id = %execution_id,
                    error = %e,
                    "claim execution failed (retryable), trying next partition"
                );
                continue;
            }
            Err(e) => return Err(e),
        }
    }

    // No eligible work found on any partition in this window.
    Ok(None)
}
800
801 /// Low-level claim of a granted execution. Routes through
802 /// [`EngineBackend::claim_execution`](ff_core::engine_backend::EngineBackend::claim_execution)
803 /// — the trait-level grant-consumer method landed in v0.12 PR-4 —
804 /// and returns a `ClaimedTask` with auto lease renewal.
805 ///
806 /// Pre-PR-4 the SDK fired the `ff_claim_execution` FCALL directly
807 /// against `valkey_client()` and hand-parsed the Lua response shape.
808 /// The body now collapses to `backend.claim_execution(args)` +
809 /// `backend.read_execution_context(...)` + `ClaimedTask::new(...)`;
810 /// the Valkey-specific FCALL plumbing lives behind the trait in
811 /// `ff_backend_valkey::claim_execution_impl`.
812 ///
813 /// As of v0.12 PR-5.5 this helper + `claim_from_grant` are no
814 /// longer `valkey-default`-gated: the backend mints the `Handle`
815 /// at claim time and `ClaimedTask::new` caches it, so no
816 /// Valkey-specific handle synthesis is required on this path. The
817 /// `EngineBackend::claim_execution` default impl returns
818 /// `Err(Unavailable)` on PG/SQLite today (grant-consumer surface
819 /// is Valkey-only until the PG/SQLite grant-consumer RFC lands);
820 /// the compile surface is fully agnostic.
821 ///
822 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
823 async fn claim_execution(
824 &self,
825 execution_id: &ExecutionId,
826 lane_id: &LaneId,
827 partition: &ff_core::partition::Partition,
828 now: TimestampMs,
829 ) -> Result<ClaimedTask, SdkError> {
830 // v0.12 PR-5.5 retry-path fix — pre-read the **total attempt
831 // counter**, not the current-attempt pointer. The fresh-claim
832 // path mints a new attempt row whose index is the counter's
833 // current value (the backend's Lua 5920 / PG `ff_claim_execution`
834 // / SQLite `claim_impl` all consult this same counter to compute
835 // `next_att_idx`). Pre-PR-5.5 this call went inline as `HGET
836 // {exec}:core total_attempt_count` on the Valkey client; the
837 // first PR-5.5 landing mistakenly routed it to
838 // `read_current_attempt_index`, which returns the *pointer* at
839 // the previously-leased attempt — on a retry-of-a-retry that
840 // still named the terminal-failed prior attempt and the new
841 // claim collided with it. `read_total_attempt_count` wraps
842 // the same byte-for-byte HGET on Valkey and provides the JSONB
843 // / json_extract equivalents on PG / SQLite. See
844 // `EngineBackend::read_total_attempt_count` rustdoc for the
845 // pointer-vs-counter distinction.
846 let att_idx = self
847 .backend
848 .read_total_attempt_count(execution_id)
849 .await
850 .map_err(|e| SdkError::Engine(Box::new(e)))?;
851
852 let args = ff_core::contracts::ClaimExecutionArgs::new(
853 execution_id.clone(),
854 self.config.worker_id.clone(),
855 self.config.worker_instance_id.clone(),
856 lane_id.clone(),
857 LeaseId::new(),
858 self.config.lease_ttl_ms,
859 AttemptId::new(),
860 att_idx,
861 "{}".to_owned(),
862 None,
863 None,
864 now,
865 );
866
867 // `ClaimExecutionResult` is `#[non_exhaustive]` (v0.12 PR-4)
868 // so let-binding requires an explicit wildcard arm even though
869 // `Claimed` is the only variant today. A future additive variant
870 // surfaces here as a typed `ScriptError::Parse` — loud, routed
871 // through the existing SDK error path, and never silently
872 // dropped.
873 let claimed = match self.backend.claim_execution(args).await? {
874 ff_core::contracts::ClaimExecutionResult::Claimed(c) => c,
875 other => {
876 return Err(SdkError::from(ff_script::error::ScriptError::Parse {
877 fcall: "ff_claim_execution".into(),
878 execution_id: Some(execution_id.to_string()),
879 message: format!(
880 "unexpected ClaimExecutionResult variant: {other:?}"
881 ),
882 }));
883 }
884 };
885
886 // Read execution payload and metadata
887 let (input_payload, execution_kind, tags) = self
888 .read_execution_context(execution_id, partition)
889 .await?;
890
891 Ok(ClaimedTask::new(
892 self.backend.clone(),
893 self.partition_config,
894 claimed.handle,
895 execution_id.clone(),
896 claimed.attempt_index,
897 claimed.attempt_id,
898 claimed.lease_id,
899 claimed.lease_epoch,
900 self.config.lease_ttl_ms,
901 lane_id.clone(),
902 self.config.worker_instance_id.clone(),
903 input_payload,
904 execution_kind,
905 tags,
906 ))
907 }
908
909 /// Consume a [`ClaimGrant`] and claim the granted execution on
910 /// this worker. The intended production entry point: pair with
911 /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
912 /// scheduler-issued grants into the SDK without enabling the
913 /// `direct-valkey-claim` feature (which bypasses budget/quota
914 /// admission control).
915 ///
916 /// The worker's concurrency semaphore is checked BEFORE the FCALL
917 /// so a saturated worker does not consume the grant: the grant
918 /// stays valid for its remaining TTL and the caller can either
919 /// release it back to the scheduler or retry after some other
920 /// in-flight task completes.
921 ///
922 /// On success the returned [`ClaimedTask`] holds a concurrency
923 /// permit that releases automatically on
924 /// `complete`/`fail`/`cancel`/drop — same contract as
925 /// `claim_next`.
926 ///
927 /// # Arguments
928 ///
929 /// * `lane` — the lane the grant was issued for. Must match what
930 /// was passed to `Scheduler::claim_for_worker`; the Lua FCALL
931 /// uses it to look up `lane_eligible`, `lane_active`, and the
932 /// `worker_leases` index slot.
933 /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
934 ///
935 /// # Errors
936 ///
937 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
938 /// permits all held. Retryable; the grant is untouched.
939 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
940 /// or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
941 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
942 /// (wrapped in [`SdkError::Engine`]).
943 /// * `ScriptError::CapabilityMismatch` — execution's required
944 /// capabilities not a subset of this worker's caps (wrapped in
945 /// [`SdkError::Engine`]). Surfaced post-grant if a race
946 /// between grant issuance and caps change allows it.
947 /// * `ScriptError::Parse` — `ff_claim_execution` returned an
948 /// unexpected shape (wrapped in [`SdkError::Engine`]).
949 /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
950 /// transport error during the FCALL or the
951 /// `read_execution_context` follow-up.
952 ///
953 /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
954 /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
955 ///
956 /// # Backend coverage (v0.12 PR-5.5)
957 ///
958 /// Method ungated across backends. The Valkey backend handles the
959 /// grant fully. Postgres + SQLite backends return
960 /// [`EngineError::Unavailable`](ff_core::engine_error::EngineError::Unavailable)
961 /// from [`EngineBackend::claim_execution`] today — grants on those
962 /// backends flow through the scheduler-routed [`claim_via_server`]
963 /// path (the PG/SQLite scheduler lives outside the `EngineBackend`
964 /// trait in this release). See
965 /// `project_claim_from_grant_pg_sqlite_gap.md` for motivation and
966 /// planned follow-up.
967 ///
968 /// [`claim_via_server`]: FlowFabricWorker::claim_via_server
969 /// [`EngineBackend::claim_execution`]: ff_core::engine_backend::EngineBackend::claim_execution
970 pub async fn claim_from_grant(
971 &self,
972 lane: LaneId,
973 grant: ff_core::contracts::ClaimGrant,
974 ) -> Result<ClaimedTask, SdkError> {
975 // Semaphore check FIRST. If the worker is saturated we must
976 // surface the condition to the caller without touching the
977 // grant — silently returning Ok(None) (as claim_next does)
978 // would drop a grant the scheduler has already committed work
979 // to issuing, wasting the slot until its TTL elapses.
980 let permit = self
981 .concurrency_semaphore
982 .clone()
983 .try_acquire_owned()
984 .map_err(|_| SdkError::WorkerAtCapacity)?;
985
986 let now = TimestampMs::now();
987 let partition = grant.partition().map_err(|e| SdkError::Config {
988 context: "claim_from_grant".to_owned(),
989 field: Some("partition_key".to_owned()),
990 message: e.to_string(),
991 })?;
992 let mut task = match self
993 .claim_execution(&grant.execution_id, &lane, &partition, now)
994 .await
995 {
996 Ok(task) => task,
997 Err(SdkError::Engine(ref boxed))
998 if matches!(
999 **boxed,
1000 crate::EngineError::Contention(
1001 crate::ContentionKind::UseClaimResumedExecution
1002 )
1003 ) =>
1004 {
1005 // Execution was resumed from suspension — attempt_interrupted.
1006 // ff_claim_execution rejects this; use ff_claim_resumed_execution
1007 // which reuses the existing attempt instead of creating a new one.
1008 // Mirrors the fallback inside `claim_next` so HTTP-routed callers
1009 // (`claim_via_server` → `claim_from_grant`) get the same transparent
1010 // re-claim behavior.
1011 tracing::debug!(
1012 execution_id = %grant.execution_id,
1013 "execution is resumed, using claim_resumed path"
1014 );
1015 self.claim_resumed_execution(&grant.execution_id, &lane, &partition)
1016 .await?
1017 }
1018 Err(e) => return Err(e),
1019 };
1020 task.set_concurrency_permit(permit);
1021 Ok(task)
1022 }
1023
1024 /// Scheduler-routed claim: POST the server's
1025 /// `/v1/workers/{id}/claim`, then chain to
1026 /// [`Self::claim_from_grant`].
1027 ///
1028 /// Batch C item 2 PR-B. This is the production entry point —
1029 /// budget + quota + capability admission run server-side inside
1030 /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
1031 /// enable the `direct-valkey-claim` feature.
1032 ///
1033 /// Returns `Ok(None)` when the server says no eligible execution
1034 /// (HTTP 204). Callers typically back off by
1035 /// `config.claim_poll_interval_ms` and try again, same cadence
1036 /// as the direct-claim path's `Ok(None)`.
1037 ///
1038 /// The `admin` client is the established HTTP surface
1039 /// (`FlowFabricAdminClient`) reused here so workers don't keep a
1040 /// second reqwest client around. Build once at worker boot and
1041 /// hand in by reference on every claim.
1042 pub async fn claim_via_server(
1043 &self,
1044 admin: &crate::FlowFabricAdminClient,
1045 lane: &LaneId,
1046 grant_ttl_ms: u64,
1047 ) -> Result<Option<ClaimedTask>, SdkError> {
1048 let req = crate::admin::ClaimForWorkerRequest {
1049 worker_id: self.config.worker_id.to_string(),
1050 lane_id: lane.to_string(),
1051 worker_instance_id: self.config.worker_instance_id.to_string(),
1052 capabilities: self.config.capabilities.clone(),
1053 grant_ttl_ms,
1054 };
1055 let Some(resp) = admin.claim_for_worker(req).await? else {
1056 return Ok(None);
1057 };
1058 let grant = resp.into_grant()?;
1059 self.claim_from_grant(lane.clone(), grant).await.map(Some)
1060 }
1061
1062 /// Consume a [`ResumeGrant`] and transition the granted
1063 /// `attempt_interrupted` execution into a `started` state on this
1064 /// worker. Symmetric partner to [`claim_from_grant`] for the
1065 /// resume path.
1066 ///
1067 /// **Renamed from `claim_from_reclaim_grant` (RFC-024 PR-B+C).**
1068 /// The new `claim_from_reclaim_grant` method lands with PR-G and
1069 /// dispatches to [`EngineBackend::reclaim_execution`] for the
1070 /// lease-reclaim path (distinct semantic, distinct grant type).
1071 ///
1072 /// The grant must have been issued to THIS worker (matching
1073 /// `worker_id` at grant time). A mismatch returns
1074 /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1075 /// atomically by `ff_claim_resumed_execution`; a second call with
1076 /// the same grant also returns `InvalidClaimGrant`.
1077 ///
1078 /// # Concurrency
1079 ///
1080 /// The worker's concurrency semaphore is checked BEFORE the FCALL
1081 /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1082 /// assume pre-existing capacity on this worker — a reclaim can
1083 /// land on a fresh worker instance that just came up after a
1084 /// crash/restart and is picking up a previously-interrupted
1085 /// execution. If the worker is saturated, the grant stays valid
1086 /// for its remaining TTL and the caller can release it or retry.
1087 ///
1088 /// On success the returned [`ClaimedTask`] holds a concurrency
1089 /// permit that releases automatically on
1090 /// `complete`/`fail`/`cancel`/drop.
1091 ///
1092 /// # Errors
1093 ///
1094 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1095 /// permits all held. Retryable; the grant is untouched (no
1096 /// FCALL was issued, so `ff_claim_resumed_execution` did not
1097 /// atomically consume the grant key).
1098 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1099 /// or `worker_id` mismatch.
1100 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1101 /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1102 /// `attempt_interrupted`.
1103 /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1104 /// not `runnable`.
1105 /// * `ScriptError::ExecutionNotFound` — core key missing.
1106 /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
1107 /// transport.
1108 ///
1109 /// [`ResumeGrant`]: ff_core::contracts::ResumeGrant
1110 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
1111 pub async fn claim_from_resume_grant(
1112 &self,
1113 grant: ff_core::contracts::ResumeGrant,
1114 ) -> Result<ClaimedTask, SdkError> {
1115 // Semaphore check FIRST — same load-bearing ordering as
1116 // `claim_from_grant`. If the worker is saturated, surface
1117 // WorkerAtCapacity without firing the FCALL; the FCALL is an
1118 // atomic consume on the grant key, so calling it past-
1119 // saturation would destroy the grant while leaving no
1120 // permit to attach to the returned `ClaimedTask`.
1121 let permit = self
1122 .concurrency_semaphore
1123 .clone()
1124 .try_acquire_owned()
1125 .map_err(|_| SdkError::WorkerAtCapacity)?;
1126
1127 // Grant carries partition + lane_id so no round-trip is needed
1128 // to resolve them before the FCALL.
1129 let partition = grant.partition().map_err(|e| SdkError::Config {
1130 context: "claim_from_resume_grant".to_owned(),
1131 field: Some("partition_key".to_owned()),
1132 message: e.to_string(),
1133 })?;
1134 let mut task = self
1135 .claim_resumed_execution(
1136 &grant.execution_id,
1137 &grant.lane_id,
1138 &partition,
1139 )
1140 .await?;
1141 task.set_concurrency_permit(permit);
1142 Ok(task)
1143 }
1144
1145 /// Consume a [`ReclaimGrant`] to mint a fresh attempt for a
1146 /// lease-expired / lease-revoked execution (RFC-024 §3.4).
1147 ///
1148 /// Backend-agnostic. Routes through
1149 /// [`EngineBackend::reclaim_execution`] on whatever backend the
1150 /// worker was connected with (Valkey via `connect` / `connect_with`,
1151 /// Postgres / SQLite via `connect_with`). Distinct from
1152 /// [`claim_from_resume_grant`]: reclaim creates a NEW attempt row
1153 /// and bumps `lease_reclaim_count` (`HandleKind::Reclaimed`), while
1154 /// resume re-uses the existing attempt under
1155 /// `ff_claim_resumed_execution` (`HandleKind::Resumed`).
1156 ///
1157 /// # Return shape
1158 ///
1159 /// Returns the raw [`ReclaimExecutionOutcome`] so consumers match
1160 /// on the four outcomes (`Claimed(Handle)`, `NotReclaimable`,
1161 /// `ReclaimCapExceeded`, `GrantNotFound`) and decide their
1162 /// dispatch. The `Claimed` variant carries a [`Handle`] whose
1163 /// `kind` is [`HandleKind::Reclaimed`]; downstream ops (complete,
1164 /// fail, renew, append_frame, …) take the handle directly via the
1165 /// `EngineBackend` trait.
1166 ///
1167 /// This contrasts with [`claim_from_resume_grant`], which wraps
1168 /// the handle in a [`ClaimedTask`] with a concurrency-permit and
1169 /// auto-lease-renewal loop. Those affordances are
1170 /// `valkey-default`-gated today (they depend on the bundled
1171 /// `ferriskey::Client` + `lease_ttl_ms` renewal timer). The
1172 /// reclaim surface is intentionally narrower so it compiles under
1173 /// `--no-default-features, features = ["sqlite"]` and consumers
1174 /// can drive the reclaim flow on any backend.
1175 ///
1176 /// # Feature compatibility
1177 ///
1178 /// No cfg-gate. Compiles + runs under every feature set ff-sdk
1179 /// supports (including sqlite-only). Verified by the compile-time
1180 /// type assertion
1181 /// `worker_claim_from_reclaim_grant_is_backend_agnostic_at_type_level`
1182 /// in `crates/ff-sdk/tests/rfc024_sdk.rs`, which pins the method's
1183 /// full signature under the default feature set and is paralleled
1184 /// by `sqlite_only_compile_surface_tests` in this file for the
1185 /// `--no-default-features, features = ["sqlite"]` compile anchor on
1186 /// the rest of the backend-agnostic surface.
1187 ///
1188 /// # worker_capabilities
1189 ///
1190 /// `ReclaimExecutionArgs::worker_capabilities` is NOT part of the
1191 /// Lua FCALL (the reclaim Lua validates grant consumption via
1192 /// `grant.worker_id == args.worker_id` only — see
1193 /// `crates/ff-script/src/flowfabric.lua:3088` and RFC-024 §4.4).
1194 /// Capability matching happens at grant-issuance time (see
1195 /// [`FlowFabricAdminClient::issue_reclaim_grant`]
1196 /// — in the `ff-sdk::admin` module, `valkey-default`-gated).
1197 ///
1198 /// # Errors
1199 ///
1200 /// * [`SdkError::Engine`] — the backend's `reclaim_execution`
1201 /// returned an [`EngineError`] (transport fault, validation
1202 /// failure, `Unavailable` from a backend that does not
1203 /// implement RFC-024 — currently only pre-RFC out-of-tree
1204 /// backends).
1205 ///
1206 /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1207 /// [`ReclaimExecutionOutcome`]: ff_core::contracts::ReclaimExecutionOutcome
1208 /// [`Handle`]: ff_core::backend::Handle
1209 /// [`HandleKind::Reclaimed`]: ff_core::backend::HandleKind::Reclaimed
1210 /// [`EngineBackend::reclaim_execution`]: ff_core::engine_backend::EngineBackend::reclaim_execution
1211 /// [`EngineError`]: ff_core::engine_error::EngineError
1212 /// [`claim_from_resume_grant`]: FlowFabricWorker::claim_from_resume_grant
1213 /// [`FlowFabricAdminClient::issue_reclaim_grant`]: crate::admin::FlowFabricAdminClient::issue_reclaim_grant
1214 pub async fn claim_from_reclaim_grant(
1215 &self,
1216 grant: ff_core::contracts::ReclaimGrant,
1217 args: ff_core::contracts::ReclaimExecutionArgs,
1218 ) -> Result<ff_core::contracts::ReclaimExecutionOutcome, SdkError> {
1219 // `ReclaimGrant` is accepted as a parameter so the call-site
1220 // shape matches RFC-024 §3.4 (consumer receives a grant from
1221 // `issue_reclaim_grant` and feeds it + args into
1222 // `claim_from_reclaim_grant`). The grant metadata is
1223 // already embedded in the backend's server-side store
1224 // (Valkey `claim_grant` hash, PG/SQLite `ff_claim_grant`
1225 // table) keyed by (execution_id, grant_key) — the trait
1226 // method looks it up from `args.execution_id`.
1227 //
1228 // The overlap between `ReclaimGrant` and
1229 // `ReclaimExecutionArgs` is (execution_id, lane_id);
1230 // mismatched grant/args is a consumer-side bug (grant-for-A
1231 // + args-for-B), and silently forwarding lets the SDK act
1232 // on the args execution. Reject the mismatch up front so
1233 // the misuse surfaces at the SDK boundary instead of
1234 // succeeding against the wrong execution. The grant's
1235 // `expires_at_ms` is validated against wall-clock now so
1236 // an already-expired grant is rejected without a backend
1237 // round-trip (the backend also enforces expiry, but the
1238 // SDK-side check gives a crisper error and preserves the
1239 // reclaim-budget / lease slot).
1240 // `TimestampMs` is an `i64` (Unix-epoch ms); cast to `u64` for
1241 // comparison against `ReclaimGrant::expires_at_ms`. Clamp
1242 // negatives to 0 so a pre-epoch system clock (vanishingly
1243 // unlikely, but representable) doesn't wrap to a future u64.
1244 let now_ms: u64 = ff_core::types::TimestampMs::now().0.max(0) as u64;
1245 validate_reclaim_grant_against_args(&grant, &args, now_ms)?;
1246 self.backend
1247 .reclaim_execution(args)
1248 .await
1249 .map_err(|e| SdkError::Engine(Box::new(e)))
1250 }
1251
1252 /// Low-level resume claim. Forwards through
1253 /// [`EngineBackend::claim_resumed_execution`](ff_core::engine_backend::EngineBackend::claim_resumed_execution)
1254 /// — the trait-level trigger surface landed in issue #150 — and
1255 /// returns a [`ClaimedTask`] bound to the resumed attempt.
1256 ///
1257 /// The method stays private; external callers use
1258 /// [`claim_from_resume_grant`].
1259 ///
1260 /// [`claim_from_resume_grant`]: FlowFabricWorker::claim_from_resume_grant
1261 async fn claim_resumed_execution(
1262 &self,
1263 execution_id: &ExecutionId,
1264 lane_id: &LaneId,
1265 partition: &ff_core::partition::Partition,
1266 ) -> Result<ClaimedTask, SdkError> {
1267 // v0.12 PR-3 — pre-read current_attempt_index via the trait
1268 // rather than an inline `HGET` on the Valkey client. Load-bearing:
1269 // the backend's KEYS[6] (Valkey) / `ff_attempt` PK tuple (PG/SQLite)
1270 // must target the real existing attempt hash/row, and the backend
1271 // takes the index verbatim from
1272 // `ClaimResumedExecutionArgs::current_attempt_index`.
1273 let att_idx = self
1274 .backend
1275 .read_current_attempt_index(execution_id)
1276 .await
1277 .map_err(|e| SdkError::Engine(Box::new(e)))?;
1278
1279 let args = ff_core::contracts::ClaimResumedExecutionArgs {
1280 execution_id: execution_id.clone(),
1281 worker_id: self.config.worker_id.clone(),
1282 worker_instance_id: self.config.worker_instance_id.clone(),
1283 lane_id: lane_id.clone(),
1284 lease_id: LeaseId::new(),
1285 lease_ttl_ms: self.config.lease_ttl_ms,
1286 current_attempt_index: att_idx,
1287 remaining_attempt_timeout_ms: None,
1288 now: TimestampMs::now(),
1289 };
1290
1291 let ff_core::contracts::ClaimResumedExecutionResult::Claimed(claimed) =
1292 self.backend.claim_resumed_execution(args).await?;
1293
1294 let (input_payload, execution_kind, tags) = self
1295 .read_execution_context(execution_id, partition)
1296 .await?;
1297
1298 Ok(ClaimedTask::new(
1299 self.backend.clone(),
1300 self.partition_config,
1301 claimed.handle,
1302 execution_id.clone(),
1303 claimed.attempt_index,
1304 claimed.attempt_id,
1305 claimed.lease_id,
1306 claimed.lease_epoch,
1307 self.config.lease_ttl_ms,
1308 lane_id.clone(),
1309 self.config.worker_instance_id.clone(),
1310 input_payload,
1311 execution_kind,
1312 tags,
1313 ))
1314 }
1315
1316 /// Read payload + execution_kind + tags from exec_core.
1317 ///
1318 /// As of v0.12 PR-1 this forwards through
1319 /// [`EngineBackend::read_execution_context`](ff_core::engine_backend::EngineBackend::read_execution_context)
1320 /// rather than issuing direct GET/HGET/HGETALL against Valkey. The
1321 /// outer `valkey-default` gate + `(&ExecutionId, &Partition)`
1322 /// signature are preserved; hot-path decoupling (ungating this
1323 /// helper + its call sites in `claim_execution` and
1324 /// `claim_resumed_execution`) is PR-4/PR-5 scope per the v0.12
1325 /// agnostic-SDK plan.
1326 async fn read_execution_context(
1327 &self,
1328 execution_id: &ExecutionId,
1329 _partition: &ff_core::partition::Partition,
1330 ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
1331 let ctx = self.backend.read_execution_context(execution_id).await?;
1332 Ok((ctx.input_payload, ctx.execution_kind, ctx.tags))
1333 }
1334
1335 // ── Phase 3: Signal delivery ──
1336
1337 /// Deliver a signal to a suspended execution's waitpoint.
1338 ///
1339 /// The engine atomically records the signal, evaluates the resume condition,
1340 /// and optionally transitions the execution from `suspended` to `runnable`.
1341 ///
1342 /// Forwards through
1343 /// [`EngineBackend::deliver_signal`](ff_core::engine_backend::EngineBackend::deliver_signal)
1344 /// — the trait-level trigger surface landed in issue #150.
1345 ///
1346 /// Backend-agnostic as of v0.12 PR-3. Compiles + runs under every
1347 /// feature set ff-sdk supports (including
1348 /// `--no-default-features --features sqlite`); pinned by
1349 /// `sqlite_only_compile_surface_tests::deliver_signal_addressable_under_sqlite_only`.
1350 pub async fn deliver_signal(
1351 &self,
1352 execution_id: &ExecutionId,
1353 waitpoint_id: &WaitpointId,
1354 signal: crate::task::Signal,
1355 ) -> Result<crate::task::SignalOutcome, SdkError> {
1356 let args = ff_core::contracts::DeliverSignalArgs {
1357 execution_id: execution_id.clone(),
1358 waitpoint_id: waitpoint_id.clone(),
1359 signal_id: ff_core::types::SignalId::new(),
1360 signal_name: signal.signal_name,
1361 signal_category: signal.signal_category,
1362 source_type: signal.source_type,
1363 source_identity: signal.source_identity,
1364 payload: signal.payload,
1365 payload_encoding: Some("json".to_owned()),
1366 correlation_id: None,
1367 idempotency_key: signal.idempotency_key,
1368 target_scope: "waitpoint".to_owned(),
1369 created_at: Some(TimestampMs::now()),
1370 dedup_ttl_ms: None,
1371 resume_delay_ms: None,
1372 max_signals_per_execution: None,
1373 signal_maxlen: None,
1374 waitpoint_token: signal.waitpoint_token,
1375 now: TimestampMs::now(),
1376 };
1377
1378 let result = self.backend.deliver_signal(args).await?;
1379 Ok(match result {
1380 ff_core::contracts::DeliverSignalResult::Accepted { signal_id, effect } => {
1381 if effect == "resume_condition_satisfied" {
1382 crate::task::SignalOutcome::TriggeredResume { signal_id }
1383 } else {
1384 crate::task::SignalOutcome::Accepted { signal_id, effect }
1385 }
1386 }
1387 ff_core::contracts::DeliverSignalResult::Duplicate { existing_signal_id } => {
1388 crate::task::SignalOutcome::Duplicate {
1389 existing_signal_id: existing_signal_id.to_string(),
1390 }
1391 }
1392 })
1393 }
1394
1395 #[cfg(feature = "direct-valkey-claim")]
1396 fn next_lane(&self) -> LaneId {
1397 let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
1398 self.config.lanes[idx].clone()
1399 }
1400}
1401
/// Whether a claim-path error should be skipped so the caller moves on to
/// the next partition, rather than being returned.
///
/// Errors that `ff_script` classifies as `Retryable` or `Informational`
/// count as skippable.
#[cfg(feature = "direct-valkey-claim")]
fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
    let class = ff_script::engine_error_ext::class(err);
    matches!(
        class,
        ff_core::error::ErrorClass::Retryable | ff_core::error::ErrorClass::Informational
    )
}
1410
/// Initial offset for [`FlowFabricWorker::scan_cursor`].
///
/// The worker instance id is hashed with FNV-1a, so distinct worker
/// processes start on different partition windows from their first poll.
/// This spreads work in multi-worker deployments; a single-worker cluster
/// is unaffected. Returns 0 when `num_partitions` is 0.
#[cfg(feature = "direct-valkey-claim")]
fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
    let digest = ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize;
    // `checked_rem` returns `None` only for a zero divisor.
    digest.checked_rem(num_partitions).unwrap_or(0)
}
1422
1423/// Cross-check the [`ReclaimGrant`] handed back by
1424/// `issue_reclaim_grant` against the [`ReclaimExecutionArgs`] the
1425/// consumer is about to dispatch. Catches the grant-for-A + args-for-B
1426/// misuse at the SDK boundary before a backend round-trip (PR #407
1427/// review F1).
1428///
1429/// The overlap between the two types is `(execution_id, lane_id)`;
1430/// `partition_key` / `grant_key` live only on the grant and are
1431/// verified server-side. The grant's `expires_at_ms` is also
1432/// validated against `now_ms` so an expired grant fails fast without
1433/// burning a backend call (the backend enforces expiry too, but the
1434/// SDK-side check gives a crisper, typed error).
1435///
1436/// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1437/// [`ReclaimExecutionArgs`]: ff_core::contracts::ReclaimExecutionArgs
1438fn validate_reclaim_grant_against_args(
1439 grant: &ff_core::contracts::ReclaimGrant,
1440 args: &ff_core::contracts::ReclaimExecutionArgs,
1441 now_ms: u64,
1442) -> Result<(), SdkError> {
1443 if grant.execution_id != args.execution_id {
1444 return Err(SdkError::Config {
1445 context: "claim_from_reclaim_grant".to_owned(),
1446 field: Some("execution_id".to_owned()),
1447 message: format!(
1448 "grant.execution_id ({}) does not match args.execution_id ({})",
1449 grant.execution_id, args.execution_id
1450 ),
1451 });
1452 }
1453 if grant.lane_id != args.lane_id {
1454 return Err(SdkError::Config {
1455 context: "claim_from_reclaim_grant".to_owned(),
1456 field: Some("lane_id".to_owned()),
1457 message: format!(
1458 "grant.lane_id ({}) does not match args.lane_id ({})",
1459 grant.lane_id.as_str(),
1460 args.lane_id.as_str()
1461 ),
1462 });
1463 }
1464 if grant.expires_at_ms <= now_ms {
1465 return Err(SdkError::Config {
1466 context: "claim_from_reclaim_grant".to_owned(),
1467 field: Some("expires_at_ms".to_owned()),
1468 message: format!(
1469 "grant expired: expires_at_ms={} now_ms={}",
1470 grant.expires_at_ms, now_ms
1471 ),
1472 });
1473 }
1474 Ok(())
1475}
1476
#[cfg(test)]
mod reclaim_grant_validation_tests {
    //! Unit tests for `validate_reclaim_grant_against_args` — the
    //! SDK-side cross-check that catches grant/args mismatch (PR #407
    //! review F1). No Valkey / backend required: the helper is pure.
    use super::validate_reclaim_grant_against_args;
    use crate::SdkError;
    use ff_core::contracts::{ReclaimExecutionArgs, ReclaimGrant};
    use ff_core::partition::{Partition, PartitionFamily, PartitionKey};
    use ff_core::types::{
        AttemptId, AttemptIndex, ExecutionId, LaneId, LeaseId, WorkerId, WorkerInstanceId,
    };

    const EXEC_A: &str = "{fp:7}:00000000-0000-4000-8000-000000000001";
    const EXEC_B: &str = "{fp:7}:00000000-0000-4000-8000-000000000002";

    fn exec(s: &str) -> ExecutionId {
        ExecutionId::parse(s).expect("valid execution id")
    }

    fn grant_for(execution_id: ExecutionId, lane: &str, expires_at_ms: u64) -> ReclaimGrant {
        ReclaimGrant::new(
            execution_id,
            PartitionKey::from(&Partition { family: PartitionFamily::Flow, index: 7 }),
            "reclaim:grant:abc".to_owned(),
            expires_at_ms,
            LaneId::new(lane),
        )
    }

    fn args_for(execution_id: ExecutionId, lane: &str) -> ReclaimExecutionArgs {
        ReclaimExecutionArgs::new(
            execution_id,
            WorkerId::new("w1"),
            WorkerInstanceId::new("w1-i1"),
            LaneId::new(lane),
            None,
            LeaseId::new(),
            30_000,
            AttemptId::new(),
            "{}".to_owned(),
            None,
            WorkerInstanceId::new("w1-i0"),
            AttemptIndex::new(0),
        )
    }

    /// Asserts that `err` is an `SdkError::Config` whose `field` names
    /// `expected_field`; panics with the actual error otherwise.
    ///
    /// `#[track_caller]` makes a failure point at the calling test rather
    /// than at this helper.
    #[track_caller]
    fn assert_config_field(err: SdkError, expected_field: &str) {
        match err {
            SdkError::Config { field, .. } => assert_eq!(field.as_deref(), Some(expected_field)),
            other => panic!("expected Config error, got {other:?}"),
        }
    }

    #[test]
    fn accepts_matching_grant_and_args() {
        let g = grant_for(exec(EXEC_A), "main", 2_000);
        let a = args_for(exec(EXEC_A), "main");
        assert!(validate_reclaim_grant_against_args(&g, &a, 1_000).is_ok());
    }

    #[test]
    fn rejects_mismatched_execution_id() {
        let g = grant_for(exec(EXEC_A), "main", 2_000);
        let a = args_for(exec(EXEC_B), "main");
        let err = validate_reclaim_grant_against_args(&g, &a, 1_000)
            .expect_err("expected mismatched execution_id to fail");
        assert_config_field(err, "execution_id");
    }

    #[test]
    fn rejects_mismatched_lane_id() {
        let g = grant_for(exec(EXEC_A), "main", 2_000);
        let a = args_for(exec(EXEC_A), "other");
        let err = validate_reclaim_grant_against_args(&g, &a, 1_000)
            .expect_err("expected mismatched lane_id to fail");
        assert_config_field(err, "lane_id");
    }

    #[test]
    fn accepts_grant_one_ms_before_expiry() {
        // Accepting side of the expiry boundary: with one millisecond of
        // validity left (`expires_at_ms = now_ms + 1`) the grant still
        // passes. This pairs with `rejects_expired_grant`, which pins the
        // rejecting side at `expires_at_ms == now_ms`.
        let g = grant_for(exec(EXEC_A), "main", 1_001);
        let a = args_for(exec(EXEC_A), "main");
        assert!(validate_reclaim_grant_against_args(&g, &a, 1_000).is_ok());
    }

    #[test]
    fn rejects_expired_grant() {
        // `expires_at_ms == now_ms` is rejected. The check is the
        // inclusive `expires_at_ms <= now_ms`, so a grant counts as
        // expired at its expiry instant and the server's TTL enforcement
        // window can't race us.
        let g = grant_for(exec(EXEC_A), "main", 1_000);
        let a = args_for(exec(EXEC_A), "main");
        let err = validate_reclaim_grant_against_args(&g, &a, 1_000)
            .expect_err("expected expired grant to fail");
        assert_config_field(err, "expires_at_ms");

        // Also rejected when already past expiry.
        let err2 = validate_reclaim_grant_against_args(&g, &a, 5_000)
            .expect_err("expected past-expiry grant to fail");
        assert_config_field(err2, "expires_at_ms");
    }
}
1577
#[cfg(test)]
mod completion_accessor_type_tests {
    //! Compile-time signature check for
    //! [`FlowFabricWorker::completion_backend`]: the accessor must return
    //! `Option<Arc<dyn CompletionBackend>>`. The whole check is an
    //! fn-pointer coercion, so it needs no Valkey or live worker. The
    //! `#[test]` wrapper only exists so the compiler type-checks the
    //! coercion when test targets are built.
    use super::FlowFabricWorker;
    use ff_core::completion_backend::CompletionBackend;
    use std::sync::Arc;

    /// The advertised shape of `FlowFabricWorker::completion_backend`.
    type CompletionAccessor = fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>>;

    #[test]
    fn completion_backend_accessor_signature() {
        // Binding the method item to the alias stops compiling if the
        // signature drifts. The pointer is never invoked, so no I/O
        // happens.
        let _accessor: CompletionAccessor = FlowFabricWorker::completion_backend;
    }
}
1598
/// RFC-023 Phase 1a §4.4 item 10g compile-only anchor.
///
/// Parallels `completion_accessor_type_tests` but fires under the
/// sqlite-only feature set. It proves that the backend-agnostic
/// `FlowFabricWorker` accessors, and the SDK / `EngineBackend` methods
/// pinned by the tests below, are addressable with their advertised
/// signatures under `--no-default-features, features = ["sqlite"]`.
/// Future PRs that accidentally reach outside the `valkey-default` gate
/// then fail the build.
///
/// Because the module is `cfg(test)`, it is only compiled when test
/// targets are built (`cargo test`, or `cargo check --tests` /
/// `--all-targets`). A plain `cargo check -p ff-sdk
/// --no-default-features --features sqlite` checks the library target
/// without `cfg(test)` and therefore skips these anchors.
/// NOTE(review): confirm the matching `§9` CI cell builds test targets.
///
/// NOTE(review): no anchor below references
/// `FlowFabricWorker::connect_with`. Add one if that constructor's
/// sqlite-only reachability should be pinned as well.
#[cfg(all(test, not(feature = "valkey-default")))]
mod sqlite_only_compile_surface_tests {
    use super::FlowFabricWorker;
    use ff_core::completion_backend::CompletionBackend;
    use ff_core::engine_backend::EngineBackend;
    use std::sync::Arc;

    #[test]
    fn addressable_surface_under_sqlite_only() {
        // Type-level proof that the backend-agnostic accessors are
        // reachable without the `valkey-default` feature. None of
        // these are called; the assignment targets pin the signature.
        let _a: fn(&FlowFabricWorker) -> Option<&Arc<dyn EngineBackend>> =
            FlowFabricWorker::backend;
        let _b: fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>> =
            FlowFabricWorker::completion_backend;
        let _c: fn(&FlowFabricWorker) -> &crate::config::WorkerConfig =
            FlowFabricWorker::config;
        let _d: fn(&FlowFabricWorker) -> &ff_core::partition::PartitionConfig =
            FlowFabricWorker::partition_config;
    }

    /// v0.12 PR-1 compile anchor — the new
    /// [`EngineBackend::read_execution_context`] trait method MUST be
    /// addressable under `--no-default-features --features sqlite`.
    ///
    /// A direct fn-pointer coercion is awkward for an `#[async_trait]`
    /// method: its returned boxed future is bounded by the borrowed
    /// arguments' lifetimes, which blocks coercion to a plain
    /// higher-ranked `fn` pointer. The next-best compile proof is a
    /// never-called generic that names the method through the
    /// `B: EngineBackend` bound. Calling the method for real would
    /// require a concrete backend; the generic keeps the test purely
    /// compile-time.
    #[test]
    fn read_execution_context_addressable_under_sqlite_only() {
        use ff_core::contracts::ExecutionContext;
        use ff_core::engine_error::EngineError;
        use ff_core::types::ExecutionId;

        // Never called; exists only so the method path is type-checked.
        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            id: &ExecutionId,
        ) -> Result<ExecutionContext, EngineError> {
            b.read_execution_context(id).await
        }
    }

    /// v0.12 PR-2 compile anchor — `ClaimedTask` as a type MUST be
    /// addressable under `--no-default-features --features sqlite`.
    /// The `task` module is no longer `valkey-default`-gated at the
    /// module level.
    ///
    /// As of v0.12 PR-5.5 the `impl ClaimedTask { ... }` block is
    /// likewise ungated: the backend mints the `Handle` at claim time
    /// and `cloned_handle` just clones the cached field, so no
    /// Valkey-specific synthesis is required.
    ///
    /// The anchor names `ClaimedTask` in the return type of a
    /// never-called generic fn, so resolving the type path is exercised
    /// mechanically under the sqlite-only feature set. It pins the type
    /// path only, not its fields.
    #[test]
    fn claimed_task_type_addressable_under_sqlite_only() {
        use crate::task::ClaimedTask;

        #[allow(dead_code)]
        fn _pin<T>(_: std::marker::PhantomData<T>) -> std::marker::PhantomData<ClaimedTask> {
            std::marker::PhantomData
        }
    }

    /// v0.12 PR-6 compile anchor — ungating the `admin` + `snapshot`
    /// modules at module level must not re-introduce ferriskey symbols
    /// under `--no-default-features --features sqlite`. Pins that
    /// [`FlowFabricAdminClient::new`] and a representative snapshot
    /// forwarder are addressable without the `valkey-default` feature.
    #[test]
    fn admin_and_snapshot_addressable_under_sqlite_only() {
        use crate::admin::FlowFabricAdminClient;
        use crate::SdkError;
        use ff_core::contracts::ExecutionSnapshot;
        use ff_core::types::ExecutionId;

        // `FlowFabricAdminClient::new` is `fn(impl Into<String>)`, which
        // can't be fn-pointer-coerced directly. A never-executed call
        // with a concrete `&str` pins the method under the sqlite-only
        // feature set. The error type is the same `SdkError` the rest of
        // the module returns.
        #[allow(dead_code)]
        fn _pin_admin_new() -> Result<FlowFabricAdminClient, SdkError> {
            FlowFabricAdminClient::new("http://anchor")
        }

        // `describe_execution` is `async`, so its opaque return future
        // can't be named in an fn-pointer type. Pin it through a
        // never-called async wrapper over a concrete `&FlowFabricWorker`
        // instead.
        #[allow(dead_code)]
        async fn _pin_describe(
            w: &FlowFabricWorker,
            id: &ExecutionId,
        ) -> Result<Option<ExecutionSnapshot>, SdkError> {
            w.describe_execution(id).await
        }
    }

    /// v0.13 SC-10 compile anchor — the backend-agnostic admin facade
    /// (`FlowFabricAdminClient::connect_with` + embedded-transport
    /// methods) MUST be addressable under
    /// `--no-default-features --features sqlite`. Without this anchor,
    /// a regression that re-introduces a ferriskey symbol on the
    /// embedded admin path would only surface on downstream
    /// sqlite-only consumers.
    #[test]
    fn admin_facade_addressable_under_sqlite_only() {
        use crate::admin::{
            ClaimForWorkerRequest, ClaimForWorkerResponse, FlowFabricAdminClient,
            IssueReclaimGrantRequest, IssueReclaimGrantResponse, RotateWaitpointSecretRequest,
            RotateWaitpointSecretResponse,
        };
        use crate::SdkError;

        // `connect_with` is infallible; the fn-pointer coercion pins its
        // signature under the sqlite-only feature set. (`Arc` and
        // `EngineBackend` come from the module-level imports.)
        let _ctor: fn(Arc<dyn EngineBackend>) -> FlowFabricAdminClient =
            FlowFabricAdminClient::connect_with;

        // Each method below is `async`, so its return type is an opaque
        // future that can't be named in an fn-pointer type. Pin each one
        // through a never-called async wrapper fn instead.
        #[allow(dead_code)]
        async fn _pin_claim(
            c: &FlowFabricAdminClient,
            req: ClaimForWorkerRequest,
        ) -> Result<Option<ClaimForWorkerResponse>, SdkError> {
            c.claim_for_worker(req).await
        }
        #[allow(dead_code)]
        async fn _pin_reclaim(
            c: &FlowFabricAdminClient,
            id: &str,
            req: IssueReclaimGrantRequest,
        ) -> Result<IssueReclaimGrantResponse, SdkError> {
            c.issue_reclaim_grant(id, req).await
        }
        #[allow(dead_code)]
        async fn _pin_rotate(
            c: &FlowFabricAdminClient,
            req: RotateWaitpointSecretRequest,
        ) -> Result<RotateWaitpointSecretResponse, SdkError> {
            c.rotate_waitpoint_secret(req).await
        }
    }

    /// v0.12 PR-3 compile anchor — the new
    /// [`EngineBackend::read_current_attempt_index`] trait method MUST
    /// be addressable under `--no-default-features --features sqlite`.
    /// Mirrors the PR-1 `read_execution_context` anchor: a never-called
    /// generic over the trait bound stands in for an `fn`-pointer
    /// coercion, which the `#[async_trait]` signature blocks.
    #[test]
    fn read_current_attempt_index_addressable_under_sqlite_only() {
        use ff_core::engine_error::EngineError;
        use ff_core::types::{AttemptIndex, ExecutionId};

        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            b.read_current_attempt_index(id).await
        }
    }

    /// v0.12 PR-5.5 retry-path-fix compile anchor — the new
    /// [`EngineBackend::read_total_attempt_count`] trait method MUST
    /// be addressable under `--no-default-features --features sqlite`.
    /// Mirrors the PR-3 `read_current_attempt_index` anchor.
    #[test]
    fn read_total_attempt_count_addressable_under_sqlite_only() {
        use ff_core::engine_error::EngineError;
        use ff_core::types::{AttemptIndex, ExecutionId};

        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            id: &ExecutionId,
        ) -> Result<AttemptIndex, EngineError> {
            b.read_total_attempt_count(id).await
        }
    }

    /// v0.12 PR-4 compile anchor — the new
    /// [`EngineBackend::claim_execution`] trait method MUST be
    /// addressable under `--no-default-features --features sqlite`.
    /// Mirrors the PR-3 `read_current_attempt_index` anchor: a generic
    /// over the trait bound stands in for a plain fn-pointer coercion.
    ///
    /// The method has an `Err(Unavailable)` default impl; PG + SQLite
    /// backends don't override it today (grants are Valkey-only until
    /// the PG/SQLite grant-consumer RFC lands). The anchor pins the
    /// trait-surface signature — not a runtime call — so the compile
    /// check passes cleanly on every feature set.
    #[test]
    fn claim_execution_addressable_under_sqlite_only() {
        use ff_core::contracts::{ClaimExecutionArgs, ClaimExecutionResult};
        use ff_core::engine_error::EngineError;

        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            args: ClaimExecutionArgs,
        ) -> Result<ClaimExecutionResult, EngineError> {
            b.claim_execution(args).await
        }
    }

    /// v0.12 PR-5 compile anchor — the three new scanner-primitive
    /// trait methods (`scan_eligible_executions`, `issue_claim_grant`,
    /// `block_route`) MUST be addressable under
    /// `--no-default-features --features sqlite`.
    ///
    /// Each has an `Err(Unavailable)` default impl that PG + SQLite
    /// backends don't override; the scheduler-routed `claim_for_worker`
    /// path is the supported PG/SQLite entry point. The anchors pin the
    /// trait-surface signatures — not runtime calls — so the compile
    /// check passes cleanly on every feature set.
    #[test]
    fn scan_eligible_executions_addressable_under_sqlite_only() {
        use ff_core::contracts::ScanEligibleArgs;
        use ff_core::engine_error::EngineError;
        use ff_core::types::ExecutionId;

        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            args: ScanEligibleArgs,
        ) -> Result<Vec<ExecutionId>, EngineError> {
            b.scan_eligible_executions(args).await
        }
    }

    #[test]
    fn issue_claim_grant_addressable_under_sqlite_only() {
        use ff_core::contracts::{IssueClaimGrantArgs, IssueClaimGrantOutcome};
        use ff_core::engine_error::EngineError;

        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            args: IssueClaimGrantArgs,
        ) -> Result<IssueClaimGrantOutcome, EngineError> {
            b.issue_claim_grant(args).await
        }
    }

    #[test]
    fn block_route_addressable_under_sqlite_only() {
        use ff_core::contracts::{BlockRouteArgs, BlockRouteOutcome};
        use ff_core::engine_error::EngineError;

        #[allow(dead_code)]
        async fn _pin<B: EngineBackend + ?Sized>(
            b: &B,
            args: BlockRouteArgs,
        ) -> Result<BlockRouteOutcome, EngineError> {
            b.block_route(args).await
        }
    }

    /// v0.12 PR-3 compile anchor — `FlowFabricWorker::deliver_signal`
    /// MUST be addressable under `--no-default-features --features sqlite`.
    /// It was ungated in PR-3; the body is pure
    /// `self.backend.deliver_signal(...)` trait dispatch.
    #[test]
    fn deliver_signal_addressable_under_sqlite_only() {
        use crate::task::{Signal, SignalOutcome};
        use crate::SdkError;
        use ff_core::types::{ExecutionId, WaitpointId};
        use std::future::Future;
        use std::pin::Pin;

        // `deliver_signal` is `async fn`, so its opaque return future
        // can't be named in an fn-pointer type. Instead, a never-called
        // wrapper boxes the future behind an explicitly named
        // `Pin<Box<dyn Future + Send + 'a>>`. The unsizing coercion also
        // proves the returned future is `Send`.
        #[allow(dead_code)]
        fn _pin<'a>(
            w: &'a FlowFabricWorker,
            id: &'a ExecutionId,
            wp: &'a WaitpointId,
            s: Signal,
        ) -> Pin<Box<dyn Future<Output = Result<SignalOutcome, SdkError>> + Send + 'a>> {
            Box::pin(w.deliver_signal(id, wp, s))
        }
    }

    /// v0.12 PR-5.5 compile anchor — `claim_from_grant` MUST be
    /// addressable under `--no-default-features --features sqlite`
    /// (ungated in PR-5.5).
    ///
    /// PG / SQLite return `EngineError::Unavailable` from the underlying
    /// trait method today, so the call path is compile-reachable but
    /// runtime-unavailable. The anchor pins the signature (and, via the
    /// boxed `dyn Future + Send`, that the future is `Send`). A future
    /// PR wiring real PG/SQLite grant-consumer bodies flips the runtime
    /// behaviour without touching this test.
    #[test]
    fn claim_from_grant_addressable_under_sqlite_only() {
        use crate::task::ClaimedTask;
        use crate::SdkError;
        use ff_core::contracts::ClaimGrant;
        use ff_core::types::LaneId;
        use std::future::Future;
        use std::pin::Pin;

        #[allow(dead_code)]
        fn _pin<'a>(
            w: &'a FlowFabricWorker,
            lane: LaneId,
            grant: ClaimGrant,
        ) -> Pin<Box<dyn Future<Output = Result<ClaimedTask, SdkError>> + Send + 'a>> {
            Box::pin(w.claim_from_grant(lane, grant))
        }
    }

    /// v0.12 PR-5.5 compile anchor — `claim_via_server` MUST be
    /// addressable under `--no-default-features --features sqlite`.
    /// The scheduler-routed path is the supported PG/SQLite claim entry
    /// point per `project_claim_from_grant_pg_sqlite_gap.md`. Pinning
    /// the signature here prevents a future PR from accidentally
    /// re-gating it behind `valkey-default`.
    #[test]
    fn claim_via_server_addressable_under_sqlite_only() {
        use crate::admin::FlowFabricAdminClient;
        use crate::task::ClaimedTask;
        use crate::SdkError;
        use ff_core::types::LaneId;
        use std::future::Future;
        use std::pin::Pin;

        #[allow(dead_code)]
        fn _pin<'a>(
            w: &'a FlowFabricWorker,
            admin: &'a FlowFabricAdminClient,
            lane: &'a LaneId,
            grant_ttl_ms: u64,
        ) -> Pin<Box<dyn Future<Output = Result<Option<ClaimedTask>, SdkError>> + Send + 'a>>
        {
            Box::pin(w.claim_via_server(admin, lane, grant_ttl_ms))
        }
    }

    /// v0.12 PR-5.5 compile anchor — `ClaimedTask::{complete, fail,
    /// cancel}` MUST be addressable under `--no-default-features
    /// --features sqlite`.
    ///
    /// The `impl ClaimedTask` block is now module-level ungated
    /// (PR-5.5). The terminal ops route through
    /// `EngineBackend::{complete, fail, cancel}`, which are core trait
    /// methods (no `streaming` / `suspension` / `budget` gate).
    #[test]
    fn claimed_task_terminal_ops_addressable_under_sqlite_only() {
        use crate::task::{ClaimedTask, FailOutcome};
        use crate::SdkError;
        use std::future::Future;
        use std::pin::Pin;

        // No lifetime on the trait object means `'static`, so this also
        // pins that `complete`'s future is `Send + 'static` (it owns both
        // `t` and `payload`).
        #[allow(dead_code)]
        fn _pin_complete(
            t: ClaimedTask,
            payload: Option<Vec<u8>>,
        ) -> Pin<Box<dyn Future<Output = Result<(), SdkError>> + Send>> {
            Box::pin(t.complete(payload))
        }

        #[allow(dead_code)]
        fn _pin_fail<'a>(
            t: ClaimedTask,
            reason: &'a str,
            error_category: &'a str,
        ) -> Pin<Box<dyn Future<Output = Result<FailOutcome, SdkError>> + Send + 'a>> {
            Box::pin(t.fail(reason, error_category))
        }

        #[allow(dead_code)]
        fn _pin_cancel<'a>(
            t: ClaimedTask,
            reason: &'a str,
        ) -> Pin<Box<dyn Future<Output = Result<(), SdkError>> + Send + 'a>> {
            Box::pin(t.cancel(reason))
        }
    }
}
1994
#[cfg(all(test, feature = "direct-valkey-claim"))]
mod scan_cursor_tests {
    //! Properties of `scan_cursor_seed` as asserted below:
    //! - deterministic for a given (id, partition count);
    //! - sensitive to the id;
    //! - strictly below the partition count;
    //! - `0` when there are no partitions.
    use super::scan_cursor_seed;

    #[test]
    fn stable_for_same_input() {
        // Two independent calls with identical inputs must agree.
        let first = scan_cursor_seed("w1", 256);
        let second = scan_cursor_seed("w1", 256);
        assert_eq!(first, second);
    }

    #[test]
    fn distinct_for_different_ids() {
        let (seed_w1, seed_w2) = (scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
        assert_ne!(seed_w1, seed_w2);
    }

    #[test]
    fn bounded_by_partition_count() {
        // Sweep ids `w0`..`w99`; every seed must land inside the range.
        (0..100)
            .map(|i| format!("w{i}"))
            .for_each(|worker_id| assert!(scan_cursor_seed(&worker_id, 256) < 256));
    }

    #[test]
    fn zero_partitions_returns_zero() {
        let seed = scan_cursor_seed("w1", 0);
        assert_eq!(seed, 0);
    }
}