// ff_sdk/worker.rs
1use std::collections::HashMap;
2#[cfg(feature = "direct-valkey-claim")]
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::Duration;
6
7use ferriskey::{Client, ClientBuilder, Value};
8use ff_core::keys::{ExecKeyContext, IndexKeys};
9use ff_core::partition::PartitionConfig;
10use ff_core::types::*;
11use tokio::sync::Semaphore;
12
13use crate::config::WorkerConfig;
14use crate::task::ClaimedTask;
15use crate::SdkError;
16
/// FlowFabric worker — connects to Valkey, claims executions, and provides
/// the worker-facing API.
///
/// # Admission control
///
/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
/// **bypasses the scheduler's admission controls**: it reads the eligible
/// ZSET directly and mints its own claim grant without consulting budget
/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
/// benchmarks, tests, and single-tenant development where the scheduler
/// hop is measurement noise, not for production.
///
/// For production deployments, consume scheduler-issued grants via
/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
/// budget breach, quota sliding-window, concurrency cap, and
/// capability-match checks before issuing grants.
///
/// # Usage
///
/// ```rust,ignore
/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
///
/// let config = WorkerConfig::new("localhost", 6379, "w1", "w1-i1", "default", "main");
/// let worker = FlowFabricWorker::connect(config).await?;
///
/// loop {
///     if let Some(task) = worker.claim_next().await? {
///         // Process task...
///         task.complete(Some(b"result".to_vec())).await?;
///     } else {
///         tokio::time::sleep(Duration::from_secs(1)).await;
///     }
/// }
/// ```
pub struct FlowFabricWorker {
    /// Dialed ferriskey connection. Stage 1b hot paths still call this
    /// directly; Stage 1d removes it in favour of `backend`.
    client: Client,
    /// The config this worker was built from (host, lanes, ids, limits).
    config: WorkerConfig,
    /// Server-published partition layout bound at `connect()` time.
    partition_config: PartitionConfig,
    /// Sorted, deduplicated, comma-separated capabilities — computed once
    /// from `config.capabilities` at connect time. Passed as ARGV[9] to
    /// `ff_issue_claim_grant` on every claim. BTreeSet sorting is critical:
    /// Lua's `ff_issue_claim_grant` relies on a stable CSV form for
    /// reproducible logs and tests.
    #[cfg(feature = "direct-valkey-claim")]
    worker_capabilities_csv: String,
    /// 8-hex FNV-1a digest of `worker_capabilities_csv`. Used in
    /// per-mismatch logs so the 4KB CSV never echoes on every reject
    /// during an incident. Full CSV logged once at connect-time WARN for
    /// cross-reference. Mirrors `ff-scheduler::claim::worker_caps_digest`.
    #[cfg(feature = "direct-valkey-claim")]
    worker_capabilities_hash: String,
    /// Round-robin cursor over `config.lanes` used by `claim_next`.
    #[cfg(feature = "direct-valkey-claim")]
    lane_index: AtomicUsize,
    /// Concurrency cap for in-flight tasks. Permits are acquired or
    /// transferred by [`claim_next`] (feature-gated),
    /// [`claim_from_grant`] (always available), and
    /// [`claim_from_reclaim_grant`], transferred to the returned
    /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
    /// Holds `max_concurrent_tasks` permits total.
    ///
    /// [`claim_next`]: FlowFabricWorker::claim_next
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
    concurrency_semaphore: Arc<Semaphore>,
    /// Rolling offset for chunked partition scans. Each poll advances the
    /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
    /// chunk)` polls every partition is covered. The initial value is
    /// derived from `worker_instance_id` so idle workers spread their
    /// scans across different partitions from the first poll onward.
    ///
    /// Overflow: on 64-bit targets `usize` is `u64` — overflow after
    /// ~2^64 polls (billions of years at any realistic rate). On 32-bit
    /// targets (wasm32, i686) `usize` is `u32` and wraps after ~4 years
    /// at 1 poll/sec — acceptable; on wrap, the modulo preserves
    /// correctness because the sequence simply restarts a new cycle.
    #[cfg(feature = "direct-valkey-claim")]
    scan_cursor: AtomicUsize,
    /// The [`EngineBackend`] the Stage-1b trait forwarders route
    /// through.
    ///
    /// **RFC-012 Stage 1b.** Always populated:
    /// [`FlowFabricWorker::connect`] now wraps the worker's own
    /// `ferriskey::Client` in a `ValkeyBackend` via
    /// `ValkeyBackend::from_client_and_partitions`, and
    /// [`FlowFabricWorker::connect_with`] replaces that default with
    /// the caller-supplied `Arc<dyn EngineBackend>`. The
    /// [`FlowFabricWorker::backend`] accessor still returns
    /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
    /// narrows the return type once consumers have migrated.
    ///
    /// Hot paths (claim, deliver_signal, admin queries) still use the
    /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
    /// migrates them through this field, and Stage 1d removes the
    /// embedded client.
    ///
    /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
    backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
}
113
/// Number of partitions scanned per `claim_next()` poll. Keeps idle Valkey
/// load at O(PARTITION_SCAN_CHUNK) per worker-second instead of
/// O(num_flow_partitions). Works together with the worker's rolling
/// `scan_cursor`: full partition coverage takes
/// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` consecutive polls.
#[cfg(feature = "direct-valkey-claim")]
const PARTITION_SCAN_CHUNK: usize = 32;
119
120impl FlowFabricWorker {
    /// Connect to Valkey and prepare the worker.
    ///
    /// Establishes the ferriskey connection. Does NOT load the FlowFabric
    /// library — that is the server's responsibility (ff-server calls
    /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
    /// the library is already loaded.
    ///
    /// # Errors
    ///
    /// Returns `SdkError::Config` for invalid configuration (no lanes,
    /// malformed capability tokens, duplicate `worker_instance_id`,
    /// unexpected PING reply) and `SdkError::ValkeyContext` for
    /// connection or command transport failures.
    pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
        // A worker with zero lanes could never claim anything — fail loudly
        // at boot rather than silently polling nothing.
        if config.lanes.is_empty() {
            return Err(SdkError::Config {
                context: "worker_config".into(),
                field: None,
                message: "at least one lane is required".into(),
            });
        }

        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder.build()
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "failed to connect".into() })?;

        // Verify connectivity before doing any stateful work.
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "PING failed".into() })?;
        if pong != "PONG" {
            return Err(SdkError::Config {
                context: "worker_connect".into(),
                field: None,
                message: format!("unexpected PING response: {pong}"),
            });
        }

        // Guard against two worker processes sharing the same
        // `worker_instance_id`. A duplicate instance would clobber each
        // other's lease_current/active_index entries and double-claim work.
        // SET NX on a liveness key with 2× lease TTL; if the key already
        // exists another process is live. The key auto-expires if this
        // process crashes without renewal, so a restart after a hard crash
        // just waits at most 2× lease_ttl_ms for the ghost entry to clear.
        //
        // Known limitations of this minimal scheme (documented for operators):
        // 1. **Startup-only, not runtime.** There is no heartbeat renewal
        //    path. After `2 × lease_ttl_ms` elapses the alive key expires
        //    naturally even while this worker is still running, and a
        //    second process with the same `worker_instance_id` launched
        //    later will successfully SET NX alongside the first. The check
        //    catches misconfiguration at boot; it does not fence duplicates
        //    that appear mid-lifetime. Production deployments should rely
        //    on the orchestrator (Kubernetes, systemd unit with
        //    `Restart=on-failure`, etc.) as the authoritative single-
        //    instance enforcer; this SET NX is belt-and-suspenders.
        //
        // 2. **Restart delay after a crash.** If a worker crashes
        //    ungracefully (SIGKILL, container OOM) and is restarted within
        //    `2 × lease_ttl_ms`, the alive key is still present and the
        //    new process exits with `SdkError::Config("duplicate
        //    worker_instance_id ...")`. Options for operators:
        //    - Wait `2 × lease_ttl_ms` (default 60s with the 30s TTL)
        //      before restarting.
        //    - Manually `DEL ff:worker:<instance_id>:alive` in Valkey to
        //      unblock the restart.
        //    - Use a fresh `worker_instance_id` for the restart (the
        //      orchestrator should already do this per-Pod).
        //
        // 3. **No graceful cleanup on shutdown.** There is no explicit
        //    `disconnect()` call that DELs the alive key. On clean
        //    `SIGTERM` the key lingers until its TTL expires. A follow-up
        //    can add `FlowFabricWorker::disconnect(self)` for callers that
        //    want to skip the restart-delay window.
        let alive_key = format!("ff:worker:{}:alive", config.worker_instance_id);
        // Floor of 1s so a misconfigured tiny lease TTL can't produce an
        // alive key that expires before the guard is useful.
        let alive_ttl_ms = (config.lease_ttl_ms.saturating_mul(2)).max(1_000);
        let set_result: Option<String> = client
            .cmd("SET")
            .arg(&alive_key)
            .arg("1")
            .arg("NX")
            .arg("PX")
            .arg(alive_ttl_ms.to_string().as_str())
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext {
                source: e,
                context: "SET NX worker alive key".into(),
            })?;
        // SET NX returns nil (None) when the key already exists — i.e.
        // another live process holds this instance id.
        if set_result.is_none() {
            return Err(SdkError::Config {
                context: "worker_connect".into(),
                field: Some("worker_instance_id".into()),
                message: format!(
                    "duplicate worker_instance_id '{}': another process already holds {alive_key}",
                    config.worker_instance_id
                ),
            });
        }

        // Read partition config from Valkey (set by ff-server on startup).
        // Falls back to defaults if key doesn't exist (e.g. SDK-only testing).
        let partition_config = read_partition_config(&client).await
            .unwrap_or_else(|e| {
                tracing::warn!(
                    error = %e,
                    "ff:config:partitions not found, using defaults"
                );
                PartitionConfig::default()
            });

        // Floor of 1 permit — a zero-permit semaphore would deadlock every
        // claim path.
        let max_tasks = config.max_concurrent_tasks.max(1);
        let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));

        tracing::info!(
            worker_id = %config.worker_id,
            instance_id = %config.worker_instance_id,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            "FlowFabricWorker connected"
        );

        // Seed the rolling scan cursor from the instance id so concurrent
        // idle workers start scanning different partition windows.
        #[cfg(feature = "direct-valkey-claim")]
        let scan_cursor_init = scan_cursor_seed(
            config.worker_instance_id.as_str(),
            partition_config.num_flow_partitions.max(1) as usize,
        );

        // Sort + dedupe capabilities into a stable CSV. BTreeSet both sorts
        // and deduplicates in one pass; string joining happens once here.
        //
        // Ingress validation mirrors Scheduler::claim_for_worker (ff-scheduler):
        // - `,` is the CSV delimiter; a token containing one would split
        //   mid-parse and could let a {"gpu"} worker appear to satisfy
        //   {"gpu,cuda"} (silent auth bypass).
        // - Empty strings would produce leading/adjacent commas on the
        //   wire and inflate token count for no semantic reason.
        // - Whitespace / control chars: `"gpu "` vs `"gpu"` or
        //   `"gpu\n"` vs `"gpu"` produce silent mismatches that are
        //   miserable to debug. Reject control characters and any
        //   whitespace at ingress so a typo fails loudly at connect
        //   instead of silently mis-routing forever. (Printable non-ASCII
        //   UTF-8 is deliberately allowed — see the per-char check below.)
        // Reject at boot so operator misconfig is loud, symmetric with the
        // scheduler path.
        #[cfg(feature = "direct-valkey-claim")]
        for cap in &config.capabilities {
            if cap.is_empty() {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: "capability token must not be empty".into(),
                });
            }
            if cap.contains(',') {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability token may not contain ',' (CSV delimiter): {cap:?}"
                    ),
                });
            }
            // Reject ASCII control bytes (0x00-0x1F, 0x7F) and any ASCII
            // whitespace (space, tab, LF, CR, FF, VT). UTF-8 printable
            // characters above 0x7F are ALLOWED so i18n caps like
            // "东京-gpu" can be used. The CSV wire form is byte-safe for
            // multibyte UTF-8 because `,` is always a single byte and
            // never part of a multibyte continuation (only 0x80-0xBF are
            // continuations, ',' is 0x2C).
            if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability token must not contain whitespace or control \
                         characters: {cap:?}"
                    ),
                });
            }
        }
        #[cfg(feature = "direct-valkey-claim")]
        let worker_capabilities_csv: String = {
            let set: std::collections::BTreeSet<&str> = config
                .capabilities
                .iter()
                .map(|s| s.as_str())
                .filter(|s| !s.is_empty())
                .collect();
            // Token-count cap mirrors the scheduler's ingress limit.
            if set.len() > ff_core::policy::CAPS_MAX_TOKENS {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
                        ff_core::policy::CAPS_MAX_TOKENS,
                        set.len()
                    ),
                });
            }
            let csv = set.into_iter().collect::<Vec<_>>().join(",");
            // Byte-size cap on the joined wire form, not the token count.
            if csv.len() > ff_core::policy::CAPS_MAX_BYTES {
                return Err(SdkError::Config {
                    context: "worker_config".into(),
                    field: Some("capabilities".into()),
                    message: format!(
                        "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
                        ff_core::policy::CAPS_MAX_BYTES,
                        csv.len()
                    ),
                });
            }
            csv
        };

        // Short stable digest of the sorted caps CSV, computed once so
        // per-mismatch logs carry a stable identifier instead of the 4KB
        // CSV. Shared helper — ff-scheduler uses the same one for its
        // own per-mismatch logs, so cross-component log lines are
        // diffable against each other.
        #[cfg(feature = "direct-valkey-claim")]
        let worker_capabilities_hash = ff_core::hash::fnv1a_xor8hex(&worker_capabilities_csv);

        // Full CSV logged once at connect so per-mismatch logs (which
        // carry only the 8-hex hash) can be cross-referenced by ops.
        #[cfg(feature = "direct-valkey-claim")]
        if !worker_capabilities_csv.is_empty() {
            tracing::info!(
                worker_instance_id = %config.worker_instance_id,
                worker_caps_hash = %worker_capabilities_hash,
                worker_caps = %worker_capabilities_csv,
                "worker connected with capabilities (full CSV — mismatch logs use hash only)"
            );
        }

        // Non-authoritative advertisement of caps for operator visibility
        // (CLI introspection, dashboards). The AUTHORITATIVE source for
        // scheduling decisions is ARGV[9] on each claim — Lua reads ONLY
        // that, never this string. Lossy here is correctness-safe.
        //
        // Storage: a single STRING key holding the sorted CSV. Rationale:
        // * **Atomic overwrite.** `SET` is a single command — a concurrent
        //   reader can never observe a transient empty value (the prior
        //   DEL+SADD pair had that window).
        // * **Crash cleanup without refresh loop.** The alive-key SET NX
        //   is startup-only (see §1 above); there's no periodic renew
        //   to piggy-back on, so a TTL on caps would independently
        //   expire mid-flight and hide a live worker's caps from ops
        //   tools. Instead we drop the TTL: each reconnect overwrites;
        //   a crashed worker leaves a stale CSV until a new process
        //   with the same worker_instance_id boots (which triggers
        //   `duplicate worker_instance_id` via alive-key guard anyway —
        //   the orchestrator allocates a new id, and operators can DEL
        //   the stale caps key if they care).
        // * **Empty caps = DEL.** A restart from {gpu} to {} clears the
        //   advertisement rather than leaving stale data.
        // Cluster-safe advertisement: the per-worker caps STRING lives at
        // `ff:worker:{id}:caps` (lands on whatever slot CRC16 puts it on),
        // and the INSTANCE ID is SADD'd to the global workers-index SET
        // `ff:idx:workers` (single slot). The unblock scanner's cluster
        // enumeration uses SMEMBERS on the index + per-member GET on each
        // caps key, instead of `SCAN MATCH ff:worker:*:caps` (which in
        // cluster mode only scans the shard the SCAN lands on and misses
        // workers whose key hashes elsewhere). Pattern mirrors Batch A
        // `budget_policies_index` / `flow_index` / `deps_all_edges`:
        // operations stay atomic per command, the index is the
        // cluster-wide enumeration surface.
        #[cfg(feature = "direct-valkey-claim")]
        {
            let caps_key = ff_core::keys::worker_caps_key(&config.worker_instance_id);
            let index_key = ff_core::keys::workers_index_key();
            let instance_id = config.worker_instance_id.to_string();
            if worker_capabilities_csv.is_empty() {
                // No caps advertised. DEL the per-worker caps string AND
                // SREM from the index so the scanner doesn't GET an empty
                // string for a worker that never declares caps.
                let _ = client
                    .cmd("DEL")
                    .arg(&caps_key)
                    .execute::<Option<i64>>()
                    .await;
                if let Err(e) = client
                    .cmd("SREM")
                    .arg(&index_key)
                    .arg(&instance_id)
                    .execute::<Option<i64>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
                        "SREM workers-index failed; continuing (non-authoritative)");
                }
            } else {
                // Atomic overwrite of the caps STRING (one-command). Then
                // SADD to the index (idempotent — re-running connect for
                // the same id is a no-op at SADD level). The per-worker
                // caps key is written BEFORE the index SADD so that when
                // the scanner observes the id in the index, the caps key
                // is guaranteed to resolve to a non-stale CSV (the reverse
                // order would leak an index entry pointing at a stale or
                // empty caps key during a narrow window).
                if let Err(e) = client
                    .cmd("SET")
                    .arg(&caps_key)
                    .arg(&worker_capabilities_csv)
                    .execute::<Option<String>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %caps_key,
                        "SET worker caps advertisement failed; continuing");
                }
                if let Err(e) = client
                    .cmd("SADD")
                    .arg(&index_key)
                    .arg(&instance_id)
                    .execute::<Option<i64>>()
                    .await
                {
                    tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
                        "SADD workers-index failed; continuing");
                }
            }
        }

        // RFC-012 Stage 1b: wrap the dialed client in a
        // ValkeyBackend so `ClaimedTask`'s trait forwarders have
        // something to call. `from_client_and_partitions` reuses the
        // already-dialed client — no second connection.
        let backend = ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
            client.clone(),
            partition_config,
        );

        Ok(Self {
            client,
            config,
            partition_config,
            #[cfg(feature = "direct-valkey-claim")]
            worker_capabilities_csv,
            #[cfg(feature = "direct-valkey-claim")]
            worker_capabilities_hash,
            #[cfg(feature = "direct-valkey-claim")]
            lane_index: AtomicUsize::new(0),
            concurrency_semaphore,
            #[cfg(feature = "direct-valkey-claim")]
            scan_cursor: AtomicUsize::new(scan_cursor_init),
            backend,
        })
    }
473
474 /// Store a pre-built [`EngineBackend`] on the worker. Builds
475 /// the worker via the legacy [`FlowFabricWorker::connect`] path
476 /// first (so the embedded `ferriskey::Client` that the Stage 1b
477 /// non-migrated hot paths still use is dialed), then replaces
478 /// the default `ValkeyBackend` wrapper with the caller-supplied
479 /// `Arc<dyn EngineBackend>`.
480 ///
481 /// **Stage 1b scope — what the injected backend covers today.**
482 /// After this PR, `ClaimedTask`'s 8 migrated ops
483 /// (`renew_lease` / `update_progress` / `resume_signals` /
484 /// `delay_execution` / `move_to_waiting_children` /
485 /// `complete` / `cancel` / `fail`) route through the injected
486 /// backend. That's the first material use of `connect_with`: a
487 /// mock backend now genuinely sees the worker's per-task
488 /// write-surface calls. The 4 trait-shape-deferred ops
489 /// (`create_pending_waitpoint`, `append_frame`, `suspend`,
490 /// `report_usage`) still reach the embedded
491 /// `ferriskey::Client` directly until issue #117's trait
492 /// amendment lands; `claim_next` / `claim_from_grant` /
493 /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
494 /// are Stage 1c hot-path work. Stage 1d removes the embedded
495 /// client entirely.
496 ///
497 /// Today's constructor is therefore NOT yet a drop-in way to swap
498 /// in a non-Valkey backend — it requires a reachable Valkey node
499 /// for the 4 deferred + hot-path ops. Tests that exercise only
500 /// the 8 migrated ops can run fully against a mock backend.
501 ///
502 /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
503 pub async fn connect_with(
504 config: WorkerConfig,
505 backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
506 ) -> Result<Self, SdkError> {
507 let mut worker = Self::connect(config).await?;
508 worker.backend = backend;
509 Ok(worker)
510 }
511
512 /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
513 /// ops through.
514 ///
515 /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
516 /// the `Option` wrapper is retained for API stability with the
517 /// Stage-1a shape. Stage 1c narrows the return type to
518 /// `&Arc<dyn EngineBackend>`.
519 pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
520 Some(&self.backend)
521 }
522
523 /// Get a reference to the underlying ferriskey client.
524 pub fn client(&self) -> &Client {
525 &self.client
526 }
527
528 /// Get the worker config.
529 pub fn config(&self) -> &WorkerConfig {
530 &self.config
531 }
532
533 /// Get the server-published partition config this worker bound to at
534 /// `connect()`. Callers need this when computing partition hash-tags
535 /// for direct-client reads (e.g. ff-sdk::read_stream) to stay aligned
536 /// with the server's `num_flow_partitions` — using
537 /// `PartitionConfig::default()` assumes 256 partitions and silently
538 /// misses data on deployments with any other value.
539 pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
540 &self.partition_config
541 }
542
    /// Attempt to claim the next eligible execution.
    ///
    /// Phase 1 simplified claim flow:
    /// 1. Pick a lane (round-robin across configured lanes)
    /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
    /// 3. Claim the execution via `ff_claim_execution`
    /// 4. Read execution payload + tags
    /// 5. Return a [`ClaimedTask`] with auto lease renewal
    ///
    /// Gated behind the `direct-valkey-claim` feature — bypasses the
    /// scheduler's budget / quota / capability admission checks. Enable
    /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
    /// the scheduler hop would be measurement noise (benches) or when
    /// the test harness needs a deterministic worker-local path. Prefer
    /// the scheduler-routed HTTP claim path in production.
    ///
    /// # `None` semantics
    ///
    /// `Ok(None)` means **no work was found in the partition window this
    /// poll covered**, not "the cluster is idle". Each call scans a chunk
    /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
    /// `scan_cursor`; the cursor advances by that chunk size on every
    /// invocation, so a worker covers every partition exactly once every
    /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
    ///
    /// Callers should treat `None` as "poll again soon" (typically after
    /// `config.claim_poll_interval_ms`) rather than "sleep for a long
    /// time". Backing off too aggressively on `None` can starve workers
    /// when work lives on partitions outside the current window.
    ///
    /// Returns `Err` on Valkey errors or script failures. On every
    /// `Ok(None)` / `Err` path the concurrency permit acquired at entry
    /// is dropped, returning capacity to the semaphore; on success it is
    /// transferred into the returned [`ClaimedTask`].
    #[cfg(feature = "direct-valkey-claim")]
    pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
        // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
        // try_acquire returns immediately — if no permits available, the worker
        // is at capacity and should not claim more work.
        let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(_) => return Ok(None), // At capacity — no claim attempted
        };

        let lane_id = self.next_lane();
        let now = TimestampMs::now();

        // Phase 1: We scan eligible executions directly by reading the eligible
        // ZSET across execution partitions. In production the scheduler
        // (ff-scheduler) would handle this. For Phase 1, the SDK does a
        // simplified inline claim.
        //
        // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
        // partitions starting at a rolling offset. This keeps idle Valkey
        // load at O(chunk) per worker-second instead of O(num_partitions),
        // and the worker-instance-seeded initial cursor spreads concurrent
        // workers across different partition windows.
        let num_partitions = self.partition_config.num_flow_partitions as usize;
        if num_partitions == 0 {
            return Ok(None);
        }
        let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
        // Relaxed suffices: the cursor is a best-effort spreading hint,
        // not a synchronization point.
        let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;

        for step in 0..chunk {
            let partition_idx = ((start + step) % num_partitions) as u16;
            let partition = ff_core::partition::Partition {
                family: ff_core::partition::PartitionFamily::Execution,
                index: partition_idx,
            };
            let idx = IndexKeys::new(&partition);
            let eligible_key = idx.lane_eligible(&lane_id);

            // ZRANGEBYSCORE to get the highest-priority eligible execution.
            // Score format: -(priority * 1_000_000_000_000) + created_at_ms
            // ZRANGEBYSCORE with "-inf" "+inf" LIMIT 0 1 gives lowest score = highest priority.
            let result: Value = self
                .client
                .cmd("ZRANGEBYSCORE")
                .arg(&eligible_key)
                .arg("-inf")
                .arg("+inf")
                .arg("LIMIT")
                .arg("0")
                .arg("1")
                .execute()
                .await
                .map_err(|e| SdkError::ValkeyContext { source: e, context: "ZRANGEBYSCORE failed".into() })?;

            let execution_id_str = match extract_first_array_string(&result) {
                Some(s) => s,
                None => continue, // No eligible executions on this partition
            };

            let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
                SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "claim_execution_from_eligible_set".into(),
                    execution_id: None,
                    message: format!("bad execution_id in eligible set: {e}"),
                })
            })?;

            // Step 1: Issue claim grant
            let grant_result = self
                .issue_claim_grant(&execution_id, &lane_id, &partition, &idx)
                .await;

            match grant_result {
                Ok(()) => {}
                Err(SdkError::Engine(ref boxed))
                    if matches!(
                        **boxed,
                        crate::EngineError::Validation {
                            kind: crate::ValidationKind::CapabilityMismatch,
                            ..
                        }
                    ) =>
                {
                    // The matches! guard above proves the variant, so the
                    // second match only extracts `detail`.
                    let missing = match &**boxed {
                        crate::EngineError::Validation { detail, .. } => detail.clone(),
                        _ => unreachable!(),
                    };
                    // Block-on-mismatch (RFC-009 §7.5) — parity with
                    // ff-scheduler's Scheduler::claim_for_worker. Without
                    // this, the inline-direct-claim path would hot-loop
                    // on an unclaimable top-of-zset (every tick picks the
                    // same execution, wastes an FCALL, logs, releases,
                    // repeats). The scheduler-side unblock scanner
                    // promotes blocked_route executions back to eligible
                    // when a worker with matching caps registers.
                    tracing::info!(
                        execution_id = %execution_id,
                        worker_id = %self.config.worker_id,
                        worker_caps_hash = %self.worker_capabilities_hash,
                        missing = %missing,
                        "capability mismatch, blocking execution off eligible (SDK inline claim)"
                    );
                    self.block_route(&execution_id, &lane_id, &partition, &idx).await;
                    continue;
                }
                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        error = %e,
                        "claim grant failed (retryable), trying next partition"
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }

            // Step 2: Claim the execution
            match self
                .claim_execution(&execution_id, &lane_id, &partition, now)
                .await
            {
                Ok(mut task) => {
                    // Transfer concurrency permit to the task. When the task is
                    // completed/failed/cancelled/dropped the permit returns to
                    // the semaphore, allowing another claim.
                    task.set_concurrency_permit(permit);
                    return Ok(Some(task));
                }
                Err(SdkError::Engine(ref boxed))
                    if matches!(
                        **boxed,
                        crate::EngineError::Contention(
                            crate::ContentionKind::UseClaimResumedExecution
                        )
                    ) =>
                {
                    // Execution was resumed from suspension — attempt_interrupted.
                    // ff_claim_execution rejects this; use ff_claim_resumed_execution
                    // which reuses the existing attempt instead of creating a new one.
                    tracing::debug!(
                        execution_id = %execution_id,
                        "execution is resumed, using claim_resumed path"
                    );
                    match self
                        .claim_resumed_execution(&execution_id, &lane_id, &partition)
                        .await
                    {
                        Ok(mut task) => {
                            task.set_concurrency_permit(permit);
                            return Ok(Some(task));
                        }
                        Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
                            tracing::debug!(
                                execution_id = %execution_id,
                                error = %e2,
                                "claim_resumed failed (retryable), trying next partition"
                            );
                            continue;
                        }
                        Err(e2) => return Err(e2),
                    }
                }
                Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        error = %e,
                        "claim execution failed (retryable), trying next partition"
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        }

        // No eligible work found on any partition
        Ok(None)
    }
752
753 #[cfg(feature = "direct-valkey-claim")]
754 async fn issue_claim_grant(
755 &self,
756 execution_id: &ExecutionId,
757 lane_id: &LaneId,
758 partition: &ff_core::partition::Partition,
759 idx: &IndexKeys,
760 ) -> Result<(), SdkError> {
761 let ctx = ExecKeyContext::new(partition, execution_id);
762
763 // KEYS (3): exec_core, claim_grant_key, eligible_zset
764 let keys: Vec<String> = vec![
765 ctx.core(),
766 ctx.claim_grant(),
767 idx.lane_eligible(lane_id),
768 ];
769
770 // ARGV (9): eid, worker_id, worker_instance_id, lane_id,
771 // capability_hash, grant_ttl_ms, route_snapshot_json,
772 // admission_summary, worker_capabilities_csv (sorted)
773 let args: Vec<String> = vec![
774 execution_id.to_string(),
775 self.config.worker_id.to_string(),
776 self.config.worker_instance_id.to_string(),
777 lane_id.to_string(),
778 String::new(), // capability_hash
779 "5000".to_owned(), // grant_ttl_ms (5 seconds)
780 String::new(), // route_snapshot_json
781 String::new(), // admission_summary
782 self.worker_capabilities_csv.clone(), // sorted CSV
783 ];
784
785 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
786 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
787
788 let raw: Value = self
789 .client
790 .fcall("ff_issue_claim_grant", &key_refs, &arg_refs)
791 .await
792 .map_err(SdkError::Valkey)?;
793
794 crate::task::parse_success_result(&raw, "ff_issue_claim_grant")
795 }
796
797 /// Move an execution from the lane's eligible ZSET into its
798 /// blocked_route ZSET via `ff_block_execution_for_admission`. Called
799 /// after a `CapabilityMismatch` reject — without this, the inline
800 /// direct-claim path would re-pick the same top-of-zset every tick
801 /// (same pattern the scheduler's block_candidate handles). The
802 /// engine's unblock scanner periodically promotes blocked_route
803 /// back to eligible once a worker with matching caps registers.
804 ///
805 /// Best-effort: transport or logical rejects (e.g. the execution
806 /// already went terminal between pick and block) are logged and the
807 /// outer loop simply `continue`s to the next partition. Parity with
808 /// ff-scheduler::Scheduler::block_candidate.
809 #[cfg(feature = "direct-valkey-claim")]
810 async fn block_route(
811 &self,
812 execution_id: &ExecutionId,
813 lane_id: &LaneId,
814 partition: &ff_core::partition::Partition,
815 idx: &IndexKeys,
816 ) {
817 let ctx = ExecKeyContext::new(partition, execution_id);
818 let core_key = ctx.core();
819 let eligible_key = idx.lane_eligible(lane_id);
820 let blocked_key = idx.lane_blocked_route(lane_id);
821 let eid_s = execution_id.to_string();
822 let now_ms = TimestampMs::now().0.to_string();
823
824 let keys: [&str; 3] = [&core_key, &eligible_key, &blocked_key];
825 let argv: [&str; 4] = [
826 &eid_s,
827 "waiting_for_capable_worker",
828 "no connected worker satisfies required_capabilities",
829 &now_ms,
830 ];
831
832 match self
833 .client
834 .fcall::<Value>("ff_block_execution_for_admission", &keys, &argv)
835 .await
836 {
837 Ok(v) => {
838 // Parse Lua result so a logical reject (e.g. execution
839 // went terminal mid-flight) is visible — same fix we
840 // applied to ff-scheduler's block_candidate.
841 if let Err(e) = crate::task::parse_success_result(&v, "ff_block_execution_for_admission") {
842 tracing::warn!(
843 execution_id = %execution_id,
844 error = %e,
845 "SDK block_route: Lua rejected; eligible ZSET unchanged, next poll \
846 will re-evaluate"
847 );
848 }
849 }
850 Err(e) => {
851 tracing::warn!(
852 execution_id = %execution_id,
853 error = %e,
854 "SDK block_route: transport failure; eligible ZSET unchanged"
855 );
856 }
857 }
858 }
859
    /// Low-level claim of a granted execution. Invokes
    /// `ff_claim_execution` and returns a `ClaimedTask` with auto
    /// lease renewal.
    ///
    /// Previously gated behind `direct-valkey-claim`; ungated so
    /// the public [`claim_from_grant`] entry point can reuse the
    /// same FCALL plumbing. The method stays private — external
    /// callers use `claim_from_grant`.
    ///
    /// The KEYS (14) and ARGV (12) vectors below are POSITIONAL: each
    /// slot must line up with the Lua side (`lua/execution.lua`,
    /// `ff_claim_execution`). Do not reorder without touching both.
    ///
    /// `_now` is currently unused — timing is taken from the engine's
    /// FCALL response and the lease-renewal machinery in `ClaimedTask`.
    ///
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    async fn claim_execution(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
        _now: TimestampMs,
    ) -> Result<ClaimedTask, SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let idx = IndexKeys::new(partition);

        // Pre-read total_attempt_count from exec_core to derive next attempt index.
        // The Lua uses total_attempt_count as the new index and dynamically builds
        // the attempt key from the hash tag, so KEYS[6-8] are placeholders, but
        // we pass the correct index for documentation/debugging.
        //
        // Best-effort read: `unwrap_or(None)` deliberately swallows transport
        // errors here — a failed pre-read only degrades the placeholder keys
        // to index 0, since the Lua derives the real index itself.
        let total_str: Option<String> = self.client
            .cmd("HGET")
            .arg(ctx.core())
            .arg("total_attempt_count")
            .execute()
            .await
            .unwrap_or(None);
        let next_idx = total_str
            .as_deref()
            .and_then(|s| s.parse::<u32>().ok())
            .unwrap_or(0);
        let att_idx = AttemptIndex::new(next_idx);

        // Fresh lease/attempt identities minted client-side; the engine echoes
        // them back in the success tuple parsed below.
        let lease_id = LeaseId::new().to_string();
        let attempt_id = AttemptId::new().to_string();
        // Renewal threshold at 2/3 of the TTL — presumably so the renewal loop
        // has one comfortable retry window before expiry; confirm against the
        // ClaimedTask renewal task if this ratio is tuned.
        let renew_before_ms = self.config.lease_ttl_ms * 2 / 3;

        // KEYS (14): must match lua/execution.lua ff_claim_execution positional order
        let keys: Vec<String> = vec![
            ctx.core(), // 1 exec_core
            ctx.claim_grant(), // 2 claim_grant
            idx.lane_eligible(lane_id), // 3 eligible_zset
            idx.lease_expiry(), // 4 lease_expiry_zset
            idx.worker_leases(&self.config.worker_instance_id), // 5 worker_leases
            ctx.attempt_hash(att_idx), // 6 attempt_hash (placeholder)
            ctx.attempt_usage(att_idx), // 7 attempt_usage (placeholder)
            ctx.attempt_policy(att_idx), // 8 attempt_policy (placeholder)
            ctx.attempts(), // 9 attempts_zset
            ctx.lease_current(), // 10 lease_current
            ctx.lease_history(), // 11 lease_history
            idx.lane_active(lane_id), // 12 active_index
            idx.attempt_timeout(), // 13 attempt_timeout_zset
            idx.execution_deadline(), // 14 execution_deadline_zset
        ];

        // ARGV (12): must match lua/execution.lua ff_claim_execution positional order
        let args: Vec<String> = vec![
            execution_id.to_string(), // 1 execution_id
            self.config.worker_id.to_string(), // 2 worker_id
            self.config.worker_instance_id.to_string(), // 3 worker_instance_id
            lane_id.to_string(), // 4 lane
            String::new(), // 5 capability_hash
            lease_id.clone(), // 6 lease_id
            self.config.lease_ttl_ms.to_string(), // 7 lease_ttl_ms
            renew_before_ms.to_string(), // 8 renew_before_ms
            attempt_id.clone(), // 9 attempt_id
            "{}".to_owned(), // 10 attempt_policy_json
            String::new(), // 11 attempt_timeout_ms
            String::new(), // 12 execution_deadline_at
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_claim_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        // Parse claim result: {1, "OK", lease_id, lease_epoch, attempt_index,
        // attempt_id, attempt_type, lease_expires_at}
        let arr = match &raw {
            Value::Array(arr) => arr,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "expected Array".into(),
                }));
            }
        };

        // Element 0 is the status code: 1 = success, anything else = engine reject.
        let status_code = match arr.first() {
            Some(Ok(Value::Int(n))) => *n,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "bad status code".into(),
                }));
            }
        };

        if status_code != 1 {
            // Error shape: {code, error_code, detail} — pull the two string
            // fields and map them through the shared ScriptError table.
            let err_field_str = |idx: usize| -> String {
                arr.get(idx)
                    .and_then(|v| match v {
                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                        Ok(Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default()
            };
            let error_code = {
                let s = err_field_str(1);
                if s.is_empty() { "unknown".to_owned() } else { s }
            };
            let detail = err_field_str(2);

            return Err(SdkError::from(
                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
                        fcall: "ff_claim_execution".into(),
                        execution_id: Some(execution_id.to_string()),
                        message: format!("unknown error: {error_code}"),
                    }),
            ));
        }

        // Extract fields from success response
        let field_str = |idx: usize| -> String {
            arr.get(idx + 2) // skip status_code and "OK"
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default()
        };

        // Lua returns: ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
        // Positions: 0 1 2 3 4 5
        //
        // NOTE(review): on parse failure these fall back to freshly-minted
        // ids / defaults rather than erroring — a fabricated LeaseId would
        // disagree with what the engine recorded. Confirm the fallbacks are
        // intentional (e.g. tolerated because renewal re-validates).
        let lease_id = LeaseId::parse(&field_str(0))
            .unwrap_or_else(|_| LeaseId::new());
        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
        // field_str(2) is expires_at — skip it (lease timing managed by renewal)
        let attempt_id = AttemptId::parse(&field_str(3))
            .unwrap_or_else(|_| AttemptId::new());
        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));

        // Read execution payload and metadata
        let (input_payload, execution_kind, tags) = self
            .read_execution_context(execution_id, partition)
            .await?;

        Ok(ClaimedTask::new(
            self.client.clone(),
            self.backend.clone(),
            self.partition_config,
            execution_id.clone(),
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            self.config.lease_ttl_ms,
            lane_id.clone(),
            self.config.worker_instance_id.clone(),
            input_payload,
            execution_kind,
            tags,
        ))
    }
1038
1039 /// Consume a [`ClaimGrant`] and claim the granted execution on
1040 /// this worker. The intended production entry point: pair with
1041 /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
1042 /// scheduler-issued grants into the SDK without enabling the
1043 /// `direct-valkey-claim` feature (which bypasses budget/quota
1044 /// admission control).
1045 ///
1046 /// The worker's concurrency semaphore is checked BEFORE the FCALL
1047 /// so a saturated worker does not consume the grant: the grant
1048 /// stays valid for its remaining TTL and the caller can either
1049 /// release it back to the scheduler or retry after some other
1050 /// in-flight task completes.
1051 ///
1052 /// On success the returned [`ClaimedTask`] holds a concurrency
1053 /// permit that releases automatically on
1054 /// `complete`/`fail`/`cancel`/drop — same contract as
1055 /// `claim_next`.
1056 ///
1057 /// # Arguments
1058 ///
1059 /// * `lane` — the lane the grant was issued for. Must match what
1060 /// was passed to `Scheduler::claim_for_worker`; the Lua FCALL
1061 /// uses it to look up `lane_eligible`, `lane_active`, and the
1062 /// `worker_leases` index slot.
1063 /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
1064 ///
1065 /// # Errors
1066 ///
1067 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1068 /// permits all held. Retryable; the grant is untouched.
1069 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1070 /// or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
1071 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
1072 /// (wrapped in [`SdkError::Engine`]).
1073 /// * `ScriptError::CapabilityMismatch` — execution's required
1074 /// capabilities not a subset of this worker's caps (wrapped in
1075 /// [`SdkError::Engine`]). Surfaced post-grant if a race
1076 /// between grant issuance and caps change allows it.
1077 /// * `ScriptError::Parse` — `ff_claim_execution` returned an
1078 /// unexpected shape (wrapped in [`SdkError::Engine`]).
1079 /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
1080 /// transport error during the FCALL or the
1081 /// `read_execution_context` follow-up.
1082 ///
1083 /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
1084 /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
1085 pub async fn claim_from_grant(
1086 &self,
1087 lane: LaneId,
1088 grant: ff_core::contracts::ClaimGrant,
1089 ) -> Result<ClaimedTask, SdkError> {
1090 // Semaphore check FIRST. If the worker is saturated we must
1091 // surface the condition to the caller without touching the
1092 // grant — silently returning Ok(None) (as claim_next does)
1093 // would drop a grant the scheduler has already committed work
1094 // to issuing, wasting the slot until its TTL elapses.
1095 let permit = self
1096 .concurrency_semaphore
1097 .clone()
1098 .try_acquire_owned()
1099 .map_err(|_| SdkError::WorkerAtCapacity)?;
1100
1101 let now = TimestampMs::now();
1102 let partition = grant.partition().map_err(|e| SdkError::Config {
1103 context: "claim_from_grant".to_owned(),
1104 field: Some("partition_key".to_owned()),
1105 message: e.to_string(),
1106 })?;
1107 let mut task = self
1108 .claim_execution(&grant.execution_id, &lane, &partition, now)
1109 .await?;
1110 task.set_concurrency_permit(permit);
1111 Ok(task)
1112 }
1113
1114 /// Scheduler-routed claim: POST the server's
1115 /// `/v1/workers/{id}/claim`, then chain to
1116 /// [`Self::claim_from_grant`].
1117 ///
1118 /// Batch C item 2 PR-B. This is the production entry point —
1119 /// budget + quota + capability admission run server-side inside
1120 /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
1121 /// enable the `direct-valkey-claim` feature.
1122 ///
1123 /// Returns `Ok(None)` when the server says no eligible execution
1124 /// (HTTP 204). Callers typically back off by
1125 /// `config.claim_poll_interval_ms` and try again, same cadence
1126 /// as the direct-claim path's `Ok(None)`.
1127 ///
1128 /// The `admin` client is the established HTTP surface
1129 /// (`FlowFabricAdminClient`) reused here so workers don't keep a
1130 /// second reqwest client around. Build once at worker boot and
1131 /// hand in by reference on every claim.
1132 pub async fn claim_via_server(
1133 &self,
1134 admin: &crate::FlowFabricAdminClient,
1135 lane: &LaneId,
1136 grant_ttl_ms: u64,
1137 ) -> Result<Option<ClaimedTask>, SdkError> {
1138 let req = crate::admin::ClaimForWorkerRequest {
1139 worker_id: self.config.worker_id.to_string(),
1140 lane_id: lane.to_string(),
1141 worker_instance_id: self.config.worker_instance_id.to_string(),
1142 capabilities: self.config.capabilities.clone(),
1143 grant_ttl_ms,
1144 };
1145 let Some(resp) = admin.claim_for_worker(req).await? else {
1146 return Ok(None);
1147 };
1148 let grant = resp.into_grant()?;
1149 self.claim_from_grant(lane.clone(), grant).await.map(Some)
1150 }
1151
1152 /// Consume a [`ReclaimGrant`] and transition the granted
1153 /// `attempt_interrupted` execution into a `started` state on this
1154 /// worker. Symmetric partner to [`claim_from_grant`] for the
1155 /// resume path.
1156 ///
1157 /// The grant must have been issued to THIS worker (matching
1158 /// `worker_id` at grant time). A mismatch returns
1159 /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1160 /// atomically by `ff_claim_resumed_execution`; a second call with
1161 /// the same grant also returns `InvalidClaimGrant`.
1162 ///
1163 /// # Concurrency
1164 ///
1165 /// The worker's concurrency semaphore is checked BEFORE the FCALL
1166 /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1167 /// assume pre-existing capacity on this worker — a reclaim can
1168 /// land on a fresh worker instance that just came up after a
1169 /// crash/restart and is picking up a previously-interrupted
1170 /// execution. If the worker is saturated, the grant stays valid
1171 /// for its remaining TTL and the caller can release it or retry.
1172 ///
1173 /// On success the returned [`ClaimedTask`] holds a concurrency
1174 /// permit that releases automatically on
1175 /// `complete`/`fail`/`cancel`/drop.
1176 ///
1177 /// # Errors
1178 ///
1179 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1180 /// permits all held. Retryable; the grant is untouched (no
1181 /// FCALL was issued, so `ff_claim_resumed_execution` did not
1182 /// atomically consume the grant key).
1183 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1184 /// or `worker_id` mismatch.
1185 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1186 /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1187 /// `attempt_interrupted`.
1188 /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1189 /// not `runnable`.
1190 /// * `ScriptError::ExecutionNotFound` — core key missing.
1191 /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
1192 /// transport.
1193 ///
1194 /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1195 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
1196 pub async fn claim_from_reclaim_grant(
1197 &self,
1198 grant: ff_core::contracts::ReclaimGrant,
1199 ) -> Result<ClaimedTask, SdkError> {
1200 // Semaphore check FIRST — same load-bearing ordering as
1201 // `claim_from_grant`. If the worker is saturated, surface
1202 // WorkerAtCapacity without firing the FCALL; the FCALL is an
1203 // atomic consume on the grant key, so calling it past-
1204 // saturation would destroy the grant while leaving no
1205 // permit to attach to the returned `ClaimedTask`.
1206 let permit = self
1207 .concurrency_semaphore
1208 .clone()
1209 .try_acquire_owned()
1210 .map_err(|_| SdkError::WorkerAtCapacity)?;
1211
1212 // Grant carries partition + lane_id so no round-trip is needed
1213 // to resolve them before the FCALL.
1214 let partition = grant.partition().map_err(|e| SdkError::Config {
1215 context: "claim_from_reclaim_grant".to_owned(),
1216 field: Some("partition_key".to_owned()),
1217 message: e.to_string(),
1218 })?;
1219 let mut task = self
1220 .claim_resumed_execution(
1221 &grant.execution_id,
1222 &grant.lane_id,
1223 &partition,
1224 )
1225 .await?;
1226 task.set_concurrency_permit(permit);
1227 Ok(task)
1228 }
1229
    /// Low-level resume claim. Invokes `ff_claim_resumed_execution`
    /// and returns a `ClaimedTask` bound to the resumed attempt.
    ///
    /// Previously gated behind `direct-valkey-claim`; ungated so the
    /// public [`claim_from_reclaim_grant`] entry point can reuse it.
    /// The method stays private — external callers use
    /// `claim_from_reclaim_grant`.
    ///
    /// The KEYS (11) and ARGV (8) vectors below are POSITIONAL: each
    /// slot must line up with the Lua side (`lua/signal.lua`,
    /// `ff_claim_resumed_execution`). Do not reorder without touching
    /// both.
    ///
    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
    async fn claim_resumed_execution(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
    ) -> Result<ClaimedTask, SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);
        let idx = IndexKeys::new(partition);

        // Pre-read current_attempt_index for the existing attempt hash key.
        // This is load-bearing: KEYS[6] must point to the real attempt hash.
        // Unlike the claim path's best-effort pre-read, a transport failure
        // here is a hard error (the placeholder-index fallback would point
        // KEYS[6] at the wrong hash).
        let att_idx_str: Option<String> = self.client
            .cmd("HGET")
            .arg(ctx.core())
            .arg("current_attempt_index")
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "read attempt_index".into() })?;
        let att_idx = AttemptIndex::new(
            att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
        );

        // Resume mints a fresh lease; the attempt identity is reused from
        // the interrupted attempt and comes back in the success tuple.
        let lease_id = LeaseId::new().to_string();

        // KEYS (11): must match lua/signal.lua ff_claim_resumed_execution
        let keys: Vec<String> = vec![
            ctx.core(), // 1 exec_core
            ctx.claim_grant(), // 2 claim_grant
            idx.lane_eligible(lane_id), // 3 eligible_zset
            idx.lease_expiry(), // 4 lease_expiry_zset
            idx.worker_leases(&self.config.worker_instance_id), // 5 worker_leases
            ctx.attempt_hash(att_idx), // 6 existing_attempt_hash
            ctx.lease_current(), // 7 lease_current
            ctx.lease_history(), // 8 lease_history
            idx.lane_active(lane_id), // 9 active_index
            idx.attempt_timeout(), // 10 attempt_timeout_zset
            idx.execution_deadline(), // 11 execution_deadline_zset
        ];

        // ARGV (8): must match lua/signal.lua ff_claim_resumed_execution
        let args: Vec<String> = vec![
            execution_id.to_string(), // 1 execution_id
            self.config.worker_id.to_string(), // 2 worker_id
            self.config.worker_instance_id.to_string(), // 3 worker_instance_id
            lane_id.to_string(), // 4 lane
            String::new(), // 5 capability_hash
            lease_id.clone(), // 6 lease_id
            self.config.lease_ttl_ms.to_string(), // 7 lease_ttl_ms
            String::new(), // 8 remaining_attempt_timeout_ms
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_claim_resumed_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        // Parse result — same format as ff_claim_execution:
        // {1, "OK", lease_id, lease_epoch, expires_at, attempt_id, attempt_index, attempt_type}
        let arr = match &raw {
            Value::Array(arr) => arr,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_resumed_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "expected Array".into(),
                }));
            }
        };

        // Element 0 is the status code: 1 = success, anything else = engine reject.
        let status_code = match arr.first() {
            Some(Ok(Value::Int(n))) => *n,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_resumed_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "bad status code".into(),
                }));
            }
        };

        if status_code != 1 {
            // Error shape: {code, error_code, detail} — map the string fields
            // through the shared ScriptError table.
            let err_field_str = |idx: usize| -> String {
                arr.get(idx)
                    .and_then(|v| match v {
                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                        Ok(Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default()
            };
            let error_code = {
                let s = err_field_str(1);
                if s.is_empty() { "unknown".to_owned() } else { s }
            };
            let detail = err_field_str(2);

            return Err(SdkError::from(
                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
                        fcall: "ff_claim_resumed_execution".into(),
                        execution_id: Some(execution_id.to_string()),
                        message: format!("unknown error: {error_code}"),
                    }),
            ));
        }

        // Success-field accessor: offset by 2 to skip status_code and "OK".
        let field_str = |idx: usize| -> String {
            arr.get(idx + 2)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default()
        };

        // Positions after the offset: 0 lease_id, 1 epoch, 2 expires_at
        // (unused — lease timing managed by renewal), 3 attempt_id,
        // 4 attempt_index.
        //
        // NOTE(review): the parse fallbacks mint fresh ids / defaults
        // instead of erroring — same pattern as claim_execution; confirm
        // intentional.
        let lease_id = LeaseId::parse(&field_str(0))
            .unwrap_or_else(|_| LeaseId::new());
        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
        let attempt_id = AttemptId::parse(&field_str(3))
            .unwrap_or_else(|_| AttemptId::new());

        // Read execution payload and metadata for the resumed attempt.
        let (input_payload, execution_kind, tags) = self
            .read_execution_context(execution_id, partition)
            .await?;

        Ok(ClaimedTask::new(
            self.client.clone(),
            self.backend.clone(),
            self.partition_config,
            execution_id.clone(),
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            self.config.lease_ttl_ms,
            lane_id.clone(),
            self.config.worker_instance_id.clone(),
            input_payload,
            execution_kind,
            tags,
        ))
    }
1388
1389 /// Read payload + execution_kind + tags from exec_core. Previously
1390 /// gated behind `direct-valkey-claim`; now shared by the
1391 /// feature-gated inline claim path and the public
1392 /// `claim_from_reclaim_grant` entry point.
1393 async fn read_execution_context(
1394 &self,
1395 execution_id: &ExecutionId,
1396 partition: &ff_core::partition::Partition,
1397 ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
1398 let ctx = ExecKeyContext::new(partition, execution_id);
1399
1400 // Read payload
1401 let payload: Option<String> = self
1402 .client
1403 .get(&ctx.payload())
1404 .await
1405 .map_err(|e| SdkError::ValkeyContext { source: e, context: "GET payload failed".into() })?;
1406 let input_payload = payload.unwrap_or_default().into_bytes();
1407
1408 // Read execution_kind from core
1409 let kind: Option<String> = self
1410 .client
1411 .hget(&ctx.core(), "execution_kind")
1412 .await
1413 .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET execution_kind failed".into() })?;
1414 let execution_kind = kind.unwrap_or_default();
1415
1416 // Read tags
1417 let tags: HashMap<String, String> = self
1418 .client
1419 .hgetall(&ctx.tags())
1420 .await
1421 .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGETALL tags".into() })?;
1422
1423 Ok((input_payload, execution_kind, tags))
1424 }
1425
1426 // ── Phase 3: Signal delivery ──
1427
    /// Deliver a signal to a suspended execution's waitpoint.
    ///
    /// The engine atomically records the signal, evaluates the resume condition,
    /// and optionally transitions the execution from `suspended` to `runnable`.
    ///
    /// The KEYS (14) and ARGV (18) vectors below are POSITIONAL and must
    /// stay aligned with the Lua side of `ff_deliver_signal`. Do not
    /// reorder without touching both.
    ///
    /// # Arguments
    ///
    /// * `execution_id` — execution whose waitpoint receives the signal.
    /// * `waitpoint_id` — the specific waitpoint being signalled.
    /// * `signal` — name/category/source/payload plus optional
    ///   idempotency key and the waitpoint token (consumed by value).
    pub async fn deliver_signal(
        &self,
        execution_id: &ExecutionId,
        waitpoint_id: &WaitpointId,
        signal: crate::task::Signal,
    ) -> Result<crate::task::SignalOutcome, SdkError> {
        let partition = ff_core::partition::execution_partition(execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let idx = IndexKeys::new(&partition);

        let signal_id = ff_core::types::SignalId::new();
        let now = TimestampMs::now();

        // Pre-read lane_id from exec_core — the execution may be on any lane,
        // not necessarily one of this worker's configured lanes.
        let lane_str: Option<String> = self
            .client
            .hget(&ctx.core(), "lane_id")
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
        // Missing lane field falls back to "default".
        let lane_id = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));

        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
        // exec_signals_zset, signal_hash, signal_payload,
        // idem_key, waitpoint_hash, suspension_current,
        // eligible_zset, suspended_zset, delayed_zset,
        // suspension_timeout_zset, hmac_secrets
        let idem_key = if let Some(ref ik) = signal.idempotency_key {
            ctx.signal_dedup(waitpoint_id, ik)
        } else {
            ctx.noop() // must share {p:N} hash tag for cluster mode
        };
        let keys: Vec<String> = vec![
            ctx.core(), // 1
            ctx.waitpoint_condition(waitpoint_id), // 2
            ctx.waitpoint_signals(waitpoint_id), // 3
            ctx.exec_signals(), // 4
            ctx.signal(&signal_id), // 5
            ctx.signal_payload(&signal_id), // 6
            idem_key, // 7
            ctx.waitpoint(waitpoint_id), // 8
            ctx.suspension_current(), // 9
            idx.lane_eligible(&lane_id), // 10
            idx.lane_suspended(&lane_id), // 11
            idx.lane_delayed(&lane_id), // 12
            idx.suspension_timeout(), // 13
            idx.waitpoint_hmac_secrets(), // 14
        ];

        // NOTE(review): payload bytes go through from_utf8_lossy, so a
        // non-UTF-8 payload is silently mangled here — ARGV 9 hardcodes
        // "json" as the encoding, which presumably makes this safe;
        // confirm binary payloads are not expected on this path.
        let payload_str = signal
            .payload
            .as_ref()
            .map(|p| String::from_utf8_lossy(p).into_owned())
            .unwrap_or_default();

        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
        // signal_category, source_type, source_identity,
        // payload, payload_encoding, idempotency_key,
        // correlation_id, target_scope, created_at,
        // dedup_ttl_ms, resume_delay_ms, signal_maxlen,
        // max_signals_per_execution, waitpoint_token
        let args: Vec<String> = vec![
            signal_id.to_string(), // 1
            execution_id.to_string(), // 2
            waitpoint_id.to_string(), // 3
            signal.signal_name, // 4
            signal.signal_category, // 5
            signal.source_type, // 6
            signal.source_identity, // 7
            payload_str, // 8
            "json".to_owned(), // 9 payload_encoding
            signal.idempotency_key.unwrap_or_default(), // 10
            String::new(), // 11 correlation_id
            "waitpoint".to_owned(), // 12 target_scope
            now.to_string(), // 13 created_at
            "86400000".to_owned(), // 14 dedup_ttl_ms (24h)
            "0".to_owned(), // 15 resume_delay_ms
            "1000".to_owned(), // 16 signal_maxlen
            "10000".to_owned(), // 17 max_signals
            // WIRE BOUNDARY — raw token must reach Lua unredacted. Do NOT
            // use ToString/Display (those are redacted for log safety);
            // .as_str() is the explicit opt-in that gets the secret bytes.
            signal.waitpoint_token.as_str().to_owned(), // 18 waitpoint_token
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_deliver_signal", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        crate::task::parse_signal_result(&raw)
    }
1528
1529 #[cfg(feature = "direct-valkey-claim")]
1530 fn next_lane(&self) -> LaneId {
1531 let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
1532 self.config.lanes[idx].clone()
1533 }
1534}
1535
/// True when a claim-path engine error should be treated as transient:
/// the direct-claim loop logs it and moves on to the next partition
/// instead of aborting. Retryable and informational classes qualify;
/// everything else propagates.
#[cfg(feature = "direct-valkey-claim")]
fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
    use ff_core::error::ErrorClass;
    let class = ff_script::engine_error_ext::class(err);
    matches!(class, ErrorClass::Retryable | ErrorClass::Informational)
}
1544
/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
/// instance id with FNV-1a so that distinct worker processes start their
/// partition scan at different windows from the very first poll. Zero is
/// valid for single-worker clusters but spreads work in multi-worker
/// deployments. A partition count of zero yields zero (no modulo-by-zero).
#[cfg(feature = "direct-valkey-claim")]
fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
    match num_partitions {
        0 => 0,
        n => ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize % n,
    }
}
1556
/// Pull the first element of a Valkey array reply as an owned `String`.
/// Returns `None` when the value is not an array, the array is empty,
/// or the first element is not a bulk/simple string.
#[cfg(feature = "direct-valkey-claim")]
fn extract_first_array_string(value: &Value) -> Option<String> {
    let Value::Array(items) = value else {
        return None;
    };
    match items.first()? {
        Ok(Value::BulkString(bytes)) => Some(String::from_utf8_lossy(bytes).into_owned()),
        Ok(Value::SimpleString(s)) => Some(s.clone()),
        _ => None,
    }
}
1568
1569/// Read partition config from Valkey's `ff:config:partitions` hash.
1570/// Returns Err if the key doesn't exist or can't be read.
1571async fn read_partition_config(client: &Client) -> Result<PartitionConfig, SdkError> {
1572 let key = ff_core::keys::global_config_partitions();
1573 let fields: HashMap<String, String> = client
1574 .hgetall(&key)
1575 .await
1576 .map_err(|e| SdkError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;
1577
1578 if fields.is_empty() {
1579 return Err(SdkError::Config {
1580 context: "read_partition_config".into(),
1581 field: None,
1582 message: "ff:config:partitions not found in Valkey".into(),
1583 });
1584 }
1585
1586 let parse = |field: &str, default: u16| -> u16 {
1587 fields
1588 .get(field)
1589 .and_then(|v| v.parse().ok())
1590 .filter(|&n: &u16| n > 0)
1591 .unwrap_or(default)
1592 };
1593
1594 Ok(PartitionConfig {
1595 num_flow_partitions: parse("num_flow_partitions", 256),
1596 num_budget_partitions: parse("num_budget_partitions", 32),
1597 num_quota_partitions: parse("num_quota_partitions", 32),
1598 })
1599}
1600
#[cfg(all(test, feature = "direct-valkey-claim"))]
mod scan_cursor_tests {
    use super::scan_cursor_seed;

    /// Same worker id + same partition count → same seed (determinism).
    #[test]
    fn stable_for_same_input() {
        let first = scan_cursor_seed("w1", 256);
        let second = scan_cursor_seed("w1", 256);
        assert_eq!(first, second);
    }

    /// Different worker ids land on different windows.
    #[test]
    fn distinct_for_different_ids() {
        assert_ne!(scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
    }

    /// Seed is always a valid partition index.
    #[test]
    fn bounded_by_partition_count() {
        for i in 0..100 {
            let id = format!("w{i}");
            assert!(scan_cursor_seed(&id, 256) < 256);
        }
    }

    /// Degenerate zero-partition config must not divide by zero.
    #[test]
    fn zero_partitions_returns_zero() {
        assert_eq!(scan_cursor_seed("w1", 0), 0);
    }
}