ff_sdk/worker.rs
use std::collections::HashMap;
#[cfg(feature = "direct-valkey-claim")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use ferriskey::{Client, Value};
use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::PartitionConfig;
use ff_core::types::*;
use tokio::sync::Semaphore;

use crate::config::WorkerConfig;
use crate::task::ClaimedTask;
use crate::SdkError;

/// FlowFabric worker — connects to Valkey, claims executions, and provides
/// the worker-facing API.
///
/// # Admission control
///
/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
/// **bypasses the scheduler's admission controls**: it reads the eligible
/// ZSET directly and mints its own claim grant without consulting budget
/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
/// benchmarks, tests, and single-tenant development where the scheduler
/// hop is measurement noise, not for production.
///
/// For production deployments, consume scheduler-issued grants via
/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
/// budget breach, quota sliding-window, concurrency cap, and
/// capability-match checks before issuing grants.
///
/// # Usage
///
/// ```rust,ignore
/// use std::time::Duration;
///
/// use ff_core::backend::BackendConfig;
/// use ff_core::types::{LaneId, Namespace, WorkerId, WorkerInstanceId};
/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
///
/// let config = WorkerConfig {
///     backend: BackendConfig::valkey("localhost", 6379),
///     worker_id: WorkerId::new("w1"),
///     worker_instance_id: WorkerInstanceId::new("w1-i1"),
///     namespace: Namespace::new("default"),
///     lanes: vec![LaneId::new("main")],
///     capabilities: Vec::new(),
///     lease_ttl_ms: 30_000,
///     claim_poll_interval_ms: 1_000,
///     max_concurrent_tasks: 1,
/// };
/// let worker = FlowFabricWorker::connect(config).await?;
///
/// loop {
///     if let Some(task) = worker.claim_next().await? {
///         // Process task...
///         task.complete(Some(b"result".to_vec())).await?;
///     } else {
///         tokio::time::sleep(Duration::from_secs(1)).await;
///     }
/// }
/// ```
pub struct FlowFabricWorker {
    client: Client,
    config: WorkerConfig,
    partition_config: PartitionConfig,
    /// Sorted, deduplicated, comma-separated capabilities — computed once
    /// from `config.capabilities` at connect time. Passed as ARGV[9] to
    /// `ff_issue_claim_grant` on every claim. BTreeSet sorting is critical:
    /// Lua's `ff_issue_claim_grant` relies on a stable CSV form for
    /// reproducible logs and tests.
    #[cfg(feature = "direct-valkey-claim")]
    worker_capabilities_csv: String,
    /// 8-hex FNV-1a digest of `worker_capabilities_csv`. Used in
    /// per-mismatch logs so the 4KB CSV never echoes on every reject
    /// during an incident. The full CSV is logged once at connect-time
    /// INFO for cross-reference. Mirrors
    /// `ff-scheduler::claim::worker_caps_digest`.
    #[cfg(feature = "direct-valkey-claim")]
    worker_capabilities_hash: String,
    #[cfg(feature = "direct-valkey-claim")]
    lane_index: AtomicUsize,
    /// Concurrency cap for in-flight tasks. Permits are acquired or
    /// transferred by [`claim_next`] (feature-gated),
    /// [`claim_from_grant`] (always available), and
    /// [`claim_from_reclaim_grant`], transferred to the returned
    /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
    /// Holds `max_concurrent_tasks` permits total.
    ///
    /// [`claim_next`]: FlowFabricWorker::claim_next
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
    concurrency_semaphore: Arc<Semaphore>,
    /// Rolling offset for chunked partition scans. Each poll advances the
    /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
    /// chunk)` polls every partition is covered. The initial value is
    /// derived from `worker_instance_id` so idle workers spread their
    /// scans across different partitions from the first poll onward.
    ///
    /// Overflow: the cursor adds at most `PARTITION_SCAN_CHUNK` per poll.
    /// On 64-bit targets `usize` is 64 bits — wrap takes at least 2^59
    /// polls (billions of years at any realistic rate). On 32-bit
    /// targets (wasm32, i686) `usize` is 32 bits and wraps after ~4
    /// years at 1 poll/sec — acceptable; on wrap, the modulo preserves
    /// correctness because the sequence simply restarts a new cycle.
    #[cfg(feature = "direct-valkey-claim")]
    scan_cursor: AtomicUsize,
    /// The [`EngineBackend`] the Stage-1b trait forwarders route
    /// through.
    ///
    /// **RFC-012 Stage 1b.** Always populated:
    /// [`FlowFabricWorker::connect`] now wraps the worker's own
    /// `ferriskey::Client` in a `ValkeyBackend` via
    /// `ValkeyBackend::from_client_and_partitions`, and
    /// [`FlowFabricWorker::connect_with`] replaces that default with
    /// the caller-supplied `Arc<dyn EngineBackend>`. The
    /// [`FlowFabricWorker::backend`] accessor still returns
    /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
    /// narrows the return type once consumers have migrated.
    ///
    /// Hot paths (claim, deliver_signal, admin queries) still use the
    /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
    /// migrates them through this field, and Stage 1d removes the
    /// embedded client.
    backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
    /// Optional handle to the same underlying backend viewed as a
    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
    /// Populated by [`Self::connect`] from the bundled
    /// `ValkeyBackend` (which implements the trait); supplied by the
    /// caller on [`Self::connect_with`] as an explicit
    /// `Option<Arc<dyn CompletionBackend>>` — `None` means "this
    /// backend does not support push-based completion" (e.g. a future
    /// Postgres backend without LISTEN/NOTIFY, or a test mock). Cairn
    /// and other completion-subscription consumers reach this through
    /// [`Self::completion_backend`].
    completion_backend_handle:
        Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
}

/// Number of partitions scanned per `claim_next()` poll. Keeps idle Valkey
/// load at O(PARTITION_SCAN_CHUNK) per worker-second instead of
/// O(num_flow_partitions).
#[cfg(feature = "direct-valkey-claim")]
const PARTITION_SCAN_CHUNK: usize = 32;

impl FlowFabricWorker {
    /// Connect to Valkey and prepare the worker.
    ///
    /// Establishes the ferriskey connection. Does NOT load the FlowFabric
    /// library — that is the server's responsibility (ff-server calls
    /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
    /// the library is already loaded.
    pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
        if config.lanes.is_empty() {
            return Err(SdkError::Config {
                context: "worker_config".into(),
                field: None,
                message: "at least one lane is required".into(),
            });
        }

        // Build the ferriskey client from the nested `BackendConfig`.
        // Delegates to `ff_backend_valkey::build_client` so host/port +
        // TLS + cluster + `BackendTimeouts::request` +
        // `BackendRetry` wiring lives in exactly one place (pre-Stage
        // 1c this path had its own `ClientBuilder` chain that diverged
        // from the backend's shape; RFC-012 Stage 1c tranche 1
        // consolidates).
        let client = ff_backend_valkey::build_client(&config.backend).await?;

        // Verify connectivity.
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::backend_context(e, "PING failed"))?;
        if pong != "PONG" {
            return Err(SdkError::Config {
                context: "worker_connect".into(),
                field: None,
                message: format!("unexpected PING response: {pong}"),
            });
        }

        // Guard against two worker processes sharing the same
        // `worker_instance_id`. Duplicate instances would clobber each
        // other's lease_current/active_index entries and double-claim work.
        // SET NX on a liveness key with 2× lease TTL; if the key already
        // exists another process is live. The key auto-expires (there is
        // no renewal), so a restart after a hard crash waits at most
        // 2× lease_ttl_ms for the ghost entry to clear.
        //
        // Known limitations of this minimal scheme (documented for operators):
        // 1. **Startup-only, not runtime.** There is no heartbeat renewal
        //    path. After `2 × lease_ttl_ms` elapses the alive key expires
        //    naturally even while this worker is still running, and a
        //    second process with the same `worker_instance_id` launched
        //    later will successfully SET NX alongside the first. The check
        //    catches misconfiguration at boot; it does not fence duplicates
        //    that appear mid-lifetime. Production deployments should rely
        //    on the orchestrator (Kubernetes, systemd unit with
        //    `Restart=on-failure`, etc.) as the authoritative single-
        //    instance enforcer; this SET NX is belt-and-suspenders.
        //
        // 2. **Restart delay after a crash.** If a worker crashes
        //    ungracefully (SIGKILL, container OOM) and is restarted within
        //    `2 × lease_ttl_ms`, the alive key is still present and the
        //    new process exits with `SdkError::Config("duplicate
        //    worker_instance_id ...")`. Options for operators:
        //    - Wait `2 × lease_ttl_ms` (default 60s with the 30s TTL)
        //      before restarting.
        //    - Manually `DEL ff:worker:<instance_id>:alive` in Valkey to
        //      unblock the restart.
        //    - Use a fresh `worker_instance_id` for the restart (the
        //      orchestrator should already do this per-Pod).
        //
        // 3. **No graceful cleanup on shutdown.** There is no explicit
        //    `disconnect()` call that DELs the alive key. On clean
        //    `SIGTERM` the key lingers until its TTL expires. A follow-up
        //    can add `FlowFabricWorker::disconnect(self)` for callers that
        //    want to skip the restart-delay window.
        let alive_key = format!("ff:worker:{}:alive", config.worker_instance_id);
        let alive_ttl_ms = (config.lease_ttl_ms.saturating_mul(2)).max(1_000);
        let set_result: Option<String> = client
            .cmd("SET")
            .arg(&alive_key)
            .arg("1")
            .arg("NX")
            .arg("PX")
            .arg(alive_ttl_ms.to_string().as_str())
            .execute()
            .await
            .map_err(|e| crate::backend_context(e, "SET NX worker alive key"))?;
        if set_result.is_none() {
            return Err(SdkError::Config {
                context: "worker_connect".into(),
                field: Some("worker_instance_id".into()),
                message: format!(
                    "duplicate worker_instance_id '{}': another process already holds {alive_key}",
                    config.worker_instance_id
                ),
            });
        }

        // Read partition config from Valkey (set by ff-server on startup).
        // Falls back to defaults if the key doesn't exist (e.g. SDK-only testing).
        let partition_config = read_partition_config(&client).await
            .unwrap_or_else(|e| {
                tracing::warn!(
                    error = %e,
                    "ff:config:partitions not found, using defaults"
                );
                PartitionConfig::default()
            });

        let max_tasks = config.max_concurrent_tasks.max(1);
        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));

        tracing::info!(
            worker_id = %config.worker_id,
            instance_id = %config.worker_instance_id,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            "FlowFabricWorker connected"
        );

        #[cfg(feature = "direct-valkey-claim")]
        let scan_cursor_init = scan_cursor_seed(
            config.worker_instance_id.as_str(),
            partition_config.num_flow_partitions.max(1) as usize,
        );

        // Sort + dedupe capabilities into a stable CSV. BTreeSet both sorts
        // and deduplicates in one pass; string joining happens once here.
        //
        // Ingress validation mirrors Scheduler::claim_for_worker (ff-scheduler):
        // - `,` is the CSV delimiter; a token containing one would split
        //   mid-parse and could let a {"gpu"} worker appear to satisfy
        //   {"gpu,cuda"} (silent auth bypass).
        // - Empty strings would produce leading/adjacent commas on the
        //   wire and inflate token count for no semantic reason.
        // - Whitespace / control chars: `"gpu "` vs `"gpu"` or
        //   `"gpu\n"` vs `"gpu"` produce silent mismatches that are
        //   miserable to debug. Reject any whitespace or control
        //   character at ingress (printable non-ASCII is allowed — see
        //   the loop below) so a typo fails loudly at connect instead
        //   of silently mis-routing forever.
        // Reject at boot so operator misconfig is loud, symmetric with the
        // scheduler path.
        #[cfg(feature = "direct-valkey-claim")]
        for cap in &config.capabilities {
            if cap.is_empty() {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: "capability token must not be empty".into(),
                });
            }
            if cap.contains(',') {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability token may not contain ',' (CSV delimiter): {cap:?}"
                    ),
                });
            }
            // Reject control characters (e.g. 0x00-0x1F, 0x7F) and any
            // whitespace (ASCII or Unicode). Printable characters above
            // 0x7F are ALLOWED so i18n caps like "东京-gpu" can be used.
            // The CSV wire form is byte-safe for multibyte UTF-8 because
            // `,` is always a single byte and never part of a multibyte
            // continuation (only 0x80-0xBF are continuations; ',' is 0x2C).
            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability token must not contain whitespace or control \
                         characters: {cap:?}"
                    ),
                });
            }
        }
        #[cfg(feature = "direct-valkey-claim")]
        let worker_capabilities_csv: String = {
            let set: std::collections::BTreeSet<&str> = config
                .capabilities
                .iter()
                .map(|s| s.as_str())
                .filter(|s| !s.is_empty())
                .collect();
            if set.len() > ff_core::policy::CAPS_MAX_TOKENS {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
                        ff_core::policy::CAPS_MAX_TOKENS,
                        set.len()
                    ),
                });
            }
            let csv = set.into_iter().collect::<Vec<_>>().join(",");
            if csv.len() > ff_core::policy::CAPS_MAX_BYTES {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
                        ff_core::policy::CAPS_MAX_BYTES,
                        csv.len()
                    ),
                });
            }
            csv
        };

        // Short stable digest of the sorted caps CSV, computed once so
        // per-mismatch logs carry a stable identifier instead of the 4KB
        // CSV. Shared helper — ff-scheduler uses the same one for its
        // own per-mismatch logs, so cross-component log lines are
        // diffable against each other.
        #[cfg(feature = "direct-valkey-claim")]
        let worker_capabilities_hash = ff_core::hash::fnv1a_xor8hex(&worker_capabilities_csv);

        // Full CSV logged once at connect so per-mismatch logs (which
        // carry only the 8-hex hash) can be cross-referenced by ops.
        #[cfg(feature = "direct-valkey-claim")]
        if !worker_capabilities_csv.is_empty() {
            tracing::info!(
                worker_instance_id = %config.worker_instance_id,
                worker_caps_hash = %worker_capabilities_hash,
                worker_caps = %worker_capabilities_csv,
                "worker connected with capabilities (full CSV — mismatch logs use hash only)"
            );
        }

        // Non-authoritative advertisement of caps for operator visibility
        // (CLI introspection, dashboards). The AUTHORITATIVE source for
        // scheduling decisions is ARGV[9] on each claim — Lua reads ONLY
        // that, never this string. Lossy here is correctness-safe.
        //
        // Storage: a single STRING key holding the sorted CSV. Rationale:
        // * **Atomic overwrite.** `SET` is a single command — a concurrent
        //   reader can never observe a transient empty value (the prior
        //   DEL+SADD pair had that window).
        // * **Crash cleanup without a refresh loop.** The alive-key SET NX
        //   is startup-only (see §1 above); there's no periodic renew
        //   to piggy-back on, so a TTL on caps would independently
        //   expire mid-flight and hide a live worker's caps from ops
        //   tools. Instead we drop the TTL: each reconnect overwrites;
        //   a crashed worker leaves a stale CSV until a new process
        //   with the same worker_instance_id boots (which triggers
        //   `duplicate worker_instance_id` via the alive-key guard anyway —
        //   the orchestrator allocates a new id, and operators can DEL
        //   the stale caps key if they care).
        // * **Empty caps = DEL.** A restart from {gpu} to {} clears the
        //   advertisement rather than leaving stale data.
        //
        // Cluster-safe advertisement: the per-worker caps STRING lives at
        // `ff:worker:{id}:caps` (lands on whatever slot CRC16 puts it on),
        // and the INSTANCE ID is SADD'd to the global workers-index SET
        // `ff:idx:workers` (single slot). The unblock scanner's cluster
        // enumeration uses SMEMBERS on the index + a per-member GET on each
        // caps key, instead of `SCAN MATCH ff:worker:*:caps` (which in
        // cluster mode only scans the shard the SCAN lands on and misses
        // workers whose keys hash elsewhere). The pattern mirrors Batch A
        // `budget_policies_index` / `flow_index` / `deps_all_edges`:
        // operations stay atomic per command; the index is the
        // cluster-wide enumeration surface.
        #[cfg(feature = "direct-valkey-claim")]
        {
            let caps_key = ff_core::keys::worker_caps_key(&config.worker_instance_id);
            let index_key = ff_core::keys::workers_index_key();
            let instance_id = config.worker_instance_id.to_string();
            if worker_capabilities_csv.is_empty() {
                // No caps advertised. DEL the per-worker caps string AND
                // SREM from the index so the scanner doesn't GET an empty
                // string for a worker that never declares caps.
                let _ = client
                    .cmd("DEL")
                    .arg(&caps_key)
                    .execute::<Option<i64>>()
                    .await;
                if let Err(e) = client
                    .cmd("SREM")
                    .arg(&index_key)
                    .arg(&instance_id)
                    .execute::<Option<i64>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
                        "SREM workers-index failed; continuing (non-authoritative)");
                }
            } else {
                // Atomic overwrite of the caps STRING (one command). Then
                // SADD to the index (idempotent — re-running connect for
                // the same id is a no-op at the SADD level). The per-worker
                // caps key is written BEFORE the index SADD so that when
                // the scanner observes the id in the index, the caps key
                // is guaranteed to resolve to a non-stale CSV (the reverse
                // order would leak an index entry pointing at a stale or
                // empty caps key during a narrow window).
                if let Err(e) = client
                    .cmd("SET")
                    .arg(&caps_key)
                    .arg(&worker_capabilities_csv)
                    .execute::<Option<String>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %caps_key,
                        "SET worker caps advertisement failed; continuing");
                }
                if let Err(e) = client
                    .cmd("SADD")
                    .arg(&index_key)
                    .arg(&instance_id)
                    .execute::<Option<i64>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
                        "SADD workers-index failed; continuing");
                }
            }
        }

        // RFC-012 Stage 1b: wrap the dialed client in a
        // ValkeyBackend so `ClaimedTask`'s trait forwarders have
        // something to call. `from_client_and_partitions` reuses the
        // already-dialed client — no second connection.
        // Share the concrete `Arc<ValkeyBackend>` across the two
        // trait objects — one allocation, both accessors yield
        // identity-equivalent handles.
        let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
            ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
                client.clone(),
                partition_config,
            );
        let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
        let completion_backend_handle: Option<
            Arc<dyn ff_core::completion_backend::CompletionBackend>,
        > = Some(valkey_backend);

        Ok(Self {
            client,
            config,
            partition_config,
            #[cfg(feature = "direct-valkey-claim")]
            worker_capabilities_csv,
            #[cfg(feature = "direct-valkey-claim")]
            worker_capabilities_hash,
            #[cfg(feature = "direct-valkey-claim")]
            lane_index: AtomicUsize::new(0),
            concurrency_semaphore,
            #[cfg(feature = "direct-valkey-claim")]
            scan_cursor: AtomicUsize::new(scan_cursor_init),
            backend,
            completion_backend_handle,
        })
    }

    /// Store pre-built [`EngineBackend`] and (optional)
    /// [`CompletionBackend`] handles on the worker. Builds the worker
    /// via the legacy [`FlowFabricWorker::connect`] path first (so the
    /// embedded `ferriskey::Client` that the Stage 1b non-migrated hot
    /// paths still use is dialed), then replaces the default
    /// `ValkeyBackend` wrapper with the caller-supplied trait objects.
    ///
    /// The `completion` argument is explicit: in 0.3.3 this constructor
    /// accepted only `backend`, and `completion_backend()` silently
    /// returned `None` on this path because `Arc<dyn EngineBackend>`
    /// cannot be upcast to `Arc<dyn CompletionBackend>` without loss of
    /// trait-object identity. 0.3.4 lets the caller decide.
    ///
    /// - `Some(arc)` — the caller supplies a completion backend.
    ///   [`Self::completion_backend`] returns `Some(clone)`.
    /// - `None` — this backend does not support push-based completion
    ///   (future Postgres backend without LISTEN/NOTIFY, test mocks).
    ///   [`Self::completion_backend`] returns `None`.
    ///
    /// When the underlying backend implements both traits (as
    /// `ValkeyBackend` does), pass the same `Arc` twice — the two
    /// trait-object views share one allocation:
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    /// use ff_backend_valkey::ValkeyBackend;
    /// use ff_sdk::{FlowFabricWorker, WorkerConfig};
    ///
    /// # async fn doc(worker_config: WorkerConfig,
    /// #              backend_config: ff_backend_valkey::BackendConfig)
    /// #     -> Result<(), ff_sdk::SdkError> {
    /// // Valkey (completion supported):
    /// let valkey = Arc::new(ValkeyBackend::connect(backend_config).await?);
    /// let worker = FlowFabricWorker::connect_with(
    ///     worker_config,
    ///     valkey.clone(),
    ///     Some(valkey),
    /// ).await?;
    /// # Ok(()) }
    /// ```
    ///
    /// Backend without completion support:
    ///
    /// ```rust,ignore
    /// let worker = FlowFabricWorker::connect_with(
    ///     worker_config,
    ///     backend,
    ///     None,
    /// ).await?;
    /// ```
    ///
    /// **Stage 1b + Round-7 scope — what the injected backend covers
    /// today.** The injected backend currently covers these per-task
    /// `ClaimedTask` ops: `update_progress` / `resume_signals` /
    /// `delay_execution` / `move_to_waiting_children` / `complete` /
    /// `cancel` / `fail` / `create_pending_waitpoint` /
    /// `append_frame` / `report_usage`. A mock backend therefore sees
    /// that portion of the worker's per-task write surface. Lease
    /// renewal also routes through `backend.renew(&handle)`. Round-7
    /// (#135/#145) closed the four trait-shape gaps tracked by #117,
    /// but `suspend` still reaches the embedded `ferriskey::Client`
    /// directly via `ff_suspend_execution` — this is the deferred
    /// suspend per RFC-012 §R7.6.1, pending Stage 1d input-shape
    /// work. `claim_next` / `claim_from_grant` /
    /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
    /// are Stage 1c hot-path work. Stage 1d removes the embedded
    /// client entirely.
    ///
    /// This constructor is therefore NOT yet a drop-in way to swap
    /// in a non-Valkey backend — it still requires a reachable Valkey
    /// node for `suspend` plus the remaining hot-path ops. Tests that
    /// exercise only the migrated per-task ops can run fully against
    /// a mock backend.
    ///
    /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
    /// [`CompletionBackend`]: ff_core::completion_backend::CompletionBackend
    pub async fn connect_with(
        config: WorkerConfig,
        backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
        completion: Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
    ) -> Result<Self, SdkError> {
        let mut worker = Self::connect(config).await?;
        worker.backend = backend;
        worker.completion_backend_handle = completion;
        Ok(worker)
    }

    /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
    /// ops through.
    ///
    /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
    /// the `Option` wrapper is retained for API stability with the
    /// Stage-1a shape. Stage 1c narrows the return type to
    /// `&Arc<dyn EngineBackend>`.
    pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
        Some(&self.backend)
    }

    /// Crate-internal direct borrow of the backend. The public
    /// [`Self::backend`] still returns `Option` for API stability
    /// (Stage 1b holdover). Snapshot trait-forwarders in
    /// [`crate::snapshot`] need an un-wrapped reference.
    pub(crate) fn backend_ref(
        &self,
    ) -> &Arc<dyn ff_core::engine_backend::EngineBackend> {
        &self.backend
    }

    /// Handle to the completion-event subscription backend, for
    /// consumers that need to observe execution completions (DAG
    /// reconcilers, tenant-isolated subscribers).
    ///
    /// Returns `Some` when the worker was built through
    /// [`Self::connect`] on the default `valkey-default` feature
    /// (the bundled `ValkeyBackend` implements
    /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)),
    /// or via [`Self::connect_with`] with a `Some(..)` completion
    /// handle. Returns `None` when the caller passed `None` to
    /// [`Self::connect_with`] — i.e. the backend does not support
    /// push-based completion streams (future Postgres without
    /// LISTEN/NOTIFY, test mocks).
    ///
    /// The returned handle shares the same underlying allocation as
    /// [`Self::backend`]; calls through it (e.g.
    /// `subscribe_completions_filtered`) hit the same connection
    /// the worker itself uses.
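    ///
    /// A minimal subscription sketch (hedged — see
    /// `ff_core::completion_backend` for the real trait surface; the
    /// `subscribe_completions_filtered` call below is shown with an
    /// assumed signature and an assumed `namespace` filter argument):
    ///
    /// ```rust,ignore
    /// // Backends built with connect_with(.., None) advertise no
    /// // push-based completion support — skip the subscription.
    /// if let Some(completions) = worker.completion_backend() {
    ///     // Assumed shape: a filtered subscription yielding a stream
    ///     // of completion events for the namespace.
    ///     let mut events = completions
    ///         .subscribe_completions_filtered(&namespace)
    ///         .await?;
    ///     while let Some(event) = events.next().await {
    ///         tracing::info!(?event, "observed execution completion");
    ///     }
    /// }
    /// ```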
    pub fn completion_backend(
        &self,
    ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
        self.completion_backend_handle.clone()
    }

    /// Crate-internal borrow of the underlying `ferriskey::Client`.
    ///
    /// **RFC-012 Stage 1c tranche-4 (#87):** downgraded from `pub` to
    /// `pub(crate)`. The public `FlowFabricWorker` surface no longer
    /// exposes the ferriskey type — consumers that previously reached
    /// in for `read_stream` / `tail_stream` should now use
    /// [`ClaimedTask::read_stream`](crate::ClaimedTask::read_stream) /
    /// [`ClaimedTask::tail_stream`](crate::ClaimedTask::tail_stream)
    /// (task-holders) or [`read_stream`](crate::read_stream) /
    /// [`tail_stream`](crate::tail_stream) through a `&dyn
    /// EngineBackend` (non-task callers).
    ///
    /// Retained for in-crate use by
    /// [`crate::worker::FlowFabricWorker`]'s raw HGET helpers, which
    /// are not yet migrated to the trait (separate tracking issue).
    pub(crate) fn client(&self) -> &Client {
        &self.client
    }

    /// Get the worker config.
    pub fn config(&self) -> &WorkerConfig {
        &self.config
    }

    /// Get the server-published partition config this worker bound to at
    /// `connect()`. Exposed so consumers that mint custom
    /// [`ExecutionId`]s (e.g. for `describe_execution` lookups on ids
    /// produced outside this worker) stay aligned with the server's
    /// `num_flow_partitions` — using `PartitionConfig::default()`
    /// assumes 256 partitions and silently misses data on deployments
    /// with any other value.
    pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
        &self.partition_config
    }

    /// Attempt to claim the next eligible execution.
    ///
    /// Phase 1 simplified claim flow:
    /// 1. Pick a lane (round-robin across configured lanes)
    /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
    /// 3. Claim the execution via `ff_claim_execution`
    /// 4. Read the execution payload + tags
    /// 5. Return a [`ClaimedTask`] with auto lease renewal
    ///
    /// Gated behind the `direct-valkey-claim` feature — bypasses the
    /// scheduler's budget / quota / capability admission checks. Enable
    /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
    /// the scheduler hop would be measurement noise (benches) or when
    /// the test harness needs a deterministic worker-local path. Prefer
    /// the scheduler-routed HTTP claim path in production.
    ///
    /// # `None` semantics
    ///
    /// `Ok(None)` means **no work was found in the partition window this
    /// poll covered**, not "the cluster is idle". Each call scans a chunk
    /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
    /// `scan_cursor`; the cursor advances by that chunk size on every
    /// invocation, so a worker covers every partition exactly once every
    /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
    ///
    /// Callers should treat `None` as "poll again soon" (typically after
    /// `config.claim_poll_interval_ms`) rather than "sleep for a long
    /// time". Backing off too aggressively on `None` can starve workers
    /// when work lives on partitions outside the current window.
    ///
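    /// A minimal polling sketch honoring that cadence (assumes a
    /// connected `worker`; the backoff reuses the worker's own
    /// `claim_poll_interval_ms`):
    ///
    /// ```rust,ignore
    /// use std::time::Duration;
    ///
    /// let poll = Duration::from_millis(worker.config().claim_poll_interval_ms);
    /// loop {
    ///     match worker.claim_next().await? {
    ///         Some(task) => {
    ///             // Process, then complete — the concurrency permit
    ///             // returns to the semaphore on complete/fail/cancel/drop.
    ///             task.complete(None).await?;
    ///         }
    ///         // None = "this partition window was empty", not
    ///         // "the cluster is idle" — poll again soon.
    ///         None => tokio::time::sleep(poll).await,
    ///     }
    /// }
    /// ```
    ///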
    /// Returns `Err` on Valkey errors or script failures.
    #[cfg(feature = "direct-valkey-claim")]
    pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
        // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
        // try_acquire returns immediately — if no permits are available, the
        // worker is at capacity and should not claim more work.
        let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(_) => return Ok(None), // At capacity — no claim attempted
        };

        let lane_id = self.next_lane();
        let now = TimestampMs::now();

        // Phase 1: we scan eligible executions directly by reading the eligible
        // ZSET across execution partitions. In production the scheduler
        // (ff-scheduler) would handle this. For Phase 1, the SDK does a
        // simplified inline claim.
        //
        // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
        // partitions starting at a rolling offset. This keeps idle Valkey
        // load at O(chunk) per worker-second instead of O(num_partitions),
        // and the worker-instance-seeded initial cursor spreads concurrent
        // workers across different partition windows.
        let num_partitions = self.partition_config.num_flow_partitions as usize;
        if num_partitions == 0 {
            return Ok(None);
        }
        let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
        let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;

        for step in 0..chunk {
            let partition_idx = ((start + step) % num_partitions) as u16;
            let partition = ff_core::partition::Partition {
                family: ff_core::partition::PartitionFamily::Execution,
                index: partition_idx,
            };
            let idx = IndexKeys::new(&partition);
            let eligible_key = idx.lane_eligible(&lane_id);

            // ZRANGEBYSCORE to get the highest-priority eligible execution.
            // Score format: -(priority * 1_000_000_000_000) + created_at_ms;
            // "-inf" "+inf" LIMIT 0 1 gives the lowest score = highest priority.
            let result: Value = self
                .client
                .cmd("ZRANGEBYSCORE")
                .arg(&eligible_key)
                .arg("-inf")
                .arg("+inf")
                .arg("LIMIT")
                .arg("0")
                .arg("1")
                .execute()
                .await
                .map_err(|e| crate::backend_context(e, "ZRANGEBYSCORE failed"))?;

            let execution_id_str = match extract_first_array_string(&result) {
                Some(s) => s,
                None => continue, // No eligible executions on this partition
            };

            let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
                SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "claim_execution_from_eligible_set".into(),
                    execution_id: None,
                    message: format!("bad execution_id in eligible set: {e}"),
                })
            })?;

            // Step 1: issue the claim grant.
            let grant_result = self
                .issue_claim_grant(&execution_id, &lane_id, &partition, &idx)
                .await;

            match grant_result {
                Ok(()) => {}
                Err(SdkError::Engine(ref boxed))
                    if matches!(
                        **boxed,
                        crate::EngineError::Validation {
                            kind: crate::ValidationKind::CapabilityMismatch,
                            ..
                        }
                    ) =>
                {
                    let missing = match &**boxed {
                        crate::EngineError::Validation { detail, .. } => detail.clone(),
                        _ => unreachable!(),
                    };
                    // Block-on-mismatch (RFC-009 §7.5) — parity with
                    // ff-scheduler's Scheduler::claim_for_worker. Without
                    // this, the inline direct-claim path would hot-loop
                    // on an unclaimable top-of-zset (every tick picks the
                    // same execution, wastes an FCALL, logs, releases,
                    // repeats). The scheduler-side unblock scanner
                    // promotes blocked_route executions back to eligible
                    // when a worker with matching caps registers.
                    tracing::info!(
                        execution_id = %execution_id,
                        worker_id = %self.config.worker_id,
                        worker_caps_hash = %self.worker_capabilities_hash,
                        missing = %missing,
                        "capability mismatch, blocking execution off eligible (SDK inline claim)"
                    );
                    self.block_route(&execution_id, &lane_id, &partition, &idx).await;
                    continue;
                }
                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        error = %e,
                        "claim grant failed (retryable), trying next partition"
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }

            // Step 2: claim the execution.
            match self
                .claim_execution(&execution_id, &lane_id, &partition, now)
                .await
            {
                Ok(mut task) => {
                    // Transfer the concurrency permit to the task. When the
                    // task is completed/failed/cancelled/dropped the permit
                    // returns to the semaphore, allowing another claim.
                    task.set_concurrency_permit(permit);
                    return Ok(Some(task));
                }
                Err(SdkError::Engine(ref boxed))
                    if matches!(
                        **boxed,
                        crate::EngineError::Contention(
                            crate::ContentionKind::UseClaimResumedExecution
                        )
                    ) =>
                {
                    // Execution was resumed from suspension — attempt_interrupted.
                    // ff_claim_execution rejects this; use ff_claim_resumed_execution,
                    // which reuses the existing attempt instead of creating a new one.
                    tracing::debug!(
                        execution_id = %execution_id,
                        "execution is resumed, using claim_resumed path"
                    );
                    match self
                        .claim_resumed_execution(&execution_id, &lane_id, &partition)
                        .await
                    {
                        Ok(mut task) => {
                            task.set_concurrency_permit(permit);
                            return Ok(Some(task));
                        }
                        Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
                            tracing::debug!(
                                execution_id = %execution_id,
                                error = %e2,
                                "claim_resumed failed (retryable), trying next partition"
                            );
                            continue;
                        }
                        Err(e2) => return Err(e2),
                    }
                }
                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        error = %e,
                        "claim execution failed (retryable), trying next partition"
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        }

        // No eligible work found on any scanned partition.
        Ok(None)
    }

    #[cfg(feature = "direct-valkey-claim")]
    async fn issue_claim_grant(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        idx: &IndexKeys,
    ) -> Result<(), SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);

        // KEYS (3): exec_core, claim_grant_key, eligible_zset
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.claim_grant(),
            idx.lane_eligible(lane_id),
        ];

        // ARGV (9): eid, worker_id, worker_instance_id, lane_id,
        // capability_hash, grant_ttl_ms, route_snapshot_json,
        // admission_summary, worker_capabilities_csv (sorted)
        let args: Vec<String> = vec![
            execution_id.to_string(),
            self.config.worker_id.to_string(),
            self.config.worker_instance_id.to_string(),
            lane_id.to_string(),
            String::new(),                        // capability_hash
            "5000".to_owned(),                    // grant_ttl_ms (5 seconds)
            String::new(),                        // route_snapshot_json
            String::new(),                        // admission_summary
            self.worker_capabilities_csv.clone(), // sorted CSV
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_issue_claim_grant", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::from)?;

        crate::task::parse_success_result(&raw, "ff_issue_claim_grant")
    }

    /// Move an execution from the lane's eligible ZSET into its
    /// blocked_route ZSET via `ff_block_execution_for_admission`. Called
    /// after a `CapabilityMismatch` reject — without this, the inline
    /// direct-claim path would re-pick the same top-of-zset every tick
    /// (the same pattern the scheduler's block_candidate handles). The
    /// engine's unblock scanner periodically promotes blocked_route
    /// back to eligible once a worker with matching caps registers.
    ///
    /// Best-effort: transport or logical rejects (e.g. the execution
    /// already went terminal between pick and block) are logged and the
    /// outer loop simply `continue`s to the next partition. Parity with
    /// ff-scheduler::Scheduler::block_candidate.
    #[cfg(feature = "direct-valkey-claim")]
    async fn block_route(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        idx: &IndexKeys,
    ) {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let core_key = ctx.core();
        let eligible_key = idx.lane_eligible(lane_id);
        let blocked_key = idx.lane_blocked_route(lane_id);
        let eid_s = execution_id.to_string();
        let now_ms = TimestampMs::now().0.to_string();

        let keys: [&str; 3] = [&core_key, &eligible_key, &blocked_key];
        let argv: [&str; 4] = [
            &eid_s,
            "waiting_for_capable_worker",
            "no connected worker satisfies required_capabilities",
            &now_ms,
        ];

        match self
            .client
            .fcall::<Value>("ff_block_execution_for_admission", &keys, &argv)
            .await
        {
            Ok(v) => {
                // Parse the Lua result so a logical reject (e.g. execution
                // went terminal mid-flight) is visible — the same fix we
                // applied to ff-scheduler's block_candidate.
                if let Err(e) =
                    crate::task::parse_success_result(&v, "ff_block_execution_for_admission")
                {
                    tracing::warn!(
                        execution_id = %execution_id,
                        error = %e,
                        "SDK block_route: Lua rejected; eligible ZSET unchanged, next poll \
                         will re-evaluate"
                    );
                }
            }
            Err(e) => {
                tracing::warn!(
                    execution_id = %execution_id,
                    error = %e,
                    "SDK block_route: transport failure; eligible ZSET unchanged"
                );
            }
        }
    }

    /// Low-level claim of a granted execution. Invokes
    /// `ff_claim_execution` and returns a `ClaimedTask` with auto
    /// lease renewal.
    ///
    /// Previously gated behind `direct-valkey-claim`; ungated so
    /// the public [`claim_from_grant`] entry point can reuse the
    /// same FCALL plumbing. The method stays private — external
    /// callers use `claim_from_grant`.
    ///
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    async fn claim_execution(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        _now: TimestampMs,
    ) -> Result<ClaimedTask, SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let idx = IndexKeys::new(partition);

        // Pre-read total_attempt_count from exec_core to derive the next
        // attempt index. The Lua uses total_attempt_count as the new index and
        // dynamically builds the attempt key from the hash tag, so KEYS[6-8]
        // are placeholders, but we pass the correct index for
        // documentation/debugging.
        let total_str: Option<String> = self.client
            .cmd("HGET")
            .arg(ctx.core())
            .arg("total_attempt_count")
            .execute()
            .await
            .unwrap_or(None);
        let next_idx = total_str
            .as_deref()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(0);
        let att_idx = AttemptIndex::new(next_idx);

        let lease_id = LeaseId::new().to_string();
        let attempt_id = AttemptId::new().to_string();
        let renew_before_ms = self.config.lease_ttl_ms * 2 / 3;

        // KEYS (14): must match lua/execution.lua ff_claim_execution positional order
        let keys: Vec<String> = vec![
            ctx.core(),                                         // 1 exec_core
            ctx.claim_grant(),                                  // 2 claim_grant
            idx.lane_eligible(lane_id),                         // 3 eligible_zset
            idx.lease_expiry(),                                 // 4 lease_expiry_zset
            idx.worker_leases(&self.config.worker_instance_id), // 5 worker_leases
            ctx.attempt_hash(att_idx),                          // 6 attempt_hash (placeholder)
            ctx.attempt_usage(att_idx),                         // 7 attempt_usage (placeholder)
            ctx.attempt_policy(att_idx),                        // 8 attempt_policy (placeholder)
            ctx.attempts(),                                     // 9 attempts_zset
            ctx.lease_current(),                                // 10 lease_current
            ctx.lease_history(),                                // 11 lease_history
            idx.lane_active(lane_id),                           // 12 active_index
            idx.attempt_timeout(),                              // 13 attempt_timeout_zset
            idx.execution_deadline(),                           // 14 execution_deadline_zset
        ];

        // ARGV (12): must match lua/execution.lua ff_claim_execution positional order
        let args: Vec<String> = vec![
            execution_id.to_string(),                   // 1 execution_id
            self.config.worker_id.to_string(),          // 2 worker_id
            self.config.worker_instance_id.to_string(), // 3 worker_instance_id
            lane_id.to_string(),                        // 4 lane
            String::new(),                              // 5 capability_hash
            lease_id.clone(),                           // 6 lease_id
            self.config.lease_ttl_ms.to_string(),       // 7 lease_ttl_ms
            renew_before_ms.to_string(),                // 8 renew_before_ms
            attempt_id.clone(),                         // 9 attempt_id
            "{}".to_owned(),                            // 10 attempt_policy_json
            String::new(),                              // 11 attempt_timeout_ms
            String::new(),                              // 12 execution_deadline_at
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_claim_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::from)?;
        // Parse the claim result: {1, "OK", lease_id, lease_epoch, expires_at,
        // attempt_id, attempt_index, attempt_type}
        let arr = match &raw {
            Value::Array(arr) => arr,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "expected Array".into(),
                }));
            }
        };

        let status_code = match arr.first() {
            Some(Ok(Value::Int(n))) => *n,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "bad status code".into(),
                }));
            }
        };

        if status_code != 1 {
            let err_field_str = |idx: usize| -> String {
                arr.get(idx)
                    .and_then(|v| match v {
                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                        Ok(Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default()
            };
            let error_code = {
                let s = err_field_str(1);
                if s.is_empty() { "unknown".to_owned() } else { s }
            };
            let detail = err_field_str(2);

            return Err(SdkError::from(
                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
                        fcall: "ff_claim_execution".into(),
                        execution_id: Some(execution_id.to_string()),
                        message: format!("unknown error: {error_code}"),
                    }),
            ));
        }

        // Extract fields from the success response.
        let field_str = |idx: usize| -> String {
            arr.get(idx + 2) // skip status_code and "OK"
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default()
        };

        // Lua returns: ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
        // Positions:      0         1      2           3           4              5
        let lease_id = LeaseId::parse(&field_str(0))
            .unwrap_or_else(|_| LeaseId::new());
        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
        // field_str(2) is expires_at — skip it (lease timing is managed by renewal)
        let attempt_id = AttemptId::parse(&field_str(3))
            .unwrap_or_else(|_| AttemptId::new());
        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));

        // Read the execution payload and metadata.
        let (input_payload, execution_kind, tags) = self
            .read_execution_context(execution_id, partition)
            .await?;

        Ok(ClaimedTask::new(
            self.client.clone(),
            self.backend.clone(),
            self.partition_config,
            execution_id.clone(),
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            self.config.lease_ttl_ms,
            lane_id.clone(),
            self.config.worker_instance_id.clone(),
            input_payload,
            execution_kind,
            tags,
        ))
    }

    /// Consume a [`ClaimGrant`] and claim the granted execution on
    /// this worker. The intended production entry point: pair with
    /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
    /// scheduler-issued grants into the SDK without enabling the
    /// `direct-valkey-claim` feature (which bypasses budget/quota
    /// admission control).
    ///
    /// The worker's concurrency semaphore is checked BEFORE the FCALL
    /// so a saturated worker does not consume the grant: the grant
    /// stays valid for its remaining TTL and the caller can either
    /// release it back to the scheduler or retry after some other
    /// in-flight task completes.
    ///
    /// On success the returned [`ClaimedTask`] holds a concurrency
    /// permit that releases automatically on
    /// `complete`/`fail`/`cancel`/drop — same contract as
    /// `claim_next`.
    ///
    /// # Arguments
    ///
    /// * `lane` — the lane the grant was issued for. Must match what
    ///   was passed to `Scheduler::claim_for_worker`; the Lua FCALL
    ///   uses it to look up `lane_eligible`, `lane_active`, and the
    ///   `worker_leases` index slot.
    /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
    ///
    /// # Errors
    ///
    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
    ///   permits all held. Retryable; the grant is untouched.
    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
    ///   or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
    ///   (wrapped in [`SdkError::Engine`]).
    /// * `ScriptError::CapabilityMismatch` — the execution's required
    ///   capabilities are not a subset of this worker's caps (wrapped
    ///   in [`SdkError::Engine`]). Surfaced post-grant if a race
    ///   between grant issuance and a caps change allows it.
    /// * `ScriptError::Parse` — `ff_claim_execution` returned an
    ///   unexpected shape (wrapped in [`SdkError::Engine`]).
    /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
    ///   transport error during the FCALL or the
    ///   `read_execution_context` follow-up.
    ///
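    /// A sketch of the scheduler-paired flow (hedged — the
    /// `claim_for_worker` call is illustrative; see the scheduler crate
    /// for its real signature):
    ///
    /// ```rust,ignore
    /// // Scheduler side: run budget/quota/capability admission and
    /// // mint a grant bound to this worker.
    /// if let Some(grant) = scheduler.claim_for_worker(/* worker, lane, caps, ttl */).await? {
    ///     match worker.claim_from_grant(lane.clone(), grant).await {
    ///         Ok(task) => { /* process the task */ }
    ///         // Saturated: no FCALL fired, so the grant was NOT
    ///         // consumed — retry once a slot frees, or release it.
    ///         Err(ff_sdk::SdkError::WorkerAtCapacity) => { /* back off */ }
    ///         Err(e) => return Err(e),
    ///     }
    /// }
    /// ```
    ///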
    /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
    /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
    pub async fn claim_from_grant(
        &self,
        lane: LaneId,
        grant: ff_core::contracts::ClaimGrant,
    ) -> Result<ClaimedTask, SdkError> {
        // Semaphore check FIRST. If the worker is saturated we must
        // surface the condition to the caller without touching the
        // grant — silently returning Ok(None) (as claim_next does)
        // would drop a grant the scheduler has already committed work
        // to issuing, wasting the slot until its TTL elapses.
        let permit = self
            .concurrency_semaphore
            .clone()
            .try_acquire_owned()
            .map_err(|_| SdkError::WorkerAtCapacity)?;

        let now = TimestampMs::now();
        let partition = grant.partition().map_err(|e| SdkError::Config {
            context: "claim_from_grant".to_owned(),
            field: Some("partition_key".to_owned()),
            message: e.to_string(),
        })?;
        let mut task = self
            .claim_execution(&grant.execution_id, &lane, &partition, now)
            .await?;
        task.set_concurrency_permit(permit);
        Ok(task)
    }

    /// Scheduler-routed claim: POST the server's
    /// `/v1/workers/{id}/claim`, then chain to
    /// [`Self::claim_from_grant`].
    ///
    /// Batch C item 2 PR-B. This is the production entry point —
    /// budget + quota + capability admission run server-side inside
    /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
    /// enable the `direct-valkey-claim` feature.
    ///
    /// Returns `Ok(None)` when the server reports no eligible execution
    /// (HTTP 204). Callers typically back off by
    /// `config.claim_poll_interval_ms` and try again, the same cadence
    /// as the direct-claim path's `Ok(None)`.
    ///
    /// The `admin` client is the established HTTP surface
    /// (`FlowFabricAdminClient`), reused here so workers don't keep a
    /// second reqwest client around. Build it once at worker boot and
    /// hand it in by reference on every claim.
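    ///
    /// A minimal server-routed worker loop (sketch; building the
    /// `FlowFabricAdminClient` is elided):
    ///
    /// ```rust,ignore
    /// use std::time::Duration;
    ///
    /// // Build the admin client once at boot; reuse it per claim.
    /// let poll = Duration::from_millis(worker.config().claim_poll_interval_ms);
    /// loop {
    ///     match worker.claim_via_server(&admin, &lane, 5_000).await? {
    ///         Some(task) => { /* process the task */ }
    ///         // HTTP 204: nothing eligible — back off one interval.
    ///         None => tokio::time::sleep(poll).await,
    ///     }
    /// }
    /// ```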
    pub async fn claim_via_server(
        &self,
        admin: &crate::FlowFabricAdminClient,
        lane: &LaneId,
        grant_ttl_ms: u64,
    ) -> Result<Option<ClaimedTask>, SdkError> {
        let req = crate::admin::ClaimForWorkerRequest {
            worker_id: self.config.worker_id.to_string(),
            lane_id: lane.to_string(),
            worker_instance_id: self.config.worker_instance_id.to_string(),
            capabilities: self.config.capabilities.clone(),
            grant_ttl_ms,
        };
        let Some(resp) = admin.claim_for_worker(req).await? else {
            return Ok(None);
        };
        let grant = resp.into_grant()?;
        self.claim_from_grant(lane.clone(), grant).await.map(Some)
    }

    /// Consume a [`ReclaimGrant`] and transition the granted
    /// `attempt_interrupted` execution into a `started` state on this
    /// worker. Symmetric partner to [`claim_from_grant`] for the
    /// resume path.
    ///
    /// The grant must have been issued to THIS worker (matching
    /// `worker_id` at grant time). A mismatch returns
    /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
    /// atomically by `ff_claim_resumed_execution`; a second call with
    /// the same grant also returns `InvalidClaimGrant`.
    ///
    /// # Concurrency
    ///
    /// The worker's concurrency semaphore is checked BEFORE the FCALL
    /// (same contract as [`claim_from_grant`]). Reclaim does NOT
    /// assume pre-existing capacity on this worker — a reclaim can
    /// land on a fresh worker instance that just came up after a
    /// crash/restart and is picking up a previously-interrupted
    /// execution. If the worker is saturated, the grant stays valid
    /// for its remaining TTL and the caller can release it or retry.
    ///
    /// On success the returned [`ClaimedTask`] holds a concurrency
    /// permit that releases automatically on
    /// `complete`/`fail`/`cancel`/drop.
    ///
    /// # Errors
    ///
    /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
    ///   permits all held. Retryable; the grant is untouched (no
    ///   FCALL was issued, so `ff_claim_resumed_execution` did not
    ///   atomically consume the grant key).
    /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
    ///   or `worker_id` mismatch.
    /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
    /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
    ///   `attempt_interrupted`.
    /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
    ///   not `runnable`.
    /// * `ScriptError::ExecutionNotFound` — core key missing.
    /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
    ///   transport.
    ///
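    /// A sketch of the resume flow (hedged — how the [`ReclaimGrant`]
    /// reaches this worker depends on the deployment's scheduler wiring):
    ///
    /// ```rust,ignore
    /// match worker.claim_from_reclaim_grant(reclaim_grant).await {
    ///     // The returned task is bound to the EXISTING attempt — the
    ///     // resume path does not mint a new attempt index.
    ///     Ok(task) => { /* continue the interrupted work */ }
    ///     // Saturated: no FCALL was issued, so the grant survives for
    ///     // its remaining TTL; retry after a slot frees up.
    ///     Err(ff_sdk::SdkError::WorkerAtCapacity) => { /* retry later */ }
    ///     Err(e) => return Err(e),
    /// }
    /// ```
    ///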
    /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    pub async fn claim_from_reclaim_grant(
        &self,
        grant: ff_core::contracts::ReclaimGrant,
    ) -> Result<ClaimedTask, SdkError> {
        // Semaphore check FIRST — same load-bearing ordering as
        // `claim_from_grant`. If the worker is saturated, surface
        // WorkerAtCapacity without firing the FCALL; the FCALL is an
        // atomic consume on the grant key, so calling it past
        // saturation would destroy the grant while leaving no
        // permit to attach to the returned `ClaimedTask`.
        let permit = self
            .concurrency_semaphore
            .clone()
            .try_acquire_owned()
            .map_err(|_| SdkError::WorkerAtCapacity)?;

        // The grant carries partition + lane_id, so no round-trip is needed
        // to resolve them before the FCALL.
        let partition = grant.partition().map_err(|e| SdkError::Config {
            context: "claim_from_reclaim_grant".to_owned(),
            field: Some("partition_key".to_owned()),
            message: e.to_string(),
        })?;
        let mut task = self
            .claim_resumed_execution(
                &grant.execution_id,
                &grant.lane_id,
                &partition,
            )
            .await?;
        task.set_concurrency_permit(permit);
        Ok(task)
    }
1351
1352 /// Low-level resume claim. Invokes `ff_claim_resumed_execution`
1353 /// and returns a `ClaimedTask` bound to the resumed attempt.
1354 ///
1355 /// Previously gated behind `direct-valkey-claim`; ungated so the
1356 /// public [`claim_from_reclaim_grant`] entry point can reuse it.
1357 /// The method stays private — external callers use
1358 /// `claim_from_reclaim_grant`.
1359 ///
1360 /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
1361 async fn claim_resumed_execution(
1362 &self,
1363 execution_id: &ExecutionId,
1364 lane_id: &LaneId,
1365 partition: &ff_core::partition::Partition,
1366 ) -> Result<ClaimedTask, SdkError> {
1367 let ctx = ExecKeyContext::new(partition, execution_id);
1368 let idx = IndexKeys::new(partition);
1369
1370 // Pre-read current_attempt_index for the existing attempt hash key.
1371 // This is load-bearing: KEYS[6] must point to the real attempt hash.
1372 let att_idx_str: Option<String> = self.client
1373 .cmd("HGET")
1374 .arg(ctx.core())
1375 .arg("current_attempt_index")
1376 .execute()
1377 .await
1378 .map_err(|e| crate::backend_context(e, "read attempt_index"))?;
1379 let att_idx = AttemptIndex::new(
1380 att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
1381 );

        let lease_id = LeaseId::new().to_string();

        // KEYS (11): must match lua/signal.lua ff_claim_resumed_execution
        let keys: Vec<String> = vec![
            ctx.core(),                                         // 1  exec_core
            ctx.claim_grant(),                                  // 2  claim_grant
            idx.lane_eligible(lane_id),                         // 3  eligible_zset
            idx.lease_expiry(),                                 // 4  lease_expiry_zset
            idx.worker_leases(&self.config.worker_instance_id), // 5  worker_leases
            ctx.attempt_hash(att_idx),                          // 6  existing_attempt_hash
            ctx.lease_current(),                                // 7  lease_current
            ctx.lease_history(),                                // 8  lease_history
            idx.lane_active(lane_id),                           // 9  active_index
            idx.attempt_timeout(),                              // 10 attempt_timeout_zset
            idx.execution_deadline(),                           // 11 execution_deadline_zset
        ];

        // ARGV (8): must match lua/signal.lua ff_claim_resumed_execution
        let args: Vec<String> = vec![
            execution_id.to_string(),                   // 1 execution_id
            self.config.worker_id.to_string(),          // 2 worker_id
            self.config.worker_instance_id.to_string(), // 3 worker_instance_id
            lane_id.to_string(),                        // 4 lane
            String::new(),                              // 5 capability_hash
            lease_id.clone(),                           // 6 lease_id
            self.config.lease_ttl_ms.to_string(),       // 7 lease_ttl_ms
            String::new(),                              // 8 remaining_attempt_timeout_ms
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        // TODO(#150): migrate when EngineBackend trait grows a
        // `claim_resumed_execution` method; for now this FCALL stays on
        // ff-sdk's direct client path.
        let raw: Value = self
            .client
            .fcall("ff_claim_resumed_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::from)?;

        // Parse result — same format as ff_claim_execution:
        // {1, "OK", lease_id, lease_epoch, expires_at, attempt_id, attempt_index, attempt_type}
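        // On failure the reply is {status != 1, error_code, detail}; the
        // error branch below maps error_code/detail through
        // `ScriptError::from_code_with_detail`.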
        let arr = match &raw {
            Value::Array(arr) => arr,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_resumed_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "expected Array".into(),
                }));
            }
        };

        let status_code = match arr.first() {
            Some(Ok(Value::Int(n))) => *n,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_resumed_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "bad status code".into(),
                }));
            }
        };

        if status_code != 1 {
            let err_field_str = |idx: usize| -> String {
                arr.get(idx)
                    .and_then(|v| match v {
                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                        Ok(Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default()
            };
            let error_code = {
                let s = err_field_str(1);
                if s.is_empty() { "unknown".to_owned() } else { s }
            };
            let detail = err_field_str(2);

            return Err(SdkError::from(
                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
                        fcall: "ff_claim_resumed_execution".into(),
                        execution_id: Some(execution_id.to_string()),
                        message: format!("unknown error: {error_code}"),
                    }),
            ));
        }

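        // Success fields start at reply index 2 — `idx + 2` skips the
        // status code and the "OK" marker.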
        let field_str = |idx: usize| -> String {
            arr.get(idx + 2)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default()
        };

        let lease_id = LeaseId::parse(&field_str(0)).unwrap_or_else(|_| LeaseId::new());
        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
        let attempt_id = AttemptId::parse(&field_str(3)).unwrap_or_else(|_| AttemptId::new());
        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));

        let (input_payload, execution_kind, tags) = self
            .read_execution_context(execution_id, partition)
            .await?;

        Ok(ClaimedTask::new(
            self.client.clone(),
            self.backend.clone(),
            self.partition_config,
            execution_id.clone(),
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            self.config.lease_ttl_ms,
            lane_id.clone(),
            self.config.worker_instance_id.clone(),
            input_payload,
            execution_kind,
            tags,
        ))
    }

    /// Read payload + execution_kind + tags from exec_core. Previously
    /// gated behind `direct-valkey-claim`; now shared by the
    /// feature-gated inline claim path and the public
    /// `claim_from_reclaim_grant` entry point.
    async fn read_execution_context(
        &self,
        execution_id: &ExecutionId,
        partition: &ff_core::partition::Partition,
    ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);

        // Read payload
        let payload: Option<String> = self
            .client
            .get(&ctx.payload())
            .await
            .map_err(|e| crate::backend_context(e, "GET payload"))?;
        let input_payload = payload.unwrap_or_default().into_bytes();

        // Read execution_kind from core
        let kind: Option<String> = self
            .client
            .hget(&ctx.core(), "execution_kind")
            .await
            .map_err(|e| crate::backend_context(e, "HGET execution_kind"))?;
        let execution_kind = kind.unwrap_or_default();

        // Read tags
        let tags: HashMap<String, String> = self
            .client
            .hgetall(&ctx.tags())
            .await
            .map_err(|e| crate::backend_context(e, "HGETALL tags"))?;

        Ok((input_payload, execution_kind, tags))
    }

    // ── Phase 3: Signal delivery ──

    /// Deliver a signal to a suspended execution's waitpoint.
    ///
    /// The engine atomically records the signal, evaluates the resume
    /// condition, and optionally transitions the execution from
    /// `suspended` to `runnable`.
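    ///
    /// # Usage
    ///
    /// A minimal sketch — the `Signal` literal assumes the struct is
    /// plainly constructible with the fields consumed below; real call
    /// sites may build it differently.
    ///
    /// ```rust,ignore
    /// let outcome = worker.deliver_signal(
    ///     &execution_id,
    ///     &waitpoint_id,
    ///     Signal {
    ///         signal_name: "approval".to_owned(),
    ///         signal_category: "human".to_owned(),
    ///         source_type: "api".to_owned(),
    ///         source_identity: "reviewer-7".to_owned(),
    ///         payload: Some(br#"{"approved":true}"#.to_vec()),
    ///         idempotency_key: Some("req-123".to_owned()),
    ///         waitpoint_token: token,
    ///     },
    /// ).await?;
    /// ```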
    pub async fn deliver_signal(
        &self,
        execution_id: &ExecutionId,
        waitpoint_id: &WaitpointId,
        signal: crate::task::Signal,
    ) -> Result<crate::task::SignalOutcome, SdkError> {
        let partition =
            ff_core::partition::execution_partition(execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let idx = IndexKeys::new(&partition);

        let signal_id = ff_core::types::SignalId::new();
        let now = TimestampMs::now();

        // Pre-read lane_id from exec_core — the execution may be on any
        // lane, not necessarily one of this worker's configured lanes.
        let lane_str: Option<String> = self
            .client
            .hget(&ctx.core(), "lane_id")
            .await
            .map_err(|e| crate::backend_context(e, "HGET lane_id"))?;
        let lane_id = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));

        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
        //            exec_signals_zset, signal_hash, signal_payload,
        //            idem_key, waitpoint_hash, suspension_current,
        //            eligible_zset, suspended_zset, delayed_zset,
        //            suspension_timeout_zset, hmac_secrets
        let idem_key = if let Some(ref ik) = signal.idempotency_key {
            ctx.signal_dedup(waitpoint_id, ik)
        } else {
            ctx.noop() // must share {p:N} hash tag for cluster mode
        };
        let keys: Vec<String> = vec![
            ctx.core(),                            // 1
            ctx.waitpoint_condition(waitpoint_id), // 2
            ctx.waitpoint_signals(waitpoint_id),   // 3
            ctx.exec_signals(),                    // 4
            ctx.signal(&signal_id),                // 5
            ctx.signal_payload(&signal_id),        // 6
            idem_key,                              // 7
            ctx.waitpoint(waitpoint_id),           // 8
            ctx.suspension_current(),              // 9
            idx.lane_eligible(&lane_id),           // 10
            idx.lane_suspended(&lane_id),          // 11
            idx.lane_delayed(&lane_id),            // 12
            idx.suspension_timeout(),              // 13
            idx.waitpoint_hmac_secrets(),          // 14
        ];

        let payload_str = signal
            .payload
            .as_ref()
            .map(|p| String::from_utf8_lossy(p).into_owned())
            .unwrap_or_default();

        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
        //            signal_category, source_type, source_identity,
        //            payload, payload_encoding, idempotency_key,
        //            correlation_id, target_scope, created_at,
        //            dedup_ttl_ms, resume_delay_ms, signal_maxlen,
        //            max_signals_per_execution, waitpoint_token
        let args: Vec<String> = vec![
            signal_id.to_string(),                      // 1
            execution_id.to_string(),                   // 2
            waitpoint_id.to_string(),                   // 3
            signal.signal_name,                         // 4
            signal.signal_category,                     // 5
            signal.source_type,                         // 6
            signal.source_identity,                     // 7
            payload_str,                                // 8
            "json".to_owned(),                          // 9  payload_encoding
            signal.idempotency_key.unwrap_or_default(), // 10
            String::new(),                              // 11 correlation_id
            "waitpoint".to_owned(),                     // 12 target_scope
            now.to_string(),                            // 13 created_at
            "86400000".to_owned(),                      // 14 dedup_ttl_ms
            "0".to_owned(),                             // 15 resume_delay_ms
            "1000".to_owned(),                          // 16 signal_maxlen
            "10000".to_owned(),                         // 17 max_signals_per_execution
            // WIRE BOUNDARY — raw token must reach Lua unredacted. Do NOT
            // use ToString/Display (those are redacted for log safety);
            // .as_str() is the explicit opt-in that gets the secret bytes.
            signal.waitpoint_token.as_str().to_owned(), // 18 waitpoint_token
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        // TODO(#150): migrate when EngineBackend trait grows a
        // `deliver_signal` method; for now this FCALL stays on
        // ff-sdk's direct client path.
        let raw: Value = self
            .client
            .fcall("ff_deliver_signal", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::from)?;

        crate::task::parse_signal_result(&raw)
    }

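    /// Round-robin over the configured lanes for the inline claim path.
    /// `Relaxed` ordering suffices here — the counter only spreads load
    /// across lanes and synchronizes nothing else.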
    #[cfg(feature = "direct-valkey-claim")]
    fn next_lane(&self) -> LaneId {
        let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
        self.config.lanes[idx].clone()
    }
}

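/// True when an inline-claim failure is non-fatal to the claim loop —
/// the engine classed it `Retryable` (transient) or `Informational`
/// (benign).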
#[cfg(feature = "direct-valkey-claim")]
fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
    use ff_core::error::ErrorClass;
    matches!(
        ff_script::engine_error_ext::class(err),
        ErrorClass::Retryable | ErrorClass::Informational
    )
}

/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
/// instance id with FNV-1a to place distinct worker processes on different
/// partition windows from their first poll. A fixed seed of zero would be
/// valid for a single-worker cluster; hashing spreads the starting points
/// in multi-worker deployments.
#[cfg(feature = "direct-valkey-claim")]
fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
    if num_partitions == 0 {
        return 0;
    }
    (ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize) % num_partitions
}

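/// First element of an array reply, if it parses as a bulk or simple
/// string; `None` otherwise.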
#[cfg(feature = "direct-valkey-claim")]
fn extract_first_array_string(value: &Value) -> Option<String> {
    match value {
        Value::Array(arr) if !arr.is_empty() => match &arr[0] {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        },
        _ => None,
    }
}

/// Read partition config from Valkey's `ff:config:partitions` hash.
/// Returns Err if the key doesn't exist or can't be read.
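///
/// The hash layout this parser expects — the values shown are the
/// defaults substituted when a field is missing, non-numeric, or zero:
///
/// ```text
/// HGETALL ff:config:partitions
/// 1) "num_flow_partitions"    2) "256"
/// 3) "num_budget_partitions"  4) "32"
/// 5) "num_quota_partitions"   6) "32"
/// ```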
async fn read_partition_config(client: &Client) -> Result<PartitionConfig, SdkError> {
    let key = ff_core::keys::global_config_partitions();
    let fields: HashMap<String, String> = client
        .hgetall(&key)
        .await
        .map_err(|e| crate::backend_context(e, format!("HGETALL {key}")))?;

    if fields.is_empty() {
        return Err(SdkError::Config {
            context: "read_partition_config".into(),
            field: None,
            message: "ff:config:partitions not found in Valkey".into(),
        });
    }

    let parse = |field: &str, default: u16| -> u16 {
        fields
            .get(field)
            .and_then(|v| v.parse().ok())
            .filter(|&n: &u16| n > 0)
            .unwrap_or(default)
    };

    Ok(PartitionConfig {
        num_flow_partitions: parse("num_flow_partitions", 256),
        num_budget_partitions: parse("num_budget_partitions", 32),
        num_quota_partitions: parse("num_quota_partitions", 32),
    })
}

#[cfg(test)]
mod completion_accessor_type_tests {
    //! Type-level compile check that
    //! [`FlowFabricWorker::completion_backend`] returns an
    //! `Option<Arc<dyn CompletionBackend>>`. No Valkey required —
    //! the assertion is at the function-pointer type level and the
    //! `#[test]` body exists solely so the compiler elaborates it.
    use super::FlowFabricWorker;
    use ff_core::completion_backend::CompletionBackend;
    use std::sync::Arc;

    #[test]
    fn completion_backend_accessor_signature() {
        // If this line compiles, the public accessor returns the
        // advertised type. The function is never called (no live
        // worker), so no I/O happens.
        let _f: fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>> =
            FlowFabricWorker::completion_backend;
    }
}

#[cfg(all(test, feature = "direct-valkey-claim"))]
mod scan_cursor_tests {
    use super::scan_cursor_seed;

    #[test]
    fn stable_for_same_input() {
        assert_eq!(scan_cursor_seed("w1", 256), scan_cursor_seed("w1", 256));
    }

    #[test]
    fn distinct_for_different_ids() {
        assert_ne!(scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
    }

    #[test]
    fn bounded_by_partition_count() {
        for i in 0..100 {
            assert!(scan_cursor_seed(&format!("w{i}"), 256) < 256);
        }
    }

    #[test]
    fn zero_partitions_returns_zero() {
        assert_eq!(scan_cursor_seed("w1", 0), 0);
    }
}