ff_sdk/worker.rs
1use std::collections::HashMap;
2#[cfg(feature = "direct-valkey-claim")]
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::Duration;
6
7use ferriskey::{Client, ClientBuilder, Value};
8use ff_core::keys::{ExecKeyContext, IndexKeys};
9use ff_core::partition::PartitionConfig;
10use ff_core::types::*;
11use tokio::sync::Semaphore;
12
13use crate::config::WorkerConfig;
14use crate::task::ClaimedTask;
15use crate::SdkError;
16
17/// FlowFabric worker — connects to Valkey, claims executions, and provides
18/// the worker-facing API.
19///
20/// # Admission control
21///
22/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
23/// **bypasses the scheduler's admission controls**: it reads the eligible
24/// ZSET directly and mints its own claim grant without consulting budget
25/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
26/// benchmarks, tests, and single-tenant development where the scheduler
27/// hop is measurement noise, not for production.
28///
29/// For production deployments, consume scheduler-issued grants via
30/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
31/// budget breach, quota sliding-window, concurrency cap, and
32/// capability-match checks before issuing grants.
33///
34/// # Usage
35///
36/// ```rust,ignore
37/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
38///
39/// let config = WorkerConfig::new("localhost", 6379, "w1", "w1-i1", "default", "main");
40/// let worker = FlowFabricWorker::connect(config).await?;
41///
42/// loop {
43/// if let Some(task) = worker.claim_next().await? {
44/// // Process task...
45/// task.complete(Some(b"result".to_vec())).await?;
46/// } else {
47/// tokio::time::sleep(Duration::from_secs(1)).await;
48/// }
49/// }
50/// ```
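///
/// A minimal sketch of the scheduler-routed production path (no feature
/// flag). It assumes an already-built `FlowFabricAdminClient` named `admin`
/// and a lane named "main" — both are placeholders for your deployment
/// (`LaneId` comes from `ff_core::types`):
///
/// ```rust,ignore
/// let lane = LaneId::new("main".to_owned());
/// loop {
///     match worker.claim_via_server(&admin, &lane, 5_000).await? {
///         Some(task) => {
///             // Process task...
///             task.complete(Some(b"done".to_vec())).await?;
///         }
///         // No eligible execution (HTTP 204) — poll again shortly.
///         None => tokio::time::sleep(Duration::from_millis(250)).await,
///     }
/// }
/// ```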
51pub struct FlowFabricWorker {
52 client: Client,
53 config: WorkerConfig,
54 partition_config: PartitionConfig,
55 /// Sorted, deduplicated, comma-separated capabilities — computed once
56 /// from `config.capabilities` at connect time. Passed as ARGV[9] to
57 /// `ff_issue_claim_grant` on every claim. BTreeSet sorting is critical:
58 /// Lua's `ff_issue_claim_grant` relies on a stable CSV form for
59 /// reproducible logs and tests.
60 #[cfg(feature = "direct-valkey-claim")]
61 worker_capabilities_csv: String,
62 /// 8-hex FNV-1a digest of `worker_capabilities_csv`. Used in
63 /// per-mismatch logs so the 4KB CSV never echoes on every reject
64 /// during an incident. Full CSV logged once at connect-time INFO for
65 /// cross-reference. Mirrors `ff-scheduler::claim::worker_caps_digest`.
66 #[cfg(feature = "direct-valkey-claim")]
67 worker_capabilities_hash: String,
68 #[cfg(feature = "direct-valkey-claim")]
69 lane_index: AtomicUsize,
70 /// Concurrency cap for in-flight tasks. Permits are acquired by
71 /// [`claim_next`] (feature-gated),
72 /// [`claim_from_grant`] (always available), and
73 /// [`claim_from_reclaim_grant`], transferred to the returned
74 /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
75 /// Holds `max_concurrent_tasks` permits total.
76 ///
77 /// [`claim_next`]: FlowFabricWorker::claim_next
78 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
79 /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
80 concurrency_semaphore: Arc<Semaphore>,
81 /// Rolling offset for chunked partition scans. Each poll advances the
82 /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
83 /// chunk)` polls every partition is covered. The initial value is
84 /// derived from `worker_instance_id` so idle workers spread their
85 /// scans across different partitions from the first poll onward.
86 ///
87 /// Overflow: each poll advances the cursor by the chunk size, so the
88 /// counter wraps after ~usize::MAX / chunk polls — effectively never
89 /// on 64-bit targets, and roughly 4 years at 1 poll/sec on 32-bit
90 /// targets (wasm32, i686). Acceptable: on wrap the modulo keeps the
91 /// result in range and the scan simply starts a new cycle.
92 #[cfg(feature = "direct-valkey-claim")]
93 scan_cursor: AtomicUsize,
94}
95
96/// Number of partitions scanned per `claim_next()` poll. Keeps idle Valkey
97/// load at O(PARTITION_SCAN_CHUNK) per worker-second instead of
98/// O(num_flow_partitions).
99#[cfg(feature = "direct-valkey-claim")]
100const PARTITION_SCAN_CHUNK: usize = 32;
101
102impl FlowFabricWorker {
103 /// Connect to Valkey and prepare the worker.
104 ///
105 /// Establishes the ferriskey connection. Does NOT load the FlowFabric
106 /// library — that is the server's responsibility (ff-server calls
107 /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
108 /// the library is already loaded.
109 pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
110 if config.lanes.is_empty() {
111 return Err(SdkError::Config("at least one lane is required".into()));
112 }
113
114 let mut builder = ClientBuilder::new()
115 .host(&config.host, config.port)
116 .connect_timeout(Duration::from_secs(10))
117 .request_timeout(Duration::from_millis(5000));
118 if config.tls {
119 builder = builder.tls();
120 }
121 if config.cluster {
122 builder = builder.cluster();
123 }
124 let client = builder.build()
125 .await
126 .map_err(|e| SdkError::ValkeyContext { source: e, context: "failed to connect".into() })?;
127
128 // Verify connectivity
129 let pong: String = client
130 .cmd("PING")
131 .execute()
132 .await
133 .map_err(|e| SdkError::ValkeyContext { source: e, context: "PING failed".into() })?;
134 if pong != "PONG" {
135 return Err(SdkError::Config(format!(
136 "unexpected PING response: {pong}"
137 )));
138 }
139
140 // Guard against two worker processes sharing the same
141 // `worker_instance_id`. Duplicate instances would clobber each
142 // other's lease_current/active_index entries and double-claim work.
143 // SET NX on a liveness key with 2× lease TTL; if the key already
144 // exists another process is live. The key auto-expires if this
145 // process crashes without renewal, so a restart after a hard crash
146 // just waits at most 2× lease_ttl_ms for the ghost entry to clear.
147 //
148 // Known limitations of this minimal scheme (documented for operators):
149 // 1. **Startup-only, not runtime.** There is no heartbeat renewal
150 // path. After `2 × lease_ttl_ms` elapses the alive key expires
151 // naturally even while this worker is still running, and a
152 // second process with the same `worker_instance_id` launched
153 // later will successfully SET NX alongside the first. The check
154 // catches misconfiguration at boot; it does not fence duplicates
155 // that appear mid-lifetime. Production deployments should rely
156 // on the orchestrator (Kubernetes, systemd unit with
157 // `Restart=on-failure`, etc.) as the authoritative single-
158 // instance enforcer; this SET NX is belt-and-suspenders.
159 //
160 // 2. **Restart delay after a crash.** If a worker crashes
161 // ungracefully (SIGKILL, container OOM) and is restarted within
162 // `2 × lease_ttl_ms`, the alive key is still present and the
163 //    new process's `connect()` fails with `SdkError::Config("duplicate
164 //    worker_instance_id ...")`. Options for operators:
165 // - Wait `2 × lease_ttl_ms` (default 60s with the 30s TTL)
166 // before restarting.
167 // - Manually `DEL ff:worker:<instance_id>:alive` in Valkey to
168 // unblock the restart.
169 // - Use a fresh `worker_instance_id` for the restart (the
170 // orchestrator should already do this per-Pod).
171 //
172 // 3. **No graceful cleanup on shutdown.** There is no explicit
173 // `disconnect()` call that DELs the alive key. On clean
174 // `SIGTERM` the key lingers until its TTL expires. A follow-up
175 // can add `FlowFabricWorker::disconnect(self)` for callers that
176 // want to skip the restart-delay window.
177 let alive_key = format!("ff:worker:{}:alive", config.worker_instance_id);
178 let alive_ttl_ms = (config.lease_ttl_ms.saturating_mul(2)).max(1_000);
179 let set_result: Option<String> = client
180 .cmd("SET")
181 .arg(&alive_key)
182 .arg("1")
183 .arg("NX")
184 .arg("PX")
185 .arg(alive_ttl_ms.to_string().as_str())
186 .execute()
187 .await
188 .map_err(|e| SdkError::ValkeyContext {
189 source: e,
190 context: "SET NX worker alive key".into(),
191 })?;
192 if set_result.is_none() {
193 return Err(SdkError::Config(format!(
194 "duplicate worker_instance_id '{}': another process already holds {alive_key}",
195 config.worker_instance_id
196 )));
197 }
198
199 // Read partition config from Valkey (set by ff-server on startup).
200 // Falls back to defaults if key doesn't exist (e.g. SDK-only testing).
201 let partition_config = read_partition_config(&client).await
202 .unwrap_or_else(|e| {
203 tracing::warn!(
204 error = %e,
205 "ff:config:partitions not found, using defaults"
206 );
207 PartitionConfig::default()
208 });
209
210 let max_tasks = config.max_concurrent_tasks.max(1);
211 let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
212
213 tracing::info!(
214 worker_id = %config.worker_id,
215 instance_id = %config.worker_instance_id,
216 lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
217 "FlowFabricWorker connected"
218 );
219
220 #[cfg(feature = "direct-valkey-claim")]
221 let scan_cursor_init = scan_cursor_seed(
222 config.worker_instance_id.as_str(),
223 partition_config.num_flow_partitions.max(1) as usize,
224 );
225
226 // Sort + dedupe capabilities into a stable CSV. BTreeSet both sorts
227 // and deduplicates in one pass; string joining happens once here.
228 //
229 // Ingress validation mirrors Scheduler::claim_for_worker (ff-scheduler):
230 // - `,` is the CSV delimiter; a token containing one would split
231 // mid-parse and could let a {"gpu"} worker appear to satisfy
232 // {"gpu,cuda"} (silent auth bypass).
233 // - Empty strings would produce leading/adjacent commas on the
234 // wire and inflate token count for no semantic reason.
235 // - Non-printable / whitespace chars: `"gpu "` vs `"gpu"` or
236 // `"gpu\n"` vs `"gpu"` produce silent mismatches that are
237 //     miserable to debug. Reject whitespace and control characters at
238 //     ingress so a typo fails loudly at connect instead of silently
239 //     mis-routing forever (printable non-ASCII characters stay allowed).
240 // Reject at boot so operator misconfig is loud, symmetric with the
241 // scheduler path.
242 #[cfg(feature = "direct-valkey-claim")]
243 for cap in &config.capabilities {
244 if cap.is_empty() {
245 return Err(SdkError::Config(
246 "capability token must not be empty".into(),
247 ));
248 }
249 if cap.contains(',') {
250 return Err(SdkError::Config(format!(
251 "capability token may not contain ',' (CSV delimiter): {cap:?}"
252 )));
253 }
254 // Reject ASCII control bytes (0x00-0x1F, 0x7F) and any ASCII
255 // whitespace (space, tab, LF, CR, FF, VT). UTF-8 printable
256 // characters above 0x7F are ALLOWED so i18n caps like
257 // "东京-gpu" can be used. The CSV wire form is byte-safe for
258 // multibyte UTF-8 because `,` is always a single byte and
259 // never part of a multibyte continuation (only 0x80-0xBF are
260 // continuations, ',' is 0x2C).
261 if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
262 return Err(SdkError::Config(format!(
263 "capability token must not contain whitespace or control characters: {cap:?}"
264 )));
265 }
266 }
267 #[cfg(feature = "direct-valkey-claim")]
268 let worker_capabilities_csv: String = {
269 let set: std::collections::BTreeSet<&str> = config
270 .capabilities
271 .iter()
272 .map(|s| s.as_str())
273 .filter(|s| !s.is_empty())
274 .collect();
275 if set.len() > ff_core::policy::CAPS_MAX_TOKENS {
276 return Err(SdkError::Config(format!(
277 "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
278 ff_core::policy::CAPS_MAX_TOKENS,
279 set.len()
280 )));
281 }
282 let csv = set.into_iter().collect::<Vec<_>>().join(",");
283 if csv.len() > ff_core::policy::CAPS_MAX_BYTES {
284 return Err(SdkError::Config(format!(
285 "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
286 ff_core::policy::CAPS_MAX_BYTES,
287 csv.len()
288 )));
289 }
290 csv
291 };
292
293 // Short stable digest of the sorted caps CSV, computed once so
294 // per-mismatch logs carry a stable identifier instead of the 4KB
295 // CSV. Shared helper — ff-scheduler uses the same one for its
296 // own per-mismatch logs, so cross-component log lines are
297 // diffable against each other.
298 #[cfg(feature = "direct-valkey-claim")]
299 let worker_capabilities_hash = ff_core::hash::fnv1a_xor8hex(&worker_capabilities_csv);
300
301 // Full CSV logged once at connect so per-mismatch logs (which
302 // carry only the 8-hex hash) can be cross-referenced by ops.
303 #[cfg(feature = "direct-valkey-claim")]
304 if !worker_capabilities_csv.is_empty() {
305 tracing::info!(
306 worker_instance_id = %config.worker_instance_id,
307 worker_caps_hash = %worker_capabilities_hash,
308 worker_caps = %worker_capabilities_csv,
309 "worker connected with capabilities (full CSV — mismatch logs use hash only)"
310 );
311 }
312
313 // Non-authoritative advertisement of caps for operator visibility
314 // (CLI introspection, dashboards). The AUTHORITATIVE source for
315 // scheduling decisions is ARGV[9] on each claim — Lua reads ONLY
316 // that, never this string. A stale or lossy advertisement is correctness-safe.
317 //
318 // Storage: a single STRING key holding the sorted CSV. Rationale:
319 // * **Atomic overwrite.** `SET` is a single command — a concurrent
320 // reader can never observe a transient empty value (the prior
321 // DEL+SADD pair had that window).
322 // * **Crash cleanup without refresh loop.** The alive-key SET NX
323 // is startup-only (see §1 above); there's no periodic renew
324 // to piggy-back on, so a TTL on caps would independently
325 // expire mid-flight and hide a live worker's caps from ops
326 // tools. Instead we drop the TTL: each reconnect overwrites;
327 // a crashed worker leaves a stale CSV until a new process
328 //     with the same worker_instance_id boots and overwrites it (a
329 //     restart inside the 2× lease window trips the alive-key guard
330 //     first — the orchestrator allocates a new id, and operators can
331 //     DEL the stale caps key if they care).
332 // * **Empty caps = DEL.** A restart from {gpu} to {} clears the
333 // advertisement rather than leaving stale data.
334 // Cluster-safe advertisement: the per-worker caps STRING lives at
335 // `ff:worker:{id}:caps` (lands on whatever slot CRC16 puts it on),
336 // and the INSTANCE ID is SADD'd to the global workers-index SET
337 // `ff:idx:workers` (single slot). The unblock scanner's cluster
338 // enumeration uses SMEMBERS on the index + per-member GET on each
339 // caps key, instead of `SCAN MATCH ff:worker:*:caps` (which in
340 // cluster mode only scans the shard the SCAN lands on and misses
341 // workers whose key hashes elsewhere). Pattern mirrors Batch A
342 // `budget_policies_index` / `flow_index` / `deps_all_edges`:
343 // operations stay atomic per command, the index is the
344 // cluster-wide enumeration surface.
345 #[cfg(feature = "direct-valkey-claim")]
346 {
347 let caps_key = ff_core::keys::worker_caps_key(&config.worker_instance_id);
348 let index_key = ff_core::keys::workers_index_key();
349 let instance_id = config.worker_instance_id.to_string();
350 if worker_capabilities_csv.is_empty() {
351 // No caps advertised. DEL the per-worker caps string AND
352 // SREM from the index so the scanner doesn't GET an empty
353 // string for a worker that never declares caps.
354 let _ = client
355 .cmd("DEL")
356 .arg(&caps_key)
357 .execute::<Option<i64>>()
358 .await;
359 if let Err(e) = client
360 .cmd("SREM")
361 .arg(&index_key)
362 .arg(&instance_id)
363 .execute::<Option<i64>>()
364 .await
365 {
366 tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
367 "SREM workers-index failed; continuing (non-authoritative)");
368 }
369 } else {
370 // Atomic overwrite of the caps STRING (one-command). Then
371 // SADD to the index (idempotent — re-running connect for
372 // the same id is a no-op at SADD level). The per-worker
373 // caps key is written BEFORE the index SADD so that when
374 // the scanner observes the id in the index, the caps key
375 // is guaranteed to resolve to a non-stale CSV (the reverse
376 // order would leak an index entry pointing at a stale or
377 // empty caps key during a narrow window).
378 if let Err(e) = client
379 .cmd("SET")
380 .arg(&caps_key)
381 .arg(&worker_capabilities_csv)
382 .execute::<Option<String>>()
383 .await
384 {
385 tracing::warn!(error = %e, key = %caps_key,
386 "SET worker caps advertisement failed; continuing");
387 }
388 if let Err(e) = client
389 .cmd("SADD")
390 .arg(&index_key)
391 .arg(&instance_id)
392 .execute::<Option<i64>>()
393 .await
394 {
395 tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
396 "SADD workers-index failed; continuing");
397 }
398 }
399 }
400
401 Ok(Self {
402 client,
403 config,
404 partition_config,
405 #[cfg(feature = "direct-valkey-claim")]
406 worker_capabilities_csv,
407 #[cfg(feature = "direct-valkey-claim")]
408 worker_capabilities_hash,
409 #[cfg(feature = "direct-valkey-claim")]
410 lane_index: AtomicUsize::new(0),
411 concurrency_semaphore,
412 #[cfg(feature = "direct-valkey-claim")]
413 scan_cursor: AtomicUsize::new(scan_cursor_init),
414 })
415 }
416
417 /// Get a reference to the underlying ferriskey client.
418 pub fn client(&self) -> &Client {
419 &self.client
420 }
421
422 /// Get the worker config.
423 pub fn config(&self) -> &WorkerConfig {
424 &self.config
425 }
426
427 /// Get the server-published partition config this worker bound to at
428 /// `connect()`. Callers need this when computing partition hash-tags
429 /// for direct-client reads (e.g. ff-sdk::read_stream) to stay aligned
430 /// with the server's `num_flow_partitions` — using
431 /// `PartitionConfig::default()` assumes 256 partitions and silently
432 /// reads the wrong partitions on deployments with any other value.
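///
/// A short sketch of the intended use (the helper name
/// `execution_partition` is the one this module itself calls):
///
/// ```rust,ignore
/// let pc = worker.partition_config();
/// // Compute the same partition the server would, using the live config
/// // rather than PartitionConfig::default().
/// let partition = ff_core::partition::execution_partition(&execution_id, pc);
/// ```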
433 pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
434 &self.partition_config
435 }
436
437 /// Attempt to claim the next eligible execution.
438 ///
439 /// Phase 1 simplified claim flow:
440 /// 1. Pick a lane (round-robin across configured lanes)
441 /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
442 /// 3. Claim the execution via `ff_claim_execution`
443 /// 4. Read execution payload + tags
444 /// 5. Return a [`ClaimedTask`] with auto lease renewal
445 ///
446 /// Gated behind the `direct-valkey-claim` feature — bypasses the
447 /// scheduler's budget / quota / capability admission checks. Enable
448 /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
449 /// the scheduler hop would be measurement noise (benches) or when
450 /// the test harness needs a deterministic worker-local path. Prefer
451 /// the scheduler-routed HTTP claim path in production.
452 ///
453 /// # `None` semantics
454 ///
455 /// `Ok(None)` means **no work was found in the partition window this
456 /// poll covered**, not "the cluster is idle". Each call scans a chunk
457 /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
458 /// `scan_cursor`; the cursor advances by that chunk size on every
459 /// invocation, so a worker covers every partition exactly once every
460 /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls — 8 polls with the defaults (256 partitions, chunk 32).
461 ///
462 /// Callers should treat `None` as "poll again soon" (typically after
463 /// `config.claim_poll_interval_ms`) rather than "sleep for a long
464 /// time". Backing off too aggressively on `None` can starve workers
465 /// when work lives on partitions outside the current window.
466 ///
467 /// Returns `Err` on Valkey errors or script failures.
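///
/// A polling sketch for the `None` case — hedged: it assumes
/// `claim_poll_interval_ms` is the millisecond field on [`WorkerConfig`]
/// that the paragraph above refers to:
///
/// ```rust,ignore
/// if let Some(task) = worker.claim_next().await? {
///     // Process task...
/// } else {
///     // No work in this partition window; poll again soon instead of
///     // backing off for a long time.
///     tokio::time::sleep(Duration::from_millis(
///         worker.config().claim_poll_interval_ms,
///     ))
///     .await;
/// }
/// ```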
468 #[cfg(feature = "direct-valkey-claim")]
469 pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
470 // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
471 // try_acquire returns immediately — if no permits available, the worker
472 // is at capacity and should not claim more work.
473 let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
474 Ok(p) => p,
475 Err(_) => return Ok(None), // At capacity — no claim attempted
476 };
477
478 let lane_id = self.next_lane();
479 let now = TimestampMs::now();
480
481 // Phase 1: We scan eligible executions directly by reading the eligible
482 // ZSET across execution partitions. In production the scheduler
483 // (ff-scheduler) would handle this. For Phase 1, the SDK does a
484 // simplified inline claim.
485 //
486 // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
487 // partitions starting at a rolling offset. This keeps idle Valkey
488 // load at O(chunk) per worker-second instead of O(num_partitions),
489 // and the worker-instance-seeded initial cursor spreads concurrent
490 // workers across different partition windows.
491 let num_partitions = self.partition_config.num_flow_partitions as usize;
492 if num_partitions == 0 {
493 return Ok(None);
494 }
495 let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
496 let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;
497
498 for step in 0..chunk {
499 let partition_idx = ((start + step) % num_partitions) as u16;
500 let partition = ff_core::partition::Partition {
501 family: ff_core::partition::PartitionFamily::Execution,
502 index: partition_idx,
503 };
504 let idx = IndexKeys::new(&partition);
505 let eligible_key = idx.lane_eligible(&lane_id);
506
507 // ZRANGEBYSCORE to get the highest-priority eligible execution.
508 // Score format: -(priority * 1_000_000_000_000) + created_at_ms
509 // ZRANGEBYSCORE with "-inf" "+inf" LIMIT 0 1 gives lowest score = highest priority.
510 let result: Value = self
511 .client
512 .cmd("ZRANGEBYSCORE")
513 .arg(&eligible_key)
514 .arg("-inf")
515 .arg("+inf")
516 .arg("LIMIT")
517 .arg("0")
518 .arg("1")
519 .execute()
520 .await
521 .map_err(|e| SdkError::ValkeyContext { source: e, context: "ZRANGEBYSCORE failed".into() })?;
522
523 let execution_id_str = match extract_first_array_string(&result) {
524 Some(s) => s,
525 None => continue, // No eligible executions on this partition
526 };
527
528 let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
529 SdkError::Script(ff_script::error::ScriptError::Parse(format!(
530 "bad execution_id in eligible set: {e}"
531 )))
532 })?;
533
534 // Step 1: Issue claim grant
535 let grant_result = self
536 .issue_claim_grant(&execution_id, &lane_id, &partition, &idx)
537 .await;
538
539 match grant_result {
540 Ok(()) => {}
541 Err(SdkError::Script(ff_script::error::ScriptError::CapabilityMismatch(
542 ref missing,
543 ))) => {
544 // Block-on-mismatch (RFC-009 §7.5) — parity with
545 // ff-scheduler's Scheduler::claim_for_worker. Without
546 // this, the inline-direct-claim path would hot-loop
547 // on an unclaimable top-of-zset (every tick picks the
548 // same execution, wastes an FCALL, logs, releases,
549 // repeats). The scheduler-side unblock scanner
550 // promotes blocked_route executions back to eligible
551 // when a worker with matching caps registers.
552 tracing::info!(
553 execution_id = %execution_id,
554 worker_id = %self.config.worker_id,
555 worker_caps_hash = %self.worker_capabilities_hash,
556 missing = %missing,
557 "capability mismatch, blocking execution off eligible (SDK inline claim)"
558 );
559 self.block_route(&execution_id, &lane_id, &partition, &idx).await;
560 continue;
561 }
562 Err(SdkError::Script(ref e)) if is_retryable_claim_error(e) => {
563 tracing::debug!(
564 execution_id = %execution_id,
565 error = %e,
566 "claim grant failed (retryable), trying next partition"
567 );
568 continue;
569 }
570 Err(e) => return Err(e),
571 }
572
573 // Step 2: Claim the execution
574 match self
575 .claim_execution(&execution_id, &lane_id, &partition, now)
576 .await
577 {
578 Ok(mut task) => {
579 // Transfer concurrency permit to the task. When the task is
580 // completed/failed/cancelled/dropped the permit returns to
581 // the semaphore, allowing another claim.
582 task.set_concurrency_permit(permit);
583 return Ok(Some(task));
584 }
585 Err(SdkError::Script(ff_script::error::ScriptError::UseClaimResumedExecution)) => {
586 // Execution was resumed from suspension — attempt_interrupted.
587 // ff_claim_execution rejects this; use ff_claim_resumed_execution
588 // which reuses the existing attempt instead of creating a new one.
589 tracing::debug!(
590 execution_id = %execution_id,
591 "execution is resumed, using claim_resumed path"
592 );
593 match self
594 .claim_resumed_execution(&execution_id, &lane_id, &partition)
595 .await
596 {
597 Ok(mut task) => {
598 task.set_concurrency_permit(permit);
599 return Ok(Some(task));
600 }
601 Err(SdkError::Script(ref e2)) if is_retryable_claim_error(e2) => {
602 tracing::debug!(
603 execution_id = %execution_id,
604 error = %e2,
605 "claim_resumed failed (retryable), trying next partition"
606 );
607 continue;
608 }
609 Err(e2) => return Err(e2),
610 }
611 }
612 Err(SdkError::Script(ref e)) if is_retryable_claim_error(e) => {
613 tracing::debug!(
614 execution_id = %execution_id,
615 error = %e,
616 "claim execution failed (retryable), trying next partition"
617 );
618 continue;
619 }
620 Err(e) => return Err(e),
621 }
622 }
623
624 // No eligible work found on any partition
625 Ok(None)
626 }
627
628 #[cfg(feature = "direct-valkey-claim")]
629 async fn issue_claim_grant(
630 &self,
631 execution_id: &ExecutionId,
632 lane_id: &LaneId,
633 partition: &ff_core::partition::Partition,
634 idx: &IndexKeys,
635 ) -> Result<(), SdkError> {
636 let ctx = ExecKeyContext::new(partition, execution_id);
637
638 // KEYS (3): exec_core, claim_grant_key, eligible_zset
639 let keys: Vec<String> = vec![
640 ctx.core(),
641 ctx.claim_grant(),
642 idx.lane_eligible(lane_id),
643 ];
644
645 // ARGV (9): eid, worker_id, worker_instance_id, lane_id,
646 // capability_hash, grant_ttl_ms, route_snapshot_json,
647 // admission_summary, worker_capabilities_csv (sorted)
648 let args: Vec<String> = vec![
649 execution_id.to_string(),
650 self.config.worker_id.to_string(),
651 self.config.worker_instance_id.to_string(),
652 lane_id.to_string(),
653 String::new(), // capability_hash
654 "5000".to_owned(), // grant_ttl_ms (5 seconds)
655 String::new(), // route_snapshot_json
656 String::new(), // admission_summary
657 self.worker_capabilities_csv.clone(), // sorted CSV
658 ];
659
660 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
661 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
662
663 let raw: Value = self
664 .client
665 .fcall("ff_issue_claim_grant", &key_refs, &arg_refs)
666 .await
667 .map_err(SdkError::Valkey)?;
668
669 crate::task::parse_success_result(&raw, "ff_issue_claim_grant")
670 }
671
672 /// Move an execution from the lane's eligible ZSET into its
673 /// blocked_route ZSET via `ff_block_execution_for_admission`. Called
674 /// after a `CapabilityMismatch` reject — without this, the inline
675 /// direct-claim path would re-pick the same top-of-zset every tick
676 /// (same pattern the scheduler's block_candidate handles). The
677 /// engine's unblock scanner periodically promotes blocked_route
678 /// back to eligible once a worker with matching caps registers.
679 ///
680 /// Best-effort: transport or logical rejects (e.g. the execution
681 /// already went terminal between pick and block) are logged and the
682 /// outer loop simply `continue`s to the next partition. Parity with
683 /// ff-scheduler::Scheduler::block_candidate.
684 #[cfg(feature = "direct-valkey-claim")]
685 async fn block_route(
686 &self,
687 execution_id: &ExecutionId,
688 lane_id: &LaneId,
689 partition: &ff_core::partition::Partition,
690 idx: &IndexKeys,
691 ) {
692 let ctx = ExecKeyContext::new(partition, execution_id);
693 let core_key = ctx.core();
694 let eligible_key = idx.lane_eligible(lane_id);
695 let blocked_key = idx.lane_blocked_route(lane_id);
696 let eid_s = execution_id.to_string();
697 let now_ms = TimestampMs::now().0.to_string();
698
699 let keys: [&str; 3] = [&core_key, &eligible_key, &blocked_key];
700 let argv: [&str; 4] = [
701 &eid_s,
702 "waiting_for_capable_worker",
703 "no connected worker satisfies required_capabilities",
704 &now_ms,
705 ];
706
707 match self
708 .client
709 .fcall::<Value>("ff_block_execution_for_admission", &keys, &argv)
710 .await
711 {
712 Ok(v) => {
713 // Parse Lua result so a logical reject (e.g. execution
714 // went terminal mid-flight) is visible — same fix we
715 // applied to ff-scheduler's block_candidate.
716 if let Err(e) = crate::task::parse_success_result(&v, "ff_block_execution_for_admission") {
717 tracing::warn!(
718 execution_id = %execution_id,
719 error = %e,
720 "SDK block_route: Lua rejected; eligible ZSET unchanged, next poll \
721 will re-evaluate"
722 );
723 }
724 }
725 Err(e) => {
726 tracing::warn!(
727 execution_id = %execution_id,
728 error = %e,
729 "SDK block_route: transport failure; eligible ZSET unchanged"
730 );
731 }
732 }
733 }
734
735 /// Low-level claim of a granted execution. Invokes
736 /// `ff_claim_execution` and returns a `ClaimedTask` with auto
737 /// lease renewal.
738 ///
739 /// Previously gated behind `direct-valkey-claim`; ungated so
740 /// the public [`claim_from_grant`] entry point can reuse the
741 /// same FCALL plumbing. The method stays private — external
742 /// callers use `claim_from_grant`.
743 ///
744 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
745 async fn claim_execution(
746 &self,
747 execution_id: &ExecutionId,
748 lane_id: &LaneId,
749 partition: &ff_core::partition::Partition,
750 _now: TimestampMs,
751 ) -> Result<ClaimedTask, SdkError> {
752 let ctx = ExecKeyContext::new(partition, execution_id);
753 let idx = IndexKeys::new(partition);
754
755 // Pre-read total_attempt_count from exec_core to derive next attempt index.
756 // The Lua uses total_attempt_count as the new index and dynamically builds
757 // the attempt key from the hash tag, so KEYS[6-8] are placeholders, but
758 // we pass the correct index for documentation/debugging.
759 let total_str: Option<String> = self.client
760 .cmd("HGET")
761 .arg(ctx.core())
762 .arg("total_attempt_count")
763 .execute()
764 .await
765 .unwrap_or(None);
766 let next_idx = total_str
767 .as_deref()
768 .and_then(|s| s.parse::<u32>().ok())
769 .unwrap_or(0);
770 let att_idx = AttemptIndex::new(next_idx);
771
772 let lease_id = LeaseId::new().to_string();
773 let attempt_id = AttemptId::new().to_string();
774 let renew_before_ms = self.config.lease_ttl_ms * 2 / 3;
775
776 // KEYS (14): must match lua/execution.lua ff_claim_execution positional order
777 let keys: Vec<String> = vec![
778 ctx.core(), // 1 exec_core
779 ctx.claim_grant(), // 2 claim_grant
780 idx.lane_eligible(lane_id), // 3 eligible_zset
781 idx.lease_expiry(), // 4 lease_expiry_zset
782 idx.worker_leases(&self.config.worker_instance_id), // 5 worker_leases
783 ctx.attempt_hash(att_idx), // 6 attempt_hash (placeholder)
784 ctx.attempt_usage(att_idx), // 7 attempt_usage (placeholder)
785 ctx.attempt_policy(att_idx), // 8 attempt_policy (placeholder)
786 ctx.attempts(), // 9 attempts_zset
787 ctx.lease_current(), // 10 lease_current
788 ctx.lease_history(), // 11 lease_history
789 idx.lane_active(lane_id), // 12 active_index
790 idx.attempt_timeout(), // 13 attempt_timeout_zset
791 idx.execution_deadline(), // 14 execution_deadline_zset
792 ];
793
794 // ARGV (12): must match lua/execution.lua ff_claim_execution positional order
795 let args: Vec<String> = vec![
796 execution_id.to_string(), // 1 execution_id
797 self.config.worker_id.to_string(), // 2 worker_id
798 self.config.worker_instance_id.to_string(), // 3 worker_instance_id
799 lane_id.to_string(), // 4 lane
800 String::new(), // 5 capability_hash
801 lease_id.clone(), // 6 lease_id
802 self.config.lease_ttl_ms.to_string(), // 7 lease_ttl_ms
803 renew_before_ms.to_string(), // 8 renew_before_ms
804 attempt_id.clone(), // 9 attempt_id
805 "{}".to_owned(), // 10 attempt_policy_json
806 String::new(), // 11 attempt_timeout_ms
807 String::new(), // 12 execution_deadline_at
808 ];
809
810 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
811 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
812
813 let raw: Value = self
814 .client
815 .fcall("ff_claim_execution", &key_refs, &arg_refs)
816 .await
817 .map_err(SdkError::Valkey)?;
818
819 // Parse claim result: {1, "OK", lease_id, lease_epoch, lease_expires_at,
820 // attempt_id, attempt_index, attempt_type}
821 let arr = match &raw {
822 Value::Array(arr) => arr,
823 _ => {
824 return Err(SdkError::Script(ff_script::error::ScriptError::Parse(
825 "ff_claim_execution: expected Array".into(),
826 )));
827 }
828 };
829
830 let status_code = match arr.first() {
831 Some(Ok(Value::Int(n))) => *n,
832 _ => {
833 return Err(SdkError::Script(ff_script::error::ScriptError::Parse(
834 "ff_claim_execution: bad status code".into(),
835 )));
836 }
837 };
838
839 if status_code != 1 {
840 let err_field_str = |idx: usize| -> String {
841 arr.get(idx)
842 .and_then(|v| match v {
843 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
844 Ok(Value::SimpleString(s)) => Some(s.clone()),
845 _ => None,
846 })
847 .unwrap_or_default()
848 };
849 let error_code = {
850 let s = err_field_str(1);
851 if s.is_empty() { "unknown".to_owned() } else { s }
852 };
853 let detail = err_field_str(2);
854
855 return Err(SdkError::Script(
856 ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
857 .unwrap_or_else(|| {
858 ff_script::error::ScriptError::Parse(format!(
859 "ff_claim_execution: {error_code}"
860 ))
861 }),
862 ));
863 }
864
865 // Extract fields from success response
866 let field_str = |idx: usize| -> String {
867 arr.get(idx + 2) // skip status_code and "OK"
868 .and_then(|v| match v {
869 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
870 Ok(Value::SimpleString(s)) => Some(s.clone()),
871 Ok(Value::Int(n)) => Some(n.to_string()),
872 _ => None,
873 })
874 .unwrap_or_default()
875 };
876
877 // Lua returns: ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
878 // Positions (offsets after the status and "OK" fields): lease_id=0, epoch=1, expires_at=2, attempt_id=3, attempt_index=4, attempt_type=5
879 let lease_id = LeaseId::parse(&field_str(0))
880 .unwrap_or_else(|_| LeaseId::new());
881 let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
882 // field_str(2) is expires_at — skip it (lease timing managed by renewal)
883 let attempt_id = AttemptId::parse(&field_str(3))
884 .unwrap_or_else(|_| AttemptId::new());
885 let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
886
887 // Read execution payload and metadata
888 let (input_payload, execution_kind, tags) = self
889 .read_execution_context(execution_id, partition)
890 .await?;
891
892 Ok(ClaimedTask::new(
893 self.client.clone(),
894 self.partition_config,
895 execution_id.clone(),
896 attempt_index,
897 attempt_id,
898 lease_id,
899 lease_epoch,
900 self.config.lease_ttl_ms,
901 lane_id.clone(),
902 self.config.worker_instance_id.clone(),
903 input_payload,
904 execution_kind,
905 tags,
906 ))
907 }
908
909 /// Consume a [`ClaimGrant`] and claim the granted execution on
910 /// this worker. The intended production entry point: pair with
911 /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
912 /// scheduler-issued grants into the SDK without enabling the
913 /// `direct-valkey-claim` feature (which bypasses budget/quota
914 /// admission control).
915 ///
916 /// The worker's concurrency semaphore is checked BEFORE the FCALL
917 /// so a saturated worker does not consume the grant: the grant
918 /// stays valid for its remaining TTL and the caller can either
919 /// release it back to the scheduler or retry after some other
920 /// in-flight task completes.
921 ///
922 /// On success the returned [`ClaimedTask`] holds a concurrency
923 /// permit that releases automatically on
924 /// `complete`/`fail`/`cancel`/drop — same contract as
925 /// `claim_next`.
926 ///
927 /// # Arguments
928 ///
929 /// * `lane` — the lane the grant was issued for. Must match what
930 /// was passed to `Scheduler::claim_for_worker`; the Lua FCALL
931 /// uses it to look up `lane_eligible`, `lane_active`, and the
932 /// `worker_leases` index slot.
933 /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
934 ///
935 /// # Errors
936 ///
937 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
938 /// permits all held. Retryable; the grant is untouched.
939 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
940 /// or `worker_id` mismatch (wrapped in [`SdkError::Script`]).
941 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
942 /// (wrapped in [`SdkError::Script`]).
943 /// * `ScriptError::CapabilityMismatch` — execution's required
944 /// capabilities not a subset of this worker's caps (wrapped in
945 ///   [`SdkError::Script`]). Can surface after grant issuance if the
946 ///   worker's capabilities changed in the interim.
947 /// * `ScriptError::Parse` — `ff_claim_execution` returned an
948 /// unexpected shape (wrapped in [`SdkError::Script`]).
949 /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
950 /// transport error during the FCALL or the
951 /// `read_execution_context` follow-up.
952 ///
953 /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
954 /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
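///
/// A hedged sketch — `lane` and `grant` are assumed to come from the
/// scheduler exchange described above:
///
/// ```rust,ignore
/// match worker.claim_from_grant(lane.clone(), grant).await {
///     Ok(task) => {
///         // Process; the concurrency permit releases on complete/fail/drop.
///         task.complete(None).await?;
///     }
///     Err(SdkError::WorkerAtCapacity) => {
///         // Grant untouched — retry once an in-flight task finishes.
///     }
///     Err(e) => return Err(e),
/// }
/// ```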
955 pub async fn claim_from_grant(
956 &self,
957 lane: LaneId,
958 grant: ff_core::contracts::ClaimGrant,
959 ) -> Result<ClaimedTask, SdkError> {
960 // Semaphore check FIRST. If the worker is saturated we must
961 // surface the condition to the caller without touching the
962 // grant — silently returning Ok(None) (as claim_next does)
963 // would drop a grant the scheduler has already committed work
964 // to issuing, wasting the slot until its TTL elapses.
965 let permit = self
966 .concurrency_semaphore
967 .clone()
968 .try_acquire_owned()
969 .map_err(|_| SdkError::WorkerAtCapacity)?;
970
971 let now = TimestampMs::now();
972 let mut task = self
973 .claim_execution(&grant.execution_id, &lane, &grant.partition, now)
974 .await?;
975 task.set_concurrency_permit(permit);
976 Ok(task)
977 }
978
979 /// Scheduler-routed claim: POST the server's
980 /// `/v1/workers/{id}/claim`, then chain to
981 /// [`Self::claim_from_grant`].
982 ///
983 /// Batch C item 2 PR-B. This is the production entry point —
984 /// budget + quota + capability admission run server-side inside
985 /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
986 /// enable the `direct-valkey-claim` feature.
987 ///
988 /// Returns `Ok(None)` when the server says no eligible execution
989 /// (HTTP 204). Callers typically back off by
990 /// `config.claim_poll_interval_ms` and try again, same cadence
991 /// as the direct-claim path's `Ok(None)`.
992 ///
993 /// The `admin` client is the established HTTP surface
994 /// (`FlowFabricAdminClient`) reused here so workers don't keep a
995 /// second reqwest client around. Build once at worker boot and
996 /// hand in by reference on every claim.
997 pub async fn claim_via_server(
998 &self,
999 admin: &crate::FlowFabricAdminClient,
1000 lane: &LaneId,
1001 grant_ttl_ms: u64,
1002 ) -> Result<Option<ClaimedTask>, SdkError> {
1003 let req = crate::admin::ClaimForWorkerRequest {
1004 worker_id: self.config.worker_id.to_string(),
1005 lane_id: lane.to_string(),
1006 worker_instance_id: self.config.worker_instance_id.to_string(),
1007 capabilities: self.config.capabilities.clone(),
1008 grant_ttl_ms,
1009 };
1010 let Some(resp) = admin.claim_for_worker(req).await? else {
1011 return Ok(None);
1012 };
1013 let grant = resp.into_grant()?;
1014 self.claim_from_grant(lane.clone(), grant).await.map(Some)
1015 }
1016
1017 /// Consume a [`ReclaimGrant`] and transition the granted
1018 /// `attempt_interrupted` execution into a `started` state on this
1019 /// worker. Symmetric partner to [`claim_from_grant`] for the
1020 /// resume path.
1021 ///
1022 /// The grant must have been issued to THIS worker (matching
1023 /// `worker_id` at grant time). A mismatch returns
1024 /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1025 /// atomically by `ff_claim_resumed_execution`; a second call with
1026 /// the same grant also returns `InvalidClaimGrant`.
1027 ///
1028 /// # Concurrency
1029 ///
1030 /// The worker's concurrency semaphore is checked BEFORE the FCALL
1031 /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1032 /// assume pre-existing capacity on this worker — a reclaim can
1033 /// land on a fresh worker instance that just came up after a
1034 /// crash/restart and is picking up a previously-interrupted
1035 /// execution. If the worker is saturated, the grant stays valid
1036 /// for its remaining TTL and the caller can release it or retry.
1037 ///
1038 /// On success the returned [`ClaimedTask`] holds a concurrency
1039 /// permit that releases automatically on
1040 /// `complete`/`fail`/`cancel`/drop.
1041 ///
1042 /// # Errors
1043 ///
1044 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1045 /// permits all held. Retryable; the grant is untouched (no
1046 /// FCALL was issued, so `ff_claim_resumed_execution` did not
1047 /// atomically consume the grant key).
1048 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1049 /// or `worker_id` mismatch.
1050 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1051 /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1052 /// `attempt_interrupted`.
1053 /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1054 /// not `runnable`.
1055 /// * `ScriptError::ExecutionNotFound` — core key missing.
1056 /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
1057 /// transport.
1058 ///
1059 /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1060 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
1061 pub async fn claim_from_reclaim_grant(
1062 &self,
1063 grant: ff_core::contracts::ReclaimGrant,
1064 ) -> Result<ClaimedTask, SdkError> {
1065 // Semaphore check FIRST — same load-bearing ordering as
1066 // `claim_from_grant`. If the worker is saturated, surface
1067 // WorkerAtCapacity without firing the FCALL; the FCALL is an
1068 // atomic consume on the grant key, so calling it past-
1069 // saturation would destroy the grant while leaving no
1070 // permit to attach to the returned `ClaimedTask`.
1071 let permit = self
1072 .concurrency_semaphore
1073 .clone()
1074 .try_acquire_owned()
1075 .map_err(|_| SdkError::WorkerAtCapacity)?;
1076
1077 // Grant carries partition + lane_id so no round-trip is needed
1078 // to resolve them before the FCALL.
1079 let mut task = self
1080 .claim_resumed_execution(
1081 &grant.execution_id,
1082 &grant.lane_id,
1083 &grant.partition,
1084 )
1085 .await?;
1086 task.set_concurrency_permit(permit);
1087 Ok(task)
1088 }
1089
1090 /// Low-level resume claim. Invokes `ff_claim_resumed_execution`
1091 /// and returns a `ClaimedTask` bound to the resumed attempt.
1092 ///
1093 /// Previously gated behind `direct-valkey-claim`; ungated so the
1094 /// public [`claim_from_reclaim_grant`] entry point can reuse it.
1095 /// The method stays private — external callers use
1096 /// `claim_from_reclaim_grant`.
1097 ///
1098 /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
1099 async fn claim_resumed_execution(
1100 &self,
1101 execution_id: &ExecutionId,
1102 lane_id: &LaneId,
1103 partition: &ff_core::partition::Partition,
1104 ) -> Result<ClaimedTask, SdkError> {
1105 let ctx = ExecKeyContext::new(partition, execution_id);
1106 let idx = IndexKeys::new(partition);
1107
1108 // Pre-read current_attempt_index for the existing attempt hash key.
1109 // This is load-bearing: KEYS[6] must point to the real attempt hash.
1110 let att_idx_str: Option<String> = self.client
1111 .cmd("HGET")
1112 .arg(ctx.core())
1113 .arg("current_attempt_index")
1114 .execute()
1115 .await
1116 .map_err(|e| SdkError::ValkeyContext { source: e, context: "read attempt_index".into() })?;
1117 let att_idx = AttemptIndex::new(
1118 att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
1119 );
1120
1121 let lease_id = LeaseId::new().to_string();
1122
1123 // KEYS (11): must match lua/signal.lua ff_claim_resumed_execution
1124 let keys: Vec<String> = vec![
1125 ctx.core(), // 1 exec_core
1126 ctx.claim_grant(), // 2 claim_grant
1127 idx.lane_eligible(lane_id), // 3 eligible_zset
1128 idx.lease_expiry(), // 4 lease_expiry_zset
1129 idx.worker_leases(&self.config.worker_instance_id), // 5 worker_leases
1130 ctx.attempt_hash(att_idx), // 6 existing_attempt_hash
1131 ctx.lease_current(), // 7 lease_current
1132 ctx.lease_history(), // 8 lease_history
1133 idx.lane_active(lane_id), // 9 active_index
1134 idx.attempt_timeout(), // 10 attempt_timeout_zset
1135 idx.execution_deadline(), // 11 execution_deadline_zset
1136 ];
1137
1138 // ARGV (8): must match lua/signal.lua ff_claim_resumed_execution
1139 let args: Vec<String> = vec![
1140 execution_id.to_string(), // 1 execution_id
1141 self.config.worker_id.to_string(), // 2 worker_id
1142 self.config.worker_instance_id.to_string(), // 3 worker_instance_id
1143 lane_id.to_string(), // 4 lane
1144 String::new(), // 5 capability_hash
1145 lease_id.clone(), // 6 lease_id
1146 self.config.lease_ttl_ms.to_string(), // 7 lease_ttl_ms
1147 String::new(), // 8 remaining_attempt_timeout_ms
1148 ];
1149
1150 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1151 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1152
1153 let raw: Value = self
1154 .client
1155 .fcall("ff_claim_resumed_execution", &key_refs, &arg_refs)
1156 .await
1157 .map_err(SdkError::Valkey)?;
1158
1159 // Parse result — same format as ff_claim_execution:
1160 // {1, "OK", lease_id, lease_epoch, expires_at, attempt_id, attempt_index, attempt_type}
1161 let arr = match &raw {
1162 Value::Array(arr) => arr,
1163 _ => {
1164 return Err(SdkError::Script(ff_script::error::ScriptError::Parse(
1165 "ff_claim_resumed_execution: expected Array".into(),
1166 )));
1167 }
1168 };
1169
1170 let status_code = match arr.first() {
1171 Some(Ok(Value::Int(n))) => *n,
1172 _ => {
1173 return Err(SdkError::Script(ff_script::error::ScriptError::Parse(
1174 "ff_claim_resumed_execution: bad status code".into(),
1175 )));
1176 }
1177 };
1178
1179 if status_code != 1 {
1180 let err_field_str = |idx: usize| -> String {
1181 arr.get(idx)
1182 .and_then(|v| match v {
1183 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1184 Ok(Value::SimpleString(s)) => Some(s.clone()),
1185 _ => None,
1186 })
1187 .unwrap_or_default()
1188 };
1189 let error_code = {
1190 let s = err_field_str(1);
1191 if s.is_empty() { "unknown".to_owned() } else { s }
1192 };
1193 let detail = err_field_str(2);
1194
1195 return Err(SdkError::Script(
1196 ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
1197 .unwrap_or_else(|| {
1198 ff_script::error::ScriptError::Parse(format!(
1199 "ff_claim_resumed_execution: {error_code}"
1200 ))
1201 }),
1202 ));
1203 }
1204
1205 let field_str = |idx: usize| -> String {
1206 arr.get(idx + 2)
1207 .and_then(|v| match v {
1208 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1209 Ok(Value::SimpleString(s)) => Some(s.clone()),
1210 Ok(Value::Int(n)) => Some(n.to_string()),
1211 _ => None,
1212 })
1213 .unwrap_or_default()
1214 };
1215
1216 let lease_id = LeaseId::parse(&field_str(0))
1217 .unwrap_or_else(|_| LeaseId::new());
1218 let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
1219 let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
1220 let attempt_id = AttemptId::parse(&field_str(3))
1221 .unwrap_or_else(|_| AttemptId::new());
1222
1223 let (input_payload, execution_kind, tags) = self
1224 .read_execution_context(execution_id, partition)
1225 .await?;
1226
1227 Ok(ClaimedTask::new(
1228 self.client.clone(),
1229 self.partition_config,
1230 execution_id.clone(),
1231 attempt_index,
1232 attempt_id,
1233 lease_id,
1234 lease_epoch,
1235 self.config.lease_ttl_ms,
1236 lane_id.clone(),
1237 self.config.worker_instance_id.clone(),
1238 input_payload,
1239 execution_kind,
1240 tags,
1241 ))
1242 }
1243
1244 /// Read payload, execution_kind, and tags for an execution. Previously
1245 /// gated behind `direct-valkey-claim`; now shared by the
1246 /// feature-gated inline claim path and the public
1247 /// `claim_from_reclaim_grant` entry point.
1248 async fn read_execution_context(
1249 &self,
1250 execution_id: &ExecutionId,
1251 partition: &ff_core::partition::Partition,
1252 ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
1253 let ctx = ExecKeyContext::new(partition, execution_id);
1254
1255 // Read payload
1256 let payload: Option<String> = self
1257 .client
1258 .get(&ctx.payload())
1259 .await
1260 .map_err(|e| SdkError::ValkeyContext { source: e, context: "GET payload failed".into() })?;
1261 let input_payload = payload.unwrap_or_default().into_bytes();
1262
1263 // Read execution_kind from core
1264 let kind: Option<String> = self
1265 .client
1266 .hget(&ctx.core(), "execution_kind")
1267 .await
1268 .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET execution_kind failed".into() })?;
1269 let execution_kind = kind.unwrap_or_default();
1270
1271 // Read tags
1272 let tags: HashMap<String, String> = self
1273 .client
1274 .hgetall(&ctx.tags())
1275 .await
1276 .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGETALL tags".into() })?;
1277
1278 Ok((input_payload, execution_kind, tags))
1279 }
1280
1281 // ── Phase 3: Signal delivery ──
1282
1283 /// Deliver a signal to a suspended execution's waitpoint.
1284 ///
1285 /// The engine atomically records the signal, evaluates the resume condition,
1286 /// and optionally transitions the execution from `suspended` to `runnable`.
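///
/// A minimal sketch — building the `crate::task::Signal` (name, category,
/// source, payload, idempotency key, waitpoint token) is up to the caller
/// and is elided here:
///
/// ```rust,ignore
/// let outcome = worker
///     .deliver_signal(&execution_id, &waitpoint_id, signal)
///     .await?;
/// // Inspect `outcome` (a `SignalOutcome`) to see what the engine did.
/// ```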
1287 pub async fn deliver_signal(
1288 &self,
1289 execution_id: &ExecutionId,
1290 waitpoint_id: &WaitpointId,
1291 signal: crate::task::Signal,
1292 ) -> Result<crate::task::SignalOutcome, SdkError> {
1293 let partition = ff_core::partition::execution_partition(execution_id, &self.partition_config);
1294 let ctx = ExecKeyContext::new(&partition, execution_id);
1295 let idx = IndexKeys::new(&partition);
1296
1297 let signal_id = ff_core::types::SignalId::new();
1298 let now = TimestampMs::now();
1299
1300 // Pre-read lane_id from exec_core — the execution may be on any lane,
1301 // not necessarily one of this worker's configured lanes.
1302 let lane_str: Option<String> = self
1303 .client
1304 .hget(&ctx.core(), "lane_id")
1305 .await
1306 .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1307 let lane_id = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1308
1309 // KEYS (14): exec_core, wp_condition, wp_signals_stream,
1310 // exec_signals_zset, signal_hash, signal_payload,
1311 // idem_key, waitpoint_hash, suspension_current,
1312 // eligible_zset, suspended_zset, delayed_zset,
1313 // suspension_timeout_zset, hmac_secrets
1314 let idem_key = if let Some(ref ik) = signal.idempotency_key {
1315 ctx.signal_dedup(waitpoint_id, ik)
1316 } else {
1317 ctx.noop() // must share {p:N} hash tag for cluster mode
1318 };
1319 let keys: Vec<String> = vec![
1320 ctx.core(), // 1
1321 ctx.waitpoint_condition(waitpoint_id), // 2
1322 ctx.waitpoint_signals(waitpoint_id), // 3
1323 ctx.exec_signals(), // 4
1324 ctx.signal(&signal_id), // 5
1325 ctx.signal_payload(&signal_id), // 6
1326 idem_key, // 7
1327 ctx.waitpoint(waitpoint_id), // 8
1328 ctx.suspension_current(), // 9
1329 idx.lane_eligible(&lane_id), // 10
1330 idx.lane_suspended(&lane_id), // 11
1331 idx.lane_delayed(&lane_id), // 12
1332 idx.suspension_timeout(), // 13
1333 idx.waitpoint_hmac_secrets(), // 14
1334 ];
1335
1336 let payload_str = signal
1337 .payload
1338 .as_ref()
1339 .map(|p| String::from_utf8_lossy(p).into_owned())
1340 .unwrap_or_default();
1341
1342 // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
1343 // signal_category, source_type, source_identity,
1344 // payload, payload_encoding, idempotency_key,
1345 // correlation_id, target_scope, created_at,
1346 // dedup_ttl_ms, resume_delay_ms, signal_maxlen,
1347 // max_signals_per_execution, waitpoint_token
1348 let args: Vec<String> = vec![
1349 signal_id.to_string(), // 1
1350 execution_id.to_string(), // 2
1351 waitpoint_id.to_string(), // 3
1352 signal.signal_name, // 4
1353 signal.signal_category, // 5
1354 signal.source_type, // 6
1355 signal.source_identity, // 7
1356 payload_str, // 8
1357 "json".to_owned(), // 9 payload_encoding
1358 signal.idempotency_key.unwrap_or_default(), // 10
1359 String::new(), // 11 correlation_id
1360 "waitpoint".to_owned(), // 12 target_scope
1361 now.to_string(), // 13 created_at
1362 "86400000".to_owned(), // 14 dedup_ttl_ms
1363 "0".to_owned(), // 15 resume_delay_ms
1364 "1000".to_owned(), // 16 signal_maxlen
1365 "10000".to_owned(), // 17 max_signals
1366 // WIRE BOUNDARY — raw token must reach Lua unredacted. Do NOT
1367 // use ToString/Display (those are redacted for log safety);
1368 // .as_str() is the explicit opt-in that gets the secret bytes.
1369 signal.waitpoint_token.as_str().to_owned(), // 18 waitpoint_token
1370 ];
1371
1372 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1373 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1374
1375 let raw: Value = self
1376 .client
1377 .fcall("ff_deliver_signal", &key_refs, &arg_refs)
1378 .await
1379 .map_err(SdkError::Valkey)?;
1380
1381 crate::task::parse_signal_result(&raw)
1382 }
1383
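/// Round-robin over `config.lanes`; `connect()` guarantees the list is
/// non-empty, so the modulo below is always well-defined.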
1384 #[cfg(feature = "direct-valkey-claim")]
1385 fn next_lane(&self) -> LaneId {
1386 let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
1387 self.config.lanes[idx].clone()
1388 }
1389}
1390
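/// True when the script error's class is `Retryable` or `Informational`;
/// the inline claim loop then moves on to the next candidate instead of
/// failing the whole poll.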
1391#[cfg(feature = "direct-valkey-claim")]
1392fn is_retryable_claim_error(err: &ff_script::error::ScriptError) -> bool {
1393 use ff_core::error::ErrorClass;
1394 matches!(
1395 err.class(),
1396 ErrorClass::Retryable | ErrorClass::Informational
1397 )
1398}
1399
1400/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
1401/// instance id with FNV-1a to place distinct worker processes on different
1402/// partition windows from their first poll. A fixed seed of zero would be
1403/// fine for a single worker; hashing spreads scans in multi-worker deployments.
1404#[cfg(feature = "direct-valkey-claim")]
1405fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
1406 if num_partitions == 0 {
1407 return 0;
1408 }
1409 (ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize) % num_partitions
1410}
1411
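/// First element of a Valkey array reply decoded as a UTF-8 string, if any.
/// Used by the inline claim path to pull the top-of-ZSET execution id out of
/// the ZRANGEBYSCORE reply.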
1412#[cfg(feature = "direct-valkey-claim")]
1413fn extract_first_array_string(value: &Value) -> Option<String> {
1414 match value {
1415 Value::Array(arr) if !arr.is_empty() => match &arr[0] {
1416 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1417 Ok(Value::SimpleString(s)) => Some(s.clone()),
1418 _ => None,
1419 },
1420 _ => None,
1421 }
1422}
1423
1424/// Read partition config from Valkey's `ff:config:partitions` hash.
1425/// Returns Err if the key doesn't exist or can't be read.
1426async fn read_partition_config(client: &Client) -> Result<PartitionConfig, SdkError> {
1427 let key = ff_core::keys::global_config_partitions();
1428 let fields: HashMap<String, String> = client
1429 .hgetall(&key)
1430 .await
1431 .map_err(|e| SdkError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;
1432
1433 if fields.is_empty() {
1434 return Err(SdkError::Config(
1435 "ff:config:partitions not found in Valkey".into(),
1436 ));
1437 }
1438
1439 let parse = |field: &str, default: u16| -> u16 {
1440 fields
1441 .get(field)
1442 .and_then(|v| v.parse().ok())
1443 .filter(|&n: &u16| n > 0)
1444 .unwrap_or(default)
1445 };
1446
1447 Ok(PartitionConfig {
1448 num_flow_partitions: parse("num_flow_partitions", 256),
1449 num_budget_partitions: parse("num_budget_partitions", 32),
1450 num_quota_partitions: parse("num_quota_partitions", 32),
1451 })
1452}
1453
1454#[cfg(all(test, feature = "direct-valkey-claim"))]
1455mod scan_cursor_tests {
1456 use super::scan_cursor_seed;
1457
1458 #[test]
1459 fn stable_for_same_input() {
1460 assert_eq!(scan_cursor_seed("w1", 256), scan_cursor_seed("w1", 256));
1461 }
1462
1463 #[test]
1464 fn distinct_for_different_ids() {
1465 assert_ne!(scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
1466 }
1467
1468 #[test]
1469 fn bounded_by_partition_count() {
1470 for i in 0..100 {
1471 assert!(scan_cursor_seed(&format!("w{i}"), 256) < 256);
1472 }
1473 }
1474
1475 #[test]
1476 fn zero_partitions_returns_zero() {
1477 assert_eq!(scan_cursor_seed("w1", 0), 0);
1478 }
1479}