ff_sdk/worker.rs
1use std::collections::HashMap;
2#[cfg(feature = "direct-valkey-claim")]
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::Duration;
6
7use ferriskey::{Client, ClientBuilder, Value};
8use ff_core::keys::{ExecKeyContext, IndexKeys};
9use ff_core::partition::PartitionConfig;
10use ff_core::types::*;
11use tokio::sync::Semaphore;
12
13use crate::config::WorkerConfig;
14use crate::task::ClaimedTask;
15use crate::SdkError;
16
17/// FlowFabric worker — connects to Valkey, claims executions, and provides
18/// the worker-facing API.
19///
20/// # Admission control
21///
22/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
23/// **bypasses the scheduler's admission controls**: it reads the eligible
24/// ZSET directly and mints its own claim grant without consulting budget
25/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
26/// benchmarks, tests, and single-tenant development where the scheduler
27/// hop is measurement noise, not for production.
28///
29/// For production deployments, consume scheduler-issued grants via
30/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
31/// budget breach, quota sliding-window, concurrency cap, and
32/// capability-match checks before issuing grants.
33///
34/// # Usage
35///
36/// ```rust,ignore
/// use std::time::Duration;
///
/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
38///
39/// let config = WorkerConfig::new("localhost", 6379, "w1", "w1-i1", "default", "main");
40/// let worker = FlowFabricWorker::connect(config).await?;
41///
42/// loop {
43/// if let Some(task) = worker.claim_next().await? {
44/// // Process task...
45/// task.complete(Some(b"result".to_vec())).await?;
46/// } else {
47/// tokio::time::sleep(Duration::from_secs(1)).await;
48/// }
49/// }
50/// ```
51pub struct FlowFabricWorker {
52 client: Client,
53 config: WorkerConfig,
54 partition_config: PartitionConfig,
55 /// Sorted, deduplicated, comma-separated capabilities — computed once
56 /// from `config.capabilities` at connect time. Passed as ARGV[9] to
57 /// `ff_issue_claim_grant` on every claim. BTreeSet sorting is critical:
58 /// Lua's `ff_issue_claim_grant` relies on a stable CSV form for
59 /// reproducible logs and tests.
60 #[cfg(feature = "direct-valkey-claim")]
61 worker_capabilities_csv: String,
62 /// 8-hex FNV-1a digest of `worker_capabilities_csv`. Used in
63 /// per-mismatch logs so the 4KB CSV never echoes on every reject
64 /// during an incident. Full CSV logged once at connect-time WARN for
65 /// cross-reference. Mirrors `ff-scheduler::claim::worker_caps_digest`.
66 #[cfg(feature = "direct-valkey-claim")]
67 worker_capabilities_hash: String,
68 #[cfg(feature = "direct-valkey-claim")]
69 lane_index: AtomicUsize,
70 /// Concurrency cap for in-flight tasks. Permits are acquired or
71 /// transferred by [`claim_next`] (feature-gated),
72 /// [`claim_from_grant`] (always available), and
73 /// [`claim_from_reclaim_grant`], transferred to the returned
74 /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
75 /// Holds `max_concurrent_tasks` permits total.
76 ///
77 /// [`claim_next`]: FlowFabricWorker::claim_next
78 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
79 /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
80 concurrency_semaphore: Arc<Semaphore>,
81 /// Rolling offset for chunked partition scans. Each poll advances the
82 /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
83 /// chunk)` polls every partition is covered. The initial value is
84 /// derived from `worker_instance_id` so idle workers spread their
85 /// scans across different partitions from the first poll onward.
86 ///
    /// Overflow: the cursor advances by `PARTITION_SCAN_CHUNK` per poll, so
    /// on 64-bit targets (64-bit `usize`) wrapping takes on the order of
    /// 2^59 polls (billions of years at any realistic rate). On 32-bit
    /// targets (wasm32, i686) it wraps after roughly 4 years at 1 poll/sec,
    /// which is acceptable: on wrap, the modulo preserves correctness
    /// because the sequence simply starts a new cycle.
92 #[cfg(feature = "direct-valkey-claim")]
93 scan_cursor: AtomicUsize,
94 /// The [`EngineBackend`] the Stage-1b trait forwarders route
95 /// through.
96 ///
97 /// **RFC-012 Stage 1b.** Always populated:
98 /// [`FlowFabricWorker::connect`] now wraps the worker's own
99 /// `ferriskey::Client` in a `ValkeyBackend` via
100 /// `ValkeyBackend::from_client_and_partitions`, and
101 /// [`FlowFabricWorker::connect_with`] replaces that default with
102 /// the caller-supplied `Arc<dyn EngineBackend>`. The
103 /// [`FlowFabricWorker::backend`] accessor still returns
104 /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
105 /// narrows the return type once consumers have migrated.
106 ///
107 /// Hot paths (claim, deliver_signal, admin queries) still use the
108 /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
109 /// migrates them through this field, and Stage 1d removes the
110 /// embedded client.
111 backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
112 /// Optional handle to the same underlying backend viewed as a
113 /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
114 /// Populated by [`Self::connect`] from the bundled
115 /// `ValkeyBackend` (which implements the trait); supplied by the
116 /// caller on [`Self::connect_with`] as an explicit
117 /// `Option<Arc<dyn CompletionBackend>>` — `None` means "this
118 /// backend does not support push-based completion" (e.g. a future
119 /// Postgres backend without LISTEN/NOTIFY, or a test mock). Cairn
120 /// and other completion-subscription consumers reach this through
121 /// [`Self::completion_backend`].
122 completion_backend_handle:
123 Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
124}
125
126/// Number of partitions scanned per `claim_next()` poll. Keeps idle Valkey
127/// load at O(PARTITION_SCAN_CHUNK) per worker-second instead of
128/// O(num_flow_partitions).
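///
/// For example, with the default 256 flow partitions a worker covers every
/// partition once per `ceil(256 / 32) = 8` polls.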
129#[cfg(feature = "direct-valkey-claim")]
130const PARTITION_SCAN_CHUNK: usize = 32;
131
132impl FlowFabricWorker {
133 /// Connect to Valkey and prepare the worker.
134 ///
135 /// Establishes the ferriskey connection. Does NOT load the FlowFabric
136 /// library — that is the server's responsibility (ff-server calls
137 /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
138 /// the library is already loaded.
139 pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
140 if config.lanes.is_empty() {
141 return Err(SdkError::Config {
142 context: "worker_config".into(),
143 field: None,
144 message: "at least one lane is required".into(),
145 });
146 }
147
148 let mut builder = ClientBuilder::new()
149 .host(&config.host, config.port)
150 .connect_timeout(Duration::from_secs(10))
151 .request_timeout(Duration::from_millis(5000));
152 if config.tls {
153 builder = builder.tls();
154 }
155 if config.cluster {
156 builder = builder.cluster();
157 }
158 let client = builder.build()
159 .await
160 .map_err(|e| SdkError::ValkeyContext { source: e, context: "failed to connect".into() })?;
161
162 // Verify connectivity
163 let pong: String = client
164 .cmd("PING")
165 .execute()
166 .await
167 .map_err(|e| SdkError::ValkeyContext { source: e, context: "PING failed".into() })?;
168 if pong != "PONG" {
169 return Err(SdkError::Config {
170 context: "worker_connect".into(),
171 field: None,
172 message: format!("unexpected PING response: {pong}"),
173 });
174 }
175
176 // Guard against two worker processes sharing the same
        // `worker_instance_id`. Duplicate instances would clobber each
        // other's lease_current/active_index entries and double-claim work.
179 // SET NX on a liveness key with 2× lease TTL; if the key already
180 // exists another process is live. The key auto-expires if this
181 // process crashes without renewal, so a restart after a hard crash
182 // just waits at most 2× lease_ttl_ms for the ghost entry to clear.
183 //
184 // Known limitations of this minimal scheme (documented for operators):
185 // 1. **Startup-only, not runtime.** There is no heartbeat renewal
186 // path. After `2 × lease_ttl_ms` elapses the alive key expires
187 // naturally even while this worker is still running, and a
188 // second process with the same `worker_instance_id` launched
189 // later will successfully SET NX alongside the first. The check
190 // catches misconfiguration at boot; it does not fence duplicates
191 // that appear mid-lifetime. Production deployments should rely
192 // on the orchestrator (Kubernetes, systemd unit with
193 // `Restart=on-failure`, etc.) as the authoritative single-
194 // instance enforcer; this SET NX is belt-and-suspenders.
195 //
196 // 2. **Restart delay after a crash.** If a worker crashes
197 // ungracefully (SIGKILL, container OOM) and is restarted within
198 // `2 × lease_ttl_ms`, the alive key is still present and the
199 // new process exits with `SdkError::Config("duplicate
200 // worker_instance_id ...")`. Options for operators:
201 // - Wait `2 × lease_ttl_ms` (default 60s with the 30s TTL)
202 // before restarting.
203 // - Manually `DEL ff:worker:<instance_id>:alive` in Valkey to
204 // unblock the restart.
205 // - Use a fresh `worker_instance_id` for the restart (the
206 // orchestrator should already do this per-Pod).
207 //
208 // 3. **No graceful cleanup on shutdown.** There is no explicit
209 // `disconnect()` call that DELs the alive key. On clean
210 // `SIGTERM` the key lingers until its TTL expires. A follow-up
211 // can add `FlowFabricWorker::disconnect(self)` for callers that
212 // want to skip the restart-delay window.
213 let alive_key = format!("ff:worker:{}:alive", config.worker_instance_id);
214 let alive_ttl_ms = (config.lease_ttl_ms.saturating_mul(2)).max(1_000);
215 let set_result: Option<String> = client
216 .cmd("SET")
217 .arg(&alive_key)
218 .arg("1")
219 .arg("NX")
220 .arg("PX")
221 .arg(alive_ttl_ms.to_string().as_str())
222 .execute()
223 .await
224 .map_err(|e| SdkError::ValkeyContext {
225 source: e,
226 context: "SET NX worker alive key".into(),
227 })?;
228 if set_result.is_none() {
229 return Err(SdkError::Config {
230 context: "worker_connect".into(),
231 field: Some("worker_instance_id".into()),
232 message: format!(
233 "duplicate worker_instance_id '{}': another process already holds {alive_key}",
234 config.worker_instance_id
235 ),
236 });
237 }
238
239 // Read partition config from Valkey (set by ff-server on startup).
240 // Falls back to defaults if key doesn't exist (e.g. SDK-only testing).
241 let partition_config = read_partition_config(&client).await
242 .unwrap_or_else(|e| {
243 tracing::warn!(
244 error = %e,
245 "ff:config:partitions not found, using defaults"
246 );
247 PartitionConfig::default()
248 });
249
250 let max_tasks = config.max_concurrent_tasks.max(1);
251 let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
252
253 tracing::info!(
254 worker_id = %config.worker_id,
255 instance_id = %config.worker_instance_id,
256 lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
257 "FlowFabricWorker connected"
258 );
259
260 #[cfg(feature = "direct-valkey-claim")]
261 let scan_cursor_init = scan_cursor_seed(
262 config.worker_instance_id.as_str(),
263 partition_config.num_flow_partitions.max(1) as usize,
264 );
265
266 // Sort + dedupe capabilities into a stable CSV. BTreeSet both sorts
267 // and deduplicates in one pass; string joining happens once here.
268 //
269 // Ingress validation mirrors Scheduler::claim_for_worker (ff-scheduler):
270 // - `,` is the CSV delimiter; a token containing one would split
271 // mid-parse and could let a {"gpu"} worker appear to satisfy
272 // {"gpu,cuda"} (silent auth bypass).
273 // - Empty strings would produce leading/adjacent commas on the
274 // wire and inflate token count for no semantic reason.
        //   - Non-printable / whitespace chars: `"gpu "` vs `"gpu"` or
        //     `"gpu\n"` vs `"gpu"` produce silent mismatches that are
        //     miserable to debug. Reject whitespace and control
        //     characters at ingress so a typo fails loudly at connect
        //     instead of silently mis-routing forever.
280 // Reject at boot so operator misconfig is loud, symmetric with the
281 // scheduler path.
282 #[cfg(feature = "direct-valkey-claim")]
283 for cap in &config.capabilities {
284 if cap.is_empty() {
285 return Err(SdkError::Config {
286 context: "worker_config".into(),
287 field: Some("capabilities".into()),
288 message: "capability token must not be empty".into(),
289 });
290 }
291 if cap.contains(',') {
292 return Err(SdkError::Config {
293 context: "worker_config".into(),
294 field: Some("capabilities".into()),
295 message: format!(
296 "capability token may not contain ',' (CSV delimiter): {cap:?}"
297 ),
298 });
299 }
300 // Reject ASCII control bytes (0x00-0x1F, 0x7F) and any ASCII
301 // whitespace (space, tab, LF, CR, FF, VT). UTF-8 printable
302 // characters above 0x7F are ALLOWED so i18n caps like
303 // "东京-gpu" can be used. The CSV wire form is byte-safe for
304 // multibyte UTF-8 because `,` is always a single byte and
305 // never part of a multibyte continuation (only 0x80-0xBF are
306 // continuations, ',' is 0x2C).
307 if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
308 return Err(SdkError::Config {
309 context: "worker_config".into(),
310 field: Some("capabilities".into()),
311 message: format!(
312 "capability token must not contain whitespace or control \
313 characters: {cap:?}"
314 ),
315 });
316 }
317 }
318 #[cfg(feature = "direct-valkey-claim")]
319 let worker_capabilities_csv: String = {
320 let set: std::collections::BTreeSet<&str> = config
321 .capabilities
322 .iter()
323 .map(|s| s.as_str())
324 .filter(|s| !s.is_empty())
325 .collect();
326 if set.len() > ff_core::policy::CAPS_MAX_TOKENS {
327 return Err(SdkError::Config {
328 context: "worker_config".into(),
329 field: Some("capabilities".into()),
330 message: format!(
331 "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
332 ff_core::policy::CAPS_MAX_TOKENS,
333 set.len()
334 ),
335 });
336 }
337 let csv = set.into_iter().collect::<Vec<_>>().join(",");
338 if csv.len() > ff_core::policy::CAPS_MAX_BYTES {
339 return Err(SdkError::Config {
340 context: "worker_config".into(),
341 field: Some("capabilities".into()),
342 message: format!(
343 "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
344 ff_core::policy::CAPS_MAX_BYTES,
345 csv.len()
346 ),
347 });
348 }
349 csv
350 };
351
352 // Short stable digest of the sorted caps CSV, computed once so
353 // per-mismatch logs carry a stable identifier instead of the 4KB
354 // CSV. Shared helper — ff-scheduler uses the same one for its
355 // own per-mismatch logs, so cross-component log lines are
356 // diffable against each other.
357 #[cfg(feature = "direct-valkey-claim")]
358 let worker_capabilities_hash = ff_core::hash::fnv1a_xor8hex(&worker_capabilities_csv);
359
360 // Full CSV logged once at connect so per-mismatch logs (which
361 // carry only the 8-hex hash) can be cross-referenced by ops.
362 #[cfg(feature = "direct-valkey-claim")]
363 if !worker_capabilities_csv.is_empty() {
364 tracing::info!(
365 worker_instance_id = %config.worker_instance_id,
366 worker_caps_hash = %worker_capabilities_hash,
367 worker_caps = %worker_capabilities_csv,
368 "worker connected with capabilities (full CSV — mismatch logs use hash only)"
369 );
370 }
371
372 // Non-authoritative advertisement of caps for operator visibility
373 // (CLI introspection, dashboards). The AUTHORITATIVE source for
374 // scheduling decisions is ARGV[9] on each claim — Lua reads ONLY
        // that, never this string. Staleness or loss here is correctness-safe.
376 //
377 // Storage: a single STRING key holding the sorted CSV. Rationale:
378 // * **Atomic overwrite.** `SET` is a single command — a concurrent
379 // reader can never observe a transient empty value (the prior
380 // DEL+SADD pair had that window).
381 // * **Crash cleanup without refresh loop.** The alive-key SET NX
382 // is startup-only (see §1 above); there's no periodic renew
383 // to piggy-back on, so a TTL on caps would independently
384 // expire mid-flight and hide a live worker's caps from ops
385 // tools. Instead we drop the TTL: each reconnect overwrites;
386 // a crashed worker leaves a stale CSV until a new process
387 // with the same worker_instance_id boots (which triggers
388 // `duplicate worker_instance_id` via alive-key guard anyway —
389 // the orchestrator allocates a new id, and operators can DEL
390 // the stale caps key if they care).
391 // * **Empty caps = DEL.** A restart from {gpu} to {} clears the
392 // advertisement rather than leaving stale data.
393 // Cluster-safe advertisement: the per-worker caps STRING lives at
394 // `ff:worker:{id}:caps` (lands on whatever slot CRC16 puts it on),
395 // and the INSTANCE ID is SADD'd to the global workers-index SET
396 // `ff:idx:workers` (single slot). The unblock scanner's cluster
397 // enumeration uses SMEMBERS on the index + per-member GET on each
398 // caps key, instead of `SCAN MATCH ff:worker:*:caps` (which in
399 // cluster mode only scans the shard the SCAN lands on and misses
400 // workers whose key hashes elsewhere). Pattern mirrors Batch A
401 // `budget_policies_index` / `flow_index` / `deps_all_edges`:
402 // operations stay atomic per command, the index is the
403 // cluster-wide enumeration surface.
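        //
        // Illustrative sketch of the scanner-side enumeration this layout
        // supports (the real unblock scanner lives engine-side; the result
        // types below are assumptions):
        //
        //   let ids: Vec<String> = client.cmd("SMEMBERS")
        //       .arg(&ff_core::keys::workers_index_key())
        //       .execute().await?;
        //   for id in ids {
        //       // Resolves to the sorted caps CSV, or None if since DEL'd.
        //       let caps: Option<String> = client.cmd("GET")
        //           .arg(&ff_core::keys::worker_caps_key(&id))
        //           .execute().await?;
        //   }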
404 #[cfg(feature = "direct-valkey-claim")]
405 {
406 let caps_key = ff_core::keys::worker_caps_key(&config.worker_instance_id);
407 let index_key = ff_core::keys::workers_index_key();
408 let instance_id = config.worker_instance_id.to_string();
409 if worker_capabilities_csv.is_empty() {
410 // No caps advertised. DEL the per-worker caps string AND
411 // SREM from the index so the scanner doesn't GET an empty
412 // string for a worker that never declares caps.
413 let _ = client
414 .cmd("DEL")
415 .arg(&caps_key)
416 .execute::<Option<i64>>()
417 .await;
418 if let Err(e) = client
419 .cmd("SREM")
420 .arg(&index_key)
421 .arg(&instance_id)
422 .execute::<Option<i64>>()
423 .await
424 {
425 tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
426 "SREM workers-index failed; continuing (non-authoritative)");
427 }
428 } else {
429 // Atomic overwrite of the caps STRING (one-command). Then
430 // SADD to the index (idempotent — re-running connect for
431 // the same id is a no-op at SADD level). The per-worker
432 // caps key is written BEFORE the index SADD so that when
433 // the scanner observes the id in the index, the caps key
434 // is guaranteed to resolve to a non-stale CSV (the reverse
435 // order would leak an index entry pointing at a stale or
436 // empty caps key during a narrow window).
437 if let Err(e) = client
438 .cmd("SET")
439 .arg(&caps_key)
440 .arg(&worker_capabilities_csv)
441 .execute::<Option<String>>()
442 .await
443 {
444 tracing::warn!(error = %e, key = %caps_key,
445 "SET worker caps advertisement failed; continuing");
446 }
447 if let Err(e) = client
448 .cmd("SADD")
449 .arg(&index_key)
450 .arg(&instance_id)
451 .execute::<Option<i64>>()
452 .await
453 {
454 tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
455 "SADD workers-index failed; continuing");
456 }
457 }
458 }
459
460 // RFC-012 Stage 1b: wrap the dialed client in a
461 // ValkeyBackend so `ClaimedTask`'s trait forwarders have
462 // something to call. `from_client_and_partitions` reuses the
463 // already-dialed client — no second connection.
464 // Share the concrete `Arc<ValkeyBackend>` across the two
465 // trait objects — one allocation, both accessors yield
466 // identity-equivalent handles.
467 let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
468 ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
469 client.clone(),
470 partition_config,
471 );
472 let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
473 let completion_backend_handle: Option<
474 Arc<dyn ff_core::completion_backend::CompletionBackend>,
475 > = Some(valkey_backend);
476
477 Ok(Self {
478 client,
479 config,
480 partition_config,
481 #[cfg(feature = "direct-valkey-claim")]
482 worker_capabilities_csv,
483 #[cfg(feature = "direct-valkey-claim")]
484 worker_capabilities_hash,
485 #[cfg(feature = "direct-valkey-claim")]
486 lane_index: AtomicUsize::new(0),
487 concurrency_semaphore,
488 #[cfg(feature = "direct-valkey-claim")]
489 scan_cursor: AtomicUsize::new(scan_cursor_init),
490 backend,
491 completion_backend_handle,
492 })
493 }
494
495 /// Store pre-built [`EngineBackend`] and (optional)
496 /// [`CompletionBackend`] handles on the worker. Builds the worker
497 /// via the legacy [`FlowFabricWorker::connect`] path first (so the
498 /// embedded `ferriskey::Client` that the Stage 1b non-migrated hot
499 /// paths still use is dialed), then replaces the default
500 /// `ValkeyBackend` wrapper with the caller-supplied trait objects.
501 ///
    /// The `completion` argument is explicit: in 0.3.3 this path accepted
    /// only `backend`, and `completion_backend()` silently returned `None`
    /// because `Arc<dyn EngineBackend>` cannot be cast across traits to
    /// `Arc<dyn CompletionBackend>`. 0.3.4 lets the caller decide.
507 ///
508 /// - `Some(arc)` — caller supplies a completion backend.
509 /// [`Self::completion_backend`] returns `Some(clone)`.
510 /// - `None` — this backend does not support push-based completion
511 /// (future Postgres backend without LISTEN/NOTIFY, test mocks).
512 /// [`Self::completion_backend`] returns `None`.
513 ///
514 /// When the underlying backend implements both traits (as
515 /// `ValkeyBackend` does), pass the same `Arc` twice — the two
516 /// trait-object views share one allocation:
517 ///
518 /// ```rust,ignore
519 /// use std::sync::Arc;
520 /// use ff_backend_valkey::ValkeyBackend;
521 /// use ff_sdk::{FlowFabricWorker, WorkerConfig};
522 ///
523 /// # async fn doc(worker_config: WorkerConfig,
524 /// # backend_config: ff_backend_valkey::BackendConfig)
525 /// # -> Result<(), ff_sdk::SdkError> {
526 /// // Valkey (completion supported):
527 /// let valkey = Arc::new(ValkeyBackend::connect(backend_config).await?);
528 /// let worker = FlowFabricWorker::connect_with(
529 /// worker_config,
530 /// valkey.clone(),
531 /// Some(valkey),
532 /// ).await?;
533 /// # Ok(()) }
534 /// ```
535 ///
536 /// Backend without completion support:
537 ///
538 /// ```rust,ignore
539 /// let worker = FlowFabricWorker::connect_with(
540 /// worker_config,
541 /// backend,
542 /// None,
543 /// ).await?;
544 /// ```
545 ///
546 /// **Stage 1b scope — what the injected backend covers today.**
547 /// After this PR, `ClaimedTask`'s 8 migrated ops
548 /// (`renew_lease` / `update_progress` / `resume_signals` /
549 /// `delay_execution` / `move_to_waiting_children` /
550 /// `complete` / `cancel` / `fail`) route through the injected
551 /// backend. That's the first material use of `connect_with`: a
552 /// mock backend now genuinely sees the worker's per-task
553 /// write-surface calls. The 4 trait-shape-deferred ops
554 /// (`create_pending_waitpoint`, `append_frame`, `suspend`,
555 /// `report_usage`) still reach the embedded
556 /// `ferriskey::Client` directly until issue #117's trait
557 /// amendment lands; `claim_next` / `claim_from_grant` /
558 /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
559 /// are Stage 1c hot-path work. Stage 1d removes the embedded
560 /// client entirely.
561 ///
562 /// Today's constructor is therefore NOT yet a drop-in way to swap
563 /// in a non-Valkey backend — it requires a reachable Valkey node
564 /// for the 4 deferred + hot-path ops. Tests that exercise only
565 /// the 8 migrated ops can run fully against a mock backend.
566 ///
567 /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
568 /// [`CompletionBackend`]: ff_core::completion_backend::CompletionBackend
569 pub async fn connect_with(
570 config: WorkerConfig,
571 backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
572 completion: Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
573 ) -> Result<Self, SdkError> {
574 let mut worker = Self::connect(config).await?;
575 worker.backend = backend;
576 worker.completion_backend_handle = completion;
577 Ok(worker)
578 }
579
580 /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
581 /// ops through.
582 ///
583 /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
584 /// the `Option` wrapper is retained for API stability with the
585 /// Stage-1a shape. Stage 1c narrows the return type to
586 /// `&Arc<dyn EngineBackend>`.
587 pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
588 Some(&self.backend)
589 }
590
591 /// Handle to the completion-event subscription backend, for
592 /// consumers that need to observe execution completions (DAG
593 /// reconcilers, tenant-isolated subscribers).
594 ///
595 /// Returns `Some` when the worker was built through
596 /// [`Self::connect`] on the default `valkey-default` feature
597 /// (the bundled `ValkeyBackend` implements
598 /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)),
599 /// or via [`Self::connect_with`] with a `Some(..)` completion
600 /// handle. Returns `None` when the caller passed `None` to
601 /// [`Self::connect_with`] — i.e. the backend does not support
602 /// push-based completion streams (future Postgres without
603 /// LISTEN/NOTIFY, test mocks).
604 ///
605 /// The returned handle shares the same underlying allocation as
606 /// [`Self::backend`]; calls through it (e.g.
607 /// `subscribe_completions_filtered`) hit the same connection
608 /// the worker itself uses.
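    ///
    /// Minimal sketch (names and the subscription call are assumptions; see
    /// the `CompletionBackend` trait for the real API):
    ///
    /// ```rust,ignore
    /// match worker.completion_backend() {
    ///     Some(completions) => {
    ///         // Push-based completion stream available; subscribe through
    ///         // the trait (e.g. `subscribe_completions_filtered`).
    ///     }
    ///     None => {
    ///         // Backend has no completion stream; fall back to polling.
    ///     }
    /// }
    /// ```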
609 pub fn completion_backend(
610 &self,
611 ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
612 self.completion_backend_handle.clone()
613 }
614
615 /// Get a reference to the underlying ferriskey client.
616 pub fn client(&self) -> &Client {
617 &self.client
618 }
619
620 /// Get the worker config.
621 pub fn config(&self) -> &WorkerConfig {
622 &self.config
623 }
624
625 /// Get the server-published partition config this worker bound to at
626 /// `connect()`. Callers need this when computing partition hash-tags
627 /// for direct-client reads (e.g. ff-sdk::read_stream) to stay aligned
628 /// with the server's `num_flow_partitions` — using
629 /// `PartitionConfig::default()` assumes 256 partitions and silently
630 /// misses data on deployments with any other value.
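    ///
    /// Hypothetical sketch (`some_key_hash` is a placeholder; the point is
    /// deriving the partition index from the published count rather than
    /// `PartitionConfig::default()`):
    ///
    /// ```rust,ignore
    /// let num = worker.partition_config().num_flow_partitions;
    /// let partition = ff_core::partition::Partition {
    ///     family: ff_core::partition::PartitionFamily::Execution,
    ///     index: (some_key_hash % num as u64) as u16,
    /// };
    /// ```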
631 pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
632 &self.partition_config
633 }
634
635 /// Attempt to claim the next eligible execution.
636 ///
637 /// Phase 1 simplified claim flow:
638 /// 1. Pick a lane (round-robin across configured lanes)
639 /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
640 /// 3. Claim the execution via `ff_claim_execution`
641 /// 4. Read execution payload + tags
642 /// 5. Return a [`ClaimedTask`] with auto lease renewal
643 ///
644 /// Gated behind the `direct-valkey-claim` feature — bypasses the
645 /// scheduler's budget / quota / capability admission checks. Enable
646 /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
647 /// the scheduler hop would be measurement noise (benches) or when
648 /// the test harness needs a deterministic worker-local path. Prefer
649 /// the scheduler-routed HTTP claim path in production.
650 ///
651 /// # `None` semantics
652 ///
653 /// `Ok(None)` means **no work was found in the partition window this
654 /// poll covered**, not "the cluster is idle". Each call scans a chunk
655 /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
656 /// `scan_cursor`; the cursor advances by that chunk size on every
657 /// invocation, so a worker covers every partition exactly once every
658 /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
659 ///
660 /// Callers should treat `None` as "poll again soon" (typically after
661 /// `config.claim_poll_interval_ms`) rather than "sleep for a long
662 /// time". Backing off too aggressively on `None` can starve workers
663 /// when work lives on partitions outside the current window.
664 ///
665 /// Returns `Err` on Valkey errors or script failures.
666 #[cfg(feature = "direct-valkey-claim")]
667 pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
668 // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
669 // try_acquire returns immediately — if no permits available, the worker
670 // is at capacity and should not claim more work.
671 let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
672 Ok(p) => p,
673 Err(_) => return Ok(None), // At capacity — no claim attempted
674 };
675
676 let lane_id = self.next_lane();
677 let now = TimestampMs::now();
678
679 // Phase 1: We scan eligible executions directly by reading the eligible
680 // ZSET across execution partitions. In production the scheduler
681 // (ff-scheduler) would handle this. For Phase 1, the SDK does a
682 // simplified inline claim.
683 //
684 // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
685 // partitions starting at a rolling offset. This keeps idle Valkey
686 // load at O(chunk) per worker-second instead of O(num_partitions),
687 // and the worker-instance-seeded initial cursor spreads concurrent
688 // workers across different partition windows.
689 let num_partitions = self.partition_config.num_flow_partitions as usize;
690 if num_partitions == 0 {
691 return Ok(None);
692 }
693 let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
694 let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;
695
696 for step in 0..chunk {
697 let partition_idx = ((start + step) % num_partitions) as u16;
698 let partition = ff_core::partition::Partition {
699 family: ff_core::partition::PartitionFamily::Execution,
700 index: partition_idx,
701 };
702 let idx = IndexKeys::new(&partition);
703 let eligible_key = idx.lane_eligible(&lane_id);
704
705 // ZRANGEBYSCORE to get the highest-priority eligible execution.
706 // Score format: -(priority * 1_000_000_000_000) + created_at_ms
707 // ZRANGEBYSCORE with "-inf" "+inf" LIMIT 0 1 gives lowest score = highest priority.
708 let result: Value = self
709 .client
710 .cmd("ZRANGEBYSCORE")
711 .arg(&eligible_key)
712 .arg("-inf")
713 .arg("+inf")
714 .arg("LIMIT")
715 .arg("0")
716 .arg("1")
717 .execute()
718 .await
719 .map_err(|e| SdkError::ValkeyContext { source: e, context: "ZRANGEBYSCORE failed".into() })?;
720
721 let execution_id_str = match extract_first_array_string(&result) {
722 Some(s) => s,
723 None => continue, // No eligible executions on this partition
724 };
725
726 let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
727 SdkError::from(ff_script::error::ScriptError::Parse {
728 fcall: "claim_execution_from_eligible_set".into(),
729 execution_id: None,
730 message: format!("bad execution_id in eligible set: {e}"),
731 })
732 })?;
733
734 // Step 1: Issue claim grant
735 let grant_result = self
736 .issue_claim_grant(&execution_id, &lane_id, &partition, &idx)
737 .await;
738
739 match grant_result {
740 Ok(()) => {}
741 Err(SdkError::Engine(ref boxed))
742 if matches!(
743 **boxed,
744 crate::EngineError::Validation {
745 kind: crate::ValidationKind::CapabilityMismatch,
746 ..
747 }
748 ) =>
749 {
750 let missing = match &**boxed {
751 crate::EngineError::Validation { detail, .. } => detail.clone(),
752 _ => unreachable!(),
753 };
754 // Block-on-mismatch (RFC-009 §7.5) — parity with
755 // ff-scheduler's Scheduler::claim_for_worker. Without
756 // this, the inline-direct-claim path would hot-loop
757 // on an unclaimable top-of-zset (every tick picks the
758 // same execution, wastes an FCALL, logs, releases,
759 // repeats). The scheduler-side unblock scanner
760 // promotes blocked_route executions back to eligible
761 // when a worker with matching caps registers.
762 tracing::info!(
763 execution_id = %execution_id,
764 worker_id = %self.config.worker_id,
765 worker_caps_hash = %self.worker_capabilities_hash,
766 missing = %missing,
767 "capability mismatch, blocking execution off eligible (SDK inline claim)"
768 );
769 self.block_route(&execution_id, &lane_id, &partition, &idx).await;
770 continue;
771 }
772 Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
773 tracing::debug!(
774 execution_id = %execution_id,
775 error = %e,
776 "claim grant failed (retryable), trying next partition"
777 );
778 continue;
779 }
780 Err(e) => return Err(e),
781 }
782
783 // Step 2: Claim the execution
784 match self
785 .claim_execution(&execution_id, &lane_id, &partition, now)
786 .await
787 {
788 Ok(mut task) => {
789 // Transfer concurrency permit to the task. When the task is
790 // completed/failed/cancelled/dropped the permit returns to
791 // the semaphore, allowing another claim.
792 task.set_concurrency_permit(permit);
793 return Ok(Some(task));
794 }
795 Err(SdkError::Engine(ref boxed))
796 if matches!(
797 **boxed,
798 crate::EngineError::Contention(
799 crate::ContentionKind::UseClaimResumedExecution
800 )
801 ) =>
802 {
803 // Execution was resumed from suspension — attempt_interrupted.
804 // ff_claim_execution rejects this; use ff_claim_resumed_execution
805 // which reuses the existing attempt instead of creating a new one.
806 tracing::debug!(
807 execution_id = %execution_id,
808 "execution is resumed, using claim_resumed path"
809 );
810 match self
811 .claim_resumed_execution(&execution_id, &lane_id, &partition)
812 .await
813 {
814 Ok(mut task) => {
815 task.set_concurrency_permit(permit);
816 return Ok(Some(task));
817 }
818 Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
819 tracing::debug!(
820 execution_id = %execution_id,
821 error = %e2,
822 "claim_resumed failed (retryable), trying next partition"
823 );
824 continue;
825 }
826 Err(e2) => return Err(e2),
827 }
828 }
829 Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
830 tracing::debug!(
831 execution_id = %execution_id,
832 error = %e,
833 "claim execution failed (retryable), trying next partition"
834 );
835 continue;
836 }
837 Err(e) => return Err(e),
838 }
839 }
840
841 // No eligible work found on any partition
842 Ok(None)
843 }
844
845 #[cfg(feature = "direct-valkey-claim")]
846 async fn issue_claim_grant(
847 &self,
848 execution_id: &ExecutionId,
849 lane_id: &LaneId,
850 partition: &ff_core::partition::Partition,
851 idx: &IndexKeys,
852 ) -> Result<(), SdkError> {
853 let ctx = ExecKeyContext::new(partition, execution_id);
854
855 // KEYS (3): exec_core, claim_grant_key, eligible_zset
856 let keys: Vec<String> = vec![
857 ctx.core(),
858 ctx.claim_grant(),
859 idx.lane_eligible(lane_id),
860 ];
861
862 // ARGV (9): eid, worker_id, worker_instance_id, lane_id,
863 // capability_hash, grant_ttl_ms, route_snapshot_json,
864 // admission_summary, worker_capabilities_csv (sorted)
865 let args: Vec<String> = vec![
866 execution_id.to_string(),
867 self.config.worker_id.to_string(),
868 self.config.worker_instance_id.to_string(),
869 lane_id.to_string(),
870 String::new(), // capability_hash
871 "5000".to_owned(), // grant_ttl_ms (5 seconds)
872 String::new(), // route_snapshot_json
873 String::new(), // admission_summary
874 self.worker_capabilities_csv.clone(), // sorted CSV
875 ];
876
877 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
878 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
879
880 let raw: Value = self
881 .client
882 .fcall("ff_issue_claim_grant", &key_refs, &arg_refs)
883 .await
884 .map_err(SdkError::Valkey)?;
885
886 crate::task::parse_success_result(&raw, "ff_issue_claim_grant")
887 }
888
889 /// Move an execution from the lane's eligible ZSET into its
890 /// blocked_route ZSET via `ff_block_execution_for_admission`. Called
891 /// after a `CapabilityMismatch` reject — without this, the inline
892 /// direct-claim path would re-pick the same top-of-zset every tick
893 /// (same pattern the scheduler's block_candidate handles). The
894 /// engine's unblock scanner periodically promotes blocked_route
895 /// back to eligible once a worker with matching caps registers.
896 ///
897 /// Best-effort: transport or logical rejects (e.g. the execution
898 /// already went terminal between pick and block) are logged and the
899 /// outer loop simply `continue`s to the next partition. Parity with
900 /// ff-scheduler::Scheduler::block_candidate.
901 #[cfg(feature = "direct-valkey-claim")]
902 async fn block_route(
903 &self,
904 execution_id: &ExecutionId,
905 lane_id: &LaneId,
906 partition: &ff_core::partition::Partition,
907 idx: &IndexKeys,
908 ) {
909 let ctx = ExecKeyContext::new(partition, execution_id);
910 let core_key = ctx.core();
911 let eligible_key = idx.lane_eligible(lane_id);
912 let blocked_key = idx.lane_blocked_route(lane_id);
913 let eid_s = execution_id.to_string();
914 let now_ms = TimestampMs::now().0.to_string();
915
916 let keys: [&str; 3] = [&core_key, &eligible_key, &blocked_key];
917 let argv: [&str; 4] = [
918 &eid_s,
919 "waiting_for_capable_worker",
920 "no connected worker satisfies required_capabilities",
921 &now_ms,
922 ];
923
924 match self
925 .client
926 .fcall::<Value>("ff_block_execution_for_admission", &keys, &argv)
927 .await
928 {
929 Ok(v) => {
930 // Parse Lua result so a logical reject (e.g. execution
931 // went terminal mid-flight) is visible — same fix we
932 // applied to ff-scheduler's block_candidate.
933 if let Err(e) = crate::task::parse_success_result(&v, "ff_block_execution_for_admission") {
934 tracing::warn!(
935 execution_id = %execution_id,
936 error = %e,
937 "SDK block_route: Lua rejected; eligible ZSET unchanged, next poll \
938 will re-evaluate"
939 );
940 }
941 }
942 Err(e) => {
943 tracing::warn!(
944 execution_id = %execution_id,
945 error = %e,
946 "SDK block_route: transport failure; eligible ZSET unchanged"
947 );
948 }
949 }
950 }
951
952 /// Low-level claim of a granted execution. Invokes
953 /// `ff_claim_execution` and returns a `ClaimedTask` with auto
954 /// lease renewal.
955 ///
956 /// Previously gated behind `direct-valkey-claim`; ungated so
957 /// the public [`claim_from_grant`] entry point can reuse the
958 /// same FCALL plumbing. The method stays private — external
959 /// callers use `claim_from_grant`.
960 ///
961 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
962 async fn claim_execution(
963 &self,
964 execution_id: &ExecutionId,
965 lane_id: &LaneId,
966 partition: &ff_core::partition::Partition,
967 _now: TimestampMs,
968 ) -> Result<ClaimedTask, SdkError> {
969 let ctx = ExecKeyContext::new(partition, execution_id);
970 let idx = IndexKeys::new(partition);
971
972 // Pre-read total_attempt_count from exec_core to derive next attempt index.
973 // The Lua uses total_attempt_count as the new index and dynamically builds
974 // the attempt key from the hash tag, so KEYS[6-8] are placeholders, but
975 // we pass the correct index for documentation/debugging.
976 let total_str: Option<String> = self.client
977 .cmd("HGET")
978 .arg(ctx.core())
979 .arg("total_attempt_count")
980 .execute()
981 .await
982 .unwrap_or(None);
983 let next_idx = total_str
984 .as_deref()
985 .and_then(|s| s.parse::<u32>().ok())
986 .unwrap_or(0);
987 let att_idx = AttemptIndex::new(next_idx);
988
989 let lease_id = LeaseId::new().to_string();
990 let attempt_id = AttemptId::new().to_string();
991 let renew_before_ms = self.config.lease_ttl_ms * 2 / 3;
992
993 // KEYS (14): must match lua/execution.lua ff_claim_execution positional order
994 let keys: Vec<String> = vec![
995 ctx.core(), // 1 exec_core
996 ctx.claim_grant(), // 2 claim_grant
997 idx.lane_eligible(lane_id), // 3 eligible_zset
998 idx.lease_expiry(), // 4 lease_expiry_zset
999 idx.worker_leases(&self.config.worker_instance_id), // 5 worker_leases
1000 ctx.attempt_hash(att_idx), // 6 attempt_hash (placeholder)
1001 ctx.attempt_usage(att_idx), // 7 attempt_usage (placeholder)
1002 ctx.attempt_policy(att_idx), // 8 attempt_policy (placeholder)
1003 ctx.attempts(), // 9 attempts_zset
1004 ctx.lease_current(), // 10 lease_current
1005 ctx.lease_history(), // 11 lease_history
1006 idx.lane_active(lane_id), // 12 active_index
1007 idx.attempt_timeout(), // 13 attempt_timeout_zset
1008 idx.execution_deadline(), // 14 execution_deadline_zset
1009 ];
1010
1011 // ARGV (12): must match lua/execution.lua ff_claim_execution positional order
1012 let args: Vec<String> = vec![
1013 execution_id.to_string(), // 1 execution_id
1014 self.config.worker_id.to_string(), // 2 worker_id
1015 self.config.worker_instance_id.to_string(), // 3 worker_instance_id
1016 lane_id.to_string(), // 4 lane
1017 String::new(), // 5 capability_hash
1018 lease_id.clone(), // 6 lease_id
1019 self.config.lease_ttl_ms.to_string(), // 7 lease_ttl_ms
1020 renew_before_ms.to_string(), // 8 renew_before_ms
1021 attempt_id.clone(), // 9 attempt_id
1022 "{}".to_owned(), // 10 attempt_policy_json
1023 String::new(), // 11 attempt_timeout_ms
1024 String::new(), // 12 execution_deadline_at
1025 ];
1026
1027 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1028 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1029
1030 let raw: Value = self
1031 .client
1032 .fcall("ff_claim_execution", &key_refs, &arg_refs)
1033 .await
1034 .map_err(SdkError::Valkey)?;
1035
        // Parse claim result: {1, "OK", lease_id, lease_epoch, lease_expires_at,
        // attempt_id, attempt_index, attempt_type}
1038 let arr = match &raw {
1039 Value::Array(arr) => arr,
1040 _ => {
1041 return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1042 fcall: "ff_claim_execution".into(),
1043 execution_id: Some(execution_id.to_string()),
1044 message: "expected Array".into(),
1045 }));
1046 }
1047 };
1048
1049 let status_code = match arr.first() {
1050 Some(Ok(Value::Int(n))) => *n,
1051 _ => {
1052 return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1053 fcall: "ff_claim_execution".into(),
1054 execution_id: Some(execution_id.to_string()),
1055 message: "bad status code".into(),
1056 }));
1057 }
1058 };
1059
1060 if status_code != 1 {
1061 let err_field_str = |idx: usize| -> String {
1062 arr.get(idx)
1063 .and_then(|v| match v {
1064 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1065 Ok(Value::SimpleString(s)) => Some(s.clone()),
1066 _ => None,
1067 })
1068 .unwrap_or_default()
1069 };
1070 let error_code = {
1071 let s = err_field_str(1);
1072 if s.is_empty() { "unknown".to_owned() } else { s }
1073 };
1074 let detail = err_field_str(2);
1075
1076 return Err(SdkError::from(
1077 ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
1078 .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
1079 fcall: "ff_claim_execution".into(),
1080 execution_id: Some(execution_id.to_string()),
1081 message: format!("unknown error: {error_code}"),
1082 }),
1083 ));
1084 }
1085
1086 // Extract fields from success response
1087 let field_str = |idx: usize| -> String {
1088 arr.get(idx + 2) // skip status_code and "OK"
1089 .and_then(|v| match v {
1090 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1091 Ok(Value::SimpleString(s)) => Some(s.clone()),
1092 Ok(Value::Int(n)) => Some(n.to_string()),
1093 _ => None,
1094 })
1095 .unwrap_or_default()
1096 };
1097
1098 // Lua returns: ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
1099 // Positions: 0 1 2 3 4 5
1100 let lease_id = LeaseId::parse(&field_str(0))
1101 .unwrap_or_else(|_| LeaseId::new());
1102 let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
1103 // field_str(2) is expires_at — skip it (lease timing managed by renewal)
1104 let attempt_id = AttemptId::parse(&field_str(3))
1105 .unwrap_or_else(|_| AttemptId::new());
1106 let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
1107
1108 // Read execution payload and metadata
1109 let (input_payload, execution_kind, tags) = self
1110 .read_execution_context(execution_id, partition)
1111 .await?;
1112
1113 Ok(ClaimedTask::new(
1114 self.client.clone(),
1115 self.backend.clone(),
1116 self.partition_config,
1117 execution_id.clone(),
1118 attempt_index,
1119 attempt_id,
1120 lease_id,
1121 lease_epoch,
1122 self.config.lease_ttl_ms,
1123 lane_id.clone(),
1124 self.config.worker_instance_id.clone(),
1125 input_payload,
1126 execution_kind,
1127 tags,
1128 ))
1129 }
1130
1131 /// Consume a [`ClaimGrant`] and claim the granted execution on
1132 /// this worker. The intended production entry point: pair with
1133 /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
1134 /// scheduler-issued grants into the SDK without enabling the
1135 /// `direct-valkey-claim` feature (which bypasses budget/quota
1136 /// admission control).
1137 ///
1138 /// The worker's concurrency semaphore is checked BEFORE the FCALL
1139 /// so a saturated worker does not consume the grant: the grant
1140 /// stays valid for its remaining TTL and the caller can either
1141 /// release it back to the scheduler or retry after some other
1142 /// in-flight task completes.
1143 ///
1144 /// On success the returned [`ClaimedTask`] holds a concurrency
1145 /// permit that releases automatically on
1146 /// `complete`/`fail`/`cancel`/drop — same contract as
1147 /// `claim_next`.
1148 ///
1149 /// # Arguments
1150 ///
1151 /// * `lane` — the lane the grant was issued for. Must match what
1152 /// was passed to `Scheduler::claim_for_worker`; the Lua FCALL
1153 /// uses it to look up `lane_eligible`, `lane_active`, and the
1154 /// `worker_leases` index slot.
1155 /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
1156 ///
1157 /// # Errors
1158 ///
1159 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1160 /// permits all held. Retryable; the grant is untouched.
1161 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1162 /// or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
1163 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
1164 /// (wrapped in [`SdkError::Engine`]).
1165 /// * `ScriptError::CapabilityMismatch` — execution's required
1166 /// capabilities not a subset of this worker's caps (wrapped in
1167 /// [`SdkError::Engine`]). Surfaced post-grant if a race
1168 /// between grant issuance and caps change allows it.
1169 /// * `ScriptError::Parse` — `ff_claim_execution` returned an
1170 /// unexpected shape (wrapped in [`SdkError::Engine`]).
1171 /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
1172 /// transport error during the FCALL or the
1173 /// `read_execution_context` follow-up.
1174 ///
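    /// Hedged sketch of the intended pairing (how the grant is transported
    /// from the scheduler to this worker is deployment-specific and assumed
    /// here):
    ///
    /// ```rust,ignore
    /// // `grant` was issued by ff_scheduler::Scheduler::claim_for_worker
    /// // for this worker_id and `lane`.
    /// let task = worker.claim_from_grant(lane.clone(), grant).await?;
    /// // Process, then settle; the concurrency permit releases on settle/drop.
    /// task.complete(Some(b"result".to_vec())).await?;
    /// ```
    ///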
1175 /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
1176 /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
1177 pub async fn claim_from_grant(
1178 &self,
1179 lane: LaneId,
1180 grant: ff_core::contracts::ClaimGrant,
1181 ) -> Result<ClaimedTask, SdkError> {
1182 // Semaphore check FIRST. If the worker is saturated we must
1183 // surface the condition to the caller without touching the
1184 // grant — silently returning Ok(None) (as claim_next does)
1185 // would drop a grant the scheduler has already committed work
1186 // to issuing, wasting the slot until its TTL elapses.
1187 let permit = self
1188 .concurrency_semaphore
1189 .clone()
1190 .try_acquire_owned()
1191 .map_err(|_| SdkError::WorkerAtCapacity)?;
1192
1193 let now = TimestampMs::now();
1194 let partition = grant.partition().map_err(|e| SdkError::Config {
1195 context: "claim_from_grant".to_owned(),
1196 field: Some("partition_key".to_owned()),
1197 message: e.to_string(),
1198 })?;
1199 let mut task = self
1200 .claim_execution(&grant.execution_id, &lane, &partition, now)
1201 .await?;
1202 task.set_concurrency_permit(permit);
1203 Ok(task)
1204 }
1205
    /// Scheduler-routed claim: POST to the server's
    /// `/v1/workers/{id}/claim` endpoint, then chain to
    /// [`Self::claim_from_grant`].
1209 ///
1210 /// Batch C item 2 PR-B. This is the production entry point —
1211 /// budget + quota + capability admission run server-side inside
1212 /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
1213 /// enable the `direct-valkey-claim` feature.
1214 ///
1215 /// Returns `Ok(None)` when the server says no eligible execution
1216 /// (HTTP 204). Callers typically back off by
1217 /// `config.claim_poll_interval_ms` and try again, same cadence
1218 /// as the direct-claim path's `Ok(None)`.
1219 ///
1220 /// The `admin` client is the established HTTP surface
1221 /// (`FlowFabricAdminClient`) reused here so workers don't keep a
1222 /// second reqwest client around. Build once at worker boot and
1223 /// hand in by reference on every claim.
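    ///
    /// Sketch of a worker poll loop on this path (building the
    /// `FlowFabricAdminClient` and the grant TTL value are illustrative
    /// assumptions):
    ///
    /// ```rust,ignore
    /// loop {
    ///     match worker.claim_via_server(&admin, &lane, 5_000).await? {
    ///         Some(task) => {
    ///             // Process the task, then settle it.
    ///             task.complete(Some(b"result".to_vec())).await?;
    ///         }
    ///         None => {
    ///             // HTTP 204: nothing eligible right now; back off briefly
    ///             // (typically by `config.claim_poll_interval_ms`) and retry.
    ///             tokio::time::sleep(Duration::from_millis(250)).await;
    ///         }
    ///     }
    /// }
    /// ```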
1224 pub async fn claim_via_server(
1225 &self,
1226 admin: &crate::FlowFabricAdminClient,
1227 lane: &LaneId,
1228 grant_ttl_ms: u64,
1229 ) -> Result<Option<ClaimedTask>, SdkError> {
1230 let req = crate::admin::ClaimForWorkerRequest {
1231 worker_id: self.config.worker_id.to_string(),
1232 lane_id: lane.to_string(),
1233 worker_instance_id: self.config.worker_instance_id.to_string(),
1234 capabilities: self.config.capabilities.clone(),
1235 grant_ttl_ms,
1236 };
1237 let Some(resp) = admin.claim_for_worker(req).await? else {
1238 return Ok(None);
1239 };
1240 let grant = resp.into_grant()?;
1241 self.claim_from_grant(lane.clone(), grant).await.map(Some)
1242 }
1243
1244 /// Consume a [`ReclaimGrant`] and transition the granted
1245 /// `attempt_interrupted` execution into a `started` state on this
1246 /// worker. Symmetric partner to [`claim_from_grant`] for the
1247 /// resume path.
1248 ///
1249 /// The grant must have been issued to THIS worker (matching
1250 /// `worker_id` at grant time). A mismatch returns
1251 /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1252 /// atomically by `ff_claim_resumed_execution`; a second call with
1253 /// the same grant also returns `InvalidClaimGrant`.
1254 ///
1255 /// # Concurrency
1256 ///
1257 /// The worker's concurrency semaphore is checked BEFORE the FCALL
1258 /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1259 /// assume pre-existing capacity on this worker — a reclaim can
1260 /// land on a fresh worker instance that just came up after a
1261 /// crash/restart and is picking up a previously-interrupted
1262 /// execution. If the worker is saturated, the grant stays valid
1263 /// for its remaining TTL and the caller can release it or retry.
1264 ///
1265 /// On success the returned [`ClaimedTask`] holds a concurrency
1266 /// permit that releases automatically on
1267 /// `complete`/`fail`/`cancel`/drop.
1268 ///
1269 /// # Errors
1270 ///
1271 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1272 /// permits all held. Retryable; the grant is untouched (no
1273 /// FCALL was issued, so `ff_claim_resumed_execution` did not
1274 /// atomically consume the grant key).
1275 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1276 /// or `worker_id` mismatch.
1277 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1278 /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1279 /// `attempt_interrupted`.
1280 /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1281 /// not `runnable`.
1282 /// * `ScriptError::ExecutionNotFound` — core key missing.
1283 /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
1284 /// transport.
1285 ///
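    /// Sketch (where the `ReclaimGrant` comes from, e.g. the scheduler's
    /// resume path, is outside this method and assumed here):
    ///
    /// ```rust,ignore
    /// // `reclaim_grant` carries the execution_id, partition, and lane_id.
    /// let task = worker.claim_from_reclaim_grant(reclaim_grant).await?;
    /// // The interrupted attempt is resumed rather than restarted; settle it
    /// // like any other claimed task.
    /// task.complete(Some(b"result".to_vec())).await?;
    /// ```
    ///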
1286 /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1287 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
1288 pub async fn claim_from_reclaim_grant(
1289 &self,
1290 grant: ff_core::contracts::ReclaimGrant,
1291 ) -> Result<ClaimedTask, SdkError> {
1292 // Semaphore check FIRST — same load-bearing ordering as
1293 // `claim_from_grant`. If the worker is saturated, surface
1294 // WorkerAtCapacity without firing the FCALL; the FCALL is an
1295 // atomic consume on the grant key, so calling it past-
1296 // saturation would destroy the grant while leaving no
1297 // permit to attach to the returned `ClaimedTask`.
1298 let permit = self
1299 .concurrency_semaphore
1300 .clone()
1301 .try_acquire_owned()
1302 .map_err(|_| SdkError::WorkerAtCapacity)?;
1303
1304 // Grant carries partition + lane_id so no round-trip is needed
1305 // to resolve them before the FCALL.
1306 let partition = grant.partition().map_err(|e| SdkError::Config {
1307 context: "claim_from_reclaim_grant".to_owned(),
1308 field: Some("partition_key".to_owned()),
1309 message: e.to_string(),
1310 })?;
1311 let mut task = self
1312 .claim_resumed_execution(
1313 &grant.execution_id,
1314 &grant.lane_id,
1315 &partition,
1316 )
1317 .await?;
1318 task.set_concurrency_permit(permit);
1319 Ok(task)
1320 }
1321
1322 /// Low-level resume claim. Invokes `ff_claim_resumed_execution`
1323 /// and returns a `ClaimedTask` bound to the resumed attempt.
1324 ///
1325 /// Previously gated behind `direct-valkey-claim`; ungated so the
1326 /// public [`claim_from_reclaim_grant`] entry point can reuse it.
1327 /// The method stays private — external callers use
1328 /// `claim_from_reclaim_grant`.
1329 ///
1330 /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
1331 async fn claim_resumed_execution(
1332 &self,
1333 execution_id: &ExecutionId,
1334 lane_id: &LaneId,
1335 partition: &ff_core::partition::Partition,
1336 ) -> Result<ClaimedTask, SdkError> {
1337 let ctx = ExecKeyContext::new(partition, execution_id);
1338 let idx = IndexKeys::new(partition);
1339
1340 // Pre-read current_attempt_index for the existing attempt hash key.
1341 // This is load-bearing: KEYS[6] must point to the real attempt hash.
1342 let att_idx_str: Option<String> = self.client
1343 .cmd("HGET")
1344 .arg(ctx.core())
1345 .arg("current_attempt_index")
1346 .execute()
1347 .await
1348 .map_err(|e| SdkError::ValkeyContext { source: e, context: "read attempt_index".into() })?;
1349 let att_idx = AttemptIndex::new(
1350 att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
1351 );
1352
1353 let lease_id = LeaseId::new().to_string();
1354
1355 // KEYS (11): must match lua/signal.lua ff_claim_resumed_execution
1356 let keys: Vec<String> = vec![
1357 ctx.core(), // 1 exec_core
1358 ctx.claim_grant(), // 2 claim_grant
1359 idx.lane_eligible(lane_id), // 3 eligible_zset
1360 idx.lease_expiry(), // 4 lease_expiry_zset
1361 idx.worker_leases(&self.config.worker_instance_id), // 5 worker_leases
1362 ctx.attempt_hash(att_idx), // 6 existing_attempt_hash
1363 ctx.lease_current(), // 7 lease_current
1364 ctx.lease_history(), // 8 lease_history
1365 idx.lane_active(lane_id), // 9 active_index
1366 idx.attempt_timeout(), // 10 attempt_timeout_zset
1367 idx.execution_deadline(), // 11 execution_deadline_zset
1368 ];
1369
1370 // ARGV (8): must match lua/signal.lua ff_claim_resumed_execution
1371 let args: Vec<String> = vec![
1372 execution_id.to_string(), // 1 execution_id
1373 self.config.worker_id.to_string(), // 2 worker_id
1374 self.config.worker_instance_id.to_string(), // 3 worker_instance_id
1375 lane_id.to_string(), // 4 lane
1376 String::new(), // 5 capability_hash
1377 lease_id.clone(), // 6 lease_id
1378 self.config.lease_ttl_ms.to_string(), // 7 lease_ttl_ms
1379 String::new(), // 8 remaining_attempt_timeout_ms
1380 ];
1381
1382 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1383 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1384
1385 let raw: Value = self
1386 .client
1387 .fcall("ff_claim_resumed_execution", &key_refs, &arg_refs)
1388 .await
1389 .map_err(SdkError::Valkey)?;
1390
1391 // Parse result — same format as ff_claim_execution:
1392 // {1, "OK", lease_id, lease_epoch, expires_at, attempt_id, attempt_index, attempt_type}
1393 let arr = match &raw {
1394 Value::Array(arr) => arr,
1395 _ => {
1396 return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1397 fcall: "ff_claim_resumed_execution".into(),
1398 execution_id: Some(execution_id.to_string()),
1399 message: "expected Array".into(),
1400 }));
1401 }
1402 };
1403
1404 let status_code = match arr.first() {
1405 Some(Ok(Value::Int(n))) => *n,
1406 _ => {
1407 return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1408 fcall: "ff_claim_resumed_execution".into(),
1409 execution_id: Some(execution_id.to_string()),
1410 message: "bad status code".into(),
1411 }));
1412 }
1413 };
1414
1415 if status_code != 1 {
1416 let err_field_str = |idx: usize| -> String {
1417 arr.get(idx)
1418 .and_then(|v| match v {
1419 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1420 Ok(Value::SimpleString(s)) => Some(s.clone()),
1421 _ => None,
1422 })
1423 .unwrap_or_default()
1424 };
1425 let error_code = {
1426 let s = err_field_str(1);
1427 if s.is_empty() { "unknown".to_owned() } else { s }
1428 };
1429 let detail = err_field_str(2);
1430
1431 return Err(SdkError::from(
1432 ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
1433 .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
1434 fcall: "ff_claim_resumed_execution".into(),
1435 execution_id: Some(execution_id.to_string()),
1436 message: format!("unknown error: {error_code}"),
1437 }),
1438 ));
1439 }
1440
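// Success fields start at arr[2], so field_str(0) = lease_id,
// field_str(1) = lease_epoch, field_str(3) = attempt_id and
// field_str(4) = attempt_index (field_str(2) = expires_at is unused here).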
1441 let field_str = |idx: usize| -> String {
1442 arr.get(idx + 2)
1443 .and_then(|v| match v {
1444 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1445 Ok(Value::SimpleString(s)) => Some(s.clone()),
1446 Ok(Value::Int(n)) => Some(n.to_string()),
1447 _ => None,
1448 })
1449 .unwrap_or_default()
1450 };
1451
1452 let lease_id = LeaseId::parse(&field_str(0))
1453 .unwrap_or_else(|_| LeaseId::new());
1454 let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
1455 let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
1456 let attempt_id = AttemptId::parse(&field_str(3))
1457 .unwrap_or_else(|_| AttemptId::new());
1458
1459 let (input_payload, execution_kind, tags) = self
1460 .read_execution_context(execution_id, partition)
1461 .await?;
1462
1463 Ok(ClaimedTask::new(
1464 self.client.clone(),
1465 self.backend.clone(),
1466 self.partition_config,
1467 execution_id.clone(),
1468 attempt_index,
1469 attempt_id,
1470 lease_id,
1471 lease_epoch,
1472 self.config.lease_ttl_ms,
1473 lane_id.clone(),
1474 self.config.worker_instance_id.clone(),
1475 input_payload,
1476 execution_kind,
1477 tags,
1478 ))
1479 }
1480
1481 /// Read the input payload, execution_kind, and tags for an execution
1482 /// (payload key, exec_core hash, and tags hash respectively). Previously
1483 /// gated behind `direct-valkey-claim`; now shared by the feature-gated
1484 /// inline claim path and the public `claim_from_reclaim_grant` entry point.
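/// Missing values come back as empty defaults rather than errors: an absent
/// payload yields empty bytes, an absent execution_kind an empty string.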
1485 async fn read_execution_context(
1486 &self,
1487 execution_id: &ExecutionId,
1488 partition: &ff_core::partition::Partition,
1489 ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
1490 let ctx = ExecKeyContext::new(partition, execution_id);
1491
1492 // Read payload
1493 let payload: Option<String> = self
1494 .client
1495 .get(&ctx.payload())
1496 .await
1497 .map_err(|e| SdkError::ValkeyContext { source: e, context: "GET payload failed".into() })?;
1498 let input_payload = payload.unwrap_or_default().into_bytes();
1499
1500 // Read execution_kind from core
1501 let kind: Option<String> = self
1502 .client
1503 .hget(&ctx.core(), "execution_kind")
1504 .await
1505 .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET execution_kind failed".into() })?;
1506 let execution_kind = kind.unwrap_or_default();
1507
1508 // Read tags
1509 let tags: HashMap<String, String> = self
1510 .client
1511 .hgetall(&ctx.tags())
1512 .await
1513 .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGETALL tags".into() })?;
1514
1515 Ok((input_payload, execution_kind, tags))
1516 }
1517
1518 // ── Phase 3: Signal delivery ──
1519
1520 /// Deliver a signal to a suspended execution's waitpoint.
1521 ///
1522 /// The engine atomically records the signal, evaluates the resume condition,
1523 /// and optionally transitions the execution from `suspended` to `runnable`.
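///
/// # Usage
///
/// A minimal sketch, assuming the caller already holds the waitpoint id and
/// its token; the `Signal` literal is illustrative and only names the fields
/// this method actually reads:
///
/// ```rust,ignore
/// use ff_sdk::task::Signal;
///
/// let signal = Signal {
///     signal_name: "approval".into(),
///     signal_category: "human".into(),
///     source_type: "api".into(),
///     source_identity: "reviewer-7".into(),
///     payload: Some(br#"{"approved":true}"#.to_vec()),
///     idempotency_key: Some("approval-req-42".into()),
///     waitpoint_token,
/// };
///
/// let outcome = worker.deliver_signal(&execution_id, &waitpoint_id, signal).await?;
/// println!("signal outcome: {outcome:?}");
/// ```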
1524 pub async fn deliver_signal(
1525 &self,
1526 execution_id: &ExecutionId,
1527 waitpoint_id: &WaitpointId,
1528 signal: crate::task::Signal,
1529 ) -> Result<crate::task::SignalOutcome, SdkError> {
1530 let partition = ff_core::partition::execution_partition(execution_id, &self.partition_config);
1531 let ctx = ExecKeyContext::new(&partition, execution_id);
1532 let idx = IndexKeys::new(&partition);
1533
1534 let signal_id = ff_core::types::SignalId::new();
1535 let now = TimestampMs::now();
1536
1537 // Pre-read lane_id from exec_core — the execution may be on any lane,
1538 // not necessarily one of this worker's configured lanes.
1539 let lane_str: Option<String> = self
1540 .client
1541 .hget(&ctx.core(), "lane_id")
1542 .await
1543 .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1544 let lane_id = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1545
1546 // KEYS (14): exec_core, wp_condition, wp_signals_stream,
1547 // exec_signals_zset, signal_hash, signal_payload,
1548 // idem_key, waitpoint_hash, suspension_current,
1549 // eligible_zset, suspended_zset, delayed_zset,
1550 // suspension_timeout_zset, hmac_secrets
1551 let idem_key = if let Some(ref ik) = signal.idempotency_key {
1552 ctx.signal_dedup(waitpoint_id, ik)
1553 } else {
1554 ctx.noop() // must share {p:N} hash tag for cluster mode
1555 };
1556 let keys: Vec<String> = vec![
1557 ctx.core(), // 1
1558 ctx.waitpoint_condition(waitpoint_id), // 2
1559 ctx.waitpoint_signals(waitpoint_id), // 3
1560 ctx.exec_signals(), // 4
1561 ctx.signal(&signal_id), // 5
1562 ctx.signal_payload(&signal_id), // 6
1563 idem_key, // 7
1564 ctx.waitpoint(waitpoint_id), // 8
1565 ctx.suspension_current(), // 9
1566 idx.lane_eligible(&lane_id), // 10
1567 idx.lane_suspended(&lane_id), // 11
1568 idx.lane_delayed(&lane_id), // 12
1569 idx.suspension_timeout(), // 13
1570 idx.waitpoint_hmac_secrets(), // 14
1571 ];
1572
1573 let payload_str = signal
1574 .payload
1575 .as_ref()
1576 .map(|p| String::from_utf8_lossy(p).into_owned())
1577 .unwrap_or_default();
1578
1579 // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
1580 // signal_category, source_type, source_identity,
1581 // payload, payload_encoding, idempotency_key,
1582 // correlation_id, target_scope, created_at,
1583 // dedup_ttl_ms, resume_delay_ms, signal_maxlen,
1584 // max_signals_per_execution, waitpoint_token
1585 let args: Vec<String> = vec![
1586 signal_id.to_string(), // 1
1587 execution_id.to_string(), // 2
1588 waitpoint_id.to_string(), // 3
1589 signal.signal_name, // 4
1590 signal.signal_category, // 5
1591 signal.source_type, // 6
1592 signal.source_identity, // 7
1593 payload_str, // 8
1594 "json".to_owned(), // 9 payload_encoding
1595 signal.idempotency_key.unwrap_or_default(), // 10
1596 String::new(), // 11 correlation_id
1597 "waitpoint".to_owned(), // 12 target_scope
1598 now.to_string(), // 13 created_at
1599 "86400000".to_owned(), // 14 dedup_ttl_ms
1600 "0".to_owned(), // 15 resume_delay_ms
1601 "1000".to_owned(), // 16 signal_maxlen
1602 "10000".to_owned(), // 17 max_signals
1603 // WIRE BOUNDARY — raw token must reach Lua unredacted. Do NOT
1604 // use ToString/Display (those are redacted for log safety);
1605 // .as_str() is the explicit opt-in that gets the secret bytes.
1606 signal.waitpoint_token.as_str().to_owned(), // 18 waitpoint_token
1607 ];
1608
1609 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1610 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1611
1612 let raw: Value = self
1613 .client
1614 .fcall("ff_deliver_signal", &key_refs, &arg_refs)
1615 .await
1616 .map_err(SdkError::Valkey)?;
1617
1618 crate::task::parse_signal_result(&raw)
1619 }
1620
1621 #[cfg(feature = "direct-valkey-claim")]
1622 fn next_lane(&self) -> LaneId {
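// Round-robin over the configured lanes. Assumes `config.lanes` is
// non-empty (the modulo below panics otherwise).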
1623 let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
1624 self.config.lanes[idx].clone()
1625 }
1626}
1627
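/// Returns true for `Retryable` and `Informational` error classes, i.e. the
/// cases where the inline claim loop should move on to the next candidate
/// rather than surface the error.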
1628#[cfg(feature = "direct-valkey-claim")]
1629fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
1630 use ff_core::error::ErrorClass;
1631 matches!(
1632 ff_script::engine_error_ext::class(err),
1633 ErrorClass::Retryable | ErrorClass::Informational
1634 )
1635}
1636
1637 /// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
1638 /// instance id with FNV-1a to place distinct worker processes on different
1639 /// partition windows from their first poll. A fixed zero seed would be fine
1640 /// for a single worker; hashing spreads work across multi-worker deployments.
1641#[cfg(feature = "direct-valkey-claim")]
1642fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
1643 if num_partitions == 0 {
1644 return 0;
1645 }
1646 (ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize) % num_partitions
1647}
1648
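/// Extracts the first element of an array reply as a `String`, accepting
/// bulk- or simple-string encodings. Returns `None` for non-array replies,
/// empty arrays, and non-string first elements.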
1649#[cfg(feature = "direct-valkey-claim")]
1650fn extract_first_array_string(value: &Value) -> Option<String> {
1651 match value {
1652 Value::Array(arr) if !arr.is_empty() => match &arr[0] {
1653 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1654 Ok(Value::SimpleString(s)) => Some(s.clone()),
1655 _ => None,
1656 },
1657 _ => None,
1658 }
1659}
1660
1661/// Read partition config from Valkey's `ff:config:partitions` hash.
1662/// Returns Err if the key doesn't exist or can't be read.
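/// Missing or non-positive fields fall back to the compiled-in defaults
/// (256 flow / 32 budget / 32 quota partitions).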
1663async fn read_partition_config(client: &Client) -> Result<PartitionConfig, SdkError> {
1664 let key = ff_core::keys::global_config_partitions();
1665 let fields: HashMap<String, String> = client
1666 .hgetall(&key)
1667 .await
1668 .map_err(|e| SdkError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;
1669
1670 if fields.is_empty() {
1671 return Err(SdkError::Config {
1672 context: "read_partition_config".into(),
1673 field: None,
1674 message: "ff:config:partitions not found in Valkey".into(),
1675 });
1676 }
1677
1678 let parse = |field: &str, default: u16| -> u16 {
1679 fields
1680 .get(field)
1681 .and_then(|v| v.parse().ok())
1682 .filter(|&n: &u16| n > 0)
1683 .unwrap_or(default)
1684 };
1685
1686 Ok(PartitionConfig {
1687 num_flow_partitions: parse("num_flow_partitions", 256),
1688 num_budget_partitions: parse("num_budget_partitions", 32),
1689 num_quota_partitions: parse("num_quota_partitions", 32),
1690 })
1691}
1692
1693#[cfg(test)]
1694mod completion_accessor_type_tests {
1695 //! Type-level compile check that
1696 //! [`FlowFabricWorker::completion_backend`] returns an
1697 //! `Option<Arc<dyn CompletionBackend>>`. No Valkey required —
1698 //! the assertion is at the function-pointer type level and the
1699 //! `#[test]` body exists solely so the compiler elaborates it.
1700 use super::FlowFabricWorker;
1701 use ff_core::completion_backend::CompletionBackend;
1702 use std::sync::Arc;
1703
1704 #[test]
1705 fn completion_backend_accessor_signature() {
1706 // If this line compiles, the public accessor returns the
1707 // advertised type. The function is never called (no live
1708 // worker), so no I/O happens.
1709 let _f: fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>> =
1710 FlowFabricWorker::completion_backend;
1711 }
1712}
1713
1714#[cfg(all(test, feature = "direct-valkey-claim"))]
1715mod scan_cursor_tests {
1716 use super::scan_cursor_seed;
1717
1718 #[test]
1719 fn stable_for_same_input() {
1720 assert_eq!(scan_cursor_seed("w1", 256), scan_cursor_seed("w1", 256));
1721 }
1722
1723 #[test]
1724 fn distinct_for_different_ids() {
1725 assert_ne!(scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
1726 }
1727
1728 #[test]
1729 fn bounded_by_partition_count() {
1730 for i in 0..100 {
1731 assert!(scan_cursor_seed(&format!("w{i}"), 256) < 256);
1732 }
1733 }
1734
1735 #[test]
1736 fn zero_partitions_returns_zero() {
1737 assert_eq!(scan_cursor_seed("w1", 0), 0);
1738 }
1739}